You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cloudstack.apache.org by ko...@apache.org on 2014/05/22 10:45:01 UTC

git commit: updated refs/heads/4.4-forward to 120da60

Repository: cloudstack
Updated Branches:
  refs/heads/4.4-forward 0d243ec7f -> 120da605b


CLOUDSTACK-6740: Direct agent command throttling improvements
List of changes:
1. Created a separate thread pool for handling cron and ping tasks. The size of the pool is based on direct.agent.pool.size. The existing direct agent pool will run all commands other than cron and ping.
2. For normal tasks (generated as part of user/admin API calls), if the throttle limit is reached, tasks are queued and executed once threads become available.
3. For cron and ping tasks (generated internally by the management server (MS), e.g. ping, VM sync, etc.), if the throttle limit is reached, these tasks are rejected. Since they are generated internally, they can be rejected without any issues.


Project: http://git-wip-us.apache.org/repos/asf/cloudstack/repo
Commit: http://git-wip-us.apache.org/repos/asf/cloudstack/commit/120da605
Tree: http://git-wip-us.apache.org/repos/asf/cloudstack/tree/120da605
Diff: http://git-wip-us.apache.org/repos/asf/cloudstack/diff/120da605

Branch: refs/heads/4.4-forward
Commit: 120da605b06e6cd5dc04c6d1eab9e10005bd6807
Parents: 0d243ec
Author: Koushik Das <ko...@apache.org>
Authored: Thu May 22 14:07:13 2014 +0530
Committer: Koushik Das <ko...@apache.org>
Committed: Thu May 22 14:07:13 2014 +0530

----------------------------------------------------------------------
 .../com/cloud/agent/manager/AgentAttache.java   |   3 -
 .../cloud/agent/manager/AgentManagerImpl.java   |   8 ++
 .../cloud/agent/manager/DirectAgentAttache.java | 107 ++++++++++++++++---
 3 files changed, 100 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cloudstack/blob/120da605/engine/orchestration/src/com/cloud/agent/manager/AgentAttache.java
----------------------------------------------------------------------
diff --git a/engine/orchestration/src/com/cloud/agent/manager/AgentAttache.java b/engine/orchestration/src/com/cloud/agent/manager/AgentAttache.java
index fd1531e..24a6fe7 100755
--- a/engine/orchestration/src/com/cloud/agent/manager/AgentAttache.java
+++ b/engine/orchestration/src/com/cloud/agent/manager/AgentAttache.java
@@ -30,7 +30,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.cloudstack.managed.context.ManagedContextRunnable;
 import org.apache.log4j.Logger;
