Posted to dev@flink.apache.org by "Tzu-Li (Gordon) Tai" <tz...@gmail.com> on 2016/01/19 11:27:56 UTC

What is the topic & offset used for in KeyedDeserializationSchema.deserialize()?

Hi devs,

I need a little help clarifying what the arguments "topic" and "offset"
are used for in KeyedDeserializationSchema.deserialize(). The main issue
is that I'm currently implementing the Flink Kinesis Consumer, and Kinesis
offsets, unlike Kafka offsets, which increment starting from 0, are
numbers that can only be stored in BigIntegers and generally don't
increment by 1 between data records.
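
To make the difference concrete, here is a minimal sketch in Java. The
56-digit string is an arbitrary placeholder, not an offset taken from a real
stream, and the class and method names are hypothetical.

```java
import java.math.BigInteger;

class KinesisOffsetSketch {

    // Arbitrary placeholder value, far too large for a 64-bit long.
    static final String SAMPLE_OFFSET =
            "12345678901234567890123456789012345678901234567890123456";

    // Returns true if the digit string fits into a Java long (as Kafka offsets do).
    static boolean fitsInLong(String digits) {
        try {
            Long.parseLong(digits);
            return true;
        } catch (NumberFormatException e) {
            return false;
        }
    }

    // Orders two offsets numerically. A plain String comparison would be wrong
    // when the digit strings differ in length ("9" vs. "10").
    static int compareOffsets(String a, String b) {
        return new BigInteger(a).compareTo(new BigInteger(b));
    }

    public static void main(String[] args) {
        System.out.println("42 fits in long: " + fitsInLong("42"));
        System.out.println("sample fits in long: " + fitsInLong(SAMPLE_OFFSET));
        System.out.println("compare(9, 10): " + compareOffsets("9", "10"));
    }
}
```

Since such offsets do not advance by 1, code like this can only compare them;
it cannot derive the next offset by adding 1.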

I just need to make sure that I won't be messing things up with these two
values. A pointer to any part of the codebase where I can understand how Flink
uses "topic" and "offset" in the deserialization schema would be perfect.

Many thanks in advance!

Cheers,
Gordon



--
View this message in context: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/What-is-the-topic-offset-used-for-in-KeyedDeserializationSchema-deserialize-tp9911.html
Sent from the Apache Flink Mailing List archive. mailing list archive at Nabble.com.

Re: What is the topic & offset used for in KeyedDeserializationSchema.deserialize()?

Posted by "Tzu-Li (Gordon) Tai" <tz...@gmail.com>.
Hi Robert,

No problem, thanks for the notice!




Re: What is the topic & offset used for in KeyedDeserializationSchema.deserialize()?

Posted by Robert Metzger <rm...@apache.org>.
Hi Gordon,

I'll move the KeyedDeserializationSchema to the Kafka module.

On Wed, Jan 20, 2016 at 8:24 AM, Tzu-Li (Gordon) Tai <tz...@gmail.com>
wrote:

> Hi Stephan,
>
> A comment on this. For KeyedDeserializationSchema, I don't think it is
> necessary.
> As previously explained, the interfaces for the KeyedDeserializationSchema
> of Kafka / Kinesis can be quite different, and may also be specific to
> future external systems that we might implement connectors for. Wrapper
> classes for a common KeyedDeserializationSchema don't seem to make sense,
> since in the end we will still need to expose system-specific interfaces
> to the user.
>
> It may be reasonable to keep the simplest DeserializationSchema
> interfaces and wrappers in flink-streaming-java. By a simple
> KeyedDeserializationSchema, I mean deserialize() methods that only take
> the key as byte[] and the message as byte[]. If new connectors happen to
> require more specific interfaces, then they can create them in their own
> module (flink-connector-*).
>
> Cheers,
> Gordon
>
>
>
>

Re: What is the topic & offset used for in KeyedDeserializationSchema.deserialize()?

Posted by "Tzu-Li (Gordon) Tai" <tz...@gmail.com>.
Hi Stephan,

A comment on this. For KeyedDeserializationSchema, I don't think it is
necessary.
As previously explained, the interfaces for the KeyedDeserializationSchema
of Kafka / Kinesis can be quite different, and may also be specific to
future external systems that we might implement connectors for. Wrapper
classes for a common KeyedDeserializationSchema don't seem to make sense,
since in the end we will still need to expose system-specific interfaces to
the user.

It may be reasonable to keep the simplest DeserializationSchema
interfaces and wrappers in flink-streaming-java. By a simple
KeyedDeserializationSchema, I mean deserialize() methods that only take the
key as byte[] and the message as byte[]. If new connectors happen to require
more specific interfaces, then they can create them in their own module
(flink-connector-*).

Cheers,
Gordon




Re: What is the topic & offset used for in KeyedDeserializationSchema.deserialize()?

Posted by Stephan Ewen <se...@apache.org>.
While the deserialization schema is not used by the Flink internals, I
think the initial idea was to use it across different sources/sinks (like
Kafka, Socket, RabbitMQ, ...).

Does it make sense to have a KafkaDeSerializationSchema, and then wrap the
common serialization schemata?
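
A rough sketch of that wrapping idea, in Java. All names here are
hypothetical stand-ins and not the actual Flink interfaces; the Kafka-style
method shape (key, message, topic, long offset) is assumed from this thread.

```java
import java.nio.charset.StandardCharsets;

class SchemaWrapperSketch {

    // Stand-in for a common, connector-independent schema (value bytes only).
    interface SimpleSchema<T> {
        T deserialize(byte[] message);
    }

    // Stand-in for a Kafka-specific schema that also sees key, topic, and offset.
    interface KafkaSchema<T> {
        T deserialize(byte[] messageKey, byte[] message, String topic, long offset);
    }

    // Wrapper: lets any SimpleSchema be used where a KafkaSchema is expected
    // by dropping the Kafka-only arguments.
    static final class KafkaSchemaWrapper<T> implements KafkaSchema<T> {
        private final SimpleSchema<T> inner;

        KafkaSchemaWrapper(SimpleSchema<T> inner) {
            this.inner = inner;
        }

        @Override
        public T deserialize(byte[] messageKey, byte[] message, String topic, long offset) {
            return inner.deserialize(message);
        }
    }

    public static void main(String[] args) {
        SimpleSchema<String> utf8 = bytes -> new String(bytes, StandardCharsets.UTF_8);
        KafkaSchema<String> wrapped = new KafkaSchemaWrapper<>(utf8);
        byte[] payload = "abc".getBytes(StandardCharsets.UTF_8);
        System.out.println(wrapped.deserialize(null, payload, "some-topic", 0L));
    }
}
```

With this layout the common schemata stay connector-independent, and only
the wrapper lives next to the Kafka code.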

On Tue, Jan 19, 2016 at 12:25 PM, Robert Metzger <rm...@apache.org>
wrote:

> I'll relocate the KeyedDeserializationSchema as part of the Kafka 0.9.0.0
> support (it's a pending pull request I'll merge soon).
>
> On Tue, Jan 19, 2016 at 12:20 PM, Tzu-Li (Gordon) Tai <tz...@gmail.com>
> wrote:
>
> > Hi Robert,
> >
> > +1 for a change to where the KeyedDeserializationSchema is located. I was
> > just starting to wonder how I should name the Kinesis
> > deserializationSchema if I were to create another one in the same
> > package.
> >
> > For Kinesis, the API returns a String for the key, byte[] for the value,
> > a String for the streamName (similar to a Kafka topic), and a String for
> > the offset. So I would definitely need to create a new
> > deserializationSchema for this.
> >
> > For https://issues.apache.org/jira/browse/FLINK-3229, I'll create the
> > required new interfaces in Kinesis connector-specific packages. I'd be
> > happy to help with relocating the current KeyedDeserializationSchema
> > related interfaces and classes to a Kafka-specific package as a separate
> > issue, if you want me to =)
> >
> > Cheers,
> > Gordon
> >
> >
> >
>

Re: What is the topic & offset used for in KeyedDeserializationSchema.deserialize()?

Posted by Robert Metzger <rm...@apache.org>.
I'll relocate the KeyedDeserializationSchema as part of the Kafka 0.9.0.0
support (it's a pending pull request I'll merge soon).

On Tue, Jan 19, 2016 at 12:20 PM, Tzu-Li (Gordon) Tai <tz...@gmail.com>
wrote:

> Hi Robert,
>
> +1 for a change to where the KeyedDeserializationSchema is located. I was
> just starting to wonder how I should name the Kinesis
> deserializationSchema if I were to create another one in the same package.
>
> For Kinesis, the API returns a String for the key, byte[] for the value,
> a String for the streamName (similar to a Kafka topic), and a String for
> the offset. So I would definitely need to create a new
> deserializationSchema for this.
>
> For https://issues.apache.org/jira/browse/FLINK-3229, I'll create the
> required new interfaces in Kinesis connector-specific packages. I'd be
> happy to help with relocating the current KeyedDeserializationSchema
> related interfaces and classes to a Kafka-specific package as a separate
> issue, if you want me to =)
>
> Cheers,
> Gordon
>
>
>
>

Re: What is the topic & offset used for in KeyedDeserializationSchema.deserialize()?

Posted by "Tzu-Li (Gordon) Tai" <tz...@gmail.com>.
Hi Robert,

+1 for a change to where the KeyedDeserializationSchema is located. I was
just starting to wonder how I should name the Kinesis
deserializationSchema if I were to create another one in the same package.

For Kinesis, the API returns a String for the key, byte[] for the value, a
String for the streamName (similar to a Kafka topic), and a String for the
offset. So I would definitely need to create a new deserializationSchema
for this.

For https://issues.apache.org/jira/browse/FLINK-3229, I'll create the
required new interfaces in Kinesis connector-specific packages. I'd be happy
to help with relocating the current KeyedDeserializationSchema related
interfaces and classes to a Kafka-specific package as a separate issue, if
you want me to =)
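
A connector-specific interface along those lines might look roughly like the
sketch below. Only the four parameter types follow the description above; the
interface name, the method name, and the sample implementation are
hypothetical and not taken from any existing connector code.

```java
import java.nio.charset.StandardCharsets;

class KinesisSchemaSketch {

    // Hypothetical Kinesis-specific schema: String key, byte[] value,
    // String stream name, and the offset kept as a String.
    interface KinesisSchema<T> {
        T deserialize(String key, byte[] value, String streamName, String offset);
    }

    // Sample implementation that renders each record as one line of text.
    static final class DescribingSchema implements KinesisSchema<String> {
        @Override
        public String deserialize(String key, byte[] value, String streamName, String offset) {
            String payload = new String(value, StandardCharsets.UTF_8);
            return streamName + "/" + key + "@" + offset + "=" + payload;
        }
    }

    public static void main(String[] args) {
        KinesisSchema<String> schema = new DescribingSchema();
        byte[] value = "hello".getBytes(StandardCharsets.UTF_8);
        System.out.println(schema.deserialize("user-1", value, "clicks", "1234567890123456789012"));
    }
}
```

Keeping the offset as a String in the signature avoids forcing it into a
long, which is the mismatch with the Kafka-style signature.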

Cheers,
Gordon




Re: What is the topic & offset used for in KeyedDeserializationSchema.deserialize()?

Posted by Robert Metzger <rm...@apache.org>.
Hi Gordon,

Thank you for starting the discussion. I think the
KeyedDeserializationSchema is in fact located in the wrong package. Its
methods are very Kafka-specific; maybe I should move it to the Kafka module.

What would the deserializationSchema for Kinesis look like? Does the Kinesis
API return byte[]? Is there any other information that is useful for
users?


On Tue, Jan 19, 2016 at 11:49 AM, Tzu-Li (Gordon) Tai <tz...@gmail.com>
wrote:

> Hi Max,
>
> Thanks for the quick response and clarification :)
> I got a bit confused and thought that Flink internals would be accessing
> this interface too.
>
> Cheers,
> Gordon
>
>
>
>

Re: What is the topic & offset used for in KeyedDeserializationSchema.deserialize()?

Posted by "Tzu-Li (Gordon) Tai" <tz...@gmail.com>.
Hi Max,

Thanks for the quick response and clarification :)
I got a bit confused and thought that Flink internals would be accessing
this interface too.

Cheers,
Gordon




Re: What is the topic & offset used for in KeyedDeserializationSchema.deserialize()?

Posted by Maximilian Michels <mx...@apache.org>.
Hi Gordon,

You may use "topic" and "offset" for whatever you like. Note that this
is just an interface. If it does not work for your Kinesis adapter,
you may create a new interface. For existing usage of the
KeyedDeserializationSchema, please have a look at the
FlinkKafkaConsumer.
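
As a rough sketch of what "just an interface" means here: the connector
passes topic and offset to the user's deserialize() implementation, and that
implementation is free to use or ignore them. The interface below is a
simplified local stand-in whose method shape is assumed from this thread, not
a copy of the Flink class; TaggedRecord and TaggingSchema are hypothetical
names.

```java
import java.nio.charset.StandardCharsets;

class TopicOffsetSketch {

    // Simplified stand-in for a Kafka-style keyed schema (assumed shape).
    interface KeyedSchema<T> {
        T deserialize(byte[] messageKey, byte[] message, String topic, long offset);
    }

    // Hypothetical record type that keeps the source position next to the payload.
    static final class TaggedRecord {
        final String topic;
        final long offset;
        final String value;

        TaggedRecord(String topic, long offset, String value) {
            this.topic = topic;
            this.offset = offset;
            this.value = value;
        }
    }

    // This implementation chooses to keep topic and offset;
    // ignoring both arguments would be equally valid.
    static final class TaggingSchema implements KeyedSchema<TaggedRecord> {
        @Override
        public TaggedRecord deserialize(byte[] messageKey, byte[] message, String topic, long offset) {
            return new TaggedRecord(topic, offset, new String(message, StandardCharsets.UTF_8));
        }
    }

    public static void main(String[] args) {
        byte[] payload = "hello".getBytes(StandardCharsets.UTF_8);
        TaggedRecord r = new TaggingSchema().deserialize(null, payload, "events", 7L);
        System.out.println(r.topic + "@" + r.offset + ": " + r.value);
    }
}
```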

Cheers,
Max

On Tue, Jan 19, 2016 at 11:27 AM, Tzu-Li (Gordon) Tai
<tz...@gmail.com> wrote:
> Hi devs,
>
> I need a little help clarifying what the arguments "topic" and "offset"
> are used for in KeyedDeserializationSchema.deserialize(). The main issue
> is that I'm currently implementing the Flink Kinesis Consumer, and Kinesis
> offsets, unlike Kafka offsets, which increment starting from 0, are
> numbers that can only be stored in BigIntegers and generally don't
> increment by 1 between data records.
>
> I just need to make sure that I won't be messing things up with these two
> values. A pointer to any part of the codebase where I can understand how Flink
> uses "topic" and "offset" in the deserialization schema would be perfect.
>
> Many thanks in advance!
>
> Cheers,
> Gordon
>
>
>