You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by si...@apache.org on 2021/10/07 00:20:59 UTC

[hudi] branch master updated: [HUDI-2513] Refactor table upgrade and downgrade actions in hudi-client module (#3743)

This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 2e15217  [HUDI-2513] Refactor table upgrade and downgrade actions in hudi-client module (#3743)
2e15217 is described below

commit 2e152177fb9a1138e46d21f4608fea41cc32af15
Author: Y Ethan Guo <et...@gmail.com>
AuthorDate: Wed Oct 6 17:20:41 2021 -0700

    [HUDI-2513] Refactor table upgrade and downgrade actions in hudi-client module (#3743)
---
 .../org/apache/hudi/cli/commands/SparkMain.java    |   6 +-
 .../table/upgrade/AbstractUpgradeDowngrade.java    | 164 -----------------
 .../table/upgrade/BaseUpgradeDowngradeHelper.java  |  42 +++++
 .../hudi/table/upgrade/DowngradeHandler.java       |  11 +-
 ...adeHandler.java => OneToTwoUpgradeHandler.java} |  13 +-
 ...Handler.java => OneToZeroDowngradeHandler.java} |  13 +-
 .../table/upgrade/ThreeToTwoDowngradeHandler.java  |  15 +-
 ...eHandler.java => TwoToOneDowngradeHandler.java} |  20 +--
 .../table/upgrade/TwoToThreeUpgradeHandler.java    |  15 +-
 .../hudi/table/upgrade/UpgradeDowngrade.java       | 198 +++++++++++++++++++++
 .../apache/hudi/table/upgrade/UpgradeHandler.java  |  11 +-
 ...deHandler.java => ZeroToOneUpgradeHandler.java} |  38 ++--
 .../hudi/client/FlinkTaskContextSupplier.java      |   2 +-
 .../apache/hudi/client/HoodieFlinkWriteClient.java |   9 +-
 .../hudi/table/upgrade/FlinkUpgradeDowngrade.java  |  71 --------
 .../table/upgrade/FlinkUpgradeDowngradeHelper.java |  53 ++++++
 .../hudi/table/upgrade/OneToTwoUpgradeHandler.java |  30 ----
 .../table/upgrade/OneToZeroDowngradeHandler.java   |  36 ----
 .../table/upgrade/ThreeToTwoDowngradeHandler.java  |  44 -----
 .../table/upgrade/TwoToOneDowngradeHandler.java    |  32 ----
 .../table/upgrade/TwoToThreeUpgradeHandler.java    |  43 -----
 .../table/upgrade/ZeroToOneUpgradeHandler.java     |  54 ------
 .../apache/hudi/client/SparkRDDWriteClient.java    |  14 +-
 .../hudi/table/upgrade/OneToTwoUpgradeHandler.java |  33 ----
 .../table/upgrade/OneToZeroDowngradeHandler.java   |  35 ----
 .../hudi/table/upgrade/SparkUpgradeDowngrade.java  |  75 --------
 .../table/upgrade/SparkUpgradeDowngradeHelper.java |  52 ++++++
 .../table/upgrade/TwoToOneDowngradeHandler.java    |  35 ----
 .../table/upgrade/ZeroToOneUpgradeHandler.java     |  53 ------
 .../functional/TestHoodieBackedMetadata.java       |   6 +-
 .../hudi/table/upgrade/TestUpgradeDowngrade.java   |  13 +-
 31 files changed, 456 insertions(+), 780 deletions(-)

diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
index f86937d..82688fe 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
@@ -38,7 +38,8 @@ import org.apache.hudi.exception.HoodieSavepointException;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.keygen.constant.KeyGeneratorType;
 import org.apache.hudi.table.action.compact.strategy.UnBoundedCompactionStrategy;
-import org.apache.hudi.table.upgrade.SparkUpgradeDowngrade;
+import org.apache.hudi.table.upgrade.SparkUpgradeDowngradeHelper;
+import org.apache.hudi.table.upgrade.UpgradeDowngrade;
 import org.apache.hudi.utilities.HDFSParquetImporter;
 import org.apache.hudi.utilities.HDFSParquetImporter.Config;
 import org.apache.hudi.utilities.HoodieCleaner;
