You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sh...@apache.org on 2022/08/31 11:25:26 UTC

[flink] branch master updated (44119780ff3 -> 549d4327cf4)

This is an automated email from the ASF dual-hosted git repository.

shengkai pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


    from 44119780ff3 [hotfix][tests] Replace deprecated AbstractThrowableAssert#getRootCause
     new 6630ce7d6fc [FLINK-28938][hive] Fix HiveServer2 Endpoint can not set variable correctly
     new 549d4327cf4 [FLINK-28938][hive] Improve error messages for unsupported interfaces

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../table/endpoint/hive/HiveServer2Endpoint.java   | 40 +++++++++++-----
 .../endpoint/hive/util/HiveJdbcParameterUtils.java | 44 +++++-------------
 .../delegation/hive/copy/HiveSetProcessor.java     |  7 +++
 .../endpoint/hive/HiveServer2EndpointITCase.java   | 54 ++++++++++++++++++++--
 4 files changed, 97 insertions(+), 48 deletions(-)


[flink] 01/02: [FLINK-28938][hive] Fix HiveServer2 Endpoint can not set variable correctly

Posted by sh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6630ce7d6fcb118900a19bc0bb9143b3d388bf1a
Author: luoyuxia <lu...@alumni.sjtu.edu.cn>
AuthorDate: Tue Aug 23 12:33:03 2022 +0800

    [FLINK-28938][hive] Fix HiveServer2 Endpoint can not set variable correctly
---
 .../table/endpoint/hive/HiveServer2Endpoint.java   | 21 +++++++++--
 .../endpoint/hive/util/HiveJdbcParameterUtils.java | 44 ++++++----------------
 .../delegation/hive/copy/HiveSetProcessor.java     |  7 ++++
 .../endpoint/hive/HiveServer2EndpointITCase.java   |  7 +++-
 4 files changed, 41 insertions(+), 38 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java
index 6c6335d07b3..f420e9869e6 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java
@@ -124,7 +124,7 @@ import static org.apache.flink.table.api.config.TableConfigOptions.TABLE_DML_SYN
 import static org.apache.flink.table.api.config.TableConfigOptions.TABLE_SQL_DIALECT;
 import static org.apache.flink.table.endpoint.hive.HiveServer2EndpointVersion.HIVE_CLI_SERVICE_PROTOCOL_V10;
 import static org.apache.flink.table.endpoint.hive.util.HiveJdbcParameterUtils.getUsedDefaultDatabase;
-import static org.apache.flink.table.endpoint.hive.util.HiveJdbcParameterUtils.validateAndNormalize;
+import static org.apache.flink.table.endpoint.hive.util.HiveJdbcParameterUtils.setVariables;
 import static org.apache.flink.table.endpoint.hive.util.OperationExecutorFactory.createGetCatalogsExecutor;
 import static org.apache.flink.table.endpoint.hive.util.OperationExecutorFactory.createGetColumnsExecutor;
 import static org.apache.flink.table.endpoint.hive.util.OperationExecutorFactory.createGetFunctionsExecutor;
@@ -304,10 +304,10 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
             sessionConfig.put(TABLE_SQL_DIALECT.key(), SqlDialect.HIVE.name());
             sessionConfig.put(RUNTIME_MODE.key(), RuntimeExecutionMode.BATCH.name());
             sessionConfig.put(TABLE_DML_SYNC.key(), "true");
-            sessionConfig.putAll(validateAndNormalize(originSessionConf));
-
             HiveConf conf = HiveCatalog.createHiveConf(hiveConfPath, null);
