You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by bl...@apache.org on 2020/03/20 23:44:43 UTC
[flink] 01/02: [FLINK-16471][jdbc] develop JDBCCatalog and
PostgresCatalog
This is an automated email from the ASF dual-hosted git repository.
bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 74b8bdee9fb0bf7cfc27ca8d992dac2a07473a0c
Author: bowen.li <bo...@gmail.com>
AuthorDate: Fri Mar 6 14:05:27 2020 -0800
[FLINK-16471][jdbc] develop JDBCCatalog and PostgresCatalog
closes #11336
---
.../src/test/resources/log4j2-test.properties | 28 --
flink-connectors/flink-jdbc/pom.xml | 36 ++-
.../java/io/jdbc/catalog/AbstractJDBCCatalog.java | 277 ++++++++++++++++++
.../api/java/io/jdbc/catalog/JDBCCatalog.java | 84 ++++++
.../api/java/io/jdbc/catalog/JDBCCatalogUtils.java | 54 ++++
.../api/java/io/jdbc/catalog/PostgresCatalog.java | 323 ++++++++++++++++++++
.../java/io/jdbc/catalog/PostgresTablePath.java | 95 ++++++
.../api/java/io/jdbc/dialect/JDBCDialects.java | 10 +-
.../java/io/jdbc/catalog/JDBCCatalogUtilsTest.java | 44 +++
.../io/jdbc/catalog/PostgresCatalogITCase.java | 325 +++++++++++++++++++++
.../io/jdbc/catalog/PostgresTablePathTest.java | 33 +++
11 files changed, 1275 insertions(+), 34 deletions(-)
diff --git a/flink-connectors/flink-connector-elasticsearch2/src/test/resources/log4j2-test.properties b/flink-connectors/flink-connector-elasticsearch2/src/test/resources/log4j2-test.properties
deleted file mode 100644
index 835c2ec..0000000
--- a/flink-connectors/flink-connector-elasticsearch2/src/test/resources/log4j2-test.properties
+++ /dev/null
@@ -1,28 +0,0 @@
-################################################################################
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-# Set root logger level to OFF to not flood build logs
-# set manually to INFO for debugging purposes
-rootLogger.level = OFF
-rootLogger.appenderRef.test.ref = TestLogger
-
-appender.testlogger.name = TestLogger
-appender.testlogger.type = CONSOLE
-appender.testlogger.target = SYSTEM_ERR
-appender.testlogger.layout.type = PatternLayout
-appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n
diff --git a/flink-connectors/flink-jdbc/pom.xml b/flink-connectors/flink-jdbc/pom.xml
index 3e83311..cb7afab 100644
--- a/flink-connectors/flink-jdbc/pom.xml
+++ b/flink-connectors/flink-jdbc/pom.xml
@@ -35,6 +35,11 @@ under the License.
<packaging>jar</packaging>
+ <properties>
+ <postgres.version>42.2.10</postgres.version>
+ <otj-pg-embedded.version>0.13.3</otj-pg-embedded.version>
+ </properties>
+
<dependencies>
<!-- Table ecosystem -->
<!-- Projects depending on this project won't depend on flink-table-*. -->
@@ -53,13 +58,17 @@ under the License.
<scope>provided</scope>
</dependency>
+ <!-- Postgres dependencies -->
+
<dependency>
- <groupId>org.apache.derby</groupId>
- <artifactId>derby</artifactId>
- <version>10.14.2.0</version>
- <scope>test</scope>
+ <groupId>org.postgresql</groupId>
+ <artifactId>postgresql</artifactId>
+ <version>${postgres.version}</version>
+ <scope>provided</scope>
</dependency>
+ <!-- test dependencies -->
+
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
@@ -89,5 +98,24 @@ under the License.
<version>${project.version}</version>
<scope>test</scope>
</dependency>
+
+ <!-- Postgres test dependencies -->
+
+ <dependency>
+ <groupId>com.opentable.components</groupId>
+ <artifactId>otj-pg-embedded</artifactId>
+ <version>${otj-pg-embedded.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <!-- Derby test dependencies -->
+
+ <dependency>
+ <groupId>org.apache.derby</groupId>
+ <artifactId>derby</artifactId>
+ <version>10.14.2.0</version>
+ <scope>test</scope>
+ </dependency>
+
</dependencies>
</project>
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/AbstractJDBCCatalog.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/AbstractJDBCCatalog.java
new file mode 100644
index 0000000..523de83
--- /dev/null
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/AbstractJDBCCatalog.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc.catalog;
+
+import org.apache.flink.api.java.io.jdbc.JDBCTableSourceSinkFactory;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.AbstractCatalog;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.CatalogPartition;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
+import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
+import org.apache.flink.table.catalog.exceptions.TablePartitionedException;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.factories.TableFactory;
+import org.apache.flink.util.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Abstract catalog for any JDBC catalogs.
+ */
+public abstract class AbstractJDBCCatalog extends AbstractCatalog {
+
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractJDBCCatalog.class);
+
+ protected final String username;
+ protected final String pwd;
+ protected final String baseUrl;
+ protected final String defaultUrl;
+
+ /**
+  * Creates the catalog and derives the JDBC URLs used for all later connections.
+  *
+  * @param catalogName     name of this catalog, passed to the parent constructor
+  * @param defaultDatabase default database, passed to the parent constructor and appended to the base URL
+  * @param username        JDBC user name; must not be null or blank
+  * @param pwd             JDBC password; must not be null or blank
+  * @param baseUrl         JDBC URL without a database name, with or without a trailing slash
+  */
+ public AbstractJDBCCatalog(String catalogName, String defaultDatabase, String username, String pwd, String baseUrl) {
+     super(catalogName, defaultDatabase);
+
+     checkArgument(!StringUtils.isNullOrWhitespaceOnly(username));
+     checkArgument(!StringUtils.isNullOrWhitespaceOnly(pwd));
+     checkArgument(!StringUtils.isNullOrWhitespaceOnly(baseUrl));
+
+     JDBCCatalogUtils.validateJDBCUrl(baseUrl);
+
+     this.username = username;
+     this.pwd = pwd;
+     this.baseUrl = baseUrl.endsWith("/") ? baseUrl : baseUrl + "/";
+     // Build from the normalized field, not the raw parameter: a base URL given without a
+     // trailing slash would otherwise produce e.g. "jdbc:postgresql://localhost:5432postgres".
+     this.defaultUrl = this.baseUrl + defaultDatabase;
+ }
+
+ /**
+  * Opens a throw-away connection to the default database so that a misconfigured
+  * catalog fails immediately instead of on first use.
+  */
+ @Override
+ public void open() throws CatalogException {
+     try (Connection probe = DriverManager.getConnection(defaultUrl, username, pwd)) {
+         // Nothing to do: the connection is opened only to prove it can be, then closed again.
+     } catch (SQLException cause) {
+         final String message = String.format("Failed connecting to %s via JDBC.", defaultUrl);
+         throw new ValidationException(message, cause);
+     }
+
+     LOG.info("Catalog {} established connection to {}", getName(), defaultUrl);
+ }
+
+ @Override
+ public void close() throws CatalogException {
+ LOG.info("Catalog {} closing", getName());
+ }
+
+ // ------ table factory ------
+
+ public Optional<TableFactory> getTableFactory() {
+ return Optional.of(new JDBCTableSourceSinkFactory());
+ }
+
+ // ------ databases ------
+
+ @Override
+ public boolean databaseExists(String databaseName) throws CatalogException {
+ checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName));
+
+ return listDatabases().contains(databaseName);
+ }
+
+ @Override
+ public void createDatabase(String name, CatalogDatabase database, boolean ignoreIfExists) throws DatabaseAlreadyExistException, CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade) throws DatabaseNotExistException, DatabaseNotEmptyException, CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void alterDatabase(String name, CatalogDatabase newDatabase, boolean ignoreIfNotExists) throws DatabaseNotExistException, CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ // ------ tables and views ------
+
+ @Override
+ public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists) throws TableNotExistException, TableAlreadyExistException, CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists) throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public List<String> listViews(String databaseName) throws DatabaseNotExistException, CatalogException {
+ return Collections.emptyList();
+ }
+
+ // ------ partitions ------
+
+ @Override
+ public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath) throws TableNotExistException, TableNotPartitionedException, CatalogException {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws TableNotExistException, TableNotPartitionedException, CatalogException {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public List<CatalogPartitionSpec> listPartitionsByFilter(ObjectPath tablePath, List<Expression> filters) throws TableNotExistException, TableNotPartitionedException, CatalogException {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public CatalogPartition getPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws PartitionNotExistException, CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean partitionExists(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void createPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogPartition partition, boolean ignoreIfExists) throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, PartitionAlreadyExistsException, CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void dropPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, boolean ignoreIfNotExists) throws PartitionNotExistException, CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void alterPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogPartition newPartition, boolean ignoreIfNotExists) throws PartitionNotExistException, CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ // ------ functions ------
+
+ @Override
+ public List<String> listFunctions(String dbName) throws DatabaseNotExistException, CatalogException {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public CatalogFunction getFunction(ObjectPath functionPath) throws FunctionNotExistException, CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean functionExists(ObjectPath functionPath) throws CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void createFunction(ObjectPath functionPath, CatalogFunction function, boolean ignoreIfExists) throws FunctionAlreadyExistException, DatabaseNotExistException, CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void alterFunction(ObjectPath functionPath, CatalogFunction newFunction, boolean ignoreIfNotExists) throws FunctionNotExistException, CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void dropFunction(ObjectPath functionPath, boolean ignoreIfNotExists) throws FunctionNotExistException, CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ // ------ stats ------
+
+ @Override
+ public CatalogTableStatistics getTableStatistics(ObjectPath tablePath) throws TableNotExistException, CatalogException {
+ return CatalogTableStatistics.UNKNOWN;
+ }
+
+ @Override
+ public CatalogColumnStatistics getTableColumnStatistics(ObjectPath tablePath) throws TableNotExistException, CatalogException {
+ return CatalogColumnStatistics.UNKNOWN;
+ }
+
+ @Override
+ public CatalogTableStatistics getPartitionStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws PartitionNotExistException, CatalogException {
+ return CatalogTableStatistics.UNKNOWN;
+ }
+
+ @Override
+ public CatalogColumnStatistics getPartitionColumnStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws PartitionNotExistException, CatalogException {
+ return CatalogColumnStatistics.UNKNOWN;
+ }
+
+ @Override
+ public void alterTableStatistics(ObjectPath tablePath, CatalogTableStatistics tableStatistics, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void alterTableColumnStatistics(ObjectPath tablePath, CatalogColumnStatistics columnStatistics, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException, TablePartitionedException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void alterPartitionStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogTableStatistics partitionStatistics, boolean ignoreIfNotExists) throws PartitionNotExistException, CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void alterPartitionColumnStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogColumnStatistics columnStatistics, boolean ignoreIfNotExists) throws PartitionNotExistException, CatalogException {
+ throw new UnsupportedOperationException();
+ }
+}
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/JDBCCatalog.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/JDBCCatalog.java
new file mode 100644
index 0000000..629412c
--- /dev/null
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/JDBCCatalog.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc.catalog;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * Catalogs for relational databases via JDBC.
+ */
+@PublicEvolving
+public class JDBCCatalog extends AbstractJDBCCatalog {
+
+ private static final Logger LOG = LoggerFactory.getLogger(JDBCCatalog.class);
+
+ private final Catalog internal;
+
+ public JDBCCatalog(String catalogName, String defaultDatabase, String username, String pwd, String baseUrl) {
+ super(catalogName, defaultDatabase, username, pwd, baseUrl);
+
+ internal = JDBCCatalogUtils.createCatalog(catalogName, defaultDatabase, username, pwd, baseUrl);
+ }
+
+ // ------ databases -----
+
+ @Override
+ public List<String> listDatabases() throws CatalogException {
+ return internal.listDatabases();
+ }
+
+ @Override
+ public CatalogDatabase getDatabase(String databaseName) throws DatabaseNotExistException, CatalogException {
+ return internal.getDatabase(databaseName);
+ }
+
+ // ------ tables and views ------
+
+ @Override
+ public List<String> listTables(String databaseName) throws DatabaseNotExistException, CatalogException {
+ return internal.listTables(databaseName);
+ }
+
+ @Override
+ public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistException, CatalogException {
+ return internal.getTable(tablePath);
+ }
+
+ /**
+  * Checks whether the table exists by asking the wrapped dialect-specific catalog.
+  *
+  * <p>The check is delegated, like {@link #getTable(ObjectPath)}, so both methods agree on how
+  * a table name is interpreted. Comparing the raw object name against {@link #listTables(String)}
+  * is not equivalent: the Postgres catalog lists tables as "schema.table" but also accepts an
+  * unqualified name (resolved to the default schema), so such a name would be reported as
+  * missing here even though {@code getTable} can load it.
+  *
+  * @param tablePath database and table name to look up
+  * @return whatever the wrapped catalog reports for this path
+  */
+ @Override
+ public boolean tableExists(ObjectPath tablePath) throws CatalogException {
+     return internal.tableExists(tablePath);
+ }
+}
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/JDBCCatalogUtils.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/JDBCCatalogUtils.java
new file mode 100644
index 0000000..b9e3a19
--- /dev/null
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/JDBCCatalogUtils.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc.catalog;
+
+import org.apache.flink.api.java.io.jdbc.dialect.JDBCDialect;
+import org.apache.flink.api.java.io.jdbc.dialect.JDBCDialects;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Utils for {@link JDBCCatalog}.
+ */
+public class JDBCCatalogUtils {
+ /**
+  * Checks that the given JDBC URL carries no database name.
+  *
+  * <p>URL has to be without database, like "jdbc:postgresql://localhost:5432/" or "jdbc:postgresql://localhost:5432"
+  * rather than "jdbc:postgresql://localhost:5432/db".
+  *
+  * <p>The trimmed URL is split on runs of '/'; exactly two non-empty parts (scheme and
+  * host[:port]) are accepted, anything else is rejected through {@code checkArgument}.
+  *
+  * @param url the base URL to validate; must not be null
+  */
+ public static void validateJDBCUrl(String url) {
+     String[] parts = url.trim().split("\\/+");
+
+     checkArgument(parts.length == 2,
+         String.format(
+             "Invalid JDBC base URL '%s': expected a URL without a database name, like 'jdbc:postgresql://localhost:5432/'.",
+             url));
+ }
+
+ /**
+  * Create catalog instance from given information.
+  *
+  * <p>The dialect is looked up from the base URL; only the Postgres dialect is mapped to a
+  * catalog implementation.
+  *
+  * @return a {@link PostgresCatalog} when the URL resolves to the Postgres dialect
+  * @throws UnsupportedOperationException if no dialect matches the URL, or the matching
+  *         dialect has no catalog implementation
+  */
+ public static AbstractJDBCCatalog createCatalog(String catalogName, String defaultDatabase, String username, String pwd, String baseUrl) {
+     // Fail with a descriptive error instead of the bare NoSuchElementException that an
+     // unchecked Optional.get() would raise for an unrecognized URL.
+     JDBCDialect dialect = JDBCDialects.get(baseUrl).orElseThrow(
+         () -> new UnsupportedOperationException(
+             String.format("No JDBC dialect found for URL '%s', so no catalog can be created for it.", baseUrl)));
+
+     if (dialect instanceof JDBCDialects.PostgresDialect) {
+         return new PostgresCatalog(catalogName, defaultDatabase, username, pwd, baseUrl);
+     } else {
+         throw new UnsupportedOperationException(
+             String.format("Catalog for '%s' is not supported yet.", dialect)
+         );
+     }
+ }
+}
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresCatalog.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresCatalog.java
new file mode 100644
index 0000000..d12f254
--- /dev/null
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresCatalog.java
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc.catalog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogDatabaseImpl;
+import org.apache.flink.table.catalog.CatalogTableImpl;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.types.DataType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Catalog for PostgreSQL.
+ */
+@Internal
+public class PostgresCatalog extends AbstractJDBCCatalog {
+
+ private static final Logger LOG = LoggerFactory.getLogger(PostgresCatalog.class);
+
+ public static final String POSTGRES_TABLE_TYPE = "postgres";
+
+ public static final String DEFAULT_DATABASE = "postgres";
+
+ // ------ Postgres default objects that shouldn't be exposed to users ------
+
+ private static final Set<String> builtinDatabases = new HashSet<String>() {{
+ add("template0");
+ add("template1");
+ }};
+
+ private static final Set<String> builtinSchemas = new HashSet<String>() {{
+ add("pg_toast");
+ add("pg_temp_1");
+ add("pg_toast_temp_1");
+ add("pg_catalog");
+ add("information_schema");
+ }};
+
+ protected PostgresCatalog(String catalogName, String defaultDatabase, String username, String pwd, String baseUrl) {
+ super(catalogName, defaultDatabase, username, pwd, baseUrl);
+ }
+
+ // ------ databases ------
+
+ /**
+  * Lists the databases of the Postgres instance, leaving out the built-in template databases.
+  *
+  * @return database names read from {@code pg_database}, minus the entries in {@code builtinDatabases}
+  * @throws CatalogException if connecting or querying fails; the original exception is kept as cause
+  */
+ @Override
+ public List<String> listDatabases() throws CatalogException {
+     List<String> pgDatabases = new ArrayList<>();
+
+     // Statement and result set are declared as resources so they are released
+     // deterministically rather than relying on the connection close to clean them up.
+     try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd);
+             PreparedStatement ps = conn.prepareStatement("SELECT datname FROM pg_database;");
+             ResultSet rs = ps.executeQuery()) {
+
+         while (rs.next()) {
+             String dbName = rs.getString(1);
+             if (!builtinDatabases.contains(dbName)) {
+                 pgDatabases.add(dbName);
+             }
+         }
+
+         return pgDatabases;
+     } catch (Exception e) {
+         throw new CatalogException(
+             String.format("Failed listing database in catalog %s", getName()), e);
+     }
+ }
+
+ /**
+  * Returns a placeholder database object (no properties, no comment) for an existing database.
+  *
+  * @throws DatabaseNotExistException if the name is not among {@link #listDatabases()}
+  */
+ @Override
+ public CatalogDatabase getDatabase(String databaseName) throws DatabaseNotExistException, CatalogException {
+     final boolean known = listDatabases().contains(databaseName);
+     if (!known) {
+         throw new DatabaseNotExistException(getName(), databaseName);
+     }
+
+     return new CatalogDatabaseImpl(Collections.emptyMap(), null);
+ }
+
+ // ------ tables ------
+
+ /**
+  * Lists all base tables of the given database as "schema.table" names.
+  *
+  * <p>Schemas are read from {@code information_schema.schemata}, the ones in
+  * {@code builtinSchemas} are skipped, and each remaining schema is queried for its tables.
+  *
+  * @param databaseName database to connect to; must exist
+  * @return schema-qualified table names, grouped by schema in the order the schemas were returned
+  * @throws DatabaseNotExistException if {@link #databaseExists(String)} reports the database as missing
+  * @throws CatalogException if connecting or querying fails; the original exception is kept as cause
+  */
+ @Override
+ public List<String> listTables(String databaseName) throws DatabaseNotExistException, CatalogException {
+     if (!databaseExists(databaseName)) {
+         throw new DatabaseNotExistException(getName(), databaseName);
+     }
+
+     try (Connection conn = DriverManager.getConnection(baseUrl + databaseName, username, pwd)) {
+         // get all schemas
+         List<String> schemas = new ArrayList<>();
+
+         try (PreparedStatement ps = conn.prepareStatement("SELECT schema_name FROM information_schema.schemata;");
+                 ResultSet rs = ps.executeQuery()) {
+             while (rs.next()) {
+                 String pgSchema = rs.getString(1);
+                 if (!builtinSchemas.contains(pgSchema)) {
+                     schemas.add(pgSchema);
+                 }
+             }
+         }
+
+         List<String> tables = new ArrayList<>();
+
+         // The query text does not depend on the schema, so prepare it once and only
+         // rebind the parameter per schema; every result set is closed before the next run.
+         try (PreparedStatement stmt = conn.prepareStatement(
+                 "SELECT * \n" +
+                 "FROM information_schema.tables \n" +
+                 "WHERE table_type = 'BASE TABLE' \n" +
+                 " AND table_schema = ? \n" +
+                 "ORDER BY table_type, table_name;")) {
+
+             for (String schema : schemas) {
+                 stmt.setString(1, schema);
+
+                 try (ResultSet rstables = stmt.executeQuery()) {
+                     while (rstables.next()) {
+                         // position 1 is database name, position 2 is schema name, position 3 is table name
+                         tables.add(schema + "." + rstables.getString(3));
+                     }
+                 }
+             }
+         }
+
+         return tables;
+     } catch (Exception e) {
+         throw new CatalogException(
+             String.format("Failed listing tables of database %s in catalog %s", databaseName, getName()), e);
+     }
+ }
+
+ /**
+  * Builds a Flink table description from the column metadata of a Postgres table.
+  *
+  * <p>The table schema is derived from the metadata of a prepared {@code SELECT *} statement;
+  * the statement is never executed. The returned table has an empty property map and an
+  * empty comment.
+  *
+  * @param tablePath database name plus table name ("table" or "schema.table")
+  * @throws TableNotExistException if {@link #tableExists(ObjectPath)} reports the table as missing
+  * @throws CatalogException if connecting, reading metadata or mapping a column type fails;
+  *         the original exception is kept as cause
+  */
+ @Override
+ public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistException, CatalogException {
+     if (!tableExists(tablePath)) {
+         throw new TableNotExistException(getName(), tablePath);
+     }
+
+     PostgresTablePath pgPath = PostgresTablePath.fromFlinkTableName(tablePath.getObjectName());
+
+     // The table name is interpolated into the SQL text (identifiers cannot be bound as
+     // parameters); it has been matched against the listed tables by tableExists() above.
+     // The statement is a resource so it is closed together with the connection.
+     try (Connection conn = DriverManager.getConnection(baseUrl + tablePath.getDatabaseName(), username, pwd);
+             PreparedStatement ps = conn.prepareStatement(
+                 String.format("SELECT * FROM %s;", pgPath.getFullPath()))) {
+
+         ResultSetMetaData rsmd = ps.getMetaData();
+         int columnCount = rsmd.getColumnCount();
+
+         String[] names = new String[columnCount];
+         DataType[] types = new DataType[columnCount];
+
+         // JDBC column indexes are 1-based, the arrays are 0-based.
+         for (int i = 1; i <= columnCount; i++) {
+             names[i - 1] = rsmd.getColumnName(i);
+             types[i - 1] = fromJDBCType(rsmd, i);
+         }
+
+         TableSchema tableSchema = new TableSchema.Builder().fields(names, types).build();
+
+         return new CatalogTableImpl(
+             tableSchema,
+             new HashMap<>(),
+             ""
+         );
+     } catch (Exception e) {
+         throw new CatalogException(
+             String.format("Failed getting table %s", tablePath.getFullName()), e);
+     }
+ }
+
+ public static final String PG_BYTEA = "bytea";
+ public static final String PG_BYTEA_ARRAY = "_bytea";
+ public static final String PG_SMALLINT = "int2";
+ public static final String PG_SMALLINT_ARRAY = "_int2";
+ public static final String PG_INTEGER = "int4";
+ public static final String PG_INTEGER_ARRAY = "_int4";
+ public static final String PG_BIGINT = "int8";
+ public static final String PG_BIGINT_ARRAY = "_int8";
+ public static final String PG_REAL = "float4";
+ public static final String PG_REAL_ARRAY = "_float4";
+ public static final String PG_DOUBLE_PRECISION = "float8";
+ public static final String PG_DOUBLE_PRECISION_ARRAY = "_float8";
+ public static final String PG_NUMERIC = "numeric";
+ public static final String PG_NUMERIC_ARRAY = "_numeric";
+ public static final String PG_BOOLEAN = "bool";
+ public static final String PG_BOOLEAN_ARRAY = "_bool";
+ public static final String PG_TIMESTAMP = "timestamp";
+ public static final String PG_TIMESTAMP_ARRAY = "_timestamp";
+ public static final String PG_TIMESTAMPTZ = "timestamptz";
+ public static final String PG_TIMESTAMPTZ_ARRAY = "_timestamptz";
+ public static final String PG_DATE = "date";
+ public static final String PG_DATE_ARRAY = "_date";
+ public static final String PG_TIME = "time";
+ public static final String PG_TIME_ARRAY = "_time";
+ public static final String PG_TEXT = "text";
+ public static final String PG_TEXT_ARRAY = "_text";
+ public static final String PG_CHAR = "bpchar";
+ public static final String PG_CHAR_ARRAY = "_bpchar";
+ public static final String PG_CHARACTER = "character";
+ public static final String PG_CHARACTER_ARRAY = "_character";
+ public static final String PG_CHARACTER_VARYING = "varchar";
+ public static final String PG_CHARACTER_VARYING_ARRAY = "_varchar";
+
+ /**
+  * Maps one column of the given result set metadata to a Flink {@link DataType}.
+  *
+  * <p>The mapping is keyed on the column type name reported by the driver and compared
+  * against the {@code PG_*} constants of this class; array variants map to
+  * {@code DataTypes.ARRAY} of the corresponding element type.
+  *
+  * @param metadata metadata to read the column description from
+  * @param colIndex index of the column, passed unchanged to the metadata accessors
+  * @return the Flink data type for the column
+  * @throws SQLException if a metadata accessor fails
+  * @throws UnsupportedOperationException if the type name matches none of the handled cases
+  */
+ private DataType fromJDBCType(ResultSetMetaData metadata, int colIndex) throws SQLException {
+ String pgType = metadata.getColumnTypeName(colIndex);
+
+ // Read for every column, although only the numeric and char/varchar cases use it.
+ int precision = metadata.getPrecision(colIndex);
+
+ switch (pgType) {
+ case PG_BOOLEAN:
+ return DataTypes.BOOLEAN();
+ case PG_BOOLEAN_ARRAY:
+ return DataTypes.ARRAY(DataTypes.BOOLEAN());
+ case PG_BYTEA:
+ return DataTypes.BYTES();
+ case PG_BYTEA_ARRAY:
+ return DataTypes.ARRAY(DataTypes.BYTES());
+ case PG_SMALLINT:
+ return DataTypes.SMALLINT();
+ case PG_SMALLINT_ARRAY:
+ return DataTypes.ARRAY(DataTypes.SMALLINT());
+ case PG_INTEGER:
+ return DataTypes.INT();
+ case PG_INTEGER_ARRAY:
+ return DataTypes.ARRAY(DataTypes.INT());
+ case PG_BIGINT:
+ return DataTypes.BIGINT();
+ case PG_BIGINT_ARRAY:
+ return DataTypes.ARRAY(DataTypes.BIGINT());
+ case PG_REAL:
+ return DataTypes.FLOAT();
+ case PG_REAL_ARRAY:
+ return DataTypes.ARRAY(DataTypes.FLOAT());
+ case PG_DOUBLE_PRECISION:
+ return DataTypes.DOUBLE();
+ case PG_DOUBLE_PRECISION_ARRAY:
+ return DataTypes.ARRAY(DataTypes.DOUBLE());
+ // Numeric types take both precision and scale from the column metadata.
+ case PG_NUMERIC:
+ return DataTypes.DECIMAL(precision, metadata.getScale(colIndex));
+ case PG_NUMERIC_ARRAY:
+ return DataTypes.ARRAY(
+ DataTypes.DECIMAL(precision, metadata.getScale(colIndex)));
+ // For character types the reported precision is used as the length.
+ case PG_CHAR:
+ case PG_CHARACTER:
+ return DataTypes.CHAR(precision);
+ case PG_CHAR_ARRAY:
+ case PG_CHARACTER_ARRAY:
+ return DataTypes.ARRAY(DataTypes.CHAR(precision));
+ case PG_CHARACTER_VARYING:
+ return DataTypes.VARCHAR(precision);
+ case PG_CHARACTER_VARYING_ARRAY:
+ return DataTypes.ARRAY(DataTypes.VARCHAR(precision));
+ case PG_TEXT:
+ return DataTypes.STRING();
+ case PG_TEXT_ARRAY:
+ return DataTypes.ARRAY(DataTypes.STRING());
+ // Temporal types are mapped with Flink's default precision; the column precision is not applied.
+ case PG_TIMESTAMP:
+ return DataTypes.TIMESTAMP();
+ case PG_TIMESTAMP_ARRAY:
+ return DataTypes.ARRAY(DataTypes.TIMESTAMP());
+ case PG_TIMESTAMPTZ:
+ return DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE();
+ case PG_TIMESTAMPTZ_ARRAY:
+ return DataTypes.ARRAY(DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE());
+ case PG_TIME:
+ return DataTypes.TIME();
+ case PG_TIME_ARRAY:
+ return DataTypes.ARRAY(DataTypes.TIME());
+ case PG_DATE:
+ return DataTypes.DATE();
+ case PG_DATE_ARRAY:
+ return DataTypes.ARRAY(DataTypes.DATE());
+ default:
+ throw new UnsupportedOperationException(
+ String.format("Doesn't support Postgres type '%s' yet", pgType));
+ }
+ }
+
+ /**
+  * Tells whether the table is among the tables listed for its database.
+  *
+  * <p>The Flink table name is first converted to its full Postgres path, because
+  * {@link #listTables(String)} returns schema-qualified names.
+  *
+  * @return {@code false} if the database does not exist or the table is not listed
+  */
+ @Override
+ public boolean tableExists(ObjectPath tablePath) throws CatalogException {
+     final List<String> knownTables;
+     try {
+         knownTables = listTables(tablePath.getDatabaseName());
+     } catch (DatabaseNotExistException missingDatabase) {
+         return false;
+     }
+
+     final PostgresTablePath pgPath = PostgresTablePath.fromFlinkTableName(tablePath.getObjectName());
+     return knownTables.contains(pgPath.getFullPath());
+ }
+
+}
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresTablePath.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresTablePath.java
new file mode 100644
index 0000000..99cc2b4
--- /dev/null
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresTablePath.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc.catalog;
+
+import org.apache.flink.util.StringUtils;
+
+import java.util.Objects;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Path of a PostgreSQL table as it is addressed from Flink.
+ *
+ * <p>A Flink table name has the form "table_name" or "schema_name.table_name". When only
+ * "table_name" is given, the schema name defaults to "public".
+ */
+public class PostgresTablePath {
+
+    private static final String DEFAULT_POSTGRES_SCHEMA_NAME = "public";
+
+    private final String pgSchemaName;
+    private final String pgTableName;
+
+    /**
+     * Creates a table path from its schema and table parts.
+     *
+     * @param pgSchemaName Postgres schema name; must not be null or whitespace-only
+     * @param pgTableName Postgres table name; must not be null or whitespace-only
+     */
+    public PostgresTablePath(String pgSchemaName, String pgTableName) {
+        checkArgument(!StringUtils.isNullOrWhitespaceOnly(pgSchemaName));
+        checkArgument(!StringUtils.isNullOrWhitespaceOnly(pgTableName));
+
+        this.pgSchemaName = pgSchemaName;
+        this.pgTableName = pgTableName;
+    }
+
+    /**
+     * Parses a Flink table name into a table path.
+     *
+     * @param flinkTableName either "table_name" or "schema_name.table_name"
+     * @return the parsed path; the schema is "public" when the name contains no dot
+     */
+    public static PostgresTablePath fromFlinkTableName(String flinkTableName) {
+        // no dot: the whole name is the table name and the default schema applies
+        if (!flinkTableName.contains(".")) {
+            return new PostgresTablePath(DEFAULT_POSTGRES_SCHEMA_NAME, flinkTableName);
+        }
+
+        final String[] parts = flinkTableName.split("\\.");
+
+        // exactly one separator is allowed, i.e. exactly a schema part and a table part
+        checkArgument(parts.length == 2,
+            String.format("Table name '%s' is not valid. The parsed length is %d", flinkTableName, parts.length));
+
+        return new PostgresTablePath(parts[0], parts[1]);
+    }
+
+    /**
+     * Builds the Flink table name for the given schema and table.
+     *
+     * @param schema Postgres schema name
+     * @param table Postgres table name
+     * @return the name in the form "schema.table"
+     */
+    public static String toFlinkTableName(String schema, String table) {
+        final PostgresTablePath tablePath = new PostgresTablePath(schema, table);
+        return tablePath.getFullPath();
+    }
+
+    /**
+     * Returns the path in the form "schema.table".
+     */
+    public String getFullPath() {
+        return pgSchemaName + "." + pgTableName;
+    }
+
+    /**
+     * Returns the path in the form "schema.table", wrapped in a single pair of backticks.
+     */
+    public String getFullPathWithQuotes() {
+        return "`" + pgSchemaName + "." + pgTableName + "`";
+    }
+
+    @Override
+    public String toString() {
+        return getFullPathWithQuotes();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o == this) {
+            return true;
+        }
+
+        if (o == null || o.getClass() != getClass()) {
+            return false;
+        }
+
+        final PostgresTablePath other = (PostgresTablePath) o;
+        return Objects.equals(pgSchemaName, other.pgSchemaName)
+            && Objects.equals(pgTableName, other.pgTableName);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(pgSchemaName, pgTableName);
+    }
+}
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/dialect/JDBCDialects.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/dialect/JDBCDialects.java
index fa7192e..743d16a 100644
--- a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/dialect/JDBCDialects.java
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/dialect/JDBCDialects.java
@@ -203,7 +203,10 @@ public final class JDBCDialects {
}
}
- private static class MySQLDialect extends AbstractDialect {
+ /**
+ * MySQL dialect.
+ */
+ public static class MySQLDialect extends AbstractDialect {
private static final long serialVersionUID = 1L;
@@ -301,7 +304,10 @@ public final class JDBCDialects {
}
}
- private static class PostgresDialect extends AbstractDialect {
+ /**
+ * Postgres dialect.
+ */
+ public static class PostgresDialect extends AbstractDialect {
private static final long serialVersionUID = 1L;
diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/catalog/JDBCCatalogUtilsTest.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/catalog/JDBCCatalogUtilsTest.java
new file mode 100644
index 0000000..7a4132b
--- /dev/null
+++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/catalog/JDBCCatalogUtilsTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc.catalog;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+/**
+ * Tests the JDBC url validation of {@link JDBCCatalogUtils}.
+ */
+public class JDBCCatalogUtilsTest {
+
+    @Rule
+    public ExpectedException exception = ExpectedException.none();
+
+    /** Base urls are accepted both with and without the trailing slash. */
+    @Test
+    public void testJDBCUrl() {
+        final String[] baseUrls = {
+            "jdbc:postgresql://localhost:5432/",
+            "jdbc:postgresql://localhost:5432"
+        };
+
+        for (String baseUrl : baseUrls) {
+            JDBCCatalogUtils.validateJDBCUrl(baseUrl);
+        }
+    }
+
+    /** A url with an extra path segment after the port is rejected. */
+    @Test
+    public void testInvalidJDBCUrl() {
+        final String urlWithPathSegment = "jdbc:postgresql://localhost:5432/db";
+
+        exception.expect(IllegalArgumentException.class);
+        JDBCCatalogUtils.validateJDBCUrl(urlWithPathSegment);
+    }
+}
diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresCatalogITCase.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresCatalogITCase.java
new file mode 100644
index 0000000..e103780
--- /dev/null
+++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresCatalogITCase.java
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc.catalog;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+
+import com.opentable.db.postgres.junit.EmbeddedPostgresRules;
+import com.opentable.db.postgres.junit.SingleInstancePostgresRule;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Integration test for {@link PostgresCatalog}, run against an embedded Postgres instance.
+ *
+ * <p>{@link #setup()} creates the following fixture before any test runs:
+ * <ul>
+ *   <li>in the default database ({@link PostgresCatalog#DEFAULT_DATABASE}): table public.t1</li>
+ *   <li>in database "test": tables public.t2, test_schema.t3 and public.datatypes</li>
+ * </ul>
+ */
+public class PostgresCatalogITCase {
+    @Rule
+    public ExpectedException exception = ExpectedException.none();
+
+    @ClassRule
+    public static SingleInstancePostgresRule pg = EmbeddedPostgresRules.singleInstance();
+
+    protected static final String TEST_USERNAME = "postgres";
+    protected static final String TEST_PWD = "postgres";
+    protected static final String TEST_DB = "test";
+    protected static final String TEST_SCHEMA = "test_schema";
+    protected static final String TABLE1 = "t1";
+    protected static final String TABLE2 = "t2";
+    protected static final String TABLE3 = "t3";
+
+    protected static String baseUrl;
+    protected static Catalog catalog;
+
+    /**
+     * Creates the catalog under test.
+     *
+     * @param name name of the catalog
+     * @param defaultDb default database of the catalog
+     * @param username user to connect with
+     * @param pwd password of the user
+     * @param jdbcUrl base JDBC url of the Postgres instance
+     * @return a new {@link PostgresCatalog} built from the given arguments
+     */
+    public static Catalog createCatalog(String name, String defaultDb, String username, String pwd, String jdbcUrl) {
+        return new PostgresCatalog(name, defaultDb, username, pwd, jdbcUrl);
+    }
+
+    /**
+     * Derives the base url from the embedded instance, creates the catalog and builds the
+     * database, schema and tables the tests rely on.
+     */
+    @BeforeClass
+    public static void setup() throws SQLException {
+        // e.g. jdbc:postgresql://localhost:50807/postgres?user=postgres
+        String embeddedJdbcUrl = pg.getEmbeddedPostgres().getJdbcUrl(TEST_USERNAME, TEST_PWD);
+        // keep everything up to and including the last '/', e.g. jdbc:postgresql://localhost:50807/
+        baseUrl = embeddedJdbcUrl.substring(0, embeddedJdbcUrl.lastIndexOf("/") + 1);
+
+        catalog = createCatalog("mypg", PostgresCatalog.DEFAULT_DATABASE, TEST_USERNAME, TEST_PWD, baseUrl);
+
+        // create test database and schema
+        createDatabase(TEST_DB);
+        createSchema(TEST_DB, TEST_SCHEMA);
+
+        // create test tables
+        // table in the default database: public.t1
+        createTable(PostgresTablePath.fromFlinkTableName(TABLE1), getSimpleTable().pgSchemaSql);
+
+        // table: test.public.t2
+        // table: test.test_schema.t3
+        // table: test.public.datatypes
+        createTable(TEST_DB, PostgresTablePath.fromFlinkTableName(TABLE2), getSimpleTable().pgSchemaSql);
+        createTable(TEST_DB, new PostgresTablePath(TEST_SCHEMA, TABLE3), getSimpleTable().pgSchemaSql);
+        createTable(TEST_DB, PostgresTablePath.fromFlinkTableName("datatypes"), getDataTypesTable().pgSchemaSql);
+    }
+
+    // ------ databases ------
+
+    @Test
+    public void testGetDb_DatabaseNotExistException() throws Exception {
+        exception.expect(DatabaseNotExistException.class);
+        exception.expectMessage("Database nonexistent does not exist in Catalog");
+        catalog.getDatabase("nonexistent");
+    }
+
+    @Test
+    public void testListDatabases() {
+        List<String> actual = catalog.listDatabases();
+
+        assertEquals(
+            Arrays.asList("postgres", "test"),
+            actual
+        );
+    }
+
+    @Test
+    public void testDbExists() throws Exception {
+        assertFalse(catalog.databaseExists("nonexistent"));
+
+        assertTrue(catalog.databaseExists(PostgresCatalog.DEFAULT_DATABASE));
+    }
+
+    // ------ tables ------
+
+    @Test
+    public void testListTables() throws DatabaseNotExistException {
+        List<String> actual = catalog.listTables(PostgresCatalog.DEFAULT_DATABASE);
+
+        assertEquals(Arrays.asList("public.t1"), actual);
+
+        actual = catalog.listTables(TEST_DB);
+
+        assertEquals(Arrays.asList("public.datatypes", "public.t2", "test_schema.t3"), actual);
+    }
+
+    @Test
+    public void testListTables_DatabaseNotExistException() throws DatabaseNotExistException {
+        exception.expect(DatabaseNotExistException.class);
+        catalog.listTables("postgres/nonexistschema");
+    }
+
+    @Test
+    public void testTableExists() {
+        assertFalse(catalog.tableExists(new ObjectPath(TEST_DB, "nonexist")));
+
+        assertTrue(catalog.tableExists(new ObjectPath(PostgresCatalog.DEFAULT_DATABASE, TABLE1)));
+        assertTrue(catalog.tableExists(new ObjectPath(TEST_DB, TABLE2)));
+        assertTrue(catalog.tableExists(new ObjectPath(TEST_DB, "test_schema.t3")));
+    }
+
+    @Test
+    public void testGetTables_TableNotExistException() throws TableNotExistException {
+        exception.expect(TableNotExistException.class);
+        catalog.getTable(new ObjectPath(TEST_DB, PostgresTablePath.toFlinkTableName(TEST_SCHEMA, "anytable")));
+    }
+
+    @Test
+    public void testGetTables_TableNotExistException_NoSchema() throws TableNotExistException {
+        exception.expect(TableNotExistException.class);
+        catalog.getTable(new ObjectPath(TEST_DB, PostgresTablePath.toFlinkTableName("nonexistschema", "anytable")));
+    }
+
+    @Test
+    public void testGetTables_TableNotExistException_NoDb() throws TableNotExistException {
+        exception.expect(TableNotExistException.class);
+        catalog.getTable(new ObjectPath("nonexistdb", PostgresTablePath.toFlinkTableName(TEST_SCHEMA, "anytable")));
+    }
+
+    @Test
+    public void testGetTable() throws TableNotExistException {
+        // test postgres.public.t1, addressed without and with the schema part
+        TableSchema schema = getSimpleTable().schema;
+
+        CatalogBaseTable table = catalog.getTable(new ObjectPath("postgres", TABLE1));
+
+        assertEquals(schema, table.getSchema());
+
+        table = catalog.getTable(new ObjectPath("postgres", "public.t1"));
+
+        assertEquals(schema, table.getSchema());
+
+        // test test.public.t2, addressed without and with the schema part
+        table = catalog.getTable(new ObjectPath(TEST_DB, TABLE2));
+
+        assertEquals(schema, table.getSchema());
+
+        table = catalog.getTable(new ObjectPath(TEST_DB, "public.t2"));
+
+        assertEquals(schema, table.getSchema());
+
+        // test test.test_schema.t3
+        table = catalog.getTable(new ObjectPath(TEST_DB, TEST_SCHEMA + ".t3"));
+
+        assertEquals(schema, table.getSchema());
+
+    }
+
+    @Test
+    public void testDataTypes() throws TableNotExistException {
+        CatalogBaseTable table = catalog.getTable(new ObjectPath(TEST_DB, "datatypes"));
+
+        assertEquals(getDataTypesTable().schema, table.getSchema());
+    }
+
+    /**
+     * Pairs the Flink schema expected from the catalog with the Postgres column definitions
+     * used to create the table.
+     */
+    private static class TestTable {
+        final TableSchema schema;
+        final String pgSchemaSql;
+
+        public TestTable(TableSchema schema, String pgSchemaSql) {
+            this.schema = schema;
+            this.pgSchemaSql = pgSchemaSql;
+        }
+    }
+
+    /** Table with a single integer column; used for t1, t2 and t3. */
+    private static TestTable getSimpleTable() {
+        return new TestTable(
+            TableSchema.builder()
+                .field("name", DataTypes.INT())
+                .build(),
+            "name integer"
+        );
+    }
+
+    /**
+     * Table with one scalar and one array column per Postgres type covered by the test. The
+     * fields of the expected schema are listed in the same order as the column definitions.
+     */
+    private static TestTable getDataTypesTable() {
+        return new TestTable(
+            TableSchema.builder()
+                .field("int", DataTypes.INT())
+                .field("int_arr", DataTypes.ARRAY(DataTypes.INT()))
+                .field("bytea", DataTypes.BYTES())
+                .field("bytea_arr", DataTypes.ARRAY(DataTypes.BYTES()))
+                .field("short", DataTypes.SMALLINT())
+                .field("short_arr", DataTypes.ARRAY(DataTypes.SMALLINT()))
+                .field("long", DataTypes.BIGINT())
+                .field("long_arr", DataTypes.ARRAY(DataTypes.BIGINT()))
+                .field("real", DataTypes.FLOAT())
+                .field("real_arr", DataTypes.ARRAY(DataTypes.FLOAT()))
+                .field("double_precision", DataTypes.DOUBLE())
+                .field("double_precision_arr", DataTypes.ARRAY(DataTypes.DOUBLE()))
+                .field("numeric", DataTypes.DECIMAL(10, 5))
+                .field("numeric_arr", DataTypes.ARRAY(DataTypes.DECIMAL(10, 5)))
+                .field("boolean", DataTypes.BOOLEAN())
+                .field("boolean_arr", DataTypes.ARRAY(DataTypes.BOOLEAN()))
+                .field("text", DataTypes.STRING())
+                .field("text_arr", DataTypes.ARRAY(DataTypes.STRING()))
+                .field("char", DataTypes.CHAR(1))
+                .field("char_arr", DataTypes.ARRAY(DataTypes.CHAR(1)))
+                .field("character", DataTypes.CHAR(3))
+                .field("character_arr", DataTypes.ARRAY(DataTypes.CHAR(3)))
+                .field("character_varying", DataTypes.VARCHAR(20))
+                .field("character_varying_arr", DataTypes.ARRAY(DataTypes.VARCHAR(20)))
+                .field("timestamp", DataTypes.TIMESTAMP())
+                .field("timestamp_arr", DataTypes.ARRAY(DataTypes.TIMESTAMP()))
+                .field("timestamptz", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE())
+                .field("timestamptz_arr", DataTypes.ARRAY(DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()))
+                .field("date", DataTypes.DATE())
+                .field("date_arr", DataTypes.ARRAY(DataTypes.DATE()))
+                .field("time", DataTypes.TIME())
+                .field("time_arr", DataTypes.ARRAY(DataTypes.TIME()))
+                .build(),
+            "int integer, " +
+                "int_arr integer[], " +
+                "bytea bytea, " +
+                "bytea_arr bytea[], " +
+                "short smallint, " +
+                "short_arr smallint[], " +
+                "long bigint, " +
+                "long_arr bigint[], " +
+                "real real, " +
+                "real_arr real[], " +
+                "double_precision double precision, " +
+                "double_precision_arr double precision[], " +
+                "numeric numeric(10, 5), " +
+                "numeric_arr numeric(10, 5)[], " +
+                "boolean boolean, " +
+                "boolean_arr boolean[], " +
+                "text text, " +
+                "text_arr text[], " +
+                "char char, " +
+                "char_arr char[], " +
+                "character character(3), " +
+                "character_arr character(3)[], " +
+                "character_varying character varying(20), " +
+                "character_varying_arr character varying(20)[], " +
+                "timestamp timestamp(6), " +
+                "timestamp_arr timestamp(6)[], " +
+                "timestamptz timestamptz, " +
+                "timestamptz_arr timestamptz[], " +
+                "date date, " +
+                "date_arr date[], " +
+                "time time(6), " +
+                "time_arr time(6)[]"
+        );
+    }
+
+    /** Creates a table in the default database. */
+    private static void createTable(PostgresTablePath tablePath, String tableSchemaSql) throws SQLException {
+        createTable(PostgresCatalog.DEFAULT_DATABASE, tablePath, tableSchemaSql);
+    }
+
+    /** Creates a table with the given column definitions in the given database. */
+    private static void createTable(String db, PostgresTablePath tablePath, String tableSchemaSql) throws SQLException {
+        executeSQL(db, String.format("CREATE TABLE %s(%s);", tablePath.getFullPath(), tableSchemaSql));
+    }
+
+    /** Creates a schema in the given database. */
+    private static void createSchema(String db, String schema) throws SQLException {
+        executeSQL(db, String.format("CREATE SCHEMA %s", schema));
+    }
+
+    /** Creates a database, connecting through the base url without a database name. */
+    private static void createDatabase(String database) throws SQLException {
+        executeSQL(String.format("CREATE DATABASE %s;", database));
+    }
+
+    /** Runs the statement over a connection to the base url, with no database name appended. */
+    private static void executeSQL(String sql) throws SQLException {
+        executeSQL("", sql);
+    }
+
+    /**
+     * Runs the statement over a connection to {@code baseUrl + db}. The connection and the
+     * statement are closed when the call returns; a failure surfaces as {@link SQLException}.
+     */
+    private static void executeSQL(String db, String sql) throws SQLException {
+        try (Connection conn = DriverManager.getConnection(baseUrl + db, TEST_USERNAME, TEST_PWD);
+                Statement statement = conn.createStatement()) {
+            statement.executeUpdate(sql);
+        }
+    }
+
+}
diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresTablePathTest.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresTablePathTest.java
new file mode 100644
index 0000000..46f32bc
--- /dev/null
+++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresTablePathTest.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc.catalog;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Test for {@link PostgresTablePath}.
+ */
+public class PostgresTablePathTest {
+
+    /** A "schema.table" name is split into its schema part and its table part. */
+    @Test
+    public void testFromFlinkTableName() {
+        assertEquals(new PostgresTablePath("public", "topic"), PostgresTablePath.fromFlinkTableName("public.topic"));
+
+        // a non-default schema shows the schema really comes from the name, not from the default
+        assertEquals(
+            new PostgresTablePath("test_schema", "topic"),
+            PostgresTablePath.fromFlinkTableName("test_schema.topic"));
+    }
+
+    /** A name without a schema part resolves to the "public" schema. */
+    @Test
+    public void testFromFlinkTableNameWithoutSchema() {
+        assertEquals(new PostgresTablePath("public", "topic"), PostgresTablePath.fromFlinkTableName("topic"));
+        assertEquals("public.topic", PostgresTablePath.fromFlinkTableName("topic").getFullPath());
+    }
+}