You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by an...@apache.org on 2018/10/03 12:44:42 UTC

oozie git commit: OOZIE-3160 amend PriorityDelayQueue put()/take() can cause significant CPU load due to busy waiting (andras.piros)

Repository: oozie
Updated Branches:
  refs/heads/master 4aa7bbde3 -> cf6540977


OOZIE-3160 amend PriorityDelayQueue put()/take() can cause significant CPU load due to busy waiting (andras.piros)


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/cf654097
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/cf654097
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/cf654097

Branch: refs/heads/master
Commit: cf654097755c3c4997614e1a9844c79193b5bc48
Parents: 4aa7bbd
Author: Andras Piros <an...@cloudera.com>
Authored: Wed Oct 3 14:42:40 2018 +0200
Committer: Andras Piros <an...@cloudera.com>
Committed: Wed Oct 3 14:42:40 2018 +0200

----------------------------------------------------------------------
 .../oozie/service/AsyncXCommandExecutor.java    | 21 ++++-
 .../oozie/service/CallableQueueService.java     | 85 +++++++++++---------
 core/src/main/resources/oozie-default.xml       | 24 ++++--
 .../service/TestAsyncXCommandExecutor.java      | 23 +++---
 ...TestHAPartitionDependencyManagerService.java |  1 +
 .../java/org/apache/oozie/test/XTestCase.java   |  3 +
 .../java/org/apache/oozie/test/ZKXTestCase.java |  4 +-
 release-log.txt                                 |  1 +
 8 files changed, 108 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/cf654097/core/src/main/java/org/apache/oozie/service/AsyncXCommandExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/AsyncXCommandExecutor.java b/core/src/main/java/org/apache/oozie/service/AsyncXCommandExecutor.java
