You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by sw...@apache.org on 2013/03/01 23:36:23 UTC

svn commit: r1451746 - in /incubator/ambari/trunk: ./ ambari-agent/src/main/python/ambari_agent/ ambari-agent/src/test/python/ ambari-agent/src/test/python/dummy_files/

Author: swagle
Date: Fri Mar  1 22:36:22 2013
New Revision: 1451746

URL: http://svn.apache.org/r1451746
Log:
AMBARI-1541. Upgrade task support in agent. (Sumit Mohanty via swagle)

Added:
    incubator/ambari/trunk/ambari-agent/src/main/python/ambari_agent/PuppetExecutor.py
    incubator/ambari/trunk/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py
    incubator/ambari/trunk/ambari-agent/src/main/python/ambari_agent/StackVersionsFileHandler.py
    incubator/ambari/trunk/ambari-agent/src/main/python/ambari_agent/UpgradeExecutor.py
    incubator/ambari/trunk/ambari-agent/src/test/python/TestStackVersionsFileHandler.py
    incubator/ambari/trunk/ambari-agent/src/test/python/TestUpgradeExecutor.py
    incubator/ambari/trunk/ambari-agent/src/test/python/dummy_files/
    incubator/ambari/trunk/ambari-agent/src/test/python/dummy_files/dummy_current_stack
Removed:
    incubator/ambari/trunk/ambari-agent/src/main/python/ambari_agent/puppetExecutor.py
Modified:
    incubator/ambari/trunk/CHANGES.txt
    incubator/ambari/trunk/ambari-agent/src/test/python/TestActionQueue.py
    incubator/ambari/trunk/ambari-agent/src/test/python/TestController.py
    incubator/ambari/trunk/ambari-agent/src/test/python/TestPuppetExecutor.py
    incubator/ambari/trunk/ambari-agent/src/test/python/TestPuppetExecutorManually.py

Modified: incubator/ambari/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/CHANGES.txt?rev=1451746&r1=1451745&r2=1451746&view=diff
==============================================================================
--- incubator/ambari/trunk/CHANGES.txt (original)
+++ incubator/ambari/trunk/CHANGES.txt Fri Mar  1 22:36:22 2013
@@ -12,6 +12,8 @@ Trunk (unreleased changes):
 
  NEW FEATURES
 
+ AMBARI-1541. Upgrade task support in agent. (Sumit Mohanty via swagle)
+
  AMBARI-1540. Reassign Master Wizard - Steps 3 and 4 (reconfigure
  component and review). (yusaku)
 

