Posted to issues@spark.apache.org by "Nicolas PHUNG (JIRA)" <ji...@apache.org> on 2015/06/01 21:33:18 UTC

[jira] [Commented] (SPARK-7122) KafkaUtils.createDirectStream - unreasonable processing time in absence of load

    [ https://issues.apache.org/jira/browse/SPARK-7122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14567857#comment-14567857 ] 

Nicolas PHUNG commented on SPARK-7122:
--------------------------------------

I have a similar issue regarding the relative performance of the two Kafka Spark Streaming integrations:

{code}
val messages = KafkaUtils.createStream[Object, Object, KafkaAvroDecoder, KafkaAvroDecoder](ssc, kafkaParams, topicMap, StorageLevel.MEMORY_AND_DISK)
{code}
vs
{code}
val messages = KafkaUtils.createDirectStream[Object, Object, KafkaAvroDecoder, KafkaAvroDecoder](ssc, kafkaParams, topicsSet)
{code}
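
For reference, here is a minimal sketch of the configuration both calls assume; the broker address, ZooKeeper quorum, topic name, consumer group, and schema-registry URL are illustrative placeholders, not values from the original job:

{code}
// Hypothetical configuration for illustration only; the actual values are not
// shown in the original comment.
val kafkaParams = Map[String, String](
  "metadata.broker.list" -> "localhost:9092",           // brokers, used by createDirectStream
  "zookeeper.connect"    -> "localhost:2181",           // ZooKeeper quorum, used by createStream
  "group.id"             -> "spark-streaming-consumer",
  "schema.registry.url"  -> "http://localhost:8081"     // needed by Confluent's KafkaAvroDecoder
)

// The receiver-based createStream takes a topic -> receiver-thread-count map,
// while createDirectStream takes a plain set of topic names.
val topicMap:  Map[String, Int] = Map("events" -> 1)
val topicsSet: Set[String]      = Set("events")
{code}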

KafkaUtils.createStream has lower throughput than KafkaUtils.createDirectStream when I'm reprocessing all the events in my Kafka topic (about 300k messages of roughly 3-4 KB each). But once the job has caught up with near-real-time events, KafkaUtils.createDirectStream gets slower and slower and ends up almost 30 minutes behind, whereas KafkaUtils.createStream manages to stay almost "real time" (meaning recent events get stored almost instantly, without delay). For both, I have a simple context, without windows, defined like this:

{code}
val ssc = new StreamingContext(sparkConf, Seconds(2))
{code}

I don't understand why KafkaUtils.createDirectStream falls so far behind while doing the same processing.
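
For completeness, a minimal end-to-end skeleton of the direct-stream variant above might look as follows; the KafkaAvroDecoder import (Confluent's implementation), the Kafka parameters, the topic name, and the placeholder action are assumptions, not taken from the original job:

{code}
import io.confluent.kafka.serializers.KafkaAvroDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

val sparkConf = new SparkConf().setAppName("KafkaDirectStreamTest")
val ssc = new StreamingContext(sparkConf, Seconds(2))   // 2-second batches, as in the comment

// Placeholder Kafka parameters; real broker and schema-registry addresses go here.
val kafkaParams = Map(
  "metadata.broker.list" -> "localhost:9092",
  "schema.registry.url"  -> "http://localhost:8081")
val topicsSet = Set("events")

val messages = KafkaUtils.createDirectStream[Object, Object, KafkaAvroDecoder, KafkaAvroDecoder](
  ssc, kafkaParams, topicsSet)

// Placeholder action; the original job's transformations are not shown in the comment.
messages.count().print()

ssc.start()
ssc.awaitTermination()
{code}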

> KafkaUtils.createDirectStream - unreasonable processing time in absence of load
> -------------------------------------------------------------------------------
>
>                 Key: SPARK-7122
>                 URL: https://issues.apache.org/jira/browse/SPARK-7122
>             Project: Spark
>          Issue Type: Question
>          Components: Streaming
>    Affects Versions: 1.3.1
>         Environment: Spark Streaming 1.3.1, standalone mode running on just 1 box: Ubuntu 14.04.2 LTS, 4 cores, 8GB RAM, java version "1.8.0_40"
>            Reporter: Platon Potapov
>            Priority: Minor
>         Attachments: 10.second.window.fast.job.txt, 5.second.window.slow.job.txt, SparkStreamingJob.scala
>
>
> Attached is the complete source code of a test Spark job. No external data generators are run; the mere presence of a Kafka topic named "raw" suffices.
> The Spark job is run with no load whatsoever, and http://localhost:4040/streaming is checked to obtain the job processing duration (a minimal reconstruction of the job is sketched after this description).
> * in case the test contains the following transformation:
> {code}
>     // dummy transformation
>     val temperature = bytes.filter(_._1 == "abc")
>     val abc = temperature.window(Seconds(40), Seconds(5))
>     abc.print()
> {code}
> the median processing time is 3 seconds 80 ms
> * in case the test contains the following transformation:
> {code}
>     // dummy transformation
>     val temperature = bytes.filter(_._1 == "abc")
>     val abc = temperature.map(x => (1, x))
>     abc.print()
> {code}
> the median processing time is just 50 ms
> Please explain: why does the "window" transformation introduce such a large increase in job duration?
> Note: the result is the same regardless of the number of Kafka topic partitions (I've tried 1 and 8).
> Note 2: the result is the same regardless of the window parameters (I've tried (20, 2) and (40, 5)).
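
The attached SparkStreamingJob.scala is not reproduced in this archive, so the following is only a minimal sketch of the test described above. The decoders, the broker address, and the 5-second batch interval are assumptions; the filter/window/map transformations are quoted verbatim from the description:

{code}
import kafka.serializer.{DefaultDecoder, StringDecoder}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

val sparkConf = new SparkConf().setAppName("SparkStreamingJob")
val ssc = new StreamingContext(sparkConf, Seconds(5))   // assumed batch interval

// Assumed broker address; only the topic name "raw" comes from the description.
val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")
val bytes = KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](
  ssc, kafkaParams, Set("raw"))

// Variant 1 (quoted above): windowed transformation, median processing time ~3 s 80 ms.
val temperature = bytes.filter(_._1 == "abc")
val abc = temperature.window(Seconds(40), Seconds(5))
abc.print()

// Variant 2 (quoted above): plain map, median processing time ~50 ms.
// Swap it in for the window variant to reproduce the fast case:
// val abc = temperature.map(x => (1, x))
// abc.print()

ssc.start()
ssc.awaitTermination()
{code}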



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org