You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sh...@apache.org on 2022/08/31 11:25:28 UTC
[flink] 02/02: [FLINK-28938][hive] Improve error messages for unsupported interfaces
This is an automated email from the ASF dual-hosted git repository.
shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 549d4327cf4ae9646f74a1da561dcebecd3d47ff
Author: Shengkai <10...@qq.com>
AuthorDate: Mon Aug 29 18:14:47 2022 +0800
[FLINK-28938][hive] Improve error messages for unsupported interfaces
---
.../table/endpoint/hive/HiveServer2Endpoint.java | 45 +++++++++++----------
.../endpoint/hive/HiveServer2EndpointITCase.java | 47 +++++++++++++++++++++-
2 files changed, 69 insertions(+), 23 deletions(-)
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java
index f420e9869e6..55bd8b21987 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java
@@ -155,8 +155,8 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
private static final Logger LOG = LoggerFactory.getLogger(HiveServer2Endpoint.class);
private static final HiveServer2EndpointVersion SERVER_VERSION = HIVE_CLI_SERVICE_PROTOCOL_V10;
private static final TStatus OK_STATUS = new TStatus(TStatusCode.SUCCESS_STATUS);
- private static final String ERROR_MESSAGE =
- "The HiveServer2 Endpoint currently doesn't support this API.";
+ private static final String UNSUPPORTED_ERROR_MESSAGE =
+ "The HiveServer2 Endpoint currently doesn't support to %s.";
// --------------------------------------------------------------------------------------------
// Server attributes
@@ -401,19 +401,6 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
tExecuteStatementReq.isSetConfOverlay()
? tExecuteStatementReq.getConfOverlay()
: Collections.emptyMap();
- String loggingOperationEnableVar =
- HiveConf.ConfVars.HIVE_SERVER2_LOGGING_OPERATION_ENABLED.varname;
- if (Boolean.parseBoolean(
- executionConfig.getOrDefault(
- loggingOperationEnableVar,
- HiveConf.ConfVars.HIVE_SERVER2_LOGGING_OPERATION_ENABLED
- .defaultStrVal))) {
- throw new IllegalArgumentException(
- String.format(
- "SqlGateway doesn't support logging for operation. Please disable"
- + " it by setting %s to false.",
- loggingOperationEnableVar));
- }
long timeout = tExecuteStatementReq.getQueryTimeout();
OperationHandle operationHandle =
@@ -623,7 +610,7 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
@Override
public TGetCrossReferenceResp GetCrossReference(TGetCrossReferenceReq tGetCrossReferenceReq)
throws TException {
- throw new UnsupportedOperationException(ERROR_MESSAGE);
+ return new TGetCrossReferenceResp(buildErrorStatus("GetCrossReference"));
}
@Override
@@ -706,6 +693,14 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
@Override
public TFetchResultsResp FetchResults(TFetchResultsReq tFetchResultsReq) throws TException {
+ if (tFetchResultsReq.getFetchType() != 0) {
+ // Don't log the annoying messages because Hive beeline will fetch the logs until
+ // operation is terminated.
+ return new TFetchResultsResp(
+ toTStatus(
+ new UnsupportedOperationException(
+ "The HiveServer2 endpoint currently doesn't support to fetch logs.")));
+ }
TFetchResultsResp resp = new TFetchResultsResp();
try {
SessionHandle sessionHandle = toSessionHandle(tFetchResultsReq.getOperationHandle());
@@ -761,30 +756,32 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
@Override
public TGetDelegationTokenResp GetDelegationToken(TGetDelegationTokenReq tGetDelegationTokenReq)
throws TException {
- throw new UnsupportedOperationException(ERROR_MESSAGE);
+ return new TGetDelegationTokenResp(buildErrorStatus("GetDelegationToken"));
}
@Override
public TCancelDelegationTokenResp CancelDelegationToken(
TCancelDelegationTokenReq tCancelDelegationTokenReq) throws TException {
- throw new UnsupportedOperationException(ERROR_MESSAGE);
+ return new TCancelDelegationTokenResp(buildErrorStatus("CancelDelegationToken"));
}
@Override
public TRenewDelegationTokenResp RenewDelegationToken(
TRenewDelegationTokenReq tRenewDelegationTokenReq) throws TException {
- throw new UnsupportedOperationException(ERROR_MESSAGE);
+ return new TRenewDelegationTokenResp(buildErrorStatus("RenewDelegationToken"));
}
// CHECKSTYLE.OFF: MethodName
/** To be compatible with Hive3, add a default implementation. */
public TGetQueryIdResp GetQueryId(TGetQueryIdReq tGetQueryIdReq) throws TException {
- throw new UnsupportedOperationException(ERROR_MESSAGE);
+ throw new TException(
+ new UnsupportedOperationException(
+ String.format(UNSUPPORTED_ERROR_MESSAGE, "GetQueryId")));
}
/** To be compatible with Hive3, add a default implementation. */
public TSetClientInfoResp SetClientInfo(TSetClientInfoReq tSetClientInfoReq) throws TException {
- throw new UnsupportedOperationException(ERROR_MESSAGE);
+ return new TSetClientInfoResp(buildErrorStatus("SetClientInfo"));
}
// CHECKSTYLE.ON: MethodName
@@ -892,4 +889,10 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
return root.getClass().getName() + ": " + root.getMessage();
}
}
+
+ private TStatus buildErrorStatus(String methodName) {
+ return toTStatus(
+ new UnsupportedOperationException(
+ String.format(UNSUPPORTED_ERROR_MESSAGE, methodName)));
+ }
}
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java
index 62ff42d8364..2bcaa020fcd 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java
@@ -19,7 +19,6 @@
package org.apache.flink.table.endpoint.hive;
import org.apache.flink.FlinkVersion;
-import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.config.TableConfigOptions;
@@ -46,6 +45,8 @@ import org.apache.flink.util.function.ThrowingConsumer;
import org.apache.hadoop.hive.common.auth.HiveAuthUtils;
import org.apache.hadoop.hive.serde2.thrift.Type;
+import org.apache.hive.jdbc.HiveConnection;
+import org.apache.hive.jdbc.HiveStatement;
import org.apache.hive.service.rpc.thrift.TCLIService;
import org.apache.hive.service.rpc.thrift.TCancelOperationReq;
import org.apache.hive.service.rpc.thrift.TCancelOperationResp;
@@ -83,6 +84,7 @@ import java.util.stream.Collectors;
import static org.apache.flink.api.common.RuntimeExecutionMode.BATCH;
import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE;
+import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches;
import static org.apache.flink.table.api.config.TableConfigOptions.MAX_LENGTH_GENERATED_CODE;
import static org.apache.flink.table.api.config.TableConfigOptions.TABLE_DML_SYNC;
import static org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions.toTOperationHandle;
@@ -162,6 +164,47 @@ public class HiveServer2EndpointITCase extends TestLogger {
"Session '%s' does not exist", sessionHandle)));
}
+ @Test
+ public void testGetUnsupportedException() throws Exception {
+ try (HiveConnection connection = (HiveConnection) ENDPOINT_EXTENSION.getConnection();
+ HiveStatement statement = (HiveStatement) connection.createStatement()) {
+ assertThatThrownBy(() -> connection.renewDelegationToken("TokenMessage"))
+ .satisfies(
+ anyCauseMatches(
+ "The HiveServer2 Endpoint currently doesn't support to RenewDelegationToken."));
+ assertThatThrownBy(() -> connection.cancelDelegationToken("TokenMessage"))
+ .satisfies(
+ anyCauseMatches(
+ "The HiveServer2 Endpoint currently doesn't support to CancelDelegationToken."));
+ assertThatThrownBy(() -> connection.getDelegationToken("Flink", "TokenMessage"))
+ .satisfies(
+ anyCauseMatches(
+ "The HiveServer2 Endpoint currently doesn't support to GetDelegationToken."));
+ assertThatThrownBy(
+ () ->
+ connection
+ .getMetaData()
+ .getCrossReference(
+ "hive",
+ "schema",
+ "table",
+ "default_catalog",
+ "default_database",
+ "table"))
+ .satisfies(
+ anyCauseMatches(
+ "The HiveServer2 Endpoint currently doesn't support to GetCrossReference."));
+ assertThatThrownBy(
+ () -> {
+ statement.execute("SHOW TABLES");
+ statement.getQueryLog();
+ })
+ .satisfies(
+ anyCauseMatches(
+ "The HiveServer2 endpoint currently doesn't support to fetch logs."));
+ }
+ }
+
@Test
public void testCancelOperation() throws Exception {
runOperationRequest(
@@ -201,7 +244,7 @@ public class HiveServer2EndpointITCase extends TestLogger {
.getOperationInfo(
sessionHandle, operationHandle))
.satisfies(
- FlinkAssertions.anyCauseMatches(
+ anyCauseMatches(
SqlGatewayException.class,
String.format(
"Can not find the submitted operation in the OperationManager with the %s",