You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sh...@apache.org on 2022/08/08 02:26:45 UTC

[flink] 01/02: [FLINK-28851][sql-gateway][hive] Remove useless OperationType

This is an automated email from the ASF dual-hosted git repository.

shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3aaa160bd80a5e89447118fa1bc9cfea6bd368d1
Author: Shengkai <10...@qq.com>
AuthorDate: Sun Aug 7 17:52:38 2022 +0800

    [FLINK-28851][sql-gateway][hive] Remove useless OperationType
---
 .../table/endpoint/hive/HiveServer2Endpoint.java   | 13 +++----
 .../hive/util/ThriftObjectConversions.java         | 23 ++-----------
 .../endpoint/hive/HiveServer2EndpointITCase.java   |  5 ++-
 .../hive/util/ThriftObjectConversionsTest.java     |  4 +--
 .../flink/table/gateway/api/SqlGatewayService.java |  5 +--
 .../table/gateway/api/operation/OperationType.java | 40 ----------------------
 .../table/gateway/api/results/OperationInfo.java   | 22 +++---------
 .../gateway/api/utils/MockedSqlGatewayService.java |  4 +--
 .../gateway/service/SqlGatewayServiceImpl.java     |  7 ++--
 .../service/operation/OperationManager.java        | 22 +++---------
 .../gateway/service/SqlGatewayServiceITCase.java   | 17 +++------
 11 files changed, 29 insertions(+), 133 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java
index 07c383e516e..ca99cad3191 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java
@@ -32,7 +32,6 @@ import org.apache.flink.table.gateway.api.endpoint.EndpointVersion;
 import org.apache.flink.table.gateway.api.endpoint.SqlGatewayEndpoint;
 import org.apache.flink.table.gateway.api.operation.OperationHandle;
 import org.apache.flink.table.gateway.api.operation.OperationStatus;
-import org.apache.flink.table.gateway.api.operation.OperationType;
 import org.apache.flink.table.gateway.api.results.OperationInfo;
 import org.apache.flink.table.gateway.api.results.ResultSet;
 import org.apache.flink.table.gateway.api.session.SessionEnvironment;
@@ -88,6 +87,7 @@ import org.apache.hive.service.rpc.thrift.TGetTypeInfoResp;
 import org.apache.hive.service.rpc.thrift.TOpenSessionReq;
 import org.apache.hive.service.rpc.thrift.TOpenSessionResp;
 import org.apache.hive.service.rpc.thrift.TOperationHandle;
+import org.apache.hive.service.rpc.thrift.TOperationType;
 import org.apache.hive.service.rpc.thrift.TProtocolVersion;
 import org.apache.hive.service.rpc.thrift.TRenewDelegationTokenReq;
 import org.apache.hive.service.rpc.thrift.TRenewDelegationTokenResp;
