Posted to user@spark.apache.org by Something Something <ma...@gmail.com> on 2013/11/18 08:41:49 UTC

Joining files

I am a newbie to both Spark & Scala, but I've been working with Hadoop/Pig
for quite some time.

We've quite a few ETL processes running in production that use Pig, but now
we're evaluating Spark to see if they would indeed run faster.

A very common use case in our Pig scripts is joining a file containing Facts
to a file containing Dimension data.  The joins are, of course, inner, left
& outer.

I thought I would start simple.  Let's say I've 2 files:

1)  Students:  student_id, course_id, score
2)  Course:  course_id, course_title

We want to produce a file that contains:  student_id, course_title, score

(Note:  This is a hypothetical case.  The real files have millions of facts
& thousands of dimensions)

Would something like this work?  Note:  I did say I am a newbie ;)

val students = sc.textFile("./students.txt")
val courses = sc.textFile("./courses.txt")
val s = students.map(x => x.split(','))
val left = students.map(x => x.split(',')).map(y => (y(1), y))
val right = courses.map(x => x.split(',')).map(y => (y(0), y))
val joined = left.join(right)


Any pointers in this regard would be greatly appreciated.  Thanks.

Re: Joining files

Posted by Alex Boisvert <al...@gmail.com>.
On Nov 20, 2013 8:34 AM, "Something Something" <ma...@gmail.com>
wrote:
>
> Questions:
>
> 1)  I don't see APIs for LEFT, FULL OUTER Joins.  True?
> 2)  Apache Pig provides different join types such as 'replicated',
> 'skewed'.  Now 'replicated' may not be a concern in Spark 'cause everything
> happens in memory (possibly).
> 3)  Does the 'join' (which seems to work like INNER Join) guarantee
> order?  For example, can I assume that columns from the left side will
> appear before columns from the right & their order will be preserved?

Sorry I misunderstood your question in my prior email... I somehow thought
you were talking about row order.

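Column order is preserved, yes. Take a look at the join method signatures
and their types: joining an RDD[(K, V)] with an RDD[(K, W)] gives an
RDD[(K, (V, W))], so the left value always comes before the right value.

And Spark isn't limited to "columns" but rather works with any user-defined
type (that is serializable).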
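A minimal sketch of what those types look like with user-defined records
instead of split arrays. It assumes a recent Spark release (SparkConf-based
setup); Enrollment, Course, and JoinTypesSketch are hypothetical names
introduced only for illustration, and the sample rows are made up.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._ // pair-RDD implicits on Spark < 1.3; harmless on later versions
import org.apache.spark.rdd.RDD

// Hypothetical record types, introduced only for this illustration.
case class Enrollment(studentId: String, score: Int)
case class Course(title: String)

object JoinTypesSketch {
  // The return type shows the ordering: key, then (left value, right value).
  def joinByCourse(left: RDD[(String, Enrollment)],
                   right: RDD[(String, Course)]): RDD[(String, (Enrollment, Course))] =
    left.join(right)

  // Builds tiny in-memory inputs and returns (student_id, course_title, score) rows.
  def run(sc: SparkContext): Seq[(String, String, Int)] = {
    val left  = sc.parallelize(Seq("c1" -> Enrollment("s1", 90), "c2" -> Enrollment("s2", 75)))
    val right = sc.parallelize(Seq("c1" -> Course("Algebra"), "c2" -> Course("Biology")))
    joinByCourse(left, right)
      .map { case (_, (e, c)) => (e.studentId, c.title, e.score) }
      .collect().toSeq.sortBy(_._1)
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("join-types-sketch"))
    try run(sc).foreach(println) finally sc.stop()
  }
}
```

Because the result is a nested tuple rather than a flat row, the final column
layout is whatever the last map chooses to emit.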


Re: Joining files

Posted by Alex Boisvert <al...@gmail.com>.
On Nov 20, 2013 8:34 AM, "Something Something" <ma...@gmail.com>
wrote:
>
> Questions:
>
> 1)  I don't see APIs for LEFT, FULL OUTER Joins.  True?

The join operations, including leftOuterJoin and rightOuterJoin, are
documented here:
http://spark.incubator.apache.org/docs/latest/api/core/index.html#org.apache.spark.rdd.PairRDDFunctions
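A minimal sketch of a left outer join on the students/courses example,
assuming a recent Spark release (SparkConf-based setup). OuterJoinSketch, the
sample rows, and the "UNKNOWN" placeholder title are assumptions made for
illustration. In the Scala API the optional side comes back as an Option;
more recent releases also offer fullOuterJoin.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._ // pair-RDD implicits on Spark < 1.3; harmless on later versions

object OuterJoinSketch {
  def run(sc: SparkContext): Seq[String] = {
    // (course_id, (student_id, score)); course c9 has no matching dimension row.
    val left = sc.parallelize(Seq(("c1", ("s1", "90")), ("c9", ("s2", "75"))))
    // (course_id, course_title)
    val right = sc.parallelize(Seq(("c1", "Algebra")))

    // leftOuterJoin keeps every left record; a missing right side shows up as None.
    left.leftOuterJoin(right)
      .map { case (_, ((studentId, score), title)) =>
        // "UNKNOWN" is a placeholder chosen for this sketch.
        Seq(studentId, title.getOrElse("UNKNOWN"), score).mkString(",")
      }
      .collect().toSeq.sorted
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("outer-join-sketch"))
    try run(sc).foreach(println) finally sc.stop()
  }
}
```

rightOuterJoin mirrors this with the Option on the left value instead.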

> 2)  Apache Pig provides different join types such as 'replicated',
> 'skewed'.  Now 'replicated' may not be a concern in Spark 'cause everything
> happens in memory (possibly).

Spark gives you the ability to plug in your own partitioner (for shuffles)
which gives you several options to address data skew.

http://spark.incubator.apache.org/docs/latest/api/core/index.html#org.apache.spark.Partitioner
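One way this could look, as a sketch only: a partitioner that gives a single
known-hot key its own partition and hashes every other key across the
remaining ones. HotKeyPartitioner is a hypothetical class, not something
Spark ships, and it assumes the hot key is known in advance. Since all
records for one key still land in one partition, this only keeps other keys
from piling onto the hot one.

```scala
import org.apache.spark.Partitioner

// Hypothetical partitioner: partition 0 is reserved for one known-hot key,
// all other keys are hashed across partitions 1 until (partitions - 1).
class HotKeyPartitioner(val partitions: Int, val hotKey: String) extends Partitioner {
  require(partitions >= 2, "need one partition for the hot key plus at least one more")

  override def numPartitions: Int = partitions

  override def getPartition(key: Any): Int = key match {
    case k if k == hotKey => 0
    case null             => 1
    case k =>
      val rest = partitions - 1
      // Non-negative modulo over the remaining partitions, shifted past partition 0.
      1 + ((k.hashCode % rest) + rest) % rest
  }

  // Spark compares partitioners to decide whether two RDDs are already co-partitioned.
  override def equals(other: Any): Boolean = other match {
    case p: HotKeyPartitioner => p.partitions == partitions && p.hotKey == hotKey
    case _                    => false
  }

  override def hashCode(): Int = 31 * partitions + hotKey.hashCode
}
```

With left and right as pair RDDs keyed by course_id, it can be passed to the
join overload that takes a partitioner, for example
left.join(right, new HotKeyPartitioner(8, "c1")), or to partitionBy.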

> 3)  Does the 'join' (which seems to work like INNER Join) guarantee
> order?  For example, can I assume that columns from the left side will
> appear before columns from the right & their order will be preserved?

I don't think Spark provides this guarantee from a specification
perspective.  It may be that the implementation does, but only as a matter
of coincidence.  See also the pluggable partitioner above.

Seems like a risky assumption to me but would be glad to see the behavior
specified either way.

>
> On a side note, it appears, as of now Spark cannot be used as a
> replacement for Pig - without some major coding.  Agree?

I think once you wrap your head around Spark, you'll find it to be a more
generally applicable framework... with the caveat that you may need to
compose several operations/features together at times, whereas Pig may give
you pre-defined higher-level operations as a single unit of abstraction.

In other words, for some use cases Pig may afford you more convenience, but
I believe Spark offers more expressive power and control over your
computation.

(and with the additional caveat that Spark is not as mature as Pig, so you
may run into issues/limitations that the Spark developers are still working
on or that are pending on the roadmap)


Re: Joining files

Posted by Something Something <ma...@gmail.com>.
Questions:

1)  I don't see APIs for LEFT, FULL OUTER Joins.  True?
2)  Apache Pig provides different join types such as 'replicated',
'skewed'.  Now 'replicated' may not be a concern in Spark 'cause everything
happens in memory (possibly).
3)  Does the 'join' (which seems to work like INNER Join) guarantee order?
 For example, can I assume that columns from the left side will appear
before columns from the right & their order will be preserved?

On a side note, it appears, as of now Spark cannot be used as a replacement
for Pig - without some major coding.  Agree?





Re: Joining files

Posted by Horia <ho...@alum.berkeley.edu>.
It seems to me that what you want is the following procedure
- parse each file line by line
- generate key, value pairs
- join by key

I think the following should accomplish what you're looking for

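// Outside spark-shell, pair-RDD methods such as join need this import on Spark < 1.3:
// import org.apache.spark.SparkContext._
val students = sc.textFile("./students.txt")    // mapping over this RDD already maps over lines
val courses = sc.textFile("./courses.txt")      // mapping over this RDD already maps over lines
val left = students.map( x => {
    val columns = x.split(",")
    (columns(1), (columns(0), columns(2)))      // (course_id, (student_id, score))
} )
val right = courses.map( x => {
    val columns = x.split(",")
    (columns(0), columns(1))                    // (course_id, course_title)
} )
val joined = left.join(right)                   // (course_id, ((student_id, score), course_title))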


The major difference is selectively returning the fields which you actually
want to join, rather than all the fields. A secondary difference is
syntactic: you don't need a .map().map() since you can use a slightly more
complex function block as illustrated. I think Spark is smart enough to
optimize the .map().map() to basically what I've explicitly written...
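
To get from the joined records to the requested student_id, course_title,
score layout, one more map is needed. A minimal sketch, assuming a recent
Spark release (SparkConf-based setup); JoinSketch and its helper names are
hypothetical, and in-memory sample rows stand in for the two files.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._ // pair-RDD implicits on Spark < 1.3; harmless on later versions

object JoinSketch {
  // Parse "student_id,course_id,score" into (course_id, (student_id, score)).
  def studentKV(line: String): (String, (String, String)) = {
    val c = line.split(",")
    (c(1), (c(0), c(2)))
  }

  // Parse "course_id,course_title" into (course_id, course_title).
  def courseKV(line: String): (String, String) = {
    val c = line.split(",")
    (c(0), c(1))
  }

  // Flatten one joined record into "student_id,course_title,score".
  def toOutputLine(rec: (String, ((String, String), String))): String = rec match {
    case (_, ((studentId, score), title)) => Seq(studentId, title, score).mkString(",")
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("join-sketch"))
    try {
      // Made-up sample rows; with real data these would come from sc.textFile.
      val students = sc.parallelize(Seq("s1,c1,90", "s2,c2,75", "s3,c1,60"))
      val courses  = sc.parallelize(Seq("c1,Algebra", "c2,Biology"))
      val joined   = students.map(studentKV).join(courses.map(courseKV))
      joined.map(toOutputLine).collect().sorted.foreach(println)
    } finally sc.stop()
  }
}
```

On real data, the collect-and-print step would typically be replaced by
saveAsTextFile on the mapped RDD.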

Horia




Re: Joining files

Posted by Alex Boisvert <al...@gmail.com>.
Yes, it would work and fits Spark nicely... Pretty typical, I think.

Re: Joining files

Posted by Something Something <ma...@gmail.com>.
Was my question so dumb?  Or, is this not a good use case for Spark?

