You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sh...@apache.org on 2022/07/29 01:59:12 UTC
[flink] 03/04: [FLINK-28152][sql-gateway][hive] Support GetOperationStatus and GetResultSetMetadata for HiveServer2Endpoint
This is an automated email from the ASF dual-hosted git repository.
shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit f477a43ff23576cd2e1f8c632f78458948245df4
Author: Shengkai <10...@qq.com>
AuthorDate: Sun Jul 24 15:23:46 2022 +0800
[FLINK-28152][sql-gateway][hive] Support GetOperationStatus and GetResultSetMetadata for HiveServer2Endpoint
---
.../table/endpoint/hive/HiveServer2Endpoint.java | 51 +++++-
.../hive/util/ThriftObjectConversions.java | 186 ++++++++++++++++++++-
.../hive/util/ThriftObjectConversionsTest.java | 139 +++++++++++++++
.../flink/table/gateway/api/SqlGatewayService.java | 11 ++
.../table/gateway/api/results/OperationInfo.java | 26 ++-
.../gateway/api/utils/MockedSqlGatewayService.java | 8 +
.../gateway/service/SqlGatewayServiceImpl.java | 15 ++
.../service/operation/OperationManager.java | 22 ++-
.../gateway/service/result/ResultFetcher.java | 4 +
.../gateway/service/SqlGatewayServiceITCase.java | 8 +-
.../service/SqlGatewayServiceStatementITCase.java | 4 -
11 files changed, 449 insertions(+), 25 deletions(-)
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java
index 262177a547e..2595860de6b 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java
@@ -22,16 +22,21 @@ import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
import org.apache.flink.table.gateway.api.SqlGatewayService;
import org.apache.flink.table.gateway.api.endpoint.SqlGatewayEndpoint;
+import org.apache.flink.table.gateway.api.operation.OperationHandle;
+import org.apache.flink.table.gateway.api.operation.OperationStatus;
+import org.apache.flink.table.gateway.api.results.OperationInfo;
import org.apache.flink.table.gateway.api.session.SessionEnvironment;
import org.apache.flink.table.gateway.api.session.SessionHandle;
import org.apache.flink.table.gateway.api.utils.SqlGatewayException;
import org.apache.flink.table.gateway.api.utils.ThreadUtils;
import org.apache.flink.table.module.Module;
import org.apache.flink.table.module.hive.HiveModule;
+import org.apache.flink.util.ExceptionUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hive.service.rpc.thrift.TCLIService;
@@ -75,6 +80,7 @@ import org.apache.hive.service.rpc.thrift.TGetTypeInfoReq;
import org.apache.hive.service.rpc.thrift.TGetTypeInfoResp;
import org.apache.hive.service.rpc.thrift.TOpenSessionReq;
import org.apache.hive.service.rpc.thrift.TOpenSessionResp;
+import org.apache.hive.service.rpc.thrift.TOperationHandle;
import org.apache.hive.service.rpc.thrift.TProtocolVersion;
import org.apache.hive.service.rpc.thrift.TRenewDelegationTokenReq;
import org.apache.hive.service.rpc.thrift.TRenewDelegationTokenResp;
@@ -107,9 +113,12 @@ import static org.apache.flink.table.api.config.TableConfigOptions.TABLE_SQL_DIA
import static org.apache.flink.table.endpoint.hive.HiveServer2EndpointVersion.HIVE_CLI_SERVICE_PROTOCOL_V10;
import static org.apache.flink.table.endpoint.hive.util.HiveJdbcParameterUtils.getUsedDefaultDatabase;
import static org.apache.flink.table.endpoint.hive.util.HiveJdbcParameterUtils.validateAndNormalize;
+import static org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions.toOperationHandle;
import static org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions.toSessionHandle;
+import static org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions.toTOperationState;
import static org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions.toTSessionHandle;
import static org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions.toTStatus;
+import static org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions.toTTableSchema;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
@@ -218,9 +227,10 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
this.catalogName = checkNotNull(catalogName);
this.hiveConfPath = hiveConfPath;
this.defaultDatabase = defaultDatabase;
- this.allowEmbedded = allowEmbedded;
this.moduleName = moduleName;
+
+ this.allowEmbedded = allowEmbedded;
}
@Override
@@ -375,7 +385,27 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
@Override
public TGetOperationStatusResp GetOperationStatus(TGetOperationStatusReq tGetOperationStatusReq)
throws TException {
- throw new UnsupportedOperationException(ERROR_MESSAGE);
+ TGetOperationStatusResp resp = new TGetOperationStatusResp();
+ try {
+ TOperationHandle operationHandle = tGetOperationStatusReq.getOperationHandle();
+ OperationInfo operationInfo =
+ service.getOperationInfo(
+ toSessionHandle(operationHandle), toOperationHandle(operationHandle));
+ resp.setStatus(OK_STATUS);
+ // TODO: support completed time / start time
+ resp.setOperationState(toTOperationState(operationInfo.getStatus()));
+ // Currently, all operations have results.
+ resp.setHasResultSet(true);
+ if (operationInfo.getStatus().equals(OperationStatus.ERROR)
+ && operationInfo.getException().isPresent()) {
+ resp.setErrorMessage(
+ ExceptionUtils.stringifyException(operationInfo.getException().get()));
+ }
+ } catch (Throwable t) {
+ LOG.error("Failed to GetOperationStatus.", t);
+ resp.setStatus(toTStatus(t));
+ }
+ return resp;
}
@Override
@@ -393,7 +423,22 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
@Override
public TGetResultSetMetadataResp GetResultSetMetadata(
TGetResultSetMetadataReq tGetResultSetMetadataReq) throws TException {
- throw new UnsupportedOperationException(ERROR_MESSAGE);
+ TGetResultSetMetadataResp resp = new TGetResultSetMetadataResp();
+ try {
+ SessionHandle sessionHandle =
+ toSessionHandle(tGetResultSetMetadataReq.getOperationHandle());
+ OperationHandle operationHandle =
+ toOperationHandle(tGetResultSetMetadataReq.getOperationHandle());
+ ResolvedSchema schema =
+ service.getOperationResultSchema(sessionHandle, operationHandle);
+
+ resp.setStatus(OK_STATUS);
+ resp.setSchema(toTTableSchema(schema));
+ } catch (Throwable t) {
+ LOG.warn("Failed to GetResultSetMetadata.", t);
+ resp.setStatus(toTStatus(t));
+ }
+ return resp;
}
@Override
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversions.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversions.java
index 49ece510191..948771c1401 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversions.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversions.java
@@ -18,16 +18,42 @@
package org.apache.flink.table.endpoint.hive.util;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.hive.util.HiveTypeUtil;
+import org.apache.flink.table.gateway.api.SqlGatewayService;
+import org.apache.flink.table.gateway.api.operation.OperationHandle;
+import org.apache.flink.table.gateway.api.operation.OperationStatus;
+import org.apache.flink.table.gateway.api.operation.OperationType;
import org.apache.flink.table.gateway.api.session.SessionHandle;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.hadoop.hive.serde2.thrift.Type;
+import org.apache.hive.service.rpc.thrift.TCLIServiceConstants;
+import org.apache.hive.service.rpc.thrift.TColumnDesc;
import org.apache.hive.service.rpc.thrift.THandleIdentifier;
+import org.apache.hive.service.rpc.thrift.TOperationHandle;
+import org.apache.hive.service.rpc.thrift.TOperationState;
+import org.apache.hive.service.rpc.thrift.TOperationType;
+import org.apache.hive.service.rpc.thrift.TPrimitiveTypeEntry;
import org.apache.hive.service.rpc.thrift.TSessionHandle;
import org.apache.hive.service.rpc.thrift.TStatus;
import org.apache.hive.service.rpc.thrift.TStatusCode;
+import org.apache.hive.service.rpc.thrift.TTableSchema;
+import org.apache.hive.service.rpc.thrift.TTypeDesc;
+import org.apache.hive.service.rpc.thrift.TTypeEntry;
+import org.apache.hive.service.rpc.thrift.TTypeQualifierValue;
+import org.apache.hive.service.rpc.thrift.TTypeQualifiers;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.UUID;
/** Conversion between thrift object and flink object. */
@@ -40,7 +66,7 @@ public class ThriftObjectConversions {
// --------------------------------------------------------------------------------------------
public static TSessionHandle toTSessionHandle(SessionHandle sessionHandle) {
- return new TSessionHandle(toTHandleIdentifier(sessionHandle.getIdentifier()));
+ return new TSessionHandle(toTHandleIdentifier(sessionHandle.getIdentifier(), SECRET_ID));
}
public static SessionHandle toSessionHandle(TSessionHandle tSessionHandle) {
@@ -48,6 +74,115 @@ public class ThriftObjectConversions {
return new SessionHandle(new UUID(bb.getLong(), bb.getLong()));
}
+ // --------------------------------------------------------------------------------------------
+ // Flink SessionHandle && OperationHandle from/to Hive OperationHandle
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Convert {@link SessionHandle} and {@link OperationHandle} to {@link TOperationHandle}.
+ *
+ * <p>Hive uses {@link TOperationHandle} to retrieve the {@code Operation} related information.
+ * However, SqlGateway uses {@link SessionHandle} and {@link OperationHandle} to determine.
+ * Therefore, the {@link TOperationHandle} needs to contain both {@link SessionHandle} and
+ * {@link OperationHandle}.
+ *
+ * <p>Currently all operations in the {@link SqlGatewayService} has data. Therefore, set the
+ * {@code TOperationHandle#hasResultSet} true.
+ */
+ public static TOperationHandle toTOperationHandle(
+ SessionHandle sessionHandle,
+ OperationHandle operationHandle,
+ OperationType operationType) {
+ return new TOperationHandle(
+ toTHandleIdentifier(operationHandle.getIdentifier(), sessionHandle.getIdentifier()),
+ toTOperationType(operationType),
+ true);
+ }
+
+ public static SessionHandle toSessionHandle(TOperationHandle tOperationHandle) {
+ ByteBuffer bb = ByteBuffer.wrap(tOperationHandle.getOperationId().getSecret());
+ return new SessionHandle(new UUID(bb.getLong(), bb.getLong()));
+ }
+
+ public static OperationHandle toOperationHandle(TOperationHandle tOperationHandle) {
+ ByteBuffer bb = ByteBuffer.wrap(tOperationHandle.getOperationId().getGuid());
+ return new OperationHandle(new UUID(bb.getLong(), bb.getLong()));
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Operation related conversions
+ // --------------------------------------------------------------------------------------------
+
+ public static TOperationType toTOperationType(OperationType type) {
+ switch (type) {
+ case EXECUTE_STATEMENT:
+ return TOperationType.EXECUTE_STATEMENT;
+ case UNKNOWN:
+ return TOperationType.UNKNOWN;
+ default:
+ throw new IllegalArgumentException(
+ String.format("Unknown operation type: %s.", type));
+ }
+ }
+
+ public static TOperationState toTOperationState(OperationStatus operationStatus) {
+ switch (operationStatus) {
+ case INITIALIZED:
+ return TOperationState.INITIALIZED_STATE;
+ case PENDING:
+ return TOperationState.PENDING_STATE;
+ case RUNNING:
+ return TOperationState.RUNNING_STATE;
+ case FINISHED:
+ return TOperationState.FINISHED_STATE;
+ case ERROR:
+ return TOperationState.ERROR_STATE;
+ case TIMEOUT:
+ return TOperationState.TIMEDOUT_STATE;
+ case CANCELED:
+ return TOperationState.CANCELED_STATE;
+ case CLOSED:
+ return TOperationState.CLOSED_STATE;
+ default:
+ throw new IllegalArgumentException(
+ String.format("Unknown operation status: %s.", operationStatus));
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Statement related conversions
+ // --------------------------------------------------------------------------------------------
+
+ /** Similar logic in the {@code org.apache.hive.service.cli.ColumnDescriptor}. */
+ public static TTableSchema toTTableSchema(ResolvedSchema schema) {
+ TTableSchema tSchema = new TTableSchema();
+
+ for (int i = 0; i < schema.getColumnCount(); i++) {
+ Column column = schema.getColumns().get(i);
+ TColumnDesc desc = new TColumnDesc();
+ desc.setColumnName(column.getName());
+ column.getComment().ifPresent(desc::setComment);
+ desc.setPosition(i);
+
+ TTypeDesc typeDesc = new TTypeDesc();
+
+ // Hive uses the TPrimitiveTypeEntry only. Please refer to TypeDescriptor#toTTypeDesc.
+ DataType columnType = column.getDataType();
+ TPrimitiveTypeEntry typeEntry =
+ new TPrimitiveTypeEntry(
+ Type.getType(HiveTypeUtil.toHiveTypeInfo(columnType, false)).toTType());
+
+ if (hasTypeQualifiers(columnType.getLogicalType())) {
+ typeEntry.setTypeQualifiers(toTTypeQualifiers(columnType.getLogicalType()));
+ }
+ typeDesc.addToTypes(TTypeEntry.primitiveEntry(typeEntry));
+
+ desc.setTypeDesc(typeDesc);
+ tSchema.addToColumns(desc);
+ }
+ return tSchema;
+ }
+
public static TStatus toTStatus(Throwable t) {
TStatus tStatus = new TStatus(TStatusCode.ERROR_STATUS);
tStatus.setErrorMessage(t.getMessage());
@@ -57,7 +192,7 @@ public class ThriftObjectConversions {
// --------------------------------------------------------------------------------------------
- private static THandleIdentifier toTHandleIdentifier(UUID publicId) {
+ private static THandleIdentifier toTHandleIdentifier(UUID publicId, UUID secretId) {
byte[] guid = new byte[16];
byte[] secret = new byte[16];
ByteBuffer guidBB = ByteBuffer.wrap(guid);
@@ -65,11 +200,54 @@ public class ThriftObjectConversions {
guidBB.putLong(publicId.getMostSignificantBits());
guidBB.putLong(publicId.getLeastSignificantBits());
- secretBB.putLong(SECRET_ID.getMostSignificantBits());
- secretBB.putLong(SECRET_ID.getLeastSignificantBits());
+ secretBB.putLong(secretId.getMostSignificantBits());
+ secretBB.putLong(secretId.getLeastSignificantBits());
return new THandleIdentifier(ByteBuffer.wrap(guid), ByteBuffer.wrap(secret));
}
+ /** Only the type that has length, precision or scale has {@link TTypeQualifiers}. */
+ private static boolean hasTypeQualifiers(LogicalType type) {
+ switch (type.getTypeRoot()) {
+ case DECIMAL:
+ case CHAR:
+ case VARCHAR:
+ return true;
+ default:
+ return false;
+ }
+ }
+
+ /**
+ * Create {@link TTypeQualifiers} from {@link LogicalType}. The logic is almost same in the
+ * {@code org.apache.hive.service.cli#toTTypeQualifiers}.
+ */
+ private static TTypeQualifiers toTTypeQualifiers(LogicalType type) {
+ Map<String, TTypeQualifierValue> qualifiers = new HashMap<>();
+
+ switch (type.getTypeRoot()) {
+ case DECIMAL:
+ qualifiers.put(
+ TCLIServiceConstants.PRECISION,
+ TTypeQualifierValue.i32Value(((DecimalType) type).getPrecision()));
+ qualifiers.put(
+ TCLIServiceConstants.SCALE,
+ TTypeQualifierValue.i32Value(((DecimalType) type).getScale()));
+ break;
+ case VARCHAR:
+ qualifiers.put(
+ TCLIServiceConstants.CHARACTER_MAXIMUM_LENGTH,
+ TTypeQualifierValue.i32Value(((VarCharType) type).getLength()));
+ break;
+ case CHAR:
+ qualifiers.put(
+ TCLIServiceConstants.CHARACTER_MAXIMUM_LENGTH,
+ TTypeQualifierValue.i32Value(((CharType) type).getLength()));
+ break;
+ }
+
+ return new TTypeQualifiers(qualifiers);
+ }
+
/**
* Converts a {@link Throwable} object into a flattened list of texts including its stack trace
* and the stack traces of the nested causes.
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversionsTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversionsTest.java
index 511170aa975..aecc8bc2cfa 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversionsTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversionsTest.java
@@ -18,12 +18,55 @@
package org.apache.flink.table.endpoint.hive.util;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.gateway.api.operation.OperationHandle;
+import org.apache.flink.table.gateway.api.operation.OperationStatus;
+import org.apache.flink.table.gateway.api.operation.OperationType;
import org.apache.flink.table.gateway.api.session.SessionHandle;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.utils.DataTypeUtils;
+import org.apache.hive.service.cli.TableSchema;
+import org.apache.hive.service.rpc.thrift.TOperationHandle;
+import org.apache.hive.service.rpc.thrift.TOperationState;
import org.junit.jupiter.api.Test;
+import java.sql.Types;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.api.DataTypes.BIGINT;
+import static org.apache.flink.table.api.DataTypes.BOOLEAN;
+import static org.apache.flink.table.api.DataTypes.BYTES;
+import static org.apache.flink.table.api.DataTypes.DATE;
+import static org.apache.flink.table.api.DataTypes.DECIMAL;
+import static org.apache.flink.table.api.DataTypes.DOUBLE;
+import static org.apache.flink.table.api.DataTypes.FLOAT;
+import static org.apache.flink.table.api.DataTypes.INT;
+import static org.apache.flink.table.api.DataTypes.SMALLINT;
+import static org.apache.flink.table.api.DataTypes.STRING;
+import static org.apache.flink.table.api.DataTypes.TIMESTAMP;
+import static org.apache.flink.table.api.DataTypes.TINYINT;
+import static org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions.toOperationHandle;
import static org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions.toSessionHandle;
+import static org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions.toTOperationHandle;
+import static org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions.toTOperationState;
import static org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions.toTSessionHandle;
+import static org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions.toTTableSchema;
+import static org.apache.flink.table.gateway.api.operation.OperationStatus.CANCELED;
+import static org.apache.flink.table.gateway.api.operation.OperationStatus.CLOSED;
+import static org.apache.flink.table.gateway.api.operation.OperationStatus.ERROR;
+import static org.apache.flink.table.gateway.api.operation.OperationStatus.FINISHED;
+import static org.apache.flink.table.gateway.api.operation.OperationStatus.INITIALIZED;
+import static org.apache.flink.table.gateway.api.operation.OperationStatus.PENDING;
+import static org.apache.flink.table.gateway.api.operation.OperationStatus.RUNNING;
+import static org.apache.flink.table.gateway.api.operation.OperationStatus.TIMEOUT;
import static org.junit.jupiter.api.Assertions.assertEquals;
/** Test for {@link ThriftObjectConversions}. */
@@ -34,4 +77,100 @@ class ThriftObjectConversionsTest {
SessionHandle originSessionHandle = SessionHandle.create();
assertEquals(toSessionHandle(toTSessionHandle(originSessionHandle)), originSessionHandle);
}
+
+ @Test
+ public void testConvertSessionHandleAndOperationHandle() {
+ SessionHandle originSessionHandle = SessionHandle.create();
+ OperationHandle originOperationHandle = OperationHandle.create();
+ TOperationHandle tOperationHandle =
+ toTOperationHandle(
+ originSessionHandle, originOperationHandle, OperationType.UNKNOWN);
+
+ assertEquals(toSessionHandle(tOperationHandle), originSessionHandle);
+ assertEquals(toOperationHandle(tOperationHandle), originOperationHandle);
+ }
+
+ @Test
+ public void testConvertOperationStatus() {
+ Map<OperationStatus, TOperationState> expectedMappings = new HashMap<>();
+ expectedMappings.put(INITIALIZED, TOperationState.INITIALIZED_STATE);
+ expectedMappings.put(PENDING, TOperationState.PENDING_STATE);
+ expectedMappings.put(RUNNING, TOperationState.RUNNING_STATE);
+ expectedMappings.put(FINISHED, TOperationState.FINISHED_STATE);
+ expectedMappings.put(CANCELED, TOperationState.CANCELED_STATE);
+ expectedMappings.put(CLOSED, TOperationState.CLOSED_STATE);
+ expectedMappings.put(ERROR, TOperationState.ERROR_STATE);
+ expectedMappings.put(TIMEOUT, TOperationState.TIMEDOUT_STATE);
+
+ for (OperationStatus status : expectedMappings.keySet()) {
+ assertEquals(expectedMappings.get(status), toTOperationState(status));
+ }
+ }
+
+ @Test
+ public void testToTTableSchema() {
+ for (DataTypeSpec spec : getDataTypeSpecs()) {
+ TableSchema actual =
+ new TableSchema(
+ toTTableSchema(
+ DataTypeUtils.expandCompositeTypeToSchema(spec.flinkType)));
+ List<Integer> javaSqlTypes =
+ Arrays.stream(actual.toTypeDescriptors())
+ .map(desc -> desc.getType().toJavaSQLType())
+ .collect(Collectors.toList());
+
+ assertEquals(Collections.singletonList(spec.sqlType), javaSqlTypes);
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ private List<DataTypeSpec> getDataTypeSpecs() {
+ return Arrays.asList(
+ DataTypeSpec.newSpec().withType(BOOLEAN()).expectSqlType(Types.BOOLEAN),
+ DataTypeSpec.newSpec()
+ .withType(TINYINT())
+ // TINYINT is the alias of the BYTE in Hive.
+ .expectSqlType(Types.BINARY),
+ DataTypeSpec.newSpec().withType(SMALLINT()).expectSqlType(Types.SMALLINT),
+ DataTypeSpec.newSpec().withType(INT()).expectSqlType(Types.INTEGER),
+ DataTypeSpec.newSpec().withType(BIGINT()).expectSqlType(Types.BIGINT),
+ DataTypeSpec.newSpec().withType(FLOAT()).expectSqlType(Types.FLOAT),
+ DataTypeSpec.newSpec().withType(DOUBLE()).expectSqlType(Types.DOUBLE),
+ DataTypeSpec.newSpec().withType(DECIMAL(9, 6)).expectSqlType(Types.DECIMAL),
+ DataTypeSpec.newSpec().withType(STRING()).expectSqlType(Types.VARCHAR),
+ DataTypeSpec.newSpec().withType(BYTES()).expectSqlType(Types.BINARY),
+ DataTypeSpec.newSpec().withType(DATE()).expectSqlType(Types.DATE),
+ DataTypeSpec.newSpec().withType(TIMESTAMP(4)).expectSqlType(Types.TIMESTAMP),
+ DataTypeSpec.newSpec()
+ .withType(DataTypes.MAP(DataTypes.INT(), DataTypes.STRING()))
+ .expectSqlType(Types.JAVA_OBJECT),
+ DataTypeSpec.newSpec()
+ .withType(
+ DataTypes.ARRAY(DataTypes.MAP(DataTypes.INT(), DataTypes.STRING())))
+ // Hive uses STRING type
+ .expectSqlType(Types.VARCHAR));
+ }
+
+ private static class DataTypeSpec {
+ DataType flinkType;
+ Integer sqlType;
+ RowData flinkValue;
+
+ public static DataTypeSpec newSpec() {
+ DataTypeSpec spec = new DataTypeSpec();
+ spec.flinkValue = new GenericRowData(1);
+ return spec;
+ }
+
+ public DataTypeSpec withType(DataType flinkType) {
+ this.flinkType = DataTypes.ROW(flinkType);
+ return this;
+ }
+
+ public DataTypeSpec expectSqlType(int sqlType) {
+ this.sqlType = sqlType;
+ return this;
+ }
+ }
}
diff --git a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/SqlGatewayService.java b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/SqlGatewayService.java
index 7e92610fb6e..70e326705c6 100644
--- a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/SqlGatewayService.java
+++ b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/SqlGatewayService.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.gateway.api;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.gateway.api.operation.OperationHandle;
import org.apache.flink.table.gateway.api.operation.OperationType;
import org.apache.flink.table.gateway.api.results.OperationInfo;
@@ -108,6 +109,16 @@ public interface SqlGatewayService {
OperationInfo getOperationInfo(SessionHandle sessionHandle, OperationHandle operationHandle)
throws SqlGatewayException;
+ /**
+ * Get the result schema for the specified Operation.
+ *
+ * @param sessionHandle handle to identify the session.
+ * @param operationHandle handle to identify the operation.
+ */
+ ResolvedSchema getOperationResultSchema(
+ SessionHandle sessionHandle, OperationHandle operationHandle)
+ throws SqlGatewayException;
+
// -------------------------------------------------------------------------------------------
// Statements
// -------------------------------------------------------------------------------------------
diff --git a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/results/OperationInfo.java b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/results/OperationInfo.java
index 7ebb28f5169..3abe6bdb7ce 100644
--- a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/results/OperationInfo.java
+++ b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/results/OperationInfo.java
@@ -22,24 +22,28 @@ import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.table.gateway.api.operation.OperationStatus;
import org.apache.flink.table.gateway.api.operation.OperationType;
+import javax.annotation.Nullable;
+
import java.util.Objects;
+import java.util.Optional;
/** Information of the {@code Operation}. */
@PublicEvolving
public class OperationInfo {
private final OperationStatus status;
- private final boolean hasResults;
private final OperationType type;
+ @Nullable private final Exception exception;
- public OperationInfo(OperationStatus status, OperationType type, boolean hasResults) {
- this.status = status;
- this.type = type;
- this.hasResults = hasResults;
+ public OperationInfo(OperationStatus status, OperationType type) {
+ this(status, type, null);
}
- public boolean isHasResults() {
- return hasResults;
+ public OperationInfo(
+ OperationStatus status, OperationType type, @Nullable Exception exception) {
+ this.status = status;
+ this.type = type;
+ this.exception = exception;
}
public OperationType getType() {
@@ -50,6 +54,10 @@ public class OperationInfo {
return status;
}
+ public Optional<Exception> getException() {
+ return Optional.ofNullable(exception);
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -59,11 +67,11 @@ public class OperationInfo {
return false;
}
OperationInfo that = (OperationInfo) o;
- return hasResults == that.hasResults && status == that.status && type == that.type;
+ return status == that.status && type == that.type && exception == that.exception;
}
@Override
public int hashCode() {
- return Objects.hash(status, hasResults, type);
+ return Objects.hash(status, type, exception);
}
}
diff --git a/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/utils/MockedSqlGatewayService.java b/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/utils/MockedSqlGatewayService.java
index f962e9c727e..49ba033a314 100644
--- a/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/utils/MockedSqlGatewayService.java
+++ b/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/utils/MockedSqlGatewayService.java
@@ -19,6 +19,7 @@
package org.apache.flink.table.gateway.api.utils;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.gateway.api.SqlGatewayService;
import org.apache.flink.table.gateway.api.operation.OperationHandle;
import org.apache.flink.table.gateway.api.operation.OperationType;
@@ -89,4 +90,11 @@ public class MockedSqlGatewayService implements SqlGatewayService {
throws SqlGatewayException {
throw new UnsupportedOperationException();
}
+
+ @Override
+ public ResolvedSchema getOperationResultSchema(
+ SessionHandle sessionHandle, OperationHandle operationHandle)
+ throws SqlGatewayException {
+ throw new UnsupportedOperationException();
+ }
}
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java
index fb68828aedf..7ec185dd8b0 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.gateway.service;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.gateway.api.SqlGatewayService;
import org.apache.flink.table.gateway.api.operation.OperationHandle;
import org.apache.flink.table.gateway.api.operation.OperationType;
@@ -124,6 +125,20 @@ public class SqlGatewayServiceImpl implements SqlGatewayService {
}
}
+ @Override
+ public ResolvedSchema getOperationResultSchema(
+ SessionHandle sessionHandle, OperationHandle operationHandle)
+ throws SqlGatewayException {
+ try {
+ return getSession(sessionHandle)
+ .getOperationManager()
+ .getOperationResultSchema(operationHandle);
+ } catch (Throwable t) {
+ LOG.error("Failed to getOperationResultSchema.", t);
+ throw new SqlGatewayException("Failed to getOperationResultSchema.", t);
+ }
+ }
+
@Override
public OperationHandle executeStatement(
SessionHandle sessionHandle,
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java
index 80ec381eb96..602a3fc3552 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.gateway.service.operation;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.gateway.api.operation.OperationHandle;
import org.apache.flink.table.gateway.api.operation.OperationStatus;
import org.apache.flink.table.gateway.api.operation.OperationType;
@@ -150,6 +151,15 @@ public class OperationManager {
return getOperation(operationHandle).getOperationInfo();
}
+ /**
+ * Get the {@link ResolvedSchema} of the operation.
+ *
+ * @param operationHandle identifies the {@link Operation}.
+ */
+ public ResolvedSchema getOperationResultSchema(OperationHandle operationHandle) {
+ return getOperation(operationHandle).getResultSchema();
+ }
+
/**
* Get the results of the operation.
*
@@ -309,7 +319,17 @@ public class OperationManager {
}
public OperationInfo getOperationInfo() {
- return new OperationInfo(status.get(), operationType, hasResults);
+ return new OperationInfo(status.get(), operationType);
+ }
+
+ public ResolvedSchema getResultSchema() {
+ OperationStatus current = status.get();
+ if (current != OperationStatus.FINISHED || !hasResults) {
+ throw new IllegalStateException(
+ "The result schema is available when the Operation is in FINISHED state and the Operation has data.");
+ }
+
+ return resultFetcher.getResultSchema();
}
private void updateState(OperationStatus toStatus) {
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/result/ResultFetcher.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/result/ResultFetcher.java
index 6812947ead9..98da1c9ac2a 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/result/ResultFetcher.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/result/ResultFetcher.java
@@ -89,6 +89,10 @@ public class ResultFetcher {
resultStore.close();
}
+ public ResolvedSchema getResultSchema() {
+ return resultSchema;
+ }
+
/**
* Fetch results from the result store. It tries to return the data cached in the buffer first.
* If the buffer is empty, then fetch results from the {@link ResultStore}. It's possible
diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
index 0d0069f388e..33b6d5e86d1 100644
--- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
+++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
@@ -193,12 +193,12 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
startRunningLatch.await();
assertEquals(
- new OperationInfo(OperationStatus.RUNNING, OperationType.UNKNOWN, true),
+ new OperationInfo(OperationStatus.RUNNING, OperationType.UNKNOWN),
service.getOperationInfo(sessionHandle, operationHandle));
endRunningLatch.countDown();
OperationInfo expectedInfo =
- new OperationInfo(OperationStatus.FINISHED, OperationType.UNKNOWN, true);
+ new OperationInfo(OperationStatus.FINISHED, OperationType.UNKNOWN);
CommonTestUtils.waitUtil(
() -> service.getOperationInfo(sessionHandle, operationHandle).equals(expectedInfo),
@@ -237,13 +237,13 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
startRunningLatch.await();
assertEquals(
- new OperationInfo(OperationStatus.RUNNING, OperationType.UNKNOWN, true),
+ new OperationInfo(OperationStatus.RUNNING, OperationType.UNKNOWN),
service.getOperationInfo(sessionHandle, operationHandle));
service.cancelOperation(sessionHandle, operationHandle);
assertEquals(
- new OperationInfo(OperationStatus.CANCELED, OperationType.UNKNOWN, true),
+ new OperationInfo(OperationStatus.CANCELED, OperationType.UNKNOWN),
service.getOperationInfo(sessionHandle, operationHandle));
service.closeOperation(sessionHandle, operationHandle);
assertEquals(0, sessionManager.getOperationCount(sessionHandle));
diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceStatementITCase.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceStatementITCase.java
index 40a3ec44f34..6348ca0d575 100644
--- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceStatementITCase.java
+++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceStatementITCase.java
@@ -231,10 +231,6 @@ public class SqlGatewayServiceStatementITCase {
Duration.ofSeconds(100),
"Failed to wait operation finish.");
- if (!service.getOperationInfo(sessionHandle, operationHandle).isHasResults()) {
- return Tag.INFO.addTag("");
- }
-
// The content in the result of the `explain` and `show create` statement is large, so it's
// more straightforward to just print the content without the table.
if (statement.toUpperCase().startsWith("EXPLAIN")