@@ -107,7 +106,6 @@ public abstract class AgentAttache {
     protected Status _status = Status.Connecting;
     protected boolean _maintenance;
     protected long _nextSequence;
-    protected AtomicInteger _outstandingTaskCount;
 
     protected AgentManagerImpl _agentMgr;
 
@@ -130,7 +128,6 @@ public abstract class AgentAttache {
         _requests = new LinkedList<Request>();
         _agentMgr = agentMgr;
         _nextSequence = new Long(s_rand.nextInt(Short.MAX_VALUE)) << 48;
-        _outstandingTaskCount = new AtomicInteger(0);
     }
 
     public synchronized long getNextSequence() {

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/120da605/engine/orchestration/src/com/cloud/agent/manager/AgentManagerImpl.java
----------------------------------------------------------------------
diff --git a/engine/orchestration/src/com/cloud/agent/manager/AgentManagerImpl.java b/engine/orchestration/src/com/cloud/agent/manager/AgentManagerImpl.java
index f43086f..2d0be24 100755
--- a/engine/orchestration/src/com/cloud/agent/manager/AgentManagerImpl.java
+++ b/engine/orchestration/src/com/cloud/agent/manager/AgentManagerImpl.java
@@ -162,6 +162,7 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl
     protected ExecutorService _executor;
     protected ThreadPoolExecutor _connectExecutor;
     protected ScheduledExecutorService _directAgentExecutor;
+    protected ScheduledExecutorService _cronJobExecutor;
     protected ScheduledExecutorService _monitorExecutor;
 
     private int _directAgentThreadCap;
@@ -219,7 +220,10 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl
         _connection = new NioServer("AgentManager", Port.value(), Workers.value() + 10, this);
         s_logger.info("Listening on " + Port.value() + " with " + Workers.value() + " workers");
 
+        // executes all agent commands other than cron and ping
         _directAgentExecutor = new ScheduledThreadPoolExecutor(DirectAgentPoolSize.value(), new NamedThreadFactory("DirectAgent"));
+        // executes cron and ping agent commands
+        _cronJobExecutor = new ScheduledThreadPoolExecutor(DirectAgentPoolSize.value(), new NamedThreadFactory("DirectAgentCronJob"));
         s_logger.debug("Created DirectAgentAttache pool with size: " + DirectAgentPoolSize.value());
         _directAgentThreadCap = Math.round(DirectAgentPoolSize.value() * DirectAgentThreadCap.value()) + 1; // add 1 to always make the value > 0
 
@@ -1451,6 +1455,10 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl
         return _directAgentExecutor;
     }
 
+    public ScheduledExecutorService getCronJobPool() {
+        return _cronJobExecutor;
+    }
+
     public int getDirectAgentThreadCap() {
         return _directAgentThreadCap;
     }

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/120da605/engine/orchestration/src/com/cloud/agent/manager/DirectAgentAttache.java
----------------------------------------------------------------------
diff --git a/engine/orchestration/src/com/cloud/agent/manager/DirectAgentAttache.java b/engine/orchestration/src/com/cloud/agent/manager/DirectAgentAttache.java
index 9874ee4..7ca6929 100755
--- a/engine/orchestration/src/com/cloud/agent/manager/DirectAgentAttache.java
+++ b/engine/orchestration/src/com/cloud/agent/manager/DirectAgentAttache.java
@@ -17,12 +17,13 @@
 package com.cloud.agent.manager;
 
 import java.util.ArrayList;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.log4j.Logger;
-
 import org.apache.cloudstack.managed.context.ManagedContextRunnable;
 
 import com.cloud.agent.api.Answer;
@@ -43,11 +44,16 @@ public class DirectAgentAttache extends AgentAttache {
     List<ScheduledFuture<?>> _futures = new ArrayList<ScheduledFuture<?>>();
     AgentManagerImpl _mgr;
     long _seq = 0;
+    LinkedList<Task> tasks = new LinkedList<Task>();
+    AtomicInteger _outstandingTaskCount;
+    AtomicInteger _outstandingCronTaskCount;
 
     public DirectAgentAttache(AgentManagerImpl agentMgr, long id, String name, ServerResource resource, boolean maintenance, AgentManagerImpl mgr) {
         super(agentMgr, id, name, maintenance);
         _resource = resource;
         _mgr = mgr;
+        _outstandingTaskCount = new AtomicInteger(0);
+        _outstandingCronTaskCount = new AtomicInteger(0);
     }
 
     @Override
@@ -90,15 +96,16 @@ public class DirectAgentAttache extends AgentAttache {
             if (answers != null && answers[0] instanceof StartupAnswer) {
                 StartupAnswer startup = (StartupAnswer)answers[0];
                 int interval = startup.getPingInterval();
-                _futures.add(_agentMgr.getDirectAgentPool().scheduleAtFixedRate(new PingTask(), interval, interval, TimeUnit.SECONDS));
+                _futures.add(_agentMgr.getCronJobPool().scheduleAtFixedRate(new PingTask(), interval, interval, TimeUnit.SECONDS));
             }
         } else {
             Command[] cmds = req.getCommands();
             if (cmds.length > 0 && !(cmds[0] instanceof CronCommand)) {
-                _agentMgr.getDirectAgentPool().execute(new Task(req));
+                queueTask(new Task(req));
+                scheduleFromQueue();
             } else {
                 CronCommand cmd = (CronCommand)cmds[0];
-                _futures.add(_agentMgr.getDirectAgentPool().scheduleAtFixedRate(new Task(req), cmd.getInterval(), cmd.getInterval(), TimeUnit.SECONDS));
+                _futures.add(_agentMgr.getCronJobPool().scheduleAtFixedRate(new CronTask(req), cmd.getInterval(), cmd.getInterval(), TimeUnit.SECONDS));
             }
         }
     }
@@ -109,7 +116,7 @@ public class DirectAgentAttache extends AgentAttache {
             StartupAnswer startup = (StartupAnswer)answers[0];
             int interval = startup.getPingInterval();
             s_logger.info("StartupAnswer received " + startup.getHostId() + " Interval = " + interval);
-            _futures.add(_agentMgr.getDirectAgentPool().scheduleAtFixedRate(new PingTask(), interval, interval, TimeUnit.SECONDS));
+            _futures.add(_agentMgr.getCronJobPool().scheduleAtFixedRate(new PingTask(), interval, interval, TimeUnit.SECONDS));
         }
     }
 
@@ -128,13 +135,26 @@ public class DirectAgentAttache extends AgentAttache {
         }
     }
 
