You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by zj...@apache.org on 2015/07/13 20:59:43 UTC
[31/48] hadoop git commit: YARN-3878. AsyncDispatcher can hang while
stopping if it is configured for draining events on stop. (Varun Saxena via
kasha)
YARN-3878. AsyncDispatcher can hang while stopping if it is configured for draining events on stop. (Varun Saxena via kasha)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/73508ad1
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/73508ad1
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/73508ad1
Branch: refs/heads/YARN-2928
Commit: 73508ad1c3704384cfbbfbbc51a99e7c333e19ec
Parents: 48d290c
Author: Karthik Kambatla <ka...@apache.org>
Authored: Thu Jul 9 09:48:29 2015 -0700
Committer: Zhijie Shen <zj...@apache.org>
Committed: Mon Jul 13 11:51:14 2015 -0700
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 3 +
.../hadoop/yarn/event/AsyncDispatcher.java | 24 ++++----
.../hadoop/yarn/event/DrainDispatcher.java | 13 ++++-
.../hadoop/yarn/event/TestAsyncDispatcher.java | 61 ++++++++++++++++++++
4 files changed, 87 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/73508ad1/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index b43b51f..f1114c5 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -736,6 +736,9 @@ Release 2.7.2 - UNRELEASED
YARN-3690. [JDK8] 'mvn site' fails. (Brahma Reddy Battula via aajisaka)
+ YARN-3878. AsyncDispatcher can hang while stopping if it is configured for
+ draining events on stop. (Varun Saxena via kasha)
+
Release 2.7.1 - 2015-07-06
INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/73508ad1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java
index c54b9c7..646611f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java
@@ -55,9 +55,6 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
// stop functionality.
private volatile boolean drainEventsOnStop = false;
- // Indicates all the remaining dispatcher's events on stop have been drained
- // and processed.
- private volatile boolean drained = true;
private Object waitForDrained = new Object();
// For drainEventsOnStop enabled only, block newly coming events into the
@@ -84,13 +81,12 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
@Override
public void run() {
while (!stopped && !Thread.currentThread().isInterrupted()) {
- drained = eventQueue.isEmpty();
// blockNewEvents is only set when dispatcher is draining to stop,
// adding this check is to avoid the overhead of acquiring the lock
// and calling notify every time in the normal run of the loop.
if (blockNewEvents) {
synchronized (waitForDrained) {
- if (drained) {
+ if (eventQueue.isEmpty()) {
waitForDrained.notify();
}
}
@@ -139,7 +135,7 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
blockNewEvents = true;
LOG.info("AsyncDispatcher is draining to stop, igonring any new events.");
synchronized (waitForDrained) {
- while (!drained && eventHandlingThread.isAlive()) {
+ while (!eventQueue.isEmpty() && eventHandlingThread.isAlive()) {
waitForDrained.wait(1000);
LOG.info("Waiting for AsyncDispatcher to drain. Thread state is :" +
eventHandlingThread.getState());
@@ -223,12 +219,21 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
return handlerInstance;
}
+ @VisibleForTesting
+ protected boolean hasPendingEvents() {
+ return !eventQueue.isEmpty();
+ }
+
+ @VisibleForTesting
+ protected boolean isEventThreadWaiting() {
+ return eventHandlingThread.getState() == Thread.State.WAITING;
+ }
+
class GenericEventHandler implements EventHandler<Event> {
public void handle(Event event) {
if (blockNewEvents) {
return;
}
- drained = false;
/* all this method does is enqueue all the events onto the queue */
int qSize = eventQueue.size();
@@ -285,9 +290,4 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
}
};
}
-
- @VisibleForTesting
- protected boolean isDrained() {
- return this.drained;
- }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/73508ad1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java
index da5ae44..d1f4fe9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java
@@ -27,15 +27,24 @@ public class DrainDispatcher extends AsyncDispatcher {
this(new LinkedBlockingQueue<Event>());
}
- private DrainDispatcher(BlockingQueue<Event> eventQueue) {
+ public DrainDispatcher(BlockingQueue<Event> eventQueue) {
super(eventQueue);
}
/**
+ * Wait till event thread enters WAITING state (i.e. waiting for new events).
+ */
+ public void waitForEventThreadToWait() {
+ while (!isEventThreadWaiting()) {
+ Thread.yield();
+ }
+ }
+
+ /**
* Busy loop waiting for all queued events to drain.
*/
public void await() {
- while (!isDrained()) {
+ while (hasPendingEvents()) {
Thread.yield();
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/73508ad1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/TestAsyncDispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/TestAsyncDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/TestAsyncDispatcher.java
new file mode 100644
index 0000000..ee17ddd
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/TestAsyncDispatcher.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.event;
+
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestAsyncDispatcher {
+
+ /* Verifies that the dispatcher does not hang on close (the 10s test timeout
+ * turns a hang into a failure) when the following two things happen:
+ * 1. A thread that was putting an event onto the event queue is interrupted.
+ * 2. The event queue is empty on close.
+ */
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ @Test(timeout=10000)
+ public void testDispatcherOnCloseIfQueueEmpty() throws Exception {
+ BlockingQueue<Event> eventQueue = spy(new LinkedBlockingQueue<Event>());
+ Event event = mock(Event.class);
+ doThrow(new InterruptedException()).when(eventQueue).put(event); // simulate an interrupted producer
+ DrainDispatcher disp = new DrainDispatcher(eventQueue);
+ disp.init(new Configuration());
+ disp.setDrainEventsOnStop();
+ disp.start();
+ // Wait until the event handling thread has started and is idle, waiting for events.
+ disp.waitForEventThreadToWait();
+ try {
+ disp.getEventHandler().handle(event);
+ } catch (YarnRuntimeException e) { // ignored on purpose; presumably how handle() reports the interrupted put() - confirm
+ }
+ // The stubbed put() enqueued nothing, so the queue is empty and close() must not block.
+ Assert.assertTrue("Event Queue should have been empty",
+ eventQueue.isEmpty());
+ disp.close();
+ }
+}