You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sh...@apache.org on 2022/08/31 11:25:28 UTC

[flink] 02/02: [FLINK-28938][hive] Improve error messages for unsupported interfaces

This is an automated email from the ASF dual-hosted git repository.

shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 549d4327cf4ae9646f74a1da561dcebecd3d47ff
Author: Shengkai <10...@qq.com>
AuthorDate: Mon Aug 29 18:14:47 2022 +0800

    [FLINK-28938][hive] Improve error messages for unsupported interfaces
---
 .../table/endpoint/hive/HiveServer2Endpoint.java   | 45 +++++++++++----------
 .../endpoint/hive/HiveServer2EndpointITCase.java   | 47 +++++++++++++++++++++-
 2 files changed, 69 insertions(+), 23 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java
index f420e9869e6..55bd8b21987 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java
@@ -155,8 +155,8 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
     private static final Logger LOG = LoggerFactory.getLogger(HiveServer2Endpoint.class);
     private static final HiveServer2EndpointVersion SERVER_VERSION = HIVE_CLI_SERVICE_PROTOCOL_V10;
     private static final TStatus OK_STATUS = new TStatus(TStatusCode.SUCCESS_STATUS);
-    private static final String ERROR_MESSAGE =
-            "The HiveServer2 Endpoint currently doesn't support this API.";
+    private static final String UNSUPPORTED_ERROR_MESSAGE =
+            "The HiveServer2 Endpoint currently doesn't support to %s.";
 
     // --------------------------------------------------------------------------------------------
     // Server attributes
@@ -401,19 +401,6 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
                     tExecuteStatementReq.isSetConfOverlay()
                             ? tExecuteStatementReq.getConfOverlay()
                             : Collections.emptyMap();
-            String loggingOperationEnableVar =
-                    HiveConf.ConfVars.HIVE_SERVER2_LOGGING_OPERATION_ENABLED.varname;
-            if (Boolean.parseBoolean(
-                    executionConfig.getOrDefault(
-                            loggingOperationEnableVar,
-                            HiveConf.ConfVars.HIVE_SERVER2_LOGGING_OPERATION_ENABLED
-                                    .defaultStrVal))) {
-                throw new IllegalArgumentException(
-                        String.format(
-                                "SqlGateway doesn't support logging for operation. Please disable"
-                                        + " it by setting %s to false.",
-                                loggingOperationEnableVar));
-            }
             long timeout = tExecuteStatementReq.getQueryTimeout();
 
             OperationHandle operationHandle =
@@ -623,7 +610,7 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
     @Override
     public TGetCrossReferenceResp GetCrossReference(TGetCrossReferenceReq tGetCrossReferenceReq)
             throws TException {
-        throw new UnsupportedOperationException(ERROR_MESSAGE);
+        return new TGetCrossReferenceResp(buildErrorStatus("GetCrossReference"));
     }
 
     @Override
@@ -706,6 +693,14 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
 
     @Override
     public TFetchResultsResp FetchResults(TFetchResultsReq tFetchResultsReq) throws TException {
+        if (tFetchResultsReq.getFetchType() != 0) {
+            // Don't log the annoying messages because Hive beeline will fetch the logs until
+            // operation is terminated.
+            return new TFetchResultsResp(
+                    toTStatus(
+                            new UnsupportedOperationException(
+                                    "The HiveServer2 endpoint currently doesn't support to fetch logs.")));
+        }
         TFetchResultsResp resp = new TFetchResultsResp();
         try {
             SessionHandle sessionHandle = toSessionHandle(tFetchResultsReq.getOperationHandle());
@@ -761,30 +756,32 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
     @Override
     public TGetDelegationTokenResp GetDelegationToken(TGetDelegationTokenReq tGetDelegationTokenReq)
             throws TException {
-        throw new UnsupportedOperationException(ERROR_MESSAGE);
+        return new TGetDelegationTokenResp(buildErrorStatus("GetDelegationToken"));
     }
 
     @Override
     public TCancelDelegationTokenResp CancelDelegationToken(
             TCancelDelegationTokenReq tCancelDelegationTokenReq) throws TException {
-        throw new UnsupportedOperationException(ERROR_MESSAGE);
+        return new TCancelDelegationTokenResp(buildErrorStatus("CancelDelegationToken"));
     }
 
     @Override
     public TRenewDelegationTokenResp RenewDelegationToken(
             TRenewDelegationTokenReq tRenewDelegationTokenReq) throws TException {
-        throw new UnsupportedOperationException(ERROR_MESSAGE);
+        return new TRenewDelegationTokenResp(buildErrorStatus("RenewDelegationToken"));
     }
 
     // CHECKSTYLE.OFF: MethodName
     /** To be compatible with Hive3, add a default implementation. */
     public TGetQueryIdResp GetQueryId(TGetQueryIdReq tGetQueryIdReq) throws TException {
-        throw new UnsupportedOperationException(ERROR_MESSAGE);
+        throw new TException(
+                new UnsupportedOperationException(
+                        String.format(UNSUPPORTED_ERROR_MESSAGE, "GetQueryId")));
     }
 
     /** To be compatible with Hive3, add a default implementation. */
     public TSetClientInfoResp SetClientInfo(TSetClientInfoReq tSetClientInfoReq) throws TException {
-        throw new UnsupportedOperationException(ERROR_MESSAGE);
+        return new TSetClientInfoResp(buildErrorStatus("SetClientInfo"));
     }
     // CHECKSTYLE.ON: MethodName
 
@@ -892,4 +889,10 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
             return root.getClass().getName() + ": " + root.getMessage();
         }
     }
