You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ta...@apache.org on 2019/09/05 08:55:56 UTC

[hadoop] branch branch-3.1 updated: YARN-8995. Log events info in AsyncDispatcher when event queue size cumulatively reaches a certain number every time. Contributed by zhuqi.

This is an automated email from the ASF dual-hosted git repository.

taoyang pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new d74c069  YARN-8995. Log events info in AsyncDispatcher when event queue size cumulatively reaches a certain number every time. Contributed by zhuqi.
d74c069 is described below

commit d74c0694272dbe87675970497ece5983d48cf3bb
Author: Tao Yang <ta...@apache.org>
AuthorDate: Thu Sep 5 16:20:05 2019 +0800

    YARN-8995. Log events info in AsyncDispatcher when event queue size cumulatively reaches a certain number every time. Contributed by zhuqi.
---
 .../apache/hadoop/yarn/conf/YarnConfiguration.java | 14 ++++++
 .../apache/hadoop/yarn/event/AsyncDispatcher.java  | 39 +++++++++++++++
 .../src/main/resources/yarn-default.xml            | 13 +++++
 .../hadoop/yarn/event/TestAsyncDispatcher.java     | 57 ++++++++++++++++++++++
 4 files changed, 123 insertions(+)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index abf6fcf..124169c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -2279,6 +2279,20 @@ public class YarnConfiguration extends Configuration {
   public static final long DEFAULT_DISPATCHER_DRAIN_EVENTS_TIMEOUT = 300000;
 
   /**
+   * The threshold used to trigger the logging of event types and counts
+   *  in RM's main event dispatcher. Default value is 5000,
+   *  which means RM will print events info each time the queue size
+   *  reaches a multiple of 5000. Such info can be used to reveal which
+   *  kinds of events RM is mostly stuck processing,
+   *  which can help to narrow down certain performance issues.
+   */
+  public static final String
+          YARN_DISPATCHER_PRINT_EVENTS_INFO_THRESHOLD =
+          YARN_PREFIX + "dispatcher.print-events-info.threshold";
+  public static final int
+          DEFAULT_YARN_DISPATCHER_PRINT_EVENTS_INFO_THRESHOLD = 5000;
+
+  /**
    * CLASSPATH for YARN applications. A comma-separated list of CLASSPATH
    * entries
    */
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java
index 4a78a22..81448c5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java
@@ -24,11 +24,13 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.stream.Collectors;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.util.ShutdownHookManager;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -50,8 +52,13 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
 
   private final BlockingQueue<Event> eventQueue;
   private volatile int lastEventQueueSizeLogged = 0;
+  private volatile int lastEventDetailsQueueSizeLogged = 0;
   private volatile boolean stopped = false;
 
+  // Configuration for controlling the printing of queued event details.
+  private int detailsInterval;
+  private boolean printTrigger = false;
+
   // Configuration flag for enabling/disabling draining dispatcher's events on
   // stop functionality.
   private volatile boolean drainEventsOnStop = false;
@@ -124,6 +131,12 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
           }
           if (event != null) {
             dispatch(event);
+            if (printTrigger) {
+              // Log the type of the latest dispatched event,
+              // which may be the cause of too many events being queued
+              LOG.info("Latest dispatch event type: " + event.getType());
+              printTrigger = false;
+            }
           }
         }
       }
@@ -136,6 +149,15 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
   }
 
   @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    super.serviceInit(conf);
