You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ao...@apache.org on 2014/08/01 15:22:08 UTC
[3/5] AMBARI-6682. Add generate_config method to Script (aonishuk)
http://git-wip-us.apache.org/repos/asf/ambari/blob/55bbcae4/ambari-agent/src/main/python/resource_management/libraries/script/script.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/resource_management/libraries/script/script.py b/ambari-agent/src/main/python/resource_management/libraries/script/script.py
deleted file mode 100644
index bf5bae5..0000000
--- a/ambari-agent/src/main/python/resource_management/libraries/script/script.py
+++ /dev/null
@@ -1,229 +0,0 @@
-#!/usr/bin/env python
-
-'''
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-'''
-
-__all__ = ["Script"]
-
-import os
-import sys
-import json
-import logging
-
-from resource_management.core.environment import Environment
-from resource_management.core.exceptions import Fail, ClientComponentHasNoStatus, ComponentIsNotRunning
-from resource_management.core.resources.packaging import Package
-from resource_management.libraries.script.config_dictionary import ConfigDictionary
-
-USAGE = """Usage: {0} <COMMAND> <JSON_CONFIG> <BASEDIR> <STROUTPUT> <LOGGING_LEVEL> <TMP_DIR>
-
-<COMMAND> command type (INSTALL/CONFIGURE/START/STOP/SERVICE_CHECK...)
-<JSON_CONFIG> path to command json file. Ex: /var/lib/ambari-agent/data/command-2.json
-<BASEDIR> path to service metadata dir. Ex: /var/lib/ambari-agent/cache/stacks/HDP/2.0.6/services/HDFS
-<STROUTPUT> path to file with structured command output (file will be created). Ex:/tmp/my.txt
-<LOGGING_LEVEL> log level for stdout. Ex:DEBUG,INFO
-<TMP_DIR> temporary directory for executable scripts. Ex: /var/lib/ambari-agent/data/tmp
-"""
-
-class Script(object):
- """
- Executes a command for custom service. stdout and stderr are written to
- tmpoutfile and to tmperrfile respectively.
- Script instances share configuration as a class parameter and therefore
- different Script instances can not be used from different threads at
- the same time within a single python process
-
- Accepted command line arguments mapping:
- 1 command type (START/STOP/...)
- 2 path to command json file
- 3 path to service metadata dir (Directory "package" inside service directory)
- 4 path to file with structured command output (file will be created)
- """
- structuredOut = {}
-
- def put_structured_out(self, sout):
- Script.structuredOut.update(sout)
- try:
- with open(self.stroutfile, 'w') as fp:
- json.dump(Script.structuredOut, fp)
- except IOError:
- Script.structuredOut.update({"errMsg" : "Unable to write to " + self.stroutfile})
-
- def execute(self):
- """
- Sets up logging;
- Parses command parameters and executes method relevant to command type
- """
- # set up logging (two separate loggers for stderr and stdout with different loglevels)
- logger = logging.getLogger('resource_management')
- logger.setLevel(logging.DEBUG)
- formatter = logging.Formatter('%(asctime)s - %(message)s')
- chout = logging.StreamHandler(sys.stdout)
- chout.setLevel(logging.INFO)
- chout.setFormatter(formatter)
- cherr = logging.StreamHandler(sys.stderr)
- cherr.setLevel(logging.ERROR)
- cherr.setFormatter(formatter)
- logger.addHandler(cherr)
- logger.addHandler(chout)
-
- # parse arguments
- if len(sys.argv) < 7:
- logger.error("Script expects at least 6 arguments")
- print USAGE.format(os.path.basename(sys.argv[0])) # print to stdout
- sys.exit(1)
-
- command_name = str.lower(sys.argv[1])
- command_data_file = sys.argv[2]
- basedir = sys.argv[3]
- self.stroutfile = sys.argv[4]
- logging_level = sys.argv[5]
- Script.tmp_dir = sys.argv[6]
-
- logging_level_str = logging._levelNames[logging_level]
- chout.setLevel(logging_level_str)
- logger.setLevel(logging_level_str)
-
- try:
- with open(command_data_file, "r") as f:
- pass
- Script.config = ConfigDictionary(json.load(f))
- except IOError:
- logger.exception("Can not read json file with command parameters: ")
- sys.exit(1)
- # Run class method depending on a command type
- try:
- method = self.choose_method_to_execute(command_name)
- with Environment(basedir) as env:
- method(env)
- except ClientComponentHasNoStatus or ComponentIsNotRunning:
- # Support of component status checks.
- # Non-zero exit code is interpreted as an INSTALLED status of a component
- sys.exit(1)
- except Fail:
- logger.exception("Error while executing command '{0}':".format(command_name))
- sys.exit(1)
-
-
- def choose_method_to_execute(self, command_name):
- """
- Returns a callable object that should be executed for a given command.
- """
- self_methods = dir(self)
- if not command_name in self_methods:
- raise Fail("Script '{0}' has no method '{1}'".format(sys.argv[0], command_name))
- method = getattr(self, command_name)
- return method
-
-
- @staticmethod
- def get_config():
- """
- HACK. Uses static field to store configuration. This is a workaround for
- "circular dependency" issue when importing params.py file and passing to
- it a configuration instance.
- """
- return Script.config
-
-
- @staticmethod
- def get_tmp_dir():
- """
- HACK. Uses static field to avoid "circular dependency" issue when
- importing params.py.
- """
- return Script.tmp_dir
-
-
- def install(self, env):
- """
- Default implementation of install command is to install all packages
- from a list, received from the server.
- Feel free to override install() method with your implementation. It
- usually makes sense to call install_packages() manually in this case
- """
- self.install_packages(env)
-
-
- def install_packages(self, env, exclude_packages=[]):
- """
- List of packages that are required< by service is received from the server
- as a command parameter. The method installs all packages
- from this list
- """
- config = self.get_config()
-
- try:
- package_list_str = config['hostLevelParams']['package_list']
- if isinstance(package_list_str,basestring) and len(package_list_str) > 0:
- package_list = json.loads(package_list_str)
- for package in package_list:
- if not package['name'] in exclude_packages:
- name = package['name']
- Package(name)
- except KeyError:
- pass # No reason to worry
-
- #RepoInstaller.remove_repos(config)
-
-
-
- def fail_with_error(self, message):
- """
- Prints error message and exits with non-zero exit code
- """
- print("Error: " + message)
- sys.stderr.write("Error: " + message)
- sys.exit(1)
-
- def start(self, env):
- """
- To be overridden by subclasses
- """
- self.fail_with_error('start method isn\'t implemented')
-
- def stop(self, env):
- """
- To be overridden by subclasses
- """
- self.fail_with_error('stop method isn\'t implemented')
-
- def restart(self, env):
- """
- Default implementation of restart command is to call stop and start methods
- Feel free to override restart() method with your implementation.
- For client components we call install
- """
- config = self.get_config()
- componentCategory = None
- try :
- componentCategory = config['roleParams']['component_category']
- except KeyError:
- pass
-
- if componentCategory and componentCategory.strip().lower() == 'CLIENT'.lower():
- self.install(env)
- else:
- self.stop(env)
- self.start(env)
-
- def configure(self, env):
- """
- To be overridden by subclasses
- """
- self.fail_with_error('configure method isn\'t implemented')
http://git-wip-us.apache.org/repos/asf/ambari/blob/55bbcae4/ambari-common/src/main/python/resource_management/__init__.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/resource_management/__init__.py b/ambari-common/src/main/python/resource_management/__init__.py
new file mode 100644
index 0000000..fee91fd
--- /dev/null
+++ b/ambari-common/src/main/python/resource_management/__init__.py
@@ -0,0 +1,26 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+Ambari Agent
+
+"""
+
+from resource_management.libraries import *
+from resource_management.core import *
+
+
http://git-wip-us.apache.org/repos/asf/ambari/blob/55bbcae4/ambari-common/src/main/python/resource_management/core/__init__.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/resource_management/core/__init__.py b/ambari-common/src/main/python/resource_management/core/__init__.py
new file mode 100644
index 0000000..1af793b
--- /dev/null
+++ b/ambari-common/src/main/python/resource_management/core/__init__.py
@@ -0,0 +1,33 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+Ambari Agent
+
+"""
+
+from resource_management.core.base import *
+from resource_management.core.environment import *
+from resource_management.core.exceptions import *
+from resource_management.core.providers import *
+from resource_management.core.resources import *
+from resource_management.core.source import *
+from resource_management.core.system import *
+from resource_management.core.shell import *
+from resource_management.core.logger import *
+
+__version__ = "0.4.1"
http://git-wip-us.apache.org/repos/asf/ambari/blob/55bbcae4/ambari-common/src/main/python/resource_management/core/base.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/resource_management/core/base.py b/ambari-common/src/main/python/resource_management/core/base.py
new file mode 100644
index 0000000..52f1dff
--- /dev/null
+++ b/ambari-common/src/main/python/resource_management/core/base.py
@@ -0,0 +1,173 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+Ambari Agent
+
+"""
+
+__all__ = ["Resource", "ResourceArgument", "ForcedListArgument",
+ "BooleanArgument"]
+
+from resource_management.core.exceptions import Fail, InvalidArgument
+from resource_management.core.environment import Environment
+from resource_management.core.logger import Logger
+
+class ResourceArgument(object):
+ def __init__(self, default=None, required=False):
+ self.required = False # Prevents the initial validate from failing
+ if hasattr(default, '__call__'):
+ self.default = default
+ else:
+ self.default = self.validate(default)
+ self.required = required
+
+ def validate(self, value):
+ if self.required and value is None:
+ raise InvalidArgument("Required argument %s missing" % self.name)
+ return value
+
+
+class ForcedListArgument(ResourceArgument):
+ def validate(self, value):
+ value = super(ForcedListArgument, self).validate(value)
+ if not isinstance(value, (tuple, list)):
+ value = [value]
+ return value
+
+
+class BooleanArgument(ResourceArgument):
+ def validate(self, value):
+ value = super(BooleanArgument, self).validate(value)
+ if not value in (True, False):
+ raise InvalidArgument(
+ "Expected a boolean for %s received %r" % (self.name, value))
+ return value
+
+
+class Accessor(object):
+ def __init__(self, name):
+ self.name = name
+
+ def __get__(self, obj, cls):
+ try:
+ return obj.arguments[self.name]
+ except KeyError:
+ val = obj._arguments[self.name].default
+ if hasattr(val, '__call__'):
+ val = val(obj)
+ return val
+
+ def __set__(self, obj, value):
+ obj.arguments[self.name] = obj._arguments[self.name].validate(value)
+
+
+class ResourceMetaclass(type):
+ # def __new__(cls, name, bases, attrs):
+ # super_new = super(ResourceMetaclass, cls).__new__
+ # return super_new(cls, name, bases, attrs)
+
+ def __init__(mcs, _name, bases, attrs):
+ mcs._arguments = getattr(bases[0], '_arguments', {}).copy()
+ for key, value in list(attrs.items()):
+ if isinstance(value, ResourceArgument):
+ value.name = key
+ mcs._arguments[key] = value
+ setattr(mcs, key, Accessor(key))
+
+
+class Resource(object):
+ __metaclass__ = ResourceMetaclass
+
+ action = ForcedListArgument(default="nothing")
+ ignore_failures = BooleanArgument(default=False)
+ not_if = ResourceArgument() # pass command e.g. not_if = ('ls','/root/jdk')
+ only_if = ResourceArgument() # pass command
+ initial_wait = ResourceArgument() # in seconds
+
+ actions = ["nothing"]
+
+ def __new__(cls, name, env=None, provider=None, **kwargs):
+ if isinstance(name, list):
+ while len(name) != 1:
+ cls(name.pop(0), env, provider, **kwargs)
+
+ name = name[0]
+
+ env = env or Environment.get_instance()
+ provider = provider or getattr(cls, 'provider', None)
+
+ r_type = cls.__name__
+ if r_type not in env.resources:
+ env.resources[r_type] = {}
+
+ obj = super(Resource, cls).__new__(cls)
+ env.resources[r_type][name] = obj
+ env.resource_list.append(obj)
+ return obj
+
+ def __init__(self, name, env=None, provider=None, **kwargs):
+ if isinstance(name, list):
+ name = name.pop(0)
+
+ if hasattr(self, 'name'):
+ return
+
+ self.env = env or Environment.get_instance()
+ self.name = name
+
+ self.provider = provider or getattr(self, 'provider', None)
+
+ self.arguments = {}
+ for key, value in kwargs.items():
+ try:
+ arg = self._arguments[key]
+ except KeyError:
+ raise Fail("%s received unsupported argument %s" % (self, key))
+ else:
+ try:
+ self.arguments[key] = arg.validate(value)
+ except InvalidArgument, exc:
+ raise InvalidArgument("%s %s" % (self, exc))
+
+ if not self.env.test_mode:
+ self.env.run()
+
+ def validate(self):
+ pass
+
+ def __repr__(self):
+ return "%s['%s']" % (self.__class__.__name__, self.name)
+
+ def __unicode__(self):
+ return u"%s['%s']" % (self.__class__.__name__, self.name)
+
+ def __getstate__(self):
+ return dict(
+ name=self.name,
+ provider=self.provider,
+ arguments=self.arguments,
+ env=self.env,
+ )
+
+ def __setstate__(self, state):
+ self.name = state['name']
+ self.provider = state['provider']
+ self.arguments = state['arguments']
+ self.env = state['env']
+
+ self.validate()
http://git-wip-us.apache.org/repos/asf/ambari/blob/55bbcae4/ambari-common/src/main/python/resource_management/core/environment.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/resource_management/core/environment.py b/ambari-common/src/main/python/resource_management/core/environment.py
new file mode 100644
index 0000000..8f0ec27
--- /dev/null
+++ b/ambari-common/src/main/python/resource_management/core/environment.py
@@ -0,0 +1,198 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+Ambari Agent
+
+"""
+
+__all__ = ["Environment"]
+
+import os
+import shutil
+import time
+from datetime import datetime
+
+from resource_management.core import shell
+from resource_management.core.exceptions import Fail
+from resource_management.core.providers import find_provider
+from resource_management.core.utils import AttributeDictionary
+from resource_management.core.system import System
+from resource_management.core.logger import Logger
+
+
+class Environment(object):
+ _instances = []
+
+ def __init__(self, basedir=None, test_mode=False):
+ """
+ @param basedir: basedir/files, basedir/templates are the places where templates / static files
+ are looked up
+ @param test_mode: if this is enabled, resources won't be executed until manualy running env.run().
+ """
+ self.reset(basedir, test_mode)
+
+ def reset(self, basedir, test_mode):
+ self.system = System.get_instance()
+ self.config = AttributeDictionary()
+ self.resources = {}
+ self.resource_list = []
+ self.delayed_actions = set()
+ self.test_mode = test_mode
+ self.update_config({
+ # current time
+ 'date': datetime.now(),
+ # backups here files which were rewritten while executing File resource
+ 'backup.path': '/tmp/resource_management/backup',
+ # prefix for this files
+ 'backup.prefix': datetime.now().strftime("%Y%m%d%H%M%S"),
+ # dir where templates,failes dirs are
+ 'basedir': basedir,
+ # variables, which can be used in templates
+ 'params': {},
+ })
+
+ def backup_file(self, path):
+ if self.config.backup:
+ if not os.path.exists(self.config.backup.path):
+ os.makedirs(self.config.backup.path, 0700)
+ new_name = self.config.backup.prefix + path.replace('/', '-')
+ backup_path = os.path.join(self.config.backup.path, new_name)
+ Logger.info("backing up %s to %s" % (path, backup_path))
+ shutil.copy(path, backup_path)
+
+ def update_config(self, attributes, overwrite=True):
+ for key, value in attributes.items():
+ attr = self.config
+ path = key.split('.')
+ for pth in path[:-1]:
+ if pth not in attr:
+ attr[pth] = AttributeDictionary()
+ attr = attr[pth]
+ if overwrite or path[-1] not in attr:
+ attr[path[-1]] = value
+
+ def set_params(self, arg):
+ """
+ @param arg: is a dictionary of configurations, or a module with the configurations
+ """
+ if isinstance(arg, dict):
+ variables = arg
+ else:
+ variables = dict((var, getattr(arg, var)) for var in dir(arg))
+
+ for variable, value in variables.iteritems():
+ # don't include system variables, methods, classes, modules
+ if not variable.startswith("__") and \
+ not hasattr(value, '__call__')and \
+ not hasattr(value, '__file__'):
+ self.config.params[variable] = value
+
+ def run_action(self, resource, action):
+ Logger.debug("Performing action %s on %s" % (action, resource))
+
+ provider_class = find_provider(self, resource.__class__.__name__,
+ resource.provider)
+ provider = provider_class(resource)
+ try:
+ provider_action = getattr(provider, 'action_%s' % action)
+ except AttributeError:
+ raise Fail("%r does not implement action %s" % (provider, action))
+ provider_action()
+
+ def _check_condition(self, cond):
+ if hasattr(cond, '__call__'):
+ return cond()
+
+ if isinstance(cond, basestring):
+ ret, out = shell.call(cond)
+ return ret == 0
+
+ raise Exception("Unknown condition type %r" % cond)
+
+ def run(self):
+ with self:
+ # Run resource actions
+ while self.resource_list:
+ resource = self.resource_list.pop(0)
+ Logger.info_resource(resource)
+
+ if resource.initial_wait:
+ time.sleep(resource.initial_wait)
+
+ if resource.not_if is not None and self._check_condition(
+ resource.not_if):
+ Logger.info("Skipping %s due to not_if" % resource)
+ continue
+
+ if resource.only_if is not None and not self._check_condition(
+ resource.only_if):
+ Logger.info("Skipping %s due to only_if" % resource)
+ continue
+
+ for action in resource.action:
+ if not resource.ignore_failures:
+ self.run_action(resource, action)
+ else:
+ try:
+ self.run_action(resource, action)
+ except Exception as ex:
+ Logger.info("Skipping failure of %s due to ignore_failures. Failure reason: %s" % (resource, str(ex)))
+ pass
+
+ # Run delayed actions
+ while self.delayed_actions:
+ action, resource = self.delayed_actions.pop()
+ self.run_action(resource, action)
+
+ @classmethod
+ def get_instance(cls):
+ return cls._instances[-1]
+
+ @classmethod
+ def get_instance_copy(cls):
+ """
+ Copy only configurations, but not resources execution state
+ """
+ old_instance = cls.get_instance()
+ new_instance = Environment()
+ new_instance.config = old_instance.config.copy()
+
+ return new_instance
+
+ def __enter__(self):
+ self.__class__._instances.append(self)
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ self.__class__._instances.pop()
+ return False
+
+ def __getstate__(self):
+ return dict(
+ config=self.config,
+ resources=self.resources,
+ resource_list=self.resource_list,
+ delayed_actions=self.delayed_actions,
+ )
+
+ def __setstate__(self, state):
+ self.__init__()
+ self.config = state['config']
+ self.resources = state['resources']
+ self.resource_list = state['resource_list']
+ self.delayed_actions = state['delayed_actions']
http://git-wip-us.apache.org/repos/asf/ambari/blob/55bbcae4/ambari-common/src/main/python/resource_management/core/exceptions.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/resource_management/core/exceptions.py b/ambari-common/src/main/python/resource_management/core/exceptions.py
new file mode 100644
index 0000000..3c001cc
--- /dev/null
+++ b/ambari-common/src/main/python/resource_management/core/exceptions.py
@@ -0,0 +1,46 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+Ambari Agent
+
+"""
+
+class Fail(Exception):
+ pass
+
+class ExecuteTimeoutException(Exception):
+ pass
+
+class InvalidArgument(Fail):
+ pass
+
+class ClientComponentHasNoStatus(Fail):
+ """
+ Thrown when status() method is called for a CLIENT component.
+ The only valid status for CLIENT component is installed,
+ that's why exception is thrown and later silently processed at script.py
+ """
+ pass
+
+class ComponentIsNotRunning(Fail):
+ """
+ Thrown when status() method is called for a component (only
+ in situations when component process is not running).
+ Later exception is silently processed at script.py
+ """
+ pass
http://git-wip-us.apache.org/repos/asf/ambari/blob/55bbcae4/ambari-common/src/main/python/resource_management/core/logger.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/resource_management/core/logger.py b/ambari-common/src/main/python/resource_management/core/logger.py
new file mode 100644
index 0000000..da64f6a
--- /dev/null
+++ b/ambari-common/src/main/python/resource_management/core/logger.py
@@ -0,0 +1,92 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+Ambari Agent
+
+"""
+
+__all__ = ["Logger"]
+import logging
+from resource_management.libraries.script.config_dictionary import UnknownConfiguration
+
+class Logger:
+ logger = logging.getLogger("resource_management")
+
+ # unprotected_strings : protected_strings map
+ sensitive_strings = {}
+
+ @staticmethod
+ def info(text):
+ Logger.logger.info(Logger.get_protected_text(text))
+
+ @staticmethod
+ def debug(text):
+ Logger.logger.debug(Logger.get_protected_text(text))
+
+ @staticmethod
+ def info_resource(resource):
+ Logger.info(Logger.get_protected_text(Logger._get_resource_repr(resource)))
+
+ @staticmethod
+ def debug_resource(resource):
+ Logger.debug(Logger.get_protected_text(Logger._get_resource_repr(resource)))
+
+ @staticmethod
+ def get_protected_text(text):
+ """
+ Replace passwords with [PROTECTED]
+ """
+ for unprotected_string, protected_string in Logger.sensitive_strings.iteritems():
+ text = text.replace(unprotected_string, protected_string)
+
+ return text
+
+ @staticmethod
+ def _get_resource_repr(resource):
+ MESSAGE_MAX_LEN = 256
+ logger_level = logging._levelNames[Logger.logger.level]
+
+ arguments_str = ""
+ for x,y in resource.arguments.iteritems():
+
+ # strip unicode 'u' sign
+ if isinstance(y, unicode):
+ # don't show long messages
+ if len(y) > MESSAGE_MAX_LEN:
+ y = '...'
+ val = repr(y).lstrip('u')
+ # don't show dicts of configurations
+ # usually too long
+ elif logger_level != 'DEBUG' and isinstance(y, dict):
+ val = "..."
+ # for configs which didn't come
+ elif isinstance(y, UnknownConfiguration):
+ val = "[EMPTY]"
+ # correctly output 'mode' (as they are octal values like 0755)
+ elif y and x == 'mode':
+ val = oct(y)
+ else:
+ val = repr(y)
+
+
+ arguments_str += "'{0}': {1}, ".format(x, val)
+
+ if arguments_str:
+ arguments_str = arguments_str[:-2]
+
+ return "{0} {{{1}}}".format(resource, arguments_str)
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/55bbcae4/ambari-common/src/main/python/resource_management/core/providers/__init__.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/resource_management/core/providers/__init__.py b/ambari-common/src/main/python/resource_management/core/providers/__init__.py
new file mode 100644
index 0000000..f22dc74
--- /dev/null
+++ b/ambari-common/src/main/python/resource_management/core/providers/__init__.py
@@ -0,0 +1,83 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+Ambari Agent
+
+"""
+
+__all__ = ["Provider", "find_provider"]
+
+from resource_management.core.exceptions import Fail
+from resource_management.libraries.providers import PROVIDERS as LIBRARY_PROVIDERS
+
+
+class Provider(object):
+ def __init__(self, resource):
+ self.resource = resource
+
+ def action_nothing(self):
+ pass
+
+ def __repr__(self):
+ return self.__unicode__()
+
+ def __unicode__(self):
+ return u"%s[%s]" % (self.__class__.__name__, self.resource)
+
+
+PROVIDERS = dict(
+ redhat=dict(
+ Package="resource_management.core.providers.package.yumrpm.YumProvider",
+ ),
+ suse=dict(
+ Package="resource_management.core.providers.package.zypper.ZypperProvider",
+ ),
+ debian=dict(
+ Package="resource_management.core.providers.package.apt.AptProvider",
+ ),
+ default=dict(
+ File="resource_management.core.providers.system.FileProvider",
+ Directory="resource_management.core.providers.system.DirectoryProvider",
+ Link="resource_management.core.providers.system.LinkProvider",
+ Execute="resource_management.core.providers.system.ExecuteProvider",
+ ExecuteScript="resource_management.core.providers.system.ExecuteScriptProvider",
+ Mount="resource_management.core.providers.mount.MountProvider",
+ User="resource_management.core.providers.accounts.UserProvider",
+ Group="resource_management.core.providers.accounts.GroupProvider",
+ Service="resource_management.core.providers.service.ServiceProvider",
+ ),
+)
+
+
+def find_provider(env, resource, class_path=None):
+ if not class_path:
+ providers = [PROVIDERS, LIBRARY_PROVIDERS]
+ for provider in providers:
+ if resource in provider[env.system.os_family]:
+ class_path = provider[env.system.os_family][resource]
+ break
+ if resource in provider["default"]:
+ class_path = provider["default"][resource]
+ break
+
+ try:
+ mod_path, class_name = class_path.rsplit('.', 1)
+ except ValueError:
+ raise Fail("Unable to find provider for %s as %s" % (resource, class_path))
+ mod = __import__(mod_path, {}, {}, [class_name])
+ return getattr(mod, class_name)
http://git-wip-us.apache.org/repos/asf/ambari/blob/55bbcae4/ambari-common/src/main/python/resource_management/core/providers/accounts.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/resource_management/core/providers/accounts.py b/ambari-common/src/main/python/resource_management/core/providers/accounts.py
new file mode 100644
index 0000000..92a528b
--- /dev/null
+++ b/ambari-common/src/main/python/resource_management/core/providers/accounts.py
@@ -0,0 +1,116 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+Ambari Agent
+
+"""
+
+from __future__ import with_statement
+
+import grp
+import pwd
+from resource_management.core import shell
+from resource_management.core.providers import Provider
+from resource_management.core.logger import Logger
+
+
+class UserProvider(Provider):
+ def action_create(self):
+ if not self.user:
+ command = ['useradd', "-m"]
+ Logger.info("Adding user %s" % self.resource)
+ else:
+ command = ['usermod']
+ Logger.info("Modifying user %s" % (self.resource.username))
+
+ options = dict(
+ comment="-c",
+ gid="-g",
+ uid="-u",
+ shell="-s",
+ password="-p",
+ home="-d",
+ )
+
+ if self.resource.system and not self.user:
+ command.append("--system")
+
+ if self.resource.groups:
+ command += ["-G", ",".join(self.resource.groups)]
+
+ for option_name, option_flag in options.items():
+ option_value = getattr(self.resource, option_name)
+ if option_flag and option_value:
+ command += [option_flag, str(option_value)]
+
+ command.append(self.resource.username)
+
+ shell.checked_call(command)
+
+ def action_remove(self):
+ if self.user:
+ command = ['userdel', self.resource.username]
+ shell.checked_call(command)
+ Logger.info("Removed user %s" % self.resource)
+
+ @property
+ def user(self):
+ try:
+ return pwd.getpwnam(self.resource.username)
+ except KeyError:
+ return None
+
+
+class GroupProvider(Provider):
+ def action_create(self):
+ group = self.group
+ if not group:
+ command = ['groupadd']
+ Logger.info("Adding group %s" % self.resource)
+ else:
+ command = ['groupmod']
+ Logger.info("Modifying group %s" % (self.resource.group_name))
+
+ options = dict(
+ gid="-g",
+ password="-p",
+ )
+
+ for option_name, option_flag in options.items():
+ option_value = getattr(self.resource, option_name)
+ if option_flag and option_value:
+ command += [option_flag, str(option_value)]
+
+ command.append(self.resource.group_name)
+
+ shell.checked_call(command)
+
+ group = self.group
+
+ def action_remove(self):
+ if self.group:
+ command = ['groupdel', self.resource.group_name]
+ shell.checked_call(command)
+ Logger.info("Removed group %s" % self.resource)
+
+ @property
+ def group(self):
+ try:
+ return grp.getgrnam(self.resource.group_name)
+ except KeyError:
+ return None
http://git-wip-us.apache.org/repos/asf/ambari/blob/55bbcae4/ambari-common/src/main/python/resource_management/core/providers/mount.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/resource_management/core/providers/mount.py b/ambari-common/src/main/python/resource_management/core/providers/mount.py
new file mode 100644
index 0000000..dc6d7d9
--- /dev/null
+++ b/ambari-common/src/main/python/resource_management/core/providers/mount.py
@@ -0,0 +1,137 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+Ambari Agent
+
+"""
+
+from __future__ import with_statement
+
+import os
+import re
+from resource_management.core.base import Fail
+from resource_management.core.providers import Provider
+from resource_management.core.logger import Logger
+
+
+class MountProvider(Provider):
+ def action_mount(self):
+ if not os.path.exists(self.resource.mount_point):
+ os.makedirs(self.resource.mount_point)
+
+ if self.is_mounted():
+ Logger.debug("%s already mounted" % self)
+ else:
+ args = ["mount"]
+ if self.resource.fstype:
+ args += ["-t", self.resource.fstype]
+ if self.resource.options:
+ args += ["-o", ",".join(self.resource.options)]
+ if self.resource.device:
+ args.append(self.resource.device)
+ args.append(self.resource.mount_point)
+
+ check_call(args)
+
+ Logger.info("%s mounted" % self)
+
+ def action_umount(self):
+ if self.is_mounted():
+ check_call(["umount", self.resource.mount_point])
+
+ Logger.info("%s unmounted" % self)
+ else:
+ Logger.debug("%s is not mounted" % self)
+
+ def action_enable(self):
+ if self.is_enabled():
+ Logger.debug("%s already enabled" % self)
+ else:
+ if not self.resource.device:
+ raise Fail("[%s] device not set but required for enable action" % self)
+ if not self.resource.fstype:
+ raise Fail("[%s] fstype not set but required for enable action" % self)
+
+ with open("/etc/fstab", "a") as fp:
+ fp.write("%s %s %s %s %d %d\n" % (
+ self.resource.device,
+ self.resource.mount_point,
+ self.resource.fstype,
+ ",".join(self.resource.options or ["defaults"]),
+ self.resource.dump,
+ self.resource.passno,
+ ))
+
+ Logger.info("%s enabled" % self)
+
+ def action_disable(self):
+ pass # TODO
+
+ def is_mounted(self):
+ if not os.path.exists(self.resource.mount_point):
+ return False
+
+ if self.resource.device and not os.path.exists(self.resource.device):
+ raise Fail("%s Device %s does not exist" % (self, self.resource.device))
+
+ mounts = self.get_mounted()
+ for m in mounts:
+ if m['mount_point'] == self.resource.mount_point:
+ return True
+
+ return False
+
+ def is_enabled(self):
+ mounts = self.get_fstab()
+ for m in mounts:
+ if m['mount_point'] == self.resource.mount_point:
+ return True
+
+ return False
+
+ def get_mounted(self):
+ p = Popen("mount", stdout=PIPE, stderr=STDOUT, shell=True)
+ out = p.communicate()[0]
+ if p.wait() != 0:
+ raise Fail("[%s] Getting list of mounts (calling mount) failed" % self)
+
+ mounts = [x.split(' ') for x in out.strip().split('\n')]
+
+ return [dict(
+ device=m[0],
+ mount_point=m[2],
+ fstype=m[4],
+ options=m[5][1:-1].split(','),
+ ) for m in mounts if m[1] == "on" and m[3] == "type"]
+
+ def get_fstab(self):
+ mounts = []
+ with open("/etc/fstab", "r") as fp:
+ for line in fp:
+ line = line.split('#', 1)[0].strip()
+ mount = re.split('\s+', line)
+ if len(mount) == 6:
+ mounts.append(dict(
+ device=mount[0],
+ mount_point=mount[1],
+ fstype=mount[2],
+ options=mount[3].split(","),
+ dump=int(mount[4]),
+ passno=int(mount[5]),
+ ))
+ return mounts
http://git-wip-us.apache.org/repos/asf/ambari/blob/55bbcae4/ambari-common/src/main/python/resource_management/core/providers/package/__init__.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/resource_management/core/providers/package/__init__.py b/ambari-common/src/main/python/resource_management/core/providers/package/__init__.py
new file mode 100644
index 0000000..5ab2b27
--- /dev/null
+++ b/ambari-common/src/main/python/resource_management/core/providers/package/__init__.py
@@ -0,0 +1,55 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+Ambari Agent
+
+"""
+
+from resource_management.core.base import Fail
+from resource_management.core.providers import Provider
+
+
+class PackageProvider(Provider):
+ def __init__(self, *args, **kwargs):
+ super(PackageProvider, self).__init__(*args, **kwargs)
+
+ def install_package(self, name, version):
+ raise NotImplementedError()
+ def remove_package(self, name):
+ raise NotImplementedError()
+ def upgrade_package(self, name, version):
+ raise NotImplementedError()
+
+ def action_install(self):
+ package_name = self.get_package_name_with_version()
+ self.install_package(package_name)
+
+ def action_upgrade(self):
+ package_name = self.get_package_name_with_version()
+ self.upgrade_package(package_name)
+
+ def action_remove(self):
+ package_name = self.get_package_name_with_version()
+ self.remove_package(package_name)
+
+ def get_package_name_with_version(self):
+ if self.resource.version:
+ return self.resource.package_name + '-' + self.resource.version
+ else:
+ return self.resource.package_name
+
http://git-wip-us.apache.org/repos/asf/ambari/blob/55bbcae4/ambari-common/src/main/python/resource_management/core/providers/package/apt.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/resource_management/core/providers/package/apt.py b/ambari-common/src/main/python/resource_management/core/providers/package/apt.py
new file mode 100644
index 0000000..4c6e2dd
--- /dev/null
+++ b/ambari-common/src/main/python/resource_management/core/providers/package/apt.py
@@ -0,0 +1,60 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+Ambari Agent
+
+"""
+
+from resource_management.core.providers.package import PackageProvider
+from resource_management.core import shell
+from resource_management.core.logger import Logger
+
+INSTALL_CMD = "DEBIAN_FRONTEND=noninteractive /usr/bin/apt-get -q -o Dpkg::Options::='--force-confdef' --allow-unauthenticated --assume-yes install %s"
+REPO_UPDATE_CMD = "apt-get update -qq"
+REMOVE_CMD = "/usr/bin/apt-get -y -q remove %s"
+CHECK_CMD = "dpkg --get-selections %s | grep -v deinstall"
+
+class AptProvider(PackageProvider):
+ def install_package(self, name):
+ if not self._check_existence(name):
+ cmd = INSTALL_CMD % (name)
+ Logger.info("Installing package %s ('%s')" % (name, cmd))
+ code = shell.call(cmd)[0]
+
+ # apt-get update wasn't done too long
+ if code:
+ Logger.info("Failed to install package %s. Executing `apt-get update`" % (name))
+ shell.checked_call(REPO_UPDATE_CMD)
+ Logger.info("Retrying to install package %s" % (name))
+ shell.checked_call(cmd)
+ else:
+ Logger.info("Skipping installing existent package %s" % (name))
+
+ def upgrade_package(self, name):
+ return self.install_package(name)
+
+ def remove_package(self, name):
+ if self._check_existence(name):
+ cmd = REMOVE_CMD % (name)
+ Logger.info("Removing package %s ('%s')" % (name, cmd))
+ shell.checked_call(cmd)
+ else:
+ Logger.info("Skipping removing non-existent package %s" % (name))
+
+ def _check_existence(self, name):
+ code, out = shell.call(CHECK_CMD % name)
+ return not bool(code)
http://git-wip-us.apache.org/repos/asf/ambari/blob/55bbcae4/ambari-common/src/main/python/resource_management/core/providers/package/yumrpm.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/resource_management/core/providers/package/yumrpm.py b/ambari-common/src/main/python/resource_management/core/providers/package/yumrpm.py
new file mode 100644
index 0000000..7b729f8
--- /dev/null
+++ b/ambari-common/src/main/python/resource_management/core/providers/package/yumrpm.py
@@ -0,0 +1,53 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+Ambari Agent
+
+"""
+
+from resource_management.core.providers.package import PackageProvider
+from resource_management.core import shell
+from resource_management.core.logger import Logger
+
+INSTALL_CMD = "/usr/bin/yum -d 0 -e 0 -y install %s"
+REMOVE_CMD = "/usr/bin/yum -d 0 -e 0 -y erase %s"
+CHECK_CMD = "rpm -q --quiet %s"
+
+class YumProvider(PackageProvider):
+ def install_package(self, name):
+ if not self._check_existence(name):
+ cmd = INSTALL_CMD % (name)
+ Logger.info("Installing package %s ('%s')" % (name, cmd))
+ shell.checked_call(cmd)
+ else:
+ Logger.info("Skipping installing existent package %s" % (name))
+
+ def upgrade_package(self, name):
+ return self.install_package(name)
+
+ def remove_package(self, name):
+ if self._check_existence(name):
+ cmd = REMOVE_CMD % (name)
+ Logger.info("Removing package %s ('%s')" % (name, cmd))
+ shell.checked_call(cmd)
+ else:
+ Logger.info("Skipping removing non-existent package %s" % (name))
+
+ def _check_existence(self, name):
+ code, out = shell.call(CHECK_CMD % name)
+ return not bool(code)
http://git-wip-us.apache.org/repos/asf/ambari/blob/55bbcae4/ambari-common/src/main/python/resource_management/core/providers/package/zypper.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/resource_management/core/providers/package/zypper.py b/ambari-common/src/main/python/resource_management/core/providers/package/zypper.py
new file mode 100644
index 0000000..6577c47
--- /dev/null
+++ b/ambari-common/src/main/python/resource_management/core/providers/package/zypper.py
@@ -0,0 +1,53 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+Ambari Agent
+
+"""
+
+from resource_management.core.providers.package import PackageProvider
+from resource_management.core import shell
+from resource_management.core.logger import Logger
+
+INSTALL_CMD = "/usr/bin/zypper --quiet install --auto-agree-with-licenses --no-confirm %s"
+REMOVE_CMD = "/usr/bin/zypper --quiet remove --no-confirm %s"
+CHECK_CMD = "rpm -q --quiet %s"
+
+class ZypperProvider(PackageProvider):
+ def install_package(self, name):
+ if not self._check_existence(name):
+ cmd = INSTALL_CMD % (name)
+ Logger.info("Installing package %s ('%s')" % (name, cmd))
+ shell.checked_call(cmd)
+ else:
+ Logger.info("Skipping installing existent package %s" % (name))
+
+ def upgrade_package(self, name):
+ return self.install_package(name)
+
+ def remove_package(self, name):
+ if self._check_existence(name):
+ cmd = REMOVE_CMD % (name)
+ Logger.info("Removing package %s ('%s')" % (name, cmd))
+ shell.checked_call(cmd)
+ else:
+ Logger.info("Skipping removing non-existent package %s" % (name))
+
+ def _check_existence(self, name):
+ code, out = shell.call(CHECK_CMD % name)
+ return not bool(code)
http://git-wip-us.apache.org/repos/asf/ambari/blob/55bbcae4/ambari-common/src/main/python/resource_management/core/providers/service.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/resource_management/core/providers/service.py b/ambari-common/src/main/python/resource_management/core/providers/service.py
new file mode 100644
index 0000000..23b1b3a
--- /dev/null
+++ b/ambari-common/src/main/python/resource_management/core/providers/service.py
@@ -0,0 +1,96 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+Ambari Agent
+
+"""
+
+import os
+
+from resource_management.core import shell
+from resource_management.core.base import Fail
+from resource_management.core.providers import Provider
+from resource_management.core.logger import Logger
+
+
+class ServiceProvider(Provider):
+ def action_start(self):
+ if not self.status():
+ self._exec_cmd("start", 0)
+
+ def action_stop(self):
+ if self.status():
+ self._exec_cmd("stop", 0)
+
+ def action_restart(self):
+ if not self.status():
+ self._exec_cmd("start", 0)
+ else:
+ self._exec_cmd("restart", 0)
+
+ def action_reload(self):
+ if not self.status():
+ self._exec_cmd("start", 0)
+ else:
+ self._exec_cmd("reload", 0)
+
+ def status(self):
+ return self._exec_cmd("status") == 0
+
+ def _exec_cmd(self, command, expect=None):
+ if command != "status":
+ Logger.info("%s command '%s'" % (self.resource, command))
+
+ custom_cmd = getattr(self.resource, "%s_command" % command, None)
+ if custom_cmd:
+ Logger.debug("%s executing '%s'" % (self.resource, custom_cmd))
+ if hasattr(custom_cmd, "__call__"):
+ if custom_cmd():
+ ret = 0
+ else:
+ ret = 1
+ else:
+ ret,out = shell.call(custom_cmd)
+ else:
+ ret = self._init_cmd(command)
+
+ if expect is not None and expect != ret:
+ raise Fail("%r command %s for service %s failed with return code: %d. %s" % (
+ self, command, self.resource.service_name, ret, out))
+ return ret
+
+ def _init_cmd(self, command):
+ if self._upstart:
+ if command == "status":
+ ret,out = shell.call(["/sbin/" + command, self.resource.service_name])
+ _proc, state = out.strip().split(' ', 1)
+ ret = 0 if state != "stop/waiting" else 1
+ else:
+ ret,out = shell.call(["/sbin/" + command, self.resource.service_name])
+ else:
+ ret,out = shell.call(["/etc/init.d/%s" % self.resource.service_name, command])
+ return ret
+
+ @property
+ def _upstart(self):
+ try:
+ return self.__upstart
+ except AttributeError:
+ self.__upstart = os.path.exists("/sbin/start") \
+ and os.path.exists("/etc/init/%s.conf" % self.resource.service_name)
+ return self.__upstart
http://git-wip-us.apache.org/repos/asf/ambari/blob/55bbcae4/ambari-common/src/main/python/resource_management/core/providers/system.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/resource_management/core/providers/system.py b/ambari-common/src/main/python/resource_management/core/providers/system.py
new file mode 100644
index 0000000..33b9ad9
--- /dev/null
+++ b/ambari-common/src/main/python/resource_management/core/providers/system.py
@@ -0,0 +1,265 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+Ambari Agent
+
+"""
+
+from __future__ import with_statement
+
+import grp
+import os
+import pwd
+import time
+import shutil
+from resource_management.core import shell
+from resource_management.core.base import Fail
+from resource_management.core import ExecuteTimeoutException
+from resource_management.core.providers import Provider
+from resource_management.core.logger import Logger
+
+
+def _coerce_uid(user):
+ try:
+ uid = int(user)
+ except ValueError:
+ try:
+ uid = pwd.getpwnam(user).pw_uid
+ except KeyError:
+ raise Fail("User %s doesn't exist." % user)
+ return uid
+
+
+def _coerce_gid(group):
+ try:
+ gid = int(group)
+ except ValueError:
+ try:
+ gid = grp.getgrnam(group).gr_gid
+ except KeyError:
+ raise Fail("Group %s doesn't exist." % group)
+ return gid
+
+
+def _ensure_metadata(path, user, group, mode=None):
+ stat = os.stat(path)
+
+ if mode:
+ existing_mode = stat.st_mode & 07777
+ if existing_mode != mode:
+ Logger.info("Changing permission for %s from %o to %o" % (
+ path, existing_mode, mode))
+ os.chmod(path, mode)
+
+ if user:
+ uid = _coerce_uid(user)
+ if stat.st_uid != uid:
+ Logger.info(
+ "Changing owner for %s from %d to %s" % (path, stat.st_uid, user))
+ os.chown(path, uid, -1)
+
+ if group:
+ gid = _coerce_gid(group)
+ if stat.st_gid != gid:
+ Logger.info(
+ "Changing group for %s from %d to %s" % (path, stat.st_gid, group))
+ os.chown(path, -1, gid)
+
+
+class FileProvider(Provider):
+ def action_create(self):
+ path = self.resource.path
+
+ if os.path.isdir(path):
+ raise Fail("Applying %s failed, directory with name %s exists" % (self.resource, path))
+
+ dirname = os.path.dirname(path)
+ if not os.path.isdir(dirname):
+ raise Fail("Applying %s failed, parent directory %s doesn't exist" % (self.resource, dirname))
+
+ write = False
+ content = self._get_content()
+ if not os.path.exists(path):
+ write = True
+ reason = "it doesn't exist"
+ elif self.resource.replace:
+ if content is not None:
+ with open(path, "rb") as fp:
+ old_content = fp.read()
+ if content != old_content:
+ write = True
+ reason = "contents don't match"
+ if self.resource.backup:
+ self.resource.env.backup_file(path)
+
+ if write:
+ Logger.info("Writing %s because %s" % (self.resource, reason))
+ with open(path, "wb") as fp:
+ if content:
+ fp.write(content)
+
+ _ensure_metadata(self.resource.path, self.resource.owner,
+ self.resource.group, mode=self.resource.mode)
+
+ def action_delete(self):
+ path = self.resource.path
+
+ if os.path.isdir(path):
+ raise Fail("Applying %s failed, %s is directory not file!" % (self.resource, path))
+
+ if os.path.exists(path):
+ Logger.info("Deleting %s" % self.resource)
+ os.unlink(path)
+
+ def _get_content(self):
+ content = self.resource.content
+ if content is None:
+ return None
+ elif isinstance(content, basestring):
+ return content
+ elif hasattr(content, "__call__"):
+ return content()
+ raise Fail("Unknown source type for %s: %r" % (self, content))
+
+
+class DirectoryProvider(Provider):
+ def action_create(self):
+ path = self.resource.path
+ if not os.path.exists(path):
+ Logger.info("Creating directory %s" % self.resource)
+ if self.resource.recursive:
+ os.makedirs(path, self.resource.mode or 0755)
+ else:
+ dirname = os.path.dirname(path)
+ if not os.path.isdir(dirname):
+ raise Fail("Applying %s failed, parent directory %s doesn't exist" % (self.resource, dirname))
+
+ os.mkdir(path, self.resource.mode or 0755)
+
+ if not os.path.isdir(path):
+ raise Fail("Applying %s failed, file %s already exists" % (self.resource, path))
+
+ _ensure_metadata(path, self.resource.owner, self.resource.group,
+ mode=self.resource.mode)
+
+ def action_delete(self):
+ path = self.resource.path
+ if os.path.exists(path):
+ if not os.path.isdir(path):
+ raise Fail("Applying %s failed, %s is not a directory" % (self.resource, path))
+
+ Logger.info("Removing directory %s and all its content" % self.resource)
+ shutil.rmtree(path)
+
+
+class LinkProvider(Provider):
+ def action_create(self):
+ path = self.resource.path
+
+ if os.path.lexists(path):
+ oldpath = os.path.realpath(path)
+ if oldpath == self.resource.to:
+ return
+ if not os.path.islink(path):
+ raise Fail(
+ "%s trying to create a symlink with the same name as an existing file or directory" % self)
+ Logger.info("%s replacing old symlink to %s" % (self.resource, oldpath))
+ os.unlink(path)
+
+ if self.resource.hard:
+ if not os.path.exists(self.resource.to):
+ raise Fail("Failed to apply %s, linking to nonexistent location %s" % (self.resource, self.resource.to))
+ if os.path.isdir(self.resource.to):
+ raise Fail("Failed to apply %s, cannot create hard link to a directory (%s)" % (self.resource, self.resource.to))
+
+ Logger.info("Creating hard %s" % self.resource)
+ os.link(self.resource.to, path)
+ else:
+ if not os.path.exists(self.resource.to):
+ Logger.info("Warning: linking to nonexistent location %s" % self.resource.to)
+
+ Logger.info("Creating symbolic %s" % self.resource)
+ os.symlink(self.resource.to, path)
+
+ def action_delete(self):
+ path = self.resource.path
+ if os.path.exists(path):
+ Logger.info("Deleting %s" % self.resource)
+ os.unlink(path)
+
+
+def _preexec_fn(resource):
+ def preexec():
+ if resource.group:
+ gid = _coerce_gid(resource.group)
+ os.setgid(gid)
+ os.setegid(gid)
+
+ return preexec
+
+
+class ExecuteProvider(Provider):
+ def action_run(self):
+ if self.resource.creates:
+ if os.path.exists(self.resource.creates):
+ return
+
+ Logger.debug("Executing %s" % self.resource)
+
+ if self.resource.path != []:
+ if not self.resource.environment:
+ self.resource.environment = {}
+
+ self.resource.environment['PATH'] = os.pathsep.join(self.resource.path)
+
+ for i in range (0, self.resource.tries):
+ try:
+ shell.checked_call(self.resource.command, logoutput=self.resource.logoutput,
+ cwd=self.resource.cwd, env=self.resource.environment,
+ preexec_fn=_preexec_fn(self.resource), user=self.resource.user,
+ wait_for_finish=self.resource.wait_for_finish, timeout=self.resource.timeout)
+ break
+ except Fail as ex:
+ if i == self.resource.tries-1: # last try
+ raise ex
+ else:
+ Logger.info("Retrying after %d seconds. Reason: %s" % (self.resource.try_sleep, str(ex)))
+ time.sleep(self.resource.try_sleep)
+ except ExecuteTimeoutException:
+ err_msg = ("Execution of '%s' was killed due timeout after %d seconds") % (self.resource.command, self.resource.timeout)
+
+ if self.resource.on_timeout:
+ Logger.info("Executing '%s'. Reason: %s" % (self.resource.on_timeout, err_msg))
+ shell.checked_call(self.resource.on_timeout)
+ else:
+ raise Fail(err_msg)
+
+
+class ExecuteScriptProvider(Provider):
+ def action_run(self):
+ from tempfile import NamedTemporaryFile
+
+ Logger.info("Running script %s" % self.resource)
+ with NamedTemporaryFile(prefix="resource_management-script", bufsize=0) as tf:
+ tf.write(self.resource.code)
+ tf.flush()
+
+ _ensure_metadata(tf.name, self.resource.user, self.resource.group)
+ shell.call([self.resource.interpreter, tf.name],
+ cwd=self.resource.cwd, env=self.resource.environment,
+ preexec_fn=_preexec_fn(self.resource))
http://git-wip-us.apache.org/repos/asf/ambari/blob/55bbcae4/ambari-common/src/main/python/resource_management/core/resources/__init__.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/resource_management/core/resources/__init__.py b/ambari-common/src/main/python/resource_management/core/resources/__init__.py
new file mode 100644
index 0000000..d5e903c
--- /dev/null
+++ b/ambari-common/src/main/python/resource_management/core/resources/__init__.py
@@ -0,0 +1,26 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+Ambari Agent
+
+"""
+
+from resource_management.core.resources.accounts import *
+from resource_management.core.resources.packaging import *
+from resource_management.core.resources.service import *
+from resource_management.core.resources.system import *
http://git-wip-us.apache.org/repos/asf/ambari/blob/55bbcae4/ambari-common/src/main/python/resource_management/core/resources/accounts.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/resource_management/core/resources/accounts.py b/ambari-common/src/main/python/resource_management/core/resources/accounts.py
new file mode 100644
index 0000000..f498db5
--- /dev/null
+++ b/ambari-common/src/main/python/resource_management/core/resources/accounts.py
@@ -0,0 +1,48 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+Ambari Agent
+
+"""
+__all__ = ["Group", "User"]
+
+from resource_management.core.base import Resource, ForcedListArgument, ResourceArgument, BooleanArgument
+
+
+class Group(Resource):
+ action = ForcedListArgument(default="create")
+ group_name = ResourceArgument(default=lambda obj: obj.name)
+ gid = ResourceArgument()
+ password = ResourceArgument()
+
+ actions = Resource.actions + ["create", "remove"]
+
+
+class User(Resource):
+ action = ForcedListArgument(default="create")
+ username = ResourceArgument(default=lambda obj: obj.name)
+ comment = ResourceArgument()
+ uid = ResourceArgument()
+ gid = ResourceArgument()
+ groups = ForcedListArgument(default=[]) # supplementary groups
+ home = ResourceArgument()
+ shell = ResourceArgument(default="/bin/bash")
+ password = ResourceArgument()
+ system = BooleanArgument(default=False)
+
+ actions = Resource.actions + ["create", "remove"]
http://git-wip-us.apache.org/repos/asf/ambari/blob/55bbcae4/ambari-common/src/main/python/resource_management/core/resources/packaging.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/resource_management/core/resources/packaging.py b/ambari-common/src/main/python/resource_management/core/resources/packaging.py
new file mode 100644
index 0000000..c2ff20e
--- /dev/null
+++ b/ambari-common/src/main/python/resource_management/core/resources/packaging.py
@@ -0,0 +1,34 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+Ambari Agent
+
+"""
+
+__all__ = ["Package"]
+
+from resource_management.core.base import Resource, ForcedListArgument, ResourceArgument
+
+
+class Package(Resource):
+ action = ForcedListArgument(default="install")
+ package_name = ResourceArgument(default=lambda obj: obj.name)
+ location = ResourceArgument(default=lambda obj: obj.package_name)
+ version = ResourceArgument()
+ actions = ["install", "upgrade", "remove"]
+ build_vars = ForcedListArgument(default=[])
http://git-wip-us.apache.org/repos/asf/ambari/blob/55bbcae4/ambari-common/src/main/python/resource_management/core/resources/service.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/resource_management/core/resources/service.py b/ambari-common/src/main/python/resource_management/core/resources/service.py
new file mode 100644
index 0000000..20d5c1b
--- /dev/null
+++ b/ambari-common/src/main/python/resource_management/core/resources/service.py
@@ -0,0 +1,38 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+Ambari Agent
+
+"""
+
+__all__ = ["Service"]
+
+from resource_management.core.base import Resource, ResourceArgument, ForcedListArgument
+
+
+class Service(Resource):
+ action = ForcedListArgument(default="start")
+ service_name = ResourceArgument(default=lambda obj: obj.name)
+ #enabled = ResourceArgument() # Maybe add support to put in/out autostart.
+ start_command = ResourceArgument()
+ stop_command = ResourceArgument()
+ restart_command = ResourceArgument()
+ reload_command = ResourceArgument() # reload the config file without interrupting pending operations
+ status_command = ResourceArgument()
+
+ actions = ["nothing", "start", "stop", "restart", "reload"]
http://git-wip-us.apache.org/repos/asf/ambari/blob/55bbcae4/ambari-common/src/main/python/resource_management/core/resources/system.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/resource_management/core/resources/system.py b/ambari-common/src/main/python/resource_management/core/resources/system.py
new file mode 100644
index 0000000..0952c48
--- /dev/null
+++ b/ambari-common/src/main/python/resource_management/core/resources/system.py
@@ -0,0 +1,128 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+Ambari Agent
+
+"""
+
+__all__ = ["File", "Directory", "Link", "Execute", "ExecuteScript", "Mount"]
+
+from resource_management.core.base import Resource, ForcedListArgument, ResourceArgument, BooleanArgument
+
+
+class File(Resource):
+ action = ForcedListArgument(default="create")
+ path = ResourceArgument(default=lambda obj: obj.name)
+ backup = ResourceArgument()
+ mode = ResourceArgument()
+ owner = ResourceArgument()
+ group = ResourceArgument()
+ content = ResourceArgument()
+ # whether to replace files with different content
+ replace = ResourceArgument(default=True)
+
+ actions = Resource.actions + ["create", "delete"]
+
+
+class Directory(Resource):
+ action = ForcedListArgument(default="create")
+ path = ResourceArgument(default=lambda obj: obj.name)
+ mode = ResourceArgument()
+ owner = ResourceArgument()
+ group = ResourceArgument()
+ recursive = BooleanArgument(default=False) # this work for 'create', 'delete' is anyway recursive
+
+ actions = Resource.actions + ["create", "delete"]
+
+
+class Link(Resource):
+ action = ForcedListArgument(default="create")
+ path = ResourceArgument(default=lambda obj: obj.name)
+ to = ResourceArgument(required=True)
+ hard = BooleanArgument(default=False)
+
+ actions = Resource.actions + ["create", "delete"]
+
+
+class Execute(Resource):
+ action = ForcedListArgument(default="run")
+
+ """
+ Recommended:
+ command = ('rm','-f','myfile')
+ Not recommended:
+ command = 'rm -f myfile'
+
+ The first one helps to stop escaping issues
+ """
+ command = ResourceArgument(default=lambda obj: obj.name)
+
+ creates = ResourceArgument()
+ cwd = ResourceArgument()
+ # this runs command with a specific env variables, env={'JAVA_HOME': '/usr/jdk'}
+ environment = ResourceArgument()
+ user = ResourceArgument()
+ group = ResourceArgument()
+ returns = ForcedListArgument(default=0)
+ tries = ResourceArgument(default=1)
+ try_sleep = ResourceArgument(default=0) # seconds
+ path = ForcedListArgument(default=[])
+ actions = Resource.actions + ["run"]
+ logoutput = BooleanArgument(default=False)
+ """
+ if on_timeout is not set leads to failing after x seconds,
+ otherwise calls on_timeout
+ """
+ timeout = ResourceArgument() # seconds
+ on_timeout = ResourceArgument()
+ """
+ Wait for command to finish or not.
+
+ NOTE:
+ In case of False, since any command results are skipped, it disables some functionality:
+ - non-zero return code failure
+ - logoutput
+ - tries
+ - try_sleep
+ """
+ wait_for_finish = BooleanArgument(default=True)
+
+
+class ExecuteScript(Resource):
+ action = ForcedListArgument(default="run")
+ code = ResourceArgument(required=True)
+ cwd = ResourceArgument()
+ environment = ResourceArgument()
+ interpreter = ResourceArgument(default="/bin/bash")
+ user = ResourceArgument()
+ group = ResourceArgument()
+
+ actions = Resource.actions + ["run"]
+
+
+class Mount(Resource):
+ action = ForcedListArgument(default="mount")
+ mount_point = ResourceArgument(default=lambda obj: obj.name)
+ device = ResourceArgument()
+ fstype = ResourceArgument()
+ options = ResourceArgument(default=["defaults"])
+ dump = ResourceArgument(default=0)
+ passno = ResourceArgument(default=2)
+
+ actions = Resource.actions + ["mount", "umount", "remount", "enable",
+ "disable"]
http://git-wip-us.apache.org/repos/asf/ambari/blob/55bbcae4/ambari-common/src/main/python/resource_management/core/shell.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/resource_management/core/shell.py b/ambari-common/src/main/python/resource_management/core/shell.py
new file mode 100644
index 0000000..80e2a38
--- /dev/null
+++ b/ambari-common/src/main/python/resource_management/core/shell.py
@@ -0,0 +1,109 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+Ambari Agent
+
+"""
+
+__all__ = ["checked_call", "call", "quote_bash_args"]
+
+import string
+import subprocess
+import threading
+from multiprocessing import Queue
+from exceptions import Fail
+from exceptions import ExecuteTimeoutException
+from resource_management.core.logger import Logger
+
+def checked_call(command, logoutput=False,
+ cwd=None, env=None, preexec_fn=None, user=None, wait_for_finish=True, timeout=None):
+ return _call(command, logoutput, True, cwd, env, preexec_fn, user, wait_for_finish, timeout)
+
+def call(command, logoutput=False,
+ cwd=None, env=None, preexec_fn=None, user=None, wait_for_finish=True, timeout=None):
+ return _call(command, logoutput, False, cwd, env, preexec_fn, user, wait_for_finish, timeout)
+
+def _call(command, logoutput=False, throw_on_failure=True,
+ cwd=None, env=None, preexec_fn=None, user=None, wait_for_finish=True, timeout=None):
+ """
+ Execute shell command
+
+ @param command: list/tuple of arguments (recommended as more safe - don't need to escape)
+ or string of the command to execute
+ @param logoutput: boolean, whether command output should be logged of not
+ @param throw_on_failure: if true, when return code is not zero exception is thrown
+
+ @return: retrun_code, stdout
+ """
+ # convert to string and escape
+ if isinstance(command, (list, tuple)):
+ command = ' '.join(quote_bash_args(x) for x in command)
+
+ if user:
+ command = ["su", "-", user, "-c", command]
+ else:
+ command = ["/bin/bash","--login","-c", command]
+
+ proc = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
+ cwd=cwd, env=env, shell=False,
+ preexec_fn=preexec_fn)
+
+ if not wait_for_finish:
+ return None, None
+
+ if timeout:
+ q = Queue()
+ t = threading.Timer( timeout, on_timeout, [proc, q] )
+ t.start()
+
+ out = proc.communicate()[0].strip('\n')
+
+ if timeout:
+ if q.empty():
+ t.cancel()
+ # timeout occurred
+ else:
+ raise ExecuteTimeoutException()
+
+ code = proc.returncode
+
+ if logoutput and out:
+ Logger.info(out)
+
+ if throw_on_failure and code:
+ err_msg = Logger.get_protected_text(("Execution of '%s' returned %d. %s") % (command[-1], code, out))
+ raise Fail(err_msg)
+
+ return code, out
+
+def on_timeout(proc, q):
+ q.put(True)
+ if proc.poll() == None:
+ try:
+ proc.terminate()
+ except:
+ pass
+
+def quote_bash_args(command):
+ if not command:
+ return "''"
+ valid = set(string.ascii_letters + string.digits + '@%_-+=:,./')
+ for char in command:
+ if char not in valid:
+ return "'" + command.replace("'", "'\"'\"'") + "'"
+ return command
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/55bbcae4/ambari-common/src/main/python/resource_management/core/source.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/resource_management/core/source.py b/ambari-common/src/main/python/resource_management/core/source.py
new file mode 100644
index 0000000..c2a4c24
--- /dev/null
+++ b/ambari-common/src/main/python/resource_management/core/source.py
@@ -0,0 +1,171 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+Ambari Agent
+
+"""
+
+from __future__ import with_statement
+from resource_management.core.environment import Environment
+from resource_management.core.utils import checked_unite
+
+__all__ = ["Source", "Template", "InlineTemplate", "StaticFile", "DownloadSource"]
+
+import hashlib
+import os
+import urllib2
+import urlparse
+
+
+class Source(object):
+ def __init__(self, name):
+ self.env = Environment.get_instance()
+ self.name = name
+
+ def get_content(self):
+ raise NotImplementedError()
+
+ def get_checksum(self):
+ return None
+
+ def __call__(self):
+ return self.get_content()
+
+ def __repr__(self):
+ return self.__class__.__name__+"('"+self.name+"')"
+
+ def __eq__(self, other):
+ return (isinstance(other, self.__class__)
+ and self.get_content() == other.get_content())
+
+
+class StaticFile(Source):
+ def __init__(self, name):
+ super(StaticFile, self).__init__(name)
+
+ def get_content(self):
+ # absolute path
+ if self.name.startswith(os.path.sep):
+ path = self.name
+ # relative path
+ else:
+ basedir = self.env.config.basedir
+ path = os.path.join(basedir, "files", self.name)
+
+ with open(path, "rb") as fp:
+ return fp.read()
+
+
+try:
+ from jinja2 import Environment as JinjaEnvironment, BaseLoader, TemplateNotFound, FunctionLoader, StrictUndefined
+except ImportError:
+ class Template(Source):
+ def __init__(self, name, variables=None, env=None):
+ raise Exception("Jinja2 required for Template/InlineTemplate")
+
+ class InlineTemplate(Source):
+ def __init__(self, name, variables=None, env=None):
+ raise Exception("Jinja2 required for Template/InlineTemplate")
+else:
+ class TemplateLoader(BaseLoader):
+ def __init__(self, env=None):
+ self.env = env or Environment.get_instance()
+
+ def get_source(self, environment, template_name):
+ # absolute path
+ if template_name.startswith(os.path.sep):
+ path = template_name
+ # relative path
+ else:
+ basedir = self.env.config.basedir
+ path = os.path.join(basedir, "templates", template_name)
+
+ if not os.path.exists(path):
+ raise TemplateNotFound("%s at %s" % (template_name, path))
+ mtime = os.path.getmtime(path)
+ with open(path, "rb") as fp:
+ source = fp.read().decode('utf-8')
+ return source, path, lambda: mtime == os.path.getmtime(path)
+
+ class Template(Source):
+ def __init__(self, name, extra_imports=[], **kwargs):
+ """
+ @param kwargs: Additional variables passed to template
+ """
+ super(Template, self).__init__(name)
+ params = self.env.config.params
+ variables = checked_unite(params, kwargs)
+ self.imports_dict = dict((module.__name__, module) for module in extra_imports)
+ self.context = variables.copy() if variables else {}
+ if not hasattr(self, 'template_env'):
+ self.template_env = JinjaEnvironment(loader=TemplateLoader(self.env),
+ autoescape=False, undefined=StrictUndefined, trim_blocks=True)
+
+ self.template = self.template_env.get_template(self.name)
+
+ def get_content(self):
+ default_variables = { 'env':self.env, 'repr':repr, 'str':str, 'bool':bool }
+ variables = checked_unite(default_variables, self.imports_dict)
+ self.context.update(variables)
+
+ rendered = self.template.render(self.context)
+ return rendered + "\n" if not rendered.endswith('\n') else rendered
+
+ class InlineTemplate(Template):
+ def __init__(self, name, extra_imports=[], **kwargs):
+ self.template_env = JinjaEnvironment(loader=FunctionLoader(lambda text: text))
+ super(InlineTemplate, self).__init__(name, extra_imports, **kwargs)
+
+ def __repr__(self):
+ return "InlineTemplate(...)"
+
+
+class DownloadSource(Source):
+ def __init__(self, name, cache=True, md5sum=None):
+ super(DownloadSource, self).__init__(name)
+ self.url = self.name
+ self.md5sum = md5sum
+ self.cache = cache
+ if not 'download_path' in self.env.config:
+ self.env.config.download_path = '/var/tmp/downloads'
+ if not os.path.exists(self.env.config.download_path):
+ os.makedirs(self.env.config.download_path)
+
+ def get_content(self):
+ filepath = os.path.basename(urlparse.urlparse(self.url).path)
+ content = None
+ if not self.cache or not os.path.exists(
+ os.path.join(self.env.config.download_path, filepath)):
+ web_file = urllib2.urlopen(self.url)
+ content = web_file.read()
+ else:
+ update = False
+ with open(os.path.join(self.env.config.download_path, filepath)) as fp:
+ content = fp.read()
+ if self.md5sum:
+ m = hashlib.md5(content)
+ md5 = m.hexdigest()
+ if md5 != self.md5sum:
+ web_file = urllib2.urlopen(self.url)
+ content = web_file.read()
+ update = True
+ if self.cache and update:
+ with open(os.path.join(self.env.config.download_path, filepath),
+ 'w') as fp:
+ fp.write(content)
+ return content