You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@iceberg.apache.org by "Kruger, Scott" <sc...@paypal.com.INVALID> on 2020/11/20 22:40:48 UTC

Bucket partitioning in addition to regular partitioning

I want to have a table that’s partitioned by the following, in order:


  *   Low-cardinality identity
  *   Day
  *   Bucketed long ID, 16 buckets

Is this possible? If so, how should I do the dataframe write? This is what I’ve tried so far:


  1.  df.orderBy(“identity”, “day”).sortWithinPartitions(expr(“iceberg_bucket16(id)”))
  2.  df.orderBy(“identity”, “day”, expr(“iceberg_bucket16(id)”))
  3.  df.repartition(“identity”, “day”).sortWithinPartitions(“identity”, “day”, expr(“iceberg_bucket16(id)”))
  4.  df.repartition(“identity”,  “day”, “id”).sortWithinPartitions(“identity”, “day”, expr(“iceberg_bucket16(id)”))
  5.  df.repartitionByRange(“identity”, “day”).sortWithinPartitions(“identity”, “day”, expr(“iceberg_bucket16(id)”))
  6.  df.repartitionByRange(“identity”,  “day”, “id”).sortWithinPartitions(“identity”, “day”, expr(“iceberg_bucket16(id)”))

But I keep getting the error indicating that a partition has already been closed.

Re: Bucket partitioning in addition to regular partitioning

Posted by Ryan Blue <rb...@netflix.com.INVALID>.
Data needs to be clustered so the Iceberg writer receives data for one
table partition at a time. If it isn't clustered, Iceberg would need to
either keep multiple files open (for all unfinished partitions) or would
need to close and open new files for the same partition resulting in small
files.

To do that, you need to make sure the data fed to the writer are grouped or
clustered by the Iceberg partition. The easiest way to do that is a global
ORDER BY with your partition expressions, or something equivalent. You can
also locally sort, but I recommend the global sort.

On Tue, Nov 24, 2020 at 1:39 PM Kruger, Scott <sc...@paypal.com> wrote:

