You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to commits@ambari.apache.org by dd...@apache.org on 2011/12/22 03:22:06 UTC

svn commit: r1221997 - in /incubator/ambari/trunk: ./ agent/src/main/python/ambari_agent/ agent/src/test/python/ controller/src/main/java/org/apache/ambari/controller/

Author: ddas
Date: Thu Dec 22 02:22:06 2011
New Revision: 1221997

URL: http://svn.apache.org/viewvc?rev=1221997&view=rev
Log:
AMBARI-171. Agents retry failed actions a configurable number of times, waiting a configurable delay between attempts.

Modified:
    incubator/ambari/trunk/CHANGES.txt
    incubator/ambari/trunk/agent/src/main/python/ambari_agent/ActionQueue.py
    incubator/ambari/trunk/agent/src/main/python/ambari_agent/AmbariConfig.py
    incubator/ambari/trunk/agent/src/test/python/TestActionQueue.py
    incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/HeartbeatHandler.java

Modified: incubator/ambari/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/CHANGES.txt?rev=1221997&r1=1221996&r2=1221997&view=diff
==============================================================================
--- incubator/ambari/trunk/CHANGES.txt (original)
+++ incubator/ambari/trunk/CHANGES.txt Thu Dec 22 02:22:06 2011
@@ -2,6 +2,9 @@ Ambari Change log
 
 Release 0.1.0 - unreleased
 
+  AMBARI-171. Agents retry failed actions for a configurable number of times
+  after a configurable delay (ddas)
+
   AMBARI-170. Update the cluster state after state machine transitions it to final ACTIVE/INACTIVE state (vgogate)
 
   AMBARI-168. trim the white spaces from host names returned through getHostnamesFromRageExpressions (vgogate)

Modified: incubator/ambari/trunk/agent/src/main/python/ambari_agent/ActionQueue.py
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/agent/src/main/python/ambari_agent/ActionQueue.py?rev=1221997&r1=1221996&r2=1221997&view=diff
==============================================================================
--- incubator/ambari/trunk/agent/src/main/python/ambari_agent/ActionQueue.py (original)
+++ incubator/ambari/trunk/agent/src/main/python/ambari_agent/ActionQueue.py Thu Dec 22 02:22:06 2011
@@ -49,6 +49,8 @@ class ActionQueue(threading.Thread):
     self.config = config
     self.sh = shellRunner()
     self._stop = threading.Event()
+    self.maxRetries = config.get('command', 'maxretries') 
+    self.sleepInterval = config.get('command', 'sleepBetweenRetries')
 
   def stop(self):
     self._stop.set()
@@ -97,13 +99,41 @@ class ActionQueue(threading.Thread):
                      'INSTALL_AND_CONFIG_ACTION' : self.installAndConfigAction,
                      'NO_OP_ACTION'              : self.noOpAction
                    }
-        try:
-          result = switches.get(action['kind'], self.unknownAction)(action)
-        except Exception, err:
-          traceback.print_exc()  
-          logger.info(err)
+        
+        exitCode = 1
+        retryCount = 1
+        while (exitCode != 0 and retryCount <= self.maxRetries):
+          try:
+            result = switches.get(action['kind'], self.unknownAction)(action) 
+            if ('commandResult' in result):
+              commandResult = result['commandResult']
+              exitCode = commandResult['exitCode']
+              if (exitCode == 0):
+                break
+              else:
+                logger.warn(str(action) + " exited with code " + str(exitCode))
+            else:
+              #Really, no commandResult? Is this possible?
+              #TODO: check
+              exitCode = 0
+              break
+          except Exception, err:
+            traceback.print_exc()  
+            logger.warn(err)
+            if ('commandResult' in result):
+              commandResult = result['commandResult']
+              if ('exitCode' in commandResult):
+                exitCode = commandResult['exitCode']
+          #retry in 5 seconds  
+          time.sleep(self.sleepInterval)
+          retryCount += 1
+          
+        if (exitCode != 0):
           result = self.genResult(action)
