Posted to commits@ambari.apache.org by dm...@apache.org on 2013/10/09 20:12:39 UTC

git commit: AMBARI-3490. Remove RCO management logic at ambari-agent (dlysnichencko)

Updated Branches:
  refs/heads/trunk 78e3d4142 -> 489a193dd


AMBARI-3490. Remove RCO management logic at ambari-agent (dlysnichencko)


Project: http://git-wip-us.apache.org/repos/asf/incubator-ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ambari/commit/489a193d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ambari/tree/489a193d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ambari/diff/489a193d

Branch: refs/heads/trunk
Commit: 489a193dd2f54215f8b6f9dc9890032ca746b301
Parents: 78e3d41
Author: Lisnichenko Dmitro <dl...@hortonworks.com>
Authored: Wed Oct 9 21:10:30 2013 +0300
Committer: Lisnichenko Dmitro <dl...@hortonworks.com>
Committed: Wed Oct 9 21:10:30 2013 +0300

----------------------------------------------------------------------
 ambari-agent/pom.xml                            |   4 -
 .../ambari_agent/ActionDependencyManager.py     | 163 ------------
 .../src/main/python/ambari_agent/ActionQueue.py |  79 ++----
 .../src/main/python/ambari_agent/Heartbeat.py   |   2 +-
 .../main/python/ambari_agent/UpgradeExecutor.py | 207 ---------------
 .../test/python/TestActionDependencyManager.py  | 180 -------------
 ambari-agent/src/test/python/TestActionQueue.py | 132 ++++------
 ambari-agent/src/test/python/TestController.py  |   5 +-
 ambari-agent/src/test/python/TestHeartbeat.py   |  30 +--
 .../src/test/python/TestUpgradeExecutor.py      | 264 -------------------
 10 files changed, 86 insertions(+), 980 deletions(-)
----------------------------------------------------------------------
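In short, this change removes the agent-side role command order (RCO) scheduling (ActionDependencyManager) and the agent-side stack upgrade path (UpgradeExecutor). ActionQueue now consumes commands from a plain FIFO Queue.Queue and processes them one at a time, and Heartbeat reports commandsInProgress simply by checking whether that queue is non-empty. The following is a minimal, illustrative sketch of the resulting consumer loop; the class name SimpleActionQueue and the placeholder method bodies are hypothetical, and the executor wiring, logging and error handling of the real ActionQueue are omitted.

import Queue
import threading

class SimpleActionQueue(threading.Thread):
    # Hypothetical, trimmed-down sketch of the post-change ActionQueue flow.
    STATUS_COMMAND = 'STATUS_COMMAND'
    EXECUTION_COMMAND = 'EXECUTION_COMMAND'

    def __init__(self):
        super(SimpleActionQueue, self).__init__()
        self.commandQueue = Queue.Queue()  # plain FIFO; no RCO dependency grouping
        self._stop = threading.Event()

    def put(self, commands):
        for command in commands:
            self.commandQueue.put(command)

    def stop(self):
        self._stop.set()

    def run(self):
        while not self._stop.isSet():
            command = self.commandQueue.get()  # blocks while the queue is empty
            if command['commandType'] == self.EXECUTION_COMMAND:
                self.execute_command(command)        # real code delegates to PuppetExecutor.runCommand
            elif command['commandType'] == self.STATUS_COMMAND:
                self.execute_status_command(command)

    def execute_command(self, command):
        pass  # placeholder: see the ActionQueue.py diff below

    def execute_status_command(self, command):
        pass  # placeholder: see the ActionQueue.py diff below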


http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/489a193d/ambari-agent/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-agent/pom.xml b/ambari-agent/pom.xml
index 0341f0d..8681420 100644
--- a/ambari-agent/pom.xml
+++ b/ambari-agent/pom.xml
@@ -328,10 +328,6 @@
                   <location>../version</location>
                   <filter>true</filter>
                 </source>
-                <!--<source>-->
-                  <!--&lt;!&ndash; This file is also included into server rpm&ndash;&gt;-->
-                  <!--<location>../ambari-common/src/main/resources/role_command_order.json</location>-->
-                <!--</source>-->
               </sources>
             </mapping>
             <!-- -->

http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/489a193d/ambari-agent/src/main/python/ambari_agent/ActionDependencyManager.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/ActionDependencyManager.py b/ambari-agent/src/main/python/ambari_agent/ActionDependencyManager.py
deleted file mode 100644
index 0addc9a..0000000
--- a/ambari-agent/src/main/python/ambari_agent/ActionDependencyManager.py
+++ /dev/null
@@ -1,163 +0,0 @@
-#!/usr/bin/env python2.6
-
-'''
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-'''
-
-import logging
-import Queue
-import threading
-import pprint
-import os
-import json
-
-logger = logging.getLogger()
-
-class ActionDependencyManager():
-  """
-  Implements a scheduler of role commands (like DATANODE-START) based on
-  dependencies between them. The class does not execute actions; it only
-  splits them into groups that may be executed in parallel.
-  """
-
-  DEPS_FILE_NAME="role_command_order.json"
-  COMMENT_STR="_comment"
-
-  # Dictionary of dependencies. Format:
-  # BlockedRole-Command : [BlockerRole1-Command1, BlockerRole2-Command2, ...]
-
-
-  def __init__(self, config):
-    self.deps = {}
-    self.last_scheduled_group = []
-    self.scheduled_action_groups = Queue.Queue()
-    self.lock = threading.RLock()
-    self.config = config
-    #self.read_dependencies()
-
-
-  def read_dependencies(self):
-    """
-    Load dependencies from file
-    """
-    prefix_dir = self.config.get('agent', 'prefix')
-    action_order_file = os.path.join(prefix_dir, self.DEPS_FILE_NAME)
-    with open(action_order_file) as f:
-      action_order_data = json.load(f)
-    for deps_group in action_order_data.keys():
-      if deps_group != self.COMMENT_STR: # if entry is not a comment
-        deps_group_list = action_order_data[deps_group]
-        for blocked_str in deps_group_list:
-          if blocked_str != self.COMMENT_STR: # if entry is not a comment
-            blocker_list = deps_group_list[blocked_str]
-            if blocked_str not in self.deps:
-              self.deps[blocked_str]=[]
-            for blocker_str in blocker_list:
-              self.deps[blocked_str].append(blocker_str)
-    pass
-
-
-  def is_action_group_available(self):
-    return not self.scheduled_action_groups.empty()
-
-
-  def get_next_action_group(self):
-    """
-    Returns next group of scheduled actions that may be
-    executed in parallel. If queue is empty, blocks until
-    an item is available (until the next put_actions() call)
-    """
-    next_group = self.scheduled_action_groups.get(block=True)
-    with self.lock: # Synchronized
-      if next_group is self.last_scheduled_group:
-        # Group is not eligible for appending, creating new one
-        self.last_scheduled_group = []
-
-      dump_str = pprint.pformat(next_group)
-      logger.debug("Next action group: {0}".format(dump_str))
-      return next_group
-
-
-  def put_actions(self, actions):
-    """
-    Schedules actions to be executed at some point in the future.
-    Here we rely on the serial command execution sequence received from the server.
-    Some of these commands may be executed in parallel with others, so we
-    unite them into a group.
-    """
-    with self.lock: # Synchronized
-      for action in actions:
-        self.dump_info(action)
-        was_empty = len(self.last_scheduled_group) == 0
-        if self.can_be_executed_in_parallel(action, self.last_scheduled_group):
-          self.last_scheduled_group.append(action)
-        else: # create a new group
-          self.last_scheduled_group = [action]
-          was_empty = True
-        if was_empty:
-          # last_scheduled_group is not empty now, so we add it to the queue
-          self.scheduled_action_groups.put(self.last_scheduled_group)
-
-
-  def dump_info(self, action):
-    """
-    Prints info about command to log
-    """
-    logger.info("Adding " + action['commandType'] + " for service " + \
-                action['serviceName'] + " of cluster " + \
-                action['clusterName'] + " to the queue.")
-    logger.debug(pprint.pformat(action))
-
-
-  def can_be_executed_in_parallel(self, action, group):
-    """
-    Checks whether action may be executed in parallel with a given group
-    """
-    # Hack: parallel execution disabled
-    return False
-
-    # from ActionQueue import ActionQueue
-    # # Empty group is compatible with any action
-    # if not group:
-    #   return True
-    # # Status commands are placed into a separate group to avoid race conditions
-    # if action['commandType'] == ActionQueue.STATUS_COMMAND:
-    #   for scheduled_action in group:
-    #     if scheduled_action['commandType'] != ActionQueue.STATUS_COMMAND:
-    #       return False
-    #   return True
-    # # We avoid executing install/upgrade threads in parallel with anything
-    # standalone_commands = ["INSTALL", ActionQueue.ROLE_COMMAND_UPGRADE]
-    # if action['roleCommand'] in standalone_commands:
-    #   return False
-    # # We can not perform few actions (like STOP and START) for a component
-    # # at the same time
-    # for scheduled_action in group:
-    #   if scheduled_action['role'] == action['role']:
-    #     return False
-    # # In other cases, check dependencies
-    # pattern = "{0}-{1}"
-    # new_action_str = pattern.format(action['role'], action['roleCommand'])
-    # for scheduled_action in group:
-    #   if new_action_str in self.deps:
-    #     blockers = self.deps[new_action_str]
-    #     scheduled_action_str = pattern.format(
-    #       scheduled_action['role'], scheduled_action['roleCommand'])
-    #     if scheduled_action_str in blockers:
-    #       return False
-    # # Everything seems to be ok
-    # return True

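For reference, the role_command_order.json file that the removed ActionDependencyManager parsed maps a blocked ROLE-COMMAND to the list of ROLE-COMMANDs that must finish first, with entries grouped under top-level dependency-group keys and "_comment" keys ignored (see read_dependencies above). A hypothetical, abridged fragment in that shape, with example entries taken from the expected data in the removed TestActionDependencyManager:

{
  "_comment": "top-level keys are dependency groups; '_comment' entries are ignored",
  "some_dependency_group": {
    "_comment": "blocked ROLE-COMMAND : [blocking ROLE-COMMANDs]",
    "DATANODE-STOP": ["JOBTRACKER-STOP", "TASKTRACKER-STOP", "HBASE_MASTER-STOP"],
    "RESOURCEMANAGER-START": ["NAMENODE-START", "DATANODE-START"],
    "SECONDARY_NAMENODE-UPGRADE": ["NAMENODE-UPGRADE"]
  }
}

With this commit the agent no longer reads this file.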
http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/489a193d/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
index c39cf72..a2ad9c5 100644
--- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
+++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
@@ -17,21 +17,19 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
 '''
+import Queue
 
 import logging
 import traceback
 import threading
-from threading import Thread
 import pprint
 import os
 
 from LiveStatus import LiveStatus
 from shell import shellRunner
 import PuppetExecutor
-import UpgradeExecutor
 import PythonExecutor
 from ActualConfigHandler import ActualConfigHandler
-from ActionDependencyManager import ActionDependencyManager
 from CommandStatusDict import CommandStatusDict
 
 
@@ -49,13 +47,12 @@ class ActionQueue(threading.Thread):
 
   STATUS_COMMAND = 'STATUS_COMMAND'
   EXECUTION_COMMAND = 'EXECUTION_COMMAND'
-  ROLE_COMMAND_UPGRADE = 'UPGRADE'
 
   IN_PROGRESS_STATUS = 'IN_PROGRESS'
 
   def __init__(self, config, controller):
     super(ActionQueue, self).__init__()
-    self.commandQueue = ActionDependencyManager(config)
+    self.commandQueue = Queue.Queue()
     self.commandStatuses = CommandStatusDict(callback_action =
       self.status_update_callback)
     self.config = config
@@ -71,51 +68,30 @@ class ActionQueue(threading.Thread):
     return self._stop.isSet()
 
   def put(self, commands):
-    self.commandQueue.put_actions(commands)
+    for command in commands:
+      logger.info("Adding " + command['commandType'] + " for service " + \
+                  command['serviceName'] + " of cluster " + \
+                  command['clusterName'] + " to the queue.")
+      logger.debug(pprint.pformat(command))
+      self.commandQueue.put(command)
 
 
   def run(self):
     while not self.stopped():
-      # Taking a new portion of tasks
-      portion = self.commandQueue.get_next_action_group() # Will block if queue is empty
-      portion = portion[::-1] # Reverse list order
-      self.process_portion_of_actions(portion)
-
-
-  def process_portion_of_actions(self, portion):
-    # starting execution of a group of commands
-    running_list = []
-    finished_list = []
-    while portion or running_list: # While not finished actions in portion
-      # poll threads under execution
-      for thread in running_list:
-        if not thread.is_alive():
-          finished_list.append(thread)
-        # Remove finished from the running list
-      running_list[:] = [b for b in running_list if not b in finished_list]
-      # Start next actions
-      free_slots = self.MAX_CONCURRENT_ACTIONS - len(running_list)
-      while free_slots > 0 and portion: # Start new threads if available
-        command = portion.pop()
-        logger.debug("Took an element of Queue: " + pprint.pformat(command))
-        if command['commandType'] == self.EXECUTION_COMMAND:
-          # Start separate threads for commands of this type
-          action_thread = Thread(target =  self.execute_command_safely, args = (command, ))
-          running_list.append(action_thread)
-          free_slots -= 1
-          action_thread.start()
-        elif command['commandType'] == self.STATUS_COMMAND:
-          # Execute status commands immediately, in current thread
-          self.execute_status_command(command)
-        else:
-          logger.error("Unrecognized command " + pprint.pformat(command))
-    pass
+      command = self.commandQueue.get() # Will block if queue is empty
+      self.process_command(command)
 
 
-  def execute_command_safely(self, command):
+  def process_command(self, command):
+    logger.debug("Took an element of Queue: " + pprint.pformat(command))
     # make sure we log failures
     try:
-      self.execute_command(command)
+      if command['commandType'] == self.EXECUTION_COMMAND:
+        self.execute_command(command)
+      elif command['commandType'] == self.STATUS_COMMAND:
+        self.execute_status_command(command)
+      else:
+        logger.error("Unrecognized command " + pprint.pformat(command))
     except Exception, err:
       # Should not happen
       traceback.print_exc()
@@ -123,6 +99,9 @@ class ActionQueue(threading.Thread):
 
 
   def execute_command(self, command):
+    '''
+    Executes commands of type  EXECUTION_COMMAND
+    '''
     clusterName = command['clusterName']
     commandId = command['commandId']
 
@@ -147,17 +126,8 @@ class ActionQueue(threading.Thread):
       self.config.get('puppet', 'puppet_home'),
       self.config.get('puppet', 'facter_home'),
       self.config.get('agent', 'prefix'), self.config)
-    if command['roleCommand'] == ActionQueue.ROLE_COMMAND_UPGRADE:
-      # Create new instances for the current thread
-      pythonExecutor = PythonExecutor.PythonExecutor(
-          self.config.get('agent', 'prefix'), self.config)
-      upgradeExecutor = UpgradeExecutor.UpgradeExecutor(pythonExecutor,
-          puppetExecutor, self.config)
-      commandresult = upgradeExecutor.perform_stack_upgrade(command, in_progress_status['tmpout'],
-        in_progress_status['tmperr'])
-    else:
-      commandresult = puppetExecutor.runCommand(command, in_progress_status['tmpout'],
-        in_progress_status['tmperr'])
+    commandresult = puppetExecutor.runCommand(command, in_progress_status['tmpout'],
+      in_progress_status['tmperr'])
     # dumping results
     status = "COMPLETED"
     if commandresult['exitcode'] != 0:
@@ -189,6 +159,9 @@ class ActionQueue(threading.Thread):
 
 
   def execute_status_command(self, command):
+    '''
+    Executes commands of type STATUS_COMMAND
+    '''
     try:
       cluster = command['clusterName']
       service = command['serviceName']

http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/489a193d/ambari-agent/src/main/python/ambari_agent/Heartbeat.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/Heartbeat.py b/ambari-agent/src/main/python/ambari_agent/Heartbeat.py
index c369677..f0e20c4 100644
--- a/ambari-agent/src/main/python/ambari_agent/Heartbeat.py
+++ b/ambari-agent/src/main/python/ambari_agent/Heartbeat.py
@@ -55,7 +55,7 @@ class Heartbeat:
                 }
 
     commandsInProgress = False
-    if self.actionQueue.commandQueue.is_action_group_available():
+    if not self.actionQueue.commandQueue.empty():
       commandsInProgress = True
     if len(queueResult) != 0:
       heartbeat['reports'] = queueResult['reports']

http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/489a193d/ambari-agent/src/main/python/ambari_agent/UpgradeExecutor.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/UpgradeExecutor.py b/ambari-agent/src/main/python/ambari_agent/UpgradeExecutor.py
deleted file mode 100644
index b189921..0000000
--- a/ambari-agent/src/main/python/ambari_agent/UpgradeExecutor.py
+++ /dev/null
@@ -1,207 +0,0 @@
-#!/usr/bin/env python2.6
-
-'''
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-'''
-import json
-import os.path
-import logging
-import pprint
-import re
-from Grep import Grep
-from StackVersionsFileHandler import StackVersionsFileHandler
-
-
-logger = logging.getLogger()
-grep = Grep()
-
-class UpgradeExecutor:
-
-  """ Class that performs the StackVersion stack upgrade"""
-
-  SCRIPT_DIRS = [
-    'pre-upgrade.d',
-    'upgrade.d',
-    'post-upgrade.d'
-  ]
-
-  NAME_PARSING_FAILED_CODE = 999
-
-  def __init__(self, pythonExecutor, puppetExecutor, config):
-    self.pythonExecutor = pythonExecutor
-    self.puppetExecutor = puppetExecutor
-    self.stacksDir = config.get('stack', 'upgradeScriptsDir')
-    self.config = config
-    versionsFileDir = config.get('agent', 'prefix')
-    self.versionsHandler = StackVersionsFileHandler(versionsFileDir)
-
-
-  def perform_stack_upgrade(self, command, tmpout, tmperr):
-    logger.info("Performing stack upgrade")
-    params = command['commandParams']
-    srcStack = params['source_stack_version']
-    tgtStack = params['target_stack_version']
-    component = command['role']
-
-    srcStackTuple = self.split_stack_version(srcStack)
-    tgtStackTuple = self.split_stack_version(tgtStack)
-
-    if srcStackTuple is None or tgtStackTuple is None:
-      errorstr = "Source (%s) or target (%s) version does not match pattern \
-      <Name>-<Version>" % (srcStack, tgtStack)
-      logger.info(errorstr)
-      result = {
-        'exitcode' : 1,
-        'stdout'   : 'None',
-        'stderr'   : errorstr
-      }
-    elif srcStack != tgtStack:
-      paramTuple = sum((srcStackTuple, tgtStackTuple), ())
-      upgradeId = "%s-%s.%s_%s-%s.%s" % paramTuple
-      # Check stack version (do we need upgrade?)
-      basedir = os.path.join(self.stacksDir, upgradeId, component)
-      if not os.path.isdir(basedir):
-        errorstr = "Upgrade %s is not supported (dir %s does not exist)" \
-                   % (upgradeId, basedir)
-        logger.error(errorstr)
-        result = {
-          'exitcode' : 1,
-          'stdout'   : errorstr,
-          'stderr'   : errorstr
-        }
-      else:
-        result = {
-          'exitcode' : 0,
-          'stdout'   : '',
-          'stderr'   : ''
-        }
-        # Request repos update (will be executed once before running any pp file)
-        self.puppetExecutor.discardInstalledRepos()
-        for dir in self.SCRIPT_DIRS:
-          if result['exitcode'] != 0:
-            break
-          tmpRes = self.execute_dir(command, basedir, dir, tmpout, tmperr)
-
-          result = {
-            'exitcode' : result['exitcode'] or tmpRes['exitcode'],
-            'stdout'   : "%s\n%s" % (result['stdout'], tmpRes['stdout']),
-            'stderr'   : "%s\n%s" % (result['stderr'], tmpRes['stderr']),
-          }
-
-        if result['exitcode'] == 0:
-          logger.info("Upgrade %s successfully finished" % upgradeId)
-          self.versionsHandler.write_stack_version(component, tgtStack)
-    else:
-      infostr = "target_stack_version (%s) matches current stack version" \
-          " for component %s, nothing to do" % (tgtStack, component)
-      logger.info(infostr)
-      result = {
-        'exitcode' : 0,
-        'stdout'   : infostr,
-        'stderr'   : 'None'
-      }
-    result = {
-      'exitcode' : result['exitcode'],
-      'stdout'   : grep.tail(result['stdout'], grep.OUTPUT_LAST_LINES),
-      'stderr'   : grep.tail(result['stderr'], grep.OUTPUT_LAST_LINES)
-    }
-    return result
-
-
-  def get_key_func(self, name):
-    """
-    Returns a number from filenames like 70-foobar.* or 999 for not matching
-    filenames
-    """
-    parts = name.split('-', 1)
-    if not parts or not parts[0].isdigit():
-      logger.warn("Can't parse script filename number %s" % name)
-      return self.NAME_PARSING_FAILED_CODE # unknown element will be placed to the end of list
-    return int(parts[0])
-
-
-  def split_stack_version(self, verstr):
-    verdict = json.loads(verstr)
-    stack_name = verdict["stackName"].strip()
-
-    matchObj = re.match( r'(\d+).(\d+)', verdict["stackVersion"].strip(), re.M|re.I)
-    if matchObj:
-      stack_major_ver = matchObj.group(1)
-      stack_minor_ver = matchObj.group(2)
-      return stack_name, stack_major_ver, stack_minor_ver
-    else:
-      return None
-
-
-  def execute_dir(self, command, basedir, dir, tmpout, tmperr):
-    """
-    Executes *.py and *.pp files located in a given directory.
-    Files are executed in numeric sorting order.
-    """
-    dirpath = os.path.join(basedir, dir)
-    logger.info("Executing %s" % dirpath)
-    if not os.path.isdir(dirpath):
-      warnstr = "Script directory %s does not exist, skipping" % dirpath
-      logger.warn(warnstr)
-      result = {
-        'exitcode' : 0,
-        'stdout'   : warnstr,
-        'stderr'   : 'None'
-      }
-      return result
-    fileList=os.listdir(dirpath)
-    fileList.sort(key = self.get_key_func)
-    formattedResult = {
-      'exitcode' : 0,
-      'stdout'   : '',
-      'stderr'   : ''
-    }
-    for filename in fileList:
-      prevcode = formattedResult['exitcode']
-      if prevcode != 0 or self.get_key_func(filename) == self.NAME_PARSING_FAILED_CODE:
-        break
-      filepath = os.path.join(dirpath, filename)
-      if filename.endswith(".pp"):
-        logger.info("Running puppet file %s" % filepath)
-        result = self.puppetExecutor.run_manifest(command, filepath,
-                                                                tmpout, tmperr)
-      elif filename.endswith(".py"):
-        logger.info("Running python file %s" % filepath)
-        result = self.pythonExecutor.run_file(command, filepath, tmpout, tmperr)
-      elif filename.endswith(".pyc"):
-        pass # skipping compiled files
-      else:
-        warnstr = "Unrecognized file type, skipping: %s" % filepath
-        logger.warn(warnstr)
-        result = {
-          'exitcode' : 0,
-          'stdout'   : warnstr,
-          'stderr'   : 'None'
-        }
-      formattedResult = {
-        'exitcode' : prevcode or result['exitcode'],
-        'stdout'   : "%s\n%s" % (formattedResult['stdout'], result['stdout']),
-        'stderr'   : "%s\n%s" % (formattedResult['stderr'], result['stderr']),
-      }
-    logger.debug("Result of %s: \n %s" % (dirpath, pprint.pformat(formattedResult)))
-    return formattedResult
-
-
-
-
-
-

http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/489a193d/ambari-agent/src/test/python/TestActionDependencyManager.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/TestActionDependencyManager.py b/ambari-agent/src/test/python/TestActionDependencyManager.py
deleted file mode 100644
index a718779..0000000
--- a/ambari-agent/src/test/python/TestActionDependencyManager.py
+++ /dev/null
@@ -1,180 +0,0 @@
-#!/usr/bin/env python2.6
-
-'''
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-'''
-
-from unittest import TestCase
-from ambari_agent.AmbariConfig import AmbariConfig
-from ambari_agent.ActionQueue import ActionQueue
-from ambari_agent.ActionDependencyManager import ActionDependencyManager
-import os, errno, time, pprint, tempfile, threading, sys
-from mock.mock import patch, MagicMock, call
-
-class TestActionDependencyManager(TestCase):
-
-  dummy_RCO_file = os.path.join('dummy_files', 'test_rco_data.json')
-
-  def setUp(self):
-    self.config = AmbariConfig().getConfig()
-    self.config.set('agent', 'prefix', os.getcwd())
-    ActionDependencyManager.DEPS_FILE_NAME = self.dummy_RCO_file
-
-  # TODO: disabled for now
-  def disabled_test_init(self):
-    """
-    Tests config load
-    """
-    adm = ActionDependencyManager(self.config)
-    deps_dump = pprint.pformat(adm.deps)
-    expected = "{u'DATANODE-STOP': [u'JOBTRACKER-STOP',\n                    " \
-               "u'TASKTRACKER-STOP',\n                    " \
-               "u'RESOURCEMANAGER-STOP',\n                    " \
-               "u'NODEMANAGER-STOP',\n                    " \
-               "u'HISTORYSERVER-STOP',\n                    " \
-               "u'HBASE_MASTER-STOP'],\n u'HBASE_MASTER-START': " \
-               "[u'PEERSTATUS-START'],\n u'JOBTRACKER-START': " \
-               "[u'PEERSTATUS-START'],\n u'RESOURCEMANAGER-START': " \
-               "[u'NAMENODE-START', u'DATANODE-START'],\n " \
-               "u'SECONDARY_NAMENODE-START': [u'DATANODE-START', " \
-               "u'NAMENODE-START'],\n u'SECONDARY_NAMENODE-UPGRADE': " \
-               "[u'NAMENODE-UPGRADE']}"
-    self.assertEqual(deps_dump, expected)
-
-
-  def test_is_action_group_available(self):
-    adm = ActionDependencyManager(self.config)
-    self.assertFalse(adm.is_action_group_available())
-    adm.scheduled_action_groups.put(["test"])
-    self.assertTrue(adm.is_action_group_available())
-
-
-  def test_get_next_action_group(self):
-    adm = ActionDependencyManager(self.config)
-    test1 = ["test1"]
-    test2 = ["test2"]
-    adm.scheduled_action_groups.put(test1)
-    adm.scheduled_action_groups.put(test2)
-    adm.last_scheduled_group = test2
-    self.assertTrue(adm.is_action_group_available())
-    # Taking 1st
-    self.assertEqual(test1, adm.get_next_action_group())
-    self.assertTrue(len(adm.last_scheduled_group) == 1)
-    self.assertTrue(adm.is_action_group_available())
-    # Taking 2nd
-    self.assertEqual(test2, adm.get_next_action_group())
-    self.assertTrue(len(adm.last_scheduled_group) == 0)
-    self.assertTrue(adm.last_scheduled_group is not test2)
-    self.assertFalse(adm.is_action_group_available())
-
-
-  @patch.object(ActionDependencyManager, "dump_info")
-  @patch.object(ActionDependencyManager, "can_be_executed_in_parallel")
-  def test_put_action(self, can_be_executed_in_parallel_mock, dump_info_mock):
-    can_be_executed_in_parallel_mock.side_effect = [True, False, True, False,
-                                                     True, True, True, False]
-    adm = ActionDependencyManager(self.config)
-
-    adm.put_actions(list(range(0, 8)))
-
-    queue = []
-    while adm.is_action_group_available():
-      next = adm.get_next_action_group()
-      queue.append(next)
-
-    str = pprint.pformat(queue)
-    expected = "[[0], [1, 2], [3, 4, 5, 6], [7]]"
-    self.assertEqual(str, expected)
-
-
-  # TODO: disabled for now
-  def disabled_test_can_be_executed_in_parallel(self):
-    adm = ActionDependencyManager(self.config)
-    # empty group
-    group = []
-    install_command = {
-      'role': 'DATANODE',
-      'roleCommand': 'INSTALL',
-      'commandType': ActionQueue.EXECUTION_COMMAND
-    }
-    upgrade_command = {
-      'role': 'DATANODE',
-      'roleCommand': 'UPGRADE',
-      'commandType': ActionQueue.EXECUTION_COMMAND
-    }
-    start_command = {
-      'role': 'DATANODE',
-      'roleCommand': 'START',
-      'commandType': ActionQueue.EXECUTION_COMMAND
-    }
-    stop_command = {
-      'role': 'DATANODE',
-      'roleCommand': 'STOP',
-      'commandType': ActionQueue.EXECUTION_COMMAND
-    }
-    status_command = {
-      'commandType': ActionQueue.STATUS_COMMAND
-    }
-    rm_start_command = {
-      'role': 'RESOURCEMANAGER',
-      'roleCommand': 'START',
-      'commandType': ActionQueue.EXECUTION_COMMAND
-    }
-    hm_start_command = {
-      'role': 'HBASE_MASTER',
-      'roleCommand': 'START',
-      'commandType': ActionQueue.EXECUTION_COMMAND
-    }
-    self.assertTrue(adm.can_be_executed_in_parallel(install_command, group))
-    self.assertTrue(adm.can_be_executed_in_parallel(status_command, group))
-    # multiple status commands
-    group = []
-    for i in range(0, 3):
-      group.append(status_command)
-    self.assertTrue(adm.can_be_executed_in_parallel(status_command, group))
-    self.assertFalse(adm.can_be_executed_in_parallel(install_command, group))
-    # new status command
-    group = [install_command]
-    self.assertFalse(adm.can_be_executed_in_parallel(status_command, group))
-    # install/upgrade commands
-    group = [install_command]
-    self.assertFalse(adm.can_be_executed_in_parallel(install_command, group))
-    self.assertFalse(adm.can_be_executed_in_parallel(upgrade_command, group))
-    self.assertFalse(adm.can_be_executed_in_parallel(status_command, group))
-    self.assertFalse(adm.can_be_executed_in_parallel(start_command, group))
-    group = [upgrade_command]
-    self.assertFalse(adm.can_be_executed_in_parallel(install_command, group))
-    self.assertFalse(adm.can_be_executed_in_parallel(upgrade_command, group))
-    self.assertFalse(adm.can_be_executed_in_parallel(status_command, group))
-    self.assertFalse(adm.can_be_executed_in_parallel(start_command, group))
-    # Other commands
-    group = [start_command]
-    self.assertFalse(adm.can_be_executed_in_parallel(install_command, group))
-    self.assertFalse(adm.can_be_executed_in_parallel(upgrade_command, group))
-    self.assertFalse(adm.can_be_executed_in_parallel(status_command, group))
-    self.assertTrue(adm.can_be_executed_in_parallel(hm_start_command, group))
-    # Check dependency processing
-    group = [start_command]
-    self.assertFalse(adm.can_be_executed_in_parallel(rm_start_command, group))
-    group = [start_command]
-    self.assertTrue(adm.can_be_executed_in_parallel(hm_start_command, group))
-    # actions for the same component
-    group = [start_command]
-    self.assertFalse(adm.can_be_executed_in_parallel(stop_command, group))
-    group = [stop_command]
-    self.assertFalse(adm.can_be_executed_in_parallel(start_command, group))
-

http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/489a193d/ambari-agent/src/test/python/TestActionQueue.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/TestActionQueue.py b/ambari-agent/src/test/python/TestActionQueue.py
index 6fa1407..31643b2 100644
--- a/ambari-agent/src/test/python/TestActionQueue.py
+++ b/ambari-agent/src/test/python/TestActionQueue.py
@@ -17,13 +17,13 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
 '''
+from Queue import Queue
 
 from unittest import TestCase
 from ambari_agent.LiveStatus import LiveStatus
 from ambari_agent.PuppetExecutor import PuppetExecutor
 from ambari_agent.ActionQueue import ActionQueue
 from ambari_agent.AmbariConfig import AmbariConfig
-from ambari_agent.ActionDependencyManager import ActionDependencyManager
 import os, errno, time, pprint, tempfile, threading
 import StringIO
 import sys
@@ -31,7 +31,6 @@ from threading import Thread
 
 from mock.mock import patch, MagicMock, call
 from ambari_agent.StackVersionsFileHandler import StackVersionsFileHandler
-from ambari_agent.UpgradeExecutor import UpgradeExecutor
 
 
 class TestActionQueue(TestCase):
@@ -126,106 +125,79 @@ class TestActionQueue(TestCase):
   }
 
 
-  @patch.object(ActionDependencyManager, "read_dependencies")
-  @patch.object(ActionDependencyManager, "get_next_action_group")
-  @patch.object(ActionQueue, "process_portion_of_actions")
-  def test_ActionQueueStartStop(self, process_portion_of_actions_mock,
-                          get_next_action_group_mock, read_dependencies_mock):
+  @patch.object(ActionQueue, "process_command")
+  @patch.object(Queue, "get")
+  def test_ActionQueueStartStop(self, get_mock, process_command_mock):
     actionQueue = ActionQueue(AmbariConfig().getConfig(), 'dummy_controller')
     actionQueue.start()
     time.sleep(0.1)
     actionQueue.stop()
     actionQueue.join()
     self.assertEqual(actionQueue.stopped(), True, 'Action queue is not stopped.')
-    self.assertTrue(get_next_action_group_mock.call_count > 1)
-    self.assertTrue(process_portion_of_actions_mock.call_count > 1)
+    self.assertTrue(process_command_mock.call_count > 1)
 
 
-  @patch.object(ActionDependencyManager, "read_dependencies")
+  @patch("traceback.print_exc")
   @patch.object(ActionQueue, "execute_command")
   @patch.object(ActionQueue, "execute_status_command")
-  def test_process_portion_of_actions(self, execute_status_command_mock,
-            executeCommand_mock, read_dependencies_mock):
+  def test_process_command(self, execute_status_command_mock,
+                           execute_command_mock, print_exc_mock):
     actionQueue = ActionQueue(AmbariConfig().getConfig(), 'dummy_controller')
-    # Test execution of EXECUTION_COMMANDs
-    max = 3
-    actionQueue.MAX_CONCURRENT_ACTIONS = max
-    unfreeze_flag = threading.Event()
-    sync_lock = threading.RLock()
-    stats = {
-      'waiting_threads' : 0
+    execution_command = {
+      'commandType' : ActionQueue.EXECUTION_COMMAND,
     }
-    def side_effect(self):
-      with sync_lock: # Synchronized to avoid race effects during test execution
-        stats['waiting_threads'] += 1
-      unfreeze_flag.wait()
-    executeCommand_mock.side_effect = side_effect
-    portion = [self.datanode_install_command,
-               self.namenode_install_command,
-               self.snamenode_install_command,
-               self.nagios_install_command,
-               self.hbase_install_command]
+    status_command = {
+      'commandType' : ActionQueue.STATUS_COMMAND,
+    }
+    wrong_command = {
+      'commandType' : "SOME_WRONG_COMMAND",
+    }
+    # Try wrong command
+    actionQueue.process_command(wrong_command)
+    self.assertFalse(execute_command_mock.called)
+    self.assertFalse(execute_status_command_mock.called)
+    self.assertFalse(print_exc_mock.called)
 
-    # We call method in a separate thread
-    action_thread = Thread(target =  actionQueue.process_portion_of_actions, args = (portion, ))
-    action_thread.start()
-    # Now we wait to check that only MAX_CONCURRENT_ACTIONS threads are running
-    while stats['waiting_threads'] != max:
-      time.sleep(0.1)
-    self.assertEqual(stats['waiting_threads'], max)
-    # unfreezing waiting threads
-    unfreeze_flag.set()
-    # wait until all threads are finished
-    action_thread.join()
-    self.assertTrue(executeCommand_mock.call_count == 5)
+    execute_command_mock.reset_mock()
+    execute_status_command_mock.reset_mock()
+    print_exc_mock.reset_mock()
+    # Try normal execution
+    actionQueue.process_command(execution_command)
+    self.assertTrue(execute_command_mock.called)
     self.assertFalse(execute_status_command_mock.called)
-    executeCommand_mock.reset_mock()
+    self.assertFalse(print_exc_mock.called)
+
+    execute_command_mock.reset_mock()
     execute_status_command_mock.reset_mock()
+    print_exc_mock.reset_mock()
 
-    # Test execution of STATUS_COMMANDs
-    n = 5
-    portion = []
-    for i in range(0, n):
-      status_command = {
-        'componentName': 'DATANODE',
-        'commandType': 'STATUS_COMMAND',
-      }
-      portion.append(status_command)
-    actionQueue.process_portion_of_actions(portion)
-    self.assertTrue(execute_status_command_mock.call_count == n)
-    self.assertFalse(executeCommand_mock.called)
-
-    # Test execution of unknown command
-    unknown_command = {
-      'commandType': 'WRONG_COMMAND',
-    }
-    portion = [unknown_command]
-    actionQueue.process_portion_of_actions(portion)
-    # no exception expected
-    pass
+    actionQueue.process_command(status_command)
+    self.assertFalse(execute_command_mock.called)
+    self.assertTrue(execute_status_command_mock.called)
+    self.assertFalse(print_exc_mock.called)
 
+    execute_command_mock.reset_mock()
+    execute_status_command_mock.reset_mock()
+    print_exc_mock.reset_mock()
 
-  @patch("traceback.print_exc")
-  @patch.object(ActionDependencyManager, "read_dependencies")
-  @patch.object(ActionQueue, "execute_command")
-  def test_execute_command_safely(self, execute_command_mock,
-                                  read_dependencies_mock, print_exc_mock):
-    actionQueue = ActionQueue(AmbariConfig().getConfig(), 'dummy_controller')
-    # Try normal execution
-    actionQueue.execute_command_safely('command')
-    # Try exception ro check proper logging
+    # Try exception to check proper logging
     def side_effect(self):
       raise Exception("TerribleException")
     execute_command_mock.side_effect = side_effect
-    actionQueue.execute_command_safely('command')
+    actionQueue.process_command(execution_command)
+    self.assertTrue(print_exc_mock.called)
+
+    print_exc_mock.reset_mock()
+
+    execute_status_command_mock.side_effect = side_effect
+    actionQueue.process_command(execution_command)
     self.assertTrue(print_exc_mock.called)
 
 
+
   @patch("__builtin__.open")
   @patch.object(ActionQueue, "status_update_callback")
-  @patch.object(ActionDependencyManager, "read_dependencies")
-  def test_execute_command(self, read_dependencies_mock,
-                           status_update_callback_mock, open_mock):
+  def test_execute_command(self, status_update_callback_mock, open_mock):
     # Make file read calls visible
     def open_side_effect(file, mode):
       if mode == 'r':
@@ -251,10 +223,7 @@ class TestActionQueue(TestCase):
     def patched_aq_execute_command(command):
       # We have to perform patching for separate thread in the same thread
       with patch.object(PuppetExecutor, "runCommand") as runCommand_mock:
-        with patch.object(UpgradeExecutor, "perform_stack_upgrade") \
-              as perform_stack_upgrade_mock:
           runCommand_mock.side_effect = side_effect
-          perform_stack_upgrade_mock.side_effect = side_effect
           actionQueue.execute_command(command)
     ### Test install/start/stop command ###
     ## Test successful execution with configuration tags
@@ -379,11 +348,10 @@ class TestActionQueue(TestCase):
 
   @patch.object(ActionQueue, "status_update_callback")
   @patch.object(StackVersionsFileHandler, "read_stack_version")
-  @patch.object(ActionDependencyManager, "read_dependencies")
   @patch.object(ActionQueue, "execute_command")
   @patch.object(LiveStatus, "build")
   def test_execute_status_command(self, build_mock, execute_command_mock,
-                                  read_dependencies_mock, read_stack_version_mock,
+                                  read_stack_version_mock,
                                   status_update_callback):
     actionQueue = ActionQueue(AmbariConfig().getConfig(), 'dummy_controller')
     build_mock.return_value = "dummy report"
@@ -392,4 +360,4 @@ class TestActionQueue(TestCase):
     report = actionQueue.result()
     expected = 'dummy report'
     self.assertEqual(len(report['componentStatus']), 1)
-    self.assertEqual(report['componentStatus'][0], expected)
\ No newline at end of file
+    self.assertEqual(report['componentStatus'][0], expected)

http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/489a193d/ambari-agent/src/test/python/TestController.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/TestController.py b/ambari-agent/src/test/python/TestController.py
index 79340ed..2b0e614 100644
--- a/ambari-agent/src/test/python/TestController.py
+++ b/ambari-agent/src/test/python/TestController.py
@@ -23,7 +23,6 @@ import StringIO
 import ssl
 import unittest, threading
 from ambari_agent import Controller, ActionQueue
-from  ambari_agent.ActionDependencyManager import ActionDependencyManager
 from ambari_agent import hostname
 import sys
 from ambari_agent.Controller import AGENT_AUTO_RESTART_EXIT_CODE
@@ -153,9 +152,7 @@ class TestController(unittest.TestCase):
   @patch("urllib2.build_opener")
   @patch("urllib2.install_opener")
   @patch.object(ActionQueue.ActionQueue, "run")
-  @patch.object(ActionDependencyManager, "read_dependencies")
-  @patch.object(ActionDependencyManager, "dump_info")
-  def test_repeatRegistration(self, dump_info_mock, read_dependencies_mock,
+  def test_repeatRegistration(self,
                               run_mock, installMock, buildMock):
 
     registerAndHeartbeat = MagicMock(name="registerAndHeartbeat")

http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/489a193d/ambari-agent/src/test/python/TestHeartbeat.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/TestHeartbeat.py b/ambari-agent/src/test/python/TestHeartbeat.py
index cceb764..5ef5d72 100644
--- a/ambari-agent/src/test/python/TestHeartbeat.py
+++ b/ambari-agent/src/test/python/TestHeartbeat.py
@@ -21,7 +21,6 @@ limitations under the License.
 from unittest import TestCase
 import unittest
 from ambari_agent.Heartbeat import Heartbeat
-from ambari_agent.ActionDependencyManager import ActionDependencyManager
 from ambari_agent.ActionQueue import ActionQueue
 from ambari_agent.LiveStatus import LiveStatus
 from ambari_agent import AmbariConfig
@@ -47,8 +46,7 @@ class TestHeartbeat(TestCase):
     sys.stdout = sys.__stdout__
 
 
-  @patch.object(ActionDependencyManager, "read_dependencies")
-  def test_build(self, read_dependencies_mock):
+  def test_build(self):
     actionQueue = ActionQueue(AmbariConfig.AmbariConfig().getConfig(),'dummy_controller')
     heartbeat = Heartbeat(actionQueue)
     result = heartbeat.build(100)
@@ -66,12 +64,9 @@ class TestHeartbeat(TestCase):
     self.assertEquals(not heartbeat.reports, True, "Heartbeat should not contain task in progress")
 
 
-  @patch.object(ActionDependencyManager, "read_dependencies")
   @patch.object(ActionQueue, "result")
-  @patch.object(ActionDependencyManager, "is_action_group_available")
   @patch.object(HostInfo, "register")
-  def test_no_mapping(self, register_mock, is_action_group_available_mock, result_mock,
-                      read_dependencies_mock):
+  def test_no_mapping(self, register_mock, result_mock):
     result_mock.return_value = {
       'reports': [{'status': 'IN_PROGRESS',
                    'stderr': 'Read from /tmp/errors-3.txt',
@@ -95,11 +90,8 @@ class TestHeartbeat(TestCase):
     self.assertEqual(register_mock.call_args_list[0][0][1], False)
 
 
-  @patch.object(ActionDependencyManager, "read_dependencies")
   @patch.object(ActionQueue, "result")
-  @patch.object(ActionDependencyManager, "is_action_group_available")
-  def test_build_long_result(self, is_action_group_available_mock, result_mock,
-                  read_dependencies_mock):
+  def test_build_long_result(self, result_mock):
     actionQueue = ActionQueue(AmbariConfig.AmbariConfig().getConfig(),'dummy_controller')
     result_mock.return_value = {
       'reports': [{'status': 'IN_PROGRESS',
@@ -184,22 +176,17 @@ class TestHeartbeat(TestCase):
     self.assertEquals(hb, expected)
 
 
-  @patch.object(ActionDependencyManager, "read_dependencies")
-  @patch.object(ActionDependencyManager, "dump_info")
-  @patch.object(ActionDependencyManager, "can_be_executed_in_parallel")
   @patch.object(HostInfo, 'register')
-  def test_heartbeat_no_host_check_cmd_in_queue(self, register_mock,
-      can_be_executed_in_parallel_mock, dump_info_mock, read_dependencies_mock):
-    can_be_executed_in_parallel_mock.return_value = False
+  def test_heartbeat_no_host_check_cmd_in_queue(self, register_mock):
     actionQueue = ActionQueue(AmbariConfig.AmbariConfig().getConfig(),'dummy_controller')
     statusCommand = {
       "serviceName" : 'HDFS',
       "commandType" : "STATUS_COMMAND",
-      "clusterName" : "",
+      "clusterName" : "c1",
       "componentName" : "DATANODE",
       'configurations':{'global' : {}}
     }
-    actionQueue.put(list(statusCommand))
+    actionQueue.put([statusCommand])
 
     heartbeat = Heartbeat(actionQueue)
     heartbeat.build(12, 6)
@@ -209,9 +196,8 @@ class TestHeartbeat(TestCase):
     self.assertFalse(args[1])
 
 
-  @patch.object(ActionDependencyManager, "read_dependencies")
   @patch.object(HostInfo, 'register')
-  def test_heartbeat_host_check_no_cmd(self, register_mock, read_dependencies_mock):
+  def test_heartbeat_host_check_no_cmd(self, register_mock):
     actionQueue = ActionQueue(AmbariConfig.AmbariConfig().getConfig(),'dummy_controller')
     heartbeat = Heartbeat(actionQueue)
     heartbeat.build(12, 6)
@@ -222,4 +208,4 @@ class TestHeartbeat(TestCase):
 
 
 if __name__ == "__main__":
-  unittest.main(verbosity=2)
\ No newline at end of file
+  unittest.main(verbosity=2)

http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/489a193d/ambari-agent/src/test/python/TestUpgradeExecutor.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/TestUpgradeExecutor.py b/ambari-agent/src/test/python/TestUpgradeExecutor.py
deleted file mode 100644
index 7abb959..0000000
--- a/ambari-agent/src/test/python/TestUpgradeExecutor.py
+++ /dev/null
@@ -1,264 +0,0 @@
-#!/usr/bin/env python2.6
-
-'''
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-'''
-
-from unittest import TestCase
-import unittest
-import StringIO
-import socket
-import os, sys, pprint, json
-from mock.mock import patch
-from mock.mock import MagicMock
-from mock.mock import create_autospec
-import os, errno, tempfile
-from ambari_agent import UpgradeExecutor
-import logging
-from ambari_agent import AmbariConfig
-from ambari_agent.StackVersionsFileHandler import StackVersionsFileHandler
-
-class TestUpgradeExecutor(TestCase):
-
-  logger = logging.getLogger()
-
-  @patch.object(StackVersionsFileHandler, 'write_stack_version')
-  @patch('os.path.isdir')
-  def test_perform_stack_upgrade(self, isdir_method, write_stack_version_method):
-    puppetExecutor = MagicMock()
-    executor = UpgradeExecutor.UpgradeExecutor('pythonExecutor',
-      puppetExecutor, AmbariConfig.AmbariConfig().getConfig())
-
-    # Checking matching versions
-    command = {
-      'commandParams' :	{
-        'source_stack_version' : '{\"stackName\":\"HDP\",\"stackVersion\":\"1.3.0\"}',
-        'target_stack_version' : '{\"stackName\":\"HDP\",\"stackVersion\":\"1.3.0\"}',
-       },
-      'role' : 'HDFS'
-    }
-    result = executor.perform_stack_upgrade(command, 'tmpout', 'tmperr')
-    self.assertTrue('matches current stack version' in result['stdout'])
-    self.assertFalse(write_stack_version_method.called)
-    # Checking unsupported update
-    write_stack_version_method.reset()
-    command = {
-      'commandParams' :	{
-        'source_stack_version' : '{\"stackName\":\"HDP\",\"stackVersion\":\"1.0.1\"}',
-        'target_stack_version' : '{\"stackName\":\"HDP\",\"stackVersion\":\"1.3.0\"}',
-      },
-      'role' : 'HDFS'
-    }
-    isdir_method.return_value = False
-    result = executor.perform_stack_upgrade(command, 'tmpout', 'tmperr')
-    self.assertTrue('not supported' in result['stderr'])
-    self.assertFalse(write_stack_version_method.called)
-    # Checking wrong source version
-    write_stack_version_method.reset()
-    command = {
-      'commandParams' :	{
-        'source_stack_version' : '{\"stackName\":\"HDP\",\"stackVersion\":\"Wrong\"}',
-        'target_stack_version' : '{\"stackName\":\"HDP\",\"stackVersion\":\"1.3.0\"}',
-        },
-      'role' : 'HDFS'
-    }
-    result = executor.perform_stack_upgrade(command, 'tmpout', 'tmperr')
-    self.assertTrue('does not match pattern' in result['stderr'])
-    self.assertFalse(write_stack_version_method.called)
-    # Checking wrong target version
-    write_stack_version_method.reset()
-    command = {
-      'commandParams' :	{
-        'source_stack_version' : '{\"stackName\":\"HDP\",\"stackVersion\":\"1.3.0\"}',
-        'target_stack_version' : '{\"stackName\":\"HDP\",\"stackVersion\":\"Wrong\"}',
-        },
-      'role' : 'HDFS'
-    }
-    result = executor.perform_stack_upgrade(command, 'tmpout', 'tmperr')
-    self.assertTrue('does not match pattern' in result['stderr'])
-    self.assertFalse(write_stack_version_method.called)
-    # Checking successful result
-    write_stack_version_method.reset()
-    command = {
-      'commandParams' :	{
-        'source_stack_version' : '{\"stackName\":\"HDP\",\"stackVersion\":\"1.0.1\"}',
-        'target_stack_version' : '{\"stackName\":\"HDP\",\"stackVersion\":\"1.3.0\"}',
-      },
-      'role' : 'HDFS'
-    }
-    isdir_method.return_value = True
-    executor.execute_dir = lambda command, basedir, dir, tmpout, tmperr : \
-      {
-        'exitcode' : 0,
-        'stdout'   : "output - %s" % dir,
-        'stderr'   : "errors - %s" % dir,
-      }
-    result = executor.perform_stack_upgrade(command, 'tmpout', 'tmperr')
-    self.assertTrue(write_stack_version_method.called)
-    self.assertEquals(result['exitcode'],0)
-    self.assertEquals(result['stdout'],'output - pre-upgrade.d\noutput - upgrade.d\noutput - post-upgrade.d')
-    self.assertEquals(result['stderr'],'errors - pre-upgrade.d\nerrors - upgrade.d\nerrors - post-upgrade.d')
-    # Checking failed result
-    write_stack_version_method.reset()
-    command = {
-      'commandParams' :	{
-        'source_stack_version' : '{\"stackName\":\"HDP\",\"stackVersion\":\"1.0.1\"}',
-        'target_stack_version' : '{\"stackName\":\"HDP\",\"stackVersion\":\"1.3.0\"}',
-      },
-      'role' : 'HDFS'
-    }
-    isdir_method.return_value = True
-    executor.execute_dir = lambda command, basedir, dir, tmpout, tmperr :\
-    {
-      'exitcode' : 1,
-      'stdout'   : "output - %s" % dir,
-      'stderr'   : "errors - %s" % dir,
-      }
-    result = executor.perform_stack_upgrade(command, 'tmpout', 'tmperr')
-    self.assertTrue(write_stack_version_method.called)
-    self.assertEquals(result['exitcode'],1)
-    self.assertEquals(result['stdout'],'output - pre-upgrade.d')
-    self.assertEquals(result['stderr'],'errors - pre-upgrade.d')
-
-
-  def test_get_key_func(self):
-    executor = UpgradeExecutor.UpgradeExecutor('pythonExecutor',
-                 'puppetExecutor', AmbariConfig.AmbariConfig().getConfig())
-    # Checking unparseable
-    self.assertEqual(executor.get_key_func('fdsfds'), 999)
-    self.assertEqual(executor.get_key_func('99dfsfd'), 999)
-    self.assertEqual(executor.get_key_func('-fdfds'), 999)
-    # checking parseable
-    self.assertEqual(executor.get_key_func('99'), 99)
-    self.assertEqual(executor.get_key_func('45-install'), 45)
-    self.assertEqual(executor.get_key_func('33-install-staff'), 33)
-    #checking sorting of full list
-    testlist1 = ['7-fdfd', '10-erewfds', '11-fdfdfd', '1-hh', '20-kk', '01-tt']
-    testlist1.sort(key = executor.get_key_func)
-    self.assertEqual(testlist1,
-        ['1-hh', '01-tt', '7-fdfd', '10-erewfds', '11-fdfdfd', '20-kk'])
-
-
-  def test_split_stack_version(self):
-    executor = UpgradeExecutor.UpgradeExecutor('pythonExecutor',
-             'puppetExecutor', AmbariConfig.AmbariConfig().getConfig())
-    result = executor.split_stack_version('{\"stackName\":\"HDP\",\"stackVersion\":\"1.2.1\"}')
-    self.assertEquals(result, ('HDP', '1', '2'))
-    result = executor.split_stack_version('{\"stackName\":\"HDP\",\"stackVersion\":\"1.3\"}')
-    self.assertEquals(result, ('HDP', '1', '3'))
-    result = executor.split_stack_version('{\"stackName\":\"ComplexStackVersion\",\"stackVersion\":\"1.3.4.2.2\"}')
-    self.assertEquals(result, ('ComplexStackVersion', '1', '3'))
-    result = executor.split_stack_version('{\"stackName\":\"HDP\",\"stackVersion\":\"1\"}')
-    self.assertEquals(result, None)
-    pass
-
-
-  @patch('os.listdir')
-  @patch('os.path.isdir')
-  @patch.object(UpgradeExecutor.UpgradeExecutor, 'get_key_func')
-  def test_execute_dir(self, get_key_func_method, isdir_method, listdir_method):
-    pythonExecutor = MagicMock()
-    puppetExecutor = MagicMock()
-
-    command = {'debug': 'command'}
-    isdir_method.return_value = True
-    # Mocking sort() method of list
-    class MyList(list):
-      pass
-    files = MyList(['first.py', 'second.pp', 'third.py', 'fourth.nm',
-             'fifth-failing.py', 'six.py'])
-    files.sort = lambda key: None
-    listdir_method.return_value = files
-    # fifth-failing.py will fail
-    pythonExecutor.run_file.side_effect = [
-      {'exitcode' : 0,
-       'stdout'   : "stdout - first.py",
-       'stderr'   : "stderr - first.py"},
-      {'exitcode' : 0,
-       'stdout'   : "stdout - third.py",
-       'stderr'   : "stderr - third.py"},
-      {'exitcode' : 1,
-       'stdout'   : "stdout - fifth-failing.py",
-       'stderr'   : "stderr - fifth-failing.py"},
-      {'exitcode' : 0,
-       'stdout'   : "stdout - six.py",
-       'stderr'   : "stderr - six.py"},
-    ]
-    puppetExecutor.run_manifest.side_effect = [
-      {'exitcode' : 0,
-       'stdout'   : "stdout - second.pp",
-       'stderr'   : "stderr - second.pp"},
-    ]
-
-    executor = UpgradeExecutor.UpgradeExecutor(pythonExecutor,
-        puppetExecutor, AmbariConfig.AmbariConfig().getConfig())
-
-    result= executor.execute_dir(command, 'basedir', 'dir', 'tmpout', 'tmperr')
-    self.assertEquals(result['exitcode'],1)
-    self.assertEquals(result['stdout'],"\nstdout - first.py\nstdout - second.pp\nstdout - third.py\nUnrecognized file type, skipping: basedir/dir/fourth.nm\nstdout - fifth-failing.py")
-    self.assertEquals(result['stderr'],"\nstderr - first.py\nstderr - second.pp\nstderr - third.py\nNone\nstderr - fifth-failing.py")
-
-
-  @patch('os.path.isdir')
-  def test_execute_dir_not_existing(self, isdir_method):
-    pythonExecutor = MagicMock()
-    puppetExecutor = MagicMock()
-
-    command = {'debug': 'command'}
-    isdir_method.return_value = False
-
-    executor = UpgradeExecutor.UpgradeExecutor(pythonExecutor,
-        puppetExecutor, AmbariConfig.AmbariConfig().getConfig())
-
-    result= executor.execute_dir(command, 'basedir', 'not_existing_dir', 'tmpout', 'tmperr')
-    self.assertEquals(result['exitcode'],0)
-    self.assertEquals(result['stdout'],'Script directory basedir/not_existing_dir does not exist, skipping')
-    self.assertEquals(result['stderr'],'None')
-
-
-  @patch('os.listdir')
-  @patch('os.path.isdir')
-  def test_execute_dir_ignore_badly_named(self, isdir_method, listdir_method):
-    pythonExecutor = MagicMock()
-    puppetExecutor = MagicMock()
-
-    command = {'debug': 'command'}
-    isdir_method.return_value = True
-    files = ['00-first.py', 'badly-named.pp', '10-second.pp', '20-wrong.cpp']
-    listdir_method.return_value = files
-    # fifth-failing.py will fail
-    pythonExecutor.run_file.side_effect = [
-      {'exitcode' : 0,
-       'stdout'   : "stdout - python.py",
-       'stderr'   : "stderr - python.py"},
-    ]
-    puppetExecutor.run_manifest.side_effect = [
-      {'exitcode' : 0,
-       'stdout'   : "stdout - puppet.pp",
-       'stderr'   : "stderr - puppet.pp"},
-    ]
-
-    executor = UpgradeExecutor.UpgradeExecutor(pythonExecutor,
-        puppetExecutor, AmbariConfig.AmbariConfig().getConfig())
-
-    result= executor.execute_dir(command, 'basedir', 'dir', 'tmpout', 'tmperr')
-    self.assertEquals(result['exitcode'],0)
-    self.assertEquals(result['stdout'],'\nstdout - python.py\nstdout - puppet.pp\nUnrecognized file type, skipping: basedir/dir/20-wrong.cpp')
-    self.assertEquals(result['stderr'],'\nstderr - python.py\nstderr - puppet.pp\nNone')
-
-if __name__ == "__main__":
-  unittest.main(verbosity=2)
\ No newline at end of file