You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sh...@apache.org on 2022/09/22 03:25:23 UTC

[flink] 02/02: [FLINK-29229][hive] Fix HiveServer2 endpoint doesn't support execute statements in sync mode

This is an automated email from the ASF dual-hosted git repository.

shengkai pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6493e7c1c6a2eb050e5415c1748ae5c267625ea2
Author: Shengkai <10...@qq.com>
AuthorDate: Thu Sep 8 19:52:23 2022 +0800

    [FLINK-29229][hive] Fix HiveServer2 endpoint doesn't support execute statements in sync mode
    
    This closes #20790
---
 .../table/endpoint/hive/HiveServer2Endpoint.java   |  79 +++++++-
 .../hive/util/ThriftObjectConversions.java         |  17 +-
 .../endpoint/hive/HiveServer2EndpointITCase.java   | 199 ++++++++++++++++++++-
 .../gateway/service/SqlGatewayServiceImpl.java     |   2 +-
 4 files changed, 277 insertions(+), 20 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java
index e3037d3426b..1bc38b09dfb 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.api.SqlDialect;
+import org.apache.flink.table.api.TableResult;
 import org.apache.flink.table.catalog.Catalog;
 import org.apache.flink.table.catalog.CatalogBaseTable.TableKind;
 import org.apache.flink.table.catalog.ResolvedSchema;
@@ -156,6 +157,7 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
     private static final TStatus OK_STATUS = new TStatus(TStatusCode.SUCCESS_STATUS);
     private static final String UNSUPPORTED_ERROR_MESSAGE =
             "The HiveServer2 Endpoint currently doesn't support to %s.";
+    private static final long CHECK_INTERVAL_MS = 100L; // status poll period in milliseconds
 
     // --------------------------------------------------------------------------------------------
     // Server attributes
@@ -378,13 +380,9 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
     public TExecuteStatementResp ExecuteStatement(TExecuteStatementReq tExecuteStatementReq)
             throws TException {
         TExecuteStatementResp resp = new TExecuteStatementResp();
+        SessionHandle sessionHandle = toSessionHandle(tExecuteStatementReq.getSessionHandle());
+        OperationHandle operationHandle = null;
         try {
-            if (!tExecuteStatementReq.isRunAsync()) {
-                throw new UnsupportedOperationException(
-                        "Currently SqlGateway HiveServer2 Endpoint only supports ExecuteStatement in async mode.");
-            }
-
-            SessionHandle sessionHandle = toSessionHandle(tExecuteStatementReq.getSessionHandle());
             String statement =
                     tExecuteStatementReq.isSetStatement()
                             ? tExecuteStatementReq.getStatement()
@@ -395,20 +393,27 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
                             : Collections.emptyMap();
             long timeout = tExecuteStatementReq.getQueryTimeout();
 
-            OperationHandle operationHandle =
+            operationHandle =
                     service.executeStatement(
                             sessionHandle,
                             statement,
                             timeout,
                             Configuration.fromMap(executionConfig));
-            resp.setStatus(OK_STATUS);
 
+            if (!tExecuteStatementReq.isRunAsync()) {
+                waitUntilOperationIsTerminated(sessionHandle, operationHandle);
+            }
+
+            resp.setStatus(OK_STATUS);
             resp.setOperationHandle(
                     toTOperationHandle(
                             sessionHandle, operationHandle, TOperationType.EXECUTE_STATEMENT));
         } catch (Throwable t) {
             LOG.error("Failed to ExecuteStatement.", t);
             resp.setStatus(toTStatus(t));
+            if (operationHandle != null) {
+                closeOperationSilently(sessionHandle, operationHandle);
+            }
         }
         return resp;
     }
@@ -718,7 +723,7 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
                     service.fetchResults(
                             sessionHandle,
                             operationHandle,
-                            toFetchOrientation(tFetchResultsReq.getFetchType()),
+                            toFetchOrientation(tFetchResultsReq.getOrientation()),
                             maxRows);
             resp.setStatus(OK_STATUS);
             resp.setHasMoreRows(resultSet.getResultType() != EOS);
@@ -861,6 +866,62 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
         }
     }
 
