You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sh...@apache.org on 2022/08/03 03:21:11 UTC
[flink] branch master updated: [FLINK-28151][hive] Allow to cancel the Operation for the HiveServer2 Endpoint.
This is an automated email from the ASF dual-hosted git repository.
shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 69df7a46bf4 [FLINK-28151][hive] Allow to cancel the Operation for the HiveServer2 Endpoint.
69df7a46bf4 is described below
commit 69df7a46bf439fc94ad1074b0d9ea9b3503632b1
Author: jayce <32...@qq.com>
AuthorDate: Mon Aug 1 15:33:32 2022 +0800
[FLINK-28151][hive] Allow to cancel the Operation for the HiveServer2 Endpoint.
This closes #20402
---
.../table/endpoint/hive/HiveServer2Endpoint.java | 24 +++++-
.../endpoint/hive/HiveServer2EndpointITCase.java | 93 ++++++++++++++++++++++
.../hive/HiveServer2EndpointStatementITCase.java | 4 +-
.../hive/util/HiveServer2EndpointExtension.java | 4 +
4 files changed, 122 insertions(+), 3 deletions(-)
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java
index 7174a4502f2..07597450a4f 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java
@@ -458,13 +458,33 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
@Override
public TCancelOperationResp CancelOperation(TCancelOperationReq tCancelOperationReq)
throws TException {
- throw new UnsupportedOperationException(ERROR_MESSAGE);
+ TCancelOperationResp resp = new TCancelOperationResp();
+ try {
+ TOperationHandle operationHandle = tCancelOperationReq.getOperationHandle();
+ service.cancelOperation(
+ toSessionHandle(operationHandle), toOperationHandle(operationHandle));
+ resp.setStatus(OK_STATUS);
+ } catch (Throwable t) {
+ LOG.error("Failed to CancelOperation.", t);
+ resp.setStatus(toTStatus(t));
+ }
+ return resp;
}
@Override
public TCloseOperationResp CloseOperation(TCloseOperationReq tCloseOperationReq)
throws TException {
- throw new UnsupportedOperationException(ERROR_MESSAGE);
+ TCloseOperationResp resp = new TCloseOperationResp();
+ try {
+ TOperationHandle operationHandle = tCloseOperationReq.getOperationHandle();
+ service.closeOperation(
+ toSessionHandle(operationHandle), toOperationHandle(operationHandle));
+ resp.setStatus(OK_STATUS);
+ } catch (Throwable t) {
+ LOG.error("Failed to CloseOperation.", t);
+ resp.setStatus(toTStatus(t));
+ }
+ return resp;
}
@Override
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java
index ba3ac421d1f..269e1a21f3b 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java
@@ -18,22 +18,36 @@
package org.apache.flink.table.endpoint.hive;
+import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.config.TableConfigOptions;
import org.apache.flink.table.endpoint.hive.util.HiveServer2EndpointExtension;
import org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions;
+import org.apache.flink.table.gateway.api.operation.OperationHandle;
+import org.apache.flink.table.gateway.api.operation.OperationStatus;
+import org.apache.flink.table.gateway.api.operation.OperationType;
+import org.apache.flink.table.gateway.api.results.ResultSet;
+import org.apache.flink.table.gateway.api.session.SessionEnvironment;
import org.apache.flink.table.gateway.api.session.SessionHandle;
+import org.apache.flink.table.gateway.api.utils.SqlGatewayException;
import org.apache.flink.table.gateway.service.session.SessionManager;
import org.apache.flink.table.gateway.service.utils.SqlGatewayServiceExtension;
import org.apache.flink.test.junit5.MiniClusterExtension;
import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.BiConsumerWithException;
+import org.apache.flink.util.function.ThrowingConsumer;
import org.apache.hadoop.hive.common.auth.HiveAuthUtils;
import org.apache.hive.service.rpc.thrift.TCLIService;
+import org.apache.hive.service.rpc.thrift.TCancelOperationReq;
+import org.apache.hive.service.rpc.thrift.TCancelOperationResp;
+import org.apache.hive.service.rpc.thrift.TCloseOperationReq;
+import org.apache.hive.service.rpc.thrift.TCloseOperationResp;
import org.apache.hive.service.rpc.thrift.TCloseSessionReq;
import org.apache.hive.service.rpc.thrift.TCloseSessionResp;
import org.apache.hive.service.rpc.thrift.TOpenSessionReq;
import org.apache.hive.service.rpc.thrift.TOpenSessionResp;
+import org.apache.hive.service.rpc.thrift.TOperationHandle;
import org.apache.hive.service.rpc.thrift.TStatusCode;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.transport.TTransport;
@@ -46,12 +60,15 @@ import java.sql.Connection;
import java.util.AbstractMap;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.CountDownLatch;
import static org.apache.flink.api.common.RuntimeExecutionMode.BATCH;
import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE;
import static org.apache.flink.table.api.config.TableConfigOptions.MAX_LENGTH_GENERATED_CODE;
import static org.apache.flink.table.api.config.TableConfigOptions.TABLE_DML_SYNC;
+import static org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions.toTOperationHandle;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** ITCase for {@link HiveServer2Endpoint}. */
public class HiveServer2EndpointITCase extends TestLogger {
@@ -123,6 +140,82 @@ public class HiveServer2EndpointITCase extends TestLogger {
"Session '%s' does not exist", sessionHandle)));
}
+ @Test
+ public void testCancelOperation() throws Exception {
+ runOperationRequest(
+ tOperationHandle -> {
+ TCancelOperationResp tCancelOperationResp =
+ ENDPOINT_EXTENSION
+ .getEndpoint()
+ .CancelOperation(new TCancelOperationReq(tOperationHandle));
+ assertThat(tCancelOperationResp.getStatus().getStatusCode())
+ .isEqualTo(TStatusCode.SUCCESS_STATUS);
+ },
+ ((sessionHandle, operationHandle) ->
+ assertThat(
+ SQL_GATEWAY_SERVICE_EXTENSION
+ .getService()
+ .getOperationInfo(sessionHandle, operationHandle)
+ .getStatus())
+ .isEqualTo(OperationStatus.CANCELED)));
+ }
+
+ @Test
+ public void testCloseOperation() throws Exception {
+ runOperationRequest(
+ tOperationHandle -> {
+ TCloseOperationResp resp =
+ ENDPOINT_EXTENSION
+ .getEndpoint()
+ .CloseOperation(new TCloseOperationReq(tOperationHandle));
+ assertThat(resp.getStatus().getStatusCode())
+ .isEqualTo(TStatusCode.SUCCESS_STATUS);
+ },
+ ((sessionHandle, operationHandle) ->
+ assertThatThrownBy(
+ () ->
+ SQL_GATEWAY_SERVICE_EXTENSION
+ .getService()
+ .getOperationInfo(
+ sessionHandle, operationHandle))
+ .satisfies(
+ FlinkAssertions.anyCauseMatches(
+ SqlGatewayException.class,
+ String.format(
+ "Can not find the submitted operation in the OperationManager with the %s",
+ operationHandle)))));
+ }
+
+ private void runOperationRequest(
+ ThrowingConsumer<TOperationHandle, Exception> manipulateOp,
+ BiConsumerWithException<SessionHandle, OperationHandle, Exception> operationValidator)
+ throws Exception {
+ SessionHandle sessionHandle =
+ SQL_GATEWAY_SERVICE_EXTENSION
+ .getService()
+ .openSession(
+ SessionEnvironment.newBuilder()
+ .setSessionEndpointVersion(
+ HiveServer2EndpointVersion
+ .HIVE_CLI_SERVICE_PROTOCOL_V10)
+ .build());
+ CountDownLatch latch = new CountDownLatch(1);
+ OperationHandle operationHandle =
+ SQL_GATEWAY_SERVICE_EXTENSION
+ .getService()
+ .submitOperation(
+ sessionHandle,
+ OperationType.UNKNOWN,
+ () -> {
+ latch.await();
+ return ResultSet.NOT_READY_RESULTS;
+ });
+ manipulateOp.accept(
+ toTOperationHandle(sessionHandle, operationHandle, OperationType.UNKNOWN));
+ operationValidator.accept(sessionHandle, operationHandle);
+ SQL_GATEWAY_SERVICE_EXTENSION.getService().closeSession(sessionHandle);
+ }
+
private TCLIService.Client createClient() throws Exception {
TTransport transport =
HiveAuthUtils.getSocketTransport(
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointStatementITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointStatementITCase.java
index c15050eea52..34abff6fc07 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointStatementITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointStatementITCase.java
@@ -66,16 +66,19 @@ public class HiveServer2EndpointStatementITCase extends AbstractSqlGatewayStatem
new HiveServer2EndpointExtension(SQL_GATEWAY_SERVICE_EXTENSION::getService);
private Connection connection;
+ private Statement statement;
@BeforeEach
@Override
public void before(@TempDir Path temporaryFolder) throws Exception {
super.before(temporaryFolder);
connection = ENDPOINT_EXTENSION.getConnection();
+ statement = connection.createStatement();
}
@AfterEach
public void after() throws Exception {
+ statement.close();
connection.close();
}
@@ -91,7 +94,6 @@ public class HiveServer2EndpointStatementITCase extends AbstractSqlGatewayStatem
@Override
protected String runSingleStatement(String sql) throws Exception {
- Statement statement = connection.createStatement();
statement.execute(sql);
ResultSet resultSet = statement.getResultSet();
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/util/HiveServer2EndpointExtension.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/util/HiveServer2EndpointExtension.java
index 5d26965cc92..4cf0143f62b 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/util/HiveServer2EndpointExtension.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/util/HiveServer2EndpointExtension.java
@@ -101,6 +101,10 @@ public class HiveServer2EndpointExtension implements BeforeAllCallback, AfterAll
return checkNotNull(port).getPort();
}
+ public HiveServer2Endpoint getEndpoint() {
+ return endpoint;
+ }
+
public Connection getConnection() throws Exception {
// In hive3, if "hive.metastore.schema.verification" is true, the
// "datanucleus.schema.autoCreateTables" is false during the creation of the HiveConf.