You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by wz...@apache.org on 2023/12/22 21:56:53 UTC

(impala) branch master updated (6c6142ba2 -> ec22a1e1c)

This is an automated email from the ASF dual-hosted git repository.

wzhou pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


    from 6c6142ba2 IMPALA-12633: Remove DCHECK for slow SetQueryInflight
     new 9fd1c8184 IMPALA-12661: Fix ASAN heap-use-after-free in IcebergMetadataScanNode
     new ec22a1e1c IMPALA-12502: Support Impala to Impala federation

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 be/src/exec/data-source-scan-node.cc               |   6 +
 be/src/exec/data-source-scan-node.h                |   3 +
 .../iceberg-metadata/iceberg-metadata-scan-node.cc |   8 +-
 bin/impala-config.sh                               |   4 +
 .../apache/impala/customcluster/LdapHS2Test.java   | 153 ++++++++++++++++++++-
 .../impala/extdatasource/jdbc/JdbcDataSource.java  |   4 +
 .../extdatasource/jdbc/conf/DatabaseType.java      |   3 +-
 .../extdatasource/jdbc/conf/JdbcStorageConfig.java |   2 +
 .../jdbc/dao/DatabaseAccessorFactory.java          |   4 +
 .../jdbc/dao/GenericJdbcDatabaseAccessor.java      |   8 +-
 ...seAccessor.java => ImpalaDatabaseAccessor.java} |   8 +-
 testdata/bin/download-impala-jdbc-driver.sh        |  72 ++++++++++
 ...ata-source.test => impala-ext-jdbc-tables.test} |  80 ++++++++---
 tests/custom_cluster/test_ext_data_sources.py      |  34 +++++
 14 files changed, 359 insertions(+), 30 deletions(-)
 copy java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/{MySqlDatabaseAccessor.java => ImpalaDatabaseAccessor.java} (82%)
 create mode 100755 testdata/bin/download-impala-jdbc-driver.sh
 copy testdata/workloads/functional-query/queries/QueryTest/{jdbc-data-source.test => impala-ext-jdbc-tables.test} (65%)


(impala) 02/02: IMPALA-12502: Support Impala to Impala federation

Posted by wz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

wzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit ec22a1e1cad9f2e0ab75a598eb75727c7a044172
Author: wzhou-code <wz...@cloudera.com>
AuthorDate: Fri Nov 24 20:56:04 2023 -0800

    IMPALA-12502: Support Impala to Impala federation
    
    This patch adds support for reading Impala tables in an Impala cluster
    through a JDBC external data source. It also adds a new counter
    NumExternalDataSourceGetNext in profile for the total number of calls
    to ExternalDataSource::GetNext().
    Setting query options for Impala will be supported in a following patch.
    
    Testing:
     - Added an end-to-end unit test to read Impala tables from Impala
       cluster through JDBC external data source.
       Manually ran the unit-test with Impala tables in Impala cluster on a
       remote host by setting $INTERNAL_LISTEN_HOST in jdbc.url as the ip
       address of the remote host on which an Impala cluster is running.
     - Added LDAP test for reading table through JDBC external data source
       with LDAP authentication.
       Manually ran the unit-test with Impala tables in a remote Impala
       cluster.
     - Passed core tests.
    
    Change-Id: I79ad3273932b658cb85c9c17cc834fa1b5fbd64f
    Reviewed-on: http://gerrit.cloudera.org:8080/20731
    Reviewed-by: Abhishek Rawat <ar...@cloudera.com>
    Tested-by: Wenzhe Zhou <wz...@cloudera.com>
---
 be/src/exec/data-source-scan-node.cc               |   6 +
 be/src/exec/data-source-scan-node.h                |   3 +
 bin/impala-config.sh                               |   4 +
 .../apache/impala/customcluster/LdapHS2Test.java   | 153 ++++++++++++-
 .../impala/extdatasource/jdbc/JdbcDataSource.java  |   4 +
 .../extdatasource/jdbc/conf/DatabaseType.java      |   3 +-
 .../extdatasource/jdbc/conf/JdbcStorageConfig.java |   2 +
 .../jdbc/dao/DatabaseAccessorFactory.java          |   4 +
 .../jdbc/dao/GenericJdbcDatabaseAccessor.java      |   8 +-
 .../ImpalaDatabaseAccessor.java}                   |  39 +++-
 testdata/bin/download-impala-jdbc-driver.sh        |  72 +++++++
 .../queries/QueryTest/impala-ext-jdbc-tables.test  | 237 +++++++++++++++++++++
 tests/custom_cluster/test_ext_data_sources.py      |  34 +++
 13 files changed, 557 insertions(+), 12 deletions(-)

diff --git a/be/src/exec/data-source-scan-node.cc b/be/src/exec/data-source-scan-node.cc
index f37318184..df2eb7f5a 100644
--- a/be/src/exec/data-source-scan-node.cc
+++ b/be/src/exec/data-source-scan-node.cc
@@ -49,6 +49,9 @@ DEFINE_int32(data_source_batch_size, 1024, "Batch size for calls to GetNext() on
 
 namespace impala {
 
+PROFILE_DEFINE_COUNTER(NumExternalDataSourceGetNext, DEBUG, TUnit::UNIT,
+    "The total number of calls to ExternalDataSource::GetNext()");
+
 // $0 = num expected cols, $1 = actual num columns
 const string ERROR_NUM_COLUMNS = "Data source returned unexpected number of columns. "
     "Expected $0 but received $1. This likely indicates a problem with the data source "
@@ -93,6 +96,8 @@ Status DataSourceScanNode::Prepare(RuntimeState* state) {
       data_src_node_.init_string));
 
   cols_next_val_idx_.resize(tuple_desc_->slots().size(), 0);
+  num_ext_data_source_get_next_ =
+      PROFILE_NumExternalDataSourceGetNext.Instantiate(runtime_profile_);
   return Status::OK();
 }
 
