You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2020/05/17 13:07:43 UTC
[flink] branch master updated: [FLINK-17449][sql-parser-hive]
Implement ADD/DROP partitions
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new d3c3909 [FLINK-17449][sql-parser-hive] Implement ADD/DROP partitions
d3c3909 is described below
commit d3c39090a73aeb928c78ecd63e1fd6190c0df1f1
Author: Rui Li <li...@apache.org>
AuthorDate: Sun May 17 21:07:07 2020 +0800
[FLINK-17449][sql-parser-hive] Implement ADD/DROP partitions
This closes #12195
---
.../flink/connectors/hive/HiveDialectTest.java | 35 ++++++--
.../src/main/codegen/data/Parser.tdd | 2 +
.../src/main/codegen/includes/parserImpls.ftl | 79 +++++++++++++++-
.../sql/parser/hive/ddl/SqlAddHivePartitions.java | 85 ++++++++++++++++++
.../parser/hive/FlinkHiveSqlParserImplTest.java | 27 ++++++
.../flink/sql/parser/ddl/SqlAddPartitions.java | 100 +++++++++++++++++++++
.../flink/sql/parser/ddl/SqlDropPartitions.java | 87 ++++++++++++++++++
.../table/api/internal/TableEnvironmentImpl.java | 21 +++++
.../operations/ddl/AddPartitionsOperation.java | 74 +++++++++++++++
.../operations/ddl/DropPartitionsOperation.java | 60 +++++++++++++
.../operations/SqlToOperationConverter.java | 22 +++++
11 files changed, 582 insertions(+), 10 deletions(-)
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectTest.java
index b85309b..9b6df99 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectTest.java
@@ -24,7 +24,6 @@ import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.catalog.CatalogPartitionImpl;
import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectPath;
@@ -60,7 +59,6 @@ import org.junit.Test;
import java.io.File;
import java.net.URI;
import java.net.URISyntaxException;
-import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
@@ -302,8 +300,7 @@ public class HiveDialectTest {
// add/replace columns cascade
tableEnv.executeSql("create table tbl2 (x int) partitioned by (dt date,id bigint)");
- ObjectPath tablePath2 = new ObjectPath("default", "tbl2");
- // TODO: use DDL to add partitions once we support it
+ tableEnv.executeSql("alter table tbl2 add partition (dt='2020-01-23',id=1) partition (dt='2020-04-24',id=2)");
CatalogPartitionSpec partitionSpec1 = new CatalogPartitionSpec(new LinkedHashMap<String, String>() {{
put("dt", "2020-01-23");
put("id", "1");
@@ -312,9 +309,8 @@ public class HiveDialectTest {
put("dt", "2020-04-24");
put("id", "2");
}});
- hiveCatalog.createPartition(tablePath2, partitionSpec1, new CatalogPartitionImpl(Collections.emptyMap(), null), false);
- hiveCatalog.createPartition(tablePath2, partitionSpec2, new CatalogPartitionImpl(Collections.emptyMap(), null), false);
tableEnv.executeSql("alter table tbl2 replace columns (ti tinyint,d decimal) cascade");
+ ObjectPath tablePath2 = new ObjectPath("default", "tbl2");
hiveTable = hiveCatalog.getHiveTable(tablePath2);
Partition hivePartition = hiveCatalog.getHivePartition(hiveTable, partitionSpec1);
assertEquals(2, hivePartition.getSd().getColsSize());
@@ -344,7 +340,7 @@ public class HiveDialectTest {
@Test
public void testAlterPartition() throws Exception {
tableEnv.executeSql("create table tbl (x tinyint,y string) partitioned by (p1 bigint,p2 date)");
- // TODO: use DDL to add partitions once we support it
+ tableEnv.executeSql("alter table tbl add partition (p1=1000,p2='2020-05-01') partition (p1=2000,p2='2020-01-01')");
CatalogPartitionSpec spec1 = new CatalogPartitionSpec(new LinkedHashMap<String, String>() {{
put("p1", "1000");
put("p2", "2020-05-01");
@@ -354,8 +350,6 @@ public class HiveDialectTest {
put("p2", "2020-01-01");
}});
ObjectPath tablePath = new ObjectPath("default", "tbl");
- hiveCatalog.createPartition(tablePath, spec1, new CatalogPartitionImpl(Collections.emptyMap(), null), false);
- hiveCatalog.createPartition(tablePath, spec2, new CatalogPartitionImpl(Collections.emptyMap(), null), false);
Table hiveTable = hiveCatalog.getHiveTable(tablePath);
@@ -438,6 +432,29 @@ public class HiveDialectTest {
assertEquals(DEFAULT_BUILTIN_DATABASE, databases.get(0).toString());
}
+ @Test
+ public void testAddDropPartitions() throws Exception {
+ tableEnv.executeSql("create table tbl (x int,y binary) partitioned by (dt date,country string)");
+ tableEnv.executeSql("alter table tbl add partition (dt='2020-04-30',country='china') partition (dt='2020-04-30',country='us')");
+
+ ObjectPath tablePath = new ObjectPath("default", "tbl");
+ assertEquals(2, hiveCatalog.listPartitions(tablePath).size());
+
+ String partLocation = warehouse + "/part3_location";
+ tableEnv.executeSql(String.format(
+ "alter table tbl add partition (dt='2020-05-01',country='belgium') location '%s'", partLocation));
+ Table hiveTable = hiveCatalog.getHiveTable(tablePath);
+ CatalogPartitionSpec spec = new CatalogPartitionSpec(new LinkedHashMap<String, String>() {{
+ put("dt", "2020-05-01");
+ put("country", "belgium");
+ }});
+ Partition hivePartition = hiveCatalog.getHivePartition(hiveTable, spec);
+ assertEquals(partLocation, locationPath(hivePartition.getSd().getLocation()));
+
+ tableEnv.executeSql("alter table tbl drop partition (dt='2020-04-30',country='china'),partition (dt='2020-05-01',country='belgium')");
+ assertEquals(1, hiveCatalog.listPartitions(tablePath).size());
+ }
+
private static String locationPath(String locationURI) throws URISyntaxException {
return new URI(locationURI).getPath();
}
diff --git a/flink-table/flink-sql-parser-hive/src/main/codegen/data/Parser.tdd b/flink-table/flink-sql-parser-hive/src/main/codegen/data/Parser.tdd
index 0657f64..e000d34 100644
--- a/flink-table/flink-sql-parser-hive/src/main/codegen/data/Parser.tdd
+++ b/flink-table/flink-sql-parser-hive/src/main/codegen/data/Parser.tdd
@@ -24,6 +24,7 @@
# Please keep the import classes in alphabetical order if new class is added.
imports: [
"org.apache.flink.sql.parser.hive.ddl.HiveDDLUtils"
+ "org.apache.flink.sql.parser.hive.ddl.SqlAddHivePartitions"
"org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveDatabaseLocation"
"org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveDatabaseOwner"
"org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveDatabaseProps"
@@ -65,6 +66,7 @@
"org.apache.flink.sql.parser.ddl.SqlCreateView"
"org.apache.flink.sql.parser.ddl.SqlDropDatabase"
"org.apache.flink.sql.parser.ddl.SqlDropFunction"
+ "org.apache.flink.sql.parser.ddl.SqlDropPartitions"
"org.apache.flink.sql.parser.ddl.SqlDropTable"
"org.apache.flink.sql.parser.ddl.SqlDropView"
"org.apache.flink.sql.parser.ddl.SqlTableColumn"
diff --git a/flink-table/flink-sql-parser-hive/src/main/codegen/includes/parserImpls.ftl b/flink-table/flink-sql-parser-hive/src/main/codegen/includes/parserImpls.ftl
index 4aba880..2ffc233 100644
--- a/flink-table/flink-sql-parser-hive/src/main/codegen/includes/parserImpls.ftl
+++ b/flink-table/flink-sql-parser-hive/src/main/codegen/includes/parserImpls.ftl
@@ -1136,12 +1136,32 @@ SqlAlterTable SqlAlterTable() :
}
)
|
- <ADD> <COLUMNS>
+ <ADD>
+ (
+ <COLUMNS>
+ {
+ EnsureAlterTableOnly(partitionSpec, "Add columns");
+ return SqlAlterHiveTableAddReplaceColumn(startPos, tableIdentifier, false);
+ }
+ |
+ [ <IF> <NOT> <EXISTS> { ifNotExists = true; } ]
+ {
+ EnsureAlterTableOnly(partitionSpec, "Add partitions");
+ return SqlAddHivePartitions(startPos, tableIdentifier, ifNotExists);
+ }
+ )
{
EnsureAlterTableOnly(partitionSpec, "Add columns");
return SqlAlterHiveTableAddReplaceColumn(startPos, tableIdentifier, false);
}
|
+ <DROP>
+ [ <IF> <EXISTS> { ifExists = true; } ]
+ {
+ EnsureAlterTableOnly(partitionSpec, "Drop partitions");
+ return SqlDropPartitions(startPos, tableIdentifier, ifExists);
+ }
+ |
<REPLACE> <COLUMNS>
{
EnsureAlterTableOnly(partitionSpec, "Replace columns");
@@ -1368,3 +1388,60 @@ SqlAlterView SqlAlterView() :
}
)
}
+
+/**
+ * Hive syntax:
+ *
+ * ALTER TABLE table_name ADD [IF NOT EXISTS]
+ * PARTITION partition_spec [LOCATION 'location'][PARTITION partition_spec [LOCATION 'location']][...];
+ */
+SqlAlterTable SqlAddHivePartitions(SqlParserPos startPos, SqlIdentifier tableIdentifier, boolean ifNotExists) :
+{
+ List<SqlNodeList> partSpecs = new ArrayList();
+ List<SqlCharStringLiteral> partLocations = new ArrayList();
+ SqlNodeList partSpec;
+ SqlCharStringLiteral partLocation;
+}
+{
+ (
+ <PARTITION>
+ {
+ partSpec = new SqlNodeList(getPos());
+ partLocation = null;
+ PartitionSpecCommaList(new SqlNodeList(getPos()), partSpec);
+ }
+ [ <LOCATION> <QUOTED_STRING> { partLocation = createStringLiteral(token.image, getPos()); } ]
+ { partSpecs.add(partSpec); partLocations.add(partLocation); }
+ )+
+ { return new SqlAddHivePartitions(startPos.plus(getPos()), tableIdentifier, ifNotExists, partSpecs, partLocations); }
+}
+
+/**
+ * Hive syntax:
+ *
+ * ALTER TABLE table_name DROP [IF EXISTS] PARTITION partition_spec[, PARTITION partition_spec, ...]
+ * [IGNORE PROTECTION] [PURGE];
+ */
+SqlAlterTable SqlDropPartitions(SqlParserPos startPos, SqlIdentifier tableIdentifier, boolean ifExists) :
+{
+ List<SqlNodeList> partSpecs = new ArrayList();
+ SqlNodeList partSpec;
+}
+{
+ <PARTITION>
+ {
+ partSpec = new SqlNodeList(getPos());
+ PartitionSpecCommaList(new SqlNodeList(getPos()), partSpec);
+ partSpecs.add(partSpec);
+ }
+ (
+ <COMMA>
+ <PARTITION>
+ {
+ partSpec = new SqlNodeList(getPos());
+ PartitionSpecCommaList(new SqlNodeList(getPos()), partSpec);
+ partSpecs.add(partSpec);
+ }
+ )*
+ { return new SqlDropPartitions(startPos.plus(getPos()), tableIdentifier, ifExists, partSpecs); }
+}
diff --git a/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlAddHivePartitions.java b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlAddHivePartitions.java
new file mode 100644
index 0000000..d003f6e
--- /dev/null
+++ b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlAddHivePartitions.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.hive.ddl;
+
+import org.apache.flink.sql.parser.ddl.SqlAddPartitions;
+
+import org.apache.calcite.sql.SqlCharStringLiteral;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Add partitions to a Hive table.
+ *
+ * <p>Hive syntax:
+ * ALTER TABLE table_name ADD [IF NOT EXISTS]
+ * PARTITION partition_spec [LOCATION 'location'][PARTITION partition_spec [LOCATION 'location']][...];
+ */
+public class SqlAddHivePartitions extends SqlAddPartitions {
+
+ private final List<SqlCharStringLiteral> partLocations;
+
+ public SqlAddHivePartitions(SqlParserPos pos, SqlIdentifier tableName, boolean ifNotExists,
+ List<SqlNodeList> partSpecs, List<SqlCharStringLiteral> partLocations) {
+ super(pos, tableName, ifNotExists, partSpecs, toProps(partLocations));
+ this.partLocations = partLocations;
+ }
+
+ private static List<SqlNodeList> toProps(List<SqlCharStringLiteral> partLocations) {
+ List<SqlNodeList> res = new ArrayList<>(partLocations.size());
+ for (SqlCharStringLiteral partLocation : partLocations) {
+ SqlNodeList prop = null;
+ if (partLocation != null) {
+ prop = new SqlNodeList(partLocation.getParserPosition());
+ prop.add(HiveDDLUtils.toTableOption(SqlCreateHiveTable.TABLE_LOCATION_URI, partLocation, partLocation.getParserPosition()));
+ }
+ res.add(prop);
+ }
+ return res;
+ }
+
+ @Override
+ public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+ writer.keyword("ALTER TABLE");
+ tableIdentifier.unparse(writer, leftPrec, rightPrec);
+ writer.newlineAndIndent();
+ writer.keyword("ADD");
+ if (ifNotExists()) {
+ writer.keyword("IF NOT EXISTS");
+ }
+ int opLeftPrec = getOperator().getLeftPrec();
+ int opRightPrec = getOperator().getRightPrec();
+ for (int i = 0; i < getPartSpecs().size(); i++) {
+ writer.newlineAndIndent();
+ SqlNodeList partSpec = getPartSpecs().get(i);
+ writer.keyword("PARTITION");
+ partSpec.unparse(writer, opLeftPrec, opRightPrec);
+ SqlCharStringLiteral location = partLocations.get(i);
+ if (location != null) {
+ writer.keyword("LOCATION");
+ location.unparse(writer, opLeftPrec, opRightPrec);
+ }
+ }
+ }
+}
diff --git a/flink-table/flink-sql-parser-hive/src/test/java/org/apache/flink/sql/parser/hive/FlinkHiveSqlParserImplTest.java b/flink-table/flink-sql-parser-hive/src/test/java/org/apache/flink/sql/parser/hive/FlinkHiveSqlParserImplTest.java
index 5e5c443..7130713 100644
--- a/flink-table/flink-sql-parser-hive/src/test/java/org/apache/flink/sql/parser/hive/FlinkHiveSqlParserImplTest.java
+++ b/flink-table/flink-sql-parser-hive/src/test/java/org/apache/flink/sql/parser/hive/FlinkHiveSqlParserImplTest.java
@@ -383,4 +383,31 @@ public class FlinkHiveSqlParserImplTest extends SqlParserTest {
"SELECT `C1`, `C2`\n" +
"FROM `TBL`");
}
+
+ @Test
+ public void testAddPartition() {
+ sql("alter table tbl add partition (p1=1,p2='a') location '/part1/location'")
+ .ok("ALTER TABLE `TBL`\n" +
+ "ADD\n" +
+ "PARTITION (`P1` = 1, `P2` = 'a') LOCATION '/part1/location'");
+ sql("alter table tbl add if not exists partition (p=1) partition (p=2) location '/part2/location'")
+ .ok("ALTER TABLE `TBL`\n" +
+ "ADD IF NOT EXISTS\n" +
+ "PARTITION (`P` = 1)\n" +
+ "PARTITION (`P` = 2) LOCATION '/part2/location'");
+ }
+
+ @Test
+ public void testDropPartition() {
+ sql("alter table tbl drop if exists partition (p=1)")
+ .ok("ALTER TABLE `TBL`\n" +
+ "DROP IF EXISTS\n" +
+ "PARTITION (`P` = 1)");
+ sql("alter table tbl drop partition (p1='a',p2=1), partition(p1='b',p2=2)")
+ .ok("ALTER TABLE `TBL`\n" +
+ "DROP\n" +
+ "PARTITION (`P1` = 'a', `P2` = 1)\n" +
+ "PARTITION (`P1` = 'b', `P2` = 2)");
+ // TODO: support IGNORE PROTECTION, PURGE
+ }
}
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAddPartitions.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAddPartitions.java
new file mode 100644
index 0000000..7aa71ab
--- /dev/null
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAddPartitions.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.ddl;
+
+import org.apache.flink.sql.parser.SqlPartitionUtils;
+
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+
+/**
+ * ALTER TABLE DDL to add partitions to a table.
+ */
+public class SqlAddPartitions extends SqlAlterTable {
+
+ private final boolean ifNotExists;
+ private final List<SqlNodeList> partSpecs;
+ private final List<SqlNodeList> partProps;
+
+ public SqlAddPartitions(SqlParserPos pos, SqlIdentifier tableName, boolean ifNotExists,
+ List<SqlNodeList> partSpecs, List<SqlNodeList> partProps) {
+ super(pos, tableName);
+ this.ifNotExists = ifNotExists;
+ this.partSpecs = partSpecs;
+ this.partProps = partProps;
+ }
+
+ public boolean ifNotExists() {
+ return ifNotExists;
+ }
+
+ public List<SqlNodeList> getPartSpecs() {
+ return partSpecs;
+ }
+
+ public LinkedHashMap<String, String> getPartitionKVs(int i) {
+ return SqlPartitionUtils.getPartitionKVs(getPartSpecs().get(i));
+ }
+
+ public List<SqlNodeList> getPartProps() {
+ return partProps;
+ }
+
+ @Nonnull
+ @Override
+ public List<SqlNode> getOperandList() {
+ List<SqlNode> operands = new ArrayList<>();
+ operands.add(tableIdentifier);
+ operands.addAll(partSpecs);
+ operands.addAll(partProps);
+ return operands;
+ }
+
+ @Override
+ public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+ super.unparse(writer, leftPrec, rightPrec);
+ writer.newlineAndIndent();
+ writer.keyword("ADD");
+ if (ifNotExists) {
+ writer.keyword("IF NOT EXISTS");
+ }
+ int opLeftPrec = getOperator().getLeftPrec();
+ int opRightPrec = getOperator().getRightPrec();
+ for (int i = 0; i < partSpecs.size(); i++) {
+ writer.newlineAndIndent();
+ SqlNodeList partSpec = partSpecs.get(i);
+ SqlNodeList partProp = partProps.get(i);
+ writer.keyword("PARTITION");
+ partSpec.unparse(writer, opLeftPrec, opRightPrec);
+ if (partProp != null) {
+ writer.keyword("WITH");
+ partProp.unparse(writer, opLeftPrec, opRightPrec);
+ }
+ }
+ }
+}
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDropPartitions.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDropPartitions.java
new file mode 100644
index 0000000..16ed67c
--- /dev/null
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDropPartitions.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.ddl;
+
+import org.apache.flink.sql.parser.SqlPartitionUtils;
+
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+
+/**
+ * ALTER TABLE DDL to drop partitions of a table.
+ */
+public class SqlDropPartitions extends SqlAlterTable {
+
+ private final boolean ifExists;
+ private final List<SqlNodeList> partSpecs;
+
+ public SqlDropPartitions(SqlParserPos pos, SqlIdentifier tableName, boolean ifExists,
+ List<SqlNodeList> partSpecs) {
+ super(pos, tableName);
+ this.ifExists = ifExists;
+ this.partSpecs = partSpecs;
+ }
+
+ public boolean ifExists() {
+ return ifExists;
+ }
+
+ public List<SqlNodeList> getPartSpecs() {
+ return partSpecs;
+ }
+
+ public LinkedHashMap<String, String> getPartitionKVs(int i) {
+ return SqlPartitionUtils.getPartitionKVs(getPartSpecs().get(i));
+ }
+
+ @Override
+ public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+ super.unparse(writer, leftPrec, rightPrec);
+ writer.newlineAndIndent();
+ writer.keyword("DROP");
+ if (ifExists) {
+ writer.keyword("IF EXISTS");
+ }
+ int opLeftPrec = getOperator().getLeftPrec();
+ int opRightPrec = getOperator().getRightPrec();
+ for (SqlNodeList partSpec : partSpecs) {
+ writer.newlineAndIndent();
+ writer.keyword("PARTITION");
+ partSpec.unparse(writer, opLeftPrec, opRightPrec);
+ }
+ }
+
+ @Nonnull
+ @Override
+ public List<SqlNode> getOperandList() {
+ List<SqlNode> operands = new ArrayList<>();
+ operands.add(tableIdentifier);
+ operands.addAll(partSpecs);
+ return operands;
+ }
+}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index 6a481928..3b4db5a 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -42,12 +42,15 @@ import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogFunction;
import org.apache.flink.table.catalog.CatalogManager;
+import org.apache.flink.table.catalog.CatalogPartition;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CatalogTableImpl;
import org.apache.flink.table.catalog.ConnectorCatalogTable;
import org.apache.flink.table.catalog.FunctionCatalog;
import org.apache.flink.table.catalog.GenericInMemoryCatalog;
import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.QueryOperationCatalogView;
import org.apache.flink.table.catalog.UnresolvedIdentifier;
import org.apache.flink.table.catalog.exceptions.CatalogException;
@@ -90,6 +93,7 @@ import org.apache.flink.table.operations.TableSourceQueryOperation;
import org.apache.flink.table.operations.UnregisteredSinkModifyOperation;
import org.apache.flink.table.operations.UseCatalogOperation;
import org.apache.flink.table.operations.UseDatabaseOperation;
+import org.apache.flink.table.operations.ddl.AddPartitionsOperation;
import org.apache.flink.table.operations.ddl.AlterCatalogFunctionOperation;
import org.apache.flink.table.operations.ddl.AlterDatabaseOperation;
import org.apache.flink.table.operations.ddl.AlterPartitionPropertiesOperation;
@@ -111,6 +115,7 @@ import org.apache.flink.table.operations.ddl.CreateTempSystemFunctionOperation;
import org.apache.flink.table.operations.ddl.CreateViewOperation;
import org.apache.flink.table.operations.ddl.DropCatalogFunctionOperation;
import org.apache.flink.table.operations.ddl.DropDatabaseOperation;
+import org.apache.flink.table.operations.ddl.DropPartitionsOperation;
import org.apache.flink.table.operations.ddl.DropTableOperation;
import org.apache.flink.table.operations.ddl.DropTempSystemFunctionOperation;
import org.apache.flink.table.operations.ddl.DropViewOperation;
@@ -838,6 +843,22 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
catalog.alterTable(alterTableSchemaOperation.getTableIdentifier().toObjectPath(),
alterTableSchemaOperation.getCatalogTable(),
false);
+ } else if (alterTableOperation instanceof AddPartitionsOperation) {
+ AddPartitionsOperation addPartitionsOperation = (AddPartitionsOperation) alterTableOperation;
+ List<CatalogPartitionSpec> specs = addPartitionsOperation.getPartitionSpecs();
+ List<CatalogPartition> partitions = addPartitionsOperation.getCatalogPartitions();
+ boolean ifNotExists = addPartitionsOperation.ifNotExists();
+ ObjectPath tablePath = addPartitionsOperation.getTableIdentifier().toObjectPath();
+ for (int i = 0; i < specs.size(); i++) {
+ catalog.createPartition(tablePath, specs.get(i), partitions.get(i), ifNotExists);
+ }
+ } else if (alterTableOperation instanceof DropPartitionsOperation) {
+ DropPartitionsOperation dropPartitionsOperation = (DropPartitionsOperation) alterTableOperation;
+ ObjectPath tablePath = dropPartitionsOperation.getTableIdentifier().toObjectPath();
+ boolean ifExists = dropPartitionsOperation.ifExists();
+ for (CatalogPartitionSpec spec : dropPartitionsOperation.getPartitionSpecs()) {
+ catalog.dropPartition(tablePath, spec, ifExists);
+ }
}
return TableResultImpl.TABLE_RESULT_OK;
} catch (TableAlreadyExistException | TableNotExistException e) {
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AddPartitionsOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AddPartitionsOperation.java
new file mode 100644
index 0000000..1802375
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AddPartitionsOperation.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations.ddl;
+
+import org.apache.flink.table.catalog.CatalogPartition;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.operations.OperationUtils;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Operation to describe ALTER TABLE ADD PARTITION statement.
+ */
+public class AddPartitionsOperation extends AlterTableOperation {
+
+ private final boolean ifNotExists;
+ private final List<CatalogPartitionSpec> partitionSpecs;
+ private final List<CatalogPartition> catalogPartitions;
+
+ public AddPartitionsOperation(ObjectIdentifier tableIdentifier, boolean ifNotExists,
+ List<CatalogPartitionSpec> partitionSpecs, List<CatalogPartition> catalogPartitions) {
+ super(tableIdentifier);
+ this.ifNotExists = ifNotExists;
+ this.partitionSpecs = partitionSpecs;
+ this.catalogPartitions = catalogPartitions;
+ }
+
+ public List<CatalogPartitionSpec> getPartitionSpecs() {
+ return partitionSpecs;
+ }
+
+ public List<CatalogPartition> getCatalogPartitions() {
+ return catalogPartitions;
+ }
+
+ public boolean ifNotExists() {
+ return ifNotExists;
+ }
+
+ @Override
+ public String asSummaryString() {
+ StringBuilder builder = new StringBuilder(String.format("ALTER TABLE %s ADD", tableIdentifier.asSummaryString()));
+ if (ifNotExists) {
+ builder.append(" IF NOT EXISTS");
+ }
+ for (int i = 0; i < partitionSpecs.size(); i++) {
+ String spec = OperationUtils.formatPartitionSpec(partitionSpecs.get(i));
+ builder.append(String.format(" PARTITION (%s)", spec));
+ Map<String, String> properties = catalogPartitions.get(i).getProperties();
+ if (!properties.isEmpty()) {
+ builder.append(String.format(" WITH (%s)", OperationUtils.formatProperties(properties)));
+ }
+ }
+ return builder.toString();
+ }
+}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/DropPartitionsOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/DropPartitionsOperation.java
new file mode 100644
index 0000000..f3fc4f3
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/DropPartitionsOperation.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations.ddl;
+
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.operations.OperationUtils;
+
+import java.util.List;
+
+/**
+ * Operation to describe ALTER TABLE DROP PARTITION statement.
+ */
+public class DropPartitionsOperation extends AlterTableOperation {
+
+ private final boolean ifExists;
+ private final List<CatalogPartitionSpec> partitionSpecs;
+
+ public DropPartitionsOperation(ObjectIdentifier tableIdentifier, boolean ifExists, List<CatalogPartitionSpec> partitionSpecs) {
+ super(tableIdentifier);
+ this.ifExists = ifExists;
+ this.partitionSpecs = partitionSpecs;
+ }
+
+ public boolean ifExists() {
+ return ifExists;
+ }
+
+ public List<CatalogPartitionSpec> getPartitionSpecs() {
+ return partitionSpecs;
+ }
+
+ @Override
+ public String asSummaryString() {
+ StringBuilder builder = new StringBuilder(String.format("ALTER TABLE %s DROP", tableIdentifier.asSummaryString()));
+ if (ifExists) {
+ builder.append(" IF EXISTS");
+ }
+ for (CatalogPartitionSpec spec : partitionSpecs) {
+ builder.append(String.format(" PARTITION (%s)", OperationUtils.formatPartitionSpec(spec)));
+ }
+ return builder.toString();
+ }
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
index b496ddf..615ad60 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
@@ -18,6 +18,7 @@
package org.apache.flink.table.planner.operations;
+import org.apache.flink.sql.parser.ddl.SqlAddPartitions;
import org.apache.flink.sql.parser.ddl.SqlAddReplaceColumns;
import org.apache.flink.sql.parser.ddl.SqlAlterDatabase;
import org.apache.flink.sql.parser.ddl.SqlAlterFunction;
@@ -38,6 +39,7 @@ import org.apache.flink.sql.parser.ddl.SqlCreateTable;
import org.apache.flink.sql.parser.ddl.SqlCreateView;
import org.apache.flink.sql.parser.ddl.SqlDropDatabase;
import org.apache.flink.sql.parser.ddl.SqlDropFunction;
+import org.apache.flink.sql.parser.ddl.SqlDropPartitions;
import org.apache.flink.sql.parser.ddl.SqlDropTable;
import org.apache.flink.sql.parser.ddl.SqlDropView;
import org.apache.flink.sql.parser.ddl.SqlTableOption;
@@ -84,6 +86,7 @@ import org.apache.flink.table.operations.ShowTablesOperation;
import org.apache.flink.table.operations.ShowViewsOperation;
import org.apache.flink.table.operations.UseCatalogOperation;
import org.apache.flink.table.operations.UseDatabaseOperation;
+import org.apache.flink.table.operations.ddl.AddPartitionsOperation;
import org.apache.flink.table.operations.ddl.AlterCatalogFunctionOperation;
import org.apache.flink.table.operations.ddl.AlterDatabaseOperation;
import org.apache.flink.table.operations.ddl.AlterPartitionPropertiesOperation;
@@ -101,6 +104,7 @@ import org.apache.flink.table.operations.ddl.CreateTempSystemFunctionOperation;
import org.apache.flink.table.operations.ddl.CreateViewOperation;
import org.apache.flink.table.operations.ddl.DropCatalogFunctionOperation;
import org.apache.flink.table.operations.ddl.DropDatabaseOperation;
+import org.apache.flink.table.operations.ddl.DropPartitionsOperation;
import org.apache.flink.table.operations.ddl.DropTableOperation;
import org.apache.flink.table.operations.ddl.DropTempSystemFunctionOperation;
import org.apache.flink.table.operations.ddl.DropViewOperation;
@@ -126,6 +130,7 @@ import org.apache.calcite.sql.SqlUtil;
import org.apache.calcite.sql.dialect.CalciteSqlDialect;
import org.apache.calcite.sql.parser.SqlParser;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
@@ -363,6 +368,23 @@ public class SqlToOperationConverter {
(SqlChangeColumn) sqlAlterTable,
(CatalogTable) baseTable,
flinkPlanner.getOrCreateSqlValidator());
+ } else if (sqlAlterTable instanceof SqlAddPartitions) {
+ List<CatalogPartitionSpec> specs = new ArrayList<>();
+ List<CatalogPartition> partitions = new ArrayList<>();
+ SqlAddPartitions addPartitions = (SqlAddPartitions) sqlAlterTable;
+ for (int i = 0; i < addPartitions.getPartSpecs().size(); i++) {
+ specs.add(new CatalogPartitionSpec(addPartitions.getPartitionKVs(i)));
+ Map<String, String> props = OperationConverterUtils.extractProperties(addPartitions.getPartProps().get(i));
+ partitions.add(new CatalogPartitionImpl(props, null));
+ }
+ return new AddPartitionsOperation(tableIdentifier, addPartitions.ifNotExists(), specs, partitions);
+ } else if (sqlAlterTable instanceof SqlDropPartitions) {
+ SqlDropPartitions dropPartitions = (SqlDropPartitions) sqlAlterTable;
+ List<CatalogPartitionSpec> specs = new ArrayList<>();
+ for (int i = 0; i < dropPartitions.getPartSpecs().size(); i++) {
+ specs.add(new CatalogPartitionSpec(dropPartitions.getPartitionKVs(i)));
+ }
+ return new DropPartitionsOperation(tableIdentifier, dropPartitions.ifExists(), specs);
} else {
throw new ValidationException(
String.format("[%s] needs to implement",