Posted to user@spark.apache.org by Saulo Sobreiro <sa...@outlook.pt> on 2018/04/28 23:05:47 UTC
[Spark2.X] SparkStreaming to Cassandra performance problem
Hi all,
I am implementing a use case where I read sensor data from Kafka with the Spark Streaming interface (KafkaUtils.createDirectStream) and, after some transformations, write the output (RDD) to Cassandra.
Everything is working properly, but I am having trouble with performance. My Kafka topic receives around 2000 messages per second. For a 4 min. test, the Spark Streaming app takes 6~7 min. to process the data and write it to Cassandra, which is not acceptable for longer runs.
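To put rough numbers on this, here is a back-of-the-envelope sketch. It assumes a constant input rate and that the 6~7 min. covers exactly the messages sent during the 4 min. test; neither was measured precisely. The 2-second batch interval is the one set in the code at the end of this email, and the function name throughput_summary is only illustrative.

```python
# Back-of-the-envelope throughput estimate from the figures quoted above.
# Assumptions (not measured): constant input rate, and the total processing
# time covers exactly the messages produced during the 4 min. test.
INPUT_RATE = 2000        # messages per second arriving on the Kafka topic
TEST_SECONDS = 4 * 60    # duration of the test, in seconds
BATCH_INTERVAL = 2       # seconds, batch interval used by the streaming app


def throughput_summary(total_processing_seconds):
    """Return (total messages, effective msg/s, average seconds per batch)."""
    total_messages = INPUT_RATE * TEST_SECONDS
    effective_rate = total_messages / total_processing_seconds
    # Each batch holds BATCH_INTERVAL seconds worth of input.
    n_batches = TEST_SECONDS / BATCH_INTERVAL
    seconds_per_batch = total_processing_seconds / n_batches
    return total_messages, effective_rate, seconds_per_batch


for minutes in (6, 7):
    total, rate, per_batch = throughput_summary(minutes * 60)
    print(f"{minutes} min: {total} msgs, {rate:.0f} msg/s, {per_batch:.1f} s per batch")
```

Under these assumptions each 2-second batch takes roughly 3 to 3.5 seconds on average, so batches are queued faster than they are completed.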
I am running this application in a "sandbox" with 12GB of RAM, 2 cores and 30GB SSD space. HDP:
Spark 2.1
I would like to know if you have any suggestions to improve performance (other than getting more resources :) ).
My code (pyspark) is posted at the end of this email so you can take a look.
Thank you in advance,
Best Regards,
Saulo
=============== # CODE # =================================
####
# run command:
# spark2-submit --packages org.apache.spark:spark-streaming-kafka_2.11:1.6.3,anguenot:pyspark-cassandra:0.7.0,org.apache.spark:spark-core_2.11:1.5.2 --conf spark.cassandra.connection.host='localhost' --num-executors 2 --executor-cores 2 SensorDataStreamHandler.py localhost:6667 test_topic2
##
import sys  # provides sys.argv, used below to read the brokers and topic

# Run Spark imports
from pyspark import SparkConf # SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
# Run Cassandra imports
import pyspark_cassandra
from pyspark_cassandra import CassandraSparkContext, saveToCassandra
def recordHandler(record):
    (mid, tt, in_tt, sid, mv) = parseData( record )
    return processMetrics(mid, tt, in_tt, sid, mv)

def process(time, rdd):
    rdd2 = rdd.map( lambda w: recordHandler(w[1]) )
    if rdd2.count() > 0:
        return rdd2

def casssave(time, rdd):
    rdd.saveToCassandra( "test_hdpkns", "measurement" )
# ...
brokers, topic = sys.argv[1:]
# ...
sconf = SparkConf() \
    .setAppName("SensorDataStreamHandler") \
    .setMaster("local[*]") \
    .set("spark.default.parallelism", "2")
sc = CassandraSparkContext(conf = sconf)
batchIntervalSeconds = 2
ssc = StreamingContext(sc, batchIntervalSeconds)
kafkaStream = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
kafkaStream \
    .transform(process) \
    .foreachRDD(casssave)
ssc.start()
ssc.awaitTermination()
================================================