You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sh...@apache.org on 2022/08/04 12:35:50 UTC

[flink] branch master updated (75a66f921c9 -> e3b26c283ca)

This is an automated email from the ASF dual-hosted git repository.

shengkai pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


    from 75a66f921c9 [FLINK-25252][kafka] Enabling Kafka tests on Java 11
     new 4b73437524f [FLINK-28629][sql-gateway] Return operation schema until operation is in terminal state
     new e3b26c283ca [FLINK-28629][sql-gateway][hive] Allow to getCatalogs in the HiveServer2 Endpoint

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../table/endpoint/hive/HiveServer2Endpoint.java   |  29 ++++-
 .../table/endpoint/hive/HiveServer2Schemas.java    |  34 +++--
 .../endpoint/hive/util/StringRowDataUtils.java     |  18 ++-
 .../hive/util/ThriftObjectConversions.java         |   2 +
 .../endpoint/hive/HiveServer2EndpointITCase.java   |  40 ++++++
 .../flink/table/gateway/api/SqlGatewayService.java |  15 ++-
 .../table/gateway/api/operation/OperationType.java |   3 +
 .../table/gateway/api/results/OperationInfo.java   |  17 ++-
 .../gateway/api/utils/MockedSqlGatewayService.java |   6 +
 .../gateway/service/SqlGatewayServiceImpl.java     |  11 ++
 .../service/operation/OperationExecutor.java       |   7 ++
 .../service/operation/OperationManager.java        |  25 ++--
 .../table/gateway/service/session/Session.java     |   4 +
 .../gateway/service/SqlGatewayServiceITCase.java   | 139 ++++++++++++++-------
 14 files changed, 268 insertions(+), 82 deletions(-)
 copy flink-runtime/src/main/java/org/apache/flink/runtime/memory/ListMemorySegmentSource.java => flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Schemas.java (56%)
 copy flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/ChangelogResult.java => flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/StringRowDataUtils.java (62%)


[flink] 02/02: [FLINK-28629][sql-gateway][hive] Allow to getCatalogs in the HiveServer2 Endpoint

Posted by sh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e3b26c283ca3ad266978b719d21e2da72ff225da
Author: Shengkai <10...@qq.com>
AuthorDate: Tue Aug 2 12:18:19 2022 +0800

    [FLINK-28629][sql-gateway][hive] Allow to getCatalogs in the HiveServer2 Endpoint
    
    This closes #20334
---
 .../table/endpoint/hive/HiveServer2Endpoint.java   | 29 +++++++++++++++-
 .../table/endpoint/hive/HiveServer2Schemas.java    | 25 +++++++++-----
 .../endpoint/hive/util/StringRowDataUtils.java     | 24 ++++++++-----
 .../hive/util/ThriftObjectConversions.java         |  2 ++
 .../endpoint/hive/HiveServer2EndpointITCase.java   | 40 ++++++++++++++++++++++
 .../flink/table/gateway/api/SqlGatewayService.java | 15 +++++++-
 .../table/gateway/api/operation/OperationType.java |  3 ++
 .../gateway/api/utils/MockedSqlGatewayService.java |  6 ++++
 .../gateway/service/SqlGatewayServiceImpl.java     | 11 ++++++
 .../service/operation/OperationExecutor.java       |  7 ++++
 .../table/gateway/service/session/Session.java     |  4 +++
 .../gateway/service/SqlGatewayServiceITCase.java   | 12 +++++++
 12 files changed, 158 insertions(+), 20 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java
index 8ae5bb23954..0772a4e0b24 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java
@@ -26,6 +26,7 @@ import org.apache.flink.table.catalog.Catalog;
 import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.catalog.hive.HiveCatalog;
 import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
+import org.apache.flink.table.endpoint.hive.util.StringRowDataUtils;
 import org.apache.flink.table.gateway.api.SqlGatewayService;
 import org.apache.flink.table.gateway.api.endpoint.EndpointVersion;
 import org.apache.flink.table.gateway.api.endpoint.SqlGatewayEndpoint;
