You are viewing a plain text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@ambari.apache.org by sw...@apache.org on 2013/03/19 18:22:31 UTC
svn commit: r1458415 - in /incubator/ambari/trunk: ./
ambari-server/src/main/java/org/apache/ambari/server/actionmanager/
ambari-server/src/test/java/org/apache/ambari/server/actionmanager/
Author: swagle
Date: Tue Mar 19 17:22:31 2013
New Revision: 1458415
URL: http://svn.apache.org/r1458415
Log:
AMBARI-1660. Server seems to ignore failures if the prior stage has failed before the next iteration of the scheduler. (Sumit Mohanty via swagle)
Modified:
incubator/ambari/trunk/CHANGES.txt
incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/HostRoleStatus.java
incubator/ambari/trunk/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
Modified: incubator/ambari/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/CHANGES.txt?rev=1458415&r1=1458414&r2=1458415&view=diff
==============================================================================
--- incubator/ambari/trunk/CHANGES.txt (original)
+++ incubator/ambari/trunk/CHANGES.txt Tue Mar 19 17:22:31 2013
@@ -500,6 +500,9 @@ Trunk (unreleased changes):
BUG FIXES
+ AMBARI-1660. Server seems to ignore failures if the prior stage has failed
+ before the next iteration of the scheduler. (Sumit Mohanty via swagle)
+
AMBARI-1657. User directories on HDFS do not get created with custom names
provided from Ambari UI. (swagle)
Modified: incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java?rev=1458415&r1=1458414&r2=1458415&view=diff
==============================================================================
--- incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java (original)
+++ incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java Tue Mar 19 17:22:31 2013
@@ -119,7 +119,7 @@ class ActionScheduler implements Runnabl
}
}
- private void doWork() throws AmbariException {
+ public void doWork() throws AmbariException {
List<Stage> stages = db.getStagesInProgress();
if (LOG.isDebugEnabled()) {
LOG.debug("Scheduler wakes up");
@@ -147,6 +147,12 @@ class ActionScheduler implements Runnabl
break;
}
}
+
+ if(!failed) {
+ // Prior stage may have failed and it may need to fail the whole request
+ failed = hasPreviousStageFailed(s);
+ }
+
if (failed) {
LOG.warn("Operation completely failed, aborting request id:"
+ s.getRequestId());
@@ -194,6 +200,53 @@ class ActionScheduler implements Runnabl
}
}
+ private boolean hasPreviousStageFailed(Stage stage) {
+ boolean failed = false;
+ long prevStageId = stage.getStageId() - 1;
+ if (prevStageId > 0) {
+ List<Stage> allStages = db.getAllStages(stage.getRequestId());
+ Stage prevStage = null;
+ for (Stage s : allStages) {
+ if (s.getStageId() == prevStageId) {
+ prevStage = s;
+ break;
+ }
+ }
+
+ //It may be null for test scenarios
+ if(prevStage != null) {
+ Map<Role, Integer> hostCountsForRoles = new HashMap<Role, Integer>();
+ Map<Role, Integer> failedHostCountsForRoles = new HashMap<Role, Integer>();
+
+ for (String host : prevStage.getHostRoleCommands().keySet()) {
+ Map<String, HostRoleCommand> roleCommandMap = prevStage.getHostRoleCommands().get(host);
+ for (String role : roleCommandMap.keySet()) {
+ HostRoleCommand c = roleCommandMap.get(role);
+ if (hostCountsForRoles.get(c.getRole()) == null) {
+ hostCountsForRoles.put(c.getRole(), 0);
+ failedHostCountsForRoles.put(c.getRole(), 0);
+ }
+ int hostCount = hostCountsForRoles.get(c.getRole());
+ hostCountsForRoles.put(c.getRole(), hostCount + 1);
+ if (c.getStatus().isFailedState()) {
+ int failedHostCount = failedHostCountsForRoles.get(c.getRole());
+ failedHostCountsForRoles.put(c.getRole(), failedHostCount + 1);
+ }
+ }
+ }
+
+ for (Role role : hostCountsForRoles.keySet()) {
+ float failedHosts = failedHostCountsForRoles.get(role);
+ float totalHosts = hostCountsForRoles.get(role);
+ if (((totalHosts - failedHosts) / totalHosts) < prevStage.getSuccessFactor(role)) {
+ failed = true;
+ }
+ }
+ }
+ }
+ return failed;
+ }
+
private void reportServerActionSuccess(Stage stage, ExecutionCommand cmd) {
CommandReport report = new CommandReport();
report.setStatus(HostRoleStatus.COMPLETED.toString());
Modified: incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/HostRoleStatus.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/HostRoleStatus.java?rev=1458415&r1=1458414&r2=1458415&view=diff
==============================================================================
--- incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/HostRoleStatus.java (original)
+++ incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/HostRoleStatus.java Tue Mar 19 17:22:31 2013
@@ -18,11 +18,32 @@
package org.apache.ambari.server.actionmanager;
public enum HostRoleStatus {
- PENDING, //Not queued for a host
- QUEUED, //Queued for a host
- IN_PROGRESS, //Host reported it is working
- COMPLETED, //Host reported success
- FAILED, //Failed
- TIMEDOUT, //Host did not respond in time
- ABORTED //Operation was abandoned
+ PENDING(0), //Not queued for a host
+ QUEUED(1), //Queued for a host
+ IN_PROGRESS(2), //Host reported it is working
+ COMPLETED(3), //Host reported success
+ FAILED(4), //Failed
+ TIMEDOUT(5), //Host did not respond in time
+ ABORTED(6); //Operation was abandoned
+ private final int status;
+
+ private HostRoleStatus(int status) {
+ this.status = status;
+ }
+
+ /**
+ * Indicates whether or not it is a valid failure state.
+ *
+ * @return true if this is a valid failure state.
+ */
+ public boolean isFailedState() {
+ switch (HostRoleStatus.values()[this.status]) {
+ case FAILED:
+ case TIMEDOUT:
+ case ABORTED:
+ return true;
+ default:
+ return false;
+ }
+ }
}
Modified: incubator/ambari/trunk/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java?rev=1458415&r1=1458414&r2=1458415&view=diff
==============================================================================
--- incubator/ambari/trunk/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java (original)
+++ incubator/ambari/trunk/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java Tue Mar 19 17:22:31 2013
@@ -36,6 +36,7 @@ import org.apache.ambari.server.RoleComm
import org.apache.ambari.server.actionmanager.ActionScheduler.RoleStats;
import org.apache.ambari.server.agent.ActionQueue;
import org.apache.ambari.server.agent.AgentCommand;
+import org.apache.ambari.server.agent.CommandReport;
import org.apache.ambari.server.agent.ExecutionCommand;
import org.apache.ambari.server.controller.HostsMap;
import org.apache.ambari.server.serveraction.ServerAction;
@@ -231,6 +232,145 @@ public class TestActionScheduler {
return stage;
}
+ @Test
+ public void testRequestFailureOnStageFailure() throws Exception {
+ ActionQueue aq = new ActionQueue();
+ Clusters fsm = mock(Clusters.class);
+ Cluster oneClusterMock = mock(Cluster.class);
+ Service serviceObj = mock(Service.class);
+ ServiceComponent scomp = mock(ServiceComponent.class);
+ ServiceComponentHost sch = mock(ServiceComponentHost.class);
+ when(fsm.getCluster(anyString())).thenReturn(oneClusterMock);
+ when(oneClusterMock.getService(anyString())).thenReturn(serviceObj);
+ when(serviceObj.getServiceComponent(anyString())).thenReturn(scomp);
+ when(scomp.getServiceComponentHost(anyString())).thenReturn(sch);
+ when(serviceObj.getCluster()).thenReturn(oneClusterMock);
+
+ ActionDBAccessor db = new ActionDBInMemoryImpl();
+ String hostname = "ahost.ambari.apache.org";
+ List<Stage> stages = new ArrayList<Stage>();
+ stages.add(
+ getStageWithSingleTask(
+ hostname, "cluster1", Role.NAMENODE, RoleCommand.UPGRADE, Service.Type.HDFS, 1, 1, 1));
+ stages.add(
+ getStageWithSingleTask(
+ hostname, "cluster1", Role.DATANODE, RoleCommand.UPGRADE, Service.Type.HDFS, 2, 2, 1));
+ db.persistActions(stages);
+
+ ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
+ new HostsMap((String) null), new ServerActionManagerImpl(fsm));
+ ActionManager am = new ActionManager(
+ 2, 2, aq, fsm, db, new HostsMap((String) null), new ServerActionManagerImpl(fsm));
+
+ scheduler.doWork();
+
+ List<CommandReport> reports = new ArrayList<CommandReport>();
+ reports.add(getCommandReport(HostRoleStatus.FAILED, Role.NAMENODE, Service.Type.HDFS, "1-1", 1));
+ am.processTaskResponse(hostname, reports);
+
+ scheduler.doWork();
+ Assert.assertEquals(HostRoleStatus.FAILED, stages.get(0).getHostRoleStatus(hostname, "NAMENODE"));
+ Assert.assertEquals(HostRoleStatus.ABORTED, stages.get(1).getHostRoleStatus(hostname, "DATANODE"));
+ }
+
+ @Test
+ public void testRequestFailureBasedOnSuccessFactor() throws Exception {
+ ActionQueue aq = new ActionQueue();
+ Clusters fsm = mock(Clusters.class);
+ Cluster oneClusterMock = mock(Cluster.class);
+ Service serviceObj = mock(Service.class);
+ ServiceComponent scomp = mock(ServiceComponent.class);
+ ServiceComponentHost sch = mock(ServiceComponentHost.class);
+ when(fsm.getCluster(anyString())).thenReturn(oneClusterMock);
+ when(oneClusterMock.getService(anyString())).thenReturn(serviceObj);
+ when(serviceObj.getServiceComponent(anyString())).thenReturn(scomp);
+ when(scomp.getServiceComponentHost(anyString())).thenReturn(sch);
+ when(serviceObj.getCluster()).thenReturn(oneClusterMock);
+
+ ActionDBAccessor db = new ActionDBInMemoryImpl();
+ List<Stage> stages = new ArrayList<Stage>();
+
+ long now = System.currentTimeMillis();
+ Stage stage = new Stage(1, "/tmp", "cluster1");
+ stage.setStageId(1);
+ stage.addHostRoleExecutionCommand("host1", Role.DATANODE, RoleCommand.UPGRADE,
+ new ServiceComponentHostUpgradeEvent(Role.DATANODE.toString(), "host1", now, "HDP-0.2"),
+ "cluster1", Service.Type.HDFS.toString());
+ stage.getExecutionCommandWrapper("host1",
+ Role.DATANODE.toString()).getExecutionCommand();
+
+ stage.addHostRoleExecutionCommand("host2", Role.DATANODE, RoleCommand.UPGRADE,
+ new ServiceComponentHostUpgradeEvent(Role.DATANODE.toString(), "host2", now, "HDP-0.2"),
+ "cluster1", Service.Type.HDFS.toString());
+ stage.getExecutionCommandWrapper("host2",
+ Role.DATANODE.toString()).getExecutionCommand();
+
+ stage.addHostRoleExecutionCommand("host3", Role.DATANODE, RoleCommand.UPGRADE,
+ new ServiceComponentHostUpgradeEvent(Role.DATANODE.toString(), "host3", now, "HDP-0.2"),
+ "cluster1", Service.Type.HDFS.toString());
+ stage.getExecutionCommandWrapper("host3",
+ Role.DATANODE.toString()).getExecutionCommand();
+ stages.add(stage);
+
+ stage.getOrderedHostRoleCommands().get(0).setTaskId(1);
+ stage.getOrderedHostRoleCommands().get(1).setTaskId(2);
+ stage.getOrderedHostRoleCommands().get(2).setTaskId(3);
+
+ stages.add(
+ getStageWithSingleTask(
+ "host1", "cluster1", Role.HDFS_CLIENT, RoleCommand.UPGRADE, Service.Type.HDFS, 4, 2, 1));
+ db.persistActions(stages);
+
+ ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
+ new HostsMap((String) null), new ServerActionManagerImpl(fsm));
+ ActionManager am = new ActionManager(
+ 2, 2, aq, fsm, db, new HostsMap((String) null), new ServerActionManagerImpl(fsm));
+
+ scheduler.doWork();
+
+ List<CommandReport> reports = new ArrayList<CommandReport>();
+ reports.add(getCommandReport(HostRoleStatus.FAILED, Role.DATANODE, Service.Type.HDFS, "1-1", 1));
+ am.processTaskResponse("host1", reports);
+
+ reports.clear();
+ reports.add(getCommandReport(HostRoleStatus.FAILED, Role.DATANODE, Service.Type.HDFS, "1-1", 2));
+ am.processTaskResponse("host2", reports);
+
+ reports.clear();
+ reports.add(getCommandReport(HostRoleStatus.COMPLETED, Role.DATANODE, Service.Type.HDFS, "1-1", 3));
+ am.processTaskResponse("host3", reports);
+
+ scheduler.doWork();
+ Assert.assertEquals(HostRoleStatus.ABORTED, stages.get(1).getHostRoleStatus("host1", "HDFS_CLIENT"));
+ }
+
+ private CommandReport getCommandReport(HostRoleStatus status, Role role, Service.Type service, String actionId,
+ int taskId) {
+ CommandReport report = new CommandReport();
+ report.setExitCode(999);
+ report.setStdErr("");
+ report.setStdOut("");
+ report.setStatus(status.toString());
+ report.setRole(role.toString());
+ report.setServiceName(service.toString());
+ report.setActionId(actionId);
+ report.setTaskId(taskId);
+ return report;
+ }
+
+ private Stage getStageWithSingleTask(String hostname, String clusterName, Role role,
+ RoleCommand roleCommand, Service.Type service, int taskId,
+ int stageId, int requestId) {
+ Stage stage = new Stage(requestId, "/tmp", clusterName);
+ stage.setStageId(stageId);
+ stage.addHostRoleExecutionCommand(hostname, role, roleCommand,
+ new ServiceComponentHostUpgradeEvent(role.toString(), hostname, System.currentTimeMillis(), "HDP-0.2"),
+ clusterName, service.toString());
+ stage.getExecutionCommandWrapper(hostname,
+ role.toString()).getExecutionCommand();
+ stage.getOrderedHostRoleCommands().get(0).setTaskId(taskId);
+ return stage;
+ }
@Test
public void testSuccessFactors() {