You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/01/05 02:47:08 UTC

[GitHub] [hudi] alexeykudinkin commented on a change in pull request #4333: [HUDI-431] Adding support for Parquet in MOR `LogBlock`s

alexeykudinkin commented on a change in pull request #4333:
URL: https://github.com/apache/hudi/pull/4333#discussion_r778468264



##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
##########
@@ -110,59 +136,94 @@ public static HoodieLogBlock getBlock(HoodieLogBlockType logDataBlockFormat, Lis
   @Override
   public byte[] getContentBytes() throws IOException {
     // In case this method is called before realizing records from content
-    if (getContent().isPresent()) {
-      return getContent().get();
-    } else if (readBlockLazily && !getContent().isPresent() && records == null) {
-      // read block lazily
-      createRecordsFromContentBytes();
+    Option<byte[]> content = getContent();
+
+    checkState(content.isPresent() || records != null, "Block is in invalid state");
+
+    if (content.isPresent()) {
+      return content.get();
     }
 
-    return serializeRecords();

Review comment:
       Correct. This could should not be reachable (this method is only used on the write path) 

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table.log.block;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.avro.HoodieAvroWriteSupport;
+import org.apache.hudi.common.fs.inline.InLineFSUtils;
+import org.apache.hudi.common.fs.inline.InLineFileSystem;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ParquetReaderIterator;
+import org.apache.hudi.io.storage.HoodieAvroParquetConfig;
+import org.apache.hudi.io.storage.HoodieParquetStreamWriter;
+import org.apache.parquet.avro.AvroParquetReader;
+import org.apache.parquet.avro.AvroReadSupport;
+import org.apache.parquet.avro.AvroSchemaConverter;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.io.InputFile;
+
+import javax.annotation.Nonnull;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * HoodieParquetDataBlock contains a list of records serialized using Parquet.
+ */
+public class HoodieParquetDataBlock extends HoodieDataBlock {
+
+  public HoodieParquetDataBlock(HoodieLogFile logFile,
+                                FSDataInputStream inputStream,
+                                Option<byte[]> content,
+                                boolean readBlockLazily,
+                                long position, long blockSize, long blockEndPos,
+                                Option<Schema> readerSchema,
+                                Map<HeaderMetadataType, String> header,
+                                Map<HeaderMetadataType, String> footer,
+                                String keyField) {
+    super(content,
+        inputStream,
+        readBlockLazily,
+        Option.of(new HoodieLogBlockContentLocation(logFile, position, blockSize, blockEndPos)),
+        readerSchema,
+        header,
+        footer,
+        keyField,
+        false);
+  }
+
+  public HoodieParquetDataBlock(
+      @Nonnull List<IndexedRecord> records,
+      @Nonnull Map<HeaderMetadataType, String> header,
+      @Nonnull String keyField
+  ) {
+    super(records, header, new HashMap<>(), keyField);
+  }
+
+  public HoodieParquetDataBlock(
+      @Nonnull List<IndexedRecord> records,
+      @Nonnull Map<HeaderMetadataType, String> header
+  ) {
+    super(records, header, new HashMap<>(), HoodieRecord.RECORD_KEY_METADATA_FIELD);
+  }
+
+  @Override
+  public HoodieLogBlockType getBlockType() {
+    return HoodieLogBlockType.PARQUET_DATA_BLOCK;
+  }
+
+  @Override
+  protected byte[] serializeRecords(List<IndexedRecord> records) throws IOException {
+    if (records.size() == 0) {
+      return new byte[0];
+    }
+
+    Schema writerSchema = new Schema.Parser().parse(super.getLogBlockHeader().get(HeaderMetadataType.SCHEMA));
+
+    HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport(
+        new AvroSchemaConverter().convert(writerSchema), writerSchema, Option.empty());
+
+    HoodieAvroParquetConfig avroParquetConfig =
+        new HoodieAvroParquetConfig(
+            writeSupport,
+            // TODO fetch compression codec from the config
+            CompressionCodecName.GZIP,
+            ParquetWriter.DEFAULT_BLOCK_SIZE,
+            ParquetWriter.DEFAULT_PAGE_SIZE,
+            1024 * 1024 * 1024,
+            new Configuration(),
+            Double.parseDouble(String.valueOf(0.1)));//HoodieStorageConfig.PARQUET_COMPRESSION_RATIO.defaultValue()));
+
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+
+    try (FSDataOutputStream outputStream = new FSDataOutputStream(baos)) {
+      try (HoodieParquetStreamWriter<IndexedRecord> parquetWriter = new HoodieParquetStreamWriter<>(outputStream, avroParquetConfig)) {
+        for (IndexedRecord record : records) {
+          String recordKey = getRecordKey(record);
+          parquetWriter.writeAvro(recordKey, record);
+        }
+        outputStream.flush();
+      }
+    }
+
+    return baos.toByteArray();
+  }
+
+  public static Iterator<IndexedRecord> getProjectedParquetRecordsIterator(Configuration conf,
+                                                                           Schema readerSchema,
+                                                                           InputFile inputFile) throws IOException {
+    AvroReadSupport.setAvroReadSchema(conf, readerSchema);
+    AvroReadSupport.setRequestedProjection(conf, readerSchema);
+    ParquetReader<IndexedRecord> reader =
+        AvroParquetReader.<IndexedRecord>builder(inputFile).withConf(conf).build();
+    return new ParquetReaderIterator<>(reader);
+  }
+
+  /**
+   * NOTE: We're overriding the whole reading sequence to make sure we properly respect
+   *       the requested Reader's schema and only fetch the columns that have been explicitly
+   *       requested by the caller (providing projected Reader's schema)
+   */
+  @Override
+  protected List<IndexedRecord> readRecordsFromBlockPayload() throws IOException {
+    Configuration inlineConf = new Configuration();
+    inlineConf.set("fs." + InLineFileSystem.SCHEME + ".impl", InLineFileSystem.class.getName());
+
+    HoodieLogBlockContentLocation blockContentLoc = getBlockContentLocation().get();
+
+    Path inlineLogFilePath = InLineFSUtils.getInlineFilePath(
+        blockContentLoc.getLogFile().getPath(),
+        blockContentLoc.getLogFile().getPath().getFileSystem(inlineConf).getScheme(),
+        blockContentLoc.getContentPositionInLogFile(),
+        blockContentLoc.getBlockSize());
+
+    ArrayList<IndexedRecord> records = new ArrayList<>();
+
+    getProjectedParquetRecordsIterator(
+        inlineConf,

Review comment:
       Makes sense

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
##########
@@ -18,59 +18,79 @@
 
 package org.apache.hudi.common.table.log.block;
 
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 
-import org.apache.avro.Schema;
-import org.apache.avro.generic.IndexedRecord;
-import org.apache.hadoop.fs.FSDataInputStream;
-
-import javax.annotation.Nonnull;
-
 import java.io.IOException;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
 
 /**
  * DataBlock contains a list of records serialized using formats compatible with the base file format.
  * For each base file format there is a corresponding DataBlock format.
- *
+ * <p>
  * The Datablock contains:
  *   1. Data Block version
  *   2. Total number of records in the block
  *   3. Actual serialized content of the records
  */
 public abstract class HoodieDataBlock extends HoodieLogBlock {
 
-  protected List<IndexedRecord> records;
-  protected Schema schema;
-  protected String keyField;
+  // TODO rebase records/content to leverage Either to warrant
+  //      that they are mutex (used by read/write flows respectively)
+  private List<IndexedRecord> records;
 
-  public HoodieDataBlock(@Nonnull Map<HeaderMetadataType, String> logBlockHeader,
-      @Nonnull Map<HeaderMetadataType, String> logBlockFooter,
-      @Nonnull Option<HoodieLogBlockContentLocation> blockContentLocation, @Nonnull Option<byte[]> content,
-      FSDataInputStream inputStream, boolean readBlockLazily) {
-    super(logBlockHeader, logBlockFooter, blockContentLocation, content, inputStream, readBlockLazily);
-    this.keyField = HoodieRecord.RECORD_KEY_METADATA_FIELD;
-  }
+  /**
+   * Dot-path notation reference to the key field w/in the record's schema
+   */
+  private final String keyFieldRef;

Review comment:
       `keyFieldName` is confusing -- it's not really a field name but rather FQ ref w/in the schema

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java
##########
@@ -109,7 +116,25 @@ public long getLogBlockLength() {
    * Type of the log block WARNING: This enum is serialized as the ordinal. Only add new enums at the end.
    */
   public enum HoodieLogBlockType {
-    COMMAND_BLOCK, DELETE_BLOCK, CORRUPT_BLOCK, AVRO_DATA_BLOCK, HFILE_DATA_BLOCK
+    COMMAND_BLOCK(":command"),
+    DELETE_BLOCK(":delete"),
+    CORRUPT_BLOCK(":corrupted"),

Review comment:
       Corrupt is a valid adjective, but it's rarely used in a context of "broken" things as far as i can tell (more often used to describe lack of moral integrity like "politician is corrupt")
   
   https://english.stackexchange.com/questions/405182/corrupt-or-corrupted

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java
##########
@@ -224,6 +226,33 @@ public static HoodieAvroDataBlock getBlock(byte[] content, Schema readerSchema)
     return new HoodieAvroDataBlock(records, readerSchema);
   }
 
+  private static byte[] compress(String text) {

Review comment:
       This is only used in `HoodieAvroDataBlock` w/in the method that is deprecated. Do we want to hang on to it?

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table.log.block;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hudi.avro.HoodieAvroWriteSupport;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ParquetReaderIterator;
+import org.apache.hudi.io.storage.HoodieAvroParquetConfig;
+import org.apache.hudi.io.storage.HoodieParquetStreamWriter;
+import org.apache.hudi.parquet.io.ByteBufferBackedInputFile;
+import org.apache.parquet.avro.AvroParquetReader;
+import org.apache.parquet.avro.AvroReadSupport;
+import org.apache.parquet.avro.AvroSchemaConverter;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.io.InputFile;
+
+import javax.annotation.Nonnull;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * HoodieParquetDataBlock contains a list of records serialized using Parquet.
+ */
+public class HoodieParquetDataBlock extends HoodieDataBlock {
+
+  public HoodieParquetDataBlock(
+      HoodieLogFile logFile,
+      FSDataInputStream inputStream,
+      Option<byte[]> content,
+      boolean readBlockLazily, long position, long blockSize, long blockEndpos,
+      Option<Schema> readerSchema,
+      Map<HeaderMetadataType, String> header,
+      Map<HeaderMetadataType, String> footer,
+      String keyField
+  ) {
+    super(
+        content,
+        inputStream,
+        readBlockLazily,
+        Option.of(new HoodieLogBlockContentLocation(logFile, position, blockSize, blockEndpos)),
+        readerSchema,
+        header,
+        footer,
+        keyField,
+        false);
+  }
+
+  public HoodieParquetDataBlock(
+      @Nonnull List<IndexedRecord> records,
+      @Nonnull Map<HeaderMetadataType, String> header,
+      @Nonnull String keyField
+  ) {
+    super(records, header, new HashMap<>(), keyField);
+  }
+
+  public HoodieParquetDataBlock(
+      @Nonnull List<IndexedRecord> records,
+      @Nonnull Map<HeaderMetadataType, String> header
+  ) {
+    super(records, header, new HashMap<>(), HoodieRecord.RECORD_KEY_METADATA_FIELD);
+  }
+
+  @Override
+  public HoodieLogBlockType getBlockType() {
+    return HoodieLogBlockType.PARQUET_DATA_BLOCK;
+  }
+
+  @Override
+  protected byte[] serializeRecords(List<IndexedRecord> records) throws IOException {
+    if (records.size() == 0) {
+      return new byte[0];
+    }
+
+    Schema writerSchema = new Schema.Parser().parse(super.getLogBlockHeader().get(HeaderMetadataType.SCHEMA));
+
+    HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport(
+        new AvroSchemaConverter().convert(writerSchema), writerSchema, Option.empty());
+
+    HoodieAvroParquetConfig avroParquetConfig =
+        new HoodieAvroParquetConfig(
+            writeSupport,
+            // TODO fetch compression codec from the config
+            CompressionCodecName.GZIP,

Review comment:
       It's a fair call given that this is a new code.

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
##########
@@ -125,6 +126,11 @@
       .withAlternatives("hoodie.table.rt.file.format")
       .withDocumentation("Log format used for the delta logs.");
 
+  public static final ConfigProperty<String> LOG_BLOCK_TYPE = ConfigProperty

Review comment:
       How would this work? How's block-type going to be determined on the write path? 

##########
File path: hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileReader.java
##########
@@ -288,28 +288,14 @@ public synchronized void close() {
     }
   }
 
-  static class SeekableByteArrayInputStream extends ByteArrayInputStream implements Seekable, PositionedReadable {
+  static class SeekableByteArrayInputStream extends ByteBufferBackedInputStream implements Seekable, PositionedReadable {

Review comment:
       It's not changing, just abstracting common functionality required elsewhere

##########
File path: hudi-common/src/main/java/org/apache/hudi/parquet/io/OutputStreamBackedOutputFile.java
##########
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.parquet.io;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.parquet.io.OutputFile;
+import org.apache.parquet.io.PositionOutputStream;
+
+import javax.annotation.Nonnull;
+import java.io.IOException;
+
+/**
+ * Implementation of the {@link OutputFile} backed by {@link java.io.OutputStream}
+ */
+public class OutputStreamBackedOutputFile implements OutputFile {

Review comment:
       Correct. These are needed to be able to write Parquet into `OutputStream`

##########
File path: hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetStreamWriter.java
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage;
+
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.avro.HoodieAvroWriteSupport;
+import org.apache.hudi.parquet.io.OutputStreamBackedOutputFile;
+import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.api.WriteSupport;
+import org.apache.parquet.io.OutputFile;
+
+import java.io.IOException;
+
+// TODO unify w/ HoodieParquetWriter
+public class HoodieParquetStreamWriter<R extends IndexedRecord> implements AutoCloseable {

Review comment:
       `ParquetWriter` ctors that are needed here are package-private and hence i can't extend `HoodieParquetWriter` to use them -- hence i had to wrap around `ParquetWriter` in here to be able to write into `OutputStream`

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java
##########
@@ -109,7 +116,25 @@ public long getLogBlockLength() {
    * Type of the log block WARNING: This enum is serialized as the ordinal. Only add new enums at the end.
    */
   public enum HoodieLogBlockType {
-    COMMAND_BLOCK, DELETE_BLOCK, CORRUPT_BLOCK, AVRO_DATA_BLOCK, HFILE_DATA_BLOCK
+    COMMAND_BLOCK(":command"),

Review comment:
       To distinguish b/w format-based blocks and action-based blocks

##########
File path: hudi-common/src/test/java/org/apache/hudi/common/testutils/HadoopMapRedUtils.java
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.testutils;
+
+import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hudi.common.util.Option;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+public class HadoopMapRedUtils {

Review comment:
       Fair call, but this is the only way Parquet reader provides insights into how much was actually read. If we don't want to add those we will have to forego these assertions.

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
##########
@@ -110,59 +132,97 @@ public static HoodieLogBlock getBlock(HoodieLogBlockType logDataBlockFormat, Lis
   @Override
   public byte[] getContentBytes() throws IOException {
     // In case this method is called before realizing records from content
-    if (getContent().isPresent()) {
-      return getContent().get();
-    } else if (readBlockLazily && !getContent().isPresent() && records == null) {
-      // read block lazily
-      createRecordsFromContentBytes();
+    Option<byte[]> content = getContent();
+
+    checkState(content.isPresent() || records != null, "Block is in invalid state");
+
+    if (content.isPresent()) {
+      return content.get();
     }
 
-    return serializeRecords();
+    return serializeRecords(records);
   }
 
-  public abstract HoodieLogBlockType getBlockType();
+  protected static Schema getWriterSchema(Map<HeaderMetadataType, String> logBlockHeader) {
+    return new Schema.Parser().parse(logBlockHeader.get(HeaderMetadataType.SCHEMA));
+  }
 
-  public List<IndexedRecord> getRecords() {
+  /**
+   * Returns all the records contained w/in this block
+   */
+  public final List<IndexedRecord> getRecords() {
     if (records == null) {
       try {
         // in case records are absent, read content lazily and then convert to IndexedRecords
-        createRecordsFromContentBytes();
+        records = readRecordsFromBlockPayload();
       } catch (IOException io) {
         throw new HoodieIOException("Unable to convert content bytes to records", io);
       }
     }
     return records;
   }
 
+  public Schema getSchema() {
+    return readerSchema;
+  }
+
   /**
    * Batch get of keys of interest. Implementation can choose to either do full scan and return matched entries or
    * do a seek based parsing and return matched entries.
+   *
    * @param keys keys of interest.
    * @return List of IndexedRecords for the keys of interest.
-   * @throws IOException
+   * @throws IOException in case of failures encountered when reading/parsing records
    */
-  public List<IndexedRecord> getRecords(List<String> keys) throws IOException {
-    throw new UnsupportedOperationException("On demand batch get based on interested keys not supported");
-  }
+  public final List<IndexedRecord> getRecords(List<String> keys) throws IOException {
+    boolean fullScan = keys.isEmpty();
+    if (enablePointLookups && !fullScan) {
+      return lookupRecords(keys);
+    }
 
-  public Schema getSchema() {
-    // if getSchema was invoked before converting byte [] to records
-    if (records == null) {
-      getRecords();
+    // Otherwise, we fetch all the records and filter out all the records, but the
+    // ones requested
+    List<IndexedRecord> allRecords = getRecords();
+    if (fullScan) {
+      return allRecords;
     }
-    return schema;
+
+    HashSet<String> keySet = new HashSet<>(keys);
+    return allRecords.stream()
+        .filter(record -> keySet.contains(getRecordKey(record)))
+        .collect(Collectors.toList());
   }
 
-  protected void createRecordsFromContentBytes() throws IOException {
+  protected List<IndexedRecord> readRecordsFromBlockPayload() throws IOException {
     if (readBlockLazily && !getContent().isPresent()) {
       // read log block contents from disk
       inflate();
     }
 
-    deserializeRecords();
+    try {
+      return deserializeRecords(getContent().get());
+    } finally {
+      // Free up content to be GC'd by deflating the block
+      deflate();
+    }
+  }
+
+  protected List<IndexedRecord> lookupRecords(List<String> keys) throws IOException {
+    throw new UnsupportedOperationException(
+        String.format("Point-wise records lookups are not supported by this Data block type (%s)", getBlockType())
+    );
   }
 
-  protected abstract byte[] serializeRecords() throws IOException;
+  protected abstract byte[] serializeRecords(List<IndexedRecord> records) throws IOException;
+
+  protected abstract List<IndexedRecord> deserializeRecords(byte[] content) throws IOException;
 
-  protected abstract void deserializeRecords() throws IOException;
+  public abstract HoodieLogBlockType getBlockType();
+
+  protected String getRecordKey(IndexedRecord record) {

Review comment:
       This is just an abstraction of the existing behavior: currently blocks don't support virtual keys, and if we want to do so i'd rather do this in a standalone PR




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org