You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2020/05/20 14:35:39 UTC

[flink] 01/03: [FLINK-17356][jdbc][postgres] Support PK and Unique constraints

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a83ee6c90b605f0807a40c82f2f5879f80f1f2dd
Author: Flavio Pompermaier <f....@gmail.com>
AuthorDate: Mon May 18 00:24:45 2020 +0200

    [FLINK-17356][jdbc][postgres] Support PK and Unique constraints
    
    This closes #11906
---
 .../jdbc/catalog/AbstractJdbcCatalog.java          | 36 ++++++++++++++++++++++
 .../connector/jdbc/catalog/PostgresCatalog.java    | 14 ++++++++-
 .../connector/jdbc/catalog/PostgresTablePath.java  |  8 +++++
 .../jdbc/catalog/PostgresCatalogTestBase.java      |  8 +++--
 4 files changed, 62 insertions(+), 4 deletions(-)

diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/AbstractJdbcCatalog.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/AbstractJdbcCatalog.java
index 9e27816..199d481 100644
--- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/AbstractJdbcCatalog.java
+++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/AbstractJdbcCatalog.java
@@ -20,6 +20,7 @@ package org.apache.flink.connector.jdbc.catalog;
 
 import org.apache.flink.connector.jdbc.table.JdbcDynamicTableSourceSinkFactory;
 import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.api.constraints.UniqueConstraint;
 import org.apache.flink.table.catalog.AbstractCatalog;
 import org.apache.flink.table.catalog.CatalogBaseTable;
 import org.apache.flink.table.catalog.CatalogDatabase;
@@ -50,11 +51,18 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.sql.Connection;
+import java.sql.DatabaseMetaData;
 import java.sql.DriverManager;
+import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.util.AbstractMap;
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
+import java.util.stream.Collectors;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 
@@ -116,6 +124,34 @@ public abstract class AbstractJdbcCatalog extends AbstractCatalog {
 		return baseUrl;
 	}
 
+	// ------ retrieve PK constraint ------
+
+	/**
+	 * Reads the primary key of {@code schema.table} from the JDBC metadata, or returns
+	 * {@code null} if the table has none.
+	 */
+	protected UniqueConstraint getPrimaryKey(DatabaseMetaData metaData, String schema, String table) throws SQLException {
+		// According to the Javadoc of java.sql.DatabaseMetaData#getPrimaryKeys, the returned
+		// primary key columns are ordered by COLUMN_NAME, not by KEY_SEQ, so we sort them by
+		// KEY_SEQ afterwards. The metadata ResultSet is closed by try-with-resources.
+		List<Map.Entry<Integer, String>> columnsWithIndex = new ArrayList<>();
+		String pkName = null;
+		try (ResultSet rs = metaData.getPrimaryKeys(null, schema, table)) {
+			while (rs.next()) {
+				String columnName = rs.getString("COLUMN_NAME");
+				pkName = rs.getString("PK_NAME");
+				columnsWithIndex.add(new AbstractMap.SimpleEntry<>(rs.getInt("KEY_SEQ"), columnName));
+			}
+		}
+		if (columnsWithIndex.isEmpty()) {
+			return null;
+		}
+		columnsWithIndex.sort(Comparator.comparingInt(Map.Entry::getKey));
+		List<String> cols = columnsWithIndex.stream()
+			.map(Map.Entry::getValue)
+			.collect(Collectors.toList());
+		return UniqueConstraint.primaryKey(pkName, cols);
+	}
 
 	// ------ table factory ------
 
diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalog.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalog.java
index 31b4185..cb20fb0 100644
--- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalog.java
+++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalog.java
@@ -21,6 +21,7 @@ package org.apache.flink.connector.jdbc.catalog;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.constraints.UniqueConstraint;
 import org.apache.flink.table.catalog.CatalogBaseTable;
 import org.apache.flink.table.catalog.CatalogDatabase;
 import org.apache.flink.table.catalog.CatalogDatabaseImpl;
