You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ta...@apache.org on 2019/09/05 08:55:56 UTC
[hadoop] branch branch-3.1 updated: YARN-8995. Log events info in
AsyncDispatcher when event queue size cumulatively reaches a certain number
every time. Contributed by zhuqi.
This is an automated email from the ASF dual-hosted git repository.
taoyang pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new d74c069 YARN-8995. Log events info in AsyncDispatcher when event queue size cumulatively reaches a certain number every time. Contributed by zhuqi.
d74c069 is described below
commit d74c0694272dbe87675970497ece5983d48cf3bb
Author: Tao Yang <ta...@apache.org>
AuthorDate: Thu Sep 5 16:20:05 2019 +0800
YARN-8995. Log events info in AsyncDispatcher when event queue size cumulatively reaches a certain number every time. Contributed by zhuqi.
---
.../apache/hadoop/yarn/conf/YarnConfiguration.java | 14 ++++++
.../apache/hadoop/yarn/event/AsyncDispatcher.java | 39 +++++++++++++++
.../src/main/resources/yarn-default.xml | 13 +++++
.../hadoop/yarn/event/TestAsyncDispatcher.java | 57 ++++++++++++++++++++++
4 files changed, 123 insertions(+)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index abf6fcf..124169c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -2279,6 +2279,20 @@ public class YarnConfiguration extends Configuration {
public static final long DEFAULT_DISPATCHER_DRAIN_EVENTS_TIMEOUT = 300000;
/**
+ * Queue-size threshold that triggers the logging of event types and their
+ * counts in RM's main event dispatcher. The default value is 5000,
+ * which means RM prints the events info each time the queue size
+ * reaches a non-zero multiple of 5000. Such info can be used to reveal
+ * which kinds of events RM is mostly stuck processing, which
+ * can help to narrow down certain performance issues.
+ */
+ public static final String
+ YARN_DISPATCHER_PRINT_EVENTS_INFO_THRESHOLD =
+ YARN_PREFIX + "dispatcher.print-events-info.threshold";
+ public static final int
+ DEFAULT_YARN_DISPATCHER_PRINT_EVENTS_INFO_THRESHOLD = 5000;
+
+ /**
* CLASSPATH for YARN applications. A comma-separated list of CLASSPATH
* entries
*/
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java
index 4a78a22..81448c5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java
@@ -24,11 +24,13 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.stream.Collectors;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -50,8 +52,13 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
private final BlockingQueue<Event> eventQueue;
private volatile int lastEventQueueSizeLogged = 0;
+ private volatile int lastEventDetailsQueueSizeLogged = 0;
private volatile boolean stopped = false;
+ // Queue-size interval at which the per-type event counts are printed;
+ // read from the configuration in serviceInit().
+ private int detailsInterval;
+ // Raised by GenericEventHandler.handle() after the event details were
+ // printed and cleared by the dispatch loop once it has logged the next
+ // dispatched event type. Volatile so that a write made by the thread
+ // calling handle() is visible to the thread running the dispatch loop.
+ private volatile boolean printTrigger = false;
+
// Configuration flag for enabling/disabling draining dispatcher's events on
// stop functionality.
private volatile boolean drainEventsOnStop = false;
@@ -124,6 +131,12 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
}
if (event != null) {
dispatch(event);
+ if (printTrigger) {
+ // Log the type of the event that was just dispatched; handling it
+ // may be what caused so many events to queue up.
+ LOG.info("Latest dispatch event type: " + event.getType());
+ printTrigger = false;
+ }
}
}
}
@@ -136,6 +149,15 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
}
+ /**
+  * Initializes the service and reads the queue-size threshold that
+  * controls how often the per-type event counts are logged.
+  *
+  * @param conf configuration passed on to the parent initialization
+  * @throws Exception if the parent initialization fails
+  */
@Override
+ protected void serviceInit(Configuration conf) throws Exception {
+   super.serviceInit(conf);
+   int threshold = getConfig().getInt(YarnConfiguration.
+       YARN_DISPATCHER_PRINT_EVENTS_INFO_THRESHOLD,
+       YarnConfiguration.
+       DEFAULT_YARN_DISPATCHER_PRINT_EVENTS_INFO_THRESHOLD);
+   if (threshold <= 0) {
+     // handle() computes "qSize % detailsInterval", which throws
+     // ArithmeticException for a zero threshold, so reject values
+     // that cannot describe an interval.
+     LOG.warn("Invalid value " + threshold + " for "
+         + YarnConfiguration.YARN_DISPATCHER_PRINT_EVENTS_INFO_THRESHOLD
+         + ", falling back to default " + YarnConfiguration.
+         DEFAULT_YARN_DISPATCHER_PRINT_EVENTS_INFO_THRESHOLD);
+     threshold = YarnConfiguration.
+         DEFAULT_YARN_DISPATCHER_PRINT_EVENTS_INFO_THRESHOLD;
+   }
+   this.detailsInterval = threshold;
+ }
+
+ @Override
protected void serviceStart() throws Exception {
//start all the components
super.serviceStart();
@@ -243,6 +265,17 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
}
class GenericEventHandler implements EventHandler<Event> {
+ /**
+  * Logs, for every event type present in the given queue, how many
+  * events of that type it currently holds.
+  *
+  * @param queue the event queue to summarize
+  */
+ private void printEventQueueDetails(BlockingQueue<Event> queue) {
+   // Group the events of the queue that was passed in by their type,
+   // rather than reaching for the enclosing dispatcher's field.
+   Map<Enum, Long> counterMap = queue.stream().
+       collect(Collectors.
+           groupingBy(e -> e.getType(), Collectors.counting())
+       );
+   for (Map.Entry<Enum, Long> entry : counterMap.entrySet()) {
+     LOG.info("Event type: " + entry.getKey()
+         + ", Event record counter: " + entry.getValue());
+   }
+ }
public void handle(Event event) {
if (blockNewEvents) {
return;
@@ -256,6 +289,12 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
lastEventQueueSizeLogged = qSize;
LOG.info("Size of event-queue is " + qSize);
}
+ if (qSize != 0 && qSize % detailsInterval == 0
+ && lastEventDetailsQueueSizeLogged != qSize) {
+ lastEventDetailsQueueSizeLogged = qSize;
+ printEventQueueDetails(eventQueue);
+ printTrigger = true;
+ }
int remCapacity = eventQueue.remainingCapacity();
if (remCapacity < 1000) {
LOG.warn("Very low remaining capacity in the event-queue: "
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index dfb6cbe..748329c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -108,6 +108,19 @@
</property>
<property>
+ <description>
+ The threshold used to trigger the logging of event types
+ and counts in RM's main event dispatcher. Default value is 5000,
+ which means RM will print events info each time the queue size
+ reaches a non-zero multiple of 5000. Such info can be used to reveal
+ which kinds of events RM is mostly stuck processing, which can help to
+ narrow down certain performance issues.
+ </description>
+ <name>yarn.dispatcher.print-events-info.threshold</name>
+ <value>5000</value>
+ </property>
+
+ <property>
<description>The expiry interval for application master reporting.</description>
<name>yarn.am.liveness-monitor.expiry-interval-ms</name>
<value>600000</value>
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/TestAsyncDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/TestAsyncDispatcher.java
index 2b9d745..762e228 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/TestAsyncDispatcher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/TestAsyncDispatcher.java
@@ -18,9 +18,12 @@
package org.apache.hadoop.yarn.event;
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
+import org.slf4j.Logger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
@@ -93,6 +96,20 @@ public class TestAsyncDispatcher {
DUMMY
}
+ /**
+  * Event handler that blocks for 1.5 seconds on every event, so that
+  * events are consumed far more slowly than the test enqueues them.
+  */
+ private static class TestHandler implements EventHandler<Event> {
+   @Override
+   public void handle(Event event) {
+     try {
+       // Stall the thread handling the event so that the 10000 events
+       // enqueued by the test pile up in the queue.
+       Thread.sleep(1500);
+     } catch (InterruptedException e) {
+       // Do not swallow the interrupt: restore the flag so the calling
+       // thread can still observe that it was asked to stop.
+       Thread.currentThread().interrupt();
+     }
+   }
+ }
+
+ // Event type returned by the mock events of
+ // testPrintDispatcherEventDetails; its constant name appears in the
+ // log messages that the test verifies.
+ private enum TestEnum {
+ TestEventType
+ }
+
@SuppressWarnings({ "rawtypes", "unchecked" })
private void dispatchDummyEvents(Dispatcher disp, int count) {
for (int i = 0; i < count; i++) {
@@ -119,5 +136,45 @@ public class TestAsyncDispatcher {
disp.close();
assertEquals(0, queue.size());
}
+
+ /**
+  * Verifies that the dispatcher logs the per-type event counts once the
+  * queue size reaches the configured threshold, and that it afterwards
+  * logs the type of the latest dispatched event.
+  */
+ @Test(timeout = 10000)
+ public void testPrintDispatcherEventDetails() throws Exception {
+   YarnConfiguration conf = new YarnConfiguration();
+   conf.setInt(YarnConfiguration.
+       YARN_DISPATCHER_PRINT_EVENTS_INFO_THRESHOLD, 5000);
+   // AsyncDispatcher on this branch imports commons-logging's Log and
+   // LogFactory, so the replacement logger is a commons-logging Log;
+   // Field.set() rejects a value whose type does not match the field.
+   org.apache.commons.logging.Log log =
+       mock(org.apache.commons.logging.Log.class);
+   AsyncDispatcher dispatcher = new AsyncDispatcher();
+   dispatcher.init(conf);
+
+   // Make the static final LOG field writable so the mock can be
+   // swapped in (relies on the "modifiers" field of java.lang.reflect.Field).
+   Field logger = AsyncDispatcher.class.getDeclaredField("LOG");
+   logger.setAccessible(true);
+   Field modifiers = Field.class.getDeclaredField("modifiers");
+   modifiers.setAccessible(true);
+   modifiers.setInt(logger, logger.getModifiers() & ~Modifier.FINAL);
+   Object oldLog = logger.get(null);
+
+   try {
+     logger.set(null, log);
+     dispatcher.register(TestEnum.class, new TestHandler());
+     dispatcher.start();
+
+     // All events are identical, so one mock is created up front and
+     // enqueued repeatedly instead of building 10000 mocks.
+     Event event = mock(Event.class);
+     when(event.getType()).thenReturn(TestEnum.TestEventType);
+     for (int i = 0; i < 10000; ++i) {
+       dispatcher.getEventHandler().handle(event);
+     }
+     verify(log, atLeastOnce()).info("Event type: TestEventType, " +
+         "Event record counter: 5000");
+     // TestHandler sleeps 1.5 seconds per event; wait long enough for
+     // at least one dispatch to complete.
+     Thread.sleep(2000);
+     verify(log, atLeastOnce()).
+         info("Latest dispatch event type: TestEventType");
+   } finally {
+     try {
+       // Stop the dispatcher even when a verification failed, so its
+       // thread does not outlive the test.
+       dispatcher.stop();
+     } finally {
+       // Always restore the original logger object.
+       logger.set(null, oldLog);
+     }
+   }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org