You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sh...@apache.org on 2022/08/08 12:13:19 UTC

[flink] branch master updated: [FLINK-28793][sql-gateway][hive] Allow to GetInfo in the HiveServer2 Endpoint

This is an automated email from the ASF dual-hosted git repository.

shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new ac0bbe608f5 [FLINK-28793][sql-gateway][hive] Allow to GetInfo in the HiveServer2 Endpoint
ac0bbe608f5 is described below

commit ac0bbe608f56fc9514191e7a9da6dc76d54047b6
Author: zhaoweinan <32...@qq.com>
AuthorDate: Wed Aug 3 17:34:51 2022 +0800

    [FLINK-28793][sql-gateway][hive] Allow to GetInfo in the HiveServer2 Endpoint
    
    This closes #20444
---
 .../table/endpoint/hive/HiveServer2Endpoint.java   | 29 +++++++++++++++--
 .../endpoint/hive/HiveServer2EndpointITCase.java   | 12 +++++++
 .../flink/table/gateway/api/SqlGatewayService.java | 12 +++++++
 .../table/gateway/api/results/GatewayInfo.java     | 38 ++++++++++++++++++++++
 .../gateway/api/utils/MockedSqlGatewayService.java |  6 ++++
 .../gateway/service/SqlGatewayServiceImpl.java     |  6 ++++
 6 files changed, 101 insertions(+), 2 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java
index bcae8c73a42..edb5f1cebe9 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java
@@ -32,6 +32,7 @@ import org.apache.flink.table.gateway.api.endpoint.EndpointVersion;
 import org.apache.flink.table.gateway.api.endpoint.SqlGatewayEndpoint;
 import org.apache.flink.table.gateway.api.operation.OperationHandle;
 import org.apache.flink.table.gateway.api.operation.OperationStatus;
+import org.apache.flink.table.gateway.api.results.GatewayInfo;
 import org.apache.flink.table.gateway.api.results.OperationInfo;
 import org.apache.flink.table.gateway.api.results.ResultSet;
 import org.apache.flink.table.gateway.api.session.SessionEnvironment;
@@ -68,6 +69,7 @@ import org.apache.hive.service.rpc.thrift.TGetFunctionsReq;
 import org.apache.hive.service.rpc.thrift.TGetFunctionsResp;
 import org.apache.hive.service.rpc.thrift.TGetInfoReq;
 import org.apache.hive.service.rpc.thrift.TGetInfoResp;
+import org.apache.hive.service.rpc.thrift.TGetInfoValue;
 import org.apache.hive.service.rpc.thrift.TGetOperationStatusReq;
 import org.apache.hive.service.rpc.thrift.TGetOperationStatusResp;
 import org.apache.hive.service.rpc.thrift.TGetPrimaryKeysReq;
@@ -349,7 +351,31 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
 
     @Override
     public TGetInfoResp GetInfo(TGetInfoReq tGetInfoReq) throws TException {
-        throw new UnsupportedOperationException(ERROR_MESSAGE);
+        TGetInfoResp resp = new TGetInfoResp();
+        try {
+            GatewayInfo info = service.getGatewayInfo();
+            TGetInfoValue tInfoValue;
+            switch (tGetInfoReq.getInfoType()) {
+                case CLI_SERVER_NAME:
+                case CLI_DBMS_NAME:
+                    tInfoValue = TGetInfoValue.stringValue(info.getProductName());
+                    break;
+                case CLI_DBMS_VER:
+                    tInfoValue = TGetInfoValue.stringValue(info.getVersion().toString());
+                    break;
+                default:
+                    throw new UnsupportedOperationException(
+                            String.format(
+                                    "Unrecognized TGetInfoType value: %s.",
+                                    tGetInfoReq.getInfoType()));
+            }
+            resp.setStatus(OK_STATUS);
+            resp.setInfoValue(tInfoValue);
+        } catch (Throwable t) {
+            LOG.error("Failed to GetInfo.", t);
+            resp.setStatus(toTStatus(t));
+        }
+        return resp;
     }
 
     @Override
@@ -658,7 +684,6 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
     }
 
     // CHECKSTYLE.OFF: MethodName
-
     /** To be compatible with Hive3, add a default implementation. */
     public TGetQueryIdResp GetQueryId(TGetQueryIdReq tGetQueryIdReq) throws TException {
         throw new UnsupportedOperationException(ERROR_MESSAGE);
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java
index 120d3786b26..9a82e1d5910 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.endpoint.hive;
 
+import org.apache.flink.FlinkVersion;
 import org.apache.flink.core.testutils.FlinkAssertions;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.SqlDialect;
@@ -63,6 +64,7 @@ import org.junit.jupiter.api.extension.RegisterExtension;
 
 import java.net.InetAddress;
 import java.sql.Connection;
+import java.sql.DatabaseMetaData;
 import java.sql.ResultSetMetaData;
 import java.sql.Statement;
 import java.util.AbstractMap;
@@ -442,6 +444,16 @@ public class HiveServer2EndpointITCase extends TestLogger {
                                                 "INTERVAL_DAY_TIME")));
     }
 
