Posted to dev@iceberg.apache.org by Anton Okolnychyi <ao...@apple.com.INVALID> on 2020/08/01 00:17:14 UTC

Iceberg community sync notes - 29 July 2020

Hey everyone,

Here are my notes from the last sync. Feel free to add/correct.

Conferences

There are three talks on Iceberg at the Dremio conference.
- "The Future of Intelligent Storage in Big Data" by Dan- "Hiveberg: Integrating Apache Iceberg with the Hive Metastore" by Adrian and Christine
- "Lessons learned from running Apache Iceberg at PB scale" by Anton

Hive integration

Adrien: Found a bug when an MR job is launched in distributed mode; @guilload and @rdsr are taking a look at it and will propose a fix soon.
Adrien: It is hard to work with large tables as predicate push-down is not working yet. Waiting for a PR from @cmathiesen and @massdosage.

Flink integration

Junjie: There is some progress on the Flink sink and the work is split into smaller PRs that are being merged into master.
Kyle: I’ll be interested to review.

Row-level deletes

Anton: Most of the work for core metadata is done. We have delete manifests, sequence numbers, updated manifest lists.
Junjie: There is progress on readers to project metadata columns like row position in Avro, Parquet, ORC.
Anton: I was supposed to start working on the two-phase job planning approach but was distracted by other things. I plan to resume looking into that.
Anton: It seems like the points raised by @openinx in the CDC pipelines doc must be resolved before moving on with any implementation.

Could not get more details as neither Ryan nor Zheng was present.

CDC open questions: https://docs.google.com/document/d/1bBKDD4l-pQFXaMb4nOyVK-Sl3N2NTTG37uOCQx8rKVc
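
To make Junjie's point about metadata columns a bit more concrete, here is a minimal sketch of what projecting the row position could look like, assuming the MetadataColumns helper the readers are being wired up to honor (the exact API may change as those PRs land, and the "id" column is just a hypothetical data field):

import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Schema;

class RowPositionProjection {
  // Build a projection that mixes a regular data column with the reserved row-position
  // metadata column. Readers fill the metadata column with each row's ordinal position
  // in its data file, which is what position-based deletes refer to.
  static Schema withRowPosition(Schema tableSchema) {
    return new Schema(
        tableSchema.findField("id"),     // hypothetical data column from the table schema
        MetadataColumns.ROW_POSITION);   // reserved _pos metadata column, populated by the reader
  }
}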

SQL extensions

Anton: Thanks everyone for the feedback. It looks like we almost have consensus on what that should look like. There is one open question raised by Carl.
Carl: How will the currently proposed approach that relies on stored procedures work with role-based access control? Presto has support for this.
Anton: We can limit access to stored procedures, but I don’t know how we can limit calling a stored procedure on a particular table if the table name is passed as an argument.
Carl: It feels easier with ALTER TABLE syntax.
Carl: It is better to follow up with the Presto community on this.
Anton: Agreed. It is a blocker for moving forward.
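
For illustration only, here is roughly how the two flavors look from Spark. The procedure name, arguments, and ALTER TABLE syntax below are hypothetical placeholders for the options being discussed, not the proposed API, and both statements assume the proposed extensions plus a configured 'prod' catalog:

import org.apache.spark.sql.SparkSession;

public class SqlExtensionSyntaxSketch {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("iceberg-sql-extensions-sketch")
        .getOrCreate();

    // Stored-procedure flavor (current proposal): the target table is an argument string,
    // which is what makes table-level RBAC checks hard for engines to enforce.
    spark.sql("CALL prod.system.expire_snapshots('db.events', TIMESTAMP '2020-07-01 00:00:00')");

    // ALTER TABLE flavor (Carl's suggestion): the table is named in the statement itself,
    // so existing table-level privileges apply naturally.
    spark.sql("ALTER TABLE prod.db.events EXPIRE SNAPSHOTS OLDER THAN TIMESTAMP '2020-07-01 00:00:00'");

    spark.stop();
  }
}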

Dev list discussion: https://lists.apache.org/thread.html/rb3321727198d65246ec9eb0f938b121ec6fe5dd0face0b2fb899996a%40%3Cdev.iceberg.apache.org%3E

SQL extensions doc: https://docs.google.com/document/d/1Nf8c16R2hj4lSc-4sQg4oiUUV_F4XqZKth1woEo6TN8

Vectorized reads for Parquet

Anton: Cannot use vectorized reads for tables with identity partitions.
Russell: Working on a fix.

ExpireSnapshotsAction

Russell: Working on an action for expiring snapshots as the current solution is slow for expiring a lot of snapshots. The work is split into multiple PRs that are being merged to master.
Ratandeep: We also face this problem. We will help to review.
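
For context, here is a minimal sketch of the existing single-process path that the new action is meant to speed up. This uses the core table API (not Russell's action); the expensive part is that unreachable manifest and data files are deleted from the calling process, which the distributed Spark action moves into a parallel job.

import org.apache.iceberg.Table;

class ExpireOldSnapshots {
  // Expire all snapshots older than the given timestamp while keeping at least the last N.
  // With the core API, file cleanup happens in the caller, which is what becomes slow
  // when many snapshots are expired at once.
  static void expire(Table table, long olderThanMillis, int minSnapshotsToKeep) {
    table.expireSnapshots()
        .expireOlderThan(olderThanMillis)
        .retainLast(minSnapshotsToKeep)
        .commit();
  }
}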

Secondary indexes

Miao: Working on a doc for secondary indexes in Iceberg. The solution should be able to support multiple index implementations and should be independent of file formats.
Miao: We have a Bloom filter implementation internally.
Anton: Do you keep a Bloom filter per file?
Miao: Yes.
Anton: Do you store it separately so it can be loaded on demand?
Miao: Yes.
Anton: Bloom filters are too big to be incorporated into the metadata directly, but it would be great to be able to load some of them on demand and use them during job planning. One of the use cases we were looking to solve is speeding up queries with predicates on the sort key. Right now, we do min/max file pruning, but if you have 10-20 possible keys you are looking for (and they cover the full range of values), filtering is not very effective. We want to leverage Bloom filters for this task and avoid touching data files completely. Right now, we still have to read a dictionary before we can discard a file, and that takes about 1 second per false-positive data file since query engines have to spin up a task.
Kyle: We have use cases where people are looking for way more than 20 keys.
Anton: We may need a very large Bloom filter to get an acceptable false-positive ratio when looking for a large number of keys at the same time. At least 10-20 would be a great start.
Xinli: It would be great to leverage existing Bloom filters for file formats that support them (e.g. Parquet, ORC).
Anton: Can we derive a Bloom filter per file based on Bloom filters for row groups?
Miao: Depends on Bloom filter implementation.
Anton: Is it better to keep a list of Bloom filters (one per row group) or a single one per file in the metadata?
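
A minimal sketch of the pruning idea discussed above, using Guava Bloom filters purely as a stand-in (the doc Miao is writing is implementation-agnostic, so none of this is the proposed design): one filter per data file over the sort key, consulted at planning time before any data or dictionary pages are read.

import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;
import java.nio.charset.StandardCharsets;
import java.util.List;

class SortKeyFileIndex {

  // Build one filter per data file over its sort-key values (stored separately from the
  // core metadata and loaded on demand, as discussed above).
  static BloomFilter<CharSequence> buildFileFilter(List<String> sortKeyValues) {
    BloomFilter<CharSequence> filter = BloomFilter.create(
        Funnels.stringFunnel(StandardCharsets.UTF_8),
        sortKeyValues.size(),
        0.01);                       // ~1% false-positive rate per probed key
    sortKeyValues.forEach(filter::put);
    return filter;
  }

  // Planning-time check: keep the file only if some probed key might be present. With k probed
  // keys and per-key false-positive rate p, the chance of keeping a file that contains none of
  // them is roughly 1 - (1 - p)^k, which is why very large key sets need much larger filters.
  static boolean mightContainAny(BloomFilter<CharSequence> fileFilter, List<String> probedKeys) {
    return probedKeys.stream().anyMatch(fileFilter::mightContain);
  }

  // Anton's merge question: Guava can fold row-group filters into a file-level filter with
  // putAll, but only when the filters are compatible (same funnel, size, and hash count),
  // which is why the answer above is "depends on the Bloom filter implementation".
  static BloomFilter<CharSequence> mergeRowGroupFilters(List<BloomFilter<CharSequence>> rowGroupFilters) {
    BloomFilter<CharSequence> merged = rowGroupFilters.get(0).copy();
    for (int i = 1; i < rowGroupFilters.size(); i++) {
      merged.putAll(rowGroupFilters.get(i));
    }
    return merged;
  }
}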

Sort spec

Anton: I have submitted a proposal for Spark that should allow data sources to request a specific distribution and ordering on write. Iceberg SortSpec should be based on that. Any feedback on that proposal would help.

Spark proposal: https://docs.google.com/document/d/1X0NsQSryvNmXBY9kcvfINeYyKC-AahZarUqg3nS1GQs
Spark PR: https://github.com/apache/spark/pull/29066

Data Compaction

Anton: I am going to submit a new proposal that will be a follow-up to the SQL extensions doc. It should cover sort-based data compaction in addition to bin-packing.


Thanks,
Anton

Re: Iceberg community sync notes - 29 July 2020

Posted by OpenInx <op...@gmail.com>.
Sorry that I missed the community online sync.

> Flink integration

Let me provide more details about the Flink integration progress. The work
we've done:
1. The data type conversion between Flink and Iceberg.
2. We currently have Parquet/Avro row readers and writers, but since we've
upgraded Flink to 1.11, which changed its data type to RowData, we
(Junjie -> Parquet, Jingsong -> Avro, Zheng Hu -> ORC) are refactoring them
to support Parquet/Avro/ORC RowData readers and writers.
3. We've abstracted the partitioned and unpartitioned task writers so that
Flink, Spark, and other compute engines can share them in the write path.
4. Jingsong has committed a Flink catalog implementation, similar to the
Spark catalog.

The next things we will work on:
1. The Flink sink connector will be composed of two operators. The first is
IcebergStreamWriter, which collects records and emits DataFiles to the
downstream committer; the second is IcebergFileCommitter, which commits the
data files to Iceberg in a transaction periodically. Finishing these two
operators is the first thing we need to do (a minimal sketch of the committer
side follows below).
2. Once we've finished the Flink DataStream Iceberg sink, we will create PRs
to make the Flink Table/SQL integration work.
3. Flink streaming reader / batch reader, etc.
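
To make the two-operator design a bit more concrete, here is a minimal, hypothetical sketch of the committer side using only the Iceberg core API. The real IcebergStreamWriter / IcebergFileCommitter operators in the PRs additionally handle Flink checkpoints, operator state, and failure recovery; this only shows the "collect DataFiles, commit them in one transaction periodically" part.

import java.util.ArrayList;
import java.util.List;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.Table;

class PeriodicFileCommitter {
  private final Table table;
  private final List<DataFile> pending = new ArrayList<>();

  PeriodicFileCommitter(Table table) {
    this.table = table;
  }

  // Called for each completed DataFile emitted by an upstream writer.
  void collect(DataFile file) {
    pending.add(file);
  }

  // Called periodically (on a checkpoint, in the real operator) to commit everything
  // collected so far as a single atomic Iceberg append.
  void commitPending() {
    if (pending.isEmpty()) {
      return;
    }
    AppendFiles append = table.newAppend();
    for (DataFile file : pending) {
      append.appendFile(file);
    }
    append.commit();
    pending.clear();
  }
}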

> Kyle: I’ll be interested to review.

Thanks for taking the time to review those PRs.

> It seems like points raised by @openinx in the CDC pipelines doc must be
resolved before moving on with any implementation.

The key problems described in this document are:
1. Fast ingestion;
2. Acceptable batch read performance;
3. An equivalent stream, so that we can keep eventual consistency between
the source table and the sink table.

In this issue [1], Ryan and I have reached a rough agreement about the
design: we will propose a mix of equality deletes and position deletes to
accomplish those three goals. We may need to spend some time splitting up
these tasks.
1. https://github.com/apache/iceberg/issues/360

Thanks.
