Posted to commits@hudi.apache.org by le...@apache.org on 2021/02/23 12:51:07 UTC

[hudi] branch master updated: [HUDI-1477] Support copyOnWriteTable in java client (#2382)

This is an automated email from the ASF dual-hosted git repository.

leesf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 2efd076  [HUDI-1477] Support copyOnWriteTable in java client (#2382)
2efd076 is described below

commit 2efd0760acb5de06adc0f0a4a045df792ff3b6ab
Author: Shen Hong <sh...@126.com>
AuthorDate: Tue Feb 23 20:50:55 2021 +0800

    [HUDI-1477] Support copyOnWriteTable in java client (#2382)
---
 .../org/apache/hudi/config/HoodieIndexConfig.java  |   1 +
 hudi-client/hudi-java-client/pom.xml               |  13 +
 .../JavaBulkInsertInternalPartitionerFactory.java  |  23 +-
 .../bulkinsert/JavaGlobalSortPartitioner.java      |  67 +++
 .../bulkinsert/JavaNonSortPartitioner.java         |  46 ++
 .../hudi/table/HoodieJavaCopyOnWriteTable.java     |  28 +-
 .../commit/BaseJavaCommitActionExecutor.java       |  13 +-
 .../commit/JavaBulkInsertCommitActionExecutor.java |  67 +++
 .../table/action/commit/JavaBulkInsertHelper.java  | 113 +++++
 .../JavaBulkInsertPreppedCommitActionExecutor.java |  63 +++
 .../JavaInsertOverwriteCommitActionExecutor.java   |  79 ++++
 ...vaInsertOverwriteTableCommitActionExecutor.java |  63 +++
 .../JavaCopyOnWriteRestoreActionExecutor.java      |  66 +++
 .../JavaCopyOnWriteRollbackActionExecutor.java     |  72 ++++
 .../rollback/JavaListingBasedRollbackHelper.java   | 237 ++++++++++
 .../rollback/JavaMarkerBasedRollbackStrategy.java  |  76 ++++
 .../commit/TestJavaCopyOnWriteActionExecutor.java  | 479 +++++++++++++++++++++
 .../src/test/resources/testDataGeneratorSchema.txt | 128 ++++++
 .../org/apache/hudi/common/engine/EngineType.java  |   2 +-
 19 files changed, 1622 insertions(+), 14 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
index 2fbd71d..41b0a10 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
@@ -301,6 +301,7 @@ public class HoodieIndexConfig extends DefaultHoodieConfig {
         case SPARK:
           return HoodieIndex.IndexType.BLOOM.name();
         case FLINK:
+        case JAVA:
           return HoodieIndex.IndexType.INMEMORY.name();
         default:
           throw new HoodieNotSupportedException("Unsupported engine " + engineType);
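
Note: with the JAVA case added above, a java-client write config that does not set an
index type now defaults to the in-memory index, the same default the FLINK engine uses.
A minimal sketch of exercising (or overriding) that default, assuming the existing
HoodieWriteConfig/HoodieIndexConfig builder API:

    // Omitting withIndexType(...) picks the engine-specific default,
    // which for EngineType.JAVA now resolves to INMEMORY.
    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hudi_table")              // hypothetical base path
        .withEngineType(EngineType.JAVA)
        .withIndexConfig(HoodieIndexConfig.newBuilder()
            // .withIndexType(HoodieIndex.IndexType.INMEMORY)  // explicit override
            .build())
        .build();
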
diff --git a/hudi-client/hudi-java-client/pom.xml b/hudi-client/hudi-java-client/pom.xml
index 0ef741f..8a020c0 100644
--- a/hudi-client/hudi-java-client/pom.xml
+++ b/hudi-client/hudi-java-client/pom.xml
@@ -66,6 +66,19 @@
             <version>${project.version}</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>${hive.groupid}</groupId>
+            <artifactId>hive-exec</artifactId>
+            <version>${hive.version}</version>
+            <scope>test</scope>
+            <classifier>${hive.exec.classifier}</classifier>
+        </dependency>
+        <dependency>
+            <groupId>${hive.groupid}</groupId>
+            <artifactId>hive-metastore</artifactId>
+            <version>${hive.version}</version>
+            <scope>test</scope>
+        </dependency>
 
         <!-- Test -->
         <dependency>
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/engine/EngineType.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaBulkInsertInternalPartitionerFactory.java
similarity index 51%
copy from hudi-common/src/main/java/org/apache/hudi/common/engine/EngineType.java
copy to hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaBulkInsertInternalPartitionerFactory.java
index 5834fa9..62523d3 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/engine/EngineType.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaBulkInsertInternalPartitionerFactory.java
@@ -16,11 +16,26 @@
  * limitations under the License.
  */
 
-package org.apache.hudi.common.engine;
+package org.apache.hudi.execution.bulkinsert;
+
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.table.BulkInsertPartitioner;
 
 /**
- * Hoodie data processing engine. support only Apache Spark and Apache Flink for now.
+ * A factory that generates a built-in partitioner to repartition input records into at least
+ * the expected number of output partitions for the bulk insert operation.
  */
-public enum EngineType {
-  SPARK, FLINK
+public abstract class JavaBulkInsertInternalPartitionerFactory {
+
+  public static BulkInsertPartitioner get(BulkInsertSortMode sortMode) {
+    switch (sortMode) {
+      case NONE:
+        return new JavaNonSortPartitioner();
+      case GLOBAL_SORT:
+        return new JavaGlobalSortPartitioner();
+      default:
+        throw new HoodieException("The bulk insert sort mode \"" + sortMode.name()
+            + "\" is not supported in java client.");
+    }
+  }
 }
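
The factory mirrors the Spark-side BulkInsertInternalPartitionerFactory: it maps a
BulkInsertSortMode to the matching java partitioner and rejects modes the java client
does not implement yet (e.g. PARTITION_SORT). A short usage sketch:

    // Resolve the partitioner for the configured sort mode.
    BulkInsertPartitioner partitioner =
        JavaBulkInsertInternalPartitionerFactory.get(BulkInsertSortMode.GLOBAL_SORT);
    // Passing BulkInsertSortMode.PARTITION_SORT would hit the default branch
    // and throw HoodieException, since only NONE and GLOBAL_SORT are handled.
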
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaGlobalSortPartitioner.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaGlobalSortPartitioner.java
new file mode 100644
index 0000000..fded0ff
--- /dev/null
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaGlobalSortPartitioner.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.execution.bulkinsert;
+
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.table.BulkInsertPartitioner;
+
+import java.util.Comparator;
+import java.util.List;
+
+/**
+ * A built-in partitioner that does global sorting for the input records across partitions
+ * after repartition for bulk insert operation, corresponding to the
+ * {@code BulkInsertSortMode.GLOBAL_SORT} mode.
+ *
+ * @param <T> HoodieRecordPayload type
+ */
+public class JavaGlobalSortPartitioner<T extends HoodieRecordPayload>
+    implements BulkInsertPartitioner<List<HoodieRecord<T>>> {
+
+  @Override
+  public List<HoodieRecord<T>> repartitionRecords(List<HoodieRecord<T>> records,
+                                                  int outputSparkPartitions) {
+    // Now, sort the records and line them up nicely for loading.
+    records.sort(new Comparator() {
+      @Override
+      public int compare(Object o1, Object o2) {
+        HoodieRecord o11 = (HoodieRecord) o1;
+        HoodieRecord o22 = (HoodieRecord) o2;
+        String left = new StringBuilder()
+            .append(o11.getPartitionPath())
+            .append("+")
+            .append(o11.getRecordKey())
+            .toString();
+        String right = new StringBuilder()
+            .append(o22.getPartitionPath())
+            .append("+")
+            .append(o22.getRecordKey())
+            .toString();
+        return left.compareTo(right);
+      }
+    });
+    return records;
+  }
+
+  @Override
+  public boolean arePartitionRecordsSorted() {
+    return true;
+  }
+}
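
The raw anonymous Comparator above orders records lexicographically by the concatenation
of partition path, "+", and record key. An equivalent, more idiomatic formulation (a
sketch, not part of the commit) would be:

    // Equivalent sort using Comparator.comparing on the composite key.
    records.sort(Comparator.comparing(
        (HoodieRecord<T> record) -> record.getPartitionPath() + "+" + record.getRecordKey()));
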
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaNonSortPartitioner.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaNonSortPartitioner.java
new file mode 100644
index 0000000..b40459d
--- /dev/null
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaNonSortPartitioner.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.execution.bulkinsert;
+
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.table.BulkInsertPartitioner;
+
+import java.util.List;
+
+/**
+ * A built-in partitioner that returns the input records unchanged for the bulk insert operation,
+ * corresponding to the {@code BulkInsertSortMode.NONE} mode.
+ *
+ * @param <T> HoodieRecordPayload type
+ */
+public class JavaNonSortPartitioner<T extends HoodieRecordPayload>
+    implements BulkInsertPartitioner<List<HoodieRecord<T>>> {
+
+  @Override
+  public List<HoodieRecord<T>> repartitionRecords(List<HoodieRecord<T>> records,
+                                                  int outputPartitions) {
+    return records;
+  }
+
+  @Override
+  public boolean arePartitionRecordsSorted() {
+    return false;
+  }
+}
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
index 9895df3..157e11a 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
@@ -39,10 +39,17 @@ import org.apache.hudi.table.action.HoodieWriteMetadata;
 import org.apache.hudi.table.action.bootstrap.HoodieBootstrapWriteMetadata;
 import org.apache.hudi.table.action.clean.JavaCleanActionExecutor;
 import org.apache.hudi.table.action.commit.JavaDeleteCommitActionExecutor;
+import org.apache.hudi.table.action.commit.JavaBulkInsertCommitActionExecutor;
+import org.apache.hudi.table.action.commit.JavaBulkInsertPreppedCommitActionExecutor;
 import org.apache.hudi.table.action.commit.JavaInsertCommitActionExecutor;
+import org.apache.hudi.table.action.commit.JavaInsertOverwriteCommitActionExecutor;
+import org.apache.hudi.table.action.commit.JavaInsertOverwriteTableCommitActionExecutor;
 import org.apache.hudi.table.action.commit.JavaInsertPreppedCommitActionExecutor;
 import org.apache.hudi.table.action.commit.JavaUpsertCommitActionExecutor;
 import org.apache.hudi.table.action.commit.JavaUpsertPreppedCommitActionExecutor;
+import org.apache.hudi.table.action.restore.JavaCopyOnWriteRestoreActionExecutor;
+import org.apache.hudi.table.action.rollback.JavaCopyOnWriteRollbackActionExecutor;
+import org.apache.hudi.table.action.savepoint.SavepointActionExecutor;
 
 import java.util.List;
 import java.util.Map;
@@ -75,7 +82,8 @@ public class HoodieJavaCopyOnWriteTable<T extends HoodieRecordPayload> extends H
                                                            String instantTime,
                                                            List<HoodieRecord<T>> records,
                                                            Option<BulkInsertPartitioner<List<HoodieRecord<T>>>> bulkInsertPartitioner) {
-    throw new HoodieNotSupportedException("BulkInsert is not supported yet");
+    return new JavaBulkInsertCommitActionExecutor((HoodieJavaEngineContext) context, config,
+        this, instantTime, records, bulkInsertPartitioner).execute();
   }
 
   @Override
@@ -112,21 +120,24 @@ public class HoodieJavaCopyOnWriteTable<T extends HoodieRecordPayload> extends H
                                                                   String instantTime,
                                                                   List<HoodieRecord<T>> preppedRecords,
                                                                   Option<BulkInsertPartitioner<List<HoodieRecord<T>>>> bulkInsertPartitioner) {
-    throw new HoodieNotSupportedException("BulkInsertPrepped is not supported yet");
+    return new JavaBulkInsertPreppedCommitActionExecutor((HoodieJavaEngineContext) context, config,
+        this, instantTime, preppedRecords, bulkInsertPartitioner).execute();
   }
 
   @Override
   public HoodieWriteMetadata<List<WriteStatus>> insertOverwrite(HoodieEngineContext context,
                                                                 String instantTime,
                                                                 List<HoodieRecord<T>> records) {
-    throw new HoodieNotSupportedException("InsertOverwrite is not supported yet");
+    return new JavaInsertOverwriteCommitActionExecutor(
+        context, config, this, instantTime, records).execute();
   }
 
   @Override
   public HoodieWriteMetadata<List<WriteStatus>> insertOverwriteTable(HoodieEngineContext context,
                                                                      String instantTime,
                                                                      List<HoodieRecord<T>> records) {
-    throw new HoodieNotSupportedException("InsertOverwrite is not supported yet");
+    return new JavaInsertOverwriteTableCommitActionExecutor(
+        context, config, this, instantTime, records).execute();
   }
 
   @Override
@@ -175,7 +186,8 @@ public class HoodieJavaCopyOnWriteTable<T extends HoodieRecordPayload> extends H
                                          String rollbackInstantTime,
                                          HoodieInstant commitInstant,
                                          boolean deleteInstants) {
-    throw new HoodieNotSupportedException("Rollback is not supported yet");
+    return new JavaCopyOnWriteRollbackActionExecutor(
+        context, config, this, rollbackInstantTime, commitInstant, deleteInstants).execute();
   }
 
   @Override
@@ -183,13 +195,15 @@ public class HoodieJavaCopyOnWriteTable<T extends HoodieRecordPayload> extends H
                                            String instantToSavepoint,
                                            String user,
                                            String comment) {
-    throw new HoodieNotSupportedException("Savepoint is not supported yet");
+    return new SavepointActionExecutor(
+        context, config, this, instantToSavepoint, user, comment).execute();
   }
 
   @Override
   public HoodieRestoreMetadata restore(HoodieEngineContext context,
                                        String restoreInstantTime,
                                        String instantToRestore) {
-    throw new HoodieNotSupportedException("Restore is not supported yet");
+    return new JavaCopyOnWriteRestoreActionExecutor((HoodieJavaEngineContext) context,
+        config, this, restoreInstantTime, instantToRestore).execute();
   }
 }
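
Taken together, these overrides turn on bulk insert, insert overwrite, savepoint,
rollback and restore for the java copy-on-write table. A minimal end-to-end sketch,
assuming HoodieJavaWriteClient exposes startCommit/bulkInsert the same way the Spark
client does (the client-side method names are assumptions, not part of this diff):

    // Bulk insert through the java write client (sketch).
    HoodieJavaWriteClient<HoodieAvroPayload> client =
        new HoodieJavaWriteClient<>(new HoodieJavaEngineContext(hadoopConf), writeConfig);
    String instantTime = client.startCommit();
    List<WriteStatus> statuses = client.bulkInsert(records, instantTime);
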
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.java
index 3e0b80c..a4a6a4f 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.java
@@ -121,6 +121,7 @@ public abstract class BaseJavaCommitActionExecutor<T extends HoodieRecordPayload
       }
     });
     updateIndex(writeStatuses, result);
+    updateIndexAndCommitIfNeeded(writeStatuses, result);
     return result;
   }
 
@@ -297,8 +298,7 @@ public abstract class BaseJavaCommitActionExecutor<T extends HoodieRecordPayload
   }
 
   @Override
-  public Iterator<List<WriteStatus>> handleInsert(String idPfx, Iterator<HoodieRecord<T>> recordItr)
-      throws Exception {
+  public Iterator<List<WriteStatus>> handleInsert(String idPfx, Iterator<HoodieRecord<T>> recordItr) {
     // This is needed since sometimes some buckets are never picked in getPartition() and end up with 0 records
     if (!recordItr.hasNext()) {
       LOG.info("Empty partition");
@@ -325,4 +325,13 @@ public abstract class BaseJavaCommitActionExecutor<T extends HoodieRecordPayload
     return getUpsertPartitioner(profile);
   }
 
+  public void updateIndexAndCommitIfNeeded(List<WriteStatus> writeStatuses, HoodieWriteMetadata result) {
+    Instant indexStartTime = Instant.now();
+    // Update the index back
+    List<WriteStatus> statuses = table.getIndex().updateLocation(writeStatuses, context, table);
+    result.setIndexUpdateDuration(Duration.between(indexStartTime, Instant.now()));
+    result.setWriteStatuses(statuses);
+    result.setPartitionToReplaceFileIds(getPartitionToReplacedFileIds(statuses));
+    commitOnAutoCommit(result);
+  }
 }
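
updateIndexAndCommitIfNeeded() finishes a write in one pass: it writes the new location
of each record back to the index, records the index-update duration and write statuses
on the HoodieWriteMetadata, captures the replaced file ids (empty for plain commits),
and commits the instant when auto-commit is enabled. A hedged sketch of the
manual-commit alternative, assuming the usual withAutoCommit(false) config flag and
client.commit(...) call (client and records assumed in scope):

    // With auto-commit disabled, the caller commits explicitly (sketch).
    HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hudi_table")       // hypothetical base path
        .withAutoCommit(false)
        .build();
    List<WriteStatus> statuses = client.upsert(records, instantTime);
    client.commit(instantTime, statuses);  // explicit commit after validation
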
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertCommitActionExecutor.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertCommitActionExecutor.java
new file mode 100644
index 0000000..9780262
--- /dev/null
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertCommitActionExecutor.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.commit;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieJavaEngineContext;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieInsertException;
+import org.apache.hudi.table.BulkInsertPartitioner;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+
+import java.util.List;
+import java.util.Map;
+
+public class JavaBulkInsertCommitActionExecutor<T extends HoodieRecordPayload<T>> extends BaseJavaCommitActionExecutor<T> {
+
+  private final List<HoodieRecord<T>> inputRecords;
+  private final Option<BulkInsertPartitioner<List<HoodieRecord<T>>>> bulkInsertPartitioner;
+
+  public JavaBulkInsertCommitActionExecutor(HoodieJavaEngineContext context, HoodieWriteConfig config, HoodieTable table,
+                                            String instantTime, List<HoodieRecord<T>> inputRecords,
+                                            Option<BulkInsertPartitioner<List<HoodieRecord<T>>>> bulkInsertPartitioner) {
+    this(context, config, table, instantTime, inputRecords, bulkInsertPartitioner, Option.empty());
+  }
+
+  public JavaBulkInsertCommitActionExecutor(HoodieJavaEngineContext context, HoodieWriteConfig config, HoodieTable table,
+                                            String instantTime, List<HoodieRecord<T>> inputRecords,
+                                            Option<BulkInsertPartitioner<List<HoodieRecord<T>>>> bulkInsertPartitioner,
+                                            Option<Map<String, String>> extraMetadata) {
+    super(context, config, table, instantTime, WriteOperationType.BULK_INSERT, extraMetadata);
+    this.inputRecords = inputRecords;
+    this.bulkInsertPartitioner = bulkInsertPartitioner;
+  }
+
+  @Override
+  public HoodieWriteMetadata<List<WriteStatus>> execute() {
+    try {
+      return JavaBulkInsertHelper.newInstance().bulkInsert(inputRecords, instantTime, table, config,
+          this, true, bulkInsertPartitioner);
+    } catch (HoodieInsertException ie) {
+      throw ie;
+    } catch (Throwable e) {
+      throw new HoodieInsertException("Failed to bulk insert for commit time " + instantTime, e);
+    }
+  }
+}
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertHelper.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertHelper.java
new file mode 100644
index 0000000..cce8ad1
--- /dev/null
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertHelper.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.commit;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.execution.JavaLazyInsertIterable;
+import org.apache.hudi.execution.bulkinsert.JavaBulkInsertInternalPartitionerFactory;
+import org.apache.hudi.io.CreateHandleFactory;
+import org.apache.hudi.table.BulkInsertPartitioner;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A java implementation of {@link AbstractBulkInsertHelper}.
+ *
+ * @param <T> HoodieRecordPayload type
+ */
+@SuppressWarnings("checkstyle:LineLength")
+public class JavaBulkInsertHelper<T extends HoodieRecordPayload, R> extends AbstractBulkInsertHelper<T, List<HoodieRecord<T>>,
+    List<HoodieKey>, List<WriteStatus>, R> {
+
+  private JavaBulkInsertHelper() {
+  }
+
+  private static class BulkInsertHelperHolder {
+    private static final JavaBulkInsertHelper JAVA_BULK_INSERT_HELPER = new JavaBulkInsertHelper();
+  }
+
+  public static JavaBulkInsertHelper newInstance() {
+    return BulkInsertHelperHolder.JAVA_BULK_INSERT_HELPER;
+  }
+
+  @Override
+  public HoodieWriteMetadata<List<WriteStatus>> bulkInsert(final List<HoodieRecord<T>> inputRecords,
+                                                           final String instantTime,
+                                                           final HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
+                                                           final HoodieWriteConfig config,
+                                                           final BaseCommitActionExecutor<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>, R> executor,
+                                                           final boolean performDedupe,
+                                                           final Option<BulkInsertPartitioner<T>> userDefinedBulkInsertPartitioner) {
+    HoodieWriteMetadata result = new HoodieWriteMetadata();
+
+    // transition bulk_insert state to inflight
+    table.getActiveTimeline().transitionRequestedToInflight(new HoodieInstant(HoodieInstant.State.REQUESTED,
+            table.getMetaClient().getCommitActionType(), instantTime), Option.empty(),
+        config.shouldAllowMultiWriteOnSameInstant());
+    // write new files
+    List<WriteStatus> writeStatuses = bulkInsert(inputRecords, instantTime, table, config, performDedupe, userDefinedBulkInsertPartitioner, false, config.getBulkInsertShuffleParallelism());
+    // update index
+    ((BaseJavaCommitActionExecutor) executor).updateIndexAndCommitIfNeeded(writeStatuses, result);
+    return result;
+  }
+
+  @Override
+  public List<WriteStatus> bulkInsert(List<HoodieRecord<T>> inputRecords,
+                                      String instantTime,
+                                      HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
+                                      HoodieWriteConfig config,
+                                      boolean performDedupe,
+                                      Option<BulkInsertPartitioner<T>> userDefinedBulkInsertPartitioner,
+                                      boolean useWriterSchema,
+                                      int parallelism) {
+
+    // De-dupe/merge if needed
+    List<HoodieRecord<T>> dedupedRecords = inputRecords;
+
+    if (performDedupe) {
+      dedupedRecords = (List<HoodieRecord<T>>) JavaWriteHelper.newInstance().combineOnCondition(config.shouldCombineBeforeInsert(), inputRecords,
+          parallelism, table);
+    }
+
+    final List<HoodieRecord<T>> repartitionedRecords;
+    BulkInsertPartitioner partitioner = userDefinedBulkInsertPartitioner.isPresent()
+        ? userDefinedBulkInsertPartitioner.get()
+        : JavaBulkInsertInternalPartitionerFactory.get(config.getBulkInsertSortMode());
+    repartitionedRecords = (List<HoodieRecord<T>>) partitioner.repartitionRecords(dedupedRecords, parallelism);
+
+    String idPfx = FSUtils.createNewFileIdPfx();
+
+    List<WriteStatus> writeStatuses = new ArrayList<>();
+
+    new JavaLazyInsertIterable<>(repartitionedRecords.iterator(), true, config, instantTime, table, idPfx,
+        table.getTaskContextSupplier(), new CreateHandleFactory<>()).forEachRemaining(writeStatuses::addAll);
+
+    return writeStatuses;
+  }
+}
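
JavaBulkInsertHelper is stateless and kept as a singleton through the
initialization-on-demand holder idiom: the nested holder class is only loaded on the
first newInstance() call, and JVM class loading makes that construction thread-safe
without explicit synchronization. The idiom in isolation:

    // Initialization-on-demand holder idiom (generic sketch).
    public final class Helper {
      private Helper() { }                        // block outside instantiation
      private static class Holder {
        private static final Helper INSTANCE = new Helper();  // built on first use
      }
      public static Helper getInstance() {
        return Holder.INSTANCE;                   // lazy and thread-safe
      }
    }
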
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertPreppedCommitActionExecutor.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertPreppedCommitActionExecutor.java
new file mode 100644
index 0000000..37b56b6
--- /dev/null
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertPreppedCommitActionExecutor.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.commit;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieJavaEngineContext;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieInsertException;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.BulkInsertPartitioner;
+
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+
+import java.util.List;
+
+public class JavaBulkInsertPreppedCommitActionExecutor<T extends HoodieRecordPayload<T>>
+    extends BaseJavaCommitActionExecutor<T> {
+
+  private final List<HoodieRecord<T>> preppedInputRecord;
+  private final Option<BulkInsertPartitioner<T>> userDefinedBulkInsertPartitioner;
+
+  public JavaBulkInsertPreppedCommitActionExecutor(HoodieJavaEngineContext context,
+                                                   HoodieWriteConfig config, HoodieTable table,
+                                                   String instantTime, List<HoodieRecord<T>> preppedInputRecord,
+                                                   Option<BulkInsertPartitioner<T>> userDefinedBulkInsertPartitioner) {
+    super(context, config, table, instantTime, WriteOperationType.BULK_INSERT);
+    this.preppedInputRecord = preppedInputRecord;
+    this.userDefinedBulkInsertPartitioner = userDefinedBulkInsertPartitioner;
+  }
+
+  @Override
+  public HoodieWriteMetadata<List<WriteStatus>> execute() {
+    try {
+      return JavaBulkInsertHelper.newInstance().bulkInsert(preppedInputRecord, instantTime, table, config,
+          this, false, userDefinedBulkInsertPartitioner);
+    } catch (Throwable e) {
+      if (e instanceof HoodieInsertException) {
+        throw e;
+      }
+      throw new HoodieInsertException("Failed to bulk insert for commit time " + instantTime, e);
+    }
+  }
+}
\ No newline at end of file
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaInsertOverwriteCommitActionExecutor.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaInsertOverwriteCommitActionExecutor.java
new file mode 100644
index 0000000..519cb76
--- /dev/null
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaInsertOverwriteCommitActionExecutor.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.commit;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class JavaInsertOverwriteCommitActionExecutor<T extends HoodieRecordPayload<T>>
+    extends BaseJavaCommitActionExecutor<T> {
+
+  private final List<HoodieRecord<T>> inputRecords;
+
+  public JavaInsertOverwriteCommitActionExecutor(HoodieEngineContext context,
+                                                 HoodieWriteConfig config, HoodieTable table,
+                                                 String instantTime, List<HoodieRecord<T>> inputRecords) {
+    this(context, config, table, instantTime, inputRecords, WriteOperationType.INSERT_OVERWRITE);
+  }
+
+  public JavaInsertOverwriteCommitActionExecutor(HoodieEngineContext context,
+                                                  HoodieWriteConfig config, HoodieTable table,
+                                                  String instantTime, List<HoodieRecord<T>> inputRecords,
+                                                  WriteOperationType writeOperationType) {
+    super(context, config, table, instantTime, writeOperationType);
+    this.inputRecords = inputRecords;
+  }
+
+  @Override
+  public HoodieWriteMetadata<List<WriteStatus>> execute() {
+    return JavaWriteHelper.newInstance().write(instantTime, inputRecords, context, table,
+        config.shouldCombineBeforeInsert(), config.getInsertShuffleParallelism(), this, false);
+  }
+
+  @Override
+  protected String getCommitActionType() {
+    return HoodieTimeline.REPLACE_COMMIT_ACTION;
+  }
+
+  @Override
+  protected Map<String, List<String>> getPartitionToReplacedFileIds(List<WriteStatus> writeStatuses) {
+    return context.mapToPair(
+        writeStatuses.stream().map(status -> status.getStat().getPartitionPath()).distinct().collect(Collectors.toList()),
+        partitionPath ->
+            Pair.of(partitionPath, getAllExistingFileIds(partitionPath)), 1
+    );
+  }
+
+  private List<String> getAllExistingFileIds(String partitionPath) {
+    // Because the new commit is not yet complete, it is safe to mark all existing file ids as replaced.
+    return table.getSliceView().getLatestFileSlices(partitionPath).map(fg -> fg.getFileId()).distinct().collect(Collectors.toList());
+  }
+}
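
Insert overwrite is recorded as a replacecommit: getCommitActionType() switches the
action to REPLACE_COMMIT_ACTION, and getPartitionToReplacedFileIds() marks every latest
file slice in each written partition as replaced, which is safe because the new commit
has not completed yet. A plain-streams equivalent of that mapping (a sketch that
bypasses the engine context, which runs here with parallelism 1 anyway):

    // Map each touched partition to all of its currently-live file ids.
    Map<String, List<String>> partitionToReplaced = writeStatuses.stream()
        .map(status -> status.getStat().getPartitionPath())
        .distinct()
        .collect(Collectors.toMap(
            partitionPath -> partitionPath,
            this::getAllExistingFileIds));
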
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaInsertOverwriteTableCommitActionExecutor.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaInsertOverwriteTableCommitActionExecutor.java
new file mode 100644
index 0000000..ca6885c
--- /dev/null
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaInsertOverwriteTableCommitActionExecutor.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.commit;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class JavaInsertOverwriteTableCommitActionExecutor<T extends HoodieRecordPayload<T>>
+    extends JavaInsertOverwriteCommitActionExecutor<T> {
+
+  public JavaInsertOverwriteTableCommitActionExecutor(HoodieEngineContext context,
+                                                      HoodieWriteConfig config, HoodieTable table,
+                                                      String instantTime, List<HoodieRecord<T>> inputRecords) {
+    super(context, config, table, instantTime, inputRecords, WriteOperationType.INSERT_OVERWRITE_TABLE);
+  }
+
+  protected List<String> getAllExistingFileIds(String partitionPath) {
+    return table.getSliceView().getLatestFileSlices(partitionPath)
+        .map(fg -> fg.getFileId()).distinct().collect(Collectors.toList());
+  }
+
+  @Override
+  protected Map<String, List<String>> getPartitionToReplacedFileIds(List<WriteStatus> writeStatuses) {
+    Map<String, List<String>> partitionToExistingFileIds = new HashMap<>();
+    List<String> partitionPaths = FSUtils.getAllPartitionPaths(context,
+        table.getMetaClient().getBasePath(), config.useFileListingMetadata(),
+        config.getFileListingMetadataVerify(), config.shouldAssumeDatePartitioning());
+
+    if (partitionPaths != null && partitionPaths.size() > 0) {
+      partitionToExistingFileIds = context.mapToPair(partitionPaths,
+          partitionPath -> Pair.of(partitionPath, getAllExistingFileIds(partitionPath)), 1);
+    }
+    return partitionToExistingFileIds;
+  }
+}
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/restore/JavaCopyOnWriteRestoreActionExecutor.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/restore/JavaCopyOnWriteRestoreActionExecutor.java
new file mode 100644
index 0000000..75c1e0e
--- /dev/null
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/restore/JavaCopyOnWriteRestoreActionExecutor.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.restore;
+
+import org.apache.hudi.avro.model.HoodieRollbackMetadata;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieJavaEngineContext;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieRollbackException;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.rollback.JavaCopyOnWriteRollbackActionExecutor;
+
+import java.util.List;
+
+public class JavaCopyOnWriteRestoreActionExecutor<T extends HoodieRecordPayload> extends
+    BaseRestoreActionExecutor<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {
+
+  public JavaCopyOnWriteRestoreActionExecutor(HoodieJavaEngineContext context,
+                                              HoodieWriteConfig config,
+                                              HoodieTable table,
+                                              String instantTime,
+                                              String restoreInstantTime) {
+    super(context, config, table, instantTime, restoreInstantTime);
+  }
+
+  @Override
+  protected HoodieRollbackMetadata rollbackInstant(HoodieInstant instantToRollback) {
+    table.getMetaClient().reloadActiveTimeline();
+    JavaCopyOnWriteRollbackActionExecutor rollbackActionExecutor = new JavaCopyOnWriteRollbackActionExecutor(
+        context,
+        config,
+        table,
+        HoodieActiveTimeline.createNewInstantTime(),
+        instantToRollback,
+        true,
+        true,
+        false);
+    if (!instantToRollback.getAction().equals(HoodieTimeline.COMMIT_ACTION)
+        && !instantToRollback.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)) {
+      throw new HoodieRollbackException("Unsupported action in rollback instant:" + instantToRollback);
+    }
+    return rollbackActionExecutor.execute();
+  }
+}
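
Restore walks the timeline backwards and rolls back every instant after the restore
point; the executor above rejects anything other than commit or replacecommit actions,
since that is all a copy-on-write table produces. A hedged sketch of driving this from
the write client, assuming the common savepoint/restore client API is available on the
java client:

    // Savepoint before a risky operation, restore if it goes wrong (sketch).
    client.savepoint("etl-user", "before backfill");
    // ... later, roll the table back to that savepoint:
    client.restoreToSavepoint(savepointInstantTime);
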
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/rollback/JavaCopyOnWriteRollbackActionExecutor.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/rollback/JavaCopyOnWriteRollbackActionExecutor.java
new file mode 100644
index 0000000..15e3932
--- /dev/null
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/rollback/JavaCopyOnWriteRollbackActionExecutor.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.rollback;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.HoodieRollbackStat;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+
+import java.util.List;
+
+@SuppressWarnings("checkstyle:LineLength")
+public class JavaCopyOnWriteRollbackActionExecutor<T extends HoodieRecordPayload> extends
+    BaseCopyOnWriteRollbackActionExecutor<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {
+  public JavaCopyOnWriteRollbackActionExecutor(HoodieEngineContext context,
+                                               HoodieWriteConfig config,
+                                               HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
+                                               String instantTime,
+                                               HoodieInstant commitInstant,
+                                               boolean deleteInstants) {
+    super(context, config, table, instantTime, commitInstant, deleteInstants);
+  }
+
+  public JavaCopyOnWriteRollbackActionExecutor(HoodieEngineContext context,
+                                               HoodieWriteConfig config,
+                                               HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
+                                               String instantTime,
+                                               HoodieInstant commitInstant,
+                                               boolean deleteInstants,
+                                               boolean skipTimelinePublish,
+                                               boolean useMarkerBasedStrategy) {
+    super(context, config, table, instantTime, commitInstant, deleteInstants, skipTimelinePublish, useMarkerBasedStrategy);
+  }
+
+  @Override
+  protected BaseRollbackActionExecutor.RollbackStrategy getRollbackStrategy() {
+    if (useMarkerBasedStrategy) {
+      return new JavaMarkerBasedRollbackStrategy(table, context, config, instantTime);
+    } else {
+      return this::executeRollbackUsingFileListing;
+    }
+  }
+
+  @Override
+  protected List<HoodieRollbackStat> executeRollbackUsingFileListing(HoodieInstant instantToRollback) {
+    List<ListingBasedRollbackRequest> rollbackRequests = RollbackUtils.generateRollbackRequestsByListingCOW(
+        context, table.getMetaClient().getBasePath(), config);
+    return new JavaListingBasedRollbackHelper(table.getMetaClient(), config)
+        .performRollback(context, instantToRollback, rollbackRequests);
+  }
+}
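
getRollbackStrategy() chooses between marker-based and listing-based rollback. Because
RollbackStrategy is effectively a single-method interface, the listing path can be
supplied as the method reference this::executeRollbackUsingFileListing. The pattern in
isolation (a generic sketch, with MarkerStrategy as a hypothetical stand-in):

    // A single-method strategy satisfied either by a class or a method reference.
    interface Strategy {
      List<HoodieRollbackStat> execute(HoodieInstant instantToRollback);
    }
    Strategy strategy = useMarkers
        ? new MarkerStrategy()                     // hypothetical concrete class
        : this::executeRollbackUsingFileListing;   // method reference as strategy
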
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/rollback/JavaListingBasedRollbackHelper.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/rollback/JavaListingBasedRollbackHelper.java
new file mode 100644
index 0000000..5331ca5
--- /dev/null
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/rollback/JavaListingBasedRollbackHelper.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.rollback;
+
+import org.apache.hudi.common.HoodieRollbackStat;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.log.HoodieLogFormat;
+import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
+import org.apache.hudi.common.table.log.block.HoodieLogBlock;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.collection.ImmutablePair;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieRollbackException;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/**
+ * Performs Rollback of Hoodie Tables.
+ */
+public class JavaListingBasedRollbackHelper implements Serializable {
+
+  private static final Logger LOG = LogManager.getLogger(JavaListingBasedRollbackHelper.class);
+
+  private final HoodieTableMetaClient metaClient;
+  private final HoodieWriteConfig config;
+
+  public JavaListingBasedRollbackHelper(HoodieTableMetaClient metaClient, HoodieWriteConfig config) {
+    this.metaClient = metaClient;
+    this.config = config;
+  }
+
+  /**
+   * Performs all rollback actions that we have collected in parallel.
+   */
+  public List<HoodieRollbackStat> performRollback(HoodieEngineContext context, HoodieInstant instantToRollback, List<ListingBasedRollbackRequest> rollbackRequests) {
+    Map<String, HoodieRollbackStat> partitionPathRollbackStatsPairs = maybeDeleteAndCollectStats(context, instantToRollback, rollbackRequests, true);
+
+    Map<String, List<Pair<String, HoodieRollbackStat>>> collect = partitionPathRollbackStatsPairs.entrySet()
+        .stream()
+        .map(x -> Pair.of(x.getKey(), x.getValue())).collect(Collectors.groupingBy(Pair::getLeft));
+    return collect.values().stream()
+        .map(pairs -> pairs.stream().map(Pair::getRight).reduce(RollbackUtils::mergeRollbackStat).orElse(null))
+        .filter(Objects::nonNull)
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Collects info on all files that need to be rolled back.
+   */
+  public List<HoodieRollbackStat> collectRollbackStats(HoodieEngineContext context, HoodieInstant instantToRollback, List<ListingBasedRollbackRequest> rollbackRequests) {
+    Map<String, HoodieRollbackStat> partitionPathRollbackStatsPairs = maybeDeleteAndCollectStats(context, instantToRollback, rollbackRequests, false);
+    return new ArrayList<>(partitionPathRollbackStatsPairs.values());
+  }
+
+  /**
+   * Deletes the files of interest and collects stats, or collects stats only, depending on {@code doDelete}.
+   *
+   * @param context           instance of {@link HoodieEngineContext} to use.
+   * @param instantToRollback {@link HoodieInstant} of interest for which deletion or collect stats is requested.
+   * @param rollbackRequests  List of {@link ListingBasedRollbackRequest} to be operated on.
+   * @param doDelete          {@code true} if deletion has to be done. {@code false} if only stats are to be collected w/o performing any deletes.
+   * @return stats collected with or w/o actual deletions.
+   */
+  Map<String, HoodieRollbackStat> maybeDeleteAndCollectStats(HoodieEngineContext context,
+                                                             HoodieInstant instantToRollback,
+                                                             List<ListingBasedRollbackRequest> rollbackRequests,
+                                                             boolean doDelete) {
+    return context.mapToPair(rollbackRequests, rollbackRequest -> {
+      switch (rollbackRequest.getType()) {
+        case DELETE_DATA_FILES_ONLY: {
+          final Map<FileStatus, Boolean> filesToDeletedStatus = deleteBaseFiles(metaClient, config, instantToRollback.getTimestamp(),
+              rollbackRequest.getPartitionPath(), doDelete);
+          return new ImmutablePair<>(rollbackRequest.getPartitionPath(),
+              HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath())
+                  .withDeletedFileResults(filesToDeletedStatus).build());
+        }
+        case DELETE_DATA_AND_LOG_FILES: {
+          final Map<FileStatus, Boolean> filesToDeletedStatus = deleteBaseAndLogFiles(metaClient, config, instantToRollback.getTimestamp(), rollbackRequest.getPartitionPath(), doDelete);
+          return new ImmutablePair<>(rollbackRequest.getPartitionPath(),
+              HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath())
+                  .withDeletedFileResults(filesToDeletedStatus).build());
+        }
+        case APPEND_ROLLBACK_BLOCK: {
+          HoodieLogFormat.Writer writer = null;
+          try {
+            writer = HoodieLogFormat.newWriterBuilder()
+                .onParentPath(FSUtils.getPartitionPath(metaClient.getBasePath(), rollbackRequest.getPartitionPath()))
+                .withFileId(rollbackRequest.getFileId().get())
+                .overBaseCommit(rollbackRequest.getLatestBaseInstant().get()).withFs(metaClient.getFs())
+                .withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();
+
+            // generate metadata
+            if (doDelete) {
+              Map<HoodieLogBlock.HeaderMetadataType, String> header = generateHeader(instantToRollback.getTimestamp());
+              // if update belongs to an existing log file
+              writer.appendBlock(new HoodieCommandBlock(header));
+            }
+          } catch (IOException | InterruptedException io) {
+            throw new HoodieRollbackException("Failed to rollback for instant " + instantToRollback, io);
+          } finally {
+            try {
+              if (writer != null) {
+                writer.close();
+              }
+            } catch (IOException io) {
+              throw new HoodieIOException("Error appending rollback block.", io);
+            }
+          }
+
+          // This step is intentionally done after writer is closed. Guarantees that
+          // getFileStatus would reflect correct stats and FileNotFoundException is not thrown in
+          // cloud-storage : HUDI-168
+          Map<FileStatus, Long> filesToNumBlocksRollback = Collections.singletonMap(
+              metaClient.getFs().getFileStatus(Objects.requireNonNull(writer).getLogFile().getPath()), 1L
+          );
+          return new ImmutablePair<>(rollbackRequest.getPartitionPath(),
+              HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath())
+                  .withRollbackBlockAppendResults(filesToNumBlocksRollback).build());
+        }
+        default:
+          throw new IllegalStateException("Unknown Rollback action " + rollbackRequest);
+      }
+    }, 0);
+  }
+
+  /**
+   * Common method used for cleaning out base files under a partition path during rollback of a set of commits.
+   */
+  private Map<FileStatus, Boolean> deleteBaseAndLogFiles(HoodieTableMetaClient metaClient, HoodieWriteConfig config,
+                                                         String commit, String partitionPath, boolean doDelete) throws IOException {
+    LOG.info("Cleaning path " + partitionPath);
+    String basefileExtension = metaClient.getTableConfig().getBaseFileFormat().getFileExtension();
+    SerializablePathFilter filter = (path) -> {
+      if (path.toString().endsWith(basefileExtension)) {
+        String fileCommitTime = FSUtils.getCommitTime(path.getName());
+        return commit.equals(fileCommitTime);
+      } else if (FSUtils.isLogFile(path)) {
+        // Since the baseCommitTime is the only commit for new log files, it's okay here
+        String fileCommitTime = FSUtils.getBaseCommitTimeFromLogPath(path);
+        return commit.equals(fileCommitTime);
+      }
+      return false;
+    };
+
+    final Map<FileStatus, Boolean> results = new HashMap<>();
+    FileSystem fs = metaClient.getFs();
+    FileStatus[] toBeDeleted = fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(), partitionPath), filter);
+    for (FileStatus file : toBeDeleted) {
+      if (doDelete) {
+        boolean success = fs.delete(file.getPath(), false);
+        results.put(file, success);
+        LOG.info("Delete file " + file.getPath() + "\t" + success);
+      } else {
+        results.put(file, true);
+      }
+    }
+    return results;
+  }
+
+  /**
+   * Common method used for cleaning out base files under a partition path during rollback of a set of commits.
+   */
+  private Map<FileStatus, Boolean> deleteBaseFiles(HoodieTableMetaClient metaClient, HoodieWriteConfig config,
+                                                   String commit, String partitionPath, boolean doDelete) throws IOException {
+    final Map<FileStatus, Boolean> results = new HashMap<>();
+    LOG.info("Cleaning path " + partitionPath);
+    FileSystem fs = metaClient.getFs();
+    String basefileExtension = metaClient.getTableConfig().getBaseFileFormat().getFileExtension();
+    PathFilter filter = (path) -> {
+      if (path.toString().endsWith(basefileExtension)) {
+        String fileCommitTime = FSUtils.getCommitTime(path.getName());
+        return commit.equals(fileCommitTime);
+      }
+      return false;
+    };
+    FileStatus[] toBeDeleted = fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(), partitionPath), filter);
+    for (FileStatus file : toBeDeleted) {
+      if (doDelete) {
+        boolean success = fs.delete(file.getPath(), false);
+        results.put(file, success);
+        LOG.info("Delete file " + file.getPath() + "\t" + success);
+      } else {
+        results.put(file, true);
+      }
+    }
+    return results;
+  }
+
+  private Map<HoodieLogBlock.HeaderMetadataType, String> generateHeader(String commit) {
+    // generate metadata
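+    // INSTANT_TIME: the latest instant on the active timeline; TARGET_INSTANT_TIME: the commit
+    // being rolled back; COMMAND_BLOCK_TYPE: a rollback-previous-block command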
+    Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>(3);
+    header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, metaClient.getActiveTimeline().lastInstant().get().getTimestamp());
+    header.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, commit);
+    header.put(HoodieLogBlock.HeaderMetadataType.COMMAND_BLOCK_TYPE,
+        String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK.ordinal()));
+    return header;
+  }
+
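+  /**
+   * A {@link PathFilter} that is also {@link Serializable}, so engine context operations that
+   * serialize their closures can ship it safely.
+   */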
+  public interface SerializablePathFilter extends PathFilter, Serializable {
+
+  }
+}
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/rollback/JavaMarkerBasedRollbackStrategy.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/rollback/JavaMarkerBasedRollbackStrategy.java
new file mode 100644
index 0000000..f1e2bf3
--- /dev/null
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/rollback/JavaMarkerBasedRollbackStrategy.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.rollback;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.HoodieRollbackStat;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.IOType;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieRollbackException;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.MarkerFiles;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+@SuppressWarnings("checkstyle:LineLength")
+public class JavaMarkerBasedRollbackStrategy<T extends HoodieRecordPayload> extends AbstractMarkerBasedRollbackStrategy<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {
+  public JavaMarkerBasedRollbackStrategy(HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
+                                         HoodieEngineContext context,
+                                         HoodieWriteConfig config,
+                                         String instantTime) {
+    super(table, context, config, instantTime);
+  }
+
+  @Override
+  public List<HoodieRollbackStat> execute(HoodieInstant instantToRollback) {
+    try {
+      MarkerFiles markerFiles = new MarkerFiles(table, instantToRollback.getTimestamp());
+      List<HoodieRollbackStat> rollbackStats = context.map(markerFiles.allMarkerFilePaths(), markerFilePath -> {
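+        // the marker file extension encodes the IOType (CREATE/MERGE/APPEND) of the data file it tracks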
+        String typeStr = markerFilePath.substring(markerFilePath.lastIndexOf(".") + 1);
+        IOType type = IOType.valueOf(typeStr);
+        switch (type) {
+          case MERGE:
+            return undoMerge(MarkerFiles.stripMarkerSuffix(markerFilePath));
+          case APPEND:
+            return undoAppend(MarkerFiles.stripMarkerSuffix(markerFilePath), instantToRollback);
+          case CREATE:
+            return undoCreate(MarkerFiles.stripMarkerSuffix(markerFilePath));
+          default:
+            throw new HoodieRollbackException("Unknown marker type, during rollback of " + instantToRollback);
+        }
+      }, 0);
+
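+      // group the per-file stats by partition path and merge them into a single stat per partition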
+      return rollbackStats.stream().map(rollbackStat -> Pair.of(rollbackStat.getPartitionPath(), rollbackStat))
+          .collect(Collectors.groupingBy(Pair::getKey))
+          .values()
+          .stream()
+          .map(x -> x.stream().map(Pair::getValue).reduce(RollbackUtils::mergeRollbackStat).get())
+          .collect(Collectors.toList());
+    } catch (Exception e) {
+      throw new HoodieRollbackException("Error rolling back using marker files written for " + instantToRollback, e);
+    }
+  }
+}
diff --git a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/table/action/commit/TestJavaCopyOnWriteActionExecutor.java b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/table/action/commit/TestJavaCopyOnWriteActionExecutor.java
new file mode 100644
index 0000000..17b1742
--- /dev/null
+++ b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/table/action/commit/TestJavaCopyOnWriteActionExecutor.java
@@ -0,0 +1,479 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.commit;
+
+import org.apache.hudi.client.HoodieJavaWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.engine.EngineType;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.apache.hudi.common.testutils.RawTripTestPayload;
+import org.apache.hudi.common.testutils.Transformations;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ParquetUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieStorageConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.hadoop.HoodieParquetInputFormat;
+import org.apache.hudi.hadoop.utils.HoodieHiveUtils;
+import org.apache.hudi.io.HoodieCreateHandle;
+import org.apache.hudi.table.HoodieJavaCopyOnWriteTable;
+import org.apache.hudi.table.HoodieJavaTable;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hudi.testutils.HoodieJavaClientTestBase;
+import org.apache.hudi.testutils.MetadataMergeWriteStatus;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.parquet.avro.AvroReadSupport;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
+import static org.apache.hudi.common.testutils.HoodieTestTable.makeNewCommitTime;
+import static org.apache.hudi.common.testutils.SchemaTestUtil.getSchemaFromResource;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestJavaCopyOnWriteActionExecutor extends HoodieJavaClientTestBase {
+
+  private static final Logger LOG = LogManager.getLogger(TestJavaCopyOnWriteActionExecutor.class);
+  private static final Schema SCHEMA = getSchemaFromResource(TestJavaCopyOnWriteActionExecutor.class, "/exampleSchema.avsc");
+
+  @Test
+  public void testMakeNewPath() {
+    String fileName = UUID.randomUUID().toString();
+    String partitionPath = "2016/05/04";
+
+    String instantTime = makeNewCommitTime();
+    HoodieWriteConfig config = makeHoodieClientConfig();
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    HoodieTable table = HoodieJavaTable.create(config, context, metaClient);
+
+    HoodieRecord record = mock(HoodieRecord.class);
+    when(record.getPartitionPath()).thenReturn(partitionPath);
+    String writeToken = FSUtils.makeWriteToken(context.getTaskContextSupplier().getPartitionIdSupplier().get(),
+        context.getTaskContextSupplier().getStageIdSupplier().get(),
+        context.getTaskContextSupplier().getAttemptIdSupplier().get());
+    HoodieCreateHandle io = new HoodieCreateHandle(config, instantTime, table, partitionPath, fileName,
+        context.getTaskContextSupplier());
+    Pair<Path, String> newPathWithWriteToken = Pair.of(io.makeNewPath(record.getPartitionPath()), writeToken);
+
+    assertEquals(newPathWithWriteToken.getKey().toString(), Paths.get(this.basePath, partitionPath,
+        FSUtils.makeDataFileName(instantTime, newPathWithWriteToken.getRight(), fileName)).toString());
+  }
+
+  private HoodieWriteConfig makeHoodieClientConfig() {
+    return makeHoodieClientConfigBuilder().build();
+  }
+
+  private HoodieWriteConfig.Builder makeHoodieClientConfigBuilder() {
+    // Prepare the AvroParquetIO
+    return HoodieWriteConfig.newBuilder()
+        .withEngineType(EngineType.JAVA)
+        .withPath(basePath)
+        .withSchema(SCHEMA.toString());
+  }
+
+  @Test
+  public void testUpdateRecords() throws Exception {
+    // Prepare the AvroParquetIO
+    HoodieWriteConfig config = makeHoodieClientConfig();
+    String firstCommitTime = makeNewCommitTime();
+    HoodieJavaWriteClient writeClient = getHoodieWriteClient(config);
+    writeClient.startCommitWithTime(firstCommitTime);
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+
+    String partitionPath = "2016/01/31";
+    HoodieJavaCopyOnWriteTable table = (HoodieJavaCopyOnWriteTable) HoodieJavaTable.create(config, context, metaClient);
+
+    // Get some records belonging to the same partition (2016/01/31)
+    String recordStr1 = "{\"_row_key\":\"8eb5b87a-1feh-4edd-87b4-6ec96dc405a0\","
+        + "\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}";
+    String recordStr2 = "{\"_row_key\":\"8eb5b87b-1feu-4edd-87b4-6ec96dc405a0\","
+        + "\"time\":\"2016-01-31T03:20:41.415Z\",\"number\":100}";
+    String recordStr3 = "{\"_row_key\":\"8eb5b87c-1fej-4edd-87b4-6ec96dc405a0\","
+        + "\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":15}";
+    String recordStr4 = "{\"_row_key\":\"8eb5b87d-1fej-4edd-87b4-6ec96dc405a0\","
+        + "\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":51}";
+
+    List<HoodieRecord> records = new ArrayList<>();
+    RawTripTestPayload rowChange1 = new RawTripTestPayload(recordStr1);
+    records.add(new HoodieRecord(new HoodieKey(rowChange1.getRowKey(), rowChange1.getPartitionPath()), rowChange1));
+    RawTripTestPayload rowChange2 = new RawTripTestPayload(recordStr2);
+    records.add(new HoodieRecord(new HoodieKey(rowChange2.getRowKey(), rowChange2.getPartitionPath()), rowChange2));
+    RawTripTestPayload rowChange3 = new RawTripTestPayload(recordStr3);
+    records.add(new HoodieRecord(new HoodieKey(rowChange3.getRowKey(), rowChange3.getPartitionPath()), rowChange3));
+
+    // Insert new records
+    writeClient.insert(records, firstCommitTime);
+
+    FileStatus[] allFiles = getIncrementalFiles(partitionPath, "0", -1);
+    assertEquals(1, allFiles.length);
+
+    // Read out the bloom filter and make sure filter can answer record exist or not
+    Path parquetFilePath = allFiles[0].getPath();
+    BloomFilter filter = ParquetUtils.readBloomFilterFromParquetMetadata(hadoopConf, parquetFilePath);
+    for (HoodieRecord record : records) {
+      assertTrue(filter.mightContain(record.getRecordKey()));
+    }
+
+    // Read the parquet file, check the record content
+    List<GenericRecord> fileRecords = ParquetUtils.readAvroRecords(hadoopConf, parquetFilePath);
+    GenericRecord newRecord;
+    int index = 0;
+    for (GenericRecord record : fileRecords) {
+      assertEquals(records.get(index).getRecordKey(), record.get("_row_key").toString());
+      index++;
+    }
+
+    // We update the 1st record & add a new record
+    String updateRecordStr1 = "{\"_row_key\":\"8eb5b87a-1feh-4edd-87b4-6ec96dc405a0\","
+        + "\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":15}";
+    RawTripTestPayload updateRowChanges1 = new RawTripTestPayload(updateRecordStr1);
+    HoodieRecord updatedRecord1 = new HoodieRecord(
+        new HoodieKey(updateRowChanges1.getRowKey(), updateRowChanges1.getPartitionPath()), updateRowChanges1);
+
+    RawTripTestPayload rowChange4 = new RawTripTestPayload(recordStr4);
+    HoodieRecord insertedRecord1 =
+        new HoodieRecord(new HoodieKey(rowChange4.getRowKey(), rowChange4.getPartitionPath()), rowChange4);
+
+    List<HoodieRecord> updatedRecords = Arrays.asList(updatedRecord1, insertedRecord1);
+
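+    // sleep so the next commit gets a strictly newer timestamp (commit times have second granularity)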
+    Thread.sleep(1000);
+    String newCommitTime = makeNewCommitTime();
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    writeClient.startCommitWithTime(newCommitTime);
+    List<WriteStatus> statuses = writeClient.upsert(updatedRecords, newCommitTime);
+
+    allFiles = getIncrementalFiles(partitionPath, firstCommitTime, -1);
+    assertEquals(1, allFiles.length);
+    // verify new incremental file group is same as the previous one
+    assertEquals(FSUtils.getFileId(parquetFilePath.getName()), FSUtils.getFileId(allFiles[0].getPath().getName()));
+
+    // Check whether the record has been updated
+    Path updatedParquetFilePath = allFiles[0].getPath();
+    BloomFilter updatedFilter =
+        ParquetUtils.readBloomFilterFromParquetMetadata(hadoopConf, updatedParquetFilePath);
+    for (HoodieRecord record : records) {
+      // No change to the _row_key
+      assertTrue(updatedFilter.mightContain(record.getRecordKey()));
+    }
+
+    assertTrue(updatedFilter.mightContain(insertedRecord1.getRecordKey()));
+    records.add(insertedRecord1); // include it so the content checks below also cover the new record
+
+    ParquetReader updatedReader = ParquetReader.builder(new AvroReadSupport<>(), updatedParquetFilePath).build();
+    index = 0;
+    while ((newRecord = (GenericRecord) updatedReader.read()) != null) {
+      assertEquals(newRecord.get("_row_key").toString(), records.get(index).getRecordKey());
+      if (index == 0) {
+        assertEquals("15", newRecord.get("number").toString());
+      }
+      index++;
+    }
+    updatedReader.close();
+    // Also check the numRecordsWritten
+    WriteStatus writeStatus = statuses.get(0);
+    assertEquals(1, statuses.size(), "Should be only one file generated");
+    assertEquals(4, writeStatus.getStat().getNumWrites()); // 3 rewritten records + 1 new record
+  }
+
+  private FileStatus[] getIncrementalFiles(String partitionPath, String startCommitTime, int numCommitsToPull)
+      throws Exception {
+    // initialize parquet input format
+    HoodieParquetInputFormat hoodieInputFormat = new HoodieParquetInputFormat();
+    JobConf jobConf = new JobConf(hadoopConf);
+    hoodieInputFormat.setConf(jobConf);
+    HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.COPY_ON_WRITE);
+    setupIncremental(jobConf, startCommitTime, numCommitsToPull);
+    FileInputFormat.setInputPaths(jobConf, Paths.get(basePath, partitionPath).toString());
+    return hoodieInputFormat.listStatus(jobConf);
+  }
+
+  private void setupIncremental(JobConf jobConf, String startCommit, int numberOfCommitsToPull) {
+    String modePropertyName =
+        String.format(HoodieHiveUtils.HOODIE_CONSUME_MODE_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME);
+    jobConf.set(modePropertyName, HoodieHiveUtils.INCREMENTAL_SCAN_MODE);
+
+    String startCommitTimestampName =
+        String.format(HoodieHiveUtils.HOODIE_START_COMMIT_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME);
+    jobConf.set(startCommitTimestampName, startCommit);
+
+    String maxCommitPulls =
+        String.format(HoodieHiveUtils.HOODIE_MAX_COMMIT_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME);
+    jobConf.setInt(maxCommitPulls, numberOfCommitsToPull);
+  }
+
+  private List<HoodieRecord> newHoodieRecords(int n, String time) throws Exception {
+    List<HoodieRecord> records = new ArrayList<>();
+    for (int i = 0; i < n; i++) {
+      String recordStr =
+          String.format("{\"_row_key\":\"%s\",\"time\":\"%s\",\"number\":%d}", UUID.randomUUID().toString(), time, i);
+      RawTripTestPayload rowChange = new RawTripTestPayload(recordStr);
+      records.add(new HoodieRecord(new HoodieKey(rowChange.getRowKey(), rowChange.getPartitionPath()), rowChange));
+    }
+    return records;
+  }
+
+  // Check if record level metadata is aggregated properly at the end of write.
+  @Test
+  public void testMetadataAggregateFromWriteStatus() throws Exception {
+    // Prepare the AvroParquetIO
+    HoodieWriteConfig config =
+        makeHoodieClientConfigBuilder().withWriteStatusClass(MetadataMergeWriteStatus.class).build();
+    String firstCommitTime = makeNewCommitTime();
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+
+    HoodieJavaCopyOnWriteTable table = (HoodieJavaCopyOnWriteTable) HoodieJavaTable.create(config, context, metaClient);
+
+    // Get some records belonging to the same partition (2016/01/31)
+    String recordStr1 = "{\"_row_key\":\"8eb5b87a-1feh-4edd-87b4-6ec96dc405a0\","
+        + "\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}";
+    String recordStr2 = "{\"_row_key\":\"8eb5b87b-1feu-4edd-87b4-6ec96dc405a0\","
+        + "\"time\":\"2016-01-31T03:20:41.415Z\",\"number\":100}";
+    String recordStr3 = "{\"_row_key\":\"8eb5b87c-1fej-4edd-87b4-6ec96dc405a0\","
+        + "\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":15}";
+
+    List<HoodieRecord> records = new ArrayList<>();
+    RawTripTestPayload rowChange1 = new RawTripTestPayload(recordStr1);
+    records.add(new HoodieRecord(new HoodieKey(rowChange1.getRowKey(), rowChange1.getPartitionPath()), rowChange1));
+    RawTripTestPayload rowChange2 = new RawTripTestPayload(recordStr2);
+    records.add(new HoodieRecord(new HoodieKey(rowChange2.getRowKey(), rowChange2.getPartitionPath()), rowChange2));
+    RawTripTestPayload rowChange3 = new RawTripTestPayload(recordStr3);
+    records.add(new HoodieRecord(new HoodieKey(rowChange3.getRowKey(), rowChange3.getPartitionPath()), rowChange3));
+
+    // Insert new records
+    BaseJavaCommitActionExecutor actionExecutor = new JavaInsertCommitActionExecutor(context, config, table,
+        firstCommitTime, records);
+    List<WriteStatus> writeStatuses = new ArrayList<>();
+    actionExecutor.handleInsert(FSUtils.createNewFileIdPfx(), records.iterator())
+        .forEachRemaining(x -> writeStatuses.addAll((List<WriteStatus>)x));
+
+    Map<String, String> allWriteStatusMergedMetadataMap =
+        MetadataMergeWriteStatus.mergeMetadataForWriteStatuses(writeStatuses);
+    assertTrue(allWriteStatusMergedMetadataMap.containsKey("InputRecordCount_1506582000"));
+    // Each record contributes a value of 2 for the metadata key InputRecordCount_1506582000,
+    // so the merged sum over 3 records should be 2 * 3 = 6
+    assertEquals("6", allWriteStatusMergedMetadataMap.get("InputRecordCount_1506582000"));
+  }
+
+  private void verifyStatusResult(List<WriteStatus> statuses, Map<String, Long> expectedPartitionNumRecords) {
+    Map<String, Long> actualPartitionNumRecords = new HashMap<>();
+
+    for (int i = 0; i < statuses.size(); i++) {
+      WriteStatus writeStatus = statuses.get(i);
+      String partitionPath = writeStatus.getPartitionPath();
+      actualPartitionNumRecords.put(
+          partitionPath,
+          actualPartitionNumRecords.getOrDefault(partitionPath, 0L) + writeStatus.getTotalRecords());
+      assertEquals(0, writeStatus.getFailedRecords().size());
+    }
+
+    assertEquals(expectedPartitionNumRecords, actualPartitionNumRecords);
+  }
+
+  @Test
+  public void testInsertRecords() throws Exception {
+    HoodieWriteConfig config = makeHoodieClientConfig();
+    String instantTime = makeNewCommitTime();
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    HoodieJavaCopyOnWriteTable table = (HoodieJavaCopyOnWriteTable) HoodieJavaTable.create(config, context, metaClient);
+
+    // Case 1:
+    // 10 records for partition 1, 1 record for partition 2.
+    List<HoodieRecord> records = newHoodieRecords(10, "2016-01-31T03:16:41.415Z");
+    records.addAll(newHoodieRecords(1, "2016-02-01T03:16:41.415Z"));
+
+    // Insert new records
+    final List<HoodieRecord> recs2 = records;
+    BaseJavaCommitActionExecutor actionExecutor = new JavaInsertPreppedCommitActionExecutor(context, config, table,
+        instantTime, recs2);
+
+    final List<WriteStatus> returnedStatuses = new ArrayList<>();
+    actionExecutor.handleInsert(FSUtils.createNewFileIdPfx(), recs2.iterator())
+        .forEachRemaining(x -> returnedStatuses.addAll((List<WriteStatus>)x));
+
+    assertEquals(2, returnedStatuses.size());
+    Map<String, Long> expectedPartitionNumRecords = new HashMap<>();
+    expectedPartitionNumRecords.put("2016/01/31", 10L);
+    expectedPartitionNumRecords.put("2016/02/01", 1L);
+    verifyStatusResult(returnedStatuses, expectedPartitionNumRecords);
+
+    // Case 2:
+    // 1 record for partition 1, 5 records for partition 2, 1 record for partition 3.
+    records = newHoodieRecords(1, "2016-01-31T03:16:41.415Z");
+    records.addAll(newHoodieRecords(5, "2016-02-01T03:16:41.415Z"));
+    records.addAll(newHoodieRecords(1, "2016-02-02T03:16:41.415Z"));
+
+    // Insert new records
+    final List<HoodieRecord> recs3 = records;
+    BaseJavaCommitActionExecutor newActionExecutor = new JavaUpsertPreppedCommitActionExecutor(context, config, table,
+        instantTime, recs3);
+
+    final List<WriteStatus> returnedStatuses1 = new ArrayList<>();
+    newActionExecutor.handleInsert(FSUtils.createNewFileIdPfx(), recs3.iterator())
+        .forEachRemaining(x -> returnedStatuses1.addAll((List<WriteStatus>)x));
+
+    assertEquals(3, returnedStatuses1.size());
+    expectedPartitionNumRecords.clear();
+    expectedPartitionNumRecords.put("2016/01/31", 1L);
+    expectedPartitionNumRecords.put("2016/02/01", 5L);
+    expectedPartitionNumRecords.put("2016/02/02", 1L);
+    verifyStatusResult(returnedStatuses1, expectedPartitionNumRecords);
+  }
+
+  @Test
+  public void testFileSizeUpsertRecords() throws Exception {
+    HoodieWriteConfig config = makeHoodieClientConfigBuilder().withStorageConfig(HoodieStorageConfig.newBuilder()
+        .parquetMaxFileSize(64 * 1024).hfileMaxFileSize(64 * 1024)
+        .parquetBlockSize(64 * 1024).parquetPageSize(64 * 1024).build()).build();
+
+    String instantTime = makeNewCommitTime();
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    HoodieJavaCopyOnWriteTable table = (HoodieJavaCopyOnWriteTable) HoodieJavaTable.create(config, context, metaClient);
+
+    List<HoodieRecord> records = new ArrayList<>();
+    // Approx 1150 records are written for block size of 64KB
+    for (int i = 0; i < 2000; i++) {
+      String recordStr = "{\"_row_key\":\"" + UUID.randomUUID().toString()
+          + "\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":" + i + "}";
+      RawTripTestPayload rowChange = new RawTripTestPayload(recordStr);
+      records.add(new HoodieRecord(new HoodieKey(rowChange.getRowKey(), rowChange.getPartitionPath()), rowChange));
+    }
+
+    // Insert new records
+    BaseJavaCommitActionExecutor actionExecutor = new JavaUpsertCommitActionExecutor(context, config, table,
+        instantTime, records);
+
+    Transformations.flatten(actionExecutor.handleInsert(FSUtils.createNewFileIdPfx(), records.iterator()));
+
+    // Check the updated file
+    int counts = 0;
+    for (File file : Paths.get(basePath, "2016/01/31").toFile().listFiles()) {
+      if (file.getName().endsWith(".parquet") && FSUtils.getCommitTime(file.getName()).equals(instantTime)) {
+        LOG.info(file.getName() + "-" + file.length());
+        counts++;
+      }
+    }
+    assertEquals(3, counts, "If the number of records is more than 1150, then there should be a new file");
+  }
+
+  @Test
+  public void testInsertUpsertWithHoodieAvroPayload() throws Exception {
+    Schema schema = getSchemaFromResource(TestJavaCopyOnWriteActionExecutor.class, "/testDataGeneratorSchema.txt");
+    HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
+        .withEngineType(EngineType.JAVA)
+        .withPath(basePath)
+        .withSchema(schema.toString())
+        .withStorageConfig(HoodieStorageConfig.newBuilder()
+            .parquetMaxFileSize(1000 * 1024).hfileMaxFileSize(1000 * 1024).build())
+        .build();
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    final HoodieJavaCopyOnWriteTable table = (HoodieJavaCopyOnWriteTable) HoodieJavaTable.create(config, context, metaClient);
+    String instantTime = "000";
+    // Perform inserts of 100 records to test CreateHandle and BufferedExecutor
+    final List<HoodieRecord> inserts = dataGen.generateInsertsWithHoodieAvroPayload(instantTime, 100);
+    BaseJavaCommitActionExecutor actionExecutor = new JavaInsertCommitActionExecutor(context, config, table,
+        instantTime, inserts);
+
+    final List<List<WriteStatus>> ws = new ArrayList<>();
+    actionExecutor.handleInsert(UUID.randomUUID().toString(), inserts.iterator())
+        .forEachRemaining(x -> ws.add((List<WriteStatus>)x));
+
+    WriteStatus writeStatus = ws.get(0).get(0);
+    String fileId = writeStatus.getFileId();
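+    // create an empty completed-commit file so the update below sees the inserts as committed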
+    metaClient.getFs().create(new Path(Paths.get(basePath, ".hoodie", "000.commit").toString())).close();
+    final List<HoodieRecord> updates = dataGen.generateUpdatesWithHoodieAvroPayload(instantTime, inserts);
+
+    String partitionPath = writeStatus.getPartitionPath();
+    long numRecordsInPartition = updates.stream().filter(u -> u.getPartitionPath().equals(partitionPath)).count();
+    BaseJavaCommitActionExecutor newActionExecutor = new JavaUpsertCommitActionExecutor(context, config, table,
+        instantTime, updates);
+
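+    // reset the task context so the update pass generates fresh partition/stage/attempt ids for write tokens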
+    taskContextSupplier.reset();
+    final List<List<WriteStatus>> updateStatus = new ArrayList<>();
+    newActionExecutor.handleUpdate(partitionPath, fileId, updates.iterator())
+        .forEachRemaining(x -> updateStatus.add((List<WriteStatus>)x));
+    assertEquals(updates.size() - numRecordsInPartition, updateStatus.get(0).get(0).getTotalErrorRecords());
+  }
+
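+  // helper that exercises bulk insert end-to-end for the given sort mode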
+  public void testBulkInsertRecords(String bulkInsertMode) throws Exception {
+    HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
+        .withPath(basePath).withSchema(TRIP_EXAMPLE_SCHEMA)
+        .withBulkInsertParallelism(2).withBulkInsertSortMode(bulkInsertMode).build();
+    String instantTime = makeNewCommitTime();
+    HoodieJavaWriteClient writeClient = getHoodieWriteClient(config);
+    writeClient.startCommitWithTime(instantTime);
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    HoodieJavaCopyOnWriteTable table = (HoodieJavaCopyOnWriteTable) HoodieJavaTable.create(config, context, metaClient);
+
+    // Insert new records
+    final List<HoodieRecord> inputRecords = generateTestRecordsForBulkInsert();
+    JavaBulkInsertCommitActionExecutor bulkInsertExecutor = new JavaBulkInsertCommitActionExecutor(
+        context, config, table, instantTime, inputRecords, Option.empty());
+    List<WriteStatus> returnedStatuses = (List<WriteStatus>)bulkInsertExecutor.execute().getWriteStatuses();
+    verifyStatusResult(returnedStatuses, generateExpectedPartitionNumRecords(inputRecords));
+  }
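+
+  // A sketch of how this helper could be driven from a parameterized test (names assumed,
+  // not part of this change):
+  //   @ParameterizedTest
+  //   @ValueSource(strings = {"GLOBAL_SORT", "PARTITION_SORT", "NONE"})
+  //   void testBulkInsert(String sortMode) throws Exception { testBulkInsertRecords(sortMode); }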
+
+  public static Map<String, Long> generateExpectedPartitionNumRecords(List<HoodieRecord> records) {
+    return records.stream().map(record -> Pair.of(record.getPartitionPath(), 1))
+        .collect(Collectors.groupingBy(Pair::getLeft, Collectors.counting()));
+  }
+
+  public static List<HoodieRecord> generateTestRecordsForBulkInsert() {
+    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
+    // first batch of records
+    List<HoodieRecord> records1 = dataGenerator.generateInserts("0", 100);
+    // second batch of records
+    List<HoodieRecord> records2 = dataGenerator.generateInserts("0", 150);
+    records1.addAll(records2);
+    return records1;
+  }
+}
diff --git a/hudi-client/hudi-java-client/src/test/resources/testDataGeneratorSchema.txt b/hudi-client/hudi-java-client/src/test/resources/testDataGeneratorSchema.txt
new file mode 100644
index 0000000..ada01b3
--- /dev/null
+++ b/hudi-client/hudi-java-client/src/test/resources/testDataGeneratorSchema.txt
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{
+  "type" : "record",
+  "name" : "triprec",
+  "fields" : [
+  {
+    "name" : "timestamp",
+    "type" : "long"
+  }, {
+    "name" : "_row_key",
+    "type" : "string"
+  }, {
+    "name" : "rider",
+    "type" : "string"
+  }, {
+    "name" : "driver",
+    "type" : "string"
+  }, {
+    "name" : "begin_lat",
+    "type" : "double"
+  }, {
+    "name" : "begin_lon",
+    "type" : "double"
+  }, {
+    "name" : "end_lat",
+    "type" : "double"
+  }, {
+    "name" : "end_lon",
+    "type" : "double"
+  }, {
+    "name" : "distance_in_meters",
+    "type" : "int"
+  }, {
+    "name" : "seconds_since_epoch",
+    "type" : "long"
+  }, {
+    "name" : "weight",
+    "type" : "float"
+  },{
+    "name" : "nation",
+    "type" : "bytes"
+  },{
+    "name" : "current_date",
+    "type" : {
+      "type" : "int",
+      "logicalType" : "date"
+      }
+  },{
+    "name" : "current_ts",
+    "type" : {
+      "type" : "long"
+      }
+  },{
+    "name" : "height",
+    "type" : {
+      "type" : "fixed",
+      "name" : "abc",
+      "size" : 5,
+      "logicalType" : "decimal",
+      "precision" : 10,
+      "scale": 6
+      }
+  }, {
+    "name" :"city_to_state",
+    "type" : {
+      "type" : "map",
+      "values": "string"
+    }
+  },
+  {
+    "name" : "fare",
+    "type" : {
+      "type" : "record",
+      "name" : "fare",
+      "fields" : [
+        {
+         "name" : "amount",
+         "type" : "double"
+        },
+        {
+         "name" : "currency",
+         "type" : "string"
+        }
+      ]
+    }
+  },
+  {
+    "name" : "tip_history",
+    "type" : {
+      "type" : "array",
+      "items" : {
+        "type" : "record",
+        "name" : "tip_history",
+        "fields" : [
+          {
+            "name" : "amount",
+            "type" : "double"
+          },
+          {
+            "name" : "currency",
+            "type" : "string"
+          }
+        ]
+      }
+    }
+  },
+  {
+    "name" : "_hoodie_is_deleted",
+    "type" : "boolean",
+    "default" : false
+  } ]
+}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/engine/EngineType.java b/hudi-common/src/main/java/org/apache/hudi/common/engine/EngineType.java
index 5834fa9..5010452 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/engine/EngineType.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/engine/EngineType.java
@@ -22,5 +22,5 @@ package org.apache.hudi.common.engine;
- * Hoodie data processing engine. support only Apache Spark and Apache Flink for now.
+ * Hoodie data processing engine. Supports Apache Spark, Apache Flink and Java for now.
  */
 public enum EngineType {
-  SPARK, FLINK
+  SPARK, FLINK, JAVA
 }