Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/09/13 06:28:24 UTC

[GitHub] [hudi] nsivabalan opened a new pull request, #6661: [HUDI-4853] Speeding up reading S3 files in S3EventsIncrSource

nsivabalan opened a new pull request, #6661:
URL: https://github.com/apache/hudi/pull/6661

   ### Change Logs
   
   _Describe context and summary for this change. Highlight if any code was copied._
   
   ### Impact
   
   _Describe any public API or user-facing feature change or any performance impact._
   
   **Risk level: none | low | medium | high**
   
   _Choose one. If medium or high, explain what verification was done to mitigate the risks._
   
   ### Contributor's checklist
   
   - [ ] Read through [contributor's guide](https://hudi.apache.org/contribute/how-to-contribute)
   - [ ] Change Logs and Impact were stated clearly
   - [ ] Adequate tests were added if applicable
   - [ ] CI passed
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] vinothchandar commented on a diff in pull request #6661: [HUDI-4853] Speeding up reading S3 files in S3EventsIncrSource

Posted by "vinothchandar (via GitHub)" <gi...@apache.org>.
vinothchandar commented on code in PR #6661:
URL: https://github.com/apache/hudi/pull/6661#discussion_r1178763050


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java:
##########
@@ -213,15 +216,27 @@ public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkpt
             }
           });
           return cloudFilesPerPartition.iterator();
-        }, Encoders.STRING()).collectAsList();
+        }, Encoders.STRING());
 
     Option<Dataset<Row>> dataset = Option.empty();
+    cloudFiles.cache();
     if (!cloudFiles.isEmpty()) {
-      DataFrameReader dataFrameReader = getDataFrameReader(fileFormat);
-      dataset = Option.of(dataFrameReader.load(cloudFiles.toArray(new String[0])));
+      JavaRDD<Dataset<Row>> datasetIterator = cloudFiles.javaRDD().flatMap(new FlatMapFunction<String, Dataset<Row>>() {
+        @Override
+        public Iterator<Dataset<Row>> call(String cloudFile) throws Exception {
+          return Collections.singletonList(getDataFromFile(cloudFile, fileFormat)).iterator();
+        }
+      });
+      Dataset<Row> finalDataset = datasetIterator.fold(sparkSession.emptyDataFrame(), (Function2<Dataset<Row>, Dataset<Row>, Dataset<Row>>) (v1, v2) -> v1.union(v2));
+      dataset = Option.of(finalDataset);
     }
-    LOG.debug("Extracted distinct files " + cloudFiles.size()
-        + " and some samples " + cloudFiles.stream().limit(10).collect(Collectors.toList()));
+    LOG.debug("Extracted distinct files " + cloudFiles.count()

Review Comment:
   +1
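   (Editor's note: the diff above builds one dataset by folding per-file datasets with `union`, starting from `sparkSession.emptyDataFrame()`. As a minimal stand-in sketch of that fold pattern — using `java.util.stream` and `List<String>` in place of Spark and `Dataset<Row>`, with the hypothetical names `UnionFold` and `foldDatasets` — it looks like this:)

   ```java
   import java.util.ArrayList;
   import java.util.Collections;
   import java.util.List;
   import java.util.stream.Stream;

   public class UnionFold {
       // Combines two "datasets" (stand-ins for Dataset<Row>) by union,
       // mirroring the (v1, v2) -> v1.union(v2) combiner in the diff.
       static List<String> union(List<String> v1, List<String> v2) {
           List<String> out = new ArrayList<>(v1);
           out.addAll(v2);
           return out;
       }

       // Folds a stream of per-file datasets into one, starting from an
       // empty dataset, the analogue of sparkSession.emptyDataFrame().
       static List<String> foldDatasets(Stream<List<String>> perFile) {
           return perFile.reduce(Collections.emptyList(), UnionFold::union);
       }
   }
   ```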





[GitHub] [hudi] the-other-tim-brown commented on a diff in pull request #6661: [HUDI-4853] Speeding up reading S3 files in S3EventsIncrSource

Posted by GitBox <gi...@apache.org>.
the-other-tim-brown commented on code in PR #6661:
URL: https://github.com/apache/hudi/pull/6661#discussion_r1008922752


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java:
##########
@@ -213,15 +216,27 @@ public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkpt
             }
           });
           return cloudFilesPerPartition.iterator();
-        }, Encoders.STRING()).collectAsList();
+        }, Encoders.STRING());
 
     Option<Dataset<Row>> dataset = Option.empty();
+    cloudFiles.cache();
     if (!cloudFiles.isEmpty()) {
-      DataFrameReader dataFrameReader = getDataFrameReader(fileFormat);
-      dataset = Option.of(dataFrameReader.load(cloudFiles.toArray(new String[0])));
+      JavaRDD<Dataset<Row>> datasetIterator = cloudFiles.javaRDD().flatMap(new FlatMapFunction<String, Dataset<Row>>() {
+        @Override
+        public Iterator<Dataset<Row>> call(String cloudFile) throws Exception {
+          return Collections.singletonList(getDataFromFile(cloudFile, fileFormat)).iterator();
+        }
+      });
+      Dataset<Row> finalDataset = datasetIterator.fold(sparkSession.emptyDataFrame(), (Function2<Dataset<Row>, Dataset<Row>, Dataset<Row>>) (v1, v2) -> v1.union(v2));
+      dataset = Option.of(finalDataset);
     }
-    LOG.debug("Extracted distinct files " + cloudFiles.size()
-        + " and some samples " + cloudFiles.stream().limit(10).collect(Collectors.toList()));
+    LOG.debug("Extracted distinct files " + cloudFiles.count()

Review Comment:
   I think this will always evaluate the count and collect the first ten elements to build the message string before it is passed to the `LOG.debug` method. In this case we should check whether debug logging is enabled before computing it, to avoid the extra overhead.
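   (Editor's note: the guard the comment suggests can be sketched roughly as below. This is a stand-in using `java.util.logging` rather than the logger Hudi actually uses, and the class and method names `DebugLogGuard`, `expensiveCount`, and `buildDebugMessage` are hypothetical:)

   ```java
   import java.util.List;
   import java.util.logging.Level;
   import java.util.logging.Logger;
   import java.util.stream.Collectors;

   public class DebugLogGuard {
       private static final Logger LOG = Logger.getLogger(DebugLogGuard.class.getName());

       // Stand-in for an expensive Spark action such as cloudFiles.count().
       static long expensiveCount(List<String> files) {
           return files.stream().distinct().count();
       }

       // Both the count and the sample are computed eagerly when the
       // message string is concatenated, even if debug logging is off.
       static String buildDebugMessage(List<String> files) {
           return "Extracted distinct files " + expensiveCount(files)
                   + " and some samples "
                   + files.stream().limit(10).collect(Collectors.toList());
       }

       static void logFiles(List<String> files) {
           // Guarding on the log level skips the expensive work entirely
           // when debug (FINE) logging is disabled.
           if (LOG.isLoggable(Level.FINE)) {
               LOG.fine(buildDebugMessage(files));
           }
       }
   }
   ```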





[GitHub] [hudi] nsivabalan commented on a diff in pull request #6661: [HUDI-4853] Speeding up reading S3 files in S3EventsIncrSource

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on code in PR #6661:
URL: https://github.com/apache/hudi/pull/6661#discussion_r1008901845


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java:
##########
@@ -217,11 +220,18 @@ public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkpt
 
     Option<Dataset<Row>> dataset = Option.empty();
     if (!cloudFiles.isEmpty()) {
-      DataFrameReader dataFrameReader = getDataFrameReader(fileFormat);
-      dataset = Option.of(dataFrameReader.load(cloudFiles.toArray(new String[0])));
+      JavaRDD<Dataset<Row>> datasetIterator = sparkContext.parallelize(cloudFiles, cloudFiles.size()).flatMap(

Review Comment:
   yes, have fixed it. 





[GitHub] [hudi] the-other-tim-brown commented on a diff in pull request #6661: [HUDI-4853] Speeding up reading S3 files in S3EventsIncrSource

Posted by GitBox <gi...@apache.org>.
the-other-tim-brown commented on code in PR #6661:
URL: https://github.com/apache/hudi/pull/6661#discussion_r973866219


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java:
##########
@@ -217,11 +220,18 @@ public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkpt
 
     Option<Dataset<Row>> dataset = Option.empty();
     if (!cloudFiles.isEmpty()) {
-      DataFrameReader dataFrameReader = getDataFrameReader(fileFormat);
-      dataset = Option.of(dataFrameReader.load(cloudFiles.toArray(new String[0])));
+      JavaRDD<Dataset<Row>> datasetIterator = sparkContext.parallelize(cloudFiles, cloudFiles.size()).flatMap(

Review Comment:
   Why do we need to collect the list of paths and turn it back into an RDD, instead of keeping it in its distributed form? Instead of calling `collectAsList()` on line 219, could we just leave it as a `Dataset<String>`?
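   (Editor's note: the distinction the comment draws can be sketched with `java.util.stream` as a stand-in for Spark — eager collection materializes everything on the driver, while the lazy form keeps the pipeline composable. The class and method names `LazyPaths`, `collectPaths`, and `keepDistributed` are hypothetical:)

   ```java
   import java.util.List;
   import java.util.stream.Collectors;
   import java.util.stream.Stream;

   public class LazyPaths {
       // Eager: materializes all distinct paths into a driver-side list,
       // the analogue of calling collectAsList() on a Dataset<String>.
       static List<String> collectPaths(Stream<String> paths) {
           return paths.distinct().collect(Collectors.toList());
       }

       // Lazy: keeps the pipeline unevaluated so downstream steps
       // (filtering, mapping, loading) compose without an intermediate
       // list, the analogue of leaving the result as a Dataset<String>.
       static Stream<String> keepDistributed(Stream<String> paths) {
           return paths.distinct();
       }
   }
   ```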





Re: [PR] [HUDI-4853] Speeding up reading S3 files in S3EventsIncrSource [hudi]

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #6661:
URL: https://github.com/apache/hudi/pull/6661#issuecomment-1965105113

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "c397139b7ecf8a8ceb8c83ca797266256dc41071",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "c397139b7ecf8a8ceb8c83ca797266256dc41071",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * c397139b7ecf8a8ceb8c83ca797266256dc41071 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>

