You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by dm...@apache.org on 2014/01/15 16:12:51 UTC

git commit: AMBARI-4290. Run Various Requests in Parallel so that one does not wait on the other. (dlysnichenko)

Updated Branches:
  refs/heads/trunk 1763922ec -> af56baa0b


AMBARI-4290. Run Various Requests in Parallel so that one does not wait on the other. (dlysnichenko)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/af56baa0
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/af56baa0
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/af56baa0

Branch: refs/heads/trunk
Commit: af56baa0b497869398726ae6b6c78b8707072d95
Parents: 1763922
Author: Lisnichenko Dmitro <dl...@hortonworks.com>
Authored: Wed Jan 15 17:07:32 2014 +0200
Committer: Lisnichenko Dmitro <dl...@hortonworks.com>
Committed: Wed Jan 15 17:07:32 2014 +0200

----------------------------------------------------------------------
 ambari-server/conf/unix/ambari.properties       |   1 +
 .../server/actionmanager/ActionManager.java     |   5 +-
 .../server/actionmanager/ActionScheduler.java   | 123 ++++++++----
 .../server/configuration/Configuration.java     |  17 ++
 .../actionmanager/TestActionDBAccessorImpl.java |   3 +-
 .../server/actionmanager/TestActionManager.java |   9 +-
 .../actionmanager/TestActionScheduler.java      | 195 +++++++++++++++++--
 .../server/agent/TestHeartbeatHandler.java      |   4 +-
 8 files changed, 302 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/af56baa0/ambari-server/conf/unix/ambari.properties
----------------------------------------------------------------------
diff --git a/ambari-server/conf/unix/ambari.properties b/ambari-server/conf/unix/ambari.properties
index ebb5d9c..563c04f 100644
--- a/ambari-server/conf/unix/ambari.properties
+++ b/ambari-server/conf/unix/ambari.properties
@@ -31,6 +31,7 @@ bootstrap.setup_agent.script=/usr/lib/python2.6/site-packages/ambari_server/setu
 api.authenticate=true
 server.connection.max.idle.millis=900000
 server.fqdn.service.url=http://169.254.169.254/latest/meta-data/public-hostname
+server.stages.parallel=true
 
 # Scheduler settings
 server.execution.scheduler.isClustered=false

http://git-wip-us.apache.org/repos/asf/ambari/blob/af56baa0/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java
index af25149..8e6fb13 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java
@@ -24,6 +24,7 @@ import com.google.inject.persist.UnitOfWork;
 import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.agent.ActionQueue;
 import org.apache.ambari.server.agent.CommandReport;
+import org.apache.ambari.server.configuration.Configuration;
 import org.apache.ambari.server.controller.ExecuteActionRequest;
 import org.apache.ambari.server.controller.HostsMap;
 import org.apache.ambari.server.serveraction.ServerActionManager;
@@ -58,11 +59,11 @@ public class ActionManager {
                        @Named("actionTimeout") long actionTimeout,
                        ActionQueue aq, Clusters fsm, ActionDBAccessor db, HostsMap hostsMap,
                        ServerActionManager serverActionManager, UnitOfWork unitOfWork, CustomActionDBAccessor cdb,
-                       RequestFactory requestFactory) {
+                       RequestFactory requestFactory, Configuration configuration) {
     this.actionQueue = aq;
     this.db = db;
     scheduler = new ActionScheduler(schedulerSleepTime, actionTimeout, db,
-        actionQueue, fsm, 2, hostsMap, serverActionManager, unitOfWork);
+        actionQueue, fsm, 2, hostsMap, serverActionManager, unitOfWork, configuration);
     requestCounter = new AtomicLong(
         db.getLastPersistedRequestIdWhenInitialized());
     this.cdb = cdb;

http://git-wip-us.apache.org/repos/asf/ambari/blob/af56baa0/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
index 02c94b8..47f56eb 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
@@ -20,6 +20,7 @@ package org.apache.ambari.server.actionmanager;
 import java.lang.reflect.Type;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -33,6 +34,7 @@ import org.apache.ambari.server.ServiceComponentNotFoundException;
 import org.apache.ambari.server.agent.ActionQueue;
 import org.apache.ambari.server.agent.CommandReport;
 import org.apache.ambari.server.agent.ExecutionCommand;
+import org.apache.ambari.server.configuration.Configuration;
 import org.apache.ambari.server.controller.HostsMap;
 import org.apache.ambari.server.serveraction.ServerAction;
 import org.apache.ambari.server.serveraction.ServerActionManager;
