Posted to user@spark.apache.org by Ilya Karpov <i....@cleverdata.ru> on 2015/08/24 11:21:40 UTC

Joining using multimap or array

Hi, guys
I'm confused about joining on columns in SparkSQL and need your advice.
I want to join 2 datasets of profiles. Each profile has a name and an array of attributes (age, gender, email, etc.).
There can be multiple instances of an attribute with the same name, e.g. a profile with 2 emails has 2 attributes with name = 'email' in the array. Now I want to join the 2 datasets using the 'email' attribute, but I can't find a way to do it :(

The code is below. Currently the result of the join is empty, while I expect to see 1 row with all of Alice's emails.

import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}

case class Attribute(name: String, value: String, weight: Float)
case class Profile(name: String, attributes: Seq[Attribute])

object SparkJoinArrayColumn {
  def main(args: Array[String]): Unit = {
    val sc: SparkContext = new SparkContext(new SparkConf().setMaster("local").setAppName(getClass.getSimpleName))
    val sqlContext: SQLContext = new SQLContext(sc)

    import sqlContext.implicits._

    val a: DataFrame = sc.parallelize(Seq(
      Profile("Alice", Seq(Attribute("email", "alice@mail.com", 1.0f), Attribute("email", "a.jones@mail.com", 1.0f)))
    )).toDF.as("a")

    val b: DataFrame = sc.parallelize(Seq(
      Profile("Alice", Seq(Attribute("email", "alice@mail.com", 1.0f), Attribute("age", "29", 0.2f)))
    )).toDF.as("b")


    a.where($"a.attributes.name" === "email")
      .join(
        b.where($"b.attributes.name" === "email"),
        $"a.attributes.value" === $"b.attributes.value"
      )
      .show()
  }
}

Thanks in advance!
---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: Joining using multimap or array

Posted by Hemant Bhanawat <he...@gmail.com>.
In your example, a.attributes.name is an array of strings, not a single
string. Run this to check:

a.select($"a.attributes.name").show()
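
Since attributes.name and attributes.value are arrays, one possible way to join on individual emails is to flatten each profile into one row per attribute first. The following is only a sketch, assuming a Spark version whose org.apache.spark.sql.functions provides explode and col; the helper name emails and the object name SparkJoinExplodedEmails are hypothetical and not from the original post.

```scala
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.functions.{col, explode}
import org.apache.spark.{SparkConf, SparkContext}

case class Attribute(name: String, value: String, weight: Float)
case class Profile(name: String, attributes: Seq[Attribute])

object SparkJoinExplodedEmails {
  // Hypothetical helper: turns each profile into one row per 'email' attribute,
  // with columns (name, email).
  def emails(profiles: DataFrame): DataFrame =
    profiles
      .select(col("name"), explode(col("attributes")).as("attr")) // one row per array element
      .filter(col("attr.name") === "email")                       // attr.name is now a plain string
      .select(col("name"), col("attr.value").as("email"))

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("SparkJoinExplodedEmails"))
    val sqlContext = new SQLContext(sc)

    import sqlContext.implicits._

    val a = sc.parallelize(Seq(
      Profile("Alice", Seq(Attribute("email", "alice@mail.com", 1.0f), Attribute("email", "a.jones@mail.com", 1.0f)))
    )).toDF()

    val b = sc.parallelize(Seq(
      Profile("Alice", Seq(Attribute("email", "alice@mail.com", 1.0f), Attribute("age", "29", 0.2f)))
    )).toDF()

    // Join the flattened views on the scalar email column.
    val joined = emails(a).as("a").join(emails(b).as("b"), $"a.email" === $"b.email")
    joined.show()

    sc.stop()
  }
}
```

With the sample data from the question, the two flattened views share only the address alice@mail.com. To get back all of a profile's emails, the matched names could be joined against the original, unexploded DataFrame.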



Re: Joining using multimap or array

Posted by Ilya Karpov <i....@cleverdata.ru>.
Thanks,
but I don't think this is a case of multiple Spark contexts (nevertheless, I tried your suggestion and it didn't work). The problem is joining two datasets on the values of array items: attribute.value in my case. Does anyone have any ideas?


> On 24 Aug 2015, at 15:01, satish chandra j <js...@gmail.com> wrote:
> 
> Hi,
> If your join logic is correct, this looks similar to an issue I faced recently.
> 
> Can you try setting this on your SparkConf: new SparkConf().set("spark.driver.allowMultipleContexts", "true")
> 
> Regards,
> Satish Chandra
> 


Re: Joining using multimap or array

Posted by satish chandra j <js...@gmail.com>.
Hi,
If your join logic is correct, this looks similar to an issue I faced
recently.

Can you try setting this on your SparkConf:
new SparkConf().set("spark.driver.allowMultipleContexts", "true")

Regards,
Satish Chandra
