You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by yi...@apache.org on 2021/11/25 00:53:42 UTC

[hudi] branch master updated: [HUDI-2844][CLI] Fixing archived Timeline crashing if timeline contains REPLACE_COMMIT (#4091)

This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 5129773  [HUDI-2844][CLI] Fixing archived Timeline crashing if timeline contains REPLACE_COMMIT (#4091)
5129773 is described below

commit 51297736ca7f72076376d4d245b3c161bdfe57b2
Author: Alexey Kudinkin <al...@infinilake.com>
AuthorDate: Wed Nov 24 16:53:29 2021 -0800

    [HUDI-2844][CLI] Fixing archived Timeline crashing if timeline contains REPLACE_COMMIT (#4091)
---
 .../table/timeline/HoodieArchivedTimeline.java     | 59 +++++++++++-----------
 1 file changed, 30 insertions(+), 29 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java
index 4926b2a..faff4ae 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java
@@ -18,6 +18,10 @@
 
 package org.apache.hudi.common.table.timeline;
 
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.avro.model.HoodieArchivedMetaEntry;
 import org.apache.hudi.common.model.HoodieLogFile;
@@ -28,14 +32,10 @@ import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
 import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.exception.HoodieIOException;
-
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.generic.IndexedRecord;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
+import javax.annotation.Nonnull;
 import java.io.IOException;
 import java.io.Serializable;
 import java.nio.charset.StandardCharsets;
@@ -51,7 +51,6 @@ import java.util.function.Function;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
-import java.util.stream.Stream;
 
 /**
  * Represents the Archived Timeline for the Hoodie table. Instants for the last 12 hours (configurable) is in the
@@ -147,7 +146,8 @@ public class HoodieArchivedTimeline extends HoodieDefaultTimeline {
     final String instantTime  = record.get(HoodiePartitionMetadata.COMMIT_TIME_KEY).toString();
     final String action = record.get(ACTION_TYPE_KEY).toString();
     if (loadDetails) {
-      Option.ofNullable(record.get(getMetadataKey(action))).map(actionData -> {
+      getMetadataKey(action).map(key -> {
+        Object actionData = record.get(key);
         if (action.equals(HoodieTimeline.COMPACTION_ACTION)) {
           this.readCommits.put(instantTime, HoodieAvroUtils.indexedRecordToBytes((IndexedRecord)actionData));
         } else {
@@ -159,22 +159,25 @@ public class HoodieArchivedTimeline extends HoodieDefaultTimeline {
     return new HoodieInstant(HoodieInstant.State.valueOf(record.get(ACTION_STATE).toString()), action, instantTime);
   }
 
-  private String getMetadataKey(String action) {
+  @Nonnull
+  private Option<String> getMetadataKey(String action) {
     switch (action) {
       case HoodieTimeline.CLEAN_ACTION:
-        return "hoodieCleanMetadata";
+        return Option.of("hoodieCleanMetadata");
       case HoodieTimeline.COMMIT_ACTION:
-        return "hoodieCommitMetadata";
       case HoodieTimeline.DELTA_COMMIT_ACTION:
-        return "hoodieCommitMetadata";
+        return Option.of("hoodieCommitMetadata");
       case HoodieTimeline.ROLLBACK_ACTION:
-        return "hoodieRollbackMetadata";
+        return Option.of("hoodieRollbackMetadata");
       case HoodieTimeline.SAVEPOINT_ACTION:
-        return "hoodieSavePointMetadata";
+        return Option.of("hoodieSavePointMetadata");
       case HoodieTimeline.COMPACTION_ACTION:
-        return "hoodieCompactionPlan";
+        return Option.of("hoodieCompactionPlan");
+      case HoodieTimeline.REPLACE_COMMIT_ACTION:
+        return Option.of("hoodieReplaceCommitMetadata");
       default:
-        throw new HoodieIOException("Unknown action in metadata " + action);
+        LOG.error(String.format("Unknown action in metadata (%s)", action));
+        return Option.empty();
     }
   }
 
@@ -199,35 +202,33 @@ public class HoodieArchivedTimeline extends HoodieDefaultTimeline {
   private List<HoodieInstant> loadInstants(TimeRangeFilter filter, boolean loadInstantDetails,
        Function<GenericRecord, Boolean> commitsFilter) {
     try {
-      // list all files
+      // List all files
       FileStatus[] fsStatuses = metaClient.getFs().globStatus(
               new Path(metaClient.getArchivePath() + "/.commits_.archive*"));
 
-      // sort files by version suffix in reverse (implies reverse chronological order)
+      // Sort files by version suffix in reverse (implies reverse chronological order)
       Arrays.sort(fsStatuses, new ArchiveFileVersionComparator());
 
       List<HoodieInstant> instantsInRange = new ArrayList<>();
       for (FileStatus fs : fsStatuses) {
-        //read the archived file
+        // Read the archived file
         try (HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(metaClient.getFs(),
             new HoodieLogFile(fs.getPath()), HoodieArchivedMetaEntry.getClassSchema())) {
           int instantsInPreviousFile = instantsInRange.size();
-          //read the avro blocks
+          // Read the avro blocks
           while (reader.hasNext()) {
             HoodieAvroDataBlock blk = (HoodieAvroDataBlock) reader.next();
             // TODO If we can store additional metadata in datablock, we can skip parsing records
             // (such as startTime, endTime of records in the block)
             List<IndexedRecord> records = blk.getRecords();
-            // filter blocks in desired time window
-            Stream<HoodieInstant> instantsInBlkStream = records.stream()
-                .filter(r -> commitsFilter.apply((GenericRecord) r))
-                .map(r -> readCommit((GenericRecord) r, loadInstantDetails));
-
-            if (filter != null) {
-              instantsInBlkStream = instantsInBlkStream.filter(filter::isInRange);
-            }
-
-            instantsInRange.addAll(instantsInBlkStream.collect(Collectors.toList()));
+            // Filter blocks in desired time window
+            instantsInRange.addAll(
+                records.stream()
+                    .filter(r -> commitsFilter.apply((GenericRecord) r))
+                    .map(r -> readCommit((GenericRecord) r, loadInstantDetails))
+                    .filter(c -> filter == null || filter.isInRange(c))
+                    .collect(Collectors.toList())
+            );
           }
 
           if (filter != null) {