@@ -112,6 +113,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
@@ -119,6 +121,7 @@ import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE;
 import static org.apache.flink.table.api.config.TableConfigOptions.TABLE_DML_SYNC;
 import static org.apache.flink.table.api.config.TableConfigOptions.TABLE_SQL_DIALECT;
 import static org.apache.flink.table.endpoint.hive.HiveServer2EndpointVersion.HIVE_CLI_SERVICE_PROTOCOL_V10;
+import static org.apache.flink.table.endpoint.hive.HiveServer2Schemas.GET_CATALOGS_SCHEMA;
 import static org.apache.flink.table.endpoint.hive.util.HiveJdbcParameterUtils.getUsedDefaultDatabase;
 import static org.apache.flink.table.endpoint.hive.util.HiveJdbcParameterUtils.validateAndNormalize;
 import static org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions.toFetchOrientation;
@@ -130,6 +133,7 @@ import static org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions.
 import static org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions.toTSessionHandle;
 import static org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions.toTStatus;
 import static org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions.toTTableSchema;
+import static org.apache.flink.table.gateway.api.results.ResultSet.ResultType.EOS;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -390,7 +394,30 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
 
     @Override
     public TGetCatalogsResp GetCatalogs(TGetCatalogsReq tGetCatalogsReq) throws TException {
-        throw new UnsupportedOperationException(ERROR_MESSAGE);
+        TGetCatalogsResp resp = new TGetCatalogsResp();
+        try {
+            SessionHandle sessionHandle = toSessionHandle(tGetCatalogsReq.getSessionHandle());
+            OperationHandle operationHandle =
+                    service.submitOperation(
+                            sessionHandle,
+                            OperationType.LIST_CATALOGS,
+                            () -> {
+                                Set<String> catalogNames = service.listCatalogs(sessionHandle);
+                                return new ResultSet(
+                                        EOS,
+                                        null,
+                                        GET_CATALOGS_SCHEMA,
+                                        StringRowDataUtils.toRowData(catalogNames));
+                            });
+            resp.setStatus(OK_STATUS);
+            resp.setOperationHandle(
+                    toTOperationHandle(
+                            sessionHandle, operationHandle, OperationType.LIST_CATALOGS));
+        } catch (Throwable t) {
+            LOG.error("Failed to GetCatalogs.", t);
+            resp.setStatus(toTStatus(t));
+        }
+        return resp;
     }
 
     @Override
diff --git a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/operation/OperationType.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Schemas.java
similarity index 51%
copy from flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/operation/OperationType.java
copy to flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Schemas.java
index babe99fd3f9..0a6818330fd 100644
--- a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/operation/OperationType.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Schemas.java
@@ -16,16 +16,23 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.gateway.api.operation;
+package org.apache.flink.table.endpoint.hive;
 
-import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
 
