You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by bl...@apache.org on 2020/03/20 23:44:43 UTC

[flink] 01/02: [FLINK-16471][jdbc] develop JDBCCatalog and PostgresCatalog

This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 74b8bdee9fb0bf7cfc27ca8d992dac2a07473a0c
Author: bowen.li <bo...@gmail.com>
AuthorDate: Fri Mar 6 14:05:27 2020 -0800

    [FLINK-16471][jdbc] develop JDBCCatalog and PostgresCatalog
    
    closes #11336
---
 .../src/test/resources/log4j2-test.properties      |  28 --
 flink-connectors/flink-jdbc/pom.xml                |  36 ++-
 .../java/io/jdbc/catalog/AbstractJDBCCatalog.java  | 277 ++++++++++++++++++
 .../api/java/io/jdbc/catalog/JDBCCatalog.java      |  84 ++++++
 .../api/java/io/jdbc/catalog/JDBCCatalogUtils.java |  54 ++++
 .../api/java/io/jdbc/catalog/PostgresCatalog.java  | 323 ++++++++++++++++++++
 .../java/io/jdbc/catalog/PostgresTablePath.java    |  95 ++++++
 .../api/java/io/jdbc/dialect/JDBCDialects.java     |  10 +-
 .../java/io/jdbc/catalog/JDBCCatalogUtilsTest.java |  44 +++
 .../io/jdbc/catalog/PostgresCatalogITCase.java     | 325 +++++++++++++++++++++
 .../io/jdbc/catalog/PostgresTablePathTest.java     |  33 +++
 11 files changed, 1275 insertions(+), 34 deletions(-)

diff --git a/flink-connectors/flink-connector-elasticsearch2/src/test/resources/log4j2-test.properties b/flink-connectors/flink-connector-elasticsearch2/src/test/resources/log4j2-test.properties
deleted file mode 100644
index 835c2ec..0000000
--- a/flink-connectors/flink-connector-elasticsearch2/src/test/resources/log4j2-test.properties
+++ /dev/null
@@ -1,28 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-# Set root logger level to OFF to not flood build logs
-# set manually to INFO for debugging purposes
-rootLogger.level = OFF
-rootLogger.appenderRef.test.ref = TestLogger
-
-appender.testlogger.name = TestLogger
-appender.testlogger.type = CONSOLE
-appender.testlogger.target = SYSTEM_ERR
-appender.testlogger.layout.type = PatternLayout
-appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n
diff --git a/flink-connectors/flink-jdbc/pom.xml b/flink-connectors/flink-jdbc/pom.xml
index 3e83311..cb7afab 100644
--- a/flink-connectors/flink-jdbc/pom.xml
+++ b/flink-connectors/flink-jdbc/pom.xml
@@ -35,6 +35,11 @@ under the License.
 
 	<packaging>jar</packaging>
 
+	<properties>
+		<postgres.version>42.2.10</postgres.version>
+		<otj-pg-embedded.version>0.13.3</otj-pg-embedded.version>
+	</properties>
+
 	<dependencies>
 		<!-- Table ecosystem -->
 		<!-- Projects depending on this project won't depend on flink-table-*. -->
@@ -53,13 +58,17 @@ under the License.
 			<scope>provided</scope>
 		</dependency>
 
+		<!-- Postgres dependencies -->
+
 		<dependency>
-			<groupId>org.apache.derby</groupId>
-			<artifactId>derby</artifactId>
-			<version>10.14.2.0</version>
-			<scope>test</scope>
+			<groupId>org.postgresql</groupId>
+			<artifactId>postgresql</artifactId>
+			<version>${postgres.version}</version>
+			<scope>provided</scope>
 		</dependency>
 
+		<!-- test dependencies -->
+
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-test-utils_${scala.binary.version}</artifactId>
@@ -89,5 +98,24 @@ under the License.
 			<version>${project.version}</version>
 			<scope>test</scope>
 		</dependency>
