You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by he...@apache.org on 2014/07/29 21:32:14 UTC
[19/31] git commit: optimize queue size computation and minor other optimizations
optimize queue size computation and minor other optimizations
Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/e17b6094
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/e17b6094
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/e17b6094
Branch: refs/heads/master
Commit: e17b6094518abf48c298d3dac9b73c279d44be94
Parents: 40b63e2
Author: Alex Heneveld <al...@cloudsoftcorp.com>
Authored: Wed Jul 23 19:11:53 2014 -0700
Committer: Alex Heneveld <al...@cloudsoftcorp.com>
Committed: Tue Jul 29 10:42:09 2014 -0400
----------------------------------------------------------------------
.../brooklyn/util/task/DynamicSequentialTask.java | 15 +++++++++------
.../brooklyn/util/task/SingleThreadedScheduler.java | 10 ++++++----
2 files changed, 15 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/e17b6094/core/src/main/java/brooklyn/util/task/DynamicSequentialTask.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/util/task/DynamicSequentialTask.java b/core/src/main/java/brooklyn/util/task/DynamicSequentialTask.java
index a17ffbe..520c8b0 100644
--- a/core/src/main/java/brooklyn/util/task/DynamicSequentialTask.java
+++ b/core/src/main/java/brooklyn/util/task/DynamicSequentialTask.java
@@ -184,9 +184,9 @@ public class DynamicSequentialTask<T> extends BasicTask<T> implements HasTaskChi
/** submits the indicated task for execution in the current execution context, and returns immediately */
protected void submitBackgroundInheritingContext(Task<?> task) {
BasicExecutionContext ec = BasicExecutionContext.getCurrentExecutionContext();
- if (log.isTraceEnabled())
- log.trace("task {} - submitting background task {} ({})", new Object[] {
- Tasks.current(), task, ec });
+ if (log.isTraceEnabled()) {
+ log.trace("task {} - submitting background task {} ({})", new Object[] { Tasks.current(), task, ec });
+ }
if (ec==null) {
String message = Tasks.current()!=null ?
// user forgot ExecContext:
@@ -197,10 +197,13 @@ public class DynamicSequentialTask<T> extends BasicTask<T> implements HasTaskChi
throw new IllegalStateException(message);
}
synchronized (task) {
- if (task.isSubmitted() && !task.isDone())
- log.debug("DST "+this+" skipping submission of child "+task+" because it is already submitted");
- else
+ if (task.isSubmitted() && !task.isDone()) {
+ if (log.isTraceEnabled()) {
+ log.trace("DST "+this+" skipping submission of child "+task+" because it is already submitted");
+ }
+ } else {
ec.submit(task);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/e17b6094/core/src/main/java/brooklyn/util/task/SingleThreadedScheduler.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/util/task/SingleThreadedScheduler.java b/core/src/main/java/brooklyn/util/task/SingleThreadedScheduler.java
index 420002f..14c08c4 100644
--- a/core/src/main/java/brooklyn/util/task/SingleThreadedScheduler.java
+++ b/core/src/main/java/brooklyn/util/task/SingleThreadedScheduler.java
@@ -49,6 +49,7 @@ public class SingleThreadedScheduler implements TaskScheduler, CanSetName {
private static final Logger LOG = LoggerFactory.getLogger(SingleThreadedScheduler.class);
private final Queue<QueuedSubmission<?>> order = new ConcurrentLinkedQueue<QueuedSubmission<?>>();
+ private int queueSize = 0;
private final AtomicBoolean running = new AtomicBoolean(false);
private ExecutorService executor;
@@ -77,13 +78,13 @@ public class SingleThreadedScheduler implements TaskScheduler, CanSetName {
} else {
WrappingFuture<T> f = new WrappingFuture<T>();
order.add(new QueuedSubmission<T>(c, f));
- int size = order.size();
- if (size>0 && (size == 50 || (size<=500 && (size%100)==0) || (size%1000)==0) && size!=lastSizeWarn) {
- LOG.warn("{} is backing up, {} tasks queued", this, size);
+ queueSize++;
+ if (queueSize>0 && (queueSize == 50 || (queueSize<=500 && (queueSize%100)==0) || (queueSize%1000)==0) && queueSize!=lastSizeWarn) {
+ LOG.warn("{} is backing up, {} tasks queued", this, queueSize);
if (LOG.isDebugEnabled()) {
LOG.debug("Task queue backing up detail, queue "+this+"; task context is "+Tasks.current()+"; latest task is "+c+"; first task is "+order.peek());
}
- lastSizeWarn = size;
+ lastSizeWarn = queueSize;
}
return f;
}
@@ -99,6 +100,7 @@ public class SingleThreadedScheduler implements TaskScheduler, CanSetName {
done = true;
} else {
QueuedSubmission<?> qs = order.remove();
+ queueSize--;
if (!qs.f.isCancelled()) {
Future future = executeNow(qs.c);
qs.f.setDelegate(future);