You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2015/07/13 23:31:40 UTC
hadoop git commit: Revert "YARN-3878. AsyncDispatcher can hang while
stopping if it is configured for draining events on stop. (Varun Saxena via
kasha)"
Repository: hadoop
Updated Branches:
refs/heads/trunk 19295b36d -> 2466460d4
Revert "YARN-3878. AsyncDispatcher can hang while stopping if it is configured for draining events on stop. (Varun Saxena via kasha)"
This reverts commit aa067c6aa47b4c79577096817acc00ad6421180c.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/2466460d
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/2466460d
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/2466460d
Branch: refs/heads/trunk
Commit: 2466460d4cd13ad5837c044476b26e63082c1d37
Parents: 19295b3
Author: Jian He <ji...@apache.org>
Authored: Mon Jul 13 14:30:35 2015 -0700
Committer: Jian He <ji...@apache.org>
Committed: Mon Jul 13 14:30:35 2015 -0700
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 3 -
.../hadoop/yarn/event/AsyncDispatcher.java | 24 ++++----
.../hadoop/yarn/event/DrainDispatcher.java | 13 +----
.../hadoop/yarn/event/TestAsyncDispatcher.java | 61 --------------------
4 files changed, 14 insertions(+), 87 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2466460d/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 98f0e8d..5c17f04 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -648,9 +648,6 @@ Release 2.7.2 - UNRELEASED
YARN-3690. [JDK8] 'mvn site' fails. (Brahma Reddy Battula via aajisaka)
- YARN-3878. AsyncDispatcher can hang while stopping if it is configured for
- draining events on stop. (Varun Saxena via kasha)
-
Release 2.7.1 - 2015-07-06
INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2466460d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java
index 646611f..c54b9c7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java
@@ -55,6 +55,9 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
// stop functionality.
private volatile boolean drainEventsOnStop = false;
+ // Indicates all the remaining dispatcher's events on stop have been drained
+ // and processed.
+ private volatile boolean drained = true;
private Object waitForDrained = new Object();
// For drainEventsOnStop enabled only, block newly coming events into the
@@ -81,12 +84,13 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
@Override
public void run() {
while (!stopped && !Thread.currentThread().isInterrupted()) {
+ drained = eventQueue.isEmpty();
// blockNewEvents is only set when dispatcher is draining to stop,
// adding this check is to avoid the overhead of acquiring the lock
// and calling notify every time in the normal run of the loop.
if (blockNewEvents) {
synchronized (waitForDrained) {
- if (eventQueue.isEmpty()) {
+ if (drained) {
waitForDrained.notify();
}
}
@@ -135,7 +139,7 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
blockNewEvents = true;
LOG.info("AsyncDispatcher is draining to stop, igonring any new events.");
synchronized (waitForDrained) {
- while (!eventQueue.isEmpty() && eventHandlingThread.isAlive()) {
+ while (!drained && eventHandlingThread.isAlive()) {
waitForDrained.wait(1000);
LOG.info("Waiting for AsyncDispatcher to drain. Thread state is :" +
eventHandlingThread.getState());
@@ -219,21 +223,12 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
return handlerInstance;
}
- @VisibleForTesting
- protected boolean hasPendingEvents() {
- return !eventQueue.isEmpty();
- }
-
- @VisibleForTesting
- protected boolean isEventThreadWaiting() {
- return eventHandlingThread.getState() == Thread.State.WAITING;
- }
-
class GenericEventHandler implements EventHandler<Event> {
public void handle(Event event) {
if (blockNewEvents) {
return;
}
+ drained = false;
/* all this method does is enqueue all the events onto the queue */
int qSize = eventQueue.size();
@@ -290,4 +285,9 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
}
};
}
+
+ @VisibleForTesting
+ protected boolean isDrained() {
+ return this.drained;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2466460d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java
index d1f4fe9..da5ae44 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java
@@ -27,24 +27,15 @@ public class DrainDispatcher extends AsyncDispatcher {
this(new LinkedBlockingQueue<Event>());
}
- public DrainDispatcher(BlockingQueue<Event> eventQueue) {
+ private DrainDispatcher(BlockingQueue<Event> eventQueue) {
super(eventQueue);
}
/**
- * Wait till event thread enters WAITING state (i.e. waiting for new events).
- */
- public void waitForEventThreadToWait() {
- while (!isEventThreadWaiting()) {
- Thread.yield();
- }
- }
-
- /**
* Busy loop waiting for all queued events to drain.
*/
public void await() {
- while (hasPendingEvents()) {
+ while (!isDrained()) {
Thread.yield();
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2466460d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/TestAsyncDispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/TestAsyncDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/TestAsyncDispatcher.java
deleted file mode 100644
index ee17ddd..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/TestAsyncDispatcher.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.event;
-
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class TestAsyncDispatcher {
-
- /* This test checks whether dispatcher hangs on close if following two things
- * happen :
- * 1. A thread which was putting event to event queue is interrupted.
- * 2. Event queue is empty on close.
- */
- @SuppressWarnings({ "unchecked", "rawtypes" })
- @Test(timeout=10000)
- public void testDispatcherOnCloseIfQueueEmpty() throws Exception {
- BlockingQueue<Event> eventQueue = spy(new LinkedBlockingQueue<Event>());
- Event event = mock(Event.class);
- doThrow(new InterruptedException()).when(eventQueue).put(event);
- DrainDispatcher disp = new DrainDispatcher(eventQueue);
- disp.init(new Configuration());
- disp.setDrainEventsOnStop();
- disp.start();
- // Wait for event handler thread to start and begin waiting for events.
- disp.waitForEventThreadToWait();
- try {
- disp.getEventHandler().handle(event);
- } catch (YarnRuntimeException e) {
- }
- // Queue should be empty and dispatcher should not hang on close
- Assert.assertTrue("Event Queue should have been empty",
- eventQueue.isEmpty());
- disp.close();
- }
-}