You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2020/02/26 01:59:25 UTC

[flink] branch master updated: [FLINK-16265][table][csv] CsvTableSourceFactoryBase should compare LogicalTypes instead of TableSchema (#11214)

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 7a283be  [FLINK-16265][table][csv] CsvTableSourceFactoryBase should compare LogicalTypes instead of TableSchema (#11214)
7a283be is described below

commit 7a283be4093f7258dd33ba13200e32b22fb582ca
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Wed Feb 26 09:59:03 2020 +0800

    [FLINK-16265][table][csv] CsvTableSourceFactoryBase should compare LogicalTypes instead of TableSchema (#11214)
    
    A filesystem OldCsv source with a timestamp type will fail when defined via YAML.
    The root cause is that we convert the properties into a CatalogTableImpl and then convert them back into properties. The schema type properties then use the new type system, which is not equal to the legacy types because of differing conversion classes.
---
 .../table/client/gateway/local/LocalExecutorITCase.java    |  4 ++--
 .../src/test/resources/test-sql-client-catalogs.yaml       |  4 ++++
 .../src/test/resources/test-sql-client-defaults.yaml       |  4 ++++
 .../flink/table/sources/CsvTableSourceFactoryBase.java     | 14 +++++++++++++-
 4 files changed, 23 insertions(+), 3 deletions(-)

diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
index c136f50..5a25472 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
@@ -380,8 +380,8 @@ public class LocalExecutorITCase extends TestLogger {
 		final TableSchema actualTableSchema = executor.getTableSchema(sessionId, "TableNumber2");
 
 		final TableSchema expectedTableSchema = new TableSchema(
-			new String[]{"IntegerField2", "StringField2"},
-			new TypeInformation[]{Types.INT, Types.STRING});
+			new String[]{"IntegerField2", "StringField2", "TimestampField3"},
+			new TypeInformation[]{Types.INT, Types.STRING, Types.SQL_TIMESTAMP});
 
 		assertEquals(expectedTableSchema, actualTableSchema);
 		executor.closeSession(sessionId);
diff --git a/flink-table/flink-sql-client/src/test/resources/test-sql-client-catalogs.yaml b/flink-table/flink-sql-client/src/test/resources/test-sql-client-catalogs.yaml
index ff2ccea..9c1bfb7 100644
--- a/flink-table/flink-sql-client/src/test/resources/test-sql-client-catalogs.yaml
+++ b/flink-table/flink-sql-client/src/test/resources/test-sql-client-catalogs.yaml
@@ -56,6 +56,8 @@ tables:
         type: INT
       - name: StringField2
         type: VARCHAR
+      - name: TimestampField3
+        type: TIMESTAMP
     connector:
       type: filesystem
       path: "$VAR_SOURCE_PATH2"
@@ -66,6 +68,8 @@ tables:
           type: INT
         - name: StringField2
           type: VARCHAR
+        - name: TimestampField3
+          type: TIMESTAMP
       line-delimiter: "\n"
       comment-prefix: "#"
   - name: TestView2
diff --git a/flink-table/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml b/flink-table/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml
index 09ca2eb..77a1274 100644
--- a/flink-table/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml
+++ b/flink-table/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml
@@ -56,6 +56,8 @@ tables:
         type: INT
       - name: StringField2
         type: VARCHAR
+      - name: TimestampField3
+        type: TIMESTAMP
     connector:
       type: filesystem
       path: "$VAR_SOURCE_PATH2"
@@ -66,6 +68,8 @@ tables:
           type: INT
         - name: StringField2
           type: VARCHAR
+        - name: TimestampField3
+          type: TIMESTAMP
       line-delimiter: "\n"
       comment-prefix: "#"
   - name: TableSourceSink
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/CsvTableSourceFactoryBase.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/CsvTableSourceFactoryBase.java
index 0bb5244..972fef9 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/CsvTableSourceFactoryBase.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/CsvTableSourceFactoryBase.java
@@ -27,12 +27,16 @@ import org.apache.flink.table.descriptors.FormatDescriptorValidator;
 import org.apache.flink.table.descriptors.OldCsvValidator;
 import org.apache.flink.table.descriptors.SchemaValidator;
 import org.apache.flink.table.factories.TableFactory;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.utils.TableSchemaUtils;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION;
 import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;
@@ -123,7 +127,9 @@ public abstract class CsvTableSourceFactoryBase implements TableFactory {
 			TableSchema formatSchema = params.getTableSchema(FORMAT_FIELDS);
 			// the CsvTableSource needs some rework first
 			// for now the schema must be equal to the encoding
-			if (!formatSchema.equals(tableSchema)) {
+			// Ignore conversion classes in DataType
+			if (!getFieldLogicalTypes(formatSchema)
+					.equals(getFieldLogicalTypes(tableSchema))) {
 				throw new TableException(
 					"Encodings that differ from the schema are not supported yet for CsvTableSources.");
 			}
@@ -153,4 +159,10 @@ public abstract class CsvTableSourceFactoryBase implements TableFactory {
 		return csvTableSourceBuilder.build();
 	}
 
+	private static List<LogicalType> getFieldLogicalTypes(TableSchema schema) {
+		return Arrays
+				.stream(schema.getFieldDataTypes())
+				.map(DataType::getLogicalType)
+				.collect(Collectors.toList());
+	}
 }