You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Shyam Parimal Katti <sp...@nyu.edu> on 2015/10/17 02:01:17 UTC

Multiple joins in Spark

Hello All,

I have a following SQL query like this:

select a.a_id, b.b_id, c.c_id from table_a a join table_b b on a.a_id =
b.a_id join table_c c on b.b_id = c.b_id

In scala i have done this so far:

table_a_rdd = sc.textFile(...)
table_b_rdd = sc.textFile(...)
table_c_rdd = sc.textFile(...)

val table_a_rowRDD = table_a_rdd.map(_.split("\\x07")).map(line =>
(line(0), line))
val table_b_rowRDD = table_a_rdd.map(_.split("\\x07")).map(line =>
(line(0), line))
val table_c_rowRDD = table_a_rdd.map(_.split("\\x07")).map(line =>
(line(0), line))

Each line has the first value at its primary key.

While I can join 2 RDDs using table_a_rowRDD.join(table_b_rowRDD) to join,
is it possible to join multiple RDDs in a single expression? like
table_a_rowRDD.join(table_b_rowRDD).join(table_c_rowRDD) ? Also, how can I
specify the column on which I can join multiple RDDs?

Re: Multiple joins in Spark

Posted by Xiao Li <ga...@gmail.com>.
Are you using hiveContext?

First, build your Spark using the following command:
mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -Phive -Phive-thriftserver
-DskipTests clean package

Then, try this sample program

object SimpleApp {
  case class Individual(name: String, surname: String, birthDate: String)

  def main(args: Array[String]) {
    val sc = new SparkContext("local", "join DFs")
    //val sqlContext = new SQLContext(sc)
    val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)

    val rdd = sc.parallelize(Seq(
      Individual("a", "c", "10/10/1972"),
      Individual("b", "d", "10/11/1970"),
    ))

    val df = hiveContext.createDataFrame(rdd)

    df.registerTempTable("tab")

    val dfHive = hiveContext.sql("select * from tab")

    dfHive.show()
  }
}


2015-10-20 6:24 GMT-07:00 Shyam Parimal Katti <sp...@nyu.edu>:

