You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@ignite.apache.org by Anil <an...@gmail.com> on 2016/10/24 06:44:53 UTC

Kafka Streamer

Hi,

I am playing with kafka streamer for my use case and noticed that message
has to value of the ignite cache.

getStreamer().addData(msg.key(), msg.message());

(
https://github.com/apache/ignite/blob/master/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/KafkaStreamer.java
)

i tried with stream receiver to covert incoming kafka message to number of
cache entries and did not help. Seems like stream receiver is not
pre-process of cache entry. Correct ?

To allow client to add its own way of processing, Kafka streamer must
provide a way to transform kafka message into cache entries. what do you
say ?

code would be like below (just giving a try with sudo code)

kafkaStreamer.registerTransformer(<some transformer>)

if (null !=transformer){
    getStreamer().addData(transformer.transform(msg.message()));
}else {
getStreamer().addData(msg.key(), msg.message());
}

Thanks for your help.

Re: Kafka Streamer

Posted by Anil <an...@gmail.com>.
Got you. thanks.

On 27 October 2016 at 09:30, vkulichenko <va...@gmail.com>
wrote:

> I changed the definition of the class (generic types, in particular).
> Compilation will be broken for those who already use KafkaStreamer.
>
> -Val
>
>
>
> --
> View this message in context: http://apache-ignite-users.
> 70518.x6.nabble.com/Kafka-Streamer-tp8432p8532.html
> Sent from the Apache Ignite Users mailing list archive at Nabble.com.
>

Re: Kafka Streamer

Posted by vkulichenko <va...@gmail.com>.
I changed the definition of the class (generic types, in particular).
Compilation will be broken for those who already use KafkaStreamer.

-Val



--
View this message in context: http://apache-ignite-users.70518.x6.nabble.com/Kafka-Streamer-tp8432p8532.html
Sent from the Apache Ignite Users mailing list archive at Nabble.com.

Re: Kafka Streamer

Posted by Anil <an...@gmail.com>.
Looks perfect. Could you please elaborate "this change breaks API
compatibility" ? Thanks

On 27 October 2016 at 01:42, vkulichenko <va...@gmail.com>
wrote:

> Hi,
>
> OK, I see now what you mean and it looks like this is not supported right
> now. I created a ticket [1], however the way I propose it will break API
> compatibility, so can be done only in 2.0. If you have any other
> suggestions, feel free to add comments.
>
> For now you can create your own consumer code using KafkaStreamer as a
> reference (it's pretty trivial, actually).
>
> [1] https://issues.apache.org/jira/browse/IGNITE-4140
>
> -Val
>
>
>
> --
> View this message in context: http://apache-ignite-users.
> 70518.x6.nabble.com/Kafka-Streamer-tp8432p8521.html
> Sent from the Apache Ignite Users mailing list archive at Nabble.com.
>

Re: Kafka Streamer

Posted by vkulichenko <va...@gmail.com>.
Hi,

OK, I see now what you mean and it looks like this is not supported right
now. I created a ticket [1], however the way I propose it will break API
compatibility, so can be done only in 2.0. If you have any other
suggestions, feel free to add comments.

For now you can create your own consumer code using KafkaStreamer as a
reference (it's pretty trivial, actually).

[1] https://issues.apache.org/jira/browse/IGNITE-4140

-Val



--
View this message in context: http://apache-ignite-users.70518.x6.nabble.com/Kafka-Streamer-tp8432p8521.html
Sent from the Apache Ignite Users mailing list archive at Nabble.com.

Re: Kafka Streamer

Posted by Anil <an...@gmail.com>.
HI Val,

in my case, kafka message key and value (actual message) are strings. I
used decoders for key and value as StringDecoder. but value is not the
value of cache and key is to maintain the order of the messages in kafka,
it is not actual key of the cache. Message is json object which is
transformed into number of cache entries.

I have created custom kafka data streamer with custom multiple tuple
extractor implementation and it looks good and working.

Thanks.

On 25 October 2016 at 23:56, vkulichenko <va...@gmail.com>
wrote:

> Anil,
>
> Decoders convert binary message from Kafka to a key-value pair. Streamer
> then redirects this pair to cache. Why doesn't this work for you?
>
> -Val
>
>
>
> --
> View this message in context: http://apache-ignite-users.
> 70518.x6.nabble.com/Kafka-Streamer-tp8432p8481.html
> Sent from the Apache Ignite Users mailing list archive at Nabble.com.
>

Re: Kafka Streamer

Posted by vkulichenko <va...@gmail.com>.
Anil,

Decoders convert binary message from Kafka to a key-value pair. Streamer
then redirects this pair to cache. Why doesn't this work for you?

-Val



--
View this message in context: http://apache-ignite-users.70518.x6.nabble.com/Kafka-Streamer-tp8432p8481.html
Sent from the Apache Ignite Users mailing list archive at Nabble.com.

Re: Kafka Streamer

Posted by Anil <an...@gmail.com>.
Should we use Tuple Extractor ? i still need to look at code and give a try.

On 25 October 2016 at 09:18, Anil <an...@gmail.com> wrote:

> No Val. A message cannot be converted into number of cache entries using
> value decoder. am i wrong ?
>
> Thanks.
>
> On 25 October 2016 at 02:42, vkulichenko <va...@gmail.com>
> wrote:
>
>> Hi,
>>
>> There are keyDecoder and valueDecoder that you can specify when creating
>> the
>> KafkaStreamer. Is that what you're looking for?
>>
>> -Val
>>
>>
>>
>> --
>> View this message in context: http://apache-ignite-users.705
>> 18.x6.nabble.com/Kafka-Streamer-tp8432p8447.html
>> Sent from the Apache Ignite Users mailing list archive at Nabble.com.
>>
>
>

Re: Kafka Streamer

Posted by Anil <an...@gmail.com>.
No Val. A message cannot be converted into number of cache entries using
value decoder. am i wrong ?

Thanks.

On 25 October 2016 at 02:42, vkulichenko <va...@gmail.com>
wrote:

> Hi,
>
> There are keyDecoder and valueDecoder that you can specify when creating
> the
> KafkaStreamer. Is that what you're looking for?
>
> -Val
>
>
>
> --
> View this message in context: http://apache-ignite-users.
> 70518.x6.nabble.com/Kafka-Streamer-tp8432p8447.html
> Sent from the Apache Ignite Users mailing list archive at Nabble.com.
>

Re: Kafka Streamer

Posted by vkulichenko <va...@gmail.com>.
Hi,

There are keyDecoder and valueDecoder that you can specify when creating the
KafkaStreamer. Is that what you're looking for?

-Val



--
View this message in context: http://apache-ignite-users.70518.x6.nabble.com/Kafka-Streamer-tp8432p8447.html
Sent from the Apache Ignite Users mailing list archive at Nabble.com.