Posted to dev@iceberg.apache.org by Edgar Rodriguez <ed...@airbnb.com.INVALID> on 2021/03/02 17:49:15 UTC

Hive query with join of Iceberg table and Hive table

Hi,

I'm trying to run a simple query in Hive 2.3.4 with a join of a Hive table
and an Iceberg table, each configured accordingly - the Iceberg table has
the `storage_handler` defined, and I'm running with the MR engine.

I'm using the `iceberg.mr.catalog.loader.class` property to load our internal
catalog. In the logs I can see Hive loading the Iceberg table, but then I
can see the Driver doing some traversal through the FS path under the table
location, getting statuses for all data within the directory - this is not
the behavior I see when querying an Iceberg table in Hive by itself, where
I can see the splits being computed correctly.
Due to this behavior, the query basically scans the full FS structure under
the path - if that structure is large, the query looks stuck, although I do
see the wire activity fetching the FS listings.

The question is: has anyone experienced this behavior when querying Hive
tables joined with Iceberg tables? If so, what's the best way to approach it?

Best,
-- 
Edgar R

Re: Hive query with join of Iceberg table and Hive table

Posted by Edgar Rodriguez <ed...@airbnb.com.INVALID>.
Great! Thanks Peter for letting me know. I'll take a look.

Best,

On Fri, Mar 12, 2021 at 10:14 AM Peter Vary <pv...@cloudera.com.invalid>
wrote:

> Hi Edgar,
>
> You might want to take a look at this:
> https://github.com/apache/iceberg/pull/2329
>
> The PR aims to update Hive table statistics in the HiveCatalog when any
> change to the table is committed. This solves the issue with the upstream
> Hive code, and might solve the issue with other versions as well.
>
> Thanks,
> Peter
>
> On Mar 4, 2021, at 09:43, Vivekanand Vellanki <vi...@dremio.com> wrote:
>
> Our concern is not specific to Iceberg. I am concerned about the memory
> requirement in caching a large number of splits.
>
> With Iceberg, estimating row counts when the query has predicates requires
> scanning the manifest list and manifest files to identify all the data
> files; and compute the row count estimates. While it is reasonable to cache
> these splits to avoid reading the manifest files twice, this increases the
> memory requirement. Also, query engines might want to handle row count
> estimation and split generation phases - row count estimation is required
> for the cost based optimiser. Split generation can be done in parallel by
> reading manifest files in parallel.
>
> It would be good to decouple row count estimation from split generation.
>
> On Wed, Mar 3, 2021 at 11:24 PM Ryan Blue <rb...@netflix.com.invalid>
> wrote:
>
>> I agree with the concern about caching splits, but doesn't the API cause
>> us to collect all of the splits into memory anyway? I thought there was no
>> way to return splits as an `Iterator` that lazily loads them. If that's the
>> case, then we primarily need to worry about cleanup and how long they are
>> kept around.
>>
>> I think it is also fairly reasonable to do the planning twice to avoid
>> the problem in Hive. Spark distributes the responsibility to each driver,
>> so jobs are separate and don't affect one another. If this is happening on
>> a shared Hive server endpoint then we probably have more of a concern about
>> memory consumption.
>>
>> Vivekanand, can you share more detail about how/where this is happening
>> in your case?
>>
>> On Wed, Mar 3, 2021 at 7:53 AM Edgar Rodriguez <
>> edgar.rodriguez@airbnb.com.invalid> wrote:
>>
>>> On Wed, Mar 3, 2021 at 1:48 AM Peter Vary <pv...@cloudera.com.invalid>
>>> wrote:
>>>
>>>> Quick question @Edgar: Am I right that the table is created by Spark? I
>>>> think if it is created from Hive and we inserted the data from Hive, then
>>>> we should have the basic stats already collected and we should not need the
>>>> estimation (we might still do it, but probably we should not)
>>>>
>>>
>>> Yes, Spark creates the table. We don't write Iceberg tables with Hive.
>>>
>>>
>>>>
>>>> Also we should check if Hive expects the full size of the table, or the
>>>> size of the table after filters. If Hive collects this data by file
>>>> scanning I would expect that it would be adequate to start with unfiltered
>>>> raw size.
>>>>
>>>
>>> In this case Hive is performing the FS scan to find the raw size of the
>>> location to query - in this case since the table is unpartitioned (ICEBERG
>>> type) the location to query is the full table since Hive is not aware of
>>> Iceberg metadata. However, if the estimator is used it passes a
>>> TableScanOperator, which I assume could be used to gather some specific
>>> stats if present in the operator.
>>>
>>>
>>>>
>>>> Thanks,
>>>> Peter
>>>>
>>>>
>>>> Vivekanand Vellanki <vi...@dremio.com> wrote (on Wed, Mar 3, 2021, 5:15):
>>>>
>>>>> One of our concerns with caching the splits is the amount of memory
>>>>> required for this. If the filtering is not very selective and the table
>>>>> happens to be large, this increases the memory requirement to hold all the
>>>>> splits in memory.
>>>>>
>>>>
>>> I agree with this - caching the splits would be a concern with memory
>>> consumption; even now serializing/deserializing (probably another topic for
>>> discussion) splits in Hive for a query producing ~3.5K splits takes
>>> considerable time.
>>>
>>> Cheers,
>>> --
>>> Edgar R
>>>
>>
>>
>> --
>> Ryan Blue
>> Software Engineer
>> Netflix
>>
>
>

-- 
Edgar R

Re: Hive query with join of Iceberg table and Hive table

Posted by Peter Vary <pv...@cloudera.com.INVALID>.
Hi Edgar,

You might want to take a look at this:
https://github.com/apache/iceberg/pull/2329

The PR aims to update Hive table statistics in the HiveCatalog whenever a change to the table is committed. This solves the issue with the upstream Hive code, and might solve the issue with other versions as well.
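
Roughly, the idea is something like the sketch below - illustrative only, not
the actual PR code; the helper class name is made up, while the Hive/Iceberg
types and the parameter/summary keys are the standard ones. On commit, the
totals from the Iceberg snapshot summary are copied into the table parameters
that Hive's planner reads as basic statistics:

import java.util.Map;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.iceberg.Snapshot;

// Hypothetical helper: sync Iceberg snapshot totals into the Hive Metastore
// table parameters used as basic statistics (assumes the parameter map is set).
public class BasicStatsSync {
  public static void updateBasicStats(Table hmsTable, Snapshot snapshot) {
    Map<String, String> summary = snapshot.summary();
    Map<String, String> params = hmsTable.getParameters();
    if (summary.containsKey("total-records")) {
      params.put("numRows", summary.get("total-records"));      // row count
    }
    if (summary.containsKey("total-files-size")) {
      params.put("totalSize", summary.get("total-files-size")); // bytes on disk
    }
  }
}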

Thanks,
Peter

> On Mar 4, 2021, at 09:43, Vivekanand Vellanki <vi...@dremio.com> wrote:
> 
> Our concern is not specific to Iceberg. I am concerned about the memory requirement in caching a large number of splits.
> 
> With Iceberg, estimating row counts when the query has predicates requires scanning the manifest list and manifest files to identify all the data files and computing the row count estimates. While it is reasonable to cache these splits to avoid reading the manifest files twice, this increases the memory requirement. Also, query engines might want to handle row count estimation and split generation as separate phases - row count estimation is required for the cost-based optimiser, while split generation can be done in parallel by reading manifest files in parallel.
> 
> It would be good to decouple row count estimation from split generation.
> 
> On Wed, Mar 3, 2021 at 11:24 PM Ryan Blue <rb...@netflix.com.invalid> wrote:
> I agree with the concern about caching splits, but doesn't the API cause us to collect all of the splits into memory anyway? I thought there was no way to return splits as an `Iterator` that lazily loads them. If that's the case, then we primarily need to worry about cleanup and how long they are kept around.
> 
> I think it is also fairly reasonable to do the planning twice to avoid the problem in Hive. Spark distributes the responsibility to each driver, so jobs are separate and don't affect one another. If this is happening on a shared Hive server endpoint then we probably have more of a concern about memory consumption.
> 
> Vivekanand, can you share more detail about how/where this is happening in your case?
> 
> On Wed, Mar 3, 2021 at 7:53 AM Edgar Rodriguez <ed...@airbnb.com.invalid> wrote:
> On Wed, Mar 3, 2021 at 1:48 AM Peter Vary <pv...@cloudera.com.invalid> wrote:
> Quick question @Edgar: Am I right that the table is created by Spark? I think if it is created from Hive and we inserted the data from Hive, then we should have the basic stats already collected and we should not need the estimation (we might still do it, but probably we should not)
> 
> Yes, Spark creates the table. We don't write Iceberg tables with Hive.
>  
> 
> Also we should check if Hive expects the full size of the table, or the size of the table after filters. If Hive collects this data by file scanning I would expect that it would be adequate to start with unfiltered raw size.
> 
> In this case Hive is performing the FS scan to find the raw size of the location to query - since the table is unpartitioned (ICEBERG type) and Hive is not aware of Iceberg metadata, the location to query is the full table location. However, if the estimator is used, Hive passes it a TableScanOperator, which I assume could be used to gather some specific stats if present in the operator.
>  
> 
> Thanks,
> Peter 
> 
> 
> Vivekanand Vellanki <vivek@dremio.com> wrote (on Wed, Mar 3, 2021, 5:15):
> One of our concerns with caching the splits is the amount of memory required for this. If the filtering is not very selective and the table happens to be large, this increases the memory requirement to hold all the splits in memory.
> 
> I agree with this - caching the splits would be a concern with memory consumption; even now serializing/deserializing (probably another topic for discussion) splits in Hive for a query producing ~3.5K splits takes considerable time. 
> 
> Cheers,
> -- 
> Edgar R
> 
> 
> -- 
> Ryan Blue
> Software Engineer
> Netflix


Re: Hive query with join of Iceberg table and Hive table

Posted by Vivekanand Vellanki <vi...@dremio.com>.
Our concern is not specific to Iceberg. I am concerned about the memory
requirement in caching a large number of splits.

With Iceberg, estimating row counts when the query has predicates requires
scanning the manifest list and manifest files to identify all the data
files and computing the row count estimates. While it is reasonable to
cache these splits to avoid reading the manifest files twice, this
increases the memory requirement. Also, query engines might want to handle
row count estimation and split generation as separate phases - row count
estimation is required for the cost-based optimiser, while split generation
can be done in parallel by reading manifest files in parallel.

It would be good to decouple row count estimation from split generation.
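
To make the cost concrete, a filtered row count estimate with the current scan
API looks roughly like the sketch below (the helper class is made up; the
Table/TableScan/FileScanTask calls are the public Iceberg API). Note that it
walks the same manifest list and manifests that split planning walks, which is
exactly why one is tempted to cache the planned tasks:

import java.io.IOException;
import java.io.UncheckedIOException;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.io.CloseableIterable;

// Hypothetical helper: sum per-file record counts for files matching the filter.
// This reads the manifest list and manifests - the same work split planning does.
public class RowCountEstimator {
  public static long estimateRows(Table table, Expression filter) {
    long rows = 0L;
    try (CloseableIterable<FileScanTask> tasks =
        table.newScan().filter(filter).planFiles()) {
      for (FileScanTask task : tasks) {
        rows += task.file().recordCount();
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return rows;
  }
}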

On Wed, Mar 3, 2021 at 11:24 PM Ryan Blue <rb...@netflix.com.invalid> wrote:

> I agree with the concern about caching splits, but doesn't the API cause
> us to collect all of the splits into memory anyway? I thought there was no
> way to return splits as an `Iterator` that lazily loads them. If that's the
> case, then we primarily need to worry about cleanup and how long they are
> kept around.
>
> I think it is also fairly reasonable to do the planning twice to avoid the
> problem in Hive. Spark distributes the responsibility to each driver, so
> jobs are separate and don't affect one another. If this is happening on a
> shared Hive server endpoint then we probably have more of a concern about
> memory consumption.
>
> Vivekanand, can you share more detail about how/where this is happening in
> your case?
>
> On Wed, Mar 3, 2021 at 7:53 AM Edgar Rodriguez
> <ed...@airbnb.com.invalid> wrote:
>
>> On Wed, Mar 3, 2021 at 1:48 AM Peter Vary <pv...@cloudera.com.invalid>
>> wrote:
>>
>>> Quick question @Edgar: Am I right that the table is created by Spark? I
>>> think if it is created from Hive and we inserted the data from Hive, then
>>> we should have the basic stats already collected and we should not need the
>>> estimation (we might still do it, but probably we should not)
>>>
>>
>> Yes, Spark creates the table. We don't write Iceberg tables with Hive.
>>
>>
>>>
>>> Also we should check if Hive expects the full size of the table, or the
>>> size of the table after filters. If Hive collects this data by file
>>> scanning I would expect that it would be adequate to start with unfiltered
>>> raw size.
>>>
>>
>> In this case Hive is performing the FS scan to find the raw size of the
>> location to query - in this case since the table is unpartitioned (ICEBERG
>> type) the location to query is the full table since Hive is not aware of
>> Iceberg metadata. However, if the estimator is used it passes a
>> TableScanOperator, which I assume could be used to gather some specific
>> stats if present in the operator.
>>
>>
>>>
>>> Thanks,
>>> Peter
>>>
>>>
>>> Vivekanand Vellanki <vi...@dremio.com> wrote (on Wed, Mar 3, 2021, 5:15):
>>>
>>>> One of our concerns with caching the splits is the amount of memory
>>>> required for this. If the filtering is not very selective and the table
>>>> happens to be large, this increases the memory requirement to hold all the
>>>> splits in memory.
>>>>
>>>
>> I agree with this - caching the splits would be a concern with memory
>> consumption; even now serializing/deserializing (probably another topic for
>> discussion) splits in Hive for a query producing ~3.5K splits takes
>> considerable time.
>>
>> Cheers,
>> --
>> Edgar R
>>
>
>
> --
> Ryan Blue
> Software Engineer
> Netflix
>

Re: Hive query with join of Iceberg table and Hive table

Posted by Ryan Blue <rb...@netflix.com.INVALID>.
I agree with the concern about caching splits, but doesn't the API cause us
to collect all of the splits into memory anyway? I thought there was no way
to return splits as an `Iterator` that lazily loads them. If that's the
case, then we primarily need to worry about cleanup and how long they are
kept around.
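
For reference, the contract I have in mind is the one below - a stripped-down
illustration, not Iceberg code; the class name is invented and only the Hadoop
types are real. Both the mapred and mapreduce InputFormat APIs hand back a
fully built array/List of splits, so everything is materialized before the
framework sees the first split:

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;

// Illustration only: the return type is List<InputSplit>, not an Iterator,
// so every planned split has to sit in memory at once.
public abstract class EagerSplitsInputFormat<K, V> extends InputFormat<K, V> {
  @Override
  public List<InputSplit> getSplits(JobContext context)
      throws IOException, InterruptedException {
    List<InputSplit> splits = new ArrayList<>();
    // ... split planning would add one InputSplit per scan task here ...
    return splits;
  }
}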

I think it is also fairly reasonable to do the planning twice to avoid the
problem in Hive. Spark distributes the responsibility to each driver, so
jobs are separate and don't affect one another. If this is happening on a
shared Hive server endpoint then we probably have more of a concern about
memory consumption.

Vivekanand, can you share more detail about how/where this is happening in
your case?

On Wed, Mar 3, 2021 at 7:53 AM Edgar Rodriguez
<ed...@airbnb.com.invalid> wrote:

> On Wed, Mar 3, 2021 at 1:48 AM Peter Vary <pv...@cloudera.com.invalid>
> wrote:
>
>> Quick question @Edgar: Am I right that the table is created by Spark? I
>> think if it is created from Hive and we inserted the data from Hive, then
>> we should have the basic stats already collected and we should not need the
>> estimation (we might still do it, but probably we should not)
>>
>
> Yes, Spark creates the table. We don't write Iceberg tables with Hive.
>
>
>>
>> Also we should check if Hive expects the full size of the table, or the
>> size of the table after filters. If Hive collects this data by file
>> scanning I would expect that it would be adequate to start with unfiltered
>> raw size.
>>
>
> In this case Hive is performing the FS scan to find the raw size of the
> location to query - in this case since the table is unpartitioned (ICEBERG
> type) the location to query is the full table since Hive is not aware of
> Iceberg metadata. However, if the estimator is used it passes a
> TableScanOperator, which I assume could be used to gather some specific
> stats if present in the operator.
>
>
>>
>> Thanks,
>> Peter
>>
>>
>> Vivekanand Vellanki <vi...@dremio.com> wrote (on Wed, Mar 3, 2021, 5:15):
>>
>>> One of our concerns with caching the splits is the amount of memory
>>> required for this. If the filtering is not very selective and the table
>>> happens to be large, this increases the memory requirement to hold all the
>>> splits in memory.
>>>
>>
> I agree with this - caching the splits would be a concern with memory
> consumption; even now serializing/deserializing (probably another topic for
> discussion) splits in Hive for a query producing ~3.5K splits takes
> considerable time.
>
> Cheers,
> --
> Edgar R
>


-- 
Ryan Blue
Software Engineer
Netflix

Re: Hive query with join of Iceberg table and Hive table

Posted by Edgar Rodriguez <ed...@airbnb.com.INVALID>.
On Wed, Mar 3, 2021 at 1:48 AM Peter Vary <pv...@cloudera.com.invalid>
wrote:

> Quick question @Edgar: Am I right that the table is created by Spark? I
> think if it is created from Hive and we inserted the data from Hive, then
> we should have the basic stats already collected and we should not need the
> estimation (we might still do it, but probably we should not)
>

Yes, Spark creates the table. We don't write Iceberg tables with Hive.


>
> Also we should check if Hive expects the full size of the table, or the
> size of the table after filters. If Hive collects this data by file
> scanning I would expect that it would be adequate to start with unfiltered
> raw size.
>

In this case Hive is performing the FS scan to find the raw size of the
location to query - since the table is unpartitioned (ICEBERG type) and Hive
is not aware of Iceberg metadata, the location to query is the full table
location. However, if the estimator is used, Hive passes it a
TableScanOperator, which I assume could be used to gather some specific
stats if present in the operator.


>
> Thanks,
> Peter
>
>
> Vivekanand Vellanki <vi...@dremio.com> wrote (on Wed, Mar 3, 2021, 5:15):
>
>> One of our concerns with caching the splits is the amount of memory
>> required for this. If the filtering is not very selective and the table
>> happens to be large, this increases the memory requirement to hold all the
>> splits in memory.
>>
>
I agree with this - caching the splits would be a concern with memory
consumption; even now serializing/deserializing (probably another topic for
discussion) splits in Hive for a query producing ~3.5K splits takes
considerable time.

Cheers,
-- 
Edgar R

Re: Hive query with join of Iceberg table and Hive table

Posted by Peter Vary <pv...@cloudera.com.INVALID>.
Quick question @Edgar: Am I right that the table is created by Spark? I
think if it is created from Hive and the data is inserted from Hive, then
we should have the basic stats already collected and should not need the
estimation (we might still do it, but probably we should not).

Also, we should check whether Hive expects the full size of the table or
the size of the table after filters. If Hive collects this data by file
scanning, I would expect that it would be adequate to start with the
unfiltered raw size.

Thanks,
Peter


Vivekanand Vellanki <vi...@dremio.com> wrote (on Wed, Mar 3, 2021, 5:15):

> One of our concerns with caching the splits is the amount of memory
> required for this. If the filtering is not very selective and the table
> happens to be large, this increases the memory requirement to hold all the
> splits in memory.
>
> On Wed, Mar 3, 2021 at 6:56 AM Ryan Blue <rb...@netflix.com.invalid>
> wrote:
>
>> Yes, it sounds like we do want to at least support fake stats for Hive.
>> It would be great to also base the stats on the actual table scan, since we
>> can get fairly accurate stats after filters have been pushed down. In
>> Spark, that's allowed us to convert more joins to broadcast joins that are
>> cheaper. The only concern is when and where the stats estimation is done
>> and whether we'd be able to cache the result of planning the scan so we
>> don't plan a scan for stats estimation and then do it a second time to
>> produce splits.
>>
>> On Tue, Mar 2, 2021 at 4:12 PM Edgar Rodriguez
>> <ed...@airbnb.com.invalid> wrote:
>>
>>> After a bit of further digging, I found that the issue is related to
>>> Hive trying to find the input size (the Iceberg table) for the join at
>>> query planning time. Since HiveIcebergStorageHandler does not implement
>>> InputEstimator
>>> <https://github.com/apache/hive/blob/branch-2.3/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java#L2195>,
>>> Hive tries to estimate the input size the same way as it would do for a
>>> native Hive table, by scanning the FS listing the paths recursively
>>> <https://github.com/apache/hadoop/blob/branch-2.8.5/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java#L1489>
>>> and adding the file lengths - in the case of Iceberg tables it would start
>>> scanning from the table location since it's EXTERNAL unpartitioned - as
>>> mentioned in the Hive Wiki
>>> <https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=82903061#ConfigurationProperties-hive.fetch.task.conversion.threshold>
>>> :
>>>
>>> If target table is native, input length is calculated by summation of
>>>> file lengths. If it's not native, the storage handler for the table can
>>>> optionally implement the org.apache.hadoop.hive.ql.metadata.InputEstimator
>>>> interface.
>>>
>>>
>>> After adding the interface to the storage_handler and providing an
>>> implementation returning an Estimation(-1, -1)
>>> <https://hive.apache.org/javadocs/r2.1.1/api/org/apache/hadoop/hive/ql/metadata/InputEstimator.Estimation.html#Estimation-int-long->
>>> the query works successfully in the expected amount of time - maybe a
>>> better implementation could compute an actual estimation. I assume this
>>> is only an issue you hit when the underlying FS tree of the Iceberg table
>>> is large and traversing the FS takes a long time, otherwise most likely
>>> Hive would do the FS traversal and the query would make progress.
>>>
>>> Should we make this change in the HiveIcebergStorageHandler?
>>>
>>> Cheers,
>>>
>>> On Tue, Mar 2, 2021 at 1:11 PM Peter Vary <pv...@cloudera.com.invalid>
>>> wrote:
>>>
>>>> I have seen this kind of problem when the catalog was not configured
>>>> for the table/session and we ended up using the default catalog instead of
>>>> HiveCatalog
>>>>
>>>> On Mar 2, 2021, at 18:49, Edgar Rodriguez <
>>>> edgar.rodriguez@airbnb.com.INVALID> wrote:
>>>>
>>>> Hi,
>>>>
>>>> I'm trying to run a simple query in Hive 2.3.4 with a join of a Hive
>>>> table and an Iceberg table, each configured accordingly - Iceberg table has
>>>> the `storage_handler` defined and running with MR engine.
>>>>
>>>> I'm using the `iceberg.mr.catalog.loader.class` class to load our
>>>> internal catalog. In the logs I can see Hive loading the Iceberg table, but
>>>> then I can see the Driver doing some traversal through the FS path under
>>>> the table location, getting statuses for all data within the directory -
>>>> this is not the behavior I see when querying an Iceberg table in Hive by
>>>> itself, where I can see the splits being computed correctly.
>>>> Due to this behavior, the query basically scans the full FS structure
>>>> under the path - which if large it looks like it's stuck, however I do see
>>>> the wire activity fetching the FS listings.
>>>>
>>>> Question is, has anyone experienced this behavior on querying Hive
>>>> tables with joins on Iceberg tables? If so, what's the best way to approach
>>>> this?
>>>>
>>>> Best,
>>>> --
>>>> Edgar R
>>>>
>>>>
>>>>
>>>
>>> --
>>> Edgar R
>>>
>>
>>
>> --
>> Ryan Blue
>> Software Engineer
>> Netflix
>>
>

Re: Hive query with join of Iceberg table and Hive table

Posted by Vivekanand Vellanki <vi...@dremio.com>.
One of our concerns with caching the splits is the amount of memory
required for this. If the filtering is not very selective and the table
happens to be large, this increases the memory requirement to hold all the
splits in memory.

On Wed, Mar 3, 2021 at 6:56 AM Ryan Blue <rb...@netflix.com.invalid> wrote:

> Yes, it sounds like we do want to at least support fake stats for Hive. It
> would be great to also base the stats on the actual table scan, since we
> can get fairly accurate stats after filters have been pushed down. In
> Spark, that's allowed us to convert more joins to broadcast joins that are
> cheaper. The only concern is when and where the stats estimation is done
> and whether we'd be able to cache the result of planning the scan so we
> don't plan a scan for stats estimation and then do it a second time to
> produce splits.
>
> On Tue, Mar 2, 2021 at 4:12 PM Edgar Rodriguez
> <ed...@airbnb.com.invalid> wrote:
>
>> After a bit of further digging, I found that the issue is related to Hive
>> trying to find the input size (the Iceberg table) for the join at query
>> planning time. Since HiveIcebergStorageHandler does not implement
>> InputEstimator
>> <https://github.com/apache/hive/blob/branch-2.3/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java#L2195>,
>> Hive tries to estimate the input size the same way as it would do for a
>> native Hive table, by scanning the FS listing the paths recursively
>> <https://github.com/apache/hadoop/blob/branch-2.8.5/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java#L1489>
>> and adding the file lengths - in the case of Iceberg tables it would start
>> scanning from the table location since it's EXTERNAL unpartitioned - as
>> mentioned in the Hive Wiki
>> <https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=82903061#ConfigurationProperties-hive.fetch.task.conversion.threshold>
>> :
>>
>> If target table is native, input length is calculated by summation of
>>> file lengths. If it's not native, the storage handler for the table can
>>> optionally implement the org.apache.hadoop.hive.ql.metadata.InputEstimator
>>> interface.
>>
>>
>> After adding the interface to the storage_handler and providing an
>> implementation returning an Estimation(-1, -1)
>> <https://hive.apache.org/javadocs/r2.1.1/api/org/apache/hadoop/hive/ql/metadata/InputEstimator.Estimation.html#Estimation-int-long->
>> the query works successfully in the expected amount of time - maybe a
>> better implementation could compute an actual estimation. I assume this
>> is only an issue you hit when the underlying FS tree of the Iceberg table
>> is large and traversing the FS takes a long time, otherwise most likely
>> Hive would do the FS traversal and the query would make progress.
>>
>> Should we make this change in the HiveIcebergStorageHandler?
>>
>> Cheers,
>>
>> On Tue, Mar 2, 2021 at 1:11 PM Peter Vary <pv...@cloudera.com.invalid>
>> wrote:
>>
>>> I have seen this kind of problem when the catalog was not configured for
>>> the table/session and we ended up using the default catalog instead of
>>> HiveCatalog
>>>
>>> On Mar 2, 2021, at 18:49, Edgar Rodriguez <
>>> edgar.rodriguez@airbnb.com.INVALID> wrote:
>>>
>>> Hi,
>>>
>>> I'm trying to run a simple query in Hive 2.3.4 with a join of a Hive
>>> table and an Iceberg table, each configured accordingly - Iceberg table has
>>> the `storage_handler` defined and running with MR engine.
>>>
>>> I'm using the `iceberg.mr.catalog.loader.class` class to load our
>>> internal catalog. In the logs I can see Hive loading the Iceberg table, but
>>> then I can see the Driver doing some traversal through the FS path under
>>> the table location, getting statuses for all data within the directory -
>>> this is not the behavior I see when querying an Iceberg table in Hive by
>>> itself, where I can see the splits being computed correctly.
>>> Due to this behavior, the query basically scans the full FS structure
>>> under the path - which if large it looks like it's stuck, however I do see
>>> the wire activity fetching the FS listings.
>>>
>>> Question is, has anyone experienced this behavior on querying Hive
>>> tables with joins on Iceberg tables? If so, what's the best way to approach
>>> this?
>>>
>>> Best,
>>> --
>>> Edgar R
>>>
>>>
>>>
>>
>> --
>> Edgar R
>>
>
>
> --
> Ryan Blue
> Software Engineer
> Netflix
>

Re: Hive query with join of Iceberg table and Hive table

Posted by Ryan Blue <rb...@netflix.com.INVALID>.
Yes, it sounds like we do want to at least support fake stats for Hive. It
would be great to also base the stats on the actual table scan, since we
can get fairly accurate stats after filters have been pushed down. In
Spark, that's allowed us to convert more joins to broadcast joins that are
cheaper. The only concern is when and where the stats estimation is done
and whether we'd be able to cache the result of planning the scan so we
don't plan a scan for stats estimation and then do it a second time to
produce splits.
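
Something like the sketch below is what I picture for planning once and
reusing the result - the class and method names are invented, only the
Iceberg scan API calls are real. The planned tasks are kept so that stats
estimation and split generation share a single manifest read, which is also
exactly where the memory concern raised earlier comes from:

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.io.CloseableIterable;

// Hypothetical cache: plan the scan once, then reuse the tasks for both row
// count estimation and split generation instead of reading manifests twice.
public class PlannedScanCache {
  private List<FileScanTask> tasks;

  public synchronized List<FileScanTask> tasks(Table table, Expression filter) {
    if (tasks == null) {
      tasks = new ArrayList<>();
      try (CloseableIterable<FileScanTask> planned =
          table.newScan().filter(filter).planFiles()) {
        planned.forEach(tasks::add);
      } catch (IOException e) {
        throw new UncheckedIOException(e);
      }
    }
    return tasks;
  }

  // Stats side: estimate rows from the cached tasks; split generation would
  // iterate the same list.
  public long estimatedRowCount(Table table, Expression filter) {
    return tasks(table, filter).stream().mapToLong(t -> t.file().recordCount()).sum();
  }
}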

On Tue, Mar 2, 2021 at 4:12 PM Edgar Rodriguez
<ed...@airbnb.com.invalid> wrote:

> After a bit of further digging, I found that the issue is related to Hive
> trying to find the input size (the Iceberg table) for the join at query
> planning time. Since HiveIcebergStorageHandler does not implement
> InputEstimator
> <https://github.com/apache/hive/blob/branch-2.3/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java#L2195>,
> Hive tries to estimate the input size the same way as it would do for a
> native Hive table, by scanning the FS listing the paths recursively
> <https://github.com/apache/hadoop/blob/branch-2.8.5/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java#L1489>
> and adding the file lengths - in the case of Iceberg tables it would start
> scanning from the table location since it's EXTERNAL unpartitioned - as
> mentioned in the Hive Wiki
> <https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=82903061#ConfigurationProperties-hive.fetch.task.conversion.threshold>
> :
>
> If target table is native, input length is calculated by summation of file
>> lengths. If it's not native, the storage handler for the table can
>> optionally implement the org.apache.hadoop.hive.ql.metadata.InputEstimator
>> interface.
>
>
> After adding the interface to the storage_handler and providing an
> implementation returning an Estimation(-1, -1)
> <https://hive.apache.org/javadocs/r2.1.1/api/org/apache/hadoop/hive/ql/metadata/InputEstimator.Estimation.html#Estimation-int-long->
> the query works successfully in the expected amount of time - maybe a
> better implementation could compute an actual estimation. I assume this
> is only an issue you hit when the underlying FS tree of the Iceberg table
> is large and traversing the FS takes a long time, otherwise most likely
> Hive would do the FS traversal and the query would make progress.
>
> Should we make this change in the HiveIcebergStorageHandler?
>
> Cheers,
>
> On Tue, Mar 2, 2021 at 1:11 PM Peter Vary <pv...@cloudera.com.invalid>
> wrote:
>
>> I have seen this kind of problem when the catalog was not configured for
>> the table/session and we ended up using the default catalog instead of
>> HiveCatalog
>>
>> On Mar 2, 2021, at 18:49, Edgar Rodriguez <
>> edgar.rodriguez@airbnb.com.INVALID> wrote:
>>
>> Hi,
>>
>> I'm trying to run a simple query in Hive 2.3.4 with a join of a Hive
>> table and an Iceberg table, each configured accordingly - Iceberg table has
>> the `storage_handler` defined and running with MR engine.
>>
>> I'm using the `iceberg.mr.catalog.loader.class` class to load our
>> internal catalog. In the logs I can see Hive loading the Iceberg table, but
>> then I can see the Driver doing some traversal through the FS path under
>> the table location, getting statuses for all data within the directory -
>> this is not the behavior I see when querying an Iceberg table in Hive by
>> itself, where I can see the splits being computed correctly.
>> Due to this behavior, the query basically scans the full FS structure
>> under the path - which if large it looks like it's stuck, however I do see
>> the wire activity fetching the FS listings.
>>
>> Question is, has anyone experienced this behavior on querying Hive tables
>> with joins on Iceberg tables? If so, what's the best way to approach this?
>>
>> Best,
>> --
>> Edgar R
>>
>>
>>
>
> --
> Edgar R
>


-- 
Ryan Blue
Software Engineer
Netflix

Re: Hive query with join of Iceberg table and Hive table

Posted by Edgar Rodriguez <ed...@airbnb.com.INVALID>.
After a bit of further digging, I found that the issue is related to Hive
trying to find the input size of the join input (the Iceberg table) at query
planning time. Since HiveIcebergStorageHandler does not implement
InputEstimator
<https://github.com/apache/hive/blob/branch-2.3/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java#L2195>,
Hive tries to estimate the input size the same way it would for a native
Hive table, by scanning the FS and listing the paths recursively
<https://github.com/apache/hadoop/blob/branch-2.8.5/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java#L1489>
and adding up the file lengths - in the case of Iceberg tables it would
start scanning from the table location, since it's EXTERNAL and
unpartitioned - as mentioned in the Hive Wiki
<https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=82903061#ConfigurationProperties-hive.fetch.task.conversion.threshold>:

If target table is native, input length is calculated by summation of file
> lengths. If it's not native, the storage handler for the table can
> optionally implement the org.apache.hadoop.hive.ql.metadata.InputEstimator
> interface.


After adding the interface to the storage_handler and providing an
implementation returning an Estimation(-1, -1)
<https://hive.apache.org/javadocs/r2.1.1/api/org/apache/hadoop/hive/ql/metadata/InputEstimator.Estimation.html#Estimation-int-long->
the query works successfully in the expected amount of time - maybe a
better implementation could compute an actual estimation. I assume this
is only an issue you hit when the underlying FS tree of the Iceberg table
is large and traversing the FS takes a long time, otherwise most likely
Hive would do the FS traversal and the query would make progress.
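
For concreteness, the workaround looks roughly like the sketch below - not
the exact patch; the subclass name is invented, and in practice I added the
interface to the storage handler itself, assuming it can be extended like
this:

import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.metadata.InputEstimator;
import org.apache.hadoop.mapred.JobConf;
import org.apache.iceberg.mr.hive.HiveIcebergStorageHandler;

// Sketch of the workaround: implement InputEstimator so Hive skips the
// recursive FS listing when it sizes the join input.
public class EstimatingIcebergStorageHandler extends HiveIcebergStorageHandler
    implements InputEstimator {

  @Override
  public InputEstimator.Estimation estimate(JobConf job, TableScanOperator ts,
      long remaining) {
    // -1/-1 means "unknown"; a better implementation could derive the row
    // count and length from Iceberg snapshot metadata instead.
    return new InputEstimator.Estimation(-1, -1);
  }
}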

Should we make this change in the HiveIcebergStorageHandler?

Cheers,

On Tue, Mar 2, 2021 at 1:11 PM Peter Vary <pv...@cloudera.com.invalid>
wrote:

> I have seen this kind of problem when the catalog was not configured for
> the table/session and we ended up using the default catalog instead of
> HiveCatalog
>
> On Mar 2, 2021, at 18:49, Edgar Rodriguez <
> edgar.rodriguez@airbnb.com.INVALID> wrote:
>
> Hi,
>
> I'm trying to run a simple query in Hive 2.3.4 with a join of a Hive table
> and an Iceberg table, each configured accordingly - Iceberg table has the
> `storage_handler` defined and running with MR engine.
>
> I'm using the `iceberg.mr.catalog.loader.class` class to load our internal
> catalog. In the logs I can see Hive loading the Iceberg table, but then I
> can see the Driver doing some traversal through the FS path under the table
> location, getting statuses for all data within the directory - this is not
> the behavior I see when querying an Iceberg table in Hive by itself, where
> I can see the splits being computed correctly.
> Due to this behavior, the query basically scans the full FS structure
> under the path - which if large it looks like it's stuck, however I do see
> the wire activity fetching the FS listings.
>
> Question is, has anyone experienced this behavior on querying Hive tables
> with joins on Iceberg tables? If so, what's the best way to approach this?
>
> Best,
> --
> Edgar R
>
>
>

-- 
Edgar R

Re: Hive query with join of Iceberg table and Hive table

Posted by Peter Vary <pv...@cloudera.com.INVALID>.
I have seen this kind of problem when the catalog was not configured for the table/session and we ended up using the default catalog instead of HiveCatalog.
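
For example, something like this is what I mean by making sure the catalog is
configured for the session - a sketch, where `iceberg.mr.catalog` is the
iceberg-mr property selecting the catalog implementation and the loader class
value is a placeholder for your own class:

import org.apache.hadoop.conf.Configuration;

// Hypothetical check: make sure the Hive/MR job configuration actually selects
// the intended catalog, otherwise iceberg-mr falls back to its default
// location-based behaviour instead of HiveCatalog.
public class CatalogConfigCheck {
  public static void configure(Configuration conf) {
    // Either pick a built-in catalog implementation...
    conf.set("iceberg.mr.catalog", "hive");
    // ...or, as in this thread, plug in a custom loader (placeholder class name):
    // conf.set("iceberg.mr.catalog.loader.class", "com.example.InternalCatalogLoader");
  }
}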

> On Mar 2, 2021, at 18:49, Edgar Rodriguez <ed...@airbnb.com.INVALID> wrote:
> 
> Hi,
> 
> I'm trying to run a simple query in Hive 2.3.4 with a join of a Hive table and an Iceberg table, each configured accordingly - Iceberg table has the `storage_handler` defined and running with MR engine.
> 
> I'm using the `iceberg.mr.catalog.loader.class` class to load our internal catalog. In the logs I can see Hive loading the Iceberg table, but then I can see the Driver doing some traversal through the FS path under the table location, getting statuses for all data within the directory - this is not the behavior I see when querying an Iceberg table in Hive by itself, where I can see the splits being computed correctly. 
> Due to this behavior, the query basically scans the full FS structure under the path - if that structure is large, the query looks stuck, although I do see the wire activity fetching the FS listings.
> 
> Question is, has anyone experienced this behavior on querying Hive tables with joins on Iceberg tables? If so, what's the best way to approach this?
> 
> Best,
> -- 
> Edgar R