Posted to user@spark.apache.org by be...@chapter7.ch on 2016/02/09 15:58:03 UTC

[Spark Streaming] Joining Kafka and Cassandra DataFrames

All,

I'm new to Spark and I'm having a hard time doing a simple join of two DFs.

Intent:
- I'm receiving data from Kafka via a direct stream and would like to
enrich the messages with data from Cassandra. The Kafka messages
(Protobufs) are decoded into DataFrames and then joined with a
(supposedly pre-filtered) DF from Cassandra. The ratio of (Kafka)
streaming batch size to raw C* data is a few streaming messages to
millions of C* rows, BUT the join always yields exactly ONE result
(1:1) per message. After the join, the resulting DF is eventually
stored to another C* table.

Problem:
- Even though I'm joining the two DFs on the full Cassandra primary
key and pushing the corresponding filter to C*, Spark seems to load
the whole C* data set into memory before actually joining (which is
exactly what I'd like to prevent with the filter/predicate pushdown).
This leads to a lot of shuffling and many tasks being spawned, so the
"simple" join takes forever...

Could anyone shed some light on this? To my mind this should be a
prime example for DFs and Spark Streaming.
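
For reference, this is roughly how I've been checking whether the predicates
actually reach Cassandra (just a sketch; the literal key values are made up,
base_data and joinedDf are defined in the code below):

     // A filter over literal values should show up as a pushed predicate in the
     // scan node of the physical plan, whereas the column-based filter used in
     // the join below only appears as a Spark-side Filter over a full table scan.
     base_data.filter("prim1_id = 1 AND prim2_id = 2").explain()
     joinedDf.explain()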

Environment:
- Spark 1.6
- Cassandra 2.1.12
- Cassandra-Spark-Connector 1.5-RC1
- Kafka 0.8.2.2

Code:

// Imports assumed for this snippet (Spark 1.6, spark-streaming-kafka, Spark SQL);
// MyMsg, MyMsgDecoder, SQLContextSingleton and base_data_df are application classes.
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

def main(args: Array[String]) {
     val conf = new SparkConf()
       .setAppName("test")
       .set("spark.cassandra.connection.host", "xxx")
       .set("spark.cassandra.connection.keep_alive_ms", "30000")
       .setMaster("local[*]")

     val ssc = new StreamingContext(conf, Seconds(10))
     ssc.sparkContext.setLogLevel("INFO")

     // Initialise Kafka
     val kafkaTopics = Set[String]("xxx")
     val kafkaParams = Map[String, String](
       "metadata.broker.list" -> "xxx:32000,xxx:32000,xxx:32000,xxx:32000",
       "auto.offset.reset" -> "smallest")

     // Kafka direct stream
     val messages = KafkaUtils.createDirectStream[String, MyMsg,
       StringDecoder, MyMsgDecoder](ssc, kafkaParams, kafkaTopics)

     // Executed on the driver
     messages.foreachRDD { rdd =>

       // Get the lazily instantiated SQLContext singleton
       val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
       import sqlContext.implicits._

       // Drop the Kafka keys and keep the decoded messages
       // (lower-case binder; an upper-case name would be read as a constant pattern)
       val MyMsgRdd = rdd.map { case (_, msg) => msg }

       // Convert RDD[MyMsg] to DataFrame
       val MyMsgDf = MyMsgRdd.toDF()
         .select(
             $"prim1Id" as 'prim1_id,
             $"prim2Id" as 'prim2_id,
             $...
         )

       // Load DataFrame from the C* data source (lazily instantiated singleton)
       val base_data = base_data_df.getInstance(sqlContext)

       // Join on prim1_id and prim2_id (left join, then filter on the C* key columns)
       val joinedDf = MyMsgDf.join(base_data,
             MyMsgDf("prim1_id") === base_data("prim1_id") &&
             MyMsgDf("prim2_id") === base_data("prim2_id"), "left")
             .filter(base_data("prim1_id").isin(MyMsgDf("prim1_id"))
                 && base_data("prim2_id").isin(MyMsgDf("prim2_id")))

       joinedDf.show()
       joinedDf.printSchema()

       // Select relevant fields

       // Persist

     }

     // Start the computation
     ssc.start()
     ssc.awaitTermination()
}

SO:  
http://stackoverflow.com/questions/35295182/joining-kafka-and-cassandra-dataframes-in-spark-streaming-ignores-c-predicate-p



---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: [Spark Streaming] Joining Kafka and Cassandra DataFrames

Posted by be...@chapter7.ch.
The filter in the join is re-arranged in the DAG (as far as I can tell
from explain() and the UI) and should therefore be pushed down
accordingly. I also experimented with applying the filter to base_data
explicitly before the join, effectively creating a new DF, but no luck
either.
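
Roughly what that experiment looked like (a sketch from memory, not the exact code):

     // Same column-based predicate, just applied to base_data before the join
     // instead of after it. It still references the streaming DF, so nothing
     // can be pushed down to C* and the full table scan remains.
     val prefilteredBaseData = base_data
       .filter(base_data("prim1_id").isin(MyMsgDf("prim1_id")) &&
               base_data("prim2_id").isin(MyMsgDf("prim2_id")))

     val joinedDf = MyMsgDf.join(prefilteredBaseData,
       MyMsgDf("prim1_id") === prefilteredBaseData("prim1_id") &&
       MyMsgDf("prim2_id") === prefilteredBaseData("prim2_id"), "left")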




RE: [Spark Streaming] Joining Kafka and Cassandra DataFrames

Posted by Mohammed Guller <mo...@glassbeam.com>.
Moving the spark mailing list to BCC since this is not really related to Spark.

Maybe I am missing something, but where are you calling the filter method on the base_data DF to push down the predicates to Cassandra before calling the join method?

Mohammed
Author: Big Data Analytics with Spark




Re: [Spark Streaming] Joining Kafka and Cassandra DataFrames

Posted by be...@chapter7.ch.
Hi Mohammed

I'm aware of that documentation; what are you hinting at specifically?
I'm pushing all elements of the partition key, so that should work. As
user zero323 pointed out on SO, the problem is most probably related
to the dynamic nature of the predicate elements (two distributed
collections per filter, per join).

The statement "To push down partition keys, all of them must be
included, but not more than one predicate per partition key, otherwise
nothing is pushed down." does not apply here, IMO.
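
If the dynamic predicate really is the culprit, one workaround that comes to
mind is to collect the (few) keys of the current batch on the driver and filter
base_data with literal values before the join. Untested sketch; the Long key
type is an assumption:

     // The batch only holds a handful of messages, so collecting the keys is cheap.
     val keys = MyMsgDf.select(MyMsgDf("prim1_id"), MyMsgDf("prim2_id")).collect()
     val prim1Keys = keys.map(_.getAs[Long]("prim1_id"))
     val prim2Keys = keys.map(_.getAs[Long]("prim2_id"))

     // Literal IN predicates at least have a chance of being pushed to the source;
     // whether the connector actually pushes them is something explain() would show.
     val prefiltered = base_data
       .filter(base_data("prim1_id").isin(prim1Keys: _*) &&
               base_data("prim2_id").isin(prim2Keys: _*))

     val joinedDf = MyMsgDf.join(prefiltered,
       MyMsgDf("prim1_id") === prefiltered("prim1_id") &&
       MyMsgDf("prim2_id") === prefiltered("prim2_id"), "left")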

Bernhard



RE: [Spark Streaming] Joining Kafka and Cassandra DataFrames

Posted by Mohammed Guller <mo...@glassbeam.com>.
Hi Bernhard,

Take a look at the examples shown under the "Pushing down clauses to Cassandra" sections on this page:

https://github.com/datastax/spark-cassandra-connector/blob/master/doc/14_data_frames.md
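
The pattern shown there is essentially a filter over literal values, which the
connector can turn into a CQL predicate. Schematically (placeholder values,
using the column names from your code):

     // With literal predicates the scan node in explain() should list the
     // pushed-down filters instead of a Spark-side Filter over a full scan.
     val filtered = base_data.filter("prim1_id = 1 AND prim2_id = 2")
     filtered.explain()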


Mohammed
Author: Big Data Analytics with Spark



Re: [Spark Streaming] Joining Kafka and Cassandra DataFrames

Posted by be...@chapter7.ch.
Hi Mohammed

Thanks for the hint, I should probably do that :)

As for the DF singleton:

/**
  * Lazily instantiated singleton instance of base_data DataFrame
  */
object base_data_df {

   @transient private var instance: DataFrame = _

   def getInstance(sqlContext: SQLContext): DataFrame = {
     if (instance == null) {
       // Load DataFrame with C* data-source
       instance = sqlContext.read
         .format("org.apache.spark.sql.cassandra")
         .options(Map("table" -> "cf", "keyspace" -> "ks"))
         .load()
     }
     instance
   }
}
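
It is only ever touched on the driver, once per micro-batch:

     // The DataFrame definition is lazy, so the singleton mainly avoids
     // re-creating the C* source definition on every batch.
     val base_data = base_data_df.getInstance(sqlContext)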

Bernhard



RE: [Spark Streaming] Joining Kafka and Cassandra DataFrames

Posted by Mohammed Guller <mo...@glassbeam.com>.
You may have better luck with this question on the Spark Cassandra Connector mailing list.



One quick question about this code from your email:

       // Load DataFrame from C* data-source

       val base_data = base_data_df.getInstance(sqlContext)



What exactly is base_data_df and how are you creating it?

Mohammed
Author: Big Data Analytics with Spark<http://www.amazon.com/Big-Data-Analytics-Spark-Practitioners/dp/1484209656/>


