Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/07/19 13:35:06 UTC

[GitHub] [flink] link3280 commented on a diff in pull request #20101: [FLINK-28150][sql-gateway][hive] Introduce the hiveserver2 endpoint and factory

link3280 commented on code in PR #20101:
URL: https://github.com/apache/flink/pull/20101#discussion_r924392099


##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointConfigOptions.java:
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.endpoint.hive;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+import java.time.Duration;
+
+import static org.apache.flink.table.catalog.hive.factories.HiveCatalogFactoryOptions.DEFAULT_DATABASE;
+import static org.apache.flink.table.catalog.hive.factories.HiveCatalogFactoryOptions.HIVE_CONF_DIR;
+
+/** Config Options for {@code HiveServer2Endpoint}. */
+@PublicEvolving
+public class HiveServer2EndpointConfigOptions {
+
+    // --------------------------------------------------------------------------------------------
+    // Server Options
+    // --------------------------------------------------------------------------------------------
+
+    public static final ConfigOption<Integer> THRIFT_PORT =

Review Comment:
   Since these options are scoped to the Hive endpoint, should we add a prefix like `hive-endpoint.`?
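   To make the suggestion concrete, here is a minimal sketch of what prefixed keys could look like. The `hive-endpoint.` prefix is the proposal above, and the key names are illustrative; the real code would use Flink's `ConfigOptions` builder, while plain strings are used here to keep the sketch self-contained.

   ```java
   // Illustrative sketch only: the proposed "hive-endpoint." prefix applied to option keys.
   class HiveEndpointOptionKeysSketch {
       static final String PREFIX = "hive-endpoint."; // proposed prefix, exact name up for discussion

       static String prefixed(String key) {
           return PREFIX + key;
       }

       // e.g. the THRIFT_PORT option key would become "hive-endpoint.thrift.port"
       static final String THRIFT_PORT_KEY = prefixed("thrift.port");
   }
   ```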



##########
flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/session/SessionEnvironment.java:
##########
@@ -115,8 +160,35 @@ public Builder addSessionConfig(Map<String, String> sessionConfig) {
             return this;
         }
 
+        public Builder registerCatalog(String catalogName, Catalog catalog) {
+            this.registeredCatalogs.put(catalogName, catalog);
+            return this;
+        }
+
+        public Builder setDefaultCatalog(@Nullable String defaultCatalog) {
+            this.defaultCatalog = defaultCatalog;
+            return this;
+        }
+
+        public Builder setDefaultDatabase(@Nullable String defaultDatabase) {
+            this.defaultDatabase = defaultDatabase;
+            return this;
+        }
+
+        public Builder registerModule(String moduleName, Module module) {
+            this.registeredModules.put(moduleName, module);

Review Comment:
   ditto



##########
flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/session/SessionEnvironment.java:
##########
@@ -115,8 +160,35 @@ public Builder addSessionConfig(Map<String, String> sessionConfig) {
             return this;
         }
 
+        public Builder registerCatalog(String catalogName, Catalog catalog) {
+            this.registeredCatalogs.put(catalogName, catalog);

Review Comment:
   Should we check whether the given catalog name is already registered?
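   A minimal, self-contained sketch of such a check. `Catalog` below is a stand-in interface for `org.apache.flink.table.catalog.Catalog`, and the exception type is illustrative, not what the PR uses.

   ```java
   import java.util.HashMap;
   import java.util.Map;

   // Stand-in for org.apache.flink.table.catalog.Catalog, so the sketch compiles on its own.
   interface Catalog {}

   class SessionEnvironmentBuilderSketch {
       private final Map<String, Catalog> registeredCatalogs = new HashMap<>();

       // Reject a second registration under the same name instead of silently replacing it.
       SessionEnvironmentBuilderSketch registerCatalog(String catalogName, Catalog catalog) {
           if (registeredCatalogs.putIfAbsent(catalogName, catalog) != null) {
               throw new IllegalArgumentException(
                       "A catalog with name '" + catalogName + "' is already registered.");
           }
           return this;
       }
   }
   ```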



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java:
##########
@@ -157,10 +148,39 @@ public synchronized void reset() {
     // Method to execute commands
     // --------------------------------------------------------------------------------------------
 
+    public void registerCatalog(String catalogName, Catalog catalog) {
+        sessionState.catalogManager.registerCatalog(catalogName, catalog);
+    }
+
+    public void registerModule(String moduleName, Module module) {
+        sessionState.moduleManager.loadModule(moduleName, module);
+    }
+
+    public void setCurrentCatalog(String catalog) {
+        sessionState.catalogManager.setCurrentCatalog(catalog);
+    }
+
+    public void setCurrentDatabase(String database) {
+        sessionState.catalogManager.setCurrentDatabase(database);
+    }
+
     public OperationExecutor createOperationExecutor(Configuration executionConfig) {
         return new OperationExecutor(this, executionConfig);
     }
 
+    /** Close resources, e.g. catalogs. */
+    public void close() {
+        operationManager.close();
+        for (String name : sessionState.catalogManager.listCatalogs()) {
+            sessionState.catalogManager.getCatalog(name).ifPresent(Catalog::close);

Review Comment:
   Closing a catalog may throw a CatalogException. I think it'd be better to wrap each close() in a try-catch, so that one failing catalog doesn't prevent the remaining ones from being closed.
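   A self-contained sketch of the per-catalog try-catch. Both `Catalog` and `CatalogException` below are stand-ins for the Flink types, and returning the failed names stands in for the logging the real code would do.

   ```java
   import java.util.ArrayList;
   import java.util.LinkedHashMap;
   import java.util.List;
   import java.util.Map;

   // Stand-ins for org.apache.flink.table.catalog.Catalog and
   // org.apache.flink.table.catalog.exceptions.CatalogException.
   interface Catalog {
       void close();
   }

   class CatalogException extends RuntimeException {
       CatalogException(String msg) {
           super(msg);
       }
   }

   class SessionCloseSketch {
       // Close every catalog, guarding each close() so one failing catalog
       // doesn't prevent the remaining ones from being closed. Returns the
       // names of catalogs that failed (the real code would log them instead).
       static List<String> closeCatalogs(Map<String, Catalog> catalogs) {
           List<String> failed = new ArrayList<>();
           for (Map.Entry<String, Catalog> entry : catalogs.entrySet()) {
               try {
                   entry.getValue().close();
               } catch (CatalogException e) {
                   failed.add(entry.getKey());
               }
           }
           return failed;
       }
   }
   ```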



##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java:
##########
@@ -0,0 +1,492 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.endpoint.hive;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.table.api.SqlDialect;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.hive.HiveCatalog;
+import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
+import org.apache.flink.table.gateway.api.SqlGatewayService;
+import org.apache.flink.table.gateway.api.endpoint.SqlGatewayEndpoint;
+import org.apache.flink.table.gateway.api.session.SessionEnvironment;
+import org.apache.flink.table.gateway.api.session.SessionHandle;
+import org.apache.flink.table.gateway.api.utils.SqlGatewayException;
+import org.apache.flink.table.gateway.api.utils.ThreadUtils;
+import org.apache.flink.table.module.Module;
+import org.apache.flink.table.module.hive.HiveModule;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hive.service.rpc.thrift.TCLIService;
+import org.apache.hive.service.rpc.thrift.TCancelDelegationTokenReq;
+import org.apache.hive.service.rpc.thrift.TCancelDelegationTokenResp;
+import org.apache.hive.service.rpc.thrift.TCancelOperationReq;
+import org.apache.hive.service.rpc.thrift.TCancelOperationResp;
+import org.apache.hive.service.rpc.thrift.TCloseOperationReq;
+import org.apache.hive.service.rpc.thrift.TCloseOperationResp;
+import org.apache.hive.service.rpc.thrift.TCloseSessionReq;
+import org.apache.hive.service.rpc.thrift.TCloseSessionResp;
+import org.apache.hive.service.rpc.thrift.TExecuteStatementReq;
+import org.apache.hive.service.rpc.thrift.TExecuteStatementResp;
+import org.apache.hive.service.rpc.thrift.TFetchResultsReq;
+import org.apache.hive.service.rpc.thrift.TFetchResultsResp;
+import org.apache.hive.service.rpc.thrift.TGetCatalogsReq;
+import org.apache.hive.service.rpc.thrift.TGetCatalogsResp;
+import org.apache.hive.service.rpc.thrift.TGetColumnsReq;
+import org.apache.hive.service.rpc.thrift.TGetColumnsResp;
+import org.apache.hive.service.rpc.thrift.TGetCrossReferenceReq;
+import org.apache.hive.service.rpc.thrift.TGetCrossReferenceResp;
+import org.apache.hive.service.rpc.thrift.TGetDelegationTokenReq;
+import org.apache.hive.service.rpc.thrift.TGetDelegationTokenResp;
+import org.apache.hive.service.rpc.thrift.TGetFunctionsReq;
+import org.apache.hive.service.rpc.thrift.TGetFunctionsResp;
+import org.apache.hive.service.rpc.thrift.TGetInfoReq;
+import org.apache.hive.service.rpc.thrift.TGetInfoResp;
+import org.apache.hive.service.rpc.thrift.TGetOperationStatusReq;
+import org.apache.hive.service.rpc.thrift.TGetOperationStatusResp;
+import org.apache.hive.service.rpc.thrift.TGetPrimaryKeysReq;
+import org.apache.hive.service.rpc.thrift.TGetPrimaryKeysResp;
+import org.apache.hive.service.rpc.thrift.TGetResultSetMetadataReq;
+import org.apache.hive.service.rpc.thrift.TGetResultSetMetadataResp;
+import org.apache.hive.service.rpc.thrift.TGetSchemasReq;
+import org.apache.hive.service.rpc.thrift.TGetSchemasResp;
+import org.apache.hive.service.rpc.thrift.TGetTableTypesReq;
+import org.apache.hive.service.rpc.thrift.TGetTableTypesResp;
+import org.apache.hive.service.rpc.thrift.TGetTablesReq;
+import org.apache.hive.service.rpc.thrift.TGetTablesResp;
+import org.apache.hive.service.rpc.thrift.TGetTypeInfoReq;
+import org.apache.hive.service.rpc.thrift.TGetTypeInfoResp;
+import org.apache.hive.service.rpc.thrift.TOpenSessionReq;
+import org.apache.hive.service.rpc.thrift.TOpenSessionResp;
+import org.apache.hive.service.rpc.thrift.TProtocolVersion;
+import org.apache.hive.service.rpc.thrift.TRenewDelegationTokenReq;
+import org.apache.hive.service.rpc.thrift.TRenewDelegationTokenResp;
+import org.apache.hive.service.rpc.thrift.TStatus;
+import org.apache.hive.service.rpc.thrift.TStatusCode;
+import org.apache.thrift.TException;
+import org.apache.thrift.TProcessorFactory;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.server.TThreadPoolServer;
+import org.apache.thrift.transport.TServerSocket;
+import org.apache.thrift.transport.TTransportFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.net.InetAddress;
+import java.net.ServerSocket;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.table.api.config.TableConfigOptions.TABLE_SQL_DIALECT;
+import static org.apache.flink.table.endpoint.hive.HiveServer2EndpointVersion.HIVE_CLI_SERVICE_PROTOCOL_V10;
+import static org.apache.flink.table.endpoint.hive.util.HiveJdbcParameterUtils.getUsedDefaultDatabase;
+import static org.apache.flink.table.endpoint.hive.util.HiveJdbcParameterUtils.validateAndNormalize;
+import static org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions.toSessionHandle;
+import static org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions.toTSessionHandle;
+import static org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions.toTStatus;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * HiveServer2 Endpoint that accepts requests from Hive clients, e.g. Hive JDBC and Hive
+ * Beeline.
+ */
+public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoint, Runnable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(HiveServer2Endpoint.class);
+    private static final HiveServer2EndpointVersion SERVER_VERSION = HIVE_CLI_SERVICE_PROTOCOL_V10;
+    private static final TStatus OK_STATUS = new TStatus(TStatusCode.SUCCESS_STATUS);
+    private static final String ERROR_MESSAGE =
+            "The HiveServer2 Endpoint currently doesn't support this API.";
+
+    // --------------------------------------------------------------------------------------------
+    // Server attributes
+    // --------------------------------------------------------------------------------------------
+
+    private final SqlGatewayService service;
+    private final int minWorkerThreads;
+    private final int maxWorkerThreads;
+    private final Duration workerKeepAliveTime;
+    private final int requestTimeoutMs;
+    private final int backOffSlotLengthMs;
+    private final long maxMessageSize;
+    private final int port;
+
+    private final Thread serverThread = new Thread(this, "HiveServer2 Endpoint");
+    private ThreadPoolExecutor executor;
+    private TThreadPoolServer server;
+
+    // --------------------------------------------------------------------------------------------
+    // Catalog attributes
+    // --------------------------------------------------------------------------------------------
+
+    private final String catalogName;
+    @Nullable private final String defaultDatabase;
+    @Nullable private final String hiveConfPath;
+    private final boolean allowEmbedded;
+
+    // --------------------------------------------------------------------------------------------
+    // Module attributes
+    // --------------------------------------------------------------------------------------------
+
+    private final String moduleName;
+
+    public HiveServer2Endpoint(
+            SqlGatewayService service,
+            int port,
+            long maxMessageSize,
+            int requestTimeoutMs,
+            int backOffSlotLengthMs,
+            int minWorkerThreads,
+            int maxWorkerThreads,
+            Duration workerKeepAliveTime,
+            String catalogName,
+            @Nullable String hiveConfPath,
+            @Nullable String defaultDatabase,
+            String moduleName) {
+        this(
+                service,
+                port,
+                maxMessageSize,
+                requestTimeoutMs,
+                backOffSlotLengthMs,
+                minWorkerThreads,
+                maxWorkerThreads,
+                workerKeepAliveTime,
+                catalogName,
+                hiveConfPath,
+                defaultDatabase,
+                moduleName,
+                false);
+    }
+
+    @VisibleForTesting
+    public HiveServer2Endpoint(
+            SqlGatewayService service,
+            int port,
+            long maxMessageSize,
+            int requestTimeoutMs,
+            int backOffSlotLengthMs,
+            int minWorkerThreads,
+            int maxWorkerThreads,
+            Duration workerKeepAliveTime,
+            String catalogName,
+            @Nullable String hiveConfPath,
+            @Nullable String defaultDatabase,
+            String moduleName,
+            boolean allowEmbedded) {
+        this.service = service;
+
+        this.port = port;
+        this.maxMessageSize = maxMessageSize;
+        this.requestTimeoutMs = requestTimeoutMs;
+        this.backOffSlotLengthMs = backOffSlotLengthMs;
+        this.minWorkerThreads = minWorkerThreads;
+        this.maxWorkerThreads = maxWorkerThreads;
+        this.workerKeepAliveTime = checkNotNull(workerKeepAliveTime);
+
+        this.catalogName = checkNotNull(catalogName);
+        this.hiveConfPath = hiveConfPath;
+        this.defaultDatabase = defaultDatabase;
+        this.allowEmbedded = allowEmbedded;
+
+        this.moduleName = moduleName;
+    }
+
+    @Override
+    public void start() throws Exception {
+        initialize();
+        serverThread.start();
+    }
+
+    @Override
+    public void stop() throws Exception {
+        if (server != null) {
+            server.stop();
+        }
+
+        if (executor != null) {
+            executor.shutdown();
+        }
+    }
+
+    @Override
+    public TOpenSessionResp OpenSession(TOpenSessionReq tOpenSessionReq) throws TException {
+        LOG.debug("Client protocol version: " + tOpenSessionReq.getClient_protocol());
+        TOpenSessionResp resp = new TOpenSessionResp();
+        try {
+            // negotiate connection protocol version
+            TProtocolVersion clientProtocol = tOpenSessionReq.getClient_protocol();
+            HiveServer2EndpointVersion sessionVersion =
+                    HiveServer2EndpointVersion.valueOf(
+                            TProtocolVersion.findByValue(
+                                    Math.min(
+                                            clientProtocol.getValue(),
+                                            SERVER_VERSION.getVersion().getValue())));
+
+            // prepare session environment
+            Map<String, String> originSessionConf =
+                    tOpenSessionReq.getConfiguration() == null
+                            ? Collections.emptyMap()
+                            : tOpenSessionReq.getConfiguration();
+            Map<String, String> sessionConfig = new HashMap<>();
+            sessionConfig.put(TABLE_SQL_DIALECT.key(), SqlDialect.HIVE.name());
+            sessionConfig.putAll(validateAndNormalize(originSessionConf));
+
+            HiveConf conf = HiveCatalog.createHiveConf(hiveConfPath, null);
+            sessionConfig.forEach(conf::set);
+            Catalog hiveCatalog =
+                    new HiveCatalog(
+                            catalogName,
+                            defaultDatabase,
+                            conf,
+                            HiveShimLoader.getHiveVersion(),
+                            allowEmbedded);
+            Module hiveModule = new HiveModule();
+            SessionHandle sessionHandle =
+                    service.openSession(
+                            SessionEnvironment.newBuilder()
+                                    .setSessionEndpointVersion(sessionVersion)
+                                    .registerCatalog(catalogName, hiveCatalog)
+                                    .registerModule(moduleName, hiveModule)
+                                    .setDefaultCatalog(catalogName)
+                                    .setDefaultDatabase(
+                                            getUsedDefaultDatabase(originSessionConf).orElse(null))
+                                    .addSessionConfig(sessionConfig)
+                                    .build());
+            // response
+            resp.setStatus(OK_STATUS);
+            resp.setServerProtocolVersion(sessionVersion.getVersion());
+            resp.setSessionHandle(toTSessionHandle(sessionHandle));
+            resp.setConfiguration(service.getSessionConfig(sessionHandle));
+        } catch (Exception e) {
+            LOG.error("Failed to openSession.", e);
+            resp.setStatus(toTStatus(e));
+        }
+        return resp;
+    }
+
+    @Override
+    public TCloseSessionResp CloseSession(TCloseSessionReq tCloseSessionReq) throws TException {
+        TCloseSessionResp resp = new TCloseSessionResp();
+        try {
+            SessionHandle sessionHandle = toSessionHandle(tCloseSessionReq.getSessionHandle());
+            service.closeSession(sessionHandle);
+            resp.setStatus(OK_STATUS);
+        } catch (Throwable t) {
+            LOG.warn("Error closing session: ", t);
+            resp.setStatus(toTStatus(t));
+        }
+        return resp;
+    }
+
+    @Override
+    public TGetInfoResp GetInfo(TGetInfoReq tGetInfoReq) throws TException {
+        throw new UnsupportedOperationException(ERROR_MESSAGE);
+    }
+
+    @Override
+    public TExecuteStatementResp ExecuteStatement(TExecuteStatementReq tExecuteStatementReq)
+            throws TException {
+        throw new UnsupportedOperationException(ERROR_MESSAGE);
+    }
+
+    @Override
+    public TGetTypeInfoResp GetTypeInfo(TGetTypeInfoReq tGetTypeInfoReq) throws TException {
+        throw new UnsupportedOperationException(ERROR_MESSAGE);
+    }
+
+    @Override
+    public TGetCatalogsResp GetCatalogs(TGetCatalogsReq tGetCatalogsReq) throws TException {
+        throw new UnsupportedOperationException(ERROR_MESSAGE);
+    }
+
+    @Override
+    public TGetSchemasResp GetSchemas(TGetSchemasReq tGetSchemasReq) throws TException {
+        throw new UnsupportedOperationException(ERROR_MESSAGE);
+    }
+
+    @Override
+    public TGetTablesResp GetTables(TGetTablesReq tGetTablesReq) throws TException {
+        throw new UnsupportedOperationException(ERROR_MESSAGE);
+    }
+
+    @Override
+    public TGetTableTypesResp GetTableTypes(TGetTableTypesReq tGetTableTypesReq) throws TException {
+        throw new UnsupportedOperationException(ERROR_MESSAGE);
+    }
+
+    @Override
+    public TGetColumnsResp GetColumns(TGetColumnsReq tGetColumnsReq) throws TException {
+        throw new UnsupportedOperationException(ERROR_MESSAGE);
+    }
+
+    @Override
+    public TGetFunctionsResp GetFunctions(TGetFunctionsReq tGetFunctionsReq) throws TException {
+        throw new UnsupportedOperationException(ERROR_MESSAGE);
+    }
+
+    @Override
+    public TGetPrimaryKeysResp GetPrimaryKeys(TGetPrimaryKeysReq tGetPrimaryKeysReq)
+            throws TException {
+        throw new UnsupportedOperationException(ERROR_MESSAGE);
+    }
+
+    @Override
+    public TGetCrossReferenceResp GetCrossReference(TGetCrossReferenceReq tGetCrossReferenceReq)
+            throws TException {
+        throw new UnsupportedOperationException(ERROR_MESSAGE);
+    }
+
+    @Override
+    public TGetOperationStatusResp GetOperationStatus(TGetOperationStatusReq tGetOperationStatusReq)
+            throws TException {
+        throw new UnsupportedOperationException(ERROR_MESSAGE);
+    }
+
+    @Override
+    public TCancelOperationResp CancelOperation(TCancelOperationReq tCancelOperationReq)
+            throws TException {
+        throw new UnsupportedOperationException(ERROR_MESSAGE);
+    }
+
+    @Override
+    public TCloseOperationResp CloseOperation(TCloseOperationReq tCloseOperationReq)
+            throws TException {
+        throw new UnsupportedOperationException(ERROR_MESSAGE);
+    }
+
+    @Override
+    public TGetResultSetMetadataResp GetResultSetMetadata(
+            TGetResultSetMetadataReq tGetResultSetMetadataReq) throws TException {
+        throw new UnsupportedOperationException(ERROR_MESSAGE);
+    }
+
+    @Override
+    public TFetchResultsResp FetchResults(TFetchResultsReq tFetchResultsReq) throws TException {
+        throw new UnsupportedOperationException(ERROR_MESSAGE);
+    }
+
+    @Override
+    public TGetDelegationTokenResp GetDelegationToken(TGetDelegationTokenReq tGetDelegationTokenReq)
+            throws TException {
+        throw new UnsupportedOperationException(ERROR_MESSAGE);
+    }
+
+    @Override
+    public TCancelDelegationTokenResp CancelDelegationToken(
+            TCancelDelegationTokenReq tCancelDelegationTokenReq) throws TException {
+        throw new UnsupportedOperationException(ERROR_MESSAGE);
+    }
+
+    @Override
+    public TRenewDelegationTokenResp RenewDelegationToken(
+            TRenewDelegationTokenReq tRenewDelegationTokenReq) throws TException {
+        throw new UnsupportedOperationException(ERROR_MESSAGE);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof HiveServer2Endpoint)) {
+            return false;
+        }
+        HiveServer2Endpoint that = (HiveServer2Endpoint) o;
+        return minWorkerThreads == that.minWorkerThreads
+                && maxWorkerThreads == that.maxWorkerThreads
+                && requestTimeoutMs == that.requestTimeoutMs
+                && backOffSlotLengthMs == that.backOffSlotLengthMs
+                && maxMessageSize == that.maxMessageSize
+                && port == that.port
+                && Objects.equals(workerKeepAliveTime, that.workerKeepAliveTime)
+                && Objects.equals(catalogName, that.catalogName)
+                && Objects.equals(defaultDatabase, that.defaultDatabase)
+                && Objects.equals(hiveConfPath, that.hiveConfPath)
+                && Objects.equals(moduleName, that.moduleName);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(
+                minWorkerThreads,
+                maxWorkerThreads,
+                workerKeepAliveTime,
+                requestTimeoutMs,
+                backOffSlotLengthMs,
+                maxMessageSize,
+                port,
+                catalogName,
+                defaultDatabase,
+                hiveConfPath,
+                moduleName);
+    }
+
+    @Override
+    public void run() {
+        try {
+            server.serve();
+        } catch (Throwable t) {
+            LOG.info("Exception caught by " + this.getClass().getSimpleName() + ". Exiting.", t);

Review Comment:
   I think this should be logged at least at error level.



##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java:
##########
@@ -0,0 +1,492 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.endpoint.hive;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.table.api.SqlDialect;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.hive.HiveCatalog;
+import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
+import org.apache.flink.table.gateway.api.SqlGatewayService;
+import org.apache.flink.table.gateway.api.endpoint.SqlGatewayEndpoint;
+import org.apache.flink.table.gateway.api.session.SessionEnvironment;
+import org.apache.flink.table.gateway.api.session.SessionHandle;
+import org.apache.flink.table.gateway.api.utils.SqlGatewayException;
+import org.apache.flink.table.gateway.api.utils.ThreadUtils;
+import org.apache.flink.table.module.Module;
+import org.apache.flink.table.module.hive.HiveModule;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hive.service.rpc.thrift.TCLIService;
+import org.apache.hive.service.rpc.thrift.TCancelDelegationTokenReq;
+import org.apache.hive.service.rpc.thrift.TCancelDelegationTokenResp;
+import org.apache.hive.service.rpc.thrift.TCancelOperationReq;
+import org.apache.hive.service.rpc.thrift.TCancelOperationResp;
+import org.apache.hive.service.rpc.thrift.TCloseOperationReq;
+import org.apache.hive.service.rpc.thrift.TCloseOperationResp;
+import org.apache.hive.service.rpc.thrift.TCloseSessionReq;
+import org.apache.hive.service.rpc.thrift.TCloseSessionResp;
+import org.apache.hive.service.rpc.thrift.TExecuteStatementReq;
+import org.apache.hive.service.rpc.thrift.TExecuteStatementResp;
+import org.apache.hive.service.rpc.thrift.TFetchResultsReq;
+import org.apache.hive.service.rpc.thrift.TFetchResultsResp;
+import org.apache.hive.service.rpc.thrift.TGetCatalogsReq;
+import org.apache.hive.service.rpc.thrift.TGetCatalogsResp;
+import org.apache.hive.service.rpc.thrift.TGetColumnsReq;
+import org.apache.hive.service.rpc.thrift.TGetColumnsResp;
+import org.apache.hive.service.rpc.thrift.TGetCrossReferenceReq;
+import org.apache.hive.service.rpc.thrift.TGetCrossReferenceResp;
+import org.apache.hive.service.rpc.thrift.TGetDelegationTokenReq;
+import org.apache.hive.service.rpc.thrift.TGetDelegationTokenResp;
+import org.apache.hive.service.rpc.thrift.TGetFunctionsReq;
+import org.apache.hive.service.rpc.thrift.TGetFunctionsResp;
+import org.apache.hive.service.rpc.thrift.TGetInfoReq;
+import org.apache.hive.service.rpc.thrift.TGetInfoResp;
+import org.apache.hive.service.rpc.thrift.TGetOperationStatusReq;
+import org.apache.hive.service.rpc.thrift.TGetOperationStatusResp;
+import org.apache.hive.service.rpc.thrift.TGetPrimaryKeysReq;
+import org.apache.hive.service.rpc.thrift.TGetPrimaryKeysResp;
+import org.apache.hive.service.rpc.thrift.TGetResultSetMetadataReq;
+import org.apache.hive.service.rpc.thrift.TGetResultSetMetadataResp;
+import org.apache.hive.service.rpc.thrift.TGetSchemasReq;
+import org.apache.hive.service.rpc.thrift.TGetSchemasResp;
+import org.apache.hive.service.rpc.thrift.TGetTableTypesReq;
+import org.apache.hive.service.rpc.thrift.TGetTableTypesResp;
+import org.apache.hive.service.rpc.thrift.TGetTablesReq;
+import org.apache.hive.service.rpc.thrift.TGetTablesResp;
+import org.apache.hive.service.rpc.thrift.TGetTypeInfoReq;
+import org.apache.hive.service.rpc.thrift.TGetTypeInfoResp;
+import org.apache.hive.service.rpc.thrift.TOpenSessionReq;
+import org.apache.hive.service.rpc.thrift.TOpenSessionResp;
+import org.apache.hive.service.rpc.thrift.TProtocolVersion;
+import org.apache.hive.service.rpc.thrift.TRenewDelegationTokenReq;
+import org.apache.hive.service.rpc.thrift.TRenewDelegationTokenResp;
+import org.apache.hive.service.rpc.thrift.TStatus;
+import org.apache.hive.service.rpc.thrift.TStatusCode;
+import org.apache.thrift.TException;
+import org.apache.thrift.TProcessorFactory;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.server.TThreadPoolServer;
+import org.apache.thrift.transport.TServerSocket;
+import org.apache.thrift.transport.TTransportFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.net.InetAddress;
+import java.net.ServerSocket;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.table.api.config.TableConfigOptions.TABLE_SQL_DIALECT;
+import static org.apache.flink.table.endpoint.hive.HiveServer2EndpointVersion.HIVE_CLI_SERVICE_PROTOCOL_V10;
+import static org.apache.flink.table.endpoint.hive.util.HiveJdbcParameterUtils.getUsedDefaultDatabase;
+import static org.apache.flink.table.endpoint.hive.util.HiveJdbcParameterUtils.validateAndNormalize;
+import static org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions.toSessionHandle;
+import static org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions.toTSessionHandle;
+import static org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions.toTStatus;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * HiveServer2 Endpoint that accepts requests from Hive clients, e.g. Hive JDBC and Hive
+ * Beeline.
+ */
+public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoint, Runnable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(HiveServer2Endpoint.class);
+    private static final HiveServer2EndpointVersion SERVER_VERSION = HIVE_CLI_SERVICE_PROTOCOL_V10;
+    private static final TStatus OK_STATUS = new TStatus(TStatusCode.SUCCESS_STATUS);
+    private static final String ERROR_MESSAGE =
+            "The HiveServer2 Endpoint currently doesn't support this API.";
+
+    // --------------------------------------------------------------------------------------------
+    // Server attributes
+    // --------------------------------------------------------------------------------------------
+
+    private final SqlGatewayService service;
+    private final int minWorkerThreads;
+    private final int maxWorkerThreads;
+    private final Duration workerKeepAliveTime;
+    private final int requestTimeoutMs;
+    private final int backOffSlotLengthMs;
+    private final long maxMessageSize;
+    private final int port;
+
+    private final Thread serverThread = new Thread(this, "HiveServer2 Endpoint");
+    private ThreadPoolExecutor executor;
+    private TThreadPoolServer server;
+
+    // --------------------------------------------------------------------------------------------
+    // Catalog attributes
+    // --------------------------------------------------------------------------------------------
+
+    private final String catalogName;
+    @Nullable private final String defaultDatabase;
+    @Nullable private final String hiveConfPath;
+    private final boolean allowEmbedded;
+
+    // --------------------------------------------------------------------------------------------
+    // Module attributes
+    // --------------------------------------------------------------------------------------------
+
+    private final String moduleName;
+
+    public HiveServer2Endpoint(
+            SqlGatewayService service,
+            int port,
+            long maxMessageSize,
+            int requestTimeoutMs,
+            int backOffSlotLengthMs,
+            int minWorkerThreads,
+            int maxWorkerThreads,
+            Duration workerKeepAliveTime,
+            String catalogName,
+            @Nullable String hiveConfPath,
+            @Nullable String defaultDatabase,
+            String moduleName) {
+        this(
+                service,
+                port,
+                maxMessageSize,
+                requestTimeoutMs,
+                backOffSlotLengthMs,
+                minWorkerThreads,
+                maxWorkerThreads,
+                workerKeepAliveTime,
+                catalogName,
+                hiveConfPath,
+                defaultDatabase,
+                moduleName,
+                false);
+    }
+
+    @VisibleForTesting
+    public HiveServer2Endpoint(
+            SqlGatewayService service,
+            int port,
+            long maxMessageSize,
+            int requestTimeoutMs,
+            int backOffSlotLengthMs,
+            int minWorkerThreads,
+            int maxWorkerThreads,
+            Duration workerKeepAliveTime,
+            String catalogName,
+            @Nullable String hiveConfPath,
+            @Nullable String defaultDatabase,
+            String moduleName,
+            boolean allowEmbedded) {
+        this.service = service;
+
+        this.port = port;
+        this.maxMessageSize = maxMessageSize;
+        this.requestTimeoutMs = requestTimeoutMs;
+        this.backOffSlotLengthMs = backOffSlotLengthMs;
+        this.minWorkerThreads = minWorkerThreads;
+        this.maxWorkerThreads = maxWorkerThreads;
+        this.workerKeepAliveTime = checkNotNull(workerKeepAliveTime);
+
+        this.catalogName = checkNotNull(catalogName);
+        this.hiveConfPath = hiveConfPath;
+        this.defaultDatabase = defaultDatabase;
+        this.allowEmbedded = allowEmbedded;
+
+        this.moduleName = moduleName;
+    }
+
+    @Override
+    public void start() throws Exception {
+        initialize();
+        serverThread.start();
+    }
+
+    @Override
+    public void stop() throws Exception {
+        if (server != null) {
+            server.stop();
+        }
+
+        if (executor != null) {
+            executor.shutdown();
+        }
+    }
+
+    @Override
+    public TOpenSessionResp OpenSession(TOpenSessionReq tOpenSessionReq) throws TException {
+        LOG.debug("Client protocol version: " + tOpenSessionReq.getClient_protocol());
+        TOpenSessionResp resp = new TOpenSessionResp();
+        try {
+            // negotiate connection protocol version
+            TProtocolVersion clientProtocol = tOpenSessionReq.getClient_protocol();
+            HiveServer2EndpointVersion sessionVersion =
+                    HiveServer2EndpointVersion.valueOf(
+                            TProtocolVersion.findByValue(
+                                    Math.min(
+                                            clientProtocol.getValue(),
+                                            SERVER_VERSION.getVersion().getValue())));
+
+            // prepare session environment
+            Map<String, String> originSessionConf =
+                    tOpenSessionReq.getConfiguration() == null
+                            ? Collections.emptyMap()
+                            : tOpenSessionReq.getConfiguration();
+            Map<String, String> sessionConfig = new HashMap<>();
+            sessionConfig.put(TABLE_SQL_DIALECT.key(), SqlDialect.HIVE.name());
+            sessionConfig.putAll(validateAndNormalize(originSessionConf));
+
+            HiveConf conf = HiveCatalog.createHiveConf(hiveConfPath, null);
+            sessionConfig.forEach(conf::set);
+            Catalog hiveCatalog =
+                    new HiveCatalog(
+                            catalogName,
+                            defaultDatabase,
+                            conf,
+                            HiveShimLoader.getHiveVersion(),
+                            allowEmbedded);
+            Module hiveModule = new HiveModule();
+            SessionHandle sessionHandle =
+                    service.openSession(
+                            SessionEnvironment.newBuilder()
+                                    .setSessionEndpointVersion(sessionVersion)
+                                    .registerCatalog(catalogName, hiveCatalog)
+                                    .registerModule(moduleName, hiveModule)
+                                    .setDefaultCatalog(catalogName)
+                                    .setDefaultDatabase(
+                                            getUsedDefaultDatabase(originSessionConf).orElse(null))
+                                    .addSessionConfig(sessionConfig)
+                                    .build());
+            // response
+            resp.setStatus(OK_STATUS);
+            resp.setServerProtocolVersion(sessionVersion.getVersion());
+            resp.setSessionHandle(toTSessionHandle(sessionHandle));
+            resp.setConfiguration(service.getSessionConfig(sessionHandle));
+        } catch (Exception e) {
+            LOG.error("Failed to openSession.", e);
+            resp.setStatus(toTStatus(e));
+        }
+        return resp;
+    }
+
+    @Override
+    public TCloseSessionResp CloseSession(TCloseSessionReq tCloseSessionReq) throws TException {
+        TCloseSessionResp resp = new TCloseSessionResp();
+        try {
+            SessionHandle sessionHandle = toSessionHandle(tCloseSessionReq.getSessionHandle());
+            service.closeSession(sessionHandle);
+            resp.setStatus(OK_STATUS);
+        } catch (Throwable t) {
+            LOG.warn("Error closing session: ", t);
+            resp.setStatus(toTStatus(t));
+        }
+        return resp;
+    }
+
+    @Override
+    public TGetInfoResp GetInfo(TGetInfoReq tGetInfoReq) throws TException {
+        throw new UnsupportedOperationException(ERROR_MESSAGE);
+    }
+
+    @Override
+    public TExecuteStatementResp ExecuteStatement(TExecuteStatementReq tExecuteStatementReq)
+            throws TException {
+        throw new UnsupportedOperationException(ERROR_MESSAGE);
+    }
+
+    @Override
+    public TGetTypeInfoResp GetTypeInfo(TGetTypeInfoReq tGetTypeInfoReq) throws TException {
+        throw new UnsupportedOperationException(ERROR_MESSAGE);
+    }
+
+    @Override
+    public TGetCatalogsResp GetCatalogs(TGetCatalogsReq tGetCatalogsReq) throws TException {
+        throw new UnsupportedOperationException(ERROR_MESSAGE);
+    }
+
+    @Override
+    public TGetSchemasResp GetSchemas(TGetSchemasReq tGetSchemasReq) throws TException {
+        throw new UnsupportedOperationException(ERROR_MESSAGE);
+    }
+
+    @Override
+    public TGetTablesResp GetTables(TGetTablesReq tGetTablesReq) throws TException {
+        throw new UnsupportedOperationException(ERROR_MESSAGE);
+    }
+
+    @Override
+    public TGetTableTypesResp GetTableTypes(TGetTableTypesReq tGetTableTypesReq) throws TException {
+        throw new UnsupportedOperationException(ERROR_MESSAGE);
+    }
+
+    @Override
+    public TGetColumnsResp GetColumns(TGetColumnsReq tGetColumnsReq) throws TException {
+        throw new UnsupportedOperationException(ERROR_MESSAGE);
+    }
+
+    @Override
+    public TGetFunctionsResp GetFunctions(TGetFunctionsReq tGetFunctionsReq) throws TException {
+        throw new UnsupportedOperationException(ERROR_MESSAGE);
+    }
+
+    @Override
+    public TGetPrimaryKeysResp GetPrimaryKeys(TGetPrimaryKeysReq tGetPrimaryKeysReq)
+            throws TException {
+        throw new UnsupportedOperationException(ERROR_MESSAGE);
+    }
+
+    @Override
+    public TGetCrossReferenceResp GetCrossReference(TGetCrossReferenceReq tGetCrossReferenceReq)
+            throws TException {
+        throw new UnsupportedOperationException(ERROR_MESSAGE);
+    }
+
+    @Override
+    public TGetOperationStatusResp GetOperationStatus(TGetOperationStatusReq tGetOperationStatusReq)
+            throws TException {
+        throw new UnsupportedOperationException(ERROR_MESSAGE);
+    }
+
+    @Override
+    public TCancelOperationResp CancelOperation(TCancelOperationReq tCancelOperationReq)
+            throws TException {
+        throw new UnsupportedOperationException(ERROR_MESSAGE);
+    }
+
+    @Override
+    public TCloseOperationResp CloseOperation(TCloseOperationReq tCloseOperationReq)
+            throws TException {
+        throw new UnsupportedOperationException(ERROR_MESSAGE);
+    }
+
+    @Override
+    public TGetResultSetMetadataResp GetResultSetMetadata(
+            TGetResultSetMetadataReq tGetResultSetMetadataReq) throws TException {
+        throw new UnsupportedOperationException(ERROR_MESSAGE);
+    }
+
+    @Override
+    public TFetchResultsResp FetchResults(TFetchResultsReq tFetchResultsReq) throws TException {
+        throw new UnsupportedOperationException(ERROR_MESSAGE);
+    }
+
+    @Override
+    public TGetDelegationTokenResp GetDelegationToken(TGetDelegationTokenReq tGetDelegationTokenReq)
+            throws TException {
+        throw new UnsupportedOperationException(ERROR_MESSAGE);
+    }
+
+    @Override
+    public TCancelDelegationTokenResp CancelDelegationToken(
+            TCancelDelegationTokenReq tCancelDelegationTokenReq) throws TException {
+        throw new UnsupportedOperationException(ERROR_MESSAGE);
+    }
+
+    @Override
+    public TRenewDelegationTokenResp RenewDelegationToken(
+            TRenewDelegationTokenReq tRenewDelegationTokenReq) throws TException {
+        throw new UnsupportedOperationException(ERROR_MESSAGE);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof HiveServer2Endpoint)) {
+            return false;
+        }
+        HiveServer2Endpoint that = (HiveServer2Endpoint) o;
+        return minWorkerThreads == that.minWorkerThreads
+                && maxWorkerThreads == that.maxWorkerThreads
+                && requestTimeoutMs == that.requestTimeoutMs
+                && backOffSlotLengthMs == that.backOffSlotLengthMs
+                && maxMessageSize == that.maxMessageSize
+                && port == that.port
+                && Objects.equals(workerKeepAliveTime, that.workerKeepAliveTime)
+                && Objects.equals(catalogName, that.catalogName)
+                && Objects.equals(defaultDatabase, that.defaultDatabase)
+                && Objects.equals(hiveConfPath, that.hiveConfPath)
+                && Objects.equals(moduleName, that.moduleName);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(
+                minWorkerThreads,
+                maxWorkerThreads,
+                workerKeepAliveTime,
+                requestTimeoutMs,
+                backOffSlotLengthMs,
+                maxMessageSize,
+                port,
+                catalogName,
+                defaultDatabase,
+                hiveConfPath,
+                moduleName);
+    }
+
+    @Override
+    public void run() {
+        try {
+            server.serve();
+        } catch (Throwable t) {
+            LOG.info("Exception caught by " + this.getClass().getSimpleName() + ". Exiting.", t);
+        }
+    }
+
+    private void initialize() {
+        executor =
+                ThreadUtils.newThreadPool(
+                        minWorkerThreads,
+                        maxWorkerThreads,
+                        workerKeepAliveTime.toMillis(),
+                        "hiveserver2-endpoint-thread-pool");
+
+        try {
+            server =
+                    new TThreadPoolServer(
+                            new TThreadPoolServer.Args(
+                                            new TServerSocket(
+                                                    new ServerSocket(
+                                                            port, -1, InetAddress.getByName(null))))
+                                    .processorFactory(
+                                            new TProcessorFactory(
+                                                    new TCLIService.Processor<>(this)))
+                                    .transportFactory(new TTransportFactory())
+                                    // Currently, only binary mode is supported.
+                                    .protocolFactory(new TBinaryProtocol.Factory())
+                                    .inputProtocolFactory(
+                                            new TBinaryProtocol.Factory(
+                                                    true, true, maxMessageSize, maxMessageSize))
+                                    .requestTimeout(requestTimeoutMs)
+                                    .requestTimeoutUnit(TimeUnit.MILLISECONDS)
+                                    .beBackoffSlotLength(backOffSlotLengthMs)
+                                    .beBackoffSlotLengthUnit(TimeUnit.MILLISECONDS)
+                                    .executorService(executor));
+
+            LOG.info(String.format("HiveServer2 Endpoint begins to listen on %s.", port));

Review Comment:
   Why not log4j built-in formatting?
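For context on the comment above: SLF4J (the facade behind `LOG` here) and Log4j both support `{}` placeholder formatting, which defers building the message string until the log level is known to be enabled, whereas the `String.format(...)` call in the diff always runs. The sketch below is a hypothetical mini-logger (not SLF4J itself) just to make the eager-vs-lazy difference runnable without any logging jars on the classpath:

```java
public class LazyLogDemo {
    static final boolean INFO_ENABLED = false; // pretend the INFO level is turned off
    static int eagerFormatCalls = 0;

    // Eager style, as in the diff: the caller builds the message before the call.
    static void infoEager(String alreadyFormatted) {
        if (INFO_ENABLED) System.out.println(alreadyFormatted);
    }

    // Parameterized style (what SLF4J's {} placeholders do): format only if enabled.
    static void infoLazy(String template, Object arg) {
        if (INFO_ENABLED) System.out.println(template.replace("{}", String.valueOf(arg)));
    }

    static String buildMessage(int port) {
        eagerFormatCalls++; // counts the formatting work done regardless of log level
        return String.format("HiveServer2 Endpoint begins to listen on %s.", port);
    }

    public static void main(String[] args) {
        infoEager(buildMessage(10000)); // formats even though INFO is disabled
        infoLazy("HiveServer2 Endpoint begins to listen on {}.", 10000); // no formatting work
        System.out.println("eager format calls with INFO disabled: " + eagerFormatCalls);
        // → eager format calls with INFO disabled: 1
    }
}
```

With SLF4J, the parameterized equivalent of the line under review would be `LOG.info("HiveServer2 Endpoint begins to listen on {}.", port);`, which skips message assembly entirely when INFO is disabled (aside from boxing the `int` argument).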