-/** The Operation Type. */
-@PublicEvolving
-public enum OperationType {
-    /** The type indicates the operation executes statements. */
-    EXECUTE_STATEMENT,
+import java.util.Collections;
 
-    /** The type indicates the operation is unknown. */
-    UNKNOWN;
+/** Schemas for the HiveServer2 Endpoint result. */
+public class HiveServer2Schemas {
+
+    /** Schema for {@link HiveServer2Endpoint#GetCatalogs}. */
+    public static final ResolvedSchema GET_CATALOGS_SCHEMA =
+            new ResolvedSchema(
+                    Collections.singletonList(
+                            Column.physical("TABLE_CAT", DataTypes.STRING())
+                                    .withComment("Catalog name. NULL if not applicable.")),
+                    Collections.emptyList(),
+                    null);
 }
diff --git a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/operation/OperationType.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/StringRowDataUtils.java
similarity index 57%
copy from flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/operation/OperationType.java
copy to flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/StringRowDataUtils.java
index babe99fd3f9..103f7311c89 100644
--- a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/operation/OperationType.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/StringRowDataUtils.java
@@ -16,16 +16,22 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.gateway.api.operation;
+package org.apache.flink.table.endpoint.hive.util;
 
-import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
 
-/** The Operation Type. */
-@PublicEvolving
-public enum OperationType {
-    /** The type indicates the operation executes statements. */
-    EXECUTE_STATEMENT,
+import java.util.Collection;
+import java.util.List;
+import java.util.stream.Collectors;
 
-    /** The type indicates the operation is unknown. */
-    UNKNOWN;
+/** Convert the {@link String} to {@link RowData}. */
+public class StringRowDataUtils {
+
+    public static List<RowData> toRowData(Collection<String> external) {
+        return external.stream()
+                .map(val -> GenericRowData.of(StringData.fromString(val)))
+                .collect(Collectors.toList());
+    }
 }
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversions.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversions.java
index eccf3c7e1f4..235796e4eb3 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversions.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversions.java
@@ -160,6 +160,8 @@ public class ThriftObjectConversions {
         switch (type) {
             case EXECUTE_STATEMENT:
                 return TOperationType.EXECUTE_STATEMENT;
+            case LIST_CATALOGS:
+                return TOperationType.GET_CATALOGS;
             case UNKNOWN:
                 return TOperationType.UNKNOWN;
             default:
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java
index 269e1a21f3b..7b567e184c0 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java
@@ -19,8 +19,12 @@
 package org.apache.flink.table.endpoint.hive;
 
 import org.apache.flink.core.testutils.FlinkAssertions;
+import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.SqlDialect;
 import org.apache.flink.table.api.config.TableConfigOptions;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.hive.util.HiveTypeUtil;
 import org.apache.flink.table.endpoint.hive.util.HiveServer2EndpointExtension;
 import org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions;
 import org.apache.flink.table.gateway.api.operation.OperationHandle;
@@ -38,6 +42,7 @@ import org.apache.flink.util.function.BiConsumerWithException;
 import org.apache.flink.util.function.ThrowingConsumer;
 
 import org.apache.hadoop.hive.common.auth.HiveAuthUtils;
+import org.apache.hive.jdbc.JdbcColumn;
 import org.apache.hive.service.rpc.thrift.TCLIService;
 import org.apache.hive.service.rpc.thrift.TCancelOperationReq;
 import org.apache.hive.service.rpc.thrift.TCancelOperationResp;
@@ -57,8 +62,11 @@ import org.junit.jupiter.api.extension.RegisterExtension;
 
 import java.net.InetAddress;
 import java.sql.Connection;
+import java.sql.ResultSetMetaData;
 import java.util.AbstractMap;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 
@@ -186,6 +194,23 @@ public class HiveServer2EndpointITCase extends TestLogger {
                                                         operationHandle)))));
     }
 
+    @Test
+    public void testGetCatalogs() throws Exception {
+        try (Connection connection = ENDPOINT_EXTENSION.getConnection()) {
+            java.sql.ResultSet result = connection.getMetaData().getCatalogs();
+            assertSchemaEquals(
+                    ResolvedSchema.of(Column.physical("TABLE_CAT", DataTypes.STRING())),
+                    result.getMetaData());
+
+            List<String> actual = new ArrayList<>();
+            while (result.next()) {
+                actual.add(result.getString(1));
+            }
+
+            assertThat(actual).contains("hive", "default_catalog");
+        }
+    }
+
     private void runOperationRequest(
             ThrowingConsumer<TOperationHandle, Exception> manipulateOp,
             BiConsumerWithException<SessionHandle, OperationHandle, Exception> operationValidator)
@@ -225,4 +250,19 @@ public class HiveServer2EndpointITCase extends TestLogger {
         transport.open();
         return new TCLIService.Client(new TBinaryProtocol(transport));
     }
+
+    private void assertSchemaEquals(ResolvedSchema expected, ResultSetMetaData metaData)
+            throws Exception {
+        assertThat(metaData.getColumnCount()).isEqualTo(expected.getColumnCount());
+        for (int i = 1; i <= metaData.getColumnCount(); i++) {
+            Column column =
+                    expected.getColumn(i - 1)
+                            .orElseThrow(() -> new RuntimeException("Can not get column."));
+            assertThat(metaData.getColumnName(i)).isEqualTo(column.getName());
+            int jdbcType =
+                    JdbcColumn.hiveTypeToSqlType(
+                            HiveTypeUtil.toHiveTypeInfo(column.getDataType(), false).getTypeName());
+            assertThat(metaData.getColumnType(i)).isEqualTo(jdbcType);
+        }
+    }
 }
