Posted to commits@flink.apache.org by sh...@apache.org on 2022/08/03 03:21:11 UTC

[flink] branch master updated: [FLINK-28151][hive] Allow to cancel the Operation for the HiveServer2 Endpoint.

This is an automated email from the ASF dual-hosted git repository.

shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 69df7a46bf4 [FLINK-28151][hive] Allow to cancel the Operation for the HiveServer2 Endpoint.
69df7a46bf4 is described below

commit 69df7a46bf439fc94ad1074b0d9ea9b3503632b1
Author: jayce <32...@qq.com>
AuthorDate: Mon Aug 1 15:33:32 2022 +0800

    [FLINK-28151][hive] Allow to cancel the Operation for the HiveServer2 Endpoint.
    
    This closes #20402
---
 .../table/endpoint/hive/HiveServer2Endpoint.java   | 24 +++++-
 .../endpoint/hive/HiveServer2EndpointITCase.java   | 93 ++++++++++++++++++++++
 .../hive/HiveServer2EndpointStatementITCase.java   |  4 +-
 .../hive/util/HiveServer2EndpointExtension.java    |  4 +
 4 files changed, 122 insertions(+), 3 deletions(-)
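
For context: with this change the endpoint forwards the Thrift CancelOperation and
CloseOperation calls to the SQL Gateway service instead of rejecting them, so a Hive
client can interrupt a statement that is still running. A minimal sketch of how that
looks from the client side is below; the JDBC URL and table name are placeholders, and
it assumes the Hive JDBC driver maps Statement.cancel() to a CancelOperation RPC.

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.Statement;

    public class CancelFromJdbcSketch {
        public static void main(String[] args) throws Exception {
            // Placeholder URL; point it at the SQL Gateway's HiveServer2 endpoint.
            try (Connection connection =
                            DriverManager.getConnection("jdbc:hive2://localhost:10000/default");
                    Statement statement = connection.createStatement()) {
                // Run the statement on a worker thread so the main thread can cancel it.
                Thread worker =
                        new Thread(
                                () -> {
                                    try {
                                        statement.execute("SELECT count(*) FROM some_large_table");
                                    } catch (Exception ignored) {
                                        // Expected once the operation is cancelled.
                                    }
                                });
                worker.start();
                Thread.sleep(1_000);
                statement.cancel(); // assumed to issue CancelOperation against the endpoint
                worker.join();
            }
        }
    }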

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java
index 7174a4502f2..07597450a4f 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java
@@ -458,13 +458,33 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
     @Override
     public TCancelOperationResp CancelOperation(TCancelOperationReq tCancelOperationReq)
             throws TException {
-        throw new UnsupportedOperationException(ERROR_MESSAGE);
+        TCancelOperationResp resp = new TCancelOperationResp();
+        try {
+            TOperationHandle operationHandle = tCancelOperationReq.getOperationHandle();
+            service.cancelOperation(
+                    toSessionHandle(operationHandle), toOperationHandle(operationHandle));
+            resp.setStatus(OK_STATUS);
+        } catch (Throwable t) {
+            LOG.error("Failed to CancelOperation.", t);
+            resp.setStatus(toTStatus(t));
+        }
+        return resp;
     }
 
     @Override
     public TCloseOperationResp CloseOperation(TCloseOperationReq tCloseOperationReq)
             throws TException {
-        throw new UnsupportedOperationException(ERROR_MESSAGE);
+        TCloseOperationResp resp = new TCloseOperationResp();
+        try {
+            TOperationHandle operationHandle = tCloseOperationReq.getOperationHandle();
+            service.closeOperation(
+                    toSessionHandle(operationHandle), toOperationHandle(operationHandle));
+            resp.setStatus(OK_STATUS);
+        } catch (Throwable t) {
+            LOG.error("Failed to CloseOperation.", t);
+            resp.setStatus(toTStatus(t));
+        }
+        return resp;
     }
 
     @Override
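
The same two RPCs can also be exercised against a raw TCLIService client (the ITCase
below calls the endpoint object directly instead). A rough sketch, assuming client is a
connected TCLIService.Client and operationHandle is the TOperationHandle of a running
operation:

    // Sketch only: cancel and then close a running operation over Thrift.
    TCancelOperationResp cancelResp =
            client.CancelOperation(new TCancelOperationReq(operationHandle));
    if (cancelResp.getStatus().getStatusCode() != TStatusCode.SUCCESS_STATUS) {
        throw new IllegalStateException(
                "Cancel failed: " + cancelResp.getStatus().getErrorMessage());
    }
    // Closing releases the operation's resources on the gateway side.
    TCloseOperationResp closeResp =
            client.CloseOperation(new TCloseOperationReq(operationHandle));
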
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java
index ba3ac421d1f..269e1a21f3b 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java
@@ -18,22 +18,36 @@
 
 package org.apache.flink.table.endpoint.hive;
 
+import org.apache.flink.core.testutils.FlinkAssertions;
 import org.apache.flink.table.api.SqlDialect;
 import org.apache.flink.table.api.config.TableConfigOptions;
 import org.apache.flink.table.endpoint.hive.util.HiveServer2EndpointExtension;
 import org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions;
+import org.apache.flink.table.gateway.api.operation.OperationHandle;
+import org.apache.flink.table.gateway.api.operation.OperationStatus;
+import org.apache.flink.table.gateway.api.operation.OperationType;
+import org.apache.flink.table.gateway.api.results.ResultSet;
+import org.apache.flink.table.gateway.api.session.SessionEnvironment;
 import org.apache.flink.table.gateway.api.session.SessionHandle;
+import org.apache.flink.table.gateway.api.utils.SqlGatewayException;
 import org.apache.flink.table.gateway.service.session.SessionManager;
 import org.apache.flink.table.gateway.service.utils.SqlGatewayServiceExtension;
 import org.apache.flink.test.junit5.MiniClusterExtension;
 import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.BiConsumerWithException;
+import org.apache.flink.util.function.ThrowingConsumer;
 
 import org.apache.hadoop.hive.common.auth.HiveAuthUtils;
 import org.apache.hive.service.rpc.thrift.TCLIService;
+import org.apache.hive.service.rpc.thrift.TCancelOperationReq;
+import org.apache.hive.service.rpc.thrift.TCancelOperationResp;
+import org.apache.hive.service.rpc.thrift.TCloseOperationReq;
+import org.apache.hive.service.rpc.thrift.TCloseOperationResp;
 import org.apache.hive.service.rpc.thrift.TCloseSessionReq;
 import org.apache.hive.service.rpc.thrift.TCloseSessionResp;
 import org.apache.hive.service.rpc.thrift.TOpenSessionReq;
 import org.apache.hive.service.rpc.thrift.TOpenSessionResp;
+import org.apache.hive.service.rpc.thrift.TOperationHandle;
 import org.apache.hive.service.rpc.thrift.TStatusCode;
 import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.thrift.transport.TTransport;
@@ -46,12 +60,15 @@ import java.sql.Connection;
 import java.util.AbstractMap;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.CountDownLatch;
 
 import static org.apache.flink.api.common.RuntimeExecutionMode.BATCH;
 import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE;
 import static org.apache.flink.table.api.config.TableConfigOptions.MAX_LENGTH_GENERATED_CODE;
 import static org.apache.flink.table.api.config.TableConfigOptions.TABLE_DML_SYNC;
+import static org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions.toTOperationHandle;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** ITCase for {@link HiveServer2Endpoint}. */
 public class HiveServer2EndpointITCase extends TestLogger {
@@ -123,6 +140,82 @@ public class HiveServer2EndpointITCase extends TestLogger {
                                                 "Session '%s' does not exist", sessionHandle)));
     }
 
+    @Test
+    public void testCancelOperation() throws Exception {
+        runOperationRequest(
+                tOperationHandle -> {
+                    TCancelOperationResp tCancelOperationResp =
+                            ENDPOINT_EXTENSION
+                                    .getEndpoint()
+                                    .CancelOperation(new TCancelOperationReq(tOperationHandle));
+                    assertThat(tCancelOperationResp.getStatus().getStatusCode())
+                            .isEqualTo(TStatusCode.SUCCESS_STATUS);
+                },
+                ((sessionHandle, operationHandle) ->
+                        assertThat(
+                                        SQL_GATEWAY_SERVICE_EXTENSION
+                                                .getService()
+                                                .getOperationInfo(sessionHandle, operationHandle)
+                                                .getStatus())
+                                .isEqualTo(OperationStatus.CANCELED)));
+    }
+
+    @Test
+    public void testCloseOperation() throws Exception {
+        runOperationRequest(
+                tOperationHandle -> {
+                    TCloseOperationResp resp =
+                            ENDPOINT_EXTENSION
+                                    .getEndpoint()
+                                    .CloseOperation(new TCloseOperationReq(tOperationHandle));
+                    assertThat(resp.getStatus().getStatusCode())
+                            .isEqualTo(TStatusCode.SUCCESS_STATUS);
+                },
+                ((sessionHandle, operationHandle) ->
+                        assertThatThrownBy(
+                                        () ->
+                                                SQL_GATEWAY_SERVICE_EXTENSION
+                                                        .getService()
+                                                        .getOperationInfo(
+                                                                sessionHandle, operationHandle))
+                                .satisfies(
+                                        FlinkAssertions.anyCauseMatches(
+                                                SqlGatewayException.class,
+                                                String.format(
+                                                        "Can not find the submitted operation in the OperationManager with the %s",
+                                                        operationHandle)))));
+    }
+
+    private void runOperationRequest(
+            ThrowingConsumer<TOperationHandle, Exception> manipulateOp,
+            BiConsumerWithException<SessionHandle, OperationHandle, Exception> operationValidator)
+            throws Exception {
+        SessionHandle sessionHandle =
+                SQL_GATEWAY_SERVICE_EXTENSION
+                        .getService()
+                        .openSession(
+                                SessionEnvironment.newBuilder()
+                                        .setSessionEndpointVersion(
+                                                HiveServer2EndpointVersion
+                                                        .HIVE_CLI_SERVICE_PROTOCOL_V10)
+                                        .build());
+        CountDownLatch latch = new CountDownLatch(1);
+        OperationHandle operationHandle =
+                SQL_GATEWAY_SERVICE_EXTENSION
+                        .getService()
+                        .submitOperation(
+                                sessionHandle,
+                                OperationType.UNKNOWN,
+                                () -> {
+                                    latch.await();
+                                    return ResultSet.NOT_READY_RESULTS;
+                                });
+        manipulateOp.accept(
+                toTOperationHandle(sessionHandle, operationHandle, OperationType.UNKNOWN));
+        operationValidator.accept(sessionHandle, operationHandle);
+        SQL_GATEWAY_SERVICE_EXTENSION.getService().closeSession(sessionHandle);
+    }
+
     private TCLIService.Client createClient() throws Exception {
         TTransport transport =
                 HiveAuthUtils.getSocketTransport(
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointStatementITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointStatementITCase.java
index c15050eea52..34abff6fc07 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointStatementITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointStatementITCase.java
@@ -66,16 +66,19 @@ public class HiveServer2EndpointStatementITCase extends AbstractSqlGatewayStatem
             new HiveServer2EndpointExtension(SQL_GATEWAY_SERVICE_EXTENSION::getService);
 
     private Connection connection;
+    private Statement statement;
 
     @BeforeEach
     @Override
     public void before(@TempDir Path temporaryFolder) throws Exception {
         super.before(temporaryFolder);
         connection = ENDPOINT_EXTENSION.getConnection();
+        statement = connection.createStatement();
     }
 
     @AfterEach
     public void after() throws Exception {
+        statement.close();
         connection.close();
     }
 
@@ -91,7 +94,6 @@ public class HiveServer2EndpointStatementITCase extends AbstractSqlGatewayStatem
 
     @Override
     protected String runSingleStatement(String sql) throws Exception {
-        Statement statement = connection.createStatement();
         statement.execute(sql);
 
         ResultSet resultSet = statement.getResultSet();
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/util/HiveServer2EndpointExtension.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/util/HiveServer2EndpointExtension.java
index 5d26965cc92..4cf0143f62b 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/util/HiveServer2EndpointExtension.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/util/HiveServer2EndpointExtension.java
@@ -101,6 +101,10 @@ public class HiveServer2EndpointExtension implements BeforeAllCallback, AfterAll
         return checkNotNull(port).getPort();
     }
 
+    public HiveServer2Endpoint getEndpoint() {
+        return endpoint;
+    }
+
     public Connection getConnection() throws Exception {
         // In hive3, if "hive.metastore.schema.verification" is true, the
         // "datanucleus.schema.autoCreateTables" is false during the creation of the HiveConf.