Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/04/25 09:33:48 UTC

[GitHub] [flink] luoyuxia commented on a diff in pull request #19558: [FLINK-27242][table] Support RENAME PARTITION statement for partitioned table

luoyuxia commented on code in PR #19558:
URL: https://github.com/apache/flink/pull/19558#discussion_r857413479


##########
flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java:
##########
@@ -338,6 +338,21 @@ void testAlterTableCompact() {
                 .fails("(?s).*Encountered \"\\)\" at line 1, column 26.\n.*");
     }
 
+    @Test
+    public void testAlterPartitionRename() {
+        sql("alter table tbl partition (p=1) rename to partition (p=2)")
+                .ok(
+                        "ALTER TABLE `TBL` PARTITION (`P` = 1)\n"
+                                + "RENAME TO\n"
+                                + "PARTITION (`P` = 2)");
+
+        sql("alter table tbl rename to partition (p=2)")

Review Comment:
   This SQL statement can't be parsed.



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java:
##########
@@ -543,6 +550,65 @@ private Operation convertAlterTable(SqlAlterTable sqlAlterTable) {
         }
     }
 
+    private Operation convertPartitionRename(
+            ObjectIdentifier tableIdentifier,
+            ResolvedCatalogTable resolvedTable,
+            SqlAlterPartitionRename partitionRename) {
+        List<String> partitionKeys = resolvedTable.getPartitionKeys();
+        LinkedHashMap<String, String> oldPartKVs = partitionRename.getPartitionKVs();
+        LinkedHashMap<String, String> newPartKVs = partitionRename.getNewPartitionKVs();
+        // validate the partition spec
+        validatePartitionSpec(tableIdentifier, partitionKeys, oldPartKVs);
+        validatePartitionSpec(tableIdentifier, partitionKeys, newPartKVs);
+
+        CatalogPartitionSpec oldPartitionSpec = new CatalogPartitionSpec(oldPartKVs);
+        CatalogPartitionSpec newPartitionSpec = new CatalogPartitionSpec(newPartKVs);
+        CatalogPartition catalogPartition =
+                catalogManager
+                        .getPartition(tableIdentifier, oldPartitionSpec)
+                        .orElseThrow(
+                                () ->
+                                        new ValidationException(
+                                                String.format(
+                                                        "Partition %s of table %s doesn't exist.",
+                                                        oldPartitionSpec.getPartitionSpec(),
+                                                        tableIdentifier)));
+        return new PartitionRenameOperation(
+                tableIdentifier, oldPartitionSpec, newPartitionSpec, catalogPartition.copy());

Review Comment:
   Why `catalogPartition.copy()`? Do we need to copy `catalogPartition`?



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java:
##########
@@ -986,6 +987,20 @@ public TableResultInternal executeInternal(Operation operation) {
                     for (CatalogPartitionSpec spec : dropPartitionsOperation.getPartitionSpecs()) {
                         catalog.dropPartition(tablePath, spec, ifExists);
                     }
+                } else if (alterTableOperation instanceof PartitionRenameOperation) {
+                    PartitionRenameOperation partitionRenameOperation =
+                            (PartitionRenameOperation) alterTableOperation;
+                    ObjectPath tablePath =
+                            partitionRenameOperation.getTableIdentifier().toObjectPath();
+                    // drop old partition first, require old partition exist
+                    catalog.dropPartition(

Review Comment:
   For `PartitionRenameOperation`, I suggest letting the catalog itself handle the rename instead of always doing drop partition + add partition.
   For example, Hive provides a renamePartition API. Although it also drops the old partition first and then adds the new one, it does more, such as updating the added partition to refer to the old partition's location.
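
   To make the suggestion concrete, here is a rough, self-contained sketch. All types in it (`PartitionCatalog`, `InMemoryCatalog`, `NativeRenameCatalog`, `Partition`) are hypothetical stand-ins made up for illustration; they are not Flink's `Catalog` API or Hive's client. The idea is that the interface offers a `renamePartition` hook whose default falls back to drop + add, while a catalog with native support overrides it.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-ins for illustration only; not Flink's Catalog API.
final class RenamePartitionSketch {

    /** Minimal partition record that only carries a storage location. */
    static final class Partition {
        final String location;

        Partition(String location) {
            this.location = location;
        }
    }

    /** Hypothetical catalog interface with a rename hook that implementations may override. */
    interface PartitionCatalog {
        Partition getPartition(Map<String, String> spec);

        void createPartition(Map<String, String> spec, Partition partition);

        void dropPartition(Map<String, String> spec);

        /** Generic fallback: drop the old partition, then re-add it under the new spec. */
        default void renamePartition(Map<String, String> oldSpec, Map<String, String> newSpec) {
            Partition old = getPartition(oldSpec);
            if (old == null) {
                throw new IllegalArgumentException("Partition " + oldSpec + " doesn't exist.");
            }
            dropPartition(oldSpec);
            createPartition(newSpec, old);
        }
    }

    /** Catalog without a native rename; it inherits the drop + add fallback. */
    static class InMemoryCatalog implements PartitionCatalog {
        protected final Map<Map<String, String>, Partition> partitions = new HashMap<>();

        @Override
        public Partition getPartition(Map<String, String> spec) {
            return partitions.get(spec);
        }

        @Override
        public void createPartition(Map<String, String> spec, Partition partition) {
            partitions.put(spec, partition);
        }

        @Override
        public void dropPartition(Map<String, String> spec) {
            partitions.remove(spec);
        }
    }

    /** Catalog with its own rename path, standing in for a backend that renames natively. */
    static class NativeRenameCatalog extends InMemoryCatalog {
        int nativeRenames = 0;

        @Override
        public void renamePartition(Map<String, String> oldSpec, Map<String, String> newSpec) {
            Partition old = partitions.remove(oldSpec);
            if (old == null) {
                throw new IllegalArgumentException("Partition " + oldSpec + " doesn't exist.");
            }
            // Re-register the same partition object, so its location is carried over.
            partitions.put(newSpec, old);
            nativeRenames++;
        }
    }

    public static void main(String[] args) {
        Map<String, String> oldSpec = Collections.singletonMap("p", "1");
        Map<String, String> newSpec = Collections.singletonMap("p", "2");

        NativeRenameCatalog catalog = new NativeRenameCatalog();
        catalog.createPartition(oldSpec, new Partition("/warehouse/tbl/p=1"));

        // The caller only delegates; it no longer hard-codes drop + add.
        catalog.renamePartition(oldSpec, newSpec);

        System.out.println(catalog.getPartition(newSpec).location);
    }
}
```

   With a hook like this, the branch in `executeInternal` would reduce to a single call on the catalog, and each catalog could decide how much extra work a rename involves.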


