Posted to issues@flink.apache.org by "yuemeng (JIRA)" <ji...@apache.org> on 2018/05/10 09:47:00 UTC
[jira] [Updated] (FLINK-9329) hasRowtimeAttribute will throw NPE if
user use setProctimeAttribute for table source
[ https://issues.apache.org/jira/browse/FLINK-9329?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
yuemeng updated FLINK-9329:
---------------------------
Description:
{code:java}
KafkaTableSource source = Kafka010JsonTableSource.builder()
    .withSchema(TableSchema.builder()
        .field("sensorId", Types.LONG())
        .field("temp", Types.DOUBLE())
        .field("ptime", Types.SQL_TIMESTAMP()).build())
    .withProctimeAttribute("ptime")
    .build();
tableEnv.registerTableSource("flights", source);
{code}
Kafka010JsonTableSource implements DefinedRowtimeAttributes, so TableSourceUtil.hasRowtimeAttribute(source) calls getRowtimeAttributes:
{code:java}
/** Returns a list with all rowtime attribute names of the [[TableSource]]. */
private def getRowtimeAttributes(tableSource: TableSource[_]): Array[String] = {
  tableSource match {
    case r: DefinedRowtimeAttributes =>
      r.getRowtimeAttributeDescriptors.asScala.map(_.getAttributeName).toArray
    case _ =>
      Array()
  }
}
{code}
The r.getRowtimeAttributeDescriptors line throws an NPE because the source is configured with a proctime attribute, not a rowtime attribute.
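To illustrate the failure mode and one possible guard, here is a minimal, self-contained Java sketch. It does not use the Flink classes: TableSource, DefinedRowtimeAttributes, RowtimeAttributeDescriptor and StubSource below are hypothetical stand-ins named after the types in this issue, and the sketch assumes the descriptor getter returns null when only a proctime attribute is configured. It is not necessarily the fix adopted for FLINK-9329.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class RowtimeAttributeSketch {

    // Hypothetical stand-ins for the Flink types named in the issue.
    interface TableSource {}

    static final class RowtimeAttributeDescriptor {
        private final String attributeName;
        RowtimeAttributeDescriptor(String attributeName) { this.attributeName = attributeName; }
        String getAttributeName() { return attributeName; }
    }

    interface DefinedRowtimeAttributes {
        // Assumption: returns null when no rowtime attribute was configured.
        List<RowtimeAttributeDescriptor> getRowtimeAttributeDescriptors();
    }

    // Hypothetical source that hands back the descriptor list it was given, including null.
    static final class StubSource implements TableSource, DefinedRowtimeAttributes {
        private final List<RowtimeAttributeDescriptor> descriptors;
        StubSource(List<RowtimeAttributeDescriptor> descriptors) { this.descriptors = descriptors; }
        @Override
        public List<RowtimeAttributeDescriptor> getRowtimeAttributeDescriptors() { return descriptors; }
    }

    // Null-safe variant: a missing descriptor list is treated as "no rowtime attributes".
    static List<String> getRowtimeAttributes(TableSource source) {
        if (!(source instanceof DefinedRowtimeAttributes)) {
            return Collections.<String>emptyList();
        }
        List<RowtimeAttributeDescriptor> descriptors =
            ((DefinedRowtimeAttributes) source).getRowtimeAttributeDescriptors();
        if (descriptors == null) {
            return Collections.<String>emptyList();
        }
        List<String> names = new ArrayList<>();
        for (RowtimeAttributeDescriptor d : descriptors) {
            names.add(d.getAttributeName());
        }
        return names;
    }

    static boolean hasRowtimeAttribute(TableSource source) {
        return !getRowtimeAttributes(source).isEmpty();
    }

    public static void main(String[] args) {
        TableSource proctimeOnly = new StubSource(null);
        TableSource withRowtime = new StubSource(
            Collections.singletonList(new RowtimeAttributeDescriptor("rtime")));
        System.out.println(hasRowtimeAttribute(proctimeOnly));
        System.out.println(hasRowtimeAttribute(withRowtime));
    }
}
```

With the null check in place, the proctime-only stub reports no rowtime attribute instead of throwing. An alternative with the same effect would be for the source itself to return an empty list rather than null.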
was:
{code:java}
KafkaTableSource source = Kafka010JsonTableSource.builder()
    // ...
    .withSchema(TableSchema.builder()
        .field("sensorId", Types.LONG())
        .field("temp", Types.DOUBLE())
        // field "ptime" is of type SQL_TIMESTAMP
        .field("ptime", Types.SQL_TIMESTAMP()).build())
    // declare "ptime" as processing time attribute
    .withProctimeAttribute("ptime")
    .build();
tableEnv.registerTableSource("flights", kafkaTableSource);
{code}
> hasRowtimeAttribute will throw NPE if user use setProctimeAttribute for table source
> ------------------------------------------------------------------------------------
>
> Key: FLINK-9329
> URL: https://issues.apache.org/jira/browse/FLINK-9329
> Project: Flink
> Issue Type: Bug
> Components: Table API & SQL
> Reporter: yuemeng
> Assignee: yuemeng
> Priority: Critical
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)