You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2020/10/22 06:20:21 UTC

[GitHub] [hudi] prashantwason opened a new pull request #2197: [HUDI-1351] Improvements to the hudi test suite for scalability and repeated testing.

prashantwason opened a new pull request #2197:
URL: https://github.com/apache/hudi/pull/2197


   ## What is the purpose of the pull request
   
   Please see HUDI-1351 for description of the issues that are being fixed here.
   
   ## Brief change log
   
   1. Added the --clean-input and --clean-output parameters to clean the input and output directories before starting the job
   2. Added the --delete-old-input parameter to deleted older batches for data already ingested. This helps keep number of redundant files low.
   3. Added the --input-parallelism parameter to restrict the parallelism when generating input data. This helps keeping the number of generated input files low.
   4. Added an option start_offset to Dag Nodes. Without ability to specify start offsets, data is generated into existing partitions. With start offset, DAG can control on which partition, the data is to be written.
   5. Fixed generation of records for correct number of partitions
     - In the existing implementation, the partition is chosen as a random long. This does not guarantee exact number of requested partitions to be created.
   6. Changed variable blacklistedFields to be a Set as that is faster than List for membership checks.
   7. Fixed integer division for Math.ceil. If two integers are divided, the result is not double unless one of the integer is casted to double.
   
   ## Verify this pull request
   
   This pull request is already covered by existing tests, such as *(please describe tests)*.
   
   ## Committer checklist
   
    - [ ] Has a corresponding JIRA in PR title & commit
    
    - [ ] Commit message is descriptive of the change
    
    - [ ] CI is green
   
    - [ ] Necessary doc changes done or have another open PR
          
    - [ ] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] n3nash commented on a change in pull request #2197: [HUDI-1351] Improvements to the hudi test suite for scalability and repeated testing.

Posted by GitBox <gi...@apache.org>.
n3nash commented on a change in pull request #2197:
URL: https://github.com/apache/hudi/pull/2197#discussion_r511706331



##########
File path: hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/DeltaGenerator.java
##########
@@ -58,15 +63,15 @@
 
   private static Logger log = LoggerFactory.getLogger(DeltaGenerator.class);
 
-  private DeltaConfig deltaOutputConfig;
+  private DFSDeltaConfig deltaOutputConfig;

Review comment:
       What is the purpose behind this change ?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] prashantwason commented on a change in pull request #2197: [HUDI-1351] Improvements to the hudi test suite for scalability and repeated testing.

Posted by GitBox <gi...@apache.org>.
prashantwason commented on a change in pull request #2197:
URL: https://github.com/apache/hudi/pull/2197#discussion_r512988166



##########
File path: hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DFSDeltaConfig.java
##########
@@ -36,15 +36,22 @@
   private final Long maxFileSize;
   // The current batch id
   private Integer batchId;
+  // Paralleism to use when generating input data
+  int inputParallelism;

Review comment:
       Done.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] n3nash commented on a change in pull request #2197: [HUDI-1351] Improvements to the hudi test suite for scalability and repeated testing.

Posted by GitBox <gi...@apache.org>.
n3nash commented on a change in pull request #2197:
URL: https://github.com/apache/hudi/pull/2197#discussion_r511706427



##########
File path: hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/DeltaGenerator.java
##########
@@ -77,6 +82,16 @@ public DeltaGenerator(DeltaConfig deltaOutputConfig, JavaSparkContext jsc, Spark
   }
 
   public JavaRDD<DeltaWriteStats> writeRecords(JavaRDD<GenericRecord> records) {
+    if (deltaOutputConfig.shouldDeleteOldInputData() && batchId > 1) {
+      Path oldInputDir = new Path(deltaOutputConfig.getDeltaBasePath(), Integer.toString(batchId - 1));

Review comment:
       This may not work in case the last batches were rolled back. Can you take a look at RollbackNode and see what will be the implication ?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] n3nash commented on a change in pull request #2197: [HUDI-1351] Improvements to the hudi test suite for scalability and repeated testing.

Posted by GitBox <gi...@apache.org>.
n3nash commented on a change in pull request #2197:
URL: https://github.com/apache/hudi/pull/2197#discussion_r511704839



##########
File path: hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DFSDeltaConfig.java
##########
@@ -36,15 +36,22 @@
   private final Long maxFileSize;
   // The current batch id
   private Integer batchId;
+  // Paralleism to use when generating input data
+  int inputParallelism;

Review comment:
       rename to inputGeneratorParallelism ?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] prashantwason commented on a change in pull request #2197: [HUDI-1351] Improvements to the hudi test suite for scalability and repeated testing.

Posted by GitBox <gi...@apache.org>.
prashantwason commented on a change in pull request #2197:
URL: https://github.com/apache/hudi/pull/2197#discussion_r513059843



