You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by pb...@apache.org on 2018/09/06 12:44:03 UTC
oozie git commit: OOZIE-3160 PriorityDelayQueue put()/take() can
cause significant CPU load due to busy waiting (pbacsko)
Repository: oozie
Updated Branches:
refs/heads/master fe2da6e57 -> 13bfd4949
OOZIE-3160 PriorityDelayQueue put()/take() can cause significant CPU load due to busy waiting (pbacsko)
Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/13bfd494
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/13bfd494
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/13bfd494
Branch: refs/heads/master
Commit: 13bfd49495d095f51d22d0f51042690837b701a4
Parents: fe2da6e
Author: Peter Bacsko <pb...@cloudera.com>
Authored: Thu Sep 6 14:42:59 2018 +0200
Committer: Peter Bacsko <pb...@cloudera.com>
Committed: Thu Sep 6 14:42:59 2018 +0200
----------------------------------------------------------------------
.../oozie/service/AsyncXCommandExecutor.java | 412 +++++++++++++++++
.../oozie/service/CallableQueueService.java | 145 ++++--
.../apache/oozie/util/PriorityDelayQueue.java | 26 ++
core/src/main/resources/oozie-default.xml | 25 +
.../service/TestAsyncXCommandExecutor.java | 462 +++++++++++++++++++
.../oozie/service/TestCallableQueueService.java | 292 +++++++++++-
release-log.txt | 1 +
7 files changed, 1308 insertions(+), 55 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/oozie/blob/13bfd494/core/src/main/java/org/apache/oozie/service/AsyncXCommandExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/AsyncXCommandExecutor.java b/core/src/main/java/org/apache/oozie/service/AsyncXCommandExecutor.java
new file mode 100644
index 0000000..b18a37a
--- /dev/null
+++ b/core/src/main/java/org/apache/oozie/service/AsyncXCommandExecutor.java
@@ -0,0 +1,412 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.service;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.RunnableScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.oozie.service.CallableQueueService.CallableWrapper;
+import org.apache.oozie.util.NamedThreadFactory;
+import org.apache.oozie.util.XCallable;
+import org.apache.oozie.util.XLog;
+import org.eclipse.jetty.util.ConcurrentHashSet;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+
+@SuppressWarnings("deprecation")
+public class AsyncXCommandExecutor {
+ // Lowest priority value accepted by queue(); anything below is rejected as out of range.
+ public static final int MIN_PRIORITY = 0;
+ // Minimum time, in milliseconds, between two anti-starvation passes in checkMaxConcurrency().
+ public static final long ANTI_STARVATION_INTERVAL = 500;
+ private static XLog log = XLog.getLog(AsyncXCommandExecutor.class);
+ // Pool that receives the CallableWrappers; the public constructor builds it on top of priorityBlockingQueue.
+ private final ThreadPoolExecutor executor;
+ // Pool that receives callables which were queued with a non-zero initial delay.
+ private final ScheduledThreadPoolExecutor scheduledExecutor;
+ // When true, ScheduledXCallable.run() asks canSubmitCallable() before handing the command to the executor.
+ private final boolean needConcurrencyCheck;
+ // Consulted through canSubmitCallable() to decide whether a command may be handed to the executor.
+ private final CallableQueueService callableQueueService;
+ // Incremented by queue(), decremented by commandFinished(); this is the value reported by getSize().
+ private final AtomicInteger activeCommands;
+ private final long maxActiveCommands; // equivalent of "queueSize" in CQS
+ // Milliseconds a pending command may be overdue before the anti-starvation logic raises its priority.
+ private final long maxWait;
+ // Highest priority value accepted by queue(); the constructors set it to (priorities - 1).
+ private final long maxPriority;
+
+ // Work queue passed to the main executor by the public constructor (ordered by PriorityComparator there).
+ private final BlockingQueue<CallableWrapper<?>> priorityBlockingQueue;
+ // Queue of the scheduled executor; in this class it is only read, by getQueueDump().
+ private final BlockingQueue<AccessibleRunnableScheduledFuture<ScheduledXCallable>> delayWorkQueue;
+ // Callable type -> commands parked by handleConcurrencyExceeded() until checkMaxConcurrency() resubmits them.
+ private final ConcurrentHashMap<String, Set<CallableWrapper<?>>> pendingCommandsPerType;
+ // System.currentTimeMillis() value recorded at the end of the last anti-starvation pass.
+ // NOTE(review): read and written while holding the per-type set monitor in checkMaxConcurrency(),
+ // i.e. different types use different locks for this shared field - confirm this is acceptable.
+ private long lastAntiStarvationCheck = 0;
+
+ /**
+  * Builds the executor together with the two thread pools it drives.
+  * <p>
+  * The main pool uses {@code threads} as both core and maximum size and takes its work from a
+  * {@link PriorityBlockingQueue} ordered by {@link PriorityComparator}. The scheduled pool is created
+  * with {@code delayedCallableThreads} core threads; every task it accepts is wrapped into an
+  * {@link AccessibleRunnableScheduledFuture} so that the submitted {@link Runnable} stays reachable
+  * from the delay queue (this is what {@link #getQueueDump()} relies on).
+  *
+  * @param threads core and maximum size of the main pool
+  * @param delayedCallableThreads core size of the scheduled pool
+  * @param needConcurrencyCheck whether delayed callables are checked with
+  *        {@link CallableQueueService#canSubmitCallable} before being handed to the main pool
+  * @param callableAccess service that is asked whether a callable may be submitted
+  * @param maxActiveCommands upper limit for the number of active commands
+  * @param maxWait time in milliseconds after which an overdue pending command gets promoted
+  * @param priorities number of priority levels; accepted priorities are {@code 0 .. priorities - 1}
+  * @throws IllegalArgumentException if {@code priorities} is not positive
+  */
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ @SuppressFBWarnings( value = "SIC_INNER_SHOULD_BE_STATIC_ANON",
+         justification = "Unnecessary to refactor innen classes defined here")
+ public AsyncXCommandExecutor(int threads,
+         int delayedCallableThreads,
+         boolean needConcurrencyCheck,
+         CallableQueueService callableAccess,
+         long maxActiveCommands,
+         long maxWait,
+         int priorities) {
+
+     // Fail fast: reject bad arguments before any thread pool is created.
+     Preconditions.checkArgument(priorities > 0, "Number of priorities must be >0");
+
+     priorityBlockingQueue = new PriorityBlockingQueue<CallableWrapper<?>>(100, new PriorityComparator());
+
+     executor = new ThreadPoolExecutor(threads, threads, 10, TimeUnit.SECONDS,
+             (BlockingQueue) priorityBlockingQueue,
+             new NamedThreadFactory("CallableQueue")) {
+         @Override
+         protected void beforeExecute(Thread t, Runnable r) {
+             super.beforeExecute(t, r);
+             XLog.Info.get().clear();
+         }
+
+         // The submitted callables are handed back unchanged instead of being wrapped in a new task.
+         @Override
+         protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
+             return (RunnableFuture<T>)callable;
+         }
+     };
+
+     this.scheduledExecutor = new ScheduledThreadPoolExecutor(delayedCallableThreads,
+             new NamedThreadFactory("ScheduledCallable")) {
+         // Keep a reference to the submitted Runnable so it can be read back from the delay queue.
+         @Override
+         protected <V> RunnableScheduledFuture<V> decorateTask(
+                 Runnable runnable, RunnableScheduledFuture<V> task) {
+
+             return new AccessibleRunnableScheduledFuture<>(task, runnable);
+         }
+     };
+
+     this.delayWorkQueue = (BlockingQueue) scheduledExecutor.getQueue();
+     this.needConcurrencyCheck = needConcurrencyCheck;
+     this.callableQueueService = callableAccess;
+     this.maxActiveCommands = maxActiveCommands;
+     this.maxWait = maxWait;
+     this.activeCommands = new AtomicInteger(0);
+     this.pendingCommandsPerType = new ConcurrentHashMap<>();
+     this.maxPriority = priorities - 1;
+ }
+
+ /**
+  * Test-only constructor: every collaborator is injected by the caller instead of being created here.
+  * No argument is validated; the highest accepted priority becomes {@code priorities - 1}.
+  */
+ @VisibleForTesting
+ AsyncXCommandExecutor(boolean needConcurrencyCheck,
+         CallableQueueService callableAccess,
+         long maxActiveCommands,
+         ThreadPoolExecutor executor,
+         ScheduledThreadPoolExecutor scheduledExecutor,
+         PriorityBlockingQueue<CallableWrapper<?>> priorityBlockingQueue,
+         BlockingQueue<AccessibleRunnableScheduledFuture<ScheduledXCallable>> delayQueue,
+         ConcurrentHashMap<String, Set<CallableWrapper<?>>> pendingCommandsPerType,
+         AtomicInteger activeCommands,
+         long maxWait,
+         long priorities) {
+
+     // thread pools
+     this.executor = executor;
+     this.scheduledExecutor = scheduledExecutor;
+
+     // queues and bookkeeping structures
+     this.priorityBlockingQueue = priorityBlockingQueue;
+     this.delayWorkQueue = delayQueue;
+     this.pendingCommandsPerType = pendingCommandsPerType;
+     this.activeCommands = activeCommands;
+
+     // behaviour switches and limits
+     this.needConcurrencyCheck = needConcurrencyCheck;
+     this.callableQueueService = callableAccess;
+     this.maxActiveCommands = maxActiveCommands;
+     this.maxWait = maxWait;
+     this.maxPriority = priorities - 1;
+ }
+
+ /**
+  * Hands a callable over for execution, either immediately or after its delay.
+  * <p>
+  * A wrapper whose initial delay is zero goes straight to the main executor; any other wrapper is
+  * scheduled on the scheduled executor using its current remaining delay. Wrappers rejected by
+  * {@code filterDuplicates()} are silently skipped.
+  *
+  * @param wrapper the callable to queue
+  * @param ignoreQueueSize if true, the limit on active commands is not checked
+  * @return false if the limit on active commands was reached (and not ignored), true otherwise
+  * @throws RuntimeException wrapping any failure raised while submitting, including an
+  *         out-of-range priority; the wrapper is removed from the unique callables first
+  */
+ public synchronized boolean queue(CallableWrapper<?> wrapper, boolean ignoreQueueSize) {
+     final boolean hasCapacity = ignoreQueueSize || activeCommands.get() < maxActiveCommands;
+     if (!hasCapacity) {
+         log.warn("queue full, ignoring queuing for [{0}]", wrapper.getElement().getKey());
+         return false;
+     }
+
+     if (!wrapper.filterDuplicates()) {
+         // filtered out as a duplicate: nothing to submit, but the caller is still told "true"
+         return true;
+     }
+
+     wrapper.addToUniqueCallables();
+
+     final int requestedPriority = wrapper.getPriority();
+     final long delayAtCreation = wrapper.getInitialDelay();
+
+     try {
+         if (requestedPriority < MIN_PRIORITY || requestedPriority > maxPriority) {
+             throw new IllegalArgumentException("priority out of range: " + requestedPriority);
+         }
+
+         if (delayAtCreation != 0) {
+             scheduledExecutor.schedule(new ScheduledXCallable(wrapper),
+                     wrapper.getDelay(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS);
+         } else {
+             executor.execute(wrapper);
+         }
+
+         activeCommands.incrementAndGet();
+     } catch (Throwable failure) {
+         // undo the registration done above before propagating the problem
+         wrapper.removeFromUniqueCallables();
+         throw new RuntimeException(failure);
+     }
+
+     return true;
+ }
+
+ /**
+  * Parks a command that cannot run right now in the per-type set of pending commands, creating the
+  * set for that type on first use.
+  *
+  * @param command the command to park until {@link #checkMaxConcurrency(String)} resubmits it
+  */
+ public void handleConcurrencyExceeded(CallableWrapper<?> command) {
+     final String commandType = command.getElement().getType();
+
+     Set<CallableWrapper<?>> parked = pendingCommandsPerType.get(commandType);
+     if (parked == null) {
+         final Set<CallableWrapper<?>> fresh = new ConcurrentHashSet<>();
+         final Set<CallableWrapper<?>> raced = pendingCommandsPerType.putIfAbsent(commandType, fresh);
+
+         // putIfAbsent returns the existing set when another thread registered one first
+         parked = (raced != null) ? raced : fresh;
+     }
+
+     parked.add(command);
+ }
+
+ /**
+  * Walks the commands parked for the given type and hands over every one that
+  * {@code canSubmitCallable()} now accepts.
+  * <p>
+  * At most once per {@link #ANTI_STARVATION_INTERVAL} milliseconds the walk also promotes commands
+  * that are overdue by more than {@code maxWait}: their delay is reset to zero and their priority is
+  * raised by one, as long as it is still below the maximum.
+  * <p>
+  * An accepted command is removed from the pending set in every case. If the limit on active
+  * commands has been reached it is not executed; the active counter is decremented instead.
+  *
+  * @param type the callable type whose pending commands are examined
+  */
+ public void checkMaxConcurrency(String type) {
+     final Set<CallableWrapper<?>> parked = pendingCommandsPerType.get(type);
+     if (parked == null) {
+         return;
+     }
+
+     // One thread at a time per type: concurrent walks could submit the same eligible
+     // XCallable more than once, which must not happen.
+     synchronized (parked) {
+         final boolean antiStarvationDue =
+                 System.currentTimeMillis() - lastAntiStarvationCheck > ANTI_STARVATION_INTERVAL;
+         int promoted = 0;
+
+         final Iterator<CallableWrapper<?>> walker = parked.iterator();
+         while (walker.hasNext()) {
+             final CallableWrapper<?> candidate = walker.next();
+
+             // Anti-starvation: raise the priority of callables that have waited too long
+             final int priorityNow = candidate.getPriority();
+             if (antiStarvationDue
+                     && candidate.getDelay(TimeUnit.MILLISECONDS) < -maxWait
+                     && priorityNow < maxPriority) {
+                 candidate.setDelay(0, TimeUnit.MILLISECONDS);
+                 candidate.setPriority(priorityNow + 1);
+                 promoted++;
+             }
+
+             if (!callableQueueService.canSubmitCallable(candidate.getElement())) {
+                 continue;
+             }
+
+             if (activeCommands.get() < maxActiveCommands) {
+                 executor.execute(candidate);
+             } else {
+                 log.warn("queue full, ignoring queuing for [{0}]", candidate.getElement().getKey());
+                 activeCommands.decrementAndGet();
+             }
+
+             walker.remove();
+         }
+
+         if (antiStarvationDue) {
+             lastAntiStarvationCheck = System.currentTimeMillis();
+         }
+
+         if (promoted > 0) {
+             log.debug("Anti-starvation: handled [{0}] elements", promoted);
+         }
+     }
+ }
+
+ /**
+  * Records that one command has finished by lowering the active-command counter.
+  * The counter exists so that the size does not have to be computed from two queues plus the
+  * pending commands.
+  */
+ public void commandFinished() {
+     this.activeCommands.decrementAndGet();
+ }
+
+ /**
+  * @return the main thread pool that executes the callables
+  */
+ public ThreadPoolExecutor getExecutorService() {
+     return this.executor;
+ }
+
+ /**
+  * Shuts down the main executor and then the scheduled executor, waiting for each to terminate
+  * (see {@code shutdownExecutor}).
+  * <p>
+  * If the calling thread is interrupted while waiting, a warning is logged and the thread's
+  * interrupt status is restored so that callers can still observe the interruption.
+  * NOTE(review): an interruption during the first wait skips the shutdown of the scheduled
+  * executor entirely - confirm whether that is intended.
+  */
+ public void shutdown() {
+     try {
+         shutdownExecutor(executor, "executor");
+         shutdownExecutor(scheduledExecutor, "scheduled executor");
+     } catch (InterruptedException e) {
+         log.warn("Interrupted while waiting for executor shutdown");
+         // Catching InterruptedException clears the flag; set it again instead of swallowing it.
+         Thread.currentThread().interrupt();
+     }
+ }
+
+ /**
+  * Produces a textual listing of every command this executor currently knows about.
+  * <p>
+  * The result lists, in this order, the commands in the priority queue, the commands in the delay
+  * queue, and the commands parked per type. The parked commands are copied before the two queues
+  * are read.
+  *
+  * @return the {@code toString()} value of each wrapper
+  */
+ public List<String> getQueueDump() {
+     final List<CallableWrapper<?>> parkedSnapshot = new ArrayList<>(100);
+     final List<String> dump = new ArrayList<>(100);
+
+     // All three collections are iterated without extra locking, as in the original code.
+     for (Map.Entry<String, Set<CallableWrapper<?>>> perType : pendingCommandsPerType.entrySet()) {
+         parkedSnapshot.addAll(perType.getValue());
+     }
+
+     for (final CallableWrapper<?> queued : priorityBlockingQueue) {
+         dump.add(queued.toString());
+     }
+
+     for (final AccessibleRunnableScheduledFuture<ScheduledXCallable> delayed : delayWorkQueue) {
+         final ScheduledXCallable scheduled = (ScheduledXCallable) delayed.getTask();
+         dump.add(scheduled.getCallableWrapper().toString());
+     }
+
+     for (final CallableWrapper<?> parked : parkedSnapshot) {
+         dump.add(parked.toString());
+     }
+
+     return dump;
+ }
+
+ /**
+  * @return the current value of the active-command counter
+  */
+ public int getSize() {
+     return this.activeCommands.get();
+ }
+
+ /**
+  * Task given to the scheduled executor for callables that were queued with a delay. When it runs,
+  * it either passes the wrapped callable on to the main executor or, if the concurrency check is
+  * enabled and fails, parks it as a pending command.
+  */
+ public class ScheduledXCallable implements Runnable {
+     // Never reassigned after construction, hence final.
+     private final CallableWrapper<?> target;
+
+     /**
+      * @param target the callable to pass on once the delay has expired
+      */
+     public ScheduledXCallable(CallableWrapper<?> target) {
+         this.target = target;
+     }
+
+     @Override
+     public void run() {
+         if (needConcurrencyCheck && !callableQueueService.canSubmitCallable(target.getElement())) {
+             XCallable<?> callable = target.getElement();
+             handleConcurrencyExceeded(target);
+
+             // Guards against a race: concurrency was found to be exceeded, but one or more XCommands
+             // of the same type finish right at that moment. Without this extra check the callable
+             // might never get scheduled again (or only much later).
+             checkMaxConcurrency(callable.getType());
+         } else {
+             executor.execute(target);
+         }
+     }
+
+     /**
+      * @return the wrapped callable
+      */
+     public CallableWrapper<?> getCallableWrapper() {
+         return target;
+     }
+ }
+
+ /**
+  * Orders wrappers by descending priority: the wrapper with the larger priority value compares
+  * as smaller and therefore comes first.
+  */
+ @SuppressFBWarnings(value = "SE_COMPARATOR_SHOULD_BE_SERIALIZABLE",
+         justification = "PriorityBlockingQueue which uses this comparator will never be serialized")
+ public static class PriorityComparator implements Comparator<CallableWrapper<?>> {
+     @Override
+     public int compare(CallableWrapper<?> o1, CallableWrapper<?> o2) {
+         final int secondPriority = o2.getPriority();
+         final int firstPriority = o1.getPriority();
+         // operands are swapped on purpose to get the descending order
+         return Integer.compare(secondPriority, firstPriority);
+     }
+ }
+
+ /**
+  * Wrapper around the future created by the scheduled executor. It delegates every
+  * {@link RunnableScheduledFuture} operation to that future and additionally keeps the submitted
+  * {@link Runnable}, so that elements sitting in the delay queue stay accessible.
+  *
+  * @param <V> result type of the wrapped future
+  */
+ @SuppressFBWarnings(value = "EQ_COMPARETO_USE_OBJECT_EQUALS",
+         justification = "This class has a natural ordering (expiration) which is inconsistent with equals")
+ public static class AccessibleRunnableScheduledFuture<V> implements RunnableScheduledFuture<V> {
+     private final Runnable task;
+     // Never reassigned after construction, hence final like its sibling field.
+     private final RunnableScheduledFuture<V> originalFuture;
+
+     /**
+      * @param originalFuture the future every operation is delegated to
+      * @param task the runnable that was submitted to the scheduled executor
+      */
+     public AccessibleRunnableScheduledFuture(RunnableScheduledFuture<V> originalFuture,
+             Runnable task) {
+         this.task = task;
+         this.originalFuture = originalFuture;
+     }
+
+     @Override
+     public void run() {
+         originalFuture.run();
+     }
+
+     @Override
+     public boolean cancel(boolean mayInterruptIfRunning) {
+         return originalFuture.cancel(mayInterruptIfRunning);
+     }
+
+     @Override
+     public boolean isCancelled() {
+         return originalFuture.isCancelled();
+     }
+
+     @Override
+     public boolean isDone() {
+         return originalFuture.isDone();
+     }
+
+     @Override
+     public V get() throws InterruptedException, ExecutionException {
+         return originalFuture.get();
+     }
+
+     @Override
+     public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+         return originalFuture.get(timeout, unit);
+     }
+
+     @Override
+     public long getDelay(TimeUnit unit) {
+         return originalFuture.getDelay(unit);
+     }
+
+     @Override
+     public int compareTo(Delayed o) {
+         return originalFuture.compareTo(o);
+     }
+
+     @Override
+     public boolean isPeriodic() {
+         return originalFuture.isPeriodic();
+     }
+
+     /**
+      * @return the runnable that was submitted to the scheduled executor
+      */
+     public Runnable getTask() {
+         return task;
+     }
+ }
+
+ private void shutdownExecutor(ExecutorService executor, String name) throws InterruptedException {
+ long limit = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(30);
+ executor.shutdown();
+ while (!executor.awaitTermination(1000, TimeUnit.MILLISECONDS)) {
+ log.info("Waiting for [{0}] to shutdown", name);
+ if (System.currentTimeMillis() > limit) {
+ log.warn("Gave up, continuing without waiting for executor to shutdown");
+ break;
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/oozie/blob/13bfd494/core/src/main/java/org/apache/oozie/service/CallableQueueService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/CallableQueueService.java b/core/src/main/java/org/apache/oozie/service/CallableQueueService.java
index ef8d58d..dc9a099 100644
--- a/core/src/main/java/org/apache/oozie/service/CallableQueueService.java
+++ b/core/src/main/java/org/apache/oozie/service/CallableQueueService.java
@@ -72,6 +72,7 @@ import com.google.common.collect.ImmutableSet;
* execution of Commands via a ThreadPool. Sets up a Delayed Queue to handle actions which will be ready for execution
* sometime in the future.
*/
+@SuppressWarnings("deprecation")
public class CallableQueueService implements Service, Instrumentable {
private static final String INSTRUMENTATION_GROUP = "callablequeue";
private static final String INSTR_IN_QUEUE_TIME_TIMER = "time.in.queue";
@@ -85,14 +86,17 @@ public class CallableQueueService implements Service, Instrumentable {
public static final String CONF_QUEUE_SIZE = CONF_PREFIX + "queue.size";
public static final String CONF_THREADS = CONF_PREFIX + "threads";
+ public static final String CONF_OLDIMPL = CONF_PREFIX + "queue.oldImpl";
+ public static final String CONF_DELAYED_CALLABLE_THREADS = CONF_PREFIX + "delayedcallable.threads";
public static final String CONF_CALLABLE_CONCURRENCY = CONF_PREFIX + "callable.concurrency";
public static final String CONF_CALLABLE_NEXT_ELIGIBLE = CONF_PREFIX + "callable.next.eligible";
public static final String CONF_CALLABLE_INTERRUPT_TYPES = CONF_PREFIX + "InterruptTypes";
public static final String CONF_CALLABLE_INTERRUPT_MAP_MAX_SIZE = CONF_PREFIX + "InterruptMapMaxSize";
public static final int CONCURRENCY_DELAY = 500;
-
public static final int SAFE_MODE_DELAY = 60000;
+ public static final int MAX_CALLABLE_WAITTIME_MS = 30_000;
+ public static final int PRIORITIES = 3;
private final Map<String, AtomicInteger> activeCallables = new HashMap<String, AtomicInteger>();
@@ -131,6 +135,11 @@ public class CallableQueueService implements Service, Instrumentable {
counter.decrementAndGet();
}
}
+
+ if (!oldImpl) {
+ asyncXCommandExecutor.commandFinished();
+ asyncXCommandExecutor.checkMaxConcurrency(callable.getType());
+ }
}
private boolean callableReachMaxConcurrency(XCallable<?> callable) {
@@ -146,6 +155,21 @@ public class CallableQueueService implements Service, Instrumentable {
}
}
+ /**
+ * Tells whether a callable of the given type may currently be submitted for execution.
+ * <p>
+ * If no counter is registered for the type yet, a counter holding 1 is registered and the answer
+ * is true. Otherwise the existing counter is only read, and the answer is whether its value does
+ * not exceed maxCallableConcurrency.
+ * <p>
+ * NOTE(review): registering a counter that already holds 1 is a side effect of what is otherwise a
+ * pure query; confirm that the counter is not incremented a second time when the callable
+ * actually starts.
+ *
+ * @param callable the callable whose type is checked
+ * @return true if the callable can be submitted, false if its type is above the concurrency limit
+ */
+ public boolean canSubmitCallable(XCallable<?> callable) {
+ synchronized (activeCallables) {
+ AtomicInteger counter = activeCallables.get(callable.getType());
+ if (counter == null) {
+ counter = new AtomicInteger(1);
+ activeCallables.put(callable.getType(), counter);
+ return true;
+ }
+ else {
+ int i = counter.get();
+ return i <= maxCallableConcurrency;
+ }
+ }
+ }
+
// Callables are wrapped with the this wrapper for execution, for logging
// and instrumentation.
// The wrapper implements Runnable and Comparable to be able to work with an
@@ -414,9 +438,11 @@ public class CallableQueueService implements Service, Instrumentable {
private XLog log = XLog.getLog(getClass());
private int queueSize;
- private PriorityDelayQueue<CallableWrapper> queue;
+ private PriorityDelayQueue<CallableWrapper<?>> queue;
private ThreadPoolExecutor executor;
private Instrumentation instrumentation;
+ private boolean oldImpl = false;
+ private AsyncXCommandExecutor asyncXCommandExecutor;
/**
* Convenience method for instrumentation counters.
@@ -458,7 +484,10 @@ public class CallableQueueService implements Service, Instrumentable {
interruptTypes = ImmutableSet.copyOf(interruptTypes);
if (!callableNextEligible) {
- queue = new PriorityDelayQueue<CallableWrapper>(3, 1000 * 30, TimeUnit.MILLISECONDS, queueSize) {
+ queue = new PriorityDelayQueue<CallableWrapper<?>>(PRIORITIES,
+ MAX_CALLABLE_WAITTIME_MS,
+ TimeUnit.MILLISECONDS,
+ queueSize) {
@Override
protected void debug(String msgTemplate, Object... msgArgs) {
log.trace(msgTemplate, msgArgs);
@@ -471,7 +500,10 @@ public class CallableQueueService implements Service, Instrumentable {
// which has not yet reach max concurrency.Overrided method
// 'eligibleToPoll' to check if the
// element of this queue has reached the maximum concurrency.
- queue = new PollablePriorityDelayQueue<CallableWrapper>(3, 1000 * 30, TimeUnit.MILLISECONDS, queueSize) {
+ queue = new PollablePriorityDelayQueue<CallableWrapper<?>>(PRIORITIES,
+ MAX_CALLABLE_WAITTIME_MS,
+ TimeUnit.MILLISECONDS,
+ queueSize) {
@Override
protected void debug(String msgTemplate, Object... msgArgs) {
log.trace(msgTemplate, msgArgs);
@@ -493,28 +525,45 @@ public class CallableQueueService implements Service, Instrumentable {
interruptMapMaxSize = ConfigurationService.getInt(conf, CONF_CALLABLE_INTERRUPT_MAP_MAX_SIZE);
+ oldImpl = ConfigurationService.getBoolean(CONF_OLDIMPL, false);
+ log.info("Using old queue implementation: [{0}]", oldImpl);
+
+ if (oldImpl) {
+ executor = new ThreadPoolExecutor(threads, threads, 10, TimeUnit.SECONDS, (BlockingQueue) queue,
+ new NamedThreadFactory("CallableQueue")) {
+ protected void beforeExecute(Thread t, Runnable r) {
+ super.beforeExecute(t,r);
+ XLog.Info.get().clear();
+ }
+ protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
+ return (RunnableFuture<T>)callable;
+ }
+ };
+ } else {
+ int delayedCallableThreads = ConfigurationService.getInt(CONF_DELAYED_CALLABLE_THREADS, 1);
+
+ asyncXCommandExecutor = new AsyncXCommandExecutor(threads,
+ delayedCallableThreads,
+ callableNextEligible,
+ this,
+ queueSize,
+ MAX_CALLABLE_WAITTIME_MS,
+ PRIORITIES);
+
+ executor = asyncXCommandExecutor.getExecutorService();
+ }
+
// IMPORTANT: The ThreadPoolExecutor does not always the execute
// commands out of the queue, there are
// certain conditions where commands are pushed directly to a thread.
// As we are using a queue with DELAYED semantics (i.e. execute the
// command in 5 mins) we need to make
// sure that the commands are always pushed to the queue.
- // To achieve this (by looking a the ThreadPoolExecutor.execute()
+ // To achieve this (by looking at the ThreadPoolExecutor.execute()
// implementation, we are making the pool
// minimum size equals to the maximum size (thus threads are keep always
// running) and we are warming up
// all those threads (the for loop that runs dummy runnables).
- executor = new ThreadPoolExecutor(threads, threads, 10, TimeUnit.SECONDS, (BlockingQueue) queue,
- new NamedThreadFactory("CallableQueue")) {
- protected void beforeExecute(Thread t, Runnable r) {
- super.beforeExecute(t,r);
- XLog.Info.get().clear();
- }
- protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
- return (RunnableFuture<T>)callable;
- }
- };
-
for (int i = 0; i < threads; i++) {
executor.execute(new Runnable() {
public void run() {
@@ -547,6 +596,10 @@ public class CallableQueueService implements Service, Instrumentable {
break;
}
}
+
+ if (!oldImpl) {
+ asyncXCommandExecutor.shutdown();
+ }
}
catch (InterruptedException ex) {
log.warn(ex);
@@ -567,29 +620,34 @@ public class CallableQueueService implements Service, Instrumentable {
* @return int size of queue
*/
public synchronized int queueSize() {
- return queue.size();
+ return oldImpl ? queue.size() : asyncXCommandExecutor.getSize();
}
- private synchronized boolean queue(CallableWrapper wrapper, boolean ignoreQueueSize) {
- if (!ignoreQueueSize && queue.size() >= queueSize) {
- log.warn("queue full, ignoring queuing for [{0}]", wrapper.getElement().getKey());
- return false;
- }
- if (!executor.isShutdown()) {
- if (wrapper.filterDuplicates()) {
- wrapper.addToUniqueCallables();
- try {
- executor.execute(wrapper);
- }
- catch (Throwable ree) {
- wrapper.removeFromUniqueCallables();
- throw new RuntimeException(ree);
+ /**
+ * Queues a wrapper with whichever queue implementation is configured.
+ *
+ * @param wrapper the callable to queue
+ * @param ignoreQueueSize if true, the queue size limit is not checked
+ * @return false if the wrapper was rejected because the queue is full; true otherwise, which
+ * includes a filtered-out duplicate and, for the old implementation, an executor that is
+ * shutting down
+ */
+ private synchronized boolean queue(CallableWrapper<?> wrapper, boolean ignoreQueueSize) {
+ if (oldImpl) {
+ if (!ignoreQueueSize && queue.size() >= queueSize) {
+ log.warn("queue full, ignoring queuing for [{0}]", wrapper.getElement().getKey());
+ return false;
+ }
+ if (!executor.isShutdown()) {
+ if (wrapper.filterDuplicates()) {
+ wrapper.addToUniqueCallables();
+ try {
+ executor.execute(wrapper);
+ }
+ catch (Throwable ree) {
+ wrapper.removeFromUniqueCallables();
+ throw new RuntimeException(ree);
+ }
}
}
+ else {
+ log.warn("Executor shutting down, ignoring queueing of [{0}]", wrapper.getElement().getKey());
+ }
+ } else {
+ // Propagate the outcome: the new executor returns false when its queue is full, and the
+ // caller must not count such a callable as queued.
+ return asyncXCommandExecutor.queue(wrapper, ignoreQueueSize);
}
- else {
- log.warn("Executor shutting down, ignoring queueing of [{0}]", wrapper.getElement().getKey());
- }
+
return true;
}
@@ -637,7 +695,7 @@ public class CallableQueueService implements Service, Instrumentable {
}
else {
checkInterruptTypes(callable);
- queued = queue(new CallableWrapper(callable, delay), false);
+ queued = queue(new CallableWrapper<>(callable, delay), false);
if (queued) {
incrCounter(INSTR_QUEUED_COUNTER, 1);
}
@@ -762,14 +820,18 @@ public class CallableQueueService implements Service, Instrumentable {
* @return the list of string that representing each CallableWrapper
*/
public List<String> getQueueDump() {
- List<String> list = new ArrayList<String>();
- for (QueueElement<CallableWrapper> qe : queue) {
- if (qe.toString() == null) {
- continue;
+ if (oldImpl) {
+ List<String> list = new ArrayList<String>();
+ for (QueueElement<CallableWrapper<?>> qe : queue) {
+ if (qe.toString() == null) {
+ continue;
+ }
+ list.add(qe.toString());
}
- list.add(qe.toString());
+ return list;
+ } else {
+ return asyncXCommandExecutor.getQueueDump();
}
- return list;
}
/**
@@ -794,5 +856,4 @@ public class CallableQueueService implements Service, Instrumentable {
public Set<String> getInterruptTypes() {
return interruptTypes;
}
-
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/13bfd494/core/src/main/java/org/apache/oozie/util/PriorityDelayQueue.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/util/PriorityDelayQueue.java b/core/src/main/java/org/apache/oozie/util/PriorityDelayQueue.java
index 75c2069..365f918 100644
--- a/core/src/main/java/org/apache/oozie/util/PriorityDelayQueue.java
+++ b/core/src/main/java/org/apache/oozie/util/PriorityDelayQueue.java
@@ -51,7 +51,10 @@ import java.util.concurrent.locks.ReentrantLock;
* <p>
* This class does not use a separate thread for anti-starvation check, instead, the check is performed on polling and
* seeking operations. This check is performed, the most every 1/2 second.
+ *
+ * @deprecated this implementation will be removed in the future and AsyncXCommandExecutor will be used.
*/
+@Deprecated
public class PriorityDelayQueue<E> extends AbstractQueue<PriorityDelayQueue.QueueElement<E>>
implements BlockingQueue<PriorityDelayQueue.QueueElement<E>> {
@@ -65,6 +68,7 @@ public class PriorityDelayQueue<E> extends AbstractQueue<PriorityDelayQueue.Queu
private int priority;
private long baseTime;
boolean inQueue;
+ private long initialDelay;
/**
* Create an Element wrapper.
@@ -88,6 +92,7 @@ public class PriorityDelayQueue<E> extends AbstractQueue<PriorityDelayQueue.Queu
this.element = element;
this.priority = priority;
setDelay(delay, unit);
+ this.initialDelay = delay;
}
/**
@@ -100,6 +105,15 @@ public class PriorityDelayQueue<E> extends AbstractQueue<PriorityDelayQueue.Queu
}
/**
+ * Sets the priority of the element. The value is stored as given; no range check is
+ * performed here.
+ *
+ * @param priority the priority of the element
+ */
+ public void setPriority(int priority) {
+ this.priority = priority;
+ }
+
+ /**
* Return the priority of the element.
*
* @return the priority of the element.
@@ -116,6 +130,7 @@ public class PriorityDelayQueue<E> extends AbstractQueue<PriorityDelayQueue.Queu
*/
public void setDelay(long delay, TimeUnit unit) {
baseTime = System.currentTimeMillis() + unit.toMillis(delay);
+ // getInitialDelay() is documented to return milliseconds, so convert instead of storing the raw value
+ initialDelay = unit.toMillis(delay);
}
/**
@@ -130,6 +145,17 @@ public class PriorityDelayQueue<E> extends AbstractQueue<PriorityDelayQueue.Queu
}
/**
+ * Returns the delay that was most recently assigned to the element, either at construction or
+ * by a later call to setDelay(). Unlike getDelay(), this value does not shrink as time passes;
+ * it only changes when setDelay() is called again.
+ *
+ * @return the stored delay of this element. NOTE(review): the value originates from the
+ * delay argument of the constructor / setDelay(); it is in milliseconds only if that
+ * argument was expressed in milliseconds - confirm against the callers.
+ */
+ public long getInitialDelay() {
+ return initialDelay;
+ }
+
+ /**
* Compare the age of this wrapper element with another. The priority is not used for the comparision.
*
* @param o the other wrapper element to compare with.
http://git-wip-us.apache.org/repos/asf/oozie/blob/13bfd494/core/src/main/resources/oozie-default.xml
----------------------------------------------------------------------
diff --git a/core/src/main/resources/oozie-default.xml b/core/src/main/resources/oozie-default.xml
index c354f02..c3bcdfc 100644
--- a/core/src/main/resources/oozie-default.xml
+++ b/core/src/main/resources/oozie-default.xml
@@ -504,6 +504,31 @@
</property>
<property>
+ <name>oozie.service.CallableQueueService.delayedcallable.threads</name>
+ <value>1</value>
+ <description>
+ The number of threads where delayed tasks are executed. Upon expiration, the tasks are immediately
+ inserted into the main queue to properly handle priorities. This means that no actual business logic
+ is executed in this thread pool, so under normal circumstances, this value can be set to a low number.
+
+ Note that this property is completely unrelated to oozie.service.SchedulerService.threads which
+ tells how many scheduled background tasks can run in parallel at the same time (like PurgeService,
+ StatusTransitService, etc).
+ </description>
+ </property>
+
+ <property>
+ <name>oozie.service.CallableQueueService.queue.oldImpl</name>
+ <value>false</value>
+ <description>
+ If set to false, then CallableQueueService will use a more performant, less CPU-intensive
+ queuing mechanism to execute asynchronous tasks internally. The old implementation generates
+ noticeable CPU load even if Oozie is completely idle, especially when oozie.service.CallableQueueService.threads
+ is set to a large number. The previous queuing mechanism is kept as a fallback option.
+ </description>
+ </property>
+
+ <property>
<name>oozie.service.CallableQueueService.callable.concurrency</name>
<value>3</value>
<description>
http://git-wip-us.apache.org/repos/asf/oozie/blob/13bfd494/core/src/test/java/org/apache/oozie/service/TestAsyncXCommandExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/service/TestAsyncXCommandExecutor.java b/core/src/test/java/org/apache/oozie/service/TestAsyncXCommandExecutor.java
new file mode 100644
index 0000000..f9ec4d6
--- /dev/null
+++ b/core/src/test/java/org/apache/oozie/service/TestAsyncXCommandExecutor.java
@@ -0,0 +1,462 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.service;
+
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.oozie.service.AsyncXCommandExecutor.AccessibleRunnableScheduledFuture;
+import org.apache.oozie.service.AsyncXCommandExecutor.ScheduledXCallable;
+import org.apache.oozie.service.CallableQueueService.CallableWrapper;
+import org.apache.oozie.util.XCallable;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Answers;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.runners.MockitoJUnitRunner;
+import org.mockito.stubbing.Answer;
+
+import com.google.common.collect.Sets;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.times;
+import static org.mockito.Matchers.same;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.eq;
+
+@RunWith(MockitoJUnitRunner.class)
+@SuppressWarnings("deprecation")
+public class TestAsyncXCommandExecutor {
+ private static final String DEFAULT_TYPE = "test";
+ private static final int DEFAULT_MAX_ACTIVE_COMMANDS = 5;
+ private static final boolean DEFAULT_ENABLE_CONCURRENCY_CHECK = true;
+ private static final long DEFAULT_MAXWAIT = 30_000;
+ private static final int TEST_PRIORITIES = 5;
+ private static final int MAX_PRIORITY = TEST_PRIORITIES - 1;
+
+ @Mock
+ private ThreadPoolExecutor executor;
+
+ @Mock
+ private ScheduledThreadPoolExecutor scheduledExecutor;
+
+ @Mock(answer = Answers.RETURNS_DEEP_STUBS)
+ private CallableWrapper<?> callableWrapper;
+
+ @Mock
+ private CallableQueueService callableQueueService;
+
+ private PriorityBlockingQueue<CallableWrapper<?>> priorityBlockingQueue;
+ private BlockingQueue<AccessibleRunnableScheduledFuture<ScheduledXCallable>> delayQueue;
+ private ConcurrentHashMap<String, Set<CallableWrapper<?>>> pendingCommandsPerType;
+ private AtomicInteger activeCommands;
+ private AsyncXCommandExecutor asyncExecutor;
+
+ @Before
+ public void setup() {
+ activeCommands = new AtomicInteger(0);
+ priorityBlockingQueue = new PriorityBlockingQueue<>();
+ pendingCommandsPerType = new ConcurrentHashMap<>();
+ delayQueue = new LinkedBlockingQueue<>(); // in reality it's not LBQ, but it's fine here
+ asyncExecutor = createExecutor(DEFAULT_ENABLE_CONCURRENCY_CHECK, DEFAULT_MAX_ACTIVE_COMMANDS, DEFAULT_MAXWAIT);
+ when(callableWrapper.filterDuplicates()).thenReturn(true);
+ when(callableWrapper.getElement().getKey()).thenReturn("key");
+ when(callableWrapper.getElement().getType()).thenReturn(DEFAULT_TYPE);
+ }
+
+ @Test
+ public void testSubmitCallableWithNoDelay() {
+ boolean result = asyncExecutor.queue(callableWrapper, false);
+
+ verify(executor).execute(same(callableWrapper));
+ verifyZeroInteractions(scheduledExecutor);
+ assertEquals("Active commands", 1, asyncExecutor.getSize());
+ assertTrue("Queuing result", result);
+ }
+
+ @Test
+ public void testSubmitCallableWithDelay() {
+ when(callableWrapper.getInitialDelay()).thenReturn(111L);
+ when(callableWrapper.getDelay(eq(TimeUnit.MILLISECONDS))).thenReturn(222L);
+
+ boolean result = asyncExecutor.queue(callableWrapper, false);
+
+ verify(scheduledExecutor).schedule(any(ScheduledXCallable.class), eq(222L), eq(TimeUnit.MILLISECONDS));
+ verifyZeroInteractions(executor);
+ assertEquals("Active commands", 1, asyncExecutor.getSize());
+ assertTrue("Queuing result", result);
+ }
+
+ @Test
+ public void testSubmissionSuccessfulAfterDelay() {
+ when(callableWrapper.getInitialDelay()).thenReturn(100L);
+ when(callableWrapper.getDelay(eq(TimeUnit.MILLISECONDS))).thenReturn(50L);
+ when(callableQueueService.canSubmitCallable(any(XCallable.class))).thenReturn(true);
+ configureMockScheduler();
+
+ asyncExecutor.queue(callableWrapper, false);
+
+ verify(scheduledExecutor).schedule(any(ScheduledXCallable.class), eq(50L),
+ eq(TimeUnit.MILLISECONDS));
+ verify(executor).execute(callableWrapper);
+ }
+
+ @Test
+ public void testSubmissionFailsAfterDelay() {
+ when(callableWrapper.getInitialDelay()).thenReturn(100L);
+ when(callableWrapper.getDelay(eq(TimeUnit.MILLISECONDS))).thenReturn(50L);
+ configureMockScheduler();
+
+ asyncExecutor.queue(callableWrapper, false);
+
+ verify(scheduledExecutor).schedule(any(ScheduledXCallable.class), eq(50L),
+ eq(TimeUnit.MILLISECONDS));
+ verifyZeroInteractions(executor);
+ }
+
+ @Test
+ public void testSubmissionSuccessfulAfterDelayWhenMaxConcurrencyCheckDisabled() {
+ asyncExecutor = createExecutor(false, 2, DEFAULT_MAXWAIT);
+ when(callableWrapper.getInitialDelay()).thenReturn(100L);
+ when(callableWrapper.getDelay(eq(TimeUnit.MILLISECONDS))).thenReturn(50L);
+ XCallable<?> wrappedCommand = mock(XCallable.class);
+ Mockito.<XCallable<?>>when(callableWrapper.getElement()).thenReturn(wrappedCommand);
+ configureMockScheduler();
+
+ asyncExecutor.queue(callableWrapper, false);
+
+ verify(scheduledExecutor).schedule(any(ScheduledXCallable.class), eq(50L),
+ eq(TimeUnit.MILLISECONDS));
+ verify(executor).execute(eq(callableWrapper));
+ }
+
+ @Test
+ public void testCannotSubmitDueToFiltering() {
+ when(callableWrapper.filterDuplicates()).thenReturn(false);
+
+ boolean result = asyncExecutor.queue(callableWrapper, false);
+
+ verifyZeroInteractions(scheduledExecutor);
+ verifyZeroInteractions(executor);
+ assertEquals("Active commands", 0, asyncExecutor.getSize());
+ assertTrue("Queuing result", result);
+ }
+
+ @Test
+ public void testExceptionThrownDuringSubmission() {
+ doThrow(new RuntimeException()).when(executor).execute(any(Runnable.class));
+
+ boolean exceptionThrown = false;
+ try {
+ asyncExecutor.queue(callableWrapper, false);
+ } catch (RuntimeException e) {
+ exceptionThrown = true;
+ }
+
+ assertTrue("Exception was not thrown", exceptionThrown);
+ verify(callableWrapper).removeFromUniqueCallables();
+ verifyZeroInteractions(scheduledExecutor);
+ }
+
+ @Test
+ public void testSubmitWithNegativePriority() {
+ testIllegalPriority(-1);
+ }
+
+ @Test
+ public void testSubmitWithTooHighPriority() {
+ testIllegalPriority(MAX_PRIORITY + 1);
+ }
+
+ @Test
+ public void testQueueSizeWhenCommandIsFinished() {
+ CallableWrapper<?> delayedCommand = mock(CallableWrapper.class);
+ when(delayedCommand.getInitialDelay()).thenReturn(100L);
+ when(delayedCommand.filterDuplicates()).thenReturn(true);
+
+ asyncExecutor.queue(callableWrapper, false);
+ asyncExecutor.queue(delayedCommand, false);
+ int sizeAfterQueue = asyncExecutor.getSize();
+ asyncExecutor.commandFinished();
+ asyncExecutor.commandFinished();
+
+ assertEquals("Size after queue", 2, sizeAfterQueue);
+ assertEquals("Active commands", 0, asyncExecutor.getSize());
+ }
+
+ @Test
+ public void testQueueSizeWhenQueueIsFullDuringMaxConcurrencyCheck() {
+ XCallable<?> callable = mock(XCallable.class);
+ when(callable.getType()).thenReturn(DEFAULT_TYPE);
+ Mockito.<XCallable<?>>when(callableWrapper.getElement()).thenReturn(callable);
+ when(callableQueueService.canSubmitCallable(eq(callable))).thenReturn(true);
+ asyncExecutor.handleConcurrencyExceeded(callableWrapper);
+ activeCommands.set(20);
+
+ asyncExecutor.checkMaxConcurrency(DEFAULT_TYPE);
+
+ assertEquals("Active commands", 19, activeCommands.get());
+ }
+
+ @Test
+ public void testSubmissionWhenQueueIsFull() {
+ asyncExecutor = createExecutor(true, 2, DEFAULT_MAXWAIT);
+ callableWrapper = mock(CallableWrapper.class, Mockito.RETURNS_DEEP_STUBS);
+ when(callableWrapper.filterDuplicates()).thenReturn(true);
+ when(callableWrapper.getElement().getKey()).thenReturn("key");
+
+ asyncExecutor.queue(callableWrapper, false);
+ asyncExecutor.queue(callableWrapper, false);
+ boolean finalResult = asyncExecutor.queue(callableWrapper, false);
+
+ assertFalse("Last submission shouldn't have succeeded", finalResult);
+ verify(executor, times(2)).execute(same(callableWrapper));
+ }
+
+ @Test
+ public void testSubmissionWhenQueueSizeIsIgnored() {
+ asyncExecutor = createExecutor(true, 2, DEFAULT_MAXWAIT);
+ callableWrapper = mock(CallableWrapper.class, Mockito.RETURNS_DEEP_STUBS);
+ when(callableWrapper.filterDuplicates()).thenReturn(true);
+ when(callableWrapper.getElement().getKey()).thenReturn("key");
+
+ asyncExecutor.queue(callableWrapper, false);
+ asyncExecutor.queue(callableWrapper, false);
+ boolean finalResult = asyncExecutor.queue(callableWrapper, true);
+
+ assertTrue("Last submission should have succeeded", finalResult);
+ verify(executor, times(3)).execute(same(callableWrapper));
+ }
+
+ @Test
+ public void testPendingCommandSubmission() {
+ XCallable<?> callable = mock(XCallable.class);
+ when(callable.getType()).thenReturn(DEFAULT_TYPE);
+ Mockito.<XCallable<?>>when(callableWrapper.getElement()).thenReturn(callable);
+ when(callableQueueService.canSubmitCallable(eq(callable))).thenReturn(true);
+
+ asyncExecutor.handleConcurrencyExceeded(callableWrapper);
+ asyncExecutor.checkMaxConcurrency(DEFAULT_TYPE);
+
+ verify(executor).execute(eq(callableWrapper));
+ assertEquals("Number of pending commands", 1, pendingCommandsPerType.size());
+ Set<CallableWrapper<?>> pendingCommandsList = pendingCommandsPerType.get(DEFAULT_TYPE);
+ assertNotNull("List of pending commands doesn't exist", pendingCommandsList);
+ assertEquals("List of pending commands should be empty", 0, pendingCommandsList.size());
+ }
+
+ @Test
+ public void testPendingCommandsWithSameType() {
+ XCallable<?> callable = mock(XCallable.class);
+ when(callable.getType()).thenReturn(DEFAULT_TYPE);
+ Mockito.<XCallable<?>>when(callableWrapper.getElement()).thenReturn(callable);
+
+ XCallable<?> secondCallable = mock(XCallable.class);
+ when(secondCallable.getType()).thenReturn(DEFAULT_TYPE);
+ CallableWrapper<?> secondWrapper = mock(CallableWrapper.class);
+ Mockito.<XCallable<?>>when(secondWrapper.getElement()).thenReturn(secondCallable);
+
+ asyncExecutor.handleConcurrencyExceeded(callableWrapper);
+ asyncExecutor.handleConcurrencyExceeded(secondWrapper);
+
+ assertEquals("Number of pending commands", 1, pendingCommandsPerType.size());
+ Set<CallableWrapper<?>> pendingCommandsList = pendingCommandsPerType.get(DEFAULT_TYPE);
+ assertNotNull("List of pending commands doesn't exist", pendingCommandsList);
+ assertEquals("List of pending commands", 2, pendingCommandsList.size());
+ }
+
+ @Test
+ public void testPendingCommandSubmissionWhenQueueIsFull() {
+ XCallable<?> callable = mock(XCallable.class);
+ when(callable.getType()).thenReturn(DEFAULT_TYPE);
+ Mockito.<XCallable<?>>when(callableWrapper.getElement()).thenReturn(callable);
+
+ when(callableQueueService.canSubmitCallable(eq(callable))).thenReturn(true);
+
+ activeCommands.set(10);
+ asyncExecutor.handleConcurrencyExceeded(callableWrapper);
+ asyncExecutor.checkMaxConcurrency(DEFAULT_TYPE);
+
+ verifyZeroInteractions(executor);
+ assertEquals("Number of pending commands", 1, pendingCommandsPerType.size());
+ Set<CallableWrapper<?>> pendingCommandsList = pendingCommandsPerType.get(DEFAULT_TYPE);
+ assertNotNull("List of pending commands doesn't exist", pendingCommandsList);
+ assertEquals("List of pending commands should be empty", 0, pendingCommandsList.size());
+ }
+
+ @Test
+ public void testPendingCommandSubmissionWhenMaxConcurrencyReached() {
+ XCallable<?> callable = mock(XCallable.class);
+ when(callable.getType()).thenReturn(DEFAULT_TYPE);
+ Mockito.<XCallable<?>>when(callableWrapper.getElement()).thenReturn(callable);
+ when(callableQueueService.canSubmitCallable(eq(callable))).thenReturn(false);
+
+ asyncExecutor.handleConcurrencyExceeded(callableWrapper);
+ asyncExecutor.checkMaxConcurrency(DEFAULT_TYPE);
+
+ verifyZeroInteractions(executor);
+ assertEquals("Number of pending commands", 1, pendingCommandsPerType.size());
+ Set<CallableWrapper<?>> pendingCommandsList = pendingCommandsPerType.get(DEFAULT_TYPE);
+ assertNotNull("List of pending commands doesn't exist", pendingCommandsList);
+ assertEquals("List of pending commands list should not be empty", 1, pendingCommandsList.size());
+ }
+
+ @Test
+ public void testQueueDump() {
+ CallableWrapper<?> pendingCallable = mock(CallableWrapper.class);
+ CallableWrapper<?> waitingCallable = mock(CallableWrapper.class);
+ ScheduledXCallable delayedXCallable = mock(ScheduledXCallable.class);
+ @SuppressWarnings("unchecked")
+ AccessibleRunnableScheduledFuture<ScheduledXCallable> asrf = mock(AccessibleRunnableScheduledFuture.class);
+ Mockito.<CallableWrapper<?>>when(delayedXCallable.getCallableWrapper()).thenReturn(waitingCallable);
+ when(asrf.getTask()).thenReturn(delayedXCallable);
+ when(pendingCallable.toString()).thenReturn("pendingCallable");
+ when(waitingCallable.toString()).thenReturn("waitingCallable");
+ when(callableWrapper.toString()).thenReturn("callableWrapper");
+
+ priorityBlockingQueue.add(callableWrapper);
+ delayQueue.add(asrf);
+ pendingCommandsPerType.put(DEFAULT_TYPE, Sets.newHashSet(pendingCallable));
+
+ List<String> queueDump = asyncExecutor.getQueueDump();
+ assertEquals("Size", 3, queueDump.size());
+ assertTrue("PendingCallable not found", queueDump.contains("pendingCallable"));
+ assertTrue("WaitingCallable not found", queueDump.contains("waitingCallable"));
+ assertTrue("CallableWrapper not found", queueDump.contains("callableWrapper"));
+ }
+
+ @Test
+ public void testAntiStarvationWhenDelayIsAboveMaxWait() {
+ asyncExecutor = createExecutor(DEFAULT_ENABLE_CONCURRENCY_CHECK, DEFAULT_MAX_ACTIVE_COMMANDS, 500);
+ when(callableWrapper.getDelay(eq(TimeUnit.MILLISECONDS))).thenReturn(-40000L);
+ when(callableWrapper.getPriority()).thenReturn(0);
+ pendingCommandsPerType.put(DEFAULT_TYPE, Sets.newHashSet(callableWrapper));
+
+ asyncExecutor.checkMaxConcurrency(DEFAULT_TYPE);
+
+ verify(callableWrapper).setPriority(1);
+ verify(callableWrapper).setDelay(eq(0L), eq(TimeUnit.MILLISECONDS));
+ }
+
+ @Test
+ public void testAntiStarvationWhenDelayIsBelowMaxWait() {
+ when(callableWrapper.getDelay(eq(TimeUnit.MILLISECONDS))).thenReturn(-200L);
+ when(callableWrapper.getPriority()).thenReturn(0);
+ pendingCommandsPerType.put(DEFAULT_TYPE, Sets.newHashSet(callableWrapper));
+
+ asyncExecutor.checkMaxConcurrency(DEFAULT_TYPE);
+
+ verify(callableWrapper, never()).setPriority(anyInt());
+ verify(callableWrapper, never()).setDelay(anyLong(), any(TimeUnit.class));
+ }
+
+ @Test
+ public void testAntiStarvationWhenPriorityIsHighest() {
+ asyncExecutor = createExecutor(DEFAULT_ENABLE_CONCURRENCY_CHECK, DEFAULT_MAX_ACTIVE_COMMANDS, 500);
+ when(callableWrapper.getDelay(eq(TimeUnit.MILLISECONDS))).thenReturn(-1000L);
+ when(callableWrapper.getPriority()).thenReturn(MAX_PRIORITY);
+ pendingCommandsPerType.put(DEFAULT_TYPE, Sets.newHashSet(callableWrapper));
+
+ asyncExecutor.checkMaxConcurrency(DEFAULT_TYPE);
+
+ verify(callableWrapper, never()).setPriority(anyInt());
+ verify(callableWrapper, never()).setDelay(anyLong(), any(TimeUnit.class));
+ }
+
+ @Test
+ public void testShutDown() throws InterruptedException {
+ when(executor.awaitTermination(anyLong(), any(TimeUnit.class))).thenReturn(true);
+ when(scheduledExecutor.awaitTermination(anyLong(), any(TimeUnit.class))).thenReturn(true);
+ asyncExecutor.shutdown();
+
+ verify(executor).shutdown();
+ verify(executor).awaitTermination(eq(1000L), eq(TimeUnit.MILLISECONDS));
+ verify(scheduledExecutor).shutdown();
+ verify(scheduledExecutor).awaitTermination(eq(1000L), eq(TimeUnit.MILLISECONDS));
+ }
+
+ private void testIllegalPriority(int prio) {
+ when(callableWrapper.getPriority()).thenReturn(prio);
+
+ boolean exceptionThrown = false;
+ Throwable cause = null;
+ try {
+ asyncExecutor.queue(callableWrapper, false);
+ } catch (RuntimeException e) {
+ exceptionThrown = true;
+ cause = e.getCause();
+ }
+
+ assertTrue("Exception was not thrown", exceptionThrown);
+ verifyZeroInteractions(scheduledExecutor);
+ verifyZeroInteractions(executor);
+ assertTrue("Illegal exception", cause instanceof IllegalArgumentException);
+ verify(callableWrapper).removeFromUniqueCallables();
+ }
+
+ private void configureMockScheduler() {
+ doAnswer(new Answer<Void>() {
+ @Override
+ public Void answer(InvocationOnMock invocation) throws Throwable {
+ ScheduledXCallable target = (ScheduledXCallable) invocation.getArguments()[0];
+ target.run();
+ return null;
+ }
+ }).when(scheduledExecutor).schedule(any(ScheduledXCallable.class), any(Long.class),
+ any(TimeUnit.class));
+ }
+
+ private AsyncXCommandExecutor createExecutor(boolean needMaxConcurrencyCheck, int maxActiveCallables,
+ long maxWait) {
+ return new AsyncXCommandExecutor(needMaxConcurrencyCheck,
+ callableQueueService,
+ maxActiveCallables,
+ executor,
+ scheduledExecutor,
+ priorityBlockingQueue,
+ delayQueue,
+ pendingCommandsPerType,
+ activeCommands,
+ maxWait, // honor the caller-supplied limit instead of always using DEFAULT_MAXWAIT
+ TEST_PRIORITIES);
+ }
+}
http://git-wip-us.apache.org/repos/asf/oozie/blob/13bfd494/core/src/test/java/org/apache/oozie/service/TestCallableQueueService.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/service/TestCallableQueueService.java b/core/src/test/java/org/apache/oozie/service/TestCallableQueueService.java
index 9c2a11d..5d546ff 100644
--- a/core/src/test/java/org/apache/oozie/service/TestCallableQueueService.java
+++ b/core/src/test/java/org/apache/oozie/service/TestCallableQueueService.java
@@ -25,16 +25,30 @@ import org.apache.oozie.command.PreconditionException;
import org.apache.oozie.command.XCommand;
import org.apache.oozie.test.XTestCase;
import org.apache.oozie.util.XCallable;
+import org.apache.oozie.util.XLog;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multimaps;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
public class TestCallableQueueService extends XTestCase {
static AtomicLong EXEC_ORDER = new AtomicLong();
+ private static XLog log = XLog.getLog(TestCallableQueueService.class);
+ private AtomicLong counter = new AtomicLong();
+ private CountDownLatch finished = new CountDownLatch(1);
public static class MyCallable implements XCallable<Void> {
String type;
@@ -93,6 +107,8 @@ public class TestCallableQueueService extends XTestCase {
StringBuilder sb = new StringBuilder();
sb.append("Type:").append(getType());
sb.append(",Priority:").append(getPriority());
+ sb.append(",Key:").append(getKey());
+ sb.append(",Wait:").append(wait);
return sb.toString();
}
@@ -409,20 +425,19 @@ public class TestCallableQueueService extends XTestCase {
queueservice.queue(c, 10);
}
- float originalRatio = XTestCase.WAITFOR_RATIO;
- try{
- XTestCase.WAITFOR_RATIO = 1;
- waitFor(2000, new Predicate() {
- public boolean evaluate() throws Exception {
- return queueservice.queueSize() == 0;
+ waitFor(3000, new Predicate() {
+ public boolean evaluate() throws Exception {
+ boolean completed = true;
+
+ for (MyCallable callable : callables) {
+ completed &= (callable.executed != 0);
}
- });
- }
- finally {
- XTestCase.WAITFOR_RATIO = originalRatio;
- }
- System.out.println("Callable Queue Size :" + queueservice.queueSize());
+ completed &= (callableOther.executed != 0);
+
+ return completed;
+ }
+ });
long last = Long.MIN_VALUE;
for (MyCallable c : callables) {
@@ -887,7 +902,7 @@ public class TestCallableQueueService extends XTestCase {
}
public void testRemoveUniqueCallables() throws Exception {
- XCommand command = new XCommand("Test", "type", 100) {
+ XCommand<?> command = new XCommand<Object>("Test", "type", 100) {
@Override
protected boolean isLockRequired() {
return false;
@@ -928,4 +943,255 @@ public class TestCallableQueueService extends XTestCase {
uniquesAfter.removeAll(uniquesBefore);
assertTrue(uniquesAfter.toString(), uniquesAfter.isEmpty());
}
+
+ public void testPriorityExecutionOrder() throws InterruptedException, ServiceException {
+ Services.get().destroy();
+ setSystemProperty(CallableQueueService.CONF_THREADS, "1");
+ setSystemProperty(CallableQueueService.CONF_QUEUE_SIZE, "1000000");
+ new Services().init();
+
+ final int taskCount = 999_999;
+ Multimap<Integer, Long> executions = Multimaps.synchronizedMultimap(ArrayListMultimap.create());
+ List<BookingCallable> callables = new ArrayList<>(taskCount);
+
+ for (int i = 2; i >= 0; i--) {
+ String type = String.valueOf(i);
+ for (int j = 0; j < taskCount / 3; j++) {
+ String key = type + "_" + UUID.randomUUID().toString();
+ BookingCallable dc = new BookingCallable(executions, taskCount, key, type, i, 0);
+ callables.add(dc);
+ }
+ }
+
+ CallableQueueService queueservice = Services.get().get(CallableQueueService.class);
+
+ for (int i = 0; i < taskCount; i++) {
+ queueservice.queue(callables.get(i));
+ }
+
+ try {
+ finished.await(10, TimeUnit.SECONDS);
+ } catch (Exception e) {
+ log.error("Error", e);
+ fail("Exception during test: " + e.getMessage());
+ }
+ // It's necessary because after finished.await() returns, the last XCallable
+ // could still be running
+ waitFor(1000, new Predicate() {
+ @Override
+ public boolean evaluate() throws Exception {
+ return queueservice.queueSize() == 0;
+ }
+ });
+
+ Map<Integer, Long> minTime = new HashMap<>();
+ Map<Integer, Long> maxTime = new HashMap<>();
+
+ for (Map.Entry<Integer, Collection<Long>> entry : executions.asMap().entrySet()) {
+ int prio = entry.getKey();
+ Collection<Long> values = entry.getValue();
+ minTime.put(prio, Collections.min(values));
+ maxTime.put(prio, Collections.max(values));
+ }
+
+ // Expected timeline of execution times:
+ // --> [min] Prio #2 [max] --> [min] Prio #1 [max] --> [min] Prio #0 [max]
+
+ assertTrue("Failed: maxTime prio #2: " + maxTime.get(2) + " / minTime prio #1: " + minTime.get(1),
+ maxTime.get(2) <= minTime.get(1));
+ assertTrue("Failed: maxTime prio #1: " + maxTime.get(1) + " / minTime prio #0: " + minTime.get(0),
+ maxTime.get(1) <= minTime.get(0));
+ }
+
+ public void testMaxConcurrencyReached() throws Exception {
+ Services.get().destroy();
+ setSystemProperty(CallableQueueService.CONF_QUEUE_SIZE, "100000");
+ new Services().init();
+
+ int partitions = 10;
+ int taskPerPartition = 10000;
+
+ final int taskCount = partitions * taskPerPartition;
+
+ List<DummyCallable> callables = new ArrayList<>(taskCount);
+
+ for (int i = 0; i < partitions; i++) {
+ String type = String.valueOf(i);
+ for (int j = 0; j < taskPerPartition; j++) {
+ String key = type + "_" + UUID.randomUUID().toString();
+ DummyCallable dc = new DummyCallable(taskCount, key, type, 0, 0);
+ callables.add(dc);
+ }
+ }
+
+ CallableQueueService queueservice = Services.get().get(CallableQueueService.class);
+
+ for (int i = 0; i < taskCount; i++) {
+ queueservice.queue(callables.get(i));
+ }
+
+ try {
+ finished.await(100, TimeUnit.SECONDS);
+ } catch (Exception e) {
+ log.error("Error", e);
+ fail("Exception during test: " + e.getMessage());
+ }
+
+ assertEquals("Not all callables have been executed", taskCount, counter.get()); // expected value first
+ }
+
+ public void testQueueSizeWithDelayedElements() throws InterruptedException {
+ final int taskCount = 10_000;
+
+ List<DummyCallable> callables = new ArrayList<>(taskCount);
+ for (int i = 0; i < taskCount; i++) {
+ String keyAndType = String.valueOf(i);
+ DummyCallable dc = new DummyCallable(taskCount, keyAndType, keyAndType, 0, 0);
+ callables.add(dc);
+ }
+
+ CallableQueueService queueservice = Services.get().get(CallableQueueService.class);
+
+ for (int i = 0; i < taskCount; i++) {
+ queueservice.queue(callables.get(i), 2000);
+ }
+
+ int queueSizeAfterSubmission = queueservice.queueSize();
+
+ try {
+ finished.await(10, TimeUnit.SECONDS);
+ } catch (Exception e) {
+ log.error("Error", e);
+ fail("Exception during test: " + e.getMessage());
+ }
+
+ assertEquals("Queue size after submission", taskCount, queueSizeAfterSubmission);
+ assertEquals("Queue size after execution", 0, queueservice.queueSize());
+ }
+
+ public void testQueueSizeAfterNormalSubmission() throws InterruptedException {
+ final int taskCount = 10_000;
+
+ List<DummyCallable> callables = new ArrayList<>(taskCount);
+ for (int i = 0; i < taskCount; i++) {
+ String keyAndType = String.valueOf(i);
+ DummyCallable dc = new DummyCallable(taskCount, keyAndType, keyAndType, 0, 0);
+ callables.add(dc);
+ }
+
+ CallableQueueService queueservice = Services.get().get(CallableQueueService.class);
+
+ for (int i = 0; i < taskCount; i++) {
+ queueservice.queue(callables.get(i));
+ }
+
+ // Not an exact number - it's close to 10,000 but keeps fluctuating
+ // We can still verify that it's larger than a certain number though
+ int queueSizeAfterSubmission = queueservice.queueSize();
+ try {
+ finished.await(10, TimeUnit.SECONDS);
+ } catch (Exception e) {
+ log.error("Error", e);
+ fail("Exception during test: " + e.getMessage());
+ }
+ // It's necessary because after finished.await() returns, the last XCallable
+ // could still be running
+ waitFor(1000, new Predicate() {
+ @Override
+ public boolean evaluate() throws Exception {
+ return queueservice.queueSize() == 0;
+ }
+ });
+
+ assertTrue("Too few elements in the queue: " + queueSizeAfterSubmission + ", should be >9000",
+ queueSizeAfterSubmission > 9000);
+ assertEquals("Queue size after execution", 0, queueservice.queueSize());
+ }
+
+ public void testQueueSizeWhenMaxConcurrencyIsReached() throws InterruptedException {
+ int partitions = 10;
+ int taskPerPartition = 1000;
+
+ final int taskCount = partitions * taskPerPartition;
+
+ List<DummyCallable> callables = new ArrayList<>(taskCount);
+
+ for (int i = 0; i < partitions; i++) {
+ String type = String.valueOf(i);
+ for (int j = 0; j < taskPerPartition; j++) {
+ String key = type + "_" + UUID.randomUUID().toString();
+ DummyCallable dc = new DummyCallable(taskCount, key, type, 0, 0);
+ callables.add(dc);
+ }
+ }
+
+ CallableQueueService queueservice = Services.get().get(CallableQueueService.class);
+
+ for (int i = 0; i < taskCount; i++) {
+ queueservice.queue(callables.get(i));
+ }
+
+ // Not an exact number - it's close to 10,000 but keeps fluctuating
+ // We can still verify that it's larger than a certain number though
+ int queueSizeAfterSubmission = queueservice.queueSize();
+ try {
+ finished.await(10, TimeUnit.SECONDS);
+ } catch (Exception e) {
+ log.error("Error", e);
+ fail("Exception during test: " + e.getMessage());
+ }
+ // It's necessary because after finished.await() returns, the last XCallable
+ // could still be running
+ waitFor(1000, new Predicate() {
+ @Override
+ public boolean evaluate() throws Exception {
+ return queueservice.queueSize() == 0;
+ }
+ });
+
+ assertTrue("Too few elements in the queue: " + queueSizeAfterSubmission + ", should be >9000",
+ queueSizeAfterSubmission > 9000);
+ assertEquals("Queue size after execution", 0, queueservice.queueSize());
+ }
+
+ private class DummyCallable extends MyCallable {
+ private final int taskCount;
+
+ public DummyCallable(int taskCount, String key, String type, int priority, int wait) {
+ super(key, type, priority, wait);
+ this.taskCount = taskCount;
+ }
+
+ public Void call() throws Exception {
+ if (counter.incrementAndGet() == taskCount) {
+ finished.countDown();
+ }
+
+ return null;
+ }
+ }
+
+ private class BookingCallable extends MyCallable {
+ private final int taskCount;
+ private final Multimap<Integer, Long> executions;
+
+ public BookingCallable(Multimap<Integer, Long> executions,
+ int taskCount,
+ String key,
+ String type,
+ int priority,
+ int wait) {
+ super(key, type, priority, wait);
+ this.taskCount = taskCount;
+ this.executions = executions;
+ }
+
+ public Void call() throws Exception {
+ executions.put(getPriority(), System.currentTimeMillis());
+ if (counter.incrementAndGet() == taskCount) {
+ finished.countDown();
+ }
+ return null;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/13bfd494/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 78f25e7..a16b3f9 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
-- Oozie 5.1.0 release (trunk - unreleased)
+OOZIE-3160 PriorityDelayQueue put()/take() can cause significant CPU load due to busy waiting (pbacsko)
OOZIE-2877 Git action (clayb, andras.piros via pbacsko, gezapeti)
OOZIE-3061 Kill only those child jobs which are not already killed (matijhs via gezapeti, andras.piros)
OOZIE-3155 [ui] Job DAG is not refreshed when a job is finished (asalamon74 via andras.piros)