Posted to commits@impala.apache.org by wz...@apache.org on 2024/01/03 07:21:41 UTC

(impala) 02/02: IMPALA-12375: Make DataSource Object persistent

This is an automated email from the ASF dual-hosted git repository.

wzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit a2c2f118d2de1c08d5c7ba7f98f8e3490ea90d65
Author: wzhou-code <wz...@cloudera.com>
AuthorDate: Sat Dec 9 20:50:35 2023 -0800

    IMPALA-12375: Make DataSource Object persistent
    
    DataSource objects are kept in an in-memory cache in the Catalog server
    and are not persisted to the HMS. The objects are lost when the Catalog
    server is restarted, and the user must recreate them before creating new
    external DataSource tables.
    This patch makes DataSource objects persistent by saving them as
    DataConnector objects with type "impalaDataSource" in HMS. Since HMS
    events for DataConnector are not handled, the Catalog server has to
    refresh DataSource objects when the catalogd becomes active.
    Note that this feature is not supported for Apache Hive 3.1 and older
    versions.
    
    Testing:
     - Added two end-to-end tests covering restart of the Catalog server
       and catalogd HA failover.
       These two tests are skipped when USE_APACHE_HIVE is set to true
       and the Apache Hive version is 3.x or older.
     - Passed all-build-options-ub2004.
     - Passed core test.
    
    Change-Id: I500a99142bb62ce873e693d573064ad4ffa153ab
    Reviewed-on: http://gerrit.cloudera.org:8080/20768
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Wenzhe Zhou <wz...@cloudera.com>
---
 be/src/catalog/catalog-server.cc                   |  6 ++
 be/src/catalog/catalog.cc                          |  5 ++
 be/src/catalog/catalog.h                           |  6 ++
 .../org/apache/impala/compat/MetastoreShim.java    | 20 +++++
 .../org/apache/impala/compat/MetastoreShim.java    | 89 ++++++++++++++++++++++
 .../impala/analysis/CreateTableDataSrcStmt.java    |  4 +-
 .../java/org/apache/impala/catalog/Catalog.java    |  3 +-
 .../impala/catalog/CatalogServiceCatalog.java      | 41 ++++++++--
 .../org/apache/impala/catalog/DataSourceTable.java |  8 +-
 .../apache/impala/service/CatalogOpExecutor.java   | 83 +++++++++++++++++++-
 .../java/org/apache/impala/service/JniCatalog.java |  7 ++
 tests/common/skip.py                               |  3 +
 tests/custom_cluster/test_ext_data_sources.py      | 85 +++++++++++++++++++++
 13 files changed, 343 insertions(+), 17 deletions(-)

diff --git a/be/src/catalog/catalog-server.cc b/be/src/catalog/catalog-server.cc
index 6544ba6b2..43f77e970 100644
--- a/be/src/catalog/catalog-server.cc
+++ b/be/src/catalog/catalog-server.cc
@@ -592,6 +592,12 @@ void CatalogServer::UpdateActiveCatalogd(bool is_registration_reply,
         if (!status.ok()) {
           LOG(ERROR) << "Failed to reset metadata triggered by catalogd failover.";
         }
+      } else {
+        // Refresh DataSource objects when the catalogd becomes active.
+        Status status = catalog_->RefreshDataSources();
+        if (!status.ok()) {
+          LOG(ERROR) << "Failed to refresh data sources triggered by catalogd failover.";
+        }
       }
       // Signal the catalog update gathering thread to start.
       topic_updates_ready_ = false;
diff --git a/be/src/catalog/catalog.cc b/be/src/catalog/catalog.cc
index 3efc4b464..698091787 100644
--- a/be/src/catalog/catalog.cc
+++ b/be/src/catalog/catalog.cc
@@ -70,6 +70,7 @@ Catalog::Catalog() {
     {"getPartitionStats", "([B)[B", &get_partition_stats_id_},
     {"updateTableUsage", "([B)V", &update_table_usage_id_},
     {"regenerateServiceId", "()V", &regenerate_service_id_},
+    {"refreshDataSources", "()V", &refresh_data_sources_},
   };
 
   JNIEnv* jni_env = JniUtil::GetJNIEnv();
@@ -215,3 +216,7 @@ void Catalog::RegenerateServiceId() {
   jni_env->CallVoidMethod(catalog_, regenerate_service_id_);
   ABORT_IF_EXC(jni_env);
 }
+
+Status Catalog::RefreshDataSources() {
+  return JniUtil::CallJniMethod(catalog_, refresh_data_sources_);
+}
diff --git a/be/src/catalog/catalog.h b/be/src/catalog/catalog.h
index 242383e1c..c94f09a6b 100644
--- a/be/src/catalog/catalog.h
+++ b/be/src/catalog/catalog.h
@@ -141,6 +141,11 @@ class Catalog {
   /// The function should be called when the CatalogD becomes active.
   void RegenerateServiceId();
 
+  /// Refreshes the data sources from the metadata store.
+  /// Returns OK if the refresh was successful, otherwise a Status object with
+  /// information on the error.
+  Status RefreshDataSources();
+
  private:
   jobject catalog_;  // instance of org.apache.impala.service.JniCatalog
   jmethodID update_metastore_id_;  // JniCatalog.updateMetastore()
@@ -164,6 +169,7 @@ class Catalog {
   jmethodID catalog_ctor_;
   jmethodID update_table_usage_id_;
   jmethodID regenerate_service_id_; // JniCatalog.regenerateServiceId()
+  jmethodID refresh_data_sources_; // JniCatalog.refreshDataSources()
 };
 
 }
diff --git a/fe/src/compat-apache-hive-3/java/org/apache/impala/compat/MetastoreShim.java b/fe/src/compat-apache-hive-3/java/org/apache/impala/compat/MetastoreShim.java
index cd6a97a81..038f6eb1e 100644
--- a/fe/src/compat-apache-hive-3/java/org/apache/impala/compat/MetastoreShim.java
+++ b/fe/src/compat-apache-hive-3/java/org/apache/impala/compat/MetastoreShim.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
 import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
 import org.apache.hadoop.hive.metastore.api.Database;
@@ -65,6 +66,7 @@ import org.apache.hadoop.hive.metastore.txn.TxnUtils;
 import org.apache.hadoop.hive.metastore.utils.FileUtils;
 import org.apache.impala.catalog.CatalogException;
 import org.apache.impala.catalog.CatalogServiceCatalog;
+import org.apache.impala.catalog.DataSource;
 import org.apache.impala.catalog.Db;
 import org.apache.impala.catalog.HdfsPartition;
 import org.apache.impala.catalog.HdfsTable;
@@ -946,4 +948,22 @@ public class MetastoreShim extends Hive3MetastoreShimBase {
     //  materialized view is outdated for rewriting, so set the flag to "Unknown"
     formatOutput("Outdated for Rewriting:", "Unknown", tableInfo);
   }
+
+  public static void createDataSource(IMetaStoreClient client, DataSource dataSource)
+      throws InvalidObjectException, AlreadyExistsException, MetaException, TException {
+    // No-op: IMetaStoreClient.createDataConnector() is unsupported.
+  }
+
+  public static void dropDataSource(IMetaStoreClient client, String name,
+      boolean ifExists) throws NoSuchObjectException, InvalidOperationException,
+          MetaException, TException {
+    // No-op: IMetaStoreClient.dropDataConnector() is unsupported.
+  }
+
+  public static Map<String, DataSource> loadAllDataSources(IMetaStoreClient client)
+      throws MetaException, TException {
+    // IMetaStoreClient.getAllDataConnectorNames() and getDataConnector() are
+    // not supported in Apache Hive 3; return null to signal that.
+    return null;
+  }
 }
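
A note on the contract above: loadAllDataSources() returns null, rather than
an empty map, to signal that persistence is unavailable, and the caller is
expected to leave its in-memory state untouched in that case. A minimal
sketch of the expected call pattern (mirroring the null check in
CatalogServiceCatalog.refreshDataSources() further below):

    import java.util.Map;

    import org.apache.hadoop.hive.metastore.IMetaStoreClient;
    import org.apache.impala.catalog.DataSource;
    import org.apache.impala.compat.MetastoreShim;

    class RefreshSketch {
      // A null map means "unsupported", which is distinct from an empty map
      // ("no data sources exist in HMS").
      static void refresh(IMetaStoreClient client) throws Exception {
        Map<String, DataSource> newDataSrcs =
            MetastoreShim.loadAllDataSources(client);
        if (newDataSrcs == null) return;  // Apache Hive 3.1: keep cache as-is
        // ... otherwise reconcile the in-memory cache against newDataSrcs ...
      }
    }
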
diff --git a/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java b/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
index 504e13a9c..d3f33e8cb 100644
--- a/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
+++ b/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
@@ -27,6 +27,7 @@ import static org.apache.impala.util.HiveMetadataFormatUtils.formatOutput;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Stopwatch;
+import com.google.common.base.Strings;
 import com.google.common.collect.Iterables;
 
 import java.net.InetAddress;
@@ -46,9 +47,11 @@ import java.util.stream.Collectors;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
 import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
 import org.apache.hadoop.hive.metastore.api.CompactionInfoStruct;
+import org.apache.hadoop.hive.metastore.api.DataConnector;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.FireEventRequest;
 import org.apache.hadoop.hive.metastore.api.FireEventRequestData;
@@ -90,6 +93,7 @@ import org.apache.impala.analysis.TableName;
 import org.apache.impala.catalog.CatalogException;
 import org.apache.impala.catalog.CatalogServiceCatalog;
 import org.apache.impala.catalog.CompactionInfoLoader;
+import org.apache.impala.catalog.DataSource;
 import org.apache.impala.catalog.DatabaseNotFoundException;
 import org.apache.impala.catalog.Db;
 import org.apache.impala.catalog.HdfsPartition;
@@ -131,6 +135,13 @@ import org.slf4j.LoggerFactory;
 public class MetastoreShim extends Hive3MetastoreShimBase {
   private static final Logger LOG = LoggerFactory.getLogger(MetastoreShim.class);
 
+  // An Impala DataSource object is saved in HMS as a DataConnector with
+  // type 'impalaDataSource'.
+  private static final String HMS_DATA_CONNECTOR_TYPE = "impalaDataSource";
+  private static final String HMS_DATA_CONNECTOR_DESC = "Impala DataSource Object";
+  private static final String HMS_DATA_CONNECTOR_PARAM_KEY_CLASS_NAME = "className";
+  private static final String HMS_DATA_CONNECTOR_PARAM_KEY_API_VERSION = "apiVersion";
+
   /**
    * Wrapper around IMetaStoreClient.alter_table with validWriteIds as a param.
    */
@@ -962,4 +973,82 @@ public class MetastoreShim extends Hive3MetastoreShimBase {
       tableInfo.append(metaDataTable.renderTable(isOutputPadded));
     }
   }
+
+  /**
+   * Wrapper around IMetaStoreClient.createDataConnector().
+   */
+  public static void createDataSource(IMetaStoreClient client, DataSource dataSource)
+      throws InvalidObjectException, AlreadyExistsException, MetaException, TException {
+    DataConnector connector = MetastoreShim.dataSourceToDataConnector(dataSource);
+    Preconditions.checkNotNull(connector);
+    client.createDataConnector(connector);
+  }
+
+  /**
+   * Wrapper around IMetaStoreClient.dropDataConnector().
+   */
+  public static void dropDataSource(IMetaStoreClient client, String name,
+      boolean ifExists) throws NoSuchObjectException, InvalidOperationException,
+          MetaException, TException {
+    // Set 'checkReferences' to false, i.e. drop the DataSource object without
+    // checking its references, since its properties are copied into the table
+    // properties of the referencing DataSource tables.
+    client.dropDataConnector(
+        name, /* ifNotExists */ !ifExists, /* checkReferences */ false);
+  }
+
+  /**
+   * Wrapper around IMetaStoreClient.getAllDataConnectorNames() and
+   * IMetaStoreClient.getDataConnector().
+   */
+  public static Map<String, DataSource> loadAllDataSources(IMetaStoreClient client)
+      throws MetaException, TException {
+    Map<String, DataSource> newDataSrcs = new HashMap<String, DataSource>();
+    // Load DataSource objects from HMS DataConnector objects.
+    List<String> allConnectorNames = client.getAllDataConnectorNames();
+    for (String connectorName: allConnectorNames) {
+      DataConnector connector = client.getDataConnector(connectorName);
+      if (connector != null) {
+        DataSource dataSrc = MetastoreShim.dataConnectorToDataSource(connector);
+        if (dataSrc != null) newDataSrcs.put(connectorName, dataSrc);
+      }
+    }
+    return newDataSrcs;
+  }
+
+  /**
+   * Convert DataSource object to DataConnector object.
+   */
+  private static DataConnector dataSourceToDataConnector(DataSource dataSource) {
+    DataConnector connector = new DataConnector(
+        dataSource.getName(), HMS_DATA_CONNECTOR_TYPE, dataSource.getLocation());
+    connector.setDescription(HMS_DATA_CONNECTOR_DESC);
+    connector.putToParameters(
+        HMS_DATA_CONNECTOR_PARAM_KEY_CLASS_NAME, dataSource.getClassName());
+    connector.putToParameters(
+        HMS_DATA_CONNECTOR_PARAM_KEY_API_VERSION, dataSource.getApiVersion());
+    return connector;
+  }
+
+  /**
+   * Convert DataConnector object to DataSource object.
+   */
+  private static DataSource dataConnectorToDataSource(DataConnector connector) {
+    if (!connector.isSetName() || !connector.isSetType() || !connector.isSetUrl()
+        || !connector.isSetDescription() || connector.getParametersSize() == 0
+        || !connector.getType().equalsIgnoreCase(HMS_DATA_CONNECTOR_TYPE)) {
+      return null;
+    }
+    String name = connector.getName();
+    String location = connector.getUrl();
+    String className =
+        connector.getParameters().get(HMS_DATA_CONNECTOR_PARAM_KEY_CLASS_NAME);
+    String apiVersion =
+        connector.getParameters().get(HMS_DATA_CONNECTOR_PARAM_KEY_API_VERSION);
+    if (!Strings.isNullOrEmpty(name) && !Strings.isNullOrEmpty(location) &&
+        !Strings.isNullOrEmpty(className) && !Strings.isNullOrEmpty(apiVersion)) {
+      return new DataSource(name, location, className, apiVersion);
+    }
+    return null;
+  }
 }
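
The two private converters above are intended to be inverses of each other
for well-formed objects. A standalone round-trip sketch (the string literals
are the shim's constants; "my_src" and the JdbcDataSource class name are
hypothetical example values):

    import org.apache.hadoop.hive.metastore.api.DataConnector;
    import org.apache.impala.catalog.DataSource;

    public class RoundTripSketch {
      public static void main(String[] args) {
        DataSource src = new DataSource("my_src",
            "/test-warehouse/data-sources/jdbc-data-source.jar",
            "org.apache.impala.extdatasource.jdbc.JdbcDataSource", "V1");
        // Forward mapping, as in dataSourceToDataConnector():
        DataConnector c = new DataConnector(
            src.getName(), "impalaDataSource", src.getLocation());
        c.setDescription("Impala DataSource Object");
        c.putToParameters("className", src.getClassName());
        c.putToParameters("apiVersion", src.getApiVersion());
        // Reverse mapping, as in dataConnectorToDataSource(); all four fields
        // are set and non-empty, so the shim's validation would accept it.
        DataSource back = new DataSource(c.getName(), c.getUrl(),
            c.getParameters().get("className"),
            c.getParameters().get("apiVersion"));
        System.out.println(back.getName().equals(src.getName()));  // true
      }
    }
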
diff --git a/fe/src/main/java/org/apache/impala/analysis/CreateTableDataSrcStmt.java b/fe/src/main/java/org/apache/impala/analysis/CreateTableDataSrcStmt.java
index 00fd4b69f..13eda5a7b 100644
--- a/fe/src/main/java/org/apache/impala/analysis/CreateTableDataSrcStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/CreateTableDataSrcStmt.java
@@ -63,8 +63,8 @@ public class CreateTableDataSrcStmt extends CreateTableStmt {
       }
     }
     // Add table properties from the DataSource catalog object now that we have access
-    // to the catalog. These are stored in the table metadata because DataSource catalog
-    // objects are not currently persisted.
+    // to the catalog. These are stored in the table metadata so that the table
+    // can be scanned without the DataSource catalog object.
     String location = dataSource.getLocation();
     getTblProperties().put(TBL_PROP_LOCATION, location);
     getTblProperties().put(TBL_PROP_CLASS, dataSource.getClassName());
diff --git a/fe/src/main/java/org/apache/impala/catalog/Catalog.java b/fe/src/main/java/org/apache/impala/catalog/Catalog.java
index be480ec97..a0b22593a 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Catalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Catalog.java
@@ -271,8 +271,7 @@ public abstract class Catalog implements AutoCloseable {
   }
 
   /**
-   * Adds a data source to the in-memory map of data sources. It is not
-   * persisted to the metastore.
+   * Adds a data source to the in-memory map of data sources.
    * @return true if this item was added or false if the existing value was preserved.
    */
   public boolean addDataSource(DataSource dataSource) {
diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index 59c24dda5..6e6fae5ae 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -1897,6 +1897,38 @@ public class CatalogServiceCatalog extends Catalog {
     }
   }
 
+  /**
+   * Loads DataSource objects into the catalog and assigns new versions to all the
+   * loaded DataSource objects.
+   */
+  public void refreshDataSources() throws TException {
+    Map<String, DataSource> newDataSrcs = null;
+    try (MetaStoreClient msClient = getMetaStoreClient()) {
+      // Load all DataSource objects from HMS DataConnector objects.
+      newDataSrcs = MetastoreShim.loadAllDataSources(msClient.getHiveClient());
+      if (newDataSrcs == null) return;
+    }
+    Set<String> oldDataSrcNames = dataSources_.keySet();
+    Set<String> newDataSrcNames = newDataSrcs.keySet();
+    oldDataSrcNames.removeAll(newDataSrcNames);
+    int removedDataSrcNum = 0;
+    for (String dataSrcName: oldDataSrcNames) {
+      // Add removed DataSource objects to deleteLog_.
+      DataSource dataSrc = dataSources_.remove(dataSrcName);
+      if (dataSrc != null) {
+        dataSrc.setCatalogVersion(incrementAndGetCatalogVersion());
+        deleteLog_.addRemovedObject(dataSrc.toTCatalogObject());
+        removedDataSrcNum++;
+      }
+    }
+    for (DataSource dataSrc: newDataSrcs.values()) {
+      dataSrc.setCatalogVersion(incrementAndGetCatalogVersion());
+      dataSources_.add(dataSrc);
+    }
+    LOG.info("Finished refreshing DataSources. Added {}. Removed {}.",
+        newDataSrcs.size(), removedDataSrcNum);
+  }
+
   /**
    * Load the list of TableMeta from Hive. If pull_table_types_and_comments=true, the list
    * will contain the table types and comments. Otherwise, we just fetch the table names
@@ -2078,13 +2110,12 @@ public class CatalogServiceCatalog extends Catalog {
     // reset operation itself and to unblock impalads by making the catalog version >
     // INITIAL_CATALOG_VERSION. See Frontend.waitForCatalog()
     ++catalogVersion_;
-    // Assign new versions to all the loaded data sources.
-    for (DataSource dataSource: getDataSources()) {
-      dataSource.setCatalogVersion(incrementAndGetCatalogVersion());
-    }
 
-    // Update db and table metadata
+    // Update data source, db and table metadata
     try {
+      // Refresh DataSource objects from HMS and assign new versions.
+      refreshDataSources();
+
       // Not all Java UDFs are persisted to the metastore. The ones which aren't
       // should be restored once the catalog has been invalidated.
       Map<String, Db> oldDbCache = dbCache_.get();
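
In refreshDataSources() above, both directions of the reconciliation get
fresh catalog versions: objects that disappeared from HMS are routed through
deleteLog_ so that the deletion is included in the next catalog topic update
and subscribers drop them, while every object loaded from HMS is re-added
with a new version so that it is propagated as an update.
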
diff --git a/fe/src/main/java/org/apache/impala/catalog/DataSourceTable.java b/fe/src/main/java/org/apache/impala/catalog/DataSourceTable.java
index 3087a7d30..ecc6a36e7 100644
--- a/fe/src/main/java/org/apache/impala/catalog/DataSourceTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/DataSourceTable.java
@@ -40,11 +40,9 @@ import com.google.common.base.Preconditions;
 
 /**
  * All data source properties are stored as table properties (persisted in the
- * metastore) because the DataSource catalog object is not persisted so the
- * DataSource catalog object will not exist if the catalog server is restarted,
- * but the table does not need the DataSource catalog object in order to scan
- * the table. Tables that contain the TBL_PROP_DATA_SRC_NAME table parameter are
- * assumed to be backed by an external data source.
+ * metastore) so that the table does not need the DataSource catalog object in
+ * order to scan the table. Tables that contain the TBL_PROP_DATA_SRC_NAME table
+ * parameter are assumed to be backed by an external data source.
  */
 public class DataSourceTable extends Table implements FeDataSourceTable {
   private final static Logger LOG = LoggerFactory.getLogger(DataSourceTable.class);
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 75fac245a..cd4086584 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -24,6 +24,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Stopwatch;
+import com.google.common.base.Strings;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -2461,9 +2462,17 @@ public class CatalogOpExecutor {
 
   private void createDataSource(TCreateDataSourceParams params, TDdlExecResponse resp)
       throws ImpalaException {
-    if (LOG.isTraceEnabled()) { LOG.trace("Adding DATA SOURCE: " + params.toString()); }
+    Preconditions.checkNotNull(params);
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Adding DATA SOURCE: " + params.toString());
+    }
     DataSource dataSource = DataSource.fromThrift(params.getData_source());
-    DataSource existingDataSource = catalog_.getDataSource(dataSource.getName());
+    Preconditions.checkNotNull(dataSource);
+    String dataSrcName = dataSource.getName();
+    Preconditions.checkState(!Strings.isNullOrEmpty(dataSrcName),
+        "Null or empty DataSource name passed as argument to " +
+        "CatalogOpExecutor.createDataSource");
+    DataSource existingDataSource = catalog_.getDataSource(dataSrcName);
     if (existingDataSource != null) {
       if (!params.if_not_exists) {
         throw new ImpalaRuntimeException("Data source " + dataSource.getName() +
@@ -2474,6 +2483,9 @@ public class CatalogOpExecutor {
       resp.result.setVersion(existingDataSource.getCatalogVersion());
       return;
     }
+    // Create DataSource object in HMS.
+    addDataSourceToHms(dataSource, params.if_not_exists);
+    // Add DataSource object to the in-memory cache.
     catalog_.addDataSource(dataSource);
     resp.result.addToUpdated_catalog_objects(dataSource.toTCatalogObject());
     resp.result.setVersion(dataSource.getCatalogVersion());
@@ -2483,7 +2495,11 @@ public class CatalogOpExecutor {
   private void dropDataSource(TDropDataSourceParams params, TDdlExecResponse resp)
       throws ImpalaException {
     if (LOG.isTraceEnabled()) LOG.trace("Drop DATA SOURCE: " + params.toString());
-    DataSource dataSource = catalog_.removeDataSource(params.getData_source());
+    String dataSrcName = params.getData_source();
+    Preconditions.checkState(!Strings.isNullOrEmpty(dataSrcName),
+        "Null or empty DataSource name passed as argument to " +
+        "CatalogOpExecutor.dropDataSource");
+    DataSource dataSource = catalog_.removeDataSource(dataSrcName);
     if (dataSource == null) {
       if (!params.if_exists) {
         throw new ImpalaRuntimeException("Data source " + params.getData_source() +
@@ -2494,6 +2510,8 @@ public class CatalogOpExecutor {
       resp.result.setVersion(catalog_.getCatalogVersion());
       return;
     }
+    // Drop DataSource object from HMS.
+    dropDataSourceFromHms(dataSrcName, /* ifExists */ false);
     resp.result.addToRemoved_catalog_objects(dataSource.toTCatalogObject());
     resp.result.setVersion(dataSource.getCatalogVersion());
     addSummary(resp, "Data source has been dropped.");
@@ -6196,6 +6214,65 @@ public class CatalogOpExecutor {
     return true;
   }
 
+  /**
+   * Creates a new DataSource in the Hive metastore. Returns true if successful,
+   * and false if the DataSource already exists and ifNotExists is true.
+   * Note that the DataSource object is saved as a DataConnector object with
+   * type "impalaDataSource" in HMS.
+   */
+  private boolean addDataSourceToHms(DataSource dataSource, boolean ifNotExists)
+      throws ImpalaRuntimeException {
+    getMetastoreDdlLock().lock();
+    try {
+      try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+        MetastoreShim.createDataSource(msClient.getHiveClient(), dataSource);
+      } catch (AlreadyExistsException e) {
+        if (!ifNotExists) {
+          throw new ImpalaRuntimeException(
+              String.format(HMS_RPC_ERROR_FORMAT_STR, "createDataConnector"), e);
+        }
+        return false;
+      } catch (TException e) {
+        LOG.error("Error executing createDataConnector() metastore call: " +
+            dataSource.getName(), e);
+        throw new ImpalaRuntimeException(
+            String.format(HMS_RPC_ERROR_FORMAT_STR, "createDataConnector"), e);
+      }
+    } finally {
+      getMetastoreDdlLock().unlock();
+    }
+    return true;
+  }
+
+  /**
+   * Drops the DataSource with the given name from the Hive metastore. Returns
+   * true if successful and false if the DataSource does not exist and ifExists
+   * is true.
+   */
+  private boolean dropDataSourceFromHms(String name, boolean ifExists)
+      throws ImpalaRuntimeException {
+    getMetastoreDdlLock().lock();
+    try {
+      try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+        try {
+          MetastoreShim.dropDataSource(msClient.getHiveClient(), name, ifExists);
+        } catch (NoSuchObjectException e) {
+          if (!ifExists) {
+            throw new ImpalaRuntimeException(
+                String.format(HMS_RPC_ERROR_FORMAT_STR, "dropDataConnector"), e);
+          }
+          return false;
+        } catch (TException e) {
+          LOG.error("Error executing dropDataConnector HMS call: " + name, e);
+          throw new ImpalaRuntimeException(
+              String.format(HMS_RPC_ERROR_FORMAT_STR, "dropDataConnector"), e);
+        }
+      }
+    } finally {
+      getMetastoreDdlLock().unlock();
+    }
+    return true;
+  }
+
   /**
    * Updates the database object in the metastore.
    */
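
A note on ordering in the two DDL paths above: createDataSource() writes the
object to HMS first and only updates the in-memory cache once the HMS call
succeeds, so a failed write leaves the cache unchanged. dropDataSource()
removes the cache entry first and then drops the HMS object with ifExists
set to false, so a DataConnector that is unexpectedly missing from HMS
surfaces as an HMS error rather than being silently ignored.
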
diff --git a/fe/src/main/java/org/apache/impala/service/JniCatalog.java b/fe/src/main/java/org/apache/impala/service/JniCatalog.java
index fe300fa48..1adfd7efa 100644
--- a/fe/src/main/java/org/apache/impala/service/JniCatalog.java
+++ b/fe/src/main/java/org/apache/impala/service/JniCatalog.java
@@ -555,4 +555,11 @@ public class JniCatalog {
           return response;
         });
   }
+
+  /**
+   * Refreshes data sources from the metadata store.
+   */
+  public void refreshDataSources() throws TException {
+    catalog_.refreshDataSources();
+  }
 }
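
Putting the pieces together, the refresh path on catalogd failover is:

    CatalogServer::UpdateActiveCatalogd()   be/src/catalog/catalog-server.cc
      -> Catalog::RefreshDataSources()      be/src/catalog/catalog.cc (JNI)
        -> JniCatalog.refreshDataSources()  fe/.../service/JniCatalog.java
          -> CatalogServiceCatalog.refreshDataSources()  (loads from HMS)
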
diff --git a/tests/common/skip.py b/tests/common/skip.py
index 51dcdcf6e..31aa4a272 100644
--- a/tests/common/skip.py
+++ b/tests/common/skip.py
@@ -257,3 +257,6 @@ class SkipIfOS:
 class SkipIfApacheHive():
   feature_not_supported = pytest.mark.skipif(IS_APACHE_HIVE,
       reason="Apache Hive does not support this feature")
+  data_connector_not_supported = pytest.mark.skipif(
+      IS_APACHE_HIVE and HIVE_MAJOR_VERSION <= 3,
+      reason="Apache Hive 3.1 and older versions do not support DataConnector")
diff --git a/tests/custom_cluster/test_ext_data_sources.py b/tests/custom_cluster/test_ext_data_sources.py
index f12fd1b56..ffa907d06 100644
--- a/tests/custom_cluster/test_ext_data_sources.py
+++ b/tests/custom_cluster/test_ext_data_sources.py
@@ -21,6 +21,8 @@ import os
 import subprocess
 
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+from tests.common.environ import build_flavor_timeout
+from tests.common.skip import SkipIfApacheHive
 
 
 class TestExtDataSources(CustomClusterTestSuite):
@@ -60,6 +62,89 @@ class TestExtDataSources(CustomClusterTestSuite):
     """Run test with batch size less than default size 1024"""
     self.run_test_case('QueryTest/data-source-tables', vector, use_db=unique_database)
 
+  @SkipIfApacheHive.data_connector_not_supported
+  @pytest.mark.execute_serially
+  def test_restart_catalogd(self, vector, unique_database):
+    """Restart the Catalog server after creating a data source. Verify that
+    the data source object persists across the restart."""
+    DROP_DATA_SOURCE_QUERY = "DROP DATA SOURCE IF EXISTS test_restart_persistent"
+    CREATE_DATA_SOURCE_QUERY = "CREATE DATA SOURCE test_restart_persistent " \
+        "LOCATION '/test-warehouse/data-sources/jdbc-data-source.jar' " \
+        "CLASS 'org.apache.impala.extdatasource.jdbc.PersistentJdbcDataSource' " \
+        "API_VERSION 'V1'"
+    SHOW_DATA_SOURCE_QUERY = "SHOW DATA SOURCES LIKE 'test_restart_*'"
+
+    # Create a data source and verify that the object is created successfully.
+    self.execute_query_expect_success(self.client, DROP_DATA_SOURCE_QUERY)
+    self.execute_query_expect_success(self.client, CREATE_DATA_SOURCE_QUERY)
+    result = self.execute_query(SHOW_DATA_SOURCE_QUERY)
+    assert result.success, str(result)
+    assert "PersistentJdbcDataSource" in result.get_data()
+
+    # Restart Catalog server.
+    self.cluster.catalogd.restart()
+    wait_time_s = build_flavor_timeout(90, slow_build_timeout=180)
+    self.cluster.statestored.service.wait_for_metric_value('statestore.live-backends',
+        expected_value=4, timeout=wait_time_s)
+
+    # Verify that the data source object is still available after restarting Catalog
+    # server.
+    result = self.execute_query(SHOW_DATA_SOURCE_QUERY)
+    assert result.success, str(result)
+    assert "PersistentJdbcDataSource" in result.get_data()
+    # Remove the data source
+    self.execute_query_expect_success(self.client, DROP_DATA_SOURCE_QUERY)
+    result = self.execute_query(SHOW_DATA_SOURCE_QUERY)
+    assert result.success, str(result)
+    assert "PersistentJdbcDataSource" not in result.get_data()
+
+  @SkipIfApacheHive.data_connector_not_supported
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+    statestored_args="--use_subscriber_id_as_catalogd_priority=true "
+                     "--statestore_heartbeat_frequency_ms=1000",
+    start_args="--enable_catalogd_ha")
+  def test_catalogd_ha_failover(self):
+    """Test for a cluster started with catalogd HA enabled. Verifies that the
+    data source object survives a catalogd failover."""
+    DROP_DATA_SOURCE_QUERY = "DROP DATA SOURCE IF EXISTS test_failover_persistent"
+    CREATE_DATA_SOURCE_QUERY = "CREATE DATA SOURCE test_failover_persistent " \
+        "LOCATION '/test-warehouse/data-sources/jdbc-data-source.jar' " \
+        "CLASS 'org.apache.impala.extdatasource.jdbc.FailoverInSyncJdbcDataSource' " \
+        "API_VERSION 'V1'"
+    SHOW_DATA_SOURCE_QUERY = "SHOW DATA SOURCES LIKE 'test_failover_*'"
+    # Verify two catalogd instances are created with one as active.
+    catalogds = self.cluster.catalogds()
+    assert(len(catalogds) == 2)
+    catalogd_service_1 = catalogds[0].service
+    catalogd_service_2 = catalogds[1].service
+    assert(catalogd_service_1.get_metric_value("catalog-server.active-status"))
+    assert(not catalogd_service_2.get_metric_value("catalog-server.active-status"))
+
+    # Create a data source and verify that the object is created successfully.
+    self.execute_query_expect_success(self.client, DROP_DATA_SOURCE_QUERY)
+    self.execute_query_expect_success(self.client, CREATE_DATA_SOURCE_QUERY)
+    result = self.execute_query(SHOW_DATA_SOURCE_QUERY)
+    assert result.success, str(result)
+    assert "FailoverInSyncJdbcDataSource" in result.get_data()
+
+    # Kill active catalogd
+    catalogds[0].kill()
+    # Wait long enough for the statestore to detect the failure of the active
+    # catalogd and assign the active role to the standby catalogd.
+    catalogd_service_2.wait_for_metric_value(
+        "catalog-server.active-status", expected_value=True, timeout=30)
+    assert(catalogd_service_2.get_metric_value("catalog-server.active-status"))
+
+    # Verify that the data source object is available in the new active catalogd.
+    result = self.execute_query(SHOW_DATA_SOURCE_QUERY)
+    assert result.success, str(result)
+    assert "FailoverInSyncJdbcDataSource" in result.get_data()
+    # Remove the data source
+    self.execute_query_expect_success(self.client, DROP_DATA_SOURCE_QUERY)
+    result = self.execute_query(SHOW_DATA_SOURCE_QUERY)
+    assert result.success, str(result)
+    assert "FailoverInSyncJdbcDataSource" not in result.get_data()
+
 
 class TestMySqlExtJdbcTables(CustomClusterTestSuite):
   """Impala query tests for external jdbc tables on MySQL server."""