Added: incubator/ambari/trunk/ambari-agent/src/main/python/ambari_agent/PuppetExecutor.py
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-agent/src/main/python/ambari_agent/PuppetExecutor.py?rev=1451746&view=auto
==============================================================================
--- incubator/ambari/trunk/ambari-agent/src/main/python/ambari_agent/PuppetExecutor.py (added)
+++ incubator/ambari/trunk/ambari-agent/src/main/python/ambari_agent/PuppetExecutor.py Fri Mar  1 22:36:22 2013
@@ -0,0 +1,248 @@
+#!/usr/bin/env python2.6
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+import json
+import os.path
+import logging
+import subprocess
+from manifestGenerator import generateManifest
+from RepoInstaller import RepoInstaller
+import pprint, threading
+from Grep import Grep
+from threading import Thread
+import shell
+import traceback
+
+logger = logging.getLogger()
+
+class PuppetExecutor:
+
+  """ Class that executes the commands that come from the server using puppet.
+  This is the class that provides the pluggable point for executing the puppet"""
+
+  # How many seconds will pass before running puppet is terminated on timeout
+  PUPPET_TIMEOUT_SECONDS = 600
+  grep = Grep()
+  event = threading.Event()
+  last_puppet_has_been_killed = False
+
+  NO_ERROR = "none"
+
+  def __init__(self, puppetModule, puppetInstall, facterInstall, tmpDir, config):
+    self.puppetModule = puppetModule
+    self.puppetInstall = puppetInstall
+    self.facterInstall = facterInstall
+    self.tmpDir = tmpDir
+    self.reposInstalled = False
+    self.config = config
+    self.modulesdir = self.puppetModule + "/modules"
+
+  def configureEnviron(self, environ):
+    if not self.config.has_option("puppet", "ruby_home"):
+      return environ
+    ruby_home = self.config.get("puppet", "ruby_home")
+    if os.path.exists(ruby_home):
+      """Only update ruby home if the config is configured"""
+      path = os.environ["PATH"]
+      if not ruby_home in path:
+        environ["PATH"] = ruby_home + os.path.sep + "bin"  + ":"+environ["PATH"] 
+      environ["MY_RUBY_HOME"] = ruby_home
+    return environ
+    
+  def getPuppetBinary(self):
+    puppetbin = os.path.join(self.puppetInstall, "bin", "puppet") 
+    if os.path.exists(puppetbin):
+      return puppetbin
+    else:
+      logger.info("Using default puppet on the host : " + puppetbin 
+                  + " does not exist.")
+      return "puppet"
+     
+  def deployRepos(self, command, tmpDir, modulesdir, taskId):
+    # Hack to only create the repo files once
+    result = []
+    if not self.reposInstalled:
+      repoInstaller = RepoInstaller(command, tmpDir, modulesdir, taskId, self.config)
+      result = repoInstaller.installRepos()
+    return result
+  
+  def puppetCommand(self, sitepp):
+    modules = self.puppetModule
+    puppetcommand = [self.getPuppetBinary(), "apply", "--confdir=" + modules, "--detailed-exitcodes", sitepp]
+    return puppetcommand
+  
+  def facterLib(self):
+    return self.facterInstall + "/lib/"
+    pass
+  
+  def puppetLib(self):
+    return self.puppetInstall + "/lib"
+    pass
+
+  def condenseOutput(self, stdout, stderr, retcode):
+    grep = self.grep
+    if stderr == self.NO_ERROR:
+      result = grep.tail(stdout, grep.OUTPUT_LAST_LINES)
+    else:
+      result = grep.grep(stdout, "fail", grep.ERROR_LAST_LINES_BEFORE, grep.ERROR_LAST_LINES_AFTER)
+      if result is None: # Second try
+       result = grep.grep(stdout, "err", grep.ERROR_LAST_LINES_BEFORE, grep.ERROR_LAST_LINES_AFTER)
+    filteredresult = grep.filterMarkup(result)
+    return filteredresult
+
+  def isSuccessfull(self, returncode):
+    return not self.last_puppet_has_been_killed and (returncode == 0 or returncode == 2)
+
+  def just_run_one_file(self, command, file, tmpout, tmperr):
+    result = {}
+    taskId = 0
+    if command.has_key("taskId"):
+      taskId = command['taskId']
+    #Install repos
+    self.deployRepos(command, self.tmpDir, self.modulesdir, command.taskId)
+    puppetEnv = os.environ
+    self.runPuppetFile(file, result, puppetEnv, tmpout, tmperr)
+    if self.isSuccessfull(result["exitcode"]):
+      # Check if all the repos were installed or not and reset the flag
+      self.reposInstalled = True
+    return result
+
+  def runCommand(self, command, tmpoutfile, tmperrfile):
+    result = {}
+    taskId = 0
+    if command.has_key("taskId"):
+      taskId = command['taskId']
+    puppetEnv = os.environ
+    #Install repos
+    puppetFiles = self.deployRepos(command, self.tmpDir, self.modulesdir, taskId)
+    siteppFileName = os.path.join(self.tmpDir, "site-" + str(taskId) + ".pp") 
+    puppetFiles.append(siteppFileName)
+    generateManifest(command, siteppFileName, self.modulesdir, self.config)
+    #Run all puppet commands, from manifest generator and for repos installation
+    #Appending outputs and errors, exitcode - maximal from all
+    for puppetFile in puppetFiles:
+      self.runPuppetFile(puppetFile, result, puppetEnv, tmpoutfile, tmperrfile)
+      # Check if one of the puppet command fails and error out
+      if not self.isSuccessfull(result["exitcode"]):
+        break
+
+    if self.isSuccessfull(result["exitcode"]):
+      # Check if all the repos were installed or not and reset the flag
+      self.reposInstalled = True
+      
+    logger.info("ExitCode : "  + str(result["exitcode"]))
+    return result
+
+  def runPuppetFile(self, puppetFile, result, puppetEnv, tmpoutfile, tmperrfile):
+    """ Run the command and make sure the output gets propagated"""
+    puppetcommand = self.puppetCommand(puppetFile)
+    rubyLib = ""
+    if os.environ.has_key("RUBYLIB"):
+      rubyLib = os.environ["RUBYLIB"]
+      logger.info("RUBYLIB from Env " + rubyLib)
+    if not (self.facterLib() in rubyLib):
+      rubyLib = rubyLib + ":" + self.facterLib()
+    if not (self.puppetLib() in rubyLib):
+      rubyLib = rubyLib + ":" + self.puppetLib()
+    tmpout =  open(tmpoutfile, 'w')
+    tmperr =  open(tmperrfile, 'w')
+    puppetEnv["RUBYLIB"] = rubyLib
+    puppetEnv = self.configureEnviron(puppetEnv)
+    logger.info("Setting RUBYLIB as: " + rubyLib)
+    logger.info("Running command " + pprint.pformat(puppetcommand))
+    puppet = self.lauch_puppet_subprocess(puppetcommand,tmpout, tmperr, puppetEnv)
+    logger.info("Launching watchdog thread")
+    self.event.clear()
+    self.last_puppet_has_been_killed = False
+    thread = Thread(target =  self.puppet_watchdog_func, args = (puppet, ))
+    thread.start()
+    # Waiting for process to finished or killed
+    puppet.communicate()
+    self.event.set()
+    thread.join()
+    # Building results
+    error = self.NO_ERROR
+    returncode = 0
+    if not self.isSuccessfull(puppet.returncode):
+      returncode = puppet.returncode
+      error = open(tmperrfile, 'r').read()
+      logging.error("Error running puppet: \n" + str(error))
+      pass
+    if self.last_puppet_has_been_killed:
+      error = str(error) + "\n Puppet has been killed due to timeout"
+      returncode = 999
+    if result.has_key("stderr"):
+      result["stderr"] = result["stderr"] + os.linesep + str(error)
+    else:
+      result["stderr"] = str(error)
+    puppetOutput = open(tmpoutfile, 'r').read()
+    logger.info("Output from puppet :\n" + puppetOutput)
+    logger.info("Puppet exit code is " + str(returncode))
+    if result.has_key("exitcode"):
+      result["exitcode"] = max(returncode, result["exitcode"])
+    else:
+      result["exitcode"] = returncode
+    condensed = self.condenseOutput(puppetOutput, error, returncode)
+    if result.has_key("stdout"):
+      result["stdout"] = result["stdout"] + os.linesep + str(condensed)
+    else:
+      result["stdout"] = str(condensed)
+    return result
+
+  def lauch_puppet_subprocess(self, puppetcommand, tmpout, tmperr, puppetEnv):
+    """
+    Creates subprocess with given parameters. This functionality was moved to separate method
+    to make possible unit testing
+    """
+    return subprocess.Popen(puppetcommand,
+      stdout=tmpout,
+      stderr=tmperr,
+      env=puppetEnv)
+
+  def puppet_watchdog_func(self, puppet):
+    self.event.wait(self.PUPPET_TIMEOUT_SECONDS)
+    if puppet.returncode is None:
+      logger.error("Task timed out and will be killed")
+      self.runShellKillPgrp(puppet)
+      self.last_puppet_has_been_killed = True
+    pass
+
+  def runShellKillPgrp(self, puppet):
+    shell.killprocessgrp(puppet.pid)
+
+def main():
+  logging.basicConfig(level=logging.DEBUG)    
+  #test code
+  jsonFile = open('test.json', 'r')
+  jsonStr = jsonFile.read() 
+  # Below is for testing only.
+  
+  puppetInstance = PuppetExecutor("/home/centos/ambari_repo_info/ambari-agent/src/main/puppet/",
+                                  "/usr/",
+                                  "/root/workspace/puppet-install/facter-1.6.10/",
+                                  "/tmp")
+  jsonFile = open('test.json', 'r')
+  jsonStr = jsonFile.read() 
+  parsedJson = json.loads(jsonStr)
+  result = puppetInstance.runCommand(parsedJson, '/tmp/out.txt', '/tmp/err.txt')
+  logger.debug(result)
+  
+if __name__ == '__main__':
+  main()
+

