Posted to commits@hudi.apache.org by "sivabalan narayanan (Jira)" <ji...@apache.org> on 2021/11/16 15:06:00 UTC

[jira] [Created] (HUDI-2773) Deltastreamer checkpoint copy over does not ignore compaction metadata

sivabalan narayanan created HUDI-2773:
-----------------------------------------

             Summary: Deltastreamer checkpoint copy over does not ignore compaction metadata
                 Key: HUDI-2773
                 URL: https://issues.apache.org/jira/browse/HUDI-2773
             Project: Apache Hudi
          Issue Type: Bug
            Reporter: sivabalan narayanan


Compaction commit metadata does not contain the Deltastreamer checkpoint key. So, when a concurrent writer tries to copy over the Deltastreamer checkpoint, it should skip compaction instants and look at earlier instants instead.

Possible fix in TransactionUtils:
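{code:java}
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieIOException;

// LOG is assumed to be the existing logger field of TransactionUtils.
public static Option<Pair<HoodieInstant, Map<String, String>>> getLastCompletedTxnInstantAndMetadata(
    HoodieTableMetaClient metaClient) {
  // Completed commit/deltacommit/replacecommit instants, newest first.
  List<HoodieInstant> hoodieInstants = metaClient.getActiveTimeline().getCommitsTimeline()
      .filterCompletedInstants().getReverseOrderedInstants().collect(Collectors.toList());
  for (HoodieInstant hoodieInstant : hoodieInstants) {
    try {
      switch (hoodieInstant.getAction()) {
        case HoodieTimeline.REPLACE_COMMIT_ACTION:
          HoodieReplaceCommitMetadata replaceCommitMetadata = HoodieReplaceCommitMetadata
              .fromBytes(metaClient.getActiveTimeline().getInstantDetails(hoodieInstant).get(), HoodieReplaceCommitMetadata.class);
          return Option.of(Pair.of(hoodieInstant, replaceCommitMetadata.getExtraMetadata()));
        case HoodieTimeline.DELTA_COMMIT_ACTION:
        case HoodieTimeline.COMMIT_ACTION:
          HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
              .fromBytes(metaClient.getActiveTimeline().getInstantDetails(hoodieInstant).get(), HoodieCommitMetadata.class);
          // This fix treats operation type UNKNOWN as a compaction commit: skip it and keep looking.
          if (!commitMetadata.getOperationType().equals(WriteOperationType.UNKNOWN)) {
            return Option.of(Pair.of(hoodieInstant, commitMetadata.getExtraMetadata()));
          }
          LOG.warn("Skipping compaction instant " + hoodieInstant + " while reading latest metadata");
          break;
        default:
          throw new IllegalArgumentException("Unknown instant action " + hoodieInstant.getAction());
      }
    } catch (IOException io) {
      throw new HoodieIOException("Unable to read metadata for instant " + hoodieInstant, io);
    }
  }
  // No non-compaction instant found (or the timeline is empty).
  return Option.empty();
}
{code}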
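The skip-compaction rule can be exercised without a Hudi table using the dependency-free sketch below. It walks completed instants newest-first and takes metadata from the first instant that is not a compaction. `CheckpointLookupSketch`, its `Instant` class, and the `isCompaction` flag are hypothetical stand-ins for the timeline and commit metadata, not Hudi APIs. The checkpoint key is passed in as a parameter rather than assuming Hudi's actual constant.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical, dependency-free model of the skip-compaction lookup; not Hudi code.
public class CheckpointLookupSketch {

  // Hypothetical stand-in for a completed instant plus the commit metadata we care about.
  static final class Instant {
    final String timestamp;
    final String action;                      // e.g. "commit", "deltacommit", "replacecommit"
    final boolean isCompaction;               // assumption: compaction is flagged directly here
    final Map<String, String> extraMetadata;  // where a writer would store its checkpoint

    Instant(String timestamp, String action, boolean isCompaction, Map<String, String> extraMetadata) {
      this.timestamp = timestamp;
      this.action = action;
      this.isCompaction = isCompaction;
      this.extraMetadata = extraMetadata;
    }
  }

  // Returns the newest instant that is not a compaction, or empty if there is none.
  static Optional<Instant> lastNonCompactionInstant(List<Instant> newestFirst) {
    for (Instant instant : newestFirst) {
      if (instant.isCompaction) {
        continue; // compaction metadata has no checkpoint, so look at earlier instants
      }
      return Optional.of(instant);
    }
    return Optional.empty();
  }

  // Reads the value stored under checkpointKey on that instant; empty if missing.
  static Optional<String> latestCheckpoint(List<Instant> newestFirst, String checkpointKey) {
    return lastNonCompactionInstant(newestFirst)
        .map(instant -> instant.extraMetadata.get(checkpointKey));
  }
}
```

For example, suppose the newest instant is a compaction `commit` and the one before it is a `deltacommit` carrying a checkpoint. In that case `latestCheckpoint` returns the deltacommit's checkpoint rather than an empty result. As in the proposed fix, the lookup stops at the first non-compaction instant even if that instant lacks the key.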


