You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by li...@apache.org on 2020/09/22 08:10:06 UTC
[flink] branch release-1.11 updated:
[FLINK-19281][table-planner-blink] LIKE cannot recognize full table path
This is an automated email from the ASF dual-hosted git repository.
libenchao pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new 383991a [FLINK-19281][table-planner-blink] LIKE cannot recognize full table path
383991a is described below
commit 383991adc7ca68a0e0e4b595e06e8c5edd853eb1
Author: zhushang <zh...@qutoutiao.net>
AuthorDate: Sun Sep 20 22:46:14 2020 +0800
[FLINK-19281][table-planner-blink] LIKE cannot recognize full table path
This closes #13431
---
.../operations/SqlCreateTableConverter.java | 3 +-
.../operations/SqlToOperationConverterTest.java | 33 ++++++++++++++++++++++
2 files changed, 34 insertions(+), 2 deletions(-)
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlCreateTableConverter.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlCreateTableConverter.java
index 2530897..cf8f62b 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlCreateTableConverter.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlCreateTableConverter.java
@@ -140,8 +140,7 @@ class SqlCreateTableConverter {
}
private CatalogTable lookupLikeSourceTable(SqlTableLike sqlTableLike) {
- UnresolvedIdentifier unresolvedIdentifier = UnresolvedIdentifier.of(sqlTableLike.getSourceTable()
- .toString());
+ UnresolvedIdentifier unresolvedIdentifier = UnresolvedIdentifier.of(sqlTableLike.getSourceTable().names);
ObjectIdentifier identifier = catalogManager.qualifyIdentifier(unresolvedIdentifier);
CatalogManager.TableLookupResult lookupResult = catalogManager.getTable(identifier)
.orElseThrow(() -> new ValidationException(String.format(
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
index 1c582a3..2d5faad 100644
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
@@ -529,6 +529,39 @@ public class SqlToOperationConverterTest {
}
@Test
+ public void testCreateTableLikeWithFullPath(){ // CREATE TABLE ... LIKE where the source table is given by a fully qualified path
+ Map<String, String> sourceProperties = new HashMap<>();
+ sourceProperties.put("connector.type", "kafka");
+ sourceProperties.put("format.type", "json");
+ CatalogTableImpl catalogTable = new CatalogTableImpl( // source table that the LIKE clause refers to
+ TableSchema.builder()
+ .field("f0", DataTypes.INT().notNull())
+ .field("f1", DataTypes.TIMESTAMP(3))
+ .build(),
+ sourceProperties,
+ null
+ );
+ catalogManager.createTable(catalogTable, ObjectIdentifier.of("builtin", "default", "sourceTable"), false); // register the source under builtin.default.sourceTable
+ final String sql = "create table mytable like `builtin`.`default`.sourceTable"; // LIKE names the source by its three-part path, first two parts back-quoted
+ Operation operation = parseAndConvert(sql);
+
+ assertThat( // the resulting create-table operation must carry the same schema and options as the source table
+ operation,
+ isCreateTableOperation(
+ withSchema(
+ TableSchema.builder()
+ .field("f0", DataTypes.INT().notNull())
+ .field("f1", DataTypes.TIMESTAMP(3))
+ .build()
+ ),
+ withOptions(
+ entry("connector.type", "kafka"),
+ entry("format.type", "json")
+ )
+ ));
+ }
+
+ @Test
public void testMergingCreateTableLike() {
Map<String, String> sourceProperties = new HashMap<>();
sourceProperties.put("format.type", "json");