You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by sw...@apache.org on 2014/02/12 03:48:17 UTC
git commit: AMBARI-4621. Host Components stuck in Installing state. (swagle)
Updated Branches:
refs/heads/trunk d0b03b5f5 -> 491313662
AMBARI-4621. Host Components stuck in Installing state. (swagle)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/49131366
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/49131366
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/49131366
Branch: refs/heads/trunk
Commit: 49131366201e701efaa96af3882fe770fb03dd74
Parents: d0b03b5
Author: Siddharth Wagle <sw...@hortonworks.com>
Authored: Tue Feb 11 17:08:55 2014 -0800
Committer: Siddharth Wagle <sw...@hortonworks.com>
Committed: Tue Feb 11 18:48:12 2014 -0800
----------------------------------------------------------------------
.../server/actionmanager/ActionScheduler.java | 91 ++++++++---
.../svccomphost/ServiceComponentHostImpl.java | 9 ++
.../actionmanager/TestActionScheduler.java | 162 ++++++++++++++++++-
3 files changed, 235 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/49131366/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
index 601930d..f5f16d7 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
@@ -228,7 +228,7 @@ class ActionScheduler implements Runnable {
if (failed) {
LOG.warn("Operation completely failed, aborting request id:"
+ s.getRequestId());
- db.abortOperation(s.getRequestId());
+ abortOperationsForStage(s);
return;
}
@@ -394,26 +394,10 @@ class ActionScheduler implements Runnable {
db.timeoutHostRole(host, s.getRequestId(), s.getStageId(), c.getRole());
//Reinitialize status
status = s.getHostRoleStatus(host, roleStr);
- ServiceComponentHostOpFailedEvent timeoutEvent =
- new ServiceComponentHostOpFailedEvent(roleStr,
- host, now);
- try {
- Service svc = cluster.getService(c.getServiceName());
- ServiceComponent svcComp = svc.getServiceComponent(
- roleStr);
- ServiceComponentHost svcCompHost =
- svcComp.getServiceComponentHost(host);
- svcCompHost.handleEvent(timeoutEvent);
- LOG.warn("Operation timed out. Role: " + roleStr + ", host: " + host);
- } catch (ServiceComponentNotFoundException scnex) {
- LOG.debug(roleStr + " associated with service " + c.getServiceName() +
- " is not a service component, assuming it's an action.");
- } catch (InvalidStateTransitionException e) {
- LOG.info("Transition failed for host: " + host + ", role: "
- + roleStr, e);
- } catch (AmbariException ex) {
- LOG.warn("Invalid live state", ex);
- }
+ transitionToFailedState(cluster.getClusterName(),
+ c.getServiceName(), roleStr, host, now, false);
+ LOG.warn("Operation timed out. Role: " + roleStr + ", host: " + host);
+
// Dequeue command
actionQueue.dequeue(host, c.getCommandId());
} else {
@@ -430,6 +414,67 @@ class ActionScheduler implements Runnable {
return roleStats;
}
+  /**
+   * Raises an op-failed event for every command of the stage, then aborts
+   * the request the stage belongs to.
+   * @param stage stage whose commands are failed before the abort
+   */
+  private void abortOperationsForStage(Stage stage) {
+    final long failureTime = System.currentTimeMillis();
+
+    for (String host : stage.getHosts()) {
+      for (ExecutionCommandWrapper wrapped : stage.getExecutionCommands(host)) {
+        ExecutionCommand cmd = wrapped.getExecutionCommand();
+        // true: an invalid state transition is only logged at debug level.
+        transitionToFailedState(stage.getClusterName(),
+            cmd.getServiceName(), cmd.getRole(), host, failureTime, true);
+      }
+    }
+
+    // Abort the request only after all host components were notified.
+    db.abortOperation(stage.getRequestId());
+  }
+
+  /**
+   * Raises an op-failed event on a service component host (SCH).
+   * @param clusterName cluster the component belongs to
+   * @param serviceName service the component belongs to
+   * @param componentName component (role) the event is raised for
+   * @param hostname host the component is mapped to
+   * @param timestamp time stamp passed to the op-failed event
+   * @param ignoreTransitionException log invalid transitions at debug, not warn
+   */
+  private void transitionToFailedState(String clusterName, String serviceName,
+                                       String componentName, String hostname,
+                                       long timestamp,
+                                       boolean ignoreTransitionException) {
+    // Name the host and role in every failure message so it can be traced.
+    String failureMsg = "Unable to transition to failed state. Host: "
+        + hostname + ", role: " + componentName;
+    try {
+      Cluster cluster = fsmObject.getCluster(clusterName);
+      ServiceComponentHostOpFailedEvent failedEvent =
+          new ServiceComponentHostOpFailedEvent(componentName,
+              hostname, timestamp);
+      Service svc = cluster.getService(serviceName);
+      ServiceComponent svcComp = svc.getServiceComponent(componentName);
+      ServiceComponentHost svcCompHost =
+          svcComp.getServiceComponentHost(hostname);
+      svcCompHost.handleEvent(failedEvent);
+    } catch (ServiceComponentNotFoundException scnex) {
+      LOG.debug(componentName + " associated with service " + serviceName +
+          " is not a service component, assuming it's an action.");
+    } catch (InvalidStateTransitionException e) {
+      if (ignoreTransitionException) {
+        LOG.debug(failureMsg, e);
+      } else {
+        LOG.warn(failureMsg, e);
+      }
+    } catch (AmbariException e) {
+      LOG.warn(failureMsg, e);
+    }
+  }
+
/**
* Populates a map < role_name, role_stats>.
@@ -472,8 +517,8 @@ class ActionScheduler implements Runnable {
LOG.debug("Timing out action since agent is not heartbeating.");
return true;
}
- if (currentTime > stage.getLastAttemptTime(host.getHostName(),
- role) + taskTimeout) {
+ if (currentTime > stage.getLastAttemptTime(host.getHostName(), role)
+ + taskTimeout) {
return true;
}
return false;
http://git-wip-us.apache.org/repos/asf/ambari/blob/49131366/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java
index b90027b..94ae38a 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java
@@ -156,6 +156,11 @@ public class ServiceComponentHostImpl implements ServiceComponentHost {
ServiceComponentHostEventType.HOST_SVCCOMP_INSTALL,
new ServiceComponentHostOpStartedTransition())
+ // Allow transition on abort
+ .addTransition(State.INSTALL_FAILED, State.INSTALL_FAILED,
+ ServiceComponentHostEventType.HOST_SVCCOMP_OP_FAILED,
+ new ServiceComponentHostOpCompletedTransition())
+
.addTransition(State.INSTALLED,
State.STARTING,
ServiceComponentHostEventType.HOST_SVCCOMP_START,
@@ -387,6 +392,10 @@ public class ServiceComponentHostImpl implements ServiceComponentHost {
State.INSTALLING,
ServiceComponentHostEventType.HOST_SVCCOMP_INSTALL,
new ServiceComponentHostOpStartedTransition())
+ // Allow transition on abort
+ .addTransition(State.INSTALL_FAILED, State.INSTALL_FAILED,
+ ServiceComponentHostEventType.HOST_SVCCOMP_OP_FAILED,
+ new ServiceComponentHostOpCompletedTransition())
.addTransition(State.INSTALLED,
State.INSTALLED,
http://git-wip-us.apache.org/repos/asf/ambari/blob/49131366/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
index de6c98b..063213c 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
@@ -24,7 +24,6 @@ import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.*;
-
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.Collections;
@@ -33,12 +32,9 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
-
import com.google.common.reflect.TypeToken;
import com.google.inject.persist.UnitOfWork;
-
import junit.framework.Assert;
-
import org.apache.ambari.server.Role;
import org.apache.ambari.server.RoleCommand;
import org.apache.ambari.server.actionmanager.ActionScheduler.RoleStats;
@@ -57,10 +53,15 @@ import org.apache.ambari.server.state.HostState;
import org.apache.ambari.server.state.Service;
import org.apache.ambari.server.state.ServiceComponent;
import org.apache.ambari.server.state.ServiceComponentHost;
+import org.apache.ambari.server.state.ServiceComponentHostEvent;
+import org.apache.ambari.server.state.ServiceComponentHostEventType;
import org.apache.ambari.server.state.svccomphost.ServiceComponentHostInstallEvent;
+import org.apache.ambari.server.state.svccomphost.ServiceComponentHostOpFailedEvent;
import org.apache.ambari.server.state.svccomphost.ServiceComponentHostUpgradeEvent;
import org.apache.ambari.server.utils.StageUtils;
+import org.junit.Ignore;
import org.junit.Test;
+import org.mockito.ArgumentCaptor;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
@@ -277,6 +278,141 @@ public class TestActionScheduler {
scheduler.stop();
}
+  /**
+   * A DATANODE install times out (heartbeat lost) and the NAMENODE install is
+   * aborted with the request; both components must get an op-failed event.
+   */
+  @Test
+  public void testOpFailedEventRaisedForAbortedHostRole() throws Exception {
+    ActionQueue aq = new ActionQueue();
+    Properties properties = new Properties();
+    Configuration conf = new Configuration(properties);
+    Clusters fsm = mock(Clusters.class);
+    Cluster oneClusterMock = mock(Cluster.class);
+    Service serviceObj = mock(Service.class);
+    ServiceComponent scomp = mock(ServiceComponent.class);
+    ServiceComponentHost sch1 = mock(ServiceComponentHost.class);
+    ServiceComponentHost sch2 = mock(ServiceComponentHost.class);
+    String hostname1 = "host1";
+    String hostname2 = "host2";
+    Host host1 = mock(Host.class);
+    Host host2 = mock(Host.class);
+    when(fsm.getHost(hostname1)).thenReturn(host1);
+    when(fsm.getHost(hostname2)).thenReturn(host2);
+    when(host1.getState()).thenReturn(HostState.HEARTBEAT_LOST);
+    when(host2.getState()).thenReturn(HostState.HEALTHY);
+    when(host1.getHostName()).thenReturn(hostname1);
+    when(host2.getHostName()).thenReturn(hostname2);
+    when(scomp.getServiceComponentHost(hostname1)).thenReturn(sch1);
+    when(scomp.getServiceComponentHost(hostname2)).thenReturn(sch2);
+
+    UnitOfWork unitOfWork = mock(UnitOfWork.class);
+    when(fsm.getCluster(anyString())).thenReturn(oneClusterMock);
+    when(oneClusterMock.getService(anyString())).thenReturn(serviceObj);
+    when(serviceObj.getServiceComponent(anyString())).thenReturn(scomp);
+    when(serviceObj.getCluster()).thenReturn(oneClusterMock);
+
+    final List<Stage> stages = new ArrayList<Stage>();
+    Stage stage = new Stage(1, "/tmp", "cluster1", "stageWith2Tasks",
+      CLUSTER_HOST_INFO);
+    addInstallTaskToStage(stage, hostname1, "cluster1", Role.DATANODE,
+      RoleCommand.INSTALL, Service.Type.HDFS, 1);
+    addInstallTaskToStage(stage, hostname2, "cluster1", Role.NAMENODE,
+      RoleCommand.INSTALL, Service.Type.HDFS, 2);
+    stages.add(stage);
+
+    ActionDBAccessor db = mock(ActionDBAccessor.class);
+    when(db.getStagesInProgress()).thenReturn(stages);
+
+    // Emulate the DB: mark the matching command of the stage as timed out.
+    doAnswer(new Answer<Object>() {
+      @Override
+      public Object answer(InvocationOnMock invocation) throws Throwable {
+        String host = (String) invocation.getArguments()[0];
+        String role = (String) invocation.getArguments()[3];
+        for (HostRoleCommand command : stages.get(0).getOrderedHostRoleCommands()) {
+          if (command.getHostName().equals(host) && command.getRole().name()
+            .equals(role)) {
+            command.setStatus(HostRoleStatus.TIMEDOUT);
+          }
+        }
+        return null;
+      }
+    }).when(db).timeoutHostRole(anyString(), anyLong(), anyLong(), anyString());
+
+    // Emulate the DB: abort queued, in-progress and pending commands.
+    doAnswer(new Answer<Object>() {
+      @Override
+      public Object answer(InvocationOnMock invocation) throws Throwable {
+        Long requestId = (Long) invocation.getArguments()[0];
+        for (Stage s : stages) {
+          if (requestId.equals(s.getRequestId())) {
+            for (HostRoleCommand command : s.getOrderedHostRoleCommands()) {
+              if (command.getStatus() == HostRoleStatus.QUEUED ||
+                command.getStatus() == HostRoleStatus.IN_PROGRESS ||
+                command.getStatus() == HostRoleStatus.PENDING) {
+                command.setStatus(HostRoleStatus.ABORTED);
+              }
+            }
+          }
+        }
+        return null;
+      }
+    }).when(db).abortOperation(anyLong());
+
+    ArgumentCaptor<ServiceComponentHostEvent> eventsCapture1 =
+      ArgumentCaptor.forClass(ServiceComponentHostEvent.class);
+    ArgumentCaptor<ServiceComponentHostEvent> eventsCapture2 =
+      ArgumentCaptor.forClass(ServiceComponentHostEvent.class);
+
+    // Make sure the NN install doesn't timeout
+    ActionScheduler scheduler = new ActionScheduler(100, 50000, db, aq, fsm, 3,
+      new HostsMap((String) null), null, unitOfWork, conf);
+    scheduler.setTaskTimeoutAdjustment(false);
+    // Start the thread
+    scheduler.start();
+
+    // Wait until BOTH terminal states are reached; waiting for only one of
+    // them races with the scheduler and makes the assertions below flaky.
+    while (!stages.get(0).getHostRoleStatus(hostname1, "DATANODE")
+      .equals(HostRoleStatus.TIMEDOUT) || !stages.get(0).getHostRoleStatus
+      (hostname2, "NAMENODE").equals(HostRoleStatus.ABORTED)) {
+      Thread.sleep(100L);
+    }
+
+    Assert.assertEquals(HostRoleStatus.TIMEDOUT,
+      stages.get(0).getHostRoleStatus(hostname1, "DATANODE"));
+    Assert.assertEquals(HostRoleStatus.ABORTED,
+      stages.get(0).getHostRoleStatus(hostname2, "NAMENODE"));
+
+    verify(sch1, atLeastOnce()).handleEvent(eventsCapture1.capture());
+    verify(sch2, atLeastOnce()).handleEvent(eventsCapture2.capture());
+    List<ServiceComponentHostEvent> eventTypes = eventsCapture1.getAllValues();
+    eventTypes.addAll(eventsCapture2.getAllValues());
+
+    ServiceComponentHostOpFailedEvent datanodeFailedEvent = null;
+    ServiceComponentHostOpFailedEvent namenodeFailedEvent = null;
+
+    for (ServiceComponentHostEvent eventType : eventTypes) {
+      if (eventType instanceof ServiceComponentHostOpFailedEvent) {
+        ServiceComponentHostOpFailedEvent event =
+          (ServiceComponentHostOpFailedEvent) eventType;
+        if (event.getServiceComponentName().equals("DATANODE")) {
+          datanodeFailedEvent = event;
+        } else if (event.getServiceComponentName().equals("NAMENODE")) {
+          namenodeFailedEvent = event;
+        }
+      }
+    }
+
+    Assert.assertNotNull("Datanode should be in Install failed state.",
+      datanodeFailedEvent);
+    Assert.assertNotNull("Namenode should be in Install failed state.",
+      namenodeFailedEvent);
+
+    scheduler.stop();
+  }
+
/**
* Test server action
*/
@@ -1027,6 +1163,24 @@ public class TestActionScheduler {
return stage;
}
+  /** Adds a command for the role on the host and sets its task id. */
+  private void addInstallTaskToStage(Stage stage, String hostname,
+      String clusterName, Role role, RoleCommand roleCommand,
+      Service.Type service, int taskId) {
+    ServiceComponentHostInstallEvent installEvent =
+        new ServiceComponentHostInstallEvent(role.toString(), hostname,
+            System.currentTimeMillis(), "HDP-0.2");
+    stage.addHostRoleExecutionCommand(hostname, role, roleCommand,
+        installEvent, clusterName, service.toString());
+    stage.getExecutionCommandWrapper(hostname, role.toString())
+        .getExecutionCommand().setTaskId(taskId);
+    // The host role command of the same host and role gets the same id.
+    for (HostRoleCommand hrc : stage.getOrderedHostRoleCommands()) {
+      if (hrc.getHostName().equals(hostname) && hrc.getRole().equals(role)) {
+        hrc.setTaskId(taskId);
+      }
+    }
+  }
+
@Test
public void testSuccessFactors() {
Stage s = StageUtils.getATestStage(1, 1, CLUSTER_HOST_INFO);