You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/01/04 10:07:15 UTC
[flink] 02/02: [FLINK-25176][table] Introduce "ALTER TABLE ... COMPACT" syntax
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 3849f1194904374f1b692688d1d3f711b3be9a27
Author: Jane Chan <qi...@gmail.com>
AuthorDate: Thu Dec 30 17:13:41 2021 +0800
[FLINK-25176][table] Introduce "ALTER TABLE ... COMPACT" syntax
This closes #18236
---
.../src/main/codegen/data/Parser.tdd | 3 +
.../src/main/codegen/includes/parserImpls.ftl | 12 ++
.../flink/sql/parser/ddl/SqlAlterTableCompact.java | 54 +++++++++
.../flink/sql/parser/FlinkSqlParserImplTest.java | 15 +++
.../table/api/internal/TableEnvironmentImpl.java | 3 +
.../flink/table/catalog/ManagedTableListener.java | 4 +-
.../operations/ddl/AlterTableCompactOperation.java | 58 ++++++++++
.../operations/SqlToOperationConverter.java | 58 +++++++++-
.../operations/SqlToOperationConverterTest.java | 121 +++++++++++++++++++--
.../flink/table/api/TableEnvironmentTest.scala | 37 +++++++
10 files changed, 351 insertions(+), 14 deletions(-)
diff --git a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
index 1fb8518..e400d3f 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
+++ b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
@@ -31,6 +31,7 @@
"org.apache.flink.sql.parser.ddl.SqlAlterFunction"
"org.apache.flink.sql.parser.ddl.SqlAlterTable"
"org.apache.flink.sql.parser.ddl.SqlAlterTableAddConstraint"
+ "org.apache.flink.sql.parser.ddl.SqlAlterTableCompact"
"org.apache.flink.sql.parser.ddl.SqlAlterTableDropConstraint"
"org.apache.flink.sql.parser.ddl.SqlAlterTableOptions"
"org.apache.flink.sql.parser.ddl.SqlAlterTableRename"
@@ -110,6 +111,7 @@
"CATALOGS"
"CHANGELOG_MODE"
"COMMENT"
+ "COMPACT"
"COLUMNS"
"DATABASES"
"ENFORCED"
@@ -184,6 +186,7 @@
"COMMAND_FUNCTION"
"COMMAND_FUNCTION_CODE"
"COMMITTED"
+ "COMPACT"
"CONDITIONAL"
"CONDITION_NUMBER"
"CONNECTION"
diff --git a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
index 86537a7..f68c50f 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
+++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
@@ -508,6 +508,7 @@ SqlAlterTable SqlAlterTable() :
SqlIdentifier newTableIdentifier = null;
SqlNodeList propertyList = SqlNodeList.EMPTY;
SqlNodeList propertyKeyList = SqlNodeList.EMPTY;
+ SqlNodeList partitionSpec = null;
SqlIdentifier constraintName;
SqlTableConstraint constraint;
}
@@ -556,6 +557,17 @@ SqlAlterTable SqlAlterTable() :
constraintName,
startPos.plus(getPos()));
}
+ |
+ [
+ <PARTITION>
+ { partitionSpec = new SqlNodeList(getPos());
+ PartitionSpecCommaList(partitionSpec);
+ }
+ ]
+ <COMPACT>
+ {
+ return new SqlAlterTableCompact(startPos.plus(getPos()), tableIdentifier, partitionSpec);
+ }
)
}
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableCompact.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableCompact.java
new file mode 100644
index 0000000..247f4f9
--- /dev/null
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableCompact.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.ddl;
+
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.util.ImmutableNullableList;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+
+/** ALTER TABLE [[catalogName.]databaseName.]tableName [PARTITION partition_spec] COMPACT. */
+public class SqlAlterTableCompact extends SqlAlterTable {
+
+ public SqlAlterTableCompact(
+ SqlParserPos pos, SqlIdentifier tableName, @Nullable SqlNodeList partitionSpec) {
+ super(pos, tableName, partitionSpec);
+ }
+
+ public SqlAlterTableCompact(SqlParserPos pos, SqlIdentifier tableName) {
+ super(pos, tableName);
+ }
+
+ @Override
+ public List<SqlNode> getOperandList() {
+ return ImmutableNullableList.of(tableIdentifier, partitionSpec);
+ }
+
+ @Override
+ public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+ super.unparse(writer, leftPrec, rightPrec);
+ writer.keyword("COMPACT");
+ }
+}
diff --git a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
index a6bbbb8..cc624bb 100644
--- a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
+++ b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
@@ -288,6 +288,21 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
}
@Test
+ public void testAlterTableCompact() {
+ sql("alter table t1 compact").ok("ALTER TABLE `T1` COMPACT");
+
+ sql("alter table db1.t1 compact").ok("ALTER TABLE `DB1`.`T1` COMPACT");
+
+ sql("alter table cat1.db1.t1 compact").ok("ALTER TABLE `CAT1`.`DB1`.`T1` COMPACT");
+
+ sql("alter table t1 partition(x='y',m='n') compact")
+ .ok("ALTER TABLE `T1` PARTITION (`X` = 'y', `M` = 'n') COMPACT");
+
+ sql("alter table t1 partition(^)^ compact")
+ .fails("(?s).*Encountered \"\\)\" at line 1, column 26.\n.*");
+ }
+
+ @Test
public void testCreateTable() {
final String sql =
"CREATE TABLE tbl1 (\n"
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index e163cd7..51550d8 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -112,6 +112,7 @@ import org.apache.flink.table.operations.ddl.AlterCatalogFunctionOperation;
import org.apache.flink.table.operations.ddl.AlterDatabaseOperation;
import org.apache.flink.table.operations.ddl.AlterPartitionPropertiesOperation;
import org.apache.flink.table.operations.ddl.AlterTableAddConstraintOperation;
+import org.apache.flink.table.operations.ddl.AlterTableCompactOperation;
import org.apache.flink.table.operations.ddl.AlterTableDropConstraintOperation;
import org.apache.flink.table.operations.ddl.AlterTableOperation;
import org.apache.flink.table.operations.ddl.AlterTableOptionsOperation;
@@ -998,6 +999,8 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
for (CatalogPartitionSpec spec : dropPartitionsOperation.getPartitionSpecs()) {
catalog.dropPartition(tablePath, spec, ifExists);
}
+ } else if (alterTableOperation instanceof AlterTableCompactOperation) {
+ // TODO: FLINK-25176 work with managed table
}
return TableResultImpl.TABLE_RESULT_OK;
} catch (TableAlreadyExistException | TableNotExistException e) {
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ManagedTableListener.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ManagedTableListener.java
index 211dce7..49779de 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ManagedTableListener.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ManagedTableListener.java
@@ -79,7 +79,9 @@ public class ManagedTableListener {
}
}
- private boolean isManagedTable(@Nullable Catalog catalog, ResolvedCatalogBaseTable<?> table) {
+ /** Checks whether a resolved catalog table is a Flink managed table. */
+ public static boolean isManagedTable(
+ @Nullable Catalog catalog, ResolvedCatalogBaseTable<?> table) {
if (catalog == null || !catalog.supportsManagedTable()) {
// catalog not support managed table
return false;
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableCompactOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableCompactOperation.java
new file mode 100644
index 0000000..14af950
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableCompactOperation.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations.ddl;
+
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.operations.OperationUtils;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/** Operation to describe "ALTER TABLE table_name [PARTITION partition_spec] COMPACT" statement. */
+public class AlterTableCompactOperation extends AlterTableOperation {
+
+ private final CatalogPartitionSpec partitionSpec;
+
+ public AlterTableCompactOperation(
+ ObjectIdentifier tableIdentifier, @Nullable CatalogPartitionSpec partitionSpec) {
+ super(tableIdentifier);
+ this.partitionSpec = partitionSpec;
+ }
+
+ public Map<String, String> getPartitionSpec() {
+ return partitionSpec == null
+ ? Collections.emptyMap()
+ : new LinkedHashMap<>(partitionSpec.getPartitionSpec());
+ }
+
+ @Override
+ public String asSummaryString() {
+ String spec =
+ partitionSpec == null
+ ? ""
+ : String.format(
+ "PARTITION (%s) ",
+ OperationUtils.formatPartitionSpec(partitionSpec));
+ return String.format("ALTER TABLE %s %sCOMPACT", tableIdentifier.asSummaryString(), spec);
+ }
+}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
index a946308..28837d2 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
@@ -25,6 +25,7 @@ import org.apache.flink.sql.parser.ddl.SqlAlterDatabase;
import org.apache.flink.sql.parser.ddl.SqlAlterFunction;
import org.apache.flink.sql.parser.ddl.SqlAlterTable;
import org.apache.flink.sql.parser.ddl.SqlAlterTableAddConstraint;
+import org.apache.flink.sql.parser.ddl.SqlAlterTableCompact;
import org.apache.flink.sql.parser.ddl.SqlAlterTableDropConstraint;
import org.apache.flink.sql.parser.ddl.SqlAlterTableOptions;
import org.apache.flink.sql.parser.ddl.SqlAlterTableRename;
@@ -91,10 +92,13 @@ import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CatalogView;
import org.apache.flink.table.catalog.CatalogViewImpl;
import org.apache.flink.table.catalog.FunctionLanguage;
+import org.apache.flink.table.catalog.ManagedTableListener;
import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.UnresolvedIdentifier;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.operations.BeginStatementSetOperation;
import org.apache.flink.table.operations.CatalogSinkModifyOperation;
import org.apache.flink.table.operations.DescribeTableOperation;
@@ -129,6 +133,7 @@ import org.apache.flink.table.operations.ddl.AlterCatalogFunctionOperation;
import org.apache.flink.table.operations.ddl.AlterDatabaseOperation;
import org.apache.flink.table.operations.ddl.AlterPartitionPropertiesOperation;
import org.apache.flink.table.operations.ddl.AlterTableAddConstraintOperation;
+import org.apache.flink.table.operations.ddl.AlterTableCompactOperation;
import org.apache.flink.table.operations.ddl.AlterTableDropConstraintOperation;
import org.apache.flink.table.operations.ddl.AlterTableOptionsOperation;
import org.apache.flink.table.operations.ddl.AlterTableRenameOperation;
@@ -169,6 +174,7 @@ import org.apache.calcite.sql.parser.SqlParser;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -484,6 +490,11 @@ public class SqlToOperationConverter {
specs.add(new CatalogPartitionSpec(dropPartitions.getPartitionKVs(i)));
}
return new DropPartitionsOperation(tableIdentifier, dropPartitions.ifExists(), specs);
+ } else if (sqlAlterTable instanceof SqlAlterTableCompact) {
+ ResolvedCatalogTable resolvedCatalogTable =
+ (ResolvedCatalogTable) optionalCatalogTable.get().getResolvedTable();
+ return convertAlterTableCompact(
+ tableIdentifier, resolvedCatalogTable, (SqlAlterTableCompact) sqlAlterTable);
} else {
throw new ValidationException(
String.format(
@@ -531,16 +542,57 @@ public class SqlToOperationConverter {
CatalogTable oldTable,
SqlAlterTableReset alterTableReset) {
Map<String, String> newOptions = new HashMap<>(oldTable.getOptions());
- // reset empty key is not allowed
+ // resetting with an empty key set or resetting the 'connector' key is not allowed
Set<String> resetKeys = alterTableReset.getResetKeys();
- if (resetKeys.isEmpty()) {
- throw new ValidationException("ALTER TABLE RESET does not support empty key");
+ if (resetKeys.isEmpty() || resetKeys.contains(FactoryUtil.CONNECTOR.key())) {
+ String exMsg =
+ resetKeys.isEmpty()
+ ? "ALTER TABLE RESET does not support empty key"
+ : "ALTER TABLE RESET does not support changing 'connector'";
+ throw new ValidationException(exMsg);
}
// reset table option keys
resetKeys.forEach(newOptions::remove);
return new AlterTableOptionsOperation(tableIdentifier, oldTable.copy(newOptions));
}
+ private Operation convertAlterTableCompact(
+ ObjectIdentifier tableIdentifier,
+ ResolvedCatalogTable resolvedCatalogTable,
+ SqlAlterTableCompact alterTableCompact) {
+ Catalog catalog = catalogManager.getCatalog(tableIdentifier.getCatalogName()).orElse(null);
+ if (ManagedTableListener.isManagedTable(catalog, resolvedCatalogTable)) {
+ LinkedHashMap<String, String> partitionKVs = alterTableCompact.getPartitionKVs();
+ CatalogPartitionSpec partitionSpec = null;
+ if (partitionKVs != null) {
+ List<String> orderedPartitionKeys = resolvedCatalogTable.getPartitionKeys();
+ Set<String> validPartitionKeySet = new HashSet<>(orderedPartitionKeys);
+ String exMsg =
+ orderedPartitionKeys.isEmpty()
+ ? String.format("Table %s is not partitioned.", tableIdentifier)
+ : String.format(
+ "Available ordered partition columns: [%s]",
+ orderedPartitionKeys.stream()
+ .collect(Collectors.joining("', '", "'", "'")));
+ partitionKVs.forEach(
+ (partitionKey, partitionValue) -> {
+ if (!validPartitionKeySet.contains(partitionKey)) {
+ throw new ValidationException(
+ String.format(
+ "Partition column '%s' not defined in the table schema. %s",
+ partitionKey, exMsg));
+ }
+ });
+ partitionSpec = new CatalogPartitionSpec(partitionKVs);
+ }
+ return new AlterTableCompactOperation(tableIdentifier, partitionSpec);
+ }
+ throw new ValidationException(
+ String.format(
+ "ALTER TABLE COMPACT operation is not supported for non-managed table %s",
+ tableIdentifier));
+ }
+
/** Convert CREATE FUNCTION statement. */
private Operation convertCreateFunction(SqlCreateFunction sqlCreateFunction) {
UnresolvedIdentifier unresolvedIdentifier =
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
index 97d4ae3..9651cc6 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
@@ -66,6 +66,7 @@ import org.apache.flink.table.operations.command.SetOperation;
import org.apache.flink.table.operations.command.ShowJarsOperation;
import org.apache.flink.table.operations.ddl.AlterDatabaseOperation;
import org.apache.flink.table.operations.ddl.AlterTableAddConstraintOperation;
+import org.apache.flink.table.operations.ddl.AlterTableCompactOperation;
import org.apache.flink.table.operations.ddl.AlterTableDropConstraintOperation;
import org.apache.flink.table.operations.ddl.AlterTableOptionsOperation;
import org.apache.flink.table.operations.ddl.AlterTableRenameOperation;
@@ -1167,7 +1168,7 @@ public class SqlToOperationConverterTest {
@Test
public void testAlterTable() throws Exception {
- prepareTable(false);
+ prepareNonManagedTable(false);
final String[] renameTableSqls =
new String[] {
"alter table cat1.db1.tb1 rename to tb2",
@@ -1193,6 +1194,7 @@ public class SqlToOperationConverterTest {
"alter table cat1.db1.tb1 set ('k1' = 'v1', 'K2' = 'V2')",
SqlDialect.DEFAULT);
Map<String, String> expectedOptions = new HashMap<>();
+ expectedOptions.put("connector", "dummy");
expectedOptions.put("k", "v");
expectedOptions.put("k1", "v1");
expectedOptions.put("K2", "V2");
@@ -1201,7 +1203,15 @@ public class SqlToOperationConverterTest {
// test alter table reset
operation = parse("alter table cat1.db1.tb1 reset ('k')", SqlDialect.DEFAULT);
- assertAlterTableOptions(operation, expectedIdentifier, Collections.emptyMap());
+ assertAlterTableOptions(
+ operation, expectedIdentifier, Collections.singletonMap("connector", "dummy"));
+ assertThatThrownBy(
+ () ->
+ parse(
+ "alter table cat1.db1.tb1 reset ('connector')",
+ SqlDialect.DEFAULT))
+ .isInstanceOf(ValidationException.class)
+ .hasMessageContaining("ALTER TABLE RESET does not support changing 'connector'");
assertThatThrownBy(() -> parse("alter table cat1.db1.tb1 reset ()", SqlDialect.DEFAULT))
.isInstanceOf(ValidationException.class)
@@ -1210,7 +1220,7 @@ public class SqlToOperationConverterTest {
@Test
public void testAlterTableAddPkConstraint() throws Exception {
- prepareTable(false);
+ prepareNonManagedTable(false);
// Test alter add table constraint.
Operation operation =
parse(
@@ -1236,7 +1246,7 @@ public class SqlToOperationConverterTest {
@Test
public void testAlterTableAddPkConstraintEnforced() throws Exception {
- prepareTable(false);
+ prepareNonManagedTable(false);
// Test alter table add enforced
assertThatThrownBy(
() ->
@@ -1253,7 +1263,7 @@ public class SqlToOperationConverterTest {
@Test
public void testAlterTableAddUniqueConstraint() throws Exception {
- prepareTable(false);
+ prepareNonManagedTable(false);
// Test alter add table constraint.
assertThatThrownBy(
() ->
@@ -1266,7 +1276,7 @@ public class SqlToOperationConverterTest {
@Test
public void testAlterTableAddUniqueConstraintEnforced() throws Exception {
- prepareTable(false);
+ prepareNonManagedTable(false);
// Test alter table add enforced
assertThatThrownBy(
() ->
@@ -1279,7 +1289,7 @@ public class SqlToOperationConverterTest {
@Test
public void testAlterTableDropConstraint() throws Exception {
- prepareTable(true);
+ prepareNonManagedTable(true);
// Test alter table add enforced
Operation operation = parse("alter table tb1 drop constraint ct1", SqlDialect.DEFAULT);
assertThat(operation).isInstanceOf(AlterTableDropConstraintOperation.class);
@@ -1293,6 +1303,83 @@ public class SqlToOperationConverterTest {
}
@Test
+ public void testAlterTableCompactOnNonManagedTable() throws Exception {
+ prepareNonManagedTable(false);
+ assertThatThrownBy(() -> parse("alter table tb1 compact", SqlDialect.DEFAULT))
+ .isInstanceOf(ValidationException.class)
+ .hasMessage(
+ "ALTER TABLE COMPACT operation is not supported for non-managed table `cat1`.`db1`.`tb1`");
+ }
+
+ @Test
+ public void testAlterTableCompact() throws Exception {
+ prepareManagedTable(false);
+ Operation operation = parse("alter table tb1 compact", SqlDialect.DEFAULT);
+ assertThat(operation).isInstanceOf(AlterTableCompactOperation.class);
+ AlterTableCompactOperation compactOperation = (AlterTableCompactOperation) operation;
+
+ assertThat(compactOperation.asSummaryString())
+ .isEqualTo("ALTER TABLE cat1.db1.tb1 COMPACT");
+
+ // specify partition on a non-partitioned table
+ assertThatThrownBy(
+ () ->
+ parse(
+ "alter table tb1 partition(dt = 'a') compact",
+ SqlDialect.DEFAULT))
+ .isInstanceOf(ValidationException.class)
+ .hasMessage(
+ "Partition column 'dt' not defined in the table schema. Table `cat1`.`db1`.`tb1` is not partitioned.");
+
+ // alter a non-existent table
+ assertThatThrownBy(() -> parse("alter table tb2 compact", SqlDialect.DEFAULT))
+ .isInstanceOf(ValidationException.class)
+ .hasMessage("Table `cat1`.`db1`.`tb2` doesn't exist or is a temporary table.");
+ }
+
+ @Test
+ public void testAlterTableCompactPartition() throws Exception {
+ prepareManagedTable(true);
+
+ // compact partitioned table without partition_spec
+ assertThat(parse("alter table tb1 compact", SqlDialect.DEFAULT).asSummaryString())
+ .isEqualTo("ALTER TABLE cat1.db1.tb1 COMPACT");
+
+ // compact partitioned table without full partition_spec
+ assertThat(
+ parse("alter table tb1 partition (b=1) compact", SqlDialect.DEFAULT)
+ .asSummaryString())
+ .isEqualTo("ALTER TABLE cat1.db1.tb1 PARTITION (b=1) COMPACT");
+
+ assertThat(
+ parse("alter table tb1 partition (c=2) compact", SqlDialect.DEFAULT)
+ .asSummaryString())
+ .isEqualTo("ALTER TABLE cat1.db1.tb1 PARTITION (c=2) COMPACT");
+
+ // compact partitioned table with full partition_spec
+ assertThat(
+ parse("alter table tb1 partition (b=1,c=2) compact", SqlDialect.DEFAULT)
+ .asSummaryString())
+ .isEqualTo("ALTER TABLE cat1.db1.tb1 PARTITION (b=1, c=2) COMPACT");
+
+ // compact partitioned table with disordered partition_spec
+ assertThat(
+ parse("alter table tb1 partition (c=2,b=1) compact", SqlDialect.DEFAULT)
+ .asSummaryString())
+ .isEqualTo("ALTER TABLE cat1.db1.tb1 PARTITION (c=2, b=1) COMPACT");
+
+ // compact partitioned table with a non-existent partition_spec
+ assertThatThrownBy(
+ () ->
+ parse(
+ "alter table tb1 partition (dt = 'a') compact",
+ SqlDialect.DEFAULT))
+ .isInstanceOf(ValidationException.class)
+ .hasMessage(
+ "Partition column 'dt' not defined in the table schema. Available ordered partition columns: ['b', 'c']");
+ }
+
+ @Test
public void testCreateViewWithMatchRecognize() {
Map<String, String> prop = new HashMap<>();
prop.put("connector", "values");
@@ -1534,7 +1621,16 @@ public class SqlToOperationConverterTest {
return SqlToOperationConverter.convert(planner, catalogManager, node).get();
}
- private void prepareTable(boolean hasConstraint) throws Exception {
+ private void prepareNonManagedTable(boolean hasConstraint) throws Exception {
+ prepareTable(false, false, hasConstraint);
+ }
+
+ private void prepareManagedTable(boolean hasPartition) throws Exception {
+ prepareTable(true, hasPartition, false);
+ }
+
+ private void prepareTable(boolean managedTable, boolean hasPartition, boolean hasConstraint)
+ throws Exception {
Catalog catalog = new GenericInMemoryCatalog("default", "default");
catalogManager.registerCatalog("cat1", catalog);
catalog.createDatabase("db1", new CatalogDatabaseImpl(new HashMap<>(), null), true);
@@ -1543,14 +1639,19 @@ public class SqlToOperationConverterTest {
.column("a", DataTypes.STRING().notNull())
.column("b", DataTypes.BIGINT().notNull())
.column("c", DataTypes.BIGINT());
+ Map<String, String> options = new HashMap<>();
+ options.put("k", "v");
+ if (!managedTable) {
+ options.put("connector", "dummy");
+ }
CatalogTable catalogTable =
CatalogTable.of(
hasConstraint
? builder.primaryKeyNamed("ct1", "a", "b").build()
: builder.build(),
"tb1",
- Collections.emptyList(),
- Collections.singletonMap("k", "v"));
+ hasPartition ? Arrays.asList("b", "c") : Collections.emptyList(),
+ Collections.unmodifiableMap(options));
catalogManager.setCurrentCatalog("cat1");
catalogManager.setCurrentDatabase("db1");
ObjectPath tablePath = new ObjectPath("db1", "tb1");
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
index 0ec319d..8ccac9c 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
@@ -280,6 +280,43 @@ class TableEnvironmentTest {
}
@Test
+ def testAlterTableCompactOnNonManagedTable(): Unit = {
+ val statement =
+ """
+ |CREATE TABLE MyTable (
+ | a bigint,
+ | b int,
+ | c varchar
+ |) WITH (
+ | 'connector' = 'COLLECTION',
+ | 'is-bounded' = 'false'
+ |)
+ """.stripMargin
+ tableEnv.executeSql(statement)
+
+ expectedException.expect(classOf[ValidationException])
+ expectedException.expectMessage("ALTER TABLE COMPACT operation is not supported for " +
+ "non-managed table `default_catalog`.`default_database`.`MyTable`")
+ tableEnv.executeSql("alter table MyTable compact")
+ }
+
+ @Test
+ def testAlterTableCompactOnManagedTable(): Unit = {
+ val statement =
+ """
+ |CREATE TABLE MyTable (
+ | a bigint,
+ | b int,
+ | c varchar
+ |)
+ """.stripMargin
+ tableEnv.executeSql(statement)
+
+ assertEquals(ResultKind.SUCCESS,
+ tableEnv.executeSql("ALTER TABLE MyTable COMPACT").getResultKind)
+ }
+
+ @Test
def testExecuteSqlWithCreateAlterDropTable(): Unit = {
val createTableStmt =
"""