Posted to user@hive.apache.org by Lalitha MV <la...@gmail.com> on 2016/07/01 00:57:04 UTC

Re: Hash table in map join - Hive

Hi,

I was following this thread. I tried manually applying the patch from the
JIRA (https://issues.apache.org/jira/browse/TEZ-3287) [referenced in the
above reply for the auto-reducer optimization in the shuffle hash join
case]. I added it to 0.8.3 while the patch was for master.
But I got a comment from the author that the patch wouldn't affect
hive.tez.auto.reducer.parallelism=true.
Am I missing something?


Thanks,
Lalitha

On Thu, Jun 30, 2016 at 9:11 AM, Gopal Vijayaraghavan <go...@apache.org>
wrote:

>
> > 1. In the query plan, it still says Map Join Operator (Would have
> >expected it to be named as Reduce side operator).
>
> The "Map" in that case refers really to Map<K,V> rather the hadoop
> version. An unambigous name is if it were called the HashJoinOperator.
>
> This is one of the optimizations in Tez right now: a map-join can be
> inserted into any vertex, because "Map 1" is really just a name (it is
> a vertex).
>
> Also, even if the input format is Text/SequenceFile, the reduce-side
> vectorization can vectorize the simple join cases because it is no longer
> tied to the InputFormat.
>
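As a minimal illustration of what the operator described above does - build
a Map<K,V> from the small side, then probe it while scanning the big side -
here is a self-contained sketch (not Hive's MapJoinOperator code; the tables
and values are invented):

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Minimal hash-join sketch: build a Map<K,V> from the small (build) side,
    // then probe it once per row of the big (probe) side.
    public class HashJoinSketch {
      public static void main(String[] args) {
        // Small side: id -> name
        Map<Integer, String> buildSide = new HashMap<>();
        buildSide.put(1, "alice");
        buildSide.put(2, "bob");

        // Big side: (id, amount) rows streamed through the operator
        int[][] probeSide = {{1, 100}, {2, 250}, {3, 75}};

        List<String> joined = new ArrayList<>();
        for (int[] row : probeSide) {
          String name = buildSide.get(row[0]);  // O(1) lookup, no sort needed
          if (name != null) {
            joined.add(name + "," + row[1]);
          }
        }
        System.out.println(joined);             // [alice,100, bob,250]
      }
    }
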
> > 2. The edges in this query plans were named as custom_simple_edge: Is
> >this the one pointing to the fact that sorting of mapper inputs are
> >bypassed?
>
> Not directly related, but the custom edges use their own EdgeManager - the
> EdgeManager that is there could probably be replaced with a simple edge +
> unsorted input-output pairs since tez-0.5.x.
>
> But the edge has an extension which can do some non-simple things too,
> which is why Tez supports edge overrides like this.
>
>  <http://www.slideshare.net/Hadoop_Summit/w-235phall1pandey/13>
>
>
> > 3. "hive.tez.auto.reducer.parallelism=true" is not taking effect for
> >shuffle hash join.
>
> That issue was already reported by Twitter: the unsorted edges do not send
> out the output-size bitsets.
>
>  <https://issues.apache.org/jira/browse/TEZ-3287>
>
>
> > 1. What does tez.auto.reducer.parallelism do -- Does it only reduce the
> >number of reducers based on the actual size of mapper output, or does it
> >do more.
>
> It does a bit more when PipelineSorter is enabled.
>
> The sorted edges actually partition first and sort afterwards, so the
> sort key is really (reducer-n, key) & the first few bytes of that
> information are stored in the metadata region of the sorter for a better
> L1 cache hit-rate when sorting.
>
> So the more reducers there are, the faster it sorts. However, it
> compresses each reducer output independently, so slicing too thin produces
> bad network overheads.
>
> Auto-reducer parallelism exists so that you don't need to tune each query
> by hand to fit those trade-offs.
>
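As a minimal sketch of that (reducer-n, key) ordering - illustrative Java
only, not Tez's sorter code; the Entry type and sample data are invented -
comparing the partition number first settles most comparisons on a small
integer before the key bytes are ever touched:

    import java.util.Arrays;
    import java.util.Comparator;

    // Sketch of a partition-first composite sort key, i.e. ordering records
    // by (partition, key) rather than by key alone.
    public class CompositeKeySortSketch {
      static final class Entry {
        final int partition;   // which reducer this record is destined for
        final byte[] key;      // the serialized sort key
        Entry(int partition, byte[] key) { this.partition = partition; this.key = key; }
      }

      static final Comparator<Entry> PARTITION_THEN_KEY =
          Comparator.<Entry>comparingInt(e -> e.partition)
                    .thenComparing((Entry e) -> e.key, CompositeKeySortSketch::compareBytes);

      static int compareBytes(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
          int c = Byte.compare(a[i], b[i]);
          if (c != 0) return c;
        }
        return Integer.compare(a.length, b.length);
      }

      public static void main(String[] args) {
        Entry[] records = {
            new Entry(2, "zebra".getBytes()),
            new Entry(0, "mango".getBytes()),
            new Entry(0, "apple".getBytes()),
            new Entry(1, "pear".getBytes())
        };
        Arrays.sort(records, PARTITION_THEN_KEY);
        for (Entry e : records) {
          System.out.println(e.partition + " -> " + new String(e.key));
        }
      }
    }
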
> > 2. I did not understand the intuition behind setting
> >hive.mapjoin.hybridgrace.hashtable=false (as mentioned in your previous
> >reply).
>
> Yes, it is the same impl from the wiki. But the grace hashjoin drops the
> hashtable if it spills between executions of the same vertex.
>
> The regular hashjoin does not reload the hashtable when the split changes;
> this means the grace hashjoin can take 4-5x more time than the optimized
> one.
>
> The time it takes to load the hashtable goes up, while the lookups aren't
> much different because the grace hash-join has a bloom filter on top of it.
>
> If you have 35,000 splits and 800 containers, the hash-build time adds up
> pretty quickly.
>
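As a rough worked example of that cost (the 30-second build time below is an
assumed number, purely for illustration):

    35,000 splits / 800 containers  ≈  44 task executions per container
    44 rebuilds x ~30 s per build   ≈  22 minutes of hash-build time per container

versus a single build per container if the cached hashtable can be reused
across splits.
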
> > 3. In general, map join  in cluster mode, are these the actual steps
> >followed in hive/tez:
> > a. Hash table generation:  Partitioned hash tables of the small table is
> >created across multiple containers. In each container, a part of the
> >small table is dealt with. And in each container, the hash table is built
>
> No, the broadcast tasks merely produce an unordered output - it is not
> a hashtable.
>
> This is done in parallel, as you describe, across multiple containers on
> the cluster (it tries for locality next to the small tables).
>
> > b. Broadcast of hash table: All the partitions of all the parts of small
> >table, including the ones spilled in the disk are serialized and sent to
> >all the second map containers.
>
> The broadcast is done via shuffle, same as sorted data movement, but one
> which reads the unordered streams and builds a hashtable inside every
> JoinOperator.
>
> The hashtable is then put into a cache in the task which has scope of the
> Vertex - if the same vertex re-runs on the same container, it will reload
> from the cache instead of the shuffle stream.
>
> The grace hashtable throws away in-mem data when it reloads a spilled
> fraction of the hashtable, so the moment it has spilled it is no longer
> considered for reuse.
>
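A rough sketch of that vertex-scoped reuse (illustrative only - the cache
key and method names below are invented, not Hive's ObjectCache API):

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.function.Supplier;

    // Illustrative container-level cache: the first task of a vertex builds
    // the hashtable from the broadcast stream; later tasks of the same vertex
    // that land on the same container reuse it instead of re-reading shuffle.
    public class VertexScopedCacheSketch {
      private static final Map<String, Object> CACHE = new ConcurrentHashMap<>();

      @SuppressWarnings("unchecked")
      static <T> T retrieve(String vertexScopedKey, Supplier<T> loader) {
        // computeIfAbsent: build once per (container, vertex), reuse afterwards
        return (T) CACHE.computeIfAbsent(vertexScopedKey, k -> loader.get());
      }

      public static void main(String[] args) {
        Map<Integer, String> ht1 = retrieve("Map_1__hashtable",
            VertexScopedCacheSketch::buildHashtableFromShuffle);  // first task builds
        Map<Integer, String> ht2 = retrieve("Map_1__hashtable",
            VertexScopedCacheSketch::buildHashtableFromShuffle);  // re-run: cache hit
        System.out.println(ht1 == ht2);                           // true - reused
      }

      static Map<Integer, String> buildHashtableFromShuffle() {
        System.out.println("building hashtable from broadcast stream...");
        return Map.of(1, "a", 2, "b");
      }
    }
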
> > Where does the rebuilding of spilt hash table happen? Is it during
> >second map phase where join is happening with bigger table?
>
> The spilled hashtable looks exactly like the regular hashtable, but it has
> 3 return values for the data - Yes, No, Ask-Later.
>
> So other than the handling of the Ask-Later scenario, the spilled hashtable
> looks exactly like the full in-mem one.
>
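A small sketch of that three-valued probe (the enum and method below are
invented for illustration, not Hive's hybrid hashtable API):

    // Three-valued probe result for a hashtable with spilled partitions: an
    // in-memory partition answers Yes/No immediately, while a probe that
    // hashes to a spilled partition is deferred and re-checked when that
    // partition is reloaded.
    public class SpilledHashtableSketch {
      enum ProbeResult { MATCH, NO_MATCH, ASK_LATER }

      static ProbeResult probe(boolean partitionInMemory, boolean keyPresent) {
        if (!partitionInMemory) {
          return ProbeResult.ASK_LATER;  // row is parked until the partition reloads
        }
        return keyPresent ? ProbeResult.MATCH : ProbeResult.NO_MATCH;
      }

      public static void main(String[] args) {
        System.out.println(probe(true, true));    // MATCH ("Yes")
        System.out.println(probe(true, false));   // NO_MATCH ("No")
        System.out.println(probe(false, true));   // ASK_LATER
      }
    }
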
> > c. Join operator:  The big table is scanned in each second mapper,
> >against the entire hash table of small table, and result is got.
>
> Yes.
>
> The Hadoop Summit slides from 2014 linked above are a little out of date,
> but they cover some of the basics of how this all fits together.
>
> Cheers,
> Gopal
>
>
>

Re: Hash table in map join - Hive

Posted by Lalitha MV <la...@gmail.com>.
Also, a couple of follow-up questions:

1. The grace hash join has to reload/rebuild the hash table for a new split
only if it has spilled the hash table because of lack of memory space,
right?
    How does the regular hash join handle the case when the hash table
cannot fit into memory? Does it create a single-file hash table or multiple
partitions as in the case of the grace hash join?

2. I tried to find the size of the hash table being built from the logs in
a container (assuming that the entire hash table information should be there
in every container).
    When I tried a grep for "Map metrics", I got:  Map metrics: keys
allocated 2097152, keys assigned 934963, write conflict 3330682, write max
dist 22, read conflict 1859185, expanded 0 times in 0ms
    From this, how do I estimate the net size of the hash table?

Thanks in advance,
Lalitha


Re: Hash table in map join - Hive

Posted by Gopal Vijayaraghavan <go...@apache.org>.
> When is OOM error actually thrown? With
>hive.mapjoin.hybridgrace.hashtable set to true, spilling should be
>possible, so OOM error should not come.
...
> Is it the case when the hash table of not even one of the 16 partitions
>fits in memory? 

It will OOM if any one of them overflows.

The grace hash-join works really well when the memory side is a primary
key and doesn't have this sort of skew.


The real problem with the shuffle hash-join is that it amplifies the
skews, since it distributes rows on the hashcode of the join keys.
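
A toy illustration of that amplification (made-up numbers, not Hive code):
rows are routed by hash(joinKey) modulo the partition count, so a heavily
repeated join key lands entirely in one partition's hashtable no matter how
many partitions are configured.

    import java.util.HashMap;
    import java.util.Map;

    // Toy example: 1M rows, 90% of which share one join key. Hash
    // partitioning sends every one of those rows to the same partition, so
    // that partition is ~900x larger than the rest regardless of numPartitions.
    public class SkewSketch {
      public static void main(String[] args) {
        int numPartitions = 100;
        Map<Integer, Integer> rowsPerPartition = new HashMap<>();
        for (int row = 0; row < 1_000_000; row++) {
          String joinKey = (row % 10 == 0) ? "key-" + row : "hot-key";  // 90% skewed
          int partition = Math.floorMod(joinKey.hashCode(), numPartitions);
          rowsPerPartition.merge(partition, 1, Integer::sum);
        }
        int max = rowsPerPartition.values().stream()
            .mapToInt(Integer::intValue).max().orElse(0);
        System.out.println("largest partition holds " + max + " of 1000000 rows");
      }
    }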

> But increasing the partitions to 100 also did not solve the problem
>(This is in the case of 3G container size and 5G small table size.
> I have given a high value for
>hive.auto.convert.join.noconditionaltask.size so that the broadcast hash
>join path is picked.

Unfortunately, it will only start spilling after it overflows
"hive.auto.convert.join.noconditionaltask.size" or 55% of the heap.

set hive.mapjoin.followby.gby.localtask.max.memory.usage=0.1;


    // Get the total available memory from memory manager
    long totalMapJoinMemory = desc.getMemoryNeeded();
    LOG.info("Memory manager allocates " + totalMapJoinMemory
        + " bytes for the loading hashtable.");
    if (totalMapJoinMemory <= 0) {
      totalMapJoinMemory = HiveConf.getLongVar(
          hconf, HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD);
    }

    long processMaxMemory =
        ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax();
    if (totalMapJoinMemory > processMaxMemory) {
      float hashtableMemoryUsage = HiveConf.getFloatVar(
          hconf, HiveConf.ConfVars.HIVEHASHTABLEFOLLOWBYGBYMAXMEMORYUSAGE);
      LOG.warn("totalMapJoinMemory value of " + totalMapJoinMemory
          + " is greater than the max memory size of " + processMaxMemory);
      // Don't want to attempt to grab more memory than we have available
      // .. percentage is a bit arbitrary
      totalMapJoinMemory = (long) (processMaxMemory * hashtableMemoryUsage);
    }
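
As rough arithmetic against the numbers reported earlier in this thread
(assuming the task heap is close to the 3G container size, which is only an
approximation): the cap above works out to about 0.55 x 3 GB ≈ 1.65 GB for
the hashtable, so a build side derived from a ~5 GB small table can only
stay inside that budget by spilling most of its partitions.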


If you can devise an example using TPC-DS schema on hive-testbench, I can
run it locally and tell you what's happening.


Cheers,
Gopal



Re: Hash table in map join - Hive

Posted by Lalitha MV <la...@gmail.com>.
Hi Gopal,

Thanks a lot for the above update.

I had only one question hanging:

When is the OOM error actually thrown? With hive.mapjoin.hybridgrace.hashtable
set to true, spilling should be possible, so an OOM error should not come.
Is it the case when the hash table of not even one of the 16 partitions
fits in memory? But increasing the partitions to 100 also did not solve the
problem (this is the case of a 3G container size and a 5G small table size.
I have given a high value to hive.auto.convert.join.noconditionaltask.size
so that the broadcast hash join path is picked. I know this is not
advisable, but I am still trying to force it. When I give 100 partitions,
only a 500Mb hash table has to fit in memory at a time, but it still fails
with a memory error).

Thanks,
Lalitha


Re: Hash table in map join - Hive

Posted by Gopal Vijayaraghavan <go...@apache.org>.
Hi,

I got a chance to re-run this query today and it does auto-reduce the new
CUSTOM_EDGE join as well.

However it does it too early, before it has enough information about
both sides of the join.

TPC-DS Query79 at 1Tb scale generates a CUSTOM_EDGE between the ms alias
and the customer table.

They're 13.7 million & 12 million rows each - the impl is not causing any
trouble right now because they're roughly equal.

This might be a feature I disable for now, since the ShuffleVertexManager
has no idea which side is destined for the hashtable build side & might
squeeze the auto-reducer tighter than it should, causing a possible OOM.

Plus, the size that the system gets is actually the shuffle size, so it is
post-compression and varies with the data compressibility (for instance,
clickstreams compress really well due to the large number of common strings).

A pre-req for the last problem would be to go fix
<https://issues.apache.org/jira/browse/TEZ-2962>, which tracks
pre-compression sizes.

I'll have to think a bit more about the other part of the problem.

Cheers,
Gopal




Re: Hash table in map join - Hive

Posted by Gopal Vijayaraghavan <go...@apache.org>.
> I tried running the shuffle hash join with auto reducer parallelism
>again. But, it didn't seem to take effect. With merge join and auto
>reduce parallelism on, number of
> reducers drops from 1009 to 337, but didn't see that change in case of
>shuffle hash join .Should I be doing something more ?

In your runtime, can you cross-check the YARN logs for

"Defer scheduling tasks"

and 

"Reduce auto parallelism for vertex"


The kick-off points for the two join algorithms are slightly different.

The merge-join uses stats from both sides (since until all tasks on both
sides are complete, no join ops can be performed).

The dynamic hash-join, on the other hand, uses stats from only the hashtable
side, since it can start producing rows as soon as the hashtable side is
complete & at least 1 task from the big-table side has completed.

There's a scenario where the small table is too small even when it is
entirely complete, at which point the system just goes ahead with the reduce
tasks without any reduction (which is faster than waiting for the big table
to hit slow-start before doing that).
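
A condensed sketch of those two kick-off conditions (pseudo-logic with
invented names, not the ShuffleVertexManager implementation):

    // When each join strategy has enough information to pick the reducer
    // count, per the description above. Invented names; not Tez code.
    public class AutoReduceKickoffSketch {
      // Merge join: nothing can be joined until every task on both sorted
      // sides has finished, so the decision can wait for output-size stats
      // from both sides.
      static boolean canDecideForMergeJoin(boolean smallSideStatsDone,
                                           boolean bigSideStatsDone) {
        return smallSideStatsDone && bigSideStatsDone;
      }

      // Dynamic hash join: rows start flowing as soon as the hashtable side
      // is complete, so only that side's output sizes are available when the
      // reducer count has to be chosen.
      static boolean canDecideForDynamicHashJoin(boolean hashtableSideStatsDone) {
        return hashtableSideStatsDone;
      }

      public static void main(String[] args) {
        System.out.println(canDecideForMergeJoin(true, false));   // false - must wait
        System.out.println(canDecideForDynamicHashJoin(true));    // true - decides early
      }
    }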

Cheers,
Gopal



Re: Hash table in map join - Hive

Posted by Lalitha MV <la...@gmail.com>.
Hi Gopal,

Since this JIRA is resolved, I cloned the master branch, compiled it, and
used the binaries (0.9 snapshot version of Tez).

I tried running the shuffle hash join with auto reducer parallelism again.
But it didn't seem to take effect. With merge join and auto reduce
parallelism on, the number of reducers drops from 1009 to 337, but I didn't
see that change in the case of shuffle hash join. Should I be doing
something more?

Thanks,
Lalitha

Re: Hash table in map join - Hive

Posted by Gopal Vijayaraghavan <go...@apache.org>.

> But, I got a comment from the author that, the patch wouldn't affect --
>hive.tez.auto.reducer.parallelism=true.
> Am I missing something?


No, I've linked to the wrong JIRA :(

 <https://issues.apache.org/jira/browse/TEZ-3206>

Cheers,
Gopal