Posted to user@spark.apache.org by Stuart White <st...@gmail.com> on 2016/11/10 22:45:02 UTC

Joining to a large, pre-sorted file

I have a large "master" file (~700m records) that I frequently join smaller
"transaction" files to.  (The transaction files have tens of millions of
records, so they are too large for a broadcast join.)

I would like to pre-sort the master file, write it to disk, and then, in
subsequent jobs, read the file off disk and join to it without having to
re-sort it.  I'm using Spark SQL, and my understanding is that the Spark
Catalyst Optimizer will choose an optimal join algorithm if it is aware
that the datasets are sorted.  So, the trick is to make the optimizer aware
that the master file is already sorted.

I think SPARK-12394 <https://issues.apache.org/jira/browse/SPARK-12394>
provides this functionality, but I can't seem to put the pieces together
for how to use it.

Could someone possibly provide a simple example of how to:

   1. Sort a master file by a key column and write it to disk in such a way
   that its "sorted-ness" is preserved.
   2. In a later job, read a transaction file, sort/partition it as
   necessary.  Read the master file, preserving its sorted-ness.  Join the two
   DataFrames in such a way that the master rows are not sorted again.

Thanks!

Re: Joining to a large, pre-sorted file

Posted by Rohit Verma <ro...@rokittech.com>.
You can try coalesce on the join statement:
val result = master.join(transaction, "key").coalesce(/* number of partitions in master */)
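
A minimal sketch of the coalesce suggestion, assuming master and transaction are DataFrames that share a "key" column. The toy data, the "txn" column, and the name numMasterPartitions are illustrative and not from the thread. Note that coalesce only narrows the partition count of the join result; it does not change how master itself is stored.

```scala
import org.apache.spark.sql.SparkSession

// In spark-shell, getOrCreate() returns the session that already exists.
val spark = SparkSession.builder()
  .appName("coalesce-sketch")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

// Toy stand-ins for the real datasets (illustrative only).
val master = (0L until 1000L).map(i => (i, s"master_$i")).toDF("key", "value")
val transaction = (0L until 100L).map(i => (i, s"transaction_$i")).toDF("key", "txn")

// Number of partitions currently backing the master DataFrame.
val numMasterPartitions = master.rdd.getNumPartitions

// coalesce reduces the join output to at most that many partitions
// without adding another shuffle.
val result = master.join(transaction, "key").coalesce(numMasterPartitions)
```
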
On Nov 15, 2016, at 8:07 PM, Stuart White <st...@gmail.com> wrote:

It seems that the number of files could possibly get out of hand using this approach.

For example, in the job that buckets and writes master, assuming we use the default number of shuffle partitions (200), and assuming that in the next job (the job where we join to transaction), we're also going to want to use 200 partitions, that means master would be written to disk in 40,000 files (200 partitions, each writing 200 bucket files).  Am I mistaken?

Is there some way to avoid this explosion of the number of files?  Or is this just an unavoidable side-effect of Spark's bucketing implementation?

Thanks again!

Re: Joining to a large, pre-sorted file

Posted by Stuart White <st...@gmail.com>.
It seems that the number of files could possibly get out of hand using this
approach.

For example, in the job that buckets and writes master, assuming we use the
default number of shuffle partitions (200), and assuming that in the next
job (the job where we join to transaction), we're also going to want to use
200 partitions, that means master would be written to disk in 40,000 files
(200 partitions, each writing 200 bucket files).  Am I mistaken?

Is there some way to avoid this explosion of the number of files?  Or is
this just an unavoidable side-effect of Spark's bucketing implementation?

Thanks again!
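
The arithmetic above can be sketched as follows, together with one commonly used mitigation that is not proposed in this thread and is therefore an assumption: repartition by the bucket column into exactly numBuckets partitions before writing, so that each write task holds rows for a single bucket. This relies on Spark assigning buckets with the same hash it uses for repartition, which should be verified on the Spark version in use. The table name master_bucketed and the parquet format are hypothetical choices.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

// In spark-shell, getOrCreate() returns the session that already exists.
val spark = SparkSession.builder()
  .appName("bucket-file-count-sketch")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

// Worst case: every write task holds rows for every bucket,
// so each task emits one file per bucket.
val writeTasks = 200
val numBuckets = 200
val worstCaseFiles = writeTasks * numBuckets // 200 * 200 = 40000

// Toy stand-in for the master dataset (illustrative only).
val master = (0L until 10000L).map(i => (i, s"master_$i")).toDF("key", "value")

// Assumed mitigation: align the in-memory partitioning with the buckets
// before writing, aiming for roughly one file per bucket.
master
  .repartition(numBuckets, col("key"))
  .write
  .format("parquet")
  .bucketBy(numBuckets, "key")
  .mode("overwrite")
  .saveAsTable("master_bucketed")
```

Counting the files under the table's warehouse directory after the write is a direct way to confirm how many were produced.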

On Sun, Nov 13, 2016 at 9:24 AM, Silvio Fiorito <
silvio.fiorito@granturing.com> wrote:

> Hi Stuart,
>
> Yes that's the query plan but if you take a look at my screenshot it skips
> the first stage since the datasets are co-partitioned.
>
> Thanks,
> Silvio

Re: Joining to a large, pre-sorted file

Posted by Silvio Fiorito <si...@granturing.com>.
Hi Stuart,

Yes that's the query plan but if you take a look at my screenshot it skips the first stage since the datasets are co-partitioned.

Thanks,
Silvio
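
The distinction drawn here, a plan that may still show a Sort while the shuffle stage is skipped, can also be checked without the UI. The following is a rough sketch that counts Exchange operators in the physical plan text; the helper name exchangeCount and the toy DataFrames are hypothetical. Neither toy input is bucketed, so a shuffle is expected on both sides in this example.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// In spark-shell, getOrCreate() returns the session that already exists.
val spark = SparkSession.builder()
  .appName("plan-check-sketch")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

// Disable broadcast joins so the small demo data still uses a sort-merge join.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

// Hypothetical helper: counts physical-plan lines that mention an
// Exchange operator, i.e. a shuffle.
def exchangeCount(df: DataFrame): Int =
  df.queryExecution.executedPlan.toString.split("\n").count(_.contains("Exchange"))

val left = (0L until 1000L).map(i => (i, s"left_$i")).toDF("key", "l")
val right = (0L until 1000L).map(i => (i, s"right_$i")).toDF("key", "r")
val joined = left.join(right, "key")

println(s"Exchange operators in plan: ${exchangeCount(joined)}")
```

Running the same helper on a join against a bucketed table read by name should show fewer Exchange lines on that side if the bucketing is being used.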

________________________________
From: Stuart White <st...@gmail.com>
Sent: Saturday, November 12, 2016 11:20:28 AM
To: Silvio Fiorito
Cc: user@spark.apache.org
Subject: Re: Joining to a large, pre-sorted file

Hi Silvio,

Thanks very much for the response!

I'm pretty new at reading explain plans, so maybe I'm misunderstanding what I'm seeing.

Remember my goal is to sort master, write it out, later read it back in and have Spark "remember" that it's sorted, so I can do joins and Spark will not sort it again.

Looking at the explain plan for the example job you provided, it looks to me like Spark is re-sorting master after reading it back in.  See the attachment for the Sort step I'm referring to.

Am I misunderstanding the explain plan?

Thanks again!

Re: Joining to a large, pre-sorted file

Posted by Stuart White <st...@gmail.com>.
Hi Silvio,

Thanks very much for the response!

I'm pretty new at reading explain plans, so maybe I'm misunderstanding what
I'm seeing.

Remember my goal is to sort master, write it out, later read it back in and
have Spark "remember" that it's sorted, so I can do joins and Spark will
not sort it again.

Looking at the explain plan for the example job you provided, it looks to
me like Spark is re-sorting master after reading it back in.  See the
attachment for the Sort step I'm referring to.

Am I misunderstanding the explain plan?

Thanks again!

On Sat, Nov 12, 2016 at 9:34 AM, Silvio Fiorito <
silvio.fiorito@granturing.com> wrote:

> Hi Stuart,
>
> You don’t need the sortBy or sortWithinPartitions.
>
> https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/594325853373464/879901972425732/6861830365114179/latest.html
>
> This is what the job should look like:

Re: Joining to a large, pre-sorted file

Posted by Silvio Fiorito <si...@granturing.com>.
Hi Stuart,

You don’t need the sortBy or sortWithinPartitions.

https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/594325853373464/879901972425732/6861830365114179/latest.html

This is what the job should look like:

[Attached image: image001.png]

On 11/12/16, 8:40 AM, "Stuart White" <st...@gmail.com> wrote:

    Thanks for the reply.

    I understand that I need to use bucketBy() to write my master file,
    but I still can't seem to make it work as expected.  Here's a code
    example for how I'm writing my master file:

    Range(0, 1000000)
      .map(i => (i, s"master_$i"))
      .toDF("key", "value")
      .write
      .format("json")
      .bucketBy(3, "key")
      .sortBy("key")
      .saveAsTable("master")

    And here's how I'm reading it later and attempting to join to a
    transaction dataset:

    val master = spark
      .read
      .format("json")
      .json("spark-warehouse/master")
      .cache

    val transaction = Range(0, 1000000)
      .map(i => (i, s"transaction_$i"))
      .toDF("key", "value")
      .repartition(3, 'key)
      .sortWithinPartitions('key)
      .cache

    val results = master.join(transaction, "key")

    When I call results.explain(), I see that it is sorting both datasets
    before sending them through SortMergeJoin.

    == Physical Plan ==
    *Project [key#0L, value#1, value#53]
    +- *SortMergeJoin [key#0L], [cast(key#52 as bigint)], Inner
       :- *Sort [key#0L ASC], false, 0
       :  +- Exchange hashpartitioning(key#0L, 200)
       :     +- *Filter isnotnull(key#0L)
       :        +- InMemoryTableScan [key#0L, value#1], [isnotnull(key#0L)]
       :           :  +- InMemoryRelation [key#0L, value#1], true, 10000,
    StorageLevel(disk, memory, deserialized, 1 replicas)
       :           :     :  +- *Scan json [key#0L,value#1] Format: JSON,
    InputPaths: file:/work/spark-warehouse/master, PartitionFilters: [],
    PushedFilters: [], ReadSchema: struct<key:bigint,value:string>
       +- *Sort [cast(key#52 as bigint) ASC], false, 0
          +- Exchange hashpartitioning(cast(key#52 as bigint), 200)
             +- InMemoryTableScan [key#52, value#53]
                :  +- InMemoryRelation [key#52, value#53], true, 10000,
    StorageLevel(disk, memory, deserialized, 1 replicas)
                :     :  +- *Sort [key#52 ASC], false, 0
                :     :     +- Exchange hashpartitioning(key#52, 3)
                :     :        +- LocalTableScan [key#52, value#53]

    Here are my thoughts:

    1. I think I'm probably reading the master file back into memory
    incorrectly.  I think maybe I should be reading it as a Hive table
    rather than just a plain json file, but I can't seem to figure out how
    to do that.

    2. I don't understand exactly when partition counts/bucket counts are
    important.  For example, in this example, at the time it's written,
    master has 1 partition and is written into 3 buckets, resulting in 3
    files being written out.  Later when I generated my transaction
    dataset, I repartitioned it into 3 partitions.  Was that the correct
    thing to do (3 transaction partitions == 3 master buckets)?  Or should
    I have repartitioned master into 3 partitions before writing
    (resulting in 9 files if I still create 3 buckets)?  Basically, I
    don't understand how partitions and buckets should be handled.

    So, I feel like I'm close, but there are a few ways in which I don't
    understand how these pieces are supposed to fit together.  If this is
    explained somewhere, with a simple example, that would be great.

Re: Joining to a large, pre-sorted file

Posted by Stuart White <st...@gmail.com>.
Thanks for the reply.

I understand that I need to use bucketBy() to write my master file,
but I still can't seem to make it work as expected.  Here's a code
example for how I'm writing my master file:

Range(0, 1000000)
  .map(i => (i, s"master_$i"))
  .toDF("key", "value")
  .write
  .format("json")
  .bucketBy(3, "key")
  .sortBy("key")
  .saveAsTable("master")

And here's how I'm reading it later and attempting to join to a
transaction dataset:

val master = spark
  .read
  .format("json")
  .json("spark-warehouse/master")
  .cache

val transaction = Range(0, 1000000)
  .map(i => (i, s"transaction_$i"))
  .toDF("key", "value")
  .repartition(3, 'key)
  .sortWithinPartitions('key)
  .cache

val results = master.join(transaction, "key")

When I call results.explain(), I see that it is sorting both datasets
before sending them through SortMergeJoin.

== Physical Plan ==
*Project [key#0L, value#1, value#53]
+- *SortMergeJoin [key#0L], [cast(key#52 as bigint)], Inner
   :- *Sort [key#0L ASC], false, 0
   :  +- Exchange hashpartitioning(key#0L, 200)
   :     +- *Filter isnotnull(key#0L)
   :        +- InMemoryTableScan [key#0L, value#1], [isnotnull(key#0L)]
   :           :  +- InMemoryRelation [key#0L, value#1], true, 10000,
StorageLevel(disk, memory, deserialized, 1 replicas)
   :           :     :  +- *Scan json [key#0L,value#1] Format: JSON,
InputPaths: file:/work/spark-warehouse/master, PartitionFilters: [],
PushedFilters: [], ReadSchema: struct<key:bigint,value:string>
   +- *Sort [cast(key#52 as bigint) ASC], false, 0
      +- Exchange hashpartitioning(cast(key#52 as bigint), 200)
         +- InMemoryTableScan [key#52, value#53]
            :  +- InMemoryRelation [key#52, value#53], true, 10000,
StorageLevel(disk, memory, deserialized, 1 replicas)
            :     :  +- *Sort [key#52 ASC], false, 0
            :     :     +- Exchange hashpartitioning(key#52, 3)
            :     :        +- LocalTableScan [key#52, value#53]

Here are my thoughts:
1. I think I'm probably reading the master file back into memory
incorrectly.  I think maybe I should be reading it as a Hive table
rather than just a plain json file, but I can't seem to figure out how
to do that.
2. I don't understand exactly when partition counts/bucket counts are
important.  For example, in this example, at the time it's written,
master has 1 partition and is written into 3 buckets, resulting in 3
files being written out.  Later when I generated my transaction
dataset, I repartitioned it into 3 partitions.  Was that the correct
thing to do (3 transaction partitions == 3 master buckets)?  Or should
I have repartitioned master into 3 partitions before writing
(resulting in 9 files if I still create 3 buckets)?  Basically, I
don't understand how partitions and buckets should be handled.

So, I feel like I'm close, but there are a few ways in which I don't
understand how these pieces are supposed to fit together.  If this is
explained somewhere, with a simple example, that would be great.
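
On the first point, a minimal sketch of reading the bucketed data back by table name rather than by path. The bucket specification is stored with the table in the catalog, so a read of the raw files by path does not carry it. Assumptions here: parquet instead of json, Long keys on both sides to avoid the cast seen in the plan above, a "txn" column name for the transaction side, and broadcast joins disabled so the small demo data still goes through a sort-merge join. For the table to be visible in a later job, a persistent metastore (for example a session built with Hive support) is also assumed.

```scala
import org.apache.spark.sql.SparkSession

// In spark-shell, getOrCreate() returns the session that already exists.
val spark = SparkSession.builder()
  .appName("bucketed-read-sketch")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

// Keep the demo on a sort-merge join despite the small inputs.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

// Write side: bucket (and sort within buckets) by the join key.
(0L until 100000L)
  .map(i => (i, s"master_$i"))
  .toDF("key", "value")
  .write
  .format("parquet")
  .bucketBy(3, "key")
  .sortBy("key")
  .mode("overwrite")
  .saveAsTable("master")

// Read side: go through the catalog so the bucket spec is picked up.
val master = spark.table("master")

val transaction = (0L until 1000L)
  .map(i => (i, s"transaction_$i"))
  .toDF("key", "txn")

val results = master.join(transaction, "key")
results.explain()
```

Whether the Sort on the master side also disappears from the plan depends on the Spark version and on how many files each bucket contains, so the explain output is the thing to check rather than assume.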

Re: Joining to a large, pre-sorted file

Posted by Silvio Fiorito <si...@granturing.com>.
You want to look at the bucketBy option when you save the master file out. That way it will be pre-partitioned by the join column, eliminating the shuffle on the larger file.



From: Stuart White <st...@gmail.com>
Date: Thursday, November 10, 2016 at 8:39 PM
To: Jörn Franke <jo...@gmail.com>
Cc: "user@spark.apache.org" <us...@spark.apache.org>
Subject: Re: Joining to a large, pre-sorted file

Yes.  In my original question, when I said I wanted to pre-sort the master file, I should have said "pre-sort and pre-partition the file".
Years ago, I did this with Hadoop MapReduce.  I pre-sorted/partitioned the master file into N partitions.  Then, when a transaction file would arrive, I would sort/partition the transaction file on the join key into N partitions.  Then I could perform what was called a mapside join.
Basically, I want to do the same thing in Spark.  And it looks like all the pieces to accomplish this exist, but I can't figure out how to connect all the dots.  It seems like this functionality is pretty new so there aren't a lot of examples available.

Re: Joining to a large, pre-sorted file

Posted by Stuart White <st...@gmail.com>.
Yes.  In my original question, when I said I wanted to pre-sort the master
file, I should have said "pre-sort and pre-partition the file".

Years ago, I did this with Hadoop MapReduce.  I pre-sorted/partitioned the
master file into N partitions.  Then, when a transaction file would arrive,
I would sort/partition the transaction file on the join key into N
partitions.  Then I could perform what was called a mapside join.

Basically, I want to do the same thing in Spark.  And it looks like all the
pieces to accomplish this exist, but I can't figure out how to connect all
the dots.  It seems like this functionality is pretty new so there aren't a
lot of examples available.
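
The map-side join idea described above can be illustrated without Spark. This is a plain-Scala sketch under stated assumptions: keys are non-negative Longs, master keys are unique, and key modulo N stands in for a real hash partitioner. The names partitionOf, prepare, and mergeJoin are hypothetical.

```scala
// Both sides are split into n partitions by the same function of the join
// key, and each partition is sorted by key ahead of time.
val n = 3
def partitionOf(key: Long): Int = (key % n).toInt // assumes non-negative keys

def prepare[V](rows: Seq[(Long, V)]): Map[Int, Seq[(Long, V)]] =
  rows
    .groupBy { case (k, _) => partitionOf(k) }
    .map { case (p, rs) => p -> rs.sortBy(_._1) }

// Merge join of two key-sorted sequences. Left (master) keys are assumed
// unique, so on a match only the right (transaction) cursor advances.
def mergeJoin[A, B](left: Seq[(Long, A)], right: Seq[(Long, B)]): Seq[(Long, A, B)] = {
  val l = left.toIndexedSeq
  val r = right.toIndexedSeq
  val out = Seq.newBuilder[(Long, A, B)]
  var i = 0
  var j = 0
  while (i < l.length && j < r.length) {
    if (l(i)._1 < r(j)._1) i += 1
    else if (l(i)._1 > r(j)._1) j += 1
    else { out += ((l(i)._1, l(i)._2, r(j)._2)); j += 1 }
  }
  out.result()
}

val masterRows = (0L until 10L).map(k => (k, s"master_$k"))
val transactionRows = Seq(7L, 2L, 2L, 11L).map(k => (k, s"txn_$k"))

val m = prepare(masterRows)
val t = prepare(transactionRows)

// Partition p of master only ever needs partition p of transaction,
// so no data moves between partitions at join time.
val joined = (0 until n).flatMap { p =>
  mergeJoin(
    m.getOrElse(p, Seq.empty[(Long, String)]),
    t.getOrElse(p, Seq.empty[(Long, String)])
  )
}
```

The master side is prepared once and reused; only the smaller transaction side has to be partitioned and sorted for each new join.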


On Thu, Nov 10, 2016 at 7:33 PM, Jörn Franke <jo...@gmail.com> wrote:

> Can you split the files beforehand in several files (e.g. By the column
> you do the join on?) ?

Re: Joining to a large, pre-sorted file

Posted by Jörn Franke <jo...@gmail.com>.
Can you split the file beforehand into several files (e.g., by the column you join on)?

> On 10 Nov 2016, at 23:45, Stuart White <st...@gmail.com> wrote:
> 
> I have a large "master" file (~700m records) that I frequently join smaller "transaction" files to.  (The transaction files have 10's of millions of records, so too large for a broadcast join).
> 
> I would like to pre-sort the master file, write it to disk, and then, in subsequent jobs, read the file off disk and join to it without having to re-sort it.  I'm using Spark SQL, and my understanding is that the Spark Catalyst Optimizer will choose an optimal join algorithm if it is aware that the datasets are sorted.  So, the trick is to make the optimizer aware that the master file is already sorted.
> 
> I think SPARK-12394 provides this functionality, but I can't seem to put the pieces together for how to use it. 
> 
> Could someone possibly provide a simple example of how to:
>    1. Sort a master file by a key column and write it to disk in such a way that its "sorted-ness" is preserved.
>    2. In a later job, read a transaction file, sort/partition it as necessary.  Read the master file, preserving its sorted-ness.  Join the two DataFrames in such a way that the master rows are not sorted again.
> Thanks!
>