@@ -453,7 +454,8 @@ public class SparkMain {
             .setLoadActiveTimelineOnLoad(false).setConsistencyGuardConfig(config.getConsistencyGuardConfig())
             .setLayoutVersion(Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion()))).build();
     try {
-      new SparkUpgradeDowngrade(metaClient, config, new HoodieSparkEngineContext(jsc)).run(metaClient, HoodieTableVersion.valueOf(toVersion), config, new HoodieSparkEngineContext(jsc), null);
+      new UpgradeDowngrade(metaClient, config, new HoodieSparkEngineContext(jsc), SparkUpgradeDowngradeHelper.getInstance())
+          .run(HoodieTableVersion.valueOf(toVersion), null);
       LOG.info(String.format("Table at \"%s\" upgraded / downgraded to version \"%s\".", basePath, toVersion));
       return 0;
     } catch (Exception e) {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/AbstractUpgradeDowngrade.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/AbstractUpgradeDowngrade.java
deleted file mode 100644
index 0a74689..0000000
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/AbstractUpgradeDowngrade.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.upgrade;
-
-import org.apache.hudi.common.config.ConfigProperty;
-import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.table.HoodieTableConfig;
-import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.HoodieTableVersion;
-import org.apache.hudi.common.util.FileIOUtils;
-import org.apache.hudi.config.HoodieWriteConfig;
-
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
-
-import java.io.IOException;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
-
-/**
- * Helper class to assist in upgrading/downgrading Hoodie when there is a version change.
- */
-public abstract class AbstractUpgradeDowngrade {
-
-  private static final Logger LOG = LogManager.getLogger(AbstractUpgradeDowngrade.class);
-  public static final String HOODIE_UPDATED_PROPERTY_FILE = "hoodie.properties.updated";
-
-  private HoodieTableMetaClient metaClient;
-  protected HoodieWriteConfig config;
-  protected HoodieEngineContext context;
-  private transient FileSystem fs;
-  private Path updatedPropsFilePath;
-  private Path propsFilePath;
-
-  /**
-   * Perform Upgrade or Downgrade steps if required and updated table version if need be.
-   * <p>
-   * Starting from version 0.6.0, this upgrade/downgrade step will be added in all write paths.
-   *
-   * Essentially, if a dataset was created using any pre 0.6.0(for eg 0.5.3), and Hoodie version was upgraded to 0.6.0,
-   * Hoodie table version gets bumped to 1 and there are some upgrade steps need to be executed before doing any writes.
-   * Similarly, if a dataset was created using Hoodie version 0.6.0 or Hoodie table version 1 and then hoodie was downgraded
-   * to pre 0.6.0 or to Hoodie table version 0, then some downgrade steps need to be executed before proceeding w/ any writes.
-   *
-   * On a high level, these are the steps performed
-   *
-   * Step1 : Understand current hoodie table version and table version from hoodie.properties file
-   * Step2 : Delete any left over .updated from previous upgrade/downgrade
-   * Step3 : If version are different, perform upgrade/downgrade.
-   * Step4 : Copy hoodie.properties -> hoodie.properties.updated with the version updated
-   * Step6 : Rename hoodie.properties.updated to hoodie.properties
-   * </p>
-   *
-   * @param metaClient instance of {@link HoodieTableMetaClient} to use
-   * @param toVersion version to which upgrade or downgrade has to be done.
-   * @param config instance of {@link HoodieWriteConfig} to use.
-   * @param context instance of {@link HoodieEngineContext} to use.
-   * @param instantTime current instant time that should not be touched.
-   */
-  public abstract void run(HoodieTableMetaClient metaClient, HoodieTableVersion toVersion, HoodieWriteConfig config,
-                         HoodieEngineContext context, String instantTime);
-
-  public boolean needsUpgradeOrDowngrade(HoodieTableVersion toVersion) {
-    HoodieTableVersion fromVersion = metaClient.getTableConfig().getTableVersion();
-    // Ensure no inflight commits & versions are same
-    return toVersion.versionCode() != fromVersion.versionCode();
-  }
-
-  protected AbstractUpgradeDowngrade(HoodieTableMetaClient metaClient, HoodieWriteConfig config, HoodieEngineContext context) {
-    this.metaClient = metaClient;
-    this.config = config;
-    this.context = context;
-    this.fs = metaClient.getFs();
-    this.updatedPropsFilePath = new Path(metaClient.getMetaPath(), HOODIE_UPDATED_PROPERTY_FILE);
-    this.propsFilePath = new Path(metaClient.getMetaPath(), HoodieTableConfig.HOODIE_PROPERTIES_FILE);
-  }
-
-  protected void run(HoodieTableVersion toVersion, String instantTime) throws IOException {
-    // Fetch version from property file and current version
-    HoodieTableVersion fromVersion = metaClient.getTableConfig().getTableVersion();
-    if (!needsUpgradeOrDowngrade(toVersion)) {
-      return;
-    }
-
-    if (fs.exists(updatedPropsFilePath)) {
-      // this can be left over .updated file from a failed attempt before. Many cases exist here.
-      // a) We failed while writing the .updated file and it's content is partial (e.g hdfs)
-      // b) We failed without renaming the file to hoodie.properties. We will re-attempt everything now anyway
-      // c) rename() is not atomic in cloud stores. so hoodie.properties is fine, but we failed before deleting the .updated file
-      // All cases, it simply suffices to delete the file and proceed.
-      LOG.info("Deleting existing .updated file with content :" + FileIOUtils.readAsUTFString(fs.open(updatedPropsFilePath)));
-      fs.delete(updatedPropsFilePath, false);
-    }
-
-    // Perform the actual upgrade/downgrade; this has to be idempotent, for now.
-    LOG.info("Attempting to move table from version " + fromVersion + " to " + toVersion);
-    Map<ConfigProperty, String> tableProps = new HashMap<>();
-    if (fromVersion.versionCode() < toVersion.versionCode()) {
-      // upgrade
-      while (fromVersion.versionCode() < toVersion.versionCode()) {
-        HoodieTableVersion nextVersion = HoodieTableVersion.versionFromCode(fromVersion.versionCode() + 1);
-        tableProps.putAll(upgrade(fromVersion, nextVersion, instantTime));
-        fromVersion = nextVersion;
-      }
-    } else {
-      // downgrade
-      while (fromVersion.versionCode() > toVersion.versionCode()) {
-        HoodieTableVersion prevVersion = HoodieTableVersion.versionFromCode(fromVersion.versionCode() - 1);
-        tableProps.putAll(downgrade(fromVersion, prevVersion, instantTime));
-        fromVersion = prevVersion;
-      }
-    }
-
-    // Write out the current version in hoodie.properties.updated file
-    for (Map.Entry<ConfigProperty, String> entry: tableProps.entrySet()) {
-      metaClient.getTableConfig().setValue(entry.getKey(), entry.getValue());
-    }
-    metaClient.getTableConfig().setTableVersion(toVersion);
-    createUpdatedFile(metaClient.getTableConfig().getProps());
-
-    // because for different fs the fs.rename have different action,such as:
-    // a) for hdfs : if propsFilePath already exist,fs.rename will not replace propsFilePath, but just return false
-    // b) for localfs: if propsFilePath already exist,fs.rename will replace propsFilePath, and return ture
-    // c) for aliyun ossfs: if propsFilePath already exist,will throw FileAlreadyExistsException
-    // so we should delete the old propsFilePath. also upgrade and downgrade is Idempotent
-    if (fs.exists(propsFilePath)) {
-      fs.delete(propsFilePath, false);
-    }
-    // Rename the .updated file to hoodie.properties. This is atomic in hdfs, but not in cloud stores.
-    // But as long as this does not leave a partial hoodie.properties file, we are okay.
-    fs.rename(updatedPropsFilePath, propsFilePath);
-  }
-
-  private void createUpdatedFile(Properties props) throws IOException {
-    try (FSDataOutputStream outputStream = fs.create(updatedPropsFilePath)) {
-      props.store(outputStream, "Properties saved on " + new Date(System.currentTimeMillis()));
-    }
-  }
-
-  protected abstract Map<ConfigProperty, String> upgrade(HoodieTableVersion fromVersion, HoodieTableVersion toVersion, String instantTime);
-
-  protected abstract Map<ConfigProperty, String> downgrade(HoodieTableVersion fromVersion, HoodieTableVersion toVersion, String instantTime);
-}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/BaseUpgradeDowngradeHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/BaseUpgradeDowngradeHelper.java
new file mode 100644
index 0000000..d3f157b
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/BaseUpgradeDowngradeHelper.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table.upgrade;
+
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+
+/**
+ * Interface for engine-specific logic needed for upgrade and downgrade actions.
+ */
+public interface BaseUpgradeDowngradeHelper {
+  /**
+   * @param config  Write config.
+   * @param context {@link HoodieEngineContext} instance to use.
+   * @return A new Hudi table for upgrade and downgrade actions.
+   */
+  HoodieTable getTable(HoodieWriteConfig config, HoodieEngineContext context);
+
+  /**
+   * @param config Write config.
+   * @return partition columns in String.
+   */
+  String getPartitionColumns(HoodieWriteConfig config);
+}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/DowngradeHandler.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/DowngradeHandler.java
index 7501ed5..24b9d6f 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/DowngradeHandler.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/DowngradeHandler.java
@@ -32,10 +32,13 @@ public interface DowngradeHandler {
   /**
    * to be invoked to downgrade hoodie table from one version to a lower version.
    *
-   * @param config instance of {@link HoodieWriteConfig} to be used.
-   * @param context instance of {@link HoodieEngineContext} to be used.
-   * @param instantTime current instant time that should not touched.
+   * @param config                 instance of {@link HoodieWriteConfig} to be used.
+   * @param context                instance of {@link HoodieEngineContext} to be used.
+   * @param instantTime            current instant time that should not touched.
+   * @param upgradeDowngradeHelper instance of {@link BaseUpgradeDowngradeHelper} to be used.
    * @return Map of config properties and its values to be added to table properties.
    */
-  Map<ConfigProperty, String> downgrade(HoodieWriteConfig config, HoodieEngineContext context, String instantTime);
+  Map<ConfigProperty, String> downgrade(
+      HoodieWriteConfig config, HoodieEngineContext context, String instantTime,
+      BaseUpgradeDowngradeHelper upgradeDowngradeHelper);
 }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/BaseOneToTwoUpgradeHandler.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/OneToTwoUpgradeHandler.java
similarity index 77%
rename from hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/BaseOneToTwoUpgradeHandler.java
rename to hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/OneToTwoUpgradeHandler.java
index e3a14e4..dddd5f4 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/BaseOneToTwoUpgradeHandler.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/OneToTwoUpgradeHandler.java
@@ -27,16 +27,19 @@ import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
 import java.util.HashMap;
 import java.util.Map;
 
-public abstract class BaseOneToTwoUpgradeHandler implements UpgradeHandler {
+/**
+ * Upgrade handle to assist in upgrading hoodie table from version 1 to 2.
+ */
+public class OneToTwoUpgradeHandler implements UpgradeHandler {
 
   @Override
-  public Map<ConfigProperty, String> upgrade(HoodieWriteConfig config, HoodieEngineContext context, String instantTime) {
+  public Map<ConfigProperty, String> upgrade(
+      HoodieWriteConfig config, HoodieEngineContext context, String instantTime,
+      BaseUpgradeDowngradeHelper upgradeDowngradeHelper) {
     Map<ConfigProperty, String> tablePropsToAdd = new HashMap<>();
-    tablePropsToAdd.put(HoodieTableConfig.PARTITION_FIELDS, getPartitionColumns(config));
+    tablePropsToAdd.put(HoodieTableConfig.PARTITION_FIELDS, upgradeDowngradeHelper.getPartitionColumns(config));
     tablePropsToAdd.put(HoodieTableConfig.RECORDKEY_FIELDS, config.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()));
     tablePropsToAdd.put(HoodieTableConfig.BASE_FILE_FORMAT, config.getString(HoodieTableConfig.BASE_FILE_FORMAT));
     return tablePropsToAdd;
   }
-
-  abstract String getPartitionColumns(HoodieWriteConfig config);
 }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/BaseOneToZeroDowngradeHandler.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/OneToZeroDowngradeHandler.java
similarity index 82%
rename from hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/BaseOneToZeroDowngradeHandler.java
rename to hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/OneToZeroDowngradeHandler.java
index 5997e18..e6051cf 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/BaseOneToZeroDowngradeHandler.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/OneToZeroDowngradeHandler.java
@@ -32,12 +32,17 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
-public abstract class BaseOneToZeroDowngradeHandler implements DowngradeHandler {
+/**
+ * Downgrade handle to assist in downgrading hoodie table from version 1 to 0.
+ */
+public class OneToZeroDowngradeHandler implements DowngradeHandler {
 
   @Override
-  public Map<ConfigProperty, String> downgrade(HoodieWriteConfig config, HoodieEngineContext context, String instantTime) {
+  public Map<ConfigProperty, String> downgrade(
+      HoodieWriteConfig config, HoodieEngineContext context, String instantTime,
+      BaseUpgradeDowngradeHelper upgradeDowngradeHelper) {
+    HoodieTable table = upgradeDowngradeHelper.getTable(config, context);
     // fetch pending commit info
-    HoodieTable table = getTable(config, context);
     HoodieTimeline inflightTimeline = table.getMetaClient().getCommitsTimeline().filterPendingExcludingCompaction();
     List<HoodieInstant> commits = inflightTimeline.getReverseOrderedInstants().collect(Collectors.toList());
     for (HoodieInstant inflightInstant : commits) {
@@ -47,6 +52,4 @@ public abstract class BaseOneToZeroDowngradeHandler implements DowngradeHandler
     }
     return Collections.EMPTY_MAP;
   }
-
-  abstract HoodieTable getTable(HoodieWriteConfig config, HoodieEngineContext context);
 }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/upgrade/ThreeToTwoDowngradeHandler.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/ThreeToTwoDowngradeHandler.java
similarity index 75%
rename from hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/upgrade/ThreeToTwoDowngradeHandler.java
rename to hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/ThreeToTwoDowngradeHandler.java
index 9211144..964859c 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/upgrade/ThreeToTwoDowngradeHandler.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/ThreeToTwoDowngradeHandler.java
@@ -7,13 +7,14 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *   http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
 
 package org.apache.hudi.table.upgrade;
@@ -32,7 +33,7 @@ import java.util.Map;
 public class ThreeToTwoDowngradeHandler implements DowngradeHandler {
 
   @Override
-  public Map<ConfigProperty, String> downgrade(HoodieWriteConfig config, HoodieEngineContext context, String instantTime) {
+  public Map<ConfigProperty, String> downgrade(HoodieWriteConfig config, HoodieEngineContext context, String instantTime, BaseUpgradeDowngradeHelper upgradeDowngradeHelper) {
     if (config.isMetadataTableEnabled()) {
       // Metadata Table in version 3 is synchronous and in version 2 is asynchronous. Downgrading to asynchronous
       // removes the checks in code to decide whether to use a LogBlock or not. Also, the schema for the
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/BaseTwoToOneDowngradeHandler.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/TwoToOneDowngradeHandler.java
similarity index 90%
rename from hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/BaseTwoToOneDowngradeHandler.java
rename to hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/TwoToOneDowngradeHandler.java
index 621711a..ee638a1 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/BaseTwoToOneDowngradeHandler.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/TwoToOneDowngradeHandler.java
@@ -46,12 +46,16 @@ import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
 import static org.apache.hudi.common.util.MarkerUtils.MARKERS_FILENAME_PREFIX;
-
-public abstract class BaseTwoToOneDowngradeHandler implements DowngradeHandler {
+/**
+ * Downgrade handler to assist in downgrading hoodie table from version 2 to 1.
+ */
+public class TwoToOneDowngradeHandler implements DowngradeHandler {
 
   @Override
-  public Map<ConfigProperty, String> downgrade(HoodieWriteConfig config, HoodieEngineContext context, String instantTime) {
-    HoodieTable table = getTable(config, context);
+  public Map<ConfigProperty, String> downgrade(
+      HoodieWriteConfig config, HoodieEngineContext context, String instantTime,
+      BaseUpgradeDowngradeHelper upgradeDowngradeHelper) {
+    HoodieTable table = upgradeDowngradeHelper.getTable(config, context);
     HoodieTableMetaClient metaClient = table.getMetaClient();
 
     // re-create marker files if any partial timeline server based markers are found
@@ -69,8 +73,6 @@ public abstract class BaseTwoToOneDowngradeHandler implements DowngradeHandler {
     return Collections.EMPTY_MAP;
   }
 
-  abstract HoodieTable getTable(HoodieWriteConfig config, HoodieEngineContext context);
-
   /**
    * Converts the markers in new format(timeline server based) to old format of direct markers,
    * i.e., one marker file per data file, without MARKERS.type file.
@@ -106,8 +108,7 @@ public abstract class BaseTwoToOneDowngradeHandler implements DowngradeHandler {
           // Deletes marker type file
           MarkerUtils.deleteMarkerTypeFile(fileSystem, markerDir);
           // Deletes timeline server based markers
-          deleteTimelineBasedMarkerFiles(
-              context, markerDir, fileSystem, table.getConfig().getMarkersDeleteParallelism());
+          deleteTimelineBasedMarkerFiles(context, markerDir, fileSystem, parallelism);
           break;
         default:
           throw new HoodieException("The marker type \"" + markerTypeOption.get().name()
@@ -116,8 +117,7 @@ public abstract class BaseTwoToOneDowngradeHandler implements DowngradeHandler {
     } else {
       // In case of partial failures during downgrade, there is a chance that marker type file was deleted,
       // but timeline server based marker files are left.  So deletes them if any
-      deleteTimelineBasedMarkerFiles(
-          context, markerDir, fileSystem, table.getConfig().getMarkersDeleteParallelism());
+      deleteTimelineBasedMarkerFiles(context, markerDir, fileSystem, parallelism);
     }
   }
 
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/upgrade/TwoToThreeUpgradeHandler.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/TwoToThreeUpgradeHandler.java
similarity index 75%
rename from hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/upgrade/TwoToThreeUpgradeHandler.java
rename to hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/TwoToThreeUpgradeHandler.java
index 278e413..6a825e1 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/upgrade/TwoToThreeUpgradeHandler.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/TwoToThreeUpgradeHandler.java
@@ -7,13 +7,14 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *   http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
 
 package org.apache.hudi.table.upgrade;
@@ -31,7 +32,7 @@ import java.util.Map;
  */
 public class TwoToThreeUpgradeHandler implements UpgradeHandler {
   @Override
-  public Map<ConfigProperty, String> upgrade(HoodieWriteConfig config, HoodieEngineContext context, String instantTime) {
+  public Map<ConfigProperty, String> upgrade(HoodieWriteConfig config, HoodieEngineContext context, String instantTime, BaseUpgradeDowngradeHelper upgradeDowngradeHelper) {
     if (config.isMetadataTableEnabled()) {
       // Metadata Table in version 2 is asynchronous and in version 3 is synchronous. Synchronous table will not
       // sync any instants not already synced. So its simpler to re-bootstrap the table. Also, the schema for the
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngrade.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngrade.java
new file mode 100644
index 0000000..c5ae043
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngrade.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.upgrade;
+
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.HoodieTableVersion;
+import org.apache.hudi.common.util.FileIOUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieUpgradeDowngradeException;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Helper class to assist in upgrading/downgrading Hoodie when there is a version change.
+ */
+public class UpgradeDowngrade {
+
+  private static final Logger LOG = LogManager.getLogger(UpgradeDowngrade.class);
+  public static final String HOODIE_UPDATED_PROPERTY_FILE = "hoodie.properties.updated";
+
+  private final BaseUpgradeDowngradeHelper upgradeDowngradeHelper;
+  private HoodieTableMetaClient metaClient;
+  protected HoodieWriteConfig config;
+  protected HoodieEngineContext context;
+  private transient FileSystem fs;
+  private Path updatedPropsFilePath;
+  private Path propsFilePath;
+
+  public UpgradeDowngrade(
+      HoodieTableMetaClient metaClient, HoodieWriteConfig config, HoodieEngineContext context,
+      BaseUpgradeDowngradeHelper upgradeDowngradeHelper) {
+    this.metaClient = metaClient;
+    this.config = config;
+    this.context = context;
+    this.fs = metaClient.getFs();
+    this.updatedPropsFilePath = new Path(metaClient.getMetaPath(), HOODIE_UPDATED_PROPERTY_FILE);
+    this.propsFilePath = new Path(metaClient.getMetaPath(), HoodieTableConfig.HOODIE_PROPERTIES_FILE);
+    this.upgradeDowngradeHelper = upgradeDowngradeHelper;
+  }
+
+  public boolean needsUpgradeOrDowngrade(HoodieTableVersion toVersion) {
+    HoodieTableVersion fromVersion = metaClient.getTableConfig().getTableVersion();
+    // Ensure versions are same
+    return toVersion.versionCode() != fromVersion.versionCode();
+  }
+
+  /**
+   * Perform Upgrade or Downgrade steps if required and updated table version if need be.
+   * <p>
+   * Starting from version 0.6.0, this upgrade/downgrade step will be added in all write paths.
+   * <p>
+   * Essentially, if a dataset was created using an previous table version in an older release,
+   * and Hoodie version was upgraded to a new release with new table version supported,
+   * Hoodie table version gets bumped to the new version and there are some upgrade steps need
+   * to be executed before doing any writes.
+   * <p>
+   * Similarly, if a dataset was created using an newer table version in an newer release,
+   * and then hoodie was downgraded to an older release or to older Hoodie table version,
+   * then some downgrade steps need to be executed before proceeding w/ any writes.
+   * <p>
+   * Below shows the table version corresponding to the Hudi release:
+   * Hudi release -> table version
+   * pre 0.6.0 -> v0
+   * 0.6.0 to 0.8.0 -> v1
+   * 0.9.0 -> v2
+   * 0.10.0 to current -> v3
+   * <p>
+   * On a high level, these are the steps performed
+   * <p>
+   * Step1 : Understand current hoodie table version and table version from hoodie.properties file
+   * Step2 : Delete any left over .updated from previous upgrade/downgrade
+   * Step3 : If version are different, perform upgrade/downgrade.
+   * Step4 : Copy hoodie.properties -> hoodie.properties.updated with the version updated
+   * Step6 : Rename hoodie.properties.updated to hoodie.properties
+   * </p>
+   *
+   * @param toVersion   version to which upgrade or downgrade has to be done.
+   * @param instantTime current instant time that should not be touched.
+   */
+  public void run(HoodieTableVersion toVersion, String instantTime) {
+    try {
+      // Fetch version from property file and current version
+      HoodieTableVersion fromVersion = metaClient.getTableConfig().getTableVersion();
+      if (!needsUpgradeOrDowngrade(toVersion)) {
+        return;
+      }
+
+      if (fs.exists(updatedPropsFilePath)) {
+        // this can be left over .updated file from a failed attempt before. Many cases exist here.
+        // a) We failed while writing the .updated file and it's content is partial (e.g hdfs)
+        // b) We failed without renaming the file to hoodie.properties. We will re-attempt everything now anyway
+        // c) rename() is not atomic in cloud stores. so hoodie.properties is fine, but we failed before deleting the .updated file
+        // All cases, it simply suffices to delete the file and proceed.
+        LOG.info("Deleting existing .updated file with content :" + FileIOUtils.readAsUTFString(fs.open(updatedPropsFilePath)));
+        fs.delete(updatedPropsFilePath, false);
+      }
+
+      // Perform the actual upgrade/downgrade; this has to be idempotent, for now.
+      LOG.info("Attempting to move table from version " + fromVersion + " to " + toVersion);
+      Map<ConfigProperty, String> tableProps = new HashMap<>();
+      if (fromVersion.versionCode() < toVersion.versionCode()) {
+        // upgrade
+        while (fromVersion.versionCode() < toVersion.versionCode()) {
+          HoodieTableVersion nextVersion = HoodieTableVersion.versionFromCode(fromVersion.versionCode() + 1);
+          tableProps.putAll(upgrade(fromVersion, nextVersion, instantTime));
+          fromVersion = nextVersion;
+        }
+      } else {
+        // downgrade
+        while (fromVersion.versionCode() > toVersion.versionCode()) {
+          HoodieTableVersion prevVersion = HoodieTableVersion.versionFromCode(fromVersion.versionCode() - 1);
+          tableProps.putAll(downgrade(fromVersion, prevVersion, instantTime));
+          fromVersion = prevVersion;
+        }
+      }
+
+      // Write out the current version in hoodie.properties.updated file
+      for (Map.Entry<ConfigProperty, String> entry : tableProps.entrySet()) {
+        metaClient.getTableConfig().setValue(entry.getKey(), entry.getValue());
+      }
+      metaClient.getTableConfig().setTableVersion(toVersion);
+      createUpdatedFile(metaClient.getTableConfig().getProps());
+
+      // because for different fs the fs.rename have different action,such as:
+      // a) for hdfs : if propsFilePath already exist,fs.rename will not replace propsFilePath, but just return false
+      // b) for localfs: if propsFilePath already exist,fs.rename will replace propsFilePath, and return ture
+      // c) for aliyun ossfs: if propsFilePath already exist,will throw FileAlreadyExistsException
+      // so we should delete the old propsFilePath. also upgrade and downgrade is Idempotent
+      if (fs.exists(propsFilePath)) {
+        fs.delete(propsFilePath, false);
+      }
+      // Rename the .updated file to hoodie.properties. This is atomic in hdfs, but not in cloud stores.
+      // But as long as this does not leave a partial hoodie.properties file, we are okay.
+      fs.rename(updatedPropsFilePath, propsFilePath);
+    } catch (IOException e) {
+      throw new HoodieUpgradeDowngradeException("Error during upgrade/downgrade to version:" + toVersion, e);
+    }
+  }
+
+  private void createUpdatedFile(Properties props) throws IOException {
+    try (FSDataOutputStream outputStream = fs.create(updatedPropsFilePath)) {
+      props.store(outputStream, "Properties saved on " + new Date(System.currentTimeMillis()));
+    }
+  }
+
+  protected Map<ConfigProperty, String> upgrade(HoodieTableVersion fromVersion, HoodieTableVersion toVersion, String instantTime) {
+    if (fromVersion == HoodieTableVersion.ZERO && toVersion == HoodieTableVersion.ONE) {
+      return new ZeroToOneUpgradeHandler().upgrade(config, context, instantTime, upgradeDowngradeHelper);
+    } else if (fromVersion == HoodieTableVersion.ONE && toVersion == HoodieTableVersion.TWO) {
+      return new OneToTwoUpgradeHandler().upgrade(config, context, instantTime, upgradeDowngradeHelper);
+    } else if (fromVersion == HoodieTableVersion.TWO && toVersion == HoodieTableVersion.THREE) {
+      return new TwoToThreeUpgradeHandler().upgrade(config, context, instantTime, upgradeDowngradeHelper);
+    } else {
+      throw new HoodieUpgradeDowngradeException(fromVersion.versionCode(), toVersion.versionCode(), true);
+    }
+  }
+
+  protected Map<ConfigProperty, String> downgrade(HoodieTableVersion fromVersion, HoodieTableVersion toVersion, String instantTime) {
+    if (fromVersion == HoodieTableVersion.ONE && toVersion == HoodieTableVersion.ZERO) {
+      return new OneToZeroDowngradeHandler().downgrade(config, context, instantTime, upgradeDowngradeHelper);
+    } else if (fromVersion == HoodieTableVersion.TWO && toVersion == HoodieTableVersion.ONE) {
+      return new TwoToOneDowngradeHandler().downgrade(config, context, instantTime, upgradeDowngradeHelper);
+    } else if (fromVersion == HoodieTableVersion.THREE && toVersion == HoodieTableVersion.TWO) {
+      return new ThreeToTwoDowngradeHandler().downgrade(config, context, instantTime, upgradeDowngradeHelper);
+    } else {
+      throw new HoodieUpgradeDowngradeException(fromVersion.versionCode(), toVersion.versionCode(), false);
+    }
+  }
+}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeHandler.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeHandler.java
index 8ca6f0e..9dc477f 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeHandler.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeHandler.java
@@ -32,10 +32,13 @@ public interface UpgradeHandler {
   /**
    * to be invoked to upgrade hoodie table from one version to a higher version.
    *
-   * @param config instance of {@link HoodieWriteConfig} to be used.
-   * @param context instance of {@link HoodieEngineContext} to be used.
-   * @param instantTime current instant time that should not be touched.
+   * @param config                 instance of {@link HoodieWriteConfig} to be used.
+   * @param context                instance of {@link HoodieEngineContext} to be used.
+   * @param instantTime            current instant time that should not be touched.
+   * @param upgradeDowngradeHelper instance of {@link BaseUpgradeDowngradeHelper} to be used.
    * @return Map of config properties and its values to be added to table properties.
    */
-  Map<ConfigProperty, String> upgrade(HoodieWriteConfig config, HoodieEngineContext context, String instantTime);
+  Map<ConfigProperty, String> upgrade(
+      HoodieWriteConfig config, HoodieEngineContext context, String instantTime,
+      BaseUpgradeDowngradeHelper upgradeDowngradeHelper);
 }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/BaseZeroToOneUpgradeHandler.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java
similarity index 79%
rename from hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/BaseZeroToOneUpgradeHandler.java
rename to hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java
index f0e3e4f..18815b2 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/BaseZeroToOneUpgradeHandler.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.table.upgrade;
 
+import org.apache.hudi.avro.model.HoodieRollbackRequest;
 import org.apache.hudi.common.HoodieRollbackStat;
 import org.apache.hudi.common.config.ConfigProperty;
 import org.apache.hudi.common.engine.HoodieEngineContext;
@@ -33,6 +34,8 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieRollbackException;
 import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.rollback.BaseRollbackHelper;
+import org.apache.hudi.table.action.rollback.ListingBasedRollbackHelper;
 import org.apache.hudi.table.action.rollback.ListingBasedRollbackRequest;
 import org.apache.hudi.table.action.rollback.RollbackUtils;
 import org.apache.hudi.table.marker.WriteMarkers;
@@ -46,13 +49,17 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
-public abstract class BaseZeroToOneUpgradeHandler implements UpgradeHandler {
+/**
+ * Upgrade handle to assist in upgrading hoodie table from version 0 to 1.
+ */
+public class ZeroToOneUpgradeHandler implements UpgradeHandler {
 
   @Override
-  public Map<ConfigProperty, String> upgrade(HoodieWriteConfig config, HoodieEngineContext context, String instantTime) {
+  public Map<ConfigProperty, String> upgrade(
+      HoodieWriteConfig config, HoodieEngineContext context, String instantTime,
+      BaseUpgradeDowngradeHelper upgradeDowngradeHelper) {
     // fetch pending commit info
-    //HoodieSparkTable table = HoodieSparkTable.create(config, context);
-    HoodieTable table = getTable(config, context);
+    HoodieTable table = upgradeDowngradeHelper.getTable(config, context);
     HoodieTimeline inflightTimeline = table.getMetaClient().getCommitsTimeline().filterPendingExcludingCompaction();
     List<String> commits = inflightTimeline.getReverseOrderedInstants().map(HoodieInstant::getTimestamp)
         .collect(Collectors.toList());
@@ -67,8 +74,6 @@ public abstract class BaseZeroToOneUpgradeHandler implements UpgradeHandler {
     return Collections.EMPTY_MAP;
   }
 
-  abstract HoodieTable getTable(HoodieWriteConfig config, HoodieEngineContext context);
-
   /**
    * Recreate markers in new format.
    * Step1: Delete existing markers
@@ -76,14 +81,14 @@ public abstract class BaseZeroToOneUpgradeHandler implements UpgradeHandler {
    * Step3: recreate markers for all interested files.
    *
    * @param commitInstantTime instant of interest for which markers need to be recreated.
-   * @param table instance of {@link HoodieTable} to use
-   * @param context instance of {@link HoodieEngineContext} to use
+   * @param table             instance of {@link HoodieTable} to use
+   * @param context           instance of {@link HoodieEngineContext} to use
    * @throws HoodieRollbackException on any exception during upgrade.
    */
   protected void recreateMarkers(final String commitInstantTime,
-                                      HoodieTable table,
-                                      HoodieEngineContext context,
-                                      int parallelism) throws HoodieRollbackException {
+                                 HoodieTable table,
+                                 HoodieEngineContext context,
+                                 int parallelism) throws HoodieRollbackException {
     try {
       // fetch hoodie instant
       Option<HoodieInstant> commitInstantOpt = Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstants()
@@ -121,9 +126,13 @@ public abstract class BaseZeroToOneUpgradeHandler implements UpgradeHandler {
     }
   }
 
-  abstract List<HoodieRollbackStat> getListBasedRollBackStats(HoodieTableMetaClient metaClient, HoodieWriteConfig config,
-                                                                            HoodieEngineContext context, Option<HoodieInstant> commitInstantOpt,
-                                                                            List<ListingBasedRollbackRequest> rollbackRequests);
+  List<HoodieRollbackStat> getListBasedRollBackStats(
+      HoodieTableMetaClient metaClient, HoodieWriteConfig config, HoodieEngineContext context,
+      Option<HoodieInstant> commitInstantOpt, List<ListingBasedRollbackRequest> rollbackRequests) {
+    List<HoodieRollbackRequest> hoodieRollbackRequests = new ListingBasedRollbackHelper(metaClient, config)
+        .getRollbackRequestsForRollbackPlan(context, commitInstantOpt.get(), rollbackRequests);
+    return new BaseRollbackHelper(metaClient, config).collectRollbackStats(context, commitInstantOpt.get(), hoodieRollbackRequests);
+  }
 
   /**
    * Curates file name for marker from existing log file path.
@@ -131,6 +140,7 @@ public abstract class BaseZeroToOneUpgradeHandler implements UpgradeHandler {
    * marker file format  : partitionpath/fileId_writetoken_baseinstant.basefileExtn.marker.APPEND
    *
    * @param logFilePath log file path for which marker file name needs to be generated.
+   * @param table       {@link HoodieTable} instance to use
    * @return the marker file name thus curated.
    */
   private static String getFileNameForMarkerFromLogFile(String logFilePath, HoodieTable table) {
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/FlinkTaskContextSupplier.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/FlinkTaskContextSupplier.java
index aab248f..10c5ac5 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/FlinkTaskContextSupplier.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/FlinkTaskContextSupplier.java
@@ -61,5 +61,5 @@ public class FlinkTaskContextSupplier extends TaskContextSupplier {
     // no operation for now
     return Option.empty();
   }
-
+  
 }
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index e279940..669be16 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -58,7 +58,8 @@ import org.apache.hudi.table.HoodieTimelineArchiveLog;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
 import org.apache.hudi.table.action.compact.FlinkCompactHelpers;
 import org.apache.hudi.table.marker.WriteMarkersFactory;
-import org.apache.hudi.table.upgrade.FlinkUpgradeDowngrade;
+import org.apache.hudi.table.upgrade.FlinkUpgradeDowngradeHelper;
+import org.apache.hudi.table.upgrade.UpgradeDowngrade;
 import org.apache.hudi.util.FlinkClientUtil;
 
 import com.codahale.metrics.Timer;
@@ -383,7 +384,8 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
   @Override
   protected HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> getTableAndInitCtx(WriteOperationType operationType, String instantTime) {
     HoodieTableMetaClient metaClient = createMetaClient(true);
-    new FlinkUpgradeDowngrade(metaClient, config, context).run(metaClient, HoodieTableVersion.current(), config, context, instantTime);
+    new UpgradeDowngrade(metaClient, config, context, FlinkUpgradeDowngradeHelper.getInstance())
+        .run(HoodieTableVersion.current(), instantTime);
     return getTableAndInitCtx(metaClient, operationType);
   }
 
@@ -395,7 +397,8 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
    */
   public void upgradeDowngrade(String instantTime) {
     HoodieTableMetaClient metaClient = createMetaClient(true);
-    new FlinkUpgradeDowngrade(metaClient, config, context).run(metaClient, HoodieTableVersion.current(), config, context, instantTime);
+    new UpgradeDowngrade(metaClient, config, context, FlinkUpgradeDowngradeHelper.getInstance())
+        .run(HoodieTableVersion.current(), instantTime);
   }
 
   /**
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/FlinkUpgradeDowngrade.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/FlinkUpgradeDowngrade.java
deleted file mode 100644
index 832db1d..0000000
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/FlinkUpgradeDowngrade.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.upgrade;
-
-import org.apache.hudi.common.config.ConfigProperty;
-import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.HoodieTableVersion;
-import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.HoodieUpgradeDowngradeException;
-
-import java.io.IOException;
-import java.util.Map;
-
-public class FlinkUpgradeDowngrade extends AbstractUpgradeDowngrade {
-  public FlinkUpgradeDowngrade(HoodieTableMetaClient metaClient, HoodieWriteConfig config, HoodieEngineContext context) {
-    super(metaClient, config, context);
-  }
-
-  @Override
-  public void run(HoodieTableMetaClient metaClient, HoodieTableVersion toVersion, HoodieWriteConfig config,
-                  HoodieEngineContext context, String instantTime) {
-    try {
-      new FlinkUpgradeDowngrade(metaClient, config, context).run(toVersion, instantTime);
-    } catch (IOException e) {
-      throw new HoodieUpgradeDowngradeException("Error during upgrade/downgrade to version:" + toVersion, e);
-    }
-  }
-
-  @Override
-  protected Map<ConfigProperty, String> upgrade(HoodieTableVersion fromVersion, HoodieTableVersion toVersion, String instantTime) {
-    if (fromVersion == HoodieTableVersion.ZERO && toVersion == HoodieTableVersion.ONE) {
-      return new ZeroToOneUpgradeHandler().upgrade(config, context, instantTime);
-    } else if (fromVersion == HoodieTableVersion.ONE && toVersion == HoodieTableVersion.TWO) {
-      return new OneToTwoUpgradeHandler().upgrade(config, context, instantTime);
-    }  else if (fromVersion == HoodieTableVersion.TWO && toVersion == HoodieTableVersion.THREE) {
-      return new TwoToThreeUpgradeHandler().upgrade(config, context, instantTime);
-    }  else {
-      throw new HoodieUpgradeDowngradeException(fromVersion.versionCode(), toVersion.versionCode(), true);
-    }
-  }
-
-  @Override
-  protected Map<ConfigProperty, String> downgrade(HoodieTableVersion fromVersion, HoodieTableVersion toVersion, String instantTime) {
-    if (fromVersion == HoodieTableVersion.ONE && toVersion == HoodieTableVersion.ZERO) {
-      return new OneToZeroDowngradeHandler().downgrade(config, context, instantTime);
-    } else if (fromVersion == HoodieTableVersion.TWO && toVersion == HoodieTableVersion.ONE) {
-      return new TwoToOneDowngradeHandler().downgrade(config, context, instantTime);
-    } else if (fromVersion == HoodieTableVersion.THREE && toVersion == HoodieTableVersion.TWO) {
-      return new ThreeToTwoDowngradeHandler().downgrade(config, context, instantTime);
-    } else {
-      throw new HoodieUpgradeDowngradeException(fromVersion.versionCode(), toVersion.versionCode(), false);
-    }
-  }
-}
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/FlinkUpgradeDowngradeHelper.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/FlinkUpgradeDowngradeHelper.java
new file mode 100644
index 0000000..d097d2e
--- /dev/null
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/FlinkUpgradeDowngradeHelper.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table.upgrade;
+
+import org.apache.hudi.client.common.HoodieFlinkEngineContext;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
+import org.apache.hudi.table.HoodieFlinkTable;
+import org.apache.hudi.table.HoodieTable;
+
+/**
+ * Flink upgrade and downgrade helper.
+ */
+public class FlinkUpgradeDowngradeHelper implements BaseUpgradeDowngradeHelper {
+
+  private static final FlinkUpgradeDowngradeHelper SINGLETON_INSTANCE =
+      new FlinkUpgradeDowngradeHelper();
+
+  private FlinkUpgradeDowngradeHelper() {
+  }
+
+  public static FlinkUpgradeDowngradeHelper getInstance() {
+    return SINGLETON_INSTANCE;
+  }
+
+  @Override
+  public HoodieTable getTable(HoodieWriteConfig config, HoodieEngineContext context) {
+    return HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context);
+  }
+
+  @Override
+  public String getPartitionColumns(HoodieWriteConfig config) {
+    return config.getProps().getProperty(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key());
+  }
+}
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/OneToTwoUpgradeHandler.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/OneToTwoUpgradeHandler.java
deleted file mode 100644
index b84ce6d..0000000
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/OneToTwoUpgradeHandler.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.upgrade;
-
-import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
-
-public class OneToTwoUpgradeHandler extends BaseOneToTwoUpgradeHandler {
-
-  @Override
-  String getPartitionColumns(HoodieWriteConfig config) {
-    return config.getProps().getProperty(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key());
-  }
-}
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/OneToZeroDowngradeHandler.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/OneToZeroDowngradeHandler.java
deleted file mode 100644
index 5d6e57e..0000000
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/OneToZeroDowngradeHandler.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.upgrade;
-
-import org.apache.hudi.client.common.HoodieFlinkEngineContext;
-import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.table.HoodieFlinkTable;
-import org.apache.hudi.table.HoodieTable;
-
-/**
- * Downgrade handle to assist in downgrading hoodie table from version 1 to 0.
- */
-public class OneToZeroDowngradeHandler extends BaseOneToZeroDowngradeHandler {
-
-  @Override
-  HoodieTable getTable(HoodieWriteConfig config, HoodieEngineContext context) {
-    return HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context);
-  }
-}
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/ThreeToTwoDowngradeHandler.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/ThreeToTwoDowngradeHandler.java
deleted file mode 100644
index e6b3c30..0000000
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/ThreeToTwoDowngradeHandler.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.upgrade;
-
-import org.apache.hudi.common.config.ConfigProperty;
-import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.metadata.HoodieTableMetadataUtil;
-
-import java.util.Collections;
-import java.util.Map;
-
-/**
- * Downgrade handler to assist in downgrading hoodie table from version 3 to 2.
- */
-public class ThreeToTwoDowngradeHandler implements DowngradeHandler {
-
-  @Override
-  public Map<ConfigProperty, String> downgrade(HoodieWriteConfig config, HoodieEngineContext context, String instantTime) {
-    if (config.isMetadataTableEnabled()) {
-      // Metadata Table in version 3 is synchronous and in version 2 is asynchronous. Downgrading to asynchronous
-      // removes the checks in code to decide whether to use a LogBlock or not. Also, the schema for the
-      // table has been updated and is not forward compatible. Hence, we need to delete the table.
-      HoodieTableMetadataUtil.deleteMetadataTable(config.getBasePath(), context);
-    }
-    return Collections.emptyMap();
-  }
-}
\ No newline at end of file
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/TwoToOneDowngradeHandler.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/TwoToOneDowngradeHandler.java
deleted file mode 100644
index ec8098a..0000000
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/TwoToOneDowngradeHandler.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.upgrade;
-
-import org.apache.hudi.client.common.HoodieFlinkEngineContext;
-import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.table.HoodieFlinkTable;
-import org.apache.hudi.table.HoodieTable;
-
-public class TwoToOneDowngradeHandler extends BaseTwoToOneDowngradeHandler {
-  @Override
-  HoodieTable getTable(HoodieWriteConfig config, HoodieEngineContext context) {
-    return HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context);
-  }
-}
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/TwoToThreeUpgradeHandler.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/TwoToThreeUpgradeHandler.java
deleted file mode 100644
index 9f5644a..0000000
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/TwoToThreeUpgradeHandler.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.upgrade;
-
-import org.apache.hudi.common.config.ConfigProperty;
-import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.metadata.HoodieTableMetadataUtil;
-
-import java.util.Collections;
-import java.util.Map;
-
-/**
- * UpgradeHandler to assist in upgrading {@link org.apache.hudi.table.HoodieTable} from version 2 to 3.
- */
-public class TwoToThreeUpgradeHandler implements UpgradeHandler {
-  @Override
-  public Map<ConfigProperty, String> upgrade(HoodieWriteConfig config, HoodieEngineContext context, String instantTime) {
-    if (config.isMetadataTableEnabled()) {
-      // Metadata Table in version 2 is asynchronous and in version 3 is synchronous. Synchronous table will not
-      // sync any instants not already synced. So its simpler to re-bootstrap the table. Also, the schema for the
-      // table has been updated and is not backward compatible.
-      HoodieTableMetadataUtil.deleteMetadataTable(config.getBasePath(), context);
-    }
-    return Collections.emptyMap();
-  }
-}
\ No newline at end of file
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java
deleted file mode 100644
index 284d3bc..0000000
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.upgrade;
-
-import org.apache.hudi.avro.model.HoodieRollbackRequest;
-import org.apache.hudi.client.common.HoodieFlinkEngineContext;
-import org.apache.hudi.common.HoodieRollbackStat;
-import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.table.HoodieFlinkTable;
-import org.apache.hudi.table.HoodieTable;
-import org.apache.hudi.table.action.rollback.BaseRollbackHelper;
-import org.apache.hudi.table.action.rollback.ListingBasedRollbackHelper;
-import org.apache.hudi.table.action.rollback.ListingBasedRollbackRequest;
-
-import java.util.List;
-
-/**
- * Upgrade handle to assist in upgrading hoodie table from version 0 to 1.
- */
-public class ZeroToOneUpgradeHandler extends BaseZeroToOneUpgradeHandler {
-
-  @Override
-  HoodieTable getTable(HoodieWriteConfig config, HoodieEngineContext context) {
-    return HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context);
-  }
-
-  @Override
-  List<HoodieRollbackStat> getListBasedRollBackStats(HoodieTableMetaClient metaClient, HoodieWriteConfig config, HoodieEngineContext context, Option<HoodieInstant> commitInstantOpt,
-                                                     List<ListingBasedRollbackRequest> rollbackRequests) {
-    List<HoodieRollbackRequest> hoodieRollbackRequests = new ListingBasedRollbackHelper(metaClient, config)
-        .getRollbackRequestsForRollbackPlan(context, commitInstantOpt.get(), rollbackRequests);
-    return new BaseRollbackHelper(metaClient, config).collectRollbackStats(context, commitInstantOpt.get(), hoodieRollbackRequests);
-  }
-}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
index 1c5bdf5..dd9f43d 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
@@ -53,8 +53,8 @@ import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
 import org.apache.hudi.table.action.compact.SparkCompactHelpers;
 import org.apache.hudi.table.marker.WriteMarkersFactory;
-import org.apache.hudi.table.upgrade.AbstractUpgradeDowngrade;
-import org.apache.hudi.table.upgrade.SparkUpgradeDowngrade;
+import org.apache.hudi.table.upgrade.SparkUpgradeDowngradeHelper;
+import org.apache.hudi.table.upgrade.UpgradeDowngrade;
 
 import com.codahale.metrics.Timer;
 import org.apache.hadoop.conf.Configuration;
@@ -414,20 +414,22 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
   @Override
   protected HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> getTableAndInitCtx(WriteOperationType operationType, String instantTime) {
     HoodieTableMetaClient metaClient = createMetaClient(true);
-    AbstractUpgradeDowngrade upgradeDowngrade = new SparkUpgradeDowngrade(metaClient, config, context);
+    UpgradeDowngrade upgradeDowngrade = new UpgradeDowngrade(
+        metaClient, config, context, SparkUpgradeDowngradeHelper.getInstance());
     if (upgradeDowngrade.needsUpgradeOrDowngrade(HoodieTableVersion.current())) {
       if (config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()) {
         this.txnManager.beginTransaction();
         try {
           // Ensure no inflight commits by setting EAGER policy and explicitly cleaning all failed commits
           this.rollbackFailedWrites(getInstantsToRollback(metaClient, HoodieFailedWritesCleaningPolicy.EAGER));
-          new SparkUpgradeDowngrade(metaClient, config, context)
-              .run(metaClient, HoodieTableVersion.current(), config, context, instantTime);
+          new UpgradeDowngrade(
+              metaClient, config, context, SparkUpgradeDowngradeHelper.getInstance())
+              .run(HoodieTableVersion.current(), instantTime);
         } finally {
           this.txnManager.endTransaction();
         }
       } else {
-        upgradeDowngrade.run(metaClient, HoodieTableVersion.current(), config, context, instantTime);
+        upgradeDowngrade.run(HoodieTableVersion.current(), instantTime);
       }
     }
     metaClient.validateTableProperties(config.getProps(), operationType);
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/upgrade/OneToTwoUpgradeHandler.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/upgrade/OneToTwoUpgradeHandler.java
deleted file mode 100644
index 7fb286e..0000000
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/upgrade/OneToTwoUpgradeHandler.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.upgrade;
-
-import org.apache.hudi.HoodieSparkUtils;
-import org.apache.hudi.config.HoodieWriteConfig;
-
-/**
- * Upgrade handle to assist in upgrading hoodie table from version 1 to 2.
- */
-public class OneToTwoUpgradeHandler extends BaseOneToTwoUpgradeHandler {
-
-  @Override
-  String getPartitionColumns(HoodieWriteConfig config) {
-    return HoodieSparkUtils.getPartitionColumns(config.getProps());
-  }
-}
\ No newline at end of file
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/upgrade/OneToZeroDowngradeHandler.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/upgrade/OneToZeroDowngradeHandler.java
deleted file mode 100644
index 2e6064a..0000000
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/upgrade/OneToZeroDowngradeHandler.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.upgrade;
-
-import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.table.HoodieSparkTable;
-import org.apache.hudi.table.HoodieTable;
-
-/**
- * Downgrade handle to assist in downgrading hoodie table from version 1 to 0.
- */
-public class OneToZeroDowngradeHandler extends BaseOneToZeroDowngradeHandler {
-
-  @Override
-  HoodieTable getTable(HoodieWriteConfig config, HoodieEngineContext context) {
-    return HoodieSparkTable.create(config, context);
-  }
-}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/upgrade/SparkUpgradeDowngrade.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/upgrade/SparkUpgradeDowngrade.java
deleted file mode 100644
index 83f29b5..0000000
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/upgrade/SparkUpgradeDowngrade.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.upgrade;
-
-import org.apache.hudi.common.config.ConfigProperty;
-import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.HoodieTableVersion;
-import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.HoodieUpgradeDowngradeException;
-
-import java.io.IOException;
-import java.util.Map;
-
-public class SparkUpgradeDowngrade extends AbstractUpgradeDowngrade {
-
-  public SparkUpgradeDowngrade(HoodieTableMetaClient metaClient, HoodieWriteConfig config, HoodieEngineContext context) {
-    super(metaClient, config, context);
-  }
-
-  @Override
-  public void run(HoodieTableMetaClient metaClient,
-                  HoodieTableVersion toVersion,
-                  HoodieWriteConfig config,
-                  HoodieEngineContext context,
-                  String instantTime) {
-    try {
-      new SparkUpgradeDowngrade(metaClient, config, context).run(toVersion, instantTime);
-    } catch (IOException e) {
-      throw new HoodieUpgradeDowngradeException("Error during upgrade/downgrade to version:" + toVersion, e);
-    }
-  }
-
-  @Override
-  protected Map<ConfigProperty, String> upgrade(HoodieTableVersion fromVersion, HoodieTableVersion toVersion, String instantTime) {
-    if (fromVersion == HoodieTableVersion.ZERO && toVersion == HoodieTableVersion.ONE) {
-      return new ZeroToOneUpgradeHandler().upgrade(config, context, instantTime);
-    } else if (fromVersion == HoodieTableVersion.ONE && toVersion == HoodieTableVersion.TWO) {
-      return new OneToTwoUpgradeHandler().upgrade(config, context, instantTime);
-    } else if (fromVersion == HoodieTableVersion.TWO && toVersion == HoodieTableVersion.THREE) {
-      return new TwoToThreeUpgradeHandler().upgrade(config, context, instantTime);
-    } else {
-      throw new HoodieUpgradeDowngradeException(fromVersion.versionCode(), toVersion.versionCode(), true);
-    }
-  }
-
-  @Override
-  protected Map<ConfigProperty, String> downgrade(HoodieTableVersion fromVersion, HoodieTableVersion toVersion, String instantTime) {
-    if (fromVersion == HoodieTableVersion.ONE && toVersion == HoodieTableVersion.ZERO) {
-      return new OneToZeroDowngradeHandler().downgrade(config, context, instantTime);
-    } else if (fromVersion == HoodieTableVersion.TWO && toVersion == HoodieTableVersion.ONE) {
-      return new TwoToOneDowngradeHandler().downgrade(config, context, instantTime);
-    } else if (fromVersion == HoodieTableVersion.THREE && toVersion == HoodieTableVersion.TWO) {
-      return new ThreeToTwoDowngradeHandler().downgrade(config, context, instantTime);
-    } else {
-      throw new HoodieUpgradeDowngradeException(fromVersion.versionCode(), toVersion.versionCode(), false);
-    }
-  }
-}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/upgrade/SparkUpgradeDowngradeHelper.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/upgrade/SparkUpgradeDowngradeHelper.java
new file mode 100644
index 0000000..f943b70
--- /dev/null
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/upgrade/SparkUpgradeDowngradeHelper.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table.upgrade;
+
+import org.apache.hudi.HoodieSparkUtils;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieSparkTable;
+import org.apache.hudi.table.HoodieTable;
+
+/**
+ * Spark upgrade and downgrade helper.
+ */
+public class SparkUpgradeDowngradeHelper implements BaseUpgradeDowngradeHelper {
+
+  private static final SparkUpgradeDowngradeHelper SINGLETON_INSTANCE =
+      new SparkUpgradeDowngradeHelper();
+
+  private SparkUpgradeDowngradeHelper() {
+  }
+
+  public static SparkUpgradeDowngradeHelper getInstance() {
+    return SINGLETON_INSTANCE;
+  }
+
+  @Override
+  public HoodieTable getTable(HoodieWriteConfig config, HoodieEngineContext context) {
+    return HoodieSparkTable.create(config, context);
+  }
+
+  @Override
+  public String getPartitionColumns(HoodieWriteConfig config) {
+    return HoodieSparkUtils.getPartitionColumns(config.getProps());
+  }
+}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/upgrade/TwoToOneDowngradeHandler.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/upgrade/TwoToOneDowngradeHandler.java
deleted file mode 100644
index 055d330..0000000
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/upgrade/TwoToOneDowngradeHandler.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.upgrade;
-
-import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.table.HoodieSparkTable;
-import org.apache.hudi.table.HoodieTable;
-
-/**
- * Downgrade handle to assist in downgrading hoodie table from version 2 to 1.
- */
-public class TwoToOneDowngradeHandler extends BaseTwoToOneDowngradeHandler {
-
-  @Override
-  HoodieTable getTable(HoodieWriteConfig config, HoodieEngineContext context) {
-    return HoodieSparkTable.create(config, context);
-  }
-}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java
deleted file mode 100644
index 2cfb39c..0000000
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.upgrade;
-
-import org.apache.hudi.avro.model.HoodieRollbackRequest;
-import org.apache.hudi.common.HoodieRollbackStat;
-import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.table.HoodieSparkTable;
-import org.apache.hudi.table.HoodieTable;
-import org.apache.hudi.table.action.rollback.BaseRollbackHelper;
-import org.apache.hudi.table.action.rollback.ListingBasedRollbackHelper;
-import org.apache.hudi.table.action.rollback.ListingBasedRollbackRequest;
-
-import java.util.List;
-
-/**
- * Upgrade handle to assist in upgrading hoodie table from version 0 to 1.
- */
-public class ZeroToOneUpgradeHandler extends BaseZeroToOneUpgradeHandler {
-
-  @Override
-  HoodieTable getTable(HoodieWriteConfig config, HoodieEngineContext context) {
-    return HoodieSparkTable.create(config, context);
-  }
-
-  @Override
-  List<HoodieRollbackStat> getListBasedRollBackStats(HoodieTableMetaClient metaClient, HoodieWriteConfig config, HoodieEngineContext context, Option<HoodieInstant> commitInstantOpt,
-                                                     List<ListingBasedRollbackRequest> rollbackRequests) {
-    List<HoodieRollbackRequest> hoodieRollbackRequests = new ListingBasedRollbackHelper(metaClient, config)
-        .getRollbackRequestsForRollbackPlan(context, commitInstantOpt.get(), rollbackRequests);
-    return new BaseRollbackHelper(metaClient, config).collectRollbackStats(context, commitInstantOpt.get(), hoodieRollbackRequests);
-  }
-}
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
index f7cb22c..468444b 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
@@ -71,7 +71,8 @@ import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
 import org.apache.hudi.table.HoodieSparkTable;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
-import org.apache.hudi.table.upgrade.SparkUpgradeDowngrade;
+import org.apache.hudi.table.upgrade.SparkUpgradeDowngradeHelper;
+import org.apache.hudi.table.upgrade.UpgradeDowngrade;
 import org.apache.hudi.testutils.MetadataMergeWriteStatus;
 
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -885,7 +886,8 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
     assertTrue(oldStatus.getModificationTime() < newStatus.getModificationTime());
 
     // Test downgrade by running the downgrader
-    new SparkUpgradeDowngrade(metaClient, writeConfig, context).run(metaClient, HoodieTableVersion.TWO, writeConfig, context, null);
+    new UpgradeDowngrade(metaClient, writeConfig, context, SparkUpgradeDowngradeHelper.getInstance())
+        .run(HoodieTableVersion.TWO, null);
 
     assertEquals(metaClient.getTableConfig().getTableVersion().versionCode(), HoodieTableVersion.TWO.versionCode());
     assertFalse(fs.exists(new Path(metadataTableBasePath)), "Metadata table should not exist");
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java
index 792da4e..19ec4e6 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java
@@ -85,7 +85,7 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /**
- * Unit tests {@link SparkUpgradeDowngrade}.
+ * Unit tests {@link UpgradeDowngrade}.
  */
 public class TestUpgradeDowngrade extends HoodieClientTestBase {
 
@@ -177,7 +177,8 @@ public class TestUpgradeDowngrade extends HoodieClientTestBase {
     }
 
     // should re-create marker files for 2nd commit since its pending.
-    new SparkUpgradeDowngrade(metaClient, cfg, context).run(metaClient, HoodieTableVersion.ONE, cfg, context, null);
+    new UpgradeDowngrade(metaClient, cfg, context, SparkUpgradeDowngradeHelper.getInstance())
+        .run(HoodieTableVersion.ONE, null);
 
     // assert marker files
     assertMarkerFilesForUpgrade(table, commitInstant, firstPartitionCommit2FileSlices, secondPartitionCommit2FileSlices);
@@ -218,7 +219,8 @@ public class TestUpgradeDowngrade extends HoodieClientTestBase {
     downgradeTableConfigsFromTwoToOne(cfg);
 
     // perform upgrade
-    new SparkUpgradeDowngrade(metaClient, cfg, context).run(metaClient, HoodieTableVersion.TWO, cfg, context, null);
+    new UpgradeDowngrade(metaClient, cfg, context, SparkUpgradeDowngradeHelper.getInstance())
+        .run(HoodieTableVersion.TWO, null);
 
     // verify hoodie.table.version got upgraded
     metaClient = HoodieTableMetaClient.builder().setConf(context.getHadoopConf().get()).setBasePath(cfg.getBasePath())
@@ -321,7 +323,8 @@ public class TestUpgradeDowngrade extends HoodieClientTestBase {
     }
 
     // downgrade should be performed. all marker files should be deleted
-    new SparkUpgradeDowngrade(metaClient, cfg, context).run(metaClient, toVersion, cfg, context, null);
+    new UpgradeDowngrade(metaClient, cfg, context, SparkUpgradeDowngradeHelper.getInstance())
+        .run(toVersion, null);
 
     // assert marker files
     assertMarkerFilesForDowngrade(table, commitInstant, toVersion == HoodieTableVersion.ONE);
@@ -557,7 +560,7 @@ public class TestUpgradeDowngrade extends HoodieClientTestBase {
 
   private void createResidualFile() throws IOException {
     Path propertyFile = new Path(metaClient.getMetaPath() + "/" + HoodieTableConfig.HOODIE_PROPERTIES_FILE);
-    Path updatedPropertyFile = new Path(metaClient.getMetaPath() + "/" + SparkUpgradeDowngrade.HOODIE_UPDATED_PROPERTY_FILE);
+    Path updatedPropertyFile = new Path(metaClient.getMetaPath() + "/" + UpgradeDowngrade.HOODIE_UPDATED_PROPERTY_FILE);
 
     // Step1: Copy hoodie.properties to hoodie.properties.orig
     FileUtil.copy(metaClient.getFs(), propertyFile, metaClient.getFs(), updatedPropertyFile,