> By “task receives data clustered by partition”, do you mean that I should
> repartition using the same colums I order by? For example:
>
>
>
> df
>
>   .repartition(col(“category”), col(“ts”), expr(“iceberg_bucket16(id)”))
>
>   .orderBy(col(“category”), col(“ts”), expr(“iceberg_bucket16(id)”))
>
>
>
> …or am I misunderstanding what you’re saying? FWIW this is spark 2.4.x
> with Iceberg 0.10.0 using the dataframe API.
>
>
>
> *From: *Ryan Blue <rb...@netflix.com>
> *Reply-To: *"rblue@netflix.com" <rb...@netflix.com>
> *Date: *Tuesday, November 24, 2020 at 11:47 AM
> *To: *"Kruger, Scott" <sc...@paypal.com>
> *Cc: *"dev@iceberg.apache.org" <de...@iceberg.apache.org>
> *Subject: *Re: Bucket partitioning in addition to regular partitioning
>
>
>
> This message contains hyperlinks, take precaution before opening these
> links.
>
> It should work if you use `ORDER BY category, ts, iceberg_bucket16(id)`.
> You just need to ensure that each task receives data clustered by partition.
>
>
>
> On Tue, Nov 24, 2020 at 7:25 AM Kruger, Scott <sc...@paypal.com> wrote:
>
> I did register the bucket UDF (you can see me using it in the examples),
> and the docs were helpful to an extent, but the issue is that it only shows
> how to use bucketing when it’s the only partitioning scheme, not the
> innermost of a multi-level partitioning scheme. That’s what I’m having
> trouble with (I can get things to work just fine if I follow the docs and
> only partition by the bucketed ID).
>
>
>
> *From: *Ryan Blue <rb...@netflix.com.INVALID>
> *Reply-To: *"dev@iceberg.apache.org" <de...@iceberg.apache.org>, "
> rblue@netflix.com" <rb...@netflix.com>
> *Date: *Friday, November 20, 2020 at 8:11 PM
> *To: *Iceberg Dev List <de...@iceberg.apache.org>
> *Subject: *Re: Bucket partitioning in addition to regular partitioning
>
>
>
> This message contains hyperlinks, take precaution before opening these
> links.
>
> Hi Scott,
>
>
>
> There are some docs to help with this situation:
> https://iceberg.apache.org/spark/#writing-against-partitioned-table
> <https://nam03.safelinks.protection.outlook.com/?url=https%3A%2F%2Ficeberg.apache.org%2Fspark%2F%23writing-against-partitioned-table&data=04%7C01%7Csckruger%40paypal.com%7C8d1735d1b2db43e157f308d890a10392%7Cfb00791460204374977e21bac5f3f4c8%7C1%7C0%7C637418368515916452%7CUnknown%7CTWFpbGZsb3d8eyJWIjoiMC4wLjAwMDAiLCJQIjoiV2luMzIiLCJBTiI6Ik1haWwiLCJXVCI6Mn0%3D%7C1000&sdata=QghhQr2lLuGjaxSbgvzOzqGpIZf9KMgXbhfy%2BIEhgA4%3D&reserved=0>
>
>
>
> We added a helper function, IcebergSpark.registerBucketUDF, to register
> the UDF that you need for the bucket column. That's probably the source of
> the problem.
>
>
>
> I always recommend an orderBy with the partition expressions to write.
> Spark seems to do best when it produces a global ordering.
>
>
>
> rb
>
>
>
> On Fri, Nov 20, 2020 at 2:40 PM Kruger, Scott <sc...@paypal.com.invalid>
> wrote:
>
> I want to have a table that’s partitioned by the following, in order:
>
>
>
>    - Low-cardinality identity
>    - Day
>    - Bucketed long ID, 16 buckets
>
>
>
> Is this possible? If so, how should I do the dataframe write? This is what
> I’ve tried so far:
>
>
>
>    1. df.orderBy(“identity”,
>    “day”).sortWithinPartitions(expr(“iceberg_bucket16(id)”))
>    2. df.orderBy(“identity”, “day”, expr(“iceberg_bucket16(id)”))
>    3. df.repartition(“identity”, “day”).sortWithinPartitions(“identity”,
>    “day”, expr(“iceberg_bucket16(id)”))
>    4. df.repartition(“identity”,  “day”,
>    “id”).sortWithinPartitions(“identity”, “day”, expr(“iceberg_bucket16(id)”))
>    5. df.repartitionByRange(“identity”,
>    “day”).sortWithinPartitions(“identity”, “day”, expr(“iceberg_bucket16(id)”))
>    6. df.repartitionByRange(“identity”,  “day”,
>    “id”).sortWithinPartitions(“identity”, “day”, expr(“iceberg_bucket16(id)”))
>
>
>
> But I keep getting the error indicating that a partition has already been
> closed.
>
>
>
>
> --
>
> Ryan Blue
>
> Software Engineer
>
> Netflix
>
>
>
>
> --
>
> Ryan Blue
>
> Software Engineer
>
> Netflix
>


-- 
Ryan Blue
Software Engineer
Netflix

Re: Bucket partitioning in addition to regular partitioning

Posted by "Kruger, Scott" <sc...@paypal.com.INVALID>.
By “task receives data clustered by partition”, do you mean that I should repartition using the same colums I order by? For example:

df
  .repartition(col(“category”), col(“ts”), expr(“iceberg_bucket16(id)”))
  .orderBy(col(“category”), col(“ts”), expr(“iceberg_bucket16(id)”))

…or am I misunderstanding what you’re saying? FWIW this is spark 2.4.x with Iceberg 0.10.0 using the dataframe API.

From: Ryan Blue <rb...@netflix.com>
Reply-To: "rblue@netflix.com" <rb...@netflix.com>
Date: Tuesday, November 24, 2020 at 11:47 AM
To: "Kruger, Scott" <sc...@paypal.com>
Cc: "dev@iceberg.apache.org" <de...@iceberg.apache.org>
Subject: Re: Bucket partitioning in addition to regular partitioning

This message contains hyperlinks, take precaution before opening these links.
It should work if you use `ORDER BY category, ts, iceberg_bucket16(id)`. You just need to ensure that each task receives data clustered by partition.

