Posted to dev@iceberg.apache.org by Anton Okolnychyi <ao...@apple.com.INVALID> on 2019/05/02 04:42:10 UTC

Re: Reading dataset with stats making lots of network traffic..

Hey Gautam,

Out of my curiosity, did you manage to confirm the root cause of the issue?

P.S. I created [1] so that we can make collection of lower/upper bounds configurable.

Thanks,
Anton

[1] - https://github.com/apache/incubator-iceberg/issues/173

> On 22 Apr 2019, at 09:15, Gautam <ga...@gmail.com> wrote:
> 
> Thanks guys for the insights ..
> 
> > I like Anton's idea to have an optional list of columns for which we keep stats. That would allow us to avoid storing stats for thousands of columns that won't ever be used. Another option here is to add a flag to keep stats only for top-level columns. That's much less configuration for users and probably does the right thing in many cases. Simpler to use but not as fast in all cases is sometimes a good compromise.
> 
> This makes sense to me. It adds a variable that data pipelines can tweak to improve performance. I will add an issue on GitHub to add a stats config/flag. Having said that, I would try to optimize around this, because read patterns are hardly ever known a priori, and adding a column to this list means having to re-write the entire data again. So I'll try the other suggestion, which is parallelizing over multiple manifests. 
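A purely illustrative sketch of the kind of per-column stats switch being discussed; the property names below are hypothetical placeholders (nothing in this thread confirms such settings exist), shown only to make the idea concrete:

```
// Hypothetical table properties: collect only counts by default and opt
// specific query columns back into full lower/upper bounds.
import org.apache.iceberg.Table;

class HypotheticalStatsConfig {
  static void limitBoundsToQueryColumns(Table table) {
    table.updateProperties()
        .set("write.metadata.metrics.default", "counts")        // hypothetical key
        .set("write.metadata.metrics.column.batchId", "full")   // hypothetical key
        .commit();
  }
}
```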
> 
> >  To clarify my comment on changing the storage: the idea is to use separate columns instead of a map and then use a columnar storage format so we can project those columns independently. Avro can't project columns independently. This wouldn't help on the write side and may just cause a lot of seeking on the read side that diminishes the benefits.
> 
> Gotcha.
> 
> > Also, now that we have more details, I think there is a second problem. Because we expect several manifests in a table, we parallelize split planning on manifests instead of splits of manifest files. This planning operation is happening in a single thread instead of in parallel. I think if you split the write across several manifests, you'd improve wall time.
> 
> This might actually be the issue here: this was a test-bench dataset, so the writer job created a single manifest for all the data in the dataset, which isn't really how we will do things in prod. I'll try to create the metadata based on production's expected commit pattern.
> 
> 
> Regarding Iceberg not truncating large bound column values (https://github.com/apache/incubator-iceberg/issues/113): I didn't consider this with our dataset. The current evidence is leaning towards the number of columns and the sheer number of files that the manifest is maintaining, but this is a good thing to look into.
> 
> Thanks again guys. 
> 
> -Gautam.
> 
> 
> 
> 
> 
> 
> 
> On Fri, Apr 19, 2019 at 9:05 AM Ryan Blue <rblue@netflix.com> wrote:
> I like Anton's idea to have an optional list of columns for which we keep stats. That would allow us to avoid storing stats for thousands of columns that won't ever be used. Another option here is to add a flag to keep stats only for top-level columns. That's much less configuration for users and probably does the right thing in many cases. Simpler to use but not as fast in all cases is sometimes a good compromise.
> 
> To clarify my comment on changing the storage: the idea is to use separate columns instead of a map and then use a columnar storage format so we can project those columns independently. Avro can't project columns independently. This wouldn't help on the write side and may just cause a lot of seeking on the read side that diminishes the benefits.
> 
> Also, now that we have more details, I think there is a second problem. Because we expect several manifests in a table, we parallelize split planning on manifests instead of splits of manifest files. This planning operation is happening in a single thread instead of in parallel. I think if you split the write across several manifests, you'd improve wall time.
> 
> On Fri, Apr 19, 2019 at 8:15 AM Anton Okolnychyi <aokolnychyi@apple.com> wrote:
> No, we haven’t experienced it yet. The manifest size is huge in your case. To me, Ryan is correct: it might be either big lower/upper bounds (then truncation will help) or a big number of columns (then collecting lower/upper bounds only for specific columns will help). I think both optimizations are needed and will reduce the manifest size.
> 
> Since you mentioned you have a lot of columns and we collect bounds for nested struct fields, I am wondering if you could revert [1] locally and compare the manifest size.
> 
> [1] - https://github.com/apache/incubator-iceberg/commit/c383dd87a89e35d622e9c458fd711931cbc5e96f
> 
>> On 19 Apr 2019, at 15:42, Gautam <gautamkowshik@gmail.com> wrote:
>> 
>> Thanks for responding, Anton! Do we think the delay is mainly due to lower/upper bound filtering? Have you faced this? I haven't exactly found where the slowness is yet. It's generally due to the stats filtering, but I'm not sure what part of it is causing this much network traffic. There's CloseableIterable, which takes a ton of time on the next() and hasNext() calls. My guess is the expression evaluation on each manifest entry is what's doing it. 
>> 
>> On Fri, Apr 19, 2019 at 1:41 PM Anton Okolnychyi <aokolnychyi@apple.com> wrote:
>> I think we need to have a list of columns for which we want to collect stats and that should be configurable by the user. Maybe, this config should be applicable only to lower/upper bounds. As we now collect stats even for nested struct fields, this might generate a lot of data. In most cases, users cluster/sort their data by a subset of data columns to have fast queries with predicates on those columns. So, being able to configure columns for which to collect lower/upper bounds seems reasonable.
>> 
>>> On 19 Apr 2019, at 08:03, Gautam <gautamkowshik@gmail.com> wrote:
>>> 
>>> >  The length in bytes of the schema is 109M as compared to 687K of the non-stats dataset. 
>>> 
>>> Typo: length in bytes of *manifest*. The schema is the same. 
>>> 
>>> On Fri, Apr 19, 2019 at 12:16 PM Gautam <gautamkowshik@gmail.com> wrote:
>>> Correction, partition count = 4308.
>>> 
>>> > Re: Changing the way we keep stats. Avro is a block splittable format and is friendly with parallel compute frameworks like Spark. 
>>> 
>>> Here I am trying to say that we don't need to change the format to columnar right? The current format is already friendly for parallelization. 
>>> 
>>> thanks.
>>> 
>>> 
>>> 
>>> 
>>> 
>>> On Fri, Apr 19, 2019 at 12:12 PM Gautam <gautamkowshik@gmail.com> wrote:
>>> Ah, my bad. I missed adding in the schema details .. Here are some details on the dataset with stats :
>>> 
>>>  Iceberg Schema Columns : 20
>>>  Spark Schema fields : 20
>>>  Snapshot Summary :{added-data-files=4308, added-records=11494037, changed-partition-count=4308, total-records=11494037, total-data-files=4308}
>>>  Manifest files :1
>>>  Manifest details:
>>>      => manifest file path: adl://[dataset_base_path]/metadata/4bcda033-9df5-4c84-8eef-9d6ef93e4347-m0.avro
>>>      => manifest file length: 109,028,885
>>>      => existing files count: 0
>>>      => added files count: 4308
>>>      => deleted files count: 0
>>>      => partitions count: 4
>>>      => partition fields count: 4
>>> 
>>> Re: Num data files. It has a single manifest keeping track of 4308 files. Total record count is 11.4 million.
>>> 
>>> Re: Columns. You are right that this table has many columns: although it has only 20 top-level columns, the number of leaf columns is in the order of thousands. This schema is heavy on structs (in the thousands) and has deep levels of nesting. I know Iceberg keeps column_sizes, value_counts, null_value_counts for all leaf fields and additionally lower-bounds, upper-bounds for native/struct types (not yet for map KVs and arrays). The length in bytes of the schema is 109M as compared to 687K for the non-stats dataset. 
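A quick back-of-the-envelope check (the leaf-field count below is an assumption; only the manifest length and file count come from this thread) suggests the 109M figure is plausible even without oversized bound values:

```
// ~109 MB over 4308 data files is ~25 KB of stats per DataFile entry; with a
// couple of thousand leaf fields and five stats maps per field, that is only
// on the order of 10 bytes per field per file.
public class ManifestSizeEstimate {
  public static void main(String[] args) {
    long manifestBytes = 109_028_885L;
    long dataFiles = 4_308L;
    long assumedLeafFields = 2_000L;                  // assumption: "in the thousands"
    long bytesPerFile = manifestBytes / dataFiles;    // ~25,309
    long bytesPerFieldPerFile = bytesPerFile / assumedLeafFields;  // ~12
    System.out.println(bytesPerFile + " bytes/file, ~" + bytesPerFieldPerFile
        + " bytes per leaf field per file");
  }
}
```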
>>> 
>>> Re: Turning off stats. I am looking to keep stats because, for our datasets with a much larger number of data files, we want to leverage Iceberg's ability to skip entire files based on these stats. This is one of the big incentives for us to use Iceberg. 
>>> 
>>> Re: Changing the way we keep stats. Avro is a block-splittable format and is friendly with parallel compute frameworks like Spark. So would it make sense, for instance, to add an option to have a Spark job / Futures handle split planning? In a larger context, 109M is not that much metadata, given that Iceberg is meant for datasets where the metadata itself is big-data scale. I'm curious how folks with larger metadata (in the GBs) are optimizing this today. 
>>> 
>>> 
>>> Cheers,
>>> -Gautam.
>>> 
>>>  
>>> 
>>> 
>>> On Fri, Apr 19, 2019 at 12:40 AM Ryan Blue <rblue@netflix.com.invalid> wrote:
>>> Thanks for bringing this up! My initial theory is that this table has a ton of stats data that you have to read. That could happen in a couple of cases.
>>> 
>>> First, you might have large values in some columns. Parquet will suppress its stats if values are larger than 4k and those are what Iceberg uses. But that could still cause you to store two 1k+ objects for each large column (lower and upper bounds). With a lot of data files, that could add up quickly. The solution here is to implement #113 <https://github.com/apache/incubator-iceberg/issues/113> so that we don't store the actual min and max for string or binary columns, but instead a truncated value that is just above or just below.
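A minimal sketch of the truncation idea behind #113 (illustrative only, operating on Java chars rather than being the real implementation): a prefix of the minimum is still a valid lower bound, and a prefix of the maximum with its last character bumped is still a valid upper bound.

```
class BoundTruncationSketch {
  // Any prefix sorts <= the full value, so it remains a valid lower bound.
  static String truncateLower(String min, int len) {
    return min.length() <= len ? min : min.substring(0, len);
  }

  // Truncate, then increment the last character so the result sorts above
  // every value the original upper bound covered.
  static String truncateUpper(String max, int len) {
    if (max.length() <= len) {
      return max;
    }
    char[] prefix = max.substring(0, len).toCharArray();
    for (int i = prefix.length - 1; i >= 0; i--) {
      if (prefix[i] < Character.MAX_VALUE) {
        prefix[i]++;
        return new String(prefix, 0, i + 1);
      }
    }
    return max;  // cannot shorten safely; keep the full value
  }

  public static void main(String[] args) {
    System.out.println(truncateUpper("abcdefghijklmnop", 3));  // prints: abd
  }
}
```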
>>> 
>>> The second case is when you have a lot of columns. Each column stores both a lower and upper bound, so 1,000 columns could easily take 8k per file. If this is the problem, then maybe we want to have a way to turn off column stats. We could also think of ways to change the way stats are stored in the manifest files, but that only helps if we move to a columnar format to store manifests, so this is probably not a short-term fix.
>>> 
>>> If you can share a bit more information about this table, we can probably tell which one is the problem. I'm guessing it is the large values problem.
>>> 
>>> On Thu, Apr 18, 2019 at 11:52 AM Gautam <gautamkowshik@gmail.com> wrote:
>>> Hello folks, 
>>> 
>>> I have been testing Iceberg reading with and without stats built into the Iceberg dataset manifest and found that there's a huge jump in network traffic with the latter.
>>> 
>>> 
>>> In my test I am comparing two Iceberg datasets, both written in Iceberg format: one with and the other without stats collected in the Iceberg manifests. In particular, the difference between the writers used for the two datasets is this PR: https://github.com/apache/incubator-iceberg/pull/63/files, which uses Iceberg's writers for writing Parquet data. I captured tcpdump from query scans run on these two datasets. The partition being scanned contains 1 manifest, 1 Parquet data file and ~3700 rows in both datasets. There's a 30x jump in network traffic to the remote filesystem (ADLS) when I switch to the stats-based Iceberg dataset. Both queries used the same Iceberg reader code to access both datasets. 
>>> 
>>> ```
>>> root@d69e104e7d40:/usr/local/spark#  tcpdump -r iceberg_geo1_metrixx_qc_postvalues_batch_query.pcap | grep perfanalysis.adlus15.projectcabostore.net | grep ">" | wc -l
>>> reading from file iceberg_geo1_metrixx_qc_postvalues_batch_query.pcap, link-type EN10MB (Ethernet)
>>> 
>>> 8844
>>> 
>>> 
>>> root@d69e104e7d40:/usr/local/spark# tcpdump -r iceberg_scratch_pad_demo_11_batch_query.pcap | grep perfanalysis.adlus15.projectcabostore.net | grep ">" | wc -l
>>> reading from file iceberg_scratch_pad_demo_11_batch_query.pcap, link-type EN10MB (Ethernet)
>>> 
>>> 269708
>>> 
>>> ```
>>> 
>>> As a consequence of this, the query response times are affected drastically (illustrated below). I must confess that I am on a slow internet connection via VPN to the remote FS. But the dataset without stats took just 1m 49s, while the dataset with stats took 26m 48s to read the same-sized data. Most of that time for the latter dataset was spent in split planning (manifest reading and stats evaluation).
>>> 
>>> ```
>>> all=> select count(*)  from iceberg_geo1_metrixx_qc_postvalues where batchId = '4a6f95abac924159bb3d7075373395c9';
>>>  count(1)
>>> ----------
>>>      3627
>>> (1 row)
>>> Time: 109673.202 ms (01:49.673)
>>> 
>>> all=>  select count(*) from iceberg_scratch_pad_demo_11  where _ACP_YEAR=2018 and _ACP_MONTH=01 and _ACP_DAY=01 and batchId = '6d50eeb3e7d74b4f99eea91a27fc8f15';
>>>  count(1)
>>> ----------
>>>      3808
>>> (1 row)
>>> Time: 1608058.616 ms (26:48.059)
>>> 
>>> ```
>>> 
>>> Has anyone faced this? I'm wondering if there's some caching or parallelism option here that can be leveraged. Would appreciate some guidance. If there isn't a straightforward fix and others feel this is an issue, I can raise an issue and look into it further. 
>>> 
>>> 
>>> Cheers,
>>> -Gautam.
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> -- 
>>> Ryan Blue
>>> Software Engineer
>>> Netflix
>> 
> 
> 
> 
> -- 
> Ryan Blue
> Software Engineer
> Netflix


Re: Reading dataset with stats making lots of network traffic..

Posted by Anton Okolnychyi <ao...@apple.com.INVALID>.
Yeah, the moment I sent my email I realized that the table was partitioned by another column :) My bad, it works correctly and filters out manifests.

> On 2 May 2019, at 16:12, Ryan Blue <rb...@netflix.com> wrote:
> 
> Is "id" a partition column? The inclusive manifest evaluator will use an inclusive projection of the filter, which is true for all non-partition column predicates.
> 
> On Thu, May 2, 2019 at 4:08 PM Anton Okolnychyi <aokolnychyi@apple.com> wrote:
> Hm, but why don’t we filter out manifests using stats for partition columns? Is there a bug in InclusiveManifestEvaluator? I just tested it locally. InclusiveManifestEvaluator gets '(not_null(ref(name="id")) and ref(name="id") == 1)' as rowFilter, which is transformed into ‘true’ after that expression is bound. Apparently, the bound expression doesn’t allow us to filter any manifests.
>  
> 
>> On 2 May 2019, at 15:33, Ryan Blue <rblue@netflix.com> wrote:
>> 
>> Yes, the append code tries to keep the existing order. That way, if you have a time-based append pattern, it just works. Similarly, if you've spent the time to optimize the order, the append doesn't rewrite and ruin it.
>> 
>> Sounds like you just need to sort the file list by partition to group as much as possible, then append. We do this when we convert tables to Iceberg to start with a good split across manifests.
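A rough sketch of that suggestion (assuming an existing Table handle and a list of DataFiles to add; comparing by the partition's string form is just a simple grouping key for the sketch):

```
import java.util.Comparator;
import java.util.List;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.Table;

class SortedAppend {
  // Sort by partition before appending so files for the same partition tend
  // to land in the same manifest and manifest-level pruning can kick in.
  static void appendSortedByPartition(Table table, List<DataFile> files) {
    files.sort(Comparator.comparing((DataFile f) -> f.partition().toString()));
    AppendFiles append = table.newAppend();
    files.forEach(append::appendFile);
    append.commit();
  }
}
```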
>> 
>> On Thu, May 2, 2019 at 3:17 PM Gautam <gautamkowshik@gmail.com> wrote:
>> Ah, looks like MergingSnapshotUpdate.mergeGroup() has the relevant logic. It preserves the natural order of manifests, so I guess it groups based on when manifests were created, and the answer is whatever order the commits were done in. If batches within multiple days were committed out of order, then a manifest could end up with multiple days. 
>> 
>> 
>> On Thu, May 2, 2019 at 2:23 PM Gautam <gautamkowshik@gmail.com> wrote:
>> Ok, thanks for the tip on not having to be tied to a hierarchical partition spec. 
>> 
>> Although this still doesn't explain why all the manifests are scanned, because it should be pruning the list of manifests and it's not. Is my understanding correct that the manifest grouping might be re-shuffling the days, so a query on 1 day might map to all manifests? Does manifest merging optimize for partition boundaries, or is it based on the manifests' natural order?
>> 
>> On Thu, May 2, 2019 at 2:06 PM Ryan Blue <rblue@netflix.com> wrote:
>> You also don't need to use year, month, and day. You can just use day.
>> 
>> The time-based partition functions all produce ordinals, not local values: month(Jan 1970) = 0 and month(Jan 1972) = 24. Same thing with day and hour. In fact, I should open a PR to throw an exception when there are duplicate partition functions...
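A sketch of that suggestion (the timestamp column name "ts" is made up here; the thread only shows identity YEAR/MONTH/DAY columns): a single day() transform replaces the three identity fields, because day() already produces an ordered ordinal.

```
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

public class DailyPartitionSpec {
  public static void main(String[] args) {
    Schema schema = new Schema(
        Types.NestedField.required(1, "ts", Types.TimestampType.withZone()),
        Types.NestedField.required(2, "batchId", Types.StringType.get()));

    PartitionSpec spec = PartitionSpec.builderFor(schema)
        .day("ts")              // days since 1970-01-01, an ordinal like month()/hour()
        .identity("batchId")
        .build();
    System.out.println(spec);
  }
}
```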
>> 
>> On Thu, May 2, 2019 at 1:52 PM Gautam <gautamkowshik@gmail.com> wrote:
>> FYI, the test partition spec is:
>> [
>>   YEAR: identity(21)
>>   MONTH: identity(22)
>>   DAY: identity(23)
>>   batchId: identity(24)
>> ]
>> 
>> 
>> 
>> On Thu, May 2, 2019 at 1:46 PM Gautam <gautamkowshik@gmail.com> wrote:
>> > Using those, you should be able to control parallelism. If you want to test with 4,000, then you can set the min count to 5,000 so Iceberg won’t compact manifests.
>> 
>> This is helpful. Thanks for the pointer on increasing parallelism. Will try this out. So that I understand the behaviour: if a different dataset has >=5000 batches, then the resultant # manifests would be (total_num_batches % 5000)? 
>> 
>> > What surprises me is that you’re not getting much benefit from filtering out manifests that aren’t helpful. We see a lot of benefit from it. 
>> 
>> Pardon the verbose example, but I think it'll help explain what I'm seeing. 
>> 
>> Regarding manifest filtering: I tested whether partition filters in a SQL query actually reduce the manifests being inspected. In my example, I have 16 manifests that point to 4000 batch partitions (each file is restricted to one partition, as we're using physical partitioning in the table). So when querying for .. WHERE batchId = 'xyz' .. at most 1 manifest should be read, because 1 batch == 1 file, which should be tracked by 1 manifest (among the 16), right? But I see that all 16 are being inspected in BaseTableScan.planFiles(). Correct me if I'm wrong, but it's this call [1] that should be giving me the manifests that match a partition. When I inspect this, it says `matchingManifests = 16`, which is all the manifests in the table. This could be due to the fact that our batch ids are random UUIDs, so lower/upper bounds may not help because there's no inherent ordering amongst batches. 
>> But then I tried year = 2019 and month = 01 and day = 01, which also scanned all manifests. Could this be due to the way Iceberg manifests are re-grouped and merged? If so, shouldn't re-grouping honour partition boundaries and optimize for it?
>> 
>> 
>> Cheers,
>> -Gautam.
>> 
>> [1] - https://github.com/apache/incubator-iceberg/blob/master/core/src/main/java/org/apache/iceberg/BaseTableScan.java#L173
>>  
>> 
>> On Thu, May 2, 2019 at 12:27 PM Ryan Blue <rblue@netflix.com> wrote:
>> Good questions. Grouping manifests is configurable at the table level. There are 2 settings:
>> 
>> commit.manifest.target-size-bytes defaults to 8MB, this is the target size that Iceberg will compact to
>> commit.manifest.min-count-to-merge defaults to 100, this is the minimum number of files before a compaction is triggered
>> Using those, you should be able to control parallelism. If you want to test with 4,000, then you can set the min count to 5,000 so Iceberg won’t compact manifests.
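A minimal sketch of setting those two properties on an existing table (assuming a Table handle is already loaded); raising the min count above the number of manifests keeps commits from compacting them:

```
import org.apache.iceberg.Table;

class ManifestCompactionSettings {
  static void keepManifestsSeparate(Table table) {
    table.updateProperties()
        .set("commit.manifest.min-count-to-merge", "5000")
        .set("commit.manifest.target-size-bytes", String.valueOf(8 * 1024 * 1024))
        .commit();
  }
}
```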
>> 
>> What surprises me is that you’re not getting much benefit from filtering out manifests that aren’t helpful. We see a lot of benefit from it. You might try sorting the data files by partition before adding them to the table. That will cluster data files in the same partition so you can read fewer manifests.
>> 
>> 
>> On Thu, May 2, 2019 at 12:09 PM Gautam <gautamkowshik@gmail.com> wrote:
>> Hey Anton,
>>             Sorry about the delay on this. Been caught up with some other things. Thanks for raising issue #173. 
>> 
>> So the root cause is indeed the density and size of the schema. I agree the option to configure stats for columns is good, although I'm not fully convinced that this is purely due to lower/upper bounds. For instance, maybe it's just taking a while to iterate over manifest rows and deserialize the DataFile stats on each read? The solution I'm using right now is to parallelize the manifest reading in split planning. We regenerated the Iceberg table with more manifests. Now the code enables the ParallelIterator, which uses a worker pool of threads (1 thread per CPU by default, configurable using 'iceberg.worker.num-threads') to read manifests. 
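A sketch of that setup (assuming the system property takes effect before Iceberg's worker pool is first created; in practice it is safer to pass -Diceberg.worker.num-threads=16 on the JVM command line):

```
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterable;

class ParallelPlanning {
  static CloseableIterable<FileScanTask> plan(Table table) {
    // Must be set before the worker pool is created; prefer the -D JVM flag.
    System.setProperty("iceberg.worker.num-threads", "16");
    return table.newScan()
        .filter(Expressions.equal("batchId", "xyz"))
        .planFiles();  // reads manifests in parallel when there are several
  }
}
```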
>> 
>> On that note, the ability to parallelize is limited by how many manifests are in the table. So as a test, for a table with 4000 files we created one manifest per file (think of one file as a single batch commit in this case). I was hoping to get a parallelism factor of 4000, but Iceberg merges manifests into fewer manifests with each commit, so we instead ended up with 16 manifests. So now split planning is limited to at most 16 units of parallelism. Is this grouping of manifests into fewer ones configurable? If not, should we make it configurable? 
>> 
>> Sorry if this is forking a different conversation. If so, I can start a separate conversation thread on this. 
>> 
>> 
>> 
>> 
>> 
>> 
>> On Wed, May 1, 2019 at 9:42 PM Anton Okolnychyi <aokolnychyi@apple.com> wrote:
>> Hey Gautam,
>> 
>> Out of my curiosity, did you manage to confirm the root cause of the issue?
>> 
>> P.S. I created [1] so that we can make collection of lower/upper bounds configurable.
>> 
>> Thanks,
>> Anton
>> 
>> [1] - https://github.com/apache/incubator-iceberg/issues/173
>> 
> 
> 
> 
> -- 
> Ryan Blue
> Software Engineer
> Netflix


Re: Reading dataset with stats making lots of network traffic..

Posted by Ryan Blue <rb...@netflix.com.INVALID>.
Is "id" a partition column? The inclusive manifest evaluator will use an
inclusive projection of the filter, which is true for all non-partition
column predicates.

On Thu, May 2, 2019 at 4:08 PM Anton Okolnychyi <ao...@apple.com>
wrote:

> Hm, but why don’t we filter out manifests using stats for partition
> columns? Is there a bug in InclusiveManifestEvaluator? I just tested it
> locally. InclusiveManifestEvaluator gets '(not_null(ref(name="id")) and
> ref(name="id") == 1)' as rowFilter, which is transformed into ‘true’ after
> that expression is bound. Apparently, the bound expression doesn’t allow us
> to filter any manifests.
>
>
> On 2 May 2019, at 15:33, Ryan Blue <rb...@netflix.com> wrote:
>
> Yes, the append code tries to keep the existing order. That way, if you
> have a time-based append pattern, it just works. Similarly, if you've spent
> the time to optimize the order, the append doesn't rewrite and ruin it.
>
> Sounds like you just need to sort the file list by partition to group as
> much as possible, then append. We do this when we convert tables to iceberg
> to start with a good split across manifests.
>
> On Thu, May 2, 2019 at 3:17 PM Gautam <ga...@gmail.com> wrote:
>
>> Ah, looks like MergingSnapshotUpdate.mergeGroup() has the relevant logic.
>> It preserves the natural order of manifests, so I guess it groups based
>> on when manifests were created, and the answer is whatever order the commits
>> were done in. If batches within multiple days were committed out of order,
>> then a manifest could end up with multiple days.
>>
>>
>> On Thu, May 2, 2019 at 2:23 PM Gautam <ga...@gmail.com> wrote:
>>
>>> Ok, thanks for the tip on not having to be tied to a hierarchical
>>> partition spec.
>>>
>>> Although this still doesn't explain why all the manifests are scanned,
>>> because it should be pruning the list of manifests and it's not. Is my
>>> understanding correct that the manifest grouping might be re-shuffling up
>>> the days so query on 1 day might map to all manifests even? Does manifest
>>> merging optimize for partition boundaries or is it based on manifest's
>>> natural order?
>>>
>>> On Thu, May 2, 2019 at 2:06 PM Ryan Blue <rb...@netflix.com> wrote:
>>>
>>>> You also don't need to use year, month, and day. You can just use day.
>>>>
>>>> The time-based partition functions all produce ordinals, not local
>>>> values: month(Jan 1970) = 0 and month(Jan 1972) = 24. Same thing with day
>>>> and hour. In fact, I should open a PR to throw an exception when there are
>>>> duplicate partition functions...
>>>>
>>>> On Thu, May 2, 2019 at 1:52 PM Gautam <ga...@gmail.com> wrote:
>>>>
>>>>> FYI .. The test Partition Spec is  :
>>>>> [
>>>>>   YEAR: identity(21)
>>>>>   MONTH: identity(22)
>>>>>   DAY: identity(23)
>>>>>   batchId: identity(24)
>>>>> ]
>>>>>
>>>>>
>>>>>
>>>>> On Thu, May 2, 2019 at 1:46 PM Gautam <ga...@gmail.com> wrote:
>>>>>
>>>>>> > Using those, you should be able to control parallelism. If you want
>>>>>> to test with 4,000, then you can set the min count to 5,000 so Iceberg
>>>>>> won’t compact manifests.
>>>>>>
>>>>>> This is helpful. Thanks for the pointer on increasing parallelism.
>>>>>> Will try this out. So I understand the behaviour, if a different dataset
>>>>>> has >=5000  batches then the resultant # manifests would be
>>>>>> (total_num_batches % 5000 ) ?
>>>>>>
>>>>>> > What surprises me is that you’re not getting much benefit from
>>>>>> filtering out manifests that aren’t helpful. We see a lot of benefit from
>>>>>> it.
>>>>>>
>>>>>> Pardon the verbose example, but I think it'll help explain what I'm
>>>>>> seeing ..
>>>>>>
>>>>>> Regarding manifest filtering:  I tested if partition filters in sql
>>>>>> query actually reduce manifests being inspected. In my example, I have 16
>>>>>> manifests that point to 4000 batch partitions ( each file is restricted to
>>>>>> one partition as we're using physical partitioning in the table ).  So when
>>>>>> querying for .. WHERE  batchId = 'xyz'  .. at most 1 manifest should be
>>>>>> read, because 1 batch == 1 file which should be tracked by 1 manifest (among the
>>>>>> 16), right? But I see that all 16 are being inspected in
>>>>>> BaseTableScan.planFiles().  Correct me if I'm wrong, but it's this call [1]
>>>>>> that should be giving me the manifests that match a partition. When I
>>>>>> inspect this  it says `matchingManifests = 16` ,  which is all the
>>>>>> manifests in the table.  This *could* be due to the fact that our
>>>>>> batch ids are random UUIDs, so lower/upper bounds may not help because there's
>>>>>> no inherent ordering amongst batches.
>>>>>> But then I tried year = 2019 and month = 01 and day = 01, which also
>>>>>> scanned all manifests. Could this be due to the way Iceberg manifests are
>>>>>> re-grouped and merged? If so, shouldn't re-grouping honour partition
>>>>>> boundaries and optimize for it?
>>>>>>
>>>>>>
>>>>>> Cheers,
>>>>>> -Gautam.
>>>>>>
>>>>>> [1] -
>>>>>> https://github.com/apache/incubator-iceberg/blob/master/core/src/main/java/org/apache/iceberg/BaseTableScan.java#L173
>>>>>>
>>>>>>
>>>>>> On Thu, May 2, 2019 at 12:27 PM Ryan Blue <rb...@netflix.com> wrote:
>>>>>>
>>>>>>> Good questions. Grouping manifests is configurable at the table
>>>>>>> level. There are 2 settings:
>>>>>>>
>>>>>>>    - commit.manifest.target-size-bytes defaults to 8MB, this is the
>>>>>>>    target size that Iceberg will compact to
>>>>>>>    - commit.manifest.min-count-to-merge defaults to 100, this is
>>>>>>>    the minimum number of files before a compaction is triggered
>>>>>>>
>>>>>>> Using those, you should be able to control parallelism. If you want
>>>>>>> to test with 4,000, then you can set the min count to 5,000 so Iceberg
>>>>>>> won’t compact manifests.
>>>>>>>
>>>>>>> What surprises me is that you’re not getting much benefit from
>>>>>>> filtering out manifests that aren’t helpful. We see a lot of benefit from
>>>>>>> it. You might try sorting the data files by partition before adding them to
>>>>>>> the table. That will cluster data files in the same partition so you can
>>>>>>> read fewer manifests.
>>>>>>>
>>>>>>> On Thu, May 2, 2019 at 12:09 PM Gautam <ga...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hey Anton,
>>>>>>>>             Sorry bout the delay on this. Been caught up with some
>>>>>>>> other things. Thanks for raising issue#173 .
>>>>>>>>
>>>>>>>> So the root cause is indeed the density and size of the schema.
>>>>>>>> While I agree the option to configure stats for columns is good (although
>>>>>>>> i'm not fully convinced that this is purely due to lower/upper bounds). For
>>>>>>>> instance, maybe it's just taking a while to iterate over manifest rows and
>>>>>>>> deserialize the DataFile stats in each read?  The solution i'm using right
>>>>>>>> now is to parallelize the manifest reading in split planning. We
>>>>>>>> regenerated the Iceberg table with more manifests. Now the code enables the
>>>>>>>> ParallelIterator which uses a worker pool of threads (1 thread per cpu by
>>>>>>>> default, configurable using 'iceberg.worker.num-threads' ) to read
>>>>>>>> manifests.
>>>>>>>>
>>>>>>>> On that note, the ability to parallelize is limited to how many
>>>>>>>> manifests are in the table. So as a test, for a table with 4000 files we
>>>>>>>> created one manifest per file (think of one file as a single batch commit
>>>>>>>> in this case). So I was hoping to get a parallelism factor of 4000. But
>>>>>>>> Iceberg summarizes manifests into fewer manifests with each commit so we
>>>>>>>> instead ended up with 16 manifests. So now split planning is limited to
>>>>>>>> reading at most 16 units of parallelism. Is this grouping of manifests into
>>>>>>>> fewer configurable? if not should we allow making this configurable?
>>>>>>>>
>>>>>>>> Sorry if this is forking a different conversation. If so, I can
>>>>>>>> start a separate conversation thread on this.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Wed, May 1, 2019 at 9:42 PM Anton Okolnychyi <
>>>>>>>> aokolnychyi@apple.com> wrote:
>>>>>>>>
>>>>>>>>> Hey Gautam,
>>>>>>>>>
>>>>>>>>> Out of my curiosity, did you manage to confirm the root cause of
>>>>>>>>> the issue?
>>>>>>>>>
>>>>>>>>> P.S. I created [1] so that we can make collection of lower/upper
>>>>>>>>> bounds configurable.
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Anton
>>>>>>>>>
>>>>>>>>> [1] - https://github.com/apache/incubator-iceberg/issues/173
>>>>>>>>>
>>>>>>>>> On 22 Apr 2019, at 09:15, Gautam <ga...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>> Thanks guys for the insights ..
>>>>>>>>>
>>>>>>>>> > I like Anton's idea to have an optional list of columns for
>>>>>>>>> which we keep stats. That would allow us to avoid storing stats for
>>>>>>>>> thousands of columns that won't ever be used. Another option here is to add
>>>>>>>>> a flag to keep stats only for top-level columns. That's much less
>>>>>>>>> configuration for users and probably does the right thing in many cases.
>>>>>>>>> Simpler to use but not as fast in all cases is sometimes a good compromise.
>>>>>>>>>
>>>>>>>>> This makes sense to me. It adds a variable that data pipelines can
>>>>>>>>> tweak on to improve performance. I will add an issue on Github to add a
>>>>>>>>> stats config/flag. Although, having said that, I would try to optimize
>>>>>>>>> around this coz read patterns are hardly ever known a priori and adding a
>>>>>>>>> column to this list means having to re-write the entire data again. So i'l
>>>>>>>>> try the other suggestion which is parallelizing on multiple manifests.
>>>>>>>>>
>>>>>>>>> >  To clarify my comment on changing the storage: the idea is to
>>>>>>>>> use separate columns instead of a map and then use a columnar storage
>>>>>>>>> format so we can project those columns independently. Avro can't project
>>>>>>>>> columns independently. This wouldn't help on the write side and may just
>>>>>>>>> cause a lot of seeking on the read side that diminishes the benefits.
>>>>>>>>>
>>>>>>>>> Gotcha.
>>>>>>>>>
>>>>>>>>> > Also, now that we have more details, I think there is a second
>>>>>>>>> problem. Because we expect several manifests in a table, we parallelize
>>>>>>>>> split planning on manifests instead of splits of manifest files. This
>>>>>>>>> planning operation is happening in a single thread instead of in parallel.
>>>>>>>>> I think if you split the write across several manifests, you'd improve wall
>>>>>>>>> time.
>>>>>>>>>
>>>>>>>>> This might actually be the issue here, this was a test bench
>>>>>>>>> dataset so the writer job created a single manifest for all the data in the
>>>>>>>>> dataset which isn't really how we will do things in prod. I'l try and
>>>>>>>>> create the metadata based on productions expected commit pattern.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Regarding Iceberg not truncating large bounded column values
>>>>>>>>> https://github.com/apache/incubator-iceberg/issues/113 .. I
>>>>>>>>> didn't consider this with our dataset. The current evidence is leading
>>>>>>>>> towards the number of columns and the sheer number of files that the
>>>>>>>>> manifest is maintaining but this is a good thing to look into.
>>>>>>>>>
>>>>>>>>> Thanks again guys.
>>>>>>>>>
>>>>>>>>> -Gautam.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Fri, Apr 19, 2019 at 9:05 AM Ryan Blue <rb...@netflix.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> I like Anton's idea to have an optional list of columns for which
>>>>>>>>>> we keep stats. That would allow us to avoid storing stats for thousands of
>>>>>>>>>> columns that won't ever be used. Another option here is to add a flag to
>>>>>>>>>> keep stats only for top-level columns. That's much less configuration for
>>>>>>>>>> users and probably does the right thing in many cases. Simpler to use but
>>>>>>>>>> not as fast in all cases is sometimes a good compromise.
>>>>>>>>>>
>>>>>>>>>> To clarify my comment on changing the storage: the idea is to use
>>>>>>>>>> separate columns instead of a map and then use a columnar storage format so
>>>>>>>>>> we can project those columns independently. Avro can't project columns
>>>>>>>>>> independently. This wouldn't help on the write side and may just cause a
>>>>>>>>>> lot of seeking on the read side that diminishes the benefits.
>>>>>>>>>>
>>>>>>>>>> Also, now that we have more details, I think there is a second
>>>>>>>>>> problem. Because we expect several manifests in a table, we parallelize
>>>>>>>>>> split planning on manifests instead of splits of manifest files. This
>>>>>>>>>> planning operation is happening in a single thread instead of in parallel.
>>>>>>>>>> I think if you split the write across several manifests, you'd improve wall
>>>>>>>>>> time.
>>>>>>>>>>
>>>>>>>>>> On Fri, Apr 19, 2019 at 8:15 AM Anton Okolnychyi <
>>>>>>>>>> aokolnychyi@apple.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> No, we haven’t experienced it yet. The manifest size is huge in
>>>>>>>>>>> your case. To me, Ryan is correct: it might be either big lower/upper
>>>>>>>>>>> bounds (then truncation will help) or a big number columns (then collecting
>>>>>>>>>>> lower/upper bounds only for specific columns will help). I think both
>>>>>>>>>>> optimizations are needed and will reduce the manifest size.
>>>>>>>>>>>
>>>>>>>>>>> Since you mentioned you have a lot of columns and we collect
>>>>>>>>>>> bounds for nested struct fields, I am wondering if you could revert [1]
>>>>>>>>>>> locally and compare the manifest size.
>>>>>>>>>>>
>>>>>>>>>>> [1] -
>>>>>>>>>>> https://github.com/apache/incubator-iceberg/commit/c383dd87a89e35d622e9c458fd711931cbc5e96f
>>>>>>>>>>>
>>>>>>>>>>> On 19 Apr 2019, at 15:42, Gautam <ga...@gmail.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>> Thanks for responding Anton! Do we think the delay is mainly due
>>>>>>>>>>> to lower/upper bound filtering? have you faced this? I haven't exactly
>>>>>>>>>>> found where the slowness is yet. It's generally due to the stats filtering
>>>>>>>>>>> but what part of it is causing this much network traffic. There's
>>>>>>>>>>> CloseableIteratable  that takes a ton of time on the next() and hasNext()
>>>>>>>>>>> calls. My guess is the expression evaluation on each manifest entry is
>>>>>>>>>>> what's doing it.
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Apr 19, 2019 at 1:41 PM Anton Okolnychyi <
>>>>>>>>>>> aokolnychyi@apple.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> I think we need to have a list of columns for which we want to
>>>>>>>>>>>> collect stats and that should be configurable by the user. Maybe, this
>>>>>>>>>>>> config should be applicable only to lower/upper bounds. As we now collect
>>>>>>>>>>>> stats even for nested struct fields, this might generate a lot of data. In
>>>>>>>>>>>> most cases, users cluster/sort their data by a subset of data columns to
>>>>>>>>>>>> have fast queries with predicates on those columns. So, being able to
>>>>>>>>>>>> configure columns for which to collect lower/upper bounds seems reasonable.
>>>>>>>>>>>>
>>>>>>>>>>>> On 19 Apr 2019, at 08:03, Gautam <ga...@gmail.com>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> >  The length in bytes of the schema is 109M as compared to
>>>>>>>>>>>> 687K of the non-stats dataset.
>>>>>>>>>>>>
>>>>>>>>>>>> Typo, length in bytes of *manifest*. schema is the same.
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Apr 19, 2019 at 12:16 PM Gautam <
>>>>>>>>>>>> gautamkowshik@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Correction, partition count = 4308.
>>>>>>>>>>>>>
>>>>>>>>>>>>> > Re: Changing the way we keep stats. Avro is a block
>>>>>>>>>>>>> splittable format and is friendly with parallel compute frameworks like
>>>>>>>>>>>>> Spark.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Here I am trying to say that we don't need to change the
>>>>>>>>>>>>> format to columnar right? The current format is already friendly for
>>>>>>>>>>>>> parallelization.
>>>>>>>>>>>>>
>>>>>>>>>>>>> thanks.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, Apr 19, 2019 at 12:12 PM Gautam <
>>>>>>>>>>>>> gautamkowshik@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Ah, my bad. I missed adding in the schema details .. Here are
>>>>>>>>>>>>>> some details on the dataset with stats :
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>  Iceberg Schema Columns : 20
>>>>>>>>>>>>>>  Spark Schema fields : 20
>>>>>>>>>>>>>>  Snapshot Summary :{added-data-files=4308,
>>>>>>>>>>>>>> added-records=11494037, changed-partition-count=4308,
>>>>>>>>>>>>>> total-records=11494037, total-data-files=4308}
>>>>>>>>>>>>>>  Manifest files :1
>>>>>>>>>>>>>>  Manifest details:
>>>>>>>>>>>>>>      => manifest file path:
>>>>>>>>>>>>>> adl://[dataset_base_path]/metadata/4bcda033-9df5-4c84-8eef-9d6ef93e4347-m0.avro
>>>>>>>>>>>>>>      => manifest file length: 109,028,885
>>>>>>>>>>>>>>      => existing files count: 0
>>>>>>>>>>>>>>      => added files count: 4308
>>>>>>>>>>>>>>      => deleted files count: 0
>>>>>>>>>>>>>>      => partitions count: 4
>>>>>>>>>>>>>>      => partition fields count: 4
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Re: Num data files. It has a single manifest keep track of
>>>>>>>>>>>>>> 4308 files. Total record count is 11.4 Million.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Re: Columns. You are right that this table has many columns..
>>>>>>>>>>>>>> although it has only 20 top-level columns,  num leaf columns are in order
>>>>>>>>>>>>>> of thousands. This Schema is heavy on structs (in the thousands) and has
>>>>>>>>>>>>>> deep levels of nesting.  I know Iceberg keeps
>>>>>>>>>>>>>> *column_sizes, value_counts, null_value_counts* for all leaf
>>>>>>>>>>>>>> fields and additionally *lower-bounds, upper-bounds* for
>>>>>>>>>>>>>> native, struct types (not yet for map KVs and arrays).  The length in bytes
>>>>>>>>>>>>>> of the schema is 109M as compared to 687K of the non-stats dataset.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Re: Turning off stats. I am looking to leverage stats coz for
>>>>>>>>>>>>>> our datasets with much larger number of data files we want to leverage
>>>>>>>>>>>>>> iceberg's ability to skip entire files based on these stats. This is one of
>>>>>>>>>>>>>> the big incentives for us to use Iceberg.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Re: Changing the way we keep stats. Avro is a block
>>>>>>>>>>>>>> splittable format and is friendly with parallel compute frameworks like
>>>>>>>>>>>>>> Spark. So would it make sense for instance to have add an option to have
>>>>>>>>>>>>>> Spark job / Futures  handle split planning?   In a larger context, 109M is
>>>>>>>>>>>>>> not that much metadata given that Iceberg is meant for datasets where the
>>>>>>>>>>>>>> metadata itself is Bigdata scale.  I'm curious on how folks with larger
>>>>>>>>>>>>>> sized metadata (in GB) are optimizing this today.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>> -Gautam.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Fri, Apr 19, 2019 at 12:40 AM Ryan Blue <
>>>>>>>>>>>>>> rblue@netflix.com.invalid> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks for bringing this up! My initial theory is that this
>>>>>>>>>>>>>>> table has a ton of stats data that you have to read. That could happen in a
>>>>>>>>>>>>>>> couple of cases.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> First, you might have large values in some columns. Parquet
>>>>>>>>>>>>>>> will suppress its stats if values are larger than 4k and those are what
>>>>>>>>>>>>>>> Iceberg uses. But that could still cause you to store two 1k+ objects for
>>>>>>>>>>>>>>> each large column (lower and upper bounds). With a lot of data files, that
>>>>>>>>>>>>>>> could add up quickly. The solution here is to implement #113
>>>>>>>>>>>>>>> <https://github.com/apache/incubator-iceberg/issues/113> so
>>>>>>>>>>>>>>> that we don't store the actual min and max for string or binary columns,
>>>>>>>>>>>>>>> but instead a truncated value that is just above or just below.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> The second case is when you have a lot of columns. Each
>>>>>>>>>>>>>>> column stores both a lower and upper bound, so 1,000 columns could easily
>>>>>>>>>>>>>>> take 8k per file. If this is the problem, then maybe we want to have a way
>>>>>>>>>>>>>>> to turn off column stats. We could also think of ways to change the way
>>>>>>>>>>>>>>> stats are stored in the manifest files, but that only helps if we move to a
>>>>>>>>>>>>>>> columnar format to store manifests, so this is probably not a short-term
>>>>>>>>>>>>>>> fix.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> If you can share a bit more information about this table, we
>>>>>>>>>>>>>>> can probably tell which one is the problem. I'm guessing it is the large
>>>>>>>>>>>>>>> values problem.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Thu, Apr 18, 2019 at 11:52 AM Gautam <
>>>>>>>>>>>>>>> gautamkowshik@gmail.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hello folks,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I have been testing Iceberg reading with and without stats
>>>>>>>>>>>>>>>> built into Iceberg dataset manifest and found that there's a huge jump in
>>>>>>>>>>>>>>>> network traffic with the latter..
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> In my test I am comparing two Iceberg datasets, both
>>>>>>>>>>>>>>>> written in Iceberg format. One with and the other without stats collected
>>>>>>>>>>>>>>>> in Iceberg manifests. In particular the difference between the writers used
>>>>>>>>>>>>>>>> for the two datasets is this PR:
>>>>>>>>>>>>>>>> https://github.com/apache/incubator-iceberg/pull/63/files which
>>>>>>>>>>>>>>>> uses Iceberg's writers for writing Parquet data. I captured tcpdump from
>>>>>>>>>>>>>>>> query scans run on these two datasets.  The partition being scanned
>>>>>>>>>>>>>>>> contains 1 manifest, 1 parquet data file and ~3700 rows in both datasets.
>>>>>>>>>>>>>>>> There's a 30x jump in network traffic to the remote filesystem (ADLS) when
>>>>>>>>>>>>>>>> i switch to stats based Iceberg dataset. Both queries used the same Iceberg
>>>>>>>>>>>>>>>> reader code to access both datasets.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> ```
>>>>>>>>>>>>>>>> root@d69e104e7d40:/usr/local/spark#  tcpdump -r
>>>>>>>>>>>>>>>> iceberg_geo1_metrixx_qc_postvalues_batch_query.pcap | grep
>>>>>>>>>>>>>>>> perfanalysis.adlus15.projectcabostore.net | grep ">" | wc
>>>>>>>>>>>>>>>> -l
>>>>>>>>>>>>>>>> reading from file
>>>>>>>>>>>>>>>> iceberg_geo1_metrixx_qc_postvalues_batch_query.pcap, link-type EN10MB
>>>>>>>>>>>>>>>> (Ethernet)
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> *8844*
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> root@d69e104e7d40:/usr/local/spark# tcpdump -r
>>>>>>>>>>>>>>>> iceberg_scratch_pad_demo_11_batch_query.pcap | grep
>>>>>>>>>>>>>>>> perfanalysis.adlus15.projectcabostore.net | grep ">" | wc
>>>>>>>>>>>>>>>> -l
>>>>>>>>>>>>>>>> reading from file
>>>>>>>>>>>>>>>> iceberg_scratch_pad_demo_11_batch_query.pcap, link-type EN10MB (Ethernet)
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> *269708*
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> ```
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> As a consequence of this the query response times get
>>>>>>>>>>>>>>>> affected drastically (illustrated below). I must confess that I am on a
>>>>>>>>>>>>>>>> slow internet connection via VPN connecting to the remote FS. But the
>>>>>>>>>>>>>>>> dataset without stats took just 1m 49s while the dataset with stats took
>>>>>>>>>>>>>>>> 26m 48s to read the same sized data. Most of that time in the latter
>>>>>>>>>>>>>>>> dataset was spent split planning in Manifest reading and stats evaluation.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> ```
>>>>>>>>>>>>>>>> all=> select count(*)  from
>>>>>>>>>>>>>>>> iceberg_geo1_metrixx_qc_postvalues where batchId =
>>>>>>>>>>>>>>>> '4a6f95abac924159bb3d7075373395c9';
>>>>>>>>>>>>>>>>  count(1)
>>>>>>>>>>>>>>>> ----------
>>>>>>>>>>>>>>>>      3627
>>>>>>>>>>>>>>>> (1 row)
>>>>>>>>>>>>>>>> *Time: 109673.202 ms (01:49.673)*
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> all=>  select count(*) from iceberg_scratch_pad_demo_11
>>>>>>>>>>>>>>>> where _ACP_YEAR=2018 and _ACP_MONTH=01 and _ACP_DAY=01 and batchId =
>>>>>>>>>>>>>>>> '6d50eeb3e7d74b4f99eea91a27fc8f15';
>>>>>>>>>>>>>>>>  count(1)
>>>>>>>>>>>>>>>> ----------
>>>>>>>>>>>>>>>>      3808
>>>>>>>>>>>>>>>> (1 row)
>>>>>>>>>>>>>>>> *Time: 1608058.616 ms (26:48.059)*
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> ```
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Has anyone faced this? I'm wondering if there's some
>>>>>>>>>>>>>>>> caching or parallelism option here that can be leveraged.  Would appreciate
>>>>>>>>>>>>>>>> some guidance. If there isn't a straightforward fix and others feel this is
>>>>>>>>>>>>>>>> an issue I can raise an issue and look into it further.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>> -Gautam.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>> Ryan Blue
>>>>>>>>>>>>>>> Software Engineer
>>>>>>>>>>>>>>> Netflix
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> Ryan Blue
>>>>>>>>>> Software Engineer
>>>>>>>>>> Netflix
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Ryan Blue
>>>>>>> Software Engineer
>>>>>>> Netflix
>>>>>>>
>>>>>>
>>>>
>>>> --
>>>> Ryan Blue
>>>> Software Engineer
>>>> Netflix
>>>>
>>>
>
> --
> Ryan Blue
> Software Engineer
> Netflix
>
>
>

-- 
Ryan Blue
Software Engineer
Netflix

Re: Reading dataset with stats making lots of network traffic..

Posted by Anton Okolnychyi <ao...@apple.com.INVALID>.
Hm, but why don’t we filter out manifests using stats for partition columns? Is there a bug in InclusiveManifestEvaluator? I just tested it locally. InclusiveManifestEvaluator gets '(not_null(ref(name="id")) and ref(name="id") == 1)' as rowFilter, which is transformed into ‘true’ after that expression is bound. Apparently, the bound expression doesn’t allow us to filter any manifests.
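
A minimal sketch of how that row filter can be projected to a partition predicate with the public expressions API; the schema, column name, and field id below are made up for illustration, and the real evaluation happens inside InclusiveManifestEvaluator during split planning:

```java
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.Projections;
import org.apache.iceberg.types.Types;

public class ManifestFilterSketch {
  public static void main(String[] args) {
    // Toy schema with one identity-partitioned column; the field id is arbitrary.
    Schema schema = new Schema(
        Types.NestedField.required(1, "id", Types.IntegerType.get()));
    PartitionSpec spec = PartitionSpec.builderFor(schema).identity("id").build();

    // The row filter from the example: not_null(id) and id == 1.
    Expression rowFilter = Expressions.and(
        Expressions.notNull("id"),
        Expressions.equal("id", 1));

    // An inclusive projection rewrites the row filter into a partition
    // predicate, which is what manifest filtering compares against the
    // partition summaries stored for each manifest.
    Expression partitionFilter = Projections.inclusive(spec).project(rowFilter);
    System.out.println(partitionFilter);
  }
}
```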
 

> On 2 May 2019, at 15:33, Ryan Blue <rb...@netflix.com> wrote:
> 
> Yes, the append code tries to keep the existing order. That way, if you have a time-based append pattern, it just works. Similarly, if you've spent the time to optimize the order, the append doesn't rewrite and ruin it.
> 
> Sounds like you just need to sort the file list by partition to group as much as possible, then append. We do this when we convert tables to iceberg to start with a good split across manifests.
> 
> On Thu, May 2, 2019 at 3:17 PM Gautam <gautamkowshik@gmail.com <ma...@gmail.com>> wrote:
> Ah, looks like MergingSnapshotUpdate.mergeGroup() has the relevant logic. It preserves the natural order of manifests, so I guess it groups based on when manifests were created; the answer, then, is whatever order the commits were done in. If batches within multiple days were committed out of order, then a manifest could end up with multiple days.
> 
> 
> On Thu, May 2, 2019 at 2:23 PM Gautam <gautamkowshik@gmail.com <ma...@gmail.com>> wrote:
> Ok, thanks for the tip on not having to be tied to a hierarchical partition spec. 
> 
> Although this still doesn't explain why all the manifests are scanned,  coz it should be pruning the list of manifests and it's not. Is my understanding correct that the manifest grouping might be re-shuffling up the days so query on 1 day might map to all manifests even? Does manifest merging optimize for partition boundaries or is it based on manifest's natural order?
> 
> On Thu, May 2, 2019 at 2:06 PM Ryan Blue <rblue@netflix.com <ma...@netflix.com>> wrote:
> You also don't need to use year, month, and day. You can just use day.
> 
> The time-based partition functions all produce ordinals, not local values: month(Jan 1970) = 0 and month(Jan 1972) = 24. Same thing with day and hour. In fact, I should open a PR to throw an exception when there are duplicate partition functions...
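
A minimal sketch of the single day() spec being suggested here, assuming the table has a timestamp column to derive the day from; the column names and field ids are hypothetical:

```java
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

public class DayPartitionSpecSketch {
  public static void main(String[] args) {
    // Hypothetical schema: an event timestamp plus the batch id.
    Schema schema = new Schema(
        Types.NestedField.required(1, "eventTime", Types.TimestampType.withZone()),
        Types.NestedField.required(2, "batchId", Types.StringType.get()));

    // A single day() transform replaces identity YEAR/MONTH/DAY columns; it
    // produces an ordinal (days since 1970-01-01), so values order globally.
    PartitionSpec spec = PartitionSpec.builderFor(schema)
        .day("eventTime")
        .identity("batchId")
        .build();
    System.out.println(spec);
  }
}
```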
> 
> On Thu, May 2, 2019 at 1:52 PM Gautam <gautamkowshik@gmail.com <ma...@gmail.com>> wrote:
> FYI .. The test Partition Spec is  : 
> [
>   YEAR: identity(21)
>   MONTH: identity(22)
>   DAY: identity(23)
>   batchId: identity(24)
> ]
> 
> 
> 
> On Thu, May 2, 2019 at 1:46 PM Gautam <gautamkowshik@gmail.com <ma...@gmail.com>> wrote:
> > Using those, you should be able to control parallelism. If you want to test with 4,000, then you can set the min count to 5,000 so Iceberg won’t compact manifests.
> 
> This is helpful. Thanks for the pointer on increasing parallelism. Will try this out. Just so I understand the behaviour: if a different dataset has >=5000 batches, then the resultant # manifests would be (total_num_batches % 5000)? 
> 
> > What surprises me is that you’re not getting much benefit from filtering out manifests that aren’t helpful. We see a lot of benefit from it. 
> 
> Pardon the verbose example, but I think it'll help explain what I'm seeing ..
> 
> Regarding manifest filtering: I tested whether partition filters in the SQL query actually reduce the manifests being inspected. In my example, I have 16 manifests that point to 4000 batch partitions (each file is restricted to one partition, as we're using physical partitioning in the table). So when querying for .. WHERE batchId = 'xyz' .. at most 1 manifest should be read, coz 1 batch == 1 file which should be tracked by 1 manifest (among the 16), right? But I see that all 16 are being inspected in BaseTableScan.planFiles(). Correct me if I'm wrong, it's this call [1] that should be giving me the manifests that match a partition. When I inspect it, it says `matchingManifests = 16`, which is all the manifests in the table. This could be due to the fact that our batch ids are random UUIDs, so lower/upper bounds may not help coz there's no inherent ordering amongst batches.
> But then I tried year = 2019 and month = 01 and day = 01, which also scanned all manifests. Could this be due to the way Iceberg manifests are re-grouped and merged? If so, shouldn't re-grouping honour partition boundaries and optimize for it?
> 
> 
> Cheers,
> -Gautam.
> 
> [1] - https://github.com/apache/incubator-iceberg/blob/master/core/src/main/java/org/apache/iceberg/BaseTableScan.java#L173 <https://github.com/apache/incubator-iceberg/blob/master/core/src/main/java/org/apache/iceberg/BaseTableScan.java#L173>
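
For reproducing this outside Spark, a rough sketch of the same scan against the Java API; the table location is a placeholder and the predicate mirrors the SQL above:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.io.CloseableIterable;

public class ScanPlanningSketch {
  public static void main(String[] args) throws Exception {
    // Placeholder location; any path the configured FileSystem can reach works.
    Table table = new HadoopTables(new Configuration())
        .load("adl://.../iceberg_scratch_pad_demo_11");

    // Same predicate as the SQL query; planFiles() is where manifests are
    // filtered (the matchingManifests set) and then scanned for data files.
    try (CloseableIterable<FileScanTask> tasks = table.newScan()
        .filter(Expressions.equal("batchId", "xyz"))
        .planFiles()) {
      tasks.forEach(task -> System.out.println(task.file().path()));
    }
  }
}
```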
>  
> 
> On Thu, May 2, 2019 at 12:27 PM Ryan Blue <rblue@netflix.com <ma...@netflix.com>> wrote:
> Good questions. Grouping manifests is configurable at the table level. There are 2 settings:
> 
> commit.manifest.target-size-bytes defaults to 8MB, this is the target size that Iceberg will compact to
> commit.manifest.min-count-to-merge defaults to 100, this is the minimum number of files before a compaction is triggered
> Using those, you should be able to control parallelism. If you want to test with 4,000, then you can set the min count to 5,000 so Iceberg won’t compact manifests.
> 
> What surprises me is that you’re not getting much benefit from filtering out manifests that aren’t helpful. We see a lot of benefit from it. You might try sorting the data files by partition before adding them to the table. That will cluster data files in the same partition so you can read fewer manifests.
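
A quick sketch of setting those two commit.manifest properties on an existing table through the Java API; the table location is a placeholder and the values follow the suggestion above:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Table;
import org.apache.iceberg.hadoop.HadoopTables;

public class ManifestMergeSettingsSketch {
  public static void main(String[] args) {
    // Placeholder table location.
    Table table = new HadoopTables(new Configuration()).load("adl://.../my_table");

    // Raise the merge threshold above the expected manifest count so appends
    // stop compacting manifests; the target size is left at the 8 MB default.
    table.updateProperties()
        .set("commit.manifest.min-count-to-merge", "5000")
        .set("commit.manifest.target-size-bytes", String.valueOf(8 * 1024 * 1024))
        .commit();
  }
}
```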
> 
> 
> On Thu, May 2, 2019 at 12:09 PM Gautam <gautamkowshik@gmail.com <ma...@gmail.com>> wrote:
> Hey Anton,
>             Sorry bout the delay on this. Been caught up with some other things. Thanks for raising issue#173 . 
> 
> So the root cause is indeed the density and size of the schema. I agree the option to configure stats for columns is good, although I'm not fully convinced that this is purely due to lower/upper bounds. For instance, maybe it's just taking a while to iterate over manifest rows and deserialize the DataFile stats on each read? The solution I'm using right now is to parallelize the manifest reading in split planning. We regenerated the Iceberg table with more manifests. Now the code enables the ParallelIterator, which uses a worker pool of threads (1 thread per CPU by default, configurable using 'iceberg.worker.num-threads') to read manifests. 
> 
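
A small sketch of raising that worker pool size, assuming 'iceberg.worker.num-threads' is read as a JVM system property when the shared worker pool is first created, so it has to be set (or passed with -D on the driver) before any planning runs:

```java
public class WorkerPoolSketch {
  public static void main(String[] args) {
    // Must run before the first table scan in this JVM; the shared worker
    // pool sizes itself from this property once, at initialization.
    System.setProperty("iceberg.worker.num-threads", "16");

    // ... build the table scan afterwards; ParallelIterator then reads
    // manifests with 16 worker threads instead of one thread per CPU.
  }
}
```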
> On that note, the ability to parallelize is limited to how many manifests are in the table. So as a test, for a table with 4000 files we created one manifest per file (think of one file as a single batch commit in this case). I was hoping to get a parallelism factor of 4000, but Iceberg merges manifests into fewer manifests with each commit, so we instead ended up with 16 manifests. Now split planning is limited to at most 16 units of parallelism. Is this grouping of manifests into fewer manifests configurable? If not, should we make it configurable? 
> 
> Sorry if this is forking a different conversation. If so, I can start a separate conversation thread on this. 
> 
> 
> 
> 
> 
> 
> On Wed, May 1, 2019 at 9:42 PM Anton Okolnychyi <aokolnychyi@apple.com <ma...@apple.com>> wrote:
> Hey Gautam,
> 
> Out of my curiosity, did you manage to confirm the root cause of the issue?
> 
> P.S. I created [1] so that we can make collection of lower/upper bounds configurable.
> 
> Thanks,
> Anton
> 
> [1] - https://github.com/apache/incubator-iceberg/issues/173 <https://github.com/apache/incubator-iceberg/issues/173>
> 
>> On 22 Apr 2019, at 09:15, Gautam <gautamkowshik@gmail.com <ma...@gmail.com>> wrote:
>> 
>> Thanks guys for the insights ..
>> 
>> > I like Anton's idea to have an optional list of columns for which we keep stats. That would allow us to avoid storing stats for thousands of columns that won't ever be used. Another option here is to add a flag to keep stats only for top-level columns. That's much less configuration for users and probably does the right thing in many cases. Simpler to use but not as fast in all cases is sometimes a good compromise.
>> 
>> This makes sense to me. It adds a variable that data pipelines can tweak on to improve performance. I will add an issue on Github to add a stats config/flag. Although, having said that, I would try to optimize around this coz read patterns are hardly ever known a priori and adding a column to this list means having to re-write the entire data again. So i'l try the other suggestion which is parallelizing on multiple manifests. 
>> 
>> >  To clarify my comment on changing the storage: the idea is to use separate columns instead of a map and then use a columnar storage format so we can project those columns independently. Avro can't project columns independently. This wouldn't help on the write side and may just cause a lot of seeking on the read side that diminishes the benefits.
>> 
>> Gotcha.
>> 
>> > Also, now that we have more details, I think there is a second problem. Because we expect several manifests in a table, we parallelize split planning on manifests instead of splits of manifest files. This planning operation is happening in a single thread instead of in parallel. I think if you split the write across several manifests, you'd improve wall time.
>> 
>> This might actually be the issue here, this was a test bench dataset so the writer job created a single manifest for all the data in the dataset which isn't really how we will do things in prod. I'l try and create the metadata based on productions expected commit pattern.
>> 
>> 
>> Regarding Iceberg not truncating large bounded column values https://github.com/apache/incubator-iceberg/issues/113 <https://github.com/apache/incubator-iceberg/issues/113> .. I didn't consider this with our dataset. The current evidence is leading towards the number of columns and the sheer number of files that the manifest is maintaining but this is a good thing to look into.
>> 
>> Thanks again guys. 
>> 
>> -Gautam.
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> On Fri, Apr 19, 2019 at 9:05 AM Ryan Blue <rblue@netflix.com <ma...@netflix.com>> wrote:
>> I like Anton's idea to have an optional list of columns for which we keep stats. That would allow us to avoid storing stats for thousands of columns that won't ever be used. Another option here is to add a flag to keep stats only for top-level columns. That's much less configuration for users and probably does the right thing in many cases. Simpler to use but not as fast in all cases is sometimes a good compromise.
>> 
>> To clarify my comment on changing the storage: the idea is to use separate columns instead of a map and then use a columnar storage format so we can project those columns independently. Avro can't project columns independently. This wouldn't help on the write side and may just cause a lot of seeking on the read side that diminishes the benefits.
>> 
>> Also, now that we have more details, I think there is a second problem. Because we expect several manifests in a table, we parallelize split planning on manifests instead of splits of manifest files. This planning operation is happening in a single thread instead of in parallel. I think if you split the write across several manifests, you'd improve wall time.
>> 
>> On Fri, Apr 19, 2019 at 8:15 AM Anton Okolnychyi <aokolnychyi@apple.com <ma...@apple.com>> wrote:
>> No, we haven’t experienced it yet. The manifest size is huge in your case. To me, Ryan is correct: it might be either big lower/upper bounds (then truncation will help) or a big number columns (then collecting lower/upper bounds only for specific columns will help). I think both optimizations are needed and will reduce the manifest size.
>> 
>> Since you mentioned you have a lot of columns and we collect bounds for nested struct fields, I am wondering if you could revert [1] locally and compare the manifest size.
>> 
>> [1] - https://github.com/apache/incubator-iceberg/commit/c383dd87a89e35d622e9c458fd711931cbc5e96f <https://github.com/apache/incubator-iceberg/commit/c383dd87a89e35d622e9c458fd711931cbc5e96f>
>> 
>>> On 19 Apr 2019, at 15:42, Gautam <gautamkowshik@gmail.com <ma...@gmail.com>> wrote:
>>> 
>>> Thanks for responding Anton! Do we think the delay is mainly due to lower/upper bound filtering? have you faced this? I haven't exactly found where the slowness is yet. It's generally due to the stats filtering but what part of it is causing this much network traffic. There's CloseableIteratable  that takes a ton of time on the next() and hasNext() calls. My guess is the expression evaluation on each manifest entry is what's doing it. 
>>> 
>>> On Fri, Apr 19, 2019 at 1:41 PM Anton Okolnychyi <aokolnychyi@apple.com <ma...@apple.com>> wrote:
>>> I think we need to have a list of columns for which we want to collect stats and that should be configurable by the user. Maybe, this config should be applicable only to lower/upper bounds. As we now collect stats even for nested struct fields, this might generate a lot of data. In most cases, users cluster/sort their data by a subset of data columns to have fast queries with predicates on those columns. So, being able to configure columns for which to collect lower/upper bounds seems reasonable.
>>> 
>>>> On 19 Apr 2019, at 08:03, Gautam <gautamkowshik@gmail.com <ma...@gmail.com>> wrote:
>>>> 
>>>> >  The length in bytes of the schema is 109M as compared to 687K of the non-stats dataset. 
>>>> 
>>>> Typo, length in bytes of *manifest*. schema is the same. 
>>>> 
>>>> On Fri, Apr 19, 2019 at 12:16 PM Gautam <gautamkowshik@gmail.com <ma...@gmail.com>> wrote:
>>>> Correction, partition count = 4308.
>>>> 
>>>> > Re: Changing the way we keep stats. Avro is a block splittable format and is friendly with parallel compute frameworks like Spark. 
>>>> 
>>>> Here I am trying to say that we don't need to change the format to columnar right? The current format is already friendly for parallelization. 
>>>> 
>>>> thanks.
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> On Fri, Apr 19, 2019 at 12:12 PM Gautam <gautamkowshik@gmail.com <ma...@gmail.com>> wrote:
>>>> Ah, my bad. I missed adding in the schema details .. Here are some details on the dataset with stats :
>>>> 
>>>>  Iceberg Schema Columns : 20
>>>>  Spark Schema fields : 20
>>>>  Snapshot Summary :{added-data-files=4308, added-records=11494037, changed-partition-count=4308, total-records=11494037, total-data-files=4308}
>>>>  Manifest files :1
>>>>  Manifest details:
>>>>      => manifest file path: adl://[dataset_base_path]/metadata/4bcda033-9df5-4c84-8eef-9d6ef93e4347-m0.avro <>
>>>>      => manifest file length: 109,028,885
>>>>      => existing files count: 0
>>>>      => added files count: 4308
>>>>      => deleted files count: 0
>>>>      => partitions count: 4
>>>>      => partition fields count: 4
>>>> 
>>>> Re: Num data files. It has a single manifest keep track of 4308 files. Total record count is 11.4 Million.
>>>> 
>>>> Re: Columns. You are right that this table has many columns.. although it has only 20 top-level columns,  num leaf columns are in order of thousands. This Schema is heavy on structs (in the thousands) and has deep levels of nesting.  I know Iceberg keeps  column_sizes, value_counts, null_value_counts for all leaf fields and additionally lower-bounds, upper-bounds for native, struct types (not yet for map KVs and arrays).  The length in bytes of the schema is 109M as compared to 687K of the non-stats dataset. 
>>>> 
>>>> Re: Turning off stats. I am looking to leverage stats coz for our datasets with much larger number of data files we want to leverage iceberg's ability to skip entire files based on these stats. This is one of the big incentives for us to use Iceberg. 
>>>> 
>>>> Re: Changing the way we keep stats. Avro is a block splittable format and is friendly with parallel compute frameworks like Spark. So would it make sense, for instance, to add an option to have a Spark job / Futures  handle split planning?   In a larger context, 109M is not that much metadata given that Iceberg is meant for datasets where the metadata itself is Bigdata scale.  I'm curious on how folks with larger sized metadata (in GB) are optimizing this today. 
>>>> 
>>>> 
>>>> Cheers,
>>>> -Gautam.
>>>> 
>>>>  
>>>> 
>>>> 
>>>> On Fri, Apr 19, 2019 at 12:40 AM Ryan Blue <rblue@netflix.com.invalid <ma...@netflix.com.invalid>> wrote:
>>>> Thanks for bringing this up! My initial theory is that this table has a ton of stats data that you have to read. That could happen in a couple of cases.
>>>> 
>>>> First, you might have large values in some columns. Parquet will suppress its stats if values are larger than 4k and those are what Iceberg uses. But that could still cause you to store two 1k+ objects for each large column (lower and upper bounds). With a lot of data files, that could add up quickly. The solution here is to implement #113 <https://github.com/apache/incubator-iceberg/issues/113> so that we don't store the actual min and max for string or binary columns, but instead a truncated value that is just above or just below.
>>>> 
>>>> The second case is when you have a lot of columns. Each column stores both a lower and upper bound, so 1,000 columns could easily take 8k per file. If this is the problem, then maybe we want to have a way to turn off column stats. We could also think of ways to change the way stats are stored in the manifest files, but that only helps if we move to a columnar format to store manifests, so this is probably not a short-term fix.
>>>> 
>>>> If you can share a bit more information about this table, we can probably tell which one is the problem. I'm guessing it is the large values problem.
>>>> 
>>>> On Thu, Apr 18, 2019 at 11:52 AM Gautam <gautamkowshik@gmail.com <ma...@gmail.com>> wrote:
>>>> Hello folks, 
>>>> 
>>>> I have been testing Iceberg reading with and without stats built into Iceberg dataset manifest and found that there's a huge jump in network traffic with the latter..
>>>> 
>>>> 
>>>> In my test I am comparing two Iceberg datasets, both written in Iceberg format. One with and the other without stats collected in Iceberg manifests. In particular the difference between the writers used for the two datasets is this PR: https://github.com/apache/incubator-iceberg/pull/63/files <https://github.com/apache/incubator-iceberg/pull/63/files> which uses Iceberg's writers for writing Parquet data. I captured tcpdump from query scans run on these two datasets.  The partition being scanned contains 1 manifest, 1 parquet data file and ~3700 rows in both datasets. There's a 30x jump in network traffic to the remote filesystem (ADLS) when i switch to stats based Iceberg dataset. Both queries used the same Iceberg reader code to access both datasets. 
>>>> 
>>>> ```
>>>> root@d69e104e7d40:/usr/local/spark#  tcpdump -r iceberg_geo1_metrixx_qc_postvalues_batch_query.pcap | grep perfanalysis.adlus15.projectcabostore.net <http://perfanalysis.adlus15.projectcabostore.net/> | grep ">" | wc -l
>>>> reading from file iceberg_geo1_metrixx_qc_postvalues_batch_query.pcap, link-type EN10MB (Ethernet)
>>>> 
>>>> 8844
>>>> 
>>>> 
>>>> root@d69e104e7d40:/usr/local/spark# tcpdump -r iceberg_scratch_pad_demo_11_batch_query.pcap | grep perfanalysis.adlus15.projectcabostore.net <http://perfanalysis.adlus15.projectcabostore.net/> | grep ">" | wc -l
>>>> reading from file iceberg_scratch_pad_demo_11_batch_query.pcap, link-type EN10MB (Ethernet)
>>>> 
>>>> 269708
>>>> 
>>>> ```
>>>> 
>>>> As a consequence of this the query response times get affected drastically (illustrated below). I must confess that I am on a slow internet connection via VPN connecting to the remote FS. But the dataset without stats took just 1m 49s while the dataset with stats took 26m 48s to read the same sized data. Most of that time in the latter dataset was spent split planning in Manifest reading and stats evaluation.
>>>> 
>>>> ```
>>>> all=> select count(*)  from iceberg_geo1_metrixx_qc_postvalues where batchId = '4a6f95abac924159bb3d7075373395c9';
>>>>  count(1)
>>>> ----------
>>>>      3627
>>>> (1 row)
>>>> Time: 109673.202 ms (01:49.673)
>>>> 
>>>> all=>  select count(*) from iceberg_scratch_pad_demo_11  where _ACP_YEAR=2018 and _ACP_MONTH=01 and _ACP_DAY=01 and batchId = '6d50eeb3e7d74b4f99eea91a27fc8f15';
>>>>  count(1)
>>>> ----------
>>>>      3808
>>>> (1 row)
>>>> Time: 1608058.616 ms (26:48.059)
>>>> 
>>>> ```
>>>> 
>>>> Has anyone faced this? I'm wondering if there's some caching or parallelism option here that can be leveraged.  Would appreciate some guidance. If there isn't a straightforward fix and others feel this is an issue I can raise an issue and look into it further. 
>>>> 
>>>> 
>>>> Cheers,
>>>> -Gautam.
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> -- 
>>>> Ryan Blue
>>>> Software Engineer
>>>> Netflix
>>> 
>> 
>> 
>> 
>> -- 
>> Ryan Blue
>> Software Engineer
>> Netflix
> 
> 
> 
> -- 
> Ryan Blue
> Software Engineer
> Netflix
> 
> 
> -- 
> Ryan Blue
> Software Engineer
> Netflix
> 
> 
> -- 
> Ryan Blue
> Software Engineer
> Netflix


Re: Reading dataset with stats making lots of network traffic..

Posted by Ryan Blue <rb...@netflix.com.INVALID>.
Yes, the append code tries to keep the existing order. That way, if you
have a time-based append pattern, it just works. Similarly, if you've spent
the time to optimize the order, the append doesn't rewrite and ruin it.

Sounds like you just need to sort the file list by partition to group as
much as possible, then append. We do this when we convert tables to iceberg
to start with a good split across manifests.
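
A rough sketch of that pattern, sorting the DataFile list by partition before appending; comparing the partition tuple's string form is a simplification used here only for illustration:

```java
import java.util.Comparator;
import java.util.List;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.Table;

public class SortedAppendSketch {
  // Appends files in partition order so that files for the same partition
  // land next to each other, and therefore in the same manifest.
  static void appendSortedByPartition(Table table, List<DataFile> files) {
    files.sort(Comparator.comparing((DataFile file) -> file.partition().toString()));

    AppendFiles append = table.newAppend();
    for (DataFile file : files) {
      append.appendFile(file);
    }
    append.commit();
  }
}
```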

On Thu, May 2, 2019 at 3:17 PM Gautam <ga...@gmail.com> wrote:

> Ah, looks like MergingSnapshotUpdate.mergeGroup() has the relevant logic.
> It preserves the natural order of manifests, so I guess it groups based
> on when manifests were created; the answer, then, is whatever order the
> commits were done in. If batches within multiple days were committed out
> of order, then a manifest could end up with multiple days.
>
>
> On Thu, May 2, 2019 at 2:23 PM Gautam <ga...@gmail.com> wrote:
>
>> Ok, thanks for the tip on not having to be tied to a hierarchical
>> partition spec.
>>
>> Although this still doesn't explain why all the manifests are scanned,
>> coz it should be pruning the list of manifests and it's not. Is my
>> understanding correct that the manifest grouping might be re-shuffling up
>> the days so query on 1 day might map to all manifests even? Does manifest
>> merging optimize for partition boundaries or is it based on manifest's
>> natural order?
>>
>> On Thu, May 2, 2019 at 2:06 PM Ryan Blue <rb...@netflix.com> wrote:
>>
>>> You also don't need to use year, month, and day. You can just use day.
>>>
>>> The time-based partition functions all produce ordinals, not local
>>> values: month(Jan 1970) = 0 and month(Jan 1972) = 24. Same thing with day
>>> and hour. In fact, I should open a PR to throw an exception when there are
>>> duplicate partition functions...
>>>
>>> On Thu, May 2, 2019 at 1:52 PM Gautam <ga...@gmail.com> wrote:
>>>
>>>> FYI .. The test Partition Spec is  :
>>>> [
>>>>   YEAR: identity(21)
>>>>   MONTH: identity(22)
>>>>   DAY: identity(23)
>>>>   batchId: identity(24)
>>>> ]
>>>>
>>>>
>>>>
>>>> On Thu, May 2, 2019 at 1:46 PM Gautam <ga...@gmail.com> wrote:
>>>>
>>>>> > Using those, you should be able to control parallelism. If you want
>>>>> to test with 4,000, then you can set the min count to 5,000 so Iceberg
>>>>> won’t compact manifests.
>>>>>
>>>>> This is helpful. Thanks for the pointer on increasing parallelism.
>>>>> Will try this out. So I understand the behaviour, if a different dataset
>>>>> has >=5000  batches then the resultant # manifests would be
>>>>> (total_num_batches % 5000 ) ?
>>>>>
>>>>> > What surprises me is that you’re not getting much benefit from
>>>>> filtering out manifests that aren’t helpful. We see a lot of benefit from
>>>>> it.
>>>>>
>>>>> Pardon the verbose example but i think it'l help explain what i'm
>>>>> seeing ..
>>>>>
>>>>> Regarding manifest filtering:  I tested if partition filters in sql
>>>>> query actually reduce manifests being inspected. In my example, i have 16
>>>>> manifests that point to 4000 batch partitions ( each file is restricted to
>>>>> one partition as we'r using physical partitioning in the table ).  So when
>>>>> querying for .. WHERE  batchId = 'xyz'  .. at most 1 manifest should be
>>>>> read coz 1 batch == 1 file which should be tracked by 1 manifest (among the
>>>>> 16) , right? But i see that all 16 are being inspected in
>>>>> BaseTableScan.planFiles().  Correct me if i'm wrong, it's this call [1]
>>>>> that should be giving me the manifests that match a partition. When I
>>>>> inspect this  it says `matchingManifests = 16` ,  which is all the
>>>>> manifests in the table.  This *could* be due to the fact that our
>>>>> batch ids are random UUIDs so lower/upper bounds may not help coz there's
>>>>> no inherent ordering amongst batches.
>>>>> But then  i tried year = 2019 and month = 01 and day = 01 which also
>>>>> scanned all manifests. Could this be due to the way Iceberg manifests are
>>>>> re-grouped and merged? If so, shouldn't re-grouping honour partition
>>>>> boundaries and optimize for it?
>>>>>
>>>>>
>>>>> Cheers,
>>>>> -Gautam.
>>>>>
>>>>> [1] -
>>>>> https://github.com/apache/incubator-iceberg/blob/master/core/src/main/java/org/apache/iceberg/BaseTableScan.java#L173
>>>>>
>>>>>
>>>>> On Thu, May 2, 2019 at 12:27 PM Ryan Blue <rb...@netflix.com> wrote:
>>>>>
>>>>>> Good questions. Grouping manifests is configurable at the table
>>>>>> level. There are 2 settings:
>>>>>>
>>>>>>    - commit.manifest.target-size-bytes defaults to 8MB, this is the
>>>>>>    target size that Iceberg will compact to
>>>>>>    - commit.manifest.min-count-to-merge defaults to 100, this is the
>>>>>>    minimum number of files before a compaction is triggered
>>>>>>
>>>>>> Using those, you should be able to control parallelism. If you want
>>>>>> to test with 4,000, then you can set the min count to 5,000 so Iceberg
>>>>>> won’t compact manifests.
>>>>>>
>>>>>> What surprises me is that you’re not getting much benefit from
>>>>>> filtering out manifests that aren’t helpful. We see a lot of benefit from
>>>>>> it. You might try sorting the data files by partition before adding them to
>>>>>> the table. That will cluster data files in the same partition so you can
>>>>>> read fewer manifests.
>>>>>>
>>>>>> On Thu, May 2, 2019 at 12:09 PM Gautam <ga...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hey Anton,
>>>>>>>             Sorry bout the delay on this. Been caught up with some
>>>>>>> other things. Thanks for raising issue#173 .
>>>>>>>
>>>>>>> So the root cause is indeed the density and size of the schema.
>>>>>>> While I agree the option to configure stats for columns is good (although
>>>>>>> i'm not fully convinced that this is purely due to lower/upper bounds). For
>>>>>>> instance, maybe it's just taking a while to iterate over manifest rows and
>>>>>>> deserialize the DataFile stats in each read?  The solution i'm using right
>>>>>>> now is to parallelize the manifest reading in split planning. We
>>>>>>> regenerated the Iceberg table with more manifests. Now the code enables the
>>>>>>> ParallelIterator which uses a worker pool of threads (1 thread per cpu by
>>>>>>> default, configurable using 'iceberg.worker.num-threads' ) to read
>>>>>>> manifests.
>>>>>>>
>>>>>>> On that note, the ability to parallelize is limited to how many
>>>>>>> manifests are in the table. So as a test, for a table with 4000 files we
>>>>>>> created one manifest per file (think of one file as a single batch commit
>>>>>>> in this case). So I was hoping to get a parallelism factor of 4000. But
>>>>>>> Iceberg summarizes manifests into fewer manifests with each commit so we
>>>>>>> instead ended up with 16 manifests. So now split planning is limited to
>>>>>>> reading at most 16 units of parallelism. Is this grouping of manifests into
>>>>>>> fewer configurable? if not should we allow making this configurable?
>>>>>>>
>>>>>>> Sorry if this is forking a different conversation. If so, I can
>>>>>>> start a separate conversation thread on this.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Wed, May 1, 2019 at 9:42 PM Anton Okolnychyi <
>>>>>>> aokolnychyi@apple.com> wrote:
>>>>>>>
>>>>>>>> Hey Gautam,
>>>>>>>>
>>>>>>>> Out of my curiosity, did you manage to confirm the root cause of
>>>>>>>> the issue?
>>>>>>>>
>>>>>>>> P.S. I created [1] so that we can make collection of lower/upper
>>>>>>>> bounds configurable.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Anton
>>>>>>>>
>>>>>>>> [1] - https://github.com/apache/incubator-iceberg/issues/173
>>>>>>>>
>>>>>>>> On 22 Apr 2019, at 09:15, Gautam <ga...@gmail.com> wrote:
>>>>>>>>
>>>>>>>> Thanks guys for the insights ..
>>>>>>>>
>>>>>>>> > I like Anton's idea to have an optional list of columns for which
>>>>>>>> we keep stats. That would allow us to avoid storing stats for thousands of
>>>>>>>> columns that won't ever be used. Another option here is to add a flag to
>>>>>>>> keep stats only for top-level columns. That's much less configuration for
>>>>>>>> users and probably does the right thing in many cases. Simpler to use but
>>>>>>>> not as fast in all cases is sometimes a good compromise.
>>>>>>>>
>>>>>>>> This makes sense to me. It adds a variable that data pipelines can
>>>>>>>> tweak on to improve performance. I will add an issue on Github to add a
>>>>>>>> stats config/flag. Although, having said that, I would try to optimize
>>>>>>>> around this coz read patterns are hardly ever known a priori and adding a
>>>>>>>> column to this list means having to re-write the entire data again. So i'l
>>>>>>>> try the other suggestion which is parallelizing on multiple manifests.
>>>>>>>>
>>>>>>>> >  To clarify my comment on changing the storage: the idea is to
>>>>>>>> use separate columns instead of a map and then use a columnar storage
>>>>>>>> format so we can project those columns independently. Avro can't project
>>>>>>>> columns independently. This wouldn't help on the write side and may just
>>>>>>>> cause a lot of seeking on the read side that diminishes the benefits.
>>>>>>>>
>>>>>>>> Gotcha.
>>>>>>>>
>>>>>>>> > Also, now that we have more details, I think there is a second
>>>>>>>> problem. Because we expect several manifests in a table, we parallelize
>>>>>>>> split planning on manifests instead of splits of manifest files. This
>>>>>>>> planning operation is happening in a single thread instead of in parallel.
>>>>>>>> I think if you split the write across several manifests, you'd improve wall
>>>>>>>> time.
>>>>>>>>
>>>>>>>> This might actually be the issue here, this was a test bench
>>>>>>>> dataset so the writer job created a single manifest for all the data in the
>>>>>>>> dataset which isn't really how we will do things in prod. I'l try and
>>>>>>>> create the metadata based on productions expected commit pattern.
>>>>>>>>
>>>>>>>>
>>>>>>>> Regarding Iceberg not truncating large bounded column values
>>>>>>>> https://github.com/apache/incubator-iceberg/issues/113 .. I didn't
>>>>>>>> consider this with our dataset. The current evidence is leading towards the
>>>>>>>> number of columns and the sheer number of files that the manifest is
>>>>>>>> maintaining but this is a good thing to look into.
>>>>>>>>
>>>>>>>> Thanks again guys.
>>>>>>>>
>>>>>>>> -Gautam.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Fri, Apr 19, 2019 at 9:05 AM Ryan Blue <rb...@netflix.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> I like Anton's idea to have an optional list of columns for which
>>>>>>>>> we keep stats. That would allow us to avoid storing stats for thousands of
>>>>>>>>> columns that won't ever be used. Another option here is to add a flag to
>>>>>>>>> keep stats only for top-level columns. That's much less configuration for
>>>>>>>>> users and probably does the right thing in many cases. Simpler to use but
>>>>>>>>> not as fast in all cases is sometimes a good compromise.
>>>>>>>>>
>>>>>>>>> To clarify my comment on changing the storage: the idea is to use
>>>>>>>>> separate columns instead of a map and then use a columnar storage format so
>>>>>>>>> we can project those columns independently. Avro can't project columns
>>>>>>>>> independently. This wouldn't help on the write side and may just cause a
>>>>>>>>> lot of seeking on the read side that diminishes the benefits.
>>>>>>>>>
>>>>>>>>> Also, now that we have more details, I think there is a second
>>>>>>>>> problem. Because we expect several manifests in a table, we parallelize
>>>>>>>>> split planning on manifests instead of splits of manifest files. This
>>>>>>>>> planning operation is happening in a single thread instead of in parallel.
>>>>>>>>> I think if you split the write across several manifests, you'd improve wall
>>>>>>>>> time.
>>>>>>>>>
>>>>>>>>> On Fri, Apr 19, 2019 at 8:15 AM Anton Okolnychyi <
>>>>>>>>> aokolnychyi@apple.com> wrote:
>>>>>>>>>
>>>>>>>>>> No, we haven’t experienced it yet. The manifest size is huge in
>>>>>>>>>> your case. To me, Ryan is correct: it might be either big lower/upper
>>>>>>>>>> bounds (then truncation will help) or a big number columns (then collecting
>>>>>>>>>> lower/upper bounds only for specific columns will help). I think both
>>>>>>>>>> optimizations are needed and will reduce the manifest size.
>>>>>>>>>>
>>>>>>>>>> Since you mentioned you have a lot of columns and we collect
>>>>>>>>>> bounds for nested struct fields, I am wondering if you could revert [1]
>>>>>>>>>> locally and compare the manifest size.
>>>>>>>>>>
>>>>>>>>>> [1] -
>>>>>>>>>> https://github.com/apache/incubator-iceberg/commit/c383dd87a89e35d622e9c458fd711931cbc5e96f
>>>>>>>>>>
>>>>>>>>>> On 19 Apr 2019, at 15:42, Gautam <ga...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>> Thanks for responding Anton! Do we think the delay is mainly due
>>>>>>>>>> to lower/upper bound filtering? have you faced this? I haven't exactly
>>>>>>>>>> found where the slowness is yet. It's generally due to the stats filtering
>>>>>>>>>> but what part of it is causing this much network traffic. There's
>>>>>>>>>> CloseableIteratable  that takes a ton of time on the next() and hasNext()
>>>>>>>>>> calls. My guess is the expression evaluation on each manifest entry is
>>>>>>>>>> what's doing it.
>>>>>>>>>>
>>>>>>>>>> On Fri, Apr 19, 2019 at 1:41 PM Anton Okolnychyi <
>>>>>>>>>> aokolnychyi@apple.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> I think we need to have a list of columns for which we want to
>>>>>>>>>>> collect stats and that should be configurable by the user. Maybe, this
>>>>>>>>>>> config should be applicable only to lower/upper bounds. As we now collect
>>>>>>>>>>> stats even for nested struct fields, this might generate a lot of data. In
>>>>>>>>>>> most cases, users cluster/sort their data by a subset of data columns to
>>>>>>>>>>> have fast queries with predicates on those columns. So, being able to
>>>>>>>>>>> configure columns for which to collect lower/upper bounds seems reasonable.
>>>>>>>>>>>
>>>>>>>>>>> On 19 Apr 2019, at 08:03, Gautam <ga...@gmail.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>> >  The length in bytes of the schema is 109M as compared to 687K
>>>>>>>>>>> of the non-stats dataset.
>>>>>>>>>>>
>>>>>>>>>>> Typo, length in bytes of *manifest*. schema is the same.
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Apr 19, 2019 at 12:16 PM Gautam <ga...@gmail.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Correction, partition count = 4308.
>>>>>>>>>>>>
>>>>>>>>>>>> > Re: Changing the way we keep stats. Avro is a block
>>>>>>>>>>>> splittable format and is friendly with parallel compute frameworks like
>>>>>>>>>>>> Spark.
>>>>>>>>>>>>
>>>>>>>>>>>> Here I am trying to say that we don't need to change the format
>>>>>>>>>>>> to columnar right? The current format is already friendly for
>>>>>>>>>>>> parallelization.
>>>>>>>>>>>>
>>>>>>>>>>>> thanks.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Apr 19, 2019 at 12:12 PM Gautam <
>>>>>>>>>>>> gautamkowshik@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Ah, my bad. I missed adding in the schema details .. Here are
>>>>>>>>>>>>> some details on the dataset with stats :
>>>>>>>>>>>>>
>>>>>>>>>>>>>  Iceberg Schema Columns : 20
>>>>>>>>>>>>>  Spark Schema fields : 20
>>>>>>>>>>>>>  Snapshot Summary :{added-data-files=4308,
>>>>>>>>>>>>> added-records=11494037, changed-partition-count=4308,
>>>>>>>>>>>>> total-records=11494037, total-data-files=4308}
>>>>>>>>>>>>>  Manifest files :1
>>>>>>>>>>>>>  Manifest details:
>>>>>>>>>>>>>      => manifest file path:
>>>>>>>>>>>>> adl://[dataset_base_path]/metadata/4bcda033-9df5-4c84-8eef-9d6ef93e4347-m0.avro
>>>>>>>>>>>>>      => manifest file length: 109,028,885
>>>>>>>>>>>>>      => existing files count: 0
>>>>>>>>>>>>>      => added files count: 4308
>>>>>>>>>>>>>      => deleted files count: 0
>>>>>>>>>>>>>      => partitions count: 4
>>>>>>>>>>>>>      => partition fields count: 4
>>>>>>>>>>>>>
>>>>>>>>>>>>> Re: Num data files. It has a single manifest keep track of
>>>>>>>>>>>>> 4308 files. Total record count is 11.4 Million.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Re: Columns. You are right that this table has many columns..
>>>>>>>>>>>>> although it has only 20 top-level columns,  num leaf columns are in order
>>>>>>>>>>>>> of thousands. This Schema is heavy on structs (in the thousands) and has
>>>>>>>>>>>>> deep levels of nesting.  I know Iceberg keeps
>>>>>>>>>>>>> *column_sizes, value_counts, null_value_counts* for all leaf
>>>>>>>>>>>>> fields and additionally *lower-bounds, upper-bounds* for
>>>>>>>>>>>>> native, struct types (not yet for map KVs and arrays).  The length in bytes
>>>>>>>>>>>>> of the schema is 109M as compared to 687K of the non-stats dataset.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Re: Turning off stats. I am looking to leverage stats coz for
>>>>>>>>>>>>> our datasets with much larger number of data files we want to leverage
>>>>>>>>>>>>> iceberg's ability to skip entire files based on these stats. This is one of
>>>>>>>>>>>>> the big incentives for us to use Iceberg.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Re: Changing the way we keep stats. Avro is a block splittable
>>>>>>>>>>>>> format and is friendly with parallel compute frameworks like Spark. So
>>>>>>>>>>>>> would it make sense, for instance, to add an option to have a Spark job /
>>>>>>>>>>>>> Futures  handle split planning?   In a larger context, 109M is not that
>>>>>>>>>>>>> much metadata given that Iceberg is meant for datasets where the metadata
>>>>>>>>>>>>> itself is Bigdata scale.  I'm curious on how folks with larger sized
>>>>>>>>>>>>> metadata (in GB) are optimizing this today.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>> -Gautam.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, Apr 19, 2019 at 12:40 AM Ryan Blue <
>>>>>>>>>>>>> rblue@netflix.com.invalid> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks for bringing this up! My initial theory is that this
>>>>>>>>>>>>>> table has a ton of stats data that you have to read. That could happen in a
>>>>>>>>>>>>>> couple of cases.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> First, you might have large values in some columns. Parquet
>>>>>>>>>>>>>> will suppress its stats if values are larger than 4k and those are what
>>>>>>>>>>>>>> Iceberg uses. But that could still cause you to store two 1k+ objects for
>>>>>>>>>>>>>> each large column (lower and upper bounds). With a lot of data files, that
>>>>>>>>>>>>>> could add up quickly. The solution here is to implement #113
>>>>>>>>>>>>>> <https://github.com/apache/incubator-iceberg/issues/113> so
>>>>>>>>>>>>>> that we don't store the actual min and max for string or binary columns,
>>>>>>>>>>>>>> but instead a truncated value that is just above or just below.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The second case is when you have a lot of columns. Each
>>>>>>>>>>>>>> column stores both a lower and upper bound, so 1,000 columns could easily
>>>>>>>>>>>>>> take 8k per file. If this is the problem, then maybe we want to have a way
>>>>>>>>>>>>>> to turn off column stats. We could also think of ways to change the way
>>>>>>>>>>>>>> stats are stored in the manifest files, but that only helps if we move to a
>>>>>>>>>>>>>> columnar format to store manifests, so this is probably not a short-term
>>>>>>>>>>>>>> fix.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> If you can share a bit more information about this table, we
>>>>>>>>>>>>>> can probably tell which one is the problem. I'm guessing it is the large
>>>>>>>>>>>>>> values problem.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Thu, Apr 18, 2019 at 11:52 AM Gautam <
>>>>>>>>>>>>>> gautamkowshik@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hello folks,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I have been testing Iceberg reading with and without stats
>>>>>>>>>>>>>>> built into Iceberg dataset manifest and found that there's a huge jump in
>>>>>>>>>>>>>>> network traffic with the latter..
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> In my test I am comparing two Iceberg datasets, both written
>>>>>>>>>>>>>>> in Iceberg format. One with and the other without stats collected in
>>>>>>>>>>>>>>> Iceberg manifests. In particular the difference between the writers used
>>>>>>>>>>>>>>> for the two datasets is this PR:
>>>>>>>>>>>>>>> https://github.com/apache/incubator-iceberg/pull/63/files which
>>>>>>>>>>>>>>> uses Iceberg's writers for writing Parquet data. I captured tcpdump from
>>>>>>>>>>>>>>> query scans run on these two datasets.  The partition being scanned
>>>>>>>>>>>>>>> contains 1 manifest, 1 parquet data file and ~3700 rows in both datasets.
>>>>>>>>>>>>>>> There's a 30x jump in network traffic to the remote filesystem (ADLS) when
>>>>>>>>>>>>>>> i switch to stats based Iceberg dataset. Both queries used the same Iceberg
>>>>>>>>>>>>>>> reader code to access both datasets.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> ```
>>>>>>>>>>>>>>> root@d69e104e7d40:/usr/local/spark#  tcpdump -r
>>>>>>>>>>>>>>> iceberg_geo1_metrixx_qc_postvalues_batch_query.pcap | grep
>>>>>>>>>>>>>>> perfanalysis.adlus15.projectcabostore.net | grep ">" | wc -l
>>>>>>>>>>>>>>> reading from file
>>>>>>>>>>>>>>> iceberg_geo1_metrixx_qc_postvalues_batch_query.pcap, link-type EN10MB
>>>>>>>>>>>>>>> (Ethernet)
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> *8844*
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> root@d69e104e7d40:/usr/local/spark# tcpdump -r
>>>>>>>>>>>>>>> iceberg_scratch_pad_demo_11_batch_query.pcap | grep
>>>>>>>>>>>>>>> perfanalysis.adlus15.projectcabostore.net | grep ">" | wc -l
>>>>>>>>>>>>>>> reading from file
>>>>>>>>>>>>>>> iceberg_scratch_pad_demo_11_batch_query.pcap, link-type EN10MB (Ethernet)
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> *269708*
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> ```
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> As a consequence of this the query response times get
>>>>>>>>>>>>>>> affected drastically (illustrated below). I must confess that I am on a
>>>>>>>>>>>>>>> slow internet connection via VPN connecting to the remote FS. But the
>>>>>>>>>>>>>>> dataset without stats took just 1m 49s while the dataset with stats took
>>>>>>>>>>>>>>> 26m 48s to read the same sized data. Most of that time in the latter
>>>>>>>>>>>>>>> dataset was spent split planning in Manifest reading and stats evaluation.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> ```
>>>>>>>>>>>>>>> all=> select count(*)  from
>>>>>>>>>>>>>>> iceberg_geo1_metrixx_qc_postvalues where batchId =
>>>>>>>>>>>>>>> '4a6f95abac924159bb3d7075373395c9';
>>>>>>>>>>>>>>>  count(1)
>>>>>>>>>>>>>>> ----------
>>>>>>>>>>>>>>>      3627
>>>>>>>>>>>>>>> (1 row)
>>>>>>>>>>>>>>> *Time: 109673.202 ms (01:49.673)*
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> all=>  select count(*) from iceberg_scratch_pad_demo_11
>>>>>>>>>>>>>>> where _ACP_YEAR=2018 and _ACP_MONTH=01 and _ACP_DAY=01 and batchId =
>>>>>>>>>>>>>>> '6d50eeb3e7d74b4f99eea91a27fc8f15';
>>>>>>>>>>>>>>>  count(1)
>>>>>>>>>>>>>>> ----------
>>>>>>>>>>>>>>>      3808
>>>>>>>>>>>>>>> (1 row)
>>>>>>>>>>>>>>> *Time: 1608058.616 ms (26:48.059)*
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> ```
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Has anyone faced this? I'm wondering if there's some caching
>>>>>>>>>>>>>>> or parallelism option here that can be leveraged.  Would appreciate some
>>>>>>>>>>>>>>> guidance. If there isn't a straightforward fix and others feel this is an
>>>>>>>>>>>>>>> issue I can raise an issue and look into it further.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>> -Gautam.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> --
>>>>>>>>>>>>>> Ryan Blue
>>>>>>>>>>>>>> Software Engineer
>>>>>>>>>>>>>> Netflix
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Ryan Blue
>>>>>>>>> Software Engineer
>>>>>>>>> Netflix
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>
>>>>>> --
>>>>>> Ryan Blue
>>>>>> Software Engineer
>>>>>> Netflix
>>>>>>
>>>>>
>>>
>>> --
>>> Ryan Blue
>>> Software Engineer
>>> Netflix
>>>
>>

-- 
Ryan Blue
Software Engineer
Netflix

Re: Reading dataset with stats making lots of network traffic..

Posted by Gautam <ga...@gmail.com>.
Ah, looks like MergingSnapshotUpdate.mergeGroup() has the relevant logic. It
preserves the natural order of manifests, so I guess it groups based on when
manifests were created; the answer, then, is whatever order the commits were
done in. If batches within multiple days were committed out of order, then a
manifest could end up with multiple days.


On Thu, May 2, 2019 at 2:23 PM Gautam <ga...@gmail.com> wrote:

> Ok, thanks for the tip on not having to be tied to a hierarchical
> partition spec.
>
> Although this still doesn't explain why all the manifests are scanned,
> coz it should be pruning the list of manifests and it's not. Is my
> understanding correct that the manifest grouping might be re-shuffling up
> the days so query on 1 day might map to all manifests even? Does manifest
> merging optimize for partition boundaries or is it based on manifest's
> natural order?
>
> On Thu, May 2, 2019 at 2:06 PM Ryan Blue <rb...@netflix.com> wrote:
>
>> You also don't need to use year, month, and day. You can just use day.
>>
>> The time-based partition functions all produce ordinals, not local
>> values: month(Jan 1970) = 0 and month(Jan 1972) = 24. Same thing with day
>> and hour. In fact, I should open a PR to throw an exception when there are
>> duplicate partition functions...
>>
>> On Thu, May 2, 2019 at 1:52 PM Gautam <ga...@gmail.com> wrote:
>>
>>> FYI .. The test Partition Spec is  :
>>> [
>>>   YEAR: identity(21)
>>>   MONTH: identity(22)
>>>   DAY: identity(23)
>>>   batchId: identity(24)
>>> ]
>>>
>>>
>>>
>>> On Thu, May 2, 2019 at 1:46 PM Gautam <ga...@gmail.com> wrote:
>>>
>>>> > Using those, you should be able to control parallelism. If you want
>>>> to test with 4,000, then you can set the min count to 5,000 so Iceberg
>>>> won’t compact manifests.
>>>>
>>>> This is helpful. Thanks for the pointer on increasing parallelism. Will
>>>> try this out. So I understand the behaviour, if a different dataset has
>>>> >=5000  batches then the resultant # manifests would be (total_num_batches
>>>> % 5000 ) ?
>>>>
>>>> > What surprises me is that you’re not getting much benefit from
>>>> filtering out manifests that aren’t helpful. We see a lot of benefit from
>>>> it.
>>>>
>>>> Pardon the verbose example but i think it'l help explain what i'm
>>>> seeing ..
>>>>
>>>> Regarding manifest filtering:  I tested if partition filters in sql
>>>> query actually reduce manifests being inspected. In my example, i have 16
>>>> manifests that point to 4000 batch partitions ( each file is restricted to
>>>> one partition as we'r using physical partitioning in the table ).  So when
>>>> querying for .. WHERE  batchId = 'xyz'  .. at most 1 manifest should be
>>>> read coz 1 batch == 1 file which should be tracked by 1 manifest (among the
>>>> 16) , right? But i see that all 16 are being inspected in
>>>> BaseTableScan.planFiles().  Correct me if i'm wrong, it's this call [1]
>>>> that should be giving me the manifests that match a partition. When I
>>>> inspect this  it says `matchingManifests = 16` ,  which is all the
>>>> manifests in the table.  This *could* be due to the fact that our
>>>> batch ids are random UUIDs so lower/upper bounds may not help coz there's
>>>> no inherent ordering amongst batches.
>>>> But then  i tried year = 2019 and month = 01 and day = 01 which also
>>>> scanned all manifests. Could this be due to the way Iceberg manifests are
>>>> re-grouped and merged? If so, shouldn't re-grouping honour partition
>>>> boundaries and optimize for it?
>>>>
>>>>
>>>> Cheers,
>>>> -Gautam.
>>>>
>>>> [1] -
>>>> https://github.com/apache/incubator-iceberg/blob/master/core/src/main/java/org/apache/iceberg/BaseTableScan.java#L173
>>>>
>>>>
>>>> On Thu, May 2, 2019 at 12:27 PM Ryan Blue <rb...@netflix.com> wrote:
>>>>
>>>>> Good questions. Grouping manifests is configurable at the table level.
>>>>> There are 2 settings:
>>>>>
>>>>>    - commit.manifest.target-size-bytes defaults to 8MB, this is the
>>>>>    target size that Iceberg will compact to
>>>>>    - commit.manifest.min-count-to-merge defaults to 100, this is the
>>>>>    minimum number of files before a compaction is triggered
>>>>>
>>>>> Using those, you should be able to control parallelism. If you want to
>>>>> test with 4,000, then you can set the min count to 5,000 so Iceberg won’t
>>>>> compact manifests.
>>>>>
>>>>> What surprises me is that you’re not getting much benefit from
>>>>> filtering out manifests that aren’t helpful. We see a lot of benefit from
>>>>> it. You might try sorting the data files by partition before adding them to
>>>>> the table. That will cluster data files in the same partition so you can
>>>>> read fewer manifests.
>>>>>
>>>>> On Thu, May 2, 2019 at 12:09 PM Gautam <ga...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hey Anton,
>>>>>>             Sorry bout the delay on this. Been caught up with some
>>>>>> other things. Thanks for raising issue#173 .
>>>>>>
>>>>>> So the root cause is indeed the density and size of the schema. While
>>>>>> I agree the option to configure stats for columns is good (although i'm not
>>>>>> fully convinced that this is purely due to lower/upper bounds). For
>>>>>> instance, maybe it's just taking a while to iterate over manifest rows and
>>>>>> deserialize the DataFile stats in each read?  The solution i'm using right
>>>>>> now is to parallelize the manifest reading in split planning. We
>>>>>> regenerated the Iceberg table with more manifests. Now the code enables the
>>>>>> ParallelIterator which uses a worker pool of threads (1 thread per cpu by
>>>>>> default, configurable using 'iceberg.worker.num-threads' ) to read
>>>>>> manifests.
>>>>>>
>>>>>> On that note, the ability to parallelize is limited to how many
>>>>>> manifests are in the table. So as a test, for a table with 4000 files we
>>>>>> created one manifest per file (think of one file as a single batch commit
>>>>>> in this case). So I was hoping to get a parallelism factor of 4000. But
>>>>>> Iceberg summarizes manifests into fewer manifests with each commit so we
>>>>>> instead ended up with 16 manifests. So now split planning is limited to
>>>>>> reading at most 16 units of parallelism. Is this grouping of manifests into
>>>>>> fewer configurable? if not should we allow making this configurable?
>>>>>>
>>>>>> Sorry if this is forking a different conversation. If so, I can start
>>>>>> a separate conversation thread on this.
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Wed, May 1, 2019 at 9:42 PM Anton Okolnychyi <
>>>>>> aokolnychyi@apple.com> wrote:
>>>>>>
>>>>>>> Hey Gautam,
>>>>>>>
>>>>>>> Out of my curiosity, did you manage to confirm the root cause of the
>>>>>>> issue?
>>>>>>>
>>>>>>> P.S. I created [1] so that we can make collection of lower/upper
>>>>>>> bounds configurable.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Anton
>>>>>>>
>>>>>>> [1] - https://github.com/apache/incubator-iceberg/issues/173
>>>>>>>
>>>>>>> On 22 Apr 2019, at 09:15, Gautam <ga...@gmail.com> wrote:
>>>>>>>
>>>>>>> Thanks guys for the insights ..
>>>>>>>
>>>>>>> > I like Anton's idea to have an optional list of columns for which
>>>>>>> we keep stats. That would allow us to avoid storing stats for thousands of
>>>>>>> columns that won't ever be used. Another option here is to add a flag to
>>>>>>> keep stats only for top-level columns. That's much less configuration for
>>>>>>> users and probably does the right thing in many cases. Simpler to use but
>>>>>>> not as fast in all cases is sometimes a good compromise.
>>>>>>>
>>>>>>> This makes sense to me. It adds a variable that data pipelines can
>>>>>>> tweak on to improve performance. I will add an issue on Github to add a
>>>>>>> stats config/flag. Although, having said that, I would try to optimize
>>>>>>> around this coz read patterns are hardly ever known a priori and adding a
>>>>>>> column to this list means having to re-write the entire data again. So i'l
>>>>>>> try the other suggestion which is parallelizing on multiple manifests.
>>>>>>>
>>>>>>> >  To clarify my comment on changing the storage: the idea is to use
>>>>>>> separate columns instead of a map and then use a columnar storage format so
>>>>>>> we can project those columns independently. Avro can't project columns
>>>>>>> independently. This wouldn't help on the write side and may just cause a
>>>>>>> lot of seeking on the read side that diminishes the benefits.
>>>>>>>
>>>>>>> Gotcha.
>>>>>>>
>>>>>>> > Also, now that we have more details, I think there is a second
>>>>>>> problem. Because we expect several manifests in a table, we parallelize
>>>>>>> split planning on manifests instead of splits of manifest files. This
>>>>>>> planning operation is happening in a single thread instead of in parallel.
>>>>>>> I think if you split the write across several manifests, you'd improve wall
>>>>>>> time.
>>>>>>>
>>>>>>> This might actually be the issue here, this was a test bench dataset
>>>>>>> so the writer job created a single manifest for all the data in the dataset
>>>>>>> which isn't really how we will do things in prod. I'l try and create the
>>>>>>> metadata based on productions expected commit pattern.
>>>>>>>
>>>>>>>
>>>>>>> Regarding Iceberg not truncating large bounded column values
>>>>>>> https://github.com/apache/incubator-iceberg/issues/113 .. I didn't
>>>>>>> consider this with our dataset. The current evidence is leading towards the
>>>>>>> number of columns and the sheer number of files that the manifest is
>>>>>>> maintaining but this is a good thing to look into.
>>>>>>>
>>>>>>> Thanks again guys.
>>>>>>>
>>>>>>> -Gautam.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Apr 19, 2019 at 9:05 AM Ryan Blue <rb...@netflix.com> wrote:
>>>>>>>
>>>>>>>> I like Anton's idea to have an optional list of columns for which
>>>>>>>> we keep stats. That would allow us to avoid storing stats for thousands of
>>>>>>>> columns that won't ever be used. Another option here is to add a flag to
>>>>>>>> keep stats only for top-level columns. That's much less configuration for
>>>>>>>> users and probably does the right thing in many cases. Simpler to use but
>>>>>>>> not as fast in all cases is sometimes a good compromise.
>>>>>>>>
>>>>>>>> To clarify my comment on changing the storage: the idea is to use
>>>>>>>> separate columns instead of a map and then use a columnar storage format so
>>>>>>>> we can project those columns independently. Avro can't project columns
>>>>>>>> independently. This wouldn't help on the write side and may just cause a
>>>>>>>> lot of seeking on the read side that diminishes the benefits.
>>>>>>>>
>>>>>>>> Also, now that we have more details, I think there is a second
>>>>>>>> problem. Because we expect several manifests in a table, we parallelize
>>>>>>>> split planning on manifests instead of splits of manifest files. This
>>>>>>>> planning operation is happening in a single thread instead of in parallel.
>>>>>>>> I think if you split the write across several manifests, you'd improve wall
>>>>>>>> time.
>>>>>>>>
>>>>>>>> On Fri, Apr 19, 2019 at 8:15 AM Anton Okolnychyi <
>>>>>>>> aokolnychyi@apple.com> wrote:
>>>>>>>>
>>>>>>>>> No, we haven’t experienced it yet. The manifest size is huge in
>>>>>>>>> your case. To me, Ryan is correct: it might be either big lower/upper
>>>>>>>>> bounds (then truncation will help) or a big number columns (then collecting
>>>>>>>>> lower/upper bounds only for specific columns will help). I think both
>>>>>>>>> optimizations are needed and will reduce the manifest size.
>>>>>>>>>
>>>>>>>>> Since you mentioned you have a lot of columns and we collect
>>>>>>>>> bounds for nested struct fields, I am wondering if you could revert [1]
>>>>>>>>> locally and compare the manifest size.
>>>>>>>>>
>>>>>>>>> [1] -
>>>>>>>>> https://github.com/apache/incubator-iceberg/commit/c383dd87a89e35d622e9c458fd711931cbc5e96f
>>>>>>>>>
>>>>>>>>> On 19 Apr 2019, at 15:42, Gautam <ga...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>> Thanks for responding Anton! Do we think the delay is mainly due
>>>>>>>>> to lower/upper bound filtering? have you faced this? I haven't exactly
>>>>>>>>> found where the slowness is yet. It's generally due to the stats filtering
>>>>>>>>> but what part of it is causing this much network traffic. There's
>>>>>>>>> CloseableIteratable  that takes a ton of time on the next() and hasNext()
>>>>>>>>> calls. My guess is the expression evaluation on each manifest entry is
>>>>>>>>> what's doing it.
>>>>>>>>>
>>>>>>>>> On Fri, Apr 19, 2019 at 1:41 PM Anton Okolnychyi <
>>>>>>>>> aokolnychyi@apple.com> wrote:
>>>>>>>>>
>>>>>>>>>> I think we need to have a list of columns for which we want to
>>>>>>>>>> collect stats and that should be configurable by the user. Maybe, this
>>>>>>>>>> config should be applicable only to lower/upper bounds. As we now collect
>>>>>>>>>> stats even for nested struct fields, this might generate a lot of data. In
>>>>>>>>>> most cases, users cluster/sort their data by a subset of data columns to
>>>>>>>>>> have fast queries with predicates on those columns. So, being able to
>>>>>>>>>> configure columns for which to collect lower/upper bounds seems reasonable.
>>>>>>>>>>
>>>>>>>>>> On 19 Apr 2019, at 08:03, Gautam <ga...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>> >  The length in bytes of the schema is 109M as compared to 687K
>>>>>>>>>> of the non-stats dataset.
>>>>>>>>>>
>>>>>>>>>> Typo, length in bytes of *manifest*. schema is the same.
>>>>>>>>>>
>>>>>>>>>> On Fri, Apr 19, 2019 at 12:16 PM Gautam <ga...@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Correction, partition count = 4308.
>>>>>>>>>>>
>>>>>>>>>>> > Re: Changing the way we keep stats. Avro is a block splittable
>>>>>>>>>>> format and is friendly with parallel compute frameworks like Spark.
>>>>>>>>>>>
>>>>>>>>>>> Here I am trying to say that we don't need to change the format
>>>>>>>>>>> to columnar right? The current format is already friendly for
>>>>>>>>>>> parallelization.
>>>>>>>>>>>
>>>>>>>>>>> thanks.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Apr 19, 2019 at 12:12 PM Gautam <ga...@gmail.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Ah, my bad. I missed adding in the schema details .. Here are
>>>>>>>>>>>> some details on the dataset with stats :
>>>>>>>>>>>>
>>>>>>>>>>>>  Iceberg Schema Columns : 20
>>>>>>>>>>>>  Spark Schema fields : 20
>>>>>>>>>>>>  Snapshot Summary :{added-data-files=4308,
>>>>>>>>>>>> added-records=11494037, changed-partition-count=4308,
>>>>>>>>>>>> total-records=11494037, total-data-files=4308}
>>>>>>>>>>>>  Manifest files :1
>>>>>>>>>>>>  Manifest details:
>>>>>>>>>>>>      => manifest file path:
>>>>>>>>>>>> adl://[dataset_base_path]/metadata/4bcda033-9df5-4c84-8eef-9d6ef93e4347-m0.avro
>>>>>>>>>>>>      => manifest file length: 109,028,885
>>>>>>>>>>>>      => existing files count: 0
>>>>>>>>>>>>      => added files count: 4308
>>>>>>>>>>>>      => deleted files count: 0
>>>>>>>>>>>>      => partitions count: 4
>>>>>>>>>>>>      => partition fields count: 4
>>>>>>>>>>>>
>>>>>>>>>>>> Re: Num data files. It has a single manifest keep track of 4308
>>>>>>>>>>>> files. Total record count is 11.4 Million.
>>>>>>>>>>>>
>>>>>>>>>>>> Re: Columns. You are right that this table has many columns..
>>>>>>>>>>>> although it has only 20 top-level columns,  num leaf columns are in order
>>>>>>>>>>>> of thousands. This Schema is heavy on structs (in the thousands) and has
>>>>>>>>>>>> deep levels of nesting.  I know Iceberg keeps
>>>>>>>>>>>> *column_sizes, value_counts, null_value_counts* for all leaf
>>>>>>>>>>>> fields and additionally *lower-bounds, upper-bounds* for
>>>>>>>>>>>> native, struct types (not yet for map KVs and arrays).  The length in bytes
>>>>>>>>>>>> of the schema is 109M as compared to 687K of the non-stats dataset.
>>>>>>>>>>>>
>>>>>>>>>>>> Re: Turning off stats. I am looking to leverage stats coz for
>>>>>>>>>>>> our datasets with much larger number of data files we want to leverage
>>>>>>>>>>>> iceberg's ability to skip entire files based on these stats. This is one of
>>>>>>>>>>>> the big incentives for us to use Iceberg.
>>>>>>>>>>>>
>>>>>>>>>>>> Re: Changing the way we keep stats. Avro is a block splittable
>>>>>>>>>>>> format and is friendly with parallel compute frameworks like Spark. So
>>>>>>>>>>>> would it make sense for instance to have add an option to have Spark job /
>>>>>>>>>>>> Futures  handle split planning?   In a larger context, 109M is not that
>>>>>>>>>>>> much metadata given that Iceberg is meant for datasets where the metadata
>>>>>>>>>>>> itself is Bigdata scale.  I'm curious on how folks with larger sized
>>>>>>>>>>>> metadata (in GB) are optimizing this today.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Cheers,
>>>>>>>>>>>> -Gautam.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Apr 19, 2019 at 12:40 AM Ryan Blue <
>>>>>>>>>>>> rblue@netflix.com.invalid> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks for bringing this up! My initial theory is that this
>>>>>>>>>>>>> table has a ton of stats data that you have to read. That could happen in a
>>>>>>>>>>>>> couple of cases.
>>>>>>>>>>>>>
>>>>>>>>>>>>> First, you might have large values in some columns. Parquet
>>>>>>>>>>>>> will suppress its stats if values are larger than 4k and those are what
>>>>>>>>>>>>> Iceberg uses. But that could still cause you to store two 1k+ objects for
>>>>>>>>>>>>> each large column (lower and upper bounds). With a lot of data files, that
>>>>>>>>>>>>> could add up quickly. The solution here is to implement #113
>>>>>>>>>>>>> <https://github.com/apache/incubator-iceberg/issues/113> so
>>>>>>>>>>>>> that we don't store the actual min and max for string or binary columns,
>>>>>>>>>>>>> but instead a truncated value that is just above or just below.
>>>>>>>>>>>>>
>>>>>>>>>>>>> The second case is when you have a lot of columns. Each column
>>>>>>>>>>>>> stores both a lower and upper bound, so 1,000 columns could easily take 8k
>>>>>>>>>>>>> per file. If this is the problem, then maybe we want to have a way to turn
>>>>>>>>>>>>> off column stats. We could also think of ways to change the way stats are
>>>>>>>>>>>>> stored in the manifest files, but that only helps if we move to a columnar
>>>>>>>>>>>>> format to store manifests, so this is probably not a short-term fix.
>>>>>>>>>>>>>
>>>>>>>>>>>>> If you can share a bit more information about this table, we
>>>>>>>>>>>>> can probably tell which one is the problem. I'm guessing it is the large
>>>>>>>>>>>>> values problem.
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Thu, Apr 18, 2019 at 11:52 AM Gautam <
>>>>>>>>>>>>> gautamkowshik@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hello folks,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I have been testing Iceberg reading with and without stats
>>>>>>>>>>>>>> built into Iceberg dataset manifest and found that there's a huge jump in
>>>>>>>>>>>>>> network traffic with the latter..
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> In my test I am comparing two Iceberg datasets, both written
>>>>>>>>>>>>>> in Iceberg format. One with and the other without stats collected in
>>>>>>>>>>>>>> Iceberg manifests. In particular the difference between the writers used
>>>>>>>>>>>>>> for the two datasets is this PR:
>>>>>>>>>>>>>> https://github.com/apache/incubator-iceberg/pull/63/files which
>>>>>>>>>>>>>> uses Iceberg's writers for writing Parquet data. I captured tcpdump from
>>>>>>>>>>>>>> query scans run on these two datasets.  The partition being scanned
>>>>>>>>>>>>>> contains 1 manifest, 1 parquet data file and ~3700 rows in both datasets.
>>>>>>>>>>>>>> There's a 30x jump in network traffic to the remote filesystem (ADLS) when
>>>>>>>>>>>>>> i switch to stats based Iceberg dataset. Both queries used the same Iceberg
>>>>>>>>>>>>>> reader code to access both datasets.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> ```
>>>>>>>>>>>>>> root@d69e104e7d40:/usr/local/spark#  tcpdump -r
>>>>>>>>>>>>>> iceberg_geo1_metrixx_qc_postvalues_batch_query.pcap | grep
>>>>>>>>>>>>>> perfanalysis.adlus15.projectcabostore.net | grep ">" | wc -l
>>>>>>>>>>>>>> reading from file
>>>>>>>>>>>>>> iceberg_geo1_metrixx_qc_postvalues_batch_query.pcap, link-type EN10MB
>>>>>>>>>>>>>> (Ethernet)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> *8844*
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> root@d69e104e7d40:/usr/local/spark# tcpdump -r
>>>>>>>>>>>>>> iceberg_scratch_pad_demo_11_batch_query.pcap | grep
>>>>>>>>>>>>>> perfanalysis.adlus15.projectcabostore.net | grep ">" | wc -l
>>>>>>>>>>>>>> reading from file
>>>>>>>>>>>>>> iceberg_scratch_pad_demo_11_batch_query.pcap, link-type EN10MB (Ethernet)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> *269708*
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> ```
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> As a consequence of this the query response times get
>>>>>>>>>>>>>> affected drastically (illustrated below). I must confess that I am on a
>>>>>>>>>>>>>> slow internet connection via VPN connecting to the remote FS. But the
>>>>>>>>>>>>>> dataset without stats took just 1m 49s while the dataset with stats took
>>>>>>>>>>>>>> 26m 48s to read the same sized data. Most of that time in the latter
>>>>>>>>>>>>>> dataset was spent split planning in Manifest reading and stats evaluation.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> ```
>>>>>>>>>>>>>> all=> select count(*)  from
>>>>>>>>>>>>>> iceberg_geo1_metrixx_qc_postvalues where batchId =
>>>>>>>>>>>>>> '4a6f95abac924159bb3d7075373395c9';
>>>>>>>>>>>>>>  count(1)
>>>>>>>>>>>>>> ----------
>>>>>>>>>>>>>>      3627
>>>>>>>>>>>>>> (1 row)
>>>>>>>>>>>>>> *Time: 109673.202 ms (01:49.673)*
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> all=>  select count(*) from iceberg_scratch_pad_demo_11
>>>>>>>>>>>>>> where _ACP_YEAR=2018 and _ACP_MONTH=01 and _ACP_DAY=01 and batchId =
>>>>>>>>>>>>>> '6d50eeb3e7d74b4f99eea91a27fc8f15';
>>>>>>>>>>>>>>  count(1)
>>>>>>>>>>>>>> ----------
>>>>>>>>>>>>>>      3808
>>>>>>>>>>>>>> (1 row)
>>>>>>>>>>>>>> *Time: 1608058.616 ms (26:48.059)*
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> ```
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Has anyone faced this? I'm wondering if there's some caching
>>>>>>>>>>>>>> or parallelism option here that can be leveraged.  Would appreciate some
>>>>>>>>>>>>>> guidance. If there isn't a straightforward fix and others feel this is an
>>>>>>>>>>>>>> issue I can raise an issue and look into it further.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>> -Gautam.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> --
>>>>>>>>>>>>> Ryan Blue
>>>>>>>>>>>>> Software Engineer
>>>>>>>>>>>>> Netflix
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> Ryan Blue
>>>>>>>> Software Engineer
>>>>>>>> Netflix
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>
>>>>> --
>>>>> Ryan Blue
>>>>> Software Engineer
>>>>> Netflix
>>>>>
>>>>
>>
>> --
>> Ryan Blue
>> Software Engineer
>> Netflix
>>
>

Re: Reading dataset with stats making lots of network traffic..

Posted by Gautam <ga...@gmail.com>.
Ok, thanks for the tip on not having to be tied to a hierarchical
partition spec.

Although this still doesn't explain why all the manifests are scanned,
because planning should be pruning the list of manifests and it's not. Is my
understanding correct that manifest grouping might be re-shuffling the days,
so a query on one day could map to every manifest? Does manifest merging
optimize for partition boundaries, or is it based on the manifests' natural
order?
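
As a side note for anyone reproducing this test, here is a rough sketch of
how the two merge settings Ryan mentions (quoted below) could be raised so
the table keeps one manifest per commit. It assumes the standard
Table.updateProperties() API and a HadoopTables-style location; the path is
a placeholder, not our actual table.

```
// Rough sketch (not production code): raise the merge threshold so a table
// with ~4,000 single-file commits keeps its manifests un-compacted.
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Table;
import org.apache.iceberg.hadoop.HadoopTables;

public class RaiseManifestMergeThreshold {
  public static void main(String[] args) {
    // Placeholder location; load the table however you normally do.
    Table table = new HadoopTables(new Configuration())
        .load("adl://datalake/path/to/table");

    table.updateProperties()
        // default is 100; anything above the commit count disables merging
        .set("commit.manifest.min-count-to-merge", "5000")
        // "commit.manifest.target-size-bytes" is left at its 8MB default here
        .commit();
  }
}
```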

On Thu, May 2, 2019 at 2:06 PM Ryan Blue <rb...@netflix.com> wrote:

> You also don't need to use year, month, and day. You can just use day.
>
> The time-based partition functions all produce ordinals, not local values:
> month(Jan 1970) = 0 and month(Jan 1972) = 24. Same thing with day and hour.
> In fact, I should open a PR to throw an exception when there are duplicate
> partition functions...
>
> On Thu, May 2, 2019 at 1:52 PM Gautam <ga...@gmail.com> wrote:
>
>> FYI .. The test Partition Spec is  :
>> [
>>   YEAR: identity(21)
>>   MONTH: identity(22)
>>   DAY: identity(23)
>>   batchId: identity(24)
>> ]
>>
>>
>>
>> On Thu, May 2, 2019 at 1:46 PM Gautam <ga...@gmail.com> wrote:
>>
>>> > Using those, you should be able to control parallelism. If you want to
>>> test with 4,000, then you can set the min count to 5,000 so Iceberg won’t
>>> compact manifests.
>>>
>>> This is helpful. Thanks for the pointer on increasing parallelism. Will
>>> try this out. So I understand the behaviour, if a different dataset has
>>> >=5000  batches then the resultant # manifests would be (total_num_batches
>>> % 5000 ) ?
>>>
>>> > What surprises me is that you’re not getting much benefit from
>>> filtering out manifests that aren’t helpful. We see a lot of benefit from
>>> it.
>>>
>>> Pardon the verbose example but i think it'l help explain what i'm seeing
>>> ..
>>>
>>> Regarding manifest filtering:  I tested if partition filters in sql
>>> query actually reduce manifests being inspected. In my example, i have 16
>>> manifests that point to 4000 batch partitions ( each file is restricted to
>>> one partition as we'r using physical partitioning in the table ).  So when
>>> querying for .. WHERE  batchId = 'xyz'  .. at most 1 manifest should be
>>> read coz 1 batch == 1 file which should be tracked by 1 manifest (among the
>>> 16) , right? But i see that all 16 are being inspected in
>>> BaseTableScan.planFiles().  Correct me if i'm wrong, it's this call [1]
>>> that should be giving me the manifests that match a partition. When I
>>> inspect this  it says `matchingManifests = 16` ,  which is all the
>>> manifests in the table.  This *could* be due to the fact that our batch
>>> ids are random UUIDs so lower/upper bounds may not help coz there's no
>>> inherent ordering amongst batches.
>>> But then  i tried year = 2019 and month = 01 and day = 01 which also
>>> scanned all manifests. Could this be due to the way Iceberg manifests are
>>> re-grouped and merged? If so, shouldn't re-grouping honour partition
>>> boundaries and optimize for it?
>>>
>>>
>>> Cheers,
>>> -Gautam.
>>>
>>> [1] -
>>> https://github.com/apache/incubator-iceberg/blob/master/core/src/main/java/org/apache/iceberg/BaseTableScan.java#L173
>>>
>>>
>>> On Thu, May 2, 2019 at 12:27 PM Ryan Blue <rb...@netflix.com> wrote:
>>>
>>>> Good questions. Grouping manifests is configurable at the table level.
>>>> There are 2 settings:
>>>>
>>>>    - commit.manifest.target-size-bytes defaults to 8MB, this is the
>>>>    target size that Iceberg will compact to
>>>>    - commit.manifest.min-count-to-merge defaults to 100, this is the
>>>>    minimum number of files before a compaction is triggered
>>>>
>>>> Using those, you should be able to control parallelism. If you want to
>>>> test with 4,000, then you can set the min count to 5,000 so Iceberg won’t
>>>> compact manifests.
>>>>
>>>> What surprises me is that you’re not getting much benefit from
>>>> filtering out manifests that aren’t helpful. We see a lot of benefit from
>>>> it. You might try sorting the data files by partition before adding them to
>>>> the table. That will cluster data files in the same partition so you can
>>>> read fewer manifests.
>>>>
>>>> On Thu, May 2, 2019 at 12:09 PM Gautam <ga...@gmail.com> wrote:
>>>>
>>>>> Hey Anton,
>>>>>             Sorry bout the delay on this. Been caught up with some
>>>>> other things. Thanks for raising issue#173 .
>>>>>
>>>>> So the root cause is indeed the density and size of the schema. While
>>>>> I agree the option to configure stats for columns is good (although i'm not
>>>>> fully convinced that this is purely due to lower/upper bounds). For
>>>>> instance, maybe it's just taking a while to iterate over manifest rows and
>>>>> deserialize the DataFile stats in each read?  The solution i'm using right
>>>>> now is to parallelize the manifest reading in split planning. We
>>>>> regenerated the Iceberg table with more manifests. Now the code enables the
>>>>> ParallelIterator which uses a worker pool of threads (1 thread per cpu by
>>>>> default, configurable using 'iceberg.worker.num-threads' ) to read
>>>>> manifests.
>>>>>
>>>>> On that note, the ability to parallelize is limited to how many
>>>>> manifests are in the table. So as a test, for a table with 4000 files we
>>>>> created one manifest per file (think of one file as a single batch commit
>>>>> in this case). So I was hoping to get a parallelism factor of 4000. But
>>>>> Iceberg summarizes manifests into fewer manifests with each commit so we
>>>>> instead ended up with 16 manifests. So now split planning is limited to
>>>>> reading at most 16 units of parallelism. Is this grouping of manifests into
>>>>> fewer configurable? if not should we allow making this configurable?
>>>>>
>>>>> Sorry if this is forking a different conversation. If so, I can start
>>>>> a separate conversation thread on this.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Wed, May 1, 2019 at 9:42 PM Anton Okolnychyi <ao...@apple.com>
>>>>> wrote:
>>>>>
>>>>>> Hey Gautam,
>>>>>>
>>>>>> Out of my curiosity, did you manage to confirm the root cause of the
>>>>>> issue?
>>>>>>
>>>>>> P.S. I created [1] so that we can make collection of lower/upper
>>>>>> bounds configurable.
>>>>>>
>>>>>> Thanks,
>>>>>> Anton
>>>>>>
>>>>>> [1] - https://github.com/apache/incubator-iceberg/issues/173
>>>>>>
>>>>>> On 22 Apr 2019, at 09:15, Gautam <ga...@gmail.com> wrote:
>>>>>>
>>>>>> Thanks guys for the insights ..
>>>>>>
>>>>>> > I like Anton's idea to have an optional list of columns for which
>>>>>> we keep stats. That would allow us to avoid storing stats for thousands of
>>>>>> columns that won't ever be used. Another option here is to add a flag to
>>>>>> keep stats only for top-level columns. That's much less configuration for
>>>>>> users and probably does the right thing in many cases. Simpler to use but
>>>>>> not as fast in all cases is sometimes a good compromise.
>>>>>>
>>>>>> This makes sense to me. It adds a variable that data pipelines can
>>>>>> tweak on to improve performance. I will add an issue on Github to add a
>>>>>> stats config/flag. Although, having said that, I would try to optimize
>>>>>> around this coz read patterns are hardly ever known a priori and adding a
>>>>>> column to this list means having to re-write the entire data again. So i'l
>>>>>> try the other suggestion which is parallelizing on multiple manifests.
>>>>>>
>>>>>> >  To clarify my comment on changing the storage: the idea is to use
>>>>>> separate columns instead of a map and then use a columnar storage format so
>>>>>> we can project those columns independently. Avro can't project columns
>>>>>> independently. This wouldn't help on the write side and may just cause a
>>>>>> lot of seeking on the read side that diminishes the benefits.
>>>>>>
>>>>>> Gotcha.
>>>>>>
>>>>>> > Also, now that we have more details, I think there is a second
>>>>>> problem. Because we expect several manifests in a table, we parallelize
>>>>>> split planning on manifests instead of splits of manifest files. This
>>>>>> planning operation is happening in a single thread instead of in parallel.
>>>>>> I think if you split the write across several manifests, you'd improve wall
>>>>>> time.
>>>>>>
>>>>>> This might actually be the issue here, this was a test bench dataset
>>>>>> so the writer job created a single manifest for all the data in the dataset
>>>>>> which isn't really how we will do things in prod. I'l try and create the
>>>>>> metadata based on productions expected commit pattern.
>>>>>>
>>>>>>
>>>>>> Regarding Iceberg not truncating large bounded column values
>>>>>> https://github.com/apache/incubator-iceberg/issues/113 .. I didn't
>>>>>> consider this with our dataset. The current evidence is leading towards the
>>>>>> number of columns and the sheer number of files that the manifest is
>>>>>> maintaining but this is a good thing to look into.
>>>>>>
>>>>>> Thanks again guys.
>>>>>>
>>>>>> -Gautam.
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Fri, Apr 19, 2019 at 9:05 AM Ryan Blue <rb...@netflix.com> wrote:
>>>>>>
>>>>>>> I like Anton's idea to have an optional list of columns for which we
>>>>>>> keep stats. That would allow us to avoid storing stats for thousands of
>>>>>>> columns that won't ever be used. Another option here is to add a flag to
>>>>>>> keep stats only for top-level columns. That's much less configuration for
>>>>>>> users and probably does the right thing in many cases. Simpler to use but
>>>>>>> not as fast in all cases is sometimes a good compromise.
>>>>>>>
>>>>>>> To clarify my comment on changing the storage: the idea is to use
>>>>>>> separate columns instead of a map and then use a columnar storage format so
>>>>>>> we can project those columns independently. Avro can't project columns
>>>>>>> independently. This wouldn't help on the write side and may just cause a
>>>>>>> lot of seeking on the read side that diminishes the benefits.
>>>>>>>
>>>>>>> Also, now that we have more details, I think there is a second
>>>>>>> problem. Because we expect several manifests in a table, we parallelize
>>>>>>> split planning on manifests instead of splits of manifest files. This
>>>>>>> planning operation is happening in a single thread instead of in parallel.
>>>>>>> I think if you split the write across several manifests, you'd improve wall
>>>>>>> time.
>>>>>>>
>>>>>>> On Fri, Apr 19, 2019 at 8:15 AM Anton Okolnychyi <
>>>>>>> aokolnychyi@apple.com> wrote:
>>>>>>>
>>>>>>>> No, we haven’t experienced it yet. The manifest size is huge in
>>>>>>>> your case. To me, Ryan is correct: it might be either big lower/upper
>>>>>>>> bounds (then truncation will help) or a big number columns (then collecting
>>>>>>>> lower/upper bounds only for specific columns will help). I think both
>>>>>>>> optimizations are needed and will reduce the manifest size.
>>>>>>>>
>>>>>>>> Since you mentioned you have a lot of columns and we collect bounds
>>>>>>>> for nested struct fields, I am wondering if you could revert [1] locally
>>>>>>>> and compare the manifest size.
>>>>>>>>
>>>>>>>> [1] -
>>>>>>>> https://github.com/apache/incubator-iceberg/commit/c383dd87a89e35d622e9c458fd711931cbc5e96f
>>>>>>>>
>>>>>>>> On 19 Apr 2019, at 15:42, Gautam <ga...@gmail.com> wrote:
>>>>>>>>
>>>>>>>> Thanks for responding Anton! Do we think the delay is mainly due to
>>>>>>>> lower/upper bound filtering? have you faced this? I haven't exactly found
>>>>>>>> where the slowness is yet. It's generally due to the stats filtering but
>>>>>>>> what part of it is causing this much network traffic. There's
>>>>>>>> CloseableIteratable  that takes a ton of time on the next() and hasNext()
>>>>>>>> calls. My guess is the expression evaluation on each manifest entry is
>>>>>>>> what's doing it.
>>>>>>>>
>>>>>>>> On Fri, Apr 19, 2019 at 1:41 PM Anton Okolnychyi <
>>>>>>>> aokolnychyi@apple.com> wrote:
>>>>>>>>
>>>>>>>>> I think we need to have a list of columns for which we want to
>>>>>>>>> collect stats and that should be configurable by the user. Maybe, this
>>>>>>>>> config should be applicable only to lower/upper bounds. As we now collect
>>>>>>>>> stats even for nested struct fields, this might generate a lot of data. In
>>>>>>>>> most cases, users cluster/sort their data by a subset of data columns to
>>>>>>>>> have fast queries with predicates on those columns. So, being able to
>>>>>>>>> configure columns for which to collect lower/upper bounds seems reasonable.
>>>>>>>>>
>>>>>>>>> On 19 Apr 2019, at 08:03, Gautam <ga...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>> >  The length in bytes of the schema is 109M as compared to 687K
>>>>>>>>> of the non-stats dataset.
>>>>>>>>>
>>>>>>>>> Typo, length in bytes of *manifest*. schema is the same.
>>>>>>>>>
>>>>>>>>> On Fri, Apr 19, 2019 at 12:16 PM Gautam <ga...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Correction, partition count = 4308.
>>>>>>>>>>
>>>>>>>>>> > Re: Changing the way we keep stats. Avro is a block splittable
>>>>>>>>>> format and is friendly with parallel compute frameworks like Spark.
>>>>>>>>>>
>>>>>>>>>> Here I am trying to say that we don't need to change the format
>>>>>>>>>> to columnar right? The current format is already friendly for
>>>>>>>>>> parallelization.
>>>>>>>>>>
>>>>>>>>>> thanks.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Fri, Apr 19, 2019 at 12:12 PM Gautam <ga...@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Ah, my bad. I missed adding in the schema details .. Here are
>>>>>>>>>>> some details on the dataset with stats :
>>>>>>>>>>>
>>>>>>>>>>>  Iceberg Schema Columns : 20
>>>>>>>>>>>  Spark Schema fields : 20
>>>>>>>>>>>  Snapshot Summary :{added-data-files=4308,
>>>>>>>>>>> added-records=11494037, changed-partition-count=4308,
>>>>>>>>>>> total-records=11494037, total-data-files=4308}
>>>>>>>>>>>  Manifest files :1
>>>>>>>>>>>  Manifest details:
>>>>>>>>>>>      => manifest file path:
>>>>>>>>>>> adl://[dataset_base_path]/metadata/4bcda033-9df5-4c84-8eef-9d6ef93e4347-m0.avro
>>>>>>>>>>>      => manifest file length: 109,028,885
>>>>>>>>>>>      => existing files count: 0
>>>>>>>>>>>      => added files count: 4308
>>>>>>>>>>>      => deleted files count: 0
>>>>>>>>>>>      => partitions count: 4
>>>>>>>>>>>      => partition fields count: 4
>>>>>>>>>>>
>>>>>>>>>>> Re: Num data files. It has a single manifest keep track of 4308
>>>>>>>>>>> files. Total record count is 11.4 Million.
>>>>>>>>>>>
>>>>>>>>>>> Re: Columns. You are right that this table has many columns..
>>>>>>>>>>> although it has only 20 top-level columns,  num leaf columns are in order
>>>>>>>>>>> of thousands. This Schema is heavy on structs (in the thousands) and has
>>>>>>>>>>> deep levels of nesting.  I know Iceberg keeps
>>>>>>>>>>> *column_sizes, value_counts, null_value_counts* for all leaf
>>>>>>>>>>> fields and additionally *lower-bounds, upper-bounds* for
>>>>>>>>>>> native, struct types (not yet for map KVs and arrays).  The length in bytes
>>>>>>>>>>> of the schema is 109M as compared to 687K of the non-stats dataset.
>>>>>>>>>>>
>>>>>>>>>>> Re: Turning off stats. I am looking to leverage stats coz for
>>>>>>>>>>> our datasets with much larger number of data files we want to leverage
>>>>>>>>>>> iceberg's ability to skip entire files based on these stats. This is one of
>>>>>>>>>>> the big incentives for us to use Iceberg.
>>>>>>>>>>>
>>>>>>>>>>> Re: Changing the way we keep stats. Avro is a block splittable
>>>>>>>>>>> format and is friendly with parallel compute frameworks like Spark. So
>>>>>>>>>>> would it make sense for instance to have add an option to have Spark job /
>>>>>>>>>>> Futures  handle split planning?   In a larger context, 109M is not that
>>>>>>>>>>> much metadata given that Iceberg is meant for datasets where the metadata
>>>>>>>>>>> itself is Bigdata scale.  I'm curious on how folks with larger sized
>>>>>>>>>>> metadata (in GB) are optimizing this today.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Cheers,
>>>>>>>>>>> -Gautam.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Apr 19, 2019 at 12:40 AM Ryan Blue <
>>>>>>>>>>> rblue@netflix.com.invalid> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Thanks for bringing this up! My initial theory is that this
>>>>>>>>>>>> table has a ton of stats data that you have to read. That could happen in a
>>>>>>>>>>>> couple of cases.
>>>>>>>>>>>>
>>>>>>>>>>>> First, you might have large values in some columns. Parquet
>>>>>>>>>>>> will suppress its stats if values are larger than 4k and those are what
>>>>>>>>>>>> Iceberg uses. But that could still cause you to store two 1k+ objects for
>>>>>>>>>>>> each large column (lower and upper bounds). With a lot of data files, that
>>>>>>>>>>>> could add up quickly. The solution here is to implement #113
>>>>>>>>>>>> <https://github.com/apache/incubator-iceberg/issues/113> so
>>>>>>>>>>>> that we don't store the actual min and max for string or binary columns,
>>>>>>>>>>>> but instead a truncated value that is just above or just below.
>>>>>>>>>>>>
>>>>>>>>>>>> The second case is when you have a lot of columns. Each column
>>>>>>>>>>>> stores both a lower and upper bound, so 1,000 columns could easily take 8k
>>>>>>>>>>>> per file. If this is the problem, then maybe we want to have a way to turn
>>>>>>>>>>>> off column stats. We could also think of ways to change the way stats are
>>>>>>>>>>>> stored in the manifest files, but that only helps if we move to a columnar
>>>>>>>>>>>> format to store manifests, so this is probably not a short-term fix.
>>>>>>>>>>>>
>>>>>>>>>>>> If you can share a bit more information about this table, we
>>>>>>>>>>>> can probably tell which one is the problem. I'm guessing it is the large
>>>>>>>>>>>> values problem.
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, Apr 18, 2019 at 11:52 AM Gautam <
>>>>>>>>>>>> gautamkowshik@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hello folks,
>>>>>>>>>>>>>
>>>>>>>>>>>>> I have been testing Iceberg reading with and without stats
>>>>>>>>>>>>> built into Iceberg dataset manifest and found that there's a huge jump in
>>>>>>>>>>>>> network traffic with the latter..
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> In my test I am comparing two Iceberg datasets, both written
>>>>>>>>>>>>> in Iceberg format. One with and the other without stats collected in
>>>>>>>>>>>>> Iceberg manifests. In particular the difference between the writers used
>>>>>>>>>>>>> for the two datasets is this PR:
>>>>>>>>>>>>> https://github.com/apache/incubator-iceberg/pull/63/files which
>>>>>>>>>>>>> uses Iceberg's writers for writing Parquet data. I captured tcpdump from
>>>>>>>>>>>>> query scans run on these two datasets.  The partition being scanned
>>>>>>>>>>>>> contains 1 manifest, 1 parquet data file and ~3700 rows in both datasets.
>>>>>>>>>>>>> There's a 30x jump in network traffic to the remote filesystem (ADLS) when
>>>>>>>>>>>>> i switch to stats based Iceberg dataset. Both queries used the same Iceberg
>>>>>>>>>>>>> reader code to access both datasets.
>>>>>>>>>>>>>
>>>>>>>>>>>>> ```
>>>>>>>>>>>>> root@d69e104e7d40:/usr/local/spark#  tcpdump -r
>>>>>>>>>>>>> iceberg_geo1_metrixx_qc_postvalues_batch_query.pcap | grep
>>>>>>>>>>>>> perfanalysis.adlus15.projectcabostore.net | grep ">" | wc -l
>>>>>>>>>>>>> reading from file
>>>>>>>>>>>>> iceberg_geo1_metrixx_qc_postvalues_batch_query.pcap, link-type EN10MB
>>>>>>>>>>>>> (Ethernet)
>>>>>>>>>>>>>
>>>>>>>>>>>>> *8844*
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> root@d69e104e7d40:/usr/local/spark# tcpdump -r
>>>>>>>>>>>>> iceberg_scratch_pad_demo_11_batch_query.pcap | grep
>>>>>>>>>>>>> perfanalysis.adlus15.projectcabostore.net | grep ">" | wc -l
>>>>>>>>>>>>> reading from file
>>>>>>>>>>>>> iceberg_scratch_pad_demo_11_batch_query.pcap, link-type EN10MB (Ethernet)
>>>>>>>>>>>>>
>>>>>>>>>>>>> *269708*
>>>>>>>>>>>>>
>>>>>>>>>>>>> ```
>>>>>>>>>>>>>
>>>>>>>>>>>>> As a consequence of this the query response times get affected
>>>>>>>>>>>>> drastically (illustrated below). I must confess that I am on a slow
>>>>>>>>>>>>> internet connection via VPN connecting to the remote FS. But the dataset
>>>>>>>>>>>>> without stats took just 1m 49s while the dataset with stats took 26m 48s to
>>>>>>>>>>>>> read the same sized data. Most of that time in the latter dataset was spent
>>>>>>>>>>>>> split planning in Manifest reading and stats evaluation.
>>>>>>>>>>>>>
>>>>>>>>>>>>> ```
>>>>>>>>>>>>> all=> select count(*)  from iceberg_geo1_metrixx_qc_postvalues
>>>>>>>>>>>>> where batchId = '4a6f95abac924159bb3d7075373395c9';
>>>>>>>>>>>>>  count(1)
>>>>>>>>>>>>> ----------
>>>>>>>>>>>>>      3627
>>>>>>>>>>>>> (1 row)
>>>>>>>>>>>>> *Time: 109673.202 ms (01:49.673)*
>>>>>>>>>>>>>
>>>>>>>>>>>>> all=>  select count(*) from iceberg_scratch_pad_demo_11  where
>>>>>>>>>>>>> _ACP_YEAR=2018 and _ACP_MONTH=01 and _ACP_DAY=01 and batchId =
>>>>>>>>>>>>> '6d50eeb3e7d74b4f99eea91a27fc8f15';
>>>>>>>>>>>>>  count(1)
>>>>>>>>>>>>> ----------
>>>>>>>>>>>>>      3808
>>>>>>>>>>>>> (1 row)
>>>>>>>>>>>>> *Time: 1608058.616 ms (26:48.059)*
>>>>>>>>>>>>>
>>>>>>>>>>>>> ```
>>>>>>>>>>>>>
>>>>>>>>>>>>> Has anyone faced this? I'm wondering if there's some caching
>>>>>>>>>>>>> or parallelism option here that can be leveraged.  Would appreciate some
>>>>>>>>>>>>> guidance. If there isn't a straightforward fix and others feel this is an
>>>>>>>>>>>>> issue I can raise an issue and look into it further.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>> -Gautam.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> --
>>>>>>>>>>>> Ryan Blue
>>>>>>>>>>>> Software Engineer
>>>>>>>>>>>> Netflix
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Ryan Blue
>>>>>>> Software Engineer
>>>>>>> Netflix
>>>>>>>
>>>>>>
>>>>>>
>>>>
>>>> --
>>>> Ryan Blue
>>>> Software Engineer
>>>> Netflix
>>>>
>>>
>
> --
> Ryan Blue
> Software Engineer
> Netflix
>

Re: Reading dataset with stats making lots of network traffic..

Posted by Ryan Blue <rb...@netflix.com.INVALID>.
You also don't need to use year, month, and day. You can just use day.

The time-based partition functions all produce ordinals, not local values:
month(Jan 1970) = 0 and month(Jan 1972) = 24. Same thing with day and hour.
In fact, I should open a PR to throw an exception when there are duplicate
partition functions...
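
To make that concrete, here is a rough sketch of a spec built on a single
day() transform plus the batch id. The timestamp column name ("event_ts")
and the field IDs are assumptions for illustration, not your table's actual
schema.

```
// Sketch only: partition by one ordinal-producing day() transform + batchId,
// instead of identity YEAR/MONTH/DAY columns.
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

public class DayPartitionSpecExample {
  public static void main(String[] args) {
    Schema schema = new Schema(
        Types.NestedField.required(1, "event_ts", Types.TimestampType.withZone()),
        Types.NestedField.required(2, "batchId", Types.StringType.get()));

    PartitionSpec spec = PartitionSpec.builderFor(schema)
        .day("event_ts")      // single day partition, ordinal-based
        .identity("batchId")
        .build();

    System.out.println(spec);
  }
}
```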

On Thu, May 2, 2019 at 1:52 PM Gautam <ga...@gmail.com> wrote:

> FYI .. The test Partition Spec is  :
> [
>   YEAR: identity(21)
>   MONTH: identity(22)
>   DAY: identity(23)
>   batchId: identity(24)
> ]
>
>
>
> On Thu, May 2, 2019 at 1:46 PM Gautam <ga...@gmail.com> wrote:
>
>> > Using those, you should be able to control parallelism. If you want to
>> test with 4,000, then you can set the min count to 5,000 so Iceberg won’t
>> compact manifests.
>>
>> This is helpful. Thanks for the pointer on increasing parallelism. Will
>> try this out. So I understand the behaviour, if a different dataset has
>> >=5000  batches then the resultant # manifests would be (total_num_batches
>> % 5000 ) ?
>>
>> > What surprises me is that you’re not getting much benefit from
>> filtering out manifests that aren’t helpful. We see a lot of benefit from
>> it.
>>
>> Pardon the verbose example but i think it'l help explain what i'm seeing
>> ..
>>
>> Regarding manifest filtering:  I tested if partition filters in sql query
>> actually reduce manifests being inspected. In my example, i have 16
>> manifests that point to 4000 batch partitions ( each file is restricted to
>> one partition as we'r using physical partitioning in the table ).  So when
>> querying for .. WHERE  batchId = 'xyz'  .. at most 1 manifest should be
>> read coz 1 batch == 1 file which should be tracked by 1 manifest (among the
>> 16) , right? But i see that all 16 are being inspected in
>> BaseTableScan.planFiles().  Correct me if i'm wrong, it's this call [1]
>> that should be giving me the manifests that match a partition. When I
>> inspect this  it says `matchingManifests = 16` ,  which is all the
>> manifests in the table.  This *could* be due to the fact that our batch
>> ids are random UUIDs so lower/upper bounds may not help coz there's no
>> inherent ordering amongst batches.
>> But then  i tried year = 2019 and month = 01 and day = 01 which also
>> scanned all manifests. Could this be due to the way Iceberg manifests are
>> re-grouped and merged? If so, shouldn't re-grouping honour partition
>> boundaries and optimize for it?
>>
>>
>> Cheers,
>> -Gautam.
>>
>> [1] -
>> https://github.com/apache/incubator-iceberg/blob/master/core/src/main/java/org/apache/iceberg/BaseTableScan.java#L173
>>
>>
>> On Thu, May 2, 2019 at 12:27 PM Ryan Blue <rb...@netflix.com> wrote:
>>
>>> Good questions. Grouping manifests is configurable at the table level.
>>> There are 2 settings:
>>>
>>>    - commit.manifest.target-size-bytes defaults to 8MB, this is the
>>>    target size that Iceberg will compact to
>>>    - commit.manifest.min-count-to-merge defaults to 100, this is the
>>>    minimum number of files before a compaction is triggered
>>>
>>> Using those, you should be able to control parallelism. If you want to
>>> test with 4,000, then you can set the min count to 5,000 so Iceberg won’t
>>> compact manifests.
>>>
>>> What surprises me is that you’re not getting much benefit from filtering
>>> out manifests that aren’t helpful. We see a lot of benefit from it. You
>>> might try sorting the data files by partition before adding them to the
>>> table. That will cluster data files in the same partition so you can read
>>> fewer manifests.
>>>
>>> On Thu, May 2, 2019 at 12:09 PM Gautam <ga...@gmail.com> wrote:
>>>
>>>> Hey Anton,
>>>>             Sorry bout the delay on this. Been caught up with some
>>>> other things. Thanks for raising issue#173 .
>>>>
>>>> So the root cause is indeed the density and size of the schema. While I
>>>> agree the option to configure stats for columns is good (although i'm not
>>>> fully convinced that this is purely due to lower/upper bounds). For
>>>> instance, maybe it's just taking a while to iterate over manifest rows and
>>>> deserialize the DataFile stats in each read?  The solution i'm using right
>>>> now is to parallelize the manifest reading in split planning. We
>>>> regenerated the Iceberg table with more manifests. Now the code enables the
>>>> ParallelIterator which uses a worker pool of threads (1 thread per cpu by
>>>> default, configurable using 'iceberg.worker.num-threads' ) to read
>>>> manifests.
>>>>
>>>> On that note, the ability to parallelize is limited to how many
>>>> manifests are in the table. So as a test, for a table with 4000 files we
>>>> created one manifest per file (think of one file as a single batch commit
>>>> in this case). So I was hoping to get a parallelism factor of 4000. But
>>>> Iceberg summarizes manifests into fewer manifests with each commit so we
>>>> instead ended up with 16 manifests. So now split planning is limited to
>>>> reading at most 16 units of parallelism. Is this grouping of manifests into
>>>> fewer configurable? if not should we allow making this configurable?
>>>>
>>>> Sorry if this is forking a different conversation. If so, I can start a
>>>> separate conversation thread on this.
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Wed, May 1, 2019 at 9:42 PM Anton Okolnychyi <ao...@apple.com>
>>>> wrote:
>>>>
>>>>> Hey Gautam,
>>>>>
>>>>> Out of my curiosity, did you manage to confirm the root cause of the
>>>>> issue?
>>>>>
>>>>> P.S. I created [1] so that we can make collection of lower/upper
>>>>> bounds configurable.
>>>>>
>>>>> Thanks,
>>>>> Anton
>>>>>
>>>>> [1] - https://github.com/apache/incubator-iceberg/issues/173
>>>>>
>>>>> On 22 Apr 2019, at 09:15, Gautam <ga...@gmail.com> wrote:
>>>>>
>>>>> Thanks guys for the insights ..
>>>>>
>>>>> > I like Anton's idea to have an optional list of columns for which we
>>>>> keep stats. That would allow us to avoid storing stats for thousands of
>>>>> columns that won't ever be used. Another option here is to add a flag to
>>>>> keep stats only for top-level columns. That's much less configuration for
>>>>> users and probably does the right thing in many cases. Simpler to use but
>>>>> not as fast in all cases is sometimes a good compromise.
>>>>>
>>>>> This makes sense to me. It adds a variable that data pipelines can
>>>>> tweak on to improve performance. I will add an issue on Github to add a
>>>>> stats config/flag. Although, having said that, I would try to optimize
>>>>> around this coz read patterns are hardly ever known a priori and adding a
>>>>> column to this list means having to re-write the entire data again. So i'l
>>>>> try the other suggestion which is parallelizing on multiple manifests.
>>>>>
>>>>> >  To clarify my comment on changing the storage: the idea is to use
>>>>> separate columns instead of a map and then use a columnar storage format so
>>>>> we can project those columns independently. Avro can't project columns
>>>>> independently. This wouldn't help on the write side and may just cause a
>>>>> lot of seeking on the read side that diminishes the benefits.
>>>>>
>>>>> Gotcha.
>>>>>
>>>>> > Also, now that we have more details, I think there is a second
>>>>> problem. Because we expect several manifests in a table, we parallelize
>>>>> split planning on manifests instead of splits of manifest files. This
>>>>> planning operation is happening in a single thread instead of in parallel.
>>>>> I think if you split the write across several manifests, you'd improve wall
>>>>> time.
>>>>>
>>>>> This might actually be the issue here, this was a test bench dataset
>>>>> so the writer job created a single manifest for all the data in the dataset
>>>>> which isn't really how we will do things in prod. I'l try and create the
>>>>> metadata based on productions expected commit pattern.
>>>>>
>>>>>
>>>>> Regarding Iceberg not truncating large bounded column values
>>>>> https://github.com/apache/incubator-iceberg/issues/113 .. I didn't
>>>>> consider this with our dataset. The current evidence is leading towards the
>>>>> number of columns and the sheer number of files that the manifest is
>>>>> maintaining but this is a good thing to look into.
>>>>>
>>>>> Thanks again guys.
>>>>>
>>>>> -Gautam.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Fri, Apr 19, 2019 at 9:05 AM Ryan Blue <rb...@netflix.com> wrote:
>>>>>
>>>>>> I like Anton's idea to have an optional list of columns for which we
>>>>>> keep stats. That would allow us to avoid storing stats for thousands of
>>>>>> columns that won't ever be used. Another option here is to add a flag to
>>>>>> keep stats only for top-level columns. That's much less configuration for
>>>>>> users and probably does the right thing in many cases. Simpler to use but
>>>>>> not as fast in all cases is sometimes a good compromise.
>>>>>>
>>>>>> To clarify my comment on changing the storage: the idea is to use
>>>>>> separate columns instead of a map and then use a columnar storage format so
>>>>>> we can project those columns independently. Avro can't project columns
>>>>>> independently. This wouldn't help on the write side and may just cause a
>>>>>> lot of seeking on the read side that diminishes the benefits.
>>>>>>
>>>>>> Also, now that we have more details, I think there is a second
>>>>>> problem. Because we expect several manifests in a table, we parallelize
>>>>>> split planning on manifests instead of splits of manifest files. This
>>>>>> planning operation is happening in a single thread instead of in parallel.
>>>>>> I think if you split the write across several manifests, you'd improve wall
>>>>>> time.
>>>>>>
>>>>>> On Fri, Apr 19, 2019 at 8:15 AM Anton Okolnychyi <
>>>>>> aokolnychyi@apple.com> wrote:
>>>>>>
>>>>>>> No, we haven’t experienced it yet. The manifest size is huge in your
>>>>>>> case. To me, Ryan is correct: it might be either big lower/upper bounds
>>>>>>> (then truncation will help) or a big number columns (then collecting
>>>>>>> lower/upper bounds only for specific columns will help). I think both
>>>>>>> optimizations are needed and will reduce the manifest size.
>>>>>>>
>>>>>>> Since you mentioned you have a lot of columns and we collect bounds
>>>>>>> for nested struct fields, I am wondering if you could revert [1] locally
>>>>>>> and compare the manifest size.
>>>>>>>
>>>>>>> [1] -
>>>>>>> https://github.com/apache/incubator-iceberg/commit/c383dd87a89e35d622e9c458fd711931cbc5e96f
>>>>>>>
>>>>>>> On 19 Apr 2019, at 15:42, Gautam <ga...@gmail.com> wrote:
>>>>>>>
>>>>>>> Thanks for responding Anton! Do we think the delay is mainly due to
>>>>>>> lower/upper bound filtering? have you faced this? I haven't exactly found
>>>>>>> where the slowness is yet. It's generally due to the stats filtering but
>>>>>>> what part of it is causing this much network traffic. There's
>>>>>>> CloseableIteratable  that takes a ton of time on the next() and hasNext()
>>>>>>> calls. My guess is the expression evaluation on each manifest entry is
>>>>>>> what's doing it.
>>>>>>>
>>>>>>> On Fri, Apr 19, 2019 at 1:41 PM Anton Okolnychyi <
>>>>>>> aokolnychyi@apple.com> wrote:
>>>>>>>
>>>>>>>> I think we need to have a list of columns for which we want to
>>>>>>>> collect stats and that should be configurable by the user. Maybe, this
>>>>>>>> config should be applicable only to lower/upper bounds. As we now collect
>>>>>>>> stats even for nested struct fields, this might generate a lot of data. In
>>>>>>>> most cases, users cluster/sort their data by a subset of data columns to
>>>>>>>> have fast queries with predicates on those columns. So, being able to
>>>>>>>> configure columns for which to collect lower/upper bounds seems reasonable.
>>>>>>>>
>>>>>>>> On 19 Apr 2019, at 08:03, Gautam <ga...@gmail.com> wrote:
>>>>>>>>
>>>>>>>> >  The length in bytes of the schema is 109M as compared to 687K of
>>>>>>>> the non-stats dataset.
>>>>>>>>
>>>>>>>> Typo, length in bytes of *manifest*. schema is the same.
>>>>>>>>
>>>>>>>> On Fri, Apr 19, 2019 at 12:16 PM Gautam <ga...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Correction, partition count = 4308.
>>>>>>>>>
>>>>>>>>> > Re: Changing the way we keep stats. Avro is a block splittable
>>>>>>>>> format and is friendly with parallel compute frameworks like Spark.
>>>>>>>>>
>>>>>>>>> Here I am trying to say that we don't need to change the format to
>>>>>>>>> columnar right? The current format is already friendly for parallelization.
>>>>>>>>>
>>>>>>>>> thanks.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Fri, Apr 19, 2019 at 12:12 PM Gautam <ga...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Ah, my bad. I missed adding in the schema details .. Here are
>>>>>>>>>> some details on the dataset with stats :
>>>>>>>>>>
>>>>>>>>>>  Iceberg Schema Columns : 20
>>>>>>>>>>  Spark Schema fields : 20
>>>>>>>>>>  Snapshot Summary :{added-data-files=4308,
>>>>>>>>>> added-records=11494037, changed-partition-count=4308,
>>>>>>>>>> total-records=11494037, total-data-files=4308}
>>>>>>>>>>  Manifest files :1
>>>>>>>>>>  Manifest details:
>>>>>>>>>>      => manifest file path:
>>>>>>>>>> adl://[dataset_base_path]/metadata/4bcda033-9df5-4c84-8eef-9d6ef93e4347-m0.avro
>>>>>>>>>>      => manifest file length: 109,028,885
>>>>>>>>>>      => existing files count: 0
>>>>>>>>>>      => added files count: 4308
>>>>>>>>>>      => deleted files count: 0
>>>>>>>>>>      => partitions count: 4
>>>>>>>>>>      => partition fields count: 4
>>>>>>>>>>
>>>>>>>>>> Re: Num data files. It has a single manifest keep track of 4308
>>>>>>>>>> files. Total record count is 11.4 Million.
>>>>>>>>>>
>>>>>>>>>> Re: Columns. You are right that this table has many columns..
>>>>>>>>>> although it has only 20 top-level columns,  num leaf columns are in order
>>>>>>>>>> of thousands. This Schema is heavy on structs (in the thousands) and has
>>>>>>>>>> deep levels of nesting.  I know Iceberg keeps
>>>>>>>>>> *column_sizes, value_counts, null_value_counts* for all leaf
>>>>>>>>>> fields and additionally *lower-bounds, upper-bounds* for native,
>>>>>>>>>> struct types (not yet for map KVs and arrays).  The length in bytes of the
>>>>>>>>>> schema is 109M as compared to 687K of the non-stats dataset.
>>>>>>>>>>
>>>>>>>>>> Re: Turning off stats. I am looking to leverage stats coz for our
>>>>>>>>>> datasets with much larger number of data files we want to leverage
>>>>>>>>>> iceberg's ability to skip entire files based on these stats. This is one of
>>>>>>>>>> the big incentives for us to use Iceberg.
>>>>>>>>>>
>>>>>>>>>> Re: Changing the way we keep stats. Avro is a block splittable
>>>>>>>>>> format and is friendly with parallel compute frameworks like Spark. So
>>>>>>>>>> would it make sense for instance to have add an option to have Spark job /
>>>>>>>>>> Futures  handle split planning?   In a larger context, 109M is not that
>>>>>>>>>> much metadata given that Iceberg is meant for datasets where the metadata
>>>>>>>>>> itself is Bigdata scale.  I'm curious on how folks with larger sized
>>>>>>>>>> metadata (in GB) are optimizing this today.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Cheers,
>>>>>>>>>> -Gautam.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Fri, Apr 19, 2019 at 12:40 AM Ryan Blue <
>>>>>>>>>> rblue@netflix.com.invalid> wrote:
>>>>>>>>>>
>>>>>>>>>>> Thanks for bringing this up! My initial theory is that this
>>>>>>>>>>> table has a ton of stats data that you have to read. That could happen in a
>>>>>>>>>>> couple of cases.
>>>>>>>>>>>
>>>>>>>>>>> First, you might have large values in some columns. Parquet will
>>>>>>>>>>> suppress its stats if values are larger than 4k and those are what Iceberg
>>>>>>>>>>> uses. But that could still cause you to store two 1k+ objects for each
>>>>>>>>>>> large column (lower and upper bounds). With a lot of data files, that could
>>>>>>>>>>> add up quickly. The solution here is to implement #113
>>>>>>>>>>> <https://github.com/apache/incubator-iceberg/issues/113> so
>>>>>>>>>>> that we don't store the actual min and max for string or binary columns,
>>>>>>>>>>> but instead a truncated value that is just above or just below.
>>>>>>>>>>>
>>>>>>>>>>> The second case is when you have a lot of columns. Each column
>>>>>>>>>>> stores both a lower and upper bound, so 1,000 columns could easily take 8k
>>>>>>>>>>> per file. If this is the problem, then maybe we want to have a way to turn
>>>>>>>>>>> off column stats. We could also think of ways to change the way stats are
>>>>>>>>>>> stored in the manifest files, but that only helps if we move to a columnar
>>>>>>>>>>> format to store manifests, so this is probably not a short-term fix.
>>>>>>>>>>>
>>>>>>>>>>> If you can share a bit more information about this table, we can
>>>>>>>>>>> probably tell which one is the problem. I'm guessing it is the large values
>>>>>>>>>>> problem.
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Apr 18, 2019 at 11:52 AM Gautam <ga...@gmail.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hello folks,
>>>>>>>>>>>>
>>>>>>>>>>>> I have been testing Iceberg reading with and without stats
>>>>>>>>>>>> built into Iceberg dataset manifest and found that there's a huge jump in
>>>>>>>>>>>> network traffic with the latter..
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> In my test I am comparing two Iceberg datasets, both written in
>>>>>>>>>>>> Iceberg format. One with and the other without stats collected in Iceberg
>>>>>>>>>>>> manifests. In particular the difference between the writers used for the
>>>>>>>>>>>> two datasets is this PR:
>>>>>>>>>>>> https://github.com/apache/incubator-iceberg/pull/63/files which
>>>>>>>>>>>> uses Iceberg's writers for writing Parquet data. I captured tcpdump from
>>>>>>>>>>>> query scans run on these two datasets.  The partition being scanned
>>>>>>>>>>>> contains 1 manifest, 1 parquet data file and ~3700 rows in both datasets.
>>>>>>>>>>>> There's a 30x jump in network traffic to the remote filesystem (ADLS) when
>>>>>>>>>>>> i switch to stats based Iceberg dataset. Both queries used the same Iceberg
>>>>>>>>>>>> reader code to access both datasets.
>>>>>>>>>>>>
>>>>>>>>>>>> ```
>>>>>>>>>>>> root@d69e104e7d40:/usr/local/spark#  tcpdump -r
>>>>>>>>>>>> iceberg_geo1_metrixx_qc_postvalues_batch_query.pcap | grep
>>>>>>>>>>>> perfanalysis.adlus15.projectcabostore.net | grep ">" | wc -l
>>>>>>>>>>>> reading from file
>>>>>>>>>>>> iceberg_geo1_metrixx_qc_postvalues_batch_query.pcap, link-type EN10MB
>>>>>>>>>>>> (Ethernet)
>>>>>>>>>>>>
>>>>>>>>>>>> *8844*
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> root@d69e104e7d40:/usr/local/spark# tcpdump -r
>>>>>>>>>>>> iceberg_scratch_pad_demo_11_batch_query.pcap | grep
>>>>>>>>>>>> perfanalysis.adlus15.projectcabostore.net | grep ">" | wc -l
>>>>>>>>>>>> reading from file iceberg_scratch_pad_demo_11_batch_query.pcap,
>>>>>>>>>>>> link-type EN10MB (Ethernet)
>>>>>>>>>>>>
>>>>>>>>>>>> *269708*
>>>>>>>>>>>>
>>>>>>>>>>>> ```
>>>>>>>>>>>>
>>>>>>>>>>>> As a consequence of this the query response times get affected
>>>>>>>>>>>> drastically (illustrated below). I must confess that I am on a slow
>>>>>>>>>>>> internet connection via VPN connecting to the remote FS. But the dataset
>>>>>>>>>>>> without stats took just 1m 49s while the dataset with stats took 26m 48s to
>>>>>>>>>>>> read the same sized data. Most of that time in the latter dataset was spent
>>>>>>>>>>>> split planning in Manifest reading and stats evaluation.
>>>>>>>>>>>>
>>>>>>>>>>>> ```
>>>>>>>>>>>> all=> select count(*)  from iceberg_geo1_metrixx_qc_postvalues
>>>>>>>>>>>> where batchId = '4a6f95abac924159bb3d7075373395c9';
>>>>>>>>>>>>  count(1)
>>>>>>>>>>>> ----------
>>>>>>>>>>>>      3627
>>>>>>>>>>>> (1 row)
>>>>>>>>>>>> *Time: 109673.202 ms (01:49.673)*
>>>>>>>>>>>>
>>>>>>>>>>>> all=>  select count(*) from iceberg_scratch_pad_demo_11  where
>>>>>>>>>>>> _ACP_YEAR=2018 and _ACP_MONTH=01 and _ACP_DAY=01 and batchId =
>>>>>>>>>>>> '6d50eeb3e7d74b4f99eea91a27fc8f15';
>>>>>>>>>>>>  count(1)
>>>>>>>>>>>> ----------
>>>>>>>>>>>>      3808
>>>>>>>>>>>> (1 row)
>>>>>>>>>>>> *Time: 1608058.616 ms (26:48.059)*
>>>>>>>>>>>>
>>>>>>>>>>>> ```
>>>>>>>>>>>>
>>>>>>>>>>>> Has anyone faced this? I'm wondering if there's some caching or
>>>>>>>>>>>> parallelism option here that can be leveraged.  Would appreciate some
>>>>>>>>>>>> guidance. If there isn't a straightforward fix and others feel this is an
>>>>>>>>>>>> issue I can raise an issue and look into it further.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Cheers,
>>>>>>>>>>>> -Gautam.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>> Ryan Blue
>>>>>>>>>>> Software Engineer
>>>>>>>>>>> Netflix
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>> --
>>>>>> Ryan Blue
>>>>>> Software Engineer
>>>>>> Netflix
>>>>>>
>>>>>
>>>>>
>>>
>>> --
>>> Ryan Blue
>>> Software Engineer
>>> Netflix
>>>
>>

-- 
Ryan Blue
Software Engineer
Netflix

Re: Reading dataset with stats making lots of network traffic..

Posted by Gautam <ga...@gmail.com>.
FYI, the test Partition Spec is:
[
  YEAR: identity(21)
  MONTH: identity(22)
  DAY: identity(23)
  batchId: identity(24)
]
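
For illustration, here's a minimal sketch of how an equivalent spec would be
declared through Iceberg's Java API. It's not our actual table setup: only
the four partition source columns are shown, with field ids taken from the
identity(...) entries above.

```
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

public class PartitionSpecSketch {
  public static void main(String[] args) {
    // Only the partition source columns are shown here; the real schema
    // has ~20 top-level columns plus deeply nested structs.
    Schema schema = new Schema(
        Types.NestedField.required(21, "YEAR", Types.IntegerType.get()),
        Types.NestedField.required(22, "MONTH", Types.IntegerType.get()),
        Types.NestedField.required(23, "DAY", Types.IntegerType.get()),
        Types.NestedField.required(24, "batchId", Types.StringType.get()));

    // Identity partitioning on all four columns, as in the spec above.
    PartitionSpec spec = PartitionSpec.builderFor(schema)
        .identity("YEAR")
        .identity("MONTH")
        .identity("DAY")
        .identity("batchId")
        .build();

    System.out.println(spec);
  }
}
```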



On Thu, May 2, 2019 at 1:46 PM Gautam <ga...@gmail.com> wrote:

> > Using those, you should be able to control parallelism. If you want to
> test with 4,000, then you can set the min count to 5,000 so Iceberg won’t
> compact manifests.
>
> This is helpful. Thanks for the pointer on increasing parallelism. Will
> try this out. So I understand the behaviour, if a different dataset has
> >=5000  batches then the resultant # manifests would be (total_num_batches
> % 5000 ) ?
>
> > What surprises me is that you’re not getting much benefit from filtering
> out manifests that aren’t helpful. We see a lot of benefit from it.
>
> Pardon the verbose example but i think it'l help explain what i'm seeing
> ..
>
> Regarding manifest filtering:  I tested if partition filters in sql query
> actually reduce manifests being inspected. In my example, i have 16
> manifests that point to 4000 batch partitions ( each file is restricted to
> one partition as we'r using physical partitioning in the table ).  So when
> querying for .. WHERE  batchId = 'xyz'  .. at most 1 manifest should be
> read coz 1 batch == 1 file which should be tracked by 1 manifest (among the
> 16) , right? But i see that all 16 are being inspected in
> BaseTableScan.planFiles().  Correct me if i'm wrong, it's this call [1]
> that should be giving me the manifests that match a partition. When I
> inspect this  it says `matchingManifests = 16` ,  which is all the
> manifests in the table.  This *could* be due to the fact that our batch
> ids are random UUIDs so lower/upper bounds may not help coz there's no
> inherent ordering amongst batches.
> But then  i tried year = 2019 and month = 01 and day = 01 which also
> scanned all manifests. Could this be due to the way Iceberg manifests are
> re-grouped and merged? If so, shouldn't re-grouping honour partition
> boundaries and optimize for it?
>
>
> Cheers,
> -Gautam.
>
> [1] -
> https://github.com/apache/incubator-iceberg/blob/master/core/src/main/java/org/apache/iceberg/BaseTableScan.java#L173
>
>
> On Thu, May 2, 2019 at 12:27 PM Ryan Blue <rb...@netflix.com> wrote:
>
>> Good questions. Grouping manifests is configurable at the table level.
>> There are 2 settings:
>>
>>    - commit.manifest.target-size-bytes defaults to 8MB, this is the
>>    target size that Iceberg will compact to
>>    - commit.manifest.min-count-to-merge defaults to 100, this is the
>>    minimum number of files before a compaction is triggered
>>
>> Using those, you should be able to control parallelism. If you want to
>> test with 4,000, then you can set the min count to 5,000 so Iceberg won’t
>> compact manifests.
>>
>> What surprises me is that you’re not getting much benefit from filtering
>> out manifests that aren’t helpful. We see a lot of benefit from it. You
>> might try sorting the data files by partition before adding them to the
>> table. That will cluster data files in the same partition so you can read
>> fewer manifests.
>>
>> On Thu, May 2, 2019 at 12:09 PM Gautam <ga...@gmail.com> wrote:
>>
>>> Hey Anton,
>>>             Sorry bout the delay on this. Been caught up with some other
>>> things. Thanks for raising issue#173 .
>>>
>>> So the root cause is indeed the density and size of the schema. While I
>>> agree the option to configure stats for columns is good (although i'm not
>>> fully convinced that this is purely due to lower/upper bounds). For
>>> instance, maybe it's just taking a while to iterate over manifest rows and
>>> deserialize the DataFile stats in each read?  The solution i'm using right
>>> now is to parallelize the manifest reading in split planning. We
>>> regenerated the Iceberg table with more manifests. Now the code enables the
>>> ParallelIterator which uses a worker pool of threads (1 thread per cpu by
>>> default, configurable using 'iceberg.worker.num-threads' ) to read
>>> manifests.
>>>
>>> On that note, the ability to parallelize is limited to how many
>>> manifests are in the table. So as a test, for a table with 4000 files we
>>> created one manifest per file (think of one file as a single batch commit
>>> in this case). So I was hoping to get a parallelism factor of 4000. But
>>> Iceberg summarizes manifests into fewer manifests with each commit so we
>>> instead ended up with 16 manifests. So now split planning is limited to
>>> reading at most 16 units of parallelism. Is this grouping of manifests into
>>> fewer configurable? if not should we allow making this configurable?
>>>
>>> Sorry if this is forking a different conversation. If so, I can start a
>>> separate conversation thread on this.
>>>
>>>
>>>
>>>
>>>
>>>
>>> On Wed, May 1, 2019 at 9:42 PM Anton Okolnychyi <ao...@apple.com>
>>> wrote:
>>>
>>>> Hey Gautam,
>>>>
>>>> Out of my curiosity, did you manage to confirm the root cause of the
>>>> issue?
>>>>
>>>> P.S. I created [1] so that we can make collection of lower/upper bounds
>>>> configurable.
>>>>
>>>> Thanks,
>>>> Anton
>>>>
>>>> [1] - https://github.com/apache/incubator-iceberg/issues/173
>>>>
>>>> On 22 Apr 2019, at 09:15, Gautam <ga...@gmail.com> wrote:
>>>>
>>>> Thanks guys for the insights ..
>>>>
>>>> > I like Anton's idea to have an optional list of columns for which we
>>>> keep stats. That would allow us to avoid storing stats for thousands of
>>>> columns that won't ever be used. Another option here is to add a flag to
>>>> keep stats only for top-level columns. That's much less configuration for
>>>> users and probably does the right thing in many cases. Simpler to use but
>>>> not as fast in all cases is sometimes a good compromise.
>>>>
>>>> This makes sense to me. It adds a variable that data pipelines can
>>>> tweak on to improve performance. I will add an issue on Github to add a
>>>> stats config/flag. Although, having said that, I would try to optimize
>>>> around this coz read patterns are hardly ever known a priori and adding a
>>>> column to this list means having to re-write the entire data again. So i'l
>>>> try the other suggestion which is parallelizing on multiple manifests.
>>>>
>>>> >  To clarify my comment on changing the storage: the idea is to use
>>>> separate columns instead of a map and then use a columnar storage format so
>>>> we can project those columns independently. Avro can't project columns
>>>> independently. This wouldn't help on the write side and may just cause a
>>>> lot of seeking on the read side that diminishes the benefits.
>>>>
>>>> Gotcha.
>>>>
>>>> > Also, now that we have more details, I think there is a second
>>>> problem. Because we expect several manifests in a table, we parallelize
>>>> split planning on manifests instead of splits of manifest files. This
>>>> planning operation is happening in a single thread instead of in parallel.
>>>> I think if you split the write across several manifests, you'd improve wall
>>>> time.
>>>>
>>>> This might actually be the issue here, this was a test bench dataset so
>>>> the writer job created a single manifest for all the data in the dataset
>>>> which isn't really how we will do things in prod. I'l try and create the
>>>> metadata based on productions expected commit pattern.
>>>>
>>>>
>>>> Regarding Iceberg not truncating large bounded column values
>>>> https://github.com/apache/incubator-iceberg/issues/113 .. I didn't
>>>> consider this with our dataset. The current evidence is leading towards the
>>>> number of columns and the sheer number of files that the manifest is
>>>> maintaining but this is a good thing to look into.
>>>>
>>>> Thanks again guys.
>>>>
>>>> -Gautam.
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Fri, Apr 19, 2019 at 9:05 AM Ryan Blue <rb...@netflix.com> wrote:
>>>>
>>>>> I like Anton's idea to have an optional list of columns for which we
>>>>> keep stats. That would allow us to avoid storing stats for thousands of
>>>>> columns that won't ever be used. Another option here is to add a flag to
>>>>> keep stats only for top-level columns. That's much less configuration for
>>>>> users and probably does the right thing in many cases. Simpler to use but
>>>>> not as fast in all cases is sometimes a good compromise.
>>>>>
>>>>> To clarify my comment on changing the storage: the idea is to use
>>>>> separate columns instead of a map and then use a columnar storage format so
>>>>> we can project those columns independently. Avro can't project columns
>>>>> independently. This wouldn't help on the write side and may just cause a
>>>>> lot of seeking on the read side that diminishes the benefits.
>>>>>
>>>>> Also, now that we have more details, I think there is a second
>>>>> problem. Because we expect several manifests in a table, we parallelize
>>>>> split planning on manifests instead of splits of manifest files. This
>>>>> planning operation is happening in a single thread instead of in parallel.
>>>>> I think if you split the write across several manifests, you'd improve wall
>>>>> time.
>>>>>
>>>>> On Fri, Apr 19, 2019 at 8:15 AM Anton Okolnychyi <
>>>>> aokolnychyi@apple.com> wrote:
>>>>>
>>>>>> No, we haven’t experienced it yet. The manifest size is huge in your
>>>>>> case. To me, Ryan is correct: it might be either big lower/upper bounds
>>>>>> (then truncation will help) or a big number columns (then collecting
>>>>>> lower/upper bounds only for specific columns will help). I think both
>>>>>> optimizations are needed and will reduce the manifest size.
>>>>>>
>>>>>> Since you mentioned you have a lot of columns and we collect bounds
>>>>>> for nested struct fields, I am wondering if you could revert [1] locally
>>>>>> and compare the manifest size.
>>>>>>
>>>>>> [1] -
>>>>>> https://github.com/apache/incubator-iceberg/commit/c383dd87a89e35d622e9c458fd711931cbc5e96f
>>>>>>
>>>>>> On 19 Apr 2019, at 15:42, Gautam <ga...@gmail.com> wrote:
>>>>>>
>>>>>> Thanks for responding Anton! Do we think the delay is mainly due to
>>>>>> lower/upper bound filtering? have you faced this? I haven't exactly found
>>>>>> where the slowness is yet. It's generally due to the stats filtering but
>>>>>> what part of it is causing this much network traffic. There's
>>>>>> CloseableIteratable  that takes a ton of time on the next() and hasNext()
>>>>>> calls. My guess is the expression evaluation on each manifest entry is
>>>>>> what's doing it.
>>>>>>
>>>>>> On Fri, Apr 19, 2019 at 1:41 PM Anton Okolnychyi <
>>>>>> aokolnychyi@apple.com> wrote:
>>>>>>
>>>>>>> I think we need to have a list of columns for which we want to
>>>>>>> collect stats and that should be configurable by the user. Maybe, this
>>>>>>> config should be applicable only to lower/upper bounds. As we now collect
>>>>>>> stats even for nested struct fields, this might generate a lot of data. In
>>>>>>> most cases, users cluster/sort their data by a subset of data columns to
>>>>>>> have fast queries with predicates on those columns. So, being able to
>>>>>>> configure columns for which to collect lower/upper bounds seems reasonable.
>>>>>>>
>>>>>>> On 19 Apr 2019, at 08:03, Gautam <ga...@gmail.com> wrote:
>>>>>>>
>>>>>>> >  The length in bytes of the schema is 109M as compared to 687K of
>>>>>>> the non-stats dataset.
>>>>>>>
>>>>>>> Typo, length in bytes of *manifest*. schema is the same.
>>>>>>>
>>>>>>> On Fri, Apr 19, 2019 at 12:16 PM Gautam <ga...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Correction, partition count = 4308.
>>>>>>>>
>>>>>>>> > Re: Changing the way we keep stats. Avro is a block splittable
>>>>>>>> format and is friendly with parallel compute frameworks like Spark.
>>>>>>>>
>>>>>>>> Here I am trying to say that we don't need to change the format to
>>>>>>>> columnar right? The current format is already friendly for parallelization.
>>>>>>>>
>>>>>>>> thanks.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Fri, Apr 19, 2019 at 12:12 PM Gautam <ga...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Ah, my bad. I missed adding in the schema details .. Here are some
>>>>>>>>> details on the dataset with stats :
>>>>>>>>>
>>>>>>>>>  Iceberg Schema Columns : 20
>>>>>>>>>  Spark Schema fields : 20
>>>>>>>>>  Snapshot Summary :{added-data-files=4308, added-records=11494037,
>>>>>>>>> changed-partition-count=4308, total-records=11494037, total-data-files=4308}
>>>>>>>>>  Manifest files :1
>>>>>>>>>  Manifest details:
>>>>>>>>>      => manifest file path:
>>>>>>>>> adl://[dataset_base_path]/metadata/4bcda033-9df5-4c84-8eef-9d6ef93e4347-m0.avro
>>>>>>>>>      => manifest file length: 109,028,885
>>>>>>>>>      => existing files count: 0
>>>>>>>>>      => added files count: 4308
>>>>>>>>>      => deleted files count: 0
>>>>>>>>>      => partitions count: 4
>>>>>>>>>      => partition fields count: 4
>>>>>>>>>
>>>>>>>>> Re: Num data files. It has a single manifest keep track of 4308
>>>>>>>>> files. Total record count is 11.4 Million.
>>>>>>>>>
>>>>>>>>> Re: Columns. You are right that this table has many columns..
>>>>>>>>> although it has only 20 top-level columns,  num leaf columns are in order
>>>>>>>>> of thousands. This Schema is heavy on structs (in the thousands) and has
>>>>>>>>> deep levels of nesting.  I know Iceberg keeps
>>>>>>>>> *column_sizes, value_counts, null_value_counts* for all leaf
>>>>>>>>> fields and additionally *lower-bounds, upper-bounds* for native,
>>>>>>>>> struct types (not yet for map KVs and arrays).  The length in bytes of the
>>>>>>>>> schema is 109M as compared to 687K of the non-stats dataset.
>>>>>>>>>
>>>>>>>>> Re: Turning off stats. I am looking to leverage stats coz for our
>>>>>>>>> datasets with much larger number of data files we want to leverage
>>>>>>>>> iceberg's ability to skip entire files based on these stats. This is one of
>>>>>>>>> the big incentives for us to use Iceberg.
>>>>>>>>>
>>>>>>>>> Re: Changing the way we keep stats. Avro is a block splittable
>>>>>>>>> format and is friendly with parallel compute frameworks like Spark. So
>>>>>>>>> would it make sense for instance to have add an option to have Spark job /
>>>>>>>>> Futures  handle split planning?   In a larger context, 109M is not that
>>>>>>>>> much metadata given that Iceberg is meant for datasets where the metadata
>>>>>>>>> itself is Bigdata scale.  I'm curious on how folks with larger sized
>>>>>>>>> metadata (in GB) are optimizing this today.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>> -Gautam.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Fri, Apr 19, 2019 at 12:40 AM Ryan Blue <
>>>>>>>>> rblue@netflix.com.invalid> wrote:
>>>>>>>>>
>>>>>>>>>> Thanks for bringing this up! My initial theory is that this table
>>>>>>>>>> has a ton of stats data that you have to read. That could happen in a
>>>>>>>>>> couple of cases.
>>>>>>>>>>
>>>>>>>>>> First, you might have large values in some columns. Parquet will
>>>>>>>>>> suppress its stats if values are larger than 4k and those are what Iceberg
>>>>>>>>>> uses. But that could still cause you to store two 1k+ objects for each
>>>>>>>>>> large column (lower and upper bounds). With a lot of data files, that could
>>>>>>>>>> add up quickly. The solution here is to implement #113
>>>>>>>>>> <https://github.com/apache/incubator-iceberg/issues/113> so that
>>>>>>>>>> we don't store the actual min and max for string or binary columns, but
>>>>>>>>>> instead a truncated value that is just above or just below.
>>>>>>>>>>
>>>>>>>>>> The second case is when you have a lot of columns. Each column
>>>>>>>>>> stores both a lower and upper bound, so 1,000 columns could easily take 8k
>>>>>>>>>> per file. If this is the problem, then maybe we want to have a way to turn
>>>>>>>>>> off column stats. We could also think of ways to change the way stats are
>>>>>>>>>> stored in the manifest files, but that only helps if we move to a columnar
>>>>>>>>>> format to store manifests, so this is probably not a short-term fix.
>>>>>>>>>>
>>>>>>>>>> If you can share a bit more information about this table, we can
>>>>>>>>>> probably tell which one is the problem. I'm guessing it is the large values
>>>>>>>>>> problem.
>>>>>>>>>>
>>>>>>>>>> On Thu, Apr 18, 2019 at 11:52 AM Gautam <ga...@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hello folks,
>>>>>>>>>>>
>>>>>>>>>>> I have been testing Iceberg reading with and without stats built
>>>>>>>>>>> into Iceberg dataset manifest and found that there's a huge jump in network
>>>>>>>>>>> traffic with the latter..
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> In my test I am comparing two Iceberg datasets, both written in
>>>>>>>>>>> Iceberg format. One with and the other without stats collected in Iceberg
>>>>>>>>>>> manifests. In particular the difference between the writers used for the
>>>>>>>>>>> two datasets is this PR:
>>>>>>>>>>> https://github.com/apache/incubator-iceberg/pull/63/files which
>>>>>>>>>>> uses Iceberg's writers for writing Parquet data. I captured tcpdump from
>>>>>>>>>>> query scans run on these two datasets.  The partition being scanned
>>>>>>>>>>> contains 1 manifest, 1 parquet data file and ~3700 rows in both datasets.
>>>>>>>>>>> There's a 30x jump in network traffic to the remote filesystem (ADLS) when
>>>>>>>>>>> i switch to stats based Iceberg dataset. Both queries used the same Iceberg
>>>>>>>>>>> reader code to access both datasets.
>>>>>>>>>>>
>>>>>>>>>>> ```
>>>>>>>>>>> root@d69e104e7d40:/usr/local/spark#  tcpdump -r
>>>>>>>>>>> iceberg_geo1_metrixx_qc_postvalues_batch_query.pcap | grep
>>>>>>>>>>> perfanalysis.adlus15.projectcabostore.net | grep ">" | wc -l
>>>>>>>>>>> reading from file
>>>>>>>>>>> iceberg_geo1_metrixx_qc_postvalues_batch_query.pcap, link-type EN10MB
>>>>>>>>>>> (Ethernet)
>>>>>>>>>>>
>>>>>>>>>>> *8844*
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> root@d69e104e7d40:/usr/local/spark# tcpdump -r
>>>>>>>>>>> iceberg_scratch_pad_demo_11_batch_query.pcap | grep
>>>>>>>>>>> perfanalysis.adlus15.projectcabostore.net | grep ">" | wc -l
>>>>>>>>>>> reading from file iceberg_scratch_pad_demo_11_batch_query.pcap,
>>>>>>>>>>> link-type EN10MB (Ethernet)
>>>>>>>>>>>
>>>>>>>>>>> *269708*
>>>>>>>>>>>
>>>>>>>>>>> ```
>>>>>>>>>>>
>>>>>>>>>>> As a consequence of this the query response times get affected
>>>>>>>>>>> drastically (illustrated below). I must confess that I am on a slow
>>>>>>>>>>> internet connection via VPN connecting to the remote FS. But the dataset
>>>>>>>>>>> without stats took just 1m 49s while the dataset with stats took 26m 48s to
>>>>>>>>>>> read the same sized data. Most of that time in the latter dataset was spent
>>>>>>>>>>> split planning in Manifest reading and stats evaluation.
>>>>>>>>>>>
>>>>>>>>>>> ```
>>>>>>>>>>> all=> select count(*)  from iceberg_geo1_metrixx_qc_postvalues
>>>>>>>>>>> where batchId = '4a6f95abac924159bb3d7075373395c9';
>>>>>>>>>>>  count(1)
>>>>>>>>>>> ----------
>>>>>>>>>>>      3627
>>>>>>>>>>> (1 row)
>>>>>>>>>>> *Time: 109673.202 ms (01:49.673)*
>>>>>>>>>>>
>>>>>>>>>>> all=>  select count(*) from iceberg_scratch_pad_demo_11  where
>>>>>>>>>>> _ACP_YEAR=2018 and _ACP_MONTH=01 and _ACP_DAY=01 and batchId =
>>>>>>>>>>> '6d50eeb3e7d74b4f99eea91a27fc8f15';
>>>>>>>>>>>  count(1)
>>>>>>>>>>> ----------
>>>>>>>>>>>      3808
>>>>>>>>>>> (1 row)
>>>>>>>>>>> *Time: 1608058.616 ms (26:48.059)*
>>>>>>>>>>>
>>>>>>>>>>> ```
>>>>>>>>>>>
>>>>>>>>>>> Has anyone faced this? I'm wondering if there's some caching or
>>>>>>>>>>> parallelism option here that can be leveraged.  Would appreciate some
>>>>>>>>>>> guidance. If there isn't a straightforward fix and others feel this is an
>>>>>>>>>>> issue I can raise an issue and look into it further.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Cheers,
>>>>>>>>>>> -Gautam.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> Ryan Blue
>>>>>>>>>> Software Engineer
>>>>>>>>>> Netflix
>>>>>>>>>>
>>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>> --
>>>>> Ryan Blue
>>>>> Software Engineer
>>>>> Netflix
>>>>>
>>>>
>>>>
>>
>> --
>> Ryan Blue
>> Software Engineer
>> Netflix
>>
>

Re: Reading dataset with stats making lots of network traffic..

Posted by Gautam <ga...@gmail.com>.
> Using those, you should be able to control parallelism. If you want to
test with 4,000, then you can set the min count to 5,000 so Iceberg won’t
compact manifests.

This is helpful, thanks for the pointer on increasing parallelism. I'll try
this out. Just so I understand the behaviour: if a different dataset has
>=5000 batches, would the resultant # of manifests be
(total_num_batches % 5000)?

> What surprises me is that you’re not getting much benefit from filtering
out manifests that aren’t helpful. We see a lot of benefit from it.

Pardon the verbose example, but I think it'll help explain what I'm seeing ..

Regarding manifest filtering: I tested whether partition filters in the SQL
query actually reduce the number of manifests inspected. In my example, I
have 16 manifests that point to 4000 batch partitions (each file is
restricted to one partition as we're using physical partitioning in the
table). So when querying for .. WHERE batchId = 'xyz' .. at most 1 manifest
should be read, because 1 batch == 1 file, which should be tracked by 1
manifest (among the 16), right? But I see that all 16 are being inspected in
BaseTableScan.planFiles(). Correct me if I'm wrong, but it's this call [1]
that should be giving me the manifests that match a partition. When I
inspect it, it says `matchingManifests = 16`, which is all the manifests in
the table. This *could* be due to the fact that our batch ids are random
UUIDs, so lower/upper bounds may not help because there's no inherent
ordering amongst batches.
But then I tried year = 2019 and month = 01 and day = 01, which also scanned
all manifests. Could this be due to the way Iceberg manifests are re-grouped
and merged? If so, shouldn't re-grouping honour partition boundaries and
optimize for them?
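
For illustration, a minimal sketch of driving the same check directly
through the scan API (the table location is a placeholder for our real
adl:// path, a HadoopTables layout is assumed, and 'xyz' stands in for a
real batch id):

```
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.io.CloseableIterable;

public class ManifestFilterCheck {
  public static void main(String[] args) throws Exception {
    // Placeholder location; the real table lives on ADLS.
    Table table = new HadoopTables(new Configuration())
        .load("adl://.../iceberg_scratch_pad_demo_11");

    // Same predicate as the SQL query. Ideally planFiles() would only
    // open manifests whose partition ranges can match this filter.
    try (CloseableIterable<FileScanTask> tasks = table.newScan()
        .filter(Expressions.equal("batchId", "xyz"))
        .planFiles()) {
      long count = 0;
      for (FileScanTask task : tasks) {
        count += 1;
      }
      System.out.println("matched data files: " + count);
    }
  }
}
```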


Cheers,
-Gautam.

[1] -
https://github.com/apache/incubator-iceberg/blob/master/core/src/main/java/org/apache/iceberg/BaseTableScan.java#L173


On Thu, May 2, 2019 at 12:27 PM Ryan Blue <rb...@netflix.com> wrote:

> Good questions. Grouping manifests is configurable at the table level.
> There are 2 settings:
>
>    - commit.manifest.target-size-bytes defaults to 8MB, this is the
>    target size that Iceberg will compact to
>    - commit.manifest.min-count-to-merge defaults to 100, this is the
>    minimum number of files before a compaction is triggered
>
> Using those, you should be able to control parallelism. If you want to
> test with 4,000, then you can set the min count to 5,000 so Iceberg won’t
> compact manifests.
>
> What surprises me is that you’re not getting much benefit from filtering
> out manifests that aren’t helpful. We see a lot of benefit from it. You
> might try sorting the data files by partition before adding them to the
> table. That will cluster data files in the same partition so you can read
> fewer manifests.
>
> On Thu, May 2, 2019 at 12:09 PM Gautam <ga...@gmail.com> wrote:
>
>> Hey Anton,
>>             Sorry bout the delay on this. Been caught up with some other
>> things. Thanks for raising issue#173 .
>>
>> So the root cause is indeed the density and size of the schema. While I
>> agree the option to configure stats for columns is good (although i'm not
>> fully convinced that this is purely due to lower/upper bounds). For
>> instance, maybe it's just taking a while to iterate over manifest rows and
>> deserialize the DataFile stats in each read?  The solution i'm using right
>> now is to parallelize the manifest reading in split planning. We
>> regenerated the Iceberg table with more manifests. Now the code enables the
>> ParallelIterator which uses a worker pool of threads (1 thread per cpu by
>> default, configurable using 'iceberg.worker.num-threads' ) to read
>> manifests.
>>
>> On that note, the ability to parallelize is limited to how many manifests
>> are in the table. So as a test, for a table with 4000 files we created one
>> manifest per file (think of one file as a single batch commit in this
>> case). So I was hoping to get a parallelism factor of 4000. But Iceberg
>> summarizes manifests into fewer manifests with each commit so we instead
>> ended up with 16 manifests. So now split planning is limited to reading at
>> most 16 units of parallelism. Is this grouping of manifests into fewer
>> configurable? if not should we allow making this configurable?
>>
>> Sorry if this is forking a different conversation. If so, I can start a
>> separate conversation thread on this.
>>
>>
>>
>>
>>
>>
>> On Wed, May 1, 2019 at 9:42 PM Anton Okolnychyi <ao...@apple.com>
>> wrote:
>>
>>> Hey Gautam,
>>>
>>> Out of my curiosity, did you manage to confirm the root cause of the
>>> issue?
>>>
>>> P.S. I created [1] so that we can make collection of lower/upper bounds
>>> configurable.
>>>
>>> Thanks,
>>> Anton
>>>
>>> [1] - https://github.com/apache/incubator-iceberg/issues/173
>>>
>>> On 22 Apr 2019, at 09:15, Gautam <ga...@gmail.com> wrote:
>>>
>>> Thanks guys for the insights ..
>>>
>>> > I like Anton's idea to have an optional list of columns for which we
>>> keep stats. That would allow us to avoid storing stats for thousands of
>>> columns that won't ever be used. Another option here is to add a flag to
>>> keep stats only for top-level columns. That's much less configuration for
>>> users and probably does the right thing in many cases. Simpler to use but
>>> not as fast in all cases is sometimes a good compromise.
>>>
>>> This makes sense to me. It adds a variable that data pipelines can tweak
>>> on to improve performance. I will add an issue on Github to add a stats
>>> config/flag. Although, having said that, I would try to optimize around
>>> this coz read patterns are hardly ever known a priori and adding a column
>>> to this list means having to re-write the entire data again. So i'l try the
>>> other suggestion which is parallelizing on multiple manifests.
>>>
>>> >  To clarify my comment on changing the storage: the idea is to use
>>> separate columns instead of a map and then use a columnar storage format so
>>> we can project those columns independently. Avro can't project columns
>>> independently. This wouldn't help on the write side and may just cause a
>>> lot of seeking on the read side that diminishes the benefits.
>>>
>>> Gotcha.
>>>
>>> > Also, now that we have more details, I think there is a second
>>> problem. Because we expect several manifests in a table, we parallelize
>>> split planning on manifests instead of splits of manifest files. This
>>> planning operation is happening in a single thread instead of in parallel.
>>> I think if you split the write across several manifests, you'd improve wall
>>> time.
>>>
>>> This might actually be the issue here, this was a test bench dataset so
>>> the writer job created a single manifest for all the data in the dataset
>>> which isn't really how we will do things in prod. I'l try and create the
>>> metadata based on productions expected commit pattern.
>>>
>>>
>>> Regarding Iceberg not truncating large bounded column values
>>> https://github.com/apache/incubator-iceberg/issues/113 .. I didn't
>>> consider this with our dataset. The current evidence is leading towards the
>>> number of columns and the sheer number of files that the manifest is
>>> maintaining but this is a good thing to look into.
>>>
>>> Thanks again guys.
>>>
>>> -Gautam.
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> On Fri, Apr 19, 2019 at 9:05 AM Ryan Blue <rb...@netflix.com> wrote:
>>>
>>>> I like Anton's idea to have an optional list of columns for which we
>>>> keep stats. That would allow us to avoid storing stats for thousands of
>>>> columns that won't ever be used. Another option here is to add a flag to
>>>> keep stats only for top-level columns. That's much less configuration for
>>>> users and probably does the right thing in many cases. Simpler to use but
>>>> not as fast in all cases is sometimes a good compromise.
>>>>
>>>> To clarify my comment on changing the storage: the idea is to use
>>>> separate columns instead of a map and then use a columnar storage format so
>>>> we can project those columns independently. Avro can't project columns
>>>> independently. This wouldn't help on the write side and may just cause a
>>>> lot of seeking on the read side that diminishes the benefits.
>>>>
>>>> Also, now that we have more details, I think there is a second problem.
>>>> Because we expect several manifests in a table, we parallelize split
>>>> planning on manifests instead of splits of manifest files. This planning
>>>> operation is happening in a single thread instead of in parallel. I think
>>>> if you split the write across several manifests, you'd improve wall time.
>>>>
>>>> On Fri, Apr 19, 2019 at 8:15 AM Anton Okolnychyi <ao...@apple.com>
>>>> wrote:
>>>>
>>>>> No, we haven’t experienced it yet. The manifest size is huge in your
>>>>> case. To me, Ryan is correct: it might be either big lower/upper bounds
>>>>> (then truncation will help) or a big number columns (then collecting
>>>>> lower/upper bounds only for specific columns will help). I think both
>>>>> optimizations are needed and will reduce the manifest size.
>>>>>
>>>>> Since you mentioned you have a lot of columns and we collect bounds
>>>>> for nested struct fields, I am wondering if you could revert [1] locally
>>>>> and compare the manifest size.
>>>>>
>>>>> [1] -
>>>>> https://github.com/apache/incubator-iceberg/commit/c383dd87a89e35d622e9c458fd711931cbc5e96f
>>>>>
>>>>> On 19 Apr 2019, at 15:42, Gautam <ga...@gmail.com> wrote:
>>>>>
>>>>> Thanks for responding Anton! Do we think the delay is mainly due to
>>>>> lower/upper bound filtering? have you faced this? I haven't exactly found
>>>>> where the slowness is yet. It's generally due to the stats filtering but
>>>>> what part of it is causing this much network traffic. There's
>>>>> CloseableIteratable  that takes a ton of time on the next() and hasNext()
>>>>> calls. My guess is the expression evaluation on each manifest entry is
>>>>> what's doing it.
>>>>>
>>>>> On Fri, Apr 19, 2019 at 1:41 PM Anton Okolnychyi <
>>>>> aokolnychyi@apple.com> wrote:
>>>>>
>>>>>> I think we need to have a list of columns for which we want to
>>>>>> collect stats and that should be configurable by the user. Maybe, this
>>>>>> config should be applicable only to lower/upper bounds. As we now collect
>>>>>> stats even for nested struct fields, this might generate a lot of data. In
>>>>>> most cases, users cluster/sort their data by a subset of data columns to
>>>>>> have fast queries with predicates on those columns. So, being able to
>>>>>> configure columns for which to collect lower/upper bounds seems reasonable.
>>>>>>
>>>>>> On 19 Apr 2019, at 08:03, Gautam <ga...@gmail.com> wrote:
>>>>>>
>>>>>> >  The length in bytes of the schema is 109M as compared to 687K of
>>>>>> the non-stats dataset.
>>>>>>
>>>>>> Typo, length in bytes of *manifest*. schema is the same.
>>>>>>
>>>>>> On Fri, Apr 19, 2019 at 12:16 PM Gautam <ga...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Correction, partition count = 4308.
>>>>>>>
>>>>>>> > Re: Changing the way we keep stats. Avro is a block splittable
>>>>>>> format and is friendly with parallel compute frameworks like Spark.
>>>>>>>
>>>>>>> Here I am trying to say that we don't need to change the format to
>>>>>>> columnar right? The current format is already friendly for parallelization.
>>>>>>>
>>>>>>> thanks.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Apr 19, 2019 at 12:12 PM Gautam <ga...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Ah, my bad. I missed adding in the schema details .. Here are some
>>>>>>>> details on the dataset with stats :
>>>>>>>>
>>>>>>>>  Iceberg Schema Columns : 20
>>>>>>>>  Spark Schema fields : 20
>>>>>>>>  Snapshot Summary :{added-data-files=4308, added-records=11494037,
>>>>>>>> changed-partition-count=4308, total-records=11494037, total-data-files=4308}
>>>>>>>>  Manifest files :1
>>>>>>>>  Manifest details:
>>>>>>>>      => manifest file path:
>>>>>>>> adl://[dataset_base_path]/metadata/4bcda033-9df5-4c84-8eef-9d6ef93e4347-m0.avro
>>>>>>>>      => manifest file length: 109,028,885
>>>>>>>>      => existing files count: 0
>>>>>>>>      => added files count: 4308
>>>>>>>>      => deleted files count: 0
>>>>>>>>      => partitions count: 4
>>>>>>>>      => partition fields count: 4
>>>>>>>>
>>>>>>>> Re: Num data files. It has a single manifest keep track of 4308
>>>>>>>> files. Total record count is 11.4 Million.
>>>>>>>>
>>>>>>>> Re: Columns. You are right that this table has many columns..
>>>>>>>> although it has only 20 top-level columns,  num leaf columns are in order
>>>>>>>> of thousands. This Schema is heavy on structs (in the thousands) and has
>>>>>>>> deep levels of nesting.  I know Iceberg keeps
>>>>>>>> *column_sizes, value_counts, null_value_counts* for all leaf
>>>>>>>> fields and additionally *lower-bounds, upper-bounds* for native,
>>>>>>>> struct types (not yet for map KVs and arrays).  The length in bytes of the
>>>>>>>> schema is 109M as compared to 687K of the non-stats dataset.
>>>>>>>>
>>>>>>>> Re: Turning off stats. I am looking to leverage stats coz for our
>>>>>>>> datasets with much larger number of data files we want to leverage
>>>>>>>> iceberg's ability to skip entire files based on these stats. This is one of
>>>>>>>> the big incentives for us to use Iceberg.
>>>>>>>>
>>>>>>>> Re: Changing the way we keep stats. Avro is a block splittable
>>>>>>>> format and is friendly with parallel compute frameworks like Spark. So
>>>>>>>> would it make sense for instance to have add an option to have Spark job /
>>>>>>>> Futures  handle split planning?   In a larger context, 109M is not that
>>>>>>>> much metadata given that Iceberg is meant for datasets where the metadata
>>>>>>>> itself is Bigdata scale.  I'm curious on how folks with larger sized
>>>>>>>> metadata (in GB) are optimizing this today.
>>>>>>>>
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> -Gautam.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Fri, Apr 19, 2019 at 12:40 AM Ryan Blue <
>>>>>>>> rblue@netflix.com.invalid> wrote:
>>>>>>>>
>>>>>>>>> Thanks for bringing this up! My initial theory is that this table
>>>>>>>>> has a ton of stats data that you have to read. That could happen in a
>>>>>>>>> couple of cases.
>>>>>>>>>
>>>>>>>>> First, you might have large values in some columns. Parquet will
>>>>>>>>> suppress its stats if values are larger than 4k and those are what Iceberg
>>>>>>>>> uses. But that could still cause you to store two 1k+ objects for each
>>>>>>>>> large column (lower and upper bounds). With a lot of data files, that could
>>>>>>>>> add up quickly. The solution here is to implement #113
>>>>>>>>> <https://github.com/apache/incubator-iceberg/issues/113> so that
>>>>>>>>> we don't store the actual min and max for string or binary columns, but
>>>>>>>>> instead a truncated value that is just above or just below.
>>>>>>>>>
>>>>>>>>> The second case is when you have a lot of columns. Each column
>>>>>>>>> stores both a lower and upper bound, so 1,000 columns could easily take 8k
>>>>>>>>> per file. If this is the problem, then maybe we want to have a way to turn
>>>>>>>>> off column stats. We could also think of ways to change the way stats are
>>>>>>>>> stored in the manifest files, but that only helps if we move to a columnar
>>>>>>>>> format to store manifests, so this is probably not a short-term fix.
>>>>>>>>>
>>>>>>>>> If you can share a bit more information about this table, we can
>>>>>>>>> probably tell which one is the problem. I'm guessing it is the large values
>>>>>>>>> problem.
>>>>>>>>>
>>>>>>>>> On Thu, Apr 18, 2019 at 11:52 AM Gautam <ga...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hello folks,
>>>>>>>>>>
>>>>>>>>>> I have been testing Iceberg reading with and without stats built
>>>>>>>>>> into Iceberg dataset manifest and found that there's a huge jump in network
>>>>>>>>>> traffic with the latter..
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> In my test I am comparing two Iceberg datasets, both written in
>>>>>>>>>> Iceberg format. One with and the other without stats collected in Iceberg
>>>>>>>>>> manifests. In particular the difference between the writers used for the
>>>>>>>>>> two datasets is this PR:
>>>>>>>>>> https://github.com/apache/incubator-iceberg/pull/63/files which
>>>>>>>>>> uses Iceberg's writers for writing Parquet data. I captured tcpdump from
>>>>>>>>>> query scans run on these two datasets.  The partition being scanned
>>>>>>>>>> contains 1 manifest, 1 parquet data file and ~3700 rows in both datasets.
>>>>>>>>>> There's a 30x jump in network traffic to the remote filesystem (ADLS) when
>>>>>>>>>> i switch to stats based Iceberg dataset. Both queries used the same Iceberg
>>>>>>>>>> reader code to access both datasets.
>>>>>>>>>>
>>>>>>>>>> ```
>>>>>>>>>> root@d69e104e7d40:/usr/local/spark#  tcpdump -r
>>>>>>>>>> iceberg_geo1_metrixx_qc_postvalues_batch_query.pcap | grep
>>>>>>>>>> perfanalysis.adlus15.projectcabostore.net | grep ">" | wc -l
>>>>>>>>>> reading from file
>>>>>>>>>> iceberg_geo1_metrixx_qc_postvalues_batch_query.pcap, link-type EN10MB
>>>>>>>>>> (Ethernet)
>>>>>>>>>>
>>>>>>>>>> *8844*
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> root@d69e104e7d40:/usr/local/spark# tcpdump -r
>>>>>>>>>> iceberg_scratch_pad_demo_11_batch_query.pcap | grep
>>>>>>>>>> perfanalysis.adlus15.projectcabostore.net | grep ">" | wc -l
>>>>>>>>>> reading from file iceberg_scratch_pad_demo_11_batch_query.pcap,
>>>>>>>>>> link-type EN10MB (Ethernet)
>>>>>>>>>>
>>>>>>>>>> *269708*
>>>>>>>>>>
>>>>>>>>>> ```
>>>>>>>>>>
>>>>>>>>>> As a consequence of this the query response times get affected
>>>>>>>>>> drastically (illustrated below). I must confess that I am on a slow
>>>>>>>>>> internet connection via VPN connecting to the remote FS. But the dataset
>>>>>>>>>> without stats took just 1m 49s while the dataset with stats took 26m 48s to
>>>>>>>>>> read the same sized data. Most of that time in the latter dataset was spent
>>>>>>>>>> split planning in Manifest reading and stats evaluation.
>>>>>>>>>>
>>>>>>>>>> ```
>>>>>>>>>> all=> select count(*)  from iceberg_geo1_metrixx_qc_postvalues
>>>>>>>>>> where batchId = '4a6f95abac924159bb3d7075373395c9';
>>>>>>>>>>  count(1)
>>>>>>>>>> ----------
>>>>>>>>>>      3627
>>>>>>>>>> (1 row)
>>>>>>>>>> *Time: 109673.202 ms (01:49.673)*
>>>>>>>>>>
>>>>>>>>>> all=>  select count(*) from iceberg_scratch_pad_demo_11  where
>>>>>>>>>> _ACP_YEAR=2018 and _ACP_MONTH=01 and _ACP_DAY=01 and batchId =
>>>>>>>>>> '6d50eeb3e7d74b4f99eea91a27fc8f15';
>>>>>>>>>>  count(1)
>>>>>>>>>> ----------
>>>>>>>>>>      3808
>>>>>>>>>> (1 row)
>>>>>>>>>> *Time: 1608058.616 ms (26:48.059)*
>>>>>>>>>>
>>>>>>>>>> ```
>>>>>>>>>>
>>>>>>>>>> Has anyone faced this? I'm wondering if there's some caching or
>>>>>>>>>> parallelism option here that can be leveraged.  Would appreciate some
>>>>>>>>>> guidance. If there isn't a straightforward fix and others feel this is an
>>>>>>>>>> issue I can raise an issue and look into it further.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Cheers,
>>>>>>>>>> -Gautam.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Ryan Blue
>>>>>>>>> Software Engineer
>>>>>>>>> Netflix
>>>>>>>>>
>>>>>>>>
>>>>>>
>>>>>
>>>>
>>>> --
>>>> Ryan Blue
>>>> Software Engineer
>>>> Netflix
>>>>
>>>
>>>
>
> --
> Ryan Blue
> Software Engineer
> Netflix
>

Re: Reading dataset with stats making lots of network traffic..

Posted by Ryan Blue <rb...@netflix.com.INVALID>.
Good questions. Grouping manifests is configurable at the table level.
There are 2 settings:

   - commit.manifest.target-size-bytes defaults to 8MB; this is the target
   size that Iceberg will compact manifests to
   - commit.manifest.min-count-to-merge defaults to 100; this is the
   minimum number of manifest files before a compaction is triggered

Using those, you should be able to control parallelism. If you want to test
with 4,000, then you can set the min count to 5,000 so Iceberg won’t
compact manifests.
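
For example, a rough sketch of raising that threshold on an existing table
through the Java API (the table location below is just a placeholder):

```
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Table;
import org.apache.iceberg.hadoop.HadoopTables;

public class RaiseManifestMergeThreshold {
  public static void main(String[] args) {
    // Placeholder location for the table you are testing with.
    Table table = new HadoopTables(new Configuration())
        .load("adl://.../table_location");

    // With min-count above the number of manifests you expect to add,
    // commits will keep one manifest per append instead of merging them.
    table.updateProperties()
        .set("commit.manifest.min-count-to-merge", "5000")
        .commit();
  }
}
```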

What surprises me is that you’re not getting much benefit from filtering
out manifests that aren’t helpful. We see a lot of benefit from it. You
might try sorting the data files by partition before adding them to the
table. That will cluster data files in the same partition so you can read
fewer manifests.
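
A rough sketch of that clustering, assuming the data files are added through
the append API (ordering by the partition tuple's string form is only for
illustration):

```
import java.util.Comparator;
import java.util.List;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.Table;

public class SortedAppend {
  // files: the DataFile entries you are about to commit, built elsewhere.
  public static void appendSorted(Table table, List<DataFile> files) {
    // Cluster files by partition so each manifest covers a narrow
    // partition range and can be skipped by partition filters.
    files.sort(Comparator.comparing(file -> file.partition().toString()));

    AppendFiles append = table.newAppend();
    for (DataFile file : files) {
      append.appendFile(file);
    }
    append.commit();
  }
}
```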

On Thu, May 2, 2019 at 12:09 PM Gautam <ga...@gmail.com> wrote:

> Hey Anton,
>             Sorry bout the delay on this. Been caught up with some other
> things. Thanks for raising issue#173 .
>
> So the root cause is indeed the density and size of the schema. While I
> agree the option to configure stats for columns is good (although i'm not
> fully convinced that this is purely due to lower/upper bounds). For
> instance, maybe it's just taking a while to iterate over manifest rows and
> deserialize the DataFile stats in each read?  The solution i'm using right
> now is to parallelize the manifest reading in split planning. We
> regenerated the Iceberg table with more manifests. Now the code enables the
> ParallelIterator which uses a worker pool of threads (1 thread per cpu by
> default, configurable using 'iceberg.worker.num-threads' ) to read
> manifests.
>
> On that note, the ability to parallelize is limited to how many manifests
> are in the table. So as a test, for a table with 4000 files we created one
> manifest per file (think of one file as a single batch commit in this
> case). So I was hoping to get a parallelism factor of 4000. But Iceberg
> summarizes manifests into fewer manifests with each commit so we instead
> ended up with 16 manifests. So now split planning is limited to reading at
> most 16 units of parallelism. Is this grouping of manifests into fewer
> configurable? if not should we allow making this configurable?
>
> Sorry if this is forking a different conversation. If so, I can start a
> separate conversation thread on this.
>
>
>
>
>
>
> On Wed, May 1, 2019 at 9:42 PM Anton Okolnychyi <ao...@apple.com>
> wrote:
>
>> Hey Gautam,
>>
>> Out of my curiosity, did you manage to confirm the root cause of the
>> issue?
>>
>> P.S. I created [1] so that we can make collection of lower/upper bounds
>> configurable.
>>
>> Thanks,
>> Anton
>>
>> [1] - https://github.com/apache/incubator-iceberg/issues/173
>>
>> On 22 Apr 2019, at 09:15, Gautam <ga...@gmail.com> wrote:
>>
>> Thanks guys for the insights ..
>>
>> > I like Anton's idea to have an optional list of columns for which we
>> keep stats. That would allow us to avoid storing stats for thousands of
>> columns that won't ever be used. Another option here is to add a flag to
>> keep stats only for top-level columns. That's much less configuration for
>> users and probably does the right thing in many cases. Simpler to use but
>> not as fast in all cases is sometimes a good compromise.
>>
>> This makes sense to me. It adds a variable that data pipelines can tweak
>> on to improve performance. I will add an issue on Github to add a stats
>> config/flag. Although, having said that, I would try to optimize around
>> this coz read patterns are hardly ever known a priori and adding a column
>> to this list means having to re-write the entire data again. So i'l try the
>> other suggestion which is parallelizing on multiple manifests.
>>
>> >  To clarify my comment on changing the storage: the idea is to use
>> separate columns instead of a map and then use a columnar storage format so
>> we can project those columns independently. Avro can't project columns
>> independently. This wouldn't help on the write side and may just cause a
>> lot of seeking on the read side that diminishes the benefits.
>>
>> Gotcha.
>>
>> > Also, now that we have more details, I think there is a second problem.
>> Because we expect several manifests in a table, we parallelize split
>> planning on manifests instead of splits of manifest files. This planning
>> operation is happening in a single thread instead of in parallel. I think
>> if you split the write across several manifests, you'd improve wall time.
>>
>> This might actually be the issue here, this was a test bench dataset so
>> the writer job created a single manifest for all the data in the dataset
>> which isn't really how we will do things in prod. I'l try and create the
>> metadata based on productions expected commit pattern.
>>
>>
>> Regarding Iceberg not truncating large bounded column values
>> https://github.com/apache/incubator-iceberg/issues/113 .. I didn't
>> consider this with our dataset. The current evidence is leading towards the
>> number of columns and the sheer number of files that the manifest is
>> maintaining but this is a good thing to look into.
>>
>> Thanks again guys.
>>
>> -Gautam.
>>
>>
>>
>>
>>
>>
>>
>> On Fri, Apr 19, 2019 at 9:05 AM Ryan Blue <rb...@netflix.com> wrote:
>>
>>> I like Anton's idea to have an optional list of columns for which we
>>> keep stats. That would allow us to avoid storing stats for thousands of
>>> columns that won't ever be used. Another option here is to add a flag to
>>> keep stats only for top-level columns. That's much less configuration for
>>> users and probably does the right thing in many cases. Simpler to use but
>>> not as fast in all cases is sometimes a good compromise.
>>>
>>> To clarify my comment on changing the storage: the idea is to use
>>> separate columns instead of a map and then use a columnar storage format so
>>> we can project those columns independently. Avro can't project columns
>>> independently. This wouldn't help on the write side and may just cause a
>>> lot of seeking on the read side that diminishes the benefits.
>>>
>>> Also, now that we have more details, I think there is a second problem.
>>> Because we expect several manifests in a table, we parallelize split
>>> planning on manifests instead of splits of manifest files. This planning
>>> operation is happening in a single thread instead of in parallel. I think
>>> if you split the write across several manifests, you'd improve wall time.
>>>
>>> On Fri, Apr 19, 2019 at 8:15 AM Anton Okolnychyi <ao...@apple.com>
>>> wrote:
>>>
>>>> No, we haven’t experienced it yet. The manifest size is huge in your
>>>> case. To me, Ryan is correct: it might be either big lower/upper bounds
>>>> (then truncation will help) or a big number columns (then collecting
>>>> lower/upper bounds only for specific columns will help). I think both
>>>> optimizations are needed and will reduce the manifest size.
>>>>
>>>> Since you mentioned you have a lot of columns and we collect bounds for
>>>> nested struct fields, I am wondering if you could revert [1] locally and
>>>> compare the manifest size.
>>>>
>>>> [1] -
>>>> https://github.com/apache/incubator-iceberg/commit/c383dd87a89e35d622e9c458fd711931cbc5e96f
>>>>
>>>> On 19 Apr 2019, at 15:42, Gautam <ga...@gmail.com> wrote:
>>>>
>>>> Thanks for responding Anton! Do we think the delay is mainly due to
>>>> lower/upper bound filtering? have you faced this? I haven't exactly found
>>>> where the slowness is yet. It's generally due to the stats filtering but
>>>> what part of it is causing this much network traffic. There's
>>>> CloseableIteratable  that takes a ton of time on the next() and hasNext()
>>>> calls. My guess is the expression evaluation on each manifest entry is
>>>> what's doing it.
>>>>
>>>> On Fri, Apr 19, 2019 at 1:41 PM Anton Okolnychyi <ao...@apple.com>
>>>> wrote:
>>>>
>>>>> I think we need to have a list of columns for which we want to collect
>>>>> stats and that should be configurable by the user. Maybe, this config
>>>>> should be applicable only to lower/upper bounds. As we now collect stats
>>>>> even for nested struct fields, this might generate a lot of data. In most
>>>>> cases, users cluster/sort their data by a subset of data columns to have
>>>>> fast queries with predicates on those columns. So, being able to configure
>>>>> columns for which to collect lower/upper bounds seems reasonable.
>>>>>
>>>>> On 19 Apr 2019, at 08:03, Gautam <ga...@gmail.com> wrote:
>>>>>
>>>>> >  The length in bytes of the schema is 109M as compared to 687K of
>>>>> the non-stats dataset.
>>>>>
>>>>> Typo, length in bytes of *manifest*. schema is the same.
>>>>>
>>>>> On Fri, Apr 19, 2019 at 12:16 PM Gautam <ga...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Correction, partition count = 4308.
>>>>>>
>>>>>> > Re: Changing the way we keep stats. Avro is a block splittable
>>>>>> format and is friendly with parallel compute frameworks like Spark.
>>>>>>
>>>>>> Here I am trying to say that we don't need to change the format to
>>>>>> columnar right? The current format is already friendly for parallelization.
>>>>>>
>>>>>> thanks.
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Fri, Apr 19, 2019 at 12:12 PM Gautam <ga...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Ah, my bad. I missed adding in the schema details .. Here are some
>>>>>>> details on the dataset with stats :
>>>>>>>
>>>>>>>  Iceberg Schema Columns : 20
>>>>>>>  Spark Schema fields : 20
>>>>>>>  Snapshot Summary :{added-data-files=4308, added-records=11494037,
>>>>>>> changed-partition-count=4308, total-records=11494037, total-data-files=4308}
>>>>>>>  Manifest files :1
>>>>>>>  Manifest details:
>>>>>>>      => manifest file path:
>>>>>>> adl://[dataset_base_path]/metadata/4bcda033-9df5-4c84-8eef-9d6ef93e4347-m0.avro
>>>>>>>      => manifest file length: 109,028,885
>>>>>>>      => existing files count: 0
>>>>>>>      => added files count: 4308
>>>>>>>      => deleted files count: 0
>>>>>>>      => partitions count: 4
>>>>>>>      => partition fields count: 4
>>>>>>>
>>>>>>> Re: Num data files. It has a single manifest keep track of 4308
>>>>>>> files. Total record count is 11.4 Million.
>>>>>>>
>>>>>>> Re: Columns. You are right that this table has many columns..
>>>>>>> although it has only 20 top-level columns,  num leaf columns are in order
>>>>>>> of thousands. This Schema is heavy on structs (in the thousands) and has
>>>>>>> deep levels of nesting.  I know Iceberg keeps
>>>>>>> *column_sizes, value_counts, null_value_counts* for all leaf fields
>>>>>>> and additionally *lower-bounds, upper-bounds* for native, struct
>>>>>>> types (not yet for map KVs and arrays).  The length in bytes of the schema
>>>>>>> is 109M as compared to 687K of the non-stats dataset.
>>>>>>>
>>>>>>> Re: Turning off stats. I am looking to leverage stats coz for our
>>>>>>> datasets with much larger number of data files we want to leverage
>>>>>>> iceberg's ability to skip entire files based on these stats. This is one of
>>>>>>> the big incentives for us to use Iceberg.
>>>>>>>
>>>>>>> Re: Changing the way we keep stats. Avro is a block splittable
>>>>>>> format and is friendly with parallel compute frameworks like Spark. So
>>>>>>> would it make sense for instance to have add an option to have Spark job /
>>>>>>> Futures  handle split planning?   In a larger context, 109M is not that
>>>>>>> much metadata given that Iceberg is meant for datasets where the metadata
>>>>>>> itself is Bigdata scale.  I'm curious on how folks with larger sized
>>>>>>> metadata (in GB) are optimizing this today.
>>>>>>>
>>>>>>>
>>>>>>> Cheers,
>>>>>>> -Gautam.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Apr 19, 2019 at 12:40 AM Ryan Blue <
>>>>>>> rblue@netflix.com.invalid> wrote:
>>>>>>>
>>>>>>>> Thanks for bringing this up! My initial theory is that this table
>>>>>>>> has a ton of stats data that you have to read. That could happen in a
>>>>>>>> couple of cases.
>>>>>>>>
>>>>>>>> First, you might have large values in some columns. Parquet will
>>>>>>>> suppress its stats if values are larger than 4k and those are what Iceberg
>>>>>>>> uses. But that could still cause you to store two 1k+ objects for each
>>>>>>>> large column (lower and upper bounds). With a lot of data files, that could
>>>>>>>> add up quickly. The solution here is to implement #113
>>>>>>>> <https://github.com/apache/incubator-iceberg/issues/113> so that
>>>>>>>> we don't store the actual min and max for string or binary columns, but
>>>>>>>> instead a truncated value that is just above or just below.
>>>>>>>>
>>>>>>>> The second case is when you have a lot of columns. Each column
>>>>>>>> stores both a lower and upper bound, so 1,000 columns could easily take 8k
>>>>>>>> per file. If this is the problem, then maybe we want to have a way to turn
>>>>>>>> off column stats. We could also think of ways to change the way stats are
>>>>>>>> stored in the manifest files, but that only helps if we move to a columnar
>>>>>>>> format to store manifests, so this is probably not a short-term fix.
>>>>>>>>
>>>>>>>> If you can share a bit more information about this table, we can
>>>>>>>> probably tell which one is the problem. I'm guessing it is the large values
>>>>>>>> problem.
>>>>>>>>
>>>>>>>> On Thu, Apr 18, 2019 at 11:52 AM Gautam <ga...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hello folks,
>>>>>>>>>
>>>>>>>>> I have been testing Iceberg reading with and without stats built
>>>>>>>>> into Iceberg dataset manifest and found that there's a huge jump in network
>>>>>>>>> traffic with the latter..
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> In my test I am comparing two Iceberg datasets, both written in
>>>>>>>>> Iceberg format. One with and the other without stats collected in Iceberg
>>>>>>>>> manifests. In particular the difference between the writers used for the
>>>>>>>>> two datasets is this PR:
>>>>>>>>> https://github.com/apache/incubator-iceberg/pull/63/files which
>>>>>>>>> uses Iceberg's writers for writing Parquet data. I captured tcpdump from
>>>>>>>>> query scans run on these two datasets.  The partition being scanned
>>>>>>>>> contains 1 manifest, 1 parquet data file and ~3700 rows in both datasets.
>>>>>>>>> There's a 30x jump in network traffic to the remote filesystem (ADLS) when
>>>>>>>>> i switch to stats based Iceberg dataset. Both queries used the same Iceberg
>>>>>>>>> reader code to access both datasets.
>>>>>>>>>
>>>>>>>>> ```
>>>>>>>>> root@d69e104e7d40:/usr/local/spark#  tcpdump -r
>>>>>>>>> iceberg_geo1_metrixx_qc_postvalues_batch_query.pcap | grep
>>>>>>>>> perfanalysis.adlus15.projectcabostore.net | grep ">" | wc -l
>>>>>>>>> reading from file
>>>>>>>>> iceberg_geo1_metrixx_qc_postvalues_batch_query.pcap, link-type EN10MB
>>>>>>>>> (Ethernet)
>>>>>>>>>
>>>>>>>>> *8844*
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> root@d69e104e7d40:/usr/local/spark# tcpdump -r
>>>>>>>>> iceberg_scratch_pad_demo_11_batch_query.pcap | grep
>>>>>>>>> perfanalysis.adlus15.projectcabostore.net | grep ">" | wc -l
>>>>>>>>> reading from file iceberg_scratch_pad_demo_11_batch_query.pcap,
>>>>>>>>> link-type EN10MB (Ethernet)
>>>>>>>>>
>>>>>>>>> *269708*
>>>>>>>>>
>>>>>>>>> ```
>>>>>>>>>
>>>>>>>>> As a consequence of this the query response times get affected
>>>>>>>>> drastically (illustrated below). I must confess that I am on a slow
>>>>>>>>> internet connection via VPN connecting to the remote FS. But the dataset
>>>>>>>>> without stats took just 1m 49s while the dataset with stats took 26m 48s to
>>>>>>>>> read the same sized data. Most of that time in the latter dataset was spent
>>>>>>>>> split planning in Manifest reading and stats evaluation.
>>>>>>>>>
>>>>>>>>> ```
>>>>>>>>> all=> select count(*)  from iceberg_geo1_metrixx_qc_postvalues
>>>>>>>>> where batchId = '4a6f95abac924159bb3d7075373395c9';
>>>>>>>>>  count(1)
>>>>>>>>> ----------
>>>>>>>>>      3627
>>>>>>>>> (1 row)
>>>>>>>>> *Time: 109673.202 ms (01:49.673)*
>>>>>>>>>
>>>>>>>>> all=>  select count(*) from iceberg_scratch_pad_demo_11  where
>>>>>>>>> _ACP_YEAR=2018 and _ACP_MONTH=01 and _ACP_DAY=01 and batchId =
>>>>>>>>> '6d50eeb3e7d74b4f99eea91a27fc8f15';
>>>>>>>>>  count(1)
>>>>>>>>> ----------
>>>>>>>>>      3808
>>>>>>>>> (1 row)
>>>>>>>>> *Time: 1608058.616 ms (26:48.059)*
>>>>>>>>>
>>>>>>>>> ```
>>>>>>>>>
>>>>>>>>> Has anyone faced this? I'm wondering if there's some caching or
>>>>>>>>> parallelism option here that can be leveraged.  Would appreciate some
>>>>>>>>> guidance. If there isn't a straightforward fix and others feel this is an
>>>>>>>>> issue I can raise an issue and look into it further.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>> -Gautam.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> Ryan Blue
>>>>>>>> Software Engineer
>>>>>>>> Netflix
>>>>>>>>
>>>>>>>
>>>>>
>>>>
>>>
>>> --
>>> Ryan Blue
>>> Software Engineer
>>> Netflix
>>>
>>
>>

-- 
Ryan Blue
Software Engineer
Netflix

Re: Reading dataset with stats making lots of network traffic..

Posted by Gautam <ga...@gmail.com>.
Hey Anton,
            Sorry about the delay on this. I've been caught up with some
other things. Thanks for raising issue #173.

So the root cause is indeed the density and size of the schema. I agree
that an option to configure which columns get stats is useful, although
I'm not fully convinced the slowdown is purely due to lower/upper bounds;
for instance, it may simply be the time spent iterating over manifest rows
and deserializing the DataFile stats on every read. The workaround I'm
using right now is to parallelize manifest reading during split planning:
we regenerated the Iceberg table with more manifests, so the scan now goes
through the ParallelIterator, which reads manifests on a worker pool of
threads (one thread per CPU by default, configurable via
'iceberg.worker.num-threads'). A rough sketch of how I'm driving this is
below.
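
This is only a minimal sketch, not our actual job: the HadoopTables load,
the table location, and the filter value are placeholders, and it assumes
the worker pool size property is set before the pool is first created.

```java
// Minimal sketch with placeholder table location and filter value.
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.io.CloseableIterable;

public class ParallelPlanningSketch {
  public static void main(String[] args) throws IOException {
    // Size of the worker pool used by ParallelIterator; defaults to the
    // number of available processors. Must be set before the pool is used.
    System.setProperty("iceberg.worker.num-threads", "16");

    Table table = new HadoopTables(new Configuration())
        .load("adl://.../iceberg_scratch_pad_demo_11");  // placeholder path

    // With more than one manifest in the snapshot, split planning reads
    // manifests on the worker pool instead of a single thread.
    try (CloseableIterable<CombinedScanTask> tasks =
             table.newScan()
                  .filter(Expressions.equal("batchId", "<some-batch-id>"))
                  .planTasks()) {
      tasks.forEach(task -> System.out.println(task.files().size()));
    }
  }
}
```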

On that note, the degree of parallelism is limited by how many manifests
the table has. As a test, for a table with 4000 files we created one
manifest per file (think of one file as a single batch commit in this
case), hoping for a parallelism factor of 4000. But Iceberg merges
manifests into fewer, larger ones with each commit, so we ended up with
only 16 manifests, which caps split planning at 16 units of parallelism.
Is this grouping of manifests configurable? If not, should we make it
configurable? If it already is, see my guess below at which knobs control it.
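
To be concrete, and assuming the commit.manifest.* entries in
TableProperties are the relevant knobs (I haven't verified the names or
semantics at our version), I was picturing something like:

```java
// Guess, not verified: raise the merge threshold so small manifests are
// kept around, preserving more units of parallelism for split planning
// at the cost of more manifest files to open. `table` as loaded above.
table.updateProperties()
    .set("commit.manifest.min-count-to-merge", "5000")
    .set("commit.manifest.target-size-bytes", String.valueOf(8 * 1024 * 1024))
    .commit();
```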

Sorry if this is forking off into a different conversation; if so, I can
start a separate thread on it.






On Wed, May 1, 2019 at 9:42 PM Anton Okolnychyi <ao...@apple.com>
wrote:

> Hey Gautam,
>
> Out of my curiosity, did you manage to confirm the root cause of the issue?
>
> P.S. I created [1] so that we can make collection of lower/upper bounds
> configurable.
>
> Thanks,
> Anton
>
> [1] - https://github.com/apache/incubator-iceberg/issues/173
>
> On 22 Apr 2019, at 09:15, Gautam <ga...@gmail.com> wrote:
>
> Thanks guys for the insights ..
>
> > I like Anton's idea to have an optional list of columns for which we
> keep stats. That would allow us to avoid storing stats for thousands of
> columns that won't ever be used. Another option here is to add a flag to
> keep stats only for top-level columns. That's much less configuration for
> users and probably does the right thing in many cases. Simpler to use but
> not as fast in all cases is sometimes a good compromise.
>
> This makes sense to me. It adds a variable that data pipelines can tweak
> on to improve performance. I will add an issue on Github to add a stats
> config/flag. Although, having said that, I would try to optimize around
> this coz read patterns are hardly ever known a priori and adding a column
> to this list means having to re-write the entire data again. So i'l try the
> other suggestion which is parallelizing on multiple manifests.
>
> >  To clarify my comment on changing the storage: the idea is to use
> separate columns instead of a map and then use a columnar storage format so
> we can project those columns independently. Avro can't project columns
> independently. This wouldn't help on the write side and may just cause a
> lot of seeking on the read side that diminishes the benefits.
>
> Gotcha.
>
> > Also, now that we have more details, I think there is a second problem.
> Because we expect several manifests in a table, we parallelize split
> planning on manifests instead of splits of manifest files. This planning
> operation is happening in a single thread instead of in parallel. I think
> if you split the write across several manifests, you'd improve wall time.
>
> This might actually be the issue here, this was a test bench dataset so
> the writer job created a single manifest for all the data in the dataset
> which isn't really how we will do things in prod. I'l try and create the
> metadata based on productions expected commit pattern.
>
>
> Regarding Iceberg not truncating large bounded column values
> https://github.com/apache/incubator-iceberg/issues/113 .. I didn't
> consider this with our dataset. The current evidence is leading towards the
> number of columns and the sheer number of files that the manifest is
> maintaining but this is a good thing to look into.
>
> Thanks again guys.
>
> -Gautam.
>
>
>
>
>
>
>
> On Fri, Apr 19, 2019 at 9:05 AM Ryan Blue <rb...@netflix.com> wrote:
>
>> I like Anton's idea to have an optional list of columns for which we keep
>> stats. That would allow us to avoid storing stats for thousands of columns
>> that won't ever be used. Another option here is to add a flag to keep stats
>> only for top-level columns. That's much less configuration for users and
>> probably does the right thing in many cases. Simpler to use but not as fast
>> in all cases is sometimes a good compromise.
>>
>> To clarify my comment on changing the storage: the idea is to use
>> separate columns instead of a map and then use a columnar storage format so
>> we can project those columns independently. Avro can't project columns
>> independently. This wouldn't help on the write side and may just cause a
>> lot of seeking on the read side that diminishes the benefits.
>>
>> Also, now that we have more details, I think there is a second problem.
>> Because we expect several manifests in a table, we parallelize split
>> planning on manifests instead of splits of manifest files. This planning
>> operation is happening in a single thread instead of in parallel. I think
>> if you split the write across several manifests, you'd improve wall time.
>>
>> On Fri, Apr 19, 2019 at 8:15 AM Anton Okolnychyi <ao...@apple.com>
>> wrote:
>>
>>> No, we haven’t experienced it yet. The manifest size is huge in your
>>> case. To me, Ryan is correct: it might be either big lower/upper bounds
>>> (then truncation will help) or a big number columns (then collecting
>>> lower/upper bounds only for specific columns will help). I think both
>>> optimizations are needed and will reduce the manifest size.
>>>
>>> Since you mentioned you have a lot of columns and we collect bounds for
>>> nested struct fields, I am wondering if you could revert [1] locally and
>>> compare the manifest size.
>>>
>>> [1] -
>>> https://github.com/apache/incubator-iceberg/commit/c383dd87a89e35d622e9c458fd711931cbc5e96f
>>>
>>> On 19 Apr 2019, at 15:42, Gautam <ga...@gmail.com> wrote:
>>>
>>> Thanks for responding Anton! Do we think the delay is mainly due to
>>> lower/upper bound filtering? have you faced this? I haven't exactly found
>>> where the slowness is yet. It's generally due to the stats filtering but
>>> what part of it is causing this much network traffic. There's
>>> CloseableIteratable  that takes a ton of time on the next() and hasNext()
>>> calls. My guess is the expression evaluation on each manifest entry is
>>> what's doing it.
>>>
>>> On Fri, Apr 19, 2019 at 1:41 PM Anton Okolnychyi <ao...@apple.com>
>>> wrote:
>>>
>>>> I think we need to have a list of columns for which we want to collect
>>>> stats and that should be configurable by the user. Maybe, this config
>>>> should be applicable only to lower/upper bounds. As we now collect stats
>>>> even for nested struct fields, this might generate a lot of data. In most
>>>> cases, users cluster/sort their data by a subset of data columns to have
>>>> fast queries with predicates on those columns. So, being able to configure
>>>> columns for which to collect lower/upper bounds seems reasonable.
>>>>
>>>> On 19 Apr 2019, at 08:03, Gautam <ga...@gmail.com> wrote:
>>>>
>>>> >  The length in bytes of the schema is 109M as compared to 687K of the
>>>> non-stats dataset.
>>>>
>>>> Typo, length in bytes of *manifest*. schema is the same.
>>>>
>>>> On Fri, Apr 19, 2019 at 12:16 PM Gautam <ga...@gmail.com>
>>>> wrote:
>>>>
>>>>> Correction, partition count = 4308.
>>>>>
>>>>> > Re: Changing the way we keep stats. Avro is a block splittable
>>>>> format and is friendly with parallel compute frameworks like Spark.
>>>>>
>>>>> Here I am trying to say that we don't need to change the format to
>>>>> columnar right? The current format is already friendly for parallelization.
>>>>>
>>>>> thanks.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Fri, Apr 19, 2019 at 12:12 PM Gautam <ga...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Ah, my bad. I missed adding in the schema details .. Here are some
>>>>>> details on the dataset with stats :
>>>>>>
>>>>>>  Iceberg Schema Columns : 20
>>>>>>  Spark Schema fields : 20
>>>>>>  Snapshot Summary :{added-data-files=4308, added-records=11494037,
>>>>>> changed-partition-count=4308, total-records=11494037, total-data-files=4308}
>>>>>>  Manifest files :1
>>>>>>  Manifest details:
>>>>>>      => manifest file path:
>>>>>> adl://[dataset_base_path]/metadata/4bcda033-9df5-4c84-8eef-9d6ef93e4347-m0.avro
>>>>>>      => manifest file length: 109,028,885
>>>>>>      => existing files count: 0
>>>>>>      => added files count: 4308
>>>>>>      => deleted files count: 0
>>>>>>      => partitions count: 4
>>>>>>      => partition fields count: 4
>>>>>>
>>>>>> Re: Num data files. It has a single manifest keep track of 4308
>>>>>> files. Total record count is 11.4 Million.
>>>>>>
>>>>>> Re: Columns. You are right that this table has many columns..
>>>>>> although it has only 20 top-level columns,  num leaf columns are in order
>>>>>> of thousands. This Schema is heavy on structs (in the thousands) and has
>>>>>> deep levels of nesting.  I know Iceberg keeps
>>>>>> *column_sizes, value_counts, null_value_counts* for all leaf fields
>>>>>> and additionally *lower-bounds, upper-bounds* for native, struct
>>>>>> types (not yet for map KVs and arrays).  The length in bytes of the schema
>>>>>> is 109M as compared to 687K of the non-stats dataset.
>>>>>>
>>>>>> Re: Turning off stats. I am looking to leverage stats coz for our
>>>>>> datasets with much larger number of data files we want to leverage
>>>>>> iceberg's ability to skip entire files based on these stats. This is one of
>>>>>> the big incentives for us to use Iceberg.
>>>>>>
>>>>>> Re: Changing the way we keep stats. Avro is a block splittable format
>>>>>> and is friendly with parallel compute frameworks like Spark. So would it
>>>>>> make sense for instance to have add an option to have Spark job / Futures
>>>>>> handle split planning?   In a larger context, 109M is not that much
>>>>>> metadata given that Iceberg is meant for datasets where the metadata itself
>>>>>> is Bigdata scale.  I'm curious on how folks with larger sized metadata (in
>>>>>> GB) are optimizing this today.
>>>>>>
>>>>>>
>>>>>> Cheers,
>>>>>> -Gautam.
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Fri, Apr 19, 2019 at 12:40 AM Ryan Blue <rb...@netflix.com.invalid>
>>>>>> wrote:
>>>>>>
>>>>>>> Thanks for bringing this up! My initial theory is that this table
>>>>>>> has a ton of stats data that you have to read. That could happen in a
>>>>>>> couple of cases.
>>>>>>>
>>>>>>> First, you might have large values in some columns. Parquet will
>>>>>>> suppress its stats if values are larger than 4k and those are what Iceberg
>>>>>>> uses. But that could still cause you to store two 1k+ objects for each
>>>>>>> large column (lower and upper bounds). With a lot of data files, that could
>>>>>>> add up quickly. The solution here is to implement #113
>>>>>>> <https://github.com/apache/incubator-iceberg/issues/113> so that we
>>>>>>> don't store the actual min and max for string or binary columns, but
>>>>>>> instead a truncated value that is just above or just below.
>>>>>>>
>>>>>>> The second case is when you have a lot of columns. Each column
>>>>>>> stores both a lower and upper bound, so 1,000 columns could easily take 8k
>>>>>>> per file. If this is the problem, then maybe we want to have a way to turn
>>>>>>> off column stats. We could also think of ways to change the way stats are
>>>>>>> stored in the manifest files, but that only helps if we move to a columnar
>>>>>>> format to store manifests, so this is probably not a short-term fix.
>>>>>>>
>>>>>>> If you can share a bit more information about this table, we can
>>>>>>> probably tell which one is the problem. I'm guessing it is the large values
>>>>>>> problem.
>>>>>>>
>>>>>>> On Thu, Apr 18, 2019 at 11:52 AM Gautam <ga...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hello folks,
>>>>>>>>
>>>>>>>> I have been testing Iceberg reading with and without stats built
>>>>>>>> into Iceberg dataset manifest and found that there's a huge jump in network
>>>>>>>> traffic with the latter..
>>>>>>>>
>>>>>>>>
>>>>>>>> In my test I am comparing two Iceberg datasets, both written in
>>>>>>>> Iceberg format. One with and the other without stats collected in Iceberg
>>>>>>>> manifests. In particular the difference between the writers used for the
>>>>>>>> two datasets is this PR:
>>>>>>>> https://github.com/apache/incubator-iceberg/pull/63/files which
>>>>>>>> uses Iceberg's writers for writing Parquet data. I captured tcpdump from
>>>>>>>> query scans run on these two datasets.  The partition being scanned
>>>>>>>> contains 1 manifest, 1 parquet data file and ~3700 rows in both datasets.
>>>>>>>> There's a 30x jump in network traffic to the remote filesystem (ADLS) when
>>>>>>>> i switch to stats based Iceberg dataset. Both queries used the same Iceberg
>>>>>>>> reader code to access both datasets.
>>>>>>>>
>>>>>>>> ```
>>>>>>>> root@d69e104e7d40:/usr/local/spark#  tcpdump -r
>>>>>>>> iceberg_geo1_metrixx_qc_postvalues_batch_query.pcap | grep
>>>>>>>> perfanalysis.adlus15.projectcabostore.net | grep ">" | wc -l
>>>>>>>> reading from file
>>>>>>>> iceberg_geo1_metrixx_qc_postvalues_batch_query.pcap, link-type EN10MB
>>>>>>>> (Ethernet)
>>>>>>>>
>>>>>>>> *8844*
>>>>>>>>
>>>>>>>>
>>>>>>>> root@d69e104e7d40:/usr/local/spark# tcpdump -r
>>>>>>>> iceberg_scratch_pad_demo_11_batch_query.pcap | grep
>>>>>>>> perfanalysis.adlus15.projectcabostore.net | grep ">" | wc -l
>>>>>>>> reading from file iceberg_scratch_pad_demo_11_batch_query.pcap,
>>>>>>>> link-type EN10MB (Ethernet)
>>>>>>>>
>>>>>>>> *269708*
>>>>>>>>
>>>>>>>> ```
>>>>>>>>
>>>>>>>> As a consequence of this the query response times get affected
>>>>>>>> drastically (illustrated below). I must confess that I am on a slow
>>>>>>>> internet connection via VPN connecting to the remote FS. But the dataset
>>>>>>>> without stats took just 1m 49s while the dataset with stats took 26m 48s to
>>>>>>>> read the same sized data. Most of that time in the latter dataset was spent
>>>>>>>> split planning in Manifest reading and stats evaluation.
>>>>>>>>
>>>>>>>> ```
>>>>>>>> all=> select count(*)  from iceberg_geo1_metrixx_qc_postvalues
>>>>>>>> where batchId = '4a6f95abac924159bb3d7075373395c9';
>>>>>>>>  count(1)
>>>>>>>> ----------
>>>>>>>>      3627
>>>>>>>> (1 row)
>>>>>>>> *Time: 109673.202 ms (01:49.673)*
>>>>>>>>
>>>>>>>> all=>  select count(*) from iceberg_scratch_pad_demo_11  where
>>>>>>>> _ACP_YEAR=2018 and _ACP_MONTH=01 and _ACP_DAY=01 and batchId =
>>>>>>>> '6d50eeb3e7d74b4f99eea91a27fc8f15';
>>>>>>>>  count(1)
>>>>>>>> ----------
>>>>>>>>      3808
>>>>>>>> (1 row)
>>>>>>>> *Time: 1608058.616 ms (26:48.059)*
>>>>>>>>
>>>>>>>> ```
>>>>>>>>
>>>>>>>> Has anyone faced this? I'm wondering if there's some caching or
>>>>>>>> parallelism option here that can be leveraged.  Would appreciate some
>>>>>>>> guidance. If there isn't a straightforward fix and others feel this is an
>>>>>>>> issue I can raise an issue and look into it further.
>>>>>>>>
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> -Gautam.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Ryan Blue
>>>>>>> Software Engineer
>>>>>>> Netflix
>>>>>>>
>>>>>>
>>>>
>>>
>>
>> --
>> Ryan Blue
>> Software Engineer
>> Netflix
>>
>
>