You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by bl...@apache.org on 2020/03/20 23:44:42 UTC

[flink] branch master updated (8a27bd9 -> 75ad29c)

This is an automated email from the ASF dual-hosted git repository.

bli pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 8a27bd9  [FLINK-16691][python][docs] Improve Python UDF documentation to remind users to install PyFlink on the cluster
     new 74b8bde  [FLINK-16471][jdbc] develop JDBCCatalog and PostgresCatalog
     new 75ad29c  [FLINK-16472] support precision of timestamp and time data types

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../src/test/resources/log4j2-test.properties      |  28 --
 flink-connectors/flink-jdbc/pom.xml                |  36 ++-
 .../java/io/jdbc/catalog/AbstractJDBCCatalog.java  | 277 ++++++++++++++++++
 .../api/java/io/jdbc/catalog/JDBCCatalog.java      |  84 ++++++
 .../api/java/io/jdbc/catalog/JDBCCatalogUtils.java |  54 ++++
 .../api/java/io/jdbc/catalog/PostgresCatalog.java  | 322 ++++++++++++++++++++
 .../java/io/jdbc/catalog/PostgresTablePath.java    |  95 ++++++
 .../api/java/io/jdbc/dialect/JDBCDialects.java     |  10 +-
 .../java/io/jdbc/catalog/JDBCCatalogUtilsTest.java |  25 +-
 .../io/jdbc/catalog/PostgresCatalogITCase.java     | 325 +++++++++++++++++++++
 .../io/jdbc/catalog/PostgresTablePathTest.java     |  12 +-
 11 files changed, 1222 insertions(+), 46 deletions(-)
 delete mode 100644 flink-connectors/flink-connector-elasticsearch2/src/test/resources/log4j2-test.properties
 create mode 100644 flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/AbstractJDBCCatalog.java
 create mode 100644 flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/JDBCCatalog.java
 create mode 100644 flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/JDBCCatalogUtils.java
 create mode 100644 flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresCatalog.java
 create mode 100644 flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresTablePath.java
 copy flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionNotFoundException.java => flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/catalog/JDBCCatalogUtilsTest.java (60%)
 create mode 100644 flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresCatalogITCase.java
 copy flink-table/flink-table-common/src/test/java/org/apache/flink/table/module/CoreModuleTest.java => flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresTablePathTest.java (72%)


[flink] 01/02: [FLINK-16471][jdbc] develop JDBCCatalog and PostgresCatalog

Posted by bl...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 74b8bdee9fb0bf7cfc27ca8d992dac2a07473a0c
Author: bowen.li <bo...@gmail.com>
AuthorDate: Fri Mar 6 14:05:27 2020 -0800

    [FLINK-16471][jdbc] develop JDBCCatalog and PostgresCatalog
    
    closes #11336
---
 .../src/test/resources/log4j2-test.properties      |  28 --
 flink-connectors/flink-jdbc/pom.xml                |  36 ++-
 .../java/io/jdbc/catalog/AbstractJDBCCatalog.java  | 277 ++++++++++++++++++
 .../api/java/io/jdbc/catalog/JDBCCatalog.java      |  84 ++++++
 .../api/java/io/jdbc/catalog/JDBCCatalogUtils.java |  54 ++++
 .../api/java/io/jdbc/catalog/PostgresCatalog.java  | 323 ++++++++++++++++++++
 .../java/io/jdbc/catalog/PostgresTablePath.java    |  95 ++++++
 .../api/java/io/jdbc/dialect/JDBCDialects.java     |  10 +-
 .../java/io/jdbc/catalog/JDBCCatalogUtilsTest.java |  44 +++
 .../io/jdbc/catalog/PostgresCatalogITCase.java     | 325 +++++++++++++++++++++
 .../io/jdbc/catalog/PostgresTablePathTest.java     |  33 +++
 11 files changed, 1275 insertions(+), 34 deletions(-)

diff --git a/flink-connectors/flink-connector-elasticsearch2/src/test/resources/log4j2-test.properties b/flink-connectors/flink-connector-elasticsearch2/src/test/resources/log4j2-test.properties
deleted file mode 100644
index 835c2ec..0000000
--- a/flink-connectors/flink-connector-elasticsearch2/src/test/resources/log4j2-test.properties
+++ /dev/null
@@ -1,28 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-# Set root logger level to OFF to not flood build logs
-# set manually to INFO for debugging purposes
-rootLogger.level = OFF
-rootLogger.appenderRef.test.ref = TestLogger
-
-appender.testlogger.name = TestLogger
-appender.testlogger.type = CONSOLE
-appender.testlogger.target = SYSTEM_ERR
-appender.testlogger.layout.type = PatternLayout
-appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n
diff --git a/flink-connectors/flink-jdbc/pom.xml b/flink-connectors/flink-jdbc/pom.xml
index 3e83311..cb7afab 100644
--- a/flink-connectors/flink-jdbc/pom.xml
+++ b/flink-connectors/flink-jdbc/pom.xml
@@ -35,6 +35,11 @@ under the License.
 
 	<packaging>jar</packaging>
 
+	<properties>
+		<postgres.version>42.2.10</postgres.version>
+		<otj-pg-embedded.version>0.13.3</otj-pg-embedded.version>
+	</properties>
+
 	<dependencies>
 		<!-- Table ecosystem -->
 		<!-- Projects depending on this project won't depend on flink-table-*. -->
@@ -53,13 +58,17 @@ under the License.
 			<scope>provided</scope>
 		</dependency>
 
+		<!-- Postgres dependencies -->
+
 		<dependency>
-			<groupId>org.apache.derby</groupId>
-			<artifactId>derby</artifactId>
-			<version>10.14.2.0</version>
-			<scope>test</scope>
+			<groupId>org.postgresql</groupId>
+			<artifactId>postgresql</artifactId>
+			<version>${postgres.version}</version>
+			<scope>provided</scope>
 		</dependency>
 
+		<!-- test dependencies -->
+
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-test-utils_${scala.binary.version}</artifactId>
@@ -89,5 +98,24 @@ under the License.
 			<version>${project.version}</version>
 			<scope>test</scope>
 		</dependency>