diff --git a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/SqlGatewayService.java b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/SqlGatewayService.java
index fd2ed677243..97b377c59c4 100644
--- a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/SqlGatewayService.java
+++ b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/SqlGatewayService.java
@@ -33,6 +33,7 @@ import org.apache.flink.table.gateway.api.session.SessionHandle;
 import org.apache.flink.table.gateway.api.utils.SqlGatewayException;
 
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.Callable;
 
 /** A service of SQL gateway is responsible for handling requests from the endpoints. */
@@ -135,7 +136,7 @@ public interface SqlGatewayService {
             throws SqlGatewayException;
 
     // -------------------------------------------------------------------------------------------
-    // Statements
+    // Statements API
     // -------------------------------------------------------------------------------------------
 
     /**
@@ -185,4 +186,16 @@ public interface SqlGatewayService {
             OperationHandle operationHandle,
             FetchOrientation orientation,
             int maxRows);
+
+    // -------------------------------------------------------------------------------------------
+    // Catalog API
+    // -------------------------------------------------------------------------------------------
+
+    /**
+     * Return all available catalogs in the current session.
+     *
+     * @param sessionHandle handle to identify the session.
+     * @return names of the registered catalogs.
+     */
+    Set<String> listCatalogs(SessionHandle sessionHandle) throws SqlGatewayException;
 }
diff --git a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/operation/OperationType.java b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/operation/OperationType.java
index babe99fd3f9..6177008b264 100644
--- a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/operation/OperationType.java
+++ b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/operation/OperationType.java
@@ -26,6 +26,9 @@ public enum OperationType {
     /** The type indicates the operation executes statements. */
     EXECUTE_STATEMENT,
 
+    /** The type indicates the operation lists catalogs. */
+    LIST_CATALOGS,
+
     /** The type indicates the operation is unknown. */
     UNKNOWN;
 }
diff --git a/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/utils/MockedSqlGatewayService.java b/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/utils/MockedSqlGatewayService.java
index a75acd72566..af0ff36ec6e 100644
--- a/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/utils/MockedSqlGatewayService.java
+++ b/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/utils/MockedSqlGatewayService.java
@@ -31,6 +31,7 @@ import org.apache.flink.table.gateway.api.session.SessionEnvironment;
 import org.apache.flink.table.gateway.api.session.SessionHandle;
 
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.Callable;
 
 /** Mocked {@link SqlGatewayService}. */
@@ -114,4 +115,9 @@ public class MockedSqlGatewayService implements SqlGatewayService {
             throws SqlGatewayException {
         throw new UnsupportedOperationException();
     }
+
+    @Override
+    public Set<String> listCatalogs(SessionHandle sessionHandle) throws SqlGatewayException {
+        throw new UnsupportedOperationException();
+    }
 }
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java
index 3f6678e45e4..9419b6071c3 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java
@@ -38,6 +38,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.Callable;
 
 /** The implementation of the {@link SqlGatewayService} interface. */
@@ -210,6 +211,16 @@ public class SqlGatewayServiceImpl implements SqlGatewayService {
         }
     }
 
+    @Override
+    public Set<String> listCatalogs(SessionHandle sessionHandle) throws SqlGatewayException {
+        try {
+            return getSession(sessionHandle).createExecutor().listCatalogs();
+        } catch (Throwable t) {
+            LOG.error("Failed to listCatalogs.", t);
+            throw new SqlGatewayException("Failed to listCatalogs.", t);
+        }
+    }
+
     @VisibleForTesting
     Session getSession(SessionHandle sessionHandle) {
         return sessionManager.getSession(sessionHandle);
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java
index b54afea608e..e49a91f2ae8 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java
@@ -46,6 +46,7 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import static org.apache.flink.table.gateway.service.utils.Constants.JOB_ID;
 import static org.apache.flink.table.gateway.service.utils.Constants.SET_KEY;
@@ -99,6 +100,12 @@ public class OperationExecutor {
         }
     }
 