-            sessionConfig.forEach(conf::set);
+            // set variables to HiveConf or Session's conf
+            setVariables(conf, sessionConfig, originSessionConf);
+
             Catalog hiveCatalog =
                     new HiveCatalog(
                             catalogName,
@@ -401,6 +401,19 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
                     tExecuteStatementReq.isSetConfOverlay()
                             ? tExecuteStatementReq.getConfOverlay()
                             : Collections.emptyMap();
+            String loggingOperationEnableVar =
+                    HiveConf.ConfVars.HIVE_SERVER2_LOGGING_OPERATION_ENABLED.varname;
+            if (Boolean.parseBoolean(
+                    executionConfig.getOrDefault(
+                            loggingOperationEnableVar,
+                            HiveConf.ConfVars.HIVE_SERVER2_LOGGING_OPERATION_ENABLED
+                                    .defaultStrVal))) {
+                throw new IllegalArgumentException(
+                        String.format(
+                                "SqlGateway doesn't support logging for operation. Please disable"
+                                        + " it by setting %s to false.",
+                                loggingOperationEnableVar));
+            }
             long timeout = tExecuteStatementReq.getQueryTimeout();
 
             OperationHandle operationHandle =
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/HiveJdbcParameterUtils.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/HiveJdbcParameterUtils.java
index 03c2a5385df..123e3f5b77c 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/HiveJdbcParameterUtils.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/HiveJdbcParameterUtils.java
@@ -19,17 +19,13 @@
 package org.apache.flink.table.endpoint.hive.util;
 
 import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.planner.delegation.hive.copy.HiveSetProcessor;
+
+import org.apache.hadoop.hive.conf.HiveConf;
 
-import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
 
-import static org.apache.hadoop.hive.conf.SystemVariables.ENV_PREFIX;
-import static org.apache.hadoop.hive.conf.SystemVariables.HIVECONF_PREFIX;
-import static org.apache.hadoop.hive.conf.SystemVariables.HIVEVAR_PREFIX;
-import static org.apache.hadoop.hive.conf.SystemVariables.METACONF_PREFIX;
-import static org.apache.hadoop.hive.conf.SystemVariables.SYSTEM_PREFIX;
-
 /**
  * Utils to normalize and validate hive jdbc conf.
  *
@@ -49,37 +45,21 @@ public class HiveJdbcParameterUtils {
     private static final String USE_PREFIX = "use:";
     private static final String USE_DATABASE = "database";
 
-    public static Map<String, String> validateAndNormalize(Map<String, String> parameters) {
-        Map<String, String> normalized = new HashMap<>();
+    /**
+     * Applies the given {@code parameters} to {@code hiveConf} or {@code sessionConfigs},
+     * depending on which kind of variable each parameter denotes.
+     */
+    public static void setVariables(
+            HiveConf hiveConf, Map<String, String> sessionConfigs, Map<String, String> parameters) {
         for (Map.Entry<String, String> entry : parameters.entrySet()) {
             String key = entry.getKey();
-            String normalizedKey = key;
             if (key.startsWith(SET_PREFIX)) {
                 String newKey = key.substring(SET_PREFIX.length());
-                // TODO: use HiveParserSetProcessor when FLINK-28096 is fixed
-                if (newKey.startsWith(ENV_PREFIX)) {
-                    throw new ValidationException(
-                            String.format(
-                                    "Can not set env variables %s during the session connection.",
-                                    key));
-                } else if (newKey.startsWith(SYSTEM_PREFIX)) {
-                    normalizedKey = newKey.substring(SYSTEM_PREFIX.length());
-                } else if (newKey.startsWith(HIVECONF_PREFIX)) {
-                    normalizedKey = newKey.substring(HIVECONF_PREFIX.length());
-                } else if (newKey.startsWith(HIVEVAR_PREFIX)) {
-                    normalizedKey = newKey.substring(HIVEVAR_PREFIX.length());
-                } else if (newKey.startsWith(METACONF_PREFIX)) {
-                    normalizedKey = newKey.substring(METACONF_PREFIX.length());
-                } else {
-                    normalizedKey = newKey;
-                }
-            } else if (key.startsWith(USE_PREFIX)) {
-                // ignore use parameters
-                continue;
+                HiveSetProcessor.setVariable(hiveConf, sessionConfigs, newKey, entry.getValue());
+            } else if (!key.startsWith(USE_PREFIX)) {
+                sessionConfigs.put(key, entry.getValue());
             }
-            normalized.put(normalizedKey, entry.getValue());
         }
-        return normalized;
     }
 
     public static Optional<String> getUsedDefaultDatabase(Map<String, String> parameters) {
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveSetProcessor.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveSetProcessor.java
index 51ad1dddd17..ff8b7de2c8b 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveSetProcessor.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveSetProcessor.java
@@ -78,6 +78,13 @@ public class HiveSetProcessor {
                         String.format("'SET %s=%s' FAILED.", varname, varvalue), e);
             }
         } else {
+            // This differs slightly from Hive's behavior:
+            // if there's no prefix, we also put it into the passed hiveVariables, because Flink
+            // may use it as its own configuration.
+            // Otherwise, there's no way to set Flink's configuration using Hive's set command.
+            hiveVariables.put(
+                    varname,
+                    new VariableSubstitution(() -> hiveVariables).substitute(hiveConf, varvalue));
             setConf(hiveConf, hiveVariables, varname, varname, varvalue);
         }
     }
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java
index 4f474bf4254..62ff42d8364 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java
@@ -125,7 +125,9 @@ public class HiveServer2EndpointITCase extends TestLogger {
         configs.put(MAX_LENGTH_GENERATED_CODE.key(), "-1");
         // simulate to set config using hive jdbc
         configs.put("set:hiveconf:key", "value");
-        // TODO: set hivevar when FLINK-28096 is fixed
+        configs.put("set:system:ks", "vs");
+        configs.put("set:key1", "value1");
+        configs.put("set:hivevar:key2", "${hiveconf:common-key}");
         openSessionReq.setConfiguration(configs);
         TOpenSessionResp openSessionResp = client.OpenSession(openSessionReq);
         SessionHandle sessionHandle =
@@ -140,7 +142,8 @@ public class HiveServer2EndpointITCase extends TestLogger {
                         new AbstractMap.SimpleEntry<>(TABLE_DML_SYNC.key(), "true"),
                         new AbstractMap.SimpleEntry<>(RUNTIME_MODE.key(), BATCH.name()),
                         new AbstractMap.SimpleEntry<>(MAX_LENGTH_GENERATED_CODE.key(), "-1"),
-                        new AbstractMap.SimpleEntry<>("key", "value"));
+                        new AbstractMap.SimpleEntry<>("key1", "value1"),
+                        new AbstractMap.SimpleEntry<>("key2", "common-val"));
     }
 
     @Test


