Posted to commits@hudi.apache.org by "sivabalan narayanan (Jira)" <ji...@apache.org> on 2021/11/16 15:06:00 UTC
[jira] [Created] (HUDI-2773) Deltastreamer checkpoint copy over does not ignore compaction metadata
sivabalan narayanan created HUDI-2773:
-----------------------------------------
Summary: Deltastreamer checkpoint copy over does not ignore compaction metadata
Key: HUDI-2773
URL: https://issues.apache.org/jira/browse/HUDI-2773
Project: Apache Hudi
Issue Type: Bug
Reporter: sivabalan narayanan
Compaction commit metadata does not contain the Deltastreamer checkpoint key. So when a concurrent writer copies over the Deltastreamer checkpoint, it should skip compaction instants and read the metadata of earlier completed instants instead.
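The following minimal, dependency-free sketch shows why this matters. It assumes the checkpoint is stored under a single key in a commit's extra metadata. The class, the method `copyOverCheckpoint`, and the key string `deltastreamer.checkpoint.key` are illustrative assumptions, not Hudi's actual API. When the metadata passed in comes from a compaction commit, there is no checkpoint to copy.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the checkpoint copy-over step; not Hudi's actual API.
public class CheckpointCopySketch {

  // Assumed key name, used for illustration only.
  static final String CHECKPOINT_KEY = "deltastreamer.checkpoint.key";

  // Returns the writer's extra metadata with the checkpoint copied in from the last commit,
  // unless the writer already set its own checkpoint or the last commit has none.
  static Map<String, String> copyOverCheckpoint(Map<String, String> lastCommitExtraMetadata,
                                                Map<String, String> writerExtraMetadata) {
    Map<String, String> merged = new HashMap<>(writerExtraMetadata);
    String checkpoint = lastCommitExtraMetadata.get(CHECKPOINT_KEY);
    if (checkpoint != null) {
      merged.putIfAbsent(CHECKPOINT_KEY, checkpoint);
    }
    return merged;
  }

  public static void main(String[] args) {
    Map<String, String> ingestMetadata = new HashMap<>();
    ingestMetadata.put(CHECKPOINT_KEY, "offsets-100"); // hypothetical checkpoint value
    Map<String, String> compactionMetadata = new HashMap<>(); // compaction: no checkpoint key

    // Using compaction metadata as the source: nothing is copied (prints "null").
    System.out.println(copyOverCheckpoint(compactionMetadata, new HashMap<>()).get(CHECKPOINT_KEY));
    // Using an ingestion commit as the source carries the checkpoint forward (prints "offsets-100").
    System.out.println(copyOverCheckpoint(ingestMetadata, new HashMap<>()).get(CHECKPOINT_KEY));
  }
}
```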
Possible fix in TransactionUtils:
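{code:java}
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieIOException;

// Walks completed commits from newest to oldest and returns the first instant whose
// metadata can carry the Deltastreamer checkpoint, skipping compaction commits.
// Uses the LOG field already declared in TransactionUtils.
public static Option<Pair<HoodieInstant, Map<String, String>>> getLastCompletedTxnInstantAndMetadata(
    HoodieTableMetaClient metaClient) {
  List<HoodieInstant> hoodieInstants = metaClient.getActiveTimeline().getCommitsTimeline()
      .filterCompletedInstants().getReverseOrderedInstants().collect(Collectors.toList());
  for (HoodieInstant hoodieInstant : hoodieInstants) {
    try {
      switch (hoodieInstant.getAction()) {
        case HoodieTimeline.REPLACE_COMMIT_ACTION:
          HoodieReplaceCommitMetadata replaceCommitMetadata = HoodieReplaceCommitMetadata
              .fromBytes(metaClient.getActiveTimeline().getInstantDetails(hoodieInstant).get(), HoodieReplaceCommitMetadata.class);
          return Option.of(Pair.of(hoodieInstant, replaceCommitMetadata.getExtraMetadata()));
        case HoodieTimeline.DELTA_COMMIT_ACTION:
        case HoodieTimeline.COMMIT_ACTION:
          HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
              .fromBytes(metaClient.getActiveTimeline().getInstantDetails(hoodieInstant).get(), HoodieCommitMetadata.class);
          // This proposal treats an UNKNOWN operation type as a compaction commit, which never
          // carries the checkpoint key, so keep walking back to older instants.
          if (commitMetadata.getOperationType() != WriteOperationType.UNKNOWN) {
            return Option.of(Pair.of(hoodieInstant, commitMetadata.getExtraMetadata()));
          }
          LOG.warn("Skipping compaction instant " + hoodieInstant + " while reading latest commit metadata");
          break;
        default:
          throw new IllegalArgumentException("Unknown instant action " + hoodieInstant.getAction());
      }
    } catch (IOException io) {
      throw new HoodieIOException("Unable to read metadata for instant " + hoodieInstant, io);
    }
  }
  // Empty timeline, or every completed instant was a compaction.
  return Option.empty();
}
{code}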
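The walk-back in the proposed fix can be modeled without Hudi on the classpath. This is a simplified, hypothetical model: `Instant` and `lastNonCompactionInstant` are illustrative names, and a boolean flag stands in for the `WriteOperationType.UNKNOWN` check used in the patch above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical, dependency-free model of a completed-commits timeline; not Hudi's API.
public class CheckpointScanSketch {

  // One completed instant: its time, whether it is a compaction, and its extra metadata.
  static final class Instant {
    final String time;
    final boolean isCompaction; // stands in for the UNKNOWN operation-type check
    final Map<String, String> extraMetadata;

    Instant(String time, boolean isCompaction, Map<String, String> extraMetadata) {
      this.time = time;
      this.isCompaction = isCompaction;
      this.extraMetadata = extraMetadata;
    }
  }

  // Scans instants newest-first and returns the first one that is not a compaction.
  static Optional<Instant> lastNonCompactionInstant(List<Instant> completedOldestFirst) {
    for (int i = completedOldestFirst.size() - 1; i >= 0; i--) {
      Instant instant = completedOldestFirst.get(i);
      if (!instant.isCompaction) {
        return Optional.of(instant);
      }
      // Compaction metadata has no checkpoint: keep looking at older instants.
    }
    return Optional.empty();
  }

  public static void main(String[] args) {
    Map<String, String> ingestMetadata = new HashMap<>();
    ingestMetadata.put("checkpoint", "offsets-100"); // hypothetical key and value
    List<Instant> timeline = new ArrayList<>();
    timeline.add(new Instant("001", false, ingestMetadata));
    timeline.add(new Instant("002", true, new HashMap<>())); // compaction completed last
    Instant found = lastNonCompactionInstant(timeline).orElseThrow(IllegalStateException::new);
    System.out.println(found.time + " -> " + found.extraMetadata.get("checkpoint")); // prints "001 -> offsets-100"
  }
}
```

Unlike a lookup of only the latest instant, this returns the most recent ingestion commit even when compactions completed after it, and returns empty only when every completed instant is a compaction.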
--
This message was sent by Atlassian Jira
(v8.20.1#820001)