You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by wz...@apache.org on 2023/12/22 21:56:55 UTC

(impala) 02/02: IMPALA-12502: Support Impala to Impala federation

This is an automated email from the ASF dual-hosted git repository.

wzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit ec22a1e1cad9f2e0ab75a598eb75727c7a044172
Author: wzhou-code <wz...@cloudera.com>
AuthorDate: Fri Nov 24 20:56:04 2023 -0800

    IMPALA-12502: Support Impala to Impala federation
    
    This patch adds support for reading Impala tables from an Impala cluster
    through the JDBC external data source. It also adds a new profile counter,
    NumExternalDataSourceGetNext, for the total number of calls to
    ExternalDataSource::GetNext().
    Setting query options for Impala will be supported in a follow-up patch.
    
    Testing:
     - Added an end-to-end test that reads Impala tables from an Impala
       cluster through the JDBC external data source.
       Also ran the test manually against Impala tables in an Impala cluster
       on a remote host, by setting $INTERNAL_LISTEN_HOST in jdbc.url to the
       IP address of that remote host.
     - Added an LDAP test that reads a table through the JDBC external data
       source with LDAP authentication.
       Also ran this test manually against Impala tables in a remote Impala
       cluster.
     - Passed core tests.
    
    Change-Id: I79ad3273932b658cb85c9c17cc834fa1b5fbd64f
    Reviewed-on: http://gerrit.cloudera.org:8080/20731
    Reviewed-by: Abhishek Rawat <ar...@cloudera.com>
    Tested-by: Wenzhe Zhou <wz...@cloudera.com>
---
 be/src/exec/data-source-scan-node.cc               |   6 +
 be/src/exec/data-source-scan-node.h                |   3 +
 bin/impala-config.sh                               |   4 +
 .../apache/impala/customcluster/LdapHS2Test.java   | 153 ++++++++++++-
 .../impala/extdatasource/jdbc/JdbcDataSource.java  |   4 +
 .../extdatasource/jdbc/conf/DatabaseType.java      |   3 +-
 .../extdatasource/jdbc/conf/JdbcStorageConfig.java |   2 +
 .../jdbc/dao/DatabaseAccessorFactory.java          |   4 +
 .../jdbc/dao/GenericJdbcDatabaseAccessor.java      |   8 +-
 .../ImpalaDatabaseAccessor.java}                   |  39 +++-
 testdata/bin/download-impala-jdbc-driver.sh        |  72 +++++++
 .../queries/QueryTest/impala-ext-jdbc-tables.test  | 237 +++++++++++++++++++++
 tests/custom_cluster/test_ext_data_sources.py      |  34 +++
 13 files changed, 557 insertions(+), 12 deletions(-)

diff --git a/be/src/exec/data-source-scan-node.cc b/be/src/exec/data-source-scan-node.cc
index f37318184..df2eb7f5a 100644
--- a/be/src/exec/data-source-scan-node.cc
+++ b/be/src/exec/data-source-scan-node.cc
@@ -49,6 +49,9 @@ DEFINE_int32(data_source_batch_size, 1024, "Batch size for calls to GetNext() on
 
 namespace impala {
 
+PROFILE_DEFINE_COUNTER(NumExternalDataSourceGetNext, DEBUG, TUnit::UNIT,
+    "The total number of calls to ExternalDataSource::GetNext()");
+
 // $0 = num expected cols, $1 = actual num columns
 const string ERROR_NUM_COLUMNS = "Data source returned unexpected number of columns. "
     "Expected $0 but received $1. This likely indicates a problem with the data source "
@@ -93,6 +96,8 @@ Status DataSourceScanNode::Prepare(RuntimeState* state) {
       data_src_node_.init_string));
 
   cols_next_val_idx_.resize(tuple_desc_->slots().size(), 0);
+  num_ext_data_source_get_next_ =
+      PROFILE_NumExternalDataSourceGetNext.Instantiate(runtime_profile_);
   return Status::OK();
 }
 
