You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by aw...@apache.org on 2015/07/25 19:36:11 UTC
[04/29] hadoop git commit: HADOOP-12189. Improve
CallQueueManager#swapQueue to make queue elements drop nearly impossible.
Contributed by Zhihai Xu.
HADOOP-12189. Improve CallQueueManager#swapQueue to make queue elements drop nearly impossible. Contributed by Zhihai Xu.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/6736a1ab
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/6736a1ab
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/6736a1ab
Branch: refs/heads/HADOOP-12111
Commit: 6736a1ab7033523ed5f304fdfed46d7f348665b4
Parents: 813cf89
Author: Andrew Wang <wa...@apache.org>
Authored: Thu Jul 23 14:42:35 2015 -0700
Committer: Andrew Wang <wa...@apache.org>
Committed: Thu Jul 23 14:42:35 2015 -0700
----------------------------------------------------------------------
hadoop-common-project/hadoop-common/CHANGES.txt | 3 +++
.../org/apache/hadoop/ipc/CallQueueManager.java | 27 +++++++++++++-------
.../apache/hadoop/ipc/TestCallQueueManager.java | 6 ++---
3 files changed, 24 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6736a1ab/hadoop-common-project/hadoop-common/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index f1a3bc9..6c18add 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -719,6 +719,9 @@ Release 2.8.0 - UNRELEASED
HADOOP-12161. Add getStoragePolicy API to the FileSystem interface.
(Brahma Reddy Battula via Arpit Agarwal)
+ HADOOP-12189. Improve CallQueueManager#swapQueue to make queue elements
+ drop nearly impossible. (Zhihai Xu via wang)
+
OPTIMIZATIONS
HADOOP-11785. Reduce the number of listStatus operation in distcp
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6736a1ab/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java
index 1568bd6..c10f839 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java
@@ -32,11 +32,15 @@ import org.apache.hadoop.conf.Configuration;
*/
public class CallQueueManager<E> {
public static final Log LOG = LogFactory.getLog(CallQueueManager.class);
+ // Number of checkpoints for empty queue.
+ private static final int CHECKPOINT_NUM = 20;
+ // Interval to check empty queue.
+ private static final long CHECKPOINT_INTERVAL_MS = 10;
@SuppressWarnings("unchecked")
static <E> Class<? extends BlockingQueue<E>> convertQueueClass(
- Class<?> queneClass, Class<E> elementClass) {
- return (Class<? extends BlockingQueue<E>>)queneClass;
+ Class<?> queueClass, Class<E> elementClass) {
+ return (Class<? extends BlockingQueue<E>>)queueClass;
}
private final boolean clientBackOffEnabled;
@@ -159,18 +163,23 @@ public class CallQueueManager<E> {
}
/**
- * Checks if queue is empty by checking at two points in time.
+ * Checks if queue is empty by checking at CHECKPOINT_NUM points with
+ * CHECKPOINT_INTERVAL_MS interval.
* This doesn't mean the queue might not fill up at some point later, but
* it should decrease the probability that we lose a call this way.
*/
private boolean queueIsReallyEmpty(BlockingQueue<?> q) {
- boolean wasEmpty = q.isEmpty();
- try {
- Thread.sleep(10);
- } catch (InterruptedException ie) {
- return false;
+ for (int i = 0; i < CHECKPOINT_NUM; i++) {
+ try {
+ Thread.sleep(CHECKPOINT_INTERVAL_MS);
+ } catch (InterruptedException ie) {
+ return false;
+ }
+ if (!q.isEmpty()) {
+ return false;
+ }
}
- return q.isEmpty() && wasEmpty;
+ return true;
}
private String stringRepr(Object o) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6736a1ab/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java
index 6e1838e..51a9750 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java
@@ -165,7 +165,7 @@ public class TestCallQueueManager {
HashMap<Runnable, Thread> threads = new HashMap<Runnable, Thread>();
// Create putters and takers
- for (int i=0; i < 50; i++) {
+ for (int i=0; i < 1000; i++) {
Putter p = new Putter(manager, -1, -1);
Thread pt = new Thread(p);
producers.add(p);
@@ -174,7 +174,7 @@ public class TestCallQueueManager {
pt.start();
}
- for (int i=0; i < 20; i++) {
+ for (int i=0; i < 100; i++) {
Taker t = new Taker(manager, -1, -1);
Thread tt = new Thread(t);
consumers.add(t);
@@ -183,7 +183,7 @@ public class TestCallQueueManager {
tt.start();
}
- Thread.sleep(10);
+ Thread.sleep(500);
for (int i=0; i < 5; i++) {
manager.swapQueue(queueClass, 5000, "", null);