@@ -75,6 +77,7 @@ class ActionScheduler implements Runnable {
   private final HostsMap hostsMap;
   private final Object wakeupSyncObject = new Object();
   private final ServerActionManager serverActionManager;
+  private final Configuration configuration;
 
   /**
    * true if scheduler should run ASAP.
@@ -87,7 +90,8 @@ class ActionScheduler implements Runnable {
 
   public ActionScheduler(long sleepTimeMilliSec, long actionTimeoutMilliSec,
       ActionDBAccessor db, ActionQueue actionQueue, Clusters fsmObject,
-      int maxAttempts, HostsMap hostsMap, ServerActionManager serverActionManager, UnitOfWork unitOfWork) {
+      int maxAttempts, HostsMap hostsMap, ServerActionManager serverActionManager,
+      UnitOfWork unitOfWork, Configuration configuration) {
     this.sleepTime = sleepTimeMilliSec;
     this.hostsMap = hostsMap;
     this.actionTimeout = actionTimeoutMilliSec;
@@ -100,6 +104,7 @@ class ActionScheduler implements Runnable {
     this.clusterHostInfoCache = CacheBuilder.newBuilder().
         expireAfterAccess(5, TimeUnit.MINUTES).
         build();
+    this.configuration = configuration;
   }
 
   public void start() {
@@ -149,7 +154,8 @@ class ActionScheduler implements Runnable {
   public void doWork() throws AmbariException {
     try {
       unitOfWork.begin();
-
+      Set<String> runningRequestIds = new HashSet<String>();
+      Set<String> affectedHosts = new HashSet<String>();
       List<Stage> stages = db.getStagesInProgress();
       if (LOG.isDebugEnabled()) {
         LOG.debug("Scheduler wakes up");
@@ -163,9 +169,37 @@ class ActionScheduler implements Runnable {
       }
 
       for (Stage s : stages) {
+        // Check if we can process this stage in parallel with another stages
+
+        long requestId = s.getRequestId();
+        // Convert to string to avoid glitches with boxing/unboxing
+        String requestIdStr = String.valueOf(requestId);
+        if (runningRequestIds.contains(requestIdStr)) {
+          // We don't want to process different stages from the same request in parallel
+          continue;
+        } else {
+          runningRequestIds.add(requestIdStr);
+        }
+
+
+        List<String> stageHosts = s.getHosts();
+        boolean conflict = false;
+        for (String host : stageHosts) {
+          if (affectedHosts.contains(host)) {
+            conflict = true;
+            break;
+          }
+        }
+        if (conflict) {
+          // Also we don't want to perform stages in parallel at the same hosts
+          continue;
+        } else {
+          affectedHosts.addAll(stageHosts);
+        }
+
         List<ExecutionCommand> commandsToSchedule = new ArrayList<ExecutionCommand>();
         Map<String, RoleStats> roleStats = processInProgressStage(s, commandsToSchedule);
-        //Check if stage is failed
+        // Check if stage is failed
         boolean failed = false;
         for (String role : roleStats.keySet()) {
           RoleStats stats = roleStats.get(role);
@@ -193,23 +227,14 @@ class ActionScheduler implements Runnable {
         //Schedule what we have so far
         for (ExecutionCommand cmd : commandsToSchedule) {
           if (Role.valueOf(cmd.getRole()).equals(Role.AMBARI_SERVER_ACTION)) {
-            try {
-              long now = System.currentTimeMillis();
-              String hostName = cmd.getHostname();
-              String roleName = cmd.getRole();
-
-              s.setStartTime(hostName, roleName, now);
-              s.setLastAttemptTime(hostName, roleName, now);
-              s.incrementAttemptCount(hostName, roleName);
-              s.setHostRoleStatus(hostName, roleName, HostRoleStatus.QUEUED);
-              db.hostRoleScheduled(s, hostName, roleName);
-              String actionName = cmd.getRoleParams().get(ServerAction.ACTION_NAME);
-              this.serverActionManager.executeAction(actionName, cmd.getCommandParams());
-              reportServerActionSuccess(s, cmd);
-            } catch (AmbariException e) {
-              LOG.warn("Could not execute server action " + cmd.toString(), e);
-              reportServerActionFailure(s, cmd, e.getMessage());
-            }
+            /**
+             * We don't forbid executing any stages in parallel with
+             * AMBARI_SERVER_ACTION. That  should be OK as AMBARI_SERVER_ACTION
+             * is not used as of now. The general motivation has been to update
+             * Request status when last task associated with the
+             * Request is finished.
+             */
+            executeServerAction(s, cmd);
           } else {
             try {
               scheduleHostRole(s, cmd);
@@ -220,16 +245,7 @@ class ActionScheduler implements Runnable {
           }
         }
 
-        //Check if ready to go to next stage
-        boolean goToNextStage = true;
-        for (String role : roleStats.keySet()) {
-          RoleStats stats = roleStats.get(role);
-          if (!stats.isSuccessFactorMet()) {
-            goToNextStage = false;
-            break;
-          }
-        }
-        if (!goToNextStage) {
+        if (! configuration.getParallelStageExecution()) { // If disabled
           return;
         }
       }
@@ -239,10 +255,36 @@ class ActionScheduler implements Runnable {
     }
   }
 
