Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/05/25 06:31:48 UTC

[GitHub] [flink] wuchong commented on a change in pull request #12275: [FLINK-16021][table-common] DescriptorProperties.putTableSchema does …

wuchong commented on a change in pull request #12275:
URL: https://github.com/apache/flink/pull/12275#discussion_r429746785



##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/DescriptorProperties.java
##########
@@ -241,6 +244,19 @@ public void putTableSchema(String key, TableSchema schema) {
 				Arrays.asList(WATERMARK_ROWTIME, WATERMARK_STRATEGY_EXPR, WATERMARK_STRATEGY_DATA_TYPE),
 				watermarkValues);
 		}
+
+		if (schema.getPrimaryKey().isPresent()) {
+			final UniqueConstraint uniqueConstraint = schema.getPrimaryKey().get();
+			final List<List<String>> uniqueConstraintValues = new ArrayList<>();
+			uniqueConstraintValues.add(Arrays.asList(
+					uniqueConstraint.getName(),
+					uniqueConstraint.getType().name(),
+					String.join(",", uniqueConstraint.getColumns())));
+			putIndexedFixedProperties(
+					key + '.' + CONSTRAINT_UNIQUE,
+					Arrays.asList(NAME, TYPE, CONSTRAINT_UNIQUE_COLUMNS),
+					uniqueConstraintValues);
+		}

Review comment:
       Since we only support primary keys for now, I think we can have dedicated primary key properties, so that we don't need to handle the index. For example:
   
   ```java
   public static final String PRIMARY_KEY_NAME = "primary-key.name";
   public static final String PRIMARY_KEY_COLUMNS = "primary-key.columns";
   
   schema.getPrimaryKey().ifPresent(pk -> {
       putString(key + "." + PRIMARY_KEY_NAME, pk.getName());
       putString(key + "." + PRIMARY_KEY_COLUMNS, String.join(",", pk.getColumns()));
   });
   ```
   
   This is also helpful for users who write yaml: 
   
   ```
   tables:
     - name: TableNumber1
       type: source-table
       schema:
         primary-key:
           name: constraint1
           columns: f1, f2
   ```
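
To make the suggestion above concrete, here is a minimal round-trip sketch. It assumes a plain `Map<String, String>` in place of `DescriptorProperties`, and the `PrimaryKey` class and the `getPrimaryKey` helper are hypothetical stand-ins that are not part of the PR; only the two property keys come from the comment.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

public class PrimaryKeyPropertiesSketch {
    // Property keys as proposed in the review comment.
    static final String PRIMARY_KEY_NAME = "primary-key.name";
    static final String PRIMARY_KEY_COLUMNS = "primary-key.columns";

    // Hypothetical stand-in for Flink's UniqueConstraint (assumption, not the real class).
    static final class PrimaryKey {
        final String name;
        final List<String> columns;

        PrimaryKey(String name, List<String> columns) {
            this.name = name;
            this.columns = columns;
        }
    }

    // Writes the two flat properties only when a primary key is present; no index handling.
    static void putPrimaryKey(Map<String, String> props, String key, Optional<PrimaryKey> pk) {
        pk.ifPresent(p -> {
            props.put(key + "." + PRIMARY_KEY_NAME, p.name);
            props.put(key + "." + PRIMARY_KEY_COLUMNS, String.join(",", p.columns));
        });
    }

    // Hypothetical reader: returns empty unless both properties exist.
    static Optional<PrimaryKey> getPrimaryKey(Map<String, String> props, String key) {
        String name = props.get(key + "." + PRIMARY_KEY_NAME);
        String columns = props.get(key + "." + PRIMARY_KEY_COLUMNS);
        if (name == null || columns == null) {
            return Optional.empty();
        }
        return Optional.of(new PrimaryKey(name, Arrays.asList(columns.split(","))));
    }

    public static void main(String[] args) {
        Map<String, String> props = new LinkedHashMap<>();
        PrimaryKey pk = new PrimaryKey("constraint1", Arrays.asList("f1", "f2"));
        putPrimaryKey(props, "schema", Optional.of(pk));
        System.out.println(props);
        System.out.println(getPrimaryKey(props, "schema").get().columns);
    }
}
```

With fixed keys, reading the constraint back is two lookups, and the flat layout maps directly onto the nested YAML form.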

##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/DescriptorProperties.java
##########
@@ -610,7 +626,9 @@ public DataType getDataType(String key) {
 	public Optional<TableSchema> getOptionalTableSchema(String key) {
 		// filter for number of fields
 		final int fieldCount = properties.keySet().stream()
-			.filter((k) -> k.startsWith(key) && k.endsWith('.' + TABLE_SCHEMA_NAME))
+			.filter((k) -> k.startsWith(key)
+					// "key." is the prefix.
+					&& SCHEMA_COLUMN_NAME_SUFFIX.matcher(k.substring(key.length() + 1)).matches())

Review comment:
       We can just exclude the primary key properties; then we don't need the regex matching.
   
   ```
   .filter((k) -> k.startsWith(key) && !k.startsWith(key + "." + PRIMARY_KEY) && k.endsWith('.' + NAME))
   ```
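
As a rough illustration of why the exclusion matters, the sketch below counts field-name keys over a plain collection of property keys. The `NAME` and `PRIMARY_KEY` values and the `countFieldNames` helper are assumptions made for this example, not the actual constants or methods in `DescriptorProperties`.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

public class FieldCountSketch {
    // Assumed constant values, chosen to match the property layout discussed above.
    static final String NAME = "name";
    static final String PRIMARY_KEY = "primary-key";

    // Counts "<key>.<index>.name" entries while skipping "<key>.primary-key.name".
    static long countFieldNames(Collection<String> keys, String key) {
        return keys.stream()
            .filter(k -> k.startsWith(key)
                && !k.startsWith(key + "." + PRIMARY_KEY)
                && k.endsWith("." + NAME))
            .count();
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList(
            "schema.0.name",
            "schema.0.data-type",
            "schema.1.name",
            "schema.1.data-type",
            "schema.primary-key.name",
            "schema.primary-key.columns");
        // Without the exclusion, "schema.primary-key.name" would be counted as a third field.
        System.out.println(countFieldNames(keys, "schema"));
    }
}
```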



