Posted to commits@flink.apache.org by sh...@apache.org on 2022/07/29 01:59:12 UTC

[flink] 03/04: [FLINK-28152][sql-gateway][hive] Support GetOperationStatus and GetResultSetMetadata for HiveServer2Endpoint

This is an automated email from the ASF dual-hosted git repository.

shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f477a43ff23576cd2e1f8c632f78458948245df4
Author: Shengkai <10...@qq.com>
AuthorDate: Sun Jul 24 15:23:46 2022 +0800

    [FLINK-28152][sql-gateway][hive] Support GetOperationStatus and GetResultSetMetadata for HiveServer2Endpoint
---
 .../table/endpoint/hive/HiveServer2Endpoint.java   |  51 +++++-
 .../hive/util/ThriftObjectConversions.java         | 186 ++++++++++++++++++++-
 .../hive/util/ThriftObjectConversionsTest.java     | 139 +++++++++++++++
 .../flink/table/gateway/api/SqlGatewayService.java |  11 ++
 .../table/gateway/api/results/OperationInfo.java   |  26 ++-
 .../gateway/api/utils/MockedSqlGatewayService.java |   8 +
 .../gateway/service/SqlGatewayServiceImpl.java     |  15 ++
 .../service/operation/OperationManager.java        |  22 ++-
 .../gateway/service/result/ResultFetcher.java      |   4 +
 .../gateway/service/SqlGatewayServiceITCase.java   |   8 +-
 .../service/SqlGatewayServiceStatementITCase.java  |   4 -
 11 files changed, 449 insertions(+), 25 deletions(-)
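
For context, these two RPCs are what a HiveServer2-compatible client calls between
submitting a statement and fetching rows: the driver polls GetOperationStatus until the
operation reaches a terminal state, and then asks GetResultSetMetadata for the column
layout. A minimal smoke test against the endpoint might look like the sketch below (the
JDBC URL, port, and database are illustrative and assume a standard hive-jdbc driver on
the classpath):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.ResultSetMetaData;
    import java.sql.Statement;

    public class HiveServer2EndpointSmoke {
        public static void main(String[] args) throws Exception {
            // Illustrative URL; host/port depend on how the endpoint is configured.
            try (Connection conn =
                            DriverManager.getConnection("jdbc:hive2://localhost:10000/default");
                    Statement stmt = conn.createStatement();
                    // The driver polls GetOperationStatus under the hood until FINISHED.
                    ResultSet rs = stmt.executeQuery("SELECT 1 AS id")) {
                // getMetaData() is served by GetResultSetMetadata.
                ResultSetMetaData meta = rs.getMetaData();
                System.out.println(meta.getColumnName(1) + " : " + meta.getColumnTypeName(1));
            }
        }
    }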

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java
index 262177a547e..2595860de6b 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java
@@ -22,16 +22,21 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.table.api.SqlDialect;
 import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.catalog.hive.HiveCatalog;
 import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
 import org.apache.flink.table.gateway.api.SqlGatewayService;
 import org.apache.flink.table.gateway.api.endpoint.SqlGatewayEndpoint;
+import org.apache.flink.table.gateway.api.operation.OperationHandle;
+import org.apache.flink.table.gateway.api.operation.OperationStatus;
+import org.apache.flink.table.gateway.api.results.OperationInfo;
 import org.apache.flink.table.gateway.api.session.SessionEnvironment;
 import org.apache.flink.table.gateway.api.session.SessionHandle;
 import org.apache.flink.table.gateway.api.utils.SqlGatewayException;
 import org.apache.flink.table.gateway.api.utils.ThreadUtils;
 import org.apache.flink.table.module.Module;
 import org.apache.flink.table.module.hive.HiveModule;
+import org.apache.flink.util.ExceptionUtils;
 
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hive.service.rpc.thrift.TCLIService;
@@ -75,6 +80,7 @@ import org.apache.hive.service.rpc.thrift.TGetTypeInfoReq;
 import org.apache.hive.service.rpc.thrift.TGetTypeInfoResp;
 import org.apache.hive.service.rpc.thrift.TOpenSessionReq;
 import org.apache.hive.service.rpc.thrift.TOpenSessionResp;
+import org.apache.hive.service.rpc.thrift.TOperationHandle;
 import org.apache.hive.service.rpc.thrift.TProtocolVersion;
 import org.apache.hive.service.rpc.thrift.TRenewDelegationTokenReq;
 import org.apache.hive.service.rpc.thrift.TRenewDelegationTokenResp;
@@ -107,9 +113,12 @@ import static org.apache.flink.table.api.config.TableConfigOptions.TABLE_SQL_DIA
 import static org.apache.flink.table.endpoint.hive.HiveServer2EndpointVersion.HIVE_CLI_SERVICE_PROTOCOL_V10;
 import static org.apache.flink.table.endpoint.hive.util.HiveJdbcParameterUtils.getUsedDefaultDatabase;
 import static org.apache.flink.table.endpoint.hive.util.HiveJdbcParameterUtils.validateAndNormalize;
+import static org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions.toOperationHandle;
 import static org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions.toSessionHandle;
+import static org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions.toTOperationState;
 import static org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions.toTSessionHandle;
 import static org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions.toTStatus;
+import static org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions.toTTableSchema;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -218,9 +227,10 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
         this.catalogName = checkNotNull(catalogName);
         this.hiveConfPath = hiveConfPath;
         this.defaultDatabase = defaultDatabase;
-        this.allowEmbedded = allowEmbedded;
 
         this.moduleName = moduleName;
+
+        this.allowEmbedded = allowEmbedded;
     }
 
     @Override
@@ -375,7 +385,27 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
     @Override
     public TGetOperationStatusResp GetOperationStatus(TGetOperationStatusReq tGetOperationStatusReq)
             throws TException {
-        throw new UnsupportedOperationException(ERROR_MESSAGE);
+        TGetOperationStatusResp resp = new TGetOperationStatusResp();
+        try {
+            TOperationHandle operationHandle = tGetOperationStatusReq.getOperationHandle();
+            OperationInfo operationInfo =
+                    service.getOperationInfo(
+                            toSessionHandle(operationHandle), toOperationHandle(operationHandle));
+            resp.setStatus(OK_STATUS);
+            // TODO: support completed time / start time
+            resp.setOperationState(toTOperationState(operationInfo.getStatus()));
+            // Currently, all operations have results.
+            resp.setHasResultSet(true);
+            if (operationInfo.getStatus().equals(OperationStatus.ERROR)
+                    && operationInfo.getException().isPresent()) {
+                resp.setErrorMessage(
+                        ExceptionUtils.stringifyException(operationInfo.getException().get()));
+            }
+        } catch (Throwable t) {
+            LOG.error("Failed to GetOperationStatus.", t);
+            resp.setStatus(toTStatus(t));
+        }
+        return resp;
     }
 
     @Override
@@ -393,7 +423,22 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
     @Override
     public TGetResultSetMetadataResp GetResultSetMetadata(
             TGetResultSetMetadataReq tGetResultSetMetadataReq) throws TException {
-        throw new UnsupportedOperationException(ERROR_MESSAGE);
+        TGetResultSetMetadataResp resp = new TGetResultSetMetadataResp();
+        try {
+            SessionHandle sessionHandle =
+                    toSessionHandle(tGetResultSetMetadataReq.getOperationHandle());
+            OperationHandle operationHandle =
+                    toOperationHandle(tGetResultSetMetadataReq.getOperationHandle());
+            ResolvedSchema schema =
+                    service.getOperationResultSchema(sessionHandle, operationHandle);
+
+            resp.setStatus(OK_STATUS);
+            resp.setSchema(toTTableSchema(schema));
+        } catch (Throwable t) {
+            LOG.warn("Failed to GetResultSetMetadata.", t);
+            resp.setStatus(toTStatus(t));
+        }
+        return resp;
     }
 
     @Override
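
At the Thrift level, the polling loop that Hive clients run maps directly onto the new
GetOperationStatus handler above. A bare-bones sketch (it assumes an already-opened
TCLIService.Client named client and a TOperationHandle from ExecuteStatement; retry and
timeout handling are trimmed):

    TGetOperationStatusReq req = new TGetOperationStatusReq(operationHandle);
    TGetOperationStatusResp resp = client.GetOperationStatus(req);
    while (resp.getOperationState() == TOperationState.INITIALIZED_STATE
            || resp.getOperationState() == TOperationState.PENDING_STATE
            || resp.getOperationState() == TOperationState.RUNNING_STATE) {
        Thread.sleep(100); // back off between polls
        resp = client.GetOperationStatus(req);
    }
    if (resp.getOperationState() == TOperationState.ERROR_STATE) {
        // Populated from OperationInfo#getException by the handler above.
        System.err.println(resp.getErrorMessage());
    }
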
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversions.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversions.java
index 49ece510191..948771c1401 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversions.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversions.java
@@ -18,16 +18,42 @@
 
 package org.apache.flink.table.endpoint.hive.util;
 
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.hive.util.HiveTypeUtil;
+import org.apache.flink.table.gateway.api.SqlGatewayService;
+import org.apache.flink.table.gateway.api.operation.OperationHandle;
+import org.apache.flink.table.gateway.api.operation.OperationStatus;
+import org.apache.flink.table.gateway.api.operation.OperationType;
 import org.apache.flink.table.gateway.api.session.SessionHandle;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.VarCharType;
 
+import org.apache.hadoop.hive.serde2.thrift.Type;
+import org.apache.hive.service.rpc.thrift.TCLIServiceConstants;
+import org.apache.hive.service.rpc.thrift.TColumnDesc;
 import org.apache.hive.service.rpc.thrift.THandleIdentifier;
+import org.apache.hive.service.rpc.thrift.TOperationHandle;
+import org.apache.hive.service.rpc.thrift.TOperationState;
+import org.apache.hive.service.rpc.thrift.TOperationType;
+import org.apache.hive.service.rpc.thrift.TPrimitiveTypeEntry;
 import org.apache.hive.service.rpc.thrift.TSessionHandle;
 import org.apache.hive.service.rpc.thrift.TStatus;
 import org.apache.hive.service.rpc.thrift.TStatusCode;
+import org.apache.hive.service.rpc.thrift.TTableSchema;
+import org.apache.hive.service.rpc.thrift.TTypeDesc;
+import org.apache.hive.service.rpc.thrift.TTypeEntry;
+import org.apache.hive.service.rpc.thrift.TTypeQualifierValue;
+import org.apache.hive.service.rpc.thrift.TTypeQualifiers;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
 
 /** Conversion between thrift object and flink object. */
@@ -40,7 +66,7 @@ public class ThriftObjectConversions {
     // --------------------------------------------------------------------------------------------
 
     public static TSessionHandle toTSessionHandle(SessionHandle sessionHandle) {
-        return new TSessionHandle(toTHandleIdentifier(sessionHandle.getIdentifier()));
+        return new TSessionHandle(toTHandleIdentifier(sessionHandle.getIdentifier(), SECRET_ID));
     }
 
     public static SessionHandle toSessionHandle(TSessionHandle tSessionHandle) {
@@ -48,6 +74,115 @@ public class ThriftObjectConversions {
         return new SessionHandle(new UUID(bb.getLong(), bb.getLong()));
     }
 
+    // --------------------------------------------------------------------------------------------
+    // Flink SessionHandle and OperationHandle from/to Hive TOperationHandle
+    // --------------------------------------------------------------------------------------------
+
+    /**
+     * Convert {@link SessionHandle} and {@link OperationHandle} to {@link TOperationHandle}.
+     *
+     * <p>Hive uses a single {@link TOperationHandle} to retrieve {@code Operation}-related
+     * information, whereas the SqlGateway locates an {@code Operation} by the pair of
+     * {@link SessionHandle} and {@link OperationHandle}. Therefore, the {@link TOperationHandle}
+     * needs to encode both the {@link SessionHandle} and the {@link OperationHandle}.
+     *
+     * <p>Currently, all operations in the {@link SqlGatewayService} have data. Therefore,
+     * {@code TOperationHandle#hasResultSet} is always set to true.
+     */
+    public static TOperationHandle toTOperationHandle(
+            SessionHandle sessionHandle,
+            OperationHandle operationHandle,
+            OperationType operationType) {
+        return new TOperationHandle(
+                toTHandleIdentifier(operationHandle.getIdentifier(), sessionHandle.getIdentifier()),
+                toTOperationType(operationType),
+                true);
+    }
+
+    public static SessionHandle toSessionHandle(TOperationHandle tOperationHandle) {
+        ByteBuffer bb = ByteBuffer.wrap(tOperationHandle.getOperationId().getSecret());
+        return new SessionHandle(new UUID(bb.getLong(), bb.getLong()));
+    }
+
+    public static OperationHandle toOperationHandle(TOperationHandle tOperationHandle) {
+        ByteBuffer bb = ByteBuffer.wrap(tOperationHandle.getOperationId().getGuid());
+        return new OperationHandle(new UUID(bb.getLong(), bb.getLong()));
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Operation related conversions
+    // --------------------------------------------------------------------------------------------
+
+    public static TOperationType toTOperationType(OperationType type) {
+        switch (type) {
+            case EXECUTE_STATEMENT:
+                return TOperationType.EXECUTE_STATEMENT;
+            case UNKNOWN:
+                return TOperationType.UNKNOWN;
+            default:
+                throw new IllegalArgumentException(
+                        String.format("Unknown operation type: %s.", type));
+        }
+    }
+
+    public static TOperationState toTOperationState(OperationStatus operationStatus) {
+        switch (operationStatus) {
+            case INITIALIZED:
+                return TOperationState.INITIALIZED_STATE;
+            case PENDING:
+                return TOperationState.PENDING_STATE;
+            case RUNNING:
+                return TOperationState.RUNNING_STATE;
+            case FINISHED:
+                return TOperationState.FINISHED_STATE;
+            case ERROR:
+                return TOperationState.ERROR_STATE;
+            case TIMEOUT:
+                return TOperationState.TIMEDOUT_STATE;
+            case CANCELED:
+                return TOperationState.CANCELED_STATE;
+            case CLOSED:
+                return TOperationState.CLOSED_STATE;
+            default:
+                throw new IllegalArgumentException(
+                        String.format("Unknown operation status: %s.", operationStatus));
+        }
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Statement related conversions
+    // --------------------------------------------------------------------------------------------
+
+    /** Similar to the logic in {@code org.apache.hive.service.cli.ColumnDescriptor}. */
+    public static TTableSchema toTTableSchema(ResolvedSchema schema) {
+        TTableSchema tSchema = new TTableSchema();
+
+        for (int i = 0; i < schema.getColumnCount(); i++) {
+            Column column = schema.getColumns().get(i);
+            TColumnDesc desc = new TColumnDesc();
+            desc.setColumnName(column.getName());
+            column.getComment().ifPresent(desc::setComment);
+            desc.setPosition(i);
+
+            TTypeDesc typeDesc = new TTypeDesc();
+
+            // Hive uses only the TPrimitiveTypeEntry. See TypeDescriptor#toTTypeDesc.
+            DataType columnType = column.getDataType();
+            TPrimitiveTypeEntry typeEntry =
+                    new TPrimitiveTypeEntry(
+                            Type.getType(HiveTypeUtil.toHiveTypeInfo(columnType, false)).toTType());
+
+            if (hasTypeQualifiers(columnType.getLogicalType())) {
+                typeEntry.setTypeQualifiers(toTTypeQualifiers(columnType.getLogicalType()));
+            }
+            typeDesc.addToTypes(TTypeEntry.primitiveEntry(typeEntry));
+
+            desc.setTypeDesc(typeDesc);
+            tSchema.addToColumns(desc);
+        }
+        return tSchema;
+    }
+
     public static TStatus toTStatus(Throwable t) {
         TStatus tStatus = new TStatus(TStatusCode.ERROR_STATUS);
         tStatus.setErrorMessage(t.getMessage());
@@ -57,7 +192,7 @@ public class ThriftObjectConversions {
 
     // --------------------------------------------------------------------------------------------
 
-    private static THandleIdentifier toTHandleIdentifier(UUID publicId) {
+    private static THandleIdentifier toTHandleIdentifier(UUID publicId, UUID secretId) {
         byte[] guid = new byte[16];
         byte[] secret = new byte[16];
         ByteBuffer guidBB = ByteBuffer.wrap(guid);
@@ -65,11 +200,54 @@ public class ThriftObjectConversions {
 
         guidBB.putLong(publicId.getMostSignificantBits());
         guidBB.putLong(publicId.getLeastSignificantBits());
-        secretBB.putLong(SECRET_ID.getMostSignificantBits());
-        secretBB.putLong(SECRET_ID.getLeastSignificantBits());
+        secretBB.putLong(secretId.getMostSignificantBits());
+        secretBB.putLong(secretId.getLeastSignificantBits());
         return new THandleIdentifier(ByteBuffer.wrap(guid), ByteBuffer.wrap(secret));
     }
 
+    /** Only types that have a length, precision, or scale have {@link TTypeQualifiers}. */
+    private static boolean hasTypeQualifiers(LogicalType type) {
+        switch (type.getTypeRoot()) {
+            case DECIMAL:
+            case CHAR:
+            case VARCHAR:
+                return true;
+            default:
+                return false;
+        }
+    }
+
+    /**
+     * Create {@link TTypeQualifiers} from a {@link LogicalType}. The logic is almost the same as
+     * in {@code org.apache.hive.service.cli#toTTypeQualifiers}.
+     */
+    private static TTypeQualifiers toTTypeQualifiers(LogicalType type) {
+        Map<String, TTypeQualifierValue> qualifiers = new HashMap<>();
+
+        switch (type.getTypeRoot()) {
+            case DECIMAL:
+                qualifiers.put(
+                        TCLIServiceConstants.PRECISION,
+                        TTypeQualifierValue.i32Value(((DecimalType) type).getPrecision()));
+                qualifiers.put(
+                        TCLIServiceConstants.SCALE,
+                        TTypeQualifierValue.i32Value(((DecimalType) type).getScale()));
+                break;
+            case VARCHAR:
+                qualifiers.put(
+                        TCLIServiceConstants.CHARACTER_MAXIMUM_LENGTH,
+                        TTypeQualifierValue.i32Value(((VarCharType) type).getLength()));
+                break;
+            case CHAR:
+                qualifiers.put(
+                        TCLIServiceConstants.CHARACTER_MAXIMUM_LENGTH,
+                        TTypeQualifierValue.i32Value(((CharType) type).getLength()));
+                break;
+        }
+
+        return new TTypeQualifiers(qualifiers);
+    }
+
     /**
      * Converts a {@link Throwable} object into a flattened list of texts including its stack trace
      * and the stack traces of the nested causes.
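
One subtlety in toTTableSchema above: parameterized types keep their parameters through
TTypeQualifiers, otherwise a DECIMAL(9, 6) would arrive at the client as a bare DECIMAL.
A small sketch of what the conversion produces (the accessor chain follows the
Thrift-generated getters and should be treated as illustrative):

    ResolvedSchema schema =
            ResolvedSchema.of(Column.physical("price", DataTypes.DECIMAL(9, 6)));
    TTableSchema tSchema = toTTableSchema(schema);

    // Drill down to the qualifiers of the single primitive type entry.
    TTypeQualifiers qualifiers =
            tSchema.getColumns().get(0)
                    .getTypeDesc().getTypes().get(0)
                    .getPrimitiveEntry().getTypeQualifiers();
    int precision =
            qualifiers.getQualifiers().get(TCLIServiceConstants.PRECISION).getI32Value(); // 9
    int scale =
            qualifiers.getQualifiers().get(TCLIServiceConstants.SCALE).getI32Value(); // 6
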
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversionsTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversionsTest.java
index 511170aa975..aecc8bc2cfa 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversionsTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversionsTest.java
@@ -18,12 +18,55 @@
 
 package org.apache.flink.table.endpoint.hive.util;
 
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.gateway.api.operation.OperationHandle;
+import org.apache.flink.table.gateway.api.operation.OperationStatus;
+import org.apache.flink.table.gateway.api.operation.OperationType;
 import org.apache.flink.table.gateway.api.session.SessionHandle;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.utils.DataTypeUtils;
 
+import org.apache.hive.service.cli.TableSchema;
+import org.apache.hive.service.rpc.thrift.TOperationHandle;
+import org.apache.hive.service.rpc.thrift.TOperationState;
 import org.junit.jupiter.api.Test;
 
+import java.sql.Types;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.api.DataTypes.BIGINT;
+import static org.apache.flink.table.api.DataTypes.BOOLEAN;
+import static org.apache.flink.table.api.DataTypes.BYTES;
+import static org.apache.flink.table.api.DataTypes.DATE;
+import static org.apache.flink.table.api.DataTypes.DECIMAL;
+import static org.apache.flink.table.api.DataTypes.DOUBLE;
+import static org.apache.flink.table.api.DataTypes.FLOAT;
+import static org.apache.flink.table.api.DataTypes.INT;
+import static org.apache.flink.table.api.DataTypes.SMALLINT;
+import static org.apache.flink.table.api.DataTypes.STRING;
+import static org.apache.flink.table.api.DataTypes.TIMESTAMP;
+import static org.apache.flink.table.api.DataTypes.TINYINT;
+import static org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions.toOperationHandle;
 import static org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions.toSessionHandle;
+import static org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions.toTOperationHandle;
+import static org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions.toTOperationState;
 import static org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions.toTSessionHandle;
+import static org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions.toTTableSchema;
+import static org.apache.flink.table.gateway.api.operation.OperationStatus.CANCELED;
+import static org.apache.flink.table.gateway.api.operation.OperationStatus.CLOSED;
+import static org.apache.flink.table.gateway.api.operation.OperationStatus.ERROR;
+import static org.apache.flink.table.gateway.api.operation.OperationStatus.FINISHED;
+import static org.apache.flink.table.gateway.api.operation.OperationStatus.INITIALIZED;
+import static org.apache.flink.table.gateway.api.operation.OperationStatus.PENDING;
+import static org.apache.flink.table.gateway.api.operation.OperationStatus.RUNNING;
+import static org.apache.flink.table.gateway.api.operation.OperationStatus.TIMEOUT;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 /** Test for {@link ThriftObjectConversions}. */
@@ -34,4 +77,100 @@ class ThriftObjectConversionsTest {
         SessionHandle originSessionHandle = SessionHandle.create();
         assertEquals(toSessionHandle(toTSessionHandle(originSessionHandle)), originSessionHandle);
     }
+
+    @Test
+    public void testConvertSessionHandleAndOperationHandle() {
+        SessionHandle originSessionHandle = SessionHandle.create();
+        OperationHandle originOperationHandle = OperationHandle.create();
+        TOperationHandle tOperationHandle =
+                toTOperationHandle(
+                        originSessionHandle, originOperationHandle, OperationType.UNKNOWN);
+
+        assertEquals(toSessionHandle(tOperationHandle), originSessionHandle);
+        assertEquals(toOperationHandle(tOperationHandle), originOperationHandle);
+    }
+
+    @Test
+    public void testConvertOperationStatus() {
+        Map<OperationStatus, TOperationState> expectedMappings = new HashMap<>();
+        expectedMappings.put(INITIALIZED, TOperationState.INITIALIZED_STATE);
+        expectedMappings.put(PENDING, TOperationState.PENDING_STATE);
+        expectedMappings.put(RUNNING, TOperationState.RUNNING_STATE);
+        expectedMappings.put(FINISHED, TOperationState.FINISHED_STATE);
+        expectedMappings.put(CANCELED, TOperationState.CANCELED_STATE);
+        expectedMappings.put(CLOSED, TOperationState.CLOSED_STATE);
+        expectedMappings.put(ERROR, TOperationState.ERROR_STATE);
+        expectedMappings.put(TIMEOUT, TOperationState.TIMEDOUT_STATE);
+
+        for (OperationStatus status : expectedMappings.keySet()) {
+            assertEquals(expectedMappings.get(status), toTOperationState(status));
+        }
+    }
+
+    @Test
+    public void testToTTableSchema() {
+        for (DataTypeSpec spec : getDataTypeSpecs()) {
+            TableSchema actual =
+                    new TableSchema(
+                            toTTableSchema(
+                                    DataTypeUtils.expandCompositeTypeToSchema(spec.flinkType)));
+            List<Integer> javaSqlTypes =
+                    Arrays.stream(actual.toTypeDescriptors())
+                            .map(desc -> desc.getType().toJavaSQLType())
+                            .collect(Collectors.toList());
+
+            assertEquals(Collections.singletonList(spec.sqlType), javaSqlTypes);
+        }
+    }
+
+    // --------------------------------------------------------------------------------------------
+
+    private List<DataTypeSpec> getDataTypeSpecs() {
+        return Arrays.asList(
+                DataTypeSpec.newSpec().withType(BOOLEAN()).expectSqlType(Types.BOOLEAN),
+                DataTypeSpec.newSpec()
+                        .withType(TINYINT())
+                        // TINYINT is an alias of BYTE in Hive.
+                        .expectSqlType(Types.BINARY),
+                DataTypeSpec.newSpec().withType(SMALLINT()).expectSqlType(Types.SMALLINT),
+                DataTypeSpec.newSpec().withType(INT()).expectSqlType(Types.INTEGER),
+                DataTypeSpec.newSpec().withType(BIGINT()).expectSqlType(Types.BIGINT),
+                DataTypeSpec.newSpec().withType(FLOAT()).expectSqlType(Types.FLOAT),
+                DataTypeSpec.newSpec().withType(DOUBLE()).expectSqlType(Types.DOUBLE),
+                DataTypeSpec.newSpec().withType(DECIMAL(9, 6)).expectSqlType(Types.DECIMAL),
+                DataTypeSpec.newSpec().withType(STRING()).expectSqlType(Types.VARCHAR),
+                DataTypeSpec.newSpec().withType(BYTES()).expectSqlType(Types.BINARY),
+                DataTypeSpec.newSpec().withType(DATE()).expectSqlType(Types.DATE),
+                DataTypeSpec.newSpec().withType(TIMESTAMP(4)).expectSqlType(Types.TIMESTAMP),
+                DataTypeSpec.newSpec()
+                        .withType(DataTypes.MAP(DataTypes.INT(), DataTypes.STRING()))
+                        .expectSqlType(Types.JAVA_OBJECT),
+                DataTypeSpec.newSpec()
+                        .withType(
+                                DataTypes.ARRAY(DataTypes.MAP(DataTypes.INT(), DataTypes.STRING())))
+                        // Hive uses the STRING type for ARRAY.
+                        .expectSqlType(Types.VARCHAR));
+    }
+
+    private static class DataTypeSpec {
+        DataType flinkType;
+        Integer sqlType;
+        RowData flinkValue;
+
+        public static DataTypeSpec newSpec() {
+            DataTypeSpec spec = new DataTypeSpec();
+            spec.flinkValue = new GenericRowData(1);
+            return spec;
+        }
+
+        public DataTypeSpec withType(DataType flinkType) {
+            this.flinkType = DataTypes.ROW(flinkType);
+            return this;
+        }
+
+        public DataTypeSpec expectSqlType(int sqlType) {
+            this.sqlType = sqlType;
+            return this;
+        }
+    }
 }
diff --git a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/SqlGatewayService.java b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/SqlGatewayService.java
index 7e92610fb6e..70e326705c6 100644
--- a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/SqlGatewayService.java
+++ b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/SqlGatewayService.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.gateway.api;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.gateway.api.operation.OperationHandle;
 import org.apache.flink.table.gateway.api.operation.OperationType;
 import org.apache.flink.table.gateway.api.results.OperationInfo;
@@ -108,6 +109,16 @@ public interface SqlGatewayService {
     OperationInfo getOperationInfo(SessionHandle sessionHandle, OperationHandle operationHandle)
             throws SqlGatewayException;
 
+    /**
+     * Get the result schema for the specified Operation.
+     *
+     * @param sessionHandle handle to identify the session.
+     * @param operationHandle handle to identify the operation.
+     */
+    ResolvedSchema getOperationResultSchema(
+            SessionHandle sessionHandle, OperationHandle operationHandle)
+            throws SqlGatewayException;
+
     // -------------------------------------------------------------------------------------------
     // Statements
     // -------------------------------------------------------------------------------------------
diff --git a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/results/OperationInfo.java b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/results/OperationInfo.java
index 7ebb28f5169..3abe6bdb7ce 100644
--- a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/results/OperationInfo.java
+++ b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/results/OperationInfo.java
@@ -22,24 +22,28 @@ import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.table.gateway.api.operation.OperationStatus;
 import org.apache.flink.table.gateway.api.operation.OperationType;
 
+import javax.annotation.Nullable;
+
 import java.util.Objects;
+import java.util.Optional;
 
 /** Information of the {@code Operation}. */
 @PublicEvolving
 public class OperationInfo {
 
     private final OperationStatus status;
-    private final boolean hasResults;
     private final OperationType type;
+    @Nullable private final Exception exception;
 
-    public OperationInfo(OperationStatus status, OperationType type, boolean hasResults) {
-        this.status = status;
-        this.type = type;
-        this.hasResults = hasResults;
+    public OperationInfo(OperationStatus status, OperationType type) {
+        this(status, type, null);
     }
 
-    public boolean isHasResults() {
-        return hasResults;
+    public OperationInfo(
+            OperationStatus status, OperationType type, @Nullable Exception exception) {
+        this.status = status;
+        this.type = type;
+        this.exception = exception;
     }
 
     public OperationType getType() {
@@ -50,6 +54,10 @@ public class OperationInfo {
         return status;
     }
 
+    public Optional<Exception> getException() {
+        return Optional.ofNullable(exception);
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
@@ -59,11 +67,11 @@ public class OperationInfo {
             return false;
         }
         OperationInfo that = (OperationInfo) o;
-        return hasResults == that.hasResults && status == that.status && type == that.type;
+        return status == that.status
+                && type == that.type
+                && Objects.equals(exception, that.exception);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(status, hasResults, type);
+        return Objects.hash(status, type, exception);
     }
 }
diff --git a/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/utils/MockedSqlGatewayService.java b/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/utils/MockedSqlGatewayService.java
index f962e9c727e..49ba033a314 100644
--- a/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/utils/MockedSqlGatewayService.java
+++ b/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/utils/MockedSqlGatewayService.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.gateway.api.utils;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.gateway.api.SqlGatewayService;
 import org.apache.flink.table.gateway.api.operation.OperationHandle;
 import org.apache.flink.table.gateway.api.operation.OperationType;
@@ -89,4 +90,11 @@ public class MockedSqlGatewayService implements SqlGatewayService {
             throws SqlGatewayException {
         throw new UnsupportedOperationException();
     }
+
+    @Override
+    public ResolvedSchema getOperationResultSchema(
+            SessionHandle sessionHandle, OperationHandle operationHandle)
+            throws SqlGatewayException {
+        throw new UnsupportedOperationException();
+    }
 }
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java
index fb68828aedf..7ec185dd8b0 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.gateway.service;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.gateway.api.SqlGatewayService;
 import org.apache.flink.table.gateway.api.operation.OperationHandle;
 import org.apache.flink.table.gateway.api.operation.OperationType;
@@ -124,6 +125,20 @@ public class SqlGatewayServiceImpl implements SqlGatewayService {
         }
     }
 
+    @Override
+    public ResolvedSchema getOperationResultSchema(
+            SessionHandle sessionHandle, OperationHandle operationHandle)
+            throws SqlGatewayException {
+        try {
+            return getSession(sessionHandle)
+                    .getOperationManager()
+                    .getOperationResultSchema(operationHandle);
+        } catch (Throwable t) {
+            LOG.error("Failed to getOperationResultSchema.", t);
+            throw new SqlGatewayException("Failed to getOperationResultSchema.", t);
+        }
+    }
+
     @Override
     public OperationHandle executeStatement(
             SessionHandle sessionHandle,
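
From the service side, the new method completes the same status/schema loop. A hedged
usage sketch (the executeStatement arguments and the polling cadence are illustrative;
the ITCases below use CommonTestUtils.waitUtil for the waiting, and terminal failure
states are ignored here for brevity):

    OperationHandle operationHandle =
            service.executeStatement(sessionHandle, "SELECT 1", -1, new Configuration());

    // The schema is only exposed once the operation is FINISHED; asking earlier makes
    // OperationManager#getResultSchema throw an IllegalStateException.
    while (service.getOperationInfo(sessionHandle, operationHandle).getStatus()
            != OperationStatus.FINISHED) {
        Thread.sleep(100);
    }
    ResolvedSchema schema =
            service.getOperationResultSchema(sessionHandle, operationHandle);
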
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java
index 80ec381eb96..602a3fc3552 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.gateway.service.operation;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.gateway.api.operation.OperationHandle;
 import org.apache.flink.table.gateway.api.operation.OperationStatus;
 import org.apache.flink.table.gateway.api.operation.OperationType;
@@ -150,6 +151,15 @@ public class OperationManager {
         return getOperation(operationHandle).getOperationInfo();
     }
 
+    /**
+     * Get the {@link ResolvedSchema} of the operation.
+     *
+     * @param operationHandle identifies the {@link Operation}.
+     */
+    public ResolvedSchema getOperationResultSchema(OperationHandle operationHandle) {
+        return getOperation(operationHandle).getResultSchema();
+    }
+
     /**
      * Get the results of the operation.
      *
@@ -309,7 +319,17 @@ public class OperationManager {
         }
 
         public OperationInfo getOperationInfo() {
-            return new OperationInfo(status.get(), operationType, hasResults);
+            return new OperationInfo(status.get(), operationType);
+        }
+
+        public ResolvedSchema getResultSchema() {
+            OperationStatus current = status.get();
+            if (current != OperationStatus.FINISHED || !hasResults) {
+                throw new IllegalStateException(
+                        "The result schema is available when the Operation is in FINISHED state and the Operation has data.");
+            }
+
+            return resultFetcher.getResultSchema();
         }
 
         private void updateState(OperationStatus toStatus) {
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/result/ResultFetcher.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/result/ResultFetcher.java
index 6812947ead9..98da1c9ac2a 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/result/ResultFetcher.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/result/ResultFetcher.java
@@ -89,6 +89,10 @@ public class ResultFetcher {
         resultStore.close();
     }
 
+    public ResolvedSchema getResultSchema() {
+        return resultSchema;
+    }
+
     /**
      * Fetch results from the result store. It tries to return the data cached in the buffer first.
      * If the buffer is empty, then fetch results from the {@link ResultStore}. It's possible
diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
index 0d0069f388e..33b6d5e86d1 100644
--- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
+++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
@@ -193,12 +193,12 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
 
         startRunningLatch.await();
         assertEquals(
-                new OperationInfo(OperationStatus.RUNNING, OperationType.UNKNOWN, true),
+                new OperationInfo(OperationStatus.RUNNING, OperationType.UNKNOWN),
                 service.getOperationInfo(sessionHandle, operationHandle));
 
         endRunningLatch.countDown();
         OperationInfo expectedInfo =
-                new OperationInfo(OperationStatus.FINISHED, OperationType.UNKNOWN, true);
+                new OperationInfo(OperationStatus.FINISHED, OperationType.UNKNOWN);
 
         CommonTestUtils.waitUtil(
                 () -> service.getOperationInfo(sessionHandle, operationHandle).equals(expectedInfo),
@@ -237,13 +237,13 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
 
         startRunningLatch.await();
         assertEquals(
-                new OperationInfo(OperationStatus.RUNNING, OperationType.UNKNOWN, true),
+                new OperationInfo(OperationStatus.RUNNING, OperationType.UNKNOWN),
                 service.getOperationInfo(sessionHandle, operationHandle));
 
         service.cancelOperation(sessionHandle, operationHandle);
 
         assertEquals(
-                new OperationInfo(OperationStatus.CANCELED, OperationType.UNKNOWN, true),
+                new OperationInfo(OperationStatus.CANCELED, OperationType.UNKNOWN),
                 service.getOperationInfo(sessionHandle, operationHandle));
         service.closeOperation(sessionHandle, operationHandle);
         assertEquals(0, sessionManager.getOperationCount(sessionHandle));
diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceStatementITCase.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceStatementITCase.java
index 40a3ec44f34..6348ca0d575 100644
--- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceStatementITCase.java
+++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceStatementITCase.java
@@ -231,10 +231,6 @@ public class SqlGatewayServiceStatementITCase {
                 Duration.ofSeconds(100),
                 "Failed to wait operation finish.");
 
-        if (!service.getOperationInfo(sessionHandle, operationHandle).isHasResults()) {
-            return Tag.INFO.addTag("");
-        }
-
         // The content in the result of the `explain` and `show create` statement is large, so it's
         // more straightforward to just print the content without the table.
         if (statement.toUpperCase().startsWith("EXPLAIN")