You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by an...@apache.org on 2018/10/03 12:44:42 UTC
oozie git commit: OOZIE-3160 amend PriorityDelayQueue put()/take()
can cause significant CPU load due to busy waiting (andras.piros)
Repository: oozie
Updated Branches:
refs/heads/master 4aa7bbde3 -> cf6540977
OOZIE-3160 amend PriorityDelayQueue put()/take() can cause significant CPU load due to busy waiting (andras.piros)
Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/cf654097
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/cf654097
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/cf654097
Branch: refs/heads/master
Commit: cf654097755c3c4997614e1a9844c79193b5bc48
Parents: 4aa7bbd
Author: Andras Piros <an...@cloudera.com>
Authored: Wed Oct 3 14:42:40 2018 +0200
Committer: Andras Piros <an...@cloudera.com>
Committed: Wed Oct 3 14:42:40 2018 +0200
----------------------------------------------------------------------
.../oozie/service/AsyncXCommandExecutor.java | 21 ++++-
.../oozie/service/CallableQueueService.java | 85 +++++++++++---------
core/src/main/resources/oozie-default.xml | 24 ++++--
.../service/TestAsyncXCommandExecutor.java | 23 +++---
...TestHAPartitionDependencyManagerService.java | 1 +
.../java/org/apache/oozie/test/XTestCase.java | 3 +
.../java/org/apache/oozie/test/ZKXTestCase.java | 4 +-
release-log.txt | 1 +
8 files changed, 108 insertions(+), 54 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/oozie/blob/cf654097/core/src/main/java/org/apache/oozie/service/AsyncXCommandExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/AsyncXCommandExecutor.java b/core/src/main/java/org/apache/oozie/service/AsyncXCommandExecutor.java
index b18a37a..1f37622 100644
--- a/core/src/main/java/org/apache/oozie/service/AsyncXCommandExecutor.java
+++ b/core/src/main/java/org/apache/oozie/service/AsyncXCommandExecutor.java
@@ -63,6 +63,7 @@ public class AsyncXCommandExecutor {
private final long maxActiveCommands; // equivalent of "queueSize" in CQS
private final long maxWait;
private final long maxPriority;
+ private final int awaitTerminationTimeoutSeconds;
private final BlockingQueue<CallableWrapper<?>> priorityBlockingQueue;
private final BlockingQueue<AccessibleRunnableScheduledFuture<ScheduledXCallable>> delayWorkQueue;
@@ -78,7 +79,8 @@ public class AsyncXCommandExecutor {
CallableQueueService callableAccess,
long maxActiveCommands,
long maxWait,
- int priorities) {
+ int priorities,
+ int awaitTerminationTimeoutSeconds) {
priorityBlockingQueue = new PriorityBlockingQueue<CallableWrapper<?>>(100, new PriorityComparator());
@@ -115,6 +117,9 @@ public class AsyncXCommandExecutor {
this.pendingCommandsPerType = new ConcurrentHashMap<>();
Preconditions.checkArgument(priorities > 0, "Number of priorities must be >0");
this.maxPriority = priorities - 1;
+ Preconditions.checkArgument(awaitTerminationTimeoutSeconds > 0,
+ String.format("Await termination timeout must be >0, is %s", awaitTerminationTimeoutSeconds));
+ this.awaitTerminationTimeoutSeconds = awaitTerminationTimeoutSeconds;
}
@VisibleForTesting
@@ -128,7 +133,8 @@ public class AsyncXCommandExecutor {
ConcurrentHashMap<String, Set<CallableWrapper<?>>> pendingCommandsPerType,
AtomicInteger activeCommands,
long maxWait,
- long priorities) {
+ long priorities,
+ int awaitTerminationTimeoutSeconds) {
this.priorityBlockingQueue = priorityBlockingQueue;
this.delayWorkQueue = delayQueue;
@@ -141,6 +147,7 @@ public class AsyncXCommandExecutor {
this.activeCommands = activeCommands;
this.maxWait = maxWait;
this.maxPriority = priorities - 1;
+ this.awaitTerminationTimeoutSeconds = awaitTerminationTimeoutSeconds;
}
public synchronized boolean queue(CallableWrapper<?> wrapper, boolean ignoreQueueSize) {
@@ -266,6 +273,14 @@ public class AsyncXCommandExecutor {
}
}
+ public boolean isShutDown() {
+ return executor.isShutdown() || scheduledExecutor.isShutdown();
+ }
+
+ public boolean isTerminated() {
+ return executor.isTerminated() || scheduledExecutor.isTerminated();
+ }
+
public List<String> getQueueDump() {
List<CallableWrapper<?>> copyOfPending = new ArrayList<>(100);
List<String> queueDump = new ArrayList<>(100);
@@ -399,7 +414,7 @@ public class AsyncXCommandExecutor {
}
private void shutdownExecutor(ExecutorService executor, String name) throws InterruptedException {
- long limit = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(30);
+ long limit = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(this.awaitTerminationTimeoutSeconds);
executor.shutdown();
while (!executor.awaitTermination(1000, TimeUnit.MILLISECONDS)) {
log.info("Waiting for [{0}] to shutdown", name);
http://git-wip-us.apache.org/repos/asf/oozie/blob/cf654097/core/src/main/java/org/apache/oozie/service/CallableQueueService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/CallableQueueService.java b/core/src/main/java/org/apache/oozie/service/CallableQueueService.java
index dc9a099..a942600 100644
--- a/core/src/main/java/org/apache/oozie/service/CallableQueueService.java
+++ b/core/src/main/java/org/apache/oozie/service/CallableQueueService.java
@@ -86,7 +86,9 @@ public class CallableQueueService implements Service, Instrumentable {
public static final String CONF_QUEUE_SIZE = CONF_PREFIX + "queue.size";
public static final String CONF_THREADS = CONF_PREFIX + "threads";
- public static final String CONF_OLDIMPL = CONF_PREFIX + "queue.oldImpl";
+ public static final String CONF_NEWIMPL = CONF_PREFIX + "queue.newImpl";
+ public static final String CONF_QUEUE_AWAIT_TERMINATION_TIMEOUT_SECONDS =
+ CONF_PREFIX + "queue.awaitTermination.timeout.seconds";
public static final String CONF_DELAYED_CALLABLE_THREADS = CONF_PREFIX + "delayedcallable.threads";
public static final String CONF_CALLABLE_CONCURRENCY = CONF_PREFIX + "callable.concurrency";
public static final String CONF_CALLABLE_NEXT_ELIGIBLE = CONF_PREFIX + "callable.next.eligible";
@@ -110,6 +112,8 @@ public class CallableQueueService implements Service, Instrumentable {
private int maxCallableConcurrency;
+ private int queueAwaitTerminationTimeoutSeconds;
+
private boolean callableBegin(XCallable<?> callable) {
synchronized (activeCallables) {
AtomicInteger counter = activeCallables.get(callable.getType());
@@ -136,7 +140,7 @@ public class CallableQueueService implements Service, Instrumentable {
}
}
- if (!oldImpl) {
+ if (newImpl) {
asyncXCommandExecutor.commandFinished();
asyncXCommandExecutor.checkMaxConcurrency(callable.getType());
}
@@ -441,7 +445,7 @@ public class CallableQueueService implements Service, Instrumentable {
private PriorityDelayQueue<CallableWrapper<?>> queue;
private ThreadPoolExecutor executor;
private Instrumentation instrumentation;
- private boolean oldImpl = false;
+ private boolean newImpl = false;
private AsyncXCommandExecutor asyncXCommandExecutor;
/**
@@ -525,21 +529,11 @@ public class CallableQueueService implements Service, Instrumentable {
interruptMapMaxSize = ConfigurationService.getInt(conf, CONF_CALLABLE_INTERRUPT_MAP_MAX_SIZE);
- oldImpl = ConfigurationService.getBoolean(CONF_OLDIMPL, false);
- log.info("Using old queue implementation: [{0}]", oldImpl);
+ newImpl = ConfigurationService.getBoolean(CONF_NEWIMPL, true);
+ log.info("Using new queue implementation: [{0}]", newImpl);
+ queueAwaitTerminationTimeoutSeconds = ConfigurationService.getInt(conf, CONF_QUEUE_AWAIT_TERMINATION_TIMEOUT_SECONDS);
- if (oldImpl) {
- executor = new ThreadPoolExecutor(threads, threads, 10, TimeUnit.SECONDS, (BlockingQueue) queue,
- new NamedThreadFactory("CallableQueue")) {
- protected void beforeExecute(Thread t, Runnable r) {
- super.beforeExecute(t,r);
- XLog.Info.get().clear();
- }
- protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
- return (RunnableFuture<T>)callable;
- }
- };
- } else {
+ if (newImpl) {
int delayedCallableThreads = ConfigurationService.getInt(CONF_DELAYED_CALLABLE_THREADS, 1);
asyncXCommandExecutor = new AsyncXCommandExecutor(threads,
@@ -548,9 +542,21 @@ public class CallableQueueService implements Service, Instrumentable {
this,
queueSize,
MAX_CALLABLE_WAITTIME_MS,
- PRIORITIES);
+ PRIORITIES,
+ queueAwaitTerminationTimeoutSeconds);
executor = asyncXCommandExecutor.getExecutorService();
+ } else {
+ executor = new ThreadPoolExecutor(threads, threads, 10, TimeUnit.SECONDS, (BlockingQueue) queue,
+ new NamedThreadFactory("CallableQueue")) {
+ protected void beforeExecute(Thread t, Runnable r) {
+ super.beforeExecute(t,r);
+ XLog.Info.get().clear();
+ }
+ protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
+ return (RunnableFuture<T>)callable;
+ }
+ };
}
// IMPORTANT: The ThreadPoolExecutor does not always the execute
@@ -586,20 +592,22 @@ public class CallableQueueService implements Service, Instrumentable {
@Override
public void destroy() {
try {
- long limit = System.currentTimeMillis() + 30 * 1000;// 30 seconds
- executor.shutdown();
queue.clear();
- while (!executor.awaitTermination(1000, TimeUnit.MILLISECONDS)) {
- log.info("Waiting for executor to shutdown");
- if (System.currentTimeMillis() > limit) {
- log.warn("Gave up, continuing without waiting for executor to shutdown");
- break;
- }
- }
-
- if (!oldImpl) {
+ if (newImpl) {
asyncXCommandExecutor.shutdown();
}
+ else {
+ long limit = System.currentTimeMillis() + queueAwaitTerminationTimeoutSeconds * 1000;
+ executor.shutdown();
+ queue.clear();
+ while (!executor.awaitTermination(1000, TimeUnit.MILLISECONDS)) {
+ log.info("Waiting for executor to shutdown");
+ if (System.currentTimeMillis() > limit) {
+ log.warn("Gave up, continuing without waiting for executor to shutdown");
+ break;
+ }
+ }
+ }
}
catch (InterruptedException ex) {
log.warn(ex);
@@ -620,11 +628,18 @@ public class CallableQueueService implements Service, Instrumentable {
* @return int size of queue
*/
public synchronized int queueSize() {
- return oldImpl ? queue.size() : asyncXCommandExecutor.getSize();
+ return newImpl ? asyncXCommandExecutor.getSize() : queue.size();
}
private synchronized boolean queue(CallableWrapper<?> wrapper, boolean ignoreQueueSize) {
- if (oldImpl) {
+ if (newImpl) {
+ if (asyncXCommandExecutor.isShutDown() || asyncXCommandExecutor.isTerminated()) {
+ log.warn("Async executor shutting down, ignoring queueing of [{0}]", wrapper.getElement().getKey());
+ }
+ else {
+ asyncXCommandExecutor.queue(wrapper, ignoreQueueSize);
+ }
+ } else {
if (!ignoreQueueSize && queue.size() >= queueSize) {
log.warn("queue full, ignoring queuing for [{0}]", wrapper.getElement().getKey());
return false;
@@ -644,8 +659,6 @@ public class CallableQueueService implements Service, Instrumentable {
else {
log.warn("Executor shutting down, ignoring queueing of [{0}]", wrapper.getElement().getKey());
}
- } else {
- asyncXCommandExecutor.queue(wrapper, ignoreQueueSize);
}
return true;
@@ -820,7 +833,9 @@ public class CallableQueueService implements Service, Instrumentable {
* @return the list of string that representing each CallableWrapper
*/
public List<String> getQueueDump() {
- if (oldImpl) {
+ if (newImpl) {
+ return asyncXCommandExecutor.getQueueDump();
+ } else {
List<String> list = new ArrayList<String>();
for (QueueElement<CallableWrapper<?>> qe : queue) {
if (qe.toString() == null) {
@@ -829,8 +844,6 @@ public class CallableQueueService implements Service, Instrumentable {
list.add(qe.toString());
}
return list;
- } else {
- return asyncXCommandExecutor.getQueueDump();
}
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/cf654097/core/src/main/resources/oozie-default.xml
----------------------------------------------------------------------
diff --git a/core/src/main/resources/oozie-default.xml b/core/src/main/resources/oozie-default.xml
index f292ad3..6c7fc9d 100644
--- a/core/src/main/resources/oozie-default.xml
+++ b/core/src/main/resources/oozie-default.xml
@@ -518,13 +518,27 @@
</property>
<property>
- <name>oozie.service.CallableQueueService.queue.oldImpl</name>
+ <name>oozie.service.CallableQueueService.queue.newImpl</name>
<value>true</value>
<description>
- If set to false, then CallableQueueService will use a more performant, less CPU-intensive
- queuing mechanism to execute asynchronous tasks internally. The old implementation generates
- noticeable CPU load even if Oozie is completely idle, especially when oozie.service.CallableQueueService.threads
- set to a large number. The previous queuing mechanism is kept as a fallback option.
+ If set to true, then CallableQueueService will use a faster, less CPU-intensive queuing mechanism to execute
+ asynchronous tasks internally.
+ The old implementation generates noticeable CPU load even if Oozie is completely idle, especially when
+ oozie.service.CallableQueueService.threads is set to a large number. The previous queuing mechanism is kept as a
+ fallback option.
+ This is an experimental feature in Oozie 5.1.0 that will be re-evaluated in an upcoming minor release,
+ at which point the old implementation and this feature flag will be removed.
+ </description>
+ </property>
+
+ <property>
+ <name>oozie.service.CallableQueueService.queue.awaitTermination.timeout.seconds</name>
+ <value>30</value>
+ <description>
+ Maximum number of seconds to wait for ThreadPoolExecutor instances to terminate when CallableQueueService#destroy()
+ is called.
+ The more elements you tend to have in your callable queue, the longer you want CallableQueueService to wait
+ before shutting down its thread pools.
</description>
</property>
http://git-wip-us.apache.org/repos/asf/oozie/blob/cf654097/core/src/test/java/org/apache/oozie/service/TestAsyncXCommandExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/service/TestAsyncXCommandExecutor.java b/core/src/test/java/org/apache/oozie/service/TestAsyncXCommandExecutor.java
index 2dce409..e0d14d6 100644
--- a/core/src/test/java/org/apache/oozie/service/TestAsyncXCommandExecutor.java
+++ b/core/src/test/java/org/apache/oozie/service/TestAsyncXCommandExecutor.java
@@ -74,6 +74,7 @@ public class TestAsyncXCommandExecutor {
private static final long DEFAULT_MAXWAIT = 30_000;
private static final int TEST_PRIORITIES = 5;
private static final int MAX_PRIORITY = TEST_PRIORITIES - 1;
+ private static final int AWAIT_TERMINATION_TIMEOUT_SECONDS = 1;
@Mock
private ThreadPoolExecutor executor;
@@ -100,7 +101,7 @@ public class TestAsyncXCommandExecutor {
pendingCommandsPerType = new ConcurrentHashMap<>();
delayQueue = new LinkedBlockingQueue<>(); // in reality it's not LBQ, but it's fine here
asyncExecutor = createExecutor(DEFAULT_ENABLE_CONCURRENCY_CHECK, DEFAULT_MAX_ACTIVE_COMMANDS, DEFAULT_MAXWAIT,
- TEST_PRIORITIES);
+ TEST_PRIORITIES, AWAIT_TERMINATION_TIMEOUT_SECONDS);
when(callableWrapper.filterDuplicates()).thenReturn(true);
when(callableWrapper.getElement().getKey()).thenReturn("key");
when(callableWrapper.getElement().getType()).thenReturn(DEFAULT_TYPE);
@@ -158,7 +159,7 @@ public class TestAsyncXCommandExecutor {
@Test
public void testSubmissionSuccessfulAfterDelayWhenMaxConcurrencyCheckDisabled() {
- asyncExecutor = createExecutor(false, 2, DEFAULT_MAXWAIT, TEST_PRIORITIES);
+ asyncExecutor = createExecutor(false, 2, DEFAULT_MAXWAIT, TEST_PRIORITIES, AWAIT_TERMINATION_TIMEOUT_SECONDS);
when(callableWrapper.getInitialDelay()).thenReturn(100L);
when(callableWrapper.getDelay(eq(TimeUnit.MILLISECONDS))).thenReturn(50L);
XCallable<?> wrappedCommand = mock(XCallable.class);
@@ -242,7 +243,7 @@ public class TestAsyncXCommandExecutor {
@Test
public void testSubmissionWhenQueueIsFull() {
- asyncExecutor = createExecutor(true, 2, DEFAULT_MAXWAIT, TEST_PRIORITIES);
+ asyncExecutor = createExecutor(true, 2, DEFAULT_MAXWAIT, TEST_PRIORITIES, AWAIT_TERMINATION_TIMEOUT_SECONDS);
callableWrapper = mock(CallableWrapper.class, Mockito.RETURNS_DEEP_STUBS);
when(callableWrapper.filterDuplicates()).thenReturn(true);
when(callableWrapper.getElement().getKey()).thenReturn("key");
@@ -257,7 +258,7 @@ public class TestAsyncXCommandExecutor {
@Test
public void testSubmissionWhenQueueSizeIsIgnored() {
- asyncExecutor = createExecutor(true, 2, DEFAULT_MAXWAIT, TEST_PRIORITIES);
+ asyncExecutor = createExecutor(true, 2, DEFAULT_MAXWAIT, TEST_PRIORITIES, AWAIT_TERMINATION_TIMEOUT_SECONDS);
callableWrapper = mock(CallableWrapper.class, Mockito.RETURNS_DEEP_STUBS);
when(callableWrapper.filterDuplicates()).thenReturn(true);
when(callableWrapper.getElement().getKey()).thenReturn("key");
@@ -369,7 +370,8 @@ public class TestAsyncXCommandExecutor {
@Test
public void testAntiStarvationWhenDelayIsAboveMaxWait() {
- asyncExecutor = createExecutor(DEFAULT_ENABLE_CONCURRENCY_CHECK, DEFAULT_MAX_ACTIVE_COMMANDS, 500, TEST_PRIORITIES);
+ asyncExecutor = createExecutor(DEFAULT_ENABLE_CONCURRENCY_CHECK, DEFAULT_MAX_ACTIVE_COMMANDS, 500, TEST_PRIORITIES,
+ AWAIT_TERMINATION_TIMEOUT_SECONDS);
when(callableWrapper.getDelay(eq(TimeUnit.MILLISECONDS))).thenReturn(-40000L);
when(callableWrapper.getPriority()).thenReturn(0);
pendingCommandsPerType.put(DEFAULT_TYPE, Sets.newHashSet(callableWrapper));
@@ -394,7 +396,8 @@ public class TestAsyncXCommandExecutor {
@Test
public void testAntiStarvationWhenPriorityIsHighest() {
- asyncExecutor = createExecutor(DEFAULT_ENABLE_CONCURRENCY_CHECK, DEFAULT_MAX_ACTIVE_COMMANDS, 500, TEST_PRIORITIES);
+ asyncExecutor = createExecutor(DEFAULT_ENABLE_CONCURRENCY_CHECK, DEFAULT_MAX_ACTIVE_COMMANDS, 500, TEST_PRIORITIES,
+ AWAIT_TERMINATION_TIMEOUT_SECONDS);
when(callableWrapper.getDelay(eq(TimeUnit.MILLISECONDS))).thenReturn(-1000L);
when(callableWrapper.getPriority()).thenReturn(MAX_PRIORITY);
pendingCommandsPerType.put(DEFAULT_TYPE, Sets.newHashSet(callableWrapper));
@@ -419,7 +422,8 @@ public class TestAsyncXCommandExecutor {
@Test
public void testPriorityHandling() {
- asyncExecutor = createExecutor(DEFAULT_ENABLE_CONCURRENCY_CHECK, 100, DEFAULT_MAXWAIT, 100);
+ asyncExecutor = createExecutor(DEFAULT_ENABLE_CONCURRENCY_CHECK, 100, DEFAULT_MAXWAIT, 100,
+ AWAIT_TERMINATION_TIMEOUT_SECONDS);
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
@@ -490,7 +494,7 @@ public class TestAsyncXCommandExecutor {
}
private AsyncXCommandExecutor createExecutor(boolean needMaxConcurrencyCheck, int maxActiveCallables,
- long maxWait, int priorities) {
+ long maxWait, int priorities, int awaitTerminationTimeoutSeconds) {
return new AsyncXCommandExecutor(needMaxConcurrencyCheck,
callableQueueService,
maxActiveCallables,
@@ -501,6 +505,7 @@ public class TestAsyncXCommandExecutor {
pendingCommandsPerType,
activeCommands,
maxWait,
- priorities);
+ priorities,
+ awaitTerminationTimeoutSeconds);
}
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/cf654097/core/src/test/java/org/apache/oozie/service/TestHAPartitionDependencyManagerService.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/service/TestHAPartitionDependencyManagerService.java b/core/src/test/java/org/apache/oozie/service/TestHAPartitionDependencyManagerService.java
index 3e1df07..a7c7e07 100644
--- a/core/src/test/java/org/apache/oozie/service/TestHAPartitionDependencyManagerService.java
+++ b/core/src/test/java/org/apache/oozie/service/TestHAPartitionDependencyManagerService.java
@@ -57,6 +57,7 @@ public class TestHAPartitionDependencyManagerService extends ZKXTestCase {
}
protected void tearDown() throws Exception {
+ services.destroy();
super.tearDown();
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/cf654097/core/src/test/java/org/apache/oozie/test/XTestCase.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/test/XTestCase.java b/core/src/test/java/org/apache/oozie/test/XTestCase.java
index 3048dda..1f0cf17 100644
--- a/core/src/test/java/org/apache/oozie/test/XTestCase.java
+++ b/core/src/test/java/org/apache/oozie/test/XTestCase.java
@@ -81,6 +81,7 @@ import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.action.hadoop.LauncherMain;
import org.apache.oozie.dependency.FSURIHandler;
import org.apache.oozie.dependency.HCatURIHandler;
+import org.apache.oozie.service.CallableQueueService;
import org.apache.oozie.service.ConfigurationService;
import org.apache.oozie.service.HCatAccessorService;
import org.apache.oozie.service.HadoopAccessorException;
@@ -409,6 +410,8 @@ public abstract class XTestCase extends TestCase {
oozieSiteConf.set(Services.CONF_SERVICE_CLASSES, classes.replaceAll("org.apache.oozie.service.ShareLibService,",""));
// Make sure to create the Oozie DB during unit tests
oozieSiteConf.set(JPAService.CONF_CREATE_DB_SCHEMA, "true");
+ // Make sure thread pools shut down in a timely manner
+ oozieSiteConf.set(CallableQueueService.CONF_QUEUE_AWAIT_TERMINATION_TIMEOUT_SECONDS, "1");
File target = new File(testCaseConfDir, "oozie-site.xml");
oozieSiteConf.writeXml(new FileOutputStream(target));
http://git-wip-us.apache.org/repos/asf/oozie/blob/cf654097/core/src/test/java/org/apache/oozie/test/ZKXTestCase.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/test/ZKXTestCase.java b/core/src/test/java/org/apache/oozie/test/ZKXTestCase.java
index abc7c9f..bfa2d85 100644
--- a/core/src/test/java/org/apache/oozie/test/ZKXTestCase.java
+++ b/core/src/test/java/org/apache/oozie/test/ZKXTestCase.java
@@ -97,7 +97,9 @@ public abstract class ZKXTestCase extends XDataTestCase {
@Override
protected void tearDown() throws Exception {
super.tearDown();
- Services.get().destroy();
+ if (Services.get() != null) {
+ Services.get().destroy();
+ }
sDiscovery.close();
sDiscovery = null;
client.close();
http://git-wip-us.apache.org/repos/asf/oozie/blob/cf654097/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index d4d0b1e..8a72d5e 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
-- Oozie 5.1.0 release (trunk - unreleased)
+OOZIE-3160 amend PriorityDelayQueue put()/take() can cause significant CPU load due to busy waiting (andras.piros)
OOZIE-3354 [core] [SSH action] SSH action gets hung (andras.piros)
OOZIE-3343 amend [build] [tests] Add the first five test errors per module to the report (kmarton via andras.piros)
OOZIE-3348 [Hive action] Remove dependency hive-contrib (kmarton via andras.piros)