Posted by Saulo Sobreiro <> on 2018/05/21 02:34:20 UTC

Re: [Spark2.1] SparkStreaming to Cassandra performance problem

Hi Javier,

Thank you very much for the feedback.
Indeed, the CPU is a huge limitation. I had a lot of trouble trying to run this use case in yarn-client mode. I managed to run it in standalone (local master) mode only.

I do not have the hardware available to run this setup in a cluster yet, so I decided to dig a little deeper into the implementation to see what I could improve. I just finished evaluating some results.
If you find something wrong or odd, please let me know.

Following your suggestion to use "saveToCassandra" directly, I decided to try Scala. Everything was implemented as similarly as possible, and I was surprised by the results: the Scala implementation is much faster.

My current implementation is slightly different from the Python code shared some emails ago, but to compare the influence of the language as fairly as possible I used the following snippets:

# Scala implementation ------------------

val kstream = KafkaUtils.createDirectStream[String, String](
                 ssc,
                 LocationStrategies.PreferConsistent,
                 ConsumerStrategies.Subscribe[String, String](topic, kafkaParams))
kstream
           .map( x => parse(x.value) )
           .saveToCassandra("hdpkns", "batch_measurement")

# Python implementation ----------------
# Adapted from the previously shared code. However, instead of calculating the metrics, it just parses the messages.
kafkaStream = KafkaUtils.createDirectStream(ssc, [topic], {"": brokers})

kafkaStream \
    .transform(parse) \
    .foreachRDD(casssave)

For the same streaming input, the Scala app took an average of ~1.5 seconds to handle each event. The Python app took an average of ~80 seconds per event (and only after a lot of pickle concurrent-access issues).

Note that I considered the time as the difference between the event generation (before being published to Kafka) and the moment just before the saveToCassandra.
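
The latency measure described above can be sketched in plain Python. This is a minimal illustration, not the code used in the tests: the helper names and the epoch-millisecond encoding of the generation timestamp are assumptions.

```python
import time


def event_latency_seconds(event_tt_ms, now_ms=None):
    """Seconds elapsed between event generation and 'now'.

    event_tt_ms: generation time in epoch milliseconds (assumed encoding).
    now_ms: clock reading taken just before the save; defaults to the
            current wall-clock time.
    """
    if now_ms is None:
        now_ms = int(time.time() * 1000)
    return (now_ms - event_tt_ms) / 1000.0


def mean_latency_seconds(event_tts_ms, now_ms):
    """Average latency over a batch of events that share one 'now' reading."""
    latencies = [event_latency_seconds(tt, now_ms) for tt in event_tts_ms]
    return sum(latencies) / len(latencies) if latencies else 0.0
```

The difference is only meaningful if the producer and the Spark host share a clock, which holds trivially in a single-machine sandbox.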

The problem in the Python implementation seems to be the delay introduced by the foreachRDD(casssave) call, which only runs rdd.saveToCassandra( "test_hdpkns", "measurement" ).

Honestly, I was not expecting such a difference between these two programs... Do you understand why this is happening?

Again, thank you very much for your help,

Best Regards

Sharing my current Scala code below
# Scala Snippet =========================
val sparkConf = new SparkConf(). // ...
val ssc = new StreamingContext(sparkConf, Seconds(1))
val sc = ssc.sparkContext
//...
val kstream = KafkaUtils.createDirectStream[String, String](
                 ssc,
                 LocationStrategies.PreferConsistent,
                 ConsumerStrategies.Subscribe[String, String](topic, kafkaParams))
//...
// handle Kafka messages in a parallel fashion
val ckstream = kstream.map( x => parse(x.value) ).cache()
ckstream
              .foreachRDD( rdd => {
                    rdd.foreach(metrics)
              } )
ckstream
              .saveToCassandra("hdpkns", "microbatch_raw_measurement")
#=========================

On 30/04/2018 14:57:50, Javier Pareja <> wrote:

Hi Saulo,

If the CPU is close to 100% then you are hitting the limit. I don't think that moving to Scala will make a difference. Both Spark and Cassandra are CPU hungry, and your setup is small in terms of CPUs. Try running Spark on another (physical) machine so that the 2 cores are dedicated to Cassandra.

Kind Regards

On Mon, 30 Apr 2018, 14:24 Saulo Sobreiro, <> wrote:
Hi Javier,

I will try to implement this in Scala then. As far as I can see in the documentation, there is no saveToCassandra in the Python interface unless you are working with DataFrames, and the kafkaStream instance does not provide methods to convert an RDD into a DataFrame.

Regarding my table, it is very simple (see below). Can I change something to make it write faster?
CREATE TABLE test_hdpkns.measurement (
  mid bigint,
  tt timestamp,
  in_tt timestamp,
  out_tt timestamp,
  sensor_id int,
  measure double,
  PRIMARY KEY (mid, tt, sensor_id, in_tt, out_tt)
) with compact storage;
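
For reference, a record destined for this table could be built from a CSV message roughly as follows. This is only a sketch: the CSV field order (mid, tt, in_tt, sensor_id, measure), the epoch-millisecond timestamps and the helper name to_row are assumptions, not taken from the original code.

```python
from datetime import datetime, timezone


def _ts(ms):
    """Convert epoch milliseconds to a timezone-aware UTC datetime."""
    return datetime.fromtimestamp(int(ms) / 1000.0, tz=timezone.utc)


def to_row(line, out_ms):
    """Map one CSV message to a dict keyed by the table's column names.

    out_ms is the (assumed) epoch-millisecond time at which the record
    leaves the application; it fills the out_tt column.
    """
    mid, tt, in_tt, sensor_id, measure = line.strip().split(",")
    return {
        "mid": int(mid),
        "tt": _ts(tt),
        "in_tt": _ts(in_tt),
        "out_tt": _ts(out_ms),
        "sensor_id": int(sensor_id),
        "measure": float(measure),
    }
```

Every column except measure is part of the primary key, so each of those fields must be present and non-null in every record.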

The system CPU while the demo is running is almost always at 100% for both cores.

Thank you.

Best Regards,

On 29/04/2018 20:46:30, Javier Pareja <>> wrote:

Hi Saulo,

I meant using this to save:

But it might be slow in a different area.
Another point is that Cassandra and Spark running on the same machine might compete for resources, which will slow down the inserts. You can check the CPU usage of the machine at the time. Also, the design of the table schema can make a big difference.

On Sun, 29 Apr 2018, 19:02 Saulo Sobreiro, <> wrote:
Hi Javier,

I removed the map and used "map" directly instead of using transform, but the kafkaStream is created with KafkaUtils, which does not have a method to save to Cassandra directly.

Do you know of any workaround for this?

Thank you for the suggestion.

Best Regards,

On 29/04/2018 17:03:24, Javier Pareja <>> wrote:

Hi Saulo,

I'm no expert but I will give it a try.
I would remove the rdd2.count(), I can't see the point and you will gain performance right away. Because of this, I would not use a transform, just directly the map.
I have not used python but in Scala the cassandra-spark connector can save directly to Cassandra without a foreachRDD.
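
To illustrate why dropping the count should help: an action such as count() on an uncached RDD runs the upstream map once, and the later save runs it again. The toy class below mimics that recompute-on-every-action behaviour in plain Python. LazyDataset and its methods are hypothetical stand-ins, not Spark APIs.

```python
class LazyDataset:
    """Toy stand-in for an uncached RDD: every action recomputes the map."""

    def __init__(self, source, fn=None):
        self._source = source   # the raw records
        self._fn = fn           # deferred per-record function, if any

    def map(self, fn):
        # Transformations are lazy: nothing is computed here.
        return LazyDataset(self._source, fn)

    def _compute(self):
        for rec in self._source:
            yield self._fn(rec) if self._fn else rec

    def count(self):
        # Action: walks the whole lineage.
        return sum(1 for _ in self._compute())

    def save(self, sink):
        # Action: walks the whole lineage again.
        sink.extend(self._compute())


calls = {"n": 0}


def parse(record):
    calls["n"] += 1             # track how often the parser really runs
    return record.upper()


records = ["a", "b", "c"]

# With the emptiness check: parse runs for count() and again for save().
sink_checked = []
ds = LazyDataset(records).map(parse)
if ds.count() > 0:
    ds.save(sink_checked)
calls_with_count = calls["n"]

# Without it: parse runs once per record.
calls["n"] = 0
sink_direct = []
LazyDataset(records).map(parse).save(sink_direct)
calls_without_count = calls["n"]
```

In this toy model the parser is invoked twice per record when the count is kept and once per record when it is removed, while the saved output is identical.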

Finally I would use the spark UI to find which stage is the bottleneck here.

On Sun, 29 Apr 2018, 01:17 Saulo Sobreiro, <>> wrote:

Hi all,

I am implementing a use case where I read some sensor data from Kafka with SparkStreaming interface (KafkaUtils.createDirectStream) and, after some transformations, write the output (RDD) to Cassandra.

Everything is working properly but I am having some trouble with the performance. My kafka topic receives around 2000 messages per second. For a 4 min. test, the SparkStreaming app takes 6~7 min. to process and write to Cassandra, which is not acceptable for longer runs.
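
A quick back-of-the-envelope check of these numbers, taking the figures above at face value (2000 messages per second, a 4-minute input, 6 to 7 minutes to drain) together with the 2-second batch interval set in the code at the end of this email. The variable and function names are illustrative.

```python
INPUT_RATE = 2000           # messages per second arriving on the topic
TEST_SECONDS = 4 * 60       # duration of the input stream
BATCH_INTERVAL = 2          # seconds, as set in the posted code

total_messages = INPUT_RATE * TEST_SECONDS
records_per_batch = INPUT_RATE * BATCH_INTERVAL


def effective_rate(total, drain_minutes):
    """Messages per second actually processed if draining takes drain_minutes."""
    return total / (drain_minutes * 60)


best_case = effective_rate(total_messages, 6)    # app finishes in 6 minutes
worst_case = effective_rate(total_messages, 7)   # app finishes in 7 minutes

print(total_messages, records_per_batch)
print(round(best_case), round(worst_case))
```

The application therefore sustains roughly 1100 to 1300 messages per second against 2000 arriving, so the backlog keeps growing for as long as the stream runs.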

I am running this application in a "sandbox" with 12GB of RAM, 2 cores and 30GB SSD space.
Versions: Spark 2.1, Cassandra 3.0.9 (cqlsh 5.0.1).

I would like to know if you have any suggestions to improve performance (other than getting more resources :) ).

My code (pyspark) is posted at the end of this email so you can take a look. I tried some different Cassandra configurations following this link: (recommended on Stack Overflow for similar questions).

Thank you in advance,

Best Regards,

=============== # CODE # =================================
# run command:
# spark2-submit --packages org.apache.spark:spark-streaming-kafka_2.11:1.6.3,anguenot:pyspark-cassandra:0.7.0,org.apache.spark:spark-core_2.11:1.5.2  --conf'localhost' --num-executors 2 --executor-cores 2 localhost:6667 test_topic2

# Run Spark imports
from pyspark import SparkConf # SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

# Run Cassandra imports
import pyspark_cassandra
from pyspark_cassandra import CassandraSparkContext, saveToCassandra

def recordHandler(record):
    (mid, tt, in_tt, sid, mv) = parseData( record )
    return processMetrics(mid, tt, in_tt, sid, mv)

def process(time, rdd):
    rdd2 = rdd.map( lambda w: recordHandler(w[1]) )
    if rdd2.count() > 0:
        return rdd2

def casssave(time, rdd):
    rdd.saveToCassandra( "test_hdpkns", "measurement" )

# ...
brokers, topic = sys.argv[1:]

# ...

sconf = SparkConf() \
        .setAppName("SensorDataStreamHandler") \
        .setMaster("local[*]") \
        .set("spark.default.parallelism", "2")

sc = CassandraSparkContext(conf = sconf)
batchIntervalSeconds = 2
ssc = StreamingContext(sc, batchIntervalSeconds)

kafkaStream = KafkaUtils.createDirectStream(ssc, [topic], {"": brokers})

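kafkaStream \
    .transform(process) \
    .foreachRDD(casssave)

ssc.start()
ssc.awaitTermination()
================================================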


Re: [Spark2.1] SparkStreaming to Cassandra performance problem

Posted by Timur Shenkao <>.
Did you use RDDs or DataFrames?
What is the Spark version?

On Mon, May 28, 2018 at 10:32 PM, Saulo Sobreiro <>

> Hi,
> I ran a few more tests and found that even with a lot more operations on
> the Scala side, Python is outperformed...
> Dataset Stream duration: ~3 minutes (csv formatted data messages read from
> Kafka)
> Scala process/store time: ~3 minutes (map with split + metrics
> calculations + store raw + store metrics )
> Python process/store time: ~7 minutes (map with split + store raw )
> This is the difference between being usable in production or not. I get
> that Python is likely to be slower because of the Python - Java object
> transformations, but I was not expecting such a huge difference.
> These results are very interesting, as I was comparing to the time that an
> "equivalent" application in Storm takes to process the exact same stream
> (~3 minutes as well) for the same results, and Spark was clearly losing the
> race.
> Thank you all for your feedback :)
> Regards,
> Saulo
> On 21/05/2018 14:09:40, Russell Spitzer <> wrote:
> The answer is most likely that when you use cross Java - Python code you
> incur a penalty for every object that you transform from a Java object
> into a Python object (and then back again to a Java object) when data is
> being passed in and out of your functions. A way around this would probably
> be to have used the DataFrame API if possible, which would have compiled
> the interactions in Java and skipped Python-Java serialization. Using Scala
> from the start, though, is a great idea. I would also probably remove the
> cache from your stream since that is probably only hurting (adding an
> additional serialization which is only used once).
> On Mon, May 21, 2018 at 5:01 AM Alonso Isidoro Roman <>
> wrote:
>> The main language they developed spark with is scala, so all the new
>> features go first to scala, java and finally python. I'm not surprised by
>> the results, we've seen it on Stratio since the first versions of spark. At
>> the beginning of development, some of our engineers make the prototype with
>> python, but when it comes down to it, if it goes into production, it has to
>> be rewritten in scala or java, usually scala.
>> El lun., 21 may. 2018 a las 4:34, Saulo Sobreiro (<
>>>) escribió:
>>> Hi Javier,
>>> Thank you a lot for the feedback.
>>> Indeed the CPU is a huge limitation. I got a lot of trouble trying to
>>> run this use case in yarn-client mode. I managed to run this in standalone
>>> (local master) mode only.
>>> I do not have the hardware available to run this setup in a cluster yet,
>>> so I decided to dig a little bit more in the implementation to see what
>>> could I improve. I just finished evaluating some results.
>>> If you find something wrong or odd please let me know.
>>> Following your suggestion to use "saveToCassandra" directly I decided to
>>> try Scala. Everything was implemented in the most similar way possible and
>>> I got surprised by the results. The scala implementation is much faster.
>>> My current implementation is slightly different from the Python code
>>> shared some emails ago but to compare the languages influence in the most
>>> comparable way I used the following snippets:
>>> # Scala implementation ------------------
>>> val kstream = KafkaUtils.createDirectStream[String, String](
>>>                  ssc,
>>>                  LocationStrategies.PreferConsistent,
>>>                  ConsumerStrategies.Subscribe[String, String](topic,
>>> kafkaParams))
>>> kstream
>>>            .map( x => parse(x.value) )
>>>            .saveToCassandra("hdpkns", "batch_measurement")
>>> # Python implementation ----------------
>>> # Adapted from the previously shared code. However instead of
>>> calculating the metrics, it is just parsing the messages.
>>> kafkaStream = KafkaUtils.createDirectStream(ssc, [topic],
>>> {"": brokers})
>>> kafkaStream \
>>>     .transform(parse) \
>>>     .foreachRDD(casssave)
>>> For the same streaming input the scala app took an average of ~1.5
>>> seconds to handle each event. For the python implementation, the app took
>>> an average of ~80 seconds to handle each event (and after a lot of pickle
>>> concurrency access issues).
>>> Note that I considered the time as the difference between the event
>>> generation (before being published to Kafka) and the moment just before the
>>> saveToCassandra.
>>> The problem in the python implementation seems to be due to the delay
>>> introduced by the foreachRDD(casssave) call, which only runs rdd.saveToCassandra(
>>> "test_hdpkns", "measurement" ).
>>> Honestly I was not expecting such a difference between these 2 codes...
>>> Can you understand why is this happening ?
>>> Again, Thank you very much for your help,
>>> Best Regards
>>> Sharing my current Scala code below
>>> # Scala Snippet =========================
>>> val sparkConf = new SparkConf(). // ...
>>> val ssc = new StreamingContext(sparkConf, Seconds(1))
>>> val sc = ssc.sparkContext
>>> //...
>>> val kstream = KafkaUtils.createDirectStream[String, String](
>>>                  ssc,
>>>                  LocationStrategies.PreferConsistent,
>>>                  ConsumerStrategies.Subscribe[String, String](topic,
>>> kafkaParams))
>>> //...
>>> // handle Kafka messages in a parallel fashion
>>> val ckstream = kstream.map( x => parse(x.value) ).cache()
>>> ckstream
>>>               .foreachRDD( rdd => {
>>>                     rdd.foreach(metrics)
>>>               } )
>>> ckstream
>>>               .saveToCassandra("hdpkns", "microbatch_raw_measurement")
>>> #=========================
>>> On 30/04/2018 14:57:50, Javier Pareja <> wrote:
>>> Hi Saulo,
>>> If the CPU is close to 100% then you are hitting the limit. I don't
>>> think that moving to Scala will make a difference. Both Spark and Cassandra
>>> are CPU hungry, your setup is small in terms of CPUs. Try running Spark on
>>> another (physical) machine so that the 2 cores are dedicated to Cassandra.
>>> Kind Regards
>>> Javier
>>> On Mon, 30 Apr 2018, 14:24 Saulo Sobreiro, <>
>>> wrote:
>>>> Hi Javier,
>>>> I will try to implement this in scala then. As far as I can see in the
>>>> documentation there is no SaveToCassandra in the python interface unless
>>>> you are working with dataframes and the kafkaStream instance does not
>>>> provide methods to convert an RDD into DF.
>>>> Regarding my table, it is very simple (see below). Can I change
>>>> something to make it write faster?
>>>> CREATE TABLE test_hdpkns.measurement (
>>>>   mid bigint,
>>>>   tt timestamp,
>>>>   in_tt timestamp,
>>>>   out_tt timestamp,
>>>>   sensor_id int,
>>>>   measure double,
>>>>   PRIMARY KEY (mid, tt, sensor_id, in_tt, out_tt)
>>>> ) with compact storage;
>>>> The system CPU while the demo is running is almost always at 100% for
>>>> both cores.
>>>> Thank you.
>>>> Best Regards,
>>>> On 29/04/2018 20:46:30, Javier Pareja <> wrote:
>>>> Hi Saulo,
>>>> I meant using this to save:
>>>> blob/master/doc/
>>>> But it might be slow on a different area.
>>>> Another point is that Cassandra and spark running on the same machine
>>>> might compete for resources which will slow down the insert. You can check
>>>> the CPU usage of the machine at the time. Also the design of the table
>>>> schema can make a big difference.
>>>> On Sun, 29 Apr 2018, 19:02 Saulo Sobreiro, <>
>>>> wrote:
>>>>> Hi Javier,
>>>>> I removed the map and used "map" directly instead of using transform,
>>>>> but the *kafkaStream* is created with KafkaUtils which does not have
>>>>> a method to save to cassandra directly.
>>>>> Do you know any workarround for this?
>>>>> Thank you for the suggestion.
>>>>> Best Regards,
>>>>> On 29/04/2018 17:03:24, Javier Pareja <> wrote:
>>>>> Hi Saulo,
>>>>> I'm no expert but I will give it a try.
>>>>> I would remove the rdd2.count(), I can't see the point and you will
>>>>> gain performance right away. Because of this, I would not use a transform,
>>>>> just directly the map.
>>>>> I have not used python but in Scala the cassandra-spark connector can
>>>>> save directly to Cassandra without a foreachRDD.
>>>>> Finally I would use the spark UI to find which stage is the bottleneck
>>>>> here.
>>>>> On Sun, 29 Apr 2018, 01:17 Saulo Sobreiro, <>
>>>>> wrote:
>>>>>> Hi all,
>>>>>> I am implementing a use case where I read some sensor data from Kafka
>>>>>> with SparkStreaming interface (*KafkaUtils.createDirectStream*) and,
>>>>>> after some transformations, write the output (RDD) to Cassandra.
>>>>>> Everything is working properly but I am having some trouble with the
>>>>>> performance. My kafka topic receives around 2000 messages per second. For a
>>>>>> 4 min. test, the SparkStreaming app takes 6~7 min. to process and write to
>>>>>> Cassandra, which is not acceptable for longer runs.
>>>>>> I am running this application in a "sandbox" with 12GB of RAM, 2
>>>>>> cores and 30GB SSD space.
>>>>>> Versions: Spark 2.1, Cassandra 3.0.9 (cqlsh 5.0.1).
>>>>>> I would like to know you have some suggestion to improve performance
>>>>>> (other than getting more resources :) ).
>>>>>> My code (pyspark) is posted in the end of this email so you can take
>>>>>> a look. I tried some different cassandra configurations following this
>>>>>> link:
>>>>>> cassandra-a-performance-benchmark (recommended in stackoverflow for
>>>>>> similar questions).
>>>>>> Thank you in advance,
>>>>>> Best Regards,
>>>>>> Saulo
>>>>>> =============== # CODE # =================================
>>>>>> ####
>>>>>> # run command:
>>>>>> # spark2-submit --packages org.apache.spark:spark-streaming-kafka_2.11:1.6.3,anguenot:pyspark-cassandra:0.7.0,org.apache.spark:spark-core_2.11:1.5.2  --conf'localhost' --num-executors 2 --executor-cores 2 localhost:6667 test_topic2
>>>>>> ##
>>>>>> # Run Spark imports
>>>>>> from pyspark import SparkConf # SparkContext, SparkConf
>>>>>> from pyspark.streaming import StreamingContext
>>>>>> from pyspark.streaming.kafka import KafkaUtils
>>>>>> # Run Cassandra imports
>>>>>> import pyspark_cassandra
>>>>>> from pyspark_cassandra import CassandraSparkContext, saveToCassandra
>>>>>> def recordHandler(record):
>>>>>>     (mid, tt, in_tt, sid, mv) = parseData( record )
>>>>>>     return processMetrics(mid, tt, in_tt, sid, mv)
>>>>>> def process(time, rdd):
>>>>>>     rdd2 = rdd.map( lambda w: recordHandler(w[1]) )
>>>>>>     if rdd2.count() > 0:
>>>>>>         return rdd2
>>>>>> def casssave(time, rdd):
>>>>>>     rdd.saveToCassandra( "test_hdpkns", "measurement" )
>>>>>> # ...
>>>>>> brokers, topic = sys.argv[1:]
>>>>>> # ...
>>>>>> sconf = SparkConf() \
>>>>>>         .setAppName("SensorDataStreamHandler") \
>>>>>>         .setMaster("local[*]") \
>>>>>>         .set("spark.default.parallelism", "2")
>>>>>> sc = CassandraSparkContext(conf = sconf)
>>>>>> batchIntervalSeconds = 2
>>>>>> ssc = StreamingContext(sc, batchIntervalSeconds)
>>>>>> kafkaStream = KafkaUtils.createDirectStream(ssc, [topic],
>>>>>> {"": brokers})
>>>>>> kafkaStream \
>>>>>>     .transform(process) \
>>>>>>     .foreachRDD(casssave)
>>>>>> ssc.start()
>>>>>> ssc.awaitTermination()
>>>>>> ================================================
>> --
>> Alonso Isidoro Roman

Re: [Spark2.1] SparkStreaming to Cassandra performance problem

Posted by Saulo Sobreiro <>.
I ran a few more tests and found that even with a lot more operations on the Scala side, Python is outperformed...

Dataset Stream duration: ~3 minutes (csv formatted data messages read from Kafka)
Scala process/store time: ~3 minutes (map with split + metrics calculations + store raw + store metrics )
Python process/store time: ~7 minutes (map with split + store raw )

This is the difference between being usable in production or not. I get that Python is likely to be slower because of the Python - Java object transformations, but I was not expecting such a huge difference.

These results are very interesting, as I was comparing to the time that an "equivalent" application in Storm takes to process the exact same stream (~3 minutes as well) for the same results, and Spark was clearly losing the race.

Thank you all for your feedback :)



Re: [Spark2.1] SparkStreaming to Cassandra performance problem

Posted by Russell Spitzer <>.
The answer is most likely that when you use cross Java - Python code you
incur a penalty for every object that you transform from a Java object
into a Python object (and then back again to a Java object) when data is
being passed in and out of your functions. A way around this would probably
be to have used the DataFrame API if possible, which would have compiled
the interactions in Java and skipped Python-Java serialization. Using Scala
from the start, though, is a great idea. I would also probably remove the
cache from your stream since that is probably only hurting (adding an
additional serialization which is only used once).
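
The per-record cost described here can be made tangible with a small standard-library experiment. It does not reproduce Spark's actual JVM-to-Python transport; it is only a crude model showing that pickling and unpickling every record adds work on top of the parsing itself. The record layout and function names are assumptions.

```python
import pickle
import timeit


def parse(line):
    """Split a CSV line into typed fields (assumed layout)."""
    mid, tt, sensor_id, measure = line.split(",")
    return int(mid), int(tt), int(sensor_id), float(measure)


def parse_in_process(lines):
    """Parse without any serialization boundary."""
    return [parse(line) for line in lines]


def parse_across_boundary(lines):
    """Parse, but pickle each input and each result, as a crude model of
    records crossing between two runtimes one at a time."""
    out = []
    for line in lines:
        received = pickle.loads(pickle.dumps(line))
        result = parse(received)
        out.append(pickle.loads(pickle.dumps(result)))
    return out


# 4000 records: one 2-second batch at 2000 messages per second.
lines = ["%d,%d,%d,%f" % (i, 1000 + i, i % 10, i * 0.5) for i in range(4000)]

if __name__ == "__main__":
    t_plain = timeit.timeit(lambda: parse_in_process(lines), number=20)
    t_pickled = timeit.timeit(lambda: parse_across_boundary(lines), number=20)
    print("in-process: %.3fs, with pickling: %.3fs" % (t_plain, t_pickled))
```

Both functions return the same records; only the time spent differs, and the gap depends on the machine and the record shape.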

On Mon, May 21, 2018 at 5:01 AM Alonso Isidoro Roman <>

> The main language they developed spark with is scala, so all the new
> features go first to scala, java and finally python. I'm not surprised by
> the results, we've seen it on Stratio since the first versions of spark. At
> the beginning of development, some of our engineers make the prototype with
> python, but when it comes down to it, if it goes into production, it has to
> be rewritten in scala or java, usually scala.
> El lun., 21 may. 2018 a las 4:34, Saulo Sobreiro (<
>>) escribió:
>> Hi Javier,
>> Thank you a lot for the feedback.
>> Indeed the CPU is a huge limitation. I got a lot of trouble trying to run
>> this use case in yarn-client mode. I managed to run this in standalone
>> (local master) mode only.
>> I do not have the hardware available to run this setup in a cluster yet,
>> so I decided to dig a little bit more in the implementation to see what
>> could I improve. I just finished evaluating some results.
>> If you find something wrong or odd please let me know.
>> Following your suggestion to use "saveToCassandra" directly I decided to
>> try Scala. Everything was implemented in the most similar way possible and
>> I got surprised by the results. The scala implementation is much faster.
>> My current implementation is slightly different from the Python code
>> shared some emails ago but to compare the languages influence in the most
>> comparable way I used the following snippets:
>> # Scala implementation ------------------
>> val kstream = KafkaUtils.createDirectStream[String, String](
>>                  ssc,
>>                  LocationStrategies.PreferConsistent,
>>                  ConsumerStrategies.Subscribe[String, String](topic,
>> kafkaParams))
>> kstream
>>            .map( x => parse(x.value) )
>>            .saveToCassandra("hdpkns", "batch_measurement")
>> # Python implementation ----------------
>> # Adapted from the previously shared code. However instead of
>> calculating the metrics, it is just parsing the messages.
>> kafkaStream = KafkaUtils.createDirectStream(ssc, [topic],
>>>>> {"": brokers})
>>>>> kafkaStream \
>>>>>     .transform(parse) \
>>>>>     .foreachRDD(casssave)
>> For the same streaming input, the Scala app took an average of ~1.5
>> seconds to handle each event. The Python implementation took an average
>> of ~80 seconds to handle each event (and that was after a lot of pickle
>> concurrency access issues).
>> Note that I measured the time as the difference between the event
>> generation (before being published to Kafka) and the moment just before the
>> saveToCassandra call.
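
A minimal sketch of the latency measurement described above, assuming each event carries its own generation timestamp and the handler records a second timestamp just before the save. The names Event, latency_seconds and mean_latency are hypothetical and do not come from the code in this thread; if producer and consumer run on different machines, the clocks would also need to be synchronized for the numbers to mean anything.

```python
from dataclasses import dataclass
from statistics import mean


@dataclass
class Event:
    # Hypothetical record: both timestamps are epoch seconds.
    generated_at: float  # set by the producer before publishing to Kafka
    pre_save_at: float   # recorded just before saveToCassandra is called


def latency_seconds(event: Event) -> float:
    """Time between event generation and the moment just before the save."""
    return event.pre_save_at - event.generated_at


def mean_latency(events) -> float:
    """Average latency over a collection of events."""
    return mean(latency_seconds(e) for e in events)


if __name__ == "__main__":
    sample = [Event(100.0, 101.0), Event(100.5, 102.5)]
    print(mean_latency(sample))
```
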
>> The problem in the Python implementation seems to be the delay
>> introduced by the foreachRDD(casssave) call, which only runs
>> rdd.saveToCassandra( "test_hdpkns", "measurement" ).
>> Honestly, I was not expecting such a difference between these two versions...
>> Do you understand why this is happening?
>> Again, thank you very much for your help,
>> Best Regards
>> Sharing my current Scala code below.
>> # Scala Snippet =========================
>> val sparkConf = new SparkConf(). // ...
>> val ssc = new StreamingContext(sparkConf, Seconds(1))
>> val sc = ssc.sparkContext
>> //...
>> val kstream = KafkaUtils.createDirectStream[String, String](
>>                  ssc,
>>                  LocationStrategies.PreferConsistent,
>>                  ConsumerStrategies.Subscribe[String, String](topic, kafkaParams))
>> //...
>> // handle Kafka messages in a parallel fashion
>> val ckstream = kstream.map( x => parse(x.value) ).cache()
>> ckstream
>>               .foreachRDD( rdd => {
>>                     rdd.foreach(metrics)
>>               } )
>> ckstream
>>               .saveToCassandra("hdpkns", "microbatch_raw_measurement")
>> #=========================
>> On 30/04/2018 14:57:50, Javier Pareja wrote:
>> Hi Saulo,
>> If the CPU is close to 100% then you are hitting the limit. I don't think
>> that moving to Scala will make a difference. Both Spark and Cassandra are
>> CPU hungry, and your setup is small in terms of CPUs. Try running Spark on
>> another (physical) machine so that the 2 cores are dedicated to Cassandra.
>> Kind Regards
>> Javier
>> On Mon, 30 Apr 2018, 14:24 Saulo Sobreiro wrote:
>>> Hi Javier,
>>> I will try to implement this in Scala then. As far as I can see in the
>>> documentation, there is no saveToCassandra in the Python interface unless
>>> you are working with DataFrames, and the kafkaStream instance does not
>>> provide methods to convert an RDD into a DataFrame.
>>> Regarding my table, it is very simple (see below). Can I change
>>> something to make it write faster?
>>> CREATE TABLE test_hdpkns.measurement (
>>>   mid bigint,
>>>   tt timestamp,
>>>   in_tt timestamp,
>>>   out_tt timestamp,
>>>   sensor_id int,
>>>   measure double,
>>>   PRIMARY KEY (mid, tt, sensor_id, in_tt, out_tt)
>>> ) with compact storage;
>>> The system CPU while the demo is running is almost always at 100% for
>>> both cores.
>>> Thank you.
>>> Best Regards,
>>> On 29/04/2018 20:46:30, Javier Pareja wrote:
>>> Hi Saulo,
>>> I meant using this to save:
>>> But it might be slow in a different area.
>>> Another point is that Cassandra and Spark running on the same machine
>>> might compete for resources, which will slow down the inserts. You can check
>>> the CPU usage of the machine at the time. Also, the design of the table
>>> schema can make a big difference.
>>> On Sun, 29 Apr 2018, 19:02 Saulo Sobreiro wrote:
>>>> Hi Javier,
>>>> I removed the map and used "map" directly instead of using transform,
>>>> but the *kafkaStream* is created with KafkaUtils, which does not have a
>>>> method to save to Cassandra directly.
>>>> Do you know any workaround for this?
>>>> Thank you for the suggestion.
>>>> Best Regards,
>>>> On 29/04/2018 17:03:24, Javier Pareja wrote:
>>>> Hi Saulo,
>>>> I'm no expert but I will give it a try.
>>>> I would remove the rdd2.count(). I can't see the point of it, and you will
>>>> gain performance right away. With the count gone, I would not use a
>>>> transform, just the map directly.
>>>> I have not used python but in Scala the cassandra-spark connector can
>>>> save directly to Cassandra without a foreachRDD.
>>>> Finally I would use the spark UI to find which stage is the bottleneck
>>>> here.
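
A minimal Python sketch of the suggestion above (drop the count() and the transform, and map each record directly). It assumes the stream yields (key, value) pairs, as in the posted code, and that rdd.saveToCassandra is provided by pyspark_cassandra as used in this thread. The names attach_pipeline, save_non_empty, record_handler, KEYSPACE and TABLE are hypothetical. The isEmpty() guard is optional; it is shown only as a cheaper alternative to count() for skipping empty batches.

```python
# Keyspace and table names taken from the code posted in this thread.
KEYSPACE = "test_hdpkns"
TABLE = "measurement"


def save_non_empty(rdd):
    """Save one micro-batch, skipping empty ones.

    Takes exactly one argument so that foreachRDD passes only the RDD.
    isEmpty() looks for a single element instead of counting all of them.
    """
    if not rdd.isEmpty():
        rdd.saveToCassandra(KEYSPACE, TABLE)


def attach_pipeline(kafka_stream, record_handler):
    """Wire the stream: per-record map, then save each batch.

    kafka_stream is expected to behave like a DStream of (key, value) pairs;
    record_handler stands in for the recordHandler function of the original post.
    """
    kafka_stream \
        .map(lambda kv: record_handler(kv[1])) \
        .foreachRDD(save_non_empty)
```

With the original names, this would be called as attach_pipeline(kafkaStream, recordHandler) before ssc.start().
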
>>>> On Sun, 29 Apr 2018, 01:17 Saulo Sobreiro wrote:
>>>>> Hi all,
>>>>> I am implementing a use case where I read some sensor data from Kafka
>>>>> with the Spark Streaming interface (*KafkaUtils.createDirectStream*) and,
>>>>> after some transformations, write the output (RDD) to Cassandra.
>>>>> Everything is working properly but I am having some trouble with the
>>>>> performance. My Kafka topic receives around 2000 messages per second. For a
>>>>> 4 min. test, the Spark Streaming app takes 6~7 min. to process and write to
>>>>> Cassandra, which is not acceptable for longer runs.
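
A small worked calculation of what those numbers imply. It uses the figures reported above plus the 2-second batch interval from the code posted at the end of this email, and it assumes the processing time is spread evenly across batches, which is a simplification. The function names are hypothetical.

```python
MESSAGES_PER_SECOND = 2000   # input rate reported above
TEST_DURATION_S = 4 * 60     # 4 min. test
BATCH_INTERVAL_S = 2         # batchIntervalSeconds in the posted code


def effective_rate(total_messages, processing_time_s):
    """Messages per second the app actually sustained."""
    return total_messages / processing_time_s


def batch_processing_time(processing_time_s, test_duration_s, batch_interval_s):
    """Average processing time per batch, assuming an even spread."""
    n_batches = test_duration_s / batch_interval_s
    return processing_time_s / n_batches


if __name__ == "__main__":
    total = MESSAGES_PER_SECOND * TEST_DURATION_S  # 480000 messages
    for minutes in (6, 7):
        elapsed = minutes * 60
        rate = effective_rate(total, elapsed)
        per_batch = batch_processing_time(elapsed, TEST_DURATION_S, BATCH_INTERVAL_S)
        print(minutes, round(rate), per_batch)
```

Under these assumptions the app sustains roughly 1100 to 1300 messages per second and needs about 3 to 3.5 seconds for each 2-second batch, so the backlog keeps growing for as long as the input continues.
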
>>>>> I am running this application in a "sandbox" with 12GB of RAM, 2 cores
>>>>> and 30GB SSD space.
>>>>> Versions: Spark 2.1, Cassandra 3.0.9 (cqlsh 5.0.1).
>>>>> I would like to know if you have any suggestions to improve performance
>>>>> (other than getting more resources :) ).
>>>>> My code (pyspark) is posted at the end of this email so you can take a
>>>>> look. I tried some different Cassandra configurations following this link:
>>>>> (recommended on Stack Overflow for similar questions).
>>>>> Thank you in advance,
>>>>> Best Regards,
>>>>> Saulo
>>>>> =============== # CODE # =================================
>>>>> ####
>>>>> # run command:
>>>>> # spark2-submit --packages
>>>>> org.apache.spark:spark-streaming-kafka_2.11:1.6.3,anguenot:pyspark-cassandra:0.7.0,org.apache.spark:spark-core_2.11:1.5.2
>>>>>  --conf'localhost' --num-executors 2
>>>>> --executor-cores 2 localhost:6667 test_topic2
>>>>> ##
>>>>> import sys  # needed for sys.argv below
>>>>> # Run Spark imports
>>>>> from pyspark import SparkConf # SparkContext, SparkConf
>>>>> from pyspark.streaming import StreamingContext
>>>>> from pyspark.streaming.kafka import KafkaUtils
>>>>> # Run Cassandra imports
>>>>> import pyspark_cassandra
>>>>> from pyspark_cassandra import CassandraSparkContext, saveToCassandra
>>>>> def recordHandler(record):
>>>>>     (mid, tt, in_tt, sid, mv) = parseData( record )
>>>>>     return processMetrics(mid, tt, in_tt, sid, mv)
>>>>> def process(time, rdd):
>>>>>     rdd2 = rdd.map( lambda w: recordHandler(w[1]) )
>>>>>     if rdd2.count() > 0:
>>>>>         return rdd2
>>>>> def casssave(time, rdd):
>>>>>     rdd.saveToCassandra( "test_hdpkns", "measurement" )
>>>>> # ...
>>>>> brokers, topic = sys.argv[1:]
>>>>> # ...
>>>>> sconf = SparkConf() \
>>>>>         .setAppName("SensorDataStreamHandler") \
>>>>>         .setMaster("local[*]") \
>>>>>         .set("spark.default.parallelism", "2")
>>>>> sc = CassandraSparkContext(conf = sconf)
>>>>> batchIntervalSeconds = 2
>>>>> ssc = StreamingContext(sc, batchIntervalSeconds)
>>>>> kafkaStream = KafkaUtils.createDirectStream(ssc, [topic], {"": brokers})
>>>>> kafkaStream \
>>>>>     .transform(process) \
>>>>>     .foreachRDD(casssave)
>>>>> ssc.start()
>>>>> ssc.awaitTermination()
>>>>> ================================================
> --
> Alonso Isidoro Roman

Re: [Spark2.1] SparkStreaming to Cassandra performance problem

Posted by Alonso Isidoro Roman <>.
Spark was developed primarily in Scala, so new features reach Scala first,
then Java, and finally Python. I'm not surprised by the results; we have
seen the same thing at Stratio since the first versions of Spark. At the
beginning of development, some of our engineers build the prototype in
Python, but if it goes into production it has to be rewritten in Scala or
Java, usually Scala.


Alonso Isidoro Roman