You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ka...@apache.org on 2015/07/09 18:48:38 UTC

hadoop git commit: YARN-3878. AsyncDispatcher can hang while stopping if it is configured for draining events on stop. (Varun Saxena via kasha)

Repository: hadoop
Updated Branches:
  refs/heads/trunk 527c40e4d -> aa067c6aa


YARN-3878. AsyncDispatcher can hang while stopping if it is configured for draining events on stop. (Varun Saxena via kasha)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/aa067c6a
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/aa067c6a
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/aa067c6a

Branch: refs/heads/trunk
Commit: aa067c6aa47b4c79577096817acc00ad6421180c
Parents: 527c40e
Author: Karthik Kambatla <ka...@apache.org>
Authored: Thu Jul 9 09:48:29 2015 -0700
Committer: Karthik Kambatla <ka...@apache.org>
Committed: Thu Jul 9 09:48:29 2015 -0700

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |  3 +
 .../hadoop/yarn/event/AsyncDispatcher.java      | 24 ++++----
 .../hadoop/yarn/event/DrainDispatcher.java      | 13 ++++-
 .../hadoop/yarn/event/TestAsyncDispatcher.java  | 61 ++++++++++++++++++++
 4 files changed, 87 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/aa067c6a/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 19f0854..3c232eb 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -624,6 +624,9 @@ Release 2.7.2 - UNRELEASED
 
     YARN-3690. [JDK8] 'mvn site' fails. (Brahma Reddy Battula via aajisaka)
 
+    YARN-3878. AsyncDispatcher can hang while stopping if it is configured for 
+    draining events on stop. (Varun Saxena via kasha)
+
 Release 2.7.1 - 2015-07-06 
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/aa067c6a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java
index c54b9c7..646611f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java
@@ -55,9 +55,6 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
   // stop functionality.
   private volatile boolean drainEventsOnStop = false;
 
-  // Indicates all the remaining dispatcher's events on stop have been drained
-  // and processed.
-  private volatile boolean drained = true;
   private Object waitForDrained = new Object();
 
   // For drainEventsOnStop enabled only, block newly coming events into the
@@ -84,13 +81,12 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
       @Override
       public void run() {
         while (!stopped && !Thread.currentThread().isInterrupted()) {
-          drained = eventQueue.isEmpty();
           // blockNewEvents is only set when dispatcher is draining to stop,
           // adding this check is to avoid the overhead of acquiring the lock
           // and calling notify every time in the normal run of the loop.
           if (blockNewEvents) {
             synchronized (waitForDrained) {
-              if (drained) {
+              if (eventQueue.isEmpty()) {
                 waitForDrained.notify();
               }
             }
@@ -139,7 +135,7 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
       blockNewEvents = true;
       LOG.info("AsyncDispatcher is draining to stop, igonring any new events.");
       synchronized (waitForDrained) {
-        while (!drained && eventHandlingThread.isAlive()) {
+        while (!eventQueue.isEmpty() && eventHandlingThread.isAlive()) {
           waitForDrained.wait(1000);
           LOG.info("Waiting for AsyncDispatcher to drain. Thread state is :" +
               eventHandlingThread.getState());
@@ -223,12 +219,21 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
     return handlerInstance;
   }
 
+  @VisibleForTesting
+  protected boolean hasPendingEvents() {
+    return !eventQueue.isEmpty();
+  }
+
+  @VisibleForTesting
+  protected boolean isEventThreadWaiting() {
+    return eventHandlingThread.getState() == Thread.State.WAITING;
+  }
+
   class GenericEventHandler implements EventHandler<Event> {
     public void handle(Event event) {
       if (blockNewEvents) {
         return;
       }
-      drained = false;
 
       /* all this method does is enqueue all the events onto the queue */
       int qSize = eventQueue.size();
@@ -285,9 +290,4 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
       }
     };
   }
-
-  @VisibleForTesting
-  protected boolean isDrained() {
-    return this.drained;
-  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/aa067c6a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java
index da5ae44..d1f4fe9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java
@@ -27,15 +27,24 @@ public class DrainDispatcher extends AsyncDispatcher {
     this(new LinkedBlockingQueue<Event>());
   }
 
-  private DrainDispatcher(BlockingQueue<Event> eventQueue) {
+  public DrainDispatcher(BlockingQueue<Event> eventQueue) {
     super(eventQueue);
   }
 
   /**
+   *  Wait till event thread enters WAITING state (i.e. waiting for new events).
+   */
+  public void waitForEventThreadToWait() {
+    while (!isEventThreadWaiting()) {
+      Thread.yield();
+    }
+  }
+
+  /**
    * Busy loop waiting for all queued events to drain.
    */
   public void await() {
-    while (!isDrained()) {
+    while (hasPendingEvents()) {
       Thread.yield();
     }
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/aa067c6a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/TestAsyncDispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/TestAsyncDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/TestAsyncDispatcher.java
new file mode 100644
index 0000000..ee17ddd
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/TestAsyncDispatcher.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.event;
+
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestAsyncDispatcher {
+
+  /* This test checks whether dispatcher hangs on close if following two things
+   * happen :
+   * 1. A thread which was putting event to event queue is interrupted.
+   * 2. Event queue is empty on close.
+   */
+  @SuppressWarnings({ "unchecked", "rawtypes" })
+  @Test(timeout=10000)
+  public void testDispatcherOnCloseIfQueueEmpty() throws Exception {
+    BlockingQueue<Event> eventQueue = spy(new LinkedBlockingQueue<Event>());
+    Event event = mock(Event.class);
+    doThrow(new InterruptedException()).when(eventQueue).put(event);
+    DrainDispatcher disp = new DrainDispatcher(eventQueue);
+    disp.init(new Configuration());
+    disp.setDrainEventsOnStop();
+    disp.start();
+    // Wait for event handler thread to start and begin waiting for events.
+    disp.waitForEventThreadToWait();
+    try {
+      disp.getEventHandler().handle(event);
+    } catch (YarnRuntimeException e) {
+    }
+    // Queue should be empty and dispatcher should not hang on close
+    Assert.assertTrue("Event Queue should have been empty",
+        eventQueue.isEmpty());
+    disp.close();
+  }
+}