You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by zj...@apache.org on 2015/01/02 20:09:47 UTC

hadoop git commit: YARN-2991. Fixed DrainDispatcher to reuse the draining code path in AsyncDispatcher. Contributed by Rohith Sharmaks.

Repository: hadoop
Updated Branches:
  refs/heads/trunk 0c45946e6 -> 947578c1c


YARN-2991. Fixed DrainDispatcher to reuse the draining code path in AsyncDispatcher. Contributed by Rohith Sharmaks.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/947578c1
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/947578c1
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/947578c1

Branch: refs/heads/trunk
Commit: 947578c1c1413f9043ceb1e87df6a97df048e854
Parents: 0c45946
Author: Zhijie Shen <zj...@apache.org>
Authored: Fri Jan 2 11:08:47 2015 -0800
Committer: Zhijie Shen <zj...@apache.org>
Committed: Fri Jan 2 11:08:47 2015 -0800

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |  3 ++
 .../hadoop/yarn/event/AsyncDispatcher.java      |  7 +++
 .../hadoop/yarn/event/DrainDispatcher.java      | 50 +-------------------
 3 files changed, 11 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/947578c1/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 01df44f..e6694f1 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -305,6 +305,9 @@ Release 2.7.0 - UNRELEASED
     YARN-2987. Fixed ClientRMService#getQueueInfo to check against queue and
     app ACLs. (Varun Saxena via jianhe)
 
+    YARN-2991. Fixed DrainDispatcher to reuse the draining code path in
+    AsyncDispatcher. (Rohith Sharmaks via zjshen)
+
 Release 2.6.0 - 2014-11-18
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/947578c1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java
index 28be6ac..d36d841 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java
@@ -34,6 +34,8 @@ import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.util.ShutdownHookManager;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * Dispatches {@link Event}s in a separate thread. Currently only single thread
  * does that. Potentially there could be multiple channels for each event type
@@ -282,4 +284,9 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
       }
     };
   }
+
+  @VisibleForTesting
+  protected boolean isDrained() {
+    return this.drained;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/947578c1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java
index 803b2bb..da5ae44 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java
@@ -23,68 +23,20 @@ import java.util.concurrent.LinkedBlockingQueue;
 @SuppressWarnings("rawtypes")
 public class DrainDispatcher extends AsyncDispatcher {
 
-// flagrant initialize abuse throughout, but safe per
-// http://java.sun.com/docs/books/jls/third_edition/html/typesValues.html#96595
-// and similar grotesqueries
-  private volatile boolean drained = false;
-  private final BlockingQueue<Event> queue;
-  final Object mutex;
-
   public DrainDispatcher() {
     this(new LinkedBlockingQueue<Event>());
   }
 
   private DrainDispatcher(BlockingQueue<Event> eventQueue) {
     super(eventQueue);
-    this.queue = eventQueue;
-    this.mutex = this;
   }
 
   /**
    * Busy loop waiting for all queued events to drain.
    */
   public void await() {
-    while (!drained) {
+    while (!isDrained()) {
       Thread.yield();
     }
   }
-
-  @Override
-  Runnable createThread() {
-    return new Runnable() {
-      @Override
-      public void run() {
-        while (!Thread.currentThread().isInterrupted()) {
-          synchronized (mutex) {
-            // !drained if dispatch queued new events on this dispatcher
-            drained = queue.isEmpty();
-          }
-          Event event;
-          try {
-            event = queue.take();
-          } catch(InterruptedException ie) {
-            return;
-          }
-          if (event != null) {
-            dispatch(event);
-          }
-        }
-      }
-    };
-  }
-
-  @Override
-  public EventHandler getEventHandler() {
-    final EventHandler actual = super.getEventHandler();
-    return new EventHandler() {
-      @Override
-      public void handle(Event event) {
-        synchronized (mutex) {
-          actual.handle(event);
-          drained = false;
-        }
-      }
-    };
-  }
-
 }