You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2019/08/07 14:37:13 UTC
[flink] 01/05: [FLINK-13495][table-api] Use DataType in
ConnectorCatalogTable instead of TypeInformation
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 49202e2cf849dce7df3856ec9f40b1b5596a7ae9
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Fri Aug 2 18:02:22 2019 +0200
[FLINK-13495][table-api] Use DataType in ConnectorCatalogTable instead of TypeInformation
---
.../flink/table/catalog/ConnectorCatalogTable.java | 22 ++++++++++++++--------
1 file changed, 14 insertions(+), 8 deletions(-)
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ConnectorCatalogTable.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ConnectorCatalogTable.java
index a3f7bda..1ea8b6f 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ConnectorCatalogTable.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ConnectorCatalogTable.java
@@ -20,14 +20,16 @@ package org.apache.flink.table.catalog;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.sources.DefinedProctimeAttribute;
import org.apache.flink.table.sources.DefinedRowtimeAttributes;
import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
import org.apache.flink.table.sources.TableSource;
-import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
+import org.apache.flink.table.types.AtomicDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.TimestampKind;
+import org.apache.flink.table.types.logical.TimestampType;
import java.util.Arrays;
import java.util.Collections;
@@ -121,7 +123,7 @@ public class ConnectorCatalogTable<T1, T2> extends AbstractCatalogTable {
return tableSchema;
}
- TypeInformation[] types = Arrays.copyOf(tableSchema.getFieldTypes(), tableSchema.getFieldCount());
+ DataType[] types = Arrays.copyOf(tableSchema.getFieldDataTypes(), tableSchema.getFieldCount());
String[] fieldNames = tableSchema.getFieldNames();
if (source instanceof DefinedRowtimeAttributes) {
updateRowtimeIndicators((DefinedRowtimeAttributes) source, fieldNames, types);
@@ -129,13 +131,13 @@ public class ConnectorCatalogTable<T1, T2> extends AbstractCatalogTable {
if (source instanceof DefinedProctimeAttribute) {
updateProctimeIndicator((DefinedProctimeAttribute) source, fieldNames, types);
}
- return new TableSchema(fieldNames, types);
+ return TableSchema.builder().fields(fieldNames, types).build();
}
private static void updateRowtimeIndicators(
DefinedRowtimeAttributes source,
String[] fieldNames,
- TypeInformation[] types) {
+ DataType[] types) {
List<String> rowtimeAttributes = source.getRowtimeAttributeDescriptors()
.stream()
.map(RowtimeAttributeDescriptor::getAttributeName)
@@ -143,7 +145,9 @@ public class ConnectorCatalogTable<T1, T2> extends AbstractCatalogTable {
for (int i = 0; i < fieldNames.length; i++) {
if (rowtimeAttributes.contains(fieldNames[i])) {
- types[i] = TimeIndicatorTypeInfo.ROWTIME_INDICATOR;
+ // bridged to java.sql.Timestamp for compatibility with flink-planner
+ types[i] = new AtomicDataType(new TimestampType(true, TimestampKind.ROWTIME, 3))
+ .bridgedTo(java.sql.Timestamp.class);
}
}
}
@@ -151,12 +155,14 @@ public class ConnectorCatalogTable<T1, T2> extends AbstractCatalogTable {
private static void updateProctimeIndicator(
DefinedProctimeAttribute source,
String[] fieldNames,
- TypeInformation[] types) {
+ DataType[] types) {
String proctimeAttribute = source.getProctimeAttribute();
for (int i = 0; i < fieldNames.length; i++) {
if (fieldNames[i].equals(proctimeAttribute)) {
- types[i] = TimeIndicatorTypeInfo.PROCTIME_INDICATOR;
+ // bridged to java.sql.Timestamp for compatibility with flink-planner
+ types[i] = new AtomicDataType(new TimestampType(true, TimestampKind.PROCTIME, 3))
+ .bridgedTo(java.sql.Timestamp.class);
break;
}
}