You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by wz...@apache.org on 2024/01/17 23:34:22 UTC

(impala) branch master updated: IMPALA-12642: Support query options for Impala external JDBC table

This is an automated email from the ASF dual-hosted git repository.

wzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new f8e8cd090 IMPALA-12642: Support query options for Impala external JDBC table
f8e8cd090 is described below

commit f8e8cd0906f32373468b2271a38dc1aeb7ba506c
Author: wzhou-code <wz...@cloudera.com>
AuthorDate: Wed Dec 27 15:37:23 2023 -0800

    IMPALA-12642: Support query options for Impala external JDBC table
    
    This patch uses the JDBC connection string to apply query options to
    the Impala server. The options are taken from the "jdbc.properties"
    setting specified when creating a JDBC external DataSource table.
    jdbc.properties is specified as a comma-delimited key=value string, like
    "MEM_LIMIT=1000000000, ENABLED_RUNTIME_FILTER_TYPES=\"BLOOM,MIN_MAX\"".
    
    Fixed Impala to allow the value of ENABLED_RUNTIME_FILTER_TYPES to have
    double quotes at the beginning and end of the string.
    
    jdbc.properties can also be used for other databases, like Postgres and
    MySQL, to set additional properties. The test cases will be added in a
    separate patch.
    
    Testing:
     - Added end-to-end tests for setting query options on Impala JDBC
       tables.
     - Passed core tests.
    
    Change-Id: I47687b7a93e90cea8ebd5f3fc280c9135bd97992
    Reviewed-on: http://gerrit.cloudera.org:8080/20837
    Reviewed-by: Abhishek Rawat <ar...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/service/query-options.cc                    | 16 +++++++-
 .../extdatasource/jdbc/conf/JdbcStorageConfig.java |  6 +++
 .../jdbc/dao/GenericJdbcDatabaseAccessor.java      | 46 +++++++++++++++++++++-
 .../jdbc/dao/ImpalaDatabaseAccessor.java           | 15 ++++++-
 .../jdbc/dao/MySqlDatabaseAccessor.java            | 14 +++++++
 .../jdbc/dao/PostgresDatabaseAccessor.java         | 15 +++++++
 .../queries/QueryTest/impala-ext-jdbc-tables.test  |  2 +
 .../queries/QueryTest/jdbc-data-source.test        |  1 +
 .../queries/QueryTest/mysql-ext-jdbc-tables.test   |  1 +
 tests/custom_cluster/test_ext_data_sources.py      | 18 ++++++++-
 10 files changed, 129 insertions(+), 5 deletions(-)

diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index 0cf88b904..c31c8818c 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -18,6 +18,7 @@
 #include "service/query-options.h"
 
 #include <limits>
+#include <regex>
 #include <sstream>
 
 #include <boost/algorithm/string.hpp>
