You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by rl...@apache.org on 2015/10/28 17:47:07 UTC

ambari git commit: AMBARI-13550. ActionScheduler#filterParallelPerHostStages should not filter out stages with server-side actions (rlevas)

Repository: ambari
Updated Branches:
  refs/heads/trunk 81c608f48 -> 640d15232


AMBARI-13550. ActionScheduler#filterParallelPerHostStages should not filter out stages with server-side actions (rlevas)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/640d1523
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/640d1523
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/640d1523

Branch: refs/heads/trunk
Commit: 640d1523256e923517184e6a22394df7c9530ff8
Parents: 81c608f
Author: Robert Levas <rl...@hortonworks.com>
Authored: Wed Oct 28 12:46:55 2015 -0400
Committer: Robert Levas <rl...@hortonworks.com>
Committed: Wed Oct 28 12:46:55 2015 -0400

----------------------------------------------------------------------
 .../server/actionmanager/ActionScheduler.java   | 92 +++++++++++++++++---
 .../ambari/server/actionmanager/Stage.java      |  2 +-
 .../actionmanager/TestActionScheduler.java      | 92 +++++++++++++++++++-
 3 files changed, 171 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/640d1523/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
index 3f289b2..effd869 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
@@ -274,7 +274,7 @@ class ActionScheduler implements Runnable {
 
         if (runningRequestIds.contains(requestId)) {
           // We don't want to process different stages from the same request in parallel
-          LOG.debug("==> We don't want to process different stages from the same request in parallel" );
+          LOG.debug("==> We don't want to process different stages from the same request in parallel");
           continue;
         } else {
           runningRequestIds.add(requestId);
@@ -430,25 +430,97 @@ class ActionScheduler implements Runnable {
   }
 
   /**
-   * Returns filtered list of stages following the rule:
-   * 1) remove stages that has the same host. Leave only first stage, the rest that have same host of any operation will be filtered
-   * 2) do not remove stages intersected by host if they have intersection by background command
-   * @param stages
-   * @return
+   * Returns filtered list of stages such that the returned list is an ordered list of stages that may
+   * be executed in parallel or in the order in which they are presented
+   * <p/>
+   * Assumption: the list of stages supplied as input is ordered by request id and then stage id.
+   * <p/>
+   * Rules:
+   * <ul>
+   * <li>
+   * Stages are filtered such that the first stage in the list (assumed to be the first pending
+   * stage from the earliest active request) has priority
+   * </li>
+   * <li>
+   * No stage in any request may be executed before an earlier stage in the same request
+   * </li>
+   * <li>
+   * Stages in different requests may be performed in parallel if the relevant hosts for the
+   * stage in the later requests do not intersect with the union of hosts from (pending) stages
+   * in earlier requests
+   * </li>
+   * </ul>
+   *
+   * @param stages the stages to process
+   * @return a list of stages that may be executed in parallel
    */
   private List<Stage> filterParallelPerHostStages(List<Stage> stages) {
     List<Stage> retVal = new ArrayList<Stage>();
     Set<String> affectedHosts = new HashSet<String>();
-    for(Stage s : stages){
+    Set<Long> affectedRequests = new HashSet<Long>();
+
+    for (Stage s : stages) {
+      long requestId = s.getRequestId();
+
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("==> Processing stage: {}/{} ({}) for {}", requestId, s.getStageId(), s.getRequestContext());
+      }
+
+      boolean addStage = true;
+
+      // Iterate over the relevant hosts for this stage to see if any intersect with the set of
+      // hosts needed for previous stages.  If any intersection occurs, this stage may not be
+      // executed in parallel.
       for (String host : s.getHosts()) {
-        if (!affectedHosts.contains(host)) {
-          if(!isStageHasBackgroundCommandsOnly(s, host)){
+        LOG.trace("===> Processing Host {}", host);
+
+        if (affectedHosts.contains(host)) {
+          if (LOG.isTraceEnabled()) {
+            LOG.trace("===>  Skipping stage since it utilizes at least one host that a previous stage requires: {}/{} ({})", s.getRequestId(), s.getStageId(), s.getRequestContext());
+          }
+
+          addStage &= false;
+        } else {
+          if (!Stage.INTERNAL_HOSTNAME.equalsIgnoreCase(host) && !isStageHasBackgroundCommandsOnly(s, host)) {
+            LOG.trace("====>  Adding host to affected hosts: {}", host);
             affectedHosts.add(host);
           }
-          retVal.add(s);
+
+          addStage &= true;
         }
       }
+
+      // If this stage is for a request that we have already processed, then it cannot execute in
+      // parallel since only one stage per request may execute at a time. The first time we encounter
+      // a request id, it will be for the first pending stage for that request, so it is a candidate
+      // for execution at this time - if the previous test for host intersection succeeds.
+      if (affectedRequests.contains(requestId)) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("===>  Skipping stage since the request it is in has been processed already: {}/{} ({})", s.getRequestId(), s.getStageId(), s.getRequestContext());
+        }
+
+        addStage = false;
+      } else {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("====>  Adding request to affected requests: {}", requestId);
+        }
+
+        affectedRequests.add(requestId);
+        addStage &= true;
+      }
+
+      // If both tests pass - the stage is the first pending stage in its request and the hosts
+      // required in the stage do not intersect with hosts from stages that should occur before this,
+      // then add it to the list of stages that may be executed in parallel.
+      if (addStage) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("===>  Adding stage to return value: {}/{} ({})", s.getRequestId(), s.getStageId(), s.getRequestContext());
+        }
+
+        retVal.add(s);
+      }
     }
