You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2020/05/17 13:07:43 UTC

[flink] branch master updated: [FLINK-17449][sql-parser-hive] Implement ADD/DROP partitions

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new d3c3909  [FLINK-17449][sql-parser-hive] Implement ADD/DROP partitions
d3c3909 is described below

commit d3c39090a73aeb928c78ecd63e1fd6190c0df1f1
Author: Rui Li <li...@apache.org>
AuthorDate: Sun May 17 21:07:07 2020 +0800

    [FLINK-17449][sql-parser-hive] Implement ADD/DROP partitions
    
    
    This closes #12195
---
 .../flink/connectors/hive/HiveDialectTest.java     |  35 ++++++--
 .../src/main/codegen/data/Parser.tdd               |   2 +
 .../src/main/codegen/includes/parserImpls.ftl      |  79 +++++++++++++++-
 .../sql/parser/hive/ddl/SqlAddHivePartitions.java  |  85 ++++++++++++++++++
 .../parser/hive/FlinkHiveSqlParserImplTest.java    |  27 ++++++
 .../flink/sql/parser/ddl/SqlAddPartitions.java     | 100 +++++++++++++++++++++
 .../flink/sql/parser/ddl/SqlDropPartitions.java    |  87 ++++++++++++++++++
 .../table/api/internal/TableEnvironmentImpl.java   |  21 +++++
 .../operations/ddl/AddPartitionsOperation.java     |  74 +++++++++++++++
 .../operations/ddl/DropPartitionsOperation.java    |  60 +++++++++++++
 .../operations/SqlToOperationConverter.java        |  22 +++++
 11 files changed, 582 insertions(+), 10 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectTest.java
index b85309b..9b6df99 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectTest.java
@@ -24,7 +24,6 @@ import org.apache.flink.table.api.SqlDialect;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.TableResult;
 import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.catalog.CatalogPartitionImpl;
 import org.apache.flink.table.catalog.CatalogPartitionSpec;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.ObjectPath;
@@ -60,7 +59,6 @@ import org.junit.Test;
 import java.io.File;
 import java.net.URI;
 import java.net.URISyntaxException;
-import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.List;
 
