You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by pb...@apache.org on 2021/05/11 13:18:44 UTC
[hadoop] branch branch-3.1 updated: YARN-10642. Race condition:
AsyncDispatcher can get stuck by the changes introduced in YARN-8995.
Contributed by zhengchenyu.
This is an automated email from the ASF dual-hosted git repository.
pbacsko pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 4cd1596 YARN-10642. Race condition: AsyncDispatcher can get stuck by the changes introduced in YARN-8995. Contributed by zhengchenyu.
4cd1596 is described below
commit 4cd1596fe7c8d80d7124a517ea6cdc3b3df90292
Author: Peter Bacsko <pb...@cloudera.com>
AuthorDate: Tue May 11 15:18:16 2021 +0200
YARN-10642. Race condition: AsyncDispatcher can get stuck by the changes introduced in YARN-8995. Contributed by zhengchenyu.
---
.../apache/hadoop/yarn/event/AsyncDispatcher.java | 19 ++++---
.../hadoop/yarn/event/TestAsyncDispatcher.java | 58 +++++++++++++++++++++-
2 files changed, 68 insertions(+), 9 deletions(-)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java
index 81448c5..acfba27 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java
@@ -20,11 +20,11 @@ package org.apache.hadoop.yarn.event;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
-import java.util.stream.Collectors;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -265,11 +265,16 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
}
class GenericEventHandler implements EventHandler<Event> {
- private void printEventQueueDetails(BlockingQueue<Event> queue) {
- Map<Enum, Long> counterMap = eventQueue.stream().
- collect(Collectors.
- groupingBy(e -> e.getType(), Collectors.counting())
- );
+ private void printEventQueueDetails() {
+ Iterator<Event> iterator = eventQueue.iterator();
+ Map<Enum, Long> counterMap = new HashMap<>();
+ while (iterator.hasNext()) {
+ Enum eventType = iterator.next().getType();
+ if (!counterMap.containsKey(eventType)) {
+ counterMap.put(eventType, 0L);
+ }
+ counterMap.put(eventType, counterMap.get(eventType) + 1);
+ }
for (Map.Entry<Enum, Long> entry : counterMap.entrySet()) {
long num = entry.getValue();
LOG.info("Event type: " + entry.getKey()
@@ -292,7 +297,7 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
if (qSize != 0 && qSize % detailsInterval == 0
&& lastEventDetailsQueueSizeLogged != qSize) {
lastEventDetailsQueueSizeLogged = qSize;
- printEventQueueDetails(eventQueue);
+ printEventQueueDetails();
printTrigger = true;
}
int remCapacity = eventQueue.remainingCapacity();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/TestAsyncDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/TestAsyncDispatcher.java
index 53e2d05..88855fc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/TestAsyncDispatcher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/TestAsyncDispatcher.java
@@ -97,12 +97,23 @@ public class TestAsyncDispatcher {
}
private static class TestHandler implements EventHandler<Event> {
+
+ private long sleepTime = 1500;
+
+ TestHandler() {
+ }
+
+ TestHandler(long sleepTime) {
+ this.sleepTime = sleepTime;
+ }
+
@Override
public void handle(Event event) {
try {
// As long as 10000 events queued
- Thread.sleep(1500);
- } catch (InterruptedException e) {}
+ Thread.sleep(this.sleepTime);
+ } catch (InterruptedException e) {
+ }
}
}
@@ -170,11 +181,54 @@ public class TestAsyncDispatcher {
//Make sure more than one event to take
verify(log, atLeastOnce()).
info("Latest dispatch event type: TestEventType");
+ } finally {
+ //... restore logger object
+ logger.set(null, oldLog);
dispatcher.stop();
+ }
+ }
+
+ //Test print dispatcher details when the blocking queue is heavy
+ @Test(timeout = 60000)
+ public void testPrintDispatcherEventDetailsAvoidDeadLoop() throws Exception {
+ for (int i = 0; i < 5; i++) {
+ testPrintDispatcherEventDetailsAvoidDeadLoopInternal();
+ }
+ }
+
+ public void testPrintDispatcherEventDetailsAvoidDeadLoopInternal()
+ throws Exception {
+ YarnConfiguration conf = new YarnConfiguration();
+ conf.setInt(YarnConfiguration.
+ YARN_DISPATCHER_PRINT_EVENTS_INFO_THRESHOLD, 10);
+ Log log = mock(Log.class);
+ AsyncDispatcher dispatcher = new AsyncDispatcher();
+ dispatcher.init(conf);
+
+ Field logger = AsyncDispatcher.class.getDeclaredField("LOG");
+ logger.setAccessible(true);
+ Field modifiers = Field.class.getDeclaredField("modifiers");
+ modifiers.setAccessible(true);
+ modifiers.setInt(logger, logger.getModifiers() & ~Modifier.FINAL);
+ Object oldLog = logger.get(null);
+
+ try {
+ logger.set(null, log);
+ dispatcher.register(TestEnum.class, new TestHandler(0));
+ dispatcher.start();
+
+ for (int i = 0; i < 10000; ++i) {
+ Event event = mock(Event.class);
+ when(event.getType()).thenReturn(TestEnum.TestEventType);
+ dispatcher.getEventHandler().handle(event);
+ }
+ Thread.sleep(3000);
} finally {
//... restore logger object
logger.set(null, oldLog);
+ dispatcher.stop();
}
}
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org