+
+		<!-- Postgres test dependencies -->
+
+		<dependency>
+			<groupId>com.opentable.components</groupId>
+			<artifactId>otj-pg-embedded</artifactId>
+			<version>${otj-pg-embedded.version}</version>
+			<scope>test</scope>
+		</dependency>
+
+		<!-- Derby test dependencies -->
+
+		<dependency>
+			<groupId>org.apache.derby</groupId>
+			<artifactId>derby</artifactId>
+			<version>10.14.2.0</version>
+			<scope>test</scope>
+		</dependency>
+
 	</dependencies>
 </project>
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/AbstractJDBCCatalog.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/AbstractJDBCCatalog.java
new file mode 100644
index 0000000..523de83
--- /dev/null
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/AbstractJDBCCatalog.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc.catalog;
+
+import org.apache.flink.api.java.io.jdbc.JDBCTableSourceSinkFactory;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.AbstractCatalog;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.CatalogPartition;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
+import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
+import org.apache.flink.table.catalog.exceptions.TablePartitionedException;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.factories.TableFactory;
+import org.apache.flink.util.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Abstract catalog for any JDBC catalogs.
+ */
+public abstract class AbstractJDBCCatalog extends AbstractCatalog {
+
+	private static final Logger LOG = LoggerFactory.getLogger(AbstractJDBCCatalog.class);
+
+	protected final String username;
+	protected final String pwd;
+	protected final String baseUrl;
+	protected final String defaultUrl;
+
+	/**
+	 * Creates a JDBC catalog and derives the connection URLs used by subclasses.
+	 *
+	 * @param catalogName name of the catalog, handed to {@link AbstractCatalog}
+	 * @param defaultDatabase default database; appended to the base URL to form {@code defaultUrl}
+	 * @param username JDBC user name, must not be null or blank
+	 * @param pwd JDBC password, must not be null or blank
+	 * @param baseUrl JDBC URL without a database name, with or without a trailing slash
+	 * @throws IllegalArgumentException if an argument is blank or the URL fails
+	 *     {@link JDBCCatalogUtils#validateJDBCUrl(String)}
+	 */
+	public AbstractJDBCCatalog(String catalogName, String defaultDatabase, String username, String pwd, String baseUrl) {
+		super(catalogName, defaultDatabase);
+
+		checkArgument(!StringUtils.isNullOrWhitespaceOnly(username));
+		checkArgument(!StringUtils.isNullOrWhitespaceOnly(pwd));
+		checkArgument(!StringUtils.isNullOrWhitespaceOnly(baseUrl));
+
+		JDBCCatalogUtils.validateJDBCUrl(baseUrl);
+
+		this.username = username;
+		this.pwd = pwd;
+		this.baseUrl = baseUrl.endsWith("/") ? baseUrl : baseUrl + "/";
+		// Build from the normalized field, not the raw argument: a base URL passed without a
+		// trailing slash would otherwise yield e.g. "jdbc:postgresql://localhost:5432postgres".
+		this.defaultUrl = this.baseUrl + defaultDatabase;
+	}
+
+	/**
+	 * Probes the default database so that a misconfigured catalog fails early.
+	 *
+	 * @throws ValidationException if no JDBC connection to {@code defaultUrl} can be established
+	 */
+	@Override
+	public void open() throws CatalogException {
+		final String url = defaultUrl;
+
+		// The connection is only a reachability probe; it is closed again right away.
+		try (Connection probe = DriverManager.getConnection(url, username, pwd)) {
+		} catch (SQLException e) {
+			String message = String.format("Failed connecting to %s via JDBC.", url);
+			throw new ValidationException(message, e);
+		}
+
+		LOG.info("Catalog {} established connection to {}", getName(), url);
+	}
+
+	@Override
+	public void close() throws CatalogException {
+		LOG.info("Catalog {} closing", getName());
+	}
+
+	// ------ table factory ------
+
+	public Optional<TableFactory> getTableFactory() {
+		return Optional.of(new JDBCTableSourceSinkFactory());
+	}
+
+	// ------ databases ------
+
+	@Override
+	public boolean databaseExists(String databaseName) throws CatalogException {
+		checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName));
+
+		return listDatabases().contains(databaseName);
+	}
+
+	@Override
+	public void createDatabase(String name, CatalogDatabase database, boolean ignoreIfExists) throws DatabaseAlreadyExistException, CatalogException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade) throws DatabaseNotExistException, DatabaseNotEmptyException, CatalogException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public void alterDatabase(String name, CatalogDatabase newDatabase, boolean ignoreIfNotExists) throws DatabaseNotExistException, CatalogException {
+		throw new UnsupportedOperationException();
+	}
+
+	// ------ tables and views ------
+
+	@Override
+	public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists) throws TableNotExistException, TableAlreadyExistException, CatalogException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists) throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public List<String> listViews(String databaseName) throws DatabaseNotExistException, CatalogException {
+		return Collections.emptyList();
+	}
+
+	// ------ partitions ------
+
+	@Override
+	public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath) throws TableNotExistException, TableNotPartitionedException, CatalogException {
+		return Collections.emptyList();
+	}
+
+	@Override
+	public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws TableNotExistException, TableNotPartitionedException, CatalogException {
+		return Collections.emptyList();
+	}
+
+	@Override
+	public List<CatalogPartitionSpec> listPartitionsByFilter(ObjectPath tablePath, List<Expression> filters) throws TableNotExistException, TableNotPartitionedException, CatalogException {
+		return Collections.emptyList();
+	}
+
+	@Override
+	public CatalogPartition getPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws PartitionNotExistException, CatalogException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public boolean partitionExists(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws CatalogException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public void createPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogPartition partition, boolean ignoreIfExists) throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, PartitionAlreadyExistsException, CatalogException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public void dropPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, boolean ignoreIfNotExists) throws PartitionNotExistException, CatalogException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public void alterPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogPartition newPartition, boolean ignoreIfNotExists) throws PartitionNotExistException, CatalogException {
+		throw new UnsupportedOperationException();
+	}
+
+	// ------ functions ------
+
+	@Override
+	public List<String> listFunctions(String dbName) throws DatabaseNotExistException, CatalogException {
+		return Collections.emptyList();
+	}
+
+	@Override
+	public CatalogFunction getFunction(ObjectPath functionPath) throws FunctionNotExistException, CatalogException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public boolean functionExists(ObjectPath functionPath) throws CatalogException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public void createFunction(ObjectPath functionPath, CatalogFunction function, boolean ignoreIfExists) throws FunctionAlreadyExistException, DatabaseNotExistException, CatalogException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public void alterFunction(ObjectPath functionPath, CatalogFunction newFunction, boolean ignoreIfNotExists) throws FunctionNotExistException, CatalogException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public void dropFunction(ObjectPath functionPath, boolean ignoreIfNotExists) throws FunctionNotExistException, CatalogException {
+		throw new UnsupportedOperationException();
+	}
+
+	// ------ stats ------
+
+	@Override
+	public CatalogTableStatistics getTableStatistics(ObjectPath tablePath) throws TableNotExistException, CatalogException {
+		return CatalogTableStatistics.UNKNOWN;
+	}
+
+	@Override
+	public CatalogColumnStatistics getTableColumnStatistics(ObjectPath tablePath) throws TableNotExistException, CatalogException {
+		return CatalogColumnStatistics.UNKNOWN;
+	}
+
+	@Override
+	public CatalogTableStatistics getPartitionStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws PartitionNotExistException, CatalogException {
+		return CatalogTableStatistics.UNKNOWN;
+	}
+
+	@Override
+	public CatalogColumnStatistics getPartitionColumnStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws PartitionNotExistException, CatalogException {
+		return CatalogColumnStatistics.UNKNOWN;
+	}
+
+	@Override
+	public void alterTableStatistics(ObjectPath tablePath, CatalogTableStatistics tableStatistics, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public void alterTableColumnStatistics(ObjectPath tablePath, CatalogColumnStatistics columnStatistics, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException, TablePartitionedException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public void alterPartitionStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogTableStatistics partitionStatistics, boolean ignoreIfNotExists) throws PartitionNotExistException, CatalogException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public void alterPartitionColumnStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogColumnStatistics columnStatistics, boolean ignoreIfNotExists) throws PartitionNotExistException, CatalogException {
+		throw new UnsupportedOperationException();
+	}
+}
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/JDBCCatalog.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/JDBCCatalog.java
new file mode 100644
index 0000000..629412c
--- /dev/null
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/JDBCCatalog.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc.catalog;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * Catalog for relational databases that are reached via JDBC.
+ *
+ * <p>All metadata reads are forwarded to a catalog instance obtained from
+ * {@link JDBCCatalogUtils#createCatalog(String, String, String, String, String)}.
+ */
+@PublicEvolving
+public class JDBCCatalog extends AbstractJDBCCatalog {
+
+	private static final Logger LOG = LoggerFactory.getLogger(JDBCCatalog.class);
+
+	/** Catalog that the read operations below delegate to. */
+	private final Catalog internal;
+
+	/**
+	 * Validates the arguments through the parent constructor and then creates the delegate catalog
+	 * from the same arguments.
+	 */
+	public JDBCCatalog(String catalogName, String defaultDatabase, String username, String pwd, String baseUrl) {
+		super(catalogName, defaultDatabase, username, pwd, baseUrl);
+
+		this.internal = JDBCCatalogUtils.createCatalog(catalogName, defaultDatabase, username, pwd, baseUrl);
+	}
+
+	// ------ databases -----
+
+	/** Delegates to the internal catalog. */
+	@Override
+	public List<String> listDatabases() throws CatalogException {
+		return internal.listDatabases();
+	}
+
+	/** Delegates to the internal catalog. */
+	@Override
+	public CatalogDatabase getDatabase(String databaseName) throws DatabaseNotExistException, CatalogException {
+		return internal.getDatabase(databaseName);
+	}
+
+	// ------ tables and views ------
+
+	/** Delegates to the internal catalog. */
+	@Override
+	public List<String> listTables(String databaseName) throws DatabaseNotExistException, CatalogException {
+		return internal.listTables(databaseName);
+	}
+
+	/** Delegates to the internal catalog. */
+	@Override
+	public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistException, CatalogException {
+		return internal.getTable(tablePath);
+	}
+
+	/**
+	 * Returns whether the table's database exists and its table listing contains the object name.
+	 * A database that is reported missing while listing its tables counts as "table does not exist".
+	 */
+	@Override
+	public boolean tableExists(ObjectPath tablePath) throws CatalogException {
+		try {
+			if (!databaseExists(tablePath.getDatabaseName())) {
+				return false;
+			}
+
+			List<String> tablesOfDatabase = listTables(tablePath.getDatabaseName());
+			return tablesOfDatabase.contains(tablePath.getObjectName());
+		} catch (DatabaseNotExistException e) {
+			return false;
+		}
+	}
+}
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/JDBCCatalogUtils.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/JDBCCatalogUtils.java
new file mode 100644
index 0000000..b9e3a19
--- /dev/null
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/JDBCCatalogUtils.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc.catalog;
+
+import org.apache.flink.api.java.io.jdbc.dialect.JDBCDialect;
+import org.apache.flink.api.java.io.jdbc.dialect.JDBCDialects;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Utils for {@link JDBCCatalog}.
+ */
+public class JDBCCatalogUtils {
+	/**
+	 * Checks that the URL carries no database name.
+	 *
+	 * <p>URL has to be without database, like "jdbc:postgresql://localhost:5432/" or "jdbc:postgresql://localhost:5432"
+	 * rather than "jdbc:postgresql://localhost:5432/db".
+	 *
+	 * @param url JDBC URL to check, must not be null
+	 * @throws IllegalArgumentException if the URL does not split into exactly a scheme and a host part
+	 */
+	public static void validateJDBCUrl(String url) {
+		// Splitting on runs of '/' leaves {"jdbc:<dialect>:", "<host>:<port>"} for a valid base URL;
+		// trailing empty strings are dropped by split(), so a trailing slash is accepted.
+		String[] parts = url.trim().split("\\/+");
+
+		checkArgument(
+			parts.length == 2,
+			"JDBC base url must not contain a database name, e.g. 'jdbc:postgresql://localhost:5432/'.");
+	}
+
+	/**
+	 * Create catalog instance from given information.
+	 *
+	 * @return the catalog matching the JDBC dialect of {@code baseUrl}
+	 * @throws UnsupportedOperationException if no dialect matches the URL or the dialect has no catalog yet
+	 */
+	public static AbstractJDBCCatalog createCatalog(String catalogName, String defaultDatabase, String username, String pwd, String baseUrl) {
+		// Fail with a descriptive message instead of the bare NoSuchElementException of Optional.get().
+		JDBCDialect dialect = JDBCDialects.get(baseUrl)
+			.orElseThrow(() -> new UnsupportedOperationException(
+				String.format("No JDBC dialect found for url '%s'.", baseUrl)));
+
+		if (dialect instanceof JDBCDialects.PostgresDialect) {
+			return new PostgresCatalog(catalogName, defaultDatabase, username, pwd, baseUrl);
+		} else {
+			throw new UnsupportedOperationException(
+				String.format("Catalog for '%s' is not supported yet.", dialect)
+			);
+		}
+	}
+}
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresCatalog.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresCatalog.java
new file mode 100644
index 0000000..d12f254
--- /dev/null
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresCatalog.java
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc.catalog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogDatabaseImpl;
+import org.apache.flink.table.catalog.CatalogTableImpl;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.types.DataType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Catalog for PostgreSQL.
+ */
+@Internal
+public class PostgresCatalog extends AbstractJDBCCatalog {
+
+	private static final Logger LOG = LoggerFactory.getLogger(PostgresCatalog.class);
+
+	public static final String POSTGRES_TABLE_TYPE = "postgres";
+
+	public static final String DEFAULT_DATABASE = "postgres";
+
+	// ------ Postgres default objects that shouldn't be exposed to users ------
+
+	private static final Set<String> builtinDatabases = new HashSet<String>() {{
+		add("template0");
+		add("template1");
+	}};
+
+	private static final Set<String> builtinSchemas = new HashSet<String>() {{
+		add("pg_toast");
+		add("pg_temp_1");
+		add("pg_toast_temp_1");
+		add("pg_catalog");
+		add("information_schema");
+	}};
+
+	protected PostgresCatalog(String catalogName, String defaultDatabase, String username, String pwd, String baseUrl) {
+		super(catalogName, defaultDatabase, username, pwd, baseUrl);
+	}
+
+	// ------ databases ------
+
+	/**
+	 * Lists the databases of the Postgres instance, leaving out the names in {@code builtinDatabases}.
+	 *
+	 * @return database names read from {@code pg_database}
+	 * @throws CatalogException if connecting or querying fails
+	 */
+	@Override
+	public List<String> listDatabases() throws CatalogException {
+		List<String> pgDatabases = new ArrayList<>();
+
+		// Statement and result set are declared as resources so they are released deterministically
+		// rather than only as a side effect of closing the connection.
+		try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd);
+			PreparedStatement ps = conn.prepareStatement("SELECT datname FROM pg_database;");
+			ResultSet rs = ps.executeQuery()) {
+
+			while (rs.next()) {
+				String dbName = rs.getString(1);
+				if (!builtinDatabases.contains(dbName)) {
+					pgDatabases.add(dbName);
+				}
+			}
+
+			return pgDatabases;
+		} catch (Exception e) {
+			throw new CatalogException(
+				String.format("Failed listing database in catalog %s", getName()), e);
+		}
+	}
+
+	/**
+	 * Returns an empty database descriptor (no properties, no comment) for an existing database.
+	 *
+	 * @throws DatabaseNotExistException if {@link #listDatabases()} does not contain the name
+	 */
+	@Override
+	public CatalogDatabase getDatabase(String databaseName) throws DatabaseNotExistException, CatalogException {
+		boolean known = listDatabases().contains(databaseName);
+		if (!known) {
+			throw new DatabaseNotExistException(getName(), databaseName);
+		}
+
+		return new CatalogDatabaseImpl(Collections.emptyMap(), null);
+	}
+
+	// ------ tables ------
+
+	/**
+	 * Lists the base tables of a database as {@code <schema>.<table>} names.
+	 *
+	 * <p>Schemas contained in {@code builtinSchemas} are skipped.
+	 *
+	 * @param databaseName database to connect to
+	 * @return table names, grouped by schema in the order the schemas were returned
+	 * @throws DatabaseNotExistException if the database is not known to this catalog
+	 * @throws CatalogException if connecting or querying fails
+	 */
+	@Override
+	public List<String> listTables(String databaseName) throws DatabaseNotExistException, CatalogException {
+		if (!databaseExists(databaseName)) {
+			throw new DatabaseNotExistException(getName(), databaseName);
+		}
+
+		try (Connection conn = DriverManager.getConnection(baseUrl + databaseName, username, pwd)) {
+			// get all schemas
+			List<String> schemas = new ArrayList<>();
+
+			try (PreparedStatement ps = conn.prepareStatement("SELECT schema_name FROM information_schema.schemata;");
+				ResultSet rs = ps.executeQuery()) {
+
+				while (rs.next()) {
+					String pgSchema = rs.getString(1);
+					if (!builtinSchemas.contains(pgSchema)) {
+						schemas.add(pgSchema);
+					}
+				}
+			}
+
+			List<String> tables = new ArrayList<>();
+
+			// The statement is prepared once and re-bound for every schema.
+			try (PreparedStatement stmt = conn.prepareStatement(
+				"SELECT * \n" +
+					"FROM information_schema.tables \n" +
+					"WHERE table_type = 'BASE TABLE' \n" +
+					"    AND table_schema = ? \n" +
+					"ORDER BY table_type, table_name;")) {
+
+				for (String schema : schemas) {
+					stmt.setString(1, schema);
+
+					try (ResultSet rstables = stmt.executeQuery()) {
+						while (rstables.next()) {
+							// read by label rather than by position (table_name is column 3)
+							tables.add(schema + "." + rstables.getString("table_name"));
+						}
+					}
+				}
+			}
+
+			return tables;
+		} catch (Exception e) {
+			throw new CatalogException(
+				String.format("Failed listing tables of database %s in catalog %s", databaseName, getName()), e);
+		}
+	}
+
+	/**
+	 * Builds a catalog table whose schema mirrors the column names and types of a Postgres table.
+	 *
+	 * @param tablePath database name plus the Flink-side table name
+	 * @return a table with the derived schema, no properties and an empty comment
+	 * @throws TableNotExistException if {@code tableExists(tablePath)} is false
+	 * @throws CatalogException if connecting or reading the metadata fails
+	 */
+	@Override
+	public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistException, CatalogException {
+		if (!tableExists(tablePath)) {
+			throw new TableNotExistException(getName(), tablePath);
+		}
+
+		PostgresTablePath pgPath = PostgresTablePath.fromFlinkTableName(tablePath.getObjectName());
+
+		// The table name cannot be bound as a statement parameter, so it is formatted into the SQL;
+		// the tableExists() check above has already matched it against the listed tables.
+		try (Connection conn = DriverManager.getConnection(baseUrl + tablePath.getDatabaseName(), username, pwd);
+			PreparedStatement ps = conn.prepareStatement(
+				String.format("SELECT * FROM %s;", pgPath.getFullPath()))) {
+
+			// Only the statement's metadata is read; the query itself is never executed.
+			ResultSetMetaData rsmd = ps.getMetaData();
+			int columnCount = rsmd.getColumnCount();
+
+			String[] names = new String[columnCount];
+			DataType[] types = new DataType[columnCount];
+
+			// JDBC column indexes are 1-based
+			for (int i = 1; i <= columnCount; i++) {
+				names[i - 1] = rsmd.getColumnName(i);
+				types[i - 1] = fromJDBCType(rsmd, i);
+			}
+
+			TableSchema tableSchema = new TableSchema.Builder().fields(names, types).build();
+
+			return new CatalogTableImpl(
+				tableSchema,
+				new HashMap<>(),
+				""
+			);
+		} catch (Exception e) {
+			throw new CatalogException(
+				String.format("Failed getting table %s", tablePath.getFullName()), e);
+		}
+	}
+
+	// ------ Postgres type names ------
+	// These names are matched in fromJDBCType against the value of ResultSetMetaData#getColumnTypeName.
+	// A name with a leading underscore is mapped there to an ARRAY of the corresponding element type.
+
+	public static final String PG_BYTEA = "bytea";
+	public static final String PG_BYTEA_ARRAY = "_bytea";
+	public static final String PG_SMALLINT = "int2";
+	public static final String PG_SMALLINT_ARRAY = "_int2";
+	public static final String PG_INTEGER = "int4";
+	public static final String PG_INTEGER_ARRAY = "_int4";
+	public static final String PG_BIGINT = "int8";
+	public static final String PG_BIGINT_ARRAY = "_int8";
+	public static final String PG_REAL = "float4";
+	public static final String PG_REAL_ARRAY = "_float4";
+	public static final String PG_DOUBLE_PRECISION = "float8";
+	public static final String PG_DOUBLE_PRECISION_ARRAY = "_float8";
+	public static final String PG_NUMERIC = "numeric";
+	public static final String PG_NUMERIC_ARRAY = "_numeric";
+	public static final String PG_BOOLEAN = "bool";
+	public static final String PG_BOOLEAN_ARRAY = "_bool";
+	public static final String PG_TIMESTAMP = "timestamp";
+	public static final String PG_TIMESTAMP_ARRAY = "_timestamp";
+	public static final String PG_TIMESTAMPTZ = "timestamptz";
+	public static final String PG_TIMESTAMPTZ_ARRAY = "_timestamptz";
+	public static final String PG_DATE = "date";
+	public static final String PG_DATE_ARRAY = "_date";
+	public static final String PG_TIME = "time";
+	public static final String PG_TIME_ARRAY = "_time";
+	public static final String PG_TEXT = "text";
+	public static final String PG_TEXT_ARRAY = "_text";
+	public static final String PG_CHAR = "bpchar";
+	public static final String PG_CHAR_ARRAY = "_bpchar";
+	public static final String PG_CHARACTER = "character";
+	public static final String PG_CHARACTER_ARRAY = "_character";
+	public static final String PG_CHARACTER_VARYING = "varchar";
+	public static final String PG_CHARACTER_VARYING_ARRAY = "_varchar";
+
+	/**
+	 * Maps the Postgres type of a column to the corresponding Flink {@link DataType}.
+	 *
+	 * <p>The mapping is keyed on the type name returned by {@link ResultSetMetaData#getColumnTypeName(int)}.
+	 * The precision reported by the metadata is applied to DECIMAL (together with the scale), CHAR and
+	 * VARCHAR; every other type is created without an explicit precision.
+	 *
+	 * @param metadata metadata describing the columns of the table
+	 * @param colIndex index of the column, passed unchanged to the {@link ResultSetMetaData} accessors
+	 * @return the Flink data type for the column
+	 * @throws SQLException declared by the {@link ResultSetMetaData} accessors used here
+	 * @throws UnsupportedOperationException if the Postgres type name has no mapping below
+	 */
+	private DataType fromJDBCType(ResultSetMetaData metadata, int colIndex) throws SQLException {
+		String pgType = metadata.getColumnTypeName(colIndex);
+
+		// only consumed by the DECIMAL, CHAR and VARCHAR branches
+		int precision = metadata.getPrecision(colIndex);
+
+		switch (pgType) {
+			case PG_BOOLEAN:
+				return DataTypes.BOOLEAN();
+			case PG_BOOLEAN_ARRAY:
+				return DataTypes.ARRAY(DataTypes.BOOLEAN());
+			case PG_BYTEA:
+				return DataTypes.BYTES();
+			case PG_BYTEA_ARRAY:
+				return DataTypes.ARRAY(DataTypes.BYTES());
+			case PG_SMALLINT:
+				return DataTypes.SMALLINT();
+			case PG_SMALLINT_ARRAY:
+				return DataTypes.ARRAY(DataTypes.SMALLINT());
+			case PG_INTEGER:
+				return DataTypes.INT();
+			case PG_INTEGER_ARRAY:
+				return DataTypes.ARRAY(DataTypes.INT());
+			case PG_BIGINT:
+				return DataTypes.BIGINT();
+			case PG_BIGINT_ARRAY:
+				return DataTypes.ARRAY(DataTypes.BIGINT());
+			case PG_REAL:
+				return DataTypes.FLOAT();
+			case PG_REAL_ARRAY:
+				return DataTypes.ARRAY(DataTypes.FLOAT());
+			case PG_DOUBLE_PRECISION:
+				return DataTypes.DOUBLE();
+			case PG_DOUBLE_PRECISION_ARRAY:
+				return DataTypes.ARRAY(DataTypes.DOUBLE());
+			case PG_NUMERIC:
+				return DataTypes.DECIMAL(precision, metadata.getScale(colIndex));
+			case PG_NUMERIC_ARRAY:
+				return DataTypes.ARRAY(
+					DataTypes.DECIMAL(precision, metadata.getScale(colIndex)));
+			case PG_CHAR:
+			case PG_CHARACTER:
+				return DataTypes.CHAR(precision);
+			case PG_CHAR_ARRAY:
+			case PG_CHARACTER_ARRAY:
+				return DataTypes.ARRAY(DataTypes.CHAR(precision));
+			case PG_CHARACTER_VARYING:
+				return DataTypes.VARCHAR(precision);
+			case PG_CHARACTER_VARYING_ARRAY:
+				return DataTypes.ARRAY(DataTypes.VARCHAR(precision));
+			case PG_TEXT:
+				return DataTypes.STRING();
+			case PG_TEXT_ARRAY:
+				return DataTypes.ARRAY(DataTypes.STRING());
+			case PG_TIMESTAMP:
+				return DataTypes.TIMESTAMP();
+			case PG_TIMESTAMP_ARRAY:
+				return DataTypes.ARRAY(DataTypes.TIMESTAMP());
+			case PG_TIMESTAMPTZ:
+				return DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE();
+			case PG_TIMESTAMPTZ_ARRAY:
+				return DataTypes.ARRAY(DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE());
+			case PG_TIME:
+				return DataTypes.TIME();
+			case PG_TIME_ARRAY:
+				return DataTypes.ARRAY(DataTypes.TIME());
+			case PG_DATE:
+				return DataTypes.DATE();
+			case PG_DATE_ARRAY:
+				return DataTypes.ARRAY(DataTypes.DATE());
+			default:
+				throw new UnsupportedOperationException(
+					String.format("Doesn't support Postgres type '%s' yet", pgType));
+		}
+	}
+
+	/**
+	 * Checks whether the table is among the tables listed for its database.
+	 *
+	 * @param tablePath database name plus table name ("table_name" or "schema_name.table_name")
+	 * @return true if the full Postgres path of the table is returned by {@link #listTables(String)};
+	 *     false if it is not, or if the database does not exist
+	 * @throws CatalogException if listing the tables fails
+	 */
+	@Override
+	public boolean tableExists(ObjectPath tablePath) throws CatalogException {
+		final List<String> existingTables;
+
+		try {
+			existingTables = listTables(tablePath.getDatabaseName());
+		} catch (DatabaseNotExistException e) {
+			// a table cannot exist in a database that does not exist
+			return false;
+		}
+
+		String fullPath = PostgresTablePath.fromFlinkTableName(tablePath.getObjectName()).getFullPath();
+		return existingTables.contains(fullPath);
+	}
+
+}
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresTablePath.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresTablePath.java
new file mode 100644
index 0000000..99cc2b4
--- /dev/null
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresTablePath.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc.catalog;
+
+import org.apache.flink.util.StringUtils;
+
+import java.util.Objects;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Table path of PostgreSQL in Flink. Can be of formats "table_name" or "schema_name.table_name".
+ * When it's "table_name", the schema name defaults to "public".
+ *
+ * <p>Instances are immutable. Both the schema name and the table name must be non-blank.
+ */
+public class PostgresTablePath {
+
+	private static final String DEFAULT_POSTGRES_SCHEMA_NAME = "public";
+
+	private final String pgSchemaName;
+	private final String pgTableName;
+
+	/**
+	 * Creates a path from its two parts.
+	 *
+	 * @param pgSchemaName Postgres schema name, must not be null or whitespace only
+	 * @param pgTableName Postgres table name, must not be null or whitespace only
+	 * @throws IllegalArgumentException if either part is null or whitespace only
+	 */
+	public PostgresTablePath(String pgSchemaName, String pgTableName) {
+		checkArgument(!StringUtils.isNullOrWhitespaceOnly(pgSchemaName));
+		checkArgument(!StringUtils.isNullOrWhitespaceOnly(pgTableName));
+
+		this.pgSchemaName = pgSchemaName;
+		this.pgTableName = pgTableName;
+	}
+
+	/**
+	 * Parses a Flink table name of the form "table_name" or "schema_name.table_name".
+	 *
+	 * @param flinkTableName table name as used in Flink
+	 * @return the parsed path; the schema is "public" when the name contains no dot
+	 * @throws IllegalArgumentException if the name does not consist of exactly one or two non-blank
+	 *     parts, e.g. "a.b.c", "a.b." or "a."
+	 */
+	public static PostgresTablePath fromFlinkTableName(String flinkTableName) {
+		if (flinkTableName.contains(".")) {
+			// A negative limit keeps trailing empty segments. Without it "schema.table." would be
+			// split into two parts and silently accepted as "schema.table".
+			String[] path = flinkTableName.split("\\.", -1);
+
+			checkArgument(path.length == 2,
+				String.format("Table name '%s' is not valid. The parsed length is %d", flinkTableName, path.length));
+
+			return new PostgresTablePath(path[0], path[1]);
+		} else {
+			return new PostgresTablePath(DEFAULT_POSTGRES_SCHEMA_NAME, flinkTableName);
+		}
+	}
+
+	/**
+	 * Builds the Flink table name "schema.table" for a Postgres schema and table.
+	 *
+	 * @throws IllegalArgumentException if either part is null or whitespace only
+	 */
+	public static String toFlinkTableName(String schema, String table) {
+		return new PostgresTablePath(schema, table).getFullPath();
+	}
+
+	/** Returns the path as "schema.table". */
+	public String getFullPath() {
+		return String.format("%s.%s", pgSchemaName, pgTableName);
+	}
+
+	/** Returns the path as "schema.table" wrapped in a single pair of backticks. */
+	public String getFullPathWithQuotes() {
+		return String.format("`%s.%s`", pgSchemaName, pgTableName);
+	}
+
+	@Override
+	public String toString() {
+		return getFullPathWithQuotes();
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+
+		PostgresTablePath that = (PostgresTablePath) o;
+		return Objects.equals(pgSchemaName, that.pgSchemaName) &&
+			Objects.equals(pgTableName, that.pgTableName);
+	}
+
+	@Override
+	public int hashCode() {
+		return Objects.hash(pgSchemaName, pgTableName);
+	}
+}
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/dialect/JDBCDialects.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/dialect/JDBCDialects.java
index fa7192e..743d16a 100644
--- a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/dialect/JDBCDialects.java
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/dialect/JDBCDialects.java
@@ -203,7 +203,10 @@ public final class JDBCDialects {
 		}
 	}
 
