You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by sw...@apache.org on 2013/04/24 20:04:39 UTC

svn commit: r1471568 - in /incubator/ambari/trunk: ./ ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ ambari-server/src/main/java/org/apache/ambari/server/agent/ ambari-server/src/test/java/org/apache/ambari/server/actionmanager/

Author: swagle
Date: Wed Apr 24 18:04:39 2013
New Revision: 1471568

URL: http://svn.apache.org/r1471568
Log:
AMBARI-2010. Tasks do not timeout for failed hosts. (swagle)

Modified:
    incubator/ambari/trunk/CHANGES.txt
    incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
    incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/agent/ActionQueue.java
    incubator/ambari/trunk/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java

Modified: incubator/ambari/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/CHANGES.txt?rev=1471568&r1=1471567&r2=1471568&view=diff
==============================================================================
--- incubator/ambari/trunk/CHANGES.txt (original)
+++ incubator/ambari/trunk/CHANGES.txt Wed Apr 24 18:04:39 2013
@@ -786,6 +786,8 @@ Trunk (unreleased changes):
 
  BUG FIXES
 
+ AMBARI-2010. Tasks do not timeout for failed hosts. (swagle)
+
  AMBARI-2012. Check Ambari-agent process - nagios alert is only being
  configured on the nagios-server host. (smohanty)
 

