Posted to issues@flink.apache.org by "FangYongs (via GitHub)" <gi...@apache.org> on 2023/04/06 05:13:35 UTC

[GitHub] [flink] FangYongs opened a new pull request, #22362: [FLINK-31544][jdbc-driver] Introduce database metadata for jdbc driver

FangYongs opened a new pull request, #22362:
URL: https://github.com/apache/flink/pull/22362

   ## What is the purpose of the change
   
   This PR introduces database metadata for the JDBC driver.
   
   ## Brief change log
   
     - Added `FlinkDatabaseMetaData`
     - Added `CloseableResultIterator` for statement results and for `getCatalogs`/`getSchemas` in the database metadata
     - Implemented catalog and schema retrieval in `FlinkDatabaseMetaData`
   
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
     - Added `FlinkSqlDriverTestBase` to start a Flink cluster and SQL gateway
     - Added `FlinkDatabaseMetaDataTest` for `FlinkDatabaseMetaData`
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (yes / no) no
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no) no
     - The serializers: (yes / no / don't know) no
     - The runtime per-record code paths (performance sensitive): (yes / no / don't know) no
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know) no
     - The S3 file system connector: (yes / no / don't know) no
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (yes / no) no
     - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] FangYongs commented on a diff in pull request #22362: [FLINK-31544][jdbc-driver] Introduce database metadata for jdbc driver

Posted by "FangYongs (via GitHub)" <gi...@apache.org>.
FangYongs commented on code in PR #22362:
URL: https://github.com/apache/flink/pull/22362#discussion_r1175955153


##########
flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkDatabaseMetaData.java:
##########
@@ -0,0 +1,402 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.jdbc;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.table.client.gateway.Executor;
+import org.apache.flink.table.client.gateway.StatementResult;
+
+import javax.annotation.Nullable;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.table.jdbc.utils.DatabaseMetaDataUtils.createCatalogsResultSet;
+import static org.apache.flink.table.jdbc.utils.DatabaseMetaDataUtils.createSchemasResultSet;
+
+/** Implementation of {@link java.sql.DatabaseMetaData} for flink jdbc driver. */
+public class FlinkDatabaseMetaData extends BaseDatabaseMetaData {
+    private final String url;
+    private final FlinkConnection connection;
+    private final Statement statement;
+    private final Executor executor;
+
+    @VisibleForTesting
+    public FlinkDatabaseMetaData(String url, FlinkConnection connection, Statement statement) {
+        this.url = url;
+        this.connection = connection;
+        this.statement = statement;
+        this.executor = connection.getExecutor();
+    }
+
+    @Override
+    public ResultSet getCatalogs() throws SQLException {
+        try (StatementResult result = catalogs()) {
+            return createCatalogsResultSet(statement, result);
+        } catch (Exception e) {
+            throw new SQLException("Get catalogs fail", e);
+        }
+    }
+
+    private StatementResult catalogs() {
+        return executor.executeStatement("SHOW CATALOGS");
+    }
+
+    @Override
+    public ResultSet getSchemas() throws SQLException {
+        try {
+            String currentCatalog = connection.getCatalog();
+            String currentDatabase = connection.getSchema();
+            List<String> catalogList = new ArrayList<>();
+            Map<String, List<String>> catalogSchemaList = new HashMap<>();
+            try (StatementResult result = catalogs()) {
+                while (result.hasNext()) {
+                    String catalog = result.next().getString(0).toString();
+                    getSchemasForCatalog(catalogList, catalogSchemaList, catalog, null);
+                }
+            }
+            connection.setCatalog(currentCatalog);
+            connection.setSchema(currentDatabase);
+
+            return createSchemasResultSet(statement, catalogList, catalogSchemaList);
+        } catch (Exception e) {
+            throw new SQLException("Get schemas fail", e);
+        }
+    }
+
+    private void getSchemasForCatalog(
+            List<String> catalogList,
+            Map<String, List<String>> catalogSchemaList,
+            String catalog,
+            @Nullable String schemaPattern)
+            throws SQLException {
+        catalogList.add(catalog);
+        connection.setCatalog(catalog);
+
+        List<String> schemas = new ArrayList<>();
+        try (StatementResult schemaResult = schemas()) {
+            while (schemaResult.hasNext()) {
+                String schema = schemaResult.next().getString(0).toString();
+                if (schemaPattern == null || schema.contains(schemaPattern)) {

Review Comment:
   Got it. I checked the implementation in the Trino JDBC driver: there, the `schemaPattern` is used in a `LIKE` clause. I will update it.
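
To illustrate the difference between `contains` and `LIKE` semantics: in JDBC metadata patterns, `%` matches any sequence of characters and `_` matches exactly one character. Below is a minimal sketch of client-side matching. `SchemaPatternSketch`, `likeToRegex`, and `filterSchemas` are hypothetical names, not part of this PR, and the sketch assumes no escape character is in use (it ignores `DatabaseMetaData.getSearchStringEscape()`).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

/** Hypothetical helper (not part of the PR): client-side matching of JDBC-style LIKE patterns. */
final class SchemaPatternSketch {

    /** Converts a LIKE pattern ('%' = any sequence, '_' = any single character) to a regex. */
    static Pattern likeToRegex(String likePattern) {
        StringBuilder regex = new StringBuilder();
        for (char c : likePattern.toCharArray()) {
            if (c == '%') {
                regex.append(".*");
            } else if (c == '_') {
                regex.append('.');
            } else {
                // Quote every other character so regex metacharacters are matched literally.
                regex.append(Pattern.quote(String.valueOf(c)));
            }
        }
        return Pattern.compile(regex.toString(), Pattern.DOTALL);
    }

    /** Returns the schemas matching the pattern; a null pattern matches every schema. */
    static List<String> filterSchemas(List<String> schemas, String schemaPattern) {
        if (schemaPattern == null) {
            return new ArrayList<>(schemas);
        }
        Pattern pattern = likeToRegex(schemaPattern);
        List<String> matched = new ArrayList<>();
        for (String schema : schemas) {
            // matches() requires the whole name to match, as LIKE does.
            if (pattern.matcher(schema).matches()) {
                matched.add(schema);
            }
        }
        return matched;
    }
}
```

With this approach a pattern such as `db_` matches `db1`, whereas `"db1".contains("db_")` is false because `contains` treats `_` as a literal character.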





[GitHub] [flink] libenchao commented on pull request #22362: [FLINK-31544][jdbc-driver] Introduce database metadata for jdbc driver

Posted by "libenchao (via GitHub)" <gi...@apache.org>.
libenchao commented on PR #22362:
URL: https://github.com/apache/flink/pull/22362#issuecomment-1523277660

   @FangYongs The CI is failing now; could you fix that?




[GitHub] [flink] FangYongs commented on a diff in pull request #22362: [FLINK-31544][jdbc-driver] Introduce database metadata for jdbc driver

Posted by "FangYongs (via GitHub)" <gi...@apache.org>.
FangYongs commented on code in PR #22362:
URL: https://github.com/apache/flink/pull/22362#discussion_r1175177267


##########
flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkDatabaseMetaData.java:
##########
@@ -0,0 +1,402 @@
+
+package org.apache.flink.table.jdbc;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.table.client.gateway.Executor;
+import org.apache.flink.table.client.gateway.StatementResult;
+
+import javax.annotation.Nullable;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.table.jdbc.utils.DatabaseMetaDataUtils.createCatalogsResultSet;
+import static org.apache.flink.table.jdbc.utils.DatabaseMetaDataUtils.createSchemasResultSet;
+
+/** Implementation of {@link java.sql.DatabaseMetaData} for flink jdbc driver. */
+public class FlinkDatabaseMetaData extends BaseDatabaseMetaData {
+    private final String url;
+    private final FlinkConnection connection;
+    private final Statement statement;
+    private final Executor executor;
+
+    @VisibleForTesting
+    public FlinkDatabaseMetaData(String url, FlinkConnection connection, Statement statement) {
+        this.url = url;
+        this.connection = connection;
+        this.statement = statement;
+        this.executor = connection.getExecutor();
+    }
+
+    @Override
+    public ResultSet getCatalogs() throws SQLException {
+        try (StatementResult result = catalogs()) {
+            return createCatalogsResultSet(statement, result);
+        } catch (Exception e) {
+            throw new SQLException("Get catalogs fail", e);
+        }
+    }
+
+    private StatementResult catalogs() {
+        return executor.executeStatement("SHOW CATALOGS");
+    }
+
+    @Override
+    public ResultSet getSchemas() throws SQLException {
+        try {
+            String currentCatalog = connection.getCatalog();
+            String currentDatabase = connection.getSchema();
+            List<String> catalogList = new ArrayList<>();
+            Map<String, List<String>> catalogSchemaList = new HashMap<>();
+            try (StatementResult result = catalogs()) {
+                while (result.hasNext()) {
+                    String catalog = result.next().getString(0).toString();
+                    getSchemasForCatalog(catalogList, catalogSchemaList, catalog, null);
+                }
+            }
+            connection.setCatalog(currentCatalog);
+            connection.setSchema(currentDatabase);
+
+            return createSchemasResultSet(statement, catalogList, catalogSchemaList);
+        } catch (Exception e) {
+            throw new SQLException("Get schemas fail", e);
+        }
+    }
+
+    private void getSchemasForCatalog(
+            List<String> catalogList,
+            Map<String, List<String>> catalogSchemaList,
+            String catalog,
+            @Nullable String schemaPattern)
+            throws SQLException {
+        catalogList.add(catalog);
+        connection.setCatalog(catalog);
+
+        List<String> schemas = new ArrayList<>();
+        try (StatementResult schemaResult = schemas()) {
+            while (schemaResult.hasNext()) {
+                String schema = schemaResult.next().getString(0).toString();
+                if (schemaPattern == null || schema.contains(schemaPattern)) {

Review Comment:
   Currently Flink does not support the statements "show databases like '%keyword%'" or "show schemas like '%keyword%'".





[GitHub] [flink] FangYongs commented on a diff in pull request #22362: [FLINK-31544][jdbc-driver] Introduce database metadata for jdbc driver

Posted by "FangYongs (via GitHub)" <gi...@apache.org>.
FangYongs commented on code in PR #22362:
URL: https://github.com/apache/flink/pull/22362#discussion_r1175176169


##########
flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkDatabaseMetaData.java:
##########
@@ -0,0 +1,402 @@
+
+package org.apache.flink.table.jdbc;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.table.client.gateway.Executor;
+import org.apache.flink.table.client.gateway.StatementResult;
+
+import javax.annotation.Nullable;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.table.jdbc.utils.DatabaseMetaDataUtils.createCatalogsResultSet;
+import static org.apache.flink.table.jdbc.utils.DatabaseMetaDataUtils.createSchemasResultSet;
+
+/** Implementation of {@link java.sql.DatabaseMetaData} for flink jdbc driver. */
+public class FlinkDatabaseMetaData extends BaseDatabaseMetaData {
+    private final String url;
+    private final FlinkConnection connection;
+    private final Statement statement;
+    private final Executor executor;
+
+    @VisibleForTesting
+    public FlinkDatabaseMetaData(String url, FlinkConnection connection, Statement statement) {
+        this.url = url;
+        this.connection = connection;
+        this.statement = statement;
+        this.executor = connection.getExecutor();
+    }
+
+    @Override
+    public ResultSet getCatalogs() throws SQLException {
+        try (StatementResult result = catalogs()) {
+            return createCatalogsResultSet(statement, result);
+        } catch (Exception e) {
+            throw new SQLException("Get catalogs fail", e);
+        }
+    }
+
+    private StatementResult catalogs() {
+        return executor.executeStatement("SHOW CATALOGS");
+    }
+
+    @Override
+    public ResultSet getSchemas() throws SQLException {
+        try {
+            String currentCatalog = connection.getCatalog();
+            String currentDatabase = connection.getSchema();
+            List<String> catalogList = new ArrayList<>();
+            Map<String, List<String>> catalogSchemaList = new HashMap<>();
+            try (StatementResult result = catalogs()) {
+                while (result.hasNext()) {
+                    String catalog = result.next().getString(0).toString();
+                    getSchemasForCatalog(catalogList, catalogSchemaList, catalog, null);
+                }
+            }
+            connection.setCatalog(currentCatalog);
+            connection.setSchema(currentDatabase);
+
+            return createSchemasResultSet(statement, catalogList, catalogSchemaList);
+        } catch (Exception e) {
+            throw new SQLException("Get schemas fail", e);
+        }
+    }
+
+    private void getSchemasForCatalog(
+            List<String> catalogList,
+            Map<String, List<String>> catalogSchemaList,
+            String catalog,
+            @Nullable String schemaPattern)
+            throws SQLException {
+        catalogList.add(catalog);
+        connection.setCatalog(catalog);

Review Comment:
   DONE





[GitHub] [flink] libenchao closed pull request #22362: [FLINK-31544][jdbc-driver] Introduce database metadata for jdbc driver

Posted by "libenchao (via GitHub)" <gi...@apache.org>.
libenchao closed pull request #22362: [FLINK-31544][jdbc-driver] Introduce database metadata for jdbc driver
URL: https://github.com/apache/flink/pull/22362




[GitHub] [flink] libenchao commented on a diff in pull request #22362: [FLINK-31544][jdbc-driver] Introduce database metadata for jdbc driver

Posted by "libenchao (via GitHub)" <gi...@apache.org>.
libenchao commented on code in PR #22362:
URL: https://github.com/apache/flink/pull/22362#discussion_r1174528770


##########
flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkDatabaseMetaData.java:
##########
@@ -0,0 +1,402 @@
+
+package org.apache.flink.table.jdbc;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.table.client.gateway.Executor;
+import org.apache.flink.table.client.gateway.StatementResult;
+
+import javax.annotation.Nullable;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.table.jdbc.utils.DatabaseMetaDataUtils.createCatalogsResultSet;
+import static org.apache.flink.table.jdbc.utils.DatabaseMetaDataUtils.createSchemasResultSet;
+
+/** Implementation of {@link java.sql.DatabaseMetaData} for flink jdbc driver. */
+public class FlinkDatabaseMetaData extends BaseDatabaseMetaData {
+    private final String url;
+    private final FlinkConnection connection;
+    private final Statement statement;
+    private final Executor executor;
+
+    @VisibleForTesting
+    public FlinkDatabaseMetaData(String url, FlinkConnection connection, Statement statement) {
+        this.url = url;
+        this.connection = connection;
+        this.statement = statement;
+        this.executor = connection.getExecutor();
+    }
+
+    @Override
+    public ResultSet getCatalogs() throws SQLException {
+        try (StatementResult result = catalogs()) {
+            return createCatalogsResultSet(statement, result);
+        } catch (Exception e) {
+            throw new SQLException("Get catalogs fail", e);
+        }
+    }
+
+    private StatementResult catalogs() {
+        return executor.executeStatement("SHOW CATALOGS");
+    }
+
+    @Override
+    public ResultSet getSchemas() throws SQLException {
+        try {
+            String currentCatalog = connection.getCatalog();
+            String currentDatabase = connection.getSchema();
+            List<String> catalogList = new ArrayList<>();
+            Map<String, List<String>> catalogSchemaList = new HashMap<>();
+            try (StatementResult result = catalogs()) {
+                while (result.hasNext()) {
+                    String catalog = result.next().getString(0).toString();
+                    getSchemasForCatalog(catalogList, catalogSchemaList, catalog, null);
+                }
+            }
+            connection.setCatalog(currentCatalog);
+            connection.setSchema(currentDatabase);
+
+            return createSchemasResultSet(statement, catalogList, catalogSchemaList);
+        } catch (Exception e) {
+            throw new SQLException("Get schemas fail", e);
+        }
+    }
+
+    private void getSchemasForCatalog(
+            List<String> catalogList,
+            Map<String, List<String>> catalogSchemaList,
+            String catalog,
+            @Nullable String schemaPattern)
+            throws SQLException {
+        catalogList.add(catalog);
+        connection.setCatalog(catalog);

Review Comment:
   This changes the default catalog of the `Connection`, so the method has a side effect. Should we avoid that, or reset the catalog to the original one afterwards?
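
One way to contain the side effect is to remember the original catalog and restore it in a `finally` block, so that it is reset even if listing the schemas fails halfway through. The following is only a sketch under assumptions: `CatalogRestoreSketch`, `CatalogSession`, and `forEachCatalog` are hypothetical names, and `CatalogSession` stands in for the `getCatalog`/`setCatalog` pair of `java.sql.Connection`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Hypothetical sketch (not part of the PR): switch catalogs temporarily and always restore. */
final class CatalogRestoreSketch {

    /** Stand-in for the part of a connection that tracks the current catalog. */
    interface CatalogSession {
        String getCatalog();

        void setCatalog(String catalog);
    }

    /** Runs the action once per catalog and restores the original catalog afterwards. */
    static <T> List<T> forEachCatalog(
            CatalogSession session, List<String> catalogs, Function<String, T> action) {
        String original = session.getCatalog();
        try {
            List<T> results = new ArrayList<>();
            for (String catalog : catalogs) {
                session.setCatalog(catalog);
                results.add(action.apply(catalog));
            }
            return results;
        } finally {
            // Runs on both the normal and the exceptional path.
            session.setCatalog(original);
        }
    }
}
```

Restoring after the loop without `finally` would leave the connection pointing at whichever catalog was active when an exception was thrown.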



##########
flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkDatabaseMetaData.java:
##########
@@ -0,0 +1,402 @@
+
+package org.apache.flink.table.jdbc;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.table.client.gateway.Executor;
+import org.apache.flink.table.client.gateway.StatementResult;
+
+import javax.annotation.Nullable;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.table.jdbc.utils.DatabaseMetaDataUtils.createCatalogsResultSet;
+import static org.apache.flink.table.jdbc.utils.DatabaseMetaDataUtils.createSchemasResultSet;
+
+/** Implementation of {@link java.sql.DatabaseMetaData} for flink jdbc driver. */
+public class FlinkDatabaseMetaData extends BaseDatabaseMetaData {
+    private final String url;
+    private final FlinkConnection connection;
+    private final Statement statement;
+    private final Executor executor;
+
+    @VisibleForTesting
+    public FlinkDatabaseMetaData(String url, FlinkConnection connection, Statement statement) {

Review Comment:
   Is `protected` enough for testing purposes?



##########
flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/utils/DatabaseMetaDataUtils.java:
##########
@@ -0,0 +1,107 @@
+
+package org.apache.flink.table.jdbc.utils;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.client.gateway.StatementResult;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.jdbc.FlinkDatabaseMetaData;
+import org.apache.flink.table.jdbc.FlinkResultSet;
+
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+
+/** Utils to create catalog/schema results for {@link FlinkDatabaseMetaData}. */
+public class DatabaseMetaDataUtils {
+    private static final Column TABLE_CAT_COLUMN =
+            Column.physical("TABLE_CAT", DataTypes.STRING().notNull());
+    private static final Column TABLE_SCHEM_COLUMN =
+            Column.physical("TABLE_SCHEM", DataTypes.STRING().notNull());
+    private static final Column TABLE_CATALOG_COLUMN =
+            Column.physical("TABLE_CATALOG", DataTypes.STRING());
+
+    /**
+     * Create result set for catalogs. The schema columns are:
+     *
+     * <ul>
+     *   <li>TABLE_CAT String => catalog name.
+     * </ul>
+     *
+     * <p>The results are ordered by catalog name.
+     *
+     * @param statement The statement for database meta data
+     * @param result The result for catalogs
+     * @return a ResultSet object in which each row has a single String column that is a catalog
+     *     name
+     */
+    public static FlinkResultSet createCatalogsResultSet(
+            Statement statement, StatementResult result) {
+        List<RowData> catalogs = new ArrayList<>();
+        result.forEachRemaining(catalogs::add);
+        catalogs.sort(Comparator.comparing(v -> v.getString(0)));
+
+        return new FlinkResultSet(
+                statement,
+                new CollectionResultIterator(catalogs.iterator()),
+                ResolvedSchema.of(TABLE_CAT_COLUMN));
+    }
+
+    /**
+     * Create result set for schemas. The schema columns are:
+     *
+     * <ul>
+     *   <li>TABLE_SCHEM String => schema name
+     *   <li>TABLE_CATALOG String => catalog name (may be null)
+     * </ul>
+     *
+     * <p>The results are ordered by TABLE_CATALOG and TABLE_SCHEM.
+     *
+     * @param statement The statement for database meta data
+     * @param catalogs The catalog list
+     * @param catalogSchemas The catalog with schema list
+     * @return a ResultSet object in which each row is a schema description
+     */
+    public static FlinkResultSet createSchemasResultSet(
+            Statement statement, List<String> catalogs, Map<String, List<String>> catalogSchemas) {
+        List<RowData> schemaWithCatalogList = new ArrayList<>();
+        catalogs.sort(String::compareTo);

Review Comment:
   Since this sorts the input list in place, it has a side effect on the caller's argument. I would suggest sorting a copy instead of the input.
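
A minimal sketch of that suggestion, assuming a hypothetical helper `sortedCopy` in a hypothetical class `SortCopySketch` (neither is part of this PR): copy the list first, then sort the copy.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch (not part of the PR): sort a copy instead of the caller's list. */
final class SortCopySketch {

    /** Returns a sorted copy and leaves the input list untouched. */
    static List<String> sortedCopy(List<String> input) {
        List<String> copy = new ArrayList<>(input);
        copy.sort(String::compareTo);
        return copy;
    }
}
```

Besides avoiding the surprise for callers, this also works when the caller passes an unmodifiable list, on which an in-place `sort` would throw `UnsupportedOperationException`.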



##########
flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkDatabaseMetaData.java:
##########
@@ -0,0 +1,402 @@
+
+package org.apache.flink.table.jdbc;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.table.client.gateway.Executor;
+import org.apache.flink.table.client.gateway.StatementResult;
+
+import javax.annotation.Nullable;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.table.jdbc.utils.DatabaseMetaDataUtils.createCatalogsResultSet;
+import static org.apache.flink.table.jdbc.utils.DatabaseMetaDataUtils.createSchemasResultSet;
+
+/** Implementation of {@link java.sql.DatabaseMetaData} for flink jdbc driver. */
+public class FlinkDatabaseMetaData extends BaseDatabaseMetaData {
+    private final String url;
+    private final FlinkConnection connection;
+    private final Statement statement;
+    private final Executor executor;
+
+    @VisibleForTesting
+    public FlinkDatabaseMetaData(String url, FlinkConnection connection, Statement statement) {
+        this.url = url;
+        this.connection = connection;
+        this.statement = statement;
+        this.executor = connection.getExecutor();
+    }
+
+    @Override
+    public ResultSet getCatalogs() throws SQLException {
+        try (StatementResult result = catalogs()) {
+            return createCatalogsResultSet(statement, result);
+        } catch (Exception e) {
+            throw new SQLException("Get catalogs fail", e);
+        }
+    }
+
+    private StatementResult catalogs() {
+        return executor.executeStatement("SHOW CATALOGS");
+    }
+
+    @Override
+    public ResultSet getSchemas() throws SQLException {
+        try {
+            String currentCatalog = connection.getCatalog();
+            String currentDatabase = connection.getSchema();
+            List<String> catalogList = new ArrayList<>();
+            Map<String, List<String>> catalogSchemaList = new HashMap<>();
+            try (StatementResult result = catalogs()) {
+                while (result.hasNext()) {
+                    String catalog = result.next().getString(0).toString();
+                    getSchemasForCatalog(catalogList, catalogSchemaList, catalog, null);
+                }
+            }
+            connection.setCatalog(currentCatalog);
+            connection.setSchema(currentDatabase);
+
+            return createSchemasResultSet(statement, catalogList, catalogSchemaList);
+        } catch (Exception e) {
+            throw new SQLException("Get schemas fail", e);
+        }
+    }
+
+    private void getSchemasForCatalog(
+            List<String> catalogList,
+            Map<String, List<String>> catalogSchemaList,
+            String catalog,
+            @Nullable String schemaPattern)
+            throws SQLException {
+        catalogList.add(catalog);
+        connection.setCatalog(catalog);
+
+        List<String> schemas = new ArrayList<>();
+        try (StatementResult schemaResult = schemas()) {
+            while (schemaResult.hasNext()) {
+                String schema = schemaResult.next().getString(0).toString();
+                if (schemaPattern == null || schema.contains(schemaPattern)) {

Review Comment:
   I'm not sure this is the right way to implement `schemaPattern`. Should it follow SQL `LIKE` semantics instead, e.g., "show schemas like '%your_schema%'"?



##########
flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/utils/CloseableResultIterator.java:
##########
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.jdbc.utils;
+
+import org.apache.flink.table.data.RowData;
+
+import java.util.Iterator;
+
+/** Closeable result iterator for jdbc driver. */
+public interface CloseableResultIterator extends Iterator<RowData>, AutoCloseable {}

Review Comment:
   Do we need to make this more generic, e.g., `Iterator<T>`, and let the subclasses specify the type parameter? (I'm also OK with the current approach if you think it's better.)
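
For illustration, the generic variant suggested above might look like the following. This is only a sketch: `GenericCloseableIterator` and `ListBackedIterator` are hypothetical names that do not appear in the PR, and the in-memory adapter stands in for whatever result source the driver actually wraps.

```java
import java.util.Iterator;
import java.util.List;

/** Hypothetical generic variant: the element type is a parameter instead of RowData. */
interface GenericCloseableIterator<T> extends Iterator<T>, AutoCloseable {}

/** Hypothetical adapter over an in-memory list; close() only records that it was called. */
final class ListBackedIterator<T> implements GenericCloseableIterator<T> {
    private final Iterator<T> delegate;
    private boolean closed;

    ListBackedIterator(List<T> rows) {
        this.delegate = rows.iterator();
    }

    @Override
    public boolean hasNext() {
        // Once closed, the iterator reports no further elements.
        return !closed && delegate.hasNext();
    }

    @Override
    public T next() {
        return delegate.next();
    }

    @Override
    public void close() {
        // Narrows AutoCloseable#close so callers need not handle a checked exception.
        closed = true;
    }

    boolean isClosed() {
        return closed;
    }
}
```

With this shape, the existing interface would correspond to `GenericCloseableIterator<RowData>`, and an implementation can be used in try-with-resources like any other `AutoCloseable`.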



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] FangYongs commented on pull request #22362: [FLINK-31544][jdbc-driver] Introduce database metadata for jdbc driver

Posted by "FangYongs (via GitHub)" <gi...@apache.org>.
FangYongs commented on PR #22362:
URL: https://github.com/apache/flink/pull/22362#issuecomment-1520011069

   Thanks @libenchao, I have rebased onto master and updated this PR.




[GitHub] [flink] libenchao commented on a diff in pull request #22362: [FLINK-31544][jdbc-driver] Introduce database metadata for jdbc driver

Posted by "libenchao (via GitHub)" <gi...@apache.org>.
libenchao commented on code in PR #22362:
URL: https://github.com/apache/flink/pull/22362#discussion_r1175257104


##########
flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkDatabaseMetaData.java:
##########
@@ -0,0 +1,402 @@
+
+package org.apache.flink.table.jdbc;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.table.client.gateway.Executor;
+import org.apache.flink.table.client.gateway.StatementResult;
+
+import javax.annotation.Nullable;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.table.jdbc.utils.DatabaseMetaDataUtils.createCatalogsResultSet;
+import static org.apache.flink.table.jdbc.utils.DatabaseMetaDataUtils.createSchemasResultSet;
+
+/** Implementation of {@link java.sql.DatabaseMetaData} for flink jdbc driver. */
+public class FlinkDatabaseMetaData extends BaseDatabaseMetaData {
+    private final String url;
+    private final FlinkConnection connection;
+    private final Statement statement;
+    private final Executor executor;
+
+    @VisibleForTesting
+    public FlinkDatabaseMetaData(String url, FlinkConnection connection, Statement statement) {
+        this.url = url;
+        this.connection = connection;
+        this.statement = statement;
+        this.executor = connection.getExecutor();
+    }
+
+    @Override
+    public ResultSet getCatalogs() throws SQLException {
+        try (StatementResult result = catalogs()) {
+            return createCatalogsResultSet(statement, result);
+        } catch (Exception e) {
+            throw new SQLException("Get catalogs fail", e);
+        }
+    }
+
+    private StatementResult catalogs() {
+        return executor.executeStatement("SHOW CATALOGS");
+    }
+
+    @Override
+    public ResultSet getSchemas() throws SQLException {
+        try {
+            String currentCatalog = connection.getCatalog();
+            String currentDatabase = connection.getSchema();
+            List<String> catalogList = new ArrayList<>();
+            Map<String, List<String>> catalogSchemaList = new HashMap<>();
+            try (StatementResult result = catalogs()) {
+                while (result.hasNext()) {
+                    String catalog = result.next().getString(0).toString();
+                    getSchemasForCatalog(catalogList, catalogSchemaList, catalog, null);
+                }
+            }
+            connection.setCatalog(currentCatalog);
+            connection.setSchema(currentDatabase);
+
+            return createSchemasResultSet(statement, catalogList, catalogSchemaList);
+        } catch (Exception e) {
+            throw new SQLException("Get schemas fail", e);
+        }
+    }
+
+    private void getSchemasForCatalog(
+            List<String> catalogList,
+            Map<String, List<String>> catalogSchemaList,
+            String catalog,
+            @Nullable String schemaPattern)
+            throws SQLException {
+        catalogList.add(catalog);
+        connection.setCatalog(catalog);
+
+        List<String> schemas = new ArrayList<>();
+        try (StatementResult schemaResult = schemas()) {
+            while (schemaResult.hasNext()) {
+                String schema = schemaResult.next().getString(0).toString();
+                if (schemaPattern == null || schema.contains(schemaPattern)) {

Review Comment:
   I'm not saying we must use that to implement this. What I meant is that the semantics of your implementation differ from `LIKE`. For example, given the schemas "ab", "abc", and "abcd" and a `schemaPattern` of "a", your implementation returns all three schemas, whereas "show databases like 'a'" returns an empty result. To match all three schemas, you need "show databases like 'a%'".
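
The difference described above can be reproduced with a small standalone sketch. It assumes the JDBC pattern rules (`%` matches any run of zero or more characters, `_` matches exactly one character) and ignores escape characters. `SchemaPatternDemo`, `likeToRegex`, `filterLike`, and `filterContains` are hypothetical names for illustration, not code from the PR.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

final class SchemaPatternDemo {

    /** Translates a LIKE pattern to a regex: '%' becomes ".*", '_' becomes ".", the rest is literal. */
    static Pattern likeToRegex(String like) {
        StringBuilder regex = new StringBuilder();
        for (char c : like.toCharArray()) {
            if (c == '%') {
                regex.append(".*");
            } else if (c == '_') {
                regex.append('.');
            } else {
                regex.append(Pattern.quote(String.valueOf(c)));
            }
        }
        return Pattern.compile(regex.toString(), Pattern.DOTALL);
    }

    /** Keeps the schemas whose whole name matches the LIKE pattern. */
    static List<String> filterLike(List<String> schemas, String like) {
        Pattern pattern = likeToRegex(like);
        List<String> matched = new ArrayList<>();
        for (String schema : schemas) {
            if (pattern.matcher(schema).matches()) {
                matched.add(schema);
            }
        }
        return matched;
    }

    /** Substring filtering, as in the reviewed `schema.contains(schemaPattern)` check. */
    static List<String> filterContains(List<String> schemas, String pattern) {
        List<String> matched = new ArrayList<>();
        for (String schema : schemas) {
            if (schema.contains(pattern)) {
                matched.add(schema);
            }
        }
        return matched;
    }

    public static void main(String[] args) {
        List<String> schemas = Arrays.asList("ab", "abc", "abcd");
        System.out.println(filterContains(schemas, "a")); // prints [ab, abc, abcd]
        System.out.println(filterLike(schemas, "a"));     // prints []
        System.out.println(filterLike(schemas, "a%"));    // prints [ab, abc, abcd]
    }
}
```

Substring matching behaves like an implicit `%a%`, which is why it returns more rows than a caller passing a plain "a" as a LIKE pattern would expect.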





[GitHub] [flink] FangYongs commented on a diff in pull request #22362: [FLINK-31544][jdbc-driver] Introduce database metadata for jdbc driver

Posted by "FangYongs (via GitHub)" <gi...@apache.org>.
FangYongs commented on code in PR #22362:
URL: https://github.com/apache/flink/pull/22362#discussion_r1176111155


##########
flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkDatabaseMetaData.java:
##########
@@ -0,0 +1,402 @@
+    private void getSchemasForCatalog(
+            List<String> catalogList,
+            Map<String, List<String>> catalogSchemaList,
+            String catalog,
+            @Nullable String schemaPattern)
+            throws SQLException {
+        catalogList.add(catalog);
+        connection.setCatalog(catalog);
+
+        List<String> schemas = new ArrayList<>();
+        try (StatementResult schemaResult = schemas()) {
+            while (schemaResult.hasNext()) {
+                String schema = schemaResult.next().getString(0).toString();
+                if (schemaPattern == null || schema.contains(schemaPattern)) {

Review Comment:
   Thanks @libenchao, agreed and done 👍





[GitHub] [flink] FangYongs commented on pull request #22362: [FLINK-31544][jdbc-driver] Introduce database metadata for jdbc driver

Posted by "FangYongs (via GitHub)" <gi...@apache.org>.
FangYongs commented on PR #22362:
URL: https://github.com/apache/flink/pull/22362#issuecomment-1498529894

   Hi @libenchao, please help review this PR when you're free. Thanks!




[GitHub] [flink] flinkbot commented on pull request #22362: [FLINK-31544][jdbc-driver] Introduce database metadata for jdbc driver

Posted by "flinkbot (via GitHub)" <gi...@apache.org>.
flinkbot commented on PR #22362:
URL: https://github.com/apache/flink/pull/22362#issuecomment-1498505826

   ## CI report:
   
   * e37ef659f2937d871f9e26af1235edd142d736e8 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>




[GitHub] [flink] libenchao commented on a diff in pull request #22362: [FLINK-31544][jdbc-driver] Introduce database metadata for jdbc driver

Posted by "libenchao (via GitHub)" <gi...@apache.org>.
libenchao commented on code in PR #22362:
URL: https://github.com/apache/flink/pull/22362#discussion_r1176084042


##########
flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkDatabaseMetaData.java:
##########
@@ -0,0 +1,402 @@
+    private void getSchemasForCatalog(
+            List<String> catalogList,
+            Map<String, List<String>> catalogSchemaList,
+            String catalog,
+            @Nullable String schemaPattern)
+            throws SQLException {
+        catalogList.add(catalog);
+        connection.setCatalog(catalog);
+
+        List<String> schemas = new ArrayList<>();
+        try (StatementResult schemaResult = schemas()) {
+            while (schemaResult.hasNext()) {
+                String schema = schemaResult.next().getString(0).toString();
+                if (schemaPattern == null || schema.contains(schemaPattern)) {

Review Comment:
   [FLIP-297](https://cwiki.apache.org/confluence/display/FLINK/FLIP-297%3A+Improve+Auxiliary+Sql+Statements) is going to support `show catalogs like` and `show databases like`. If this is not urgent, we can defer it until FLIP-297 is available and leave it unimplemented for now.
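
One common way to leave a pattern-based metadata method unimplemented in a JDBC driver is to throw `java.sql.SQLFeatureNotSupportedException`. The sketch below assumes that approach. `PatternMetaDataSketch` is a hypothetical stand-in class, and whether the PR ended up doing exactly this is not stated in this thread.

```java
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;

/** Hypothetical stand-in for a metadata class that defers pattern-based schema lookup. */
final class PatternMetaDataSketch {

    /** Mirrors the shape of DatabaseMetaData#getSchemas(String, String) but rejects the call. */
    ResultSet getSchemas(String catalog, String schemaPattern) throws SQLException {
        // Failing loudly avoids silently applying substring semantics to a LIKE-style pattern.
        throw new SQLFeatureNotSupportedException(
                "getSchemas(catalog, schemaPattern) is not supported yet");
    }
}
```

Callers that probe for the feature can catch `SQLFeatureNotSupportedException` specifically and fall back to the no-argument `getSchemas()`.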





[GitHub] [flink] FangYongs commented on a diff in pull request #22362: [FLINK-31544][jdbc-driver] Introduce database metadata for jdbc driver

Posted by "FangYongs (via GitHub)" <gi...@apache.org>.
FangYongs commented on code in PR #22362:
URL: https://github.com/apache/flink/pull/22362#discussion_r1175177578


##########
flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/utils/DatabaseMetaDataUtils.java:
##########
@@ -0,0 +1,107 @@
+
+package org.apache.flink.table.jdbc.utils;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.client.gateway.StatementResult;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.jdbc.FlinkDatabaseMetaData;
+import org.apache.flink.table.jdbc.FlinkResultSet;
+
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+
+/** Utils to create catalog/schema results for {@link FlinkDatabaseMetaData}. */
+public class DatabaseMetaDataUtils {
+    private static final Column TABLE_CAT_COLUMN =
+            Column.physical("TABLE_CAT", DataTypes.STRING().notNull());
+    private static final Column TABLE_SCHEM_COLUMN =
+            Column.physical("TABLE_SCHEM", DataTypes.STRING().notNull());
+    private static final Column TABLE_CATALOG_COLUMN =
+            Column.physical("TABLE_CATALOG", DataTypes.STRING());
+
+    /**
+     * Create result set for catalogs. The schema columns are:
+     *
+     * <ul>
+     *   <li>TABLE_CAT String => catalog name.
+     * </ul>
+     *
+     * <p>The results are ordered by catalog name.
+     *
+     * @param statement The statement for database meta data
+     * @param result The result for catalogs
+     * @return a ResultSet object in which each row has a single String column that is a catalog
+     *     name
+     */
+    public static FlinkResultSet createCatalogsResultSet(
+            Statement statement, StatementResult result) {
+        List<RowData> catalogs = new ArrayList<>();
+        result.forEachRemaining(catalogs::add);
+        catalogs.sort(Comparator.comparing(v -> v.getString(0)));
+
+        return new FlinkResultSet(
+                statement,
+                new CollectionResultIterator(catalogs.iterator()),
+                ResolvedSchema.of(TABLE_CAT_COLUMN));
+    }
+
+    /**
+     * Create result set for schemas. The schema columns are:
+     *
+     * <ul>
+     *   <li>TABLE_SCHEM String => schema name
+     *   <li>TABLE_CATALOG String => catalog name (may be null)
+     * </ul>
+     *
+     * <p>The results are ordered by TABLE_CATALOG and TABLE_SCHEM.
+     *
+     * @param statement The statement for database meta data
+     * @param catalogs The catalog list
+     * @param catalogSchemas The catalog with schema list
+     * @return a ResultSet object in which each row is a schema description
+     */
+    public static FlinkResultSet createSchemasResultSet(
+            Statement statement, List<String> catalogs, Map<String, List<String>> catalogSchemas) {
+        List<RowData> schemaWithCatalogList = new ArrayList<>();
+        catalogs.sort(String::compareTo);

Review Comment:
   Fixed





[GitHub] [flink] FangYongs commented on a diff in pull request #22362: [FLINK-31544][jdbc-driver] Introduce database metadata for jdbc driver

Posted by "FangYongs (via GitHub)" <gi...@apache.org>.
FangYongs commented on code in PR #22362:
URL: https://github.com/apache/flink/pull/22362#discussion_r1175175847


##########
flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkDatabaseMetaData.java:
##########
@@ -0,0 +1,402 @@
+/** Implementation of {@link java.sql.DatabaseMetaData} for flink jdbc driver. */
+public class FlinkDatabaseMetaData extends BaseDatabaseMetaData {
+    private final String url;
+    private final FlinkConnection connection;
+    private final Statement statement;
+    private final Executor executor;
+
+    @VisibleForTesting
+    public FlinkDatabaseMetaData(String url, FlinkConnection connection, Statement statement) {

Review Comment:
   DONE


