Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/07/22 04:51:25 UTC

[GitHub] [hudi] codope commented on a diff in pull request #6176: [HUDI-4445] S3 Incremental source improvements

codope commented on code in PR #6176:
URL: https://github.com/apache/hudi/pull/6176#discussion_r927282574


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java:
##########
@@ -156,53 +214,52 @@ public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkpt
       return Pair.of(Option.empty(), queryTypeAndInstantEndpts.getRight().getRight());
     }
 
-    String filter = "s3.object.size > 0";
-    if (!StringUtils.isNullOrEmpty(props.getString(Config.S3_KEY_PREFIX, null))) {
-      filter = filter + " and s3.object.key like '" + props.getString(Config.S3_KEY_PREFIX) + "%'";
-    }
-    if (!StringUtils.isNullOrEmpty(props.getString(Config.S3_IGNORE_KEY_PREFIX, null))) {
-      filter = filter + " and s3.object.key not like '" + props.getString(Config.S3_IGNORE_KEY_PREFIX) + "%'";
-    }
-    if (!StringUtils.isNullOrEmpty(props.getString(Config.S3_IGNORE_KEY_SUBSTRING, null))) {
-      filter = filter + " and s3.object.key not like '%" + props.getString(Config.S3_IGNORE_KEY_SUBSTRING) + "%'";
-    }
-    // add file format filtering by default
-    filter = filter + " and s3.object.key like '%" + fileFormat + "%'";
-
+    Column filterColumn = s3EventsColumnFilter(fileFormat);
     String s3FS = props.getString(Config.S3_FS_PREFIX, "s3").toLowerCase();
     String s3Prefix = s3FS + "://";
 
     // Extract distinct file keys from s3 meta hoodie table
-    final List<Row> cloudMetaDf = source
-        .filter(filter)
-        .select("s3.bucket.name", "s3.object.key")
-        .distinct()
-        .collectAsList();
     // Create S3 paths
     final boolean checkExists = props.getBoolean(Config.ENABLE_EXISTS_CHECK, Config.DEFAULT_ENABLE_EXISTS_CHECK);
-    List<String> cloudFiles = new ArrayList<>();
-    for (Row row : cloudMetaDf) {
-      // construct file path, row index 0 refers to bucket and 1 refers to key
-      String bucket = row.getString(0);
-      String filePath = s3Prefix + bucket + "/" + row.getString(1);
-      if (checkExists) {
-        FileSystem fs = FSUtils.getFs(s3Prefix + bucket, sparkSession.sparkContext().hadoopConfiguration());
-        try {
-          if (fs.exists(new Path(filePath))) {
-            cloudFiles.add(filePath);
-          }
-        } catch (IOException e) {
-          LOG.error(String.format("Error while checking path exists for %s ", filePath), e);
-        }
-      } else {
-        cloudFiles.add(filePath);
-      }
-    }
+    SerializableConfiguration serializableConfiguration = new SerializableConfiguration(sparkContext.hadoopConfiguration());
+    List<String> cloudFiles = source
+        .filter(filterColumn)
+        .select("s3.bucket.name", "s3.object.key")
+        .distinct()
+        .rdd().toJavaRDD().mapPartitions(fileListIterator -> {
+          List<String> cloudFilesPerPartition = new ArrayList<>();
+          fileListIterator.forEachRemaining(row -> {
+            final Configuration configuration = serializableConfiguration.newCopy();
+            String bucket = row.getString(0);
+            String filePath = s3Prefix + bucket + "/" + row.getString(1);
+            try {
+              String decodeUrl = URLDecoder.decode(filePath, StandardCharsets.UTF_8.name());
+              if (checkExists) {
+                FileSystem fs = FSUtils.getFs(s3Prefix + bucket, configuration);
+                try {
+                  if (fs.exists(new Path(decodeUrl))) {
+                    cloudFilesPerPartition.add(decodeUrl);
+                  }
+                } catch (IOException e) {
+                  LOG.error(String.format("Error while checking path exists for %s ", decodeUrl), e);
+                }
+              } else {
+                cloudFilesPerPartition.add(decodeUrl);
+              }
+            } catch (Exception exception) {
+              LOG.warn("Failed to add cloud file ", exception);
+            }
+          });
+          return cloudFilesPerPartition.iterator();
+        }).collect();
     Option<Dataset<Row>> dataset = Option.empty();
     if (!cloudFiles.isEmpty()) {
       DataFrameReader dataFrameReader = getDataFrameReader(fileFormat);
-      dataset = Option.of(dataFrameReader.load(cloudFiles.toArray(new String[0])));
+      Dataset ds = addPartitionColumn(dataFrameReader.load(cloudFiles.toArray(new String[0])),cloudFiles);
+      dataset = Option.of(ds);
     }
+    LOG.warn("Extracted distinct files " + cloudFiles.size()

Review Comment:
   I assume it was for testing. Change the log to `debug` level?
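   For illustration, the lowered log level might look like this; the message text here is hypothetical, since the original statement is truncated in the hunk above:
   ```java
   // Lowered from warn to debug; guarded so the message is only built when debug logging is on
   if (LOG.isDebugEnabled()) {
     LOG.debug("Extracted distinct files: " + cloudFiles.size());
   }
   ```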



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java:
##########
@@ -81,6 +94,12 @@ static class Config {
      * - --hoodie-conf hoodie.deltastreamer.source.s3incr.spark.datasource.options={"header":"true","encoding":"UTF-8"}
      */
     static final String SPARK_DATASOURCE_OPTIONS = "hoodie.deltastreamer.source.s3incr.spark.datasource.options";
+
+    // ToDo make it a list of extensions
+    static final String S3_ACTUAL_FILE_EXTENSIONS = "hoodie.deltastreamer.source.s3incr.file.extensions";
+
+    static final String ATTACH_SOURCE_PARTITION_COLUMN = "hoodie.deltastreamer.source.s3incr.source.partition.exists";
+    static final Boolean DEFAULT_ATTACH_SOURCE_PARTITION_COLUMN = true;

Review Comment:
   Have we fully tested this change? If not, I would suggest keeping the default as false for now.
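   For reference, flipping the default is a one-line change (constant names taken from the hunk above):
   ```java
   static final String ATTACH_SOURCE_PARTITION_COLUMN = "hoodie.deltastreamer.source.s3incr.source.partition.exists";
   // Keep the partition-column attachment disabled by default until it has been fully tested
   static final Boolean DEFAULT_ATTACH_SOURCE_PARTITION_COLUMN = false;
   ```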



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java:
##########
@@ -107,6 +126,45 @@ private DataFrameReader getDataFrameReader(String fileFormat) {
     return dataFrameReader;
   }
 
+  private Dataset addPartitionColumn(Dataset ds, List<String> cloudFiles) {
+    if (props.getBoolean(Config.ATTACH_SOURCE_PARTITION_COLUMN, Config.DEFAULT_ATTACH_SOURCE_PARTITION_COLUMN)
+        && !StringUtils.isNullOrEmpty(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key())) {
+      String partitionKey = props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()).split(":")[0];
+      String partitionPathPattern = String.format("%s=",partitionKey);
+      String filePath = cloudFiles.get(0);
+      List<String> nestedPartition = Arrays.stream(filePath.split("/"))
+          .filter(level -> level.contains(partitionPathPattern)).collect(Collectors.toList());
+      if (nestedPartition.size() > 1) {
+        throw new HoodieException("More than one level of partitioning exists");
+      }
+      if (nestedPartition.size() == 1) {
+        LOG.info(String.format("adding column name = %s to dataset",partitionKey));
+        ds = ds.withColumn(partitionKey, split(split(input_file_name(),
+            partitionPathPattern).getItem(1), "/").getItem(0));
+      }
+    }
+    return ds;
+  }
+
+  private Column s3EventsColumnFilter(String fileFormat) {

Review Comment:
   A minor suggestion: extract methods like this to a separate util class and keep this class plain and simple. Or, if you prefer to keep these methods in this class for better readability, move them to the bottom (i.e. after the call site) for a linear flow.
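   As a rough sketch of the first option; the class and method names are hypothetical, the conditions mirror the string-based filter removed in the first hunk, and the `Config` constants are assumed to be accessible from the new class:
   ```java
   import org.apache.hudi.common.config.TypedProperties;
   import org.apache.hudi.common.util.StringUtils;
   import org.apache.spark.sql.Column;

   import static org.apache.spark.sql.functions.col;
   import static org.apache.spark.sql.functions.not;

   // Hypothetical utility class, not part of the PR as written
   public final class S3EventsSourceUtils {

     private S3EventsSourceUtils() {
     }

     // Same conditions as the removed string-based filter, expressed as a typed Column
     public static Column buildS3EventsFilter(TypedProperties props, String fileFormat) {
       Column filter = col("s3.object.size").gt(0);
       String keyPrefix = props.getString(S3EventsHoodieIncrSource.Config.S3_KEY_PREFIX, null);
       if (!StringUtils.isNullOrEmpty(keyPrefix)) {
         filter = filter.and(col("s3.object.key").startsWith(keyPrefix));
       }
       String ignorePrefix = props.getString(S3EventsHoodieIncrSource.Config.S3_IGNORE_KEY_PREFIX, null);
       if (!StringUtils.isNullOrEmpty(ignorePrefix)) {
         filter = filter.and(not(col("s3.object.key").startsWith(ignorePrefix)));
       }
       // ... the ignore-substring check would follow the same pattern,
       // and the file-format check is applied unconditionally, as before
       return filter.and(col("s3.object.key").contains(fileFormat));
     }
   }
   ```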



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java:
##########
@@ -107,6 +126,45 @@ private DataFrameReader getDataFrameReader(String fileFormat) {
     return dataFrameReader;
   }
 
+  private Dataset addPartitionColumn(Dataset ds, List<String> cloudFiles) {
+    if (props.getBoolean(Config.ATTACH_SOURCE_PARTITION_COLUMN, Config.DEFAULT_ATTACH_SOURCE_PARTITION_COLUMN)
+        && !StringUtils.isNullOrEmpty(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key())) {
+      String partitionKey = props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()).split(":")[0];
+      String partitionPathPattern = String.format("%s=",partitionKey);
+      String filePath = cloudFiles.get(0);
+      List<String> nestedPartition = Arrays.stream(filePath.split("/"))
+          .filter(level -> level.contains(partitionPathPattern)).collect(Collectors.toList());
+      if (nestedPartition.size() > 1) {
+        throw new HoodieException("More than one level of partitioning exists");

Review Comment:
   Is it planned to be supported sometime in the future? If yes, let's create a tracking JIRA for that.



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java:
##########
@@ -81,6 +94,12 @@ static class Config {
      * - --hoodie-conf hoodie.deltastreamer.source.s3incr.spark.datasource.options={"header":"true","encoding":"UTF-8"}
      */
     static final String SPARK_DATASOURCE_OPTIONS = "hoodie.deltastreamer.source.s3incr.spark.datasource.options";
+
+    // ToDo make it a list of extensions
+    static final String S3_ACTUAL_FILE_EXTENSIONS = "hoodie.deltastreamer.source.s3incr.file.extensions";
+
+    static final String ATTACH_SOURCE_PARTITION_COLUMN = "hoodie.deltastreamer.source.s3incr.source.partition.exists";

Review Comment:
   ```suggestion
   // Add a comment on the purpose of this config and rename as below
       static final String ADD_SOURCE_PARTITION_COLUMN = "hoodie.deltastreamer.source.s3incr.add.source.partition.column";
   ```



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java:
##########
@@ -107,6 +126,45 @@ private DataFrameReader getDataFrameReader(String fileFormat) {
     return dataFrameReader;
   }
 
+  private Dataset addPartitionColumn(Dataset ds, List<String> cloudFiles) {
+    if (props.getBoolean(Config.ATTACH_SOURCE_PARTITION_COLUMN, Config.DEFAULT_ATTACH_SOURCE_PARTITION_COLUMN)
+        && !StringUtils.isNullOrEmpty(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key())) {
+      String partitionKey = props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()).split(":")[0];

Review Comment:
   Return early, or log an error/warning, if `partitionKey` is null or empty?
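   Something along these lines inside `addPartitionColumn`, as a sketch (the log message is illustrative):
   ```java
   // Bail out early if no partition path field is configured
   String partitionField = props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), null);
   if (StringUtils.isNullOrEmpty(partitionField)) {
     LOG.warn("No partition path field configured; skipping source partition column attachment");
     return ds;
   }
   String partitionKey = partitionField.split(":")[0];
   ```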



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java:
##########
@@ -81,6 +94,12 @@ static class Config {
      * - --hoodie-conf hoodie.deltastreamer.source.s3incr.spark.datasource.options={"header":"true","encoding":"UTF-8"}
      */
     static final String SPARK_DATASOURCE_OPTIONS = "hoodie.deltastreamer.source.s3incr.spark.datasource.options";
+
+    // ToDo make it a list of extensions
+    static final String S3_ACTUAL_FILE_EXTENSIONS = "hoodie.deltastreamer.source.s3incr.file.extensions";

Review Comment:
   Is this a list of supported source data file extensions, e.g. .json, .parquet, .avro, etc.?
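   If it does become a list per the ToDo, a comma-separated value is one simple encoding; a sketch, with the example value assumed rather than taken from the PR:
   ```java
   // Hypothetical parsing of e.g. ".json,.parquet,.avro" into individual extensions
   String extensionsValue = props.getString(Config.S3_ACTUAL_FILE_EXTENSIONS, "");
   List<String> fileExtensions = Arrays.stream(extensionsValue.split(","))
       .map(String::trim)
       .filter(ext -> !ext.isEmpty())
       .collect(Collectors.toList());
   ```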



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java:
##########
@@ -156,53 +214,52 @@ public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkpt
       return Pair.of(Option.empty(), queryTypeAndInstantEndpts.getRight().getRight());
     }
 
-    String filter = "s3.object.size > 0";
-    if (!StringUtils.isNullOrEmpty(props.getString(Config.S3_KEY_PREFIX, null))) {
-      filter = filter + " and s3.object.key like '" + props.getString(Config.S3_KEY_PREFIX) + "%'";
-    }
-    if (!StringUtils.isNullOrEmpty(props.getString(Config.S3_IGNORE_KEY_PREFIX, null))) {
-      filter = filter + " and s3.object.key not like '" + props.getString(Config.S3_IGNORE_KEY_PREFIX) + "%'";
-    }
-    if (!StringUtils.isNullOrEmpty(props.getString(Config.S3_IGNORE_KEY_SUBSTRING, null))) {
-      filter = filter + " and s3.object.key not like '%" + props.getString(Config.S3_IGNORE_KEY_SUBSTRING) + "%'";
-    }
-    // add file format filtering by default
-    filter = filter + " and s3.object.key like '%" + fileFormat + "%'";
-
+    Column filterColumn = s3EventsColumnFilter(fileFormat);
     String s3FS = props.getString(Config.S3_FS_PREFIX, "s3").toLowerCase();
     String s3Prefix = s3FS + "://";
 
     // Extract distinct file keys from s3 meta hoodie table
-    final List<Row> cloudMetaDf = source
-        .filter(filter)
-        .select("s3.bucket.name", "s3.object.key")
-        .distinct()
-        .collectAsList();
     // Create S3 paths
     final boolean checkExists = props.getBoolean(Config.ENABLE_EXISTS_CHECK, Config.DEFAULT_ENABLE_EXISTS_CHECK);
-    List<String> cloudFiles = new ArrayList<>();
-    for (Row row : cloudMetaDf) {
-      // construct file path, row index 0 refers to bucket and 1 refers to key
-      String bucket = row.getString(0);
-      String filePath = s3Prefix + bucket + "/" + row.getString(1);
-      if (checkExists) {
-        FileSystem fs = FSUtils.getFs(s3Prefix + bucket, sparkSession.sparkContext().hadoopConfiguration());
-        try {
-          if (fs.exists(new Path(filePath))) {
-            cloudFiles.add(filePath);
-          }
-        } catch (IOException e) {
-          LOG.error(String.format("Error while checking path exists for %s ", filePath), e);
-        }
-      } else {
-        cloudFiles.add(filePath);
-      }
-    }
+    SerializableConfiguration serializableConfiguration = new SerializableConfiguration(sparkContext.hadoopConfiguration());
+    List<String> cloudFiles = source
+        .filter(filterColumn)
+        .select("s3.bucket.name", "s3.object.key")
+        .distinct()
+        .rdd().toJavaRDD().mapPartitions(fileListIterator -> {
+          List<String> cloudFilesPerPartition = new ArrayList<>();
+          fileListIterator.forEachRemaining(row -> {
+            final Configuration configuration = serializableConfiguration.newCopy();

Review Comment:
   Why create a copy again? I don't see any config modification happening within the executor. Why not simply pass `serializableConfiguration`?
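   In other words, inside the closure the lookup could reduce to something like the line below, assuming Hudi's `SerializableConfiguration#get()` exposes the wrapped Hadoop `Configuration`; alternatively, a single `newCopy()` per partition (outside `forEachRemaining`) would also avoid the per-row copies:
   ```java
   // Reuse the wrapped configuration directly instead of copying it for every row
   FileSystem fs = FSUtils.getFs(s3Prefix + bucket, serializableConfiguration.get());
   ```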


