Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2019/12/27 07:09:05 UTC

[GitHub] [spark] spektom opened a new pull request #27022: [SPARK-28415][DSTREAMS] Add messageHandler to Kafka 10 direct stream API #25205

spektom opened a new pull request #27022: [SPARK-28415][DSTREAMS] Add messageHandler to Kafka 10 direct stream API #25205
URL: https://github.com/apache/spark/pull/27022
 
 
   ## What changes were proposed in this pull request?
   
   This patch introduces a messageHandler parameter that can be passed to the Kafka DStream, allowing events received from Kafka to be processed at an early stage.
   
   The lack of a messageHandler parameter in KafkaUtils.createDirectStream(...) in the new Kafka 10 API is what prevents us from upgrading our processes to use it, and here's why:
   
   1. messageHandler() allowed parsing / filtering / projecting huge JSON files at an early stage (only a small subset of JSON fields is required by a process). Without it, our current cluster configuration can't keep up with the traffic.
   2. Transforming Kafka events right after a stream is created prevents using the HasOffsetRanges interface later. This means that the whole message must be propagated to the end of the pipeline, which is very inefficient.
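
As a rough illustration of point 1, a message handler is a per-record function applied while records are being read. The sketch below is a plain-Python model, not the Spark or Kafka API; `Record`, `project_fields`, `handled`, and the field names are hypothetical and exist only for this example.

```python
import json
from typing import Any, Dict, Iterable, Iterator, NamedTuple, Tuple


class Record(NamedTuple):
    """Hypothetical stand-in for a Kafka record: an offset plus a raw JSON payload."""
    offset: int
    value: str


def project_fields(record: Record, wanted: Tuple[str, ...]) -> Dict[str, Any]:
    """Hypothetical message handler: keep only the wanted JSON fields, plus the offset."""
    doc = json.loads(record.value)
    slim = {key: doc[key] for key in wanted if key in doc}
    slim["offset"] = record.offset
    return slim


def handled(records: Iterable[Record], wanted: Tuple[str, ...]) -> Iterator[Dict[str, Any]]:
    """Apply the handler lazily, one record at a time."""
    for record in records:
        yield project_fields(record, wanted)


# Two large documents of which only two fields are needed downstream.
raw = [
    Record(0, json.dumps({"user": "a", "event": "click", "payload": "x" * 1000})),
    Record(1, json.dumps({"user": "b", "event": "view", "payload": "y" * 1000})),
]
slim_records = list(handled(raw, ("user", "event")))
```

Because the handler keeps the offset alongside the projected fields, the slimmed record still carries the information that point 2 is concerned about losing.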
   
   ## How was this patch tested?
   
   Unit tests are provided.
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins commented on issue #27022: [SPARK-28415][DSTREAMS] Add messageHandler to Kafka 10 direct stream API #25205

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on issue #27022: [SPARK-28415][DSTREAMS] Add messageHandler to Kafka 10 direct stream API #25205
URL: https://github.com/apache/spark/pull/27022#issuecomment-569208787
 
 
   Can one of the admins verify this patch?



[GitHub] [spark] spektom closed pull request #27022: [SPARK-28415][DSTREAMS] Add messageHandler to Kafka 10 direct stream API #25205

Posted by GitBox <gi...@apache.org>.
spektom closed pull request #27022: [SPARK-28415][DSTREAMS] Add messageHandler to Kafka 10 direct stream API #25205
URL: https://github.com/apache/spark/pull/27022
 
 
   



[GitHub] [spark] spektom commented on issue #27022: [SPARK-28415][DSTREAMS] Add messageHandler to Kafka 10 direct stream API #25205

Posted by GitBox <gi...@apache.org>.
spektom commented on issue #27022: [SPARK-28415][DSTREAMS] Add messageHandler to Kafka 10 direct stream API #25205
URL: https://github.com/apache/spark/pull/27022#issuecomment-570334765
 
 
   @koeninger Let me explain (probably my original description is not clear enough).
   
   Let's say there are Kafka topics with huge JSON documents, and my Spark streaming job only operates on several JSON fields. What I'd like to do is strip down the original message at an early stage, and this is what the preliminary message handler allows me to do. Now, I could strip the JSON content down as the first step when I get the stream's RDD, but this would prevent me from getting the Kafka offsets (because offset retrieval must happen as the first operation on the RDD).
   
   I've seen environments where Spark streaming applications simply wouldn't work because of tremendous memory consumption when operating on big JSON documents, and the message handler was the remedy. Therefore, I think the removal of this feature in the new API is a kind of regression for some workloads.
   
   Does this make sense?
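
The ordering constraint described above can be modelled in a few lines. This is a plain-Python sketch under stated assumptions, not Spark code: `KafkaBatch`, `PlainBatch`, and `OffsetRange` are hypothetical stand-ins for the Kafka-backed RDD, a transformed RDD, and its offset ranges. It only shows why offsets have to be read before the first transformation.

```python
from typing import Callable, List, NamedTuple


class OffsetRange(NamedTuple):
    """Hypothetical offset range for one topic partition."""
    topic: str
    partition: int
    from_offset: int
    until_offset: int


class PlainBatch:
    """Hypothetical transformed batch: it no longer knows any offsets."""

    def __init__(self, records: List[str]) -> None:
        self.records = records


class KafkaBatch:
    """Hypothetical Kafka-backed batch that still knows its offset ranges."""

    def __init__(self, records: List[str], offset_ranges: List[OffsetRange]) -> None:
        self.records = records
        self.offset_ranges = offset_ranges

    def map(self, fn: Callable[[str], str]) -> PlainBatch:
        # The result is a generic batch; the offset ranges are not carried along.
        return PlainBatch([fn(r) for r in self.records])


batch = KafkaBatch(["a", "b"], [OffsetRange("events", 0, 10, 12)])
ranges = batch.offset_ranges   # read the offsets first ...
slim = batch.map(str.upper)    # ... then transform; `slim` has no offsets
```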



[GitHub] [spark] HeartSaVioR commented on issue #27022: [SPARK-28415][DSTREAMS] Add messageHandler to Kafka 10 direct stream API #25205

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on issue #27022: [SPARK-28415][DSTREAMS] Add messageHandler to Kafka 10 direct stream API #25205
URL: https://github.com/apache/spark/pull/27022#issuecomment-570369499
 
 
   That is the same as my understanding, actually. Maybe there's some difference between the old and new Kafka APIs, but I'm not familiar with the old API, so I'm not sure. At least from skimming the KafkaRDD code in the kafka-08 module, it also seems to store the original data (a batch of records) that Kafka provides in memory, and to apply messageHandler just before handing out each record; so messageHandler may not help save memory there either.
   
   @spektom 
   Could you craft example projects (the smaller the better) for both kafka 08 and 010, and compare memory usage?
   
   And if I understand correctly, if you really want to achieve the goal, the patch doesn't seem to be sufficient; you may want to go through KafkaDataConsumer and apply the transformation in `buffer`, which would then store the transformed data in memory and serve it. But I don't think that is easy to do safely, as KafkaDataConsumer is cached and served for the same topicpartition. (If you have to apply different messageHandlers to the same topicpartition, it'll be messed up.)
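
The caching hazard mentioned in the last paragraph can be sketched as follows. This is a plain-Python model of the concern, not the actual KafkaDataConsumer code: `CachedConsumer`, `acquire`, and the cache layout are assumptions made for illustration.

```python
from typing import Callable, Dict, List, Tuple

TopicPartition = Tuple[str, int]


class CachedConsumer:
    """Hypothetical consumer whose buffer holds already-transformed records."""

    def __init__(self, handler: Callable[[str], str]) -> None:
        self.handler = handler
        self.buffer: List[str] = []

    def poll(self, raw: List[str]) -> List[str]:
        # Transform on the way into the buffer, so only slim data is kept.
        self.buffer = [self.handler(r) for r in raw]
        return self.buffer


_cache: Dict[TopicPartition, CachedConsumer] = {}


def acquire(tp: TopicPartition, handler: Callable[[str], str]) -> CachedConsumer:
    """Return the cached consumer for tp, creating it on first use.

    The cache key ignores the handler, which is exactly the hazard.
    """
    if tp not in _cache:
        _cache[tp] = CachedConsumer(handler)
    return _cache[tp]


first = acquire(("events", 0), str.upper)
second = acquire(("events", 0), str.lower)  # asks for a different handler
result = second.poll(["Abc"])               # but gets the first handler's output
```

In this model the second caller silently receives data transformed by the first caller's handler, which is the "messed up" outcome described above.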



[GitHub] [spark] AmplabJenkins removed a comment on issue #27022: [SPARK-28415][DSTREAMS] Add messageHandler to Kafka 10 direct stream API #25205

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on issue #27022: [SPARK-28415][DSTREAMS] Add messageHandler to Kafka 10 direct stream API #25205
URL: https://github.com/apache/spark/pull/27022#issuecomment-569208787
 
 
   Can one of the admins verify this patch?



[GitHub] [spark] HeartSaVioR commented on issue #27022: [SPARK-28415][DSTREAMS] Add messageHandler to Kafka 10 direct stream API #25205

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on issue #27022: [SPARK-28415][DSTREAMS] Add messageHandler to Kafka 10 direct stream API #25205
URL: https://github.com/apache/spark/pull/27022#issuecomment-570105468
 
 
   Looks like we dropped messageHandler while migrating to Kafka 0.10 (messageHandler was added to the kafka 0.8 module before the migration). I'm not sure whether it was intentional, as there's unfortunately no comment regarding this and the change is a couple of years old.
   
   cc. @koeninger @tdas 



[GitHub] [spark] spektom edited a comment on issue #27022: [SPARK-28415][DSTREAMS] Add messageHandler to Kafka 10 direct stream API #25205

Posted by GitBox <gi...@apache.org>.
spektom edited a comment on issue #27022: [SPARK-28415][DSTREAMS] Add messageHandler to Kafka 10 direct stream API #25205
URL: https://github.com/apache/spark/pull/27022#issuecomment-570334765
 
 
   @koeninger Let me explain (probably my original description is not clear enough).
   
   Let's say there are Kafka topics with huge JSON documents, and my Spark streaming job only operates on several JSON fields. What I'd like to do is strip down the original message at an early stage, and this is what the preliminary message handler allows me to do. Now, I could strip the JSON content down as the first step when I get the stream's RDD, but this would prevent me from retrieving the Kafka offsets from the RDD (because offset retrieval must happen as the first operation on the RDD).
   
   I've seen environments where Spark streaming applications simply wouldn't work because of tremendous memory consumption when operating on big JSON documents, and the message handler was the remedy. Therefore, I think the removal of this feature in the new API is a kind of regression for some workloads.
   
   Does this make sense?



[GitHub] [spark] spektom commented on issue #27022: [SPARK-28415][DSTREAMS] Add messageHandler to Kafka 10 direct stream API #25205

Posted by GitBox <gi...@apache.org>.
spektom commented on issue #27022: [SPARK-28415][DSTREAMS] Add messageHandler to Kafka 10 direct stream API #25205
URL: https://github.com/apache/spark/pull/27022#issuecomment-570344472
 
 
   > Why can't you strip the json content down in foreachPartition?
   
   Because it's too late: all three JSON documents are in Spark's memory already. This is what I'm trying to avoid.



[GitHub] [spark] spektom commented on issue #27022: [SPARK-28415][DSTREAMS] Add messageHandler to Kafka 10 direct stream API #25205

Posted by GitBox <gi...@apache.org>.
spektom commented on issue #27022: [SPARK-28415][DSTREAMS] Add messageHandler to Kafka 10 direct stream API #25205
URL: https://github.com/apache/spark/pull/27022#issuecomment-573351961
 
 
   Can't reproduce anymore - closing.



[GitHub] [spark] koeninger commented on issue #27022: [SPARK-28415][DSTREAMS] Add messageHandler to Kafka 10 direct stream API #25205

Posted by GitBox <gi...@apache.org>.
koeninger commented on issue #27022: [SPARK-28415][DSTREAMS] Add messageHandler to Kafka 10 direct stream API #25205
URL: https://github.com/apache/spark/pull/27022#issuecomment-570357155
 
 
   Do you have a minimal reproducible case showing the difference in memory usage?  
   
   My expectation would be that if the very first thing you were doing with the dstream was calling foreachRDD and then rdd.foreachPartition, that the memory usage would be comparable to what you are doing here.  It's an iterator backed by a Kafka consumer that has to have the whole ConsumerRecord in memory either way.  It's just a question of whether your message conversion is happening before or after next() returns from the iterator, right?  Or am I missing something?
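
The "before or after next() returns" point can be illustrated with a small model. This is a plain-Python sketch, not Spark or Kafka code: `kafka_iterator`, `with_handler`, and `in_partition` are hypothetical names, and the trace only records the order in which full records are fetched and converted.

```python
from typing import Callable, Iterator, List


def kafka_iterator(n: int, trace: List[str]) -> Iterator[str]:
    """Hypothetical consumer-backed iterator: each full record is materialized on demand."""
    for i in range(n):
        trace.append(f"fetch {i}")
        yield f"record-{i}"


def with_handler(n: int, handler: Callable[[str], str], trace: List[str]) -> Iterator[str]:
    """Conversion happens before next() returns (messageHandler style)."""
    for record in kafka_iterator(n, trace):
        trace.append(f"convert {record}")
        yield handler(record)


def in_partition(n: int, handler: Callable[[str], str], trace: List[str]) -> List[str]:
    """Conversion happens after next() returns (foreachPartition style)."""
    out = []
    for record in kafka_iterator(n, trace):
        trace.append(f"convert {record}")
        out.append(handler(record))
    return out


trace_handler: List[str] = []
trace_partition: List[str] = []
out_handler = list(with_handler(2, str.upper, trace_handler))
out_partition = in_partition(2, str.upper, trace_partition)
```

In this model both variants fetch one full record, convert it, and only then fetch the next, so the two traces are identical. Under that assumption the full record is in memory once per step either way.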



[GitHub] [spark] koeninger commented on issue #27022: [SPARK-28415][DSTREAMS] Add messageHandler to Kafka 10 direct stream API #25205

Posted by GitBox <gi...@apache.org>.
koeninger commented on issue #27022: [SPARK-28415][DSTREAMS] Add messageHandler to Kafka 10 direct stream API #25205
URL: https://github.com/apache/spark/pull/27022#issuecomment-570341366
 
 
   Why can't you strip the json content down in foreachPartition?



[GitHub] [spark] AmplabJenkins commented on issue #27022: [SPARK-28415][DSTREAMS] Add messageHandler to Kafka 10 direct stream API #25205

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on issue #27022: [SPARK-28415][DSTREAMS] Add messageHandler to Kafka 10 direct stream API #25205
URL: https://github.com/apache/spark/pull/27022#issuecomment-569209034
 
 
   Can one of the admins verify this patch?



[GitHub] [spark] koeninger commented on issue #27022: [SPARK-28415][DSTREAMS] Add messageHandler to Kafka 10 direct stream API #25205

Posted by GitBox <gi...@apache.org>.
koeninger commented on issue #27022: [SPARK-28415][DSTREAMS] Add messageHandler to Kafka 10 direct stream API #25205
URL: https://github.com/apache/spark/pull/27022#issuecomment-570330109
 
 
   @spektom  I'm confused as to what this gets you that foreachRDD followed by foreachPartition does not, e.g. http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html#obtaining-offsets
   
