Posted to commits@hudi.apache.org by xu...@apache.org on 2021/10/25 04:14:55 UTC

[hudi] branch master updated: [HUDI-2077] Fix TestHoodieDeltaStreamerWithMultiWriter (#3849)

This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new d856037  [HUDI-2077] Fix TestHoodieDeltaStreamerWithMultiWriter (#3849)
d856037 is described below

commit d8560377c306e49b7e58448b6897e9c0e7719f61
Author: Raymond Xu <27...@users.noreply.github.com>
AuthorDate: Sun Oct 24 21:14:39 2021 -0700

    [HUDI-2077] Fix TestHoodieDeltaStreamerWithMultiWriter (#3849)
    
    Remove the logic of using deltastreamer to prep test table. Use fixture (compressed test table) instead.
---
 .../SparkClientFunctionalTestHarness.java          |   8 +-
 .../hudi/testutils/providers/SparkProvider.java    |   4 +-
 .../apache/hudi/common/testutils/FixtureUtils.java |  81 +++++++++++++++++
 .../common/testutils/HoodieTestDataGenerator.java  |   5 +-
 .../functional/TestHoodieDeltaStreamer.java        |  12 +--
 .../TestHoodieDeltaStreamerWithMultiWriter.java    |  96 ++++++++++-----------
 .../functional/TestJdbcbasedSchemaProvider.java    |  11 ++-
 .../testutils/sources/AbstractBaseTestSource.java  |  26 +++++-
 ...inuousModeWithMultipleWriters.COPY_ON_WRITE.zip | Bin 0 -> 2494616 bytes
 ...inuousModeWithMultipleWriters.MERGE_ON_READ.zip | Bin 0 -> 2910151 bytes
 10 files changed, 178 insertions(+), 65 deletions(-)
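
In outline, the commit swaps the old "run a seeding DeltaStreamer first" step for unzipping a
pre-built table and rebuilding generator state from it. A minimal sketch of the new setup path,
condensed from setUpTestTable and initDataGen in the diff below (all names are as committed;
only the COPY_ON_WRITE fixture is shown):

    // Unzip the fixture table under the test's base path.
    java.nio.file.Path tableBasePath = FixtureUtils.prepareFixtureTable(
        Objects.requireNonNull(getClass().getClassLoader().getResource(
            "fixtures/testUpsertsContinuousModeWithMultipleWriters.COPY_ON_WRITE.zip")),
        Paths.get(basePath));
    // Re-derive the existing record keys so the test source emits updates
    // against known records instead of fresh inserts.
    initDataGen(sqlContext(), tableBasePath + "/*/*.parquet", DEFAULT_PARTITION_NUM);
    int totalRecords = dataGeneratorMap.get(DEFAULT_PARTITION_NUM)
        .getNumExistingKeys(TRIP_EXAMPLE_SCHEMA);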

diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java
index 74ab52d..aca1d83 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java
@@ -176,8 +176,14 @@ public class SparkClientFunctionalTestHarness implements SparkProvider, HoodieMe
     }
   }
 
+  /**
+   * Cleans up Spark resources after all test cases in a functional test class have run.
+   *
+   * The Spark session and contexts are reused across test cases in the same test class.
+   * A test case may also invoke this directly to clean up between repeated test runs.
+   */
   @AfterAll
-  public static synchronized void cleanUpAfterAll() {
+  public static synchronized void resetSpark() {
     if (spark != null) {
       spark.close();
       spark = null;
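
Per the new Javadoc, resetSpark() can also be called directly between repeated DeltaStreamer
runs in a single test case; a sketch, assuming the harness lazily re-creates the session the
next time an accessor such as jsc() or sqlContext() is used:

    // Sketch: force a fresh Spark session mid-test (assumes lazy re-creation
    // by the harness accessors on next use).
    SparkClientFunctionalTestHarness.resetSpark();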
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/providers/SparkProvider.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/providers/SparkProvider.java
index be15dc8..92b1f76 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/providers/SparkProvider.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/providers/SparkProvider.java
@@ -39,6 +39,8 @@ public interface SparkProvider extends org.apache.hudi.testutils.providers.Hoodi
     SparkConf sparkConf = new SparkConf();
     sparkConf.set("spark.app.name", getClass().getName());
     sparkConf.set("spark.master", "local[*]");
+    sparkConf.set("spark.default.parallelism", "4");
+    sparkConf.set("spark.sql.shuffle.partitions", "4");
     sparkConf.set("spark.driver.maxResultSize", "2g");
     sparkConf.set("spark.hadoop.mapred.output.compress", "true");
     sparkConf.set("spark.hadoop.mapred.output.compression.codec", "true");
@@ -52,4 +54,4 @@ public interface SparkProvider extends org.apache.hudi.testutils.providers.Hoodi
   default SparkConf conf() {
     return conf(Collections.emptyMap());
   }
-}
\ No newline at end of file
+}
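
Pinning spark.default.parallelism and spark.sql.shuffle.partitions to 4 stops local test runs
from spawning one shuffle task per core, which matters for the small fixture tables. A quick
runtime check that the pinned values took effect (illustrative only, not part of the commit):

    SparkSession spark = SparkSession.builder().config(conf()).getOrCreate();
    // Both values were set on the SparkConf above, so they can be read back.
    assertEquals("4", spark.sparkContext().getConf().get("spark.default.parallelism"));
    assertEquals("4", spark.conf().get("spark.sql.shuffle.partitions"));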
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FixtureUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FixtureUtils.java
new file mode 100644
index 0000000..6dfe0da
--- /dev/null
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FixtureUtils.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.testutils;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.file.Path;
+import java.util.Objects;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipInputStream;
+
+public final class FixtureUtils {
+
+  public static Path prepareFixtureTable(URL fixtureResource, Path basePath) throws IOException {
+    File zippedFixtureTable = new File(fixtureResource.getFile());
+    try (ZipInputStream zis = new ZipInputStream(new FileInputStream(zippedFixtureTable))) {
+      byte[] buffer = new byte[1024];
+      ZipEntry zipEntry = zis.getNextEntry();
+      Path tableBasePath = basePath.resolve(Objects.requireNonNull(zipEntry).getName()
+          .replaceAll(File.separator + "$", ""));
+      while (zipEntry != null) {
+        File newFile = newFile(basePath.toFile(), zipEntry);
+        if (zipEntry.isDirectory()) {
+          if (!newFile.isDirectory() && !newFile.mkdirs()) {
+            throw new IOException("Failed to create directory " + newFile);
+          }
+        } else {
+          // fix for Windows-created archives
+          File parent = newFile.getParentFile();
+          if (!parent.isDirectory() && !parent.mkdirs()) {
+            throw new IOException("Failed to create directory " + parent);
+          }
+
+          // write file content
+          try (FileOutputStream fos = new FileOutputStream(newFile)) {
+            int len;
+            while ((len = zis.read(buffer)) > 0) {
+              fos.write(buffer, 0, len);
+            }
+          }
+        }
+        zipEntry = zis.getNextEntry();
+      }
+      zis.closeEntry();
+      return tableBasePath;
+    }
+  }
+
+  public static File newFile(File destinationDir, ZipEntry zipEntry) throws IOException {
+    File destFile = new File(destinationDir, zipEntry.getName());
+
+    String destDirPath = destinationDir.getCanonicalPath();
+    String destFilePath = destFile.getCanonicalPath();
+
+    if (!destFilePath.startsWith(destDirPath + File.separator)) {
+      throw new IOException("Entry is outside of the target dir: " + zipEntry.getName());
+    }
+
+    return destFile;
+  }
+}
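
Note that newFile doubles as a zip-slip guard: canonicalizing both paths rejects archive entries
that would resolve outside the destination directory. Illustrative use (the destination path and
entry name here are made up):

    File dest = new File("/tmp/fixture-dest");          // hypothetical destination
    ZipEntry escaping = new ZipEntry("../outside.txt"); // crafted traversal entry
    try {
      FixtureUtils.newFile(dest, escaping);             // canonical paths diverge
    } catch (IOException expected) {
      // "Entry is outside of the target dir: ../outside.txt"
    }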
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
index 86ea1f0..e988c9d 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
@@ -160,6 +160,7 @@ public class HoodieTestDataGenerator {
     this.existingKeysBySchema = new HashMap<>();
     existingKeysBySchema.put(TRIP_EXAMPLE_SCHEMA, keyPartitionMap);
     numKeysBySchema = new HashMap<>();
+    numKeysBySchema.put(TRIP_EXAMPLE_SCHEMA, keyPartitionMap.size());
   }
 
   /**
@@ -844,8 +845,8 @@ public class HoodieTestDataGenerator {
 
   public static class KeyPartition implements Serializable {
 
-    HoodieKey key;
-    String partitionPath;
+    public HoodieKey key;
+    public String partitionPath;
   }
 
   public void close() {
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
index f49c148..86c92f2 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
@@ -239,32 +239,32 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     }
 
     static void assertRecordCount(long expected, String tablePath, SQLContext sqlContext) {
-      long recordCount = sqlContext.read().format("org.apache.hudi").load(tablePath).count();
       sqlContext.clearCache();
+      long recordCount = sqlContext.read().format("org.apache.hudi").load(tablePath).count();
       assertEquals(expected, recordCount);
     }
 
     static List<Row> countsPerCommit(String tablePath, SQLContext sqlContext) {
+      sqlContext.clearCache();
       List<Row> rows = sqlContext.read().format("org.apache.hudi").load(tablePath)
           .groupBy("_hoodie_commit_time").count()
           .sort("_hoodie_commit_time").collectAsList();
-      sqlContext.clearCache();
       return rows;
     }
 
     static void assertDistanceCount(long expected, String tablePath, SQLContext sqlContext) {
+      sqlContext.clearCache();
       sqlContext.read().format("org.apache.hudi").load(tablePath).registerTempTable("tmp_trips");
       long recordCount =
-          sqlContext.sparkSession().sql("select * from tmp_trips where haversine_distance is not NULL").count();
-      sqlContext.clearCache();
+          sqlContext.sql("select * from tmp_trips where haversine_distance is not NULL").count();
       assertEquals(expected, recordCount);
     }
 
     static void assertDistanceCountWithExactValue(long expected, String tablePath, SQLContext sqlContext) {
+      sqlContext.clearCache();
       sqlContext.read().format("org.apache.hudi").load(tablePath).registerTempTable("tmp_trips");
       long recordCount =
-          sqlContext.sparkSession().sql("select * from tmp_trips where haversine_distance = 1.0").count();
-      sqlContext.clearCache();
+          sqlContext.sql("select * from tmp_trips where haversine_distance = 1.0").count();
       assertEquals(expected, recordCount);
     }
 
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamerWithMultiWriter.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamerWithMultiWriter.java
index c93b7d9..3cdf5f9 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamerWithMultiWriter.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamerWithMultiWriter.java
@@ -27,7 +27,7 @@ import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.config.HoodieCompactionConfig;
-import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode;
 import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
 import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
 import org.apache.hudi.utilities.sources.TestDataSource;
@@ -41,60 +41,60 @@ import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.EnumSource;
 
 import java.io.IOException;
+import java.net.URI;
+import java.nio.file.Paths;
 import java.util.Collections;
 import java.util.ConcurrentModificationException;
 import java.util.List;
+import java.util.Objects;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.function.Function;
 
+import static org.apache.hudi.common.testutils.FixtureUtils.prepareFixtureTable;
+import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
+import static org.apache.hudi.config.HoodieWriteConfig.BULKINSERT_PARALLELISM_VALUE;
+import static org.apache.hudi.config.HoodieWriteConfig.BULK_INSERT_SORT_MODE;
+import static org.apache.hudi.config.HoodieWriteConfig.FINALIZE_WRITE_PARALLELISM_VALUE;
+import static org.apache.hudi.config.HoodieWriteConfig.INSERT_PARALLELISM_VALUE;
+import static org.apache.hudi.config.HoodieWriteConfig.UPSERT_PARALLELISM_VALUE;
 import static org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.CHECKPOINT_KEY;
 import static org.apache.hudi.utilities.functional.HoodieDeltaStreamerTestBase.PROPS_FILENAME_TEST_MULTI_WRITER;
 import static org.apache.hudi.utilities.functional.HoodieDeltaStreamerTestBase.defaultSchemaProviderClassName;
 import static org.apache.hudi.utilities.functional.HoodieDeltaStreamerTestBase.prepareInitialConfigs;
 import static org.apache.hudi.utilities.functional.TestHoodieDeltaStreamer.deltaStreamerTestRunner;
+import static org.apache.hudi.utilities.testutils.sources.AbstractBaseTestSource.DEFAULT_PARTITION_NUM;
+import static org.apache.hudi.utilities.testutils.sources.AbstractBaseTestSource.dataGeneratorMap;
+import static org.apache.hudi.utilities.testutils.sources.AbstractBaseTestSource.initDataGen;
 
 @Tag("functional")
 public class TestHoodieDeltaStreamerWithMultiWriter extends SparkClientFunctionalTestHarness {
 
+  String basePath;
+  String propsFilePath;
+  String tableBasePath;
+  int totalRecords;
+
   @ParameterizedTest
   @EnumSource(HoodieTableType.class)
   void testUpsertsContinuousModeWithMultipleWriters(HoodieTableType tableType) throws Exception {
    // NOTE: Overriding the LockProvider to FileSystemBasedLockProviderTestClass since ZooKeeper locks work in unit tests but fail on Jenkins with connection timeouts
-    final String basePath = basePath().replaceAll("/$", "");
-    final String propsFilePath = basePath + "/" + PROPS_FILENAME_TEST_MULTI_WRITER;
-    final String tableBasePath = basePath + "/testtable_" + tableType;
+    setUpTestTable(tableType);
     prepareInitialConfigs(fs(), basePath, "foo");
     // enable carrying forward latest checkpoint
     TypedProperties props = prepareMultiWriterProps(fs(), basePath, propsFilePath);
     props.setProperty("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.FileSystemBasedLockProviderTestClass");
     props.setProperty("hoodie.write.lock.filesystem.path", tableBasePath);
-    props.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY,"3");
-    props.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY,"5000");
+    props.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY, "3");
+    props.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY, "5000");
     UtilitiesTestBase.Helpers.savePropsToDFS(props, fs(), propsFilePath);
-    // Keep it higher than batch-size to test continuous mode
-    int totalRecords = 3000;
 
     HoodieDeltaStreamer.Config cfgIngestionJob = getDeltaStreamerConfig(tableBasePath, tableType.name(), WriteOperationType.UPSERT,
         propsFilePath, Collections.singletonList(TestHoodieDeltaStreamer.TripsWithDistanceTransformer.class.getName()));
     cfgIngestionJob.continuousMode = true;
     cfgIngestionJob.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
     cfgIngestionJob.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN.key()));
-    HoodieDeltaStreamer ingestionJob = new HoodieDeltaStreamer(cfgIngestionJob, jsc());
-
-    // Prepare base dataset with some commits
-    deltaStreamerTestRunner(ingestionJob, cfgIngestionJob, (r) -> {
-      if (tableType.equals(HoodieTableType.MERGE_ON_READ)) {
-        TestHoodieDeltaStreamer.TestHelpers.assertAtleastNDeltaCommits(3, tableBasePath, fs());
-        TestHoodieDeltaStreamer.TestHelpers.assertAtleastNCompactionCommits(1, tableBasePath, fs());
-      } else {
-        TestHoodieDeltaStreamer.TestHelpers.assertAtleastNCompactionCommits(3, tableBasePath, fs());
-      }
-      TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(totalRecords, tableBasePath + "/*/*.parquet", sqlContext());
-      TestHoodieDeltaStreamer.TestHelpers.assertDistanceCount(totalRecords, tableBasePath + "/*/*.parquet", sqlContext());
-      return true;
-    });
 
     // create a backfill job
     HoodieDeltaStreamer.Config cfgBackfillJob = getDeltaStreamerConfig(tableBasePath, tableType.name(), WriteOperationType.UPSERT,
@@ -152,37 +152,19 @@ public class TestHoodieDeltaStreamerWithMultiWriter extends SparkClientFunctiona
   @EnumSource(value = HoodieTableType.class, names = {"COPY_ON_WRITE"})
   void testLatestCheckpointCarryOverWithMultipleWriters(HoodieTableType tableType) throws Exception {
    // NOTE: Overriding the LockProvider to FileSystemBasedLockProviderTestClass since ZooKeeper locks work in unit tests but fail on Jenkins with connection timeouts
-    final String basePath = basePath().replaceAll("/$", "");
-    final String propsFilePath = basePath + "/" + PROPS_FILENAME_TEST_MULTI_WRITER;
-    final String tableBasePath = basePath + "/testtable_" + tableType;
+    setUpTestTable(tableType);
     prepareInitialConfigs(fs(), basePath, "foo");
     // enable carrying forward latest checkpoint
     TypedProperties props = prepareMultiWriterProps(fs(), basePath, propsFilePath);
     props.setProperty("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.FileSystemBasedLockProviderTestClass");
     props.setProperty("hoodie.write.lock.filesystem.path", tableBasePath);
     UtilitiesTestBase.Helpers.savePropsToDFS(props, fs(), propsFilePath);
-    // Keep it higher than batch-size to test continuous mode
-    int totalRecords = 3000;
 
     HoodieDeltaStreamer.Config cfgIngestionJob = getDeltaStreamerConfig(tableBasePath, tableType.name(), WriteOperationType.UPSERT,
         propsFilePath, Collections.singletonList(TestHoodieDeltaStreamer.TripsWithDistanceTransformer.class.getName()));
     cfgIngestionJob.continuousMode = true;
     cfgIngestionJob.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
     cfgIngestionJob.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN.key()));
-    HoodieDeltaStreamer ingestionJob = new HoodieDeltaStreamer(cfgIngestionJob, jsc());
-
-    // Prepare base dataset with some commits
-    deltaStreamerTestRunner(ingestionJob, cfgIngestionJob, (r) -> {
-      if (tableType.equals(HoodieTableType.MERGE_ON_READ)) {
-        TestHoodieDeltaStreamer.TestHelpers.assertAtleastNDeltaCommits(3, tableBasePath, fs());
-        TestHoodieDeltaStreamer.TestHelpers.assertAtleastNCompactionCommits(1, tableBasePath, fs());
-      } else {
-        TestHoodieDeltaStreamer.TestHelpers.assertAtleastNCompactionCommits(3, tableBasePath, fs());
-      }
-      TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(totalRecords, tableBasePath + "/*/*.parquet", sqlContext());
-      TestHoodieDeltaStreamer.TestHelpers.assertDistanceCount(totalRecords, tableBasePath + "/*/*.parquet", sqlContext());
-      return true;
-    });
 
     // create a backfill job with checkpoint from the first instant
     HoodieDeltaStreamer.Config cfgBackfillJob = getDeltaStreamerConfig(tableBasePath, tableType.name(), WriteOperationType.UPSERT,
@@ -245,6 +227,11 @@ public class TestHoodieDeltaStreamerWithMultiWriter extends SparkClientFunctiona
     props.setProperty("hoodie.write.lock.num_retries", "10");
     props.setProperty("hoodie.write.lock.zookeeper.lock_key", "test_table");
     props.setProperty("hoodie.write.lock.zookeeper.base_path", "/test");
+    props.setProperty(INSERT_PARALLELISM_VALUE.key(), "4");
+    props.setProperty(UPSERT_PARALLELISM_VALUE.key(), "4");
+    props.setProperty(BULKINSERT_PARALLELISM_VALUE.key(), "4");
+    props.setProperty(FINALIZE_WRITE_PARALLELISM_VALUE.key(), "4");
+    props.setProperty(BULK_INSERT_SORT_MODE.key(), BulkInsertSortMode.NONE.name());
 
     UtilitiesTestBase.Helpers.savePropsToDFS(props, fs, propsFilePath);
     return props;
@@ -264,16 +251,29 @@ public class TestHoodieDeltaStreamerWithMultiWriter extends SparkClientFunctiona
     cfg.propsFilePath = propsFilePath;
     cfg.sourceLimit = 1000;
     cfg.schemaProviderClassName = defaultSchemaProviderClassName;
-    cfg.deltaSyncSchedulingWeight = 1;
-    cfg.deltaSyncSchedulingMinShare = 1;
-    cfg.compactSchedulingWeight = 2;
-    cfg.compactSchedulingMinShare = 1;
-    cfg.configs.add(String.format("%s=%s", HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key(), 10));
-    cfg.configs.add(String.format("%s=%s", HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key(), 10));
-    cfg.configs.add(String.format("%s=%s", HoodieWriteConfig.BULKINSERT_PARALLELISM_VALUE.key(), 10));
     return cfg;
   }
 
+  /**
+   * Specifically used for {@link TestHoodieDeltaStreamerWithMultiWriter}.
+   *
+   * The fixture test tables have random records generated by
+   * {@link org.apache.hudi.common.testutils.HoodieTestDataGenerator} using
+   * {@link org.apache.hudi.common.testutils.HoodieTestDataGenerator#TRIP_EXAMPLE_SCHEMA}.
+   *
+   * The COW fixture test table has 3000 unique records in 7 commits.
+   * The MOR fixture test table has 3000 unique records in 9 deltacommits and 1 compaction commit.
+   */
+  private void setUpTestTable(HoodieTableType tableType) throws IOException {
+    basePath = Paths.get(URI.create(basePath().replaceAll("/$", ""))).toString();
+    propsFilePath = basePath + "/" + PROPS_FILENAME_TEST_MULTI_WRITER;
+    String fixtureName = String.format("fixtures/testUpsertsContinuousModeWithMultipleWriters.%s.zip", tableType.name());
+    tableBasePath = prepareFixtureTable(Objects.requireNonNull(getClass()
+        .getClassLoader().getResource(fixtureName)), Paths.get(basePath)).toString();
+    initDataGen(sqlContext(), tableBasePath + "/*/*.parquet", DEFAULT_PARTITION_NUM);
+    totalRecords = dataGeneratorMap.get(DEFAULT_PARTITION_NUM).getNumExistingKeys(TRIP_EXAMPLE_SCHEMA);
+  }
+
   private void runJobsInParallel(String tableBasePath, HoodieTableType tableType, int totalRecords,
       HoodieDeltaStreamer ingestionJob, HoodieDeltaStreamer.Config cfgIngestionJob, HoodieDeltaStreamer backfillJob,
       HoodieDeltaStreamer.Config cfgBackfillJob, boolean expectConflict) throws Exception {
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestJdbcbasedSchemaProvider.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestJdbcbasedSchemaProvider.java
index 7dd8af6..938f71c 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestJdbcbasedSchemaProvider.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestJdbcbasedSchemaProvider.java
@@ -32,7 +32,6 @@ import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;
 
-import java.io.IOException;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.PreparedStatement;
@@ -72,11 +71,11 @@ public class TestJdbcbasedSchemaProvider extends SparkClientFunctionalTestHarnes
   * Initializes the H2 database, obtains a connection, and creates a test table.
   * Because H2 runs in memory, the initialized database does not need to be torn down explicitly.
    * @throws SQLException
-   * @throws IOException
    */
-  private void initH2Database() throws SQLException, IOException {
-    Connection conn = DriverManager.getConnection("jdbc:h2:mem:test_mem", "sa", "");
-    PreparedStatement ps = conn.prepareStatement(UtilitiesTestBase.Helpers.readFile("delta-streamer-config/triprec.sql"));
-    ps.executeUpdate();
+  private void initH2Database() throws SQLException {
+    try (Connection conn = DriverManager.getConnection("jdbc:h2:mem:test_mem", "sa", "")) {
+      PreparedStatement ps = conn.prepareStatement(UtilitiesTestBase.Helpers.readFile("delta-streamer-config/triprec.sql"));
+      ps.executeUpdate();
+    }
   }
 }
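
The fix wraps the Connection in try-with-resources so the in-memory H2 connection is always
released. A variant that also auto-closes the PreparedStatement would look like this (a sketch,
not what was committed):

    private void initH2Database() throws SQLException {
      // Resources close in reverse declaration order: statement first, then connection.
      try (Connection conn = DriverManager.getConnection("jdbc:h2:mem:test_mem", "sa", "");
           PreparedStatement ps = conn.prepareStatement(
               UtilitiesTestBase.Helpers.readFile("delta-streamer-config/triprec.sql"))) {
        ps.executeUpdate();
      }
    }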
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractBaseTestSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractBaseTestSource.java
index 524591d..5186179 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractBaseTestSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractBaseTestSource.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.utilities.testutils.sources;
 
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.testutils.RawTripTestPayload;
@@ -32,12 +33,18 @@ import org.apache.avro.generic.GenericRecord;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SQLContext;
 import org.apache.spark.sql.SparkSession;
 
 import java.io.File;
 import java.io.IOException;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 import java.util.stream.Stream;
 
 public abstract class AbstractBaseTestSource extends AvroSource {
@@ -47,7 +54,7 @@ public abstract class AbstractBaseTestSource extends AvroSource {
   public static final int DEFAULT_PARTITION_NUM = 0;
 
   // Static instance, helps with reuse across a test.
-  protected static transient Map<Integer, HoodieTestDataGenerator> dataGeneratorMap = new HashMap<>();
+  public static transient Map<Integer, HoodieTestDataGenerator> dataGeneratorMap = new HashMap<>();
 
   public static void initDataGen() {
     dataGeneratorMap.putIfAbsent(DEFAULT_PARTITION_NUM,
@@ -68,6 +75,23 @@ public abstract class AbstractBaseTestSource extends AvroSource {
     }
   }
 
+  public static void initDataGen(SQLContext sqlContext, String globParquetPath, int partition) {
+    List<Row> rows = sqlContext.read().format("hudi").load(globParquetPath)
+        .select("_hoodie_record_key", "_hoodie_partition_path")
+        .collectAsList();
+    Map<Integer, HoodieTestDataGenerator.KeyPartition> keyPartitionMap = IntStream
+        .range(0, rows.size()).boxed()
+        .collect(Collectors.toMap(Function.identity(), i -> {
+          Row r = rows.get(i);
+          HoodieTestDataGenerator.KeyPartition kp = new HoodieTestDataGenerator.KeyPartition();
+          kp.key = new HoodieKey(r.getString(0), r.getString(1));
+          kp.partitionPath = r.getString(1);
+          return kp;
+        }));
+    dataGeneratorMap.put(partition,
+        new HoodieTestDataGenerator(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS, keyPartitionMap));
+  }
+
   public static void resetDataGen() {
     for (HoodieTestDataGenerator dataGenerator : dataGeneratorMap.values()) {
       dataGenerator.close();
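
The new initDataGen overload reads _hoodie_record_key and _hoodie_partition_path back out of the
fixture table and rebuilds the generator's key-partition map, so later batches become genuine
upserts. This pairs with the one-line HoodieTestDataGenerator fix earlier in this diff, which
seeds numKeysBySchema in the constructor so getNumExistingKeys reports the fixture's key count.
Typical call order (a sketch assuming a live SQLContext in scope; all other names are from this
diff):

    initDataGen(sqlContext, tableBasePath + "/*/*.parquet", DEFAULT_PARTITION_NUM);
    HoodieTestDataGenerator gen = dataGeneratorMap.get(DEFAULT_PARTITION_NUM);
    int existing = gen.getNumExistingKeys(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA);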
diff --git a/hudi-utilities/src/test/resources/fixtures/testUpsertsContinuousModeWithMultipleWriters.COPY_ON_WRITE.zip b/hudi-utilities/src/test/resources/fixtures/testUpsertsContinuousModeWithMultipleWriters.COPY_ON_WRITE.zip
new file mode 100644
index 0000000..48bf278
Binary files /dev/null and b/hudi-utilities/src/test/resources/fixtures/testUpsertsContinuousModeWithMultipleWriters.COPY_ON_WRITE.zip differ
diff --git a/hudi-utilities/src/test/resources/fixtures/testUpsertsContinuousModeWithMultipleWriters.MERGE_ON_READ.zip b/hudi-utilities/src/test/resources/fixtures/testUpsertsContinuousModeWithMultipleWriters.MERGE_ON_READ.zip
new file mode 100644
index 0000000..657f83c
Binary files /dev/null and b/hudi-utilities/src/test/resources/fixtures/testUpsertsContinuousModeWithMultipleWriters.MERGE_ON_READ.zip differ