[flink] 02/02: [FLINK-28938][hive] Improve error messages for unsupported interfaces

Posted by sh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 549d4327cf4ae9646f74a1da561dcebecd3d47ff
Author: Shengkai <10...@qq.com>
AuthorDate: Mon Aug 29 18:14:47 2022 +0800

    [FLINK-28938][hive] Improve error messages for unsupported interfaces
---
 .../table/endpoint/hive/HiveServer2Endpoint.java   | 45 +++++++++++----------
 .../endpoint/hive/HiveServer2EndpointITCase.java   | 47 +++++++++++++++++++++-
 2 files changed, 69 insertions(+), 23 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java
index f420e9869e6..55bd8b21987 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java
@@ -155,8 +155,8 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
     private static final Logger LOG = LoggerFactory.getLogger(HiveServer2Endpoint.class);
     private static final HiveServer2EndpointVersion SERVER_VERSION = HIVE_CLI_SERVICE_PROTOCOL_V10;
     private static final TStatus OK_STATUS = new TStatus(TStatusCode.SUCCESS_STATUS);
-    private static final String ERROR_MESSAGE =
-            "The HiveServer2 Endpoint currently doesn't support this API.";
+    private static final String UNSUPPORTED_ERROR_MESSAGE =
+            "The HiveServer2 Endpoint currently doesn't support to %s.";
 
     // --------------------------------------------------------------------------------------------
     // Server attributes
@@ -401,19 +401,6 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
                     tExecuteStatementReq.isSetConfOverlay()
                             ? tExecuteStatementReq.getConfOverlay()
                             : Collections.emptyMap();