+
+  /**
+   * Executes internal ambari-server action
+   */
+  private void executeServerAction(Stage s, ExecutionCommand cmd) {
+    try {
+      long now = System.currentTimeMillis();
+      String hostName = cmd.getHostname();
+      String roleName = cmd.getRole();
+
+      s.setStartTime(hostName, roleName, now);
+      s.setLastAttemptTime(hostName, roleName, now);
+      s.incrementAttemptCount(hostName, roleName);
+      s.setHostRoleStatus(hostName, roleName, HostRoleStatus.QUEUED);
+      db.hostRoleScheduled(s, hostName, roleName);
+      String actionName = cmd.getRoleParams().get(ServerAction.ACTION_NAME);
+      this.serverActionManager.executeAction(actionName, cmd.getCommandParams());
+      reportServerActionSuccess(s, cmd);
+
+    } catch (AmbariException e) {
+      LOG.warn("Could not execute server action " + cmd.toString(), e);
+      reportServerActionFailure(s, cmd, e.getMessage());
+    }
+  }
+
   private boolean hasPreviousStageFailed(Stage stage) {
     boolean failed = false;
     long prevStageId = stage.getStageId() - 1;
     if (prevStageId > 0) {
+      // Find previous stage instance
       List<Stage> allStages = db.getAllStages(stage.getRequestId());
       Stage prevStage = null;
       for (Stage s : allStages) {
@@ -293,7 +335,7 @@ class ActionScheduler implements Runnable {
     report.setStdOut("Server action succeeded");
     report.setStdErr("");
     db.updateHostRoleState(cmd.getHostname(), stage.getRequestId(), stage.getStageId(),
-                           cmd.getRole().toString(), report);
+            cmd.getRole(), report);
   }
 
   private void reportServerActionFailure(Stage stage, ExecutionCommand cmd, String message) {
@@ -303,13 +345,15 @@ class ActionScheduler implements Runnable {
     report.setStdOut("Server action failed");
     report.setStdErr(message);
     db.updateHostRoleState(cmd.getHostname(), stage.getRequestId(), stage.getStageId(),
-                           cmd.getRole().toString(), report);
+            cmd.getRole(), report);
   }
 
   /**
-   * @param commandsToSchedule
    * @return Stats for the roles in the stage. It is used to determine whether stage
    * has succeeded or failed.
+   * Side effects:
+   * This method processes command timeouts and retry attempts, and
+   * adds new (pending) execution commands to commandsToSchedule list.
    */
   private Map<String, RoleStats> processInProgressStage(Stage s,
       List<ExecutionCommand> commandsToSchedule) throws AmbariException {
@@ -328,6 +372,8 @@ class ActionScheduler implements Runnable {
         ExecutionCommand c = wrapper.getExecutionCommand();
         String roleStr = c.getRole();
         HostRoleStatus status = s.getHostRoleStatus(host, roleStr);
+
+        // Process command timeouts
         if (timeOutActionNeeded(status, s, hostObj, roleStr, now,
           taskTimeout)) {
           LOG.info("Host:" + host + ", role:" + roleStr + ", actionId:"
@@ -361,6 +407,7 @@ class ActionScheduler implements Runnable {
             // Dequeue command
             actionQueue.dequeue(host, c.getCommandId());
           } else {
+            // reschedule command
             commandsToSchedule.add(c);
           }
         } else if (status.equals(HostRoleStatus.PENDING)) {
@@ -373,8 +420,14 @@ class ActionScheduler implements Runnable {
     return roleStats;
   }
 
+
+  /**
+   * Populates a map < role_name, role_stats>.
+   */
   private Map<String, RoleStats> initRoleStats(Stage s) {
+    // Meaning: how many hosts are affected by commands for each role
     Map<Role, Integer> hostCountsForRoles = new HashMap<Role, Integer>();
+    // < role_name, rolestats >
     Map<String, RoleStats> roleStats = new TreeMap<String, RoleStats>();
 
     for (String host : s.getHostRoleCommands().keySet()) {
@@ -419,8 +472,10 @@ class ActionScheduler implements Runnable {
   private void scheduleHostRole(Stage s, ExecutionCommand cmd)
       throws InvalidStateTransitionException, AmbariException {
     long now = System.currentTimeMillis();
-    String roleStr = cmd.getRole().toString();
+    String roleStr = cmd.getRole();
     String hostname = cmd.getHostname();
+
+    // start time is -1 if host role command is not started yet
     if (s.getStartTime(hostname, roleStr) < 0) {
       if (RoleCommand.ACTIONEXECUTE != cmd.getRoleCommand()) {
         try {

http://git-wip-us.apache.org/repos/asf/ambari/blob/af56baa0/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java b/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
index 6b2301d..6af54c0 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
@@ -279,6 +279,16 @@ public class Configuration {
   public static final String DEFAULT_EXECUTION_SCHEDULER_MISFIRE_TOLERATION = "480";
   public static final String DEFAULT_SCHEDULER_START_DELAY_SECONDS = "120";
 
+
+  /**
+   * This key defines whether stages of parallel requests are executed in
+   * parallel or sequentally. Only stages from different requests
+   * running on not interfering host sets may be executed in parallel.
+   */
+  public static final String PARALLEL_STAGE_EXECUTION_KEY =
+          "server.stages.parallel";
+  private static final String PARALLEL_STAGE_EXECUTION_DEFAULT = "true";
+
   private static final Logger LOG = LoggerFactory.getLogger(
       Configuration.class);
 
@@ -345,6 +355,8 @@ public class Configuration {
       CLIENT_API_SSL_CRT_NAME_KEY, CLIENT_API_SSL_CRT_NAME_DEFAULT));
     configsMap.put(JAVA_HOME_KEY, properties.getProperty(
         JAVA_HOME_KEY));
+    configsMap.put(PARALLEL_STAGE_EXECUTION_KEY, properties.getProperty(
+            PARALLEL_STAGE_EXECUTION_KEY, PARALLEL_STAGE_EXECUTION_DEFAULT));
 
     File passFile = new File(configsMap.get(SRVR_KSTR_DIR_KEY) + File.separator
         + configsMap.get(SRVR_CRT_PASS_FILE_KEY));
@@ -909,4 +921,9 @@ public class Configuration {
       DEFAULT_SCHEDULER_START_DELAY_SECONDS);
     return Integer.parseInt(delay);
   }
+
+  public boolean getParallelStageExecution() {
+    return "true".equalsIgnoreCase(configsMap.get(PARALLEL_STAGE_EXECUTION_KEY));
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/af56baa0/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionDBAccessorImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionDBAccessorImpl.java b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionDBAccessorImpl.java
index 6f8c884..c09c8bb 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionDBAccessorImpl.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionDBAccessorImpl.java
@@ -83,7 +83,8 @@ public class TestActionDBAccessorImpl {
     cdb = injector.getInstance(CustomActionDBAccessor.class);
 
     am = new ActionManager(5000, 1200000, new ActionQueue(), clusters, db,
-        new HostsMap((String) null), null, injector.getInstance(UnitOfWork.class), cdb, injector.getInstance(RequestFactory.class));
+        new HostsMap((String) null), null, injector.getInstance(UnitOfWork.class), cdb, 
+		injector.getInstance(RequestFactory.class), null);
   }
 
   @After

http://git-wip-us.apache.org/repos/asf/ambari/blob/af56baa0/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionManager.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionManager.java b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionManager.java
index a266cc6..648e935 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionManager.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionManager.java
@@ -84,7 +84,8 @@ public class TestActionManager {
   public void testActionResponse() {
     ActionDBAccessor db = injector.getInstance(ActionDBAccessorImpl.class);
     ActionManager am = new ActionManager(5000, 1200000, new ActionQueue(),
-        clusters, db, new HostsMap((String) null), null, unitOfWork, null, injector.getInstance(RequestFactory.class));
+        clusters, db, new HostsMap((String) null), null, unitOfWork, null, 
+        injector.getInstance(RequestFactory.class), null);
     populateActionDB(db, hostname);
     Stage stage = db.getAllStages(requestId).get(0);
     Assert.assertEquals(stageId, stage.getStageId());
@@ -124,7 +125,8 @@ public class TestActionManager {
   public void testLargeLogs() {
     ActionDBAccessor db = injector.getInstance(ActionDBAccessorImpl.class);
     ActionManager am = new ActionManager(5000, 1200000, new ActionQueue(),
-        clusters, db, new HostsMap((String) null), null, unitOfWork, null, injector.getInstance(RequestFactory.class));
+        clusters, db, new HostsMap((String) null), null, unitOfWork, null,
+        injector.getInstance(RequestFactory.class), null);
     populateActionDB(db, hostname);
     Stage stage = db.getAllStages(requestId).get(0);
     Assert.assertEquals(stageId, stage.getStageId());
@@ -213,7 +215,8 @@ public class TestActionManager {
 
     replay(queue, db, clusters);
 
-    ActionManager manager = new ActionManager(0, 0, queue, clusters, db, null, null, unitOfWork, null, injector.getInstance(RequestFactory.class));
+    ActionManager manager = new ActionManager(0, 0, queue, clusters, db, null, null, unitOfWork, null,
+        injector.getInstance(RequestFactory.class), null);
     assertSame(listStages, manager.getActions(requestId));
 
     verify(queue, db, clusters);

http://git-wip-us.apache.org/repos/asf/ambari/blob/af56baa0/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
index bce150a..5925fbb 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
@@ -33,6 +33,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.Set;
 
 import com.google.common.reflect.TypeToken;
@@ -47,6 +48,7 @@ import org.apache.ambari.server.agent.ActionQueue;
 import org.apache.ambari.server.agent.AgentCommand;
 import org.apache.ambari.server.agent.CommandReport;
 import org.apache.ambari.server.agent.ExecutionCommand;
+import org.apache.ambari.server.configuration.Configuration;
 import org.apache.ambari.server.controller.HostsMap;
 import org.apache.ambari.server.serveraction.ServerAction;
 import org.apache.ambari.server.serveraction.ServerActionManagerImpl;
@@ -88,6 +90,8 @@ public class TestActionScheduler {
     Map<String, List<String>> clusterHostInfo = StageUtils.getGson().fromJson(CLUSTER_HOST_INFO, type);
     
     ActionQueue aq = new ActionQueue();
+    Properties properties = new Properties();
+    Configuration conf = new Configuration(properties);
     Clusters fsm = mock(Clusters.class);
     Cluster oneClusterMock = mock(Cluster.class);
     Service serviceObj = mock(Service.class);
@@ -114,7 +118,7 @@ public class TestActionScheduler {
     //Keep large number of attempts so that the task is not expired finally
     //Small action timeout to test rescheduling
     ActionScheduler scheduler = new ActionScheduler(100, 100, db, aq, fsm,
-        10000, new HostsMap((String) null), null, unitOfWork);
+        10000, new HostsMap((String) null), null, unitOfWork, conf);
     scheduler.setTaskTimeoutAdjustment(false);
     // Start the thread
     scheduler.start();
@@ -161,6 +165,8 @@ public class TestActionScheduler {
   @Test
   public void testActionTimeout() throws Exception {
     ActionQueue aq = new ActionQueue();
+    Properties properties = new Properties();
+    Configuration conf = new Configuration(properties);
     Clusters fsm = mock(Clusters.class);
     Cluster oneClusterMock = mock(Cluster.class);
     Service serviceObj = mock(Service.class);
@@ -198,7 +204,7 @@ public class TestActionScheduler {
 
     //Small action timeout to test rescheduling
     ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3, 
-        new HostsMap((String) null), null, unitOfWork);
+        new HostsMap((String) null), null, unitOfWork, conf);
     scheduler.setTaskTimeoutAdjustment(false);
     // Start the thread
     scheduler.start();
@@ -216,6 +222,8 @@ public class TestActionScheduler {
   @Test
   public void testActionTimeoutForLostHost() throws Exception {
     ActionQueue aq = new ActionQueue();
+    Properties properties = new Properties();
+    Configuration conf = new Configuration(properties);
     Clusters fsm = mock(Clusters.class);
     Cluster oneClusterMock = mock(Cluster.class);
     Service serviceObj = mock(Service.class);
@@ -253,7 +261,7 @@ public class TestActionScheduler {
 
     //Small action timeout to test rescheduling
     ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
-      new HostsMap((String) null), null, unitOfWork);
+      new HostsMap((String) null), null, unitOfWork, conf);
     scheduler.setTaskTimeoutAdjustment(false);
     // Start the thread
     scheduler.start();
@@ -274,6 +282,8 @@ public class TestActionScheduler {
   @Test
   public void testServerAction() throws Exception {
     ActionQueue aq = new ActionQueue();
+    Properties properties = new Properties();
+    Configuration conf = new Configuration(properties);
     Clusters fsm = mock(Clusters.class);
     Cluster oneClusterMock = mock(Cluster.class);
     Service serviceObj = mock(Service.class);
@@ -310,7 +320,8 @@ public class TestActionScheduler {
 
 
     ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
-        new HostsMap((String) null), new ServerActionManagerImpl(fsm), unitOfWork);
+        new HostsMap((String) null), new ServerActionManagerImpl(fsm),
+        unitOfWork, conf);
     scheduler.start();
 
     while (!stages.get(0).getHostRoleStatus(hostname, "AMBARI_SERVER_ACTION")
@@ -328,6 +339,8 @@ public class TestActionScheduler {
   @Test
   public void testServerActionFailed() throws Exception {
     ActionQueue aq = new ActionQueue();
+    Properties properties = new Properties();
+    Configuration conf = new Configuration(properties);
     Clusters fsm = mock(Clusters.class);
     Cluster oneClusterMock = mock(Cluster.class);
     Service serviceObj = mock(Service.class);
@@ -363,7 +376,7 @@ public class TestActionScheduler {
 
 
     ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
-        new HostsMap((String) null), new ServerActionManagerImpl(fsm), unitOfWork);
+        new HostsMap((String) null), new ServerActionManagerImpl(fsm), unitOfWork, conf);
     scheduler.start();
 
     while (!stages.get(0).getHostRoleStatus(hostname, "AMBARI_SERVER_ACTION")
@@ -392,6 +405,152 @@ public class TestActionScheduler {
     return stage;
   }
 
+
+  /**
+   * Verifies that stages that are executed on different hosts and
+   * rely to different requests are scheduled to be  executed in parallel
+   */
+  @Test
+  public void testIndependentStagesExecution() throws Exception {
+    ActionQueue aq = new ActionQueue();
+    Clusters fsm = mock(Clusters.class);
+    Cluster oneClusterMock = mock(Cluster.class);
+    Service serviceObj = mock(Service.class);
+    ServiceComponent scomp = mock(ServiceComponent.class);
+    ServiceComponentHost sch = mock(ServiceComponentHost.class);
+    UnitOfWork unitOfWork = mock(UnitOfWork.class);
+    RequestFactory requestFactory = mock(RequestFactory.class);
+    when(fsm.getCluster(anyString())).thenReturn(oneClusterMock);
+    when(oneClusterMock.getService(anyString())).thenReturn(serviceObj);
+    when(serviceObj.getServiceComponent(anyString())).thenReturn(scomp);
+    when(scomp.getServiceComponentHost(anyString())).thenReturn(sch);
+    when(serviceObj.getCluster()).thenReturn(oneClusterMock);
+
+    String hostname1 = "ahost.ambari.apache.org";
+    String hostname2 = "bhost.ambari.apache.org";
+    String hostname3 = "chost.ambari.apache.org";
+    String hostname4 = "chost.ambari.apache.org";
+    List<Stage> stages = new ArrayList<Stage>();
+    stages.add(
+            getStageWithSingleTask(
+                    hostname1, "cluster1", Role.DATANODE,
+                    RoleCommand.START, Service.Type.HDFS, 1, 1, 1));
+    stages.add( // Stage with the same hostname, should not be scheduled
+            getStageWithSingleTask(
+                    hostname1, "cluster1", Role.GANGLIA_MONITOR,
+                    RoleCommand.START, Service.Type.GANGLIA, 2, 2, 2));
+
+    stages.add(
+            getStageWithSingleTask(
+                    hostname2, "cluster1", Role.DATANODE,
+                    RoleCommand.START, Service.Type.HDFS, 3, 3, 3));
+
+    stages.add(
+            getStageWithSingleTask(
+                    hostname3, "cluster1", Role.DATANODE,
+                    RoleCommand.START, Service.Type.HDFS, 4, 4, 4));
+
+    stages.add( // Stage with the same request id, should not be scheduled
+            getStageWithSingleTask(
+                    hostname4, "cluster1", Role.GANGLIA_MONITOR,
+                    RoleCommand.START, Service.Type.GANGLIA, 5, 5, 4));
+
+    ActionDBAccessor db = mock(ActionDBAccessor.class);
+    when(db.getStagesInProgress()).thenReturn(stages);
+
+    Properties properties = new Properties();
+    Configuration conf = new Configuration(properties);
+    ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
+            new HostsMap((String) null), new ServerActionManagerImpl(fsm),
+            unitOfWork, conf);
+
+    ActionManager am = new ActionManager(
+            2, 2, aq, fsm, db, new HostsMap((String) null),
+            new ServerActionManagerImpl(fsm), unitOfWork, null, requestFactory, conf);
+
+    scheduler.doWork();
+
+    Assert.assertEquals(HostRoleStatus.QUEUED, stages.get(0).getHostRoleStatus(hostname1, "DATANODE"));
+    Assert.assertEquals(HostRoleStatus.PENDING, stages.get(1).getHostRoleStatus(hostname1, "GANGLIA_MONITOR"));
+    Assert.assertEquals(HostRoleStatus.QUEUED, stages.get(2).getHostRoleStatus(hostname2, "DATANODE"));
+    Assert.assertEquals(HostRoleStatus.QUEUED, stages.get(3).getHostRoleStatus(hostname3, "DATANODE"));
+    Assert.assertEquals(HostRoleStatus.PENDING, stages.get(4).getHostRoleStatus(hostname4, "GANGLIA_MONITOR"));
+  }
+
+
+  /**
+   * Verifies that ActionScheduler respects "disable parallel stage execution option"
+   */
+  @Test
+  public void testIndependentStagesExecutionDisabled() throws Exception {
+    ActionQueue aq = new ActionQueue();
+    Clusters fsm = mock(Clusters.class);
+    Cluster oneClusterMock = mock(Cluster.class);
+    Service serviceObj = mock(Service.class);
+    ServiceComponent scomp = mock(ServiceComponent.class);
+    ServiceComponentHost sch = mock(ServiceComponentHost.class);
+    UnitOfWork unitOfWork = mock(UnitOfWork.class);
+    RequestFactory requestFactory = mock(RequestFactory.class);
+    when(fsm.getCluster(anyString())).thenReturn(oneClusterMock);
+    when(oneClusterMock.getService(anyString())).thenReturn(serviceObj);
+    when(serviceObj.getServiceComponent(anyString())).thenReturn(scomp);
+    when(scomp.getServiceComponentHost(anyString())).thenReturn(sch);
+    when(serviceObj.getCluster()).thenReturn(oneClusterMock);
+
+    String hostname1 = "ahost.ambari.apache.org";
+    String hostname2 = "bhost.ambari.apache.org";
+    String hostname3 = "chost.ambari.apache.org";
+    String hostname4 = "chost.ambari.apache.org";
+    List<Stage> stages = new ArrayList<Stage>();
+    stages.add(
+            getStageWithSingleTask(
+                    hostname1, "cluster1", Role.DATANODE,
+                    RoleCommand.START, Service.Type.HDFS, 1, 1, 1));
+    stages.add( // Stage with the same hostname, should not be scheduled
+            getStageWithSingleTask(
+                    hostname1, "cluster1", Role.GANGLIA_MONITOR,
+                    RoleCommand.START, Service.Type.GANGLIA, 2, 2, 2));
+
+    stages.add(
+            getStageWithSingleTask(
+                    hostname2, "cluster1", Role.DATANODE,
+                    RoleCommand.START, Service.Type.HDFS, 3, 3, 3));
+
+    stages.add(
+            getStageWithSingleTask(
+                    hostname3, "cluster1", Role.DATANODE,
+                    RoleCommand.START, Service.Type.HDFS, 4, 4, 4));
+
+    stages.add( // Stage with the same request id, should not be scheduled
+            getStageWithSingleTask(
+                    hostname4, "cluster1", Role.GANGLIA_MONITOR,
+                    RoleCommand.START, Service.Type.GANGLIA, 5, 5, 4));
+
+    ActionDBAccessor db = mock(ActionDBAccessor.class);
+    when(db.getStagesInProgress()).thenReturn(stages);
+
+    Properties properties = new Properties();
+    properties.put(Configuration.PARALLEL_STAGE_EXECUTION_KEY, "false");
+    Configuration conf = new Configuration(properties);
+    ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
+            new HostsMap((String) null), new ServerActionManagerImpl(fsm),
+            unitOfWork, conf);
+
+    ActionManager am = new ActionManager(
+            2, 2, aq, fsm, db, new HostsMap((String) null),
+            new ServerActionManagerImpl(fsm), unitOfWork, null,
+            requestFactory, conf);
+
+    scheduler.doWork();
+
+    Assert.assertEquals(HostRoleStatus.QUEUED, stages.get(0).getHostRoleStatus(hostname1, "DATANODE"));
+    Assert.assertEquals(HostRoleStatus.PENDING, stages.get(1).getHostRoleStatus(hostname1, "GANGLIA_MONITOR"));
+    Assert.assertEquals(HostRoleStatus.PENDING, stages.get(2).getHostRoleStatus(hostname2, "DATANODE"));
+    Assert.assertEquals(HostRoleStatus.PENDING, stages.get(3).getHostRoleStatus(hostname3, "DATANODE"));
+    Assert.assertEquals(HostRoleStatus.PENDING, stages.get(4).getHostRoleStatus(hostname4, "GANGLIA_MONITOR"));
+  }
+
+
   @Test
   public void testRequestFailureOnStageFailure() throws Exception {
     ActionQueue aq = new ActionQueue();
@@ -478,11 +637,13 @@ public class TestActionScheduler {
       }
     }).when(db).abortOperation(anyLong());
 
-
+    Properties properties = new Properties();
+    Configuration conf = new Configuration(properties);
     ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
-        new HostsMap((String) null), new ServerActionManagerImpl(fsm), unitOfWork);
+        new HostsMap((String) null), new ServerActionManagerImpl(fsm),
+        unitOfWork, conf);
     ActionManager am = new ActionManager(
-        2, 2, aq, fsm, db, new HostsMap((String) null), new ServerActionManagerImpl(fsm), unitOfWork, null, requestFactory);
+        2, 2, aq, fsm, db, new HostsMap((String) null), new ServerActionManagerImpl(fsm), unitOfWork, null, requestFactory, conf);
 
     scheduler.doWork();
 
@@ -623,10 +784,13 @@ public class TestActionScheduler {
       }
     }).when(db).abortOperation(anyLong());
 
+    Properties properties = new Properties();
+    Configuration conf = new Configuration(properties);
     ActionScheduler scheduler = new ActionScheduler(100, 10000, db, aq, fsm, 3,
-        new HostsMap((String) null), new ServerActionManagerImpl(fsm), unitOfWork);
+        new HostsMap((String) null), new ServerActionManagerImpl(fsm),
+        unitOfWork, conf);
     ActionManager am = new ActionManager(
-        2, 10000, aq, fsm, db, new HostsMap((String) null), new ServerActionManagerImpl(fsm), unitOfWork, null, requestFactory);
+        2, 10000, aq, fsm, db, new HostsMap((String) null), new ServerActionManagerImpl(fsm), unitOfWork, null, requestFactory, conf);
 
     scheduler.doWork();
 
@@ -792,10 +956,13 @@ public class TestActionScheduler {
       }
     }).when(db).abortOperation(anyLong());
 
+    Properties properties = new Properties();
+    Configuration conf = new Configuration(properties);
     ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
-        new HostsMap((String) null), new ServerActionManagerImpl(fsm), unitOfWork);
+        new HostsMap((String) null),
+        new ServerActionManagerImpl(fsm), unitOfWork, conf);
     ActionManager am = new ActionManager(
-        2, 2, aq, fsm, db, new HostsMap((String) null), new ServerActionManagerImpl(fsm), unitOfWork, null, requestFactory);
+        2, 2, aq, fsm, db, new HostsMap((String) null), new ServerActionManagerImpl(fsm), unitOfWork, null, requestFactory, conf);
 
     scheduler.doWork();
 
@@ -893,6 +1060,8 @@ public class TestActionScheduler {
     int requestId2 = 2;
     
     ActionQueue aq = new ActionQueue();
+    Properties properties = new Properties();
+    Configuration conf = new Configuration(properties);
     Clusters fsm = mock(Clusters.class);
     Cluster oneClusterMock = mock(Cluster.class);
     Service serviceObj = mock(Service.class);
@@ -919,7 +1088,7 @@ public class TestActionScheduler {
     //Keep large number of attempts so that the task is not expired finally
     //Small action timeout to test rescheduling
     ActionScheduler scheduler = new ActionScheduler(100, 100, db, aq, fsm,
-        10000, new HostsMap((String) null), null, unitOfWork);
+        10000, new HostsMap((String) null), null, unitOfWork, conf);
     scheduler.setTaskTimeoutAdjustment(false);
     // Start the thread
     scheduler.start();

http://git-wip-us.apache.org/repos/asf/ambari/blob/af56baa0/ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java b/ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java
index 4e716dc..7b07062 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java
@@ -413,7 +413,7 @@ public class TestHeartbeatHandler {
     clusters.addCluster(DummyCluster);
     ActionDBAccessor db = injector.getInstance(ActionDBAccessorImpl.class);
     ActionManager am = new ActionManager(5000, 1200000, new ActionQueue(), clusters, db,
-        new HostsMap((String) null), null, unitOfWork, null, injector.getInstance(RequestFactory.class));
+        new HostsMap((String) null), null, unitOfWork, null, injector.getInstance(RequestFactory.class), null);
     populateActionDB(db, DummyHostname1);
     Stage stage = db.getAllStages(requestId).get(0);
     Assert.assertEquals(stageId, stage.getStageId());
@@ -1511,7 +1511,7 @@ public class TestHeartbeatHandler {
 
   private ActionManager getMockActionManager() {
     return new ActionManager(0, 0, null, null,
-              actionDBAccessor, new HostsMap((String) null), null, unitOfWork, null, injector.getInstance(RequestFactory.class));
+              actionDBAccessor, new HostsMap((String) null), null, unitOfWork, null, injector.getInstance(RequestFactory.class), null);
   }