You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cloudstack.apache.org by ko...@apache.org on 2014/05/22 11:02:55 UTC

git commit: updated refs/heads/master to d5754d9

Repository: cloudstack
Updated Branches:
  refs/heads/master cd62aecb2 -> d5754d910


CLOUDSTACK-6740: Direct agent command throttling improvements
List of changes:
1. Created a separate thread pool for handling cron and ping tasks. This pool has the same size as direct.agent.pool.size. The existing direct agent pool now runs all commands other than cron and ping.
2. For normal tasks (generated as part of user/admin API calls), if the throttle limit is reached, the tasks are queued and executed later as threads become available.
3. For cron and ping tasks (generated internally by the management server (MS), e.g. ping, VM sync, etc.), if the throttle limit is reached, these tasks are rejected. Because they are generated internally and rescheduled at a fixed rate, rejecting a single run does not cause any issues.


Project: http://git-wip-us.apache.org/repos/asf/cloudstack/repo
Commit: http://git-wip-us.apache.org/repos/asf/cloudstack/commit/d5754d91
Tree: http://git-wip-us.apache.org/repos/asf/cloudstack/tree/d5754d91
Diff: http://git-wip-us.apache.org/repos/asf/cloudstack/diff/d5754d91

Branch: refs/heads/master
Commit: d5754d910112aed4ce482cffd04892683c7029db
Parents: cd62aec
Author: Koushik Das <ko...@apache.org>
Authored: Thu May 22 14:07:13 2014 +0530
Committer: Koushik Das <ko...@apache.org>
Committed: Thu May 22 14:15:42 2014 +0530

----------------------------------------------------------------------
 .../com/cloud/agent/manager/AgentAttache.java   |   3 -
 .../cloud/agent/manager/AgentManagerImpl.java   |   8 ++
 .../cloud/agent/manager/DirectAgentAttache.java | 107 ++++++++++++++++---
 3 files changed, 100 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cloudstack/blob/d5754d91/engine/orchestration/src/com/cloud/agent/manager/AgentAttache.java
----------------------------------------------------------------------
diff --git a/engine/orchestration/src/com/cloud/agent/manager/AgentAttache.java b/engine/orchestration/src/com/cloud/agent/manager/AgentAttache.java
index ebf9366..f11f69f 100755
--- a/engine/orchestration/src/com/cloud/agent/manager/AgentAttache.java
+++ b/engine/orchestration/src/com/cloud/agent/manager/AgentAttache.java
@@ -30,7 +30,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.cloudstack.managed.context.ManagedContextRunnable;
 import org.apache.log4j.Logger;