-            String loggingOperationEnableVar =
-                    HiveConf.ConfVars.HIVE_SERVER2_LOGGING_OPERATION_ENABLED.varname;
-            if (Boolean.parseBoolean(
-                    executionConfig.getOrDefault(
-                            loggingOperationEnableVar,
-                            HiveConf.ConfVars.HIVE_SERVER2_LOGGING_OPERATION_ENABLED
-                                    .defaultStrVal))) {
-                throw new IllegalArgumentException(
-                        String.format(
-                                "SqlGateway doesn't support logging for operation. Please disable"
-                                        + " it by setting %s to false.",
-                                loggingOperationEnableVar));
-            }
             long timeout = tExecuteStatementReq.getQueryTimeout();
 
             OperationHandle operationHandle =
@@ -623,7 +610,7 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
     @Override
     public TGetCrossReferenceResp GetCrossReference(TGetCrossReferenceReq tGetCrossReferenceReq)
             throws TException {
-        throw new UnsupportedOperationException(ERROR_MESSAGE);
+        return new TGetCrossReferenceResp(buildErrorStatus("GetCrossReference"));
     }
 
     @Override
@@ -706,6 +693,14 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
 
     @Override
     public TFetchResultsResp FetchResults(TFetchResultsReq tFetchResultsReq) throws TException {
+        if (tFetchResultsReq.getFetchType() != 0) {
+            // Don't log the annoying messages because Hive beeline will fetch the logs until
+            // operation is terminated.
+            return new TFetchResultsResp(
+                    toTStatus(
+                            new UnsupportedOperationException(
+                                    "The HiveServer2 endpoint currently doesn't support to fetch logs.")));
+        }
         TFetchResultsResp resp = new TFetchResultsResp();
         try {
             SessionHandle sessionHandle = toSessionHandle(tFetchResultsReq.getOperationHandle());
@@ -761,30 +756,32 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
     @Override
     public TGetDelegationTokenResp GetDelegationToken(TGetDelegationTokenReq tGetDelegationTokenReq)
             throws TException {
-        throw new UnsupportedOperationException(ERROR_MESSAGE);
+        return new TGetDelegationTokenResp(buildErrorStatus("GetDelegationToken"));
     }
 
     @Override
     public TCancelDelegationTokenResp CancelDelegationToken(
             TCancelDelegationTokenReq tCancelDelegationTokenReq) throws TException {
-        throw new UnsupportedOperationException(ERROR_MESSAGE);
+        return new TCancelDelegationTokenResp(buildErrorStatus("CancelDelegationToken"));
     }
 
     @Override
     public TRenewDelegationTokenResp RenewDelegationToken(
             TRenewDelegationTokenReq tRenewDelegationTokenReq) throws TException {
-        throw new UnsupportedOperationException(ERROR_MESSAGE);
+        return new TRenewDelegationTokenResp(buildErrorStatus("RenewDelegationToken"));
     }
 
     // CHECKSTYLE.OFF: MethodName
     /** To be compatible with Hive3, add a default implementation. */
     public TGetQueryIdResp GetQueryId(TGetQueryIdReq tGetQueryIdReq) throws TException {
-        throw new UnsupportedOperationException(ERROR_MESSAGE);
+        throw new TException(
+                new UnsupportedOperationException(
+                        String.format(UNSUPPORTED_ERROR_MESSAGE, "GetQueryId")));
     }
 
     /** To be compatible with Hive3, add a default implementation. */
     public TSetClientInfoResp SetClientInfo(TSetClientInfoReq tSetClientInfoReq) throws TException {
-        throw new UnsupportedOperationException(ERROR_MESSAGE);
+        return new TSetClientInfoResp(buildErrorStatus("SetClientInfo"));
     }
     // CHECKSTYLE.ON: MethodName
 
@@ -892,4 +889,10 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
             return root.getClass().getName() + ": " + root.getMessage();
         }
     }