@@ -36,6 +37,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.sql.Connection;
+import java.sql.DatabaseMetaData;
 import java.sql.DriverManager;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
@@ -180,6 +182,8 @@ public class PostgresCatalog extends AbstractJdbcCatalog {
 
 		String dbUrl = baseUrl + tablePath.getDatabaseName();
 		try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd)) {
+			DatabaseMetaData metaData = conn.getMetaData();
+			UniqueConstraint pk = getPrimaryKey(metaData, pgPath.getPgSchemaName(), pgPath.getPgTableName());
 
 			PreparedStatement ps = conn.prepareStatement(
 				String.format("SELECT * FROM %s;", pgPath.getFullPath()));
@@ -192,9 +196,17 @@ public class PostgresCatalog extends AbstractJdbcCatalog {
 			for (int i = 1; i <= rsmd.getColumnCount(); i++) {
 				names[i - 1] = rsmd.getColumnName(i);
 				types[i - 1] = fromJDBCType(rsmd, i);
+				if (rsmd.isNullable(i) == ResultSetMetaData.columnNoNulls) {
+					types[i - 1] = types[i - 1].notNull();
+				}
 			}
 
-			TableSchema tableSchema = new TableSchema.Builder().fields(names, types).build();
+			TableSchema.Builder tableBuilder = new TableSchema.Builder()
+				.fields(names, types);
+			if (pk != null) {
+				tableBuilder.primaryKey(pk.getName(), pk.getColumns().toArray(new String[0]));
+			}
+			TableSchema tableSchema = tableBuilder.build();
 
 			Map<String, String> props = new HashMap<>();
 			props.put(CONNECTOR.key(), IDENTIFIER);
diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/PostgresTablePath.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/PostgresTablePath.java
index a989024..a266867 100644
--- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/PostgresTablePath.java
+++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/PostgresTablePath.java
@@ -64,6 +64,14 @@ public class PostgresTablePath {
 		return String.format("%s.%s", pgSchemaName, pgTableName);
 	}
 
+	public String getPgSchemaName() {
+		return this.pgSchemaName;
+	}
+
+	public String getPgTableName() {
+		return this.pgTableName;
+	}
+
 	@Override
 	public String toString() {
 		return getFullPath();
diff --git a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogTestBase.java b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogTestBase.java
index b4b1b44..fd916e7 100644
--- a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogTestBase.java
+++ b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogTestBase.java
@@ -157,9 +157,9 @@ public class PostgresCatalogTestBase {
 	public static TestTable getPrimitiveTable() {
 		return new TestTable(
 			TableSchema.builder()
-				.field("int", DataTypes.INT())
+				.field("int", DataTypes.INT().notNull())
 				.field("bytea", DataTypes.BYTES())
-				.field("short", DataTypes.SMALLINT())
+				.field("short", DataTypes.SMALLINT().notNull())
 				.field("long", DataTypes.BIGINT())
 				.field("real", DataTypes.FLOAT())
 				.field("double_precision", DataTypes.DOUBLE())
@@ -175,6 +175,7 @@ public class PostgresCatalogTestBase {
 				.field("date", DataTypes.DATE())
 				.field("time", DataTypes.TIME(0))
 				.field("default_numeric", DataTypes.DECIMAL(DecimalType.MAX_PRECISION, 18))
+				.primaryKey("test_pk", new String[]{"int", "short"})
 				.build(),
 			"int integer, " +
 				"bytea bytea, " +
@@ -193,7 +194,8 @@ public class PostgresCatalogTestBase {
 //				"timestamptz timestamptz(4), " +
 				"date date," +
 				"time time(0), " +
-				"default_numeric numeric ",
+				"default_numeric numeric, " +
+				"CONSTRAINT test_pk PRIMARY KEY (int, short)",
 			"1," +
 				"'2'," +
 				"3," +