You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Lars Skjærven <la...@gmail.com> on 2022/09/07 12:17:34 UTC

Cassandra sink with Flink 1.15

Hello,

When upgrading from 1.14 to 1.15 we bumped into a type issue when
attempting to sink to Cassandra (scala 2.12.13). This was working nicely in
1.14. Any tip is highly appreciated.

Using a MapFunction() to generate the stream of tuples:

CassandraSink
 .addSink(
    mystream.map(new ToTupleMapper)
  )...

Exception: No support for the type of the given DataStream:
GenericType<scala.Tuple2>

Or with a lambda function:

CassandraSink
 .addSink(
    mystream.map((v: MyCaseClass => (v.key v.someLongValue))
  )...

Caused by: org.apache.flink.api.common.functions.InvalidTypesException: The
generic type parameters of 'Tuple2' are missing. In many cases lambda
methods don't provide enough information for automatic type extraction when
Java generics are involved. An easy workaround is to use an (anonymous)
class instead that implements the
'org.apache.flink.api.common.functions.MapFunction' interface. Otherwise
the type has to be specified explicitly using type information.

Re: Cassandra sink with Flink 1.15

Posted by Lars Skjærven <la...@gmail.com>.
Thanks !

For reference, solved with mapping to Flink tuples.



On Wed, Sep 7, 2022 at 2:27 PM Chesnay Schepler <ch...@apache.org> wrote:

> Are you running into this in the IDE, or when submitting the job to a
> Flink cluster?
>
> If it is the first, then you're probably affected by the Scala-free Flink
> efforts. Either add an explicit dependency on flink-streaming-scala or
> migrate to Flink tuples.
>
> On 07/09/2022 14:17, Lars Skjærven wrote:
>
> Hello,
>
> When upgrading from 1.14 to 1.15 we bumped into a type issue when
> attempting to sink to Cassandra (scala 2.12.13). This was working nicely in
> 1.14. Any tip is highly appreciated.
>
> Using a MapFunction() to generate the stream of tuples:
>
> CassandraSink
>  .addSink(
>     mystream.map(new ToTupleMapper)
>   )...
>
> Exception: No support for the type of the given DataStream:
> GenericType<scala.Tuple2>
>
> Or with a lambda function:
>
> CassandraSink
>  .addSink(
>     mystream.map((v: MyCaseClass => (v.key v.someLongValue))
>   )...
>
> Caused by: org.apache.flink.api.common.functions.InvalidTypesException:
> The generic type parameters of 'Tuple2' are missing. In many cases lambda
> methods don't provide enough information for automatic type extraction when
> Java generics are involved. An easy workaround is to use an (anonymous)
> class instead that implements the
> 'org.apache.flink.api.common.functions.MapFunction' interface. Otherwise
> the type has to be specified explicitly using type information.
>
>
>
>
>
>

Re: Cassandra sink with Flink 1.15

Posted by Chesnay Schepler <ch...@apache.org>.
Are you running into this in the IDE, or when submitting the job to a 
Flink cluster?

If it is the first, then you're probably affected by the Scala-free 
Flink efforts. Either add an explicit dependency on 
flink-streaming-scala or migrate to Flink tuples.

On 07/09/2022 14:17, Lars Skjærven wrote:
> Hello,
>
> When upgrading from 1.14 to 1.15 we bumped into a type issue when 
> attempting to sink to Cassandra (scala 2.12.13). This was working 
> nicely in 1.14. Any tip is highly appreciated.
>
> Using a MapFunction() to generate the stream of tuples:
>
> CassandraSink
>  .addSink(
> mystream.map(new ToTupleMapper)
>   )...
>
> Exception: No support for the type of the given DataStream: 
> GenericType<scala.Tuple2>
>
> Or with a lambda function:
>
> CassandraSink
>  .addSink(
>     mystream.map((v: MyCaseClass => (v.key v.someLongValue))
>   )...
>
> Caused by: 
> org.apache.flink.api.common.functions.InvalidTypesException: The 
> generic type parameters of 'Tuple2' are missing. In many cases lambda 
> methods don't provide enough information for automatic type extraction 
> when Java generics are involved. An easy workaround is to use an 
> (anonymous) class instead that implements the 
> 'org.apache.flink.api.common.functions.MapFunction' interface. 
> Otherwise the type has to be specified explicitly using type information.
>
>
>