Posted to commits@hudi.apache.org by "sivabalan narayanan (Jira)" <ji...@apache.org> on 2021/11/16 15:07:00 UTC

[jira] [Assigned] (HUDI-2773) Deltastreamer checkpoint copy over does not ignore compaction metadata

     [ https://issues.apache.org/jira/browse/HUDI-2773?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

sivabalan narayanan reassigned HUDI-2773:
-----------------------------------------

    Assignee: sivabalan narayanan

> Deltastreamer checkpoint copy over does not ignore compaction metadata
> ----------------------------------------------------------------------
>
>                 Key: HUDI-2773
>                 URL: https://issues.apache.org/jira/browse/HUDI-2773
>             Project: Apache Hudi
>          Issue Type: Bug
>            Reporter: sivabalan narayanan
>            Assignee: sivabalan narayanan
>            Priority: Major
>
> Compaction commit metadata does not contain the Deltastreamer checkpoint key. So when a concurrent writer tries to copy over the Deltastreamer checkpoint, it should skip compaction metadata and look at earlier instants instead.
> Possible fix in TransactionUtils:
> {code:java}
> public static Option<Pair<HoodieInstant, Map<String, String>>> getLastCompletedTxnInstantAndMetadata(
>     HoodieTableMetaClient metaClient) {
>   List<HoodieInstant> hoodieInstants = metaClient.getActiveTimeline().getCommitsTimeline()
>       .filterCompletedInstants().getReverseOrderedInstants().collect(Collectors.toList());
>   if (!hoodieInstants.isEmpty()) {
>     for (HoodieInstant hoodieInstant : hoodieInstants) {
>       try {
>         switch (hoodieInstant.getAction()) {
>           case HoodieTimeline.REPLACE_COMMIT_ACTION:
>             HoodieReplaceCommitMetadata replaceCommitMetadata = HoodieReplaceCommitMetadata
>                 .fromBytes(metaClient.getActiveTimeline().getInstantDetails(hoodieInstant).get(), HoodieReplaceCommitMetadata.class);
>             return Option.of(Pair.of(hoodieInstant, replaceCommitMetadata.getExtraMetadata()));
>           case HoodieTimeline.DELTA_COMMIT_ACTION:
>           case HoodieTimeline.COMMIT_ACTION:
>             HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
>                 .fromBytes(metaClient.getActiveTimeline().getInstantDetails(hoodieInstant).get(), HoodieCommitMetadata.class);
>             if (!commitMetadata.getOperationType().equals(WriteOperationType.UNKNOWN)) { // skip compaction instants
>               return Option.of(Pair.of(hoodieInstant, commitMetadata.getExtraMetadata()));
>             } else {
>               LOG.warn("Skipping compaction instants to read latest metadata");
>             }
>             break;
>           default:
>             throw new IllegalArgumentException("Unknown instant action: " + hoodieInstant.getAction());
>         }
>       } catch (IOException io) {
>         throw new HoodieIOException("Unable to read metadata for instant " + hoodieInstant, io);
>       }
>     }
>     return Option.empty();
>   } else {
>     return Option.empty();
>   }
> }
> {code}
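
The walk-back described in the issue can be tried without a Hudi dependency. The following is only a minimal sketch under stated assumptions: `Instant`, `lastNonCompactionInstant`, and the `CHECKPOINT_KEY` value are hypothetical stand-ins, not Hudi classes or constants, and a compaction commit is modelled as an operation type of "UNKNOWN", following the heuristic in the proposed fix above.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;

public class CheckpointLookupSketch {

  // Hypothetical placeholder key; the real key name is an assumption here.
  static final String CHECKPOINT_KEY = "checkpoint.key";

  // Hypothetical stand-in for a completed instant plus its commit metadata; not a Hudi class.
  static final class Instant {
    final String timestamp;
    final String operationType; // "UNKNOWN" models a compaction commit, as in the proposed fix
    final Map<String, String> extraMetadata;

    Instant(String timestamp, String operationType, Map<String, String> extraMetadata) {
      this.timestamp = timestamp;
      this.operationType = operationType;
      this.extraMetadata = extraMetadata;
    }
  }

  // Walks instants newest-first and returns the first one that is not a compaction.
  static Optional<Instant> lastNonCompactionInstant(List<Instant> newestFirst) {
    for (Instant instant : newestFirst) {
      if (!"UNKNOWN".equals(instant.operationType)) {
        return Optional.of(instant);
      }
      // Compaction metadata carries no checkpoint, so keep looking at older instants.
    }
    return Optional.empty();
  }

  public static void main(String[] args) {
    List<Instant> timeline = Arrays.asList(
        new Instant("003", "UNKNOWN", Collections.<String, String>emptyMap()),
        new Instant("002", "UPSERT", Collections.singletonMap(CHECKPOINT_KEY, "offset-42")),
        new Instant("001", "INSERT", Collections.singletonMap(CHECKPOINT_KEY, "offset-17")));

    Optional<Instant> found = lastNonCompactionInstant(timeline);
    // Prints "002 -> offset-42": the compaction at 003 is skipped.
    System.out.println(found
        .map(i -> i.timestamp + " -> " + i.extraMetadata.get(CHECKPOINT_KEY))
        .orElse("none"));
  }
}
```

If every completed instant is a compaction, the sketch returns an empty result, which matches the `Option.empty()` fall-through in the proposed fix.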



--
This message was sent by Atlassian Jira
(v8.20.1#820001)