+    public Set<String> listCatalogs() {
+        return getTableEnvironment().getCatalogManager().listCatalogs();
+    }
+
+    // --------------------------------------------------------------------------------------------
+
     @VisibleForTesting
     public TableEnvironmentInternal getTableEnvironment() {
         TableEnvironmentInternal tableEnv = sessionContext.createTableEnvironment();
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/session/Session.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/session/Session.java
index 74689d7e5ac..87d77a87c36 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/session/Session.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/session/Session.java
@@ -66,6 +66,10 @@ public class Session implements Closeable {
         return sessionContext.getOperationManager();
     }
 
+    public OperationExecutor createExecutor() {
+        return sessionContext.createOperationExecutor(new Configuration());
+    }
+
     public OperationExecutor createExecutor(Configuration executionConfig) {
         return sessionContext.createOperationExecutor(executionConfig);
     }
diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
index 9cf33996b04..69880a74dec 100644
--- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
+++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
@@ -304,6 +304,18 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
                 task -> assertThat(task.get()).isEqualTo(getDefaultResultSet().getResultSchema()));
     }
 
+    @Test
+    public void testListCatalogs() {
+        SessionEnvironment environment =
+                SessionEnvironment.newBuilder()
+                        .setSessionEndpointVersion(MockedEndpointVersion.V1)
+                        .registerCatalog("cat1", new GenericInMemoryCatalog("cat1"))
+                        .registerCatalog("cat2", new GenericInMemoryCatalog("cat2"))
+                        .build();
+        SessionHandle sessionHandle = service.openSession(environment);
+        assertThat(service.listCatalogs(sessionHandle)).contains("cat1", "cat2");
+    }
+
     // --------------------------------------------------------------------------------------------
     // Concurrent tests
     // --------------------------------------------------------------------------------------------


[flink] 01/02: [FLINK-28629][sql-gateway] Return operation schema until operation is in terminal state

Posted by sh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4b73437524fe9718a50feed2ce47c9e17c27ed98
Author: Shengkai <10...@qq.com>
AuthorDate: Tue Aug 2 11:56:13 2022 +0800

    [FLINK-28629][sql-gateway] Return operation schema until operation is in terminal state
---
 .../table/gateway/api/results/OperationInfo.java   |  17 ++-
 .../service/operation/OperationManager.java        |  25 ++--
 .../gateway/service/SqlGatewayServiceITCase.java   | 127 +++++++++++++--------
 3 files changed, 114 insertions(+), 55 deletions(-)

diff --git a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/results/OperationInfo.java b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/results/OperationInfo.java
index 3abe6bdb7ce..a85404f8400 100644
--- a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/results/OperationInfo.java
+++ b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/results/OperationInfo.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.gateway.api.results;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.table.gateway.api.operation.OperationStatus;
 import org.apache.flink.table.gateway.api.operation.OperationType;
+import org.apache.flink.util.ExceptionUtils;
 
 import javax.annotation.Nullable;
 
@@ -67,11 +68,25 @@ public class OperationInfo {
             return false;
         }
         OperationInfo that = (OperationInfo) o;
-        return status == that.status && type == that.type && exception == that.exception;
+        return status == that.status
+                && type == that.type
+                && Objects.equals(exception, that.exception);
     }
 
     @Override
     public int hashCode() {
         return Objects.hash(status, type, exception);
     }
