You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this text.)
Posted to commits@ambari.apache.org by sw...@apache.org on 2014/02/12 03:48:17 UTC

git commit: AMBARI-4621. Host Components stuck in Installing state. (swagle)

Updated Branches:
  refs/heads/trunk d0b03b5f5 -> 491313662


AMBARI-4621. Host Components stuck in Installing state. (swagle)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/49131366
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/49131366
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/49131366

Branch: refs/heads/trunk
Commit: 49131366201e701efaa96af3882fe770fb03dd74
Parents: d0b03b5
Author: Siddharth Wagle <sw...@hortonworks.com>
Authored: Tue Feb 11 17:08:55 2014 -0800
Committer: Siddharth Wagle <sw...@hortonworks.com>
Committed: Tue Feb 11 18:48:12 2014 -0800

----------------------------------------------------------------------
 .../server/actionmanager/ActionScheduler.java   |  91 ++++++++---
 .../svccomphost/ServiceComponentHostImpl.java   |   9 ++
 .../actionmanager/TestActionScheduler.java      | 162 ++++++++++++++++++-
 3 files changed, 235 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/49131366/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
index 601930d..f5f16d7 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
@@ -228,7 +228,7 @@ class ActionScheduler implements Runnable {
         if (failed) {
           LOG.warn("Operation completely failed, aborting request id:"
               + s.getRequestId());
-          db.abortOperation(s.getRequestId());
+          abortOperationsForStage(s);
           return;
         }
 
@@ -394,26 +394,10 @@ class ActionScheduler implements Runnable {
             db.timeoutHostRole(host, s.getRequestId(), s.getStageId(), c.getRole());
             //Reinitialize status
             status = s.getHostRoleStatus(host, roleStr);
-            ServiceComponentHostOpFailedEvent timeoutEvent =
-                new ServiceComponentHostOpFailedEvent(roleStr,
-                    host, now);
-            try {
-              Service svc = cluster.getService(c.getServiceName());
-              ServiceComponent svcComp = svc.getServiceComponent(
-                  roleStr);
-              ServiceComponentHost svcCompHost =
-                  svcComp.getServiceComponentHost(host);
-              svcCompHost.handleEvent(timeoutEvent);
-              LOG.warn("Operation timed out. Role: " + roleStr + ", host: " + host);
-            } catch (ServiceComponentNotFoundException scnex) {
-              LOG.debug(roleStr + " associated with service " + c.getServiceName() +
-                  " is not a service component, assuming it's an action.");
-            } catch (InvalidStateTransitionException e) {
-              LOG.info("Transition failed for host: " + host + ", role: "
-                  + roleStr, e);
-            } catch (AmbariException ex) {
-              LOG.warn("Invalid live state", ex);
-            }
+            transitionToFailedState(cluster.getClusterName(),
+              c.getServiceName(), roleStr, host, now, false);
+            LOG.warn("Operation timed out. Role: " + roleStr + ", host: " + host);
+
             // Dequeue command
             actionQueue.dequeue(host, c.getCommandId());
           } else {
@@ -430,6 +414,67 @@ class ActionScheduler implements Runnable {
     return roleStats;
   }
 
+  /**
+   * Raise an OPFailed event for each command in the stage, then abort the stage's whole request.
+   * @param stage
+   */
+  private void abortOperationsForStage(Stage stage) {
+    long now = System.currentTimeMillis();
+
+    for (String hostName : stage.getHosts()) {
+      List<ExecutionCommandWrapper> commandWrappers =
+        stage.getExecutionCommands(hostName);
+
+      for(ExecutionCommandWrapper wrapper : commandWrappers) {
+        ExecutionCommand c = wrapper.getExecutionCommand();
+        transitionToFailedState(stage.getClusterName(), c.getServiceName(),
+          c.getRole(), hostName, now, true);
+      }
+    }
+
+    db.abortOperation(stage.getRequestId());
+  }
+
+  /**
+   * Raise an OPFailed event for a SCH; if ignoreTransitionException, invalid transitions log at DEBUG, not WARN.
+   * @param clusterName
+   * @param serviceName
+   * @param componentName
+   * @param hostname
+   * @param timestamp
+   */
+  private void transitionToFailedState(String clusterName, String serviceName,
+                                       String componentName, String hostname,
+                                       long timestamp,
+                                       boolean ignoreTransitionException) {
+
+    try {
+      Cluster cluster = fsmObject.getCluster(clusterName);
+
+      ServiceComponentHostOpFailedEvent timeoutEvent =
+        new ServiceComponentHostOpFailedEvent(componentName,
+          hostname, timestamp);
+
+      Service svc = cluster.getService(serviceName);
+      ServiceComponent svcComp = svc.getServiceComponent(componentName);
+      ServiceComponentHost svcCompHost =
+        svcComp.getServiceComponentHost(hostname);
+      svcCompHost.handleEvent(timeoutEvent);
+
+    } catch (ServiceComponentNotFoundException scnex) {
+      LOG.debug(componentName + " associated with service " + serviceName +
+        " is not a service component, assuming it's an action.");
+    } catch (InvalidStateTransitionException e) {
+      if (ignoreTransitionException) {
+        LOG.debug("Unable to transition to failed state.", e);
+      } else {
+        LOG.warn("Unable to transition to failed state.", e);
+      }
+    } catch (AmbariException e) {
+      LOG.warn("Unable to transition to failed state.", e);
+    }
+  }
+
 
   /**
    * Populates a map < role_name, role_stats>.
@@ -472,8 +517,8 @@ class ActionScheduler implements Runnable {
       LOG.debug("Timing out action since agent is not heartbeating.");
       return true;
     }
-    if (currentTime > stage.getLastAttemptTime(host.getHostName(),
-      role) + taskTimeout) {
+    if (currentTime > stage.getLastAttemptTime(host.getHostName(), role)
+        + taskTimeout) {
       return true;
     }
     return false;

http://git-wip-us.apache.org/repos/asf/ambari/blob/49131366/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java
index b90027b..94ae38a 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java
@@ -156,6 +156,11 @@ public class ServiceComponentHostImpl implements ServiceComponentHost {
          ServiceComponentHostEventType.HOST_SVCCOMP_INSTALL,
          new ServiceComponentHostOpStartedTransition())
 
+       // Accept OP_FAILED while already INSTALL_FAILED (state unchanged), e.g. when a request is aborted
+     .addTransition(State.INSTALL_FAILED, State.INSTALL_FAILED,
+         ServiceComponentHostEventType.HOST_SVCCOMP_OP_FAILED,
+         new ServiceComponentHostOpCompletedTransition())
+
      .addTransition(State.INSTALLED,
          State.STARTING,
          ServiceComponentHostEventType.HOST_SVCCOMP_START,
@@ -387,6 +392,10 @@ public class ServiceComponentHostImpl implements ServiceComponentHost {
          State.INSTALLING,
          ServiceComponentHostEventType.HOST_SVCCOMP_INSTALL,
          new ServiceComponentHostOpStartedTransition())
+      // Accept OP_FAILED while already INSTALL_FAILED (state unchanged), e.g. when a request is aborted
+     .addTransition(State.INSTALL_FAILED, State.INSTALL_FAILED,
+         ServiceComponentHostEventType.HOST_SVCCOMP_OP_FAILED,
+         new ServiceComponentHostOpCompletedTransition())
 
     .addTransition(State.INSTALLED,
          State.INSTALLED,

http://git-wip-us.apache.org/repos/asf/ambari/blob/49131366/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
index de6c98b..063213c 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
@@ -24,7 +24,6 @@ import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyLong;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.*;
-
 import java.lang.reflect.Type;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -33,12 +32,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
-
 import com.google.common.reflect.TypeToken;
 import com.google.inject.persist.UnitOfWork;
-
 import junit.framework.Assert;
-
 import org.apache.ambari.server.Role;
 import org.apache.ambari.server.RoleCommand;
 import org.apache.ambari.server.actionmanager.ActionScheduler.RoleStats;
@@ -57,10 +53,15 @@ import org.apache.ambari.server.state.HostState;
 import org.apache.ambari.server.state.Service;
 import org.apache.ambari.server.state.ServiceComponent;
 import org.apache.ambari.server.state.ServiceComponentHost;
+import org.apache.ambari.server.state.ServiceComponentHostEvent;
+import org.apache.ambari.server.state.ServiceComponentHostEventType;
 import org.apache.ambari.server.state.svccomphost.ServiceComponentHostInstallEvent;
+import org.apache.ambari.server.state.svccomphost.ServiceComponentHostOpFailedEvent;
 import org.apache.ambari.server.state.svccomphost.ServiceComponentHostUpgradeEvent;
 import org.apache.ambari.server.utils.StageUtils;
+import org.junit.Ignore;
 import org.junit.Test;
+import org.mockito.ArgumentCaptor;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 import org.slf4j.Logger;
@@ -277,6 +278,141 @@ public class TestActionScheduler {
     scheduler.stop();
   }
 
+  @Test
+  public void testOpFailedEventRaisedForAbortedHostRole() throws Exception {
+    ActionQueue aq = new ActionQueue();
+    Properties properties = new Properties();
+    Configuration conf = new Configuration(properties);
+    Clusters fsm = mock(Clusters.class);
+    Cluster oneClusterMock = mock(Cluster.class);
+    Service serviceObj = mock(Service.class);
+    ServiceComponent scomp = mock(ServiceComponent.class);
+    ServiceComponentHost sch1 = mock(ServiceComponentHost.class);
+    ServiceComponentHost sch2 = mock(ServiceComponentHost.class);
+    String hostname1 = "host1";
+    String hostname2 = "host2";
+    Host host1 = mock(Host.class);
+    Host host2 = mock(Host.class);
+    when(fsm.getHost(hostname1)).thenReturn(host1);
+    when(fsm.getHost(hostname2)).thenReturn(host2);
+    when(host1.getState()).thenReturn(HostState.HEARTBEAT_LOST);
+    when(host2.getState()).thenReturn(HostState.HEALTHY);
+    when(host1.getHostName()).thenReturn(hostname1);
+    when(host2.getHostName()).thenReturn(hostname2);
+    when(scomp.getServiceComponentHost(hostname1)).thenReturn(sch1);
+    when(scomp.getServiceComponentHost(hostname2)).thenReturn(sch2);
+
+    UnitOfWork unitOfWork = mock(UnitOfWork.class);
+    when(fsm.getCluster(anyString())).thenReturn(oneClusterMock);
+    when(oneClusterMock.getService(anyString())).thenReturn(serviceObj);
+    when(serviceObj.getServiceComponent(anyString())).thenReturn(scomp);
+    when(serviceObj.getCluster()).thenReturn(oneClusterMock);
+
+    final List<Stage> stages = new ArrayList<Stage>();
+    Stage stage = new Stage(1, "/tmp", "cluster1", "stageWith2Tasks",
+      CLUSTER_HOST_INFO);
+    addInstallTaskToStage(stage, hostname1, "cluster1", Role.DATANODE,
+      RoleCommand.INSTALL, Service.Type.HDFS, 1);
+    addInstallTaskToStage(stage, hostname2, "cluster1", Role.NAMENODE,
+      RoleCommand.INSTALL, Service.Type.HDFS, 2);
+    stages.add(stage);
+
+    ActionDBAccessor db = mock(ActionDBAccessor.class);
+
+    when(db.getStagesInProgress()).thenReturn(stages);
+
+    doAnswer(new Answer() {
+      @Override
+      public Object answer(InvocationOnMock invocation) throws Throwable {
+        String host = (String) invocation.getArguments()[0];
+        String role = (String) invocation.getArguments()[3];
+        //HostRoleCommand command = stages.get(0).getHostRoleCommand(host, role);
+        for (HostRoleCommand command : stages.get(0).getOrderedHostRoleCommands()) {
+          if (command.getHostName().equals(host) && command.getRole().name()
+              .equals(role)) {
+            command.setStatus(HostRoleStatus.TIMEDOUT);
+          }
+        }
+        return null;
+      }
+    }).when(db).timeoutHostRole(anyString(), anyLong(), anyLong(), anyString());
+
+    doAnswer(new Answer() {
+      @Override
+      public Object answer(InvocationOnMock invocation) throws Throwable {
+        Long requestId = (Long) invocation.getArguments()[0];
+        for (Stage stage : stages) {
+          if (requestId.equals(stage.getRequestId())) {
+            for (HostRoleCommand command : stage.getOrderedHostRoleCommands()) {
+              if (command.getStatus() == HostRoleStatus.QUEUED ||
+                command.getStatus() == HostRoleStatus.IN_PROGRESS ||
+                command.getStatus() == HostRoleStatus.PENDING) {
+                command.setStatus(HostRoleStatus.ABORTED);
+              }
+            }
+          }
+        }
+
+        return null;
+      }
+    }).when(db).abortOperation(anyLong());
+
+    ArgumentCaptor<ServiceComponentHostEvent> eventsCapture1 =
+      ArgumentCaptor.forClass(ServiceComponentHostEvent.class);
+    ArgumentCaptor<ServiceComponentHostEvent> eventsCapture2 =
+      ArgumentCaptor.forClass(ServiceComponentHostEvent.class);
+
+    // Make sure the NN install doesn't timeout
+    ActionScheduler scheduler = new ActionScheduler(100, 50000, db, aq, fsm, 3,
+      new HostsMap((String) null), null, unitOfWork, conf);
+    scheduler.setTaskTimeoutAdjustment(false);
+    // Start the thread
+    scheduler.start();
+
+    while (!stages.get(0).getHostRoleStatus(hostname1, "DATANODE")
+      .equals(HostRoleStatus.TIMEDOUT) && !stages.get(0).getHostRoleStatus
+      (hostname2, "NAMENODE").equals(HostRoleStatus.ABORTED)) {
+
+      Thread.sleep(100L);
+    }
+
+    Assert.assertEquals(HostRoleStatus.TIMEDOUT,
+      stages.get(0).getHostRoleStatus(hostname1, "DATANODE"));
+    Assert.assertEquals(HostRoleStatus.ABORTED,
+      stages.get(0).getHostRoleStatus(hostname2, "NAMENODE"));
+
+    verify(sch1, atLeastOnce()).handleEvent(eventsCapture1.capture());
+    verify(sch2, atLeastOnce()).handleEvent(eventsCapture2.capture());
+
+    List<ServiceComponentHostEvent> eventTypes = eventsCapture1.getAllValues();
+    eventTypes.addAll(eventsCapture2.getAllValues());
+
+    Assert.assertNotNull(eventTypes);
+
+    ServiceComponentHostOpFailedEvent datanodeFailedEvent = null;
+    ServiceComponentHostOpFailedEvent namenodeFailedEvent = null;
+
+    for (ServiceComponentHostEvent eventType : eventTypes) {
+      if (eventType instanceof ServiceComponentHostOpFailedEvent) {
+        ServiceComponentHostOpFailedEvent event =
+          (ServiceComponentHostOpFailedEvent) eventType;
+
+        if (event.getServiceComponentName().equals("DATANODE")) {
+          datanodeFailedEvent = event;
+        } else if (event.getServiceComponentName().equals("NAMENODE")) {
+          namenodeFailedEvent = event;
+        }
+      }
+    }
+
+    Assert.assertNotNull("Datanode should be in Install failed state.",
+      datanodeFailedEvent);
+    Assert.assertNotNull("Namenode should be in Install failed state.",
+      namenodeFailedEvent);
+
+    scheduler.stop();
+  }
+
   /**
    * Test server action
    */
@@ -1027,6 +1163,24 @@ public class TestActionScheduler {
     return stage;
   }
 
+  private void addInstallTaskToStage(Stage stage, String hostname,
+                              String clusterName, Role role,
+                              RoleCommand roleCommand, Service.Type service,
+                              int taskId) {
+
+    stage.addHostRoleExecutionCommand(hostname, role, roleCommand,
+      new ServiceComponentHostInstallEvent(role.toString(), hostname,
+        System.currentTimeMillis(), "HDP-0.2"), clusterName, service.toString());
+    ExecutionCommand command = stage.getExecutionCommandWrapper
+      (hostname, role.toString()).getExecutionCommand();
+    command.setTaskId(taskId);
+    for (HostRoleCommand cmd :stage.getOrderedHostRoleCommands()) {
+      if (cmd.getHostName().equals(hostname) && cmd.getRole().equals(role)) {
+        cmd.setTaskId(taskId);
+      }
+    }
+  }
+
   @Test
   public void testSuccessFactors() {
     Stage s = StageUtils.getATestStage(1, 1, CLUSTER_HOST_INFO);