Posted to commits@flink.apache.org by sh...@apache.org on 2022/08/08 02:26:46 UTC

[flink] 02/02: [FLINK-28851][hive] Allow to GetTypeInfo in the HiveServer2 Endpoint

This is an automated email from the ASF dual-hosted git repository.

shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 29ca0ad8e33ff6391450c5d9b29f484373ce824a
Author: zhaoweinan <32...@qq.com>
AuthorDate: Fri Aug 5 16:58:01 2022 +0800

    [FLINK-28851][hive] Allow to GetTypeInfo in the HiveServer2 Endpoint
    
    This closes #20468
---
 .../table/endpoint/hive/HiveServer2Endpoint.java   | 23 +++++-
 .../table/endpoint/hive/HiveServer2Schemas.java    | 44 +++++++++++
 .../hive/util/OperationExecutorFactory.java        | 86 ++++++++++++++++++++++
 .../endpoint/hive/HiveServer2EndpointITCase.java   | 79 ++++++++++++++++++--
 4 files changed, 223 insertions(+), 9 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java
index ca99cad3191..bcae8c73a42 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java
@@ -125,6 +125,7 @@ import static org.apache.flink.table.endpoint.hive.util.HiveJdbcParameterUtils.g
 import static org.apache.flink.table.endpoint.hive.util.HiveJdbcParameterUtils.validateAndNormalize;
 import static org.apache.flink.table.endpoint.hive.util.OperationExecutorFactory.createGetCatalogsExecutor;
 import static org.apache.flink.table.endpoint.hive.util.OperationExecutorFactory.createGetSchemasExecutor;
+import static org.apache.flink.table.endpoint.hive.util.OperationExecutorFactory.createGetTableInfoExecutor;
 import static org.apache.flink.table.endpoint.hive.util.OperationExecutorFactory.createGetTablesExecutor;
 import static org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions.toFetchOrientation;
 import static org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions.toFlinkTableKinds;
@@ -136,6 +137,7 @@ import static org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions.
 import static org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions.toTSessionHandle;
 import static org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions.toTStatus;
 import static org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions.toTTableSchema;
+import static org.apache.flink.table.gateway.api.results.ResultSet.ResultType.EOS;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -391,7 +393,20 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
 
     @Override
     public TGetTypeInfoResp GetTypeInfo(TGetTypeInfoReq tGetTypeInfoReq) throws TException {
-        throw new UnsupportedOperationException(ERROR_MESSAGE);
+        TGetTypeInfoResp resp = new TGetTypeInfoResp();
+        try {
+            SessionHandle sessionHandle = toSessionHandle(tGetTypeInfoReq.getSessionHandle());
+            OperationHandle operationHandle =
+                    service.submitOperation(sessionHandle, createGetTableInfoExecutor());
+            resp.setStatus(OK_STATUS);
+            resp.setOperationHandle(
+                    toTOperationHandle(
+                            sessionHandle, operationHandle, TOperationType.GET_TYPE_INFO));
+        } catch (Throwable t) {
+            LOG.error("Failed to GetTypeInfo.", t);
+            resp.setStatus(toTStatus(t));
+        }
+        return resp;
     }
 
     @Override
@@ -447,7 +462,6 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
             OperationHandle operationHandle =
                     service.submitOperation(
                             sessionHandle,
-                            OperationType.LIST_TABLES,
                             createGetTablesExecutor(
                                     service,
                                     sessionHandle,
@@ -458,7 +472,7 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
 
             resp.setStatus(OK_STATUS);
             resp.setOperationHandle(
-                    toTOperationHandle(sessionHandle, operationHandle, OperationType.LIST_TABLES));
+                    toTOperationHandle(sessionHandle, operationHandle, TOperationType.GET_TABLES));
         } catch (Throwable t) {
             LOG.error("Failed to GetTables.", t);
             resp.setStatus(toTStatus(t));
@@ -601,7 +615,7 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
                             toFetchOrientation(tFetchResultsReq.getFetchType()),
                             maxRows);
             resp.setStatus(OK_STATUS);
-            resp.setHasMoreRows(resultSet.getResultType() != ResultSet.ResultType.EOS);
+            resp.setHasMoreRows(resultSet.getResultType() != EOS);
             EndpointVersion sessionEndpointVersion =
                     service.getSessionEndpointVersion(sessionHandle);
 
@@ -644,6 +658,7 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
     }
 
     // CHECKSTYLE.OFF: MethodName
+
     /** To be compatible with Hive3, add a default implementation. */
     public TGetQueryIdResp GetQueryId(TGetQueryIdReq tGetQueryIdReq) throws TException {
         throw new UnsupportedOperationException(ERROR_MESSAGE);
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Schemas.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Schemas.java
index d553234cc94..ad7743d1be1 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Schemas.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Schemas.java
@@ -78,4 +78,48 @@ public class HiveServer2Schemas {
                                                     "Specifies how values in SELF_REFERENCING_COL_NAME are created."))),
                     Collections.emptyList(),
                     null);
+
+    /** Schema for {@link HiveServer2Endpoint#GetTypeInfo}. */
+    public static final ResolvedSchema GET_TYPE_INFO_SCHEMA =
+            new ResolvedSchema(
+                    Arrays.asList(
+                            Column.physical("TYPE_NAME", DataTypes.STRING())
+                                    .withComment("Type name."),
+                            Column.physical("DATA_TYPE", DataTypes.INT())
+                                    .withComment("SQL data type from java.sql.Types."),
+                            Column.physical("PRECISION", DataTypes.INT())
+                                    .withComment("Maximum precision."),
+                            Column.physical("LITERAL_PREFIX", DataTypes.STRING())
+                                    .withComment("Prefix used to quote a literal (may be null)."),
+                            Column.physical("LITERAL_SUFFIX", DataTypes.STRING())
+                                    .withComment("Suffix used to quote a literal (may be null)."),
+                            Column.physical("CREATE_PARAMS", DataTypes.STRING())
+                                    .withComment(
+                                            "Parameters used in creating the type (may be null)."),
+                            Column.physical("NULLABLE", DataTypes.SMALLINT())
+                                    .withComment("Can you use NULL for this type."),
+                            Column.physical("CASE_SENSITIVE", DataTypes.BOOLEAN())
+                                    .withComment("Is it case sensitive."),
+                            Column.physical("SEARCHABLE", DataTypes.SMALLINT())
+                                    .withComment("Can you use \"WHERE\" based on this type."),
+                            Column.physical("UNSIGNED_ATTRIBUTE", DataTypes.BOOLEAN())
+                                    .withComment("Is it unsigned."),
+                            Column.physical("FIXED_PREC_SCALE", DataTypes.BOOLEAN())
+                                    .withComment("Can it be a money value."),
+                            Column.physical("AUTO_INCREMENT", DataTypes.BOOLEAN())
+                                    .withComment("Can it be used for an auto-increment value."),
+                            Column.physical("LOCAL_TYPE_NAME", DataTypes.STRING())
+                                    .withComment("Localized version of type name (may be null)."),
+                            Column.physical("MINIMUM_SCALE", DataTypes.SMALLINT())
+                                    .withComment("Minimum scale supported."),
+                            Column.physical("MAXIMUM_SCALE", DataTypes.SMALLINT())
+                                    .withComment("Maximum scale supported."),
+                            Column.physical("SQL_DATA_TYPE", DataTypes.INT())
+                                    .withComment("Unused."),
+                            Column.physical("SQL_DATETIME_SUB", DataTypes.INT())
+                                    .withComment("Unused."),
+                            Column.physical("NUM_PREC_RADIX", DataTypes.INT())
+                                    .withComment("Usually 2 or 10.")),
+                    Collections.emptyList(),
+                    null);
 }
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/OperationExecutorFactory.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/OperationExecutorFactory.java
index c69a546a502..44095b1c24a 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/OperationExecutorFactory.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/OperationExecutorFactory.java
@@ -26,9 +26,14 @@ import org.apache.flink.table.gateway.api.results.ResultSet;
 import org.apache.flink.table.gateway.api.results.TableInfo;
 import org.apache.flink.table.gateway.api.session.SessionHandle;
 
+import org.apache.hadoop.hive.serde2.thrift.Type;
+
 import javax.annotation.Nullable;
 
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.function.Function;
@@ -38,7 +43,28 @@ import java.util.stream.Collectors;
 import static org.apache.flink.table.endpoint.hive.HiveServer2Schemas.GET_CATALOGS_SCHEMA;
 import static org.apache.flink.table.endpoint.hive.HiveServer2Schemas.GET_SCHEMAS_SCHEMA;
 import static org.apache.flink.table.endpoint.hive.HiveServer2Schemas.GET_TABLES_SCHEMA;
+import static org.apache.flink.table.endpoint.hive.HiveServer2Schemas.GET_TYPE_INFO_SCHEMA;
 import static org.apache.flink.table.gateway.api.results.ResultSet.ResultType.EOS;
+import static org.apache.hadoop.hive.serde2.thrift.Type.ARRAY_TYPE;
+import static org.apache.hadoop.hive.serde2.thrift.Type.BIGINT_TYPE;
+import static org.apache.hadoop.hive.serde2.thrift.Type.BINARY_TYPE;
+import static org.apache.hadoop.hive.serde2.thrift.Type.BOOLEAN_TYPE;
+import static org.apache.hadoop.hive.serde2.thrift.Type.CHAR_TYPE;
+import static org.apache.hadoop.hive.serde2.thrift.Type.DATE_TYPE;
+import static org.apache.hadoop.hive.serde2.thrift.Type.DECIMAL_TYPE;
+import static org.apache.hadoop.hive.serde2.thrift.Type.DOUBLE_TYPE;
+import static org.apache.hadoop.hive.serde2.thrift.Type.FLOAT_TYPE;
+import static org.apache.hadoop.hive.serde2.thrift.Type.INTERVAL_DAY_TIME_TYPE;
+import static org.apache.hadoop.hive.serde2.thrift.Type.INTERVAL_YEAR_MONTH_TYPE;
+import static org.apache.hadoop.hive.serde2.thrift.Type.INT_TYPE;
+import static org.apache.hadoop.hive.serde2.thrift.Type.MAP_TYPE;
+import static org.apache.hadoop.hive.serde2.thrift.Type.NULL_TYPE;
+import static org.apache.hadoop.hive.serde2.thrift.Type.SMALLINT_TYPE;
+import static org.apache.hadoop.hive.serde2.thrift.Type.STRING_TYPE;
+import static org.apache.hadoop.hive.serde2.thrift.Type.STRUCT_TYPE;
+import static org.apache.hadoop.hive.serde2.thrift.Type.TIMESTAMP_TYPE;
+import static org.apache.hadoop.hive.serde2.thrift.Type.TINYINT_TYPE;
+import static org.apache.hadoop.hive.serde2.thrift.Type.VARCHAR_TYPE;
 
 /** Factory to create the operation executor. */
 public class OperationExecutorFactory {
@@ -68,6 +94,39 @@ public class OperationExecutorFactory {
                         service, sessionHandle, catalogName, schemaName, tableName, tableKinds);
     }
 
+    public static Callable<ResultSet> createGetTableInfoExecutor() {
+        return () ->
+                new ResultSet(
+                        EOS,
+                        null,
+                        GET_TYPE_INFO_SCHEMA,
+                        getSupportedHiveType().stream()
+                                .map(
+                                        type ->
+                                                wrap(
+                                                        type.getName(), // TYPE_NAME
+                                                        type.toJavaSQLType(), // DATA_TYPE
+                                                        type.getMaxPrecision(), // PRECISION
+                                                        type.getLiteralPrefix(), // LITERAL_PREFIX
+                                                        type.getLiteralSuffix(), // LITERAL_SUFFIX
+                                                        type.getCreateParams(), // CREATE_PARAMS
+                                                        type.getNullable(), // NULLABLE
+                                                        type.isCaseSensitive(), // CASE_SENSITIVE
+                                                        type.getSearchable(), // SEARCHABLE
+                                                        type
+                                                                .isUnsignedAttribute(), // UNSIGNED_ATTRIBUTE
+                                                        type.isFixedPrecScale(), // FIXED_PREC_SCALE
+                                                        type.isAutoIncrement(), // AUTO_INCREMENT
+                                                        type.getLocalizedName(), // LOCAL_TYPE_NAME
+                                                        type.getMinimumScale(), // MINIMUM_SCALE
+                                                        type.getMaximumScale(), // MAXIMUM_SCALE
+                                                        null, // SQL_DATA_TYPE, unused
+                                                        null, // SQL_DATETIME_SUB, unused
+                                                        type.getNumPrecRadix() // NUM_PREC_RADIX
+                                                        ))
+                                .collect(Collectors.toList()));
+    }
+
     // --------------------------------------------------------------------------------------------
     // Executors
     // --------------------------------------------------------------------------------------------
@@ -198,6 +257,8 @@ public class OperationExecutorFactory {
                     pack[i] = element;
                 } else if (element instanceof Short) {
                     pack[i] = element;
+                } else if (element instanceof Boolean) {
+                    pack[i] = element;
                 } else {
                     throw new UnsupportedOperationException(
                             String.format(
@@ -208,4 +269,29 @@ public class OperationExecutorFactory {
         }
         return GenericRowData.of(pack);
     }
+
+    private static List<Type> getSupportedHiveType() {
+        return Collections.unmodifiableList(
+                Arrays.asList(
+                        NULL_TYPE,
+                        BOOLEAN_TYPE,
+                        STRING_TYPE,
+                        BINARY_TYPE,
+                        TINYINT_TYPE,
+                        SMALLINT_TYPE,
+                        INT_TYPE,
+                        BIGINT_TYPE,
+                        FLOAT_TYPE,
+                        DOUBLE_TYPE,
+                        DECIMAL_TYPE,
+                        DATE_TYPE,
+                        TIMESTAMP_TYPE,
+                        ARRAY_TYPE,
+                        MAP_TYPE,
+                        STRUCT_TYPE,
+                        CHAR_TYPE,
+                        VARCHAR_TYPE,
+                        INTERVAL_YEAR_MONTH_TYPE,
+                        INTERVAL_DAY_TIME_TYPE));
+    }
 }
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java
index c287a93e15e..120d3786b26 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java
@@ -71,9 +71,13 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.CountDownLatch;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
 
 import static org.apache.flink.api.common.RuntimeExecutionMode.BATCH;
 import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE;
@@ -404,6 +408,40 @@ public class HiveServer2EndpointITCase extends TestLogger {
                                 null)));
     }
 
+    @Test
+    public void testGetTypeInfo() throws Exception {
+        runGetObjectTest(
+                connection -> connection.getMetaData().getTypeInfo(),
+                getExpectedGetTypeInfoSchema(),
+                types ->
+                        assertThat(
+                                        types.stream()
+                                                .map(type -> type.get(0))
+                                                .collect(Collectors.toList()))
+                                .isEqualTo(
+                                        Arrays.asList(
+                                                "VOID",
+                                                "BOOLEAN",
+                                                "STRING",
+                                                "BINARY",
+                                                "TINYINT",
+                                                "SMALLINT",
+                                                "INT",
+                                                "BIGINT",
+                                                "FLOAT",
+                                                "DOUBLE",
+                                                "DECIMAL",
+                                                "DATE",
+                                                "TIMESTAMP",
+                                                "ARRAY",
+                                                "MAP",
+                                                "STRUCT",
+                                                "CHAR",
+                                                "VARCHAR",
+                                                "INTERVAL_YEAR_MONTH",
+                                                "INTERVAL_DAY_TIME")));
+    }
+
     // --------------------------------------------------------------------------------------------
 
     private Connection getInitializedConnection() throws Exception {
@@ -444,11 +482,21 @@ public class HiveServer2EndpointITCase extends TestLogger {
             ResolvedSchema expectedSchema,
             List<List<Object>> expectedResults)
             throws Exception {
+        runGetObjectTest(
+                resultSetSupplier,
+                expectedSchema,
+                result -> assertThat(result).isEqualTo(new HashSet<>(expectedResults)));
+    }
+
+    private void runGetObjectTest(
+            FunctionWithException<Connection, java.sql.ResultSet, Exception> resultSetSupplier,
+            ResolvedSchema expectedSchema,
+            Consumer<Set<List<Object>>> validator)
+            throws Exception {
         try (Connection connection = getInitializedConnection();
                 java.sql.ResultSet result = resultSetSupplier.apply(connection)) {
             assertSchemaEquals(expectedSchema, result.getMetaData());
-            assertThat(new HashSet<>(collect(result, expectedSchema.getColumnCount())))
-                    .isEqualTo(new HashSet<>(expectedResults));
+            validator.accept(collect(result, expectedSchema.getColumnCount()));
         }
     }
 
@@ -511,6 +559,28 @@ public class HiveServer2EndpointITCase extends TestLogger {
                 Column.physical("REF_GENERATION", DataTypes.STRING()));
     }
 
+    private ResolvedSchema getExpectedGetTypeInfoSchema() {
+        return ResolvedSchema.of(
+                Column.physical("TYPE_NAME", DataTypes.STRING()),
+                Column.physical("DATA_TYPE", DataTypes.INT()),
+                Column.physical("PRECISION", DataTypes.INT()),
+                Column.physical("LITERAL_PREFIX", DataTypes.STRING()),
+                Column.physical("LITERAL_SUFFIX", DataTypes.STRING()),
+                Column.physical("CREATE_PARAMS", DataTypes.STRING()),
+                Column.physical("NULLABLE", DataTypes.SMALLINT()),
+                Column.physical("CASE_SENSITIVE", DataTypes.BOOLEAN()),
+                Column.physical("SEARCHABLE", DataTypes.SMALLINT()),
+                Column.physical("UNSIGNED_ATTRIBUTE", DataTypes.BOOLEAN()),
+                Column.physical("FIXED_PREC_SCALE", DataTypes.BOOLEAN()),
+                Column.physical("AUTO_INCREMENT", DataTypes.BOOLEAN()),
+                Column.physical("LOCAL_TYPE_NAME", DataTypes.STRING()),
+                Column.physical("MINIMUM_SCALE", DataTypes.SMALLINT()),
+                Column.physical("MAXIMUM_SCALE", DataTypes.SMALLINT()),
+                Column.physical("SQL_DATA_TYPE", DataTypes.INT()),
+                Column.physical("SQL_DATETIME_SUB", DataTypes.INT()),
+                Column.physical("NUM_PREC_RADIX", DataTypes.INT()));
+    }
+
     private void assertSchemaEquals(ResolvedSchema expected, ResultSetMetaData metaData)
             throws Exception {
         assertThat(metaData.getColumnCount()).isEqualTo(expected.getColumnCount());
@@ -526,8 +596,7 @@ public class HiveServer2EndpointITCase extends TestLogger {
         }
     }
 
-    private List<List<Object>> collect(java.sql.ResultSet result, int columnCount)
-            throws Exception {
+    private Set<List<Object>> collect(java.sql.ResultSet result, int columnCount) throws Exception {
         List<List<Object>> actual = new ArrayList<>();
         while (result.next()) {
             List<Object> row = new ArrayList<>();
@@ -536,6 +605,6 @@ public class HiveServer2EndpointITCase extends TestLogger {
             }
             actual.add(row);
         }
-        return actual;
+        return new LinkedHashSet<>(actual);
     }
 }
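
For context, this change is what lets java.sql.DatabaseMetaData#getTypeInfo work against the SQL Gateway's HiveServer2 endpoint. Below is a minimal sketch of a JDBC client reading the new type metadata; the connection URL is hypothetical (adjust host, port, and auth settings to wherever the endpoint actually listens), and it assumes the Hive JDBC driver is on the classpath.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;

public class GetTypeInfoClient {
    public static void main(String[] args) throws Exception {
        // Register the Hive JDBC driver explicitly (older driver versions do not auto-register).
        Class.forName("org.apache.hive.jdbc.HiveDriver");
        // Hypothetical address; point this at the running HiveServer2 endpoint.
        String url = "jdbc:hive2://localhost:10000/default;auth=noSasl";
        try (Connection connection = DriverManager.getConnection(url);
                ResultSet typeInfo = connection.getMetaData().getTypeInfo()) {
            while (typeInfo.next()) {
                // TYPE_NAME and DATA_TYPE are the first two columns of GET_TYPE_INFO_SCHEMA.
                System.out.println(
                        typeInfo.getString("TYPE_NAME") + " -> " + typeInfo.getInt("DATA_TYPE"));
            }
        }
    }
}

With this commit, the rows returned should correspond to the types listed in createGetTableInfoExecutor (VOID, BOOLEAN, STRING, ..., INTERVAL_DAY_TIME), which is what testGetTypeInfo in HiveServer2EndpointITCase asserts.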