You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sh...@apache.org on 2022/08/08 12:13:19 UTC
[flink] branch master updated: [FLINK-28793][sql-gateway][hive] Allow to GetInfo in the HiveServer2 Endpoint
This is an automated email from the ASF dual-hosted git repository.
shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new ac0bbe608f5 [FLINK-28793][sql-gateway][hive] Allow to GetInfo in the HiveServer2 Endpoint
ac0bbe608f5 is described below
commit ac0bbe608f56fc9514191e7a9da6dc76d54047b6
Author: zhaoweinan <32...@qq.com>
AuthorDate: Wed Aug 3 17:34:51 2022 +0800
[FLINK-28793][sql-gateway][hive] Allow to GetInfo in the HiveServer2 Endpoint
This closes #20444
---
.../table/endpoint/hive/HiveServer2Endpoint.java | 29 +++++++++++++++--
.../endpoint/hive/HiveServer2EndpointITCase.java | 12 +++++++
.../flink/table/gateway/api/SqlGatewayService.java | 12 +++++++
.../table/gateway/api/results/GatewayInfo.java | 38 ++++++++++++++++++++++
.../gateway/api/utils/MockedSqlGatewayService.java | 6 ++++
.../gateway/service/SqlGatewayServiceImpl.java | 6 ++++
6 files changed, 101 insertions(+), 2 deletions(-)
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java
index bcae8c73a42..edb5f1cebe9 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java
@@ -32,6 +32,7 @@ import org.apache.flink.table.gateway.api.endpoint.EndpointVersion;
import org.apache.flink.table.gateway.api.endpoint.SqlGatewayEndpoint;
import org.apache.flink.table.gateway.api.operation.OperationHandle;
import org.apache.flink.table.gateway.api.operation.OperationStatus;
+import org.apache.flink.table.gateway.api.results.GatewayInfo;
import org.apache.flink.table.gateway.api.results.OperationInfo;
import org.apache.flink.table.gateway.api.results.ResultSet;
import org.apache.flink.table.gateway.api.session.SessionEnvironment;
@@ -68,6 +69,7 @@ import org.apache.hive.service.rpc.thrift.TGetFunctionsReq;
import org.apache.hive.service.rpc.thrift.TGetFunctionsResp;
import org.apache.hive.service.rpc.thrift.TGetInfoReq;
import org.apache.hive.service.rpc.thrift.TGetInfoResp;
+import org.apache.hive.service.rpc.thrift.TGetInfoValue;
import org.apache.hive.service.rpc.thrift.TGetOperationStatusReq;
import org.apache.hive.service.rpc.thrift.TGetOperationStatusResp;
import org.apache.hive.service.rpc.thrift.TGetPrimaryKeysReq;
@@ -349,7 +351,31 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
@Override
public TGetInfoResp GetInfo(TGetInfoReq tGetInfoReq) throws TException {
- throw new UnsupportedOperationException(ERROR_MESSAGE);
+ TGetInfoResp resp = new TGetInfoResp(); // answer built from the service's GatewayInfo
+ try {
+ GatewayInfo info = service.getGatewayInfo();
+ TGetInfoValue tInfoValue;
+ switch (tGetInfoReq.getInfoType()) {
+ case CLI_SERVER_NAME: // server name and DBMS name both map to the product name
+ case CLI_DBMS_NAME:
+ tInfoValue = TGetInfoValue.stringValue(info.getProductName());
+ break;
+ case CLI_DBMS_VER: // version is sent as the FlinkVersion's toString() form
+ tInfoValue = TGetInfoValue.stringValue(info.getVersion().toString());
+ break;
+ default: // every other info type is rejected; the throw is handled by the catch below
+ throw new UnsupportedOperationException(
+ String.format(
+ "Unrecognized TGetInfoType value: %s.",
+ tGetInfoReq.getInfoType()));
+ }
+ resp.setStatus(OK_STATUS);
+ resp.setInfoValue(tInfoValue);
+ } catch (Throwable t) { // failures are logged and reported via the status, not rethrown
+ LOG.error("Failed to GetInfo.", t);
+ resp.setStatus(toTStatus(t));
+ }
+ return resp;
}
@Override
@@ -658,7 +684,6 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
}
// CHECKSTYLE.OFF: MethodName
-
/** To be compatible with Hive3, add a default implementation. */
public TGetQueryIdResp GetQueryId(TGetQueryIdReq tGetQueryIdReq) throws TException {
throw new UnsupportedOperationException(ERROR_MESSAGE);
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java
index 120d3786b26..9a82e1d5910 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java
@@ -18,6 +18,7 @@
package org.apache.flink.table.endpoint.hive;
+import org.apache.flink.FlinkVersion;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.SqlDialect;
@@ -63,6 +64,7 @@ import org.junit.jupiter.api.extension.RegisterExtension;
import java.net.InetAddress;
import java.sql.Connection;
+import java.sql.DatabaseMetaData;
import java.sql.ResultSetMetaData;
import java.sql.Statement;
import java.util.AbstractMap;
@@ -442,6 +444,16 @@ public class HiveServer2EndpointITCase extends TestLogger {
"INTERVAL_DAY_TIME")));
}
+ @Test
+ public void testGetInfo() throws Exception { // reads product name/version via JDBC metadata
+ try (Connection connection = ENDPOINT_EXTENSION.getConnection()) {
+ DatabaseMetaData metaData = connection.getMetaData(); // presumably backed by GetInfo calls
+ assertThat(metaData.getDatabaseProductName()).isEqualTo("Apache Flink");
+ assertThat(metaData.getDatabaseProductVersion())
+ .isEqualTo(FlinkVersion.current().toString()); // same string form the endpoint sends
+ }
+ }
+
// --------------------------------------------------------------------------------------------
private Connection getInitializedConnection() throws Exception {
diff --git a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/SqlGatewayService.java b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/SqlGatewayService.java
index b1e832c07b7..1fd7e339c8d 100644
--- a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/SqlGatewayService.java
+++ b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/SqlGatewayService.java
@@ -26,6 +26,7 @@ import org.apache.flink.table.gateway.api.endpoint.EndpointVersion;
import org.apache.flink.table.gateway.api.operation.OperationHandle;
import org.apache.flink.table.gateway.api.operation.OperationStatus;
import org.apache.flink.table.gateway.api.results.FetchOrientation;
+import org.apache.flink.table.gateway.api.results.GatewayInfo;
import org.apache.flink.table.gateway.api.results.OperationInfo;
import org.apache.flink.table.gateway.api.results.ResultSet;
import org.apache.flink.table.gateway.api.results.TableInfo;
@@ -232,4 +233,15 @@ public interface SqlGatewayService {
String databaseName,
Set<TableKind> tableKinds)
throws SqlGatewayException;
+
+ // -------------------------------------------------------------------------------------------
+ // Utilities
+ // -------------------------------------------------------------------------------------------
+
+ /**
+ * Get descriptive information about this {@link SqlGatewayService}.
+ *
+ * @return the {@link GatewayInfo} of the gateway, exposing its product name and version.
+ */
+ GatewayInfo getGatewayInfo();
}
diff --git a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/results/GatewayInfo.java b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/results/GatewayInfo.java
new file mode 100644
index 00000000000..a1412a78e35
--- /dev/null
+++ b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/results/GatewayInfo.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.api.results;
+
+import org.apache.flink.FlinkVersion;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.gateway.api.SqlGatewayService;
+
+/** Info to describe the {@link SqlGatewayService}: its product name and Flink version. */
+@PublicEvolving
+public class GatewayInfo {
+
+ public static final GatewayInfo INSTANCE = new GatewayInfo(); // shared; no instance state
+
+ public String getProductName() { // constant product name
+ return "Apache Flink";
+ }
+
+ public FlinkVersion getVersion() { // delegates to FlinkVersion.current()
+ return FlinkVersion.current();
+ }
+}
diff --git a/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/utils/MockedSqlGatewayService.java b/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/utils/MockedSqlGatewayService.java
index aff8fd6e512..8f0b914c223 100644
--- a/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/utils/MockedSqlGatewayService.java
+++ b/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/utils/MockedSqlGatewayService.java
@@ -25,6 +25,7 @@ import org.apache.flink.table.gateway.api.SqlGatewayService;
import org.apache.flink.table.gateway.api.endpoint.EndpointVersion;
import org.apache.flink.table.gateway.api.operation.OperationHandle;
import org.apache.flink.table.gateway.api.results.FetchOrientation;
+import org.apache.flink.table.gateway.api.results.GatewayInfo;
import org.apache.flink.table.gateway.api.results.OperationInfo;
import org.apache.flink.table.gateway.api.results.ResultSet;
import org.apache.flink.table.gateway.api.results.TableInfo;
@@ -141,4 +142,9 @@ public class MockedSqlGatewayService implements SqlGatewayService {
throws SqlGatewayException {
throw new UnsupportedOperationException();
}
+
+ @Override
+ public GatewayInfo getGatewayInfo() { // not supported by this mock: always throws
+ throw new UnsupportedOperationException();
+ }
}
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java
index 174918270ec..3b39430301b 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java
@@ -26,6 +26,7 @@ import org.apache.flink.table.gateway.api.SqlGatewayService;
import org.apache.flink.table.gateway.api.endpoint.EndpointVersion;
import org.apache.flink.table.gateway.api.operation.OperationHandle;
import org.apache.flink.table.gateway.api.results.FetchOrientation;
+import org.apache.flink.table.gateway.api.results.GatewayInfo;
import org.apache.flink.table.gateway.api.results.OperationInfo;
import org.apache.flink.table.gateway.api.results.ResultSet;
import org.apache.flink.table.gateway.api.results.TableInfo;
@@ -251,6 +252,11 @@ public class SqlGatewayServiceImpl implements SqlGatewayService {
}
}
+ @Override
+ public GatewayInfo getGatewayInfo() { // always returns the shared GatewayInfo.INSTANCE
+ return GatewayInfo.INSTANCE;
+ }
+
@VisibleForTesting
Session getSession(SessionHandle sessionHandle) {
return sessionManager.getSession(sessionHandle);