@@ -302,8 +300,7 @@ public class HiveDialectTest {
 
 		// add/replace columns cascade
 		tableEnv.executeSql("create table tbl2 (x int) partitioned by (dt date,id bigint)");
-		ObjectPath tablePath2 = new ObjectPath("default", "tbl2");
-		// TODO: use DDL to add partitions once we support it
+		tableEnv.executeSql("alter table tbl2 add partition (dt='2020-01-23',id=1) partition (dt='2020-04-24',id=2)");
 		CatalogPartitionSpec partitionSpec1 = new CatalogPartitionSpec(new LinkedHashMap<String, String>() {{
 			put("dt", "2020-01-23");
 			put("id", "1");
@@ -312,9 +309,8 @@ public class HiveDialectTest {
 			put("dt", "2020-04-24");
 			put("id", "2");
 		}});
-		hiveCatalog.createPartition(tablePath2, partitionSpec1, new CatalogPartitionImpl(Collections.emptyMap(), null), false);
-		hiveCatalog.createPartition(tablePath2, partitionSpec2, new CatalogPartitionImpl(Collections.emptyMap(), null), false);
 		tableEnv.executeSql("alter table tbl2 replace columns (ti tinyint,d decimal) cascade");
+		ObjectPath tablePath2 = new ObjectPath("default", "tbl2");
 		hiveTable = hiveCatalog.getHiveTable(tablePath2);
 		Partition hivePartition = hiveCatalog.getHivePartition(hiveTable, partitionSpec1);
 		assertEquals(2, hivePartition.getSd().getColsSize());
@@ -344,7 +340,7 @@ public class HiveDialectTest {
 	@Test
 	public void testAlterPartition() throws Exception {
 		tableEnv.executeSql("create table tbl (x tinyint,y string) partitioned by (p1 bigint,p2 date)");
-		// TODO: use DDL to add partitions once we support it
+		tableEnv.executeSql("alter table tbl add partition (p1=1000,p2='2020-05-01') partition (p1=2000,p2='2020-01-01')");
 		CatalogPartitionSpec spec1 = new CatalogPartitionSpec(new LinkedHashMap<String, String>() {{
 			put("p1", "1000");
 			put("p2", "2020-05-01");
@@ -354,8 +350,6 @@ public class HiveDialectTest {
 			put("p2", "2020-01-01");
 		}});
 		ObjectPath tablePath = new ObjectPath("default", "tbl");
-		hiveCatalog.createPartition(tablePath, spec1, new CatalogPartitionImpl(Collections.emptyMap(), null), false);
-		hiveCatalog.createPartition(tablePath, spec2, new CatalogPartitionImpl(Collections.emptyMap(), null), false);
 
 		Table hiveTable = hiveCatalog.getHiveTable(tablePath);
 
@@ -438,6 +432,29 @@ public class HiveDialectTest {
 		assertEquals(DEFAULT_BUILTIN_DATABASE, databases.get(0).toString());
 	}
 
+	@Test
+	public void testAddDropPartitions() throws Exception {
+		tableEnv.executeSql("create table tbl (x int,y binary) partitioned by (dt date,country string)");
+		tableEnv.executeSql("alter table tbl add partition (dt='2020-04-30',country='china') partition (dt='2020-04-30',country='us')");
+
+		ObjectPath tablePath = new ObjectPath("default", "tbl");
+		assertEquals(2, hiveCatalog.listPartitions(tablePath).size());
+
+		String partLocation = warehouse + "/part3_location";
+		tableEnv.executeSql(String.format(
+				"alter table tbl add partition (dt='2020-05-01',country='belgium') location '%s'", partLocation));
+		Table hiveTable = hiveCatalog.getHiveTable(tablePath);
+		CatalogPartitionSpec spec = new CatalogPartitionSpec(new LinkedHashMap<String, String>() {{
+			put("dt", "2020-05-01");
+			put("country", "belgium");
+		}});
+		Partition hivePartition = hiveCatalog.getHivePartition(hiveTable, spec);
+		assertEquals(partLocation, locationPath(hivePartition.getSd().getLocation()));
+
+		tableEnv.executeSql("alter table tbl drop partition (dt='2020-04-30',country='china'),partition (dt='2020-05-01',country='belgium')");
+		assertEquals(1, hiveCatalog.listPartitions(tablePath).size());
+	}
+
 	private static String locationPath(String locationURI) throws URISyntaxException {
 		return new URI(locationURI).getPath();
 	}
diff --git a/flink-table/flink-sql-parser-hive/src/main/codegen/data/Parser.tdd b/flink-table/flink-sql-parser-hive/src/main/codegen/data/Parser.tdd
index 0657f64..e000d34 100644
--- a/flink-table/flink-sql-parser-hive/src/main/codegen/data/Parser.tdd
+++ b/flink-table/flink-sql-parser-hive/src/main/codegen/data/Parser.tdd
@@ -24,6 +24,7 @@
   # Please keep the import classes in alphabetical order if new class is added.
   imports: [
     "org.apache.flink.sql.parser.hive.ddl.HiveDDLUtils"
+    "org.apache.flink.sql.parser.hive.ddl.SqlAddHivePartitions"
     "org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveDatabaseLocation"
     "org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveDatabaseOwner"
     "org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveDatabaseProps"
@@ -65,6 +66,7 @@
     "org.apache.flink.sql.parser.ddl.SqlCreateView"
     "org.apache.flink.sql.parser.ddl.SqlDropDatabase"
     "org.apache.flink.sql.parser.ddl.SqlDropFunction"
+    "org.apache.flink.sql.parser.ddl.SqlDropPartitions"
     "org.apache.flink.sql.parser.ddl.SqlDropTable"
     "org.apache.flink.sql.parser.ddl.SqlDropView"
     "org.apache.flink.sql.parser.ddl.SqlTableColumn"
diff --git a/flink-table/flink-sql-parser-hive/src/main/codegen/includes/parserImpls.ftl b/flink-table/flink-sql-parser-hive/src/main/codegen/includes/parserImpls.ftl
index 4aba880..2ffc233 100644
--- a/flink-table/flink-sql-parser-hive/src/main/codegen/includes/parserImpls.ftl
+++ b/flink-table/flink-sql-parser-hive/src/main/codegen/includes/parserImpls.ftl
@@ -1136,12 +1136,32 @@ SqlAlterTable SqlAlterTable() :
             }
         )
     |
-        <ADD> <COLUMNS>
+        <ADD>
+        (
+            <COLUMNS>
+            {
+                EnsureAlterTableOnly(partitionSpec, "Add columns");
+                return SqlAlterHiveTableAddReplaceColumn(startPos, tableIdentifier, false);
+            }
+        |
+            [ <IF> <NOT> <EXISTS> { ifNotExists = true; } ]
+            {
+                EnsureAlterTableOnly(partitionSpec, "Add partitions");
+                return SqlAddHivePartitions(startPos, tableIdentifier, ifNotExists);
+            }
+        )
         {
             EnsureAlterTableOnly(partitionSpec, "Add columns");
             return SqlAlterHiveTableAddReplaceColumn(startPos, tableIdentifier, false);
         }
     |
+        <DROP>
+        [ <IF> <EXISTS> { ifExists = true; } ]
+        {
+            EnsureAlterTableOnly(partitionSpec, "Drop partitions");
+            return SqlDropPartitions(startPos, tableIdentifier, ifExists);
+        }
+    |
         <REPLACE> <COLUMNS>
         {
             EnsureAlterTableOnly(partitionSpec, "Replace columns");
@@ -1368,3 +1388,60 @@ SqlAlterView SqlAlterView() :
       }
   )
 }
+
+/**
+ * Hive syntax:
+ *
+ * ALTER TABLE table_name ADD [IF NOT EXISTS]
+ *     PARTITION partition_spec [LOCATION 'location'][PARTITION partition_spec [LOCATION 'location']][...];
+ */
+SqlAlterTable SqlAddHivePartitions(SqlParserPos startPos, SqlIdentifier tableIdentifier, boolean ifNotExists) :
+{
+  List<SqlNodeList> partSpecs = new ArrayList();
+  List<SqlCharStringLiteral> partLocations = new ArrayList();
+  SqlNodeList partSpec;
+  SqlCharStringLiteral partLocation;
+}
+{
+  (
+    <PARTITION>
+    {
+      partSpec = new SqlNodeList(getPos());
+      partLocation = null;
+      PartitionSpecCommaList(new SqlNodeList(getPos()), partSpec);
+    }
+    [ <LOCATION> <QUOTED_STRING> { partLocation = createStringLiteral(token.image, getPos()); } ]
+    { partSpecs.add(partSpec); partLocations.add(partLocation); }
+  )+
+  { return new SqlAddHivePartitions(startPos.plus(getPos()), tableIdentifier, ifNotExists, partSpecs, partLocations); }
+}
+
+/**
+ * Hive syntax:
+ *
+ * ALTER TABLE table_name DROP [IF EXISTS] PARTITION partition_spec[, PARTITION partition_spec, ...]
+ *    [IGNORE PROTECTION] [PURGE];
+ */
+SqlAlterTable SqlDropPartitions(SqlParserPos startPos, SqlIdentifier tableIdentifier, boolean ifExists) :
+{
+  List<SqlNodeList> partSpecs = new ArrayList();
+  SqlNodeList partSpec;
+}
+{
+  <PARTITION>
+  {
+    partSpec = new SqlNodeList(getPos());
+    PartitionSpecCommaList(new SqlNodeList(getPos()), partSpec);
+    partSpecs.add(partSpec);
+  }
+  (
+    <COMMA>
+    <PARTITION>
+    {
+      partSpec = new SqlNodeList(getPos());
+      PartitionSpecCommaList(new SqlNodeList(getPos()), partSpec);
+      partSpecs.add(partSpec);
+    }
+  )*
+  { return new SqlDropPartitions(startPos.plus(getPos()), tableIdentifier, ifExists, partSpecs); }
+}
diff --git a/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlAddHivePartitions.java b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlAddHivePartitions.java
new file mode 100644
index 0000000..d003f6e
--- /dev/null
+++ b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlAddHivePartitions.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.hive.ddl;
+
+import org.apache.flink.sql.parser.ddl.SqlAddPartitions;
+
+import org.apache.calcite.sql.SqlCharStringLiteral;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Add partitions to a Hive table.
+ *
+ * <p>Hive syntax:
+ * ALTER TABLE table_name ADD [IF NOT EXISTS]
+ *     PARTITION partition_spec [LOCATION 'location'][PARTITION partition_spec [LOCATION 'location']][...];
+ */
+public class SqlAddHivePartitions extends SqlAddPartitions {
+
+	private final List<SqlCharStringLiteral> partLocations;
+
+	public SqlAddHivePartitions(SqlParserPos pos, SqlIdentifier tableName, boolean ifNotExists,
+			List<SqlNodeList> partSpecs, List<SqlCharStringLiteral> partLocations) {
+		super(pos, tableName, ifNotExists, partSpecs, toProps(partLocations));
+		this.partLocations = partLocations;
+	}
+
+	private static List<SqlNodeList> toProps(List<SqlCharStringLiteral> partLocations) {
+		List<SqlNodeList> res = new ArrayList<>(partLocations.size());
+		for (SqlCharStringLiteral partLocation : partLocations) {
+			SqlNodeList prop = null;
+			if (partLocation != null) {
+				prop = new SqlNodeList(partLocation.getParserPosition());
+				prop.add(HiveDDLUtils.toTableOption(SqlCreateHiveTable.TABLE_LOCATION_URI, partLocation, partLocation.getParserPosition()));
+			}
+			res.add(prop);
+		}
+		return res;
+	}
+
+	@Override
+	public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+		writer.keyword("ALTER TABLE");
+		tableIdentifier.unparse(writer, leftPrec, rightPrec);
+		writer.newlineAndIndent();
+		writer.keyword("ADD");
+		if (ifNotExists()) {
+			writer.keyword("IF NOT EXISTS");
+		}
+		int opLeftPrec = getOperator().getLeftPrec();
+		int opRightPrec = getOperator().getRightPrec();
+		for (int i = 0; i < getPartSpecs().size(); i++) {
+			writer.newlineAndIndent();
+			SqlNodeList partSpec = getPartSpecs().get(i);
+			writer.keyword("PARTITION");
+			partSpec.unparse(writer, opLeftPrec, opRightPrec);
+			SqlCharStringLiteral location = partLocations.get(i);
+			if (location != null) {
+				writer.keyword("LOCATION");
+				location.unparse(writer, opLeftPrec, opRightPrec);
+			}
+		}
+	}
+}
diff --git a/flink-table/flink-sql-parser-hive/src/test/java/org/apache/flink/sql/parser/hive/FlinkHiveSqlParserImplTest.java b/flink-table/flink-sql-parser-hive/src/test/java/org/apache/flink/sql/parser/hive/FlinkHiveSqlParserImplTest.java
index 5e5c443..7130713 100644
--- a/flink-table/flink-sql-parser-hive/src/test/java/org/apache/flink/sql/parser/hive/FlinkHiveSqlParserImplTest.java
+++ b/flink-table/flink-sql-parser-hive/src/test/java/org/apache/flink/sql/parser/hive/FlinkHiveSqlParserImplTest.java
@@ -383,4 +383,31 @@ public class FlinkHiveSqlParserImplTest extends SqlParserTest {
 						"SELECT `C1`, `C2`\n" +
 						"FROM `TBL`");
 	}