+    /**
+     * Polls the operation status every {@code CHECK_INTERVAL_MS} ms until it settles, similar to
+     * {@code org.apache.hive.jdbc.HiveStatement#waitForOperationToComplete}. Returns on FINISHED;
+     * throws {@link SqlGatewayException} on CANCELED, TIMEOUT, ERROR or an unrecognized status.
+     * The better solution is to introduce an interface similar to {@link TableResult#await()}.
+     */
+    private void waitUntilOperationIsTerminated(
+            SessionHandle sessionHandle, OperationHandle operationHandle) throws Exception {
+        while (true) {
+            OperationInfo current = service.getOperationInfo(sessionHandle, operationHandle);
+            switch (current.getStatus()) {
+                case FINISHED:
+                    return;
+                case INITIALIZED:
+                case PENDING:
+                case RUNNING:
+                    // Not settled yet: pause before asking for the status again.
+                    Thread.sleep(CHECK_INTERVAL_MS);
+                    continue;
+                case CANCELED:
+                case TIMEOUT:
+                    throw new SqlGatewayException(
+                            String.format(
+                                    "The operation %s's status is %s for the session %s.",
+                                    operationHandle, current.getStatus(), sessionHandle));
+                case ERROR:
+                    throw new SqlGatewayException(
+                            String.format(
+                                    "The operation %s's status is %s for the session %s.",
+                                    operationHandle, current.getStatus(), sessionHandle),
+                            current.getException()
+                                    .orElseThrow(
+                                            () ->
+                                                    new SqlGatewayException(
+                                                            "Impossible! ERROR status should contains the error.")));
+                default:
+                    throw new SqlGatewayException(
+                            String.format("Unknown status: %s.", current.getStatus()));
+            }
+        }
+    }
+
+    /** Closes the operation on a best-effort basis: a failure is logged but never rethrown. */
+    private void closeOperationSilently(
+            SessionHandle sessionHandle, OperationHandle operationHandle) {
+        try {
+            service.closeOperation(sessionHandle, operationHandle);
+        } catch (Throwable t) {
+            LOG.error(
+                    String.format(
+                            "Failed to close the operation %s for the session %s.",
+                            operationHandle, sessionHandle),
+                    t);
+        }
+    }
+
     private String stringifyException(Throwable t) {
         if (isVerbose) {
             return ExceptionUtils.stringifyException(t);
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversions.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversions.java
index 2df486bad48..35a40ea187f 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversions.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversions.java
@@ -191,14 +191,15 @@ public class ThriftObjectConversions {
     // Statement related conversions
     // --------------------------------------------------------------------------------------------
 
-    public static FetchOrientation toFetchOrientation(int fetchOrientation) {
-        if (fetchOrientation == TFetchOrientation.FETCH_NEXT.getValue()) {
-            return FetchOrientation.FETCH_NEXT;
-        } else if (fetchOrientation == TFetchOrientation.FETCH_PRIOR.getValue()) {
-            return FetchOrientation.FETCH_PRIOR;
-        } else {
-            throw new UnsupportedOperationException(
-                    String.format("Unsupported fetch orientation: %s.", fetchOrientation));
+    /** Converts the Thrift orientation to the gateway one; null and other values are rejected. */
+    public static FetchOrientation toFetchOrientation(TFetchOrientation fetchOrientation) {
+        if (fetchOrientation == TFetchOrientation.FETCH_NEXT) {
+            return FetchOrientation.FETCH_NEXT;
+        } else if (fetchOrientation == TFetchOrientation.FETCH_PRIOR) {
+            return FetchOrientation.FETCH_PRIOR;
+        } else {
+            throw new UnsupportedOperationException(
+                    String.format("Unsupported fetch orientation: %s.", fetchOrientation));
         }
     }
 
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java
index 5a7bd41ba0d..0f782e1e8cc 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java
@@ -19,9 +19,13 @@
 package org.apache.flink.table.endpoint.hive;
 
 import org.apache.flink.FlinkVersion;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.SqlDialect;
-import org.apache.flink.table.api.config.TableConfigOptions;
 import org.apache.flink.table.catalog.CatalogBaseTable.TableKind;
 import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.ResolvedSchema;
@@ -34,19 +38,24 @@ import org.apache.flink.table.gateway.api.results.ResultSet;
 import org.apache.flink.table.gateway.api.session.SessionEnvironment;
 import org.apache.flink.table.gateway.api.session.SessionHandle;
 import org.apache.flink.table.gateway.api.utils.SqlGatewayException;
+import org.apache.flink.table.gateway.service.SqlGatewayServiceImpl;
 import org.apache.flink.table.gateway.service.session.SessionManager;
 import org.apache.flink.table.gateway.service.utils.SqlGatewayServiceExtension;
 import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions.JavaFunc0;
+import org.apache.flink.test.junit5.InjectClusterClient;
 import org.apache.flink.test.junit5.MiniClusterExtension;
 import org.apache.flink.util.TestLogger;
 import org.apache.flink.util.function.BiConsumerWithException;
 import org.apache.flink.util.function.FunctionWithException;
+import org.apache.flink.util.function.FutureTaskWithException;
 import org.apache.flink.util.function.ThrowingConsumer;
 
 import org.apache.hadoop.hive.common.auth.HiveAuthUtils;
 import org.apache.hadoop.hive.serde2.thrift.Type;
 import org.apache.hive.jdbc.HiveConnection;
 import org.apache.hive.jdbc.HiveStatement;
+import org.apache.hive.service.cli.RowSet;
+import org.apache.hive.service.cli.RowSetFactory;
 import org.apache.hive.service.rpc.thrift.TCLIService;
 import org.apache.hive.service.rpc.thrift.TCancelOperationReq;
 import org.apache.hive.service.rpc.thrift.TCancelOperationResp;
@@ -54,10 +63,17 @@ import org.apache.hive.service.rpc.thrift.TCloseOperationReq;
 import org.apache.hive.service.rpc.thrift.TCloseOperationResp;
 import org.apache.hive.service.rpc.thrift.TCloseSessionReq;
 import org.apache.hive.service.rpc.thrift.TCloseSessionResp;
+import org.apache.hive.service.rpc.thrift.TExecuteStatementReq;
+import org.apache.hive.service.rpc.thrift.TExecuteStatementResp;
+import org.apache.hive.service.rpc.thrift.TFetchOrientation;
+import org.apache.hive.service.rpc.thrift.TFetchResultsReq;
+import org.apache.hive.service.rpc.thrift.TGetOperationStatusReq;
 import org.apache.hive.service.rpc.thrift.TOpenSessionReq;
 import org.apache.hive.service.rpc.thrift.TOpenSessionResp;
 import org.apache.hive.service.rpc.thrift.TOperationHandle;
+import org.apache.hive.service.rpc.thrift.TOperationState;
 import org.apache.hive.service.rpc.thrift.TOperationType;
+import org.apache.hive.service.rpc.thrift.TSessionHandle;
 import org.apache.hive.service.rpc.thrift.TStatusCode;
 import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.thrift.transport.TTransport;
@@ -75,20 +91,27 @@ import java.sql.Types;
 import java.util.AbstractMap;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
 import static org.apache.flink.api.common.RuntimeExecutionMode.BATCH;
 import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE;
+import static org.apache.flink.configuration.PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID;
 import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches;
 import static org.apache.flink.table.api.config.TableConfigOptions.MAX_LENGTH_GENERATED_CODE;
 import static org.apache.flink.table.api.config.TableConfigOptions.TABLE_DML_SYNC;
+import static org.apache.flink.table.api.config.TableConfigOptions.TABLE_SQL_DIALECT;
+import static org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions.toSessionHandle;
 import static org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions.toTOperationHandle;
+import static org.apache.hive.service.rpc.thrift.TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
@@ -146,7 +169,7 @@ public class HiveServer2EndpointITCase extends TestLogger {
         assertThat(actualConfig.entrySet())
                 .contains(
                         new AbstractMap.SimpleEntry<>(
-                                TableConfigOptions.TABLE_SQL_DIALECT.key(), SqlDialect.HIVE.name()),
+                                TABLE_SQL_DIALECT.key(), SqlDialect.HIVE.name()),
                         new AbstractMap.SimpleEntry<>(TABLE_DML_SYNC.key(), "true"),
                         new AbstractMap.SimpleEntry<>(RUNTIME_MODE.key(), BATCH.name()),
                         new AbstractMap.SimpleEntry<>(MAX_LENGTH_GENERATED_CODE.key(), "-1"),
@@ -572,6 +595,112 @@ public class HiveServer2EndpointITCase extends TestLogger {
         }
     }
 
+    /**
+     * Executes {@code SHOW CATALOGS} without setting the async flag and expects the operation to
+     * be FINISHED as soon as the call returns, with {@code hive} as the only catalog.
+     */
+    @Test
+    public void testExecuteStatementInSyncMode() throws Exception {
+        TCLIService.Client client = createClient();
+        TSessionHandle sessionHandle = client.OpenSession(new TOpenSessionReq()).getSessionHandle();
+
+        // No polling here: the status is checked right after ExecuteStatement returns.
+        TExecuteStatementReq request = new TExecuteStatementReq(sessionHandle, "SHOW CATALOGS");
+        TOperationHandle operationHandle = client.ExecuteStatement(request).getOperationHandle();
+        TGetOperationStatusReq statusRequest = new TGetOperationStatusReq(operationHandle);
+        assertThat(client.GetOperationStatus(statusRequest).getOperationState())
+                .isEqualTo(TOperationState.FINISHED_STATE);
+
+        // Fetch all rows in a single batch and turn each row into a list for the comparison.
+        TFetchResultsReq fetchRequest =
+                new TFetchResultsReq(
+                        operationHandle, TFetchOrientation.FETCH_NEXT, Integer.MAX_VALUE);
+        RowSet rowSet =
+                RowSetFactory.create(
+                        client.FetchResults(fetchRequest).getResults(),
+                        HIVE_CLI_SERVICE_PROTOCOL_V10);
+        List<List<Object>> actual = new ArrayList<>();
+        Iterator<Object[]> rows = rowSet.iterator();
+        rows.forEachRemaining(row -> actual.add(new ArrayList<>(Arrays.asList(row))));
+        assertThat(actual).isEqualTo(Collections.singletonList(Collections.singletonList("hive")));
+    }
+
+    /** Selecting from a missing table must report the cause and leave no operation behind. */
+    @Test
+    public void testExecuteStatementInSyncModeWithCompileException() throws Exception {
+        TCLIService.Client thriftClient = createClient();
+        TSessionHandle tSessionHandle =
+                thriftClient.OpenSession(new TOpenSessionReq()).getSessionHandle();
+        TExecuteStatementReq request =
+                new TExecuteStatementReq(tSessionHandle, "SELECT * FROM `non_exist_table`");
+        TExecuteStatementResp response = thriftClient.ExecuteStatement(request);
+        assertThat(response.getStatus().getInfoMessages())
+                .matches(
+                        messages ->
+                                messages.stream()
+                                        .anyMatch(
+                                                message ->
+                                                        message.contains(
+                                                                "Table not found 'non_exist_table'")));
+        SqlGatewayServiceImpl gatewayService =
+                (SqlGatewayServiceImpl) SQL_GATEWAY_SERVICE_EXTENSION.getService();
+        SessionHandle handle = toSessionHandle(tSessionHandle);
+        assertThat(gatewayService.getSession(handle).getOperationManager().getOperationCount())
+                .isEqualTo(0);
+    }
+
+    @Test
+    public void testExecuteStatementInSyncModeWithRuntimeException1() throws Exception {
+        runExecuteStatementInSyncModeWithRuntimeException(
+                (tSessionHandle, future) -> {
+                    createClient().CloseSession(new TCloseSessionReq(tSessionHandle));
+
+                    TExecuteStatementResp resp = future.get(10, TimeUnit.SECONDS);
+                    assertThat(resp.getStatus().getInfoMessages())
+                            .matches(
+                                    causes ->
+                                            causes.stream()
+                                                    .anyMatch(
+                                                            cause ->
+                                                                    // Close the session before or
+                                                                    // after
+                                                                    // submitting the job
+                                                                    cause.contains(
+                                                                                    "Failed to execute statement.")
+                                                                            || cause.contains(
+                                                                                    "Failed to getOperationInfo")));
+                });
+    }
+
+    /** Cancels the job of a sync statement and expects the job failure in the response. */
+    @Test
+    public void testExecuteStatementInSyncModeWithRuntimeException2(
+            @InjectClusterClient RestClusterClient<?> restClusterClient) throws Exception {
+        runExecuteStatementInSyncModeWithRuntimeException(
+                (tSessionHandle, future) -> {
+                    waitUntilJobIsRunning(restClusterClient);
+
+                    // The session was opened with a fixed job id; read it back from its config.
+                    String fixedJobId =
+                            SQL_GATEWAY_SERVICE_EXTENSION
+                                    .getService()
+                                    .getSessionConfig(toSessionHandle(tSessionHandle))
+                                    .get(PIPELINE_FIXED_JOB_ID.key());
+                    JobID jobID = JobID.fromHexString(fixedJobId);
+                    restClusterClient.cancel(jobID).get();
+
+                    // The blocked statement is expected to come back with the job failure.
+                    TExecuteStatementResp response = future.get(10, TimeUnit.SECONDS);
+                    String failure = String.format("Job failed (JobID: %s)", jobID);
+                    assertThat(response.getStatus().getInfoMessages())
+                            .matches(
+                                    messages ->
+                                            messages.stream()
+                                                    .anyMatch(
+                                                            message -> message.contains(failure)));
+                });
+    }
+
     // --------------------------------------------------------------------------------------------
 
     private static void initializeEnvironment() throws Exception {
@@ -786,4 +915,70 @@ public class HiveServer2EndpointITCase extends TestLogger {
         }
         return actual;
     }
+
+    /**
+     * Sets up a streaming session with a datagen-to-blackhole pipeline, submits the INSERT from a
+     * separate thread, and passes the session handle and the pending response to {@code checker}.
+     */
+    private void runExecuteStatementInSyncModeWithRuntimeException(
+            BiConsumerWithException<
+                            TSessionHandle,
+                            FutureTaskWithException<TExecuteStatementResp>,
+                            Exception>
+                    checker)
+            throws Exception {
+        TCLIService.Client setupClient = createClient();
+        TOpenSessionReq request = new TOpenSessionReq();
+        request.putToConfiguration(RUNTIME_MODE.key(), RuntimeExecutionMode.STREAMING.name());
+        request.putToConfiguration(TABLE_SQL_DIALECT.key(), SqlDialect.DEFAULT.name());
+        request.putToConfiguration(PIPELINE_FIXED_JOB_ID.key(), JobID.generate().toString());
+        TSessionHandle tSessionHandle = setupClient.OpenSession(request).getSessionHandle();
+
+        String createSource =
+                "CREATE TEMPORARY TABLE source(\n"
+                        + "  a INT\n"
+                        + ") WITH (\n"
+                        + "  'connector' = 'datagen'"
+                        + ")";
+        String createSink =
+                "CREATE TEMPORARY TABLE sink(\n"
+                        + "  a INT\n"
+                        + ") WITH (\n"
+                        + "  'connector' = 'blackhole'"
+                        + ")";
+        for (String ddl : Arrays.asList(createSource, createSink)) {
+            setupClient.ExecuteStatement(new TExecuteStatementReq(tSessionHandle, ddl));
+        }
+
+        CountDownLatch submitterStarted = new CountDownLatch(1);
+        FutureTaskWithException<TExecuteStatementResp> future =
+                new FutureTaskWithException<>(
+                        () -> {
+                            submitterStarted.countDown();
+                            // Thrift client is not thread-safe, so this thread creates its own.
+                            TCLIService.Client submitClient = createClient();
+                            return submitClient.ExecuteStatement(
+                                    new TExecuteStatementReq(
+                                            tSessionHandle,
+                                            "INSERT INTO sink SELECT * FROM source"));
+                        });
+        new Thread(future).start();
+        submitterStarted.await();
+
+        checker.accept(tSessionHandle, future);
+    }
+
+    private void waitUntilJobIsRunning(ClusterClient<?> client) throws Exception {
+        while (getRunningJobs(client).isEmpty()) { // no non-terminated job listed yet
+            Thread.sleep(50); // wait 50 ms before asking the cluster again
+        }
+    }
+
+    private List<JobID> getRunningJobs(ClusterClient<?> client) throws Exception {
+        Collection<JobStatusMessage> statusMessages = client.listJobs().get();
+        return statusMessages.stream() // "running" here: any job not in a globally terminal state
+                .filter(status -> !status.getJobState().isGloballyTerminalState())
+                .map(JobStatusMessage::getJobId) // callers only need the job ids
+                .collect(Collectors.toList());
+    }
 }
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java
index 4bdbafe5ab7..932a0a0c852 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java
@@ -312,7 +312,7 @@ public class SqlGatewayServiceImpl implements SqlGatewayService {
     }
 
     @VisibleForTesting
-    Session getSession(SessionHandle sessionHandle) {
+    public Session getSession(SessionHandle sessionHandle) {
         return sessionManager.getSession(sessionHandle);
     }
 }