You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by pb...@apache.org on 2018/09/06 12:44:03 UTC

oozie git commit: OOZIE-3160 PriorityDelayQueue put()/take() can cause significant CPU load due to busy waiting (pbacsko)

Repository: oozie
Updated Branches:
  refs/heads/master fe2da6e57 -> 13bfd4949


OOZIE-3160 PriorityDelayQueue put()/take() can cause significant CPU load due to busy waiting (pbacsko)


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/13bfd494
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/13bfd494
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/13bfd494

Branch: refs/heads/master
Commit: 13bfd49495d095f51d22d0f51042690837b701a4
Parents: fe2da6e
Author: Peter Bacsko <pb...@cloudera.com>
Authored: Thu Sep 6 14:42:59 2018 +0200
Committer: Peter Bacsko <pb...@cloudera.com>
Committed: Thu Sep 6 14:42:59 2018 +0200

----------------------------------------------------------------------
 .../oozie/service/AsyncXCommandExecutor.java    | 412 +++++++++++++++++
 .../oozie/service/CallableQueueService.java     | 145 ++++--
 .../apache/oozie/util/PriorityDelayQueue.java   |  26 ++
 core/src/main/resources/oozie-default.xml       |  25 +
 .../service/TestAsyncXCommandExecutor.java      | 462 +++++++++++++++++++
 .../oozie/service/TestCallableQueueService.java | 292 +++++++++++-
 release-log.txt                                 |   1 +
 7 files changed, 1308 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/13bfd494/core/src/main/java/org/apache/oozie/service/AsyncXCommandExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/AsyncXCommandExecutor.java b/core/src/main/java/org/apache/oozie/service/AsyncXCommandExecutor.java
