You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/02/01 14:36:11 UTC

[GitHub] [hudi] vinothchandar commented on a change in pull request #4333: [HUDI-431] Adding support for Parquet in MOR `LogBlock`s

vinothchandar commented on a change in pull request #4333:
URL: https://github.com/apache/hudi/pull/4333#discussion_r791213691



##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
##########
@@ -719,19 +717,6 @@ public HoodieFileFormat getLogFileFormat() {
     return metaClient.getTableConfig().getLogFileFormat();
   }
 
-  public HoodieLogBlockType getLogDataBlockFormat() {

Review comment:
       this is a very good cleanup. thanks!

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
##########
@@ -1503,6 +1504,11 @@ public String parquetOutputTimestampType() {
     return getString(HoodieStorageConfig.PARQUET_OUTPUT_TIMESTAMP_TYPE);
   }
 
+  public HoodieLogBlock.HoodieLogBlockType getLogDataBlockFormat() {

Review comment:
       Can we avoid `null` as the sentinel here and use an `Option` instead?

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
##########
@@ -18,151 +18,173 @@
 
 package org.apache.hudi.common.table.log.block;
 
-import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.exception.HoodieIOException;
-
 import org.apache.avro.Schema;
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.hadoop.fs.FSDataInputStream;
-
-import javax.annotation.Nonnull;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieIOException;
 
 import java.io.IOException;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
 
 /**
  * DataBlock contains a list of records serialized using formats compatible with the base file format.
  * For each base file format there is a corresponding DataBlock format.
- *
+ * <p>
  * The Datablock contains:
  *   1. Data Block version
  *   2. Total number of records in the block
  *   3. Actual serialized content of the records
  */
 public abstract class HoodieDataBlock extends HoodieLogBlock {
 
-  protected List<IndexedRecord> records;
-  protected Schema schema;
-  protected String keyField;
+  // TODO rebase records/content to leverage Either to warrant
+  //      that they are mutex (used by read/write flows respectively)
+  private Option<List<IndexedRecord>> records;
 
-  public HoodieDataBlock(@Nonnull Map<HeaderMetadataType, String> logBlockHeader,
-      @Nonnull Map<HeaderMetadataType, String> logBlockFooter,
-      @Nonnull Option<HoodieLogBlockContentLocation> blockContentLocation, @Nonnull Option<byte[]> content,
-      FSDataInputStream inputStream, boolean readBlockLazily) {
-    super(logBlockHeader, logBlockFooter, blockContentLocation, content, inputStream, readBlockLazily);
-    this.keyField = HoodieRecord.RECORD_KEY_METADATA_FIELD;
-  }
+  /**
+   * Dot-path notation reference to the key field w/in the record's schema
+   */
+  private final String keyFieldRef;
 
-  public HoodieDataBlock(@Nonnull List<IndexedRecord> records, @Nonnull Map<HeaderMetadataType, String> header,
-                         @Nonnull Map<HeaderMetadataType, String> footer, String keyField) {
-    this(header, footer, Option.empty(), Option.empty(), null, false);
-    this.records = records;
-    this.schema = new Schema.Parser().parse(super.getLogBlockHeader().get(HeaderMetadataType.SCHEMA));
-    this.keyField = keyField;
-  }
+  private final boolean enablePointLookups;
 
-  protected HoodieDataBlock(Option<byte[]> content, @Nonnull FSDataInputStream inputStream, boolean readBlockLazily,
-                            Option<HoodieLogBlockContentLocation> blockContentLocation, Schema readerSchema,
-                            @Nonnull Map<HeaderMetadataType, String> headers, @Nonnull Map<HeaderMetadataType,
-      String> footer, String keyField) {
-    this(headers, footer, blockContentLocation, content, inputStream, readBlockLazily);
-    this.schema = readerSchema;
-    this.keyField = keyField;
-  }
+  protected final Schema readerSchema;
 
   /**
-   * Util method to get a data block for the requested type.
-   *
-   * @param logDataBlockFormat - Data block type
-   * @param recordList         - List of records that goes in the data block
-   * @param header             - data block header
-   * @return Data block of the requested type.
+   * NOTE: This ctor is used on the write-path (ie when records ought to be written into the log)
    */
-  public static HoodieLogBlock getBlock(HoodieLogBlockType logDataBlockFormat, List<IndexedRecord> recordList,
-                                        Map<HeaderMetadataType, String> header) {
-    return getBlock(logDataBlockFormat, recordList, header, HoodieRecord.RECORD_KEY_METADATA_FIELD);
+  public HoodieDataBlock(List<IndexedRecord> records,
+                         Map<HeaderMetadataType, String> header,
+                         Map<HeaderMetadataType, String> footer,
+                         String keyFieldRef) {
+    super(header, footer, Option.empty(), Option.empty(), null, false);
+    this.records = Option.of(records);
+    this.keyFieldRef = keyFieldRef;
+    // If no reader-schema has been provided assume writer-schema as one
+    this.readerSchema = getWriterSchema(super.getLogBlockHeader());
+    this.enablePointLookups = false;
   }
 
   /**
-   * Util method to get a data block for the requested type.
-   *
-   * @param logDataBlockFormat - Data block type
-   * @param recordList         - List of records that goes in the data block
-   * @param header             - data block header
-   * @param keyField           - FieldId to get the key from the records
-   * @return Data block of the requested type.
+   * NOTE: This ctor is used on the read-path (ie when records ought to be read from the log)
    */
-  public static HoodieLogBlock getBlock(HoodieLogBlockType logDataBlockFormat, List<IndexedRecord> recordList,
-                                        Map<HeaderMetadataType, String> header, String keyField) {
-    switch (logDataBlockFormat) {
-      case AVRO_DATA_BLOCK:
-        return new HoodieAvroDataBlock(recordList, header, keyField);
-      case HFILE_DATA_BLOCK:
-        return new HoodieHFileDataBlock(recordList, header, keyField);
-      default:
-        throw new HoodieException("Data block format " + logDataBlockFormat + " not implemented");
-    }
+  protected HoodieDataBlock(Option<byte[]> content,
+                            FSDataInputStream inputStream,
+                            boolean readBlockLazily,
+                            Option<HoodieLogBlockContentLocation> blockContentLocation,
+                            Option<Schema> readerSchema,
+                            Map<HeaderMetadataType, String> headers,
+                            Map<HeaderMetadataType, String> footer,
+                            String keyFieldRef,
+                            boolean enablePointLookups) {
+    super(headers, footer, blockContentLocation, content, inputStream, readBlockLazily);
+    this.records = Option.empty();
+    this.keyFieldRef = keyFieldRef;
+    // If no reader-schema has been provided assume writer-schema as one
+    this.readerSchema = readerSchema.orElseGet(() -> getWriterSchema(super.getLogBlockHeader()));
+    this.enablePointLookups = enablePointLookups;
   }
 
   @Override
   public byte[] getContentBytes() throws IOException {
     // In case this method is called before realizing records from content
-    if (getContent().isPresent()) {
-      return getContent().get();
-    } else if (readBlockLazily && !getContent().isPresent() && records == null) {
-      // read block lazily
-      createRecordsFromContentBytes();
+    Option<byte[]> content = getContent();
+
+    checkState(content.isPresent() || records.isPresent(), "Block is in invalid state");
+
+    if (content.isPresent()) {
+      return content.get();
     }
 
-    return serializeRecords();
+    return serializeRecords(records.get());
   }
 
-  public abstract HoodieLogBlockType getBlockType();
+  protected static Schema getWriterSchema(Map<HeaderMetadataType, String> logBlockHeader) {
+    return new Schema.Parser().parse(logBlockHeader.get(HeaderMetadataType.SCHEMA));
+  }
 
-  public List<IndexedRecord> getRecords() {
-    if (records == null) {
+  /**
+   * Returns all the records contained w/in this block
+   */
+  public final List<IndexedRecord> getRecords() {
+    if (!records.isPresent()) {
       try {
         // in case records are absent, read content lazily and then convert to IndexedRecords
-        createRecordsFromContentBytes();
+        records = Option.of(readRecordsFromBlockPayload());
       } catch (IOException io) {
         throw new HoodieIOException("Unable to convert content bytes to records", io);
       }
     }
-    return records;
+    return records.get();
+  }
+
+  public Schema getSchema() {
+    return readerSchema;
   }
 
   /**
    * Batch get of keys of interest. Implementation can choose to either do full scan and return matched entries or
    * do a seek based parsing and return matched entries.
+   *
    * @param keys keys of interest.
    * @return List of IndexedRecords for the keys of interest.
-   * @throws IOException
+   * @throws IOException in case of failures encountered when reading/parsing records
    */
-  public List<IndexedRecord> getRecords(List<String> keys) throws IOException {
-    throw new UnsupportedOperationException("On demand batch get based on interested keys not supported");
-  }
+  public final List<IndexedRecord> getRecords(List<String> keys) throws IOException {
+    boolean fullScan = keys.isEmpty();
+    if (enablePointLookups && !fullScan) {
+      return lookupRecords(keys);
+    }
 
-  public Schema getSchema() {
-    // if getSchema was invoked before converting byte [] to records
-    if (records == null) {
-      getRecords();
+    // Otherwise, we fetch all the records and filter out all the records, but the
+    // ones requested
+    List<IndexedRecord> allRecords = getRecords();
+    if (fullScan) {
+      return allRecords;
     }
-    return schema;
+
+    HashSet<String> keySet = new HashSet<>(keys);
+    return allRecords.stream()
+        .filter(record -> keySet.contains(getRecordKey(record)))
+        .collect(Collectors.toList());
   }
 
-  protected void createRecordsFromContentBytes() throws IOException {
+  protected List<IndexedRecord> readRecordsFromBlockPayload() throws IOException {
     if (readBlockLazily && !getContent().isPresent()) {
       // read log block contents from disk
       inflate();
     }
 
-    deserializeRecords();
+    try {
+      return deserializeRecords(getContent().get());
+    } finally {
+      // Free up content to be GC'd by deflating the block
+      deflate();
+    }
+  }
+
+  protected List<IndexedRecord> lookupRecords(List<String> keys) throws IOException {
+    throw new UnsupportedOperationException(
+        String.format("Point lookups are not supported by this Data block type (%s)", getBlockType())
+    );
   }
 
-  protected abstract byte[] serializeRecords() throws IOException;
+  protected abstract byte[] serializeRecords(List<IndexedRecord> records) throws IOException;
 
-  protected abstract void deserializeRecords() throws IOException;
+  protected abstract List<IndexedRecord> deserializeRecords(byte[] content) throws IOException;
+
+  public abstract HoodieLogBlockType getBlockType();
+
+  protected String getRecordKey(IndexedRecord record) {
+    return Option.ofNullable(record.getSchema().getField(keyFieldRef))
+        .map(keyField -> record.get(keyField.pos()))

Review comment:
       There is similar code for the metadata table/key field de-dupe work. Could we carve this out into a util method?

##########
File path: hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetStreamWriter.java
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage;
+
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.avro.HoodieAvroWriteSupport;
+import org.apache.hudi.parquet.io.OutputStreamBackedOutputFile;
+import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.api.WriteSupport;
+import org.apache.parquet.io.OutputFile;
+
+import java.io.IOException;
+
+// TODO unify w/ HoodieParquetWriter

Review comment:
       JIRA for this?

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
##########
@@ -18,151 +18,173 @@
 
 package org.apache.hudi.common.table.log.block;
 
-import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.exception.HoodieIOException;
-
 import org.apache.avro.Schema;
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.hadoop.fs.FSDataInputStream;
-
-import javax.annotation.Nonnull;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieIOException;
 
 import java.io.IOException;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
 
 /**
  * DataBlock contains a list of records serialized using formats compatible with the base file format.
  * For each base file format there is a corresponding DataBlock format.
- *
+ * <p>
  * The Datablock contains:
  *   1. Data Block version
  *   2. Total number of records in the block
  *   3. Actual serialized content of the records
  */
 public abstract class HoodieDataBlock extends HoodieLogBlock {
 
-  protected List<IndexedRecord> records;
-  protected Schema schema;
-  protected String keyField;
+  // TODO rebase records/content to leverage Either to warrant
+  //      that they are mutex (used by read/write flows respectively)
+  private Option<List<IndexedRecord>> records;
 
-  public HoodieDataBlock(@Nonnull Map<HeaderMetadataType, String> logBlockHeader,
-      @Nonnull Map<HeaderMetadataType, String> logBlockFooter,
-      @Nonnull Option<HoodieLogBlockContentLocation> blockContentLocation, @Nonnull Option<byte[]> content,
-      FSDataInputStream inputStream, boolean readBlockLazily) {
-    super(logBlockHeader, logBlockFooter, blockContentLocation, content, inputStream, readBlockLazily);
-    this.keyField = HoodieRecord.RECORD_KEY_METADATA_FIELD;
-  }
+  /**
+   * Dot-path notation reference to the key field w/in the record's schema
+   */
+  private final String keyFieldRef;
 
-  public HoodieDataBlock(@Nonnull List<IndexedRecord> records, @Nonnull Map<HeaderMetadataType, String> header,
-                         @Nonnull Map<HeaderMetadataType, String> footer, String keyField) {
-    this(header, footer, Option.empty(), Option.empty(), null, false);
-    this.records = records;
-    this.schema = new Schema.Parser().parse(super.getLogBlockHeader().get(HeaderMetadataType.SCHEMA));
-    this.keyField = keyField;
-  }
+  private final boolean enablePointLookups;
 
-  protected HoodieDataBlock(Option<byte[]> content, @Nonnull FSDataInputStream inputStream, boolean readBlockLazily,
-                            Option<HoodieLogBlockContentLocation> blockContentLocation, Schema readerSchema,
-                            @Nonnull Map<HeaderMetadataType, String> headers, @Nonnull Map<HeaderMetadataType,
-      String> footer, String keyField) {
-    this(headers, footer, blockContentLocation, content, inputStream, readBlockLazily);
-    this.schema = readerSchema;
-    this.keyField = keyField;
-  }
+  protected final Schema readerSchema;
 
   /**
-   * Util method to get a data block for the requested type.
-   *
-   * @param logDataBlockFormat - Data block type
-   * @param recordList         - List of records that goes in the data block
-   * @param header             - data block header
-   * @return Data block of the requested type.
+   * NOTE: This ctor is used on the write-path (ie when records ought to be written into the log)
    */
-  public static HoodieLogBlock getBlock(HoodieLogBlockType logDataBlockFormat, List<IndexedRecord> recordList,
-                                        Map<HeaderMetadataType, String> header) {
-    return getBlock(logDataBlockFormat, recordList, header, HoodieRecord.RECORD_KEY_METADATA_FIELD);
+  public HoodieDataBlock(List<IndexedRecord> records,
+                         Map<HeaderMetadataType, String> header,
+                         Map<HeaderMetadataType, String> footer,
+                         String keyFieldRef) {
+    super(header, footer, Option.empty(), Option.empty(), null, false);
+    this.records = Option.of(records);
+    this.keyFieldRef = keyFieldRef;
+    // If no reader-schema has been provided assume writer-schema as one
+    this.readerSchema = getWriterSchema(super.getLogBlockHeader());
+    this.enablePointLookups = false;
   }
 
   /**
-   * Util method to get a data block for the requested type.
-   *
-   * @param logDataBlockFormat - Data block type
-   * @param recordList         - List of records that goes in the data block
-   * @param header             - data block header
-   * @param keyField           - FieldId to get the key from the records
-   * @return Data block of the requested type.
+   * NOTE: This ctor is used on the read-path (ie when records ought to be read from the log)
    */
-  public static HoodieLogBlock getBlock(HoodieLogBlockType logDataBlockFormat, List<IndexedRecord> recordList,
-                                        Map<HeaderMetadataType, String> header, String keyField) {
-    switch (logDataBlockFormat) {
-      case AVRO_DATA_BLOCK:
-        return new HoodieAvroDataBlock(recordList, header, keyField);
-      case HFILE_DATA_BLOCK:
-        return new HoodieHFileDataBlock(recordList, header, keyField);
-      default:
-        throw new HoodieException("Data block format " + logDataBlockFormat + " not implemented");
-    }
+  protected HoodieDataBlock(Option<byte[]> content,
+                            FSDataInputStream inputStream,
+                            boolean readBlockLazily,
+                            Option<HoodieLogBlockContentLocation> blockContentLocation,
+                            Option<Schema> readerSchema,
+                            Map<HeaderMetadataType, String> headers,
+                            Map<HeaderMetadataType, String> footer,
+                            String keyFieldRef,
+                            boolean enablePointLookups) {
+    super(headers, footer, blockContentLocation, content, inputStream, readBlockLazily);
+    this.records = Option.empty();
+    this.keyFieldRef = keyFieldRef;
+    // If no reader-schema has been provided assume writer-schema as one
+    this.readerSchema = readerSchema.orElseGet(() -> getWriterSchema(super.getLogBlockHeader()));
+    this.enablePointLookups = enablePointLookups;
   }
 
   @Override
   public byte[] getContentBytes() throws IOException {
     // In case this method is called before realizing records from content
-    if (getContent().isPresent()) {
-      return getContent().get();
-    } else if (readBlockLazily && !getContent().isPresent() && records == null) {
-      // read block lazily
-      createRecordsFromContentBytes();
+    Option<byte[]> content = getContent();
+
+    checkState(content.isPresent() || records.isPresent(), "Block is in invalid state");
+
+    if (content.isPresent()) {
+      return content.get();
     }
 
-    return serializeRecords();
+    return serializeRecords(records.get());
   }
 
-  public abstract HoodieLogBlockType getBlockType();
+  protected static Schema getWriterSchema(Map<HeaderMetadataType, String> logBlockHeader) {
+    return new Schema.Parser().parse(logBlockHeader.get(HeaderMetadataType.SCHEMA));
+  }
 
-  public List<IndexedRecord> getRecords() {
-    if (records == null) {
+  /**
+   * Returns all the records contained w/in this block
+   */
+  public final List<IndexedRecord> getRecords() {
+    if (!records.isPresent()) {
       try {
         // in case records are absent, read content lazily and then convert to IndexedRecords
-        createRecordsFromContentBytes();
+        records = Option.of(readRecordsFromBlockPayload());
       } catch (IOException io) {
         throw new HoodieIOException("Unable to convert content bytes to records", io);
       }
     }
-    return records;
+    return records.get();
+  }
+
+  public Schema getSchema() {
+    return readerSchema;
   }
 
   /**
    * Batch get of keys of interest. Implementation can choose to either do full scan and return matched entries or
    * do a seek based parsing and return matched entries.
+   *
    * @param keys keys of interest.
    * @return List of IndexedRecords for the keys of interest.
-   * @throws IOException
+   * @throws IOException in case of failures encountered when reading/parsing records
    */
-  public List<IndexedRecord> getRecords(List<String> keys) throws IOException {
-    throw new UnsupportedOperationException("On demand batch get based on interested keys not supported");
-  }
+  public final List<IndexedRecord> getRecords(List<String> keys) throws IOException {
+    boolean fullScan = keys.isEmpty();
+    if (enablePointLookups && !fullScan) {
+      return lookupRecords(keys);
+    }
 
-  public Schema getSchema() {
-    // if getSchema was invoked before converting byte [] to records
-    if (records == null) {
-      getRecords();
+    // Otherwise, we fetch all the records and filter out all the records, but the
+    // ones requested
+    List<IndexedRecord> allRecords = getRecords();
+    if (fullScan) {
+      return allRecords;
     }
-    return schema;
+
+    HashSet<String> keySet = new HashSet<>(keys);
+    return allRecords.stream()
+        .filter(record -> keySet.contains(getRecordKey(record)))
+        .collect(Collectors.toList());
   }
 
-  protected void createRecordsFromContentBytes() throws IOException {
+  protected List<IndexedRecord> readRecordsFromBlockPayload() throws IOException {
     if (readBlockLazily && !getContent().isPresent()) {
       // read log block contents from disk
       inflate();
     }
 
-    deserializeRecords();
+    try {
+      return deserializeRecords(getContent().get());
+    } finally {
+      // Free up content to be GC'd by deflating the block
+      deflate();
+    }
+  }
+
+  protected List<IndexedRecord> lookupRecords(List<String> keys) throws IOException {
+    throw new UnsupportedOperationException(
+        String.format("Point lookups are not supported by this Data block type (%s)", getBlockType())
+    );
   }
 
-  protected abstract byte[] serializeRecords() throws IOException;
+  protected abstract byte[] serializeRecords(List<IndexedRecord> records) throws IOException;
 
-  protected abstract void deserializeRecords() throws IOException;
+  protected abstract List<IndexedRecord> deserializeRecords(byte[] content) throws IOException;
+
+  public abstract HoodieLogBlockType getBlockType();
+
+  protected String getRecordKey(IndexedRecord record) {
+    return Option.ofNullable(record.getSchema().getField(keyFieldRef))
+        .map(keyField -> record.get(keyField.pos()))
+        .map(Object::toString)
+        .orElse(null);

Review comment:
       avoid returning null?

##########
File path: hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetStreamWriter.java
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage;
+
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.avro.HoodieAvroWriteSupport;
+import org.apache.hudi.parquet.io.OutputStreamBackedOutputFile;
+import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.api.WriteSupport;
+import org.apache.parquet.io.OutputFile;
+
+import java.io.IOException;
+
+// TODO unify w/ HoodieParquetWriter
+public class HoodieParquetStreamWriter<R extends IndexedRecord> implements AutoCloseable {
+
+  private final ParquetWriter<R> writer;
+  private final HoodieAvroWriteSupport writeSupport;
+
+  public HoodieParquetStreamWriter(
+      FSDataOutputStream outputStream,
+      HoodieAvroParquetConfig parquetConfig) throws IOException {
+    this.writeSupport = parquetConfig.getWriteSupport();
+    this.writer = new Builder<R>(new OutputStreamBackedOutputFile(outputStream), writeSupport)

Review comment:
       Do we set all the configs that are currently set on the other path? Please double-check once.

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
##########
@@ -18,151 +18,173 @@
 
 package org.apache.hudi.common.table.log.block;
 
-import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.exception.HoodieIOException;
-
 import org.apache.avro.Schema;
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.hadoop.fs.FSDataInputStream;
-
-import javax.annotation.Nonnull;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieIOException;
 
 import java.io.IOException;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
 
 /**
  * DataBlock contains a list of records serialized using formats compatible with the base file format.
  * For each base file format there is a corresponding DataBlock format.
- *
+ * <p>
  * The Datablock contains:
  *   1. Data Block version
  *   2. Total number of records in the block
  *   3. Actual serialized content of the records
  */
 public abstract class HoodieDataBlock extends HoodieLogBlock {
 
-  protected List<IndexedRecord> records;
-  protected Schema schema;
-  protected String keyField;
+  // TODO rebase records/content to leverage Either to warrant
+  //      that they are mutex (used by read/write flows respectively)
+  private Option<List<IndexedRecord>> records;
 
-  public HoodieDataBlock(@Nonnull Map<HeaderMetadataType, String> logBlockHeader,
-      @Nonnull Map<HeaderMetadataType, String> logBlockFooter,
-      @Nonnull Option<HoodieLogBlockContentLocation> blockContentLocation, @Nonnull Option<byte[]> content,
-      FSDataInputStream inputStream, boolean readBlockLazily) {
-    super(logBlockHeader, logBlockFooter, blockContentLocation, content, inputStream, readBlockLazily);
-    this.keyField = HoodieRecord.RECORD_KEY_METADATA_FIELD;
-  }
+  /**
+   * Dot-path notation reference to the key field w/in the record's schema
+   */
+  private final String keyFieldRef;

Review comment:
       rename: keyFieldName?

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
##########
@@ -481,4 +447,63 @@ public long moveToPrev() throws IOException {
   public void remove() {
     throw new UnsupportedOperationException("Remove not supported for HoodieLogFileReader");
   }
+
+  private static Path makeQualified(FileSystem fs, Path path) {

Review comment:
       move to FSUtils

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/model/HoodieLogFile.java
##########
@@ -60,7 +59,7 @@ public HoodieLogFile(FileStatus fileStatus) {
   public HoodieLogFile(Path logPath) {
     this.fileStatus = null;
     this.pathStr = logPath.toString();
-    this.fileLen = 0;
+    this.fileLen = -1;

Review comment:
       Did you hit any issues that required this to be -1? I am okay with the change, but there might be side effects to consider.

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
##########
@@ -18,151 +18,173 @@
 
 package org.apache.hudi.common.table.log.block;
 
-import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.exception.HoodieIOException;
-
 import org.apache.avro.Schema;
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.hadoop.fs.FSDataInputStream;
-
-import javax.annotation.Nonnull;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieIOException;
 
 import java.io.IOException;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
 
 /**
  * DataBlock contains a list of records serialized using formats compatible with the base file format.
  * For each base file format there is a corresponding DataBlock format.
- *
+ * <p>
  * The Datablock contains:
  *   1. Data Block version
  *   2. Total number of records in the block
  *   3. Actual serialized content of the records
  */
 public abstract class HoodieDataBlock extends HoodieLogBlock {
 
-  protected List<IndexedRecord> records;
-  protected Schema schema;
-  protected String keyField;
+  // TODO rebase records/content to leverage Either to warrant
+  //      that they are mutex (used by read/write flows respectively)
+  private Option<List<IndexedRecord>> records;
 
-  public HoodieDataBlock(@Nonnull Map<HeaderMetadataType, String> logBlockHeader,
-      @Nonnull Map<HeaderMetadataType, String> logBlockFooter,
-      @Nonnull Option<HoodieLogBlockContentLocation> blockContentLocation, @Nonnull Option<byte[]> content,
-      FSDataInputStream inputStream, boolean readBlockLazily) {
-    super(logBlockHeader, logBlockFooter, blockContentLocation, content, inputStream, readBlockLazily);
-    this.keyField = HoodieRecord.RECORD_KEY_METADATA_FIELD;
-  }
+  /**
+   * Dot-path notation reference to the key field w/in the record's schema
+   */
+  private final String keyFieldRef;
 
-  public HoodieDataBlock(@Nonnull List<IndexedRecord> records, @Nonnull Map<HeaderMetadataType, String> header,
-                         @Nonnull Map<HeaderMetadataType, String> footer, String keyField) {
-    this(header, footer, Option.empty(), Option.empty(), null, false);
-    this.records = records;
-    this.schema = new Schema.Parser().parse(super.getLogBlockHeader().get(HeaderMetadataType.SCHEMA));
-    this.keyField = keyField;
-  }
+  private final boolean enablePointLookups;
 
-  protected HoodieDataBlock(Option<byte[]> content, @Nonnull FSDataInputStream inputStream, boolean readBlockLazily,
-                            Option<HoodieLogBlockContentLocation> blockContentLocation, Schema readerSchema,
-                            @Nonnull Map<HeaderMetadataType, String> headers, @Nonnull Map<HeaderMetadataType,
-      String> footer, String keyField) {
-    this(headers, footer, blockContentLocation, content, inputStream, readBlockLazily);
-    this.schema = readerSchema;
-    this.keyField = keyField;
-  }
+  protected final Schema readerSchema;
 
   /**
-   * Util method to get a data block for the requested type.
-   *
-   * @param logDataBlockFormat - Data block type
-   * @param recordList         - List of records that goes in the data block
-   * @param header             - data block header
-   * @return Data block of the requested type.
+   * NOTE: This ctor is used on the write-path (ie when records ought to be written into the log)
    */
-  public static HoodieLogBlock getBlock(HoodieLogBlockType logDataBlockFormat, List<IndexedRecord> recordList,
-                                        Map<HeaderMetadataType, String> header) {
-    return getBlock(logDataBlockFormat, recordList, header, HoodieRecord.RECORD_KEY_METADATA_FIELD);
+  public HoodieDataBlock(List<IndexedRecord> records,
+                         Map<HeaderMetadataType, String> header,
+                         Map<HeaderMetadataType, String> footer,
+                         String keyFieldRef) {
+    super(header, footer, Option.empty(), Option.empty(), null, false);
+    this.records = Option.of(records);
+    this.keyFieldRef = keyFieldRef;
+    // If no reader-schema has been provided assume writer-schema as one
+    this.readerSchema = getWriterSchema(super.getLogBlockHeader());
+    this.enablePointLookups = false;
   }
 
   /**
-   * Util method to get a data block for the requested type.
-   *
-   * @param logDataBlockFormat - Data block type
-   * @param recordList         - List of records that goes in the data block
-   * @param header             - data block header
-   * @param keyField           - FieldId to get the key from the records
-   * @return Data block of the requested type.
+   * NOTE: This ctor is used on the write-path (ie when records ought to be written into the log)
    */
-  public static HoodieLogBlock getBlock(HoodieLogBlockType logDataBlockFormat, List<IndexedRecord> recordList,
-                                        Map<HeaderMetadataType, String> header, String keyField) {
-    switch (logDataBlockFormat) {
-      case AVRO_DATA_BLOCK:
-        return new HoodieAvroDataBlock(recordList, header, keyField);
-      case HFILE_DATA_BLOCK:
-        return new HoodieHFileDataBlock(recordList, header, keyField);
-      default:
-        throw new HoodieException("Data block format " + logDataBlockFormat + " not implemented");
-    }
+  protected HoodieDataBlock(Option<byte[]> content,
+                            FSDataInputStream inputStream,
+                            boolean readBlockLazily,
+                            Option<HoodieLogBlockContentLocation> blockContentLocation,
+                            Option<Schema> readerSchema,
+                            Map<HeaderMetadataType, String> headers,
+                            Map<HeaderMetadataType, String> footer,
+                            String keyFieldRef,
+                            boolean enablePointLookups) {
+    super(headers, footer, blockContentLocation, content, inputStream, readBlockLazily);
+    this.records = Option.empty();
+    this.keyFieldRef = keyFieldRef;
+    // If no reader-schema has been provided assume writer-schema as one
+    this.readerSchema = readerSchema.orElseGet(() -> getWriterSchema(super.getLogBlockHeader()));
+    this.enablePointLookups = enablePointLookups;
   }
 
   @Override
   public byte[] getContentBytes() throws IOException {
     // In case this method is called before realizing records from content
-    if (getContent().isPresent()) {
-      return getContent().get();
-    } else if (readBlockLazily && !getContent().isPresent() && records == null) {
-      // read block lazily
-      createRecordsFromContentBytes();
+    Option<byte[]> content = getContent();
+
+    checkState(content.isPresent() || records.isPresent(), "Block is in invalid state");

Review comment:
       add more reasoning to the error message

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java
##########
@@ -224,6 +217,33 @@ public static HoodieAvroDataBlock getBlock(byte[] content, Schema readerSchema)
     return new HoodieAvroDataBlock(records, readerSchema);
   }
 
+  private static byte[] compress(String text) {

Review comment:
       and add a unit test around it

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java
##########
@@ -224,6 +217,33 @@ public static HoodieAvroDataBlock getBlock(byte[] content, Schema readerSchema)
     return new HoodieAvroDataBlock(records, readerSchema);
   }
 
+  private static byte[] compress(String text) {

Review comment:
       can we move this to IOUtils or some common utils class?

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
##########
@@ -481,4 +447,63 @@ public long moveToPrev() throws IOException {
   public void remove() {
     throw new UnsupportedOperationException("Remove not supported for HoodieLogFileReader");
   }
+
+  private static Path makeQualified(FileSystem fs, Path path) {
+    return path.makeQualified(fs.getUri(), fs.getWorkingDirectory());
+  }
+
+  /**
+   * Fetch the right {@link FSDataInputStream} to be used by wrapping with required input streams.
+   * @param fs instance of {@link FileSystem} in use.
+   * @param bufferSize buffer size to be used.
+   * @return the right {@link FSDataInputStream} as required.
+   */
+  private static FSDataInputStream getFSDataInputStream(FileSystem fs,

Review comment:
       Whenever you move code, please leave a comment on the PR on what has changed. Can you let me know if any code has changed here?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org