Posted to user@tez.apache.org by Grandl Robert <rg...@yahoo.com> on 2014/07/03 04:08:49 UTC

tez counters

Two high level questions:

1. Is it possible to use the updateProgressSplits() method in TaskAttemptImpl.java to get periodic statistics updates?
I was trying to do that for memory, but the values reported are different from the ones reported by updateResourceCounters() in TaskCounterUpdater.java.

2. We really need byte counters per task. Specifically: total input size, total output size, total input size read from network (or disk), total output size written to disk (or network).



More detailed explanations for each of them: 

Here is a list of counters we would like to have. Do you think it is possible
to have an implementation in Tez, or could you give us some feedback on how to
do it (e.g. which classes to touch, how to propagate the counters to a sensible
place, etc.)?


1. CPU, Memory
We need "cpu time spent by the task [ms]" and
"peak physical/virtual memory used[B/KB/...]". 
 
For CPU, there is a TaskCounter, CPU_MILLISECONDS, which is updated in the
updateResourceCounters() method of TaskCounterUpdater.java, but only at the end
of the task lifetime. Ideally, we would like it to be updated periodically over
the task lifetime.
 
For memory, there are PHYSICAL/VIRTUAL_MEMORY_BYTES counters in TaskCounter,
updated in the same location as CPU_MILLISECONDS. However, in order to observe
the peak usage, we would like to poll over the task lifetime.
 
Periodic polling can be done in updateProgressSplits()
method in TaskAttemptImpl.java. Can we use that to do 
periodic snapshots? 
 
I was trying to create a ResourceCalculatorProcessTree
object in updateProgressSplits() and capture the cpu/mem used by the 
whole subtree of processes. However, the values reported are
different from the ones reported in updateResourceCounters(). 
We suspect that either TaskAttemptImpl is a different
process or a parent of TaskCounterUpdater. Should
that matter? 
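
For illustration, peak tracking by polling could look like the sketch below.
It assumes the sampler runs inside the process being measured (replies in this
thread point out that TaskAttemptImpl runs in the AM, not in the task's JVM).
PeakTracker and its LongSupplier source are hypothetical names, not Tez
classes; the real samples would have to come from whatever the task-side code
uses to read memory.

```java
import java.util.function.LongSupplier;

/** Hypothetical helper: keeps the maximum of a periodically sampled value. */
final class PeakTracker {
    private final LongSupplier sampler; // assumed to return a non-negative byte count
    private long peak = 0;

    PeakTracker(LongSupplier sampler) {
        this.sampler = sampler;
    }

    /** Takes one sample, updates the running peak, and returns the sample. */
    synchronized long sample() {
        long value = sampler.getAsLong();
        if (value > peak) {
            peak = value;
        }
        return value;
    }

    /** Largest value seen so far, or 0 if sample() was never called. */
    synchronized long peak() {
        return peak;
    }
}
```

Calling sample() on every poll and reading peak() at the end of the task would
give the peak over all polls; anything that happens between two polls is
missed, so the poll interval bounds the accuracy.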
 
 
2. Network, Storage
We need the following counters per task: 
total input size, total output size, total input size read from network(or disk), total output size written
to disk(or network). 
 
Ideally, we would like the bytes read from network/disk or written to
disk/network to be reported periodically, so that we can compute task rates
over time. Otherwise, we want these values at the very end of the task, at
least.
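
As a sketch of the rate computation, assuming two snapshots of a cumulative
byte counter together with their wall-clock timestamps (ByteRate is a
hypothetical helper, not part of Tez):

```java
/** Hypothetical helper: average rate between two snapshots of a cumulative counter. */
final class ByteRate {
    private ByteRate() {}

    /**
     * @param prevBytes  counter value at the earlier snapshot
     * @param prevTimeMs timestamp of the earlier snapshot, in milliseconds
     * @param curBytes   counter value at the later snapshot
     * @param curTimeMs  timestamp of the later snapshot, in milliseconds
     * @return average bytes per second over the interval
     */
    static double bytesPerSecond(long prevBytes, long prevTimeMs,
                                 long curBytes, long curTimeMs) {
        long elapsedMs = curTimeMs - prevTimeMs;
        if (elapsedMs <= 0) {
            throw new IllegalArgumentException("snapshots must be in increasing time order");
        }
        return (curBytes - prevBytes) * 1000.0 / elapsedMs;
    }
}
```

With only end-of-task values, the same formula applied to the task start and
finish times gives a single average rate for the whole task.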
 
I looked at all the counters exposed in Tez. There are definitely no counters
specifically for network or disk. Also, I could not find any counter for input
size other than the INPUT_RECORDS_PROCESSED TaskCounter. However, I am not sure
how to translate this to bytes, or whether it is general enough for all types
of tasks (is the size of a record always 36 bytes?).
Similarly for output, there are two counters, OUTPUT_RECORDS and OUTPUT_BYTES,
but they are not present for all types of tasks. For example, for a Shuffle
Task these counters are not present at all.
 
Thanks,
Robert

RE: tez counters

Posted by Bikas Saha <bi...@hortonworks.com>.
The updateProgressSplits() code is dead and should be removed.



TaskCounterUpdater.java: there is a comment in that code saying explicitly
that it should be called once. It also says that it should be improved to allow
incremental updates. So you should probably open a jira for this. Most likely,
there may be perf issues in doing this more often.



Yes. TaskAttemptImpl class is running in the AppMaster to logically
represent a task. So it’s a different JVM.



For cpu/memory, the method is TaskCounterUpdater.updateResourceCounters(). So
it may be that updateCounters(), which updates file system counters, is
expensive but updateResourceCounters() is not that expensive. It may be
possible to call updateResourceCounters() more frequently, for example
periodically from the same location that currently calls getCounters() in
TaskReporter.java. That would update cpu/mem every second or so.

        if ((nonOobHeartbeatCounter - prevCounterSendHeartbeatNum) * pollInterval
            >= sendCounterInterval) {
          counters = task.getCounters();
          prevCounterSendHeartbeatNum = nonOobHeartbeatCounter;
        }
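
To make the interval check above easier to reason about, here is a
self-contained sketch of the same gating logic. HeartbeatCounterGate and
shouldSendCounters are hypothetical names, not the TaskReporter code; the
field names are copied from the snippet, and the units (milliseconds) are an
assumption.

```java
/** Hypothetical helper mirroring the interval check quoted above; not Tez code. */
final class HeartbeatCounterGate {
    private final long pollInterval;        // assumed: ms between regular heartbeats
    private final long sendCounterInterval; // assumed: minimum ms between counter sends
    private long prevCounterSendHeartbeatNum = 0;

    HeartbeatCounterGate(long pollInterval, long sendCounterInterval) {
        this.pollInterval = pollInterval;
        this.sendCounterInterval = sendCounterInterval;
    }

    /**
     * Returns true when enough regular (non-OOB) heartbeats have passed since
     * the last counter send, and records this heartbeat as the new send point.
     */
    boolean shouldSendCounters(long nonOobHeartbeatCounter) {
        if ((nonOobHeartbeatCounter - prevCounterSendHeartbeatNum) * pollInterval
                >= sendCounterInterval) {
            prevCounterSendHeartbeatNum = nonOobHeartbeatCounter;
            return true;
        }
        return false;
    }
}
```

The point at which this returns true is where a cheap cpu/memory refresh could
be done just before the counters are read, which is the change suggested above.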



You can get some input/output information from the following. These are
dependent on the task inputs. So when reading from HDFS you will get some
counters and while reading intermediate data you will get some counters.
These are legacy counters inherited from MR. Ideally, we would like to
replace these with Tez counters.



e.g.

Here are the counters for a “reduce” vertex that reads intermediate data
over network and writes to HDFS. Shuffle can almost always be assumed to be
over the network when happening over a non 1-1 edge.

2014-06-30 21:32:50,526 INFO [AsyncDispatcher event handler]
org.apache.tez.dag.history.HistoryEventHandler:
[HISTORY][DAG:dag_1404192318760_0002_1][Event:TASK_ATTEMPT_FINISHED]:
vertexName=summer,
taskAttemptId=attempt_1404192318760_0002_1_01_000000_0,
startTime=1404192769568, finishTime=1404192770520,
timeTaken=952, status=SUCCEEDED, diagnostics=, counters=Counters: 38,
File System Counters, FILE: BYTES_READ=1603, FILE: BYTES_WRITTEN=1603,
FILE: READ_OPS=0, FILE: LARGE_READ_OPS=0, FILE: WRITE_OPS=0,
HDFS: BYTES_READ=0, HDFS: BYTES_WRITTEN=1009, HDFS: READ_OPS=3,
HDFS: LARGE_READ_OPS=0, HDFS: WRITE_OPS=2,
org.apache.tez.common.counters.TaskCounter,
REDUCE_INPUT_GROUPS=87, REDUCE_INPUT_RECORDS=109, COMBINE_INPUT_RECORDS=0,
SPILLED_RECORDS=109, NUM_SHUFFLED_INPUTS=1,
NUM_SKIPPED_INPUTS=0, NUM_FAILED_SHUFFLE_INPUTS=0, MERGED_MAP_OUTPUTS=1,
GC_TIME_MILLIS=26, CPU_MILLISECONDS=-780,
PHYSICAL_MEMORY_BYTES=137244672,
VIRTUAL_MEMORY_BYTES=978984960,
COMMITTED_HEAP_BYTES=150994944, OUTPUT_RECORDS=88,
ADDITIONAL_SPILLS_BYTES_WRITTEN=1603,
ADDITIONAL_SPILLS_BYTES_READ=1603, SHUFFLE_BYTES=1603,
SHUFFLE_BYTES_DECOMPRESSED=1599, SHUFFLE_BYTES_TO_MEM=1603,
SHUFFLE_BYTES_TO_DISK=0, NUM_MEM_TO_DISK_MERGES=0,
NUM_DISK_TO_DISK_MERGES=0, Shuffle Errors, BAD_ID=0, CONNECTION=0,
IO_ERROR=0, WRONG_LENGTH=0, WRONG_MAP=0, WRONG_REDUCE=0



Similarly, here are the counters for a “map” vertex that reads from HDFS and
writes to local disk.

dag_1404192318760_0002_1][Event:TASK_ATTEMPT_FINISHED]:
vertexName=tokenizer,
taskAttemptId=attempt_1404192318760_0002_1_00_000000_0,
startTime=1404192765530, finishTime=1404192766540, timeTaken=1010,
status=SUCCEEDED, diagnostics=, counters=Counters: 25,
org.apache.tez.common.counters.DAGCounter, DATA_LOCAL_TASKS=1, File System
Counters, FILE: BYTES_READ=32, FILE: BYTES_WRITTEN=1635, FILE: READ_OPS=0,
FILE: LARGE_READ_OPS=0, FILE: WRITE_OPS=0, HDFS: BYTES_READ=991,
HDFS: BYTES_WRITTEN=0, HDFS: READ_OPS=1, HDFS: LARGE_READ_OPS=0, HDFS:
WRITE_OPS=0, org.apache.tez.common.counters.TaskCounter,
SPILLED_RECORDS=109, GC_TIME_MILLIS=93,
CPU_MILLISECONDS=-640, PHYSICAL_MEMORY_BYTES=266371072,
VIRTUAL_MEMORY_BYTES=978378752,
COMMITTED_HEAP_BYTES=204472320,
INPUT_RECORDS_PROCESSED=28, OUTPUT_RECORDS=109, OUTPUT_BYTES=1379,
OUTPUT_BYTES_WITH_OVERHEAD=1599,
OUTPUT_BYTES_PHYSICAL=1603, ADDITIONAL_SPILLS_BYTES_WRITTEN=0,
ADDITIONAL_SPILLS_BYTES_READ=0, ADDITIONAL_SPILL_COUNT=0





So the above counters cover HDFS read, local write, network read and HDFS
write. That should cover your cases?

The above logs are from the AppMaster syslog_dag_* file. You may be able to
get away with downloading and parsing this file to get your historical data.
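
A minimal sketch of such parsing, assuming each TASK_ATTEMPT_FINISHED event
sits on a single line in the file and that counters follow the
"counters=Counters: N, group, NAME=value, ..." layout shown in the samples
above. HistoryCounterParser is a hypothetical helper, not a Tez class.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical parser for the counters part of a TASK_ATTEMPT_FINISHED log line. */
final class HistoryCounterParser {
    private static final String MARKER = "counters=Counters:";

    private HistoryCounterParser() {}

    /**
     * Extracts NAME=value pairs that follow the counters marker.
     * Keys are not qualified by group, so "FILE: BYTES_READ" and
     * "HDFS: BYTES_READ" stay distinct only because of their scheme prefix.
     */
    static Map<String, Long> parse(String line) {
        Map<String, Long> counters = new LinkedHashMap<>();
        int start = line.indexOf(MARKER);
        if (start < 0) {
            return counters; // not a line that carries counters
        }
        String body = line.substring(start + MARKER.length());
        for (String token : body.split(",")) {
            int eq = token.lastIndexOf('=');
            if (eq < 0) {
                continue; // the counter count or a group name
            }
            String name = token.substring(0, eq).trim().replaceAll("\\s+", " ");
            String value = token.substring(eq + 1).trim();
            try {
                counters.put(name, Long.parseLong(value));
            } catch (NumberFormatException e) {
                // ignore tokens whose value is not a plain integer
            }
        }
        return counters;
    }
}
```

For the "summer" attempt above, looking up "HDFS: BYTES_WRITTEN" and
"SHUFFLE_BYTES" in the returned map would give the HDFS write and network read
sizes discussed in this thread.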




-- 
CONFIDENTIALITY NOTICE
NOTICE: This message is intended for the use of the individual or entity to 
which it is addressed and may contain information that is confidential, 
privileged and exempt from disclosure under applicable law. If the reader 
of this message is not the intended recipient, you are hereby notified that 
any printing, copying, dissemination, distribution, disclosure or 
forwarding of this communication is strictly prohibited. If you have 
received this communication in error, please contact the sender immediately 
and delete it from your system. Thank You.

Re: tez counters

Posted by "Jianfeng (Jeff) Zhang" <jz...@hortonworks.com>.
Hi Robert,

See my comments inline below and let us know if you have any more questions.

Best Regards,
Jeff Zhang



On Thu, Jul 3, 2014 at 10:08 AM, Grandl Robert <rg...@yahoo.com> wrote:

> Two high level questions:
>
> 1. It is possible to use updateProgressSplits() method in
> TaskAttemptImpl.java to get periodic statistic updates ?
> I was trying to do that for Memory, but the values reported are different
> from the ones reported in TaskCounterUpdater.java
> (updateResourceCounters()).
>
>
        First of all, you need to know that TaskAttemptImpl runs on the AM
side, while the actual task runs in another container (they are in different
JVMs). The task running in the container heartbeats to the AM to report its
status.
        The StatusUpdaterTransition, where updateProgressSplits() appears,
runs in the AM and is responsible for receiving the status of the task from
the other container.



> 2. We really need byte counters per task. Specifically: total input size,
> total output size, total input size read from network(or disk), total
> output size written to disk(or network).
>
>
> More detailed explanations for each of them:
>
> Here is a list of counters we would like to have. Do you think it is
> possible to have an
> implementation in Tez, or to give us some feedback on how to do it(e.g. -
> which classes to touch, how to propagate them to a sensible place, etc.) ?
>
> 1. CPU, Memory
> We need "cpu time spent by the task [ms]" and "peak physical/virtual
> memory used[B/KB/...]".
>
> For CPU, there is a TaskCounter CPU_MILLISECONDS which is updated in
> TaskCounterUpdater.java in
> a method updateResourceCounters() but only at the end of task lifetime.
> Ideally, we would like to be
> updated periodically over task lifetime.
>
>
       Currently, TaskCounterUpdater is designed to be invoked once rather
than to perform incremental updates.


> For Memory, there are PHYSICAL/VIRTUAL_MEMORY_BYTES counters in
> TaskCounter, updated in the same locations
> as CPU_MILLISECONDS. However, in order to observe the peak usage we would
> like to poll over task lifetime.
>
> Periodic polling can be done in updateProgressSplits() method in
> TaskAttemptImpl.java. Can we use that to do
> periodic snapshots?
>
> I was trying to create a ResourceCalculatorProcessTree object in
> updateProgressSplits() and capture the cpu/mem used by the
> whole subtree of processes. However, the values reported are different
> from the ones reported in updateResourceCounters().
> We suspect that either TaskAttemptImpl is a different process or a parent
> of TaskCounterUpdater. Should that matter?
>
       They are in different JVMs, as I said above, so the values are
expected to differ.


> 2. Network, Storage
> We need the following counters per task:
> total input size, total output size, total input size read from
> network(or disk), total output size written to disk(or network).
>
> Ideally, we would like the bytes read from network/disk or written to
> disk/network periodically. So we can compute task rates over time.
> Otherwise, we want these values at the very end of task, at least.
>
> I was looking to all the counters exposed in Tez. Definitely, there are no
> counters specifically for network or disk. Also, I could not find
> any counter for input size other than an INPUT_RECORDS_PROCESSED
> TaskCounter. However, I am not sure how to translate this to bytes or if
> it's general
> enough for all types of tasks (is the size of a record always 36 bytes
> ?). Similar for output, there are two counters OUTPUT_RECORDS and
> OUTPUT_BYTES, but
> they are not present for all types of tasks. For example, for a Shuffle
> Task these aforementioned counters are not present at all.
>
>
     For the input size, there's a workaround. If it is a root input, you can
get the input size using the fs command; if it is an intermediate result, it
should be equal to the sum of its parent vertex output sizes (see OUTPUT_BYTES
and OUTPUT_BYTES_PHYSICAL in TaskCounter).
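
A small sketch of that workaround for intermediate data, assuming the counters
of each parent task have already been collected into a map of counter name to
value. IntermediateInputEstimator is a hypothetical helper, not part of Tez,
and the result is a total for the whole consuming vertex, not for one task.

```java
import java.util.List;
import java.util.Map;

/** Hypothetical helper: sums one counter over all tasks of the parent vertex. */
final class IntermediateInputEstimator {
    private IntermediateInputEstimator() {}

    /**
     * @param parentTaskCounters one counter map per parent task
     * @param counterName        e.g. "OUTPUT_BYTES" or "OUTPUT_BYTES_PHYSICAL"
     * @return sum of the counter; tasks that lack the counter contribute 0
     */
    static long sumCounter(List<Map<String, Long>> parentTaskCounters, String counterName) {
        long total = 0;
        for (Map<String, Long> counters : parentTaskCounters) {
            total += counters.getOrDefault(counterName, 0L);
        }
        return total;
    }
}
```

In the sample logs earlier in this thread, the tokenizer task reports
OUTPUT_BYTES_PHYSICAL=1603 and the downstream summer task reports
SHUFFLE_BYTES=1603, which is consistent with this approach for a single
parent task.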


So overall, my suggestion is that you modify TaskCounterUpdater to support
periodic updates, and then propagate these counters to the AM through the
heartbeat.

Look at the method sendTaskGeneratedEvents in class
LogicalIOProcessorRuntimeTask. Currently it is only called in the initialize
and close stages. You may need a thread that updates the counters and calls
this method periodically to send the counters to the AM.
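
A rough sketch of such a thread, using a standard ScheduledExecutorService.
PeriodicCounterReporter is a hypothetical class; the two Runnables stand in
for "update the counters" and "send them to the AM", and how they would be
wired to TaskCounterUpdater and sendTaskGeneratedEvents is left open here.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical periodic reporter; the Runnables are placeholders for Tez calls. */
final class PeriodicCounterReporter implements AutoCloseable {
    private final ScheduledExecutorService scheduler =
        Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "counter-reporter");
            t.setDaemon(true); // do not keep the task JVM alive
            return t;
        });

    PeriodicCounterReporter(Runnable updateCounters, Runnable sendCounters, long periodMs) {
        scheduler.scheduleAtFixedRate(() -> {
            try {
                updateCounters.run();
                sendCounters.run();
            } catch (RuntimeException e) {
                // Swallowed on purpose: an uncaught exception would cancel all future runs.
            }
        }, periodMs, periodMs, TimeUnit.MILLISECONDS);
    }

    /** Stops the background thread; call this from the task's close stage. */
    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
```

A single scheduler thread keeps updates and sends ordered. The period is a
trade-off against the perf concerns raised earlier in the thread, so it is
probably best made configurable.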




> Thanks,
> Robert
>
