You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by dm...@apache.org on 2013/10/09 20:12:39 UTC
git commit: AMBARI-3490. Remove RCO management logic at ambari-agent (dlysnichencko)
Updated Branches:
refs/heads/trunk 78e3d4142 -> 489a193dd
AMBARI-3490. Remove RCO management logic at ambari-agent (dlysnichencko)
Project: http://git-wip-us.apache.org/repos/asf/incubator-ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ambari/commit/489a193d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ambari/tree/489a193d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ambari/diff/489a193d
Branch: refs/heads/trunk
Commit: 489a193dd2f54215f8b6f9dc9890032ca746b301
Parents: 78e3d41
Author: Lisnichenko Dmitro <dl...@hortonworks.com>
Authored: Wed Oct 9 21:10:30 2013 +0300
Committer: Lisnichenko Dmitro <dl...@hortonworks.com>
Committed: Wed Oct 9 21:10:30 2013 +0300
----------------------------------------------------------------------
ambari-agent/pom.xml | 4 -
.../ambari_agent/ActionDependencyManager.py | 163 ------------
.../src/main/python/ambari_agent/ActionQueue.py | 79 ++----
.../src/main/python/ambari_agent/Heartbeat.py | 2 +-
.../main/python/ambari_agent/UpgradeExecutor.py | 207 ---------------
.../test/python/TestActionDependencyManager.py | 180 -------------
ambari-agent/src/test/python/TestActionQueue.py | 132 ++++------
ambari-agent/src/test/python/TestController.py | 5 +-
ambari-agent/src/test/python/TestHeartbeat.py | 30 +--
.../src/test/python/TestUpgradeExecutor.py | 264 -------------------
10 files changed, 86 insertions(+), 980 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/489a193d/ambari-agent/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-agent/pom.xml b/ambari-agent/pom.xml
index 0341f0d..8681420 100644
--- a/ambari-agent/pom.xml
+++ b/ambari-agent/pom.xml
@@ -328,10 +328,6 @@
<location>../version</location>
<filter>true</filter>
</source>
- <!--<source>-->
- <!--<!– This file is also included into server rpm–>-->
- <!--<location>../ambari-common/src/main/resources/role_command_order.json</location>-->
- <!--</source>-->
</sources>
</mapping>
<!-- -->
http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/489a193d/ambari-agent/src/main/python/ambari_agent/ActionDependencyManager.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/ActionDependencyManager.py b/ambari-agent/src/main/python/ambari_agent/ActionDependencyManager.py
deleted file mode 100644
index 0addc9a..0000000
--- a/ambari-agent/src/main/python/ambari_agent/ActionDependencyManager.py
+++ /dev/null
@@ -1,163 +0,0 @@
-#!/usr/bin/env python2.6
-
-'''
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-'''
-
-import logging
-import Queue
-import threading
-import pprint
-import os
-import json
-
-logger = logging.getLogger()
-
-class ActionDependencyManager():
- """
- Implemments a scheduler of role commands (like DATANODE-START) based on
- dependencies between them. Class does not execute actions, it only
- breaks them on groups that may be executed in parallel.
- """
-
- DEPS_FILE_NAME="role_command_order.json"
- COMMENT_STR="_comment"
-
- # Dictionary of dependencies. Format:
- # BlockedRole-Command : [BlockerRole1-Command1, BlockerRole2-Command2, ...]
-
-
- def __init__(self, config):
- self.deps = {}
- self.last_scheduled_group = []
- self.scheduled_action_groups = Queue.Queue()
- self.lock = threading.RLock()
- self.config = config
- #self.read_dependencies()
-
-
- def read_dependencies(self):
- """
- Load dependencies from file
- """
- prefix_dir = self.config.get('agent', 'prefix')
- action_order_file = os.path.join(prefix_dir, self.DEPS_FILE_NAME)
- with open(action_order_file) as f:
- action_order_data = json.load(f)
- for deps_group in action_order_data.keys():
- if deps_group != self.COMMENT_STR: # if entry is not a comment
- deps_group_list = action_order_data[deps_group]
- for blocked_str in deps_group_list:
- if blocked_str != self.COMMENT_STR: # if entry is not a comment
- blocker_list = deps_group_list[blocked_str]
- if blocked_str not in self.deps:
- self.deps[blocked_str]=[]
- for blocker_str in blocker_list:
- self.deps[blocked_str].append(blocker_str)
- pass
-
-
- def is_action_group_available(self):
- return not self.scheduled_action_groups.empty()
-
-
- def get_next_action_group(self):
- """
- Returns next group of scheduled actions that may be
- executed in parallel. If queue is empty, blocks until
- an item is available (until next put_action() call)
- """
- next_group = self.scheduled_action_groups.get(block=True)
- with self.lock: # Synchronized
- if next_group is self.last_scheduled_group:
- # Group is not eligible for appending, creating new one
- self.last_scheduled_group = []
-
- dump_str = pprint.pformat(next_group)
- logger.debug("Next action group: {0}".format(dump_str))
- return next_group
-
-
- def put_actions(self, actions):
- """
- Schedules actions to be executed in some time at future.
- Here we rely on serial command execution sequence received from server.
- Some of these commands may be executed in parallel with others, so we
- unite them into a group.
- """
- with self.lock: # Synchronized
- for action in actions:
- self.dump_info(action)
- was_empty = len(self.last_scheduled_group) == 0
- if self.can_be_executed_in_parallel(action, self.last_scheduled_group):
- self.last_scheduled_group.append(action)
- else: # create a new group
- self.last_scheduled_group = [action]
- was_empty = True
- if was_empty:
- # last_scheduled_group is not empty now, so we add it to the queue
- self.scheduled_action_groups.put(self.last_scheduled_group)
-
-
- def dump_info(self, action):
- """
- Prints info about command to log
- """
- logger.info("Adding " + action['commandType'] + " for service " + \
- action['serviceName'] + " of cluster " + \
- action['clusterName'] + " to the queue.")
- logger.debug(pprint.pformat(action))
-
-
- def can_be_executed_in_parallel(self, action, group):
- """
- Checks whether action may be executed in parallel with a given group
- """
- # Hack: parallel execution disabled
- return False
-
- # from ActionQueue import ActionQueue
- # # Empty group is compatible with any action
- # if not group:
- # return True
- # # Status commands are placed into a separate group to avoid race conditions
- # if action['commandType'] == ActionQueue.STATUS_COMMAND:
- # for scheduled_action in group:
- # if scheduled_action['commandType'] != ActionQueue.STATUS_COMMAND:
- # return False
- # return True
- # # We avoid executing install/upgrade threads in parallel with anything
- # standalone_commands = ["INSTALL", ActionQueue.ROLE_COMMAND_UPGRADE]
- # if action['roleCommand'] in standalone_commands:
- # return False
- # # We can not perform few actions (like STOP and START) for a component
- # # at the same time
- # for scheduled_action in group:
- # if scheduled_action['role'] == action['role']:
- # return False
- # # In other cases, check dependencies
- # pattern = "{0}-{1}"
- # new_action_str = pattern.format(action['role'], action['roleCommand'])
- # for scheduled_action in group:
- # if new_action_str in self.deps:
- # blockers = self.deps[new_action_str]
- # scheduled_action_str = pattern.format(
- # scheduled_action['role'], scheduled_action['roleCommand'])
- # if scheduled_action_str in blockers:
- # return False
- # # Everything seems to be ok
- # return True
http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/489a193d/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
index c39cf72..a2ad9c5 100644
--- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
+++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
@@ -17,21 +17,19 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
'''
+import Queue
import logging
import traceback
import threading
-from threading import Thread
import pprint
import os
from LiveStatus import LiveStatus
from shell import shellRunner
import PuppetExecutor
-import UpgradeExecutor
import PythonExecutor
from ActualConfigHandler import ActualConfigHandler
-from ActionDependencyManager import ActionDependencyManager
from CommandStatusDict import CommandStatusDict
@@ -49,13 +47,12 @@ class ActionQueue(threading.Thread):
STATUS_COMMAND = 'STATUS_COMMAND'
EXECUTION_COMMAND = 'EXECUTION_COMMAND'
- ROLE_COMMAND_UPGRADE = 'UPGRADE'
IN_PROGRESS_STATUS = 'IN_PROGRESS'
def __init__(self, config, controller):
super(ActionQueue, self).__init__()
- self.commandQueue = ActionDependencyManager(config)
+ self.commandQueue = Queue.Queue()
self.commandStatuses = CommandStatusDict(callback_action =
self.status_update_callback)
self.config = config
@@ -71,51 +68,30 @@ class ActionQueue(threading.Thread):
return self._stop.isSet()
def put(self, commands):
- self.commandQueue.put_actions(commands)
+ for command in commands:
+ logger.info("Adding " + command['commandType'] + " for service " + \
+ command['serviceName'] + " of cluster " + \
+ command['clusterName'] + " to the queue.")
+ logger.debug(pprint.pformat(command))
+ self.commandQueue.put(command)
def run(self):
while not self.stopped():
- # Taking a new portion of tasks
- portion = self.commandQueue.get_next_action_group() # Will block if queue is empty
- portion = portion[::-1] # Reverse list order
- self.process_portion_of_actions(portion)
-
-
- def process_portion_of_actions(self, portion):
- # starting execution of a group of commands
- running_list = []
- finished_list = []
- while portion or running_list: # While not finished actions in portion
- # poll threads under execution
- for thread in running_list:
- if not thread.is_alive():
- finished_list.append(thread)
- # Remove finished from the running list
- running_list[:] = [b for b in running_list if not b in finished_list]
- # Start next actions
- free_slots = self.MAX_CONCURRENT_ACTIONS - len(running_list)
- while free_slots > 0 and portion: # Start new threads if available
- command = portion.pop()
- logger.debug("Took an element of Queue: " + pprint.pformat(command))
- if command['commandType'] == self.EXECUTION_COMMAND:
- # Start separate threads for commands of this type
- action_thread = Thread(target = self.execute_command_safely, args = (command, ))
- running_list.append(action_thread)
- free_slots -= 1
- action_thread.start()
- elif command['commandType'] == self.STATUS_COMMAND:
- # Execute status commands immediately, in current thread
- self.execute_status_command(command)
- else:
- logger.error("Unrecognized command " + pprint.pformat(command))
- pass
+ command = self.commandQueue.get() # Will block if queue is empty
+ self.process_command(command)
- def execute_command_safely(self, command):
+ def process_command(self, command):
+ logger.debug("Took an element of Queue: " + pprint.pformat(command))
# make sure we log failures
try:
- self.execute_command(command)
+ if command['commandType'] == self.EXECUTION_COMMAND:
+ self.execute_command(command)
+ elif command['commandType'] == self.STATUS_COMMAND:
+ self.execute_status_command(command)
+ else:
+ logger.error("Unrecognized command " + pprint.pformat(command))
except Exception, err:
# Should not happen
traceback.print_exc()
@@ -123,6 +99,9 @@ class ActionQueue(threading.Thread):
def execute_command(self, command):
+ '''
+ Executes commands of type EXECUTION_COMMAND
+ '''
clusterName = command['clusterName']
commandId = command['commandId']
@@ -147,17 +126,8 @@ class ActionQueue(threading.Thread):
self.config.get('puppet', 'puppet_home'),
self.config.get('puppet', 'facter_home'),
self.config.get('agent', 'prefix'), self.config)
- if command['roleCommand'] == ActionQueue.ROLE_COMMAND_UPGRADE:
- # Create new instances for the current thread
- pythonExecutor = PythonExecutor.PythonExecutor(
- self.config.get('agent', 'prefix'), self.config)
- upgradeExecutor = UpgradeExecutor.UpgradeExecutor(pythonExecutor,
- puppetExecutor, self.config)
- commandresult = upgradeExecutor.perform_stack_upgrade(command, in_progress_status['tmpout'],
- in_progress_status['tmperr'])
- else:
- commandresult = puppetExecutor.runCommand(command, in_progress_status['tmpout'],
- in_progress_status['tmperr'])
+ commandresult = puppetExecutor.runCommand(command, in_progress_status['tmpout'],
+ in_progress_status['tmperr'])
# dumping results
status = "COMPLETED"
if commandresult['exitcode'] != 0:
@@ -189,6 +159,9 @@ class ActionQueue(threading.Thread):
def execute_status_command(self, command):
+ '''
+ Executes commands of type STATUS_COMMAND
+ '''
try:
cluster = command['clusterName']
service = command['serviceName']
http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/489a193d/ambari-agent/src/main/python/ambari_agent/Heartbeat.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/Heartbeat.py b/ambari-agent/src/main/python/ambari_agent/Heartbeat.py
index c369677..f0e20c4 100644
--- a/ambari-agent/src/main/python/ambari_agent/Heartbeat.py
+++ b/ambari-agent/src/main/python/ambari_agent/Heartbeat.py
@@ -55,7 +55,7 @@ class Heartbeat:
}
commandsInProgress = False
- if self.actionQueue.commandQueue.is_action_group_available():
+ if not self.actionQueue.commandQueue.empty():
commandsInProgress = True
if len(queueResult) != 0:
heartbeat['reports'] = queueResult['reports']
http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/489a193d/ambari-agent/src/main/python/ambari_agent/UpgradeExecutor.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/UpgradeExecutor.py b/ambari-agent/src/main/python/ambari_agent/UpgradeExecutor.py
deleted file mode 100644
index b189921..0000000
--- a/ambari-agent/src/main/python/ambari_agent/UpgradeExecutor.py
+++ /dev/null
@@ -1,207 +0,0 @@
-#!/usr/bin/env python2.6
-
-'''
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-'''
-import json
-import os.path
-import logging
-import pprint
-import re
-from Grep import Grep
-from StackVersionsFileHandler import StackVersionsFileHandler
-
-
-logger = logging.getLogger()
-grep = Grep()
-
-class UpgradeExecutor:
-
- """ Class that performs the StackVersion stack upgrade"""
-
- SCRIPT_DIRS = [
- 'pre-upgrade.d',
- 'upgrade.d',
- 'post-upgrade.d'
- ]
-
- NAME_PARSING_FAILED_CODE = 999
-
- def __init__(self, pythonExecutor, puppetExecutor, config):
- self.pythonExecutor = pythonExecutor
- self.puppetExecutor = puppetExecutor
- self.stacksDir = config.get('stack', 'upgradeScriptsDir')
- self.config = config
- versionsFileDir = config.get('agent', 'prefix')
- self.versionsHandler = StackVersionsFileHandler(versionsFileDir)
-
-
- def perform_stack_upgrade(self, command, tmpout, tmperr):
- logger.info("Performing stack upgrade")
- params = command['commandParams']
- srcStack = params['source_stack_version']
- tgtStack = params['target_stack_version']
- component = command['role']
-
- srcStackTuple = self.split_stack_version(srcStack)
- tgtStackTuple = self.split_stack_version(tgtStack)
-
- if srcStackTuple is None or tgtStackTuple is None:
- errorstr = "Source (%s) or target (%s) version does not match pattern \
- <Name>-<Version>" % (srcStack, tgtStack)
- logger.info(errorstr)
- result = {
- 'exitcode' : 1,
- 'stdout' : 'None',
- 'stderr' : errorstr
- }
- elif srcStack != tgtStack:
- paramTuple = sum((srcStackTuple, tgtStackTuple), ())
- upgradeId = "%s-%s.%s_%s-%s.%s" % paramTuple
- # Check stack version (do we need upgrade?)
- basedir = os.path.join(self.stacksDir, upgradeId, component)
- if not os.path.isdir(basedir):
- errorstr = "Upgrade %s is not supported (dir %s does not exist)" \
- % (upgradeId, basedir)
- logger.error(errorstr)
- result = {
- 'exitcode' : 1,
- 'stdout' : errorstr,
- 'stderr' : errorstr
- }
- else:
- result = {
- 'exitcode' : 0,
- 'stdout' : '',
- 'stderr' : ''
- }
- # Request repos update (will be executed once before running any pp file)
- self.puppetExecutor.discardInstalledRepos()
- for dir in self.SCRIPT_DIRS:
- if result['exitcode'] != 0:
- break
- tmpRes = self.execute_dir(command, basedir, dir, tmpout, tmperr)
-
- result = {
- 'exitcode' : result['exitcode'] or tmpRes['exitcode'],
- 'stdout' : "%s\n%s" % (result['stdout'], tmpRes['stdout']),
- 'stderr' : "%s\n%s" % (result['stderr'], tmpRes['stderr']),
- }
-
- if result['exitcode'] == 0:
- logger.info("Upgrade %s successfully finished" % upgradeId)
- self.versionsHandler.write_stack_version(component, tgtStack)
- else:
- infostr = "target_stack_version (%s) matches current stack version" \
- " for component %s, nothing to do" % (tgtStack, component)
- logger.info(infostr)
- result = {
- 'exitcode' : 0,
- 'stdout' : infostr,
- 'stderr' : 'None'
- }
- result = {
- 'exitcode' : result['exitcode'],
- 'stdout' : grep.tail(result['stdout'], grep.OUTPUT_LAST_LINES),
- 'stderr' : grep.tail(result['stderr'], grep.OUTPUT_LAST_LINES)
- }
- return result
-
-
- def get_key_func(self, name):
- """
- Returns a number from filenames like 70-foobar.* or 999 for not matching
- filenames
- """
- parts = name.split('-', 1)
- if not parts or not parts[0].isdigit():
- logger.warn("Can't parse script filename number %s" % name)
- return self.NAME_PARSING_FAILED_CODE # unknown element will be placed to the end of list
- return int(parts[0])
-
-
- def split_stack_version(self, verstr):
- verdict = json.loads(verstr)
- stack_name = verdict["stackName"].strip()
-
- matchObj = re.match( r'(\d+).(\d+)', verdict["stackVersion"].strip(), re.M|re.I)
- if matchObj:
- stack_major_ver = matchObj.group(1)
- stack_minor_ver = matchObj.group(2)
- return stack_name, stack_major_ver, stack_minor_ver
- else:
- return None
-
-
- def execute_dir(self, command, basedir, dir, tmpout, tmperr):
- """
- Executes *.py and *.pp files located in a given directory.
- Files a executed in a numeric sorting order.
- """
- dirpath = os.path.join(basedir, dir)
- logger.info("Executing %s" % dirpath)
- if not os.path.isdir(dirpath):
- warnstr = "Script directory %s does not exist, skipping" % dirpath
- logger.warn(warnstr)
- result = {
- 'exitcode' : 0,
- 'stdout' : warnstr,
- 'stderr' : 'None'
- }
- return result
- fileList=os.listdir(dirpath)
- fileList.sort(key = self.get_key_func)
- formattedResult = {
- 'exitcode' : 0,
- 'stdout' : '',
- 'stderr' : ''
- }
- for filename in fileList:
- prevcode = formattedResult['exitcode']
- if prevcode != 0 or self.get_key_func(filename) == self.NAME_PARSING_FAILED_CODE:
- break
- filepath = os.path.join(dirpath, filename)
- if filename.endswith(".pp"):
- logger.info("Running puppet file %s" % filepath)
- result = self.puppetExecutor.run_manifest(command, filepath,
- tmpout, tmperr)
- elif filename.endswith(".py"):
- logger.info("Running python file %s" % filepath)
- result = self.pythonExecutor.run_file(command, filepath, tmpout, tmperr)
- elif filename.endswith(".pyc"):
- pass # skipping compiled files
- else:
- warnstr = "Unrecognized file type, skipping: %s" % filepath
- logger.warn(warnstr)
- result = {
- 'exitcode' : 0,
- 'stdout' : warnstr,
- 'stderr' : 'None'
- }
- formattedResult = {
- 'exitcode' : prevcode or result['exitcode'],
- 'stdout' : "%s\n%s" % (formattedResult['stdout'], result['stdout']),
- 'stderr' : "%s\n%s" % (formattedResult['stderr'], result['stderr']),
- }
- logger.debug("Result of %s: \n %s" % (dirpath, pprint.pformat(formattedResult)))
- return formattedResult
-
-
-
-
-
-
http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/489a193d/ambari-agent/src/test/python/TestActionDependencyManager.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/TestActionDependencyManager.py b/ambari-agent/src/test/python/TestActionDependencyManager.py
deleted file mode 100644
index a718779..0000000
--- a/ambari-agent/src/test/python/TestActionDependencyManager.py
+++ /dev/null
@@ -1,180 +0,0 @@
-#!/usr/bin/env python2.6
-
-'''
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-'''
-
-from unittest import TestCase
-from ambari_agent.AmbariConfig import AmbariConfig
-from ambari_agent.ActionQueue import ActionQueue
-from ambari_agent.ActionDependencyManager import ActionDependencyManager
-import os, errno, time, pprint, tempfile, threading, sys
-from mock.mock import patch, MagicMock, call
-
-class TestActionDependencyManager(TestCase):
-
- dummy_RCO_file = os.path.join('dummy_files', 'test_rco_data.json')
-
- def setUp(self):
- self.config = AmbariConfig().getConfig()
- self.config.set('agent', 'prefix', os.getcwd())
- ActionDependencyManager.DEPS_FILE_NAME = self.dummy_RCO_file
-
- # TODO: disabled for now
- def disabled_test_init(self):
- """
- Tests config load
- """
- adm = ActionDependencyManager(self.config)
- deps_dump = pprint.pformat(adm.deps)
- expected = "{u'DATANODE-STOP': [u'JOBTRACKER-STOP',\n " \
- "u'TASKTRACKER-STOP',\n " \
- "u'RESOURCEMANAGER-STOP',\n " \
- "u'NODEMANAGER-STOP',\n " \
- "u'HISTORYSERVER-STOP',\n " \
- "u'HBASE_MASTER-STOP'],\n u'HBASE_MASTER-START': " \
- "[u'PEERSTATUS-START'],\n u'JOBTRACKER-START': " \
- "[u'PEERSTATUS-START'],\n u'RESOURCEMANAGER-START': " \
- "[u'NAMENODE-START', u'DATANODE-START'],\n " \
- "u'SECONDARY_NAMENODE-START': [u'DATANODE-START', " \
- "u'NAMENODE-START'],\n u'SECONDARY_NAMENODE-UPGRADE': " \
- "[u'NAMENODE-UPGRADE']}"
- self.assertEqual(deps_dump, expected)
-
-
- def test_is_action_group_available(self):
- adm = ActionDependencyManager(self.config)
- self.assertFalse(adm.is_action_group_available())
- adm.scheduled_action_groups.put(["test"])
- self.assertTrue(adm.is_action_group_available())
-
-
- def test_get_next_action_group(self):
- adm = ActionDependencyManager(self.config)
- test1 = ["test1"]
- test2 = ["test2"]
- adm.scheduled_action_groups.put(test1)
- adm.scheduled_action_groups.put(test2)
- adm.last_scheduled_group = test2
- self.assertTrue(adm.is_action_group_available())
- # Taking 1st
- self.assertEqual(test1, adm.get_next_action_group())
- self.assertTrue(len(adm.last_scheduled_group) == 1)
- self.assertTrue(adm.is_action_group_available())
- # Taking 2nd
- self.assertEqual(test2, adm.get_next_action_group())
- self.assertTrue(len(adm.last_scheduled_group) == 0)
- self.assertTrue(adm.last_scheduled_group is not test2)
- self.assertFalse(adm.is_action_group_available())
-
-
- @patch.object(ActionDependencyManager, "dump_info")
- @patch.object(ActionDependencyManager, "can_be_executed_in_parallel")
- def test_put_action(self, can_be_executed_in_parallel_mock, dump_info_mock):
- can_be_executed_in_parallel_mock.side_effect = [True, False, True, False,
- True, True, True, False]
- adm = ActionDependencyManager(self.config)
-
- adm.put_actions(list(range(0, 8)))
-
- queue = []
- while adm.is_action_group_available():
- next = adm.get_next_action_group()
- queue.append(next)
-
- str = pprint.pformat(queue)
- expected = "[[0], [1, 2], [3, 4, 5, 6], [7]]"
- self.assertEqual(str, expected)
-
-
- # TODO: disabled for now
- def disabled_test_can_be_executed_in_parallel(self):
- adm = ActionDependencyManager(self.config)
- # empty group
- group = []
- install_command = {
- 'role': 'DATANODE',
- 'roleCommand': 'INSTALL',
- 'commandType': ActionQueue.EXECUTION_COMMAND
- }
- upgrade_command = {
- 'role': 'DATANODE',
- 'roleCommand': 'UPGRADE',
- 'commandType': ActionQueue.EXECUTION_COMMAND
- }
- start_command = {
- 'role': 'DATANODE',
- 'roleCommand': 'START',
- 'commandType': ActionQueue.EXECUTION_COMMAND
- }
- stop_command = {
- 'role': 'DATANODE',
- 'roleCommand': 'STOP',
- 'commandType': ActionQueue.EXECUTION_COMMAND
- }
- status_command = {
- 'commandType': ActionQueue.STATUS_COMMAND
- }
- rm_start_command = {
- 'role': 'RESOURCEMANAGER',
- 'roleCommand': 'START',
- 'commandType': ActionQueue.EXECUTION_COMMAND
- }
- hm_start_command = {
- 'role': 'HBASE_MASTER',
- 'roleCommand': 'START',
- 'commandType': ActionQueue.EXECUTION_COMMAND
- }
- self.assertTrue(adm.can_be_executed_in_parallel(install_command, group))
- self.assertTrue(adm.can_be_executed_in_parallel(status_command, group))
- # multiple status commands
- group = []
- for i in range(0, 3):
- group.append(status_command)
- self.assertTrue(adm.can_be_executed_in_parallel(status_command, group))
- self.assertFalse(adm.can_be_executed_in_parallel(install_command, group))
- # new status command
- group = [install_command]
- self.assertFalse(adm.can_be_executed_in_parallel(status_command, group))
- # install/upgrade commands
- group = [install_command]
- self.assertFalse(adm.can_be_executed_in_parallel(install_command, group))
- self.assertFalse(adm.can_be_executed_in_parallel(upgrade_command, group))
- self.assertFalse(adm.can_be_executed_in_parallel(status_command, group))
- self.assertFalse(adm.can_be_executed_in_parallel(start_command, group))
- group = [upgrade_command]
- self.assertFalse(adm.can_be_executed_in_parallel(install_command, group))
- self.assertFalse(adm.can_be_executed_in_parallel(upgrade_command, group))
- self.assertFalse(adm.can_be_executed_in_parallel(status_command, group))
- self.assertFalse(adm.can_be_executed_in_parallel(start_command, group))
- # Other commands
- group = [start_command]
- self.assertFalse(adm.can_be_executed_in_parallel(install_command, group))
- self.assertFalse(adm.can_be_executed_in_parallel(upgrade_command, group))
- self.assertFalse(adm.can_be_executed_in_parallel(status_command, group))
- self.assertTrue(adm.can_be_executed_in_parallel(hm_start_command, group))
- # Check dependency processing
- group = [start_command]
- self.assertFalse(adm.can_be_executed_in_parallel(rm_start_command, group))
- group = [start_command]
- self.assertTrue(adm.can_be_executed_in_parallel(hm_start_command, group))
- # actions for the same component
- group = [start_command]
- self.assertFalse(adm.can_be_executed_in_parallel(stop_command, group))
- group = [stop_command]
- self.assertFalse(adm.can_be_executed_in_parallel(start_command, group))
-
http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/489a193d/ambari-agent/src/test/python/TestActionQueue.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/TestActionQueue.py b/ambari-agent/src/test/python/TestActionQueue.py
index 6fa1407..31643b2 100644
--- a/ambari-agent/src/test/python/TestActionQueue.py
+++ b/ambari-agent/src/test/python/TestActionQueue.py
@@ -17,13 +17,13 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
'''
+from Queue import Queue
from unittest import TestCase
from ambari_agent.LiveStatus import LiveStatus
from ambari_agent.PuppetExecutor import PuppetExecutor
from ambari_agent.ActionQueue import ActionQueue
from ambari_agent.AmbariConfig import AmbariConfig
-from ambari_agent.ActionDependencyManager import ActionDependencyManager
import os, errno, time, pprint, tempfile, threading
import StringIO
import sys
@@ -31,7 +31,6 @@ from threading import Thread
from mock.mock import patch, MagicMock, call
from ambari_agent.StackVersionsFileHandler import StackVersionsFileHandler
-from ambari_agent.UpgradeExecutor import UpgradeExecutor
class TestActionQueue(TestCase):
@@ -126,106 +125,79 @@ class TestActionQueue(TestCase):
}
- @patch.object(ActionDependencyManager, "read_dependencies")
- @patch.object(ActionDependencyManager, "get_next_action_group")
- @patch.object(ActionQueue, "process_portion_of_actions")
- def test_ActionQueueStartStop(self, process_portion_of_actions_mock,
- get_next_action_group_mock, read_dependencies_mock):
+ @patch.object(ActionQueue, "process_command")
+ @patch.object(Queue, "get")
+ def test_ActionQueueStartStop(self, get_mock, process_command_mock):
actionQueue = ActionQueue(AmbariConfig().getConfig(), 'dummy_controller')
actionQueue.start()
time.sleep(0.1)
actionQueue.stop()
actionQueue.join()
self.assertEqual(actionQueue.stopped(), True, 'Action queue is not stopped.')
- self.assertTrue(get_next_action_group_mock.call_count > 1)
- self.assertTrue(process_portion_of_actions_mock.call_count > 1)
+ self.assertTrue(process_command_mock.call_count > 1)
- @patch.object(ActionDependencyManager, "read_dependencies")
+ @patch("traceback.print_exc")
@patch.object(ActionQueue, "execute_command")
@patch.object(ActionQueue, "execute_status_command")
- def test_process_portion_of_actions(self, execute_status_command_mock,
- executeCommand_mock, read_dependencies_mock):
+ def test_process_command(self, execute_status_command_mock,
+ execute_command_mock, print_exc_mock):
actionQueue = ActionQueue(AmbariConfig().getConfig(), 'dummy_controller')
- # Test execution of EXECUTION_COMMANDs
- max = 3
- actionQueue.MAX_CONCURRENT_ACTIONS = max
- unfreeze_flag = threading.Event()
- sync_lock = threading.RLock()
- stats = {
- 'waiting_threads' : 0
+ execution_command = {
+ 'commandType' : ActionQueue.EXECUTION_COMMAND,
}
- def side_effect(self):
- with sync_lock: # Synchtonized to avoid race effects during test execution
- stats['waiting_threads'] += 1
- unfreeze_flag.wait()
- executeCommand_mock.side_effect = side_effect
- portion = [self.datanode_install_command,
- self.namenode_install_command,
- self.snamenode_install_command,
- self.nagios_install_command,
- self.hbase_install_command]
+ status_command = {
+ 'commandType' : ActionQueue.STATUS_COMMAND,
+ }
+ wrong_command = {
+ 'commandType' : "SOME_WRONG_COMMAND",
+ }
+ # Try wrong command
+ actionQueue.process_command(wrong_command)
+ self.assertFalse(execute_command_mock.called)
+ self.assertFalse(execute_status_command_mock.called)
+ self.assertFalse(print_exc_mock.called)
- # We call method in a separate thread
- action_thread = Thread(target = actionQueue.process_portion_of_actions, args = (portion, ))
- action_thread.start()
- # Now we wait to check that only MAX_CONCURRENT_ACTIONS threads are running
- while stats['waiting_threads'] != max:
- time.sleep(0.1)
- self.assertEqual(stats['waiting_threads'], max)
- # unfreezing waiting threads
- unfreeze_flag.set()
- # wait until all threads are finished
- action_thread.join()
- self.assertTrue(executeCommand_mock.call_count == 5)
+ execute_command_mock.reset_mock()
+ execute_status_command_mock.reset_mock()
+ print_exc_mock.reset_mock()
+ # Try normal execution
+ actionQueue.process_command(execution_command)
+ self.assertTrue(execute_command_mock.called)
self.assertFalse(execute_status_command_mock.called)
- executeCommand_mock.reset_mock()
+ self.assertFalse(print_exc_mock.called)
+
+ execute_command_mock.reset_mock()
execute_status_command_mock.reset_mock()
+ print_exc_mock.reset_mock()
- # Test execution of STATUS_COMMANDs
- n = 5
- portion = []
- for i in range(0, n):
- status_command = {
- 'componentName': 'DATANODE',
- 'commandType': 'STATUS_COMMAND',
- }
- portion.append(status_command)
- actionQueue.process_portion_of_actions(portion)
- self.assertTrue(execute_status_command_mock.call_count == n)
- self.assertFalse(executeCommand_mock.called)
-
- # Test execution of unknown command
- unknown_command = {
- 'commandType': 'WRONG_COMMAND',
- }
- portion = [unknown_command]
- actionQueue.process_portion_of_actions(portion)
- # no exception expected
- pass
+ actionQueue.process_command(status_command)
+ self.assertFalse(execute_command_mock.called)
+ self.assertTrue(execute_status_command_mock.called)
+ self.assertFalse(print_exc_mock.called)
+ execute_command_mock.reset_mock()
+ execute_status_command_mock.reset_mock()
+ print_exc_mock.reset_mock()
- @patch("traceback.print_exc")
- @patch.object(ActionDependencyManager, "read_dependencies")
- @patch.object(ActionQueue, "execute_command")
- def test_execute_command_safely(self, execute_command_mock,
- read_dependencies_mock, print_exc_mock):
- actionQueue = ActionQueue(AmbariConfig().getConfig(), 'dummy_controller')
- # Try normal execution
- actionQueue.execute_command_safely('command')
- # Try exception ro check proper logging
+ # Try exception to check proper logging
def side_effect(self):
raise Exception("TerribleException")
execute_command_mock.side_effect = side_effect
- actionQueue.execute_command_safely('command')
+ actionQueue.process_command(execution_command)
+ self.assertTrue(print_exc_mock.called)
+
+ print_exc_mock.reset_mock()
+
+ execute_status_command_mock.side_effect = side_effect
+ actionQueue.process_command(execution_command)
self.assertTrue(print_exc_mock.called)
+
@patch("__builtin__.open")
@patch.object(ActionQueue, "status_update_callback")
- @patch.object(ActionDependencyManager, "read_dependencies")
- def test_execute_command(self, read_dependencies_mock,
- status_update_callback_mock, open_mock):
+ def test_execute_command(self, status_update_callback_mock, open_mock):
# Make file read calls visible
def open_side_effect(file, mode):
if mode == 'r':
@@ -251,10 +223,7 @@ class TestActionQueue(TestCase):
def patched_aq_execute_command(command):
# We have to perform patching for separate thread in the same thread
with patch.object(PuppetExecutor, "runCommand") as runCommand_mock:
- with patch.object(UpgradeExecutor, "perform_stack_upgrade") \
- as perform_stack_upgrade_mock:
runCommand_mock.side_effect = side_effect
- perform_stack_upgrade_mock.side_effect = side_effect
actionQueue.execute_command(command)
### Test install/start/stop command ###
## Test successful execution with configuration tags
@@ -379,11 +348,10 @@ class TestActionQueue(TestCase):
@patch.object(ActionQueue, "status_update_callback")
@patch.object(StackVersionsFileHandler, "read_stack_version")
- @patch.object(ActionDependencyManager, "read_dependencies")
@patch.object(ActionQueue, "execute_command")
@patch.object(LiveStatus, "build")
def test_execute_status_command(self, build_mock, execute_command_mock,
- read_dependencies_mock, read_stack_version_mock,
+ read_stack_version_mock,
status_update_callback):
actionQueue = ActionQueue(AmbariConfig().getConfig(), 'dummy_controller')
build_mock.return_value = "dummy report"
@@ -392,4 +360,4 @@ class TestActionQueue(TestCase):
report = actionQueue.result()
expected = 'dummy report'
self.assertEqual(len(report['componentStatus']), 1)
- self.assertEqual(report['componentStatus'][0], expected)
\ No newline at end of file
+ self.assertEqual(report['componentStatus'][0], expected)
http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/489a193d/ambari-agent/src/test/python/TestController.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/TestController.py b/ambari-agent/src/test/python/TestController.py
index 79340ed..2b0e614 100644
--- a/ambari-agent/src/test/python/TestController.py
+++ b/ambari-agent/src/test/python/TestController.py
@@ -23,7 +23,6 @@ import StringIO
import ssl
import unittest, threading
from ambari_agent import Controller, ActionQueue
-from ambari_agent.ActionDependencyManager import ActionDependencyManager
from ambari_agent import hostname
import sys
from ambari_agent.Controller import AGENT_AUTO_RESTART_EXIT_CODE
@@ -153,9 +152,7 @@ class TestController(unittest.TestCase):
@patch("urllib2.build_opener")
@patch("urllib2.install_opener")
@patch.object(ActionQueue.ActionQueue, "run")
- @patch.object(ActionDependencyManager, "read_dependencies")
- @patch.object(ActionDependencyManager, "dump_info")
- def test_repeatRegistration(self, dump_info_mock, read_dependencies_mock,
+ def test_repeatRegistration(self,
run_mock, installMock, buildMock):
registerAndHeartbeat = MagicMock(name="registerAndHeartbeat")
http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/489a193d/ambari-agent/src/test/python/TestHeartbeat.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/TestHeartbeat.py b/ambari-agent/src/test/python/TestHeartbeat.py
index cceb764..5ef5d72 100644
--- a/ambari-agent/src/test/python/TestHeartbeat.py
+++ b/ambari-agent/src/test/python/TestHeartbeat.py
@@ -21,7 +21,6 @@ limitations under the License.
from unittest import TestCase
import unittest
from ambari_agent.Heartbeat import Heartbeat
-from ambari_agent.ActionDependencyManager import ActionDependencyManager
from ambari_agent.ActionQueue import ActionQueue
from ambari_agent.LiveStatus import LiveStatus
from ambari_agent import AmbariConfig
@@ -47,8 +46,7 @@ class TestHeartbeat(TestCase):
sys.stdout = sys.__stdout__
- @patch.object(ActionDependencyManager, "read_dependencies")
- def test_build(self, read_dependencies_mock):
+ def test_build(self):
actionQueue = ActionQueue(AmbariConfig.AmbariConfig().getConfig(),'dummy_controller')
heartbeat = Heartbeat(actionQueue)
result = heartbeat.build(100)
@@ -66,12 +64,9 @@ class TestHeartbeat(TestCase):
self.assertEquals(not heartbeat.reports, True, "Heartbeat should not contain task in progress")
- @patch.object(ActionDependencyManager, "read_dependencies")
@patch.object(ActionQueue, "result")
- @patch.object(ActionDependencyManager, "is_action_group_available")
@patch.object(HostInfo, "register")
- def test_no_mapping(self, register_mock, is_action_group_available_mock, result_mock,
- read_dependencies_mock):
+ def test_no_mapping(self, register_mock, result_mock):
result_mock.return_value = {
'reports': [{'status': 'IN_PROGRESS',
'stderr': 'Read from /tmp/errors-3.txt',
@@ -95,11 +90,8 @@ class TestHeartbeat(TestCase):
self.assertEqual(register_mock.call_args_list[0][0][1], False)
- @patch.object(ActionDependencyManager, "read_dependencies")
@patch.object(ActionQueue, "result")
- @patch.object(ActionDependencyManager, "is_action_group_available")
- def test_build_long_result(self, is_action_group_available_mock, result_mock,
- read_dependencies_mock):
+ def test_build_long_result(self, result_mock):
actionQueue = ActionQueue(AmbariConfig.AmbariConfig().getConfig(),'dummy_controller')
result_mock.return_value = {
'reports': [{'status': 'IN_PROGRESS',
@@ -184,22 +176,17 @@ class TestHeartbeat(TestCase):
self.assertEquals(hb, expected)
- @patch.object(ActionDependencyManager, "read_dependencies")
- @patch.object(ActionDependencyManager, "dump_info")
- @patch.object(ActionDependencyManager, "can_be_executed_in_parallel")
@patch.object(HostInfo, 'register')
- def test_heartbeat_no_host_check_cmd_in_queue(self, register_mock,
- can_be_executed_in_parallel_mock, dump_info_mock, read_dependencies_mock):
- can_be_executed_in_parallel_mock.return_value = False
+ def test_heartbeat_no_host_check_cmd_in_queue(self, register_mock):
actionQueue = ActionQueue(AmbariConfig.AmbariConfig().getConfig(),'dummy_controller')
statusCommand = {
"serviceName" : 'HDFS',
"commandType" : "STATUS_COMMAND",
- "clusterName" : "",
+ "clusterName" : "c1",
"componentName" : "DATANODE",
'configurations':{'global' : {}}
}
- actionQueue.put(list(statusCommand))
+ actionQueue.put([statusCommand])
heartbeat = Heartbeat(actionQueue)
heartbeat.build(12, 6)
@@ -209,9 +196,8 @@ class TestHeartbeat(TestCase):
self.assertFalse(args[1])
- @patch.object(ActionDependencyManager, "read_dependencies")
@patch.object(HostInfo, 'register')
- def test_heartbeat_host_check_no_cmd(self, register_mock, read_dependencies_mock):
+ def test_heartbeat_host_check_no_cmd(self, register_mock):
actionQueue = ActionQueue(AmbariConfig.AmbariConfig().getConfig(),'dummy_controller')
heartbeat = Heartbeat(actionQueue)
heartbeat.build(12, 6)
@@ -222,4 +208,4 @@ class TestHeartbeat(TestCase):
if __name__ == "__main__":
- unittest.main(verbosity=2)
\ No newline at end of file
+ unittest.main(verbosity=2)
http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/489a193d/ambari-agent/src/test/python/TestUpgradeExecutor.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/TestUpgradeExecutor.py b/ambari-agent/src/test/python/TestUpgradeExecutor.py
deleted file mode 100644
index 7abb959..0000000
--- a/ambari-agent/src/test/python/TestUpgradeExecutor.py
+++ /dev/null
@@ -1,264 +0,0 @@
-#!/usr/bin/env python2.6
-
-'''
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-'''
-
-from unittest import TestCase
-import unittest
-import StringIO
-import socket
-import os, sys, pprint, json
-from mock.mock import patch
-from mock.mock import MagicMock
-from mock.mock import create_autospec
-import os, errno, tempfile
-from ambari_agent import UpgradeExecutor
-import logging
-from ambari_agent import AmbariConfig
-from ambari_agent.StackVersionsFileHandler import StackVersionsFileHandler
-
-class TestUpgradeExecutor(TestCase):
-
- logger = logging.getLogger()
-
- @patch.object(StackVersionsFileHandler, 'write_stack_version')
- @patch('os.path.isdir')
- def test_perform_stack_upgrade(self, isdir_method, write_stack_version_method):
- puppetExecutor = MagicMock()
- executor = UpgradeExecutor.UpgradeExecutor('pythonExecutor',
- puppetExecutor, AmbariConfig.AmbariConfig().getConfig())
-
- # Checking matching versions
- command = {
- 'commandParams' : {
- 'source_stack_version' : '{\"stackName\":\"HDP\",\"stackVersion\":\"1.3.0\"}',
- 'target_stack_version' : '{\"stackName\":\"HDP\",\"stackVersion\":\"1.3.0\"}',
- },
- 'role' : 'HDFS'
- }
- result = executor.perform_stack_upgrade(command, 'tmpout', 'tmperr')
- self.assertTrue('matches current stack version' in result['stdout'])
- self.assertFalse(write_stack_version_method.called)
- # Checking unsupported update
- write_stack_version_method.reset()
- command = {
- 'commandParams' : {
- 'source_stack_version' : '{\"stackName\":\"HDP\",\"stackVersion\":\"1.0.1\"}',
- 'target_stack_version' : '{\"stackName\":\"HDP\",\"stackVersion\":\"1.3.0\"}',
- },
- 'role' : 'HDFS'
- }
- isdir_method.return_value = False
- result = executor.perform_stack_upgrade(command, 'tmpout', 'tmperr')
- self.assertTrue('not supported' in result['stderr'])
- self.assertFalse(write_stack_version_method.called)
- # Checking wrong source version
- write_stack_version_method.reset()
- command = {
- 'commandParams' : {
- 'source_stack_version' : '{\"stackName\":\"HDP\",\"stackVersion\":\"Wrong\"}',
- 'target_stack_version' : '{\"stackName\":\"HDP\",\"stackVersion\":\"1.3.0\"}',
- },
- 'role' : 'HDFS'
- }
- result = executor.perform_stack_upgrade(command, 'tmpout', 'tmperr')
- self.assertTrue('does not match pattern' in result['stderr'])
- self.assertFalse(write_stack_version_method.called)
- # Checking wrong target version
- write_stack_version_method.reset()
- command = {
- 'commandParams' : {
- 'source_stack_version' : '{\"stackName\":\"HDP\",\"stackVersion\":\"1.3.0\"}',
- 'target_stack_version' : '{\"stackName\":\"HDP\",\"stackVersion\":\"Wrong\"}',
- },
- 'role' : 'HDFS'
- }
- result = executor.perform_stack_upgrade(command, 'tmpout', 'tmperr')
- self.assertTrue('does not match pattern' in result['stderr'])
- self.assertFalse(write_stack_version_method.called)
- # Checking successful result
- write_stack_version_method.reset()
- command = {
- 'commandParams' : {
- 'source_stack_version' : '{\"stackName\":\"HDP\",\"stackVersion\":\"1.0.1\"}',
- 'target_stack_version' : '{\"stackName\":\"HDP\",\"stackVersion\":\"1.3.0\"}',
- },
- 'role' : 'HDFS'
- }
- isdir_method.return_value = True
- executor.execute_dir = lambda command, basedir, dir, tmpout, tmperr : \
- {
- 'exitcode' : 0,
- 'stdout' : "output - %s" % dir,
- 'stderr' : "errors - %s" % dir,
- }
- result = executor.perform_stack_upgrade(command, 'tmpout', 'tmperr')
- self.assertTrue(write_stack_version_method.called)
- self.assertEquals(result['exitcode'],0)
- self.assertEquals(result['stdout'],'output - pre-upgrade.d\noutput - upgrade.d\noutput - post-upgrade.d')
- self.assertEquals(result['stderr'],'errors - pre-upgrade.d\nerrors - upgrade.d\nerrors - post-upgrade.d')
- # Checking failed result
- write_stack_version_method.reset()
- command = {
- 'commandParams' : {
- 'source_stack_version' : '{\"stackName\":\"HDP\",\"stackVersion\":\"1.0.1\"}',
- 'target_stack_version' : '{\"stackName\":\"HDP\",\"stackVersion\":\"1.3.0\"}',
- },
- 'role' : 'HDFS'
- }
- isdir_method.return_value = True
- executor.execute_dir = lambda command, basedir, dir, tmpout, tmperr :\
- {
- 'exitcode' : 1,
- 'stdout' : "output - %s" % dir,
- 'stderr' : "errors - %s" % dir,
- }
- result = executor.perform_stack_upgrade(command, 'tmpout', 'tmperr')
- self.assertTrue(write_stack_version_method.called)
- self.assertEquals(result['exitcode'],1)
- self.assertEquals(result['stdout'],'output - pre-upgrade.d')
- self.assertEquals(result['stderr'],'errors - pre-upgrade.d')
-
-
- def test_get_key_func(self):
- executor = UpgradeExecutor.UpgradeExecutor('pythonExecutor',
- 'puppetExecutor', AmbariConfig.AmbariConfig().getConfig())
- # Checking unparseable
- self.assertEqual(executor.get_key_func('fdsfds'), 999)
- self.assertEqual(executor.get_key_func('99dfsfd'), 999)
- self.assertEqual(executor.get_key_func('-fdfds'), 999)
- # checking parseable
- self.assertEqual(executor.get_key_func('99'), 99)
- self.assertEqual(executor.get_key_func('45-install'), 45)
- self.assertEqual(executor.get_key_func('33-install-staff'), 33)
- #checking sorting of full list
- testlist1 = ['7-fdfd', '10-erewfds', '11-fdfdfd', '1-hh', '20-kk', '01-tt']
- testlist1.sort(key = executor.get_key_func)
- self.assertEqual(testlist1,
- ['1-hh', '01-tt', '7-fdfd', '10-erewfds', '11-fdfdfd', '20-kk'])
-
-
- def test_split_stack_version(self):
- executor = UpgradeExecutor.UpgradeExecutor('pythonExecutor',
- 'puppetExecutor', AmbariConfig.AmbariConfig().getConfig())
- result = executor.split_stack_version('{\"stackName\":\"HDP\",\"stackVersion\":\"1.2.1\"}')
- self.assertEquals(result, ('HDP', '1', '2'))
- result = executor.split_stack_version('{\"stackName\":\"HDP\",\"stackVersion\":\"1.3\"}')
- self.assertEquals(result, ('HDP', '1', '3'))
- result = executor.split_stack_version('{\"stackName\":\"ComplexStackVersion\",\"stackVersion\":\"1.3.4.2.2\"}')
- self.assertEquals(result, ('ComplexStackVersion', '1', '3'))
- result = executor.split_stack_version('{\"stackName\":\"HDP\",\"stackVersion\":\"1\"}')
- self.assertEquals(result, None)
- pass
-
-
- @patch('os.listdir')
- @patch('os.path.isdir')
- @patch.object(UpgradeExecutor.UpgradeExecutor, 'get_key_func')
- def test_execute_dir(self, get_key_func_method, isdir_method, listdir_method):
- pythonExecutor = MagicMock()
- puppetExecutor = MagicMock()
-
- command = {'debug': 'command'}
- isdir_method.return_value = True
- # Mocking sort() method of list
- class MyList(list):
- pass
- files = MyList(['first.py', 'second.pp', 'third.py', 'fourth.nm',
- 'fifth-failing.py', 'six.py'])
- files.sort = lambda key: None
- listdir_method.return_value = files
- # fifth-failing.py will fail
- pythonExecutor.run_file.side_effect = [
- {'exitcode' : 0,
- 'stdout' : "stdout - first.py",
- 'stderr' : "stderr - first.py"},
- {'exitcode' : 0,
- 'stdout' : "stdout - third.py",
- 'stderr' : "stderr - third.py"},
- {'exitcode' : 1,
- 'stdout' : "stdout - fifth-failing.py",
- 'stderr' : "stderr - fifth-failing.py"},
- {'exitcode' : 0,
- 'stdout' : "stdout - six.py",
- 'stderr' : "stderr - six.py"},
- ]
- puppetExecutor.run_manifest.side_effect = [
- {'exitcode' : 0,
- 'stdout' : "stdout - second.pp",
- 'stderr' : "stderr - second.pp"},
- ]
-
- executor = UpgradeExecutor.UpgradeExecutor(pythonExecutor,
- puppetExecutor, AmbariConfig.AmbariConfig().getConfig())
-
- result= executor.execute_dir(command, 'basedir', 'dir', 'tmpout', 'tmperr')
- self.assertEquals(result['exitcode'],1)
- self.assertEquals(result['stdout'],"\nstdout - first.py\nstdout - second.pp\nstdout - third.py\nUnrecognized file type, skipping: basedir/dir/fourth.nm\nstdout - fifth-failing.py")
- self.assertEquals(result['stderr'],"\nstderr - first.py\nstderr - second.pp\nstderr - third.py\nNone\nstderr - fifth-failing.py")
-
-
- @patch('os.path.isdir')
- def test_execute_dir_not_existing(self, isdir_method):
- pythonExecutor = MagicMock()
- puppetExecutor = MagicMock()
-
- command = {'debug': 'command'}
- isdir_method.return_value = False
-
- executor = UpgradeExecutor.UpgradeExecutor(pythonExecutor,
- puppetExecutor, AmbariConfig.AmbariConfig().getConfig())
-
- result= executor.execute_dir(command, 'basedir', 'not_existing_dir', 'tmpout', 'tmperr')
- self.assertEquals(result['exitcode'],0)
- self.assertEquals(result['stdout'],'Script directory basedir/not_existing_dir does not exist, skipping')
- self.assertEquals(result['stderr'],'None')
-
-
- @patch('os.listdir')
- @patch('os.path.isdir')
- def test_execute_dir_ignore_badly_named(self, isdir_method, listdir_method):
- pythonExecutor = MagicMock()
- puppetExecutor = MagicMock()
-
- command = {'debug': 'command'}
- isdir_method.return_value = True
- files = ['00-first.py', 'badly-named.pp', '10-second.pp', '20-wrong.cpp']
- listdir_method.return_value = files
- # fifth-failing.py will fail
- pythonExecutor.run_file.side_effect = [
- {'exitcode' : 0,
- 'stdout' : "stdout - python.py",
- 'stderr' : "stderr - python.py"},
- ]
- puppetExecutor.run_manifest.side_effect = [
- {'exitcode' : 0,
- 'stdout' : "stdout - puppet.pp",
- 'stderr' : "stderr - puppet.pp"},
- ]
-
- executor = UpgradeExecutor.UpgradeExecutor(pythonExecutor,
- puppetExecutor, AmbariConfig.AmbariConfig().getConfig())
-
- result= executor.execute_dir(command, 'basedir', 'dir', 'tmpout', 'tmperr')
- self.assertEquals(result['exitcode'],0)
- self.assertEquals(result['stdout'],'\nstdout - python.py\nstdout - puppet.pp\nUnrecognized file type, skipping: basedir/dir/20-wrong.cpp')
- self.assertEquals(result['stderr'],'\nstderr - python.py\nstderr - puppet.pp\nNone')
-
-if __name__ == "__main__":
- unittest.main(verbosity=2)
\ No newline at end of file