You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2020/05/17 02:11:19 UTC

[flink] branch master updated: [FLINK-17448][sql-parser-hive] Implement alter DDL for Hive dialect

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 2bbf8ed  [FLINK-17448][sql-parser-hive] Implement alter DDL for Hive dialect
2bbf8ed is described below

commit 2bbf8ed32a7fc29dd5f0c941f451bd3a7a6d0d1b
Author: Rui Li <li...@apache.org>
AuthorDate: Sun May 17 10:10:27 2020 +0800

    [FLINK-17448][sql-parser-hive] Implement alter DDL for Hive dialect
    
    
    This closes #12108
---
 .../flink/table/catalog/hive/HiveCatalog.java      | 210 ++++++++++-----------
 .../table/catalog/hive/util/HiveTableUtil.java     | 125 ++++++++++++
 .../flink/connectors/hive/HiveDialectTest.java     | 173 ++++++++++++++++-
 .../catalog/hive/HiveCatalogHiveMetadataTest.java  |  30 +++
 .../flink/table/catalog/hive/HiveCatalogTest.java  |   5 +-
 .../src/main/codegen/data/Parser.tdd               |   8 +
 .../src/main/codegen/includes/parserImpls.ftl      | 178 ++++++++++++++++-
 .../flink/sql/parser/hive/ddl/HiveDDLUtils.java    |  21 ++-
 .../hive/ddl/SqlAlterHivePartitionRename.java}     |  52 +++--
 .../sql/parser/hive/ddl/SqlAlterHiveTable.java     |  64 +++++++
 .../ddl/SqlAlterHiveTableAddReplaceColumn.java     |  81 ++++++++
 .../hive/ddl/SqlAlterHiveTableChangeColumn.java    |  78 ++++++++
 .../hive/ddl/SqlAlterHiveTableFileFormat.java      |  55 ++++++
 .../parser/hive/ddl/SqlAlterHiveTableLocation.java |  56 ++++++
 .../parser/hive/ddl/SqlAlterHiveTableProps.java}   |  48 ++---
 .../parser/hive/ddl/SqlAlterHiveTableSerDe.java    |  93 +++++++++
 .../parser/hive/FlinkHiveSqlParserImplTest.java    |  74 ++++++++
 .../apache/flink/sql/parser/SqlPartitionUtils.java |  61 ++++++
 ...leProperties.java => SqlAddReplaceColumns.java} |  67 ++++---
 .../apache/flink/sql/parser/ddl/SqlAlterTable.java |  37 +++-
 .../sql/parser/ddl/SqlAlterTableProperties.java    |   8 +-
 .../flink/sql/parser/ddl/SqlChangeColumn.java      | 109 +++++++++++
 .../table/api/internal/TableEnvironmentImpl.java   |  13 ++
 .../apache/flink/table/catalog/CatalogManager.java |  20 ++
 .../flink/table/operations/OperationUtils.java     |  12 ++
 .../operations/ddl/AlterPartitionOperation.java    |  39 ++++
 .../ddl/AlterPartitionPropertiesOperation.java     |  49 +++++
 .../operations/ddl/AlterTableSchemaOperation.java  |  46 +++++
 .../operations/SqlToOperationConverter.java        | 138 ++++++++------
 .../planner/utils/OperationConverterUtils.java     | 193 +++++++++++++++++++
 30 files changed, 1870 insertions(+), 273 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
index 815fe41..5bc469f 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
@@ -20,10 +20,11 @@ package org.apache.flink.table.catalog.hive;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.connectors.hive.HiveTableFactory;
 import org.apache.flink.sql.parser.hive.ddl.HiveDDLUtils;
+import org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveTable.AlterTableOp;
 import org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveDatabase;
+import org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.constraints.UniqueConstraint;
 import org.apache.flink.table.catalog.AbstractCatalog;
@@ -37,7 +38,6 @@ import org.apache.flink.table.catalog.CatalogPartitionImpl;
 import org.apache.flink.table.catalog.CatalogPartitionSpec;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.CatalogTableImpl;
-import org.apache.flink.table.catalog.CatalogView;
 import org.apache.flink.table.catalog.CatalogViewImpl;
 import org.apache.flink.table.catalog.FunctionLanguage;
 import org.apache.flink.table.catalog.ObjectPath;
@@ -113,6 +113,9 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import static org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveTable.ALTER_COL_CASCADE;
+import static org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveTable.ALTER_TABLE_OP;
+import static org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.HiveTableStoredAs.STORED_AS_FILE_FORMAT;
 import static org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.NOT_NULL_COLS;
 import static org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.NOT_NULL_CONSTRAINT_TRAITS;
 import static org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.PK_CONSTRAINT_TRAIT;
@@ -374,7 +377,7 @@ public class HiveCatalog extends AbstractCatalog {
 			throw new DatabaseNotExistException(getName(), tablePath.getDatabaseName());
 		}
 
-		Table hiveTable = instantiateHiveTable(tablePath, table, hiveConf);
+		Table hiveTable = HiveTableUtil.instantiateHiveTable(tablePath, table, hiveConf);
 
 		UniqueConstraint pkConstraint = null;
 		List<String> notNullCols = new ArrayList<>();
@@ -484,18 +487,30 @@ public class HiveCatalog extends AbstractCatalog {
 					existingTable.getClass().getName(), newCatalogTable.getClass().getName()));
 		}
 
-		Table newTable = instantiateHiveTable(tablePath, newCatalogTable, hiveConf);
-
-		// client.alter_table() requires a valid location
-		// thus, if new table doesn't have that, it reuses location of the old table
-		if (!newTable.getSd().isSetLocation()) {
-			newTable.getSd().setLocation(hiveTable.getSd().getLocation());
+		boolean isGeneric = isGenericForGet(hiveTable.getParameters());
+		if (isGeneric) {
+			hiveTable = HiveTableUtil.alterTableViaCatalogBaseTable(tablePath, newCatalogTable, hiveTable, hiveConf);
+		} else {
+			AlterTableOp op = HiveTableUtil.extractAlterTableOp(newCatalogTable.getOptions());
+			if (op == null) {
+				// the alter operation isn't encoded as properties
+				hiveTable = HiveTableUtil.alterTableViaCatalogBaseTable(tablePath, newCatalogTable, hiveTable, hiveConf);
+			} else {
+				alterTableViaProperties(
+						op,
+						hiveTable,
+						(CatalogTable) newCatalogTable,
+						hiveTable.getParameters(),
+						newCatalogTable.getProperties(),
+						hiveTable.getSd());
+			}
 		}
 
+		disallowChangeIsGeneric(isGeneric, isGenericForGet(hiveTable.getParameters()));
 		try {
-			client.alter_table(tablePath.getDatabaseName(), tablePath.getObjectName(), newTable);
+			client.alter_table(tablePath.getDatabaseName(), tablePath.getObjectName(), hiveTable);
 		} catch (TException e) {
-			throw new CatalogException(String.format("Failed to rename table %s", tablePath.getFullName()), e);
+			throw new CatalogException(String.format("Failed to alter table %s", tablePath.getFullName()), e);
 		}
 	}
 
@@ -638,78 +653,6 @@ public class HiveCatalog extends AbstractCatalog {
 		}
 	}
 
-	@VisibleForTesting
-	protected static Table instantiateHiveTable(ObjectPath tablePath, CatalogBaseTable table, HiveConf hiveConf) {
-		if (!(table instanceof CatalogTableImpl) && !(table instanceof CatalogViewImpl)) {
-			throw new CatalogException(
-					"HiveCatalog only supports CatalogTableImpl and CatalogViewImpl");
-		}
-		// let Hive set default parameters for us, e.g. serialization.format
-		Table hiveTable = org.apache.hadoop.hive.ql.metadata.Table.getEmptyTable(tablePath.getDatabaseName(),
-			tablePath.getObjectName());
-		hiveTable.setCreateTime((int) (System.currentTimeMillis() / 1000));
-
-		Map<String, String> properties = new HashMap<>(table.getProperties());
-		// Table comment
-		if (table.getComment() != null) {
-			properties.put(HiveCatalogConfig.COMMENT, table.getComment());
-		}
-
-		boolean isGeneric = isGenericForCreate(properties);
-
-		// Hive table's StorageDescriptor
-		StorageDescriptor sd = hiveTable.getSd();
-		HiveTableUtil.setDefaultStorageFormat(sd, hiveConf);
-
-		if (isGeneric) {
-			DescriptorProperties tableSchemaProps = new DescriptorProperties(true);
-			tableSchemaProps.putTableSchema(Schema.SCHEMA, table.getSchema());
-
-			if (table instanceof CatalogTable) {
-				tableSchemaProps.putPartitionKeys(((CatalogTable) table).getPartitionKeys());
-			}
-
-			properties.putAll(tableSchemaProps.asMap());
-			properties = maskFlinkProperties(properties);
-			hiveTable.setParameters(properties);
-		} else {
-			HiveTableUtil.initiateTableFromProperties(hiveTable, properties, hiveConf);
-			List<FieldSchema> allColumns = HiveTableUtil.createHiveColumns(table.getSchema());
-			// Table columns and partition keys
-			if (table instanceof CatalogTableImpl) {
-				CatalogTable catalogTable = (CatalogTableImpl) table;
-
-				if (catalogTable.isPartitioned()) {
-					int partitionKeySize = catalogTable.getPartitionKeys().size();
-					List<FieldSchema> regularColumns = allColumns.subList(0, allColumns.size() - partitionKeySize);
-					List<FieldSchema> partitionColumns = allColumns.subList(allColumns.size() - partitionKeySize, allColumns.size());
-
-					sd.setCols(regularColumns);
-					hiveTable.setPartitionKeys(partitionColumns);
-				} else {
-					sd.setCols(allColumns);
-					hiveTable.setPartitionKeys(new ArrayList<>());
-				}
-			} else {
-				sd.setCols(allColumns);
-			}
-			// Table properties
-			hiveTable.getParameters().putAll(properties);
-		}
-
-		if (table instanceof CatalogViewImpl) {
-			// TODO: [FLINK-12398] Support partitioned view in catalog API
-			hiveTable.setPartitionKeys(new ArrayList<>());
-
-			CatalogView view = (CatalogView) table;
-			hiveTable.setViewOriginalText(view.getOriginalQuery());
-			hiveTable.setViewExpandedText(view.getExpandedQuery());
-			hiveTable.setTableType(TableType.VIRTUAL_VIEW.name());
-		}
-
-		return hiveTable;
-	}
-
 	/**
 	 * Filter out Hive-created properties, and return Flink-created properties.
 	 * Note that 'is_generic' is a special key and this method will leave it as-is.
@@ -720,19 +663,6 @@ public class HiveCatalog extends AbstractCatalog {
 			.collect(Collectors.toMap(e -> e.getKey().replace(FLINK_PROPERTY_PREFIX, ""), e -> e.getValue()));
 	}
 
-	/**
-	 * Add a prefix to Flink-created properties to distinguish them from Hive-created properties.
-	 * Note that 'is_generic' is a special key and this method will leave it as-is.
-	 */
-	private static Map<String, String> maskFlinkProperties(Map<String, String> properties) {
-		return properties.entrySet().stream()
-			.filter(e -> e.getKey() != null && e.getValue() != null)
-			.map(e -> new Tuple2<>(
-				e.getKey().equals(CatalogConfig.IS_GENERIC) ? e.getKey() : FLINK_PROPERTY_PREFIX + e.getKey(),
-				e.getValue()))
-			.collect(Collectors.toMap(t -> t.f0, t -> t.f1));
-	}
-
 	// ------ partitions ------
 
 	@Override