+
+		<!-- Postgres test dependencies -->
+
+		<dependency>
+			<groupId>com.opentable.components</groupId>
+			<artifactId>otj-pg-embedded</artifactId>
+			<version>${otj-pg-embedded.version}</version>
+			<scope>test</scope>
+		</dependency>
+
+		<!-- Derby test dependencies -->
+
+		<dependency>
+			<groupId>org.apache.derby</groupId>
+			<artifactId>derby</artifactId>
+			<version>10.14.2.0</version>
+			<scope>test</scope>
+		</dependency>
+
 	</dependencies>
 </project>
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/AbstractJDBCCatalog.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/AbstractJDBCCatalog.java
new file mode 100644
index 0000000..523de83
--- /dev/null
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/AbstractJDBCCatalog.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc.catalog;
+
+import org.apache.flink.api.java.io.jdbc.JDBCTableSourceSinkFactory;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.AbstractCatalog;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.CatalogPartition;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
+import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
+import org.apache.flink.table.catalog.exceptions.TablePartitionedException;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.factories.TableFactory;
+import org.apache.flink.util.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Abstract catalog for any JDBC catalogs.
+ */
+public abstract class AbstractJDBCCatalog extends AbstractCatalog {
+
+	private static final Logger LOG = LoggerFactory.getLogger(AbstractJDBCCatalog.class);
+
+	protected final String username;
+	protected final String pwd;
+	protected final String baseUrl;
+	protected final String defaultUrl;
+
+	public AbstractJDBCCatalog(String catalogName, String defaultDatabase, String username, String pwd, String baseUrl) {
+		super(catalogName, defaultDatabase);
+
+		checkArgument(!StringUtils.isNullOrWhitespaceOnly(username));
+		checkArgument(!StringUtils.isNullOrWhitespaceOnly(pwd));
+		checkArgument(!StringUtils.isNullOrWhitespaceOnly(baseUrl));
+
+		JDBCCatalogUtils.validateJDBCUrl(baseUrl);
+
+		this.username = username;
+		this.pwd = pwd;
+		this.baseUrl = baseUrl.endsWith("/") ? baseUrl : baseUrl + "/";
+		this.defaultUrl = baseUrl + defaultDatabase;
+	}
+
+	@Override
+	public void open() throws CatalogException {
+		// test connection, fail early if we cannot connect to database
+		try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd)) {
+		} catch (SQLException e) {
+			throw new ValidationException(
+				String.format("Failed connecting to %s via JDBC.", defaultUrl), e);
+		}
+
+		LOG.info("Catalog {} established connection to {}", getName(), defaultUrl);
+	}
+
+	@Override
+	public void close() throws CatalogException {
+		LOG.info("Catalog {} closing", getName());
+	}
+
+	// ------ table factory ------
+
+	public Optional<TableFactory> getTableFactory() {
+		return Optional.of(new JDBCTableSourceSinkFactory());
+	}
+
+	// ------ databases ------
+
+	@Override
+	public boolean databaseExists(String databaseName) throws CatalogException {
+		checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName));
+
+		return listDatabases().contains(databaseName);
+	}
+
+	@Override
+	public void createDatabase(String name, CatalogDatabase database, boolean ignoreIfExists) throws DatabaseAlreadyExistException, CatalogException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade) throws DatabaseNotExistException, DatabaseNotEmptyException, CatalogException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public void alterDatabase(String name, CatalogDatabase newDatabase, boolean ignoreIfNotExists) throws DatabaseNotExistException, CatalogException {
+		throw new UnsupportedOperationException();
+	}
+
+	// ------ tables and views ------
+
+	@Override
+	public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists) throws TableNotExistException, TableAlreadyExistException, CatalogException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists) throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public List<String> listViews(String databaseName) throws DatabaseNotExistException, CatalogException {
+		return Collections.emptyList();
+	}
+
+	// ------ partitions ------
+
+	@Override
+	public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath) throws TableNotExistException, TableNotPartitionedException, CatalogException {
+		return Collections.emptyList();
+	}
+
+	@Override
+	public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws TableNotExistException, TableNotPartitionedException, CatalogException {
+		return Collections.emptyList();
+	}
+
+	@Override
+	public List<CatalogPartitionSpec> listPartitionsByFilter(ObjectPath tablePath, List<Expression> filters) throws TableNotExistException, TableNotPartitionedException, CatalogException {
+		return Collections.emptyList();
+	}
+
+	@Override
+	public CatalogPartition getPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws PartitionNotExistException, CatalogException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public boolean partitionExists(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws CatalogException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public void createPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogPartition partition, boolean ignoreIfExists) throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, PartitionAlreadyExistsException, CatalogException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public void dropPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, boolean ignoreIfNotExists) throws PartitionNotExistException, CatalogException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public void alterPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogPartition newPartition, boolean ignoreIfNotExists) throws PartitionNotExistException, CatalogException {
+		throw new UnsupportedOperationException();
+	}
+
+	// ------ functions ------
+
+	@Override
+	public List<String> listFunctions(String dbName) throws DatabaseNotExistException, CatalogException {
+		return Collections.emptyList();
+	}
+
+	@Override
+	public CatalogFunction getFunction(ObjectPath functionPath) throws FunctionNotExistException, CatalogException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public boolean functionExists(ObjectPath functionPath) throws CatalogException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public void createFunction(ObjectPath functionPath, CatalogFunction function, boolean ignoreIfExists) throws FunctionAlreadyExistException, DatabaseNotExistException, CatalogException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public void alterFunction(ObjectPath functionPath, CatalogFunction newFunction, boolean ignoreIfNotExists) throws FunctionNotExistException, CatalogException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public void dropFunction(ObjectPath functionPath, boolean ignoreIfNotExists) throws FunctionNotExistException, CatalogException {
+		throw new UnsupportedOperationException();
+	}
+
+	// ------ stats ------
+
+	@Override
+	public CatalogTableStatistics getTableStatistics(ObjectPath tablePath) throws TableNotExistException, CatalogException {
+		return CatalogTableStatistics.UNKNOWN;
+	}
+
+	@Override
+	public CatalogColumnStatistics getTableColumnStatistics(ObjectPath tablePath) throws TableNotExistException, CatalogException {
+		return CatalogColumnStatistics.UNKNOWN;
+	}
+
+	@Override
+	public CatalogTableStatistics getPartitionStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws PartitionNotExistException, CatalogException {
+		return CatalogTableStatistics.UNKNOWN;
+	}
+
+	@Override
+	public CatalogColumnStatistics getPartitionColumnStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws PartitionNotExistException, CatalogException {
+		return CatalogColumnStatistics.UNKNOWN;
+	}
+
+	@Override
+	public void alterTableStatistics(ObjectPath tablePath, CatalogTableStatistics tableStatistics, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public void alterTableColumnStatistics(ObjectPath tablePath, CatalogColumnStatistics columnStatistics, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException, TablePartitionedException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public void alterPartitionStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogTableStatistics partitionStatistics, boolean ignoreIfNotExists) throws PartitionNotExistException, CatalogException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public void alterPartitionColumnStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogColumnStatistics columnStatistics, boolean ignoreIfNotExists) throws PartitionNotExistException, CatalogException {
+		throw new UnsupportedOperationException();
+	}
+}
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/JDBCCatalog.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/JDBCCatalog.java
new file mode 100644
index 0000000..629412c
--- /dev/null
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/JDBCCatalog.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc.catalog;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * Catalogs for relational databases via JDBC.
+ */
+@PublicEvolving
+public class JDBCCatalog extends AbstractJDBCCatalog {
+
+	private static final Logger LOG = LoggerFactory.getLogger(JDBCCatalog.class);
+
+	private final Catalog internal;
+
+	public JDBCCatalog(String catalogName, String defaultDatabase, String username, String pwd, String baseUrl) {
+		super(catalogName, defaultDatabase, username, pwd, baseUrl);
+
+		internal = JDBCCatalogUtils.createCatalog(catalogName, defaultDatabase, username, pwd, baseUrl);
+	}
+
+	// ------ databases -----
+
+	@Override
+	public List<String> listDatabases() throws CatalogException {
+		return internal.listDatabases();
+	}
+
+	@Override
+	public CatalogDatabase getDatabase(String databaseName) throws DatabaseNotExistException, CatalogException {
+		return internal.getDatabase(databaseName);
+	}
+
+	// ------ tables and views ------
+
+	@Override
+	public List<String> listTables(String databaseName) throws DatabaseNotExistException, CatalogException {
+		return internal.listTables(databaseName);
+	}
+
+	@Override
+	public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistException, CatalogException {
+		return internal.getTable(tablePath);
+	}
+
+	@Override
+	public boolean tableExists(ObjectPath tablePath) throws CatalogException {
+		try {
+			return databaseExists(tablePath.getDatabaseName()) &&
+				listTables(tablePath.getDatabaseName()).contains(tablePath.getObjectName());
+		} catch (DatabaseNotExistException e) {
+			return false;
+		}
+	}
+}
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/JDBCCatalogUtils.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/JDBCCatalogUtils.java
new file mode 100644
index 0000000..b9e3a19
--- /dev/null
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/JDBCCatalogUtils.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc.catalog;
+
+import org.apache.flink.api.java.io.jdbc.dialect.JDBCDialect;
+import org.apache.flink.api.java.io.jdbc.dialect.JDBCDialects;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Utils for {@link JDBCCatalog}.
+ */
+public class JDBCCatalogUtils {
+	/**
+	 * URL has to be without database, like "jdbc:postgresql://localhost:5432/" or "jdbc:postgresql://localhost:5432"
+	 * rather than "jdbc:postgresql://localhost:5432/db".
+	 */
+	public static void validateJDBCUrl(String url) {
+		String[] parts = url.trim().split("\\/+");
+
+		checkArgument(parts.length == 2);
+	}
+
+	/**
+	 * Create catalog instance from given information.
+	 */
+	public static AbstractJDBCCatalog createCatalog(String catalogName, String defaultDatabase, String username, String pwd, String baseUrl) {
+		JDBCDialect dialect = JDBCDialects.get(baseUrl).get();
+
+		if (dialect instanceof JDBCDialects.PostgresDialect) {
+			return new PostgresCatalog(catalogName, defaultDatabase, username, pwd, baseUrl);
+		} else {
+			throw new UnsupportedOperationException(
+				String.format("Catalog for '%s' is not supported yet.", dialect)
+			);
+		}
+	}
+}
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresCatalog.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresCatalog.java
new file mode 100644
index 0000000..d12f254
--- /dev/null
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresCatalog.java
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc.catalog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogDatabaseImpl;
+import org.apache.flink.table.catalog.CatalogTableImpl;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.types.DataType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Catalog for PostgreSQL.
+ */
+@Internal
+public class PostgresCatalog extends AbstractJDBCCatalog {
+
+	private static final Logger LOG = LoggerFactory.getLogger(PostgresCatalog.class);
+
+	public static final String POSTGRES_TABLE_TYPE = "postgres";
+
+	public static final String DEFAULT_DATABASE = "postgres";
+
+	// ------ Postgres default objects that shouldn't be exposed to users ------
+
+	private static final Set<String> builtinDatabases = new HashSet<String>() {{
+		add("template0");
+		add("template1");
+	}};
+
+	private static final Set<String> builtinSchemas = new HashSet<String>() {{
+		add("pg_toast");
+		add("pg_temp_1");
+		add("pg_toast_temp_1");
+		add("pg_catalog");
+		add("information_schema");
+	}};
+
+	protected PostgresCatalog(String catalogName, String defaultDatabase, String username, String pwd, String baseUrl) {
+		super(catalogName, defaultDatabase, username, pwd, baseUrl);
+	}
+
+	// ------ databases ------
+
+	@Override
+	public List<String> listDatabases() throws CatalogException {
+		List<String> pgDatabases = new ArrayList<>();
+
+		try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd)) {
+
+			PreparedStatement ps = conn.prepareStatement("SELECT datname FROM pg_database;");
+
+			ResultSet rs = ps.executeQuery();
+
+			while (rs.next()) {
+				String dbName = rs.getString(1);
+				if (!builtinDatabases.contains(dbName)) {
+					pgDatabases.add(rs.getString(1));
+				}
+			}
+
+			return pgDatabases;
+		} catch (Exception e) {
+			throw new CatalogException(
+				String.format("Failed listing database in catalog %s", getName()), e);
+		}
+	}
+
+	@Override
+	public CatalogDatabase getDatabase(String databaseName) throws DatabaseNotExistException, CatalogException {
+		if (listDatabases().contains(databaseName)) {
+			return new CatalogDatabaseImpl(Collections.emptyMap(), null);
+		} else {
+			throw new DatabaseNotExistException(getName(), databaseName);
+		}
+	}
+
+	// ------ tables ------
+
+	@Override
+	public List<String> listTables(String databaseName) throws DatabaseNotExistException, CatalogException {
+		if (!databaseExists(databaseName)) {
+			throw new DatabaseNotExistException(getName(), databaseName);
+		}
+
+		// get all schemas
+		try (Connection conn = DriverManager.getConnection(baseUrl + databaseName, username, pwd)) {
+			PreparedStatement ps = conn.prepareStatement("SELECT schema_name FROM information_schema.schemata;");
+
+			ResultSet rs = ps.executeQuery();
+
+			List<String> schemas = new ArrayList<>();
+
+			while (rs.next()) {
+				String pgSchema = rs.getString(1);
+				if (!builtinSchemas.contains(pgSchema)) {
+					schemas.add(pgSchema);
+				}
+			}
+
+			List<String> tables = new ArrayList<>();
+
+			for (String schema : schemas) {
+				PreparedStatement stmt = conn.prepareStatement(
+					"SELECT * \n" +
+						"FROM information_schema.tables \n" +
+						"WHERE table_type = 'BASE TABLE' \n" +
+						"    AND table_schema = ? \n" +
+						"ORDER BY table_type, table_name;");
+
+				stmt.setString(1, schema);
+
+				ResultSet rstables = stmt.executeQuery();
+
+				while (rstables.next()) {
+					// position 1 is database name, position 2 is schema name, position 3 is table name
+					tables.add(schema + "." + rstables.getString(3));
+				}
+			}
+
+			return tables;
+		} catch (Exception e) {
+			throw new CatalogException(
+				String.format("Failed listing database in catalog %s", getName()), e);
+		}
+	}
+
+	@Override
+	public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistException, CatalogException {
+		if (!tableExists(tablePath)) {
+			throw new TableNotExistException(getName(), tablePath);
+		}
+
+		PostgresTablePath pgPath = PostgresTablePath.fromFlinkTableName(tablePath.getObjectName());
+
+		try (Connection conn = DriverManager.getConnection(baseUrl + tablePath.getDatabaseName(), username, pwd)) {
+
+			PreparedStatement ps = conn.prepareStatement(
+				String.format("SELECT * FROM %s;", pgPath.getFullPath()));
+
+			ResultSetMetaData rsmd = ps.getMetaData();
+
+			String[] names = new String[rsmd.getColumnCount()];
+			DataType[] types = new DataType[rsmd.getColumnCount()];
+
+			for (int i = 1; i <= rsmd.getColumnCount(); i++) {
+				names[i - 1] = rsmd.getColumnName(i);
+				types[i - 1] = fromJDBCType(rsmd, i);
+			}
+
+			TableSchema tableSchema = new TableSchema.Builder().fields(names, types).build();
+
+			return new CatalogTableImpl(
+				tableSchema,
+				new HashMap<>(),
+				""
+			);
+		} catch (Exception e) {
+			throw new CatalogException(
+				String.format("Failed getting table %s", tablePath.getFullName()), e);
+		}
+	}
+
+	public static final String PG_BYTEA = "bytea";
+	public static final String PG_BYTEA_ARRAY = "_bytea";
+	public static final String PG_SMALLINT = "int2";
+	public static final String PG_SMALLINT_ARRAY = "_int2";
+	public static final String PG_INTEGER = "int4";
+	public static final String PG_INTEGER_ARRAY = "_int4";
+	public static final String PG_BIGINT = "int8";
+	public static final String PG_BIGINT_ARRAY = "_int8";
+	public static final String PG_REAL = "float4";
+	public static final String PG_REAL_ARRAY = "_float4";
+	public static final String PG_DOUBLE_PRECISION = "float8";
+	public static final String PG_DOUBLE_PRECISION_ARRAY = "_float8";
+	public static final String PG_NUMERIC = "numeric";
+	public static final String PG_NUMERIC_ARRAY = "_numeric";
+	public static final String PG_BOOLEAN = "bool";
+	public static final String PG_BOOLEAN_ARRAY = "_bool";
+	public static final String PG_TIMESTAMP = "timestamp";
+	public static final String PG_TIMESTAMP_ARRAY = "_timestamp";
+	public static final String PG_TIMESTAMPTZ = "timestamptz";
+	public static final String PG_TIMESTAMPTZ_ARRAY = "_timestamptz";
+	public static final String PG_DATE = "date";
+	public static final String PG_DATE_ARRAY = "_date";
+	public static final String PG_TIME = "time";
+	public static final String PG_TIME_ARRAY = "_time";
+	public static final String PG_TEXT = "text";
+	public static final String PG_TEXT_ARRAY = "_text";
+	public static final String PG_CHAR = "bpchar";
+	public static final String PG_CHAR_ARRAY = "_bpchar";
+	public static final String PG_CHARACTER = "character";
+	public static final String PG_CHARACTER_ARRAY = "_character";
+	public static final String PG_CHARACTER_VARYING = "varchar";
+	public static final String PG_CHARACTER_VARYING_ARRAY = "_varchar";
+
+	private DataType fromJDBCType(ResultSetMetaData metadata, int colIndex) throws SQLException {
+		String pgType = metadata.getColumnTypeName(colIndex);
+
+		int precision = metadata.getPrecision(colIndex);
+
+		switch (pgType) {
+			case PG_BOOLEAN:
+				return DataTypes.BOOLEAN();
+			case PG_BOOLEAN_ARRAY:
+				return DataTypes.ARRAY(DataTypes.BOOLEAN());
+			case PG_BYTEA:
+				return DataTypes.BYTES();
+			case PG_BYTEA_ARRAY:
+				return DataTypes.ARRAY(DataTypes.BYTES());
+			case PG_SMALLINT:
+				return DataTypes.SMALLINT();
+			case PG_SMALLINT_ARRAY:
+				return DataTypes.ARRAY(DataTypes.SMALLINT());
+			case PG_INTEGER:
+				return DataTypes.INT();
+			case PG_INTEGER_ARRAY:
+				return DataTypes.ARRAY(DataTypes.INT());
+			case PG_BIGINT:
+				return DataTypes.BIGINT();
+			case PG_BIGINT_ARRAY:
+				return DataTypes.ARRAY(DataTypes.BIGINT());
+			case PG_REAL:
+				return DataTypes.FLOAT();
+			case PG_REAL_ARRAY:
+				return DataTypes.ARRAY(DataTypes.FLOAT());
+			case PG_DOUBLE_PRECISION:
+				return DataTypes.DOUBLE();
+			case PG_DOUBLE_PRECISION_ARRAY:
+				return DataTypes.ARRAY(DataTypes.DOUBLE());
+			case PG_NUMERIC:
+				return DataTypes.DECIMAL(precision, metadata.getScale(colIndex));
+			case PG_NUMERIC_ARRAY:
+				return DataTypes.ARRAY(
+					DataTypes.DECIMAL(precision, metadata.getScale(colIndex)));
+			case PG_CHAR:
+			case PG_CHARACTER:
+				return DataTypes.CHAR(precision);
+			case PG_CHAR_ARRAY:
+			case PG_CHARACTER_ARRAY:
+				return DataTypes.ARRAY(DataTypes.CHAR(precision));
+			case PG_CHARACTER_VARYING:
+				return DataTypes.VARCHAR(precision);
+			case PG_CHARACTER_VARYING_ARRAY:
+				return DataTypes.ARRAY(DataTypes.VARCHAR(precision));
+			case PG_TEXT:
+				return DataTypes.STRING();
+			case PG_TEXT_ARRAY:
+				return DataTypes.ARRAY(DataTypes.STRING());
+			case PG_TIMESTAMP:
+				return DataTypes.TIMESTAMP();
+			case PG_TIMESTAMP_ARRAY:
+				return DataTypes.ARRAY(DataTypes.TIMESTAMP());
+			case PG_TIMESTAMPTZ:
+				return DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE();
+			case PG_TIMESTAMPTZ_ARRAY:
+				return DataTypes.ARRAY(DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE());
+			case PG_TIME:
+				return DataTypes.TIME();
+			case PG_TIME_ARRAY:
+				return DataTypes.ARRAY(DataTypes.TIME());
+			case PG_DATE:
+				return DataTypes.DATE();
+			case PG_DATE_ARRAY:
+				return DataTypes.ARRAY(DataTypes.DATE());
+			default:
+				throw new UnsupportedOperationException(
+					String.format("Doesn't support Postgres type '%s' yet", pgType));
+		}
+	}
+
+	@Override
+	public boolean tableExists(ObjectPath tablePath) throws CatalogException {
+
+		List<String> tables = null;
+		try {
+			tables = listTables(tablePath.getDatabaseName());
+		} catch (DatabaseNotExistException e) {
+			return false;
+		}
+
+		return tables.contains(PostgresTablePath.fromFlinkTableName(tablePath.getObjectName()).getFullPath());
+	}
+
+}
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresTablePath.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresTablePath.java
new file mode 100644
index 0000000..99cc2b4
--- /dev/null
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresTablePath.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc.catalog;
+
+import org.apache.flink.util.StringUtils;
+
+import java.util.Objects;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Table path of PostgreSQL in Flink. Can be of formats "table_name" or "schema_name.table_name".
+ * When it's "table_name", the schema name defaults to "public".
+ */
+public class PostgresTablePath {
+
+	private static final String DEFAULT_POSTGRES_SCHEMA_NAME = "public";
+
+	private final String pgSchemaName;
+	private final String pgTableName;
+
+	public PostgresTablePath(String pgSchemaName, String pgTableName) {
+		checkArgument(!StringUtils.isNullOrWhitespaceOnly(pgSchemaName));
+		checkArgument(!StringUtils.isNullOrWhitespaceOnly(pgTableName));
+
+		this.pgSchemaName = pgSchemaName;
+		this.pgTableName = pgTableName;
+	}
+
+	public static PostgresTablePath fromFlinkTableName(String flinkTableName) {
+		if (flinkTableName.contains(".")) {
+			String[] path = flinkTableName.split("\\.");
+
+			checkArgument(path != null && path.length == 2,
+				String.format("Table name '%s' is not valid. The parsed length is %d", flinkTableName, path.length));
+
+			return new PostgresTablePath(path[0], path[1]);
+		} else {
+			return new PostgresTablePath(DEFAULT_POSTGRES_SCHEMA_NAME, flinkTableName);
+		}
+	}
+
+	public static String toFlinkTableName(String schema, String table) {
+		return new PostgresTablePath(schema, table).getFullPath();
+	}
+
+	public String getFullPath() {
+		return String.format("%s.%s", pgSchemaName, pgTableName);
+	}
+
+	public String getFullPathWithQuotes() {
+		return String.format("`%s.%s`", pgSchemaName, pgTableName);
+	}
+
+	@Override
+	public String toString() {
+		return getFullPathWithQuotes();
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+
+		PostgresTablePath that = (PostgresTablePath) o;
+		return Objects.equals(pgSchemaName, that.pgSchemaName) &&
+			Objects.equals(pgTableName, that.pgTableName);
+	}
+
+	@Override
+	public int hashCode() {
+		return Objects.hash(pgSchemaName, pgTableName);
+	}
+}
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/dialect/JDBCDialects.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/dialect/JDBCDialects.java
index fa7192e..743d16a 100644
--- a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/dialect/JDBCDialects.java
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/dialect/JDBCDialects.java
@@ -203,7 +203,10 @@ public final class JDBCDialects {
 		}
 	}
 
