You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by sw...@apache.org on 2013/04/24 20:04:39 UTC
svn commit: r1471568 - in /incubator/ambari/trunk: ./
ambari-server/src/main/java/org/apache/ambari/server/actionmanager/
ambari-server/src/main/java/org/apache/ambari/server/agent/
ambari-server/src/test/java/org/apache/ambari/server/actionmanager/
Author: swagle
Date: Wed Apr 24 18:04:39 2013
New Revision: 1471568
URL: http://svn.apache.org/r1471568
Log:
AMBARI-2010. Tasks do not timeout for failed hosts. (swagle)
Modified:
incubator/ambari/trunk/CHANGES.txt
incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/agent/ActionQueue.java
incubator/ambari/trunk/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
Modified: incubator/ambari/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/CHANGES.txt?rev=1471568&r1=1471567&r2=1471568&view=diff
==============================================================================
--- incubator/ambari/trunk/CHANGES.txt (original)
+++ incubator/ambari/trunk/CHANGES.txt Wed Apr 24 18:04:39 2013
@@ -786,6 +786,8 @@ Trunk (unreleased changes):
BUG FIXES
+ AMBARI-2010. Tasks do not timeout for failed hosts. (swagle)
+
AMBARI-2012. Check Ambari-agent process - nagios alert is only being
configured on the nagios-server host. (smohanty)
Modified: incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java?rev=1471568&r1=1471567&r2=1471568&view=diff
==============================================================================
--- incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java (original)
+++ incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java Wed Apr 24 18:04:39 2013
@@ -31,7 +31,6 @@ import org.apache.ambari.server.state.fs
import org.apache.ambari.server.state.svccomphost.ServiceComponentHostOpFailedEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import java.util.*;
/**
@@ -279,7 +278,7 @@ class ActionScheduler implements Runnabl
* has succeeded or failed.
*/
private Map<String, RoleStats> processInProgressStage(Stage s,
- List<ExecutionCommand> commandsToSchedule) {
+ List<ExecutionCommand> commandsToSchedule) throws AmbariException {
// Map to track role status
Map<String, RoleStats> roleStats = initRoleStats(s);
long now = System.currentTimeMillis();
@@ -289,11 +288,14 @@ class ActionScheduler implements Runnabl
}
for (String host : s.getHosts()) {
List<ExecutionCommandWrapper> commandWrappers = s.getExecutionCommands(host);
+ Cluster cluster = fsmObject.getCluster(s.getClusterName());
+ Host hostObj = fsmObject.getHost(host);
for(ExecutionCommandWrapper wrapper : commandWrappers) {
ExecutionCommand c = wrapper.getExecutionCommand();
String roleStr = c.getRole().toString();
HostRoleStatus status = s.getHostRoleStatus(host, roleStr);
- if (timeOutActionNeeded(status, s, host, roleStr, now, taskTimeout)) {
+ if (timeOutActionNeeded(status, s, hostObj, roleStr, now,
+ taskTimeout)) {
LOG.info("Host:" + host + ", role:" + roleStr + ", actionId:"
+ s.getActionId() + " timed out");
if (s.getAttemptCount(host, roleStr) >= maxAttempts) {
@@ -307,7 +309,6 @@ class ActionScheduler implements Runnabl
new ServiceComponentHostOpFailedEvent(roleStr,
host, now);
try {
- Cluster cluster = fsmObject.getCluster(s.getClusterName());
Service svc = cluster.getService(c.getServiceName());
ServiceComponent svcComp = svc.getServiceComponent(
roleStr);
@@ -322,6 +323,8 @@ class ActionScheduler implements Runnabl
} catch (AmbariException ex) {
LOG.warn("Invalid live state", ex);
}
+ // Dequeue command
+ actionQueue.dequeue(host, c.getCommandId());
} else {
commandsToSchedule.add(c);
}
@@ -360,12 +363,19 @@ class ActionScheduler implements Runnabl
}
private boolean timeOutActionNeeded(HostRoleStatus status, Stage stage,
- String host, String role, long currentTime, long taskTimeout) {
+ Host host, String role, long currentTime, long taskTimeout) throws
+ AmbariException {
if (( !status.equals(HostRoleStatus.QUEUED) ) &&
( ! status.equals(HostRoleStatus.IN_PROGRESS) )) {
return false;
}
- if (currentTime > stage.getLastAttemptTime(host, role)+taskTimeout) {
+ // Fast fail task if host state is unknown
+ if (host.getState().equals(HostState.HEARTBEAT_LOST)) {
+ LOG.debug("Timing out action since agent is not heartbeating.");
+ return true;
+ }
+ if (currentTime > stage.getLastAttemptTime(host.getHostName(),
+ role) + taskTimeout) {
return true;
}
return false;
Modified: incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/agent/ActionQueue.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/agent/ActionQueue.java?rev=1471568&r1=1471567&r2=1471568&view=diff
==============================================================================
--- incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/agent/ActionQueue.java (original)
+++ incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/agent/ActionQueue.java Wed Apr 24 18:04:39 2013
@@ -19,6 +19,7 @@ package org.apache.ambari.server.agent;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -80,6 +81,30 @@ public class ActionQueue {
}
}
}
+
+ /**
+  * Removes the first queued ExecutionCommand on hostname matching commandId.
+  * @return the removed command, or null if the queue or a match is absent
+  */
+ public AgentCommand dequeue(String hostname, String commandId) {
+   Queue<AgentCommand> q = getQueue(hostname);
+   if (q == null || commandId == null) {
+     return null;
+   }
+   synchronized (q) {
+     // Typed iterator; Iterator.remove() deletes safely during the scan.
+     for (Iterator<AgentCommand> it = q.iterator(); it.hasNext();) {
+       AgentCommand ac = it.next();
+       // commandId is the receiver so a command with a null id cannot NPE.
+       if (ac instanceof ExecutionCommand
+           && commandId.equals(((ExecutionCommand) ac).getCommandId())) {
+         it.remove();
+         return ac;
+       }
+     }
+   }
+   return null;
+ }
public int size(String hostname) {
Queue<AgentCommand> q = getQueue(hostname);
Modified: incubator/ambari/trunk/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java?rev=1471568&r1=1471567&r2=1471568&view=diff
==============================================================================
--- incubator/ambari/trunk/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java (original)
+++ incubator/ambari/trunk/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java Wed Apr 24 18:04:39 2013
@@ -43,12 +43,15 @@ import org.apache.ambari.server.serverac
import org.apache.ambari.server.serveraction.ServerActionManagerImpl;
import org.apache.ambari.server.state.Cluster;
import org.apache.ambari.server.state.Clusters;
+import org.apache.ambari.server.state.Host;
+import org.apache.ambari.server.state.HostState;
import org.apache.ambari.server.state.Service;
import org.apache.ambari.server.state.ServiceComponent;
import org.apache.ambari.server.state.ServiceComponentHost;
import org.apache.ambari.server.state.svccomphost.ServiceComponentHostInstallEvent;
import org.apache.ambari.server.state.svccomphost.ServiceComponentHostUpgradeEvent;
import org.apache.ambari.server.utils.StageUtils;
+import org.easymock.EasyMock;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -74,10 +77,14 @@ public class TestActionScheduler {
when(serviceObj.getServiceComponent(anyString())).thenReturn(scomp);
when(scomp.getServiceComponentHost(anyString())).thenReturn(sch);
when(serviceObj.getCluster()).thenReturn(oneClusterMock);
+ String hostname = "ahost.ambari.apache.org";
+ Host host = mock(Host.class);
+ when(fsm.getHost(anyString())).thenReturn(host);
+ when(host.getState()).thenReturn(HostState.HEALTHY);
+ when(host.getHostName()).thenReturn(hostname);
ActionDBAccessor db = mock(ActionDBAccessorImpl.class);
List<Stage> stages = new ArrayList<Stage>();
- String hostname = "ahost.ambari.apache.org";
Stage s = StageUtils.getATestStage(1, 977, hostname);
stages.add(s);
when(db.getStagesInProgress()).thenReturn(stages);
@@ -140,9 +147,13 @@ public class TestActionScheduler {
when(serviceObj.getServiceComponent(anyString())).thenReturn(scomp);
when(scomp.getServiceComponentHost(anyString())).thenReturn(sch);
when(serviceObj.getCluster()).thenReturn(oneClusterMock);
+ String hostname = "ahost.ambari.apache.org";
+ Host host = mock(Host.class);
+ when(fsm.getHost(anyString())).thenReturn(host);
+ when(host.getState()).thenReturn(HostState.HEALTHY);
+ when(host.getHostName()).thenReturn(hostname);
ActionDBAccessor db = new ActionDBInMemoryImpl();
- String hostname = "ahost.ambari.apache.org";
List<Stage> stages = new ArrayList<Stage>();
Stage s = StageUtils.getATestStage(1, 977, hostname);
stages.add(s);
@@ -163,6 +174,46 @@ public class TestActionScheduler {
HostRoleStatus.TIMEDOUT);
}
+ /** Verifies that a task on a HEARTBEAT_LOST host ends up TIMEDOUT. */
+ @Test
+ public void testActionTimeoutForLostHost() throws Exception {
+   ActionQueue aq = new ActionQueue();
+   Clusters fsm = mock(Clusters.class);
+   Cluster oneClusterMock = mock(Cluster.class);
+   Service serviceObj = mock(Service.class);
+   ServiceComponent scomp = mock(ServiceComponent.class);
+   ServiceComponentHost sch = mock(ServiceComponentHost.class);
+   when(fsm.getCluster(anyString())).thenReturn(oneClusterMock);
+   when(oneClusterMock.getService(anyString())).thenReturn(serviceObj);
+   when(serviceObj.getServiceComponent(anyString())).thenReturn(scomp);
+   when(scomp.getServiceComponentHost(anyString())).thenReturn(sch);
+   when(serviceObj.getCluster()).thenReturn(oneClusterMock);
+   String hostname = "ahost.ambari.apache.org";
+   Host host = mock(Host.class);
+   when(fsm.getHost(anyString())).thenReturn(host);
+   when(host.getState()).thenReturn(HostState.HEARTBEAT_LOST);
+   when(host.getHostName()).thenReturn(hostname);
+   ActionDBAccessor db = new ActionDBInMemoryImpl();
+   List<Stage> stages = new ArrayList<Stage>();
+   Stage s = StageUtils.getATestStage(1, 977, hostname);
+   stages.add(s);
+   db.persistActions(stages);
+   //Small action timeout to test rescheduling
+   ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
+       new HostsMap((String) null), null);
+   scheduler.setTaskTimeoutAdjustment(false);
+   // Start the thread
+   scheduler.start();
+   // Poll with a deadline so a regression fails the test instead of hanging.
+   long deadline = System.currentTimeMillis() + 10000L;
+   while (System.currentTimeMillis() < deadline && !HostRoleStatus.TIMEDOUT
+       .equals(stages.get(0).getHostRoleStatus(hostname, "NAMENODE"))) {
+     Thread.sleep(100L);
+   }
+   assertEquals(HostRoleStatus.TIMEDOUT,
+       stages.get(0).getHostRoleStatus(hostname, "NAMENODE"));
+ }
+
/**
* Test server action
*/
@@ -298,6 +349,11 @@ public class TestActionScheduler {
when(serviceObj.getServiceComponent(anyString())).thenReturn(scomp);
when(scomp.getServiceComponentHost(anyString())).thenReturn(sch);
when(serviceObj.getCluster()).thenReturn(oneClusterMock);
+ Host host = mock(Host.class);
+ when(fsm.getHost(anyString())).thenReturn(host);
+ when(host.getState()).thenReturn(HostState.HEALTHY);
+ when(host.getHostName()).thenReturn("host1");
+
ActionDBAccessor db = new ActionDBInMemoryImpl();
List<Stage> stages = new ArrayList<Stage>();