Posted to dev@iceberg.apache.org by suds <su...@gmail.com> on 2019/11/26 19:17:24 UTC

passing clustering spec to datasource v2

I looked at the open issue and discussion around the sort spec:
https://github.com/apache/incubator-iceberg/issues/317

For now we have added a sort spec external to Iceberg and made it work by
adding logic that sorts the DataFrame before writing to the Iceberg table
(it's a hack until the above issue is resolved).

I am trying to see whether I can use the sorted data to somehow hint to the
join operation that the data is presorted.

The V1 datasource can pass a bucketSpec, and Hive and Spark bucketed
tables use this feature so that a join can use a sort-merge join with
no additional sort step.

class HadoopFsRelation(
    location: FileIndex,
    partitionSchema: StructType,
    dataSchema: StructType,
    bucketSpec: Option[BucketSpec],
    fileFormat: FileFormat,
    options: Map[String, String])(val sparkSession: SparkSession)
  extends BaseRelation with FileRelation
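
For context, here is a minimal sketch of how the V1 bucketed-table path is
usually exercised from the DataFrame API. It is an illustration, not code
from this thread: the table names (sketch_t1, sketch_t2), the sample data,
the bucket count of 8, and the temporary warehouse directory are all
assumptions. It assumes a local Spark build on the classpath.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

object BucketedJoinSketch {
  // Writes two bucketed, sorted V1 tables, joins them, and returns the row count.
  def run(): Long = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("bucketed-join-sketch")
      // Assumption: a throwaway warehouse directory so reruns start clean.
      .config("spark.sql.warehouse.dir", Files.createTempDirectory("sketch-wh").toString)
      .getOrCreate()
    import spark.implicits._

    // Disable broadcast joins so the planner has to consider a sort-merge join.
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

    // Hypothetical sample data sharing the join key col1.
    val t1 = (1 to 1000).map(i => (i, s"a$i")).toDF("col1", "a")
    val t2 = (501 to 1500).map(i => (i, s"b$i")).toDF("col1", "b")

    // bucketBy/sortBy record a bucket spec in the catalog; both sides use 8 buckets.
    t1.write.bucketBy(8, "col1").sortBy("col1").mode("overwrite").saveAsTable("sketch_t1")
    t2.write.bucketBy(8, "col1").sortBy("col1").mode("overwrite").saveAsTable("sketch_t2")

    val joined = spark.table("sketch_t1").join(spark.table("sketch_t2"), "col1")

    // Print the physical plan to look for Exchange and Sort nodes around the join.
    joined.explain()

    val rows = joined.count()
    spark.stop()
    rows
  }
}
```

Whether the Exchange or Sort nodes actually disappear from the printed plan
depends on the Spark version and on how the files were written, so the plan
should be inspected rather than assumed.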


Has anyone on this list looked into the V2 API and how a similar hint can be
passed? I can work on a proof-of-concept PR for the sort spec, but I am
not able to find support for a sort spec in the V2 API.

I also tried another hack using the following code. It does appear to use
SortMergeJoin, but for some reason the sort within partitions takes
too long (assuming Spark uses TimSort, I was expecting it to be a no-op).

val df1 = readIcebergTable("table1").sortWithinPartitions(col("col1")).cache()

val df2 = readIcebergTable("table2").sortWithinPartitions(col("col1")).cache()

val finalDF = df1.join(df2, df1("col1") === df2("col1"))


Any suggestions to make join work without additional sort?


--
Thanks

Re: passing clustering spec to datasource v2

Posted by suds <su...@gmail.com>.
Thanks for the reply, Anton.

I was looking more into the Spark part. I see that the V1 API has support for
adding a sort spec, but I can't find a similar API in V2. Can you give me any
pointers on how to add sort spec support to the Iceberg datasource?

On Wed, Dec 4, 2019 at 5:15 AM Anton Okolnychyi
<ao...@apple.com.invalid> wrote:

> Hi,
>
> Bucketed joins are on the roadmap. I think [1] gives a pretty good summary
> of how that should look.
> I believe the only remaining part in Iceberg is to add the sort spec (in
> progress). Then we can switch to the Spark part.
>
> -- Anton
>
> [1] -
> https://github.com/apache/incubator-iceberg/issues/430#issuecomment-533360026
>
>

Re: passing clustering spec to datasource v2

Posted by Ryan Blue <rb...@netflix.com.INVALID>.
The link from Anton does have a good description of the Iceberg side. And I
have a little more detail on the Spark side now.

Spark has _very_ limited support for bucketed joins. The only thing it can
do is avoid a shuffle for data that was written out using Spark's internal
hash partitioning function, by gathering all files for a particular hash
into a partition. If the number of buckets doesn't match the join
parallelism, it doesn't work, and it can't skip the sort-merge join's sort
phase. So Spark is going to need quite a bit of work to support bucketed
joins. I'm hoping that we can get it working in Spark 3.1.
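
To make the "gather all files for a particular hash into a partition" idea
concrete, here is a small, Spark-free sketch. It is only an illustration:
BucketSketch, bucketOf, and bucketedJoin are hypothetical names, and
MurmurHash3.stringHash is a stand-in for Spark's internal hash partitioning
function, not the same function.

```scala
import scala.util.hashing.MurmurHash3

object BucketSketch {
  // Hypothetical stand-in for Spark's internal hash partitioning function.
  def bucketOf(key: Int, numBuckets: Int): Int = {
    val h = MurmurHash3.stringHash(key.toString)
    ((h % numBuckets) + numBuckets) % numBuckets // non-negative modulo
  }

  // Group keys by bucket id, as if each bucket were one set of files.
  def bucketize(keys: Seq[Int], numBuckets: Int): Map[Int, Seq[Int]] =
    keys.groupBy(bucketOf(_, numBuckets))

  // Join bucket i of the left side only with bucket i of the right side.
  // No data moves between buckets, which is the "no shuffle" case.
  def bucketedJoin(left: Seq[Int], right: Seq[Int], numBuckets: Int): Set[Int] = {
    val l = bucketize(left, numBuckets)
    val r = bucketize(right, numBuckets)
    (0 until numBuckets).flatMap { b =>
      l.getOrElse(b, Seq.empty).intersect(r.getOrElse(b, Seq.empty))
    }.toSet
  }
}
```

Because both sides use the same function and the same bucket count, equal
keys always land in the same bucket id, so pairing bucket i with bucket i
finds every match. If the two sides were bucketed with different counts,
the same key could land in different bucket ids and this pairing would no
longer be valid.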

-- 
Ryan Blue
Software Engineer
Netflix

Re: passing clustering spec to datasource v2

Posted by Anton Okolnychyi <ao...@apple.com.INVALID>.
Hi,

Bucketed joins are on the roadmap. I think [1] gives a pretty good summary of how that should look.
I believe the only remaining part in Iceberg is to add the sort spec (in progress). Then we can switch to the Spark part.

-- Anton

[1] - https://github.com/apache/incubator-iceberg/issues/430#issuecomment-533360026

