You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by dd...@apache.org on 2011/12/22 03:22:06 UTC
svn commit: r1221997 - in /incubator/ambari/trunk: ./
agent/src/main/python/ambari_agent/ agent/src/test/python/
controller/src/main/java/org/apache/ambari/controller/
Author: ddas
Date: Thu Dec 22 02:22:06 2011
New Revision: 1221997
URL: http://svn.apache.org/viewvc?rev=1221997&view=rev
Log:
AMBARI-171. Agents retry failed actions for a configurable number of times after a configurable delay.
Modified:
incubator/ambari/trunk/CHANGES.txt
incubator/ambari/trunk/agent/src/main/python/ambari_agent/ActionQueue.py
incubator/ambari/trunk/agent/src/main/python/ambari_agent/AmbariConfig.py
incubator/ambari/trunk/agent/src/test/python/TestActionQueue.py
incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/HeartbeatHandler.java
Modified: incubator/ambari/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/CHANGES.txt?rev=1221997&r1=1221996&r2=1221997&view=diff
==============================================================================
--- incubator/ambari/trunk/CHANGES.txt (original)
+++ incubator/ambari/trunk/CHANGES.txt Thu Dec 22 02:22:06 2011
@@ -2,6 +2,9 @@ Ambari Change log
Release 0.1.0 - unreleased
+ AMBARI-171. Agents retry failed actions for a configurable number of times
+ after a configurable delay (ddas)
+
AMBARI-170. Update the cluster state after state machine transitions it to final ACTIVE/INACTIVE state (vgogate)
AMBARI-168. trim the white spaces from host names returned through getHostnamesFromRageExpressions (vgogate)
Modified: incubator/ambari/trunk/agent/src/main/python/ambari_agent/ActionQueue.py
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/agent/src/main/python/ambari_agent/ActionQueue.py?rev=1221997&r1=1221996&r2=1221997&view=diff
==============================================================================
--- incubator/ambari/trunk/agent/src/main/python/ambari_agent/ActionQueue.py (original)
+++ incubator/ambari/trunk/agent/src/main/python/ambari_agent/ActionQueue.py Thu Dec 22 02:22:06 2011
@@ -49,6 +49,8 @@ class ActionQueue(threading.Thread):
self.config = config
self.sh = shellRunner()
self._stop = threading.Event()
+ self.maxRetries = config.get('command', 'maxretries')
+ self.sleepInterval = config.get('command', 'sleepBetweenRetries')
def stop(self):
self._stop.set()
@@ -97,13 +99,41 @@ class ActionQueue(threading.Thread):
'INSTALL_AND_CONFIG_ACTION' : self.installAndConfigAction,
'NO_OP_ACTION' : self.noOpAction
}
- try:
- result = switches.get(action['kind'], self.unknownAction)(action)
- except Exception, err:
- traceback.print_exc()
- logger.info(err)
+
+ exitCode = 1
+ retryCount = 1
+ while (exitCode != 0 and retryCount <= self.maxRetries):
+ try:
+ result = switches.get(action['kind'], self.unknownAction)(action)
+ if ('commandResult' in result):
+ commandResult = result['commandResult']
+ exitCode = commandResult['exitCode']
+ if (exitCode == 0):
+ break
+ else:
+ logger.warn(str(action) + " exited with code " + str(exitCode))
+ else:
+ #Really, no commandResult? Is this possible?
+ #TODO: check
+ exitCode = 0
+ break
+ except Exception, err:
+ traceback.print_exc()
+ logger.warn(err)
+ if ('commandResult' in result):
+ commandResult = result['commandResult']
+ if ('exitCode' in commandResult):
+ exitCode = commandResult['exitCode']
+ #retry after the configured sleepBetweenRetries interval
+ time.sleep(self.sleepInterval)
+ retryCount += 1
+
+ if (exitCode != 0):
result = self.genResult(action)
- result['exitCode']=1
+ result['exitCode']=exitCode
+ result['retryActionCount'] = retryCount - 1
+ else:
+ result['retryActionCount'] = retryCount
# Update the result
r.put(result)
if not self.stopped():
Modified: incubator/ambari/trunk/agent/src/main/python/ambari_agent/AmbariConfig.py
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/agent/src/main/python/ambari_agent/AmbariConfig.py?rev=1221997&r1=1221996&r2=1221997&view=diff
==============================================================================
--- incubator/ambari/trunk/agent/src/main/python/ambari_agent/AmbariConfig.py (original)
+++ incubator/ambari/trunk/agent/src/main/python/ambari_agent/AmbariConfig.py Thu Dec 22 02:22:06 2011
@@ -41,6 +41,10 @@ prefix=/homes/ddas/puppet
commandpath=/usr/local/bin/puppet apply --modulepath /home/puppet/puppet-ambari/modules
driver=/home/puppet/puppet-ambari/manifests/site.pp
+[command]
+maxretries=2
+sleepBetweenRetries=1
+
"""
s = StringIO.StringIO(content)
config.readfp(s)
Modified: incubator/ambari/trunk/agent/src/test/python/TestActionQueue.py
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/agent/src/test/python/TestActionQueue.py?rev=1221997&r1=1221996&r2=1221997&view=diff
==============================================================================
--- incubator/ambari/trunk/agent/src/test/python/TestActionQueue.py (original)
+++ incubator/ambari/trunk/agent/src/test/python/TestActionQueue.py Thu Dec 22 02:22:06 2011
@@ -21,7 +21,8 @@ limitations under the License.
from unittest import TestCase
from ambari_agent.ActionQueue import ActionQueue
from ambari_agent.AmbariConfig import AmbariConfig
-import os, errno
+from ambari_agent.FileUtil import getFilePath
+import os, errno, time
class TestActionQueue(TestCase):
def test_ActionQueueStartStop(self):
@@ -30,3 +31,60 @@ class TestActionQueue(TestCase):
actionQueue.stop()
actionQueue.join()
self.assertEqual(actionQueue.stopped(), True, 'Action queue is not stopped.')
+
+ def test_RetryAction(self):
+ action={'id' : 'tttt'}
+ config = AmbariConfig().getConfig()
+ actionQueue = ActionQueue(config)
+ path = actionQueue.getInstallFilename(action['id'])
+ configFile = {
+ "data" : "test",
+ "owner" : os.getuid(),
+ "group" : os.getgid() ,
+ "permission" : 0700,
+ "path" : path,
+ "umask" : 022
+ }
+
+ #note that the command in the action is just a listing of the path created;
+ #we just want to ensure that 'ls' can run on the data file (in the actual world
+ #this 'ls' would be a puppet or a chef command that would work on a data
+ #file)
+ badAction = {
+ 'id' : 'tttt',
+ 'kind' : 'INSTALL_AND_CONFIG_ACTION',
+ 'workDirComponent' : 'abc-hdfs',
+ 'file' : configFile,
+ 'clusterDefinitionRevision' : 12,
+ 'command' : ['/bin/ls',"/foo/bar/badPath1234"]
+ }
+ path=getFilePath(action,path)
+ goodAction = {
+ 'id' : 'tttt',
+ 'kind' : 'INSTALL_AND_CONFIG_ACTION',
+ 'workDirComponent' : 'abc-hdfs',
+ 'file' : configFile,
+ 'clusterDefinitionRevision' : 12,
+ 'command' : ['/bin/ls',path]
+ }
+ actionQueue.start()
+ response = {'actions' : [badAction,goodAction]}
+ actionQueue.maxRetries = 2
+ actionQueue.sleepInterval = 1
+ result = actionQueue.put(response)
+ time.sleep(5)
+ actionQueue.stop()
+ actionQueue.join()
+ results = actionQueue.result()
+ self.assertEqual(len(results), 2, 'Number of results is not 2.')
+ result = results[0]
+ maxretries = config.get('command', 'maxretries')
+ self.assertEqual(int(result['retryActionCount']),
+ int(maxretries),
+ "Number of retries is %d and not %d" %
+ (int(result['retryActionCount']), int(str(maxretries))))
+ result = results[1]
+ self.assertEqual(int(result['retryActionCount']),
+ 1,
+ "Number of retries is %d and not %d" %
+ (int(result['retryActionCount']), 1))
Modified: incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/HeartbeatHandler.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/HeartbeatHandler.java?rev=1221997&r1=1221996&r2=1221997&view=diff
==============================================================================
--- incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/HeartbeatHandler.java (original)
+++ incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/HeartbeatHandler.java Thu Dec 22 02:22:06 2011
@@ -61,10 +61,10 @@ public class HeartbeatHandler {
private static Log LOG = LogFactory.getLog(HeartbeatHandler.class);
private final Clusters clusters;
private final Nodes nodes;
- private StateMachineInvokerInterface stateMachineInvoker;
- private FSMDriverInterface driver;
+ private final StateMachineInvokerInterface stateMachineInvoker;
+ private final FSMDriverInterface driver;
- static final String DEFAULT_USER = "hdfs"; //TBD: this needs to come from the stack definition or something
+ static final String DEFAULT_USER = "hdfs"; //TODO: this needs to come from the stack definition or something (AMBARI-169)
@Inject
HeartbeatHandler(Clusters clusters, Nodes nodes,
@@ -192,6 +192,8 @@ public class HeartbeatHandler {
return createResponse(responseId,allActions,heartbeat);
}
+ //TODO: this should be moved to the ClusterImpl (a dependency graph
+ //should be created there)
private boolean dependentComponentsActive(ComponentPlugin plugin,
ClusterFSM cluster) throws IOException {
String[] dependents = plugin.getRequiredComponents();
@@ -216,7 +218,7 @@ public class HeartbeatHandler {
List<Action> allActions, HeartBeat heartbeat) {
ControllerResponse r = new ControllerResponse();
r.setResponseId(responseId);
- if (allActions.size() > 0) {//TODO: REMOVE THIS
+ if (allActions.size() > 0) {//TODO: REMOVE THIS (AMBARI-158)
Action a = new Action();
a.setKind(Kind.NO_OP_ACTION);
allActions.add(a);
@@ -260,7 +262,7 @@ public class HeartbeatHandler {
List<Action> allActions) {
ConfigFile file = new ConfigFile();
file.setData(script);
- file.setOwner(DEFAULT_USER);
+ file.setOwner(DEFAULT_USER); //TODO (AMBARI-169)
Action action = new Action();
action.setFile(file);
@@ -468,7 +470,7 @@ public class HeartbeatHandler {
action.setClusterDefinitionRevision(clusterDefRev);
action.setComponent(component);
action.setRole(role);
- action.setUser(DEFAULT_USER);
+ action.setUser(DEFAULT_USER); //TODO (AMBARI-169)
action.setCleanUpCommand(new Command("foobar","",new String[]{"foobar"}));//TODO: this needs fixing at some point
String workDir = role.equals(component + "-client") ?
(clusterId + "-client") : (clusterId + "-" + role);
@@ -481,4 +483,9 @@ public class HeartbeatHandler {
fillActionDetails(action, clusterId, clusterDefRev, component, role);
addAction(action, allActions);
}
+
+ private class ActionTracker {
+ //Tracks all actions by agent hostname. When an agent returns a response,
+ //note all the failed action IDs and resend them.
+ }
}