@@ -381,7 +381,7 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
 
             resp.setOperationHandle(
                     toTOperationHandle(
-                            sessionHandle, operationHandle, OperationType.EXECUTE_STATEMENT));
+                            sessionHandle, operationHandle, TOperationType.EXECUTE_STATEMENT));
         } catch (Throwable t) {
             LOG.error("Failed to ExecuteStatement.", t);
             resp.setStatus(toTStatus(t));
@@ -401,13 +401,11 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
             SessionHandle sessionHandle = toSessionHandle(tGetCatalogsReq.getSessionHandle());
             OperationHandle operationHandle =
                     service.submitOperation(
-                            sessionHandle,
-                            OperationType.LIST_CATALOGS,
-                            createGetCatalogsExecutor(service, sessionHandle));
+                            sessionHandle, createGetCatalogsExecutor(service, sessionHandle));
             resp.setStatus(OK_STATUS);
             resp.setOperationHandle(
                     toTOperationHandle(
-                            sessionHandle, operationHandle, OperationType.LIST_CATALOGS));
+                            sessionHandle, operationHandle, TOperationType.GET_CATALOGS));
         } catch (Throwable t) {
             LOG.error("Failed to GetCatalogs.", t);
             resp.setStatus(toTStatus(t));
@@ -423,7 +421,6 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
             OperationHandle operationHandle =
                     service.submitOperation(
                             sessionHandle,
-                            OperationType.LIST_SCHEMAS,
                             createGetSchemasExecutor(
                                     service,
                                     sessionHandle,
@@ -432,7 +429,7 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
 
             resp.setStatus(OK_STATUS);
             resp.setOperationHandle(
-                    toTOperationHandle(sessionHandle, operationHandle, OperationType.LIST_SCHEMAS));
+                    toTOperationHandle(sessionHandle, operationHandle, TOperationType.GET_SCHEMAS));
         } catch (Throwable t) {
             LOG.error("Failed to GetSchemas.", t);
             resp.setStatus(toTStatus(t));
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversions.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversions.java
index 8577fbd16ff..d78d4cbcb73 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversions.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversions.java
@@ -30,7 +30,6 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.gateway.api.SqlGatewayService;
 import org.apache.flink.table.gateway.api.operation.OperationHandle;
 import org.apache.flink.table.gateway.api.operation.OperationStatus;
-import org.apache.flink.table.gateway.api.operation.OperationType;
 import org.apache.flink.table.gateway.api.results.FetchOrientation;
 import org.apache.flink.table.gateway.api.session.SessionHandle;
 import org.apache.flink.table.types.DataType;
@@ -144,10 +143,10 @@ public class ThriftObjectConversions {
     public static TOperationHandle toTOperationHandle(
             SessionHandle sessionHandle,
             OperationHandle operationHandle,
-            OperationType operationType) {
+            TOperationType operationType) {
         return new TOperationHandle(
                 toTHandleIdentifier(operationHandle.getIdentifier(), sessionHandle.getIdentifier()),
-                toTOperationType(operationType),
+                operationType,
                 true);
     }
 
@@ -165,24 +164,6 @@ public class ThriftObjectConversions {
     // Operation related conversions
     // --------------------------------------------------------------------------------------------
 
-    public static TOperationType toTOperationType(OperationType type) {
-        switch (type) {
-            case EXECUTE_STATEMENT:
-                return TOperationType.EXECUTE_STATEMENT;
-            case LIST_CATALOGS:
-                return TOperationType.GET_CATALOGS;
-            case LIST_SCHEMAS:
-                return TOperationType.GET_SCHEMAS;
-            case LIST_TABLES:
-                return TOperationType.GET_TABLES;
-            case UNKNOWN:
-                return TOperationType.UNKNOWN;
-            default:
-                throw new IllegalArgumentException(
-                        String.format("Unknown operation type: %s.", type));
-        }
-    }
-
     public static TOperationState toTOperationState(OperationStatus operationStatus) {
         switch (operationStatus) {
             case INITIALIZED:
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java
index d3737a79009..c287a93e15e 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java
@@ -29,7 +29,6 @@ import org.apache.flink.table.endpoint.hive.util.HiveServer2EndpointExtension;
 import org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions;
 import org.apache.flink.table.gateway.api.operation.OperationHandle;
 import org.apache.flink.table.gateway.api.operation.OperationStatus;
-import org.apache.flink.table.gateway.api.operation.OperationType;
 import org.apache.flink.table.gateway.api.results.ResultSet;
 import org.apache.flink.table.gateway.api.session.SessionEnvironment;
 import org.apache.flink.table.gateway.api.session.SessionHandle;
@@ -54,6 +53,7 @@ import org.apache.hive.service.rpc.thrift.TCloseSessionResp;
 import org.apache.hive.service.rpc.thrift.TOpenSessionReq;
 import org.apache.hive.service.rpc.thrift.TOpenSessionResp;
 import org.apache.hive.service.rpc.thrift.TOperationHandle;
+import org.apache.hive.service.rpc.thrift.TOperationType;
 import org.apache.hive.service.rpc.thrift.TStatusCode;
 import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.thrift.transport.TTransport;
@@ -471,13 +471,12 @@ public class HiveServer2EndpointITCase extends TestLogger {
                         .getService()
                         .submitOperation(
                                 sessionHandle,
-                                OperationType.UNKNOWN,
                                 () -> {
                                     latch.await();
                                     return ResultSet.NOT_READY_RESULTS;
                                 });
         manipulateOp.accept(
-                toTOperationHandle(sessionHandle, operationHandle, OperationType.UNKNOWN));
+                toTOperationHandle(sessionHandle, operationHandle, TOperationType.UNKNOWN));
         operationValidator.accept(sessionHandle, operationHandle);
         SQL_GATEWAY_SERVICE_EXTENSION.getService().closeSession(sessionHandle);
     }
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversionsTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversionsTest.java
index 2a5ffb3561d..4b7655d3c07 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversionsTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversionsTest.java
@@ -31,7 +31,6 @@ import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.data.TimestampData;
 import org.apache.flink.table.gateway.api.operation.OperationHandle;
 import org.apache.flink.table.gateway.api.operation.OperationStatus;
-import org.apache.flink.table.gateway.api.operation.OperationType;
 import org.apache.flink.table.gateway.api.session.SessionHandle;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LogicalType;
@@ -45,6 +44,7 @@ import org.apache.hive.service.cli.TableSchema;
 import org.apache.hive.service.cli.operation.ClassicTableTypeMapping.ClassicTableTypes;
 import org.apache.hive.service.rpc.thrift.TOperationHandle;
 import org.apache.hive.service.rpc.thrift.TOperationState;
+import org.apache.hive.service.rpc.thrift.TOperationType;
 import org.apache.hive.service.rpc.thrift.TRowSet;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -118,7 +118,7 @@ class ThriftObjectConversionsTest {
         OperationHandle originOperationHandle = OperationHandle.create();
         TOperationHandle tOperationHandle =
                 toTOperationHandle(
-                        originSessionHandle, originOperationHandle, OperationType.UNKNOWN);
+                        originSessionHandle, originOperationHandle, TOperationType.UNKNOWN);
 
         assertThat(toSessionHandle(tOperationHandle)).isEqualTo(originSessionHandle);
         assertThat(toOperationHandle(tOperationHandle)).isEqualTo(originOperationHandle);
diff --git a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/SqlGatewayService.java b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/SqlGatewayService.java
index cb4cb3d9392..b1e832c07b7 100644
--- a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/SqlGatewayService.java
+++ b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/SqlGatewayService.java
@@ -25,7 +25,6 @@ import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.gateway.api.endpoint.EndpointVersion;
 import org.apache.flink.table.gateway.api.operation.OperationHandle;
 import org.apache.flink.table.gateway.api.operation.OperationStatus;
-import org.apache.flink.table.gateway.api.operation.OperationType;
 import org.apache.flink.table.gateway.api.results.FetchOrientation;
 import org.apache.flink.table.gateway.api.results.OperationInfo;
 import org.apache.flink.table.gateway.api.results.ResultSet;
@@ -87,12 +86,10 @@ public interface SqlGatewayService {
      * execution and assign the {@link OperationHandle} for later to retrieve the results.
      *
      * @param sessionHandle handle to identify the session.
-     * @param type describe the operation type.
      * @param executor the main logic to get the execution results.
      * @return Returns the handle for later retrieve results.
      */
-    OperationHandle submitOperation(
-            SessionHandle sessionHandle, OperationType type, Callable<ResultSet> executor)
+    OperationHandle submitOperation(SessionHandle sessionHandle, Callable<ResultSet> executor)
             throws SqlGatewayException;
 
     /**
diff --git a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/operation/OperationType.java b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/operation/OperationType.java
deleted file mode 100644
index a41012846d1..00000000000
--- a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/operation/OperationType.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.gateway.api.operation;
-
-import org.apache.flink.annotation.PublicEvolving;
-
-/** The Operation Type. */
-@PublicEvolving
-public enum OperationType {
-    /** The type indicates the operation executes statements. */
-    EXECUTE_STATEMENT,
-
-    /** The type indicates the operation list catalogs. */
-    LIST_CATALOGS,
-
-    /** The type indicates the operation list schemas. */
-    LIST_SCHEMAS,
-
-    /** The type indicates the operation list tables. */
-    LIST_TABLES,
-
-    /** The type indicates the operation is unknown. */
-    UNKNOWN;
-}
diff --git a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/results/OperationInfo.java b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/results/OperationInfo.java
index a85404f8400..b05860e6b26 100644
--- a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/results/OperationInfo.java
+++ b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/results/OperationInfo.java
@@ -20,7 +20,6 @@ package org.apache.flink.table.gateway.api.results;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.table.gateway.api.operation.OperationStatus;
-import org.apache.flink.table.gateway.api.operation.OperationType;
 import org.apache.flink.util.ExceptionUtils;
 
 import javax.annotation.Nullable;
@@ -33,24 +32,17 @@ import java.util.Optional;
 public class OperationInfo {
 
     private final OperationStatus status;
-    private final OperationType type;
     @Nullable private final Exception exception;
 
-    public OperationInfo(OperationStatus status, OperationType type) {
-        this(status, type, null);
+    public OperationInfo(OperationStatus status) {
+        this(status, null);
     }
 
-    public OperationInfo(
-            OperationStatus status, OperationType type, @Nullable Exception exception) {
+    public OperationInfo(OperationStatus status, @Nullable Exception exception) {
         this.status = status;
-        this.type = type;
         this.exception = exception;
     }
 
-    public OperationType getType() {
-        return type;
-    }
-
     public OperationStatus getStatus() {
         return status;
     }
@@ -68,14 +60,12 @@ public class OperationInfo {
             return false;
         }
         OperationInfo that = (OperationInfo) o;
-        return status == that.status
-                && type == that.type
-                && Objects.equals(exception, that.exception);
+        return status == that.status && Objects.equals(exception, that.exception);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(status, type, exception);
+        return Objects.hash(status, exception);
     }
 
     @Override
@@ -83,8 +73,6 @@ public class OperationInfo {
         return "OperationInfo{"
                 + "status="
                 + status
-                + ", type="
-                + type
                 + ", exception="
                 + (exception == null ? "null" : ExceptionUtils.stringifyException(exception))
                 + '}';
diff --git a/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/utils/MockedSqlGatewayService.java b/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/utils/MockedSqlGatewayService.java
index 9075a8066fb..aff8fd6e512 100644
--- a/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/utils/MockedSqlGatewayService.java
+++ b/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/utils/MockedSqlGatewayService.java
@@ -24,7 +24,6 @@ import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.gateway.api.SqlGatewayService;
 import org.apache.flink.table.gateway.api.endpoint.EndpointVersion;
 import org.apache.flink.table.gateway.api.operation.OperationHandle;
-import org.apache.flink.table.gateway.api.operation.OperationType;
 import org.apache.flink.table.gateway.api.results.FetchOrientation;
 import org.apache.flink.table.gateway.api.results.OperationInfo;
 import org.apache.flink.table.gateway.api.results.ResultSet;
@@ -63,8 +62,7 @@ public class MockedSqlGatewayService implements SqlGatewayService {
 
     @Override
     public OperationHandle submitOperation(
-            SessionHandle sessionHandle, OperationType type, Callable<ResultSet> executor)
-            throws SqlGatewayException {
+            SessionHandle sessionHandle, Callable<ResultSet> executor) throws SqlGatewayException {
         throw new UnsupportedOperationException();
     }
 
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java
index e9e886589eb..174918270ec 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java
@@ -25,7 +25,6 @@ import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.gateway.api.SqlGatewayService;
 import org.apache.flink.table.gateway.api.endpoint.EndpointVersion;
 import org.apache.flink.table.gateway.api.operation.OperationHandle;
-import org.apache.flink.table.gateway.api.operation.OperationType;
 import org.apache.flink.table.gateway.api.results.FetchOrientation;
 import org.apache.flink.table.gateway.api.results.OperationInfo;
 import org.apache.flink.table.gateway.api.results.ResultSet;
@@ -98,10 +97,9 @@ public class SqlGatewayServiceImpl implements SqlGatewayService {
 
     @Override
     public OperationHandle submitOperation(
-            SessionHandle sessionHandle, OperationType type, Callable<ResultSet> executor)
-            throws SqlGatewayException {
+            SessionHandle sessionHandle, Callable<ResultSet> executor) throws SqlGatewayException {
         try {
-            return getSession(sessionHandle).getOperationManager().submitOperation(type, executor);
+            return getSession(sessionHandle).getOperationManager().submitOperation(executor);
         } catch (Throwable e) {
             LOG.error("Failed to submitOperation.", e);
             throw new SqlGatewayException("Failed to submitOperation.", e);
@@ -172,7 +170,6 @@ public class SqlGatewayServiceImpl implements SqlGatewayService {
             return getSession(sessionHandle)
                     .getOperationManager()
                     .submitOperation(
-                            OperationType.EXECUTE_STATEMENT,
                             handle ->
                                     getSession(sessionHandle)
                                             .createExecutor(executionConfig)
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java
index f493c24f868..d001b7c3062 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java
@@ -23,7 +23,6 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.gateway.api.operation.OperationHandle;
 import org.apache.flink.table.gateway.api.operation.OperationStatus;
-import org.apache.flink.table.gateway.api.operation.OperationType;
 import org.apache.flink.table.gateway.api.results.FetchOrientation;
 import org.apache.flink.table.gateway.api.results.OperationInfo;
 import org.apache.flink.table.gateway.api.results.ResultSet;
@@ -80,17 +79,14 @@ public class OperationManager {
      * the lifecycle of the {@link Operation}, including register resources, fire the execution and
      * so on.
      *
-     * @param operationType The type of the submitted operation.
      * @param executor Worker to execute.
      * @return OperationHandle to fetch the results or check the status.
      */
-    public OperationHandle submitOperation(
-            OperationType operationType, Callable<ResultSet> executor) {
+    public OperationHandle submitOperation(Callable<ResultSet> executor) {
         OperationHandle handle = OperationHandle.create();
         Operation operation =
                 new Operation(
                         handle,
-                        operationType,
                         () -> {
                             ResultSet resultSet = executor.call();
                             return new ResultFetcher(
@@ -106,15 +102,13 @@ public class OperationManager {
      * lifecycle of the {@link Operation}, including register resources, fire the execution and so
      * on.
      *
-     * @param operationType The type of the submitted operation.
      * @param fetcherSupplier offer the fetcher to get the results.
      * @return OperationHandle to fetch the results or check the status.
      */
     public OperationHandle submitOperation(
-            OperationType operationType, Function<OperationHandle, ResultFetcher> fetcherSupplier) {
+            Function<OperationHandle, ResultFetcher> fetcherSupplier) {
         OperationHandle handle = OperationHandle.create();
-        Operation operation =
-                new Operation(handle, operationType, () -> fetcherSupplier.apply(handle));
+        Operation operation = new Operation(handle, () -> fetcherSupplier.apply(handle));
         submitOperationInternal(handle, operation);
         return handle;
     }
@@ -210,22 +204,16 @@ public class OperationManager {
 
         private final OperationHandle operationHandle;
 
-        private final OperationType operationType;
         private final AtomicReference<OperationStatus> status;
-
         private final Callable<ResultFetcher> resultSupplier;
 
         private volatile FutureTask<?> invocation;
         private volatile ResultFetcher resultFetcher;
         private volatile SqlExecutionException operationError;
 
-        public Operation(
-                OperationHandle operationHandle,
-                OperationType operationType,
-                Callable<ResultFetcher> resultSupplier) {
+        public Operation(OperationHandle operationHandle, Callable<ResultFetcher> resultSupplier) {
             this.operationHandle = operationHandle;
             this.status = new AtomicReference<>(OperationStatus.INITIALIZED);
-            this.operationType = operationType;
             this.resultSupplier = resultSupplier;
         }
 
@@ -331,7 +319,7 @@ public class OperationManager {
         }
 
         public OperationInfo getOperationInfo() {
-            return new OperationInfo(status.get(), operationType, operationError);
+            return new OperationInfo(status.get(), operationError);
         }
 
         private ResultSet fetchResultsInternal(Supplier<ResultSet> results) {
diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
index 70cf14cee6b..ec370610edb 100644
--- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
+++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
@@ -33,7 +33,6 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.gateway.api.operation.OperationHandle;
 import org.apache.flink.table.gateway.api.operation.OperationStatus;
-import org.apache.flink.table.gateway.api.operation.OperationType;
 import org.apache.flink.table.gateway.api.results.OperationInfo;
 import org.apache.flink.table.gateway.api.results.ResultSet;
 import org.apache.flink.table.gateway.api.results.TableInfo;
@@ -187,11 +186,10 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
 
         startRunningLatch.await();
         assertThat(service.getOperationInfo(sessionHandle, operationHandle))
-                .isEqualTo(new OperationInfo(OperationStatus.RUNNING, OperationType.UNKNOWN));
+                .isEqualTo(new OperationInfo(OperationStatus.RUNNING));
 
         endRunningLatch.countDown();
-        OperationInfo expectedInfo =
-                new OperationInfo(OperationStatus.FINISHED, OperationType.UNKNOWN);
+        OperationInfo expectedInfo = new OperationInfo(OperationStatus.FINISHED);
 
         CommonTestUtils.waitUtil(
                 () -> service.getOperationInfo(sessionHandle, operationHandle).equals(expectedInfo),
@@ -230,12 +228,12 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
 
         startRunningLatch.await();
         assertThat(service.getOperationInfo(sessionHandle, operationHandle))
-                .isEqualTo(new OperationInfo(OperationStatus.RUNNING, OperationType.UNKNOWN));
+                .isEqualTo(new OperationInfo(OperationStatus.RUNNING));
 
         service.cancelOperation(sessionHandle, operationHandle);
 
         assertThat(service.getOperationInfo(sessionHandle, operationHandle))
-                .isEqualTo(new OperationInfo(OperationStatus.CANCELED, OperationType.UNKNOWN));
+                .isEqualTo(new OperationInfo(OperationStatus.CANCELED));
         service.closeOperation(sessionHandle, operationHandle);
         assertThat(sessionManager.getOperationCount(sessionHandle)).isEqualTo(0);
     }
@@ -537,7 +535,6 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
                             () ->
                                     service.submitOperation(
                                             sessionHandle,
-                                            OperationType.UNKNOWN,
                                             () -> {
                                                 startRunning.countDown();
                                                 terminateRunning.await();
@@ -562,7 +559,6 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
             handles.add(
                     service.submitOperation(
                             sessionHandle,
-                            OperationType.UNKNOWN,
                             () -> {
                                 // If execute in parallel, the value of v may be overridden by
                                 // another thread
@@ -596,7 +592,6 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
             operations.add(
                     service.submitOperation(
                             sessionHandle,
-                            OperationType.UNKNOWN,
                             () -> {
                                 latch.await();
                                 return getDefaultResultSet();
@@ -608,7 +603,6 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
                         () ->
                                 service.submitOperation(
                                         sessionHandle,
-                                        OperationType.UNKNOWN,
                                         () -> {
                                             latch.await();
                                             return getDefaultResultSet();
@@ -627,7 +621,6 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
         CountDownLatch success = new CountDownLatch(1);
         service.submitOperation(
                 sessionHandle,
-                OperationType.UNKNOWN,
                 () -> {
                     success.countDown();
                     return getDefaultResultSet();
@@ -701,7 +694,6 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
             SessionHandle sessionHandle, RunnableWithException executor) {
         return service.submitOperation(
                 sessionHandle,
-                OperationType.UNKNOWN,
                 () -> {
                     executor.run();
                     return getDefaultResultSet();
@@ -735,7 +727,6 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
         OperationHandle operationHandle =
                 service.submitOperation(
                         sessionHandle,
-                        OperationType.UNKNOWN,
                         () -> {
                             operationIsRunning.await();
                             return executor.call();