Posted to user-zh@flink.apache.org by "baiyg25281@hundsun.com" <ba...@hundsun.com> on 2019/05/20 09:21:38 UTC

Does the Flink Table API support the Date type?

Hi all,

    The Flink Table API appears to support the Date type, but in practice it fails with: Caused by: org.apache.flink.table.api.TableException: Type is not supported: Date.

    Scenario:

        Step 1: read Date-typed data from a MySQL database
                new JDBCInputFormat
                        .JDBCInputFormatBuilder()
                        .setRowTypeInfo(
                                new RowTypeInfo(
                                        TypeInformation[BasicTypeInfo.DATE_TYPE_INFO]
                                )
                        )
                DataStreamSource dss = StreamExecutionEnvironment.createInput(JDBCInputFormat)
        Step 2: register the stream as a table
                ((org.apache.flink.table.api.java.BatchTableEnvironment) tEnv) .registerOrReplaceBoundedStream(outTableName,dss,outField);
        Step 3: query the data in the outTableName table and write it to a sink
                Table sample3 = BatchTableEnvironment.sqlQuery(dataSql);
                JDBCAppendTableSink
                        .builder()
                        .setParameterTypes(InternalType[DataTypes.DATE])
                sample3.writeToSink(JDBCAppendTableSink);

    Full exception:
Exception in thread "main" org.apache.flink.table.api.ValidationException: 
************
ERR_ID:
     SQL-00120001
CAUSE:
     SQL validation failed:
     Type is not supported: Date
ACTION:
     Please see descriptions above. If it doesn't help, please contact customer support for this.
DETAIL:

************
at org.apache.flink.table.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:103)
at org.apache.flink.table.api.TableEnvironment.sqlQuery(TableEnvironment.scala:1127)
at com.hundsun.futurex.template.JDBCSink.run(JDBCSink.java:64)  <- the code underlined above (the sqlQuery call in step 3)
at com.hundsun.futurex.template.WorkFlow.run(WorkFlow.java:35)
at com.hundsun.futurex.ProgramEntry.main(ProgramEntry.java:35)
Caused by: org.apache.flink.table.api.TableException: Type is not supported: Date
at org.apache.flink.table.calcite.FlinkTypeFactory$.typeInfoToSqlTypeName(FlinkTypeFactory.scala:485)
at org.apache.flink.table.calcite.FlinkTypeFactory.createTypeFromTypeInfo(FlinkTypeFactory.scala:84)
at org.apache.flink.table.calcite.FlinkTypeFactory.createTypeFromInternalType(FlinkTypeFactory.scala:71)
at org.apache.flink.table.calcite.FlinkTypeFactory$$anonfun$buildRelDataType$2.apply(FlinkTypeFactory.scala:281)
at org.apache.flink.table.calcite.FlinkTypeFactory$$anonfun$buildRelDataType$2.apply(FlinkTypeFactory.scala:275)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.flink.table.calcite.FlinkTypeFactory.buildRelDataType(FlinkTypeFactory.scala:275)
at org.apache.flink.table.calcite.FlinkTypeFactory.buildRelDataType(FlinkTypeFactory.scala:262)
at org.apache.flink.table.plan.schema.InlineTable.getRowType(InlineTable.scala:105)
at org.apache.calcite.sql.validate.EmptyScope.resolve_(EmptyScope.java:159)
at org.apache.calcite.sql.validate.EmptyScope.resolveTable(EmptyScope.java:99)
at org.apache.calcite.sql.validate.DelegatingScope.resolveTable(DelegatingScope.java:203)
at org.apache.calcite.sql.validate.IdentifierNamespace.resolveImpl(IdentifierNamespace.java:113)
at org.apache.calcite.sql.validate.IdentifierNamespace.validateImpl(IdentifierNamespace.java:185)
at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1012)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:972)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3276)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3255)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3520)
at org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1012)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:972)
at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:225)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:947)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:657)
at org.apache.flink.table.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:99)
... 4 more







baiyg25281@hundsun.com

Re: RE: Does the Flink Table API support the Date type?

Posted by "baiyg25281@hundsun.com" <ba...@hundsun.com>.
Thanks! It works now.



baiyg25281@hundsun.com
 

RE: Does the Flink Table API support the Date type?

Posted by Shi Quan <qu...@outlook.com>.
You can use SqlTimeTypeInfo.DATE instead.

The type mappings in FlinkTypeFactory.typeInfoToSqlTypeName are:

      // temporal types
      case SqlTimeTypeInfo.DATE => DATE
      case SqlTimeTypeInfo.TIME => TIME
      case SqlTimeTypeInfo.TIMESTAMP => TIMESTAMP
      case TimeIntervalTypeInfo.INTERVAL_MONTHS => INTERVAL_YEAR_MONTH
      case TimeIntervalTypeInfo.INTERVAL_MILLIS => INTERVAL_DAY_SECOND
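
To make the mapping above concrete: BasicTypeInfo.DATE_TYPE_INFO describes java.util.Date, while SqlTimeTypeInfo.DATE describes java.sql.Date, and only the latter appears in the temporal cases listed. The sketch below is a simplified plain-JDK model of that lookup, not Flink's actual code. The class name TemporalTypeLookup and the method sqlTypeNameFor are hypothetical names chosen for illustration, and the interval cases are left out.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, simplified model of the temporal cases quoted above.
// It keys on the Java class behind each type info, not on Flink's TypeInformation.
class TemporalTypeLookup {

    private static final Map<Class<?>, String> SQL_TYPE_NAMES = new LinkedHashMap<>();

    static {
        // SqlTimeTypeInfo.DATE / TIME / TIMESTAMP describe the java.sql classes.
        SQL_TYPE_NAMES.put(java.sql.Date.class, "DATE");
        SQL_TYPE_NAMES.put(java.sql.Time.class, "TIME");
        SQL_TYPE_NAMES.put(java.sql.Timestamp.class, "TIMESTAMP");
    }

    // Returns the SQL type name for a class, or fails the way the thread's error does.
    public static String sqlTypeNameFor(Class<?> clazz) {
        String name = SQL_TYPE_NAMES.get(clazz);
        if (name == null) {
            throw new IllegalArgumentException("Type is not supported: " + clazz.getSimpleName());
        }
        return name;
    }

    public static void main(String[] args) {
        // java.sql.Date has a mapping.
        System.out.println(sqlTypeNameFor(java.sql.Date.class)); // prints "DATE"

        // java.util.Date (what BasicTypeInfo.DATE_TYPE_INFO describes) has none.
        try {
            sqlTypeNameFor(java.util.Date.class);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // prints "Type is not supported: Date"
        }
    }
}
```

In the code from the original question, this corresponds to passing SqlTimeTypeInfo.DATE to RowTypeInfo in step 1 instead of BasicTypeInfo.DATE_TYPE_INFO.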