new file mode 100644
index 0000000..b18a37a
--- /dev/null
+++ b/core/src/main/java/org/apache/oozie/service/AsyncXCommandExecutor.java
@@ -0,0 +1,412 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.service;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.RunnableScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.oozie.service.CallableQueueService.CallableWrapper;
+import org.apache.oozie.util.NamedThreadFactory;
+import org.apache.oozie.util.XCallable;
+import org.apache.oozie.util.XLog;
+import org.eclipse.jetty.util.ConcurrentHashSet;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+
+@SuppressWarnings("deprecation")
+public class AsyncXCommandExecutor {
+    public static final int MIN_PRIORITY = 0;
+    public static final long ANTI_STARVATION_INTERVAL = 500;
+    private static XLog log = XLog.getLog(AsyncXCommandExecutor.class);
+    private final ThreadPoolExecutor executor;
+    private final ScheduledThreadPoolExecutor scheduledExecutor;
+    private final boolean needConcurrencyCheck;
+    private final CallableQueueService callableQueueService;
+    private final AtomicInteger activeCommands;
+    private final long maxActiveCommands;  // equivalent of "queueSize" in CQS
+    private final long maxWait;
+    private final long maxPriority;
+
+    private final BlockingQueue<CallableWrapper<?>> priorityBlockingQueue;
+    private final BlockingQueue<AccessibleRunnableScheduledFuture<ScheduledXCallable>> delayWorkQueue;
+    private final ConcurrentHashMap<String, Set<CallableWrapper<?>>> pendingCommandsPerType;
+    private long lastAntiStarvationCheck = 0;
+
+    /**
+     * Creates the executor together with its two thread pools.
+     * <p>
+     * Commands without a delay run on a {@link ThreadPoolExecutor} with a fixed number of threads that is fed
+     * from a {@link PriorityBlockingQueue} ordered by {@link PriorityComparator}. Delayed commands are handed
+     * to a {@link ScheduledThreadPoolExecutor} whose tasks are wrapped into
+     * {@link AccessibleRunnableScheduledFuture}, so that {@link #getQueueDump()} can reach the scheduled
+     * {@link ScheduledXCallable} instances.
+     *
+     * @param threads number of threads (core and maximum) of the executor that runs the commands
+     * @param delayedCallableThreads core pool size of the scheduled executor that handles delayed commands
+     * @param needConcurrencyCheck whether a delayed command is checked with
+     *        {@code CallableQueueService.canSubmitCallable()} before it is passed to the executor
+     * @param callableAccess the service that answers whether a command can be submitted
+     * @param maxActiveCommands upper limit of tracked commands, enforced by {@link #queue(CallableWrapper, boolean)}
+     * @param maxWait time in milliseconds after which a pending command becomes a candidate for promotion in
+     *        {@link #checkMaxConcurrency(String)}
+     * @param priorities number of priority levels; the highest accepted priority is {@code priorities - 1}
+     * @throws IllegalArgumentException if {@code priorities} is not greater than zero
+     */
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    @SuppressFBWarnings( value = "SIC_INNER_SHOULD_BE_STATIC_ANON",
+        justification = "Unnecessary to refactor inner classes defined here")
+    public AsyncXCommandExecutor(int threads,
+            int delayedCallableThreads,
+            boolean needConcurrencyCheck,
+            CallableQueueService callableAccess,
+            long maxActiveCommands,
+            long maxWait,
+            int priorities) {
+
+        // Fail fast: reject an invalid argument before any thread pool is allocated.
+        Preconditions.checkArgument(priorities > 0, "Number of priorities must be >0");
+
+        priorityBlockingQueue = new PriorityBlockingQueue<CallableWrapper<?>>(100, new PriorityComparator());
+
+        executor = new ThreadPoolExecutor(threads, threads, 10, TimeUnit.SECONDS,
+                (BlockingQueue) priorityBlockingQueue,
+                new NamedThreadFactory("CallableQueue")) {
+            @Override
+            protected void beforeExecute(Thread t, Runnable r) {
+                XLog.Info.get().clear();
+                super.beforeExecute(t, r);
+            }
+
+            // The submitted callables are already RunnableFutures, so they are passed through unwrapped.
+            @Override
+            protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
+                return (RunnableFuture<T>)callable;
+            }
+        };
+
+        this.scheduledExecutor = new ScheduledThreadPoolExecutor(delayedCallableThreads,
+                new NamedThreadFactory("ScheduledCallable")) {
+            // Keep a reference to the submitted Runnable so that it stays reachable from the work queue.
+            @Override
+            protected <V> RunnableScheduledFuture<V> decorateTask(
+                    Runnable runnable, RunnableScheduledFuture<V> task) {
+
+                    AccessibleRunnableScheduledFuture<V> arsf =
+                            new AccessibleRunnableScheduledFuture<>(task, runnable);
+
+                    return arsf;
+            }
+        };
+
+        this.delayWorkQueue = (BlockingQueue) scheduledExecutor.getQueue();
+        this.needConcurrencyCheck = needConcurrencyCheck;
+        this.callableQueueService = callableAccess;
+        this.maxActiveCommands = maxActiveCommands;
+        this.maxWait = maxWait;
+        this.activeCommands = new AtomicInteger(0);
+        this.pendingCommandsPerType = new ConcurrentHashMap<>();
+        this.maxPriority = priorities - 1;
+    }
+
+    /**
+     * Constructor for tests: every collaborator is supplied by the caller instead of being created here.
+     *
+     * @param needConcurrencyCheck whether delayed commands are checked for concurrency before execution
+     * @param callableAccess the service that answers whether a command can be submitted
+     * @param maxActiveCommands upper limit of tracked commands
+     * @param executor the executor that runs the commands
+     * @param scheduledExecutor the executor that holds delayed commands
+     * @param priorityBlockingQueue the queue listed first by {@link #getQueueDump()}
+     * @param delayQueue the queue of scheduled tasks listed by {@link #getQueueDump()}
+     * @param pendingCommandsPerType the per-type sets of commands waiting for a free concurrency slot
+     * @param activeCommands the counter of tracked commands
+     * @param maxWait time in milliseconds after which a pending command may be promoted
+     * @param priorities number of priority levels; the highest accepted priority is {@code priorities - 1}
+     */
+    @VisibleForTesting
+    AsyncXCommandExecutor(boolean needConcurrencyCheck,
+            CallableQueueService callableAccess,
+            long maxActiveCommands,
+            ThreadPoolExecutor executor,
+            ScheduledThreadPoolExecutor scheduledExecutor,
+            PriorityBlockingQueue<CallableWrapper<?>> priorityBlockingQueue,
+            BlockingQueue<AccessibleRunnableScheduledFuture<ScheduledXCallable>> delayQueue,
+            ConcurrentHashMap<String, Set<CallableWrapper<?>>> pendingCommandsPerType,
+            AtomicInteger activeCommands,
+            long maxWait,
+            long priorities) {
+
+        // collaborators
+        this.callableQueueService = callableAccess;
+        this.executor = executor;
+        this.scheduledExecutor = scheduledExecutor;
+
+        // queues and bookkeeping
+        this.priorityBlockingQueue = priorityBlockingQueue;
+        this.delayWorkQueue = delayQueue;
+        this.pendingCommandsPerType = pendingCommandsPerType;
+        this.activeCommands = activeCommands;
+
+        // settings
+        this.needConcurrencyCheck = needConcurrencyCheck;
+        this.maxActiveCommands = maxActiveCommands;
+        this.maxWait = maxWait;
+        this.maxPriority = priorities - 1;
+    }
+
+    /**
+     * Submits a command: directly to the executor when its initial delay is zero, otherwise to the scheduled
+     * executor wrapped into a {@link ScheduledXCallable}.
+     * <p>
+     * A wrapper for which {@code filterDuplicates()} returns {@code false} is not submitted, yet the method
+     * still returns {@code true}.
+     *
+     * @param wrapper the command to submit
+     * @param ignoreQueueSize if {@code true}, the limit on the number of tracked commands is not checked
+     * @return {@code false} if the command was refused because the limit has been reached, {@code true} otherwise
+     * @throws RuntimeException wrapping any failure raised while validating or submitting the command
+     */
+    public synchronized boolean queue(CallableWrapper<?> wrapper, boolean ignoreQueueSize) {
+        if (!ignoreQueueSize) {
+            if (activeCommands.get() >= maxActiveCommands) {
+                log.warn("queue full, ignoring queuing for [{0}]", wrapper.getElement().getKey());
+                return false;
+            }
+        }
+
+        if (!wrapper.filterDuplicates()) {
+            return true;
+        }
+
+        wrapper.addToUniqueCallables();
+
+        final int requestedPriority = wrapper.getPriority();
+        final long delayAtCreation = wrapper.getInitialDelay();
+
+        try {
+            if (requestedPriority < MIN_PRIORITY || requestedPriority > maxPriority) {
+                throw new IllegalArgumentException("priority out of range: " + requestedPriority);
+            }
+
+            if (delayAtCreation != 0) {
+                // delayed command: park it in the scheduled executor for the time that is still left
+                scheduledExecutor.schedule(new ScheduledXCallable(wrapper),
+                        wrapper.getDelay(TimeUnit.MILLISECONDS),
+                        TimeUnit.MILLISECONDS);
+            } else {
+                executor.execute(wrapper);
+            }
+
+            activeCommands.incrementAndGet();
+        } catch (Throwable failure) {
+            // undo the registration so that the same command can be queued again later
+            wrapper.removeFromUniqueCallables();
+            throw new RuntimeException(failure);
+        }
+
+        return true;
+    }
+
+    /**
+     * Parks a command in the set of pending commands kept for its type, creating that set on first use.
+     *
+     * @param command the command that could not be submitted
+     */
+    public void handleConcurrencyExceeded(CallableWrapper<?> command) {
+        final String commandType = command.getElement().getType();
+
+        Set<CallableWrapper<?>> parked = pendingCommandsPerType.get(commandType);
+        if (parked == null) {
+            final Set<CallableWrapper<?>> fresh = new ConcurrentHashSet<>();
+            final Set<CallableWrapper<?>> raced = pendingCommandsPerType.putIfAbsent(commandType, fresh);
+
+            // if another thread registered a set in the meantime, use that one
+            parked = (raced != null) ? raced : fresh;
+        }
+
+        parked.add(command);
+    }
+
+    /**
+     * Walks through the commands parked for the given type and hands those to the executor that
+     * {@code CallableQueueService.canSubmitCallable()} accepts.
+     * <p>
+     * At most once per {@link #ANTI_STARVATION_INTERVAL} milliseconds the walk also raises, by one level, the
+     * priority of parked commands whose delay is below {@code -maxWait} and whose priority is still below the
+     * maximum; the delay of such a command is reset to zero.
+     * <p>
+     * Does nothing if no command has ever been parked for the type.
+     *
+     * @param type the command type whose parked commands are examined
+     */
+    public void checkMaxConcurrency(String type) {
+        Set<CallableWrapper<?>> commandsForType = pendingCommandsPerType.get(type);
+
+        if (commandsForType != null) {
+            // Only a single thread should be doing stuff here! Reason: concurrent executions might
+            // submit an eligible XCallable multiple times, which must be avoided.
+            synchronized (commandsForType) {
+                boolean doAntiStarvation = false;
+                int priorityModified = 0;
+                long now = System.currentTimeMillis();
+                // NOTE(review): lastAntiStarvationCheck is a single instance field, while the monitor held
+                // here belongs to the set of one type - confirm that unsynchronized access across different
+                // types is acceptable.
+                if (now - lastAntiStarvationCheck > ANTI_STARVATION_INTERVAL) {
+                    doAntiStarvation = true;
+                }
+
+                for (Iterator<CallableWrapper<?>> itr = commandsForType.iterator(); itr.hasNext();) {
+                    CallableWrapper<?> command = itr.next();
+
+                    // Anti-starvation logic: try to promote callables that have been waiting for too long
+                    int currentPrio = command.getPriority();
+                    if (doAntiStarvation
+                            && command.getDelay(TimeUnit.MILLISECONDS) < -maxWait
+                            && currentPrio < maxPriority) {
+                        command.setDelay(0, TimeUnit.MILLISECONDS);
+                        command.setPriority(++currentPrio);
+                        priorityModified++;
+                    }
+
+                    if (callableQueueService.canSubmitCallable(command.getElement())) {
+                        if (activeCommands.get() >= maxActiveCommands) {
+                            // The command is dropped: it is not executed, the counter is decreased and the
+                            // command is removed from the parked set below.
+                            // NOTE(review): the command is not removed from the unique callables here -
+                            // confirm that this is handled elsewhere.
+                            log.warn("queue full, ignoring queuing for [{0}]", command.getElement().getKey());
+                            activeCommands.decrementAndGet();
+                        } else {
+                            executor.execute(command);
+                        }
+
+                        // in both cases the command leaves the parked set
+                        itr.remove();
+                    }
+                }
+
+                if (doAntiStarvation) {
+                    lastAntiStarvationCheck = System.currentTimeMillis();
+                }
+
+                if (priorityModified > 0) {
+                    log.debug("Anti-starvation: handled [{0}] elements", priorityModified);
+                }
+            }
+        }
+    }
+
+    /**
+     * Decreases the counter of tracked commands by one.
+     */
+    public void commandFinished() {
+        // Note: this is to track the number of elements. Otherwise we'd have to combine the size of
+        // two queues + a list.
+        activeCommands.decrementAndGet();
+    }
+
+    /**
+     * Returns the thread pool that commands are submitted to for execution.
+     *
+     * @return the executor held by this instance
+     */
+    public ThreadPoolExecutor getExecutorService() {
+        return executor;
+    }
+
+    /**
+     * Shuts down the executor and then the scheduled executor, waiting for each of them to terminate.
+     * <p>
+     * If the calling thread is interrupted while waiting, the waiting is abandoned, but both executors are
+     * still asked to shut down and the interrupt status of the thread is restored.
+     */
+    public void shutdown() {
+        try {
+            shutdownExecutor(executor, "executor");
+            shutdownExecutor(scheduledExecutor, "scheduled executor");
+        } catch (InterruptedException e) {
+            log.warn("Interrupted while waiting for executor shutdown");
+
+            // The interrupt may have arrived while waiting for the first executor, in which case the second
+            // one has not been touched yet. shutdown() has no additional effect on an executor that is
+            // already shut down, so it is safe to call it on both.
+            executor.shutdown();
+            scheduledExecutor.shutdown();
+
+            // Let the callers know about the interruption.
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    /**
+     * Builds a textual dump of the commands held by this executor.
+     * <p>
+     * The result lists, in this order: the commands in the priority queue, the delayed commands in the
+     * queue of the scheduled executor, and the commands parked per type. The parked commands are copied
+     * before the two queues are read and are appended afterwards.
+     *
+     * @return the {@code toString()} form of each held command
+     */
+    public List<String> getQueueDump() {
+        // take a copy of the parked commands first
+        final List<CallableWrapper<?>> parkedSnapshot = new ArrayList<>(100);
+        for (Map.Entry<String, Set<CallableWrapper<?>>> perType : pendingCommandsPerType.entrySet()) {
+            parkedSnapshot.addAll(perType.getValue());
+        }
+
+        final List<String> dump = new ArrayList<>(100);
+
+        // commands waiting for a free executor thread
+        for (final CallableWrapper<?> queued : priorityBlockingQueue) {
+            dump.add(queued.toString());
+        }
+
+        // commands whose delay has not expired yet
+        for (final AccessibleRunnableScheduledFuture<ScheduledXCallable> scheduled : delayWorkQueue) {
+            final ScheduledXCallable delayed = (ScheduledXCallable) scheduled.getTask();
+            dump.add(delayed.getCallableWrapper().toString());
+        }
+
+        // commands parked because of the concurrency limit
+        for (final CallableWrapper<?> parked : parkedSnapshot) {
+            dump.add(parked.toString());
+        }
+
+        return dump;
+    }
+
+    /**
+     * Returns the current value of the counter of tracked commands.
+     *
+     * @return the number of commands currently counted as active
+     */
+    public int getSize() {
+        return activeCommands.get();
+    }
+
+    /**
+     * The task that is placed into the scheduled executor for a delayed command. When it runs, the command
+     * is either passed on to the executor or parked until the concurrency limit allows it to run.
+     */
+    public class ScheduledXCallable implements Runnable {
+        private final CallableWrapper<?> target;
+
+        /**
+         * @param target the delayed command to pass on when this task runs
+         */
+        public ScheduledXCallable(CallableWrapper<?> target) {
+            this.target = target;
+        }
+
+        @Override
+        public void run() {
+            if (!needConcurrencyCheck || callableQueueService.canSubmitCallable(target.getElement())) {
+                executor.execute(target);
+                return;
+            }
+
+            final XCallable<?> element = target.getElement();
+            handleConcurrencyExceeded(target);
+
+            // Guard against a race: between the failed check above and the parking of the command, one or
+            // more commands of the same type may have finished. Without an immediate re-check the parked
+            // command might not be picked up again, or only much later.
+            checkMaxConcurrency(element.getType());
+        }
+
+        /**
+         * @return the command wrapped by this task
+         */
+        public CallableWrapper<?> getCallableWrapper() {
+            return target;
+        }
+    }
+
+    /**
+     * Orders commands by descending priority: the command with the larger priority value sorts first.
+     */
+    @SuppressFBWarnings(value = "SE_COMPARATOR_SHOULD_BE_SERIALIZABLE",
+            justification = "PriorityBlockingQueue which uses this comparator will never be serialized")
+    public static class PriorityComparator implements Comparator<CallableWrapper<?>> {
+        @Override
+        public int compare(CallableWrapper<?> o1, CallableWrapper<?> o2) {
+            final int priorityOfSecond = o2.getPriority();
+            final int priorityOfFirst = o1.getPriority();
+
+            // operands are swapped on purpose to get a descending order
+            return Integer.compare(priorityOfSecond, priorityOfFirst);
+        }
+    }
+
+    /**
+     * A {@link RunnableScheduledFuture} that forwards every call to the future it wraps and additionally
+     * remembers the {@link Runnable} that was scheduled. We have to use this so that scheduled elements in
+     * the DelayWorkQueue are accessible.
+     *
+     * @param <V> the result type of the wrapped future
+     */
+    @SuppressFBWarnings(value = "EQ_COMPARETO_USE_OBJECT_EQUALS",
+            justification = "This class has a natural ordering (expiration) which is inconsistent with equals")
+    public static class AccessibleRunnableScheduledFuture<V> implements RunnableScheduledFuture<V> {
+        private final RunnableScheduledFuture<V> delegate;
+        private final Runnable scheduledTask;
+
+        /**
+         * @param originalFuture the future that all calls are forwarded to
+         * @param task the runnable that was scheduled, exposed through {@link #getTask()}
+         */
+        public AccessibleRunnableScheduledFuture(RunnableScheduledFuture<V> originalFuture,
+                Runnable task) {
+            this.delegate = originalFuture;
+            this.scheduledTask = task;
+        }
+
+        /**
+         * @return the runnable that was passed to the constructor
+         */
+        public Runnable getTask() {
+            return scheduledTask;
+        }
+
+        // ---- everything below is plain forwarding to the wrapped future ----
+
+        @Override
+        public void run() {
+            delegate.run();
+        }
+
+        @Override
+        public boolean isPeriodic() {
+            return delegate.isPeriodic();
+        }
+
+        @Override
+        public long getDelay(TimeUnit unit) {
+            return delegate.getDelay(unit);
+        }
+
+        @Override
+        public int compareTo(Delayed o) {
+            return delegate.compareTo(o);
+        }
+
+        @Override
+        public boolean cancel(boolean mayInterruptIfRunning) {
+            return delegate.cancel(mayInterruptIfRunning);
+        }
+
+        @Override
+        public boolean isCancelled() {
+            return delegate.isCancelled();
+        }
+
+        @Override
+        public boolean isDone() {
+            return delegate.isDone();
+        }
+
+        @Override
+        public V get() throws InterruptedException, ExecutionException {
+            return delegate.get();
+        }
+
+        @Override
+        public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+            return delegate.get(timeout, unit);
+        }
+    }
+
+    /**
+     * Initiates the shutdown of an executor and waits for its termination in one-second steps. The waiting
+     * is given up once 30 seconds have passed since the call started.
+     *
+     * @param executor the executor to shut down
+     * @param name the name used for the executor in the log messages
+     * @throws InterruptedException if the calling thread is interrupted while waiting
+     */
+    private void shutdownExecutor(ExecutorService executor, String name) throws InterruptedException {
+        final long deadline = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(30);
+        executor.shutdown();
+
+        boolean terminated = executor.awaitTermination(1000, TimeUnit.MILLISECONDS);
+        while (!terminated) {
+            log.info("Waiting for [{0}] to shutdown", name);
+            if (System.currentTimeMillis() > deadline) {
+                log.warn("Gave up, continuing without waiting for executor to shutdown");
+                return;
+            }
+            terminated = executor.awaitTermination(1000, TimeUnit.MILLISECONDS);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/13bfd494/core/src/main/java/org/apache/oozie/service/CallableQueueService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/CallableQueueService.java b/core/src/main/java/org/apache/oozie/service/CallableQueueService.java
index ef8d58d..dc9a099 100644
--- a/core/src/main/java/org/apache/oozie/service/CallableQueueService.java
+++ b/core/src/main/java/org/apache/oozie/service/CallableQueueService.java
@@ -72,6 +72,7 @@ import com.google.common.collect.ImmutableSet;
  * execution of Commands via a ThreadPool. Sets up a Delayed Queue to handle actions which will be ready for execution
  * sometime in the future.
  */
+@SuppressWarnings("deprecation")
 public class CallableQueueService implements Service, Instrumentable {
     private static final String INSTRUMENTATION_GROUP = "callablequeue";
     private static final String INSTR_IN_QUEUE_TIME_TIMER = "time.in.queue";
@@ -85,14 +86,17 @@ public class CallableQueueService implements Service, Instrumentable {
 
     public static final String CONF_QUEUE_SIZE = CONF_PREFIX + "queue.size";
     public static final String CONF_THREADS = CONF_PREFIX + "threads";
+    public static final String CONF_OLDIMPL = CONF_PREFIX + "queue.oldImpl";
+    public static final String CONF_DELAYED_CALLABLE_THREADS = CONF_PREFIX + "delayedcallable.threads";
     public static final String CONF_CALLABLE_CONCURRENCY = CONF_PREFIX + "callable.concurrency";
     public static final String CONF_CALLABLE_NEXT_ELIGIBLE = CONF_PREFIX + "callable.next.eligible";
     public static final String CONF_CALLABLE_INTERRUPT_TYPES = CONF_PREFIX + "InterruptTypes";
     public static final String CONF_CALLABLE_INTERRUPT_MAP_MAX_SIZE = CONF_PREFIX + "InterruptMapMaxSize";
 
     public static final int CONCURRENCY_DELAY = 500;
-
     public static final int SAFE_MODE_DELAY = 60000;
+    public static final int MAX_CALLABLE_WAITTIME_MS = 30_000;
+    public static final int PRIORITIES = 3;
 
     private final Map<String, AtomicInteger> activeCallables = new HashMap<String, AtomicInteger>();
 
@@ -131,6 +135,11 @@ public class CallableQueueService implements Service, Instrumentable {
                 counter.decrementAndGet();
             }
         }
+
+        if (!oldImpl) {
+            asyncXCommandExecutor.commandFinished();
+            asyncXCommandExecutor.checkMaxConcurrency(callable.getType());
+        }
     }
 
     private boolean callableReachMaxConcurrency(XCallable<?> callable) {
@@ -146,6 +155,21 @@ public class CallableQueueService implements Service, Instrumentable {
         }
     }
 
+    /**
+     * Tells whether a callable may be submitted, based on the counter kept for its type in
+     * {@code activeCallables}.
+     * <p>
+     * If there is no counter for the type yet, a counter starting at 1 is registered and the answer is
+     * {@code true}. Otherwise the counter is only read, and the answer is {@code true} as long as its value
+     * does not exceed {@code maxCallableConcurrency}.
+     * NOTE(review): the first call for a type registers a counter of 1 although nothing is started by this
+     * method - confirm that this is intended.
+     *
+     * @param callable the callable whose type is checked
+     * @return {@code true} if the callable can be submitted
+     */
+    public boolean canSubmitCallable(XCallable<?> callable) {
+        synchronized (activeCallables) {
+            final AtomicInteger activeForType = activeCallables.get(callable.getType());
+            if (activeForType != null) {
+                return activeForType.get() <= maxCallableConcurrency;
+            }
+
+            // first time this type is seen
+            activeCallables.put(callable.getType(), new AtomicInteger(1));
+            return true;
+        }
+    }
+
     // Callables are wrapped with the this wrapper for execution, for logging
     // and instrumentation.
     // The wrapper implements Runnable and Comparable to be able to work with an
@@ -414,9 +438,11 @@ public class CallableQueueService implements Service, Instrumentable {
     private XLog log = XLog.getLog(getClass());
 
     private int queueSize;
-    private PriorityDelayQueue<CallableWrapper> queue;
+    private PriorityDelayQueue<CallableWrapper<?>> queue;
     private ThreadPoolExecutor executor;
     private Instrumentation instrumentation;
+    private boolean oldImpl = false;
+    private AsyncXCommandExecutor asyncXCommandExecutor;
 
     /**
      * Convenience method for instrumentation counters.
@@ -458,7 +484,10 @@ public class CallableQueueService implements Service, Instrumentable {
         interruptTypes = ImmutableSet.copyOf(interruptTypes);
 
         if (!callableNextEligible) {
-            queue = new PriorityDelayQueue<CallableWrapper>(3, 1000 * 30, TimeUnit.MILLISECONDS, queueSize) {
+            queue = new PriorityDelayQueue<CallableWrapper<?>>(PRIORITIES,
+                    MAX_CALLABLE_WAITTIME_MS,
+                    TimeUnit.MILLISECONDS,
+                    queueSize) {
                 @Override
                 protected void debug(String msgTemplate, Object... msgArgs) {
                     log.trace(msgTemplate, msgArgs);
@@ -471,7 +500,10 @@ public class CallableQueueService implements Service, Instrumentable {
             // which has not yet reach max concurrency.Overrided method
             // 'eligibleToPoll' to check if the
             // element of this queue has reached the maximum concurrency.
-            queue = new PollablePriorityDelayQueue<CallableWrapper>(3, 1000 * 30, TimeUnit.MILLISECONDS, queueSize) {
+            queue = new PollablePriorityDelayQueue<CallableWrapper<?>>(PRIORITIES,
+                    MAX_CALLABLE_WAITTIME_MS,
+                    TimeUnit.MILLISECONDS,
+                    queueSize) {
                 @Override
                 protected void debug(String msgTemplate, Object... msgArgs) {
                     log.trace(msgTemplate, msgArgs);
@@ -493,28 +525,45 @@ public class CallableQueueService implements Service, Instrumentable {
 
         interruptMapMaxSize = ConfigurationService.getInt(conf, CONF_CALLABLE_INTERRUPT_MAP_MAX_SIZE);
 
+        oldImpl = ConfigurationService.getBoolean(CONF_OLDIMPL, false);
+        log.info("Using old queue implementation: [{0}]", oldImpl);
+
+        if (oldImpl) {
+            executor = new ThreadPoolExecutor(threads, threads, 10, TimeUnit.SECONDS, (BlockingQueue) queue,
+                    new NamedThreadFactory("CallableQueue")) {
+                protected void beforeExecute(Thread t, Runnable r) {
+                    super.beforeExecute(t,r);
+                    XLog.Info.get().clear();
+                }
+                protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
+                    return (RunnableFuture<T>)callable;
+                }
+            };
+        } else {
+            int delayedCallableThreads = ConfigurationService.getInt(CONF_DELAYED_CALLABLE_THREADS, 1);
+
+            asyncXCommandExecutor = new AsyncXCommandExecutor(threads,
+                    delayedCallableThreads,
+                    callableNextEligible,
+                    this,
+                    queueSize,
+                    MAX_CALLABLE_WAITTIME_MS,
+                    PRIORITIES);
+
+            executor = asyncXCommandExecutor.getExecutorService();
+        }
+
         // IMPORTANT: The ThreadPoolExecutor does not always the execute
         // commands out of the queue, there are
         // certain conditions where commands are pushed directly to a thread.
         // As we are using a queue with DELAYED semantics (i.e. execute the
         // command in 5 mins) we need to make
         // sure that the commands are always pushed to the queue.
-        // To achieve this (by looking a the ThreadPoolExecutor.execute()
+        // To achieve this (by looking at the ThreadPoolExecutor.execute()
         // implementation, we are making the pool
         // minimum size equals to the maximum size (thus threads are keep always
         // running) and we are warming up
         // all those threads (the for loop that runs dummy runnables).
-        executor = new ThreadPoolExecutor(threads, threads, 10, TimeUnit.SECONDS, (BlockingQueue) queue,
-                new NamedThreadFactory("CallableQueue")) {
-            protected void beforeExecute(Thread t, Runnable r) {
-                super.beforeExecute(t,r);
-                XLog.Info.get().clear();
-            }
-            protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
-                return (RunnableFuture<T>)callable;
-            }
-        };
-
         for (int i = 0; i < threads; i++) {
             executor.execute(new Runnable() {
                 public void run() {
@@ -547,6 +596,10 @@ public class CallableQueueService implements Service, Instrumentable {
                     break;
                 }
             }
+
+            if (!oldImpl) {
+                asyncXCommandExecutor.shutdown();
+            }
         }
         catch (InterruptedException ex) {
             log.warn(ex);
@@ -567,29 +620,34 @@ public class CallableQueueService implements Service, Instrumentable {
      * @return int size of queue
      */
     public synchronized int queueSize() {
-        return queue.size();
+        return oldImpl ? queue.size() : asyncXCommandExecutor.getSize();
     }
 
-    private synchronized boolean queue(CallableWrapper wrapper, boolean ignoreQueueSize) {
-        if (!ignoreQueueSize && queue.size() >= queueSize) {
-            log.warn("queue full, ignoring queuing for [{0}]", wrapper.getElement().getKey());
-            return false;
-        }
-        if (!executor.isShutdown()) {
-            if (wrapper.filterDuplicates()) {
-                wrapper.addToUniqueCallables();
-                try {
-                    executor.execute(wrapper);
-                }
-                catch (Throwable ree) {
-                    wrapper.removeFromUniqueCallables();
-                    throw new RuntimeException(ree);
+    /**
+     * Queues a wrapped callable, using the deprecated queue based executor if {@code oldImpl} is set and the
+     * {@code AsyncXCommandExecutor} otherwise.
+     *
+     * @param wrapper the wrapped callable to queue
+     * @param ignoreQueueSize if {@code true}, the queue size limit is not checked
+     * @return {@code false} if the callable was refused because the queue is full, {@code true} otherwise
+     *         (with the old implementation this includes the case when the executor is already shut down)
+     */
+    private synchronized boolean queue(CallableWrapper<?> wrapper, boolean ignoreQueueSize) {
+        if (oldImpl) {
+            if (!ignoreQueueSize && queue.size() >= queueSize) {
+                log.warn("queue full, ignoring queuing for [{0}]", wrapper.getElement().getKey());
+                return false;
+            }
+            if (!executor.isShutdown()) {
+                if (wrapper.filterDuplicates()) {
+                    wrapper.addToUniqueCallables();
+                    try {
+                        executor.execute(wrapper);
+                    }
+                    catch (Throwable ree) {
+                        wrapper.removeFromUniqueCallables();
+                        throw new RuntimeException(ree);
+                    }
                 }
             }
+            else {
+                log.warn("Executor shutting down, ignoring queueing of [{0}]", wrapper.getElement().getKey());
+            }
+        } else {
+            // Pass the outcome on: the executor returns false when it refuses the callable because the
+            // queue is full, and the callers must not count such a callable as queued.
+            return asyncXCommandExecutor.queue(wrapper, ignoreQueueSize);
         }
-        else {
-            log.warn("Executor shutting down, ignoring queueing of [{0}]", wrapper.getElement().getKey());
-        }
+
         return true;
     }
 
@@ -637,7 +695,7 @@ public class CallableQueueService implements Service, Instrumentable {
         }
         else {
             checkInterruptTypes(callable);
-            queued = queue(new CallableWrapper(callable, delay), false);
+            queued = queue(new CallableWrapper<>(callable, delay), false);
             if (queued) {
                 incrCounter(INSTR_QUEUED_COUNTER, 1);
             }
@@ -762,14 +820,18 @@ public class CallableQueueService implements Service, Instrumentable {
      * @return the list of string that representing each CallableWrapper
      */
     public List<String> getQueueDump() {
-        List<String> list = new ArrayList<String>();
-        for (QueueElement<CallableWrapper> qe : queue) {
-            if (qe.toString() == null) {
-                continue;
+        if (oldImpl) {
+            List<String> list = new ArrayList<String>();
+            for (QueueElement<CallableWrapper<?>> qe : queue) {
+                if (qe.toString() == null) {
+                    continue;
+                }
+                list.add(qe.toString());
             }
-            list.add(qe.toString());
+            return list;
+        } else {
+            return asyncXCommandExecutor.getQueueDump();
         }
-        return list;
     }
 
     /**
@@ -794,5 +856,4 @@ public class CallableQueueService implements Service, Instrumentable {
     public Set<String> getInterruptTypes() {
         return interruptTypes;
     }
-
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/13bfd494/core/src/main/java/org/apache/oozie/util/PriorityDelayQueue.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/util/PriorityDelayQueue.java b/core/src/main/java/org/apache/oozie/util/PriorityDelayQueue.java
index 75c2069..365f918 100644
--- a/core/src/main/java/org/apache/oozie/util/PriorityDelayQueue.java
+++ b/core/src/main/java/org/apache/oozie/util/PriorityDelayQueue.java
@@ -51,7 +51,10 @@ import java.util.concurrent.locks.ReentrantLock;
  * <p>
  * This class does not use a separate thread for anti-starvation check, instead, the check is performed on polling and
  * seeking operations. This check is performed, the most every 1/2 second.
+ *
+ * @deprecated this implementation will be removed in the future and AsyncXCommandExecutor will be used.
  */
+@Deprecated
 public class PriorityDelayQueue<E> extends AbstractQueue<PriorityDelayQueue.QueueElement<E>>
         implements BlockingQueue<PriorityDelayQueue.QueueElement<E>> {
 
@@ -65,6 +68,7 @@ public class PriorityDelayQueue<E> extends AbstractQueue<PriorityDelayQueue.Queu
         private int priority;
         private long baseTime;
         boolean inQueue;
+        private long initialDelay;
 
         /**
          * Create an Element wrapper.
@@ -88,6 +92,7 @@ public class PriorityDelayQueue<E> extends AbstractQueue<PriorityDelayQueue.Queu
             this.element = element;
             this.priority = priority;
             setDelay(delay, unit);
+            this.initialDelay = delay;
         }
 
         /**
@@ -100,6 +105,15 @@ public class PriorityDelayQueue<E> extends AbstractQueue<PriorityDelayQueue.Queu
         }
 
         /**
+         * Sets the priority of the element. The value is stored as-is, without any range check.
+         *
+         * @param priority the new priority of the element
+         */
+        public void setPriority(int priority) {
+            this.priority = priority;
+        }
+
+        /**
          * Return the priority of the element.
          *
          * @return the priority of the element.
@@ -116,6 +130,7 @@ public class PriorityDelayQueue<E> extends AbstractQueue<PriorityDelayQueue.Queu
          */
         public void setDelay(long delay, TimeUnit unit) {
             baseTime = System.currentTimeMillis() + unit.toMillis(delay);
+            initialDelay = delay; // kept unconverted, i.e. in the caller's unit, for getInitialDelay()
         }
 
         /**
@@ -130,6 +145,17 @@ public class PriorityDelayQueue<E> extends AbstractQueue<PriorityDelayQueue.Queu
         }
 
         /**
+         * Returns the delay value last passed to the constructor or to setDelay(), unconverted, i.e. in the
+         * time unit supplied with it. Unlike getDelay(), this value does not change as time passes; it only
+         * changes when setDelay() is called again.
+         *
+         * @return the last configured delay of this element, in the caller-supplied time unit.
+         */
+        public long getInitialDelay() {
+            return initialDelay;
+        }
+
+        /**
          * Compare the age of this wrapper element with another. The priority is not used for the comparision.
          *
          * @param o the other wrapper element to compare with.

http://git-wip-us.apache.org/repos/asf/oozie/blob/13bfd494/core/src/main/resources/oozie-default.xml
----------------------------------------------------------------------
diff --git a/core/src/main/resources/oozie-default.xml b/core/src/main/resources/oozie-default.xml
index c354f02..c3bcdfc 100644
--- a/core/src/main/resources/oozie-default.xml
+++ b/core/src/main/resources/oozie-default.xml
@@ -504,6 +504,31 @@
     </property>
 
     <property>
+        <name>oozie.service.CallableQueueService.delayedcallable.threads</name>
+        <value>1</value>
+        <description>
+            The number of threads where delayed tasks are executed. Upon expiration, the tasks are immediately
+            inserted into the main queue to properly handle priorities. This means that no actual business logic
+            is executed in this thread pool, so under normal circumstances, this value can be set to a low number.
+
+            Note that this property is completely unrelated to oozie.service.SchedulerService.threads which
+            tells how many scheduled background tasks can run in parallel at the same time (like PurgeService,
+            StatusTransitService, etc).
+        </description>
+    </property>
+
+    <property>
+        <name>oozie.service.CallableQueueService.queue.oldImpl</name>
+        <value>false</value>
+        <description>
+            If set to false, then CallableQueueService will use a more performant, less CPU-intensive
+            queuing mechanism to execute asynchronous tasks internally. The old implementation generates
+            noticeable CPU load even if Oozie is completely idle, especially when oozie.service.CallableQueueService.threads
+            is set to a large number. The previous queuing mechanism is kept as a fallback option.
+        </description>
+    </property>
+
+    <property>
         <name>oozie.service.CallableQueueService.callable.concurrency</name>
         <value>3</value>
         <description>

http://git-wip-us.apache.org/repos/asf/oozie/blob/13bfd494/core/src/test/java/org/apache/oozie/service/TestAsyncXCommandExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/service/TestAsyncXCommandExecutor.java b/core/src/test/java/org/apache/oozie/service/TestAsyncXCommandExecutor.java
new file mode 100644
index 0000000..f9ec4d6
--- /dev/null
+++ b/core/src/test/java/org/apache/oozie/service/TestAsyncXCommandExecutor.java
@@ -0,0 +1,462 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.service;
+
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.oozie.service.AsyncXCommandExecutor.AccessibleRunnableScheduledFuture;
+import org.apache.oozie.service.AsyncXCommandExecutor.ScheduledXCallable;
+import org.apache.oozie.service.CallableQueueService.CallableWrapper;
+import org.apache.oozie.util.XCallable;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Answers;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.runners.MockitoJUnitRunner;
+import org.mockito.stubbing.Answer;
+
+import com.google.common.collect.Sets;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.times;
+import static org.mockito.Matchers.same;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.eq;
+
+@RunWith(MockitoJUnitRunner.class)
+@SuppressWarnings("deprecation")
+public class TestAsyncXCommandExecutor {
+    private static final String DEFAULT_TYPE = "test";
+    private static final int DEFAULT_MAX_ACTIVE_COMMANDS = 5;
+    private static final boolean DEFAULT_ENABLE_CONCURRENCY_CHECK = true;
+    private static final long DEFAULT_MAXWAIT = 30_000;
+    private static final int TEST_PRIORITIES = 5;
+    private static final int MAX_PRIORITY = TEST_PRIORITIES - 1;
+
+    @Mock
+    private ThreadPoolExecutor executor;
+
+    @Mock
+    private ScheduledThreadPoolExecutor scheduledExecutor;
+
+    @Mock(answer = Answers.RETURNS_DEEP_STUBS)
+    private CallableWrapper<?> callableWrapper;
+
+    @Mock
+    private CallableQueueService callableQueueService;
+
+    private PriorityBlockingQueue<CallableWrapper<?>> priorityBlockingQueue;
+    private BlockingQueue<AccessibleRunnableScheduledFuture<ScheduledXCallable>> delayQueue;
+    private ConcurrentHashMap<String, Set<CallableWrapper<?>>> pendingCommandsPerType;
+    private AtomicInteger activeCommands;
+    private AsyncXCommandExecutor asyncExecutor;
+
+    /**
+     * Creates a fresh executor around the mocked thread pools and real, test-visible collections, and
+     * stubs the shared wrapper mock so that it passes duplicate filtering and reports a key and the
+     * default type.
+     */
+    @Before
+    public void setup() {
+        activeCommands = new AtomicInteger(0);
+        priorityBlockingQueue = new PriorityBlockingQueue<>();
+        pendingCommandsPerType = new ConcurrentHashMap<>();
+        delayQueue = new LinkedBlockingQueue<>();  // in reality it's not a LinkedBlockingQueue, but it's fine here
+        asyncExecutor = createExecutor(DEFAULT_ENABLE_CONCURRENCY_CHECK, DEFAULT_MAX_ACTIVE_COMMANDS, DEFAULT_MAXWAIT);
+        when(callableWrapper.filterDuplicates()).thenReturn(true);
+        when(callableWrapper.getElement().getKey()).thenReturn("key");
+        when(callableWrapper.getElement().getType()).thenReturn(DEFAULT_TYPE);
+    }
+
+    @Test
+    public void testSubmitCallableWithNoDelay() {
+        boolean result = asyncExecutor.queue(callableWrapper, false);
+
+        verify(executor).execute(same(callableWrapper));
+        verifyZeroInteractions(scheduledExecutor);
+        assertEquals("Active commands", 1, asyncExecutor.getSize());
+        assertTrue("Queuing result", result);
+    }
+
+    /**
+     * A wrapper reporting a positive initial delay must be handed to the scheduled executor only.
+     * The two stubs return different values, so the verification shows that the schedule call uses
+     * the value of getDelay() (222) and not the value of getInitialDelay() (111).
+     */
+    @Test
+    public void testSubmitCallableWithDelay() {
+        when(callableWrapper.getInitialDelay()).thenReturn(111L);
+        when(callableWrapper.getDelay(eq(TimeUnit.MILLISECONDS))).thenReturn(222L);
+
+        boolean result = asyncExecutor.queue(callableWrapper, false);
+
+        verify(scheduledExecutor).schedule(any(ScheduledXCallable.class), eq(222L), eq(TimeUnit.MILLISECONDS));
+        verifyZeroInteractions(executor);
+        assertEquals("Active commands", 1, asyncExecutor.getSize());
+        assertTrue("Queuing result", result);
+    }
+
+    @Test
+    public void testSubmissionSuccessfulAfterDelay() {
+        when(callableWrapper.getInitialDelay()).thenReturn(100L);
+        when(callableWrapper.getDelay(eq(TimeUnit.MILLISECONDS))).thenReturn(50L);
+        when(callableQueueService.canSubmitCallable(any(XCallable.class))).thenReturn(true);
+        configureMockScheduler();
+
+        asyncExecutor.queue(callableWrapper, false);
+
+        verify(scheduledExecutor).schedule(any(ScheduledXCallable.class), eq(50L),
+                eq(TimeUnit.MILLISECONDS));
+        verify(executor).execute(callableWrapper);
+    }
+
+    /**
+     * Counterpart of testSubmissionSuccessfulAfterDelay(): canSubmitCallable() is left unstubbed here,
+     * so the CallableQueueService mock answers false and the scheduled task, although run immediately
+     * by the mock scheduler, must not hand the wrapper over to the main executor.
+     */
+    @Test
+    public void testSubmissionFailsAfterDelay() {
+        when(callableWrapper.getInitialDelay()).thenReturn(100L);
+        when(callableWrapper.getDelay(eq(TimeUnit.MILLISECONDS))).thenReturn(50L);
+        configureMockScheduler();
+
+        asyncExecutor.queue(callableWrapper, false);
+
+        verify(scheduledExecutor).schedule(any(ScheduledXCallable.class), eq(50L),
+                eq(TimeUnit.MILLISECONDS));
+        verifyZeroInteractions(executor);
+    }
+
+    @Test
+    public void testSubmissionSuccessfulAfterDelayWhenMaxConcurrencyCheckDisabled() {
+        asyncExecutor = createExecutor(false, 2, DEFAULT_MAXWAIT);
+        when(callableWrapper.getInitialDelay()).thenReturn(100L);
+        when(callableWrapper.getDelay(eq(TimeUnit.MILLISECONDS))).thenReturn(50L);
+        XCallable<?> wrappedCommand = mock(XCallable.class);
+        Mockito.<XCallable<?>>when(callableWrapper.getElement()).thenReturn(wrappedCommand);
+        configureMockScheduler();
+
+        asyncExecutor.queue(callableWrapper, false);
+
+        verify(scheduledExecutor).schedule(any(ScheduledXCallable.class), eq(50L),
+                eq(TimeUnit.MILLISECONDS));
+        verify(executor).execute(eq(callableWrapper));
+    }
+
+    /**
+     * When filterDuplicates() rejects the wrapper, neither executor may be touched and the size must
+     * stay at 0. Note that queue() is still expected to return true in this case.
+     */
+    @Test
+    public void testCannotSubmitDueToFiltering() {
+        when(callableWrapper.filterDuplicates()).thenReturn(false);
+
+        boolean result = asyncExecutor.queue(callableWrapper, false);
+
+        verifyZeroInteractions(scheduledExecutor);
+        verifyZeroInteractions(executor);
+        assertEquals("Active commands", 0, asyncExecutor.getSize());
+        assertTrue("Queuing result", result);
+    }
+
+    @Test
+    public void testExceptionThrownDuringSubmission() {
+        doThrow(new RuntimeException()).when(executor).execute(any(Runnable.class));
+
+        boolean exceptionThrown = false;
+        try {
+            asyncExecutor.queue(callableWrapper, false);
+        } catch (RuntimeException e) {
+            exceptionThrown = true;
+        }
+
+        assertTrue("Exception was not thrown", exceptionThrown);
+        verify(callableWrapper).removeFromUniqueCallables();
+        verifyZeroInteractions(scheduledExecutor);
+    }
+
+    @Test
+    public void testSubmitWithNegativePriority() {
+        testIllegalPriority(-1);
+    }
+
+    @Test
+    public void testSubmitWithTooHighPriority() {
+        testIllegalPriority(MAX_PRIORITY + 1);
+    }
+
+    @Test
+    public void testQueueSizeWhenCommandIsFinished() {
+        CallableWrapper<?> delayedCommand = mock(CallableWrapper.class);
+        when(delayedCommand.getInitialDelay()).thenReturn(100L);
+        when(delayedCommand.filterDuplicates()).thenReturn(true);
+
+        asyncExecutor.queue(callableWrapper, false);
+        asyncExecutor.queue(delayedCommand, false);
+        int sizeAfterQueue = asyncExecutor.getSize();
+        asyncExecutor.commandFinished();
+        asyncExecutor.commandFinished();
+
+        assertEquals("Size after queue", 2, sizeAfterQueue);
+        assertEquals("Active commands", 0, asyncExecutor.getSize());
+    }
+
+    /**
+     * Parks one submittable command as pending, then sets the active command counter to 20, which is
+     * above DEFAULT_MAX_ACTIVE_COMMANDS. After checkMaxConcurrency() the counter is expected to be 19,
+     * i.e. exactly one slot must have been given back.
+     */
+    @Test
+    public void testQueueSizeWhenQueueIsFullDuringMaxConcurrencyCheck() {
+        XCallable<?> callable = mock(XCallable.class);
+        when(callable.getType()).thenReturn(DEFAULT_TYPE);
+        Mockito.<XCallable<?>>when(callableWrapper.getElement()).thenReturn(callable);
+        when(callableQueueService.canSubmitCallable(eq(callable))).thenReturn(true);
+        asyncExecutor.handleConcurrencyExceeded(callableWrapper);
+        activeCommands.set(20);
+
+        asyncExecutor.checkMaxConcurrency(DEFAULT_TYPE);
+
+        assertEquals("Active commands", 19, activeCommands.get());
+    }
+
+    @Test
+    public void testSubmissionWhenQueueIsFull() {
+        asyncExecutor = createExecutor(true, 2, DEFAULT_MAXWAIT);
+        callableWrapper = mock(CallableWrapper.class, Mockito.RETURNS_DEEP_STUBS);
+        when(callableWrapper.filterDuplicates()).thenReturn(true);
+        when(callableWrapper.getElement().getKey()).thenReturn("key");
+
+        asyncExecutor.queue(callableWrapper, false);
+        asyncExecutor.queue(callableWrapper, false);
+        boolean finalResult = asyncExecutor.queue(callableWrapper, false);
+
+        assertFalse("Last submission shouldn't have succeeded", finalResult);
+        verify(executor, times(2)).execute(same(callableWrapper));
+    }
+
+    @Test
+    public void testSubmissionWhenQueueSizeIsIgnored() {
+        asyncExecutor = createExecutor(true, 2, DEFAULT_MAXWAIT);
+        callableWrapper = mock(CallableWrapper.class, Mockito.RETURNS_DEEP_STUBS);
+        when(callableWrapper.filterDuplicates()).thenReturn(true);
+        when(callableWrapper.getElement().getKey()).thenReturn("key");
+
+        asyncExecutor.queue(callableWrapper, false);
+        asyncExecutor.queue(callableWrapper, false);
+        boolean finalResult = asyncExecutor.queue(callableWrapper, true);
+
+        assertTrue("Last submission should have succeeded", finalResult);
+        verify(executor, times(3)).execute(same(callableWrapper));
+    }
+
+    @Test
+    public void testPendingCommandSubmission() {
+        XCallable<?> callable = mock(XCallable.class);
+        when(callable.getType()).thenReturn(DEFAULT_TYPE);
+        Mockito.<XCallable<?>>when(callableWrapper.getElement()).thenReturn(callable);
+        when(callableQueueService.canSubmitCallable(eq(callable))).thenReturn(true);
+
+        asyncExecutor.handleConcurrencyExceeded(callableWrapper);
+        asyncExecutor.checkMaxConcurrency(DEFAULT_TYPE);
+
+        verify(executor).execute(eq(callableWrapper));
+        assertEquals("Number of pending commands", 1, pendingCommandsPerType.size());
+        Set<CallableWrapper<?>> pendingCommandsList = pendingCommandsPerType.get(DEFAULT_TYPE);
+        assertNotNull("List of pending commands doesn't exist", pendingCommandsList);
+        assertEquals("List of pending commands should be empty", 0, pendingCommandsList.size());
+    }
+
+    @Test
+    public void testPendingCommandsWithSameType() {
+        XCallable<?> callable = mock(XCallable.class);
+        when(callable.getType()).thenReturn(DEFAULT_TYPE);
+        Mockito.<XCallable<?>>when(callableWrapper.getElement()).thenReturn(callable);
+
+        XCallable<?> secondCallable = mock(XCallable.class);
+        when(secondCallable.getType()).thenReturn(DEFAULT_TYPE);
+        CallableWrapper<?> secondWrapper = mock(CallableWrapper.class);
+        Mockito.<XCallable<?>>when(secondWrapper.getElement()).thenReturn(secondCallable);
+
+        asyncExecutor.handleConcurrencyExceeded(callableWrapper);
+        asyncExecutor.handleConcurrencyExceeded(secondWrapper);
+
+        assertEquals("Number of pending commands", 1, pendingCommandsPerType.size());
+        Set<CallableWrapper<?>> pendingCommandsList = pendingCommandsPerType.get(DEFAULT_TYPE);
+        assertNotNull("List of pending commands doesn't exist", pendingCommandsList);
+        assertEquals("List of pending commands", 2, pendingCommandsList.size());
+    }
+
+    /**
+     * A pending command that would be submittable is re-checked while the active command counter (10)
+     * is above DEFAULT_MAX_ACTIVE_COMMANDS. The executor must not be called, and the pending set of
+     * the type must still exist but be empty, i.e. the command is removed without being executed.
+     */
+    @Test
+    public void testPendingCommandSubmissionWhenQueueIsFull() {
+        XCallable<?> callable = mock(XCallable.class);
+        when(callable.getType()).thenReturn(DEFAULT_TYPE);
+        Mockito.<XCallable<?>>when(callableWrapper.getElement()).thenReturn(callable);
+
+        when(callableQueueService.canSubmitCallable(eq(callable))).thenReturn(true);
+
+        activeCommands.set(10);
+        asyncExecutor.handleConcurrencyExceeded(callableWrapper);
+        asyncExecutor.checkMaxConcurrency(DEFAULT_TYPE);
+
+        verifyZeroInteractions(executor);
+        assertEquals("Number of pending commands", 1, pendingCommandsPerType.size());
+        Set<CallableWrapper<?>> pendingCommandsList = pendingCommandsPerType.get(DEFAULT_TYPE);
+        assertNotNull("List of pending commands doesn't exist", pendingCommandsList);
+        assertEquals("List of pending commands should be empty", 0, pendingCommandsList.size());
+    }
+
+    @Test
+    public void testPendingCommandSubmissionWhenMaxConcurrencyReached() {
+        XCallable<?> callable = mock(XCallable.class);
+        when(callable.getType()).thenReturn(DEFAULT_TYPE);
+        Mockito.<XCallable<?>>when(callableWrapper.getElement()).thenReturn(callable);
+        when(callableQueueService.canSubmitCallable(eq(callable))).thenReturn(false);
+
+        asyncExecutor.handleConcurrencyExceeded(callableWrapper);
+        asyncExecutor.checkMaxConcurrency(DEFAULT_TYPE);
+
+        verifyZeroInteractions(executor);
+        assertEquals("Number of pending commands", 1, pendingCommandsPerType.size());
+        Set<CallableWrapper<?>> pendingCommandsList = pendingCommandsPerType.get(DEFAULT_TYPE);
+        assertNotNull("List of pending commands doesn't exist", pendingCommandsList);
+        assertEquals("List of pending commands list should not be empty", 1, pendingCommandsList.size());
+    }
+
+    /**
+     * Places one command into each of the three collections handed to the executor (the priority
+     * queue, the delay queue and the per-type pending set) and expects getQueueDump() to return the
+     * toString() value of all three.
+     */
+    @Test
+    public void testQueueDump() {
+        CallableWrapper<?> pendingCallable = mock(CallableWrapper.class);
+        CallableWrapper<?> waitingCallable = mock(CallableWrapper.class);
+        ScheduledXCallable delayedXCallable = mock(ScheduledXCallable.class);
+        @SuppressWarnings("unchecked")
+        AccessibleRunnableScheduledFuture<ScheduledXCallable> asrf = mock(AccessibleRunnableScheduledFuture.class);
+        Mockito.<CallableWrapper<?>>when(delayedXCallable.getCallableWrapper()).thenReturn(waitingCallable);
+        when(asrf.getTask()).thenReturn(delayedXCallable);
+        when(pendingCallable.toString()).thenReturn("pendingCallable");
+        when(waitingCallable.toString()).thenReturn("waitingCallable");
+        when(callableWrapper.toString()).thenReturn("callableWrapper");
+
+        priorityBlockingQueue.add(callableWrapper);
+        delayQueue.add(asrf);
+        pendingCommandsPerType.put(DEFAULT_TYPE, Sets.newHashSet(pendingCallable));
+
+        List<String> queueDump = asyncExecutor.getQueueDump();
+        assertEquals("Size", 3, queueDump.size());
+        assertTrue("PendingCallable not found", queueDump.contains("pendingCallable"));
+        assertTrue("WaitingCallable not found", queueDump.contains("waitingCallable"));
+        assertTrue("CallableWrapper not found", queueDump.contains("callableWrapper"));
+    }
+
+    @Test
+    public void testAntiStarvationWhenDelayIsAboveMaxWait() {
+        asyncExecutor = createExecutor(DEFAULT_ENABLE_CONCURRENCY_CHECK, DEFAULT_MAX_ACTIVE_COMMANDS, 500);
+        when(callableWrapper.getDelay(eq(TimeUnit.MILLISECONDS))).thenReturn(-40000L);
+        when(callableWrapper.getPriority()).thenReturn(0);
+        pendingCommandsPerType.put(DEFAULT_TYPE, Sets.newHashSet(callableWrapper));
+
+        asyncExecutor.checkMaxConcurrency(DEFAULT_TYPE);
+
+        verify(callableWrapper).setPriority(1);
+        verify(callableWrapper).setDelay(eq(0L), eq(TimeUnit.MILLISECONDS));
+    }
+
+    @Test
+    public void testAntiStarvationWhenDelayIsBelowMaxWait() {
+        when(callableWrapper.getDelay(eq(TimeUnit.MILLISECONDS))).thenReturn(-200L);
+        when(callableWrapper.getPriority()).thenReturn(0);
+        pendingCommandsPerType.put(DEFAULT_TYPE, Sets.newHashSet(callableWrapper));
+
+        asyncExecutor.checkMaxConcurrency(DEFAULT_TYPE);
+
+        verify(callableWrapper, never()).setPriority(anyInt());
+        verify(callableWrapper, never()).setDelay(anyLong(), any(TimeUnit.class));
+    }
+
+    @Test
+    public void testAntiStarvationWhenPriorityIsHighest() {
+        asyncExecutor = createExecutor(DEFAULT_ENABLE_CONCURRENCY_CHECK, DEFAULT_MAX_ACTIVE_COMMANDS, 500);
+        when(callableWrapper.getDelay(eq(TimeUnit.MILLISECONDS))).thenReturn(-1000L);
+        when(callableWrapper.getPriority()).thenReturn(MAX_PRIORITY);
+        pendingCommandsPerType.put(DEFAULT_TYPE, Sets.newHashSet(callableWrapper));
+
+        asyncExecutor.checkMaxConcurrency(DEFAULT_TYPE);
+
+        verify(callableWrapper, never()).setPriority(anyInt());
+        verify(callableWrapper, never()).setDelay(anyLong(), any(TimeUnit.class));
+    }
+
+    @Test
+    public void testShutDown() throws InterruptedException {
+        when(executor.awaitTermination(anyLong(), any(TimeUnit.class))).thenReturn(true);
+        when(scheduledExecutor.awaitTermination(anyLong(), any(TimeUnit.class))).thenReturn(true);
+        asyncExecutor.shutdown();
+
+        verify(executor).shutdown();
+        verify(executor).awaitTermination(eq(1000L), eq(TimeUnit.MILLISECONDS));
+        verify(scheduledExecutor).shutdown();
+        verify(scheduledExecutor).awaitTermination(eq(1000L), eq(TimeUnit.MILLISECONDS));
+    }
+
+    /**
+     * Shared body of the illegal-priority tests: queues the wrapper with the given priority and
+     * expects a RuntimeException caused by an IllegalArgumentException, no executor interaction, and
+     * the wrapper being removed from the unique callables.
+     *
+     * @param prio the out-of-range priority the wrapper mock should report
+     */
+    private void testIllegalPriority(int prio) {
+        when(callableWrapper.getPriority()).thenReturn(prio);
+
+        RuntimeException rejection = null;
+        try {
+            asyncExecutor.queue(callableWrapper, false);
+        } catch (RuntimeException e) {
+            rejection = e;
+        }
+
+        assertNotNull("Exception was not thrown", rejection);
+        verifyZeroInteractions(scheduledExecutor);
+        verifyZeroInteractions(executor);
+        assertTrue("Illegal exception", rejection.getCause() instanceof IllegalArgumentException);
+        verify(callableWrapper).removeFromUniqueCallables();
+    }
+
+    /**
+     * Stubs the mocked scheduler so that every ScheduledXCallable passed to schedule() is run
+     * synchronously on the calling thread, regardless of the requested delay. This lets a test observe
+     * what the task does once it fires. The stubbed schedule() call itself returns null.
+     */
+    private void configureMockScheduler() {
+        doAnswer(new Answer<Void>() {
+            @Override
+            public Void answer(InvocationOnMock invocation) throws Throwable {
+                // first argument of schedule() is the task to run
+                ScheduledXCallable target = (ScheduledXCallable) invocation.getArguments()[0];
+                target.run();
+                return null;
+            }
+        }).when(scheduledExecutor).schedule(any(ScheduledXCallable.class), any(Long.class),
+                any(TimeUnit.class));
+    }
+
+    /**
+     * Creates the executor under test, wired to the mocks and collections of this test class.
+     *
+     * @param needMaxConcurrencyCheck whether the executor should perform the max concurrency check
+     * @param maxActiveCallables the maximum number of active commands
+     * @param maxWait the maximum wait time handed to the executor (used by the anti-starvation tests)
+     * @return a new executor instance using TEST_PRIORITIES priority levels
+     */
+    private AsyncXCommandExecutor createExecutor(boolean needMaxConcurrencyCheck, int maxActiveCallables,
+            long maxWait) {
+        return new AsyncXCommandExecutor(needMaxConcurrencyCheck,
+                callableQueueService,
+                maxActiveCallables,
+                executor,
+                scheduledExecutor,
+                priorityBlockingQueue,
+                delayQueue,
+                pendingCommandsPerType,
+                activeCommands,
+                maxWait,  // honour the caller's value; a fixed DEFAULT_MAXWAIT here would silently ignore it
+                TEST_PRIORITIES);
+    }
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/13bfd494/core/src/test/java/org/apache/oozie/service/TestCallableQueueService.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/service/TestCallableQueueService.java b/core/src/test/java/org/apache/oozie/service/TestCallableQueueService.java
index 9c2a11d..5d546ff 100644
--- a/core/src/test/java/org/apache/oozie/service/TestCallableQueueService.java
+++ b/core/src/test/java/org/apache/oozie/service/TestCallableQueueService.java
@@ -25,16 +25,30 @@ import org.apache.oozie.command.PreconditionException;
 import org.apache.oozie.command.XCommand;
 import org.apache.oozie.test.XTestCase;
 import org.apache.oozie.util.XCallable;
+import org.apache.oozie.util.XLog;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multimaps;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 public class TestCallableQueueService extends XTestCase {
     static AtomicLong EXEC_ORDER = new AtomicLong();
+    private static XLog log = XLog.getLog(TestCallableQueueService.class);
+    private AtomicLong counter = new AtomicLong();
+    private CountDownLatch finished = new CountDownLatch(1);
 
     public static class MyCallable implements XCallable<Void> {
         String type;
@@ -93,6 +107,8 @@ public class TestCallableQueueService extends XTestCase {
             StringBuilder sb = new StringBuilder();
             sb.append("Type:").append(getType());
             sb.append(",Priority:").append(getPriority());
+            sb.append(",Key:").append(getKey());
+            sb.append(",Wait:").append(wait);
             return sb.toString();
         }
 
@@ -409,20 +425,19 @@ public class TestCallableQueueService extends XTestCase {
             queueservice.queue(c, 10);
         }
 
-        float originalRatio = XTestCase.WAITFOR_RATIO;
-        try{
-            XTestCase.WAITFOR_RATIO = 1;
-            waitFor(2000, new Predicate() {
-                public boolean evaluate() throws Exception {
-                    return queueservice.queueSize() == 0;
+        waitFor(3000, new Predicate() {
+            public boolean evaluate() throws Exception {
+                boolean completed = true;
+
+                for (MyCallable callable : callables) {
+                    completed &= (callable.executed != 0);
                 }
-            });
-        }
-        finally {
-            XTestCase.WAITFOR_RATIO = originalRatio;
-        }
 
-        System.out.println("Callable Queue Size :" + queueservice.queueSize());
+                completed &= (callableOther.executed != 0);
+
+                return completed;
+            }
+        });
 
         long last = Long.MIN_VALUE;
         for (MyCallable c : callables) {
@@ -887,7 +902,7 @@ public class TestCallableQueueService extends XTestCase {
     }
 
     public void testRemoveUniqueCallables() throws Exception {
-        XCommand command = new XCommand("Test", "type", 100) {
+        XCommand<?> command = new XCommand<Object>("Test", "type", 100) {
             @Override
             protected boolean isLockRequired() {
                 return false;
@@ -928,4 +943,255 @@ public class TestCallableQueueService extends XTestCase {
         uniquesAfter.removeAll(uniquesBefore);
         assertTrue(uniquesAfter.toString(), uniquesAfter.isEmpty());
     }
+
+    /** Checks that recorded execution times follow descending priority: all of #2, then #1, then #0. */
+    public void testPriorityExecutionOrder() throws InterruptedException, ServiceException {
+        Services.get().destroy();
+        setSystemProperty(CallableQueueService.CONF_THREADS, "1");
+        setSystemProperty(CallableQueueService.CONF_QUEUE_SIZE, "1000000");
+        new Services().init();
+
+        final int taskCount = 999_999;
+        Multimap<Integer, Long> executions = Multimaps.synchronizedMultimap(ArrayListMultimap.create());
+        List<BookingCallable> callables = new ArrayList<>(taskCount);
+        // Three equally sized batches, created in descending priority order (2, 1, 0)
+        for (int i = 2; i >= 0; i--) {
+            String type = String.valueOf(i);
+            for (int j = 0; j < taskCount / 3; j++) {
+                String key = type + "_" + UUID.randomUUID().toString();
+                BookingCallable dc = new BookingCallable(executions, taskCount, key, type, i, 0);
+                callables.add(dc);
+            }
+        }
+
+        CallableQueueService queueservice = Services.get().get(CallableQueueService.class);
+
+        for (int i = 0; i < taskCount; i++) {
+            queueservice.queue(callables.get(i));
+        }
+
+        try {
+            finished.await(10, TimeUnit.SECONDS);
+        } catch (Exception e) {
+            log.error("Error", e);
+            fail("Exception during test: " + e.getMessage());
+        }
+        // It's necessary because after finished.await() returns, the last XCallable
+        // could still be running
+        waitFor(1000, new Predicate() {
+            @Override
+            public boolean evaluate() throws Exception {
+                return queueservice.queueSize() == 0;
+            }
+        });
+
+        Map<Integer, Long> minTime = new HashMap<>();
+        Map<Integer, Long> maxTime = new HashMap<>();
+        // Reduce the recorded timestamps to the earliest and the latest one per priority
+        for (Map.Entry<Integer, Collection<Long>> entry : executions.asMap().entrySet()) {
+            int prio = entry.getKey();
+            Collection<Long> values = entry.getValue();
+            minTime.put(prio, Collections.min(values));
+            maxTime.put(prio, Collections.max(values));
+        }
+
+        // Expected timeline of execution times:
+        // --> [min] Prio #2 [max] --> [min] Prio #1 [max] --> [min] Prio #0 [max]
+        assertTrue("Failed: maxTime prio #2: " + maxTime.get(2) + " / minTime prio #1: " + minTime.get(1),
+                maxTime.get(2) <= minTime.get(1));
+        assertTrue("Failed: maxTime prio #1: " + maxTime.get(1) + " / minTime prio #0: " + minTime.get(0),
+                maxTime.get(1) <= minTime.get(0));
+    }
+
+    /** Queues 10,000 callables for each of 10 types and verifies that every one of them gets executed. */
+    public void testMaxConcurrencyReached() throws Exception {
+        Services.get().destroy();
+        setSystemProperty(CallableQueueService.CONF_QUEUE_SIZE, "100000");
+        new Services().init();
+
+        int partitions = 10;
+        int taskPerPartition = 10000;
+        final int taskCount = partitions * taskPerPartition;
+
+        List<DummyCallable> callables = new ArrayList<>(taskCount);
+        // Every partition shares one type; keys are made unique with a random UUID
+        for (int i = 0; i < partitions; i++) {
+            String type = String.valueOf(i);
+            for (int j = 0; j < taskPerPartition; j++) {
+                String key = type + "_" + UUID.randomUUID().toString();
+                DummyCallable dc = new DummyCallable(taskCount, key, type, 0, 0);
+                callables.add(dc);
+            }
+        }
+
+        CallableQueueService queueservice = Services.get().get(CallableQueueService.class);
+
+        for (int i = 0; i < taskCount; i++) {
+            queueservice.queue(callables.get(i));
+        }
+
+        try {
+            finished.await(100, TimeUnit.SECONDS);
+        } catch (Exception e) {
+            log.error("Error", e);
+            fail("Exception during test: " + e.getMessage());
+        }
+        // JUnit expects (message, expected, actual); a timed-out await surfaces here as a count mismatch
+        assertEquals("Not all callables have been executed", taskCount, counter.get());
+    }
+
+    /** Delayed callables are all counted as queued, and the queue is empty once they have run. */
+    public void testQueueSizeWithDelayedElements() throws InterruptedException {
+        final int taskCount = 10_000;
+        List<DummyCallable> callables = new ArrayList<>(taskCount);
+        for (int i = 0; i < taskCount; i++) {
+            String keyAndType = String.valueOf(i);
+            DummyCallable dc = new DummyCallable(taskCount, keyAndType, keyAndType, 0, 0);
+            callables.add(dc);
+        }
+
+        CallableQueueService queueservice = Services.get().get(CallableQueueService.class);
+        for (int i = 0; i < taskCount; i++) {
+            queueservice.queue(callables.get(i), 2000);
+        }
+        // Taken immediately; presumably before the delay given to queue() (2000) has elapsed
+        int queueSizeAfterSubmission = queueservice.queueSize();
+
+        finished.await(10, TimeUnit.SECONDS);
+        // After finished.await() returns, the last XCallable could still be running
+        waitFor(1000, new Predicate() {
+            @Override
+            public boolean evaluate() throws Exception {
+                return queueservice.queueSize() == 0;
+            }
+        });
+        assertEquals("Queue size after submission", taskCount, queueSizeAfterSubmission);
+        assertEquals("Queue size after execution", 0, queueservice.queueSize());
+    }
+
+    /** Submits 10,000 undelayed callables and checks the queue size after submission and after execution. */
+    public void testQueueSizeAfterNormalSubmission() throws InterruptedException {
+        final int taskCount = 10_000;
+
+        List<DummyCallable> callables = new ArrayList<>(taskCount);
+        for (int idx = 0; idx < taskCount; idx++) {
+            String id = String.valueOf(idx);
+            callables.add(new DummyCallable(taskCount, id, id, 0, 0));
+        }
+
+        CallableQueueService queueservice = Services.get().get(CallableQueueService.class);
+
+        for (DummyCallable callable : callables) {
+            queueservice.queue(callable);
+        }
+
+        // Not exact: the value is close to 10,000 but fluctuates,
+        // so only a lower bound can be asserted on it
+        int sizeAfterSubmission = queueservice.queueSize();
+        try {
+            finished.await(10, TimeUnit.SECONDS);
+        } catch (Exception e) {
+            log.error("Error", e);
+            fail("Exception during test: " + e.getMessage());
+        }
+        // finished.await() can return while the last XCallable is still running,
+        // hence the extra wait for the queue to become empty
+        waitFor(1000, new Predicate() {
+            @Override
+            public boolean evaluate() throws Exception {
+                return queueservice.queueSize() == 0;
+            }
+        });
+
+        assertTrue("Too few elements in the queue: " + sizeAfterSubmission + ", should be >9000",
+                sizeAfterSubmission > 9000);
+        assertEquals("Queue size after execution", 0, queueservice.queueSize());
+    }
+
+    /** Submits 1,000 callables for each of 10 types and checks the queue size after submission and execution. */
+    public void testQueueSizeWhenMaxConcurrencyIsReached() throws InterruptedException {
+        int partitions = 10;
+        int taskPerPartition = 1000;
+
+        final int taskCount = partitions * taskPerPartition;
+
+        List<DummyCallable> callables = new ArrayList<>(taskCount);
+
+        for (int partition = 0; partition < partitions; partition++) {
+            String type = String.valueOf(partition);
+            for (int n = 0; n < taskPerPartition; n++) {
+                String key = type + "_" + UUID.randomUUID().toString();
+                callables.add(new DummyCallable(taskCount, key, type, 0, 0));
+            }
+        }
+
+        CallableQueueService queueservice = Services.get().get(CallableQueueService.class);
+
+        for (DummyCallable callable : callables) {
+            queueservice.queue(callable);
+        }
+
+        // Not exact: the value is close to 10,000 but fluctuates,
+        // so only a lower bound can be asserted on it
+        int sizeAfterSubmission = queueservice.queueSize();
+        try {
+            finished.await(10, TimeUnit.SECONDS);
+        } catch (Exception e) {
+            log.error("Error", e);
+            fail("Exception during test: " + e.getMessage());
+        }
+        // finished.await() can return while the last XCallable is still running,
+        // hence the extra wait for the queue to become empty
+        waitFor(1000, new Predicate() {
+            @Override
+            public boolean evaluate() throws Exception {
+                return queueservice.queueSize() == 0;
+            }
+        });
+
+        assertTrue("Too few elements in the queue: " + sizeAfterSubmission + ", should be >9000",
+                sizeAfterSubmission > 9000);
+        assertEquals("Queue size after execution", 0, queueservice.queueSize());
+    }
+
+    /** Callable that bumps {@code counter} and releases {@code finished} once it reaches {@code taskCount}. */
+    private class DummyCallable extends MyCallable {
+        private final int taskCount;
+        public DummyCallable(int taskCount, String key, String type, int priority, int wait) {
+            super(key, type, priority, wait);
+            this.taskCount = taskCount;
+        }
+
+        @Override
+        public Void call() throws Exception {
+            if (counter.incrementAndGet() == taskCount) {
+                finished.countDown();
+            }
+            return null;
+        }
+    }
+
+    /** Callable that records its execution time per priority and releases {@code finished} at taskCount runs. */
+    private class BookingCallable extends MyCallable {
+        private final int taskCount;
+        private final Multimap<Integer, Long> executions;
+
+        public BookingCallable(Multimap<Integer, Long> executions, int taskCount, String key, String type,
+                int priority, int wait) {
+            super(key, type, priority, wait);
+            this.taskCount = taskCount;
+            this.executions = executions;
+        }
+
+        @Override
+        public Void call() throws Exception {
+            // Book the wall-clock time of this run under the callable's priority
+            executions.put(getPriority(), System.currentTimeMillis());
+            // The run that brings the shared counter up to taskCount releases the waiting test
+            if (counter.incrementAndGet() == taskCount) {
+                finished.countDown();
+            }
+            return null;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/13bfd494/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 78f25e7..a16b3f9 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 5.1.0 release (trunk - unreleased)
 
+OOZIE-3160 PriorityDelayQueue put()/take() can cause significant CPU load due to busy waiting (pbacsko)
 OOZIE-2877 Git action (clayb, andras.piros via pbacsko, gezapeti)
 OOZIE-3061 Kill only those child jobs which are not already killed (matijhs via gezapeti, andras.piros)
 OOZIE-3155 [ui] Job DAG is not refreshed when a job is finished (asalamon74 via andras.piros)