-	private static class MySQLDialect extends AbstractDialect {
+	/**
+	 * MySQL dialect.
+	 */
+	public static class MySQLDialect extends AbstractDialect {
 
 		private static final long serialVersionUID = 1L;
 
@@ -301,7 +304,10 @@ public final class JDBCDialects {
 		}
 	}
 
-	private static class PostgresDialect extends AbstractDialect {
+	/**
+	 * Postgres dialect.
+	 */
+	public static class PostgresDialect extends AbstractDialect {
 
 		private static final long serialVersionUID = 1L;
 
diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/catalog/JDBCCatalogUtilsTest.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/catalog/JDBCCatalogUtilsTest.java
new file mode 100644
index 0000000..7a4132b
--- /dev/null
+++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/catalog/JDBCCatalogUtilsTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc.catalog;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+/**
+ * Test for {@link JDBCCatalogUtils}.
+ */
+public class JDBCCatalogUtilsTest {
+	@Rule
+	public ExpectedException exception = ExpectedException.none();
+
+	@Test
+	public void testJDBCUrl() {
+		JDBCCatalogUtils.validateJDBCUrl("jdbc:postgresql://localhost:5432/");
+
+		JDBCCatalogUtils.validateJDBCUrl("jdbc:postgresql://localhost:5432");
+	}
+
+	@Test
+	public void testInvalidJDBCUrl() {
+		exception.expect(IllegalArgumentException.class);
+		JDBCCatalogUtils.validateJDBCUrl("jdbc:postgresql://localhost:5432/db");
+	}
+}
diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresCatalogITCase.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresCatalogITCase.java
new file mode 100644
index 0000000..e103780
--- /dev/null
+++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresCatalogITCase.java
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc.catalog;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+
+import com.opentable.db.postgres.junit.EmbeddedPostgresRules;
+import com.opentable.db.postgres.junit.SingleInstancePostgresRule;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test for {@link PostgresCatalog}.
+ */
+public class PostgresCatalogITCase {
+	@Rule
+	public ExpectedException exception = ExpectedException.none();
+
+	@ClassRule
+	public static SingleInstancePostgresRule pg = EmbeddedPostgresRules.singleInstance();
+
+	protected static final String TEST_USERNAME = "postgres";
+	protected static final String TEST_PWD = "postgres";
+	protected static final String TEST_DB = "test";
+	protected static final String TEST_SCHEMA = "test_schema";
+	protected static final String TABLE1 = "t1";
+	protected static final String TABLE2 = "t2";
+	protected static final String TABLE3 = "t3";
+
+	protected static String baseUrl;
+	protected static Catalog catalog;
+
+	public static Catalog createCatalog(String name, String defaultDb, String username, String pwd, String jdbcUrl) {
+		return new PostgresCatalog("mypg", PostgresCatalog.DEFAULT_DATABASE, username, pwd, jdbcUrl);
+	}
+
+	@BeforeClass
+	public static void setup() throws SQLException {
+		// jdbc:postgresql://localhost:50807/postgres?user=postgres
+		String embeddedJdbcUrl = pg.getEmbeddedPostgres().getJdbcUrl(TEST_USERNAME, TEST_PWD);
+		// jdbc:postgresql://localhost:50807/
+		baseUrl = embeddedJdbcUrl.substring(0, embeddedJdbcUrl.lastIndexOf("/") + 1);
+
+		catalog = createCatalog("mypg", PostgresCatalog.DEFAULT_DATABASE, TEST_USERNAME, TEST_PWD, baseUrl);
+
+		// create test database and schema
+		createDatabase(TEST_DB);
+		createSchema(TEST_DB, TEST_SCHEMA);
+
+		// create test tables
+		// table: postgres.public.user1
+		createTable(PostgresTablePath.fromFlinkTableName(TABLE1), getSimpleTable().pgSchemaSql);
+
+		// table: testdb.public.user2
+		// table: testdb.testschema.user3
+		// table: testdb.public.datatypes
+		createTable(TEST_DB, PostgresTablePath.fromFlinkTableName(TABLE2), getSimpleTable().pgSchemaSql);
+		createTable(TEST_DB, new PostgresTablePath(TEST_SCHEMA, TABLE3), getSimpleTable().pgSchemaSql);
+		createTable(TEST_DB, PostgresTablePath.fromFlinkTableName("datatypes"), getDataTypesTable().pgSchemaSql);
+	}
+
+	// ------ databases ------
+
+	@Test
+	public void testGetDb_DatabaseNotExistException() throws Exception {
+		exception.expect(DatabaseNotExistException.class);
+		exception.expectMessage("Database nonexistent does not exist in Catalog");
+		catalog.getDatabase("nonexistent");
+	}
+
+	@Test
+	public void testListDatabases() {
+		List<String> actual = catalog.listDatabases();
+
+		assertEquals(
+			Arrays.asList("postgres", "test"),
+			actual
+		);
+	}
+
+	@Test
+	public void testDbExists() throws Exception {
+		assertFalse(catalog.databaseExists("nonexistent"));
+
+		assertTrue(catalog.databaseExists(PostgresCatalog.DEFAULT_DATABASE));
+	}
+
+	// ------ tables ------
+
+	@Test
+	public void testListTables() throws DatabaseNotExistException {
+		List<String> actual = catalog.listTables(PostgresCatalog.DEFAULT_DATABASE);
+
+		assertEquals(Arrays.asList("public.t1"), actual);
+
+		actual = catalog.listTables(TEST_DB);
+
+		assertEquals(Arrays.asList("public.datatypes", "public.t2", "test_schema.t3"), actual);
+	}
+
+	@Test
+	public void testListTables_DatabaseNotExistException() throws DatabaseNotExistException {
+		exception.expect(DatabaseNotExistException.class);
+		catalog.listTables("postgres/nonexistschema");
+	}
+
+	@Test
+	public void testTableExists() {
+		assertFalse(catalog.tableExists(new ObjectPath(TEST_DB, "nonexist")));
+
+		assertTrue(catalog.tableExists(new ObjectPath(PostgresCatalog.DEFAULT_DATABASE, TABLE1)));
+		assertTrue(catalog.tableExists(new ObjectPath(TEST_DB, TABLE2)));
+		assertTrue(catalog.tableExists(new ObjectPath(TEST_DB, "test_schema.t3")));
+	}
+
+	@Test
+	public void testGetTables_TableNotExistException() throws TableNotExistException {
+		exception.expect(TableNotExistException.class);
+		catalog.getTable(new ObjectPath(TEST_DB, PostgresTablePath.toFlinkTableName(TEST_SCHEMA, "anytable")));
+	}
+
+	@Test
+	public void testGetTables_TableNotExistException_NoSchema() throws TableNotExistException {
+		exception.expect(TableNotExistException.class);
+		catalog.getTable(new ObjectPath(TEST_DB, PostgresTablePath.toFlinkTableName("nonexistschema", "anytable")));
+	}
+
+	@Test
+	public void testGetTables_TableNotExistException_NoDb() throws TableNotExistException {
+		exception.expect(TableNotExistException.class);
+		catalog.getTable(new ObjectPath("nonexistdb", PostgresTablePath.toFlinkTableName(TEST_SCHEMA, "anytable")));
+	}
+
+	@Test
+	public void testGetTable() throws org.apache.flink.table.catalog.exceptions.TableNotExistException {
+		// test postgres.public.user1
+		TableSchema schema = getSimpleTable().schema;
+
+		CatalogBaseTable table = catalog.getTable(new ObjectPath("postgres", TABLE1));
+
+		assertEquals(schema, table.getSchema());
+
+		table = catalog.getTable(new ObjectPath("postgres", "public.t1"));
+
+		assertEquals(schema, table.getSchema());
+
+		// test testdb.public.user2
+		table = catalog.getTable(new ObjectPath(TEST_DB, TABLE2));
+
+		assertEquals(schema, table.getSchema());
+
+		table = catalog.getTable(new ObjectPath(TEST_DB, "public.t2"));
+
+		assertEquals(schema, table.getSchema());
+
+		// test testdb.testschema.user2
+		table = catalog.getTable(new ObjectPath(TEST_DB, TEST_SCHEMA + ".t3"));
+
+		assertEquals(schema, table.getSchema());
+
+	}
+
+	@Test
+	public void testDataTypes() throws TableNotExistException {
+		CatalogBaseTable table = catalog.getTable(new ObjectPath(TEST_DB, "datatypes"));
+
+		assertEquals(getDataTypesTable().schema, table.getSchema());
+	}
+
+	private static class TestTable {
+		TableSchema schema;
+		String pgSchemaSql;
+
+		public TestTable(TableSchema schema, String pgSchemaSql) {
+			this.schema = schema;
+			this.pgSchemaSql = pgSchemaSql;
+		}
+	}
+
+	private static TestTable getSimpleTable() {
+		return new TestTable(
+			TableSchema.builder()
+				.field("name", DataTypes.INT())
+				.build(),
+			"name integer"
+		);
+	}
+
+	private static TestTable getDataTypesTable() {
+		return new TestTable(
+			TableSchema.builder()
+				.field("int", DataTypes.INT())
+				.field("int_arr", DataTypes.ARRAY(DataTypes.INT()))
+				.field("bytea", DataTypes.BYTES())
+				.field("bytea_arr", DataTypes.ARRAY(DataTypes.BYTES()))
+				.field("short", DataTypes.SMALLINT())
+				.field("short_arr", DataTypes.ARRAY(DataTypes.SMALLINT()))
+				.field("long", DataTypes.BIGINT())
+				.field("long_arr", DataTypes.ARRAY(DataTypes.BIGINT()))
+				.field("real", DataTypes.FLOAT())
+				.field("real_arr", DataTypes.ARRAY(DataTypes.FLOAT()))
+				.field("double_precision", DataTypes.DOUBLE())
+				.field("double_precision_arr", DataTypes.ARRAY(DataTypes.DOUBLE()))
+				.field("numeric", DataTypes.DECIMAL(10, 5))
+				.field("numeric_arr", DataTypes.ARRAY(DataTypes.DECIMAL(10, 5)))
+				.field("boolean", DataTypes.BOOLEAN())
+				.field("boolean_arr", DataTypes.ARRAY(DataTypes.BOOLEAN()))
+				.field("text", DataTypes.STRING())
+				.field("text_arr", DataTypes.ARRAY(DataTypes.STRING()))
+				.field("char", DataTypes.CHAR(1))
+				.field("char_arr", DataTypes.ARRAY(DataTypes.CHAR(1)))
+				.field("character", DataTypes.CHAR(3))
+				.field("character_arr", DataTypes.ARRAY(DataTypes.CHAR(3)))
+				.field("character_varying", DataTypes.VARCHAR(20))
+				.field("character_varying_arr", DataTypes.ARRAY(DataTypes.VARCHAR(20)))
+				.field("timestamp", DataTypes.TIMESTAMP())
+				.field("timestamp_arr", DataTypes.ARRAY(DataTypes.TIMESTAMP()))
+				.field("timestamptz", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE())
+				.field("timestamptz_arr", DataTypes.ARRAY(DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()))
+				.field("date", DataTypes.DATE())
+				.field("date_arr", DataTypes.ARRAY(DataTypes.DATE()))
+				.field("time", DataTypes.TIME())
+				.field("time_arr", DataTypes.ARRAY(DataTypes.TIME()))
+				.build(),
+			"int integer, " +
+				"int_arr integer[], " +
+				"bytea bytea, " +
+				"bytea_arr bytea[], " +
+				"short smallint, " +
+				"short_arr smallint[], " +
+				"long bigint, " +
+				"long_arr bigint[], " +
+				"real real, " +
+				"real_arr real[], " +
+				"double_precision double precision, " +
+				"double_precision_arr double precision[], " +
+				"numeric numeric(10, 5), " +
+				"numeric_arr numeric(10, 5)[], " +
+				"boolean boolean, " +
+				"boolean_arr boolean[], " +
+				"text text, " +
+				"text_arr text[], " +
+				"char char, " +
+				"char_arr char[], " +
+				"character character(3), " +
+				"character_arr character(3)[], " +
+				"character_varying character varying(20), " +
+				"character_varying_arr character varying(20)[], " +
+				"timestamp timestamp(6), " +
+				"timestamp_arr timestamp(6)[], " +
+				"timestamptz timestamptz, " +
+				"timestamptz_arr timestamptz[], " +
+				"date date, " +
+				"date_arr date[], " +
+				"time time(6), " +
+				"time_arr time(6)[]"
+		);
+	}
+
+	private static void createTable(PostgresTablePath tablePath, String tableSchemaSql) throws SQLException {
+		executeSQL(PostgresCatalog.DEFAULT_DATABASE, String.format("CREATE TABLE %s(%s);", tablePath.getFullPath(), tableSchemaSql));
+	}
+
+	private static void createTable(String db, PostgresTablePath tablePath, String tableSchemaSql) throws SQLException {
+		executeSQL(db, String.format("CREATE TABLE %s(%s);", tablePath.getFullPath(), tableSchemaSql));
+	}
+
+	private static void createSchema(String db, String schema) throws SQLException {
+		executeSQL(db, String.format("CREATE SCHEMA %s", schema));
+	}
+
+	private static void createDatabase(String database) throws SQLException {
+		executeSQL(String.format("CREATE DATABASE %s;", database));
+	}
+
+	private static void executeSQL(String sql) throws SQLException {
+		executeSQL("", sql);
+	}
+
+	private static void executeSQL(String db, String sql) throws SQLException {
+		try (Connection conn = DriverManager.getConnection(baseUrl + db, TEST_USERNAME, TEST_PWD);
+				Statement statement = conn.createStatement()) {
+			statement.executeUpdate(sql);
+		} catch (SQLException e) {
+			throw e;
+		}
+	}
+
+}
diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresTablePathTest.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresTablePathTest.java
new file mode 100644
index 0000000..46f32bc
--- /dev/null
+++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresTablePathTest.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc.catalog;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Test for {@link PostgresTablePath}.
+ */
+public class PostgresTablePathTest {
+	@Test
+	public void testFromFlinkTableName() {
+		assertEquals(new PostgresTablePath("public", "topic"), PostgresTablePath.fromFlinkTableName("public.topic"));
+	}
+}