Posted to commits@impala.apache.org by wz...@apache.org on 2024/01/03 07:21:39 UTC

(impala) branch master updated (4d9393643 -> a2c2f118d)

This is an automated email from the ASF dual-hosted git repository.

wzhou pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


    from 4d9393643 IMPALA-12658: UPDATE Iceberg table FROM view throws IllegalStateException
     new 9a132bc43 IMPALA-12380: Securing dbcp.password for JDBC external data source
     new a2c2f118d IMPALA-12375: Make DataSource Object persistent

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 be/src/catalog/catalog-server.cc                   |   6 +
 be/src/catalog/catalog.cc                          |   5 +
 be/src/catalog/catalog.h                           |   6 +
 .../org/apache/impala/compat/MetastoreShim.java    |  20 ++++
 .../org/apache/impala/compat/MetastoreShim.java    |  89 +++++++++++++++
 .../impala/analysis/CreateTableDataSrcStmt.java    |   4 +-
 .../java/org/apache/impala/catalog/Catalog.java    |   3 +-
 .../impala/catalog/CatalogServiceCatalog.java      |  41 ++++++-
 .../org/apache/impala/catalog/DataSourceTable.java |   8 +-
 .../apache/impala/service/CatalogOpExecutor.java   |  83 +++++++++++++-
 .../java/org/apache/impala/service/JniCatalog.java |   7 ++
 .../extdatasource/jdbc/conf/JdbcStorageConfig.java |   7 +-
 .../jdbc/conf/JdbcStorageConfigManager.java        |  57 ++++++++-
 .../jdbc/dao/GenericJdbcDatabaseAccessor.java      |   6 +
 testdata/bin/copy-ext-data-sources.sh              |  17 +++
 .../QueryTest/jdbc-data-source-with-keystore.test  | 127 +++++++++++++++++++++
 tests/common/impala_test_suite.py                  |   4 +-
 tests/common/skip.py                               |   3 +
 tests/custom_cluster/test_ext_data_sources.py      |  85 ++++++++++++++
 tests/query_test/test_ext_data_sources.py          |   5 +
 tests/util/filesystem_utils.py                     |  11 ++
 21 files changed, 572 insertions(+), 22 deletions(-)
 create mode 100644 testdata/workloads/functional-query/queries/QueryTest/jdbc-data-source-with-keystore.test


(impala) 02/02: IMPALA-12375: Make DataSource Object persistent

Posted by wz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

wzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit a2c2f118d2de1c08d5c7ba7f98f8e3490ea90d65
Author: wzhou-code <wz...@cloudera.com>
AuthorDate: Sat Dec 9 20:50:35 2023 -0800

    IMPALA-12375: Make DataSource Object persistent
    
    DataSource objects are kept in an in-memory cache in the Catalog server
    and are not persisted to HMS, so they are lost when the Catalog server
    is restarted and users have to recreate them before creating new
    external DataSource tables.
    This patch makes DataSource objects persistent by saving them as
    DataConnector objects with type "impalaDataSource" in HMS. Since HMS
    events for DataConnector are not handled, the Catalog server has to
    refresh its DataSource objects when the catalogd becomes active.
    Note that this feature is not supported for Apache Hive 3.1 and older
    versions.
    
    Testing:
     - Added two end-to-end tests covering restart of the Catalog server
       and catalogd HA failover.
       These two tests are skipped when USE_APACHE_HIVE is set to true
       and the Apache Hive version is 3.x or older.
     - Passed all-build-options-ub2004.
     - Passed core test.
    
    Change-Id: I500a99142bb62ce873e693d573064ad4ffa153ab
    Reviewed-on: http://gerrit.cloudera.org:8080/20768
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Wenzhe Zhou <wz...@cloudera.com>
---
 be/src/catalog/catalog-server.cc                   |  6 ++
 be/src/catalog/catalog.cc                          |  5 ++
 be/src/catalog/catalog.h                           |  6 ++
 .../org/apache/impala/compat/MetastoreShim.java    | 20 +++++
 .../org/apache/impala/compat/MetastoreShim.java    | 89 ++++++++++++++++++++++
 .../impala/analysis/CreateTableDataSrcStmt.java    |  4 +-
 .../java/org/apache/impala/catalog/Catalog.java    |  3 +-
 .../impala/catalog/CatalogServiceCatalog.java      | 41 ++++++++--
 .../org/apache/impala/catalog/DataSourceTable.java |  8 +-
 .../apache/impala/service/CatalogOpExecutor.java   | 83 +++++++++++++++++++-
 .../java/org/apache/impala/service/JniCatalog.java |  7 ++
 tests/common/skip.py                               |  3 +
 tests/custom_cluster/test_ext_data_sources.py      | 85 +++++++++++++++++++++
 13 files changed, 343 insertions(+), 17 deletions(-)
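
For reference, here is a minimal sketch (not part of this patch) of how the
persisted objects can be read back through the same HMS DataConnector API the
new MetastoreShim code uses. It assumes an already-connected IMetaStoreClient;
the class and method names are illustrative only:

    import java.util.List;
    import org.apache.hadoop.hive.metastore.IMetaStoreClient;
    import org.apache.hadoop.hive.metastore.api.DataConnector;
    import org.apache.thrift.TException;

    public class ListImpalaDataSources {
      // DataSource objects are persisted as DataConnectors of this type.
      private static final String IMPALA_TYPE = "impalaDataSource";

      // Prints every DataConnector that backs an Impala DataSource object.
      public static void printImpalaDataSources(IMetaStoreClient client)
          throws TException {
        List<String> names = client.getAllDataConnectorNames();
        for (String name : names) {
          DataConnector connector = client.getDataConnector(name);
          if (connector != null
              && IMPALA_TYPE.equalsIgnoreCase(connector.getType())) {
            System.out.println(name + " -> " + connector.getUrl() + " "
                + connector.getParameters());
          }
        }
      }
    }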

diff --git a/be/src/catalog/catalog-server.cc b/be/src/catalog/catalog-server.cc
index 6544ba6b2..43f77e970 100644
--- a/be/src/catalog/catalog-server.cc
+++ b/be/src/catalog/catalog-server.cc
@@ -592,6 +592,12 @@ void CatalogServer::UpdateActiveCatalogd(bool is_registration_reply,
         if (!status.ok()) {
           LOG(ERROR) << "Failed to reset metadata triggered by catalogd failover.";
         }
+      } else {
+        // Refresh DataSource objects when the catalogd becomes active.
+        Status status = catalog_->RefreshDataSources();
+        if (!status.ok()) {
+          LOG(ERROR) << "Failed to refresh data sources triggered by catalogd failover.";
+        }
       }
       // Signal the catalog update gathering thread to start.
       topic_updates_ready_ = false;
diff --git a/be/src/catalog/catalog.cc b/be/src/catalog/catalog.cc
index 3efc4b464..698091787 100644
--- a/be/src/catalog/catalog.cc
+++ b/be/src/catalog/catalog.cc
@@ -70,6 +70,7 @@ Catalog::Catalog() {
     {"getPartitionStats", "([B)[B", &get_partition_stats_id_},
     {"updateTableUsage", "([B)V", &update_table_usage_id_},
     {"regenerateServiceId", "()V", &regenerate_service_id_},
+    {"refreshDataSources", "()V", &refresh_data_sources_},
   };
 
   JNIEnv* jni_env = JniUtil::GetJNIEnv();
@@ -215,3 +216,7 @@ void Catalog::RegenerateServiceId() {
   jni_env->CallVoidMethod(catalog_, regenerate_service_id_);
   ABORT_IF_EXC(jni_env);
 }
+
+Status Catalog::RefreshDataSources() {
+  return JniUtil::CallJniMethod(catalog_, refresh_data_sources_);
+}
diff --git a/be/src/catalog/catalog.h b/be/src/catalog/catalog.h
index 242383e1c..c94f09a6b 100644
--- a/be/src/catalog/catalog.h
+++ b/be/src/catalog/catalog.h
@@ -141,6 +141,11 @@ class Catalog {
   /// The function should be called when the CatalogD becomes active.
   void RegenerateServiceId();
 
+  /// Refresh the data sources from metadata store.
+  /// Returns OK if the refreshing was successful, otherwise a Status object with
+  /// information on the error will be returned.
+  Status RefreshDataSources();
+
  private:
   jobject catalog_;  // instance of org.apache.impala.service.JniCatalog
   jmethodID update_metastore_id_;  // JniCatalog.updateMetaastore()
@@ -164,6 +169,7 @@ class Catalog {
   jmethodID catalog_ctor_;
   jmethodID update_table_usage_id_;
   jmethodID regenerate_service_id_; // JniCatalog.regenerateServiceId()
+  jmethodID refresh_data_sources_; // JniCatalog.refreshDataSources()
 };
 
 }
diff --git a/fe/src/compat-apache-hive-3/java/org/apache/impala/compat/MetastoreShim.java b/fe/src/compat-apache-hive-3/java/org/apache/impala/compat/MetastoreShim.java
index cd6a97a81..038f6eb1e 100644
--- a/fe/src/compat-apache-hive-3/java/org/apache/impala/compat/MetastoreShim.java
+++ b/fe/src/compat-apache-hive-3/java/org/apache/impala/compat/MetastoreShim.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
 import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
 import org.apache.hadoop.hive.metastore.api.Database;
@@ -65,6 +66,7 @@ import org.apache.hadoop.hive.metastore.txn.TxnUtils;
 import org.apache.hadoop.hive.metastore.utils.FileUtils;
 import org.apache.impala.catalog.CatalogException;
 import org.apache.impala.catalog.CatalogServiceCatalog;
+import org.apache.impala.catalog.DataSource;
 import org.apache.impala.catalog.Db;
 import org.apache.impala.catalog.HdfsPartition;
 import org.apache.impala.catalog.HdfsTable;
@@ -946,4 +948,22 @@ public class MetastoreShim extends Hive3MetastoreShimBase {
     //  materialized view is outdated for rewriting, so set the flag to "Unknown"
     formatOutput("Outdated for Rewriting:", "Unknown", tableInfo);
   }
+
+  public static void createDataSource(IMetaStoreClient client, DataSource dataSource)
+      throws InvalidObjectException, AlreadyExistsException, MetaException, TException {
+    // noop for unsupported operation IMetaStoreClient.createDataConnector().
+  }
+
+  public static void dropDataSource(IMetaStoreClient client, String name,
+      boolean ifExists) throws NoSuchObjectException, InvalidOperationException,
+          MetaException, TException {
+    // noop for unsupported operation IMetaStoreClient.dropDataConnector().
+  }
+
+  public static Map<String, DataSource> loadAllDataSources(IMetaStoreClient client)
+      throws MetaException, TException {
+    // Unsupported operation IMetaStoreClient.getAllDataConnectorNames() and
+    // IMetaStoreClient.getDataConnector().
+    return null;
+  }
 }
diff --git a/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java b/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
index 504e13a9c..d3f33e8cb 100644
--- a/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
+++ b/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
@@ -27,6 +27,7 @@ import static org.apache.impala.util.HiveMetadataFormatUtils.formatOutput;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Stopwatch;
+import com.google.common.base.Strings;
 import com.google.common.collect.Iterables;
 
 import java.net.InetAddress;
@@ -46,9 +47,11 @@ import java.util.stream.Collectors;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
 import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
 import org.apache.hadoop.hive.metastore.api.CompactionInfoStruct;
+import org.apache.hadoop.hive.metastore.api.DataConnector;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.FireEventRequest;
 import org.apache.hadoop.hive.metastore.api.FireEventRequestData;
@@ -90,6 +93,7 @@ import org.apache.impala.analysis.TableName;
 import org.apache.impala.catalog.CatalogException;
 import org.apache.impala.catalog.CatalogServiceCatalog;
 import org.apache.impala.catalog.CompactionInfoLoader;
+import org.apache.impala.catalog.DataSource;
 import org.apache.impala.catalog.DatabaseNotFoundException;
 import org.apache.impala.catalog.Db;
 import org.apache.impala.catalog.HdfsPartition;
@@ -131,6 +135,13 @@ import org.slf4j.LoggerFactory;
 public class MetastoreShim extends Hive3MetastoreShimBase {
   private static final Logger LOG = LoggerFactory.getLogger(MetastoreShim.class);
 
+  // An Impala DataSource object is saved in HMS as a DataConnector with type
+  // 'impalaDataSource'.
+  private static final String HMS_DATA_CONNECTOR_TYPE = "impalaDataSource";
+  private static final String HMS_DATA_CONNECTOR_DESC = "Impala DataSource Object";
+  private static final String HMS_DATA_CONNECTOR_PARAM_KEY_CLASS_NAME = "className";
+  private static final String HMS_DATA_CONNECTOR_PARAM_KEY_API_VERSION = "apiVersion";
+
   /**
    * Wrapper around IMetaStoreClient.alter_table with validWriteIds as a param.
    */
@@ -962,4 +973,82 @@ public class MetastoreShim extends Hive3MetastoreShimBase {
       tableInfo.append(metaDataTable.renderTable(isOutputPadded));
     }
   }
+
+  /**
+   * Wrapper around IMetaStoreClient.createDataConnector().
+   */
+  public static void createDataSource(IMetaStoreClient client, DataSource dataSource)
+      throws InvalidObjectException, AlreadyExistsException, MetaException, TException {
+    DataConnector connector = MetastoreShim.dataSourceToDataConnector(dataSource);
+    Preconditions.checkNotNull(connector);
+    client.createDataConnector(connector);
+  }
+
+  /**
+   * Wrapper around IMetaStoreClient.dropDataConnector().
+   */
+  public static void dropDataSource(IMetaStoreClient client, String name,
+      boolean ifExists) throws NoSuchObjectException, InvalidOperationException,
+          MetaException, TException {
+    // Set 'checkReferences' to false, i.e. drop the DataSource object without checking
+    // its references, since the properties of the DataSource object are copied into the
+    // table properties of the referencing DataSource tables.
+    client.dropDataConnector(
+        name, /* ifNotExists */ !ifExists, /* checkReferences */ false);
+  }
+
+  /**
+   * Wrapper around IMetaStoreClient.getAllDataConnectorNames() and
+   * IMetaStoreClient.getDataConnector().
+   */
+  public static Map<String, DataSource> loadAllDataSources(IMetaStoreClient client)
+      throws MetaException, TException {
+    Map<String, DataSource> newDataSrcs = new HashMap<String, DataSource>();
+    // Load DataSource objects from HMS DataConnector objects.
+    List<String> allConnectorNames = client.getAllDataConnectorNames();
+    for (String connectorName: allConnectorNames) {
+      DataConnector connector = client.getDataConnector(connectorName);
+      if (connector != null) {
+        DataSource dataSrc = MetastoreShim.dataConnectorToDataSource(connector);
+        if (dataSrc != null) newDataSrcs.put(connectorName, dataSrc);
+      }
+    }
+    return newDataSrcs;
+  }
+
+  /**
+   * Convert DataSource object to DataConnector object.
+   */
+  private static DataConnector dataSourceToDataConnector(DataSource dataSource) {
+    DataConnector connector = new DataConnector(
+        dataSource.getName(), HMS_DATA_CONNECTOR_TYPE, dataSource.getLocation());
+    connector.setDescription(HMS_DATA_CONNECTOR_DESC);
+    connector.putToParameters(
+        HMS_DATA_CONNECTOR_PARAM_KEY_CLASS_NAME, dataSource.getClassName());
+    connector.putToParameters(
+        HMS_DATA_CONNECTOR_PARAM_KEY_API_VERSION, dataSource.getApiVersion());
+    return connector;
+  }
+
+  /**
+   * Convert DataConnector object to DataSource object.
+   */
+  private static DataSource dataConnectorToDataSource(DataConnector connector) {
+    if (!connector.isSetName() || !connector.isSetType() || !connector.isSetUrl()
+        || !connector.isSetDescription() || connector.getParametersSize() == 0
+        || !connector.getType().equalsIgnoreCase(HMS_DATA_CONNECTOR_TYPE)) {
+      return null;
+    }
+    String name = connector.getName();
+    String location = connector.getUrl();
+    String className =
+        connector.getParameters().get(HMS_DATA_CONNECTOR_PARAM_KEY_CLASS_NAME);
+    String apiVersion =
+        connector.getParameters().get(HMS_DATA_CONNECTOR_PARAM_KEY_API_VERSION);
+    if (!Strings.isNullOrEmpty(name) && !Strings.isNullOrEmpty(location) &&
+        !Strings.isNullOrEmpty(className) && !Strings.isNullOrEmpty(apiVersion)) {
+      return new DataSource(name, location, className, apiVersion);
+    }
+    return null;
+  }
 }
diff --git a/fe/src/main/java/org/apache/impala/analysis/CreateTableDataSrcStmt.java b/fe/src/main/java/org/apache/impala/analysis/CreateTableDataSrcStmt.java
index 00fd4b69f..13eda5a7b 100644
--- a/fe/src/main/java/org/apache/impala/analysis/CreateTableDataSrcStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/CreateTableDataSrcStmt.java
@@ -63,8 +63,8 @@ public class CreateTableDataSrcStmt extends CreateTableStmt {
       }
     }
     // Add table properties from the DataSource catalog object now that we have access
-    // to the catalog. These are stored in the table metadata because DataSource catalog
-    // objects are not currently persisted.
+    // to the catalog. These are stored in the table metadata so that the table can
+    // be scanned without the DataSource catalog object.
     String location = dataSource.getLocation();
     getTblProperties().put(TBL_PROP_LOCATION, location);
     getTblProperties().put(TBL_PROP_CLASS, dataSource.getClassName());
diff --git a/fe/src/main/java/org/apache/impala/catalog/Catalog.java b/fe/src/main/java/org/apache/impala/catalog/Catalog.java
index be480ec97..a0b22593a 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Catalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Catalog.java
@@ -271,8 +271,7 @@ public abstract class Catalog implements AutoCloseable {
   }
 
   /**
-   * Adds a data source to the in-memory map of data sources. It is not
-   * persisted to the metastore.
+   * Adds a data source to the in-memory map of data sources.
    * @return true if this item was added or false if the existing value was preserved.
    */
   public boolean addDataSource(DataSource dataSource) {
diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index 59c24dda5..6e6fae5ae 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -1897,6 +1897,38 @@ public class CatalogServiceCatalog extends Catalog {
     }
   }
 
+  /**
+   * Loads DataSource objects into the catalog and assigns new versions to all the
+   * loaded DataSource objects.
+   */
+  public void refreshDataSources() throws TException {
+    Map<String, DataSource> newDataSrcs = null;
+    try (MetaStoreClient msClient = getMetaStoreClient()) {
+      // Load all DataSource objects from HMS DataConnector objects.
+      newDataSrcs = MetastoreShim.loadAllDataSources(msClient.getHiveClient());
+      if (newDataSrcs == null) return;
+    }
+    Set<String> oldDataSrcNames = dataSources_.keySet();
+    Set<String> newDataSrcNames = newDataSrcs.keySet();
+    oldDataSrcNames.removeAll(newDataSrcNames);
+    int removedDataSrcNum = 0;
+    for (String dataSrcName: oldDataSrcNames) {
+      // Add removed DataSource objects to deleteLog_.
+      DataSource dataSrc = dataSources_.remove(dataSrcName);
+      if (dataSrc != null) {
+        dataSrc.setCatalogVersion(incrementAndGetCatalogVersion());
+        deleteLog_.addRemovedObject(dataSrc.toTCatalogObject());
+        removedDataSrcNum++;
+      }
+    }
+    for (DataSource dataSrc: newDataSrcs.values()) {
+      dataSrc.setCatalogVersion(incrementAndGetCatalogVersion());
+      dataSources_.add(dataSrc);
+    }
+    LOG.info("Finished refreshing DataSources. Added {}. Removed {}.",
+        newDataSrcs.size(), removedDataSrcNum);
+  }
+
   /**
    * Load the list of TableMeta from Hive. If pull_table_types_and_comments=true, the list
    * will contain the table types and comments. Otherwise, we just fetch the table names
@@ -2078,13 +2110,12 @@ public class CatalogServiceCatalog extends Catalog {
     // reset operation itself and to unblock impalads by making the catalog version >
     // INITIAL_CATALOG_VERSION. See Frontend.waitForCatalog()
     ++catalogVersion_;
-    // Assign new versions to all the loaded data sources.
-    for (DataSource dataSource: getDataSources()) {
-      dataSource.setCatalogVersion(incrementAndGetCatalogVersion());
-    }
 
-    // Update db and table metadata
+    // Update data source, db and table metadata
     try {
+      // Refresh DataSource objects from HMS and assign new versions.
+      refreshDataSources();
+
       // Not all Java UDFs are persisted to the metastore. The ones which aren't
       // should be restored once the catalog has been invalidated.
       Map<String, Db> oldDbCache = dbCache_.get();
diff --git a/fe/src/main/java/org/apache/impala/catalog/DataSourceTable.java b/fe/src/main/java/org/apache/impala/catalog/DataSourceTable.java
index 3087a7d30..ecc6a36e7 100644
--- a/fe/src/main/java/org/apache/impala/catalog/DataSourceTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/DataSourceTable.java
@@ -40,11 +40,9 @@ import com.google.common.base.Preconditions;
 
 /**
  * All data source properties are stored as table properties (persisted in the
- * metastore) because the DataSource catalog object is not persisted so the
- * DataSource catalog object will not exist if the catalog server is restarted,
- * but the table does not need the DataSource catalog object in order to scan
- * the table. Tables that contain the TBL_PROP_DATA_SRC_NAME table parameter are
- * assumed to be backed by an external data source.
+ * metastore) so that the table does not need the DataSource catalog object in
+ * order to scan the table. Tables that contain the TBL_PROP_DATA_SRC_NAME table
+ * parameter are assumed to be backed by an external data source.
  */
 public class DataSourceTable extends Table implements FeDataSourceTable {
   private final static Logger LOG = LoggerFactory.getLogger(DataSourceTable.class);
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 75fac245a..cd4086584 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -24,6 +24,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Stopwatch;
+import com.google.common.base.Strings;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -2461,9 +2462,17 @@ public class CatalogOpExecutor {
 
   private void createDataSource(TCreateDataSourceParams params, TDdlExecResponse resp)
       throws ImpalaException {
-    if (LOG.isTraceEnabled()) { LOG.trace("Adding DATA SOURCE: " + params.toString()); }
+    Preconditions.checkNotNull(params);
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Adding DATA SOURCE: " + params.toString());
+    }
     DataSource dataSource = DataSource.fromThrift(params.getData_source());
-    DataSource existingDataSource = catalog_.getDataSource(dataSource.getName());
+    Preconditions.checkNotNull(dataSource);
+    String dataSrcName = dataSource.getName();
+    Preconditions.checkState(!Strings.isNullOrEmpty(dataSrcName),
+        "Null or empty DataSource name passed as argument to " +
+        "CatalogOpExecutor.createDataSource");
+    DataSource existingDataSource = catalog_.getDataSource(dataSrcName);
     if (existingDataSource != null) {
       if (!params.if_not_exists) {
         throw new ImpalaRuntimeException("Data source " + dataSource.getName() +
@@ -2474,6 +2483,9 @@ public class CatalogOpExecutor {
       resp.result.setVersion(existingDataSource.getCatalogVersion());
       return;
     }
+    // Create DataSource object in HMS.
+    addDataSourceToHms(dataSource, params.if_not_exists);
+    // Add DataSource object to memory cache.
     catalog_.addDataSource(dataSource);
     resp.result.addToUpdated_catalog_objects(dataSource.toTCatalogObject());
     resp.result.setVersion(dataSource.getCatalogVersion());
@@ -2483,7 +2495,11 @@ public class CatalogOpExecutor {
   private void dropDataSource(TDropDataSourceParams params, TDdlExecResponse resp)
       throws ImpalaException {
     if (LOG.isTraceEnabled()) LOG.trace("Drop DATA SOURCE: " + params.toString());
-    DataSource dataSource = catalog_.removeDataSource(params.getData_source());
+    String dataSrcName = params.getData_source();
+    Preconditions.checkState(!Strings.isNullOrEmpty(dataSrcName),
+        "Null or empty DataSource name passed as argument to " +
+        "CatalogOpExecutor.dropDataSource");
+    DataSource dataSource = catalog_.removeDataSource(dataSrcName);
     if (dataSource == null) {
       if (!params.if_exists) {
         throw new ImpalaRuntimeException("Data source " + params.getData_source() +
@@ -2494,6 +2510,8 @@ public class CatalogOpExecutor {
       resp.result.setVersion(catalog_.getCatalogVersion());
       return;
     }
+    // Drop DataSource object from HMS.
+    dropDataSourceFromHms(dataSrcName, /* ifExists */ false);
     resp.result.addToRemoved_catalog_objects(dataSource.toTCatalogObject());
     resp.result.setVersion(dataSource.getCatalogVersion());
     addSummary(resp, "Data source has been dropped.");
@@ -6196,6 +6214,65 @@ public class CatalogOpExecutor {
     return true;
   }
 
+  /**
+   * Creates a new DataSource in the Hive metastore. Returns true if successful,
+   * and false if the object already exists and ifNotExists is true.
+   * Note that the DataSource object is saved as a DataConnector object with type
+   * "impalaDataSource" in HMS.
+   */
+  private boolean addDataSourceToHms(DataSource dataSource, boolean ifNotExists)
+      throws ImpalaRuntimeException {
+    getMetastoreDdlLock().lock();
+    try {
+      try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+        MetastoreShim.createDataSource(msClient.getHiveClient(), dataSource);
+      } catch(AlreadyExistsException e) {
+        if (!ifNotExists) {
+          throw new ImpalaRuntimeException(
+              String.format(HMS_RPC_ERROR_FORMAT_STR, "createDataConnector"), e);
+        }
+        return false;
+      } catch (TException e) {
+        LOG.error("Error executing createDataConnector() metastore call: " +
+            dataSource.getName(), e);
+        throw new ImpalaRuntimeException(
+            String.format(HMS_RPC_ERROR_FORMAT_STR, "createDataConnector"), e);
+      }
+    } finally {
+      getMetastoreDdlLock().unlock();
+    }
+    return true;
+  }
+
+  /**
+   * Drops the DataSource with the given name from the Hive metastore. Returns true
+   * if successful, and false if the DataSource does not exist and ifExists is true.
+   */
+  private boolean dropDataSourceFromHms(String name, boolean ifExists)
+      throws ImpalaRuntimeException {
+    getMetastoreDdlLock().lock();
+    try {
+      try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+        try {
+          MetastoreShim.dropDataSource(msClient.getHiveClient(), name, ifExists);
+        } catch (NoSuchObjectException e) {
+          if (!ifExists) {
+            throw new ImpalaRuntimeException(
+                String.format(HMS_RPC_ERROR_FORMAT_STR, "dropDataConnector"), e);
+          }
+          return false;
+        } catch (TException e) {
+          LOG.error("Error executing dropDataConnector HMS call: " + name, e);
+          throw new ImpalaRuntimeException(
+              String.format(HMS_RPC_ERROR_FORMAT_STR, "dropDataConnector"), e);
+        }
+      }
+    } finally {
+      getMetastoreDdlLock().unlock();
+    }
+    return true;
+  }
+
   /**
    * Updates the database object in the metastore.
    */
diff --git a/fe/src/main/java/org/apache/impala/service/JniCatalog.java b/fe/src/main/java/org/apache/impala/service/JniCatalog.java
index fe300fa48..1adfd7efa 100644
--- a/fe/src/main/java/org/apache/impala/service/JniCatalog.java
+++ b/fe/src/main/java/org/apache/impala/service/JniCatalog.java
@@ -555,4 +555,11 @@ public class JniCatalog {
           return response;
         });
   }
+
+  /**
+   * Refresh data sources from metadata store.
+   */
+  public void refreshDataSources() throws TException {
+    catalog_.refreshDataSources();
+  }
 }
diff --git a/tests/common/skip.py b/tests/common/skip.py
index 51dcdcf6e..31aa4a272 100644
--- a/tests/common/skip.py
+++ b/tests/common/skip.py
@@ -257,3 +257,6 @@ class SkipIfOS:
 class SkipIfApacheHive():
   feature_not_supported = pytest.mark.skipif(IS_APACHE_HIVE,
       reason="Apache Hive does not support this feature")
+  data_connector_not_supported = pytest.mark.skipif(
+      IS_APACHE_HIVE and HIVE_MAJOR_VERSION <= 3,
+      reason="Apache Hive 3.1 or older version do not support DataConnector")
diff --git a/tests/custom_cluster/test_ext_data_sources.py b/tests/custom_cluster/test_ext_data_sources.py
index f12fd1b56..ffa907d06 100644
--- a/tests/custom_cluster/test_ext_data_sources.py
+++ b/tests/custom_cluster/test_ext_data_sources.py
@@ -21,6 +21,8 @@ import os
 import subprocess
 
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+from tests.common.environ import build_flavor_timeout
+from tests.common.skip import SkipIfApacheHive
 
 
 class TestExtDataSources(CustomClusterTestSuite):
@@ -60,6 +62,89 @@ class TestExtDataSources(CustomClusterTestSuite):
     """Run test with batch size less than default size 1024"""
     self.run_test_case('QueryTest/data-source-tables', vector, use_db=unique_database)
 
+  @SkipIfApacheHive.data_connector_not_supported
+  @pytest.mark.execute_serially
+  def test_restart_catalogd(self, vector, unique_database):
+    """Restart Catalog server after creating a data source. Verify that the data source
+    object is persistent across restarting of Catalog server."""
+    DROP_DATA_SOURCE_QUERY = "DROP DATA SOURCE IF EXISTS test_restart_persistent"
+    CREATE_DATA_SOURCE_QUERY = "CREATE DATA SOURCE test_restart_persistent " \
+        "LOCATION '/test-warehouse/data-sources/jdbc-data-source.jar' " \
+        "CLASS 'org.apache.impala.extdatasource.jdbc.PersistentJdbcDataSource' " \
+        "API_VERSION 'V1'"
+    SHOW_DATA_SOURCE_QUERY = "SHOW DATA SOURCES LIKE 'test_restart_*'"
+
+    # Create a data source and verify that the object is created successfully.
+    self.execute_query_expect_success(self.client, DROP_DATA_SOURCE_QUERY)
+    self.execute_query_expect_success(self.client, CREATE_DATA_SOURCE_QUERY)
+    result = self.execute_query(SHOW_DATA_SOURCE_QUERY)
+    assert result.success, str(result)
+    assert "PersistentJdbcDataSource" in result.get_data()
+
+    # Restart Catalog server.
+    self.cluster.catalogd.restart()
+    wait_time_s = build_flavor_timeout(90, slow_build_timeout=180)
+    self.cluster.statestored.service.wait_for_metric_value('statestore.live-backends',
+        expected_value=4, timeout=wait_time_s)
+
+    # Verify that the data source object is still available after restarting Catalog
+    # server.
+    result = self.execute_query(SHOW_DATA_SOURCE_QUERY)
+    assert result.success, str(result)
+    assert "PersistentJdbcDataSource" in result.get_data()
+    # Remove the data source
+    self.execute_query_expect_success(self.client, DROP_DATA_SOURCE_QUERY)
+    result = self.execute_query(SHOW_DATA_SOURCE_QUERY)
+    assert result.success, str(result)
+    assert "PersistentJdbcDataSource" not in result.get_data()
+
+  @SkipIfApacheHive.data_connector_not_supported
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+    statestored_args="--use_subscriber_id_as_catalogd_priority=true "
+                     "--statestore_heartbeat_frequency_ms=1000",
+    start_args="--enable_catalogd_ha")
+  def test_catalogd_ha_failover(self):
+    """The test case for cluster started with catalogd HA enabled."""
+    DROP_DATA_SOURCE_QUERY = "DROP DATA SOURCE IF EXISTS test_failover_persistent"
+    CREATE_DATA_SOURCE_QUERY = "CREATE DATA SOURCE test_failover_persistent " \
+        "LOCATION '/test-warehouse/data-sources/jdbc-data-source.jar' " \
+        "CLASS 'org.apache.impala.extdatasource.jdbc.FailoverInSyncJdbcDataSource' " \
+        "API_VERSION 'V1'"
+    SHOW_DATA_SOURCE_QUERY = "SHOW DATA SOURCES LIKE 'test_failover_*'"
+    # Verify two catalogd instances are created with one as active.
+    catalogds = self.cluster.catalogds()
+    assert(len(catalogds) == 2)
+    catalogd_service_1 = catalogds[0].service
+    catalogd_service_2 = catalogds[1].service
+    assert(catalogd_service_1.get_metric_value("catalog-server.active-status"))
+    assert(not catalogd_service_2.get_metric_value("catalog-server.active-status"))
+
+    # Create a data source and verify that the object is created successfully.
+    self.execute_query_expect_success(self.client, DROP_DATA_SOURCE_QUERY)
+    self.execute_query_expect_success(self.client, CREATE_DATA_SOURCE_QUERY)
+    result = self.execute_query(SHOW_DATA_SOURCE_QUERY)
+    assert result.success, str(result)
+    assert "FailoverInSyncJdbcDataSource" in result.get_data()
+
+    # Kill active catalogd
+    catalogds[0].kill()
+    # Wait for long enough for the statestore to detect the failure of active catalogd
+    # and assign active role to standby catalogd.
+    catalogd_service_2.wait_for_metric_value(
+        "catalog-server.active-status", expected_value=True, timeout=30)
+    assert(catalogd_service_2.get_metric_value("catalog-server.active-status"))
+
+    # Verify that the data source object is available in the catalogd of HA pair.
+    result = self.execute_query(SHOW_DATA_SOURCE_QUERY)
+    assert result.success, str(result)
+    assert "FailoverInSyncJdbcDataSource" in result.get_data()
+    # Remove the data source
+    self.execute_query_expect_success(self.client, DROP_DATA_SOURCE_QUERY)
+    result = self.execute_query(SHOW_DATA_SOURCE_QUERY)
+    assert result.success, str(result)
+    assert "FailoverInSyncJdbcDataSource" not in result.get_data()
+
 
 class TestMySqlExtJdbcTables(CustomClusterTestSuite):
   """Impala query tests for external jdbc tables on MySQL server."""


(impala) 01/02: IMPALA-12380: Securing dbcp.password for JDBC external data source

Posted by wz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

wzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 9a132bc436e956641e1eb58073b446f71369fb3c
Author: Gaurav Singh <ga...@gmail>
AuthorDate: Tue Dec 5 13:42:27 2023 -0800

    IMPALA-12380: Securing dbcp.password for JDBC
    external data source
    
    In the current implementation of the external JDBC data source, the
    user has to provide both the username and the password in plain text,
    which is not a good practice.
    
    This patch extends the existing implementation so that the user can
    provide either:
    a) a username and password, or
    b) a username (or key) and a keystore.
    
    If the user provides a password, that password is used. However, if
    no password is provided and the user provides only a key/keystore,
    the password is fetched from the secure jceks keystore.
    
    Testing:
    - Added unit test TestExtDataSourcesWithKeyStore
    
    Change-Id: Iec83a9b6e00456f0a1bbee747bd752b2cf9bf238
    Reviewed-on: http://gerrit.cloudera.org:8080/20809
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 .../extdatasource/jdbc/conf/JdbcStorageConfig.java |   7 +-
 .../jdbc/conf/JdbcStorageConfigManager.java        |  57 ++++++++-
 .../jdbc/dao/GenericJdbcDatabaseAccessor.java      |   6 +
 testdata/bin/copy-ext-data-sources.sh              |  17 +++
 .../QueryTest/jdbc-data-source-with-keystore.test  | 127 +++++++++++++++++++++
 tests/common/impala_test_suite.py                  |   4 +-
 tests/query_test/test_ext_data_sources.py          |   5 +
 tests/util/filesystem_utils.py                     |  11 ++
 8 files changed, 229 insertions(+), 5 deletions(-)
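
A minimal sketch (not part of this patch) of the keystore lookup described
above, using the Hadoop credential provider API that the new
JdbcStorageConfigManager code relies on; the keystore URI and key name passed
in below are illustrative:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.security.alias.CredentialProviderFactory;

    public class KeystoreLookupSketch {
      // Resolves the password stored under 'key' in the given jceks keystore,
      // e.g. "jceks://hdfs/test-warehouse/data-sources/test.jceks".
      public static String lookupPassword(String keystoreUri, String key)
          throws IOException {
        Configuration conf = new Configuration();
        conf.set(CredentialProviderFactory.CREDENTIAL_PROVIDER_PATH, keystoreUri);
        char[] pwd = conf.getPassword(key);
        return pwd == null ? null : new String(pwd);
      }
    }

The credential itself is created out of band, as the updated
copy-ext-data-sources.sh does with "hadoop credential create hiveuser -provider
<keystore URI>".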

diff --git a/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/conf/JdbcStorageConfig.java b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/conf/JdbcStorageConfig.java
index e48fb07ef..36d62e8f2 100644
--- a/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/conf/JdbcStorageConfig.java
+++ b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/conf/JdbcStorageConfig.java
@@ -37,6 +37,12 @@ public enum JdbcStorageConfig {
   DBCP_USERNAME("dbcp.username", false),
   // Password of the user.
   DBCP_PASSWORD("dbcp.password", false),
+  // Key of the keystore.
+  DBCP_PASSWORD_KEY("dbcp.password.key", false),
+  // Keystore URI in a format like jceks://hdfs/test-warehouse/data-sources/test.jceks.
+  // In the Impala unit-test environment, the filesystem URI scheme is set as follows:
+  // "hdfs" for HDFS, "s3a" for S3, and "ofs" for Ozone.
+  DBCP_PASSWORD_KEYSTORE("dbcp.password.keystore", false),
   // Number of rows to fetch in a batch.
   JDBC_FETCH_SIZE("jdbc.fetch.size", false),
   // SQL query which specify how to get data from external database.
@@ -50,7 +56,6 @@ public enum JdbcStorageConfig {
   private final String propertyName;
   private boolean required = false;
 
-
   JdbcStorageConfig(String propertyName, boolean required) {
     this.propertyName = propertyName;
     this.required = required;
diff --git a/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/conf/JdbcStorageConfigManager.java b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/conf/JdbcStorageConfigManager.java
index e5e2e972f..280fbccd5 100644
--- a/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/conf/JdbcStorageConfigManager.java
+++ b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/conf/JdbcStorageConfigManager.java
@@ -17,11 +17,13 @@
 
 package org.apache.impala.extdatasource.jdbc.conf;
 
-
 import java.util.Map;
 import java.util.Map.Entry;
 
+import java.io.IOException;
+
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.alias.CredentialProviderFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -30,8 +32,8 @@ import org.slf4j.LoggerFactory;
  */
 public class JdbcStorageConfigManager {
 
-  private static final Logger LOG = LoggerFactory
-      .getLogger(JdbcStorageConfigManager.class);
+  private static final Logger LOGGER = LoggerFactory.getLogger(
+      JdbcStorageConfigManager.class);
 
   public static Configuration convertMapToConfiguration(Map<String, String> props) {
     checkRequiredPropertiesAreDefined(props);
@@ -44,6 +46,55 @@ public class JdbcStorageConfigManager {
     return conf;
   }
 
+  public static String getPasswordFromProperties(Configuration conf) {
+    String username = conf.get(JdbcStorageConfig.DBCP_USERNAME.getPropertyName());
+    String passwd = conf.get(JdbcStorageConfig.DBCP_PASSWORD.getPropertyName());
+    String keystore = conf.get(JdbcStorageConfig.DBCP_PASSWORD_KEYSTORE.
+        getPropertyName());
+    if (countNonNull(passwd, keystore) > 1) {
+      LOGGER.warn("Only one of " + passwd + ", " + keystore + " can be set");
+    }
+    if (passwd == null && keystore != null) {
+      String key = conf.get(JdbcStorageConfig.DBCP_PASSWORD_KEY.getPropertyName());
+      if (key == null) {
+        key = username;
+      }
+      LOGGER.info("hadoop keystore: " + keystore + " hadoop key: " + key);
+      try {
+        passwd = getPasswdFromKeystore(keystore, key);
+      } catch (IOException e) {
+        LOGGER.error("Failed to get password from keystore " + key + ", error: " + e);
+      }
+    }
+    return passwd;
+  }
+
+  private static int countNonNull(String ... values) {
+    int count = 0;
+    for (String str : values) {
+      if (str != null) {
+        count++;
+      }
+    }
+    return count;
+  }
+
+  public static String getPasswdFromKeystore(String keystore, String key)
+      throws IOException {
+    String passwd = null;
+    if (keystore != null && key != null) {
+      Configuration conf = new Configuration();
+      conf.set(CredentialProviderFactory.CREDENTIAL_PROVIDER_PATH, keystore);
+      char[] pwdCharArray = conf.getPassword(key);
+      if (pwdCharArray != null) {
+        passwd = new String(pwdCharArray);
+      } else {
+        LOGGER.error("empty or null password for " + key);
+      }
+    }
+    return passwd;
+  }
+
   private static void checkRequiredPropertiesAreDefined(Map<String, String> props) {
 
     try {
diff --git a/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/GenericJdbcDatabaseAccessor.java b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/GenericJdbcDatabaseAccessor.java
index 4ab214d2c..b60c3d15a 100644
--- a/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/GenericJdbcDatabaseAccessor.java
+++ b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/GenericJdbcDatabaseAccessor.java
@@ -283,6 +283,12 @@ public class GenericJdbcDatabaseAccessor implements DatabaseAccessor {
       }
     }
 
+    String passwd = JdbcStorageConfigManager.getPasswordFromProperties(conf);
+    if (passwd != null) {
+      dbProperties.put(JdbcStorageConfig.DBCP_PASSWORD.getPropertyName().replaceFirst(
+          DBCP_CONFIG_PREFIX + "\\.", ""), passwd);
+    }
+
     // essential properties
     String jdbcUrl = conf.get(JdbcStorageConfig.JDBC_URL.getPropertyName());
     String jdbcAuth = conf.get(JdbcStorageConfig.JDBC_AUTH.getPropertyName());
diff --git a/testdata/bin/copy-ext-data-sources.sh b/testdata/bin/copy-ext-data-sources.sh
index 7b651914f..c3896aa24 100755
--- a/testdata/bin/copy-ext-data-sources.sh
+++ b/testdata/bin/copy-ext-data-sources.sh
@@ -54,3 +54,20 @@ hadoop fs -put -f \
 
 echo "Copied" ${IMPALA_HOME}/fe/target/dependency/postgresql-*.jar \
   "into HDFS" ${JDBC_DRIVERS_HDFS_PATH}
+
+# Extract URI scheme from DEFAULT_FS environment variable.
+FILESYSTEM_URI_SCHEME="hdfs"
+if [ ! -z "$DEFAULT_FS" ] && [[ $DEFAULT_FS =~ ":" ]]; then
+  FILESYSTEM_URI_SCHEME=`echo $DEFAULT_FS | cut -d \: -f 1`
+fi
+# Delete hiveuser if it already exists in the jceks file
+if [ $(hadoop credential list -provider jceks://${FILESYSTEM_URI_SCHEME}/test-warehouse/\
+data-sources/test.jceks | grep -c 'hiveuser') -eq 1 ]; then
+  hadoop credential delete hiveuser -f -provider jceks://${FILESYSTEM_URI_SCHEME}/\
+test-warehouse/data-sources/test.jceks > /dev/null 2>&1
+fi
+
+# Store password in a Java keystore file on HDFS
+hadoop credential create hiveuser -provider \
+  jceks://${FILESYSTEM_URI_SCHEME}/test-warehouse/data-sources/test.jceks -v password\
+  > /dev/null 2>&1
diff --git a/testdata/workloads/functional-query/queries/QueryTest/jdbc-data-source-with-keystore.test b/testdata/workloads/functional-query/queries/QueryTest/jdbc-data-source-with-keystore.test
new file mode 100644
index 000000000..2e961d9a3
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/jdbc-data-source-with-keystore.test
@@ -0,0 +1,127 @@
+====
+---- QUERY
+# Create DataSource
+DROP DATA SOURCE IF EXISTS TestJdbcDataSourceWithKeystore;
+CREATE DATA SOURCE TestJdbcDataSourceWithKeystore
+LOCATION '$FILESYSTEM_PREFIX/test-warehouse/data-sources/jdbc-data-source.jar'
+CLASS 'org.apache.impala.extdatasource.jdbc.JdbcDataSource'
+API_VERSION 'V1';
+---- RESULTS
+'Data source has been created.'
+====
+---- QUERY
+# Show created DataSource
+SHOW DATA SOURCES LIKE 'testjdbcdatasourcewithkeystore';
+---- LABELS
+NAME,LOCATION,CLASS NAME,API VERSION
+---- RESULTS
+'testjdbcdatasourcewithkeystore',regex:'.*/test-warehouse/data-sources/jdbc-data-source.jar','org.apache.impala.extdatasource.jdbc.JdbcDataSource','V1'
+---- TYPES
+STRING,STRING,STRING,STRING
+====
+---- QUERY
+# Create external JDBC DataSource table with username, key and keystore
+DROP TABLE IF EXISTS alltypes_jdbc_datasource_keystore;
+CREATE TABLE alltypes_jdbc_datasource_keystore (
+ id INT,
+ bool_col BOOLEAN,
+ tinyint_col TINYINT,
+ smallint_col SMALLINT,
+ int_col INT,
+ bigint_col BIGINT,
+ float_col FLOAT,
+ double_col DOUBLE,
+ date_string_col STRING,
+ string_col STRING,
+ timestamp_col TIMESTAMP)
+PRODUCED BY DATA SOURCE TestJdbcDataSourceWithKeystore(
+'{"database.type":"POSTGRES",
+"jdbc.url":"jdbc:postgresql://$INTERNAL_LISTEN_HOST:5432/functional",
+"jdbc.driver":"org.postgresql.Driver",
+"driver.url":"$FILESYSTEM_PREFIX/test-warehouse/data-sources/jdbc-drivers/postgresql-jdbc.jar",
+"dbcp.username":"hiveuser",
+"dbcp.password.keystore":"jceks://$FILESYSTEM_URI_SCHEME/test-warehouse/data-sources/test.jceks",
+"dbcp.password.key":"hiveuser",
+"table":"alltypes"}');
+---- RESULTS
+'Table has been created.'
+====
+---- QUERY
+# Test the jdbc DataSource
+# count(*) with a predicate evaluated by Impala
+select count(*) from alltypes_jdbc_datasource_keystore
+where float_col = 0 and string_col is not NULL
+---- RESULTS
+730
+---- TYPES
+BIGINT
+====
+---- QUERY
+# count(*) with no predicates has no materialized slots
+select count(*) from alltypes_jdbc_datasource_keystore
+---- RESULTS
+7300
+---- TYPES
+BIGINT
+====
+---- QUERY
+# Drop table
+DROP TABLE alltypes_jdbc_datasource_keystore;
+---- RESULTS
+'Table has been dropped.'
+====
+---- QUERY
+# Create external JDBC DataSource table with username and keystore
+DROP TABLE IF EXISTS alltypes_jdbc_datasource_keystore;
+CREATE TABLE alltypes_jdbc_datasource_keystore (
+ id INT,
+ bool_col BOOLEAN,
+ tinyint_col TINYINT,
+ smallint_col SMALLINT,
+ int_col INT,
+ bigint_col BIGINT,
+ float_col FLOAT,
+ double_col DOUBLE,
+ date_string_col STRING,
+ string_col STRING,
+ timestamp_col TIMESTAMP)
+PRODUCED BY DATA SOURCE TestJdbcDataSourceWithKeystore(
+'{"database.type":"POSTGRES",
+"jdbc.url":"jdbc:postgresql://$INTERNAL_LISTEN_HOST:5432/functional",
+"jdbc.driver":"org.postgresql.Driver",
+"driver.url":"$FILESYSTEM_PREFIX/test-warehouse/data-sources/jdbc-drivers/postgresql-jdbc.jar",
+"dbcp.username":"hiveuser",
+"dbcp.password.keystore":"jceks://$FILESYSTEM_URI_SCHEME/test-warehouse/data-sources/test.jceks",
+"table":"alltypes"}');
+---- RESULTS
+'Table has been created.'
+====
+---- QUERY
+# Test the jdbc DataSource
+# count(*) with a predicate evaluated by Impala
+select count(*) from alltypes_jdbc_datasource_keystore
+where float_col = 0 and string_col is not NULL
+---- RESULTS
+730
+---- TYPES
+BIGINT
+====
+---- QUERY
+# count(*) with no predicates has no materialized slots
+select count(*) from alltypes_jdbc_datasource_keystore
+---- RESULTS
+7300
+---- TYPES
+BIGINT
+====
+---- QUERY
+# Drop table
+DROP TABLE alltypes_jdbc_datasource_keystore;
+---- RESULTS
+'Table has been dropped.'
+---- QUERY
+# Drop DataSource
+DROP DATA SOURCE TestJdbcDataSourceWithKeystore;
+---- RESULTS
+'Data source has been dropped.'
+====
diff --git a/tests/common/impala_test_suite.py b/tests/common/impala_test_suite.py
index bbf430206..89e0cc4e7 100644
--- a/tests/common/impala_test_suite.py
+++ b/tests/common/impala_test_suite.py
@@ -77,7 +77,8 @@ from tests.util.filesystem_utils import (
     S3GUARD_ENABLED,
     ADLS_STORE_NAME,
     FILESYSTEM_PREFIX,
-    FILESYSTEM_NAME)
+    FILESYSTEM_NAME,
+    FILESYSTEM_URI_SCHEME)
 
 from tests.util.hdfs_util import (
   HdfsConfig,
@@ -495,6 +496,7 @@ class ImpalaTestSuite(BaseTestSuite):
     repl = dict(('$' + k, globs[k]) for k in [
         "FILESYSTEM_PREFIX",
         "FILESYSTEM_NAME",
+        "FILESYSTEM_URI_SCHEME",
         "GROUP_NAME",
         "NAMENODE",
         "IMPALA_HOME",
diff --git a/tests/query_test/test_ext_data_sources.py b/tests/query_test/test_ext_data_sources.py
index 094f0cd67..773ec2698 100644
--- a/tests/query_test/test_ext_data_sources.py
+++ b/tests/query_test/test_ext_data_sources.py
@@ -80,3 +80,8 @@ class TestExtDataSources(ImpalaTestSuite):
 
   def test_jdbc_data_source(self, vector, unique_database):
     self.run_test_case('QueryTest/jdbc-data-source', vector, use_db=unique_database)
+
+  def test_jdbc_data_source_with_keystore(self, vector, unique_database):
+    # Impala query tests for external data sources with keystore.
+    self.run_test_case('QueryTest/jdbc-data-source-with-keystore', vector,
+        use_db=unique_database)
diff --git a/tests/util/filesystem_utils.py b/tests/util/filesystem_utils.py
index a3114e18b..42d0e69e3 100644
--- a/tests/util/filesystem_utils.py
+++ b/tests/util/filesystem_utils.py
@@ -85,6 +85,17 @@ def get_secondary_fs_path(path):
   return prepend_with_fs(SECONDARY_FILESYSTEM, path)
 
 
+def get_fs_uri_scheme():
+  # Set default URI scheme as "hdfs".
+  uri_scheme = "hdfs"
+  # Extract URI scheme from DEFAULT_FS environment variable.
+  default_fs = os.getenv("DEFAULT_FS")
+  if default_fs and default_fs.find(':') != -1:
+    uri_scheme = default_fs.split(':')[0]
+  return uri_scheme
+
+
 WAREHOUSE = get_fs_path('/test-warehouse')
 FILESYSTEM_NAME = get_fs_name(FILESYSTEM)
 WAREHOUSE_PREFIX = os.getenv("WAREHOUSE_LOCATION_PREFIX")
+FILESYSTEM_URI_SCHEME = get_fs_uri_scheme()