You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Till Rohrmann (JIRA)" <ji...@apache.org> on 2019/07/23 14:18:00 UTC

[jira] [Commented] (FLINK-13341) Connector Sinks should implement consumeDataStream instead of emitDataStream

    [ https://issues.apache.org/jira/browse/FLINK-13341?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16891061#comment-16891061 ] 

Till Rohrmann commented on FLINK-13341:
---------------------------------------

Thanks for reporting and fixing this issue [~kakachen]. Could you move this issue into "In Progress". That would help with tracking ongoing efforts vs. issues which need to be tackled. Thanks a lot.

> Connector Sinks should implement consumeDataStream instead of emitDataStream
> ----------------------------------------------------------------------------
>
>                 Key: FLINK-13341
>                 URL: https://issues.apache.org/jira/browse/FLINK-13341
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Connectors / Common
>            Reporter: chenqi
>            Assignee: chenqi
>            Priority: Blocker
>              Labels: pull-request-available
>             Fix For: 1.9.0, 1.10.0
>
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> Some streamTableSink#consumeDataStream(DataStream) don't be implemented as returning the sink transformation DataStreamSink when using blink planner.
> which will throw the following errors:
> {code:java}
> Exception in thread "main" org.apache.flink.table.api.TableException: The StreamTableSink#consumeDataStream(DataStream) must be implemented and return the sink transformation DataStreamSink. However, org.apache.flink.streaming.connectors.kafka.Kafka010TableSink doesn't implement this method.
>  at org.apache.flink.table.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:123)
>  at org.apache.flink.table.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:50)
>  at org.apache.flink.table.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:60)
>  at org.apache.flink.table.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:50)
>  at org.apache.flink.table.planner.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:71)
>  at org.apache.flink.table.planner.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:70)
>  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>  at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>  at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>  at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>  at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>  at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>  at org.apache.flink.table.planner.StreamPlanner.translateToPlan(StreamPlanner.scala:70)
>  at org.apache.flink.table.planner.PlannerBase.translate(PlannerBase.scala:155)
>  at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:446)
>  at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:355)
>  at org.apache.flink.table.examples.java.StreamSQLLookupJoinExample.main(StreamSQLLookupJoinExample.java:139){code}



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)