You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by he...@apache.org on 2014/07/29 21:32:14 UTC

[19/31] git commit: optimize queue size computation and minor other optimizations

optimize queue size computation and minor other optimizations


Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/e17b6094
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/e17b6094
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/e17b6094

Branch: refs/heads/master
Commit: e17b6094518abf48c298d3dac9b73c279d44be94
Parents: 40b63e2
Author: Alex Heneveld <al...@cloudsoftcorp.com>
Authored: Wed Jul 23 19:11:53 2014 -0700
Committer: Alex Heneveld <al...@cloudsoftcorp.com>
Committed: Tue Jul 29 10:42:09 2014 -0400

----------------------------------------------------------------------
 .../brooklyn/util/task/DynamicSequentialTask.java    | 15 +++++++++------
 .../brooklyn/util/task/SingleThreadedScheduler.java  | 10 ++++++----
 2 files changed, 15 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/e17b6094/core/src/main/java/brooklyn/util/task/DynamicSequentialTask.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/util/task/DynamicSequentialTask.java b/core/src/main/java/brooklyn/util/task/DynamicSequentialTask.java
index a17ffbe..520c8b0 100644
--- a/core/src/main/java/brooklyn/util/task/DynamicSequentialTask.java
+++ b/core/src/main/java/brooklyn/util/task/DynamicSequentialTask.java
@@ -184,9 +184,9 @@ public class DynamicSequentialTask<T> extends BasicTask<T> implements HasTaskChi
     /** submits the indicated task for execution in the current execution context, and returns immediately */
     protected void submitBackgroundInheritingContext(Task<?> task) {
         BasicExecutionContext ec = BasicExecutionContext.getCurrentExecutionContext();
-        if (log.isTraceEnabled())
-            log.trace("task {} - submitting background task {} ({})", new Object[] { 
-                Tasks.current(), task, ec });
+        if (log.isTraceEnabled()) {
+            log.trace("task {} - submitting background task {} ({})", new Object[] { Tasks.current(), task, ec });
+        }
         if (ec==null) {
             String message = Tasks.current()!=null ?
                     // user forgot ExecContext:
@@ -197,10 +197,13 @@ public class DynamicSequentialTask<T> extends BasicTask<T> implements HasTaskChi
             throw new IllegalStateException(message);
         }
         synchronized (task) {
-            if (task.isSubmitted() && !task.isDone())
-                log.debug("DST "+this+" skipping submission of child "+task+" because it is already submitted");
-            else
+            if (task.isSubmitted() && !task.isDone()) {
+                if (log.isTraceEnabled()) {
+                    log.trace("DST "+this+" skipping submission of child "+task+" because it is already submitted");
+                }
+            } else {
                 ec.submit(task);
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/e17b6094/core/src/main/java/brooklyn/util/task/SingleThreadedScheduler.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/util/task/SingleThreadedScheduler.java b/core/src/main/java/brooklyn/util/task/SingleThreadedScheduler.java
index 420002f..14c08c4 100644
--- a/core/src/main/java/brooklyn/util/task/SingleThreadedScheduler.java
+++ b/core/src/main/java/brooklyn/util/task/SingleThreadedScheduler.java
@@ -49,6 +49,7 @@ public class SingleThreadedScheduler implements TaskScheduler, CanSetName {
     private static final Logger LOG = LoggerFactory.getLogger(SingleThreadedScheduler.class);
     
     private final Queue<QueuedSubmission<?>> order = new ConcurrentLinkedQueue<QueuedSubmission<?>>();
+    private int queueSize = 0;
     private final AtomicBoolean running = new AtomicBoolean(false);
     
     private ExecutorService executor;
@@ -77,13 +78,13 @@ public class SingleThreadedScheduler implements TaskScheduler, CanSetName {
         } else {
             WrappingFuture<T> f = new WrappingFuture<T>();
             order.add(new QueuedSubmission<T>(c, f));
-            int size = order.size();
-            if (size>0 && (size == 50 || (size<=500 && (size%100)==0) || (size%1000)==0) && size!=lastSizeWarn) {
-                LOG.warn("{} is backing up, {} tasks queued", this, size);
+            queueSize++;
+            if (queueSize>0 && (queueSize == 50 || (queueSize<=500 && (queueSize%100)==0) || (queueSize%1000)==0) && queueSize!=lastSizeWarn) {
+                LOG.warn("{} is backing up, {} tasks queued", this, queueSize);
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("Task queue backing up detail, queue "+this+"; task context is "+Tasks.current()+"; latest task is "+c+"; first task is "+order.peek());
                 }
-                lastSizeWarn = size;
+                lastSizeWarn = queueSize;
             }
             return f;
         }
@@ -99,6 +100,7 @@ public class SingleThreadedScheduler implements TaskScheduler, CanSetName {
                 done = true;
             } else {
                 QueuedSubmission<?> qs = order.remove();
+                queueSize--;
                 if (!qs.f.isCancelled()) {
                     Future future = executeNow(qs.c);
                     qs.f.setDelegate(future);