On Tue, Nov 24, 2020 at 7:25 AM Kruger, Scott <sc...@paypal.com>> wrote:
I did register the bucket UDF (you can see me using it in the examples), and the docs were helpful to an extent, but the issue is that it only shows how to use bucketing when it’s the only partitioning scheme, not the innermost of a multi-level partitioning scheme. That’s what I’m having trouble with (I can get things to work just fine if I follow the docs and only partition by the bucketed ID).

From: Ryan Blue <rb...@netflix.com.INVALID>
Reply-To: "dev@iceberg.apache.org<ma...@iceberg.apache.org>" <de...@iceberg.apache.org>>, "rblue@netflix.com<ma...@netflix.com>" <rb...@netflix.com>>
Date: Friday, November 20, 2020 at 8:11 PM
To: Iceberg Dev List <de...@iceberg.apache.org>>
Subject: Re: Bucket partitioning in addition to regular partitioning

This message contains hyperlinks, take precaution before opening these links.
Hi Scott,

There are some docs to help with this situation: https://iceberg.apache.org/spark/#writing-against-partitioned-table<https://nam03.safelinks.protection.outlook.com/?url=https%3A%2F%2Ficeberg.apache.org%2Fspark%2F%23writing-against-partitioned-table&data=04%7C01%7Csckruger%40paypal.com%7C8d1735d1b2db43e157f308d890a10392%7Cfb00791460204374977e21bac5f3f4c8%7C1%7C0%7C637418368515916452%7CUnknown%7CTWFpbGZsb3d8eyJWIjoiMC4wLjAwMDAiLCJQIjoiV2luMzIiLCJBTiI6Ik1haWwiLCJXVCI6Mn0%3D%7C1000&sdata=QghhQr2lLuGjaxSbgvzOzqGpIZf9KMgXbhfy%2BIEhgA4%3D&reserved=0>

We added a helper function, IcebergSpark.registerBucketUDF, to register the UDF that you need for the bucket column. That's probably the source of the problem.

I always recommend an orderBy with the partition expressions to write. Spark seems to do best when it produces a global ordering.

rb

On Fri, Nov 20, 2020 at 2:40 PM Kruger, Scott <sc...@paypal.com.invalid> wrote:
I want to have a table that’s partitioned by the following, in order:


  *   Low-cardinality identity
  *   Day
  *   Bucketed long ID, 16 buckets

Is this possible? If so, how should I do the dataframe write? This is what I’ve tried so far:


  1.  df.orderBy(“identity”, “day”).sortWithinPartitions(expr(“iceberg_bucket16(id)”))
  2.  df.orderBy(“identity”, “day”, expr(“iceberg_bucket16(id)”))
  3.  df.repartition(“identity”, “day”).sortWithinPartitions(“identity”, “day”, expr(“iceberg_bucket16(id)”))
  4.  df.repartition(“identity”,  “day”, “id”).sortWithinPartitions(“identity”, “day”, expr(“iceberg_bucket16(id)”))
  5.  df.repartitionByRange(“identity”, “day”).sortWithinPartitions(“identity”, “day”, expr(“iceberg_bucket16(id)”))
  6.  df.repartitionByRange(“identity”,  “day”, “id”).sortWithinPartitions(“identity”, “day”, expr(“iceberg_bucket16(id)”))

But I keep getting the error indicating that a partition has already been closed.


--
Ryan Blue
Software Engineer
Netflix


--
Ryan Blue
Software Engineer
Netflix

Re: Bucket partitioning in addition to regular partitioning

Posted by Ryan Blue <rb...@netflix.com.INVALID>.
It should work if you use `ORDER BY category, ts, iceberg_bucket16(id)`.
You just need to ensure that each task receives data clustered by partition.

On Tue, Nov 24, 2020 at 7:25 AM Kruger, Scott <sc...@paypal.com> wrote:

