Posted to users@streampipes.apache.org by Yudistira Hanifmuti <yu...@val.id> on 2021/02/10 05:28:37 UTC

How to use connect from Apache Kafka

Hello everyone,



I have a problem using the data stream connector for Kafka.



I have a topic with multiple partitions in my Kafka broker. This topic
contains IoT sensor readings from devices, and I have to use the record
key to maintain per-device ordering.
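
(For context, this is roughly how the keyed records are produced; the class,
topic, and device names below are illustrative, not our actual code:)

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class KeyedSensorProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Records with the same key (the device ID) go to the same
            // partition, so readings from one device stay ordered.
            producer.send(new ProducerRecord<>("sensor-readings", "device-42",
                    "{\"temperature\": 21.5}"));
        }
    }
}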



The problem is that when I try to add a data stream in StreamPipes (version
0.67.0) from my broker, a SerializationException is thrown during schema
reading:



"org.apache.kafka.common.errors.SerializationException: Error deserializing
key/value for partition ... "



Then I made a copy of this topic, this time without the key. That worked
smoothly, and the data could be read in the pipeline editor.



So, how are we supposed to use this stream connector? Can we use the
record key from the Kafka broker? And if not, what is the alternative?



Greetings,

Yudistira

Re: How to use connect from Apache Kafka

Posted by wiener <wi...@apache.org>.
Hi,

Currently, I’d suggest you create an issue for that on our Jira [1] so we can track it.

This should be a minor issue that can be resolved quite quickly by simply changing
the key deserializer to the String one in the class mentioned in my previous mail.
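
A minimal sketch of the change I have in mind (just swapping the key
deserializer in createConsumer; the method's type parameter would then
become Consumer<String, String> rather than Consumer<Long, String>):

// Proposed change in KafkaProtocol.createConsumer (a sketch, not a final patch):
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());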

Also, we would really appreciate it if you could provide a pull request to resolve the issue,
especially if you need it resolved quickly.

Greetings,
Patrick


[1] https://issues.apache.org/jira/projects/STREAMPIPES/issues/STREAMPIPES-284?filter=allopenissues


Re: How to use connect from Apache Kafka

Posted by Yudistira Hanifmuti <yu...@val.id>.
Hi Patrick,

Thank you for your reply and explanation. I can see the problem clearly.

That's right, we use the device ID as the partition key, and it is a String
rather than a Long. That's why the serialization error occurs.

So, what is your plan for this issue?
I'm building a proof of concept using StreamPipes. For now, I can work
around the problem by removing the key, but I hope the issue can be resolved.
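
(Concretely, the workaround is just publishing the records without a key,
reusing the producer setup from the sketch in my first mail; the topic name
is made up:)

// Workaround sketch: no record key, so the adapter never has to deserialize one.
// Downside: records are spread across partitions, so per-device ordering is lost.
producer.send(new ProducerRecord<>("sensor-readings-nokey", "{\"temperature\": 21.5}"));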

Greetings,
Yudistira


Re: How to use connect from Apache Kafka

Posted by wiener <wi...@apache.org>.
Hi Yudistira,

thanks for sharing your problem with us.

Unfortunately, we currently use a Long deserializer as part of the Kafka adapter creation process [1].

See the following method:

private static Consumer<Long, String> createConsumer(String broker, String topic) {
    ...
    // The key deserializer is hard-coded to Long, so any non-Long key will fail:
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    ...
}

That would explain the error: your original topic uses a partition key that can’t be deserialized into a Long.
I assume you use the device ID as the partition key? What does your key look like?
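
To illustrate (a standalone sketch, not code from the adapter): Kafka's
LongDeserializer requires exactly 8 bytes, so a typical String key fails
immediately with the exception you saw.

import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class KeyDeserializerDemo {
    public static void main(String[] args) {
        byte[] key = "device-42".getBytes(); // 9 bytes; a hypothetical device ID

        // Fine: any byte sequence is a valid String.
        System.out.println(new StringDeserializer().deserialize("topic", key));

        // Throws org.apache.kafka.common.errors.SerializationException:
        // "Size of data received by LongDeserializer is not 8"
        new LongDeserializer().deserialize("topic", key);
    }
}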

In general, I guess this is an issue that we should address.

Greetings,
Patrick


[1] https://github.com/apache/incubator-streampipes-extensions/blob/rel/0.67.0/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/protocol/stream/KafkaProtocol.java
