You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by zj...@apache.org on 2015/01/02 20:09:47 UTC
hadoop git commit: YARN-2991. Fixed DrainDispatcher to reuse the
draining code path in AsyncDispatcher. Contributed by Rohith Sharmaks.
Repository: hadoop
Updated Branches:
refs/heads/trunk 0c45946e6 -> 947578c1c
YARN-2991. Fixed DrainDispatcher to reuse the draining code path in AsyncDispatcher. Contributed by Rohith Sharmaks.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/947578c1
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/947578c1
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/947578c1
Branch: refs/heads/trunk
Commit: 947578c1c1413f9043ceb1e87df6a97df048e854
Parents: 0c45946
Author: Zhijie Shen <zj...@apache.org>
Authored: Fri Jan 2 11:08:47 2015 -0800
Committer: Zhijie Shen <zj...@apache.org>
Committed: Fri Jan 2 11:08:47 2015 -0800
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 3 ++
.../hadoop/yarn/event/AsyncDispatcher.java | 7 +++
.../hadoop/yarn/event/DrainDispatcher.java | 50 +-------------------
3 files changed, 11 insertions(+), 49 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/947578c1/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 01df44f..e6694f1 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -305,6 +305,9 @@ Release 2.7.0 - UNRELEASED
YARN-2987. Fixed ClientRMService#getQueueInfo to check against queue and
app ACLs. (Varun Saxena via jianhe)
+ YARN-2991. Fixed DrainDispatcher to reuse the draining code path in
+ AsyncDispatcher. (Rohith Sharmaks via zjshen)
+
Release 2.6.0 - 2014-11-18
INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/947578c1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java
index 28be6ac..d36d841 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java
@@ -34,6 +34,8 @@ import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import com.google.common.annotations.VisibleForTesting;
+
/**
* Dispatches {@link Event}s in a separate thread. Currently only single thread
* does that. Potentially there could be multiple channels for each event type
@@ -282,4 +284,9 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
}
};
}
+
+ @VisibleForTesting
+ protected boolean isDrained() {
+ return this.drained;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/947578c1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java
index 803b2bb..da5ae44 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java
@@ -23,68 +23,20 @@ import java.util.concurrent.LinkedBlockingQueue;
@SuppressWarnings("rawtypes")
public class DrainDispatcher extends AsyncDispatcher {
-// flagrant initialize abuse throughout, but safe per
-// http://java.sun.com/docs/books/jls/third_edition/html/typesValues.html#96595
-// and similar grotesqueries
- private volatile boolean drained = false;
- private final BlockingQueue<Event> queue;
- final Object mutex;
-
public DrainDispatcher() {
this(new LinkedBlockingQueue<Event>());
}
private DrainDispatcher(BlockingQueue<Event> eventQueue) {
super(eventQueue);
- this.queue = eventQueue;
- this.mutex = this;
}
/**
* Busy loop waiting for all queued events to drain.
*/
public void await() {
- while (!drained) {
+ while (!isDrained()) {
Thread.yield();
}
}
-
- @Override
- Runnable createThread() {
- return new Runnable() {
- @Override
- public void run() {
- while (!Thread.currentThread().isInterrupted()) {
- synchronized (mutex) {
- // !drained if dispatch queued new events on this dispatcher
- drained = queue.isEmpty();
- }
- Event event;
- try {
- event = queue.take();
- } catch(InterruptedException ie) {
- return;
- }
- if (event != null) {
- dispatch(event);
- }
- }
- }
- };
- }
-
- @Override
- public EventHandler getEventHandler() {
- final EventHandler actual = super.getEventHandler();
- return new EventHandler() {
- @Override
- public void handle(Event event) {
- synchronized (mutex) {
- actual.handle(event);
- drained = false;
- }
- }
- };
- }
-
}