+
+    @Override
+    public String toString() {
+        return "OperationInfo{"
+                + "status="
+                + status
+                + ", type="
+                + type
+                + ", exception="
+                + (exception == null ? "null" : ExceptionUtils.stringifyException(exception))
+                + '}';
+    }
 }
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java
index 1e941317ce7..f493c24f868 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java
@@ -157,7 +157,8 @@ public class OperationManager {
      *
      * @param operationHandle identifies the {@link Operation}.
      */
-    public ResolvedSchema getOperationResultSchema(OperationHandle operationHandle) {
+    public ResolvedSchema getOperationResultSchema(OperationHandle operationHandle)
+            throws Exception {
         return getOperation(operationHandle).getResultSchema();
     }
 
@@ -210,7 +211,6 @@ public class OperationManager {
         private final OperationHandle operationHandle;
 
         private final OperationType operationType;
-        private final boolean hasResults;
         private final AtomicReference<OperationStatus> status;
 
         private final Callable<ResultFetcher> resultSupplier;
@@ -226,7 +226,6 @@ public class OperationManager {
             this.operationHandle = operationHandle;
             this.status = new AtomicReference<>(OperationStatus.INITIALIZED);
             this.operationType = operationType;
-            this.hasResults = true;
             this.resultSupplier = resultSupplier;
         }
 
@@ -313,13 +312,21 @@ public class OperationManager {
             return fetchResultsInternal(() -> resultFetcher.fetchResults(orientation, maxRows));
         }
 
-        public ResolvedSchema getResultSchema() {
+        public ResolvedSchema getResultSchema() throws Exception {
+            synchronized (status) {
+                while (!status.get().isTerminalStatus()) {
+                    status.wait();
+                }
+            }
             OperationStatus current = status.get();
-            if (current != OperationStatus.FINISHED || !hasResults) {
+            if (current == OperationStatus.ERROR) {
+                throw operationError;
+            } else if (current != OperationStatus.FINISHED) {
                 throw new IllegalStateException(
-                        "The result schema is available when the Operation is in FINISHED state and the Operation indicates it has data.");
+                        String.format(
+                                "The result schema is available when the Operation is in FINISHED state but the current status is %s.",
+                                status));
             }
-
             return resultFetcher.getResultSchema();
         }
 
@@ -361,6 +368,10 @@ public class OperationManager {
                 }
             } while (!status.compareAndSet(currentStatus, toStatus));
 
+            synchronized (status) {
+                status.notifyAll();
+            }
+
             LOG.debug(
                     String.format(
                             "Convert operation %s from %s to %s.",
diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
index 283d10b8b90..9cf33996b04 100644
--- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
+++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
@@ -47,10 +47,9 @@ import org.apache.flink.table.planner.runtime.batch.sql.TestModule;
 import org.apache.flink.test.util.AbstractTestBase;
 import org.apache.flink.util.concurrent.ExecutorThreadFactory;
 import org.apache.flink.util.function.RunnableWithException;
+import org.apache.flink.util.function.ThrowingConsumer;
 
-import org.assertj.core.api.Assertions;
 import org.assertj.core.api.Condition;
-import org.hamcrest.Matchers;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
@@ -60,10 +59,11 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.FutureTask;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicReference;
@@ -75,11 +75,8 @@ import static org.apache.flink.types.RowKind.DELETE;
 import static org.apache.flink.types.RowKind.INSERT;
 import static org.apache.flink.types.RowKind.UPDATE_AFTER;
 import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
-import static org.hamcrest.CoreMatchers.hasItem;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /** ITCase for {@link SqlGatewayServiceImpl}. */
 public class SqlGatewayServiceITCase extends AbstractTestBase {
@@ -119,13 +116,7 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
         SessionHandle sessionHandle = service.openSession(environment);
         Map<String, String> actualConfig = service.getSessionConfig(sessionHandle);
 
-        options.forEach(
-                (k, v) ->
-                        assertThat(
-                                String.format(
-                                        "Should contains (%s, %s) in the actual config.", k, v),
-                                actualConfig,
-                                Matchers.hasEntry(k, v)));
+        assertThat(actualConfig).containsAllEntriesOf(options);
     }
 
     @Test
@@ -150,9 +141,9 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
                 service.getSession(sessionHandle)
                         .createExecutor(new Configuration())
                         .getTableEnvironment();
-        assertEquals(catalogName, tableEnv.getCurrentCatalog());
-        assertEquals(databaseName, tableEnv.getCurrentDatabase());
-        assertTrue(new HashSet<>(Arrays.asList(tableEnv.listModules())).contains(moduleName));
+        assertThat(tableEnv.getCurrentCatalog()).isEqualTo(catalogName);
+        assertThat(tableEnv.getCurrentDatabase()).isEqualTo(databaseName);
+        assertThat(tableEnv.listModules()).contains(moduleName);
     }
 
     @Test
@@ -170,9 +161,8 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
                         });
 
         startRunningLatch.await();
-        assertEquals(
-                ResultSet.NOT_READY_RESULTS,
-                service.fetchResults(sessionHandle, operationHandle, 0, Integer.MAX_VALUE));
+        assertThat(service.fetchResults(sessionHandle, operationHandle, 0, Integer.MAX_VALUE))
+                .isEqualTo(ResultSet.NOT_READY_RESULTS);
         endRunningLatch.countDown();
     }
 
@@ -192,9 +182,8 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
                         });
 
         startRunningLatch.await();