index b18a37a..1f37622 100644
--- a/core/src/main/java/org/apache/oozie/service/AsyncXCommandExecutor.java
+++ b/core/src/main/java/org/apache/oozie/service/AsyncXCommandExecutor.java
@@ -63,6 +63,7 @@ public class AsyncXCommandExecutor {
     private final long maxActiveCommands;  // equivalent of "queueSize" in CQS
     private final long maxWait;
     private final long maxPriority;
+    private final int awaitTerminationTimeoutSeconds;
 
     private final BlockingQueue<CallableWrapper<?>> priorityBlockingQueue;
     private final BlockingQueue<AccessibleRunnableScheduledFuture<ScheduledXCallable>> delayWorkQueue;
@@ -78,7 +79,8 @@ public class AsyncXCommandExecutor {
             CallableQueueService callableAccess,
             long maxActiveCommands,
             long maxWait,
-            int priorities) {
+            int priorities,
+            int awaitTerminationTimeoutSeconds) {
 
         priorityBlockingQueue = new PriorityBlockingQueue<CallableWrapper<?>>(100, new PriorityComparator());
 
@@ -115,6 +117,9 @@ public class AsyncXCommandExecutor {
         this.pendingCommandsPerType = new ConcurrentHashMap<>();
         Preconditions.checkArgument(priorities > 0, "Number of priorities must be >0");
         this.maxPriority = priorities - 1;
+        Preconditions.checkArgument(awaitTerminationTimeoutSeconds > 0,
+                String.format("Await termination timeout must be >0, is %s", awaitTerminationTimeoutSeconds));
+        this.awaitTerminationTimeoutSeconds = awaitTerminationTimeoutSeconds;
     }
 
     @VisibleForTesting
@@ -128,7 +133,8 @@ public class AsyncXCommandExecutor {
             ConcurrentHashMap<String, Set<CallableWrapper<?>>> pendingCommandsPerType,
             AtomicInteger activeCommands,
             long maxWait,
-            long priorities) {
+            long priorities,
+            int awaitTerminationTimeoutSeconds) {
 
         this.priorityBlockingQueue = priorityBlockingQueue;
         this.delayWorkQueue = delayQueue;
@@ -141,6 +147,7 @@ public class AsyncXCommandExecutor {
         this.activeCommands = activeCommands;
         this.maxWait = maxWait;
         this.maxPriority = priorities - 1;
+        this.awaitTerminationTimeoutSeconds = awaitTerminationTimeoutSeconds;
     }
 
     public synchronized boolean queue(CallableWrapper<?> wrapper, boolean ignoreQueueSize) {
@@ -266,6 +273,14 @@ public class AsyncXCommandExecutor {
         }
     }
 
+    public boolean isShutDown() {
+        return executor.isShutdown() || scheduledExecutor.isShutdown();
+    }
+
+    public boolean isTerminated() {
+        return executor.isTerminated() || scheduledExecutor.isTerminated();
+    }
+
     public List<String> getQueueDump() {
         List<CallableWrapper<?>> copyOfPending = new ArrayList<>(100);
         List<String> queueDump = new ArrayList<>(100);
@@ -399,7 +414,7 @@ public class AsyncXCommandExecutor {
     }
 
     private void shutdownExecutor(ExecutorService executor, String name) throws InterruptedException {
-        long limit = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(30);
+        long limit = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(this.awaitTerminationTimeoutSeconds);
         executor.shutdown();
         while (!executor.awaitTermination(1000, TimeUnit.MILLISECONDS)) {
             log.info("Waiting for [{0}] to shutdown", name);

http://git-wip-us.apache.org/repos/asf/oozie/blob/cf654097/core/src/main/java/org/apache/oozie/service/CallableQueueService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/CallableQueueService.java b/core/src/main/java/org/apache/oozie/service/CallableQueueService.java
index dc9a099..a942600 100644
--- a/core/src/main/java/org/apache/oozie/service/CallableQueueService.java
+++ b/core/src/main/java/org/apache/oozie/service/CallableQueueService.java
@@ -86,7 +86,9 @@ public class CallableQueueService implements Service, Instrumentable {
 
     public static final String CONF_QUEUE_SIZE = CONF_PREFIX + "queue.size";
     public static final String CONF_THREADS = CONF_PREFIX + "threads";
-    public static final String CONF_OLDIMPL = CONF_PREFIX + "queue.oldImpl";
+    public static final String CONF_NEWIMPL = CONF_PREFIX + "queue.newImpl";
+    public static final String CONF_QUEUE_AWAIT_TERMINATION_TIMEOUT_SECONDS =
+            CONF_PREFIX + "queue.awaitTermination.timeout.seconds";
     public static final String CONF_DELAYED_CALLABLE_THREADS = CONF_PREFIX + "delayedcallable.threads";
     public static final String CONF_CALLABLE_CONCURRENCY = CONF_PREFIX + "callable.concurrency";
     public static final String CONF_CALLABLE_NEXT_ELIGIBLE = CONF_PREFIX + "callable.next.eligible";
@@ -110,6 +112,8 @@ public class CallableQueueService implements Service, Instrumentable {
 
     private int maxCallableConcurrency;
 
+    private int queueAwaitTerminationTimeoutSeconds;
+
     private boolean callableBegin(XCallable<?> callable) {
         synchronized (activeCallables) {
             AtomicInteger counter = activeCallables.get(callable.getType());
@@ -136,7 +140,7 @@ public class CallableQueueService implements Service, Instrumentable {
             }
         }
 
-        if (!oldImpl) {
+        if (newImpl) {
             asyncXCommandExecutor.commandFinished();
             asyncXCommandExecutor.checkMaxConcurrency(callable.getType());
         }
@@ -441,7 +445,7 @@ public class CallableQueueService implements Service, Instrumentable {
     private PriorityDelayQueue<CallableWrapper<?>> queue;
     private ThreadPoolExecutor executor;
     private Instrumentation instrumentation;
-    private boolean oldImpl = false;
+    private boolean newImpl = false;
     private AsyncXCommandExecutor asyncXCommandExecutor;
 
     /**
@@ -525,21 +529,11 @@ public class CallableQueueService implements Service, Instrumentable {
 
         interruptMapMaxSize = ConfigurationService.getInt(conf, CONF_CALLABLE_INTERRUPT_MAP_MAX_SIZE);
 
-        oldImpl = ConfigurationService.getBoolean(CONF_OLDIMPL, false);
-        log.info("Using old queue implementation: [{0}]", oldImpl);
+        newImpl = ConfigurationService.getBoolean(CONF_NEWIMPL, true);
+        log.info("Using new queue implementation: [{0}]", newImpl);
+        queueAwaitTerminationTimeoutSeconds = ConfigurationService.getInt(conf, CONF_QUEUE_AWAIT_TERMINATION_TIMEOUT_SECONDS);
 
-        if (oldImpl) {
-            executor = new ThreadPoolExecutor(threads, threads, 10, TimeUnit.SECONDS, (BlockingQueue) queue,
-                    new NamedThreadFactory("CallableQueue")) {
-                protected void beforeExecute(Thread t, Runnable r) {
-                    super.beforeExecute(t,r);
-                    XLog.Info.get().clear();
-                }
-                protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
-                    return (RunnableFuture<T>)callable;
-                }
-            };
-        } else {
+        if (newImpl) {
             int delayedCallableThreads = ConfigurationService.getInt(CONF_DELAYED_CALLABLE_THREADS, 1);
 
             asyncXCommandExecutor = new AsyncXCommandExecutor(threads,
@@ -548,9 +542,21 @@ public class CallableQueueService implements Service, Instrumentable {
                     this,
                     queueSize,
                     MAX_CALLABLE_WAITTIME_MS,
-                    PRIORITIES);
+                    PRIORITIES,
+                    queueAwaitTerminationTimeoutSeconds);
 
             executor = asyncXCommandExecutor.getExecutorService();
+        } else {
+            executor = new ThreadPoolExecutor(threads, threads, 10, TimeUnit.SECONDS, (BlockingQueue) queue,
+                    new NamedThreadFactory("CallableQueue")) {
+                protected void beforeExecute(Thread t, Runnable r) {
+                    super.beforeExecute(t,r);
+                    XLog.Info.get().clear();
+                }
+                protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
+                    return (RunnableFuture<T>)callable;
+                }
+            };
         }
 
         // IMPORTANT: The ThreadPoolExecutor does not always the execute
@@ -586,20 +592,22 @@ public class CallableQueueService implements Service, Instrumentable {
     @Override
     public void destroy() {
         try {
-            long limit = System.currentTimeMillis() + 30 * 1000;// 30 seconds
-            executor.shutdown();
             queue.clear();
-            while (!executor.awaitTermination(1000, TimeUnit.MILLISECONDS)) {
-                log.info("Waiting for executor to shutdown");
-                if (System.currentTimeMillis() > limit) {
-                    log.warn("Gave up, continuing without waiting for executor to shutdown");
-                    break;
-                }
-            }
-
-            if (!oldImpl) {
+            if (newImpl) {
                 asyncXCommandExecutor.shutdown();
             }
+            else {
+                long limit = System.currentTimeMillis() + queueAwaitTerminationTimeoutSeconds * 1000;
+                executor.shutdown();
+                queue.clear();
+                while (!executor.awaitTermination(1000, TimeUnit.MILLISECONDS)) {
+                    log.info("Waiting for executor to shutdown");
+                    if (System.currentTimeMillis() > limit) {
+                        log.warn("Gave up, continuing without waiting for executor to shutdown");
+                        break;
+                    }
+                }
+            }
         }
         catch (InterruptedException ex) {
             log.warn(ex);
@@ -620,11 +628,18 @@ public class CallableQueueService implements Service, Instrumentable {
      * @return int size of queue
      */
     public synchronized int queueSize() {
-        return oldImpl ? queue.size() : asyncXCommandExecutor.getSize();
+        return newImpl ? asyncXCommandExecutor.getSize() : queue.size();
     }
 
     private synchronized boolean queue(CallableWrapper<?> wrapper, boolean ignoreQueueSize) {
-        if (oldImpl) {
+        if (newImpl) {
+            if (asyncXCommandExecutor.isShutDown() || asyncXCommandExecutor.isTerminated()) {
+                log.warn("Async executor shutting down, ignoring queueing of [{0}]", wrapper.getElement().getKey());
+            }
+            else {
+                asyncXCommandExecutor.queue(wrapper, ignoreQueueSize);
+            }
+        } else {
             if (!ignoreQueueSize && queue.size() >= queueSize) {
                 log.warn("queue full, ignoring queuing for [{0}]", wrapper.getElement().getKey());
                 return false;
@@ -644,8 +659,6 @@ public class CallableQueueService implements Service, Instrumentable {
             else {
                 log.warn("Executor shutting down, ignoring queueing of [{0}]", wrapper.getElement().getKey());
             }
-        } else {
-            asyncXCommandExecutor.queue(wrapper, ignoreQueueSize);
         }
 
         return true;
@@ -820,7 +833,9 @@ public class CallableQueueService implements Service, Instrumentable {
      * @return the list of string that representing each CallableWrapper
      */
     public List<String> getQueueDump() {
-        if (oldImpl) {
+        if (newImpl) {
+            return asyncXCommandExecutor.getQueueDump();
+        } else {
             List<String> list = new ArrayList<String>();
             for (QueueElement<CallableWrapper<?>> qe : queue) {
                 if (qe.toString() == null) {
@@ -829,8 +844,6 @@ public class CallableQueueService implements Service, Instrumentable {
                 list.add(qe.toString());
             }
             return list;
-        } else {
-            return asyncXCommandExecutor.getQueueDump();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/oozie/blob/cf654097/core/src/main/resources/oozie-default.xml
----------------------------------------------------------------------
diff --git a/core/src/main/resources/oozie-default.xml b/core/src/main/resources/oozie-default.xml
index f292ad3..6c7fc9d 100644
--- a/core/src/main/resources/oozie-default.xml
+++ b/core/src/main/resources/oozie-default.xml
@@ -518,13 +518,27 @@
     </property>
 
     <property>
-        <name>oozie.service.CallableQueueService.queue.oldImpl</name>
+        <name>oozie.service.CallableQueueService.queue.newImpl</name>
         <value>true</value>
         <description>
-            If set to false, then CallableQueueService will use a more performant, less CPU-intensive
-            queuing mechanism to execute asynchronous tasks internally. The old implementation generates
-            noticeable CPU load even if Oozie is completely idle, especially when oozie.service.CallableQueueService.threads
-            set to a large number. The previous queuing mechanism is kept as a fallback option.
+            If set to true, then CallableQueueService will use a faster, less CPU-intensive queuing mechanism to execute
+            asynchronous tasks internally.
+            The old implementation generates noticeable CPU load even if Oozie is completely idle, especially when
+            oozie.service.CallableQueueService.threads is set to a large number. The previous queuing mechanism is kept as a
+            fallback option.
+            This is an experimental feature in Oozie 5.1.0 that will be re-evaluated in an upcoming minor release,
+            after which the old implementation and this feature flag will be removed.
+        </description>
+    </property>
+
+    <property>
+        <name>oozie.service.CallableQueueService.queue.awaitTermination.timeout.seconds</name>
+        <value>30</value>
+        <description>
+            Maximum time, in seconds, to wait for the ThreadPoolExecutor instances to terminate when
+            CallableQueueService#destroy() is called.
+            The more elements you tend to have in your callable queue, the more you want CallableQueueService to wait
+            before shutting down its thread pools.
         </description>
     </property>
 

http://git-wip-us.apache.org/repos/asf/oozie/blob/cf654097/core/src/test/java/org/apache/oozie/service/TestAsyncXCommandExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/service/TestAsyncXCommandExecutor.java b/core/src/test/java/org/apache/oozie/service/TestAsyncXCommandExecutor.java
index 2dce409..e0d14d6 100644
--- a/core/src/test/java/org/apache/oozie/service/TestAsyncXCommandExecutor.java
+++ b/core/src/test/java/org/apache/oozie/service/TestAsyncXCommandExecutor.java
@@ -74,6 +74,7 @@ public class TestAsyncXCommandExecutor {
     private static final long DEFAULT_MAXWAIT = 30_000;
     private static final int TEST_PRIORITIES = 5;
     private static final int MAX_PRIORITY = TEST_PRIORITIES - 1;
+    private static final int AWAIT_TERMINATION_TIMEOUT_SECONDS = 1;
 
     @Mock
     private ThreadPoolExecutor executor;
@@ -100,7 +101,7 @@ public class TestAsyncXCommandExecutor {
         pendingCommandsPerType = new ConcurrentHashMap<>();
         delayQueue = new LinkedBlockingQueue<>();  // in reality it's not LBQ, but it's fine here
         asyncExecutor = createExecutor(DEFAULT_ENABLE_CONCURRENCY_CHECK, DEFAULT_MAX_ACTIVE_COMMANDS, DEFAULT_MAXWAIT,
-                TEST_PRIORITIES);
+                TEST_PRIORITIES, AWAIT_TERMINATION_TIMEOUT_SECONDS);
         when(callableWrapper.filterDuplicates()).thenReturn(true);
         when(callableWrapper.getElement().getKey()).thenReturn("key");
         when(callableWrapper.getElement().getType()).thenReturn(DEFAULT_TYPE);
@@ -158,7 +159,7 @@ public class TestAsyncXCommandExecutor {
 
     @Test
     public void testSubmissionSuccessfulAfterDelayWhenMaxConcurrencyCheckDisabled() {
-        asyncExecutor = createExecutor(false, 2, DEFAULT_MAXWAIT, TEST_PRIORITIES);
+        asyncExecutor = createExecutor(false, 2, DEFAULT_MAXWAIT, TEST_PRIORITIES, AWAIT_TERMINATION_TIMEOUT_SECONDS);
         when(callableWrapper.getInitialDelay()).thenReturn(100L);
         when(callableWrapper.getDelay(eq(TimeUnit.MILLISECONDS))).thenReturn(50L);
         XCallable<?> wrappedCommand = mock(XCallable.class);
@@ -242,7 +243,7 @@ public class TestAsyncXCommandExecutor {
 
     @Test
     public void testSubmissionWhenQueueIsFull() {
-        asyncExecutor = createExecutor(true, 2, DEFAULT_MAXWAIT, TEST_PRIORITIES);
+        asyncExecutor = createExecutor(true, 2, DEFAULT_MAXWAIT, TEST_PRIORITIES, AWAIT_TERMINATION_TIMEOUT_SECONDS);
         callableWrapper = mock(CallableWrapper.class, Mockito.RETURNS_DEEP_STUBS);
         when(callableWrapper.filterDuplicates()).thenReturn(true);
         when(callableWrapper.getElement().getKey()).thenReturn("key");
@@ -257,7 +258,7 @@ public class TestAsyncXCommandExecutor {
 
     @Test
     public void testSubmissionWhenQueueSizeIsIgnored() {
-        asyncExecutor = createExecutor(true, 2, DEFAULT_MAXWAIT, TEST_PRIORITIES);
+        asyncExecutor = createExecutor(true, 2, DEFAULT_MAXWAIT, TEST_PRIORITIES, AWAIT_TERMINATION_TIMEOUT_SECONDS);
         callableWrapper = mock(CallableWrapper.class, Mockito.RETURNS_DEEP_STUBS);
         when(callableWrapper.filterDuplicates()).thenReturn(true);
         when(callableWrapper.getElement().getKey()).thenReturn("key");
@@ -369,7 +370,8 @@ public class TestAsyncXCommandExecutor {
 
     @Test
     public void testAntiStarvationWhenDelayIsAboveMaxWait() {
-        asyncExecutor = createExecutor(DEFAULT_ENABLE_CONCURRENCY_CHECK, DEFAULT_MAX_ACTIVE_COMMANDS, 500, TEST_PRIORITIES);
+        asyncExecutor = createExecutor(DEFAULT_ENABLE_CONCURRENCY_CHECK, DEFAULT_MAX_ACTIVE_COMMANDS, 500, TEST_PRIORITIES,
+                AWAIT_TERMINATION_TIMEOUT_SECONDS);
         when(callableWrapper.getDelay(eq(TimeUnit.MILLISECONDS))).thenReturn(-40000L);
         when(callableWrapper.getPriority()).thenReturn(0);
         pendingCommandsPerType.put(DEFAULT_TYPE, Sets.newHashSet(callableWrapper));
@@ -394,7 +396,8 @@ public class TestAsyncXCommandExecutor {
 
     @Test
     public void testAntiStarvationWhenPriorityIsHighest() {
-        asyncExecutor = createExecutor(DEFAULT_ENABLE_CONCURRENCY_CHECK, DEFAULT_MAX_ACTIVE_COMMANDS, 500, TEST_PRIORITIES);
+        asyncExecutor = createExecutor(DEFAULT_ENABLE_CONCURRENCY_CHECK, DEFAULT_MAX_ACTIVE_COMMANDS, 500, TEST_PRIORITIES,
+                AWAIT_TERMINATION_TIMEOUT_SECONDS);
         when(callableWrapper.getDelay(eq(TimeUnit.MILLISECONDS))).thenReturn(-1000L);
         when(callableWrapper.getPriority()).thenReturn(MAX_PRIORITY);
         pendingCommandsPerType.put(DEFAULT_TYPE, Sets.newHashSet(callableWrapper));
@@ -419,7 +422,8 @@ public class TestAsyncXCommandExecutor {
 
     @Test
     public void testPriorityHandling() {
-        asyncExecutor = createExecutor(DEFAULT_ENABLE_CONCURRENCY_CHECK, 100, DEFAULT_MAXWAIT, 100);
+        asyncExecutor = createExecutor(DEFAULT_ENABLE_CONCURRENCY_CHECK, 100, DEFAULT_MAXWAIT, 100,
+                AWAIT_TERMINATION_TIMEOUT_SECONDS);
         doAnswer(new Answer<Void>() {
             @Override
             public Void answer(InvocationOnMock invocation) throws Throwable {
@@ -490,7 +494,7 @@ public class TestAsyncXCommandExecutor {
     }
 
     private AsyncXCommandExecutor createExecutor(boolean needMaxConcurrencyCheck, int maxActiveCallables,
-            long maxWait, int priorities) {
+            long maxWait, int priorities, int awaitTerminationTimeoutSeconds) {
         return new AsyncXCommandExecutor(needMaxConcurrencyCheck,
                 callableQueueService,
                 maxActiveCallables,
@@ -501,6 +505,7 @@ public class TestAsyncXCommandExecutor {
                 pendingCommandsPerType,
                 activeCommands,
                 maxWait,
-                priorities);
+                priorities,
+                awaitTerminationTimeoutSeconds);
     }
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/cf654097/core/src/test/java/org/apache/oozie/service/TestHAPartitionDependencyManagerService.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/service/TestHAPartitionDependencyManagerService.java b/core/src/test/java/org/apache/oozie/service/TestHAPartitionDependencyManagerService.java
index 3e1df07..a7c7e07 100644
--- a/core/src/test/java/org/apache/oozie/service/TestHAPartitionDependencyManagerService.java
+++ b/core/src/test/java/org/apache/oozie/service/TestHAPartitionDependencyManagerService.java
@@ -57,6 +57,7 @@ public class TestHAPartitionDependencyManagerService extends ZKXTestCase {
     }
 
     protected void tearDown() throws Exception {
+        services.destroy();
         super.tearDown();
     }
 

http://git-wip-us.apache.org/repos/asf/oozie/blob/cf654097/core/src/test/java/org/apache/oozie/test/XTestCase.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/test/XTestCase.java b/core/src/test/java/org/apache/oozie/test/XTestCase.java
index 3048dda..1f0cf17 100644
--- a/core/src/test/java/org/apache/oozie/test/XTestCase.java
+++ b/core/src/test/java/org/apache/oozie/test/XTestCase.java
@@ -81,6 +81,7 @@ import org.apache.oozie.WorkflowJobBean;
 import org.apache.oozie.action.hadoop.LauncherMain;
 import org.apache.oozie.dependency.FSURIHandler;
 import org.apache.oozie.dependency.HCatURIHandler;
+import org.apache.oozie.service.CallableQueueService;
 import org.apache.oozie.service.ConfigurationService;
 import org.apache.oozie.service.HCatAccessorService;
 import org.apache.oozie.service.HadoopAccessorException;
@@ -409,6 +410,8 @@ public abstract class XTestCase extends TestCase {
         oozieSiteConf.set(Services.CONF_SERVICE_CLASSES, classes.replaceAll("org.apache.oozie.service.ShareLibService,",""));
         // Make sure to create the Oozie DB during unit tests
         oozieSiteConf.set(JPAService.CONF_CREATE_DB_SCHEMA, "true");
+        // Make sure thread pools shut down in a timely manner
+        oozieSiteConf.set(CallableQueueService.CONF_QUEUE_AWAIT_TERMINATION_TIMEOUT_SECONDS, "1");
         File target = new File(testCaseConfDir, "oozie-site.xml");
         oozieSiteConf.writeXml(new FileOutputStream(target));
 

http://git-wip-us.apache.org/repos/asf/oozie/blob/cf654097/core/src/test/java/org/apache/oozie/test/ZKXTestCase.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/test/ZKXTestCase.java b/core/src/test/java/org/apache/oozie/test/ZKXTestCase.java
index abc7c9f..bfa2d85 100644
--- a/core/src/test/java/org/apache/oozie/test/ZKXTestCase.java
+++ b/core/src/test/java/org/apache/oozie/test/ZKXTestCase.java
@@ -97,7 +97,9 @@ public abstract class ZKXTestCase extends XDataTestCase {
     @Override
     protected void tearDown() throws Exception {
         super.tearDown();
-        Services.get().destroy();
+        if (Services.get() != null) {
+            Services.get().destroy();
+        }
         sDiscovery.close();
         sDiscovery = null;
         client.close();

http://git-wip-us.apache.org/repos/asf/oozie/blob/cf654097/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index d4d0b1e..8a72d5e 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 5.1.0 release (trunk - unreleased)
 
+OOZIE-3160 amend PriorityDelayQueue put()/take() can cause significant CPU load due to busy waiting (andras.piros)
 OOZIE-3354 [core] [SSH action] SSH action gets hung (andras.piros)
 OOZIE-3343 amend [build] [tests] Add the first five test errors per module to the report (kmarton via andras.piros)
 OOZIE-3348 [Hive action] Remove dependency hive-contrib (kmarton via andras.piros)