Posted to user-zh@flink.apache.org by Hito Zhu <qr...@gmail.com> on 2020/07/13 09:22:54 UTC

flink 1.11 createTemporaryTable: specifying a rowtime field reports a "Field null does not exist" error

I am using flink 1.11's tableEnv.createTemporaryTable to register a table and specifying searchTime as the event-time (rowtime) field, but the program reports a "Field null does not exist" error. Is there a problem with my usage?
I also looked at https://issues.apache.org/jira/browse/FLINK-16160. Is that the issue that fixes this problem?

tableEnv.connect(kafka)
    .withSchema(
        new Schema()
            .field("searchTime", DataTypes.TIMESTAMP())
            .rowtime(rowtime)
    )
    .withFormat(
        new Json().failOnMissingField(false)
    )
    .createTemporaryTable("tablename");




Re: flink 1.11 createTemporaryTable: specifying a rowtime field reports a "Field null does not exist" error

Posted by Hito Zhu <qr...@gmail.com>.
OK, thanks for the answer.




Re: flink 1.11 createTemporaryTable: specifying a rowtime field reports a "Field null does not exist" error

Posted by Jark Wu <im...@gmail.com>.
This is probably a bug in the connect API. I suggest using DDL for now.
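
For reference, here is a rough DDL sketch of the same table. This is a minimal sketch only: the Kafka connector options are placeholders rather than your real settings, and the 5-second watermark mirrors the Rowtime definition quoted below.

// Sketch: declare the table with DDL instead of the connect API.
// 'topic' and 'properties.bootstrap.servers' are placeholders, adjust to your setup.
tableEnv.executeSql(
    "CREATE TABLE tablename (" +
    "  searchTime TIMESTAMP(3)," +
    "  WATERMARK FOR searchTime AS searchTime - INTERVAL '5' SECOND" +
    ") WITH (" +
    "  'connector' = 'kafka'," +
    "  'topic' = '...'," +                        // placeholder
    "  'properties.bootstrap.servers' = '...'," + // placeholder
    "  'format' = 'json'," +
    "  'json.fail-on-missing-field' = 'false'" +
    ")");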

Best,
Jark

On Tue, 14 Jul 2020 at 08:54, Hito Zhu <qr...@gmail.com> wrote:

> The rowtime is defined as follows. I noticed that the SchemaValidator#deriveFieldMapping method has been removed.
> Rowtime rowtime = new Rowtime()
>                 .timestampsFromField("searchTime")
>                 .watermarksPeriodicBounded(5 * 1000);
>

Re: flink 1.11 createTemporaryTable: specifying a rowtime field reports a "Field null does not exist" error

Posted by Hito Zhu <qr...@gmail.com>.
The rowtime is defined as follows. I noticed that the SchemaValidator#deriveFieldMapping method has been removed.
Rowtime rowtime = new Rowtime()
                .timestampsFromField("searchTime")
                .watermarksPeriodicBounded(5 * 1000);




Re: flink 1.11 createTemporaryTable: specifying a rowtime field reports a "Field null does not exist" error

Posted by Jark Wu <im...@gmail.com>.
In your code, could you paste the definition of the rowtime used in
new Schema().field("searchTime", DataTypes.TIMESTAMP()).rowtime(rowtime)?

On Mon, 13 Jul 2020 at 20:53, Hito Zhu <qr...@gmail.com> wrote:

> Hi Jark, the exception is as follows:
> Exception in thread "main" org.apache.flink.table.api.ValidationException:
> Field null does not exist
> [...]

Re: flink 1.11 createTemporaryTable: specifying a rowtime field reports a "Field null does not exist" error

Posted by Hito Zhu <qr...@gmail.com>.
Hi Jark, the exception is as follows:
Exception in thread "main" org.apache.flink.table.api.ValidationException: Field null does not exist
	at org.apache.flink.table.sources.tsextractors.TimestampExtractorUtils.lambda$mapToResolvedField$4(TimestampExtractorUtils.java:85)
	at java.util.OptionalInt.orElseThrow(OptionalInt.java:189)
	at org.apache.flink.table.sources.tsextractors.TimestampExtractorUtils.mapToResolvedField(TimestampExtractorUtils.java:85)
	at org.apache.flink.table.sources.tsextractors.TimestampExtractorUtils.lambda$getAccessedFields$0(TimestampExtractorUtils.java:58)
	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
	at java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:545)
	at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
	at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:438)
	at org.apache.flink.table.sources.tsextractors.TimestampExtractorUtils.getAccessedFields(TimestampExtractorUtils.java:73)
	at org.apache.flink.table.sources.tsextractors.TimestampExtractorUtils.getAccessedFields(TimestampExtractorUtils.java:65)
	at org.apache.flink.table.planner.sources.TableSourceUtil$.getRowtimeExtractionExpression(TableSourceUtil.scala:244)
	at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacyTableSourceScan$$anonfun$1.apply(StreamExecLegacyTableSourceScan.scala:119)
	at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacyTableSourceScan$$anonfun$1.apply(StreamExecLegacyTableSourceScan.scala:118)
	at scala.Option.map(Option.scala:146)
	at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacyTableSourceScan.translateToPlanInternal(StreamExecLegacyTableSourceScan.scala:118)
	at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacyTableSourceScan.translateToPlanInternal(StreamExecLegacyTableSourceScan.scala:63)
	at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
	at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacyTableSourceScan.translateToPlan(StreamExecLegacyTableSourceScan.scala:63)
	at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54)
	at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
	at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
	at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
	at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToTransformation(StreamExecLegacySink.scala:158)
	at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:127)
	at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:48)
	at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
	at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlan(StreamExecLegacySink.scala:48)
	at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:67)
	at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:66)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
	at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:66)
	at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:166)
	at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:321)
	at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toRetractStream(StreamTableEnvironmentImpl.java:297)
	at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toRetractStream(StreamTableEnvironmentImpl.java:288)


