You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ha...@apache.org on 2018/07/26 18:02:23 UTC

[ambari] branch trunk updated: [AMBARI-23444] Allow Custom Hooks on a Per-Role Basis (dgrinenko) (#1860)

This is an automated email from the ASF dual-hosted git repository.

hapylestat pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/ambari.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f12f5e2  [AMBARI-23444] Allow Custom Hooks on a Per-Role Basis (dgrinenko) (#1860)
f12f5e2 is described below

commit f12f5e2135ed65aaa0fb009aff84cd4bdb80d81f
Author: Dmytro Grinenko <ha...@gmail.com>
AuthorDate: Thu Jul 26 21:02:21 2018 +0300

    [AMBARI-23444] Allow Custom Hooks on a Per-Role Basis (dgrinenko) (#1860)
    
    [AMBARI-23444] Allow Custom Hooks on a Per-Role Basis (dgrinenko)
---
 .../src/main/python/ambari_agent/ActionQueue.py    | 305 ++++++++-------------
 .../src/main/python/ambari_agent/AmbariConfig.py   |   4 +-
 .../BackgroundCommandExecutionHandle.py            |  11 +-
 .../ambari_agent/CommandHooksOrchestrator.py       | 168 ++++++++++++
 .../main/python/ambari_agent/CommandStatusDict.py  |  25 +-
 .../python/ambari_agent/ComponentStatusExecutor.py |  36 +--
 .../ambari_agent/CustomServiceOrchestrator.py      | 281 +++++++++----------
 .../src/main/python/ambari_agent/FileCache.py      |  75 +++--
 .../main/python/ambari_agent/InitializerModule.py  |   3 +
 .../src/main/python/ambari_agent/LiveStatus.py     |  13 +-
 .../src/main/python/ambari_agent/PythonExecutor.py | 207 ++++++--------
 .../ambari_agent/PythonReflectiveExecutor.py       |  12 +-
 .../main/python/ambari_agent/RecoveryManager.py    |  25 +-
 .../main/python/ambari_agent/models/__init__.py    |  12 -
 .../main/python/ambari_agent/models/commands.py    |  36 ++-
 .../src/main/python/ambari_agent/models/hooks.py   |  13 +-
 .../test/python/ambari_agent/TestActionQueue.py    |  11 +-
 .../ambari_agent/TestCommandHooksOrchestrator.py   |  89 ++++++
 .../ambari_agent/TestCustomServiceOrchestrator.py  |   8 +-
 .../src/test/python/ambari_agent/TestFileCache.py  |  28 +-
 .../test/python/ambari_agent/TestPythonExecutor.py |  10 +-
 .../resource_management/libraries/script/hook.py   |  17 +-
 .../custom_actions/scripts/ru_execute_tasks.py     |  13 +-
 .../stack-hooks/after-INSTALL/scripts/hook.py      |   2 +
 .../stack-hooks/before-ANY/scripts/hook.py         |   7 +-
 .../stack-hooks/before-INSTALL/scripts/hook.py     |   8 +-
 .../stack-hooks/before-RESTART/scripts/hook.py     |   3 +-
 .../stack-hooks/before-SET_KEYTAB/scripts/hook.py  |   3 +-
 .../stack-hooks/before-START/scripts/hook.py       |   8 +-
 29 files changed, 784 insertions(+), 649 deletions(-)

diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
index e1830f4..393e651 100644
--- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
+++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
@@ -1,6 +1,4 @@
-#!/usr/bin/env python
-
-'''
+"""
 Licensed to the Apache Software Foundation (ASF) under one
 or more contributor license agreements.  See the NOTICE file
 distributed with this work for additional information
@@ -16,11 +14,11 @@ distributed under the License is distributed on an "AS IS" BASIS,
 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
-'''
+"""
+
 import Queue
 
 import logging
-import traceback
 import threading
 import pprint
 import os
@@ -29,11 +27,9 @@ import time
 import signal
 
 from AgentException import AgentException
-from LiveStatus import LiveStatus
-from ActualConfigHandler import ActualConfigHandler
 from ambari_agent.BackgroundCommandExecutionHandle import BackgroundCommandExecutionHandle
+from ambari_agent.models.commands import AgentCommand, CommandStatus
 from ambari_commons.str_utils import split_on_chunks
-from resource_management.libraries.script import Script
 
 
 logger = logging.getLogger()
@@ -41,6 +37,7 @@ installScriptHash = -1
 
 MAX_SYMBOLS_PER_LOG_MESSAGE = 7900
 
+
 class ActionQueue(threading.Thread):
   """ Action Queue for the agent. We pick one command at a time from the queue
   and execute it
@@ -50,27 +47,9 @@ class ActionQueue(threading.Thread):
   # How many actions can be performed in parallel. Feel free to change
   MAX_CONCURRENT_ACTIONS = 5
 
-
-  #How much time(in seconds) we need wait for new incoming execution command before checking
-  #status command queue
+  # How much time (in seconds) we need to wait for a new incoming execution command before checking the status command queue
   EXECUTION_COMMAND_WAIT_TIME = 2
 
-  STATUS_COMMAND = 'STATUS_COMMAND'
-  EXECUTION_COMMAND = 'EXECUTION_COMMAND'
-  AUTO_EXECUTION_COMMAND = 'AUTO_EXECUTION_COMMAND'
-  BACKGROUND_EXECUTION_COMMAND = 'BACKGROUND_EXECUTION_COMMAND'
-  ROLE_COMMAND_INSTALL = 'INSTALL'
-  ROLE_COMMAND_START = 'START'
-  ROLE_COMMAND_STOP = 'STOP'
-  ROLE_COMMAND_CUSTOM_COMMAND = 'CUSTOM_COMMAND'
-  CUSTOM_COMMAND_RESTART = 'RESTART'
-  CUSTOM_COMMAND_RECONFIGURE = 'RECONFIGURE'
-  CUSTOM_COMMAND_START = ROLE_COMMAND_START
-
-  IN_PROGRESS_STATUS = 'IN_PROGRESS'
-  COMPLETED_STATUS = 'COMPLETED'
-  FAILED_STATUS = 'FAILED'
-
   def __init__(self, initializer_module):
     super(ActionQueue, self).__init__()
     self.commandQueue = Queue.Queue()
@@ -92,17 +71,15 @@ class ActionQueue(threading.Thread):
 
   def put(self, commands):
     for command in commands:
-      if not command.has_key('serviceName'):
-        command['serviceName'] = "null"
-      if not command.has_key('clusterId'):
-        command['clusterId'] = "null"
-
-      logger.info("Adding " + command['commandType'] + " for role " + \
-                  command['role'] + " for service " + \
-                  command['serviceName'] + " of cluster_id " + \
-                  command['clusterId'] + " to the queue.")
-      if command['commandType'] == self.BACKGROUND_EXECUTION_COMMAND :
-        self.backgroundCommandQueue.put(self.createCommandHandle(command))
+      if "serviceName" not in command:
+        command["serviceName"] = "null"
+      if "clusterId" not in command:
+        command["clusterId"] = "null"
+
+      logger.info("Adding {commandType} for role {role} for service {serviceName} of cluster_id {clusterId} to the queue".format(**command))
+
+      if command['commandType'] == AgentCommand.background_execution:
+        self.backgroundCommandQueue.put(self.create_command_handle(command))
       else:
         self.commandQueue.put(command)
 
@@ -128,10 +105,9 @@ class ActionQueue(threading.Thread):
         if queued_command['taskId'] != task_id:
           self.commandQueue.put(queued_command)
         else:
-          logger.info("Canceling " + queued_command['commandType'] + \
-                      " for service " + queued_command['serviceName'] + \
-                      " and role " +  queued_command['role'] + \
-                      " with taskId " + str(queued_command['taskId']))
+          logger.info("Canceling {commandType} for service {serviceName} and role {role} with taskId {taskId}".format(
+            **queued_command
+          ))
 
       # Kill if in progress
       self.customServiceOrchestrator.cancel_command(task_id, reason)
@@ -141,8 +117,8 @@ class ActionQueue(threading.Thread):
   def run(self):
     while not self.stop_event.is_set():
       try:
-        self.processBackgroundQueueSafeEmpty()
-        self.fillRecoveryCommands()
+        self.process_background_queue_safe_empty()
+        self.fill_recovery_commands()
         try:
           if self.parallel_execution == 0:
             command = self.commandQueue.get(True, self.EXECUTION_COMMAND_WAIT_TIME)
@@ -181,31 +157,32 @@ class ActionQueue(threading.Thread):
         logger.exception("ActionQueue thread failed with exception. Re-running it")
     logger.info("ActionQueue thread has successfully finished")
 
-  def fillRecoveryCommands(self):
+  def fill_recovery_commands(self):
     if self.recovery_manager.enabled() and not self.tasks_in_progress_or_pending():
       self.put(self.recovery_manager.get_recovery_commands())
 
-  def processBackgroundQueueSafeEmpty(self):
+  def process_background_queue_safe_empty(self):
     while not self.backgroundCommandQueue.empty():
       try:
         command = self.backgroundCommandQueue.get(False)
-        if command.has_key('__handle') and command['__handle'].status == None:
+        if "__handle" in command and command["__handle"].status is None:
           self.process_command(command)
       except Queue.Empty:
         pass
 
-  def createCommandHandle(self, command):
-    if command.has_key('__handle'):
+  def create_command_handle(self, command):
+    if "__handle" in command:
       raise AgentException("Command already has __handle")
+
     command['__handle'] = BackgroundCommandExecutionHandle(command, command['commandId'], None, self.on_background_command_complete_callback)
     return command
 
   def process_command(self, command):
     # make sure we log failures
-    commandType = command['commandType']
-    logger.debug("Took an element of Queue (command type = %s).", commandType)
+    command_type = command['commandType']
+    logger.debug("Took an element of Queue (command type = %s).", command_type)
     try:
-      if commandType in [self.EXECUTION_COMMAND, self.BACKGROUND_EXECUTION_COMMAND, self.AUTO_EXECUTION_COMMAND]:
+      if command_type in AgentCommand.AUTO_EXECUTION_COMMAND_GROUP:
         try:
           if self.recovery_manager.enabled():
             self.recovery_manager.on_execution_command_start()
@@ -218,23 +195,30 @@ class ActionQueue(threading.Thread):
       else:
         logger.error("Unrecognized command %s", pprint.pformat(command))
     except Exception:
-      logger.exception("Exception while processing {0} command".format(commandType))
+      logger.exception("Exception while processing {0} command".format(command_type))
 
   def tasks_in_progress_or_pending(self):
     return not self.commandQueue.empty() or self.recovery_manager.has_active_command()
 
   def execute_command(self, command):
-    '''
+    """
     Executes commands of type EXECUTION_COMMAND
-    '''
-    clusterId = command['clusterId']
-    commandId = command['commandId']
-    isCommandBackground = command['commandType'] == self.BACKGROUND_EXECUTION_COMMAND
-    isAutoExecuteCommand = command['commandType'] == self.AUTO_EXECUTION_COMMAND
+    """
+    cluster_id = command['clusterId']
+    command_id = command['commandId']
+    command_type = command['commandType']
+
+    num_attempts = 0
+    retry_duration = 0  # even with 0 allow one attempt
+    retry_able = False
+    delay = 1
+    log_command_output = True
+    command_canceled = False
+    command_result = {}
+
     message = "Executing command with id = {commandId}, taskId = {taskId} for role = {role} of " \
-              "cluster_id {cluster}.".format(
-              commandId = str(commandId), taskId = str(command['taskId']),
-              role=command['role'], cluster=clusterId)
+              "cluster_id {cluster}.".format(commandId=str(command_id), taskId=str(command['taskId']),
+              role=command['role'], cluster=cluster_id)
     logger.info(message)
 
     taskId = command['taskId']
@@ -242,47 +226,41 @@ class ActionQueue(threading.Thread):
     in_progress_status = self.commandStatuses.generate_report_template(command)
     # The path of the files that contain the output log and error log use a prefix that the agent advertises to the
     # server. The prefix is defined in agent-config.ini
-    if not isAutoExecuteCommand:
+    if command_type != AgentCommand.auto_execution:
       in_progress_status.update({
         'tmpout': self.tmpdir + os.sep + 'output-' + str(taskId) + '.txt',
         'tmperr': self.tmpdir + os.sep + 'errors-' + str(taskId) + '.txt',
-        'structuredOut' : self.tmpdir + os.sep + 'structured-out-' + str(taskId) + '.json',
-        'status': self.IN_PROGRESS_STATUS
+        'structuredOut': self.tmpdir + os.sep + 'structured-out-' + str(taskId) + '.json',
+        'status': CommandStatus.in_progress
       })
     else:
       in_progress_status.update({
         'tmpout': self.tmpdir + os.sep + 'auto_output-' + str(taskId) + '.txt',
         'tmperr': self.tmpdir + os.sep + 'auto_errors-' + str(taskId) + '.txt',
-        'structuredOut' : self.tmpdir + os.sep + 'auto_structured-out-' + str(taskId) + '.json',
-        'status': self.IN_PROGRESS_STATUS
+        'structuredOut': self.tmpdir + os.sep + 'auto_structured-out-' + str(taskId) + '.json',
+        'status': CommandStatus.in_progress
       })
 
     self.commandStatuses.put_command_status(command, in_progress_status)
 
-    numAttempts = 0
-    retryDuration = 0  # even with 0 allow one attempt
-    retryAble = False
-    delay = 1
-    log_command_output = True
-    if 'commandParams' in command and 'log_output' in command['commandParams'] and "false" == command['commandParams']['log_output']:
-      log_command_output = False
-
     if 'commandParams' in command:
       if 'max_duration_for_retries' in command['commandParams']:
-        retryDuration = int(command['commandParams']['max_duration_for_retries'])
-      if 'command_retry_enabled' in command['commandParams']:
-        retryAble = command['commandParams']['command_retry_enabled'] == "true"
-    if isAutoExecuteCommand:
-      retryAble = False
-
-    logger.info("Command execution metadata - taskId = {taskId}, retry enabled = {retryAble}, max retry duration (sec) = {retryDuration}, log_output = {log_command_output}".
-                 format(taskId=taskId, retryAble=retryAble, retryDuration=retryDuration, log_command_output=log_command_output))
-    command_canceled = False
+        retry_duration = int(command['commandParams']['max_duration_for_retries'])
+      if 'command_retry_enabled' in command['commandParams'] and command_type != AgentCommand.auto_execution:
+        #  for AgentCommand.auto_execution command retry_able should be always false
+        retry_able = command['commandParams']['command_retry_enabled'] == "true"
+      if 'log_output' in command['commandParams']:
+        log_command_output = command['commandParams']['log_output'] != "false"
+
+    logger.info("Command execution metadata - taskId = {taskId}, retry enabled = {retryAble}, max retry duration (sec)"
+                " = {retryDuration}, log_output = {log_command_output}".format(
+      taskId=taskId, retryAble=retry_able, retryDuration=retry_duration, log_command_output=log_command_output))
 
     self.cancelEvent.clear()
-    self.taskIdsToCancel.discard(taskId) # for case of command reschedule (e.g. command and cancel for the same taskId are send at the same time)
+    # for the case of a command reschedule (e.g. command and cancel for the same taskId are sent at the same time)
+    self.taskIdsToCancel.discard(taskId)
 
-    while retryDuration >= 0:
+    while retry_duration >= 0:
       if taskId in self.taskIdsToCancel:
         logger.info('Command with taskId = {0} canceled'.format(taskId))
         command_canceled = True
@@ -290,43 +268,43 @@ class ActionQueue(threading.Thread):
         self.taskIdsToCancel.discard(taskId)
         break
 
-      numAttempts += 1
+      num_attempts += 1
       start = 0
-      if retryAble:
+      if retry_able:
         start = int(time.time())
       # running command
-      commandresult = self.customServiceOrchestrator.runCommand(command,
-                                                                in_progress_status['tmpout'],
-                                                                in_progress_status['tmperr'],
-                                                                override_output_files=numAttempts == 1,
-                                                                retry=numAttempts > 1)
+      command_result = self.customServiceOrchestrator.runCommand(command,
+                                                                 in_progress_status['tmpout'],
+                                                                 in_progress_status['tmperr'],
+                                                                 override_output_files=num_attempts == 1,
+                                                                 retry=num_attempts > 1)
       end = 1
-      if retryAble:
+      if retry_able:
         end = int(time.time())
-      retryDuration -= (end - start)
+      retry_duration -= (end - start)
 
       # dumping results
-      if isCommandBackground:
+      if command_type == AgentCommand.background_execution:
         logger.info("Command is background command, quit retrying. Exit code: {exitCode}, retryAble: {retryAble}, retryDuration (sec): {retryDuration}, last delay (sec): {delay}"
-                    .format(cid=taskId, exitCode=commandresult['exitcode'], retryAble=retryAble, retryDuration=retryDuration, delay=delay))
+                    .format(cid=taskId, exitCode=command_result['exitcode'], retryAble=retry_able, retryDuration=retry_duration, delay=delay))
         return
       else:
-        if commandresult['exitcode'] == 0:
-          status = self.COMPLETED_STATUS
+        if command_result['exitcode'] == 0:
+          status = CommandStatus.completed
         else:
-          status = self.FAILED_STATUS
-          if (commandresult['exitcode'] == -signal.SIGTERM) or (commandresult['exitcode'] == -signal.SIGKILL):
+          status = CommandStatus.failed
+          if (command_result['exitcode'] == -signal.SIGTERM) or (command_result['exitcode'] == -signal.SIGKILL):
             logger.info('Command with taskId = {cid} was canceled!'.format(cid=taskId))
             command_canceled = True
             self.taskIdsToCancel.discard(taskId)
             break
 
-      if status != self.COMPLETED_STATUS and retryAble and retryDuration > 0:
+      if status != CommandStatus.completed and retry_able and retry_duration > 0:
         delay = self.get_retry_delay(delay)
-        if delay > retryDuration:
-          delay = retryDuration
-        retryDuration -= delay  # allow one last attempt
-        commandresult['stderr'] += "\n\nCommand failed. Retrying command execution ...\n\n"
+        if delay > retry_duration:
+          delay = retry_duration
+        retry_duration -= delay  # allow one last attempt
+        command_result['stderr'] += "\n\nCommand failed. Retrying command execution ...\n\n"
         logger.info("Retrying command with taskId = {cid} after a wait of {delay}".format(cid=taskId, delay=delay))
         if 'agentLevelParams' not in command:
           command['agentLevelParams'] = {}
@@ -337,108 +315,67 @@ class ActionQueue(threading.Thread):
         continue
       else:
         logger.info("Quit retrying for command with taskId = {cid}. Status: {status}, retryAble: {retryAble}, retryDuration (sec): {retryDuration}, last delay (sec): {delay}"
-                    .format(cid=taskId, status=status, retryAble=retryAble, retryDuration=retryDuration, delay=delay))
+                    .format(cid=taskId, status=status, retryAble=retry_able, retryDuration=retry_duration, delay=delay))
         break
 
     self.taskIdsToCancel.discard(taskId)
 
     # do not fail task which was rescheduled from server
     if command_canceled:
-      with self.lock:
-        with self.commandQueue.mutex:
+      with self.lock, self.commandQueue.mutex:
           for com in self.commandQueue.queue:
             if com['taskId'] == command['taskId']:
-              logger.info('Command with taskId = {cid} was rescheduled by server. '
-                          'Fail report on cancelled command won\'t be sent with heartbeat.'.format(cid=taskId))
+              logger.info("Command with taskId = {cid} was rescheduled by server. "
+                          "Fail report on cancelled command won't be sent with heartbeat.".format(cid=taskId))
               return
 
     # final result to stdout
-    commandresult['stdout'] += '\n\nCommand completed successfully!\n' if status == self.COMPLETED_STATUS else '\n\nCommand failed after ' + str(numAttempts) + ' tries\n'
-    logger.info('Command with taskId = {cid} completed successfully!'.format(cid=taskId) if status == self.COMPLETED_STATUS else 'Command with taskId = {cid} failed after {attempts} tries'.format(cid=taskId, attempts=numAttempts))
-
-    roleResult = self.commandStatuses.generate_report_template(command)
-    roleResult.update({
-      'stdout': commandresult['stdout'],
-      'stderr': commandresult['stderr'],
-      'exitCode': commandresult['exitcode'],
+    command_result['stdout'] += '\n\nCommand completed successfully!\n' if status == CommandStatus.completed else '\n\nCommand failed after ' + str(num_attempts) + ' tries\n'
+    logger.info('Command with taskId = {cid} completed successfully!'.format(cid=taskId) if status == CommandStatus.completed else 'Command with taskId = {cid} failed after {attempts} tries'.format(cid=taskId, attempts=num_attempts))
+
+    role_result = self.commandStatuses.generate_report_template(command)
+    role_result.update({
+      'stdout': command_result['stdout'],
+      'stderr': command_result['stderr'],
+      'exitCode': command_result['exitcode'],
       'status': status,
     })
 
-    if self.config.has_option("logging","log_command_executes") \
+    if self.config.has_option("logging", "log_command_executes") \
         and int(self.config.get("logging", "log_command_executes")) == 1 \
         and log_command_output:
 
-      if roleResult['stdout'] != '':
+      if role_result['stdout'] != '':
           logger.info("Begin command output log for command with id = " + str(command['taskId']) + ", role = "
                       + command['role'] + ", roleCommand = " + command['roleCommand'])
-          self.log_command_output(roleResult['stdout'], str(command['taskId']))
+          self.log_command_output(role_result['stdout'], str(command['taskId']))
           logger.info("End command output log for command with id = " + str(command['taskId']) + ", role = "
                       + command['role'] + ", roleCommand = " + command['roleCommand'])
 
-      if roleResult['stderr'] != '':
+      if role_result['stderr'] != '':
           logger.info("Begin command stderr log for command with id = " + str(command['taskId']) + ", role = "
                       + command['role'] + ", roleCommand = " + command['roleCommand'])
-          self.log_command_output(roleResult['stderr'], str(command['taskId']))
+          self.log_command_output(role_result['stderr'], str(command['taskId']))
           logger.info("End command stderr log for command with id = " + str(command['taskId']) + ", role = "
                       + command['role'] + ", roleCommand = " + command['roleCommand'])
 
-    if roleResult['stdout'] == '':
-      roleResult['stdout'] = 'None'
-    if roleResult['stderr'] == '':
-      roleResult['stderr'] = 'None'
+    if role_result['stdout'] == '':
+      role_result['stdout'] = 'None'
+    if role_result['stderr'] == '':
+      role_result['stderr'] = 'None'
 
     # let ambari know name of custom command
 
     if 'commandParams' in command and command['commandParams'].has_key('custom_command'):
-      roleResult['customCommand'] = command['commandParams']['custom_command']
+      role_result['customCommand'] = command['commandParams']['custom_command']
 
-    if 'structuredOut' in commandresult:
-      roleResult['structuredOut'] = str(json.dumps(commandresult['structuredOut']))
+    if 'structuredOut' in command_result:
+      role_result['structuredOut'] = str(json.dumps(command_result['structuredOut']))
     else:
-      roleResult['structuredOut'] = ''
-
-    # let recovery manager know the current state
-    if status == self.COMPLETED_STATUS:
-      # let ambari know that configuration tags were applied
-      configHandler = ActualConfigHandler(self.config, self.configTags)
-      """
-      #update
-      if 'commandParams' in command:
-        command_params = command['commandParams']
-        if command_params and command_params.has_key('forceRefreshConfigTags') and len(command_params['forceRefreshConfigTags']) > 0  :
-          forceRefreshConfigTags = command_params['forceRefreshConfigTags'].split(',')
-          logger.info("Got refresh additional component tags command")
-
-          for configTag in forceRefreshConfigTags :
-            configHandler.update_component_tag(command['role'], configTag, command['configurationTags'][configTag])
-
-          roleResult['customCommand'] = self.CUSTOM_COMMAND_RESTART # force restart for component to evict stale_config on server side
-          command['configurationTags'] = configHandler.read_actual_component(command['role'])
-
-      if command.has_key('configurationTags'):
-        configHandler.write_actual(command['configurationTags'])
-        roleResult['configurationTags'] = command['configurationTags']
-      component = {'serviceName':command['serviceName'],'componentName':command['role']}
-      if 'roleCommand' in command and \
-          (command['roleCommand'] == self.ROLE_COMMAND_START or
-             (command['roleCommand'] == self.ROLE_COMMAND_INSTALL and component in LiveStatus.CLIENT_COMPONENTS) or
-               (command['roleCommand'] == self.ROLE_COMMAND_CUSTOM_COMMAND and
-                  'custom_command' in command['hostLevelParams'] and
-                      command['hostLevelParams']['custom_command'] in (self.CUSTOM_COMMAND_RESTART,
-                                                                       self.CUSTOM_COMMAND_START,
-                                                                       self.CUSTOM_COMMAND_RECONFIGURE))):
-        configHandler.write_actual_component(command['role'],
-                                             command['configurationTags'])
-        if 'clientsToUpdateConfigs' in command['hostLevelParams'] and command['hostLevelParams']['clientsToUpdateConfigs']:
-          configHandler.write_client_components(command['serviceName'],
-                                                command['configurationTags'],
-                                                command['hostLevelParams']['clientsToUpdateConfigs'])
-        roleResult['configurationTags'] = configHandler.read_actual_component(
-            command['role'])
-    """
+      role_result['structuredOut'] = ''
 
     self.recovery_manager.process_execution_command_result(command, status)
-    self.commandStatuses.put_command_status(command, roleResult)
+    self.commandStatuses.put_command_status(command, role_result)
 
     cluster_id = str(command['clusterId'])
 
@@ -446,7 +383,7 @@ class ActionQueue(threading.Thread):
       service_name = command['serviceName']
       if service_name != 'null':
         component_name = command['role']
-        self.component_status_executor.check_component_status(clusterId, service_name, component_name, "STATUS", report=True)
+        self.component_status_executor.check_component_status(cluster_id, service_name, component_name, "STATUS", report=True)
 
   def log_command_output(self, text, taskId):
     """
@@ -469,25 +406,21 @@ class ActionQueue(threading.Thread):
     """
     return last_delay * 2
 
-  def command_was_canceled(self):
-    self.customServiceOrchestrator
-
   def on_background_command_complete_callback(self, process_condensed_result, handle):
     logger.debug('Start callback: %s', process_condensed_result)
     logger.debug('The handle is: %s', handle)
-    status = self.COMPLETED_STATUS if handle.exitCode == 0 else self.FAILED_STATUS
+    status = CommandStatus.completed if handle.exitCode == 0 else CommandStatus.failed
 
     aborted_postfix = self.customServiceOrchestrator.command_canceled_reason(handle.command['taskId'])
     if aborted_postfix:
-      status = self.FAILED_STATUS
+      status = CommandStatus.failed
       logger.debug('Set status to: %s , reason = %s', status, aborted_postfix)
     else:
       aborted_postfix = ''
 
+    role_result = self.commandStatuses.generate_report_template(handle.command)
 
-    roleResult = self.commandStatuses.generate_report_template(handle.command)
-
-    roleResult.update({
+    role_result.update({
       'stdout': process_condensed_result['stdout'] + aborted_postfix,
       'stderr': process_condensed_result['stderr'] + aborted_postfix,
       'exitCode': process_condensed_result['exitcode'],
@@ -495,10 +428,8 @@ class ActionQueue(threading.Thread):
       'status': status,
     })
 
-    self.commandStatuses.put_command_status(handle.command, roleResult)
+    self.commandStatuses.put_command_status(handle.command, role_result)
 
-  # Removes all commands from the queue
   def reset(self):
-    queue = self.commandQueue
-    with queue.mutex:
-      queue.queue.clear()
+    with self.commandQueue.mutex:
+      self.commandQueue.queue.clear()
diff --git a/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py b/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py
index 9a7a41a..0d80dd0 100644
--- a/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py
+++ b/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py
@@ -110,8 +110,8 @@ class AmbariConfig:
   def get(self, section, value, default=None):
     try:
       return str(self.config.get(section, value)).strip()
-    except ConfigParser.Error, err:
-      if default != None:
+    except ConfigParser.Error as err:
+      if default is not None:
         return default
       raise err
 
diff --git a/ambari-agent/src/main/python/ambari_agent/BackgroundCommandExecutionHandle.py b/ambari-agent/src/main/python/ambari_agent/BackgroundCommandExecutionHandle.py
index 17b7ce5..6d55d57 100644
--- a/ambari-agent/src/main/python/ambari_agent/BackgroundCommandExecutionHandle.py
+++ b/ambari-agent/src/main/python/ambari_agent/BackgroundCommandExecutionHandle.py
@@ -1,6 +1,4 @@
-#!/usr/bin/env python
-
-'''
+"""
 Licensed to the Apache Software Foundation (ASF) under one
 or more contributor license agreements.  See the NOTICE file
 distributed with this work for additional information
@@ -16,12 +14,13 @@ distributed under the License is distributed on an "AS IS" BASIS,
 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
-'''
+"""
 import logging
 
 logger = logging.getLogger()
 installScriptHash = -1
 
+
 class BackgroundCommandExecutionHandle:
   
   SCHEDULED_STATUS = 'SCHEDULED'
@@ -38,7 +37,5 @@ class BackgroundCommandExecutionHandle:
     self.on_background_command_started = on_background_command_started
     self.on_background_command_complete_callback = on_background_command_complete_callback
 
-
-
   def __str__(self):
-    return "[BackgroundHandle: pid='{0}', status='{1}', exitCode='{2}', commandId='{3}']".format(self.pid, self.status, self.exitCode, self.commandId)
\ No newline at end of file
+    return "[BackgroundHandle: pid='{0}', status='{1}', exitCode='{2}', commandId='{3}']".format(self.pid, self.status, self.exitCode, self.commandId)
diff --git a/ambari-agent/src/main/python/ambari_agent/CommandHooksOrchestrator.py b/ambari-agent/src/main/python/ambari_agent/CommandHooksOrchestrator.py
new file mode 100644
index 0000000..9113d2b
--- /dev/null
+++ b/ambari-agent/src/main/python/ambari_agent/CommandHooksOrchestrator.py
@@ -0,0 +1,168 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+
+import os
+import logging
+
+from models.commands import AgentCommand
+from models.hooks import HookPrefix
+
+__all__ = ["ResolvedHooks", "HooksOrchestrator"]
+
+
+class ResolvedHooks(object):
+  """
+  Holder of the pre- and post-hooks sequences resolved for a command
+  """
+
+  def __init__(self, pre_hooks=(), post_hooks=()):
+    """
+    Creates response instance with generated hooks sequence
+
+    :arg pre_hooks hook sequence, typically generator is passed; the default is immutable, never a shared set
+    :arg post_hooks hook sequence, typically generator is passed; the default is immutable, never a shared set
+
+    :type pre_hooks Collections.Iterable|types.GeneratorType
+    :type post_hooks Collections.Iterable|types.GeneratorType
+    """
+    self._pre_hooks = pre_hooks
+    self._post_hooks = post_hooks
+
+  @property
+  def pre_hooks(self):
+    """
+    :rtype list (a set passed to the constructor is returned as is)
+    """
+    # Converting generator (or any other iterable) to real sequence once, on first user request
+    if not isinstance(self._pre_hooks, (list, set)):
+      self._pre_hooks = list(self._pre_hooks)
+
+    return self._pre_hooks
+
+  @property
+  def post_hooks(self):
+    """
+    :rtype list (a set passed to the constructor is returned as is)
+    """
+    # Converting generator (or any other iterable) to real sequence once, on first user request
+    if not isinstance(self._post_hooks, (list, set)):
+      self._post_hooks = list(self._post_hooks)
+
+    return self._post_hooks
+
+
+class HookSequenceBuilder(object):
+  """
+  Builds the ordered sequence of hook names for a command according to the definitions below
+  """
+
+  # ToDo: move hooks sequence definition to configuration or text file definition?
+  _hooks_sequences = {
+    HookPrefix.pre: [
+      "{prefix}-{command}",
+      "{prefix}-{command}-{service}",
+      "{prefix}-{command}-{service}-{role}"
+    ],
+    HookPrefix.post: [
+      "{prefix}-{command}-{service}-{role}",
+      "{prefix}-{command}-{service}",
+      "{prefix}-{command}"
+    ]
+  }
+
+  def build(self, prefix, command, service, role):
+    """
+    Lazily yields hook names for the prefix; TypeError for an unknown prefix is raised on first iteration
+
+    :type prefix str
+    :type command str
+    :type service str
+    :type role str
+    :rtype types.GeneratorType
+    """
+    if prefix not in self._hooks_sequences:
+      raise TypeError("Unable to locate hooks sequence definition for '{0}' prefix".format(prefix))
+
+    for hook_definition in self._hooks_sequences[prefix]:
+      if "service" in hook_definition and service is None:
+        continue
+      # membership test: definitions with a {role} placeholder are skipped when no role is given
+      if "role" in hook_definition and role is None:
+        continue
+
+      yield hook_definition.format(prefix=prefix, command=command, service=service, role=role)
+
+
+class HooksOrchestrator(object):
+  """
+  Resolves the hook scripts of a command according to HookSequenceBuilder definitions
+  """
+
+  def __init__(self, injector):
+    """
+    :type injector InitializerModule
+    """
+    self._hook_builder = HookSequenceBuilder()
+    self._logger = logging.getLogger()
+    self._file_cache = injector.file_cache
+
+  def resolve_hooks(self, command, command_name):
+    """
+    Resolves the hooks sequences which should be prepended and appended to the script execution chain
+
+    :type command dict
+    :type command_name str
+    :rtype ResolvedHooks|None
+    """
+    # status commands and commands without a name get no hooks holder at all
+    if command["commandType"] == AgentCommand.status or not command_name:
+      return None
+
+    hook_dir = self._file_cache.get_hook_base_dir(command)
+
+    if not hook_dir:
+      return ResolvedHooks()
+
+    # both keys are optional in the command
+    service = command.get("serviceName")
+    component = command.get("role")
+
+    # generators only: nothing touches the file system until the resolved hooks are iterated
+    def hooks_for(prefix):
+      sequence = self._hook_builder.build(prefix, command_name, service, component)
+      return self._resolve_hooks_path(hook_dir, sequence)
+
+    return ResolvedHooks(hooks_for(HookPrefix.pre), hooks_for(HookPrefix.post))
+
+  def _resolve_hooks_path(self, stack_hooks_dir, hooks_sequence):
+    """
+    Yields a tuple(path to hook script, hook base dir) for each hook of hooks_sequence whose script exists
+
+    :type stack_hooks_dir str
+    :type hooks_sequence collections.Iterable|types.GeneratorType
+    """
+
+    for hook_name in hooks_sequence:
+      base_dir = os.path.join(stack_hooks_dir, hook_name)
+      script_path = os.path.join(base_dir, "scripts", "hook.py")
+
+      if os.path.isfile(script_path):
+        yield script_path, base_dir
+        continue
+
+      self._logger.debug("Hook script {0} not found, skipping".format(script_path))
diff --git a/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py b/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py
index 84af058..567ea2d 100644
--- a/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py
+++ b/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py
@@ -29,6 +29,7 @@ from collections import defaultdict
 from Grep import Grep
 
 from ambari_agent import Constants
+from ambari_agent.models.commands import CommandStatus, AgentCommand
 from ambari_stomp.adapter.websocket import ConnectionIsAlreadyClosed
 
 logger = logging.getLogger()
@@ -72,7 +73,7 @@ class CommandStatusDict():
       self.current_state.pop(key, None)
 
     is_sent, correlation_id = self.force_update_to_server({command['clusterId']: [report]})
-    updatable = report['status'] == ActionQueue.IN_PROGRESS_STATUS and self.command_update_output
+    updatable = report['status'] == CommandStatus.in_progress and self.command_update_output
 
     if not is_sent or updatable:
       self.queue_report_sending(key, command, report)
@@ -136,25 +137,25 @@ class CommandStatusDict():
     generation
     """
     self.generated_reports = []
-    from ActionQueue import ActionQueue
-    with self.lock: # Synchronized
-      resultReports = defaultdict(lambda:[])
+
+    with self.lock:
+      result_reports = defaultdict(lambda:[])
       for key, item in self.current_state.items():
         command = item[0]
         report = item[1]
         cluster_id = report['clusterId']
-        if command ['commandType'] in [ActionQueue.EXECUTION_COMMAND, ActionQueue.BACKGROUND_EXECUTION_COMMAND]:
-          if (report['status']) != ActionQueue.IN_PROGRESS_STATUS:
-            resultReports[cluster_id].append(report)
+        if command['commandType'] in AgentCommand.EXECUTION_COMMAND_GROUP:
+          if (report['status']) != CommandStatus.in_progress:
+            result_reports[cluster_id].append(report)
             self.reported_reports.add(key)
           else:
             in_progress_report = self.generate_in_progress_report(command, report)
-            resultReports[cluster_id].append(in_progress_report)
-        elif command ['commandType'] in [ActionQueue.AUTO_EXECUTION_COMMAND]:
+            result_reports[cluster_id].append(in_progress_report)
+        elif command['commandType'] == AgentCommand.auto_execution:
           logger.debug("AUTO_EXECUTION_COMMAND task deleted %s", command['commandId'])
           self.reported_reports.add(key)
           pass
-      return resultReports
+      return result_reports
 
   def clear_reported_reports(self, result_reports):
     with self.lock:
@@ -178,8 +179,6 @@ class CommandStatusDict():
     Reads stdout/stderr for IN_PROGRESS command from disk file
     and populates other fields of report.
     """
-    from ActionQueue import ActionQueue
-    
     files_to_read = [report['tmpout'], report['tmperr'], report['structuredOut']]
     files_content = ['...', '...', '{}']
 
@@ -200,7 +199,7 @@ class CommandStatusDict():
       'stderr': err,
       'structuredOut': tmpstructuredout,
       'exitCode': 777,
-      'status': ActionQueue.IN_PROGRESS_STATUS,
+      'status': CommandStatus.in_progress,
     })
     return inprogress
 
diff --git a/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py b/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py
index 5d20495..64e6ae7 100644
--- a/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py
+++ b/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py
@@ -1,6 +1,4 @@
-#!/usr/bin/env python
-
-'''
+"""
 Licensed to the Apache Software Foundation (ASF) under one
 or more contributor license agreements.  See the NOTICE file
 distributed with this work for additional information
@@ -16,7 +14,7 @@ distributed under the License is distributed on an "AS IS" BASIS,
 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
-'''
+"""
 
 import logging
 import threading
@@ -24,9 +22,10 @@ import threading
 from ambari_agent import Constants
 from ambari_agent.LiveStatus import LiveStatus
 from collections import defaultdict
+
+from ambari_agent.models.commands import AgentCommand
 from ambari_stomp.adapter.websocket import ConnectionIsAlreadyClosed
 
-logger = logging.getLogger(__name__)
 
 class ComponentStatusExecutor(threading.Thread):
   def __init__(self, initializer_module):
@@ -39,6 +38,7 @@ class ComponentStatusExecutor(threading.Thread):
     self.recovery_manager = initializer_module.recovery_manager
     self.reported_component_status = defaultdict(lambda:defaultdict(lambda:defaultdict(lambda:None))) # component statuses which were received by server
     self.server_responses_listener = initializer_module.server_responses_listener
+    self.logger = logging.getLogger(__name__)
     threading.Thread.__init__(self)
 
   def run(self):
@@ -46,7 +46,7 @@ class ComponentStatusExecutor(threading.Thread):
     Run an endless loop which executes all status commands every 'status_commands_run_interval' seconds.
     """
     if self.status_commands_run_interval == 0:
-      logger.warn("ComponentStatusExecutor is turned off. Some functionality might not work correctly.")
+      self.logger.warn("ComponentStatusExecutor is turned off. Some functionality might not work correctly.")
       return
 
     while not self.stop_event.is_set():
@@ -63,15 +63,15 @@ class ComponentStatusExecutor(threading.Thread):
             # multithreading: if cluster was deleted during iteration
             continue
 
-          if not 'status_commands_to_run' in metadata_cache:
+          if 'status_commands_to_run' not in metadata_cache:
             continue
 
           status_commands_to_run = metadata_cache.status_commands_to_run
 
-          if not 'components' in topology_cache:
+          if 'components' not in topology_cache:
             continue
 
-          current_host_id =  self.topology_cache.get_current_host_id(cluster_id)
+          current_host_id = self.topology_cache.get_current_host_id(cluster_id)
 
           if current_host_id is None:
             continue
@@ -84,11 +84,11 @@ class ComponentStatusExecutor(threading.Thread):
                 break
 
               # cluster was already removed
-              if not cluster_id in self.topology_cache.get_cluster_ids():
+              if cluster_id not in self.topology_cache.get_cluster_ids():
                 break
 
               # check if component is installed on current host
-              if not current_host_id in component_dict.hostIds:
+              if current_host_id not in component_dict.hostIds:
                 break
 
               service_name = component_dict.serviceName
@@ -96,7 +96,7 @@ class ComponentStatusExecutor(threading.Thread):
 
               # do not run status commands for the component which is starting/stopping or doing other action
               if self.customServiceOrchestrator.commandsRunningForComponent(cluster_id, component_name):
-                logger.info("Skipping status command for {0}. Since command for it is running".format(component_name))
+                self.logger.info("Skipping status command for {0}. Since command for it is running".format(component_name))
                 continue
 
               result = self.check_component_status(cluster_id, service_name, component_name, command_name)
@@ -108,10 +108,10 @@ class ComponentStatusExecutor(threading.Thread):
       except ConnectionIsAlreadyClosed: # server and agent disconnected during sending data. Not an issue
         pass
       except:
-        logger.exception("Exception in ComponentStatusExecutor. Re-running it")
+        self.logger.exception("Exception in ComponentStatusExecutor. Re-running it")
 
       self.stop_event.wait(self.status_commands_run_interval)
-    logger.info("ComponentStatusExecutor has successfully finished")
+    self.logger.info("ComponentStatusExecutor has successfully finished")
 
   def check_component_status(self, cluster_id, service_name, component_name, command_name, report=False):
     """
@@ -126,7 +126,7 @@ class ComponentStatusExecutor(threading.Thread):
       'serviceName': service_name,
       'role': component_name,
       'clusterId': cluster_id,
-      'commandType': 'STATUS_COMMAND',
+      'commandType': AgentCommand.status,
     }
 
     component_status_result = self.customServiceOrchestrator.requestComponentStatus(command_dict)
@@ -135,8 +135,8 @@ class ComponentStatusExecutor(threading.Thread):
     # log if status command failed
     if status == LiveStatus.DEAD_STATUS:
       stderr = component_status_result['stderr']
-      if not "ComponentIsNotRunning" in stderr and not "ClientComponentHasNoStatus" in stderr:
-        logger.info("Status command for {0} failed:\n{1}".format(component_name, stderr))
+      if "ComponentIsNotRunning" not in stderr and "ClientComponentHasNoStatus" not in stderr:
+        self.logger.info("Status command for {0} failed:\n{1}".format(component_name, stderr))
 
     result = {
       'serviceName': service_name,
@@ -177,5 +177,5 @@ class ComponentStatusExecutor(threading.Thread):
     This needs to be done to remove information about clusters which where deleted (e.g. ambari-server reset)
     """
     for cluster_id in self.reported_component_status.keys():
-      if not cluster_id in self.topology_cache.get_cluster_ids():
+      if cluster_id not in self.topology_cache.get_cluster_ids():
         del self.reported_component_status[cluster_id]
diff --git a/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py b/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
index c66a623..98ef35d 100644
--- a/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
+++ b/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
@@ -18,28 +18,34 @@ See the License for the specific language governing permissions and
 limitations under the License.
 '''
 
-import logging
+
 import os
-import ambari_simplejson as json
+
+
 import sys
-import time
 import uuid
-from ambari_commons import shell
+import logging
 import threading
+import ambari_simplejson as json
 from collections import defaultdict
+from ConfigParser import NoOptionError
 
-from AgentException import AgentException
-from PythonExecutor import PythonExecutor
+from ambari_commons import shell, subprocess32
+from ambari_commons.constants import AGENT_TMP_DIR
 from resource_management.libraries.functions.log_process_information import log_process_information
 from resource_management.core.utils import PasswordString
-from ambari_commons import subprocess32
+
+from ambari_agent.models.commands import AgentCommand
 from ambari_agent.Utils import Utils
-from ambari_commons.constants import AGENT_TMP_DIR
+
+from AgentException import AgentException
+from PythonExecutor import PythonExecutor
 
 
 logger = logging.getLogger()
 
-class CustomServiceOrchestrator():
+
+class CustomServiceOrchestrator(object):
   """
   Executes a command for custom service. stdout and stderr are written to
   tmpoutfile and to tmperrfile respectively.
@@ -51,9 +57,6 @@ class CustomServiceOrchestrator():
   CUSTOM_ACTION_COMMAND = 'ACTIONEXECUTE'
   CUSTOM_COMMAND_COMMAND = 'CUSTOM_COMMAND'
 
-  PRE_HOOK_PREFIX="before"
-  POST_HOOK_PREFIX="after"
-
   HOSTS_LIST_KEY = "all_hosts"
   PING_PORTS_KEY = "all_ping_ports"
   RACKS_KEY = "all_racks"
@@ -82,6 +85,7 @@ class CustomServiceOrchestrator():
     self.configuration_builder = initializer_module.configuration_builder
     self.host_level_params_cache = initializer_module.host_level_params_cache
     self.config = initializer_module.config
+    self.hooks_orchestrator = initializer_module.hooks_orchestrator
     self.tmp_dir = self.config.get('agent', 'prefix')
     self.force_https_protocol = self.config.get_force_https_protocol_name()
     self.ca_cert_file_path = self.config.get_ca_cert_file_path()
@@ -304,7 +308,6 @@ class CustomServiceOrchestrator():
 
     return cmd_result
 
-
   def runCommand(self, command_header, tmpoutfile, tmperrfile, forced_command_name=None,
                  override_output_files=True, retry=False, is_status_command=False):
     """
@@ -313,10 +316,7 @@ class CustomServiceOrchestrator():
     """
     incremented_commands_for_component = False
 
-    # Make sure the return variable has been initialized
     ret = None
-
-    # Make sure the json_path variable has been initialized
     json_path = None
 
     try:
@@ -326,38 +326,31 @@ class CustomServiceOrchestrator():
       timeout = int(command['commandParams']['command_timeout'])
       cluster_id = str(command['clusterId'])
 
-      server_url_prefix = command['ambariLevelParams']['jdk_location']
-
       # Status commands have no taskId nor roleCommand
       if not is_status_command:
         task_id = command['taskId']
         command_name = command['roleCommand']
       else:
         task_id = 'status'
+        command_name = None
 
       if forced_command_name is not None:  # If not supplied as an argument
         command_name = forced_command_name
 
-      if command_name == self.CUSTOM_ACTION_COMMAND:
-        base_dir = self.file_cache.get_custom_actions_base_dir(server_url_prefix)
+      if command_name and command_name == self.CUSTOM_ACTION_COMMAND:
+        base_dir = self.file_cache.get_custom_actions_base_dir(command)
         script_tuple = (os.path.join(base_dir, 'scripts', script), base_dir)
-        hook_dir = None
       else:
         if command_name == self.CUSTOM_COMMAND_COMMAND:
           command_name = command['commandParams']['custom_command']
 
         # forces a hash challenge on the directories to keep them updated, even
         # if the return type is not used
-        self.file_cache.get_host_scripts_base_dir(server_url_prefix)
-        hook_dir = self.file_cache.get_hook_base_dir(command, server_url_prefix)
-        base_dir = self.file_cache.get_service_base_dir(command, server_url_prefix)
-        self.file_cache.get_custom_resources_subdir(command, server_url_prefix)
-
+        base_dir = self.file_cache.get_service_base_dir(command)
         script_path = self.resolve_script_path(base_dir, script)
         script_tuple = (script_path, base_dir)
 
-      tmpstrucoutfile = os.path.join(self.tmp_dir,
-                                    "structured-out-{0}.json".format(task_id))
+      tmpstrucoutfile = os.path.join(self.tmp_dir, "structured-out-{0}.json".format(task_id))
 
       # We don't support anything else yet
       if script_type.upper() != self.SCRIPT_TYPE_PYTHON:
@@ -366,30 +359,36 @@ class CustomServiceOrchestrator():
 
       # Execute command using proper interpreter
       handle = None
-      if command.has_key('__handle'):
+      if "__handle" in command:
         handle = command['__handle']
         handle.on_background_command_started = self.map_task_to_process
         del command['__handle']
 
       # If command contains credentialStoreEnabled, then
       # generate the JCEKS file for the configurations.
-      credentialStoreEnabled = False
+      credential_store_enabled = False
       if 'serviceLevelParams' in command and 'credentialStoreEnabled' in command['serviceLevelParams']:
-        credentialStoreEnabled = command['serviceLevelParams']['credentialStoreEnabled']
+        credential_store_enabled = command['serviceLevelParams']['credentialStoreEnabled']
 
-      if credentialStoreEnabled and command_name != self.COMMAND_NAME_STATUS:
+      if credential_store_enabled and command_name != self.COMMAND_NAME_STATUS:
         if 'commandBeingRetried' not in command['agentLevelParams'] or command['agentLevelParams']['commandBeingRetried'] != "true":
           self.generateJceks(command)
         else:
           logger.info("Skipping generation of jceks files as this is a retry of the command")
 
-
       json_path = self.dump_command_to_json(command, retry)
-      pre_hook_tuple = self.resolve_hook_script_path(hook_dir,
-          self.PRE_HOOK_PREFIX, command_name, script_type)
-      post_hook_tuple = self.resolve_hook_script_path(hook_dir,
-          self.POST_HOOK_PREFIX, command_name, script_type)
-      py_file_list = [pre_hook_tuple, script_tuple, post_hook_tuple]
+      hooks = self.hooks_orchestrator.resolve_hooks(command, command_name)
+      """:type hooks ambari_agent.CommandHooksOrchestrator.ResolvedHooks"""
+
+      py_file_list = []
+      if hooks:
+       py_file_list.extend(hooks.pre_hooks)
+
+      py_file_list.append(script_tuple)
+
+      if hooks:
+       py_file_list.extend(hooks.post_hooks)
+
       # filter None values
       filtered_py_file_list = [i for i in py_file_list if i]
 
@@ -397,37 +396,41 @@ class CustomServiceOrchestrator():
 
       # Executing hooks and script
       ret = None
-      from ActionQueue import ActionQueue
-      if command.has_key('commandType') and command['commandType'] == ActionQueue.BACKGROUND_EXECUTION_COMMAND and len(filtered_py_file_list) > 1:
+
+      if "commandType" in command and command['commandType'] == AgentCommand.background_execution\
+        and len(filtered_py_file_list) > 1:
+
         raise AgentException("Background commands are supported without hooks only")
 
       python_executor = self.get_py_executor(forced_command_name)
-      backup_log_files = not command_name in self.DONT_BACKUP_LOGS_FOR_COMMANDS
-      log_out_files = self.config.get("logging","log_out_files", default="0") != "0"
+      backup_log_files = command_name not in self.DONT_BACKUP_LOGS_FOR_COMMANDS
+      try:
+       log_out_files = self.config.get("logging", "log_out_files", default=None) is not None
+      except NoOptionError:
+       log_out_files = None
 
       if cluster_id != '-1' and cluster_id != 'null':
         self.commands_for_component_in_progress[cluster_id][command['role']] += 1
         incremented_commands_for_component = True
 
       for py_file, current_base_dir in filtered_py_file_list:
-        log_info_on_failure = not command_name in self.DONT_DEBUG_FAILURES_FOR_COMMANDS
+        log_info_on_failure = command_name not in self.DONT_DEBUG_FAILURES_FOR_COMMANDS
         script_params = [command_name, json_path, current_base_dir, tmpstrucoutfile, logger_level, self.exec_tmp_dir,
                          self.force_https_protocol, self.ca_cert_file_path]
 
         if log_out_files:
           script_params.append("-o")
 
-        ret = python_executor.run_file(py_file, script_params,
-                               tmpoutfile, tmperrfile, timeout,
-                               tmpstrucoutfile, self.map_task_to_process,
-                               task_id, override_output_files, backup_log_files = backup_log_files,
-                               handle = handle, log_info_on_failure=log_info_on_failure)
+        ret = python_executor.run_file(py_file, script_params, tmpoutfile, tmperrfile, timeout,
+                                       tmpstrucoutfile, self.map_task_to_process, task_id, override_output_files,
+                                       backup_log_files=backup_log_files, handle=handle,
+                                       log_info_on_failure=log_info_on_failure)
         # Next run_file() invocations should always append to current output
         override_output_files = False
         if ret['exitcode'] != 0:
           break
 
-      if not ret: # Something went wrong
+      if not ret:
         raise AgentException("No script has been executed")
 
       # if canceled and not background command
@@ -442,15 +445,14 @@ class CustomServiceOrchestrator():
           with open(tmperrfile, "a") as f:
             f.write(cancel_reason)
 
-    except Exception, e: # We do not want to let agent fail completely
+    except Exception as e:
       exc_type, exc_obj, exc_tb = sys.exc_info()
-      message = "Caught an exception while executing "\
-        "custom service command: {0}: {1}; {2}".format(exc_type, exc_obj, str(e))
+      message = "Caught an exception while executing custom service command: {0}: {1}; {2}".format(exc_type, exc_obj, e)
       logger.exception(message)
       ret = {
-        'stdout' : message,
-        'stderr' : message,
-        'structuredOut' : '{}',
+        'stdout': message,
+        'stderr': message,
+        'structuredOut': '{}',
         'exitcode': 1,
       }
     finally:
@@ -469,7 +471,7 @@ class CustomServiceOrchestrator():
 
   def command_canceled_reason(self, task_id):
     with self.commands_in_progress_lock:
-      if self.commands_in_progress.has_key(task_id):#Background command do not push in this collection (TODO)
+      if task_id in self.commands_in_progress:
         logger.debug('Pop with taskId %s', task_id)
         pid = self.commands_in_progress.pop(task_id)
         if not isinstance(pid, (int, long)):
@@ -503,7 +505,7 @@ class CustomServiceOrchestrator():
 
     # topology needs to be decompressed if and only if it originates from command header
     if 'clusterHostInfo' in command_header and command_header['clusterHostInfo']:
-      command['clusterHostInfo'] = self.decompressClusterHostInfo(command['clusterHostInfo'])
+      command['clusterHostInfo'] = self.decompress_cluster_host_info(command['clusterHostInfo'])
 
     return command
 
@@ -513,7 +515,7 @@ class CustomServiceOrchestrator():
      Exit code 0 means that component is running and any other exit code means that
      component is not running
     """
-    override_output_files=True # by default, we override status command output
+    override_output_files = True
     if logger.level == logging.DEBUG:
       override_output_files = False
 
@@ -544,153 +546,126 @@ class CustomServiceOrchestrator():
       raise AgentException(message)
     return path
 
-
-  def resolve_hook_script_path(self, stack_hooks_dir, prefix, command_name, script_type):
-    """
-    Returns a tuple(path to hook script, hook base dir) according to string prefix
-    and command name. If script does not exist, returns None
-    """
-    if not stack_hooks_dir:
-      return None
-    hook_dir = "{0}-{1}".format(prefix, command_name)
-    hook_base_dir = os.path.join(stack_hooks_dir, hook_dir)
-    hook_script_path = os.path.join(hook_base_dir, "scripts", "hook.py")
-    if not os.path.isfile(hook_script_path):
-      logger.debug("Hook script {0} not found, skipping".format(hook_script_path))
-      return None
-    return hook_script_path, hook_base_dir
-
-
   def dump_command_to_json(self, command, retry=False):
     """
     Converts command to json file and returns file path
     """
     # Now, dump the json file
     command_type = command['commandType']
-    from ActionQueue import ActionQueue  # To avoid cyclic dependency
-    if command_type == ActionQueue.STATUS_COMMAND:
+
+    if command_type == AgentCommand.status:
       # make sure status commands that run in parallel don't use the same files
       file_path = os.path.join(self.tmp_dir, "status_command_{0}.json".format(uuid.uuid4()))
     else:
       task_id = command['taskId']
       file_path = os.path.join(self.tmp_dir, "command-{0}.json".format(task_id))
-      if command_type == ActionQueue.AUTO_EXECUTION_COMMAND:
+      if command_type == AgentCommand.auto_execution:
         file_path = os.path.join(self.tmp_dir, "auto_command-{0}.json".format(task_id))
 
     # Json may contain passwords, that's why we need proper permissions
     if os.path.isfile(file_path):
       os.unlink(file_path)
-    with os.fdopen(os.open(file_path, os.O_WRONLY | os.O_CREAT,
-                           0600), 'w') as f:
-      content = json.dumps(command, sort_keys = False, indent = 4)
+    with os.fdopen(os.open(file_path, os.O_WRONLY | os.O_CREAT, 0o600), 'w') as f:
+      content = json.dumps(command, sort_keys=False, indent=4)
       f.write(content)
     return file_path
 
-  def decompressClusterHostInfo(self, clusterHostInfo):
-    info = clusterHostInfo.copy()
-    #Pop info not related to host roles
-    hostsList = info.pop(self.HOSTS_LIST_KEY)
-    pingPorts = info.pop(self.PING_PORTS_KEY)
+  def decompress_cluster_host_info(self, cluster_host_info):
+    info = cluster_host_info.copy()
+    hosts_list = info.pop(self.HOSTS_LIST_KEY)
+    ping_ports = info.pop(self.PING_PORTS_KEY)
     racks = info.pop(self.RACKS_KEY)
     ipv4_addresses = info.pop(self.IPV4_ADDRESSES_KEY)
 
-    ambariServerHost = info.pop(self.AMBARI_SERVER_HOST)
-    ambariServerPort = info.pop(self.AMBARI_SERVER_PORT)
-    ambariServerUseSsl = info.pop(self.AMBARI_SERVER_USE_SSL)
+    ambari_server_host = info.pop(self.AMBARI_SERVER_HOST)
+    ambari_server_port = info.pop(self.AMBARI_SERVER_PORT)
+    ambari_server_use_ssl = info.pop(self.AMBARI_SERVER_USE_SSL)
 
-    decompressedMap = {}
+    decompressed_map = {}
 
-    for k,v in info.items():
+    for k, v in info.items():
       # Convert from 1-3,5,6-8 to [1,2,3,5,6,7,8]
-      indexes = self.convertRangeToList(v)
+      indexes = self.convert_range_to_list(v)
       # Convert from [1,2,3,5,6,7,8] to [host1,host2,host3...]
-      decompressedMap[k] = [hostsList[i] for i in indexes]
+      decompressed_map[k] = [hosts_list[i] for i in indexes]
 
-    #Convert from ['1:0-2,4', '42:3,5-7'] to [1,1,1,42,1,42,42,42]
-    pingPorts = self.convertMappedRangeToList(pingPorts)
-    racks = self.convertMappedRangeToList(racks)
-    ipv4_addresses = self.convertMappedRangeToList(ipv4_addresses)
+    # Convert from ['1:0-2,4', '42:3,5-7'] to [1,1,1,42,1,42,42,42]
+    ping_ports = self.convert_mapped_range_to_list(ping_ports)
+    racks = self.convert_mapped_range_to_list(racks)
+    ipv4_addresses = self.convert_mapped_range_to_list(ipv4_addresses)
 
-    #Convert all elements to str
-    pingPorts = map(str, pingPorts)
+    ping_ports = map(str, ping_ports)
 
-    #Add ping ports to result
-    decompressedMap[self.PING_PORTS_KEY] = pingPorts
-    #Add hosts list to result
-    decompressedMap[self.HOSTS_LIST_KEY] = hostsList
-    #Add racks list to result
-    decompressedMap[self.RACKS_KEY] = racks
-    #Add ips list to result
-    decompressedMap[self.IPV4_ADDRESSES_KEY] = ipv4_addresses
-    #Add ambari-server properties to result
-    decompressedMap[self.AMBARI_SERVER_HOST] = ambariServerHost
-    decompressedMap[self.AMBARI_SERVER_PORT] = ambariServerPort
-    decompressedMap[self.AMBARI_SERVER_USE_SSL] = ambariServerUseSsl
+    decompressed_map[self.PING_PORTS_KEY] = ping_ports
+    decompressed_map[self.HOSTS_LIST_KEY] = hosts_list
+    decompressed_map[self.RACKS_KEY] = racks
+    decompressed_map[self.IPV4_ADDRESSES_KEY] = ipv4_addresses
+    decompressed_map[self.AMBARI_SERVER_HOST] = ambari_server_host
+    decompressed_map[self.AMBARI_SERVER_PORT] = ambari_server_port
+    decompressed_map[self.AMBARI_SERVER_USE_SSL] = ambari_server_use_ssl
 
-    return decompressedMap
+    return decompressed_map
 
-  # Converts from 1-3,5,6-8 to [1,2,3,5,6,7,8]
-  def convertRangeToList(self, list):
-
-    resultList = []
+  def convert_range_to_list(self, range_to_convert):
+    """
+    Converts from 1-3,5,6-8 to [1,2,3,5,6,7,8]
 
-    for i in list:
+    :type range_to_convert list
+    """
+    result_list = []
 
+    for i in range_to_convert:
       ranges = i.split(',')
 
       for r in ranges:
-        rangeBounds = r.split('-')
-        if len(rangeBounds) == 2:
+        range_bounds = r.split('-')
+        if len(range_bounds) == 2:
 
-          if not rangeBounds[0] or not rangeBounds[1]:
-            raise AgentException("Broken data in given range, expected - ""m-n"" or ""m"", got : " + str(r))
+          if not range_bounds[0] or not range_bounds[1]:
+            raise AgentException("Broken data in given range, expected - ""m-n"" or ""m"", got: " + str(r))
 
-
-          resultList.extend(range(int(rangeBounds[0]), int(rangeBounds[1]) + 1))
-        elif len(rangeBounds) == 1:
-          resultList.append((int(rangeBounds[0])))
+          result_list.extend(range(int(range_bounds[0]), int(range_bounds[1]) + 1))
+        elif len(range_bounds) == 1:
+          result_list.append((int(range_bounds[0])))
         else:
-          raise AgentException("Broken data in given range, expected - ""m-n"" or ""m"", got : " + str(r))
+          raise AgentException("Broken data in given range, expected - ""m-n"" or ""m"", got: " + str(r))
 
-    return resultList
+    return result_list
 
-  #Converts from ['1:0-2,4', '42:3,5-7'] to [1,1,1,42,1,42,42,42]
-  def convertMappedRangeToList(self, list):
+  def convert_mapped_range_to_list(self, range_to_convert):
+    """
+    Converts from ['1:0-2,4', '42:3,5-7'] to [1,1,1,42,1,42,42,42]
 
-    resultDict = {}
+    :type range_to_convert list
+    """
+    result_dict = {}
 
-    for i in list:
-      valueToRanges = i.split(":")
-      if len(valueToRanges) <> 2:
+    for i in range_to_convert:
+      value_to_ranges = i.split(":")
+      if len(value_to_ranges) != 2:
         raise AgentException("Broken data in given value to range, expected format - ""value:m-n"", got - " + str(i))
-      value = valueToRanges[0]
-      rangesToken = valueToRanges[1]
+      value = value_to_ranges[0]
+      ranges_token = value_to_ranges[1]
 
-      for r in rangesToken.split(','):
+      for r in ranges_token.split(','):
+        range_indexes = r.split('-')
 
-        rangeIndexes = r.split('-')
+        if len(range_indexes) == 2:
 
-        if len(rangeIndexes) == 2:
-
-          if not rangeIndexes[0] or not rangeIndexes[1]:
+          if not range_indexes[0] or not range_indexes[1]:
             raise AgentException("Broken data in given value to range, expected format - ""value:m-n"", got - " + str(r))
 
-          start = int(rangeIndexes[0])
-          end = int(rangeIndexes[1])
+          start = int(range_indexes[0])
+          end = int(range_indexes[1])
 
           for k in range(start, end + 1):
-            resultDict[k] = value if not value.isdigit() else int(value)
-
-
-        elif len(rangeIndexes) == 1:
-          index = int(rangeIndexes[0])
-
-          resultDict[index] = value if not value.isdigit() else int(value)
-
+            result_dict[k] = value if not value.isdigit() else int(value)
 
-    resultList = dict(sorted(resultDict.items())).values()
+        elif len(range_indexes) == 1:
+          index = int(range_indexes[0])
+          result_dict[index] = value if not value.isdigit() else int(value)
 
-    return resultList
+    return dict(sorted(result_dict.items())).values()
 
   def conditionally_remove_command_file(self, command_json_path, command_result):
     """
@@ -741,7 +716,7 @@ class CustomServiceOrchestrator():
         try:
           os.remove(command_json_path)
           removed_command_file = True
-        except Exception, e:
+        except OSError as e:
           logger.error("Failed to remove %s due to error: %s", command_json_path, str(e))
 
     return removed_command_file
diff --git a/ambari-agent/src/main/python/ambari_agent/FileCache.py b/ambari-agent/src/main/python/ambari_agent/FileCache.py
index aec0357..9e1c7af 100644
--- a/ambari-agent/src/main/python/ambari_agent/FileCache.py
+++ b/ambari-agent/src/main/python/ambari_agent/FileCache.py
@@ -73,8 +73,18 @@ class FileCache():
   def reset(self):
     self.uptodate_paths = [] # Paths that already have been recently checked
 
+  def get_server_url_prefix(self, command):
+    """
+    Returns the server URL prefix from the command, or an empty string if it is not present
+
+    :type command dict
+    """
+    try:
+      return command['ambariLevelParams']['jdk_location']
+    except KeyError:
+      return ""
 
-  def get_service_base_dir(self, command, server_url_prefix):
+  def get_service_base_dir(self, command):
     """
     Returns a base directory for service
     """
@@ -82,11 +92,9 @@ class FileCache():
       service_subpath = command['commandParams']['service_package_folder']
     else:
       service_subpath = command['serviceLevelParams']['service_package_folder']
-    return self.provide_directory(self.cache_dir, service_subpath,
-                                  server_url_prefix)
+    return self.provide_directory(self.cache_dir, service_subpath, self.get_server_url_prefix(command))
 
-
-  def get_hook_base_dir(self, command, server_url_prefix):
+  def get_hook_base_dir(self, command):
     """
     Returns a base directory for hooks
     """
@@ -94,20 +102,18 @@ class FileCache():
       hooks_path = command['clusterLevelParams']['hooks_folder']
     except KeyError:
       return None
-    return self.provide_directory(self.cache_dir, hooks_path,
-                                  server_url_prefix)
-
+    return self.provide_directory(self.cache_dir, hooks_path, self.get_server_url_prefix(command))
 
-  def get_custom_actions_base_dir(self, server_url_prefix):
+  def get_custom_actions_base_dir(self, command):
     """
     Returns a base directory for custom action scripts
     """
     return self.provide_directory(self.cache_dir,
                                   self.CUSTOM_ACTIONS_CACHE_DIRECTORY,
-                                  server_url_prefix)
+                                  self.get_server_url_prefix(command))
 
 
-  def get_custom_resources_subdir(self, command, server_url_prefix):
+  def get_custom_resources_subdir(self, command):
     """
     Returns a custom directory which must be a subdirectory of the resources dir
     """
@@ -116,20 +122,16 @@ class FileCache():
     except KeyError:
       return None
 
-    return self.provide_directory(self.cache_dir,
-                                  custom_dir,
-                                  server_url_prefix)
-
+    return self.provide_directory(self.cache_dir, custom_dir, self.get_server_url_prefix(command))
 
-  def get_host_scripts_base_dir(self, server_url_prefix):
+  def get_host_scripts_base_dir(self, command):
     """
     Returns a base directory for host scripts (host alerts, etc) which
     are scripts that are not part of the main agent code
     """
     return self.provide_directory(self.cache_dir,
                                   self.HOST_SCRIPTS_CACHE_DIRECTORY,
-                                  server_url_prefix)
-
+                                  self.get_server_url_prefix(command))
 
   def auto_cache_update_enabled(self):
     from AmbariConfig import AmbariConfig
@@ -182,7 +184,7 @@ class FileCache():
                                                  subdirectory, self.ARCHIVE_NAME)
           membuffer = self.fetch_url(download_url)
           # extract only when the archive is not zero sized
