You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2020/08/28 22:40:11 UTC

[GitHub] [iceberg] rdblue commented on a change in pull request #1393: Flink: Support creating table and altering table in Flink SQL

rdblue commented on a change in pull request #1393:
URL: https://github.com/apache/iceberg/pull/1393#discussion_r479562811



##########
File path: flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
##########
@@ -277,20 +287,29 @@ public void alterDatabase(String name, CatalogDatabase newDatabase, boolean igno
   }
 
   @Override
-  public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistException, CatalogException {
-    try {
-      Table table = icebergCatalog.loadTable(toIdentifier(tablePath));
-      TableSchema tableSchema = FlinkSchemaUtil.toSchema(FlinkSchemaUtil.convert(table.schema()));
+  public CatalogTable getTable(ObjectPath tablePath) throws TableNotExistException, CatalogException {
+    Table table = getIcebergTable(tablePath);
+    return toCatalogTable(table);
+  }
 
-      // NOTE: We can not create a IcebergCatalogTable, because Flink optimizer may use CatalogTableImpl to copy a new
-      // catalog table.
-      // Let's re-loading table from Iceberg catalog when creating source/sink operators.
-      return new CatalogTableImpl(tableSchema, table.properties(), null);
+  private Table getIcebergTable(ObjectPath tablePath) throws TableNotExistException {
+    try {
+      return icebergCatalog.loadTable(toIdentifier(tablePath));
     } catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
       throw new TableNotExistException(getName(), tablePath, e);
     }
   }
 
+  private CatalogTable toCatalogTable(Table table) {
+    TableSchema schema = FlinkSchemaUtil.toSchema(FlinkSchemaUtil.convert(table.schema()));
+    List<String> partitionKeys = toPartitionKeys(table.spec(), table.schema());
+
+    // NOTE: We can not create a IcebergCatalogTable, because Flink optimizer may use CatalogTableImpl to copy a new
+    // catalog table.
+    // Let's re-loading table from Iceberg catalog when creating source/sink operators.
+    return new CatalogTableImpl(schema, partitionKeys, table.properties(), null);

Review comment:
       What is null? Could you add a comment?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org