+
+    private TStatus buildErrorStatus(String methodName) {
+        return toTStatus(
+                new UnsupportedOperationException(
+                        String.format(UNSUPPORTED_ERROR_MESSAGE, methodName)));
+    }
 }
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java
index 62ff42d8364..2bcaa020fcd 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java
@@ -19,7 +19,6 @@
 package org.apache.flink.table.endpoint.hive;
 
 import org.apache.flink.FlinkVersion;
-import org.apache.flink.core.testutils.FlinkAssertions;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.SqlDialect;
 import org.apache.flink.table.api.config.TableConfigOptions;
@@ -46,6 +45,8 @@ import org.apache.flink.util.function.ThrowingConsumer;
 
 import org.apache.hadoop.hive.common.auth.HiveAuthUtils;
 import org.apache.hadoop.hive.serde2.thrift.Type;
+import org.apache.hive.jdbc.HiveConnection;
+import org.apache.hive.jdbc.HiveStatement;
 import org.apache.hive.service.rpc.thrift.TCLIService;
 import org.apache.hive.service.rpc.thrift.TCancelOperationReq;
 import org.apache.hive.service.rpc.thrift.TCancelOperationResp;
@@ -83,6 +84,7 @@ import java.util.stream.Collectors;
 
 import static org.apache.flink.api.common.RuntimeExecutionMode.BATCH;
 import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE;
+import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches;
 import static org.apache.flink.table.api.config.TableConfigOptions.MAX_LENGTH_GENERATED_CODE;
 import static org.apache.flink.table.api.config.TableConfigOptions.TABLE_DML_SYNC;
 import static org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions.toTOperationHandle;
@@ -162,6 +164,47 @@ public class HiveServer2EndpointITCase extends TestLogger {
                                                 "Session '%s' does not exist", sessionHandle)));
     }
 
+    @Test
+    public void testGetUnsupportedException() throws Exception {
+        try (HiveConnection connection = (HiveConnection) ENDPOINT_EXTENSION.getConnection();
+                HiveStatement statement = (HiveStatement) connection.createStatement()) {
+            assertThatThrownBy(() -> connection.renewDelegationToken("TokenMessage"))
+                    .satisfies(
+                            anyCauseMatches(
+                                    "The HiveServer2 Endpoint currently doesn't support to RenewDelegationToken."));
+            assertThatThrownBy(() -> connection.cancelDelegationToken("TokenMessage"))
+                    .satisfies(
+                            anyCauseMatches(
+                                    "The HiveServer2 Endpoint currently doesn't support to CancelDelegationToken."));
+            assertThatThrownBy(() -> connection.getDelegationToken("Flink", "TokenMessage"))
+                    .satisfies(
+                            anyCauseMatches(
+                                    "The HiveServer2 Endpoint currently doesn't support to GetDelegationToken."));
+            assertThatThrownBy(
+                            () ->
+                                    connection
+                                            .getMetaData()
+                                            .getCrossReference(
+                                                    "hive",
+                                                    "schema",
+                                                    "table",
+                                                    "default_catalog",
+                                                    "default_database",
+                                                    "table"))
+                    .satisfies(
+                            anyCauseMatches(
+                                    "The HiveServer2 Endpoint currently doesn't support to GetCrossReference."));
+            assertThatThrownBy(
+                            () -> {
+                                statement.execute("SHOW TABLES");
+                                statement.getQueryLog();
+                            })
+                    .satisfies(
+                            anyCauseMatches(
+                                    "The HiveServer2 endpoint currently doesn't support to fetch logs."));
+        }
+    }
+
     @Test
     public void testCancelOperation() throws Exception {
         runOperationRequest(
@@ -201,7 +244,7 @@ public class HiveServer2EndpointITCase extends TestLogger {
                                                         .getOperationInfo(
                                                                 sessionHandle, operationHandle))
                                 .satisfies(
-                                        FlinkAssertions.anyCauseMatches(
+                                        anyCauseMatches(
                                                 SqlGatewayException.class,
                                                 String.format(
                                                         "Can not find the submitted operation in the OperationManager with the %s",