You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2021/09/15 04:29:35 UTC

[GitHub] [hudi] vinothchandar commented on a change in pull request #3651: [HUDI-2422] Adding rollback plan and rollback requested instant

vinothchandar commented on a change in pull request #3651:
URL: https://github.com/apache/hudi/pull/3651#discussion_r708817513



##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -590,6 +590,10 @@ public boolean rollback(final String commitInstantTime) throws HoodieRollbackExc
           .filter(instant -> HoodieActiveTimeline.EQUALS.test(instant.getTimestamp(), commitInstantTime))
           .findFirst());
       if (commitInstantOpt.isPresent()) {
+        LOG.info("Scheduling Rollback at instant time :" + rollbackInstantTime);
+        createTable(config, hadoopConf)

Review comment:
       do something with the return value?

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseMarkerBasedRollbackStrategy.java
##########
@@ -18,59 +18,60 @@
 
 package org.apache.hudi.table.action.rollback;
 
-import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.common.HoodieRollbackStat;
+import org.apache.hudi.avro.model.HoodieRollbackRequest;
 import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.IOType;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieRollbackException;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.marker.MarkerBasedRollbackUtils;
 import org.apache.hudi.table.marker.WriteMarkers;
 
+import org.apache.hadoop.fs.Path;
+
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
-import java.util.stream.Collectors;
 
-@SuppressWarnings("checkstyle:LineLength")
-public class JavaMarkerBasedRollbackStrategy<T extends HoodieRecordPayload> extends AbstractMarkerBasedRollbackStrategy<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {
-  public JavaMarkerBasedRollbackStrategy(HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
-                                         HoodieEngineContext context,
-                                         HoodieWriteConfig config,
-                                         String instantTime) {
+import static org.apache.hudi.table.action.rollback.BaseRollbackHelper.EMPTY_STRING;
+
+public class BaseMarkerBasedRollbackStrategy<T extends HoodieRecordPayload, I, K, O> extends AbstractMarkerBasedRollbackStrategy {
+
+  public BaseMarkerBasedRollbackStrategy(HoodieTable<T, I, K, O> table, HoodieEngineContext context, HoodieWriteConfig config, String instantTime) {
     super(table, context, config, instantTime);
   }
 
   @Override
-  public List<HoodieRollbackStat> execute(HoodieInstant instantToRollback) {
+  public List<HoodieRollbackRequest> getRollbackRequest(HoodieInstant instantToRollback) {
     try {
       List<String> markerPaths = MarkerBasedRollbackUtils.getAllMarkerPaths(
           table, context, instantToRollback.getTimestamp(), config.getRollbackParallelism());
-      List<HoodieRollbackStat> rollbackStats = context.map(markerPaths, markerFilePath -> {
+      List<HoodieRollbackRequest> rollbackRequests = new ArrayList<>();
+      int parallelism = Math.max(Math.min(markerPaths.size(), config.getRollbackParallelism()), 1);
+      context.foreach(markerPaths, markerFilePath -> {
         String typeStr = markerFilePath.substring(markerFilePath.lastIndexOf(".") + 1);
         IOType type = IOType.valueOf(typeStr);
         switch (type) {
           case MERGE:
-            return undoMerge(WriteMarkers.stripMarkerSuffix(markerFilePath));
-          case APPEND:
-            return undoAppend(WriteMarkers.stripMarkerSuffix(markerFilePath), instantToRollback);
           case CREATE:
-            return undoCreate(WriteMarkers.stripMarkerSuffix(markerFilePath));
+            String fileToDelete = WriteMarkers.stripMarkerSuffix(markerFilePath);
+            Path fullDeletePath = new Path(basePath, fileToDelete);
+            String partitionPath = FSUtils.getRelativePartitionPath(new Path(basePath), fullDeletePath.getParent());

Review comment:
       We should move any helper that is not actually doing anything with an fs instance to something like PathUtils.

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/HoodieRollbackRequestInternal.java
##########
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.rollback;
+
+import org.apache.hudi.avro.model.HoodieRollbackRequest;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class HoodieRollbackRequestInternal {

Review comment:
       SerializableRollbackRequest

##########
File path: hudi-common/src/main/avro/HoodieRollbackPlan.avsc
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{
+  "namespace": "org.apache.hudi.avro.model",
+  "type": "record",
+  "name": "HoodieRollbackPlan",
+  "fields": [
+     {
+           "name": "instantToRollback",
+           "doc": "Hoodie instant that needs to be rolled back",
+           "default": null,
+           "type": "HoodieInstantInfo"
+    },
+    {
+      "name": "RollbackRequests",
+      "type":["null", {
+                "type":"array",
+                "items":{
+                 "type": "record",
+                         "name": "HoodieRollbackRequest",
+                         "fields": [
+                            {"name": "partitionPath", "type": "string"},
+                            {"name": "fileId",
+                              "type":["null", "string"],
+                              "default": null
+                             },
+                            {"name": "latestBaseInstant",
+                              "type":["null", "string"],
+                              "default": null
+                            },
+                            {"name": "filesToBeDeleted",
+                             "default": [],
+                             "type": {
+                                       "type": "array",
+                                       "default": [],
+                                       "items": "string"
+                                    }
+                            },
+                            {"name": "logFilesToBeDeleted",

Review comment:
       rename this? log blocks to rollback? or something similar?

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/AbstractMarkerBasedRollbackStrategy.java
##########
@@ -65,83 +63,32 @@ public AbstractMarkerBasedRollbackStrategy(HoodieTable<T, I, K, O> table, Hoodie
     this.instantTime = instantTime;
   }
 
-  protected HoodieRollbackStat undoMerge(String mergedBaseFilePath) throws IOException {
-    LOG.info("Rolling back by deleting the merged base file:" + mergedBaseFilePath);
-    return deleteBaseFile(mergedBaseFilePath);
-  }
-
-  protected HoodieRollbackStat undoCreate(String createdBaseFilePath) throws IOException {
-    LOG.info("Rolling back by deleting the created base file:" + createdBaseFilePath);
-    return deleteBaseFile(createdBaseFilePath);
-  }
-
-  private HoodieRollbackStat deleteBaseFile(String baseFilePath) throws IOException {
-    Path fullDeletePath = new Path(basePath, baseFilePath);
-    String partitionPath = FSUtils.getRelativePartitionPath(new Path(basePath), fullDeletePath.getParent());
-    boolean isDeleted = table.getMetaClient().getFs().delete(fullDeletePath);
-    return HoodieRollbackStat.newBuilder()
-        .withPartitionPath(partitionPath)
-        .withDeletedFileResult(baseFilePath, isDeleted)
-        .build();
-  }
-
-  protected HoodieRollbackStat undoAppend(String appendBaseFilePath, HoodieInstant instantToRollback) throws IOException, InterruptedException {
+  protected HoodieRollbackRequest getRollbackRequestForAppend(String appendBaseFilePath) throws IOException {
     Path baseFilePathForAppend = new Path(basePath, appendBaseFilePath);
     String fileId = FSUtils.getFileIdFromFilePath(baseFilePathForAppend);
     String baseCommitTime = FSUtils.getCommitTime(baseFilePathForAppend.getName());
     String partitionPath = FSUtils.getRelativePartitionPath(new Path(basePath), new Path(basePath, appendBaseFilePath).getParent());
-    final Map<FileStatus, Long> writtenLogFileSizeMap = getWrittenLogFileSizeMap(partitionPath, baseCommitTime, fileId);
-
-    HoodieLogFormat.Writer writer = null;
-    try {
-      Path partitionFullPath = FSUtils.getPartitionPath(basePath, partitionPath);
-
-      if (!table.getMetaClient().getFs().exists(partitionFullPath)) {
-        return HoodieRollbackStat.newBuilder()
-            .withPartitionPath(partitionPath)
-            .build();
-      }
-      writer = HoodieLogFormat.newWriterBuilder()
-          .onParentPath(partitionFullPath)
-          .withFileId(fileId)
-          .overBaseCommit(baseCommitTime)
-          .withFs(table.getMetaClient().getFs())
-          .withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();
-
-      // generate metadata
-      Map<HoodieLogBlock.HeaderMetadataType, String> header = RollbackUtils.generateHeader(instantToRollback.getTimestamp(), instantTime);
-      // if update belongs to an existing log file
-      writer.appendBlock(new HoodieCommandBlock(header));
-    } finally {
-      try {
-        if (writer != null) {
-          writer.close();
-        }
-      } catch (IOException io) {
-        throw new HoodieIOException("Error closing append of rollback block..", io);
-      }
+    Map<FileStatus, Long> writtenLogFileSizeMap = getWrittenLogFileSizeMap(partitionPath, baseCommitTime, fileId);
+    Map<String, Long> writtenLogFileStrSizeMap = new HashMap<>();
+    for (Map.Entry<FileStatus, Long> entry : writtenLogFileSizeMap.entrySet()) {
+      writtenLogFileStrSizeMap.put(entry.getKey().getPath().toString(), entry.getValue());
     }
-
-    // the information of files appended to is required for metadata sync
-    Map<FileStatus, Long> filesToNumBlocksRollback = Collections.singletonMap(
-          table.getMetaClient().getFs().getFileStatus(Objects.requireNonNull(writer).getLogFile().getPath()),
-          1L);
-
-    return HoodieRollbackStat.newBuilder()
-        .withPartitionPath(partitionPath)
-        .withRollbackBlockAppendResults(filesToNumBlocksRollback)
-        .withWrittenLogFileSizeMap(writtenLogFileSizeMap).build();
+    return new HoodieRollbackRequest(partitionPath, fileId, baseCommitTime, Collections.emptyList(), writtenLogFileStrSizeMap);
   }
 
   /**
    * Returns written log file size map for the respective baseCommitTime to assist in metadata table syncing.
-   * @param partitionPath partition path of interest
+   *
+   * @param partitionPath  partition path of interest
    * @param baseCommitTime base commit time of interest
-   * @param fileId fileId of interest
+   * @param fileId         fileId of interest
    * @return Map<FileStatus, File size>
    * @throws IOException
    */
   protected Map<FileStatus, Long> getWrittenLogFileSizeMap(String partitionPath, String baseCommitTime, String fileId) throws IOException {
-    return Collections.EMPTY_MAP;
+    // collect all log files that is supposed to be deleted with this rollback
+    return FSUtils.getAllLogFiles(table.getMetaClient().getFs(),

Review comment:
       should this go through the metadata table for listing? would not doing so cause any issues? i.e., we may log the rollback someplace that's not reachable from the metadata table and thus fail to interpret the rollback command

##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/SparkCopyOnWriteListingBasedRollbackStrategy.java
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.rollback;
+
+import org.apache.hudi.avro.model.HoodieRollbackRequest;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+
+import java.util.List;
+
+public class SparkCopyOnWriteListingBasedRollbackStrategy extends ListingBasedRollbackStrategy {
+
+  public SparkCopyOnWriteListingBasedRollbackStrategy(HoodieEngineContext context,
+                                                      HoodieWriteConfig config,
+                                                      HoodieTable table,
+                                                      String instantTime) {
+    super(table, context, config, instantTime);
+  }
+
+  @Override
+  public List<HoodieRollbackRequest> getRollbackRequest(HoodieInstant instantToRollback) {
+    List<ListingBasedRollbackRequest> rollbackRequests = RollbackUtils.generateRollbackRequestsByListingCOW(context,
+        table.getMetaClient().getBasePath(), config);
+    List<HoodieRollbackRequest> listingBasedRollbackRequests = new SparkListingBasedRollbackHelper(table.getMetaClient(), config)
+        .getRollbackRequestsForRollbackPlan(context, instantToRollback, rollbackRequests);
+    return listingBasedRollbackRequests;
+  }
+}

Review comment:
       just have one class for COW and MOR, and branch internally using the table type?

##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java
##########
@@ -1,252 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.action.rollback;
-
-import org.apache.hudi.client.common.HoodieSparkEngineContext;
-import org.apache.hudi.common.HoodieRollbackStat;
-import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.model.HoodieFileFormat;
-import org.apache.hudi.common.model.HoodieLogFile;
-import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.log.HoodieLogFormat;
-import org.apache.hudi.common.table.log.HoodieLogFormat.Writer;
-import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
-import org.apache.hudi.common.table.log.block.HoodieCommandBlock.HoodieCommandBlockTypeEnum;
-import org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.exception.HoodieRollbackException;
-
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.PathFilter;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.stream.Collectors;
-
-import scala.Tuple2;
-
-/**
- * Performs Rollback of Hoodie Tables.
- */
-public class ListingBasedRollbackHelper implements Serializable {

Review comment:
       can a bunch of this live in client-common?

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java
##########
@@ -92,30 +88,74 @@ public BaseRollbackActionExecutor(HoodieEngineContext context,
     }
   }
 
-  protected abstract RollbackStrategy getRollbackStrategy();
+  /**
+   * Execute actual rollback and fetch list of RollbackStats.
+   * @param hoodieRollbackPlan instance of {@link HoodieRollbackPlan} that needs to be executed.
+   * @return a list of {@link HoodieRollbackStat}s.
+   * @throws IOException
+   */
+  protected abstract List<HoodieRollbackStat> executeRollback(HoodieRollbackPlan hoodieRollbackPlan) throws IOException;
+
+  private HoodieRollbackMetadata runRollback(HoodieTable<T, I, K, O> table, HoodieInstant rollbackInstant, HoodieRollbackPlan rollbackPlan) {
+    ValidationUtils.checkArgument(rollbackInstant.getState().equals(HoodieInstant.State.REQUESTED)
+        || rollbackInstant.getState().equals(HoodieInstant.State.INFLIGHT));
+    try {
+      final HoodieInstant inflightInstant;
+      final HoodieTimer timer = new HoodieTimer();
+      timer.startTimer();
+      if (rollbackInstant.isRequested()) {
+        inflightInstant = table.getActiveTimeline().transitionRollbackRequestedToInflight(rollbackInstant,
+            TimelineMetadataUtils.serializeRollbackPlan(rollbackPlan));
+      } else {
+        inflightInstant = rollbackInstant;
+      }
+
+      HoodieTimer rollbackTimer = new HoodieTimer().startTimer();
+      List<HoodieRollbackStat> stats = doRollbackAndGetStats(rollbackPlan);
+      HoodieRollbackMetadata rollbackMetadata = TimelineMetadataUtils.convertRollbackMetadata(
+          instantTime,
+          Option.of(rollbackTimer.endTimer()),
+          Collections.singletonList(instantToRollback),
+          stats);
+      if (!skipTimelinePublish) {
+        finishRollback(inflightInstant, rollbackMetadata);
+      }
 
-  protected abstract List<HoodieRollbackStat> executeRollback() throws IOException;
+      // Finally, remove the markers post rollback.
+      WriteMarkersFactory.get(config.getMarkersType(), table, instantToRollback.getTimestamp())
+          .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
 
-  protected abstract List<HoodieRollbackStat> executeRollbackUsingFileListing(HoodieInstant instantToRollback);
+      return rollbackMetadata;
+    } catch (IOException e) {
+      throw new HoodieIOException("Failed to rollback commit ", e);
+    }
+  }
 
   @Override
   public HoodieRollbackMetadata execute() {
-    HoodieTimer rollbackTimer = new HoodieTimer().startTimer();
-    List<HoodieRollbackStat> stats = doRollbackAndGetStats();
-    HoodieRollbackMetadata rollbackMetadata = TimelineMetadataUtils.convertRollbackMetadata(
-        instantTime,
-        Option.of(rollbackTimer.endTimer()),
-        Collections.singletonList(instantToRollback),
-        stats);
-    if (!skipTimelinePublish) {
-      finishRollback(rollbackMetadata);
+    table.getMetaClient().reloadActiveTimeline();
+    List<HoodieInstant> rollBackInstants = table.getRollbackTimeline()
+        .filterInflightsAndRequested().getInstants().collect(Collectors.toList());
+    if (rollBackInstants.isEmpty()) {
+      throw new HoodieRollbackException("No Requested Rollback Instants found to execute rollback ");
+    }
+    HoodieInstant rollbackInstant = null;
+    for (HoodieInstant instant : rollBackInstants) {
+      if (instantTime.equals(instant.getTimestamp())) {
+        rollbackInstant = instant;
+        break;
+      }
+    }
+    if (rollbackInstant != null) {
+      try {
+        HoodieRollbackPlan rollbackPlan = RollbackUtils.getRollbackPlan(table.getMetaClient(), rollbackInstant);
+        return runRollback(table, rollBackInstants.get(0), rollbackPlan);
+      } catch (IOException e) {
+        throw new HoodieIOException(e.getMessage(), e);

Review comment:
       add more context to the exception?

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseCopyOnWriteRollbackActionExecutor.java
##########
@@ -88,4 +89,12 @@ public BaseCopyOnWriteRollbackActionExecutor(HoodieEngineContext context,
     LOG.info("Time(in ms) taken to finish rollback " + rollbackTimer.endTimer());
     return stats;
   }
+
+  /**
+   * Execute rollback and fetch rollback stats.
+   * @param instantToRollback instant to be rolled back.
+   * @param rollbackPlan instance of {@link HoodieRollbackPlan} for which rollback needs to be executed.
+   * @return list of {@link HoodieRollbackStat}s.
+   */
+  abstract List<HoodieRollbackStat> rollbackAndGetStats(HoodieInstant instantToRollback, HoodieRollbackPlan rollbackPlan);

Review comment:
       executeRollback?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org