Posted to commits@hudi.apache.org by na...@apache.org on 2020/10/29 13:50:52 UTC

[hudi] branch master updated: [HUDI-1351] Improvements to the hudi test suite for scalability and repeated testing. (#2197)

This is an automated email from the ASF dual-hosted git repository.

nagarwal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 6310a23  [HUDI-1351] Improvements to the hudi test suite for scalability and repeated testing. (#2197)
6310a23 is described below

commit 6310a2307abba94c7ff8a770f45462deae2c312e
Author: Prashant Wason <pw...@uber.com>
AuthorDate: Thu Oct 29 06:50:37 2020 -0700

    [HUDI-1351] Improvements to the hudi test suite for scalability and repeated testing. (#2197)
    
    1. Added the --clean-input and --clean-output parameters to clean the input and output directories before starting the job.
    2. Added the --delete-old-input parameter to delete older input batches once their data has been ingested. This helps keep the number of redundant files low.
    3. Added the --input-parallelism parameter to restrict the parallelism used when generating input data. This helps keep the number of generated input files low.
    4. Added a start_offset option to DAG nodes. Without the ability to specify a start offset, data is generated into existing partitions; with it, the DAG can control which partitions the data is written to (see the builder sketch after this list).
    5. Fixed generation of records for the correct number of partitions.
      - In the existing implementation, the partition is chosen as a random long, which does not guarantee that exactly the requested number of partitions is created.
    6. Changed the blacklistedFields variable to a Set, which is faster than a List for membership checks.
    7. Fixed integer division passed to Math.ceil. When two integers are divided, the result is not a double unless one of the operands is cast to double (see the arithmetic example after this list).
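
    For illustration, a minimal sketch of how a DAG node could use the new start-partition option through
    the DeltaConfig.Config builder. withStartPartition(...) and withNumTimesToRepeat(...) come from the
    diff below; the Builder's exact location and the build() call are assumptions.

        import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config;

        public class StartPartitionExample {
          public static Config nodeConfig() {
            // Records generated under this config land in partition indexes 10, 11, ...
            // instead of being spread over already-existing partitions.
            return new Config.Builder()
                .withStartPartition(10)    // option added by this commit
                .withNumTimesToRepeat(1)
                .build();                  // assumed to exist; not shown in this diff
          }
        }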
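
    Also for illustration, the arithmetic behind item 7: with two integer operands the division truncates
    before Math.ceil sees a fractional value, while casting one operand to double preserves it. The variable
    names mirror DFSHoodieDatasetInputReader; the class itself is a throwaway example.

        public class IntegerCeilExample {
          public static void main(String[] args) {
            long numRecordsToUpdate = 250;
            long recordsInSingleFile = 100;
            // Integer division truncates first: 250 / 100 == 2, so ceil(2.0) == 2.
            int beforeFix = (int) Math.ceil(numRecordsToUpdate / recordsInSingleFile);
            // Casting keeps the fraction: 250.0 / 100 == 2.5, so ceil(2.5) == 3.
            int afterFix = (int) Math.ceil((double) numRecordsToUpdate / recordsInSingleFile);
            System.out.println(beforeFix + " vs " + afterFix); // prints "2 vs 3"
          }
        }
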
---
 .../hudi/integ/testsuite/HoodieTestSuiteJob.java   | 29 +++++++++
 .../testsuite/configuration/DFSDeltaConfig.java    | 17 +++++-
 .../integ/testsuite/configuration/DeltaConfig.java | 12 +++-
 .../hudi/integ/testsuite/dag/WriterContext.java    |  3 +-
 .../integ/testsuite/generator/DeltaGenerator.java  | 48 ++++++++++++---
 .../FlexibleSchemaRecordGenerationIterator.java    | 15 +++--
 .../GenericRecordFullPayloadGenerator.java         | 68 +++++++++++++++++-----
 .../generator/UpdateGeneratorIterator.java         |  8 ++-
 .../reader/DFSHoodieDatasetInputReader.java        | 26 ++++++---
 .../TestDFSHoodieTestSuiteWriterAdapter.java       |  2 +-
 .../TestGenericRecordPayloadGenerator.java         |  5 +-
 11 files changed, 187 insertions(+), 46 deletions(-)

diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java
index c2c242a..7b3324e 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java
@@ -96,6 +96,20 @@ public class HoodieTestSuiteJob {
       HoodieTableMetaClient.initTableType(jsc.hadoopConfiguration(), cfg.targetBasePath,
           HoodieTableType.valueOf(cfg.tableType), cfg.targetTableName, "archived");
     }
+
+    if (cfg.cleanInput) {
+      Path inputPath = new Path(cfg.inputBasePath);
+      if (fs.exists(inputPath)) {
+        fs.delete(inputPath, true);
+      }
+    }
+
+    if (cfg.cleanOutput) {
+      Path outputPath = new Path(cfg.targetBasePath);
+      if (fs.exists(outputPath)) {
+        fs.delete(outputPath, true);
+      }
+    }
   }
 
   private static HiveConf getDefaultHiveConf(Configuration cfg) {
@@ -175,9 +189,24 @@ public class HoodieTestSuiteJob {
         required = true)
     public Long limitFileSize = 1024 * 1024 * 120L;
 
+    @Parameter(names = {"--input-parallelism"}, description = "Parallelism to use when generation input files",
+        required = false)
+    public Integer inputParallelism = 0;
+
+    @Parameter(names = {"--delete-old-input"}, description = "Delete older input files once they have been ingested",
+        required = false)
+    public Boolean deleteOldInput = false;
+
     @Parameter(names = {"--use-deltastreamer"}, description = "Choose whether to use HoodieDeltaStreamer to "
         + "perform ingestion. If set to false, HoodieWriteClient will be used")
     public Boolean useDeltaStreamer = false;
 
+    @Parameter(names = {"--clean-input"}, description = "Clean the input folders and delete all files within it "
+        + "before starting the Job")
+    public Boolean cleanInput = false;
+
+    @Parameter(names = {"--clean-output"}, description = "Clean the output folders and delete all files within it "
+        + "before starting the Job")
+    public Boolean cleanOutput = false;
   }
 }
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DFSDeltaConfig.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DFSDeltaConfig.java
index 2915628..0ac3668 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DFSDeltaConfig.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DFSDeltaConfig.java
@@ -36,15 +36,22 @@ public class DFSDeltaConfig extends DeltaConfig {
   private final Long maxFileSize;
   // The current batch id
   private Integer batchId;
+  // Parallelism to use when generating input data
+  private int inputParallelism;
+  // Whether to delete older input data once it has been ingested
+  private boolean deleteOldInputData;
 
   public DFSDeltaConfig(DeltaOutputMode deltaOutputMode, DeltaInputType deltaInputType,
                         SerializableConfiguration configuration,
-                        String deltaBasePath, String targetBasePath, String schemaStr, Long maxFileSize) {
+                        String deltaBasePath, String targetBasePath, String schemaStr, Long maxFileSize,
+                        int inputParallelism, boolean deleteOldInputData) {
     super(deltaOutputMode, deltaInputType, configuration);
     this.deltaBasePath = deltaBasePath;
     this.schemaStr = schemaStr;
     this.maxFileSize = maxFileSize;
     this.datasetOutputPath = targetBasePath;
+    this.inputParallelism = inputParallelism;
+    this.deleteOldInputData = deleteOldInputData;
   }
 
   public String getDeltaBasePath() {
@@ -70,4 +77,12 @@ public class DFSDeltaConfig extends DeltaConfig {
   public void setBatchId(Integer batchId) {
     this.batchId = batchId;
   }
+
+  public int getInputParallelism() {
+    return inputParallelism;
+  }
+
+  public boolean shouldDeleteOldInputData() {
+    return deleteOldInputData;
+  }
 }
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java
index 7a66681..db15604 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java
@@ -83,6 +83,7 @@ public class DeltaConfig implements Serializable {
     private static String DISABLE_INGEST = "disable_ingest";
     private static String HIVE_LOCAL = "hive_local";
     private static String REINIT_CONTEXT = "reinitialize_context";
+    private static String START_PARTITION = "start_partition";
 
     private Map<String, Object> configsMap;
 
@@ -118,8 +119,12 @@ public class DeltaConfig implements Serializable {
       return Integer.valueOf(configsMap.getOrDefault(NUM_PARTITIONS_UPSERT, 0).toString());
     }
 
+    public int getStartPartition() {
+      return Integer.valueOf(configsMap.getOrDefault(START_PARTITION, 0).toString());
+    }
+
     public int getNumUpsertFiles() {
-      return Integer.valueOf(configsMap.getOrDefault(NUM_FILES_UPSERT, 1).toString());
+      return Integer.valueOf(configsMap.getOrDefault(NUM_FILES_UPSERT, 0).toString());
     }
 
     public double getFractionUpsertPerFile() {
@@ -207,6 +212,11 @@ public class DeltaConfig implements Serializable {
         return this;
       }
 
+      public Builder withStartPartition(int startPartition) {
+        this.configsMap.put(START_PARTITION, startPartition);
+        return this;
+      }
+
       public Builder withNumTimesToRepeat(int repeatCount) {
         this.configsMap.put(REPEAT_COUNT, repeatCount);
         return this;
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/WriterContext.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/WriterContext.java
index 21a84db..e457f0a 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/WriterContext.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/WriterContext.java
@@ -67,10 +67,11 @@ public class WriterContext {
       this.schemaProvider = UtilHelpers.createSchemaProvider(cfg.schemaProviderClassName, props, jsc);
       String schemaStr = schemaProvider.getSourceSchema().toString();
       this.hoodieTestSuiteWriter = new HoodieTestSuiteWriter(jsc, props, cfg, schemaStr);
+      int inputParallelism = cfg.inputParallelism > 0 ? cfg.inputParallelism : jsc.defaultParallelism();
       this.deltaGenerator = new DeltaGenerator(
           new DFSDeltaConfig(DeltaOutputMode.valueOf(cfg.outputTypeName), DeltaInputType.valueOf(cfg.inputFormatName),
               new SerializableConfiguration(jsc.hadoopConfiguration()), cfg.inputBasePath, cfg.targetBasePath,
-              schemaStr, cfg.limitFileSize),
+              schemaStr, cfg.limitFileSize, inputParallelism, cfg.deleteOldInput),
           jsc, sparkSession, schemaStr, keyGenerator);
       log.info(String.format("Initialized writerContext with: %s", schemaStr));
     } catch (Exception e) {
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/DeltaGenerator.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/DeltaGenerator.java
index 8dc7f4b..dc991b1 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/DeltaGenerator.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/DeltaGenerator.java
@@ -28,9 +28,17 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 import java.util.stream.StreamSupport;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.integ.testsuite.converter.UpdateConverter;
 import org.apache.hudi.integ.testsuite.reader.DFSAvroDeltaInputReader;
 import org.apache.hudi.integ.testsuite.reader.DFSHoodieDatasetInputReader;
@@ -41,7 +49,6 @@ import org.apache.hudi.integ.testsuite.writer.DeltaWriterAdapter;
 import org.apache.hudi.integ.testsuite.writer.DeltaWriterFactory;
 import org.apache.hudi.keygen.BuiltinKeyGenerator;
 import org.apache.hudi.integ.testsuite.configuration.DFSDeltaConfig;
-import org.apache.hudi.integ.testsuite.configuration.DeltaConfig;
 import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -58,7 +65,7 @@ public class DeltaGenerator implements Serializable {
 
   private static Logger log = LoggerFactory.getLogger(DeltaGenerator.class);
 
-  private DeltaConfig deltaOutputConfig;
+  private DFSDeltaConfig deltaOutputConfig;
   private transient JavaSparkContext jsc;
   private transient SparkSession sparkSession;
   private String schemaStr;
@@ -66,7 +73,7 @@ public class DeltaGenerator implements Serializable {
   private List<String> partitionPathFieldNames;
   private int batchId;
 
-  public DeltaGenerator(DeltaConfig deltaOutputConfig, JavaSparkContext jsc, SparkSession sparkSession,
+  public DeltaGenerator(DFSDeltaConfig deltaOutputConfig, JavaSparkContext jsc, SparkSession sparkSession,
                         String schemaStr, BuiltinKeyGenerator keyGenerator) {
     this.deltaOutputConfig = deltaOutputConfig;
     this.jsc = jsc;
@@ -77,6 +84,16 @@ public class DeltaGenerator implements Serializable {
   }
 
   public JavaRDD<DeltaWriteStats> writeRecords(JavaRDD<GenericRecord> records) {
+    if (deltaOutputConfig.shouldDeleteOldInputData() && batchId > 1) {
+      Path oldInputDir = new Path(deltaOutputConfig.getDeltaBasePath(), Integer.toString(batchId - 1));
+      try {
+        FileSystem fs = FSUtils.getFs(oldInputDir.toString(), deltaOutputConfig.getConfiguration());
+        fs.delete(oldInputDir, true);
+      } catch (IOException e) {
+        log.error("Failed to delete older input data direcory " + oldInputDir, e);
+      }
+    }
+
     // The following creates a new anonymous function for iterator and hence results in serialization issues
     JavaRDD<DeltaWriteStats> ws = records.mapPartitions(itr -> {
       try {
@@ -95,11 +112,22 @@ public class DeltaGenerator implements Serializable {
     int numPartitions = operation.getNumInsertPartitions();
     long recordsPerPartition = operation.getNumRecordsInsert() / numPartitions;
     int minPayloadSize = operation.getRecordSize();
-    JavaRDD<GenericRecord> inputBatch = jsc.parallelize(Collections.EMPTY_LIST)
-        .repartition(numPartitions).mapPartitions(p -> {
+    int startPartition = operation.getStartPartition();
+
+    // Each spark partition below will generate records for a single partition given by the integer index.
+    List<Integer> partitionIndexes = IntStream.rangeClosed(0 + startPartition, numPartitions + startPartition)
+        .boxed().collect(Collectors.toList());
+
+    JavaRDD<GenericRecord> inputBatch = jsc.parallelize(partitionIndexes, numPartitions)
+        .mapPartitionsWithIndex((index, p) -> {
           return new LazyRecordGeneratorIterator(new FlexibleSchemaRecordGenerationIterator(recordsPerPartition,
-            minPayloadSize, schemaStr, partitionPathFieldNames, numPartitions));
-        });
+            minPayloadSize, schemaStr, partitionPathFieldNames, (Integer)index));
+        }, true);
+
+    if (deltaOutputConfig.getInputParallelism() < numPartitions) {
+      inputBatch = inputBatch.coalesce(deltaOutputConfig.getInputParallelism());
+    }
+
     return inputBatch;
   }
 
@@ -131,9 +159,11 @@ public class DeltaGenerator implements Serializable {
           }
         }
 
-        log.info("Repartitioning records");
         // persist this since we will make multiple passes over this
-        adjustedRDD = adjustedRDD.repartition(jsc.defaultParallelism());
+        int numPartition = Math.min(deltaOutputConfig.getInputParallelism(),
+            Math.max(1, config.getNumUpsertPartitions()));
+        log.info("Repartitioning records into " + numPartition + " partitions");
+        adjustedRDD = adjustedRDD.repartition(numPartition);
         log.info("Repartitioning records done");
         UpdateConverter converter = new UpdateConverter(schemaStr, config.getRecordSize(),
             partitionPathFieldNames, recordRowKeyFieldNames);
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/FlexibleSchemaRecordGenerationIterator.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/FlexibleSchemaRecordGenerationIterator.java
index 512118f..270dcd1 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/FlexibleSchemaRecordGenerationIterator.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/FlexibleSchemaRecordGenerationIterator.java
@@ -18,8 +18,11 @@
 
 package org.apache.hudi.integ.testsuite.generator;
 
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Set;
+
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 
@@ -37,18 +40,18 @@ public class FlexibleSchemaRecordGenerationIterator implements Iterator<GenericR
   // Store last record for the partition path of the first payload to be used for all subsequent generated payloads
   private GenericRecord lastRecord;
   // Partition path field name
-  private List<String> partitionPathFieldNames;
+  private Set<String> partitionPathFieldNames;
 
   public FlexibleSchemaRecordGenerationIterator(long maxEntriesToProduce, String schema) {
-    this(maxEntriesToProduce, GenericRecordFullPayloadGenerator.DEFAULT_PAYLOAD_SIZE, schema, null, GenericRecordFullPayloadGenerator.DEFAULT_NUM_DATE_PARTITIONS);
+    this(maxEntriesToProduce, GenericRecordFullPayloadGenerator.DEFAULT_PAYLOAD_SIZE, schema, null, 0);
   }
 
   public FlexibleSchemaRecordGenerationIterator(long maxEntriesToProduce, int minPayloadSize, String schemaStr,
-      List<String> partitionPathFieldNames, int numPartitions) {
+      List<String> partitionPathFieldNames, int partitionIndex) {
     this.counter = maxEntriesToProduce;
-    this.partitionPathFieldNames = partitionPathFieldNames;
+    this.partitionPathFieldNames = new HashSet<>(partitionPathFieldNames);
     Schema schema = new Schema.Parser().parse(schemaStr);
-    this.generator = new GenericRecordFullPayloadGenerator(schema, minPayloadSize, numPartitions);
+    this.generator = new GenericRecordFullPayloadGenerator(schema, minPayloadSize, partitionIndex);
   }
 
   @Override
@@ -60,7 +63,7 @@ public class FlexibleSchemaRecordGenerationIterator implements Iterator<GenericR
   public GenericRecord next() {
     this.counter--;
     if (lastRecord == null) {
-      GenericRecord record = this.generator.getNewPayload();
+      GenericRecord record = this.generator.getNewPayload(partitionPathFieldNames);
       lastRecord = record;
       return record;
     } else {
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/GenericRecordFullPayloadGenerator.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/GenericRecordFullPayloadGenerator.java
index df9a449..f61fad6 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/GenericRecordFullPayloadGenerator.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/GenericRecordFullPayloadGenerator.java
@@ -22,10 +22,12 @@ import java.io.Serializable;
 import java.nio.ByteBuffer;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
@@ -47,14 +49,13 @@ public class GenericRecordFullPayloadGenerator implements Serializable {
   private static Logger LOG = LoggerFactory.getLogger(GenericRecordFullPayloadGenerator.class);
 
   public static final int DEFAULT_PAYLOAD_SIZE = 1024 * 10; // 10 KB
-  public static final int DEFAULT_NUM_DATE_PARTITIONS = 50;
   protected final Random random = new Random();
   // The source schema used to generate a payload
   private final transient Schema baseSchema;
   // Used to validate a generic record
   private final transient GenericData genericData = new GenericData();
-  // The number of unique dates to create
-  private int numDatePartitions = DEFAULT_NUM_DATE_PARTITIONS;
+  // The index of partition for which records are being generated
+  private int partitionIndex = 0;
   // The size of a full record where every field of a generic record created contains 1 random value
   private final int estimatedFullPayloadSize;
   // Number of extra entries to add in a complex/collection field to achieve the desired record size
@@ -89,9 +90,9 @@ public class GenericRecordFullPayloadGenerator implements Serializable {
     }
   }
 
-  public GenericRecordFullPayloadGenerator(Schema schema, int minPayloadSize, int numDatePartitions) {
+  public GenericRecordFullPayloadGenerator(Schema schema, int minPayloadSize, int partitionIndex) {
     this(schema, minPayloadSize);
-    this.numDatePartitions = numDatePartitions;
+    this.partitionIndex = partitionIndex;
   }
 
   protected static boolean isPrimitive(Schema localSchema) {
@@ -115,7 +116,50 @@ public class GenericRecordFullPayloadGenerator implements Serializable {
   }
 
   protected GenericRecord getNewPayload(Schema schema) {
-    return randomize(new GenericData.Record(schema), null);
+    return create(schema, null);
+  }
+
+  /**
+   * Create a new {@link GenericRecord} with random value according to given schema.
+   *
+   * Long fields which are specified within partitionPathFieldNames are constrained to the value of the partition
+   * for which records are being generated.
+   *
+   * @return {@link GenericRecord} with random value
+   */
+  public GenericRecord getNewPayload(Set<String> partitionPathFieldNames) {
+    return create(baseSchema, partitionPathFieldNames);
+  }
+
+  protected GenericRecord create(Schema schema, Set<String> partitionPathFieldNames) {
+    GenericRecord result = new GenericData.Record(schema);
+    for (Schema.Field f : schema.getFields()) {
+      if (isPartialLongField(f, partitionPathFieldNames)) {
+        // This is a long field used as partition field. Set it to seconds since epoch.
+        long value = TimeUnit.SECONDS.convert(partitionIndex, TimeUnit.DAYS);
+        result.put(f.name(), (long)value);
+      } else {
+        result.put(f.name(), typeConvert(f));
+      }
+    }
+    return result;
+  }
+
+  /**
+   * Return true if this is a partition field of type long which should be set to the partition index.
+   * @return
+   */
+  private boolean isPartialLongField(Schema.Field field, Set<String> partitionPathFieldNames) {
+    if ((partitionPathFieldNames == null) || !partitionPathFieldNames.contains(field.name())) {
+      return false;
+    }
+
+    Schema fieldSchema = field.schema();
+    if (isOption(fieldSchema)) {
+      fieldSchema = getNonNull(fieldSchema);
+    }
+
+    return fieldSchema.getType() == org.apache.avro.Schema.Type.LONG;
   }
 
   /**
@@ -125,7 +169,7 @@ public class GenericRecordFullPayloadGenerator implements Serializable {
    * @param blacklistFields Fields whose value should not be touched
    * @return The updated {@link GenericRecord}
    */
-  public GenericRecord getUpdatePayload(GenericRecord record, List<String> blacklistFields) {
+  public GenericRecord getUpdatePayload(GenericRecord record, Set<String> blacklistFields) {
     return randomize(record, blacklistFields);
   }
 
@@ -158,7 +202,7 @@ public class GenericRecordFullPayloadGenerator implements Serializable {
    * @param blacklistFields fields whose values will not be randomized.
    * @return Randomized GenericRecord.
    */
-  protected GenericRecord randomize(GenericRecord record, List<String> blacklistFields) {
+  protected GenericRecord randomize(GenericRecord record, Set<String> blacklistFields) {
     for (Schema.Field f : record.getSchema().getFields()) {
       if (blacklistFields == null || !blacklistFields.contains(f.name())) {
         record.put(f.name(), typeConvert(f));
@@ -167,12 +211,6 @@ public class GenericRecordFullPayloadGenerator implements Serializable {
     return record;
   }
 
-  private long getNextConstrainedLong() {
-    int numPartitions = random.nextInt(numDatePartitions);
-    long unixTimeStamp = TimeUnit.SECONDS.convert(numPartitions, TimeUnit.DAYS);
-    return unixTimeStamp;
-  }
-
   /**
    * Generate random value according to their type.
    */
@@ -191,7 +229,7 @@ public class GenericRecordFullPayloadGenerator implements Serializable {
       case INT:
         return random.nextInt();
       case LONG:
-        return getNextConstrainedLong();
+        return random.nextLong();
       case STRING:
        return UUID.randomUUID().toString();
       case ENUM:
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/UpdateGeneratorIterator.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/UpdateGeneratorIterator.java
index a33ef0c..d9d137a 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/UpdateGeneratorIterator.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/UpdateGeneratorIterator.java
@@ -18,9 +18,11 @@
 
 package org.apache.hudi.integ.testsuite.generator;
 
-import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Set;
+
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 
@@ -31,14 +33,14 @@ public class UpdateGeneratorIterator implements Iterator<GenericRecord> {
 
   // Use the full payload generator as default
   private GenericRecordFullPayloadGenerator generator;
-  private List<String> blackListedFields;
+  private Set<String> blackListedFields;
   // iterator
   private Iterator<GenericRecord> itr;
 
   public UpdateGeneratorIterator(Iterator<GenericRecord> itr, String schemaStr, List<String> partitionPathFieldNames,
       List<String> recordKeyFieldNames, int minPayloadSize) {
     this.itr = itr;
-    this.blackListedFields = new ArrayList<>();
+    this.blackListedFields = new HashSet<>();
     this.blackListedFields.addAll(partitionPathFieldNames);
     this.blackListedFields.addAll(recordKeyFieldNames);
     Schema schema = new Schema.Parser().parse(schemaStr);
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
index e209118..cfe7991 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
@@ -36,6 +36,7 @@ import java.util.stream.StreamSupport;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.fs.FSUtils;
@@ -78,8 +79,10 @@ public class DFSHoodieDatasetInputReader extends DFSDeltaInputReader {
   }
 
   protected List<String> getPartitions(Option<Integer> partitionsLimit) throws IOException {
-    List<String> partitionPaths = FSUtils
-        .getAllPartitionPaths(metaClient.getFs(), metaClient.getBasePath(), false);
+    // Using FSUtils.getFs here instead of metaClient.getFs() since we don't want to count these listStatus
+    // calls in metrics as they are not part of normal HUDI operation.
+    FileSystem fs = FSUtils.getFs(metaClient.getBasePath(), metaClient.getHadoopConf());
+    List<String> partitionPaths = FSUtils.getAllPartitionPaths(fs, metaClient.getBasePath(), false);
     // Sort partition so we can pick last N partitions by default
     Collections.sort(partitionPaths);
     if (!partitionPaths.isEmpty()) {
@@ -136,6 +139,9 @@ public class DFSHoodieDatasetInputReader extends DFSDeltaInputReader {
     // Read all file slices in the partition
     JavaPairRDD<String, Iterator<FileSlice>> partitionToFileSlice = getPartitionToFileSlice(metaClient,
         partitionPaths);
+    Map<String, Integer> partitionToFileIdCountMap = partitionToFileSlice
+        .mapToPair(p -> new Tuple2<>(p._1, iteratorSize(p._2))).collectAsMap();
+
     // TODO : read record count from metadata
     // Read the records in a single file
     long recordsInSingleFile = iteratorSize(readParquetOrLogFiles(getSingleSliceFromRDD(partitionToFileSlice)));
@@ -144,7 +150,11 @@ public class DFSHoodieDatasetInputReader extends DFSDeltaInputReader {
     if (!numFiles.isPresent() || numFiles.get() == 0) {
       // If num files are not passed, find the number of files to update based on total records to update and records
       // per file
-      numFilesToUpdate = (int) (numRecordsToUpdate.get() / recordsInSingleFile);
+      numFilesToUpdate = (int)Math.ceil((double)numRecordsToUpdate.get() / recordsInSingleFile);
+      // recordsInSingleFile is not an average, so we still need to account for bias in the record
+      // distribution across files. Limit to the maximum number of files available.
+      int totalExistingFilesCount = partitionToFileIdCountMap.values().stream().reduce((a, b) -> a + b).get();
+      numFilesToUpdate = Math.min(numFilesToUpdate, totalExistingFilesCount);
       log.info("Files to update {}", numFilesToUpdate);
       numRecordsToUpdatePerFile = recordsInSingleFile;
     } else {
@@ -154,9 +164,10 @@ public class DFSHoodieDatasetInputReader extends DFSDeltaInputReader {
       numRecordsToUpdatePerFile = percentageRecordsPerFile.isPresent() ? (long) (recordsInSingleFile
           * percentageRecordsPerFile.get()) : numRecordsToUpdate.get() / numFilesToUpdate;
     }
+
     // Adjust the number of files to read per partition based on the requested partition & file counts
     Map<String, Integer> adjustedPartitionToFileIdCountMap = getFilesToReadPerPartition(partitionToFileSlice,
-        partitionPaths.size(), numFilesToUpdate);
+        partitionPaths.size(), numFilesToUpdate, partitionToFileIdCountMap);
     JavaRDD<GenericRecord> updates = projectSchema(generateUpdates(adjustedPartitionToFileIdCountMap,
         partitionToFileSlice, numFilesToUpdate, (int) numRecordsToUpdatePerFile));
     if (numRecordsToUpdate.isPresent() && numFiles.isPresent() && numFiles.get() != 0 && numRecordsToUpdate.get()
@@ -190,10 +201,7 @@ public class DFSHoodieDatasetInputReader extends DFSDeltaInputReader {
   }
 
   private Map<String, Integer> getFilesToReadPerPartition(JavaPairRDD<String, Iterator<FileSlice>>
-      partitionToFileSlice, Integer numPartitions, Integer numFiles) {
-    int numFilesPerPartition = (int) Math.ceil(numFiles / numPartitions);
-    Map<String, Integer> partitionToFileIdCountMap = partitionToFileSlice
-        .mapToPair(p -> new Tuple2<>(p._1, iteratorSize(p._2))).collectAsMap();
+      partitionToFileSlice, Integer numPartitions, Integer numFiles, Map<String, Integer> partitionToFileIdCountMap) {
     long totalExistingFilesCount = partitionToFileIdCountMap.values().stream().reduce((a, b) -> a + b).get();
     ValidationUtils.checkArgument(totalExistingFilesCount >= numFiles, "Cannot generate updates "
         + "for more files than present in the dataset, file requested " + numFiles + ", files present "
@@ -204,7 +212,9 @@ public class DFSHoodieDatasetInputReader extends DFSDeltaInputReader {
         .sorted(comparingByValue())
         .collect(toMap(Map.Entry::getKey, Map.Entry::getValue, (e1, e2) -> e2,
             LinkedHashMap::new));
+
     // Limit files to be read per partition
+    int numFilesPerPartition = (int) Math.ceil((double)numFiles / numPartitions);
     Map<String, Integer> adjustedPartitionToFileIdCountMap = new HashMap<>();
     partitionToFileIdCountSortedMap.entrySet().stream().forEach(e -> {
       if (e.getValue() <= numFilesPerPartition) {
diff --git a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/TestDFSHoodieTestSuiteWriterAdapter.java b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/TestDFSHoodieTestSuiteWriterAdapter.java
index ff41b44..ff92bd0 100644
--- a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/TestDFSHoodieTestSuiteWriterAdapter.java
+++ b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/TestDFSHoodieTestSuiteWriterAdapter.java
@@ -125,7 +125,7 @@ public class TestDFSHoodieTestSuiteWriterAdapter extends UtilitiesTestBase {
   public void testDFSWorkloadSinkWithMultipleFilesFunctional() throws IOException {
     DeltaConfig dfsSinkConfig = new DFSDeltaConfig(DeltaOutputMode.DFS, DeltaInputType.AVRO,
         new SerializableConfiguration(jsc.hadoopConfiguration()), dfsBasePath, dfsBasePath,
-        schemaProvider.getSourceSchema().toString(), 10240L);
+        schemaProvider.getSourceSchema().toString(), 10240L, jsc.defaultParallelism(), false);
     DeltaWriterAdapter<GenericRecord> dfsDeltaWriterAdapter = DeltaWriterFactory
         .getDeltaWriterAdapter(dfsSinkConfig, 1);
     FlexibleSchemaRecordGenerationIterator itr = new FlexibleSchemaRecordGenerationIterator(1000,
diff --git a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/generator/TestGenericRecordPayloadGenerator.java b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/generator/TestGenericRecordPayloadGenerator.java
index 7524d4a..9451595 100644
--- a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/generator/TestGenericRecordPayloadGenerator.java
+++ b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/generator/TestGenericRecordPayloadGenerator.java
@@ -24,7 +24,9 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.stream.IntStream;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
@@ -92,7 +94,8 @@ public class TestGenericRecordPayloadGenerator {
       insertRowKeys.add(record.get("_row_key").toString());
       insertTimeStamps.add((Long) record.get("timestamp"));
     });
-    List<String> blacklistFields = Arrays.asList("_row_key");
+    Set<String> blacklistFields = new HashSet<>();
+    blacklistFields.add("_row_key");
     records.stream().forEach(a -> {
       // Generate 10 updated records
       GenericRecord record = payloadGenerator.getUpdatePayload(a, blacklistFields);