You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by "Young, Matthew T" <ma...@intel.com> on 2015/10/30 23:38:13 UTC

Very slow performance on very small record counts

In a job I am writing I have encountered impossibly poor performance with Spark Streaming 1.5.1. The environment is three 16 core/32 GB RAM VMs

The job involves parsing 600 bytes or so of JSON (per record) from Kafka, extracting two values from the JSON, doing some aggregation and averages, and writing a handful of summary results back to Kafka each two-second batch.

The issue is that Spark seems to be hitting a hard minimum of 4 seconds to process each batch, even a batch with as few as 192 records in it!

When I check the Web UI for such a batch I see a proper distribution of offsets (each worker gets < 10 records) and four jobs for the batch. Three of the jobs are completed very quickly (as I would expect), but one job essentially dominates the 4 seconds. This WebUI screenshot is presented in the attachment "Spark Idle Time 2.png".

When I drill down into that job and look at the event timeline I see very odd behavior. The duration for the longest task is only ~0.3 s, and there is nice parallelism. What seems to be happening is right at the start of the job there is a few milliseconds of deserialization, followed by almost 4s(!) of doing absolutely nothing, followed by a few hundred milliseconds where the actual processing is taking place. This WebUI screenshot is presented in the attachment "Spark Idle Time.png"

What can cause this delay where Spark does nothing (or reports doing nothing) for so much time? I have included the code corresponding to the foreachRDD that is triggering this 4 second job below.

Thank you for your time,

-- Matthew


    // Transmit Kafka config to all workers so they can write back as necessary
    val broadcastBrokers = ssc.sparkContext.broadcast(brokerList)
    val broadcastZookeeper = ssc.sparkContext.broadcast(zookeeper)

    // Define the task for Spark Streaming to do
    messages.foreachRDD(sourceStream => {

      // Parse the JSON into a more usable structure
      val parsed = sourceStream.map(y => parse(y._2))

      val GeosRUSECs = parsed.map {
        x => ((x \ "geoip").extract[String](DefaultFormats, manifest[String]), ((x \ "rusec").extract[Long](DefaultFormats, manifest[Long]), 1L))
      }.cache
      if (!GeosRUSECs.isEmpty) {
        val totalRUSEC = GeosRUSECs.map(x => (x._2._1)).reduce(_ + _)
        val avgRUSEC = totalRUSEC / GeosRUSECs.count.toDouble

        if (!avgRUSEC.isNaN && !avgRUSEC.isInfinite) {
          // Acquire local Kafka connection
          val producer = kafkaWriter.getProducer(broadcastBrokers.value, broadcastZookeeper.value)
          producer.send(new KeyedMessage[String, String]("SparkRUSECout", avgRUSEC.toString))
        }

        // Wait times for each geo with total wait and number of queries
        val GeosWaitsCounts = GeosRUSECs.reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))

        val avgRespPerGeo = GeosWaitsCounts.map { case (geo, (totWait, numQueries)) => (geo, totWait.toDouble / numQueries) }

        avgRespPerGeo.foreach { geoInfo =>
          val producer = kafkaWriter.getProducer(broadcastBrokers.value, broadcastZookeeper.value)
          producer.send(new KeyedMessage[String, String]("SparkRUSECout", geoInfo._1 + " average RUSEC: " + geoInfo._2))
        }
      }
    })


Re: Very slow performance on very small record counts

Posted by Cody Koeninger <co...@koeninger.org>.
I had put in a patch to improve the performance of count(), take(), and
isEmpty() on KafkaRDD that should be in spark 1.5.1...

My bet is because you were doing the isEmpty after the map, it was using
the implementations on MapPartitionsRDD, not KafkaRDD.

If things are working now you may not want to mess with it, but calling
count on the rdd before doing the map should be faster, since it will just
use the offset difference rather than actually materializing and counting
each record.


On Tue, Nov 3, 2015 at 11:17 AM, Young, Matthew T <matthew.t.young@intel.com
> wrote:

