Posted to dev@hudi.apache.org by GitBox <gi...@apache.org> on 2022/04/04 07:20:05 UTC

[GitHub] [hudi] vinothchandar commented on a diff in pull request #5179: [HUDI-3290] Different file formats for the partition metadata file.

vinothchandar commented on code in PR #5179:
URL: https://github.com/apache/hudi/pull/5179#discussion_r841430510


##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePartitionMetadata.java:
##########
@@ -118,35 +136,112 @@ public void trySave(int taskPartitionId) {
     }
   }
 
+  private String getMetafileExtension() {
+    // To be backwards compatible, there is no extension on the properties-file-based partition metafile
+    return format.isPresent() ? format.get().getFileExtension() : StringUtils.EMPTY_STRING;
+  }
+
+  /**
+   * Write the partition metadata in the correct format in the given file path.
+   *
+   * @param filePath Path of the file to write
+   * @throws IOException
+   */
+  private void writeMetafile(Path filePath) throws IOException {
+    if (format.isPresent()) {
+      Schema schema = HoodieAvroUtils.getRecordKeySchema();
+
+      switch (format.get()) {
+        case PARQUET:
+          // Since we are only interested in saving metadata to the footer, the schema, block sizes and other
+          // parameters are not important.
+          MessageType type = Types.buildMessage().optional(PrimitiveTypeName.INT64).named("dummyint").named("dummy");
+          HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport(type, schema, Option.empty());
+          try (ParquetWriter writer = new ParquetWriter(filePath, writeSupport, CompressionCodecName.UNCOMPRESSED, 1024, 1024)) {
+            for (String key : props.stringPropertyNames()) {
+              writeSupport.addFooterMetadata(key, props.getProperty(key));
+            }
+          }
+          break;
+        case ORC:
+          // Since we are only interested in saving metadata to the footer, the schema, block sizes and other
+          // parameters are not important.
+          OrcFile.WriterOptions writerOptions = OrcFile.writerOptions(fs.getConf()).fileSystem(fs)
+              .setSchema(AvroOrcUtils.createOrcSchema(schema));
+          try (Writer writer = OrcFile.createWriter(filePath, writerOptions)) {
+            for (String key : props.stringPropertyNames()) {
+              writer.addUserMetadata(key, ByteBuffer.wrap(props.getProperty(key).getBytes()));
+            }
+          }
+          break;
+        default:
+          throw new HoodieException("Unsupported format for partition metafiles: " + format.get());
+      }
+    } else {
+      // Backwards compatible properties file format
+      FSDataOutputStream os = fs.create(filePath, true);
+      props.store(os, "partition metadata");
+      os.hsync();
+      os.hflush();
+      os.close();
+    }
+  }
+
   /**
    * Read out the metadata for this partition.
    */
   public void readFromFS() throws IOException {
-    FSDataInputStream is = null;
-    try {
-      Path metaFile = new Path(partitionPath, HOODIE_PARTITION_METAFILE);
-      is = fs.open(metaFile);
+    // first try reading the text format (legacy, currently widespread)

Review Comment:
   @xushiyan to review this part once.
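
   Since the metadata lives entirely in the footer, a reader never has to scan the dummy row group. A minimal sketch of pulling the key/value pairs back out of the Parquet metafile with plain parquet-hadoop APIs, for anyone poking at the files by hand -- the path and key string here are illustrative, and the PR itself routes reads through BaseFileUtils.readFooter:

       import java.util.Map;
       import org.apache.hadoop.conf.Configuration;
       import org.apache.hadoop.fs.Path;
       import org.apache.parquet.hadoop.ParquetFileReader;
       import org.apache.parquet.hadoop.util.HadoopInputFile;

       Configuration conf = new Configuration();
       Path metafile = new Path("/tbl/2022/04/04/.hoodie_partition_metadata.parquet"); // illustrative path
       try (ParquetFileReader reader = ParquetFileReader.open(HadoopInputFile.fromPath(metafile, conf))) {
         // Only the footer is read; the row-group data (the dummy schema above) is never scanned.
         Map<String, String> kv = reader.getFooter().getFileMetaData().getKeyValueMetaData();
         String commitTime = kv.get("commitTime"); // key string assumed to match COMMIT_TIME_KEY
       }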



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java:
##########
@@ -371,6 +373,8 @@ public void refreshTimeline() throws IOException {
               HoodieTableConfig.POPULATE_META_FIELDS.defaultValue()))
           .setKeyGeneratorClassProp(props.getProperty(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME().key(),
               SimpleKeyGenerator.class.getName()))
+          .setPartitionMetafileUseBaseFormat(props.getBoolean(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.key(),

Review Comment:
   @vingov this was the missing wiring
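
   For context, this wiring means the flag now flows in through the ordinary deltastreamer properties file. A minimal sketch, assuming the key string matches HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.key():

       # in the --props file passed to HoodieDeltaStreamer (key string assumed)
       hoodie.partition.metafile.use.base.format=true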



##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePartitionMetadata.java:
##########
@@ -118,35 +136,112 @@ public void trySave(int taskPartitionId) {
     }
   }
 
+  private String getMetafileExtension() {
+    // To be backwards compatible, there is no extension on the properties-file-based partition metafile
+    return format.isPresent() ? format.get().getFileExtension() : StringUtils.EMPTY_STRING;
+  }
+
+  /**
+   * Write the partition metadata in the correct format in the given file path.
+   *
+   * @param filePath Path of the file to write
+   * @throws IOException
+   */
+  private void writeMetafile(Path filePath) throws IOException {
+    if (format.isPresent()) {
+      Schema schema = HoodieAvroUtils.getRecordKeySchema();
+
+      switch (format.get()) {
+        case PARQUET:
+          // Since we are only interested in saving metadata to the footer, the schema, block sizes and other
+          // parameters are not important.
+          MessageType type = Types.buildMessage().optional(PrimitiveTypeName.INT64).named("dummyint").named("dummy");
+          HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport(type, schema, Option.empty());
+          try (ParquetWriter writer = new ParquetWriter(filePath, writeSupport, CompressionCodecName.UNCOMPRESSED, 1024, 1024)) {
+            for (String key : props.stringPropertyNames()) {
+              writeSupport.addFooterMetadata(key, props.getProperty(key));
+            }
+          }
+          break;
+        case ORC:
+          // Since we are only interested in saving metadata to the footer, the schema, block sizes and other
+          // parameters are not important.
+          OrcFile.WriterOptions writerOptions = OrcFile.writerOptions(fs.getConf()).fileSystem(fs)
+              .setSchema(AvroOrcUtils.createOrcSchema(schema));
+          try (Writer writer = OrcFile.createWriter(filePath, writerOptions)) {
+            for (String key : props.stringPropertyNames()) {
+              writer.addUserMetadata(key, ByteBuffer.wrap(props.getProperty(key).getBytes()));
+            }
+          }
+          break;
+        default:
+          throw new HoodieException("Unsupported format for partition metafiles: " + format.get());
+      }
+    } else {
+      // Backwards compatible properties file format
+      FSDataOutputStream os = fs.create(filePath, true);
+      props.store(os, "partition metadata");
+      os.hsync();
+      os.hflush();
+      os.close();
+    }
+  }
+
   /**
    * Read out the metadata for this partition.
    */
   public void readFromFS() throws IOException {
-    FSDataInputStream is = null;
-    try {
-      Path metaFile = new Path(partitionPath, HOODIE_PARTITION_METAFILE);
-      is = fs.open(metaFile);
+    // first try reading the text format (legacy, currently widespread)
+    boolean readFile = readTextFormatMetaFile();
+    if (!readFile) {
+      // now try reading the base file formats.
+      readFile = readBaseFormatMetaFile();
+    }
+
+    // throw exception.
+    if (!readFile) {
+      throw new HoodieException("Unable to read any partition meta file to locate the table timeline.");
+    }
+  }
+
+  private boolean readTextFormatMetaFile() {
+    // Properties file format
+    Path metafilePath = textFormatMetaFilePath(partitionPath);
+    try (FSDataInputStream is = fs.open(metafilePath)) {
       props.load(is);
-    } catch (IOException ioe) {
-      throw new HoodieException("Error reading Hoodie partition metadata for " + partitionPath, ioe);
-    } finally {
-      if (is != null) {
-        is.close();
+      return true;
+    } catch (Throwable t) {
+      LOG.warn("Unable to read partition meta properties file for partition " + partitionPath, t);
+      return false;
+    }
+  }
+
+  private boolean readBaseFormatMetaFile() {
+    for (Path metafilePath : baseFormatMetaFilePaths(partitionPath)) {
+      try {
+        BaseFileUtils reader = BaseFileUtils.getInstance(metafilePath.toString());
+        // Data file format
+        Map<String, String> metadata = reader.readFooter(fs.getConf(), true, metafilePath, PARTITION_DEPTH_KEY, COMMIT_TIME_KEY);
+        props.clear();
+        metadata.forEach(props::put);
+        return true;
+      } catch (Throwable t) {
+        // on any error, log it and move on to the next base format
+        LOG.warn("Unable to read partition metadata " + metafilePath.getName() + " for partition " + partitionPath, t);

Review Comment:
   This log may be noisy, but I thought it's better to have it for now.
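
   If it does get noisy, one low-cost option later (a sketch against the loop above, not what this PR does) would be to skip base formats whose metafile isn't even present, reserving the warn for files that exist but fail to parse:

       if (!fs.exists(metafilePath)) {
         continue; // no metafile in this base format; nothing worth warning about
       }
       // otherwise fall through to readFooter(...) and keep the warn for genuine read failures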



##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePartitionMetadata.java:
##########
@@ -117,30 +133,119 @@ public void trySave(int taskPartitionId) {
     }
   }
 
+  private String getMetafileExtension() {
+    // To be backwards compatible, there is no extension on the properties-file-based partition metafile
+    return format.isPresent() ? format.get().getFileExtension() : "";
+  }
+
+  /**
+   * Write the partition metadata in the correct format in the given file path.
+   *
+   * @param filePath Path of the file to write
+   * @throws IOException
+   */
+  private void writeMetafile(Path filePath) throws IOException {
+    if (format.isPresent()) {
+      Schema schema = HoodieAvroUtils.getRecordKeySchema();
+
+      switch (format.get()) {
+        case PARQUET:
+          // Since we are only interested in saving metadata to the footer, the schema, block sizes and other
+          // parameters are not important.
+          MessageType type = Types.buildMessage().optional(PrimitiveTypeName.INT64).named("dummyint").named("dummy");
+          HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport(type, schema, Option.empty());

Review Comment:
   @vingov Not sure what you were hitting. @xushiyan just tested with the Spark datasource writer and reports that it's all working end-to-end. I pushed a lot of changes just now. There was one place missing for deltastreamer as well, where it wasn't passing the config correctly.
   
   Can you try this PR with my changes as well (as of the time of this comment)?
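
   For anyone else verifying, a minimal sketch of that datasource round trip (df and basePath are assumed locals; the metafile flag's key string is assumed):

       import org.apache.spark.sql.SaveMode;

       df.write().format("hudi")
           .option("hoodie.table.name", "trips")
           .option("hoodie.datasource.write.recordkey.field", "uuid")
           .option("hoodie.datasource.write.partitionpath.field", "partitionpath")
           .option("hoodie.partition.metafile.use.base.format", "true") // assumed key string
           .mode(SaveMode.Overwrite)
           .save(basePath);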



##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePartitionMetadata.java:
##########
@@ -118,35 +136,112 @@ public void trySave(int taskPartitionId) {
     }
   }
 
+  private String getMetafileExtension() {
+    // To be backwards compatible, there is no extension on the properties-file-based partition metafile
+    return format.isPresent() ? format.get().getFileExtension() : StringUtils.EMPTY_STRING;
+  }
+
+  /**
+   * Write the partition metadata in the correct format in the given file path.
+   *
+   * @param filePath Path of the file to write
+   * @throws IOException
+   */
+  private void writeMetafile(Path filePath) throws IOException {
+    if (format.isPresent()) {
+      Schema schema = HoodieAvroUtils.getRecordKeySchema();
+
+      switch (format.get()) {
+        case PARQUET:
+          // Since we are only interested in saving metadata to the footer, the schema, block sizes and other
+          // parameters are not important.
+          MessageType type = Types.buildMessage().optional(PrimitiveTypeName.INT64).named("dummyint").named("dummy");
+          HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport(type, schema, Option.empty());
+          try (ParquetWriter writer = new ParquetWriter(filePath, writeSupport, CompressionCodecName.UNCOMPRESSED, 1024, 1024)) {
+            for (String key : props.stringPropertyNames()) {
+              writeSupport.addFooterMetadata(key, props.getProperty(key));
+            }
+          }
+          break;
+        case ORC:
+          // Since we are only interested in saving metadata to the footer, the schema, block sizes and other
+          // parameters are not important.
+          OrcFile.WriterOptions writerOptions = OrcFile.writerOptions(fs.getConf()).fileSystem(fs)
+              .setSchema(AvroOrcUtils.createOrcSchema(schema));
+          try (Writer writer = OrcFile.createWriter(filePath, writerOptions)) {
+            for (String key : props.stringPropertyNames()) {
+              writer.addUserMetadata(key, ByteBuffer.wrap(props.getProperty(key).getBytes()));
+            }
+          }
+          break;
+        default:
+          throw new HoodieException("Unsupported format for partition metafiles: " + format.get());
+      }
+    } else {
+      // Backwards compatible properties file format
+      FSDataOutputStream os = fs.create(filePath, true);
+      props.store(os, "partition metadata");
+      os.hsync();
+      os.hflush();
+      os.close();
+    }
+  }
+
   /**
    * Read out the metadata for this partition.
    */
   public void readFromFS() throws IOException {
-    FSDataInputStream is = null;
-    try {
-      Path metaFile = new Path(partitionPath, HOODIE_PARTITION_METAFILE);
-      is = fs.open(metaFile);
+    // first try reading the text format (legacy, currently widespread)
+    boolean readFile = readTextFormatMetaFile();
+    if (!readFile) {
+      // now try reading the base file formats.
+      readFile = readBaseFormatMetaFile();
+    }
+
+    // throw exception.
+    if (!readFile) {
+      throw new HoodieException("Unable to read any partition meta file to locate the table timeline.");
+    }
+  }
+
+  private boolean readTextFormatMetaFile() {
+    // Properties file format
+    Path metafilePath = textFormatMetaFilePath(partitionPath);

Review Comment:
   Kept the current behavior of reading the text file directly, and tried it first: existing tables only have the text-format metafile, so they resolve on the first attempt without probing for base-format files.
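
   The resulting lookup order readFromFS ends up with (filenames assumed from HOODIE_PARTITION_METAFILE and the format extensions):

       // 1. .hoodie_partition_metadata          (text/properties format, legacy)
       // 2. .hoodie_partition_metadata.parquet  (base file format, this PR)
       // 3. .hoodie_partition_metadata.orc      (base file format, this PR)
       // If none of these can be read, readFromFS throws a HoodieException.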


