You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sh...@apache.org on 2022/08/08 02:26:45 UTC
[flink] 01/02: [FLINK-28851][sql-gateway][hive] Remove useless OperationType
This is an automated email from the ASF dual-hosted git repository.
shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 3aaa160bd80a5e89447118fa1bc9cfea6bd368d1
Author: Shengkai <10...@qq.com>
AuthorDate: Sun Aug 7 17:52:38 2022 +0800
[FLINK-28851][sql-gateway][hive] Remove useless OperationType
---
.../table/endpoint/hive/HiveServer2Endpoint.java | 13 +++----
.../hive/util/ThriftObjectConversions.java | 23 ++-----------
.../endpoint/hive/HiveServer2EndpointITCase.java | 5 ++-
.../hive/util/ThriftObjectConversionsTest.java | 4 +--
.../flink/table/gateway/api/SqlGatewayService.java | 5 +--
.../table/gateway/api/operation/OperationType.java | 40 ----------------------
.../table/gateway/api/results/OperationInfo.java | 22 +++---------
.../gateway/api/utils/MockedSqlGatewayService.java | 4 +--
.../gateway/service/SqlGatewayServiceImpl.java | 7 ++--
.../service/operation/OperationManager.java | 22 +++---------
.../gateway/service/SqlGatewayServiceITCase.java | 17 +++------
11 files changed, 29 insertions(+), 133 deletions(-)
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java
index 07c383e516e..ca99cad3191 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java
@@ -32,7 +32,6 @@ import org.apache.flink.table.gateway.api.endpoint.EndpointVersion;
import org.apache.flink.table.gateway.api.endpoint.SqlGatewayEndpoint;
import org.apache.flink.table.gateway.api.operation.OperationHandle;
import org.apache.flink.table.gateway.api.operation.OperationStatus;
-import org.apache.flink.table.gateway.api.operation.OperationType;
import org.apache.flink.table.gateway.api.results.OperationInfo;
import org.apache.flink.table.gateway.api.results.ResultSet;
import org.apache.flink.table.gateway.api.session.SessionEnvironment;
@@ -88,6 +87,7 @@ import org.apache.hive.service.rpc.thrift.TGetTypeInfoResp;
import org.apache.hive.service.rpc.thrift.TOpenSessionReq;
import org.apache.hive.service.rpc.thrift.TOpenSessionResp;
import org.apache.hive.service.rpc.thrift.TOperationHandle;
+import org.apache.hive.service.rpc.thrift.TOperationType;
import org.apache.hive.service.rpc.thrift.TProtocolVersion;
import org.apache.hive.service.rpc.thrift.TRenewDelegationTokenReq;
import org.apache.hive.service.rpc.thrift.TRenewDelegationTokenResp;
@@ -381,7 +381,7 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
resp.setOperationHandle(
toTOperationHandle(
- sessionHandle, operationHandle, OperationType.EXECUTE_STATEMENT));
+ sessionHandle, operationHandle, TOperationType.EXECUTE_STATEMENT));
} catch (Throwable t) {
LOG.error("Failed to ExecuteStatement.", t);
resp.setStatus(toTStatus(t));
@@ -401,13 +401,11 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
SessionHandle sessionHandle = toSessionHandle(tGetCatalogsReq.getSessionHandle());
OperationHandle operationHandle =
service.submitOperation(
- sessionHandle,
- OperationType.LIST_CATALOGS,
- createGetCatalogsExecutor(service, sessionHandle));
+ sessionHandle, createGetCatalogsExecutor(service, sessionHandle));
resp.setStatus(OK_STATUS);
resp.setOperationHandle(
toTOperationHandle(
- sessionHandle, operationHandle, OperationType.LIST_CATALOGS));
+ sessionHandle, operationHandle, TOperationType.GET_CATALOGS));
} catch (Throwable t) {
LOG.error("Failed to GetCatalogs.", t);
resp.setStatus(toTStatus(t));
@@ -423,7 +421,6 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
OperationHandle operationHandle =
service.submitOperation(
sessionHandle,
- OperationType.LIST_SCHEMAS,
createGetSchemasExecutor(
service,
sessionHandle,
@@ -432,7 +429,7 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
resp.setStatus(OK_STATUS);
resp.setOperationHandle(
- toTOperationHandle(sessionHandle, operationHandle, OperationType.LIST_SCHEMAS));
+ toTOperationHandle(sessionHandle, operationHandle, TOperationType.GET_SCHEMAS));
} catch (Throwable t) {
LOG.error("Failed to GetSchemas.", t);
resp.setStatus(toTStatus(t));
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversions.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversions.java
index 8577fbd16ff..d78d4cbcb73 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversions.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversions.java
@@ -30,7 +30,6 @@ import org.apache.flink.table.data.RowData;
import org.apache.flink.table.gateway.api.SqlGatewayService;
import org.apache.flink.table.gateway.api.operation.OperationHandle;
import org.apache.flink.table.gateway.api.operation.OperationStatus;
-import org.apache.flink.table.gateway.api.operation.OperationType;
import org.apache.flink.table.gateway.api.results.FetchOrientation;
import org.apache.flink.table.gateway.api.session.SessionHandle;
import org.apache.flink.table.types.DataType;
@@ -144,10 +143,10 @@ public class ThriftObjectConversions {
public static TOperationHandle toTOperationHandle(
SessionHandle sessionHandle,
OperationHandle operationHandle,
- OperationType operationType) {
+ TOperationType operationType) {
return new TOperationHandle(
toTHandleIdentifier(operationHandle.getIdentifier(), sessionHandle.getIdentifier()),
- toTOperationType(operationType),
+ operationType,
true);
}
@@ -165,24 +164,6 @@ public class ThriftObjectConversions {
// Operation related conversions
// --------------------------------------------------------------------------------------------
- public static TOperationType toTOperationType(OperationType type) {
- switch (type) {
- case EXECUTE_STATEMENT:
- return TOperationType.EXECUTE_STATEMENT;
- case LIST_CATALOGS:
- return TOperationType.GET_CATALOGS;
- case LIST_SCHEMAS:
- return TOperationType.GET_SCHEMAS;
- case LIST_TABLES:
- return TOperationType.GET_TABLES;
- case UNKNOWN:
- return TOperationType.UNKNOWN;
- default:
- throw new IllegalArgumentException(
- String.format("Unknown operation type: %s.", type));
- }
- }
-
public static TOperationState toTOperationState(OperationStatus operationStatus) {
switch (operationStatus) {
case INITIALIZED:
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java
index d3737a79009..c287a93e15e 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java
@@ -29,7 +29,6 @@ import org.apache.flink.table.endpoint.hive.util.HiveServer2EndpointExtension;
import org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions;
import org.apache.flink.table.gateway.api.operation.OperationHandle;
import org.apache.flink.table.gateway.api.operation.OperationStatus;
-import org.apache.flink.table.gateway.api.operation.OperationType;
import org.apache.flink.table.gateway.api.results.ResultSet;
import org.apache.flink.table.gateway.api.session.SessionEnvironment;
import org.apache.flink.table.gateway.api.session.SessionHandle;
@@ -54,6 +53,7 @@ import org.apache.hive.service.rpc.thrift.TCloseSessionResp;
import org.apache.hive.service.rpc.thrift.TOpenSessionReq;
import org.apache.hive.service.rpc.thrift.TOpenSessionResp;
import org.apache.hive.service.rpc.thrift.TOperationHandle;
+import org.apache.hive.service.rpc.thrift.TOperationType;
import org.apache.hive.service.rpc.thrift.TStatusCode;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.transport.TTransport;
@@ -471,13 +471,12 @@ public class HiveServer2EndpointITCase extends TestLogger {
.getService()
.submitOperation(
sessionHandle,
- OperationType.UNKNOWN,
() -> {
latch.await();
return ResultSet.NOT_READY_RESULTS;
});
manipulateOp.accept(
- toTOperationHandle(sessionHandle, operationHandle, OperationType.UNKNOWN));
+ toTOperationHandle(sessionHandle, operationHandle, TOperationType.UNKNOWN));
operationValidator.accept(sessionHandle, operationHandle);
SQL_GATEWAY_SERVICE_EXTENSION.getService().closeSession(sessionHandle);
}
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversionsTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversionsTest.java
index 2a5ffb3561d..4b7655d3c07 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversionsTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversionsTest.java
@@ -31,7 +31,6 @@ import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.gateway.api.operation.OperationHandle;
import org.apache.flink.table.gateway.api.operation.OperationStatus;
-import org.apache.flink.table.gateway.api.operation.OperationType;
import org.apache.flink.table.gateway.api.session.SessionHandle;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
@@ -45,6 +44,7 @@ import org.apache.hive.service.cli.TableSchema;
import org.apache.hive.service.cli.operation.ClassicTableTypeMapping.ClassicTableTypes;
import org.apache.hive.service.rpc.thrift.TOperationHandle;
import org.apache.hive.service.rpc.thrift.TOperationState;
+import org.apache.hive.service.rpc.thrift.TOperationType;
import org.apache.hive.service.rpc.thrift.TRowSet;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
@@ -118,7 +118,7 @@ class ThriftObjectConversionsTest {
OperationHandle originOperationHandle = OperationHandle.create();
TOperationHandle tOperationHandle =
toTOperationHandle(
- originSessionHandle, originOperationHandle, OperationType.UNKNOWN);
+ originSessionHandle, originOperationHandle, TOperationType.UNKNOWN);
assertThat(toSessionHandle(tOperationHandle)).isEqualTo(originSessionHandle);
assertThat(toOperationHandle(tOperationHandle)).isEqualTo(originOperationHandle);
diff --git a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/SqlGatewayService.java b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/SqlGatewayService.java
index cb4cb3d9392..b1e832c07b7 100644
--- a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/SqlGatewayService.java
+++ b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/SqlGatewayService.java
@@ -25,7 +25,6 @@ import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.gateway.api.endpoint.EndpointVersion;
import org.apache.flink.table.gateway.api.operation.OperationHandle;
import org.apache.flink.table.gateway.api.operation.OperationStatus;
-import org.apache.flink.table.gateway.api.operation.OperationType;
import org.apache.flink.table.gateway.api.results.FetchOrientation;
import org.apache.flink.table.gateway.api.results.OperationInfo;
import org.apache.flink.table.gateway.api.results.ResultSet;
@@ -87,12 +86,10 @@ public interface SqlGatewayService {
* execution and assign the {@link OperationHandle} for later to retrieve the results.
*
* @param sessionHandle handle to identify the session.
- * @param type describe the operation type.
* @param executor the main logic to get the execution results.
* @return Returns the handle for later retrieve results.
*/
- OperationHandle submitOperation(
- SessionHandle sessionHandle, OperationType type, Callable<ResultSet> executor)
+ OperationHandle submitOperation(SessionHandle sessionHandle, Callable<ResultSet> executor)
throws SqlGatewayException;
/**
diff --git a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/operation/OperationType.java b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/operation/OperationType.java
deleted file mode 100644
index a41012846d1..00000000000
--- a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/operation/OperationType.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.gateway.api.operation;
-
-import org.apache.flink.annotation.PublicEvolving;
-
-/** The Operation Type. */
-@PublicEvolving
-public enum OperationType {
- /** The type indicates the operation executes statements. */
- EXECUTE_STATEMENT,
-
- /** The type indicates the operation list catalogs. */
- LIST_CATALOGS,
-
- /** The type indicates the operation list schemas. */
- LIST_SCHEMAS,
-
- /** The type indicates the operation list tables. */
- LIST_TABLES,
-
- /** The type indicates the operation is unknown. */
- UNKNOWN;
-}
diff --git a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/results/OperationInfo.java b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/results/OperationInfo.java
index a85404f8400..b05860e6b26 100644
--- a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/results/OperationInfo.java
+++ b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/results/OperationInfo.java
@@ -20,7 +20,6 @@ package org.apache.flink.table.gateway.api.results;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.table.gateway.api.operation.OperationStatus;
-import org.apache.flink.table.gateway.api.operation.OperationType;
import org.apache.flink.util.ExceptionUtils;
import javax.annotation.Nullable;
@@ -33,24 +32,17 @@ import java.util.Optional;
public class OperationInfo {
private final OperationStatus status;
- private final OperationType type;
@Nullable private final Exception exception;
- public OperationInfo(OperationStatus status, OperationType type) {
- this(status, type, null);
+ public OperationInfo(OperationStatus status) {
+ this(status, null);
}
- public OperationInfo(
- OperationStatus status, OperationType type, @Nullable Exception exception) {
+ public OperationInfo(OperationStatus status, @Nullable Exception exception) {
this.status = status;
- this.type = type;
this.exception = exception;
}
- public OperationType getType() {
- return type;
- }
-
public OperationStatus getStatus() {
return status;
}
@@ -68,14 +60,12 @@ public class OperationInfo {
return false;
}
OperationInfo that = (OperationInfo) o;
- return status == that.status
- && type == that.type
- && Objects.equals(exception, that.exception);
+ return status == that.status && Objects.equals(exception, that.exception);
}
@Override
public int hashCode() {
- return Objects.hash(status, type, exception);
+ return Objects.hash(status, exception);
}
@Override
@@ -83,8 +73,6 @@ public class OperationInfo {
return "OperationInfo{"
+ "status="
+ status
- + ", type="
- + type
+ ", exception="
+ (exception == null ? "null" : ExceptionUtils.stringifyException(exception))
+ '}';
diff --git a/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/utils/MockedSqlGatewayService.java b/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/utils/MockedSqlGatewayService.java
index 9075a8066fb..aff8fd6e512 100644
--- a/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/utils/MockedSqlGatewayService.java
+++ b/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/utils/MockedSqlGatewayService.java
@@ -24,7 +24,6 @@ import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.gateway.api.SqlGatewayService;
import org.apache.flink.table.gateway.api.endpoint.EndpointVersion;
import org.apache.flink.table.gateway.api.operation.OperationHandle;
-import org.apache.flink.table.gateway.api.operation.OperationType;
import org.apache.flink.table.gateway.api.results.FetchOrientation;
import org.apache.flink.table.gateway.api.results.OperationInfo;
import org.apache.flink.table.gateway.api.results.ResultSet;
@@ -63,8 +62,7 @@ public class MockedSqlGatewayService implements SqlGatewayService {
@Override
public OperationHandle submitOperation(
- SessionHandle sessionHandle, OperationType type, Callable<ResultSet> executor)
- throws SqlGatewayException {
+ SessionHandle sessionHandle, Callable<ResultSet> executor) throws SqlGatewayException {
throw new UnsupportedOperationException();
}
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java
index e9e886589eb..174918270ec 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java
@@ -25,7 +25,6 @@ import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.gateway.api.SqlGatewayService;
import org.apache.flink.table.gateway.api.endpoint.EndpointVersion;
import org.apache.flink.table.gateway.api.operation.OperationHandle;
-import org.apache.flink.table.gateway.api.operation.OperationType;
import org.apache.flink.table.gateway.api.results.FetchOrientation;
import org.apache.flink.table.gateway.api.results.OperationInfo;
import org.apache.flink.table.gateway.api.results.ResultSet;
@@ -98,10 +97,9 @@ public class SqlGatewayServiceImpl implements SqlGatewayService {
@Override
public OperationHandle submitOperation(
- SessionHandle sessionHandle, OperationType type, Callable<ResultSet> executor)
- throws SqlGatewayException {
+ SessionHandle sessionHandle, Callable<ResultSet> executor) throws SqlGatewayException {
try {
- return getSession(sessionHandle).getOperationManager().submitOperation(type, executor);
+ return getSession(sessionHandle).getOperationManager().submitOperation(executor);
} catch (Throwable e) {
LOG.error("Failed to submitOperation.", e);
throw new SqlGatewayException("Failed to submitOperation.", e);
@@ -172,7 +170,6 @@ public class SqlGatewayServiceImpl implements SqlGatewayService {
return getSession(sessionHandle)
.getOperationManager()
.submitOperation(
- OperationType.EXECUTE_STATEMENT,
handle ->
getSession(sessionHandle)
.createExecutor(executionConfig)
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java
index f493c24f868..d001b7c3062 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java
@@ -23,7 +23,6 @@ import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.gateway.api.operation.OperationHandle;
import org.apache.flink.table.gateway.api.operation.OperationStatus;
-import org.apache.flink.table.gateway.api.operation.OperationType;
import org.apache.flink.table.gateway.api.results.FetchOrientation;
import org.apache.flink.table.gateway.api.results.OperationInfo;
import org.apache.flink.table.gateway.api.results.ResultSet;
@@ -80,17 +79,14 @@ public class OperationManager {
* the lifecycle of the {@link Operation}, including register resources, fire the execution and
* so on.
*
- * @param operationType The type of the submitted operation.
* @param executor Worker to execute.
* @return OperationHandle to fetch the results or check the status.
*/
- public OperationHandle submitOperation(
- OperationType operationType, Callable<ResultSet> executor) {
+ public OperationHandle submitOperation(Callable<ResultSet> executor) {
OperationHandle handle = OperationHandle.create();
Operation operation =
new Operation(
handle,
- operationType,
() -> {
ResultSet resultSet = executor.call();
return new ResultFetcher(
@@ -106,15 +102,13 @@ public class OperationManager {
* lifecycle of the {@link Operation}, including register resources, fire the execution and so
* on.
*
- * @param operationType The type of the submitted operation.
* @param fetcherSupplier offer the fetcher to get the results.
* @return OperationHandle to fetch the results or check the status.
*/
public OperationHandle submitOperation(
- OperationType operationType, Function<OperationHandle, ResultFetcher> fetcherSupplier) {
+ Function<OperationHandle, ResultFetcher> fetcherSupplier) {
OperationHandle handle = OperationHandle.create();
- Operation operation =
- new Operation(handle, operationType, () -> fetcherSupplier.apply(handle));
+ Operation operation = new Operation(handle, () -> fetcherSupplier.apply(handle));
submitOperationInternal(handle, operation);
return handle;
}
@@ -210,22 +204,16 @@ public class OperationManager {
private final OperationHandle operationHandle;
- private final OperationType operationType;
private final AtomicReference<OperationStatus> status;
-
private final Callable<ResultFetcher> resultSupplier;
private volatile FutureTask<?> invocation;
private volatile ResultFetcher resultFetcher;
private volatile SqlExecutionException operationError;
- public Operation(
- OperationHandle operationHandle,
- OperationType operationType,
- Callable<ResultFetcher> resultSupplier) {
+ public Operation(OperationHandle operationHandle, Callable<ResultFetcher> resultSupplier) {
this.operationHandle = operationHandle;
this.status = new AtomicReference<>(OperationStatus.INITIALIZED);
- this.operationType = operationType;
this.resultSupplier = resultSupplier;
}
@@ -331,7 +319,7 @@ public class OperationManager {
}
public OperationInfo getOperationInfo() {
- return new OperationInfo(status.get(), operationType, operationError);
+ return new OperationInfo(status.get(), operationError);
}
private ResultSet fetchResultsInternal(Supplier<ResultSet> results) {
diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
index 70cf14cee6b..ec370610edb 100644
--- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
+++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
@@ -33,7 +33,6 @@ import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.gateway.api.operation.OperationHandle;
import org.apache.flink.table.gateway.api.operation.OperationStatus;
-import org.apache.flink.table.gateway.api.operation.OperationType;
import org.apache.flink.table.gateway.api.results.OperationInfo;
import org.apache.flink.table.gateway.api.results.ResultSet;
import org.apache.flink.table.gateway.api.results.TableInfo;
@@ -187,11 +186,10 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
startRunningLatch.await();
assertThat(service.getOperationInfo(sessionHandle, operationHandle))
- .isEqualTo(new OperationInfo(OperationStatus.RUNNING, OperationType.UNKNOWN));
+ .isEqualTo(new OperationInfo(OperationStatus.RUNNING));
endRunningLatch.countDown();
- OperationInfo expectedInfo =
- new OperationInfo(OperationStatus.FINISHED, OperationType.UNKNOWN);
+ OperationInfo expectedInfo = new OperationInfo(OperationStatus.FINISHED);
CommonTestUtils.waitUtil(
() -> service.getOperationInfo(sessionHandle, operationHandle).equals(expectedInfo),
@@ -230,12 +228,12 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
startRunningLatch.await();
assertThat(service.getOperationInfo(sessionHandle, operationHandle))
- .isEqualTo(new OperationInfo(OperationStatus.RUNNING, OperationType.UNKNOWN));
+ .isEqualTo(new OperationInfo(OperationStatus.RUNNING));
service.cancelOperation(sessionHandle, operationHandle);
assertThat(service.getOperationInfo(sessionHandle, operationHandle))
- .isEqualTo(new OperationInfo(OperationStatus.CANCELED, OperationType.UNKNOWN));
+ .isEqualTo(new OperationInfo(OperationStatus.CANCELED));
service.closeOperation(sessionHandle, operationHandle);
assertThat(sessionManager.getOperationCount(sessionHandle)).isEqualTo(0);
}
@@ -537,7 +535,6 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
() ->
service.submitOperation(
sessionHandle,
- OperationType.UNKNOWN,
() -> {
startRunning.countDown();
terminateRunning.await();
@@ -562,7 +559,6 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
handles.add(
service.submitOperation(
sessionHandle,
- OperationType.UNKNOWN,
() -> {
// If execute in parallel, the value of v may be overridden by
// another thread
@@ -596,7 +592,6 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
operations.add(
service.submitOperation(
sessionHandle,
- OperationType.UNKNOWN,
() -> {
latch.await();
return getDefaultResultSet();
@@ -608,7 +603,6 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
() ->
service.submitOperation(
sessionHandle,
- OperationType.UNKNOWN,
() -> {
latch.await();
return getDefaultResultSet();
@@ -627,7 +621,6 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
CountDownLatch success = new CountDownLatch(1);
service.submitOperation(
sessionHandle,
- OperationType.UNKNOWN,
() -> {
success.countDown();
return getDefaultResultSet();
@@ -701,7 +694,6 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
SessionHandle sessionHandle, RunnableWithException executor) {
return service.submitOperation(
sessionHandle,
- OperationType.UNKNOWN,
() -> {
executor.run();
return getDefaultResultSet();
@@ -735,7 +727,6 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
OperationHandle operationHandle =
service.submitOperation(
sessionHandle,
- OperationType.UNKNOWN,
() -> {
operationIsRunning.await();
return executor.call();