Added: incubator/ambari/trunk/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py?rev=1451746&view=auto
==============================================================================
--- incubator/ambari/trunk/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py (added)
+++ incubator/ambari/trunk/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py Fri Mar  1 22:36:22 2013
@@ -0,0 +1,43 @@
+#!/usr/bin/env python2.6
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+import os.path
+import logging
+import subprocess
+import pprint, threading
+from Grep import Grep
+from threading import Thread
+import shell
+import traceback
+
+logger = logging.getLogger()
+
+class PythonExecutor:
+
+  def __init__(self):
+    pass
+
+  def run_file(self, name, stdout, stderr):
+    """
+    Executes the file specified in a separate subprocess.
+    Method returns only when the subprocess is finished or timeout is exceeded
+    """
+    # TODO: implement
+    logger.warn("TODO: Python file execution is not supported yet")
+    pass

Added: incubator/ambari/trunk/ambari-agent/src/main/python/ambari_agent/StackVersionsFileHandler.py
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-agent/src/main/python/ambari_agent/StackVersionsFileHandler.py?rev=1451746&view=auto
==============================================================================
--- incubator/ambari/trunk/ambari-agent/src/main/python/ambari_agent/StackVersionsFileHandler.py (added)
+++ incubator/ambari/trunk/ambari-agent/src/main/python/ambari_agent/StackVersionsFileHandler.py Fri Mar  1 22:36:22 2013
@@ -0,0 +1,114 @@
+#!/usr/bin/env python2.6
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+import os.path
+import logging
+import subprocess
+import pprint, threading
+from Grep import Grep
+from threading import Thread
+import shell
+import traceback
+import shutil
+
+logger = logging.getLogger()
+
+class StackVersionsFileHandler:
+
+  VER_FILE = "current-stack"
+  DEFAULT_VER = ""
+
+  def __init__(self, versionsFileDir):
+    self.versionsFileDir = versionsFileDir
+    self.versionsFilePath = os.path.join(versionsFileDir, self.VER_FILE)
+    self._lock = threading.RLock()
+
+  def read_stack_version(self, component):
+    try :
+      self.touch_file()
+      for line in open(self.versionsFilePath):
+        comp, ver = self.extract(line)
+        if comp == component:
+          return ver
+      return self.DEFAULT_VER
+    except Exception, err:
+      logger.error("Can't read versions file: %s " % err.message)
+      traceback.print_exc()
+      return self.DEFAULT_VER
+
+
+  def read_all_stack_versions(self):
+    result = {}
+    try :
+      self.touch_file()
+      for line in open(self.versionsFilePath):
+        comp, ver = self.extract(line)
+        if comp != self.DEFAULT_VER:
+          result[comp] = ver
+      return result
+    except Exception, err:
+      logger.error("Can't read stack versions file: %s " % err.message)
+      traceback.print_exc()
+      return {}
+
+
+  def write_stack_version(self, component, newVersion):
+    self._lock.acquire()
+    try:
+      values = self.read_all_stack_versions()
+      values[component] = newVersion
+      logger.info("Backing up old stack versions file")
+      backup = os.path.join(self.versionsFileDir, self.VER_FILE + ".bak")
+      shutil.move(self.versionsFilePath, backup)
+      logger.info("Writing new stack versions file")
+      with open (self.versionsFilePath, 'w') as f:
+        for key in values:
+          f.write ("%s\t%s\n" % (key, values))
+
+    except Exception, err:
+      logger.error("Can't write new stack version (%s %s) :%s " % (component,
+            newVersion, err.message))
+      traceback.print_exc()
+    finally:
+      self._lock.release()
+
+
+  def extract(self, statement):
+    '''
+    Extracts <Component>, <HDPstack version> values from lines like
+    NAGIOS	StackVersion-1.3.0
+    '''
+    parts = statement.strip().split()
+    if len(parts) != 2:
+      logger.warn("Wrong stack versions file statement format: %s" % statement)
+      return self.DEFAULT_VER, self.DEFAULT_VER
+    else:
+      return parts[0], parts[1]
+
+
+  def touch_file(self):
+    '''
+     Called to create file when it does not exist
+    '''
+    if not os.path.isfile(self.versionsFilePath):
+      logger.info("Creating stacks versions file at %s" % self.versionsFilePath)
+      open(self.versionsFilePath, 'w').close()
+
+