+
     return retVal;
   }
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/640d1523/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java
index 8b2703c..ef50963 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java
@@ -63,7 +63,7 @@ public class Stage {
    * don't want stages getting confused with Ambari vs cluster hosts, so
    * don't use {@link StageUtils#getHostName()}
    */
-  private static final String INTERNAL_HOSTNAME = "_internal_ambari";
+  public static final String INTERNAL_HOSTNAME = "_internal_ambari";
 
   private static Logger LOG = LoggerFactory.getLogger(Stage.class);
   private final long requestId;

http://git-wip-us.apache.org/repos/asf/ambari/blob/640d1523/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
index f8f9ce9..bc4d397 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
@@ -636,6 +636,79 @@ public class TestActionScheduler {
   }
 
   /**
+   * Test server actions in multiple requests.
+   *
+   * This is used to make sure the server-side actions do not get filtered out from
+   * {@link org.apache.ambari.server.actionmanager.ActionScheduler#filterParallelPerHostStages(java.util.List)}
+   */
+  @Test
+  public void testServerActionInMultipleRequests() throws Exception {
+    ActionQueue aq = new ActionQueue();
+    Clusters fsm = mock(Clusters.class);
+    Cluster oneClusterMock = mock(Cluster.class);
+    Service serviceObj = mock(Service.class);
+    ServiceComponent scomp = mock(ServiceComponent.class);
+    ServiceComponentHost sch = mock(ServiceComponentHost.class);
+    UnitOfWork unitOfWork = mock(UnitOfWork.class);
+    when(fsm.getCluster(anyString())).thenReturn(oneClusterMock);
+    when(oneClusterMock.getService(anyString())).thenReturn(serviceObj);
+    when(serviceObj.getServiceComponent(anyString())).thenReturn(scomp);
+    when(scomp.getServiceComponentHost(anyString())).thenReturn(sch);
+    when(serviceObj.getCluster()).thenReturn(oneClusterMock);
+
+    String clusterName = "cluster1";
+    String hostname1 = "ahost.ambari.apache.org";
+    String hostname2 = "bhost.ambari.apache.org";
+    HashMap<String, ServiceComponentHost> hosts =
+        new HashMap<String, ServiceComponentHost>();
+    hosts.put(hostname1, sch);
+    hosts.put(hostname2, sch);
+    hosts.put(Stage.INTERNAL_HOSTNAME, sch);
+    when(scomp.getServiceComponentHosts()).thenReturn(hosts);
+
+    List<Stage> stages = new ArrayList<Stage>();
+    Stage stage01 = createStage(clusterName, 0, 1);
+    addTask(stage01, Stage.INTERNAL_HOSTNAME, clusterName, Role.AMBARI_SERVER_ACTION, RoleCommand.ACTIONEXECUTE, "AMBARI", 1);
+
+    Stage stage11 = createStage("cluster1", 1, 1);
+    addTask(stage11, hostname1, clusterName, Role.KERBEROS_CLIENT, RoleCommand.CUSTOM_COMMAND, "KERBEROS", 2);
+
+    Stage stage02 = createStage("cluster1", 0, 2);
+    addTask(stage02, Stage.INTERNAL_HOSTNAME, clusterName, Role.AMBARI_SERVER_ACTION, RoleCommand.ACTIONEXECUTE, "AMBARI", 3);
+
+    Stage stage12 = createStage("cluster1", 1, 2);
+    addTask(stage12, hostname2, clusterName, Role.KERBEROS_CLIENT, RoleCommand.CUSTOM_COMMAND, "KERBEROS", 4);
+
+    stages.add(stage01);
+    stages.add(stage11);
+    stages.add(stage02);
+    stages.add(stage12);
+
+    ActionDBAccessor db = mock(ActionDBAccessor.class);
+
+    RequestEntity request = mock(RequestEntity.class);
+    when(request.isExclusive()).thenReturn(false);
+    when(db.getRequestEntity(anyLong())).thenReturn(request);
+
+    when(db.getCommandsInProgressCount()).thenReturn(stages.size());
+    when(db.getStagesInProgress()).thenReturn(stages);
+
+    Properties properties = new Properties();
+    properties.put(Configuration.PARALLEL_STAGE_EXECUTION_KEY, "true");
+    Configuration conf = new Configuration(properties);
+    ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
+        new HostsMap((String) null),
+        unitOfWork, EasyMock.createNiceMock(AmbariEventPublisher.class), conf);
+
+    scheduler.doWork();
+
+    Assert.assertEquals(HostRoleStatus.QUEUED, stages.get(0).getHostRoleStatus(Stage.INTERNAL_HOSTNAME, Role.AMBARI_SERVER_ACTION.name()));
+    Assert.assertEquals(HostRoleStatus.PENDING, stages.get(1).getHostRoleStatus(hostname1, Role.KERBEROS_CLIENT.name()));
+    Assert.assertEquals(HostRoleStatus.QUEUED, stages.get(2).getHostRoleStatus(Stage.INTERNAL_HOSTNAME, Role.AMBARI_SERVER_ACTION.name()));
+    Assert.assertEquals(HostRoleStatus.PENDING, stages.get(3).getHostRoleStatus(hostname2, Role.KERBEROS_CLIENT.name()));
+  }
+
+  /**
    * Test server action
    */
   @Test