@@ -888,7 +818,7 @@ public class HiveCatalog extends AbstractCatalog {
 
 			Map<String, String> properties = hivePartition.getParameters();
 
-			properties.put(HiveCatalogConfig.PARTITION_LOCATION, hivePartition.getSd().getLocation());
+			properties.put(SqlCreateHiveTable.TABLE_LOCATION_URI, hivePartition.getSd().getLocation());
 
 			String comment = properties.remove(HiveCatalogConfig.COMMENT);
 
@@ -919,21 +849,28 @@ public class HiveCatalog extends AbstractCatalog {
 		try {
 			Table hiveTable = getHiveTable(tablePath);
 			ensureTableAndPartitionMatch(hiveTable, newPartition);
-			Partition oldHivePartition = getHivePartition(hiveTable, partitionSpec);
-			if (oldHivePartition == null) {
+			Partition hivePartition = getHivePartition(hiveTable, partitionSpec);
+			if (hivePartition == null) {
 				if (ignoreIfNotExists) {
 					return;
 				}
 				throw new PartitionNotExistException(getName(), tablePath, partitionSpec);
 			}
-			Partition newHivePartition = instantiateHivePartition(hiveTable, partitionSpec, newPartition);
-			if (newHivePartition.getSd().getLocation() == null) {
-				newHivePartition.getSd().setLocation(oldHivePartition.getSd().getLocation());
+			AlterTableOp op = HiveTableUtil.extractAlterTableOp(newPartition.getProperties());
+			if (op == null) {
+				throw new CatalogException(ALTER_TABLE_OP + " is missing for alter table operation");
 			}
+			alterTableViaProperties(
+					op,
+					null,
+					null,
+					hivePartition.getParameters(),
+					newPartition.getProperties(),
+					hivePartition.getSd());
 			client.alter_partition(
 				tablePath.getDatabaseName(),
 				tablePath.getObjectName(),
-				newHivePartition
+				hivePartition
 			);
 		} catch (NoSuchObjectException e) {
 			if (!ignoreIfNotExists) {
@@ -973,10 +910,13 @@ public class HiveCatalog extends AbstractCatalog {
 		}
 		// TODO: handle GenericCatalogPartition
 		StorageDescriptor sd = hiveTable.getSd().deepCopy();
-		sd.setLocation(catalogPartition.getProperties().remove(HiveCatalogConfig.PARTITION_LOCATION));
+		sd.setLocation(catalogPartition.getProperties().remove(SqlCreateHiveTable.TABLE_LOCATION_URI));
 
 		Map<String, String> properties = new HashMap<>(catalogPartition.getProperties());
-		properties.put(HiveCatalogConfig.COMMENT, catalogPartition.getComment());
+		String comment = catalogPartition.getComment();
+		if (comment != null) {
+			properties.put(HiveCatalogConfig.COMMENT, comment);
+		}
 
 		return HiveTableUtil.createHivePartition(
 				hiveTable.getDbName(),
@@ -1051,7 +991,8 @@ public class HiveCatalog extends AbstractCatalog {
 		return getHivePartition(getHiveTable(tablePath), partitionSpec);
 	}
 
-	private Partition getHivePartition(Table hiveTable, CatalogPartitionSpec partitionSpec)
+	@VisibleForTesting
+	public Partition getHivePartition(Table hiveTable, CatalogPartitionSpec partitionSpec)
 			throws PartitionSpecInvalidException, TException {
 		return client.getPartition(hiveTable.getDbName(), hiveTable.getTableName(),
 			getOrderedFullPartitionValues(partitionSpec, getFieldNames(hiveTable.getPartitionKeys()),
@@ -1234,7 +1175,7 @@ public class HiveCatalog extends AbstractCatalog {
 		);
 	}
 
-	private boolean isTablePartitioned(Table hiveTable) {
+	private static boolean isTablePartitioned(Table hiveTable) {
 		return hiveTable.getPartitionKeysSize() != 0;
 	}
 
@@ -1425,7 +1366,7 @@ public class HiveCatalog extends AbstractCatalog {
 		}
 	}
 
-	static boolean isGenericForCreate(Map<String, String> properties) {
+	public static boolean isGenericForCreate(Map<String, String> properties) {
 		// When creating an object, a hive object needs explicitly have a key is_generic = false
 		// otherwise, this is a generic object if 1) the key is missing 2) is_generic = true
 		// this is opposite to reading an object. See getObjectIsGeneric().
@@ -1443,11 +1384,62 @@ public class HiveCatalog extends AbstractCatalog {
 		return isGeneric;
 	}
 
-	static boolean isGenericForGet(Map<String, String> properties) {
+	public static boolean isGenericForGet(Map<String, String> properties) {
 		// When retrieving an object, a generic object needs explicitly have a key is_generic = true
 		// otherwise, this is a Hive object if 1) the key is missing 2) is_generic = false
 		// this is opposite to creating an object. See createObjectIsGeneric()
 		return properties != null && Boolean.parseBoolean(properties.getOrDefault(CatalogConfig.IS_GENERIC, "false"));
 	}
 
+	public static void disallowChangeIsGeneric(boolean oldIsGeneric, boolean newIsGeneric) {
+		checkArgument(oldIsGeneric == newIsGeneric, "Changing whether a metadata object is generic is not allowed");
+	}
+
+	private void alterTableViaProperties(
+			AlterTableOp alterOp,
+			Table hiveTable,
+			CatalogTable catalogTable,
+			Map<String, String> oldProps,
+			Map<String, String> newProps,
+			StorageDescriptor sd) {
+		switch (alterOp) {
+			case CHANGE_TBL_PROPS:
+				oldProps.putAll(newProps);
+				break;
+			case CHANGE_LOCATION:
+				HiveTableUtil.extractLocation(sd, newProps);
+				break;
+			case CHANGE_FILE_FORMAT:
+				String newFileFormat = newProps.remove(STORED_AS_FILE_FORMAT);
+				HiveTableUtil.setStorageFormat(sd, newFileFormat, hiveConf);
+				break;
+			case CHANGE_SERDE_PROPS:
+				HiveTableUtil.extractRowFormat(sd, newProps);
+				break;
+			case ALTER_COLUMNS:
+				if (hiveTable == null) {
+					throw new CatalogException("ALTER COLUMNS cannot be done with ALTER PARTITION");
+				}
+
+				HiveTableUtil.alterColumns(hiveTable.getSd(), catalogTable);
+				boolean cascade = Boolean.parseBoolean(newProps.remove(ALTER_COL_CASCADE));
+				if (cascade) {
+					if (!isTablePartitioned(hiveTable)) {
+						throw new CatalogException("ALTER COLUMNS CASCADE for non-partitioned table");
+					}
+					try {
+						for (CatalogPartitionSpec spec : listPartitions(new ObjectPath(hiveTable.getDbName(), hiveTable.getTableName()))) {
+							Partition partition = getHivePartition(hiveTable, spec);
+							HiveTableUtil.alterColumns(partition.getSd(), catalogTable);
+							client.alter_partition(hiveTable.getDbName(), hiveTable.getTableName(), partition);
+						}
+					} catch (Exception e) {
+						throw new CatalogException("Failed to cascade add/replace columns to partitions", e);
+					}
+				}
+				break;
+			default:
+				throw new CatalogException("Unsupported alter table operation " + alterOp);
+		}
+	}
 }
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTableUtil.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTableUtil.java
index 656a93a..a390fbd 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTableUtil.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTableUtil.java
@@ -18,10 +18,24 @@
 
 package org.apache.flink.table.catalog.hive.util;
 
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveTable;
 import org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.HiveTableRowFormat;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.constraints.UniqueConstraint;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogTableImpl;
+import org.apache.flink.table.catalog.CatalogView;
+import org.apache.flink.table.catalog.CatalogViewImpl;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.config.CatalogConfig;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.hive.HiveCatalog;
+import org.apache.flink.table.catalog.hive.HiveCatalogConfig;
 import org.apache.flink.table.catalog.hive.client.HiveShim;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.descriptors.Schema;
 import org.apache.flink.table.expressions.CallExpression;
 import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.expressions.ExpressionVisitor;
@@ -54,6 +68,7 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import static org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveTable.ALTER_TABLE_OP;
 import static org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.HiveTableRowFormat.SERDE_INFO_PROP_PREFIX;
 import static org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.HiveTableRowFormat.SERDE_LIB_CLASS_NAME;
 import static org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.HiveTableStoredAs.STORED_AS_FILE_FORMAT;
@@ -61,6 +76,7 @@ import static org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.HiveTableS
 import static org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.HiveTableStoredAs.STORED_AS_OUTPUT_FORMAT;
 import static org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.TABLE_IS_EXTERNAL;
 import static org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.TABLE_LOCATION_URI;
+import static org.apache.flink.table.catalog.config.CatalogConfig.FLINK_PROPERTY_PREFIX;
 import static org.apache.flink.util.Preconditions.checkArgument;
 
 /**
@@ -273,6 +289,115 @@ public class HiveTableUtil {
 		setStorageFormat(sd, hiveConf.getVar(HiveConf.ConfVars.HIVEDEFAULTFILEFORMAT), hiveConf);
 	}
 
+	public static void alterColumns(StorageDescriptor sd, CatalogTable catalogTable) {
+		List<FieldSchema> allCols = HiveTableUtil.createHiveColumns(catalogTable.getSchema());
+		List<FieldSchema> nonPartCols = allCols.subList(0, allCols.size() - catalogTable.getPartitionKeys().size());
+		sd.setCols(nonPartCols);
+	}
+
+	public static SqlAlterHiveTable.AlterTableOp extractAlterTableOp(Map<String, String> props) {
+		String opStr = props.remove(ALTER_TABLE_OP);
+		if (opStr != null) {
+			return SqlAlterHiveTable.AlterTableOp.valueOf(opStr);
+		}
+		return null;
+	}
+
+	public static Table alterTableViaCatalogBaseTable(
+			ObjectPath tablePath, CatalogBaseTable baseTable, Table oldHiveTable, HiveConf hiveConf) {
+		Table newHiveTable = instantiateHiveTable(tablePath, baseTable, hiveConf);
+		// client.alter_table() requires a valid location
+		// thus, if new table doesn't have that, it reuses location of the old table
+		if (!newHiveTable.getSd().isSetLocation()) {
+			newHiveTable.getSd().setLocation(oldHiveTable.getSd().getLocation());
+		}
+		return newHiveTable;
+	}
+
+	public static Table instantiateHiveTable(ObjectPath tablePath, CatalogBaseTable table, HiveConf hiveConf) {
+		if (!(table instanceof CatalogTableImpl) && !(table instanceof CatalogViewImpl)) {
+			throw new CatalogException(
+					"HiveCatalog only supports CatalogTableImpl and CatalogViewImpl");
+		}
+		// let Hive set default parameters for us, e.g. serialization.format
+		Table hiveTable = org.apache.hadoop.hive.ql.metadata.Table.getEmptyTable(tablePath.getDatabaseName(),
+				tablePath.getObjectName());
+		hiveTable.setCreateTime((int) (System.currentTimeMillis() / 1000));
+
+		Map<String, String> properties = new HashMap<>(table.getProperties());
+		// Table comment
+		if (table.getComment() != null) {
+			properties.put(HiveCatalogConfig.COMMENT, table.getComment());
+		}
+
+		boolean isGeneric = HiveCatalog.isGenericForCreate(properties);
+
+		// Hive table's StorageDescriptor
+		StorageDescriptor sd = hiveTable.getSd();
+		HiveTableUtil.setDefaultStorageFormat(sd, hiveConf);
+
+		if (isGeneric) {
+			DescriptorProperties tableSchemaProps = new DescriptorProperties(true);
+			tableSchemaProps.putTableSchema(Schema.SCHEMA, table.getSchema());
+
+			if (table instanceof CatalogTable) {
+				tableSchemaProps.putPartitionKeys(((CatalogTable) table).getPartitionKeys());
+			}
+
+			properties.putAll(tableSchemaProps.asMap());
+			properties = maskFlinkProperties(properties);
+			hiveTable.setParameters(properties);
+		} else {
+			HiveTableUtil.initiateTableFromProperties(hiveTable, properties, hiveConf);
+			List<FieldSchema> allColumns = HiveTableUtil.createHiveColumns(table.getSchema());
+			// Table columns and partition keys
+			if (table instanceof CatalogTableImpl) {
+				CatalogTable catalogTable = (CatalogTableImpl) table;
+
+				if (catalogTable.isPartitioned()) {
+					int partitionKeySize = catalogTable.getPartitionKeys().size();
+					List<FieldSchema> regularColumns = allColumns.subList(0, allColumns.size() - partitionKeySize);
+					List<FieldSchema> partitionColumns = allColumns.subList(allColumns.size() - partitionKeySize, allColumns.size());
+
+					sd.setCols(regularColumns);
+					hiveTable.setPartitionKeys(partitionColumns);
+				} else {
+					sd.setCols(allColumns);
+					hiveTable.setPartitionKeys(new ArrayList<>());
+				}
+			} else {
+				sd.setCols(allColumns);
+			}
+			// Table properties
+			hiveTable.getParameters().putAll(properties);
+		}
+
+		if (table instanceof CatalogViewImpl) {
+			// TODO: [FLINK-12398] Support partitioned view in catalog API
+			hiveTable.setPartitionKeys(new ArrayList<>());
+
+			CatalogView view = (CatalogView) table;
+			hiveTable.setViewOriginalText(view.getOriginalQuery());
+			hiveTable.setViewExpandedText(view.getExpandedQuery());
+			hiveTable.setTableType(TableType.VIRTUAL_VIEW.name());
+		}
+
+		return hiveTable;
+	}
+
+	/**
+	 * Add a prefix to Flink-created properties to distinguish them from Hive-created properties.
+	 * Note that 'is_generic' is a special key and this method will leave it as-is.
+	 */
+	public static Map<String, String> maskFlinkProperties(Map<String, String> properties) {
+		return properties.entrySet().stream()
+				.filter(e -> e.getKey() != null && e.getValue() != null)
+				.map(e -> new Tuple2<>(
+						e.getKey().equals(CatalogConfig.IS_GENERIC) ? e.getKey() : FLINK_PROPERTY_PREFIX + e.getKey(),
+						e.getValue()))
+				.collect(Collectors.toMap(t -> t.f0, t -> t.f1));
+	}
+
 	private static class ExpressionExtractor implements ExpressionVisitor<String> {
 
 		// maps a supported function to its name
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectTest.java
index e90691d..ede4c54 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectTest.java
@@ -24,6 +24,8 @@ import org.apache.flink.table.api.SqlDialect;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.TableResult;
 import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.CatalogPartitionImpl;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.config.CatalogConfig;
@@ -37,13 +39,18 @@ import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.PrincipalType;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
+import org.apache.hadoop.hive.ql.io.RCFileOutputFormat;
 import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
 import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat;
 import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFAbs;
 import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe;
 import org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe;
 import org.junit.After;
 import org.junit.Assume;
@@ -53,6 +60,8 @@ import org.junit.Test;
 import java.io.File;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.Collections;
+import java.util.LinkedHashMap;
 import java.util.List;
 
 import static org.apache.flink.table.api.EnvironmentSettings.DEFAULT_BUILTIN_CATALOG;
@@ -73,6 +82,7 @@ public class HiveDialectTest {
 	@Before
 	public void setup() {
 		hiveCatalog = HiveTestUtils.createHiveCatalog();
+		hiveCatalog.getHiveConf().setBoolVar(HiveConf.ConfVars.METASTORE_DISALLOW_INCOMPATIBLE_COL_TYPE_CHANGES, false);
 		hiveCatalog.open();
 		warehouse = hiveCatalog.getHiveConf().getVar(HiveConf.ConfVars.METASTOREWAREHOUSE);
 		tableEnv = HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode();
@@ -101,7 +111,7 @@ public class HiveDialectTest {
 		String db2Location = warehouse + "/db2_location";
 		tableEnv.executeSql(String.format("create database db2 location '%s' with dbproperties('k1'='v1')", db2Location));
 		db = hiveCatalog.getHiveDatabase("db2");
-		assertEquals(db2Location, new URI(db.getLocationUri()).getPath());
+		assertEquals(db2Location, locationPath(db.getLocationUri()));
 		assertEquals("v1", db.getParameters().get("k1"));
 	}
 
@@ -217,12 +227,157 @@ public class HiveDialectTest {
 		assertEquals("[1,0,static, 1,1,a, 1,2,b, 1,3,c, 2,0,static, 2,1,b, 3,0,static, 3,1,c]", results.toString());
 	}
 
-	private static List<Row> queryResult(org.apache.flink.table.api.Table table) {
-		return Lists.newArrayList(table.execute().collect());
+	@Test
+	public void testAlterTable() throws Exception {
+		tableEnv.executeSql("create table tbl (x int) tblproperties('k1'='v1')");
+		tableEnv.executeSql("alter table tbl rename to tbl1");
+
+		ObjectPath tablePath = new ObjectPath("default", "tbl1");
+
+		// change properties
+		tableEnv.executeSql("alter table tbl1 set tblproperties ('k2'='v2')");
+		Table hiveTable = hiveCatalog.getHiveTable(tablePath);
+		assertEquals("v1", hiveTable.getParameters().get("k1"));
+		assertEquals("v2", hiveTable.getParameters().get("k2"));
+
+		// change location
+		String newLocation = warehouse + "/tbl1_new_location";
+		tableEnv.executeSql(String.format("alter table `default`.tbl1 set location '%s'", newLocation));
+		hiveTable = hiveCatalog.getHiveTable(tablePath);
+		assertEquals(newLocation, locationPath(hiveTable.getSd().getLocation()));
+
+		// change file format
+		tableEnv.executeSql("alter table tbl1 set fileformat orc");
+		hiveTable = hiveCatalog.getHiveTable(tablePath);
+		assertEquals(OrcSerde.class.getName(), hiveTable.getSd().getSerdeInfo().getSerializationLib());
+		assertEquals(OrcInputFormat.class.getName(), hiveTable.getSd().getInputFormat());
+		assertEquals(OrcOutputFormat.class.getName(), hiveTable.getSd().getOutputFormat());
+
+		// change serde
+		tableEnv.executeSql(String.format("alter table tbl1 set serde '%s' with serdeproperties('%s'='%s')",
+				LazyBinarySerDe.class.getName(), serdeConstants.FIELD_DELIM, "\u0001"));
+		hiveTable = hiveCatalog.getHiveTable(tablePath);
+		assertEquals(LazyBinarySerDe.class.getName(), hiveTable.getSd().getSerdeInfo().getSerializationLib());
+		assertEquals("\u0001", hiveTable.getSd().getSerdeInfo().getParameters().get(serdeConstants.FIELD_DELIM));
+
+		// replace columns
+		tableEnv.executeSql("alter table tbl1 replace columns (t tinyint,s smallint,i int,b bigint,f float,d double,num decimal," +
+				"ts timestamp,dt date,str string,var varchar(10),ch char(123),bool boolean,bin binary)");
+		hiveTable = hiveCatalog.getHiveTable(tablePath);
+		assertEquals(14, hiveTable.getSd().getColsSize());
+		assertEquals("varchar(10)", hiveTable.getSd().getCols().get(10).getType());
+		assertEquals("char(123)", hiveTable.getSd().getCols().get(11).getType());
+
+		tableEnv.executeSql("alter table tbl1 replace columns (a array<array<int>>,s struct<f1:struct<f11:int,f12:binary>, f2:map<double,date>>," +
+				"m map<char(5),map<timestamp,decimal(20,10)>>)");
+		hiveTable = hiveCatalog.getHiveTable(tablePath);
+		assertEquals("array<array<int>>", hiveTable.getSd().getCols().get(0).getType());
+		assertEquals("struct<f1:struct<f11:int,f12:binary>,f2:map<double,date>>", hiveTable.getSd().getCols().get(1).getType());
+		assertEquals("map<char(5),map<timestamp,decimal(20,10)>>", hiveTable.getSd().getCols().get(2).getType());
+
+		// add columns
+		tableEnv.executeSql("alter table tbl1 add columns (x int,y int)");
+		hiveTable = hiveCatalog.getHiveTable(tablePath);
+		assertEquals(5, hiveTable.getSd().getColsSize());
+
+		// change column
+		tableEnv.executeSql("alter table tbl1 change column x x1 string comment 'new x col'");
+		hiveTable = hiveCatalog.getHiveTable(tablePath);
+		assertEquals(5, hiveTable.getSd().getColsSize());
+		FieldSchema newField = hiveTable.getSd().getCols().get(3);
+		assertEquals("x1", newField.getName());
+		assertEquals("string", newField.getType());
+
+		tableEnv.executeSql("alter table tbl1 change column y y int first");
+		hiveTable = hiveCatalog.getHiveTable(tablePath);
+		newField = hiveTable.getSd().getCols().get(0);
+		assertEquals("y", newField.getName());
+		assertEquals("int", newField.getType());
+
+		tableEnv.executeSql("alter table tbl1 change column x1 x2 timestamp after y");
+		hiveTable = hiveCatalog.getHiveTable(tablePath);
+		newField = hiveTable.getSd().getCols().get(1);
+		assertEquals("x2", newField.getName());
+		assertEquals("timestamp", newField.getType());
+
+		// add/replace columns cascade
+		tableEnv.executeSql("create table tbl2 (x int) partitioned by (dt date,id bigint)");
+		ObjectPath tablePath2 = new ObjectPath("default", "tbl2");
+		// TODO: use DDL to add partitions once we support it
+		CatalogPartitionSpec partitionSpec1 = new CatalogPartitionSpec(new LinkedHashMap<String, String>() {{
+			put("dt", "2020-01-23");
+			put("id", "1");
+		}});
+		CatalogPartitionSpec partitionSpec2 = new CatalogPartitionSpec(new LinkedHashMap<String, String>() {{
+			put("dt", "2020-04-24");
+			put("id", "2");
+		}});
+		hiveCatalog.createPartition(tablePath2, partitionSpec1, new CatalogPartitionImpl(Collections.emptyMap(), null), false);
+		hiveCatalog.createPartition(tablePath2, partitionSpec2, new CatalogPartitionImpl(Collections.emptyMap(), null), false);
+		tableEnv.executeSql("alter table tbl2 replace columns (ti tinyint,d decimal) cascade");
+		hiveTable = hiveCatalog.getHiveTable(tablePath2);
+		Partition hivePartition = hiveCatalog.getHivePartition(hiveTable, partitionSpec1);
+		assertEquals(2, hivePartition.getSd().getColsSize());
+		hivePartition = hiveCatalog.getHivePartition(hiveTable, partitionSpec2);
+		assertEquals(2, hivePartition.getSd().getColsSize());
+
+		tableEnv.executeSql("alter table tbl2 add columns (ch char(5),vch varchar(9)) cascade");
+		hivePartition = hiveCatalog.getHivePartition(hiveTable, partitionSpec1);
+		assertEquals(4, hivePartition.getSd().getColsSize());
+		hivePartition = hiveCatalog.getHivePartition(hiveTable, partitionSpec2);
+		assertEquals(4, hivePartition.getSd().getColsSize());
+
+		// change column cascade
+		tableEnv.executeSql("alter table tbl2 change column ch ch char(10) cascade");
+		hivePartition = hiveCatalog.getHivePartition(hiveTable, partitionSpec1);
+		assertEquals("char(10)", hivePartition.getSd().getCols().get(2).getType());
+		hivePartition = hiveCatalog.getHivePartition(hiveTable, partitionSpec2);
+		assertEquals("char(10)", hivePartition.getSd().getCols().get(2).getType());
+
+		tableEnv.executeSql("alter table tbl2 change column vch str string first cascade");
+		hivePartition = hiveCatalog.getHivePartition(hiveTable, partitionSpec1);
+		assertEquals("str", hivePartition.getSd().getCols().get(0).getName());
+		hivePartition = hiveCatalog.getHivePartition(hiveTable, partitionSpec2);
+		assertEquals("str", hivePartition.getSd().getCols().get(0).getName());
 	}
 
-	private static void waitForJobFinish(TableResult tableResult) throws Exception {
-		tableResult.getJobClient().get().getJobExecutionResult(Thread.currentThread().getContextClassLoader()).get();
+	@Test
+	public void testAlterPartition() throws Exception {
+		tableEnv.executeSql("create table tbl (x tinyint,y string) partitioned by (p1 bigint,p2 date)");
+		// TODO: use DDL to add partitions once we support it
+		CatalogPartitionSpec spec1 = new CatalogPartitionSpec(new LinkedHashMap<String, String>() {{
+			put("p1", "1000");
+			put("p2", "2020-05-01");
+		}});
+		CatalogPartitionSpec spec2 = new CatalogPartitionSpec(new LinkedHashMap<String, String>() {{
+			put("p1", "2000");
+			put("p2", "2020-01-01");
+		}});
+		ObjectPath tablePath = new ObjectPath("default", "tbl");
+		hiveCatalog.createPartition(tablePath, spec1, new CatalogPartitionImpl(Collections.emptyMap(), null), false);
+		hiveCatalog.createPartition(tablePath, spec2, new CatalogPartitionImpl(Collections.emptyMap(), null), false);
+
+		Table hiveTable = hiveCatalog.getHiveTable(tablePath);
+
+		// change location
+		String location = warehouse + "/new_part_location";
+		tableEnv.executeSql(String.format("alter table tbl partition (p1=1000,p2='2020-05-01') set location '%s'", location));
+		Partition partition = hiveCatalog.getHivePartition(hiveTable, spec1);
+		assertEquals(location, locationPath(partition.getSd().getLocation()));
+
+		// change file format
+		tableEnv.executeSql("alter table tbl partition (p1=2000,p2='2020-01-01') set fileformat rcfile");
+		partition = hiveCatalog.getHivePartition(hiveTable, spec2);
+		assertEquals(LazyBinaryColumnarSerDe.class.getName(), partition.getSd().getSerdeInfo().getSerializationLib());
+		assertEquals(RCFileInputFormat.class.getName(), partition.getSd().getInputFormat());
+		assertEquals(RCFileOutputFormat.class.getName(), partition.getSd().getOutputFormat());
+
+		// change serde
+		tableEnv.executeSql(String.format("alter table tbl partition (p1=1000,p2='2020-05-01') set serde '%s' with serdeproperties('%s'='%s')",
+				LazyBinarySerDe.class.getName(), serdeConstants.LINE_DELIM, "\n"));
+		partition = hiveCatalog.getHivePartition(hiveTable, spec1);
+		assertEquals(LazyBinarySerDe.class.getName(), partition.getSd().getSerdeInfo().getSerializationLib());
+		assertEquals("\n", partition.getSd().getSerdeInfo().getParameters().get(serdeConstants.LINE_DELIM));
 	}
 
 	@Test
@@ -254,4 +409,12 @@ public class HiveDialectTest {
 	private static String locationPath(String locationURI) throws URISyntaxException {
 		return new URI(locationURI).getPath();
 	}
+
+	private static List<Row> queryResult(org.apache.flink.table.api.Table table) {
+		return Lists.newArrayList(table.execute().collect());
+	}
+
+	private static void waitForJobFinish(TableResult tableResult) throws Exception {
+		tableResult.getJobClient().get().getJobExecutionResult(Thread.currentThread().getContextClassLoader()).get();
+	}
 }
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java
index 015995f..88ef5f6 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java
@@ -19,14 +19,17 @@
 package org.apache.flink.table.catalog.hive;
 
 import org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveDatabase.AlterHiveDatabaseOp;
+import org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveTable;
 import org.apache.flink.table.HiveVersionTestUtil;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.constraints.UniqueConstraint;
 import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogPartition;
 import org.apache.flink.table.catalog.CatalogPartitionSpec;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.CatalogTableImpl;
+import org.apache.flink.table.catalog.CatalogTestUtil;
 import org.apache.flink.table.catalog.config.CatalogConfig;
 import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
 import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
@@ -55,6 +58,7 @@ import java.util.Map;
 import static org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveDatabase.ALTER_DATABASE_OP;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -189,6 +193,32 @@ public class HiveCatalogHiveMetadataTest extends HiveCatalogMetadataTestBase {
 		hiveCatalog.dropDatabase(db1, false, true);
 	}
 
+	@Override
+	@Test
+	public void testAlterPartition() throws Exception {
+		catalog.createDatabase(db1, createDb(), false);
+		catalog.createTable(path1, createPartitionedTable(), false);
+		catalog.createPartition(path1, createPartitionSpec(), createPartition(), false);
+
+		assertEquals(Collections.singletonList(createPartitionSpec()), catalog.listPartitions(path1));
+		CatalogPartition cp = catalog.getPartition(path1, createPartitionSpec());
+		CatalogTestUtil.checkEquals(createPartition(), cp);
+		assertNull(cp.getProperties().get("k"));
+
+		CatalogPartition another = createPartition();
+		another.getProperties().put("k", "v");
+		another.getProperties().put(SqlAlterHiveTable.ALTER_TABLE_OP, SqlAlterHiveTable.AlterTableOp.CHANGE_TBL_PROPS.name());
+
+		catalog.alterPartition(path1, createPartitionSpec(), another, false);
+
+		assertEquals(Collections.singletonList(createPartitionSpec()), catalog.listPartitions(path1));
+
+		cp = catalog.getPartition(path1, createPartitionSpec());
+
+		CatalogTestUtil.checkEquals(another, cp);
+		assertEquals("v", cp.getProperties().get("k"));
+	}
+
 	private void checkStatistics(int inputStat, int expectStat) throws Exception {
 		catalog.dropTable(path1, true);
 
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogTest.java
index 44a5e5e..e67655e 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.catalog.CatalogTableImpl;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.config.CatalogConfig;
+import org.apache.flink.table.catalog.hive.util.HiveTableUtil;
 import org.apache.flink.table.descriptors.FileSystem;
 
 import org.apache.hadoop.hive.metastore.api.Table;
@@ -46,7 +47,7 @@ public class HiveCatalogTest {
 
 	@Test
 	public void testCreateGenericTable() {
-		Table hiveTable = HiveCatalog.instantiateHiveTable(
+		Table hiveTable = HiveTableUtil.instantiateHiveTable(
 			new ObjectPath("test", "test"),
 			new CatalogTableImpl(
 				schema,
@@ -66,7 +67,7 @@ public class HiveCatalogTest {
 
 		map.put(CatalogConfig.IS_GENERIC, String.valueOf(false));
 
-		Table hiveTable = HiveCatalog.instantiateHiveTable(
+		Table hiveTable = HiveTableUtil.instantiateHiveTable(
 			new ObjectPath("test", "test"),
 			new CatalogTableImpl(
 				schema,
diff --git a/flink-table/flink-sql-parser-hive/src/main/codegen/data/Parser.tdd b/flink-table/flink-sql-parser-hive/src/main/codegen/data/Parser.tdd
index 52a9b9c..d19ff12 100644
--- a/flink-table/flink-sql-parser-hive/src/main/codegen/data/Parser.tdd
+++ b/flink-table/flink-sql-parser-hive/src/main/codegen/data/Parser.tdd
@@ -27,6 +27,13 @@
     "org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveDatabaseLocation"
     "org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveDatabaseOwner"
     "org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveDatabaseProps"
+    "org.apache.flink.sql.parser.hive.ddl.SqlAlterHivePartitionRename"
+    "org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveTableAddReplaceColumn"
+    "org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveTableChangeColumn"
+    "org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveTableFileFormat"
+    "org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveTableLocation"
+    "org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveTableProps"
+    "org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveTableSerDe"
     "org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveDatabase"
     "org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable"
     "org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.HiveTableCreationContext"
@@ -502,6 +509,7 @@
     "SqlShowTables()"
     "SqlRichDescribeTable()"
     "SqlShowFunctions()"
+    "SqlAlterTable()"
   ]
 
   # List of methods for parsing custom literals.
diff --git a/flink-table/flink-sql-parser-hive/src/main/codegen/includes/parserImpls.ftl b/flink-table/flink-sql-parser-hive/src/main/codegen/includes/parserImpls.ftl
index d6efd00..d1f83b9 100644
--- a/flink-table/flink-sql-parser-hive/src/main/codegen/includes/parserImpls.ftl
+++ b/flink-table/flink-sql-parser-hive/src/main/codegen/includes/parserImpls.ftl
@@ -771,9 +771,9 @@ SqlTypeNameSpec ExtendedSqlRowTypeName() :
 }
 
 /**
-* Parses a partition specifications statement for insert statement.
+* Parses a partition specifications statement that can contain both static and dynamic partitions.
 */
-void InsertPartitionSpec(SqlNodeList allPartKeys, SqlNodeList staticSpec) :
+void PartitionSpecCommaList(SqlNodeList allPartKeys, SqlNodeList staticSpec) :
 {
     SqlIdentifier key;
     SqlNode value;
@@ -861,7 +861,7 @@ SqlNode RichSqlInsert() :
         }
     ]
     [
-        <PARTITION> InsertPartitionSpec(allPartKeys, staticSpec)
+        <PARTITION> PartitionSpecCommaList(allPartKeys, staticSpec)
     ]
     source = OrderedQueryOrExpr(ExprContext.ACCEPT_QUERY) {
         return new RichSqlHiveInsert(s.end(source), keywordList, extendedKeywordList, table, source,
@@ -1090,3 +1090,175 @@ SqlCreate SqlCreateCatalog(Span s, boolean replace) :
             propertyList);
     }
 }
+
+// make sure a feature only applies to table, not partitions
+void EnsureAlterTableOnly(SqlNodeList partitionSpec, String feature) :
+{
+}
+{
+  {
+    if (partitionSpec != null) {
+      throw new ParseException(feature + " is not applicable to partitions.");
+    }
+  }
+}
+
+SqlAlterTable SqlAlterTable() :
+{
+    SqlParserPos startPos;
+    SqlIdentifier tableIdentifier;
+    SqlIdentifier newTableIdentifier = null;
+    SqlNodeList propertyList = SqlNodeList.EMPTY;
+    SqlNodeList partitionSpec = null;
+    boolean ifNotExists = false;
+    boolean ifExists = false;
+}
+{
+    <ALTER> <TABLE> { startPos = getPos(); }
+        tableIdentifier = CompoundIdentifier()
+    [ <PARTITION> { partitionSpec = new SqlNodeList(getPos()); PartitionSpecCommaList(new SqlNodeList(getPos()), partitionSpec); } ]
+    (
+        <RENAME> <TO>
+        (
+            <PARTITION>
+            {
+              SqlNodeList newPartSpec = new SqlNodeList(getPos());
+              PartitionSpecCommaList(new SqlNodeList(getPos()), newPartSpec);
+              return new SqlAlterHivePartitionRename(startPos.plus(getPos()), tableIdentifier, partitionSpec, newPartSpec);
+            }
+        |
+            newTableIdentifier = CompoundIdentifier()
+            {
+              return new SqlAlterTableRename(
+                          startPos.plus(getPos()),
+                          tableIdentifier,
+                          newTableIdentifier);
+            }
+        )
+    |
+        <ADD> <COLUMNS>
+        {
+            EnsureAlterTableOnly(partitionSpec, "Add columns");
+            return SqlAlterHiveTableAddReplaceColumn(startPos, tableIdentifier, false);
+        }
+    |
+        <REPLACE> <COLUMNS>
+        {
+            EnsureAlterTableOnly(partitionSpec, "Replace columns");
+            return SqlAlterHiveTableAddReplaceColumn(startPos, tableIdentifier, true);
+        }
+    |
+        <CHANGE> [ <COLUMN> ]
+        {
+            EnsureAlterTableOnly(partitionSpec, "Change column");
+            return SqlAlterHiveTableChangeColumn(startPos, tableIdentifier);
+        }
+    |
+        <SET>
+        (
+            <TBLPROPERTIES>
+            { EnsureAlterTableOnly(partitionSpec, "Alter table properties"); }
+            propertyList = TableProperties()
+            {
+                return new SqlAlterHiveTableProps(
+                            startPos.plus(getPos()),
+                            tableIdentifier,
+                            propertyList);
+            }
+        |
+            <LOCATION> <QUOTED_STRING>
+            {
+              SqlCharStringLiteral location = createStringLiteral(token.image, getPos());
+              return new SqlAlterHiveTableLocation(startPos.plus(getPos()), tableIdentifier, partitionSpec, location);
+            }
+        |
+            <FILEFORMAT>
+            {
+                SqlIdentifier format = SimpleIdentifier();
+                return new SqlAlterHiveTableFileFormat(startPos.plus(getPos()), tableIdentifier, partitionSpec, format);
+            }
+        |
+            { return SqlAlterHiveTableSerDe(startPos, tableIdentifier, partitionSpec); }
+        )
+    )
+}
+
+SqlAlterTable SqlAlterHiveTableAddReplaceColumn(SqlParserPos startPos, SqlIdentifier tableIdentifier, boolean replace) :
+{
+  SqlNodeList colList;
+  boolean cascade = false;
+}
+{
+  <LPAREN>
+    {
+      List<SqlNode> cols = new ArrayList();
+    }
+    TableColumn2(cols)
+    (
+      <COMMA> TableColumn2(cols)
+    )*
+  <RPAREN>
+  [
+    <CASCADE> { cascade = true; }
+    |
+    <RESTRICT>
+  ]
+  {
+    colList = new SqlNodeList(cols, startPos.plus(getPos()));
+    return new SqlAlterHiveTableAddReplaceColumn(startPos.plus(getPos()),
+                                                 tableIdentifier,
+                                                 cascade,
+                                                 colList,
+                                                 replace);
+  }
+}
+
+SqlAlterTable SqlAlterHiveTableChangeColumn(SqlParserPos startPos, SqlIdentifier tableIdentifier) :
+{
+  boolean cascade = false;
+  SqlIdentifier oldName;
+  SqlIdentifier newName;
+  SqlDataTypeSpec newType;
+  SqlCharStringLiteral comment = null;
+  boolean first = false;
+  SqlIdentifier after = null;
+  SqlNodeList partSpec = null;
+}
+{
+  oldName = SimpleIdentifier()
+  newName = SimpleIdentifier()
+  newType = ExtendedDataType()
+  [ <COMMENT> <QUOTED_STRING> {
+      comment = createStringLiteral(token.image, getPos());
+  }]
+  [ <FIRST> { first = true; } ]
+  [ <AFTER> { after = SimpleIdentifier(); } ]
+  [
+    <CASCADE> { cascade = true; }
+    |
+    <RESTRICT>
+  ]
+  { return new SqlAlterHiveTableChangeColumn(startPos.plus(getPos()),
+                                             tableIdentifier,
+                                             cascade,
+                                             oldName,
+                                             new SqlTableColumn(newName, newType, null, comment, newName.getParserPosition()),
+                                             first,
+                                             after); }
+}
+
+SqlAlterTable SqlAlterHiveTableSerDe(SqlParserPos startPos, SqlIdentifier tableIdentifier, SqlNodeList partitionSpec) :
+{
+  SqlCharStringLiteral serdeLib = null;
+  SqlNodeList propertyList = null;
+}
+{
+  (
+    <SERDE> <QUOTED_STRING>
+    { serdeLib = createStringLiteral(token.image, getPos()); propertyList = new SqlNodeList(getPos()); }
+    [ <WITH> <SERDEPROPERTIES> { propertyList = TableProperties(); } ]
+    |
+    <SERDEPROPERTIES> { propertyList = TableProperties(); }
+  )
+  { return new SqlAlterHiveTableSerDe(startPos.plus(getPos()), tableIdentifier, partitionSpec, propertyList, serdeLib); }
+}
diff --git a/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/HiveDDLUtils.java b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/HiveDDLUtils.java
index bfbd33a..934e76a 100644
--- a/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/HiveDDLUtils.java
+++ b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/HiveDDLUtils.java
@@ -42,6 +42,7 @@ import java.util.List;
 import java.util.Set;
 
 import static org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveDatabase.ALTER_DATABASE_OP;
+import static org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveTable.ALTER_TABLE_OP;
 import static org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveDatabase.DATABASE_LOCATION_URI;
 import static org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.HiveTableRowFormat.SERDE_INFO_PROP_PREFIX;
 import static org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.HiveTableRowFormat.SERDE_LIB_CLASS_NAME;
@@ -72,7 +73,7 @@ public class HiveDDLUtils {
 	static {
 		RESERVED_DB_PROPERTIES.addAll(Arrays.asList(ALTER_DATABASE_OP, DATABASE_LOCATION_URI));
 
-		RESERVED_TABLE_PROPERTIES.addAll(Arrays.asList(TABLE_LOCATION_URI,
+		RESERVED_TABLE_PROPERTIES.addAll(Arrays.asList(ALTER_TABLE_OP, TABLE_LOCATION_URI,
 				TABLE_IS_EXTERNAL, PK_CONSTRAINT_TRAIT, NOT_NULL_CONSTRAINT_TRAITS,
 				STORED_AS_FILE_FORMAT, STORED_AS_INPUT_FORMAT, STORED_AS_OUTPUT_FORMAT, SERDE_LIB_CLASS_NAME));
 
@@ -302,14 +303,18 @@ public class HiveDDLUtils {
 	public static SqlNodeList deepCopyColList(SqlNodeList colList) {
 		SqlNodeList res = new SqlNodeList(colList.getParserPosition());
 		for (SqlNode node : colList) {
-			SqlTableColumn col = (SqlTableColumn) node;
-			res.add(new SqlTableColumn(
-					col.getName(),
-					col.getType(),
-					col.getConstraint().orElse(null),
-					col.getComment().orElse(null),
-					col.getParserPosition()));
+			res.add(deepCopyTableColumn((SqlTableColumn) node));
 		}
 		return res;
 	}
+
+	public static SqlTableColumn deepCopyTableColumn(SqlTableColumn column) {
+		return new SqlTableColumn(
+				column.getName(),
+				column.getType(),
+				column.getConstraint().orElse(null),
+				column.getComment().orElse(null),
+				column.getParserPosition()
+		);
+	}
 }
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableProperties.java b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlAlterHivePartitionRename.java
similarity index 56%
copy from flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableProperties.java
copy to flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlAlterHivePartitionRename.java
index 8c5412c..68d6461 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableProperties.java
+++ b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlAlterHivePartitionRename.java
@@ -16,7 +16,10 @@
  * limitations under the License.
  */
 
-package org.apache.flink.sql.parser.ddl;
+package org.apache.flink.sql.parser.hive.ddl;
+
+import org.apache.flink.sql.parser.ddl.SqlAlterTable;
+import org.apache.flink.sql.parser.hive.impl.ParseException;
 
 import org.apache.calcite.sql.SqlIdentifier;
 import org.apache.calcite.sql.SqlNode;
@@ -25,52 +28,43 @@ import org.apache.calcite.sql.SqlWriter;
 import org.apache.calcite.sql.parser.SqlParserPos;
 import org.apache.calcite.util.ImmutableNullableList;
 
-import java.util.List;
+import javax.annotation.Nonnull;
 
-import static java.util.Objects.requireNonNull;
+import java.util.List;
 
 /**
- * ALTER TABLE [[catalogName.] dataBasesName].tableName SET ( name=value [, name=value]*).
+ * ALTER TABLE DDL to change the specifications of a Hive partition.
  */
-public class SqlAlterTableProperties extends SqlAlterTable {
+public class SqlAlterHivePartitionRename extends SqlAlterTable {
 
-	private final SqlNodeList propertyList;
+	private final SqlNodeList newPartSpec;
 
-	public SqlAlterTableProperties(SqlParserPos pos, SqlIdentifier tableName, SqlNodeList propertyList) {
-		super(pos, tableName);
-		this.propertyList = requireNonNull(propertyList, "propertyList should not be null");
+	public SqlAlterHivePartitionRename(SqlParserPos pos, SqlIdentifier tableName,
+			SqlNodeList partSpec, SqlNodeList newPartSpec) throws ParseException {
+		super(pos, tableName, partSpec);
+		if (partSpec == null || newPartSpec == null) {
+			throw new ParseException("Both old and new partition spec have to be specified");
+		}
+		this.newPartSpec = newPartSpec;
 	}
 
+	@Nonnull
 	@Override
 	public List<SqlNode> getOperandList() {
-		return ImmutableNullableList.of(tableIdentifier, propertyList);
-	}
-
-	public SqlNodeList getPropertyList() {
-		return propertyList;
+		return ImmutableNullableList.of(tableIdentifier, getPartitionSpec(), newPartSpec);
 	}
 
 	@Override
 	public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
 		super.unparse(writer, leftPrec, rightPrec);
-		writer.keyword("SET");
-		SqlWriter.Frame withFrame = writer.startList("(", ")");
-		for (SqlNode property : propertyList) {
-			printIndent(writer);
-			property.unparse(writer, leftPrec, rightPrec);
-		}
 		writer.newlineAndIndent();
-		writer.endList(withFrame);
-	}
-
-	private void printIndent(SqlWriter writer) {
-		writer.sep(",", false);
+		writer.keyword("RENAME TO");
 		writer.newlineAndIndent();
-		writer.print("  ");
+		writer.keyword("PARTITION");
+		newPartSpec.unparse(writer, getOperator().getLeftPrec(), getOperator().getRightPrec());
 	}
 
-	public String[] fullTableName() {
-		return tableIdentifier.names.toArray(new String[0]);
+	public SqlNodeList getNewPartSpec() {
+		return newPartSpec;
 	}
-
 }
diff --git a/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlAlterHiveTable.java b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlAlterHiveTable.java
new file mode 100644
index 0000000..499c949
--- /dev/null
+++ b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlAlterHiveTable.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.hive.ddl;
+
+import org.apache.flink.sql.parser.ddl.SqlAlterTableProperties;
+
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+
+/**
+ * Abstract class for ALTER DDL of a Hive table.
+ * Any ALTER TABLE operations that need to be encoded as table properties should extend this class.
+ */
+public abstract class SqlAlterHiveTable extends SqlAlterTableProperties {
+
+	public static final String ALTER_TABLE_OP = "alter.table.op";
+	public static final String ALTER_COL_CASCADE = "alter.column.cascade";
+
+	public SqlAlterHiveTable(AlterTableOp op, SqlParserPos pos, SqlIdentifier tableName,
+			SqlNodeList partSpec, SqlNodeList propertyList) {
+		super(pos, tableName, partSpec, propertyList);
+		propertyList.add(HiveDDLUtils.toTableOption(ALTER_TABLE_OP, op.name(), pos));
+	}
+
+	@Override
+	public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+		writer.keyword("ALTER TABLE");
+		tableIdentifier.unparse(writer, leftPrec, rightPrec);
+		SqlNodeList partitionSpec = getPartitionSpec();
+		if (partitionSpec != null && partitionSpec.size() > 0) {
+			writer.keyword("PARTITION");
+			partitionSpec.unparse(writer, getOperator().getLeftPrec(), getOperator().getRightPrec());
+		}
+	}
+
+	/**
+	 * Type of ALTER TABLE operation.
+	 */
+	public enum AlterTableOp {
+		CHANGE_TBL_PROPS,
+		CHANGE_SERDE_PROPS,
+		CHANGE_FILE_FORMAT,
+		CHANGE_LOCATION,
+		ALTER_COLUMNS
+	}
+}
diff --git a/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlAlterHiveTableAddReplaceColumn.java b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlAlterHiveTableAddReplaceColumn.java
new file mode 100644
index 0000000..87b1213
--- /dev/null
+++ b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlAlterHiveTableAddReplaceColumn.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.hive.ddl;
+
+import org.apache.flink.sql.parser.ddl.SqlAddReplaceColumns;
+import org.apache.flink.sql.parser.hive.impl.ParseException;
+
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+
+/**
+ * ALTER DDL to ADD or REPLACE columns for a Hive table.
+ */
+public class SqlAlterHiveTableAddReplaceColumn extends SqlAddReplaceColumns {
+
+	private final SqlNodeList origColumns;
+	private final boolean cascade;
+
+	public SqlAlterHiveTableAddReplaceColumn(SqlParserPos pos, SqlIdentifier tableName,
+			boolean cascade, SqlNodeList columns, boolean replace) throws ParseException {
+		super(pos, tableName, columns, replace, new SqlNodeList(pos));
+		this.origColumns = HiveDDLUtils.deepCopyColList(columns);
+		HiveDDLUtils.convertDataTypes(columns);
+		this.cascade = cascade;
+		// set ALTER OP
+		getProperties().add(HiveDDLUtils.toTableOption(
+				SqlAlterHiveTable.ALTER_TABLE_OP, SqlAlterHiveTable.AlterTableOp.ALTER_COLUMNS.name(), pos));
+		// set cascade
+		if (cascade) {
+			getProperties().add(HiveDDLUtils.toTableOption(SqlAlterHiveTable.ALTER_COL_CASCADE, "true", pos));
+		}
+	}
+
+	@Override
+	public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+		writer.keyword("ALTER TABLE");
+		tableIdentifier.unparse(writer, leftPrec, rightPrec);
+		SqlNodeList partitionSpec = getPartitionSpec();
+		if (partitionSpec != null && partitionSpec.size() > 0) {
+			writer.keyword("PARTITION");
+			partitionSpec.unparse(writer, getOperator().getLeftPrec(), getOperator().getRightPrec());
+		}
+		if (isReplace()) {
+			writer.keyword("REPLACE");
+		} else {
+			writer.keyword("ADD");
+		}
+		writer.keyword("COLUMNS");
+		SqlWriter.Frame frame = writer.startList(SqlWriter.FrameTypeEnum.create("sds"), "(", ")");
+		for (SqlNode column : origColumns) {
+			printIndent(writer);
+			column.unparse(writer, leftPrec, rightPrec);
+		}
+		writer.newlineAndIndent();
+		writer.endList(frame);
+		if (cascade) {
+			writer.keyword("CASCADE");
+		} else {
+			writer.keyword("RESTRICT");
+		}
+	}
+}
diff --git a/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlAlterHiveTableChangeColumn.java b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlAlterHiveTableChangeColumn.java
new file mode 100644
index 0000000..413518f
--- /dev/null
+++ b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlAlterHiveTableChangeColumn.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.hive.ddl;
+
+import org.apache.flink.sql.parser.ddl.SqlChangeColumn;
+import org.apache.flink.sql.parser.ddl.SqlTableColumn;
+import org.apache.flink.sql.parser.hive.impl.ParseException;
+
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+
+/**
+ * ALTER DDL to change a column's name, type, position, etc.
+ */
+public class SqlAlterHiveTableChangeColumn extends SqlChangeColumn {
+
+	private final SqlTableColumn origNewColumn;
+	private final boolean cascade;
+
+	public SqlAlterHiveTableChangeColumn(SqlParserPos pos, SqlIdentifier tableName, boolean cascade,
+			SqlIdentifier oldName, SqlTableColumn newColumn, boolean first, SqlIdentifier after) throws ParseException {
+		super(pos, tableName, oldName, newColumn, after, first, new SqlNodeList(pos));
+		this.origNewColumn = HiveDDLUtils.deepCopyTableColumn(newColumn);
+		HiveDDLUtils.convertDataTypes(newColumn);
+		this.cascade = cascade;
+		// set ALTER OP
+		getProperties().add(HiveDDLUtils.toTableOption(
+				SqlAlterHiveTable.ALTER_TABLE_OP, SqlAlterHiveTable.AlterTableOp.ALTER_COLUMNS.name(), pos));
+		// set cascade
+		if (cascade) {
+			getProperties().add(HiveDDLUtils.toTableOption(SqlAlterHiveTable.ALTER_COL_CASCADE, "true", pos));
+		}
+	}
+
+	@Override
+	public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+		writer.keyword("ALTER TABLE");
+		tableIdentifier.unparse(writer, leftPrec, rightPrec);
+		SqlNodeList partitionSpec = getPartitionSpec();
+		if (partitionSpec != null && partitionSpec.size() > 0) {
+			writer.keyword("PARTITION");
+			partitionSpec.unparse(writer, getOperator().getLeftPrec(), getOperator().getRightPrec());
+		}
+		writer.keyword("CHANGE COLUMN");
+		getOldName().unparse(writer, leftPrec, rightPrec);
+		origNewColumn.unparse(writer, leftPrec, rightPrec);
+		if (isFirst()) {
+			writer.keyword("FIRST");
+		}
+		if (getAfter() != null) {
+			writer.keyword("AFTER");
+			getAfter().unparse(writer, leftPrec, rightPrec);
+		}
+		if (cascade) {
+			writer.keyword("CASCADE");
+		} else {
+			writer.keyword("RESTRICT");
+		}
+	}
+}
diff --git a/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlAlterHiveTableFileFormat.java b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlAlterHiveTableFileFormat.java
new file mode 100644
index 0000000..2041ca3
--- /dev/null
+++ b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlAlterHiveTableFileFormat.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.hive.ddl;
+
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+
+import static org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveTable.AlterTableOp.CHANGE_FILE_FORMAT;
+
+/**
+ * ALTER TABLE DDL to change a Hive table/partition's file format.
+ */
+public class SqlAlterHiveTableFileFormat extends SqlAlterHiveTable {
+
+	private final SqlIdentifier format;
+
+	public SqlAlterHiveTableFileFormat(SqlParserPos pos, SqlIdentifier tableName, SqlNodeList partSpec, SqlIdentifier format) {
+		super(CHANGE_FILE_FORMAT, pos, tableName, partSpec, createPropList(format));
+		this.format = format;
+	}
+
+	@Override
+	public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+		super.unparse(writer, leftPrec, rightPrec);
+		writer.keyword("SET FILEFORMAT");
+		format.unparse(writer, leftPrec, rightPrec);
+	}
+
+	private static SqlNodeList createPropList(SqlIdentifier format) {
+		SqlNodeList res = new SqlNodeList(format.getParserPosition());
+		res.add(HiveDDLUtils.toTableOption(
+				SqlCreateHiveTable.HiveTableStoredAs.STORED_AS_FILE_FORMAT,
+				format.getSimple(),
+				format.getParserPosition()));
+		return res;
+	}
+}
diff --git a/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlAlterHiveTableLocation.java b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlAlterHiveTableLocation.java
new file mode 100644
index 0000000..f3a470d
--- /dev/null
+++ b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlAlterHiveTableLocation.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.hive.ddl;
+
+import org.apache.calcite.sql.SqlCharStringLiteral;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+
+import static org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveTable.AlterTableOp.CHANGE_LOCATION;
+
+/**
+ * ALTER TABLE DDL to change a Hive table/partition's location.
+ */
+public class SqlAlterHiveTableLocation extends SqlAlterHiveTable {
+
+	private final SqlCharStringLiteral location;
+
+	public SqlAlterHiveTableLocation(SqlParserPos pos, SqlIdentifier tableName, SqlNodeList partitionSpec,
+			SqlCharStringLiteral location) {
+		super(CHANGE_LOCATION, pos, tableName, partitionSpec, createPropList(location));
+		this.location = location;
+	}
+
+	@Override
+	public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+		super.unparse(writer, leftPrec, rightPrec);
+		writer.keyword("SET LOCATION");
+		location.unparse(writer, leftPrec, rightPrec);
+	}
+
+	private static SqlNodeList createPropList(SqlCharStringLiteral location) {
+		SqlNodeList res = new SqlNodeList(location.getParserPosition());
+		res.add(HiveDDLUtils.toTableOption(
+				SqlCreateHiveTable.TABLE_LOCATION_URI,
+				location, location.getParserPosition()));
+		return res;
+	}
+}
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableProperties.java b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlAlterHiveTableProps.java
similarity index 56%
copy from flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableProperties.java
copy to flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlAlterHiveTableProps.java
index 8c5412c..324d588 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableProperties.java
+++ b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlAlterHiveTableProps.java
@@ -16,61 +16,43 @@
  * limitations under the License.
  */
 
-package org.apache.flink.sql.parser.ddl;
+package org.apache.flink.sql.parser.hive.ddl;
+
+import org.apache.flink.sql.parser.hive.impl.ParseException;
 
 import org.apache.calcite.sql.SqlIdentifier;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql.SqlNodeList;
 import org.apache.calcite.sql.SqlWriter;
 import org.apache.calcite.sql.parser.SqlParserPos;
-import org.apache.calcite.util.ImmutableNullableList;
-
-import java.util.List;
 
-import static java.util.Objects.requireNonNull;
+import static org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveTable.AlterTableOp.CHANGE_TBL_PROPS;
 
 /**
- * ALTER TABLE [[catalogName.] dataBasesName].tableName SET ( name=value [, name=value]*).
+ * ALTER DDL to change properties of a Hive table.
  */
-public class SqlAlterTableProperties extends SqlAlterTable {
+public class SqlAlterHiveTableProps extends SqlAlterHiveTable {
 
-	private final SqlNodeList propertyList;
-
-	public SqlAlterTableProperties(SqlParserPos pos, SqlIdentifier tableName, SqlNodeList propertyList) {
-		super(pos, tableName);
-		this.propertyList = requireNonNull(propertyList, "propertyList should not be null");
-	}
-
-	@Override
-	public List<SqlNode> getOperandList() {
-		return ImmutableNullableList.of(tableIdentifier, propertyList);
-	}
+	private final SqlNodeList origProps;
 
-	public SqlNodeList getPropertyList() {
-		return propertyList;
+	public SqlAlterHiveTableProps(SqlParserPos pos, SqlIdentifier tableName, SqlNodeList propertyList)
+			throws ParseException {
+		super(CHANGE_TBL_PROPS, pos, tableName, null, HiveDDLUtils.checkReservedTableProperties(propertyList));
+		// remove the last property which is the ALTER_TABLE_OP
+		this.origProps = new SqlNodeList(propertyList.getList().subList(0, propertyList.size() - 1),
+				propertyList.getParserPosition());
 	}
 
 	@Override
 	public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
 		super.unparse(writer, leftPrec, rightPrec);
-		writer.keyword("SET");
+		writer.keyword("SET TBLPROPERTIES");
 		SqlWriter.Frame withFrame = writer.startList("(", ")");
-		for (SqlNode property : propertyList) {
+		for (SqlNode property : getPropertyList()) {
 			printIndent(writer);
 			property.unparse(writer, leftPrec, rightPrec);
 		}
 		writer.newlineAndIndent();
 		writer.endList(withFrame);
 	}
-
-	private void printIndent(SqlWriter writer) {
-		writer.sep(",", false);
-		writer.newlineAndIndent();
-		writer.print("  ");
-	}
-
-	public String[] fullTableName() {
-		return tableIdentifier.names.toArray(new String[0]);
-	}
-
 }
diff --git a/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlAlterHiveTableSerDe.java b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlAlterHiveTableSerDe.java
new file mode 100644
index 0000000..f9ce8d2
--- /dev/null
+++ b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlAlterHiveTableSerDe.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.hive.ddl;
+
+import org.apache.flink.sql.parser.ddl.SqlTableOption;
+import org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.HiveTableRowFormat;
+import org.apache.flink.sql.parser.hive.impl.ParseException;
+
+import org.apache.calcite.sql.SqlCharStringLiteral;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+
+import static org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveTable.AlterTableOp.CHANGE_SERDE_PROPS;
+
+/**
+ * ALTER TABLE DDL to change a Hive table's SerDe.
+ */
+public class SqlAlterHiveTableSerDe extends SqlAlterHiveTable {
+
+	private final SqlCharStringLiteral serdeLib;
+	private final SqlNodeList origSerDeProps;
+
+	public SqlAlterHiveTableSerDe(SqlParserPos pos, SqlIdentifier tableName, SqlNodeList partitionSpec,
+			SqlNodeList propertyList, SqlCharStringLiteral serdeLib) throws ParseException {
+		super(CHANGE_SERDE_PROPS, pos, tableName, partitionSpec, HiveDDLUtils.checkReservedTableProperties(propertyList));
+		// remove the last property which is the ALTER_TABLE_OP
+		origSerDeProps = new SqlNodeList(propertyList.getList().subList(0, propertyList.size() - 1),
+				propertyList.getParserPosition());
+		appendPrefix(getPropertyList());
+		if (serdeLib != null) {
+			propertyList.add(HiveDDLUtils.toTableOption(
+					HiveTableRowFormat.SERDE_LIB_CLASS_NAME, serdeLib, serdeLib.getParserPosition()));
+		}
+		this.serdeLib = serdeLib;
+	}
+
+	@Override
+	public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+		super.unparse(writer, leftPrec, rightPrec);
+		writer.keyword("SET");
+		if (serdeLib != null) {
+			writer.keyword("SERDE");
+			serdeLib.unparse(writer, leftPrec, rightPrec);
+		}
+		if (origSerDeProps != null && origSerDeProps.size() > 0) {
+			if (serdeLib == null) {
+				writer.keyword("SERDEPROPERTIES");
+			} else {
+				writer.keyword("WITH SERDEPROPERTIES");
+			}
+			SqlWriter.Frame withFrame = writer.startList("(", ")");
+			for (SqlNode property : origSerDeProps) {
+				printIndent(writer);
+				property.unparse(writer, leftPrec, rightPrec);
+			}
+			writer.newlineAndIndent();
+			writer.endList(withFrame);
+		}
+	}
+
+	private static SqlNodeList appendPrefix(SqlNodeList propList) {
+		if (propList != null) {
+			for (int i = 0; i < propList.size(); i++) {
+				SqlTableOption tableOption = (SqlTableOption) propList.get(i);
+				if (!tableOption.getKeyString().equals(ALTER_TABLE_OP)) {
+					String key = HiveTableRowFormat.SERDE_INFO_PROP_PREFIX + tableOption.getKeyString();
+					tableOption = HiveDDLUtils.toTableOption(key, tableOption.getValue(), tableOption.getParserPosition());
+					propList.set(i, tableOption);
+				}
+			}
+		}
+		return propList;
+	}
+}
diff --git a/flink-table/flink-sql-parser-hive/src/test/java/org/apache/flink/sql/parser/hive/FlinkHiveSqlParserImplTest.java b/flink-table/flink-sql-parser-hive/src/test/java/org/apache/flink/sql/parser/hive/FlinkHiveSqlParserImplTest.java
index 61a5fc0..9bc3de7 100644
--- a/flink-table/flink-sql-parser-hive/src/test/java/org/apache/flink/sql/parser/hive/FlinkHiveSqlParserImplTest.java
+++ b/flink-table/flink-sql-parser-hive/src/test/java/org/apache/flink/sql/parser/hive/FlinkHiveSqlParserImplTest.java
@@ -264,4 +264,78 @@ public class FlinkHiveSqlParserImplTest extends SqlParserTest {
 	public void testDescribeCatalog() {
 		sql("describe catalog cat").ok("DESCRIBE CATALOG `CAT`");
 	}
+
+	@Test
+	public void testAlterTableRename() {
+		sql("alter table tbl rename to tbl1").ok("ALTER TABLE `TBL` RENAME TO `TBL1`");
+	}
+
+	@Test
+	public void testAlterTableSerDe() {
+		sql("alter table tbl set serde 'serde.class' with serdeproperties ('field.delim'='\u0001')")
+				.ok("ALTER TABLE `TBL` SET SERDE 'serde.class' WITH SERDEPROPERTIES (\n" +
+						"  'field.delim' = u&'\\0001'\n" +
+						")");
+		sql("alter table tbl set serdeproperties('line.delim'='\n')")
+				.ok("ALTER TABLE `TBL` SET SERDEPROPERTIES (\n" +
+						"  'line.delim' = '\n" +
+						"'\n" +
+						")");
+	}
+
+	@Test
+	public void testAlterTableLocation() {
+		sql("alter table tbl set location '/new/table/path'")
+				.ok("ALTER TABLE `TBL` SET LOCATION '/new/table/path'");
+		sql("alter table tbl partition (p1=1,p2='a') set location '/new/partition/location'")
+				.ok("ALTER TABLE `TBL` PARTITION (`P1` = 1, `P2` = 'a') SET LOCATION '/new/partition/location'");
+	}
+
+	// TODO: support ALTER CLUSTERED BY, SKEWED, STORED AS DIRECTORIES, column constraints
+
+	@Test
+	public void testAlterPartitionRename() {
+		sql("alter table tbl partition (p=1) rename to partition (p=2)")
+				.ok("ALTER TABLE `TBL` PARTITION (`P` = 1)\n" +
+						"RENAME TO\n" +
+						"PARTITION (`P` = 2)");
+	}
+
+	// TODO: support EXCHANGE PARTITION, RECOVER PARTITIONS
+
+	// TODO: support (UN)ARCHIVE PARTITION
+
+	@Test
+	public void testAlterFileFormat() {
+		sql("alter table tbl set fileformat rcfile")
+				.ok("ALTER TABLE `TBL` SET FILEFORMAT `RCFILE`");
+		sql("alter table tbl partition (p=1) set fileformat sequencefile")
+				.ok("ALTER TABLE `TBL` PARTITION (`P` = 1) SET FILEFORMAT `SEQUENCEFILE`");
+	}
+
+	// TODO: support ALTER TABLE/PARTITION TOUCH, PROTECTION, COMPACT, CONCATENATE, UPDATE COLUMNS
+
+	@Test
+	public void testChangeColumn() {
+		sql("alter table tbl change c c1 struct<f0:timestamp,f1:array<char(5)>> restrict")
+				.ok("ALTER TABLE `TBL` CHANGE COLUMN `C` `C1`  STRUCT< `F0` TIMESTAMP, `F1` ARRAY< CHAR(5) > > RESTRICT");
+		sql("alter table tbl change column c c decimal(5,2) comment 'new comment' first cascade")
+				.ok("ALTER TABLE `TBL` CHANGE COLUMN `C` `C`  DECIMAL(5, 2)  COMMENT 'new comment' FIRST CASCADE");
+	}
+
+	@Test
+	public void testAddReplaceColumn() {
+		sql("alter table tbl add columns (a float,b timestamp,c binary) cascade")
+				.ok("ALTER TABLE `TBL` ADD COLUMNS (\n" +
+						"  `A`  FLOAT,\n" +
+						"  `B`  TIMESTAMP,\n" +
+						"  `C`  BINARY\n" +
+						") CASCADE");
+		sql("alter table tbl replace columns (a char(100),b tinyint comment 'tiny comment',c smallint) restrict")
+				.ok("ALTER TABLE `TBL` REPLACE COLUMNS (\n" +
+						"  `A`  CHAR(100),\n" +
+						"  `B`  TINYINT  COMMENT 'tiny comment',\n" +
+						"  `C`  SMALLINT\n" +
+						") RESTRICT");
+	}
 }
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/SqlPartitionUtils.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/SqlPartitionUtils.java
new file mode 100644
index 0000000..5a2abac
--- /dev/null
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/SqlPartitionUtils.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser;
+
+import org.apache.calcite.sql.SqlLiteral;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.util.NlsString;
+
+import java.util.LinkedHashMap;
+
+/**
+ * Utils methods for partition DDLs.
+ */
+public class SqlPartitionUtils {
+
+	private SqlPartitionUtils() {
+	}
+
+	/** Get static partition key value pair as strings.
+	 *
+	 * <p>For character literals we return the unquoted and unescaped values.
+	 * For other types we use {@link SqlLiteral#toString()} to get
+	 * the string format of the value literal.
+	 *
+	 * @return the mapping of column names to values of partition specifications,
+	 * returns an empty map if there is no partition specifications.
+	 */
+	public static LinkedHashMap<String, String> getPartitionKVs(SqlNodeList partitionSpec) {
+		if (partitionSpec == null) {
+			return null;
+		}
+		LinkedHashMap<String, String> ret = new LinkedHashMap<>();
+		if (partitionSpec.size() == 0) {
+			return ret;
+		}
+		for (SqlNode node : partitionSpec.getList()) {
+			SqlProperty sqlProperty = (SqlProperty) node;
+			Comparable comparable = SqlLiteral.value(sqlProperty.getValue());
+			String value = comparable instanceof NlsString ? ((NlsString) comparable).getValue() : comparable.toString();
+			ret.put(sqlProperty.getKey().getSimple(), value);
+		}
+		return ret;
+	}
+}
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableProperties.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAddReplaceColumns.java
similarity index 53%
copy from flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableProperties.java
copy to flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAddReplaceColumns.java
index 8c5412c..b6eefba 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableProperties.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAddReplaceColumns.java
@@ -25,52 +25,73 @@ import org.apache.calcite.sql.SqlWriter;
 import org.apache.calcite.sql.parser.SqlParserPos;
 import org.apache.calcite.util.ImmutableNullableList;
 
-import java.util.List;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
-import static java.util.Objects.requireNonNull;
+import java.util.List;
 
 /**
- * ALTER TABLE [[catalogName.] dataBasesName].tableName SET ( name=value [, name=value]*).
+ * ALTER DDL to ADD or REPLACE columns for a table.
  */
-public class SqlAlterTableProperties extends SqlAlterTable {
+public class SqlAddReplaceColumns extends SqlAlterTable {
 
-	private final SqlNodeList propertyList;
+	private final SqlNodeList newColumns;
+	// Whether to replace all the existing columns. If false, new columns will be appended to the end of the schema.
+	private final boolean replace;
+	// properties that should be added to the table
+	private final SqlNodeList properties;
 
-	public SqlAlterTableProperties(SqlParserPos pos, SqlIdentifier tableName, SqlNodeList propertyList) {
+	public SqlAddReplaceColumns(
+			SqlParserPos pos,
+			SqlIdentifier tableName,
+			SqlNodeList newColumns,
+			boolean replace,
+			@Nullable SqlNodeList properties) {
 		super(pos, tableName);
-		this.propertyList = requireNonNull(propertyList, "propertyList should not be null");
+		this.newColumns = newColumns;
+		this.replace = replace;
+		this.properties = properties;
 	}
 
-	@Override
-	public List<SqlNode> getOperandList() {
-		return ImmutableNullableList.of(tableIdentifier, propertyList);
+	public SqlNodeList getNewColumns() {
+		return newColumns;
+	}
+
+	public boolean isReplace() {
+		return replace;
+	}
+
+	public SqlNodeList getProperties() {
+		return properties;
 	}
 
-	public SqlNodeList getPropertyList() {
-		return propertyList;
+	@Nonnull
+	@Override
+	public List<SqlNode> getOperandList() {
+		return ImmutableNullableList.of(tableIdentifier, partitionSpec, newColumns, properties);
 	}
 
 	@Override
 	public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
 		super.unparse(writer, leftPrec, rightPrec);
-		writer.keyword("SET");
-		SqlWriter.Frame withFrame = writer.startList("(", ")");
-		for (SqlNode property : propertyList) {
+		if (replace) {
+			writer.keyword("REPLACE");
+		} else {
+			writer.keyword("ADD");
+		}
+		writer.keyword("COLUMNS");
+		SqlWriter.Frame frame = writer.startList(SqlWriter.FrameTypeEnum.create("sds"), "(", ")");
+		for (SqlNode column : newColumns) {
 			printIndent(writer);
-			property.unparse(writer, leftPrec, rightPrec);
+			column.unparse(writer, leftPrec, rightPrec);
 		}
 		writer.newlineAndIndent();
-		writer.endList(withFrame);
+		writer.endList(frame);
 	}
 
-	private void printIndent(SqlWriter writer) {
+	protected void printIndent(SqlWriter writer) {
 		writer.sep(",", false);
 		writer.newlineAndIndent();
 		writer.print("  ");
 	}
-
-	public String[] fullTableName() {
-		return tableIdentifier.names.toArray(new String[0]);
-	}
-
 }
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTable.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTable.java
index b279953..9560626 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTable.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTable.java
@@ -18,14 +18,21 @@
 
 package org.apache.flink.sql.parser.ddl;
 
+import org.apache.flink.sql.parser.SqlPartitionUtils;
+
 import org.apache.calcite.sql.SqlCall;
 import org.apache.calcite.sql.SqlIdentifier;
 import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNodeList;
 import org.apache.calcite.sql.SqlOperator;
 import org.apache.calcite.sql.SqlSpecialOperator;
 import org.apache.calcite.sql.SqlWriter;
 import org.apache.calcite.sql.parser.SqlParserPos;
 
+import javax.annotation.Nullable;
+
+import java.util.LinkedHashMap;
+
 import static java.util.Objects.requireNonNull;
 
 /**
@@ -36,12 +43,21 @@ public abstract class SqlAlterTable extends SqlCall {
 	public static final SqlSpecialOperator OPERATOR = new SqlSpecialOperator("ALTER TABLE", SqlKind.ALTER_TABLE);
 
 	protected final SqlIdentifier tableIdentifier;
+	protected final SqlNodeList partitionSpec;
 
 	public SqlAlterTable(
 			SqlParserPos pos,
-			SqlIdentifier tableName) {
+			SqlIdentifier tableName,
+			@Nullable SqlNodeList partitionSpec) {
 		super(pos);
 		this.tableIdentifier = requireNonNull(tableName, "tableName should not be null");
+		this.partitionSpec = partitionSpec;
+	}
+
+	public SqlAlterTable(
+			SqlParserPos pos,
+			SqlIdentifier tableName) {
+		this(pos, tableName, null);
 	}
 
 	@Override
@@ -57,9 +73,28 @@ public abstract class SqlAlterTable extends SqlCall {
 	public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
 		writer.keyword("ALTER TABLE");
 		tableIdentifier.unparse(writer, leftPrec, rightPrec);
+		SqlNodeList partitionSpec = getPartitionSpec();
+		if (partitionSpec != null && partitionSpec.size() > 0) {
+			writer.keyword("PARTITION");
+			partitionSpec.unparse(writer, getOperator().getLeftPrec(), getOperator().getRightPrec());
+		}
 	}
 
 	public String[] fullTableName() {
 		return tableIdentifier.names.toArray(new String[0]);
 	}
+
+	/**
+	 * Returns the partition spec if the ALTER should be applied to partitions, and null otherwise.
+	 */
+	public SqlNodeList getPartitionSpec() {
+		return partitionSpec;
+	}
+
+	/**
+	 * Get partition spec as key-value strings.
+	 */
+	public LinkedHashMap<String, String> getPartitionKVs() {
+		return SqlPartitionUtils.getPartitionKVs(getPartitionSpec());
+	}
 }
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableProperties.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableProperties.java
index 8c5412c..d18a37e 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableProperties.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableProperties.java
@@ -37,7 +37,11 @@ public class SqlAlterTableProperties extends SqlAlterTable {
 	private final SqlNodeList propertyList;
 
 	public SqlAlterTableProperties(SqlParserPos pos, SqlIdentifier tableName, SqlNodeList propertyList) {
-		super(pos, tableName);
+		this(pos, tableName, null, propertyList);
+	}
+
+	public SqlAlterTableProperties(SqlParserPos pos, SqlIdentifier tableName, SqlNodeList partitionSpec, SqlNodeList propertyList) {
+		super(pos, tableName, partitionSpec);
 		this.propertyList = requireNonNull(propertyList, "propertyList should not be null");
 	}
 
@@ -63,7 +67,7 @@ public class SqlAlterTableProperties extends SqlAlterTable {
 		writer.endList(withFrame);
 	}
 
-	private void printIndent(SqlWriter writer) {
+	protected void printIndent(SqlWriter writer) {
 		writer.sep(",", false);
 		writer.newlineAndIndent();
 		writer.print("  ");
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlChangeColumn.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlChangeColumn.java
new file mode 100644
index 0000000..8f84771
--- /dev/null
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlChangeColumn.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.ddl;
+
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.util.ImmutableNullableList;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.List;
+
+/**
+ * ALTER DDL to CHANGE a column for a table.
+ */
+public class SqlChangeColumn extends SqlAlterTable {
+
+	private final SqlIdentifier oldName;
+	private final SqlTableColumn newColumn;
+
+	// Specify the position of the new column. If neither after nor first is set, the column is changed in place.
+	// the name of the column after which the new column should be placed
+	private final SqlIdentifier after;
+	// whether the new column should be placed as the first column of the schema
+	private final boolean first;
+
+	// properties that should be added to the table
+	private final SqlNodeList properties;
+
+	public SqlChangeColumn(
+			SqlParserPos pos,
+			SqlIdentifier tableName,
+			SqlIdentifier oldName,
+			SqlTableColumn newColumn,
+			@Nullable SqlIdentifier after,
+			boolean first,
+			@Nullable SqlNodeList properties) {
+		super(pos, tableName);
+		if (after != null && first) {
+			throw new IllegalArgumentException("FIRST and AFTER cannot be set at the same time");
+		}
+		this.oldName = oldName;
+		this.newColumn = newColumn;
+		this.after = after;
+		this.first = first;
+		this.properties = properties;
+	}
+
+	public SqlIdentifier getOldName() {
+		return oldName;
+	}
+
+	public SqlTableColumn getNewColumn() {
+		return newColumn;
+	}
+
+	public SqlIdentifier getAfter() {
+		return after;
+	}
+
+	public boolean isFirst() {
+		return first;
+	}
+
+	public SqlNodeList getProperties() {
+		return properties;
+	}
+
+	@Nonnull
+	@Override
+	public List<SqlNode> getOperandList() {
+		return ImmutableNullableList.of(tableIdentifier, partitionSpec, oldName, newColumn, after);
+	}
+
+	@Override
+	public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+		super.unparse(writer, leftPrec, rightPrec);
+		writer.keyword("CHANGE COLUMN");
+		oldName.unparse(writer, leftPrec, rightPrec);
+		newColumn.unparse(writer, leftPrec, rightPrec);
+		if (first) {
+			writer.keyword("FIST");
+		}
+		if (after != null) {
+			writer.keyword("AFTER");
+			after.unparse(writer, leftPrec, rightPrec);
+		}
+	}
+}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index f1aab30..cd34b6b 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -92,11 +92,13 @@ import org.apache.flink.table.operations.UseCatalogOperation;
 import org.apache.flink.table.operations.UseDatabaseOperation;
 import org.apache.flink.table.operations.ddl.AlterCatalogFunctionOperation;
 import org.apache.flink.table.operations.ddl.AlterDatabaseOperation;
+import org.apache.flink.table.operations.ddl.AlterPartitionPropertiesOperation;
 import org.apache.flink.table.operations.ddl.AlterTableAddConstraintOperation;
 import org.apache.flink.table.operations.ddl.AlterTableDropConstraintOperation;
 import org.apache.flink.table.operations.ddl.AlterTableOperation;
 import org.apache.flink.table.operations.ddl.AlterTablePropertiesOperation;
 import org.apache.flink.table.operations.ddl.AlterTableRenameOperation;
+import org.apache.flink.table.operations.ddl.AlterTableSchemaOperation;
 import org.apache.flink.table.operations.ddl.CreateCatalogFunctionOperation;
 import org.apache.flink.table.operations.ddl.CreateCatalogOperation;
 import org.apache.flink.table.operations.ddl.CreateDatabaseOperation;
@@ -821,6 +823,17 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
 							dropConstraintOperation.getTableIdentifier().toObjectPath(),
 							newTable,
 							false);
+				} else if (alterTableOperation instanceof AlterPartitionPropertiesOperation) {
+					AlterPartitionPropertiesOperation alterPartPropsOp = (AlterPartitionPropertiesOperation) operation;
+					catalog.alterPartition(alterPartPropsOp.getTableIdentifier().toObjectPath(),
+							alterPartPropsOp.getPartitionSpec(),
+							alterPartPropsOp.getCatalogPartition(),
+							false);
+				} else if (alterTableOperation instanceof AlterTableSchemaOperation) {
+					AlterTableSchemaOperation alterTableSchemaOperation = (AlterTableSchemaOperation) alterTableOperation;
+					catalog.alterTable(alterTableSchemaOperation.getTableIdentifier().toObjectPath(),
+							alterTableSchemaOperation.getCatalogTable(),
+							false);
 				}
 				return TableResultImpl.TABLE_RESULT_OK;
 			} catch (TableAlreadyExistException | TableNotExistException e) {
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
index 73ed2a7..c3c0e7a 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
@@ -26,6 +26,7 @@ import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.exceptions.CatalogException;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
 import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
 import org.apache.flink.table.catalog.exceptions.TableNotExistException;
 import org.apache.flink.util.StringUtils;
@@ -328,6 +329,25 @@ public final class CatalogManager {
 		return Optional.empty();
 	}
 
+	/**
+	 * Retrieves a partition with a fully qualified table path and partition spec.
+	 * If the path is not yet fully qualified use{@link #qualifyIdentifier(UnresolvedIdentifier)} first.
+	 *
+	 * @param tableIdentifier full path of the table to retrieve
+	 * @param partitionSpec full partition spec
+	 * @return partition in the table.
+	 */
+	public Optional<CatalogPartition> getPartition(ObjectIdentifier tableIdentifier, CatalogPartitionSpec partitionSpec) {
+		Catalog catalog = catalogs.get(tableIdentifier.getCatalogName());
+		if (catalog != null) {
+			try {
+				return Optional.of(catalog.getPartition(tableIdentifier.toObjectPath(), partitionSpec));
+			} catch (PartitionNotExistException ignored) {
+			}
+		}
+		return Optional.empty();
+	}
+
 	private Optional<TableLookupResult> getPermanentTable(ObjectIdentifier objectIdentifier)
 			throws TableNotExistException {
 		Catalog currentCatalog = catalogs.get(objectIdentifier.getCatalogName());
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/OperationUtils.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/OperationUtils.java
index feece6e..7f98646 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/OperationUtils.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/OperationUtils.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.operations;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
 import org.apache.flink.util.StringUtils;
 
 import java.util.Arrays;
@@ -108,6 +109,17 @@ public class OperationUtils {
 		return stringBuilder.toString();
 	}
 
+	public static String formatProperties(Map<String, String> properties) {
+		return properties.entrySet().stream()
+				.map(entry -> formatParameter(entry.getKey(), entry.getValue()))
+				.collect(Collectors.joining(", "));
+	}
+
+	public static String formatPartitionSpec(CatalogPartitionSpec spec) {
+		return spec.getPartitionSpec().entrySet().stream()
+				.map(entry -> entry.getKey() + "=" + entry.getValue()).collect(Collectors.joining(", "));
+	}
+
 	private OperationUtils() {
 	}
 }
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterPartitionOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterPartitionOperation.java
new file mode 100644
index 0000000..5376618
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterPartitionOperation.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations.ddl;
+
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+
+/**
+ * Abstract Operation to describe all ALTER TABLE statements that should be applied to partitions.
+ */
+public abstract class AlterPartitionOperation extends AlterTableOperation {
+
+	protected final CatalogPartitionSpec partitionSpec;
+
+	public AlterPartitionOperation(ObjectIdentifier tableIdentifier, CatalogPartitionSpec partitionSpec) {
+		super(tableIdentifier);
+		this.partitionSpec = partitionSpec;
+	}
+
+	public CatalogPartitionSpec getPartitionSpec() {
+		return partitionSpec;
+	}
+}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterPartitionPropertiesOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterPartitionPropertiesOperation.java
new file mode 100644
index 0000000..267ec7d
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterPartitionPropertiesOperation.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations.ddl;
+
+import org.apache.flink.table.catalog.CatalogPartition;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.operations.OperationUtils;
+
+/**
+ * Operation to alter the properties of partition.
+ */
+public class AlterPartitionPropertiesOperation extends AlterPartitionOperation {
+
+	private final CatalogPartition catalogPartition;
+
+	public AlterPartitionPropertiesOperation(ObjectIdentifier tableIdentifier, CatalogPartitionSpec partitionSpec,
+			CatalogPartition catalogPartition) {
+		super(tableIdentifier, partitionSpec);
+		this.catalogPartition = catalogPartition;
+	}
+
+	public CatalogPartition getCatalogPartition() {
+		return catalogPartition;
+	}
+
+	@Override
+	public String asSummaryString() {
+		String spec = OperationUtils.formatPartitionSpec(partitionSpec);
+		String properties = OperationUtils.formatProperties(catalogPartition.getProperties());
+		return String.format("ALTER TABLE %s PARTITION (%s) SET (%s)", tableIdentifier.asSummaryString(), spec, properties);
+	}
+}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableSchemaOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableSchemaOperation.java
new file mode 100644
index 0000000..8f8c468
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableSchemaOperation.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations.ddl;
+
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+
+/**
+ * Operation to describe altering the schema of a table.
+ */
+public class AlterTableSchemaOperation extends AlterTableOperation {
+
+	// the CatalogTable with the updated schema
+	private final CatalogTable catalogTable;
+
+	public AlterTableSchemaOperation(ObjectIdentifier tableIdentifier, CatalogTable catalogTable) {
+		super(tableIdentifier);
+		this.catalogTable = catalogTable;
+	}
+
+	public CatalogTable getCatalogTable() {
+		return catalogTable;
+	}
+
+	@Override
+	public String asSummaryString() {
+		return String.format("ALTER TABLE %s SET SCHEMA %s",
+				tableIdentifier.asSummaryString(), catalogTable.getSchema().toString());
+	}
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
index 11dece9..3dcdb39 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.planner.operations;
 
+import org.apache.flink.sql.parser.ddl.SqlAddReplaceColumns;
 import org.apache.flink.sql.parser.ddl.SqlAlterDatabase;
 import org.apache.flink.sql.parser.ddl.SqlAlterFunction;
 import org.apache.flink.sql.parser.ddl.SqlAlterTable;
@@ -25,6 +26,7 @@ import org.apache.flink.sql.parser.ddl.SqlAlterTableAddConstraint;
 import org.apache.flink.sql.parser.ddl.SqlAlterTableDropConstraint;
 import org.apache.flink.sql.parser.ddl.SqlAlterTableProperties;
 import org.apache.flink.sql.parser.ddl.SqlAlterTableRename;
+import org.apache.flink.sql.parser.ddl.SqlChangeColumn;
 import org.apache.flink.sql.parser.ddl.SqlCreateCatalog;
 import org.apache.flink.sql.parser.ddl.SqlCreateDatabase;
 import org.apache.flink.sql.parser.ddl.SqlCreateFunction;
@@ -49,13 +51,16 @@ import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CatalogBaseTable;
 import org.apache.flink.table.catalog.CatalogDatabase;
 import org.apache.flink.table.catalog.CatalogDatabaseImpl;
 import org.apache.flink.table.catalog.CatalogFunction;
 import org.apache.flink.table.catalog.CatalogFunctionImpl;
 import org.apache.flink.table.catalog.CatalogManager;
+import org.apache.flink.table.catalog.CatalogPartition;
+import org.apache.flink.table.catalog.CatalogPartitionImpl;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
 import org.apache.flink.table.catalog.CatalogTable;
-import org.apache.flink.table.catalog.CatalogTableImpl;
 import org.apache.flink.table.catalog.CatalogView;
 import org.apache.flink.table.catalog.CatalogViewImpl;
 import org.apache.flink.table.catalog.FunctionLanguage;
@@ -77,6 +82,7 @@ import org.apache.flink.table.operations.UseCatalogOperation;
 import org.apache.flink.table.operations.UseDatabaseOperation;
 import org.apache.flink.table.operations.ddl.AlterCatalogFunctionOperation;
 import org.apache.flink.table.operations.ddl.AlterDatabaseOperation;
+import org.apache.flink.table.operations.ddl.AlterPartitionPropertiesOperation;
 import org.apache.flink.table.operations.ddl.AlterTableAddConstraintOperation;
 import org.apache.flink.table.operations.ddl.AlterTableDropConstraintOperation;
 import org.apache.flink.table.operations.ddl.AlterTablePropertiesOperation;
@@ -93,6 +99,7 @@ import org.apache.flink.table.operations.ddl.DropTempSystemFunctionOperation;
 import org.apache.flink.table.operations.ddl.DropViewOperation;
 import org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
 import org.apache.flink.table.planner.hint.FlinkHints;
+import org.apache.flink.table.planner.utils.OperationConverterUtils;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.utils.TableSchemaUtils;
 import org.apache.flink.util.StringUtils;
@@ -115,6 +122,7 @@ import org.apache.calcite.sql.parser.SqlParser;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -231,75 +239,66 @@ public class SqlToOperationConverter {
 	private Operation convertAlterTable(SqlAlterTable sqlAlterTable) {
 		UnresolvedIdentifier unresolvedIdentifier = UnresolvedIdentifier.of(sqlAlterTable.fullTableName());
 		ObjectIdentifier tableIdentifier = catalogManager.qualifyIdentifier(unresolvedIdentifier);
+		Optional<CatalogManager.TableLookupResult> optionalCatalogTable = catalogManager.getTable(tableIdentifier);
+		if (!optionalCatalogTable.isPresent() || optionalCatalogTable.get().isTemporary()) {
+			throw new ValidationException(String.format("Table %s doesn't exist or is a temporary table.",
+					tableIdentifier.toString()));
+		}
+		CatalogBaseTable baseTable = optionalCatalogTable.get().getTable();
 		if (sqlAlterTable instanceof SqlAlterTableRename) {
 			UnresolvedIdentifier newUnresolvedIdentifier =
 				UnresolvedIdentifier.of(((SqlAlterTableRename) sqlAlterTable).fullNewTableName());
 			ObjectIdentifier newTableIdentifier = catalogManager.qualifyIdentifier(newUnresolvedIdentifier);
 			return new AlterTableRenameOperation(tableIdentifier, newTableIdentifier);
 		} else if (sqlAlterTable instanceof SqlAlterTableProperties) {
-			Optional<CatalogManager.TableLookupResult> optionalCatalogTable = catalogManager.getTable(tableIdentifier);
-			if (optionalCatalogTable.isPresent() && !optionalCatalogTable.get().isTemporary()) {
-				CatalogTable originalCatalogTable = (CatalogTable) optionalCatalogTable.get().getTable();
-				Map<String, String> properties = new HashMap<>(originalCatalogTable.getOptions());
-				((SqlAlterTableProperties) sqlAlterTable).getPropertyList().getList().forEach(p ->
-					properties.put(((SqlTableOption) p).getKeyString(), ((SqlTableOption) p).getValueString()));
-				CatalogTable catalogTable = new CatalogTableImpl(
-					originalCatalogTable.getSchema(),
-					originalCatalogTable.getPartitionKeys(),
-					properties,
-					originalCatalogTable.getComment());
-				return new AlterTablePropertiesOperation(tableIdentifier, catalogTable);
-			} else {
-				throw new ValidationException(String.format("Table %s doesn't exist or is a temporary table.",
-					tableIdentifier.toString()));
-			}
+			return convertAlterTableProperties(
+					tableIdentifier,
+					(CatalogTable) baseTable,
+					(SqlAlterTableProperties) sqlAlterTable);
 		} else if (sqlAlterTable instanceof SqlAlterTableAddConstraint) {
-			Optional<CatalogManager.TableLookupResult> optionalCatalogTable =
-					catalogManager.getTable(tableIdentifier);
-			if (optionalCatalogTable.isPresent() && !optionalCatalogTable.get().isTemporary()) {
-				SqlTableConstraint constraint = ((SqlAlterTableAddConstraint) sqlAlterTable)
-						.getConstraint();
-				validateTableConstraint(constraint);
-				TableSchema oriSchema = optionalCatalogTable.get().getTable().getSchema();
-				// Sanity check for constraint.
-				TableSchema.Builder builder = TableSchemaUtils.builderWithGivenSchema(oriSchema);
-				if (constraint.getConstraintName().isPresent()) {
-					builder.primaryKey(
-							constraint.getConstraintName().get(),
-							constraint.getColumnNames());
-				} else {
-					builder.primaryKey(constraint.getColumnNames());
-				}
-				builder.build();
-				return new AlterTableAddConstraintOperation(
-						tableIdentifier,
-						constraint.getConstraintName().orElse(null),
+			SqlTableConstraint constraint = ((SqlAlterTableAddConstraint) sqlAlterTable)
+					.getConstraint();
+			validateTableConstraint(constraint);
+			TableSchema oriSchema = baseTable.getSchema();
+			// Sanity check for constraint.
+			TableSchema.Builder builder = TableSchemaUtils.builderWithGivenSchema(oriSchema);
+			if (constraint.getConstraintName().isPresent()) {
+				builder.primaryKey(
+						constraint.getConstraintName().get(),
 						constraint.getColumnNames());
 			} else {
-				throw new ValidationException(String.format("Table %s doesn't exist or is a temporary table.",
-						tableIdentifier.toString()));
+				builder.primaryKey(constraint.getColumnNames());
 			}
+			builder.build();
+			return new AlterTableAddConstraintOperation(
+					tableIdentifier,
+					constraint.getConstraintName().orElse(null),
+					constraint.getColumnNames());
 		} else if (sqlAlterTable instanceof SqlAlterTableDropConstraint) {
-			Optional<CatalogManager.TableLookupResult> optionalCatalogTable =
-					catalogManager.getTable(tableIdentifier);
-			if (optionalCatalogTable.isPresent() && !optionalCatalogTable.get().isTemporary()) {
-				SqlAlterTableDropConstraint dropConstraint = ((SqlAlterTableDropConstraint) sqlAlterTable);
-				String constraintName = dropConstraint.getConstraintName().getSimple();
-				CatalogTable oriCatalogTable = (CatalogTable) optionalCatalogTable.get().getTable();
-				TableSchema oriSchema = oriCatalogTable.getSchema();
-				if (!oriSchema.getPrimaryKey()
-						.filter(pk -> pk.getName().equals(constraintName))
-						.isPresent()) {
-					throw new ValidationException(
-							String.format("CONSTRAINT [%s] does not exist", constraintName));
-				}
-				return new AlterTableDropConstraintOperation(
-						tableIdentifier,
-						constraintName);
-			} else {
-				throw new ValidationException(String.format("Table %s doesn't exist or is a temporary table.",
-						tableIdentifier.toString()));
+			SqlAlterTableDropConstraint dropConstraint = ((SqlAlterTableDropConstraint) sqlAlterTable);
+			String constraintName = dropConstraint.getConstraintName().getSimple();
+			TableSchema oriSchema = baseTable.getSchema();
+			if (!oriSchema.getPrimaryKey()
+					.filter(pk -> pk.getName().equals(constraintName))
+					.isPresent()) {
+				throw new ValidationException(
+						String.format("CONSTRAINT [%s] does not exist", constraintName));
 			}
+			return new AlterTableDropConstraintOperation(
+					tableIdentifier,
+					constraintName);
+		} else if (sqlAlterTable instanceof SqlAddReplaceColumns) {
+			return OperationConverterUtils.convertAddReplaceColumns(
+					tableIdentifier,
+					(SqlAddReplaceColumns) sqlAlterTable,
+					(CatalogTable) baseTable,
+					flinkPlanner.getOrCreateSqlValidator());
+		} else if (sqlAlterTable instanceof SqlChangeColumn) {
+			return OperationConverterUtils.convertChangeColumn(
+					tableIdentifier,
+					(SqlChangeColumn) sqlAlterTable,
+					(CatalogTable) baseTable,
+					flinkPlanner.getOrCreateSqlValidator());
 		} else {
 			throw new ValidationException(
 					String.format("[%s] needs to implement",
@@ -307,6 +306,29 @@ public class SqlToOperationConverter {
 		}
 	}
 
+	private Operation convertAlterTableProperties(ObjectIdentifier tableIdentifier, CatalogTable oldTable,
+			SqlAlterTableProperties alterTableProperties) {
+		LinkedHashMap<String, String> partitionKVs = alterTableProperties.getPartitionKVs();
+		// it's altering partitions
+		if (partitionKVs != null) {
+			CatalogPartitionSpec partitionSpec = new CatalogPartitionSpec(partitionKVs);
+			CatalogPartition catalogPartition = catalogManager.getPartition(tableIdentifier, partitionSpec)
+					.orElseThrow(() -> new ValidationException(String.format("Partition %s of table %s doesn't exist",
+							partitionSpec.getPartitionSpec(), tableIdentifier)));
+			Map<String, String> newProps = new HashMap<>(catalogPartition.getProperties());
+			newProps.putAll(OperationConverterUtils.extractProperties(alterTableProperties.getPropertyList()));
+			return new AlterPartitionPropertiesOperation(
+					tableIdentifier,
+					partitionSpec,
+					new CatalogPartitionImpl(newProps, catalogPartition.getComment()));
+		} else {
+			// it's altering a table
+			Map<String, String> newProperties = new HashMap<>(oldTable.getOptions());
+			newProperties.putAll(OperationConverterUtils.extractProperties(alterTableProperties.getPropertyList()));
+			return new AlterTablePropertiesOperation(tableIdentifier, oldTable.copy(newProperties));
+		}
+	}
+
 	/** Convert CREATE FUNCTION statement. */
 	private Operation convertCreateFunction(SqlCreateFunction sqlCreateFunction) {
 		UnresolvedIdentifier unresolvedIdentifier =
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java
new file mode 100644
index 0000000..d36bd6f
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.utils;
+
+import org.apache.flink.sql.parser.ddl.SqlAddReplaceColumns;
+import org.apache.flink.sql.parser.ddl.SqlChangeColumn;
+import org.apache.flink.sql.parser.ddl.SqlTableColumn;
+import org.apache.flink.sql.parser.ddl.SqlTableOption;
+import org.apache.flink.table.api.TableColumn;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.api.WatermarkSpec;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogTableImpl;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.ddl.AlterTableSchemaOperation;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.utils.TypeConversions;
+
+import org.apache.calcite.sql.SqlDataTypeSpec;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.validate.SqlValidator;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Utils methods for converting sql to operations.
+ */
+public class OperationConverterUtils {
+
+	private OperationConverterUtils() {
+	}
+
+	public static Operation convertAddReplaceColumns(
+			ObjectIdentifier tableIdentifier,
+			SqlAddReplaceColumns addReplaceColumns,
+			CatalogTable catalogTable,
+			SqlValidator sqlValidator) {
+		// This is only used by the Hive dialect at the moment. In Hive, only non-partition columns can be
+		// added/replaced and users will only define non-partition columns in the new column list. Therefore, we require
+		// that partitions columns must appear last in the schema (which is inline with Hive). Otherwise, we won't be
+		// able to determine the column positions after the non-partition columns are replaced.
+		TableSchema oldSchema = catalogTable.getSchema();
+		int numPartCol = catalogTable.getPartitionKeys().size();
+		Set<String> lastCols = oldSchema.getTableColumns()
+				.subList(oldSchema.getFieldCount() - numPartCol, oldSchema.getFieldCount())
+				.stream().map(TableColumn::getName).collect(Collectors.toSet());
+		if (!lastCols.equals(new HashSet<>(catalogTable.getPartitionKeys()))) {
+			throw new ValidationException("ADD/REPLACE COLUMNS on partitioned tables requires partition columns to appear last");
+		}
+
+		// set non-partition columns
+		TableSchema.Builder builder = TableSchema.builder();
+		if (!addReplaceColumns.isReplace()) {
+			List<TableColumn> nonPartCols = oldSchema.getTableColumns().subList(0, oldSchema.getFieldCount() - numPartCol);
+			for (TableColumn column : nonPartCols) {
+				builder.add(column);
+			}
+			setWatermarkAndPK(builder, catalogTable.getSchema());
+		}
+		for (SqlNode sqlNode : addReplaceColumns.getNewColumns()) {
+			builder.add(toTableColumn((SqlTableColumn) sqlNode, sqlValidator));
+		}
+
+		// set partition columns
+		List<TableColumn> partCols = oldSchema.getTableColumns().subList(oldSchema.getFieldCount() - numPartCol, oldSchema.getFieldCount());
+		for (TableColumn column : partCols) {
+			builder.add(column);
+		}
+
+		// set properties
+		Map<String, String> newProperties = new HashMap<>(catalogTable.getOptions());
+		newProperties.putAll(extractProperties(addReplaceColumns.getProperties()));
+
+		return new AlterTableSchemaOperation(
+				tableIdentifier,
+				new CatalogTableImpl(
+						builder.build(),
+						catalogTable.getPartitionKeys(),
+						newProperties,
+						catalogTable.getComment())
+		);
+	}
+
+	public static Operation convertChangeColumn(
+			ObjectIdentifier tableIdentifier,
+			SqlChangeColumn changeColumn,
+			CatalogTable catalogTable,
+			SqlValidator sqlValidator) {
+		String oldName = changeColumn.getOldName().getSimple();
+		if (catalogTable.getPartitionKeys().indexOf(oldName) >= 0) {
+			// disallow changing partition columns
+			throw new ValidationException("CHANGE COLUMN cannot be applied to partition columns");
+		}
+		TableSchema oldSchema = catalogTable.getSchema();
+		int oldIndex = Arrays.asList(oldSchema.getFieldNames()).indexOf(oldName);
+		if (oldIndex < 0) {
+			throw new ValidationException(String.format("Old column %s not found for CHANGE COLUMN", oldName));
+		}
+		boolean first = changeColumn.isFirst();
+		String after = changeColumn.getAfter() == null ? null : changeColumn.getAfter().getSimple();
+		List<TableColumn> tableColumns = oldSchema.getTableColumns();
+		TableColumn newTableColumn = toTableColumn(changeColumn.getNewColumn(), sqlValidator);
+		if ((!first && after == null) || oldName.equals(after)) {
+			tableColumns.set(oldIndex, newTableColumn);
+		} else {
+			// need to change column position
+			tableColumns.remove(oldIndex);
+			if (first) {
+				tableColumns.add(0, newTableColumn);
+			} else {
+				int newIndex = tableColumns
+						.stream()
+						.map(TableColumn::getName)
+						.collect(Collectors.toList())
+						.indexOf(after);
+				if (newIndex < 0) {
+					throw new ValidationException(String.format("After column %s not found for CHANGE COLUMN", after));
+				}
+				tableColumns.add(newIndex + 1, newTableColumn);
+			}
+		}
+		TableSchema.Builder builder = TableSchema.builder();
+		for (TableColumn column : tableColumns) {
+			builder.add(column);
+		}
+		setWatermarkAndPK(builder, oldSchema);
+		TableSchema newSchema = builder.build();
+		Map<String, String> newProperties = new HashMap<>(catalogTable.getOptions());
+		newProperties.putAll(extractProperties(changeColumn.getProperties()));
+		return new AlterTableSchemaOperation(
+				tableIdentifier,
+				new CatalogTableImpl(
+						newSchema,
+						catalogTable.getPartitionKeys(),
+						newProperties,
+						catalogTable.getComment()));
+		// TODO: handle watermark and constraints
+	}
+
+	private static TableColumn toTableColumn(SqlTableColumn sqlTableColumn, SqlValidator sqlValidator) {
+		String name = sqlTableColumn.getName().getSimple();
+		SqlDataTypeSpec typeSpec = sqlTableColumn.getType();
+		LogicalType logicalType = FlinkTypeFactory.toLogicalType(
+				typeSpec.deriveType(sqlValidator, typeSpec.getNullable()));
+		DataType dataType = TypeConversions.fromLogicalToDataType(logicalType);
+		return TableColumn.of(name, dataType);
+	}
+
+	private static void setWatermarkAndPK(TableSchema.Builder builder, TableSchema schema) {
+		for (WatermarkSpec watermarkSpec : schema.getWatermarkSpecs()) {
+			builder.watermark(watermarkSpec);
+		}
+		schema.getPrimaryKey().ifPresent(pk -> {
+			builder.primaryKey(pk.getName(), pk.getColumns().toArray(new String[0]));
+		});
+	}
+
+	public static Map<String, String> extractProperties(SqlNodeList propList) {
+		Map<String, String> properties = new HashMap<>();
+		if (propList != null) {
+			propList.getList().forEach(p ->
+					properties.put(((SqlTableOption) p).getKeyString(), ((SqlTableOption) p).getValueString()));
+		}
+		return properties;
+	}
+}