Added: incubator/ambari/trunk/ambari-agent/src/main/python/ambari_agent/UpgradeExecutor.py
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-agent/src/main/python/ambari_agent/UpgradeExecutor.py?rev=1451746&view=auto
==============================================================================
--- incubator/ambari/trunk/ambari-agent/src/main/python/ambari_agent/UpgradeExecutor.py (added)
+++ incubator/ambari/trunk/ambari-agent/src/main/python/ambari_agent/UpgradeExecutor.py Fri Mar  1 22:36:22 2013
@@ -0,0 +1,202 @@
+#!/usr/bin/env python2.6
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+import json
+import os.path
+import logging
+import subprocess
+from manifestGenerator import generateManifest
+from RepoInstaller import RepoInstaller
+import pprint, threading
+from Grep import Grep
+from threading import Thread
+import shell
+import traceback
+from Grep import Grep
+from StackVersionsFileHandler import StackVersionsFileHandler
+import re
+
+logger = logging.getLogger()
+grep = Grep()
+
+class UpgradeExecutor:
+
+  """ Class that performs the StackVersion stack upgrade"""
+
+  SCRIPT_DIRS = [
+    'pre-upgrade.d',
+    'upgrade.d',
+    'post-upgrade.d'
+  ]
+
+  NAME_PARSING_FAILED_CODE = 999
+
+  def __init__(self, pythonExecutor, puppetExecutor, config):
+    self.pythonExecutor = pythonExecutor
+    self.puppetExecutor = puppetExecutor
+    self.stacksDir = config.get('stack', 'upgradeScriptsDir')
+    self.config = config
+    versionsFileDir = config.get('agent', 'prefix')
+    self.versionsHandler = StackVersionsFileHandler(versionsFileDir)
+
+
+  def perform_stack_upgrade(self, command, tmpout, tmperr):
+    logger.info("Performing stack upgrade")
+    params = command['commandParams']
+    srcStack = params['source_stack_version']
+    tgtStack = params['target_stack_version']
+    component = command['component']
+
+    srcStackTuple = self.split_stack_version(srcStack)
+    tgtStackTuple = self.split_stack_version(srcStack)
+
+    if srcStackTuple == None or tgtStackTuple == None:
+      errorstr = "Source (%s) or target (%s) version does not match pattern \
+      <Name>-<Version>" % (srcStack, tgtStack)
+      logger.info(errorstr)
+      result = {
+        'exitcode' : 1,
+        'stdout'   : 'None',
+        'stderr'   : errorstr
+      }
+    elif srcStack != tgtStack:
+      paramTuple = sum((srcStackTuple, tgtStackTuple), ())
+      upgradeId = "%s-%s.%s_%s-%s.%s" % paramTuple
+      # Check stack version (do we need upgrade?)
+      basedir = os.path.join(self.stacksDir, upgradeId, component)
+      if not os.path.isdir(basedir):
+        errorstr = "Upgrade %s is not supported" % upgradeId
+        logger.error(errorstr)
+        result = {
+          'exitcode' : 1,
+          'stdout'   : errorstr,
+          'stderr'   : errorstr
+        }
+      else:
+        result = {
+          'exitcode' : 0,
+          'stdout'   : '',
+          'stderr'   : ''
+        }
+        for dir in self.SCRIPT_DIRS:
+          if result['exitcode'] != 0:
+            break
+          tmpRes = self.execute_dir(command, basedir, dir, tmpout, tmperr)
+
+          result = {
+            'exitcode' : result['exitcode'] or tmpRes['exitcode'],
+            'stdout'   : "%s\n%s" % (result['stdout'], tmpRes['stdout']),
+            'stderr'   : "%s\n%s" % (result['stderr'], tmpRes['stderr']),
+          }
+
+        if result['exitcode'] == 0:
+          logger.info("Upgrade %s successfully finished" % upgradeId)
+          self.versionsHandler.write_stack_version(component, tgtStack)
+    else:
+      infostr = "target_stack_version (%s) matches current stack version" \
+          " for component %s, nothing to do" % (tgtStack, component)
+      logger.info(infostr)
+      result = {
+        'exitcode' : 0,
+        'stdout'   : infostr,
+        'stderr'   : 'None'
+      }
+    result = {
+      'exitcode' : result['exitcode'],
+      'stdout'   : grep.tail(result['stdout'], grep.OUTPUT_LAST_LINES),
+      'stderr'   : grep.tail(result['stderr'], grep.OUTPUT_LAST_LINES)
+    }
+    return result
+
+
+  def get_key_func(self, name):
+    """
+    Returns a number from filenames like 70-foobar.* or 999 for not matching
+    filenames
+    """
+    parts = name.split('-', 1)
+    if not parts or not parts[0].isdigit():
+      logger.warn("Can't parse script filename number %s" % name)
+      return self.NAME_PARSING_FAILED_CODE # unknown element will be placed to the end of list
+    return int(parts[0])
+
+
+  def split_stack_version(self, verstr):
+    matchObj = re.match( r'^(.*)-(\d+).(\d+)', verstr.strip(), re.M|re.I)
+    stack_name = matchObj.group(1)
+    stack_major_ver = matchObj.group(2)
+    stack_minor_ver = matchObj.group(3)
+    if matchObj:
+      return stack_name, stack_major_ver, stack_minor_ver
+    else:
+      return None
+
+
+  def execute_dir(self, command, basedir, dir, tmpout, tmperr):
+    """
+    Executes *.py and *.pp files located in a given directory.
+    Files a executed in a numeric sorting order.
+    """
+    dirpath = os.path.join(basedir, dir)
+    logger.info("Executing %s" % dirpath)
+    if not os.path.isdir(dirpath):
+      logger.warn("Script directory %s does not exist, skipping")
+      return
+    fileList=os.listdir(dirpath)
+    fileList.sort(key = self.get_key_func)
+    formattedResult = {
+      'exitcode' : 0,
+      'stdout'   : '',
+      'stderr'   : ''
+    }
+    for filename in fileList:
+      prevcode = formattedResult['exitcode']
+      if prevcode != 0 or self.get_key_func(filename) == self.NAME_PARSING_FAILED_CODE:
+        break
+      filepath = os.path.join(dirpath, filename)
+      if filename.endswith(".pp"):
+        logger.info("Running puppet file %s" % filepath)
+        result = self.puppetExecutor.just_run_one_file(command, filename,
+                                                                tmpout, tmperr)
+      elif filename.endswith(".py"):
+        logger.info("Running python file %s" % filepath)
+        result = self.pythonExecutor.run_file(filepath, tmpout, tmperr)
+      elif filename.endswith(".pyc"):
+        pass # skipping compiled files
+      else:
+        warnstr = "Unrecognized file type, skipping: %s" % filepath
+        logger.warn(warnstr)
+        result = {
+          'exitcode' : 0,
+          'stdout'   : warnstr,
+          'stderr'   : 'None'
+        }
+      formattedResult = {
+        'exitcode' : prevcode or result['exitcode'],
+        'stdout'   : "%s\n%s" % (formattedResult['stdout'], result['stdout']),
+        'stderr'   : "%s\n%s" % (formattedResult['stderr'], result['stderr']),
+      }
+    logger.debug("Result of %s: \n %s" % (dirpath, pprint.pformat(formattedResult)))
+    return formattedResult
+
+
+
+
+
+

Modified: incubator/ambari/trunk/ambari-agent/src/test/python/TestActionQueue.py
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-agent/src/test/python/TestActionQueue.py?rev=1451746&r1=1451745&r2=1451746&view=diff
==============================================================================
--- incubator/ambari/trunk/ambari-agent/src/test/python/TestActionQueue.py (original)
+++ incubator/ambari/trunk/ambari-agent/src/test/python/TestActionQueue.py Fri Mar  1 22:36:22 2013
@@ -22,9 +22,9 @@ from unittest import TestCase
 from ambari_agent.ActionQueue import ActionQueue
 from ambari_agent.AmbariConfig import AmbariConfig
 from ambari_agent.FileUtil import getFilePath
+from ambari_agent.UpgradeExecutor import UpgradeExecutor
 import os, errno, time, pprint, tempfile, threading
-
-event = threading.Event()
+from mock.mock import patch, MagicMock, call
 
 class TestActionQueue(TestCase):
   def test_ActionQueueStartStop(self):
@@ -48,7 +48,7 @@ class TestActionQueue(TestCase):
     actionQueue.IDLE_SLEEP_TIME = 0.01
     executor_started_event = threading.Event()
     end_executor_event = threading.Event()
