Posted to commits@hudi.apache.org by "sivabalan narayanan (Jira)" <ji...@apache.org> on 2022/03/03 15:36:00 UTC

[jira] [Closed] (HUDI-3346) Ignore non-existent temp marker dir for a commit while downgrading from 2 to 1 table version

     [ https://issues.apache.org/jira/browse/HUDI-3346?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

sivabalan narayanan closed HUDI-3346.
-------------------------------------
    Resolution: Fixed

> Ignore non-existent temp marker dir for a commit while downgrading from 2 to 1 table version
> --------------------------------------------------------------------------------------------
>
>                 Key: HUDI-3346
>                 URL: https://issues.apache.org/jira/browse/HUDI-3346
>             Project: Apache Hudi
>          Issue Type: Task
>          Components: writer-core
>            Reporter: sivabalan narayanan
>            Assignee: sivabalan narayanan
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 0.11.0
>
>
> While downgrading from table version 2 to 1, an exception is thrown if a commit has no marker directory.
> We should silently ignore this case.
>  
> Code of interest in TwoToOneDowngradeHandler
> {code:java}
> private void convertToDirectMarkers(final String commitInstantTime,
>                                     HoodieTable table,
>                                     HoodieEngineContext context,
>                                     int parallelism) throws IOException {
>   String markerDir = table.getMetaClient().getMarkerFolderPath(commitInstantTime);
>   FileSystem fileSystem = FSUtils.getFs(markerDir, context.getHadoopConf().newCopy());
>   Option<MarkerType> markerTypeOption = MarkerUtils.readMarkerType(fileSystem, markerDir);
>   if (markerTypeOption.isPresent()) {
>     switch (markerTypeOption.get()) {
>       case TIMELINE_SERVER_BASED:
>         // Reads all markers written by the timeline server
>         Map<String, Set<String>> markersMap =
>             MarkerUtils.readTimelineServerBasedMarkersFromFileSystem(
>                 markerDir, fileSystem, context, parallelism);
>         DirectWriteMarkers directWriteMarkers = new DirectWriteMarkers(table, commitInstantTime);
>         // Recreates the markers in the direct format
>         markersMap.values().stream().flatMap(Collection::stream)
>             .forEach(directWriteMarkers::create);
>         // Deletes marker type file
>         MarkerUtils.deleteMarkerTypeFile(fileSystem, markerDir);
>         // Deletes timeline server based markers
>         deleteTimelineBasedMarkerFiles(context, markerDir, fileSystem, parallelism);
>         break;
>       default:
>         throw new HoodieException("The marker type \"" + markerTypeOption.get().name()
>             + "\" is not supported for rollback.");
>     }
>   } else {
>     // In case of partial failures during downgrade, the marker type file may have been deleted
>     // while timeline-server-based marker files remain, so delete them if there are any.
>     deleteTimelineBasedMarkerFiles(context, markerDir, fileSystem, parallelism);
>   }
> } {code}
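> The problem is in the else block above: it calls deleteTimelineBasedMarkerFiles even when the commit has no marker directory.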
>  
> {code:java}
> private void deleteTimelineBasedMarkerFiles(HoodieEngineContext context, String markerDir,
>                                             FileSystem fileSystem, int parallelism) throws IOException {
>   // Deletes timeline based marker files if any.
>   Predicate<FileStatus> prefixFilter = fileStatus ->
>       fileStatus.getPath().getName().startsWith(MARKERS_FILENAME_PREFIX);
>   FSUtils.parallelizeSubPathProcess(context, fileSystem, new Path(markerDir), parallelism,
>           prefixFilter, pairOfSubPathAndConf ->
>                   FSUtils.deleteSubPath(pairOfSubPathAndConf.getKey(), pairOfSubPathAndConf.getValue(), false));
> } {code}
> Fix: in the else block of the first snippet, call deleteTimelineBasedMarkerFiles only if the marker directory for the commit exists.
>  
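The proposed fix can be illustrated with a minimal, self-contained sketch. It assumes java.nio.file in place of Hadoop's FileSystem, and it assumes the value "MARKERS" for MARKERS_FILENAME_PREFIX (check the actual constant in Hudi). The class and method names are hypothetical. In the real handler, the equivalent guard would presumably be something like fileSystem.exists(new Path(markerDir)) around the call in the else block.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the HUDI-3346 fix using java.nio instead of Hadoop's FileSystem.
final class MarkerCleanupSketch {

  // Assumed prefix of timeline-server-based marker files (e.g. MARKERS0, MARKERS1).
  static final String MARKERS_FILENAME_PREFIX = "MARKERS";

  private MarkerCleanupSketch() {
  }

  /**
   * Deletes regular files directly under markerDir whose names start with
   * MARKERS_FILENAME_PREFIX. If markerDir does not exist (the case described
   * in HUDI-3346), nothing is deleted and no exception is thrown.
   *
   * @return the number of files deleted
   */
  static int deleteTimelineBasedMarkerFilesIfPresent(Path markerDir) throws IOException {
    // The guard the fix adds: skip cleanup when the commit has no marker directory.
    if (!Files.isDirectory(markerDir)) {
      return 0;
    }
    // Collect matching entries first so the directory is not modified while it is being listed.
    List<Path> toDelete = new ArrayList<>();
    try (DirectoryStream<Path> stream =
             Files.newDirectoryStream(markerDir, MARKERS_FILENAME_PREFIX + "*")) {
      for (Path entry : stream) {
        if (Files.isRegularFile(entry)) {
          toDelete.add(entry);
        }
      }
    }
    // Non-recursive delete of each matching file.
    for (Path file : toDelete) {
      Files.delete(file);
    }
    return toDelete.size();
  }
}
```

Returning 0 for a missing directory makes this cleanup step a no-op for commits that never had markers, which is the behavior the issue asks for. The glob pattern plays the role of the prefixFilter predicate in deleteTimelineBasedMarkerFiles, and files that do not match the prefix are left in place.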