@@ -157,6 +162,7 @@ Status DataSourceScanNode::GetNextInputBatch() {
   Ubsan::MemSet(cols_next_val_idx_.data(), 0, sizeof(int) * cols_next_val_idx_.size());
   TGetNextParams params;
   params.__set_scan_handle(scan_handle_);
+  COUNTER_ADD(num_ext_data_source_get_next_, 1);
   RETURN_IF_ERROR(data_source_executor_->GetNext(params, input_batch_.get()));
   RETURN_IF_ERROR(Status(input_batch_->status));
   RETURN_IF_ERROR(ValidateRowBatchSize());
diff --git a/be/src/exec/data-source-scan-node.h b/be/src/exec/data-source-scan-node.h
index 7b1e33714..452bac153 100644
--- a/be/src/exec/data-source-scan-node.h
+++ b/be/src/exec/data-source-scan-node.h
@@ -98,6 +98,9 @@ class DataSourceScanNode : public ScanNode {
   /// the next row batch.
   std::vector<int> cols_next_val_idx_;
 
+  /// The total number of calls to ExternalDataSource::GetNext().
+  RuntimeProfile::Counter* num_ext_data_source_get_next_;
+
   /// Materializes the next row (next_row_idx_) into tuple. 'local_tz' is used as the
   /// local time-zone for materializing 'TYPE_TIMESTAMP' slots.
   Status MaterializeNextRow(const Timezone* local_tz, MemPool* mem_pool, Tuple* tuple);
diff --git a/bin/impala-config.sh b/bin/impala-config.sh
index 7bc23770b..b6d3547be 100755
--- a/bin/impala-config.sh
+++ b/bin/impala-config.sh
@@ -207,6 +207,10 @@ if [[ $ARCH_NAME == 'aarch64' ]]; then
   export IMPALA_HADOOP_CLIENT_VERSION=3.3.6
   unset IMPALA_HADOOP_CLIENT_URL
 fi
+
+# Impala JDBC driver for testing.
+export IMPALA_SIMBA_JDBC_DRIVER_VERSION=42-2.6.32.1041
+
 # Thrift related environment variables.
 # IMPALA_THRIFT_POM_VERSION is used to populate IMPALA_THRIFT_JAVA_VERSION and
 # thrift.version in java/pom.xml.
diff --git a/fe/src/test/java/org/apache/impala/customcluster/LdapHS2Test.java b/fe/src/test/java/org/apache/impala/customcluster/LdapHS2Test.java
index 325f1a148..5b9540e77 100644
--- a/fe/src/test/java/org/apache/impala/customcluster/LdapHS2Test.java
+++ b/fe/src/test/java/org/apache/impala/customcluster/LdapHS2Test.java
@@ -40,6 +40,8 @@ import org.apache.thrift.transport.THttpClient;
 import org.apache.thrift.protocol.TBinaryProtocol;
 import org.junit.ClassRule;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 @CreateDS(name = "myDS",
     partitions = { @CreatePartition(name = "test", suffix = "dc=myorg,dc=com") })
@@ -51,6 +53,8 @@ import org.junit.Test;
  * ldap authentication is being used.
  */
 public class LdapHS2Test {
+  private static final Logger LOG = LoggerFactory.getLogger(LdapHS2Test.class);
+
   @ClassRule
   public static CreateLdapServerRule serverRule = new CreateLdapServerRule();
 
@@ -92,7 +96,9 @@ public class LdapHS2Test {
     verifySuccess(fetchResp.getStatus());
     List<TColumn> columns = fetchResp.getResults().getColumns();
     assertEquals(1, columns.size());
-    assertEquals(expectedResult, columns.get(0).getStringVal().getValues().get(0));
+    if (expectedResult != null) {
+      assertEquals(expectedResult, columns.get(0).getStringVal().getValues().get(0));
+    }
 
     return execResp.getOperationHandle();
   }
@@ -704,4 +710,149 @@ public class LdapHS2Test {
       assertEquals(e.getMessage(), "HTTP Response code: 401");
     }
   }
+
+  /**
+   * Tests LDAP for reading Impala table through JDBC external data source.
+   */
+  @Test
+  public void testImpalaExtJdbcTables() throws Exception {
+    setUp("");
+    verifyMetrics(0, 0);
+    THttpClient transport = new THttpClient("http://localhost:28000");
+    Map<String, String> headers = new HashMap<String, String>();
+    // Authenticate as 'Test1Ldap' with password '12345'
+    headers.put("Authorization", "Basic VGVzdDFMZGFwOjEyMzQ1");
+    transport.setCustomHeaders(headers);
+    transport.open();
+    TCLIService.Iface client = new TCLIService.Client(new TBinaryProtocol(transport));
+
+    // Open a session which will get username 'Test1Ldap'.
+    TOpenSessionReq openReq = new TOpenSessionReq();
+    TOpenSessionResp openResp = client.OpenSession(openReq);
+    TSessionHandle session = openResp.getSessionHandle();
+    // One successful authentication.
+    verifyMetrics(1, 0);
+
+    // Download Impala JDBC driver.
+    String downloadImpalaJdbcDriver = new File(System.getenv("IMPALA_HOME"),
+        "testdata/bin/download-impala-jdbc-driver.sh").getPath();
+    String[] cmd = { downloadImpalaJdbcDriver };
+    RunShellCommand.Run(cmd, /*shouldSucceed*/ true, "", "");
+
+    // Define queries.
+    String fileSystemPrefix = System.getenv("FILESYSTEM_PREFIX");
+    String internalListenHost = System.getenv("INTERNAL_LISTEN_HOST");
+
+    String dropDSQuery = "DROP DATA SOURCE IF EXISTS impala_jdbc_test_ds";
+    String createDSQuery = String.format("CREATE DATA SOURCE impala_jdbc_test_ds " +
+        "LOCATION '%s/test-warehouse/data-sources/jdbc-data-source.jar' " +
+        "CLASS 'org.apache.impala.extdatasource.jdbc.JdbcDataSource' " +
+        "API_VERSION 'V1'", fileSystemPrefix);
+    String dropTableQuery = "DROP TABLE IF EXISTS %s";
+    // Use LDAP as the JDBC authentication mechanism (AuthMech=3), with
+    // TEST_USER_1/TEST_PASSWORD_1 as the username/password.
+    String createTableQuery = String.format("CREATE TABLE impala_jdbc_ext_test_table (" +
+        "id INT, bool_col BOOLEAN, tinyint_col TINYINT, smallint_col SMALLINT, " +
+        "int_col INT, bigint_col BIGINT, float_col FLOAT, double_col DOUBLE, " +
+        "date_string_col STRING, string_col STRING, timestamp_col TIMESTAMP) " +
+        "PRODUCED BY DATA SOURCE impala_jdbc_test_ds(" +
+        "'{\"database.type\":\"IMPALA\", " +
+          "\"jdbc.url\":\"jdbc:impala://%s:21050/functional\", " +
+          "\"jdbc.auth\":\"AuthMech=3\", " +
+          "\"jdbc.driver\":\"com.cloudera.impala.jdbc.Driver\", " +
+          "\"driver.url\":\"%s/test-warehouse/data-sources/jdbc-drivers/" +
+          "ImpalaJDBC42.jar\", " +
+          "\"dbcp.username\":\"%s\", " +
+          "\"dbcp.password\":\"%s\", " +
+          "\"table\":\"alltypes\"}')",
+          internalListenHost, fileSystemPrefix, TEST_USER_1, TEST_PASSWORD_1);
+    // Use LDAP as the JDBC authentication mechanism, but with a wrong password.
+    String createTableWithWrongPassword =
+        String.format("CREATE TABLE impala_jdbc_tbl_wrong_password (" +
+        "id INT, bool_col BOOLEAN, tinyint_col TINYINT, smallint_col SMALLINT, " +
+        "int_col INT, bigint_col BIGINT, float_col FLOAT, double_col DOUBLE, " +
+        "date_string_col STRING, string_col STRING, timestamp_col TIMESTAMP) " +
+        "PRODUCED BY DATA SOURCE impala_jdbc_test_ds(" +
+        "'{\"database.type\":\"IMPALA\", " +
+          "\"jdbc.url\":\"jdbc:impala://%s:21050/functional\", " +
+          "\"jdbc.auth\":\"AuthMech=3\", " +
+          "\"jdbc.driver\":\"com.cloudera.impala.jdbc.Driver\", " +
+          "\"driver.url\":\"%s/test-warehouse/data-sources/jdbc-drivers/" +
+          "ImpalaJDBC42.jar\", " +
+          "\"dbcp.username\":\"%s\", " +
+          "\"dbcp.password\":\"wrong-password\", " +
+          "\"table\":\"alltypes\"}')",
+          internalListenHost, fileSystemPrefix, TEST_USER_1);
+    // Same LDAP credentials but no AuthMech, so reading the table is expected to fail.
+    String createTableWithoutAuthMech =
+        String.format("CREATE TABLE impala_jdbc_tbl_without_auth_mech (" +
+        "id INT, bool_col BOOLEAN, tinyint_col TINYINT, smallint_col SMALLINT, " +
+        "int_col INT, bigint_col BIGINT, float_col FLOAT, double_col DOUBLE, " +
+        "date_string_col STRING, string_col STRING, timestamp_col TIMESTAMP) " +
+        "PRODUCED BY DATA SOURCE impala_jdbc_test_ds(" +
+        "'{\"database.type\":\"IMPALA\", " +
+          "\"jdbc.url\":\"jdbc:impala://%s:21050/functional\", " +
+          "\"jdbc.driver\":\"com.cloudera.impala.jdbc.Driver\", " +
+          "\"driver.url\":\"%s/test-warehouse/data-sources/jdbc-drivers/" +
+          "ImpalaJDBC42.jar\", " +
+          "\"dbcp.username\":\"%s\", " +
+          "\"dbcp.password\":\"%s\", " +
+          "\"table\":\"alltypes\"}')",
+          internalListenHost, fileSystemPrefix, TEST_USER_1, TEST_PASSWORD_1);
+    String selectQuery = "select string_col from %s where id=9";
+
+    // Run queries.
+    //
+    // Create data source and tables.
+    execAndFetch(client, session, dropDSQuery, null);
+    execAndFetch(client, session, createDSQuery, "Data source has been created.");
+    execAndFetch(client, session,
+        String.format(dropTableQuery, "impala_jdbc_ext_test_table"), null);
+    execAndFetch(client, session, createTableQuery, "Table has been created.");
+    execAndFetch(client, session,
+        String.format(dropTableQuery, "impala_jdbc_tbl_wrong_password"), null);
+    execAndFetch(client, session, createTableWithWrongPassword,
+        "Table has been created.");
+    execAndFetch(client, session,
+        String.format(dropTableQuery, "impala_jdbc_tbl_without_auth_mech"), null);
+    execAndFetch(client, session, createTableWithoutAuthMech, "Table has been created.");
+
+    // Successfully access JDBC data source table with LDAP.
+    execAndFetch(client, session,
+        String.format(selectQuery, "impala_jdbc_ext_test_table"), "9");
+    // Negative case for JDBC data source table with wrong password.
+    String expectedError = "Error initialized or created transport for authentication";
+    try {
+      execAndFetch(client, session,
+          String.format(selectQuery, "impala_jdbc_tbl_wrong_password"), "9");
+      fail("Expected error: " + expectedError);
+    } catch (Exception e) {
+      assertTrue(e.getMessage().contains(expectedError));
+    }
+    // Negative case for JDBC data source table without AuthMech.
+    expectedError = "Communication link failure. Failed to connect to server";
+    try {
+      execAndFetch(client, session,
+          String.format(selectQuery, "impala_jdbc_tbl_without_auth_mech"), "9");
+      fail("Expected error: " + expectedError);
+    } catch (Exception e) {
+      assertTrue(String.format("Authentication failed with error: %s", e.getMessage()),
+          e.getMessage().contains(expectedError));
+    }
+
+    // Drop data source and tables.
+    execAndFetch(client, session, dropDSQuery, "Data source has been dropped.");
+    execAndFetch(client, session,
+        String.format(dropTableQuery, "impala_jdbc_ext_test_table"),
+        "Table has been dropped.");
+    execAndFetch(client, session,
+        String.format(dropTableQuery, "impala_jdbc_tbl_wrong_password"),
+        "Table has been dropped.");
+    execAndFetch(client, session,
+        String.format(dropTableQuery, "impala_jdbc_tbl_without_auth_mech"),
+        "Table has been dropped.");
+
+    // 31 = 1 (OpenSession) + 2 successful authentications per execAndFetch() x 15.
+    verifyMetrics(31, 0);
+  }
 }
diff --git a/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/JdbcDataSource.java b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/JdbcDataSource.java
index 173753d64..10df71dd5 100644
--- a/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/JdbcDataSource.java
+++ b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/JdbcDataSource.java
@@ -229,6 +229,9 @@ public class JdbcDataSource implements ExternalDataSource {
           initString = initString.substring(CACHE_CLASS_PREFIX.length());
           cacheClass_ = true;
         }
+        // Replace '\n' with single space character so that one property setting in
+        // initString can be broken into multiple lines for better readability.
+        initString = initString.replace('\n', ' ');
         Map<String, String> config = new ObjectMapper().readValue(initString, typeRef);
         tableConfig_ = JdbcStorageConfigManager.convertMapToConfiguration(config);
       } catch (JsonProcessingException e) {
@@ -294,6 +297,7 @@ public class JdbcDataSource implements ExternalDataSource {
     }
     // Execute query and get iterator
     tableConfig_.set(JdbcStorageConfig.QUERY.getPropertyName(), sb.toString());
+    LOG.trace("JDBC Query: " + sb.toString());
 
     if (schema_.getColsSize() != 0) {
       int limit = -1;
diff --git a/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/conf/DatabaseType.java b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/conf/DatabaseType.java
index 9b30350bc..a01fb4108 100644
--- a/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/conf/DatabaseType.java
+++ b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/conf/DatabaseType.java
@@ -24,5 +24,6 @@ public enum DatabaseType {
   ORACLE,
   POSTGRES,
   MSSQL,
-  JETHRO_DATA
+  JETHRO_DATA,
+  IMPALA
 }
diff --git a/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/conf/JdbcStorageConfig.java b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/conf/JdbcStorageConfig.java
index 0e1ac5ab3..e48fb07ef 100644
--- a/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/conf/JdbcStorageConfig.java
+++ b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/conf/JdbcStorageConfig.java
@@ -26,6 +26,8 @@ public enum JdbcStorageConfig {
   // JDBC connection string, including the database type, IP address, port number, and
   // database name. For example, "jdbc:postgresql://127.0.0.1:5432/functional
   JDBC_URL("jdbc.url", true),
+  // Authentication mechanisms of JDBC driver.
+  JDBC_AUTH("jdbc.auth", false),
   // Class name of JDBC driver. For example, "org.postgresql.Driver"
   JDBC_DRIVER_CLASS("jdbc.driver", true),
   // Driver URL for downloading the Jar file package that is used to access the external
diff --git a/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/DatabaseAccessorFactory.java b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/DatabaseAccessorFactory.java
index 5415b5763..decde6797 100644
--- a/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/DatabaseAccessorFactory.java
+++ b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/DatabaseAccessorFactory.java
@@ -57,6 +57,10 @@ public class DatabaseAccessorFactory {
         accessor = new DB2DatabaseAccessor();
         break;
 
+      case IMPALA:
+        accessor = new ImpalaDatabaseAccessor();
+        break;
+
       default:
         accessor = new GenericJdbcDatabaseAccessor();
         break;
diff --git a/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/GenericJdbcDatabaseAccessor.java b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/GenericJdbcDatabaseAccessor.java
index b2e820c30..4ab214d2c 100644
--- a/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/GenericJdbcDatabaseAccessor.java
+++ b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/GenericJdbcDatabaseAccessor.java
@@ -48,6 +48,7 @@ import org.apache.impala.thrift.TStatus;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Strings;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.RemovalListener;
@@ -283,7 +284,12 @@ public class GenericJdbcDatabaseAccessor implements DatabaseAccessor {
     }
 
     // essential properties
-    dbProperties.put("url", conf.get(JdbcStorageConfig.JDBC_URL.getPropertyName()));
+    String jdbcUrl = conf.get(JdbcStorageConfig.JDBC_URL.getPropertyName());
+    String jdbcAuth = conf.get(JdbcStorageConfig.JDBC_AUTH.getPropertyName());
+    if (!Strings.isNullOrEmpty(jdbcAuth)) {
+      jdbcUrl += ";" + jdbcAuth;
+    }
+    dbProperties.put("url", jdbcUrl);
     dbProperties.put("driverClassName",
         conf.get(JdbcStorageConfig.JDBC_DRIVER_CLASS.getPropertyName()));
     dbProperties.put("driverUrl",
diff --git a/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/conf/DatabaseType.java b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/ImpalaDatabaseAccessor.java
similarity index 51%
copy from java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/conf/DatabaseType.java
copy to java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/ImpalaDatabaseAccessor.java
index 9b30350bc..361eba845 100644
--- a/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/conf/DatabaseType.java
+++ b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/ImpalaDatabaseAccessor.java
@@ -15,14 +15,35 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package org.apache.impala.extdatasource.jdbc.conf;
+package org.apache.impala.extdatasource.jdbc.dao;
+
+/**
+ * Impala-specific data accessor. This is needed because Impala JDBC drivers do not
+ * support the generic LIMIT and OFFSET escape functions.
+ */
+public class ImpalaDatabaseAccessor extends GenericJdbcDatabaseAccessor {
+
+  @Override
+  protected String addLimitAndOffsetToQuery(String sql, int limit, int offset) {
+    if (offset == 0) {
+      return addLimitToQuery(sql, limit);
+    } else {
+      if (limit != -1) {
+        return sql + " LIMIT " + limit + " OFFSET " + offset;
+      } else {
+        return sql;
+      }
+    }
+  }
+
+
+  @Override
+  protected String addLimitToQuery(String sql, int limit) {
+    if (limit != -1) {
+      return sql + " LIMIT " + limit;
+    } else {
+      return sql;
+    }
+  }
 
-public enum DatabaseType {
-  MYSQL,
-  H2,
-  DB2,
-  ORACLE,
-  POSTGRES,
-  MSSQL,
-  JETHRO_DATA
 }
diff --git a/testdata/bin/download-impala-jdbc-driver.sh b/testdata/bin/download-impala-jdbc-driver.sh
new file mode 100755
index 000000000..05d445295
--- /dev/null
+++ b/testdata/bin/download-impala-jdbc-driver.sh
@@ -0,0 +1,72 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# This script downloads the Impala JDBC driver and copies it to Hadoop FS.
+
+set -euo pipefail
+. $IMPALA_HOME/bin/report_build_error.sh
+setup_report_build_error
+
+. ${IMPALA_HOME}/bin/impala-config.sh > /dev/null 2>&1
+
+EXT_DATA_SOURCES_HDFS_PATH=${FILESYSTEM_PREFIX}/test-warehouse/data-sources
+JDBC_DRIVERS_HDFS_PATH=${EXT_DATA_SOURCES_HDFS_PATH}/jdbc-drivers
+SIMBA_DRIVER_ZIP_FILENAME=ClouderaImpala_JDBC${IMPALA_SIMBA_JDBC_DRIVER_VERSION}
+INNER_SIMBA_DRIVER_ZIP_FILENAME=ClouderaImpalaJDBC${IMPALA_SIMBA_JDBC_DRIVER_VERSION}
+DRIVER_JAR_VERSION=${IMPALA_SIMBA_JDBC_DRIVER_VERSION%-*}
+SIMBA_DRIVER_JAR_FILENAME=ImpalaJDBC${DRIVER_JAR_VERSION}.jar
+
+found=$(hadoop fs -find ${JDBC_DRIVERS_HDFS_PATH} -name ${SIMBA_DRIVER_JAR_FILENAME})
+if [ ! -z "$found" ]; then
+  echo "JDBC driver jar file already exists"
+  exit 0
+fi
+
+hadoop fs -mkdir -p ${JDBC_DRIVERS_HDFS_PATH}
+pushd /tmp
+
+mkdir -p impala_jdbc_driver
+cd impala_jdbc_driver
+
+# Download Impala jdbc driver.
+wget "https://downloads.cloudera.com/connectors/${SIMBA_DRIVER_ZIP_FILENAME}.zip"
+
+# Use Python modules to unzip zip file since 'unzip' command is not available in some
+# testing environments.
+cat > unzip.py <<__EOT__
+import sys
+from zipfile import PyZipFile
+pzf = PyZipFile(sys.argv[1])
+pzf.extractall()
+__EOT__
+
+# Extract driver jar file from zip file.
+python ./unzip.py ${SIMBA_DRIVER_ZIP_FILENAME}.zip
+python ./unzip.py ${SIMBA_DRIVER_ZIP_FILENAME}/${INNER_SIMBA_DRIVER_ZIP_FILENAME}.zip
+
+# Copy driver jar file to Hadoop FS.
+hadoop fs -put -f /tmp/impala_jdbc_driver/${SIMBA_DRIVER_JAR_FILENAME} \
+    ${JDBC_DRIVERS_HDFS_PATH}/${SIMBA_DRIVER_JAR_FILENAME}
+
+echo "Copied ${SIMBA_DRIVER_JAR_FILENAME} into HDFS ${JDBC_DRIVERS_HDFS_PATH}"
+
+cd ..
+rm -rf impala_jdbc_driver
+popd
+
diff --git a/testdata/workloads/functional-query/queries/QueryTest/impala-ext-jdbc-tables.test b/testdata/workloads/functional-query/queries/QueryTest/impala-ext-jdbc-tables.test
new file mode 100644
index 000000000..8001295d4
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/impala-ext-jdbc-tables.test
@@ -0,0 +1,237 @@
+====
+---- QUERY
+# Create DataSource
+DROP DATA SOURCE IF EXISTS TestJdbcDataSource;
+CREATE DATA SOURCE TestJdbcDataSource
+LOCATION '$FILESYSTEM_PREFIX/test-warehouse/data-sources/jdbc-data-source.jar'
+CLASS 'org.apache.impala.extdatasource.jdbc.JdbcDataSource'
+API_VERSION 'V1';
+---- RESULTS
+'Data source has been created.'
+====
+---- QUERY
+# Show created DataSource
+SHOW DATA SOURCES LIKE 'testjdbcdatasource';
+---- LABELS
+NAME,LOCATION,CLASS NAME,API VERSION
+---- RESULTS
+'testjdbcdatasource',regex:'.*/test-warehouse/data-sources/jdbc-data-source.jar','org.apache.impala.extdatasource.jdbc.JdbcDataSource','V1'
+---- TYPES
+STRING,STRING,STRING,STRING
+====
+---- QUERY
+# Create external JDBC DataSource table
+DROP TABLE IF EXISTS alltypes_jdbc_datasource;
+CREATE TABLE alltypes_jdbc_datasource (
+ id INT,
+ bool_col BOOLEAN,
+ tinyint_col TINYINT,
+ smallint_col SMALLINT,
+ int_col INT,
+ bigint_col BIGINT,
+ float_col FLOAT,
+ double_col DOUBLE,
+ date_string_col STRING,
+ string_col STRING,
+ timestamp_col TIMESTAMP)
+PRODUCED BY DATA SOURCE TestJdbcDataSource(
+'{"database.type":"IMPALA",
+"jdbc.url":"jdbc:impala://$INTERNAL_LISTEN_HOST:21050/functional",
+"jdbc.auth":"AuthMech=0",
+"jdbc.driver":"com.cloudera.impala.jdbc.Driver",
+"driver.url":"$FILESYSTEM_PREFIX/test-warehouse/data-sources/jdbc-drivers/ImpalaJDBC42.jar",
+"dbcp.username":"impala",
+"dbcp.password":"cloudera",
+"table":"alltypes"}');
+---- RESULTS
+'Table has been created.'
+====
+---- QUERY
+# Create external JDBC DataSource table
+DROP TABLE IF EXISTS alltypes_jdbc_datasource_2;
+CREATE TABLE alltypes_jdbc_datasource_2 (
+ id INT,
+ bool_col BOOLEAN,
+ tinyint_col TINYINT,
+ smallint_col SMALLINT,
+ int_col INT,
+ bigint_col BIGINT,
+ float_col FLOAT,
+ double_col DOUBLE,
+ date_string_col STRING,
+ string_col STRING,
+ timestamp_col TIMESTAMP)
+PRODUCED BY DATA SOURCE TestJdbcDataSource(
+'{"database.type":"IMPALA",
+"jdbc.url":"jdbc:impala://$INTERNAL_LISTEN_HOST:21050/functional",
+"jdbc.auth":"AuthMech=0",
+"jdbc.driver":"com.cloudera.impala.jdbc.Driver",
+"driver.url":"$FILESYSTEM_PREFIX/test-warehouse/data-sources/jdbc-drivers/ImpalaJDBC42.jar",
+"dbcp.username":"impala",
+"dbcp.password":"cloudera",
+"table":"alltypes"}');
+---- RESULTS
+'Table has been created.'
+====
+---- QUERY
+# Test the jdbc DataSource
+# count(*) with a predicate evaluated by Impala
+# Binary predicates are pushed to the external jdbc DataSource.
+select count(*) from alltypes_jdbc_datasource
+where float_col = 0 and string_col is not NULL
+---- RESULTS
+730
+---- TYPES
+BIGINT
+---- RUNTIME_PROFILE
+row_regex: .*NumExternalDataSourceGetNext: 1 .*
+row_regex: .*RowsRead: 730 .*
+aggregation(SUM, RowsRead): 730
+====
+---- QUERY
+# count(*) with no predicates has no materialized slots
+select count(*) from alltypes_jdbc_datasource
+---- RESULTS
+7300
+---- TYPES
+BIGINT
+---- RUNTIME_PROFILE
+row_regex: .*NumExternalDataSourceGetNext: 1 .*
+row_regex: .*RowsRead: 7.30K .*
+aggregation(SUM, RowsRead): 7300
+====
+---- QUERY
+# Gets all columns. The binary predicates are pushed to the DataSource; "order by"
+# and "limit" are evaluated locally.
+select *
+from alltypes_jdbc_datasource
+where id > 10 and int_col< 5 order by id limit 5 offset 0
+---- RESULTS
+11,false,1,1,1,10,1.100000023841858,10.1,'01/02/09','1',2009-01-02 00:11:00.450000000
+12,true,2,2,2,20,2.200000047683716,20.2,'01/02/09','2',2009-01-02 00:12:00.460000000
+13,false,3,3,3,30,3.299999952316284,30.3,'01/02/09','3',2009-01-02 00:13:00.480000000
+14,true,4,4,4,40,4.400000095367432,40.4,'01/02/09','4',2009-01-02 00:14:00.510000000
+20,true,0,0,0,0,0,0,'01/03/09','0',2009-01-03 00:20:00.900000000
+---- TYPES
+INT, BOOLEAN, TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE, STRING, STRING, TIMESTAMP
+---- RUNTIME_PROFILE
+row_regex: .*NumExternalDataSourceGetNext: 4 .*
+row_regex: .*RowsRead: 3.64K .*
+aggregation(SUM, RowsRead): 3644
+====
+---- QUERY
+# Gets specified columns.
+# The binary predicates are pushed to the DataSource, "order by" and "limit" are
+# evaluated locally.
+select id, bool_col, smallint_col, float_col, double_col, date_string_col
+from alltypes_jdbc_datasource
+where id > 10 and int_col< 5 order by id limit 5 offset 0
+---- RESULTS
+11,false,1,1.100000023841858,10.1,'01/02/09'
+12,true,2,2.200000047683716,20.2,'01/02/09'
+13,false,3,3.299999952316284,30.3,'01/02/09'
+14,true,4,4.400000095367432,40.4,'01/02/09'
+20,true,0,0,0,'01/03/09'
+---- TYPES
+INT, BOOLEAN, SMALLINT, FLOAT, DOUBLE, STRING
+---- RUNTIME_PROFILE
+row_regex: .*NumExternalDataSourceGetNext: 4 .*
+row_regex: .*RowsRead: 3.64K .*
+aggregation(SUM, RowsRead): 3644
+====
+---- QUERY
+# Gets specified columns from the second external jdbc table, which maps to the same
+# remote table.
+# The binary predicates are pushed to the DataSource, "order by" and "limit" are
+# evaluated locally.
+select id, bool_col, smallint_col, float_col, double_col, date_string_col
+from alltypes_jdbc_datasource_2
+where id > 10 and int_col< 5 order by id limit 5 offset 0
+---- RESULTS
+11,false,1,1.100000023841858,10.1,'01/02/09'
+12,true,2,2.200000047683716,20.2,'01/02/09'
+13,false,3,3.299999952316284,30.3,'01/02/09'
+14,true,4,4.400000095367432,40.4,'01/02/09'
+20,true,0,0,0,'01/03/09'
+---- TYPES
+INT, BOOLEAN, SMALLINT, FLOAT, DOUBLE, STRING
+---- RUNTIME_PROFILE
+row_regex: .*NumExternalDataSourceGetNext: 4 .*
+row_regex: .*RowsRead: 3.64K .*
+aggregation(SUM, RowsRead): 3644
+====
+---- QUERY
+# Inner join with a non jdbc table
+# The binary predicates are pushed to the DataSource, but no predicate defined for
+# local table.
+select a.id, b.int_col
+from alltypes_jdbc_datasource a inner join functional.alltypes b on (a.id = b.id)
+where a.id = 1
+---- RESULTS
+1,1
+---- TYPES
+INT, INT
+---- RUNTIME_PROFILE
+row_regex: .*NumExternalDataSourceGetNext: 1 .*
+row_regex: .*RowsRead: 1 .*
+aggregation(SUM, RowsRead): 7301
+====
+---- QUERY
+# Inner join with another jdbc table
+# The binary predicates are pushed to the two DataSource Nodes.
+select a.id, b.int_col
+from alltypes_jdbc_datasource a inner join alltypes_jdbc_datasource_2 b on (a.id = b.id)
+where a.id < 3 group by a.id, b.int_col
+---- RESULTS
+0,0
+1,1
+2,2
+---- TYPES
+INT, INT
+---- RUNTIME_PROFILE
+row_regex: .*NumExternalDataSourceGetNext: 1 .*
+row_regex: .*RowsRead: 3 .*
+aggregation(SUM, RowsRead): 6
+====
+---- QUERY
+# Cross join
+# The binary predicates are pushed to the two DataSource Nodes.
+select a.id, b.id
+from alltypes_jdbc_datasource a cross join alltypes_jdbc_datasource b
+where (a.id < 3 and b.id < 3)
+order by a.id, b.id limit 10
+---- RESULTS
+0,0
+0,1
+0,2
+1,0
+1,1
+1,2
+2,0
+2,1
+2,2
+---- TYPES
+INT, INT
+---- RUNTIME_PROFILE
+row_regex: .*NumExternalDataSourceGetNext: 1 .*
+row_regex: .*RowsRead: 3 .*
+aggregation(SUM, RowsRead): 6
+====
+---- QUERY
+# Drop table
+DROP TABLE alltypes_jdbc_datasource;
+---- RESULTS
+'Table has been dropped.'
+====
+---- QUERY
+# Drop table
+DROP TABLE alltypes_jdbc_datasource_2;
+---- RESULTS
+'Table has been dropped.'
+====
+---- QUERY
+# Drop DataSource
+DROP DATA SOURCE TestJdbcDataSource;
+---- RESULTS
+'Data source has been dropped.'
+====
diff --git a/tests/custom_cluster/test_ext_data_sources.py b/tests/custom_cluster/test_ext_data_sources.py
index 78f16dd75..f12fd1b56 100644
--- a/tests/custom_cluster/test_ext_data_sources.py
+++ b/tests/custom_cluster/test_ext_data_sources.py
@@ -109,3 +109,37 @@ class TestMySqlExtJdbcTables(CustomClusterTestSuite):
   def test_mysql_ext_jdbc_tables(self, vector, unique_database):
     """Run tests for external jdbc tables on MySQL"""
     self.run_test_case('QueryTest/mysql-ext-jdbc-tables', vector, use_db=unique_database)
+
+
+class TestImpalaExtJdbcTables(CustomClusterTestSuite):
+  """Impala query tests for external jdbc tables in Impala cluster."""
+
+  @classmethod
+  def get_workload(cls):
+    return 'functional-query'
+
+  @classmethod
+  def _download_impala_jdbc_driver(cls):
+    # Download Impala jdbc driver and copy jdbc driver to HDFS.
+    script = os.path.join(
+        os.environ['IMPALA_HOME'], 'testdata/bin/download-impala-jdbc-driver.sh')
+    run_cmd = [script]
+    try:
+      subprocess.check_call(run_cmd, close_fds=True)
+    except subprocess.CalledProcessError:
+      assert False, "Failed to download Impala JDBC driver"
+
+  @classmethod
+  def setup_class(cls):
+    cls._download_impala_jdbc_driver()
+    super(TestImpalaExtJdbcTables, cls).setup_class()
+
+  @classmethod
+  def teardown_class(cls):
+    super(TestImpalaExtJdbcTables, cls).teardown_class()
+
+  @pytest.mark.execute_serially
+  def test_impala_ext_jdbc_tables(self, vector, unique_database):
+    """Run tests for external jdbc tables in Impala cluster"""
+    self.run_test_case(
+        'QueryTest/impala-ext-jdbc-tables', vector, use_db=unique_database)