> +user to potentially help others
>
>
>
> Cody,
>
>
>
> Thanks for calling out isEmpty, I didn’t realize that it was so dangerous.
> Taking that out and just reusing the count has eliminated the issue, and
> now the cluster is happily eating 400,000 record batches.
>
>
>
> For completeness’ sake: I am using the direct stream API.
>
>
>
> *From:* Cody Koeninger [mailto:cody@koeninger.org]
> *Sent:* Saturday, October 31, 2015 2:00 PM
> *To:* YOUNG, MATTHEW, T (Intel Corp) <ma...@intel.com>
> *Subject:* Re: Very slow performance on very small record counts
>
>
>
> Have you looked at jstack or the thread dump from the spark ui during that
> time to see what's happening?
>
>
>
> Are you using receiver based stream or direct stream?
>
>
>
> The only odd thing I notice about your code is that you're calling
> isEmpty, which will do a take(), which can end up scheduling multiple times
> if it initially grabs empty partitions.  You're counting the rdd anyway, so
> why not just do count() first?
>
>
>
>
>
>
>
> On Fri, Oct 30, 2015 at 5:38 PM, Young, Matthew T <
> matthew.t.young@intel.com> wrote:
>
> In a job I am writing I have encountered impossibly poor performance with
> Spark Streaming 1.5.1. The environment is three 16 core/32 GB RAM VMs
>
>
>
> The job involves parsing 600 bytes or so of JSON (per record) from Kafka,
> extracting two values from the JSON, doing some aggregation and averages,
> and writing a handful of summary results back to Kafka each two-second
> batch.
>
>
>
> The issue is that Spark seems to be hitting a hard minimum of 4 seconds to
> process each batch, even a batch with as few as 192 records in it!
>
>
>
> When I check the Web UI for such a batch I see a proper distribution of
> offsets (each worker gets < 10 records) and four jobs for the batch. Three
> of the jobs are completed very quickly (as I would expect), but one job
> essentially dominates the 4 seconds. This WebUI screenshot is presented in
> the attachment “Spark Idle Time 2.png”.
>
>
>
> When I drill down into that job and look at the event timeline I see very
> odd behavior. The duration for the longest task is only ~0.3 s, and there
> is nice parallelism. What seems to be happening is right at the start of
> the job there is a few milliseconds of deserialization, followed by almost
> 4s(!) of doing absolutely nothing, followed by a few hundred milliseconds
> where the actual processing is taking place. This WebUI screenshot is
> presented in the attachment “Spark Idle Time.png”
>
>
>
> What can cause this delay where Spark does nothing (or reports doing
> nothing) for so much time? I have included the code corresponding to the
> foreachRDD that is triggering this 4 second job below.
>
>
>
> Thank you for your time,
>
>
>
> -- Matthew
>
>
>
>
>
>     *// Transmit Kafka config to all workers so they can write back as
> necessary*
>
>     *val* broadcastBrokers = ssc.sparkContext.broadcast(brokerList)
>
>     *val* broadcastZookeeper = ssc.sparkContext.broadcast(zookeeper)
>
>
>
>     *// Define the task for Spark Streaming to do*
>
>     messages.foreachRDD(sourceStream => *{*
>
>
>
>       *// Parse the JSON into a more usable structure*
>
>       *val* parsed = sourceStream.map(y => parse(y._2))
>
>
>
>       *val* GeosRUSECs = parsed.map *{*
>
>         x => ((x \ "geoip").extract[String](DefaultFormats,
> manifest[String]), ((x \ "rusec").extract[Long](DefaultFormats,
> manifest[Long]), 1L))
>
>       *}*.cache
>
>       *if* (!GeosRUSECs.isEmpty) *{*
>
>         *val* totalRUSEC = GeosRUSECs.map(x => (x._2._1)).reduce(_ + _)
>
>         *val* avgRUSEC = totalRUSEC / GeosRUSECs.count.toDouble
>
>
>
>         *if* (!avgRUSEC.isNaN && !avgRUSEC.isInfinite) *{*
>
>           *// Acquire local Kafka connection*
>
>           *val* producer =
> kafkaWriter.getProducer(broadcastBrokers.value, broadcastZookeeper.value)
>
>           producer.send(new KeyedMessage[String, String]("SparkRUSECout",
> avgRUSEC.toString))
>
>         *}*
>
>
>
>         *// Wait times for each geo with total wait and number of queries*
>
>         *val* GeosWaitsCounts = GeosRUSECs.reduceByKey((x, y) => (x._1 +
> y._1, x._2 + y._2))
>
>
>
>         *val* avgRespPerGeo = GeosWaitsCounts.map *{* *case* (geo,
> (totWait, numQueries)) => (geo, totWait.toDouble / numQueries) *}*
>
>
>
>         avgRespPerGeo.foreach *{* geoInfo =>
>
>           *val* producer =
> kafkaWriter.getProducer(broadcastBrokers.value, broadcastZookeeper.value)
>
>           producer.send(new KeyedMessage[String, String]("SparkRUSECout",
> geoInfo._1 + " average RUSEC: " + geoInfo._2))
>
>         *}*
>
>       *}*
>
>     *}*)
>
>
>
>
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
> For additional commands, e-mail: user-help@spark.apache.org
>
>
>

