You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2020/06/28 09:01:51 UTC

[GitHub] [hudi] vinothchandar commented on a change in pull request #1768: [HUDI-1054][Peformance] Several performance fixes during finalizing writes

vinothchandar commented on a change in pull request #1768:
URL: https://github.com/apache/hudi/pull/1768#discussion_r446621351



##########
File path: hudi-common/pom.xml
##########
@@ -147,6 +147,16 @@
       <scope>test</scope>
     </dependency>
 
+    <!-- Spark -->
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_${scala.binary.version}</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-sql_${scala.binary.version}</artifactId>
+    </dependency>
+

Review comment:
       yes.. we cannot depend on spark in `hudi-common` 

##########
File path: hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java
##########
@@ -386,13 +389,26 @@ public void finalizeWrite(JavaSparkContext jsc, String instantTs, List<HoodieWri
    *
    * @param instantTs Instant Time
    */
-  public void deleteMarkerDir(String instantTs) {
+  public void deleteMarkerDir(JavaSparkContext jsc, String instantTs) {
     try {
       FileSystem fs = getMetaClient().getFs();
       Path markerDir = new Path(metaClient.getMarkerFolderPath(instantTs));
+
       if (fs.exists(markerDir)) {
         // For append only case, we do not write to marker dir. Hence, the above check
-        LOG.info("Removing marker directory=" + markerDir);
+        LOG.info("Removing marker directory = " + markerDir);
+
+        FileStatus[] fileStatuses = fs.listStatus(markerDir);

Review comment:
       cc @n3nash should we have flag to protect this for HDFS.. i.e if the recursive delete works better there (IIUC). you might want to tradeoff less RPCs ..? 
   we can override defaults at spark datasource level, and set these based on `StorageSchemes` as well. 

##########
File path: hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java
##########
@@ -386,13 +389,26 @@ public void finalizeWrite(JavaSparkContext jsc, String instantTs, List<HoodieWri
    *
    * @param instantTs Instant Time
    */
-  public void deleteMarkerDir(String instantTs) {
+  public void deleteMarkerDir(JavaSparkContext jsc, String instantTs) {
     try {
       FileSystem fs = getMetaClient().getFs();
       Path markerDir = new Path(metaClient.getMarkerFolderPath(instantTs));
+
       if (fs.exists(markerDir)) {
         // For append only case, we do not write to marker dir. Hence, the above check
-        LOG.info("Removing marker directory=" + markerDir);
+        LOG.info("Removing marker directory = " + markerDir);
+
+        FileStatus[] fileStatuses = fs.listStatus(markerDir);

Review comment:
       @umehrot2 so seems like, for object stores this is different..  and makes sense completely to do parallel cleaning of individual files.

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
##########
@@ -199,16 +201,40 @@ public static String getRelativePartitionPath(Path basePath, Path partitionPath)
     return partitions;
   }
 
-  public static List<String> getAllDataFilesForMarkers(FileSystem fs, String basePath, String instantTs,
-      String markerDir, String baseFileExtension) throws IOException {
-    List<String> dataFiles = new LinkedList<>();
-    processFiles(fs, markerDir, (status) -> {
-      String pathStr = status.getPath().toString();
-      if (pathStr.endsWith(HoodieTableMetaClient.MARKER_EXTN)) {
-        dataFiles.add(FSUtils.translateMarkerToDataPath(basePath, pathStr, instantTs, baseFileExtension));
+  public static Set<String> getAllDataFilesForMarkers(JavaSparkContext jsc, FileSystem fs, String basePath,

Review comment:
       i think this is the reason for needing spark in `hudi-common`.. we can move refactor the code to `hudi-client`.. 
   
   In fact, #1755  has already modularized this more.. 

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
##########
@@ -199,16 +201,40 @@ public static String getRelativePartitionPath(Path basePath, Path partitionPath)
     return partitions;
   }
 
-  public static List<String> getAllDataFilesForMarkers(FileSystem fs, String basePath, String instantTs,
-      String markerDir, String baseFileExtension) throws IOException {
-    List<String> dataFiles = new LinkedList<>();
-    processFiles(fs, markerDir, (status) -> {
-      String pathStr = status.getPath().toString();
-      if (pathStr.endsWith(HoodieTableMetaClient.MARKER_EXTN)) {
-        dataFiles.add(FSUtils.translateMarkerToDataPath(basePath, pathStr, instantTs, baseFileExtension));
+  public static Set<String> getAllDataFilesForMarkers(JavaSparkContext jsc, FileSystem fs, String basePath,
+      String instantTs, String markerDir, String baseFileExtension, int parallelism) throws IOException {
+    FileStatus[] topLevelStatuses = fs.listStatus(new Path(markerDir));
+
+    Set<String> dataFiles = new HashSet<>();
+
+    List<String> subDirectories = new ArrayList<>();
+    for (FileStatus topLevelStatus: topLevelStatuses) {
+      if (topLevelStatus.isFile()) {
+        String pathStr = topLevelStatus.getPath().toString();
+        if (pathStr.endsWith(HoodieTableMetaClient.MARKER_EXTN)) {
+          dataFiles.add(FSUtils.translateMarkerToDataPath(basePath, pathStr, instantTs, baseFileExtension));
+        }
+      } else {
+        subDirectories.add(topLevelStatus.getPath().toString());
       }
-      return true;
-    }, false);
+    }
+
+    parallelism = subDirectories.size() < parallelism ? subDirectories.size() : parallelism;

Review comment:
       Math.min(subDirectories.size(), parallelism)?

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
##########
@@ -199,16 +201,40 @@ public static String getRelativePartitionPath(Path basePath, Path partitionPath)
     return partitions;
   }
 
-  public static List<String> getAllDataFilesForMarkers(FileSystem fs, String basePath, String instantTs,
-      String markerDir, String baseFileExtension) throws IOException {
-    List<String> dataFiles = new LinkedList<>();
-    processFiles(fs, markerDir, (status) -> {
-      String pathStr = status.getPath().toString();
-      if (pathStr.endsWith(HoodieTableMetaClient.MARKER_EXTN)) {
-        dataFiles.add(FSUtils.translateMarkerToDataPath(basePath, pathStr, instantTs, baseFileExtension));
+  public static Set<String> getAllDataFilesForMarkers(JavaSparkContext jsc, FileSystem fs, String basePath,
+      String instantTs, String markerDir, String baseFileExtension, int parallelism) throws IOException {
+    FileStatus[] topLevelStatuses = fs.listStatus(new Path(markerDir));
+
+    Set<String> dataFiles = new HashSet<>();
+
+    List<String> subDirectories = new ArrayList<>();
+    for (FileStatus topLevelStatus: topLevelStatuses) {
+      if (topLevelStatus.isFile()) {
+        String pathStr = topLevelStatus.getPath().toString();
+        if (pathStr.endsWith(HoodieTableMetaClient.MARKER_EXTN)) {
+          dataFiles.add(FSUtils.translateMarkerToDataPath(basePath, pathStr, instantTs, baseFileExtension));
+        }
+      } else {
+        subDirectories.add(topLevelStatus.getPath().toString());
       }
-      return true;
-    }, false);
+    }
+
+    parallelism = subDirectories.size() < parallelism ? subDirectories.size() : parallelism;
+    dataFiles.addAll(jsc.parallelize(subDirectories, parallelism).flatMap(directory -> {

Review comment:
       similar question here.. cc @n3nash .. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org