@@ -1560,21 +1633,32 @@ public class TestActionScheduler {
     return report;
   }
 
-  private Stage getStageWithSingleTask(String hostname, String clusterName, Role role,
-                                       RoleCommand roleCommand, Service.Type service, int taskId,
-                                       int stageId, int requestId) {
+  private Stage createStage(String clusterName, int stageId, int requestId) {
     Stage stage = stageFactory.createNew(requestId, "/tmp", clusterName, 1L, "getStageWithSingleTask",
       CLUSTER_HOST_INFO, "{\"host_param\":\"param_value\"}", "{\"stage_param\":\"param_value\"}");
     stage.setStageId(stageId);
+    return stage;
+  }
+
+  private Stage addTask(Stage stage, String hostname, String clusterName, Role role,
+                        RoleCommand roleCommand, String serviceName, int taskId) {
     stage.addHostRoleExecutionCommand(hostname, role, roleCommand,
         new ServiceComponentHostUpgradeEvent(role.toString(), hostname, System.currentTimeMillis(), "HDP-0.2"),
-        clusterName, service.toString(), false, false);
+        clusterName, serviceName, false, false);
     stage.getExecutionCommandWrapper(hostname,
         role.toString()).getExecutionCommand();
     stage.getOrderedHostRoleCommands().get(0).setTaskId(taskId);
     return stage;
   }
 
+  private Stage getStageWithSingleTask(String hostname, String clusterName, Role role,
+                                       RoleCommand roleCommand, Service.Type service, int taskId,
+                                       int stageId, int requestId) {
+    Stage stage = createStage(clusterName, stageId, requestId);
+    return addTask(stage, hostname, clusterName, role, roleCommand, service.name(), taskId);
+  }
+
+
   private Stage getStageWithSingleTask(String hostname, String clusterName, Role role, RoleCommand roleCommand,
       String customCommandName, Service.Type service, int taskId, int stageId, int requestId) {
     Stage stage = getStageWithSingleTask(hostname, clusterName, role, roleCommand, service, taskId, stageId, requestId);