+
+	@Test
+	public void testAddPartition() {
+		sql("alter table tbl add partition (p1=1,p2='a') location '/part1/location'")
+				.ok("ALTER TABLE `TBL`\n" +
+						"ADD\n" +
+						"PARTITION (`P1` = 1, `P2` = 'a') LOCATION '/part1/location'");
+		sql("alter table tbl add if not exists partition (p=1) partition (p=2) location '/part2/location'")
+				.ok("ALTER TABLE `TBL`\n" +
+						"ADD IF NOT EXISTS\n" +
+						"PARTITION (`P` = 1)\n" +
+						"PARTITION (`P` = 2) LOCATION '/part2/location'");
+	}
+
+	@Test
+	public void testDropPartition() {
+		sql("alter table tbl drop if exists partition (p=1)")
+				.ok("ALTER TABLE `TBL`\n" +
+						"DROP IF EXISTS\n" +
+						"PARTITION (`P` = 1)");
+		sql("alter table tbl drop partition (p1='a',p2=1), partition(p1='b',p2=2)")
+				.ok("ALTER TABLE `TBL`\n" +
+						"DROP\n" +
+						"PARTITION (`P1` = 'a', `P2` = 1)\n" +
+						"PARTITION (`P1` = 'b', `P2` = 2)");
+		// TODO: support IGNORE PROTECTION, PURGE
+	}
 }
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAddPartitions.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAddPartitions.java
new file mode 100644
index 0000000..7aa71ab
--- /dev/null
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAddPartitions.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.ddl;
+
+import org.apache.flink.sql.parser.SqlPartitionUtils;
+
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+
+/**
+ * ALTER TABLE DDL to add partitions to a table.
+ */
+public class SqlAddPartitions extends SqlAlterTable {
+
+	private final boolean ifNotExists;
+	private final List<SqlNodeList> partSpecs;
+	private final List<SqlNodeList> partProps;
+
+	public SqlAddPartitions(SqlParserPos pos, SqlIdentifier tableName, boolean ifNotExists,
+			List<SqlNodeList> partSpecs, List<SqlNodeList> partProps) {
+		super(pos, tableName);
+		this.ifNotExists = ifNotExists;
+		this.partSpecs = partSpecs;
+		this.partProps = partProps;
+	}
+
+	public boolean ifNotExists() {
+		return ifNotExists;
+	}
+
+	public List<SqlNodeList> getPartSpecs() {
+		return partSpecs;
+	}
+
+	public LinkedHashMap<String, String> getPartitionKVs(int i) {
+		return SqlPartitionUtils.getPartitionKVs(getPartSpecs().get(i));
+	}
+
+	public List<SqlNodeList> getPartProps() {
+		return partProps;
+	}
+
+	@Nonnull
+	@Override
+	public List<SqlNode> getOperandList() {
+		List<SqlNode> operands = new ArrayList<>();
+		operands.add(tableIdentifier);
+		operands.addAll(partSpecs);
+		operands.addAll(partProps);
+		return operands;
+	}
+
+	@Override
+	public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+		super.unparse(writer, leftPrec, rightPrec);
+		writer.newlineAndIndent();
+		writer.keyword("ADD");
+		if (ifNotExists) {
+			writer.keyword("IF NOT EXISTS");
+		}
+		int opLeftPrec = getOperator().getLeftPrec();
+		int opRightPrec = getOperator().getRightPrec();
+		for (int i = 0; i < partSpecs.size(); i++) {
+			writer.newlineAndIndent();
+			SqlNodeList partSpec = partSpecs.get(i);
+			SqlNodeList partProp = partProps.get(i);
+			writer.keyword("PARTITION");
+			partSpec.unparse(writer, opLeftPrec, opRightPrec);
+			if (partProp != null) {
+				writer.keyword("WITH");
+				partProp.unparse(writer, opLeftPrec, opRightPrec);
+			}
+		}
+	}
+}
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDropPartitions.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDropPartitions.java
new file mode 100644
index 0000000..16ed67c
--- /dev/null
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDropPartitions.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.ddl;
+
+import org.apache.flink.sql.parser.SqlPartitionUtils;
+
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+
+/**
+ * ALTER TABLE DDL to drop partitions of a table.
+ */
+public class SqlDropPartitions extends SqlAlterTable {
+
+	private final boolean ifExists;
+	private final List<SqlNodeList> partSpecs;
+
+	public SqlDropPartitions(SqlParserPos pos, SqlIdentifier tableName, boolean ifExists,
+			List<SqlNodeList> partSpecs) {
+		super(pos, tableName);
+		this.ifExists = ifExists;
+		this.partSpecs = partSpecs;
+	}
+
+	public boolean ifExists() {
+		return ifExists;
+	}
+
+	public List<SqlNodeList> getPartSpecs() {
+		return partSpecs;
+	}
+
+	public LinkedHashMap<String, String> getPartitionKVs(int i) {
+		return SqlPartitionUtils.getPartitionKVs(getPartSpecs().get(i));
+	}
+
+	@Override
+	public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+		super.unparse(writer, leftPrec, rightPrec);
+		writer.newlineAndIndent();
+		writer.keyword("DROP");
+		if (ifExists) {
+			writer.keyword("IF EXISTS");
+		}
+		int opLeftPrec = getOperator().getLeftPrec();
+		int opRightPrec = getOperator().getRightPrec();
+		for (SqlNodeList partSpec : partSpecs) {
+			writer.newlineAndIndent();
+			writer.keyword("PARTITION");
+			partSpec.unparse(writer, opLeftPrec, opRightPrec);
+		}
+	}
+
+	@Nonnull
+	@Override
+	public List<SqlNode> getOperandList() {
+		List<SqlNode> operands = new ArrayList<>();
+		operands.add(tableIdentifier);
+		operands.addAll(partSpecs);
+		return operands;
+	}
+}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index 6a481928..3b4db5a 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -42,12 +42,15 @@ import org.apache.flink.table.catalog.Catalog;
 import org.apache.flink.table.catalog.CatalogBaseTable;
 import org.apache.flink.table.catalog.CatalogFunction;
 import org.apache.flink.table.catalog.CatalogManager;
