You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sh...@apache.org on 2022/12/08 02:26:25 UTC
[flink] branch master updated: [FLINK-29732][sql-gateway] Support to configureSession in the SqlGatewayService
This is an automated email from the ASF dual-hosted git repository.
shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new e6215d4f7ed [FLINK-29732][sql-gateway] Support to configureSession in the SqlGatewayService
e6215d4f7ed is described below
commit e6215d4f7ede96f1225921f2b0f322c87a5040cd
Author: yuzelin <74...@qq.com>
AuthorDate: Mon Oct 24 00:16:38 2022 +0800
[FLINK-29732][sql-gateway] Support to configureSession in the SqlGatewayService
This closes #21133
---
.../flink/table/gateway/api/SqlGatewayService.java | 14 ++
.../gateway/api/utils/MockedSqlGatewayService.java | 6 +
flink-table/flink-sql-gateway/pom.xml | 7 +
.../gateway/service/SqlGatewayServiceImpl.java | 28 ++++
.../service/operation/OperationExecutor.java | 79 ++++++++--
.../service/operation/OperationManager.java | 26 ++--
.../gateway/service/SqlGatewayServiceITCase.java | 115 +++++++++++++++
.../service/operation/OperationManagerTest.java | 159 +++++++++++++++++++++
8 files changed, 412 insertions(+), 22 deletions(-)
diff --git a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/SqlGatewayService.java b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/SqlGatewayService.java
index 9bdc9c751b7..4619a05c194 100644
--- a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/SqlGatewayService.java
+++ b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/SqlGatewayService.java
@@ -66,6 +66,20 @@ public interface SqlGatewayService {
*/
void closeSession(SessionHandle sessionHandle) throws SqlGatewayException;
+ /**
+ * Using the statement to initialize the Session. It's only allowed to execute
+ * SET/RESET/CREATE/DROP/USE/ALTER/LOAD MODULE/UNLOAD MODULE/ADD JAR.
+ *
+ * <p>It returns until the execution finishes.
+ *
+ * @param sessionHandle handle to identify the session.
+ * @param statement the statement used to configure the session.
+ * @param executionTimeoutMs the execution timeout. Please use non-positive value to forbid the
+ * timeout mechanism.
+ */
+ void configureSession(SessionHandle sessionHandle, String statement, long executionTimeoutMs)
+ throws SqlGatewayException;
+
/**
* Get the current configuration of the {@code Session}.
*
diff --git a/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/utils/MockedSqlGatewayService.java b/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/utils/MockedSqlGatewayService.java
index a9c6bd77cc7..97cc0c6cf0d 100644
--- a/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/utils/MockedSqlGatewayService.java
+++ b/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/utils/MockedSqlGatewayService.java
@@ -54,6 +54,12 @@ public class MockedSqlGatewayService implements SqlGatewayService {
throw new UnsupportedOperationException();
}
+ public void configureSession(
+ SessionHandle sessionHandle, String statement, long executionTimeoutMs)
+ throws SqlGatewayException {
+ throw new UnsupportedOperationException();
+ }
+
@Override
public Map<String, String> getSessionConfig(SessionHandle sessionHandle)
throws SqlGatewayException {
diff --git a/flink-table/flink-sql-gateway/pom.xml b/flink-table/flink-sql-gateway/pom.xml
index 94d48d0f4db..91475a184b6 100644
--- a/flink-table/flink-sql-gateway/pom.xml
+++ b/flink-table/flink-sql-gateway/pom.xml
@@ -103,6 +103,13 @@
<artifactId>flink-shaded-jackson-module-jsonSchema</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-api-java</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java
index 932a0a0c852..4e2573964b7 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java
@@ -38,6 +38,7 @@ import org.apache.flink.table.gateway.api.results.TableInfo;
import org.apache.flink.table.gateway.api.session.SessionEnvironment;
import org.apache.flink.table.gateway.api.session.SessionHandle;
import org.apache.flink.table.gateway.api.utils.SqlGatewayException;
+import org.apache.flink.table.gateway.service.operation.OperationManager;
import org.apache.flink.table.gateway.service.session.Session;
import org.apache.flink.table.gateway.service.session.SessionManager;
@@ -79,6 +80,31 @@ public class SqlGatewayServiceImpl implements SqlGatewayService {
}
}
+ @Override
+ public void configureSession(
+ SessionHandle sessionHandle, String statement, long executionTimeoutMs)
+ throws SqlGatewayException {
+ try {
+ if (executionTimeoutMs > 0) {
+ // TODO: support the feature in FLINK-27838
+ throw new UnsupportedOperationException(
+ "SqlGatewayService doesn't support timeout mechanism now.");
+ }
+
+ OperationManager operationManager = getSession(sessionHandle).getOperationManager();
+ OperationHandle operationHandle =
+ operationManager.submitOperation(
+ handle ->
+ getSession(sessionHandle)
+ .createExecutor()
+ .configureSession(handle, statement));
+ operationManager.awaitOperationTermination(operationHandle);
+ } catch (Throwable t) {
+ LOG.error("Failed to configure session.", t);
+ throw new SqlGatewayException("Failed to configure session.", t);
+ }
+ }
+
@Override
public Map<String, String> getSessionConfig(SessionHandle sessionHandle)
throws SqlGatewayException {
@@ -311,6 +337,8 @@ public class SqlGatewayServiceImpl implements SqlGatewayService {
return GatewayInfo.INSTANCE;
}
+ // --------------------------------------------------------------------------------------------
+
@VisibleForTesting
public Session getSession(SessionHandle sessionHandle) {
return sessionManager.getSession(sessionHandle);
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java
index 569e838c3a2..d39e6975ce1 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java
@@ -44,22 +44,28 @@ import org.apache.flink.table.gateway.service.result.ResultFetcher;
import org.apache.flink.table.gateway.service.utils.SqlExecutionException;
import org.apache.flink.table.operations.BeginStatementSetOperation;
import org.apache.flink.table.operations.EndStatementSetOperation;
+import org.apache.flink.table.operations.LoadModuleOperation;
import org.apache.flink.table.operations.ModifyOperation;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.QueryOperation;
import org.apache.flink.table.operations.StatementSetOperation;
+import org.apache.flink.table.operations.UnloadModuleOperation;
+import org.apache.flink.table.operations.UseOperation;
+import org.apache.flink.table.operations.command.AddJarOperation;
import org.apache.flink.table.operations.command.ResetOperation;
import org.apache.flink.table.operations.command.SetOperation;
+import org.apache.flink.table.operations.ddl.AlterOperation;
+import org.apache.flink.table.operations.ddl.CreateOperation;
+import org.apache.flink.table.operations.ddl.DropOperation;
+import org.apache.flink.util.CollectionUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -84,6 +90,48 @@ public class OperationExecutor {
this.executionConfig = executionConfig;
}
+ public ResultFetcher configureSession(OperationHandle handle, String statement) {
+ TableEnvironmentInternal tableEnv = getTableEnvironment();
+ List<Operation> parsedOperations = tableEnv.getParser().parse(statement);
+ if (parsedOperations.size() > 1) {
+ throw new UnsupportedOperationException(
+ "Unsupported SQL statement! Configure session only accepts a single SQL statement.");
+ }
+ Operation op = parsedOperations.get(0);
+
+ if (!(op instanceof SetOperation)
+ && !(op instanceof ResetOperation)
+ && !(op instanceof CreateOperation)
+ && !(op instanceof DropOperation)
+ && !(op instanceof UseOperation)
+ && !(op instanceof AlterOperation)
+ && !(op instanceof LoadModuleOperation)
+ && !(op instanceof UnloadModuleOperation)
+ && !(op instanceof AddJarOperation)) {
+ throw new UnsupportedOperationException(
+ String.format(
+ "Unsupported statement for configuring session:%s\n"
+ + "The configureSession API only supports to execute statement of type "
+ + "CREATE TABLE, DROP TABLE, ALTER TABLE, "
+ + "CREATE DATABASE, DROP DATABASE, ALTER DATABASE, "
+ + "CREATE FUNCTION, DROP FUNCTION, ALTER FUNCTION, "
+ + "CREATE CATALOG, DROP CATALOG, "
+ + "USE CATALOG, USE [CATALOG.]DATABASE, "
+ + "CREATE VIEW, DROP VIEW, "
+ + "LOAD MODULE, UNLOAD MODULE, USE MODULE, "
+ + "ADD JAR.",
+ statement));
+ }
+
+ if (op instanceof SetOperation) {
+ return callSetOperation(tableEnv, handle, (SetOperation) op);
+ } else if (op instanceof ResetOperation) {
+ return callResetOperation(handle, (ResetOperation) op);
+ } else {
+ return callOperation(tableEnv, handle, op);
+ }
+ }
+
public ResultFetcher executeStatement(OperationHandle handle, String statement) {
// Instantiate the TableEnvironment lazily
TableEnvironmentInternal tableEnv = getTableEnvironment();
@@ -114,9 +162,7 @@ public class OperationExecutor {
TableResultInternal result = tableEnv.executeInternal(op);
return new ResultFetcher(handle, result.getResolvedSchema(), result.collectInternal());
} else {
- TableResultInternal result = tableEnv.executeInternal(op);
- return new ResultFetcher(
- handle, result.getResolvedSchema(), collect(result.collectInternal()));
+ return callOperation(tableEnv, handle, op);
}
}
@@ -228,7 +274,8 @@ public class OperationExecutor {
return new ResultFetcher(
handle,
TableResultInternal.TABLE_RESULT_OK.getResolvedSchema(),
- collect(TableResultInternal.TABLE_RESULT_OK.collectInternal()));
+ CollectionUtil.iteratorToList(
+ TableResultInternal.TABLE_RESULT_OK.collectInternal()));
} else if (!setOp.getKey().isPresent() && !setOp.getValue().isPresent()) {
// show all properties
Map<String, String> configMap = tableEnv.getConfig().getConfiguration().toMap();
@@ -237,7 +284,7 @@ public class OperationExecutor {
ResolvedSchema.of(
Column.physical(SET_KEY, DataTypes.STRING()),
Column.physical(SET_VALUE, DataTypes.STRING())),
- collect(
+ CollectionUtil.iteratorToList(
configMap.entrySet().stream()
.map(
entry ->
@@ -264,7 +311,8 @@ public class OperationExecutor {
return new ResultFetcher(
handle,
TableResultInternal.TABLE_RESULT_OK.getResolvedSchema(),
- collect(TableResultInternal.TABLE_RESULT_OK.collectInternal()));
+ CollectionUtil.iteratorToList(
+ TableResultInternal.TABLE_RESULT_OK.collectInternal()));
}
private ResultFetcher callModifyOperations(
@@ -289,6 +337,15 @@ public class OperationExecutor {
.toString()))));
}
+ private ResultFetcher callOperation(
+ TableEnvironmentInternal tableEnv, OperationHandle handle, Operation op) {
+ TableResultInternal result = tableEnv.executeInternal(op);
+ return new ResultFetcher(
+ handle,
+ result.getResolvedSchema(),
+ CollectionUtil.iteratorToList(result.collectInternal()));
+ }
+
private Set<TableInfo> listTables(
String catalogName, String databaseName, boolean includeViews) {
CatalogManager catalogManager = sessionContext.getSessionState().catalogManager;
@@ -333,10 +390,4 @@ public class OperationExecutor {
TableKind.VIEW))
.collect(Collectors.toSet()));
}
-
- private List<RowData> collect(Iterator<RowData> tableResult) {
- List<RowData> rows = new ArrayList<>();
- tableResult.forEachRemaining(rows::add);
- return rows;
- }
}
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java
index 6ca0a28f3cb..29ef1569736 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java
@@ -137,6 +137,10 @@ public class OperationManager {
});
}
+ public void awaitOperationTermination(OperationHandle operationHandle) throws Exception {
+ getOperation(operationHandle).awaitTermination();
+ }
+
/**
* Get the {@link OperationInfo} of the operation.
*
@@ -307,15 +311,9 @@ public class OperationManager {
}
public ResolvedSchema getResultSchema() throws Exception {
- synchronized (status) {
- while (!status.get().isTerminalStatus()) {
- status.wait();
- }
- }
+ awaitTermination();
OperationStatus current = status.get();
- if (current == OperationStatus.ERROR) {
- throw operationError;
- } else if (current != OperationStatus.FINISHED) {
+ if (current != OperationStatus.FINISHED) {
throw new IllegalStateException(
String.format(
"The result schema is available when the Operation is in FINISHED state but the current status is %s.",
@@ -328,6 +326,18 @@ public class OperationManager {
return new OperationInfo(status.get(), operationError);
}
+ public void awaitTermination() throws Exception {
+ synchronized (status) {
+ while (!status.get().isTerminalStatus()) {
+ status.wait();
+ }
+ }
+ OperationStatus current = status.get();
+ if (current == OperationStatus.ERROR) {
+ throw operationError;
+ }
+ }
+
private ResultSet fetchResultsInternal(Supplier<ResultSet> results) {
OperationStatus currentStatus = status.get();
diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
index c8c8513bdb6..996cb74778a 100644
--- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
+++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
@@ -20,9 +20,11 @@ package org.apache.flink.table.gateway.service;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.internal.TableEnvironmentInternal;
+import org.apache.flink.table.api.internal.TableResultInternal;
import org.apache.flink.table.catalog.CatalogBaseTable.TableKind;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.GenericInMemoryCatalog;
@@ -54,6 +56,8 @@ import org.apache.flink.table.planner.runtime.batch.sql.TestModule;
import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions;
import org.apache.flink.table.planner.utils.TableFunc0;
import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.util.CollectionUtil;
+import org.apache.flink.util.UserClassLoaderJarTestUtils;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.apache.flink.util.function.RunnableWithException;
import org.apache.flink.util.function.ThrowingConsumer;
@@ -62,7 +66,9 @@ import org.assertj.core.api.Condition;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
+import java.io.File;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
@@ -71,6 +77,8 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Random;
+import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.FutureTask;
@@ -84,6 +92,8 @@ import static org.apache.flink.table.functions.FunctionKind.AGGREGATE;
import static org.apache.flink.table.functions.FunctionKind.OTHER;
import static org.apache.flink.table.functions.FunctionKind.SCALAR;
import static org.apache.flink.table.gateway.api.results.ResultSet.ResultType.PAYLOAD;
+import static org.apache.flink.table.utils.UserDefinedFunctions.GENERATED_LOWER_UDF_CLASS;
+import static org.apache.flink.table.utils.UserDefinedFunctions.GENERATED_LOWER_UDF_CODE;
import static org.apache.flink.types.RowKind.DELETE;
import static org.apache.flink.types.RowKind.INSERT;
import static org.apache.flink.types.RowKind.UPDATE_AFTER;
@@ -157,6 +167,83 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
assertThat(tableEnv.listModules()).contains(moduleName);
}
+ @Test
+ public void testConfigureSessionWithLegalStatement(@TempDir java.nio.file.Path tmpDir)
+ throws Exception {
+ SessionHandle sessionHandle = service.openSession(defaultSessionEnvironment);
+
+ // SET & RESET
+ service.configureSession(sessionHandle, "SET 'key1' = 'value1';", 0);
+ Map<String, String> config = new HashMap<>();
+ config.put("key1", "value1");
+ assertThat(service.getSessionConfig(sessionHandle)).containsAllEntriesOf(config);
+
+ service.configureSession(sessionHandle, "RESET 'key1';", 0);
+ assertThat(service.getSessionConfig(sessionHandle)).doesNotContainEntry("key1", "value1");
+
+ // CREATE & USE & ALTER & DROP
+ service.configureSession(
+ sessionHandle,
+ "CREATE CATALOG mycat with ('type' = 'generic_in_memory', 'default-database' = 'db');",
+ 0);
+
+ service.configureSession(sessionHandle, "USE CATALOG mycat;", 0);
+ assertThat(service.getCurrentCatalog(sessionHandle)).isEqualTo("mycat");
+
+ service.configureSession(
+ sessionHandle,
+ "CREATE TABLE db.tbl (score INT) WITH ('connector' = 'datagen');",
+ 0);
+
+ Set<TableKind> tableKinds = new HashSet<>();
+ tableKinds.add(TableKind.TABLE);
+ assertThat(service.listTables(sessionHandle, "mycat", "db", tableKinds))
+ .contains(
+ new TableInfo(ObjectIdentifier.of("mycat", "db", "tbl"), TableKind.TABLE));
+
+ service.configureSession(sessionHandle, "ALTER TABLE db.tbl RENAME TO tbl1;", 0);
+ assertThat(service.listTables(sessionHandle, "mycat", "db", tableKinds))
+ .doesNotContain(
+ new TableInfo(ObjectIdentifier.of("mycat", "db", "tbl"), TableKind.TABLE))
+ .contains(
+ new TableInfo(ObjectIdentifier.of("mycat", "db", "tbl1"), TableKind.TABLE));
+
+ service.configureSession(sessionHandle, "USE CATALOG default_catalog;", 0);
+ service.configureSession(sessionHandle, "DROP CATALOG mycat;", 0);
+ assertThat(service.listCatalogs(sessionHandle)).doesNotContain("mycat");
+
+ // LOAD & UNLOAD MODULE
+ validateStatementResult(
+ sessionHandle,
+ "SHOW FULL MODULES",
+ Collections.singletonList(GenericRowData.of(StringData.fromString("core"), true)));
+
+ service.configureSession(sessionHandle, "UNLOAD MODULE core;", 0);
+ validateStatementResult(sessionHandle, "SHOW FULL MODULES", Collections.emptyList());
+
+ service.configureSession(sessionHandle, "LOAD MODULE core;", 0);
+ validateStatementResult(
+ sessionHandle,
+ "SHOW FULL MODULES",
+ Collections.singletonList(GenericRowData.of(StringData.fromString("core"), true)));
+
+ // ADD JAR
+ String udfClassName = GENERATED_LOWER_UDF_CLASS + new Random().nextInt(50);
+ String jarPath =
+ UserClassLoaderJarTestUtils.createJarFile(
+ new File(tmpDir.toUri()),
+ "test-add-jar.jar",
+ udfClassName,
+ String.format(GENERATED_LOWER_UDF_CODE, udfClassName))
+ .toURI()
+ .getPath();
+ service.configureSession(sessionHandle, String.format("ADD JAR '%s';", jarPath), 0);
+ validateStatementResult(
+ sessionHandle,
+ "SHOW JARS",
+ Collections.singletonList(GenericRowData.of(StringData.fromString(jarPath))));
+ }
+
@Test
public void testFetchResultsInRunning() throws Exception {
SessionHandle sessionHandle = service.openSession(defaultSessionEnvironment);
@@ -711,6 +798,23 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
// Negative tests
// --------------------------------------------------------------------------------------------
+ @Test
+ void testConfigureSessionWithIllegalStatement() {
+ SessionHandle sessionHandle = service.openSession(defaultSessionEnvironment);
+
+ assertThatThrownBy(() -> service.configureSession(sessionHandle, "SELECT 1;", 0))
+ .satisfies(
+ FlinkAssertions.anyCauseMatches(
+ UnsupportedOperationException.class,
+ "Unsupported statement for configuring session:SELECT 1;\n"
+ + "The configureSession API only supports to execute statement of type "
+ + "CREATE TABLE, DROP TABLE, ALTER TABLE, "
+ + "CREATE DATABASE, DROP DATABASE, ALTER DATABASE, "
+ + "CREATE FUNCTION, DROP FUNCTION, ALTER FUNCTION, "
+ + "CREATE CATALOG, DROP CATALOG, USE CATALOG, USE [CATALOG.]DATABASE, "
+ + "CREATE VIEW, DROP VIEW, LOAD MODULE, UNLOAD MODULE, USE MODULE, ADD JAR."));
+ }
+
@Test
public void testFetchResultsFromCanceledOperation() {
SessionHandle sessionHandle = service.openSession(defaultSessionEnvironment);
@@ -899,4 +1003,15 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
return sessionHandle;
}
+
+ private void validateStatementResult(
+ SessionHandle sessionHandle, String statement, List<RowData> expected) {
+ TableEnvironmentInternal tableEnv =
+ service.getSession(sessionHandle).createExecutor().getTableEnvironment();
+ assertThat(
+ CollectionUtil.iteratorToList(
+ ((TableResultInternal) tableEnv.executeSql(statement))
+ .collectInternal()))
+ .isEqualTo(expected);
+ }
}
diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/operation/OperationManagerTest.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/operation/OperationManagerTest.java
new file mode 100644
index 00000000000..b0a2b5a6218
--- /dev/null
+++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/operation/OperationManagerTest.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.service.operation;
+
+import org.apache.flink.core.testutils.FlinkAssertions;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.gateway.api.operation.OperationHandle;
+import org.apache.flink.table.gateway.api.operation.OperationStatus;
+import org.apache.flink.table.gateway.api.results.ResultSet;
+import org.apache.flink.table.gateway.api.utils.SqlGatewayException;
+import org.apache.flink.table.gateway.api.utils.ThreadUtils;
+import org.apache.flink.table.gateway.service.utils.IgnoreExceptionHandler;
+import org.apache.flink.table.gateway.service.utils.SqlExecutionException;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadFactory;
+
+import static org.apache.flink.table.gateway.api.results.ResultSet.ResultType.PAYLOAD;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test for {@link OperationManager}. */
+public class OperationManagerTest {
+
+ private static final ExecutorService EXECUTOR_SERVICE =
+ ThreadUtils.newThreadPool(5, 500, 60_0000, "operation-manager-test");
+
+ private static OperationManager operationManager;
+ private static ResultSet defaultResultSet;
+
+ private final ThreadFactory threadFactory =
+ new ExecutorThreadFactory(
+ "SqlGatewayService Test Pool", IgnoreExceptionHandler.INSTANCE);
+
+ @BeforeAll
+ public static void setUp() {
+ operationManager = new OperationManager(EXECUTOR_SERVICE);
+ defaultResultSet =
+ new ResultSet(
+ PAYLOAD,
+ 1L,
+ ResolvedSchema.of(Column.physical("id", DataTypes.BIGINT())),
+ Collections.singletonList(GenericRowData.of(1L)));
+ }
+
+ @AfterAll
+ public static void cleanUp() {
+ EXECUTOR_SERVICE.shutdown();
+ operationManager.close();
+ }
+
+ @Test
+ public void testRunOperationAsynchronously() throws Exception {
+ OperationHandle operationHandle = operationManager.submitOperation(() -> defaultResultSet);
+
+ assertThat(operationManager.getOperationInfo(operationHandle).getStatus())
+ .isNotEqualTo(OperationStatus.ERROR);
+
+ assertThat(operationManager.getOperationResultSchema(operationHandle))
+ .isEqualTo(ResolvedSchema.of(Column.physical("id", DataTypes.BIGINT())));
+
+ assertThat(operationManager.getOperationInfo(operationHandle).getStatus())
+ .isEqualTo(OperationStatus.FINISHED);
+ }
+
+ @Test
+ public void testRunOperationSynchronously() throws Exception {
+ OperationHandle operationHandle = operationManager.submitOperation(() -> defaultResultSet);
+ operationManager.awaitOperationTermination(operationHandle);
+
+ assertThat(operationManager.getOperationInfo(operationHandle).getStatus())
+ .isEqualTo(OperationStatus.FINISHED);
+
+ assertThat(operationManager.fetchResults(operationHandle, 0, Integer.MAX_VALUE))
+ .isEqualTo(defaultResultSet);
+ }
+
+ @Test
+ public void testCancelOperation() throws Exception {
+ CountDownLatch endRunningLatch = new CountDownLatch(1);
+ OperationHandle operationHandle =
+ operationManager.submitOperation(
+ () -> {
+ endRunningLatch.await();
+ return defaultResultSet;
+ });
+
+ threadFactory.newThread(() -> operationManager.cancelOperation(operationHandle)).start();
+ operationManager.awaitOperationTermination(operationHandle);
+
+ assertThat(operationManager.getOperationInfo(operationHandle).getStatus())
+ .isEqualTo(OperationStatus.CANCELED);
+ }
+
+ @Test
+ public void testCloseOperation() throws Exception {
+ CountDownLatch endRunningLatch = new CountDownLatch(1);
+ OperationHandle operationHandle =
+ operationManager.submitOperation(
+ () -> {
+ endRunningLatch.await();
+ return defaultResultSet;
+ });
+
+ threadFactory.newThread(() -> operationManager.closeOperation(operationHandle)).start();
+ operationManager.awaitOperationTermination(operationHandle);
+
+ assertThatThrownBy(() -> operationManager.getOperation(operationHandle))
+ .satisfies(
+ FlinkAssertions.anyCauseMatches(
+ SqlGatewayException.class,
+ String.format(
+ "Can not find the submitted operation in the OperationManager with the %s.",
+ operationHandle)));
+ }
+
+ @Test
+ public void testRunOperationSynchronouslyWithError() {
+ OperationHandle operationHandle =
+ operationManager.submitOperation(
+ () -> {
+ throw new SqlExecutionException("Execution error.");
+ });
+
+ assertThatThrownBy(() -> operationManager.awaitOperationTermination(operationHandle))
+ .satisfies(
+ FlinkAssertions.anyCauseMatches(
+ SqlExecutionException.class, "Execution error."));
+
+ assertThat(operationManager.getOperationInfo(operationHandle).getStatus())
+ .isEqualTo(OperationStatus.ERROR);
+ }
+}