> I did register the bucket UDF (you can see me using it in the examples),
> and the docs were helpful to an extent, but the issue is that it only shows
> how to use bucketing when it’s the only partitioning scheme, not the
> innermost of a multi-level partitioning scheme. That’s what I’m having
> trouble with (I can get things to work just fine if I follow the docs and
> only partition by the bucketed ID).
>
>
>
> *From: *Ryan Blue <rb...@netflix.com.INVALID>
> *Reply-To: *"dev@iceberg.apache.org" <de...@iceberg.apache.org>, "
> rblue@netflix.com" <rb...@netflix.com>
> *Date: *Friday, November 20, 2020 at 8:11 PM
> *To: *Iceberg Dev List <de...@iceberg.apache.org>
> *Subject: *Re: Bucket partitioning in addition to regular partitioning
>
>
>
> This message contains hyperlinks, take precaution before opening these
> links.
>
> Hi Scott,
>
>
>
> There are some docs to help with this situation:
> https://iceberg.apache.org/spark/#writing-against-partitioned-table
> <https://nam03.safelinks.protection.outlook.com/?url=https%3A%2F%2Ficeberg.apache.org%2Fspark%2F%23writing-against-partitioned-table&data=04%7C01%7Csckruger%40paypal.com%7C7f069b9f8f34493744a708d88dc2b53e%7Cfb00791460204374977e21bac5f3f4c8%7C1%7C0%7C637415214691902926%7CUnknown%7CTWFpbGZsb3d8eyJWIjoiMC4wLjAwMDAiLCJQIjoiV2luMzIiLCJBTiI6Ik1haWwiLCJXVCI6Mn0%3D%7C1000&sdata=kV1aAAall4L5Nv7z%2BSOnlkoOKtz4LWjp4SHrmMPTPpE%3D&reserved=0>
>
>
>
> We added a helper function, IcebergSpark.registerBucketUDF, to register
> the UDF that you need for the bucket column. That's probably the source of
> the problem.
>
>
>
> I always recommend an orderBy with the partition expressions to write.
> Spark seems to do best when it produces a global ordering.
>
>
>
> rb
>
>
>
> On Fri, Nov 20, 2020 at 2:40 PM Kruger, Scott <sc...@paypal.com.invalid>
> wrote:
>
> I want to have a table that’s partitioned by the following, in order:
>
>
>
>    - Low-cardinality identity
>    - Day
>    - Bucketed long ID, 16 buckets
>
>
>
> Is this possible? If so, how should I do the dataframe write? This is what
> I’ve tried so far:
>
>
>
>    1. df.orderBy(“identity”,
>    “day”).sortWithinPartitions(expr(“iceberg_bucket16(id)”))
>    2. df.orderBy(“identity”, “day”, expr(“iceberg_bucket16(id)”))
>    3. df.repartition(“identity”, “day”).sortWithinPartitions(“identity”,
>    “day”, expr(“iceberg_bucket16(id)”))
>    4. df.repartition(“identity”,  “day”,
>    “id”).sortWithinPartitions(“identity”, “day”, expr(“iceberg_bucket16(id)”))
>    5. df.repartitionByRange(“identity”,
>    “day”).sortWithinPartitions(“identity”, “day”, expr(“iceberg_bucket16(id)”))
>    6. df.repartitionByRange(“identity”,  “day”,
>    “id”).sortWithinPartitions(“identity”, “day”, expr(“iceberg_bucket16(id)”))
>
>
>
> But I keep getting the error indicating that a partition has already been
> closed.
>
>
>
>
> --
>
> Ryan Blue
>
> Software Engineer
>
> Netflix
>


-- 
Ryan Blue
Software Engineer
Netflix

Re: Bucket partitioning in addition to regular partitioning

Posted by "Kruger, Scott" <sc...@paypal.com.INVALID>.
I did register the bucket UDF (you can see me using it in the examples), and the docs were helpful to an extent, but the issue is that it only shows how to use bucketing when it’s the only partitioning scheme, not the innermost of a multi-level partitioning scheme. That’s what I’m having trouble with (I can get things to work just fine if I follow the docs and only partition by the bucketed ID).

From: Ryan Blue <rb...@netflix.com.INVALID>
Reply-To: "dev@iceberg.apache.org" <de...@iceberg.apache.org>, "rblue@netflix.com" <rb...@netflix.com>
Date: Friday, November 20, 2020 at 8:11 PM
To: Iceberg Dev List <de...@iceberg.apache.org>
Subject: Re: Bucket partitioning in addition to regular partitioning

This message contains hyperlinks, take precaution before opening these links.
Hi Scott,

