You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by rl...@apache.org on 2015/10/28 17:47:07 UTC
ambari git commit: AMBARI-13550.
ActionScheduler#filterParallelPerHostStages should not filter out stages with
server-side actions (rlevas)
Repository: ambari
Updated Branches:
refs/heads/trunk 81c608f48 -> 640d15232
AMBARI-13550. ActionScheduler#filterParallelPerHostStages should not filter out stages with server-side actions (rlevas)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/640d1523
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/640d1523
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/640d1523
Branch: refs/heads/trunk
Commit: 640d1523256e923517184e6a22394df7c9530ff8
Parents: 81c608f
Author: Robert Levas <rl...@hortonworks.com>
Authored: Wed Oct 28 12:46:55 2015 -0400
Committer: Robert Levas <rl...@hortonworks.com>
Committed: Wed Oct 28 12:46:55 2015 -0400
----------------------------------------------------------------------
.../server/actionmanager/ActionScheduler.java | 92 +++++++++++++++++---
.../ambari/server/actionmanager/Stage.java | 2 +-
.../actionmanager/TestActionScheduler.java | 92 +++++++++++++++++++-
3 files changed, 171 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/640d1523/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
index 3f289b2..effd869 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
@@ -274,7 +274,7 @@ class ActionScheduler implements Runnable {
if (runningRequestIds.contains(requestId)) {
// We don't want to process different stages from the same request in parallel
- LOG.debug("==> We don't want to process different stages from the same request in parallel" );
+ LOG.debug("==> We don't want to process different stages from the same request in parallel");
continue;
} else {
runningRequestIds.add(requestId);
@@ -430,25 +430,97 @@ class ActionScheduler implements Runnable {
}
/**
- * Returns filtered list of stages following the rule:
- * 1) remove stages that has the same host. Leave only first stage, the rest that have same host of any operation will be filtered
- * 2) do not remove stages intersected by host if they have intersection by background command
- * @param stages
- * @return
+ * Returns filtered list of stages such that the returned list is an ordered list of stages that may
+ * be executed in parallel or in the order in which they are presented
+ * <p/>
+ * Assumption: the list of stages supplied as input are ordered by request id and then stage id.
+ * <p/>
+ * Rules:
+ * <ul>
+ * <li>
+ * Stages are filtered such that the first stage in the list (assumed to be the first pending
+ * stage from the earliest active request) has priority
+ * </li>
+ * <li>
+ * No stage in any request may be executed before an earlier stage in the same request
+ * </li>
+ * <li>
+ * Stages in different requests may be performed in parallel if the relevant hosts for the
+ * stage in the later requests do not intersect with the union of hosts from (pending) stages
+ * in earlier requests
+ * </li>
+ * </ul>
+ *
+ * @param stages the stages to process
+ * @return a list of stages that may be executed in parallel
*/
private List<Stage> filterParallelPerHostStages(List<Stage> stages) {
List<Stage> retVal = new ArrayList<Stage>();
Set<String> affectedHosts = new HashSet<String>();
- for(Stage s : stages){
+ Set<Long> affectedRequests = new HashSet<Long>();
+
+ for (Stage s : stages) {
+ long requestId = s.getRequestId();
+
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("==> Processing stage: {}/{} ({})", requestId, s.getStageId(), s.getRequestContext());
+ }
+
+ boolean addStage = true;
+
+ // Iterate over the relevant hosts for this stage to see if any intersect with the set of
+ // hosts needed for previous stages. If any intersection occurs, this stage may not be
+ // executed in parallel.
for (String host : s.getHosts()) {
- if (!affectedHosts.contains(host)) {
- if(!isStageHasBackgroundCommandsOnly(s, host)){
+ LOG.trace("===> Processing Host {}", host);
+
+ if (affectedHosts.contains(host)) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("===> Skipping stage since it utilizes at least one host that a previous stage requires: {}/{} ({})", s.getRequestId(), s.getStageId(), s.getRequestContext());
+ }
+
+ addStage = false;
+ } else {
+ if (!Stage.INTERNAL_HOSTNAME.equalsIgnoreCase(host) && !isStageHasBackgroundCommandsOnly(s, host)) {
+ LOG.trace("====> Adding host to affected hosts: {}", host);
affectedHosts.add(host);
}
- retVal.add(s);
+
+ addStage &= true;
}
}
+
+ // If this stage is for a request that we have already processed, then it cannot execute in
+ // parallel since only one stage per request may execute at a time. The first time we encounter
+ // a request id, will be for the first pending stage for that request, so it is a candidate
+ // for execution at this time - if the previous test for host intersection succeeds.
+ if (affectedRequests.contains(requestId)) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("===> Skipping stage since the request it is in has been processed already: {}/{} ({})", s.getRequestId(), s.getStageId(), s.getRequestContext());
+ }
+
+ addStage = false;
+ } else {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("====> Adding request to affected requests: {}", requestId);
+ }
+
+ affectedRequests.add(requestId);
+ addStage &= true;
+ }
+
+ // If both tests pass - the stage is the first pending stage in its request and the hosts
+ // required in the stage do not intersect with hosts from stages that should occur before this,
+ // then add it to the list of stages that may be executed in parallel.
+ if (addStage) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("===> Adding stage to return value: {}/{} ({})", s.getRequestId(), s.getStageId(), s.getRequestContext());
+ }
+
+ retVal.add(s);
+ }
}
+
return retVal;
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/640d1523/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java
index 8b2703c..ef50963 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java
@@ -63,7 +63,7 @@ public class Stage {
* don't want stages getting confused with Ambari vs cluster hosts, so
* don't use {@link StageUtils#getHostName()}
*/
- private static final String INTERNAL_HOSTNAME = "_internal_ambari";
+ public static final String INTERNAL_HOSTNAME = "_internal_ambari";
private static Logger LOG = LoggerFactory.getLogger(Stage.class);
private final long requestId;
http://git-wip-us.apache.org/repos/asf/ambari/blob/640d1523/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
index f8f9ce9..bc4d397 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
@@ -636,6 +636,79 @@ public class TestActionScheduler {
}
/**
+ * Test server actions in multiple requests.
+ *
+ * This is used to make sure the server-side actions do not get filtered out from
+ * {@link org.apache.ambari.server.actionmanager.ActionScheduler#filterParallelPerHostStages(java.util.List)}
+ */
+ @Test
+ public void testServerActionInMultipleRequests() throws Exception {
+ ActionQueue aq = new ActionQueue();
+ Clusters fsm = mock(Clusters.class);
+ Cluster oneClusterMock = mock(Cluster.class);
+ Service serviceObj = mock(Service.class);
+ ServiceComponent scomp = mock(ServiceComponent.class);
+ ServiceComponentHost sch = mock(ServiceComponentHost.class);
+ UnitOfWork unitOfWork = mock(UnitOfWork.class);
+ when(fsm.getCluster(anyString())).thenReturn(oneClusterMock);
+ when(oneClusterMock.getService(anyString())).thenReturn(serviceObj);
+ when(serviceObj.getServiceComponent(anyString())).thenReturn(scomp);
+ when(scomp.getServiceComponentHost(anyString())).thenReturn(sch);
+ when(serviceObj.getCluster()).thenReturn(oneClusterMock);
+
+ String clusterName = "cluster1";
+ String hostname1 = "ahost.ambari.apache.org";
+ String hostname2 = "bhost.ambari.apache.org";
+ HashMap<String, ServiceComponentHost> hosts =
+ new HashMap<String, ServiceComponentHost>();
+ hosts.put(hostname1, sch);
+ hosts.put(hostname2, sch);
+ hosts.put(Stage.INTERNAL_HOSTNAME, sch);
+ when(scomp.getServiceComponentHosts()).thenReturn(hosts);
+
+ List<Stage> stages = new ArrayList<Stage>();
+ Stage stage01 = createStage(clusterName, 0, 1);
+ addTask(stage01, Stage.INTERNAL_HOSTNAME, clusterName, Role.AMBARI_SERVER_ACTION, RoleCommand.ACTIONEXECUTE, "AMBARI", 1);
+
+ Stage stage11 = createStage(clusterName, 1, 1);
+ addTask(stage11, hostname1, clusterName, Role.KERBEROS_CLIENT, RoleCommand.CUSTOM_COMMAND, "KERBEROS", 2);
+
+ Stage stage02 = createStage(clusterName, 0, 2);
+ addTask(stage02, Stage.INTERNAL_HOSTNAME, clusterName, Role.AMBARI_SERVER_ACTION, RoleCommand.ACTIONEXECUTE, "AMBARI", 3);
+
+ Stage stage12 = createStage(clusterName, 1, 2);
+ addTask(stage12, hostname2, clusterName, Role.KERBEROS_CLIENT, RoleCommand.CUSTOM_COMMAND, "KERBEROS", 4);
+
+ stages.add(stage01);
+ stages.add(stage11);
+ stages.add(stage02);
+ stages.add(stage12);
+
+ ActionDBAccessor db = mock(ActionDBAccessor.class);
+
+ RequestEntity request = mock(RequestEntity.class);
+ when(request.isExclusive()).thenReturn(false);
+ when(db.getRequestEntity(anyLong())).thenReturn(request);
+
+ when(db.getCommandsInProgressCount()).thenReturn(stages.size());
+ when(db.getStagesInProgress()).thenReturn(stages);
+
+ Properties properties = new Properties();
+ properties.put(Configuration.PARALLEL_STAGE_EXECUTION_KEY, "true");
+ Configuration conf = new Configuration(properties);
+ ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
+ new HostsMap((String) null),
+ unitOfWork, EasyMock.createNiceMock(AmbariEventPublisher.class), conf);
+
+ scheduler.doWork();
+
+ Assert.assertEquals(HostRoleStatus.QUEUED, stages.get(0).getHostRoleStatus(Stage.INTERNAL_HOSTNAME, Role.AMBARI_SERVER_ACTION.name()));
+ Assert.assertEquals(HostRoleStatus.PENDING, stages.get(1).getHostRoleStatus(hostname1, Role.KERBEROS_CLIENT.name()));
+ Assert.assertEquals(HostRoleStatus.QUEUED, stages.get(2).getHostRoleStatus(Stage.INTERNAL_HOSTNAME, Role.AMBARI_SERVER_ACTION.name()));
+ Assert.assertEquals(HostRoleStatus.PENDING, stages.get(3).getHostRoleStatus(hostname2, Role.KERBEROS_CLIENT.name()));
+ }
+
+ /**
* Test server action
*/
@Test
@@ -1560,21 +1633,32 @@ public class TestActionScheduler {
return report;
}
- private Stage getStageWithSingleTask(String hostname, String clusterName, Role role,
- RoleCommand roleCommand, Service.Type service, int taskId,
- int stageId, int requestId) {
+ private Stage createStage(String clusterName, int stageId, int requestId) {
Stage stage = stageFactory.createNew(requestId, "/tmp", clusterName, 1L, "getStageWithSingleTask",
CLUSTER_HOST_INFO, "{\"host_param\":\"param_value\"}", "{\"stage_param\":\"param_value\"}");
stage.setStageId(stageId);
+ return stage;
+ }
+
+ private Stage addTask(Stage stage, String hostname, String clusterName, Role role,
+ RoleCommand roleCommand, String serviceName, int taskId) {
stage.addHostRoleExecutionCommand(hostname, role, roleCommand,
new ServiceComponentHostUpgradeEvent(role.toString(), hostname, System.currentTimeMillis(), "HDP-0.2"),
- clusterName, service.toString(), false, false);
+ clusterName, serviceName, false, false);
stage.getExecutionCommandWrapper(hostname,
role.toString()).getExecutionCommand();
stage.getOrderedHostRoleCommands().get(0).setTaskId(taskId);
return stage;
}
+ private Stage getStageWithSingleTask(String hostname, String clusterName, Role role,
+ RoleCommand roleCommand, Service.Type service, int taskId,
+ int stageId, int requestId) {
+ Stage stage = createStage(clusterName, stageId, requestId);
+ return addTask(stage, hostname, clusterName, role, roleCommand, service.name(), taskId);
+ }
+
+
private Stage getStageWithSingleTask(String hostname, String clusterName, Role role, RoleCommand roleCommand,
String customCommandName, Service.Type service, int taskId, int stageId, int requestId) {
Stage stage = getStageWithSingleTask(hostname, clusterName, role, roleCommand, service, taskId, stageId, requestId);