Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2020/09/03 22:07:14 UTC

[GitHub] [druid] jihoonson opened a new issue #10352: Metrics reporting system for native parallel batch ingestion

jihoonson opened a new issue #10352:
URL: https://github.com/apache/druid/issues/10352


   ### Motivation
   
   Currently, the Parallel task doesn't provide any metrics, so you need to read through task logs when something goes wrong. Reading task logs is harder for the Parallel task because you first need to find out which subtask went wrong from the supervisor task logs and then read through that subtask's logs. Even when you find the right logs for a bad subtask, it can be hard to learn much, since task logs contain only limited information about what the task has been doing, which you have to interpret yourself instead of reading actual metrics. We already find these kinds of metrics useful for streaming ingestion; batch ingestion could benefit similarly.
   
   ### Proposed changes
   
   To help with easy debugging, native parallel batch ingestion should provide useful metrics. These metrics will be exposed via both the task reporting system and the metrics emitter.
   
   #### Task reports
   
   Both live and complete task reports will be provided: live reports while the ingestion task is running, and complete reports once the task is done.
   
   Each subtask report will include metrics for bytes in/out, rows in/filtered/unparseable/out, disk spills, fetch time, and errors. The supervisor task report will include per-phase metrics, which are mostly aggregates (such as averages) of the subtask metrics.
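
   For illustration only, the per-subtask values in such a report could be grouped into a simple value object like the sketch below; the class and field names are hypothetical and not part of this proposal:

   ```java
   import java.util.List;

   // Hypothetical sketch of the values carried in a single subtask report.
   // The class and field names are illustrative only; metric names are not final.
   public class SubTaskMetricsSnapshot
   {
     long bytesIn;          // bytes read from the input source
     long bytesOut;         // bytes written out (segments or shuffle files)
     long rowsIn;           // rows read
     long rowsFiltered;     // rows dropped by the transform/filter spec
     long rowsUnparseable;  // rows that failed to parse
     long rowsOut;          // rows written into segments
     int diskSpills;        // number of intermediate spills to disk
     long spillTimeMs;      // total time spent spilling
     long fetchTimeMs;      // time spent fetching input or shuffle data
     List<String> errors;   // error messages, if any
   }
   ```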
   
   ##### Live reports
   
   The live reports of the supervisor task will include:
   - Complete phases
     - Phase duration
     - Complete status (number of succeeded/failed subtasks)
     - Errors of the last _N_ failed subtasks
     - Average/min/max duration of succeeded subtasks
     - Average/min/max bytes/rows in/out of subtasks
     - Total bytes in/out of subtasks
     - Total rows in/filtered/unparseable/out of subtasks
     - Average/min/max number of disk spills of subtasks
     - Average/min/max fetch time of subtasks
     - Average/min/max number of created segments of subtasks
     - Total number of created segments
   - Current phase
     - Runtime of current phase
     - Progress (number of succeeded/failed/expected to succeed subtasks)
     - Errors of recently failed subtasks
     - Average/min/max duration of succeeded subtasks
     - Min/max bytes/rows in/out of succeeded subtasks
     - Moving average of running total of bytes/rows in/out of running subtasks
     - Running total of bytes in/out of subtasks (succeeded + running)
     - Running total of rows in/filtered/unparseable/out of subtasks (succeeded + running)
     - Average/min/max number of disk spills and spill time of succeeded subtasks
     - Average/min/max fetch time of succeeded subtasks
     - Average/min/max number of created segments of succeeded subtasks
     - Running total of number of created segments
   - Remaining phases
     - Remaining phase names
   
   The live reports of subtasks will include:
   - Moving average of bytes/rows in/out
   - Running total of bytes/rows in/filtered/unparseable/out
   - Running total of number of disk spills, average spill time
   - Fetch time
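
   The moving averages listed above could be maintained with something as simple as an exponentially weighted moving average. A minimal sketch follows; the class name and smoothing factor are my own assumptions, not part of the proposal:

   ```java
   // Minimal exponentially weighted moving average, e.g. for bytes/sec or rows/sec.
   // The smoothing factor alpha is an arbitrary choice for illustration.
   public class MovingAverage
   {
     private final double alpha;
     private double value = 0;
     private boolean initialized = false;

     public MovingAverage(double alpha)
     {
       this.alpha = alpha;
     }

     public synchronized void add(double sample)
     {
       // Seed with the first sample, then blend new samples into the running value.
       value = initialized ? alpha * sample + (1 - alpha) * value : sample;
       initialized = true;
     }

     public synchronized double get()
     {
       return value;
     }
   }
   ```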
   
   ##### Complete reports
   
   The complete reports of the supervisor task will include:
   - Duration of the supervisor task
   - Segment metrics
     - Total number of segments published
     - Average/min/max number of segments published per interval
     - Average/min/max number of rows per segment
   - Total bytes in/out
   - Total rows in/filtered/unparseable/out
   - Supervisor task error if any
   - Per-phase metrics
     - Phase duration
     - Total bytes in/out
     - Total rows in/filtered/unparseable/out
     - Number of succeeded/failed subtasks
     - Errors of the _N_ most recently failed subtasks
     - Average/min/max runtime of subtasks
     - Average/min/max number of total disk spills and spill time of subtasks
     - Average/min/max fetch time of subtasks
   
   The complete reports of the subtasks will include:
   - Total bytes in/out
   - Total rows in/filtered/unparseable/out
   - Total number of disk spills and spill time
   - Fetch time
   - Error if failed
   
   #### Metrics
   
   All of the above task metrics will also be emitted via the metrics emitter.
   
   MiddleManagers will additionally emit these metrics:
   - Moving average of shuffle bytes
   - Moving average of shuffle requests
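
   Assuming the existing `ServiceMetricEvent`/`ServiceEmitter` machinery is reused (an assumption on my part), emitting one of these metrics could look roughly like the sketch below. The metric name and dimensions are placeholders, since metric names will be added to the proposal later (see the comment below):

   ```java
   // Sketch only: assumes the ServiceMetricEvent builder API is reused.
   // "ingest/rows/unparseable" and the dimensions shown are placeholders.
   import org.apache.druid.java.util.emitter.service.ServiceEmitter;
   import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;

   public class SubTaskMetricsEmitter
   {
     private final ServiceEmitter emitter;

     public SubTaskMetricsEmitter(ServiceEmitter emitter)
     {
       this.emitter = emitter;
     }

     public void emitUnparseableRows(String taskId, String groupId, long count)
     {
       emitter.emit(
           ServiceMetricEvent.builder()
                             .setDimension("taskId", taskId)
                             .setDimension("groupId", groupId)
                             .build("ingest/rows/unparseable", count)
       );
     }
   }
   ```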
   
   #### Live reporting system for Parallel task
   
   ![live reporting system](https://user-images.githubusercontent.com/2322288/92177181-dcc62e00-edf4-11ea-811e-eb5c42f1537d.png)
   ![live reporting system (1)](https://user-images.githubusercontent.com/2322288/92177187-de8ff180-edf4-11ea-9766-c1b3b9fbde12.png)
   
   - Subtasks periodically send their live reports to the supervisor task
     - A failure to send a report could cause the subtask to fail, so reporting needs retries (see the sketch after this list).
     - Subtasks can report their current status with metrics directly to the supervisor task.
   - The supervisor task can use those reports as a heartbeat signal
      - If a report goes missing, the supervisor task will check with the Overlord to see whether the subtask has failed. If the subtask is still alive, the missing report should be noted in the supervisor task's live report. If the subtask has died, the supervisor task issues a new subtask as a retry.
      - The supervisor task immediately retries a failed subtask when it reports a failure; a subtask cannot succeed after reporting a failure.
      - When a subtask reports success, the supervisor task checks with the Overlord to confirm that it did succeed, since a subtask can still fail even after reporting success.
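
   A minimal sketch of the subtask-side report sender with retries is shown below; the endpoint, class name, and retry policy are all hypothetical and only illustrate the idea:

   ```java
   // Hypothetical sketch: a subtask periodically POSTs its live report to the
   // supervisor task and retries a few times before giving up. The endpoint
   // path and JSON payload are placeholders, not a finalized API.
   import java.net.URI;
   import java.net.http.HttpClient;
   import java.net.http.HttpRequest;
   import java.net.http.HttpResponse;

   public class LiveReportSender
   {
     private static final int MAX_RETRIES = 3;
     private final HttpClient client = HttpClient.newHttpClient();
     private final URI supervisorReportUri; // placeholder supervisor task endpoint

     public LiveReportSender(URI supervisorReportUri)
     {
       this.supervisorReportUri = supervisorReportUri;
     }

     /** Returns true if the report was accepted; retries transient failures. */
     public boolean send(String reportJson) throws InterruptedException
     {
       for (int attempt = 1; attempt <= MAX_RETRIES; attempt++) {
         try {
           HttpRequest request = HttpRequest.newBuilder(supervisorReportUri)
               .header("Content-Type", "application/json")
               .POST(HttpRequest.BodyPublishers.ofString(reportJson))
               .build();
           HttpResponse<Void> response =
               client.send(request, HttpResponse.BodyHandlers.discarding());
           if (response.statusCode() / 100 == 2) {
             return true;
           }
         }
         catch (java.io.IOException e) {
           // transient network error; fall through to retry
         }
         Thread.sleep(1000L * attempt); // simple backoff between attempts
       }
       return false; // the subtask may fail if it repeatedly cannot report
     }
   }
   ```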
   
   #### Complete reporting system for Parallel task
   
   - The final report is pushed both to deep storage and to the supervisor task
     - TaskReportFileWriter will be used to push reports to deep storage on the MiddleManager and Indexer
     - TaskRunnerListener can be used to send reports to the supervisor task from the Peon and Indexer
   
   ### Rationale
   
   #### Rationale for the list of metrics
   
   Live reports and metrics are mostly useful for debugging. The new metrics should be able to answer these questions.
   
   How is my ingestion going?
   - How long has my supervisor task been running?
   - How long does each subtask run for?
   - How many phases are left to run in my parallel ingestion?
   - How many subtasks are left to run in the current phase?
   - How much data is each subtask processing?
   
   Why is my ingestion slow?
   - Are there any intermittent subtask failures?
   - Is each subtask processing too much data?
   - In each subtask, are there too many disk spills?
   - Is shuffle slow?
   
   What was the last state of my succeeded ingestion?
   - How many segments did my ingestion create?
   - What was the total size of the created segments?
   - How many subtask failures were in my ingestion? What were those failures if any?
   - How long did my ingestion take?
   
   Why did my ingestion fail?
   - Were there any subtask failures? If so, what were the error messages?
   - Did the parallel task fail? If so, what was the error message?
   
   Why does my ingestion not create segments?
   - How many rows were in published segments?
   - How many rows were filtered out?
   - How many rows were unparseable?
   
   Why is my query slow after ingestion?
   - Are there too few rows per segment?
   - Are there too many segments per time chunk?
   
   #### More HTTP connections for live reporting system
   
   In the proposed live reporting system, each subtask needs to talk to its supervisor task over HTTP, which will result in more HTTP connections between tasks. However, I would like to go with this approach for now instead of making the connections between MiddleManagers, because:
   - Currently, there is another API in the supervisor task which every subtask calls directly to allocate new segments in dynamic partitioning. This API call cannot be delegated to the MiddleManager without introducing a new async API framework.
   - Even though there is already a supervisor task API called by every subtask, the number of HTTP connections shouldn't be that large in most cases. The number of HTTP connections is bounded by `maxNumConcurrentSubTasks` * `druid.global.http.numConnections` (20 by default); for example, 200 concurrent subtasks would open at most 200 * 20 = 4,000 connections. In general, `maxNumConcurrentSubTasks` doesn't go beyond 200 even in large clusters.
   - The concern with a high number of connections would be that too many connections might affect query performance in the data-node model, where a MiddleManager and a Historical live on the same machine. However, every subtask already makes connections to the supervisor task in dynamic partitioning, and I haven't heard of any problem with that yet.
   - If we did observe problems with a very large number of HTTP connections, there would still be workarounds.
     - We can adjust `maxNumConcurrentSubTasks` or `druid.global.http.numConnections`. Especially for `druid.global.http.numConnections`, I'm not sure why it defaults to 20 for Peons and MiddleManagers; we should consider lowering it.
     - Another workaround is using the Indexer. This problem doesn't exist with Indexers since the connections are made between Indexers, not between tasks.
   
   #### Additional memory pressure in the supervisor task
   
   The supervisor task will track metrics per phase, not per subtask (except for error messages of failed subtasks). The metrics for each phase are computed by aggregating subtask metrics whenever subtasks send their reports. As a result, the supervisor task needs to keep only roughly 20 metrics per phase in memory, which shouldn't be large.
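
   As a rough sketch of why the footprint stays small: the supervisor only needs to keep a running count/sum/min/max per metric and phase, independent of the number of subtasks. The names below are hypothetical:

   ```java
   // Hypothetical sketch: streaming aggregation of one subtask metric for a phase.
   // Only four numbers per metric are kept, regardless of how many subtasks report.
   public class PhaseMetricAggregate
   {
     private long count = 0;
     private double sum = 0;
     private double min = Double.POSITIVE_INFINITY;
     private double max = Double.NEGATIVE_INFINITY;

     public synchronized void add(double subTaskValue)
     {
       count++;
       sum += subTaskValue;
       min = Math.min(min, subTaskValue);
       max = Math.max(max, subTaskValue);
     }

     public synchronized double average()
     {
       return count == 0 ? 0 : sum / count;
     }

     public synchronized double min()
     {
       return min;
     }

     public synchronized double max()
     {
       return max;
     }
   }
   ```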
   
   ### Operational impact
   
   As described above, there will be two operational changes: more HTTP connections between Peons and additional memory usage in the supervisor task. Neither is expected to have a significant impact in operation.
   
   ### Test plan
   
   The live and complete reports should be tested in integration tests. I will perform some testing for metrics on our internal cluster.




[GitHub] [druid] loquisgon commented on issue #10352: Metrics reporting system for native parallel batch ingestion

loquisgon commented on issue #10352:
URL: https://github.com/apache/druid/issues/10352#issuecomment-853328406


   Putting the communication among tasks/indexer/supervisor in a streaming system (async) is an alternative to sync communication using HTTP.




[GitHub] [druid] liran-funaro commented on issue #10352: Metrics reporting system for native parallel batch ingestion

liran-funaro commented on issue #10352:
URL: https://github.com/apache/druid/issues/10352#issuecomment-688864472


   I'm delighted to see this proposal. I think adding these metrics is essential for users who need to analyze issues during ingestion.
   So far, to evaluate #10001, parsing logs has been the only method we could use to analyze resource consumption in our experiments and in production.
   I hope this proposal will gain the attention it deserves.




[GitHub] [druid] jihoonson commented on issue #10352: Metrics reporting system for native parallel batch ingestion

jihoonson commented on issue #10352:
URL: https://github.com/apache/druid/issues/10352#issuecomment-686789221


   I haven't come up with metrics names yet. Will add them to this proposal later.




[GitHub] [druid] mghosh4 commented on issue #10352: Metrics reporting system for native parallel batch ingestion

mghosh4 commented on issue #10352:
URL: https://github.com/apache/druid/issues/10352#issuecomment-717663587


   Thanks @jihoonson for working on this. This will be super useful. I had a few things that I wanted to draw your attention to:
   1. We have scenarios in our setup where `maxNumConcurrentSubTasks` can become as large as 700 or so. That said, I do believe having 20 `numConnections` is unnecessary considering these tasks will not receive any queries (batch ingestion). Do you foresee any issues at this scale?
   2. An alternative design I can think of is for the tasks to keep reporting their metrics to the Overlord (as a heartbeat), with the supervisor task polling the Overlord as it already does to check health status. It would mean slightly higher memory requirements on the Overlord side, since it might have to store this information, but the Overlord could probably store aggregates for most metrics. What other downsides do you see? One pro is that it does not add any new HTTP connection requirements.