-    actionQueue.executor = FakeExecutor(executor_started_event, end_executor_event)
+    actionQueue.puppetExecutor = FakeExecutor(executor_started_event, end_executor_event)
     before_start_result = actionQueue.result()
 
     command = {
@@ -99,6 +99,85 @@ class TestActionQueue(TestCase):
     #print("in_progress: " + pprint.pformat(in_progress_result))
     #print("after: " + pprint.pformat(after_start_result))
 
+  @patch.object(ActionQueue, "executeCommand")
+  @patch.object(ActionQueue, "stopped")
+  def test_upgradeCommand_dispatching(self, stopped_method, executeCommand_method):
+    queue = ActionQueue(config = MagicMock())
+    command = {
+      'commandId': 17,
+      'role' : "role",
+      'taskId' : "taskId",
+      'clusterName' : "clusterName",
+      'serviceName' : "serviceName",
+      'roleCommand' : 'UPGRADE',
+      'hostname' : "localhost.localdomain",
+      'hostLevelParams': "hostLevelParams",
+      'clusterHostInfo': "clusterHostInfo",
+      'configurations': "configurations",
+      'commandType': "EXECUTION_COMMAND",
+      'configurations':{'global' : {}},
+      'roleParams': {},
+      'commandParams' :	{
+        'source_stack_version' : 'HDP-1.2.1',
+        'target_stack_version' : 'HDP-1.3.0'
+      }
+    }
+    result = [{
+      'exitcode' : 0,
+      'stdout'   : 'abc',
+      'stderr'   : 'def'
+    }]
+    executeCommand_method.return_value = result
+    stopped_method.side_effect = [False, False, True, True, True]
+    queue.stopped = stopped_method
+    queue.IDLE_SLEEP_TIME = 0.001
+    queue.put(command)
+    queue.run()
+    self.assertTrue(executeCommand_method.called)
+    self.assertEquals(queue.resultQueue.qsize(), 1)
+    returned_result = queue.resultQueue.get()
+    self.assertIs(returned_result[1], result[0])
+
+
+  @patch.object(UpgradeExecutor, "perform_stack_upgrade")
+  def test_upgradeCommand_executeCommand(self, perform_stack_upgrade_method):
+    queue = ActionQueue(config = MagicMock())
+    command = {
+      'commandId': 17,
+      'role' : "role",
+      'taskId' : "taskId",
+      'clusterName' : "clusterName",
+      'serviceName' : "serviceName",
+      'roleCommand' : 'UPGRADE',
+      'hostname' : "localhost.localdomain",
+      'hostLevelParams': "hostLevelParams",
+      'clusterHostInfo': "clusterHostInfo",
+      'configurations': "configurations",
+      'commandType': "EXECUTION_COMMAND",
+      'configurations':{'global' : {}},
+      'roleParams': {},
+      'commandParams' :	{
+        'source_stack_version' : 'HDP-1.2.1',
+        'target_stack_version' : 'HDP-1.3.0'
+      }
+    }
+    perform_stack_upgrade_method.return_value = {
+      'exitcode' : 0,
+      'stdout'   : 'abc',
+      'stderr'   : 'def'
+    }
+    result = queue.executeCommand(command)
+    expected_result = [{'actionId': 17,
+                        'clusterName': 'clusterName',
+                        'exitCode': 0,
+                        'role': 'role',
+                        'serviceName': 'serviceName',
+                        'status': 'COMPLETED',
+                        'stderr': 'def',
+                        'stdout': 'abc',
+                        'taskId': 'taskId'}]
+    self.assertEquals(result, expected_result)
+
 
 class FakeExecutor():
 

Modified: incubator/ambari/trunk/ambari-agent/src/test/python/TestController.py
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-agent/src/test/python/TestController.py?rev=1451746&r1=1451745&r2=1451746&view=diff
==============================================================================
--- incubator/ambari/trunk/ambari-agent/src/test/python/TestController.py (original)
+++ incubator/ambari/trunk/ambari-agent/src/test/python/TestController.py Fri Mar  1 22:36:22 2013
@@ -22,6 +22,7 @@ limitations under the License.
 import StringIO
 import unittest
 from ambari_agent import Controller
+from ambari_agent import hostname
 import sys
 from mock.mock import patch, MagicMock, call
 
@@ -30,19 +31,22 @@ class TestController(unittest.TestCase):
 
   @patch("threading.Thread")
   @patch("threading.Lock")
-  @patch("socket.gethostname")
   @patch.object(Controller, "NetUtil")
-  def setUp(self, NetUtil_mock, hostnameMock, lockMock, threadMock):
+  @patch.object(hostname, "hostname")
+  def setUp(self, hostname_method, NetUtil_mock, lockMock, threadMock):
 
     Controller.logger = MagicMock()
-    hostnameMock.return_value = "test_hostname"
     lockMock.return_value = MagicMock()
     NetUtil_mock.return_value = MagicMock()
+    hostname_method.return_value = "test_hostname"
+
 
     config = MagicMock()
     config.get.return_value = "something"
 
     self.controller = Controller.Controller(config)
+    self.controller.netutil.HEARTBEAT_IDDLE_INTERVAL_SEC = 0.1
+    self.controller.netutil.HEARTBEAT_NOT_IDDLE_INTERVAL_SEC = 0.1
 
 
   @patch.object(Controller, "Heartbeat")
@@ -324,9 +328,7 @@ class TestController(unittest.TestCase):
     self.controller.sendRequest = Controller.Controller.sendRequest
     self.controller.sendRequest = Controller.Controller.addToQueue
 
-
 if __name__ == "__main__":
-
   unittest.main(verbosity=2)
 
 

Modified: incubator/ambari/trunk/ambari-agent/src/test/python/TestPuppetExecutor.py
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-agent/src/test/python/TestPuppetExecutor.py?rev=1451746&r1=1451745&r2=1451746&view=diff
==============================================================================
--- incubator/ambari/trunk/ambari-agent/src/test/python/TestPuppetExecutor.py (original)
+++ incubator/ambari/trunk/ambari-agent/src/test/python/TestPuppetExecutor.py Fri Mar  1 22:36:22 2013
@@ -19,7 +19,7 @@ limitations under the License.
 '''
 
 from unittest import TestCase
-from puppetExecutor import puppetExecutor
+from PuppetExecutor import PuppetExecutor
 from Grep import Grep
 from pprint import pformat
 import socket, threading, tempfile
@@ -28,13 +28,11 @@ import sys
 from AmbariConfig import AmbariConfig
 from threading import Thread
 
-grep = Grep()
-
 class TestPuppetExecutor(TestCase):
 
 
   def test_build(self):
-    puppetexecutor = puppetExecutor("/tmp", "/x", "/y", "/z", AmbariConfig().getConfig())
+    puppetexecutor = PuppetExecutor("/tmp", "/x", "/y", "/z", AmbariConfig().getConfig())
     command = puppetexecutor.puppetCommand("site.pp")
     self.assertEquals("puppet", command[0], "puppet binary wrong")
     self.assertEquals("apply", command[1], "local apply called")
@@ -43,9 +41,11 @@ class TestPuppetExecutor(TestCase):
     correct")
 
   def test_condense_bad2(self):
-    puppetexecutor = puppetExecutor("/tmp", "/x", "/y", "/z", AmbariConfig().getConfig())
-    puppetexecutor.ERROR_LAST_LINES_BEFORE = 2
-    puppetexecutor.ERROR_LAST_LINES_AFTER = 3
+    puppetexecutor = PuppetExecutor("/tmp", "/x", "/y", "/z", AmbariConfig().getConfig())
+    grep = Grep()
+    puppetexecutor.grep = grep
+    grep.ERROR_LAST_LINES_BEFORE = 2
+    grep.ERROR_LAST_LINES_AFTER = 3
     string_err = open('dummy_puppet_output_error2.txt', 'r').read().replace("\n", os.linesep)
     result = puppetexecutor.condenseOutput(string_err, '', 1)
     stripped_string = string_err.strip()
@@ -58,7 +58,9 @@ class TestPuppetExecutor(TestCase):
     self.assertEquals(len(result.splitlines(True)), 6, "Failed to condence fail log")
 
   def test_condense_bad3(self):
-    puppetexecutor = puppetExecutor("/tmp", "/x", "/y", "/z", AmbariConfig().getConfig())
+    puppetexecutor = PuppetExecutor("/tmp", "/x", "/y", "/z", AmbariConfig().getConfig())
+    grep = Grep()
+    puppetexecutor.grep = grep
     string_err = open('dummy_puppet_output_error3.txt', 'r').read().replace("\n", os.linesep)
     result = puppetexecutor.condenseOutput(string_err, '', 1)
     stripped_string = string_err.strip()
@@ -72,10 +74,12 @@ class TestPuppetExecutor(TestCase):
     self.assertEquals(len(result.splitlines(True)), 33, "Failed to condence fail log")
 
   def test_condense_good(self):
-    puppetexecutor = puppetExecutor("/tmp", "/x", "/y", "/z", AmbariConfig().getConfig())
-    puppetexecutor.OUTPUT_LAST_LINES = 2
+    puppetexecutor = PuppetExecutor("/tmp", "/x", "/y", "/z", AmbariConfig().getConfig())
+    grep = Grep()
+    puppetexecutor.grep = grep
+    grep.OUTPUT_LAST_LINES = 2
     string_good = open('dummy_puppet_output_good.txt', 'r').read().replace("\n", os.linesep)
-    result = puppetexecutor.condenseOutput(string_good, puppetExecutor.NO_ERROR, 0)
+    result = puppetexecutor.condenseOutput(string_good, PuppetExecutor.NO_ERROR, 0)
     stripped_string = string_good.strip()
     lines = stripped_string.splitlines(True)
     result_check = lines[45].strip() in result and lines[46].strip() in result
@@ -129,13 +133,13 @@ class TestPuppetExecutor(TestCase):
     self.assertEquals(subproc_mock.returncode, 0, "Subprocess should not be terminated before timeout")
 
 
-  class  PuppetExecutor_mock(puppetExecutor):
+  class  PuppetExecutor_mock(PuppetExecutor):
 
 
 
     def __init__(self, puppetModule, puppetInstall, facterInstall, tmpDir, config, subprocess_mockup):
       self.subprocess_mockup = subprocess_mockup
-      puppetExecutor.__init__(self, puppetModule, puppetInstall, facterInstall, tmpDir, config)
+      PuppetExecutor.__init__(self, puppetModule, puppetInstall, facterInstall, tmpDir, config)
       pass
 
     def lauch_puppet_subprocess(self, puppetcommand, tmpout, tmperr, puppetEnv):

Modified: incubator/ambari/trunk/ambari-agent/src/test/python/TestPuppetExecutorManually.py
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-agent/src/test/python/TestPuppetExecutorManually.py?rev=1451746&r1=1451745&r2=1451746&view=diff
==============================================================================
--- incubator/ambari/trunk/ambari-agent/src/test/python/TestPuppetExecutorManually.py (original)
+++ incubator/ambari/trunk/ambari-agent/src/test/python/TestPuppetExecutorManually.py Fri Mar  1 22:36:22 2013
@@ -19,7 +19,7 @@ limitations under the License.
 '''
 
 from unittest import TestCase
-from puppetExecutor import puppetExecutor
+from PuppetExecutor import PuppetExecutor
 from pprint import pformat
 import socket
 import os
@@ -43,7 +43,7 @@ class TestPuppetExecutor(TestCase):
 
     logger.info("***** RUNNING " + FILEPATH + " *****")
     cwd = os.getcwd()
-    puppetexecutor = puppetExecutor(cwd, "/x", "/y", "/tmp", AmbariConfig().getConfig())
+    puppetexecutor = PuppetExecutor(cwd, "/x", "/y", "/tmp", AmbariConfig().getConfig())
     result = {}
     puppetEnv = os.environ
     _, tmpoutfile = tempfile.mkstemp()

Added: incubator/ambari/trunk/ambari-agent/src/test/python/TestStackVersionsFileHandler.py
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-agent/src/test/python/TestStackVersionsFileHandler.py?rev=1451746&view=auto
==============================================================================
--- incubator/ambari/trunk/ambari-agent/src/test/python/TestStackVersionsFileHandler.py (added)
+++ incubator/ambari/trunk/ambari-agent/src/test/python/TestStackVersionsFileHandler.py Fri Mar  1 22:36:22 2013
@@ -0,0 +1,84 @@
+#!/usr/bin/env python2.6
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+from unittest import TestCase
+import unittest
+import StringIO
+import socket
+import os, sys
+from mock.mock import patch
+from mock.mock import MagicMock
+from mock.mock import create_autospec
+import os, errno, tempfile
+from ambari_agent import StackVersionsFileHandler
+import logging
+
+stackVersionsFileHandler = \
+      StackVersionsFileHandler.StackVersionsFileHandler("/tmp")
+dummyVersionsFile = os.path.join('dummy_files', 'dummy_current_stack')
+
+class TestStackVersionsFileHandler(TestCase):
+
+  logger = logging.getLogger()
+
+  @patch.object(stackVersionsFileHandler, 'touch_file')
+  def test_read_stack_version(self, touch_method):
+    stackVersionsFileHandler.versionsFilePath = dummyVersionsFile
+    result = stackVersionsFileHandler.read_stack_version("NAGIOS")
+    self.assertEquals(result, "HDP-1.2.1")
+    result = stackVersionsFileHandler.read_stack_version("GANGLIA")
+    self.assertEquals(result, "HDP-1.2.2")
+    result = stackVersionsFileHandler.read_stack_version("NOTEXISTING")
+    self.assertEquals(result, stackVersionsFileHandler.DEFAULT_VER)
+    self.assertTrue(touch_method.called)
+
+
+  @patch.object(stackVersionsFileHandler, 'touch_file')
+  def test_read_all_stack_versions(self, touch_method):
+    stackVersionsFileHandler.versionsFilePath = dummyVersionsFile
+    result = stackVersionsFileHandler.read_all_stack_versions()
+    self.assertEquals(len(result.keys()), 4)
+    self.assertEquals(result["NAGIOS"], "HDP-1.2.1")
+    self.assertEquals(result["HBASE"], "HDP-1.3.0")
+    self.assertTrue(touch_method.called)
+
+
+  def test_extract(self):
+    s = "   NAGIOS	\t  HDP-1.3.0  "
+    comp, ver = stackVersionsFileHandler.extract(s)
+    self.assertEqual(comp, "NAGIOS")
+    self.assertEqual(ver, "HDP-1.3.0")
+    # testing wrong value
+    s = "   NAGIOS	"
+    comp, ver = stackVersionsFileHandler.extract(s)
+    self.assertEqual(comp, stackVersionsFileHandler.DEFAULT_VER)
+    self.assertEqual(ver, stackVersionsFileHandler.DEFAULT_VER)
+
+
+  def test_touch_file(self):
+    tmpfile = tempfile.mktemp()
+    stackVersionsFileHandler.versionsFilePath = tmpfile
+    stackVersionsFileHandler.touch_file()
+    result = os.path.isfile(tmpfile)
+    self.assertEqual(result, True)
+
+
+if __name__ == "__main__":
+  unittest.main(verbosity=2)
\ No newline at end of file

Added: incubator/ambari/trunk/ambari-agent/src/test/python/TestUpgradeExecutor.py
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-agent/src/test/python/TestUpgradeExecutor.py?rev=1451746&view=auto
==============================================================================
--- incubator/ambari/trunk/ambari-agent/src/test/python/TestUpgradeExecutor.py (added)
+++ incubator/ambari/trunk/ambari-agent/src/test/python/TestUpgradeExecutor.py Fri Mar  1 22:36:22 2013
@@ -0,0 +1,220 @@
+#!/usr/bin/env python2.6
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+from unittest import TestCase
+import unittest
+import StringIO
+import socket
+import os, sys, pprint
+from mock.mock import patch
+from mock.mock import MagicMock
+from mock.mock import create_autospec
+import os, errno, tempfile
+from ambari_agent import UpgradeExecutor
+import logging
+from ambari_agent import AmbariConfig
+from ambari_agent.StackVersionsFileHandler import StackVersionsFileHandler
+
+class TestUpgradeExecutor(TestCase):
+
+  logger = logging.getLogger()
+
+  @patch.object(StackVersionsFileHandler, 'write_stack_version')
+  @patch('os.path.isdir')
+  def test_perform_stack_upgrade(self, isdir_method, write_stack_version_method):
+    executor = UpgradeExecutor.UpgradeExecutor('pythonExecutor',
+      'puppetExecutor', AmbariConfig.AmbariConfig().getConfig())
+
+    # Checking matching versions
+    command = {
+      'commandParams' :	{
+        'source_stack_version' : 'HDP-1.3.0',
+        'target_stack_version' : 'HDP-1.3.0',
+       },
+      'component' : 'HDFS'
+    }
+    result = executor.perform_stack_upgrade(command, 'tmpout', 'tmperr')
+    self.assertIn('matches current stack version', result['stdout'])
+    self.assertFalse(write_stack_version_method.called)
+    # Checking unsupported update
+    write_stack_version_method.reset()
+    command = {
+      'commandParams' :	{
+        'source_stack_version' : 'HDP-1.0.1',
+        'target_stack_version' : 'HDP-1.3.0',
+      },
+      'component' : 'HDFS'
+    }
+    isdir_method.return_value = False
+    result = executor.perform_stack_upgrade(command, 'tmpout', 'tmperr')
+    self.assertIn('not supported', result['stderr'])
+    self.assertFalse(write_stack_version_method.called)
+    # Checking successful result
+    write_stack_version_method.reset()
+    command = {
+      'commandParams' :	{
+        'source_stack_version' : 'HDP-1.0.1',
+        'target_stack_version' : 'HDP-1.3.0',
+      },
+      'component' : 'HDFS'
+    }
+    isdir_method.return_value = True
+    executor.execute_dir = lambda command, basedir, dir, tmpout, tmperr : \
+      {
+        'exitcode' : 0,
+        'stdout'   : "output - %s" % dir,
+        'stderr'   : "errors - %s" % dir,
+      }
+    result = executor.perform_stack_upgrade(command, 'tmpout', 'tmperr')
+    self.assertTrue(write_stack_version_method.called)
+    self.assertEquals(result['exitcode'],0)
+    self.assertEquals(result['stdout'],'output - pre-upgrade.d\noutput - upgrade.d\noutput - post-upgrade.d')
+    self.assertEquals(result['stderr'],'errors - pre-upgrade.d\nerrors - upgrade.d\nerrors - post-upgrade.d')
+    # Checking failed result
+    write_stack_version_method.reset()
+    command = {
+      'commandParams' :	{
+        'source_stack_version' : 'HDP-1.0.1',
+        'target_stack_version' : 'HDP-1.3.0',
+      },
+      'component' : 'HDFS'
+    }
+    isdir_method.return_value = True
+    executor.execute_dir = lambda command, basedir, dir, tmpout, tmperr :\
+    {
+      'exitcode' : 1,
+      'stdout'   : "output - %s" % dir,
+      'stderr'   : "errors - %s" % dir,
+      }
+    result = executor.perform_stack_upgrade(command, 'tmpout', 'tmperr')
+    self.assertTrue(write_stack_version_method.called)
+    self.assertEquals(result['exitcode'],1)
+    self.assertEquals(result['stdout'],'output - pre-upgrade.d')
+    self.assertEquals(result['stderr'],'errors - pre-upgrade.d')
+
+
+  def test_get_key_func(self):
+    executor = UpgradeExecutor.UpgradeExecutor('pythonExecutor',
+                 'puppetExecutor', AmbariConfig.AmbariConfig().getConfig())
+    # Checking unparseable
+    self.assertEqual(executor.get_key_func('fdsfds'), 999)
+    self.assertEqual(executor.get_key_func('99dfsfd'), 999)
+    self.assertEqual(executor.get_key_func('-fdfds'), 999)
+    # checking parseable
+    self.assertEqual(executor.get_key_func('99'), 99)
+    self.assertEqual(executor.get_key_func('45-install'), 45)
+    self.assertEqual(executor.get_key_func('33-install-staff'), 33)
+    #checking sorting of full list
+    testlist1 = ['7-fdfd', '10-erewfds', '11-fdfdfd', '1-hh', '20-kk', '01-tt']
+    testlist1.sort(key = executor.get_key_func)
+    self.assertEqual(testlist1,
+        ['1-hh', '01-tt', '7-fdfd', '10-erewfds', '11-fdfdfd', '20-kk'])
+
+
+  def test_split_stack_version(self):
+    executor = UpgradeExecutor.UpgradeExecutor('pythonExecutor',
+             'puppetExecutor', AmbariConfig.AmbariConfig().getConfig())
+    result = executor.split_stack_version("HDP-1.2.1")
+    self.assertEquals(result, ('HDP', '1', '2'))
+    result = executor.split_stack_version("HDP-1.3")
+    self.assertEquals(result, ('HDP', '1', '3'))
+    result = executor.split_stack_version("ComplexStackVersion-1.3.4.2.2")
+    self.assertEquals(result, ('ComplexStackVersion', '1', '3'))
+    pass
+
+
+  @patch('os.listdir')
+  @patch('os.path.isdir')
+  @patch.object(UpgradeExecutor.UpgradeExecutor, 'get_key_func')
+  def test_execute_dir(self, get_key_func_method, isdir_method, listdir_method):
+    pythonExecutor = MagicMock()
+    puppetExecutor = MagicMock()
+
+    command = {'debug': 'command'}
+    isdir_method.return_value = True
+    # Mocking sort() method of list
+    class MyList(list):
+      pass
+    files = MyList(['first.py', 'second.pp', 'third.py', 'fourth.nm',
+             'fifth-failing.py', 'six.py'])
+    files.sort = lambda key: None
+    listdir_method.return_value = files
+    # fifth-failing.py will fail
+    pythonExecutor.run_file.side_effect = [
+      {'exitcode' : 0,
+       'stdout'   : "stdout - first.py",
+       'stderr'   : "stderr - first.py"},
+      {'exitcode' : 0,
+       'stdout'   : "stdout - third.py",
+       'stderr'   : "stderr - third.py"},
+      {'exitcode' : 1,
+       'stdout'   : "stdout - fifth-failing.py",
+       'stderr'   : "stderr - fifth-failing.py"},
+      {'exitcode' : 0,
+       'stdout'   : "stdout - six.py",
+       'stderr'   : "stderr - six.py"},
+    ]
+    puppetExecutor.just_run_one_file.side_effect = [
+      {'exitcode' : 0,
+       'stdout'   : "stdout - second.pp",
+       'stderr'   : "stderr - second.pp"},
+    ]
+
+    executor = UpgradeExecutor.UpgradeExecutor(pythonExecutor,
+        puppetExecutor, AmbariConfig.AmbariConfig().getConfig())
+
+    result= executor.execute_dir(command, 'basedir', 'dir', 'tmpout', 'tmperr')
+    self.assertEquals(result['exitcode'],1)
+    self.assertEquals(result['stdout'],"\nstdout - first.py\nstdout - second.pp\nstdout - third.py\nUnrecognized file type, skipping: basedir/dir/fourth.nm\nstdout - fifth-failing.py")
+    self.assertEquals(result['stderr'],"\nstderr - first.py\nstderr - second.pp\nstderr - third.py\nNone\nstderr - fifth-failing.py")
+
+
+  @patch('os.listdir')
+  @patch('os.path.isdir')
+  def test_execute_dir_ignore_badly_named(self, isdir_method, listdir_method):
+    pythonExecutor = MagicMock()
+    puppetExecutor = MagicMock()
+
+    command = {'debug': 'command'}
+    isdir_method.return_value = True
+    files = ['00-first.py', 'badly-named.pp', '10-second.pp', '20-wrong.cpp']
+    listdir_method.return_value = files
+    # fifth-failing.py will fail
+    pythonExecutor.run_file.side_effect = [
+      {'exitcode' : 0,
+       'stdout'   : "stdout - python.py",
+       'stderr'   : "stderr - python.py"},
+    ]
+    puppetExecutor.just_run_one_file.side_effect = [
+      {'exitcode' : 0,
+       'stdout'   : "stdout - puppet.pp",
+       'stderr'   : "stderr - puppet.pp"},
+    ]
+
+    executor = UpgradeExecutor.UpgradeExecutor(pythonExecutor,
+        puppetExecutor, AmbariConfig.AmbariConfig().getConfig())
+
+    result= executor.execute_dir(command, 'basedir', 'dir', 'tmpout', 'tmperr')
+    self.assertEquals(result['exitcode'],0)
+    self.assertEquals(result['stdout'],'\nstdout - python.py\nstdout - puppet.pp\nUnrecognized file type, skipping: basedir/dir/20-wrong.cpp')
+    self.assertEquals(result['stderr'],'\nstderr - python.py\nstderr - puppet.pp\nNone')
+
+if __name__ == "__main__":
+  unittest.main(verbosity=2)
\ No newline at end of file

Added: incubator/ambari/trunk/ambari-agent/src/test/python/dummy_files/dummy_current_stack
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-agent/src/test/python/dummy_files/dummy_current_stack?rev=1451746&view=auto
==============================================================================
--- incubator/ambari/trunk/ambari-agent/src/test/python/dummy_files/dummy_current_stack (added)
+++ incubator/ambari/trunk/ambari-agent/src/test/python/dummy_files/dummy_current_stack Fri Mar  1 22:36:22 2013
@@ -0,0 +1,4 @@
+HDFS    HDP-1.2.0
+NAGIOS    HDP-1.2.1
+HBASE    HDP-1.3.0
+GANGLIA  HDP-1.2.2