Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2020/08/24 19:11:43 UTC

[GitHub] [hudi] vinothchandar commented on a change in pull request #1804: [HUDI-960] Implementation of the HFile base and log file format.

vinothchandar commented on a change in pull request #1804:
URL: https://github.com/apache/hudi/pull/1804#discussion_r475815871



##########
File path: hudi-client/src/main/java/org/apache/hudi/config/HoodieStorageConfig.java
##########
@@ -81,6 +87,7 @@ public Builder fromProperties(Properties props) {
 
     public Builder limitFileSize(long maxFileSize) {
       props.setProperty(PARQUET_FILE_MAX_BYTES, String.valueOf(maxFileSize));
+      props.setProperty(HFILE_FILE_MAX_BYTES, String.valueOf(maxFileSize));

Review comment:
       not sure if it's a good idea to overload two configs like this; we may need to break this builder method up into separate methods.
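
       As a rough sketch of that split (the method names here are hypothetical; the property keys are the ones already defined in this class), each base file format could get its own builder method:

           public Builder limitParquetFileSize(long maxFileSize) {
             // only the Parquet base file limit is touched
             props.setProperty(PARQUET_FILE_MAX_BYTES, String.valueOf(maxFileSize));
             return this;
           }

           public Builder limitHFileSize(long maxFileSize) {
             // only the HFile base file limit is touched
             props.setProperty(HFILE_FILE_MAX_BYTES, String.valueOf(maxFileSize));
             return this;
           }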

##########
File path: hudi-client/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
##########
@@ -55,7 +56,7 @@
   private long recordsWritten = 0;
   private long insertRecordsWritten = 0;
   private long recordsDeleted = 0;
-  private Iterator<HoodieRecord<T>> recordIterator;
+  private Map<String, HoodieRecord<T>> recordMap;

Review comment:
       need to ensure that having this be a map won't affect normal inserts.
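
       The concern, sketched out (types simplified; this assumes the map preserves insertion order, e.g. a LinkedHashMap or an insertion-ordered spillable map, and writeRecord is only a stand-in for the handle's existing write logic): the old iterator streamed records lazily, while the map version materializes them, and its iteration order now drives the write order.

           // old path: streamed records one at a time
           // while (recordIterator.hasNext()) { writeRecord(recordIterator.next()); }

           // new path: iteration order and memory footprint depend on the map implementation
           for (HoodieRecord<T> record : recordMap.values()) {
             writeRecord(record);   // hypothetical stand-in for the handle's write logic
           }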

##########
File path: hudi-client/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hudi.client.SparkTaskContextSupplier;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieUpsertException;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.avro.generic.GenericRecord;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.Queue;
+
+/**
+ * Hoodie merge handle which writes records (new inserts or updates) sorted by their key.
+ *
+ * The implementation performs a merge-sort by comparing the key of the record being written to the list of
+ * keys in newRecordKeys (sorted in-memory).
+ */
+public class HoodieSortedMergeHandle<T extends HoodieRecordPayload> extends HoodieMergeHandle<T> {
+
+  private Queue<String> newRecordKeysSorted = new PriorityQueue<>();
+
+  public HoodieSortedMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T> hoodieTable,
+       Iterator<HoodieRecord<T>> recordItr, String partitionPath, String fileId, SparkTaskContextSupplier sparkTaskContextSupplier) {
+    super(config, instantTime, hoodieTable, recordItr, partitionPath, fileId, sparkTaskContextSupplier);
+    newRecordKeysSorted.addAll(keyToNewRecords.keySet());
+  }
+
+  /**
+   * Called by compactor code path.
+   */
+  public HoodieSortedMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T> hoodieTable,
+      Map<String, HoodieRecord<T>> keyToNewRecordsOrig, String partitionPath, String fileId,
+      HoodieBaseFile dataFileToBeMerged, SparkTaskContextSupplier sparkTaskContextSupplier) {
+    super(config, instantTime, hoodieTable, keyToNewRecordsOrig, partitionPath, fileId, dataFileToBeMerged,
+        sparkTaskContextSupplier);
+
+    newRecordKeysSorted.addAll(keyToNewRecords.keySet());
+  }
+
+  /**
+   * Go through an old record. Here if we detect a newer version shows up, we write the new one to the file.
+   */
+  @Override
+  public void write(GenericRecord oldRecord) {
+    String key = oldRecord.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
+
+    // To maintain overall sorted order across updates and inserts, write any new inserts whose keys are less than
+    // the oldRecord's key.
+    while (!newRecordKeysSorted.isEmpty() && newRecordKeysSorted.peek().compareTo(key) <= 0) {
+      String keyToPreWrite = newRecordKeysSorted.remove();

Review comment:
       instead, we can just do a streaming sort-merge? 
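
       For illustration, a minimal sketch of that streaming sort-merge (plain Java; it ignores Hudi payload merging and deletes, keyOf and write stand in for the handle's key extraction and write path, and both inputs are assumed to already be sorted by key):

           import java.util.Iterator;
           import java.util.function.Consumer;
           import java.util.function.Function;

           final class StreamingSortMergeSketch {
             // Merges two key-sorted streams into one sorted output;
             // on equal keys the new record replaces the old one.
             static <R> void merge(Iterator<R> oldRecords, Iterator<R> newRecords,
                                   Function<R, String> keyOf, Consumer<R> write) {
               R oldRec = oldRecords.hasNext() ? oldRecords.next() : null;
               R newRec = newRecords.hasNext() ? newRecords.next() : null;
               while (oldRec != null || newRec != null) {
                 if (oldRec == null
                     || (newRec != null && keyOf.apply(newRec).compareTo(keyOf.apply(oldRec)) <= 0)) {
                   write.accept(newRec);                      // new insert, or update winning over the old record
                   if (oldRec != null && keyOf.apply(newRec).equals(keyOf.apply(oldRec))) {
                     oldRec = oldRecords.hasNext() ? oldRecords.next() : null;  // skip the replaced old record
                   }
                   newRec = newRecords.hasNext() ? newRecords.next() : null;
                 } else {
                   write.accept(oldRec);                      // untouched old record passes through
                   oldRec = oldRecords.hasNext() ? oldRecords.next() : null;
                 }
               }
             }
           }

       This keeps per-merge memory bounded to one record per input instead of materializing all new record keys in a PriorityQueue up front.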

##########
File path: hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieDemo.java
##########
@@ -115,6 +115,35 @@ public void testParquetDemo() throws Exception {
     testIncrementalHiveQueryAfterCompaction();
   }
 
+  @Test
+  public void testHFileDemo() throws Exception {

Review comment:
       this is good, but it will add a lot to our integration test runtime. we should disable this for now and enable it later; needs a JIRA to track.
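
       One way to park it, assuming the integ-test module is on JUnit 5 (the @Disabled annotation comes from org.junit.jupiter.api; the reason string is only a placeholder, not an actual JIRA id):

           import org.junit.jupiter.api.Disabled;
           import org.junit.jupiter.api.Test;

           @Test
           @Disabled("HFile demo adds significant integration-test runtime; re-enable under a follow-up JIRA")
           public void testHFileDemo() throws Exception {
             // existing test body unchanged
           }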

##########
File path: hudi-client/src/main/java/org/apache/hudi/table/action/compact/HoodieMergeOnReadTableCompactor.java
##########
@@ -135,15 +135,14 @@
 
     // Compacting is very similar to applying updates to existing file
     Iterator<List<WriteStatus>> result;
-    // If the dataFile is present, there is a base parquet file present, perform updates else perform inserts into a
-    // new base parquet file.
+    // If the dataFile is present, perform updates else perform inserts into a new base file.
     if (oldDataFileOpt.isPresent()) {
       result = hoodieCopyOnWriteTable.handleUpdate(instantTime, operation.getPartitionPath(),
               operation.getFileId(), scanner.getRecords(),
           oldDataFileOpt.get());
     } else {
       result = hoodieCopyOnWriteTable.handleInsert(instantTime, operation.getPartitionPath(), operation.getFileId(),
-          scanner.iterator());
+          scanner.getRecords());

Review comment:
       this will incur additional merge costs

##########
File path: hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieHFileInputFormat.java
##########
@@ -0,0 +1,426 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hadoop;
+
+import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.apache.hudi.hadoop.testutils.InputFormatTestUtil;
+import org.apache.hudi.hadoop.utils.HoodieHiveUtils;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapreduce.Job;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.hudi.common.testutils.SchemaTestUtil.getSchemaFromResource;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestHoodieHFileInputFormat {

Review comment:
       is this copy-pasted from some other test? room for code re-use? 
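
       One possible direction for the re-use, as a sketch (this assumes JUnit 5 parameterized tests are available and that HoodieFileFormat exposes PARQUET and HFILE constants; the method name is hypothetical): drive one shared test body with the base file format instead of duplicating the Parquet test.

           import org.apache.hudi.common.model.HoodieFileFormat;
           import org.junit.jupiter.params.ParameterizedTest;
           import org.junit.jupiter.params.provider.EnumSource;

           @ParameterizedTest
           @EnumSource(value = HoodieFileFormat.class, names = {"PARQUET", "HFILE"})
           public void testInputFormatLoad(HoodieFileFormat baseFileFormat) throws IOException {
             // shared setup and assertions from the existing parquet test,
             // with file creation and readers switched on baseFileFormat
           }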

##########
File path: hudi-client/src/main/java/org/apache/hudi/io/storage/HoodieHFileWriter.java
##########
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.client.SparkTaskContextSupplier;
+import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
+import org.apache.hadoop.io.Writable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * HoodieHFileWriter writes IndexedRecords into an HFile. The record's key is used as the key and the
+ * AVRO encoded record bytes are saved as the value.
+ *
+ * Limitations (compared to columnar formats like Parquet or ORC):
+ *  1. Records should be added in order of keys
+ *  2. There are no column stats
+ */
+public class HoodieHFileWriter<T extends HoodieRecordPayload, R extends IndexedRecord>
+    implements HoodieFileWriter<R> {
+  private static AtomicLong recordIndex = new AtomicLong(1);
+
+  private final Path file;
+  private HoodieHFileConfig hfileConfig;
+  private final HoodieWrapperFileSystem fs;
+  private final long maxFileSize;
+  private final String instantTime;
+  private final SparkTaskContextSupplier sparkTaskContextSupplier;
+  private HFile.Writer writer;
+  private String minRecordKey;
+  private String maxRecordKey;
+
+  public HoodieHFileWriter(String instantTime, Path file, HoodieHFileConfig hfileConfig, Schema schema,
+      SparkTaskContextSupplier sparkTaskContextSupplier) throws IOException {
+
+    Configuration conf = FSUtils.registerFileSystem(file, hfileConfig.getHadoopConf());
+    this.file = HoodieWrapperFileSystem.convertToHoodiePath(file, conf);
+    this.fs = (HoodieWrapperFileSystem) this.file.getFileSystem(conf);
+    this.hfileConfig = hfileConfig;
+
+    // TODO - compute this compression ratio dynamically by looking at the bytes written to the
+    // stream and the actual file size reported by HDFS
+    // this.maxFileSize = hfileConfig.getMaxFileSize()

Review comment:
       are these configs used at all now? 
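
       For reference on what the new writer wraps (and the key-ordered contract called out in the class javadoc above), a minimal standalone sketch of writing an HFile through the HBase API; the block size, output path, and the avroBytesFor helper are illustrative assumptions, not what the PR does:

           import org.apache.hadoop.conf.Configuration;
           import org.apache.hadoop.fs.FileSystem;
           import org.apache.hadoop.fs.Path;
           import org.apache.hadoop.hbase.KeyValue;
           import org.apache.hadoop.hbase.io.hfile.CacheConfig;
           import org.apache.hadoop.hbase.io.hfile.HFile;
           import org.apache.hadoop.hbase.io.hfile.HFileContext;
           import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;

           import java.io.IOException;
           import java.nio.charset.StandardCharsets;

           public final class HFileWriteSketch {
             public static void main(String[] args) throws IOException {
               Configuration conf = new Configuration();
               FileSystem fs = FileSystem.get(conf);
               HFileContext context = new HFileContextBuilder().withBlockSize(1024 * 1024).build();
               HFile.Writer writer = HFile.getWriterFactory(conf, new CacheConfig(conf))
                   .withPath(fs, new Path("/tmp/sample.hfile"))
                   .withFileContext(context)
                   .create();
               // keys must be appended in sorted order; the value is the Avro-encoded record
               writer.append(new KeyValue("key-0001".getBytes(StandardCharsets.UTF_8), null, null,
                   avroBytesFor("key-0001")));
               writer.append(new KeyValue("key-0002".getBytes(StandardCharsets.UTF_8), null, null,
                   avroBytesFor("key-0002")));
               writer.close();
             }

             // hypothetical stand-in for serializing a record to Avro bytes
             private static byte[] avroBytesFor(String key) {
               return ("payload-for-" + key).getBytes(StandardCharsets.UTF_8);
             }
           }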

##########
File path: hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetReader.java
##########
@@ -77,4 +79,9 @@ public long getTotalRecords() {
     // TODO Auto-generated method stub
     return 0;
   }
+
+  @Override
+  public Option getRecordByKey(String key, Schema schema) throws IOException {
+    throw new HoodieException("HoodieParquetReader does not support reading records by key");

Review comment:
       throw `UnsupportedOperationException` instead

##########
File path: hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReader.java
##########
@@ -34,7 +35,17 @@
 
   public Set<String> filterRowKeys(Set<String> candidateRowKeys);
 
-  public Iterator<R> getRecordIterator(Schema schema) throws IOException;
+  public Iterator<R> getRecordIterator(Schema readerSchema) throws IOException;
+
+  default Iterator<R> getRecordIterator() throws IOException {
+    return getRecordIterator(getSchema());
+  }
+
+  public Option<R> getRecordByKey(String key, Schema readerSchema) throws IOException;

Review comment:
       I think throwing `UnsupportedOperationException` is actually better; nothing can fundamentally change whether a format supports this or not.
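
       As a sketch of that suggestion against this interface (keeping the method on HoodieFileReader but giving it a refusing default, so only key-addressable formats like HFile override it):

           default Option<R> getRecordByKey(String key, Schema readerSchema) throws IOException {
             // formats without a key index (e.g. the parquet reader) simply refuse random reads by key
             throw new UnsupportedOperationException(getClass().getName() + " does not support reading records by key");
           }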

##########
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
##########
@@ -83,68 +79,6 @@ protected HoodieDefaultTimeline filterInstantsTimeline(HoodieDefaultTimeline tim
     return timeline;
   }
 
-  /**
-   * Add a field to the existing fields projected.
-   */
-  private static Configuration addProjectionField(Configuration conf, String fieldName, int fieldIndex) {

Review comment:
       is this dead code? 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org