You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by li...@apache.org on 2020/09/22 08:10:06 UTC

[flink] branch release-1.11 updated: [FLINK-19281][table-planner-blink] LIKE cannot recognize full table path

This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new 383991a  [FLINK-19281][table-planner-blink] LIKE cannot recognize full table path
383991a is described below

commit 383991adc7ca68a0e0e4b595e06e8c5edd853eb1
Author: zhushang <zh...@qutoutiao.net>
AuthorDate: Sun Sep 20 22:46:14 2020 +0800

    [FLINK-19281][table-planner-blink] LIKE cannot recognize full table path
    
    This closes #13431
---
 .../operations/SqlCreateTableConverter.java        |  3 +-
 .../operations/SqlToOperationConverterTest.java    | 33 ++++++++++++++++++++++
 2 files changed, 34 insertions(+), 2 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlCreateTableConverter.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlCreateTableConverter.java
index 2530897..cf8f62b 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlCreateTableConverter.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlCreateTableConverter.java
@@ -140,8 +140,7 @@ class SqlCreateTableConverter {
 	}
 
 	private CatalogTable lookupLikeSourceTable(SqlTableLike sqlTableLike) {
-		UnresolvedIdentifier unresolvedIdentifier = UnresolvedIdentifier.of(sqlTableLike.getSourceTable()
-			.toString());
+		UnresolvedIdentifier unresolvedIdentifier = UnresolvedIdentifier.of(sqlTableLike.getSourceTable().names);
 		ObjectIdentifier identifier = catalogManager.qualifyIdentifier(unresolvedIdentifier);
 		CatalogManager.TableLookupResult lookupResult = catalogManager.getTable(identifier)
 			.orElseThrow(() -> new ValidationException(String.format(
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
index 1c582a3..2d5faad 100644
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
@@ -529,6 +529,39 @@ public class SqlToOperationConverterTest {
 	}
 
 	@Test
+	public void testCreateTableLikeWithFullPath(){
+		Map<String, String> sourceProperties = new HashMap<>();
+		sourceProperties.put("connector.type", "kafka");
+		sourceProperties.put("format.type", "json");
+		CatalogTableImpl catalogTable = new CatalogTableImpl(
+			TableSchema.builder()
+				.field("f0", DataTypes.INT().notNull())
+				.field("f1", DataTypes.TIMESTAMP(3))
+				.build(),
+			sourceProperties,
+			null
+		);
+		catalogManager.createTable(catalogTable, ObjectIdentifier.of("builtin", "default", "sourceTable"), false);
+		final String sql = "create table mytable like `builtin`.`default`.sourceTable";
+		Operation operation = parseAndConvert(sql);
+
+		assertThat(
+			operation,
+			isCreateTableOperation(
+			withSchema(
+				TableSchema.builder()
+					.field("f0", DataTypes.INT().notNull())
+					.field("f1", DataTypes.TIMESTAMP(3))
+					.build()
+			),
+			withOptions(
+				entry("connector.type", "kafka"),
+				entry("format.type", "json")
+			)
+		));
+	}
+
+	@Test
 	public void testMergingCreateTableLike() {
 		Map<String, String> sourceProperties = new HashMap<>();
 		sourceProperties.put("format.type", "json");