-          if (membuffer.getvalue().strip()):
+          if membuffer.getvalue().strip():
             self.invalidate_directory(full_path)
             self.unpack_archive(membuffer, full_path)
             self.write_hash_sum(full_path, remote_hash)
@@ -193,7 +195,7 @@ class FileCache():
             pass
         # Finally consider cache directory up-to-date
         self.uptodate_paths.append(full_path)
-    except CachingException, e:
+    except CachingException as e:
       if self.tolerate_download_failures:
         # ignore
         logger.warn("Error occurred during cache update. "
@@ -209,9 +211,7 @@ class FileCache():
 
     return full_path
 
-
-  def build_download_url(self, server_url_prefix,
-                         directory, filename):
+  def build_download_url(self, server_url_prefix, directory, filename):
     """
     Builds up a proper download url for file. Used for downloading files
     from the server.
@@ -221,7 +221,6 @@ class FileCache():
     return "{0}/{1}/{2}".format(server_url_prefix,
                                 urllib.pathname2url(directory), filename)
 
-
   def fetch_url(self, url):
     """
     Fetches content on url to in-memory buffer and returns the resulting buffer.
@@ -242,10 +241,8 @@ class FileCache():
         if not buff:
           break
       return memory_buffer
-    except Exception, err:
-      raise CachingException("Can not download file from"
-                             " url {0} : {1}".format(url, str(err)))
-
+    except Exception as err:
+      raise CachingException("Can not download file from url {0} : {1}".format(url, str(err)))
 
   def read_hash_sum(self, directory):
     """
@@ -257,8 +254,7 @@ class FileCache():
       with open(hash_file) as fh:
         return fh.readline().strip()
     except:
-      return None # We don't care
-
+      return None
 
   def write_hash_sum(self, directory, new_hash):
     """
@@ -270,10 +266,8 @@ class FileCache():
       with open(hash_file, "w") as fh:
         fh.write(new_hash)
       os.chmod(hash_file, 0o644)
-    except Exception, err:
-      raise CachingException("Can not write to file {0} : {1}".format(hash_file,
-                                                                 str(err)))
-
+    except Exception as err:
+      raise CachingException("Can not write to file {0} : {1}".format(hash_file, str(err)))
 
   def invalidate_directory(self, directory):
     """
@@ -287,7 +281,7 @@ class FileCache():
     logger.debug("Invalidating directory {0}".format(directory))
     try:
       if os.path.exists(directory):
-        if os.path.isfile(directory): # It would be a strange situation
+        if os.path.isfile(directory):  # It would be a strange situation
           os.unlink(directory)
         elif os.path.isdir(directory):
           """
@@ -298,12 +292,11 @@ class FileCache():
           execute_with_retries(CLEAN_DIRECTORY_TRIES, CLEAN_DIRECTORY_TRY_SLEEP, OSError, shutil.rmtree, directory)
         # create directory itself and any parent directories
       os.makedirs(directory)
-    except Exception, err:
+    except Exception as err:
       logger.exception("Can not invalidate cache directory {0}".format(directory))
       raise CachingException("Can not invalidate cache directory {0}: {1}",
                              directory, str(err))
 
-
   def unpack_archive(self, mem_buffer, target_directory):
     """
     Unpacks contents of in-memory buffer to file system.
@@ -317,9 +310,7 @@ class FileCache():
         if not os.path.isdir(concrete_dir):
           os.makedirs(concrete_dir)
         logger.debug("Unpacking file {0} to {1}".format(name, concrete_dir))
-        if filename!='':
+        if filename != '':
           zfile.extract(name, target_directory)
-    except Exception, err:
-      raise CachingException("Can not unpack zip file to "
-                             "directory {0} : {1}".format(
-                            target_directory, str(err)))
+    except Exception as err:
+      raise CachingException("Can not unpack zip file to directory {0} : {1}".format(target_directory, str(err)))
diff --git a/ambari-agent/src/main/python/ambari_agent/InitializerModule.py b/ambari-agent/src/main/python/ambari_agent/InitializerModule.py
index b641c06..b15aaec 100644
--- a/ambari-agent/src/main/python/ambari_agent/InitializerModule.py
+++ b/ambari-agent/src/main/python/ambari_agent/InitializerModule.py
@@ -21,6 +21,7 @@ limitations under the License.
 import threading
 import logging
 
+from ambari_agent.CommandHooksOrchestrator import HooksOrchestrator
 from ambari_agent.FileCache import FileCache
 from ambari_agent.AmbariConfig import AmbariConfig
 from ambari_agent.ClusterConfigurationCache import ClusterConfigurationCache
@@ -69,6 +70,7 @@ class InitializerModule:
     self.server_responses_listener = None
     self.file_cache = None
     self.customServiceOrchestrator = None
+    self.hooks_orchestrator = None
     self.recovery_manager = None
     self.commandStatuses = None
     self.action_queue = None
@@ -92,6 +94,7 @@ class InitializerModule:
     self.server_responses_listener = ServerResponsesListener(self)
     self.file_cache = FileCache(self.config)
     self.customServiceOrchestrator = CustomServiceOrchestrator(self)
+    self.hooks_orchestrator = HooksOrchestrator(self)
     self.recovery_manager = RecoveryManager(self)
     self.commandStatuses = CommandStatusDict(self)
 
diff --git a/ambari-agent/src/main/python/ambari_agent/LiveStatus.py b/ambari-agent/src/main/python/ambari_agent/LiveStatus.py
index a506e28..2809faa 100644
--- a/ambari-agent/src/main/python/ambari_agent/LiveStatus.py
+++ b/ambari-agent/src/main/python/ambari_agent/LiveStatus.py
@@ -1,6 +1,6 @@
 #!/usr/bin/env python
 
-'''
+"""
 Licensed to the Apache Software Foundation (ASF) under one
 or more contributor license agreements.  See the NOTICE file
 distributed with this work for additional information
@@ -16,13 +16,11 @@ distributed under the License is distributed on an "AS IS" BASIS,
 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
-'''
+"""
 
 import logging
 from ActualConfigHandler import ActualConfigHandler
 
-logger = logging.getLogger()
-
 
 class LiveStatus:
 
@@ -33,8 +31,8 @@ class LiveStatus:
   LIVE_STATUS = "STARTED"
   DEAD_STATUS = "INSTALLED"
 
-  def __init__(self, cluster, service, component, globalConfig, config,
-               configTags):
+  def __init__(self, cluster, service, component, globalConfig, config, configTags):
+    self.logger = logging.getLogger()
     self.cluster = cluster
     self.service = service
     self.component = component
@@ -47,7 +45,6 @@ class LiveStatus:
     :param component_status: component status to include into report
     :return: populated livestatus dict
     """
-    global LIVE_STATUS, DEAD_STATUS
 
     livestatus = {"componentName": self.component,
                   "msg": "",
@@ -62,5 +59,5 @@ class LiveStatus:
     if active_config is not None:
       livestatus['configurationTags'] = active_config
 
-    logger.debug("The live status for component %s of service %s is %s", self.component, self.service, livestatus)
+    self.logger.debug("The live status for component %s of service %s is %s", self.component, self.service, livestatus)
     return livestatus
diff --git a/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py b/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py
index 8b9e9ca..977562b 100644
--- a/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py
+++ b/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py
@@ -1,6 +1,6 @@
 #!/usr/bin/env python
 
-'''
+"""
 Licensed to the Apache Software Foundation (ASF) under one
 or more contributor license agreements.  See the NOTICE file
 distributed with this work for additional information
@@ -16,26 +16,22 @@ distributed under the License is distributed on an "AS IS" BASIS,
 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
-'''
-import ambari_simplejson as json
+"""
+
 import logging
 import os
-from ambari_commons import subprocess32
 import pprint
 import threading
-import platform
-from threading import Thread
-import time
-from BackgroundCommandExecutionHandle import BackgroundCommandExecutionHandle
-from resource_management.libraries.functions.log_process_information import log_process_information
-from ambari_commons.os_check import OSConst, OSCheck
-from Grep import Grep
 import sys
+
+import ambari_simplejson as json
+
+from ambari_commons import subprocess32
 from ambari_commons import shell
-from ambari_commons.shell import shellRunner
 
+from Grep import Grep
+from BackgroundCommandExecutionHandle import BackgroundCommandExecutionHandle
 
-logger = logging.getLogger()
 
 class PythonExecutor(object):
   """
@@ -45,44 +41,39 @@ class PythonExecutor(object):
   """
   NO_ERROR = "none"
 
-  def __init__(self, tmpDir, config):
+  def __init__(self, tmp_dir, config):
+    self.logger = logging.getLogger()
     self.grep = Grep()
     self.event = threading.Event()
     self.python_process_has_been_killed = False
-    self.tmpDir = tmpDir
+    self.tmpDir = tmp_dir
     self.config = config
     self.log_max_symbols_size = self.config.log_max_symbols_size
-    pass
 
+  def open_subprocess32_files(self, tmp_out_file, tmp_err_file, override_output_files, backup_log_files=True):
+    mode = "w" if override_output_files else "a"
 
-  def open_subprocess32_files(self, tmpoutfile, tmperrfile, override_output_files, backup_log_files = True):
-    if override_output_files: # Recreate files, existing files are backed up if backup_log_files is True
-      if backup_log_files:
-        self.back_up_log_file_if_exists(tmpoutfile)
-        self.back_up_log_file_if_exists(tmperrfile)
-      tmpout =  open(tmpoutfile, 'w')
-      tmperr =  open(tmperrfile, 'w')
-    else: # Append to files
-      tmpout =  open(tmpoutfile, 'a')
-      tmperr =  open(tmperrfile, 'a')
-    return tmpout, tmperr
+    if override_output_files and backup_log_files:
+      self.back_up_log_file_if_exists(tmp_out_file)
+      self.back_up_log_file_if_exists(tmp_err_file)
+
+    return open(tmp_out_file, mode), open(tmp_err_file, mode)
 
   def back_up_log_file_if_exists(self, file_path):
     if os.path.isfile(file_path):
       counter = 0
       while True:
-        # Find backup name that is not used yet (saves logs
-        # from multiple command retries)
+        # Find backup name that is not used yet (saves logs from multiple command retries)
         backup_name = file_path + "." + str(counter)
         if not os.path.isfile(backup_name):
           break
         counter += 1
       os.rename(file_path, backup_name)
 
-  def run_file(self, script, script_params, tmpoutfile, tmperrfile,
-               timeout, tmpstructedoutfile, callback, task_id,
-               override_output_files = True, backup_log_files = True, handle = None,
-               log_info_on_failure = True):
+  def run_file(self, script, script_params, tmp_out_file, tmp_err_file,
+               timeout, tmp_structed_outfile, callback, task_id,
+               override_output_files=True, backup_log_files=True, handle=None,
+               log_info_on_failure=True):
     """
     Executes the specified python file in a separate subprocess32.
     Method returns only when the subprocess32 is finished.
@@ -93,39 +84,58 @@ class PythonExecutor(object):
     recreated or appended.
     The structured out file, however, is preserved during multiple invocations that use the same file.
     """
-    pythonCommand = self.python_command(script, script_params)
-    if logger.isEnabledFor(logging.DEBUG):
-      logger.debug("Running command %s", pprint.pformat(pythonCommand))
+    python_command = self.python_command(script, script_params)
+    if self.logger.isEnabledFor(logging.DEBUG):
+      self.logger.debug("Running command %s", pprint.pformat(python_command))
+
+    def background_executor():
+      logger = logging.getLogger()
+      process_out, process_err = self.open_subprocess32_files(tmp_out_file, tmp_err_file, True)
+
+      logger.debug("Starting process command %s", python_command)
+      p = self.launch_python_subprocess32(python_command, process_out, process_err)
+
+      logger.debug("Process has been started. Pid = %s", p.pid)
+
+      handle.pid = p.pid
+      handle.status = BackgroundCommandExecutionHandle.RUNNING_STATUS
+      handle.on_background_command_started(handle.command['taskId'], p.pid)
+
+      p.communicate()
+
+      handle.exitCode = p.returncode
+      process_condensed_result = self.prepare_process_result(p.returncode, tmp_out_file, tmp_err_file, tmp_structed_outfile)
+      logger.debug("Calling callback with args %s", process_condensed_result)
+      handle.on_background_command_complete_callback(process_condensed_result, handle)
+      logger.debug("Exiting from thread for holder pid %s", handle.pid)
 
     if handle is None:
-      tmpout, tmperr = self.open_subprocess32_files(tmpoutfile, tmperrfile, override_output_files, backup_log_files)
+      tmpout, tmperr = self.open_subprocess32_files(tmp_out_file, tmp_err_file, override_output_files, backup_log_files)
 
-      process = self.launch_python_subprocess32(pythonCommand, tmpout, tmperr)
+      process = self.launch_python_subprocess32(python_command, tmpout, tmperr)
       # map task_id to pid
       callback(task_id, process.pid)
-      logger.debug("Launching watchdog thread")
+      self.logger.debug("Launching watchdog thread")
       self.event.clear()
       self.python_process_has_been_killed = False
-      thread = Thread(target =  self.python_watchdog_func, args = (process, timeout))
+      thread = threading.Thread(target=self.python_watchdog_func, args=(process, timeout))
       thread.start()
       # Waiting for the process to be either finished or killed
       process.communicate()
       self.event.set()
       thread.join()
-      result = self.prepare_process_result(process.returncode, tmpoutfile, tmperrfile, tmpstructedoutfile, timeout=timeout)
+      result = self.prepare_process_result(process.returncode, tmp_out_file, tmp_err_file, tmp_structed_outfile, timeout=timeout)
 
       if log_info_on_failure and result['exitcode']:
-        self.on_failure(pythonCommand, result)
+        self.on_failure(python_command, result)
 
       return result
     else:
-      holder = Holder(pythonCommand, tmpoutfile, tmperrfile, tmpstructedoutfile, handle)
-
-      background = BackgroundThread(holder, self)
+      background = threading.Thread(target=background_executor, args=())
       background.start()
       return {"exitcode": 777}
 
-  def on_failure(self, pythonCommand, result):
+  def on_failure(self, python_command, result):
     """
     Log some useful information after task failure.
     """
@@ -137,11 +147,13 @@ class PythonExecutor(object):
     out, error, structured_out = self.read_result_from_files(tmpoutfile, tmperrfile, tmpstructedoutfile)
 
     if self.python_process_has_been_killed:
-      error = str(error) + "\n Python script has been killed due to timeout" + \
-              (" after waiting %s secs" % str(timeout) if timeout else "")
+      error = "{error}\nPython script has been killed due to timeout{timeout_details}".format(
+        error=error,
+        timeout_details="" if not timeout else " after waiting {} secs".format(timeout)
+      )
       returncode = 999
-    result = self.condenseOutput(out, error, returncode, structured_out)
-    logger.debug("Result: %s", result)
+    result = self.condense_output(out, error, returncode, structured_out)
+    self.logger.debug("Result: %s", result)
     return result
 
   def read_result_from_files(self, out_path, err_path, structured_out_path):
@@ -150,93 +162,46 @@ class PythonExecutor(object):
     try:
       with open(structured_out_path, 'r') as fp:
         structured_out = json.load(fp)
-    except Exception:
-      if os.path.exists(structured_out_path):
-        errMsg = 'Unable to read structured output from ' + structured_out_path
-        structured_out = {
-          'msg' : errMsg
-        }
-        logger.warn(structured_out)
-      else:
-        structured_out = {}
+    except (TypeError, ValueError):
+      structured_out = {
+        "msg": "Unable to read structured output from " + structured_out_path
+      }
+      self.logger.warn(structured_out)
+    except (OSError, IOError):
+      structured_out = {}
     return out, error, structured_out
 
-  def preexec_fn(self):
-    os.setpgid(0, 0)
-
   def launch_python_subprocess32(self, command, tmpout, tmperr):
     """
     Creates subprocess32 with given parameters. This functionality was moved to separate method
     to make possible unit testing
     """
-    close_fds = None if OSCheck.get_os_family() == OSConst.WINSRV_FAMILY else True
     command_env = dict(os.environ)
-    if OSCheck.get_os_family() == OSConst.WINSRV_FAMILY:
-      command_env["PYTHONPATH"] = os.pathsep.join(sys.path)
-      for k, v in command_env.iteritems():
-        command_env[k] = str(v)
+    return subprocess32.Popen(command, stdout=tmpout, stderr=tmperr, close_fds=True, env=command_env,
+                              preexec_fn=lambda: os.setpgid(0, 0))
 
-    return subprocess32.Popen(command,
-      stdout=tmpout,
-      stderr=tmperr, close_fds=close_fds, env=command_env, preexec_fn=self.preexec_fn)
-
-  def isSuccessfull(self, returncode):
-    return not self.python_process_has_been_killed and returncode == 0
+  def is_successful(self, return_code):
+    return not self.python_process_has_been_killed and return_code == 0
 
   def python_command(self, script, script_params):
-    #we need manually pass python executable on windows because sys.executable will return service wrapper
-    python_binary = os.environ['PYTHON_EXE'] if 'PYTHON_EXE' in os.environ else sys.executable
-    python_command = [python_binary, script] + script_params
+    """
+    :type script str
+    :type script_params list
+    """
+    python_command = [sys.executable, script] + script_params
     return python_command
 
-  def condenseOutput(self, stdout, stderr, retcode, structured_out):
-    result = {
-      "exitcode": retcode,
+  def condense_output(self, stdout, stderr, ret_code, structured_out):
+    return {
+      "exitcode": ret_code,
       "stdout": self.grep.tail_by_symbols(stdout, self.log_max_symbols_size) if self.log_max_symbols_size else stdout,
       "stderr": self.grep.tail_by_symbols(stderr, self.log_max_symbols_size) if self.log_max_symbols_size else stderr,
-      "structuredOut" : structured_out
+      "structuredOut": structured_out
     }
 
-    return result
-
-  def python_watchdog_func(self, python, timeout):
+  def python_watchdog_func(self, process, timeout):
     self.event.wait(timeout)
-    if python.returncode is None:
-      logger.error("subprocess32 timed out and will be killed")
-      shell.kill_process_with_children(python.pid)
+    if process.returncode is None:
+      self.logger.error("Executed command with pid {} timed out and will be killed".format(process.pid))
+      shell.kill_process_with_children(process.pid)
       self.python_process_has_been_killed = True
-    pass
-
-class Holder:
-  def __init__(self, command, out_file, err_file, structured_out_file, handle):
-    self.command = command
-    self.out_file = out_file
-    self.err_file = err_file
-    self.structured_out_file = structured_out_file
-    self.handle = handle
-
-class BackgroundThread(threading.Thread):
-  def __init__(self, holder, pythonExecutor):
-    threading.Thread.__init__(self)
-    self.holder = holder
-    self.pythonExecutor = pythonExecutor
-
-  def run(self):
-    process_out, process_err = self.pythonExecutor.open_subprocess32_files(self.holder.out_file, self.holder.err_file, True)
-
-    logger.debug("Starting process command %s", self.holder.command)
-    process = self.pythonExecutor.launch_python_subprocess32(self.holder.command, process_out, process_err)
-
-    logger.debug("Process has been started. Pid = %s", process.pid)
-
-    self.holder.handle.pid = process.pid
-    self.holder.handle.status = BackgroundCommandExecutionHandle.RUNNING_STATUS
-    self.holder.handle.on_background_command_started(self.holder.handle.command['taskId'], process.pid)
-
-    process.communicate()
-
-    self.holder.handle.exitCode = process.returncode
-    process_condensed_result = self.pythonExecutor.prepare_process_result(process.returncode, self.holder.out_file, self.holder.err_file, self.holder.structured_out_file)
-    logger.debug("Calling callback with args %s", process_condensed_result)
-    self.holder.handle.on_background_command_complete_callback(process_condensed_result, self.holder.handle)
-    logger.debug("Exiting from thread for holder pid %s", self.holder.handle.pid)
diff --git a/ambari-agent/src/main/python/ambari_agent/PythonReflectiveExecutor.py b/ambari-agent/src/main/python/ambari_agent/PythonReflectiveExecutor.py
index eeacd22..7aec009 100644
--- a/ambari-agent/src/main/python/ambari_agent/PythonReflectiveExecutor.py
+++ b/ambari-agent/src/main/python/ambari_agent/PythonReflectiveExecutor.py
@@ -38,11 +38,11 @@ class PythonReflectiveExecutor(PythonExecutor):
   Running the commands not in new proccess, but reflectively makes this really fast.
   """
   
-  def __init__(self, tmpDir, config):
-    super(PythonReflectiveExecutor, self).__init__(tmpDir, config)
+  def __init__(self, tmp_dir, config):
+    super(PythonReflectiveExecutor, self).__init__(tmp_dir, config)
     
-  def run_file(self, script, script_params, tmpoutfile, tmperrfile,
-               timeout, tmpstructedoutfile, callback, task_id,
+  def run_file(self, script, script_params, tmp_out_file, tmp_err_file,
+               timeout, tmp_structed_outfile, callback, task_id,
                override_output_files = True, backup_log_files = True,
                handle = None, log_info_on_failure=True):
     pythonCommand = self.python_command(script, script_params)
@@ -50,7 +50,7 @@ class PythonReflectiveExecutor(PythonExecutor):
       logger.debug("Running command reflectively %s", pprint.pformat(pythonCommand))
     
     script_dir = os.path.dirname(script)
-    self.open_subprocess32_files(tmpoutfile, tmperrfile, override_output_files, backup_log_files)
+    self.open_subprocess32_files(tmp_out_file, tmp_err_file, override_output_files, backup_log_files)
     returncode = 1
 
     try:
@@ -69,7 +69,7 @@ class PythonReflectiveExecutor(PythonExecutor):
     else: 
       returncode = 0
       
-    return self.prepare_process_result(returncode, tmpoutfile, tmperrfile, tmpstructedoutfile, timeout=timeout)
+    return self.prepare_process_result(returncode, tmp_out_file, tmp_err_file, tmp_structed_outfile, timeout=timeout)
   
 class PythonContext:
   """
diff --git a/ambari-agent/src/main/python/ambari_agent/RecoveryManager.py b/ambari-agent/src/main/python/ambari_agent/RecoveryManager.py
index 78c430d..0ce65c9 100644
--- a/ambari-agent/src/main/python/ambari_agent/RecoveryManager.py
+++ b/ambari-agent/src/main/python/ambari_agent/RecoveryManager.py
@@ -21,6 +21,7 @@ import pprint
 
 from ambari_agent.ActionQueue import ActionQueue
 from ambari_agent.LiveStatus import LiveStatus
+from ambari_agent.models.commands import CommandStatus, RoleCommand, CustomCommand, AgentCommand
 
 logger = logging.getLogger()
 
@@ -576,25 +577,25 @@ class RecoveryManager:
     if self.ROLE_COMMAND not in command or not self.configured_for_recovery(command['role']):
       return
 
-    if status == ActionQueue.COMPLETED_STATUS:
-      if command[self.ROLE_COMMAND] == ActionQueue.ROLE_COMMAND_START:
+    if status == CommandStatus.completed:
+      if command[self.ROLE_COMMAND] == RoleCommand.start:
         self.update_current_status(command[self.ROLE], LiveStatus.LIVE_STATUS)
         logger.info("After EXECUTION_COMMAND (START), with taskId={}, current state of {} to {}".format(
           command['taskId'], command[self.ROLE], self.get_current_status(command[self.ROLE])))
 
-      elif command['roleCommand'] == ActionQueue.ROLE_COMMAND_STOP or command[self.ROLE_COMMAND] == ActionQueue.ROLE_COMMAND_INSTALL:
+      elif command['roleCommand'] == RoleCommand.stop or command[self.ROLE_COMMAND] == RoleCommand.install:
         self.update_current_status(command[self.ROLE], LiveStatus.DEAD_STATUS)
         logger.info("After EXECUTION_COMMAND (STOP/INSTALL), with taskId={}, current state of {} to {}".format(
           command['taskId'], command[self.ROLE], self.get_current_status(command[self.ROLE])))
 
-      elif command[self.ROLE_COMMAND] == ActionQueue.ROLE_COMMAND_CUSTOM_COMMAND:
-        if 'custom_command' in command and command['custom_command'] == ActionQueue.CUSTOM_COMMAND_RESTART:
+      elif command[self.ROLE_COMMAND] == RoleCommand.custom_command:
+        if 'custom_command' in command and command['custom_command'] == CustomCommand.restart:
           self.update_current_status(command['role'], LiveStatus.LIVE_STATUS)
           logger.info("After EXECUTION_COMMAND (RESTART), current state of {} to {}".format(
             command[self.ROLE], self.get_current_status(command[self.ROLE])))
 
-    elif status == ActionQueue.FAILED_STATUS:
-      if command[self.ROLE_COMMAND] == ActionQueue.ROLE_COMMAND_INSTALL:
+    elif status == CommandStatus.failed:
+      if command[self.ROLE_COMMAND] == RoleCommand.install:
         self.update_current_status(command[self.ROLE], self.INSTALL_FAILED)
         logger.info("After EXECUTION_COMMAND (INSTALL), with taskId={}, current state of {} to {}".format(
           command['taskId'], command[self.ROLE], self.get_current_status(command[self.ROLE])))
@@ -606,25 +607,25 @@ class RecoveryManager:
     if not self.enabled():
       return
 
-    if self.COMMAND_TYPE not in command or not command[self.COMMAND_TYPE] == ActionQueue.EXECUTION_COMMAND:
+    if self.COMMAND_TYPE not in command or not command[self.COMMAND_TYPE] == AgentCommand.execution:
       return
 
     if self.ROLE not in command:
       return
 
-    if command[self.ROLE_COMMAND] in (ActionQueue.ROLE_COMMAND_INSTALL, ActionQueue.ROLE_COMMAND_STOP) \
+    if command[self.ROLE_COMMAND] in (RoleCommand.install, RoleCommand.stop) \
         and self.configured_for_recovery(command[self.ROLE]):
 
       self.update_desired_status(command[self.ROLE], LiveStatus.DEAD_STATUS)
       logger.info("Received EXECUTION_COMMAND (STOP/INSTALL), desired state of {} to {}".format(
         command[self.ROLE], self.get_desired_status(command[self.ROLE])))
 
-    elif command[self.ROLE_COMMAND] == ActionQueue.ROLE_COMMAND_START and self.configured_for_recovery(command[self.ROLE]):
+    elif command[self.ROLE_COMMAND] == RoleCommand.start and self.configured_for_recovery(command[self.ROLE]):
       self.update_desired_status(command[self.ROLE], LiveStatus.LIVE_STATUS)
       logger.info("Received EXECUTION_COMMAND (START), desired state of {} to {}".format(
         command[self.ROLE], self.get_desired_status(command[self.ROLE])))
 
-    elif 'custom_command' in command and  command['custom_command'] == ActionQueue.CUSTOM_COMMAND_RESTART \
+    elif 'custom_command' in command and command['custom_command'] == CustomCommand.restart \
             and self.configured_for_recovery(command[self.ROLE]):
 
       self.update_desired_status(command[self.ROLE], LiveStatus.LIVE_STATUS)
@@ -644,7 +645,7 @@ class RecoveryManager:
       command = {
         self.CLUSTER_ID: self.cluster_id,
         self.ROLE_COMMAND: command_name,
-        self.COMMAND_TYPE: ActionQueue.AUTO_EXECUTION_COMMAND,
+        self.COMMAND_TYPE: AgentCommand.auto_execution,
         self.TASK_ID: command_id,
         self.ROLE: component,
         self.COMMAND_ID: command_id
diff --git a/ambari-server/src/main/resources/stack-hooks/before-RESTART/scripts/hook.py b/ambari-agent/src/main/python/ambari_agent/models/__init__.py
similarity index 79%
copy from ambari-server/src/main/resources/stack-hooks/before-RESTART/scripts/hook.py
copy to ambari-agent/src/main/python/ambari_agent/models/__init__.py
index 14b9d99..53ed4d3 100644
--- a/ambari-server/src/main/resources/stack-hooks/before-RESTART/scripts/hook.py
+++ b/ambari-agent/src/main/python/ambari_agent/models/__init__.py
@@ -14,16 +14,4 @@ distributed under the License is distributed on an "AS IS" BASIS,
 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
-
 """
-
-from resource_management import *
-
-class BeforeRestartHook(Hook):
-
-  def hook(self, env):
-    self.run_custom_hook('before-START')
-
-if __name__ == "__main__":
-  BeforeRestartHook().execute()
-
diff --git a/ambari-server/src/main/resources/stack-hooks/before-ANY/scripts/hook.py b/ambari-agent/src/main/python/ambari_agent/models/commands.py
similarity index 53%
copy from ambari-server/src/main/resources/stack-hooks/before-ANY/scripts/hook.py
copy to ambari-agent/src/main/python/ambari_agent/models/commands.py
index c34be0b..eb96e9a 100644
--- a/ambari-server/src/main/resources/stack-hooks/before-ANY/scripts/hook.py
+++ b/ambari-agent/src/main/python/ambari_agent/models/commands.py
@@ -14,23 +14,33 @@ distributed under the License is distributed on an "AS IS" BASIS,
 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
-
 """
 
-from resource_management import *
-from shared_initialization import *
 
-class BeforeAnyHook(Hook):
+class AgentCommand(object):
+  status = "STATUS_COMMAND"
+  execution = "EXECUTION_COMMAND"
+  auto_execution = "AUTO_EXECUTION_COMMAND"
+  background_execution = "BACKGROUND_EXECUTION_COMMAND"
+
+  AUTO_EXECUTION_COMMAND_GROUP = [execution, auto_execution, background_execution]
+  EXECUTION_COMMAND_GROUP = [execution, background_execution]
+
+
+class RoleCommand(object):
+  install = 'INSTALL'
+  start = 'START'
+  stop = 'STOP'
+  custom_command = 'CUSTOM_COMMAND'
 
-  def hook(self, env):
-    import params
-    env.set_params(params)
 
-    setup_users()
-    if params.has_namenode or params.dfs_type == 'HCFS':
-      setup_hadoop_env()
-    setup_java()
+class CustomCommand(object):
+  restart = 'RESTART'
+  reconfigure = 'RECONFIGURE'
+  start = RoleCommand.start
 
-if __name__ == "__main__":
-  BeforeAnyHook().execute()
 
+class CommandStatus(object):
+  in_progress = 'IN_PROGRESS'
+  completed = 'COMPLETED'
+  failed = 'FAILED'
diff --git a/ambari-server/src/main/resources/stack-hooks/before-RESTART/scripts/hook.py b/ambari-agent/src/main/python/ambari_agent/models/hooks.py
similarity index 79%
copy from ambari-server/src/main/resources/stack-hooks/before-RESTART/scripts/hook.py
copy to ambari-agent/src/main/python/ambari_agent/models/hooks.py
index 14b9d99..f9183bd 100644
--- a/ambari-server/src/main/resources/stack-hooks/before-RESTART/scripts/hook.py
+++ b/ambari-agent/src/main/python/ambari_agent/models/hooks.py
@@ -14,16 +14,9 @@ distributed under the License is distributed on an "AS IS" BASIS,
 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
-
 """
 
-from resource_management import *
-
-class BeforeRestartHook(Hook):
-
-  def hook(self, env):
-    self.run_custom_hook('before-START')
-
-if __name__ == "__main__":
-  BeforeRestartHook().execute()
 
+class HookPrefix(object):
+  pre = "before"
+  post = "after"
diff --git a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
index d75fbb2..57740b4 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
@@ -29,6 +29,7 @@ from threading import Thread
 import copy
 import signal
 
+from ambari_agent.models.commands import CommandStatus, AgentCommand
 from mock.mock import patch, MagicMock, call
 from ambari_agent.CustomServiceOrchestrator import CustomServiceOrchestrator
 from ambari_agent.PythonExecutor import PythonExecutor
@@ -365,10 +366,10 @@ class TestActionQueue(TestCase):
     
     actionQueue = ActionQueue(initializer_module)
     execution_command = {
-      'commandType' : ActionQueue.EXECUTION_COMMAND,
+      'commandType' : AgentCommand.execution,
     }
     status_command = {
-      'commandType' : ActionQueue.STATUS_COMMAND,
+      'commandType' : AgentCommand.status,
     }
     wrong_command = {
       'commandType' : "SOME_WRONG_COMMAND",
@@ -1078,14 +1079,14 @@ class TestActionQueue(TestCase):
 
     execute_command = copy.deepcopy(self.background_command)
     actionQueue.put([execute_command])
-    actionQueue.processBackgroundQueueSafeEmpty();
+    actionQueue.process_background_queue_safe_empty()
     # actionQueue.controller.statusCommandExecutor.process_results();
     
     # assert that python execturor start
     self.assertTrue(runCommand_mock.called)
     runningCommand = actionQueue.commandStatuses.current_state.get(execute_command['taskId'])
     self.assertTrue(runningCommand is not None)
-    self.assertEqual(runningCommand[1]['status'], ActionQueue.IN_PROGRESS_STATUS)
+    self.assertEqual(runningCommand[1]['status'], CommandStatus.in_progress)
     
     reports = actionQueue.commandStatuses.generate_report()[CLUSTER_ID]
     self.assertEqual(len(reports), 1)
@@ -1130,7 +1131,7 @@ class TestActionQueue(TestCase):
     actionQueue.on_background_command_complete_callback = wraped(actionQueue.on_background_command_complete_callback,
                                                                  None, command_complete_w)
     actionQueue.put([self.background_command])
-    actionQueue.processBackgroundQueueSafeEmpty();
+    actionQueue.process_background_queue_safe_empty();
     
     with lock:
       complete_done.wait(0.1)
diff --git a/ambari-agent/src/test/python/ambari_agent/TestCommandHooksOrchestrator.py b/ambari-agent/src/test/python/ambari_agent/TestCommandHooksOrchestrator.py
new file mode 100644
index 0000000..29256fc
--- /dev/null
+++ b/ambari-agent/src/test/python/ambari_agent/TestCommandHooksOrchestrator.py
@@ -0,0 +1,89 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+
+import os
+from unittest import TestCase
+
+from ambari_agent.models.hooks import HookPrefix
+from mock.mock import patch
+from ambari_agent.CommandHooksOrchestrator import HookSequenceBuilder, ResolvedHooks, HooksOrchestrator
+
+
+class TestCommandHooksOrchestrator(TestCase):
+  def setUp(self):
+    def injector():
+      pass
+
+    def file_cache():
+      pass
+
+    file_cache.__setattr__("get_hook_base_dir", lambda x: os.path.join("tmp"))
+    injector.__setattr__("file_cache", file_cache)
+
+    self._orchestrator = HooksOrchestrator(injector)
+
+  @patch("os.path.isfile")
+  def test_check_orchestrator(self, is_file_mock):
+    is_file_mock.return_value = True
+
+    ret = self._orchestrator.resolve_hooks({
+     "commandType": "EXECUTION_COMMAND",
+     "serviceName": "ZOOKEEPER",
+     "role": "ZOOKEEPER_SERVER"
+    }, "START")
+
+    self.assertTrue(ret)
+    self.assertEquals(len(ret.post_hooks), 3)
+    self.assertEquals(len(ret.pre_hooks), 3)
+
+  def test_hook_seq_builder(self):
+    seq = list(HookSequenceBuilder().build(HookPrefix.pre, "cmd", "srv", "role"))
+    seq_rev = list(HookSequenceBuilder().build(HookPrefix.post, "cmd", "srv", "role"))
+
+    # testing base default sequence definition
+    check_list = [
+      "before-cmd",
+      "before-cmd-srv",
+      "before-cmd-srv-role"
+    ]
+
+    check_list_1 = [
+      "after-cmd-srv-role",
+      "after-cmd-srv",
+      "after-cmd"
+    ]
+
+    self.assertEquals(seq, check_list)
+    self.assertEquals(seq_rev, check_list_1)
+
+  def test_hook_resolved(self):
+    def pre():
+      for i in range(1, 5):
+        yield i
+
+    def post():
+      for i in range(1, 3):
+        yield i
+
+    ret = ResolvedHooks(pre(), post())
+
+    self.assertEqual(ret.pre_hooks, list(pre()))
+    self.assertEqual(ret.post_hooks, list(post()))
+
+
+
diff --git a/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py b/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py
index a885289..110a943 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py
@@ -22,6 +22,8 @@ from multiprocessing.pool import ThreadPool
 import os
 
 import pprint
+
+from ambari_agent.models.commands import CommandStatus
 from ambari_commons import shell
 
 from unittest import TestCase
@@ -401,7 +403,7 @@ class TestCustomServiceOrchestrator:#(TestCase):
     get_configuration_mock.return_value = execute_command
     
     actionQueue.put([execute_command])
-    actionQueue.processBackgroundQueueSafeEmpty()
+    actionQueue.process_background_queue_safe_empty()
 
     time.sleep(.1)
 
@@ -419,7 +421,7 @@ class TestCustomServiceOrchestrator:#(TestCase):
 
     runningCommand = actionQueue.commandStatuses.get_command_status(19)
     self.assertTrue(runningCommand is not None)
-    self.assertEqual(runningCommand['status'], ActionQueue.FAILED_STATUS)
+    self.assertEqual(runningCommand['status'], CommandStatus.failed)
 
 
   @patch.object(ConfigurationBuilder, "get_configuration")
@@ -565,7 +567,7 @@ class TestCustomServiceOrchestrator:#(TestCase):
     import TestActionQueue
     pyex = PythonExecutor(orchestrator.tmp_dir, orchestrator.config)
     TestActionQueue.patch_output_file(pyex)
-    pyex.condenseOutput = MagicMock()
+    pyex.condense_output = MagicMock()
     get_py_executor_mock.return_value = pyex
     orchestrator.dump_command_to_json = MagicMock()
 
diff --git a/ambari-agent/src/test/python/ambari_agent/TestFileCache.py b/ambari-agent/src/test/python/ambari_agent/TestFileCache.py
index 383050d..829f065 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestFileCache.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestFileCache.py
@@ -67,9 +67,12 @@ class TestFileCache(TestCase):
     command = {
       'commandParams' : {
         'service_package_folder' : os.path.join('stacks', 'HDP', '2.1.1', 'services', 'ZOOKEEPER', 'package')
+      },
+      'ambariLevelParams': {
+        'jdk_location': 'server_url_pref'
       }
     }
-    res = fileCache.get_service_base_dir(command, "server_url_pref")
+    res = fileCache.get_service_base_dir(command)
     self.assertEquals(
       pprint.pformat(provide_directory_mock.call_args_list[0][0]),
       "('/var/lib/ambari-agent/cache',\n "
@@ -84,9 +87,12 @@ class TestFileCache(TestCase):
     # Check missing parameter
     command = {
       'clusterLevelParams' : {
+      },
+      'ambariLevelParams': {
+        'jdk_location': 'server_url_pref'
       }
     }
-    base = fileCache.get_hook_base_dir(command, "server_url_pref")
+    base = fileCache.get_hook_base_dir(command)
     self.assertEqual(base, None)
     self.assertFalse(provide_directory_mock.called)
 
@@ -94,11 +100,14 @@ class TestFileCache(TestCase):
     command = {
       'clusterLevelParams' : {
         'hooks_folder' : 'stack-hooks'
+      },
+      'ambariLevelParams': {
+        'jdk_location': 'server_url_pref'
       }
     }
     provide_directory_mock.return_value = "dummy value"
     fileCache = FileCache(self.config)
-    res = fileCache.get_hook_base_dir(command, "server_url_pref")
+    res = fileCache.get_hook_base_dir(command)
     self.assertEquals(
       pprint.pformat(provide_directory_mock.call_args_list[0][0]),
       "('/var/lib/ambari-agent/cache', "
@@ -111,7 +120,11 @@ class TestFileCache(TestCase):
   def test_get_custom_actions_base_dir(self, provide_directory_mock):
     provide_directory_mock.return_value = "dummy value"
     fileCache = FileCache(self.config)
-    res = fileCache.get_custom_actions_base_dir("server_url_pref")
+    res = fileCache.get_custom_actions_base_dir({
+      'ambariLevelParams': {
+        'jdk_location': 'server_url_pref'
+      }
+    })
     self.assertEquals(
       pprint.pformat(provide_directory_mock.call_args_list[0][0]),
       "('/var/lib/ambari-agent/cache', 'custom_actions', 'server_url_pref')")
@@ -125,10 +138,13 @@ class TestFileCache(TestCase):
     command = {
       'commandParams': {
         'custom_folder' : 'dashboards'
-      }
+      },
+      'ambariLevelParams': {
+          'jdk_location': 'server_url_pref'
+        }
     }
 
-    res = fileCache.get_custom_resources_subdir(command, "server_url_pref")
+    res = fileCache.get_custom_resources_subdir(command)
     self.assertEquals(
       pprint.pformat(provide_directory_mock.call_args_list[0][0]),
       "('/var/lib/ambari-agent/cache', 'dashboards', 'server_url_pref')")
diff --git a/ambari-agent/src/test/python/ambari_agent/TestPythonExecutor.py b/ambari-agent/src/test/python/ambari_agent/TestPythonExecutor.py
index 472bf4c..76aa4e8 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestPythonExecutor.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestPythonExecutor.py
@@ -115,7 +115,7 @@ class TestPythonExecutor(TestCase):
     tmpstructuredoutfile = tmp_file.name
     tmp_file.close()
 
-    PYTHON_TIMEOUT_SECONDS =  5
+    PYTHON_TIMEOUT_SECONDS = 5
 
     def launch_python_subprocess32_method(command, tmpout, tmperr):
       subproc_mock.tmpout = tmpout
@@ -140,12 +140,12 @@ class TestPythonExecutor(TestCase):
     executor = PythonExecutor("/tmp", AmbariConfig())
 
     executor.python_process_has_been_killed = False
-    self.assertTrue(executor.isSuccessfull(0))
-    self.assertFalse(executor.isSuccessfull(1))
+    self.assertTrue(executor.is_successful(0))
+    self.assertFalse(executor.is_successful(1))
 
     executor.python_process_has_been_killed = True
-    self.assertFalse(executor.isSuccessfull(0))
-    self.assertFalse(executor.isSuccessfull(1))
+    self.assertFalse(executor.is_successful(0))
+    self.assertFalse(executor.is_successful(1))
 
 
   @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
diff --git a/ambari-common/src/main/python/resource_management/libraries/script/hook.py b/ambari-common/src/main/python/resource_management/libraries/script/hook.py
index 26b73a3..63edcbe 100644
--- a/ambari-common/src/main/python/resource_management/libraries/script/hook.py
+++ b/ambari-common/src/main/python/resource_management/libraries/script/hook.py
@@ -1,6 +1,6 @@
 #!/usr/bin/env ambari-python-wrap
 
-'''
+"""
 Licensed to the Apache Software Foundation (ASF) under one
 or more contributor license agreements.  See the NOTICE file
 distributed with this work for additional information
@@ -16,7 +16,7 @@ distributed under the License is distributed on an "AS IS" BASIS,
 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
-'''
+"""
 
 __all__ = ["Hook"]
 
@@ -24,14 +24,14 @@ from resource_management.libraries.script import Script
 from ambari_commons import subprocess32
 import sys
 
+
 class Hook(Script):
   """
   Executes a hook for a command for custom service. stdout and stderr are written to
   tmpoutfile and to tmperrfile respectively.
   """
 
-  HOOK_METHOD_NAME = "hook" # This method is always executed at hooks
-
+  HOOK_METHOD_NAME = "hook"  # This method is always executed at hooks
 
   def choose_method_to_execute(self, command_name):
     """
@@ -39,28 +39,25 @@ class Hook(Script):
     """
     return super(Hook, self).choose_method_to_execute(self.HOOK_METHOD_NAME)
 
-
   def run_custom_hook(self, command):
     """
     Runs custom hook
     """
     args = sys.argv
     
-    #Hook script to run
+    # Hook script to run
     args[0] = args[0].replace('before-'+args[1], command)
     args[0] = args[0].replace('after-'+args[1], command)
     
-    #Hook script base directory
+    # Hook script base directory
     args[3] = args[3].replace('before-'+args[1], command)
     args[3] = args[3].replace('after-'+args[1], command)
     
     args[1] = command.split("-")[1]
 
-
     cmd = [sys.executable]
     cmd.extend(args)
 
     if subprocess32.call(cmd) != 0:
-      self.fail_with_error("Error: Unable to run the custom hook script " +
-                           cmd.__str__())
+      self.fail_with_error("Error: Unable to run the custom hook script " + cmd.__str__())
 
diff --git a/ambari-server/src/main/resources/custom_actions/scripts/ru_execute_tasks.py b/ambari-server/src/main/resources/custom_actions/scripts/ru_execute_tasks.py
index 11537f8..92d1b89 100644
--- a/ambari-server/src/main/resources/custom_actions/scripts/ru_execute_tasks.py
+++ b/ambari-server/src/main/resources/custom_actions/scripts/ru_execute_tasks.py
@@ -117,8 +117,6 @@ class ExecuteUpgradeTasks(Script):
         if task.script and task.function:
           file_cache = FileCache(agent_config)
 
-          server_url_prefix = default('/ambariLevelParams/jdk_location', "")
-
           if service_package_folder and hooks_folder:
             command_paths = {
               "commandParams": {
@@ -126,12 +124,19 @@ class ExecuteUpgradeTasks(Script):
               },
               "clusterLevelParams": {
                    "hooks_folder": hooks_folder
+              },
+              "ambariLevelParams": {
+                "jdk_location": default('/ambariLevelParams/jdk_location', "")
               }
             } 
 
-            base_dir = file_cache.get_service_base_dir(command_paths, server_url_prefix)
+            base_dir = file_cache.get_service_base_dir(command_paths)
           else:
-            base_dir = file_cache.get_custom_actions_base_dir(server_url_prefix)
+            base_dir = file_cache.get_custom_actions_base_dir({
+              "ambariLevelParams": {
+                "jdk_location": default('/ambariLevelParams/jdk_location', "")
+              }
+            })
 
           script_path = os.path.join(base_dir, task.script)
           if not os.path.exists(script_path):
diff --git a/ambari-server/src/main/resources/stack-hooks/after-INSTALL/scripts/hook.py b/ambari-server/src/main/resources/stack-hooks/after-INSTALL/scripts/hook.py
index 8bae9e6..39546b1 100644
--- a/ambari-server/src/main/resources/stack-hooks/after-INSTALL/scripts/hook.py
+++ b/ambari-server/src/main/resources/stack-hooks/after-INSTALL/scripts/hook.py
@@ -22,6 +22,7 @@ from shared_initialization import link_configs
 from shared_initialization import setup_config
 from shared_initialization import setup_stack_symlinks
 
+
 class AfterInstallHook(Hook):
 
   def hook(self, env):
@@ -33,5 +34,6 @@ class AfterInstallHook(Hook):
 
     link_configs(self.stroutfile)
 
+
 if __name__ == "__main__":
   AfterInstallHook().execute()
diff --git a/ambari-server/src/main/resources/stack-hooks/before-ANY/scripts/hook.py b/ambari-server/src/main/resources/stack-hooks/before-ANY/scripts/hook.py
index c34be0b..8b93f7b 100644
--- a/ambari-server/src/main/resources/stack-hooks/before-ANY/scripts/hook.py
+++ b/ambari-server/src/main/resources/stack-hooks/before-ANY/scripts/hook.py
@@ -17,8 +17,10 @@ limitations under the License.
 
 """
 
-from resource_management import *
-from shared_initialization import *
+
+from shared_initialization import setup_users, setup_hadoop_env, setup_java
+from resource_management import Hook
+
 
 class BeforeAnyHook(Hook):
 
@@ -31,6 +33,7 @@ class BeforeAnyHook(Hook):
       setup_hadoop_env()
     setup_java()
 
+
 if __name__ == "__main__":
   BeforeAnyHook().execute()
 
diff --git a/ambari-server/src/main/resources/stack-hooks/before-INSTALL/scripts/hook.py b/ambari-server/src/main/resources/stack-hooks/before-INSTALL/scripts/hook.py
index ce17776..c470965 100644
--- a/ambari-server/src/main/resources/stack-hooks/before-INSTALL/scripts/hook.py
+++ b/ambari-server/src/main/resources/stack-hooks/before-INSTALL/scripts/hook.py
@@ -16,11 +16,10 @@ See the License for the specific language governing permissions and
 limitations under the License.
 
 """
+from resource_management import Hook
+from shared_initialization import install_packages
+from repo_initialization import install_repos
 
-import sys
-from resource_management import *
-from shared_initialization import *
-from repo_initialization import *
 
 class BeforeInstallHook(Hook):
 
@@ -33,5 +32,6 @@ class BeforeInstallHook(Hook):
     install_repos()
     install_packages()
 
+
 if __name__ == "__main__":
   BeforeInstallHook().execute()
diff --git a/ambari-server/src/main/resources/stack-hooks/before-RESTART/scripts/hook.py b/ambari-server/src/main/resources/stack-hooks/before-RESTART/scripts/hook.py
index 14b9d99..f7f4f1c 100644
--- a/ambari-server/src/main/resources/stack-hooks/before-RESTART/scripts/hook.py
+++ b/ambari-server/src/main/resources/stack-hooks/before-RESTART/scripts/hook.py
@@ -16,14 +16,15 @@ See the License for the specific language governing permissions and
 limitations under the License.
 
 """
+from resource_management import Hook
 
-from resource_management import *
 
 class BeforeRestartHook(Hook):
 
   def hook(self, env):
     self.run_custom_hook('before-START')
 
+
 if __name__ == "__main__":
   BeforeRestartHook().execute()
 
diff --git a/ambari-server/src/main/resources/stack-hooks/before-SET_KEYTAB/scripts/hook.py b/ambari-server/src/main/resources/stack-hooks/before-SET_KEYTAB/scripts/hook.py
index 4d028fc..289475b 100644
--- a/ambari-server/src/main/resources/stack-hooks/before-SET_KEYTAB/scripts/hook.py
+++ b/ambari-server/src/main/resources/stack-hooks/before-SET_KEYTAB/scripts/hook.py
@@ -16,8 +16,8 @@ See the License for the specific language governing permissions and
 limitations under the License.
 
 """
+from resource_management import Hook
 
-from resource_management import *
 
 class BeforeSetKeytabHook(Hook):
 
@@ -33,6 +33,7 @@ class BeforeSetKeytabHook(Hook):
     """
     self.run_custom_hook('before-ANY')
 
+
 if __name__ == "__main__":
   BeforeSetKeytabHook().execute()
 
diff --git a/ambari-server/src/main/resources/stack-hooks/before-START/scripts/hook.py b/ambari-server/src/main/resources/stack-hooks/before-START/scripts/hook.py
index 4cb276a..2f68cb1 100644
--- a/ambari-server/src/main/resources/stack-hooks/before-START/scripts/hook.py
+++ b/ambari-server/src/main/resources/stack-hooks/before-START/scripts/hook.py
@@ -16,13 +16,12 @@ See the License for the specific language governing permissions and
 limitations under the License.
 
 """
-
-import sys
-from resource_management import *
 from rack_awareness import create_topology_script_and_mapping
-from shared_initialization import setup_hadoop, setup_configs, create_javahome_symlink, setup_unlimited_key_jce_policy
+from shared_initialization import setup_hadoop, setup_configs, create_javahome_symlink, setup_unlimited_key_jce_policy, \
+  Hook
 from custom_extensions import setup_extensions
 
+
 class BeforeStartHook(Hook):
 
   def hook(self, env):
@@ -39,5 +38,6 @@ class BeforeStartHook(Hook):
     if params.stack_supports_hadoop_custom_extensions:
       setup_extensions()
 
+
 if __name__ == "__main__":
   BeforeStartHook().execute()