Posted to user@hive.apache.org by Rajesh Balamohan <ra...@gmail.com> on 2016/02/29 05:50:04 UTC

Re: How the actual "sample data" are implemented when using tez reduce auto-parallelism

"tez.shuffle-vertex-manager.desired-task-input-size" - Determines the
desired amount of input data per reduce task. The default is around 100 MB.

"tez.shuffle-vertex-manager.min-task-parallelism" - Minimum task parallelism
that ShuffleVertexManager should honor, i.e., if the client has set it to
100, ShuffleVertexManager will not try to auto-reduce below 100 tasks.

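"tez.shuffle-vertex-manager.min-src-fraction" and
"tez.shuffle-vertex-manager.max-src-fraction" determine the slow-start
behavior: reduce tasks start being scheduled once the min fraction of source
tasks has completed, and all of them are scheduled by the time the max
fraction has completed.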
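To make the slow-start knobs concrete, here is a minimal Python sketch. It
assumes a simple linear ramp: no reduce tasks are scheduled until
min-src-fraction of the source tasks have completed, all of them are
scheduled once max-src-fraction have completed, and the count grows linearly
in between. The function name and the example numbers are illustrative
assumptions, not Tez's actual implementation.

```python
def reducers_to_schedule(completed_src: int, total_src: int,
                         total_reducers: int, min_frac: float,
                         max_frac: float) -> int:
    """Hypothetical helper: how many reduce tasks a simple linear
    slow-start policy would allow to be scheduled so far."""
    if total_src == 0:
        return total_reducers  # no source tasks to wait for
    done = completed_src / total_src
    if done < min_frac:
        return 0  # too early: keep all reducers pending
    if done >= max_frac or max_frac <= min_frac:
        return total_reducers  # past the ramp: schedule everything
    ramp = (done - min_frac) / (max_frac - min_frac)
    return int(ramp * total_reducers)


# 100 source tasks, 40 reducers, fractions 0.25 / 0.75 (illustrative values)
for completed in (10, 25, 50, 75, 100):
    print(completed, reducers_to_schedule(completed, 100, 40, 0.25, 0.75))
```

With these inputs the sketch schedules nothing at 10 or 25 completed source
tasks, 20 reducers at 50, and all 40 from 75 onward.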

Hive mainly sets "tez.shuffle-vertex-manager.desired-task-input-size" and
"tez.shuffle-vertex-manager.min-task-parallelism" when it creates the DAG.
Min-task-parallelism is determined internally in Hive from a couple of
other parameters, "hive.tez.max.partition.factor" and
"hive.tez.min.partition.factor", along with the data size per reduce task.
For instance, assume the initial number of reduce tasks is 100,
hive.tez.max.partition.factor=2.0 and hive.tez.min.partition.factor=0.25.
In this case, Hive would set the number of reducers to 200, and the hint to
Tez for its min-task-parallelism would be 25, so Tez would not try to
auto-reduce below 25 tasks. This serves as a safety net.
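The arithmetic of this example can be written as a tiny Python sketch. The
helper name is hypothetical, and it covers only the two factors from the
example: it does not model how Hive derives the initial reducer count from
the data size, and the floor of at least 1 task is an added assumption.

```python
from typing import Tuple


def hive_auto_reduce_bounds(initial_reducers: int, max_factor: float,
                            min_factor: float) -> Tuple[int, int]:
    """Hypothetical helper: returns (reducers the DAG is created with,
    min-task-parallelism hint passed to Tez)."""
    # Over-provision so Tez has room to merge partitions later.
    launched = int(initial_reducers * max_factor)
    # Safety net: Tez must not auto-reduce below this (floor of 1 assumed).
    min_parallelism = max(1, int(initial_reducers * min_factor))
    return launched, min_parallelism


print(hive_auto_reduce_bounds(100, 2.0, 0.25))  # (200, 25)
```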

In Tez, when a source task generates output, a DataMovementEvent is sent
out (via RPC), and its payload carries details such as the output size.
ShuffleVertexManager keeps aggregating these values from different source
tasks and periodically checks whether it can compute the value for
auto-reduce parallelism. If the aggregated data size is less than the
configured "desired-task-input-size", it waits for output stats from more
source tasks. By this time, the min-src-fraction may already have reached
its limit, but the min-src-fraction setting is dynamically overridden,
because it is better to wait for data from more tasks to get a more
accurate value for auto-parallelism.
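The aggregate-and-wait logic can be modeled with a small Python class. This
is a toy sketch under stated assumptions, not the ShuffleVertexManager API:
the class and method names are hypothetical, stats are assumed to arrive
once per source task, and the total output is assumed to scale linearly
with the number of source tasks.

```python
from dataclasses import dataclass, field
from typing import Dict


@dataclass
class OutputSizeAggregator:
    """Hypothetical model of per-source-task output-size aggregation."""
    total_src_tasks: int
    desired_task_input_size: int
    sizes: Dict[int, int] = field(default_factory=dict)

    def on_source_output(self, task_id: int, output_size: int) -> None:
        # Keyed by task id, so a repeated report for the same task replaces
        # the earlier value instead of being counted twice.
        self.sizes[task_id] = output_size

    def completed_output_size(self) -> int:
        return sum(self.sizes.values())

    def enough_stats(self) -> bool:
        # Keep waiting while less than one reducer's worth of data has been
        # seen and some source tasks have not reported yet, regardless of
        # whether min-src-fraction has already been reached.
        if len(self.sizes) >= self.total_src_tasks:
            return True
        return self.completed_output_size() >= self.desired_task_input_size

    def expected_total_output(self) -> int:
        # Linear extrapolation from the reported tasks to all source tasks.
        if not self.sizes:
            return 0
        return (self.completed_output_size() * self.total_src_tasks
                // len(self.sizes))


MB = 1024 * 1024
agg = OutputSizeAggregator(total_src_tasks=10, desired_task_input_size=100 * MB)
agg.on_source_output(0, 30 * MB)
agg.on_source_output(1, 30 * MB)
print(agg.enough_stats())                 # False: only 60 MB seen
agg.on_source_output(2, 60 * MB)
print(agg.enough_stats())                 # True: 120 MB seen
print(agg.expected_total_output() // MB)  # 400 (120 MB * 10 / 3)
```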

Depending on the amount of data emitted by the source tasks, the computed
auto-reduce value can be greater than the currently configured parallelism.
In such cases, the existing parallelism is used.

The following method contains the details of how parallelism is determined
at runtime:
https://github.com/apache/tez/blob/fd75e640396da8d5e1c67ef554d5db1846e08c69/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java#L669
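The decision described in this message (extrapolate the total output,
respect min-task-parallelism, never go above the current parallelism) can be
sketched in Python as follows. This is a simplified illustration, not a
transcription of the linked method: the function is hypothetical, and
merging contiguous runs of original partitions into each new task is an
assumption about how the reduced tasks are assigned their input.

```python
from typing import List, Tuple


def decide_parallelism(expected_total_output: int, desired_task_input_size: int,
                       min_task_parallelism: int, current_parallelism: int
                       ) -> Tuple[int, List[range]]:
    """Hypothetical helper: returns the new task count and, for each new
    task, the range of original partition indices it would read."""
    keep = (current_parallelism,
            [range(p, p + 1) for p in range(current_parallelism)])
    # Ceiling division: tasks needed at roughly desired_task_input_size each.
    desired = -(-expected_total_output // desired_task_input_size)
    desired = max(desired, min_task_parallelism, 1)
    if desired >= current_parallelism:
        return keep  # computed value is not smaller: keep existing parallelism
    width = current_parallelism // desired  # partitions merged per new task
    if width <= 1:
        return keep  # merging would not reduce the task count
    ranges = [range(start, min(start + width, current_parallelism))
              for start in range(0, current_parallelism, width)]
    return len(ranges), ranges


# 200 reducers launched, ~1000 MB expected, 100 MB desired, min hint 25:
n, ranges = decide_parallelism(1000, 100, 25, 200)
print(n, ranges[0], ranges[-1])  # 25 range(0, 8) range(192, 200)
```

Here the min-task-parallelism hint of 25 wins over the 10 tasks that the
desired input size alone would suggest, which is the safety-net behavior
Hive relies on.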

A source task can also send per-partition stats along with the
DataMovementEvent payload. Retaining all of these details in the payload
can be fairly expensive, so per-partition details are currently bucketed
into one of a few data ranges (0, 1, 10, 100, 1000 MB) and stored in a
RoaringBitmap in the payload. This can be a little noisy, but at least it
provides better hints to ShuffleVertexManager. Based on this information,
ShuffleVertexManager can schedule first the reducer task that would get the
maximum amount of data. This can be enabled via
"tez.runtime.report.partition.stats" (not enabled by default).
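A rough Python sketch of the bucketing idea follows. The bucket values come
from the text; assigning each partition to the largest boundary it reaches,
summing bucket values across source tasks, and using a plain Python set in
place of a RoaringBitmap are all assumptions made for illustration. All
function names are hypothetical.

```python
from typing import Dict, List, Set

# Bucket boundaries in MB, from the text; checked largest first (assumption).
BUCKETS_MB = [1000, 100, 10, 1, 0]


def bucket_for(size_mb: float) -> int:
    """Largest bucket boundary that the partition size reaches."""
    for b in BUCKETS_MB:
        if size_mb >= b:
            return b
    return 0


def encode_partition_stats(partition_sizes_mb: List[float]) -> Dict[int, Set[int]]:
    """Producer side: bucket -> set of partition ids (stand-in for a bitmap)."""
    stats: Dict[int, Set[int]] = {b: set() for b in BUCKETS_MB}
    for partition, size in enumerate(partition_sizes_mb):
        stats[bucket_for(size)].add(partition)
    return stats


def estimate_partition_sizes(payloads: List[Dict[int, Set[int]]],
                             num_partitions: int) -> List[int]:
    """Consumer side: sum each partition's bucket value over all sources."""
    estimate = [0] * num_partitions
    for stats in payloads:
        for bucket, partitions in stats.items():
            for p in partitions:
                estimate[p] += bucket
    return estimate


def schedule_order(estimate: List[int]) -> List[int]:
    """Reducers expected to receive the most data come first."""
    return sorted(range(len(estimate)), key=lambda p: estimate[p], reverse=True)


src_a = encode_partition_stats([0.5, 150.0, 12.0])
src_b = encode_partition_stats([2.0, 300.0, 0.1])
est = estimate_partition_sizes([src_a, src_b], 3)
print(est)                  # [1, 200, 10]
print(schedule_order(est))  # [1, 2, 0]
```

Because each partition is counted at its bucket's lower bound, the estimate
is coarse (150 MB and 300 MB both count as 100 MB), but it is still enough
to order reducers by expected input.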

~Rajesh.B

On Sat, Feb 27, 2016 at 11:45 AM, LLBian <li...@126.com> wrote:

> Oh, I saw some useful messages about statistics on data in TEZ-1167.
> Now, my main confusions are:
> (1) How does the reduce ShuffleVertexManager know how much sample data is
> enough to estimate the whole vertex parallelism?
> (2) What is the relationship between edge and event?
>
> I am eager to get your instruction.
> I would be very grateful for any reply.
>
> At 2016-02-27 11:13:48, "LLBian" <li...@126.com> wrote:
> >
> > Hello, respected experts:
> >
> > Recently I have been studying Tez reduce auto-parallelism. I read the
> > article "Apache Tez: Dynamic Graph Reconfiguration", TEZ-398 and
> > HIVE-7158.
> > I found that HIVE-7158 says "Tez can optionally sample data from a
> > fraction of the tasks of a vertex and use that information to choose the
> > number of downstream tasks for any given scatter gather edge".
> > I know how to use this optimization, but I was very confused by this:
> >
> > "Tez defines a VertexManager event that can be used to send an arbitrary
> > user payload to the vertex manager of a given vertex. The partitioning
> > tasks (say the Map tasks) use this event to send statistics such as the
> > size of the output partitions produced to the ShuffleVertexManager for the
> > reduce vertex. The manager receives these events and tries to model the
> > final output statistics that would be produced by the all the tasks."
> >
> > (1) How is the actual "sample data" implemented? I mean, how does the
> > reduce ShuffleVertexManager know how much sample data is enough to
> > estimate the whole vertex parallelism? Is that related to reduce
> > slow-start? I studied the source code of Apache Tez 0.7.0, but it is
> > still not very clear to me. Maybe I was too stupid to understand it.
> > (2) Do the partitioning tasks proactively send their output data stats
> > to the consumer ShuffleVertexManager? Is the event sent by RPC or HTTP?
> >
> > I am eager to get your instruction.
> >
> > I would be very grateful for any reply.
> >
> > LuluBian
>




Re: Re: How the actual "sample data" are implemented when using tez reduce auto-parallelism

Posted by Maria <li...@126.com>.
Thank you very, very, very much for your patient answers. I got it; this is very helpful for understanding the auto-parallelism optimization.