-        assertEquals(
-                new OperationInfo(OperationStatus.RUNNING, OperationType.UNKNOWN),
-                service.getOperationInfo(sessionHandle, operationHandle));
+        assertThat(service.getOperationInfo(sessionHandle, operationHandle))
+                .isEqualTo(new OperationInfo(OperationStatus.RUNNING, OperationType.UNKNOWN));
 
         endRunningLatch.countDown();
         OperationInfo expectedInfo =
@@ -214,10 +203,10 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
             actualData.addAll(checkNotNull(currentResult.getData()));
             token = currentResult.getNextToken();
         }
-        assertEquals(expectedData, actualData);
+        assertThat(actualData).isEqualTo(expectedData);
 
         service.closeOperation(sessionHandle, operationHandle);
-        assertEquals(0, sessionManager.getOperationCount(sessionHandle));
+        assertThat(sessionManager.getOperationCount(sessionHandle)).isEqualTo(0);
     }
 
     @Test
@@ -236,17 +225,15 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
                         });
 
         startRunningLatch.await();
-        assertEquals(
-                new OperationInfo(OperationStatus.RUNNING, OperationType.UNKNOWN),
-                service.getOperationInfo(sessionHandle, operationHandle));
+        assertThat(service.getOperationInfo(sessionHandle, operationHandle))
+                .isEqualTo(new OperationInfo(OperationStatus.RUNNING, OperationType.UNKNOWN));
 
         service.cancelOperation(sessionHandle, operationHandle);
 
-        assertEquals(
-                new OperationInfo(OperationStatus.CANCELED, OperationType.UNKNOWN),
-                service.getOperationInfo(sessionHandle, operationHandle));
+        assertThat(service.getOperationInfo(sessionHandle, operationHandle))
+                .isEqualTo(new OperationInfo(OperationStatus.CANCELED, OperationType.UNKNOWN));
         service.closeOperation(sessionHandle, operationHandle);
-        assertEquals(0, sessionManager.getOperationCount(sessionHandle));
+        assertThat(sessionManager.getOperationCount(sessionHandle)).isEqualTo(0);
     }
 
     @Test
@@ -273,14 +260,14 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
                 Duration.ofSeconds(10),
                 "Failed to get expected operation status.");
 
-        Assertions.assertThatThrownBy(
+        assertThatThrownBy(
                         () ->
                                 service.fetchResults(
                                         sessionHandle, operationHandle, 0, Integer.MAX_VALUE))
                 .satisfies(anyCauseMatches(SqlExecutionException.class, msg));
 
         service.closeOperation(sessionHandle, operationHandle);
-        assertEquals(0, sessionManager.getOperationCount(sessionHandle));
+        assertThat(sessionManager.getOperationCount(sessionHandle)).isEqualTo(0);
     }
 
     @Test
@@ -304,11 +291,17 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
             token = result.getNextToken();
         }
 
-        assertThat(
-                settings,
-                hasItem(
+        assertThat(settings)
+                .contains(
                         GenericRowData.of(
-                                StringData.fromString(key), StringData.fromString(value))));
+                                StringData.fromString(key), StringData.fromString(value)));
+    }
+
+    @Test
+    public void testGetOperationSchemaUntilOperationIsReady() throws Exception {
+        runGetOperationSchemaUntilOperationIsReadyOrError(
+                this::getDefaultResultSet,
+                task -> assertThat(task.get()).isEqualTo(getDefaultResultSet().getResultSchema()));
     }
 
     // --------------------------------------------------------------------------------------------
@@ -402,7 +395,7 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
                 "All operations should be closed.");
 
         for (OperationManager.Operation op : operations) {
-            assertEquals(OperationStatus.CLOSED, op.getOperationInfo().getStatus());
+            assertThat(op.getOperationInfo().getStatus()).isEqualTo(OperationStatus.CLOSED);
         }
     }
 
