Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/07/22 02:21:30 UTC

[GitHub] [hudi] vamshigv opened a new pull request, #6176: S3 Incremental source improvements

vamshigv opened a new pull request, #6176:
URL: https://github.com/apache/hudi/pull/6176

   ## *Tips*
   - *Thank you very much for contributing to Apache Hudi.*
   - *Please review https://hudi.apache.org/contribute/how-to-contribute before opening a pull request.*
   
   ## What is the purpose of the pull request
   
   *(For example: This pull request adds quick-start document.)*
   
   ## Brief change log
   
   *(for example:)*
     - *Modify AnnotationLocation checkstyle rule in checkstyle.xml*
   
   ## Verify this pull request
   
   *(Please pick either of the following options)*
   
   This pull request is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This pull request is already covered by existing tests, such as *(please describe tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
   
     - *Added integration tests for end-to-end.*
     - *Added HoodieClientWriteTest to verify the change.*
     - *Manually verified the change by running a job locally.*
   
   ## Committer checklist
   
    - [ ] Has a corresponding JIRA in PR title & commit
    
    - [ ] Commit message is descriptive of the change
    
    - [ ] CI is green
   
    - [ ] Necessary doc changes done or have another open PR
          
    - [ ] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.
   




[GitHub] [hudi] hudi-bot commented on pull request #6176: [HUDI-4445] S3 Incremental source improvements

Posted by GitBox <gi...@apache.org>.
hudi-bot commented on PR #6176:
URL: https://github.com/apache/hudi/pull/6176#issuecomment-1192190280

   ## CI report:
   
   * 0ff79e179d4ed9d6e300dfb609379860b7b9c9ce Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=10182) 
   
   @hudi-bot supports the following commands:

    - `@hudi-bot run azure` re-run the last Azure build




[GitHub] [hudi] hudi-bot commented on pull request #6176: [HUDI-4445] S3 Incremental source improvements

Posted by GitBox <gi...@apache.org>.
hudi-bot commented on PR #6176:
URL: https://github.com/apache/hudi/pull/6176#issuecomment-1193925439

   ## CI report:
   
   * e783a886bb0288647776e8645aa7a765f2edd6fd Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=10313) 
   




[GitHub] [hudi] vamshigv closed pull request #6176: S3 Incremental source improvements

Posted by GitBox <gi...@apache.org>.
vamshigv closed pull request #6176: S3 Incremental source improvements
URL: https://github.com/apache/hudi/pull/6176




[GitHub] [hudi] hudi-bot commented on pull request #6176: [HUDI-4445] S3 Incremental source improvements

Posted by GitBox <gi...@apache.org>.
hudi-bot commented on PR #6176:
URL: https://github.com/apache/hudi/pull/6176#issuecomment-1196995967

   ## CI report:
   
   * e9fd966c00d63200168f42e65b10303bcee5859d Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=10375) 
   * d1d558f59d5cf997040c8a493e0c3479a1143517 Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=10395) 
   




[GitHub] [hudi] vamshigv commented on pull request #6176: [HUDI-4445] S3 Incremental source improvements

Posted by "vamshigv (via GitHub)" <gi...@apache.org>.
vamshigv commented on PR #6176:
URL: https://github.com/apache/hudi/pull/6176#issuecomment-1550660363

   Closing this as not needed.




[GitHub] [hudi] xushiyan commented on a diff in pull request #6176: [HUDI-4445] S3 Incremental source improvements

Posted by GitBox <gi...@apache.org>.
xushiyan commented on code in PR #6176:
URL: https://github.com/apache/hudi/pull/6176#discussion_r927831833


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java:
##########
@@ -81,6 +94,12 @@ static class Config {
      * - --hoodie-conf hoodie.deltastreamer.source.s3incr.spark.datasource.options={"header":"true","encoding":"UTF-8"}
      */
     static final String SPARK_DATASOURCE_OPTIONS = "hoodie.deltastreamer.source.s3incr.spark.datasource.options";
+
+    // ToDo make it a list of extensions
+    static final String S3_ACTUAL_FILE_EXTENSIONS = "hoodie.deltastreamer.source.s3incr.file.extensions";
+
+    static final String ATTACH_SOURCE_PARTITION_COLUMN = "hoodie.deltastreamer.source.s3incr.source.partition.exists";

Review Comment:
   Ditto; the same naming rules apply here.



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java:
##########
@@ -156,53 +214,52 @@ public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkpt
       return Pair.of(Option.empty(), queryTypeAndInstantEndpts.getRight().getRight());
     }
 
-    String filter = "s3.object.size > 0";
-    if (!StringUtils.isNullOrEmpty(props.getString(Config.S3_KEY_PREFIX, null))) {
-      filter = filter + " and s3.object.key like '" + props.getString(Config.S3_KEY_PREFIX) + "%'";
-    }
-    if (!StringUtils.isNullOrEmpty(props.getString(Config.S3_IGNORE_KEY_PREFIX, null))) {
-      filter = filter + " and s3.object.key not like '" + props.getString(Config.S3_IGNORE_KEY_PREFIX) + "%'";
-    }
-    if (!StringUtils.isNullOrEmpty(props.getString(Config.S3_IGNORE_KEY_SUBSTRING, null))) {
-      filter = filter + " and s3.object.key not like '%" + props.getString(Config.S3_IGNORE_KEY_SUBSTRING) + "%'";
-    }
-    // add file format filtering by default
-    filter = filter + " and s3.object.key like '%" + fileFormat + "%'";
-
+    Column filterColumn = s3EventsColumnFilter(fileFormat);
     String s3FS = props.getString(Config.S3_FS_PREFIX, "s3").toLowerCase();
     String s3Prefix = s3FS + "://";
 
     // Extract distinct file keys from s3 meta hoodie table
-    final List<Row> cloudMetaDf = source
-        .filter(filter)
-        .select("s3.bucket.name", "s3.object.key")
-        .distinct()
-        .collectAsList();
     // Create S3 paths
     final boolean checkExists = props.getBoolean(Config.ENABLE_EXISTS_CHECK, Config.DEFAULT_ENABLE_EXISTS_CHECK);
-    List<String> cloudFiles = new ArrayList<>();
-    for (Row row : cloudMetaDf) {
-      // construct file path, row index 0 refers to bucket and 1 refers to key
-      String bucket = row.getString(0);
-      String filePath = s3Prefix + bucket + "/" + row.getString(1);
-      if (checkExists) {
-        FileSystem fs = FSUtils.getFs(s3Prefix + bucket, sparkSession.sparkContext().hadoopConfiguration());
-        try {
-          if (fs.exists(new Path(filePath))) {
-            cloudFiles.add(filePath);
-          }
-        } catch (IOException e) {
-          LOG.error(String.format("Error while checking path exists for %s ", filePath), e);
-        }
-      } else {
-        cloudFiles.add(filePath);
-      }
-    }
+    SerializableConfiguration serializableConfiguration = new SerializableConfiguration(sparkContext.hadoopConfiguration());
+    List<String> cloudFiles = source
+        .filter(filterColumn)
+        .select("s3.bucket.name", "s3.object.key")
+        .distinct()
+        .rdd().toJavaRDD().mapPartitions(fileListIterator -> {
+          List<String> cloudFilesPerPartition = new ArrayList<>();
+          fileListIterator.forEachRemaining(row -> {
+            final Configuration configuration = serializableConfiguration.newCopy();
+            String bucket = row.getString(0);
+            String filePath = s3Prefix + bucket + "/" + row.getString(1);
+            try {
+              String decodeUrl = URLDecoder.decode(filePath, StandardCharsets.UTF_8.name());
+              if (checkExists) {
+                FileSystem fs = FSUtils.getFs(s3Prefix + bucket, configuration);
+                try {
+                  if (fs.exists(new Path(decodeUrl))) {

Review Comment:
   Creating a Hadoop `Path` carries much more memory overhead than normal object instantiation. If it's only needed for an existence check, let's find a better way.
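
   One possible direction, sketched on the assumption that the surrounding per-row overhead dominates (the helper name `resolveCloudFiles` is illustrative, not from the PR): hoist the `Configuration` copy and the `FSUtils.getFs` lookup out of the per-row loop, so that only the lightweight per-file `Path` allocation remains:

   ```java
   // Sketch: resolve (and optionally existence-check) S3 paths for one Spark partition,
   // reusing a single Configuration copy and one FileSystem handle per bucket instead of
   // re-creating them for every row. Assumes the imports already present in
   // S3EventsHoodieIncrSource plus java.util.HashMap and java.util.Map.
   private static List<String> resolveCloudFiles(Iterator<Row> rows, String s3Prefix,
       boolean checkExists, SerializableConfiguration serializableConfiguration) {
     List<String> files = new ArrayList<>();
     Configuration configuration = serializableConfiguration.newCopy(); // once per partition
     Map<String, FileSystem> fsByBucket = new HashMap<>();              // once per bucket
     rows.forEachRemaining(row -> {
       String bucket = row.getString(0);
       String filePath = s3Prefix + bucket + "/" + row.getString(1);
       try {
         String decodedUrl = URLDecoder.decode(filePath, StandardCharsets.UTF_8.name());
         if (!checkExists) {
           files.add(decodedUrl);
           return;
         }
         FileSystem fs = fsByBucket.computeIfAbsent(bucket,
             b -> FSUtils.getFs(s3Prefix + b, configuration));
         if (fs.exists(new Path(decodedUrl))) { // the per-file Path allocation still happens here
           files.add(decodedUrl);
         }
       } catch (IOException e) {
         LOG.error(String.format("Error while checking path exists for %s", filePath), e);
       }
     });
     return files;
   }
   ```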



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java:
##########
@@ -81,6 +94,12 @@ static class Config {
      * - --hoodie-conf hoodie.deltastreamer.source.s3incr.spark.datasource.options={"header":"true","encoding":"UTF-8"}
      */
     static final String SPARK_DATASOURCE_OPTIONS = "hoodie.deltastreamer.source.s3incr.spark.datasource.options";
+
+    // ToDo make it a list of extensions
+    static final String S3_ACTUAL_FILE_EXTENSIONS = "hoodie.deltastreamer.source.s3incr.file.extensions";

Review Comment:
   
   ```suggestion
       static final String S3INCR_FILE_EXTENSIONS_OPTIONS = "hoodie.deltastreamer.source.s3incr.file.extensions";
   ```
   
   The constant name should align with the actual key, and carry the `OPTIONS` suffix since it is a config key, not the extensions themselves.



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java:
##########
@@ -107,6 +126,45 @@ private DataFrameReader getDataFrameReader(String fileFormat) {
     return dataFrameReader;
   }
 
+  private Dataset addPartitionColumn(Dataset ds, List<String> cloudFiles) {
+    if (props.getBoolean(Config.ATTACH_SOURCE_PARTITION_COLUMN, Config.DEFAULT_ATTACH_SOURCE_PARTITION_COLUMN)
+        && !StringUtils.isNullOrEmpty(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key())) {
+      String partitionKey = props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()).split(":")[0];
+      String partitionPathPattern = String.format("%s=",partitionKey);
+      String filePath = cloudFiles.get(0);
+      List<String> nestedPartition = Arrays.stream(filePath.split("/"))
+          .filter(level -> level.contains(partitionPathPattern)).collect(Collectors.toList());
+      if (nestedPartition.size() > 1) {
+        throw new HoodieException("More than one level of partitioning exists");

Review Comment:
   Multi-level partitioning is very common, so is this a major limitation? If we push this out as-is, how would it affect existing users?





[GitHub] [hudi] hudi-bot commented on pull request #6176: [HUDI-4445] S3 Incremental source improvements

Posted by GitBox <gi...@apache.org>.
hudi-bot commented on PR #6176:
URL: https://github.com/apache/hudi/pull/6176#issuecomment-1196150720

   ## CI report:
   
   * e9fd966c00d63200168f42e65b10303bcee5859d Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=10375) 
   




[GitHub] [hudi] xushiyan commented on a diff in pull request #6176: [HUDI-4445] S3 Incremental source improvements

Posted by "xushiyan (via GitHub)" <gi...@apache.org>.
xushiyan commented on code in PR #6176:
URL: https://github.com/apache/hudi/pull/6176#discussion_r1188065909


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java:
##########
@@ -224,6 +232,6 @@ public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkpt
     }
     LOG.debug("Extracted distinct files " + cloudFiles.size()
         + " and some samples " + cloudFiles.stream().limit(10).collect(Collectors.toList()));
-    return Pair.of(dataset, queryTypeAndInstantEndpts.getRight().getRight());
+    return Pair.of(dataset, sourceMetadata.getRight());
   }
-}
+}

Review Comment:
   We should keep the EOL (newline at the end of the file).





[GitHub] [hudi] hudi-bot commented on pull request #6176: [HUDI-4445] S3 Incremental source improvements

Posted by GitBox <gi...@apache.org>.
hudi-bot commented on PR #6176:
URL: https://github.com/apache/hudi/pull/6176#issuecomment-1197108505

   ## CI report:
   
   * d1d558f59d5cf997040c8a493e0c3479a1143517 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=10395) 
   




[GitHub] [hudi] hudi-bot commented on pull request #6176: [HUDI-4445] S3 Incremental source improvements

Posted by GitBox <gi...@apache.org>.
hudi-bot commented on PR #6176:
URL: https://github.com/apache/hudi/pull/6176#issuecomment-1193812603

   ## CI report:
   
   * 0ff79e179d4ed9d6e300dfb609379860b7b9c9ce Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=10182) 
   * e783a886bb0288647776e8645aa7a765f2edd6fd UNKNOWN
   




[GitHub] [hudi] codope commented on a diff in pull request #6176: [HUDI-4445] S3 Incremental source improvements

Posted by "codope (via GitHub)" <gi...@apache.org>.
codope commented on code in PR #6176:
URL: https://github.com/apache/hudi/pull/6176#discussion_r928818365


##########
hudi-common/src/test/java/org/apache/hudi/common/testutils/S3EventsSchemaUtils.java:
##########
@@ -0,0 +1,70 @@
+package org.apache.hudi.common.testutils;
+
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+
+// Utility for the schema of S3 events listed here (https://docs.aws.amazon.com/AmazonS3/latest/userguide/notification-content-structure.html)

Review Comment:
   This should be a multi-line comment.
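
   For instance, a block-comment form (a sketch of what is being asked for):

   ```java
   /**
    * Utility for the schema of S3 events listed here:
    * https://docs.aws.amazon.com/AmazonS3/latest/userguide/notification-content-structure.html
    */
   public class S3EventsSchemaUtils {
   ```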



##########
hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java:
##########
@@ -274,6 +277,11 @@ public RawTripTestPayload generatePayloadForShortTripSchema(HoodieKey key, Strin
     return new RawTripTestPayload(rec.toString(), key.getRecordKey(), key.getPartitionPath(), SHORT_TRIP_SCHEMA);
   }
 
+  public RawTripTestPayload generatePayloadForS3EventsSchema(HoodieKey key, String commitTime) throws IOException {

Review Comment:
   `RawTripTestPayload` assumes some form of the trips schema. If you look at its constructor, the schema isn't actually used, and its APIs assume a few things about the schema. Should we keep all of this out of `HoodieTestDataGenerator`?



##########
hudi-common/src/test/java/org/apache/hudi/common/testutils/S3EventsSchemaUtils.java:
##########
@@ -0,0 +1,70 @@
+package org.apache.hudi.common.testutils;
+
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+
+// Utility for the schema of S3 events listed here (https://docs.aws.amazon.com/AmazonS3/latest/userguide/notification-content-structure.html)
+public class S3EventsSchemaUtils {
+  public static final String DEFAULT_STRING_VALUE = "default_string";
+
+  public static String generateSchemaString() {
+    return generateS3EventSchema().toString();
+  }
+
+  public static Schema generateObjInfoSchema() {
+    Schema objInfo = SchemaBuilder.record("objInfo")
+        .fields()
+        .requiredString("key")
+        .requiredLong("size")
+        .endRecord();
+    return objInfo;
+  }
+
+  public static GenericRecord generateObjInfoRecord(String key, Long size) {
+    GenericRecord rec = new GenericData.Record(generateObjInfoSchema());
+    rec.put("key", key);
+    rec.put("size", size);
+    return rec;
+  }
+
+  public static Schema generateS3MetadataSchema() {
+    Schema s3Metadata = SchemaBuilder.record("s3Metadata")
+        .fields()
+        .requiredString("configurationId")
+        .name("object")
+        .type(generateObjInfoSchema())
+        .noDefault()
+        .endRecord();
+    return s3Metadata;
+  }
+
+  public static GenericRecord generateS3MetadataRecord(GenericRecord objRecord) {
+    GenericRecord rec = new GenericData.Record(generateS3MetadataSchema());
+    rec.put("configurationId", DEFAULT_STRING_VALUE);
+    rec.put("object", objRecord);
+    return rec;
+  }
+
+  public static Schema generateS3EventSchema() {
+    Schema s3Event = SchemaBuilder.record("s3Event")
+        .fields()
+        .requiredString("eventSource")
+        .requiredString("eventName")
+        .name("s3")

Review Comment:
   Let's extract all these strings to constants.



##########
hudi-common/src/test/java/org/apache/hudi/common/testutils/GenericTestPayload.java:
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.testutils;
+
+import org.apache.hudi.avro.MercifulJsonConverter;
+import org.apache.hudi.common.util.FileIOUtils;
+import org.apache.hudi.common.util.Option;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Map;
+import java.util.zip.Deflater;
+import java.util.zip.DeflaterOutputStream;
+import java.util.zip.InflaterInputStream;
+
+/**
+ * Generic class for specific payload implementations to inherit from.
+ */
+public abstract class GenericTestPayload {

Review Comment:
   Maybe rename to `AbstractJsonTestPayload`? It's essentially for JSON data, right?



##########
hudi-common/src/test/java/org/apache/hudi/common/testutils/GenericTestPayload.java:
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.testutils;
+
+import org.apache.hudi.avro.MercifulJsonConverter;
+import org.apache.hudi.common.util.FileIOUtils;
+import org.apache.hudi.common.util.Option;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Map;
+import java.util.zip.Deflater;
+import java.util.zip.DeflaterOutputStream;
+import java.util.zip.InflaterInputStream;
+
+/**
+ * Generic class for specific payload implementations to inherit from.
+ */
+public abstract class GenericTestPayload {
+
+  protected static final transient ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+  protected String partitionPath;
+  protected String rowKey;
+  protected byte[] jsonDataCompressed;
+  protected int dataSize;
+  protected boolean isDeleted;
+  protected Comparable orderingVal;
+
+  public GenericTestPayload(Option<String> jsonData, String rowKey, String partitionPath, String schemaStr,
+                            Boolean isDeleted, Comparable orderingVal) throws IOException {
+    if (jsonData.isPresent()) {
+      this.jsonDataCompressed = compressData(jsonData.get());
+      this.dataSize = jsonData.get().length();
+    }
+    this.rowKey = rowKey;
+    this.partitionPath = partitionPath;
+    this.isDeleted = isDeleted;
+    this.orderingVal = orderingVal;
+  }
+
+  public GenericTestPayload(String jsonData, String rowKey, String partitionPath, String schemaStr) throws IOException {
+    this(Option.of(jsonData), rowKey, partitionPath, schemaStr, false, 0L);
+  }
+
+  public GenericTestPayload(String jsonData) throws IOException {
+    this.jsonDataCompressed = compressData(jsonData);
+    this.dataSize = jsonData.length();
+    Map<String, Object> jsonRecordMap = OBJECT_MAPPER.readValue(jsonData, Map.class);
+    this.rowKey = jsonRecordMap.get("_row_key").toString();
+    this.partitionPath = jsonRecordMap.get("time").toString().split("T")[0].replace("-", "/");
+    this.isDeleted = false;
+  }
+
+  public String getPartitionPath() {
+    return partitionPath;
+  }
+
+  public IndexedRecord getRecordToInsert(Schema schema) throws IOException {
+    MercifulJsonConverter jsonConverter = new MercifulJsonConverter();
+    return jsonConverter.convert(getJsonData(), schema);
+  }
+
+  public String getRowKey() {
+    return rowKey;
+  }
+
+  public String getJsonData() throws IOException {
+    return unCompressData(jsonDataCompressed);
+  }
+
+  private byte[] compressData(String jsonData) throws IOException {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    DeflaterOutputStream dos = new DeflaterOutputStream(baos, new Deflater(Deflater.BEST_COMPRESSION), true);
+    try {
+      dos.write(jsonData.getBytes());
+    } finally {
+      dos.flush();
+      dos.close();

Review Comment:
   Will this close the ByteArrayOutputStream too?
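
   For reference: yes, per `FilterOutputStream` semantics, `DeflaterOutputStream.close()` also closes the stream it wraps, and `ByteArrayOutputStream.close()` is itself a no-op. A try-with-resources sketch of the same logic (the `toByteArray()` tail is assumed from context):

   ```java
   private byte[] compressData(String jsonData) throws IOException {
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
     // Closing dos finishes the deflater and also closes baos (harmless, since its close() does nothing).
     try (DeflaterOutputStream dos =
              new DeflaterOutputStream(baos, new Deflater(Deflater.BEST_COMPRESSION), true)) {
       dos.write(jsonData.getBytes());
     }
     return baos.toByteArray();
   }
   ```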



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java:
##########
@@ -189,33 +194,36 @@ public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkpt
         .filter(filter)
         .select("s3.bucket.name", "s3.object.key")
         .distinct()
-        .mapPartitions((MapPartitionsFunction<Row, String>)  fileListIterator -> {
+        .rdd()
+        // JavaRDD simplifies coding with collect and suitable mapPartitions signature. check if this can be avoided.
+        .toJavaRDD()
+        .mapPartitions(fileListIterator -> {
           List<String> cloudFilesPerPartition = new ArrayList<>();
-          final Configuration configuration = serializableConfiguration.newCopy();
           fileListIterator.forEachRemaining(row -> {
+            // TODO: configuration is updated in the getFs call. check if new copy is needed w.r.t to getFs.

Review Comment:
   Is this still required?



##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java:
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import org.apache.avro.Schema;
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.testutils.S3EventsSchemaUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieArchivalConfig;
+import org.apache.hudi.config.HoodieCleanConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Properties;
+
+import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.S3_EVENTS_SCHEMA;
+import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+
+public class TestS3EventsHoodieIncrSource extends SparkClientFunctionalTestHarness {
+  private HoodieTestDataGenerator dataGen;
+  private HoodieTableMetaClient metaClient;
+
+  @BeforeEach
+  public void setUp() throws IOException {
+    dataGen = new HoodieTestDataGenerator();
+    metaClient = getHoodieMetaClient(hadoopConf(), basePath());
+  }
+
+  @Test
+  public void testHoodieIncrSource() throws IOException {

Review Comment:
   Maybe rename to testS3EventsHoodieIncrSource?



##########
hudi-common/src/test/java/org/apache/hudi/common/testutils/S3EventsSchemaUtils.java:
##########
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.testutils;
+
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+
+// Utility for the schema of S3 events listed here (https://docs.aws.amazon.com/AmazonS3/latest/userguide/notification-content-structure.html)

Review Comment:
   Should be a multi-line comment.



##########
hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java:
##########
@@ -216,13 +218,15 @@ public int getEstimatedFileSizeInBytes(int numOfRecords) {
     return numOfRecords * BYTES_PER_RECORD + BLOOM_FILTER_BYTES;
   }
 
-  public RawTripTestPayload generateRandomValueAsPerSchema(String schemaStr, HoodieKey key, String commitTime, boolean isFlattened) throws IOException {
+  public HoodieRecordPayload generateRandomValueAsPerSchema(String schemaStr, HoodieKey key, String commitTime, boolean isFlattened) throws IOException {

Review Comment:
   Why this rename?



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java:
##########
@@ -161,11 +159,18 @@ public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkpt
           .filter(String.format("%s <= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD,
               queryTypeAndInstantEndpts.getRight().getRight()));
     }
+    return Pair.of(Option.of(source), queryTypeAndInstantEndpts.getRight().getRight());
+  }
 
-    if (source.isEmpty()) {
-      return Pair.of(Option.empty(), queryTypeAndInstantEndpts.getRight().getRight());
+  @Override
+  public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkptStr, long sourceLimit) {
+    Pair<Option<Dataset<Row>>, String> sourceMetadata = fetchMetadata(lastCkptStr, sourceLimit);
+    if (!sourceMetadata.getKey().isPresent()) {

Review Comment:
   sourceMetadata.getLeft?



##########
hudi-common/src/test/java/org/apache/hudi/common/testutils/S3EventsSchemaUtils.java:
##########
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.testutils;
+
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+
+// Utility for the schema of S3 events listed here (https://docs.aws.amazon.com/AmazonS3/latest/userguide/notification-content-structure.html)
+public class S3EventsSchemaUtils {
+  public static final String DEFAULT_STRING_VALUE = "default_string";
+  public static final GenericRecord DEFAULT_S3_BUCKET_RECORD;
+  static {
+    GenericRecord rec = new GenericData.Record(generateBucketInfoSchema());
+    rec.put("name", "default_s3_bucket");
+    DEFAULT_S3_BUCKET_RECORD = rec;
+  }
+
+  public static String generateSchemaString() {
+    return generateS3EventSchema().toString();
+  }
+
+  public static Schema generateObjInfoSchema() {
+    Schema objInfo = SchemaBuilder.record("objInfo")
+        .fields()
+        .requiredString("key")
+        .requiredLong("size")
+        .endRecord();
+    return objInfo;
+  }
+
+  public static Schema generateBucketInfoSchema() {
+    Schema bucketInfo = SchemaBuilder.record("bucketInfo")
+        .fields()
+        .requiredString("name")
+        .endRecord();
+    return bucketInfo;
+  }
+
+  public static GenericRecord generateObjInfoRecord(String key, Long size) {
+    GenericRecord rec = new GenericData.Record(generateObjInfoSchema());
+    rec.put("key", key);
+    rec.put("size", size);
+    return rec;
+  }
+
+  public static Schema generateS3MetadataSchema() {
+    Schema s3Metadata = SchemaBuilder.record("s3Metadata")
+        .fields()
+        .requiredString("configurationId")
+        .name("object")
+        .type(generateObjInfoSchema())
+        .noDefault()
+        .name("bucket")
+        .type(generateBucketInfoSchema())
+        .noDefault()
+        .endRecord();
+    return s3Metadata;
+  }
+
+  public static GenericRecord generateS3MetadataRecord(GenericRecord objRecord) {
+    GenericRecord rec = new GenericData.Record(generateS3MetadataSchema());
+    rec.put("configurationId", DEFAULT_STRING_VALUE);
+    rec.put("object", objRecord);
+    rec.put("bucket", DEFAULT_S3_BUCKET_RECORD);
+    return rec;
+  }
+
+  public static Schema generateS3EventSchema() {
+    Schema s3Event = SchemaBuilder.record("s3Event")
+        .fields()
+        .requiredString("eventSource")
+        .requiredString("eventName")
+        .requiredString("_row_key")
+        .name("s3")

Review Comment:
   Preferably extract all these as static string constants.
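
   A sketch of the suggested extraction (the constant names are illustrative, and the tail of the builder chain is assumed to mirror `generateS3MetadataSchema()`):

   ```java
   private static final String FIELD_EVENT_SOURCE = "eventSource";
   private static final String FIELD_EVENT_NAME = "eventName";
   private static final String FIELD_ROW_KEY = "_row_key";
   private static final String FIELD_S3 = "s3";

   public static Schema generateS3EventSchema() {
     return SchemaBuilder.record("s3Event")
         .fields()
         .requiredString(FIELD_EVENT_SOURCE)
         .requiredString(FIELD_EVENT_NAME)
         .requiredString(FIELD_ROW_KEY)
         .name(FIELD_S3)
         .type(generateS3MetadataSchema())
         .noDefault()
         .endRecord();
   }
   ```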





[GitHub] [hudi] codope commented on a diff in pull request #6176: [HUDI-4445] S3 Incremental source improvements

Posted by GitBox <gi...@apache.org>.
codope commented on code in PR #6176:
URL: https://github.com/apache/hudi/pull/6176#discussion_r927282574


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java:
##########
@@ -156,53 +214,52 @@ public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkpt
       return Pair.of(Option.empty(), queryTypeAndInstantEndpts.getRight().getRight());
     }
 
-    String filter = "s3.object.size > 0";
-    if (!StringUtils.isNullOrEmpty(props.getString(Config.S3_KEY_PREFIX, null))) {
-      filter = filter + " and s3.object.key like '" + props.getString(Config.S3_KEY_PREFIX) + "%'";
-    }
-    if (!StringUtils.isNullOrEmpty(props.getString(Config.S3_IGNORE_KEY_PREFIX, null))) {
-      filter = filter + " and s3.object.key not like '" + props.getString(Config.S3_IGNORE_KEY_PREFIX) + "%'";
-    }
-    if (!StringUtils.isNullOrEmpty(props.getString(Config.S3_IGNORE_KEY_SUBSTRING, null))) {
-      filter = filter + " and s3.object.key not like '%" + props.getString(Config.S3_IGNORE_KEY_SUBSTRING) + "%'";
-    }
-    // add file format filtering by default
-    filter = filter + " and s3.object.key like '%" + fileFormat + "%'";
-
+    Column filterColumn = s3EventsColumnFilter(fileFormat);
     String s3FS = props.getString(Config.S3_FS_PREFIX, "s3").toLowerCase();
     String s3Prefix = s3FS + "://";
 
     // Extract distinct file keys from s3 meta hoodie table
-    final List<Row> cloudMetaDf = source
-        .filter(filter)
-        .select("s3.bucket.name", "s3.object.key")
-        .distinct()
-        .collectAsList();
     // Create S3 paths
     final boolean checkExists = props.getBoolean(Config.ENABLE_EXISTS_CHECK, Config.DEFAULT_ENABLE_EXISTS_CHECK);
-    List<String> cloudFiles = new ArrayList<>();
-    for (Row row : cloudMetaDf) {
-      // construct file path, row index 0 refers to bucket and 1 refers to key
-      String bucket = row.getString(0);
-      String filePath = s3Prefix + bucket + "/" + row.getString(1);
-      if (checkExists) {
-        FileSystem fs = FSUtils.getFs(s3Prefix + bucket, sparkSession.sparkContext().hadoopConfiguration());
-        try {
-          if (fs.exists(new Path(filePath))) {
-            cloudFiles.add(filePath);
-          }
-        } catch (IOException e) {
-          LOG.error(String.format("Error while checking path exists for %s ", filePath), e);
-        }
-      } else {
-        cloudFiles.add(filePath);
-      }
-    }
+    SerializableConfiguration serializableConfiguration = new SerializableConfiguration(sparkContext.hadoopConfiguration());
+    List<String> cloudFiles = source
+        .filter(filterColumn)
+        .select("s3.bucket.name", "s3.object.key")
+        .distinct()
+        .rdd().toJavaRDD().mapPartitions(fileListIterator -> {
+          List<String> cloudFilesPerPartition = new ArrayList<>();
+          fileListIterator.forEachRemaining(row -> {
+            final Configuration configuration = serializableConfiguration.newCopy();
+            String bucket = row.getString(0);
+            String filePath = s3Prefix + bucket + "/" + row.getString(1);
+            try {
+              String decodeUrl = URLDecoder.decode(filePath, StandardCharsets.UTF_8.name());
+              if (checkExists) {
+                FileSystem fs = FSUtils.getFs(s3Prefix + bucket, configuration);
+                try {
+                  if (fs.exists(new Path(decodeUrl))) {
+                    cloudFilesPerPartition.add(decodeUrl);
+                  }
+                } catch (IOException e) {
+                  LOG.error(String.format("Error while checking path exists for %s ", decodeUrl), e);
+                }
+              } else {
+                cloudFilesPerPartition.add(decodeUrl);
+              }
+            } catch (Exception exception) {
+              LOG.warn("Failed to add cloud file ", exception);
+            }
+          });
+          return cloudFilesPerPartition.iterator();
+        }).collect();
     Option<Dataset<Row>> dataset = Option.empty();
     if (!cloudFiles.isEmpty()) {
       DataFrameReader dataFrameReader = getDataFrameReader(fileFormat);
-      dataset = Option.of(dataFrameReader.load(cloudFiles.toArray(new String[0])));
+      Dataset ds = addPartitionColumn(dataFrameReader.load(cloudFiles.toArray(new String[0])),cloudFiles);
+      dataset = Option.of(ds);
     }
+    LOG.warn("Extracted distinct files " + cloudFiles.size()

Review Comment:
   I assume this was for testing; change the log to `debug` level?



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java:
##########
@@ -81,6 +94,12 @@ static class Config {
      * - --hoodie-conf hoodie.deltastreamer.source.s3incr.spark.datasource.options={"header":"true","encoding":"UTF-8"}
      */
     static final String SPARK_DATASOURCE_OPTIONS = "hoodie.deltastreamer.source.s3incr.spark.datasource.options";
+
+    // ToDo make it a list of extensions
+    static final String S3_ACTUAL_FILE_EXTENSIONS = "hoodie.deltastreamer.source.s3incr.file.extensions";
+
+    static final String ATTACH_SOURCE_PARTITION_COLUMN = "hoodie.deltastreamer.source.s3incr.source.partition.exists";
+    static final Boolean DEFAULT_ATTACH_SOURCE_PARTITION_COLUMN = true;

Review Comment:
   Have we fully tested this change? If not, I would suggest keeping the default false for now. 



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java:
##########
@@ -107,6 +126,45 @@ private DataFrameReader getDataFrameReader(String fileFormat) {
     return dataFrameReader;
   }
 
+  private Dataset addPartitionColumn(Dataset ds, List<String> cloudFiles) {
+    if (props.getBoolean(Config.ATTACH_SOURCE_PARTITION_COLUMN, Config.DEFAULT_ATTACH_SOURCE_PARTITION_COLUMN)
+        && !StringUtils.isNullOrEmpty(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key())) {
+      String partitionKey = props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()).split(":")[0];
+      String partitionPathPattern = String.format("%s=",partitionKey);
+      String filePath = cloudFiles.get(0);
+      List<String> nestedPartition = Arrays.stream(filePath.split("/"))
+          .filter(level -> level.contains(partitionPathPattern)).collect(Collectors.toList());
+      if (nestedPartition.size() > 1) {
+        throw new HoodieException("More than one level of partitioning exists");
+      }
+      if (nestedPartition.size() == 1) {
+        LOG.info(String.format("adding column name = %s to dataset",partitionKey));
+        ds = ds.withColumn(partitionKey, split(split(input_file_name(),
+            partitionPathPattern).getItem(1), "/").getItem(0));
+      }
+    }
+    return ds;
+  }
+
+  private Column s3EventsColumnFilter(String fileFormat) {

Review Comment:
   A minor suggestion: extract methods like this into a separate util class to keep this class plain and simple. Or, if you prefer to keep them in this class for readability, move them to the bottom (i.e., after the call site) for a linear flow.



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java:
##########
@@ -107,6 +126,45 @@ private DataFrameReader getDataFrameReader(String fileFormat) {
     return dataFrameReader;
   }
 
+  private Dataset addPartitionColumn(Dataset ds, List<String> cloudFiles) {
+    if (props.getBoolean(Config.ATTACH_SOURCE_PARTITION_COLUMN, Config.DEFAULT_ATTACH_SOURCE_PARTITION_COLUMN)
+        && !StringUtils.isNullOrEmpty(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key())) {
+      String partitionKey = props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()).split(":")[0];
+      String partitionPathPattern = String.format("%s=",partitionKey);
+      String filePath = cloudFiles.get(0);
+      List<String> nestedPartition = Arrays.stream(filePath.split("/"))
+          .filter(level -> level.contains(partitionPathPattern)).collect(Collectors.toList());
+      if (nestedPartition.size() > 1) {
+        throw new HoodieException("More than one level of partitioning exists");

Review Comment:
   Is this planned to be supported sometime in the future? If yes, let's create a tracking JIRA for it.



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java:
##########
@@ -81,6 +94,12 @@ static class Config {
      * - --hoodie-conf hoodie.deltastreamer.source.s3incr.spark.datasource.options={"header":"true","encoding":"UTF-8"}
      */
     static final String SPARK_DATASOURCE_OPTIONS = "hoodie.deltastreamer.source.s3incr.spark.datasource.options";
+
+    // ToDo make it a list of extensions
+    static final String S3_ACTUAL_FILE_EXTENSIONS = "hoodie.deltastreamer.source.s3incr.file.extensions";
+
+    static final String ATTACH_SOURCE_PARTITION_COLUMN = "hoodie.deltastreamer.source.s3incr.source.partition.exists";

Review Comment:
   ```suggestion
   // Add a comment on the purpose of this config and rename as below
       static final String ADD_SOURCE_PARTITION_COLUMN = "hoodie.deltastreamer.source.s3incr.add.source.partition.column";
   ```



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java:
##########
@@ -107,6 +126,45 @@ private DataFrameReader getDataFrameReader(String fileFormat) {
     return dataFrameReader;
   }
 
+  private Dataset addPartitionColumn(Dataset ds, List<String> cloudFiles) {
+    if (props.getBoolean(Config.ATTACH_SOURCE_PARTITION_COLUMN, Config.DEFAULT_ATTACH_SOURCE_PARTITION_COLUMN)
+        && !StringUtils.isNullOrEmpty(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key())) {
+      String partitionKey = props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()).split(":")[0];

Review Comment:
   Return early, or log an error/warning, if `partitionKey` is null or empty?
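
   A minimal sketch of that guard (illustrative only):

   ```java
   String partitionKey = props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), "").split(":")[0];
   if (StringUtils.isNullOrEmpty(partitionKey)) {
     LOG.warn("No partition path field configured; skipping source partition column");
     return ds;
   }
   ```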



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java:
##########
@@ -81,6 +94,12 @@ static class Config {
      * - --hoodie-conf hoodie.deltastreamer.source.s3incr.spark.datasource.options={"header":"true","encoding":"UTF-8"}
      */
     static final String SPARK_DATASOURCE_OPTIONS = "hoodie.deltastreamer.source.s3incr.spark.datasource.options";
+
+    // ToDo make it a list of extensions
+    static final String S3_ACTUAL_FILE_EXTENSIONS = "hoodie.deltastreamer.source.s3incr.file.extensions";

Review Comment:
   Is this a list of supported source data file extensions, e.g. `.json`, `.parquet`, `.avro`, etc.?



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java:
##########
@@ -156,53 +214,52 @@ public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkpt
       return Pair.of(Option.empty(), queryTypeAndInstantEndpts.getRight().getRight());
     }
 
-    String filter = "s3.object.size > 0";
-    if (!StringUtils.isNullOrEmpty(props.getString(Config.S3_KEY_PREFIX, null))) {
-      filter = filter + " and s3.object.key like '" + props.getString(Config.S3_KEY_PREFIX) + "%'";
-    }
-    if (!StringUtils.isNullOrEmpty(props.getString(Config.S3_IGNORE_KEY_PREFIX, null))) {
-      filter = filter + " and s3.object.key not like '" + props.getString(Config.S3_IGNORE_KEY_PREFIX) + "%'";
-    }
-    if (!StringUtils.isNullOrEmpty(props.getString(Config.S3_IGNORE_KEY_SUBSTRING, null))) {
-      filter = filter + " and s3.object.key not like '%" + props.getString(Config.S3_IGNORE_KEY_SUBSTRING) + "%'";
-    }
-    // add file format filtering by default
-    filter = filter + " and s3.object.key like '%" + fileFormat + "%'";
-
+    Column filterColumn = s3EventsColumnFilter(fileFormat);
     String s3FS = props.getString(Config.S3_FS_PREFIX, "s3").toLowerCase();
     String s3Prefix = s3FS + "://";
 
     // Extract distinct file keys from s3 meta hoodie table
-    final List<Row> cloudMetaDf = source
-        .filter(filter)
-        .select("s3.bucket.name", "s3.object.key")
-        .distinct()
-        .collectAsList();
     // Create S3 paths
     final boolean checkExists = props.getBoolean(Config.ENABLE_EXISTS_CHECK, Config.DEFAULT_ENABLE_EXISTS_CHECK);
-    List<String> cloudFiles = new ArrayList<>();
-    for (Row row : cloudMetaDf) {
-      // construct file path, row index 0 refers to bucket and 1 refers to key
-      String bucket = row.getString(0);
-      String filePath = s3Prefix + bucket + "/" + row.getString(1);
-      if (checkExists) {
-        FileSystem fs = FSUtils.getFs(s3Prefix + bucket, sparkSession.sparkContext().hadoopConfiguration());
-        try {
-          if (fs.exists(new Path(filePath))) {
-            cloudFiles.add(filePath);
-          }
-        } catch (IOException e) {
-          LOG.error(String.format("Error while checking path exists for %s ", filePath), e);
-        }
-      } else {
-        cloudFiles.add(filePath);
-      }
-    }
+    SerializableConfiguration serializableConfiguration = new SerializableConfiguration(sparkContext.hadoopConfiguration());
+    List<String> cloudFiles = source
+        .filter(filterColumn)
+        .select("s3.bucket.name", "s3.object.key")
+        .distinct()
+        .rdd().toJavaRDD().mapPartitions(fileListIterator -> {
+          List<String> cloudFilesPerPartition = new ArrayList<>();
+          fileListIterator.forEachRemaining(row -> {
+            final Configuration configuration = serializableConfiguration.newCopy();

Review Comment:
   Why create a copy again? I don't see any config modification happening within the executor. Why not simply pass `serializableConfiguration`?
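   For reference, the reviewer's suggestion might look like this (a sketch assuming `SerializableConfiguration#get` and no `Configuration` mutation inside the executor):

   ```java
   // Sketch: fetch the deserialized Configuration once per partition instead of copying per row.
   final Configuration configuration = serializableConfiguration.get();
   fileListIterator.forEachRemaining(row -> {
     String filePath = s3Prefix + row.getString(0) + "/" + row.getString(1);
     // existence check / collection logic as in the diff above
   });
   ```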





[GitHub] [hudi] hudi-bot commented on pull request #6176: [HUDI-4445] S3 Incremental source improvements

Posted by GitBox <gi...@apache.org>.
hudi-bot commented on PR #6176:
URL: https://github.com/apache/hudi/pull/6176#issuecomment-1196100077

   ## CI report:
   
   * e783a886bb0288647776e8645aa7a765f2edd6fd Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=10313) 
   * e9fd966c00d63200168f42e65b10303bcee5859d Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=10375) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>




[GitHub] [hudi] vamshigv closed pull request #6176: [HUDI-4445] S3 Incremental source improvements

Posted by "vamshigv (via GitHub)" <gi...@apache.org>.
vamshigv closed pull request #6176: [HUDI-4445] S3 Incremental source improvements
URL: https://github.com/apache/hudi/pull/6176




[GitHub] [hudi] hudi-bot commented on pull request #6176: S3 Incremental source improvements

Posted by GitBox <gi...@apache.org>.
hudi-bot commented on PR #6176:
URL: https://github.com/apache/hudi/pull/6176#issuecomment-1192123762

   ## CI report:
   
   * e434c6b5bb7b4dc8ee24164a01d58cf0e03207cd UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>




[GitHub] [hudi] codope commented on a diff in pull request #6176: [HUDI-4445] S3 Incremental source improvements

Posted by "codope (via GitHub)" <gi...@apache.org>.
codope commented on code in PR #6176:
URL: https://github.com/apache/hudi/pull/6176#discussion_r1188004960


##########
hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java:
##########
@@ -274,6 +277,11 @@ public RawTripTestPayload generatePayloadForShortTripSchema(HoodieKey key, Strin
     return new RawTripTestPayload(rec.toString(), key.getRecordKey(), key.getPartitionPath(), SHORT_TRIP_SCHEMA);
   }
 
+  public RawTripTestPayload generatePayloadForS3EventsSchema(HoodieKey key, String commitTime) throws IOException {

Review Comment:
   This is an old comment. Please check if it's still valid.





[GitHub] [hudi] xushiyan commented on a diff in pull request #6176: [HUDI-4445] S3 Incremental source improvements

Posted by "xushiyan (via GitHub)" <gi...@apache.org>.
xushiyan commented on code in PR #6176:
URL: https://github.com/apache/hudi/pull/6176#discussion_r1188064051


##########
hudi-common/src/test/java/org/apache/hudi/common/testutils/GenericTestPayload.java:
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.testutils;
+
+import org.apache.hudi.avro.MercifulJsonConverter;
+import org.apache.hudi.common.util.FileIOUtils;
+import org.apache.hudi.common.util.Option;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Map;
+import java.util.zip.Deflater;
+import java.util.zip.DeflaterOutputStream;
+import java.util.zip.InflaterInputStream;
+
+/**
+ * Generic class for specific payload implementations to inherit from.
+ */
+public abstract class GenericTestPayload {

Review Comment:
   Unsure about the necessity of creating a parent payload in tests. We just need the different types of payloads to use directly, be it JSON, Avro, Spark, etc. We should make the test utils/models more straightforward.





[GitHub] [hudi] hudi-bot commented on pull request #6176: S3 Incremental source improvements

Posted by GitBox <gi...@apache.org>.
hudi-bot commented on PR #6176:
URL: https://github.com/apache/hudi/pull/6176#issuecomment-1192125776

   ## CI report:
   
   * e434c6b5bb7b4dc8ee24164a01d58cf0e03207cd Azure: [CANCELED](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=10179) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>




[GitHub] [hudi] hudi-bot commented on pull request #6176: [HUDI-4445] S3 Incremental source improvements

Posted by GitBox <gi...@apache.org>.
hudi-bot commented on PR #6176:
URL: https://github.com/apache/hudi/pull/6176#issuecomment-1192152022

   ## CI report:
   
   * e434c6b5bb7b4dc8ee24164a01d58cf0e03207cd Azure: [CANCELED](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=10179) 
   * 0ff79e179d4ed9d6e300dfb609379860b7b9c9ce Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=10182) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>




[GitHub] [hudi] xushiyan commented on a diff in pull request #6176: [HUDI-4445] S3 Incremental source improvements

Posted by "xushiyan (via GitHub)" <gi...@apache.org>.
xushiyan commented on code in PR #6176:
URL: https://github.com/apache/hudi/pull/6176#discussion_r1188065742


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java:
##########
@@ -189,33 +194,36 @@ public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkpt
         .filter(filter)
         .select("s3.bucket.name", "s3.object.key")
         .distinct()
-        .mapPartitions((MapPartitionsFunction<Row, String>)  fileListIterator -> {
+        .rdd()
+        // JavaRDD simplifies coding with collect and suitable mapPartitions signature. check if this can be avoided.
+        .toJavaRDD()
+        .mapPartitions(fileListIterator -> {

Review Comment:
   We usually prefer the high-level DataFrame APIs. How is it actually beneficial to convert to an RDD here? I don't quite get the comment.
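   For illustration, the same listing can stay on the Dataset API (a sketch; assumes the `filterColumn` and `s3Prefix` variables from the surrounding method plus `MapPartitionsFunction` and `Encoders` imports; the per-file existence check is elided):

   ```java
   // Sketch: mapPartitions directly on the Dataset, avoiding the RDD round-trip.
   List<String> cloudFiles = source
       .filter(filterColumn)
       .select("s3.bucket.name", "s3.object.key")
       .distinct()
       .mapPartitions((MapPartitionsFunction<Row, String>) rows -> {
         List<String> paths = new ArrayList<>();
         rows.forEachRemaining(row ->
             paths.add(s3Prefix + row.getString(0) + "/" + row.getString(1)));
         return paths.iterator();
       }, Encoders.STRING())
       .collectAsList();
   ```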





[GitHub] [hudi] xushiyan commented on a diff in pull request #6176: [HUDI-4445] S3 Incremental source improvements

Posted by "xushiyan (via GitHub)" <gi...@apache.org>.
xushiyan commented on code in PR #6176:
URL: https://github.com/apache/hudi/pull/6176#discussion_r1188070807


##########
hudi-common/src/test/java/org/apache/hudi/common/testutils/S3EventTestPayload.java:
##########
@@ -0,0 +1,53 @@
+package org.apache.hudi.common.testutils;
+
+import org.apache.hudi.avro.MercifulJsonConverter;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.util.Option;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Test payload for S3 event here (https://docs.aws.amazon.com/AmazonS3/latest/userguide/notification-content-structure.html).
+ */
+public class S3EventTestPayload extends GenericTestPayload implements HoodieRecordPayload<S3EventTestPayload> {

Review Comment:
   There is a lot of existing misuse of `RawTripTestPayload`; see https://issues.apache.org/jira/browse/HUDI-6164.
   
   So you may want to decouple the improvement changes from the payload changes.





[GitHub] [hudi] hudi-bot commented on pull request #6176: [HUDI-4445] S3 Incremental source improvements

Posted by GitBox <gi...@apache.org>.
hudi-bot commented on PR #6176:
URL: https://github.com/apache/hudi/pull/6176#issuecomment-1193857984

   ## CI report:
   
   * 0ff79e179d4ed9d6e300dfb609379860b7b9c9ce Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=10182) 
   * e783a886bb0288647776e8645aa7a765f2edd6fd Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=10313) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>




[GitHub] [hudi] xushiyan commented on a diff in pull request #6176: [HUDI-4445] S3 Incremental source improvements

Posted by GitBox <gi...@apache.org>.
xushiyan commented on code in PR #6176:
URL: https://github.com/apache/hudi/pull/6176#discussion_r927845153


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java:
##########
@@ -156,53 +214,52 @@ public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkpt
       return Pair.of(Option.empty(), queryTypeAndInstantEndpts.getRight().getRight());
     }
 
-    String filter = "s3.object.size > 0";
-    if (!StringUtils.isNullOrEmpty(props.getString(Config.S3_KEY_PREFIX, null))) {
-      filter = filter + " and s3.object.key like '" + props.getString(Config.S3_KEY_PREFIX) + "%'";
-    }
-    if (!StringUtils.isNullOrEmpty(props.getString(Config.S3_IGNORE_KEY_PREFIX, null))) {
-      filter = filter + " and s3.object.key not like '" + props.getString(Config.S3_IGNORE_KEY_PREFIX) + "%'";
-    }
-    if (!StringUtils.isNullOrEmpty(props.getString(Config.S3_IGNORE_KEY_SUBSTRING, null))) {
-      filter = filter + " and s3.object.key not like '%" + props.getString(Config.S3_IGNORE_KEY_SUBSTRING) + "%'";
-    }
-    // add file format filtering by default
-    filter = filter + " and s3.object.key like '%" + fileFormat + "%'";
-
+    Column filterColumn = s3EventsColumnFilter(fileFormat);
     String s3FS = props.getString(Config.S3_FS_PREFIX, "s3").toLowerCase();
     String s3Prefix = s3FS + "://";
 
     // Extract distinct file keys from s3 meta hoodie table
-    final List<Row> cloudMetaDf = source
-        .filter(filter)
-        .select("s3.bucket.name", "s3.object.key")
-        .distinct()
-        .collectAsList();
     // Create S3 paths
     final boolean checkExists = props.getBoolean(Config.ENABLE_EXISTS_CHECK, Config.DEFAULT_ENABLE_EXISTS_CHECK);
-    List<String> cloudFiles = new ArrayList<>();
-    for (Row row : cloudMetaDf) {
-      // construct file path, row index 0 refers to bucket and 1 refers to key
-      String bucket = row.getString(0);
-      String filePath = s3Prefix + bucket + "/" + row.getString(1);
-      if (checkExists) {
-        FileSystem fs = FSUtils.getFs(s3Prefix + bucket, sparkSession.sparkContext().hadoopConfiguration());
-        try {
-          if (fs.exists(new Path(filePath))) {
-            cloudFiles.add(filePath);
-          }
-        } catch (IOException e) {
-          LOG.error(String.format("Error while checking path exists for %s ", filePath), e);
-        }
-      } else {
-        cloudFiles.add(filePath);
-      }
-    }
+    SerializableConfiguration serializableConfiguration = new SerializableConfiguration(sparkContext.hadoopConfiguration());
+    List<String> cloudFiles = source
+        .filter(filterColumn)
+        .select("s3.bucket.name", "s3.object.key")
+        .distinct()
+        .rdd().toJavaRDD().mapPartitions(fileListIterator -> {

Review Comment:
   Why convert to an RDD? You should be able to do mapPartitions with a Dataset too.





[GitHub] [hudi] vamshigv commented on pull request #6176: [HUDI-4445] S3 Incremental source improvements

Posted by GitBox <gi...@apache.org>.
vamshigv commented on PR #6176:
URL: https://github.com/apache/hudi/pull/6176#issuecomment-1197368170

   Priority is to land https://github.com/apache/hudi/pull/6228 ahead of this one, which can make it into the next release.




[GitHub] [hudi] xushiyan commented on a diff in pull request #6176: [HUDI-4445] S3 Incremental source improvements

Posted by "xushiyan (via GitHub)" <gi...@apache.org>.
xushiyan commented on code in PR #6176:
URL: https://github.com/apache/hudi/pull/6176#discussion_r1188063109


##########
hudi-common/src/test/java/org/apache/hudi/common/testutils/GenericTestPayload.java:
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.testutils;
+
+import org.apache.hudi.avro.MercifulJsonConverter;
+import org.apache.hudi.common.util.FileIOUtils;
+import org.apache.hudi.common.util.Option;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Map;
+import java.util.zip.Deflater;
+import java.util.zip.DeflaterOutputStream;
+import java.util.zip.InflaterInputStream;
+
+/**
+ * Generic class for specific payload implementations to inherit from.
+ */
+public abstract class GenericTestPayload {
+
+  protected static final transient ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+  protected String partitionPath;
+  protected String rowKey;
+  protected byte[] jsonDataCompressed;
+  protected int dataSize;
+  protected boolean isDeleted;
+  protected Comparable orderingVal;
+
+  public GenericTestPayload(Option<String> jsonData, String rowKey, String partitionPath, String schemaStr,
+                            Boolean isDeleted, Comparable orderingVal) throws IOException {
+    if (jsonData.isPresent()) {
+      this.jsonDataCompressed = compressData(jsonData.get());
+      this.dataSize = jsonData.get().length();
+    }
+    this.rowKey = rowKey;
+    this.partitionPath = partitionPath;
+    this.isDeleted = isDeleted;
+    this.orderingVal = orderingVal;
+  }
+
+  public GenericTestPayload(String jsonData, String rowKey, String partitionPath, String schemaStr) throws IOException {
+    this(Option.of(jsonData), rowKey, partitionPath, schemaStr, false, 0L);
+  }
+
+  public GenericTestPayload(String jsonData) throws IOException {
+    this.jsonDataCompressed = compressData(jsonData);
+    this.dataSize = jsonData.length();
+    Map<String, Object> jsonRecordMap = OBJECT_MAPPER.readValue(jsonData, Map.class);
+    this.rowKey = jsonRecordMap.get("_row_key").toString();
+    this.partitionPath = jsonRecordMap.get("time").toString().split("T")[0].replace("-", "/");

Review Comment:
   I recall this logic has been refactored in the current `RawTripTestPayload`.





[GitHub] [hudi] hudi-bot commented on pull request #6176: [HUDI-4445] S3 Incremental source improvements

Posted by GitBox <gi...@apache.org>.
hudi-bot commented on PR #6176:
URL: https://github.com/apache/hudi/pull/6176#issuecomment-1196990772

   ## CI report:
   
   * e9fd966c00d63200168f42e65b10303bcee5859d Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=10375) 
   * d1d558f59d5cf997040c8a493e0c3479a1143517 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>




[GitHub] [hudi] hudi-bot commented on pull request #6176: [HUDI-4445] S3 Incremental source improvements

Posted by GitBox <gi...@apache.org>.
hudi-bot commented on PR #6176:
URL: https://github.com/apache/hudi/pull/6176#issuecomment-1192129920

   ## CI report:
   
   * e434c6b5bb7b4dc8ee24164a01d58cf0e03207cd Azure: [CANCELED](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=10179) 
   * 0ff79e179d4ed9d6e300dfb609379860b7b9c9ce UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>




[GitHub] [hudi] hudi-bot commented on pull request #6176: [HUDI-4445] S3 Incremental source improvements

Posted by GitBox <gi...@apache.org>.
hudi-bot commented on PR #6176:
URL: https://github.com/apache/hudi/pull/6176#issuecomment-1196097810

   ## CI report:
   
   * e783a886bb0288647776e8645aa7a765f2edd6fd Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=10313) 
   * e9fd966c00d63200168f42e65b10303bcee5859d UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>




[GitHub] [hudi] xushiyan commented on a diff in pull request #6176: [HUDI-4445] S3 Incremental source improvements

Posted by "xushiyan (via GitHub)" <gi...@apache.org>.
xushiyan commented on code in PR #6176:
URL: https://github.com/apache/hudi/pull/6176#discussion_r1188069077


##########
hudi-common/src/test/java/org/apache/hudi/common/testutils/S3EventTestPayload.java:
##########
@@ -0,0 +1,53 @@
+package org.apache.hudi.common.testutils;
+
+import org.apache.hudi.avro.MercifulJsonConverter;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.util.Option;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Test payload for S3 event here (https://docs.aws.amazon.com/AmazonS3/latest/userguide/notification-content-structure.html).
+ */
+public class S3EventTestPayload extends GenericTestPayload implements HoodieRecordPayload<S3EventTestPayload> {

Review Comment:
   I'd suggest just testing with DefaultHoodieRecordPayload and a specific S3 event schema, instead of creating a new test payload, as we want to test as close to the real scenario as possible. Besides, we don't couple payloads with schemas, as a payload is only responsible for how to merge.
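   For example, constructing such a test record might look like this (a sketch; `recordKey`, `partitionPath`, and `s3EventRecord` are hypothetical, and it assumes the Option-based payload constructor and the HoodieAvroRecord wrapper):

   ```java
   // Sketch: stock payload wrapping a GenericRecord that conforms to the S3 event schema.
   HoodieKey key = new HoodieKey(recordKey, partitionPath);
   DefaultHoodieRecordPayload payload = new DefaultHoodieRecordPayload(Option.of(s3EventRecord));
   HoodieRecord<DefaultHoodieRecordPayload> record = new HoodieAvroRecord<>(key, payload);
   ```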