+    private synchronized void queueTask(Task task) {
+        tasks.add(task);
+    }
+
+    private synchronized void scheduleFromQueue() {
+        if (s_logger.isTraceEnabled()) {
+            s_logger.trace("Agent attache=" + _id + ", task queue size=" + tasks.size() + ", outstanding tasks=" + _outstandingTaskCount.get());
+        }
+        while (!tasks.isEmpty() && _outstandingTaskCount.get() < _agentMgr.getDirectAgentThreadCap()) {
+            _outstandingTaskCount.incrementAndGet();
+            _agentMgr.getDirectAgentPool().execute(tasks.remove());
+        }
+    }
+
     protected class PingTask extends ManagedContextRunnable {
         @Override
         protected synchronized void runInContext() {
             try {
-                if (_outstandingTaskCount.incrementAndGet() > _agentMgr.getDirectAgentThreadCap()) {
-                    s_logger.warn("Task execution for direct attache(" + _id + ") has reached maximum outstanding limit(" + _agentMgr.getDirectAgentThreadCap() +
-                        "), bailing out");
+                if (_outstandingCronTaskCount.incrementAndGet() >= _agentMgr.getDirectAgentThreadCap()) {
+                    s_logger.warn("PingTask execution for direct attache(" + _id + ") has reached maximum outstanding limit(" + _agentMgr.getDirectAgentThreadCap() + "), bailing out");
                     return;
                 }
 
@@ -162,15 +182,15 @@ public class DirectAgentAttache extends AgentAttache {
             } catch (Exception e) {
                 s_logger.warn("Unable to complete the ping task", e);
             } finally {
-                _outstandingTaskCount.decrementAndGet();
+                _outstandingCronTaskCount.decrementAndGet();
             }
         }
     }
 
-    protected class Task extends ManagedContextRunnable {
+    protected class CronTask extends ManagedContextRunnable {
         Request _req;
 
-        public Task(Request req) {
+        public CronTask(Request req) {
             _req = req;
         }
 
@@ -194,9 +214,8 @@ public class DirectAgentAttache extends AgentAttache {
         protected void runInContext() {
             long seq = _req.getSequence();
             try {
-                if (_outstandingTaskCount.incrementAndGet() > _agentMgr.getDirectAgentThreadCap()) {
-                    s_logger.warn("Task execution for direct attache(" + _id + ") has reached maximum outstanding limit(" + _agentMgr.getDirectAgentThreadCap() +
-                        "), bailing out");
+                if (_outstandingCronTaskCount.incrementAndGet() >= _agentMgr.getDirectAgentThreadCap()) {
+                    s_logger.warn("CronTask execution for direct attache(" + _id + ") has reached maximum outstanding limit(" + _agentMgr.getDirectAgentThreadCap() + "), bailing out");
                     bailout();
                     return;
                 }
@@ -243,9 +262,67 @@ public class DirectAgentAttache extends AgentAttache {
             } catch (Exception e) {
                 s_logger.warn(log(seq, "Exception caught "), e);
             } finally {
-                _outstandingTaskCount.decrementAndGet();
+                _outstandingCronTaskCount.decrementAndGet();
             }
         }
     }
 
+    protected class Task extends ManagedContextRunnable {
+        Request _req;
+
+        public Task(Request req) {
+            _req = req;
+        }
+
+        @Override
+        protected void runInContext() {
+            long seq = _req.getSequence();
+            try {
+                ServerResource resource = _resource;
+                Command[] cmds = _req.getCommands();
+                boolean stopOnError = _req.stopOnError();
+
+                if (s_logger.isDebugEnabled()) {
+                    s_logger.debug(log(seq, "Executing request"));
+                }
+                ArrayList<Answer> answers = new ArrayList<Answer>(cmds.length);
+                for (int i = 0; i < cmds.length; i++) {
+                    Answer answer = null;
+                    try {
+                        if (resource != null) {
+                            answer = resource.executeRequest(cmds[i]);
+                            if (answer == null) {
+                                s_logger.warn("Resource returned null answer!");
+                                answer = new Answer(cmds[i], false, "Resource returned null answer");
+                            }
+                        } else {
+                            answer = new Answer(cmds[i], false, "Agent is disconnected");
+                        }
+                    } catch (Exception e) {
+                        s_logger.warn(log(seq, "Exception Caught while executing command"), e);
+                        answer = new Answer(cmds[i], false, e.toString());
+                    }
+                    answers.add(answer);
+                    if (!answer.getResult() && stopOnError) {
+                        if (i < cmds.length - 1 && s_logger.isDebugEnabled()) {
+                            s_logger.debug(log(seq, "Cancelling because one of the answers is false and it is stop on error."));
+                        }
+                        break;
+                    }
+                }
+
+                Response resp = new Response(_req, answers.toArray(new Answer[answers.size()]));
+                if (s_logger.isDebugEnabled()) {
+                    s_logger.debug(log(seq, "Response Received: "));
+                }
+
+                processAnswers(seq, resp);
+            } catch (Exception e) {
+                s_logger.warn(log(seq, "Exception caught "), e);
+            } finally {
+                _outstandingTaskCount.decrementAndGet();
+                scheduleFromQueue();
+            }
+        }
+    }
 }