Posted to dev@storm.apache.org by Priyank Shah <ps...@hortonworks.com> on 2017/05/10 00:47:14 UTC

Some questions on storm-kafka-client KafkaSpout

I was going through the new Kafka spout code and had a couple of questions.


1.       https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java#L98 Why do we need the instance variable at that line and the following 3 lines? Because of them we have Builder constructors with different parameters for the key and value deserializers, and we even have https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/SerializableDeserializer.java which I am not sure we really need. https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L555 already has a constructor that takes a Properties object or a Map, and if the key.deserializer and value.deserializer keys are set to FQCNs it will instantiate them itself at https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L642 . We also already have a setProp method on the builder to set the Kafka configs that get passed to the KafkaConsumer constructor (a sketch of this alternative follows this list). We could get rid of the SerializableDeserializer interface and the bunch of constructors and instance variables related to the key and value deserializers.

2.       We have a RecordTranslator interface that is used to declareOutputFields at https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java#L486 and then we have this special instanceof check at https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java#L333 (the second sketch after this list illustrates it). Why is the return type of apply a List<Object>? Should we change it to List<KafkaTuple>? That way we could get rid of the instanceof check and support multiple tuples emitted for one Kafka message.
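
To make the first point concrete, here is a minimal sketch of the
FQCN-based alternative, assuming the current Builder's setProp method
and its default String/String generic parameters (the broker address
and topic name are placeholders):

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;

// Let the KafkaConsumer constructor instantiate the deserializers from
// their FQCNs instead of handing instances to the Builder.
KafkaSpoutConfig<String, String> spoutConfig = KafkaSpoutConfig
    .builder("broker1:9092", "my-topic")
    .setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
             StringDeserializer.class.getName())
    .setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
             StringDeserializer.class.getName())
    .build();

And for the second point, a hypothetical translator showing what the
instanceof check currently enables: KafkaTuple extends Values, so it
fits the List<Object> return type, and the spout reads the stream name
off it. The stream names and the offset-based routing are invented for
illustration:

import java.util.Arrays;
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.storm.kafka.spout.KafkaTuple;
import org.apache.storm.kafka.spout.RecordTranslator;
import org.apache.storm.tuple.Fields;

public class OddEvenTranslator implements RecordTranslator<String, String> {

    // Route each record to one of two streams based on its offset.
    @Override
    public List<Object> apply(ConsumerRecord<String, String> record) {
        String stream = record.offset() % 2 == 0 ? "evens" : "odds";
        return new KafkaTuple(record.key(), record.value()).routedTo(stream);
    }

    @Override
    public Fields getFieldsFor(String stream) {
        return new Fields("key", "value");
    }

    @Override
    public List<String> streams() {
        return Arrays.asList("evens", "odds");
    }
}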


Fixes for the above two might not be backward compatible, but if everyone is okay with these changes then I can create a patch.

Re: Some questions on storm-kafka-client KafkaSpout

Posted by Stig Døssing <ge...@gmail.com>.
Hi Priyank,

For question 1 I can think of a couple of reasons (not sure how important
they are though): Using FQCNs makes it impossible to check the generic type
of the deserializer, so you'd be able to pass the wrong type of
deserializer to the spout (e.g. a spout that otherwise expects String but
is passed an Integer deserializer). Passing deserializer instances instead
of using FQCNs also makes it possible to set up some configuration for the
deserializer (e.g. call a constructor with a parameter). I would assume
that's why the KafkaConsumer API also supports passing instances instead of
FQCNs. We have SerializableDeserializer as a bit of a service to the user:
Kafka's Deserializer isn't inherently serializable, and if you configure
your spout to use a deserializer that isn't serializable, the topology
submission will fail when Nimbus tries to serialize the spout.
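
As a concrete (hypothetical) example of that service, a custom
deserializer only needs to implement SerializableDeserializer rather
than Kafka's Deserializer directly to survive topology submission; the
class name and the trimming behavior are invented for illustration:

import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.apache.storm.kafka.spout.SerializableDeserializer;

// SerializableDeserializer extends Kafka's Deserializer as well as
// java.io.Serializable, so an instance of this class can sit inside
// the spout when Nimbus serializes the topology.
public class TrimmingStringDeserializer implements SerializableDeserializer<String> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // No configuration needed for this sketch.
    }

    @Override
    public String deserialize(String topic, byte[] data) {
        return data == null ? null : new String(data, StandardCharsets.UTF_8).trim();
    }

    @Override
    public void close() {
        // Nothing to clean up.
    }
}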

For question 2, I think we didn't want to require people to subclass
KafkaTuple. If they aren't emitting to multiple streams, it's unnecessary
for the tuple to subclass KafkaTuple. I'm almost certain we don't want to
change it to List<KafkaTuple>. Splitting a Kafka message into multiple
tuples can already be done by adding a splitter bolt that does that after
the KafkaSpout in the topology. I don't really see a good reason for
putting this functionality in the spout, especially since it complicates
ack/commit management a bit more if we have to keep track of multiple
tuples per Kafka message. Is there a reason you'd like the message split in
the spout, rather than in a downstream bolt?
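
For illustration, a splitter bolt along these lines would do it. This
is a sketch that assumes the spout emits the default translator's
"value" field and that the payload is comma-separated; both are
assumptions made up for the example:

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

// One Kafka message in, many tuples out. BaseBasicBolt anchors each
// emit to the input tuple and acks the input after execute returns,
// so the spout still sees a single ack per Kafka message.
public class SplitterBolt extends BaseBasicBolt {

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        for (String part : input.getStringByField("value").split(",")) {
            collector.emit(new Values(part));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("part"));
    }
}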
