You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2019/08/07 14:37:13 UTC

[flink] 01/05: [FLINK-13495][table-api] Use DataType in ConnectorCatalogTable instead of TypeInformation

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 49202e2cf849dce7df3856ec9f40b1b5596a7ae9
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Fri Aug 2 18:02:22 2019 +0200

    [FLINK-13495][table-api] Use DataType in ConnectorCatalogTable instead of TypeInformation
---
 .../flink/table/catalog/ConnectorCatalogTable.java | 22 ++++++++++++++--------
 1 file changed, 14 insertions(+), 8 deletions(-)

diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ConnectorCatalogTable.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ConnectorCatalogTable.java
index a3f7bda..1ea8b6f 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ConnectorCatalogTable.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ConnectorCatalogTable.java
@@ -20,14 +20,16 @@ package org.apache.flink.table.catalog;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.sinks.TableSink;
 import org.apache.flink.table.sources.DefinedProctimeAttribute;
 import org.apache.flink.table.sources.DefinedRowtimeAttributes;
 import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
 import org.apache.flink.table.sources.TableSource;
-import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
+import org.apache.flink.table.types.AtomicDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.TimestampKind;
+import org.apache.flink.table.types.logical.TimestampType;
 
 import java.util.Arrays;
 import java.util.Collections;
@@ -121,7 +123,7 @@ public class ConnectorCatalogTable<T1, T2> extends AbstractCatalogTable {
 			return tableSchema;
 		}
 
-		TypeInformation[] types = Arrays.copyOf(tableSchema.getFieldTypes(), tableSchema.getFieldCount());
+		DataType[] types = Arrays.copyOf(tableSchema.getFieldDataTypes(), tableSchema.getFieldCount());
 		String[] fieldNames = tableSchema.getFieldNames();
 		if (source instanceof DefinedRowtimeAttributes) {
 			updateRowtimeIndicators((DefinedRowtimeAttributes) source, fieldNames, types);
@@ -129,13 +131,13 @@ public class ConnectorCatalogTable<T1, T2> extends AbstractCatalogTable {
 		if (source instanceof DefinedProctimeAttribute) {
 			updateProctimeIndicator((DefinedProctimeAttribute) source, fieldNames, types);
 		}
-		return new TableSchema(fieldNames, types);
+		return TableSchema.builder().fields(fieldNames, types).build();
 	}
 
 	private static void updateRowtimeIndicators(
 			DefinedRowtimeAttributes source,
 			String[] fieldNames,
-			TypeInformation[] types) {
+			DataType[] types) {
 		List<String> rowtimeAttributes = source.getRowtimeAttributeDescriptors()
 			.stream()
 			.map(RowtimeAttributeDescriptor::getAttributeName)
@@ -143,7 +145,9 @@ public class ConnectorCatalogTable<T1, T2> extends AbstractCatalogTable {
 
 		for (int i = 0; i < fieldNames.length; i++) {
 			if (rowtimeAttributes.contains(fieldNames[i])) {
-				types[i] = TimeIndicatorTypeInfo.ROWTIME_INDICATOR;
+				// bridged to timestamp for compatible flink-planner
+				types[i] = new AtomicDataType(new TimestampType(true, TimestampKind.ROWTIME, 3))
+						.bridgedTo(java.sql.Timestamp.class);
 			}
 		}
 	}
@@ -151,12 +155,14 @@ public class ConnectorCatalogTable<T1, T2> extends AbstractCatalogTable {
 	private static void updateProctimeIndicator(
 			DefinedProctimeAttribute source,
 			String[] fieldNames,
-			TypeInformation[] types) {
+			DataType[] types) {
 		String proctimeAttribute = source.getProctimeAttribute();
 
 		for (int i = 0; i < fieldNames.length; i++) {
 			if (fieldNames[i].equals(proctimeAttribute)) {
-				types[i] = TimeIndicatorTypeInfo.PROCTIME_INDICATOR;
+				// bridged to timestamp for compatible flink-planner
+				types[i] = new AtomicDataType(new TimestampType(true, TimestampKind.PROCTIME, 3))
+						.bridgedTo(java.sql.Timestamp.class);
 				break;
 			}
 		}