You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/04/01 04:13:31 UTC

[GitHub] [hudi] nsivabalan commented on a change in pull request #4333: [HUDI-431] Adding support for Parquet in MOR `LogBlock`s

nsivabalan commented on a change in pull request #4333:
URL: https://github.com/apache/hudi/pull/4333#discussion_r840216398



##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
##########
@@ -144,79 +148,52 @@ public HoodieLogBlockType getBlockType() {
   }
 
   @Override
-  protected void createRecordsFromContentBytes() throws IOException {
-    if (enableInlineReading) {
-      getRecords(Collections.emptyList());
-    } else {
-      super.createRecordsFromContentBytes();
-    }
-  }
+  protected List<IndexedRecord> deserializeRecords(byte[] content) throws IOException {
+    checkState(readerSchema != null, "Reader's schema has to be non-null");
 
-  @Override
-  public List<IndexedRecord> getRecords(List<String> keys) throws IOException {
-    readWithInlineFS(keys);
-    return records;
-  }
+    // Get schema from the header
+    Schema writerSchema = new Schema.Parser().parse(super.getLogBlockHeader().get(HeaderMetadataType.SCHEMA));
 
-  /**
-   * Serialize the record to byte buffer.
-   *
-   * @param record         - Record to serialize
-   * @param keyField - Key field in the schema
-   * @return Serialized byte buffer for the record
-   */
-  private byte[] serializeRecord(final IndexedRecord record, final Option<Field> keyField) {
-    if (keyField.isPresent()) {
-      record.put(keyField.get().pos(), StringUtils.EMPTY_STRING);
-    }
-    return HoodieAvroUtils.indexedRecordToBytes(record);
+    // Read the content
+    HoodieHFileReader<IndexedRecord> reader = new HoodieHFileReader<>(content);
+    List<Pair<String, IndexedRecord>> records = reader.readAllRecords(writerSchema, readerSchema);
+
+    return records.stream().map(Pair::getSecond).collect(Collectors.toList());
   }
 
-  private void readWithInlineFS(List<String> keys) throws IOException {
-    boolean enableFullScan = keys.isEmpty();
-    // Get schema from the header
-    Schema writerSchema = new Schema.Parser().parse(super.getLogBlockHeader().get(HeaderMetadataType.SCHEMA));
-    // If readerSchema was not present, use writerSchema
-    if (schema == null) {
-      schema = writerSchema;
-    }
-    Configuration conf = new Configuration();
-    CacheConfig cacheConf = new CacheConfig(conf);
-    Configuration inlineConf = new Configuration();
+  // TODO abstract this w/in HoodieDataBlock
+  @Override
+  protected List<IndexedRecord> lookupRecords(List<String> keys) throws IOException {
+    HoodieLogBlockContentLocation blockContentLoc = getBlockContentLocation().get();
+
+    // NOTE: It's important to extend Hadoop configuration here to make sure configuration
+    //       is appropriately carried over
+    Configuration inlineConf = new Configuration(blockContentLoc.getHadoopConf());

Review comment:
       wiring in hadoop conf from higher layers breaks this. Please check https://issues.apache.org/jira/browse/HUDI-3763
   check L185 prior to this patch (where in inline was tested) 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org