@@ -426,7 +419,7 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
         }
         manager.close();
         latch.await();
-        assertEquals(0, manager.getOperationCount());
+        assertThat(manager.getOperationCount()).isEqualTo(0);
     }
 
     @Test
@@ -445,7 +438,7 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
                                             () -> {
                                                 startRunning.countDown();
                                                 terminateRunning.await();
-                                                return null;
+                                                return getDefaultResultSet();
                                             }))
                     .start();
         }
@@ -472,7 +465,7 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
                                 // another thread
                                 int origin = v.get();
                                 v.set(origin + 1);
-                                return null;
+                                return getDefaultResultSet();
                             }));
         }
         for (OperationHandle handle : handles) {
@@ -485,7 +478,7 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
                     "Failed to wait operation terminate");
         }
 
-        assertEquals(threadNum, v.get());
+        assertThat(v.get()).isEqualTo(threadNum);
     }
 
     @Test
@@ -503,7 +496,7 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
                             OperationType.UNKNOWN,
                             () -> {
                                 latch.await();
-                                return null;
+                                return getDefaultResultSet();
                             }));
         }
         // The queue is full and should reject
@@ -515,7 +508,7 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
                                         OperationType.UNKNOWN,
                                         () -> {
                                             latch.await();
-                                            return null;
+                                            return getDefaultResultSet();
                                         }))
                 .satisfies(anyCauseMatches(RejectedExecutionException.class));
         latch.countDown();
@@ -534,7 +527,7 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
                 OperationType.UNKNOWN,
                 () -> {
                     success.countDown();
-                    return null;
+                    return getDefaultResultSet();
                 });
         CommonTestUtils.waitUtil(
                 () -> success.getCount() == 0, Duration.ofSeconds(10), "Should come to end.");
@@ -587,6 +580,18 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
         }
     }
 
+    @Test
+    public void testGetOperationSchemaWhenOperationGetError() throws Exception {
+        String msg = "Artificial Exception.";
+        runGetOperationSchemaUntilOperationIsReadyOrError(
+                () -> {
+                    throw new SqlGatewayException(msg);
+                },
+                task ->
+                        assertThatThrownBy(task::get)
+                                .satisfies(anyCauseMatches(SqlGatewayException.class, msg)));
+    }
+
     // --------------------------------------------------------------------------------------------
 
     private OperationHandle submitDefaultOperation(
@@ -617,6 +622,34 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
                 data);
     }
 
+    private void runGetOperationSchemaUntilOperationIsReadyOrError(
+            Callable<ResultSet> executor,
+            ThrowingConsumer<FutureTask<ResolvedSchema>, Exception> validator)
+            throws Exception {
+        SessionHandle sessionHandle = service.openSession(defaultSessionEnvironment);
+        CountDownLatch operationIsRunning = new CountDownLatch(1);
+        CountDownLatch schemaFetcherIsRunning = new CountDownLatch(1);
+        OperationHandle operationHandle =
+                service.submitOperation(
+                        sessionHandle,
+                        OperationType.UNKNOWN,
+                        () -> {
+                            operationIsRunning.await();
+                            return executor.call();
+                        });
+        FutureTask<ResolvedSchema> task =
+                new FutureTask<>(
+                        () -> {
+                            schemaFetcherIsRunning.countDown();
+                            return service.getOperationResultSchema(sessionHandle, operationHandle);
+                        });
+        threadFactory.newThread(task).start();
+
+        schemaFetcherIsRunning.await();
+        operationIsRunning.countDown();
+        validator.accept(task);
+    }
+
     private void runCancelOrCloseOperationWhenFetchResults(
             SessionHandle sessionHandle,
             OperationHandle operationHandle,
@@ -659,6 +692,6 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
                                 assertThatChainOfCauses(t)
                                         .anySatisfy(t1 -> condition.matches(t1.getMessage())));
 
-        assertTrue(new HashSet<>(getDefaultResultSet().getData()).containsAll(actual));
+        assertThat(getDefaultResultSet().getData()).containsAll(actual);
     }
 }