@@ -836,14 +837,25 @@ Status impala::SetQueryOption(const string& key, const string& value,
       }
       case TImpalaQueryOptions::ENABLED_RUNTIME_FILTER_TYPES: {
         std::set<TRuntimeFilterType::type> filter_types;
-        if (iequals(value, "all")) {
+        // Impala backend expects comma separated values to be in quotes when executing
+        // SET statement. This is usually the case when running
+        // SET query_option="value1,value2" using a jdbc driver. When using Impala-shell
+        // client, the SET statement is not executed immediately but query options are
+        // updated in the client and applied as part of following statement, so no quotes
+        // are required for Impala-shell SET query_option=value1,value2.
+        // By removing double quotes from the beginning and ending of the option value,
+        // SET ENABLED_RUNTIME_FILTER_TYPES="BLOOM,MIN_MAX" works for jdbc driver,
+        // both SET ENABLED_RUNTIME_FILTER_TYPES="BLOOM,MIN_MAX" and
+        // SET ENABLED_RUNTIME_FILTER_TYPES=BLOOM,MIN_MAX work for Impala-shell.
+        const string filter_value = std::regex_replace(value, std::regex("^\"|\"$"), "");
+        if (iequals(filter_value, "all")) {
           for (const auto& kv : _TRuntimeFilterType_VALUES_TO_NAMES) {
             filter_types.insert(static_cast<TRuntimeFilterType::type>(kv.first));
           }
         } else {
           // Parse and verify the enabled runtime filter types.
           vector<string> str_types;
-          split(str_types, value, is_any_of(","), token_compress_on);
+          split(str_types, filter_value, is_any_of(","), token_compress_on);
           for (const auto& t : str_types) {
             TRuntimeFilterType::type filter_type;
             RETURN_IF_ERROR(GetThriftEnum(t, "runtime filter type",
diff --git a/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/conf/JdbcStorageConfig.java b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/conf/JdbcStorageConfig.java
index 36d62e8f2..2611a591b 100644
--- a/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/conf/JdbcStorageConfig.java
+++ b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/conf/JdbcStorageConfig.java
@@ -45,6 +45,12 @@ public enum JdbcStorageConfig {
   DBCP_PASSWORD_KEYSTORE("dbcp.password.keystore", false),
   // Number of rows to fetch in a batch.
   JDBC_FETCH_SIZE("jdbc.fetch.size", false),
+  // Additional properties applied to the database engine, like Impala query options.
+  // Properties are specified as a comma-delimited key=value string. For example,
+  // "jdbc.properties":"MEM_LIMIT=1000000000, MAX_ERRORS=10000".
+  // A comma is allowed inside a double-quoted string value. For example,
+  // "jdbc.properties":"ENABLED_RUNTIME_FILTER_TYPES=\"BLOOM,MIN_MAX\"".
+  JDBC_OPTIONS("jdbc.properties", false),
   // SQL query which specify how to get data from external database.
   // User need to specify either “table” or “query” in the create table statement.
   QUERY("query", false),
diff --git a/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/GenericJdbcDatabaseAccessor.java b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/GenericJdbcDatabaseAccessor.java
index b60c3d15a..b05ef01aa 100644
--- a/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/GenericJdbcDatabaseAccessor.java
+++ b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/GenericJdbcDatabaseAccessor.java
@@ -30,6 +30,8 @@ import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import javax.sql.DataSource;
 
@@ -48,6 +50,7 @@ import org.apache.impala.thrift.TStatus;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
@@ -173,6 +176,36 @@ public class GenericJdbcDatabaseAccessor implements DatabaseAccessor {
     return name;
   }
 
+  protected boolean isAdditionalPropertiesSupported() {
+    return false;
+  }
+
+  protected String getPropertiesDelimiter(boolean precededDelimiter) {
+    return null;
+  }
+
+  protected String getAdditionalProperties(String configProperties) {
+    if (Strings.isNullOrEmpty(configProperties)) return null;
+    String delimiter = getPropertiesDelimiter(/* precededDelimiter */ false);
+    Preconditions.checkState(!Strings.isNullOrEmpty(delimiter));
+    // Extract valid query options.
+    Pattern pattern = Pattern.compile("(\\w*\\s*)=(\\s*\"[^\"]*\"|[^,]*)");
+    Matcher matcher = pattern.matcher(configProperties);
+    StringBuilder sb = new StringBuilder();
+    while (matcher.find()) {
+      Preconditions.checkState(!Strings.isNullOrEmpty(matcher.group(1)));
+      if (Strings.isNullOrEmpty(matcher.group(2))) {
+        LOG.info("Ignore invalid query option '{}'", matcher.group(1));
+        continue;
+      }
+      if (sb.length() > 0) sb.append(delimiter);
+      sb.append(matcher.group(1).trim());
+      sb.append("=");
+      sb.append(matcher.group(2).trim());
+    }
+    return sb.toString();
+  }
+
   /**
    * Uses generic JDBC escape functions to add a limit and offset clause to a query
    * string
@@ -291,10 +324,21 @@ public class GenericJdbcDatabaseAccessor implements DatabaseAccessor {
 
     // essential properties
     String jdbcUrl = conf.get(JdbcStorageConfig.JDBC_URL.getPropertyName());
+    boolean precededDelimiter = true;
     String jdbcAuth = conf.get(JdbcStorageConfig.JDBC_AUTH.getPropertyName());
     if (!Strings.isNullOrEmpty(jdbcAuth)) {
-      jdbcUrl += ";" + jdbcAuth;
+      jdbcUrl += getPropertiesDelimiter(precededDelimiter) + jdbcAuth;
+      precededDelimiter = false;
+    }
+    if (isAdditionalPropertiesSupported()) {
+      String additionalProperties = getAdditionalProperties(
+          conf.get(JdbcStorageConfig.JDBC_OPTIONS.getPropertyName()));
+      if (!Strings.isNullOrEmpty(additionalProperties)) {
+        jdbcUrl += getPropertiesDelimiter(precededDelimiter) + additionalProperties;
+        if (precededDelimiter) precededDelimiter = false;
+      }
     }
+    LOG.trace("JDBC URL: {}", jdbcUrl);
     dbProperties.put("url", jdbcUrl);
     dbProperties.put("driverClassName",
         conf.get(JdbcStorageConfig.JDBC_DRIVER_CLASS.getPropertyName()));
diff --git a/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/ImpalaDatabaseAccessor.java b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/ImpalaDatabaseAccessor.java
index 361eba845..96ac5e451 100644
--- a/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/ImpalaDatabaseAccessor.java
+++ b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/ImpalaDatabaseAccessor.java
@@ -36,7 +36,6 @@ public class ImpalaDatabaseAccessor extends GenericJdbcDatabaseAccessor {
     }
   }
 
-
   @Override
   protected String addLimitToQuery(String sql, int limit) {
     if (limit != -1) {
@@ -46,4 +45,18 @@ public class ImpalaDatabaseAccessor extends GenericJdbcDatabaseAccessor {
     }
   }
 
+  @Override
+  protected boolean isAdditionalPropertiesSupported() {
+    return true;
+  }
+
+  @Override
+  protected String getPropertiesDelimiter(boolean precededDelimiter) {
+    /* Query options can be set as optional, semicolon-separated key/value pairs
+     * in the JDBC connection string, in the following format:
+     *     jdbc:impala://[host][:port]/[database][;optionName1=optionValue1]
+     *     [;optionName2=optionValue2]...
+     */
+    return ";";
+  }
 }
diff --git a/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/MySqlDatabaseAccessor.java b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/MySqlDatabaseAccessor.java
index cd9ada13b..ad1000d7c 100644
--- a/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/MySqlDatabaseAccessor.java
+++ b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/MySqlDatabaseAccessor.java
@@ -46,4 +46,18 @@ public class MySqlDatabaseAccessor extends GenericJdbcDatabaseAccessor {
     }
   }
 
+  @Override
+  protected boolean isAdditionalPropertiesSupported() {
+    return true;
+  }
+
+  @Override
+  protected String getPropertiesDelimiter(boolean precededDelimiter) {
+    /* Additional properties can be set as optional key/value pairs, introduced by '?'
+     * and separated by '&', in the JDBC connection string in the following format:
+     *     jdbc:mysql://[host][,failoverhost...][:port]/[database]
+     *     [?propertyName1=propertyValue1][&propertyName2=propertyValue2]...
+     */
+    return precededDelimiter ? "?" : "&";
+  }
 }
diff --git a/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/PostgresDatabaseAccessor.java b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/PostgresDatabaseAccessor.java
index 247acc742..dc1ef2402 100644
--- a/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/PostgresDatabaseAccessor.java
+++ b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/PostgresDatabaseAccessor.java
@@ -54,4 +54,19 @@ public class PostgresDatabaseAccessor extends GenericJdbcDatabaseAccessor {
     }
     return sql + " LIMIT " + limit;
   }
+
+  @Override
+  protected boolean isAdditionalPropertiesSupported() {
+    return true;
+  }
+
+  @Override
+  protected String getPropertiesDelimiter(boolean precededDelimiter) {
+    /* Additional properties can be set as optional key/value pairs, introduced by '?'
+     * and separated by '&', in the JDBC connection string in the following format:
+     *     jdbc:postgresql://[host][:port]/[database][?propertyName1=propertyValue1]
+     *     [&propertyName2=propertyValue2]...
+     */
+    return precededDelimiter ? "?" : "&";
+  }
 }
diff --git a/testdata/workloads/functional-query/queries/QueryTest/impala-ext-jdbc-tables.test b/testdata/workloads/functional-query/queries/QueryTest/impala-ext-jdbc-tables.test
index 8001295d4..b68db24f5 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/impala-ext-jdbc-tables.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/impala-ext-jdbc-tables.test
@@ -38,6 +38,7 @@ PRODUCED BY DATA SOURCE TestJdbcDataSource(
 '{"database.type":"IMPALA",
 "jdbc.url":"jdbc:impala://$INTERNAL_LISTEN_HOST:21050/functional",
 "jdbc.auth":"AuthMech=0",
+"jdbc.properties":"MEM_LIMIT=1000000000, MAX_ERRORS = 10000, ENABLED_RUNTIME_FILTER_TYPES=\"BLOOM,MIN_MAX\"",
 "jdbc.driver":"com.cloudera.impala.jdbc.Driver",
 "driver.url":"$FILESYSTEM_PREFIX/test-warehouse/data-sources/jdbc-drivers/ImpalaJDBC42.jar",
 "dbcp.username":"impala",
@@ -65,6 +66,7 @@ PRODUCED BY DATA SOURCE TestJdbcDataSource(
 '{"database.type":"IMPALA",
 "jdbc.url":"jdbc:impala://$INTERNAL_LISTEN_HOST:21050/functional",
 "jdbc.auth":"AuthMech=0",
+"jdbc.properties":"QUERY_TIMEOUT_S=600, REQUEST_POOL= \"default-pool\", DEBUG_ACTION",
 "jdbc.driver":"com.cloudera.impala.jdbc.Driver",
 "driver.url":"$FILESYSTEM_PREFIX/test-warehouse/data-sources/jdbc-drivers/ImpalaJDBC42.jar",
 "dbcp.username":"impala",
diff --git a/testdata/workloads/functional-query/queries/QueryTest/jdbc-data-source.test b/testdata/workloads/functional-query/queries/QueryTest/jdbc-data-source.test
index efc4a77d6..f5b73d568 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/jdbc-data-source.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/jdbc-data-source.test
@@ -37,6 +37,7 @@ CREATE TABLE alltypes_jdbc_datasource (
 PRODUCED BY DATA SOURCE TestJdbcDataSource(
 '{"database.type":"POSTGRES",
 "jdbc.url":"jdbc:postgresql://$INTERNAL_LISTEN_HOST:5432/functional",
+"jdbc.properties":"connect_timeout=20, application_name=\"myapp\"",
 "jdbc.driver":"org.postgresql.Driver",
 "driver.url":"$FILESYSTEM_PREFIX/test-warehouse/data-sources/jdbc-drivers/postgresql-jdbc.jar",
 "dbcp.username":"hiveuser",
diff --git a/testdata/workloads/functional-query/queries/QueryTest/mysql-ext-jdbc-tables.test b/testdata/workloads/functional-query/queries/QueryTest/mysql-ext-jdbc-tables.test
index ff0745242..4d42bbc3c 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/mysql-ext-jdbc-tables.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/mysql-ext-jdbc-tables.test
@@ -37,6 +37,7 @@ CREATE TABLE alltypes_jdbc_datasource (
 PRODUCED BY DATA SOURCE TestJdbcDataSource(
 '{"database.type":"MYSQL",
 "jdbc.url":"jdbc:mysql://localhost:3306/functional",
+"jdbc.properties":"autoReconnect=false, useUnicode=false",
 "jdbc.driver":"com.mysql.cj.jdbc.Driver",
 "driver.url":"$FILESYSTEM_PREFIX/test-warehouse/data-sources/jdbc-drivers/mysql-jdbc.jar",
 "dbcp.username":"hiveuser",
diff --git a/tests/custom_cluster/test_ext_data_sources.py b/tests/custom_cluster/test_ext_data_sources.py
index ffa907d06..405955965 100644
--- a/tests/custom_cluster/test_ext_data_sources.py
+++ b/tests/custom_cluster/test_ext_data_sources.py
@@ -16,8 +16,9 @@
 # under the License.
 
 from __future__ import absolute_import, division, print_function
-import pytest
 import os
+import pytest
+import requests
 import subprocess
 
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
@@ -228,3 +229,18 @@ class TestImpalaExtJdbcTables(CustomClusterTestSuite):
     """Run tests for external jdbc tables in Impala cluster"""
     self.run_test_case(
         'QueryTest/impala-ext-jdbc-tables', vector, use_db=unique_database)
+    # Verify the settings of query options with Queries Web page on Impala coordinator
+    response = requests.get("http://localhost:25000/queries?json")
+    response_json = response.text
+    assert "SET MAX_ERRORS=10000" in response_json, \
+        "No matching option MAX_ERRORS found in the queries site."
+    assert "SET MEM_LIMIT=1000000000" in response_json, \
+        "No matching option MEM_LIMIT found in the queries site."
+    assert "SET ENABLED_RUNTIME_FILTER_TYPES=\\\"BLOOM,MIN_MAX\\\"" in response_json, \
+        "No matching option ENABLED_RUNTIME_FILTER_TYPES found in the queries site."
+    assert "SET QUERY_TIMEOUT_S=600" in response_json, \
+        "No matching option QUERY_TIMEOUT_S found in the queries site."
+    assert "SET REQUEST_POOL=\\\"default-pool\\\"" in response_json, \
+        "No matching option REQUEST_POOL found in the queries site."
+    assert "SET DEBUG_ACTION" not in response_json, \
+        "Matching option DEBUG_ACTION found in the queries site."