##########
File path: hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/DeltaGenerator.java
##########
@@ -95,11 +110,19 @@ public DeltaGenerator(DeltaConfig deltaOutputConfig, JavaSparkContext jsc, Spark
     int numPartitions = operation.getNumInsertPartitions();
     long recordsPerPartition = operation.getNumRecordsInsert() / numPartitions;
     int minPayloadSize = operation.getRecordSize();
-    JavaRDD<GenericRecord> inputBatch = jsc.parallelize(Collections.EMPTY_LIST)
-        .repartition(numPartitions).mapPartitions(p -> {
+    int startPartition = operation.getStartPartition();

Review comment:
       Suppose you insert 5 partitions. Then the following 5 new LazyRecordGeneratorIterator will be created:
      new LazyRecordGeneratorIterator(..., 0)
      new LazyRecordGeneratorIterator(..., 1)
      new LazyRecordGeneratorIterator(..., 2)
      new LazyRecordGeneratorIterator(..., 3)
      new LazyRecordGeneratorIterator(..., 4)
   
   Within the LazyRecordGeneratorIterator code, the integer for partition index (0, 1, .. above) are converted into partition timstamp (as date offset from 1970/01/01). So the first LazyRecordGeneratorIterator will be generating records from 1970/01/01, the second LazyRecordGeneratorIterator will generate records for 1970/01/02 ... and so on.
   
   With this schema, the record generation always starts at offset 0. But what if you want to generate for only a specific partition? Or add new partition? This is where the start_offset comes into play.
   
      new LazyRecordGeneratorIterator(..., 0 + start_offset)
      new LazyRecordGeneratorIterator(..., 1 + start_offset)
      new LazyRecordGeneratorIterator(..., 2 + start_offset)
      new LazyRecordGeneratorIterator(..., 3 + start_offset)
      new LazyRecordGeneratorIterator(..., 4 + start_offset)
   
   By using a start_offset you can alter where the inserts will take place. Also new partitions can be created.
   
   
   Spark retries can alter the partition numbers here. For that, we can use a pre-formatted List with partitions here.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] n3nash commented on a change in pull request #2197: [HUDI-1351] Improvements to the hudi test suite for scalability and repeated testing.

Posted by GitBox <gi...@apache.org>.
n3nash commented on a change in pull request #2197:
URL: https://github.com/apache/hudi/pull/2197#discussion_r513535190



##########
File path: hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/DeltaGenerator.java
##########
@@ -95,11 +110,19 @@ public DeltaGenerator(DeltaConfig deltaOutputConfig, JavaSparkContext jsc, Spark
     int numPartitions = operation.getNumInsertPartitions();
     long recordsPerPartition = operation.getNumRecordsInsert() / numPartitions;
     int minPayloadSize = operation.getRecordSize();
-    JavaRDD<GenericRecord> inputBatch = jsc.parallelize(Collections.EMPTY_LIST)
-        .repartition(numPartitions).mapPartitions(p -> {
+    int startPartition = operation.getStartPartition();

Review comment:
       Okay, that makes sense @prashantwason. Spark retries are pretty common, lets handle that use-case 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] prashantwason commented on a change in pull request #2197: [HUDI-1351] Improvements to the hudi test suite for scalability and repeated testing.

Posted by GitBox <gi...@apache.org>.
prashantwason commented on a change in pull request #2197:
URL: https://github.com/apache/hudi/pull/2197#discussion_r513694959



##########
File path: hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/DeltaGenerator.java
##########
@@ -95,11 +110,19 @@ public DeltaGenerator(DeltaConfig deltaOutputConfig, JavaSparkContext jsc, Spark
     int numPartitions = operation.getNumInsertPartitions();
     long recordsPerPartition = operation.getNumRecordsInsert() / numPartitions;
     int minPayloadSize = operation.getRecordSize();
-    JavaRDD<GenericRecord> inputBatch = jsc.parallelize(Collections.EMPTY_LIST)
-        .repartition(numPartitions).mapPartitions(p -> {
+    int startPartition = operation.getStartPartition();

Review comment:
       Already done. Please see the update.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] n3nash merged pull request #2197: [HUDI-1351] Improvements to the hudi test suite for scalability and repeated testing.

Posted by GitBox <gi...@apache.org>.
n3nash merged pull request #2197:
URL: https://github.com/apache/hudi/pull/2197


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] prashantwason commented on a change in pull request #2197: [HUDI-1351] Improvements to the hudi test suite for scalability and repeated testing.

Posted by GitBox <gi...@apache.org>.
prashantwason commented on a change in pull request #2197:
URL: https://github.com/apache/hudi/pull/2197#discussion_r512989527



##########
File path: hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/DeltaGenerator.java
##########
@@ -58,15 +63,15 @@
 
   private static Logger log = LoggerFactory.getLogger(DeltaGenerator.class);
 
-  private DeltaConfig deltaOutputConfig;
+  private DFSDeltaConfig deltaOutputConfig;

Review comment:
       DFSDeltaConfig extends DeltaConfig
   
   The two settings I have added (getInputParallelism and shouldDeleteOldInputData) are in DFSDeltaConfig.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] prashantwason commented on pull request #2197: [HUDI-1351] Improvements to the hudi test suite for scalability and repeated testing.

Posted by GitBox <gi...@apache.org>.
prashantwason commented on pull request #2197:
URL: https://github.com/apache/hudi/pull/2197#issuecomment-718148074


   @n3nash  I have made the requested change.
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] n3nash commented on a change in pull request #2197: [HUDI-1351] Improvements to the hudi test suite for scalability and repeated testing.

Posted by GitBox <gi...@apache.org>.
n3nash commented on a change in pull request #2197:
URL: https://github.com/apache/hudi/pull/2197#discussion_r511704768



##########
File path: hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DFSDeltaConfig.java
##########
@@ -36,15 +36,22 @@
   private final Long maxFileSize;
   // The current batch id
   private Integer batchId;
+  // Paralleism to use when generating input data
+  int inputParallelism;

Review comment:
       can you explicitly add scope "private" ?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] n3nash commented on a change in pull request #2197: [HUDI-1351] Improvements to the hudi test suite for scalability and repeated testing.

Posted by GitBox <gi...@apache.org>.
n3nash commented on a change in pull request #2197:
URL: https://github.com/apache/hudi/pull/2197#discussion_r511706536



##########
File path: hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/DeltaGenerator.java
##########
@@ -95,11 +110,19 @@ public DeltaGenerator(DeltaConfig deltaOutputConfig, JavaSparkContext jsc, Spark
     int numPartitions = operation.getNumInsertPartitions();
     long recordsPerPartition = operation.getNumRecordsInsert() / numPartitions;
     int minPayloadSize = operation.getRecordSize();
-    JavaRDD<GenericRecord> inputBatch = jsc.parallelize(Collections.EMPTY_LIST)
-        .repartition(numPartitions).mapPartitions(p -> {
+    int startPartition = operation.getStartPartition();

Review comment:
       Can you please explain the startPartition with an example ?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] prashantwason commented on a change in pull request #2197: [HUDI-1351] Improvements to the hudi test suite for scalability and repeated testing.

Posted by GitBox <gi...@apache.org>.
prashantwason commented on a change in pull request #2197:
URL: https://github.com/apache/hudi/pull/2197#discussion_r512991547



##########
File path: hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/DeltaGenerator.java
##########
@@ -77,6 +82,16 @@ public DeltaGenerator(DeltaConfig deltaOutputConfig, JavaSparkContext jsc, Spark
   }
 
   public JavaRDD<DeltaWriteStats> writeRecords(JavaRDD<GenericRecord> records) {
+    if (deltaOutputConfig.shouldDeleteOldInputData() && batchId > 1) {
+      Path oldInputDir = new Path(deltaOutputConfig.getDeltaBasePath(), Integer.toString(batchId - 1));

Review comment:
       RollbackNode will rollback the last commit. This should not interfere will these input directories.
   
   The shouldDeleteOldInputData() setting only affects the data generated in the "input" directory (a separate directory) which is not part of the HUDI dataset under test. For each Node in the yaml, a sub-directory in the input directory (identified by batchId) is created. Within this new sub-directory, the data to be ingested as part of the Node is written as avro files. 
   
   We are deleting older input sub-directories. The default is to not delete anything. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] n3nash commented on a change in pull request #2197: [HUDI-1351] Improvements to the hudi test suite for scalability and repeated testing.

Posted by GitBox <gi...@apache.org>.
n3nash commented on a change in pull request #2197:
URL: https://github.com/apache/hudi/pull/2197#discussion_r511706536



##########
File path: hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/DeltaGenerator.java
##########
@@ -95,11 +110,19 @@ public DeltaGenerator(DeltaConfig deltaOutputConfig, JavaSparkContext jsc, Spark
     int numPartitions = operation.getNumInsertPartitions();
     long recordsPerPartition = operation.getNumRecordsInsert() / numPartitions;
     int minPayloadSize = operation.getRecordSize();
-    JavaRDD<GenericRecord> inputBatch = jsc.parallelize(Collections.EMPTY_LIST)
-        .repartition(numPartitions).mapPartitions(p -> {
+    int startPartition = operation.getStartPartition();

Review comment:
       Can you please explain the startPartition with an example ? What happens when a spark stage is retried ? Take a look at how spark stage retries mess up the partition numbers to understand more..




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org