RE: Very slow performance on very small record counts

Posted by "Young, Matthew T" <ma...@intel.com>.
+user to potentially help others

Cody,

Thanks for calling out isEmpty, I didn’t realize that it was so dangerous. Taking that out and just reusing the count has eliminated the issue, and now the cluster is happily eating 400,000 record batches.

For completeness’ sake: I am using the direct stream API.

From: Cody Koeninger [mailto:cody@koeninger.org]
Sent: Saturday, October 31, 2015 2:00 PM
To: YOUNG, MATTHEW, T (Intel Corp) <ma...@intel.com>
Subject: Re: Very slow performance on very small record counts

Have you looked at jstack or the thread dump from the spark ui during that time to see what's happening?

Are you using receiver based stream or direct stream?

The only odd thing I notice about your code is that you're calling isEmpty, which will do a take(), which can end up scheduling multiple times if it initially grabs empty partitions.  You're counting the rdd anyway, so why not just do count() first?



On Fri, Oct 30, 2015 at 5:38 PM, Young, Matthew T <ma...@intel.com>> wrote:
In a job I am writing I have encountered impossibly poor performance with Spark Streaming 1.5.1. The environment is three 16 core/32 GB RAM VMs

The job involves parsing 600 bytes or so of JSON (per record) from Kafka, extracting two values from the JSON, doing some aggregation and averages, and writing a handful of summary results back to Kafka each two-second batch.

The issue is that Spark seems to be hitting a hard minimum of 4 seconds to process each batch, even a batch with as few as 192 records in it!

When I check the Web UI for such a batch I see a proper distribution of offsets (each worker gets < 10 records) and four jobs for the batch. Three of the jobs are completed very quickly (as I would expect), but one job essentially dominates the 4 seconds. This WebUI screenshot is presented in the attachment “Spark Idle Time 2.png”.

When I drill down into that job and look at the event timeline I see very odd behavior. The duration for the longest task is only ~0.3 s, and there is nice parallelism. What seems to be happening is right at the start of the job there is a few milliseconds of deserialization, followed by almost 4s(!) of doing absolutely nothing, followed by a few hundred milliseconds where the actual processing is taking place. This WebUI screenshot is presented in the attachment “Spark Idle Time.png”

What can cause this delay where Spark does nothing (or reports doing nothing) for so much time? I have included the code corresponding to the foreachRDD that is triggering this 4 second job below.

Thank you for your time,

-- Matthew


    // Transmit Kafka config to all workers so they can write back as necessary
    val broadcastBrokers = ssc.sparkContext.broadcast(brokerList)
    val broadcastZookeeper = ssc.sparkContext.broadcast(zookeeper)

    // Define the task for Spark Streaming to do
    messages.foreachRDD(sourceStream => {

      // Parse the JSON into a more usable structure
      val parsed = sourceStream.map(y => parse(y._2))

      val GeosRUSECs = parsed.map {
        x => ((x \ "geoip").extract[String](DefaultFormats, manifest[String]), ((x \ "rusec").extract[Long](DefaultFormats, manifest[Long]), 1L))
      }.cache
      if (!GeosRUSECs.isEmpty) {
        val totalRUSEC = GeosRUSECs.map(x => (x._2._1)).reduce(_ + _)
        val avgRUSEC = totalRUSEC / GeosRUSECs.count.toDouble

        if (!avgRUSEC.isNaN && !avgRUSEC.isInfinite) {
          // Acquire local Kafka connection
          val producer = kafkaWriter.getProducer(broadcastBrokers.value, broadcastZookeeper.value)
          producer.send(new KeyedMessage[String, String]("SparkRUSECout", avgRUSEC.toString))
        }

        // Wait times for each geo with total wait and number of queries
        val GeosWaitsCounts = GeosRUSECs.reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))

        val avgRespPerGeo = GeosWaitsCounts.map { case (geo, (totWait, numQueries)) => (geo, totWait.toDouble / numQueries) }

        avgRespPerGeo.foreach { geoInfo =>
          val producer = kafkaWriter.getProducer(broadcastBrokers.value, broadcastZookeeper.value)
          producer.send(new KeyedMessage[String, String]("SparkRUSECout", geoInfo._1 + " average RUSEC: " + geoInfo._2))
        }
      }
    })



---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org<ma...@spark.apache.org>
For additional commands, e-mail: user-help@spark.apache.org<ma...@spark.apache.org>