Posted to issues@flink.apache.org by "libenchao (via GitHub)" <gi...@apache.org> on 2023/04/23 08:13:20 UTC

[GitHub] [flink] libenchao commented on a diff in pull request #22362: [FLINK-31544][jdbc-driver] Introduce database metadata for jdbc driver

libenchao commented on code in PR #22362:
URL: https://github.com/apache/flink/pull/22362#discussion_r1174528770


##########
flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkDatabaseMetaData.java:
##########
@@ -0,0 +1,402 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.jdbc;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.table.client.gateway.Executor;
+import org.apache.flink.table.client.gateway.StatementResult;
+
+import javax.annotation.Nullable;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.table.jdbc.utils.DatabaseMetaDataUtils.createCatalogsResultSet;
+import static org.apache.flink.table.jdbc.utils.DatabaseMetaDataUtils.createSchemasResultSet;
+
+/** Implementation of {@link java.sql.DatabaseMetaData} for flink jdbc driver. */
+public class FlinkDatabaseMetaData extends BaseDatabaseMetaData {
+    private final String url;
+    private final FlinkConnection connection;
+    private final Statement statement;
+    private final Executor executor;
+
+    @VisibleForTesting
+    public FlinkDatabaseMetaData(String url, FlinkConnection connection, Statement statement) {
+        this.url = url;
+        this.connection = connection;
+        this.statement = statement;
+        this.executor = connection.getExecutor();
+    }
+
+    @Override
+    public ResultSet getCatalogs() throws SQLException {
+        try (StatementResult result = catalogs()) {
+            return createCatalogsResultSet(statement, result);
+        } catch (Exception e) {
+            throw new SQLException("Get catalogs fail", e);
+        }
+    }
+
+    private StatementResult catalogs() {
+        return executor.executeStatement("SHOW CATALOGS");
+    }
+
+    @Override
+    public ResultSet getSchemas() throws SQLException {
+        try {
+            String currentCatalog = connection.getCatalog();
+            String currentDatabase = connection.getSchema();
+            List<String> catalogList = new ArrayList<>();
+            Map<String, List<String>> catalogSchemaList = new HashMap<>();
+            try (StatementResult result = catalogs()) {
+                while (result.hasNext()) {
+                    String catalog = result.next().getString(0).toString();
+                    getSchemasForCatalog(catalogList, catalogSchemaList, catalog, null);
+                }
+            }
+            connection.setCatalog(currentCatalog);
+            connection.setSchema(currentDatabase);
+
+            return createSchemasResultSet(statement, catalogList, catalogSchemaList);
+        } catch (Exception e) {
+            throw new SQLException("Get schemas fail", e);
+        }
+    }
+
+    private void getSchemasForCatalog(
+            List<String> catalogList,
+            Map<String, List<String>> catalogSchemaList,
+            String catalog,
+            @Nullable String schemaPattern)
+            throws SQLException {
+        catalogList.add(catalog);
+        connection.setCatalog(catalog);

Review Comment:
   This changes the default catalog of the `Connection`, which means this method has a side effect. Should we avoid that, or reset the catalog to the original one afterwards?
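
   A minimal sketch of the "reset to the original one" option, under stated assumptions. The quoted `getSchemas()` already saves and restores the catalog and schema, but only on the success path, so an exception thrown while iterating would skip the restore. A `try`/`finally` makes the restore unconditional. `FakeConnection` and `visitCatalogs` are hypothetical stand-ins for illustration, not classes or methods from the PR.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in for the connection; NOT the PR's FlinkConnection. */
final class FakeConnection {
    private String catalog;

    FakeConnection(String catalog) {
        this.catalog = catalog;
    }

    String getCatalog() {
        return catalog;
    }

    void setCatalog(String catalog) {
        this.catalog = catalog;
    }
}

final class CatalogRestoreSketch {
    /** Switches to each catalog in turn, then restores the original one even if a step throws. */
    static List<String> visitCatalogs(FakeConnection connection, List<String> catalogs) {
        String original = connection.getCatalog();
        List<String> visited = new ArrayList<>();
        try {
            for (String catalog : catalogs) {
                connection.setCatalog(catalog); // temporary switch
                visited.add(connection.getCatalog());
            }
            return visited;
        } finally {
            connection.setCatalog(original); // always undo the side effect
        }
    }

    public static void main(String[] args) {
        FakeConnection connection = new FakeConnection("default_catalog");
        System.out.println(visitCatalogs(connection, Arrays.asList("cat_a", "cat_b")));
        System.out.println(connection.getCatalog());
    }
}
```

   Avoiding the switch altogether (for example, by qualifying the catalog in the statement, if the engine supports that) would remove the side effect entirely; the sketch only covers the restore variant.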



##########
flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkDatabaseMetaData.java:
##########
@@ -0,0 +1,402 @@
+package org.apache.flink.table.jdbc;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.table.client.gateway.Executor;
+import org.apache.flink.table.client.gateway.StatementResult;
+
+import javax.annotation.Nullable;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.table.jdbc.utils.DatabaseMetaDataUtils.createCatalogsResultSet;
+import static org.apache.flink.table.jdbc.utils.DatabaseMetaDataUtils.createSchemasResultSet;
+
+/** Implementation of {@link java.sql.DatabaseMetaData} for flink jdbc driver. */
+public class FlinkDatabaseMetaData extends BaseDatabaseMetaData {
+    private final String url;
+    private final FlinkConnection connection;
+    private final Statement statement;
+    private final Executor executor;
+
+    @VisibleForTesting
+    public FlinkDatabaseMetaData(String url, FlinkConnection connection, Statement statement) {

Review Comment:
   Is `protected` visibility enough for testing purposes?



##########
flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/utils/DatabaseMetaDataUtils.java:
##########
@@ -0,0 +1,107 @@
+package org.apache.flink.table.jdbc.utils;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.client.gateway.StatementResult;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.jdbc.FlinkDatabaseMetaData;
+import org.apache.flink.table.jdbc.FlinkResultSet;
+
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+
+/** Utils to create catalog/schema results for {@link FlinkDatabaseMetaData}. */
+public class DatabaseMetaDataUtils {
+    private static final Column TABLE_CAT_COLUMN =
+            Column.physical("TABLE_CAT", DataTypes.STRING().notNull());
+    private static final Column TABLE_SCHEM_COLUMN =
+            Column.physical("TABLE_SCHEM", DataTypes.STRING().notNull());
+    private static final Column TABLE_CATALOG_COLUMN =
+            Column.physical("TABLE_CATALOG", DataTypes.STRING());
+
+    /**
+     * Create result set for catalogs. The schema columns are:
+     *
+     * <ul>
+     *   <li>TABLE_CAT String => catalog name.
+     * </ul>
+     *
+     * <p>The results are ordered by catalog name.
+     *
+     * @param statement The statement for database meta data
+     * @param result The result for catalogs
+     * @return a ResultSet object in which each row has a single String column that is a catalog
+     *     name
+     */
+    public static FlinkResultSet createCatalogsResultSet(
+            Statement statement, StatementResult result) {
+        List<RowData> catalogs = new ArrayList<>();
+        result.forEachRemaining(catalogs::add);
+        catalogs.sort(Comparator.comparing(v -> v.getString(0)));
+
+        return new FlinkResultSet(
+                statement,
+                new CollectionResultIterator(catalogs.iterator()),
+                ResolvedSchema.of(TABLE_CAT_COLUMN));
+    }
+
+    /**
+     * Create result set for schemas. The schema columns are:
+     *
+     * <ul>
+     *   <li>TABLE_SCHEM String => schema name
+     *   <li>TABLE_CATALOG String => catalog name (may be null)
+     * </ul>
+     *
+     * <p>The results are ordered by TABLE_CATALOG and TABLE_SCHEM.
+     *
+     * @param statement The statement for database meta data
+     * @param catalogs The catalog list
+     * @param catalogSchemas The catalog with schema list
+     * @return a ResultSet object in which each row is a schema description
+     */
+    public static FlinkResultSet createSchemasResultSet(
+            Statement statement, List<String> catalogs, Map<String, List<String>> catalogSchemas) {
+        List<RowData> schemaWithCatalogList = new ArrayList<>();
+        catalogs.sort(String::compareTo);

Review Comment:
   Since this sorts the list in place, it has a side effect on the input parameter. I would suggest sorting a copy instead of the input.
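
   A minimal sketch of sorting a copy, assuming the caller's list should keep its original order. `SortCopySketch` and `sortedCopy` are hypothetical names, not part of the PR.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class SortCopySketch {
    /** Returns a sorted copy; the caller's list is left untouched. */
    static List<String> sortedCopy(List<String> input) {
        List<String> copy = new ArrayList<>(input); // defensive copy
        copy.sort(String::compareTo);
        return copy;
    }

    public static void main(String[] args) {
        List<String> catalogs = new ArrayList<>(Arrays.asList("cat_b", "cat_a"));
        System.out.println(sortedCopy(catalogs));
        System.out.println(catalogs);
    }
}
```

   Copying first also means the method keeps working if a caller passes an unmodifiable list, which an in-place `sort` would reject.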



##########
flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkDatabaseMetaData.java:
##########
@@ -0,0 +1,402 @@
+package org.apache.flink.table.jdbc;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.table.client.gateway.Executor;
+import org.apache.flink.table.client.gateway.StatementResult;
+
+import javax.annotation.Nullable;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.table.jdbc.utils.DatabaseMetaDataUtils.createCatalogsResultSet;
+import static org.apache.flink.table.jdbc.utils.DatabaseMetaDataUtils.createSchemasResultSet;
+
+/** Implementation of {@link java.sql.DatabaseMetaData} for flink jdbc driver. */
+public class FlinkDatabaseMetaData extends BaseDatabaseMetaData {
+    private final String url;
+    private final FlinkConnection connection;
+    private final Statement statement;
+    private final Executor executor;
+
+    @VisibleForTesting
+    public FlinkDatabaseMetaData(String url, FlinkConnection connection, Statement statement) {
+        this.url = url;
+        this.connection = connection;
+        this.statement = statement;
+        this.executor = connection.getExecutor();
+    }
+
+    @Override
+    public ResultSet getCatalogs() throws SQLException {
+        try (StatementResult result = catalogs()) {
+            return createCatalogsResultSet(statement, result);
+        } catch (Exception e) {
+            throw new SQLException("Get catalogs fail", e);
+        }
+    }
+
+    private StatementResult catalogs() {
+        return executor.executeStatement("SHOW CATALOGS");
+    }
+
+    @Override
+    public ResultSet getSchemas() throws SQLException {
+        try {
+            String currentCatalog = connection.getCatalog();
+            String currentDatabase = connection.getSchema();
+            List<String> catalogList = new ArrayList<>();
+            Map<String, List<String>> catalogSchemaList = new HashMap<>();
+            try (StatementResult result = catalogs()) {
+                while (result.hasNext()) {
+                    String catalog = result.next().getString(0).toString();
+                    getSchemasForCatalog(catalogList, catalogSchemaList, catalog, null);
+                }
+            }
+            connection.setCatalog(currentCatalog);
+            connection.setSchema(currentDatabase);
+
+            return createSchemasResultSet(statement, catalogList, catalogSchemaList);
+        } catch (Exception e) {
+            throw new SQLException("Get schemas fail", e);
+        }
+    }
+
+    private void getSchemasForCatalog(
+            List<String> catalogList,
+            Map<String, List<String>> catalogSchemaList,
+            String catalog,
+            @Nullable String schemaPattern)
+            throws SQLException {
+        catalogList.add(catalog);
+        connection.setCatalog(catalog);
+
+        List<String> schemas = new ArrayList<>();
+        try (StatementResult schemaResult = schemas()) {
+            while (schemaResult.hasNext()) {
+                String schema = schemaResult.next().getString(0).toString();
+                if (schemaPattern == null || schema.contains(schemaPattern)) {

Review Comment:
   I'm not sure that `contains` is the right way to implement `schemaPattern`. Should it follow SQL `LIKE` semantics instead, e.g., "show schemas like '%your_schema%'"?
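
   For reference, the `java.sql.DatabaseMetaData` documentation describes `fooPattern` arguments as patterns in which "%" matches any substring of zero or more characters and "_" matches any single character. A minimal client-side sketch of that matching follows, as an alternative to pushing the pattern into the statement. `LikePatternSketch`, `toRegex`, and `matches` are hypothetical names, and escape-character handling (see `getSearchStringEscape`) is deliberately left out.

```java
import java.util.regex.Pattern;

final class LikePatternSketch {
    /** Translates a LIKE-style pattern ('%' = any run of characters, '_' = one character) to a regex. */
    static Pattern toRegex(String likePattern) {
        StringBuilder regex = new StringBuilder();
        StringBuilder literal = new StringBuilder();
        for (char c : likePattern.toCharArray()) {
            if (c == '%' || c == '_') {
                if (literal.length() > 0) {
                    // Quote literal runs so regex metacharacters such as '.' match themselves.
                    regex.append(Pattern.quote(literal.toString()));
                    literal.setLength(0);
                }
                regex.append(c == '%' ? ".*" : ".");
            } else {
                literal.append(c);
            }
        }
        if (literal.length() > 0) {
            regex.append(Pattern.quote(literal.toString()));
        }
        return Pattern.compile(regex.toString(), Pattern.DOTALL);
    }

    /** A null pattern means "do not filter", mirroring the null check in the quoted code. */
    static boolean matches(String schema, String schemaPattern) {
        return schemaPattern == null || toRegex(schemaPattern).matcher(schema).matches();
    }

    public static void main(String[] args) {
        System.out.println(matches("default_database", "%data%"));
        System.out.println(matches("default_database", "data"));
    }
}
```

   Note the behavioral difference from `contains`: a pattern without wildcards, such as `data`, must match the whole schema name, so it no longer matches `default_database`.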



##########
flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/utils/CloseableResultIterator.java:
##########
@@ -0,0 +1,26 @@
+package org.apache.flink.table.jdbc.utils;
+
+import org.apache.flink.table.data.RowData;
+
+import java.util.Iterator;
+
+/** Closeable result iterator for jdbc driver. */
+public interface CloseableResultIterator extends Iterator<RowData>, AutoCloseable {}

Review Comment:
   Do we need to make this more generic, e.g., `Iterator<T>`, and let the subclasses specify the type parameter? (I'm also OK with the current approach if you think it's better.)
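
   A minimal sketch of what the generic variant could look like. `GenericCloseableIterator` and `CollectionIteratorSketch` are hypothetical names chosen for illustration; the PR's interface is fixed to `RowData`, and its `CollectionResultIterator` is not shown in the quoted hunk.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.NoSuchElementException;

/** Hypothetical generic variant of the interface under review. */
interface GenericCloseableIterator<T> extends Iterator<T>, AutoCloseable {}

/** Hypothetical in-memory implementation; the element type is chosen by the caller. */
final class CollectionIteratorSketch<T> implements GenericCloseableIterator<T> {
    private final Iterator<T> delegate;
    private boolean closed;

    CollectionIteratorSketch(Iterator<T> delegate) {
        this.delegate = delegate;
    }

    @Override
    public boolean hasNext() {
        return !closed && delegate.hasNext();
    }

    @Override
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        return delegate.next();
    }

    @Override
    public void close() {
        // Nothing to release for an in-memory collection; just stop iterating.
        closed = true;
    }

    public static void main(String[] args) {
        try (CollectionIteratorSketch<String> it =
                new CollectionIteratorSketch<>(Arrays.asList("a", "b").iterator())) {
            while (it.hasNext()) {
                System.out.println(it.next());
            }
        }
    }
}
```

   The trade-off is the usual one: the generic form is reusable for other element types, while the `RowData`-specific form keeps signatures shorter if no second element type is expected.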


