You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by dm...@apache.org on 2014/01/15 16:12:51 UTC
git commit: AMBARI-4290. Run Various Requests in Parallel so that one
does not wait on the other. (dlysnichenko)
Updated Branches:
refs/heads/trunk 1763922ec -> af56baa0b
AMBARI-4290. Run Various Requests in Parallel so that one does not wait on the other. (dlysnichenko)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/af56baa0
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/af56baa0
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/af56baa0
Branch: refs/heads/trunk
Commit: af56baa0b497869398726ae6b6c78b8707072d95
Parents: 1763922
Author: Lisnichenko Dmitro <dl...@hortonworks.com>
Authored: Wed Jan 15 17:07:32 2014 +0200
Committer: Lisnichenko Dmitro <dl...@hortonworks.com>
Committed: Wed Jan 15 17:07:32 2014 +0200
----------------------------------------------------------------------
ambari-server/conf/unix/ambari.properties | 1 +
.../server/actionmanager/ActionManager.java | 5 +-
.../server/actionmanager/ActionScheduler.java | 123 ++++++++----
.../server/configuration/Configuration.java | 17 ++
.../actionmanager/TestActionDBAccessorImpl.java | 3 +-
.../server/actionmanager/TestActionManager.java | 9 +-
.../actionmanager/TestActionScheduler.java | 195 +++++++++++++++++--
.../server/agent/TestHeartbeatHandler.java | 4 +-
8 files changed, 302 insertions(+), 55 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/af56baa0/ambari-server/conf/unix/ambari.properties
----------------------------------------------------------------------
diff --git a/ambari-server/conf/unix/ambari.properties b/ambari-server/conf/unix/ambari.properties
index ebb5d9c..563c04f 100644
--- a/ambari-server/conf/unix/ambari.properties
+++ b/ambari-server/conf/unix/ambari.properties
@@ -31,6 +31,7 @@ bootstrap.setup_agent.script=/usr/lib/python2.6/site-packages/ambari_server/setu
api.authenticate=true
server.connection.max.idle.millis=900000
server.fqdn.service.url=http://169.254.169.254/latest/meta-data/public-hostname
+server.stages.parallel=true
# Scheduler settings
server.execution.scheduler.isClustered=false
http://git-wip-us.apache.org/repos/asf/ambari/blob/af56baa0/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java
index af25149..8e6fb13 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java
@@ -24,6 +24,7 @@ import com.google.inject.persist.UnitOfWork;
import org.apache.ambari.server.AmbariException;
import org.apache.ambari.server.agent.ActionQueue;
import org.apache.ambari.server.agent.CommandReport;
+import org.apache.ambari.server.configuration.Configuration;
import org.apache.ambari.server.controller.ExecuteActionRequest;
import org.apache.ambari.server.controller.HostsMap;
import org.apache.ambari.server.serveraction.ServerActionManager;
@@ -58,11 +59,11 @@ public class ActionManager {
@Named("actionTimeout") long actionTimeout,
ActionQueue aq, Clusters fsm, ActionDBAccessor db, HostsMap hostsMap,
ServerActionManager serverActionManager, UnitOfWork unitOfWork, CustomActionDBAccessor cdb,
- RequestFactory requestFactory) {
+ RequestFactory requestFactory, Configuration configuration) {
this.actionQueue = aq;
this.db = db;
scheduler = new ActionScheduler(schedulerSleepTime, actionTimeout, db,
- actionQueue, fsm, 2, hostsMap, serverActionManager, unitOfWork);
+ actionQueue, fsm, 2, hostsMap, serverActionManager, unitOfWork, configuration);
requestCounter = new AtomicLong(
db.getLastPersistedRequestIdWhenInitialized());
this.cdb = cdb;
http://git-wip-us.apache.org/repos/asf/ambari/blob/af56baa0/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
index 02c94b8..47f56eb 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
@@ -20,6 +20,7 @@ package org.apache.ambari.server.actionmanager;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -33,6 +34,7 @@ import org.apache.ambari.server.ServiceComponentNotFoundException;
import org.apache.ambari.server.agent.ActionQueue;
import org.apache.ambari.server.agent.CommandReport;
import org.apache.ambari.server.agent.ExecutionCommand;
+import org.apache.ambari.server.configuration.Configuration;
import org.apache.ambari.server.controller.HostsMap;
import org.apache.ambari.server.serveraction.ServerAction;
import org.apache.ambari.server.serveraction.ServerActionManager;
@@ -75,6 +77,7 @@ class ActionScheduler implements Runnable {
private final HostsMap hostsMap;
private final Object wakeupSyncObject = new Object();
private final ServerActionManager serverActionManager;
+ private final Configuration configuration;
/**
* true if scheduler should run ASAP.
@@ -87,7 +90,8 @@ class ActionScheduler implements Runnable {
public ActionScheduler(long sleepTimeMilliSec, long actionTimeoutMilliSec,
ActionDBAccessor db, ActionQueue actionQueue, Clusters fsmObject,
- int maxAttempts, HostsMap hostsMap, ServerActionManager serverActionManager, UnitOfWork unitOfWork) {
+ int maxAttempts, HostsMap hostsMap, ServerActionManager serverActionManager,
+ UnitOfWork unitOfWork, Configuration configuration) {
this.sleepTime = sleepTimeMilliSec;
this.hostsMap = hostsMap;
this.actionTimeout = actionTimeoutMilliSec;
@@ -100,6 +104,7 @@ class ActionScheduler implements Runnable {
this.clusterHostInfoCache = CacheBuilder.newBuilder().
expireAfterAccess(5, TimeUnit.MINUTES).
build();
+ this.configuration = configuration;
}
public void start() {
@@ -149,7 +154,8 @@ class ActionScheduler implements Runnable {
public void doWork() throws AmbariException {
try {
unitOfWork.begin();
-
+ Set<String> runningRequestIds = new HashSet<String>();
+ Set<String> affectedHosts = new HashSet<String>();
List<Stage> stages = db.getStagesInProgress();
if (LOG.isDebugEnabled()) {
LOG.debug("Scheduler wakes up");
@@ -163,9 +169,37 @@ class ActionScheduler implements Runnable {
}
for (Stage s : stages) {
+ // Check if we can process this stage in parallel with another stages
+
+ long requestId = s.getRequestId();
+ // Convert to string to avoid glitches with boxing/unboxing
+ String requestIdStr = String.valueOf(requestId);
+ if (runningRequestIds.contains(requestIdStr)) {
+ // We don't want to process different stages from the same request in parallel
+ continue;
+ } else {
+ runningRequestIds.add(requestIdStr);
+ }
+
+
+ List<String> stageHosts = s.getHosts();
+ boolean conflict = false;
+ for (String host : stageHosts) {
+ if (affectedHosts.contains(host)) {
+ conflict = true;
+ break;
+ }
+ }
+ if (conflict) {
+ // Also we don't want to perform stages in parallel at the same hosts
+ continue;
+ } else {
+ affectedHosts.addAll(stageHosts);
+ }
+
List<ExecutionCommand> commandsToSchedule = new ArrayList<ExecutionCommand>();
Map<String, RoleStats> roleStats = processInProgressStage(s, commandsToSchedule);
- //Check if stage is failed
+ // Check if stage is failed
boolean failed = false;
for (String role : roleStats.keySet()) {
RoleStats stats = roleStats.get(role);
@@ -193,23 +227,14 @@ class ActionScheduler implements Runnable {
//Schedule what we have so far
for (ExecutionCommand cmd : commandsToSchedule) {
if (Role.valueOf(cmd.getRole()).equals(Role.AMBARI_SERVER_ACTION)) {
- try {
- long now = System.currentTimeMillis();
- String hostName = cmd.getHostname();
- String roleName = cmd.getRole();
-
- s.setStartTime(hostName, roleName, now);
- s.setLastAttemptTime(hostName, roleName, now);
- s.incrementAttemptCount(hostName, roleName);
- s.setHostRoleStatus(hostName, roleName, HostRoleStatus.QUEUED);
- db.hostRoleScheduled(s, hostName, roleName);
- String actionName = cmd.getRoleParams().get(ServerAction.ACTION_NAME);
- this.serverActionManager.executeAction(actionName, cmd.getCommandParams());
- reportServerActionSuccess(s, cmd);
- } catch (AmbariException e) {
- LOG.warn("Could not execute server action " + cmd.toString(), e);
- reportServerActionFailure(s, cmd, e.getMessage());
- }
+ /**
+ * We don't forbid executing any stages in parallel with
+ * AMBARI_SERVER_ACTION. That should be OK as AMBARI_SERVER_ACTION
+ * is not used as of now. The general motivation has been to update
+ * Request status when last task associated with the
+ * Request is finished.
+ */
+ executeServerAction(s, cmd);
} else {
try {
scheduleHostRole(s, cmd);
@@ -220,16 +245,7 @@ class ActionScheduler implements Runnable {
}
}
- //Check if ready to go to next stage
- boolean goToNextStage = true;
- for (String role : roleStats.keySet()) {
- RoleStats stats = roleStats.get(role);
- if (!stats.isSuccessFactorMet()) {
- goToNextStage = false;
- break;
- }
- }
- if (!goToNextStage) {
+ if (! configuration.getParallelStageExecution()) { // If disabled
return;
}
}
@@ -239,10 +255,36 @@ class ActionScheduler implements Runnable {
}
}
+
+  /**
+   * Runs an internal ambari-server action (AMBARI_SERVER_ACTION) for one
+   * command of the given stage.
+   *
+   * The host role is first recorded as attempted (start time, last attempt
+   * time, attempt count), set to QUEUED and persisted as scheduled. The action
+   * named by the {@link ServerAction#ACTION_NAME} role parameter is then
+   * handed to the ServerActionManager. If everything in that sequence
+   * completes without an AmbariException, success is reported; otherwise the
+   * exception is logged and reported as a failure instead of propagating.
+   *
+   * @param s   stage that owns the command
+   * @param cmd command whose role is AMBARI_SERVER_ACTION
+   */
+  private void executeServerAction(Stage s, ExecutionCommand cmd) {
+    try {
+      final long startedAt = System.currentTimeMillis();
+      final String host = cmd.getHostname();
+      final String role = cmd.getRole();
+
+      // Book-keeping for the attempt, then mark it QUEUED in memory and in the DB
+      s.setStartTime(host, role, startedAt);
+      s.setLastAttemptTime(host, role, startedAt);
+      s.incrementAttemptCount(host, role);
+      s.setHostRoleStatus(host, role, HostRoleStatus.QUEUED);
+      db.hostRoleScheduled(s, host, role);
+
+      final String serverActionName = cmd.getRoleParams().get(ServerAction.ACTION_NAME);
+      serverActionManager.executeAction(serverActionName, cmd.getCommandParams());
+      reportServerActionSuccess(s, cmd);
+    } catch (AmbariException ex) {
+      LOG.warn("Could not execute server action " + cmd.toString(), ex);
+      reportServerActionFailure(s, cmd, ex.getMessage());
+    }
+  }
+
private boolean hasPreviousStageFailed(Stage stage) {
boolean failed = false;
long prevStageId = stage.getStageId() - 1;
if (prevStageId > 0) {
+ // Find previous stage instance
List<Stage> allStages = db.getAllStages(stage.getRequestId());
Stage prevStage = null;
for (Stage s : allStages) {
@@ -293,7 +335,7 @@ class ActionScheduler implements Runnable {
report.setStdOut("Server action succeeded");
report.setStdErr("");
db.updateHostRoleState(cmd.getHostname(), stage.getRequestId(), stage.getStageId(),
- cmd.getRole().toString(), report);
+ cmd.getRole(), report);
}
private void reportServerActionFailure(Stage stage, ExecutionCommand cmd, String message) {
@@ -303,13 +345,15 @@ class ActionScheduler implements Runnable {
report.setStdOut("Server action failed");
report.setStdErr(message);
db.updateHostRoleState(cmd.getHostname(), stage.getRequestId(), stage.getStageId(),
- cmd.getRole().toString(), report);
+ cmd.getRole(), report);
}
/**
- * @param commandsToSchedule
* @return Stats for the roles in the stage. It is used to determine whether stage
* has succeeded or failed.
+ * Side effects:
+ * This method processes command timeouts and retry attempts, and
+ * adds new (pending) execution commands to commandsToSchedule list.
*/
private Map<String, RoleStats> processInProgressStage(Stage s,
List<ExecutionCommand> commandsToSchedule) throws AmbariException {
@@ -328,6 +372,8 @@ class ActionScheduler implements Runnable {
ExecutionCommand c = wrapper.getExecutionCommand();
String roleStr = c.getRole();
HostRoleStatus status = s.getHostRoleStatus(host, roleStr);
+
+ // Process command timeouts
if (timeOutActionNeeded(status, s, hostObj, roleStr, now,
taskTimeout)) {
LOG.info("Host:" + host + ", role:" + roleStr + ", actionId:"
@@ -361,6 +407,7 @@ class ActionScheduler implements Runnable {
// Dequeue command
actionQueue.dequeue(host, c.getCommandId());
} else {
+ // reschedule command
commandsToSchedule.add(c);
}
} else if (status.equals(HostRoleStatus.PENDING)) {
@@ -373,8 +420,14 @@ class ActionScheduler implements Runnable {
return roleStats;
}
+
+ /**
+ * Populates a map < role_name, role_stats>.
+ */
private Map<String, RoleStats> initRoleStats(Stage s) {
+ // Meaning: how many hosts are affected by commands for each role
Map<Role, Integer> hostCountsForRoles = new HashMap<Role, Integer>();
+ // < role_name, rolestats >
Map<String, RoleStats> roleStats = new TreeMap<String, RoleStats>();
for (String host : s.getHostRoleCommands().keySet()) {
@@ -419,8 +472,10 @@ class ActionScheduler implements Runnable {
private void scheduleHostRole(Stage s, ExecutionCommand cmd)
throws InvalidStateTransitionException, AmbariException {
long now = System.currentTimeMillis();
- String roleStr = cmd.getRole().toString();
+ String roleStr = cmd.getRole();
String hostname = cmd.getHostname();
+
+ // start time is -1 if host role command is not started yet
if (s.getStartTime(hostname, roleStr) < 0) {
if (RoleCommand.ACTIONEXECUTE != cmd.getRoleCommand()) {
try {
http://git-wip-us.apache.org/repos/asf/ambari/blob/af56baa0/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java b/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
index 6b2301d..6af54c0 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
@@ -279,6 +279,16 @@ public class Configuration {
public static final String DEFAULT_EXECUTION_SCHEDULER_MISFIRE_TOLERATION = "480";
public static final String DEFAULT_SCHEDULER_START_DELAY_SECONDS = "120";
+
+  /**
+   * This key defines whether stages of parallel requests are executed in
+   * parallel or sequentially. Only stages from different requests
+   * running on non-overlapping host sets may be executed in parallel.
+   */
+  public static final String PARALLEL_STAGE_EXECUTION_KEY =
+    "server.stages.parallel";
+  /**
+   * Default value for {@link #PARALLEL_STAGE_EXECUTION_KEY}. Parallel stage
+   * execution stays enabled unless the property is set to a value other than
+   * "true" (compared case-insensitively).
+   */
+  private static final String PARALLEL_STAGE_EXECUTION_DEFAULT = "true";
+
private static final Logger LOG = LoggerFactory.getLogger(
Configuration.class);
@@ -345,6 +355,8 @@ public class Configuration {
CLIENT_API_SSL_CRT_NAME_KEY, CLIENT_API_SSL_CRT_NAME_DEFAULT));
configsMap.put(JAVA_HOME_KEY, properties.getProperty(
JAVA_HOME_KEY));
+ configsMap.put(PARALLEL_STAGE_EXECUTION_KEY, properties.getProperty(
+ PARALLEL_STAGE_EXECUTION_KEY, PARALLEL_STAGE_EXECUTION_DEFAULT));
File passFile = new File(configsMap.get(SRVR_KSTR_DIR_KEY) + File.separator
+ configsMap.get(SRVR_CRT_PASS_FILE_KEY));
@@ -909,4 +921,9 @@ public class Configuration {
DEFAULT_SCHEDULER_START_DELAY_SECONDS);
return Integer.parseInt(delay);
}
+
+  /**
+   * Tells whether stages of different requests may be executed in parallel.
+   *
+   * @return {@code true} iff the value stored under
+   *         {@link #PARALLEL_STAGE_EXECUTION_KEY} equals "true", ignoring case;
+   *         a missing (null) value yields {@code false}
+   */
+  public boolean getParallelStageExecution() {
+    String parallelFlag = configsMap.get(PARALLEL_STAGE_EXECUTION_KEY);
+    return Boolean.parseBoolean(parallelFlag);
+  }
+
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/af56baa0/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionDBAccessorImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionDBAccessorImpl.java b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionDBAccessorImpl.java
index 6f8c884..c09c8bb 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionDBAccessorImpl.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionDBAccessorImpl.java
@@ -83,7 +83,8 @@ public class TestActionDBAccessorImpl {
cdb = injector.getInstance(CustomActionDBAccessor.class);
am = new ActionManager(5000, 1200000, new ActionQueue(), clusters, db,
- new HostsMap((String) null), null, injector.getInstance(UnitOfWork.class), cdb, injector.getInstance(RequestFactory.class));
+ new HostsMap((String) null), null, injector.getInstance(UnitOfWork.class), cdb,
+ injector.getInstance(RequestFactory.class), null);
}
@After
http://git-wip-us.apache.org/repos/asf/ambari/blob/af56baa0/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionManager.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionManager.java b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionManager.java
index a266cc6..648e935 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionManager.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionManager.java
@@ -84,7 +84,8 @@ public class TestActionManager {
public void testActionResponse() {
ActionDBAccessor db = injector.getInstance(ActionDBAccessorImpl.class);
ActionManager am = new ActionManager(5000, 1200000, new ActionQueue(),
- clusters, db, new HostsMap((String) null), null, unitOfWork, null, injector.getInstance(RequestFactory.class));
+ clusters, db, new HostsMap((String) null), null, unitOfWork, null,
+ injector.getInstance(RequestFactory.class), null);
populateActionDB(db, hostname);
Stage stage = db.getAllStages(requestId).get(0);
Assert.assertEquals(stageId, stage.getStageId());
@@ -124,7 +125,8 @@ public class TestActionManager {
public void testLargeLogs() {
ActionDBAccessor db = injector.getInstance(ActionDBAccessorImpl.class);
ActionManager am = new ActionManager(5000, 1200000, new ActionQueue(),
- clusters, db, new HostsMap((String) null), null, unitOfWork, null, injector.getInstance(RequestFactory.class));
+ clusters, db, new HostsMap((String) null), null, unitOfWork, null,
+ injector.getInstance(RequestFactory.class), null);
populateActionDB(db, hostname);
Stage stage = db.getAllStages(requestId).get(0);
Assert.assertEquals(stageId, stage.getStageId());
@@ -213,7 +215,8 @@ public class TestActionManager {
replay(queue, db, clusters);
- ActionManager manager = new ActionManager(0, 0, queue, clusters, db, null, null, unitOfWork, null, injector.getInstance(RequestFactory.class));
+ ActionManager manager = new ActionManager(0, 0, queue, clusters, db, null, null, unitOfWork, null,
+ injector.getInstance(RequestFactory.class), null);
assertSame(listStages, manager.getActions(requestId));
verify(queue, db, clusters);
http://git-wip-us.apache.org/repos/asf/ambari/blob/af56baa0/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
index bce150a..5925fbb 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
@@ -33,6 +33,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
import java.util.Set;
import com.google.common.reflect.TypeToken;
@@ -47,6 +48,7 @@ import org.apache.ambari.server.agent.ActionQueue;
import org.apache.ambari.server.agent.AgentCommand;
import org.apache.ambari.server.agent.CommandReport;
import org.apache.ambari.server.agent.ExecutionCommand;
+import org.apache.ambari.server.configuration.Configuration;
import org.apache.ambari.server.controller.HostsMap;
import org.apache.ambari.server.serveraction.ServerAction;
import org.apache.ambari.server.serveraction.ServerActionManagerImpl;
@@ -88,6 +90,8 @@ public class TestActionScheduler {
Map<String, List<String>> clusterHostInfo = StageUtils.getGson().fromJson(CLUSTER_HOST_INFO, type);
ActionQueue aq = new ActionQueue();
+ Properties properties = new Properties();
+ Configuration conf = new Configuration(properties);
Clusters fsm = mock(Clusters.class);
Cluster oneClusterMock = mock(Cluster.class);
Service serviceObj = mock(Service.class);
@@ -114,7 +118,7 @@ public class TestActionScheduler {
//Keep large number of attempts so that the task is not expired finally
//Small action timeout to test rescheduling
ActionScheduler scheduler = new ActionScheduler(100, 100, db, aq, fsm,
- 10000, new HostsMap((String) null), null, unitOfWork);
+ 10000, new HostsMap((String) null), null, unitOfWork, conf);
scheduler.setTaskTimeoutAdjustment(false);
// Start the thread
scheduler.start();
@@ -161,6 +165,8 @@ public class TestActionScheduler {
@Test
public void testActionTimeout() throws Exception {
ActionQueue aq = new ActionQueue();
+ Properties properties = new Properties();
+ Configuration conf = new Configuration(properties);
Clusters fsm = mock(Clusters.class);
Cluster oneClusterMock = mock(Cluster.class);
Service serviceObj = mock(Service.class);
@@ -198,7 +204,7 @@ public class TestActionScheduler {
//Small action timeout to test rescheduling
ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
- new HostsMap((String) null), null, unitOfWork);
+ new HostsMap((String) null), null, unitOfWork, conf);
scheduler.setTaskTimeoutAdjustment(false);
// Start the thread
scheduler.start();
@@ -216,6 +222,8 @@ public class TestActionScheduler {
@Test
public void testActionTimeoutForLostHost() throws Exception {
ActionQueue aq = new ActionQueue();
+ Properties properties = new Properties();
+ Configuration conf = new Configuration(properties);
Clusters fsm = mock(Clusters.class);
Cluster oneClusterMock = mock(Cluster.class);
Service serviceObj = mock(Service.class);
@@ -253,7 +261,7 @@ public class TestActionScheduler {
//Small action timeout to test rescheduling
ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
- new HostsMap((String) null), null, unitOfWork);
+ new HostsMap((String) null), null, unitOfWork, conf);
scheduler.setTaskTimeoutAdjustment(false);
// Start the thread
scheduler.start();
@@ -274,6 +282,8 @@ public class TestActionScheduler {
@Test
public void testServerAction() throws Exception {
ActionQueue aq = new ActionQueue();
+ Properties properties = new Properties();
+ Configuration conf = new Configuration(properties);
Clusters fsm = mock(Clusters.class);
Cluster oneClusterMock = mock(Cluster.class);
Service serviceObj = mock(Service.class);
@@ -310,7 +320,8 @@ public class TestActionScheduler {
ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
- new HostsMap((String) null), new ServerActionManagerImpl(fsm), unitOfWork);
+ new HostsMap((String) null), new ServerActionManagerImpl(fsm),
+ unitOfWork, conf);
scheduler.start();
while (!stages.get(0).getHostRoleStatus(hostname, "AMBARI_SERVER_ACTION")
@@ -328,6 +339,8 @@ public class TestActionScheduler {
@Test
public void testServerActionFailed() throws Exception {
ActionQueue aq = new ActionQueue();
+ Properties properties = new Properties();
+ Configuration conf = new Configuration(properties);
Clusters fsm = mock(Clusters.class);
Cluster oneClusterMock = mock(Cluster.class);
Service serviceObj = mock(Service.class);
@@ -363,7 +376,7 @@ public class TestActionScheduler {
ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
- new HostsMap((String) null), new ServerActionManagerImpl(fsm), unitOfWork);
+ new HostsMap((String) null), new ServerActionManagerImpl(fsm), unitOfWork, conf);
scheduler.start();
while (!stages.get(0).getHostRoleStatus(hostname, "AMBARI_SERVER_ACTION")
@@ -392,6 +405,152 @@ public class TestActionScheduler {
return stage;
}
+
+  /**
+   * Verifies that stages that are executed on different hosts and
+   * belong to different requests are scheduled to be executed in parallel,
+   * while a stage is held back when it shares a host with an already
+   * processed stage or belongs to a request that already has a stage
+   * being processed in the same scheduler pass.
+   */
+  @Test
+  public void testIndependentStagesExecution() throws Exception {
+    ActionQueue aq = new ActionQueue();
+    Clusters fsm = mock(Clusters.class);
+    Cluster oneClusterMock = mock(Cluster.class);
+    Service serviceObj = mock(Service.class);
+    ServiceComponent scomp = mock(ServiceComponent.class);
+    ServiceComponentHost sch = mock(ServiceComponentHost.class);
+    UnitOfWork unitOfWork = mock(UnitOfWork.class);
+    when(fsm.getCluster(anyString())).thenReturn(oneClusterMock);
+    when(oneClusterMock.getService(anyString())).thenReturn(serviceObj);
+    when(serviceObj.getServiceComponent(anyString())).thenReturn(scomp);
+    when(scomp.getServiceComponentHost(anyString())).thenReturn(sch);
+    when(serviceObj.getCluster()).thenReturn(oneClusterMock);
+
+    String hostname1 = "ahost.ambari.apache.org";
+    String hostname2 = "bhost.ambari.apache.org";
+    String hostname3 = "chost.ambari.apache.org";
+    // Must differ from hostname3: the last stage has to be held back by the
+    // same-request rule alone, not by a host conflict
+    String hostname4 = "dhost.ambari.apache.org";
+    List<Stage> stages = new ArrayList<Stage>();
+    stages.add(
+        getStageWithSingleTask(
+            hostname1, "cluster1", Role.DATANODE,
+            RoleCommand.START, Service.Type.HDFS, 1, 1, 1));
+    stages.add( // Stage with the same hostname, should not be scheduled
+        getStageWithSingleTask(
+            hostname1, "cluster1", Role.GANGLIA_MONITOR,
+            RoleCommand.START, Service.Type.GANGLIA, 2, 2, 2));
+
+    stages.add(
+        getStageWithSingleTask(
+            hostname2, "cluster1", Role.DATANODE,
+            RoleCommand.START, Service.Type.HDFS, 3, 3, 3));
+
+    stages.add(
+        getStageWithSingleTask(
+            hostname3, "cluster1", Role.DATANODE,
+            RoleCommand.START, Service.Type.HDFS, 4, 4, 4));
+
+    stages.add( // Stage with the same request id (on a free host), should not be scheduled
+        getStageWithSingleTask(
+            hostname4, "cluster1", Role.GANGLIA_MONITOR,
+            RoleCommand.START, Service.Type.GANGLIA, 5, 5, 4));
+
+    ActionDBAccessor db = mock(ActionDBAccessor.class);
+    when(db.getStagesInProgress()).thenReturn(stages);
+
+    Properties properties = new Properties();
+    Configuration conf = new Configuration(properties);
+    ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
+        new HostsMap((String) null), new ServerActionManagerImpl(fsm),
+        unitOfWork, conf);
+
+    scheduler.doWork();
+
+    Assert.assertEquals(HostRoleStatus.QUEUED, stages.get(0).getHostRoleStatus(hostname1, "DATANODE"));
+    Assert.assertEquals(HostRoleStatus.PENDING, stages.get(1).getHostRoleStatus(hostname1, "GANGLIA_MONITOR"));
+    Assert.assertEquals(HostRoleStatus.QUEUED, stages.get(2).getHostRoleStatus(hostname2, "DATANODE"));
+    Assert.assertEquals(HostRoleStatus.QUEUED, stages.get(3).getHostRoleStatus(hostname3, "DATANODE"));
+    Assert.assertEquals(HostRoleStatus.PENDING, stages.get(4).getHostRoleStatus(hostname4, "GANGLIA_MONITOR"));
+  }
+
+
+  /**
+   * Verifies that ActionScheduler respects the "disable parallel stage
+   * execution" option: with {@link Configuration#PARALLEL_STAGE_EXECUTION_KEY}
+   * set to "false", only the first in-progress stage is processed, even when
+   * later stages belong to other requests and run on other hosts.
+   */
+  @Test
+  public void testIndependentStagesExecutionDisabled() throws Exception {
+    ActionQueue aq = new ActionQueue();
+    Clusters fsm = mock(Clusters.class);
+    Cluster oneClusterMock = mock(Cluster.class);
+    Service serviceObj = mock(Service.class);
+    ServiceComponent scomp = mock(ServiceComponent.class);
+    ServiceComponentHost sch = mock(ServiceComponentHost.class);
+    UnitOfWork unitOfWork = mock(UnitOfWork.class);
+    when(fsm.getCluster(anyString())).thenReturn(oneClusterMock);
+    when(oneClusterMock.getService(anyString())).thenReturn(serviceObj);
+    when(serviceObj.getServiceComponent(anyString())).thenReturn(scomp);
+    when(scomp.getServiceComponentHost(anyString())).thenReturn(sch);
+    when(serviceObj.getCluster()).thenReturn(oneClusterMock);
+
+    String hostname1 = "ahost.ambari.apache.org";
+    String hostname2 = "bhost.ambari.apache.org";
+    String hostname3 = "chost.ambari.apache.org";
+    String hostname4 = "dhost.ambari.apache.org";
+    List<Stage> stages = new ArrayList<Stage>();
+    stages.add( // The only stage processed while parallel execution is disabled
+        getStageWithSingleTask(
+            hostname1, "cluster1", Role.DATANODE,
+            RoleCommand.START, Service.Type.HDFS, 1, 1, 1));
+    stages.add( // Not scheduled: parallel execution is disabled (also shares hostname1)
+        getStageWithSingleTask(
+            hostname1, "cluster1", Role.GANGLIA_MONITOR,
+            RoleCommand.START, Service.Type.GANGLIA, 2, 2, 2));
+
+    stages.add( // Not scheduled: parallel execution is disabled
+        getStageWithSingleTask(
+            hostname2, "cluster1", Role.DATANODE,
+            RoleCommand.START, Service.Type.HDFS, 3, 3, 3));
+
+    stages.add( // Not scheduled: parallel execution is disabled
+        getStageWithSingleTask(
+            hostname3, "cluster1", Role.DATANODE,
+            RoleCommand.START, Service.Type.HDFS, 4, 4, 4));
+
+    stages.add( // Not scheduled: parallel execution is disabled (also same request id as previous stage)
+        getStageWithSingleTask(
+            hostname4, "cluster1", Role.GANGLIA_MONITOR,
+            RoleCommand.START, Service.Type.GANGLIA, 5, 5, 4));
+
+    ActionDBAccessor db = mock(ActionDBAccessor.class);
+    when(db.getStagesInProgress()).thenReturn(stages);
+
+    Properties properties = new Properties();
+    properties.put(Configuration.PARALLEL_STAGE_EXECUTION_KEY, "false");
+    Configuration conf = new Configuration(properties);
+    ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
+        new HostsMap((String) null), new ServerActionManagerImpl(fsm),
+        unitOfWork, conf);
+
+    scheduler.doWork();
+
+    Assert.assertEquals(HostRoleStatus.QUEUED, stages.get(0).getHostRoleStatus(hostname1, "DATANODE"));
+    Assert.assertEquals(HostRoleStatus.PENDING, stages.get(1).getHostRoleStatus(hostname1, "GANGLIA_MONITOR"));
+    Assert.assertEquals(HostRoleStatus.PENDING, stages.get(2).getHostRoleStatus(hostname2, "DATANODE"));
+    Assert.assertEquals(HostRoleStatus.PENDING, stages.get(3).getHostRoleStatus(hostname3, "DATANODE"));
+    Assert.assertEquals(HostRoleStatus.PENDING, stages.get(4).getHostRoleStatus(hostname4, "GANGLIA_MONITOR"));
+  }
+
+
@Test
public void testRequestFailureOnStageFailure() throws Exception {
ActionQueue aq = new ActionQueue();
@@ -478,11 +637,13 @@ public class TestActionScheduler {
}
}).when(db).abortOperation(anyLong());
-
+ Properties properties = new Properties();
+ Configuration conf = new Configuration(properties);
ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
- new HostsMap((String) null), new ServerActionManagerImpl(fsm), unitOfWork);
+ new HostsMap((String) null), new ServerActionManagerImpl(fsm),
+ unitOfWork, conf);
ActionManager am = new ActionManager(
- 2, 2, aq, fsm, db, new HostsMap((String) null), new ServerActionManagerImpl(fsm), unitOfWork, null, requestFactory);
+ 2, 2, aq, fsm, db, new HostsMap((String) null), new ServerActionManagerImpl(fsm), unitOfWork, null, requestFactory, conf);
scheduler.doWork();
@@ -623,10 +784,13 @@ public class TestActionScheduler {
}
}).when(db).abortOperation(anyLong());
+ Properties properties = new Properties();
+ Configuration conf = new Configuration(properties);
ActionScheduler scheduler = new ActionScheduler(100, 10000, db, aq, fsm, 3,
- new HostsMap((String) null), new ServerActionManagerImpl(fsm), unitOfWork);
+ new HostsMap((String) null), new ServerActionManagerImpl(fsm),
+ unitOfWork, conf);
ActionManager am = new ActionManager(
- 2, 10000, aq, fsm, db, new HostsMap((String) null), new ServerActionManagerImpl(fsm), unitOfWork, null, requestFactory);
+ 2, 10000, aq, fsm, db, new HostsMap((String) null), new ServerActionManagerImpl(fsm), unitOfWork, null, requestFactory, conf);
scheduler.doWork();
@@ -792,10 +956,13 @@ public class TestActionScheduler {
}
}).when(db).abortOperation(anyLong());
+ Properties properties = new Properties();
+ Configuration conf = new Configuration(properties);
ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
- new HostsMap((String) null), new ServerActionManagerImpl(fsm), unitOfWork);
+ new HostsMap((String) null),
+ new ServerActionManagerImpl(fsm), unitOfWork, conf);
ActionManager am = new ActionManager(
- 2, 2, aq, fsm, db, new HostsMap((String) null), new ServerActionManagerImpl(fsm), unitOfWork, null, requestFactory);
+ 2, 2, aq, fsm, db, new HostsMap((String) null), new ServerActionManagerImpl(fsm), unitOfWork, null, requestFactory, conf);
scheduler.doWork();
@@ -893,6 +1060,8 @@ public class TestActionScheduler {
int requestId2 = 2;
ActionQueue aq = new ActionQueue();
+ Properties properties = new Properties();
+ Configuration conf = new Configuration(properties);
Clusters fsm = mock(Clusters.class);
Cluster oneClusterMock = mock(Cluster.class);
Service serviceObj = mock(Service.class);
@@ -919,7 +1088,7 @@ public class TestActionScheduler {
//Keep large number of attempts so that the task is not expired finally
//Small action timeout to test rescheduling
ActionScheduler scheduler = new ActionScheduler(100, 100, db, aq, fsm,
- 10000, new HostsMap((String) null), null, unitOfWork);
+ 10000, new HostsMap((String) null), null, unitOfWork, conf);
scheduler.setTaskTimeoutAdjustment(false);
// Start the thread
scheduler.start();
http://git-wip-us.apache.org/repos/asf/ambari/blob/af56baa0/ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java b/ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java
index 4e716dc..7b07062 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java
@@ -413,7 +413,7 @@ public class TestHeartbeatHandler {
clusters.addCluster(DummyCluster);
ActionDBAccessor db = injector.getInstance(ActionDBAccessorImpl.class);
ActionManager am = new ActionManager(5000, 1200000, new ActionQueue(), clusters, db,
- new HostsMap((String) null), null, unitOfWork, null, injector.getInstance(RequestFactory.class));
+ new HostsMap((String) null), null, unitOfWork, null, injector.getInstance(RequestFactory.class), null);
populateActionDB(db, DummyHostname1);
Stage stage = db.getAllStages(requestId).get(0);
Assert.assertEquals(stageId, stage.getStageId());
@@ -1511,7 +1511,7 @@ public class TestHeartbeatHandler {
private ActionManager getMockActionManager() {
return new ActionManager(0, 0, null, null,
- actionDBAccessor, new HostsMap((String) null), null, unitOfWork, null, injector.getInstance(RequestFactory.class));
+ actionDBAccessor, new HostsMap((String) null), null, unitOfWork, null, injector.getInstance(RequestFactory.class), null);
}