You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2020/05/20 14:35:39 UTC
[flink] 01/03: [FLINK-17356][jdbc][postgres] Support PK and Unique constraints
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
commit a83ee6c90b605f0807a40c82f2f5879f80f1f2dd
Author: Flavio Pompermaier <f....@gmail.com>
AuthorDate: Mon May 18 00:24:45 2020 +0200
[FLINK-17356][jdbc][postgres] Support PK and Unique constraints
This closes #11906
---
.../jdbc/catalog/AbstractJdbcCatalog.java | 36 ++++++++++++++++++++++
.../connector/jdbc/catalog/PostgresCatalog.java | 14 ++++++++-
.../connector/jdbc/catalog/PostgresTablePath.java | 8 +++++
.../jdbc/catalog/PostgresCatalogTestBase.java | 8 +++--
4 files changed, 62 insertions(+), 4 deletions(-)
diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/AbstractJdbcCatalog.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/AbstractJdbcCatalog.java
index 9e27816..199d481 100644
--- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/AbstractJdbcCatalog.java
+++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/AbstractJdbcCatalog.java
@@ -20,6 +20,7 @@ package org.apache.flink.connector.jdbc.catalog;
import org.apache.flink.connector.jdbc.table.JdbcDynamicTableSourceSinkFactory;
import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.api.constraints.UniqueConstraint;
import org.apache.flink.table.catalog.AbstractCatalog;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogDatabase;
@@ -50,11 +51,18 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
+import java.sql.DatabaseMetaData;
import java.sql.DriverManager;
+import java.sql.ResultSet;
import java.sql.SQLException;
+import java.util.AbstractMap;
+import java.util.ArrayList;
import java.util.Collections;
+import java.util.Comparator;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
+import java.util.stream.Collectors;
import static org.apache.flink.util.Preconditions.checkArgument;
@@ -116,6 +124,34 @@ public abstract class AbstractJdbcCatalog extends AbstractCatalog {
return baseUrl;
}
+ // ------ retrieve PK constraint ------
+
+ /**
+  * Looks up the primary key of a table through JDBC metadata.
+  *
+  * <p>According to the Javadoc of {@link DatabaseMetaData#getPrimaryKeys}, the returned rows are
+  * ordered by COLUMN_NAME, not by KEY_SEQ, so the columns are re-sorted by KEY_SEQ to restore
+  * their position inside the key definition.
+  *
+  * @param metaData metadata of an open connection
+  * @param schema schema that contains the table
+  * @param table name of the table
+  * @return the primary key constraint, or {@code null} if the metadata reports no primary key columns
+  * @throws SQLException if reading the metadata fails
+  */
+ protected UniqueConstraint getPrimaryKey(DatabaseMetaData metaData, String schema, String table) throws SQLException {
+     List<Map.Entry<Integer, String>> columnsWithIndex = new ArrayList<>();
+     String pkName = null;
+     // Close the metadata ResultSet as soon as it has been consumed instead of
+     // relying on the caller's connection cleanup.
+     try (ResultSet rs = metaData.getPrimaryKeys(null, schema, table)) {
+         while (rs.next()) {
+             // Read the columns in their result-set order (COLUMN_NAME, KEY_SEQ, PK_NAME),
+             // as recommended by the ResultSet Javadoc for portability across drivers.
+             String columnName = rs.getString("COLUMN_NAME");
+             int keySeq = rs.getInt("KEY_SEQ");
+             pkName = rs.getString("PK_NAME");
+             columnsWithIndex.add(new AbstractMap.SimpleEntry<>(keySeq, columnName));
+         }
+     }
+     if (columnsWithIndex.isEmpty()) {
+         return null;
+     }
+     // sort columns by KEY_SEQ
+     columnsWithIndex.sort(Comparator.comparingInt(Map.Entry::getKey));
+     List<String> cols = columnsWithIndex.stream().map(Map.Entry::getValue).collect(Collectors.toList());
+     return UniqueConstraint.primaryKey(pkName, cols);
+ }
// ------ table factory ------
diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalog.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalog.java
index 31b4185..cb20fb0 100644
--- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalog.java
+++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalog.java
@@ -21,6 +21,7 @@ package org.apache.flink.connector.jdbc.catalog;
import org.apache.flink.annotation.Internal;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.constraints.UniqueConstraint;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogDatabase;
import org.apache.flink.table.catalog.CatalogDatabaseImpl;
@@ -36,6 +37,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
+import java.sql.DatabaseMetaData;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
@@ -180,6 +182,8 @@ public class PostgresCatalog extends AbstractJdbcCatalog {
String dbUrl = baseUrl + tablePath.getDatabaseName();
try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd)) {
+ DatabaseMetaData metaData = conn.getMetaData();
+ UniqueConstraint pk = getPrimaryKey(metaData, pgPath.getPgSchemaName(), pgPath.getPgTableName());
PreparedStatement ps = conn.prepareStatement(
String.format("SELECT * FROM %s;", pgPath.getFullPath()));
@@ -192,9 +196,17 @@ public class PostgresCatalog extends AbstractJdbcCatalog {
for (int i = 1; i <= rsmd.getColumnCount(); i++) {
names[i - 1] = rsmd.getColumnName(i);
types[i - 1] = fromJDBCType(rsmd, i);
+ if (rsmd.isNullable(i) == ResultSetMetaData.columnNoNulls) {
+ types[i - 1] = types[i - 1].notNull();
+ }
}
- TableSchema tableSchema = new TableSchema.Builder().fields(names, types).build();
+ TableSchema.Builder tableBuilder = new TableSchema.Builder()
+ .fields(names, types);
+ if (pk != null) {
+ tableBuilder.primaryKey(pk.getName(), pk.getColumns().toArray(new String[0]));
+ }
+ TableSchema tableSchema = tableBuilder.build();
Map<String, String> props = new HashMap<>();
props.put(CONNECTOR.key(), IDENTIFIER);
diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/PostgresTablePath.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/PostgresTablePath.java
index a989024..a266867 100644
--- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/PostgresTablePath.java
+++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/PostgresTablePath.java
@@ -64,6 +64,14 @@ public class PostgresTablePath {
return String.format("%s.%s", pgSchemaName, pgTableName);
}
+ /**
+  * Returns the table-name component of this path; the schema component is
+  * available separately via {@link #getPgSchemaName()}.
+  */
+ public String getPgTableName() {
+     return this.pgTableName;
+ }
+
+ /**
+  * Returns the schema component of this path; the table component is
+  * available separately via {@link #getPgTableName()}.
+  */
+ public String getPgSchemaName() {
+     return this.pgSchemaName;
+ }
+
@Override
public String toString() {
return getFullPath();
diff --git a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogTestBase.java b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogTestBase.java
index b4b1b44..fd916e7 100644
--- a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogTestBase.java
+++ b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogTestBase.java
@@ -157,9 +157,9 @@ public class PostgresCatalogTestBase {
public static TestTable getPrimitiveTable() {
return new TestTable(
TableSchema.builder()
- .field("int", DataTypes.INT())
+ .field("int", DataTypes.INT().notNull())
.field("bytea", DataTypes.BYTES())
- .field("short", DataTypes.SMALLINT())
+ .field("short", DataTypes.SMALLINT().notNull())
.field("long", DataTypes.BIGINT())
.field("real", DataTypes.FLOAT())
.field("double_precision", DataTypes.DOUBLE())
@@ -175,6 +175,7 @@ public class PostgresCatalogTestBase {
.field("date", DataTypes.DATE())
.field("time", DataTypes.TIME(0))
.field("default_numeric", DataTypes.DECIMAL(DecimalType.MAX_PRECISION, 18))
+ .primaryKey("test_pk", new String[]{"int", "short"})
.build(),
"int integer, " +
"bytea bytea, " +
@@ -193,7 +194,8 @@ public class PostgresCatalogTestBase {
// "timestamptz timestamptz(4), " +
"date date," +
"time time(0), " +
- "default_numeric numeric ",
+ "default_numeric numeric, " +
+ "CONSTRAINT test_pk PRIMARY KEY (int, short)",
"1," +
"'2'," +
"3," +