There are some docs to help with this situation: https://iceberg.apache.org/spark/#writing-against-partitioned-table<https://nam03.safelinks.protection.outlook.com/?url=https%3A%2F%2Ficeberg.apache.org%2Fspark%2F%23writing-against-partitioned-table&data=04%7C01%7Csckruger%40paypal.com%7C7f069b9f8f34493744a708d88dc2b53e%7Cfb00791460204374977e21bac5f3f4c8%7C1%7C0%7C637415214691902926%7CUnknown%7CTWFpbGZsb3d8eyJWIjoiMC4wLjAwMDAiLCJQIjoiV2luMzIiLCJBTiI6Ik1haWwiLCJXVCI6Mn0%3D%7C1000&sdata=kV1aAAall4L5Nv7z%2BSOnlkoOKtz4LWjp4SHrmMPTPpE%3D&reserved=0>

We added a helper function, IcebergSpark.registerBucketUDF, to register the UDF that you need for the bucket column. That's probably the source of the problem.

I always recommend an orderBy with the partition expressions to write. Spark seems to do best when it produces a global ordering.

rb

On Fri, Nov 20, 2020 at 2:40 PM Kruger, Scott <sc...@paypal.com.invalid> wrote:
I want to have a table that’s partitioned by the following, in order:


  *   Low-cardinality identity
  *   Day
  *   Bucketed long ID, 16 buckets

Is this possible? If so, how should I do the dataframe write? This is what I’ve tried so far:


  1.  df.orderBy(“identity”, “day”).sortWithinPartitions(expr(“iceberg_bucket16(id)”))
  2.  df.orderBy(“identity”, “day”, expr(“iceberg_bucket16(id)”))
  3.  df.repartition(“identity”, “day”).sortWithinPartitions(“identity”, “day”, expr(“iceberg_bucket16(id)”))
  4.  df.repartition(“identity”,  “day”, “id”).sortWithinPartitions(“identity”, “day”, expr(“iceberg_bucket16(id)”))
  5.  df.repartitionByRange(“identity”, “day”).sortWithinPartitions(“identity”, “day”, expr(“iceberg_bucket16(id)”))
  6.  df.repartitionByRange(“identity”,  “day”, “id”).sortWithinPartitions(“identity”, “day”, expr(“iceberg_bucket16(id)”))

But I keep getting the error indicating that a partition has already been closed.


--
Ryan Blue
Software Engineer
Netflix

Re: Bucket partitioning in addition to regular partitioning

Posted by Ryan Blue <rb...@netflix.com.INVALID>.
Hi Scott,

There are some docs to help with this situation:
https://iceberg.apache.org/spark/#writing-against-partitioned-table

We added a helper function, IcebergSpark.registerBucketUDF, to register the
UDF that you need for the bucket column. That's probably the source of the
problem.

I always recommend an orderBy with the partition expressions to write.
Spark seems to do best when it produces a global ordering.

rb

On Fri, Nov 20, 2020 at 2:40 PM Kruger, Scott <sc...@paypal.com.invalid>
wrote:

> I want to have a table that’s partitioned by the following, in order:
>
>
>
>    - Low-cardinality identity
>    - Day
>    - Bucketed long ID, 16 buckets
>
>
>
> Is this possible? If so, how should I do the dataframe write? This is what
> I’ve tried so far:
>
>
>
>    1. df.orderBy(“identity”,
>    “day”).sortWithinPartitions(expr(“iceberg_bucket16(id)”))
>    2. df.orderBy(“identity”, “day”, expr(“iceberg_bucket16(id)”))
>    3. df.repartition(“identity”, “day”).sortWithinPartitions(“identity”,
>    “day”, expr(“iceberg_bucket16(id)”))
>    4. df.repartition(“identity”,  “day”,
>    “id”).sortWithinPartitions(“identity”, “day”, expr(“iceberg_bucket16(id)”))
>    5. df.repartitionByRange(“identity”,
>    “day”).sortWithinPartitions(“identity”, “day”, expr(“iceberg_bucket16(id)”))
>    6. df.repartitionByRange(“identity”,  “day”,
>    “id”).sortWithinPartitions(“identity”, “day”, expr(“iceberg_bucket16(id)”))
>
>
>
> But I keep getting the error indicating that a partition has already been
> closed.
>


-- 
Ryan Blue
Software Engineer
Netflix