Posted to commits@hudi.apache.org by "nsivabalan (via GitHub)" <gi...@apache.org> on 2023/03/30 00:44:32 UTC

[GitHub] [hudi] nsivabalan commented on a diff in pull request #8290: [HUDI-5983] Improve loading data via cloud store incr source

nsivabalan commented on code in PR #8290:
URL: https://github.com/apache/hudi/pull/8290#discussion_r1152618564


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/GcsObjectsFetcher.java:
##########
@@ -21,29 +21,33 @@
 import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.utilities.sources.helpers.CloudObjectsSelectorCommon;
+import org.apache.hudi.utilities.sources.helpers.CloudObject;
+
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
 
 import java.io.Serializable;
 import java.util.List;
+
 import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
+import static org.apache.hudi.utilities.sources.helpers.CloudObjectsSelectorCommon.getCloudObjectsPerPartition;
 import static org.apache.hudi.utilities.sources.helpers.CloudStoreIngestionConfig.CLOUD_DATAFILE_EXTENSION;
 import static org.apache.hudi.utilities.sources.helpers.CloudStoreIngestionConfig.IGNORE_RELATIVE_PATH_PREFIX;
 import static org.apache.hudi.utilities.sources.helpers.CloudStoreIngestionConfig.IGNORE_RELATIVE_PATH_SUBSTR;
 import static org.apache.hudi.utilities.sources.helpers.CloudStoreIngestionConfig.SELECT_RELATIVE_PATH_PREFIX;
 
 /**
- * Extracts a list of fully qualified GCS filepaths from a given Spark Dataset as input.
+ * Extracts a list of GCS {@link CloudObject} containing filepaths from a given Spark Dataset as input.
  * Optionally:
  * i) Match the filename and path against provided input filter strings
  * ii) Check if each file exists on GCS, in which case it assumes SparkContext is already
  * configured with GCS options through GcsEventsHoodieIncrSource.addGcsAccessConfs().
  */
-public class FilePathsFetcher implements Serializable {
+public class GcsObjectsFetcher implements Serializable {

Review Comment:
   GcsObjectsFetcher and GcsObjectsDataFetcher are still a bit confusing.
   Can we name these:
   GcsObjectsPathFetcher
   GcsObjectsDataFetcher



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java:
##########
@@ -115,4 +139,41 @@ private static boolean checkIfFileExists(String storageUrlSchemePrefix, String b
       throw new HoodieIOException(errMsg, ioe);
     }
   }
+
+  public static Option<Dataset<Row>> loadAsDataset(SparkSession spark, List<CloudObject> cloudObjects, TypedProperties props, String fileFormat) {
+    LOG.debug("Extracted distinct files " + cloudObjects.size()
+        + " and some samples " + cloudObjects.stream().map(CloudObject::getPath).limit(10).collect(Collectors.toList()));
+
+    if (isNullOrEmpty(cloudObjects)) {
+      return Option.empty();
+    }
+    DataFrameReader reader = spark.read().format(fileFormat);
+    String datasourceOpts = props.getString(SPARK_DATASOURCE_OPTIONS, null);
+    if (StringUtils.isNullOrEmpty(datasourceOpts)) {
+      // fall back to legacy config for BWC. TODO consolidate in HUDI-5780
+      datasourceOpts = props.getString(S3EventsHoodieIncrSource.Config.SPARK_DATASOURCE_OPTIONS, null);
+    }
+    if (StringUtils.nonEmpty(datasourceOpts)) {
+      final ObjectMapper mapper = new ObjectMapper();
+      Map<String, String> sparkOptionsMap = null;
+      try {
+        sparkOptionsMap = mapper.readValue(datasourceOpts, Map.class);
+      } catch (IOException e) {
+        throw new HoodieException(String.format("Failed to parse sparkOptions: %s", datasourceOpts), e);
+      }
+      LOG.info(String.format("sparkOptions loaded: %s", sparkOptionsMap));
+      reader = reader.options(sparkOptionsMap);
+    }
+    List<String> paths = new ArrayList<>();
+    long totalSize = 0;
+    for (CloudObject o: cloudObjects) {
+      paths.add(o.getPath());
+      totalSize += o.getSize();
+    }
+    // inflate 10% for potential hoodie meta fields
+    totalSize *= 1.1;
+    long parquetMaxFileSize = props.getLong(PARQUET_MAX_FILE_SIZE.key(), Long.parseLong(PARQUET_MAX_FILE_SIZE.defaultValue()));
+    int numPartitions = (int) Math.max(totalSize / parquetMaxFileSize, 1);
+    return Option.of(reader.load(paths.toArray(new String[cloudObjects.size()])).coalesce(numPartitions));

Review Comment:
   Don't we need to do a repartition here?
   coalesce [may not increase](https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.DataFrame.coalesce.html) the number of partitions.
   For example, if we have one 1GB file from S3, we want to repartition so that we get about 10 Spark partitions.
   
   Maybe we can optimize based on the total size and the max Parquet file size: if the computed partition count is lower than the current number of partitions we can do coalesce, and if it is higher we can do repartition, as in the sketch below.
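   
   Just a sketch of what I have in mind, reusing reader, paths, totalSize and parquetMaxFileSize from the quoted diff; this is not the actual PR code:
   
       Dataset<Row> dataset = reader.load(paths.toArray(new String[0]));
       int currentPartitions = dataset.rdd().getNumPartitions();
       int targetPartitions = (int) Math.max(totalSize / parquetMaxFileSize, 1);
       if (targetPartitions < currentPartitions) {
         // shrinking: coalesce avoids a shuffle
         dataset = dataset.coalesce(targetPartitions);
       } else if (targetPartitions > currentPartitions) {
         // growing: repartition shuffles, but it can actually increase the partition count,
         // e.g. splitting a single 1GB S3 file into ~10 partitions
         dataset = dataset.repartition(targetPartitions);
       }
       return Option.of(dataset);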
   
   



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java:
##########
@@ -115,4 +139,41 @@ private static boolean checkIfFileExists(String storageUrlSchemePrefix, String b
       throw new HoodieIOException(errMsg, ioe);
     }
   }
+
+  public static Option<Dataset<Row>> loadAsDataset(SparkSession spark, List<CloudObject> cloudObjects, TypedProperties props, String fileFormat) {
+    LOG.debug("Extracted distinct files " + cloudObjects.size()

Review Comment:
   Do you think we should guard this with Logger.isDebugEnabled()?
   Since this might trigger the DAG, I just want to be cautious.
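   
   For example (a sketch, reusing LOG and cloudObjects from the diff):
   
       if (LOG.isDebugEnabled()) {
         // the sample list is only built when debug logging is actually enabled
         LOG.debug("Extracted distinct files " + cloudObjects.size()
             + " and some samples " + cloudObjects.stream().map(CloudObject::getPath).limit(10).collect(Collectors.toList()));
       }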



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java:
##########
@@ -115,4 +129,41 @@ private static boolean checkIfFileExists(String storageUrlSchemePrefix, String b
       throw new HoodieIOException(errMsg, ioe);
     }
   }
+
+  public static Option<Dataset<Row>> loadAsDataset(SparkSession spark, List<CloudObject> cloudObjects, TypedProperties props, String fileFormat) {
+    LOG.debug("Extracted distinct files " + cloudObjects.size()
+        + " and some samples " + cloudObjects.stream().map(CloudObject::getPath).limit(10).collect(Collectors.toList()));
+
+    if (isNullOrEmpty(cloudObjects)) {
+      return Option.empty();
+    }
+    DataFrameReader reader = spark.read().format(fileFormat);
+    String datasourceOpts = props.getString(SPARK_DATASOURCE_OPTIONS, null);
+    if (StringUtils.isNullOrEmpty(datasourceOpts)) {
+      // fall back to legacy config for BWC. TODO consolidate in HUDI-5780
+      datasourceOpts = props.getString(S3EventsHoodieIncrSource.Config.SPARK_DATASOURCE_OPTIONS, null);
+    }
+    if (StringUtils.nonEmpty(datasourceOpts)) {
+      final ObjectMapper mapper = new ObjectMapper();
+      Map<String, String> sparkOptionsMap = null;
+      try {
+        sparkOptionsMap = mapper.readValue(datasourceOpts, Map.class);
+      } catch (IOException e) {
+        throw new HoodieException(String.format("Failed to parse sparkOptions: %s", datasourceOpts), e);
+      }
+      LOG.info(String.format("sparkOptions loaded: %s", sparkOptionsMap));
+      reader = reader.options(sparkOptionsMap);
+    }
+    List<String> paths = new ArrayList<>();
+    long totalSize = 0;
+    for (CloudObject o: cloudObjects) {
+      paths.add(o.getPath());
+      totalSize += o.getSize();
+    }
+    // inflate 10% for potential hoodie meta fields
+    totalSize *= 1.1;

Review Comment:
   I feel we can't just do a flat 10%; our meta fields overhead is not proportional to the payload size.
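   
   If we want to model it as a fixed per-record cost instead of a percentage, a rough sketch could be (the constants and the record-count estimate below are assumptions for illustration, not measured numbers):
   
       // hypothetical: estimate the record count from the payload size and an assumed avg source record size,
       // then add a fixed per-record overhead for the _hoodie_* meta fields
       long assumedAvgSourceRecordSizeBytes = 1024; // assumption for illustration only
       long assumedMetaFieldOverheadBytes = 64;     // assumption for illustration only
       long estimatedRecords = Math.max(totalSize / assumedAvgSourceRecordSizeBytes, 1);
       totalSize += estimatedRecords * assumedMetaFieldOverheadBytes;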



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java:
##########
@@ -115,4 +129,41 @@ private static boolean checkIfFileExists(String storageUrlSchemePrefix, String b
       throw new HoodieIOException(errMsg, ioe);
     }
   }
+
+  public static Option<Dataset<Row>> loadAsDataset(SparkSession spark, List<CloudObject> cloudObjects, TypedProperties props, String fileFormat) {
+    LOG.debug("Extracted distinct files " + cloudObjects.size()
+        + " and some samples " + cloudObjects.stream().map(CloudObject::getPath).limit(10).collect(Collectors.toList()));
+
+    if (isNullOrEmpty(cloudObjects)) {
+      return Option.empty();
+    }
+    DataFrameReader reader = spark.read().format(fileFormat);
+    String datasourceOpts = props.getString(SPARK_DATASOURCE_OPTIONS, null);
+    if (StringUtils.isNullOrEmpty(datasourceOpts)) {
+      // fall back to legacy config for BWC. TODO consolidate in HUDI-5780
+      datasourceOpts = props.getString(S3EventsHoodieIncrSource.Config.SPARK_DATASOURCE_OPTIONS, null);
+    }
+    if (StringUtils.nonEmpty(datasourceOpts)) {
+      final ObjectMapper mapper = new ObjectMapper();
+      Map<String, String> sparkOptionsMap = null;
+      try {
+        sparkOptionsMap = mapper.readValue(datasourceOpts, Map.class);
+      } catch (IOException e) {
+        throw new HoodieException(String.format("Failed to parse sparkOptions: %s", datasourceOpts), e);
+      }
+      LOG.info(String.format("sparkOptions loaded: %s", sparkOptionsMap));
+      reader = reader.options(sparkOptionsMap);
+    }
+    List<String> paths = new ArrayList<>();
+    long totalSize = 0;
+    for (CloudObject o: cloudObjects) {
+      paths.add(o.getPath());
+      totalSize += o.getSize();
+    }
+    // inflate 10% for potential hoodie meta fields
+    totalSize *= 1.1;

Review Comment:
   We can get the avg record size from the latest commit metadata, right? Is that doable?
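   
   A rough sketch of that idea (it assumes a HoodieTableMetaClient for the target table is wired into this helper, which it currently isn't, so this is purely illustrative):
   
       // uses org.apache.hudi.common.table.HoodieTableMetaClient,
       // org.apache.hudi.common.table.timeline.HoodieInstant,
       // org.apache.hudi.common.model.HoodieCommitMetadata
       Option<HoodieInstant> lastCommit = metaClient.getActiveTimeline()
           .getCommitsTimeline().filterCompletedInstants().lastInstant();
       if (lastCommit.isPresent()) {
         try {
           HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(
               metaClient.getActiveTimeline().getInstantDetails(lastCommit.get()).get(), HoodieCommitMetadata.class);
           long totalBytesWritten = commitMetadata.fetchTotalBytesWritten();
           long totalRecordsWritten = commitMetadata.fetchTotalRecordsWritten();
           if (totalRecordsWritten > 0) {
             long avgRecordSize = totalBytesWritten / totalRecordsWritten;
             // avgRecordSize could then drive the partition sizing instead of the flat 10% inflation
           }
         } catch (IOException e) {
           throw new HoodieException("Failed to read the latest commit metadata", e);
         }
       }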