Modified: incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java?rev=1471568&r1=1471567&r2=1471568&view=diff
==============================================================================
--- incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java (original)
+++ incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java Wed Apr 24 18:04:39 2013
@@ -31,7 +31,6 @@ import org.apache.ambari.server.state.fs
 import org.apache.ambari.server.state.svccomphost.ServiceComponentHostOpFailedEvent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import java.util.*;
 
 /**
@@ -279,7 +278,7 @@ class ActionScheduler implements Runnabl
    * has succeeded or failed.
    */
   private Map<String, RoleStats> processInProgressStage(Stage s,
-      List<ExecutionCommand> commandsToSchedule) {
+      List<ExecutionCommand> commandsToSchedule) throws AmbariException {
     // Map to track role status
     Map<String, RoleStats> roleStats = initRoleStats(s);
     long now = System.currentTimeMillis();
@@ -289,11 +288,14 @@ class ActionScheduler implements Runnabl
     }
     for (String host : s.getHosts()) {
       List<ExecutionCommandWrapper> commandWrappers = s.getExecutionCommands(host);
+      Cluster cluster = fsmObject.getCluster(s.getClusterName());
+      Host hostObj = fsmObject.getHost(host);
       for(ExecutionCommandWrapper wrapper : commandWrappers) {
         ExecutionCommand c = wrapper.getExecutionCommand();
         String roleStr = c.getRole().toString();
         HostRoleStatus status = s.getHostRoleStatus(host, roleStr);
-        if (timeOutActionNeeded(status, s, host, roleStr, now, taskTimeout)) {
+        if (timeOutActionNeeded(status, s, hostObj, roleStr, now,
+          taskTimeout)) {
           LOG.info("Host:" + host + ", role:" + roleStr + ", actionId:"
               + s.getActionId() + " timed out");
           if (s.getAttemptCount(host, roleStr) >= maxAttempts) {
@@ -307,7 +309,6 @@ class ActionScheduler implements Runnabl
                 new ServiceComponentHostOpFailedEvent(roleStr,
                     host, now);
             try {
-              Cluster cluster = fsmObject.getCluster(s.getClusterName());
               Service svc = cluster.getService(c.getServiceName());
               ServiceComponent svcComp = svc.getServiceComponent(
                   roleStr);
@@ -322,6 +323,8 @@ class ActionScheduler implements Runnabl
             } catch (AmbariException ex) {
               LOG.warn("Invalid live state", ex);
             }
+            // Dequeue command
+            actionQueue.dequeue(host, c.getCommandId());
           } else {
             commandsToSchedule.add(c);
           }
@@ -360,12 +363,19 @@ class ActionScheduler implements Runnabl
   }
 
   private boolean timeOutActionNeeded(HostRoleStatus status, Stage stage,
-      String host, String role, long currentTime, long taskTimeout) {
+      Host host, String role, long currentTime, long taskTimeout) throws
+    AmbariException {
     if (( !status.equals(HostRoleStatus.QUEUED) ) &&
         ( ! status.equals(HostRoleStatus.IN_PROGRESS) )) {
       return false;
     }
-    if (currentTime > stage.getLastAttemptTime(host, role)+taskTimeout) {
+    // Fast fail task if the host has lost its heartbeat (HEARTBEAT_LOST)
+    if (host.getState().equals(HostState.HEARTBEAT_LOST)) {
+      LOG.debug("Timing out action since agent is not heartbeating.");
+      return true;
+    }
+    if (currentTime > stage.getLastAttemptTime(host.getHostName(),
+      role) + taskTimeout) {
       return true;
     }
     return false;

Modified: incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/agent/ActionQueue.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/agent/ActionQueue.java?rev=1471568&r1=1471567&r2=1471568&view=diff
==============================================================================
--- incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/agent/ActionQueue.java (original)
+++ incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/agent/ActionQueue.java Wed Apr 24 18:04:39 2013
@@ -19,6 +19,7 @@ package org.apache.ambari.server.agent;
 
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -80,6 +81,30 @@ public class ActionQueue {
       }
     }
   }
+
+  /**
+   * Removes the first queued ExecutionCommand for the host whose command id
+   * equals commandId. Returns the removed command, or null when getQueue
+   * yields no queue or no queued ExecutionCommand carries that id.
+   */
+  public AgentCommand dequeue(String hostname, String commandId) {
+    Queue<AgentCommand> q = getQueue(hostname);
+    if (q == null) {
+      return null;
+    }
+    synchronized (q) {
+      // Typed iterator: no raw type or cast; it.remove() drops the match
+      for (Iterator<AgentCommand> it = q.iterator(); it.hasNext();) {
+        AgentCommand ac = it.next();
+        if (ac instanceof ExecutionCommand && ((ExecutionCommand) ac)
+          .getCommandId().equals(commandId)) {
+          it.remove();
+          return ac;
+        }
+      }
+      return null;
+    }
+  }
   
   public int size(String hostname) {
     Queue<AgentCommand> q = getQueue(hostname);

Modified: incubator/ambari/trunk/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java?rev=1471568&r1=1471567&r2=1471568&view=diff
==============================================================================
--- incubator/ambari/trunk/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java (original)
+++ incubator/ambari/trunk/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java Wed Apr 24 18:04:39 2013
@@ -43,12 +43,15 @@ import org.apache.ambari.server.serverac
 import org.apache.ambari.server.serveraction.ServerActionManagerImpl;
 import org.apache.ambari.server.state.Cluster;
 import org.apache.ambari.server.state.Clusters;
+import org.apache.ambari.server.state.Host;
+import org.apache.ambari.server.state.HostState;
 import org.apache.ambari.server.state.Service;
 import org.apache.ambari.server.state.ServiceComponent;
 import org.apache.ambari.server.state.ServiceComponentHost;
 import org.apache.ambari.server.state.svccomphost.ServiceComponentHostInstallEvent;
 import org.apache.ambari.server.state.svccomphost.ServiceComponentHostUpgradeEvent;
 import org.apache.ambari.server.utils.StageUtils;
+import org.easymock.EasyMock;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -74,10 +77,14 @@ public class TestActionScheduler {
     when(serviceObj.getServiceComponent(anyString())).thenReturn(scomp);
     when(scomp.getServiceComponentHost(anyString())).thenReturn(sch);
     when(serviceObj.getCluster()).thenReturn(oneClusterMock);
+    String hostname = "ahost.ambari.apache.org";
+    Host host = mock(Host.class);
+    when(fsm.getHost(anyString())).thenReturn(host);
+    when(host.getState()).thenReturn(HostState.HEALTHY);
+    when(host.getHostName()).thenReturn(hostname);
 
     ActionDBAccessor db = mock(ActionDBAccessorImpl.class);
     List<Stage> stages = new ArrayList<Stage>();
-    String hostname = "ahost.ambari.apache.org";
     Stage s = StageUtils.getATestStage(1, 977, hostname);
     stages.add(s);
     when(db.getStagesInProgress()).thenReturn(stages);
@@ -140,9 +147,13 @@ public class TestActionScheduler {
     when(serviceObj.getServiceComponent(anyString())).thenReturn(scomp);
     when(scomp.getServiceComponentHost(anyString())).thenReturn(sch);
     when(serviceObj.getCluster()).thenReturn(oneClusterMock);
+    String hostname = "ahost.ambari.apache.org";
+    Host host = mock(Host.class);
+    when(fsm.getHost(anyString())).thenReturn(host);
+    when(host.getState()).thenReturn(HostState.HEALTHY);
+    when(host.getHostName()).thenReturn(hostname);
 
     ActionDBAccessor db = new ActionDBInMemoryImpl();
-    String hostname = "ahost.ambari.apache.org";
     List<Stage> stages = new ArrayList<Stage>();
     Stage s = StageUtils.getATestStage(1, 977, hostname);
     stages.add(s);
@@ -163,6 +174,46 @@ public class TestActionScheduler {
         HostRoleStatus.TIMEDOUT);
   }
 
+  /** Verifies that a task on a HEARTBEAT_LOST host ends up TIMEDOUT. */
+  @Test(timeout = 60000)
+  public void testActionTimeoutForLostHost() throws Exception {
+    ActionQueue aq = new ActionQueue();
+    Clusters fsm = mock(Clusters.class);
+    Cluster oneClusterMock = mock(Cluster.class);
+    Service serviceObj = mock(Service.class);
+    ServiceComponent scomp = mock(ServiceComponent.class);
+    ServiceComponentHost sch = mock(ServiceComponentHost.class);
+    when(fsm.getCluster(anyString())).thenReturn(oneClusterMock);
+    when(oneClusterMock.getService(anyString())).thenReturn(serviceObj);
+    when(serviceObj.getServiceComponent(anyString())).thenReturn(scomp);
+    when(scomp.getServiceComponentHost(anyString())).thenReturn(sch);
+    when(serviceObj.getCluster()).thenReturn(oneClusterMock);
+    String hostname = "ahost.ambari.apache.org";
+    Host host = mock(Host.class);
+    when(fsm.getHost(anyString())).thenReturn(host);
+    when(host.getState()).thenReturn(HostState.HEARTBEAT_LOST);
+    when(host.getHostName()).thenReturn(hostname);
+
+    ActionDBAccessor db = new ActionDBInMemoryImpl();
+    List<Stage> stages = new ArrayList<Stage>();
+    Stage s = StageUtils.getATestStage(1, 977, hostname);
+    stages.add(s);
+    db.persistActions(stages);
+
+    // Small action timeout to test rescheduling
+    ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
+      new HostsMap((String) null), null);
+    scheduler.setTaskTimeoutAdjustment(false);
+    scheduler.start();
+    // Poll until the role is TIMEDOUT; @Test(timeout) bounds the wait
+    while (!stages.get(0).getHostRoleStatus(hostname, "NAMENODE")
+      .equals(HostRoleStatus.TIMEDOUT)) {
+      Thread.sleep(100L);
+    }
+    assertEquals(HostRoleStatus.TIMEDOUT,
+      stages.get(0).getHostRoleStatus(hostname, "NAMENODE"));
+  }
+
   /**
    * Test server action
    */
@@ -298,6 +349,11 @@ public class TestActionScheduler {
     when(serviceObj.getServiceComponent(anyString())).thenReturn(scomp);
     when(scomp.getServiceComponentHost(anyString())).thenReturn(sch);
     when(serviceObj.getCluster()).thenReturn(oneClusterMock);
+    Host host = mock(Host.class);
+    when(fsm.getHost(anyString())).thenReturn(host);
+    when(host.getState()).thenReturn(HostState.HEALTHY);
+    when(host.getHostName()).thenReturn("host1");
+
 
     ActionDBAccessor db = new ActionDBInMemoryImpl();
     List<Stage> stages = new ArrayList<Stage>();