Posted to issues@flink.apache.org by "libenchao (via GitHub)" <gi...@apache.org> on 2023/03/27 14:27:34 UTC

[GitHub] [flink-connector-jdbc] libenchao commented on a diff in pull request #29: [FLINK-31551] Add support for CrateDB

libenchao commented on code in PR #29:
URL: https://github.com/apache/flink-connector-jdbc/pull/29#discussion_r1148393402


##########
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/CrateDBTablePath.java:
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.catalog;
+
+import org.apache.flink.util.StringUtils;
+
+import java.util.Objects;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Table path of CrateDB in Flink. Can be of formats "table_name" or "schema_name.table_name". When
+ * it's "table_name", the schema name defaults to "doc".
+ */
+public class CrateDBTablePath {
+
+    private static final String DEFAULT_CRATE_SCHEMA_NAME = "doc";
+
+    private final String crateDBSchemaName;
+    private final String crateDBTableName;
+
+    public CrateDBTablePath(String crateDBSchemaName, String crateDBTableName) {
+        checkArgument(!StringUtils.isNullOrWhitespaceOnly(crateDBSchemaName));
+        checkArgument(!StringUtils.isNullOrWhitespaceOnly(crateDBTableName));

Review Comment:
   Please add an exception message to these checks.
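Here is a minimal sketch of the constructor with messages attached. It assumes the real code would use the `Preconditions.checkArgument(boolean, Object)` overload from `org.apache.flink.util`. So that the snippet compiles on its own, `checkArgument` and `isNullOrWhitespaceOnly` are local stand-ins written for this illustration. The class name `CrateDBTablePathSketch` is hypothetical, and the message wording is only a suggestion.

```java
/** Sketch (hypothetical class): CrateDBTablePath constructor checks with explicit messages. */
final class CrateDBTablePathSketch {

    private final String crateDBSchemaName;
    private final String crateDBTableName;

    CrateDBTablePathSketch(String crateDBSchemaName, String crateDBTableName) {
        // Same conditions as in the PR, but each failure now says what was wrong.
        checkArgument(
                !isNullOrWhitespaceOnly(crateDBSchemaName),
                "Schema name is not valid. Null or empty is not allowed");
        checkArgument(
                !isNullOrWhitespaceOnly(crateDBTableName),
                "Table name is not valid. Null or empty is not allowed");
        this.crateDBSchemaName = crateDBSchemaName;
        this.crateDBTableName = crateDBTableName;
    }

    String getFullPath() {
        return crateDBSchemaName + "." + crateDBTableName;
    }

    // Local stand-in for org.apache.flink.util.Preconditions.checkArgument(boolean, Object).
    static void checkArgument(boolean condition, Object errorMessage) {
        if (!condition) {
            throw new IllegalArgumentException(String.valueOf(errorMessage));
        }
    }

    // Local stand-in for org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly(String).
    static boolean isNullOrWhitespaceOnly(String str) {
        if (str == null || str.isEmpty()) {
            return true;
        }
        for (int i = 0; i < str.length(); i++) {
            if (!Character.isWhitespace(str.charAt(i))) {
                return false;
            }
        }
        return true;
    }
}
```

With this change, `new CrateDBTablePathSketch(" ", "t1")` fails with an `IllegalArgumentException` whose message names the schema, rather than one with no message at all.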



##########
docs/content/docs/connectors/table/jdbc.md:
##########
@@ -45,14 +45,14 @@ See how to link with it for cluster execution [here]({{< ref "docs/dev/configura
 
 A driver dependency is also required to connect to a specified database. Here are drivers currently supported:
 
-| Driver     |      Group Id      |      Artifact Id       |      JAR         |
-|:-----------| :------------------| :----------------------| :----------------|
-| MySQL      |       `mysql`      | `mysql-connector-java` | [Download](https://repo.maven.apache.org/maven2/mysql/mysql-connector-java/) |
-| Oracle     | `com.oracle.database.jdbc` |        `ojdbc8`        | [Download](https://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc8) |
-| PostgreSQL |  `org.postgresql`  |      `postgresql`      | [Download](https://jdbc.postgresql.org/download.html) |
-| Derby      | `org.apache.derby` |        `derby`         | [Download](http://db.apache.org/derby/derby_downloads.html) |
-| SQL Server | `com.microsoft.sqlserver` |        `mssql-jdbc`         | [Download](https://docs.microsoft.com/en-us/sql/connect/jdbc/download-microsoft-jdbc-driver-for-sql-server?view=sql-server-ver16) |
-
+| Driver     | Group Id                   | Artifact Id            |      JAR         |
+|:-----------|:---------------------------|:-----------------------| :----------------|
+| MySQL      | `mysql`                    | `mysql-connector-java` | [Download](https://repo.maven.apache.org/maven2/mysql/mysql-connector-java/) |
+| Oracle     | `com.oracle.database.jdbc` | `ojdbc8`               | [Download](https://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc8) |
+| PostgreSQL | `org.postgresql`           | `postgresql`           | [Download](https://jdbc.postgresql.org/download.html) |
+| Derby      | `org.apache.derby`         | `derby`                | [Download](http://db.apache.org/derby/derby_downloads.html) |
+| SQL Server | `com.microsoft.sqlserver`  | `mssql-jdbc`           | [Download](https://docs.microsoft.com/en-us/sql/connect/jdbc/download-microsoft-jdbc-driver-for-sql-server?view=sql-server-ver16) |
+| CrateDB    | `io.crate`                 | `crate-jdbc`           | [Download](https://repo1.maven.org/maven2/io/crate/crate-jdbc/) |

Review Comment:
   Could you also update the corresponding section in the 'zh' doc?



##########
docs/content/docs/connectors/table/jdbc.md:
##########

Review Comment:
   Also add a CrateDB section to the "JDBC Catalog" part of this doc.



##########
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/CrateDBCatalog.java:
##########
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.catalog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.jdbc.dialect.JdbcDialectTypeMapper;
+import org.apache.flink.connector.jdbc.dialect.cratedb.CrateDBTypeMapper;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList;
+
+import org.apache.commons.compress.utils.Lists;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/** Catalog for CrateDB. */
+@Internal
+public class CrateDBCatalog extends AbstractJdbcCatalog {
+
+    private static final Logger LOG = LoggerFactory.getLogger(CrateDBCatalog.class);
+
+    public static final String DEFAULT_DATABASE = "crate";
+
+    private static final Set<String> builtinSchemas =
+            new HashSet<String>() {
+                {
+                    add("pg_catalog");
+                    add("information_schema");
+                    add("sys");
+                }
+            };
+
+    private final JdbcDialectTypeMapper dialectTypeMapper;
+
+    protected CrateDBCatalog(
+            ClassLoader userClassLoader,
+            String catalogName,
+            String defaultDatabase,
+            String username,
+            String pwd,
+            String baseUrl) {
+        super(userClassLoader, catalogName, defaultDatabase, username, pwd, baseUrl);
+        this.dialectTypeMapper = new CrateDBTypeMapper();
+    }
+
+    // ------ databases ------
+
+    @Override
+    public List<String> listDatabases() throws CatalogException {
+        return ImmutableList.of(DEFAULT_DATABASE);
+    }
+
+    // ------ tables ------
+
+    @Override
+    public List<String> listTables(String databaseName)
+            throws DatabaseNotExistException, CatalogException {
+
+        Preconditions.checkState(
+                StringUtils.isNotBlank(databaseName), "Database name must not be blank.");
+        if (!databaseExists(databaseName)) {
+            throw new DatabaseNotExistException(getName(), databaseName);
+        }
+
+        List<String> tables = Lists.newArrayList();
+
+        // get all schemas
+        List<String> schemas =
+                extractColumnValuesBySQL(
+                        baseUrl + databaseName,
+                        "SELECT schema_name FROM information_schema.schemata",
+                        1,
+                        schema -> !builtinSchemas.contains(schema));
+
+        // get all tables
+        for (String schema : schemas) {
+            // position 1 is database name, position 2 is schema name, position 3 is table name
+            List<String> pureTables =
+                    extractColumnValuesBySQL(
+                            baseUrl + databaseName,
+                            "SELECT table_name FROM information_schema.tables "
+                                    + "WHERE table_schema = ? "
+                                    + "ORDER BY table_type, table_name",
+                            1,
+                            null,
+                            schema);
+            tables.addAll(
+                    pureTables.stream()
+                            .map(pureTable -> schema + "." + pureTable)
+                            .collect(Collectors.toList()));
+        }
+        return tables;
+    }
+
+    /**
+     * Converts Postgres type to Flink {@link DataType}.
+     *
+     * @see org.postgresql.jdbc.TypeInfoCache

Review Comment:
   Shouldn't the javadoc talk about CrateDB instead?



##########
flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/CrateDBTablePathTest.java:
##########
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.catalog;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link CrateDBTablePath}. */
+class CrateDBTablePathTest {
+    @Test
+    void testFromFlinkTableName() {

Review Comment:
   I would suggest adding some exception cases. What do you think?
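To make the exception cases concrete, here is a plain-Java sketch of inputs a negative test could cover. `fromFlinkTableName` below is a hypothetical reconstruction based only on the fragment of `CrateDBTablePath` quoted in this thread (the full method is not shown), and the class name and messages are illustrative. In the actual JUnit 5 test, each case could instead be written with AssertJ, for example `assertThatThrownBy(() -> CrateDBTablePath.fromFlinkTableName("a.b.c")).isInstanceOf(IllegalArgumentException.class)`.

```java
import java.util.Arrays;
import java.util.List;

/** Sketch of negative cases for a CrateDB table path parser (hypothetical reconstruction). */
final class CrateDBTablePathExceptionCases {

    static final String DEFAULT_CRATE_SCHEMA_NAME = "doc";

    /** Inputs a negative test could cover; each one is expected to be rejected. */
    static final List<String> INVALID_NAMES =
            Arrays.asList("", "   ", "a.b.c", "a.", ".b", "a..b");

    /** Returns {schema, table}, or throws IllegalArgumentException for malformed input. */
    static String[] fromFlinkTableName(String flinkTableName) {
        if (flinkTableName.contains(".")) {
            String[] path = flinkTableName.split("\\.");
            if (path.length != 2) {
                throw new IllegalArgumentException(
                        "Table name '" + flinkTableName + "' is not valid. The parsed length is "
                                + path.length);
            }
            return validated(path[0], path[1]);
        }
        // No dot: the schema falls back to CrateDB's default "doc" schema.
        return validated(DEFAULT_CRATE_SCHEMA_NAME, flinkTableName);
    }

    // Mirrors the constructor's non-blank checks.
    private static String[] validated(String schema, String table) {
        if (schema.trim().isEmpty()) {
            throw new IllegalArgumentException("Schema name must not be blank");
        }
        if (table.trim().isEmpty()) {
            throw new IllegalArgumentException("Table name must not be blank");
        }
        return new String[] {schema, table};
    }

    /** True if parsing the name throws IllegalArgumentException. */
    static boolean isRejected(String name) {
        try {
            fromFlinkTableName(name);
            return false;
        } catch (IllegalArgumentException e) {
            return true;
        }
    }
}
```

Besides the obvious cases (empty, whitespace-only, too many segments), `".b"` and `"a."` are worth including because they exercise the interaction between the split and the non-blank checks.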



##########
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/CrateDBTablePath.java:
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.catalog;
+
+import org.apache.flink.util.StringUtils;
+
+import java.util.Objects;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Table path of CrateDB in Flink. Can be of formats "table_name" or "schema_name.table_name". When
+ * it's "table_name", the schema name defaults to "doc".
+ */
+public class CrateDBTablePath {
+
+    private static final String DEFAULT_CRATE_SCHEMA_NAME = "doc";
+
+    private final String crateDBSchemaName;
+    private final String crateDBTableName;
+
+    public CrateDBTablePath(String crateDBSchemaName, String crateDBTableName) {
+        checkArgument(!StringUtils.isNullOrWhitespaceOnly(crateDBSchemaName));
+        checkArgument(!StringUtils.isNullOrWhitespaceOnly(crateDBTableName));
+
+        this.crateDBSchemaName = crateDBSchemaName;
+        this.crateDBTableName = crateDBTableName;
+    }
+
+    public static CrateDBTablePath fromFlinkTableName(String flinkTableName) {
+        if (flinkTableName.contains(".")) {
+            String[] path = flinkTableName.split("\\.");
+
+            checkArgument(
+                    path != null && path.length == 2,

Review Comment:
   `path != null` seems unnecessary, since `String.split` never returns null.
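For reference, here is a small sketch of what `String.split("\\.")` returns for typical inputs. The helper class is hypothetical and exists only to show the array lengths that the length check sees.

```java
/** Hypothetical helper showing the array lengths produced by splitting on a literal dot. */
final class SplitBehavior {

    /** Number of segments String.split("\\.") produces; split always returns an array. */
    static int segments(String flinkTableName) {
        return flinkTableName.split("\\.").length;
    }

    /** The check as it could read once the redundant null test is dropped. */
    static boolean hasSchemaAndTable(String flinkTableName) {
        return segments(flinkTableName) == 2;
    }
}
```

Because `split(regex)` discards trailing empty strings, `"a."` yields one segment. A leading empty string is kept, so `".b"` yields two segments and still passes the length check. It is the constructor's non-blank check that rejects it.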



##########
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/cratedb/CrateDBDialect.java:
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.dialect.cratedb;
+
+import org.apache.flink.connector.jdbc.converter.JdbcRowConverter;
+import org.apache.flink.connector.jdbc.dialect.AbstractDialect;
+import org.apache.flink.connector.jdbc.internal.converter.CrateDBRowConverter;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/** JDBC dialect for CrateDB. */
+public class CrateDBDialect extends AbstractDialect {
+
+    private static final long serialVersionUID = 1L;
+
+    // Define MAX/MIN precision of TIMESTAMP type according to PostgreSQL docs:
+    // https://www.postgresql.org/docs/12/datatype-datetime.html
+    private static final int MAX_TIMESTAMP_PRECISION = 6;
+    private static final int MIN_TIMESTAMP_PRECISION = 1;
+
+    // Define MAX/MIN precision of DECIMAL type according to PostgreSQL docs:
+    // https://www.postgresql.org/docs/12/datatype-numeric.html#DATATYPE-NUMERIC-DECIMAL
+    private static final int MAX_DECIMAL_PRECISION = 1000;
+    private static final int MIN_DECIMAL_PRECISION = 1;
+
+    @Override
+    public JdbcRowConverter getRowConverter(RowType rowType) {
+        return new CrateDBRowConverter(rowType);
+    }
+
+    @Override
+    public String getLimitClause(long limit) {
+        return "LIMIT " + limit;
+    }
+
+    @Override
+    public Optional<String> defaultDriverName() {
+        return Optional.of("io.crate.client.jdbc.CrateDriver");
+    }
+
+    /** Postgres upsert query. It use ON CONFLICT ... DO UPDATE SET.. to replace into Postgres. */
+    @Override
+    public Optional<String> getUpsertStatement(
+            String tableName, String[] fieldNames, String[] uniqueKeyFields) {
+        String uniqueColumns =
+                Arrays.stream(uniqueKeyFields)
+                        .map(this::quoteIdentifier)
+                        .collect(Collectors.joining(", "));
+        String updateClause =
+                Arrays.stream(fieldNames)
+                        .map(f -> quoteIdentifier(f) + "=EXCLUDED." + quoteIdentifier(f))
+                        .collect(Collectors.joining(", "));
+        return Optional.of(
+                getInsertIntoStatement(tableName, fieldNames)
+                        + " ON CONFLICT ("
+                        + uniqueColumns
+                        + ")"
+                        + " DO UPDATE SET "
+                        + updateClause);
+    }
+
+    @Override
+    public String quoteIdentifier(String identifier) {
+        return identifier;
+    }
+
+    @Override
+    public String dialectName() {
+        return "CrateDB";
+    }
+
+    @Override
+    public Optional<Range> decimalPrecisionRange() {
+        return Optional.of(Range.of(MIN_DECIMAL_PRECISION, MAX_DECIMAL_PRECISION));
+    }
+
+    @Override
+    public Optional<Range> timestampPrecisionRange() {
+        return Optional.of(Range.of(MIN_TIMESTAMP_PRECISION, MAX_TIMESTAMP_PRECISION));
+    }
+
+    @Override
+    public Set<LogicalTypeRoot> supportedTypes() {
+        // The data types used in PostgreSQL are list at:
+        // https://www.postgresql.org/docs/12/datatype.html
+
+        // TODO: We can't convert BINARY data type to
+        //  PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO in
+        // LegacyTypeInfoDataTypeConverter.

Review Comment:
   These comments should also refer to CrateDB instead of PostgreSQL.
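Relatedly, the `getUpsertStatement` javadoc in this dialect still describes a Postgres upsert. When rewording it, a standalone reproduction of the SQL the method builds may help. The upsert logic is copied from the diff. `getInsertIntoStatement` is an assumption: it is written to mirror Flink's default `JdbcDialect` implementation (a column list plus `:field` named placeholders). The class, table, and column names are made up.

```java
import java.util.Arrays;
import java.util.stream.Collectors;

/** Standalone reproduction of the SQL built by the PR's CrateDBDialect#getUpsertStatement. */
final class CrateDBUpsertSketch {

    // The dialect's quoteIdentifier returns identifiers unchanged.
    static String quoteIdentifier(String identifier) {
        return identifier;
    }

    // Assumed to mirror the default JdbcDialect#getInsertIntoStatement.
    static String getInsertIntoStatement(String tableName, String[] fieldNames) {
        String columns =
                Arrays.stream(fieldNames)
                        .map(CrateDBUpsertSketch::quoteIdentifier)
                        .collect(Collectors.joining(", "));
        String placeholders =
                Arrays.stream(fieldNames).map(f -> ":" + f).collect(Collectors.joining(", "));
        return "INSERT INTO " + quoteIdentifier(tableName)
                + "(" + columns + ")" + " VALUES (" + placeholders + ")";
    }

    // Same string-building logic as the diff: ON CONFLICT (keys) DO UPDATE SET f=EXCLUDED.f, ...
    static String getUpsertStatement(
            String tableName, String[] fieldNames, String[] uniqueKeyFields) {
        String uniqueColumns =
                Arrays.stream(uniqueKeyFields)
                        .map(CrateDBUpsertSketch::quoteIdentifier)
                        .collect(Collectors.joining(", "));
        String updateClause =
                Arrays.stream(fieldNames)
                        .map(f -> quoteIdentifier(f) + "=EXCLUDED." + quoteIdentifier(f))
                        .collect(Collectors.joining(", "));
        return getInsertIntoStatement(tableName, fieldNames)
                + " ON CONFLICT (" + uniqueColumns + ")"
                + " DO UPDATE SET " + updateClause;
    }
}
```

For a table `books` with fields `id, title` and key `id`, this yields `INSERT INTO books(id, title) VALUES (:id, :title) ON CONFLICT (id) DO UPDATE SET id=EXCLUDED.id, title=EXCLUDED.title`. Note that the SET list also reassigns the key column itself.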



##########
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/CrateDBCatalog.java:
##########
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.catalog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.jdbc.dialect.JdbcDialectTypeMapper;
+import org.apache.flink.connector.jdbc.dialect.cratedb.CrateDBTypeMapper;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList;
+
+import org.apache.commons.compress.utils.Lists;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/** Catalog for CrateDB. */
+@Internal
+public class CrateDBCatalog extends AbstractJdbcCatalog {
+
+    private static final Logger LOG = LoggerFactory.getLogger(CrateDBCatalog.class);
+
+    public static final String DEFAULT_DATABASE = "crate";
+
+    private static final Set<String> builtinSchemas =
+            new HashSet<String>() {
+                {
+                    add("pg_catalog");

Review Comment:
   I'm not familiar with CrateDB. Is `pg_catalog` an internal schema? (It sounds like something from Postgres.)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.