Posted to yarn-issues@hadoop.apache.org by "Andras Gyori (Jira)" <ji...@apache.org> on 2021/03/19 16:11:00 UTC

[jira] [Comment Edited] (YARN-9927) RM multi-thread event processing mechanism

    [ https://issues.apache.org/jira/browse/YARN-9927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17304992#comment-17304992 ] 

Andras Gyori edited comment on YARN-9927 at 3/19/21, 4:10 PM:
--------------------------------------------------------------

Thank you [~hcarrot] for raising this issue and [~zhuqi] for stepping up to continue this undertaking.
 Here is my feedback along with some suggestions; they might not reflect the actual situation, so feel free to point out anything that is inaccurate. I do not think the current patch is the best approach to this problem. The points below incorporate the concerns already raised in this jira, as well as my own insights.

The single entry point for an event is AsyncDispatcher#handle, which puts the event into the eventQueue; the event is then processed asynchronously by a single thread. This entry point cannot be circumvented, because it is accessed as rmContext.getDispatcher() all over the codebase, so we must retain it.
 However, I strongly suspect that the performance bottleneck is actually the single AsyncDispatcher#eventQueue (a BlockingQueue). In my opinion, the solution is exactly what the documentation of AsyncDispatcher already suggests:
{code:java}
/**
  Dispatches {@link Event}s in a separate thread. Currently only single thread
  does that. Potentially there could be multiple channels for each event type
  class and a thread pool can be used to dispatch the events.
 */
{code}
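For context, this is roughly how the current single-queue flow looks (a simplified sketch I wrote for illustration only, not the actual Hadoop source):
{code:java}
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;

// Illustration of the status quo: one shared queue and one consumer thread,
// no matter how many event types are registered.
public class SingleThreadDispatcherSketch {
  private final BlockingQueue<Event> eventQueue = new LinkedBlockingQueue<>();
  private final Map<Class<? extends Enum>, EventHandler<Event>> eventHandlers;

  public SingleThreadDispatcherSketch(
      Map<Class<? extends Enum>, EventHandler<Event>> eventHandlers) {
    this.eventHandlers = eventHandlers;
    new Thread(this::processLoop).start();
  }

  // Every producer in the RM funnels its events through this single entry point.
  public void handle(Event event) throws InterruptedException {
    eventQueue.put(event);
  }

  // The single consumer thread drains the shared queue and dispatches by type.
  private void processLoop() {
    while (!Thread.currentThread().isInterrupted()) {
      try {
        Event event = eventQueue.take();
        eventHandlers.get(event.getType().getDeclaringClass()).handle(event);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }
  }
}
{code}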
My suggestion would be:
 # Store a new BlockingQueue for each event type in a HashMap
 # Create a new thread for each registered event type / eventQueue
 # Every thread is responsible for processing one eventQueue
 # The Dispatcher would map to an N:N:N (EventQueue:Thread:EventHandler) system (or an N:M:N system where M is smaller than N, in order to reduce the number of threads), where N is the number of EventTypes registered

A more fine-grained solution would be an M*N:M*N:N (EventQueue:Thread:EventHandler) system, where M is a number given at registration (how many threads should process that kind of event) and N is the number of EventTypes registered (as far as I can tell, the EventHandlers do not use locks internally and are thread safe). I am not sure this is feasible, though, because of the external state accessed from the EventHandlers (e.g. NodeEventHandler uses getRMNodes(), which is backed by a ConcurrentMap -> I think this is the feedback that was given by [~adam.antal] and [~epayne]). A sketch of this variant follows the dummy implementation below.

A dummy implementation of the aforementioned system would be:
{code:java}
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;

public class ThreadedDispatcher {
  // One queue, one handler and one dedicated consumer thread per event type.
  private final ConcurrentMap<Class<? extends Enum>, BlockingQueue<Event>> events =
      new ConcurrentHashMap<>();
  private final ConcurrentMap<Class<? extends Enum>, EventHandler<Event>> eventHandlers =
      new ConcurrentHashMap<>();
  private volatile boolean stopped = false;

  public void register(Class<? extends Enum> eventType, EventHandler<Event> handler) {
    events.put(eventType, new LinkedBlockingQueue<>());
    eventHandlers.put(eventType, handler);
    // Start (rather than run inline) one consumer thread per registered event type.
    new Thread(() -> {
      EventHandler<Event> eventHandler = eventHandlers.get(eventType);
      BlockingQueue<Event> eventQueue = events.get(eventType);
      while (!stopped && !Thread.currentThread().isInterrupted()) {
        try {
          eventHandler.handle(eventQueue.take());
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      }
    }).start();
  }

  // Single entry point, as today: route each event to the queue of its type.
  class GenericEventHandler implements EventHandler<Event> {
    public void handle(Event event) {
      try {
        events.get(event.getType().getDeclaringClass()).put(event);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }
  }
}
{code}
This would also be the least disruptive solution: simply swap the AsyncDispatcher for this ThreadedDispatcher and retain the single entry point of GenericEventHandler#handle. Ideally, nothing needs to change apart from the initialisation of the rmDispatcher.
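For the finer-grained M*N:M*N:N variant described above, register() could additionally accept a per-type thread count. A minimal sketch, written as a second overload of ThreadedDispatcher#register (the threadCount parameter is my own illustration, not part of the current patch):
{code:java}
// Sketch only: start M consumer threads for this event type, all sharing the
// same queue. Safe only if the registered EventHandler is truly thread safe.
public void register(Class<? extends Enum> eventType,
    EventHandler<Event> handler, int threadCount) {
  BlockingQueue<Event> eventQueue = new LinkedBlockingQueue<>();
  events.put(eventType, eventQueue);
  eventHandlers.put(eventType, handler);
  for (int i = 0; i < threadCount; i++) {
    new Thread(() -> {
      while (!stopped && !Thread.currentThread().isInterrupted()) {
        try {
          handler.handle(eventQueue.take());
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      }
    }).start();
  }
}
{code}
Note that with threadCount > 1, events of the same type can be processed concurrently and may complete out of order, so this only applies to handlers where ordering is not required and the external state they touch is properly synchronized (the concern raised above).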

cc: [~pbacsko]



> RM multi-thread event processing mechanism
> ------------------------------------------
>
>                 Key: YARN-9927
>                 URL: https://issues.apache.org/jira/browse/YARN-9927
>             Project: Hadoop YARN
>          Issue Type: Sub-task
>          Components: yarn
>    Affects Versions: 3.0.0, 2.9.2
>            Reporter: hcarrot
>            Assignee: Qi Zhu
>            Priority: Major
>         Attachments: RM multi-thread event processing mechanism.pdf, YARN-9927.001.patch
>
>
> Recently, we have observed serious event blocking in the RM event dispatcher queue. After analyzing RM event monitoring data and the RM event processing logic, we found that:
> 1) the environment is a cluster with thousands of nodes,
> 2) RMNodeStatusEvent accounts for 90% of the time consumed by the RM event scheduler, and
> 3) meanwhile, RM event processing runs in single-thread mode, which results in low headroom for the RM event scheduler and thus limits RM performance.
> So we propose an RM multi-thread event processing mechanism to improve RM performance.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
