Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/07/18 12:01:41 UTC

[GitHub] [flink] fsk119 opened a new pull request, #20298: [FLINK-28152][sql-gateway][hive] Allow executing statement for the HiveServer2Endpoint

fsk119 opened a new pull request, #20298:
URL: https://github.com/apache/flink/pull/20298

   <!--
   *Thank you very much for contributing to Apache Flink - we are happy that you want to help us improve Flink. To help the community review your contribution in the best possible way, please go through the checklist below, which will get the contribution into a shape in which it can be best reviewed.*
   
   *Please understand that we do not do this to make contributions to Flink a hassle. In order to uphold a high standard of quality for code contributions, while at the same time managing a large number of contributions, we need contributors to prepare the contributions well, and give reviewers enough contextual information for the review. Please also understand that contributions that do not follow this guide will take longer to review and thus typically be picked up with lower priority by the community.*
   
   ## Contribution Checklist
   
     - Make sure that the pull request corresponds to a [JIRA issue](https://issues.apache.org/jira/projects/FLINK/issues). Exceptions are made for typos in JavaDoc or documentation files, which need no JIRA issue.
     
     - Name the pull request in the form "[FLINK-XXXX] [component] Title of the pull request", where *FLINK-XXXX* should be replaced by the actual issue number. Skip *component* if you are unsure about which is the best component.
     Typo fixes that have no associated JIRA issue should be named following this pattern: `[hotfix] [docs] Fix typo in event time introduction` or `[hotfix] [javadocs] Expand JavaDoc for PunctuatedWatermarkGenerator`.
   
     - Fill out the template below to describe the changes contributed by the pull request. That will give reviewers the context they need to do the review.
     
     - Make sure that the change passes the automated tests, i.e., `mvn clean verify` passes. You can set up Azure Pipelines CI to do that following [this guide](https://cwiki.apache.org/confluence/display/FLINK/Azure+Pipelines#AzurePipelines-Tutorial:SettingupAzurePipelinesforaforkoftheFlinkrepository).
   
     - Each pull request should address only one issue, not mix up code from multiple issues.
     
     - Each commit in the pull request has a meaningful commit message (including the JIRA id)
   
     - Once all items of the checklist are addressed, remove the above text and this checklist, leaving only the filled out template below.
   
   
   **(The sections below can be removed for hotfixes of typos)**
   -->
   
   ## What is the purpose of the change
   
   *Allow submitting SQL statements to the HiveServer2 endpoint.*
   
   ## Brief change log
   
     - *Remove the unused HandleIdentifier*
     - *Allow executing SQL statements and fetching results from the HiveServer2 endpoint*
     - *Expose the RowDataToStringConverter in the ResultSet for serialization*
   
   ## Verifying this change
   
     - *Added integration tests for all kinds of Flink SQL statements*
     - *Added tests to serialize all supported DataTypes*
     - *Added negative test cases for RowData with DELETE/UPDATE row kinds*
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (yes / **no**)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (**yes** / no)
     - The serializers: (yes / **no** / don't know)
     - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / **no** / don't know)
     - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (**yes** / no)
     - If yes, how is the feature documented? (not applicable / docs / **JavaDocs** / not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] fsk119 commented on a diff in pull request #20298: [FLINK-28152][sql-gateway][hive] Allow executing statement for the HiveServer2Endpoint

Posted by GitBox <gi...@apache.org>.
fsk119 commented on code in PR #20298:
URL: https://github.com/apache/flink/pull/20298#discussion_r931086928


##########
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointStatementITCase.java:
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.endpoint.hive;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ExecutionOptions;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.endpoint.hive.util.HiveServer2EndpointExtension;
+import org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions;
+import org.apache.flink.table.gateway.AbstractSqlGatewayStatementITCase;
+import org.apache.flink.table.gateway.api.session.SessionHandle;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.utils.DataTypeUtils;
+
+import org.apache.hive.jdbc.HiveConnection;
+import org.apache.hive.service.rpc.thrift.TSessionHandle;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.lang.reflect.Field;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Path;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.Statement;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Stream;
+
+import static org.apache.flink.table.api.internal.StaticResultProvider.SIMPLE_ROW_DATA_TO_STRING_CONVERTER;
+
+/** ITCase to verify statements submitted through the HiveServer2 endpoint. */
+public class HiveServer2EndpointStatementITCase extends AbstractSqlGatewayStatementITCase {
+
+    @RegisterExtension
+    @Order(3)
+    public static final HiveServer2EndpointExtension ENDPOINT_EXTENSION =
+            new HiveServer2EndpointExtension(SQL_GATEWAY_SERVICE_EXTENSION::getService);
+
+    private Connection connection;
+
+    @BeforeEach
+    @Override
+    public void before(@TempDir Path temporaryFolder) throws Exception {
+        super.before(temporaryFolder);
+        connection = ENDPOINT_EXTENSION.getConnection();
+    }
+
+    @AfterEach
+    public void after() throws Exception {
+        connection.close();
+    }
+
+    public static Stream<String> listHiveSqlTests() throws Exception {
+        return listTestSpecInTheSameModule("endpoint");
+    }
+
+    @ParameterizedTest
+    @MethodSource("listHiveSqlTests")
+    public void testHiveSqlStatements(String sqlPath) throws Exception {
+        runTest(sqlPath);
+    }
+
+    @Override
+    protected String runSingleStatement(String sql) throws Exception {
+        Statement statement = connection.createStatement();
+        statement.execute(sql);
+
+        ResultSet resultSet = statement.getResultSet();
+        ResultSetMetaData metaData = resultSet.getMetaData();
+
+        int columnSize = metaData.getColumnCount();
+        List<RowData> rows = new ArrayList<>();
+        DataType type = toStringifiedType(metaData);
+        while (resultSet.next()) {
+            GenericRowData stringifiedRowData = new GenericRowData(columnSize);
+            for (int i = 0; i < columnSize; i++) {
+                Object field = resultSet.getObject(i + 1);
+                // Similar to SIMPLE_ROW_DATA_TO_STRING_CONVERTER
+                if (field != null) {
+                    if (field instanceof Boolean) {
+                        stringifiedRowData.setField(i, field);
+                    } else if (field instanceof byte[]) {
+                        stringifiedRowData.setField(
+                                i,
+                                StringData.fromString(
+                                        new String((byte[]) field, StandardCharsets.UTF_8)));
+                    } else {
+                        stringifiedRowData.setField(i, StringData.fromString(field.toString()));
+                    }
+                }
+            }
+            rows.add(stringifiedRowData);
+        }
+
+        StatementType statementType = StatementType.match(sql);
+
+        return toString(
+                statementType,
+                DataTypeUtils.expandCompositeTypeToSchema(type),
+                SIMPLE_ROW_DATA_TO_STRING_CONVERTER,
+                rows.iterator());
+    }
+
+    @Override
+    protected String stringifyException(Throwable t) {
+        return t.getMessage().trim();
+    }
+
+    @Override
+    protected boolean isStreaming() throws Exception {
+        Field sessHandleField = HiveConnection.class.getDeclaredField("sessHandle");

Review Comment:
   Yes, it could. But it would take a lot of time to filter out all the unrelated config options.
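   
   A minimal, hypothetical sketch of that alternative (not the PR's code): read the
   runtime mode back through the same JDBC connection by listing all options with a
   SET statement and filtering for the key, which is exactly the filtering overhead
   mentioned above. The method name and the assumption that each returned row is a
   (key, value) pair are illustrative only.
   
       private boolean isStreamingViaSet() throws Exception {
           try (Statement statement = connection.createStatement();
                   ResultSet options = statement.executeQuery("SET")) {
               while (options.next()) {
                   // Each row is assumed to carry one config option as (key, value).
                   if ("execution.runtime-mode".equals(options.getString(1))) {
                       return "streaming".equalsIgnoreCase(options.getString(2));
                   }
               }
           }
           return false;
       }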





[GitHub] [flink] fsk119 commented on a diff in pull request #20298: [FLINK-28152][sql-gateway][hive] Allow executing statement for the HiveServer2Endpoint

Posted by GitBox <gi...@apache.org>.
fsk119 commented on code in PR #20298:
URL: https://github.com/apache/flink/pull/20298#discussion_r931717886


##########
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversionsTest.java:
##########
@@ -34,4 +107,249 @@ public void testConvertSessionHandle() {
         SessionHandle originSessionHandle = SessionHandle.create();
         assertEquals(toSessionHandle(toTSessionHandle(originSessionHandle)), originSessionHandle);
     }
+
+    @Test
+    public void testConvertSessionHandleAndOperationHandle() {
+        SessionHandle originSessionHandle = SessionHandle.create();
+        OperationHandle originOperationHandle = OperationHandle.create();
+        TOperationHandle tOperationHandle =
+                toTOperationHandle(
+                        originSessionHandle, originOperationHandle, OperationType.UNKNOWN, false);
+
+        assertEquals(toSessionHandle(tOperationHandle), originSessionHandle);
+        assertEquals(toOperationHandle(tOperationHandle), originOperationHandle);
+    }
+
+    @Test
+    public void testConvertOperationStatus() {
+        Map<OperationStatus, TOperationState> expectedMappings = new HashMap<>();
+        expectedMappings.put(INITIALIZED, TOperationState.INITIALIZED_STATE);
+        expectedMappings.put(PENDING, TOperationState.PENDING_STATE);
+        expectedMappings.put(RUNNING, TOperationState.RUNNING_STATE);
+        expectedMappings.put(FINISHED, TOperationState.FINISHED_STATE);
+        expectedMappings.put(CANCELED, TOperationState.CANCELED_STATE);
+        expectedMappings.put(CLOSED, TOperationState.CLOSED_STATE);
+        expectedMappings.put(ERROR, TOperationState.ERROR_STATE);
+        expectedMappings.put(TIMEOUT, TOperationState.TIMEDOUT_STATE);
+
+        for (OperationStatus status : expectedMappings.keySet()) {
+            assertEquals(expectedMappings.get(status), toTOperationState(status));
+        }
+    }
+
+    @ParameterizedTest
+    @MethodSource("getDataTypeSpecs")
+    public void testToTTableSchema(DataTypeSpec spec) {
+        TableSchema actual =
+                new TableSchema(
+                        toTTableSchema(DataTypeUtils.expandCompositeTypeToSchema(spec.flinkType)));
+        List<Integer> javaSqlTypes =
+                Arrays.stream(actual.toTypeDescriptors())
+                        .map(desc -> desc.getType().toJavaSQLType())
+                        .collect(Collectors.toList());
+
+        assertEquals(Collections.singletonList(spec.sqlType), javaSqlTypes);
+    }
+
+    @ParameterizedTest
+    @MethodSource("getDataTypeSpecs")
+    public void testResultSetToColumnBasedRowSet(DataTypeSpec spec) throws Exception {
+        List<LogicalType> fieldTypes = spec.flinkType.getLogicalType().getChildren();
+        TRowSet tRowSet =
+                toColumnBasedSet(
+                        fieldTypes,
+                        IntStream.range(0, fieldTypes.size())
+                                .mapToObj(
+                                        pos -> RowData.createFieldGetter(fieldTypes.get(pos), pos))
+                                .collect(Collectors.toList()),
+                        Arrays.asList(spec.flinkValue, new GenericRowData(1)));
+        RowSet rowSet = RowSetFactory.create(tRowSet, HIVE_CLI_SERVICE_PROTOCOL_V10);
+        Iterator<Object[]> iterator = rowSet.iterator();
+        if (spec.flinkType.getChildren().equals(Collections.singletonList(BYTES()))) {
+            assertArrayEquals((byte[]) spec.convertedColumnBasedValue, (byte[]) iterator.next()[0]);
+        } else {
+            assertEquals(spec.convertedColumnBasedValue, iterator.next()[0]);
+        }
+
+        assertEquals(spec.convertedNullValue, iterator.next()[0]);
+    }
+
+    @ParameterizedTest
+    @MethodSource("getDataTypeSpecs")
+    public void testResultSetToRowBasedRowSet(DataTypeSpec spec) throws Exception {
+        List<LogicalType> fieldTypes = spec.flinkType.getLogicalType().getChildren();
+        TRowSet tRowSet =
+                toRowBasedSet(
+                        fieldTypes,
+                        IntStream.range(0, fieldTypes.size())
+                                .mapToObj(
+                                        pos -> RowData.createFieldGetter(fieldTypes.get(pos), pos))
+                                .collect(Collectors.toList()),
+                        Arrays.asList(spec.flinkValue, new GenericRowData(1)));
+        RowSet rowSet = RowSetFactory.create(tRowSet, HIVE_CLI_SERVICE_PROTOCOL_V3);
+        Iterator<Object[]> iter = rowSet.iterator();
+        assertEquals(spec.convertedRowBasedValue, iter.next()[0]);
+        assertEquals(spec.convertedNullValue, iter.next()[0]);
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Negative tests
+    // --------------------------------------------------------------------------------------------
+
+    @Test
+    public void testSerializeRowDataWithRowKind() {
+        for (RowKind kind :
+                Arrays.asList(RowKind.UPDATE_BEFORE, RowKind.UPDATE_AFTER, RowKind.DELETE)) {
+            assertThatThrownBy(
+                            () ->
+                                    toTRowSet(
+                                            HIVE_CLI_SERVICE_PROTOCOL_V5,
+                                            ResolvedSchema.of(
+                                                    Column.physical("f0", DataTypes.INT())),
+                                            Collections.singletonList(
+                                                    GenericRowData.ofKind(kind, 1))))
+                    .satisfies(
+                            anyCauseMatches(
+                                    UnsupportedOperationException.class,
+                                    "HiveServer2 Endpoint only supports to serialize the INSERT-ONLY RowData."));
+        }
+    }
+
+    // --------------------------------------------------------------------------------------------
+
+    private static List<DataTypeSpec> getDataTypeSpecs() {
+        Map<StringData, StringData> map = new HashMap<>();
+        map.put(StringData.fromString("World"), StringData.fromString("Hello"));
+        map.put(StringData.fromString("Hello"), StringData.fromString("World"));
+        MapData mapData = new GenericMapData(map);
+        return Arrays.asList(
+                DataTypeSpec.newSpec()
+                        .forType(BOOLEAN())
+                        .forValue(Boolean.TRUE)
+                        .expectSqlType(Types.BOOLEAN),
+                DataTypeSpec.newSpec()
+                        .forType(TINYINT())
+                        .forValue((byte) 3)
+                        // TINYINT is the alias of the BYTE in Hive.
+                        .expectSqlType(Types.BINARY),
+                DataTypeSpec.newSpec()
+                        .forType(SMALLINT())
+                        .forValue((short) 255)
+                        .expectSqlType(Types.SMALLINT),
+                DataTypeSpec.newSpec().forType(INT()).forValue(1994).expectSqlType(Types.INTEGER),
+                DataTypeSpec.newSpec()
+                        .forType(BIGINT())
+                        .forValue(13214991L)
+                        .expectSqlType(Types.BIGINT),
+                DataTypeSpec.newSpec()
+                        .forType(FLOAT())
+                        .forValue(1024.0f)
+                        .expectSqlType(Types.FLOAT)
+                        .expectValue(1024.0),
+                DataTypeSpec.newSpec()
+                        .forType(DOUBLE())
+                        .forValue(2048.1024)
+                        .expectSqlType(Types.DOUBLE),
+                DataTypeSpec.newSpec()
+                        .forType(DECIMAL(9, 6))
+                        .forValue(DecimalData.fromBigDecimal(new BigDecimal("123.456789"), 9, 6))
+                        .expectSqlType(Types.DECIMAL)
+                        .expectValue("123.456789"),
+                DataTypeSpec.newSpec()
+                        .forType(STRING())
+                        .forValue(StringData.fromString("Hello World"))
+                        .expectSqlType(Types.VARCHAR)
+                        .expectValue("Hello World"),
+                DataTypeSpec.newSpec()
+                        .forType(BYTES())
+                        .forValue("Flink SQL Gateway".getBytes(StandardCharsets.UTF_8))
+                        .expectSqlType(Types.BINARY)
+                        .expectValue(
+                                new String("Flink SQL Gateway".getBytes(StandardCharsets.UTF_8)),
+                                "Flink SQL Gateway".getBytes(StandardCharsets.UTF_8)),
+                DataTypeSpec.newSpec()
+                        .forType(DATE())
+                        .forValue((int) LocalDate.parse("2022-02-22").toEpochDay())
+                        .expectSqlType(Types.DATE)
+                        .expectValue("2022-02-22"),
+                DataTypeSpec.newSpec()
+                        .forType(TIMESTAMP(4))
+                        .forValue(
+                                TimestampData.fromLocalDateTime(
+                                        LocalDateTime.parse("2020-05-11T12:00:12.1234")))
+                        .expectSqlType(Types.TIMESTAMP)
+                        .expectValue("2020-05-11 12:00:12.1234"),
+                DataTypeSpec.newSpec()
+                        .forType(DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()))
+                        .forValue(mapData)
+                        .expectSqlType(Types.JAVA_OBJECT)
+                        .expectValue("{\"Hello\":\"World\",\"World\":\"Hello\"}")
+                        .expectNullValue("null"),
+                DataTypeSpec.newSpec()
+                        .forType(
+                                DataTypes.ARRAY(
+                                        DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING())))
+                        .forValue(new GenericArrayData(new Object[] {mapData, mapData}))
+                        // Hive uses STRING type
+                        .expectSqlType(Types.VARCHAR)
+                        .expectValue(
+                                "[{\"Hello\":\"World\",\"World\":\"Hello\"},{\"Hello\":\"World\",\"World\":\"Hello\"}]")
+                        .expectNullValue("null"));

Review Comment:
   Every DataType test contains two parts:
   
   1. Serialization produces the expected value for a concrete input value.
   2. Serialization handles a null value.
   
   Hive serializes a null value of a complex type to the string "null" rather than to null [1].
   
   [1] https://github.com/apache/hive/blob/master/serde/src/java/org/apache/hadoop/hive/serde2/SerDeUtils.java#L317
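   
   For reference, the map entry from the spec list above shows both parts in a single
   builder chain (the calls are the ones in the quoted diff; only the comments are
   added here):
   
       DataTypeSpec.newSpec()
               .forType(DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()))
               .forValue(mapData)
               .expectSqlType(Types.JAVA_OBJECT)
               // part 1: the concrete value serializes to its expected string form
               .expectValue("{\"Hello\":\"World\",\"World\":\"Hello\"}")
               // part 2: a null value of a complex type becomes the literal string "null"
               .expectNullValue("null");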



##########
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversionsTest.java:
##########
@@ -34,4 +107,249 @@ public void testConvertSessionHandle() {
         SessionHandle originSessionHandle = SessionHandle.create();
         assertEquals(toSessionHandle(toTSessionHandle(originSessionHandle)), originSessionHandle);
     }
+
+    @Test
+    public void testConvertSessionHandleAndOperationHandle() {
+        SessionHandle originSessionHandle = SessionHandle.create();
+        OperationHandle originOperationHandle = OperationHandle.create();
+        TOperationHandle tOperationHandle =
+                toTOperationHandle(
+                        originSessionHandle, originOperationHandle, OperationType.UNKNOWN, false);
+
+        assertEquals(toSessionHandle(tOperationHandle), originSessionHandle);
+        assertEquals(toOperationHandle(tOperationHandle), originOperationHandle);
+    }
+
+    @Test
+    public void testConvertOperationStatus() {
+        Map<OperationStatus, TOperationState> expectedMappings = new HashMap<>();
+        expectedMappings.put(INITIALIZED, TOperationState.INITIALIZED_STATE);
+        expectedMappings.put(PENDING, TOperationState.PENDING_STATE);
+        expectedMappings.put(RUNNING, TOperationState.RUNNING_STATE);
+        expectedMappings.put(FINISHED, TOperationState.FINISHED_STATE);
+        expectedMappings.put(CANCELED, TOperationState.CANCELED_STATE);
+        expectedMappings.put(CLOSED, TOperationState.CLOSED_STATE);
+        expectedMappings.put(ERROR, TOperationState.ERROR_STATE);
+        expectedMappings.put(TIMEOUT, TOperationState.TIMEDOUT_STATE);
+
+        for (OperationStatus status : expectedMappings.keySet()) {
+            assertEquals(expectedMappings.get(status), toTOperationState(status));
+        }
+    }
+
+    @ParameterizedTest
+    @MethodSource("getDataTypeSpecs")
+    public void testToTTableSchema(DataTypeSpec spec) {
+        TableSchema actual =
+                new TableSchema(
+                        toTTableSchema(DataTypeUtils.expandCompositeTypeToSchema(spec.flinkType)));
+        List<Integer> javaSqlTypes =
+                Arrays.stream(actual.toTypeDescriptors())
+                        .map(desc -> desc.getType().toJavaSQLType())
+                        .collect(Collectors.toList());
+
+        assertEquals(Collections.singletonList(spec.sqlType), javaSqlTypes);
+    }
+
+    @ParameterizedTest
+    @MethodSource("getDataTypeSpecs")
+    public void testResultSetToColumnBasedRowSet(DataTypeSpec spec) throws Exception {
+        List<LogicalType> fieldTypes = spec.flinkType.getLogicalType().getChildren();
+        TRowSet tRowSet =
+                toColumnBasedSet(
+                        fieldTypes,
+                        IntStream.range(0, fieldTypes.size())
+                                .mapToObj(
+                                        pos -> RowData.createFieldGetter(fieldTypes.get(pos), pos))
+                                .collect(Collectors.toList()),
+                        Arrays.asList(spec.flinkValue, new GenericRowData(1)));
+        RowSet rowSet = RowSetFactory.create(tRowSet, HIVE_CLI_SERVICE_PROTOCOL_V10);
+        Iterator<Object[]> iterator = rowSet.iterator();
+        if (spec.flinkType.getChildren().equals(Collections.singletonList(BYTES()))) {
+            assertArrayEquals((byte[]) spec.convertedColumnBasedValue, (byte[]) iterator.next()[0]);
+        } else {
+            assertEquals(spec.convertedColumnBasedValue, iterator.next()[0]);
+        }
+
+        assertEquals(spec.convertedNullValue, iterator.next()[0]);
+    }
+
+    @ParameterizedTest
+    @MethodSource("getDataTypeSpecs")
+    public void testResultSetToRowBasedRowSet(DataTypeSpec spec) throws Exception {
+        List<LogicalType> fieldTypes = spec.flinkType.getLogicalType().getChildren();
+        TRowSet tRowSet =
+                toRowBasedSet(
+                        fieldTypes,
+                        IntStream.range(0, fieldTypes.size())
+                                .mapToObj(
+                                        pos -> RowData.createFieldGetter(fieldTypes.get(pos), pos))
+                                .collect(Collectors.toList()),
+                        Arrays.asList(spec.flinkValue, new GenericRowData(1)));
+        RowSet rowSet = RowSetFactory.create(tRowSet, HIVE_CLI_SERVICE_PROTOCOL_V3);
+        Iterator<Object[]> iter = rowSet.iterator();
+        assertEquals(spec.convertedRowBasedValue, iter.next()[0]);
+        assertEquals(spec.convertedNullValue, iter.next()[0]);
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Negative tests
+    // --------------------------------------------------------------------------------------------
+
+    @Test
+    public void testSerializeRowDataWithRowKind() {
+        for (RowKind kind :
+                Arrays.asList(RowKind.UPDATE_BEFORE, RowKind.UPDATE_AFTER, RowKind.DELETE)) {
+            assertThatThrownBy(
+                            () ->
+                                    toTRowSet(
+                                            HIVE_CLI_SERVICE_PROTOCOL_V5,
+                                            ResolvedSchema.of(
+                                                    Column.physical("f0", DataTypes.INT())),
+                                            Collections.singletonList(
+                                                    GenericRowData.ofKind(kind, 1))))
+                    .satisfies(
+                            anyCauseMatches(
+                                    UnsupportedOperationException.class,
+                                    "HiveServer2 Endpoint only supports to serialize the INSERT-ONLY RowData."));
+        }
+    }
+
+    // --------------------------------------------------------------------------------------------
+
+    private static List<DataTypeSpec> getDataTypeSpecs() {
+        Map<StringData, StringData> map = new HashMap<>();
+        map.put(StringData.fromString("World"), StringData.fromString("Hello"));
+        map.put(StringData.fromString("Hello"), StringData.fromString("World"));
+        MapData mapData = new GenericMapData(map);
+        return Arrays.asList(
+                DataTypeSpec.newSpec()
+                        .forType(BOOLEAN())
+                        .forValue(Boolean.TRUE)
+                        .expectSqlType(Types.BOOLEAN),
+                DataTypeSpec.newSpec()
+                        .forType(TINYINT())
+                        .forValue((byte) 3)
+                        // TINYINT is the alias of the BYTE in Hive.
+                        .expectSqlType(Types.BINARY),
+                DataTypeSpec.newSpec()
+                        .forType(SMALLINT())
+                        .forValue((short) 255)
+                        .expectSqlType(Types.SMALLINT),
+                DataTypeSpec.newSpec().forType(INT()).forValue(1994).expectSqlType(Types.INTEGER),
+                DataTypeSpec.newSpec()
+                        .forType(BIGINT())
+                        .forValue(13214991L)
+                        .expectSqlType(Types.BIGINT),
+                DataTypeSpec.newSpec()
+                        .forType(FLOAT())
+                        .forValue(1024.0f)
+                        .expectSqlType(Types.FLOAT)
+                        .expectValue(1024.0),
+                DataTypeSpec.newSpec()
+                        .forType(DOUBLE())
+                        .forValue(2048.1024)
+                        .expectSqlType(Types.DOUBLE),
+                DataTypeSpec.newSpec()
+                        .forType(DECIMAL(9, 6))
+                        .forValue(DecimalData.fromBigDecimal(new BigDecimal("123.456789"), 9, 6))
+                        .expectSqlType(Types.DECIMAL)
+                        .expectValue("123.456789"),
+                DataTypeSpec.newSpec()
+                        .forType(STRING())
+                        .forValue(StringData.fromString("Hello World"))
+                        .expectSqlType(Types.VARCHAR)
+                        .expectValue("Hello World"),
+                DataTypeSpec.newSpec()
+                        .forType(BYTES())
+                        .forValue("Flink SQL Gateway".getBytes(StandardCharsets.UTF_8))
+                        .expectSqlType(Types.BINARY)
+                        .expectValue(
+                                new String("Flink SQL Gateway".getBytes(StandardCharsets.UTF_8)),
+                                "Flink SQL Gateway".getBytes(StandardCharsets.UTF_8)),
+                DataTypeSpec.newSpec()
+                        .forType(DATE())
+                        .forValue((int) LocalDate.parse("2022-02-22").toEpochDay())
+                        .expectSqlType(Types.DATE)
+                        .expectValue("2022-02-22"),
+                DataTypeSpec.newSpec()
+                        .forType(TIMESTAMP(4))
+                        .forValue(
+                                TimestampData.fromLocalDateTime(
+                                        LocalDateTime.parse("2020-05-11T12:00:12.1234")))
+                        .expectSqlType(Types.TIMESTAMP)
+                        .expectValue("2020-05-11 12:00:12.1234"),
+                DataTypeSpec.newSpec()
+                        .forType(DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()))

Review Comment:
   I opened a ticket: [FLINK-28723](https://issues.apache.org/jira/browse/FLINK-28723).





[GitHub] [flink] wuchong commented on a diff in pull request #20298: [FLINK-28152][sql-gateway][hive] Allow executing statement for the HiveServer2Endpoint

Posted by GitBox <gi...@apache.org>.
wuchong commented on code in PR #20298:
URL: https://github.com/apache/flink/pull/20298#discussion_r931189354


##########
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversionsTest.java:
##########
@@ -34,4 +107,249 @@ public void testConvertSessionHandle() {
         SessionHandle originSessionHandle = SessionHandle.create();
         assertEquals(toSessionHandle(toTSessionHandle(originSessionHandle)), originSessionHandle);
     }
+
+    @Test
+    public void testConvertSessionHandleAndOperationHandle() {
+        SessionHandle originSessionHandle = SessionHandle.create();
+        OperationHandle originOperationHandle = OperationHandle.create();
+        TOperationHandle tOperationHandle =
+                toTOperationHandle(
+                        originSessionHandle, originOperationHandle, OperationType.UNKNOWN, false);
+
+        assertEquals(toSessionHandle(tOperationHandle), originSessionHandle);
+        assertEquals(toOperationHandle(tOperationHandle), originOperationHandle);
+    }
+
+    @Test
+    public void testConvertOperationStatus() {
+        Map<OperationStatus, TOperationState> expectedMappings = new HashMap<>();
+        expectedMappings.put(INITIALIZED, TOperationState.INITIALIZED_STATE);
+        expectedMappings.put(PENDING, TOperationState.PENDING_STATE);
+        expectedMappings.put(RUNNING, TOperationState.RUNNING_STATE);
+        expectedMappings.put(FINISHED, TOperationState.FINISHED_STATE);
+        expectedMappings.put(CANCELED, TOperationState.CANCELED_STATE);
+        expectedMappings.put(CLOSED, TOperationState.CLOSED_STATE);
+        expectedMappings.put(ERROR, TOperationState.ERROR_STATE);
+        expectedMappings.put(TIMEOUT, TOperationState.TIMEDOUT_STATE);
+
+        for (OperationStatus status : expectedMappings.keySet()) {
+            assertEquals(expectedMappings.get(status), toTOperationState(status));
+        }
+    }
+
+    @ParameterizedTest
+    @MethodSource("getDataTypeSpecs")
+    public void testToTTableSchema(DataTypeSpec spec) {
+        TableSchema actual =
+                new TableSchema(
+                        toTTableSchema(DataTypeUtils.expandCompositeTypeToSchema(spec.flinkType)));
+        List<Integer> javaSqlTypes =
+                Arrays.stream(actual.toTypeDescriptors())
+                        .map(desc -> desc.getType().toJavaSQLType())
+                        .collect(Collectors.toList());
+
+        assertEquals(Collections.singletonList(spec.sqlType), javaSqlTypes);
+    }
+
+    @ParameterizedTest
+    @MethodSource("getDataTypeSpecs")
+    public void testResultSetToColumnBasedRowSet(DataTypeSpec spec) throws Exception {
+        List<LogicalType> fieldTypes = spec.flinkType.getLogicalType().getChildren();
+        TRowSet tRowSet =
+                toColumnBasedSet(
+                        fieldTypes,
+                        IntStream.range(0, fieldTypes.size())
+                                .mapToObj(
+                                        pos -> RowData.createFieldGetter(fieldTypes.get(pos), pos))
+                                .collect(Collectors.toList()),
+                        Arrays.asList(spec.flinkValue, new GenericRowData(1)));
+        RowSet rowSet = RowSetFactory.create(tRowSet, HIVE_CLI_SERVICE_PROTOCOL_V10);
+        Iterator<Object[]> iterator = rowSet.iterator();
+        if (spec.flinkType.getChildren().equals(Collections.singletonList(BYTES()))) {
+            assertArrayEquals((byte[]) spec.convertedColumnBasedValue, (byte[]) iterator.next()[0]);
+        } else {
+            assertEquals(spec.convertedColumnBasedValue, iterator.next()[0]);
+        }
+
+        assertEquals(spec.convertedNullValue, iterator.next()[0]);
+    }
+
+    @ParameterizedTest
+    @MethodSource("getDataTypeSpecs")
+    public void testResultSetToRowBasedRowSet(DataTypeSpec spec) throws Exception {
+        List<LogicalType> fieldTypes = spec.flinkType.getLogicalType().getChildren();
+        TRowSet tRowSet =
+                toRowBasedSet(
+                        fieldTypes,
+                        IntStream.range(0, fieldTypes.size())
+                                .mapToObj(
+                                        pos -> RowData.createFieldGetter(fieldTypes.get(pos), pos))
+                                .collect(Collectors.toList()),
+                        Arrays.asList(spec.flinkValue, new GenericRowData(1)));
+        RowSet rowSet = RowSetFactory.create(tRowSet, HIVE_CLI_SERVICE_PROTOCOL_V3);
+        Iterator<Object[]> iter = rowSet.iterator();
+        assertEquals(spec.convertedRowBasedValue, iter.next()[0]);
+        assertEquals(spec.convertedNullValue, iter.next()[0]);
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Negative tests
+    // --------------------------------------------------------------------------------------------
+
+    @Test
+    public void testSerializeRowDataWithRowKind() {
+        for (RowKind kind :
+                Arrays.asList(RowKind.UPDATE_BEFORE, RowKind.UPDATE_AFTER, RowKind.DELETE)) {
+            assertThatThrownBy(
+                            () ->
+                                    toTRowSet(
+                                            HIVE_CLI_SERVICE_PROTOCOL_V5,
+                                            ResolvedSchema.of(
+                                                    Column.physical("f0", DataTypes.INT())),
+                                            Collections.singletonList(
+                                                    GenericRowData.ofKind(kind, 1))))
+                    .satisfies(
+                            anyCauseMatches(
+                                    UnsupportedOperationException.class,
+                                    "HiveServer2 Endpoint only supports to serialize the INSERT-ONLY RowData."));
+        }
+    }
+
+    // --------------------------------------------------------------------------------------------
+
+    private static List<DataTypeSpec> getDataTypeSpecs() {
+        Map<StringData, StringData> map = new HashMap<>();
+        map.put(StringData.fromString("World"), StringData.fromString("Hello"));
+        map.put(StringData.fromString("Hello"), StringData.fromString("World"));
+        MapData mapData = new GenericMapData(map);
+        return Arrays.asList(
+                DataTypeSpec.newSpec()
+                        .forType(BOOLEAN())
+                        .forValue(Boolean.TRUE)
+                        .expectSqlType(Types.BOOLEAN),
+                DataTypeSpec.newSpec()
+                        .forType(TINYINT())
+                        .forValue((byte) 3)
+                        // TINYINT is the alias of the BYTE in Hive.
+                        .expectSqlType(Types.BINARY),
+                DataTypeSpec.newSpec()
+                        .forType(SMALLINT())
+                        .forValue((short) 255)
+                        .expectSqlType(Types.SMALLINT),
+                DataTypeSpec.newSpec().forType(INT()).forValue(1994).expectSqlType(Types.INTEGER),
+                DataTypeSpec.newSpec()
+                        .forType(BIGINT())
+                        .forValue(13214991L)
+                        .expectSqlType(Types.BIGINT),
+                DataTypeSpec.newSpec()
+                        .forType(FLOAT())
+                        .forValue(1024.0f)
+                        .expectSqlType(Types.FLOAT)
+                        .expectValue(1024.0),
+                DataTypeSpec.newSpec()
+                        .forType(DOUBLE())
+                        .forValue(2048.1024)
+                        .expectSqlType(Types.DOUBLE),
+                DataTypeSpec.newSpec()
+                        .forType(DECIMAL(9, 6))
+                        .forValue(DecimalData.fromBigDecimal(new BigDecimal("123.456789"), 9, 6))
+                        .expectSqlType(Types.DECIMAL)
+                        .expectValue("123.456789"),
+                DataTypeSpec.newSpec()
+                        .forType(STRING())
+                        .forValue(StringData.fromString("Hello World"))
+                        .expectSqlType(Types.VARCHAR)
+                        .expectValue("Hello World"),
+                DataTypeSpec.newSpec()
+                        .forType(BYTES())
+                        .forValue("Flink SQL Gateway".getBytes(StandardCharsets.UTF_8))
+                        .expectSqlType(Types.BINARY)
+                        .expectValue(
+                                new String("Flink SQL Gateway".getBytes(StandardCharsets.UTF_8)),
+                                "Flink SQL Gateway".getBytes(StandardCharsets.UTF_8)),
+                DataTypeSpec.newSpec()
+                        .forType(DATE())
+                        .forValue((int) LocalDate.parse("2022-02-22").toEpochDay())
+                        .expectSqlType(Types.DATE)
+                        .expectValue("2022-02-22"),
+                DataTypeSpec.newSpec()
+                        .forType(TIMESTAMP(4))
+                        .forValue(
+                                TimestampData.fromLocalDateTime(
+                                        LocalDateTime.parse("2020-05-11T12:00:12.1234")))
+                        .expectSqlType(Types.TIMESTAMP)
+                        .expectValue("2020-05-11 12:00:12.1234"),
+                DataTypeSpec.newSpec()
+                        .forType(DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()))
+                        .forValue(mapData)
+                        .expectSqlType(Types.JAVA_OBJECT)
+                        .expectValue("{\"Hello\":\"World\",\"World\":\"Hello\"}")
+                        .expectNullValue("null"),
+                DataTypeSpec.newSpec()
+                        .forType(
+                                DataTypes.ARRAY(
+                                        DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING())))
+                        .forValue(new GenericArrayData(new Object[] {mapData, mapData}))
+                        // Hive uses STRING type
+                        .expectSqlType(Types.VARCHAR)
+                        .expectValue(
+                                "[{\"Hello\":\"World\",\"World\":\"Hello\"},{\"Hello\":\"World\",\"World\":\"Hello\"}]")
+                        .expectNullValue("null"));

Review Comment:
   Why is there a null value for the complex types?



##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java:
##########
@@ -419,44 +569,49 @@ public boolean equals(Object o) {
         }
         HiveServer2Endpoint that = (HiveServer2Endpoint) o;
 
-        return minWorkerThreads == that.minWorkerThreads
+        return Objects.equals(host, that.host)
+                && port == that.port
+                && minWorkerThreads == that.minWorkerThreads
                 && maxWorkerThreads == that.maxWorkerThreads
                 && requestTimeoutMs == that.requestTimeoutMs
                 && backOffSlotLengthMs == that.backOffSlotLengthMs
                 && maxMessageSize == that.maxMessageSize
-                && port == that.port
                 && Objects.equals(workerKeepAliveTime, that.workerKeepAliveTime)
                 && Objects.equals(catalogName, that.catalogName)
                 && Objects.equals(defaultDatabase, that.defaultDatabase)
                 && Objects.equals(hiveConfPath, that.hiveConfPath)
                 && Objects.equals(allowEmbedded, that.allowEmbedded)
+                && Objects.equals(isVerbose, that.isVerbose)
                 && Objects.equals(moduleName, that.moduleName);
     }
 
     @Override
     public int hashCode() {
         return Objects.hash(
+                host,
+                port,
                 minWorkerThreads,
                 maxWorkerThreads,
                 workerKeepAliveTime,
                 requestTimeoutMs,
                 backOffSlotLengthMs,
                 maxMessageSize,
-                port,
                 catalogName,
                 defaultDatabase,
                 hiveConfPath,
                 allowEmbedded,
+                isVerbose,
                 moduleName);
     }
 
     @Override
     public void run() {
         try {
-            LOG.info("HiveServer2 Endpoint begins to listen on {}.", port);
+            LOG.info("HiveServer2 Endpoint begins to listen on {}:{}.", host, port);

Review Comment:
   The hostname may not be resolvable by external clients. We should print the IP address here.
   
   You could resolve the host into an IP address in the class constructor.
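   
   A small sketch of that suggestion, assuming the standard JDK java.net.InetAddress
   API (the helper name is illustrative, not the PR's actual code):
   
       import java.net.InetAddress;
       import java.net.UnknownHostException;
   
       static String resolveHostAddress(String host) {
           try {
               // Resolve the configured hostname once so the logged address is
               // reachable by external clients.
               return InetAddress.getByName(host).getHostAddress();
           } catch (UnknownHostException e) {
               // Fall back to the raw host string if resolution fails.
               return host;
           }
       }
   
   The constructor could then store the resolved address and log it, e.g.
   LOG.info("HiveServer2 Endpoint begins to listen on {}:{}.", hostAddress, port).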



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java:
##########
@@ -290,12 +309,34 @@ public void close() {
         }
 
         public ResultSet fetchResults(long token, int maxRows) {
+            return fetchResultsInternal(() -> resultFetcher.fetchResults(token, maxRows));
+        }
+
+        public ResultSet fetchResults(FetchOrientation orientation, int maxRows) {
+            return fetchResultsInternal(() -> resultFetcher.fetchResults(orientation, maxRows));
+        }
+
+        public ResolvedSchema getResultSchema() {
+            OperationStatus current = status.get();
+            if (current != OperationStatus.FINISHED || !hasResults) {

Review Comment:
   This is not addressed. It seems `hasResults` is redundant here?
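   
   A hedged sketch of the simplification this points at, assuming the fetcher exposes
   the schema through an accessor (the exception message is illustrative):
   
       public ResolvedSchema getResultSchema() {
           OperationStatus current = status.get();
           // Rely on the operation status alone; the extra hasResults flag is dropped.
           if (current != OperationStatus.FINISHED) {
               throw new IllegalStateException(
                       "The result schema is only available after the operation is FINISHED, "
                               + "but the current status is " + current + ".");
           }
           return resultFetcher.getResultSchema();
       }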



##########
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversionsTest.java:
##########
@@ -34,4 +107,249 @@ public void testConvertSessionHandle() {
         SessionHandle originSessionHandle = SessionHandle.create();
         assertEquals(toSessionHandle(toTSessionHandle(originSessionHandle)), originSessionHandle);
     }
+
+    @Test
+    public void testConvertSessionHandleAndOperationHandle() {
+        SessionHandle originSessionHandle = SessionHandle.create();
+        OperationHandle originOperationHandle = OperationHandle.create();
+        TOperationHandle tOperationHandle =
+                toTOperationHandle(
+                        originSessionHandle, originOperationHandle, OperationType.UNKNOWN, false);
+
+        assertEquals(toSessionHandle(tOperationHandle), originSessionHandle);
+        assertEquals(toOperationHandle(tOperationHandle), originOperationHandle);
+    }
+
+    @Test
+    public void testConvertOperationStatus() {
+        Map<OperationStatus, TOperationState> expectedMappings = new HashMap<>();
+        expectedMappings.put(INITIALIZED, TOperationState.INITIALIZED_STATE);
+        expectedMappings.put(PENDING, TOperationState.PENDING_STATE);
+        expectedMappings.put(RUNNING, TOperationState.RUNNING_STATE);
+        expectedMappings.put(FINISHED, TOperationState.FINISHED_STATE);
+        expectedMappings.put(CANCELED, TOperationState.CANCELED_STATE);
+        expectedMappings.put(CLOSED, TOperationState.CLOSED_STATE);
+        expectedMappings.put(ERROR, TOperationState.ERROR_STATE);
+        expectedMappings.put(TIMEOUT, TOperationState.TIMEDOUT_STATE);
+
+        for (OperationStatus status : expectedMappings.keySet()) {
+            assertEquals(expectedMappings.get(status), toTOperationState(status));
+        }
+    }
+
+    @ParameterizedTest
+    @MethodSource("getDataTypeSpecs")
+    public void testToTTableSchema(DataTypeSpec spec) {
+        TableSchema actual =
+                new TableSchema(
+                        toTTableSchema(DataTypeUtils.expandCompositeTypeToSchema(spec.flinkType)));
+        List<Integer> javaSqlTypes =
+                Arrays.stream(actual.toTypeDescriptors())
+                        .map(desc -> desc.getType().toJavaSQLType())
+                        .collect(Collectors.toList());
+
+        assertEquals(Collections.singletonList(spec.sqlType), javaSqlTypes);
+    }
+
+    @ParameterizedTest
+    @MethodSource("getDataTypeSpecs")
+    public void testResultSetToColumnBasedRowSet(DataTypeSpec spec) throws Exception {
+        List<LogicalType> fieldTypes = spec.flinkType.getLogicalType().getChildren();
+        TRowSet tRowSet =
+                toColumnBasedSet(
+                        fieldTypes,
+                        IntStream.range(0, fieldTypes.size())
+                                .mapToObj(
+                                        pos -> RowData.createFieldGetter(fieldTypes.get(pos), pos))
+                                .collect(Collectors.toList()),
+                        Arrays.asList(spec.flinkValue, new GenericRowData(1)));
+        RowSet rowSet = RowSetFactory.create(tRowSet, HIVE_CLI_SERVICE_PROTOCOL_V10);
+        Iterator<Object[]> iterator = rowSet.iterator();
+        if (spec.flinkType.getChildren().equals(Collections.singletonList(BYTES()))) {
+            assertArrayEquals((byte[]) spec.convertedColumnBasedValue, (byte[]) iterator.next()[0]);
+        } else {
+            assertEquals(spec.convertedColumnBasedValue, iterator.next()[0]);
+        }
+
+        assertEquals(spec.convertedNullValue, iterator.next()[0]);
+    }
+
+    @ParameterizedTest
+    @MethodSource("getDataTypeSpecs")
+    public void testResultSetToRowBasedRowSet(DataTypeSpec spec) throws Exception {
+        List<LogicalType> fieldTypes = spec.flinkType.getLogicalType().getChildren();
+        TRowSet tRowSet =
+                toRowBasedSet(
+                        fieldTypes,
+                        IntStream.range(0, fieldTypes.size())
+                                .mapToObj(
+                                        pos -> RowData.createFieldGetter(fieldTypes.get(pos), pos))
+                                .collect(Collectors.toList()),
+                        Arrays.asList(spec.flinkValue, new GenericRowData(1)));
+        RowSet rowSet = RowSetFactory.create(tRowSet, HIVE_CLI_SERVICE_PROTOCOL_V3);
+        Iterator<Object[]> iter = rowSet.iterator();
+        assertEquals(spec.convertedRowBasedValue, iter.next()[0]);
+        assertEquals(spec.convertedNullValue, iter.next()[0]);
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Negative tests
+    // --------------------------------------------------------------------------------------------
+
+    @Test
+    public void testSerializeRowDataWithRowKind() {
+        for (RowKind kind :
+                Arrays.asList(RowKind.UPDATE_BEFORE, RowKind.UPDATE_AFTER, RowKind.DELETE)) {
+            assertThatThrownBy(
+                            () ->
+                                    toTRowSet(
+                                            HIVE_CLI_SERVICE_PROTOCOL_V5,
+                                            ResolvedSchema.of(
+                                                    Column.physical("f0", DataTypes.INT())),
+                                            Collections.singletonList(
+                                                    GenericRowData.ofKind(kind, 1))))
+                    .satisfies(
+                            anyCauseMatches(
+                                    UnsupportedOperationException.class,
+                                    "HiveServer2 Endpoint only supports to serialize the INSERT-ONLY RowData."));
+        }
+    }
+
+    // --------------------------------------------------------------------------------------------
+
+    private static List<DataTypeSpec> getDataTypeSpecs() {
+        Map<StringData, StringData> map = new HashMap<>();
+        map.put(StringData.fromString("World"), StringData.fromString("Hello"));
+        map.put(StringData.fromString("Hello"), StringData.fromString("World"));
+        MapData mapData = new GenericMapData(map);
+        return Arrays.asList(
+                DataTypeSpec.newSpec()
+                        .forType(BOOLEAN())
+                        .forValue(Boolean.TRUE)
+                        .expectSqlType(Types.BOOLEAN),
+                DataTypeSpec.newSpec()
+                        .forType(TINYINT())
+                        .forValue((byte) 3)
+                        // TINYINT is the alias of the BYTE in Hive.
+                        .expectSqlType(Types.BINARY),
+                DataTypeSpec.newSpec()
+                        .forType(SMALLINT())
+                        .forValue((short) 255)
+                        .expectSqlType(Types.SMALLINT),
+                DataTypeSpec.newSpec().forType(INT()).forValue(1994).expectSqlType(Types.INTEGER),
+                DataTypeSpec.newSpec()
+                        .forType(BIGINT())
+                        .forValue(13214991L)
+                        .expectSqlType(Types.BIGINT),
+                DataTypeSpec.newSpec()
+                        .forType(FLOAT())
+                        .forValue(1024.0f)
+                        .expectSqlType(Types.FLOAT)
+                        .expectValue(1024.0),
+                DataTypeSpec.newSpec()
+                        .forType(DOUBLE())
+                        .forValue(2048.1024)
+                        .expectSqlType(Types.DOUBLE),
+                DataTypeSpec.newSpec()
+                        .forType(DECIMAL(9, 6))
+                        .forValue(DecimalData.fromBigDecimal(new BigDecimal("123.456789"), 9, 6))
+                        .expectSqlType(Types.DECIMAL)
+                        .expectValue("123.456789"),
+                DataTypeSpec.newSpec()
+                        .forType(STRING())
+                        .forValue(StringData.fromString("Hello World"))
+                        .expectSqlType(Types.VARCHAR)
+                        .expectValue("Hello World"),
+                DataTypeSpec.newSpec()
+                        .forType(BYTES())
+                        .forValue("Flink SQL Gateway".getBytes(StandardCharsets.UTF_8))
+                        .expectSqlType(Types.BINARY)
+                        .expectValue(
+                                new String("Flink SQL Gateway".getBytes(StandardCharsets.UTF_8)),
+                                "Flink SQL Gateway".getBytes(StandardCharsets.UTF_8)),
+                DataTypeSpec.newSpec()
+                        .forType(DATE())
+                        .forValue((int) LocalDate.parse("2022-02-22").toEpochDay())
+                        .expectSqlType(Types.DATE)
+                        .expectValue("2022-02-22"),
+                DataTypeSpec.newSpec()
+                        .forType(TIMESTAMP(4))
+                        .forValue(
+                                TimestampData.fromLocalDateTime(
+                                        LocalDateTime.parse("2020-05-11T12:00:12.1234")))
+                        .expectSqlType(Types.TIMESTAMP)
+                        .expectValue("2020-05-11 12:00:12.1234"),
+                DataTypeSpec.newSpec()
+                        .forType(DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()))

Review Comment:
   Could you create an issue to support non-string keys in the future?





[GitHub] [flink] fsk119 commented on a diff in pull request #20298: [FLINK-28152][sql-gateway][hive] Allow executing statement for the HiveServer2Endpoint

Posted by GitBox <gi...@apache.org>.
fsk119 commented on code in PR #20298:
URL: https://github.com/apache/flink/pull/20298#discussion_r930961893


##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversions.java:
##########
@@ -18,28 +18,238 @@
 
 package org.apache.flink.table.endpoint.hive.util;
 
-import org.apache.flink.table.gateway.api.HandleIdentifier;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.hive.util.HiveTypeUtil;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.gateway.api.operation.OperationHandle;
+import org.apache.flink.table.gateway.api.operation.OperationStatus;
+import org.apache.flink.table.gateway.api.operation.OperationType;
+import org.apache.flink.table.gateway.api.results.FetchOrientation;
 import org.apache.flink.table.gateway.api.session.SessionHandle;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.utils.print.RowDataToStringConverter;
+import org.apache.flink.types.RowKind;
 
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.thrift.Type;
+import org.apache.hive.service.rpc.thrift.TBinaryColumn;
+import org.apache.hive.service.rpc.thrift.TBoolColumn;
+import org.apache.hive.service.rpc.thrift.TBoolValue;
+import org.apache.hive.service.rpc.thrift.TByteColumn;
+import org.apache.hive.service.rpc.thrift.TByteValue;
+import org.apache.hive.service.rpc.thrift.TCLIServiceConstants;
+import org.apache.hive.service.rpc.thrift.TColumn;
+import org.apache.hive.service.rpc.thrift.TColumnDesc;
+import org.apache.hive.service.rpc.thrift.TColumnValue;
+import org.apache.hive.service.rpc.thrift.TDoubleColumn;
+import org.apache.hive.service.rpc.thrift.TDoubleValue;
+import org.apache.hive.service.rpc.thrift.TFetchOrientation;
 import org.apache.hive.service.rpc.thrift.THandleIdentifier;
+import org.apache.hive.service.rpc.thrift.TI16Column;
+import org.apache.hive.service.rpc.thrift.TI16Value;
+import org.apache.hive.service.rpc.thrift.TI32Column;
+import org.apache.hive.service.rpc.thrift.TI32Value;
+import org.apache.hive.service.rpc.thrift.TI64Column;
+import org.apache.hive.service.rpc.thrift.TI64Value;
+import org.apache.hive.service.rpc.thrift.TOperationHandle;
+import org.apache.hive.service.rpc.thrift.TOperationState;
+import org.apache.hive.service.rpc.thrift.TOperationType;
+import org.apache.hive.service.rpc.thrift.TPrimitiveTypeEntry;
+import org.apache.hive.service.rpc.thrift.TProtocolVersion;
+import org.apache.hive.service.rpc.thrift.TRow;
+import org.apache.hive.service.rpc.thrift.TRowSet;
 import org.apache.hive.service.rpc.thrift.TSessionHandle;
 import org.apache.hive.service.rpc.thrift.TStatus;
 import org.apache.hive.service.rpc.thrift.TStatusCode;
+import org.apache.hive.service.rpc.thrift.TStringColumn;
+import org.apache.hive.service.rpc.thrift.TStringValue;
+import org.apache.hive.service.rpc.thrift.TTableSchema;
+import org.apache.hive.service.rpc.thrift.TTypeDesc;
+import org.apache.hive.service.rpc.thrift.TTypeEntry;
+import org.apache.hive.service.rpc.thrift.TTypeQualifierValue;
+import org.apache.hive.service.rpc.thrift.TTypeQualifiers;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.endpoint.hive.HiveServer2EndpointVersion.HIVE_CLI_SERVICE_PROTOCOL_V6;
 
 /** Conversion between thrift object and flink object. */
 public class ThriftObjectConversions {
 
+    private static final UUID SECRET_ID = UUID.fromString("b06fa16a-3d16-475f-b510-6c64abb9b173");
+
+    // --------------------------------------------------------------------------------------------
+    // Flink SessionHandle from/to Hive SessionHandle
+    // --------------------------------------------------------------------------------------------
+
     public static TSessionHandle toTSessionHandle(SessionHandle sessionHandle) {
-        return new TSessionHandle(toTHandleIdentifier(sessionHandle.getIdentifier()));
+        return new TSessionHandle(toTHandleIdentifier(sessionHandle.getIdentifier(), SECRET_ID));
     }
 
     public static SessionHandle toSessionHandle(TSessionHandle tSessionHandle) {
-        return new SessionHandle(toHandleIdentifier(tSessionHandle.getSessionId()));
+        ByteBuffer bb = ByteBuffer.wrap(tSessionHandle.getSessionId().getGuid());
+        return new SessionHandle(new UUID(bb.getLong(), bb.getLong()));
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Flink SessionHandle && OperationHandle from/to Hive OperationHandle
+    // --------------------------------------------------------------------------------------------
+
+    public static TOperationHandle toTOperationHandle(
+            SessionHandle sessionHandle,
+            OperationHandle operationHandle,
+            OperationType operationType,
+            boolean hasResult) {
+        return new TOperationHandle(
+                toTHandleIdentifier(sessionHandle.getIdentifier(), operationHandle.getIdentifier()),
+                toTOperationType(operationType),
+                hasResult);
+    }
+
+    public static SessionHandle toSessionHandle(TOperationHandle tOperationHandle) {
+        ByteBuffer bb = ByteBuffer.wrap(tOperationHandle.getOperationId().getGuid());
+        return new SessionHandle(new UUID(bb.getLong(), bb.getLong()));
+    }
+
+    public static OperationHandle toOperationHandle(TOperationHandle tOperationHandle) {
+        ByteBuffer bb = ByteBuffer.wrap(tOperationHandle.getOperationId().getSecret());
+        return new OperationHandle(new UUID(bb.getLong(), bb.getLong()));
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Operation related conversions
+    // --------------------------------------------------------------------------------------------
+
+    public static TOperationType toTOperationType(OperationType type) {
+        switch (type) {
+            case EXECUTE_STATEMENT:
+                return TOperationType.EXECUTE_STATEMENT;
+            case UNKNOWN:
+                return TOperationType.UNKNOWN;
+            default:
+                throw new IllegalArgumentException(
+                        String.format("Unknown operation type: %s.", type));
+        }
+    }
+
+    public static TOperationState toTOperationState(OperationStatus operationStatus) {
+        switch (operationStatus) {
+            case INITIALIZED:
+                return TOperationState.INITIALIZED_STATE;
+            case PENDING:
+                return TOperationState.PENDING_STATE;
+            case RUNNING:
+                return TOperationState.RUNNING_STATE;
+            case FINISHED:
+                return TOperationState.FINISHED_STATE;
+            case ERROR:
+                return TOperationState.ERROR_STATE;
+            case TIMEOUT:
+                return TOperationState.TIMEDOUT_STATE;
+            case CANCELED:
+                return TOperationState.CANCELED_STATE;
+            case CLOSED:
+                return TOperationState.CLOSED_STATE;
+            default:
+                throw new IllegalArgumentException(
+                        String.format("Unknown operation status: %s.", operationStatus));
+        }
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Statement related conversions
+    // --------------------------------------------------------------------------------------------
+
+    public static FetchOrientation toFetchOrientation(int fetchOrientation) {
+        if (fetchOrientation == TFetchOrientation.FETCH_NEXT.getValue()) {
+            return FetchOrientation.FETCH_NEXT;
+        } else if (fetchOrientation == TFetchOrientation.FETCH_PRIOR.getValue()) {
+            return FetchOrientation.FETCH_PRIOR;
+        } else {
+            throw new UnsupportedOperationException(
+                    String.format("Unsupported fetch orientation: %s.", fetchOrientation));
+        }
+    }
+
+    /** Similar logic in the {@code org.apache.hive.service.cli.ColumnDescriptor}. */
+    public static TTableSchema toTTableSchema(ResolvedSchema schema) {
+        TTableSchema tSchema = new TTableSchema();
+
+        for (int i = 0; i < schema.getColumnCount(); i++) {
+            Column column = schema.getColumns().get(i);
+            TColumnDesc desc = new TColumnDesc();
+            desc.setColumnName(column.getName());
+            column.getComment().ifPresent(desc::setComment);
+            desc.setPosition(i);
+
+            TTypeDesc typeDesc = new TTypeDesc();
+
+            // Hive uses the TPrimitiveTypeEntry only. Please refer to TypeDescriptor#toTTypeDesc.
+            DataType columnType = column.getDataType();
+            TPrimitiveTypeEntry typeEntry =
+                    new TPrimitiveTypeEntry(

Review Comment:
   No. We will transform the complex types to JSON strings on the server side.
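
   A minimal sketch of that idea (not the actual Flink code; the converter and column index are illustrative assumptions): complex-typed fields are rendered to their string form on the server via the `RowDataToStringConverter` exposed by the `ResultSet`, and shipped in a Thrift string column.

   ```java
   // Hedged sketch only: stringify a complex-typed column before packing it into a
   // TStringColumn. Complex values end up as their serialized (e.g. JSON-like) text form.
   private static TStringColumn toStringColumn(
           List<RowData> rows, int columnIndex, RowDataToStringConverter converter) {
       List<String> values = new ArrayList<>();
       BitSet nulls = new BitSet();
       for (int i = 0; i < rows.size(); i++) {
           RowData row = rows.get(i);
           nulls.set(i, row.isNullAt(columnIndex));
           // convert(...) returns one string per field of the row.
           values.add(row.isNullAt(columnIndex) ? "" : converter.convert(row)[columnIndex]);
       }
       return new TStringColumn(values, ByteBuffer.wrap(nulls.toByteArray()));
   }
   ```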





[GitHub] [flink] wuchong commented on a diff in pull request #20298: [FLINK-28152][sql-gateway][hive] Allow executing statement for the HiveServer2Endpoint

Posted by GitBox <gi...@apache.org>.
wuchong commented on code in PR #20298:
URL: https://github.com/apache/flink/pull/20298#discussion_r930738752


##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java:
##########
@@ -154,7 +160,11 @@ public void registerCatalog(String catalogName, Catalog catalog) {
     }
 
     public void registerModule(String moduleName, Module module) {
+        Deque<String> moduleNames = new ArrayDeque<>(sessionState.moduleManager.listModules());
+        moduleNames.addFirst(moduleName);

Review Comment:
   If the semantics of this method are to register the module at the head, it should be named `registerModuleAtHead` (the same as `SessionEnvironment.Builder#registerModule`).
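
   A hedged sketch of the renamed method, following the diff above (the `sessionState.moduleManager` field is taken from the surrounding `SessionContext` and is an assumption here):

   ```java
   public void registerModuleAtHead(String moduleName, Module module) {
       // Put the new module name in front of the already-loaded ones ...
       Deque<String> moduleNames = new ArrayDeque<>(sessionState.moduleManager.listModules());
       moduleNames.addFirst(moduleName);

       // ... then load it and re-declare the use order so it resolves first.
       sessionState.moduleManager.loadModule(moduleName, module);
       sessionState.moduleManager.useModules(moduleNames.toArray(new String[0]));
   }
   ```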



##########
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointStatementITCase.java:
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.endpoint.hive;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ExecutionOptions;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.endpoint.hive.util.HiveServer2EndpointExtension;
+import org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions;
+import org.apache.flink.table.gateway.AbstractSqlGatewayStatementITCase;
+import org.apache.flink.table.gateway.api.session.SessionHandle;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.utils.DataTypeUtils;
+
+import org.apache.hive.jdbc.HiveConnection;
+import org.apache.hive.service.rpc.thrift.TSessionHandle;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.lang.reflect.Field;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Path;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.Statement;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Stream;
+
+import static org.apache.flink.table.api.internal.StaticResultProvider.SIMPLE_ROW_DATA_TO_STRING_CONVERTER;
+
+/** ITCase to verify the statements. */
+public class HiveServer2EndpointStatementITCase extends AbstractSqlGatewayStatementITCase {
+
+    @RegisterExtension
+    @Order(3)
+    public static final HiveServer2EndpointExtension ENDPOINT_EXTENSION =
+            new HiveServer2EndpointExtension(SQL_GATEWAY_SERVICE_EXTENSION::getService);
+
+    private Connection connection;
+
+    @BeforeEach
+    @Override
+    public void before(@TempDir Path temporaryFolder) throws Exception {
+        super.before(temporaryFolder);
+        connection = ENDPOINT_EXTENSION.getConnection();
+    }
+
+    @AfterEach
+    public void after() throws Exception {
+        connection.close();
+    }
+
+    public static Stream<String> listHiveSqlTests() throws Exception {
+        return listTestSpecInTheSameModule("endpoint");
+    }
+
+    @ParameterizedTest
+    @MethodSource("listHiveSqlTests")
+    public void testHiveSqlStatements(String sqlPath) throws Exception {
+        runTest(sqlPath);
+    }
+
+    @Override
+    protected String runSingleStatement(String sql) throws Exception {
+        Statement statement = connection.createStatement();
+        statement.execute(sql);
+
+        ResultSet resultSet = statement.getResultSet();
+        ResultSetMetaData metaData = resultSet.getMetaData();
+
+        int columnSize = metaData.getColumnCount();
+        List<RowData> rows = new ArrayList<>();
+        DataType type = toStringifiedType(metaData);
+        while (resultSet.next()) {
+            GenericRowData stringifiedRowData = new GenericRowData(columnSize);
+            for (int i = 0; i < columnSize; i++) {
+                Object field = resultSet.getObject(i + 1);
+                // Similar to SIMPLE_ROW_DATA_TO_STRING_CONVERTER
+                if (field != null) {
+                    if (field instanceof Boolean) {
+                        stringifiedRowData.setField(i, field);
+                    } else if (field instanceof byte[]) {
+                        stringifiedRowData.setField(
+                                i,
+                                StringData.fromString(
+                                        new String((byte[]) field, StandardCharsets.UTF_8)));
+                    } else {
+                        stringifiedRowData.setField(i, StringData.fromString(field.toString()));
+                    }
+                }
+            }
+            rows.add(stringifiedRowData);
+        }
+
+        StatementType statementType = StatementType.match(sql);
+
+        return toString(
+                statementType,
+                DataTypeUtils.expandCompositeTypeToSchema(type),
+                SIMPLE_ROW_DATA_TO_STRING_CONVERTER,
+                rows.iterator());
+    }
+
+    @Override
+    protected String stringifyException(Throwable t) {
+        return t.getMessage().trim();
+    }
+
+    @Override
+    protected boolean isStreaming() throws Exception {
+        Field sessHandleField = HiveConnection.class.getDeclaredField("sessHandle");
+        // Set the accessibility as true
+        sessHandleField.setAccessible(true);
+        SessionHandle sessionHandle =
+                ThriftObjectConversions.toSessionHandle(
+                        (TSessionHandle) sessHandleField.get(connection));
+        return Configuration.fromMap(service.getSessionConfig(sessionHandle))
+                .get(ExecutionOptions.RUNTIME_MODE)
+                .equals(RuntimeExecutionMode.STREAMING);
+    }
+
+    @Override
+    protected void initializeSession() throws Exception {

Review Comment:
   This method looks very strange to me. I thought it was a pre-action for `testHiveSqlStatements` as well. How about renaming the method to `resetSessionForFlinkSqlStatements`? Alternatively, we can separate `testFlinkSqlStatements` and `testHiveSqlStatements` into different test classes.



##########
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointStatementITCase.java:
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.endpoint.hive;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ExecutionOptions;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.endpoint.hive.util.HiveServer2EndpointExtension;
+import org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions;
+import org.apache.flink.table.gateway.AbstractSqlGatewayStatementITCase;
+import org.apache.flink.table.gateway.api.session.SessionHandle;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.utils.DataTypeUtils;
+
+import org.apache.hive.jdbc.HiveConnection;
+import org.apache.hive.service.rpc.thrift.TSessionHandle;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.lang.reflect.Field;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Path;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.Statement;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Stream;
+
+import static org.apache.flink.table.api.internal.StaticResultProvider.SIMPLE_ROW_DATA_TO_STRING_CONVERTER;
+
+/** ITCase to verify the statements. */
+public class HiveServer2EndpointStatementITCase extends AbstractSqlGatewayStatementITCase {
+
+    @RegisterExtension
+    @Order(3)
+    public static final HiveServer2EndpointExtension ENDPOINT_EXTENSION =
+            new HiveServer2EndpointExtension(SQL_GATEWAY_SERVICE_EXTENSION::getService);
+
+    private Connection connection;
+
+    @BeforeEach
+    @Override
+    public void before(@TempDir Path temporaryFolder) throws Exception {
+        super.before(temporaryFolder);
+        connection = ENDPOINT_EXTENSION.getConnection();
+    }
+
+    @AfterEach
+    public void after() throws Exception {
+        connection.close();
+    }
+
+    public static Stream<String> listHiveSqlTests() throws Exception {
+        return listTestSpecInTheSameModule("endpoint");
+    }
+
+    @ParameterizedTest
+    @MethodSource("listHiveSqlTests")
+    public void testHiveSqlStatements(String sqlPath) throws Exception {
+        runTest(sqlPath);
+    }
+
+    @Override
+    protected String runSingleStatement(String sql) throws Exception {
+        Statement statement = connection.createStatement();
+        statement.execute(sql);
+
+        ResultSet resultSet = statement.getResultSet();
+        ResultSetMetaData metaData = resultSet.getMetaData();
+
+        int columnSize = metaData.getColumnCount();
+        List<RowData> rows = new ArrayList<>();
+        DataType type = toStringifiedType(metaData);
+        while (resultSet.next()) {
+            GenericRowData stringifiedRowData = new GenericRowData(columnSize);
+            for (int i = 0; i < columnSize; i++) {
+                Object field = resultSet.getObject(i + 1);
+                // Similar to SIMPLE_ROW_DATA_TO_STRING_CONVERTER
+                if (field != null) {
+                    if (field instanceof Boolean) {
+                        stringifiedRowData.setField(i, field);
+                    } else if (field instanceof byte[]) {
+                        stringifiedRowData.setField(
+                                i,
+                                StringData.fromString(
+                                        new String((byte[]) field, StandardCharsets.UTF_8)));
+                    } else {
+                        stringifiedRowData.setField(i, StringData.fromString(field.toString()));
+                    }
+                }
+            }
+            rows.add(stringifiedRowData);
+        }
+
+        StatementType statementType = StatementType.match(sql);
+
+        return toString(
+                statementType,
+                DataTypeUtils.expandCompositeTypeToSchema(type),
+                SIMPLE_ROW_DATA_TO_STRING_CONVERTER,
+                rows.iterator());
+    }
+
+    @Override
+    protected String stringifyException(Throwable t) {
+        return t.getMessage().trim();
+    }
+
+    @Override
+    protected boolean isStreaming() throws Exception {
+        Field sessHandleField = HiveConnection.class.getDeclaredField("sessHandle");

Review Comment:
   nit: In theory, the configuration can be obtained by executing a `SET` statement, right?
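
   A hedged sketch of that alternative (the `SET` statement and the key/value column layout of its result are assumptions): read `execution.runtime-mode` through SQL instead of reflecting on `HiveConnection`.

   ```java
   private boolean isStreamingViaSet(Connection connection) throws Exception {
       try (Statement statement = connection.createStatement();
               ResultSet result = statement.executeQuery("SET")) {
           // The SET result is assumed to be two string columns: key and value.
           while (result.next()) {
               if ("execution.runtime-mode".equals(result.getString(1))) {
                   return "streaming".equalsIgnoreCase(result.getString(2));
               }
           }
       }
       return false;
   }
   ```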



##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java:
##########
@@ -126,13 +145,15 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
     // --------------------------------------------------------------------------------------------
 
     private final SqlGatewayService service;
+    private final String host;

Review Comment:
   Print the IP address when the HiveServer2 endpoint is started, e.g.:
   
   ```
   LOG.info("HiveServer2 Endpoint begins to listen on {}:{}.", address, port);
   ```



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/result/ResultFetcher.java:
##########
@@ -213,11 +187,7 @@ public synchronized ResultSet fetchResults(long token, int maxFetchSize) {
                 throw new SqlExecutionException(msg);
             }
             return new ResultSet(
-                    ResultSet.ResultType.PAYLOAD,
-                    currentToken,
-                    resultSchema,
-                    converter,
-                    new LinkedList<>(bufferedPrevResults));
+                    ResultSet.ResultType.PAYLOAD, currentToken, resultSchema, bufferedPrevResults);

Review Comment:
   Is the copying of `bufferedPrevResults` necessary? It seems a new `fetchResults()` might be called before the `ResultSet` is consumed.



##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointConfigOptions.java:
##########
@@ -35,6 +35,15 @@ public class HiveServer2EndpointConfigOptions {
     // Server Options
     // --------------------------------------------------------------------------------------------
 
+    public static final ConfigOption<String> THRIFT_HOST =
+            ConfigOptions.key("thrift.host")
+                    .stringType()
+                    .defaultValue("")
+                    .withDescription(
+                            "The server address of the HiveServer2 host to be used for communication. "
+                                    + "Default is empty, which means to bind to localhost. "

Review Comment:
   If the default is localhost, why not use it as the default value?
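
   A hedged sketch of that suggestion, reusing the option from the diff above:

   ```java
   public static final ConfigOption<String> THRIFT_HOST =
           ConfigOptions.key("thrift.host")
                   .stringType()
                   .defaultValue("localhost") // make the implicit default explicit
                   .withDescription(
                           "The server address of the HiveServer2 host used for communication.");
   ```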





[GitHub] [flink] wuchong commented on a diff in pull request #20298: [FLINK-28152][sql-gateway][hive] Allow executing statement for the HiveServer2Endpoint

Posted by GitBox <gi...@apache.org>.
wuchong commented on code in PR #20298:
URL: https://github.com/apache/flink/pull/20298#discussion_r929707617


##########
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversionsTest.java:
##########
@@ -34,4 +110,250 @@ public void testConvertSessionHandle() {
         SessionHandle originSessionHandle = SessionHandle.create();
         assertEquals(toSessionHandle(toTSessionHandle(originSessionHandle)), originSessionHandle);
     }
+
+    @Test
+    public void testConvertSessionHandleAndOperationHandle() {
+        SessionHandle originSessionHandle = SessionHandle.create();
+        OperationHandle originOperationHandle = OperationHandle.create();
+        TOperationHandle tOperationHandle =
+                toTOperationHandle(
+                        originSessionHandle, originOperationHandle, OperationType.UNKNOWN, false);
+
+        assertEquals(toSessionHandle(tOperationHandle), originSessionHandle);
+        assertEquals(toOperationHandle(tOperationHandle), originOperationHandle);
+    }
+
+    @Test
+    public void testConvertOperationStatus() {
+        Map<OperationStatus, TOperationState> expectedMappings = new HashMap<>();
+        expectedMappings.put(INITIALIZED, TOperationState.INITIALIZED_STATE);
+        expectedMappings.put(PENDING, TOperationState.PENDING_STATE);
+        expectedMappings.put(RUNNING, TOperationState.RUNNING_STATE);
+        expectedMappings.put(FINISHED, TOperationState.FINISHED_STATE);
+        expectedMappings.put(CANCELED, TOperationState.CANCELED_STATE);
+        expectedMappings.put(CLOSED, TOperationState.CLOSED_STATE);
+        expectedMappings.put(ERROR, TOperationState.ERROR_STATE);
+        expectedMappings.put(TIMEOUT, TOperationState.TIMEDOUT_STATE);
+
+        for (OperationStatus status : expectedMappings.keySet()) {
+            assertEquals(expectedMappings.get(status), toTOperationState(status));
+        }
+    }
+
+    @Test
+    public void testToTTableSchema() {
+        for (DataTypeSpec spec : getDataTypeSpecs()) {
+            TableSchema actual =
+                    new TableSchema(
+                            toTTableSchema(
+                                    DataTypeUtils.expandCompositeTypeToSchema(spec.flinkType)));
+            List<Integer> javaSqlTypes =
+                    Arrays.stream(actual.toTypeDescriptors())
+                            .map(desc -> desc.getType().toJavaSQLType())
+                            .collect(Collectors.toList());
+
+            assertEquals(Collections.singletonList(spec.sqlType), javaSqlTypes);
+        }
+    }
+
+    @Test
+    public void testResultSetToColumnBasedRowSet() throws Exception {
+        for (DataTypeSpec spec : getDataTypeSpecs()) {

Review Comment:
   Use JUnit5 `@ParameterizedTest` and `@ValueSource` to write parameterized test methods.
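
   One possible shape for this (a hedged sketch; it assumes `getDataTypeSpecs()` is made static, and uses `@MethodSource` because the specs are objects, while `@ValueSource` only accepts literal values):

   ```java
   static Stream<DataTypeSpec> dataTypeSpecs() {
       return getDataTypeSpecs().stream();
   }

   @ParameterizedTest
   @MethodSource("dataTypeSpecs")
   void testToTTableSchema(DataTypeSpec spec) {
       TableSchema actual =
               new TableSchema(
                       toTTableSchema(DataTypeUtils.expandCompositeTypeToSchema(spec.flinkType)));
       List<Integer> javaSqlTypes =
               Arrays.stream(actual.toTypeDescriptors())
                       .map(desc -> desc.getType().toJavaSQLType())
                       .collect(Collectors.toList());
       assertEquals(Collections.singletonList(spec.sqlType), javaSqlTypes);
   }
   ```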



##########
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversionsTest.java:
##########
@@ -34,4 +110,250 @@ public void testConvertSessionHandle() {
         SessionHandle originSessionHandle = SessionHandle.create();
         assertEquals(toSessionHandle(toTSessionHandle(originSessionHandle)), originSessionHandle);
     }
+
+    @Test
+    public void testConvertSessionHandleAndOperationHandle() {
+        SessionHandle originSessionHandle = SessionHandle.create();
+        OperationHandle originOperationHandle = OperationHandle.create();
+        TOperationHandle tOperationHandle =
+                toTOperationHandle(
+                        originSessionHandle, originOperationHandle, OperationType.UNKNOWN, false);
+
+        assertEquals(toSessionHandle(tOperationHandle), originSessionHandle);
+        assertEquals(toOperationHandle(tOperationHandle), originOperationHandle);
+    }
+
+    @Test
+    public void testConvertOperationStatus() {
+        Map<OperationStatus, TOperationState> expectedMappings = new HashMap<>();
+        expectedMappings.put(INITIALIZED, TOperationState.INITIALIZED_STATE);
+        expectedMappings.put(PENDING, TOperationState.PENDING_STATE);
+        expectedMappings.put(RUNNING, TOperationState.RUNNING_STATE);
+        expectedMappings.put(FINISHED, TOperationState.FINISHED_STATE);
+        expectedMappings.put(CANCELED, TOperationState.CANCELED_STATE);
+        expectedMappings.put(CLOSED, TOperationState.CLOSED_STATE);
+        expectedMappings.put(ERROR, TOperationState.ERROR_STATE);
+        expectedMappings.put(TIMEOUT, TOperationState.TIMEDOUT_STATE);
+
+        for (OperationStatus status : expectedMappings.keySet()) {
+            assertEquals(expectedMappings.get(status), toTOperationState(status));
+        }
+    }
+
+    @Test
+    public void testToTTableSchema() {
+        for (DataTypeSpec spec : getDataTypeSpecs()) {
+            TableSchema actual =
+                    new TableSchema(
+                            toTTableSchema(
+                                    DataTypeUtils.expandCompositeTypeToSchema(spec.flinkType)));
+            List<Integer> javaSqlTypes =
+                    Arrays.stream(actual.toTypeDescriptors())
+                            .map(desc -> desc.getType().toJavaSQLType())
+                            .collect(Collectors.toList());
+
+            assertEquals(Collections.singletonList(spec.sqlType), javaSqlTypes);
+        }
+    }
+
+    @Test
+    public void testResultSetToColumnBasedRowSet() throws Exception {
+        for (DataTypeSpec spec : getDataTypeSpecs()) {
+            List<LogicalType> fieldTypes = spec.flinkType.getLogicalType().getChildren();
+            TRowSet tRowSet =
+                    toColumnBasedSet(
+                            fieldTypes,
+                            IntStream.range(0, fieldTypes.size())
+                                    .mapToObj(
+                                            pos ->
+                                                    RowData.createFieldGetter(
+                                                            fieldTypes.get(pos), pos))
+                                    .collect(Collectors.toList()),
+                            new RowDataToStringConverterImpl(
+                                    spec.flinkType,
+                                    DateTimeUtils.UTC_ZONE.toZoneId(),
+                                    Thread.currentThread().getContextClassLoader(),
+                                    false),
+                            Arrays.asList(spec.flinkValue, new GenericRowData(1)));
+            RowSet rowSet = RowSetFactory.create(tRowSet, HIVE_CLI_SERVICE_PROTOCOL_V10);
+            Iterator<Object[]> iterator = rowSet.iterator();
+            if (spec.flinkType.getChildren().equals(Collections.singletonList(DataTypes.BYTES()))) {
+                assertArrayEquals(
+                        (byte[]) spec.convertedColumnBasedValue, (byte[]) iterator.next()[0]);
+            } else {
+                assertEquals(spec.convertedColumnBasedValue, iterator.next()[0]);
+            }
+
+            assertNull(iterator.next()[0]);
+        }
+    }
+
+    @Test
+    public void testResultSetToRowBasedRowSet() throws Exception {
+        for (DataTypeSpec spec : getDataTypeSpecs()) {
+            List<LogicalType> fieldTypes = spec.flinkType.getLogicalType().getChildren();
+            TRowSet tRowSet =
+                    toRowBasedSet(
+                            fieldTypes,
+                            IntStream.range(0, fieldTypes.size())
+                                    .mapToObj(
+                                            pos ->
+                                                    RowData.createFieldGetter(
+                                                            fieldTypes.get(pos), pos))
+                                    .collect(Collectors.toList()),
+                            new RowDataToStringConverterImpl(
+                                    spec.flinkType,
+                                    DateTimeUtils.UTC_ZONE.toZoneId(),
+                                    Thread.currentThread().getContextClassLoader(),
+                                    false),
+                            Arrays.asList(spec.flinkValue, new GenericRowData(1)));
+            RowSet rowSet = RowSetFactory.create(tRowSet, HIVE_CLI_SERVICE_PROTOCOL_V3);
+            Iterator<Object[]> iter = rowSet.iterator();
+            assertEquals(spec.convertedRowBasedValue, iter.next()[0]);
+            assertNull(iter.next()[0]);
+        }
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Negative tests
+    // --------------------------------------------------------------------------------------------
+
+    @Test
+    public void testSerializeRowDataWithRowKind() {
+        for (RowKind kind :
+                Arrays.asList(RowKind.UPDATE_BEFORE, RowKind.UPDATE_AFTER, RowKind.DELETE)) {
+            assertThatThrownBy(
+                            () ->
+                                    toTRowSet(
+                                            HIVE_CLI_SERVICE_PROTOCOL_V5,
+                                            ResolvedSchema.of(
+                                                    Column.physical("f0", DataTypes.INT())),
+                                            SIMPLE_ROW_DATA_TO_STRING_CONVERTER,
+                                            Collections.singletonList(
+                                                    GenericRowData.ofKind(kind, 1))))
+                    .satisfies(
+                            anyCauseMatches(
+                                    UnsupportedOperationException.class,
+                                    "HiveServer2 Endpoint only supports to serialize the INSERT-ONLY RowData."));
+        }
+    }
+
+    // --------------------------------------------------------------------------------------------
+
+    private List<DataTypeSpec> getDataTypeSpecs() {
+        Map<Integer, StringData> map = new HashMap<>();
+        map.put(1, StringData.fromString("Hello"));
+        map.put(Integer.MAX_VALUE, StringData.fromString("World"));
+        MapData mapData = new GenericMapData(map);
+        return Arrays.asList(
+                DataTypeSpec.newSpec()
+                        .withType(BOOLEAN())
+                        .withValue(Boolean.TRUE)
+                        .expectSqlType(Types.BOOLEAN),
+                DataTypeSpec.newSpec()
+                        .withType(TINYINT())
+                        .withValue((byte) 3)
+                        // TINYINT is the alias of the BYTE in Hive.
+                        .expectSqlType(Types.BINARY),
+                DataTypeSpec.newSpec()
+                        .withType(SMALLINT())
+                        .withValue((short) 255)
+                        .expectSqlType(Types.SMALLINT),
+                DataTypeSpec.newSpec().withType(INT()).withValue(1994).expectSqlType(Types.INTEGER),
+                DataTypeSpec.newSpec()
+                        .withType(BIGINT())
+                        .withValue(13214991L)
+                        .expectSqlType(Types.BIGINT),
+                DataTypeSpec.newSpec()
+                        .withType(FLOAT())
+                        .withValue(1024.0f)
+                        .expectSqlType(Types.FLOAT)
+                        .expectValue(1024.0),
+                DataTypeSpec.newSpec()
+                        .withType(DOUBLE())
+                        .withValue(2048.1024)
+                        .expectSqlType(Types.DOUBLE),
+                DataTypeSpec.newSpec()
+                        .withType(DECIMAL(9, 6))
+                        .withValue(DecimalData.fromBigDecimal(new BigDecimal("123.456789"), 9, 6))
+                        .expectSqlType(Types.DECIMAL)
+                        .expectValue("123.456789"),
+                DataTypeSpec.newSpec()
+                        .withType(STRING())
+                        .withValue(StringData.fromString("Hello World"))
+                        .expectSqlType(Types.VARCHAR)
+                        .expectValue("Hello World"),
+                DataTypeSpec.newSpec()
+                        .withType(BYTES())
+                        .withValue("Flink SQL Gateway".getBytes(StandardCharsets.UTF_8))
+                        .expectSqlType(Types.BINARY)
+                        .expectValue("Flink SQL Gateway".getBytes(StandardCharsets.UTF_8))
+                        .expectRowBasedValue("x'" + EncodingUtils.hex("Flink SQL Gateway") + "'"),
+                DataTypeSpec.newSpec()
+                        .withType(DATE())
+                        .withValue((int) LocalDate.parse("2022-02-22").toEpochDay())
+                        .expectSqlType(Types.DATE)
+                        .expectValue("2022-02-22"),
+                DataTypeSpec.newSpec()
+                        .withType(TIMESTAMP(4))
+                        .withValue(
+                                TimestampData.fromLocalDateTime(
+                                        LocalDateTime.parse("2020-05-11T12:00:12.1234")))
+                        .expectSqlType(Types.TIMESTAMP)
+                        .expectValue("2020-05-11 12:00:12.1234"),
+                DataTypeSpec.newSpec()
+                        .withType(DataTypes.MAP(DataTypes.INT(), DataTypes.STRING()))
+                        .withValue(mapData)
+                        .expectSqlType(Types.JAVA_OBJECT)
+                        .expectValue("{2147483647=World, 1=Hello}"),
+                DataTypeSpec.newSpec()
+                        .withType(
+                                DataTypes.ARRAY(DataTypes.MAP(DataTypes.INT(), DataTypes.STRING())))
+                        .withValue(new GenericArrayData(new Object[] {mapData, mapData}))
+                        // Hive uses STRING type
+                        .expectSqlType(Types.VARCHAR)
+                        .expectValue("[{2147483647=World, 1=Hello}, {2147483647=World, 1=Hello}]"));
+    }
+
+    private static class DataTypeSpec {

Review Comment:
   Add `toString` to this class for a better error message when running a spec fails.
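
   A hedged sketch of such a `toString()`, using the fields of the class shown below:

   ```java
   @Override
   public String toString() {
       return "DataTypeSpec{"
               + "flinkType=" + flinkType
               + ", sqlType=" + sqlType
               + ", flinkValue=" + flinkValue
               + ", convertedColumnBasedValue=" + convertedColumnBasedValue
               + ", convertedRowBasedValue=" + convertedRowBasedValue
               + '}';
   }
   ```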



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/result/ResultStore.java:
##########
@@ -48,7 +48,7 @@ public class ResultStore {
     private final List<RowData> recordsBuffer = new ArrayList<>();
     private final int maxBufferSize;
 
-    private final Object resultLock = new Object();
+    private final Object resultLock = new ReentrantLock();

Review Comment:
   Why use `ReentrantLock`?



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/result/ResultStore.java:
##########
@@ -128,6 +140,8 @@ private void processRecord(RowData row) {
                 }
             }
             recordsBuffer.add(row);
+            // Notify the consumer to consume
+            resultLock.notify();

Review Comment:
   Should be `notifyAll()`?
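
   A self-contained hedged sketch (not the Flink code) of what this and the `ReentrantLock` comment above seem to aim at: if a `ReentrantLock` is used, pair it with a `Condition`; otherwise keep a plain `Object` monitor and wake waiters with `notifyAll()`.

   ```java
   import java.util.ArrayDeque;
   import java.util.Deque;
   import java.util.concurrent.locks.Condition;
   import java.util.concurrent.locks.ReentrantLock;

   // Minimal producer/consumer buffer sketch under the above assumptions.
   class ResultBuffer<T> {
       private final Deque<T> recordsBuffer = new ArrayDeque<>();
       private final ReentrantLock resultLock = new ReentrantLock();
       private final Condition resultAvailable = resultLock.newCondition();

       void add(T record) {
           resultLock.lock();
           try {
               recordsBuffer.add(record);
               // Wake every waiting consumer, not just one.
               resultAvailable.signalAll();
           } finally {
               resultLock.unlock();
           }
       }

       T take() throws InterruptedException {
           resultLock.lock();
           try {
               while (recordsBuffer.isEmpty()) {
                   resultAvailable.await();
               }
               return recordsBuffer.poll();
           } finally {
               resultLock.unlock();
           }
       }
   }
   ```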



##########
flink-connectors/flink-connector-hive/src/test/resources/endpoint/hive_catalog.q:
##########
@@ -0,0 +1,121 @@
+# catalog_database.q - CREATE/DROP/SHOW/USE CATALOG/DATABASE
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to you under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# ==========================================================================
+# test hive catalog
+# ==========================================================================
+
+show current catalog;
+!output
++----------------------+
+| current catalog name |
++----------------------+
+|                 hive |
++----------------------+
+1 row in set
+!ok
+
+show databases;
+!output
++---------------+
+| database name |
++---------------+
+|       default |
++---------------+
+1 row in set
+!ok
+
+show tables;
+!output
+Empty set
+!ok
+
+create database additional_test_database;
+!output
++--------+
+| result |
++--------+
+|     OK |
++--------+
+1 row in set
+!ok
+
+use additional_test_database;
+!output
++--------+
+| result |
++--------+
+|     OK |
++--------+
+1 row in set
+!ok
+
+create table param_types_table (
+    dec DECIMAL(10, 10),
+    ch CHAR(5),
+    vch VARCHAR(15)
+);
+!output
++--------+
+| result |
++--------+
+|     OK |
++--------+
+1 row in set
+!ok
+
+show tables;
+!output
++-------------------+
+|        table name |
++-------------------+
+| param_types_table |
++-------------------+
+1 row in set
+!ok
+
+show current database;
+!output
++--------------------------+
+|    current database name |
++--------------------------+
+| additional_test_database |
++--------------------------+
+1 row in set
+!ok
+
+# ==========================================================================
+# test hive table with parameterized types
+# ==========================================================================
+
+describe hive.additional_test_database.param_types_table;
+!output
++------+-----------------+------+-----+--------+-----------+
+| name |            type | null | key | extras | watermark |

Review Comment:
   It seems the `describe` output isn't compatible with HiveSQL?



##########
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversionsTest.java:
##########
@@ -34,4 +110,250 @@ public void testConvertSessionHandle() {
         SessionHandle originSessionHandle = SessionHandle.create();
         assertEquals(toSessionHandle(toTSessionHandle(originSessionHandle)), originSessionHandle);
     }
+
+    @Test
+    public void testConvertSessionHandleAndOperationHandle() {
+        SessionHandle originSessionHandle = SessionHandle.create();
+        OperationHandle originOperationHandle = OperationHandle.create();
+        TOperationHandle tOperationHandle =
+                toTOperationHandle(
+                        originSessionHandle, originOperationHandle, OperationType.UNKNOWN, false);
+
+        assertEquals(toSessionHandle(tOperationHandle), originSessionHandle);
+        assertEquals(toOperationHandle(tOperationHandle), originOperationHandle);
+    }
+
+    @Test
+    public void testConvertOperationStatus() {
+        Map<OperationStatus, TOperationState> expectedMappings = new HashMap<>();
+        expectedMappings.put(INITIALIZED, TOperationState.INITIALIZED_STATE);
+        expectedMappings.put(PENDING, TOperationState.PENDING_STATE);
+        expectedMappings.put(RUNNING, TOperationState.RUNNING_STATE);
+        expectedMappings.put(FINISHED, TOperationState.FINISHED_STATE);
+        expectedMappings.put(CANCELED, TOperationState.CANCELED_STATE);
+        expectedMappings.put(CLOSED, TOperationState.CLOSED_STATE);
+        expectedMappings.put(ERROR, TOperationState.ERROR_STATE);
+        expectedMappings.put(TIMEOUT, TOperationState.TIMEDOUT_STATE);
+
+        for (OperationStatus status : expectedMappings.keySet()) {
+            assertEquals(expectedMappings.get(status), toTOperationState(status));
+        }
+    }
+
+    @Test
+    public void testToTTableSchema() {
+        for (DataTypeSpec spec : getDataTypeSpecs()) {

Review Comment:
   Use JUnit5 `@ParameterizedTest` and `@ValueSource` to write parameterized test methods.



##########
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversionsTest.java:
##########
@@ -34,4 +110,250 @@ public void testConvertSessionHandle() {
         SessionHandle originSessionHandle = SessionHandle.create();
         assertEquals(toSessionHandle(toTSessionHandle(originSessionHandle)), originSessionHandle);
     }
+
+    @Test
+    public void testConvertSessionHandleAndOperationHandle() {
+        SessionHandle originSessionHandle = SessionHandle.create();
+        OperationHandle originOperationHandle = OperationHandle.create();
+        TOperationHandle tOperationHandle =
+                toTOperationHandle(
+                        originSessionHandle, originOperationHandle, OperationType.UNKNOWN, false);
+
+        assertEquals(toSessionHandle(tOperationHandle), originSessionHandle);
+        assertEquals(toOperationHandle(tOperationHandle), originOperationHandle);
+    }
+
+    @Test
+    public void testConvertOperationStatus() {
+        Map<OperationStatus, TOperationState> expectedMappings = new HashMap<>();
+        expectedMappings.put(INITIALIZED, TOperationState.INITIALIZED_STATE);
+        expectedMappings.put(PENDING, TOperationState.PENDING_STATE);
+        expectedMappings.put(RUNNING, TOperationState.RUNNING_STATE);
+        expectedMappings.put(FINISHED, TOperationState.FINISHED_STATE);
+        expectedMappings.put(CANCELED, TOperationState.CANCELED_STATE);
+        expectedMappings.put(CLOSED, TOperationState.CLOSED_STATE);
+        expectedMappings.put(ERROR, TOperationState.ERROR_STATE);
+        expectedMappings.put(TIMEOUT, TOperationState.TIMEDOUT_STATE);
+
+        for (OperationStatus status : expectedMappings.keySet()) {
+            assertEquals(expectedMappings.get(status), toTOperationState(status));
+        }
+    }
+
+    @Test
+    public void testToTTableSchema() {
+        for (DataTypeSpec spec : getDataTypeSpecs()) {
+            TableSchema actual =
+                    new TableSchema(
+                            toTTableSchema(
+                                    DataTypeUtils.expandCompositeTypeToSchema(spec.flinkType)));
+            List<Integer> javaSqlTypes =
+                    Arrays.stream(actual.toTypeDescriptors())
+                            .map(desc -> desc.getType().toJavaSQLType())
+                            .collect(Collectors.toList());
+
+            assertEquals(Collections.singletonList(spec.sqlType), javaSqlTypes);
+        }
+    }
+
+    @Test
+    public void testResultSetToColumnBasedRowSet() throws Exception {
+        for (DataTypeSpec spec : getDataTypeSpecs()) {
+            List<LogicalType> fieldTypes = spec.flinkType.getLogicalType().getChildren();
+            TRowSet tRowSet =
+                    toColumnBasedSet(
+                            fieldTypes,
+                            IntStream.range(0, fieldTypes.size())
+                                    .mapToObj(
+                                            pos ->
+                                                    RowData.createFieldGetter(
+                                                            fieldTypes.get(pos), pos))
+                                    .collect(Collectors.toList()),
+                            new RowDataToStringConverterImpl(
+                                    spec.flinkType,
+                                    DateTimeUtils.UTC_ZONE.toZoneId(),
+                                    Thread.currentThread().getContextClassLoader(),
+                                    false),
+                            Arrays.asList(spec.flinkValue, new GenericRowData(1)));
+            RowSet rowSet = RowSetFactory.create(tRowSet, HIVE_CLI_SERVICE_PROTOCOL_V10);
+            Iterator<Object[]> iterator = rowSet.iterator();
+            if (spec.flinkType.getChildren().equals(Collections.singletonList(DataTypes.BYTES()))) {
+                assertArrayEquals(
+                        (byte[]) spec.convertedColumnBasedValue, (byte[]) iterator.next()[0]);
+            } else {
+                assertEquals(spec.convertedColumnBasedValue, iterator.next()[0]);
+            }
+
+            assertNull(iterator.next()[0]);
+        }
+    }
+
+    @Test
+    public void testResultSetToRowBasedRowSet() throws Exception {
+        for (DataTypeSpec spec : getDataTypeSpecs()) {
+            List<LogicalType> fieldTypes = spec.flinkType.getLogicalType().getChildren();
+            TRowSet tRowSet =
+                    toRowBasedSet(
+                            fieldTypes,
+                            IntStream.range(0, fieldTypes.size())
+                                    .mapToObj(
+                                            pos ->
+                                                    RowData.createFieldGetter(
+                                                            fieldTypes.get(pos), pos))
+                                    .collect(Collectors.toList()),
+                            new RowDataToStringConverterImpl(
+                                    spec.flinkType,
+                                    DateTimeUtils.UTC_ZONE.toZoneId(),
+                                    Thread.currentThread().getContextClassLoader(),
+                                    false),
+                            Arrays.asList(spec.flinkValue, new GenericRowData(1)));
+            RowSet rowSet = RowSetFactory.create(tRowSet, HIVE_CLI_SERVICE_PROTOCOL_V3);
+            Iterator<Object[]> iter = rowSet.iterator();
+            assertEquals(spec.convertedRowBasedValue, iter.next()[0]);
+            assertNull(iter.next()[0]);
+        }
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Negative tests
+    // --------------------------------------------------------------------------------------------
+
+    @Test
+    public void testSerializeRowDataWithRowKind() {
+        for (RowKind kind :
+                Arrays.asList(RowKind.UPDATE_BEFORE, RowKind.UPDATE_AFTER, RowKind.DELETE)) {
+            assertThatThrownBy(
+                            () ->
+                                    toTRowSet(
+                                            HIVE_CLI_SERVICE_PROTOCOL_V5,
+                                            ResolvedSchema.of(
+                                                    Column.physical("f0", DataTypes.INT())),
+                                            SIMPLE_ROW_DATA_TO_STRING_CONVERTER,
+                                            Collections.singletonList(
+                                                    GenericRowData.ofKind(kind, 1))))
+                    .satisfies(
+                            anyCauseMatches(
+                                    UnsupportedOperationException.class,
+                                    "HiveServer2 Endpoint only supports to serialize the INSERT-ONLY RowData."));
+        }
+    }
+
+    // --------------------------------------------------------------------------------------------
+
+    private List<DataTypeSpec> getDataTypeSpecs() {
+        Map<Integer, StringData> map = new HashMap<>();
+        map.put(1, StringData.fromString("Hello"));
+        map.put(Integer.MAX_VALUE, StringData.fromString("World"));
+        MapData mapData = new GenericMapData(map);
+        return Arrays.asList(
+                DataTypeSpec.newSpec()
+                        .withType(BOOLEAN())
+                        .withValue(Boolean.TRUE)
+                        .expectSqlType(Types.BOOLEAN),
+                DataTypeSpec.newSpec()
+                        .withType(TINYINT())
+                        .withValue((byte) 3)
+                        // TINYINT is the alias of the BYTE in Hive.
+                        .expectSqlType(Types.BINARY),
+                DataTypeSpec.newSpec()
+                        .withType(SMALLINT())
+                        .withValue((short) 255)
+                        .expectSqlType(Types.SMALLINT),
+                DataTypeSpec.newSpec().withType(INT()).withValue(1994).expectSqlType(Types.INTEGER),
+                DataTypeSpec.newSpec()
+                        .withType(BIGINT())
+                        .withValue(13214991L)
+                        .expectSqlType(Types.BIGINT),
+                DataTypeSpec.newSpec()
+                        .withType(FLOAT())
+                        .withValue(1024.0f)
+                        .expectSqlType(Types.FLOAT)
+                        .expectValue(1024.0),
+                DataTypeSpec.newSpec()
+                        .withType(DOUBLE())
+                        .withValue(2048.1024)
+                        .expectSqlType(Types.DOUBLE),
+                DataTypeSpec.newSpec()
+                        .withType(DECIMAL(9, 6))
+                        .withValue(DecimalData.fromBigDecimal(new BigDecimal("123.456789"), 9, 6))
+                        .expectSqlType(Types.DECIMAL)
+                        .expectValue("123.456789"),
+                DataTypeSpec.newSpec()
+                        .withType(STRING())
+                        .withValue(StringData.fromString("Hello World"))
+                        .expectSqlType(Types.VARCHAR)
+                        .expectValue("Hello World"),
+                DataTypeSpec.newSpec()
+                        .withType(BYTES())
+                        .withValue("Flink SQL Gateway".getBytes(StandardCharsets.UTF_8))
+                        .expectSqlType(Types.BINARY)
+                        .expectValue("Flink SQL Gateway".getBytes(StandardCharsets.UTF_8))
+                        .expectRowBasedValue("x'" + EncodingUtils.hex("Flink SQL Gateway") + "'"),
+                DataTypeSpec.newSpec()
+                        .withType(DATE())
+                        .withValue((int) LocalDate.parse("2022-02-22").toEpochDay())
+                        .expectSqlType(Types.DATE)
+                        .expectValue("2022-02-22"),
+                DataTypeSpec.newSpec()
+                        .withType(TIMESTAMP(4))
+                        .withValue(
+                                TimestampData.fromLocalDateTime(
+                                        LocalDateTime.parse("2020-05-11T12:00:12.1234")))
+                        .expectSqlType(Types.TIMESTAMP)
+                        .expectValue("2020-05-11 12:00:12.1234"),
+                DataTypeSpec.newSpec()
+                        .withType(DataTypes.MAP(DataTypes.INT(), DataTypes.STRING()))
+                        .withValue(mapData)
+                        .expectSqlType(Types.JAVA_OBJECT)
+                        .expectValue("{2147483647=World, 1=Hello}"),
+                DataTypeSpec.newSpec()
+                        .withType(
+                                DataTypes.ARRAY(DataTypes.MAP(DataTypes.INT(), DataTypes.STRING())))
+                        .withValue(new GenericArrayData(new Object[] {mapData, mapData}))
+                        // Hive uses STRING type
+                        .expectSqlType(Types.VARCHAR)
+                        .expectValue("[{2147483647=World, 1=Hello}, {2147483647=World, 1=Hello}]"));
+    }
+
+    private static class DataTypeSpec {
+        DataType flinkType;
+        Integer sqlType;
+        RowData flinkValue;
+        Object convertedColumnBasedValue;
+        Object convertedRowBasedValue;
+
+        public static DataTypeSpec newSpec() {
+            DataTypeSpec spec = new DataTypeSpec();
+            spec.flinkValue = new GenericRowData(1);
+            return spec;
+        }
+
+        public DataTypeSpec withType(DataType flinkType) {
+            this.flinkType = DataTypes.ROW(flinkType);
+            return this;
+        }
+
+        public DataTypeSpec expectSqlType(int sqlType) {
+            this.sqlType = sqlType;
+            return this;
+        }
+
+        public DataTypeSpec withValue(Object flinkValue) {
+            this.flinkValue = GenericRowData.of(flinkValue);
+            this.convertedColumnBasedValue = flinkValue;
+            this.convertedRowBasedValue = flinkValue;
+            return this;
+        }
+
+        public DataTypeSpec expectValue(Object convertedValue) {
+            this.convertedColumnBasedValue = convertedValue;
+            this.convertedRowBasedValue = convertedValue;
+            return this;
+        }
+
+        public DataTypeSpec expectRowBasedValue(Object convertedRowBasedValue) {
+            this.convertedRowBasedValue = convertedRowBasedValue;
+            return this;
+        }

Review Comment:
   The semantics of the three methods are confusing to me. The invocation order is critical; otherwise, the result is unexpected. How about separating them into 3 methods: 
   
   ```java
   withValue -> forValue() // withType -> forType as well. 
   expectValue(Object rowAndColumnBasedValue)
   expectValue(Object rowBasedValue, Object columnBasedValue)
   ```
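   
   For illustration, a minimal sketch of that shape (method names are suggestions only, and the fallback to the raw value is left to the caller or a getter):
   
   ```java
   // Sketch only; relies on the same imports as the test file
   // (DataTypes, DataType, GenericRowData, RowData).
   private static class DataTypeSpec {
       DataType flinkType;
       Integer sqlType;
       RowData flinkValue;
       Object convertedColumnBasedValue;
       Object convertedRowBasedValue;
   
       // forType/forValue only capture the inputs.
       public static DataTypeSpec forType(DataType flinkType) {
           DataTypeSpec spec = new DataTypeSpec();
           spec.flinkType = DataTypes.ROW(flinkType);
           spec.flinkValue = new GenericRowData(1);
           return spec;
       }
   
       public DataTypeSpec forValue(Object flinkValue) {
           this.flinkValue = GenericRowData.of(flinkValue);
           return this;
       }
   
       // The expectValue overloads only capture the expectations,
       // so the invocation order no longer matters.
       public DataTypeSpec expectValue(Object rowAndColumnBasedValue) {
           return expectValue(rowAndColumnBasedValue, rowAndColumnBasedValue);
       }
   
       public DataTypeSpec expectValue(Object rowBasedValue, Object columnBasedValue) {
           this.convertedRowBasedValue = rowBasedValue;
           this.convertedColumnBasedValue = columnBasedValue;
           return this;
       }
   }
   ```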
   





[GitHub] [flink] fsk119 commented on a diff in pull request #20298: [FLINK-28152][sql-gateway][hive] Allow executing statement for the HiveServer2Endpoint

Posted by GitBox <gi...@apache.org>.
fsk119 commented on code in PR #20298:
URL: https://github.com/apache/flink/pull/20298#discussion_r930644202


##########
flink-connectors/flink-connector-hive/src/test/resources/endpoint/hive_catalog.q:
##########
@@ -0,0 +1,121 @@
+# catalog_database.q - CREATE/DROP/SHOW/USE CATALOG/DATABASE
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to you under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# ==========================================================================
+# test hive catalog
+# ==========================================================================
+
+show current catalog;
+!output
++----------------------+
+| current catalog name |
++----------------------+
+|                 hive |
++----------------------+
+1 row in set
+!ok
+
+show databases;
+!output
++---------------+
+| database name |
++---------------+
+|       default |
++---------------+
+1 row in set
+!ok
+
+show tables;
+!output
+Empty set
+!ok
+
+create database additional_test_database;
+!output
++--------+
+| result |
++--------+
+|     OK |
++--------+
+1 row in set
+!ok
+
+use additional_test_database;
+!output
++--------+
+| result |
++--------+
+|     OK |
++--------+
+1 row in set
+!ok
+
+create table param_types_table (
+    dec DECIMAL(10, 10),
+    ch CHAR(5),
+    vch VARCHAR(15)
+);
+!output
++--------+
+| result |
++--------+
+|     OK |
++--------+
+1 row in set
+!ok
+
+show tables;
+!output
++-------------------+
+|        table name |
++-------------------+
+| param_types_table |
++-------------------+
+1 row in set
+!ok
+
+show current database;
+!output
++--------------------------+
+|    current database name |
++--------------------------+
+| additional_test_database |
++--------------------------+
+1 row in set
+!ok
+
+# ==========================================================================
+# test hive table with parameterized types
+# ==========================================================================
+
+describe hive.additional_test_database.param_types_table;
+!output
++------+-----------------+------+-----+--------+-----------+
+| name |            type | null | key | extras | watermark |

Review Comment:
   Yes. Maybe we should introduce a Hive-style DESCRIBE?





[GitHub] [flink] wuchong commented on pull request #20298: [FLINK-28152][sql-gateway][hive] Allow executing statement for the HiveServer2Endpoint

Posted by GitBox <gi...@apache.org>.
wuchong commented on PR #20298:
URL: https://github.com/apache/flink/pull/20298#issuecomment-1195317288

   Besides, I left some comments on the commit: https://github.com/apache/flink/commit/d067629d4d200f940d0b58759459d7ff5832b292. You can address the comments in this PR.




[GitHub] [flink] fsk119 commented on pull request #20298: [FLINK-28152][sql-gateway][hive] Allow executing statement for the HiveServer2Endpoint

Posted by GitBox <gi...@apache.org>.
fsk119 commented on PR #20298:
URL: https://github.com/apache/flink/pull/20298#issuecomment-1198804456

   Merged into master:
   
   6a95672db9cf23c9110ec9b0d4701e9c1e3acfa1
   f477a43ff23576cd2e1f8c632f78458948245df4
   1fb5875ae6e2536266275c28e6260019706c28f2
   27cecdebccc95e9acf6ba38616b14e531b35d1be




[GitHub] [flink] wuchong commented on a diff in pull request #20298: [FLINK-28152][sql-gateway][hive] Allow executing statement for the HiveServer2Endpoint

Posted by GitBox <gi...@apache.org>.
wuchong commented on code in PR #20298:
URL: https://github.com/apache/flink/pull/20298#discussion_r931843918


##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java:
##########
@@ -628,7 +628,9 @@ private void buildTThreadPoolServer() {
                             new TThreadPoolServer.Args(
                                             new TServerSocket(
                                                     new ServerSocket(
-                                                            port, -1, InetAddress.getByName(host))))
+                                                            port,
+                                                            -1,
+                                                            InetAddress.getByName(hostAddress))))

Review Comment:
   `hostAddress` has already been resolved, so we don't need to resolve it again.
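   
   For illustration, a minimal sketch of the idea (assuming the endpoint keeps the resolved `InetAddress` around; the field name here is made up, and the remaining server args are omitted):
   
   ```java
   // Resolve the bind address once (e.g. in the constructor) and reuse it, so
   // buildTThreadPoolServer() does not need to call InetAddress.getByName() again.
   private final InetAddress bindAddress;
   
   private void buildTThreadPoolServer() throws Exception {
       server =
               new TThreadPoolServer(
                       new TThreadPoolServer.Args(
                               new TServerSocket(
                                       new ServerSocket(port, -1, bindAddress))));
   }
   ```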



##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointFactory.java:
##########
@@ -77,21 +80,33 @@ public String factoryIdentifier() {
 
     @Override
     public Set<ConfigOption<?>> requiredOptions() {
+        return Collections.emptySet();
+    }
+
+    @Override
+    public Set<ConfigOption<?>> optionalOptions() {
         return new HashSet<>(
                 Arrays.asList(
+                        THRIFT_HOST,
                         THRIFT_PORT,
                         THRIFT_MAX_MESSAGE_SIZE,
                         THRIFT_LOGIN_TIMEOUT,
                         THRIFT_WORKER_THREADS_MIN,
                         THRIFT_WORKER_THREADS_MAX,
                         THRIFT_WORKER_KEEPALIVE_TIME,
                         CATALOG_NAME,
+                        CATALOG_HIVE_CONF_DIR,
+                        CATALOG_DEFAULT_DATABASE,
                         MODULE_NAME));
     }
 
-    @Override
-    public Set<ConfigOption<?>> optionalOptions() {
-        return new HashSet<>(Arrays.asList(CATALOG_HIVE_CONF_DIR, CATALOG_DEFAULT_DATABASE));
+    private static String getHostAddress(String hostName) {
+        try {
+            return InetAddress.getByName(hostName).getHostAddress();
+        } catch (UnknownHostException e) {
+            throw new ValidationException(
+                    String.format("Can not get the address for the host '%s'.", hostName));

Review Comment:
   Please pass the root exception as the cause when throwing.
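   
   i.e. something like (only the cause is added):
   
   ```java
   } catch (UnknownHostException e) {
       throw new ValidationException(
               String.format("Can not get the address for the host '%s'.", hostName), e);
   }
   ```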



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java:
##########
@@ -315,7 +315,7 @@ public ResultSet fetchResults(FetchOrientation orientation, int maxRows) {
 
         public ResolvedSchema getResultSchema() {
             OperationStatus current = status.get();
-            if (current != OperationStatus.FINISHED || !hasResults) {

Review Comment:
   What's the purpose of `hasResults`? It seems it is never used. Can we remove the variable?



##########
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointFactoryTest.java:
##########
@@ -79,7 +81,7 @@ public void testCreateHiveServer2Endpoint() {
                         Collections.singletonList(
                                 new HiveServer2Endpoint(
                                         service,
-                                        "localhost",
+                                        "127.0.0.1",

Review Comment:
   The IP address of "localhost" might differ across machines, so this test is not deterministic.
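   
   For example, the expectation could be resolved the same way the factory does it instead of hard-coding an IP (a sketch, not the final test code):
   
   ```java
   // Resolve "localhost" dynamically so the expected endpoint matches whatever
   // the factory resolves on the machine running the test.
   String expectedHostAddress = InetAddress.getByName("localhost").getHostAddress();
   // ... then build the expected HiveServer2Endpoint with expectedHostAddress.
   ```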





[GitHub] [flink] flinkbot commented on pull request #20298: [FLINK-28150][sql-gateway][hive] Allow executing statement for the HiveServer2Endpoint

Posted by GitBox <gi...@apache.org>.
flinkbot commented on PR #20298:
URL: https://github.com/apache/flink/pull/20298#issuecomment-1187245968

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "f0d442f24bb5ce2cdc541a1c4367f4a28a5209aa",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "f0d442f24bb5ce2cdc541a1c4367f4a28a5209aa",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * f0d442f24bb5ce2cdc541a1c4367f4a28a5209aa UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>




[GitHub] [flink] fsk119 commented on a diff in pull request #20298: [FLINK-28152][sql-gateway][hive] Allow executing statement for the HiveServer2Endpoint

Posted by GitBox <gi...@apache.org>.
fsk119 commented on code in PR #20298:
URL: https://github.com/apache/flink/pull/20298#discussion_r929567090


##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversions.java:
##########
@@ -51,25 +261,250 @@ public static TStatus toTStatus(Throwable t) {
 
     // --------------------------------------------------------------------------------------------
 
-    private static THandleIdentifier toTHandleIdentifier(HandleIdentifier identifier) {
+    private static THandleIdentifier toTHandleIdentifier(UUID publicId, UUID secretId) {
         byte[] guid = new byte[16];
         byte[] secret = new byte[16];
         ByteBuffer guidBB = ByteBuffer.wrap(guid);
         ByteBuffer secretBB = ByteBuffer.wrap(secret);
 
-        guidBB.putLong(identifier.getPublicId().getMostSignificantBits());
-        guidBB.putLong(identifier.getPublicId().getLeastSignificantBits());
-        secretBB.putLong(identifier.getSecretId().getMostSignificantBits());
-        secretBB.putLong(identifier.getSecretId().getLeastSignificantBits());
+        guidBB.putLong(publicId.getMostSignificantBits());
+        guidBB.putLong(publicId.getLeastSignificantBits());
+        secretBB.putLong(secretId.getMostSignificantBits());
+        secretBB.putLong(secretId.getLeastSignificantBits());
         return new THandleIdentifier(ByteBuffer.wrap(guid), ByteBuffer.wrap(secret));
     }
 
-    private static HandleIdentifier toHandleIdentifier(THandleIdentifier tHandleId) {
-        ByteBuffer bb = ByteBuffer.wrap(tHandleId.getGuid());
-        UUID publicId = new UUID(bb.getLong(), bb.getLong());
-        bb = ByteBuffer.wrap(tHandleId.getSecret());
-        UUID secretId = new UUID(bb.getLong(), bb.getLong());
-        return new HandleIdentifier(publicId, secretId);
+    @VisibleForTesting
+    public static TRowSet toColumnBasedSet(
+            List<LogicalType> fieldTypes,
+            List<RowData.FieldGetter> fieldGetters,
+            RowDataToStringConverter converter,
+            List<RowData> rows) {
+        int rowNum = rows.size();
+        // TODO: Support accurate start offset
+        TRowSet rowSet = new TRowSet(0, new ArrayList<>(rowNum));
+        for (int i = 0; i < fieldTypes.size(); i++) {
+            int index = i;
+            rowSet.addToColumns(
+                    toTColumn(
+                            fieldTypes.get(i),
+                            fieldGetters.get(i),
+                            row -> row.isNullAt(index),
+                            row -> converter.convert(row, index),
+                            rows));
+        }
+        return rowSet;
+    }
+
+    private static TColumn toTColumn(
+            LogicalType fieldType,
+            RowData.FieldGetter fieldGetter,
+            Function<RowData, Boolean> isNull,
+            Function<RowData, String> stringifiedValue,
+            List<RowData> rows) {
+        BitSet nulls = new BitSet();
+        switch (fieldType.getTypeRoot()) {

Review Comment:
   I think the comment in TCLIService.thrift is not accurate. Reading the implementation, it actually converts TIMESTAMP to a STRING value.
   
   [1] https://github.com/apache/hive/blob/1c3406ea598e0c2d866b20747602c1a01fa5a425/service/src/java/org/apache/hive/service/cli/ColumnValue.java#L140
   [2] https://github.com/apache/hive/blob/1c3406ea598e0c2d866b20747602c1a01fa5a425/serde/src/java/org/apache/hadoop/hive/serde2/thrift/ColumnBuffer.java#L129
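   
   So on our side the TIMESTAMP branch could simply reuse the stringified value as well. A rough sketch against the `toTColumn` signature above (the variable names come from that method):
   
   ```java
   case TIMESTAMP_WITHOUT_TIME_ZONE: {
       // Hive's ColumnBuffer stores TIMESTAMP values as strings, so reuse the
       // row-to-string converter instead of an int64 column.
       List<String> values = new ArrayList<>(rows.size());
       for (int i = 0; i < rows.size(); i++) {
           RowData row = rows.get(i);
           if (isNull.apply(row)) {
               nulls.set(i);
               values.add("");
           } else {
               values.add(stringifiedValue.apply(row));
           }
       }
       return TColumn.stringVal(
               new TStringColumn(values, ByteBuffer.wrap(nulls.toByteArray())));
   }
   ```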





[GitHub] [flink] fsk119 commented on a diff in pull request #20298: [FLINK-28152][sql-gateway][hive] Allow executing statement for the HiveServer2Endpoint

Posted by GitBox <gi...@apache.org>.
fsk119 commented on code in PR #20298:
URL: https://github.com/apache/flink/pull/20298#discussion_r929604300


##########
flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/SqlGatewayService.java:
##########
@@ -108,6 +120,16 @@ void closeOperation(SessionHandle sessionHandle, OperationHandle operationHandle
     OperationInfo getOperationInfo(SessionHandle sessionHandle, OperationHandle operationHandle)
             throws SqlGatewayException;
 
+    /**
+     * Get the result schema for the specified Operation.
+     *
+     * @param sessionHandle handle to identify the session.
+     * @param operationHandle handle to identify the operation.
+     */
+    ResolvedSchema getOperationResultSchema(

Review Comment:
   The SQL Operation allows users to fetch the schema when it is in the RUNNING or FINISHED state. But because of the Operation lock, GetResultSetMetadata (which needs to acquire the lock before execution) is only able to get the schema once the Operation is FINISHED.
   
   The [HiveStatement](https://github.com/apache/hive/blob/b76c64024ac340c7a1d764f68bac15501b2bc8d6/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java#L294) does not fetch the schema until the result is ready.
   
   I just wonder whether it would be better to block the request until the schema is available.
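   
   For illustration, a rough sketch of the blocking variant on the gateway side (`doGetResultSchema()` stands in for the existing schema retrieval; the real code would also need a timeout and proper signalling instead of polling):
   
   ```java
   public ResolvedSchema getResultSchema() throws InterruptedException {
       OperationStatus current = status.get();
       // Block until the operation has finished instead of failing while it is still RUNNING.
       while (current != OperationStatus.FINISHED) {
           if (current == OperationStatus.ERROR
                   || current == OperationStatus.CANCELED
                   || current == OperationStatus.CLOSED
                   || current == OperationStatus.TIMEOUT) {
               throw new IllegalStateException(
                       "Operation terminated in state " + current + " without a result schema.");
           }
           Thread.sleep(100);
           current = status.get();
       }
       return doGetResultSchema();
   }
   ```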





[GitHub] [flink] wuchong commented on a diff in pull request #20298: [FLINK-28152][sql-gateway][hive] Allow executing statement for the HiveServer2Endpoint

Posted by GitBox <gi...@apache.org>.
wuchong commented on code in PR #20298:
URL: https://github.com/apache/flink/pull/20298#discussion_r928809112


##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java:
##########
@@ -196,7 +214,8 @@ public HiveServer2Endpoint(
             @Nullable String hiveConfPath,
             @Nullable String defaultDatabase,
             String moduleName,
-            boolean allowEmbedded) {
+            boolean allowEmbedded,
+            boolean isVerbose) {
         this.service = service;
 
         this.port = port;

Review Comment:
   The Thrift bind host should also be configurable; otherwise, the bound host is not deterministic when the node has multiple network cards. See `hive.server2.thrift.bind.host`.
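   
   For example, an option along these lines (the key name is just a sketch mirroring `hive.server2.thrift.bind.host`, not the final option):
   
   ```java
   public static final ConfigOption<String> THRIFT_HOST =
           ConfigOptions.key("thrift.host")
                   .stringType()
                   .noDefaultValue()
                   .withDescription(
                           "The host name or IP address that the HiveServer2 endpoint binds to. "
                                   + "If not set, the endpoint binds to the local host.");
   ```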



##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversions.java:
##########
@@ -18,28 +18,238 @@
 
 package org.apache.flink.table.endpoint.hive.util;
 
-import org.apache.flink.table.gateway.api.HandleIdentifier;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.hive.util.HiveTypeUtil;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.gateway.api.operation.OperationHandle;
+import org.apache.flink.table.gateway.api.operation.OperationStatus;
+import org.apache.flink.table.gateway.api.operation.OperationType;
+import org.apache.flink.table.gateway.api.results.FetchOrientation;
 import org.apache.flink.table.gateway.api.session.SessionHandle;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.utils.print.RowDataToStringConverter;
+import org.apache.flink.types.RowKind;
 
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.thrift.Type;
+import org.apache.hive.service.rpc.thrift.TBinaryColumn;
+import org.apache.hive.service.rpc.thrift.TBoolColumn;
+import org.apache.hive.service.rpc.thrift.TBoolValue;
+import org.apache.hive.service.rpc.thrift.TByteColumn;
+import org.apache.hive.service.rpc.thrift.TByteValue;
+import org.apache.hive.service.rpc.thrift.TCLIServiceConstants;
+import org.apache.hive.service.rpc.thrift.TColumn;
+import org.apache.hive.service.rpc.thrift.TColumnDesc;
+import org.apache.hive.service.rpc.thrift.TColumnValue;
+import org.apache.hive.service.rpc.thrift.TDoubleColumn;
+import org.apache.hive.service.rpc.thrift.TDoubleValue;
+import org.apache.hive.service.rpc.thrift.TFetchOrientation;
 import org.apache.hive.service.rpc.thrift.THandleIdentifier;
+import org.apache.hive.service.rpc.thrift.TI16Column;
+import org.apache.hive.service.rpc.thrift.TI16Value;
+import org.apache.hive.service.rpc.thrift.TI32Column;
+import org.apache.hive.service.rpc.thrift.TI32Value;
+import org.apache.hive.service.rpc.thrift.TI64Column;
+import org.apache.hive.service.rpc.thrift.TI64Value;
+import org.apache.hive.service.rpc.thrift.TOperationHandle;
+import org.apache.hive.service.rpc.thrift.TOperationState;
+import org.apache.hive.service.rpc.thrift.TOperationType;
+import org.apache.hive.service.rpc.thrift.TPrimitiveTypeEntry;
+import org.apache.hive.service.rpc.thrift.TProtocolVersion;
+import org.apache.hive.service.rpc.thrift.TRow;
+import org.apache.hive.service.rpc.thrift.TRowSet;
 import org.apache.hive.service.rpc.thrift.TSessionHandle;
 import org.apache.hive.service.rpc.thrift.TStatus;
 import org.apache.hive.service.rpc.thrift.TStatusCode;
+import org.apache.hive.service.rpc.thrift.TStringColumn;
+import org.apache.hive.service.rpc.thrift.TStringValue;
+import org.apache.hive.service.rpc.thrift.TTableSchema;
+import org.apache.hive.service.rpc.thrift.TTypeDesc;
+import org.apache.hive.service.rpc.thrift.TTypeEntry;
+import org.apache.hive.service.rpc.thrift.TTypeQualifierValue;
+import org.apache.hive.service.rpc.thrift.TTypeQualifiers;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.endpoint.hive.HiveServer2EndpointVersion.HIVE_CLI_SERVICE_PROTOCOL_V6;
 
 /** Conversion between thrift object and flink object. */
 public class ThriftObjectConversions {
 
+    private static final UUID SECRET_ID = UUID.fromString("b06fa16a-3d16-475f-b510-6c64abb9b173");
+
+    // --------------------------------------------------------------------------------------------
+    // Flink SessionHandle from/to Hive SessionHandle
+    // --------------------------------------------------------------------------------------------
+
     public static TSessionHandle toTSessionHandle(SessionHandle sessionHandle) {
-        return new TSessionHandle(toTHandleIdentifier(sessionHandle.getIdentifier()));
+        return new TSessionHandle(toTHandleIdentifier(sessionHandle.getIdentifier(), SECRET_ID));
     }
 
     public static SessionHandle toSessionHandle(TSessionHandle tSessionHandle) {
-        return new SessionHandle(toHandleIdentifier(tSessionHandle.getSessionId()));
+        ByteBuffer bb = ByteBuffer.wrap(tSessionHandle.getSessionId().getGuid());
+        return new SessionHandle(new UUID(bb.getLong(), bb.getLong()));
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Flink SessionHandle && OperationHandle from/to Hive OperationHandle
+    // --------------------------------------------------------------------------------------------
+
+    public static TOperationHandle toTOperationHandle(
+            SessionHandle sessionHandle,
+            OperationHandle operationHandle,
+            OperationType operationType,
+            boolean hasResult) {
+        return new TOperationHandle(
+                toTHandleIdentifier(sessionHandle.getIdentifier(), operationHandle.getIdentifier()),

Review Comment:
   1. Please add a comment on why we use the session identifier as the publicId of the Hive operation handle.
   2. Maybe we should switch the order: the session identifier as the secretId, and the operation identifier as the publicId? We use the Flink identifier as the Hive publicId in the session handle conversion (a sketch of this ordering follows below).
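   
   A sketch of the swapped ordering, derived from the conversions above:
   
   ```java
   public static TOperationHandle toTOperationHandle(
           SessionHandle sessionHandle,
           OperationHandle operationHandle,
           OperationType operationType,
           boolean hasResult) {
       // Operation identifier as the publicId (guid), session identifier as the secretId,
       // matching how the Flink identifier is used as the publicId for session handles.
       return new TOperationHandle(
               toTHandleIdentifier(operationHandle.getIdentifier(), sessionHandle.getIdentifier()),
               toTOperationType(operationType),
               hasResult);
   }
   
   public static SessionHandle toSessionHandle(TOperationHandle tOperationHandle) {
       ByteBuffer bb = ByteBuffer.wrap(tOperationHandle.getOperationId().getSecret());
       return new SessionHandle(new UUID(bb.getLong(), bb.getLong()));
   }
   
   public static OperationHandle toOperationHandle(TOperationHandle tOperationHandle) {
       ByteBuffer bb = ByteBuffer.wrap(tOperationHandle.getOperationId().getGuid());
       return new OperationHandle(new UUID(bb.getLong(), bb.getLong()));
   }
   ```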



##########
flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/SqlGatewayService.java:
##########
@@ -108,6 +120,16 @@ void closeOperation(SessionHandle sessionHandle, OperationHandle operationHandle
     OperationInfo getOperationInfo(SessionHandle sessionHandle, OperationHandle operationHandle)
             throws SqlGatewayException;
 
+    /**
+     * Get the result schema for the specified Operation.
+     *
+     * @param sessionHandle handle to identify the session.
+     * @param operationHandle handle to identify the operation.
+     */
+    ResolvedSchema getOperationResultSchema(

Review Comment:
   Add a NOTE to warn users that this method should only be called when `getOperationInfo(..).getStatus() == FINISHED`. TBH, this sounds too restrictive to me (what if this is a long-running query?).
   
   Besides, does the Hive driver client follow this contract? Otherwise, exceptions may occur when running SELECT queries.



##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversions.java:
##########
@@ -18,28 +18,238 @@
 
 package org.apache.flink.table.endpoint.hive.util;
 
-import org.apache.flink.table.gateway.api.HandleIdentifier;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.hive.util.HiveTypeUtil;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.gateway.api.operation.OperationHandle;
+import org.apache.flink.table.gateway.api.operation.OperationStatus;
+import org.apache.flink.table.gateway.api.operation.OperationType;
+import org.apache.flink.table.gateway.api.results.FetchOrientation;
 import org.apache.flink.table.gateway.api.session.SessionHandle;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.utils.print.RowDataToStringConverter;
+import org.apache.flink.types.RowKind;
 
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.thrift.Type;
+import org.apache.hive.service.rpc.thrift.TBinaryColumn;
+import org.apache.hive.service.rpc.thrift.TBoolColumn;
+import org.apache.hive.service.rpc.thrift.TBoolValue;
+import org.apache.hive.service.rpc.thrift.TByteColumn;
+import org.apache.hive.service.rpc.thrift.TByteValue;
+import org.apache.hive.service.rpc.thrift.TCLIServiceConstants;
+import org.apache.hive.service.rpc.thrift.TColumn;
+import org.apache.hive.service.rpc.thrift.TColumnDesc;
+import org.apache.hive.service.rpc.thrift.TColumnValue;
+import org.apache.hive.service.rpc.thrift.TDoubleColumn;
+import org.apache.hive.service.rpc.thrift.TDoubleValue;
+import org.apache.hive.service.rpc.thrift.TFetchOrientation;
 import org.apache.hive.service.rpc.thrift.THandleIdentifier;
+import org.apache.hive.service.rpc.thrift.TI16Column;
+import org.apache.hive.service.rpc.thrift.TI16Value;
+import org.apache.hive.service.rpc.thrift.TI32Column;
+import org.apache.hive.service.rpc.thrift.TI32Value;
+import org.apache.hive.service.rpc.thrift.TI64Column;
+import org.apache.hive.service.rpc.thrift.TI64Value;
+import org.apache.hive.service.rpc.thrift.TOperationHandle;
+import org.apache.hive.service.rpc.thrift.TOperationState;
+import org.apache.hive.service.rpc.thrift.TOperationType;
+import org.apache.hive.service.rpc.thrift.TPrimitiveTypeEntry;
+import org.apache.hive.service.rpc.thrift.TProtocolVersion;
+import org.apache.hive.service.rpc.thrift.TRow;
+import org.apache.hive.service.rpc.thrift.TRowSet;
 import org.apache.hive.service.rpc.thrift.TSessionHandle;
 import org.apache.hive.service.rpc.thrift.TStatus;
 import org.apache.hive.service.rpc.thrift.TStatusCode;
+import org.apache.hive.service.rpc.thrift.TStringColumn;
+import org.apache.hive.service.rpc.thrift.TStringValue;
+import org.apache.hive.service.rpc.thrift.TTableSchema;
+import org.apache.hive.service.rpc.thrift.TTypeDesc;
+import org.apache.hive.service.rpc.thrift.TTypeEntry;
+import org.apache.hive.service.rpc.thrift.TTypeQualifierValue;
+import org.apache.hive.service.rpc.thrift.TTypeQualifiers;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.endpoint.hive.HiveServer2EndpointVersion.HIVE_CLI_SERVICE_PROTOCOL_V6;
 
 /** Conversion between thrift object and flink object. */
 public class ThriftObjectConversions {
 
+    private static final UUID SECRET_ID = UUID.fromString("b06fa16a-3d16-475f-b510-6c64abb9b173");
+
+    // --------------------------------------------------------------------------------------------
+    // Flink SessionHandle from/to Hive SessionHandle
+    // --------------------------------------------------------------------------------------------
+
     public static TSessionHandle toTSessionHandle(SessionHandle sessionHandle) {
-        return new TSessionHandle(toTHandleIdentifier(sessionHandle.getIdentifier()));
+        return new TSessionHandle(toTHandleIdentifier(sessionHandle.getIdentifier(), SECRET_ID));
     }
 
     public static SessionHandle toSessionHandle(TSessionHandle tSessionHandle) {
-        return new SessionHandle(toHandleIdentifier(tSessionHandle.getSessionId()));
+        ByteBuffer bb = ByteBuffer.wrap(tSessionHandle.getSessionId().getGuid());
+        return new SessionHandle(new UUID(bb.getLong(), bb.getLong()));
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Flink SessionHandle && OperationHandle from/to Hive OperationHandle
+    // --------------------------------------------------------------------------------------------
+
+    public static TOperationHandle toTOperationHandle(
+            SessionHandle sessionHandle,
+            OperationHandle operationHandle,
+            OperationType operationType,
+            boolean hasResult) {
+        return new TOperationHandle(
+                toTHandleIdentifier(sessionHandle.getIdentifier(), operationHandle.getIdentifier()),
+                toTOperationType(operationType),
+                hasResult);
+    }
+
+    public static SessionHandle toSessionHandle(TOperationHandle tOperationHandle) {
+        ByteBuffer bb = ByteBuffer.wrap(tOperationHandle.getOperationId().getGuid());
+        return new SessionHandle(new UUID(bb.getLong(), bb.getLong()));
+    }
+
+    public static OperationHandle toOperationHandle(TOperationHandle tOperationHandle) {
+        ByteBuffer bb = ByteBuffer.wrap(tOperationHandle.getOperationId().getSecret());
+        return new OperationHandle(new UUID(bb.getLong(), bb.getLong()));
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Operation related conversions
+    // --------------------------------------------------------------------------------------------
+
+    public static TOperationType toTOperationType(OperationType type) {
+        switch (type) {
+            case EXECUTE_STATEMENT:
+                return TOperationType.EXECUTE_STATEMENT;
+            case UNKNOWN:
+                return TOperationType.UNKNOWN;
+            default:
+                throw new IllegalArgumentException(
+                        String.format("Unknown operation type: %s.", type));
+        }
+    }
+
+    public static TOperationState toTOperationState(OperationStatus operationStatus) {
+        switch (operationStatus) {
+            case INITIALIZED:
+                return TOperationState.INITIALIZED_STATE;
+            case PENDING:
+                return TOperationState.PENDING_STATE;
+            case RUNNING:
+                return TOperationState.RUNNING_STATE;
+            case FINISHED:
+                return TOperationState.FINISHED_STATE;
+            case ERROR:
+                return TOperationState.ERROR_STATE;
+            case TIMEOUT:
+                return TOperationState.TIMEDOUT_STATE;
+            case CANCELED:
+                return TOperationState.CANCELED_STATE;
+            case CLOSED:
+                return TOperationState.CLOSED_STATE;
+            default:
+                throw new IllegalArgumentException(
+                        String.format("Unknown operation status: %s.", operationStatus));
+        }
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Statement related conversions
+    // --------------------------------------------------------------------------------------------
+
+    public static FetchOrientation toFetchOrientation(int fetchOrientation) {
+        if (fetchOrientation == TFetchOrientation.FETCH_NEXT.getValue()) {
+            return FetchOrientation.FETCH_NEXT;
+        } else if (fetchOrientation == TFetchOrientation.FETCH_PRIOR.getValue()) {
+            return FetchOrientation.FETCH_PRIOR;
+        } else {
+            throw new UnsupportedOperationException(
+                    String.format("Unsupported fetch orientation: %s.", fetchOrientation));
+        }
+    }
+
+    /** Similar logic in the {@code org.apache.hive.service.cli.ColumnDescriptor}. */
+    public static TTableSchema toTTableSchema(ResolvedSchema schema) {
+        TTableSchema tSchema = new TTableSchema();
+
+        for (int i = 0; i < schema.getColumnCount(); i++) {
+            Column column = schema.getColumns().get(i);
+            TColumnDesc desc = new TColumnDesc();
+            desc.setColumnName(column.getName());
+            column.getComment().ifPresent(desc::setComment);
+            desc.setPosition(i);
+
+            TTypeDesc typeDesc = new TTypeDesc();
+
+            // Hive uses the TPrimitiveTypeEntry only. Please refer to TypeDescriptor#toTTypeDesc.
+            DataType columnType = column.getDataType();
+            TPrimitiveTypeEntry typeEntry =
+                    new TPrimitiveTypeEntry(

Review Comment:
   Does this mean we only support mapping primitive types and do not support complex types?



##########
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversionsTest.java:
##########
@@ -34,4 +110,250 @@ public void testConvertSessionHandle() {
         SessionHandle originSessionHandle = SessionHandle.create();
         assertEquals(toSessionHandle(toTSessionHandle(originSessionHandle)), originSessionHandle);
     }
+
+    @Test
+    public void testConvertSessionHandleAndOperationHandle() {
+        SessionHandle originSessionHandle = SessionHandle.create();
+        OperationHandle originOperationHandle = OperationHandle.create();
+        TOperationHandle tOperationHandle =
+                toTOperationHandle(
+                        originSessionHandle, originOperationHandle, OperationType.UNKNOWN, false);
+
+        assertEquals(toSessionHandle(tOperationHandle), originSessionHandle);
+        assertEquals(toOperationHandle(tOperationHandle), originOperationHandle);
+    }
+
+    @Test
+    public void testConvertOperationStatus() {
+        Map<OperationStatus, TOperationState> expectedMappings = new HashMap<>();
+        expectedMappings.put(INITIALIZED, TOperationState.INITIALIZED_STATE);
+        expectedMappings.put(PENDING, TOperationState.PENDING_STATE);
+        expectedMappings.put(RUNNING, TOperationState.RUNNING_STATE);
+        expectedMappings.put(FINISHED, TOperationState.FINISHED_STATE);
+        expectedMappings.put(CANCELED, TOperationState.CANCELED_STATE);
+        expectedMappings.put(CLOSED, TOperationState.CLOSED_STATE);
+        expectedMappings.put(ERROR, TOperationState.ERROR_STATE);
+        expectedMappings.put(TIMEOUT, TOperationState.TIMEDOUT_STATE);
+
+        for (OperationStatus status : expectedMappings.keySet()) {
+            assertEquals(expectedMappings.get(status), toTOperationState(status));
+        }
+    }
+
+    @Test
+    public void testToTTableSchema() {

Review Comment:
   ?



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java:
##########
@@ -290,12 +309,34 @@ public void close() {
         }
 
         public ResultSet fetchResults(long token, int maxRows) {
+            return fetchResultsInternal(() -> resultFetcher.fetchResults(token, maxRows));
+        }
+
+        public ResultSet fetchResults(FetchOrientation orientation, int maxRows) {
+            return fetchResultsInternal(() -> resultFetcher.fetchResults(orientation, maxRows));
+        }
+
+        public ResolvedSchema getResultSchema() {
+            OperationStatus current = status.get();
+            if (current != OperationStatus.FINISHED || !hasResults) {

Review Comment:
   `hasResults` is always `true`?



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/print/RowDataToStringConverter.java:
##########
@@ -24,5 +24,15 @@
 /** Interface to convert {@link RowData} to {@link String} using the SQL casting rules. */
 @Internal
 public interface RowDataToStringConverter {
-    String[] convert(RowData rowData);
+
+    default String[] convert(RowData rowData) {
+        int arity = rowData.getArity();
+        String[] converted = new String[arity];
+        for (int i = 0; i < arity; i++) {
+            converted[i] = convert(rowData, i);
+        }
+        return converted;
+    }
+
+    String convert(RowData rowData, int index);

Review Comment:
   1. Add comments to the methods.
   2. convert -> convertColumn (see the sketch below).
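   
   Something along these lines (the Javadoc wording is just a sketch; imports stay as in the existing file):
   
   ```java
   /** Interface to convert {@link RowData} to {@link String} using the SQL casting rules. */
   @Internal
   public interface RowDataToStringConverter {
   
       /** Converts every column of the given row to its SQL string representation. */
       default String[] convert(RowData rowData) {
           int arity = rowData.getArity();
           String[] converted = new String[arity];
           for (int i = 0; i < arity; i++) {
               converted[i] = convertColumn(rowData, i);
           }
           return converted;
       }
   
       /** Converts the column at the given index of the row to its SQL string representation. */
       String convertColumn(RowData rowData, int index);
   }
   ```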



##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversions.java:
##########
@@ -51,25 +261,250 @@ public static TStatus toTStatus(Throwable t) {
 
     // --------------------------------------------------------------------------------------------
 
-    private static THandleIdentifier toTHandleIdentifier(HandleIdentifier identifier) {
+    private static THandleIdentifier toTHandleIdentifier(UUID publicId, UUID secretId) {
         byte[] guid = new byte[16];
         byte[] secret = new byte[16];
         ByteBuffer guidBB = ByteBuffer.wrap(guid);
         ByteBuffer secretBB = ByteBuffer.wrap(secret);
 
-        guidBB.putLong(identifier.getPublicId().getMostSignificantBits());
-        guidBB.putLong(identifier.getPublicId().getLeastSignificantBits());
-        secretBB.putLong(identifier.getSecretId().getMostSignificantBits());
-        secretBB.putLong(identifier.getSecretId().getLeastSignificantBits());
+        guidBB.putLong(publicId.getMostSignificantBits());
+        guidBB.putLong(publicId.getLeastSignificantBits());
+        secretBB.putLong(secretId.getMostSignificantBits());
+        secretBB.putLong(secretId.getLeastSignificantBits());
         return new THandleIdentifier(ByteBuffer.wrap(guid), ByteBuffer.wrap(secret));
     }
 
-    private static HandleIdentifier toHandleIdentifier(THandleIdentifier tHandleId) {
-        ByteBuffer bb = ByteBuffer.wrap(tHandleId.getGuid());
-        UUID publicId = new UUID(bb.getLong(), bb.getLong());
-        bb = ByteBuffer.wrap(tHandleId.getSecret());
-        UUID secretId = new UUID(bb.getLong(), bb.getLong());
-        return new HandleIdentifier(publicId, secretId);
+    @VisibleForTesting
+    public static TRowSet toColumnBasedSet(
+            List<LogicalType> fieldTypes,
+            List<RowData.FieldGetter> fieldGetters,
+            RowDataToStringConverter converter,
+            List<RowData> rows) {
+        int rowNum = rows.size();
+        // TODO: Support accurate start offset
+        TRowSet rowSet = new TRowSet(0, new ArrayList<>(rowNum));
+        for (int i = 0; i < fieldTypes.size(); i++) {
+            int index = i;
+            rowSet.addToColumns(
+                    toTColumn(
+                            fieldTypes.get(i),
+                            fieldGetters.get(i),
+                            row -> row.isNullAt(index),
+                            row -> converter.convert(row, index),
+                            rows));
+        }
+        return rowSet;
+    }
+
+    private static TColumn toTColumn(
+            LogicalType fieldType,
+            RowData.FieldGetter fieldGetter,
+            Function<RowData, Boolean> isNull,
+            Function<RowData, String> stringifiedValue,
+            List<RowData> rows) {
+        BitSet nulls = new BitSet();
+        switch (fieldType.getTypeRoot()) {

Review Comment:
   According to https://github.com/apache/hive/blame/master/service-rpc/if/TCLIService.thrift#L405, should we map TIMESTAMP and TIMESTAMP_LTZ to int64 as well?





[GitHub] [flink] fsk119 closed pull request #20298: [FLINK-28152][sql-gateway][hive] Allow executing statement for the HiveServer2Endpoint

Posted by GitBox <gi...@apache.org>.
fsk119 closed pull request #20298: [FLINK-28152][sql-gateway][hive] Allow executing statement for the HiveServer2Endpoint
URL: https://github.com/apache/flink/pull/20298




[GitHub] [flink] wuchong commented on a diff in pull request #20298: [FLINK-28152][sql-gateway][hive] Allow executing statement for the HiveServer2Endpoint

Posted by GitBox <gi...@apache.org>.
wuchong commented on code in PR #20298:
URL: https://github.com/apache/flink/pull/20298#discussion_r930714033


##########
flink-connectors/flink-connector-hive/src/test/resources/endpoint/hive_catalog.q:
##########
@@ -0,0 +1,121 @@
+# catalog_database.q - CREATE/DROP/SHOW/USE CATALOG/DATABASE
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to you under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# ==========================================================================
+# test hive catalog
+# ==========================================================================
+
+show current catalog;
+!output
++----------------------+
+| current catalog name |
++----------------------+
+|                 hive |
++----------------------+
+1 row in set
+!ok
+
+show databases;
+!output
++---------------+
+| database name |
++---------------+
+|       default |
++---------------+
+1 row in set
+!ok
+
+show tables;
+!output
+Empty set
+!ok
+
+create database additional_test_database;
+!output
++--------+
+| result |
++--------+
+|     OK |
++--------+
+1 row in set
+!ok
+
+use additional_test_database;
+!output
++--------+
+| result |
++--------+
+|     OK |
++--------+
+1 row in set
+!ok
+
+create table param_types_table (
+    dec DECIMAL(10, 10),
+    ch CHAR(5),
+    vch VARCHAR(15)
+);
+!output
++--------+
+| result |
++--------+
+|     OK |
++--------+
+1 row in set
+!ok
+
+show tables;
+!output
++-------------------+
+|        table name |
++-------------------+
+| param_types_table |
++-------------------+
+1 row in set
+!ok
+
+show current database;
+!output
++--------------------------+
+|    current database name |
++--------------------------+
+| additional_test_database |
++--------------------------+
+1 row in set
+!ok
+
+# ==========================================================================
+# test hive table with parameterized types
+# ==========================================================================
+
+describe hive.additional_test_database.param_types_table;
+!output
++------+-----------------+------+-----+--------+-----------+
+| name |            type | null | key | extras | watermark |

Review Comment:
   Yes, I think so. Flink's `describe` may not contain as rich information as Hive's does.





[GitHub] [flink] fsk119 commented on a diff in pull request #20298: [FLINK-28152][sql-gateway][hive] Allow executing statement for the HiveServer2Endpoint

Posted by GitBox <gi...@apache.org>.
fsk119 commented on code in PR #20298:
URL: https://github.com/apache/flink/pull/20298#discussion_r931088836


##########
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointStatementITCase.java:
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.endpoint.hive;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ExecutionOptions;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.endpoint.hive.util.HiveServer2EndpointExtension;
+import org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions;
+import org.apache.flink.table.gateway.AbstractSqlGatewayStatementITCase;
+import org.apache.flink.table.gateway.api.session.SessionHandle;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.utils.DataTypeUtils;
+
+import org.apache.hive.jdbc.HiveConnection;
+import org.apache.hive.service.rpc.thrift.TSessionHandle;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.lang.reflect.Field;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Path;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.Statement;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Stream;
+
+import static org.apache.flink.table.api.internal.StaticResultProvider.SIMPLE_ROW_DATA_TO_STRING_CONVERTER;
+
+/** ITCase to verify the statements. */
+public class HiveServer2EndpointStatementITCase extends AbstractSqlGatewayStatementITCase {
+
+    @RegisterExtension
+    @Order(3)
+    public static final HiveServer2EndpointExtension ENDPOINT_EXTENSION =
+            new HiveServer2EndpointExtension(SQL_GATEWAY_SERVICE_EXTENSION::getService);
+
+    private Connection connection;
+
+    @BeforeEach
+    @Override
+    public void before(@TempDir Path temporaryFolder) throws Exception {
+        super.before(temporaryFolder);
+        connection = ENDPOINT_EXTENSION.getConnection();
+    }
+
+    @AfterEach
+    public void after() throws Exception {
+        connection.close();
+    }
+
+    public static Stream<String> listHiveSqlTests() throws Exception {
+        return listTestSpecInTheSameModule("endpoint");
+    }
+
+    @ParameterizedTest
+    @MethodSource("listHiveSqlTests")
+    public void testHiveSqlStatements(String sqlPath) throws Exception {
+        runTest(sqlPath);
+    }
+
+    @Override
+    protected String runSingleStatement(String sql) throws Exception {
+        Statement statement = connection.createStatement();
+        statement.execute(sql);
+
+        ResultSet resultSet = statement.getResultSet();
+        ResultSetMetaData metaData = resultSet.getMetaData();
+
+        int columnSize = metaData.getColumnCount();
+        List<RowData> rows = new ArrayList<>();
+        DataType type = toStringifiedType(metaData);
+        while (resultSet.next()) {
+            GenericRowData stringifiedRowData = new GenericRowData(columnSize);
+            for (int i = 0; i < columnSize; i++) {
+                Object field = resultSet.getObject(i + 1);
+                // Similar to SIMPLE_ROW_DATA_TO_STRING_CONVERTER
+                if (field != null) {
+                    if (field instanceof Boolean) {
+                        stringifiedRowData.setField(i, field);
+                    } else if (field instanceof byte[]) {
+                        stringifiedRowData.setField(
+                                i,
+                                StringData.fromString(
+                                        new String((byte[]) field, StandardCharsets.UTF_8)));
+                    } else {
+                        stringifiedRowData.setField(i, StringData.fromString(field.toString()));
+                    }
+                }
+            }
+            rows.add(stringifiedRowData);
+        }
+
+        StatementType statementType = StatementType.match(sql);
+
+        return toString(
+                statementType,
+                DataTypeUtils.expandCompositeTypeToSchema(type),
+                SIMPLE_ROW_DATA_TO_STRING_CONVERTER,
+                rows.iterator());
+    }
+
+    @Override
+    protected String stringifyException(Throwable t) {
+        return t.getMessage().trim();
+    }
+
+    @Override
+    protected boolean isStreaming() throws Exception {
+        Field sessHandleField = HiveConnection.class.getDeclaredField("sessHandle");
+        // Set the accessibility as true
+        sessHandleField.setAccessible(true);
+        SessionHandle sessionHandle =
+                ThriftObjectConversions.toSessionHandle(
+                        (TSessionHandle) sessHandleField.get(connection));
+        return Configuration.fromMap(service.getSessionConfig(sessionHandle))
+                .get(ExecutionOptions.RUNTIME_MODE)
+                .equals(RuntimeExecutionMode.STREAMING);
+    }
+
+    @Override
+    protected void initializeSession() throws Exception {

Review Comment:
   Because much of the code is shared, I think it's better to rename the method.