+
+    private TStatus buildErrorStatus(String methodName) {
+        return toTStatus(
+                new UnsupportedOperationException(
+                        String.format(UNSUPPORTED_ERROR_MESSAGE, methodName)));
+    }
 }
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java
index 62ff42d8364..2bcaa020fcd 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java
@@ -19,7 +19,6 @@
 package org.apache.flink.table.endpoint.hive;
 
 import org.apache.flink.FlinkVersion;
-import org.apache.flink.core.testutils.FlinkAssertions;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.SqlDialect;
 import org.apache.flink.table.api.config.TableConfigOptions;
@@ -46,6 +45,8 @@ import org.apache.flink.util.function.ThrowingConsumer;
 
 import org.apache.hadoop.hive.common.auth.HiveAuthUtils;
 import org.apache.hadoop.hive.serde2.thrift.Type;
+import org.apache.hive.jdbc.HiveConnection;
+import org.apache.hive.jdbc.HiveStatement;
 import org.apache.hive.service.rpc.thrift.TCLIService;
 import org.apache.hive.service.rpc.thrift.TCancelOperationReq;
 import org.apache.hive.service.rpc.thrift.TCancelOperationResp;
@@ -83,6 +84,7 @@ import java.util.stream.Collectors;
 
 import static org.apache.flink.api.common.RuntimeExecutionMode.BATCH;
 import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE;
+import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches;
 import static org.apache.flink.table.api.config.TableConfigOptions.MAX_LENGTH_GENERATED_CODE;
 import static org.apache.flink.table.api.config.TableConfigOptions.TABLE_DML_SYNC;
 import static org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions.toTOperationHandle;
@@ -162,6 +164,47 @@ public class HiveServer2EndpointITCase extends TestLogger {
                                                 "Session '%s' does not exist", sessionHandle)));
     }
 
+    @Test
+    public void testGetUnsupportedException() throws Exception {
+        try (HiveConnection connection = (HiveConnection) ENDPOINT_EXTENSION.getConnection();
+                HiveStatement statement = (HiveStatement) connection.createStatement()) {
+            assertThatThrownBy(() -> connection.renewDelegationToken("TokenMessage"))
+                    .satisfies(
+                            anyCauseMatches(
+                                    "The HiveServer2 Endpoint currently doesn't support to RenewDelegationToken."));
+            assertThatThrownBy(() -> connection.cancelDelegationToken("TokenMessage"))
+                    .satisfies(
+                            anyCauseMatches(
+                                    "The HiveServer2 Endpoint currently doesn't support to CancelDelegationToken."));
+            assertThatThrownBy(() -> connection.getDelegationToken("Flink", "TokenMessage"))
+                    .satisfies(
+                            anyCauseMatches(
+                                    "The HiveServer2 Endpoint currently doesn't support to GetDelegationToken."));
+            assertThatThrownBy(
+                            () ->
+                                    connection
+                                            .getMetaData()
+                                            .getCrossReference(
+                                                    "hive",
+                                                    "schema",
+                                                    "table",
+                                                    "default_catalog",
+                                                    "default_database",
+                                                    "table"))
+                    .satisfies(
+                            anyCauseMatches(
+                                    "The HiveServer2 Endpoint currently doesn't support to GetCrossReference."));
+            assertThatThrownBy(
+                            () -> {
+                                statement.execute("SHOW TABLES");
+                                statement.getQueryLog();
+                            })
+                    .satisfies(
+                            anyCauseMatches(
+                                    "The HiveServer2 endpoint currently doesn't support to fetch logs."));
+        }
+    }
+
     @Test
     public void testCancelOperation() throws Exception {
         runOperationRequest(
@@ -201,7 +244,7 @@ public class HiveServer2EndpointITCase extends TestLogger {
                                                         .getOperationInfo(
                                                                 sessionHandle, operationHandle))
                                 .satisfies(
-                                        FlinkAssertions.anyCauseMatches(
+                                        anyCauseMatches(
                                                 SqlGatewayException.class,
                                                 String.format(
                                                         "Can not find the submitted operation in the OperationManager with the %s",