-	private static class MySQLDialect extends AbstractDialect {
+	/**
+	 * MySQL dialect.
+	 */
+	public static class MySQLDialect extends AbstractDialect {
 
 		private static final long serialVersionUID = 1L;
 
@@ -301,7 +304,10 @@ public final class JDBCDialects {
 		}
 	}
 
-	private static class PostgresDialect extends AbstractDialect {
+	/**
+	 * Postgres dialect.
+	 */
+	public static class PostgresDialect extends AbstractDialect {
 
 		private static final long serialVersionUID = 1L;
 
diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/catalog/JDBCCatalogUtilsTest.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/catalog/JDBCCatalogUtilsTest.java
new file mode 100644
index 0000000..7a4132b
--- /dev/null
+++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/catalog/JDBCCatalogUtilsTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc.catalog;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+/**
+ * Test for {@link JDBCCatalogUtils}.
+ */
+public class JDBCCatalogUtilsTest {
+	@Rule
+	public ExpectedException exception = ExpectedException.none();
+
+	/** URLs that end after the port, with or without a trailing slash, must pass validation. */
+	@Test
+	public void testJDBCUrl() {
+		String[] acceptedUrls = {
+			"jdbc:postgresql://localhost:5432/",
+			"jdbc:postgresql://localhost:5432"
+		};
+
+		for (String acceptedUrl : acceptedUrls) {
+			JDBCCatalogUtils.validateJDBCUrl(acceptedUrl);
+		}
+	}
+
+	/** A URL with a trailing "db" path segment must be rejected with IllegalArgumentException. */
+	@Test
+	public void testInvalidJDBCUrl() {
+		String urlWithTrailingSegment = "jdbc:postgresql://localhost:5432/db";
+
+		exception.expect(IllegalArgumentException.class);
+		JDBCCatalogUtils.validateJDBCUrl(urlWithTrailingSegment);
+	}
+}
diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresCatalogITCase.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresCatalogITCase.java
new file mode 100644
index 0000000..e103780
--- /dev/null
+++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresCatalogITCase.java
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc.catalog;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+
+import com.opentable.db.postgres.junit.EmbeddedPostgresRules;
+import com.opentable.db.postgres.junit.SingleInstancePostgresRule;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test for {@link PostgresCatalog}.
+ */
+public class PostgresCatalogITCase {
+	@Rule
+	public ExpectedException exception = ExpectedException.none();
+
+	@ClassRule
+	public static SingleInstancePostgresRule pg = EmbeddedPostgresRules.singleInstance();
+
+	protected static final String TEST_USERNAME = "postgres";
+	protected static final String TEST_PWD = "postgres";
+	protected static final String TEST_DB = "test";
+	protected static final String TEST_SCHEMA = "test_schema";
+	protected static final String TABLE1 = "t1";
+	protected static final String TABLE2 = "t2";
+	protected static final String TABLE3 = "t3";
+
+	protected static String baseUrl;
+	protected static Catalog catalog;
+
+	/**
+	 * Creates the {@link PostgresCatalog} used by the tests.
+	 *
+	 * @param name catalog name
+	 * @param defaultDb default database of the catalog
+	 * @param username user to connect with
+	 * @param pwd password of the user
+	 * @param jdbcUrl base JDBC url without a database name
+	 * @return the new catalog
+	 */
+	public static Catalog createCatalog(String name, String defaultDb, String username, String pwd, String jdbcUrl) {
+		// pass the arguments through instead of hard-coding the catalog name and default database
+		return new PostgresCatalog(name, defaultDb, username, pwd, jdbcUrl);
+	}
+
+	/**
+	 * Derives the base JDBC url from the embedded Postgres instance, creates the catalog under test and
+	 * creates the database, schema and tables that the test methods query.
+	 */
+	@BeforeClass
+	public static void setup() throws SQLException {
+		// e.g. jdbc:postgresql://localhost:50807/postgres?user=postgres
+		String embeddedJdbcUrl = pg.getEmbeddedPostgres().getJdbcUrl(TEST_USERNAME, TEST_PWD);
+		// keep everything up to and including the last slash, e.g. jdbc:postgresql://localhost:50807/
+		baseUrl = embeddedJdbcUrl.substring(0, embeddedJdbcUrl.lastIndexOf("/") + 1);
+
+		catalog = createCatalog("mypg", PostgresCatalog.DEFAULT_DATABASE, TEST_USERNAME, TEST_PWD, baseUrl);
+
+		// create test database and schema
+		createDatabase(TEST_DB);
+		createSchema(TEST_DB, TEST_SCHEMA);
+
+		// create test tables
+		// table: postgres.public.t1 (created in the default database)
+		createTable(PostgresTablePath.fromFlinkTableName(TABLE1), getSimpleTable().pgSchemaSql);
+
+		// table: test.public.t2
+		// table: test.test_schema.t3
+		// table: test.public.datatypes
+		createTable(TEST_DB, PostgresTablePath.fromFlinkTableName(TABLE2), getSimpleTable().pgSchemaSql);
+		createTable(TEST_DB, new PostgresTablePath(TEST_SCHEMA, TABLE3), getSimpleTable().pgSchemaSql);
+		createTable(TEST_DB, PostgresTablePath.fromFlinkTableName("datatypes"), getDataTypesTable().pgSchemaSql);
+	}
+
+	// ------ databases ------
+
+	@Test
+	public void testGetDb_DatabaseNotExistException() throws Exception {
+		exception.expect(DatabaseNotExistException.class);
+		exception.expectMessage("Database nonexistent does not exist in Catalog");
+		catalog.getDatabase("nonexistent");
+	}
+
+	@Test
+	public void testListDatabases() {
+		List<String> actual = catalog.listDatabases();
+
+		assertEquals(
+			Arrays.asList("postgres", "test"),
+			actual
+		);
+	}
+
+	@Test
+	public void testDbExists() throws Exception {
+		assertFalse(catalog.databaseExists("nonexistent"));
+
+		assertTrue(catalog.databaseExists(PostgresCatalog.DEFAULT_DATABASE));
+	}
+
+	// ------ tables ------
+
+	@Test
+	public void testListTables() throws DatabaseNotExistException {
+		List<String> actual = catalog.listTables(PostgresCatalog.DEFAULT_DATABASE);
+
+		assertEquals(Arrays.asList("public.t1"), actual);
+
+		actual = catalog.listTables(TEST_DB);
+
+		assertEquals(Arrays.asList("public.datatypes", "public.t2", "test_schema.t3"), actual);
+	}
+
+	@Test
+	public void testListTables_DatabaseNotExistException() throws DatabaseNotExistException {
+		exception.expect(DatabaseNotExistException.class);
+		catalog.listTables("postgres/nonexistschema");
+	}
+
+	@Test
+	public void testTableExists() {
+		assertFalse(catalog.tableExists(new ObjectPath(TEST_DB, "nonexist")));
+
+		assertTrue(catalog.tableExists(new ObjectPath(PostgresCatalog.DEFAULT_DATABASE, TABLE1)));
+		assertTrue(catalog.tableExists(new ObjectPath(TEST_DB, TABLE2)));
+		assertTrue(catalog.tableExists(new ObjectPath(TEST_DB, "test_schema.t3")));
+	}
+
+	@Test
+	public void testGetTables_TableNotExistException() throws TableNotExistException {
+		exception.expect(TableNotExistException.class);
+		catalog.getTable(new ObjectPath(TEST_DB, PostgresTablePath.toFlinkTableName(TEST_SCHEMA, "anytable")));
+	}
+
+	@Test
+	public void testGetTables_TableNotExistException_NoSchema() throws TableNotExistException {
+		exception.expect(TableNotExistException.class);
+		catalog.getTable(new ObjectPath(TEST_DB, PostgresTablePath.toFlinkTableName("nonexistschema", "anytable")));
+	}
+
+	@Test
+	public void testGetTables_TableNotExistException_NoDb() throws TableNotExistException {
+		exception.expect(TableNotExistException.class);
+		catalog.getTable(new ObjectPath("nonexistdb", PostgresTablePath.toFlinkTableName(TEST_SCHEMA, "anytable")));
+	}
+
+	/**
+	 * Verifies that tables can be looked up both by bare table name (schema defaults to "public") and by
+	 * "schema.table" name, and that the returned schema matches the simple test table.
+	 */
+	@Test
+	public void testGetTable() throws TableNotExistException {
+		TableSchema schema = getSimpleTable().schema;
+
+		// test postgres.public.t1
+		CatalogBaseTable table = catalog.getTable(new ObjectPath("postgres", TABLE1));
+
+		assertEquals(schema, table.getSchema());
+
+		table = catalog.getTable(new ObjectPath("postgres", "public.t1"));
+
+		assertEquals(schema, table.getSchema());
+
+		// test test.public.t2
+		table = catalog.getTable(new ObjectPath(TEST_DB, TABLE2));
+
+		assertEquals(schema, table.getSchema());
+
+		table = catalog.getTable(new ObjectPath(TEST_DB, "public.t2"));
+
+		assertEquals(schema, table.getSchema());
+
+		// test test.test_schema.t3
+		table = catalog.getTable(new ObjectPath(TEST_DB, TEST_SCHEMA + ".t3"));
+
+		assertEquals(schema, table.getSchema());
+	}
+
+	@Test
+	public void testDataTypes() throws TableNotExistException {
+		CatalogBaseTable table = catalog.getTable(new ObjectPath(TEST_DB, "datatypes"));
+
+		assertEquals(getDataTypesTable().schema, table.getSchema());
+	}
+
+	private static class TestTable {
+		TableSchema schema;
+		String pgSchemaSql;
+
+		public TestTable(TableSchema schema, String pgSchemaSql) {
+			this.schema = schema;
+			this.pgSchemaSql = pgSchemaSql;
+		}
+	}
+
+	private static TestTable getSimpleTable() {
+		return new TestTable(
+			TableSchema.builder()
+				.field("name", DataTypes.INT())
+				.build(),
+			"name integer"
+		);
+	}
+
+	private static TestTable getDataTypesTable() {
+		return new TestTable(
+			TableSchema.builder()
+				.field("int", DataTypes.INT())
+				.field("int_arr", DataTypes.ARRAY(DataTypes.INT()))
+				.field("bytea", DataTypes.BYTES())
+				.field("bytea_arr", DataTypes.ARRAY(DataTypes.BYTES()))
+				.field("short", DataTypes.SMALLINT())
+				.field("short_arr", DataTypes.ARRAY(DataTypes.SMALLINT()))
+				.field("long", DataTypes.BIGINT())
+				.field("long_arr", DataTypes.ARRAY(DataTypes.BIGINT()))
+				.field("real", DataTypes.FLOAT())
+				.field("real_arr", DataTypes.ARRAY(DataTypes.FLOAT()))
+				.field("double_precision", DataTypes.DOUBLE())
+				.field("double_precision_arr", DataTypes.ARRAY(DataTypes.DOUBLE()))
+				.field("numeric", DataTypes.DECIMAL(10, 5))
+				.field("numeric_arr", DataTypes.ARRAY(DataTypes.DECIMAL(10, 5)))
+				.field("boolean", DataTypes.BOOLEAN())
+				.field("boolean_arr", DataTypes.ARRAY(DataTypes.BOOLEAN()))
+				.field("text", DataTypes.STRING())
+				.field("text_arr", DataTypes.ARRAY(DataTypes.STRING()))
+				.field("char", DataTypes.CHAR(1))
+				.field("char_arr", DataTypes.ARRAY(DataTypes.CHAR(1)))
+				.field("character", DataTypes.CHAR(3))
+				.field("character_arr", DataTypes.ARRAY(DataTypes.CHAR(3)))
+				.field("character_varying", DataTypes.VARCHAR(20))
+				.field("character_varying_arr", DataTypes.ARRAY(DataTypes.VARCHAR(20)))
+				.field("timestamp", DataTypes.TIMESTAMP())
+				.field("timestamp_arr", DataTypes.ARRAY(DataTypes.TIMESTAMP()))
+				.field("timestamptz", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE())
+				.field("timestamptz_arr", DataTypes.ARRAY(DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()))
+				.field("date", DataTypes.DATE())
+				.field("date_arr", DataTypes.ARRAY(DataTypes.DATE()))
+				.field("time", DataTypes.TIME())
+				.field("time_arr", DataTypes.ARRAY(DataTypes.TIME()))
+				.build(),
+			"int integer, " +
+				"int_arr integer[], " +
+				"bytea bytea, " +
+				"bytea_arr bytea[], " +
+				"short smallint, " +
+				"short_arr smallint[], " +
+				"long bigint, " +
+				"long_arr bigint[], " +
+				"real real, " +
+				"real_arr real[], " +
+				"double_precision double precision, " +
+				"double_precision_arr double precision[], " +
+				"numeric numeric(10, 5), " +
+				"numeric_arr numeric(10, 5)[], " +
+				"boolean boolean, " +
+				"boolean_arr boolean[], " +
+				"text text, " +
+				"text_arr text[], " +
+				"char char, " +
+				"char_arr char[], " +
+				"character character(3), " +
+				"character_arr character(3)[], " +
+				"character_varying character varying(20), " +
+				"character_varying_arr character varying(20)[], " +
+				"timestamp timestamp(6), " +
+				"timestamp_arr timestamp(6)[], " +
+				"timestamptz timestamptz, " +
+				"timestamptz_arr timestamptz[], " +
+				"date date, " +
+				"date_arr date[], " +
+				"time time(6), " +
+				"time_arr time(6)[]"
+		);
+	}
+
+	private static void createTable(PostgresTablePath tablePath, String tableSchemaSql) throws SQLException {
+		executeSQL(PostgresCatalog.DEFAULT_DATABASE, String.format("CREATE TABLE %s(%s);", tablePath.getFullPath(), tableSchemaSql));
+	}
+
+	private static void createTable(String db, PostgresTablePath tablePath, String tableSchemaSql) throws SQLException {
+		executeSQL(db, String.format("CREATE TABLE %s(%s);", tablePath.getFullPath(), tableSchemaSql));
+	}
+
+	private static void createSchema(String db, String schema) throws SQLException {
+		executeSQL(db, String.format("CREATE SCHEMA %s", schema));
+	}
+
+	private static void createDatabase(String database) throws SQLException {
+		executeSQL(String.format("CREATE DATABASE %s;", database));
+	}
+
+	private static void executeSQL(String sql) throws SQLException {
+		executeSQL("", sql);
+	}
+
+	/**
+	 * Executes an update statement against the given database of the embedded Postgres instance.
+	 *
+	 * @param db database name appended to the base url; an empty string connects without a database name
+	 * @param sql statement to execute
+	 * @throws SQLException if connecting or executing the statement fails
+	 */
+	private static void executeSQL(String db, String sql) throws SQLException {
+		// try-with-resources propagates SQLException on its own; no catch-and-rethrow is needed
+		try (Connection conn = DriverManager.getConnection(baseUrl + db, TEST_USERNAME, TEST_PWD);
+				Statement statement = conn.createStatement()) {
+			statement.executeUpdate(sql);
+		}
+	}
+
+}
diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresTablePathTest.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresTablePathTest.java
new file mode 100644
index 0000000..46f32bc
--- /dev/null
+++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresTablePathTest.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc.catalog;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Test for {@link PostgresTablePath}.
+ */
+public class PostgresTablePathTest {
+	/** A "schema.table" name is split into its schema and table parts. */
+	@Test
+	public void testFromFlinkTableName() {
+		assertEquals(new PostgresTablePath("public", "topic"), PostgresTablePath.fromFlinkTableName("public.topic"));
+	}
+
+	/** A name without a dot is placed in the default "public" schema. */
+	@Test
+	public void testFromFlinkTableNameWithoutSchema() {
+		assertEquals(new PostgresTablePath("public", "topic"), PostgresTablePath.fromFlinkTableName("topic"));
+	}
+}


[flink] 02/02: [FLINK-16472] support precision of timestamp and time data types

Posted by bl...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 75ad29cb9f4f377df27b71e67dbd33f36bb08bee
Author: bowen.li <bo...@gmail.com>
AuthorDate: Sun Mar 8 20:58:27 2020 -0700

    [FLINK-16472] support precision of timestamp and time data types
    
    closes #11336
---
 .../api/java/io/jdbc/catalog/PostgresCatalog.java  | 15 +++++++-------
 .../io/jdbc/catalog/PostgresCatalogITCase.java     | 24 +++++++++++-----------
 2 files changed, 19 insertions(+), 20 deletions(-)

diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresCatalog.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresCatalog.java
index d12f254..c598073 100644
--- a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresCatalog.java
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresCatalog.java
@@ -55,8 +55,6 @@ public class PostgresCatalog extends AbstractJDBCCatalog {
 
 	private static final Logger LOG = LoggerFactory.getLogger(PostgresCatalog.class);
 
-	public static final String POSTGRES_TABLE_TYPE = "postgres";
-
 	public static final String DEFAULT_DATABASE = "postgres";
 
 	// ------ Postgres default objects that shouldn't be exposed to users ------
@@ -236,6 +234,7 @@ public class PostgresCatalog extends AbstractJDBCCatalog {
 		String pgType = metadata.getColumnTypeName(colIndex);
 
 		int precision = metadata.getPrecision(colIndex);
+		int scale = metadata.getScale(colIndex);
 
 		switch (pgType) {
 			case PG_BOOLEAN:
@@ -286,17 +285,17 @@ public class PostgresCatalog extends AbstractJDBCCatalog {
 			case PG_TEXT_ARRAY:
 				return DataTypes.ARRAY(DataTypes.STRING());
 			case PG_TIMESTAMP:
-				return DataTypes.TIMESTAMP();
+				return DataTypes.TIMESTAMP(scale);
 			case PG_TIMESTAMP_ARRAY:
-				return DataTypes.ARRAY(DataTypes.TIMESTAMP());
+				return DataTypes.ARRAY(DataTypes.TIMESTAMP(scale));
 			case PG_TIMESTAMPTZ:
-				return DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE();
+				return DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(scale);
 			case PG_TIMESTAMPTZ_ARRAY:
-				return DataTypes.ARRAY(DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE());
+				return DataTypes.ARRAY(DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(scale));
 			case PG_TIME:
-				return DataTypes.TIME();
+				return DataTypes.TIME(scale);
 			case PG_TIME_ARRAY:
-				return DataTypes.ARRAY(DataTypes.TIME());
+				return DataTypes.ARRAY(DataTypes.TIME(scale));
 			case PG_DATE:
 				return DataTypes.DATE();
 			case PG_DATE_ARRAY:
diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresCatalogITCase.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresCatalogITCase.java
index e103780..b197bf0 100644
--- a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresCatalogITCase.java
+++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresCatalogITCase.java
@@ -249,14 +249,14 @@ public class PostgresCatalogITCase {
 				.field("character_arr", DataTypes.ARRAY(DataTypes.CHAR(3)))
 				.field("character_varying", DataTypes.VARCHAR(20))
 				.field("character_varying_arr", DataTypes.ARRAY(DataTypes.VARCHAR(20)))
-				.field("timestamp", DataTypes.TIMESTAMP())
-				.field("timestamp_arr", DataTypes.ARRAY(DataTypes.TIMESTAMP()))
-				.field("timestamptz", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE())
-				.field("timestamptz_arr", DataTypes.ARRAY(DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()))
+				.field("timestamp", DataTypes.TIMESTAMP(5))
+				.field("timestamp_arr", DataTypes.ARRAY(DataTypes.TIMESTAMP(5)))
+				.field("timestamptz", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(4))
+				.field("timestamptz_arr", DataTypes.ARRAY(DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(4)))
 				.field("date", DataTypes.DATE())
 				.field("date_arr", DataTypes.ARRAY(DataTypes.DATE()))
-				.field("time", DataTypes.TIME())
-				.field("time_arr", DataTypes.ARRAY(DataTypes.TIME()))
+				.field("time", DataTypes.TIME(3))
+				.field("time_arr", DataTypes.ARRAY(DataTypes.TIME(3)))
 				.build(),
 			"int integer, " +
 				"int_arr integer[], " +
@@ -282,14 +282,14 @@ public class PostgresCatalogITCase {
 				"character_arr character(3)[], " +
 				"character_varying character varying(20), " +
 				"character_varying_arr character varying(20)[], " +
-				"timestamp timestamp(6), " +
-				"timestamp_arr timestamp(6)[], " +
-				"timestamptz timestamptz, " +
-				"timestamptz_arr timestamptz[], " +
+				"timestamp timestamp(5), " +
+				"timestamp_arr timestamp(5)[], " +
+				"timestamptz timestamptz(4), " +
+				"timestamptz_arr timestamptz(4)[], " +
 				"date date, " +
 				"date_arr date[], " +
-				"time time(6), " +
-				"time_arr time(6)[]"
+				"time time(3), " +
+				"time_arr time(3)[]"
 		);
 	}