You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/02/18 06:21:14 UTC

[GitHub] [hudi] danny0405 commented on a change in pull request #4837: [HUDI-3446][HUDI-FLINK] support batch Reader in BootstrapOperator#loadRecords

danny0405 commented on a change in pull request #4837:
URL: https://github.com/apache/hudi/pull/4837#discussion_r809692854



##########
File path: hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java
##########
@@ -171,6 +172,33 @@ public BloomFilter readBloomFilterFromMetadata(Configuration configuration, Path
    */
   public abstract List<HoodieKey> fetchRecordKeyPartitionPath(Configuration configuration, Path filePath);
 
+  /**
+   * Fetch {@link HoodieKey}s from the given data file.
+   * @param reader        The file reader
+   * @param filePath      The data file path
+   * @return {@link List} of {@link HoodieKey}s fetched from the parquet file
+   */
+  public abstract List<HoodieKey> fetchRecordKeyPartitionPath(BaseFileReader reader, Path filePath, int batchSize);
+
+  /**
+   * Open File Reader.
+   * @param configuration        configuration to build file reader
+   * @param filePath      The data file path
+   * @param keyGeneratorOpt instance of KeyGenerator
+   * @return file reader
+   */
+  public abstract BaseFileReader getReader(Configuration configuration, Path filePath, Option<BaseKeyGenerator> keyGeneratorOpt);
+
+  /**
+   * Open File Reader.
+   * @param configuration        configuration to build file reader
+   * @param filePath      The data file path
+   * @return file reader
+   */
+  public BaseFileReader getReader(Configuration configuration, Path filePath) {
+    return getReader(configuration, filePath, Option.empty());

Review comment:
       ditto

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java
##########
@@ -134,17 +162,30 @@
   @Override
   public List<GenericRecord> readAvroRecords(Configuration configuration, Path filePath, Schema avroSchema) {
     List<GenericRecord> records = new ArrayList<>();
+    Reader reader = null;
+    RecordReader recordReader = null;
     try {

Review comment:
       ```java
   final Reader reader = null;
   final RecordReader recordReader = null;
   ```

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java
##########
@@ -171,6 +172,33 @@ public BloomFilter readBloomFilterFromMetadata(Configuration configuration, Path
    */
   public abstract List<HoodieKey> fetchRecordKeyPartitionPath(Configuration configuration, Path filePath);
 
+  /**
+   * Fetch {@link HoodieKey}s from the given data file.
+   * @param reader        The file reader
+   * @param filePath      The data file path
+   * @return {@link List} of {@link HoodieKey}s fetched from the parquet file
+   */
+  public abstract List<HoodieKey> fetchRecordKeyPartitionPath(BaseFileReader reader, Path filePath, int batchSize);
+
+  /**
+   * Open File Reader.
+   * @param configuration        configuration to build file reader
+   * @param filePath      The data file path
+   * @param keyGeneratorOpt instance of KeyGenerator
+   * @return file reader
+   */
+  public abstract BaseFileReader getReader(Configuration configuration, Path filePath, Option<BaseKeyGenerator> keyGeneratorOpt);
+

Review comment:
       `getReader` => `getRecordKeyPartitionPathReader`

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java
##########
@@ -239,11 +291,30 @@ public Schema readAvroSchema(Configuration conf, Path orcFilePath) {
 
   @Override
   public long getRowCount(Configuration conf, Path orcFilePath) {
-    try {
-      Reader reader = OrcFile.createReader(orcFilePath, OrcFile.readerOptions(conf));
+    try (Reader reader = OrcFile.createReader(orcFilePath, OrcFile.readerOptions(conf))) {
       return reader.getNumberOfRows();
     } catch (IOException io) {
       throw new HoodieIOException("Unable to get row count for ORC file:" + orcFilePath, io);
     }
   }
+
+  private class OrcFileInnerReader extends BaseFileReader {
+    Reader reader;

Review comment:
       `OrcFileInnerReader` => `OrcReaderWrapper`

##########
File path: hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java
##########
@@ -211,16 +213,29 @@ protected void loadRecords(String partitionPath) throws Exception {
             return;
           }
 
-          final List<HoodieKey> hoodieKeys;
-          try {
-            hoodieKeys =
-                fileUtils.fetchRecordKeyPartitionPath(this.hadoopConf, new Path(baseFile.getPath()));
-          } catch (Exception e) {
-            throw new HoodieException(String.format("Error when loading record keys from file: %s", baseFile), e);
-          }
-
-          for (HoodieKey hoodieKey : hoodieKeys) {
-            output.collect(new StreamRecord(new IndexRecord(generateHoodieRecord(hoodieKey, fileSlice))));
+          if (this.batchSize > 0) {
+            List<HoodieKey> hoodieKeys;
+            Path filePath = new Path(baseFile.getPath());
+            try (BaseFileUtils.BaseFileReader reader = fileUtils.getReader(this.hadoopConf, filePath)) {
+              do {
+                hoodieKeys = fileUtils.fetchRecordKeyPartitionPath(reader, filePath, batchSize);

Review comment:
       1. Can we have method `BaseFileReader#hasNextBatch` and `BaseFileReader#NextBatch` then iterator over that ?
   2. Can we also have a constant batch size `1024` and remove the config option then ?

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java
##########
@@ -100,6 +126,9 @@
           String rowKey = rowKeys.toString(i);
           String partitionPath = partitionPaths.toString(i);
           hoodieKeys.add(new HoodieKey(rowKey, partitionPath));
+          if (batchSize > 0 && hoodieKeys.size() >= batchSize) {
+            break;
+          }

Review comment:
       1. I see that there is already a batchSize param in `VectorizedRowBatch` and the default val is 1024, can we use that
   2. We should not skip the for-loop early in while reading a batch, which would cause data lost, instead we can put it out of the for-loop, that means we must read a full `VectorizedRowBatch`
   3. Considering there is already a batch there, the more reasonable way to control the read buffer is a param like `int batches`, which means the number of read batches once a time. Or you can divide the `batchSize` by the `VectorizedRowBatch` size to get the vector batch number to read.

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
##########
@@ -143,11 +148,20 @@
         fields.addAll(keyGenerator.getPartitionPathFields());
         return HoodieAvroUtils.getSchemaForFields(readAvroSchema(conf, filePath), fields);
       })
-          .orElse(HoodieAvroUtils.getRecordKeyPartitionPathSchema());
+              .orElse(HoodieAvroUtils.getRecordKeyPartitionPathSchema());
       AvroReadSupport.setAvroReadSchema(conf, readSchema);

Review comment:
       Fix the indentation.

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java
##########
@@ -171,6 +172,33 @@ public BloomFilter readBloomFilterFromMetadata(Configuration configuration, Path
    */
   public abstract List<HoodieKey> fetchRecordKeyPartitionPath(Configuration configuration, Path filePath);
 
+  /**
+   * Fetch {@link HoodieKey}s from the given data file.
+   * @param reader        The file reader
+   * @param filePath      The data file path
+   * @return {@link List} of {@link HoodieKey}s fetched from the parquet file

Review comment:
       1. Please align the comments of the param document, please fix all the documents
   2. add document for `batchSize`

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
##########
@@ -424,4 +435,17 @@ private static BigDecimal extractDecimal(Object val, DecimalMetadata decimalMeta
       throw new UnsupportedOperationException(String.format("Unsupported value type (%s)", val.getClass().getName()));
     }
   }
+
+  private class ParquetFileInnerReader extends BaseFileReader {
+    ParquetReader reader;

Review comment:
       `ParquetFileInnerReader` => `ParquetReaderWrapper`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org