-          result['exitCode']=1
+          result['exitCode']=exitCode
+          result['retryActionCount'] = retryCount - 1
+        else:
+          result['retryActionCount'] = retryCount
         # Update the result
         r.put(result)
       if not self.stopped():

Modified: incubator/ambari/trunk/agent/src/main/python/ambari_agent/AmbariConfig.py
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/agent/src/main/python/ambari_agent/AmbariConfig.py?rev=1221997&r1=1221996&r2=1221997&view=diff
==============================================================================
--- incubator/ambari/trunk/agent/src/main/python/ambari_agent/AmbariConfig.py (original)
+++ incubator/ambari/trunk/agent/src/main/python/ambari_agent/AmbariConfig.py Thu Dec 22 02:22:06 2011
@@ -41,6 +41,10 @@ prefix=/homes/ddas/puppet
 commandpath=/usr/local/bin/puppet apply --modulepath /home/puppet/puppet-ambari/modules
 driver=/home/puppet/puppet-ambari/manifests/site.pp
 
+[command]
+maxretries=2
+sleepBetweenRetries=1
+
 """
 s = StringIO.StringIO(content)
 config.readfp(s)

Modified: incubator/ambari/trunk/agent/src/test/python/TestActionQueue.py
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/agent/src/test/python/TestActionQueue.py?rev=1221997&r1=1221996&r2=1221997&view=diff
==============================================================================
--- incubator/ambari/trunk/agent/src/test/python/TestActionQueue.py (original)
+++ incubator/ambari/trunk/agent/src/test/python/TestActionQueue.py Thu Dec 22 02:22:06 2011
@@ -21,7 +21,8 @@ limitations under the License.
 from unittest import TestCase
 from ambari_agent.ActionQueue import ActionQueue
 from ambari_agent.AmbariConfig import AmbariConfig
-import os, errno
+from ambari_agent.FileUtil import getFilePath
+import os, errno, time
 
 class TestActionQueue(TestCase):
   def test_ActionQueueStartStop(self):
@@ -30,3 +31,60 @@ class TestActionQueue(TestCase):
     actionQueue.stop()
     actionQueue.join()
     self.assertEqual(actionQueue.stopped(), True, 'Action queue is not stopped.') 
+
+  def test_RetryAction(self):
+    action={'id' : 'tttt'}
+    config = AmbariConfig().getConfig()
+    actionQueue = ActionQueue(config)
+    path = actionQueue.getInstallFilename(action['id'])
+    configFile = {
+      "data"       : "test",
+      "owner"      : os.getuid(),
+      "group"      : os.getgid() ,
+      "permission" : 0700,
+      "path"       : path,
+      "umask"      : 022
+    }
+
+    #note that the command in the action is just a listing of the path created
+    #we just want to ensure that 'ls' can run on the data file (in the actual world
+    #this 'ls' would be a puppet or a chef command that would work on a data
+    #file
+    badAction = {
+      'id' : 'tttt',
+      'kind' : 'INSTALL_AND_CONFIG_ACTION',
+      'workDirComponent' : 'abc-hdfs',
+      'file' : configFile,
+      'clusterDefinitionRevision' : 12,
+      'command' : ['/bin/ls',"/foo/bar/badPath1234"]
+    }
+    path=getFilePath(action,path)
+    goodAction = {
+      'id' : 'tttt',
+      'kind' : 'INSTALL_AND_CONFIG_ACTION',
+      'workDirComponent' : 'abc-hdfs',
+      'file' : configFile,
+      'clusterDefinitionRevision' : 12,
+      'command' : ['/bin/ls',path]
+    }
+    actionQueue.start()
+    response = {'actions' : [badAction,goodAction]}
+    actionQueue.maxRetries = 2
+    actionQueue.sleepInterval = 1
+    result = actionQueue.put(response)
+    time.sleep(5)
+    actionQueue.stop()
+    actionQueue.join()
+    results = actionQueue.result()
+    self.assertEqual(len(results), 2, 'Number of results is not 2.')
+    result = results[0]
+    maxretries = config.get('command', 'maxretries')
+    self.assertEqual(int(result['retryActionCount']), 
+                     int(maxretries),
+                     "Number of retries is %d and not %d" % 
+                     (int(result['retryActionCount']), int(str(maxretries))))
+    result = results[1]
+    self.assertEqual(int(result['retryActionCount']), 
+                     1,
+                     "Number of retries is %d and not %d" % 
+                     (int(result['retryActionCount']), 1))        

Modified: incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/HeartbeatHandler.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/HeartbeatHandler.java?rev=1221997&r1=1221996&r2=1221997&view=diff
==============================================================================
--- incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/HeartbeatHandler.java (original)
+++ incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/HeartbeatHandler.java Thu Dec 22 02:22:06 2011
@@ -61,10 +61,10 @@ public class HeartbeatHandler {
   private static Log LOG = LogFactory.getLog(HeartbeatHandler.class);
   private final Clusters clusters;
   private final Nodes nodes;
-  private StateMachineInvokerInterface stateMachineInvoker;
-  private FSMDriverInterface driver;
+  private final StateMachineInvokerInterface stateMachineInvoker;
+  private final FSMDriverInterface driver;
   
-  static final String DEFAULT_USER = "hdfs"; //TBD: this needs to come from the stack definition or something
+  static final String DEFAULT_USER = "hdfs"; //TODO: this needs to come from the stack definition or something (AMBARI-169)
     
   @Inject
   HeartbeatHandler(Clusters clusters, Nodes nodes, 
@@ -192,6 +192,8 @@ public class HeartbeatHandler {
     return createResponse(responseId,allActions,heartbeat);
   }
   
+  //TODO: this should be moved to the ClusterImpl (a dependency graph 
+  //should be created there)
   private boolean dependentComponentsActive(ComponentPlugin plugin, 
       ClusterFSM cluster) throws IOException {
     String[] dependents = plugin.getRequiredComponents();
@@ -216,7 +218,7 @@ public class HeartbeatHandler {
       List<Action> allActions, HeartBeat heartbeat) {
     ControllerResponse r = new ControllerResponse();
     r.setResponseId(responseId);
-    if (allActions.size() > 0) {//TODO: REMOVE THIS
+    if (allActions.size() > 0) {//TODO: REMOVE THIS (AMBARI-158)
       Action a = new Action();
       a.setKind(Kind.NO_OP_ACTION);
       allActions.add(a);
@@ -260,7 +262,7 @@ public class HeartbeatHandler {
       List<Action> allActions) {
     ConfigFile file = new ConfigFile();
     file.setData(script);
-    file.setOwner(DEFAULT_USER);
+    file.setOwner(DEFAULT_USER); //TODO (AMBARI-169)
     
     Action action = new Action();
     action.setFile(file);
@@ -468,7 +470,7 @@ public class HeartbeatHandler {
     action.setClusterDefinitionRevision(clusterDefRev);
     action.setComponent(component);
     action.setRole(role);
-    action.setUser(DEFAULT_USER);
+    action.setUser(DEFAULT_USER); //TODO (AMBARI-169)
     action.setCleanUpCommand(new Command("foobar","",new String[]{"foobar"}));//TODO: this needs fixing at some point
     String workDir = role.equals(component + "-client") ? 
         (clusterId + "-client") : (clusterId + "-" + role);
@@ -481,4 +483,9 @@ public class HeartbeatHandler {
     fillActionDetails(action, clusterId, clusterDefRev, component, role);
     addAction(action, allActions);
   }
+  
+  private class ActionTracker {
+    //tracks all actions based on agent hostnames. When the agent returns a response 
+    //note all the failed actionIDs and resend them
+  }
 }