+    @Test
+    public void testGetInfo() throws Exception {
+        try (Connection connection = ENDPOINT_EXTENSION.getConnection()) {
+            DatabaseMetaData metaData = connection.getMetaData();
+            assertThat(metaData.getDatabaseProductName()).isEqualTo("Apache Flink");
+            assertThat(metaData.getDatabaseProductVersion())
+                    .isEqualTo(FlinkVersion.current().toString());
+        }
+    }
+
     // --------------------------------------------------------------------------------------------
 
     private Connection getInitializedConnection() throws Exception {
diff --git a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/SqlGatewayService.java b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/SqlGatewayService.java
index b1e832c07b7..1fd7e339c8d 100644
--- a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/SqlGatewayService.java
+++ b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/SqlGatewayService.java
@@ -26,6 +26,7 @@ import org.apache.flink.table.gateway.api.endpoint.EndpointVersion;
 import org.apache.flink.table.gateway.api.operation.OperationHandle;
 import org.apache.flink.table.gateway.api.operation.OperationStatus;
 import org.apache.flink.table.gateway.api.results.FetchOrientation;
+import org.apache.flink.table.gateway.api.results.GatewayInfo;
 import org.apache.flink.table.gateway.api.results.OperationInfo;
 import org.apache.flink.table.gateway.api.results.ResultSet;
 import org.apache.flink.table.gateway.api.results.TableInfo;
@@ -232,4 +233,15 @@ public interface SqlGatewayService {
             String databaseName,
             Set<TableKind> tableKinds)
             throws SqlGatewayException;
+
+    // -------------------------------------------------------------------------------------------
+    // Utilities
+    // -------------------------------------------------------------------------------------------
+
+    /**
+     * Get the info about the {@link SqlGatewayService}.
+     *
+     * @return Returns gateway info.
+     */
+    GatewayInfo getGatewayInfo();
 }
diff --git a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/results/GatewayInfo.java b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/results/GatewayInfo.java
new file mode 100644
index 00000000000..a1412a78e35
--- /dev/null
+++ b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/results/GatewayInfo.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.api.results;
+
+import org.apache.flink.FlinkVersion;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.gateway.api.SqlGatewayService;
+
+/** Info to describe the {@link SqlGatewayService}. */
+@PublicEvolving
+public class GatewayInfo {
+
+    public static final GatewayInfo INSTANCE = new GatewayInfo();
+
+    public String getProductName() {
+        return "Apache Flink";
+    }
+
+    public FlinkVersion getVersion() {
+        return FlinkVersion.current();
+    }
+}
diff --git a/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/utils/MockedSqlGatewayService.java b/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/utils/MockedSqlGatewayService.java
index aff8fd6e512..8f0b914c223 100644
--- a/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/utils/MockedSqlGatewayService.java
+++ b/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/utils/MockedSqlGatewayService.java
@@ -25,6 +25,7 @@ import org.apache.flink.table.gateway.api.SqlGatewayService;
 import org.apache.flink.table.gateway.api.endpoint.EndpointVersion;
 import org.apache.flink.table.gateway.api.operation.OperationHandle;
 import org.apache.flink.table.gateway.api.results.FetchOrientation;
+import org.apache.flink.table.gateway.api.results.GatewayInfo;
 import org.apache.flink.table.gateway.api.results.OperationInfo;
 import org.apache.flink.table.gateway.api.results.ResultSet;
 import org.apache.flink.table.gateway.api.results.TableInfo;
@@ -141,4 +142,9 @@ public class MockedSqlGatewayService implements SqlGatewayService {
             throws SqlGatewayException {
         throw new UnsupportedOperationException();
     }
+
+    @Override
+    public GatewayInfo getGatewayInfo() {
+        throw new UnsupportedOperationException();
+    }
 }
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java
index 174918270ec..3b39430301b 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java
@@ -26,6 +26,7 @@ import org.apache.flink.table.gateway.api.SqlGatewayService;
 import org.apache.flink.table.gateway.api.endpoint.EndpointVersion;
 import org.apache.flink.table.gateway.api.operation.OperationHandle;
 import org.apache.flink.table.gateway.api.results.FetchOrientation;
+import org.apache.flink.table.gateway.api.results.GatewayInfo;
 import org.apache.flink.table.gateway.api.results.OperationInfo;
 import org.apache.flink.table.gateway.api.results.ResultSet;
 import org.apache.flink.table.gateway.api.results.TableInfo;
@@ -251,6 +252,11 @@ public class SqlGatewayServiceImpl implements SqlGatewayService {
         }
     }
 
+    @Override
+    public GatewayInfo getGatewayInfo() {
+        return GatewayInfo.INSTANCE;
+    }
+
     @VisibleForTesting
     Session getSession(SessionHandle sessionHandle) {
         return sessionManager.getSession(sessionHandle);