@@ -157,6 +162,7 @@ Status DataSourceScanNode::GetNextInputBatch() {
   Ubsan::MemSet(cols_next_val_idx_.data(), 0, sizeof(int) * cols_next_val_idx_.size());
   TGetNextParams params;
   params.__set_scan_handle(scan_handle_);
+  COUNTER_ADD(num_ext_data_source_get_next_, 1);
   RETURN_IF_ERROR(data_source_executor_->GetNext(params, input_batch_.get()));
   RETURN_IF_ERROR(Status(input_batch_->status));
   RETURN_IF_ERROR(ValidateRowBatchSize());
diff --git a/be/src/exec/data-source-scan-node.h b/be/src/exec/data-source-scan-node.h
index 7b1e33714..452bac153 100644
--- a/be/src/exec/data-source-scan-node.h
+++ b/be/src/exec/data-source-scan-node.h
@@ -98,6 +98,9 @@ class DataSourceScanNode : public ScanNode {
   /// the next row batch.
   std::vector<int> cols_next_val_idx_;
 
+  /// The total number of calls to ExternalDataSource::GetNext().
+  RuntimeProfile::Counter* num_ext_data_source_get_next_;
+
   /// Materializes the next row (next_row_idx_) into tuple. 'local_tz' is used as the
   /// local time-zone for materializing 'TYPE_TIMESTAMP' slots.
   Status MaterializeNextRow(const Timezone* local_tz, MemPool* mem_pool, Tuple* tuple);
diff --git a/bin/impala-config.sh b/bin/impala-config.sh
index 7bc23770b..b6d3547be 100755
--- a/bin/impala-config.sh
+++ b/bin/impala-config.sh
@@ -207,6 +207,10 @@ if [[ $ARCH_NAME == 'aarch64' ]]; then
   export IMPALA_HADOOP_CLIENT_VERSION=3.3.6
   unset IMPALA_HADOOP_CLIENT_URL
 fi
+
+# Impala JDBC driver for testing.
+export IMPALA_SIMBA_JDBC_DRIVER_VERSION=42-2.6.32.1041
+
 # Thrift related environment variables.
 # IMPALA_THRIFT_POM_VERSION is used to populate IMPALA_THRIFT_JAVA_VERSION and
 # thrift.version in java/pom.xml.
diff --git a/fe/src/test/java/org/apache/impala/customcluster/LdapHS2Test.java b/fe/src/test/java/org/apache/impala/customcluster/LdapHS2Test.java
index 325f1a148..5b9540e77 100644
--- a/fe/src/test/java/org/apache/impala/customcluster/LdapHS2Test.java
+++ b/fe/src/test/java/org/apache/impala/customcluster/LdapHS2Test.java
@@ -40,6 +40,8 @@ import org.apache.thrift.transport.THttpClient;
 import org.apache.thrift.protocol.TBinaryProtocol;
 import org.junit.ClassRule;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 @CreateDS(name = "myDS",
     partitions = { @CreatePartition(name = "test", suffix = "dc=myorg,dc=com") })
@@ -51,6 +53,8 @@ import org.junit.Test;
  * ldap authentication is being used.
  */
 public class LdapHS2Test {
+  private static final Logger LOG = LoggerFactory.getLogger(LdapHS2Test.class);
+
   @ClassRule
   public static CreateLdapServerRule serverRule = new CreateLdapServerRule();
 
@@ -92,7 +96,9 @@ public class LdapHS2Test {
     verifySuccess(fetchResp.getStatus());
     List<TColumn> columns = fetchResp.getResults().getColumns();
     assertEquals(1, columns.size());
-    assertEquals(expectedResult, columns.get(0).getStringVal().getValues().get(0));
+    if (expectedResult != null) {
+      assertEquals(expectedResult, columns.get(0).getStringVal().getValues().get(0));
+    }
 
     return execResp.getOperationHandle();
   }
@@ -704,4 +710,149 @@ public class LdapHS2Test {
       assertEquals(e.getMessage(), "HTTP Response code: 401");
     }
   }
+
+  /**
+   * Tests LDAP for reading Impala table through JDBC external data source.
+   */
+  @Test
+  public void testImpalaExtJdbcTables() throws Exception {
+    setUp("");
+    verifyMetrics(0, 0);
+    THttpClient transport = new THttpClient("http://localhost:28000");
+    Map<String, String> headers = new HashMap<String, String>();
+    // Authenticate as 'Test1Ldap' with password '12345'
+    headers.put("Authorization", "Basic VGVzdDFMZGFwOjEyMzQ1");
+    transport.setCustomHeaders(headers);
+    transport.open();
+    TCLIService.Iface client = new TCLIService.Client(new TBinaryProtocol(transport));
+
+    // Open a session which will get username 'Test1Ldap'.
+    TOpenSessionReq openReq = new TOpenSessionReq();
+    TOpenSessionResp openResp = client.OpenSession(openReq);
+    TSessionHandle session = openResp.getSessionHandle();
+    // One successful authentication.
+    verifyMetrics(1, 0);
+
+    // Download Impala JDBC driver.
+    String downloadImpalaJdbcDriver = new File(System.getenv("IMPALA_HOME"),
+        "testdata/bin/download-impala-jdbc-driver.sh").getPath();
+    String[] cmd = { downloadImpalaJdbcDriver };
+    RunShellCommand.Run(cmd, /*shouldSucceed*/ true, "", "");
+
+    // Define queries.
+    String fileSystemPrefix = System.getenv("FILESYSTEM_PREFIX");
+    String internalListenHost = System.getenv("INTERNAL_LISTEN_HOST");
+
+    String dropDSQuery = "DROP DATA SOURCE IF EXISTS impala_jdbc_test_ds";
+    String createDSQuery = String.format("CREATE DATA SOURCE impala_jdbc_test_ds " +
+        "LOCATION '%s/test-warehouse/data-sources/jdbc-data-source.jar' " +
+        "CLASS 'org.apache.impala.extdatasource.jdbc.JdbcDataSource' " +
+        "API_VERSION 'V1'", fileSystemPrefix);
+    String dropTableQuery = "DROP TABLE IF EXISTS %s";
+    // Set JDBC authentication mechanisms as LDAP (3) with username/password as
+    // TEST_USER_1/TEST_PASSWORD_1.
+    String createTableQuery = String.format("CREATE TABLE impala_jdbc_ext_test_table (" +
+        "id INT, bool_col BOOLEAN, tinyint_col TINYINT, smallint_col SMALLINT, " +
+        "int_col INT, bigint_col BIGINT, float_col FLOAT, double_col DOUBLE, " +
+        "date_string_col STRING, string_col STRING, timestamp_col TIMESTAMP) " +
+        "PRODUCED BY DATA SOURCE impala_jdbc_test_ds(" +
+        "'{\"database.type\":\"IMPALA\", " +
+          "\"jdbc.url\":\"jdbc:impala://%s:21050/functional\", " +
+          "\"jdbc.auth\":\"AuthMech=3\", " +
+          "\"jdbc.driver\":\"com.cloudera.impala.jdbc.Driver\", " +
+          "\"driver.url\":\"%s/test-warehouse/data-sources/jdbc-drivers/" +
+          "ImpalaJDBC42.jar\", " +
+          "\"dbcp.username\":\"%s\", " +
+          "\"dbcp.password\":\"%s\", " +
+          "\"table\":\"alltypes\"}')",
+          internalListenHost, fileSystemPrefix, TEST_USER_1, TEST_PASSWORD_1);
+    // Set JDBC authentication mechanisms as LDAP with wrong password.
+    String createTableWithWrongPassword =
+        String.format("CREATE TABLE impala_jdbc_tbl_wrong_password (" +
+        "id INT, bool_col BOOLEAN, tinyint_col TINYINT, smallint_col SMALLINT, " +
+        "int_col INT, bigint_col BIGINT, float_col FLOAT, double_col DOUBLE, " +
+        "date_string_col STRING, string_col STRING, timestamp_col TIMESTAMP) " +
+        "PRODUCED BY DATA SOURCE impala_jdbc_test_ds(" +
+        "'{\"database.type\":\"IMPALA\", " +
+          "\"jdbc.url\":\"jdbc:impala://%s:21050/functional\", " +
+          "\"jdbc.auth\":\"AuthMech=3\", " +
+          "\"jdbc.driver\":\"com.cloudera.impala.jdbc.Driver\", " +
+          "\"driver.url\":\"%s/test-warehouse/data-sources/jdbc-drivers/" +
+          "ImpalaJDBC42.jar\", " +
+          "\"dbcp.username\":\"%s\", " +
+          "\"dbcp.password\":\"wrong-password\", " +
+          "\"table\":\"alltypes\"}')",
+          internalListenHost, fileSystemPrefix, TEST_USER_1);
+    // Supply LDAP username/password but omit the jdbc.auth (AuthMech) setting.
+    String createTableWithoutAuthMech =
+        String.format("CREATE TABLE impala_jdbc_tbl_without_auth_mech (" +
+        "id INT, bool_col BOOLEAN, tinyint_col TINYINT, smallint_col SMALLINT, " +
+        "int_col INT, bigint_col BIGINT, float_col FLOAT, double_col DOUBLE, " +
+        "date_string_col STRING, string_col STRING, timestamp_col TIMESTAMP) " +
+        "PRODUCED BY DATA SOURCE impala_jdbc_test_ds(" +
+        "'{\"database.type\":\"IMPALA\", " +
+          "\"jdbc.url\":\"jdbc:impala://%s:21050/functional\", " +
+          "\"jdbc.driver\":\"com.cloudera.impala.jdbc.Driver\", " +
+          "\"driver.url\":\"%s/test-warehouse/data-sources/jdbc-drivers/" +
+          "ImpalaJDBC42.jar\", " +
+          "\"dbcp.username\":\"%s\", " +
+          "\"dbcp.password\":\"%s\", " +
+          "\"table\":\"alltypes\"}')",
+          internalListenHost, fileSystemPrefix, TEST_USER_1, TEST_PASSWORD_1);
+    String selectQuery = "select string_col from %s where id=9";
+
+    // Run queries.
+    //
+    // Create data source and tables.
+    execAndFetch(client, session, dropDSQuery, null);
+    execAndFetch(client, session, createDSQuery, "Data source has been created.");
+    execAndFetch(client, session,
+        String.format(dropTableQuery, "impala_jdbc_ext_test_table"), null);
+    execAndFetch(client, session, createTableQuery, "Table has been created.");
+    execAndFetch(client, session,
+        String.format(dropTableQuery, "impala_jdbc_tbl_wrong_password"), null);
+    execAndFetch(client, session, createTableWithWrongPassword,
+        "Table has been created.");
+    execAndFetch(client, session,
+        String.format(dropTableQuery, "impala_jdbc_tbl_without_auth_mech"), null);
+    execAndFetch(client, session, createTableWithoutAuthMech, "Table has been created.");
+
+    // Successfully access JDBC data source table with LDAP.
+    execAndFetch(client, session,
+        String.format(selectQuery, "impala_jdbc_ext_test_table"), "9");
+    // Negative case for JDBC data source table with wrong password.
+    String expectedError = "Error initialized or created transport for authentication";
+    try {
+      execAndFetch(client, session,
+          String.format(selectQuery, "impala_jdbc_tbl_wrong_password"), "9");
+      fail("Expected error: " + expectedError);
+    } catch (Exception e) {
+      assertTrue(e.getMessage().contains(expectedError));
+    }
+    // Negative case for JDBC data source table without AuthMech.
+    expectedError = "Communication link failure. Failed to connect to server";
+    try {
+      execAndFetch(client, session,
+          String.format(selectQuery, "impala_jdbc_tbl_without_auth_mech"), "9");
+      fail("Expected error: " + expectedError);
+    } catch (Exception e) {
+      assertTrue(String.format("Authentication failed with error: %s", e.getMessage()),
+          e.getMessage().contains(expectedError));
+    }
+
+    // Drop data source and tables.
+    execAndFetch(client, session, dropDSQuery, "Data source has been dropped.");
+    execAndFetch(client, session,
+        String.format(dropTableQuery, "impala_jdbc_ext_test_table"),
+        "Table has been dropped.");
+    execAndFetch(client, session,
+        String.format(dropTableQuery, "impala_jdbc_tbl_wrong_password"),
+        "Table has been dropped.");
+    execAndFetch(client, session,
+        String.format(dropTableQuery, "impala_jdbc_tbl_without_auth_mech"),
+        "Table has been dropped.");
+
+    // One for OpenSession, plus two successful authentications per execAndFetch() call.
+    verifyMetrics(31, 0);
+  }
 }
diff --git a/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/JdbcDataSource.java b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/JdbcDataSource.java
index 173753d64..10df71dd5 100644
--- a/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/JdbcDataSource.java
+++ b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/JdbcDataSource.java
@@ -229,6 +229,9 @@ public class JdbcDataSource implements ExternalDataSource {
           initString = initString.substring(CACHE_CLASS_PREFIX.length());
           cacheClass_ = true;
         }
+        // Replace '\n' with single space character so that one property setting in
+        // initString can be broken into multiple lines for better readability.
+        initString = initString.replace('\n', ' ');
         Map<String, String> config = new ObjectMapper().readValue(initString, typeRef);
         tableConfig_ = JdbcStorageConfigManager.convertMapToConfiguration(config);
       } catch (JsonProcessingException e) {
@@ -294,6 +297,7 @@ public class JdbcDataSource implements ExternalDataSource {
     }
     // Execute query and get iterator
     tableConfig_.set(JdbcStorageConfig.QUERY.getPropertyName(), sb.toString());
+    LOG.trace("JDBC Query: " + sb.toString());
 
     if (schema_.getColsSize() != 0) {
       int limit = -1;
diff --git a/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/conf/DatabaseType.java b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/conf/DatabaseType.java
index 9b30350bc..a01fb4108 100644
--- a/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/conf/DatabaseType.java
+++ b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/conf/DatabaseType.java
@@ -24,5 +24,6 @@ public enum DatabaseType {
   ORACLE,
   POSTGRES,
   MSSQL,
-  JETHRO_DATA
+  JETHRO_DATA,
+  IMPALA
 }
diff --git a/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/conf/JdbcStorageConfig.java b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/conf/JdbcStorageConfig.java
index 0e1ac5ab3..e48fb07ef 100644
--- a/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/conf/JdbcStorageConfig.java
+++ b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/conf/JdbcStorageConfig.java
@@ -26,6 +26,8 @@ public enum JdbcStorageConfig {
   // JDBC connection string, including the database type, IP address, port number, and
   // database name. For example, "jdbc:postgresql://127.0.0.1:5432/functional
   JDBC_URL("jdbc.url", true),
+  // Authentication mechanism of the JDBC driver, e.g. "AuthMech=3". Appended to the URL.
+  JDBC_AUTH("jdbc.auth", false),
   // Class name of JDBC driver. For example, "org.postgresql.Driver"
   JDBC_DRIVER_CLASS("jdbc.driver", true),
   // Driver URL for downloading the Jar file package that is used to access the external
diff --git a/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/DatabaseAccessorFactory.java b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/DatabaseAccessorFactory.java
index 5415b5763..decde6797 100644
--- a/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/DatabaseAccessorFactory.java
+++ b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/DatabaseAccessorFactory.java
@@ -57,6 +57,10 @@ public class DatabaseAccessorFactory {
         accessor = new DB2DatabaseAccessor();
         break;
 
+      case IMPALA:
+        accessor = new ImpalaDatabaseAccessor();
+        break;
+
       default:
         accessor = new GenericJdbcDatabaseAccessor();
         break;
diff --git a/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/GenericJdbcDatabaseAccessor.java b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/GenericJdbcDatabaseAccessor.java
index b2e820c30..4ab214d2c 100644
--- a/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/GenericJdbcDatabaseAccessor.java
+++ b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/GenericJdbcDatabaseAccessor.java
@@ -48,6 +48,7 @@ import org.apache.impala.thrift.TStatus;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Strings;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.RemovalListener;
@@ -283,7 +284,12 @@ public class GenericJdbcDatabaseAccessor implements DatabaseAccessor {
     }
 
     // essential properties
-    dbProperties.put("url", conf.get(JdbcStorageConfig.JDBC_URL.getPropertyName()));
+    String jdbcUrl = conf.get(JdbcStorageConfig.JDBC_URL.getPropertyName());
+    String jdbcAuth = conf.get(JdbcStorageConfig.JDBC_AUTH.getPropertyName());
+    if (!Strings.isNullOrEmpty(jdbcAuth)) {
+      jdbcUrl += ";" + jdbcAuth;
+    }
+    dbProperties.put("url", jdbcUrl);
     dbProperties.put("driverClassName",
         conf.get(JdbcStorageConfig.JDBC_DRIVER_CLASS.getPropertyName()));
     dbProperties.put("driverUrl",
diff --git a/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/conf/DatabaseType.java b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/ImpalaDatabaseAccessor.java
similarity index 51%
copy from java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/conf/DatabaseType.java
copy to java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/ImpalaDatabaseAccessor.java
index 9b30350bc..361eba845 100644
--- a/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/conf/DatabaseType.java
+++ b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/ImpalaDatabaseAccessor.java
@@ -15,14 +15,35 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package org.apache.impala.extdatasource.jdbc.conf;
+package org.apache.impala.extdatasource.jdbc.dao;
+
+/**
+ * Impala-specific data accessor. This is needed because Impala JDBC drivers do not
+ * support generic LIMIT and OFFSET escape functions.
+ */
+public class ImpalaDatabaseAccessor extends GenericJdbcDatabaseAccessor {
+
+  @Override
+  protected String addLimitAndOffsetToQuery(String sql, int limit, int offset) {
+    if (offset == 0) {
+      return addLimitToQuery(sql, limit);
+    } else {
+      if (limit != -1) {
+        return sql + " LIMIT " + limit + " OFFSET " + offset;
+      } else {
+        return sql;
+      }
+    }
+  }
+
+
+  @Override
+  protected String addLimitToQuery(String sql, int limit) {
+    if (limit != -1) {
+      return sql + " LIMIT " + limit;
+    } else {
+      return sql;
+    }
+  }
 
-public enum DatabaseType {
-  MYSQL,
-  H2,
-  DB2,
-  ORACLE,
-  POSTGRES,
-  MSSQL,
-  JETHRO_DATA
 }
diff --git a/testdata/bin/download-impala-jdbc-driver.sh b/testdata/bin/download-impala-jdbc-driver.sh
new file mode 100755
index 000000000..05d445295
--- /dev/null
+++ b/testdata/bin/download-impala-jdbc-driver.sh
@@ -0,0 +1,72 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# This script downloads the Impala JDBC driver and copies it to Hadoop FS.
+
+set -euo pipefail
+. $IMPALA_HOME/bin/report_build_error.sh
+setup_report_build_error
+
+. ${IMPALA_HOME}/bin/impala-config.sh > /dev/null 2>&1
+
+EXT_DATA_SOURCES_HDFS_PATH=${FILESYSTEM_PREFIX}/test-warehouse/data-sources
+JDBC_DRIVERS_HDFS_PATH=${EXT_DATA_SOURCES_HDFS_PATH}/jdbc-drivers
+SIMBA_DRIVER_ZIP_FILENAME=ClouderaImpala_JDBC${IMPALA_SIMBA_JDBC_DRIVER_VERSION}
+INNER_SIMBA_DRIVER_ZIP_FILENAME=ClouderaImpalaJDBC${IMPALA_SIMBA_JDBC_DRIVER_VERSION}
+DRIVER_JAR_VERSION=${IMPALA_SIMBA_JDBC_DRIVER_VERSION%-*}
+SIMBA_DRIVER_JAR_FILENAME=ImpalaJDBC${DRIVER_JAR_VERSION}.jar
+
+found=$(hadoop fs -find ${JDBC_DRIVERS_HDFS_PATH} -name ${SIMBA_DRIVER_JAR_FILENAME})
+if [ ! -z "$found" ]; then
+  echo "JDBC driver jar file already exists"
+  exit 0
+fi
+
+hadoop fs -mkdir -p ${JDBC_DRIVERS_HDFS_PATH}
+pushd /tmp
+
+mkdir -p impala_jdbc_driver
+cd impala_jdbc_driver
+
+# Download Impala jdbc driver.
+wget "https://downloads.cloudera.com/connectors/${SIMBA_DRIVER_ZIP_FILENAME}.zip"
+
+# Use Python modules to unzip zip file since 'unzip' command is not available in some
+# testing environments.
+cat > unzip.py <<__EOT__
+import sys
+from zipfile import PyZipFile
+pzf = PyZipFile(sys.argv[1])
+pzf.extractall()
+__EOT__
+
+# Extract driver jar file from zip file.
+python ./unzip.py ${SIMBA_DRIVER_ZIP_FILENAME}.zip
+python ./unzip.py ${SIMBA_DRIVER_ZIP_FILENAME}/${INNER_SIMBA_DRIVER_ZIP_FILENAME}.zip
+
+# Copy driver jar file to Hadoop FS.
+hadoop fs -put -f /tmp/impala_jdbc_driver/${SIMBA_DRIVER_JAR_FILENAME} \
+    ${JDBC_DRIVERS_HDFS_PATH}/${SIMBA_DRIVER_JAR_FILENAME}
+
+echo "Copied ${SIMBA_DRIVER_JAR_FILENAME} into HDFS ${JDBC_DRIVERS_HDFS_PATH}"
+
+cd ..
+rm -rf impala_jdbc_driver
+popd
+
diff --git a/testdata/workloads/functional-query/queries/QueryTest/impala-ext-jdbc-tables.test b/testdata/workloads/functional-query/queries/QueryTest/impala-ext-jdbc-tables.test
new file mode 100644
index 000000000..8001295d4
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/impala-ext-jdbc-tables.test
@@ -0,0 +1,237 @@
+====
+---- QUERY
+# Create DataSource
+DROP DATA SOURCE IF EXISTS TestJdbcDataSource;
+CREATE DATA SOURCE TestJdbcDataSource
+LOCATION '$FILESYSTEM_PREFIX/test-warehouse/data-sources/jdbc-data-source.jar'
+CLASS 'org.apache.impala.extdatasource.jdbc.JdbcDataSource'
+API_VERSION 'V1';
+---- RESULTS
+'Data source has been created.'
+====
+---- QUERY
+# Show created DataSource
+SHOW DATA SOURCES LIKE 'testjdbcdatasource';
+---- LABELS
+NAME,LOCATION,CLASS NAME,API VERSION
+---- RESULTS
+'testjdbcdatasource',regex:'.*/test-warehouse/data-sources/jdbc-data-source.jar','org.apache.impala.extdatasource.jdbc.JdbcDataSource','V1'
+---- TYPES
+STRING,STRING,STRING,STRING
+====
+---- QUERY
+# Create external JDBC DataSource table
+DROP TABLE IF EXISTS alltypes_jdbc_datasource;
+CREATE TABLE alltypes_jdbc_datasource (
+ id INT,
+ bool_col BOOLEAN,
+ tinyint_col TINYINT,
+ smallint_col SMALLINT,
+ int_col INT,
+ bigint_col BIGINT,
+ float_col FLOAT,
+ double_col DOUBLE,
+ date_string_col STRING,
+ string_col STRING,
+ timestamp_col TIMESTAMP)
+PRODUCED BY DATA SOURCE TestJdbcDataSource(
+'{"database.type":"IMPALA",
+"jdbc.url":"jdbc:impala://$INTERNAL_LISTEN_HOST:21050/functional",
+"jdbc.auth":"AuthMech=0",
+"jdbc.driver":"com.cloudera.impala.jdbc.Driver",
+"driver.url":"$FILESYSTEM_PREFIX/test-warehouse/data-sources/jdbc-drivers/ImpalaJDBC42.jar",
+"dbcp.username":"impala",
+"dbcp.password":"cloudera",
+"table":"alltypes"}');
+---- RESULTS
+'Table has been created.'
+====
+---- QUERY
+# Create external JDBC DataSource table
+DROP TABLE IF EXISTS alltypes_jdbc_datasource_2;
+CREATE TABLE alltypes_jdbc_datasource_2 (
+ id INT,
+ bool_col BOOLEAN,
+ tinyint_col TINYINT,
+ smallint_col SMALLINT,
+ int_col INT,
+ bigint_col BIGINT,
+ float_col FLOAT,
+ double_col DOUBLE,
+ date_string_col STRING,
+ string_col STRING,
+ timestamp_col TIMESTAMP)
+PRODUCED BY DATA SOURCE TestJdbcDataSource(
+'{"database.type":"IMPALA",
+"jdbc.url":"jdbc:impala://$INTERNAL_LISTEN_HOST:21050/functional",
+"jdbc.auth":"AuthMech=0",
+"jdbc.driver":"com.cloudera.impala.jdbc.Driver",
+"driver.url":"$FILESYSTEM_PREFIX/test-warehouse/data-sources/jdbc-drivers/ImpalaJDBC42.jar",
+"dbcp.username":"impala",
+"dbcp.password":"cloudera",
+"table":"alltypes"}');
+---- RESULTS
+'Table has been created.'
+====
+---- QUERY
+# Test the jdbc DataSource
+# count(*) with a predicate evaluated by Impala
+# Binary predicates are pushed to the external jdbc DataSource.
+select count(*) from alltypes_jdbc_datasource
+where float_col = 0 and string_col is not NULL
+---- RESULTS
+730
+---- TYPES
+BIGINT
+---- RUNTIME_PROFILE
+row_regex: .*NumExternalDataSourceGetNext: 1 .*
+row_regex: .*RowsRead: 730 .*
+aggregation(SUM, RowsRead): 730
+====
+---- QUERY
+# count(*) with no predicates has no materialized slots
+select count(*) from alltypes_jdbc_datasource
+---- RESULTS
+7300
+---- TYPES
+BIGINT
+---- RUNTIME_PROFILE
+row_regex: .*NumExternalDataSourceGetNext: 1 .*
+row_regex: .*RowsRead: 7.30K .*
+aggregation(SUM, RowsRead): 7300
+====
+---- QUERY
+# Gets all types including a row with a NULL value. The binary predicates are pushed to
+# the DataSource, "order by" and "limit" are evaluated locally.
+select *
+from alltypes_jdbc_datasource
+where id > 10 and int_col< 5 order by id limit 5 offset 0
+---- RESULTS
+11,false,1,1,1,10,1.100000023841858,10.1,'01/02/09','1',2009-01-02 00:11:00.450000000
+12,true,2,2,2,20,2.200000047683716,20.2,'01/02/09','2',2009-01-02 00:12:00.460000000
+13,false,3,3,3,30,3.299999952316284,30.3,'01/02/09','3',2009-01-02 00:13:00.480000000
+14,true,4,4,4,40,4.400000095367432,40.4,'01/02/09','4',2009-01-02 00:14:00.510000000
+20,true,0,0,0,0,0,0,'01/03/09','0',2009-01-03 00:20:00.900000000
+---- TYPES
+INT, BOOLEAN, TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE, STRING, STRING, TIMESTAMP
+---- RUNTIME_PROFILE
+row_regex: .*NumExternalDataSourceGetNext: 4 .*
+row_regex: .*RowsRead: 3.64K .*
+aggregation(SUM, RowsRead): 3644
+====
+---- QUERY
+# Gets specified columns.
+# The binary predicates are pushed to the DataSource, "order by" and "limit" are
+# evaluated locally.
+select id, bool_col, smallint_col, float_col, double_col, date_string_col
+from alltypes_jdbc_datasource
+where id > 10 and int_col< 5 order by id limit 5 offset 0
+---- RESULTS
+11,false,1,1.100000023841858,10.1,'01/02/09'
+12,true,2,2.200000047683716,20.2,'01/02/09'
+13,false,3,3.299999952316284,30.3,'01/02/09'
+14,true,4,4.400000095367432,40.4,'01/02/09'
+20,true,0,0,0,'01/03/09'
+---- TYPES
+INT, BOOLEAN, SMALLINT, FLOAT, DOUBLE, STRING
+---- RUNTIME_PROFILE
+row_regex: .*NumExternalDataSourceGetNext: 4 .*
+row_regex: .*RowsRead: 3.64K .*
+aggregation(SUM, RowsRead): 3644
+====
+---- QUERY
+# Gets specified columns from external jdbc table with case sensitive column names
+# and table name.
+# The binary predicates are pushed to the DataSource, "order by" and "limit" are
+# evaluated locally.
+select id, bool_col, smallint_col, float_col, double_col, date_string_col
+from alltypes_jdbc_datasource_2
+where id > 10 and int_col< 5 order by id limit 5 offset 0
+---- RESULTS
+11,false,1,1.100000023841858,10.1,'01/02/09'
+12,true,2,2.200000047683716,20.2,'01/02/09'
+13,false,3,3.299999952316284,30.3,'01/02/09'
+14,true,4,4.400000095367432,40.4,'01/02/09'
+20,true,0,0,0,'01/03/09'
+---- TYPES
+INT, BOOLEAN, SMALLINT, FLOAT, DOUBLE, STRING
+---- RUNTIME_PROFILE
+row_regex: .*NumExternalDataSourceGetNext: 4 .*
+row_regex: .*RowsRead: 3.64K .*
+aggregation(SUM, RowsRead): 3644
+====
+---- QUERY
+# Inner join with a non jdbc table
+# The binary predicates are pushed to the DataSource, but no predicate defined for
+# local table.
+select a.id, b.int_col
+from alltypes_jdbc_datasource a inner join functional.alltypes b on (a.id = b.id)
+where a.id = 1
+---- RESULTS
+1,1
+---- TYPES
+INT, INT
+---- RUNTIME_PROFILE
+row_regex: .*NumExternalDataSourceGetNext: 1 .*
+row_regex: .*RowsRead: 1 .*
+aggregation(SUM, RowsRead): 7301
+====
+---- QUERY
+# Inner join with another jdbc table
+# The binary predicates are pushed to the two DataSource Nodes.
+select a.id, b.int_col
+from alltypes_jdbc_datasource a inner join alltypes_jdbc_datasource_2 b on (a.id = b.id)
+where a.id < 3 group by a.id, b.int_col
+---- RESULTS
+0,0
+1,1
+2,2
+---- TYPES
+INT, INT
+---- RUNTIME_PROFILE
+row_regex: .*NumExternalDataSourceGetNext: 1 .*
+row_regex: .*RowsRead: 3 .*
+aggregation(SUM, RowsRead): 6
+====
+---- QUERY
+# Cross join
+# The binary predicates are pushed to the two DataSource Nodes.
+select a.id, b.id
+from alltypes_jdbc_datasource a cross join alltypes_jdbc_datasource b
+where (a.id < 3 and b.id < 3)
+order by a.id, b.id limit 10
+---- RESULTS
+0,0
+0,1
+0,2
+1,0
+1,1
+1,2
+2,0
+2,1
+2,2
+---- TYPES
+INT, INT
+---- RUNTIME_PROFILE
+row_regex: .*NumExternalDataSourceGetNext: 1 .*
+row_regex: .*RowsRead: 3 .*
+aggregation(SUM, RowsRead): 6
+====
+---- QUERY
+# Drop table
+DROP TABLE alltypes_jdbc_datasource;
+---- RESULTS
+'Table has been dropped.'
+====
+---- QUERY
+# Drop table
+DROP TABLE alltypes_jdbc_datasource_2;
+---- RESULTS
+'Table has been dropped.'
+====
+---- QUERY
+# Drop DataSource
+DROP DATA SOURCE TestJdbcDataSource;
+---- RESULTS
+'Data source has been dropped.'
+====
diff --git a/tests/custom_cluster/test_ext_data_sources.py b/tests/custom_cluster/test_ext_data_sources.py
index 78f16dd75..f12fd1b56 100644
--- a/tests/custom_cluster/test_ext_data_sources.py
+++ b/tests/custom_cluster/test_ext_data_sources.py
@@ -109,3 +109,37 @@ class TestMySqlExtJdbcTables(CustomClusterTestSuite):
   def test_mysql_ext_jdbc_tables(self, vector, unique_database):
     """Run tests for external jdbc tables on MySQL"""
     self.run_test_case('QueryTest/mysql-ext-jdbc-tables', vector, use_db=unique_database)
+
+
+class TestImpalaExtJdbcTables(CustomClusterTestSuite):
+  """Impala query tests for external jdbc tables in Impala cluster."""
+
+  @classmethod
+  def get_workload(cls):
+    return 'functional-query'
+
+  @classmethod
+  def _download_impala_jdbc_driver(cls):
+    # Download Impala jdbc driver and copy jdbc driver to HDFS.
+    script = os.path.join(
+        os.environ['IMPALA_HOME'], 'testdata/bin/download-impala-jdbc-driver.sh')
+    run_cmd = [script]
+    try:
+      subprocess.check_call(run_cmd, close_fds=True)
+    except subprocess.CalledProcessError:
+      assert False, "Failed to download Impala JDBC driver"
+
+  @classmethod
+  def setup_class(cls):
+    cls._download_impala_jdbc_driver()
+    super(TestImpalaExtJdbcTables, cls).setup_class()
+
+  @classmethod
+  def teardown_class(cls):
+    super(TestImpalaExtJdbcTables, cls).teardown_class()
+
+  @pytest.mark.execute_serially
+  def test_impala_ext_jdbc_tables(self, vector, unique_database):
+    """Run tests for external jdbc tables in Impala cluster"""
+    self.run_test_case(
+        'QueryTest/impala-ext-jdbc-tables', vector, use_db=unique_database)


(impala) 01/02: IMPALA-12661: Fix ASAN heap-use-after-free in IcebergMetadataScanNode

Posted by wz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

wzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 9fd1c81845e7d84ed6b9a903aa6ebe37a8abd254
Author: Tamas Mate <tm...@apache.org>
AuthorDate: Thu Dec 21 19:07:09 2023 +0100

    IMPALA-12661: Fix ASAN heap-use-after-free in IcebergMetadataScanNode
    
    The ASAN builds detected that the IcebergMetadataScanNode uses heap
    allocated memory after it has been freed.
    
    In the CreateFieldAccessors() method, during tree traversal, the
    current_type variable is reassigned to one of its own children, which is
    part of the object being assigned to. However, by the end of the assignment
    the rhs object will have been destroyed. To fix this issue, the variable
    was replaced with a pointer.
    
    Testing:
     - Ran tests on ASAN build
    
    Change-Id: I6df9c9cb6914a0c6c93b61aa0dd02acfdba68851
    Reviewed-on: http://gerrit.cloudera.org:8080/20829
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/exec/iceberg-metadata/iceberg-metadata-scan-node.cc | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/be/src/exec/iceberg-metadata/iceberg-metadata-scan-node.cc b/be/src/exec/iceberg-metadata/iceberg-metadata-scan-node.cc
index f7f5c9a6d..d779992fb 100644
--- a/be/src/exec/iceberg-metadata/iceberg-metadata-scan-node.cc
+++ b/be/src/exec/iceberg-metadata/iceberg-metadata-scan-node.cc
@@ -104,12 +104,12 @@ Status IcebergMetadataScanNode::CreateFieldAccessors() {
       // STRUCT node that stores the primitive type. Because, that struct node has the
       // field id list of its childs.
       int root_type_index = slot_desc->col_path()[0];
-      ColumnType current_type =
-          tuple_desc_->table_desc()->col_descs()[root_type_index].type();
+      ColumnType* current_type = &const_cast<ColumnType&>(
+          tuple_desc_->table_desc()->col_descs()[root_type_index].type());
       for (int i = 1; i < slot_desc->col_path().size() - 1; ++i) {
-        current_type = current_type.children[slot_desc->col_path()[i]];
+        current_type = &current_type->children[slot_desc->col_path()[i]];
       }
-      int field_id = current_type.field_ids[slot_desc->col_path().back()];
+      int field_id = current_type->field_ids[slot_desc->col_path().back()];
       RETURN_IF_ERROR(AddAccessorForFieldId(env, field_id, slot_desc->id()));
     } else {
       // For primitives in the top level tuple, use the ColumnDescriptor