You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/07/20 09:22:53 UTC

[GitHub] [flink] luoyuxia commented on a diff in pull request #20101: [FLINK-28150][sql-gateway][hive] Introduce the hiveserver2 endpoint and factory

luoyuxia commented on code in PR #20101:
URL: https://github.com/apache/flink/pull/20101#discussion_r922086709


##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversions.java:
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.endpoint.hive.util;
+
+import org.apache.flink.table.gateway.api.HandleIdentifier;
+import org.apache.flink.table.gateway.api.session.SessionHandle;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.apache.hive.service.rpc.thrift.THandleIdentifier;
+import org.apache.hive.service.rpc.thrift.TSessionHandle;
+import org.apache.hive.service.rpc.thrift.TStatus;
+import org.apache.hive.service.rpc.thrift.TStatusCode;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+/** Conversion between thrift object and flink object. */
+public class ThriftObjectConversions {
+
+    public static TSessionHandle toTSessionHandle(SessionHandle sessionHandle) {
+        return new TSessionHandle(toTHandleIdentifier(sessionHandle.getIdentifier()));
+    }
+
+    public static SessionHandle toSessionHandle(TSessionHandle tSessionHandle) {
+        return new SessionHandle(toHandleIdentifier(tSessionHandle.getSessionId()));
+    }
+
+    public static TStatus toTStatus(Throwable t) {
+        TStatus tStatus = new TStatus(TStatusCode.ERROR_STATUS);
+        String errMsg = ExceptionUtils.stringifyException(t);
+        tStatus.setErrorMessage(errMsg);
+        tStatus.setInfoMessages(toString(t));
+        return tStatus;
+    }
+
+    // --------------------------------------------------------------------------------------------
+
+    private static THandleIdentifier toTHandleIdentifier(HandleIdentifier identifier) {
+        byte[] guid = new byte[16];
+        byte[] secret = new byte[16];
+        ByteBuffer guidBB = ByteBuffer.wrap(guid);
+        ByteBuffer secretBB = ByteBuffer.wrap(secret);
+
+        guidBB.putLong(identifier.getPublicId().getMostSignificantBits());
+        guidBB.putLong(identifier.getPublicId().getLeastSignificantBits());
+        secretBB.putLong(identifier.getSecretId().getMostSignificantBits());
+        secretBB.putLong(identifier.getSecretId().getLeastSignificantBits());
+        return new THandleIdentifier(ByteBuffer.wrap(guid), ByteBuffer.wrap(secret));
+    }
+
+    private static HandleIdentifier toHandleIdentifier(THandleIdentifier tHandleId) {
+        ByteBuffer bb = ByteBuffer.wrap(tHandleId.getGuid());
+        UUID publicId = new UUID(bb.getLong(), bb.getLong());
+        bb = ByteBuffer.wrap(tHandleId.getSecret());
+        UUID secretId = new UUID(bb.getLong(), bb.getLong());
+        return new HandleIdentifier(publicId, secretId);
+    }
+
+    /**
+     * Converts a {@link Throwable} object into a flattened list of texts including its stack trace
+     * and the stack traces of the nested causes.
+     *
+     * @param ex a {@link Throwable} object
+     * @return a flattened list of texts including the {@link Throwable} object's stack trace and
+     *     the stack traces of the nested causes.
+     */
+    private static List<String> toString(Throwable ex) {
+        return toString(ex, null);
+    }
+
+    private static List<String> toString(Throwable cause, StackTraceElement[] parent) {
+        StackTraceElement[] trace = cause.getStackTrace();
+        int m = trace.length - 1;
+        if (parent != null) {
+            int n = parent.length - 1;
+            while (m >= 0 && n >= 0 && trace[m].equals(parent[n])) {
+                m--;
+                n--;
+            }
+        }
+        List<String> detail = enroll(cause, trace, m);
+        cause = cause.getCause();
+        if (cause != null) {
+            detail.addAll(toString(cause, trace));
+        }
+        return detail;
+    }
+
+    private static List<String> enroll(Throwable ex, StackTraceElement[] trace, int max) {
+        List<String> details = new ArrayList<String>();

Review Comment:
   nit:
   ```suggestion
           List<String> details = new ArrayList<>();
   ```



##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversions.java:
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.endpoint.hive.util;
+
+import org.apache.flink.table.gateway.api.HandleIdentifier;
+import org.apache.flink.table.gateway.api.session.SessionHandle;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.apache.hive.service.rpc.thrift.THandleIdentifier;
+import org.apache.hive.service.rpc.thrift.TSessionHandle;
+import org.apache.hive.service.rpc.thrift.TStatus;
+import org.apache.hive.service.rpc.thrift.TStatusCode;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+/** Conversion between thrift object and flink object. */
+public class ThriftObjectConversions {
+
+    public static TSessionHandle toTSessionHandle(SessionHandle sessionHandle) {
+        return new TSessionHandle(toTHandleIdentifier(sessionHandle.getIdentifier()));
+    }
+
+    public static SessionHandle toSessionHandle(TSessionHandle tSessionHandle) {
+        return new SessionHandle(toHandleIdentifier(tSessionHandle.getSessionId()));
+    }
+
+    public static TStatus toTStatus(Throwable t) {
+        TStatus tStatus = new TStatus(TStatusCode.ERROR_STATUS);
+        String errMsg = ExceptionUtils.stringifyException(t);
+        tStatus.setErrorMessage(errMsg);

Review Comment:
   Is the `errorMessage`and `InfoMessages` redundant here since both of them actually contain the all messages from  error stacktrace?
   In Hive, the `errorMessage` only contains detailMessage the instead of  whole stacktrace.
   Should we keep consistent with Hive?



##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java:
##########
@@ -0,0 +1,492 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.endpoint.hive;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.table.api.SqlDialect;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.hive.HiveCatalog;
+import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
+import org.apache.flink.table.gateway.api.SqlGatewayService;
+import org.apache.flink.table.gateway.api.endpoint.SqlGatewayEndpoint;
+import org.apache.flink.table.gateway.api.session.SessionEnvironment;
+import org.apache.flink.table.gateway.api.session.SessionHandle;
+import org.apache.flink.table.gateway.api.utils.SqlGatewayException;
+import org.apache.flink.table.gateway.api.utils.ThreadUtils;
+import org.apache.flink.table.module.Module;
+import org.apache.flink.table.module.hive.HiveModule;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hive.service.rpc.thrift.TCLIService;
+import org.apache.hive.service.rpc.thrift.TCancelDelegationTokenReq;
+import org.apache.hive.service.rpc.thrift.TCancelDelegationTokenResp;
+import org.apache.hive.service.rpc.thrift.TCancelOperationReq;
+import org.apache.hive.service.rpc.thrift.TCancelOperationResp;
+import org.apache.hive.service.rpc.thrift.TCloseOperationReq;
+import org.apache.hive.service.rpc.thrift.TCloseOperationResp;
+import org.apache.hive.service.rpc.thrift.TCloseSessionReq;
+import org.apache.hive.service.rpc.thrift.TCloseSessionResp;
+import org.apache.hive.service.rpc.thrift.TExecuteStatementReq;
+import org.apache.hive.service.rpc.thrift.TExecuteStatementResp;
+import org.apache.hive.service.rpc.thrift.TFetchResultsReq;
+import org.apache.hive.service.rpc.thrift.TFetchResultsResp;
+import org.apache.hive.service.rpc.thrift.TGetCatalogsReq;
+import org.apache.hive.service.rpc.thrift.TGetCatalogsResp;
+import org.apache.hive.service.rpc.thrift.TGetColumnsReq;
+import org.apache.hive.service.rpc.thrift.TGetColumnsResp;
+import org.apache.hive.service.rpc.thrift.TGetCrossReferenceReq;
+import org.apache.hive.service.rpc.thrift.TGetCrossReferenceResp;
+import org.apache.hive.service.rpc.thrift.TGetDelegationTokenReq;
+import org.apache.hive.service.rpc.thrift.TGetDelegationTokenResp;
+import org.apache.hive.service.rpc.thrift.TGetFunctionsReq;
+import org.apache.hive.service.rpc.thrift.TGetFunctionsResp;
+import org.apache.hive.service.rpc.thrift.TGetInfoReq;
+import org.apache.hive.service.rpc.thrift.TGetInfoResp;
+import org.apache.hive.service.rpc.thrift.TGetOperationStatusReq;
+import org.apache.hive.service.rpc.thrift.TGetOperationStatusResp;
+import org.apache.hive.service.rpc.thrift.TGetPrimaryKeysReq;
+import org.apache.hive.service.rpc.thrift.TGetPrimaryKeysResp;
+import org.apache.hive.service.rpc.thrift.TGetResultSetMetadataReq;
+import org.apache.hive.service.rpc.thrift.TGetResultSetMetadataResp;
+import org.apache.hive.service.rpc.thrift.TGetSchemasReq;
+import org.apache.hive.service.rpc.thrift.TGetSchemasResp;
+import org.apache.hive.service.rpc.thrift.TGetTableTypesReq;
+import org.apache.hive.service.rpc.thrift.TGetTableTypesResp;
+import org.apache.hive.service.rpc.thrift.TGetTablesReq;
+import org.apache.hive.service.rpc.thrift.TGetTablesResp;
+import org.apache.hive.service.rpc.thrift.TGetTypeInfoReq;
+import org.apache.hive.service.rpc.thrift.TGetTypeInfoResp;
+import org.apache.hive.service.rpc.thrift.TOpenSessionReq;
+import org.apache.hive.service.rpc.thrift.TOpenSessionResp;
+import org.apache.hive.service.rpc.thrift.TProtocolVersion;
+import org.apache.hive.service.rpc.thrift.TRenewDelegationTokenReq;
+import org.apache.hive.service.rpc.thrift.TRenewDelegationTokenResp;
+import org.apache.hive.service.rpc.thrift.TStatus;
+import org.apache.hive.service.rpc.thrift.TStatusCode;
+import org.apache.thrift.TException;
+import org.apache.thrift.TProcessorFactory;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.server.TThreadPoolServer;
+import org.apache.thrift.transport.TServerSocket;
+import org.apache.thrift.transport.TTransportFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.net.InetAddress;
+import java.net.ServerSocket;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.table.api.config.TableConfigOptions.TABLE_SQL_DIALECT;
+import static org.apache.flink.table.endpoint.hive.HiveServer2EndpointVersion.HIVE_CLI_SERVICE_PROTOCOL_V10;
+import static org.apache.flink.table.endpoint.hive.util.HiveJdbcParameterUtils.getUsedDefaultDatabase;
+import static org.apache.flink.table.endpoint.hive.util.HiveJdbcParameterUtils.validateAndNormalize;
+import static org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions.toSessionHandle;
+import static org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions.toTSessionHandle;
+import static org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions.toTStatus;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * HiveServer2 Endpoint that allows to accept the request from the hive client, e.g. Hive JDBC, Hive
+ * Beeline.
+ */
+public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoint, Runnable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(HiveServer2Endpoint.class);
+    private static final HiveServer2EndpointVersion SERVER_VERSION = HIVE_CLI_SERVICE_PROTOCOL_V10;
+    private static final TStatus OK_STATUS = new TStatus(TStatusCode.SUCCESS_STATUS);
+    private static final String ERROR_MESSAGE =
+            "The HiveServer2 Endpoint currently doesn't support this API.";
+
+    // --------------------------------------------------------------------------------------------
+    // Server attributes
+    // --------------------------------------------------------------------------------------------
+
+    private final SqlGatewayService service;
+    private final int minWorkerThreads;
+    private final int maxWorkerThreads;
+    private final Duration workerKeepAliveTime;
+    private final int requestTimeoutMs;
+    private final int backOffSlotLengthMs;
+    private final long maxMessageSize;
+    private final int port;
+
+    private final Thread serverThread = new Thread(this, "HiveServer2 Endpoint");
+    private ThreadPoolExecutor executor;
+    private TThreadPoolServer server;
+
+    // --------------------------------------------------------------------------------------------
+    // Catalog attributes
+    // --------------------------------------------------------------------------------------------
+
+    private final String catalogName;
+    @Nullable private final String defaultDatabase;
+    @Nullable private final String hiveConfPath;
+    private final boolean allowEmbedded;
+
+    // --------------------------------------------------------------------------------------------
+    // Module attributes
+    // --------------------------------------------------------------------------------------------
+
+    private final String moduleName;
+
+    public HiveServer2Endpoint(
+            SqlGatewayService service,
+            int port,
+            long maxMessageSize,
+            int requestTimeoutMs,
+            int backOffSlotLengthMs,
+            int minWorkerThreads,
+            int maxWorkerThreads,
+            Duration workerKeepAliveTime,
+            String catalogName,
+            @Nullable String hiveConfPath,
+            @Nullable String defaultDatabase,
+            String moduleName) {
+        this(
+                service,
+                port,
+                maxMessageSize,
+                requestTimeoutMs,
+                backOffSlotLengthMs,
+                minWorkerThreads,
+                maxWorkerThreads,
+                workerKeepAliveTime,
+                catalogName,
+                hiveConfPath,
+                defaultDatabase,
+                moduleName,
+                false);
+    }
+
+    @VisibleForTesting
+    public HiveServer2Endpoint(
+            SqlGatewayService service,
+            int port,
+            long maxMessageSize,
+            int requestTimeoutMs,
+            int backOffSlotLengthMs,
+            int minWorkerThreads,
+            int maxWorkerThreads,
+            Duration workerKeepAliveTime,
+            String catalogName,
+            @Nullable String hiveConfPath,
+            @Nullable String defaultDatabase,
+            String moduleName,
+            boolean allowEmbedded) {
+        this.service = service;
+
+        this.port = port;
+        this.maxMessageSize = maxMessageSize;
+        this.requestTimeoutMs = requestTimeoutMs;
+        this.backOffSlotLengthMs = backOffSlotLengthMs;
+        this.minWorkerThreads = minWorkerThreads;
+        this.maxWorkerThreads = maxWorkerThreads;
+        this.workerKeepAliveTime = checkNotNull(workerKeepAliveTime);
+
+        this.catalogName = checkNotNull(catalogName);
+        this.hiveConfPath = hiveConfPath;
+        this.defaultDatabase = defaultDatabase;
+        this.allowEmbedded = allowEmbedded;
+
+        this.moduleName = moduleName;
+    }
+
+    @Override
+    public void start() throws Exception {
+        initialize();
+        serverThread.start();
+    }
+
+    @Override
+    public void stop() throws Exception {
+        if (server != null) {
+            server.stop();
+        }
+
+        if (executor != null) {
+            executor.shutdown();
+        }
+    }
+
+    @Override
+    public TOpenSessionResp OpenSession(TOpenSessionReq tOpenSessionReq) throws TException {
+        LOG.debug("Client protocol version: " + tOpenSessionReq.getClient_protocol());
+        TOpenSessionResp resp = new TOpenSessionResp();
+        try {
+            // negotiate connection protocol version
+            TProtocolVersion clientProtocol = tOpenSessionReq.getClient_protocol();
+            HiveServer2EndpointVersion sessionVersion =
+                    HiveServer2EndpointVersion.valueOf(
+                            TProtocolVersion.findByValue(
+                                    Math.min(
+                                            clientProtocol.getValue(),
+                                            SERVER_VERSION.getVersion().getValue())));
+
+            // prepare session environment
+            Map<String, String> originSessionConf =
+                    tOpenSessionReq.getConfiguration() == null
+                            ? Collections.emptyMap()
+                            : tOpenSessionReq.getConfiguration();
+            Map<String, String> sessionConfig = new HashMap<>();
+            sessionConfig.put(TABLE_SQL_DIALECT.key(), SqlDialect.HIVE.name());

Review Comment:
    Hive dialect is the default dialect?



##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java:
##########
@@ -0,0 +1,492 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.endpoint.hive;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.table.api.SqlDialect;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.hive.HiveCatalog;
+import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
+import org.apache.flink.table.gateway.api.SqlGatewayService;
+import org.apache.flink.table.gateway.api.endpoint.SqlGatewayEndpoint;
+import org.apache.flink.table.gateway.api.session.SessionEnvironment;
+import org.apache.flink.table.gateway.api.session.SessionHandle;
+import org.apache.flink.table.gateway.api.utils.SqlGatewayException;
+import org.apache.flink.table.gateway.api.utils.ThreadUtils;
+import org.apache.flink.table.module.Module;
+import org.apache.flink.table.module.hive.HiveModule;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hive.service.rpc.thrift.TCLIService;
+import org.apache.hive.service.rpc.thrift.TCancelDelegationTokenReq;
+import org.apache.hive.service.rpc.thrift.TCancelDelegationTokenResp;
+import org.apache.hive.service.rpc.thrift.TCancelOperationReq;
+import org.apache.hive.service.rpc.thrift.TCancelOperationResp;
+import org.apache.hive.service.rpc.thrift.TCloseOperationReq;
+import org.apache.hive.service.rpc.thrift.TCloseOperationResp;
+import org.apache.hive.service.rpc.thrift.TCloseSessionReq;
+import org.apache.hive.service.rpc.thrift.TCloseSessionResp;
+import org.apache.hive.service.rpc.thrift.TExecuteStatementReq;
+import org.apache.hive.service.rpc.thrift.TExecuteStatementResp;
+import org.apache.hive.service.rpc.thrift.TFetchResultsReq;
+import org.apache.hive.service.rpc.thrift.TFetchResultsResp;
+import org.apache.hive.service.rpc.thrift.TGetCatalogsReq;
+import org.apache.hive.service.rpc.thrift.TGetCatalogsResp;
+import org.apache.hive.service.rpc.thrift.TGetColumnsReq;
+import org.apache.hive.service.rpc.thrift.TGetColumnsResp;
+import org.apache.hive.service.rpc.thrift.TGetCrossReferenceReq;
+import org.apache.hive.service.rpc.thrift.TGetCrossReferenceResp;
+import org.apache.hive.service.rpc.thrift.TGetDelegationTokenReq;
+import org.apache.hive.service.rpc.thrift.TGetDelegationTokenResp;
+import org.apache.hive.service.rpc.thrift.TGetFunctionsReq;
+import org.apache.hive.service.rpc.thrift.TGetFunctionsResp;
+import org.apache.hive.service.rpc.thrift.TGetInfoReq;
+import org.apache.hive.service.rpc.thrift.TGetInfoResp;
+import org.apache.hive.service.rpc.thrift.TGetOperationStatusReq;
+import org.apache.hive.service.rpc.thrift.TGetOperationStatusResp;
+import org.apache.hive.service.rpc.thrift.TGetPrimaryKeysReq;
+import org.apache.hive.service.rpc.thrift.TGetPrimaryKeysResp;
+import org.apache.hive.service.rpc.thrift.TGetResultSetMetadataReq;
+import org.apache.hive.service.rpc.thrift.TGetResultSetMetadataResp;
+import org.apache.hive.service.rpc.thrift.TGetSchemasReq;
+import org.apache.hive.service.rpc.thrift.TGetSchemasResp;
+import org.apache.hive.service.rpc.thrift.TGetTableTypesReq;
+import org.apache.hive.service.rpc.thrift.TGetTableTypesResp;
+import org.apache.hive.service.rpc.thrift.TGetTablesReq;
+import org.apache.hive.service.rpc.thrift.TGetTablesResp;
+import org.apache.hive.service.rpc.thrift.TGetTypeInfoReq;
+import org.apache.hive.service.rpc.thrift.TGetTypeInfoResp;
+import org.apache.hive.service.rpc.thrift.TOpenSessionReq;
+import org.apache.hive.service.rpc.thrift.TOpenSessionResp;
+import org.apache.hive.service.rpc.thrift.TProtocolVersion;
+import org.apache.hive.service.rpc.thrift.TRenewDelegationTokenReq;
+import org.apache.hive.service.rpc.thrift.TRenewDelegationTokenResp;
+import org.apache.hive.service.rpc.thrift.TStatus;
+import org.apache.hive.service.rpc.thrift.TStatusCode;
+import org.apache.thrift.TException;
+import org.apache.thrift.TProcessorFactory;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.server.TThreadPoolServer;
+import org.apache.thrift.transport.TServerSocket;
+import org.apache.thrift.transport.TTransportFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.net.InetAddress;
+import java.net.ServerSocket;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.table.api.config.TableConfigOptions.TABLE_SQL_DIALECT;
+import static org.apache.flink.table.endpoint.hive.HiveServer2EndpointVersion.HIVE_CLI_SERVICE_PROTOCOL_V10;
+import static org.apache.flink.table.endpoint.hive.util.HiveJdbcParameterUtils.getUsedDefaultDatabase;
+import static org.apache.flink.table.endpoint.hive.util.HiveJdbcParameterUtils.validateAndNormalize;
+import static org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions.toSessionHandle;
+import static org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions.toTSessionHandle;
+import static org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions.toTStatus;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * HiveServer2 Endpoint that allows to accept the request from the hive client, e.g. Hive JDBC, Hive
+ * Beeline.
+ */
+public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoint, Runnable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(HiveServer2Endpoint.class);
+    private static final HiveServer2EndpointVersion SERVER_VERSION = HIVE_CLI_SERVICE_PROTOCOL_V10;
+    private static final TStatus OK_STATUS = new TStatus(TStatusCode.SUCCESS_STATUS);
+    private static final String ERROR_MESSAGE =
+            "The HiveServer2 Endpoint currently doesn't support this API.";
+
+    // --------------------------------------------------------------------------------------------
+    // Server attributes
+    // --------------------------------------------------------------------------------------------
+
+    private final SqlGatewayService service;
+    private final int minWorkerThreads;
+    private final int maxWorkerThreads;
+    private final Duration workerKeepAliveTime;
+    private final int requestTimeoutMs;
+    private final int backOffSlotLengthMs;
+    private final long maxMessageSize;
+    private final int port;
+
+    private final Thread serverThread = new Thread(this, "HiveServer2 Endpoint");
+    private ThreadPoolExecutor executor;
+    private TThreadPoolServer server;
+
+    // --------------------------------------------------------------------------------------------
+    // Catalog attributes
+    // --------------------------------------------------------------------------------------------
+
+    private final String catalogName;
+    @Nullable private final String defaultDatabase;
+    @Nullable private final String hiveConfPath;
+    private final boolean allowEmbedded;
+
+    // --------------------------------------------------------------------------------------------
+    // Module attributes
+    // --------------------------------------------------------------------------------------------
+
+    private final String moduleName;
+
+    public HiveServer2Endpoint(
+            SqlGatewayService service,
+            int port,
+            long maxMessageSize,
+            int requestTimeoutMs,
+            int backOffSlotLengthMs,
+            int minWorkerThreads,
+            int maxWorkerThreads,
+            Duration workerKeepAliveTime,
+            String catalogName,
+            @Nullable String hiveConfPath,
+            @Nullable String defaultDatabase,
+            String moduleName) {
+        this(
+                service,
+                port,
+                maxMessageSize,
+                requestTimeoutMs,
+                backOffSlotLengthMs,
+                minWorkerThreads,
+                maxWorkerThreads,
+                workerKeepAliveTime,
+                catalogName,
+                hiveConfPath,
+                defaultDatabase,
+                moduleName,
+                false);
+    }
+
+    @VisibleForTesting
+    public HiveServer2Endpoint(
+            SqlGatewayService service,
+            int port,
+            long maxMessageSize,
+            int requestTimeoutMs,
+            int backOffSlotLengthMs,
+            int minWorkerThreads,
+            int maxWorkerThreads,
+            Duration workerKeepAliveTime,
+            String catalogName,
+            @Nullable String hiveConfPath,
+            @Nullable String defaultDatabase,
+            String moduleName,
+            boolean allowEmbedded) {
+        this.service = service;
+
+        this.port = port;
+        this.maxMessageSize = maxMessageSize;
+        this.requestTimeoutMs = requestTimeoutMs;
+        this.backOffSlotLengthMs = backOffSlotLengthMs;
+        this.minWorkerThreads = minWorkerThreads;
+        this.maxWorkerThreads = maxWorkerThreads;
+        this.workerKeepAliveTime = checkNotNull(workerKeepAliveTime);
+
+        this.catalogName = checkNotNull(catalogName);
+        this.hiveConfPath = hiveConfPath;
+        this.defaultDatabase = defaultDatabase;
+        this.allowEmbedded = allowEmbedded;
+
+        this.moduleName = moduleName;
+    }
+
+    @Override
+    public void start() throws Exception {
+        initialize();
+        serverThread.start();
+    }
+
+    @Override
+    public void stop() throws Exception {
+        if (server != null) {
+            server.stop();
+        }
+
+        if (executor != null) {
+            executor.shutdown();
+        }
+    }
+
+    @Override
+    public TOpenSessionResp OpenSession(TOpenSessionReq tOpenSessionReq) throws TException {
+        LOG.debug("Client protocol version: " + tOpenSessionReq.getClient_protocol());
+        TOpenSessionResp resp = new TOpenSessionResp();
+        try {
+            // negotiate connection protocol version
+            TProtocolVersion clientProtocol = tOpenSessionReq.getClient_protocol();
+            HiveServer2EndpointVersion sessionVersion =
+                    HiveServer2EndpointVersion.valueOf(
+                            TProtocolVersion.findByValue(

Review Comment:
   `TProtocolVersion.findByValue` may return null, then HiveServer2EndpointVersion.valueOf(null) will throw the exception will be the message `Unknown TProtocolVersion: null`, which may not user friendly.



##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointVersion.java:
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.endpoint.hive;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.gateway.api.endpoint.EndpointVersion;
+
+import org.apache.hive.service.rpc.thrift.TProtocolVersion;
+
+/** Mapping between {@link HiveServer2EndpointVersion} and {@link TProtocolVersion} in Hive. */
+@PublicEvolving
+public enum HiveServer2EndpointVersion implements EndpointVersion {
+    HIVE_CLI_SERVICE_PROTOCOL_V1(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V1),
+
+    HIVE_CLI_SERVICE_PROTOCOL_V2(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V2),
+
+    HIVE_CLI_SERVICE_PROTOCOL_V3(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V3),
+
+    HIVE_CLI_SERVICE_PROTOCOL_V4(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V4),
+
+    HIVE_CLI_SERVICE_PROTOCOL_V5(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V5),
+
+    HIVE_CLI_SERVICE_PROTOCOL_V6(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V6),
+
+    HIVE_CLI_SERVICE_PROTOCOL_V7(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V7),
+
+    HIVE_CLI_SERVICE_PROTOCOL_V8(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V8),
+
+    HIVE_CLI_SERVICE_PROTOCOL_V9(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V9),
+
+    HIVE_CLI_SERVICE_PROTOCOL_V10(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10);

Review Comment:
   The version for Hive3 will be up to `TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V11`. But since miss it seems won't cause any problem, it's just a reminder.



##########
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java:
##########
@@ -0,0 +1,126 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.table.endpoint.hive;
+
+import org.apache.flink.table.api.SqlDialect;
+import org.apache.flink.table.api.config.TableConfigOptions;
+import org.apache.flink.table.endpoint.hive.util.HiveServer2EndpointExtension;
+import org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions;
+import org.apache.flink.table.gateway.api.session.SessionHandle;
+import org.apache.flink.table.gateway.service.session.SessionManager;
+import org.apache.flink.table.gateway.service.utils.SqlGatewayServiceExtension;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.hadoop.hive.common.auth.HiveAuthUtils;
+import org.apache.hive.service.rpc.thrift.TCLIService;
+import org.apache.hive.service.rpc.thrift.TCloseSessionReq;
+import org.apache.hive.service.rpc.thrift.TCloseSessionResp;
+import org.apache.hive.service.rpc.thrift.TOpenSessionReq;
+import org.apache.hive.service.rpc.thrift.TOpenSessionResp;
+import org.apache.hive.service.rpc.thrift.TStatusCode;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.transport.TTransport;
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.util.AbstractMap;
+import java.util.Map;
+
+import static org.hamcrest.CoreMatchers.hasItem;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** ITCase for {@link HiveServer2Endpoint}. */
+public class HiveServer2EndpointITCase extends TestLogger {
+
+    @RegisterExtension
+    @Order(1)
+    public static final MiniClusterExtension MINI_CLUSTER = new MiniClusterExtension();
+
+    @RegisterExtension
+    @Order(2)
+    public static final SqlGatewayServiceExtension SQL_GATEWAY_SERVICE_EXTENSION =
+            new SqlGatewayServiceExtension(MINI_CLUSTER::getClientConfiguration);
+
+    @RegisterExtension
+    @Order(3)
+    public static final HiveServer2EndpointExtension ENDPOINT_EXTENSION =
+            new HiveServer2EndpointExtension(SQL_GATEWAY_SERVICE_EXTENSION::getService);
+
+    @Test
+    public void testOpenCloseJdbcConnection() throws Exception {
+        SessionManager sessionManager = SQL_GATEWAY_SERVICE_EXTENSION.getSessionManager();
+        int originSessionCount = sessionManager.currentSessionCount();
+        try (Connection ignore = getConnection()) {
+            assertEquals(1 + originSessionCount, sessionManager.currentSessionCount());
+        }
+        assertEquals(originSessionCount, sessionManager.currentSessionCount());
+    }
+
+    @Test
+    public void testOpenSessionWithConfig() throws Exception {

Review Comment:
   Is it for open session with config? Seems we haven't set configuration in this test.



##########
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java:
##########
@@ -0,0 +1,126 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.table.endpoint.hive;
+
+import org.apache.flink.table.api.SqlDialect;
+import org.apache.flink.table.api.config.TableConfigOptions;
+import org.apache.flink.table.endpoint.hive.util.HiveServer2EndpointExtension;
+import org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions;
+import org.apache.flink.table.gateway.api.session.SessionHandle;
+import org.apache.flink.table.gateway.service.session.SessionManager;
+import org.apache.flink.table.gateway.service.utils.SqlGatewayServiceExtension;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.hadoop.hive.common.auth.HiveAuthUtils;
+import org.apache.hive.service.rpc.thrift.TCLIService;
+import org.apache.hive.service.rpc.thrift.TCloseSessionReq;
+import org.apache.hive.service.rpc.thrift.TCloseSessionResp;
+import org.apache.hive.service.rpc.thrift.TOpenSessionReq;
+import org.apache.hive.service.rpc.thrift.TOpenSessionResp;
+import org.apache.hive.service.rpc.thrift.TStatusCode;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.transport.TTransport;
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.util.AbstractMap;
+import java.util.Map;
+
+import static org.hamcrest.CoreMatchers.hasItem;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** ITCase for {@link HiveServer2Endpoint}. */
+public class HiveServer2EndpointITCase extends TestLogger {
+
+    @RegisterExtension
+    @Order(1)
+    public static final MiniClusterExtension MINI_CLUSTER = new MiniClusterExtension();
+
+    @RegisterExtension
+    @Order(2)
+    public static final SqlGatewayServiceExtension SQL_GATEWAY_SERVICE_EXTENSION =
+            new SqlGatewayServiceExtension(MINI_CLUSTER::getClientConfiguration);
+
+    @RegisterExtension
+    @Order(3)
+    public static final HiveServer2EndpointExtension ENDPOINT_EXTENSION =
+            new HiveServer2EndpointExtension(SQL_GATEWAY_SERVICE_EXTENSION::getService);
+
+    @Test
+    public void testOpenCloseJdbcConnection() throws Exception {
+        SessionManager sessionManager = SQL_GATEWAY_SERVICE_EXTENSION.getSessionManager();
+        int originSessionCount = sessionManager.currentSessionCount();
+        try (Connection ignore = getConnection()) {
+            assertEquals(1 + originSessionCount, sessionManager.currentSessionCount());

Review Comment:
   Seems  `assertEquals` should be `assertThat().isEquals()` since we are to use assertions in this [discuss](https://lists.apache.org/thread/33t7hz8w873p1bc5msppk65792z08rgt).  It'll be better to keep unification.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org