@@ -106,7 +105,6 @@ public abstract class AgentAttache {
     protected Status _status = Status.Connecting;
     protected boolean _maintenance;
     protected long _nextSequence;
-    protected AtomicInteger _outstandingTaskCount;
 
     protected AgentManagerImpl _agentMgr;
 
@@ -129,7 +127,6 @@ public abstract class AgentAttache {
         _requests = new LinkedList<Request>();
         _agentMgr = agentMgr;
         _nextSequence = new Long(s_rand.nextInt(Short.MAX_VALUE)) << 48;
-        _outstandingTaskCount = new AtomicInteger(0);
     }
 
     public synchronized long getNextSequence() {

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/d5754d91/engine/orchestration/src/com/cloud/agent/manager/AgentManagerImpl.java
----------------------------------------------------------------------
diff --git a/engine/orchestration/src/com/cloud/agent/manager/AgentManagerImpl.java b/engine/orchestration/src/com/cloud/agent/manager/AgentManagerImpl.java
index c9a35dc..c0a87ad 100755
--- a/engine/orchestration/src/com/cloud/agent/manager/AgentManagerImpl.java
+++ b/engine/orchestration/src/com/cloud/agent/manager/AgentManagerImpl.java
@@ -162,6 +162,7 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl
     protected ExecutorService _executor;
     protected ThreadPoolExecutor _connectExecutor;
     protected ScheduledExecutorService _directAgentExecutor;
+    protected ScheduledExecutorService _cronJobExecutor;
     protected ScheduledExecutorService _monitorExecutor;
 
     private int _directAgentThreadCap;
@@ -219,7 +220,10 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl
         _connection = new NioServer("AgentManager", Port.value(), Workers.value() + 10, this);
         s_logger.info("Listening on " + Port.value() + " with " + Workers.value() + " workers");
 
+        // executes all agent commands other than cron and ping
         _directAgentExecutor = new ScheduledThreadPoolExecutor(DirectAgentPoolSize.value(), new NamedThreadFactory("DirectAgent"));
+        // executes cron and ping agent commands
+        _cronJobExecutor = new ScheduledThreadPoolExecutor(DirectAgentPoolSize.value(), new NamedThreadFactory("DirectAgentCronJob"));
         s_logger.debug("Created DirectAgentAttache pool with size: " + DirectAgentPoolSize.value());
         _directAgentThreadCap = Math.round(DirectAgentPoolSize.value() * DirectAgentThreadCap.value()) + 1; // add 1 to always make the value > 0
 
@@ -1452,6 +1456,10 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl
         return _directAgentExecutor;
     }
 
+    public ScheduledExecutorService getCronJobPool() {
+        return _cronJobExecutor;
+    }
+
     public int getDirectAgentThreadCap() {
         return _directAgentThreadCap;
     }

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/d5754d91/engine/orchestration/src/com/cloud/agent/manager/DirectAgentAttache.java
----------------------------------------------------------------------
diff --git a/engine/orchestration/src/com/cloud/agent/manager/DirectAgentAttache.java b/engine/orchestration/src/com/cloud/agent/manager/DirectAgentAttache.java
index 9874ee4..7ca6929 100755
--- a/engine/orchestration/src/com/cloud/agent/manager/DirectAgentAttache.java
+++ b/engine/orchestration/src/com/cloud/agent/manager/DirectAgentAttache.java
@@ -17,12 +17,13 @@
 package com.cloud.agent.manager;
 
 import java.util.ArrayList;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.log4j.Logger;
-
 import org.apache.cloudstack.managed.context.ManagedContextRunnable;
 
 import com.cloud.agent.api.Answer;
@@ -43,11 +44,16 @@ public class DirectAgentAttache extends AgentAttache {
     List<ScheduledFuture<?>> _futures = new ArrayList<ScheduledFuture<?>>();
     AgentManagerImpl _mgr;
     long _seq = 0;
+    LinkedList<Task> tasks = new LinkedList<Task>();
+    AtomicInteger _outstandingTaskCount;
+    AtomicInteger _outstandingCronTaskCount;
 
     public DirectAgentAttache(AgentManagerImpl agentMgr, long id, String name, ServerResource resource, boolean maintenance, AgentManagerImpl mgr) {
         super(agentMgr, id, name, maintenance);
         _resource = resource;
         _mgr = mgr;
+        _outstandingTaskCount = new AtomicInteger(0);
+        _outstandingCronTaskCount = new AtomicInteger(0);
     }
 
     @Override
@@ -90,15 +96,16 @@ public class DirectAgentAttache extends AgentAttache {
             if (answers != null && answers[0] instanceof StartupAnswer) {
                 StartupAnswer startup = (StartupAnswer)answers[0];
                 int interval = startup.getPingInterval();
-                _futures.add(_agentMgr.getDirectAgentPool().scheduleAtFixedRate(new PingTask(), interval, interval, TimeUnit.SECONDS));
+                _futures.add(_agentMgr.getCronJobPool().scheduleAtFixedRate(new PingTask(), interval, interval, TimeUnit.SECONDS));
             }
         } else {
             Command[] cmds = req.getCommands();
             if (cmds.length > 0 && !(cmds[0] instanceof CronCommand)) {
-                _agentMgr.getDirectAgentPool().execute(new Task(req));
+                queueTask(new Task(req));
+                scheduleFromQueue();
             } else {
                 CronCommand cmd = (CronCommand)cmds[0];
-                _futures.add(_agentMgr.getDirectAgentPool().scheduleAtFixedRate(new Task(req), cmd.getInterval(), cmd.getInterval(), TimeUnit.SECONDS));
+                _futures.add(_agentMgr.getCronJobPool().scheduleAtFixedRate(new CronTask(req), cmd.getInterval(), cmd.getInterval(), TimeUnit.SECONDS));
             }
         }
     }
@@ -109,7 +116,7 @@ public class DirectAgentAttache extends AgentAttache {
             StartupAnswer startup = (StartupAnswer)answers[0];
             int interval = startup.getPingInterval();
             s_logger.info("StartupAnswer received " + startup.getHostId() + " Interval = " + interval);
-            _futures.add(_agentMgr.getDirectAgentPool().scheduleAtFixedRate(new PingTask(), interval, interval, TimeUnit.SECONDS));
+            _futures.add(_agentMgr.getCronJobPool().scheduleAtFixedRate(new PingTask(), interval, interval, TimeUnit.SECONDS));
         }
     }
 
@@ -128,13 +135,26 @@ public class DirectAgentAttache extends AgentAttache {
         }
     }
 