+    // Queue-size interval at which handle() logs per-type event counts.
+    this.detailsInterval = getConfig().getInt(
+        YarnConfiguration.YARN_DISPATCHER_PRINT_EVENTS_INFO_THRESHOLD,
+        YarnConfiguration.DEFAULT_YARN_DISPATCHER_PRINT_EVENTS_INFO_THRESHOLD);
+  }
+
+  @Override
   protected void serviceStart() throws Exception {
     //start all the components
     super.serviceStart();
@@ -243,6 +265,17 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
   }
 
   class GenericEventHandler implements EventHandler<Event> {
+    /** Logs, per event type, how many events are waiting in {@code queue}. */
+    private void printEventQueueDetails(BlockingQueue<Event> queue) {
+      // Group the events of the supplied queue by type and count each group.
+      Map<Enum, Long> counterMap = queue.stream().collect(
+          Collectors.groupingBy(e -> e.getType(), Collectors.counting()));
+      for (Map.Entry<Enum, Long> entry : counterMap.entrySet()) {
+        long num = entry.getValue();
+        LOG.info("Event type: " + entry.getKey()
+                + ", Event record counter: " + num);
+      }
+    }
     public void handle(Event event) {
       if (blockNewEvents) {
         return;
@@ -256,6 +289,12 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
         lastEventQueueSizeLogged = qSize;
         LOG.info("Size of event-queue is " + qSize);
       }
+      if (qSize != 0 && qSize % detailsInterval == 0
+              && lastEventDetailsQueueSizeLogged != qSize) {
+        lastEventDetailsQueueSizeLogged = qSize;
+        printEventQueueDetails(eventQueue);
+        printTrigger = true;
+      }
       int remCapacity = eventQueue.remainingCapacity();
       if (remCapacity < 1000) {
         LOG.warn("Very low remaining capacity in the event-queue: "
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index dfb6cbe..748329c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -108,6 +108,19 @@
   </property>
 
   <property>
+    <description>
+      The threshold used to trigger the logging of event types
+      and counts in RM's main event dispatcher. Default value is 5000,
+      which means RM will print events info each time the queue size
+      reaches a multiple of 5000. Such info can be used to reveal which kinds
+      of events RM is mostly stuck processing, which can help to
+      narrow down certain performance issues.
+    </description>
+    <name>yarn.dispatcher.print-events-info.threshold</name>
+    <value>5000</value>
+  </property>
+
+  <property>
     <description>The expiry interval for application master reporting.</description>
     <name>yarn.am.liveness-monitor.expiry-interval-ms</name>
     <value>600000</value>
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/TestAsyncDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/TestAsyncDispatcher.java
index 2b9d745..762e228 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/TestAsyncDispatcher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/TestAsyncDispatcher.java
@@ -18,9 +18,12 @@
 
 package org.apache.hadoop.yarn.event;
 
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 
+import org.slf4j.Logger;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
@@ -93,6 +96,20 @@ public class TestAsyncDispatcher {
     DUMMY
   }
 
+  private static class TestHandler implements EventHandler<Event> {
+    @Override
+    public void handle(Event event) {
+      try {
+        // Slow handler: lets the 10000 events sent by the test back up
+        Thread.sleep(1500);
+      } catch (InterruptedException e) {}
+    }
+  }
+
+  private enum TestEnum {
+    TestEventType // the single event type dispatched by the test
+  }
+
   @SuppressWarnings({ "rawtypes", "unchecked" })
   private void dispatchDummyEvents(Dispatcher disp, int count) {
     for (int i = 0; i < count; i++) {
@@ -119,5 +136,45 @@ public class TestAsyncDispatcher {
     disp.close();
     assertEquals(0, queue.size());
   }
+
+  // Tests that queued-event details get logged when the event queue backs up.
+  @Test(timeout = 10000)
+  public void testPrintDispatcherEventDetails() throws Exception {
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.setInt(YarnConfiguration.
+            YARN_DISPATCHER_PRINT_EVENTS_INFO_THRESHOLD, 5000);
+    Logger log = mock(Logger.class);
+    AsyncDispatcher dispatcher = new AsyncDispatcher();
+    dispatcher.init(conf);
+    // NOTE(review): confirm LOG's declared type accepts this slf4j mock.
+    Field logger = AsyncDispatcher.class.getDeclaredField("LOG");
+    logger.setAccessible(true);
+    Field modifiers = Field.class.getDeclaredField("modifiers");
+    modifiers.setAccessible(true);
+    modifiers.setInt(logger, logger.getModifiers() & ~Modifier.FINAL);
+    Object oldLog = logger.get(null);
+
+    try {
+      logger.set(null, log);
+      dispatcher.register(TestEnum.class, new TestHandler());
+      dispatcher.start();
+
+      for (int i = 0; i < 10000; ++i) {
+        Event event = mock(Event.class);
+        when(event.getType()).thenReturn(TestEnum.TestEventType);
+        dispatcher.getEventHandler().handle(event);
+      }
+      verify(log, atLeastOnce()).info("Event type: TestEventType, " +
+              "Event record counter: 5000");
+      Thread.sleep(2000);
+      // Give the 1500 ms handler time to finish dispatching one event
+      verify(log, atLeastOnce()).
+              info("Latest dispatch event type: TestEventType");
+    } finally {
+      // Restore the real logger; always stop the dispatcher, even on failure.
+      logger.set(null, oldLog);
+      dispatcher.stop();
+    }
+  }
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org