Posted to user@spark.apache.org by Malcolm Lockyer <ma...@hapara.com> on 2016/05/31 01:45:36 UTC

Spark + Kafka processing trouble

Hopefully this is not off topic for this list, but I am hoping to
reach some people who have used Kafka + Spark before.

We are new to Spark and are setting up our first production
environment and hitting a speed issue that may be configuration related
- and we have little experience in configuring Spark environments.

So we've got a Spark streaming job that seems to take an inordinate
amount of time to process. I realize that without specifics, it is
difficult to trace - however the most basic primitives in Spark are
performing horribly. The lazy nature of Spark is making it difficult
for me to understand what is happening - any suggestions are very much
appreciated.

Environment is MBP 2.2 i7. Spark master is "local[*]". We are using
Kafka and PostgreSQL, both local. The job is designed to:

a) grab some data from Kafka
b) correlate with existing data in PostgreSQL
c) output data to Kafka

I am isolating timings by calling System.nanoTime() before and after
something that forces calculation, for example .count() on a
DataFrame. It seems like every operation has a MASSIVE fixed overhead
and that is stacking up making each iteration on the RDD extremely
slow. Slow operations include pulling a single item from the Kafka
queue, running a simple query against PostgreSQL, and running a Spark
aggregation on an RDD with a handful of rows.
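
A minimal sketch of the timing approach described above, in plain Scala with no Spark dependency. The `timed` helper and the lazy `Iterator` pipeline are illustrative assumptions, not code from the job: the iterator stands in for a lazily evaluated RDD or DataFrame, and `size` stands in for an action such as count(). It shows why the measured interval has to wrap the call that forces evaluation: building the pipeline is nearly free, and all of the work is charged to whichever call first consumes it.

```scala
object TimingSketch {
  // Runs `body` once and returns its result together with the elapsed
  // wall-clock time in milliseconds, measured with System.nanoTime().
  def timed[A](body: => A): (A, Double) = {
    val start = System.nanoTime()
    val result = body
    val elapsedMs = (System.nanoTime() - start) / 1e6
    (result, elapsedMs)
  }

  // Counts how many elements were actually processed, so that laziness
  // is observable without relying on timings.
  var processed = 0

  // Hypothetical stand-in for a lazy transformation: nothing runs until
  // the iterator is consumed.
  def pipeline(n: Int): Iterator[Int] =
    Iterator.range(0, n).map { x => processed += 1; x * 2 }

  def main(args: Array[String]): Unit = {
    val (it, defineMs) = timed(pipeline(10000)) // defines work, runs none
    println(f"define: $defineMs%.3f ms, processed so far: $processed")
    val (n, forceMs) = timed(it.size)           // forces evaluation
    println(f"force:  $forceMs%.3f ms, rows: $n, processed: $processed")
  }
}
```

Timing each forced stage separately like this only isolates a stage if the earlier stages are not recomputed by the later action, which in Spark generally means caching the intermediate result first.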

The machine is not maxing out on memory, disk or CPU. The machine
seems to be doing nothing for a high percentage of the execution time.
We have reproduced this behavior on two other machines. So we're
suspecting a configuration issue.

As a concrete example, we have a DataFrame produced by running a JDBC
query by mapping over an RDD from Kafka. Calling count() (I guess
forcing execution) on this DataFrame when there is *1* item/row (Note:
SQL database is EMPTY at this point so this is not a factor) takes 4.5
seconds, calling count when there are 10,000 items takes 7 seconds.
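
As a rough worked example, the two measurements above can be fitted to a simple linear model, time = fixed + perRow * rows. The model is an assumption (two points cannot confirm linearity) and the `LinearCost` name is made up for illustration. Under that assumption nearly all of the 4.5 seconds is fixed overhead, and the marginal cost per row is about a quarter of a millisecond.

```scala
object LinearCost {
  // Fits time = fixed + perRow * rows through two (rows, seconds) measurements.
  def fit(r1: Double, t1: Double, r2: Double, t2: Double): (Double, Double) = {
    val perRow = (t2 - t1) / (r2 - r1)
    val fixed = t1 - perRow * r1
    (fixed, perRow)
  }

  def main(args: Array[String]): Unit = {
    // Measurements quoted in the post: 1 row in 4.5 s, 10,000 rows in 7 s.
    val (fixed, perRow) = fit(1, 4.5, 10000, 7.0)
    println(f"fixed overhead: $fixed%.4f s")
    println(f"per-row cost:   ${perRow * 1000}%.4f ms")
  }
}
```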

Can anybody offer experience of something like this happening for
them? Any suggestions on how to understand what is going wrong?

I have tried tuning the number of Kafka partitions - increasing this
seems to increase the concurrency and ultimately number of things
processed per minute, but to get something half decent, I'm going to
need to run with 1024 or more partitions. Is 1024 partitions a
reasonable number? What do you use in your environments?

I've tried different options for batchDuration. The calculation seems
to be batchDuration * Kafka partitions for number of items per
iteration, but this is always still extremely slow (many per iteration
vs. very few doesn't seem to really improve things). Can you suggest a
list of the Spark configuration parameters related to speed that you
think are key - preferably with the values you use for those
parameters?
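
One hedged explanation for the "batchDuration * Kafka partitions" pattern: with the direct Kafka stream, the setting spark.streaming.kafka.maxRatePerPartition caps the records read per second from each partition, so a batch holds at most rate * partitions * batch seconds records. Whether that setting is in play in this job is an assumption; the post does not say. The sketch below only does the arithmetic, and `BatchCap` is a made-up name.

```scala
object BatchCap {
  // Upper bound on records per batch when a per-partition rate limit
  // (records per second per partition) is configured.
  def maxRecordsPerBatch(ratePerPartition: Long, partitions: Int, batchSeconds: Double): Long =
    math.floor(ratePerPartition * partitions * batchSeconds).toLong

  def main(args: Array[String]): Unit = {
    // With a limit of 1 record/s/partition the cap equals
    // batchDuration * partitions, matching the pattern in the post.
    for (partitions <- Seq(32, 64, 1024); batchSeconds <- Seq(0.5, 1.0, 2.0)) {
      val cap = maxRecordsPerBatch(1L, partitions, batchSeconds)
      println(s"partitions=$partitions batch=${batchSeconds}s -> at most $cap records per batch")
    }
  }
}
```

If a cap like this is the limiting factor, raising the per-partition rate is a more direct lever than adding partitions.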

I'd really really appreciate any help or suggestions as I've been
working on this speed issue for 3 days without success and my head is
starting to hurt. Thanks in advance.



Thanks,

--

Malcolm Lockyer

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: Spark + Kafka processing trouble

Posted by Mich Talebzadeh <mi...@gmail.com>.
Fair enough, let us take this for a ride.

I have a Spark Streaming test set up.

Briefly, I use Kafka to publish simulated price figures in a stream of
10,000 rows every 2 seconds. The shell script calls Kafka using
${KAFKA_HOME}/bin/kafka-console-producer.sh in its main loop:


while true
do
  genrandomnumber
  cat ${IN_FILE} | ${KAFKA_HOME}/bin/kafka-console-producer.sh \
    --broker-list rhes564:9092 --topic newtopic
  if [ $? != 0 ]
  then
        echo `date` " Abort: $0 failed. Could not send message to rhes564:9092" > ${LOG_FILE}
        exit 1
  fi
  sleep 2
done

That is from a different box.

The Spark program picks out prices greater than 99, using the following
batch interval, window length and sliding interval:

val ssc = new StreamingContext(sparkConf, Seconds(2))
val windowLength = 4
val slidingInterval = 2

The code below works OK:

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.Row
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.types._
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions._
import _root_.kafka.serializer.StringDecoder
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.streaming.kafka.KafkaUtils
//
object CEP_streaming_with_JDBC {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().
      setAppName("CEP_streaming_with_JDBC").
      set("spark.driver.allowMultipleContexts", "true").
      set("spark.hadoop.validateOutputSpecs", "false")
    // Batch interval of 2 seconds
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    ssc.checkpoint("checkpoint")
    val kafkaParams = Map[String, String](
      "bootstrap.servers" -> "rhes564:9092",
      "schema.registry.url" -> "http://rhes564:8081",
      "zookeeper.connect" -> "rhes564:2181",
      "group.id" -> "CEP_streaming_with_JDBC")
    val topics = Set("newtopic")
    val dstream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics)
    dstream.cache()
    // Keep only the message value, then parse the third comma-separated
    // field as the price
    val lines = dstream.map(_._2)
    val price = lines.map(_.split(',').view(2)).map(_.toFloat)
    // window length - the duration of the window; it must be a multiple of
    // the batch interval n in StreamingContext(sparkConf, Seconds(n))
    val windowLength = 4
    // sliding interval - the interval at which the window operation is
    // performed, in other words data is collected within this "previous
    // interval". Keep this the same as the batch interval for continuous
    // streaming: you are aggregating data that you are collecting over the
    // batch window.
    val slidingInterval = 2
    val countByValueAndWindow = price.filter(_ > 99.0).
      countByValueAndWindow(Seconds(windowLength), Seconds(slidingInterval))
    countByValueAndWindow.print()
    //
    ssc.start()
    ssc.awaitTermination()
    //ssc.stop()
  }
}

This is the output when there are prices that satisfy the filter:

-------------------------------------------
Time: 1464721016000 ms
-------------------------------------------
(99.849884,2)
(99.206856,2)
(99.37303,4)
(99.53894,1)
(99.32271,2)
(99.70958,4)
(99.89995,1)
(99.5088,3)
(99.95999,2)
(99.7547,1)
...
-------------------------------------------
Time: 1464721018000 ms
-------------------------------------------

and the streaming statistics are shown below

[image: Inline images 1]
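
To make the window settings above concrete, here is a small plain-Scala sketch, with no Spark dependency, of what a 4-second window sliding every 2 seconds over 2-second batches amounts to. `WindowSketch` and its sample prices are invented for illustration; the real job uses countByValueAndWindow on a DStream, and this emulation ignores the partial window Spark emits at start-up. With these settings each window spans two consecutive batches, so a price seen in both batches is counted twice.

```scala
object WindowSketch {
  // Number of batches covered by one window; both the window length and the
  // slide interval must be whole multiples of the batch interval.
  def batchesPerWindow(windowSec: Int, slideSec: Int, batchSec: Int): Int = {
    require(windowSec % batchSec == 0, "window length must be a multiple of the batch interval")
    require(slideSec % batchSec == 0, "slide interval must be a multiple of the batch interval")
    windowSec / batchSec
  }

  // Emulates filter(_ > 99.0) followed by a count-by-value over each window.
  def countByValuePerWindow(batches: Seq[Seq[Float]], windowSec: Int,
                            slideSec: Int, batchSec: Int): List[Map[Float, Int]] = {
    val size = batchesPerWindow(windowSec, slideSec, batchSec)
    val step = slideSec / batchSec
    batches.sliding(size, step).map { window =>
      window.flatten.filter(_ > 99.0f).groupBy(identity).map { case (p, ps) => p -> ps.size }
    }.toList
  }

  def main(args: Array[String]): Unit = {
    // Made-up sample data: three 2-second batches of prices.
    val batches = Seq(Seq(99.5f, 98.0f), Seq(99.5f, 99.9f), Seq(97.0f))
    countByValuePerWindow(batches, 4, 2, 2).foreach(println)
  }
}
```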

OK I will wait for comments/corrections.


Thanks


Dr Mich Talebzadeh



LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com



On 31 May 2016 at 16:31, Cody Koeninger <co...@koeninger.org> wrote:

> >  500ms is I believe the minimum batch interval for Spark micro batching.
>
> It's better to test than to believe, I've run 250ms jobs.  Same
> applies to the comments around JDBC, why assume when you could
> (dis)prove?  It's not like it's a lot of effort to set up a minimal
> job that does foreach(println) from kafka.
>
> On Tue, May 31, 2016 at 9:59 AM, Mich Talebzadeh
> <mi...@gmail.com> wrote:
> > 500ms is I believe the minimum batch interval for Spark micro batching.
> >
> > However, a JDBC call is a use of Unix file descriptor and context switch
> and
> > it does have performance implication. That is irrespective of Kafka as
> it is
> > happening one is actually going through Hive JDBC.
> >
> > It is a classic data access issue. Opening and closing JDBC connection
> once
> > every 0.5 second is very problematic.
> >
> > HTH
> >
> > Dr Mich Talebzadeh
> >
> >
> >
> > LinkedIn
> >
> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> >
> >
> >
> > http://talebzadehmich.wordpress.com
> >
> >
> >
> >
> > On 31 May 2016 at 15:34, Cody Koeninger <co...@koeninger.org> wrote:
> >>
> >> There isn't a magic spark configuration setting that would account for
> >> multiple-second-long fixed overheads, you should be looking at maybe
> >> 200ms minimum for a streaming batch.  1024 kafka topicpartitions is
> >> not reasonable for the volume you're talking about.  Unless you have
> >> really extreme workloads, 32 or 64 is a better starting guess.
> >>
> >> Rather than jumping to conclusions about sql operations being the
> >> problem, start from the very beginning.  Read a stream of messages
> >> from kafka and just do .foreach(println), at a reasonable batch size
> >> (say 500ms or a second), and see how that keeps up in your
> >> environment.  Don't use take(), don't use count(), don't use print(),
> >> since they may have non-obvious performance implications.
> >>
> >> If that works, add on further operations one step at a time and see
> >> when issues arise.

Re: Spark + Kafka processing trouble

Posted by Cody Koeninger <co...@koeninger.org>.
>  500ms is I believe the minimum batch interval for Spark micro batching.

It's better to test than to believe, I've run 250ms jobs.  Same
applies to the comments around JDBC, why assume when you could
(dis)prove?  It's not like it's a lot of effort to set up a minimal
job that does foreach(println) from kafka.
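
A minimal job of the kind described might look like the sketch below. It assumes the Spark 1.x direct Kafka API (the spark-streaming-kafka artifact for Kafka 0.8) used elsewhere in this thread; the object name, the broker address localhost:9092, the topic name "test" and the 500 ms batch interval are placeholders, not values from the original job.

```scala
import _root_.kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object MinimalKafkaPrint {
  // Placeholder settings; adjust for the environment under test.
  val batchInterval = Milliseconds(500)
  val kafkaParams = Map[String, String]("metadata.broker.list" -> "localhost:9092")
  val topics = Set("test")

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("MinimalKafkaPrint").setMaster("local[*]")
    val ssc = new StreamingContext(conf, batchInterval)
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics)
    // Print every message value; no take(), count() or print().
    stream.map(_._2).foreachRDD { rdd => rdd.foreach(println) }
    ssc.start()
    ssc.awaitTermination()
  }
}
```

If this keeps up with the input, the database lookup and the output step can be added back one at a time while watching the per-batch processing time.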



Re: Spark + Kafka processing trouble

Posted by Mich Talebzadeh <mi...@gmail.com>.
500 ms is, I believe, the minimum batch interval for Spark micro-batching.

However, a JDBC call uses a Unix file descriptor and involves a context
switch, and it does have performance implications. That is irrespective of
Kafka; as it happens, one is actually going through Hive JDBC.

It is a classic data access issue. Opening and closing a JDBC connection
once every 0.5 seconds is very problematic.
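
One common way to avoid reconnecting for every batch is to hold a single lazily created connection per JVM and reuse it. The sketch below shows the pattern in plain Scala; `FakeConnection` is a stand-in for a real java.sql.Connection and `ConnectionHolder` is an invented name, so this illustrates the idea rather than the job under discussion. In a Spark job the same holder would typically be referenced from inside foreachPartition so that each executor opens its connection once.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Stand-in for java.sql.Connection so the sketch runs without a database.
class FakeConnection {
  def query(key: String): String = s"row-for-$key"
}

object ConnectionHolder {
  // Counts how many times a connection was opened, for demonstration only.
  val opened = new AtomicInteger(0)

  // lazy val: initialised on first use, then reused for the JVM's lifetime.
  lazy val connection: FakeConnection = {
    opened.incrementAndGet()
    new FakeConnection
  }
}

object ReuseSketch {
  // Processes one micro-batch using the shared connection.
  def processBatch(keys: Seq[String]): Seq[String] =
    keys.map(ConnectionHolder.connection.query)

  def main(args: Array[String]): Unit = {
    // Ten simulated batches, one connection.
    (1 to 10).foreach(i => processBatch(Seq(s"k$i")))
    println(s"connections opened: ${ConnectionHolder.opened.get}")
  }
}
```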

HTH

Dr Mich Talebzadeh



LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com




Re: Spark + Kafka processing trouble

Posted by Mich Talebzadeh <mi...@gmail.com>.
Hi Malcolm,

Check this.

You can call JDBC at the beginning; it may work as long as you call it
only once:

object CEP_streaming_with_JDBC {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().
      setAppName("CEP_streaming_with_JDBC").
      set("spark.driver.allowMultipleContexts", "true").
      set("spark.hadoop.validateOutputSpecs", "false")

    val sc = new SparkContext(sparkConf)
    // Create sqlContext based on HiveContext
    val sqlContext = new HiveContext(sc)
    import sqlContext.implicits._
    val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
    HiveContext.sql("use oraclehadoop")
    var _ORACLEserver : String = "jdbc:oracle:thin:@rhes564:1521:mydb12"
    var _username : String = "scratchpad"
    var _password : String = "oracle"

    // Get data from Oracle table
    val d = HiveContext.load("jdbc",
      Map("url" -> _ORACLEserver,
      "dbtable" -> "(SELECT amount_sold, time_id, TO_CHAR(channel_id) AS channel_id FROM scratchpad.sales)",
      "user" -> _username,
      "password" -> _password))

    d.registerTempTable("tmp")
    HiveContext.sql("select count(1) from tmp").show

    val ssc = new StreamingContext(sparkConf, Seconds(2))


The result

+------+
|   _c0|
+------+
|918843|
+------+


HTH

Dr Mich Talebzadeh



LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com




Re: Spark + Kafka processing trouble

Posted by Malcolm Lockyer <ma...@hapara.com>.
Thanks for the suggestions. I agree that there is no magic
configuration setting to blame, and that the SQL operations are not
necessarily at fault. I just wanted to explain the frustration of
having a non-trivial (but still simple) Spark streaming job perform
absolutely horribly on tiny amounts of data.

.count() is something I added to try to force evaluation, and I agree
it might not be the best test.
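
A timing wrapper along the following lines is one way to take the
System.nanoTime() measurements described in this thread. It is only a
sketch: the names TimingSketch and timed are hypothetical, and the
summation is a stand-in workload. In the real job the block would be
whatever action forces evaluation.

```scala
object TimingSketch {
  /** Runs `block` once and returns its result together with the
    * elapsed wall-clock time in milliseconds. */
  def timed[T](block: => T): (T, Double) = {
    val start = System.nanoTime()
    val result = block // the by-name argument is evaluated here
    val elapsedMs = (System.nanoTime() - start) / 1e6
    (result, elapsedMs)
  }

  def main(args: Array[String]): Unit = {
    // Stand-in workload (assumption); replace with the Spark action under test.
    val (sum, ms) = timed { (1 to 10000).map(_.toLong).sum }
    println(f"sum=$sum took $ms%.3f ms")
  }
}
```

Timing the same action at several input sizes helps separate a fixed
per-action cost from the per-row cost, since only the latter should
grow with the data.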




-- 

Malcolm Lockyer
M: +64 21 258 6121
Level 10, 99 Queen Street, Auckland, New Zealand
hapara.com  ●  @hapara_team



Re: Spark + Kafka processing trouble

Posted by Cody Koeninger <co...@koeninger.org>.
There isn't a magic Spark configuration setting that would account for
multiple-second-long fixed overheads; you should be looking at maybe
200ms minimum for a streaming batch.  1024 Kafka topic partitions is
not reasonable for the volume you're talking about.  Unless you have
really extreme workloads, 32 or 64 is a better starting guess.

Rather than jumping to conclusions about SQL operations being the
problem, start from the very beginning.  Read a stream of messages
from Kafka and just do .foreach(println), at a reasonable batch
interval (say 500ms or a second), and see how that keeps up in your
environment.  Don't use take(), don't use count(), don't use print(),
since they may have non-obvious performance implications.

If that works, add on further operations one step at a time and see
when issues arise.
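
The baseline described above might look roughly like the sketch below.
It assumes a Spark 1.x build with the spark-streaming-kafka (Kafka 0.8
direct stream) artifact on the classpath. The broker address
localhost:9092, the topic name "test-topic" and the object name are
placeholders, not details taken from this thread.

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object KafkaPrintBaseline {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[*]")              // same master as in the original post
      .setAppName("kafka-print-baseline")

    // 500ms batch interval, the lower of the two values suggested above.
    val ssc = new StreamingContext(conf, Milliseconds(500))

    // Assumed broker address and topic name; adjust to the local setup.
    val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")
    val topics = Set("test-topic")

    // Direct stream of (key, value) pairs, one RDD partition per Kafka partition.
    val stream = KafkaUtils.createDirectStream[
      String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)

    // Nothing but printing each record: no take(), count() or print().
    stream.foreachRDD { rdd => rdd.foreach(println) }

    ssc.start()
    ssc.awaitTermination()
  }
}
```

If this keeps up with the batch interval, the JDBC lookup and the
aggregation can be added back one at a time while watching the
per-batch processing time in the streaming tab of the Spark UI.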

