You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2020/07/17 10:45:00 UTC
[flink] branch release-1.11 updated: [FLINK-18588][hive] hive ddl create table support 'if not exists'
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new d2911e8 [FLINK-18588][hive] hive ddl create table support 'if not exists'
d2911e8 is described below
commit d2911e8ca8ae33b5aec9adf44141ce67b32f29e7
Author: wtog <39...@users.noreply.github.com>
AuthorDate: Fri Jul 17 18:43:40 2020 +0800
[FLINK-18588][hive] hive ddl create table support 'if not exists'
This closes #12888
---
.../flink/connectors/hive/HiveDialectITCase.java | 5 ++++
.../src/main/codegen/includes/parserImpls.ftl | 9 ++++++-
.../sql/parser/hive/ddl/SqlCreateHiveTable.java | 27 ++++++++++---------
.../parser/hive/FlinkHiveSqlParserImplTest.java | 4 +++
.../flink/sql/parser/ddl/SqlCreateTable.java | 31 ++++++++++++++++++++--
5 files changed, 60 insertions(+), 16 deletions(-)
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java
index 4b588bf..ded35fb 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java
@@ -180,6 +180,11 @@ public class HiveDialectITCase {
hiveTable = hiveCatalog.getHiveTable(new ObjectPath("default", "tbl5"));
assertEquals(";", hiveTable.getSd().getSerdeInfo().getParameters().get(serdeConstants.COLLECTION_DELIM));
assertEquals(":", hiveTable.getSd().getSerdeInfo().getParameters().get(serdeConstants.MAPKEY_DELIM));
+
+ int createdTimeForTableExists = hiveTable.getCreateTime();
+ tableEnv.executeSql("create table if not exists tbl5 (m map<bigint,string>)");
+ hiveTable = hiveCatalog.getHiveTable(new ObjectPath("default", "tbl5"));
+ assertEquals(createdTimeForTableExists, hiveTable.getCreateTime());
}
@Test
diff --git a/flink-table/flink-sql-parser-hive/src/main/codegen/includes/parserImpls.ftl b/flink-table/flink-sql-parser-hive/src/main/codegen/includes/parserImpls.ftl
index 2ffc233..b2f2b50 100644
--- a/flink-table/flink-sql-parser-hive/src/main/codegen/includes/parserImpls.ftl
+++ b/flink-table/flink-sql-parser-hive/src/main/codegen/includes/parserImpls.ftl
@@ -279,6 +279,7 @@ SqlRichDescribeTable SqlRichDescribeTable() :
SqlCreate SqlCreateTable(Span s, boolean isTemporary) :
{
final SqlParserPos startPos = s.pos();
+ boolean ifNotExists = false;
SqlIdentifier tableName;
SqlNodeList primaryKeyList = SqlNodeList.EMPTY;
List<SqlNodeList> uniqueKeysList = new ArrayList<SqlNodeList>();
@@ -298,6 +299,11 @@ SqlCreate SqlCreateTable(Span s, boolean isTemporary) :
[ <EXTERNAL> { isExternal = true; } ]
<TABLE> { propertyList = new SqlNodeList(getPos()); }
+ [
+ LOOKAHEAD(3)
+ <IF> <NOT> <EXISTS> { ifNotExists = true; }
+ ]
+
tableName = CompoundIdentifier()
[
<LPAREN> { pos = getPos(); }
@@ -365,7 +371,8 @@ SqlCreate SqlCreateTable(Span s, boolean isTemporary) :
isExternal,
rowFormat,
storedAs,
- location);
+ location,
+ ifNotExists);
}
}
diff --git a/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlCreateHiveTable.java b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlCreateHiveTable.java
index ee03b2e..dfc0737 100644
--- a/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlCreateHiveTable.java
+++ b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlCreateHiveTable.java
@@ -62,22 +62,23 @@ public class SqlCreateHiveTable extends SqlCreateTable {
public SqlCreateHiveTable(SqlParserPos pos, SqlIdentifier tableName, SqlNodeList columnList,
HiveTableCreationContext creationContext, SqlNodeList propertyList,
SqlNodeList partColList, @Nullable SqlCharStringLiteral comment, boolean isTemporary, boolean isExternal,
- HiveTableRowFormat rowFormat, HiveTableStoredAs storedAs, SqlCharStringLiteral location) throws ParseException {
+ HiveTableRowFormat rowFormat, HiveTableStoredAs storedAs, SqlCharStringLiteral location, boolean ifNotExists) throws ParseException {
super(
- pos,
- tableName,
- columnList,
- creationContext.constraints,
- HiveDDLUtils.checkReservedTableProperties(propertyList),
- extractPartColIdentifiers(partColList),
- null,
- HiveDDLUtils.unescapeStringLiteral(comment),
- null,
- isTemporary
- );
- HiveDDLUtils.unescapeProperties(propertyList);
+ pos,
+ tableName,
+ columnList,
+ creationContext.constraints,
+ HiveDDLUtils.checkReservedTableProperties(propertyList),
+ extractPartColIdentifiers(partColList),
+ null,
+ HiveDDLUtils.unescapeStringLiteral(comment),
+ null,
+ isTemporary,
+ ifNotExists
+ );
+ HiveDDLUtils.unescapeProperties(propertyList);
this.origColList = HiveDDLUtils.deepCopyColList(columnList);
this.origPartColList = partColList != null ?
HiveDDLUtils.deepCopyColList(partColList) :
diff --git a/flink-table/flink-sql-parser-hive/src/test/java/org/apache/flink/sql/parser/hive/FlinkHiveSqlParserImplTest.java b/flink-table/flink-sql-parser-hive/src/test/java/org/apache/flink/sql/parser/hive/FlinkHiveSqlParserImplTest.java
index fd11047..c7cf54a 100644
--- a/flink-table/flink-sql-parser-hive/src/test/java/org/apache/flink/sql/parser/hive/FlinkHiveSqlParserImplTest.java
+++ b/flink-table/flink-sql-parser-hive/src/test/java/org/apache/flink/sql/parser/hive/FlinkHiveSqlParserImplTest.java
@@ -173,6 +173,10 @@ public class FlinkHiveSqlParserImplTest extends SqlParserTest {
" `P` TIMESTAMP\n" +
")");
sql("create table tbl (v varchar)").fails("VARCHAR precision is mandatory");
+
+ sql("create table if not exists tbl (x int)").ok("CREATE TABLE IF NOT EXISTS `TBL` (\n"
+ + " `X` INTEGER\n"
+ + ")");
// TODO: support CLUSTERED BY, SKEWED BY, STORED BY, col constraints
}
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java
index 2e84982..a81c275 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java
@@ -85,8 +85,35 @@ public class SqlCreateTable extends SqlCreate implements ExtendedSqlNode {
@Nullable SqlWatermark watermark,
@Nullable SqlCharStringLiteral comment,
@Nullable SqlTableLike tableLike,
- boolean isTemporary) {
- super(OPERATOR, pos, false, false);
+ boolean isTemporary
+ ) {
+ this(
+ pos,
+ tableName,
+ columnList,
+ tableConstraints,
+ propertyList,
+ partitionKeyList,
+ watermark,
+ comment,
+ tableLike,
+ isTemporary,
+ false);
+ }
+
+ public SqlCreateTable(
+ SqlParserPos pos,
+ SqlIdentifier tableName,
+ SqlNodeList columnList,
+ List<SqlTableConstraint> tableConstraints,
+ SqlNodeList propertyList,
+ SqlNodeList partitionKeyList,
+ @Nullable SqlWatermark watermark,
+ @Nullable SqlCharStringLiteral comment,
+ @Nullable SqlTableLike tableLike,
+ boolean isTemporary,
+ boolean ifNotExists) {
+ super(OPERATOR, pos, false, ifNotExists);
this.tableName = requireNonNull(tableName, "tableName should not be null");
this.columnList = requireNonNull(columnList, "columnList should not be null");
this.tableConstraints = requireNonNull(tableConstraints, "table constraints should not be null");