Posted to user-zh@flink.apache.org by "baiyg25281@hundsun.com" <ba...@hundsun.com> on 2019/05/20 09:21:38 UTC
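Does the Flink Table API support the Date type?
Hello everyone!
The Flink Table API appears to support the Date type, but in practice I get this exception: Caused by: org.apache.flink.table.api.TableException: Type is not supported: Date.
Scenario:
Step 1: read Date-typed data from a MySQL database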
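    JDBCInputFormat inputFormat = new JDBCInputFormat.JDBCInputFormatBuilder()
        // (driver, URL and query settings omitted)
        .setRowTypeInfo(
            new RowTypeInfo(
                new TypeInformation<?>[] { BasicTypeInfo.DATE_TYPE_INFO }))
        .finish();
    // env is the StreamExecutionEnvironment instance
    DataStreamSource<Row> dss = env.createInput(inputFormat);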
Step 2: register it as a table
    ((org.apache.flink.table.api.java.BatchTableEnvironment) tEnv).registerOrReplaceBoundedStream(outTableName, dss, outField);
Step 3: query the data in the outTableName table and write it to a sink
    Table sample3 = tEnv.sqlQuery(dataSql);
    JDBCAppendTableSink sink = JDBCAppendTableSink.builder()
        // (connection settings omitted)
        .setParameterTypes(new InternalType[] { DataTypes.DATE })
        .build();
    sample3.writeToSink(sink);
Full exception output:
Exception in thread "main" org.apache.flink.table.api.ValidationException:
************
ERR_ID:
SQL-00120001
CAUSE:
SQL validation failed:
Type is not supported: Date
ACTION:
Please see descriptions above. If it doesn't help, please contact customer support for this.
DETAIL:
************
at org.apache.flink.table.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:103)
at org.apache.flink.table.api.TableEnvironment.sqlQuery(TableEnvironment.scala:1127)
at com.hundsun.futurex.template.JDBCSink.run(JDBCSink.java:64)   <-- points to the underlined code above (the sqlQuery call)
at com.hundsun.futurex.template.WorkFlow.run(WorkFlow.java:35)
at com.hundsun.futurex.ProgramEntry.main(ProgramEntry.java:35)
Caused by: org.apache.flink.table.api.TableException: Type is not supported: Date
at org.apache.flink.table.calcite.FlinkTypeFactory$.typeInfoToSqlTypeName(FlinkTypeFactory.scala:485)
at org.apache.flink.table.calcite.FlinkTypeFactory.createTypeFromTypeInfo(FlinkTypeFactory.scala:84)
at org.apache.flink.table.calcite.FlinkTypeFactory.createTypeFromInternalType(FlinkTypeFactory.scala:71)
at org.apache.flink.table.calcite.FlinkTypeFactory$$anonfun$buildRelDataType$2.apply(FlinkTypeFactory.scala:281)
at org.apache.flink.table.calcite.FlinkTypeFactory$$anonfun$buildRelDataType$2.apply(FlinkTypeFactory.scala:275)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.flink.table.calcite.FlinkTypeFactory.buildRelDataType(FlinkTypeFactory.scala:275)
at org.apache.flink.table.calcite.FlinkTypeFactory.buildRelDataType(FlinkTypeFactory.scala:262)
at org.apache.flink.table.plan.schema.InlineTable.getRowType(InlineTable.scala:105)
at org.apache.calcite.sql.validate.EmptyScope.resolve_(EmptyScope.java:159)
at org.apache.calcite.sql.validate.EmptyScope.resolveTable(EmptyScope.java:99)
at org.apache.calcite.sql.validate.DelegatingScope.resolveTable(DelegatingScope.java:203)
at org.apache.calcite.sql.validate.IdentifierNamespace.resolveImpl(IdentifierNamespace.java:113)
at org.apache.calcite.sql.validate.IdentifierNamespace.validateImpl(IdentifierNamespace.java:185)
at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1012)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:972)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3276)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3255)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3520)
at org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1012)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:972)
at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:225)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:947)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:657)
at org.apache.flink.table.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:99)
... 4 more
baiyg25281@hundsun.com
Re: RE: Does the Flink Table API support the Date type?
Posted by "baiyg25281@hundsun.com" <ba...@hundsun.com>.
Thanks! It works now.
baiyg25281@hundsun.com
From: Shi Quan
Sent: 2019-05-20 17:28
To: user-zh@flink.apache.org
Subject: RE: Does the Flink Table API support the Date type?
You can use SqlTimeTypeInfo.DATE.
This is the type mapping in FlinkTypeFactory.typeInfoToSqlTypeName:
// temporal types
case SqlTimeTypeInfo.DATE => DATE
case SqlTimeTypeInfo.TIME => TIME
case SqlTimeTypeInfo.TIMESTAMP => TIMESTAMP
case TimeIntervalTypeInfo.INTERVAL_MONTHS => INTERVAL_YEAR_MONTH
case TimeIntervalTypeInfo.INTERVAL_MILLIS => INTERVAL_DAY_SECOND
RE: Does the Flink Table API support the Date type?
Posted by Shi Quan <qu...@outlook.com>.
You can use SqlTimeTypeInfo.DATE.
This is the type mapping in FlinkTypeFactory.typeInfoToSqlTypeName:
// temporal types
case SqlTimeTypeInfo.DATE => DATE
case SqlTimeTypeInfo.TIME => TIME
case SqlTimeTypeInfo.TIMESTAMP => TIMESTAMP
case TimeIntervalTypeInfo.INTERVAL_MONTHS => INTERVAL_YEAR_MONTH
case TimeIntervalTypeInfo.INTERVAL_MILLIS => INTERVAL_DAY_SECOND
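To make the mapping above concrete: BasicTypeInfo.DATE_TYPE_INFO describes java.util.Date, while SqlTimeTypeInfo.DATE describes java.sql.Date, and only the latter has a case in the match. The following is a minimal, self-contained sketch of that lookup behavior. It is not Flink code: the class TemporalTypeLookup and the method sqlTypeNameFor are hypothetical names invented for illustration, and a plain map stands in for the Scala pattern match.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Simplified stand-in for the temporal branch of typeInfoToSqlTypeName.
 * NOT Flink code: class and method names here are hypothetical.
 */
public class TemporalTypeLookup {

    // Java class described by each SqlTimeTypeInfo constant -> SQL type name.
    private static final Map<Class<?>, String> SQL_TYPE_NAMES = new LinkedHashMap<>();
    static {
        SQL_TYPE_NAMES.put(java.sql.Date.class, "DATE");           // SqlTimeTypeInfo.DATE
        SQL_TYPE_NAMES.put(java.sql.Time.class, "TIME");           // SqlTimeTypeInfo.TIME
        SQL_TYPE_NAMES.put(java.sql.Timestamp.class, "TIMESTAMP"); // SqlTimeTypeInfo.TIMESTAMP
    }

    /** Returns the SQL type name, or throws when the class has no mapping. */
    public static String sqlTypeNameFor(Class<?> clazz) {
        // Exact-class lookup: java.util.Date is not matched by the java.sql.Date entry.
        String name = SQL_TYPE_NAMES.get(clazz);
        if (name == null) {
            throw new IllegalArgumentException(
                "Type is not supported: " + clazz.getSimpleName());
        }
        return name;
    }

    public static void main(String[] args) {
        // java.sql.Date has a mapping.
        System.out.println(sqlTypeNameFor(java.sql.Date.class));
        try {
            // java.util.Date is what BasicTypeInfo.DATE_TYPE_INFO describes; no mapping.
            sqlTypeNameFor(java.util.Date.class);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In the job from the question, the corresponding change would be to pass SqlTimeTypeInfo.DATE to RowTypeInfo in place of BasicTypeInfo.DATE_TYPE_INFO, which the original poster confirmed resolved the error.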