Posted to issues@spark.apache.org by "Mario Briggs (JIRA)" <ji...@apache.org> on 2015/12/28 20:02:49 UTC

[jira] [Comment Edited] (SPARK-12177) Update KafkaDStreams to new Kafka 0.9 Consumer API

    [ https://issues.apache.org/jira/browse/SPARK-12177?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15073020#comment-15073020 ] 

Mario Briggs edited comment on SPARK-12177 at 12/28/15 7:02 PM:
----------------------------------------------------------------

  Hi Nikita,
 thanks. Here are my review comments. I couldn't find a way to add them on the PR review in GitHub, so I have added them here. My comments are a little detailed, since I too developed an implementation and tested it along with my colleague Praveen :-) after the initial discussion on the dev list.

A - KafkaCluster class
     getPartitions()
     seek()
        callers of the above methods
        all other methods that use withConsumer()

 These should not return an 'Either', but rather just the expected object (the 'Right'). The 'Either' in the earlier code was needed because the old Kafka client had to try the operation against all the 'seedBrokers' and handle the case where some of them were down. Similarly, when dealing with 'leaders', the client had to try the operation on every leader for a TP (TopicAndPartition). With the new kafka-clients API we don't have to try against all the seedBrokers, leaders etc., since the new KafkaConsumer object handles all those details internally.
Notice that in the earlier code, withBrokers() tries to connect() and invoke the passed-in function multiple times via brokers.foreach(), hence the need to accumulate errors; it also returns immediately once one of the brokers succeeds. Neither applies with the new KafkaConsumer object - see the sketch just below.
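
Purely to illustrate the simpler shape this allows (the names withConsumer/kafkaParams are assumptions mirroring the existing helper, not the PR's actual code):

{code}
import java.{util => ju}
import org.apache.kafka.clients.consumer.KafkaConsumer

// Sketch only: with the new consumer there is no per-broker retry loop to run,
// so the helper can hand the result back directly (or let the exception
// propagate) instead of folding connection errors into an Either.
def withConsumer[T](kafkaParams: ju.Map[String, Object])
                   (fn: KafkaConsumer[Array[Byte], Array[Byte]] => T): T = {
  val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](kafkaParams)
  try {
    fn(consumer)
  } finally {
    consumer.close()
  }
}
{code}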

getPartitions() - https://github.com/nikit-os/spark/blob/kafka-09-consumer-api/external/kafka-v09/src/main/scala/org/apache/spark/streaming/kafka/v09/KafkaCluster.scala#L62
  The consumer.partitionsFor() Java API returns null if the topic doesn't exist. If you don't handle that, you run into an NPE when the user specifies a topic that doesn't exist or makes a typo in the topic name (and silently swallowing it instead of raising an exception saying the partition doesn't exist would not be right either).
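
For illustration only (not your code or ours; the new TopicPartition is used here just for the sketch - see point D below about serializability), getPartitions() could return the set directly and fail fast on an unknown topic:

{code}
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import org.apache.spark.SparkException

// Sketch only: return the partitions directly (no Either) and fail fast when
// partitionsFor() returns null for a topic that does not exist.
def getPartitions(consumer: KafkaConsumer[_, _], topics: Set[String]): Set[TopicPartition] = {
  topics.flatMap { topic =>
    val infos = consumer.partitionsFor(topic)
    if (infos == null) {
      // partitionsFor() returns null (not an empty list) for a non-existent topic
      throw new SparkException(s"Topic $topic does not exist")
    }
    infos.asScala.map(pi => new TopicPartition(pi.topic(), pi.partition()))
  }
}
{code}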

Our implementation is at https://github.com/mariobriggs/spark/blob/kafka0.9-streaming/external/kafka-newconsumer/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala. If it is easier for you, we can issue a PR to your repo - just let us know.

B - KafkaRDD class
   getPreferredLocations()
  This method is missing in your code. The earlier implementation from Cody had the optimisation that if Kafka and the Spark code (KafkaRDD) were running on the same cluster, the RDD partition for a particular TopicPartition would be scheduled local to that TopicPartition's leader. Could you please add code to bring back this functionality? Roughly, it looked like the sketch below.
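
A minimal sketch of that override inside KafkaRDD, assuming the RDD partition object still carries the leader's host as it did in the earlier code:

{code}
// Sketch only: prefer scheduling each RDD partition on its TopicPartition leader's host.
override def getPreferredLocations(thePart: Partition): Seq[String] = {
  val part = thePart.asInstanceOf[KafkaRDDPartition]
  // an empty/unknown host simply means "no locality preference"
  if (part.host == null || part.host.isEmpty) Seq.empty else Seq(part.host)
}
{code}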

 Our implementation pulled this info inside getPartitions() - https://github.com/mariobriggs/spark/blob/kafka0.9-streaming/external/kafka-newconsumer/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala#L52. It would probably be more efficient to do it inside compute() of the DStream, but that meant exposing 'leaders' on the KafkaRDD constructor, as discussed here - https://mail-archives.apache.org/mod_mbox/spark-dev/201512.mbox/%3CCAKWX9VV34Dto9irT3ZZfH78EoXO3bV3VHN8YYvTxfnyvGcRsQw%40mail.gmail.com%3E

C - KafkaRDDIterator class

 getNext()
  As mentioned in issue #1 noted here - https://github.com/mariobriggs/spark/blob/kafka0.9-streaming/external/kafka-newconsumer/README.md#issues-noted-with-kafka-090-consumer-api - KafkaConsumer.poll(x) is not guaranteed to return data when x is a small value. We are following up on this with Kafka - https://issues.apache.org/jira/browse/KAFKA-3044. I see that you have made this a configurable value in your implementation, which is good, but either way, until this behaviour is clarified (and even otherwise), we need this assert - https://github.com/mariobriggs/spark/blob/kafka0.9-streaming/external/kafka-newconsumer/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala#L171 - or else we will silently skip data without the user knowing it (whether with the default value or a smaller user-specified one). Something along the lines of the sketch below.
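
A sketch of the idea (illustrative names only; the real iterator's fetch loop is more involved):

{code}
import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer}
import org.apache.kafka.common.TopicPartition

// Sketch only: seek + poll, asserting that a non-empty batch came back so that a
// short poll timeout cannot silently skip part of the requested offset range.
def fetchBatch[K, V](
    consumer: KafkaConsumer[K, V],
    tp: TopicPartition,
    requestOffset: Long,
    pollTimeMs: Long): java.util.Iterator[ConsumerRecord[K, V]] = {
  consumer.seek(tp, requestOffset)
  val records = consumer.poll(pollTimeMs).records(tp)
  assert(!records.isEmpty,
    s"Polling $tp at offset $requestOffset for $pollTimeMs ms returned no records; " +
      "failing rather than silently skipping data (see KAFKA-3044)")
  records.iterator()
}
{code}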

D - Non-use of the new Consumer's TopicPartition class

   You have already figured out that this class is not Serializable, and hence in the public interface you have used the older TopicAndPartition class. We have raised this issue with Kafka - https://issues.apache.org/jira/browse/KAFKA-3029 - and may be provided with a serializable one (yet to see). However, using the older TopicAndPartition class in our public API introduces a dependency on the older kafka core jar rather than just the kafka-clients jar, which I would think is not the preferred approach. If we are not provided with a serializable TopicPartition, then we should rather use our own serializable object (or just a tuple of String, Int, Long) inside DirectKafkaInputDStream to ensure it is Serializable - something like the sketch below.
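
For example (the class name is made up, just to show the shape):

{code}
import org.apache.kafka.common.TopicPartition

// Sketch only: a small serializable (topic, partition) holder used internally,
// converted to the new consumer's TopicPartition only at the point of use.
// (Scala case classes are Serializable by default.)
case class SerializableTopicPartition(topic: String, partition: Int) {
  def toTopicPartition: TopicPartition = new TopicPartition(topic, partition)
}
{code}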




> Update KafkaDStreams to new Kafka 0.9 Consumer API
> --------------------------------------------------
>
>                 Key: SPARK-12177
>                 URL: https://issues.apache.org/jira/browse/SPARK-12177
>             Project: Spark
>          Issue Type: Improvement
>          Components: Streaming
>    Affects Versions: 1.6.0
>            Reporter: Nikita Tarasenko
>              Labels: consumer, kafka
>
> Kafka 0.9 has already been released and it introduces a new consumer API that is not compatible with the old one. So, I added the new consumer API. I made separate classes in the package org.apache.spark.streaming.kafka.v09 with the changed API. I didn't remove the old classes, for backward compatibility - users will not need to change their old Spark applications when they upgrade to the new Spark version.
> Please review my changes.


