You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2021/05/10 00:55:18 UTC

[GitHub] [iceberg] jackye1995 commented on a change in pull request #2410: Flink: Support SQL primary key

jackye1995 commented on a change in pull request #2410:
URL: https://github.com/apache/iceberg/pull/2410#discussion_r628987753



##########
File path: flink/src/main/java/org/apache/iceberg/flink/FlinkSchemaUtil.java
##########
@@ -61,7 +65,22 @@ public static Schema convert(TableSchema schema) {
     RowType root = (RowType) schemaType;
     Type converted = root.accept(new FlinkTypeToType(root));
 
-    return new Schema(converted.asStructType().fields());
+    Schema iSchema = new Schema(converted.asStructType().fields());
+    return freshIdentifierFieldIds(iSchema, schema);
+  }
+
+  private static Schema freshIdentifierFieldIds(Schema iSchema, TableSchema schema) {
+    // Locate the identifier field id list.
+    Set<Integer> identifierFieldIds = Sets.newHashSet();
+    if (schema.getPrimaryKey().isPresent()) {
+      for (String column : schema.getPrimaryKey().get().getColumns()) {
+        Types.NestedField field = iSchema.findField(column);
+        Preconditions.checkNotNull(field, "Column %s does not found in schema %s", column, iSchema);

Review comment:
       nit: error message format should follow `Cannot ...`, such as `Cannot find field ID for primary key column %s in schema %s`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org