You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by wz...@apache.org on 2023/12/22 21:56:55 UTC
(impala) 02/02: IMPALA-12502: Support Impala to Impala federation
This is an automated email from the ASF dual-hosted git repository.
wzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
commit ec22a1e1cad9f2e0ab75a598eb75727c7a044172
Author: wzhou-code <wz...@cloudera.com>
AuthorDate: Fri Nov 24 20:56:04 2023 -0800
IMPALA-12502: Support Impala to Impala federation
This patch adds support to read Impala tables in the Impala cluster
through JDBC external data source. It also adds a new counter
NumExternalDataSourceGetNext in profile for the total number of calls
to ExternalDataSource::GetNext().
Setting query options for Impala will be supported in a follow-up patch.
Testing:
- Added an end-to-end unit test to read Impala tables from Impala
cluster through JDBC external data source.
Manually ran the unit-test with Impala tables in Impala cluster on a
remote host by setting $INTERNAL_LISTEN_HOST in jdbc.url as the ip
address of the remote host on which an Impala cluster is running.
- Added LDAP test for reading table through JDBC external data source
with LDAP authentication.
Manually ran the unit-test with Impala tables in a remote Impala
cluster.
- Passed core tests.
Change-Id: I79ad3273932b658cb85c9c17cc834fa1b5fbd64f
Reviewed-on: http://gerrit.cloudera.org:8080/20731
Reviewed-by: Abhishek Rawat <ar...@cloudera.com>
Tested-by: Wenzhe Zhou <wz...@cloudera.com>
---
be/src/exec/data-source-scan-node.cc | 6 +
be/src/exec/data-source-scan-node.h | 3 +
bin/impala-config.sh | 4 +
.../apache/impala/customcluster/LdapHS2Test.java | 153 ++++++++++++-
.../impala/extdatasource/jdbc/JdbcDataSource.java | 4 +
.../extdatasource/jdbc/conf/DatabaseType.java | 3 +-
.../extdatasource/jdbc/conf/JdbcStorageConfig.java | 2 +
.../jdbc/dao/DatabaseAccessorFactory.java | 4 +
.../jdbc/dao/GenericJdbcDatabaseAccessor.java | 8 +-
.../ImpalaDatabaseAccessor.java} | 39 +++-
testdata/bin/download-impala-jdbc-driver.sh | 72 +++++++
.../queries/QueryTest/impala-ext-jdbc-tables.test | 237 +++++++++++++++++++++
tests/custom_cluster/test_ext_data_sources.py | 34 +++
13 files changed, 557 insertions(+), 12 deletions(-)
diff --git a/be/src/exec/data-source-scan-node.cc b/be/src/exec/data-source-scan-node.cc
index f37318184..df2eb7f5a 100644
--- a/be/src/exec/data-source-scan-node.cc
+++ b/be/src/exec/data-source-scan-node.cc
@@ -49,6 +49,9 @@ DEFINE_int32(data_source_batch_size, 1024, "Batch size for calls to GetNext() on
namespace impala {
+PROFILE_DEFINE_COUNTER(NumExternalDataSourceGetNext, DEBUG, TUnit::UNIT,
+ "The total number of calls to ExternalDataSource::GetNext()");
+
// $0 = num expected cols, $1 = actual num columns
const string ERROR_NUM_COLUMNS = "Data source returned unexpected number of columns. "
"Expected $0 but received $1. This likely indicates a problem with the data source "
@@ -93,6 +96,8 @@ Status DataSourceScanNode::Prepare(RuntimeState* state) {
data_src_node_.init_string));
cols_next_val_idx_.resize(tuple_desc_->slots().size(), 0);
+ num_ext_data_source_get_next_ =
+ PROFILE_NumExternalDataSourceGetNext.Instantiate(runtime_profile_);
return Status::OK();
}
@@ -157,6 +162,7 @@ Status DataSourceScanNode::GetNextInputBatch() {
Ubsan::MemSet(cols_next_val_idx_.data(), 0, sizeof(int) * cols_next_val_idx_.size());
TGetNextParams params;
params.__set_scan_handle(scan_handle_);
+ COUNTER_ADD(num_ext_data_source_get_next_, 1);
RETURN_IF_ERROR(data_source_executor_->GetNext(params, input_batch_.get()));
RETURN_IF_ERROR(Status(input_batch_->status));
RETURN_IF_ERROR(ValidateRowBatchSize());
diff --git a/be/src/exec/data-source-scan-node.h b/be/src/exec/data-source-scan-node.h
index 7b1e33714..452bac153 100644
--- a/be/src/exec/data-source-scan-node.h
+++ b/be/src/exec/data-source-scan-node.h
@@ -98,6 +98,9 @@ class DataSourceScanNode : public ScanNode {
/// the next row batch.
std::vector<int> cols_next_val_idx_;
+ /// The total number of calls to ExternalDataSource::GetNext().
+ RuntimeProfile::Counter* num_ext_data_source_get_next_;
+
/// Materializes the next row (next_row_idx_) into tuple. 'local_tz' is used as the
/// local time-zone for materializing 'TYPE_TIMESTAMP' slots.
Status MaterializeNextRow(const Timezone* local_tz, MemPool* mem_pool, Tuple* tuple);
diff --git a/bin/impala-config.sh b/bin/impala-config.sh
index 7bc23770b..b6d3547be 100755
--- a/bin/impala-config.sh
+++ b/bin/impala-config.sh
@@ -207,6 +207,10 @@ if [[ $ARCH_NAME == 'aarch64' ]]; then
export IMPALA_HADOOP_CLIENT_VERSION=3.3.6
unset IMPALA_HADOOP_CLIENT_URL
fi
+
+# Impala JDBC driver for testing.
+export IMPALA_SIMBA_JDBC_DRIVER_VERSION=42-2.6.32.1041
+
# Thrift related environment variables.
# IMPALA_THRIFT_POM_VERSION is used to populate IMPALA_THRIFT_JAVA_VERSION and
# thrift.version in java/pom.xml.
diff --git a/fe/src/test/java/org/apache/impala/customcluster/LdapHS2Test.java b/fe/src/test/java/org/apache/impala/customcluster/LdapHS2Test.java
index 325f1a148..5b9540e77 100644
--- a/fe/src/test/java/org/apache/impala/customcluster/LdapHS2Test.java
+++ b/fe/src/test/java/org/apache/impala/customcluster/LdapHS2Test.java
@@ -40,6 +40,8 @@ import org.apache.thrift.transport.THttpClient;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.junit.ClassRule;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
@CreateDS(name = "myDS",
partitions = { @CreatePartition(name = "test", suffix = "dc=myorg,dc=com") })
@@ -51,6 +53,8 @@ import org.junit.Test;
* ldap authentication is being used.
*/
public class LdapHS2Test {
+ private static final Logger LOG = LoggerFactory.getLogger(LdapHS2Test.class);
+
@ClassRule
public static CreateLdapServerRule serverRule = new CreateLdapServerRule();
@@ -92,7 +96,9 @@ public class LdapHS2Test {
verifySuccess(fetchResp.getStatus());
List<TColumn> columns = fetchResp.getResults().getColumns();
assertEquals(1, columns.size());
- assertEquals(expectedResult, columns.get(0).getStringVal().getValues().get(0));
+ if (expectedResult != null) {
+ assertEquals(expectedResult, columns.get(0).getStringVal().getValues().get(0));
+ }
return execResp.getOperationHandle();
}
@@ -704,4 +710,149 @@ public class LdapHS2Test {
assertEquals(e.getMessage(), "HTTP Response code: 401");
}
}
+
+ /**
+ * Tests LDAP for reading Impala table through JDBC external data source.
+ */
+ @Test
+ public void testImpalaExtJdbcTables() throws Exception {
+ setUp("");
+ verifyMetrics(0, 0);
+ THttpClient transport = new THttpClient("http://localhost:28000");
+ Map<String, String> headers = new HashMap<String, String>();
+ // Authenticate as 'Test1Ldap' with password '12345'
+ headers.put("Authorization", "Basic VGVzdDFMZGFwOjEyMzQ1");
+ transport.setCustomHeaders(headers);
+ transport.open();
+ TCLIService.Iface client = new TCLIService.Client(new TBinaryProtocol(transport));
+
+ // Open a session which will get username 'Test1Ldap'.
+ TOpenSessionReq openReq = new TOpenSessionReq();
+ TOpenSessionResp openResp = client.OpenSession(openReq);
+ TSessionHandle session = openResp.getSessionHandle();
+ // One successful authentication.
+ verifyMetrics(1, 0);
+
+ // Download Impala JDBC driver.
+ String downloadImpalaJdbcDriver = new File(System.getenv("IMPALA_HOME"),
+ "testdata/bin/download-impala-jdbc-driver.sh").getPath();
+ String[] cmd = { downloadImpalaJdbcDriver };
+ RunShellCommand.Run(cmd, /*shouldSucceed*/ true, "", "");
+
+ // Define queries.
+ String fileSystemPrefix = System.getenv("FILESYSTEM_PREFIX");
+ String internalListenHost = System.getenv("INTERNAL_LISTEN_HOST");
+
+ String dropDSQuery = "DROP DATA SOURCE IF EXISTS impala_jdbc_test_ds";
+ String createDSQuery = String.format("CREATE DATA SOURCE impala_jdbc_test_ds " +
+ "LOCATION '%s/test-warehouse/data-sources/jdbc-data-source.jar' " +
+ "CLASS 'org.apache.impala.extdatasource.jdbc.JdbcDataSource' " +
+ "API_VERSION 'V1'", fileSystemPrefix);
+ String dropTableQuery = "DROP TABLE IF EXISTS %s";
+ // Set JDBC authentication mechanisms as LDAP (3) with username/password as
+ // TEST_USER_1/TEST_PASSWORD_1.
+ String createTableQuery = String.format("CREATE TABLE impala_jdbc_ext_test_table (" +
+ "id INT, bool_col BOOLEAN, tinyint_col TINYINT, smallint_col SMALLINT, " +
+ "int_col INT, bigint_col BIGINT, float_col FLOAT, double_col DOUBLE, " +
+ "date_string_col STRING, string_col STRING, timestamp_col TIMESTAMP) " +
+ "PRODUCED BY DATA SOURCE impala_jdbc_test_ds(" +
+ "'{\"database.type\":\"IMPALA\", " +
+ "\"jdbc.url\":\"jdbc:impala://%s:21050/functional\", " +
+ "\"jdbc.auth\":\"AuthMech=3\", " +
+ "\"jdbc.driver\":\"com.cloudera.impala.jdbc.Driver\", " +
+ "\"driver.url\":\"%s/test-warehouse/data-sources/jdbc-drivers/" +
+ "ImpalaJDBC42.jar\", " +
+ "\"dbcp.username\":\"%s\", " +
+ "\"dbcp.password\":\"%s\", " +
+ "\"table\":\"alltypes\"}')",
+ internalListenHost, fileSystemPrefix, TEST_USER_1, TEST_PASSWORD_1);
+ // Set JDBC authentication mechanisms as LDAP with wrong password.
+ String createTableWithWrongPassword =
+ String.format("CREATE TABLE impala_jdbc_tbl_wrong_password (" +
+ "id INT, bool_col BOOLEAN, tinyint_col TINYINT, smallint_col SMALLINT, " +
+ "int_col INT, bigint_col BIGINT, float_col FLOAT, double_col DOUBLE, " +
+ "date_string_col STRING, string_col STRING, timestamp_col TIMESTAMP) " +
+ "PRODUCED BY DATA SOURCE impala_jdbc_test_ds(" +
+ "'{\"database.type\":\"IMPALA\", " +
+ "\"jdbc.url\":\"jdbc:impala://%s:21050/functional\", " +
+ "\"jdbc.auth\":\"AuthMech=3\", " +
+ "\"jdbc.driver\":\"com.cloudera.impala.jdbc.Driver\", " +
+ "\"driver.url\":\"%s/test-warehouse/data-sources/jdbc-drivers/" +
+ "ImpalaJDBC42.jar\", " +
+ "\"dbcp.username\":\"%s\", " +
+ "\"dbcp.password\":\"wrong-password\", " +
+ "\"table\":\"alltypes\"}')",
+ internalListenHost, fileSystemPrefix, TEST_USER_1);
+ // Set JDBC authentication mechanisms as LDAP without AuthMech.
+ String createTableWithoutAuthMech =
+ String.format("CREATE TABLE impala_jdbc_tbl_without_auth_mech (" +
+ "id INT, bool_col BOOLEAN, tinyint_col TINYINT, smallint_col SMALLINT, " +
+ "int_col INT, bigint_col BIGINT, float_col FLOAT, double_col DOUBLE, " +
+ "date_string_col STRING, string_col STRING, timestamp_col TIMESTAMP) " +
+ "PRODUCED BY DATA SOURCE impala_jdbc_test_ds(" +
+ "'{\"database.type\":\"IMPALA\", " +
+ "\"jdbc.url\":\"jdbc:impala://%s:21050/functional\", " +
+ "\"jdbc.driver\":\"com.cloudera.impala.jdbc.Driver\", " +
+ "\"driver.url\":\"%s/test-warehouse/data-sources/jdbc-drivers/" +
+ "ImpalaJDBC42.jar\", " +
+ "\"dbcp.username\":\"%s\", " +
+ "\"dbcp.password\":\"%s\", " +
+ "\"table\":\"alltypes\"}')",
+ internalListenHost, fileSystemPrefix, TEST_USER_1, TEST_PASSWORD_1);
+ String selectQuery = "select string_col from %s where id=9";
+
+ // Run queries.
+ //
+ // Create data source and tables.
+ execAndFetch(client, session, dropDSQuery, null);
+ execAndFetch(client, session, createDSQuery, "Data source has been created.");
+ execAndFetch(client, session,
+ String.format(dropTableQuery, "impala_jdbc_ext_test_table"), null);
+ execAndFetch(client, session, createTableQuery, "Table has been created.");
+ execAndFetch(client, session,
+ String.format(dropTableQuery, "impala_jdbc_tbl_wrong_password"), null);
+ execAndFetch(client, session, createTableWithWrongPassword,
+ "Table has been created.");
+ execAndFetch(client, session,
+ String.format(dropTableQuery, "impala_jdbc_tbl_without_auth_mech"), null);
+ execAndFetch(client, session, createTableWithoutAuthMech, "Table has been created.");
+
+ // Successfully access JDBC data source table with LDAP.
+ execAndFetch(client, session,
+ String.format(selectQuery, "impala_jdbc_ext_test_table"), "9");
+ // Negative case for JDBC data source table with wrong password.
+ String expectedError = "Error initialized or created transport for authentication";
+ try {
+ execAndFetch(client, session,
+ String.format(selectQuery, "impala_jdbc_tbl_wrong_password"), "9");
+ fail("Expected error: " + expectedError);
+ } catch (Exception e) {
+ assertTrue(e.getMessage().contains(expectedError));
+ }
+ // Negative case for JDBC data source table without AuthMech.
+ expectedError = "Communication link failure. Failed to connect to server";
+ try {
+ execAndFetch(client, session,
+ String.format(selectQuery, "impala_jdbc_tbl_without_auth_mech"), "9");
+ fail("Expected error: " + expectedError);
+ } catch (Exception e) {
+ assertTrue(String.format("Authentication failed with error: %s", e.getMessage()),
+ e.getMessage().contains(expectedError));
+ }
+
+ // Drop data source and tables.
+ execAndFetch(client, session, dropDSQuery, "Data source has been dropped.");
+ execAndFetch(client, session,
+ String.format(dropTableQuery, "impala_jdbc_ext_test_table"),
+ "Table has been dropped.");
+ execAndFetch(client, session,
+ String.format(dropTableQuery, "impala_jdbc_tbl_wrong_password"),
+ "Table has been dropped.");
+ execAndFetch(client, session,
+ String.format(dropTableQuery, "impala_jdbc_tbl_without_auth_mech"),
+ "Table has been dropped.");
+
+ // Two successful authentications for each execAndFetch() call.
+ verifyMetrics(31, 0);
+ }
}
diff --git a/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/JdbcDataSource.java b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/JdbcDataSource.java
index 173753d64..10df71dd5 100644
--- a/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/JdbcDataSource.java
+++ b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/JdbcDataSource.java
@@ -229,6 +229,9 @@ public class JdbcDataSource implements ExternalDataSource {
initString = initString.substring(CACHE_CLASS_PREFIX.length());
cacheClass_ = true;
}
+ // Replace '\n' with a single space character so that one property setting in
+ // the initString can be broken into multiple lines for better readability.
+ initString = initString.replace('\n', ' ');
Map<String, String> config = new ObjectMapper().readValue(initString, typeRef);
tableConfig_ = JdbcStorageConfigManager.convertMapToConfiguration(config);
} catch (JsonProcessingException e) {
@@ -294,6 +297,7 @@ public class JdbcDataSource implements ExternalDataSource {
}
// Execute query and get iterator
tableConfig_.set(JdbcStorageConfig.QUERY.getPropertyName(), sb.toString());
+ LOG.trace("JDBC Query: " + sb.toString());
if (schema_.getColsSize() != 0) {
int limit = -1;
diff --git a/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/conf/DatabaseType.java b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/conf/DatabaseType.java
index 9b30350bc..a01fb4108 100644
--- a/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/conf/DatabaseType.java
+++ b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/conf/DatabaseType.java
@@ -24,5 +24,6 @@ public enum DatabaseType {
ORACLE,
POSTGRES,
MSSQL,
- JETHRO_DATA
+ JETHRO_DATA,
+ IMPALA
}
diff --git a/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/conf/JdbcStorageConfig.java b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/conf/JdbcStorageConfig.java
index 0e1ac5ab3..e48fb07ef 100644
--- a/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/conf/JdbcStorageConfig.java
+++ b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/conf/JdbcStorageConfig.java
@@ -26,6 +26,8 @@ public enum JdbcStorageConfig {
// JDBC connection string, including the database type, IP address, port number, and
// database name. For example, "jdbc:postgresql://127.0.0.1:5432/functional
JDBC_URL("jdbc.url", true),
+ // Authentication mechanisms of JDBC driver.
+ JDBC_AUTH("jdbc.auth", false),
// Class name of JDBC driver. For example, "org.postgresql.Driver"
JDBC_DRIVER_CLASS("jdbc.driver", true),
// Driver URL for downloading the Jar file package that is used to access the external
diff --git a/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/DatabaseAccessorFactory.java b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/DatabaseAccessorFactory.java
index 5415b5763..decde6797 100644
--- a/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/DatabaseAccessorFactory.java
+++ b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/DatabaseAccessorFactory.java
@@ -57,6 +57,10 @@ public class DatabaseAccessorFactory {
accessor = new DB2DatabaseAccessor();
break;
+ case IMPALA:
+ accessor = new ImpalaDatabaseAccessor();
+ break;
+
default:
accessor = new GenericJdbcDatabaseAccessor();
break;
diff --git a/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/GenericJdbcDatabaseAccessor.java b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/GenericJdbcDatabaseAccessor.java
index b2e820c30..4ab214d2c 100644
--- a/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/GenericJdbcDatabaseAccessor.java
+++ b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/GenericJdbcDatabaseAccessor.java
@@ -48,6 +48,7 @@ import org.apache.impala.thrift.TStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Strings;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
@@ -283,7 +284,12 @@ public class GenericJdbcDatabaseAccessor implements DatabaseAccessor {
}
// essential properties
- dbProperties.put("url", conf.get(JdbcStorageConfig.JDBC_URL.getPropertyName()));
+ String jdbcUrl = conf.get(JdbcStorageConfig.JDBC_URL.getPropertyName());
+ String jdbcAuth = conf.get(JdbcStorageConfig.JDBC_AUTH.getPropertyName());
+ if (!Strings.isNullOrEmpty(jdbcAuth)) {
+ jdbcUrl += ";" + jdbcAuth;
+ }
+ dbProperties.put("url", jdbcUrl);
dbProperties.put("driverClassName",
conf.get(JdbcStorageConfig.JDBC_DRIVER_CLASS.getPropertyName()));
dbProperties.put("driverUrl",
diff --git a/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/conf/DatabaseType.java b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/ImpalaDatabaseAccessor.java
similarity index 51%
copy from java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/conf/DatabaseType.java
copy to java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/ImpalaDatabaseAccessor.java
index 9b30350bc..361eba845 100644
--- a/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/conf/DatabaseType.java
+++ b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/ImpalaDatabaseAccessor.java
@@ -15,14 +15,35 @@
// specific language governing permissions and limitations
// under the License.
-package org.apache.impala.extdatasource.jdbc.conf;
+package org.apache.impala.extdatasource.jdbc.dao;
+
+/**
+ * Impala-specific data accessor. This is needed because the Impala JDBC drivers do
+ * not support the generic LIMIT and OFFSET escape functions.
+ */
+public class ImpalaDatabaseAccessor extends GenericJdbcDatabaseAccessor {
+
+ @Override
+ protected String addLimitAndOffsetToQuery(String sql, int limit, int offset) {
+ if (offset == 0) {
+ return addLimitToQuery(sql, limit);
+ } else {
+ if (limit != -1) {
+ return sql + " LIMIT " + limit + " OFFSET " + offset;
+ } else {
+ return sql;
+ }
+ }
+ }
+
+
+ @Override
+ protected String addLimitToQuery(String sql, int limit) {
+ if (limit != -1) {
+ return sql + " LIMIT " + limit;
+ } else {
+ return sql;
+ }
+ }
-public enum DatabaseType {
- MYSQL,
- H2,
- DB2,
- ORACLE,
- POSTGRES,
- MSSQL,
- JETHRO_DATA
}
diff --git a/testdata/bin/download-impala-jdbc-driver.sh b/testdata/bin/download-impala-jdbc-driver.sh
new file mode 100755
index 000000000..05d445295
--- /dev/null
+++ b/testdata/bin/download-impala-jdbc-driver.sh
@@ -0,0 +1,72 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# This script downloads the Impala JDBC driver and copies it to the Hadoop FS.
+
+set -euo pipefail
+. $IMPALA_HOME/bin/report_build_error.sh
+setup_report_build_error
+
+. ${IMPALA_HOME}/bin/impala-config.sh > /dev/null 2>&1
+
+EXT_DATA_SOURCES_HDFS_PATH=${FILESYSTEM_PREFIX}/test-warehouse/data-sources
+JDBC_DRIVERS_HDFS_PATH=${EXT_DATA_SOURCES_HDFS_PATH}/jdbc-drivers
+SIMBA_DRIVER_ZIP_FILENAME=ClouderaImpala_JDBC${IMPALA_SIMBA_JDBC_DRIVER_VERSION}
+INNER_SIMBA_DRIVER_ZIP_FILENAME=ClouderaImpalaJDBC${IMPALA_SIMBA_JDBC_DRIVER_VERSION}
+DRIVER_JAR_VERSION=${IMPALA_SIMBA_JDBC_DRIVER_VERSION%-*}
+SIMBA_DRIVER_JAR_FILENAME=ImpalaJDBC${DRIVER_JAR_VERSION}.jar
+
+found=$(hadoop fs -find ${JDBC_DRIVERS_HDFS_PATH} -name ${SIMBA_DRIVER_JAR_FILENAME})
+if [ ! -z "$found" ]; then
+ echo "JDBC driver jar file already exists"
+ exit 0
+fi
+
+hadoop fs -mkdir -p ${JDBC_DRIVERS_HDFS_PATH}
+pushd /tmp
+
+mkdir -p impala_jdbc_driver
+cd impala_jdbc_driver
+
+# Download Impala jdbc driver.
+wget "https://downloads.cloudera.com/connectors/${SIMBA_DRIVER_ZIP_FILENAME}.zip"
+
+# Use Python modules to unzip zip file since 'unzip' command is not available in some
+# testing environments.
+cat > unzip.py <<__EOT__
+import sys
+from zipfile import PyZipFile
+pzf = PyZipFile(sys.argv[1])
+pzf.extractall()
+__EOT__
+
+# Extract driver jar file from zip file.
+python ./unzip.py ${SIMBA_DRIVER_ZIP_FILENAME}.zip
+python ./unzip.py ${SIMBA_DRIVER_ZIP_FILENAME}/${INNER_SIMBA_DRIVER_ZIP_FILENAME}.zip
+
+# Copy driver jar file to Hadoop FS.
+hadoop fs -put -f /tmp/impala_jdbc_driver/${SIMBA_DRIVER_JAR_FILENAME} \
+ ${JDBC_DRIVERS_HDFS_PATH}/${SIMBA_DRIVER_JAR_FILENAME}
+
+echo "Copied ${SIMBA_DRIVER_JAR_FILENAME} into HDFS ${JDBC_DRIVERS_HDFS_PATH}"
+
+cd ..
+rm -rf impala_jdbc_driver
+popd
+
diff --git a/testdata/workloads/functional-query/queries/QueryTest/impala-ext-jdbc-tables.test b/testdata/workloads/functional-query/queries/QueryTest/impala-ext-jdbc-tables.test
new file mode 100644
index 000000000..8001295d4
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/impala-ext-jdbc-tables.test
@@ -0,0 +1,237 @@
+====
+---- QUERY
+# Create DataSource
+DROP DATA SOURCE IF EXISTS TestJdbcDataSource;
+CREATE DATA SOURCE TestJdbcDataSource
+LOCATION '$FILESYSTEM_PREFIX/test-warehouse/data-sources/jdbc-data-source.jar'
+CLASS 'org.apache.impala.extdatasource.jdbc.JdbcDataSource'
+API_VERSION 'V1';
+---- RESULTS
+'Data source has been created.'
+====
+---- QUERY
+# Show created DataSource
+SHOW DATA SOURCES LIKE 'testjdbcdatasource';
+---- LABELS
+NAME,LOCATION,CLASS NAME,API VERSION
+---- RESULTS
+'testjdbcdatasource',regex:'.*/test-warehouse/data-sources/jdbc-data-source.jar','org.apache.impala.extdatasource.jdbc.JdbcDataSource','V1'
+---- TYPES
+STRING,STRING,STRING,STRING
+====
+---- QUERY
+# Create external JDBC DataSource table
+DROP TABLE IF EXISTS alltypes_jdbc_datasource;
+CREATE TABLE alltypes_jdbc_datasource (
+ id INT,
+ bool_col BOOLEAN,
+ tinyint_col TINYINT,
+ smallint_col SMALLINT,
+ int_col INT,
+ bigint_col BIGINT,
+ float_col FLOAT,
+ double_col DOUBLE,
+ date_string_col STRING,
+ string_col STRING,
+ timestamp_col TIMESTAMP)
+PRODUCED BY DATA SOURCE TestJdbcDataSource(
+'{"database.type":"IMPALA",
+"jdbc.url":"jdbc:impala://$INTERNAL_LISTEN_HOST:21050/functional",
+"jdbc.auth":"AuthMech=0",
+"jdbc.driver":"com.cloudera.impala.jdbc.Driver",
+"driver.url":"$FILESYSTEM_PREFIX/test-warehouse/data-sources/jdbc-drivers/ImpalaJDBC42.jar",
+"dbcp.username":"impala",
+"dbcp.password":"cloudera",
+"table":"alltypes"}');
+---- RESULTS
+'Table has been created.'
+====
+---- QUERY
+# Create external JDBC DataSource table
+DROP TABLE IF EXISTS alltypes_jdbc_datasource_2;
+CREATE TABLE alltypes_jdbc_datasource_2 (
+ id INT,
+ bool_col BOOLEAN,
+ tinyint_col TINYINT,
+ smallint_col SMALLINT,
+ int_col INT,
+ bigint_col BIGINT,
+ float_col FLOAT,
+ double_col DOUBLE,
+ date_string_col STRING,
+ string_col STRING,
+ timestamp_col TIMESTAMP)
+PRODUCED BY DATA SOURCE TestJdbcDataSource(
+'{"database.type":"IMPALA",
+"jdbc.url":"jdbc:impala://$INTERNAL_LISTEN_HOST:21050/functional",
+"jdbc.auth":"AuthMech=0",
+"jdbc.driver":"com.cloudera.impala.jdbc.Driver",
+"driver.url":"$FILESYSTEM_PREFIX/test-warehouse/data-sources/jdbc-drivers/ImpalaJDBC42.jar",
+"dbcp.username":"impala",
+"dbcp.password":"cloudera",
+"table":"alltypes"}');
+---- RESULTS
+'Table has been created.'
+====
+---- QUERY
+# Test the jdbc DataSource
+# count(*) with a predicate evaluated by Impala
+# Binary predicates are pushed to the external jdbc DataSource.
+select count(*) from alltypes_jdbc_datasource
+where float_col = 0 and string_col is not NULL
+---- RESULTS
+730
+---- TYPES
+BIGINT
+---- RUNTIME_PROFILE
+row_regex: .*NumExternalDataSourceGetNext: 1 .*
+row_regex: .*RowsRead: 730 .*
+aggregation(SUM, RowsRead): 730
+====
+---- QUERY
+# count(*) with no predicates has no materialized slots
+select count(*) from alltypes_jdbc_datasource
+---- RESULTS
+7300
+---- TYPES
+BIGINT
+---- RUNTIME_PROFILE
+row_regex: .*NumExternalDataSourceGetNext: 1 .*
+row_regex: .*RowsRead: 7.30K .*
+aggregation(SUM, RowsRead): 7300
+====
+---- QUERY
+# Gets all types including a row with a NULL value. The binary predicates are pushed to
+# the DataSource, "order by" and "limit" are evaluated locally.
+select *
+from alltypes_jdbc_datasource
+where id > 10 and int_col< 5 order by id limit 5 offset 0
+---- RESULTS
+11,false,1,1,1,10,1.100000023841858,10.1,'01/02/09','1',2009-01-02 00:11:00.450000000
+12,true,2,2,2,20,2.200000047683716,20.2,'01/02/09','2',2009-01-02 00:12:00.460000000
+13,false,3,3,3,30,3.299999952316284,30.3,'01/02/09','3',2009-01-02 00:13:00.480000000
+14,true,4,4,4,40,4.400000095367432,40.4,'01/02/09','4',2009-01-02 00:14:00.510000000
+20,true,0,0,0,0,0,0,'01/03/09','0',2009-01-03 00:20:00.900000000
+---- TYPES
+INT, BOOLEAN, TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE, STRING, STRING, TIMESTAMP
+---- RUNTIME_PROFILE
+row_regex: .*NumExternalDataSourceGetNext: 4 .*
+row_regex: .*RowsRead: 3.64K .*
+aggregation(SUM, RowsRead): 3644
+====
+---- QUERY
+# Gets specified columns.
+# The binary predicates are pushed to the DataSource, "order by" and "limit" are
+# evaluated locally.
+select id, bool_col, smallint_col, float_col, double_col, date_string_col
+from alltypes_jdbc_datasource
+where id > 10 and int_col< 5 order by id limit 5 offset 0
+---- RESULTS
+11,false,1,1.100000023841858,10.1,'01/02/09'
+12,true,2,2.200000047683716,20.2,'01/02/09'
+13,false,3,3.299999952316284,30.3,'01/02/09'
+14,true,4,4.400000095367432,40.4,'01/02/09'
+20,true,0,0,0,'01/03/09'
+---- TYPES
+INT, BOOLEAN, SMALLINT, FLOAT, DOUBLE, STRING
+---- RUNTIME_PROFILE
+row_regex: .*NumExternalDataSourceGetNext: 4 .*
+row_regex: .*RowsRead: 3.64K .*
+aggregation(SUM, RowsRead): 3644
+====
+---- QUERY
+# Gets specified columns from external jdbc table with case sensitive column names
+# and table name.
+# The binary predicates are pushed to the DataSource, "order by" and "limit" are
+# evaluated locally.
+select id, bool_col, smallint_col, float_col, double_col, date_string_col
+from alltypes_jdbc_datasource_2
+where id > 10 and int_col< 5 order by id limit 5 offset 0
+---- RESULTS
+11,false,1,1.100000023841858,10.1,'01/02/09'
+12,true,2,2.200000047683716,20.2,'01/02/09'
+13,false,3,3.299999952316284,30.3,'01/02/09'
+14,true,4,4.400000095367432,40.4,'01/02/09'
+20,true,0,0,0,'01/03/09'
+---- TYPES
+INT, BOOLEAN, SMALLINT, FLOAT, DOUBLE, STRING
+---- RUNTIME_PROFILE
+row_regex: .*NumExternalDataSourceGetNext: 4 .*
+row_regex: .*RowsRead: 3.64K .*
+aggregation(SUM, RowsRead): 3644
+====
+---- QUERY
+# Inner join with a non jdbc table
+# The binary predicates are pushed to the DataSource, but no predicate defined for
+# local table.
+select a.id, b.int_col
+from alltypes_jdbc_datasource a inner join functional.alltypes b on (a.id = b.id)
+where a.id = 1
+---- RESULTS
+1,1
+---- TYPES
+INT, INT
+---- RUNTIME_PROFILE
+row_regex: .*NumExternalDataSourceGetNext: 1 .*
+row_regex: .*RowsRead: 1 .*
+aggregation(SUM, RowsRead): 7301
+====
+---- QUERY
+# Inner join with another jdbc table
+# The binary predicates are pushed to the two DataSource Nodes.
+select a.id, b.int_col
+from alltypes_jdbc_datasource a inner join alltypes_jdbc_datasource_2 b on (a.id = b.id)
+where a.id < 3 group by a.id, b.int_col
+---- RESULTS
+0,0
+1,1
+2,2
+---- TYPES
+INT, INT
+---- RUNTIME_PROFILE
+row_regex: .*NumExternalDataSourceGetNext: 1 .*
+row_regex: .*RowsRead: 3 .*
+aggregation(SUM, RowsRead): 6
+====
+---- QUERY
+# Cross join
+# The binary predicates are pushed to the two DataSource Nodes.
+select a.id, b.id
+from alltypes_jdbc_datasource a cross join alltypes_jdbc_datasource b
+where (a.id < 3 and b.id < 3)
+order by a.id, b.id limit 10
+---- RESULTS
+0,0
+0,1
+0,2
+1,0
+1,1
+1,2
+2,0
+2,1
+2,2
+---- TYPES
+INT, INT
+---- RUNTIME_PROFILE
+row_regex: .*NumExternalDataSourceGetNext: 1 .*
+row_regex: .*RowsRead: 3 .*
+aggregation(SUM, RowsRead): 6
+====
+---- QUERY
+# Drop table
+DROP TABLE alltypes_jdbc_datasource;
+---- RESULTS
+'Table has been dropped.'
+====
+---- QUERY
+# Drop table
+DROP TABLE alltypes_jdbc_datasource_2;
+---- RESULTS
+'Table has been dropped.'
+====
+---- QUERY
+# Drop DataSource
+DROP DATA SOURCE TestJdbcDataSource;
+---- RESULTS
+'Data source has been dropped.'
+====
diff --git a/tests/custom_cluster/test_ext_data_sources.py b/tests/custom_cluster/test_ext_data_sources.py
index 78f16dd75..f12fd1b56 100644
--- a/tests/custom_cluster/test_ext_data_sources.py
+++ b/tests/custom_cluster/test_ext_data_sources.py
@@ -109,3 +109,37 @@ class TestMySqlExtJdbcTables(CustomClusterTestSuite):
def test_mysql_ext_jdbc_tables(self, vector, unique_database):
"""Run tests for external jdbc tables on MySQL"""
self.run_test_case('QueryTest/mysql-ext-jdbc-tables', vector, use_db=unique_database)
+
+
+class TestImpalaExtJdbcTables(CustomClusterTestSuite):
+ """Impala query tests for external jdbc tables in Impala cluster."""
+
+ @classmethod
+ def get_workload(cls):
+ return 'functional-query'
+
+ @classmethod
+ def _download_impala_jdbc_driver(cls):
+ # Download Impala jdbc driver and copy jdbc driver to HDFS.
+ script = os.path.join(
+ os.environ['IMPALA_HOME'], 'testdata/bin/download-impala-jdbc-driver.sh')
+ run_cmd = [script]
+ try:
+ subprocess.check_call(run_cmd, close_fds=True)
+ except subprocess.CalledProcessError:
+ assert False, "Failed to download Impala JDBC driver"
+
+ @classmethod
+ def setup_class(cls):
+ cls._download_impala_jdbc_driver()
+ super(TestImpalaExtJdbcTables, cls).setup_class()
+
+ @classmethod
+ def teardown_class(cls):
+ super(TestImpalaExtJdbcTables, cls).teardown_class()
+
+ @pytest.mark.execute_serially
+ def test_impala_ext_jdbc_tables(self, vector, unique_database):
+ """Run tests for external jdbc tables in Impala cluster"""
+ self.run_test_case(
+ 'QueryTest/impala-ext-jdbc-tables', vector, use_db=unique_database)