+import org.apache.flink.table.catalog.CatalogPartition;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.CatalogTableImpl;
 import org.apache.flink.table.catalog.ConnectorCatalogTable;
 import org.apache.flink.table.catalog.FunctionCatalog;
 import org.apache.flink.table.catalog.GenericInMemoryCatalog;
 import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.QueryOperationCatalogView;
 import org.apache.flink.table.catalog.UnresolvedIdentifier;
 import org.apache.flink.table.catalog.exceptions.CatalogException;
@@ -90,6 +93,7 @@ import org.apache.flink.table.operations.TableSourceQueryOperation;
 import org.apache.flink.table.operations.UnregisteredSinkModifyOperation;
 import org.apache.flink.table.operations.UseCatalogOperation;
 import org.apache.flink.table.operations.UseDatabaseOperation;
+import org.apache.flink.table.operations.ddl.AddPartitionsOperation;
 import org.apache.flink.table.operations.ddl.AlterCatalogFunctionOperation;
 import org.apache.flink.table.operations.ddl.AlterDatabaseOperation;
 import org.apache.flink.table.operations.ddl.AlterPartitionPropertiesOperation;
@@ -111,6 +115,7 @@ import org.apache.flink.table.operations.ddl.CreateTempSystemFunctionOperation;
 import org.apache.flink.table.operations.ddl.CreateViewOperation;
 import org.apache.flink.table.operations.ddl.DropCatalogFunctionOperation;
 import org.apache.flink.table.operations.ddl.DropDatabaseOperation;
