Posted to user@hive.apache.org by Ross Guth <ro...@gmail.com> on 2016/06/24 22:06:45 UTC

Hash table in map join - Hive

1. Is there a way to check the size of the hash table created during a
map-side join in Hive/Tez?
2. Is the hash table (the small table's) created for the entire table, or
only for the selected columns and the join key columns?
3. The hash table created in a map-side join spills to disk if it does not
fit in memory. Is there a parameter in Hive/Tez to specify the percentage of
the hash table that can spill?

Thanks!

Re: Hash table in map join - Hive

Posted by Lalitha MV <la...@gmail.com>.
Also, a couple of follow up questions:

1. The grace hash join has to reload/rebuild the hash table for a new split
only if it has spilled the hash table for lack of memory, right?
    How does the regular hash join handle the case where the hash table
cannot fit into memory? Does it create a single-file hash table, or multiple
partitions as in the grace hash join?

2. I tried to find the size of the hash table being built from the logs of a
container (assuming that the entire hash table information should be there
in every container).
    When I grepped for "Map metrics", I got:  Map metrics: keys
allocated 2097152, keys assigned 934963, write conflict 3330682, write max
dist 22, read conflict 1859185, expanded 0 times in 0ms
    From this, how do I estimate the net size of the hash table?
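
As a starting point, the counters in that line can at least be turned into a slot occupancy figure. The sketch below is only an illustration: the class and method names are made up (they are not part of Hive), it assumes the log line has exactly the format pasted above on a single line, and it does not produce a byte size, since the line itself carries no per-entry size.

```java
// Hypothetical helper, not part of Hive: parses the "Map metrics" line
// quoted above and reports how full the key array is.
final class MapMetricsSketch {
    /** Returns the integer that follows the given label in the log line. */
    static long valueAfter(String line, String label) {
        int start = line.indexOf(label);
        if (start < 0) {
            throw new IllegalArgumentException("label not found: " + label);
        }
        start += label.length();
        // Skip the spaces between the label and the number.
        while (start < line.length() && line.charAt(start) == ' ') {
            start++;
        }
        int end = start;
        while (end < line.length() && Character.isDigit(line.charAt(end))) {
            end++;
        }
        return Long.parseLong(line.substring(start, end));
    }

    /** Fraction of allocated key slots that were actually assigned. */
    static double occupancy(String line) {
        long allocated = valueAfter(line, "keys allocated");
        long assigned = valueAfter(line, "keys assigned");
        return (double) assigned / allocated;
    }
}
```

For the line above this gives an occupancy of roughly 0.45 (934963 / 2097152).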

Thanks in advance,
Lalitha

On Thu, Jun 30, 2016 at 5:57 PM, Lalitha MV <la...@gmail.com> wrote:

> Hi,
>
> I was following this thread. I tried adding the patch of the jira manually
> (https://issues.apache.org/jira/browse/TEZ-3287 ) [referenced in the
> above reply for auto reducer optimization in shuffle hash join case]. I
> added it to 0.8.3 while the patch was for the master.
> But, I got a comment from the author that, the patch wouldn't affect --
> hive.tez.auto.reducer.parallelism=true.
>  Am I missing something?
>
>
> Thanks,
> Lalitha

Re: Hash table in map join - Hive

Posted by Gopal Vijayaraghavan <go...@apache.org>.
> When is OOM error actually thrown? With
>hive.mapjoin.hybridgrace.hashtable set to true, spilling should be
>possible, so OOM error should not come.
...
> Is it the case when the hash table of not even one of the 16 partitions
>fits in memory? 

It will OOM if any one of them overflows.

The grace hash-join works really well when the in-memory side is keyed on a
primary key and doesn't have this sort of skew.


The real problem with the shuffle hash-join is that it amplifies skew,
since it distributes rows on the hashcode of the join keys.
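
A toy illustration of why more partitions do not help with a hot key: every row with the same join key hashes to the same partition, whatever the partition count. This is a sketch under assumptions: the class name is hypothetical, and it uses Integer.hashCode with a modulo rather than whatever hash function Hive actually applies.

```java
// Hypothetical sketch, not Hive code: counts rows per hash partition.
final class HashSkewSketch {
    /** Counts how many rows land in each of numPartitions hash partitions. */
    static long[] rowsPerPartition(int[] joinKeys, int numPartitions) {
        long[] counts = new long[numPartitions];
        for (int key : joinKeys) {
            // All rows sharing a key always map to the same partition.
            counts[Math.floorMod(Integer.hashCode(key), numPartitions)]++;
        }
        return counts;
    }

    /** Size of the fullest partition, i.e. the one that has to fit in memory. */
    static long largest(long[] counts) {
        long max = 0;
        for (long c : counts) {
            max = Math.max(max, c);
        }
        return max;
    }
}
```

With 1000 rows for key 7 plus one row each for keys 0 to 15, the fullest partition holds 1001 rows with 16 partitions and still 1001 rows with 100 partitions.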

> But increasing the partitions to 100 also did not solve the problem
>(This is in the case of 3G container size and 5G small table size.
> I have given a high value for
>hive.auto.convert.join.noconditionaltask.size so that the broadcast hash
>join path is picked.

Unfortunately, it will only start spilling after it overflows
"hive.auto.convert.join.noconditionaltask.size" or 55% of the heap.

set hive.mapjoin.followby.gby.localtask.max.memory.usage=0.1;


    // Get the total available memory from memory manager
    long totalMapJoinMemory = desc.getMemoryNeeded();
    LOG.info("Memory manager allocates " + totalMapJoinMemory
        + " bytes for the loading hashtable.");
    if (totalMapJoinMemory <= 0) {
      totalMapJoinMemory = HiveConf.getLongVar(
          hconf, HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD);
    }

    long processMaxMemory =
        ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax();
    if (totalMapJoinMemory > processMaxMemory) {
      float hashtableMemoryUsage = HiveConf.getFloatVar(
          hconf, HiveConf.ConfVars.HIVEHASHTABLEFOLLOWBYGBYMAXMEMORYUSAGE);
      LOG.warn("totalMapJoinMemory value of " + totalMapJoinMemory +
          " is greater than the max memory size of " + processMaxMemory);
      // Don't want to attempt to grab more memory than we have available
      // .. percentage is a bit arbitrary
      totalMapJoinMemory = (long) (processMaxMemory * hashtableMemoryUsage);
    }
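
To try out that decision with concrete numbers, the same logic can be pulled into a standalone method. This is a sketch only: the class, method and parameter names are hypothetical stand-ins for the values the excerpt reads from the descriptor, HiveConf and the JVM, and nothing here calls Hive.

```java
// Hypothetical standalone version of the memory decision in the excerpt above.
final class MapJoinMemorySketch {
    /**
     * memoryNeeded          - stand-in for desc.getMemoryNeeded()
     * noConditionalTaskSize - stand-in for hive.auto.convert.join.noconditionaltask.size
     * processMaxMemory      - stand-in for the JVM max heap, in bytes
     * maxMemoryUsage        - stand-in for the heap fraction (0.55 in the reply above)
     */
    static long effectiveMapJoinMemory(long memoryNeeded, long noConditionalTaskSize,
                                       long processMaxMemory, float maxMemoryUsage) {
        long total = memoryNeeded;
        if (total <= 0) {
            // No explicit allocation, so fall back to the configured threshold.
            total = noConditionalTaskSize;
        }
        if (total > processMaxMemory) {
            // Threshold exceeds the heap, so cap at a fraction of the heap.
            // As in the excerpt, this is long * float arithmetic.
            total = (long) (processMaxMemory * maxMemoryUsage);
        }
        return total;
    }
}
```

For example, with a 3 GB heap and a 6 GB threshold, a fraction of 0.5 yields a 1.5 GB budget, whereas an 8 MB threshold on a 1 GB heap is used unchanged.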


If you can devise an example using TPC-DS schema on hive-testbench, I can
run it locally and tell you what's happening.


Cheers,
Gopal



Re: Hash table in map join - Hive

Posted by Lalitha MV <la...@gmail.com>.
Hi Gopal,

Thanks a lot for the above update.

I had only one question hanging:

When is the OOM error actually thrown? With hive.mapjoin.hybridgrace.hashtable
set to true, spilling should be possible, so the OOM error should not occur.
Is it thrown when the hash table of not even one of the 16 partitions
fits in memory? But increasing the partitions to 100 also did not solve the
problem. (This is with a 3G container size and a 5G small table.
I have given a high value for hive.auto.convert.join.noconditionaltask.size
so that the broadcast hash join path is picked. I know this is not
advisable, but I am still trying to force it. With 100 partitions,
only a 500 MB hash table has to fit in memory at a time, but it still fails
with a memory error.)

Thanks,
Lalitha

On Thu, Jul 14, 2016 at 11:16 PM, Gopal Vijayaraghavan <go...@apache.org>
wrote:

> Hi,
>
> I got a chance to re-run this query today and it does auto-reduce the new
> CUSTOM_EDGE join as well.
>
> However it does it too early, before it has got enough information about
> both sides of the join.
>
> TPC-DS Query79 at 1Tb scale generates a CUSTOM_EDGE between the ms alias
> and
> customer tables.
>
> They're 13.7 million & 12 million rows each - the impl is not causing any
> trouble right now because they're roughly equal.
>
> This might be a feature I might disable for now, since the
> ShuffleVertexManager
> has no idea which side is destined for the hashtable build side & might
> squeeze
> the auto-reducer tighter than it should causing a possible OOM.
>
> Plus, the size that the system gets is actually the shuffle size, so it is
> post-compression
> and varies with the data compressibility (for instance, clickstreams
> compress really well
> due to the large number of common strings).
>
> A pre-req for the last problem would be to go fix
> <https://issues.apache.org/jira/browse/TEZ-2962>,
> which tracks pre-compression sizes.
>
> I'll have to think a bit more about the other part of the problem.
>
> Cheers,
> Gopal

Re: Hash table in map join - Hive

Posted by Gopal Vijayaraghavan <go...@apache.org>.
Hi,

I got a chance to re-run this query today and it does auto-reduce the new
CUSTOM_EDGE join as well.

However, it does it too early, before it has got enough information about
both sides of the join.

TPC-DS Query79 at 1 TB scale generates a CUSTOM_EDGE between the ms alias
and customer tables.

They're 13.7 million & 12 million rows each - the impl is not causing any
trouble right now because they're roughly equal.

This might be a feature I might disable for now, since the
ShuffleVertexManager has no idea which side is destined for the hashtable
build side & might squeeze the auto-reducer tighter than it should, causing
a possible OOM.

Plus, the size that the system gets is actually the shuffle size, so it is
post-compression and varies with the data compressibility (for instance,
clickstreams compress really well due to the large number of common
strings).

A pre-req for the last problem would be to go fix
<https://issues.apache.org/jira/browse/TEZ-2962>, which tracks
pre-compression sizes.
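
To make the compression point concrete, here is a small worked example. It assumes, purely for illustration, that the reducer count is chosen as ceil(observed bytes / target bytes per reducer); the real ShuffleVertexManager logic is more involved, and the class and method names below are hypothetical.

```java
// Hypothetical arithmetic sketch, not Tez code.
final class CompressedSizeSketch {
    /** Reducer count under the assumed rule: ceil(observed / target), at least 1. */
    static long reducersFor(long observedBytes, long targetBytesPerReducer) {
        return Math.max(1, (observedBytes + targetBytesPerReducer - 1) / targetBytesPerReducer);
    }

    /** Uncompressed data each reducer really has to handle. */
    static long uncompressedBytesPerReducer(long uncompressedBytes, long reducers) {
        return uncompressedBytes / reducers;
    }
}
```

With a 100 MB target and 10 GB of data that compresses 5x, sizing from the 2 GB shuffle size gives 20 reducers that each handle 500 MB of uncompressed data, while sizing from the pre-compression size gives 100 reducers at 100 MB each.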

I'll have to think a bit more about the other part of the problem.

Cheers,
Gopal

On 7/6/16, 12:52 PM, "Gopal Vijayaraghavan" <gopal@hortonworks.com on
behalf of gopalv@apache.org> wrote:

>
>> I tried running the shuffle hash join with auto reducer parallelism
>>again. But, it didn't seem to take effect. With merge join and auto
>>reduce parallelism on, number of
>> reducers drops from 1009 to 337, but didn't see that change in case of
>>shuffle hash join .Should I be doing something more ?
>
>In your runtime, can you cross-check the YARN logs for
>
>"Defer scheduling tasks"
>
>and 
>
>"Reduce auto parallelism for vertex"
>
>
>The kick-off point for both join algorithms are slightly different.
>
>The merge-join uses stats from both sides (since until all tasks on both
>sides are complete, no join ops can be performed).
>
>While the dynamic hash-join uses stats from only the hashtable side to do
>this, since it can start producing rows immediately after the hashtable
>side is complete & at least 1 task from the big-table side has completed.
>
>There's a scenario where the small table is too small even when it is
>entirely complete at which point the system just goes ahead with reduce
>tasks without any reduction (which would be faster than waiting for the
>big table to hit slow-start before doing that).
>
>Cheers,
>Gopal



Re: Hash table in map join - Hive

Posted by Gopal Vijayaraghavan <go...@apache.org>.
> I tried running the shuffle hash join with auto reducer parallelism
>again. But, it didn't seem to take effect. With merge join and auto
>reduce parallelism on, number of
> reducers drops from 1009 to 337, but didn't see that change in case of
>shuffle hash join .Should I be doing something more ?

In your runtime, can you cross-check the YARN logs for

"Defer scheduling tasks"

and 

"Reduce auto parallelism for vertex"


The kick-off points for the two join algorithms are slightly different.

The merge-join uses stats from both sides (since until all tasks on both
sides are complete, no join ops can be performed).

The dynamic hash-join, on the other hand, uses stats from only the hashtable
side to do this, since it can start producing rows immediately after the
hashtable side is complete & at least 1 task from the big-table side has
completed.

There's a scenario where the small table is too small even when it is
entirely complete, at which point the system just goes ahead with reduce
tasks without any reduction (which would be faster than waiting for the
big table to hit slow-start before doing that).

Cheers,
Gopal



Re: Hash table in map join - Hive

Posted by Lalitha MV <la...@gmail.com>.
Hi Gopal,

Since this jira is resolved, I cloned the master branch, compiled and used
the binaries (0.9 snapshot version of tez).

I tried running the shuffle hash join with auto reducer parallelism again,
but it didn't seem to take effect. With merge join and auto reducer
parallelism on, the number of reducers drops from 1009 to 337, but I didn't
see that change with shuffle hash join. Should I be doing something more?

Thanks,
Lalitha
On 30-Jun-2016 7:04 pm, "Gopal Vijayaraghavan" <go...@apache.org> wrote:

>
>
> > But, I got a comment from the author that, the patch wouldn't affect --
> >hive.tez.auto.reducer.parallelism=true.
> > Am I missing something?
>
>
> No, I've linked to the wrong JIRA :(
>
>  <https://issues.apache.org/jira/browse/TEZ-3206>
>
> Cheers,
> Gopal
>
>
>
>

Re: Hash table in map join - Hive

Posted by Gopal Vijayaraghavan <go...@apache.org>.

> But, I got a comment from the author that, the patch wouldn't affect --
>hive.tez.auto.reducer.parallelism=true.
> Am I missing something?


No, I've linked to the wrong JIRA :(

 <https://issues.apache.org/jira/browse/TEZ-3206>

Cheers,
Gopal




Re: Hash table in map join - Hive

Posted by Lalitha MV <la...@gmail.com>.
Hi,

I was following this thread. I tried applying the patch from the JIRA
manually (https://issues.apache.org/jira/browse/TEZ-3287) [referenced in the
above reply for the auto reducer optimization in the shuffle hash join
case]. I applied it to 0.8.3, while the patch was for master.
But I got a comment from the author that the patch wouldn't affect
hive.tez.auto.reducer.parallelism=true.
Am I missing something?


Thanks,
Lalitha

On Thu, Jun 30, 2016 at 9:11 AM, Gopal Vijayaraghavan <go...@apache.org>
wrote:

>
> > 1. In the query plan, it still says Map Join Operator (Would have
> >expected it to be named as Reduce side operator).
>
> The "Map" in that case really refers to Map<K,V> rather than the Hadoop
> version. A less ambiguous name would be HashJoinOperator.
>
> This is one of the optimizations of Tez right now that a map-join can be
> inserted in any vertex, because "Map 1" is just really in the name (it is
> a vertex).
>
> Also, even if the input format was Text/Sequencefile, the reduce
> vectorization can vectorize the simple join cases because it is not tied
> to the inputformat anymore.
>
> > 2. The edges in this query plans were named as custom_simple_edge: Is
> >this the one pointing to the fact that sorting of mapper inputs are
> >bypassed?
>
> Not directly related, but the custom edges do their own edgemanager - the
> edgemanager that is there can possibly be replaced with a simple edge +
> unsorted input-output pairs since tez-0.5.x.
>
> But the edge has an extension which can do some non-simple things too,
> which is why Tez supports edge overrides like this.
>
>  <http://www.slideshare.net/Hadoop_Summit/w-235phall1pandey/13>
>
>
> > 3. "hive.tez.auto.reducer.parallelism=true" is not taking effect for
> >shuffle hash join.
>
> That issue was already reported by Twitter, the unsorted edges do not send
> out the output size bitsets.
>
>  <https://issues.apache.org/jira/browse/TEZ-3287>
>
>
> > 1. What does tez.auto.reducer.parallelism do -- Does it only reduce the
> >number of reducers based on the actual size of mapper output, or does it
> >do more.
>
> It does a bit more when PipelineSorter is enabled.
>
> The sorted edges actually partition-first and sort-then. So the sort-key
> is actually (reducer-n, key) & the first few bytes of that information is
> stored into metadata region of the sorter for better L1 cache hit-rate
> when sorting.
>
> So the more reducers there are, the faster it sorts. However, it
> compresses each reducer output independently, so slicing too thin produces
> bad network overheads.
>
> Auto-reducer parallelism exists so that you don't need to tune each query
> by hand to fit those trade-offs.
>
> > 2. I did not understand the intuition behind setting
> >hive.mapjoin.hybridgrace.hashtable=false (as mentioned in your previous
> >reply).
>
> Yes, it is the same impl from the wiki. But the grace hashjoin drops the
> hashtable if it spills between executions of the same vertex.
>
> The regular hashJoin does not reload the hashtable when the split changes,
> this means the grace hashjoin can take 4-5x more time than the optimized
> one.
>
> The time it takes to load the hashtable goes up, while the lookups aren't
> much different because the grace hash-join has a bloom filter on top of it.
>
> If you have 35,000 splits and 800 containers, the hash-build times adds up
> pretty quickly.
>
> > 3. In general, map join  in cluster mode, are these the actual steps
> >followed in hive/tez:
> > a. Hash table generation:  Partitioned hash tables of the small table is
> >created across multiple containers. In each container, a part of the
> >small table is dealt with. And in each container, the hash table is built
>
> No, the broadcast tasks merely produce an unordered output - it is
> not a hashtable.
>
> This is done in parallel as you describe across multiple containers & on
> the cluster (tries for locality next to the small tables).
>
> > b. Broadcast of hash table: All the partitions of all the parts of small
> >table, including the ones spilled in the disk are serialized and sent to
> >all the second map containers.
>
> The broadcast is done via shuffle, same as sorted data movement, but one
> which reads the unordered streams and builds a hashtable inside every
> JoinOperator.
>
> The hashtable is then put into a cache in the task which has scope of the
> Vertex - if the same vertex re-runs on the same container, it will reload
> from the cache instead of the shuffle stream.
>
> The grace hashtable throws away in-mem data when it reloads a spilled
> fraction of the hashtable, so the moment it has spilled it is no longer
> considered for reuse.
>
> > Where does the rebuilding of spilt hash table happen? Is it during
> >second map phase where join is happening with bigger table?
>
> The split-hashtable looks exactly like the regular hashtable, but it has 3
> return values for the data - Yes, No, Ask-Later.
>
> So other than the handling of the Ask-Later scenario, the split-hashtable
> looks exactly like the full in-mem one.
>
> > c. Join operator:  The big table is scanned in each second mapper,
> >against the entire hash table of small table, and result is got.
>
> Yes.
>
> The Hadoop Summit slides from 2014 linked above are a little out of
> date, but they cover some of the basics of how this all fits
> together.
>
> Cheers,
> Gopal
>
>
>

Re: Hash table in map join - Hive

Posted by Gopal Vijayaraghavan <go...@apache.org>.
> 1. In the query plan, it still says Map Join Operator (Would have
>expected it to be named as Reduce side operator).

The "Map" in that case really refers to Map<K,V> rather than the Hadoop
version. A less ambiguous name would be HashJoinOperator.

This is one of the optimizations of Tez right now: a map-join can be
inserted in any vertex, because "Map 1" is really just a name (it is
a vertex).

Also, even if the input format was Text/SequenceFile, the reduce
vectorization can vectorize the simple join cases because it is not tied
to the inputformat anymore.

> 2. The edges in this query plans were named as custom_simple_edge: Is
>this the one pointing to the fact that sorting of mapper inputs are
>bypassed? 

Not directly related, but the custom edges do their own edgemanager - the
edgemanager that is there can possibly be replaced with a simple edge +
unsorted input-output pairs since tez-0.5.x.

But the edge has an extension which can do some non-simple things too,
which is why Tez supports edge overrides like this.

 <http://www.slideshare.net/Hadoop_Summit/w-235phall1pandey/13>


> 3. "hive.tez.auto.reducer.parallelism=true" is not taking effect for
>shuffle hash join.

That issue was already reported by Twitter, the unsorted edges do not send
out the output size bitsets.

 <https://issues.apache.org/jira/browse/TEZ-3287>


> 1. What does tez.auto.reducer.parallelism do -- Does it only reduce the
>number of reducers based on the actual size of mapper output, or does it
>do more.

It does a bit more when PipelinedSorter is enabled.

The sorted edges actually partition first and sort second. So the sort-key
is actually (reducer-n, key) & the first few bytes of that information are
stored in the metadata region of the sorter for a better L1 cache hit-rate
when sorting.

So the more reducers there are, the faster it sorts. However, it
compresses each reducer output independently, so slicing too thin produces
bad network overheads.

Auto-reducer parallelism exists so that you don't need to tune each query
by hand to fit those trade-offs.
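
The partition-first ordering can be pictured with a toy sort that packs (reducer-n, key) into one comparable value. This is only a sketch of the idea, not the Tez sorter: the class name is hypothetical, keys are plain non-negative ints, and the partition is taken as key modulo the reducer count.

```java
// Hypothetical sketch of sorting on a composite (partition, key) sort-key.
final class PartitionFirstSortSketch {
    /** Packs the partition into the high 32 bits and the key into the low 32 bits, then sorts. */
    static long[] sortPartitionFirst(int[] keys, int numReducers) {
        long[] packed = new long[keys.length];
        for (int i = 0; i < keys.length; i++) {
            int partition = Math.floorMod(keys[i], numReducers);
            packed[i] = ((long) partition << 32) | (keys[i] & 0xFFFFFFFFL);
        }
        // One primitive sort orders by partition first, then by key within it.
        java.util.Arrays.sort(packed);
        return packed;
    }

    static int partitionOf(long packed) {
        return (int) (packed >>> 32);
    }

    static int keyOf(long packed) {
        return (int) packed;
    }
}
```

Sorting the keys {5, 2, 8, 3, 6, 1} for 3 reducers yields 3, 6 (partition 0), then 1 (partition 1), then 2, 5, 8 (partition 2), so each reducer's slice comes out contiguous and already ordered.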

> 2. I did not understand the intuition behind setting
>hive.mapjoin.hybridgrace.hashtable=false (as mentioned in your previous
>reply).

Yes, it is the same impl from the wiki. But the grace hashjoin drops the
hashtable if it spills between executions of the same vertex.

The regular hashJoin does not reload the hashtable when the split changes.
This means the grace hashjoin can take 4-5x more time than the optimized
one.

The time it takes to load the hashtable goes up, while the lookups aren't
much different because the grace hash-join has a bloom filter on top of it.

If you have 35,000 splits and 800 containers, the hash-build times add up
pretty quickly.
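
As back-of-the-envelope arithmetic for those numbers, the sketch below counts hashtable builds, not wall-clock time. It assumes splits are spread evenly over containers, that a reusable table is built once per container, and that a spilled table is rebuilt for every split; the class and method names are hypothetical.

```java
// Hypothetical arithmetic sketch: how many hashtable builds happen in total.
final class HashBuildCountSketch {
    /** Average number of splits each container works through. */
    static double splitsPerContainer(long splits, long containers) {
        return (double) splits / containers;
    }

    /** Builds when each container builds once and reuses the table. */
    static long buildsWithReuse(long splits, long containers) {
        return Math.min(splits, containers);
    }

    /** Builds when the table is dropped and rebuilt for every split. */
    static long buildsWithoutReuse(long splits) {
        return splits;
    }
}
```

With 35,000 splits on 800 containers, each container handles about 43.75 splits, so the count goes from 800 builds with reuse to 35,000 without it.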

> 3. In general, map join  in cluster mode, are these the actual steps
>followed in hive/tez:
> a. Hash table generation:  Partitioned hash tables of the small table is
>created across multiple containers. In each container, a part of the
>small table is dealt with. And in each container, the hash table is built

No, the broadcast tasks merely produce an unordered output - it is
not a hashtable.

This is done in parallel, as you describe, across multiple containers on
the cluster (it tries for locality next to the small tables).

> b. Broadcast of hash table: All the partitions of all the parts of small
>table, including the ones spilled in the disk are serialized and sent to
>all the second map containers.

The broadcast is done via shuffle, the same as sorted data movement, but
with a reader that consumes the unordered streams and builds a hashtable
inside every JoinOperator.

The hashtable is then put into a cache in the task, scoped to the
Vertex - if the same vertex re-runs on the same container, it will reload
from the cache instead of the shuffle stream.

The grace hashtable throws away in-mem data when it reloads a spilled
fraction of the hashtable, so the moment it has spilled it is no longer
considered for reuse.
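
The reuse rule can be sketched as a tiny per-container cache that only keeps tables which did not spill. This is an illustration under assumptions, not Hive's cache: the class, the Table type and the string vertex key are all hypothetical.

```java
// Hypothetical sketch of a vertex-scoped hashtable cache inside one container.
final class VertexCacheSketch {
    /** Stand-in for a built hashtable; only records whether it spilled. */
    static final class Table {
        final boolean spilled;

        Table(boolean spilled) {
            this.spilled = spilled;
        }
    }

    private final java.util.Map<String, Table> cache = new java.util.HashMap<>();
    int builds = 0; // how many times the table was built from the shuffle stream

    Table get(String vertexKey, java.util.function.Supplier<Table> buildFromShuffle) {
        Table cached = cache.get(vertexKey);
        if (cached != null) {
            return cached; // same vertex re-running on this container
        }
        Table built = buildFromShuffle.get();
        builds++;
        if (!built.spilled) {
            // Only a fully in-memory table is kept for later tasks.
            cache.put(vertexKey, built);
        }
        return built;
    }
}
```

Three tasks of the same vertex on one container trigger a single build when the table fits in memory, and three builds when it spills each time.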

> Where does the rebuilding of spilt hash table happen? Is it during
>second map phase where join is happening with bigger table?

The split-hashtable looks exactly like the regular hashtable, but it has 3
return values for the data - Yes, No, Ask-Later.

So other than the handling of the Ask-Later scenario, the split-hashtable
looks exactly like the full in-mem one.
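
A minimal sketch of such a three-valued lookup follows. Everything in it is hypothetical (class name, enum names, int keys, modulo partitioning), and it leaves out the part where Ask-Later rows are set aside and replayed once the spilled partition is reloaded.

```java
// Hypothetical sketch of a partitioned hashtable probe with three outcomes.
final class SpillAwareLookupSketch {
    enum Result { YES, NO, ASK_LATER }

    private final java.util.List<java.util.Set<Integer>> partitions = new java.util.ArrayList<>();
    private final boolean[] spilled;

    SpillAwareLookupSketch(int numPartitions) {
        spilled = new boolean[numPartitions];
        for (int i = 0; i < numPartitions; i++) {
            partitions.add(new java.util.HashSet<>());
        }
    }

    int partitionOf(int key) {
        return Math.floorMod(key, spilled.length);
    }

    /** Adds a small-table key; in this sketch all puts happen before any spill. */
    void put(int key) {
        partitions.get(partitionOf(key)).add(key);
    }

    /** Marks a partition as on disk and drops its in-memory contents. */
    void spill(int partition) {
        spilled[partition] = true;
        partitions.get(partition).clear();
    }

    Result probe(int key) {
        int p = partitionOf(key);
        if (spilled[p]) {
            return Result.ASK_LATER; // cannot answer until the partition is reloaded
        }
        return partitions.get(p).contains(key) ? Result.YES : Result.NO;
    }
}
```

With 4 partitions holding keys 1, 2 and 6, spilling partition 2 makes probes for 2 and 10 return ASK_LATER, while 1 still returns YES and 5 returns NO.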

> c. Join operator:  The big table is scanned in each second mapper,
>against the entire hash table of small table, and result is got.

Yes.

The Hadoop Summit slides from 2014 linked above are a little out of
date, but they cover some of the basics of how this all fits
together.

Cheers,
Gopal



Re: Hash table in map join - Hive

Posted by Ross Guth <ro...@gmail.com>.
Hi Gopal,

I saw the log files and the hash table information in it. Thanks.

Also, I enforced shuffle hash join. I had a couple of questions around it:

1. In the query plan, it still says Map Join Operator (I would have expected
it to be named as a reduce-side operator).
2. The edges in this query plan were named custom_simple_edge: does this
indicate that sorting of the mapper inputs is bypassed?
3. "hive.tez.auto.reducer.parallelism=true" is not taking effect for
shuffle hash join. With the same input tables, merge join (shuffle sort
merge join) took 1009 reducers without auto reducer parallelism turned on
and 337 reducers with it. In the case of shuffle hash join, the count does
not change from 1009 to 337. Is there something else I need to do to
enable this optimization in this case?

I had a few general questions too:
1. What does tez.auto.reducer.parallelism do -- does it only reduce the
number of reducers based on the actual size of the mapper output, or does it
do more? As mentioned above, in the sort merge join case, if I manually set
the number of reduce tasks to 337 (using the mapred.reduce.tasks
parameter), the execution time does not improve as much as when the auto
reducer parameter picks it by itself.

2. I did not understand the intuition behind setting
hive.mapjoin.hybridgrace.hashtable=false (as mentioned in your previous
reply). Does hybrid grace hashtable mean the Hybrid Hybrid Grace Hash Join
implementation described here?
<https://cwiki.apache.org/confluence/display/Hive/Hybrid+Hybrid+Grace+Hash+Join%2C+v1.0>
If it is set to true, the hash table is created with multiple partitions.
If it is set to false, is the hash table created as a single hash table?
Isn't the true case better, since it better handles the case where the hash
table cannot fit in memory? Also, the lookups will be smaller. I ran
both cases (with hybridgrace.hashtable set to true and false) and did not
see any difference in execution time -- maybe because my input size was
considerably small in that case.

3. In general, map join  in cluster mode, are these the actual steps
followed in hive/tez:
a. *Hash table generation*: Partitioned hash tables of the small table are
created across multiple containers. In each container, a part of the small
table is dealt with, and the hash table is built for that part in 16
partitions. If any partition cannot fit in memory, it is spilled to disk
(with only a disk file and no match file, since no matching against the big
table is happening).
b. *Broadcast of hash table*: All the partitions of all the parts of the
small table, including the ones spilled to disk, are serialized and sent to
all the second map containers.
c. *Join operator*: The big table is scanned in each second mapper against
the entire hash table of the small table, and the result is produced.
Where does the rebuilding of the spilt hash table happen? Is it during the
second map phase, where the join with the bigger table is happening?


Apologies for the long list of questions. But knowing this would be very
helpful to me.

Thanks in advance,
Ross

On Mon, Jun 27, 2016 at 7:25 PM, Gopal Vijayaraghavan <go...@apache.org>
wrote:

>
> > 1. OOM condition -- I get the following error when I force a map join in
> >hive/tez with low container size and heap size:"
> >java.lang.OutOfMemoryError: Java heap space". I was wondering what is the
> >condition which leads to this error.
>
> You are not modifying the noconditionaltasksize to match the Xmx at all.
>
> hive.auto.convert.join.noconditionaltask.size=(Xmx - io.sort.mb)/3.0;
>
>
> > 2.  Shuffle Hash Join -- I am using hive 2.0.1. What is the way to force
> >this join implementation? Is there any documentation regarding the same?
>
> <https://issues.apache.org/jira/browse/HIVE-10673>
>
>
> For full-fledged speed-mode, do
>
> set hive.vectorized.execution.reduce.enabled=true;
> set hive.optimize.dynamic.partition.hashjoin=true;
> set hive.vectorized.execution.mapjoin.native.fast.hashtable.enabled=true;
> set hive.mapjoin.hybridgrace.hashtable=false;
>
> > 3. Hash table size: I use "--hiveconf hive.root.logger=INFO,console" for
> >seeing logs. I do not see the hash table size in these logs.
>
> No, the hashtables are no longer built on the gateway nodes - that used to
> be a single point of failure when 20-25 users are connected via the same
> box.
>
> The hashtable logs are on the task side (in this case, I would guess Map
> 2's logs would have them). The output comes from a log line that you can
> find with
>
> yarn logs -applicationId <app-id> | grep "Map.*metrics"
>
> > Map 1  3  0  0  37.11  65,710  1,039  15,000,000  15,000,000
>
>
> So you have 15 million keys going into a single hashtable? The broadcast
> output rows is fed into the hashtable on the other side.
>
> The map-join sort of runs out of steam after about 4 million entries - I
> would guess for your scenario setting the noconditional size to 8388608
> (8 MB) might trigger the good path.
>
> Cheers,
> Gopal
>
>
>
>
>
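
The sizing rule quoted above, (Xmx - io.sort.mb)/3.0, can be worked through
as plain arithmetic. The sketch below is only an illustration: the function
name and the 4096 MB / 1024 MB figures are hypothetical, and it assumes both
inputs are given in megabytes while the Hive property is set in bytes.

```python
def noconditionaltask_size(xmx_mb: int, io_sort_mb: int) -> int:
    """Return (Xmx - io.sort.mb) / 3 in bytes, rounded down.

    Both arguments are in megabytes (an assumption of this sketch).
    """
    if io_sort_mb >= xmx_mb:
        raise ValueError("io.sort.mb must be smaller than Xmx")
    return (xmx_mb - io_sort_mb) * 1024 * 1024 // 3


# Hypothetical container: 4096 MB heap, 1024 MB sort buffer.
size = noconditionaltask_size(4096, 1024)
print(f"set hive.auto.convert.join.noconditionaltask.size={size};")
```

With these made-up numbers the result is 1073741824 bytes, i.e. one third of
the 3072 MB left over after the sort buffer.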

Re: Hash table in map join - Hive

Posted by Ross Guth <ro...@gmail.com>.
Hi Gopal,

Thanks a lot for the answers. They were helpful.
I have a few more questions regarding this:

1. OOM condition -- I get the following error when I force a map join in
hive/tez with low container size and heap size:
"java.lang.OutOfMemoryError: Java heap space". I was wondering what
condition leads to this error. Is it when even one of the 16
partitions of the hashtable cannot fit in memory? I tried setting
hive.mapjoin.hybridgrace.minnumpartitions to higher values like 50 and 100
(expecting the size of each partition to drop drastically, since the join key
has no skew in its distribution). But I still get the OOM error.
What is the cause?

2.  Shuffle Hash Join -- I am using hive 2.0.1. What is the way to force
this join implementation? Is there any documentation regarding the same?

3. Hash table size: I use "--hiveconf hive.root.logger=INFO,console" for
seeing logs. I do not see the hash table size in these logs. I tried using:
 "--hiveconf hive.tez.exec.print.summary=true". The output of this was the
following:

METHOD                         DURATION(ms)
parse                                  977
semanticAnalyze                      2,435
TezBuildDag                            473
TezSubmitToRunningDag                  841
TotalPrepTime                       10,451

VERTICES  TOTAL_TASKS  FAILED_ATTEMPTS  KILLED_TASKS  DURATION_SECONDS  CPU_TIME_MILLIS  GC_TIME_MILLIS  INPUT_RECORDS  OUTPUT_RECORDS
Map 1               3                0             0             37.11           65,710           1,039     15,000,000      15,000,000
Map 2              65                0             0            498.77        4,163,920         100,613    615,037,902               0

This doesn't seem to have information about the hash table or the number of
items shuffled. Am I missing something?


Thanks,
Ross

On Mon, Jun 27, 2016 at 9:10 AM, Gopal Vijayaraghavan <go...@apache.org>
wrote:

> > 1. Is there a way to check the size of the hash table created during map
> >side join in Hive/Tez?
>
> Only from the log files.
>
> However, if you enable hive.tez.exec.print.summary=true; then the hive CLI
> will print out the total # of items shuffled from the broadcast edges
> feeding the hashtable.
>
> Not sure if you might have the reduce-side map-join in your builds, but
> that is a bit harder to look into.
>
> > 2. Is the hash table (small table's), created for the entire table or
> >only for the selected and join key columns?
>
> Yup. Project, filter and group-by (in case of semi-joins).
>
> select a from tab1 where a IN (select b from tab2 ...);
>
> will inject a "select distinct b" into the plan.
>
> > 3. The hash table (created in map side join) spills to disk, if it does
> >not fit in memory. Is there a parameter in hive/tez to specify the
> >percentage of the hash file which can spill?
>
> Not directly.
>
> hive.mapjoin.hybridgrace.minnumpartitions=16
>
>
> by default. So 1/16th of your key space will spill, whenever it hits the
> spilling conditions - for the small table.
>
> In general, the Snowflake-model dimension tables are joined by their
> primary key, so the key-space corresponds to the row-distribution too.
>
> For the big table it will spill only a smaller fraction since the
> BloomFilter built during hashtable generation is not spilled, so anything
> which misses the bloom filter will not spill to disk during the join.
>
> All that said, the spilling hash-join is much slower than the shuffling
> hash-join (new in 2.0), because the grace hash-join drops parts of the
> hash out of memory after each iteration & has to rebuild it for each split
> processed.
>
> In terms of total CPU, building a 4 million row hash table in 600 tasks is
> slower than building a 7500 row hashtable x 600 times - the hashtable
> lookup goes up by LG(N) too.
>
> Ask me more questions, if you need more info.
>
> Cheers,
> Gopal
>
>
>

Re: Hash table in map join - Hive

Posted by Gopal Vijayaraghavan <go...@apache.org>.
> 1. Is there a way to check the size of the hash table created during map
>side join in Hive/Tez?

Only from the log files.

However, if you enable hive.tez.exec.print.summary=true; then the hive CLI
will print out the total # of items shuffled from the broadcast edges
feeding the hashtable.

Not sure if you might have the reduce-side map-join in your builds, but
that is a bit harder to look into.

> 2. Is the hash table (small table's), created for the entire table or
>only for the selected and join key columns?

Yup. Project, filter and group-by (in case of semi-joins).

select a from tab1 where a IN (select b from tab2 ...);

will inject a "select distinct b" into the plan.
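
To make the effect of that injected "select distinct b" concrete, here is a
small sketch in Python rather than HiveQL. It is not Hive's implementation;
the function name and sample values are made up for illustration. The point
is that the semi-join only needs the distinct keys of tab2, so the hashtable
holds one entry per distinct b rather than one per row.

```python
def semi_join(tab1_a, tab2_b):
    """Keep values of tab1.a that appear in tab2.b (IN semi-join)."""
    # Equivalent of the injected "select distinct b": duplicates collapse,
    # so the lookup structure has one entry per distinct key.
    distinct_b = set(tab2_b)
    return [a for a in tab1_a if a in distinct_b]


# Hypothetical data: tab2.b has five rows but only three distinct keys.
rows = semi_join([1, 2, 2, 5], [2, 2, 2, 5, 7])
print(rows)
```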

> 3. The hash table (created in map side join) spills to disk, if it does
>not fit in memory. Is there a parameter in hive/tez to specify the
>percentage of the hash file which can spill?

Not directly.

hive.mapjoin.hybridgrace.minnumpartitions=16


by default. So 1/16th of your key space will spill, whenever it hits the
spilling conditions - for the small table.
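
The "1/16th of your key space" figure follows from hash-partitioning the
keys into 16 buckets and spilling one bucket at a time. The sketch below
only illustrates that arithmetic; it is not Hive's code. The helper names,
the modulo hash, and the choice to spill the largest partition are all
assumptions made for the example.

```python
NUM_PARTITIONS = 16  # mirrors hive.mapjoin.hybridgrace.minnumpartitions=16


def build_partitions(keys, num_partitions=NUM_PARTITIONS):
    """Distribute keys into partitions by hash value."""
    parts = [[] for _ in range(num_partitions)]
    for key in keys:
        parts[hash(key) % num_partitions].append(key)
    return parts


def spill_largest(parts):
    """Remove and return the largest partition (assumed spill policy)."""
    victim = max(range(len(parts)), key=lambda i: len(parts[i]))
    spilled = parts[victim]
    parts[victim] = []  # in a real system this would go to a disk file
    return victim, spilled


# Hypothetical unskewed key space: 160,000 integer keys.
keys = range(160_000)
parts = build_partitions(keys)
victim, spilled = spill_largest(parts)
fraction = len(spilled) / len(keys)
print(f"spilled partition {victim}: {fraction:.4f} of the key space")
```

With an unskewed key distribution each spill event moves roughly one
sixteenth of the keys out of memory; raising the partition count makes each
spill smaller but does not by itself shrink the total hashtable.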

In general, the Snowflake-model dimension tables are joined by their
primary key, so the key-space corresponds to the row-distribution too.

For the big table it will spill only a smaller fraction since the
BloomFilter built during hashtable generation is not spilled, so anything
which misses the bloom filter will not spill to disk during the join.

All that said, the spilling hash-join is much slower than the shuffling
hash-join (new in 2.0), because the grace hash-join drops parts of the
hash out of memory after each iteration & has to rebuild it for each split
processed.

In terms of total CPU, building a 4 million row hash table in 600 tasks is
slower than building a 7500 row hashtable x 600 times - the hashtable
lookup goes up by LG(N) too.
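
The CPU comparison above can be checked with back-of-the-envelope
arithmetic. The sketch below only counts hashtable inserts, using the row
and task counts from the paragraph; treating cost as "rows inserted" and
taking the LG(N) remark at face value as log2 of the table size are
simplifying assumptions of this example.

```python
import math


def total_inserts(rows_per_table, tasks):
    """Total hashtable inserts when every task builds its own table."""
    return rows_per_table * tasks


# Broadcast map-join: each of 600 tasks builds the full 4 million row table.
broadcast = total_inserts(4_000_000, 600)
# Shuffle hash-join: each of 600 tasks builds only its 7,500 row slice.
shuffled = total_inserts(7_500, 600)

print(f"broadcast inserts: {broadcast:,}")
print(f"shuffled inserts:  {shuffled:,}")
print(f"ratio: {broadcast / shuffled:.0f}x")

# Per-lookup factor, if lookup cost grows with log2 of the table size.
print(f"log2 sizes: {math.log2(4_000_000):.1f} vs {math.log2(7_500):.1f}")
```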

Ask me more questions, if you need more info.

Cheers,
Gopal