Posted to issues@flink.apache.org by "Bill G (Jira)" <ji...@apache.org> on 2022/10/24 09:10:00 UTC

[jira] [Created] (FLINK-29738) Allow UDT codec registration for CassandraSinkBuilder

Bill G created FLINK-29738:
------------------------------

             Summary: Allow UDT codec registration for CassandraSinkBuilder
                 Key: FLINK-29738
                 URL: https://issues.apache.org/jira/browse/FLINK-29738
             Project: Flink
          Issue Type: New Feature
          Components: Connectors / Cassandra
    Affects Versions: 1.16.0
            Reporter: Bill G


When streaming POJO types, the codec is registered automatically. However, when streaming tuples containing a UDT, the Cassandra driver can't serialize the type because no codec has been registered for it.

Motivating Example: If we have a table containing a collection of UDTs, then the only way to append is through a tuple stream.
{code:sql}
create type link (
  title text,
  url text
);

create table users (
  id int primary key,
  links set<frozen<link>>
);
{code}
If we were to use a POJO stream, the field containing the collection would be overwritten with a new collection on each upsert. If we set the query in a tuple stream:
{code:java}
DataStream<Tuple2<Link, Integer>> linkStream = ...
CassandraSink.addSink(linkStream)
  .setQuery("update users set links = links + ? where id = ?")
  ...
  .build();
{code}
We will get a {{CodecNotFoundException}}. Using the DataStax Java driver outside of the Flink framework, it is easy to register a codec:
{code:java}
Session session = cluster.connect();
new MappingManager(session).udtCodec(Link.class);
{code}
However, this requires access to the {{Session}}, which {{ClusterBuilder}} does not expose in any way.

Potential solutions: expose {{Session}} or {{MappingManager}} in some way to the {{ClusterBuilder}} class, or create some method such as {{registerUDT}} on {{CassandraSinkBuilder}}.
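
To make the second option more concrete, here is a minimal, self-contained sketch of how a {{registerUDT}} method might behave. It does not use Flink or the DataStax driver: {{SinkBuilder}}, {{FakeSink}}, {{FakeSession}} and {{registerUdtCodec}} are hypothetical stand-ins invented for illustration, and only the {{registerUDT}} name comes from the proposal above. The sketch assumes the session is created only when the sink is opened, so the builder can merely record the UDT classes and has to defer codec registration until then.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class RegisterUdtSketch {

    /** Hypothetical stand-in for a driver session; it only records registered UDT classes. */
    static class FakeSession {
        final Set<Class<?>> registeredCodecs = new LinkedHashSet<>();

        void registerUdtCodec(Class<?> udtClass) {
            registeredCodecs.add(udtClass);
        }
    }

    /** Hypothetical stand-in for a sink whose session exists only after open(). */
    static class FakeSink {
        final String query;
        final List<Class<?>> udtClasses;
        FakeSession session; // null until open() is called

        FakeSink(String query, List<Class<?>> udtClasses) {
            this.query = query;
            this.udtClasses = udtClasses;
        }

        void open() {
            session = new FakeSession();
            // Deferred registration: codecs are registered once a session is available.
            for (Class<?> udtClass : udtClasses) {
                session.registerUdtCodec(udtClass);
            }
        }
    }

    /** Hypothetical builder exposing the proposed registerUDT method. */
    static class SinkBuilder {
        private String query;
        private final List<Class<?>> udtClasses = new ArrayList<>();

        SinkBuilder setQuery(String query) {
            this.query = query;
            return this;
        }

        SinkBuilder registerUDT(Class<?> udtClass) {
            udtClasses.add(udtClass); // only remembered here, no session exists yet
            return this;
        }

        FakeSink build() {
            return new FakeSink(query, new ArrayList<>(udtClasses));
        }
    }

    /** Plain class mirroring the link UDT from the example table. */
    static class Link {
        String title;
        String url;
    }

    public static void main(String[] args) {
        FakeSink sink = new SinkBuilder()
                .setQuery("update users set links = links + ? where id = ?")
                .registerUDT(Link.class)
                .build();
        sink.open();
        System.out.println(sink.session.registeredCodecs.contains(Link.class));
    }
}
```

In a real implementation the deferred step would presumably call something like {{new MappingManager(session).udtCodec(...)}} for each recorded class, but where exactly that hook belongs in the connector is an open design question for this issue.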


