Posted to commits@flink.apache.org by sh...@apache.org on 2022/08/08 02:26:44 UTC

[flink] branch master updated (437978fdf8a -> 29ca0ad8e33)

This is an automated email from the ASF dual-hosted git repository.

shengkai pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


    from 437978fdf8a [FLINK-28234][table-planner] Fix Infinite or NaN exception in ExpressionReducer
     new 3aaa160bd80 [FLINK-28851][sql-gateway][hive] Remove useless OperationType
     new 29ca0ad8e33 [FLINK-28851][hive] Allow to GetTypeInfo in the HiveServer2 Endpoint

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../table/endpoint/hive/HiveServer2Endpoint.java   | 36 ++++++---
 .../table/endpoint/hive/HiveServer2Schemas.java    | 44 +++++++++++
 .../hive/util/OperationExecutorFactory.java        | 86 ++++++++++++++++++++++
 .../hive/util/ThriftObjectConversions.java         | 23 +-----
 .../endpoint/hive/HiveServer2EndpointITCase.java   | 84 +++++++++++++++++++--
 .../hive/util/ThriftObjectConversionsTest.java     |  4 +-
 .../flink/table/gateway/api/SqlGatewayService.java |  5 +-
 .../table/gateway/api/operation/OperationType.java | 40 ----------
 .../table/gateway/api/results/OperationInfo.java   | 22 ++----
 .../gateway/api/utils/MockedSqlGatewayService.java |  4 +-
 .../gateway/service/SqlGatewayServiceImpl.java     |  7 +-
 .../service/operation/OperationManager.java        | 22 ++----
 .../gateway/service/SqlGatewayServiceITCase.java   | 17 +----
 13 files changed, 252 insertions(+), 142 deletions(-)
 delete mode 100644 flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/operation/OperationType.java


[flink] 02/02: [FLINK-28851][hive] Allow to GetTypeInfo in the HiveServer2 Endpoint

Posted by sh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 29ca0ad8e33ff6391450c5d9b29f484373ce824a
Author: zhaoweinan <32...@qq.com>
AuthorDate: Fri Aug 5 16:58:01 2022 +0800

    [FLINK-28851][hive] Allow to GetTypeInfo in the HiveServer2 Endpoint
    
    This closes #20468
---
 .../table/endpoint/hive/HiveServer2Endpoint.java   | 23 +++++-
 .../table/endpoint/hive/HiveServer2Schemas.java    | 44 +++++++++++
 .../hive/util/OperationExecutorFactory.java        | 86 ++++++++++++++++++++++
 .../endpoint/hive/HiveServer2EndpointITCase.java   | 79 ++++++++++++++++++--
 4 files changed, 223 insertions(+), 9 deletions(-)
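
For context, GetTypeInfo is the Thrift RPC behind java.sql.DatabaseMetaData#getTypeInfo, so the new endpoint can be exercised with the plain Hive JDBC driver. A minimal client-side sketch; the URL, port, and database are assumptions, not part of this commit:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;

    public class TypeInfoClient {
        public static void main(String[] args) throws Exception {
            // Any Hive JDBC URL pointing at the SQL Gateway's HiveServer2
            // endpoint works; the address below is an assumption.
            try (Connection conn =
                            DriverManager.getConnection("jdbc:hive2://localhost:10000/default");
                    ResultSet types = conn.getMetaData().getTypeInfo()) {
                while (types.next()) {
                    // TYPE_NAME and DATA_TYPE are the first two columns of the
                    // GET_TYPE_INFO_SCHEMA introduced by this commit.
                    System.out.printf(
                            "%s -> %d%n",
                            types.getString("TYPE_NAME"), types.getInt("DATA_TYPE"));
                }
            }
        }
    }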

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java
index ca99cad3191..bcae8c73a42 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java
@@ -125,6 +125,7 @@ import static org.apache.flink.table.endpoint.hive.util.HiveJdbcParameterUtils.g
 import static org.apache.flink.table.endpoint.hive.util.HiveJdbcParameterUtils.validateAndNormalize;
 import static org.apache.flink.table.endpoint.hive.util.OperationExecutorFactory.createGetCatalogsExecutor;
 import static org.apache.flink.table.endpoint.hive.util.OperationExecutorFactory.createGetSchemasExecutor;
+import static org.apache.flink.table.endpoint.hive.util.OperationExecutorFactory.createGetTableInfoExecutor;
 import static org.apache.flink.table.endpoint.hive.util.OperationExecutorFactory.createGetTablesExecutor;
 import static org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions.toFetchOrientation;
 import static org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions.toFlinkTableKinds;
@@ -136,6 +137,7 @@ import static org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions.
 import static org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions.toTSessionHandle;
 import static org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions.toTStatus;
 import static org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions.toTTableSchema;
+import static org.apache.flink.table.gateway.api.results.ResultSet.ResultType.EOS;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -391,7 +393,20 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
 
     @Override
     public TGetTypeInfoResp GetTypeInfo(TGetTypeInfoReq tGetTypeInfoReq) throws TException {
-        throw new UnsupportedOperationException(ERROR_MESSAGE);
+        TGetTypeInfoResp resp = new TGetTypeInfoResp();
+        try {
+            SessionHandle sessionHandle = toSessionHandle(tGetTypeInfoReq.getSessionHandle());
+            OperationHandle operationHandle =
+                    service.submitOperation(sessionHandle, createGetTableInfoExecutor());
+            resp.setStatus(OK_STATUS);
+            resp.setOperationHandle(
+                    toTOperationHandle(
+                            sessionHandle, operationHandle, TOperationType.GET_TYPE_INFO));
+        } catch (Throwable t) {
+            LOG.error("Failed to GetTypeInfo.", t);
+            resp.setStatus(toTStatus(t));
+        }
+        return resp;
     }
 
     @Override
@@ -447,7 +462,6 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
             OperationHandle operationHandle =
                     service.submitOperation(
                             sessionHandle,
-                            OperationType.LIST_TABLES,
                             createGetTablesExecutor(
                                     service,
                                     sessionHandle,
@@ -458,7 +472,7 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
 
             resp.setStatus(OK_STATUS);
             resp.setOperationHandle(
-                    toTOperationHandle(sessionHandle, operationHandle, OperationType.LIST_TABLES));
+                    toTOperationHandle(sessionHandle, operationHandle, TOperationType.GET_TABLES));
         } catch (Throwable t) {
             LOG.error("Failed to GetTables.", t);
             resp.setStatus(toTStatus(t));
@@ -601,7 +615,7 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
                             toFetchOrientation(tFetchResultsReq.getFetchType()),
                             maxRows);
             resp.setStatus(OK_STATUS);
-            resp.setHasMoreRows(resultSet.getResultType() != ResultSet.ResultType.EOS);
+            resp.setHasMoreRows(resultSet.getResultType() != EOS);
             EndpointVersion sessionEndpointVersion =
                     service.getSessionEndpointVersion(sessionHandle);
 
@@ -644,6 +658,7 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
     }
 
     // CHECKSTYLE.OFF: MethodName
+
     /** To be compatible with Hive3, add a default implementation. */
     public TGetQueryIdResp GetQueryId(TGetQueryIdReq tGetQueryIdReq) throws TException {
         throw new UnsupportedOperationException(ERROR_MESSAGE);
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Schemas.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Schemas.java
index d553234cc94..ad7743d1be1 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Schemas.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Schemas.java
@@ -78,4 +78,48 @@ public class HiveServer2Schemas {
                                                     "Specifies how values in SELF_REFERENCING_COL_NAME are created."))),
                     Collections.emptyList(),
                     null);
+
+    /** Schema for {@link HiveServer2Endpoint#GetTypeInfo}. */
+    public static final ResolvedSchema GET_TYPE_INFO_SCHEMA =
+            new ResolvedSchema(
+                    Arrays.asList(
+                            Column.physical("TYPE_NAME", DataTypes.STRING())
+                                    .withComment("Type name."),
+                            Column.physical("DATA_TYPE", DataTypes.INT())
+                                    .withComment("SQL data type from java.sql.Types."),
+                            Column.physical("PRECISION", DataTypes.INT())
+                                    .withComment("Maximum precision."),
+                            Column.physical("LITERAL_PREFIX", DataTypes.STRING())
+                                    .withComment("Prefix used to quote a literal (may be null)."),
+                            Column.physical("LITERAL_SUFFIX", DataTypes.STRING())
+                                    .withComment("Suffix used to quote a literal (may be null)."),
+                            Column.physical("CREATE_PARAMS", DataTypes.STRING())
+                                    .withComment(
+                                            "Parameters used in creating the type (may be null)."),
+                            Column.physical("NULLABLE", DataTypes.SMALLINT())
+                                    .withComment("Can you use NULL for this type."),
+                            Column.physical("CASE_SENSITIVE", DataTypes.BOOLEAN())
+                                    .withComment("Is it case sensitive."),
+                            Column.physical("SEARCHABLE", DataTypes.SMALLINT())
+                                    .withComment("Can you use \"WHERE\" based on this type."),
+                            Column.physical("UNSIGNED_ATTRIBUTE", DataTypes.BOOLEAN())
+                                    .withComment("Is it unsigned."),
+                            Column.physical("FIXED_PREC_SCALE", DataTypes.BOOLEAN())
+                                    .withComment("Can it be a money value."),
+                            Column.physical("AUTO_INCREMENT", DataTypes.BOOLEAN())
+                                    .withComment("Can it be used for an auto-increment value."),
+                            Column.physical("LOCAL_TYPE_NAME", DataTypes.STRING())
+                                    .withComment("Localized version of type name (may be null)."),
+                            Column.physical("MINIMUM_SCALE", DataTypes.SMALLINT())
+                                    .withComment("Minimum scale supported."),
+                            Column.physical("MAXIMUM_SCALE", DataTypes.SMALLINT())
+                                    .withComment("Maximum scale supported."),
+                            Column.physical("SQL_DATA_TYPE", DataTypes.INT())
+                                    .withComment("Unused."),
+                            Column.physical("SQL_DATETIME_SUB", DataTypes.INT())
+                                    .withComment("Unused."),
+                            Column.physical("NUM_PREC_RADIX", DataTypes.INT())
+                                    .withComment("Usually 2 or 10.")),
+                    Collections.emptyList(),
+                    null);
 }
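
The DATA_TYPE column above carries codes from java.sql.Types, so clients can branch on integers instead of comparing TYPE_NAME strings. A hypothetical helper to illustrate (this method is not part of the commit, and it maps only a few common codes):

    import java.sql.Types;

    public class SqlTypeNames {
        // Hypothetical convenience method: render a DATA_TYPE code from the
        // GetTypeInfo result as a readable java.sql.Types name.
        public static String describe(int dataType) {
            switch (dataType) {
                case Types.INTEGER:
                    return "INTEGER";
                case Types.VARCHAR:
                    return "VARCHAR";
                case Types.DECIMAL:
                    return "DECIMAL";
                default:
                    return "OTHER(" + dataType + ")";
            }
        }
    }
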
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/OperationExecutorFactory.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/OperationExecutorFactory.java
index c69a546a502..44095b1c24a 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/OperationExecutorFactory.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/OperationExecutorFactory.java
@@ -26,9 +26,14 @@ import org.apache.flink.table.gateway.api.results.ResultSet;
 import org.apache.flink.table.gateway.api.results.TableInfo;
 import org.apache.flink.table.gateway.api.session.SessionHandle;
 
+import org.apache.hadoop.hive.serde2.thrift.Type;
+
 import javax.annotation.Nullable;
 
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.function.Function;
@@ -38,7 +43,28 @@ import java.util.stream.Collectors;
 import static org.apache.flink.table.endpoint.hive.HiveServer2Schemas.GET_CATALOGS_SCHEMA;
 import static org.apache.flink.table.endpoint.hive.HiveServer2Schemas.GET_SCHEMAS_SCHEMA;
 import static org.apache.flink.table.endpoint.hive.HiveServer2Schemas.GET_TABLES_SCHEMA;
+import static org.apache.flink.table.endpoint.hive.HiveServer2Schemas.GET_TYPE_INFO_SCHEMA;
 import static org.apache.flink.table.gateway.api.results.ResultSet.ResultType.EOS;
+import static org.apache.hadoop.hive.serde2.thrift.Type.ARRAY_TYPE;
+import static org.apache.hadoop.hive.serde2.thrift.Type.BIGINT_TYPE;
+import static org.apache.hadoop.hive.serde2.thrift.Type.BINARY_TYPE;
+import static org.apache.hadoop.hive.serde2.thrift.Type.BOOLEAN_TYPE;
+import static org.apache.hadoop.hive.serde2.thrift.Type.CHAR_TYPE;
+import static org.apache.hadoop.hive.serde2.thrift.Type.DATE_TYPE;
+import static org.apache.hadoop.hive.serde2.thrift.Type.DECIMAL_TYPE;
+import static org.apache.hadoop.hive.serde2.thrift.Type.DOUBLE_TYPE;
+import static org.apache.hadoop.hive.serde2.thrift.Type.FLOAT_TYPE;
+import static org.apache.hadoop.hive.serde2.thrift.Type.INTERVAL_DAY_TIME_TYPE;
+import static org.apache.hadoop.hive.serde2.thrift.Type.INTERVAL_YEAR_MONTH_TYPE;
+import static org.apache.hadoop.hive.serde2.thrift.Type.INT_TYPE;
+import static org.apache.hadoop.hive.serde2.thrift.Type.MAP_TYPE;
+import static org.apache.hadoop.hive.serde2.thrift.Type.NULL_TYPE;
+import static org.apache.hadoop.hive.serde2.thrift.Type.SMALLINT_TYPE;
+import static org.apache.hadoop.hive.serde2.thrift.Type.STRING_TYPE;
+import static org.apache.hadoop.hive.serde2.thrift.Type.STRUCT_TYPE;
+import static org.apache.hadoop.hive.serde2.thrift.Type.TIMESTAMP_TYPE;
+import static org.apache.hadoop.hive.serde2.thrift.Type.TINYINT_TYPE;
+import static org.apache.hadoop.hive.serde2.thrift.Type.VARCHAR_TYPE;
 
 /** Factory to create the operation executor. */
 public class OperationExecutorFactory {
@@ -68,6 +94,39 @@ public class OperationExecutorFactory {
                         service, sessionHandle, catalogName, schemaName, tableName, tableKinds);
     }
 
+    public static Callable<ResultSet> createGetTableInfoExecutor() {
+        return () ->
+                new ResultSet(
+                        EOS,
+                        null,
+                        GET_TYPE_INFO_SCHEMA,
+                        getSupportedHiveType().stream()
+                                .map(
+                                        type ->
+                                                wrap(
+                                                        type.getName(), // TYPE_NAME
+                                                        type.toJavaSQLType(), // DATA_TYPE
+                                                        type.getMaxPrecision(), // PRECISION
+                                                        type.getLiteralPrefix(), // LITERAL_PREFIX
+                                                        type.getLiteralSuffix(), // LITERAL_SUFFIX
+                                                        type.getCreateParams(), // CREATE_PARAMS
+                                                        type.getNullable(), // NULLABLE
+                                                        type.isCaseSensitive(), // CASE_SENSITIVE
+                                                        type.getSearchable(), // SEARCHABLE
+                                                        type
+                                                                .isUnsignedAttribute(), // UNSIGNED_ATTRIBUTE
+                                                        type.isFixedPrecScale(), // FIXED_PREC_SCALE
+                                                        type.isAutoIncrement(), // AUTO_INCREMENT
+                                                        type.getLocalizedName(), // LOCAL_TYPE_NAME
+                                                        type.getMinimumScale(), // MINIMUM_SCALE
+                                                        type.getMaximumScale(), // MAXIMUM_SCALE
+                                                        null, // SQL_DATA_TYPE, unused
+                                                        null, // SQL_DATETIME_SUB, unused
+                                                        type.getNumPrecRadix() // NUM_PREC_RADIX
+                                                        ))
+                                .collect(Collectors.toList()));
+    }
+
     // --------------------------------------------------------------------------------------------
     // Executors
     // --------------------------------------------------------------------------------------------
@@ -198,6 +257,8 @@ public class OperationExecutorFactory {
                     pack[i] = element;
                 } else if (element instanceof Short) {
                     pack[i] = element;
+                } else if (element instanceof Boolean) {
+                    pack[i] = element;
                 } else {
                     throw new UnsupportedOperationException(
                             String.format(
@@ -208,4 +269,29 @@ public class OperationExecutorFactory {
         }
         return GenericRowData.of(pack);
     }
+
+    private static List<Type> getSupportedHiveType() {
+        return Collections.unmodifiableList(
+                Arrays.asList(
+                        NULL_TYPE,
+                        BOOLEAN_TYPE,
+                        STRING_TYPE,
+                        BINARY_TYPE,
+                        TINYINT_TYPE,
+                        SMALLINT_TYPE,
+                        INT_TYPE,
+                        BIGINT_TYPE,
+                        FLOAT_TYPE,
+                        DOUBLE_TYPE,
+                        DECIMAL_TYPE,
+                        DATE_TYPE,
+                        TIMESTAMP_TYPE,
+                        ARRAY_TYPE,
+                        MAP_TYPE,
+                        STRUCT_TYPE,
+                        CHAR_TYPE,
+                        VARCHAR_TYPE,
+                        INTERVAL_YEAR_MONTH_TYPE,
+                        INTERVAL_DAY_TIME_TYPE));
+    }
 }
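
As the diff shows, an operation executor is just a Callable<ResultSet> that builds its rows eagerly and marks the result EOS, so no further fetch round-trips are needed. A self-contained sketch of the same pattern; the one-column schema and row payload here are illustrative placeholders, not from the commit:

    import java.util.Collections;
    import java.util.concurrent.Callable;

    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.catalog.Column;
    import org.apache.flink.table.catalog.ResolvedSchema;
    import org.apache.flink.table.data.GenericRowData;
    import org.apache.flink.table.data.StringData;
    import org.apache.flink.table.gateway.api.results.ResultSet;

    import static org.apache.flink.table.gateway.api.results.ResultSet.ResultType.EOS;

    public class ExampleExecutorFactory {
        public static Callable<ResultSet> createSingleRowExecutor() {
            return () ->
                    new ResultSet(
                            EOS, // complete in a single fetch
                            null, // no next-token for a finished result
                            ResolvedSchema.of(Column.physical("NAME", DataTypes.STRING())),
                            Collections.singletonList(
                                    GenericRowData.of(StringData.fromString("example"))));
        }
    }
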
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java
index c287a93e15e..120d3786b26 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java
@@ -71,9 +71,13 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.CountDownLatch;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
 
 import static org.apache.flink.api.common.RuntimeExecutionMode.BATCH;
 import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE;
@@ -404,6 +408,40 @@ public class HiveServer2EndpointITCase extends TestLogger {
                                 null)));
     }
 
+    @Test
+    public void testGetTypeInfo() throws Exception {
+        runGetObjectTest(
+                connection -> connection.getMetaData().getTypeInfo(),
+                getExpectedGetTypeInfoSchema(),
+                types ->
+                        assertThat(
+                                        types.stream()
+                                                .map(type -> type.get(0))
+                                                .collect(Collectors.toList()))
+                                .isEqualTo(
+                                        Arrays.asList(
+                                                "VOID",
+                                                "BOOLEAN",
+                                                "STRING",
+                                                "BINARY",
+                                                "TINYINT",
+                                                "SMALLINT",
+                                                "INT",
+                                                "BIGINT",
+                                                "FLOAT",
+                                                "DOUBLE",
+                                                "DECIMAL",
+                                                "DATE",
+                                                "TIMESTAMP",
+                                                "ARRAY",
+                                                "MAP",
+                                                "STRUCT",
+                                                "CHAR",
+                                                "VARCHAR",
+                                                "INTERVAL_YEAR_MONTH",
+                                                "INTERVAL_DAY_TIME")));
+    }
+
     // --------------------------------------------------------------------------------------------
 
     private Connection getInitializedConnection() throws Exception {
@@ -444,11 +482,21 @@ public class HiveServer2EndpointITCase extends TestLogger {
             ResolvedSchema expectedSchema,
             List<List<Object>> expectedResults)
             throws Exception {
+        runGetObjectTest(
+                resultSetSupplier,
+                expectedSchema,
+                result -> assertThat(result).isEqualTo(new HashSet<>(expectedResults)));
+    }
+
+    private void runGetObjectTest(
+            FunctionWithException<Connection, java.sql.ResultSet, Exception> resultSetSupplier,
+            ResolvedSchema expectedSchema,
+            Consumer<Set<List<Object>>> validator)
+            throws Exception {
         try (Connection connection = getInitializedConnection();
                 java.sql.ResultSet result = resultSetSupplier.apply(connection)) {
             assertSchemaEquals(expectedSchema, result.getMetaData());
-            assertThat(new HashSet<>(collect(result, expectedSchema.getColumnCount())))
-                    .isEqualTo(new HashSet<>(expectedResults));
+            validator.accept(collect(result, expectedSchema.getColumnCount()));
         }
     }
 
@@ -511,6 +559,28 @@ public class HiveServer2EndpointITCase extends TestLogger {
                 Column.physical("REF_GENERATION", DataTypes.STRING()));
     }
 
+    private ResolvedSchema getExpectedGetTypeInfoSchema() {
+        return ResolvedSchema.of(
+                Column.physical("TYPE_NAME", DataTypes.STRING()),
+                Column.physical("DATA_TYPE", DataTypes.INT()),
+                Column.physical("PRECISION", DataTypes.INT()),
+                Column.physical("LITERAL_PREFIX", DataTypes.STRING()),
+                Column.physical("LITERAL_SUFFIX", DataTypes.STRING()),
+                Column.physical("CREATE_PARAMS", DataTypes.STRING()),
+                Column.physical("NULLABLE", DataTypes.SMALLINT()),
+                Column.physical("CASE_SENSITIVE", DataTypes.BOOLEAN()),
+                Column.physical("SEARCHABLE", DataTypes.SMALLINT()),
+                Column.physical("UNSIGNED_ATTRIBUTE", DataTypes.BOOLEAN()),
+                Column.physical("FIXED_PREC_SCALE", DataTypes.BOOLEAN()),
+                Column.physical("AUTO_INCREMENT", DataTypes.BOOLEAN()),
+                Column.physical("LOCAL_TYPE_NAME", DataTypes.STRING()),
+                Column.physical("MINIMUM_SCALE", DataTypes.SMALLINT()),
+                Column.physical("MAXIMUM_SCALE", DataTypes.SMALLINT()),
+                Column.physical("SQL_DATA_TYPE", DataTypes.INT()),
+                Column.physical("SQL_DATETIME_SUB", DataTypes.INT()),
+                Column.physical("NUM_PREC_RADIX", DataTypes.INT()));
+    }
+
     private void assertSchemaEquals(ResolvedSchema expected, ResultSetMetaData metaData)
             throws Exception {
         assertThat(metaData.getColumnCount()).isEqualTo(expected.getColumnCount());
@@ -526,8 +596,7 @@ public class HiveServer2EndpointITCase extends TestLogger {
         }
     }
 
-    private List<List<Object>> collect(java.sql.ResultSet result, int columnCount)
-            throws Exception {
+    private Set<List<Object>> collect(java.sql.ResultSet result, int columnCount) throws Exception {
         List<List<Object>> actual = new ArrayList<>();
         while (result.next()) {
             List<Object> row = new ArrayList<>();
@@ -536,6 +605,6 @@ public class HiveServer2EndpointITCase extends TestLogger {
             }
             actual.add(row);
         }
-        return actual;
+        return new LinkedHashSet<>(actual);
     }
 }
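
One subtlety in the test change: collect now returns a LinkedHashSet, which deduplicates like the previous HashSet comparison but also preserves insertion order, so testGetTypeInfo can assert the exact type ordering. A small illustration:

    import java.util.Arrays;
    import java.util.LinkedHashSet;
    import java.util.Set;

    public class OrderedDedup {
        public static void main(String[] args) {
            // Duplicates collapse, but first-seen order is kept.
            Set<String> rows =
                    new LinkedHashSet<>(Arrays.asList("BOOLEAN", "STRING", "BOOLEAN"));
            System.out.println(rows); // [BOOLEAN, STRING]
        }
    }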


[flink] 01/02: [FLINK-28851][sql-gateway][hive] Remove useless OperationType

Posted by sh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3aaa160bd80a5e89447118fa1bc9cfea6bd368d1
Author: Shengkai <10...@qq.com>
AuthorDate: Sun Aug 7 17:52:38 2022 +0800

    [FLINK-28851][sql-gateway][hive] Remove useless OperationType
---
 .../table/endpoint/hive/HiveServer2Endpoint.java   | 13 +++----
 .../hive/util/ThriftObjectConversions.java         | 23 ++-----------
 .../endpoint/hive/HiveServer2EndpointITCase.java   |  5 ++-
 .../hive/util/ThriftObjectConversionsTest.java     |  4 +--
 .../flink/table/gateway/api/SqlGatewayService.java |  5 +--
 .../table/gateway/api/operation/OperationType.java | 40 ----------------------
 .../table/gateway/api/results/OperationInfo.java   | 22 +++---------
 .../gateway/api/utils/MockedSqlGatewayService.java |  4 +--
 .../gateway/service/SqlGatewayServiceImpl.java     |  7 ++--
 .../service/operation/OperationManager.java        | 22 +++---------
 .../gateway/service/SqlGatewayServiceITCase.java   | 17 +++------
 11 files changed, 29 insertions(+), 133 deletions(-)
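
The practical effect for callers is that operation submission loses its OperationType argument; the Thrift-facing TOperationType is now chosen at the endpoint when the handle is converted. A sketch of the new call shape, with the service and session handle assumed to be supplied as parameters:

    import java.util.concurrent.Callable;

    import org.apache.flink.table.gateway.api.SqlGatewayService;
    import org.apache.flink.table.gateway.api.operation.OperationHandle;
    import org.apache.flink.table.gateway.api.results.ResultSet;
    import org.apache.flink.table.gateway.api.session.SessionHandle;

    public class SubmitSketch {
        // After this commit: no OperationType, just the session and the work.
        static OperationHandle submit(SqlGatewayService service, SessionHandle session) {
            Callable<ResultSet> work = () -> ResultSet.NOT_READY_RESULTS;
            return service.submitOperation(session, work);
        }
    }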

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java
index 07c383e516e..ca99cad3191 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java
@@ -32,7 +32,6 @@ import org.apache.flink.table.gateway.api.endpoint.EndpointVersion;
 import org.apache.flink.table.gateway.api.endpoint.SqlGatewayEndpoint;
 import org.apache.flink.table.gateway.api.operation.OperationHandle;
 import org.apache.flink.table.gateway.api.operation.OperationStatus;
-import org.apache.flink.table.gateway.api.operation.OperationType;
 import org.apache.flink.table.gateway.api.results.OperationInfo;
 import org.apache.flink.table.gateway.api.results.ResultSet;
 import org.apache.flink.table.gateway.api.session.SessionEnvironment;
@@ -88,6 +87,7 @@ import org.apache.hive.service.rpc.thrift.TGetTypeInfoResp;
 import org.apache.hive.service.rpc.thrift.TOpenSessionReq;
 import org.apache.hive.service.rpc.thrift.TOpenSessionResp;
 import org.apache.hive.service.rpc.thrift.TOperationHandle;
+import org.apache.hive.service.rpc.thrift.TOperationType;
 import org.apache.hive.service.rpc.thrift.TProtocolVersion;
 import org.apache.hive.service.rpc.thrift.TRenewDelegationTokenReq;
 import org.apache.hive.service.rpc.thrift.TRenewDelegationTokenResp;
@@ -381,7 +381,7 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
 
             resp.setOperationHandle(
                     toTOperationHandle(
-                            sessionHandle, operationHandle, OperationType.EXECUTE_STATEMENT));
+                            sessionHandle, operationHandle, TOperationType.EXECUTE_STATEMENT));
         } catch (Throwable t) {
             LOG.error("Failed to ExecuteStatement.", t);
             resp.setStatus(toTStatus(t));
@@ -401,13 +401,11 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
             SessionHandle sessionHandle = toSessionHandle(tGetCatalogsReq.getSessionHandle());
             OperationHandle operationHandle =
                     service.submitOperation(
-                            sessionHandle,
-                            OperationType.LIST_CATALOGS,
-                            createGetCatalogsExecutor(service, sessionHandle));
+                            sessionHandle, createGetCatalogsExecutor(service, sessionHandle));
             resp.setStatus(OK_STATUS);
             resp.setOperationHandle(
                     toTOperationHandle(
-                            sessionHandle, operationHandle, OperationType.LIST_CATALOGS));
+                            sessionHandle, operationHandle, TOperationType.GET_CATALOGS));
         } catch (Throwable t) {
             LOG.error("Failed to GetCatalogs.", t);
             resp.setStatus(toTStatus(t));
@@ -423,7 +421,6 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
             OperationHandle operationHandle =
                     service.submitOperation(
                             sessionHandle,
-                            OperationType.LIST_SCHEMAS,
                             createGetSchemasExecutor(
                                     service,
                                     sessionHandle,
@@ -432,7 +429,7 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
 
             resp.setStatus(OK_STATUS);
             resp.setOperationHandle(
-                    toTOperationHandle(sessionHandle, operationHandle, OperationType.LIST_SCHEMAS));
+                    toTOperationHandle(sessionHandle, operationHandle, TOperationType.GET_SCHEMAS));
         } catch (Throwable t) {
             LOG.error("Failed to GetSchemas.", t);
             resp.setStatus(toTStatus(t));
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversions.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversions.java
index 8577fbd16ff..d78d4cbcb73 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversions.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversions.java
@@ -30,7 +30,6 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.gateway.api.SqlGatewayService;
 import org.apache.flink.table.gateway.api.operation.OperationHandle;
 import org.apache.flink.table.gateway.api.operation.OperationStatus;
-import org.apache.flink.table.gateway.api.operation.OperationType;
 import org.apache.flink.table.gateway.api.results.FetchOrientation;
 import org.apache.flink.table.gateway.api.session.SessionHandle;
 import org.apache.flink.table.types.DataType;
@@ -144,10 +143,10 @@ public class ThriftObjectConversions {
     public static TOperationHandle toTOperationHandle(
             SessionHandle sessionHandle,
             OperationHandle operationHandle,
-            OperationType operationType) {
+            TOperationType operationType) {
         return new TOperationHandle(
                 toTHandleIdentifier(operationHandle.getIdentifier(), sessionHandle.getIdentifier()),
-                toTOperationType(operationType),
+                operationType,
                 true);
     }
 
@@ -165,24 +164,6 @@ public class ThriftObjectConversions {
     // Operation related conversions
     // --------------------------------------------------------------------------------------------
 
-    public static TOperationType toTOperationType(OperationType type) {
-        switch (type) {
-            case EXECUTE_STATEMENT:
-                return TOperationType.EXECUTE_STATEMENT;
-            case LIST_CATALOGS:
-                return TOperationType.GET_CATALOGS;
-            case LIST_SCHEMAS:
-                return TOperationType.GET_SCHEMAS;
-            case LIST_TABLES:
-                return TOperationType.GET_TABLES;
-            case UNKNOWN:
-                return TOperationType.UNKNOWN;
-            default:
-                throw new IllegalArgumentException(
-                        String.format("Unknown operation type: %s.", type));
-        }
-    }
-
     public static TOperationState toTOperationState(OperationStatus operationStatus) {
         switch (operationStatus) {
             case INITIALIZED:
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java
index d3737a79009..c287a93e15e 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java
@@ -29,7 +29,6 @@ import org.apache.flink.table.endpoint.hive.util.HiveServer2EndpointExtension;
 import org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions;
 import org.apache.flink.table.gateway.api.operation.OperationHandle;
 import org.apache.flink.table.gateway.api.operation.OperationStatus;
-import org.apache.flink.table.gateway.api.operation.OperationType;
 import org.apache.flink.table.gateway.api.results.ResultSet;
 import org.apache.flink.table.gateway.api.session.SessionEnvironment;
 import org.apache.flink.table.gateway.api.session.SessionHandle;
@@ -54,6 +53,7 @@ import org.apache.hive.service.rpc.thrift.TCloseSessionResp;
 import org.apache.hive.service.rpc.thrift.TOpenSessionReq;
 import org.apache.hive.service.rpc.thrift.TOpenSessionResp;
 import org.apache.hive.service.rpc.thrift.TOperationHandle;
+import org.apache.hive.service.rpc.thrift.TOperationType;
 import org.apache.hive.service.rpc.thrift.TStatusCode;
 import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.thrift.transport.TTransport;
@@ -471,13 +471,12 @@ public class HiveServer2EndpointITCase extends TestLogger {
                         .getService()
                         .submitOperation(
                                 sessionHandle,
-                                OperationType.UNKNOWN,
                                 () -> {
                                     latch.await();
                                     return ResultSet.NOT_READY_RESULTS;
                                 });
         manipulateOp.accept(
-                toTOperationHandle(sessionHandle, operationHandle, OperationType.UNKNOWN));
+                toTOperationHandle(sessionHandle, operationHandle, TOperationType.UNKNOWN));
         operationValidator.accept(sessionHandle, operationHandle);
         SQL_GATEWAY_SERVICE_EXTENSION.getService().closeSession(sessionHandle);
     }
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversionsTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversionsTest.java
index 2a5ffb3561d..4b7655d3c07 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversionsTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversionsTest.java
@@ -31,7 +31,6 @@ import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.data.TimestampData;
 import org.apache.flink.table.gateway.api.operation.OperationHandle;
 import org.apache.flink.table.gateway.api.operation.OperationStatus;
-import org.apache.flink.table.gateway.api.operation.OperationType;
 import org.apache.flink.table.gateway.api.session.SessionHandle;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LogicalType;
@@ -45,6 +44,7 @@ import org.apache.hive.service.cli.TableSchema;
 import org.apache.hive.service.cli.operation.ClassicTableTypeMapping.ClassicTableTypes;
 import org.apache.hive.service.rpc.thrift.TOperationHandle;
 import org.apache.hive.service.rpc.thrift.TOperationState;
+import org.apache.hive.service.rpc.thrift.TOperationType;
 import org.apache.hive.service.rpc.thrift.TRowSet;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -118,7 +118,7 @@ class ThriftObjectConversionsTest {
         OperationHandle originOperationHandle = OperationHandle.create();
         TOperationHandle tOperationHandle =
                 toTOperationHandle(
-                        originSessionHandle, originOperationHandle, OperationType.UNKNOWN);
+                        originSessionHandle, originOperationHandle, TOperationType.UNKNOWN);
 
         assertThat(toSessionHandle(tOperationHandle)).isEqualTo(originSessionHandle);
         assertThat(toOperationHandle(tOperationHandle)).isEqualTo(originOperationHandle);
diff --git a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/SqlGatewayService.java b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/SqlGatewayService.java
index cb4cb3d9392..b1e832c07b7 100644
--- a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/SqlGatewayService.java
+++ b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/SqlGatewayService.java
@@ -25,7 +25,6 @@ import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.gateway.api.endpoint.EndpointVersion;
 import org.apache.flink.table.gateway.api.operation.OperationHandle;
 import org.apache.flink.table.gateway.api.operation.OperationStatus;
-import org.apache.flink.table.gateway.api.operation.OperationType;
 import org.apache.flink.table.gateway.api.results.FetchOrientation;
 import org.apache.flink.table.gateway.api.results.OperationInfo;
 import org.apache.flink.table.gateway.api.results.ResultSet;
@@ -87,12 +86,10 @@ public interface SqlGatewayService {
      * execution and assign the {@link OperationHandle} for later to retrieve the results.
      *
      * @param sessionHandle handle to identify the session.
-     * @param type describe the operation type.
      * @param executor the main logic to get the execution results.
      * @return Returns the handle for later retrieve results.
      */
-    OperationHandle submitOperation(
-            SessionHandle sessionHandle, OperationType type, Callable<ResultSet> executor)
+    OperationHandle submitOperation(SessionHandle sessionHandle, Callable<ResultSet> executor)
             throws SqlGatewayException;
 
     /**
diff --git a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/operation/OperationType.java b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/operation/OperationType.java
deleted file mode 100644
index a41012846d1..00000000000
--- a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/operation/OperationType.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.gateway.api.operation;
-
-import org.apache.flink.annotation.PublicEvolving;
-
-/** The Operation Type. */
-@PublicEvolving
-public enum OperationType {
-    /** The type indicates the operation executes statements. */
-    EXECUTE_STATEMENT,
-
-    /** The type indicates the operation list catalogs. */
-    LIST_CATALOGS,
-
-    /** The type indicates the operation list schemas. */
-    LIST_SCHEMAS,
-
-    /** The type indicates the operation list tables. */
-    LIST_TABLES,
-
-    /** The type indicates the operation is unknown. */
-    UNKNOWN;
-}
diff --git a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/results/OperationInfo.java b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/results/OperationInfo.java
index a85404f8400..b05860e6b26 100644
--- a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/results/OperationInfo.java
+++ b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/results/OperationInfo.java
@@ -20,7 +20,6 @@ package org.apache.flink.table.gateway.api.results;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.table.gateway.api.operation.OperationStatus;
-import org.apache.flink.table.gateway.api.operation.OperationType;
 import org.apache.flink.util.ExceptionUtils;
 
 import javax.annotation.Nullable;
@@ -33,24 +32,17 @@ import java.util.Optional;
 public class OperationInfo {
 
     private final OperationStatus status;
-    private final OperationType type;
     @Nullable private final Exception exception;
 
-    public OperationInfo(OperationStatus status, OperationType type) {
-        this(status, type, null);
+    public OperationInfo(OperationStatus status) {
+        this(status, null);
     }
 
-    public OperationInfo(
-            OperationStatus status, OperationType type, @Nullable Exception exception) {
+    public OperationInfo(OperationStatus status, @Nullable Exception exception) {
         this.status = status;
-        this.type = type;
         this.exception = exception;
     }
 
-    public OperationType getType() {
-        return type;
-    }
-
     public OperationStatus getStatus() {
         return status;
     }
@@ -68,14 +60,12 @@ public class OperationInfo {
             return false;
         }
         OperationInfo that = (OperationInfo) o;
-        return status == that.status
-                && type == that.type
-                && Objects.equals(exception, that.exception);
+        return status == that.status && Objects.equals(exception, that.exception);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(status, type, exception);
+        return Objects.hash(status, exception);
     }
 
     @Override
@@ -83,8 +73,6 @@ public class OperationInfo {
         return "OperationInfo{"
                 + "status="
                 + status
-                + ", type="
-                + type
                 + ", exception="
                 + (exception == null ? "null" : ExceptionUtils.stringifyException(exception))
                 + '}';
diff --git a/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/utils/MockedSqlGatewayService.java b/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/utils/MockedSqlGatewayService.java
index 9075a8066fb..aff8fd6e512 100644
--- a/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/utils/MockedSqlGatewayService.java
+++ b/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/utils/MockedSqlGatewayService.java
@@ -24,7 +24,6 @@ import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.gateway.api.SqlGatewayService;
 import org.apache.flink.table.gateway.api.endpoint.EndpointVersion;
 import org.apache.flink.table.gateway.api.operation.OperationHandle;
-import org.apache.flink.table.gateway.api.operation.OperationType;
 import org.apache.flink.table.gateway.api.results.FetchOrientation;
 import org.apache.flink.table.gateway.api.results.OperationInfo;
 import org.apache.flink.table.gateway.api.results.ResultSet;
@@ -63,8 +62,7 @@ public class MockedSqlGatewayService implements SqlGatewayService {
 
     @Override
     public OperationHandle submitOperation(
-            SessionHandle sessionHandle, OperationType type, Callable<ResultSet> executor)
-            throws SqlGatewayException {
+            SessionHandle sessionHandle, Callable<ResultSet> executor) throws SqlGatewayException {
         throw new UnsupportedOperationException();
     }
 
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java
index e9e886589eb..174918270ec 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java
@@ -25,7 +25,6 @@ import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.gateway.api.SqlGatewayService;
 import org.apache.flink.table.gateway.api.endpoint.EndpointVersion;
 import org.apache.flink.table.gateway.api.operation.OperationHandle;
-import org.apache.flink.table.gateway.api.operation.OperationType;
 import org.apache.flink.table.gateway.api.results.FetchOrientation;
 import org.apache.flink.table.gateway.api.results.OperationInfo;
 import org.apache.flink.table.gateway.api.results.ResultSet;
@@ -98,10 +97,9 @@ public class SqlGatewayServiceImpl implements SqlGatewayService {
 
     @Override
     public OperationHandle submitOperation(
-            SessionHandle sessionHandle, OperationType type, Callable<ResultSet> executor)
-            throws SqlGatewayException {
+            SessionHandle sessionHandle, Callable<ResultSet> executor) throws SqlGatewayException {
         try {
-            return getSession(sessionHandle).getOperationManager().submitOperation(type, executor);
+            return getSession(sessionHandle).getOperationManager().submitOperation(executor);
         } catch (Throwable e) {
             LOG.error("Failed to submitOperation.", e);
             throw new SqlGatewayException("Failed to submitOperation.", e);
@@ -172,7 +170,6 @@ public class SqlGatewayServiceImpl implements SqlGatewayService {
             return getSession(sessionHandle)
                     .getOperationManager()
                     .submitOperation(
-                            OperationType.EXECUTE_STATEMENT,
                             handle ->
                                     getSession(sessionHandle)
                                             .createExecutor(executionConfig)
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java
index f493c24f868..d001b7c3062 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java
@@ -23,7 +23,6 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.gateway.api.operation.OperationHandle;
 import org.apache.flink.table.gateway.api.operation.OperationStatus;
-import org.apache.flink.table.gateway.api.operation.OperationType;
 import org.apache.flink.table.gateway.api.results.FetchOrientation;
 import org.apache.flink.table.gateway.api.results.OperationInfo;
 import org.apache.flink.table.gateway.api.results.ResultSet;
@@ -80,17 +79,14 @@ public class OperationManager {
      * the lifecycle of the {@link Operation}, including register resources, fire the execution and
      * so on.
      *
-     * @param operationType The type of the submitted operation.
      * @param executor Worker to execute.
      * @return OperationHandle to fetch the results or check the status.
      */
-    public OperationHandle submitOperation(
-            OperationType operationType, Callable<ResultSet> executor) {
+    public OperationHandle submitOperation(Callable<ResultSet> executor) {
         OperationHandle handle = OperationHandle.create();
         Operation operation =
                 new Operation(
                         handle,
-                        operationType,
                         () -> {
                             ResultSet resultSet = executor.call();
                             return new ResultFetcher(
@@ -106,15 +102,13 @@ public class OperationManager {
      * lifecycle of the {@link Operation}, including register resources, fire the execution and so
      * on.
      *
-     * @param operationType The type of the submitted operation.
      * @param fetcherSupplier offer the fetcher to get the results.
      * @return OperationHandle to fetch the results or check the status.
      */
     public OperationHandle submitOperation(
-            OperationType operationType, Function<OperationHandle, ResultFetcher> fetcherSupplier) {
+            Function<OperationHandle, ResultFetcher> fetcherSupplier) {
         OperationHandle handle = OperationHandle.create();
-        Operation operation =
-                new Operation(handle, operationType, () -> fetcherSupplier.apply(handle));
+        Operation operation = new Operation(handle, () -> fetcherSupplier.apply(handle));
         submitOperationInternal(handle, operation);
         return handle;
     }
@@ -210,22 +204,16 @@ public class OperationManager {
 
         private final OperationHandle operationHandle;
 
-        private final OperationType operationType;
         private final AtomicReference<OperationStatus> status;
-
         private final Callable<ResultFetcher> resultSupplier;
 
         private volatile FutureTask<?> invocation;
         private volatile ResultFetcher resultFetcher;
         private volatile SqlExecutionException operationError;
 
-        public Operation(
-                OperationHandle operationHandle,
-                OperationType operationType,
-                Callable<ResultFetcher> resultSupplier) {
+        public Operation(OperationHandle operationHandle, Callable<ResultFetcher> resultSupplier) {
             this.operationHandle = operationHandle;
             this.status = new AtomicReference<>(OperationStatus.INITIALIZED);
-            this.operationType = operationType;
             this.resultSupplier = resultSupplier;
         }
 
@@ -331,7 +319,7 @@ public class OperationManager {
         }
 
         public OperationInfo getOperationInfo() {
-            return new OperationInfo(status.get(), operationType, operationError);
+            return new OperationInfo(status.get(), operationError);
         }
 
         private ResultSet fetchResultsInternal(Supplier<ResultSet> results) {
diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
index 70cf14cee6b..ec370610edb 100644
--- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
+++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
@@ -33,7 +33,6 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.gateway.api.operation.OperationHandle;
 import org.apache.flink.table.gateway.api.operation.OperationStatus;
-import org.apache.flink.table.gateway.api.operation.OperationType;
 import org.apache.flink.table.gateway.api.results.OperationInfo;
 import org.apache.flink.table.gateway.api.results.ResultSet;
 import org.apache.flink.table.gateway.api.results.TableInfo;
@@ -187,11 +186,10 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
 
         startRunningLatch.await();
         assertThat(service.getOperationInfo(sessionHandle, operationHandle))
-                .isEqualTo(new OperationInfo(OperationStatus.RUNNING, OperationType.UNKNOWN));
+                .isEqualTo(new OperationInfo(OperationStatus.RUNNING));
 
         endRunningLatch.countDown();
-        OperationInfo expectedInfo =
-                new OperationInfo(OperationStatus.FINISHED, OperationType.UNKNOWN);
+        OperationInfo expectedInfo = new OperationInfo(OperationStatus.FINISHED);
 
         CommonTestUtils.waitUtil(
                 () -> service.getOperationInfo(sessionHandle, operationHandle).equals(expectedInfo),
@@ -230,12 +228,12 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
 
         startRunningLatch.await();
         assertThat(service.getOperationInfo(sessionHandle, operationHandle))
-                .isEqualTo(new OperationInfo(OperationStatus.RUNNING, OperationType.UNKNOWN));
+                .isEqualTo(new OperationInfo(OperationStatus.RUNNING));
 
         service.cancelOperation(sessionHandle, operationHandle);
 
         assertThat(service.getOperationInfo(sessionHandle, operationHandle))
-                .isEqualTo(new OperationInfo(OperationStatus.CANCELED, OperationType.UNKNOWN));
+                .isEqualTo(new OperationInfo(OperationStatus.CANCELED));
         service.closeOperation(sessionHandle, operationHandle);
         assertThat(sessionManager.getOperationCount(sessionHandle)).isEqualTo(0);
     }
@@ -537,7 +535,6 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
                             () ->
                                     service.submitOperation(
                                             sessionHandle,
-                                            OperationType.UNKNOWN,
                                             () -> {
                                                 startRunning.countDown();
                                                 terminateRunning.await();
@@ -562,7 +559,6 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
             handles.add(
                     service.submitOperation(
                             sessionHandle,
-                            OperationType.UNKNOWN,
                             () -> {
                                 // If execute in parallel, the value of v may be overridden by
                                 // another thread
@@ -596,7 +592,6 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
             operations.add(
                     service.submitOperation(
                             sessionHandle,
-                            OperationType.UNKNOWN,
                             () -> {
                                 latch.await();
                                 return getDefaultResultSet();
@@ -608,7 +603,6 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
                         () ->
                                 service.submitOperation(
                                         sessionHandle,
-                                        OperationType.UNKNOWN,
                                         () -> {
                                             latch.await();
                                             return getDefaultResultSet();
@@ -627,7 +621,6 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
         CountDownLatch success = new CountDownLatch(1);
         service.submitOperation(
                 sessionHandle,
-                OperationType.UNKNOWN,
                 () -> {
                     success.countDown();
                     return getDefaultResultSet();
@@ -701,7 +694,6 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
             SessionHandle sessionHandle, RunnableWithException executor) {
         return service.submitOperation(
                 sessionHandle,
-                OperationType.UNKNOWN,
                 () -> {
                     executor.run();
                     return getDefaultResultSet();
@@ -735,7 +727,6 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
         OperationHandle operationHandle =
                 service.submitOperation(
                         sessionHandle,
-                        OperationType.UNKNOWN,
                         () -> {
                             operationIsRunning.await();
                             return executor.call();