You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text rendering.
Posted to commits@hudi.apache.org by yi...@apache.org on 2021/11/25 00:53:42 UTC
[hudi] branch master updated: [HUDI-2844][CLI] Fixing archived Timeline crashing if timeline contains REPLACE_COMMIT (#4091)
This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 5129773 [HUDI-2844][CLI] Fixing archived Timeline crashing if timeline contains REPLACE_COMMIT (#4091)
5129773 is described below
commit 51297736ca7f72076376d4d245b3c161bdfe57b2
Author: Alexey Kudinkin <al...@infinilake.com>
AuthorDate: Wed Nov 24 16:53:29 2021 -0800
[HUDI-2844][CLI] Fixing archived Timeline crashing if timeline contains REPLACE_COMMIT (#4091)
---
.../table/timeline/HoodieArchivedTimeline.java | 59 +++++++++++-----------
1 file changed, 30 insertions(+), 29 deletions(-)
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java
index 4926b2a..faff4ae 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java
@@ -18,6 +18,10 @@
package org.apache.hudi.common.table.timeline;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieArchivedMetaEntry;
import org.apache.hudi.common.model.HoodieLogFile;
@@ -28,14 +32,10 @@ import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieIOException;
-
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.generic.IndexedRecord;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
+import javax.annotation.Nonnull;
import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
@@ -51,7 +51,6 @@ import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
-import java.util.stream.Stream;
/**
* Represents the Archived Timeline for the Hoodie table. Instants for the last 12 hours (configurable) is in the
@@ -147,7 +146,8 @@ public class HoodieArchivedTimeline extends HoodieDefaultTimeline {
final String instantTime = record.get(HoodiePartitionMetadata.COMMIT_TIME_KEY).toString();
final String action = record.get(ACTION_TYPE_KEY).toString();
if (loadDetails) {
- Option.ofNullable(record.get(getMetadataKey(action))).map(actionData -> {
+ getMetadataKey(action).map(key -> {
+ Object actionData = record.get(key);
if (action.equals(HoodieTimeline.COMPACTION_ACTION)) {
this.readCommits.put(instantTime, HoodieAvroUtils.indexedRecordToBytes((IndexedRecord)actionData));
} else {
@@ -159,22 +159,25 @@ public class HoodieArchivedTimeline extends HoodieDefaultTimeline {
return new HoodieInstant(HoodieInstant.State.valueOf(record.get(ACTION_STATE).toString()), action, instantTime);
}
- private String getMetadataKey(String action) {
+ @Nonnull
+ private Option<String> getMetadataKey(String action) {
switch (action) {
case HoodieTimeline.CLEAN_ACTION:
- return "hoodieCleanMetadata";
+ return Option.of("hoodieCleanMetadata");
case HoodieTimeline.COMMIT_ACTION:
- return "hoodieCommitMetadata";
case HoodieTimeline.DELTA_COMMIT_ACTION:
- return "hoodieCommitMetadata";
+ return Option.of("hoodieCommitMetadata");
case HoodieTimeline.ROLLBACK_ACTION:
- return "hoodieRollbackMetadata";
+ return Option.of("hoodieRollbackMetadata");
case HoodieTimeline.SAVEPOINT_ACTION:
- return "hoodieSavePointMetadata";
+ return Option.of("hoodieSavePointMetadata");
case HoodieTimeline.COMPACTION_ACTION:
- return "hoodieCompactionPlan";
+ return Option.of("hoodieCompactionPlan");
+ case HoodieTimeline.REPLACE_COMMIT_ACTION:
+ return Option.of("hoodieReplaceCommitMetadata");
default:
- throw new HoodieIOException("Unknown action in metadata " + action);
+ LOG.error(String.format("Unknown action in metadata (%s)", action));
+ return Option.empty();
}
}
@@ -199,35 +202,33 @@ public class HoodieArchivedTimeline extends HoodieDefaultTimeline {
private List<HoodieInstant> loadInstants(TimeRangeFilter filter, boolean loadInstantDetails,
Function<GenericRecord, Boolean> commitsFilter) {
try {
- // list all files
+ // List all files
FileStatus[] fsStatuses = metaClient.getFs().globStatus(
new Path(metaClient.getArchivePath() + "/.commits_.archive*"));
- // sort files by version suffix in reverse (implies reverse chronological order)
+ // Sort files by version suffix in reverse (implies reverse chronological order)
Arrays.sort(fsStatuses, new ArchiveFileVersionComparator());
List<HoodieInstant> instantsInRange = new ArrayList<>();
for (FileStatus fs : fsStatuses) {
- //read the archived file
+ // Read the archived file
try (HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(metaClient.getFs(),
new HoodieLogFile(fs.getPath()), HoodieArchivedMetaEntry.getClassSchema())) {
int instantsInPreviousFile = instantsInRange.size();
- //read the avro blocks
+ // Read the avro blocks
while (reader.hasNext()) {
HoodieAvroDataBlock blk = (HoodieAvroDataBlock) reader.next();
// TODO If we can store additional metadata in datablock, we can skip parsing records
// (such as startTime, endTime of records in the block)
List<IndexedRecord> records = blk.getRecords();
- // filter blocks in desired time window
- Stream<HoodieInstant> instantsInBlkStream = records.stream()
- .filter(r -> commitsFilter.apply((GenericRecord) r))
- .map(r -> readCommit((GenericRecord) r, loadInstantDetails));
-
- if (filter != null) {
- instantsInBlkStream = instantsInBlkStream.filter(filter::isInRange);
- }
-
- instantsInRange.addAll(instantsInBlkStream.collect(Collectors.toList()));
+ // Filter blocks in desired time window
+ instantsInRange.addAll(
+ records.stream()
+ .filter(r -> commitsFilter.apply((GenericRecord) r))
+ .map(r -> readCommit((GenericRecord) r, loadInstantDetails))
+ .filter(c -> filter == null || filter.isInRange(c))
+ .collect(Collectors.toList())
+ );
}
if (filter != null) {