Posted to issues@spark.apache.org by "Platon Potapov (JIRA)" <ji...@apache.org> on 2015/04/22 12:48:00 UTC

[jira] [Comment Edited] (SPARK-7053) KafkaUtils.createStream leaks resources

    [ https://issues.apache.org/jira/browse/SPARK-7053?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14506809#comment-14506809 ] 

Platon Potapov edited comment on SPARK-7053 at 4/22/15 10:47 AM:
-----------------------------------------------------------------

The 'queue' test is here just to illustrate that Spark core itself does not leak resources; other than that, you may disregard it. The two remaining tests (both with Kafka as input) are the ones of interest.
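
For reference, the queue-based input amounts to something along these lines (a minimal sketch; the queue name, element type, and the feeding thread are assumptions, not the actual test code):
{code}
import scala.collection.mutable
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.InputDStream

// in-memory queue of pre-built RDDs; a separate thread enqueues new RDDs
// at the same 5000 messages/second rate as the Kafka data generator
val queue = mutable.Queue[RDD[(String, Array[Byte])]]()
val bytes: InputDStream[(String, Array[Byte])] = ssc.queueStream(queue)
{code}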

The only difference between the two tests is that the first one declares its input DStream as:
{code}
      val bytes: InputDStream[(String, Array[Byte])] = {
        // receiver-based stream: a long-running receiver consumes from Kafka
        // through the Zookeeper-backed high-level consumer API
        val conf = Map(
          "zookeeper.connect" -> cfg.getString("zookeeper.addresses"),
          "group.id" -> cfg.getString("spark.kafka.groupId"),
          "auto.offset.reset" -> "largest"
        )
        val topics = Map(
          "raw" -> 1 // topic -> number of consumer threads in the receiver
        )
        KafkaUtils.createStream[String, Array[Byte], StringDecoder, DefaultDecoder](ssc, conf, topics, StorageLevel.MEMORY_AND_DISK_SER)
      }
{code}

while the second one declares it as:
{code}
      val bytes: InputDStream[(String, Array[Byte])] = {
        // direct stream: no receiver; each batch reads its offset ranges
        // straight from the Kafka brokers
        val kafkaParams = Map(
          "auto.offset.reset" -> "largest",
          "bootstrap.servers" -> cfg.getString("kafka.brokerAddresses")
        )
        val topics = Set("raw")
        KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](ssc, kafkaParams, topics)
      }
{code}

The rest of the code (the transformations applied to the "bytes" DStream) is identical in both tests.

The window DStream transformation is small (an 8-second window sliding every 2 seconds):
{code}
xxx.window(Seconds(8), Seconds(2))
{code}
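
Both tests then run the same downstream pipeline. For context, it is roughly of the following shape (a sketch based on the transformations named in the issue description: window, map, mapValues, foreachRDD; not the exact code, and assuming the usual org.apache.spark.streaming imports):
{code}
      val windowed = bytes.window(Seconds(8), Seconds(2))
      val counts = windowed
        .map { case (key, payload) => (key, payload.length) } // trivial per-record transform
        .mapValues(_ + 1)                                      // another trivial pair transform
      counts.foreachRDD { rdd =>
        rdd.count() // force evaluation, then discard the result
      }
{code}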

Also note that "spark.streaming.receiver.writeAheadLog.enable" was left at its default value (false).


> KafkaUtils.createStream leaks resources
> ---------------------------------------
>
>                 Key: SPARK-7053
>                 URL: https://issues.apache.org/jira/browse/SPARK-7053
>             Project: Spark
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 1.3.1
>         Environment: Spark Streaming 1.3.1, standalone mode running on just 1 box: Ubuntu 14.04.2 LTS, 4 cores, 8GB RAM, java version "1.8.0_40"
>            Reporter: Platon Potapov
>         Attachments: from.kafka.direct.stream.gif, from.kafka.receiver.gif, from.queue.gif
>
>
> I have a simple Spark Streaming application that reads messages from a Kafka topic, applies trivial transforms (window, map, mapValues, foreachRDD), and throws away the results. A test application, really. The streaming batch duration is 2 seconds.
> The Kafka topic is steadily populated with 5000 messages per second by a data generator application. Each message is small (a few double values).
> The problem:
> * in case KafkaUtils.createStream is used to create a receiver fetching messages from Kafka, "Processing Time" (as seen at http://localhost:4040/streaming) starts at hundreds of milliseconds, but steadily increases over time, eventually resulting in scheduling delays and out-of-memory errors. Note that memory consumption (by the JVM) and garbage collection pauses also grow over time.
> * in case KafkaUtils.createDirectStream is used (the rest of the setup is exactly the same), processing time starts at about 5 seconds (compared to hundreds of milliseconds with the Kafka receiver), but remains at that level.
> * in case the input is replaced with an in-memory queue populated with the same messages at the same rate (streamingContext.queueStream(queue)), processing time is again hundreds of milliseconds, and remains stable.
> This leads me to conclude that the setup with KafkaUtils.createStream receiver leaks resources.
> Attached are the graphs depicting each of the three scenarios.



