You are viewing a plain text version of this content. The canonical link to the original message was not preserved in this plain text copy.
Posted to common-commits@hadoop.apache.org by ar...@apache.org on 2015/07/14 22:25:56 UTC

[40/50] hadoop git commit: Revert "YARN-3878. AsyncDispatcher can hang while stopping if it is configured for draining events on stop. (Varun Saxena via kasha)"

Revert "YARN-3878. AsyncDispatcher can hang while stopping if it is configured for draining events on stop. (Varun Saxena via kasha)"

This reverts commit aa067c6aa47b4c79577096817acc00ad6421180c.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/2466460d
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/2466460d
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/2466460d

Branch: refs/heads/HDFS-7240
Commit: 2466460d4cd13ad5837c044476b26e63082c1d37
Parents: 19295b3
Author: Jian He <ji...@apache.org>
Authored: Mon Jul 13 14:30:35 2015 -0700
Committer: Jian He <ji...@apache.org>
Committed: Mon Jul 13 14:30:35 2015 -0700

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |  3 -
 .../hadoop/yarn/event/AsyncDispatcher.java      | 24 ++++----
 .../hadoop/yarn/event/DrainDispatcher.java      | 13 +----
 .../hadoop/yarn/event/TestAsyncDispatcher.java  | 61 --------------------
 4 files changed, 14 insertions(+), 87 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/2466460d/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 98f0e8d..5c17f04 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -648,9 +648,6 @@ Release 2.7.2 - UNRELEASED
 
     YARN-3690. [JDK8] 'mvn site' fails. (Brahma Reddy Battula via aajisaka)
 
-    YARN-3878. AsyncDispatcher can hang while stopping if it is configured for 
-    draining events on stop. (Varun Saxena via kasha)
-
 Release 2.7.1 - 2015-07-06 
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2466460d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java
index 646611f..c54b9c7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java
@@ -55,6 +55,9 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
   // stop functionality.
   private volatile boolean drainEventsOnStop = false;
 
+  // Indicates all the remaining dispatcher's events on stop have been drained
+  // and processed.
+  private volatile boolean drained = true;
   private Object waitForDrained = new Object();
 
   // For drainEventsOnStop enabled only, block newly coming events into the
@@ -81,12 +84,13 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
       @Override
       public void run() {
         while (!stopped && !Thread.currentThread().isInterrupted()) {
+          drained = eventQueue.isEmpty();
           // blockNewEvents is only set when dispatcher is draining to stop,
           // adding this check is to avoid the overhead of acquiring the lock
           // and calling notify every time in the normal run of the loop.
           if (blockNewEvents) {
             synchronized (waitForDrained) {
-              if (eventQueue.isEmpty()) {
+              if (drained) {
                 waitForDrained.notify();
               }
             }
@@ -135,7 +139,7 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
       blockNewEvents = true;
       LOG.info("AsyncDispatcher is draining to stop, igonring any new events.");
       synchronized (waitForDrained) {
-        while (!eventQueue.isEmpty() && eventHandlingThread.isAlive()) {
+        while (!drained && eventHandlingThread.isAlive()) {
           waitForDrained.wait(1000);
           LOG.info("Waiting for AsyncDispatcher to drain. Thread state is :" +
               eventHandlingThread.getState());
@@ -219,21 +223,12 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
     return handlerInstance;
   }
 
-  @VisibleForTesting
-  protected boolean hasPendingEvents() {
-    return !eventQueue.isEmpty();
-  }
-
-  @VisibleForTesting
-  protected boolean isEventThreadWaiting() {
-    return eventHandlingThread.getState() == Thread.State.WAITING;
-  }
-
   class GenericEventHandler implements EventHandler<Event> {
     public void handle(Event event) {
       if (blockNewEvents) {
         return;
       }
+      drained = false;
 
       /* all this method does is enqueue all the events onto the queue */
       int qSize = eventQueue.size();
@@ -290,4 +285,9 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
       }
     };
   }
+
+  @VisibleForTesting
+  protected boolean isDrained() {
+    return this.drained;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2466460d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java
index d1f4fe9..da5ae44 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java
@@ -27,24 +27,15 @@ public class DrainDispatcher extends AsyncDispatcher {
     this(new LinkedBlockingQueue<Event>());
   }
 
-  public DrainDispatcher(BlockingQueue<Event> eventQueue) {
+  private DrainDispatcher(BlockingQueue<Event> eventQueue) {
     super(eventQueue);
   }
 
   /**
-   *  Wait till event thread enters WAITING state (i.e. waiting for new events).
-   */
-  public void waitForEventThreadToWait() {
-    while (!isEventThreadWaiting()) {
-      Thread.yield();
-    }
-  }
-
-  /**
    * Busy loop waiting for all queued events to drain.
    */
   public void await() {
-    while (hasPendingEvents()) {
+    while (!isDrained()) {
       Thread.yield();
     }
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2466460d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/TestAsyncDispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/TestAsyncDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/TestAsyncDispatcher.java
deleted file mode 100644
index ee17ddd..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/TestAsyncDispatcher.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.event;
-
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class TestAsyncDispatcher {
-
-  /* This test checks whether dispatcher hangs on close if following two things
-   * happen :
-   * 1. A thread which was putting event to event queue is interrupted.
-   * 2. Event queue is empty on close.
-   */
-  @SuppressWarnings({ "unchecked", "rawtypes" })
-  @Test(timeout=10000)
-  public void testDispatcherOnCloseIfQueueEmpty() throws Exception {
-    BlockingQueue<Event> eventQueue = spy(new LinkedBlockingQueue<Event>());
-    Event event = mock(Event.class);
-    doThrow(new InterruptedException()).when(eventQueue).put(event);
-    DrainDispatcher disp = new DrainDispatcher(eventQueue);
-    disp.init(new Configuration());
-    disp.setDrainEventsOnStop();
-    disp.start();
-    // Wait for event handler thread to start and begin waiting for events.
-    disp.waitForEventThreadToWait();
-    try {
-      disp.getEventHandler().handle(event);
-    } catch (YarnRuntimeException e) {
-    }
-    // Queue should be empty and dispatcher should not hang on close
-    Assert.assertTrue("Event Queue should have been empty",
-        eventQueue.isEmpty());
-    disp.close();
-  }
-}