Posted to commits@flink.apache.org by sh...@apache.org on 2022/08/08 02:26:46 UTC
[flink] 02/02: [FLINK-28851][hive] Support GetTypeInfo in the HiveServer2 Endpoint
This is an automated email from the ASF dual-hosted git repository.
shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 29ca0ad8e33ff6391450c5d9b29f484373ce824a
Author: zhaoweinan <32...@qq.com>
AuthorDate: Fri Aug 5 16:58:01 2022 +0800
[FLINK-28851][hive] Support GetTypeInfo in the HiveServer2 Endpoint
This closes #20468
---
.../table/endpoint/hive/HiveServer2Endpoint.java | 23 +++++-
.../table/endpoint/hive/HiveServer2Schemas.java | 44 +++++++++++
.../hive/util/OperationExecutorFactory.java | 86 ++++++++++++++++++++++
.../endpoint/hive/HiveServer2EndpointITCase.java | 79 ++++++++++++++++++--
4 files changed, 223 insertions(+), 9 deletions(-)
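In short: the commit implements the previously unsupported GetTypeInfo RPC in the HiveServer2 endpoint, adds the matching result schema and an operation executor that serves a fixed catalog of Hive types, and extends the integration test accordingly. From a client's point of view, java.sql.DatabaseMetaData#getTypeInfo() now works against the SQL Gateway. A minimal, hedged usage sketch (host, port, and database are placeholders; the Hive JDBC driver is assumed to be on the classpath):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;

    public class TypeInfoClient {
        public static void main(String[] args) throws Exception {
            // Placeholder URL; point it at a SQL Gateway running the
            // HiveServer2 endpoint.
            try (Connection connection =
                            DriverManager.getConnection("jdbc:hive2://localhost:10000/default");
                    ResultSet types = connection.getMetaData().getTypeInfo()) {
                while (types.next()) {
                    // TYPE_NAME and DATA_TYPE are two of the 18 columns
                    // defined by GET_TYPE_INFO_SCHEMA below.
                    System.out.printf(
                            "%s -> java.sql.Types value %d%n",
                            types.getString("TYPE_NAME"), types.getInt("DATA_TYPE"));
                }
            }
        }
    }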
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java
index ca99cad3191..bcae8c73a42 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java
@@ -125,6 +125,7 @@ import static org.apache.flink.table.endpoint.hive.util.HiveJdbcParameterUtils.g
import static org.apache.flink.table.endpoint.hive.util.HiveJdbcParameterUtils.validateAndNormalize;
import static org.apache.flink.table.endpoint.hive.util.OperationExecutorFactory.createGetCatalogsExecutor;
import static org.apache.flink.table.endpoint.hive.util.OperationExecutorFactory.createGetSchemasExecutor;
+import static org.apache.flink.table.endpoint.hive.util.OperationExecutorFactory.createGetTableInfoExecutor;
import static org.apache.flink.table.endpoint.hive.util.OperationExecutorFactory.createGetTablesExecutor;
import static org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions.toFetchOrientation;
import static org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions.toFlinkTableKinds;
@@ -136,6 +137,7 @@ import static org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions.
import static org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions.toTSessionHandle;
import static org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions.toTStatus;
import static org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions.toTTableSchema;
+import static org.apache.flink.table.gateway.api.results.ResultSet.ResultType.EOS;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
@@ -391,7 +393,20 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
@Override
public TGetTypeInfoResp GetTypeInfo(TGetTypeInfoReq tGetTypeInfoReq) throws TException {
- throw new UnsupportedOperationException(ERROR_MESSAGE);
+ TGetTypeInfoResp resp = new TGetTypeInfoResp();
+ try {
+ SessionHandle sessionHandle = toSessionHandle(tGetTypeInfoReq.getSessionHandle());
+ OperationHandle operationHandle =
+ service.submitOperation(sessionHandle, createGetTableInfoExecutor());
+ resp.setStatus(OK_STATUS);
+ resp.setOperationHandle(
+ toTOperationHandle(
+ sessionHandle, operationHandle, TOperationType.GET_TYPE_INFO));
+ } catch (Throwable t) {
+ LOG.error("Failed to GetTypeInfo.", t);
+ resp.setStatus(toTStatus(t));
+ }
+ return resp;
}
@Override
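The handler follows the endpoint's standard shape: convert the Thrift session handle, submit an executor to the gateway service, and return an operation handle for the client to poll (note that the factory method it calls is named createGetTableInfoExecutor even though it backs GetTypeInfo). On the wire, the client then drives a two-step exchange; a hedged sketch using the generated HiveServer2 Thrift classes (assumes an open TCLIService.Client and TSessionHandle):

    // GetTypeInfo returns only a handle; the rows come from FetchResults.
    TGetTypeInfoResp typeInfoResp = client.GetTypeInfo(new TGetTypeInfoReq(sessionHandle));
    TFetchResultsReq fetchReq =
            new TFetchResultsReq(
                    typeInfoResp.getOperationHandle(), TFetchOrientation.FETCH_NEXT, 1000);
    TFetchResultsResp fetched = client.FetchResults(fetchReq);
    // hasMoreRows turns false once the endpoint reports EOS (see the
    // FetchResults hunk below).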
@@ -447,7 +462,6 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
OperationHandle operationHandle =
service.submitOperation(
sessionHandle,
- OperationType.LIST_TABLES,
createGetTablesExecutor(
service,
sessionHandle,
@@ -458,7 +472,7 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
resp.setStatus(OK_STATUS);
resp.setOperationHandle(
- toTOperationHandle(sessionHandle, operationHandle, OperationType.LIST_TABLES));
+ toTOperationHandle(sessionHandle, operationHandle, TOperationType.GET_TABLES));
} catch (Throwable t) {
LOG.error("Failed to GetTables.", t);
resp.setStatus(toTStatus(t));
@@ -601,7 +615,7 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
toFetchOrientation(tFetchResultsReq.getFetchType()),
maxRows);
resp.setStatus(OK_STATUS);
- resp.setHasMoreRows(resultSet.getResultType() != ResultSet.ResultType.EOS);
+ resp.setHasMoreRows(resultSet.getResultType() != EOS);
EndpointVersion sessionEndpointVersion =
service.getSessionEndpointVersion(sessionHandle);
@@ -644,6 +658,7 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
}
// CHECKSTYLE.OFF: MethodName
+
/** To be compatible with Hive3, add a default implementation. */
public TGetQueryIdResp GetQueryId(TGetQueryIdReq tGetQueryIdReq) throws TException {
throw new UnsupportedOperationException(ERROR_MESSAGE);
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Schemas.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Schemas.java
index d553234cc94..ad7743d1be1 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Schemas.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Schemas.java
@@ -78,4 +78,48 @@ public class HiveServer2Schemas {
"Specifies how values in SELF_REFERENCING_COL_NAME are created."))),
Collections.emptyList(),
null);
+
+ /** Schema for {@link HiveServer2Endpoint#GetTypeInfo}. */
+ public static final ResolvedSchema GET_TYPE_INFO_SCHEMA =
+ new ResolvedSchema(
+ Arrays.asList(
+ Column.physical("TYPE_NAME", DataTypes.STRING())
+ .withComment("Type name."),
+ Column.physical("DATA_TYPE", DataTypes.INT())
+ .withComment("SQL data type from java.sql.Types."),
+ Column.physical("PRECISION", DataTypes.INT())
+ .withComment("Maximum precision."),
+ Column.physical("LITERAL_PREFIX", DataTypes.STRING())
+ .withComment("Prefix used to quote a literal (may be null)."),
+ Column.physical("LITERAL_SUFFIX", DataTypes.STRING())
+ .withComment("Suffix used to quote a literal (may be null)."),
+ Column.physical("CREATE_PARAMS", DataTypes.STRING())
+ .withComment(
+ "Parameters used in creating the type (may be null)."),
+ Column.physical("NULLABLE", DataTypes.SMALLINT())
+ .withComment("Can you use NULL for this type."),
+ Column.physical("CASE_SENSITIVE", DataTypes.BOOLEAN())
+ .withComment("Is it case sensitive."),
+ Column.physical("SEARCHABLE", DataTypes.SMALLINT())
+ .withComment("Can you use \"WHERE\" based on this type."),
+ Column.physical("UNSIGNED_ATTRIBUTE", DataTypes.BOOLEAN())
+ .withComment("Is it unsigned."),
+ Column.physical("FIXED_PREC_SCALE", DataTypes.BOOLEAN())
+ .withComment("Can it be a money value."),
+ Column.physical("AUTO_INCREMENT", DataTypes.BOOLEAN())
+ .withComment("Can it be used for an auto-increment value."),
+ Column.physical("LOCAL_TYPE_NAME", DataTypes.STRING())
+ .withComment("Localized version of type name (may be null)."),
+ Column.physical("MINIMUM_SCALE", DataTypes.SMALLINT())
+ .withComment("Minimum scale supported."),
+ Column.physical("MAXIMUM_SCALE", DataTypes.SMALLINT())
+ .withComment("Maximum scale supported."),
+ Column.physical("SQL_DATA_TYPE", DataTypes.INT())
+ .withComment("Unused."),
+ Column.physical("SQL_DATETIME_SUB", DataTypes.INT())
+ .withComment("Unused."),
+ Column.physical("NUM_PREC_RADIX", DataTypes.INT())
+ .withComment("Usually 2 or 10.")),
+ Collections.emptyList(),
+ null);
}
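The 18 columns above follow, name for name, the result-set contract of java.sql.DatabaseMetaData#getTypeInfo(), so generic JDBC tooling can read them without Hive-specific code. For example (a sketch; types is the java.sql.ResultSet returned by getTypeInfo()):

    while (types.next()) {
        String name = types.getString("TYPE_NAME");              // e.g. "DECIMAL"
        int jdbcType = types.getInt("DATA_TYPE");                // java.sql.Types constant
        short nullable = types.getShort("NULLABLE");             // typeNullable / typeNoNulls
        boolean moneySafe = types.getBoolean("FIXED_PREC_SCALE");
    }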
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/OperationExecutorFactory.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/OperationExecutorFactory.java
index c69a546a502..44095b1c24a 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/OperationExecutorFactory.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/OperationExecutorFactory.java
@@ -26,9 +26,14 @@ import org.apache.flink.table.gateway.api.results.ResultSet;
import org.apache.flink.table.gateway.api.results.TableInfo;
import org.apache.flink.table.gateway.api.session.SessionHandle;
+import org.apache.hadoop.hive.serde2.thrift.Type;
+
import javax.annotation.Nullable;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.HashSet;
+import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.function.Function;
@@ -38,7 +43,28 @@ import java.util.stream.Collectors;
import static org.apache.flink.table.endpoint.hive.HiveServer2Schemas.GET_CATALOGS_SCHEMA;
import static org.apache.flink.table.endpoint.hive.HiveServer2Schemas.GET_SCHEMAS_SCHEMA;
import static org.apache.flink.table.endpoint.hive.HiveServer2Schemas.GET_TABLES_SCHEMA;
+import static org.apache.flink.table.endpoint.hive.HiveServer2Schemas.GET_TYPE_INFO_SCHEMA;
import static org.apache.flink.table.gateway.api.results.ResultSet.ResultType.EOS;
+import static org.apache.hadoop.hive.serde2.thrift.Type.ARRAY_TYPE;
+import static org.apache.hadoop.hive.serde2.thrift.Type.BIGINT_TYPE;
+import static org.apache.hadoop.hive.serde2.thrift.Type.BINARY_TYPE;
+import static org.apache.hadoop.hive.serde2.thrift.Type.BOOLEAN_TYPE;
+import static org.apache.hadoop.hive.serde2.thrift.Type.CHAR_TYPE;
+import static org.apache.hadoop.hive.serde2.thrift.Type.DATE_TYPE;
+import static org.apache.hadoop.hive.serde2.thrift.Type.DECIMAL_TYPE;
+import static org.apache.hadoop.hive.serde2.thrift.Type.DOUBLE_TYPE;
+import static org.apache.hadoop.hive.serde2.thrift.Type.FLOAT_TYPE;
+import static org.apache.hadoop.hive.serde2.thrift.Type.INTERVAL_DAY_TIME_TYPE;
+import static org.apache.hadoop.hive.serde2.thrift.Type.INTERVAL_YEAR_MONTH_TYPE;
+import static org.apache.hadoop.hive.serde2.thrift.Type.INT_TYPE;
+import static org.apache.hadoop.hive.serde2.thrift.Type.MAP_TYPE;
+import static org.apache.hadoop.hive.serde2.thrift.Type.NULL_TYPE;
+import static org.apache.hadoop.hive.serde2.thrift.Type.SMALLINT_TYPE;
+import static org.apache.hadoop.hive.serde2.thrift.Type.STRING_TYPE;
+import static org.apache.hadoop.hive.serde2.thrift.Type.STRUCT_TYPE;
+import static org.apache.hadoop.hive.serde2.thrift.Type.TIMESTAMP_TYPE;
+import static org.apache.hadoop.hive.serde2.thrift.Type.TINYINT_TYPE;
+import static org.apache.hadoop.hive.serde2.thrift.Type.VARCHAR_TYPE;
/** Factory to create the operation executor. */
public class OperationExecutorFactory {
@@ -68,6 +94,39 @@ public class OperationExecutorFactory {
service, sessionHandle, catalogName, schemaName, tableName, tableKinds);
}
+ public static Callable<ResultSet> createGetTableInfoExecutor() {
+ return () ->
+ new ResultSet(
+ EOS,
+ null,
+ GET_TYPE_INFO_SCHEMA,
+ getSupportedHiveType().stream()
+ .map(
+ type ->
+ wrap(
+ type.getName(), // TYPE_NAME
+ type.toJavaSQLType(), // DATA_TYPE
+ type.getMaxPrecision(), // PRECISION
+ type.getLiteralPrefix(), // LITERAL_PREFIX
+ type.getLiteralSuffix(), // LITERAL_SUFFIX
+ type.getCreateParams(), // CREATE_PARAMS
+ type.getNullable(), // NULLABLE
+ type.isCaseSensitive(), // CASE_SENSITIVE
+ type.getSearchable(), // SEARCHABLE
+ type
+ .isUnsignedAttribute(), // UNSIGNED_ATTRIBUTE
+ type.isFixedPrecScale(), // FIXED_PREC_SCALE
+ type.isAutoIncrement(), // AUTO_INCREMENT
+ type.getLocalizedName(), // LOCAL_TYPE_NAME
+ type.getMinimumScale(), // MINIMUM_SCALE
+ type.getMaximumScale(), // MAXIMUM_SCALE
+ null, // SQL_DATA_TYPE, unused
+ null, // SQL_DATETIME_SUB, unused
+ type.getNumPrecRadix() // NUM_PREC_RADIX
+ ))
+ .collect(Collectors.toList()));
+ }
+
// --------------------------------------------------------------------------------------------
// Executors
// --------------------------------------------------------------------------------------------
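Unlike createGetTablesExecutor and friends, this factory needs neither the service nor a session: the type catalog is static, so the executor materializes the entire result eagerly and tags it EOS. The pattern generalizes to any fixed metadata set; a hedged sketch (createStaticResultExecutor is a hypothetical name, not part of this commit):

    // Sketch: an executor for results that are fully known up front.
    public static Callable<ResultSet> createStaticResultExecutor(
            ResolvedSchema schema, List<RowData> rows) {
        // EOS + null token: this single batch is the entire result.
        return () -> new ResultSet(EOS, null, schema, rows);
    }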
@@ -198,6 +257,8 @@ public class OperationExecutorFactory {
pack[i] = element;
} else if (element instanceof Short) {
pack[i] = element;
+ } else if (element instanceof Boolean) {
+ pack[i] = element;
} else {
throw new UnsupportedOperationException(
String.format(
@@ -208,4 +269,29 @@ public class OperationExecutorFactory {
}
return GenericRowData.of(pack);
}
+
+ private static List<Type> getSupportedHiveType() {
+ return Collections.unmodifiableList(
+ Arrays.asList(
+ NULL_TYPE,
+ BOOLEAN_TYPE,
+ STRING_TYPE,
+ BINARY_TYPE,
+ TINYINT_TYPE,
+ SMALLINT_TYPE,
+ INT_TYPE,
+ BIGINT_TYPE,
+ FLOAT_TYPE,
+ DOUBLE_TYPE,
+ DECIMAL_TYPE,
+ DATE_TYPE,
+ TIMESTAMP_TYPE,
+ ARRAY_TYPE,
+ MAP_TYPE,
+ STRUCT_TYPE,
+ CHAR_TYPE,
+ VARCHAR_TYPE,
+ INTERVAL_YEAR_MONTH_TYPE,
+ INTERVAL_DAY_TIME_TYPE));
+ }
}
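The Boolean branch added to wrap(...) above slots into a simple dispatch: each plain Java value coming out of the Hive Type accessors is either passed through or converted to Flink's internal representation before being packed into a GenericRowData. A hedged sketch of the helper's overall shape (simplified; the real method covers more types, and the String-to-StringData conversion is an assumption based on Flink's internal data formats):

    // Sketch: pack plain Java values into Flink's internal row format.
    private static GenericRowData wrap(Object... elements) {
        Object[] pack = new Object[elements.length];
        for (int i = 0; i < elements.length; i++) {
            Object element = elements[i];
            if (element == null) {
                pack[i] = null;                                    // nullable column
            } else if (element instanceof String) {
                pack[i] = StringData.fromString((String) element); // internal string
            } else if (element instanceof Integer
                    || element instanceof Short
                    || element instanceof Boolean) {
                pack[i] = element;                                 // stored as-is
            } else {
                throw new UnsupportedOperationException(
                        String.format("Cannot wrap the element %s into a row.", element));
            }
        }
        return GenericRowData.of(pack);
    }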
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java
index c287a93e15e..120d3786b26 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java
@@ -71,9 +71,13 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.CountDownLatch;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
import static org.apache.flink.api.common.RuntimeExecutionMode.BATCH;
import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE;
@@ -404,6 +408,40 @@ public class HiveServer2EndpointITCase extends TestLogger {
null)));
}
+ @Test
+ public void testGetTypeInfo() throws Exception {
+ runGetObjectTest(
+ connection -> connection.getMetaData().getTypeInfo(),
+ getExpectedGetTypeInfoSchema(),
+ types ->
+ assertThat(
+ types.stream()
+ .map(type -> type.get(0))
+ .collect(Collectors.toList()))
+ .isEqualTo(
+ Arrays.asList(
+ "VOID",
+ "BOOLEAN",
+ "STRING",
+ "BINARY",
+ "TINYINT",
+ "SMALLINT",
+ "INT",
+ "BIGINT",
+ "FLOAT",
+ "DOUBLE",
+ "DECIMAL",
+ "DATE",
+ "TIMESTAMP",
+ "ARRAY",
+ "MAP",
+ "STRUCT",
+ "CHAR",
+ "VARCHAR",
+ "INTERVAL_YEAR_MONTH",
+ "INTERVAL_DAY_TIME")));
+ }
+
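The expected names line up one-to-one with getSupportedHiveType() in OperationExecutorFactory (Hive's NULL_TYPE reports its TYPE_NAME as VOID). Because the assertion compares against a List, it is order-sensitive, which is what motivates the Consumer-based test hook and the collect(...) change below.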
// --------------------------------------------------------------------------------------------
private Connection getInitializedConnection() throws Exception {
@@ -444,11 +482,21 @@ public class HiveServer2EndpointITCase extends TestLogger {
ResolvedSchema expectedSchema,
List<List<Object>> expectedResults)
throws Exception {
+ runGetObjectTest(
+ resultSetSupplier,
+ expectedSchema,
+ result -> assertThat(result).isEqualTo(new HashSet<>(expectedResults)));
+ }
+
+ private void runGetObjectTest(
+ FunctionWithException<Connection, java.sql.ResultSet, Exception> resultSetSupplier,
+ ResolvedSchema expectedSchema,
+ Consumer<Set<List<Object>>> validator)
+ throws Exception {
try (Connection connection = getInitializedConnection();
java.sql.ResultSet result = resultSetSupplier.apply(connection)) {
assertSchemaEquals(expectedSchema, result.getMetaData());
- assertThat(new HashSet<>(collect(result, expectedSchema.getColumnCount())))
- .isEqualTo(new HashSet<>(expectedResults));
+ validator.accept(collect(result, expectedSchema.getColumnCount()));
}
}
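This is a straightforward extract-and-delegate refactoring: the original set-equality assertion becomes the default validator, while order-sensitive tests such as testGetTypeInfo supply their own Consumer. Existing callers keep their membership-only semantics unchanged.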
@@ -511,6 +559,28 @@ public class HiveServer2EndpointITCase extends TestLogger {
Column.physical("REF_GENERATION", DataTypes.STRING()));
}
+ private ResolvedSchema getExpectedGetTypeInfoSchema() {
+ return ResolvedSchema.of(
+ Column.physical("TYPE_NAME", DataTypes.STRING()),
+ Column.physical("DATA_TYPE", DataTypes.INT()),
+ Column.physical("PRECISION", DataTypes.INT()),
+ Column.physical("LITERAL_PREFIX", DataTypes.STRING()),
+ Column.physical("LITERAL_SUFFIX", DataTypes.STRING()),
+ Column.physical("CREATE_PARAMS", DataTypes.STRING()),
+ Column.physical("NULLABLE", DataTypes.SMALLINT()),
+ Column.physical("CASE_SENSITIVE", DataTypes.BOOLEAN()),
+ Column.physical("SEARCHABLE", DataTypes.SMALLINT()),
+ Column.physical("UNSIGNED_ATTRIBUTE", DataTypes.BOOLEAN()),
+ Column.physical("FIXED_PREC_SCALE", DataTypes.BOOLEAN()),
+ Column.physical("AUTO_INCREMENT", DataTypes.BOOLEAN()),
+ Column.physical("LOCAL_TYPE_NAME", DataTypes.STRING()),
+ Column.physical("MINIMUM_SCALE", DataTypes.SMALLINT()),
+ Column.physical("MAXIMUM_SCALE", DataTypes.SMALLINT()),
+ Column.physical("SQL_DATA_TYPE", DataTypes.INT()),
+ Column.physical("SQL_DATETIME_SUB", DataTypes.INT()),
+ Column.physical("NUM_PREC_RADIX", DataTypes.INT()));
+ }
+
private void assertSchemaEquals(ResolvedSchema expected, ResultSetMetaData metaData)
throws Exception {
assertThat(metaData.getColumnCount()).isEqualTo(expected.getColumnCount());
@@ -526,8 +596,7 @@ public class HiveServer2EndpointITCase extends TestLogger {
}
}
- private List<List<Object>> collect(java.sql.ResultSet result, int columnCount)
- throws Exception {
+ private Set<List<Object>> collect(java.sql.ResultSet result, int columnCount) throws Exception {
List<List<Object>> actual = new ArrayList<>();
while (result.next()) {
List<Object> row = new ArrayList<>();
@@ -536,6 +605,6 @@ public class HiveServer2EndpointITCase extends TestLogger {
}
actual.add(row);
}
- return actual;
+ return new LinkedHashSet<>(actual);
}
}
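Returning a LinkedHashSet from collect(...) serves both kinds of caller: it still deduplicates, so the set-equality assertions behave as before, yet it preserves encounter order, letting testGetTypeInfo assert the exact sequence produced by getSupportedHiveType().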