Posted to commits@hudi.apache.org by si...@apache.org on 2020/10/07 12:34:02 UTC

[hudi] branch master updated: [HUDI-1303] Some improvements for the HUDI Test Suite. (#2128)

This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 788d236  [HUDI-1303] Some improvements for the HUDI Test Suite. (#2128)
788d236 is described below

commit 788d236c443eb4ced819f9305ed8e0460b5984b7
Author: Prashant Wason <pw...@uber.com>
AuthorDate: Wed Oct 7 05:33:51 2020 -0700

    [HUDI-1303] Some improvements for the HUDI Test Suite. (#2128)
    
    1. Use the DAG node's label from the YAML as its name, instead of a UUID, which is not descriptive when debugging issues from the logs (see the YAML sketch after this list).
    2. Fix the CleanNode constructor, which was not correctly implemented.
    3. When generating upserts, allow more granular control over the number of inserts and upserts: zero or more of each can be specified, instead of always requiring both.
    4. Fix generation of records of a specific size.
       - The previous code used a class variable "shouldAddMore" that was reset to false after the first record was generated, causing all subsequent records to be of minimum size.
       - This change pre-calculates the number of extra entries required per complex field into a map; when generating a record, the entry count for each complex field is read from that map (a worked example follows this list).
    5. Refresh the timeline of the DeltaSync service before calling readFromSource. This ensures that only the newest generated data is read and that data generated by older DAG nodes is ignored (as their Avro files have an older timestamp).
    6. Make --workload-generator-classname an optional parameter, since the default will most likely be used (see the usage sketch after this list).
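
    For illustration, here is a minimal DAG YAML sketch of points 1 and 3: the node labels ("first_insert", "upsert_only") now become the nodes' names in the logs, and an upsert-only node no longer requires accompanying inserts. The key and type names follow the test suite's conventions but are assumptions here, not copied verbatim from this commit:

        first_insert:
          config:
            record_size: 1000
            num_partitions_insert: 1
            num_records_insert: 100
          type: InsertNode
          deps: none
        upsert_only:
          config:
            num_partitions_upsert: 1
            num_records_upsert: 100
          type: UpsertNode
          deps: first_insert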
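
    A rough worked example of point 4 (numbers illustrative): with minPayloadSize = 10240 bytes and an estimated base payload of 1240 bytes, 9000 extra bytes must be packed. For a schema with two collection fields of 8-byte long elements, the first field would get 9000 / 8 = 1125 extra entries; since that exceeds 10 and another complex field remains, it is capped at 10 entries (80 bytes) and the remaining 8920 bytes are assigned to the second field (1115 entries). These counts are computed once, stored in the extra-entries map, and reused for every generated record.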
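
    A hedged usage sketch for point 6 (the bundle jar name and the elided options are placeholders, not from this commit):

        # --workload-generator-classname may now be omitted; it defaults to WorkflowDagGenerator
        spark-submit --class org.apache.hudi.integ.testsuite.HoodieTestSuiteJob \
          hudi-integ-test-bundle.jar \
          ...other required options...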
---
 docker/compose/hadoop.env                          |   3 +-
 .../testsuite/HoodieDeltaStreamerWrapper.java      |   1 +
 .../hudi/integ/testsuite/HoodieTestSuiteJob.java   |   6 +-
 .../integ/testsuite/configuration/DeltaConfig.java |   5 +
 .../apache/hudi/integ/testsuite/dag/DagUtils.java  |  12 +-
 .../hudi/integ/testsuite/dag/nodes/CleanNode.java  |   4 +-
 .../integ/testsuite/generator/DeltaGenerator.java  |  65 +++++----
 .../GenericRecordFullPayloadGenerator.java         | 152 ++++++++++-----------
 .../GenericRecordPartialPayloadGenerator.java      |   2 +-
 .../reader/DFSHoodieDatasetInputReader.java        |   4 +-
 .../hudi/utilities/deltastreamer/DeltaSync.java    |   2 +-
 .../utilities/sources/helpers/DFSPathSelector.java |   1 +
 12 files changed, 134 insertions(+), 123 deletions(-)

diff --git a/docker/compose/hadoop.env b/docker/compose/hadoop.env
index 474c3db..4e8a942 100644
--- a/docker/compose/hadoop.env
+++ b/docker/compose/hadoop.env
@@ -21,12 +21,13 @@ HIVE_SITE_CONF_javax_jdo_option_ConnectionUserName=hive
 HIVE_SITE_CONF_javax_jdo_option_ConnectionPassword=hive
 HIVE_SITE_CONF_datanucleus_autoCreateSchema=false
 HIVE_SITE_CONF_hive_metastore_uris=thrift://hivemetastore:9083
-HDFS_CONF_dfs_namenode_datanode_registration_ip___hostname___check=false
 
+HDFS_CONF_dfs_namenode_datanode_registration_ip___hostname___check=false
 HDFS_CONF_dfs_webhdfs_enabled=true
 HDFS_CONF_dfs_permissions_enabled=false
 #HDFS_CONF_dfs_client_use_datanode_hostname=true
 #HDFS_CONF_dfs_namenode_use_datanode_hostname=true
+HDFS_CONF_dfs_replication=1
 
 CORE_CONF_fs_defaultFS=hdfs://namenode:8020
 CORE_CONF_hadoop_http_staticuser_user=root
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieDeltaStreamerWrapper.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieDeltaStreamerWrapper.java
index 5179e89..6e5027b 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieDeltaStreamerWrapper.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieDeltaStreamerWrapper.java
@@ -66,6 +66,7 @@ public class HoodieDeltaStreamerWrapper extends HoodieDeltaStreamer {
 
   public Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> fetchSource() throws Exception {
     DeltaSync service = deltaSyncService.get().getDeltaSync();
+    service.refreshTimeline();
     return service.readFromSource(service.getCommitTimelineOpt());
   }
 
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java
index 2c4b73a..c2c242a 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java
@@ -156,8 +156,7 @@ public class HoodieTestSuiteJob {
     public String inputBasePath;
 
     @Parameter(names = {
-        "--workload-generator-classname"}, description = "WorkflowDag of operations to generate the workload",
-        required = true)
+        "--workload-generator-classname"}, description = "WorkflowDag of operations to generate the workload")
     public String workloadDagGenerator = WorkflowDagGenerator.class.getName();
 
     @Parameter(names = {
@@ -177,8 +176,7 @@ public class HoodieTestSuiteJob {
     public Long limitFileSize = 1024 * 1024 * 120L;
 
     @Parameter(names = {"--use-deltastreamer"}, description = "Choose whether to use HoodieDeltaStreamer to "
-        + "perform"
-        + " ingestion. If set to false, HoodieWriteClient will be used")
+        + "perform ingestion. If set to false, HoodieWriteClient will be used")
     public Boolean useDeltaStreamer = false;
 
   }
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java
index f20f84e..7a66681 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java
@@ -257,6 +257,11 @@ public class DeltaConfig implements Serializable {
         return this;
       }
 
+      public Builder withName(String name) {
+        this.configsMap.put(CONFIG_NAME, name);
+        return this;
+      }
+
       public Config build() {
         return new Config(configsMap);
       }
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/DagUtils.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/DagUtils.java
index 2889867..d535823 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/DagUtils.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/DagUtils.java
@@ -68,7 +68,7 @@ public class DagUtils {
     Iterator<Entry<String, JsonNode>> itr = jsonNode.fields();
     while (itr.hasNext()) {
       Entry<String, JsonNode> dagNode = itr.next();
-      allNodes.put(dagNode.getKey(), convertJsonToDagNode(allNodes, dagNode.getValue()));
+      allNodes.put(dagNode.getKey(), convertJsonToDagNode(allNodes, dagNode.getKey(), dagNode.getValue()));
     }
     return new WorkflowDag(findRootNodes(allNodes));
   }
@@ -94,9 +94,10 @@ public class DagUtils {
     }
   }
 
-  private static DagNode convertJsonToDagNode(Map<String, DagNode> allNodes, JsonNode node) throws IOException {
+  private static DagNode convertJsonToDagNode(Map<String, DagNode> allNodes, String name, JsonNode node)
+      throws IOException {
     String type = node.get(DeltaConfig.Config.TYPE).asText();
-    final DagNode retNode = convertJsonToDagNode(node, type);
+    final DagNode retNode = convertJsonToDagNode(node, type, name);
     Arrays.asList(node.get(DeltaConfig.Config.DEPENDENCIES).textValue().split(",")).stream().forEach(dep -> {
       DagNode parentNode = allNodes.get(dep);
       if (parentNode != null) {
@@ -116,9 +117,10 @@ public class DagUtils {
     return rootNodes;
   }
 
-  private static DagNode convertJsonToDagNode(JsonNode node, String type) {
+  private static DagNode convertJsonToDagNode(JsonNode node, String type, String name) {
     try {
-      DeltaConfig.Config config = DeltaConfig.Config.newBuilder().withConfigsMap(convertJsonNodeToMap(node)).build();
+      DeltaConfig.Config config = DeltaConfig.Config.newBuilder().withConfigsMap(convertJsonNodeToMap(node))
+          .withName(name).build();
       return (DagNode) ReflectionUtils.loadClass(generateFQN(type), config);
     } catch (ClassNotFoundException e) {
       throw new RuntimeException(e);
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/CleanNode.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/CleanNode.java
index 2c0fcba..83a8d5e 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/CleanNode.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/CleanNode.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.integ.testsuite.dag.nodes;
 
+import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config;
 import org.apache.hudi.integ.testsuite.dag.ExecutionContext;
 
 /**
@@ -26,7 +27,8 @@ import org.apache.hudi.integ.testsuite.dag.ExecutionContext;
  */
 public class CleanNode extends DagNode<Boolean> {
 
-  public CleanNode() {
+  public CleanNode(Config config) {
+    this.config = config;
   }
 
   @Override
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/DeltaGenerator.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/DeltaGenerator.java
index c42705d..8dc7f4b 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/DeltaGenerator.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/DeltaGenerator.java
@@ -31,7 +31,6 @@ import java.util.Map;
 import java.util.stream.StreamSupport;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.integ.testsuite.converter.Converter;
 import org.apache.hudi.integ.testsuite.converter.UpdateConverter;
 import org.apache.hudi.integ.testsuite.reader.DFSAvroDeltaInputReader;
 import org.apache.hudi.integ.testsuite.reader.DFSHoodieDatasetInputReader;
@@ -93,11 +92,11 @@ public class DeltaGenerator implements Serializable {
   }
 
   public JavaRDD<GenericRecord> generateInserts(Config operation) {
-    long recordsPerPartition = operation.getNumRecordsInsert();
     int numPartitions = operation.getNumInsertPartitions();
+    long recordsPerPartition = operation.getNumRecordsInsert() / numPartitions;
     int minPayloadSize = operation.getRecordSize();
     JavaRDD<GenericRecord> inputBatch = jsc.parallelize(Collections.EMPTY_LIST)
-        .repartition(operation.getNumInsertPartitions()).mapPartitions(p -> {
+        .repartition(numPartitions).mapPartitions(p -> {
           return new LazyRecordGeneratorIterator(new FlexibleSchemaRecordGenerationIterator(recordsPerPartition,
             minPayloadSize, schemaStr, partitionPathFieldNames, numPartitions));
         });
@@ -112,34 +111,44 @@ public class DeltaGenerator implements Serializable {
       }
       DeltaInputReader deltaInputReader = null;
       JavaRDD<GenericRecord> adjustedRDD = null;
-      if (config.getNumUpsertPartitions() < 1) {
-        // randomly generate updates for a given number of records without regard to partitions and files
-        deltaInputReader = new DFSAvroDeltaInputReader(sparkSession, schemaStr,
-            ((DFSDeltaConfig) deltaOutputConfig).getDeltaBasePath(), Option.empty(), Option.empty());
-        adjustedRDD = deltaInputReader.read(config.getNumRecordsUpsert());
-        adjustedRDD = adjustRDDToGenerateExactNumUpdates(adjustedRDD, jsc, config.getNumRecordsUpsert());
-      } else {
-        deltaInputReader =
-            new DFSHoodieDatasetInputReader(jsc, ((DFSDeltaConfig) deltaOutputConfig).getDatasetOutputPath(),
-                schemaStr);
-        if (config.getFractionUpsertPerFile() > 0) {
-          adjustedRDD = deltaInputReader.read(config.getNumUpsertPartitions(), config.getNumUpsertFiles(),
-              config.getFractionUpsertPerFile());
+      if (config.getNumUpsertPartitions() != 0) {
+        if (config.getNumUpsertPartitions() < 0) {
+          // randomly generate updates for a given number of records without regard to partitions and files
+          deltaInputReader = new DFSAvroDeltaInputReader(sparkSession, schemaStr,
+              ((DFSDeltaConfig) deltaOutputConfig).getDeltaBasePath(), Option.empty(), Option.empty());
+          adjustedRDD = deltaInputReader.read(config.getNumRecordsUpsert());
+          adjustedRDD = adjustRDDToGenerateExactNumUpdates(adjustedRDD, jsc, config.getNumRecordsUpsert());
+        } else {
+          deltaInputReader =
+              new DFSHoodieDatasetInputReader(jsc, ((DFSDeltaConfig) deltaOutputConfig).getDatasetOutputPath(),
+                  schemaStr);
+          if (config.getFractionUpsertPerFile() > 0) {
+            adjustedRDD = deltaInputReader.read(config.getNumUpsertPartitions(), config.getNumUpsertFiles(),
+                config.getFractionUpsertPerFile());
+          } else {
+            adjustedRDD = deltaInputReader.read(config.getNumUpsertPartitions(), config.getNumUpsertFiles(), config
+                .getNumRecordsUpsert());
+          }
+        }
+
+        log.info("Repartitioning records");
+        // persist this since we will make multiple passes over this
+        adjustedRDD = adjustedRDD.repartition(jsc.defaultParallelism());
+        log.info("Repartitioning records done");
+        UpdateConverter converter = new UpdateConverter(schemaStr, config.getRecordSize(),
+            partitionPathFieldNames, recordRowKeyFieldNames);
+        JavaRDD<GenericRecord> updates = converter.convert(adjustedRDD);
+
+        log.info("Records converted");
+        updates.persist(StorageLevel.DISK_ONLY());
+
+        if (inserts == null) {
+          inserts = updates;
         } else {
-          adjustedRDD = deltaInputReader.read(config.getNumUpsertPartitions(), config.getNumUpsertFiles(), config
-              .getNumRecordsUpsert());
+          inserts = inserts.union(updates);
         }
       }
-      log.info("Repartitioning records");
-      // persist this since we will make multiple passes over this
-      adjustedRDD = adjustedRDD.repartition(jsc.defaultParallelism());
-      log.info("Repartitioning records done");
-      Converter converter = new UpdateConverter(schemaStr, config.getRecordSize(),
-          partitionPathFieldNames, recordRowKeyFieldNames);
-      JavaRDD<GenericRecord> updates = converter.convert(adjustedRDD);
-      log.info("Records converted");
-      updates.persist(StorageLevel.DISK_ONLY());
-      return inserts != null ? inserts.union(updates) : updates;
+      return inserts;
       // TODO : Generate updates for only N partitions.
     } else {
       throw new IllegalArgumentException("Other formats are not supported at the moment");
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/GenericRecordFullPayloadGenerator.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/GenericRecordFullPayloadGenerator.java
index cdb2196..df9a449 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/GenericRecordFullPayloadGenerator.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/GenericRecordFullPayloadGenerator.java
@@ -44,25 +44,22 @@ import org.slf4j.LoggerFactory;
  * Every field of a generic record created using this generator contains a random value.
  */
 public class GenericRecordFullPayloadGenerator implements Serializable {
+  private static Logger LOG = LoggerFactory.getLogger(GenericRecordFullPayloadGenerator.class);
 
   public static final int DEFAULT_PAYLOAD_SIZE = 1024 * 10; // 10 KB
   public static final int DEFAULT_NUM_DATE_PARTITIONS = 50;
-  private static Logger log = LoggerFactory.getLogger(GenericRecordFullPayloadGenerator.class);
   protected final Random random = new Random();
   // The source schema used to generate a payload
   private final transient Schema baseSchema;
   // Used to validate a generic record
   private final transient GenericData genericData = new GenericData();
-  // Number of more bytes to add based on the estimated full record payload size and min payload size
-  private int numberOfBytesToAdd;
-  // If more elements should be packed to meet the minPayloadSize
-  private boolean shouldAddMore;
-  // How many complex fields have we visited that can help us pack more entries and increase the size of the record
-  private int numberOfComplexFields;
-  // The size of a full record where every field of a generic record created contains 1 random value
-  private int estimatedFullPayloadSize;
   // The number of unique dates to create
   private int numDatePartitions = DEFAULT_NUM_DATE_PARTITIONS;
+  // The size of a full record where every field of a generic record created contains 1 random value
+  private final int estimatedFullPayloadSize;
+  // Number of extra entries to add in a complex/collection field to achieve the desired record size
+  Map<String, Integer> extraEntriesMap = new HashMap<>();
+
   // LogicalTypes in Avro 1.8.2
   private static final String DECIMAL = "decimal";
   private static final String UUID_NAME = "uuid";
@@ -80,17 +77,18 @@ public class GenericRecordFullPayloadGenerator implements Serializable {
     Pair<Integer, Integer> sizeInfo = new GenericRecordFullPayloadSizeEstimator(schema)
         .typeEstimateAndNumComplexFields();
     this.estimatedFullPayloadSize = sizeInfo.getLeft();
-    this.numberOfComplexFields = sizeInfo.getRight();
     this.baseSchema = schema;
-    this.shouldAddMore = estimatedFullPayloadSize < minPayloadSize;
-    if (this.shouldAddMore) {
-      this.numberOfBytesToAdd = minPayloadSize - estimatedFullPayloadSize;
-      if (numberOfComplexFields < 1) {
-        log.warn("The schema does not have any collections/complex fields. Cannot achieve minPayloadSize : {}",
-            minPayloadSize);
-      }
+    if (estimatedFullPayloadSize < minPayloadSize) {
+      int numberOfComplexFields = sizeInfo.getRight();
+      if (numberOfComplexFields < 1) {
+        LOG.warn("The schema does not have any collections/complex fields. "
+            + "Cannot achieve minPayloadSize => " + minPayloadSize);
+      }
+
+      determineExtraEntriesRequired(numberOfComplexFields, minPayloadSize - estimatedFullPayloadSize);
     }
   }
+
   public GenericRecordFullPayloadGenerator(Schema schema, int minPayloadSize, int numDatePartitions) {
     this(schema, minPayloadSize);
     this.numDatePartitions = numDatePartitions;
@@ -113,7 +111,11 @@ public class GenericRecordFullPayloadGenerator implements Serializable {
    * @return {@link GenericRecord} with random value
    */
   public GenericRecord getNewPayload() {
-    return convert(baseSchema);
+    return getNewPayload(baseSchema);
+  }
+
+  protected GenericRecord getNewPayload(Schema schema) {
+    return randomize(new GenericData.Record(schema), null);
   }
 
   /**
@@ -128,20 +130,6 @@ public class GenericRecordFullPayloadGenerator implements Serializable {
   }
 
   /**
-   * Create a {@link GenericRecord} with random value according to given schema.
-   *
-   * @param schema Schema to create record with
-   * @return {@link GenericRecord} with random value
-   */
-  protected GenericRecord convert(Schema schema) {
-    GenericRecord result = new GenericData.Record(schema);
-    for (Schema.Field f : schema.getFields()) {
-      result.put(f.name(), typeConvert(f.schema()));
-    }
-    return result;
-  }
-
-  /**
    * Create a new {@link GenericRecord} with random values. Not all the fields have value, it is random, and its value
    * is random too.
    *
@@ -153,7 +141,7 @@ public class GenericRecordFullPayloadGenerator implements Serializable {
     for (Schema.Field f : schema.getFields()) {
       boolean setNull = random.nextBoolean();
       if (!setNull) {
-        result.put(f.name(), typeConvert(f.schema()));
+        result.put(f.name(), typeConvert(f));
       } else {
         result.put(f.name(), null);
       }
@@ -173,7 +161,7 @@ public class GenericRecordFullPayloadGenerator implements Serializable {
   protected GenericRecord randomize(GenericRecord record, List<String> blacklistFields) {
     for (Schema.Field f : record.getSchema().getFields()) {
       if (blacklistFields == null || !blacklistFields.contains(f.name())) {
-        record.put(f.name(), typeConvert(f.schema()));
+        record.put(f.name(), typeConvert(f));
       }
     }
     return record;
@@ -188,12 +176,12 @@ public class GenericRecordFullPayloadGenerator implements Serializable {
   /**
    * Generate random value according to their type.
    */
-  private Object typeConvert(Schema schema) {
-    Schema localSchema = schema;
-    if (isOption(schema)) {
-      localSchema = getNonNull(schema);
+  private Object typeConvert(Schema.Field field) {
+    Schema fieldSchema = field.schema();
+    if (isOption(fieldSchema)) {
+      fieldSchema = getNonNull(fieldSchema);
     }
-    switch (localSchema.getType()) {
+    switch (fieldSchema.getType()) {
       case BOOLEAN:
         return random.nextBoolean();
       case DOUBLE:
@@ -205,45 +193,35 @@ public class GenericRecordFullPayloadGenerator implements Serializable {
       case LONG:
         return getNextConstrainedLong();
       case STRING:
-        return UUID.randomUUID().toString();
+       return UUID.randomUUID().toString();
       case ENUM:
-        List<String> enumSymbols = localSchema.getEnumSymbols();
-        return new GenericData.EnumSymbol(localSchema, enumSymbols.get(random.nextInt(enumSymbols.size() - 1)));
+        List<String> enumSymbols = fieldSchema.getEnumSymbols();
+        return new GenericData.EnumSymbol(fieldSchema, enumSymbols.get(random.nextInt(enumSymbols.size() - 1)));
       case RECORD:
-        return convert(localSchema);
+        return getNewPayload(fieldSchema);
       case ARRAY:
-        Schema elementSchema = localSchema.getElementType();
+        Schema.Field elementField = new Schema.Field(field.name(), fieldSchema.getElementType(), "", null);
         List listRes = new ArrayList();
-        if (isPrimitive(elementSchema) && this.shouldAddMore) {
-          int numEntriesToAdd = numEntriesToAdd(elementSchema);
-          while (numEntriesToAdd > 0) {
-            listRes.add(typeConvert(elementSchema));
-            numEntriesToAdd--;
-          }
-        } else {
-          listRes.add(typeConvert(elementSchema));
+        int numEntriesToAdd = extraEntriesMap.getOrDefault(field.name(), 1);
+        while (numEntriesToAdd-- > 0) {
+          listRes.add(typeConvert(elementField));
         }
         return listRes;
       case MAP:
-        Schema valueSchema = localSchema.getValueType();
+        Schema.Field valueField = new Schema.Field(field.name(), fieldSchema.getValueType(), "", null);
         Map<String, Object> mapRes = new HashMap<String, Object>();
-        if (isPrimitive(valueSchema) && this.shouldAddMore) {
-          int numEntriesToAdd = numEntriesToAdd(valueSchema);
-          while (numEntriesToAdd > 0) {
-            mapRes.put(UUID.randomUUID().toString(), typeConvert(valueSchema));
-            numEntriesToAdd--;
-          }
-        } else {
-          mapRes.put(UUID.randomUUID().toString(), typeConvert(valueSchema));
+        numEntriesToAdd = extraEntriesMap.getOrDefault(field.name(), 1);
+        while (numEntriesToAdd > 0) {
+          mapRes.put(UUID.randomUUID().toString(), typeConvert(valueField));
+          numEntriesToAdd--;
         }
         return mapRes;
       case BYTES:
         return ByteBuffer.wrap(UUID.randomUUID().toString().getBytes(Charset.defaultCharset()));
       case FIXED:
-        return generateFixedType(localSchema);
+        return generateFixedType(fieldSchema);
       default:
-        throw new IllegalArgumentException(
-            "Cannot handle type: " + localSchema.getType());
+        throw new IllegalArgumentException("Cannot handle type: " + fieldSchema.getType());
     }
   }
 
@@ -333,23 +311,37 @@ public class GenericRecordFullPayloadGenerator implements Serializable {
    * @param elementSchema
    * @return Number of entries to add
    */
-  private int numEntriesToAdd(Schema elementSchema) {
-    // Find the size of the primitive data type in bytes
-    int primitiveDataTypeSize = getSize(elementSchema);
-    int numEntriesToAdd = numberOfBytesToAdd / primitiveDataTypeSize;
-    // If more than 10 entries are being added for this same complex field and there are still more complex fields to
-    // be visited in the schema, reduce the number of entries to add by a factor of 10 to allow for other complex
-    // fields to pack some entries
-    if (numEntriesToAdd % 10 > 0 && this.numberOfComplexFields > 1) {
-      numEntriesToAdd = numEntriesToAdd / 10;
-      numberOfBytesToAdd -= numEntriesToAdd * primitiveDataTypeSize;
-      this.shouldAddMore = true;
-    } else {
-      this.numberOfBytesToAdd = 0;
-      this.shouldAddMore = false;
+  private void determineExtraEntriesRequired(int numberOfComplexFields, int numberOfBytesToAdd) {
+    for (Schema.Field f : baseSchema.getFields()) {
+      Schema elementSchema = f.schema();
+      // Find the size of the primitive data type in bytes
+      int primitiveDataTypeSize = 0;
+      if (elementSchema.getType() == Type.ARRAY && isPrimitive(elementSchema.getElementType())) {
+        primitiveDataTypeSize = getSize(elementSchema.getElementType());
+      } else if (elementSchema.getType() == Type.MAP && isPrimitive(elementSchema.getValueType())) {
+        primitiveDataTypeSize = getSize(elementSchema.getValueType());
+      } else {
+        continue;
+      }
+
+      int numEntriesToAdd = numberOfBytesToAdd / primitiveDataTypeSize;
+      // If more than 10 entries would be added to this complex field and there are still more complex fields to
+      // be visited in the schema, cap this field at 10 entries so that the remaining complex fields can also
+      // pack some entries
+      if (numEntriesToAdd > 10 && numberOfComplexFields > 1) {
+        numEntriesToAdd = 10;
+        numberOfBytesToAdd -= numEntriesToAdd * primitiveDataTypeSize;
+      } else {
+        numberOfBytesToAdd = 0;
+      }
+
+      extraEntriesMap.put(f.name(), numEntriesToAdd);
+
+      numberOfComplexFields -= 1;
+      if (numberOfBytesToAdd <= 0) {
+        break;
+      }
     }
-    this.numberOfComplexFields -= 1;
-    return numEntriesToAdd;
   }
 }
 
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/GenericRecordPartialPayloadGenerator.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/GenericRecordPartialPayloadGenerator.java
index f7e4174..999f83d 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/GenericRecordPartialPayloadGenerator.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/GenericRecordPartialPayloadGenerator.java
@@ -38,7 +38,7 @@ public class GenericRecordPartialPayloadGenerator extends GenericRecordFullPaylo
   }
 
   @Override
-  protected GenericRecord convert(Schema schema) {
+  protected GenericRecord getNewPayload(Schema schema) {
     GenericRecord record = super.convertPartial(schema);
     return record;
   }
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
index 209aa46..e209118 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
@@ -132,7 +132,7 @@ public class DFSHoodieDatasetInputReader extends DFSDeltaInputReader {
       Option<Long> numRecordsToUpdate, Option<Double> percentageRecordsPerFile) throws IOException {
     log.info("NumPartitions : {}, NumFiles : {}, numRecordsToUpdate : {}, percentageRecordsPerFile : {}",
         numPartitions, numFiles, numRecordsToUpdate, percentageRecordsPerFile);
-    List<String> partitionPaths = getPartitions(numPartitions);
+    final List<String> partitionPaths = getPartitions(numPartitions);
     // Read all file slices in the partition
     JavaPairRDD<String, Iterator<FileSlice>> partitionToFileSlice = getPartitionToFileSlice(metaClient,
         partitionPaths);
@@ -156,7 +156,7 @@ public class DFSHoodieDatasetInputReader extends DFSDeltaInputReader {
     }
     // Adjust the number of files to read per partition based on the requested partition & file counts
     Map<String, Integer> adjustedPartitionToFileIdCountMap = getFilesToReadPerPartition(partitionToFileSlice,
-        getPartitions(numPartitions).size(), numFilesToUpdate);
+        partitionPaths.size(), numFilesToUpdate);
     JavaRDD<GenericRecord> updates = projectSchema(generateUpdates(adjustedPartitionToFileIdCountMap,
         partitionToFileSlice, numFilesToUpdate, (int) numRecordsToUpdatePerFile));
     if (numRecordsToUpdate.isPresent() && numFiles.isPresent() && numFiles.get() != 0 && numRecordsToUpdate.get()
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index 5a1756c..a268c7b 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -219,7 +219,7 @@ public class DeltaSync implements Serializable {
    *
    * @throws IOException in case of any IOException
    */
-  private void refreshTimeline() throws IOException {
+  public void refreshTimeline() throws IOException {
     if (fs.exists(new Path(cfg.targetBasePath))) {
       HoodieTableMetaClient meta = new HoodieTableMetaClient(new Configuration(fs.getConf()), cfg.targetBasePath,
           cfg.payloadClassName);
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DFSPathSelector.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DFSPathSelector.java
index 5d56f2a..c8690f5 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DFSPathSelector.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DFSPathSelector.java
@@ -78,6 +78,7 @@ public class DFSPathSelector {
       while (fitr.hasNext()) {
         LocatedFileStatus fileStatus = fitr.next();
         if (fileStatus.isDirectory()
+            || fileStatus.getLen() == 0
             || IGNORE_FILEPREFIX_LIST.stream().anyMatch(pfx -> fileStatus.getPath().getName().startsWith(pfx))) {
           continue;
         }