+    private synchronized void queueTask(Task task) {
+        tasks.add(task);
+    }
+
+    private synchronized void scheduleFromQueue() {
+        if (s_logger.isTraceEnabled()) {
+            s_logger.trace("Agent attache=" + _id + ", task queue size=" + tasks.size() + ", outstanding tasks=" + _outstandingTaskCount.get());
+        }
+        while (!tasks.isEmpty() && _outstandingTaskCount.get() < _agentMgr.getDirectAgentThreadCap()) {
+            _outstandingTaskCount.incrementAndGet();
+            _agentMgr.getDirectAgentPool().execute(tasks.remove());
+        }
+    }
+
     protected class PingTask extends ManagedContextRunnable {
         @Override
         protected synchronized void runInContext() {
             try {
-                if (_outstandingTaskCount.incrementAndGet() > _agentMgr.getDirectAgentThreadCap()) {
-                    s_logger.warn("Task execution for direct attache(" + _id + ") has reached maximum outstanding limit(" + _agentMgr.getDirectAgentThreadCap() +
-                        "), bailing out");
+                if (_outstandingCronTaskCount.incrementAndGet() >= _agentMgr.getDirectAgentThreadCap()) {
+                    s_logger.warn("PingTask execution for direct attache(" + _id + ") has reached maximum outstanding limit(" + _agentMgr.getDirectAgentThreadCap() + "), bailing out");
                     return;
                 }
 
@@ -162,15 +182,15 @@ public class DirectAgentAttache extends AgentAttache {
             } catch (Exception e) {
                 s_logger.warn("Unable to complete the ping task", e);
             } finally {
-                _outstandingTaskCount.decrementAndGet();
+                _outstandingCronTaskCount.decrementAndGet();
             }
         }
     }
 
-    protected class Task extends ManagedContextRunnable {
+    protected class CronTask extends ManagedContextRunnable {
         Request _req;
 
-        public Task(Request req) {
+        public CronTask(Request req) {
             _req = req;
         }
 
@@ -194,9 +214,8 @@ public class DirectAgentAttache extends AgentAttache {
         protected void runInContext() {
             long seq = _req.getSequence();
             try {
-                if (_outstandingTaskCount.incrementAndGet() > _agentMgr.getDirectAgentThreadCap()) {
-                    s_logger.warn("Task execution for direct attache(" + _id + ") has reached maximum outstanding limit(" + _agentMgr.getDirectAgentThreadCap() +
-                        "), bailing out");
+                if (_outstandingCronTaskCount.incrementAndGet() >= _agentMgr.getDirectAgentThreadCap()) {
+                    s_logger.warn("CronTask execution for direct attache(" + _id + ") has reached maximum outstanding limit(" + _agentMgr.getDirectAgentThreadCap() + "), bailing out");
                     bailout();
                     return;
                 }
@@ -243,9 +262,67 @@ public class DirectAgentAttache extends AgentAttache {
             } catch (Exception e) {
                 s_logger.warn(log(seq, "Exception caught "), e);
             } finally {
-                _outstandingTaskCount.decrementAndGet();
+                _outstandingCronTaskCount.decrementAndGet();
             }
         }
     }
 
+    protected class Task extends ManagedContextRunnable {
+        Request _req;
+
+        public Task(Request req) {
+            _req = req;
+        }
+
+        @Override
+        protected void runInContext() {
+            long seq = _req.getSequence();
+            try {
+                ServerResource resource = _resource;
+                Command[] cmds = _req.getCommands();
+                boolean stopOnError = _req.stopOnError();
+
+                if (s_logger.isDebugEnabled()) {
+                    s_logger.debug(log(seq, "Executing request"));
+                }
+                ArrayList<Answer> answers = new ArrayList<Answer>(cmds.length);
+                for (int i = 0; i < cmds.length; i++) {
+                    Answer answer = null;
+                    try {
+                        if (resource != null) {
+                            answer = resource.executeRequest(cmds[i]);
+                            if (answer == null) {
+                                s_logger.warn("Resource returned null answer!");
+                                answer = new Answer(cmds[i], false, "Resource returned null answer");
+                            }
+                        } else {
+                            answer = new Answer(cmds[i], false, "Agent is disconnected");
+                        }
+                    } catch (Exception e) {
+                        s_logger.warn(log(seq, "Exception Caught while executing command"), e);
+                        answer = new Answer(cmds[i], false, e.toString());
+                    }
+                    answers.add(answer);
+                    if (!answer.getResult() && stopOnError) {
+                        if (i < cmds.length - 1 && s_logger.isDebugEnabled()) {
+                            s_logger.debug(log(seq, "Cancelling because one of the answers is false and it is stop on error."));
+                        }
+                        break;
+                    }
+                }
+
+                Response resp = new Response(_req, answers.toArray(new Answer[answers.size()]));
+                if (s_logger.isDebugEnabled()) {
+                    s_logger.debug(log(seq, "Response Received: "));
+                }
+
+                processAnswers(seq, resp);
+            } catch (Exception e) {
+                s_logger.warn(log(seq, "Exception caught "), e);
+            } finally {
+                _outstandingTaskCount.decrementAndGet();
+                scheduleFromQueue();
+            }
+        }
+    }
 }