+import org.apache.flink.table.operations.ddl.DropPartitionsOperation;
 import org.apache.flink.table.operations.ddl.DropTableOperation;
 import org.apache.flink.table.operations.ddl.DropTempSystemFunctionOperation;
 import org.apache.flink.table.operations.ddl.DropViewOperation;
@@ -838,6 +843,22 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
 					catalog.alterTable(alterTableSchemaOperation.getTableIdentifier().toObjectPath(),
 							alterTableSchemaOperation.getCatalogTable(),
 							false);
+				} else if (alterTableOperation instanceof AddPartitionsOperation) {
+					AddPartitionsOperation addPartitionsOperation = (AddPartitionsOperation) alterTableOperation;
+					List<CatalogPartitionSpec> specs = addPartitionsOperation.getPartitionSpecs();
+					List<CatalogPartition> partitions = addPartitionsOperation.getCatalogPartitions();
+					boolean ifNotExists = addPartitionsOperation.ifNotExists();
+					ObjectPath tablePath = addPartitionsOperation.getTableIdentifier().toObjectPath();
+					for (int i = 0; i < specs.size(); i++) {
+						catalog.createPartition(tablePath, specs.get(i), partitions.get(i), ifNotExists);
+					}
+				} else if (alterTableOperation instanceof DropPartitionsOperation) {
+					DropPartitionsOperation dropPartitionsOperation = (DropPartitionsOperation) alterTableOperation;
+					ObjectPath tablePath = dropPartitionsOperation.getTableIdentifier().toObjectPath();
+					boolean ifExists = dropPartitionsOperation.ifExists();
+					for (CatalogPartitionSpec spec : dropPartitionsOperation.getPartitionSpecs()) {
+						catalog.dropPartition(tablePath, spec, ifExists);
+					}
 				}
 				return TableResultImpl.TABLE_RESULT_OK;
 			} catch (TableAlreadyExistException | TableNotExistException e) {
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AddPartitionsOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AddPartitionsOperation.java
new file mode 100644
index 0000000..1802375
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AddPartitionsOperation.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations.ddl;
+
+import org.apache.flink.table.catalog.CatalogPartition;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.operations.OperationUtils;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Operation to describe ALTER TABLE ADD PARTITION statement.
+ */
+public class AddPartitionsOperation extends AlterTableOperation {
+
+	private final boolean ifNotExists;
+	private final List<CatalogPartitionSpec> partitionSpecs;
+	private final List<CatalogPartition> catalogPartitions;
+
+	public AddPartitionsOperation(ObjectIdentifier tableIdentifier, boolean ifNotExists,
+			List<CatalogPartitionSpec> partitionSpecs, List<CatalogPartition> catalogPartitions) {
+		super(tableIdentifier);
+		this.ifNotExists = ifNotExists;
+		this.partitionSpecs = partitionSpecs;
+		this.catalogPartitions = catalogPartitions;
+	}
+
+	public List<CatalogPartitionSpec> getPartitionSpecs() {
+		return partitionSpecs;
+	}
+
+	public List<CatalogPartition> getCatalogPartitions() {
+		return catalogPartitions;
+	}
+
+	public boolean ifNotExists() {
+		return ifNotExists;
+	}
+
+	@Override
+	public String asSummaryString() {
+		StringBuilder builder = new StringBuilder(String.format("ALTER TABLE %s ADD", tableIdentifier.asSummaryString()));
+		if (ifNotExists) {
+			builder.append(" IF NOT EXISTS");
+		}
+		for (int i = 0; i < partitionSpecs.size(); i++) {
+			String spec = OperationUtils.formatPartitionSpec(partitionSpecs.get(i));
+			builder.append(String.format(" PARTITION (%s)", spec));
+			Map<String, String> properties = catalogPartitions.get(i).getProperties();
+			if (!properties.isEmpty()) {
+				builder.append(String.format(" WITH (%s)", OperationUtils.formatProperties(properties)));
+			}
+		}
+		return builder.toString();
+	}
+}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/DropPartitionsOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/DropPartitionsOperation.java
new file mode 100644
index 0000000..f3fc4f3
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/DropPartitionsOperation.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations.ddl;
+
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.operations.OperationUtils;
+
+import java.util.List;
+
+/**
+ * Operation to describe ALTER TABLE DROP PARTITION statement.
+ */
+public class DropPartitionsOperation extends AlterTableOperation {
+
+	private final boolean ifExists;
+	private final List<CatalogPartitionSpec> partitionSpecs;
+
+	public DropPartitionsOperation(ObjectIdentifier tableIdentifier, boolean ifExists, List<CatalogPartitionSpec> partitionSpecs) {
+		super(tableIdentifier);
+		this.ifExists = ifExists;
+		this.partitionSpecs = partitionSpecs;
+	}
+
+	public boolean ifExists() {
+		return ifExists;
+	}
+
+	public List<CatalogPartitionSpec> getPartitionSpecs() {
+		return partitionSpecs;
+	}
+
+	@Override
+	public String asSummaryString() {
+		StringBuilder builder = new StringBuilder(String.format("ALTER TABLE %s DROP", tableIdentifier.asSummaryString()));
+		if (ifExists) {
+			builder.append(" IF EXISTS");
+		}
+		for (CatalogPartitionSpec spec : partitionSpecs) {
+			builder.append(String.format(" PARTITION (%s)", OperationUtils.formatPartitionSpec(spec)));
+		}
+		return builder.toString();
+	}
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
index b496ddf..615ad60 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.planner.operations;
 
+import org.apache.flink.sql.parser.ddl.SqlAddPartitions;
 import org.apache.flink.sql.parser.ddl.SqlAddReplaceColumns;
 import org.apache.flink.sql.parser.ddl.SqlAlterDatabase;
 import org.apache.flink.sql.parser.ddl.SqlAlterFunction;
@@ -38,6 +39,7 @@ import org.apache.flink.sql.parser.ddl.SqlCreateTable;
 import org.apache.flink.sql.parser.ddl.SqlCreateView;
 import org.apache.flink.sql.parser.ddl.SqlDropDatabase;
 import org.apache.flink.sql.parser.ddl.SqlDropFunction;
+import org.apache.flink.sql.parser.ddl.SqlDropPartitions;
 import org.apache.flink.sql.parser.ddl.SqlDropTable;
 import org.apache.flink.sql.parser.ddl.SqlDropView;
 import org.apache.flink.sql.parser.ddl.SqlTableOption;
@@ -84,6 +86,7 @@ import org.apache.flink.table.operations.ShowTablesOperation;
 import org.apache.flink.table.operations.ShowViewsOperation;
 import org.apache.flink.table.operations.UseCatalogOperation;
 import org.apache.flink.table.operations.UseDatabaseOperation;
+import org.apache.flink.table.operations.ddl.AddPartitionsOperation;
 import org.apache.flink.table.operations.ddl.AlterCatalogFunctionOperation;
 import org.apache.flink.table.operations.ddl.AlterDatabaseOperation;
 import org.apache.flink.table.operations.ddl.AlterPartitionPropertiesOperation;
@@ -101,6 +104,7 @@ import org.apache.flink.table.operations.ddl.CreateTempSystemFunctionOperation;
 import org.apache.flink.table.operations.ddl.CreateViewOperation;
 import org.apache.flink.table.operations.ddl.DropCatalogFunctionOperation;
 import org.apache.flink.table.operations.ddl.DropDatabaseOperation;
+import org.apache.flink.table.operations.ddl.DropPartitionsOperation;
 import org.apache.flink.table.operations.ddl.DropTableOperation;
 import org.apache.flink.table.operations.ddl.DropTempSystemFunctionOperation;
 import org.apache.flink.table.operations.ddl.DropViewOperation;
@@ -126,6 +130,7 @@ import org.apache.calcite.sql.SqlUtil;
 import org.apache.calcite.sql.dialect.CalciteSqlDialect;
 import org.apache.calcite.sql.parser.SqlParser;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
@@ -363,6 +368,23 @@ public class SqlToOperationConverter {
 					(SqlChangeColumn) sqlAlterTable,
 					(CatalogTable) baseTable,
 					flinkPlanner.getOrCreateSqlValidator());
+		} else if (sqlAlterTable instanceof SqlAddPartitions) {
+			List<CatalogPartitionSpec> specs = new ArrayList<>();
+			List<CatalogPartition> partitions = new ArrayList<>();
+			SqlAddPartitions addPartitions = (SqlAddPartitions) sqlAlterTable;
+			for (int i = 0; i < addPartitions.getPartSpecs().size(); i++) {
+				specs.add(new CatalogPartitionSpec(addPartitions.getPartitionKVs(i)));
+				Map<String, String> props = OperationConverterUtils.extractProperties(addPartitions.getPartProps().get(i));
+				partitions.add(new CatalogPartitionImpl(props, null));
+			}
+			return new AddPartitionsOperation(tableIdentifier, addPartitions.ifNotExists(), specs, partitions);
+		} else if (sqlAlterTable instanceof SqlDropPartitions) {
+			SqlDropPartitions dropPartitions = (SqlDropPartitions) sqlAlterTable;
+			List<CatalogPartitionSpec> specs = new ArrayList<>();
+			for (int i = 0; i < dropPartitions.getPartSpecs().size(); i++) {
+				specs.add(new CatalogPartitionSpec(dropPartitions.getPartitionKVs(i)));
+			}
+			return new DropPartitionsOperation(tableIdentifier, dropPartitions.ifExists(), specs);
 		} else {
 			throw new ValidationException(
 					String.format("[%s] needs to implement",