Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/05/19 04:34:20 UTC

[GitHub] [flink] wuchong commented on a change in pull request #11906: [FLINK-17356][jdbc][postgres] Support PK and Unique constraints

wuchong commented on a change in pull request #11906:
URL: https://github.com/apache/flink/pull/11906#discussion_r427022575



##########
File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/AbstractJdbcCatalog.java
##########
@@ -116,6 +122,36 @@ public String getBaseUrl() {
 		return baseUrl;
 	}
 
+	// ------ Postgres default objects that shouldn't be exposed to users ------
+
+	/**
+	 * Retrieve the list of system schemas to ignore.
+	 */
+	public abstract Set<String> getBuiltinSchemas();
+
+	/**
+	 * Retrieve the list of system database to ignore.
+	 */
+	public abstract Set<String> getBuiltinDatabases();
+
+	// ------ retrieve PK constraint ------
+
+	protected Map.Entry<String, List<String>> getPrimaryKey(DatabaseMetaData metaData, PostgresTablePath pgPath) throws SQLException {

Review comment:
       If we want to move this method into the base class, I would suggest changing the signature to
   
   ```java
   UniqueConstraint getPrimaryKey(DatabaseMetaData metaData, String schema, String table)
   ```
   
   1. `UniqueConstraint` is the standard public API for describing a primary key, and it is exposed by the Table API.
   2. `PostgresTablePath` is a Postgres-specific class, which shouldn't be in the base class.

##########
File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/AbstractJdbcCatalog.java
##########
@@ -116,6 +122,36 @@ public String getBaseUrl() {
 		return baseUrl;
 	}
 
+	// ------ Postgres default objects that shouldn't be exposed to users ------
+
+	/**
+	 * Retrieve the list of system schemas to ignore.
+	 */
+	public abstract Set<String> getBuiltinSchemas();
+
+	/**
+	 * Retrieve the list of system database to ignore.
+	 */
+	public abstract Set<String> getBuiltinDatabases();

Review comment:
       I think we don't need to move these methods into the base class and `JdbcCatalog`, because they are not needed in `getPrimaryKey`.

##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/constraints/Constraint.java
##########
@@ -61,6 +61,6 @@
 	 */
 	enum ConstraintType {
 		PRIMARY_KEY,
-		UNIQUE_KEY
+		UNIQUE

Review comment:
       Could you revert this? We can discuss it and change it when we support UNIQUE.

##########
File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/AbstractJdbcCatalog.java
##########
@@ -116,6 +122,36 @@ public String getBaseUrl() {
 		return baseUrl;
 	}
 
+	// ------ Postgres default objects that shouldn't be exposed to users ------
+
+	/**
+	 * Retrieve the list of system schemas to ignore.
+	 */
+	public abstract Set<String> getBuiltinSchemas();
+
+	/**
+	 * Retrieve the list of system database to ignore.
+	 */
+	public abstract Set<String> getBuiltinDatabases();
+
+	// ------ retrieve PK constraint ------
+
+	protected Map.Entry<String, List<String>> getPrimaryKey(DatabaseMetaData metaData, PostgresTablePath pgPath) throws SQLException {
+		ResultSet rs = metaData.getPrimaryKeys(null,  pgPath.getPgSchemaName(), pgPath.getPgTableName());
+		Map.Entry<String, List<String>> ret = null;
+		while (rs.next()) {
+			String schema = rs.getString("table_schem");
+			String columnName = rs.getString("column_name");
+			String pkName = rs.getString("pk_name");

Review comment:
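       Use upper case for the column labels, as in the `DatabaseMetaData#getPrimaryKeys` Javadoc (`TABLE_SCHEM`, `COLUMN_NAME`, `PK_NAME`)?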

##########
File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/AbstractJdbcCatalog.java
##########
@@ -116,6 +122,36 @@ public String getBaseUrl() {
 		return baseUrl;
 	}
 
+	// ------ Postgres default objects that shouldn't be exposed to users ------
+
+	/**
+	 * Retrieve the list of system schemas to ignore.
+	 */
+	public abstract Set<String> getBuiltinSchemas();
+
+	/**
+	 * Retrieve the list of system database to ignore.
+	 */
+	public abstract Set<String> getBuiltinDatabases();
+
+	// ------ retrieve PK constraint ------
+
+	protected Map.Entry<String, List<String>> getPrimaryKey(DatabaseMetaData metaData, PostgresTablePath pgPath) throws SQLException {
+		ResultSet rs = metaData.getPrimaryKeys(null,  pgPath.getPgSchemaName(), pgPath.getPgTableName());
+		Map.Entry<String, List<String>> ret = null;
+		while (rs.next()) {
+			String schema = rs.getString("table_schem");
+			String columnName = rs.getString("column_name");
+			String pkName = rs.getString("pk_name");
+			if (!getBuiltinSchemas().contains(schema)) {
+				if (ret == null) {
+					ret = new AbstractMap.SimpleEntry<>(pkName, new ArrayList<>());
+				}
+				ret.getValue().add(columnName);

Review comment:
       According to the Javadoc of `java.sql.DatabaseMetaData#getPrimaryKeys`, the returned primary key columns are ordered by COLUMN_NAME, not KEY_SEQ. We may need to re-sort them by KEY_SEQ. I know that Postgres returns them ordered by KEY_SEQ, but not all dialects do.
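
A minimal sketch of the KEY_SEQ re-sort, assuming plain JDBC and no Flink types. The class `PrimaryKeyColumns` and its methods `orderByKeySeq` and `readPrimaryKeyColumns` are hypothetical names used only for illustration; they are not part of the PR. The idea is to collect each row's `COLUMN_NAME` under its `KEY_SEQ` and let a sorted map restore the key order, whatever order the driver returns the rows in.

```java
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical helper for illustration; not part of the PR. */
public class PrimaryKeyColumns {

    /** Returns the column names sorted by their KEY_SEQ position. */
    public static List<String> orderByKeySeq(Map<Integer, String> columnsByKeySeq) {
        // TreeMap iterates in ascending key order, so values() comes out in KEY_SEQ order.
        return new ArrayList<>(new TreeMap<>(columnsByKeySeq).values());
    }

    /** Reads the primary key columns of the given table, ordered by KEY_SEQ. */
    public static List<String> readPrimaryKeyColumns(
            DatabaseMetaData metaData, String schema, String table) throws SQLException {
        Map<Integer, String> columnsByKeySeq = new HashMap<>();
        try (ResultSet rs = metaData.getPrimaryKeys(null, schema, table)) {
            while (rs.next()) {
                // KEY_SEQ is the position of the column within the primary key.
                columnsByKeySeq.put(rs.getInt("KEY_SEQ"), rs.getString("COLUMN_NAME"));
            }
        }
        return orderByKeySeq(columnsByKeySeq);
    }

    public static void main(String[] args) {
        // Rows arriving ordered by COLUMN_NAME rather than KEY_SEQ.
        Map<Integer, String> rows = new HashMap<>();
        rows.put(2, "name");
        rows.put(1, "id");
        System.out.println(orderByKeySeq(rows)); // prints [id, name]
    }
}
```

Taking `schema` and `table` as plain strings keeps the helper free of Postgres-specific types, and the ordered list could then be wrapped in whatever constraint type the catalog exposes.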

##########
File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/AbstractJdbcCatalog.java
##########
@@ -116,6 +122,36 @@ public String getBaseUrl() {
 		return baseUrl;
 	}
 
+	// ------ Postgres default objects that shouldn't be exposed to users ------
+
+	/**
+	 * Retrieve the list of system schemas to ignore.
+	 */
+	public abstract Set<String> getBuiltinSchemas();
+
+	/**
+	 * Retrieve the list of system database to ignore.
+	 */
+	public abstract Set<String> getBuiltinDatabases();
+
+	// ------ retrieve PK constraint ------
+
+	protected Map.Entry<String, List<String>> getPrimaryKey(DatabaseMetaData metaData, PostgresTablePath pgPath) throws SQLException {
+		ResultSet rs = metaData.getPrimaryKeys(null,  pgPath.getPgSchemaName(), pgPath.getPgTableName());
+		Map.Entry<String, List<String>> ret = null;
+		while (rs.next()) {
+			String schema = rs.getString("table_schem");
+			String columnName = rs.getString("column_name");
+			String pkName = rs.getString("pk_name");
+			if (!getBuiltinSchemas().contains(schema)) {

Review comment:
       There is no need to call `getBuiltinSchemas().contains(..)`: the `schema` cannot be one of the builtin schemas, because it must be the schema of the user's table.



