You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by "sivabalan narayanan (Jira)" <ji...@apache.org> on 2021/11/16 15:07:00 UTC
[jira] [Assigned] (HUDI-2773) Deltastreamer checkpoint copy over does not ignore compaction metadata
[ https://issues.apache.org/jira/browse/HUDI-2773?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
sivabalan narayanan reassigned HUDI-2773:
-----------------------------------------
Assignee: sivabalan narayanan
> Deltastreamer checkpoint copy over does not ignore compaction metadata
> ----------------------------------------------------------------------
>
> Key: HUDI-2773
> URL: https://issues.apache.org/jira/browse/HUDI-2773
> Project: Apache Hudi
> Issue Type: Bug
> Reporter: sivabalan narayanan
> Assignee: sivabalan narayanan
> Priority: Major
>
> compaction commit metadata is not going to have the deltastreamer checkpoint key. so, when a concurrent writer is trying to copy over deltastreamer checkpoint, it should skip compaction metadata and look at previous instants.
> //possible fix in TransactionUtils
> {code:java}
> public static Option<Pair<HoodieInstant, Map<String, String>>> getLastCompletedTxnInstantAndMetadata(
> HoodieTableMetaClient metaClient) {
> List<HoodieInstant> hoodieInstants = metaClient.getActiveTimeline().getCommitsTimeline()
> .filterCompletedInstants().getReverseOrderedInstants().collect(Collectors.toList());
> if (!hoodieInstants.isEmpty()) {
> for (HoodieInstant hoodieInstant : hoodieInstants) {
> try {
> switch (hoodieInstant.getAction()) {
> case HoodieTimeline.REPLACE_COMMIT_ACTION:
> HoodieReplaceCommitMetadata replaceCommitMetadata = HoodieReplaceCommitMetadata
> .fromBytes(metaClient.getActiveTimeline().getInstantDetails(hoodieInstant).get(), HoodieReplaceCommitMetadata.class);
> return Option.of(Pair.of(hoodieInstant, replaceCommitMetadata.getExtraMetadata()));
> case HoodieTimeline.DELTA_COMMIT_ACTION:
> case HoodieTimeline.COMMIT_ACTION:
> HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
> .fromBytes(metaClient.getActiveTimeline().getInstantDetails(hoodieInstant).get(), HoodieCommitMetadata.class);
> if (!commitMetadata.getOperationType().equals(WriteOperationType.UNKNOWN)) { // skip compaction instants
> return Option.of(Pair.of(hoodieInstant, commitMetadata.getExtraMetadata()));
> } else {
> LOG.warn("Skipping compaction instants to read latest metadata");
> }
> break;
> default:
> throw new IllegalArgumentException("Unknown instant action" + hoodieInstant.getAction());
> }
> } catch (IOException io) {
> throw new HoodieIOException("Unable to read metadata for instant " + hoodieInstant, io);
> }
> }
> return Option.empty();
> } else {
> return Option.empty();
> }
> } {code}
--
This message was sent by Atlassian Jira
(v8.20.1#820001)