Posted to commits@flink.apache.org by lz...@apache.org on 2020/05/17 02:11:19 UTC
[flink] branch master updated: [FLINK-17448][sql-parser-hive] Implement alter DDL for Hive dialect
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 2bbf8ed [FLINK-17448][sql-parser-hive] Implement alter DDL for Hive dialect
2bbf8ed is described below
commit 2bbf8ed32a7fc29dd5f0c941f451bd3a7a6d0d1b
Author: Rui Li <li...@apache.org>
AuthorDate: Sun May 17 10:10:27 2020 +0800
[FLINK-17448][sql-parser-hive] Implement alter DDL for Hive dialect
This closes #12108
---
.../flink/table/catalog/hive/HiveCatalog.java | 210 ++++++++++-----------
.../table/catalog/hive/util/HiveTableUtil.java | 125 ++++++++++++
.../flink/connectors/hive/HiveDialectTest.java | 173 ++++++++++++++++-
.../catalog/hive/HiveCatalogHiveMetadataTest.java | 30 +++
.../flink/table/catalog/hive/HiveCatalogTest.java | 5 +-
.../src/main/codegen/data/Parser.tdd | 8 +
.../src/main/codegen/includes/parserImpls.ftl | 178 ++++++++++++++++-
.../flink/sql/parser/hive/ddl/HiveDDLUtils.java | 21 ++-
.../hive/ddl/SqlAlterHivePartitionRename.java} | 52 +++--
.../sql/parser/hive/ddl/SqlAlterHiveTable.java | 64 +++++++
.../ddl/SqlAlterHiveTableAddReplaceColumn.java | 81 ++++++++
.../hive/ddl/SqlAlterHiveTableChangeColumn.java | 78 ++++++++
.../hive/ddl/SqlAlterHiveTableFileFormat.java | 55 ++++++
.../parser/hive/ddl/SqlAlterHiveTableLocation.java | 56 ++++++
.../parser/hive/ddl/SqlAlterHiveTableProps.java} | 48 ++---
.../parser/hive/ddl/SqlAlterHiveTableSerDe.java | 93 +++++++++
.../parser/hive/FlinkHiveSqlParserImplTest.java | 74 ++++++++
.../apache/flink/sql/parser/SqlPartitionUtils.java | 61 ++++++
...leProperties.java => SqlAddReplaceColumns.java} | 67 ++++---
.../apache/flink/sql/parser/ddl/SqlAlterTable.java | 37 +++-
.../sql/parser/ddl/SqlAlterTableProperties.java | 8 +-
.../flink/sql/parser/ddl/SqlChangeColumn.java | 109 +++++++++++
.../table/api/internal/TableEnvironmentImpl.java | 13 ++
.../apache/flink/table/catalog/CatalogManager.java | 20 ++
.../flink/table/operations/OperationUtils.java | 12 ++
.../operations/ddl/AlterPartitionOperation.java | 39 ++++
.../ddl/AlterPartitionPropertiesOperation.java | 49 +++++
.../operations/ddl/AlterTableSchemaOperation.java | 46 +++++
.../operations/SqlToOperationConverter.java | 138 ++++++++------
.../planner/utils/OperationConverterUtils.java | 193 +++++++++++++++++++
30 files changed, 1870 insertions(+), 273 deletions(-)
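
For reference, the table-level ALTER statements exercised by the new HiveDialectTest further down look roughly like this. A minimal illustrative sketch, not part of the commit: it assumes a HiveCatalog has already been registered and made the current catalog (elided here), and uses the same blink-planner batch TableEnvironment the tests create.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableEnvironment;

public class HiveAlterDdlSketch {
    public static void main(String[] args) {
        TableEnvironment tableEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build());
        // Switch to the Hive dialect so the statements below go through the new parser.
        tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
        // Assumption: registerCatalog(...)/useCatalog(...) with a HiveCatalog is done here.

        tableEnv.executeSql("create table tbl (x int) tblproperties ('k1'='v1')");
        tableEnv.executeSql("alter table tbl rename to tbl1");
        tableEnv.executeSql("alter table tbl1 set tblproperties ('k2'='v2')");
        tableEnv.executeSql("alter table tbl1 set location '/tmp/tbl1_new_location'");
        tableEnv.executeSql("alter table tbl1 set fileformat orc");
        tableEnv.executeSql("alter table tbl1 add columns (y int, z string)");
        tableEnv.executeSql("alter table tbl1 change column x x1 string comment 'new x col'");
    }
}
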
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
index 815fe41..5bc469f 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
@@ -20,10 +20,11 @@ package org.apache.flink.table.catalog.hive;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils;
-import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connectors.hive.HiveTableFactory;
import org.apache.flink.sql.parser.hive.ddl.HiveDDLUtils;
+import org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveTable.AlterTableOp;
import org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveDatabase;
+import org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.constraints.UniqueConstraint;
import org.apache.flink.table.catalog.AbstractCatalog;
@@ -37,7 +38,6 @@ import org.apache.flink.table.catalog.CatalogPartitionImpl;
import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CatalogTableImpl;
-import org.apache.flink.table.catalog.CatalogView;
import org.apache.flink.table.catalog.CatalogViewImpl;
import org.apache.flink.table.catalog.FunctionLanguage;
import org.apache.flink.table.catalog.ObjectPath;
@@ -113,6 +113,9 @@ import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
+import static org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveTable.ALTER_COL_CASCADE;
+import static org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveTable.ALTER_TABLE_OP;
+import static org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.HiveTableStoredAs.STORED_AS_FILE_FORMAT;
import static org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.NOT_NULL_COLS;
import static org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.NOT_NULL_CONSTRAINT_TRAITS;
import static org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.PK_CONSTRAINT_TRAIT;
@@ -374,7 +377,7 @@ public class HiveCatalog extends AbstractCatalog {
throw new DatabaseNotExistException(getName(), tablePath.getDatabaseName());
}
- Table hiveTable = instantiateHiveTable(tablePath, table, hiveConf);
+ Table hiveTable = HiveTableUtil.instantiateHiveTable(tablePath, table, hiveConf);
UniqueConstraint pkConstraint = null;
List<String> notNullCols = new ArrayList<>();
@@ -484,18 +487,30 @@ public class HiveCatalog extends AbstractCatalog {
existingTable.getClass().getName(), newCatalogTable.getClass().getName()));
}
- Table newTable = instantiateHiveTable(tablePath, newCatalogTable, hiveConf);
-
- // client.alter_table() requires a valid location
- // thus, if new table doesn't have that, it reuses location of the old table
- if (!newTable.getSd().isSetLocation()) {
- newTable.getSd().setLocation(hiveTable.getSd().getLocation());
+ boolean isGeneric = isGenericForGet(hiveTable.getParameters());
+ if (isGeneric) {
+ hiveTable = HiveTableUtil.alterTableViaCatalogBaseTable(tablePath, newCatalogTable, hiveTable, hiveConf);
+ } else {
+ AlterTableOp op = HiveTableUtil.extractAlterTableOp(newCatalogTable.getOptions());
+ if (op == null) {
+ // the alter operation isn't encoded as properties
+ hiveTable = HiveTableUtil.alterTableViaCatalogBaseTable(tablePath, newCatalogTable, hiveTable, hiveConf);
+ } else {
+ alterTableViaProperties(
+ op,
+ hiveTable,
+ (CatalogTable) newCatalogTable,
+ hiveTable.getParameters(),
+ newCatalogTable.getProperties(),
+ hiveTable.getSd());
+ }
}
+ disallowChangeIsGeneric(isGeneric, isGenericForGet(hiveTable.getParameters()));
try {
- client.alter_table(tablePath.getDatabaseName(), tablePath.getObjectName(), newTable);
+ client.alter_table(tablePath.getDatabaseName(), tablePath.getObjectName(), hiveTable);
} catch (TException e) {
- throw new CatalogException(String.format("Failed to rename table %s", tablePath.getFullName()), e);
+ throw new CatalogException(String.format("Failed to alter table %s", tablePath.getFullName()), e);
}
}
@@ -638,78 +653,6 @@ public class HiveCatalog extends AbstractCatalog {
}
}
- @VisibleForTesting
- protected static Table instantiateHiveTable(ObjectPath tablePath, CatalogBaseTable table, HiveConf hiveConf) {
- if (!(table instanceof CatalogTableImpl) && !(table instanceof CatalogViewImpl)) {
- throw new CatalogException(
- "HiveCatalog only supports CatalogTableImpl and CatalogViewImpl");
- }
- // let Hive set default parameters for us, e.g. serialization.format
- Table hiveTable = org.apache.hadoop.hive.ql.metadata.Table.getEmptyTable(tablePath.getDatabaseName(),
- tablePath.getObjectName());
- hiveTable.setCreateTime((int) (System.currentTimeMillis() / 1000));
-
- Map<String, String> properties = new HashMap<>(table.getProperties());
- // Table comment
- if (table.getComment() != null) {
- properties.put(HiveCatalogConfig.COMMENT, table.getComment());
- }
-
- boolean isGeneric = isGenericForCreate(properties);
-
- // Hive table's StorageDescriptor
- StorageDescriptor sd = hiveTable.getSd();
- HiveTableUtil.setDefaultStorageFormat(sd, hiveConf);
-
- if (isGeneric) {
- DescriptorProperties tableSchemaProps = new DescriptorProperties(true);
- tableSchemaProps.putTableSchema(Schema.SCHEMA, table.getSchema());
-
- if (table instanceof CatalogTable) {
- tableSchemaProps.putPartitionKeys(((CatalogTable) table).getPartitionKeys());
- }
-
- properties.putAll(tableSchemaProps.asMap());
- properties = maskFlinkProperties(properties);
- hiveTable.setParameters(properties);
- } else {
- HiveTableUtil.initiateTableFromProperties(hiveTable, properties, hiveConf);
- List<FieldSchema> allColumns = HiveTableUtil.createHiveColumns(table.getSchema());
- // Table columns and partition keys
- if (table instanceof CatalogTableImpl) {
- CatalogTable catalogTable = (CatalogTableImpl) table;
-
- if (catalogTable.isPartitioned()) {
- int partitionKeySize = catalogTable.getPartitionKeys().size();
- List<FieldSchema> regularColumns = allColumns.subList(0, allColumns.size() - partitionKeySize);
- List<FieldSchema> partitionColumns = allColumns.subList(allColumns.size() - partitionKeySize, allColumns.size());
-
- sd.setCols(regularColumns);
- hiveTable.setPartitionKeys(partitionColumns);
- } else {
- sd.setCols(allColumns);
- hiveTable.setPartitionKeys(new ArrayList<>());
- }
- } else {
- sd.setCols(allColumns);
- }
- // Table properties
- hiveTable.getParameters().putAll(properties);
- }
-
- if (table instanceof CatalogViewImpl) {
- // TODO: [FLINK-12398] Support partitioned view in catalog API
- hiveTable.setPartitionKeys(new ArrayList<>());
-
- CatalogView view = (CatalogView) table;
- hiveTable.setViewOriginalText(view.getOriginalQuery());
- hiveTable.setViewExpandedText(view.getExpandedQuery());
- hiveTable.setTableType(TableType.VIRTUAL_VIEW.name());
- }
-
- return hiveTable;
- }
-
/**
* Filter out Hive-created properties, and return Flink-created properties.
* Note that 'is_generic' is a special key and this method will leave it as-is.
@@ -720,19 +663,6 @@ public class HiveCatalog extends AbstractCatalog {
.collect(Collectors.toMap(e -> e.getKey().replace(FLINK_PROPERTY_PREFIX, ""), e -> e.getValue()));
}
- /**
- * Add a prefix to Flink-created properties to distinguish them from Hive-created properties.
- * Note that 'is_generic' is a special key and this method will leave it as-is.
- */
- private static Map<String, String> maskFlinkProperties(Map<String, String> properties) {
- return properties.entrySet().stream()
- .filter(e -> e.getKey() != null && e.getValue() != null)
- .map(e -> new Tuple2<>(
- e.getKey().equals(CatalogConfig.IS_GENERIC) ? e.getKey() : FLINK_PROPERTY_PREFIX + e.getKey(),
- e.getValue()))
- .collect(Collectors.toMap(t -> t.f0, t -> t.f1));
- }
-
// ------ partitions ------
@Override
@@ -888,7 +818,7 @@ public class HiveCatalog extends AbstractCatalog {
Map<String, String> properties = hivePartition.getParameters();
- properties.put(HiveCatalogConfig.PARTITION_LOCATION, hivePartition.getSd().getLocation());
+ properties.put(SqlCreateHiveTable.TABLE_LOCATION_URI, hivePartition.getSd().getLocation());
String comment = properties.remove(HiveCatalogConfig.COMMENT);
@@ -919,21 +849,28 @@ public class HiveCatalog extends AbstractCatalog {
try {
Table hiveTable = getHiveTable(tablePath);
ensureTableAndPartitionMatch(hiveTable, newPartition);
- Partition oldHivePartition = getHivePartition(hiveTable, partitionSpec);
- if (oldHivePartition == null) {
+ Partition hivePartition = getHivePartition(hiveTable, partitionSpec);
+ if (hivePartition == null) {
if (ignoreIfNotExists) {
return;
}
throw new PartitionNotExistException(getName(), tablePath, partitionSpec);
}
- Partition newHivePartition = instantiateHivePartition(hiveTable, partitionSpec, newPartition);
- if (newHivePartition.getSd().getLocation() == null) {
- newHivePartition.getSd().setLocation(oldHivePartition.getSd().getLocation());
+ AlterTableOp op = HiveTableUtil.extractAlterTableOp(newPartition.getProperties());
+ if (op == null) {
+ throw new CatalogException(ALTER_TABLE_OP + " is missing for alter table operation");
}
+ alterTableViaProperties(
+ op,
+ null,
+ null,
+ hivePartition.getParameters(),
+ newPartition.getProperties(),
+ hivePartition.getSd());
client.alter_partition(
tablePath.getDatabaseName(),
tablePath.getObjectName(),
- newHivePartition
+ hivePartition
);
} catch (NoSuchObjectException e) {
if (!ignoreIfNotExists) {
@@ -973,10 +910,13 @@ public class HiveCatalog extends AbstractCatalog {
}
// TODO: handle GenericCatalogPartition
StorageDescriptor sd = hiveTable.getSd().deepCopy();
- sd.setLocation(catalogPartition.getProperties().remove(HiveCatalogConfig.PARTITION_LOCATION));
+ sd.setLocation(catalogPartition.getProperties().remove(SqlCreateHiveTable.TABLE_LOCATION_URI));
Map<String, String> properties = new HashMap<>(catalogPartition.getProperties());
- properties.put(HiveCatalogConfig.COMMENT, catalogPartition.getComment());
+ String comment = catalogPartition.getComment();
+ if (comment != null) {
+ properties.put(HiveCatalogConfig.COMMENT, comment);
+ }
return HiveTableUtil.createHivePartition(
hiveTable.getDbName(),
@@ -1051,7 +991,8 @@ public class HiveCatalog extends AbstractCatalog {
return getHivePartition(getHiveTable(tablePath), partitionSpec);
}
- private Partition getHivePartition(Table hiveTable, CatalogPartitionSpec partitionSpec)
+ @VisibleForTesting
+ public Partition getHivePartition(Table hiveTable, CatalogPartitionSpec partitionSpec)
throws PartitionSpecInvalidException, TException {
return client.getPartition(hiveTable.getDbName(), hiveTable.getTableName(),
getOrderedFullPartitionValues(partitionSpec, getFieldNames(hiveTable.getPartitionKeys()),
@@ -1234,7 +1175,7 @@ public class HiveCatalog extends AbstractCatalog {
);
}
- private boolean isTablePartitioned(Table hiveTable) {
+ private static boolean isTablePartitioned(Table hiveTable) {
return hiveTable.getPartitionKeysSize() != 0;
}
@@ -1425,7 +1366,7 @@ public class HiveCatalog extends AbstractCatalog {
}
}
- static boolean isGenericForCreate(Map<String, String> properties) {
+ public static boolean isGenericForCreate(Map<String, String> properties) {
// When creating an object, a hive object needs explicitly have a key is_generic = false
// otherwise, this is a generic object if 1) the key is missing 2) is_generic = true
// this is opposite to reading an object. See getObjectIsGeneric().
@@ -1443,11 +1384,62 @@ public class HiveCatalog extends AbstractCatalog {
return isGeneric;
}
- static boolean isGenericForGet(Map<String, String> properties) {
+ public static boolean isGenericForGet(Map<String, String> properties) {
// When retrieving an object, a generic object needs explicitly have a key is_generic = true
// otherwise, this is a Hive object if 1) the key is missing 2) is_generic = false
// this is opposite to creating an object. See createObjectIsGeneric()
return properties != null && Boolean.parseBoolean(properties.getOrDefault(CatalogConfig.IS_GENERIC, "false"));
}
+ public static void disallowChangeIsGeneric(boolean oldIsGeneric, boolean newIsGeneric) {
+ checkArgument(oldIsGeneric == newIsGeneric, "Changing whether a metadata object is generic is not allowed");
+ }
+
+ private void alterTableViaProperties(
+ AlterTableOp alterOp,
+ Table hiveTable,
+ CatalogTable catalogTable,
+ Map<String, String> oldProps,
+ Map<String, String> newProps,
+ StorageDescriptor sd) {
+ switch (alterOp) {
+ case CHANGE_TBL_PROPS:
+ oldProps.putAll(newProps);
+ break;
+ case CHANGE_LOCATION:
+ HiveTableUtil.extractLocation(sd, newProps);
+ break;
+ case CHANGE_FILE_FORMAT:
+ String newFileFormat = newProps.remove(STORED_AS_FILE_FORMAT);
+ HiveTableUtil.setStorageFormat(sd, newFileFormat, hiveConf);
+ break;
+ case CHANGE_SERDE_PROPS:
+ HiveTableUtil.extractRowFormat(sd, newProps);
+ break;
+ case ALTER_COLUMNS:
+ if (hiveTable == null) {
+ throw new CatalogException("ALTER COLUMNS cannot be done with ALTER PARTITION");
+ }
+
+ HiveTableUtil.alterColumns(hiveTable.getSd(), catalogTable);
+ boolean cascade = Boolean.parseBoolean(newProps.remove(ALTER_COL_CASCADE));
+ if (cascade) {
+ if (!isTablePartitioned(hiveTable)) {
+ throw new CatalogException("ALTER COLUMNS CASCADE for non-partitioned table");
+ }
+ try {
+ for (CatalogPartitionSpec spec : listPartitions(new ObjectPath(hiveTable.getDbName(), hiveTable.getTableName()))) {
+ Partition partition = getHivePartition(hiveTable, spec);
+ HiveTableUtil.alterColumns(partition.getSd(), catalogTable);
+ client.alter_partition(hiveTable.getDbName(), hiveTable.getTableName(), partition);
+ }
+ } catch (Exception e) {
+ throw new CatalogException("Failed to cascade add/replace columns to partitions", e);
+ }
+ }
+ break;
+ default:
+ throw new CatalogException("Unsupported alter table operation " + alterOp);
+ }
+ }
}
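
As the comments above spell out, isGenericForCreate and isGenericForGet (made public here so HiveTableUtil can call them) default in opposite directions. A tiny standalone sketch of that asymmetry, with CatalogConfig.IS_GENERIC inlined; the create-side body is only partly visible in this hunk, so the sketch captures just the documented defaulting rule and nothing more.

import java.util.Collections;
import java.util.Map;

public class IsGenericSketch {
    static final String IS_GENERIC = "is_generic"; // CatalogConfig.IS_GENERIC

    // Creating: only an explicit is_generic=false makes it a Hive object.
    static boolean isGenericForCreate(Map<String, String> props) {
        return Boolean.parseBoolean(props.getOrDefault(IS_GENERIC, "true"));
    }

    // Reading back: only an explicit is_generic=true makes it a generic object.
    static boolean isGenericForGet(Map<String, String> props) {
        return props != null && Boolean.parseBoolean(props.getOrDefault(IS_GENERIC, "false"));
    }

    public static void main(String[] args) {
        Map<String, String> noKey = Collections.emptyMap();
        System.out.println(isGenericForCreate(noKey)); // true  -> generic on create
        System.out.println(isGenericForGet(noKey));    // false -> Hive object on get
    }
}
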
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTableUtil.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTableUtil.java
index 656a93a..a390fbd 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTableUtil.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTableUtil.java
@@ -18,10 +18,24 @@
package org.apache.flink.table.catalog.hive.util;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveTable;
import org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.HiveTableRowFormat;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.constraints.UniqueConstraint;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogTableImpl;
+import org.apache.flink.table.catalog.CatalogView;
+import org.apache.flink.table.catalog.CatalogViewImpl;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.config.CatalogConfig;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.hive.HiveCatalog;
+import org.apache.flink.table.catalog.hive.HiveCatalogConfig;
import org.apache.flink.table.catalog.hive.client.HiveShim;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.table.expressions.CallExpression;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.expressions.ExpressionVisitor;
@@ -54,6 +68,7 @@ import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
+import static org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveTable.ALTER_TABLE_OP;
import static org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.HiveTableRowFormat.SERDE_INFO_PROP_PREFIX;
import static org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.HiveTableRowFormat.SERDE_LIB_CLASS_NAME;
import static org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.HiveTableStoredAs.STORED_AS_FILE_FORMAT;
@@ -61,6 +76,7 @@ import static org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.HiveTableS
import static org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.HiveTableStoredAs.STORED_AS_OUTPUT_FORMAT;
import static org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.TABLE_IS_EXTERNAL;
import static org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.TABLE_LOCATION_URI;
+import static org.apache.flink.table.catalog.config.CatalogConfig.FLINK_PROPERTY_PREFIX;
import static org.apache.flink.util.Preconditions.checkArgument;
/**
@@ -273,6 +289,115 @@ public class HiveTableUtil {
setStorageFormat(sd, hiveConf.getVar(HiveConf.ConfVars.HIVEDEFAULTFILEFORMAT), hiveConf);
}
+ public static void alterColumns(StorageDescriptor sd, CatalogTable catalogTable) {
+ List<FieldSchema> allCols = HiveTableUtil.createHiveColumns(catalogTable.getSchema());
+ List<FieldSchema> nonPartCols = allCols.subList(0, allCols.size() - catalogTable.getPartitionKeys().size());
+ sd.setCols(nonPartCols);
+ }
+
+ public static SqlAlterHiveTable.AlterTableOp extractAlterTableOp(Map<String, String> props) {
+ String opStr = props.remove(ALTER_TABLE_OP);
+ if (opStr != null) {
+ return SqlAlterHiveTable.AlterTableOp.valueOf(opStr);
+ }
+ return null;
+ }
+
+ public static Table alterTableViaCatalogBaseTable(
+ ObjectPath tablePath, CatalogBaseTable baseTable, Table oldHiveTable, HiveConf hiveConf) {
+ Table newHiveTable = instantiateHiveTable(tablePath, baseTable, hiveConf);
+ // client.alter_table() requires a valid location
+ // thus, if new table doesn't have that, it reuses location of the old table
+ if (!newHiveTable.getSd().isSetLocation()) {
+ newHiveTable.getSd().setLocation(oldHiveTable.getSd().getLocation());
+ }
+ return newHiveTable;
+ }
+
+ public static Table instantiateHiveTable(ObjectPath tablePath, CatalogBaseTable table, HiveConf hiveConf) {
+ if (!(table instanceof CatalogTableImpl) && !(table instanceof CatalogViewImpl)) {
+ throw new CatalogException(
+ "HiveCatalog only supports CatalogTableImpl and CatalogViewImpl");
+ }
+ // let Hive set default parameters for us, e.g. serialization.format
+ Table hiveTable = org.apache.hadoop.hive.ql.metadata.Table.getEmptyTable(tablePath.getDatabaseName(),
+ tablePath.getObjectName());
+ hiveTable.setCreateTime((int) (System.currentTimeMillis() / 1000));
+
+ Map<String, String> properties = new HashMap<>(table.getProperties());
+ // Table comment
+ if (table.getComment() != null) {
+ properties.put(HiveCatalogConfig.COMMENT, table.getComment());
+ }
+
+ boolean isGeneric = HiveCatalog.isGenericForCreate(properties);
+
+ // Hive table's StorageDescriptor
+ StorageDescriptor sd = hiveTable.getSd();
+ HiveTableUtil.setDefaultStorageFormat(sd, hiveConf);
+
+ if (isGeneric) {
+ DescriptorProperties tableSchemaProps = new DescriptorProperties(true);
+ tableSchemaProps.putTableSchema(Schema.SCHEMA, table.getSchema());
+
+ if (table instanceof CatalogTable) {
+ tableSchemaProps.putPartitionKeys(((CatalogTable) table).getPartitionKeys());
+ }
+
+ properties.putAll(tableSchemaProps.asMap());
+ properties = maskFlinkProperties(properties);
+ hiveTable.setParameters(properties);
+ } else {
+ HiveTableUtil.initiateTableFromProperties(hiveTable, properties, hiveConf);
+ List<FieldSchema> allColumns = HiveTableUtil.createHiveColumns(table.getSchema());
+ // Table columns and partition keys
+ if (table instanceof CatalogTableImpl) {
+ CatalogTable catalogTable = (CatalogTableImpl) table;
+
+ if (catalogTable.isPartitioned()) {
+ int partitionKeySize = catalogTable.getPartitionKeys().size();
+ List<FieldSchema> regularColumns = allColumns.subList(0, allColumns.size() - partitionKeySize);
+ List<FieldSchema> partitionColumns = allColumns.subList(allColumns.size() - partitionKeySize, allColumns.size());
+
+ sd.setCols(regularColumns);
+ hiveTable.setPartitionKeys(partitionColumns);
+ } else {
+ sd.setCols(allColumns);
+ hiveTable.setPartitionKeys(new ArrayList<>());
+ }
+ } else {
+ sd.setCols(allColumns);
+ }
+ // Table properties
+ hiveTable.getParameters().putAll(properties);
+ }
+
+ if (table instanceof CatalogViewImpl) {
+ // TODO: [FLINK-12398] Support partitioned view in catalog API
+ hiveTable.setPartitionKeys(new ArrayList<>());
+
+ CatalogView view = (CatalogView) table;
+ hiveTable.setViewOriginalText(view.getOriginalQuery());
+ hiveTable.setViewExpandedText(view.getExpandedQuery());
+ hiveTable.setTableType(TableType.VIRTUAL_VIEW.name());
+ }
+
+ return hiveTable;
+ }
+
+ /**
+ * Add a prefix to Flink-created properties to distinguish them from Hive-created properties.
+ * Note that 'is_generic' is a special key and this method will leave it as-is.
+ */
+ public static Map<String, String> maskFlinkProperties(Map<String, String> properties) {
+ return properties.entrySet().stream()
+ .filter(e -> e.getKey() != null && e.getValue() != null)
+ .map(e -> new Tuple2<>(
+ e.getKey().equals(CatalogConfig.IS_GENERIC) ? e.getKey() : FLINK_PROPERTY_PREFIX + e.getKey(),
+ e.getValue()))
+ .collect(Collectors.toMap(t -> t.f0, t -> t.f1));
+ }
+
private static class ExpressionExtractor implements ExpressionVisitor<String> {
// maps a supported function to its name
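
maskFlinkProperties above and retrieveFlinkProperties in HiveCatalog form a round trip: Flink-created properties are stored in the Hive metastore under a prefix and stripped of it when read back, with 'is_generic' left as-is in both directions. A standalone sketch of that round trip; the prefix value and the retrieve-side filter are assumptions here (only the collect step of retrieveFlinkProperties appears in the hunk above).

import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class FlinkPropertyMaskingSketch {
    static final String FLINK_PROPERTY_PREFIX = "flink."; // assumed value of CatalogConfig.FLINK_PROPERTY_PREFIX
    static final String IS_GENERIC = "is_generic";

    // Mirrors maskFlinkProperties: prefix everything except is_generic.
    static Map<String, String> mask(Map<String, String> props) {
        return props.entrySet().stream()
                .filter(e -> e.getKey() != null && e.getValue() != null)
                .collect(Collectors.toMap(
                        e -> e.getKey().equals(IS_GENERIC) ? e.getKey() : FLINK_PROPERTY_PREFIX + e.getKey(),
                        Map.Entry::getValue));
    }

    // Sketch of retrieveFlinkProperties: keep prefixed keys (and is_generic), drop the prefix.
    static Map<String, String> retrieve(Map<String, String> hiveParams) {
        return hiveParams.entrySet().stream()
                .filter(e -> e.getKey().startsWith(FLINK_PROPERTY_PREFIX) || e.getKey().equals(IS_GENERIC))
                .collect(Collectors.toMap(
                        e -> e.getKey().replace(FLINK_PROPERTY_PREFIX, ""),
                        Map.Entry::getValue));
    }

    public static void main(String[] args) {
        Map<String, String> flinkProps = new HashMap<>();
        flinkProps.put("connector.type", "filesystem");
        flinkProps.put(IS_GENERIC, "true");
        Map<String, String> masked = mask(flinkProps);
        System.out.println(masked);           // contains flink.connector.type=filesystem and is_generic=true
        System.out.println(retrieve(masked)); // back to connector.type=filesystem and is_generic=true
    }
}
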
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectTest.java
index e90691d..ede4c54 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectTest.java
@@ -24,6 +24,8 @@ import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.CatalogPartitionImpl;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.config.CatalogConfig;
@@ -37,13 +39,18 @@ import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.PrincipalType;
import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
+import org.apache.hadoop.hive.ql.io.RCFileOutputFormat;
import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat;
import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFAbs;
import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe;
import org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe;
import org.junit.After;
import org.junit.Assume;
@@ -53,6 +60,8 @@ import org.junit.Test;
import java.io.File;
import java.net.URI;
import java.net.URISyntaxException;
+import java.util.Collections;
+import java.util.LinkedHashMap;
import java.util.List;
import static org.apache.flink.table.api.EnvironmentSettings.DEFAULT_BUILTIN_CATALOG;
@@ -73,6 +82,7 @@ public class HiveDialectTest {
@Before
public void setup() {
hiveCatalog = HiveTestUtils.createHiveCatalog();
+ hiveCatalog.getHiveConf().setBoolVar(HiveConf.ConfVars.METASTORE_DISALLOW_INCOMPATIBLE_COL_TYPE_CHANGES, false);
hiveCatalog.open();
warehouse = hiveCatalog.getHiveConf().getVar(HiveConf.ConfVars.METASTOREWAREHOUSE);
tableEnv = HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode();
@@ -101,7 +111,7 @@ public class HiveDialectTest {
String db2Location = warehouse + "/db2_location";
tableEnv.executeSql(String.format("create database db2 location '%s' with dbproperties('k1'='v1')", db2Location));
db = hiveCatalog.getHiveDatabase("db2");
- assertEquals(db2Location, new URI(db.getLocationUri()).getPath());
+ assertEquals(db2Location, locationPath(db.getLocationUri()));
assertEquals("v1", db.getParameters().get("k1"));
}
@@ -217,12 +227,157 @@ public class HiveDialectTest {
assertEquals("[1,0,static, 1,1,a, 1,2,b, 1,3,c, 2,0,static, 2,1,b, 3,0,static, 3,1,c]", results.toString());
}
- private static List<Row> queryResult(org.apache.flink.table.api.Table table) {
- return Lists.newArrayList(table.execute().collect());
+ @Test
+ public void testAlterTable() throws Exception {
+ tableEnv.executeSql("create table tbl (x int) tblproperties('k1'='v1')");
+ tableEnv.executeSql("alter table tbl rename to tbl1");
+
+ ObjectPath tablePath = new ObjectPath("default", "tbl1");
+
+ // change properties
+ tableEnv.executeSql("alter table tbl1 set tblproperties ('k2'='v2')");
+ Table hiveTable = hiveCatalog.getHiveTable(tablePath);
+ assertEquals("v1", hiveTable.getParameters().get("k1"));
+ assertEquals("v2", hiveTable.getParameters().get("k2"));
+
+ // change location
+ String newLocation = warehouse + "/tbl1_new_location";
+ tableEnv.executeSql(String.format("alter table `default`.tbl1 set location '%s'", newLocation));
+ hiveTable = hiveCatalog.getHiveTable(tablePath);
+ assertEquals(newLocation, locationPath(hiveTable.getSd().getLocation()));
+
+ // change file format
+ tableEnv.executeSql("alter table tbl1 set fileformat orc");
+ hiveTable = hiveCatalog.getHiveTable(tablePath);
+ assertEquals(OrcSerde.class.getName(), hiveTable.getSd().getSerdeInfo().getSerializationLib());
+ assertEquals(OrcInputFormat.class.getName(), hiveTable.getSd().getInputFormat());
+ assertEquals(OrcOutputFormat.class.getName(), hiveTable.getSd().getOutputFormat());
+
+ // change serde
+ tableEnv.executeSql(String.format("alter table tbl1 set serde '%s' with serdeproperties('%s'='%s')",
+ LazyBinarySerDe.class.getName(), serdeConstants.FIELD_DELIM, "\u0001"));
+ hiveTable = hiveCatalog.getHiveTable(tablePath);
+ assertEquals(LazyBinarySerDe.class.getName(), hiveTable.getSd().getSerdeInfo().getSerializationLib());
+ assertEquals("\u0001", hiveTable.getSd().getSerdeInfo().getParameters().get(serdeConstants.FIELD_DELIM));
+
+ // replace columns
+ tableEnv.executeSql("alter table tbl1 replace columns (t tinyint,s smallint,i int,b bigint,f float,d double,num decimal," +
+ "ts timestamp,dt date,str string,var varchar(10),ch char(123),bool boolean,bin binary)");
+ hiveTable = hiveCatalog.getHiveTable(tablePath);
+ assertEquals(14, hiveTable.getSd().getColsSize());
+ assertEquals("varchar(10)", hiveTable.getSd().getCols().get(10).getType());
+ assertEquals("char(123)", hiveTable.getSd().getCols().get(11).getType());
+
+ tableEnv.executeSql("alter table tbl1 replace columns (a array<array<int>>,s struct<f1:struct<f11:int,f12:binary>, f2:map<double,date>>," +
+ "m map<char(5),map<timestamp,decimal(20,10)>>)");
+ hiveTable = hiveCatalog.getHiveTable(tablePath);
+ assertEquals("array<array<int>>", hiveTable.getSd().getCols().get(0).getType());
+ assertEquals("struct<f1:struct<f11:int,f12:binary>,f2:map<double,date>>", hiveTable.getSd().getCols().get(1).getType());
+ assertEquals("map<char(5),map<timestamp,decimal(20,10)>>", hiveTable.getSd().getCols().get(2).getType());
+
+ // add columns
+ tableEnv.executeSql("alter table tbl1 add columns (x int,y int)");
+ hiveTable = hiveCatalog.getHiveTable(tablePath);
+ assertEquals(5, hiveTable.getSd().getColsSize());
+
+ // change column
+ tableEnv.executeSql("alter table tbl1 change column x x1 string comment 'new x col'");
+ hiveTable = hiveCatalog.getHiveTable(tablePath);
+ assertEquals(5, hiveTable.getSd().getColsSize());
+ FieldSchema newField = hiveTable.getSd().getCols().get(3);
+ assertEquals("x1", newField.getName());
+ assertEquals("string", newField.getType());
+
+ tableEnv.executeSql("alter table tbl1 change column y y int first");
+ hiveTable = hiveCatalog.getHiveTable(tablePath);
+ newField = hiveTable.getSd().getCols().get(0);
+ assertEquals("y", newField.getName());
+ assertEquals("int", newField.getType());
+
+ tableEnv.executeSql("alter table tbl1 change column x1 x2 timestamp after y");
+ hiveTable = hiveCatalog.getHiveTable(tablePath);
+ newField = hiveTable.getSd().getCols().get(1);
+ assertEquals("x2", newField.getName());
+ assertEquals("timestamp", newField.getType());
+
+ // add/replace columns cascade
+ tableEnv.executeSql("create table tbl2 (x int) partitioned by (dt date,id bigint)");
+ ObjectPath tablePath2 = new ObjectPath("default", "tbl2");
+ // TODO: use DDL to add partitions once we support it
+ CatalogPartitionSpec partitionSpec1 = new CatalogPartitionSpec(new LinkedHashMap<String, String>() {{
+ put("dt", "2020-01-23");
+ put("id", "1");
+ }});
+ CatalogPartitionSpec partitionSpec2 = new CatalogPartitionSpec(new LinkedHashMap<String, String>() {{
+ put("dt", "2020-04-24");
+ put("id", "2");
+ }});
+ hiveCatalog.createPartition(tablePath2, partitionSpec1, new CatalogPartitionImpl(Collections.emptyMap(), null), false);
+ hiveCatalog.createPartition(tablePath2, partitionSpec2, new CatalogPartitionImpl(Collections.emptyMap(), null), false);
+ tableEnv.executeSql("alter table tbl2 replace columns (ti tinyint,d decimal) cascade");
+ hiveTable = hiveCatalog.getHiveTable(tablePath2);
+ Partition hivePartition = hiveCatalog.getHivePartition(hiveTable, partitionSpec1);
+ assertEquals(2, hivePartition.getSd().getColsSize());
+ hivePartition = hiveCatalog.getHivePartition(hiveTable, partitionSpec2);
+ assertEquals(2, hivePartition.getSd().getColsSize());
+
+ tableEnv.executeSql("alter table tbl2 add columns (ch char(5),vch varchar(9)) cascade");
+ hivePartition = hiveCatalog.getHivePartition(hiveTable, partitionSpec1);
+ assertEquals(4, hivePartition.getSd().getColsSize());
+ hivePartition = hiveCatalog.getHivePartition(hiveTable, partitionSpec2);
+ assertEquals(4, hivePartition.getSd().getColsSize());
+
+ // change column cascade
+ tableEnv.executeSql("alter table tbl2 change column ch ch char(10) cascade");
+ hivePartition = hiveCatalog.getHivePartition(hiveTable, partitionSpec1);
+ assertEquals("char(10)", hivePartition.getSd().getCols().get(2).getType());
+ hivePartition = hiveCatalog.getHivePartition(hiveTable, partitionSpec2);
+ assertEquals("char(10)", hivePartition.getSd().getCols().get(2).getType());
+
+ tableEnv.executeSql("alter table tbl2 change column vch str string first cascade");
+ hivePartition = hiveCatalog.getHivePartition(hiveTable, partitionSpec1);
+ assertEquals("str", hivePartition.getSd().getCols().get(0).getName());
+ hivePartition = hiveCatalog.getHivePartition(hiveTable, partitionSpec2);
+ assertEquals("str", hivePartition.getSd().getCols().get(0).getName());
}
- private static void waitForJobFinish(TableResult tableResult) throws Exception {
- tableResult.getJobClient().get().getJobExecutionResult(Thread.currentThread().getContextClassLoader()).get();
+ @Test
+ public void testAlterPartition() throws Exception {
+ tableEnv.executeSql("create table tbl (x tinyint,y string) partitioned by (p1 bigint,p2 date)");
+ // TODO: use DDL to add partitions once we support it
+ CatalogPartitionSpec spec1 = new CatalogPartitionSpec(new LinkedHashMap<String, String>() {{
+ put("p1", "1000");
+ put("p2", "2020-05-01");
+ }});
+ CatalogPartitionSpec spec2 = new CatalogPartitionSpec(new LinkedHashMap<String, String>() {{
+ put("p1", "2000");
+ put("p2", "2020-01-01");
+ }});
+ ObjectPath tablePath = new ObjectPath("default", "tbl");
+ hiveCatalog.createPartition(tablePath, spec1, new CatalogPartitionImpl(Collections.emptyMap(), null), false);
+ hiveCatalog.createPartition(tablePath, spec2, new CatalogPartitionImpl(Collections.emptyMap(), null), false);
+
+ Table hiveTable = hiveCatalog.getHiveTable(tablePath);
+
+ // change location
+ String location = warehouse + "/new_part_location";
+ tableEnv.executeSql(String.format("alter table tbl partition (p1=1000,p2='2020-05-01') set location '%s'", location));
+ Partition partition = hiveCatalog.getHivePartition(hiveTable, spec1);
+ assertEquals(location, locationPath(partition.getSd().getLocation()));
+
+ // change file format
+ tableEnv.executeSql("alter table tbl partition (p1=2000,p2='2020-01-01') set fileformat rcfile");
+ partition = hiveCatalog.getHivePartition(hiveTable, spec2);
+ assertEquals(LazyBinaryColumnarSerDe.class.getName(), partition.getSd().getSerdeInfo().getSerializationLib());
+ assertEquals(RCFileInputFormat.class.getName(), partition.getSd().getInputFormat());
+ assertEquals(RCFileOutputFormat.class.getName(), partition.getSd().getOutputFormat());
+
+ // change serde
+ tableEnv.executeSql(String.format("alter table tbl partition (p1=1000,p2='2020-05-01') set serde '%s' with serdeproperties('%s'='%s')",
+ LazyBinarySerDe.class.getName(), serdeConstants.LINE_DELIM, "\n"));
+ partition = hiveCatalog.getHivePartition(hiveTable, spec1);
+ assertEquals(LazyBinarySerDe.class.getName(), partition.getSd().getSerdeInfo().getSerializationLib());
+ assertEquals("\n", partition.getSd().getSerdeInfo().getParameters().get(serdeConstants.LINE_DELIM));
}
@Test
@@ -254,4 +409,12 @@ public class HiveDialectTest {
private static String locationPath(String locationURI) throws URISyntaxException {
return new URI(locationURI).getPath();
}
+
+ private static List<Row> queryResult(org.apache.flink.table.api.Table table) {
+ return Lists.newArrayList(table.execute().collect());
+ }
+
+ private static void waitForJobFinish(TableResult tableResult) throws Exception {
+ tableResult.getJobClient().get().getJobExecutionResult(Thread.currentThread().getContextClassLoader()).get();
+ }
}
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java
index 015995f..88ef5f6 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java
@@ -19,14 +19,17 @@
package org.apache.flink.table.catalog.hive;
import org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveDatabase.AlterHiveDatabaseOp;
+import org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveTable;
import org.apache.flink.table.HiveVersionTestUtil;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.constraints.UniqueConstraint;
import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogPartition;
import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CatalogTableImpl;
+import org.apache.flink.table.catalog.CatalogTestUtil;
import org.apache.flink.table.catalog.config.CatalogConfig;
import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
@@ -55,6 +58,7 @@ import java.util.Map;
import static org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveDatabase.ALTER_DATABASE_OP;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
/**
@@ -189,6 +193,32 @@ public class HiveCatalogHiveMetadataTest extends HiveCatalogMetadataTestBase {
hiveCatalog.dropDatabase(db1, false, true);
}
+ @Override
+ @Test
+ public void testAlterPartition() throws Exception {
+ catalog.createDatabase(db1, createDb(), false);
+ catalog.createTable(path1, createPartitionedTable(), false);
+ catalog.createPartition(path1, createPartitionSpec(), createPartition(), false);
+
+ assertEquals(Collections.singletonList(createPartitionSpec()), catalog.listPartitions(path1));
+ CatalogPartition cp = catalog.getPartition(path1, createPartitionSpec());
+ CatalogTestUtil.checkEquals(createPartition(), cp);
+ assertNull(cp.getProperties().get("k"));
+
+ CatalogPartition another = createPartition();
+ another.getProperties().put("k", "v");
+ another.getProperties().put(SqlAlterHiveTable.ALTER_TABLE_OP, SqlAlterHiveTable.AlterTableOp.CHANGE_TBL_PROPS.name());
+
+ catalog.alterPartition(path1, createPartitionSpec(), another, false);
+
+ assertEquals(Collections.singletonList(createPartitionSpec()), catalog.listPartitions(path1));
+
+ cp = catalog.getPartition(path1, createPartitionSpec());
+
+ CatalogTestUtil.checkEquals(another, cp);
+ assertEquals("v", cp.getProperties().get("k"));
+ }
+
private void checkStatistics(int inputStat, int expectStat) throws Exception {
catalog.dropTable(path1, true);
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogTest.java
index 44a5e5e..e67655e 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.CatalogTableImpl;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.config.CatalogConfig;
+import org.apache.flink.table.catalog.hive.util.HiveTableUtil;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.hadoop.hive.metastore.api.Table;
@@ -46,7 +47,7 @@ public class HiveCatalogTest {
@Test
public void testCreateGenericTable() {
- Table hiveTable = HiveCatalog.instantiateHiveTable(
+ Table hiveTable = HiveTableUtil.instantiateHiveTable(
new ObjectPath("test", "test"),
new CatalogTableImpl(
schema,
@@ -66,7 +67,7 @@ public class HiveCatalogTest {
map.put(CatalogConfig.IS_GENERIC, String.valueOf(false));
- Table hiveTable = HiveCatalog.instantiateHiveTable(
+ Table hiveTable = HiveTableUtil.instantiateHiveTable(
new ObjectPath("test", "test"),
new CatalogTableImpl(
schema,
diff --git a/flink-table/flink-sql-parser-hive/src/main/codegen/data/Parser.tdd b/flink-table/flink-sql-parser-hive/src/main/codegen/data/Parser.tdd
index 52a9b9c..d19ff12 100644
--- a/flink-table/flink-sql-parser-hive/src/main/codegen/data/Parser.tdd
+++ b/flink-table/flink-sql-parser-hive/src/main/codegen/data/Parser.tdd
@@ -27,6 +27,13 @@
"org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveDatabaseLocation"
"org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveDatabaseOwner"
"org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveDatabaseProps"
+ "org.apache.flink.sql.parser.hive.ddl.SqlAlterHivePartitionRename"
+ "org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveTableAddReplaceColumn"
+ "org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveTableChangeColumn"
+ "org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveTableFileFormat"
+ "org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveTableLocation"
+ "org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveTableProps"
+ "org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveTableSerDe"
"org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveDatabase"
"org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable"
"org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.HiveTableCreationContext"
@@ -502,6 +509,7 @@
"SqlShowTables()"
"SqlRichDescribeTable()"
"SqlShowFunctions()"
+ "SqlAlterTable()"
]
# List of methods for parsing custom literals.
diff --git a/flink-table/flink-sql-parser-hive/src/main/codegen/includes/parserImpls.ftl b/flink-table/flink-sql-parser-hive/src/main/codegen/includes/parserImpls.ftl
index d6efd00..d1f83b9 100644
--- a/flink-table/flink-sql-parser-hive/src/main/codegen/includes/parserImpls.ftl
+++ b/flink-table/flink-sql-parser-hive/src/main/codegen/includes/parserImpls.ftl
@@ -771,9 +771,9 @@ SqlTypeNameSpec ExtendedSqlRowTypeName() :
}
/**
-* Parses a partition specifications statement for insert statement.
+* Parses a partition specifications statement that can contain both static and dynamic partitions.
*/
-void InsertPartitionSpec(SqlNodeList allPartKeys, SqlNodeList staticSpec) :
+void PartitionSpecCommaList(SqlNodeList allPartKeys, SqlNodeList staticSpec) :
{
SqlIdentifier key;
SqlNode value;
@@ -861,7 +861,7 @@ SqlNode RichSqlInsert() :
}
]
[
- <PARTITION> InsertPartitionSpec(allPartKeys, staticSpec)
+ <PARTITION> PartitionSpecCommaList(allPartKeys, staticSpec)
]
source = OrderedQueryOrExpr(ExprContext.ACCEPT_QUERY) {
return new RichSqlHiveInsert(s.end(source), keywordList, extendedKeywordList, table, source,
@@ -1090,3 +1090,175 @@ SqlCreate SqlCreateCatalog(Span s, boolean replace) :
propertyList);
}
}
+
+// make sure a feature only applies to table, not partitions
+void EnsureAlterTableOnly(SqlNodeList partitionSpec, String feature) :
+{
+}
+{
+ {
+ if (partitionSpec != null) {
+ throw new ParseException(feature + " is not applicable to partitions.");
+ }
+ }
+}
+
+SqlAlterTable SqlAlterTable() :
+{
+ SqlParserPos startPos;
+ SqlIdentifier tableIdentifier;
+ SqlIdentifier newTableIdentifier = null;
+ SqlNodeList propertyList = SqlNodeList.EMPTY;
+ SqlNodeList partitionSpec = null;
+ boolean ifNotExists = false;
+ boolean ifExists = false;
+}
+{
+ <ALTER> <TABLE> { startPos = getPos(); }
+ tableIdentifier = CompoundIdentifier()
+ [ <PARTITION> { partitionSpec = new SqlNodeList(getPos()); PartitionSpecCommaList(new SqlNodeList(getPos()), partitionSpec); } ]
+ (
+ <RENAME> <TO>
+ (
+ <PARTITION>
+ {
+ SqlNodeList newPartSpec = new SqlNodeList(getPos());
+ PartitionSpecCommaList(new SqlNodeList(getPos()), newPartSpec);
+ return new SqlAlterHivePartitionRename(startPos.plus(getPos()), tableIdentifier, partitionSpec, newPartSpec);
+ }
+ |
+ newTableIdentifier = CompoundIdentifier()
+ {
+ return new SqlAlterTableRename(
+ startPos.plus(getPos()),
+ tableIdentifier,
+ newTableIdentifier);
+ }
+ )
+ |
+ <ADD> <COLUMNS>
+ {
+ EnsureAlterTableOnly(partitionSpec, "Add columns");
+ return SqlAlterHiveTableAddReplaceColumn(startPos, tableIdentifier, false);
+ }
+ |
+ <REPLACE> <COLUMNS>
+ {
+ EnsureAlterTableOnly(partitionSpec, "Replace columns");
+ return SqlAlterHiveTableAddReplaceColumn(startPos, tableIdentifier, true);
+ }
+ |
+ <CHANGE> [ <COLUMN> ]
+ {
+ EnsureAlterTableOnly(partitionSpec, "Change column");
+ return SqlAlterHiveTableChangeColumn(startPos, tableIdentifier);
+ }
+ |
+ <SET>
+ (
+ <TBLPROPERTIES>
+ { EnsureAlterTableOnly(partitionSpec, "Alter table properties"); }
+ propertyList = TableProperties()
+ {
+ return new SqlAlterHiveTableProps(
+ startPos.plus(getPos()),
+ tableIdentifier,
+ propertyList);
+ }
+ |
+ <LOCATION> <QUOTED_STRING>
+ {
+ SqlCharStringLiteral location = createStringLiteral(token.image, getPos());
+ return new SqlAlterHiveTableLocation(startPos.plus(getPos()), tableIdentifier, partitionSpec, location);
+ }
+ |
+ <FILEFORMAT>
+ {
+ SqlIdentifier format = SimpleIdentifier();
+ return new SqlAlterHiveTableFileFormat(startPos.plus(getPos()), tableIdentifier, partitionSpec, format);
+ }
+ |
+ { return SqlAlterHiveTableSerDe(startPos, tableIdentifier, partitionSpec); }
+ )
+ )
+}
+
+SqlAlterTable SqlAlterHiveTableAddReplaceColumn(SqlParserPos startPos, SqlIdentifier tableIdentifier, boolean replace) :
+{
+ SqlNodeList colList;
+ boolean cascade = false;
+}
+{
+ <LPAREN>
+ {
+ List<SqlNode> cols = new ArrayList();
+ }
+ TableColumn2(cols)
+ (
+ <COMMA> TableColumn2(cols)
+ )*
+ <RPAREN>
+ [
+ <CASCADE> { cascade = true; }
+ |
+ <RESTRICT>
+ ]
+ {
+ colList = new SqlNodeList(cols, startPos.plus(getPos()));
+ return new SqlAlterHiveTableAddReplaceColumn(startPos.plus(getPos()),
+ tableIdentifier,
+ cascade,
+ colList,
+ replace);
+ }
+}
+
+SqlAlterTable SqlAlterHiveTableChangeColumn(SqlParserPos startPos, SqlIdentifier tableIdentifier) :
+{
+ boolean cascade = false;
+ SqlIdentifier oldName;
+ SqlIdentifier newName;
+ SqlDataTypeSpec newType;
+ SqlCharStringLiteral comment = null;
+ boolean first = false;
+ SqlIdentifier after = null;
+ SqlNodeList partSpec = null;
+}
+{
+ oldName = SimpleIdentifier()
+ newName = SimpleIdentifier()
+ newType = ExtendedDataType()
+ [ <COMMENT> <QUOTED_STRING> {
+ comment = createStringLiteral(token.image, getPos());
+ }]
+ [ <FIRST> { first = true; } ]
+ [ <AFTER> { after = SimpleIdentifier(); } ]
+ [
+ <CASCADE> { cascade = true; }
+ |
+ <RESTRICT>
+ ]
+ { return new SqlAlterHiveTableChangeColumn(startPos.plus(getPos()),
+ tableIdentifier,
+ cascade,
+ oldName,
+ new SqlTableColumn(newName, newType, null, comment, newName.getParserPosition()),
+ first,
+ after); }
+}
+
+SqlAlterTable SqlAlterHiveTableSerDe(SqlParserPos startPos, SqlIdentifier tableIdentifier, SqlNodeList partitionSpec) :
+{
+ SqlCharStringLiteral serdeLib = null;
+ SqlNodeList propertyList = null;
+}
+{
+ (
+ <SERDE> <QUOTED_STRING>
+ { serdeLib = createStringLiteral(token.image, getPos()); propertyList = new SqlNodeList(getPos()); }
+ [ <WITH> <SERDEPROPERTIES> { propertyList = TableProperties(); } ]
+ |
+ <SERDEPROPERTIES> { propertyList = TableProperties(); }
+ )
+ { return new SqlAlterHiveTableSerDe(startPos.plus(getPos()), tableIdentifier, partitionSpec, propertyList, serdeLib); }
+}
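
Roughly, the partition-qualified forms the new SqlAlterTable() production accepts (taken from testAlterPartition in HiveDialectTest) look like the statements below; the grammar additionally accepts "ALTER TABLE ... PARTITION (...) RENAME TO PARTITION (...)" via SqlAlterHivePartitionRename. Same assumptions as the table-level sketch near the top: Hive dialect enabled and a HiveCatalog set as the current catalog.

import org.apache.flink.table.api.TableEnvironment;

public class HiveAlterPartitionSketch {
    static void run(TableEnvironment tableEnv) {
        tableEnv.executeSql(
                "alter table tbl partition (p1=1000,p2='2020-05-01') set location '/tmp/new_part_location'");
        tableEnv.executeSql(
                "alter table tbl partition (p1=2000,p2='2020-01-01') set fileformat rcfile");
        tableEnv.executeSql(
                "alter table tbl partition (p1=1000,p2='2020-05-01') "
                        + "set serde 'org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe' "
                        + "with serdeproperties ('line.delim'='\n')");
    }
}
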
diff --git a/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/HiveDDLUtils.java b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/HiveDDLUtils.java
index bfbd33a..934e76a 100644
--- a/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/HiveDDLUtils.java
+++ b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/HiveDDLUtils.java
@@ -42,6 +42,7 @@ import java.util.List;
import java.util.Set;
import static org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveDatabase.ALTER_DATABASE_OP;
+import static org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveTable.ALTER_TABLE_OP;
import static org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveDatabase.DATABASE_LOCATION_URI;
import static org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.HiveTableRowFormat.SERDE_INFO_PROP_PREFIX;
import static org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.HiveTableRowFormat.SERDE_LIB_CLASS_NAME;
@@ -72,7 +73,7 @@ public class HiveDDLUtils {
static {
RESERVED_DB_PROPERTIES.addAll(Arrays.asList(ALTER_DATABASE_OP, DATABASE_LOCATION_URI));
- RESERVED_TABLE_PROPERTIES.addAll(Arrays.asList(TABLE_LOCATION_URI,
+ RESERVED_TABLE_PROPERTIES.addAll(Arrays.asList(ALTER_TABLE_OP, TABLE_LOCATION_URI,
TABLE_IS_EXTERNAL, PK_CONSTRAINT_TRAIT, NOT_NULL_CONSTRAINT_TRAITS,
STORED_AS_FILE_FORMAT, STORED_AS_INPUT_FORMAT, STORED_AS_OUTPUT_FORMAT, SERDE_LIB_CLASS_NAME));
@@ -302,14 +303,18 @@ public class HiveDDLUtils {
public static SqlNodeList deepCopyColList(SqlNodeList colList) {
SqlNodeList res = new SqlNodeList(colList.getParserPosition());
for (SqlNode node : colList) {
- SqlTableColumn col = (SqlTableColumn) node;
- res.add(new SqlTableColumn(
- col.getName(),
- col.getType(),
- col.getConstraint().orElse(null),
- col.getComment().orElse(null),
- col.getParserPosition()));
+ res.add(deepCopyTableColumn((SqlTableColumn) node));
}
return res;
}
+
+ public static SqlTableColumn deepCopyTableColumn(SqlTableColumn column) {
+ return new SqlTableColumn(
+ column.getName(),
+ column.getType(),
+ column.getConstraint().orElse(null),
+ column.getComment().orElse(null),
+ column.getParserPosition()
+ );
+ }
}
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableProperties.java b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlAlterHivePartitionRename.java
similarity index 56%
copy from flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableProperties.java
copy to flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlAlterHivePartitionRename.java
index 8c5412c..68d6461 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableProperties.java
+++ b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlAlterHivePartitionRename.java
@@ -16,7 +16,10 @@
* limitations under the License.
*/
-package org.apache.flink.sql.parser.ddl;
+package org.apache.flink.sql.parser.hive.ddl;
+
+import org.apache.flink.sql.parser.ddl.SqlAlterTable;
+import org.apache.flink.sql.parser.hive.impl.ParseException;
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlNode;
@@ -25,52 +28,43 @@ import org.apache.calcite.sql.SqlWriter;
import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.calcite.util.ImmutableNullableList;
-import java.util.List;
+import javax.annotation.Nonnull;
-import static java.util.Objects.requireNonNull;
+import java.util.List;
/**
- * ALTER TABLE [[catalogName.] dataBasesName].tableName SET ( name=value [, name=value]*).
+ * ALTER TABLE DDL to change the specifications of a Hive partition.
*/
-public class SqlAlterTableProperties extends SqlAlterTable {
+public class SqlAlterHivePartitionRename extends SqlAlterTable {
- private final SqlNodeList propertyList;
+ private final SqlNodeList newPartSpec;
- public SqlAlterTableProperties(SqlParserPos pos, SqlIdentifier tableName, SqlNodeList propertyList) {
- super(pos, tableName);
- this.propertyList = requireNonNull(propertyList, "propertyList should not be null");
+ public SqlAlterHivePartitionRename(SqlParserPos pos, SqlIdentifier tableName,
+ SqlNodeList partSpec, SqlNodeList newPartSpec) throws ParseException {
+ super(pos, tableName, partSpec);
+ if (partSpec == null || newPartSpec == null) {
+ throw new ParseException("Both old and new partition spec have to be specified");
+ }
+ this.newPartSpec = newPartSpec;
}
+ @Nonnull
@Override
public List<SqlNode> getOperandList() {
- return ImmutableNullableList.of(tableIdentifier, propertyList);
- }
-
- public SqlNodeList getPropertyList() {
- return propertyList;
+ return ImmutableNullableList.of(tableIdentifier, getPartitionSpec(), newPartSpec);
}
@Override
public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
super.unparse(writer, leftPrec, rightPrec);
- writer.keyword("SET");
- SqlWriter.Frame withFrame = writer.startList("(", ")");
- for (SqlNode property : propertyList) {
- printIndent(writer);
- property.unparse(writer, leftPrec, rightPrec);
- }
writer.newlineAndIndent();
- writer.endList(withFrame);
- }
-
- private void printIndent(SqlWriter writer) {
- writer.sep(",", false);
+ writer.keyword("RENAME TO");
writer.newlineAndIndent();
- writer.print(" ");
+ writer.keyword("PARTITION");
+ newPartSpec.unparse(writer, getOperator().getLeftPrec(), getOperator().getRightPrec());
}
- public String[] fullTableName() {
- return tableIdentifier.names.toArray(new String[0]);
+ public SqlNodeList getNewPartSpec() {
+ return newPartSpec;
}
-
}
diff --git a/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlAlterHiveTable.java b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlAlterHiveTable.java
new file mode 100644
index 0000000..499c949
--- /dev/null
+++ b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlAlterHiveTable.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.hive.ddl;
+
+import org.apache.flink.sql.parser.ddl.SqlAlterTableProperties;
+
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+
+/**
+ * Abstract class for ALTER DDL of a Hive table.
+ * Any ALTER TABLE operations that need to be encoded as table properties should extend this class.
+ */
+public abstract class SqlAlterHiveTable extends SqlAlterTableProperties {
+
+ public static final String ALTER_TABLE_OP = "alter.table.op";
+ public static final String ALTER_COL_CASCADE = "alter.column.cascade";
+
+ public SqlAlterHiveTable(AlterTableOp op, SqlParserPos pos, SqlIdentifier tableName,
+ SqlNodeList partSpec, SqlNodeList propertyList) {
+ super(pos, tableName, partSpec, propertyList);
+ propertyList.add(HiveDDLUtils.toTableOption(ALTER_TABLE_OP, op.name(), pos));
+ }
+
+ @Override
+ public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+ writer.keyword("ALTER TABLE");
+ tableIdentifier.unparse(writer, leftPrec, rightPrec);
+ SqlNodeList partitionSpec = getPartitionSpec();
+ if (partitionSpec != null && partitionSpec.size() > 0) {
+ writer.keyword("PARTITION");
+ partitionSpec.unparse(writer, getOperator().getLeftPrec(), getOperator().getRightPrec());
+ }
+ }
+
+ /**
+ * Type of ALTER TABLE operation.
+ */
+ public enum AlterTableOp {
+ CHANGE_TBL_PROPS,
+ CHANGE_SERDE_PROPS,
+ CHANGE_FILE_FORMAT,
+ CHANGE_LOCATION,
+ ALTER_COLUMNS
+ }
+}
diff --git a/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlAlterHiveTableAddReplaceColumn.java b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlAlterHiveTableAddReplaceColumn.java
new file mode 100644
index 0000000..87b1213
--- /dev/null
+++ b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlAlterHiveTableAddReplaceColumn.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.hive.ddl;
+
+import org.apache.flink.sql.parser.ddl.SqlAddReplaceColumns;
+import org.apache.flink.sql.parser.hive.impl.ParseException;
+
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+
+/**
+ * ALTER DDL to ADD or REPLACE columns for a Hive table.
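+ *
+ * <p>Example: {@code ALTER TABLE tbl ADD COLUMNS (a FLOAT, b TIMESTAMP) CASCADE}.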
+ */
+public class SqlAlterHiveTableAddReplaceColumn extends SqlAddReplaceColumns {
+
+ private final SqlNodeList origColumns;
+ private final boolean cascade;
+
+ public SqlAlterHiveTableAddReplaceColumn(SqlParserPos pos, SqlIdentifier tableName,
+ boolean cascade, SqlNodeList columns, boolean replace) throws ParseException {
+ super(pos, tableName, columns, replace, new SqlNodeList(pos));
+ this.origColumns = HiveDDLUtils.deepCopyColList(columns);
+ HiveDDLUtils.convertDataTypes(columns);
+ this.cascade = cascade;
+ // set ALTER OP
+ getProperties().add(HiveDDLUtils.toTableOption(
+ SqlAlterHiveTable.ALTER_TABLE_OP, SqlAlterHiveTable.AlterTableOp.ALTER_COLUMNS.name(), pos));
+ // set cascade
+ if (cascade) {
+ getProperties().add(HiveDDLUtils.toTableOption(SqlAlterHiveTable.ALTER_COL_CASCADE, "true", pos));
+ }
+ }
+
+ @Override
+ public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+ writer.keyword("ALTER TABLE");
+ tableIdentifier.unparse(writer, leftPrec, rightPrec);
+ SqlNodeList partitionSpec = getPartitionSpec();
+ if (partitionSpec != null && partitionSpec.size() > 0) {
+ writer.keyword("PARTITION");
+ partitionSpec.unparse(writer, getOperator().getLeftPrec(), getOperator().getRightPrec());
+ }
+ if (isReplace()) {
+ writer.keyword("REPLACE");
+ } else {
+ writer.keyword("ADD");
+ }
+ writer.keyword("COLUMNS");
+ SqlWriter.Frame frame = writer.startList(SqlWriter.FrameTypeEnum.create("sds"), "(", ")");
+ for (SqlNode column : origColumns) {
+ printIndent(writer);
+ column.unparse(writer, leftPrec, rightPrec);
+ }
+ writer.newlineAndIndent();
+ writer.endList(frame);
+ if (cascade) {
+ writer.keyword("CASCADE");
+ } else {
+ writer.keyword("RESTRICT");
+ }
+ }
+}
diff --git a/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlAlterHiveTableChangeColumn.java b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlAlterHiveTableChangeColumn.java
new file mode 100644
index 0000000..413518f
--- /dev/null
+++ b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlAlterHiveTableChangeColumn.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.hive.ddl;
+
+import org.apache.flink.sql.parser.ddl.SqlChangeColumn;
+import org.apache.flink.sql.parser.ddl.SqlTableColumn;
+import org.apache.flink.sql.parser.hive.impl.ParseException;
+
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+
+/**
+ * ALTER DDL to change a column's name, type, position, etc.
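+ *
+ * <p>Example: {@code ALTER TABLE tbl CHANGE COLUMN c c DECIMAL(5, 2) COMMENT 'new comment' FIRST CASCADE}.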
+ */
+public class SqlAlterHiveTableChangeColumn extends SqlChangeColumn {
+
+ private final SqlTableColumn origNewColumn;
+ private final boolean cascade;
+
+ public SqlAlterHiveTableChangeColumn(SqlParserPos pos, SqlIdentifier tableName, boolean cascade,
+ SqlIdentifier oldName, SqlTableColumn newColumn, boolean first, SqlIdentifier after) throws ParseException {
+ super(pos, tableName, oldName, newColumn, after, first, new SqlNodeList(pos));
+ this.origNewColumn = HiveDDLUtils.deepCopyTableColumn(newColumn);
+ HiveDDLUtils.convertDataTypes(newColumn);
+ this.cascade = cascade;
+ // set ALTER OP
+ getProperties().add(HiveDDLUtils.toTableOption(
+ SqlAlterHiveTable.ALTER_TABLE_OP, SqlAlterHiveTable.AlterTableOp.ALTER_COLUMNS.name(), pos));
+ // set cascade
+ if (cascade) {
+ getProperties().add(HiveDDLUtils.toTableOption(SqlAlterHiveTable.ALTER_COL_CASCADE, "true", pos));
+ }
+ }
+
+ @Override
+ public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+ writer.keyword("ALTER TABLE");
+ tableIdentifier.unparse(writer, leftPrec, rightPrec);
+ SqlNodeList partitionSpec = getPartitionSpec();
+ if (partitionSpec != null && partitionSpec.size() > 0) {
+ writer.keyword("PARTITION");
+ partitionSpec.unparse(writer, getOperator().getLeftPrec(), getOperator().getRightPrec());
+ }
+ writer.keyword("CHANGE COLUMN");
+ getOldName().unparse(writer, leftPrec, rightPrec);
+ origNewColumn.unparse(writer, leftPrec, rightPrec);
+ if (isFirst()) {
+ writer.keyword("FIRST");
+ }
+ if (getAfter() != null) {
+ writer.keyword("AFTER");
+ getAfter().unparse(writer, leftPrec, rightPrec);
+ }
+ if (cascade) {
+ writer.keyword("CASCADE");
+ } else {
+ writer.keyword("RESTRICT");
+ }
+ }
+}
diff --git a/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlAlterHiveTableFileFormat.java b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlAlterHiveTableFileFormat.java
new file mode 100644
index 0000000..2041ca3
--- /dev/null
+++ b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlAlterHiveTableFileFormat.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.hive.ddl;
+
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+
+import static org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveTable.AlterTableOp.CHANGE_FILE_FORMAT;
+
+/**
+ * ALTER TABLE DDL to change a Hive table/partition's file format.
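+ *
+ * <p>Example: {@code ALTER TABLE tbl PARTITION (p=1) SET FILEFORMAT SEQUENCEFILE}.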
+ */
+public class SqlAlterHiveTableFileFormat extends SqlAlterHiveTable {
+
+ private final SqlIdentifier format;
+
+ public SqlAlterHiveTableFileFormat(SqlParserPos pos, SqlIdentifier tableName, SqlNodeList partSpec, SqlIdentifier format) {
+ super(CHANGE_FILE_FORMAT, pos, tableName, partSpec, createPropList(format));
+ this.format = format;
+ }
+
+ @Override
+ public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+ super.unparse(writer, leftPrec, rightPrec);
+ writer.keyword("SET FILEFORMAT");
+ format.unparse(writer, leftPrec, rightPrec);
+ }
+
+ private static SqlNodeList createPropList(SqlIdentifier format) {
+ SqlNodeList res = new SqlNodeList(format.getParserPosition());
+ res.add(HiveDDLUtils.toTableOption(
+ SqlCreateHiveTable.HiveTableStoredAs.STORED_AS_FILE_FORMAT,
+ format.getSimple(),
+ format.getParserPosition()));
+ return res;
+ }
+}
diff --git a/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlAlterHiveTableLocation.java b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlAlterHiveTableLocation.java
new file mode 100644
index 0000000..f3a470d
--- /dev/null
+++ b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlAlterHiveTableLocation.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.hive.ddl;
+
+import org.apache.calcite.sql.SqlCharStringLiteral;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+
+import static org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveTable.AlterTableOp.CHANGE_LOCATION;
+
+/**
+ * ALTER TABLE DDL to change a Hive table/partition's location.
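+ *
+ * <p>Example: {@code ALTER TABLE tbl PARTITION (p1=1, p2='a') SET LOCATION '/new/partition/location'}.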
+ */
+public class SqlAlterHiveTableLocation extends SqlAlterHiveTable {
+
+ private final SqlCharStringLiteral location;
+
+ public SqlAlterHiveTableLocation(SqlParserPos pos, SqlIdentifier tableName, SqlNodeList partitionSpec,
+ SqlCharStringLiteral location) {
+ super(CHANGE_LOCATION, pos, tableName, partitionSpec, createPropList(location));
+ this.location = location;
+ }
+
+ @Override
+ public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+ super.unparse(writer, leftPrec, rightPrec);
+ writer.keyword("SET LOCATION");
+ location.unparse(writer, leftPrec, rightPrec);
+ }
+
+ private static SqlNodeList createPropList(SqlCharStringLiteral location) {
+ SqlNodeList res = new SqlNodeList(location.getParserPosition());
+ res.add(HiveDDLUtils.toTableOption(
+ SqlCreateHiveTable.TABLE_LOCATION_URI,
+ location, location.getParserPosition()));
+ return res;
+ }
+}
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableProperties.java b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlAlterHiveTableProps.java
similarity index 56%
copy from flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableProperties.java
copy to flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlAlterHiveTableProps.java
index 8c5412c..324d588 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableProperties.java
+++ b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlAlterHiveTableProps.java
@@ -16,61 +16,43 @@
* limitations under the License.
*/
-package org.apache.flink.sql.parser.ddl;
+package org.apache.flink.sql.parser.hive.ddl;
+
+import org.apache.flink.sql.parser.hive.impl.ParseException;
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.sql.SqlWriter;
import org.apache.calcite.sql.parser.SqlParserPos;
-import org.apache.calcite.util.ImmutableNullableList;
-
-import java.util.List;
-import static java.util.Objects.requireNonNull;
+import static org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveTable.AlterTableOp.CHANGE_TBL_PROPS;
/**
- * ALTER TABLE [[catalogName.] dataBasesName].tableName SET ( name=value [, name=value]*).
+ * ALTER DDL to change properties of a Hive table.
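+ *
+ * <p>Example (illustrative): {@code ALTER TABLE tbl SET TBLPROPERTIES ('k1'='v1')}.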
*/
-public class SqlAlterTableProperties extends SqlAlterTable {
+public class SqlAlterHiveTableProps extends SqlAlterHiveTable {
- private final SqlNodeList propertyList;
-
- public SqlAlterTableProperties(SqlParserPos pos, SqlIdentifier tableName, SqlNodeList propertyList) {
- super(pos, tableName);
- this.propertyList = requireNonNull(propertyList, "propertyList should not be null");
- }
-
- @Override
- public List<SqlNode> getOperandList() {
- return ImmutableNullableList.of(tableIdentifier, propertyList);
- }
+ private final SqlNodeList origProps;
- public SqlNodeList getPropertyList() {
- return propertyList;
+ public SqlAlterHiveTableProps(SqlParserPos pos, SqlIdentifier tableName, SqlNodeList propertyList)
+ throws ParseException {
+ super(CHANGE_TBL_PROPS, pos, tableName, null, HiveDDLUtils.checkReservedTableProperties(propertyList));
+ // remove the last property which is the ALTER_TABLE_OP
+ this.origProps = new SqlNodeList(propertyList.getList().subList(0, propertyList.size() - 1),
+ propertyList.getParserPosition());
}
@Override
public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
super.unparse(writer, leftPrec, rightPrec);
- writer.keyword("SET");
+ writer.keyword("SET TBLPROPERTIES");
SqlWriter.Frame withFrame = writer.startList("(", ")");
- for (SqlNode property : propertyList) {
+ for (SqlNode property : getPropertyList()) {
printIndent(writer);
property.unparse(writer, leftPrec, rightPrec);
}
writer.newlineAndIndent();
writer.endList(withFrame);
}
-
- private void printIndent(SqlWriter writer) {
- writer.sep(",", false);
- writer.newlineAndIndent();
- writer.print(" ");
- }
-
- public String[] fullTableName() {
- return tableIdentifier.names.toArray(new String[0]);
- }
-
}
diff --git a/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlAlterHiveTableSerDe.java b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlAlterHiveTableSerDe.java
new file mode 100644
index 0000000..f9ce8d2
--- /dev/null
+++ b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlAlterHiveTableSerDe.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.hive.ddl;
+
+import org.apache.flink.sql.parser.ddl.SqlTableOption;
+import org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.HiveTableRowFormat;
+import org.apache.flink.sql.parser.hive.impl.ParseException;
+
+import org.apache.calcite.sql.SqlCharStringLiteral;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+
+import static org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveTable.AlterTableOp.CHANGE_SERDE_PROPS;
+
+/**
+ * ALTER TABLE DDL to change a Hive table's SerDe.
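+ *
+ * <p>Example (illustrative): {@code ALTER TABLE tbl SET SERDE 'serde.class' WITH SERDEPROPERTIES ('field.delim'=',')}.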
+ */
+public class SqlAlterHiveTableSerDe extends SqlAlterHiveTable {
+
+ private final SqlCharStringLiteral serdeLib;
+ private final SqlNodeList origSerDeProps;
+
+ public SqlAlterHiveTableSerDe(SqlParserPos pos, SqlIdentifier tableName, SqlNodeList partitionSpec,
+ SqlNodeList propertyList, SqlCharStringLiteral serdeLib) throws ParseException {
+ super(CHANGE_SERDE_PROPS, pos, tableName, partitionSpec, HiveDDLUtils.checkReservedTableProperties(propertyList));
+ // remove the last property which is the ALTER_TABLE_OP
+ origSerDeProps = new SqlNodeList(propertyList.getList().subList(0, propertyList.size() - 1),
+ propertyList.getParserPosition());
+ appendPrefix(getPropertyList());
+ if (serdeLib != null) {
+ propertyList.add(HiveDDLUtils.toTableOption(
+ HiveTableRowFormat.SERDE_LIB_CLASS_NAME, serdeLib, serdeLib.getParserPosition()));
+ }
+ this.serdeLib = serdeLib;
+ }
+
+ @Override
+ public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+ super.unparse(writer, leftPrec, rightPrec);
+ writer.keyword("SET");
+ if (serdeLib != null) {
+ writer.keyword("SERDE");
+ serdeLib.unparse(writer, leftPrec, rightPrec);
+ }
+ if (origSerDeProps != null && origSerDeProps.size() > 0) {
+ if (serdeLib == null) {
+ writer.keyword("SERDEPROPERTIES");
+ } else {
+ writer.keyword("WITH SERDEPROPERTIES");
+ }
+ SqlWriter.Frame withFrame = writer.startList("(", ")");
+ for (SqlNode property : origSerDeProps) {
+ printIndent(writer);
+ property.unparse(writer, leftPrec, rightPrec);
+ }
+ writer.newlineAndIndent();
+ writer.endList(withFrame);
+ }
+ }
+
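+ // Prefixes each SerDe property key with HiveTableRowFormat.SERDE_INFO_PROP_PREFIX so that
+ // SerDe properties can be told apart from other table options.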
+ private static SqlNodeList appendPrefix(SqlNodeList propList) {
+ if (propList != null) {
+ for (int i = 0; i < propList.size(); i++) {
+ SqlTableOption tableOption = (SqlTableOption) propList.get(i);
+ if (!tableOption.getKeyString().equals(ALTER_TABLE_OP)) {
+ String key = HiveTableRowFormat.SERDE_INFO_PROP_PREFIX + tableOption.getKeyString();
+ tableOption = HiveDDLUtils.toTableOption(key, tableOption.getValue(), tableOption.getParserPosition());
+ propList.set(i, tableOption);
+ }
+ }
+ }
+ return propList;
+ }
+}
diff --git a/flink-table/flink-sql-parser-hive/src/test/java/org/apache/flink/sql/parser/hive/FlinkHiveSqlParserImplTest.java b/flink-table/flink-sql-parser-hive/src/test/java/org/apache/flink/sql/parser/hive/FlinkHiveSqlParserImplTest.java
index 61a5fc0..9bc3de7 100644
--- a/flink-table/flink-sql-parser-hive/src/test/java/org/apache/flink/sql/parser/hive/FlinkHiveSqlParserImplTest.java
+++ b/flink-table/flink-sql-parser-hive/src/test/java/org/apache/flink/sql/parser/hive/FlinkHiveSqlParserImplTest.java
@@ -264,4 +264,78 @@ public class FlinkHiveSqlParserImplTest extends SqlParserTest {
public void testDescribeCatalog() {
sql("describe catalog cat").ok("DESCRIBE CATALOG `CAT`");
}
+
+ @Test
+ public void testAlterTableRename() {
+ sql("alter table tbl rename to tbl1").ok("ALTER TABLE `TBL` RENAME TO `TBL1`");
+ }
+
+ @Test
+ public void testAlterTableSerDe() {
+ sql("alter table tbl set serde 'serde.class' with serdeproperties ('field.delim'='\u0001')")
+ .ok("ALTER TABLE `TBL` SET SERDE 'serde.class' WITH SERDEPROPERTIES (\n" +
+ " 'field.delim' = u&'\\0001'\n" +
+ ")");
+ sql("alter table tbl set serdeproperties('line.delim'='\n')")
+ .ok("ALTER TABLE `TBL` SET SERDEPROPERTIES (\n" +
+ " 'line.delim' = '\n" +
+ "'\n" +
+ ")");
+ }
+
+ @Test
+ public void testAlterTableLocation() {
+ sql("alter table tbl set location '/new/table/path'")
+ .ok("ALTER TABLE `TBL` SET LOCATION '/new/table/path'");
+ sql("alter table tbl partition (p1=1,p2='a') set location '/new/partition/location'")
+ .ok("ALTER TABLE `TBL` PARTITION (`P1` = 1, `P2` = 'a') SET LOCATION '/new/partition/location'");
+ }
+
+ // TODO: support ALTER CLUSTERED BY, SKEWED, STORED AS DIRECTORIES, column constraints
+
+ @Test
+ public void testAlterPartitionRename() {
+ sql("alter table tbl partition (p=1) rename to partition (p=2)")
+ .ok("ALTER TABLE `TBL` PARTITION (`P` = 1)\n" +
+ "RENAME TO\n" +
+ "PARTITION (`P` = 2)");
+ }
+
+ // TODO: support EXCHANGE PARTITION, RECOVER PARTITIONS
+
+ // TODO: support (UN)ARCHIVE PARTITION
+
+ @Test
+ public void testAlterFileFormat() {
+ sql("alter table tbl set fileformat rcfile")
+ .ok("ALTER TABLE `TBL` SET FILEFORMAT `RCFILE`");
+ sql("alter table tbl partition (p=1) set fileformat sequencefile")
+ .ok("ALTER TABLE `TBL` PARTITION (`P` = 1) SET FILEFORMAT `SEQUENCEFILE`");
+ }
+
+ // TODO: support ALTER TABLE/PARTITION TOUCH, PROTECTION, COMPACT, CONCATENATE, UPDATE COLUMNS
+
+ @Test
+ public void testChangeColumn() {
+ sql("alter table tbl change c c1 struct<f0:timestamp,f1:array<char(5)>> restrict")
+ .ok("ALTER TABLE `TBL` CHANGE COLUMN `C` `C1` STRUCT< `F0` TIMESTAMP, `F1` ARRAY< CHAR(5) > > RESTRICT");
+ sql("alter table tbl change column c c decimal(5,2) comment 'new comment' first cascade")
+ .ok("ALTER TABLE `TBL` CHANGE COLUMN `C` `C` DECIMAL(5, 2) COMMENT 'new comment' FIRST CASCADE");
+ }
+
+ @Test
+ public void testAddReplaceColumn() {
+ sql("alter table tbl add columns (a float,b timestamp,c binary) cascade")
+ .ok("ALTER TABLE `TBL` ADD COLUMNS (\n" +
+ " `A` FLOAT,\n" +
+ " `B` TIMESTAMP,\n" +
+ " `C` BINARY\n" +
+ ") CASCADE");
+ sql("alter table tbl replace columns (a char(100),b tinyint comment 'tiny comment',c smallint) restrict")
+ .ok("ALTER TABLE `TBL` REPLACE COLUMNS (\n" +
+ " `A` CHAR(100),\n" +
+ " `B` TINYINT COMMENT 'tiny comment',\n" +
+ " `C` SMALLINT\n" +
+ ") RESTRICT");
+ }
}
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/SqlPartitionUtils.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/SqlPartitionUtils.java
new file mode 100644
index 0000000..5a2abac
--- /dev/null
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/SqlPartitionUtils.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser;
+
+import org.apache.calcite.sql.SqlLiteral;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.util.NlsString;
+
+import java.util.LinkedHashMap;
+
+/**
+ * Utility methods for partition DDLs.
+ */
+public class SqlPartitionUtils {
+
+ private SqlPartitionUtils() {
+ }
+
+ /**
+ * Get static partition key-value pairs as strings.
+ *
+ * <p>For character literals we return the unquoted and unescaped values.
+ * For other types we use {@link SqlLiteral#toString()} to get
+ * the string format of the value literal.
+ *
+ * @return the mapping of column names to values of the partition specification,
+ * an empty map if the specification has no entries, or null if partitionSpec is null.
+ */
+ public static LinkedHashMap<String, String> getPartitionKVs(SqlNodeList partitionSpec) {
+ if (partitionSpec == null) {
+ return null;
+ }
+ LinkedHashMap<String, String> ret = new LinkedHashMap<>();
+ if (partitionSpec.size() == 0) {
+ return ret;
+ }
+ for (SqlNode node : partitionSpec.getList()) {
+ SqlProperty sqlProperty = (SqlProperty) node;
+ Comparable comparable = SqlLiteral.value(sqlProperty.getValue());
+ String value = comparable instanceof NlsString ? ((NlsString) comparable).getValue() : comparable.toString();
+ ret.put(sqlProperty.getKey().getSimple(), value);
+ }
+ return ret;
+ }
+}
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableProperties.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAddReplaceColumns.java
similarity index 53%
copy from flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableProperties.java
copy to flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAddReplaceColumns.java
index 8c5412c..b6eefba 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableProperties.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAddReplaceColumns.java
@@ -25,52 +25,73 @@ import org.apache.calcite.sql.SqlWriter;
import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.calcite.util.ImmutableNullableList;
-import java.util.List;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
-import static java.util.Objects.requireNonNull;
+import java.util.List;
/**
- * ALTER TABLE [[catalogName.] dataBasesName].tableName SET ( name=value [, name=value]*).
+ * ALTER DDL to ADD or REPLACE columns for a table.
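+ *
+ * <p>Example: {@code ALTER TABLE tbl ADD COLUMNS (a FLOAT, b TIMESTAMP)}.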
*/
-public class SqlAlterTableProperties extends SqlAlterTable {
+public class SqlAddReplaceColumns extends SqlAlterTable {
- private final SqlNodeList propertyList;
+ private final SqlNodeList newColumns;
+ // Whether to replace all the existing columns. If false, new columns will be appended to the end of the schema.
+ private final boolean replace;
+ // properties that should be added to the table
+ private final SqlNodeList properties;
- public SqlAlterTableProperties(SqlParserPos pos, SqlIdentifier tableName, SqlNodeList propertyList) {
+ public SqlAddReplaceColumns(
+ SqlParserPos pos,
+ SqlIdentifier tableName,
+ SqlNodeList newColumns,
+ boolean replace,
+ @Nullable SqlNodeList properties) {
super(pos, tableName);
- this.propertyList = requireNonNull(propertyList, "propertyList should not be null");
+ this.newColumns = newColumns;
+ this.replace = replace;
+ this.properties = properties;
}
- @Override
- public List<SqlNode> getOperandList() {
- return ImmutableNullableList.of(tableIdentifier, propertyList);
+ public SqlNodeList getNewColumns() {
+ return newColumns;
+ }
+
+ public boolean isReplace() {
+ return replace;
+ }
+
+ public SqlNodeList getProperties() {
+ return properties;
}
- public SqlNodeList getPropertyList() {
- return propertyList;
+ @Nonnull
+ @Override
+ public List<SqlNode> getOperandList() {
+ return ImmutableNullableList.of(tableIdentifier, partitionSpec, newColumns, properties);
}
@Override
public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
super.unparse(writer, leftPrec, rightPrec);
- writer.keyword("SET");
- SqlWriter.Frame withFrame = writer.startList("(", ")");
- for (SqlNode property : propertyList) {
+ if (replace) {
+ writer.keyword("REPLACE");
+ } else {
+ writer.keyword("ADD");
+ }
+ writer.keyword("COLUMNS");
+ SqlWriter.Frame frame = writer.startList(SqlWriter.FrameTypeEnum.create("sds"), "(", ")");
+ for (SqlNode column : newColumns) {
printIndent(writer);
- property.unparse(writer, leftPrec, rightPrec);
+ column.unparse(writer, leftPrec, rightPrec);
}
writer.newlineAndIndent();
- writer.endList(withFrame);
+ writer.endList(frame);
}
- private void printIndent(SqlWriter writer) {
+ protected void printIndent(SqlWriter writer) {
writer.sep(",", false);
writer.newlineAndIndent();
writer.print(" ");
}
-
- public String[] fullTableName() {
- return tableIdentifier.names.toArray(new String[0]);
- }
-
}
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTable.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTable.java
index b279953..9560626 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTable.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTable.java
@@ -18,14 +18,21 @@
package org.apache.flink.sql.parser.ddl;
+import org.apache.flink.sql.parser.SqlPartitionUtils;
+
import org.apache.calcite.sql.SqlCall;
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.SqlSpecialOperator;
import org.apache.calcite.sql.SqlWriter;
import org.apache.calcite.sql.parser.SqlParserPos;
+import javax.annotation.Nullable;
+
+import java.util.LinkedHashMap;
+
import static java.util.Objects.requireNonNull;
/**
@@ -36,12 +43,21 @@ public abstract class SqlAlterTable extends SqlCall {
public static final SqlSpecialOperator OPERATOR = new SqlSpecialOperator("ALTER TABLE", SqlKind.ALTER_TABLE);
protected final SqlIdentifier tableIdentifier;
+ protected final SqlNodeList partitionSpec;
public SqlAlterTable(
SqlParserPos pos,
- SqlIdentifier tableName) {
+ SqlIdentifier tableName,
+ @Nullable SqlNodeList partitionSpec) {
super(pos);
this.tableIdentifier = requireNonNull(tableName, "tableName should not be null");
+ this.partitionSpec = partitionSpec;
+ }
+
+ public SqlAlterTable(
+ SqlParserPos pos,
+ SqlIdentifier tableName) {
+ this(pos, tableName, null);
}
@Override
@@ -57,9 +73,28 @@ public abstract class SqlAlterTable extends SqlCall {
public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
writer.keyword("ALTER TABLE");
tableIdentifier.unparse(writer, leftPrec, rightPrec);
+ SqlNodeList partitionSpec = getPartitionSpec();
+ if (partitionSpec != null && partitionSpec.size() > 0) {
+ writer.keyword("PARTITION");
+ partitionSpec.unparse(writer, getOperator().getLeftPrec(), getOperator().getRightPrec());
+ }
}
public String[] fullTableName() {
return tableIdentifier.names.toArray(new String[0]);
}
+
+ /**
+ * Returns the partition spec if the ALTER should be applied to partitions, and null otherwise.
+ */
+ public SqlNodeList getPartitionSpec() {
+ return partitionSpec;
+ }
+
+ /**
+ * Get partition spec as key-value strings.
+ */
+ public LinkedHashMap<String, String> getPartitionKVs() {
+ return SqlPartitionUtils.getPartitionKVs(getPartitionSpec());
+ }
}
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableProperties.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableProperties.java
index 8c5412c..d18a37e 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableProperties.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableProperties.java
@@ -37,7 +37,11 @@ public class SqlAlterTableProperties extends SqlAlterTable {
private final SqlNodeList propertyList;
public SqlAlterTableProperties(SqlParserPos pos, SqlIdentifier tableName, SqlNodeList propertyList) {
- super(pos, tableName);
+ this(pos, tableName, null, propertyList);
+ }
+
+ public SqlAlterTableProperties(SqlParserPos pos, SqlIdentifier tableName, SqlNodeList partitionSpec, SqlNodeList propertyList) {
+ super(pos, tableName, partitionSpec);
this.propertyList = requireNonNull(propertyList, "propertyList should not be null");
}
@@ -63,7 +67,7 @@ public class SqlAlterTableProperties extends SqlAlterTable {
writer.endList(withFrame);
}
- private void printIndent(SqlWriter writer) {
+ protected void printIndent(SqlWriter writer) {
writer.sep(",", false);
writer.newlineAndIndent();
writer.print(" ");
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlChangeColumn.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlChangeColumn.java
new file mode 100644
index 0000000..8f84771
--- /dev/null
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlChangeColumn.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.ddl;
+
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.util.ImmutableNullableList;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.List;
+
+/**
+ * ALTER DDL to CHANGE a column for a table.
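+ *
+ * <p>Example: {@code ALTER TABLE tbl CHANGE COLUMN c c1 INT FIRST}.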
+ */
+public class SqlChangeColumn extends SqlAlterTable {
+
+ private final SqlIdentifier oldName;
+ private final SqlTableColumn newColumn;
+
+ // Specify the position of the new column. If neither after nor first is set, the column is changed in place.
+ // the name of the column after which the new column should be placed
+ private final SqlIdentifier after;
+ // whether the new column should be placed as the first column of the schema
+ private final boolean first;
+
+ // properties that should be added to the table
+ private final SqlNodeList properties;
+
+ public SqlChangeColumn(
+ SqlParserPos pos,
+ SqlIdentifier tableName,
+ SqlIdentifier oldName,
+ SqlTableColumn newColumn,
+ @Nullable SqlIdentifier after,
+ boolean first,
+ @Nullable SqlNodeList properties) {
+ super(pos, tableName);
+ if (after != null && first) {
+ throw new IllegalArgumentException("FIRST and AFTER cannot be set at the same time");
+ }
+ this.oldName = oldName;
+ this.newColumn = newColumn;
+ this.after = after;
+ this.first = first;
+ this.properties = properties;
+ }
+
+ public SqlIdentifier getOldName() {
+ return oldName;
+ }
+
+ public SqlTableColumn getNewColumn() {
+ return newColumn;
+ }
+
+ public SqlIdentifier getAfter() {
+ return after;
+ }
+
+ public boolean isFirst() {
+ return first;
+ }
+
+ public SqlNodeList getProperties() {
+ return properties;
+ }
+
+ @Nonnull
+ @Override
+ public List<SqlNode> getOperandList() {
+ return ImmutableNullableList.of(tableIdentifier, partitionSpec, oldName, newColumn, after);
+ }
+
+ @Override
+ public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+ super.unparse(writer, leftPrec, rightPrec);
+ writer.keyword("CHANGE COLUMN");
+ oldName.unparse(writer, leftPrec, rightPrec);
+ newColumn.unparse(writer, leftPrec, rightPrec);
+ if (first) {
+ writer.keyword("FIST");
+ }
+ if (after != null) {
+ writer.keyword("AFTER");
+ after.unparse(writer, leftPrec, rightPrec);
+ }
+ }
+}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index f1aab30..cd34b6b 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -92,11 +92,13 @@ import org.apache.flink.table.operations.UseCatalogOperation;
import org.apache.flink.table.operations.UseDatabaseOperation;
import org.apache.flink.table.operations.ddl.AlterCatalogFunctionOperation;
import org.apache.flink.table.operations.ddl.AlterDatabaseOperation;
+import org.apache.flink.table.operations.ddl.AlterPartitionPropertiesOperation;
import org.apache.flink.table.operations.ddl.AlterTableAddConstraintOperation;
import org.apache.flink.table.operations.ddl.AlterTableDropConstraintOperation;
import org.apache.flink.table.operations.ddl.AlterTableOperation;
import org.apache.flink.table.operations.ddl.AlterTablePropertiesOperation;
import org.apache.flink.table.operations.ddl.AlterTableRenameOperation;
+import org.apache.flink.table.operations.ddl.AlterTableSchemaOperation;
import org.apache.flink.table.operations.ddl.CreateCatalogFunctionOperation;
import org.apache.flink.table.operations.ddl.CreateCatalogOperation;
import org.apache.flink.table.operations.ddl.CreateDatabaseOperation;
@@ -821,6 +823,17 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
dropConstraintOperation.getTableIdentifier().toObjectPath(),
newTable,
false);
+ } else if (alterTableOperation instanceof AlterPartitionPropertiesOperation) {
+ AlterPartitionPropertiesOperation alterPartPropsOp = (AlterPartitionPropertiesOperation) operation;
+ catalog.alterPartition(alterPartPropsOp.getTableIdentifier().toObjectPath(),
+ alterPartPropsOp.getPartitionSpec(),
+ alterPartPropsOp.getCatalogPartition(),
+ false);
+ } else if (alterTableOperation instanceof AlterTableSchemaOperation) {
+ AlterTableSchemaOperation alterTableSchemaOperation = (AlterTableSchemaOperation) alterTableOperation;
+ catalog.alterTable(alterTableSchemaOperation.getTableIdentifier().toObjectPath(),
+ alterTableSchemaOperation.getCatalogTable(),
+ false);
}
return TableResultImpl.TABLE_RESULT_OK;
} catch (TableAlreadyExistException | TableNotExistException e) {
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
index 73ed2a7..c3c0e7a 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
@@ -26,6 +26,7 @@ import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.util.StringUtils;
@@ -328,6 +329,25 @@ public final class CatalogManager {
return Optional.empty();
}
+ /**
+ * Retrieves a partition with a fully qualified table path and partition spec.
+ * If the path is not yet fully qualified, use {@link #qualifyIdentifier(UnresolvedIdentifier)} first.
+ *
+ * @param tableIdentifier full path of the table to retrieve
+ * @param partitionSpec full partition spec
+ * @return the partition if it exists, or empty otherwise.
+ */
+ public Optional<CatalogPartition> getPartition(ObjectIdentifier tableIdentifier, CatalogPartitionSpec partitionSpec) {
+ Catalog catalog = catalogs.get(tableIdentifier.getCatalogName());
+ if (catalog != null) {
+ try {
+ return Optional.of(catalog.getPartition(tableIdentifier.toObjectPath(), partitionSpec));
+ } catch (PartitionNotExistException ignored) {
+ }
+ }
+ return Optional.empty();
+ }
+
private Optional<TableLookupResult> getPermanentTable(ObjectIdentifier objectIdentifier)
throws TableNotExistException {
Catalog currentCatalog = catalogs.get(objectIdentifier.getCatalogName());
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/OperationUtils.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/OperationUtils.java
index feece6e..7f98646 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/OperationUtils.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/OperationUtils.java
@@ -19,6 +19,7 @@
package org.apache.flink.table.operations;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.util.StringUtils;
import java.util.Arrays;
@@ -108,6 +109,17 @@ public class OperationUtils {
return stringBuilder.toString();
}
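+ /**
+ * Formats the given properties map as a comma-separated string of key-value parameters.
+ */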
+ public static String formatProperties(Map<String, String> properties) {
+ return properties.entrySet().stream()
+ .map(entry -> formatParameter(entry.getKey(), entry.getValue()))
+ .collect(Collectors.joining(", "));
+ }
+
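+ /**
+ * Formats a {@link CatalogPartitionSpec} as comma-separated {@code key=value} pairs.
+ */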
+ public static String formatPartitionSpec(CatalogPartitionSpec spec) {
+ return spec.getPartitionSpec().entrySet().stream()
+ .map(entry -> entry.getKey() + "=" + entry.getValue()).collect(Collectors.joining(", "));
+ }
+
private OperationUtils() {
}
}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterPartitionOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterPartitionOperation.java
new file mode 100644
index 0000000..5376618
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterPartitionOperation.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations.ddl;
+
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+
+/**
+ * Abstract Operation to describe all ALTER TABLE statements that should be applied to partitions.
+ */
+public abstract class AlterPartitionOperation extends AlterTableOperation {
+
+ protected final CatalogPartitionSpec partitionSpec;
+
+ public AlterPartitionOperation(ObjectIdentifier tableIdentifier, CatalogPartitionSpec partitionSpec) {
+ super(tableIdentifier);
+ this.partitionSpec = partitionSpec;
+ }
+
+ public CatalogPartitionSpec getPartitionSpec() {
+ return partitionSpec;
+ }
+}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterPartitionPropertiesOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterPartitionPropertiesOperation.java
new file mode 100644
index 0000000..267ec7d
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterPartitionPropertiesOperation.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations.ddl;
+
+import org.apache.flink.table.catalog.CatalogPartition;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.operations.OperationUtils;
+
+/**
+ * Operation to alter the properties of a partition.
+ */
+public class AlterPartitionPropertiesOperation extends AlterPartitionOperation {
+
+ private final CatalogPartition catalogPartition;
+
+ public AlterPartitionPropertiesOperation(ObjectIdentifier tableIdentifier, CatalogPartitionSpec partitionSpec,
+ CatalogPartition catalogPartition) {
+ super(tableIdentifier, partitionSpec);
+ this.catalogPartition = catalogPartition;
+ }
+
+ public CatalogPartition getCatalogPartition() {
+ return catalogPartition;
+ }
+
+ @Override
+ public String asSummaryString() {
+ String spec = OperationUtils.formatPartitionSpec(partitionSpec);
+ String properties = OperationUtils.formatProperties(catalogPartition.getProperties());
+ return String.format("ALTER TABLE %s PARTITION (%s) SET (%s)", tableIdentifier.asSummaryString(), spec, properties);
+ }
+}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableSchemaOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableSchemaOperation.java
new file mode 100644
index 0000000..8f8c468
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableSchemaOperation.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations.ddl;
+
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+
+/**
+ * Operation to describe altering the schema of a table.
+ */
+public class AlterTableSchemaOperation extends AlterTableOperation {
+
+ // the CatalogTable with the updated schema
+ private final CatalogTable catalogTable;
+
+ public AlterTableSchemaOperation(ObjectIdentifier tableIdentifier, CatalogTable catalogTable) {
+ super(tableIdentifier);
+ this.catalogTable = catalogTable;
+ }
+
+ public CatalogTable getCatalogTable() {
+ return catalogTable;
+ }
+
+ @Override
+ public String asSummaryString() {
+ return String.format("ALTER TABLE %s SET SCHEMA %s",
+ tableIdentifier.asSummaryString(), catalogTable.getSchema().toString());
+ }
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
index 11dece9..3dcdb39 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
@@ -18,6 +18,7 @@
package org.apache.flink.table.planner.operations;
+import org.apache.flink.sql.parser.ddl.SqlAddReplaceColumns;
import org.apache.flink.sql.parser.ddl.SqlAlterDatabase;
import org.apache.flink.sql.parser.ddl.SqlAlterFunction;
import org.apache.flink.sql.parser.ddl.SqlAlterTable;
@@ -25,6 +26,7 @@ import org.apache.flink.sql.parser.ddl.SqlAlterTableAddConstraint;
import org.apache.flink.sql.parser.ddl.SqlAlterTableDropConstraint;
import org.apache.flink.sql.parser.ddl.SqlAlterTableProperties;
import org.apache.flink.sql.parser.ddl.SqlAlterTableRename;
+import org.apache.flink.sql.parser.ddl.SqlChangeColumn;
import org.apache.flink.sql.parser.ddl.SqlCreateCatalog;
import org.apache.flink.sql.parser.ddl.SqlCreateDatabase;
import org.apache.flink.sql.parser.ddl.SqlCreateFunction;
@@ -49,13 +51,16 @@ import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogDatabase;
import org.apache.flink.table.catalog.CatalogDatabaseImpl;
import org.apache.flink.table.catalog.CatalogFunction;
import org.apache.flink.table.catalog.CatalogFunctionImpl;
import org.apache.flink.table.catalog.CatalogManager;
+import org.apache.flink.table.catalog.CatalogPartition;
+import org.apache.flink.table.catalog.CatalogPartitionImpl;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.catalog.CatalogTable;
-import org.apache.flink.table.catalog.CatalogTableImpl;
import org.apache.flink.table.catalog.CatalogView;
import org.apache.flink.table.catalog.CatalogViewImpl;
import org.apache.flink.table.catalog.FunctionLanguage;
@@ -77,6 +82,7 @@ import org.apache.flink.table.operations.UseCatalogOperation;
import org.apache.flink.table.operations.UseDatabaseOperation;
import org.apache.flink.table.operations.ddl.AlterCatalogFunctionOperation;
import org.apache.flink.table.operations.ddl.AlterDatabaseOperation;
+import org.apache.flink.table.operations.ddl.AlterPartitionPropertiesOperation;
import org.apache.flink.table.operations.ddl.AlterTableAddConstraintOperation;
import org.apache.flink.table.operations.ddl.AlterTableDropConstraintOperation;
import org.apache.flink.table.operations.ddl.AlterTablePropertiesOperation;
@@ -93,6 +99,7 @@ import org.apache.flink.table.operations.ddl.DropTempSystemFunctionOperation;
import org.apache.flink.table.operations.ddl.DropViewOperation;
import org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
import org.apache.flink.table.planner.hint.FlinkHints;
+import org.apache.flink.table.planner.utils.OperationConverterUtils;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.utils.TableSchemaUtils;
import org.apache.flink.util.StringUtils;
@@ -115,6 +122,7 @@ import org.apache.calcite.sql.parser.SqlParser;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -231,75 +239,66 @@ public class SqlToOperationConverter {
private Operation convertAlterTable(SqlAlterTable sqlAlterTable) {
UnresolvedIdentifier unresolvedIdentifier = UnresolvedIdentifier.of(sqlAlterTable.fullTableName());
ObjectIdentifier tableIdentifier = catalogManager.qualifyIdentifier(unresolvedIdentifier);
+ Optional<CatalogManager.TableLookupResult> optionalCatalogTable = catalogManager.getTable(tableIdentifier);
+ if (!optionalCatalogTable.isPresent() || optionalCatalogTable.get().isTemporary()) {
+ throw new ValidationException(String.format("Table %s doesn't exist or is a temporary table.",
+ tableIdentifier.toString()));
+ }
+ CatalogBaseTable baseTable = optionalCatalogTable.get().getTable();
if (sqlAlterTable instanceof SqlAlterTableRename) {
UnresolvedIdentifier newUnresolvedIdentifier =
UnresolvedIdentifier.of(((SqlAlterTableRename) sqlAlterTable).fullNewTableName());
ObjectIdentifier newTableIdentifier = catalogManager.qualifyIdentifier(newUnresolvedIdentifier);
return new AlterTableRenameOperation(tableIdentifier, newTableIdentifier);
} else if (sqlAlterTable instanceof SqlAlterTableProperties) {
- Optional<CatalogManager.TableLookupResult> optionalCatalogTable = catalogManager.getTable(tableIdentifier);
- if (optionalCatalogTable.isPresent() && !optionalCatalogTable.get().isTemporary()) {
- CatalogTable originalCatalogTable = (CatalogTable) optionalCatalogTable.get().getTable();
- Map<String, String> properties = new HashMap<>(originalCatalogTable.getOptions());
- ((SqlAlterTableProperties) sqlAlterTable).getPropertyList().getList().forEach(p ->
- properties.put(((SqlTableOption) p).getKeyString(), ((SqlTableOption) p).getValueString()));
- CatalogTable catalogTable = new CatalogTableImpl(
- originalCatalogTable.getSchema(),
- originalCatalogTable.getPartitionKeys(),
- properties,
- originalCatalogTable.getComment());
- return new AlterTablePropertiesOperation(tableIdentifier, catalogTable);
- } else {
- throw new ValidationException(String.format("Table %s doesn't exist or is a temporary table.",
- tableIdentifier.toString()));
- }
+ return convertAlterTableProperties(
+ tableIdentifier,
+ (CatalogTable) baseTable,
+ (SqlAlterTableProperties) sqlAlterTable);
} else if (sqlAlterTable instanceof SqlAlterTableAddConstraint) {
- Optional<CatalogManager.TableLookupResult> optionalCatalogTable =
- catalogManager.getTable(tableIdentifier);
- if (optionalCatalogTable.isPresent() && !optionalCatalogTable.get().isTemporary()) {
- SqlTableConstraint constraint = ((SqlAlterTableAddConstraint) sqlAlterTable)
- .getConstraint();
- validateTableConstraint(constraint);
- TableSchema oriSchema = optionalCatalogTable.get().getTable().getSchema();
- // Sanity check for constraint.
- TableSchema.Builder builder = TableSchemaUtils.builderWithGivenSchema(oriSchema);
- if (constraint.getConstraintName().isPresent()) {
- builder.primaryKey(
- constraint.getConstraintName().get(),
- constraint.getColumnNames());
- } else {
- builder.primaryKey(constraint.getColumnNames());
- }
- builder.build();
- return new AlterTableAddConstraintOperation(
- tableIdentifier,
- constraint.getConstraintName().orElse(null),
+ SqlTableConstraint constraint = ((SqlAlterTableAddConstraint) sqlAlterTable)
+ .getConstraint();
+ validateTableConstraint(constraint);
+ TableSchema oriSchema = baseTable.getSchema();
+ // Sanity check for constraint.
+ TableSchema.Builder builder = TableSchemaUtils.builderWithGivenSchema(oriSchema);
+ if (constraint.getConstraintName().isPresent()) {
+ builder.primaryKey(
+ constraint.getConstraintName().get(),
constraint.getColumnNames());
} else {
- throw new ValidationException(String.format("Table %s doesn't exist or is a temporary table.",
- tableIdentifier.toString()));
+ builder.primaryKey(constraint.getColumnNames());
}
+ builder.build();
+ return new AlterTableAddConstraintOperation(
+ tableIdentifier,
+ constraint.getConstraintName().orElse(null),
+ constraint.getColumnNames());
} else if (sqlAlterTable instanceof SqlAlterTableDropConstraint) {
- Optional<CatalogManager.TableLookupResult> optionalCatalogTable =
- catalogManager.getTable(tableIdentifier);
- if (optionalCatalogTable.isPresent() && !optionalCatalogTable.get().isTemporary()) {
- SqlAlterTableDropConstraint dropConstraint = ((SqlAlterTableDropConstraint) sqlAlterTable);
- String constraintName = dropConstraint.getConstraintName().getSimple();
- CatalogTable oriCatalogTable = (CatalogTable) optionalCatalogTable.get().getTable();
- TableSchema oriSchema = oriCatalogTable.getSchema();
- if (!oriSchema.getPrimaryKey()
- .filter(pk -> pk.getName().equals(constraintName))
- .isPresent()) {
- throw new ValidationException(
- String.format("CONSTRAINT [%s] does not exist", constraintName));
- }
- return new AlterTableDropConstraintOperation(
- tableIdentifier,
- constraintName);
- } else {
- throw new ValidationException(String.format("Table %s doesn't exist or is a temporary table.",
- tableIdentifier.toString()));
+ SqlAlterTableDropConstraint dropConstraint = ((SqlAlterTableDropConstraint) sqlAlterTable);
+ String constraintName = dropConstraint.getConstraintName().getSimple();
+ TableSchema oriSchema = baseTable.getSchema();
+ if (!oriSchema.getPrimaryKey()
+ .filter(pk -> pk.getName().equals(constraintName))
+ .isPresent()) {
+ throw new ValidationException(
+ String.format("CONSTRAINT [%s] does not exist", constraintName));
}
+ return new AlterTableDropConstraintOperation(
+ tableIdentifier,
+ constraintName);
+ } else if (sqlAlterTable instanceof SqlAddReplaceColumns) {
+ return OperationConverterUtils.convertAddReplaceColumns(
+ tableIdentifier,
+ (SqlAddReplaceColumns) sqlAlterTable,
+ (CatalogTable) baseTable,
+ flinkPlanner.getOrCreateSqlValidator());
+ } else if (sqlAlterTable instanceof SqlChangeColumn) {
+ return OperationConverterUtils.convertChangeColumn(
+ tableIdentifier,
+ (SqlChangeColumn) sqlAlterTable,
+ (CatalogTable) baseTable,
+ flinkPlanner.getOrCreateSqlValidator());
} else {
throw new ValidationException(
String.format("[%s] needs to implement",
@@ -307,6 +306,29 @@ public class SqlToOperationConverter {
}
}
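+ /** Converts ALTER TABLE ... SET into either a table-level or a partition-level properties change, depending on whether a partition spec is present. */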
+ private Operation convertAlterTableProperties(ObjectIdentifier tableIdentifier, CatalogTable oldTable,
+ SqlAlterTableProperties alterTableProperties) {
+ LinkedHashMap<String, String> partitionKVs = alterTableProperties.getPartitionKVs();
+ // a partition spec is present, so we're altering the properties of that partition
+ if (partitionKVs != null) {
+ CatalogPartitionSpec partitionSpec = new CatalogPartitionSpec(partitionKVs);
+ CatalogPartition catalogPartition = catalogManager.getPartition(tableIdentifier, partitionSpec)
+ .orElseThrow(() -> new ValidationException(String.format("Partition %s of table %s doesn't exist",
+ partitionSpec.getPartitionSpec(), tableIdentifier)));
+ Map<String, String> newProps = new HashMap<>(catalogPartition.getProperties());
+ newProps.putAll(OperationConverterUtils.extractProperties(alterTableProperties.getPropertyList()));
+ return new AlterPartitionPropertiesOperation(
+ tableIdentifier,
+ partitionSpec,
+ new CatalogPartitionImpl(newProps, catalogPartition.getComment()));
+ } else {
+ // no partition spec, so we're altering the table's own properties
+ Map<String, String> newProperties = new HashMap<>(oldTable.getOptions());
+ newProperties.putAll(OperationConverterUtils.extractProperties(alterTableProperties.getPropertyList()));
+ return new AlterTablePropertiesOperation(tableIdentifier, oldTable.copy(newProperties));
+ }
+ }
+
/** Convert CREATE FUNCTION statement. */
private Operation convertCreateFunction(SqlCreateFunction sqlCreateFunction) {
UnresolvedIdentifier unresolvedIdentifier =
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java
new file mode 100644
index 0000000..d36bd6f
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.utils;
+
+import org.apache.flink.sql.parser.ddl.SqlAddReplaceColumns;
+import org.apache.flink.sql.parser.ddl.SqlChangeColumn;
+import org.apache.flink.sql.parser.ddl.SqlTableColumn;
+import org.apache.flink.sql.parser.ddl.SqlTableOption;
+import org.apache.flink.table.api.TableColumn;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.api.WatermarkSpec;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogTableImpl;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.ddl.AlterTableSchemaOperation;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.utils.TypeConversions;
+
+import org.apache.calcite.sql.SqlDataTypeSpec;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.validate.SqlValidator;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Utility methods for converting SQL statements to operations.
+ */
+public class OperationConverterUtils {
+
+ private OperationConverterUtils() {
+ }
+
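+ /** Converts ALTER TABLE ... ADD|REPLACE COLUMNS into an {@link AlterTableSchemaOperation}. */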
+ public static Operation convertAddReplaceColumns(
+ ObjectIdentifier tableIdentifier,
+ SqlAddReplaceColumns addReplaceColumns,
+ CatalogTable catalogTable,
+ SqlValidator sqlValidator) {
+ // This is only used by the Hive dialect at the moment. In Hive, only non-partition columns can be
+ // added/replaced and users will only define non-partition columns in the new column list. Therefore, we require
+ // that partition columns appear last in the schema (which is in line with Hive). Otherwise, we won't be
+ // able to determine the column positions after the non-partition columns are replaced.
+ TableSchema oldSchema = catalogTable.getSchema();
+ int numPartCol = catalogTable.getPartitionKeys().size();
+ Set<String> lastCols = oldSchema.getTableColumns()
+ .subList(oldSchema.getFieldCount() - numPartCol, oldSchema.getFieldCount())
+ .stream().map(TableColumn::getName).collect(Collectors.toSet());
+ if (!lastCols.equals(new HashSet<>(catalogTable.getPartitionKeys()))) {
+ throw new ValidationException("ADD/REPLACE COLUMNS on partitioned tables requires partition columns to appear last");
+ }
+
+ // set non-partition columns
+ TableSchema.Builder builder = TableSchema.builder();
+ if (!addReplaceColumns.isReplace()) {
+ List<TableColumn> nonPartCols = oldSchema.getTableColumns().subList(0, oldSchema.getFieldCount() - numPartCol);
+ for (TableColumn column : nonPartCols) {
+ builder.add(column);
+ }
+ setWatermarkAndPK(builder, catalogTable.getSchema());
+ }
+ for (SqlNode sqlNode : addReplaceColumns.getNewColumns()) {
+ builder.add(toTableColumn((SqlTableColumn) sqlNode, sqlValidator));
+ }
+
+ // set partition columns
+ List<TableColumn> partCols = oldSchema.getTableColumns().subList(oldSchema.getFieldCount() - numPartCol, oldSchema.getFieldCount());
+ for (TableColumn column : partCols) {
+ builder.add(column);
+ }
+
+ // set properties
+ Map<String, String> newProperties = new HashMap<>(catalogTable.getOptions());
+ newProperties.putAll(extractProperties(addReplaceColumns.getProperties()));
+
+ return new AlterTableSchemaOperation(
+ tableIdentifier,
+ new CatalogTableImpl(
+ builder.build(),
+ catalogTable.getPartitionKeys(),
+ newProperties,
+ catalogTable.getComment())
+ );
+ }
+
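+ /** Converts ALTER TABLE ... CHANGE COLUMN into an {@link AlterTableSchemaOperation}, optionally repositioning the column with FIRST or AFTER. */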
+ public static Operation convertChangeColumn(
+ ObjectIdentifier tableIdentifier,
+ SqlChangeColumn changeColumn,
+ CatalogTable catalogTable,
+ SqlValidator sqlValidator) {
+ String oldName = changeColumn.getOldName().getSimple();
+ if (catalogTable.getPartitionKeys().indexOf(oldName) >= 0) {
+ // disallow changing partition columns
+ throw new ValidationException("CHANGE COLUMN cannot be applied to partition columns");
+ }
+ TableSchema oldSchema = catalogTable.getSchema();
+ int oldIndex = Arrays.asList(oldSchema.getFieldNames()).indexOf(oldName);
+ if (oldIndex < 0) {
+ throw new ValidationException(String.format("Old column %s not found for CHANGE COLUMN", oldName));
+ }
+ boolean first = changeColumn.isFirst();
+ String after = changeColumn.getAfter() == null ? null : changeColumn.getAfter().getSimple();
+ List<TableColumn> tableColumns = oldSchema.getTableColumns();
+ TableColumn newTableColumn = toTableColumn(changeColumn.getNewColumn(), sqlValidator);
+ if ((!first && after == null) || oldName.equals(after)) {
+ tableColumns.set(oldIndex, newTableColumn);
+ } else {
+ // need to change column position
+ tableColumns.remove(oldIndex);
+ if (first) {
+ tableColumns.add(0, newTableColumn);
+ } else {
+ int newIndex = tableColumns
+ .stream()
+ .map(TableColumn::getName)
+ .collect(Collectors.toList())
+ .indexOf(after);
+ if (newIndex < 0) {
+ throw new ValidationException(String.format("After column %s not found for CHANGE COLUMN", after));
+ }
+ tableColumns.add(newIndex + 1, newTableColumn);
+ }
+ }
+ TableSchema.Builder builder = TableSchema.builder();
+ for (TableColumn column : tableColumns) {
+ builder.add(column);
+ }
+ setWatermarkAndPK(builder, oldSchema);
+ TableSchema newSchema = builder.build();
+ Map<String, String> newProperties = new HashMap<>(catalogTable.getOptions());
+ newProperties.putAll(extractProperties(changeColumn.getProperties()));
+ return new AlterTableSchemaOperation(
+ tableIdentifier,
+ new CatalogTableImpl(
+ newSchema,
+ catalogTable.getPartitionKeys(),
+ newProperties,
+ catalogTable.getComment()));
+ // TODO: handle watermark and constraints
+ }
+
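+ // derives a TableColumn from the parsed column definition, using the validator to resolve the SQL type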
+ private static TableColumn toTableColumn(SqlTableColumn sqlTableColumn, SqlValidator sqlValidator) {
+ String name = sqlTableColumn.getName().getSimple();
+ SqlDataTypeSpec typeSpec = sqlTableColumn.getType();
+ LogicalType logicalType = FlinkTypeFactory.toLogicalType(
+ typeSpec.deriveType(sqlValidator, typeSpec.getNullable()));
+ DataType dataType = TypeConversions.fromLogicalToDataType(logicalType);
+ return TableColumn.of(name, dataType);
+ }
+
+ private static void setWatermarkAndPK(TableSchema.Builder builder, TableSchema schema) {
+ for (WatermarkSpec watermarkSpec : schema.getWatermarkSpecs()) {
+ builder.watermark(watermarkSpec);
+ }
+ schema.getPrimaryKey().ifPresent(pk -> {
+ builder.primaryKey(pk.getName(), pk.getColumns().toArray(new String[0]));
+ });
+ }
+
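+ // turns a property list (e.g. TBLPROPERTIES) into a key-value map; a null list yields an empty map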
+ public static Map<String, String> extractProperties(SqlNodeList propList) {
+ Map<String, String> properties = new HashMap<>();
+ if (propList != null) {
+ propList.getList().forEach(p ->
+ properties.put(((SqlTableOption) p).getKeyString(), ((SqlTableOption) p).getValueString()));
+ }
+ return properties;
+ }
+}
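For readers who want to try the new ALTER DDL end to end, here is a minimal, illustrative sketch (not part of the patch). It assumes an existing partitioned Hive table tbl (x INT, y STRING, partitioned by p STRING) in a metastore reachable through HiveCatalog; the catalog name, default database and hive-conf directory are placeholders, and the setup mirrors HiveDialectTest.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class HiveAlterDdlSketch {
    public static void main(String[] args) {
        TableEnvironment tableEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build());
        // placeholder catalog name, default database and hive-conf dir
        tableEnv.registerCatalog("myhive", new HiveCatalog("myhive", "default", "/path/to/hive-conf"));
        tableEnv.useCatalog("myhive");
        // switch the parser to the Hive dialect so the statements below go through FlinkHiveSqlParserImpl
        tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);

        // table properties are merged into the existing options (AlterTablePropertiesOperation)
        tableEnv.executeSql("ALTER TABLE tbl SET TBLPROPERTIES ('k1'='v1')");
        // adding / changing non-partition columns rewrites the schema (AlterTableSchemaOperation)
        tableEnv.executeSql("ALTER TABLE tbl ADD COLUMNS (z DOUBLE)");
        tableEnv.executeSql("ALTER TABLE tbl CHANGE COLUMN y y1 STRING AFTER x");
        // partition-level changes require the partition to exist (AlterPartitionPropertiesOperation)
        tableEnv.executeSql("ALTER TABLE tbl PARTITION (p='2020') SET FILEFORMAT rcfile");
    }
}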