> When I do the steps above and run a query like this:
>
> sqlContext.sql("select * from ...")
>
> I get exception:
>
> org.apache.spark.sql.AnalysisException: Non-local session path expected to
> be non-null;
>    at org.apache.spark.sql.hive.HiveQL$.createPlan(HiveQl.scala:260)
>    .....
>
> I cannot paste the entire stack since it's on a company laptop and I am
> not allowed to copy paste things! Though if absolutely needed to help, I
> can figure out some way to provide it.
>
> On Sat, Oct 17, 2015 at 1:13 AM, Xiao Li <ga...@gmail.com> wrote:
>
>> Hi, Shyam,
>>
>> The method registerTempTable is to register a [DataFrame as a temporary
>> table in the Catalog using the given table name.
>>
>> In the Catalog, Spark maintains a concurrent hashmap, which contains the
>> pair of the table names and the logical plan.
>>
>> For example, when we submit the following query,
>>
>> SELECT * FROM inMemoryDF
>>
>> The concurrent hashmap contains one map from name to the Logical Plan:
>>
>> "inMemoryDF" -> "LogicalRDD [c1#0,c2#1,c3#2,c4#3], MapPartitionsRDD[1] at
>> createDataFrame at SimpleApp.scala:42
>>
>> Therefore, using SQL will not hurt your performance. The actual physical
>> plan to execute your SQL query is generated by the result of Catalyst
>> optimizer.
>>
>> Good luck,
>>
>> Xiao Li
>>
>>
>>
>> 2015-10-16 20:53 GMT-07:00 Shyam Parimal Katti <sp...@nyu.edu>:
>>
>>> Thanks Xiao! Question about the internals, would you know what happens
>>> when createTempTable() is called? I. E.  Does it create an RDD internally
>>> or some internal representation that lets it join with  an RDD?
>>>
>>> Again, thanks for the answer.
>>> On Oct 16, 2015 8:15 PM, "Xiao Li" <ga...@gmail.com> wrote:
>>>
>>>> Hi, Shyam,
>>>>
>>>> You still can use SQL to do the same thing in Spark:
>>>>
>>>> For example,
>>>>
>>>>     val df1 = sqlContext.createDataFrame(rdd)
>>>>     val df2 = sqlContext.createDataFrame(rdd2)
>>>>     val df3 = sqlContext.createDataFrame(rdd3)
>>>>     df1.registerTempTable("tab1")
>>>>     df2.registerTempTable("tab2")
>>>>     df3.registerTempTable("tab3")
>>>>     val exampleSQL = sqlContext.sql("select * from tab1, tab2, tab3
>>>> where tab1.name = tab2.name and tab2.id = tab3.id")
>>>>
>>>> Good luck,
>>>>
>>>> Xiao Li
>>>>
>>>> 2015-10-16 17:01 GMT-07:00 Shyam Parimal Katti <sp...@nyu.edu>:
>>>>
>>>>> Hello All,
>>>>>
>>>>> I have a following SQL query like this:
>>>>>
>>>>> select a.a_id, b.b_id, c.c_id from table_a a join table_b b on a.a_id
>>>>> = b.a_id join table_c c on b.b_id = c.b_id
>>>>>
>>>>> In scala i have done this so far:
>>>>>
>>>>> table_a_rdd = sc.textFile(...)
>>>>> table_b_rdd = sc.textFile(...)
>>>>> table_c_rdd = sc.textFile(...)
>>>>>
>>>>> val table_a_rowRDD = table_a_rdd.map(_.split("\\x07")).map(line =>
>>>>> (line(0), line))
>>>>> val table_b_rowRDD = table_a_rdd.map(_.split("\\x07")).map(line =>
>>>>> (line(0), line))
>>>>> val table_c_rowRDD = table_a_rdd.map(_.split("\\x07")).map(line =>
>>>>> (line(0), line))
>>>>>
>>>>> Each line has the first value at its primary key.
>>>>>
>>>>> While I can join 2 RDDs using table_a_rowRDD.join(table_b_rowRDD) to
>>>>> join, is it possible to join multiple RDDs in a single expression? like
>>>>> table_a_rowRDD.join(table_b_rowRDD).join(table_c_rowRDD) ? Also, how can I
>>>>> specify the column on which I can join multiple RDDs?
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>
>>
>

Re: Multiple joins in Spark

Posted by Shyam Parimal Katti <sp...@nyu.edu>.
When I do the steps above and run a query like this:

sqlContext.sql("select * from ...")

I get exception:

org.apache.spark.sql.AnalysisException: Non-local session path expected to
be non-null;
   at org.apache.spark.sql.hive.HiveQL$.createPlan(HiveQl.scala:260)
   .....

I cannot paste the entire stack since it's on a company laptop and I am not
allowed to copy paste things! Though if absolutely needed to help, I can
figure out some way to provide it.

On Sat, Oct 17, 2015 at 1:13 AM, Xiao Li <ga...@gmail.com> wrote:

> Hi, Shyam,
>
> The method registerTempTable is to register a [DataFrame as a temporary
> table in the Catalog using the given table name.
>
> In the Catalog, Spark maintains a concurrent hashmap, which contains the
> pair of the table names and the logical plan.
>
> For example, when we submit the following query,
>
> SELECT * FROM inMemoryDF
>
> The concurrent hashmap contains one map from name to the Logical Plan:
>
> "inMemoryDF" -> "LogicalRDD [c1#0,c2#1,c3#2,c4#3], MapPartitionsRDD[1] at
> createDataFrame at SimpleApp.scala:42
>
> Therefore, using SQL will not hurt your performance. The actual physical
> plan to execute your SQL query is generated by the result of Catalyst
> optimizer.
>
> Good luck,
>
> Xiao Li
>
>
>
> 2015-10-16 20:53 GMT-07:00 Shyam Parimal Katti <sp...@nyu.edu>:
>
>> Thanks Xiao! Question about the internals, would you know what happens
>> when createTempTable() is called? I. E.  Does it create an RDD internally
>> or some internal representation that lets it join with  an RDD?
>>
>> Again, thanks for the answer.
>> On Oct 16, 2015 8:15 PM, "Xiao Li" <ga...@gmail.com> wrote:
>>
>>> Hi, Shyam,
>>>
>>> You still can use SQL to do the same thing in Spark:
>>>
>>> For example,
>>>
>>>     val df1 = sqlContext.createDataFrame(rdd)
>>>     val df2 = sqlContext.createDataFrame(rdd2)
>>>     val df3 = sqlContext.createDataFrame(rdd3)
>>>     df1.registerTempTable("tab1")
>>>     df2.registerTempTable("tab2")
>>>     df3.registerTempTable("tab3")
>>>     val exampleSQL = sqlContext.sql("select * from tab1, tab2, tab3
>>> where tab1.name = tab2.name and tab2.id = tab3.id")
>>>
>>> Good luck,
>>>
>>> Xiao Li
>>>
>>> 2015-10-16 17:01 GMT-07:00 Shyam Parimal Katti <sp...@nyu.edu>:
>>>
>>>> Hello All,
>>>>
>>>> I have a following SQL query like this:
>>>>
>>>> select a.a_id, b.b_id, c.c_id from table_a a join table_b b on a.a_id =
>>>> b.a_id join table_c c on b.b_id = c.b_id
>>>>
>>>> In scala i have done this so far:
>>>>
>>>> table_a_rdd = sc.textFile(...)
>>>> table_b_rdd = sc.textFile(...)
>>>> table_c_rdd = sc.textFile(...)
>>>>
>>>> val table_a_rowRDD = table_a_rdd.map(_.split("\\x07")).map(line =>
>>>> (line(0), line))
>>>> val table_b_rowRDD = table_a_rdd.map(_.split("\\x07")).map(line =>
>>>> (line(0), line))
>>>> val table_c_rowRDD = table_a_rdd.map(_.split("\\x07")).map(line =>
>>>> (line(0), line))
>>>>
>>>> Each line has the first value at its primary key.
>>>>
>>>> While I can join 2 RDDs using table_a_rowRDD.join(table_b_rowRDD) to
>>>> join, is it possible to join multiple RDDs in a single expression? like
>>>> table_a_rowRDD.join(table_b_rowRDD).join(table_c_rowRDD) ? Also, how can I
>>>> specify the column on which I can join multiple RDDs?
>>>>
>>>>
>>>>
>>>>
>>>>
>>>
>

Re: Multiple joins in Spark

Posted by Xiao Li <ga...@gmail.com>.
Hi, Shyam,

The method registerTempTable is to register a [DataFrame as a temporary
table in the Catalog using the given table name.

In the Catalog, Spark maintains a concurrent hashmap, which contains the
pair of the table names and the logical plan.

For example, when we submit the following query,

SELECT * FROM inMemoryDF

The concurrent hashmap contains one map from name to the Logical Plan:

"inMemoryDF" -> "LogicalRDD [c1#0,c2#1,c3#2,c4#3], MapPartitionsRDD[1] at
createDataFrame at SimpleApp.scala:42

Therefore, using SQL will not hurt your performance. The actual physical
plan to execute your SQL query is generated by the result of Catalyst
optimizer.

Good luck,

Xiao Li



2015-10-16 20:53 GMT-07:00 Shyam Parimal Katti <sp...@nyu.edu>:

> Thanks Xiao! Question about the internals, would you know what happens
> when createTempTable() is called? I. E.  Does it create an RDD internally
> or some internal representation that lets it join with  an RDD?
>
> Again, thanks for the answer.
> On Oct 16, 2015 8:15 PM, "Xiao Li" <ga...@gmail.com> wrote:
>
>> Hi, Shyam,
>>
>> You still can use SQL to do the same thing in Spark:
>>
>> For example,
>>
>>     val df1 = sqlContext.createDataFrame(rdd)
>>     val df2 = sqlContext.createDataFrame(rdd2)
>>     val df3 = sqlContext.createDataFrame(rdd3)
>>     df1.registerTempTable("tab1")
>>     df2.registerTempTable("tab2")
>>     df3.registerTempTable("tab3")
>>     val exampleSQL = sqlContext.sql("select * from tab1, tab2, tab3 where
>> tab1.name = tab2.name and tab2.id = tab3.id")
>>
>> Good luck,
>>
>> Xiao Li
>>
>> 2015-10-16 17:01 GMT-07:00 Shyam Parimal Katti <sp...@nyu.edu>:
>>
>>> Hello All,
>>>
>>> I have a following SQL query like this:
>>>
>>> select a.a_id, b.b_id, c.c_id from table_a a join table_b b on a.a_id =
>>> b.a_id join table_c c on b.b_id = c.b_id
>>>
>>> In scala i have done this so far:
>>>
>>> table_a_rdd = sc.textFile(...)
>>> table_b_rdd = sc.textFile(...)
>>> table_c_rdd = sc.textFile(...)
>>>
>>> val table_a_rowRDD = table_a_rdd.map(_.split("\\x07")).map(line =>
>>> (line(0), line))
>>> val table_b_rowRDD = table_a_rdd.map(_.split("\\x07")).map(line =>
>>> (line(0), line))
>>> val table_c_rowRDD = table_a_rdd.map(_.split("\\x07")).map(line =>
>>> (line(0), line))
>>>
>>> Each line has the first value at its primary key.
>>>
>>> While I can join 2 RDDs using table_a_rowRDD.join(table_b_rowRDD) to
>>> join, is it possible to join multiple RDDs in a single expression? like
>>> table_a_rowRDD.join(table_b_rowRDD).join(table_c_rowRDD) ? Also, how can I
>>> specify the column on which I can join multiple RDDs?
>>>
>>>
>>>
>>>
>>>
>>

Re: Multiple joins in Spark

Posted by Xiao Li <ga...@gmail.com>.
Hi, Shyam,

You still can use SQL to do the same thing in Spark:

For example,

    val df1 = sqlContext.createDataFrame(rdd)
    val df2 = sqlContext.createDataFrame(rdd2)
    val df3 = sqlContext.createDataFrame(rdd3)
    df1.registerTempTable("tab1")
    df2.registerTempTable("tab2")
    df3.registerTempTable("tab3")
    val exampleSQL = sqlContext.sql("select * from tab1, tab2, tab3 where
tab1.name = tab2.name and tab2.id = tab3.id")

Good luck,

Xiao Li

2015-10-16 17:01 GMT-07:00 Shyam Parimal Katti <sp...@nyu.edu>:

> Hello All,
>
> I have a following SQL query like this:
>
> select a.a_id, b.b_id, c.c_id from table_a a join table_b b on a.a_id =
> b.a_id join table_c c on b.b_id = c.b_id
>
> In scala i have done this so far:
>
> table_a_rdd = sc.textFile(...)
> table_b_rdd = sc.textFile(...)
> table_c_rdd = sc.textFile(...)
>
> val table_a_rowRDD = table_a_rdd.map(_.split("\\x07")).map(line =>
> (line(0), line))
> val table_b_rowRDD = table_a_rdd.map(_.split("\\x07")).map(line =>
> (line(0), line))
> val table_c_rowRDD = table_a_rdd.map(_.split("\\x07")).map(line =>
> (line(0), line))
>
> Each line has the first value at its primary key.
>
> While I can join 2 RDDs using table_a_rowRDD.join(table_b_rowRDD) to join,
> is it possible to join multiple RDDs in a single expression? like
> table_a_rowRDD.join(table_b_rowRDD).join(table_c_rowRDD) ? Also, how can I
> specify the column on which I can join multiple RDDs?
>
>
>
>
>