Posted to commits@flink.apache.org by lz...@apache.org on 2022/01/04 10:07:15 UTC

[flink] 02/02: [FLINK-25176][table] Introduce "ALTER TABLE ... COMPACT" syntax

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3849f1194904374f1b692688d1d3f711b3be9a27
Author: Jane Chan <qi...@gmail.com>
AuthorDate: Thu Dec 30 17:13:41 2021 +0800

    [FLINK-25176][table] Introduce "ALTER TABLE ... COMPACT" syntax
    
    This closes #18236
---
 .../src/main/codegen/data/Parser.tdd               |   3 +
 .../src/main/codegen/includes/parserImpls.ftl      |  12 ++
 .../flink/sql/parser/ddl/SqlAlterTableCompact.java |  54 +++++++++
 .../flink/sql/parser/FlinkSqlParserImplTest.java   |  15 +++
 .../table/api/internal/TableEnvironmentImpl.java   |   3 +
 .../flink/table/catalog/ManagedTableListener.java  |   4 +-
 .../operations/ddl/AlterTableCompactOperation.java |  58 ++++++++++
 .../operations/SqlToOperationConverter.java        |  58 +++++++++-
 .../operations/SqlToOperationConverterTest.java    | 121 +++++++++++++++++++--
 .../flink/table/api/TableEnvironmentTest.scala     |  37 +++++++
 10 files changed, 351 insertions(+), 14 deletions(-)

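Before the diff, a quick orientation: the new statement compacts a managed table, optionally restricted to a single partition. Below is a minimal Java sketch of the intended usage, mirroring the Scala tests at the end of this commit; the class name and the streaming-mode settings are illustrative assumptions, and execution is still a TODO in TableEnvironmentImpl, so at this commit the statement only validates and returns OK.

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public final class ManagedTableCompactExample {
        public static void main(String[] args) {
            TableEnvironment tableEnv =
                    TableEnvironment.create(
                            EnvironmentSettings.newInstance().inStreamingMode().build());
            // Omitting the 'connector' option makes this a managed table,
            // the only kind of table ALTER TABLE ... COMPACT accepts.
            tableEnv.executeSql("CREATE TABLE MyTable (a BIGINT, b INT, c VARCHAR)");
            // Compact the whole table; a partitioned table could instead use
            // e.g. ALTER TABLE MyTable PARTITION (b = 1) COMPACT.
            tableEnv.executeSql("ALTER TABLE MyTable COMPACT");
        }
    }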
diff --git a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
index 1fb8518..e400d3f 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
+++ b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
@@ -31,6 +31,7 @@
     "org.apache.flink.sql.parser.ddl.SqlAlterFunction"
     "org.apache.flink.sql.parser.ddl.SqlAlterTable"
     "org.apache.flink.sql.parser.ddl.SqlAlterTableAddConstraint"
+    "org.apache.flink.sql.parser.ddl.SqlAlterTableCompact"
     "org.apache.flink.sql.parser.ddl.SqlAlterTableDropConstraint"
     "org.apache.flink.sql.parser.ddl.SqlAlterTableOptions"
     "org.apache.flink.sql.parser.ddl.SqlAlterTableRename"
@@ -110,6 +111,7 @@
     "CATALOGS"
     "CHANGELOG_MODE"
     "COMMENT"
+    "COMPACT"
     "COLUMNS"
     "DATABASES"
     "ENFORCED"
@@ -184,6 +186,7 @@
     "COMMAND_FUNCTION"
     "COMMAND_FUNCTION_CODE"
     "COMMITTED"
+    "COMPACT"
     "CONDITIONAL"
     "CONDITION_NUMBER"
     "CONNECTION"
diff --git a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
index 86537a7..f68c50f 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
+++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
@@ -508,6 +508,7 @@ SqlAlterTable SqlAlterTable() :
     SqlIdentifier newTableIdentifier = null;
     SqlNodeList propertyList = SqlNodeList.EMPTY;
     SqlNodeList propertyKeyList = SqlNodeList.EMPTY;
+    SqlNodeList partitionSpec = null;
     SqlIdentifier constraintName;
     SqlTableConstraint constraint;
 }
@@ -556,6 +557,17 @@ SqlAlterTable SqlAlterTable() :
                 constraintName,
                 startPos.plus(getPos()));
         }
+    |
+        [
+            <PARTITION>
+            {   partitionSpec = new SqlNodeList(getPos());
+                PartitionSpecCommaList(partitionSpec);
+            }
+        ]
+        <COMPACT>
+        {
+            return new SqlAlterTableCompact(startPos.plus(getPos()), tableIdentifier, partitionSpec);
+        }
     )
 }
 
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableCompact.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableCompact.java
new file mode 100644
index 0000000..247f4f9
--- /dev/null
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableCompact.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.ddl;
+
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.util.ImmutableNullableList;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+
+/** ALTER TABLE [[catalogName.] databaseName.]tableName [PARTITION partition_spec] COMPACT. */
+public class SqlAlterTableCompact extends SqlAlterTable {
+
+    public SqlAlterTableCompact(
+            SqlParserPos pos, SqlIdentifier tableName, @Nullable SqlNodeList partitionSpec) {
+        super(pos, tableName, partitionSpec);
+    }
+
+    public SqlAlterTableCompact(SqlParserPos pos, SqlIdentifier tableName) {
+        super(pos, tableName);
+    }
+
+    @Override
+    public List<SqlNode> getOperandList() {
+        return ImmutableNullableList.of(tableIdentifier, partitionSpec);
+    }
+
+    @Override
+    public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+        super.unparse(writer, leftPrec, rightPrec);
+        writer.keyword("COMPACT");
+    }
+}
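
The grammar branch above produces this node. A hedged sketch of invoking the generated parser directly; the plain Calcite SqlParser config here is an assumption for illustration, since Flink's planner wires up its own lexer and conformance (see the parser test below for the canonical expectations):

    import org.apache.calcite.sql.SqlNode;
    import org.apache.calcite.sql.parser.SqlParser;
    import org.apache.flink.sql.parser.impl.FlinkSqlParserImpl;

    public final class ParseCompactSketch {
        public static void main(String[] args) throws Exception {
            SqlParser parser =
                    SqlParser.create(
                            "alter table t1 partition (x = 'y') compact",
                            SqlParser.config().withParserFactory(FlinkSqlParserImpl.FACTORY));
            // Yields a SqlAlterTableCompact node; unparsing it appends the
            // COMPACT keyword after the table name and optional partition spec.
            SqlNode node = parser.parseStmt();
            System.out.println(node);
        }
    }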
diff --git a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
index a6bbbb8..cc624bb 100644
--- a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
+++ b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
@@ -288,6 +288,21 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
     }
 
     @Test
+    public void testAlterTableCompact() {
+        sql("alter table t1 compact").ok("ALTER TABLE `T1` COMPACT");
+
+        sql("alter table db1.t1 compact").ok("ALTER TABLE `DB1`.`T1` COMPACT");
+
+        sql("alter table cat1.db1.t1 compact").ok("ALTER TABLE `CAT1`.`DB1`.`T1` COMPACT");
+
+        sql("alter table t1 partition(x='y',m='n') compact")
+                .ok("ALTER TABLE `T1` PARTITION (`X` = 'y', `M` = 'n') COMPACT");
+
+        sql("alter table t1 partition(^)^ compact")
+                .fails("(?s).*Encountered \"\\)\" at line 1, column 26.\n.*");
+    }
+
+    @Test
     public void testCreateTable() {
         final String sql =
                 "CREATE TABLE tbl1 (\n"
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index e163cd7..51550d8 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -112,6 +112,7 @@ import org.apache.flink.table.operations.ddl.AlterCatalogFunctionOperation;
 import org.apache.flink.table.operations.ddl.AlterDatabaseOperation;
 import org.apache.flink.table.operations.ddl.AlterPartitionPropertiesOperation;
 import org.apache.flink.table.operations.ddl.AlterTableAddConstraintOperation;
+import org.apache.flink.table.operations.ddl.AlterTableCompactOperation;
 import org.apache.flink.table.operations.ddl.AlterTableDropConstraintOperation;
 import org.apache.flink.table.operations.ddl.AlterTableOperation;
 import org.apache.flink.table.operations.ddl.AlterTableOptionsOperation;
@@ -998,6 +999,8 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
                     for (CatalogPartitionSpec spec : dropPartitionsOperation.getPartitionSpecs()) {
                         catalog.dropPartition(tablePath, spec, ifExists);
                     }
+                } else if (alterTableOperation instanceof AlterTableCompactOperation) {
+                    // TODO: FLINK-25176 work with managed table
                 }
                 return TableResultImpl.TABLE_RESULT_OK;
             } catch (TableAlreadyExistException | TableNotExistException e) {
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ManagedTableListener.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ManagedTableListener.java
index 211dce7..49779de 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ManagedTableListener.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ManagedTableListener.java
@@ -79,7 +79,9 @@ public class ManagedTableListener {
         }
     }
 
-    private boolean isManagedTable(@Nullable Catalog catalog, ResolvedCatalogBaseTable<?> table) {
+    /** Checks whether a resolved catalog table is a Flink managed table. */
+    public static boolean isManagedTable(
+            @Nullable Catalog catalog, ResolvedCatalogBaseTable<?> table) {
         if (catalog == null || !catalog.supportsManagedTable()) {
             // the catalog does not support managed tables
             return false;
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableCompactOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableCompactOperation.java
new file mode 100644
index 0000000..14af950
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableCompactOperation.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations.ddl;
+
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.operations.OperationUtils;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/** Operation to describe an "ALTER TABLE [PARTITION partition_spec] COMPACT" statement. */
+public class AlterTableCompactOperation extends AlterTableOperation {
+
+    private final CatalogPartitionSpec partitionSpec;
+
+    public AlterTableCompactOperation(
+            ObjectIdentifier tableIdentifier, @Nullable CatalogPartitionSpec partitionSpec) {
+        super(tableIdentifier);
+        this.partitionSpec = partitionSpec;
+    }
+
+    public Map<String, String> getPartitionSpec() {
+        return partitionSpec == null
+                ? Collections.emptyMap()
+                : new LinkedHashMap<>(partitionSpec.getPartitionSpec());
+    }
+
+    @Override
+    public String asSummaryString() {
+        String spec =
+                partitionSpec == null
+                        ? ""
+                        : String.format(
+                                "PARTITION (%s) ",
+                                OperationUtils.formatPartitionSpec(partitionSpec));
+        return String.format("ALTER TABLE %s %sCOMPACT", tableIdentifier.asSummaryString(), spec);
+    }
+}
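
A small sketch, assuming the class as just added, of the summary string it produces; the expected output is taken from SqlToOperationConverterTest later in this commit:

    import org.apache.flink.table.catalog.CatalogPartitionSpec;
    import org.apache.flink.table.catalog.ObjectIdentifier;
    import org.apache.flink.table.operations.ddl.AlterTableCompactOperation;

    import java.util.LinkedHashMap;
    import java.util.Map;

    public final class CompactSummarySketch {
        public static void main(String[] args) {
            Map<String, String> partitionKVs = new LinkedHashMap<>();
            partitionKVs.put("b", "1");
            partitionKVs.put("c", "2");
            AlterTableCompactOperation op =
                    new AlterTableCompactOperation(
                            ObjectIdentifier.of("cat1", "db1", "tb1"),
                            new CatalogPartitionSpec(partitionKVs));
            // Prints: ALTER TABLE cat1.db1.tb1 PARTITION (b=1, c=2) COMPACT
            System.out.println(op.asSummaryString());
        }
    }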
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
index a946308..28837d2 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
@@ -25,6 +25,7 @@ import org.apache.flink.sql.parser.ddl.SqlAlterDatabase;
 import org.apache.flink.sql.parser.ddl.SqlAlterFunction;
 import org.apache.flink.sql.parser.ddl.SqlAlterTable;
 import org.apache.flink.sql.parser.ddl.SqlAlterTableAddConstraint;
+import org.apache.flink.sql.parser.ddl.SqlAlterTableCompact;
 import org.apache.flink.sql.parser.ddl.SqlAlterTableDropConstraint;
 import org.apache.flink.sql.parser.ddl.SqlAlterTableOptions;
 import org.apache.flink.sql.parser.ddl.SqlAlterTableRename;
@@ -91,10 +92,13 @@ import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.CatalogView;
 import org.apache.flink.table.catalog.CatalogViewImpl;
 import org.apache.flink.table.catalog.FunctionLanguage;
+import org.apache.flink.table.catalog.ManagedTableListener;
 import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
 import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.catalog.UnresolvedIdentifier;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.factories.FactoryUtil;
 import org.apache.flink.table.operations.BeginStatementSetOperation;
 import org.apache.flink.table.operations.CatalogSinkModifyOperation;
 import org.apache.flink.table.operations.DescribeTableOperation;
@@ -129,6 +133,7 @@ import org.apache.flink.table.operations.ddl.AlterCatalogFunctionOperation;
 import org.apache.flink.table.operations.ddl.AlterDatabaseOperation;
 import org.apache.flink.table.operations.ddl.AlterPartitionPropertiesOperation;
 import org.apache.flink.table.operations.ddl.AlterTableAddConstraintOperation;
+import org.apache.flink.table.operations.ddl.AlterTableCompactOperation;
 import org.apache.flink.table.operations.ddl.AlterTableDropConstraintOperation;
 import org.apache.flink.table.operations.ddl.AlterTableOptionsOperation;
 import org.apache.flink.table.operations.ddl.AlterTableRenameOperation;
@@ -169,6 +174,7 @@ import org.apache.calcite.sql.parser.SqlParser;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -484,6 +490,11 @@ public class SqlToOperationConverter {
                 specs.add(new CatalogPartitionSpec(dropPartitions.getPartitionKVs(i)));
             }
             return new DropPartitionsOperation(tableIdentifier, dropPartitions.ifExists(), specs);
+        } else if (sqlAlterTable instanceof SqlAlterTableCompact) {
+            ResolvedCatalogTable resolvedCatalogTable =
+                    (ResolvedCatalogTable) optionalCatalogTable.get().getResolvedTable();
+            return convertAlterTableCompact(
+                    tableIdentifier, resolvedCatalogTable, (SqlAlterTableCompact) sqlAlterTable);
         } else {
             throw new ValidationException(
                     String.format(
@@ -531,16 +542,57 @@ public class SqlToOperationConverter {
             CatalogTable oldTable,
             SqlAlterTableReset alterTableReset) {
         Map<String, String> newOptions = new HashMap<>(oldTable.getOptions());
-        // reset empty key is not allowed
+        // resetting an empty key or the 'connector' key is not allowed
         Set<String> resetKeys = alterTableReset.getResetKeys();
-        if (resetKeys.isEmpty()) {
-            throw new ValidationException("ALTER TABLE RESET does not support empty key");
+        if (resetKeys.isEmpty() || resetKeys.contains(FactoryUtil.CONNECTOR.key())) {
+            String exMsg =
+                    resetKeys.isEmpty()
+                            ? "ALTER TABLE RESET does not support empty key"
+                            : "ALTER TABLE RESET does not support changing 'connector'";
+            throw new ValidationException(exMsg);
         }
         // reset table option keys
         resetKeys.forEach(newOptions::remove);
         return new AlterTableOptionsOperation(tableIdentifier, oldTable.copy(newOptions));
     }
 
+    private Operation convertAlterTableCompact(
+            ObjectIdentifier tableIdentifier,
+            ResolvedCatalogTable resolvedCatalogTable,
+            SqlAlterTableCompact alterTableCompact) {
+        Catalog catalog = catalogManager.getCatalog(tableIdentifier.getCatalogName()).orElse(null);
+        if (ManagedTableListener.isManagedTable(catalog, resolvedCatalogTable)) {
+            LinkedHashMap<String, String> partitionKVs = alterTableCompact.getPartitionKVs();
+            CatalogPartitionSpec partitionSpec = null;
+            if (partitionKVs != null) {
+                List<String> orderedPartitionKeys = resolvedCatalogTable.getPartitionKeys();
+                Set<String> validPartitionKeySet = new HashSet<>(orderedPartitionKeys);
+                String exMsg =
+                        orderedPartitionKeys.isEmpty()
+                                ? String.format("Table %s is not partitioned.", tableIdentifier)
+                                : String.format(
+                                        "Available ordered partition columns: [%s]",
+                                        orderedPartitionKeys.stream()
+                                                .collect(Collectors.joining("', '", "'", "'")));
+                partitionKVs.forEach(
+                        (partitionKey, partitionValue) -> {
+                            if (!validPartitionKeySet.contains(partitionKey)) {
+                                throw new ValidationException(
+                                        String.format(
+                                                "Partition column '%s' not defined in the table schema. %s",
+                                                partitionKey, exMsg));
+                            }
+                        });
+                partitionSpec = new CatalogPartitionSpec(partitionKVs);
+            }
+            return new AlterTableCompactOperation(tableIdentifier, partitionSpec);
+        }
+        throw new ValidationException(
+                String.format(
+                        "ALTER TABLE COMPACT operation is not supported for non-managed table %s",
+                        tableIdentifier));
+    }
+
     /** Convert CREATE FUNCTION statement. */
     private Operation convertCreateFunction(SqlCreateFunction sqlCreateFunction) {
         UnresolvedIdentifier unresolvedIdentifier =
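
To make the converter's managed-table check above concrete, a hedged sketch of the rejection path; the 'datagen' connector is an illustrative stand-in (the Scala test at the end of this commit uses 'COLLECTION'):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.table.api.ValidationException;

    public final class CompactValidationSketch {
        public static void main(String[] args) {
            TableEnvironment tableEnv =
                    TableEnvironment.create(
                            EnvironmentSettings.newInstance().inStreamingMode().build());
            // An explicit 'connector' option makes this a non-managed table.
            tableEnv.executeSql(
                    "CREATE TABLE T (a BIGINT) WITH ('connector' = 'datagen')");
            try {
                tableEnv.executeSql("ALTER TABLE T COMPACT");
            } catch (ValidationException e) {
                // ALTER TABLE COMPACT operation is not supported for
                // non-managed table `default_catalog`.`default_database`.`T`
                System.out.println(e.getMessage());
            }
        }
    }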
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
index 97d4ae3..9651cc6 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
@@ -66,6 +66,7 @@ import org.apache.flink.table.operations.command.SetOperation;
 import org.apache.flink.table.operations.command.ShowJarsOperation;
 import org.apache.flink.table.operations.ddl.AlterDatabaseOperation;
 import org.apache.flink.table.operations.ddl.AlterTableAddConstraintOperation;
+import org.apache.flink.table.operations.ddl.AlterTableCompactOperation;
 import org.apache.flink.table.operations.ddl.AlterTableDropConstraintOperation;
 import org.apache.flink.table.operations.ddl.AlterTableOptionsOperation;
 import org.apache.flink.table.operations.ddl.AlterTableRenameOperation;
@@ -1167,7 +1168,7 @@ public class SqlToOperationConverterTest {
 
     @Test
     public void testAlterTable() throws Exception {
-        prepareTable(false);
+        prepareNonManagedTable(false);
         final String[] renameTableSqls =
                 new String[] {
                     "alter table cat1.db1.tb1 rename to tb2",
@@ -1193,6 +1194,7 @@ public class SqlToOperationConverterTest {
                         "alter table cat1.db1.tb1 set ('k1' = 'v1', 'K2' = 'V2')",
                         SqlDialect.DEFAULT);
         Map<String, String> expectedOptions = new HashMap<>();
+        expectedOptions.put("connector", "dummy");
         expectedOptions.put("k", "v");
         expectedOptions.put("k1", "v1");
         expectedOptions.put("K2", "V2");
@@ -1201,7 +1203,15 @@ public class SqlToOperationConverterTest {
 
         // test alter table reset
         operation = parse("alter table cat1.db1.tb1 reset ('k')", SqlDialect.DEFAULT);
-        assertAlterTableOptions(operation, expectedIdentifier, Collections.emptyMap());
+        assertAlterTableOptions(
+                operation, expectedIdentifier, Collections.singletonMap("connector", "dummy"));
+        assertThatThrownBy(
+                        () ->
+                                parse(
+                                        "alter table cat1.db1.tb1 reset ('connector')",
+                                        SqlDialect.DEFAULT))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining("ALTER TABLE RESET does not support changing 'connector'");
 
         assertThatThrownBy(() -> parse("alter table cat1.db1.tb1 reset ()", SqlDialect.DEFAULT))
                 .isInstanceOf(ValidationException.class)
@@ -1210,7 +1220,7 @@ public class SqlToOperationConverterTest {
 
     @Test
     public void testAlterTableAddPkConstraint() throws Exception {
-        prepareTable(false);
+        prepareNonManagedTable(false);
         // Test alter add table constraint.
         Operation operation =
                 parse(
@@ -1236,7 +1246,7 @@ public class SqlToOperationConverterTest {
 
     @Test
     public void testAlterTableAddPkConstraintEnforced() throws Exception {
-        prepareTable(false);
+        prepareNonManagedTable(false);
         // Test alter table add enforced
         assertThatThrownBy(
                         () ->
@@ -1253,7 +1263,7 @@ public class SqlToOperationConverterTest {
 
     @Test
     public void testAlterTableAddUniqueConstraint() throws Exception {
-        prepareTable(false);
+        prepareNonManagedTable(false);
         // Test alter add table constraint.
         assertThatThrownBy(
                         () ->
@@ -1266,7 +1276,7 @@ public class SqlToOperationConverterTest {
 
     @Test
     public void testAlterTableAddUniqueConstraintEnforced() throws Exception {
-        prepareTable(false);
+        prepareNonManagedTable(false);
         // Test alter table add enforced
         assertThatThrownBy(
                         () ->
@@ -1279,7 +1289,7 @@ public class SqlToOperationConverterTest {
 
     @Test
     public void testAlterTableDropConstraint() throws Exception {
-        prepareTable(true);
+        prepareNonManagedTable(true);
         // Test alter table add enforced
         Operation operation = parse("alter table tb1 drop constraint ct1", SqlDialect.DEFAULT);
         assertThat(operation).isInstanceOf(AlterTableDropConstraintOperation.class);
@@ -1293,6 +1303,83 @@ public class SqlToOperationConverterTest {
     }
 
     @Test
+    public void testAlterTableCompactOnNonManagedTable() throws Exception {
+        prepareNonManagedTable(false);
+        assertThatThrownBy(() -> parse("alter table tb1 compact", SqlDialect.DEFAULT))
+                .isInstanceOf(ValidationException.class)
+                .hasMessage(
+                        "ALTER TABLE COMPACT operation is not supported for non-managed table `cat1`.`db1`.`tb1`");
+    }
+
+    @Test
+    public void testAlterTableCompact() throws Exception {
+        prepareManagedTable(false);
+        Operation operation = parse("alter table tb1 compact", SqlDialect.DEFAULT);
+        assertThat(operation).isInstanceOf(AlterTableCompactOperation.class);
+        AlterTableCompactOperation compactOperation = (AlterTableCompactOperation) operation;
+
+        assertThat(compactOperation.asSummaryString())
+                .isEqualTo("ALTER TABLE cat1.db1.tb1 COMPACT");
+
+        // specify partition on a non-partitioned table
+        assertThatThrownBy(
+                        () ->
+                                parse(
+                                        "alter table tb1 partition(dt = 'a') compact",
+                                        SqlDialect.DEFAULT))
+                .isInstanceOf(ValidationException.class)
+                .hasMessage(
+                        "Partition column 'dt' not defined in the table schema. Table `cat1`.`db1`.`tb1` is not partitioned.");
+
+        // alter a non-existent table
+        assertThatThrownBy(() -> parse("alter table tb2 compact", SqlDialect.DEFAULT))
+                .isInstanceOf(ValidationException.class)
+                .hasMessage("Table `cat1`.`db1`.`tb2` doesn't exist or is a temporary table.");
+    }
+
+    @Test
+    public void testAlterTableCompactPartition() throws Exception {
+        prepareManagedTable(true);
+
+        // compact partitioned table without partition_spec
+        assertThat(parse("alter table tb1 compact", SqlDialect.DEFAULT).asSummaryString())
+                .isEqualTo("ALTER TABLE cat1.db1.tb1 COMPACT");
+
+        // compact partitioned table without full partition_spec
+        assertThat(
+                        parse("alter table tb1 partition (b=1) compact", SqlDialect.DEFAULT)
+                                .asSummaryString())
+                .isEqualTo("ALTER TABLE cat1.db1.tb1 PARTITION (b=1) COMPACT");
+
+        assertThat(
+                        parse("alter table tb1 partition (c=2) compact", SqlDialect.DEFAULT)
+                                .asSummaryString())
+                .isEqualTo("ALTER TABLE cat1.db1.tb1 PARTITION (c=2) COMPACT");
+
+        // compact partitioned table with full partition_spec
+        assertThat(
+                        parse("alter table tb1 partition (b=1,c=2) compact", SqlDialect.DEFAULT)
+                                .asSummaryString())
+                .isEqualTo("ALTER TABLE cat1.db1.tb1 PARTITION (b=1, c=2) COMPACT");
+
+        // compact partitioned table with out-of-order partition_spec
+        assertThat(
+                        parse("alter table tb1 partition (c=2,b=1) compact", SqlDialect.DEFAULT)
+                                .asSummaryString())
+                .isEqualTo("ALTER TABLE cat1.db1.tb1 PARTITION (c=2, b=1) COMPACT");
+
+        // compact partitioned table with a non-existent partition_spec
+        assertThatThrownBy(
+                        () ->
+                                parse(
+                                        "alter table tb1 partition (dt = 'a') compact",
+                                        SqlDialect.DEFAULT))
+                .isInstanceOf(ValidationException.class)
+                .hasMessage(
+                        "Partition column 'dt' not defined in the table schema. Available ordered partition columns: ['b', 'c']");
+    }
+
+    @Test
     public void testCreateViewWithMatchRecognize() {
         Map<String, String> prop = new HashMap<>();
         prop.put("connector", "values");
@@ -1534,7 +1621,16 @@ public class SqlToOperationConverterTest {
         return SqlToOperationConverter.convert(planner, catalogManager, node).get();
     }
 
-    private void prepareTable(boolean hasConstraint) throws Exception {
+    private void prepareNonManagedTable(boolean hasConstraint) throws Exception {
+        prepareTable(false, false, hasConstraint);
+    }
+
+    private void prepareManagedTable(boolean hasPartition) throws Exception {
+        prepareTable(true, hasPartition, false);
+    }
+
+    private void prepareTable(boolean managedTable, boolean hasPartition, boolean hasConstraint)
+            throws Exception {
         Catalog catalog = new GenericInMemoryCatalog("default", "default");
         catalogManager.registerCatalog("cat1", catalog);
         catalog.createDatabase("db1", new CatalogDatabaseImpl(new HashMap<>(), null), true);
@@ -1543,14 +1639,19 @@ public class SqlToOperationConverterTest {
                         .column("a", DataTypes.STRING().notNull())
                         .column("b", DataTypes.BIGINT().notNull())
                         .column("c", DataTypes.BIGINT());
+        Map<String, String> options = new HashMap<>();
+        options.put("k", "v");
+        if (!managedTable) {
+            options.put("connector", "dummy");
+        }
         CatalogTable catalogTable =
                 CatalogTable.of(
                         hasConstraint
                                 ? builder.primaryKeyNamed("ct1", "a", "b").build()
                                 : builder.build(),
                         "tb1",
-                        Collections.emptyList(),
-                        Collections.singletonMap("k", "v"));
+                        hasPartition ? Arrays.asList("b", "c") : Collections.emptyList(),
+                        Collections.unmodifiableMap(options));
         catalogManager.setCurrentCatalog("cat1");
         catalogManager.setCurrentDatabase("db1");
         ObjectPath tablePath = new ObjectPath("db1", "tb1");
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
index 0ec319d..8ccac9c 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
@@ -280,6 +280,43 @@ class TableEnvironmentTest {
   }
 
   @Test
+  def testAlterTableCompactOnNonManagedTable(): Unit = {
+    val statement =
+      """
+        |CREATE TABLE MyTable (
+        |  a bigint,
+        |  b int,
+        |  c varchar
+        |) WITH (
+        |  'connector' = 'COLLECTION',
+        |  'is-bounded' = 'false'
+        |)
+      """.stripMargin
+    tableEnv.executeSql(statement)
+
+    expectedException.expect(classOf[ValidationException])
+    expectedException.expectMessage("ALTER TABLE COMPACT operation is not supported for " +
+          "non-managed table `default_catalog`.`default_database`.`MyTable`")
+    tableEnv.executeSql("alter table MyTable compact")
+  }
+
+  @Test
+  def testAlterTableCompactOnManagedTable(): Unit = {
+    val statement =
+      """
+        |CREATE TABLE MyTable (
+        |  a bigint,
+        |  b int,
+        |  c varchar
+        |)
+      """.stripMargin
+    tableEnv.executeSql(statement)
+
+    assertEquals(ResultKind.SUCCESS,
+      tableEnv.executeSql("ALTER TABLE MyTable COMPACT").getResultKind)
+  }
+
+  @Test
   def testExecuteSqlWithCreateAlterDropTable(): Unit = {
     val createTableStmt =
       """