Posted to commits@ambari.apache.org by nc...@apache.org on 2014/08/18 17:33:07 UTC
[1/2] AMBARI-6887. Alerts: groundwork for alert collection (ncole)
Repository: ambari
Updated Branches:
refs/heads/branch-alerts-dev 26b162af8 -> 9e529eddc
http://git-wip-us.apache.org/repos/asf/ambari/blob/9e529edd/ambari-agent/src/main/python/ambari_agent/apscheduler/triggers/simple.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/apscheduler/triggers/simple.py b/ambari-agent/src/main/python/ambari_agent/apscheduler/triggers/simple.py
new file mode 100644
index 0000000..ea61b3f
--- /dev/null
+++ b/ambari-agent/src/main/python/ambari_agent/apscheduler/triggers/simple.py
@@ -0,0 +1,17 @@
+from apscheduler.util import convert_to_datetime
+
+
+class SimpleTrigger(object):
+ def __init__(self, run_date):
+ self.run_date = convert_to_datetime(run_date)
+
+ def get_next_fire_time(self, start_date):
+ if self.run_date >= start_date:
+ return self.run_date
+
+ def __str__(self):
+ return 'date[%s]' % str(self.run_date)
+
+ def __repr__(self):
+ return '<%s (run_date=%s)>' % (
+ self.__class__.__name__, repr(self.run_date))
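
A quick sketch of the trigger contract (dates illustrative): SimpleTrigger
fires exactly once at run_date, and returns None for any later start_date.

    from datetime import datetime
    from apscheduler.triggers.simple import SimpleTrigger

    trigger = SimpleTrigger('2014-08-18 12:00:00')      # parsed by convert_to_datetime
    trigger.get_next_fire_time(datetime(2014, 8, 18))   # datetime(2014, 8, 18, 12, 0)
    trigger.get_next_fire_time(datetime(2014, 8, 19))   # None -- run_date has passed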
http://git-wip-us.apache.org/repos/asf/ambari/blob/9e529edd/ambari-agent/src/main/python/ambari_agent/apscheduler/util.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/apscheduler/util.py b/ambari-agent/src/main/python/ambari_agent/apscheduler/util.py
new file mode 100644
index 0000000..dcede4c
--- /dev/null
+++ b/ambari-agent/src/main/python/ambari_agent/apscheduler/util.py
@@ -0,0 +1,230 @@
+"""
+This module contains several handy functions primarily meant for internal use.
+"""
+
+from datetime import date, datetime, timedelta
+from time import mktime
+import re
+import sys
+
+__all__ = ('asint', 'asbool', 'convert_to_datetime', 'timedelta_seconds',
+ 'time_difference', 'datetime_ceil', 'combine_opts',
+ 'get_callable_name', 'obj_to_ref', 'ref_to_obj', 'maybe_ref',
+ 'to_unicode', 'iteritems', 'itervalues', 'xrange')
+
+
+def asint(text):
+ """
+ Safely converts a string to an integer, returning None if the string
+ is None.
+
+ :type text: str
+ :rtype: int
+ """
+ if text is not None:
+ return int(text)
+
+
+def asbool(obj):
+ """
+ Interprets an object as a boolean value.
+
+ :rtype: bool
+ """
+ if isinstance(obj, str):
+ obj = obj.strip().lower()
+ if obj in ('true', 'yes', 'on', 'y', 't', '1'):
+ return True
+ if obj in ('false', 'no', 'off', 'n', 'f', '0'):
+ return False
+ raise ValueError('Unable to interpret value "%s" as boolean' % obj)
+ return bool(obj)
+
+
+_DATE_REGEX = re.compile(
+ r'(?P<year>\d{4})-(?P<month>\d{1,2})-(?P<day>\d{1,2})'
+ r'(?: (?P<hour>\d{1,2}):(?P<minute>\d{1,2}):(?P<second>\d{1,2})'
+ r'(?:\.(?P<microsecond>\d{1,6}))?)?')
+
+
+def convert_to_datetime(input):
+ """
+ Converts the given object to a datetime object, if possible.
+ If an actual datetime object is passed, it is returned unmodified.
+ If the input is a string, it is parsed as a datetime.
+
+ Date strings are accepted in three different forms: date only (Y-m-d),
+ date with time (Y-m-d H:M:S), or date with time and microseconds
+ (Y-m-d H:M:S.micro).
+
+ :rtype: datetime
+ """
+ if isinstance(input, datetime):
+ return input
+ elif isinstance(input, date):
+ return datetime.fromordinal(input.toordinal())
+ elif isinstance(input, basestring):
+ m = _DATE_REGEX.match(input)
+ if not m:
+ raise ValueError('Invalid date string')
+ values = [(k, int(v or 0)) for k, v in m.groupdict().items()]
+ values = dict(values)
+ return datetime(**values)
+ raise TypeError('Unsupported input type: %s' % type(input))
+
+
+def timedelta_seconds(delta):
+ """
+ Converts the given timedelta to seconds.
+
+ :type delta: timedelta
+ :rtype: float
+ """
+ return delta.days * 24 * 60 * 60 + delta.seconds + \
+ delta.microseconds / 1000000.0
+
+
+def time_difference(date1, date2):
+ """
+ Returns the time difference in seconds between the given two
+ datetime objects. The difference is calculated as: date1 - date2.
+
+ :param date1: the later datetime
+ :type date1: datetime
+ :param date2: the earlier datetime
+ :type date2: datetime
+ :rtype: float
+ """
+ later = mktime(date1.timetuple()) + date1.microsecond / 1000000.0
+ earlier = mktime(date2.timetuple()) + date2.microsecond / 1000000.0
+ return later - earlier
+
+
+def datetime_ceil(dateval):
+ """
+ Rounds the given datetime object upwards.
+
+ :type dateval: datetime
+ """
+ if dateval.microsecond > 0:
+ return dateval + timedelta(seconds=1,
+ microseconds=-dateval.microsecond)
+ return dateval
+
+
+def combine_opts(global_config, prefix, local_config={}):
+ """
+ Returns a subdictionary from keys and values of ``global_config`` where
+ the key starts with the given prefix, combined with options from
+ local_config. The keys in the subdictionary have the prefix removed.
+
+ :type global_config: dict
+ :type prefix: str
+ :type local_config: dict
+ :rtype: dict
+ """
+ prefixlen = len(prefix)
+ subconf = {}
+ for key, value in global_config.items():
+ if key.startswith(prefix):
+ key = key[prefixlen:]
+ subconf[key] = value
+ subconf.update(local_config)
+ return subconf
+
+
+def get_callable_name(func):
+ """
+ Returns the best available display name for the given function/callable.
+ """
+ f_self = getattr(func, '__self__', None) or getattr(func, 'im_self', None)
+
+ if f_self and hasattr(func, '__name__'):
+ if isinstance(f_self, type):
+ # class method
+ clsname = getattr(f_self, '__qualname__', None) or f_self.__name__
+ return '%s.%s' % (clsname, func.__name__)
+ # bound method
+ return '%s.%s' % (f_self.__class__.__name__, func.__name__)
+
+ if hasattr(func, '__call__'):
+ if hasattr(func, '__name__'):
+ # function, unbound method or a class with a __call__ method
+ return func.__name__
+ # instance of a class with a __call__ method
+ return func.__class__.__name__
+
+ raise TypeError('Unable to determine a name for %s -- '
+ 'maybe it is not a callable?' % repr(func))
+
+
+def obj_to_ref(obj):
+ """
+ Returns the path to the given object.
+ """
+ ref = '%s:%s' % (obj.__module__, get_callable_name(obj))
+ try:
+ obj2 = ref_to_obj(ref)
+ if obj != obj2:
+ raise ValueError
+ except Exception:
+ raise ValueError('Cannot determine the reference to %s' % repr(obj))
+
+ return ref
+
+
+def ref_to_obj(ref):
+ """
+ Returns the object pointed to by ``ref``.
+ """
+ if not isinstance(ref, basestring):
+ raise TypeError('References must be strings')
+ if not ':' in ref:
+ raise ValueError('Invalid reference')
+
+ modulename, rest = ref.split(':', 1)
+ try:
+ obj = __import__(modulename)
+ except ImportError:
+ raise LookupError('Error resolving reference %s: '
+ 'could not import module' % ref)
+
+ try:
+ for name in modulename.split('.')[1:] + rest.split('.'):
+ obj = getattr(obj, name)
+ return obj
+ except Exception:
+ raise LookupError('Error resolving reference %s: '
+ 'error looking up object' % ref)
+
+
+def maybe_ref(ref):
+ """
+ Returns the object that the given reference points to, if it is indeed
+ a reference. If it is not a reference, the object is returned as-is.
+ """
+ if not isinstance(ref, str):
+ return ref
+ return ref_to_obj(ref)
+
+
+def to_unicode(string, encoding='ascii'):
+ """
+ Safely converts a string to a unicode representation on any
+ Python version.
+ """
+ if hasattr(string, 'decode'):
+ return string.decode(encoding, 'ignore')
+ return string # pragma: nocover
+
+
+if sys.version_info < (3, 0): # pragma: nocover
+ iteritems = lambda d: d.iteritems()
+ itervalues = lambda d: d.itervalues()
+ xrange = xrange
+ basestring = basestring
+else: # pragma: nocover
+ iteritems = lambda d: d.items()
+ itervalues = lambda d: d.values()
+ xrange = range
+ basestring = str
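
For reference, the accepted date-string forms and the object-reference
round-trip (assuming the vendored package imports as apscheduler, as the
agent code in this commit does):

    from apscheduler.util import convert_to_datetime, obj_to_ref, ref_to_obj

    convert_to_datetime('2014-08-18')                   # date only
    convert_to_datetime('2014-08-18 17:33:07')          # date with time
    convert_to_datetime('2014-08-18 17:33:07.000123')   # plus microseconds

    ref = obj_to_ref(convert_to_datetime)    # 'apscheduler.util:convert_to_datetime'
    ref_to_obj(ref) is convert_to_datetime   # True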
http://git-wip-us.apache.org/repos/asf/ambari/blob/9e529edd/ambari-agent/src/test/python/ambari_agent/TestAlerts.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestAlerts.py b/ambari-agent/src/test/python/ambari_agent/TestAlerts.py
new file mode 100644
index 0000000..51c3af9
--- /dev/null
+++ b/ambari-agent/src/test/python/ambari_agent/TestAlerts.py
@@ -0,0 +1,73 @@
+#!/usr/bin/env python
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+import os
+import sys
+from ambari_agent.AlertSchedulerHandler import AlertSchedulerHandler
+from ambari_agent.apscheduler.scheduler import Scheduler
+from ambari_agent.alerts.port_alert import PortAlert
+from mock.mock import patch
+from unittest import TestCase
+
+class TestAlerts(TestCase):
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ sys.stdout = sys.__stdout__
+
+ @patch.object(Scheduler, "add_interval_job")
+ def test_build(self, aps_add_interval_job_mock):
+ test_file_path = os.path.join('ambari_agent', 'dummy_files', 'alert_definitions.json')
+
+ ash = AlertSchedulerHandler(test_file_path)
+
+ self.assertTrue(aps_add_interval_job_mock.called)
+
+ def test_port_alert(self):
+ json = { "name": "namenode_process",
+ "service": "HDFS",
+ "component": "NAMENODE",
+ "label": "NameNode process",
+ "interval": 6,
+ "scope": "host",
+ "source": {
+ "type": "PORT",
+ "uri": "http://c6401.ambari.apache.org:50070",
+ "default_port": 50070,
+ "reporting": {
+ "ok": {
+ "text": "TCP OK - {0:.4f} response time on port {1}"
+ },
+ "critical": {
+ "text": "Could not load process info: {0}"
+ }
+ }
+ }
+ }
+
+ pa = PortAlert(json, json['source'])
+ self.assertEquals(6, pa.interval())
+
+ pa.collect()
+
http://git-wip-us.apache.org/repos/asf/ambari/blob/9e529edd/ambari-agent/src/test/python/ambari_agent/dummy_files/alert_definitions.json
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/dummy_files/alert_definitions.json b/ambari-agent/src/test/python/ambari_agent/dummy_files/alert_definitions.json
new file mode 100644
index 0000000..6c55966
--- /dev/null
+++ b/ambari-agent/src/test/python/ambari_agent/dummy_files/alert_definitions.json
@@ -0,0 +1,46 @@
+{
+ "c1": [
+ {
+ "name": "namenode_cpu",
+ "label": "NameNode host CPU Utilization",
+ "scope": "host",
+ "source": {
+ "type": "METRIC",
+ "jmx": "java.lang:type=OperatingSystem/SystemCpuLoad",
+ "host": "{{hdfs-site/dfs.namenode.secondary.http-address}}"
+ }
+ },
+ {
+ "name": "namenode_process",
+ "service": "HDFS",
+ "component": "NAMENODE",
+ "label": "NameNode process",
+ "interval": 6,
+ "scope": "host",
+ "source": {
+ "type": "PORT",
+ "uri": "http://c6401.ambari.apache.org:50070",
+ "default_port": 50070,
+ "reporting": {
+ "ok": {
+ "text": "TCP OK - {0:.4f} response time on port {1}"
+ },
+ "critical": {
+ "text": "Could not load process info: {0}"
+ }
+ }
+ }
+ },
+ {
+ "name": "hdfs_last_checkpoint",
+ "label": "Last Checkpoint Time",
+ "interval": 1,
+ "scope": "service",
+ "enabled": false,
+ "source": {
+ "type": "SCRIPT",
+ "path": "scripts/alerts/last_checkpoint.py"
+ }
+ }
+ ]
+}
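
The file maps cluster names to lists of definitions; a minimal sketch of
walking it (the scheduler handler in the second patch does the same parse):

    import json

    with open('alert_definitions.json') as fp:
      clusters = json.load(fp)
    for cluster, definitions in clusters.items():
      for d in definitions:
        print('%s: %s (%s)' % (cluster, d['name'], d['source']['type']))
    # c1: namenode_cpu (METRIC), namenode_process (PORT),
    # hdfs_last_checkpoint (SCRIPT) -- order not guaranteed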
[2/2] git commit: AMBARI-6887. Alerts: groundwork for alert collection (ncole)
Posted by nc...@apache.org.
AMBARI-6887. Alerts: groundwork for alert collection (ncole)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/9e529edd
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/9e529edd
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/9e529edd
Branch: refs/heads/branch-alerts-dev
Commit: 9e529eddcf0423293046f6c2d85bc59f66b52c6a
Parents: 26b162a
Author: Nate Cole <nc...@hortonworks.com>
Authored: Sun Aug 17 19:26:50 2014 -0400
Committer: Nate Cole <nc...@hortonworks.com>
Committed: Mon Aug 18 11:10:59 2014 -0400
----------------------------------------------------------------------
ambari-agent/pom.xml | 1 +
.../ambari_agent/AlertSchedulerHandler.py | 127 ++++
.../src/main/python/ambari_agent/Controller.py | 12 +-
.../main/python/ambari_agent/alerts/__init__.py | 18 +
.../python/ambari_agent/alerts/base_alert.py | 72 +++
.../python/ambari_agent/alerts/port_alert.py | 96 +++
.../python/ambari_agent/apscheduler/__init__.py | 3 +
.../python/ambari_agent/apscheduler/events.py | 64 ++
.../main/python/ambari_agent/apscheduler/job.py | 137 +++++
.../apscheduler/jobstores/__init__.py | 0
.../ambari_agent/apscheduler/jobstores/base.py | 25 +
.../apscheduler/jobstores/mongodb_store.py | 84 +++
.../apscheduler/jobstores/ram_store.py | 25 +
.../apscheduler/jobstores/redis_store.py | 91 +++
.../apscheduler/jobstores/shelve_store.py | 74 +++
.../apscheduler/jobstores/sqlalchemy_store.py | 91 +++
.../ambari_agent/apscheduler/scheduler.py | 607 +++++++++++++++++++
.../ambari_agent/apscheduler/threadpool.py | 133 ++++
.../apscheduler/triggers/__init__.py | 3 +
.../apscheduler/triggers/cron/__init__.py | 144 +++++
.../apscheduler/triggers/cron/expressions.py | 194 ++++++
.../apscheduler/triggers/cron/fields.py | 100 +++
.../apscheduler/triggers/interval.py | 39 ++
.../ambari_agent/apscheduler/triggers/simple.py | 17 +
.../python/ambari_agent/apscheduler/util.py | 230 +++++++
.../src/test/python/ambari_agent/TestAlerts.py | 73 +++
.../dummy_files/alert_definitions.json | 46 ++
27 files changed, 2505 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/9e529edd/ambari-agent/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-agent/pom.xml b/ambari-agent/pom.xml
index c98af34..2475b22 100644
--- a/ambari-agent/pom.xml
+++ b/ambari-agent/pom.xml
@@ -624,6 +624,7 @@
<exclude>src/test/python/ambari_agent/dummy_files/*</exclude>
<exclude>src/test/python/ambari_agent/dummy*.txt</exclude>
<exclude>src/main/python/ambari_agent/imports.txt</exclude>
+ <exclude>src/main/python/ambari_agent/apscheduler/**</exclude>
<exclude>**/*.erb</exclude>
<exclude>**/*.json</exclude>
<exclude>**/*.pydevproject</exclude>
http://git-wip-us.apache.org/repos/asf/ambari/blob/9e529edd/ambari-agent/src/main/python/ambari_agent/AlertSchedulerHandler.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/AlertSchedulerHandler.py b/ambari-agent/src/main/python/ambari_agent/AlertSchedulerHandler.py
new file mode 100644
index 0000000..cd0605f
--- /dev/null
+++ b/ambari-agent/src/main/python/ambari_agent/AlertSchedulerHandler.py
@@ -0,0 +1,127 @@
+#!/usr/bin/env python
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+'''
+http://apscheduler.readthedocs.org/en/v2.1.2
+'''
+from apscheduler.scheduler import Scheduler
+from alerts.port_alert import PortAlert
+import json
+import logging
+import sys
+import time
+
+logger = logging.getLogger()
+
+class AlertSchedulerHandler():
+
+ def __init__(self, filename, in_minutes=True):
+ self.filename = filename
+
+ config = {
+ 'threadpool.core_threads': 3,
+ 'coalesce': True,
+ 'standalone': False
+ }
+
+ self.scheduler = Scheduler(config)
+
+ alert_callables = self.__load_alerts()
+
+ for _callable in alert_callables:
+ if in_minutes:
+ self.scheduler.add_interval_job(self.__make_function(_callable),
+ minutes=_callable.interval())
+ else:
+ self.scheduler.add_interval_job(self.__make_function(_callable),
+ seconds=_callable.interval())
+
+ def __make_function(self, alert_def):
+ return lambda: alert_def.collect()
+
+ def start(self):
+ if self.scheduler is not None:
+ self.scheduler.start()
+
+ def stop(self):
+ if self.scheduler is not None:
+ self.scheduler.shutdown(wait=False)
+ self.scheduler = None
+
+ def __load_alerts(self):
+ definitions = []
+ try:
+ # FIXME make location configurable
+ with open(self.filename) as fp:
+ cluster_defs = json.load(fp)
+ for deflist in cluster_defs.values():
+ for definition in deflist:
+ obj = self.__json_to_callable(definition)
+ if obj is not None:
+ definitions.append(obj)
+ except:
+ import traceback
+ traceback.print_exc()
+ pass
+ return definitions
+
+ def __json_to_callable(self, json_definition):
+ source = json_definition['source']
+ source_type = source.get('type', '')
+
+ alert = None
+
+ if source_type == 'METRIC':
+ pass
+ elif source_type == 'PORT':
+ alert = PortAlert(json_definition, source)
+ elif source_type == 'SCRIPT':
+ pass
+
+ return alert
+
+ def __json_to_meta(self, json_definition):
+ pass
+
+def main():
+ args = list(sys.argv)
+ del args[0]
+
+ logger.setLevel(logging.DEBUG)
+
+ ash = AlertSchedulerHandler(args[0], False)
+ ash.start()
+
+ i = 0
+ try:
+ while i < 10:
+ time.sleep(1)
+ i += 1
+ except KeyboardInterrupt:
+ pass
+ ash.stop()
+
+if __name__ == "__main__":
+ main()
+
+
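Mirroring main() above, the handler can also be driven programmatically; a
minimal sketch (the JSON path matches the Controller wiring below):

    import time
    from ambari_agent.AlertSchedulerHandler import AlertSchedulerHandler

    ash = AlertSchedulerHandler(
        '/var/lib/ambari-agent/cache/alerts/alert_definitions.json',
        in_minutes=False)   # treat each definition's "interval" as seconds
    ash.start()             # one interval job per parseable definition
    time.sleep(30)
    ash.stop()              # shutdown(wait=False), scheduler discarded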
http://git-wip-us.apache.org/repos/asf/ambari/blob/9e529edd/ambari-agent/src/main/python/ambari_agent/Controller.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/Controller.py b/ambari-agent/src/main/python/ambari_agent/Controller.py
index 87af939..3be54c2 100644
--- a/ambari-agent/src/main/python/ambari_agent/Controller.py
+++ b/ambari-agent/src/main/python/ambari_agent/Controller.py
@@ -39,7 +39,7 @@ import security
from NetUtil import NetUtil
import ssl
from LiveStatus import LiveStatus
-
+from AlertSchedulerHandler import AlertSchedulerHandler
logger = logging.getLogger()
@@ -73,6 +73,14 @@ class Controller(threading.Thread):
self.heartbeat_wait_event = threading.Event()
# List of callbacks that are called at agent registration
self.registration_listeners = []
+
+ # pull config directory out of config
+ cache_dir = config.get('agent', 'cache_dir')
+ if cache_dir is None:
+ cache_dir = '/var/lib/ambari-agent/cache'
+
+ self.alert_scheduler_handler = AlertSchedulerHandler(
+ os.path.join(cache_dir, 'alerts', 'alert_definitions.json'))
def __del__(self):
@@ -317,6 +325,8 @@ class Controller(threading.Thread):
message = registerResponse['response']
logger.info("Registration response from %s was %s", self.serverHostname, message)
+ self.alert_scheduler_handler.start()
+
if self.isRegistered:
# Clearing command queue to stop executing "stale" commands
# after registration
http://git-wip-us.apache.org/repos/asf/ambari/blob/9e529edd/ambari-agent/src/main/python/ambari_agent/alerts/__init__.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/alerts/__init__.py b/ambari-agent/src/main/python/ambari_agent/alerts/__init__.py
new file mode 100644
index 0000000..0a0e1ca
--- /dev/null
+++ b/ambari-agent/src/main/python/ambari_agent/alerts/__init__.py
@@ -0,0 +1,18 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/9e529edd/ambari-agent/src/main/python/ambari_agent/alerts/base_alert.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/alerts/base_alert.py b/ambari-agent/src/main/python/ambari_agent/alerts/base_alert.py
new file mode 100644
index 0000000..e102d56
--- /dev/null
+++ b/ambari-agent/src/main/python/ambari_agent/alerts/base_alert.py
@@ -0,0 +1,72 @@
+#!/usr/bin/env python
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+import logging
+
+logger = logging.getLogger()
+
+class BaseAlert(object):
+ RESULT_OK = 'OK'
+ RESULT_WARNING = 'WARNING'
+ RESULT_CRITICAL = 'CRITICAL'
+ RESULT_UNKNOWN = 'UNKNOWN'
+
+ def __init__(self, alert_meta, alert_source_meta):
+ self.alert_meta = alert_meta
+ self.alert_source_meta = alert_source_meta
+
+ def interval(self):
+ return self.alert_meta.get('interval', 1)
+
+ def collect(self):
+ res = (BaseAlert.RESULT_UNKNOWN, [])
+ try:
+ res = self._collect()
+ except Exception as e:
+ res = (BaseAlert.RESULT_CRITICAL, [str(e)])
+
+ res_base_text = self.alert_source_meta['reporting'][res[0].lower()]['text']
+
+ data = {}
+ data['name'] = self._find_value('name')
+ data['state'] = res[0]
+ data['text'] = res_base_text.format(*res[1])
+ # data['cluster'] = self._find_value('cluster') # not sure how to get this yet
+ data['service'] = self._find_value('service')
+ data['component'] = self._find_value('component')
+
+ print str(data)
+
+ def _find_value(self, meta_key):
+ return self.alert_meta.get(meta_key)
+
+ def _collect(self):
+ '''
+ Low level function to collect alert data. The result is a tuple as:
+ res[0] = the result code
+ res[1] = the list of arguments supplied to the reporting text for the result code
+ '''
+ raise NotImplementedError
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/9e529edd/ambari-agent/src/main/python/ambari_agent/alerts/port_alert.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/alerts/port_alert.py b/ambari-agent/src/main/python/ambari_agent/alerts/port_alert.py
new file mode 100644
index 0000000..165f890
--- /dev/null
+++ b/ambari-agent/src/main/python/ambari_agent/alerts/port_alert.py
@@ -0,0 +1,96 @@
+#!/usr/bin/env python
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+import logging
+import re
+import socket
+import time
+import traceback
+from alerts.base_alert import BaseAlert
+from resource_management.libraries.functions.get_port_from_url import get_port_from_url
+
+logger = logging.getLogger()
+
+class PortAlert(BaseAlert):
+
+ def __init__(self, alert_meta, alert_source_meta):
+ super(PortAlert, self).__init__(alert_meta, alert_source_meta)
+
+ default_port = alert_source_meta['default_port']
+ uri = alert_source_meta['uri']
+
+ self.port = default_port
+ self.host = get_host_from_url(uri)
+
+ try:
+ self.port = int(get_port_from_url(uri))
+ except:
+ traceback.print_exc()
+ pass
+
+
+ def _collect(self):
+ s = None
+ try:
+ s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ s.settimeout(1.5)
+ t = time.time()
+ s.connect((self.host, self.port))
+ seconds = time.time() - t
+ return (self.RESULT_OK, [seconds, self.port])
+ finally:
+ if s is not None:
+ try:
+ s.close()
+ except:
+ pass
+
+'''
+See RFC3986, Appendix B
+Tested on the following cases:
+ "192.168.54.1"
+ "192.168.54.2:7661
+ "hdfs://192.168.54.3/foo/bar"
+ "ftp://192.168.54.4:7842/foo/bar"
+'''
+def get_host_from_url(uri):
+ # RFC3986, Appendix B
+ parts = re.findall('^(([^:/?#]+):)?(//([^/?#]*))?([^?#]*)(\?([^#]*))?(#(.*))?' , uri)
+
+ # index of parts
+ # scheme = 1
+ # authority = 3
+ # path = 4
+ # query = 6
+ # fragment = 8
+
+ host_and_port = uri
+ if 0 == len(parts[0][1]):
+ host_and_port = parts[0][4]
+ elif 0 == len(parts[0][2]):
+ host_and_port = parts[0][1]
+ elif parts[0][2].startswith("//"):
+ host_and_port = parts[0][3]
+
+ if -1 == host_and_port.find(':'):
+ return host_and_port
+ else:
+ return host_and_port.split(':')[0]
+
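The documented cases, for reference (each returns the bare host, with any
scheme, port, and path stripped):

    get_host_from_url('192.168.54.1')                     # '192.168.54.1'
    get_host_from_url('192.168.54.2:7661')                # '192.168.54.2'
    get_host_from_url('hdfs://192.168.54.3/foo/bar')      # '192.168.54.3'
    get_host_from_url('ftp://192.168.54.4:7842/foo/bar')  # '192.168.54.4'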
http://git-wip-us.apache.org/repos/asf/ambari/blob/9e529edd/ambari-agent/src/main/python/ambari_agent/apscheduler/__init__.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/apscheduler/__init__.py b/ambari-agent/src/main/python/ambari_agent/apscheduler/__init__.py
new file mode 100644
index 0000000..71cc53d
--- /dev/null
+++ b/ambari-agent/src/main/python/ambari_agent/apscheduler/__init__.py
@@ -0,0 +1,3 @@
+version_info = (2, 1, 2)
+version = '.'.join(str(n) for n in version_info[:3])
+release = '.'.join(str(n) for n in version_info)
http://git-wip-us.apache.org/repos/asf/ambari/blob/9e529edd/ambari-agent/src/main/python/ambari_agent/apscheduler/events.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/apscheduler/events.py b/ambari-agent/src/main/python/ambari_agent/apscheduler/events.py
new file mode 100644
index 0000000..80bde8e
--- /dev/null
+++ b/ambari-agent/src/main/python/ambari_agent/apscheduler/events.py
@@ -0,0 +1,64 @@
+__all__ = ('EVENT_SCHEDULER_START', 'EVENT_SCHEDULER_SHUTDOWN',
+ 'EVENT_JOBSTORE_ADDED', 'EVENT_JOBSTORE_REMOVED',
+ 'EVENT_JOBSTORE_JOB_ADDED', 'EVENT_JOBSTORE_JOB_REMOVED',
+ 'EVENT_JOB_EXECUTED', 'EVENT_JOB_ERROR', 'EVENT_JOB_MISSED',
+ 'EVENT_ALL', 'SchedulerEvent', 'JobStoreEvent', 'JobEvent')
+
+
+EVENT_SCHEDULER_START = 1 # The scheduler was started
+EVENT_SCHEDULER_SHUTDOWN = 2 # The scheduler was shut down
+EVENT_JOBSTORE_ADDED = 4 # A job store was added to the scheduler
+EVENT_JOBSTORE_REMOVED = 8 # A job store was removed from the scheduler
+EVENT_JOBSTORE_JOB_ADDED = 16 # A job was added to a job store
+EVENT_JOBSTORE_JOB_REMOVED = 32 # A job was removed from a job store
+EVENT_JOB_EXECUTED = 64 # A job was executed successfully
+EVENT_JOB_ERROR = 128 # A job raised an exception during execution
+EVENT_JOB_MISSED = 256 # A job's execution was missed
+EVENT_ALL = (EVENT_SCHEDULER_START | EVENT_SCHEDULER_SHUTDOWN |
+ EVENT_JOBSTORE_ADDED | EVENT_JOBSTORE_REMOVED |
+ EVENT_JOBSTORE_JOB_ADDED | EVENT_JOBSTORE_JOB_REMOVED |
+ EVENT_JOB_EXECUTED | EVENT_JOB_ERROR | EVENT_JOB_MISSED)
+
+
+class SchedulerEvent(object):
+ """
+ An event that concerns the scheduler itself.
+
+ :var code: the type code of this event
+ """
+ def __init__(self, code):
+ self.code = code
+
+
+class JobStoreEvent(SchedulerEvent):
+ """
+ An event that concerns job stores.
+
+ :var alias: the alias of the job store involved
+ :var job: the new job if a job was added
+ """
+ def __init__(self, code, alias, job=None):
+ SchedulerEvent.__init__(self, code)
+ self.alias = alias
+ if job:
+ self.job = job
+
+
+class JobEvent(SchedulerEvent):
+ """
+ An event that concerns the execution of individual jobs.
+
+ :var job: the job instance in question
+ :var scheduled_run_time: the time when the job was scheduled to be run
+ :var retval: the return value of the successfully executed job
+ :var exception: the exception raised by the job
+ :var traceback: the traceback object associated with the exception
+ """
+ def __init__(self, code, job, scheduled_run_time, retval=None,
+ exception=None, traceback=None):
+ SchedulerEvent.__init__(self, code)
+ self.job = job
+ self.scheduled_run_time = scheduled_run_time
+ self.retval = retval
+ self.exception = exception
+ self.traceback = traceback
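
A hedged sketch of subscribing to these events (add_listener and its mask
parameter appear in scheduler.py below; the callback name is illustrative):

    from apscheduler.events import EVENT_JOB_ERROR, EVENT_JOB_MISSED

    def on_job_trouble(event):
        # event is a JobEvent; exception/traceback are set for EVENT_JOB_ERROR
        print('job %s: code=%d %s' % (event.job, event.code, event.exception))

    scheduler.add_listener(on_job_trouble, EVENT_JOB_ERROR | EVENT_JOB_MISSED)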
http://git-wip-us.apache.org/repos/asf/ambari/blob/9e529edd/ambari-agent/src/main/python/ambari_agent/apscheduler/job.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/apscheduler/job.py b/ambari-agent/src/main/python/ambari_agent/apscheduler/job.py
new file mode 100644
index 0000000..cfc09a2
--- /dev/null
+++ b/ambari-agent/src/main/python/ambari_agent/apscheduler/job.py
@@ -0,0 +1,137 @@
+"""
+Jobs represent scheduled tasks.
+"""
+
+from threading import Lock
+from datetime import timedelta
+
+from apscheduler.util import to_unicode, ref_to_obj, get_callable_name,\
+ obj_to_ref
+
+
+class MaxInstancesReachedError(Exception):
+ pass
+
+
+class Job(object):
+ """
+ Encapsulates the actual Job along with its metadata. Job instances
+ are created by the scheduler when adding jobs, and should not be
+ directly instantiated. These options can be set when adding jobs
+ to the scheduler (see :ref:`job_options`).
+
+ :var trigger: trigger that determines the execution times
+ :var func: callable to call when the trigger is triggered
+ :var args: list of positional arguments to call func with
+ :var kwargs: dict of keyword arguments to call func with
+ :var name: name of the job
+ :var misfire_grace_time: seconds after the designated run time that
+ the job is still allowed to be run
+ :var coalesce: run once instead of many times if the scheduler determines
+ that the job should be run more than once in succession
+ :var max_runs: maximum number of times this job is allowed to be
+ triggered
+ :var max_instances: maximum number of concurrently running
+ instances allowed for this job
+ :var runs: number of times this job has been triggered
+ :var instances: number of concurrently running instances of this job
+ """
+ id = None
+ next_run_time = None
+
+ def __init__(self, trigger, func, args, kwargs, misfire_grace_time,
+ coalesce, name=None, max_runs=None, max_instances=1):
+ if not trigger:
+ raise ValueError('The trigger must not be None')
+ if not hasattr(func, '__call__'):
+ raise TypeError('func must be callable')
+ if not hasattr(args, '__getitem__'):
+ raise TypeError('args must be a list-like object')
+ if not hasattr(kwargs, '__getitem__'):
+ raise TypeError('kwargs must be a dict-like object')
+ if misfire_grace_time <= 0:
+ raise ValueError('misfire_grace_time must be a positive value')
+ if max_runs is not None and max_runs <= 0:
+ raise ValueError('max_runs must be a positive value')
+ if max_instances <= 0:
+ raise ValueError('max_instances must be a positive value')
+
+ self._lock = Lock()
+
+ self.trigger = trigger
+ self.func = func
+ self.args = args
+ self.kwargs = kwargs
+ self.name = to_unicode(name or get_callable_name(func))
+ self.misfire_grace_time = misfire_grace_time
+ self.coalesce = coalesce
+ self.max_runs = max_runs
+ self.max_instances = max_instances
+ self.runs = 0
+ self.instances = 0
+
+ def compute_next_run_time(self, now):
+ if self.runs == self.max_runs:
+ self.next_run_time = None
+ else:
+ self.next_run_time = self.trigger.get_next_fire_time(now)
+
+ return self.next_run_time
+
+ def get_run_times(self, now):
+ """
+ Computes the scheduled run times between ``next_run_time`` and ``now``.
+ """
+ run_times = []
+ run_time = self.next_run_time
+ increment = timedelta(microseconds=1)
+ while ((not self.max_runs or self.runs < self.max_runs) and
+ run_time and run_time <= now):
+ run_times.append(run_time)
+ run_time = self.trigger.get_next_fire_time(run_time + increment)
+
+ return run_times
+
+ def add_instance(self):
+ self._lock.acquire()
+ try:
+ if self.instances == self.max_instances:
+ raise MaxInstancesReachedError
+ self.instances += 1
+ finally:
+ self._lock.release()
+
+ def remove_instance(self):
+ self._lock.acquire()
+ try:
+ assert self.instances > 0, 'Already at 0 instances'
+ self.instances -= 1
+ finally:
+ self._lock.release()
+
+ def __getstate__(self):
+ # Prevents the unwanted pickling of transient or unpicklable variables
+ state = self.__dict__.copy()
+ state.pop('instances', None)
+ state.pop('func', None)
+ state.pop('_lock', None)
+ state['func_ref'] = obj_to_ref(self.func)
+ return state
+
+ def __setstate__(self, state):
+ state['instances'] = 0
+ state['func'] = ref_to_obj(state.pop('func_ref'))
+ state['_lock'] = Lock()
+ self.__dict__ = state
+
+ def __eq__(self, other):
+ if isinstance(other, Job):
+ return self.id is not None and other.id == self.id or self is other
+ return NotImplemented
+
+ def __repr__(self):
+ return '<Job (name=%s, trigger=%s)>' % (self.name, repr(self.trigger))
+
+ def __str__(self):
+ return '%s (trigger: %s, next run at: %s)' % (
+ self.name, str(self.trigger), str(self.next_run_time))
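
Job instances come from the scheduler; the options documented above are
passed through the add_*_job methods (a sketch, callable name illustrative):

    job = scheduler.add_interval_job(check_port, seconds=30,
                                     name='port-check',
                                     misfire_grace_time=5,  # may run up to 5s late
                                     coalesce=True,         # collapse missed runs
                                     max_instances=1)       # never overlap runs
    print(job)   # 'port-check (trigger: ..., next run at: ...)'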
http://git-wip-us.apache.org/repos/asf/ambari/blob/9e529edd/ambari-agent/src/main/python/ambari_agent/apscheduler/jobstores/__init__.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/apscheduler/jobstores/__init__.py b/ambari-agent/src/main/python/ambari_agent/apscheduler/jobstores/__init__.py
new file mode 100644
index 0000000..e69de29
http://git-wip-us.apache.org/repos/asf/ambari/blob/9e529edd/ambari-agent/src/main/python/ambari_agent/apscheduler/jobstores/base.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/apscheduler/jobstores/base.py b/ambari-agent/src/main/python/ambari_agent/apscheduler/jobstores/base.py
new file mode 100644
index 0000000..f0a16dd
--- /dev/null
+++ b/ambari-agent/src/main/python/ambari_agent/apscheduler/jobstores/base.py
@@ -0,0 +1,25 @@
+"""
+Abstract base class that provides the interface needed by all job stores.
+Job store methods are also documented here.
+"""
+
+
+class JobStore(object):
+ def add_job(self, job):
+ """Adds the given job from this store."""
+ raise NotImplementedError
+
+ def update_job(self, job):
+ """Persists the running state of the given job."""
+ raise NotImplementedError
+
+ def remove_job(self, job):
+ """Removes the given jobs from this store."""
+ raise NotImplementedError
+
+ def load_jobs(self):
+ """Loads jobs from this store into memory."""
+ raise NotImplementedError
+
+ def close(self):
+ """Frees any resources still bound to this job store."""
http://git-wip-us.apache.org/repos/asf/ambari/blob/9e529edd/ambari-agent/src/main/python/ambari_agent/apscheduler/jobstores/mongodb_store.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/apscheduler/jobstores/mongodb_store.py b/ambari-agent/src/main/python/ambari_agent/apscheduler/jobstores/mongodb_store.py
new file mode 100644
index 0000000..3f522c2
--- /dev/null
+++ b/ambari-agent/src/main/python/ambari_agent/apscheduler/jobstores/mongodb_store.py
@@ -0,0 +1,84 @@
+"""
+Stores jobs in a MongoDB database.
+"""
+import logging
+
+from apscheduler.jobstores.base import JobStore
+from apscheduler.job import Job
+
+try:
+ import cPickle as pickle
+except ImportError: # pragma: nocover
+ import pickle
+
+try:
+ from bson.binary import Binary
+ from pymongo.connection import Connection
+except ImportError: # pragma: nocover
+ raise ImportError('MongoDBJobStore requires PyMongo installed')
+
+logger = logging.getLogger(__name__)
+
+
+class MongoDBJobStore(JobStore):
+ def __init__(self, database='apscheduler', collection='jobs',
+ connection=None, pickle_protocol=pickle.HIGHEST_PROTOCOL,
+ **connect_args):
+ self.jobs = []
+ self.pickle_protocol = pickle_protocol
+
+ if not database:
+ raise ValueError('The "database" parameter must not be empty')
+ if not collection:
+ raise ValueError('The "collection" parameter must not be empty')
+
+ if connection:
+ self.connection = connection
+ else:
+ self.connection = Connection(**connect_args)
+
+ self.collection = self.connection[database][collection]
+
+ def add_job(self, job):
+ job_dict = job.__getstate__()
+ job_dict['trigger'] = Binary(pickle.dumps(job.trigger,
+ self.pickle_protocol))
+ job_dict['args'] = Binary(pickle.dumps(job.args,
+ self.pickle_protocol))
+ job_dict['kwargs'] = Binary(pickle.dumps(job.kwargs,
+ self.pickle_protocol))
+ job.id = self.collection.insert(job_dict)
+ self.jobs.append(job)
+
+ def remove_job(self, job):
+ self.collection.remove(job.id)
+ self.jobs.remove(job)
+
+ def load_jobs(self):
+ jobs = []
+ for job_dict in self.collection.find():
+ try:
+ job = Job.__new__(Job)
+ job_dict['id'] = job_dict.pop('_id')
+ job_dict['trigger'] = pickle.loads(job_dict['trigger'])
+ job_dict['args'] = pickle.loads(job_dict['args'])
+ job_dict['kwargs'] = pickle.loads(job_dict['kwargs'])
+ job.__setstate__(job_dict)
+ jobs.append(job)
+ except Exception:
+ job_name = job_dict.get('name', '(unknown)')
+ logger.exception('Unable to restore job "%s"', job_name)
+ self.jobs = jobs
+
+ def update_job(self, job):
+ spec = {'_id': job.id}
+ document = {'$set': {'next_run_time': job.next_run_time},
+ '$inc': {'runs': 1}}
+ self.collection.update(spec, document)
+
+ def close(self):
+ self.connection.disconnect()
+
+ def __repr__(self):
+ connection = self.collection.database.connection
+ return '<%s (connection=%s)>' % (self.__class__.__name__, connection)
http://git-wip-us.apache.org/repos/asf/ambari/blob/9e529edd/ambari-agent/src/main/python/ambari_agent/apscheduler/jobstores/ram_store.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/apscheduler/jobstores/ram_store.py b/ambari-agent/src/main/python/ambari_agent/apscheduler/jobstores/ram_store.py
new file mode 100644
index 0000000..60458fb
--- /dev/null
+++ b/ambari-agent/src/main/python/ambari_agent/apscheduler/jobstores/ram_store.py
@@ -0,0 +1,25 @@
+"""
+Stores jobs in an array in RAM. Provides no persistence support.
+"""
+
+from apscheduler.jobstores.base import JobStore
+
+
+class RAMJobStore(JobStore):
+ def __init__(self):
+ self.jobs = []
+
+ def add_job(self, job):
+ self.jobs.append(job)
+
+ def update_job(self, job):
+ pass
+
+ def remove_job(self, job):
+ self.jobs.remove(job)
+
+ def load_jobs(self):
+ pass
+
+ def __repr__(self):
+ return '<%s>' % (self.__class__.__name__)
http://git-wip-us.apache.org/repos/asf/ambari/blob/9e529edd/ambari-agent/src/main/python/ambari_agent/apscheduler/jobstores/redis_store.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/apscheduler/jobstores/redis_store.py b/ambari-agent/src/main/python/ambari_agent/apscheduler/jobstores/redis_store.py
new file mode 100644
index 0000000..5eabf4b
--- /dev/null
+++ b/ambari-agent/src/main/python/ambari_agent/apscheduler/jobstores/redis_store.py
@@ -0,0 +1,91 @@
+"""
+Stores jobs in a Redis database.
+"""
+from uuid import uuid4
+from datetime import datetime
+import logging
+
+from apscheduler.jobstores.base import JobStore
+from apscheduler.job import Job
+
+try:
+ import cPickle as pickle
+except ImportError: # pragma: nocover
+ import pickle
+
+try:
+ from redis import StrictRedis
+except ImportError: # pragma: nocover
+ raise ImportError('RedisJobStore requires redis installed')
+
+try:
+ long = long
+except NameError:
+ long = int
+
+logger = logging.getLogger(__name__)
+
+
+class RedisJobStore(JobStore):
+ def __init__(self, db=0, key_prefix='jobs.',
+ pickle_protocol=pickle.HIGHEST_PROTOCOL, **connect_args):
+ self.jobs = []
+ self.pickle_protocol = pickle_protocol
+ self.key_prefix = key_prefix
+
+ if db is None:
+ raise ValueError('The "db" parameter must not be empty')
+ if not key_prefix:
+ raise ValueError('The "key_prefix" parameter must not be empty')
+
+ self.redis = StrictRedis(db=db, **connect_args)
+
+ def add_job(self, job):
+ job.id = str(uuid4())
+ job_state = job.__getstate__()
+ job_dict = {
+ 'job_state': pickle.dumps(job_state, self.pickle_protocol),
+ 'runs': '0',
+ 'next_run_time': job_state.pop('next_run_time').isoformat()}
+ self.redis.hmset(self.key_prefix + job.id, job_dict)
+ self.jobs.append(job)
+
+ def remove_job(self, job):
+ self.redis.delete(self.key_prefix + job.id)
+ self.jobs.remove(job)
+
+ def load_jobs(self):
+ jobs = []
+ keys = self.redis.keys(self.key_prefix + '*')
+ pipeline = self.redis.pipeline()
+ for key in keys:
+ pipeline.hgetall(key)
+ results = pipeline.execute()
+
+ for job_dict in results:
+ job_state = {}
+ try:
+ job = Job.__new__(Job)
+ job_state = pickle.loads(job_dict['job_state'.encode()])
+ job_state['runs'] = long(job_dict['runs'.encode()])
+ dateval = job_dict['next_run_time'.encode()].decode()
+ job_state['next_run_time'] = datetime.strptime(
+ dateval, '%Y-%m-%dT%H:%M:%S')
+ job.__setstate__(job_state)
+ jobs.append(job)
+ except Exception:
+ job_name = job_state.get('name', '(unknown)')
+ logger.exception('Unable to restore job "%s"', job_name)
+ self.jobs = jobs
+
+ def update_job(self, job):
+ attrs = {
+ 'next_run_time': job.next_run_time.isoformat(),
+ 'runs': job.runs}
+ self.redis.hmset(self.key_prefix + job.id, attrs)
+
+ def close(self):
+ self.redis.connection_pool.disconnect()
+
+ def __repr__(self):
+ return '<%s>' % self.__class__.__name__
http://git-wip-us.apache.org/repos/asf/ambari/blob/9e529edd/ambari-agent/src/main/python/ambari_agent/apscheduler/jobstores/shelve_store.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/apscheduler/jobstores/shelve_store.py b/ambari-agent/src/main/python/ambari_agent/apscheduler/jobstores/shelve_store.py
new file mode 100644
index 0000000..d1be58f
--- /dev/null
+++ b/ambari-agent/src/main/python/ambari_agent/apscheduler/jobstores/shelve_store.py
@@ -0,0 +1,74 @@
+"""
+Stores jobs in a file governed by the :mod:`shelve` module.
+"""
+
+import shelve
+import pickle
+import random
+import logging
+
+from apscheduler.jobstores.base import JobStore
+from apscheduler.job import Job
+from apscheduler.util import itervalues
+
+logger = logging.getLogger(__name__)
+
+
+class ShelveJobStore(JobStore):
+ MAX_ID = 1000000
+
+ def __init__(self, path, pickle_protocol=pickle.HIGHEST_PROTOCOL):
+ self.jobs = []
+ self.path = path
+ self.pickle_protocol = pickle_protocol
+ self._open_store()
+
+ def _open_store(self):
+ self.store = shelve.open(self.path, 'c', self.pickle_protocol)
+
+ def _generate_id(self):
+ id = None
+ while not id:
+ id = str(random.randint(1, self.MAX_ID))
+ if not id in self.store:
+ return id
+
+ def add_job(self, job):
+ job.id = self._generate_id()
+ self.store[job.id] = job.__getstate__()
+ self.store.close()
+ self._open_store()
+ self.jobs.append(job)
+
+ def update_job(self, job):
+ job_dict = self.store[job.id]
+ job_dict['next_run_time'] = job.next_run_time
+ job_dict['runs'] = job.runs
+ self.store[job.id] = job_dict
+ self.store.close()
+ self._open_store()
+
+ def remove_job(self, job):
+ del self.store[job.id]
+ self.store.close()
+ self._open_store()
+ self.jobs.remove(job)
+
+ def load_jobs(self):
+ jobs = []
+ for job_dict in itervalues(self.store):
+ try:
+ job = Job.__new__(Job)
+ job.__setstate__(job_dict)
+ jobs.append(job)
+ except Exception:
+ job_name = job_dict.get('name', '(unknown)')
+ logger.exception('Unable to restore job "%s"', job_name)
+
+ self.jobs = jobs
+
+ def close(self):
+ self.store.close()
+
+ def __repr__(self):
+ return '<%s (path=%s)>' % (self.__class__.__name__, self.path)
http://git-wip-us.apache.org/repos/asf/ambari/blob/9e529edd/ambari-agent/src/main/python/ambari_agent/apscheduler/jobstores/sqlalchemy_store.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/apscheduler/jobstores/sqlalchemy_store.py b/ambari-agent/src/main/python/ambari_agent/apscheduler/jobstores/sqlalchemy_store.py
new file mode 100644
index 0000000..5b64a35
--- /dev/null
+++ b/ambari-agent/src/main/python/ambari_agent/apscheduler/jobstores/sqlalchemy_store.py
@@ -0,0 +1,91 @@
+"""
+Stores jobs in a database table using SQLAlchemy.
+"""
+import pickle
+import logging
+
+import sqlalchemy
+
+from apscheduler.jobstores.base import JobStore
+from apscheduler.job import Job
+
+try:
+ from sqlalchemy import *
+except ImportError: # pragma: nocover
+ raise ImportError('SQLAlchemyJobStore requires SQLAlchemy installed')
+
+logger = logging.getLogger(__name__)
+
+
+class SQLAlchemyJobStore(JobStore):
+ def __init__(self, url=None, engine=None, tablename='apscheduler_jobs',
+ metadata=None, pickle_protocol=pickle.HIGHEST_PROTOCOL):
+ self.jobs = []
+ self.pickle_protocol = pickle_protocol
+
+ if engine:
+ self.engine = engine
+ elif url:
+ self.engine = create_engine(url)
+ else:
+ raise ValueError('Need either "engine" or "url" defined')
+
+ if sqlalchemy.__version__ < '0.7':
+ pickle_coltype = PickleType(pickle_protocol, mutable=False)
+ else:
+ pickle_coltype = PickleType(pickle_protocol)
+ self.jobs_t = Table(
+ tablename, metadata or MetaData(),
+ Column('id', Integer,
+ Sequence(tablename + '_id_seq', optional=True),
+ primary_key=True),
+ Column('trigger', pickle_coltype, nullable=False),
+ Column('func_ref', String(1024), nullable=False),
+ Column('args', pickle_coltype, nullable=False),
+ Column('kwargs', pickle_coltype, nullable=False),
+ Column('name', Unicode(1024)),
+ Column('misfire_grace_time', Integer, nullable=False),
+ Column('coalesce', Boolean, nullable=False),
+ Column('max_runs', Integer),
+ Column('max_instances', Integer),
+ Column('next_run_time', DateTime, nullable=False),
+ Column('runs', BigInteger))
+
+ self.jobs_t.create(self.engine, True)
+
+ def add_job(self, job):
+ job_dict = job.__getstate__()
+ result = self.engine.execute(self.jobs_t.insert().values(**job_dict))
+ job.id = result.inserted_primary_key[0]
+ self.jobs.append(job)
+
+ def remove_job(self, job):
+ delete = self.jobs_t.delete().where(self.jobs_t.c.id == job.id)
+ self.engine.execute(delete)
+ self.jobs.remove(job)
+
+ def load_jobs(self):
+ jobs = []
+ for row in self.engine.execute(select([self.jobs_t])):
+ try:
+ job = Job.__new__(Job)
+ job_dict = dict(row.items())
+ job.__setstate__(job_dict)
+ jobs.append(job)
+ except Exception:
+ job_name = job_dict.get('name', '(unknown)')
+ logger.exception('Unable to restore job "%s"', job_name)
+ self.jobs = jobs
+
+ def update_job(self, job):
+ job_dict = job.__getstate__()
+ update = self.jobs_t.update().where(self.jobs_t.c.id == job.id).\
+ values(next_run_time=job_dict['next_run_time'],
+ runs=job_dict['runs'])
+ self.engine.execute(update)
+
+ def close(self):
+ self.engine.dispose()
+
+ def __repr__(self):
+ return '<%s (url=%s)>' % (self.__class__.__name__, self.engine.url)
http://git-wip-us.apache.org/repos/asf/ambari/blob/9e529edd/ambari-agent/src/main/python/ambari_agent/apscheduler/scheduler.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/apscheduler/scheduler.py b/ambari-agent/src/main/python/ambari_agent/apscheduler/scheduler.py
new file mode 100644
index 0000000..319037a
--- /dev/null
+++ b/ambari-agent/src/main/python/ambari_agent/apscheduler/scheduler.py
@@ -0,0 +1,607 @@
+"""
+This module is the main part of the library. It houses the Scheduler class
+and related exceptions.
+"""
+
+from threading import Thread, Event, Lock
+from datetime import datetime, timedelta
+from logging import getLogger
+import os
+import sys
+
+from apscheduler.util import *
+from apscheduler.triggers import SimpleTrigger, IntervalTrigger, CronTrigger
+from apscheduler.jobstores.ram_store import RAMJobStore
+from apscheduler.job import Job, MaxInstancesReachedError
+from apscheduler.events import *
+from apscheduler.threadpool import ThreadPool
+
+logger = getLogger(__name__)
+
+
+class SchedulerAlreadyRunningError(Exception):
+ """
+ Raised when attempting to start or configure the scheduler when it's
+ already running.
+ """
+
+ def __str__(self):
+ return 'Scheduler is already running'
+
+
+class Scheduler(object):
+ """
+ This class is responsible for scheduling jobs and triggering
+ their execution.
+ """
+
+ _stopped = True
+ _thread = None
+
+ def __init__(self, gconfig={}, **options):
+ self._wakeup = Event()
+ self._jobstores = {}
+ self._jobstores_lock = Lock()
+ self._listeners = []
+ self._listeners_lock = Lock()
+ self._pending_jobs = []
+ self.configure(gconfig, **options)
+
+ def configure(self, gconfig={}, **options):
+ """
+ Reconfigures the scheduler with the given options. Can only be done
+ when the scheduler isn't running.
+ """
+ if self.running:
+ raise SchedulerAlreadyRunningError
+
+ # Set general options
+ config = combine_opts(gconfig, 'apscheduler.', options)
+ self.misfire_grace_time = int(config.pop('misfire_grace_time', 1))
+ self.coalesce = asbool(config.pop('coalesce', True))
+ self.daemonic = asbool(config.pop('daemonic', True))
+ self.standalone = asbool(config.pop('standalone', False))
+
+ # Configure the thread pool
+ if 'threadpool' in config:
+ self._threadpool = maybe_ref(config['threadpool'])
+ else:
+ threadpool_opts = combine_opts(config, 'threadpool.')
+ self._threadpool = ThreadPool(**threadpool_opts)
+
+ # Configure job stores
+ jobstore_opts = combine_opts(config, 'jobstore.')
+ jobstores = {}
+ for key, value in jobstore_opts.items():
+ store_name, option = key.split('.', 1)
+ opts_dict = jobstores.setdefault(store_name, {})
+ opts_dict[option] = value
+
+ for alias, opts in jobstores.items():
+ classname = opts.pop('class')
+ cls = maybe_ref(classname)
+ jobstore = cls(**opts)
+ self.add_jobstore(jobstore, alias, True)
+
+ def start(self):
+ """
+ Starts the scheduler in a new thread.
+
+ In threaded mode (the default), this method will return immediately
+ after starting the scheduler thread.
+
+ In standalone mode, this method will block until there are no more
+ scheduled jobs.
+ """
+ if self.running:
+ raise SchedulerAlreadyRunningError
+
+ # Create a RAMJobStore as the default if there is no default job store
+ if not 'default' in self._jobstores:
+ self.add_jobstore(RAMJobStore(), 'default', True)
+
+ # Schedule all pending jobs
+ for job, jobstore in self._pending_jobs:
+ self._real_add_job(job, jobstore, False)
+ del self._pending_jobs[:]
+
+ self._stopped = False
+ if self.standalone:
+ self._main_loop()
+ else:
+ self._thread = Thread(target=self._main_loop, name='APScheduler')
+ self._thread.setDaemon(self.daemonic)
+ self._thread.start()
+
+ def shutdown(self, wait=True, shutdown_threadpool=True,
+ close_jobstores=True):
+ """
+ Shuts down the scheduler and terminates the thread.
+ Does not interrupt any currently running jobs.
+
+ :param wait: ``True`` to wait until all currently executing jobs have
+ finished (if ``shutdown_threadpool`` is also ``True``)
+ :param shutdown_threadpool: ``True`` to shut down the thread pool
+ :param close_jobstores: ``True`` to close all job stores after shutdown
+ """
+ if not self.running:
+ return
+
+ self._stopped = True
+ self._wakeup.set()
+
+ # Shut down the thread pool
+ if shutdown_threadpool:
+ self._threadpool.shutdown(wait)
+
+ # Wait until the scheduler thread terminates
+ if self._thread:
+ self._thread.join()
+
+ # Close all job stores
+ if close_jobstores:
+ for jobstore in itervalues(self._jobstores):
+ jobstore.close()
+
+ @property
+ def running(self):
+ thread_alive = self._thread and self._thread.isAlive()
+ standalone = getattr(self, 'standalone', False)
+ return not self._stopped and (standalone or thread_alive)
+
+ def add_jobstore(self, jobstore, alias, quiet=False):
+ """
+ Adds a job store to this scheduler.
+
+ :param jobstore: job store to be added
+ :param alias: alias for the job store
+ :param quiet: True to suppress scheduler thread wakeup
+ :type jobstore: instance of
+ :class:`~apscheduler.jobstores.base.JobStore`
+ :type alias: str
+ """
+ self._jobstores_lock.acquire()
+ try:
+ if alias in self._jobstores:
+ raise KeyError('Alias "%s" is already in use' % alias)
+ self._jobstores[alias] = jobstore
+ jobstore.load_jobs()
+ finally:
+ self._jobstores_lock.release()
+
+ # Notify listeners that a new job store has been added
+ self._notify_listeners(JobStoreEvent(EVENT_JOBSTORE_ADDED, alias))
+
+ # Notify the scheduler so it can scan the new job store for jobs
+ if not quiet:
+ self._wakeup.set()
+
+ def remove_jobstore(self, alias, close=True):
+ """
+ Removes the job store by the given alias from this scheduler.
+
+ :param close: ``True`` to close the job store after removing it
+ :type alias: str
+ """
+ self._jobstores_lock.acquire()
+ try:
+ jobstore = self._jobstores.pop(alias)
+ if not jobstore:
+ raise KeyError('No such job store: %s' % alias)
+ finally:
+ self._jobstores_lock.release()
+
+ # Close the job store if requested
+ if close:
+ jobstore.close()
+
+ # Notify listeners that a job store has been removed
+ self._notify_listeners(JobStoreEvent(EVENT_JOBSTORE_REMOVED, alias))
+
+ def add_listener(self, callback, mask=EVENT_ALL):
+ """
+ Adds a listener for scheduler events. When a matching event occurs,
+ ``callback`` is executed with the event object as its sole argument.
+ If the ``mask`` parameter is not provided, the callback will receive
+ events of all types.
+
+ :param callback: any callable that takes one argument
+ :param mask: bitmask that indicates which events should be listened to
+ """
+ self._listeners_lock.acquire()
+ try:
+ self._listeners.append((callback, mask))
+ finally:
+ self._listeners_lock.release()
+
+ def remove_listener(self, callback):
+ """
+ Removes a previously added event listener.
+ """
+ self._listeners_lock.acquire()
+ try:
+ for i, (cb, _) in enumerate(self._listeners):
+ if callback == cb:
+ del self._listeners[i]
+ finally:
+ self._listeners_lock.release()
+
+ def _notify_listeners(self, event):
+ self._listeners_lock.acquire()
+ try:
+ listeners = tuple(self._listeners)
+ finally:
+ self._listeners_lock.release()
+
+ for cb, mask in listeners:
+ if event.code & mask:
+ try:
+ cb(event)
+ except:
+ logger.exception('Error notifying listener')
+
+ def _real_add_job(self, job, jobstore, wakeup):
+ job.compute_next_run_time(datetime.now())
+ if not job.next_run_time:
+ raise ValueError('Not adding job since it would never be run')
+
+ self._jobstores_lock.acquire()
+ try:
+ try:
+ store = self._jobstores[jobstore]
+ except KeyError:
+ raise KeyError('No such job store: %s' % jobstore)
+ store.add_job(job)
+ finally:
+ self._jobstores_lock.release()
+
+ # Notify listeners that a new job has been added
+ event = JobStoreEvent(EVENT_JOBSTORE_JOB_ADDED, jobstore, job)
+ self._notify_listeners(event)
+
+ logger.info('Added job "%s" to job store "%s"', job, jobstore)
+
+ # Notify the scheduler about the new job
+ if wakeup:
+ self._wakeup.set()
+
+ def add_job(self, trigger, func, args, kwargs, jobstore='default',
+ **options):
+ """
+ Adds the given job to the job list and notifies the scheduler thread.
+ Any extra keyword arguments are passed along to the constructor of the
+ :class:`~apscheduler.job.Job` class (see :ref:`job_options`).
+
+ :param trigger: trigger that determines when ``func`` is called
+ :param func: callable to run at the given time
+ :param args: list of positional arguments to call func with
+ :param kwargs: dict of keyword arguments to call func with
+ :param jobstore: alias of the job store to store the job in
+ :rtype: :class:`~apscheduler.job.Job`
+ """
+ job = Job(trigger, func, args or [], kwargs or {},
+ options.pop('misfire_grace_time', self.misfire_grace_time),
+ options.pop('coalesce', self.coalesce), **options)
+ if not self.running:
+ self._pending_jobs.append((job, jobstore))
+ logger.info('Adding job tentatively -- it will be properly '
+ 'scheduled when the scheduler starts')
+ else:
+ self._real_add_job(job, jobstore, True)
+ return job
+
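+ # Example (illustrative sketch; assumes a Scheduler named "sched" and
+ # a placeholder callable "cleanup"):
+ #
+ # from apscheduler.triggers import SimpleTrigger
+ # job = sched.add_job(SimpleTrigger('2014-08-20 03:00:00'),
+ # cleanup, [], {})
+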
+ def _remove_job(self, job, alias, jobstore):
+ jobstore.remove_job(job)
+
+ # Notify listeners that a job has been removed
+ event = JobStoreEvent(EVENT_JOBSTORE_JOB_REMOVED, alias, job)
+ self._notify_listeners(event)
+
+ logger.info('Removed job "%s"', job)
+
+ def add_date_job(self, func, date, args=None, kwargs=None, **options):
+ """
+ Schedules a job to be run once at a specific date and time.
+ Any extra keyword arguments are passed along to the constructor of the
+ :class:`~apscheduler.job.Job` class (see :ref:`job_options`).
+
+ :param func: callable to run at the given time
+ :param date: the date/time to run the job at
+ :param args: list of positional arguments to call func with
+ :param kwargs: dict of keyword arguments to call func with
+ :param name: name of the job
+ :param jobstore: alias of the job store to add the job to
+ :param misfire_grace_time: seconds after the designated run time that
+ the job is still allowed to be run
+ :type date: :class:`datetime.date` or :class:`datetime.datetime`
+ :rtype: :class:`~apscheduler.job.Job`
+ """
+ trigger = SimpleTrigger(date)
+ return self.add_job(trigger, func, args, kwargs, **options)
+
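+ # Example (illustrative sketch; "send_report" is a placeholder
+ # callable):
+ #
+ # from datetime import datetime
+ # def send_report():
+ # print('report sent')
+ # sched.add_date_job(send_report, datetime(2014, 8, 20, 3, 0))
+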
+ def add_interval_job(self, func, weeks=0, days=0, hours=0, minutes=0,
+ seconds=0, start_date=None, args=None, kwargs=None,
+ **options):
+ """
+ Schedules a job to be run repeatedly at the specified interval.
+ Any extra keyword arguments are passed along to the constructor of the
+ :class:`~apscheduler.job.Job` class (see :ref:`job_options`).
+
+ :param func: callable to run
+ :param weeks: number of weeks to wait
+ :param days: number of days to wait
+ :param hours: number of hours to wait
+ :param minutes: number of minutes to wait
+ :param seconds: number of seconds to wait
+ :param start_date: when to first execute the job and start the
+ counter (default is after the given interval)
+ :param args: list of positional arguments to call func with
+ :param kwargs: dict of keyword arguments to call func with
+ :param name: name of the job
+ :param jobstore: alias of the job store to add the job to
+ :param misfire_grace_time: seconds after the designated run time that
+ the job is still allowed to be run
+ :rtype: :class:`~apscheduler.job.Job`
+ """
+ interval = timedelta(weeks=weeks, days=days, hours=hours,
+ minutes=minutes, seconds=seconds)
+ trigger = IntervalTrigger(interval, start_date)
+ return self.add_job(trigger, func, args, kwargs, **options)
+
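+ # Example (illustrative sketch; "collect_metrics" is a placeholder
+ # callable): run every 5 minutes, tolerating up to 60 seconds of delay.
+ #
+ # sched.add_interval_job(collect_metrics, minutes=5,
+ # misfire_grace_time=60)
+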
+ def add_cron_job(self, func, year=None, month=None, day=None, week=None,
+ day_of_week=None, hour=None, minute=None, second=None,
+ start_date=None, args=None, kwargs=None, **options):
+ """
+ Schedules a job to be run on times that match the given
+ expressions.
+ Any extra keyword arguments are passed along to the constructor of the
+ :class:`~apscheduler.job.Job` class (see :ref:`job_options`).
+
+ :param func: callable to run
+ :param year: year to run on
+ :param month: month to run on
+ :param day: day of month to run on
+ :param week: week of the year to run on
+ :param day_of_week: weekday to run on (0 = Monday)
+ :param hour: hour to run on
+ :param minute: minute to run on
+ :param second: second to run on
+ :param start_date: earliest possible date/time to trigger on
+ :param args: list of positional arguments to call func with
+ :param kwargs: dict of keyword arguments to call func with
+ :param name: name of the job
+ :param jobstore: alias of the job store to add the job to
+ :param misfire_grace_time: seconds after the designated run time that
+ the job is still allowed to be run
+ :return: the scheduled job
+ :rtype: :class:`~apscheduler.job.Job`
+ """
+ trigger = CronTrigger(year=year, month=month, day=day, week=week,
+ day_of_week=day_of_week, hour=hour,
+ minute=minute, second=second,
+ start_date=start_date)
+ return self.add_job(trigger, func, args, kwargs, **options)
+
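+ # Example (illustrative sketch; "rotate_logs" is a placeholder
+ # callable): run at 00:30 on weekdays; the day_of_week expression is
+ # parsed by WeekdayRangeExpression (see expressions.py in this patch).
+ #
+ # sched.add_cron_job(rotate_logs, day_of_week='mon-fri',
+ # hour=0, minute=30)
+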
+ def cron_schedule(self, **options):
+ """
+ Decorator version of :meth:`add_cron_job`.
+ This decorator does not wrap its host function.
+ Unscheduling decorated functions is possible by passing the ``job``
+ attribute of the scheduled function to :meth:`unschedule_job`.
+ Any extra keyword arguments are passed along to the constructor of the
+ :class:`~apscheduler.job.Job` class (see :ref:`job_options`).
+ """
+ def inner(func):
+ func.job = self.add_cron_job(func, **options)
+ return func
+ return inner
+
+ def interval_schedule(self, **options):
+ """
+ Decorator version of :meth:`add_interval_job`.
+ This decorator does not wrap its host function.
+ Unscheduling decorated functions is possible by passing the ``job``
+ attribute of the scheduled function to :meth:`unschedule_job`.
+ Any extra keyword arguments are passed along to the constructor of the
+ :class:`~apscheduler.job.Job` class (see :ref:`job_options`).
+ """
+ def inner(func):
+ func.job = self.add_interval_job(func, **options)
+ return func
+ return inner
+
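+ # Example usage of both decorators (illustrative sketch; assumes a
+ # Scheduler named "sched"):
+ #
+ # @sched.interval_schedule(seconds=30)
+ # def heartbeat():
+ # print('alive')
+ #
+ # # later, unschedule via the attribute set by the decorator:
+ # sched.unschedule_job(heartbeat.job)
+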
+ def get_jobs(self):
+ """
+ Returns a list of all scheduled jobs.
+
+ :return: list of :class:`~apscheduler.job.Job` objects
+ """
+ self._jobstores_lock.acquire()
+ try:
+ jobs = []
+ for jobstore in itervalues(self._jobstores):
+ jobs.extend(jobstore.jobs)
+ return jobs
+ finally:
+ self._jobstores_lock.release()
+
+ def unschedule_job(self, job):
+ """
+ Removes a job, preventing it from being run any more.
+ """
+ self._jobstores_lock.acquire()
+ try:
+ for alias, jobstore in iteritems(self._jobstores):
+ if job in list(jobstore.jobs):
+ self._remove_job(job, alias, jobstore)
+ return
+ finally:
+ self._jobstores_lock.release()
+
+ raise KeyError('Job "%s" is not scheduled in any job store' % job)
+
+ def unschedule_func(self, func):
+ """
+ Removes all jobs that would execute the given function.
+ """
+ found = False
+ self._jobstores_lock.acquire()
+ try:
+ for alias, jobstore in iteritems(self._jobstores):
+ for job in list(jobstore.jobs):
+ if job.func == func:
+ self._remove_job(job, alias, jobstore)
+ found = True
+ finally:
+ self._jobstores_lock.release()
+
+ if not found:
+ raise KeyError('The given function is not scheduled in this '
+ 'scheduler')
+
+ def print_jobs(self, out=None):
+ """
+ Prints out a textual listing of all jobs currently scheduled on this
+ scheduler.
+
+ :param out: a file-like object to print to (defaults to **sys.stdout**
+ if nothing is given)
+ """
+ out = out or sys.stdout
+ job_strs = []
+ self._jobstores_lock.acquire()
+ try:
+ for alias, jobstore in iteritems(self._jobstores):
+ job_strs.append('Jobstore %s:' % alias)
+ if jobstore.jobs:
+ for job in jobstore.jobs:
+ job_strs.append(' %s' % job)
+ else:
+ job_strs.append(' No scheduled jobs')
+ finally:
+ self._jobstores_lock.release()
+
+ out.write(os.linesep.join(job_strs) + os.linesep)
+
+ def _run_job(self, job, run_times):
+ """
+ Acts as a harness that runs the actual job code in a thread.
+ """
+ for run_time in run_times:
+ # See if the job missed its run time window, and handle possible
+ # misfires accordingly
+ difference = datetime.now() - run_time
+ grace_time = timedelta(seconds=job.misfire_grace_time)
+ if difference > grace_time:
+ # Notify listeners about a missed run
+ event = JobEvent(EVENT_JOB_MISSED, job, run_time)
+ self._notify_listeners(event)
+ logger.warning('Run time of job "%s" was missed by %s',
+ job, difference)
+ else:
+ try:
+ job.add_instance()
+ except MaxInstancesReachedError:
+ event = JobEvent(EVENT_JOB_MISSED, job, run_time)
+ self._notify_listeners(event)
+ logger.warning('Execution of job "%s" skipped: '
+ 'maximum number of running instances '
+ 'reached (%d)', job, job.max_instances)
+ break
+
+ logger.info('Running job "%s" (scheduled at %s)', job,
+ run_time)
+
+ try:
+ retval = job.func(*job.args, **job.kwargs)
+ except:
+ # Notify listeners about the exception
+ exc, tb = sys.exc_info()[1:]
+ event = JobEvent(EVENT_JOB_ERROR, job, run_time,
+ exception=exc, traceback=tb)
+ self._notify_listeners(event)
+
+ logger.exception('Job "%s" raised an exception', job)
+ else:
+ # Notify listeners about successful execution
+ event = JobEvent(EVENT_JOB_EXECUTED, job, run_time,
+ retval=retval)
+ self._notify_listeners(event)
+
+ logger.info('Job "%s" executed successfully', job)
+
+ job.remove_instance()
+
+ # If coalescing is enabled, don't attempt any further runs
+ if job.coalesce:
+ break
+
+ def _process_jobs(self, now):
+ """
+ Iterates through the jobs in every job store, starts any jobs
+ that are due, and figures out the next wakeup time.
+ """
+ next_wakeup_time = None
+ self._jobstores_lock.acquire()
+ try:
+ for alias, jobstore in iteritems(self._jobstores):
+ for job in tuple(jobstore.jobs):
+ run_times = job.get_run_times(now)
+ if run_times:
+ self._threadpool.submit(self._run_job, job, run_times)
+
+ # Increase the job's run count
+ if job.coalesce:
+ job.runs += 1
+ else:
+ job.runs += len(run_times)
+
+ # Update the job, but don't keep finished jobs around
+ if job.compute_next_run_time(
+ now + timedelta(microseconds=1)):
+ jobstore.update_job(job)
+ else:
+ self._remove_job(job, alias, jobstore)
+
+ if not next_wakeup_time:
+ next_wakeup_time = job.next_run_time
+ elif job.next_run_time:
+ next_wakeup_time = min(next_wakeup_time,
+ job.next_run_time)
+ return next_wakeup_time
+ finally:
+ self._jobstores_lock.release()
+
+ def _main_loop(self):
+ """Executes jobs on schedule."""
+
+ logger.info('Scheduler started')
+ self._notify_listeners(SchedulerEvent(EVENT_SCHEDULER_START))
+
+ self._wakeup.clear()
+ while not self._stopped:
+ logger.debug('Looking for jobs to run')
+ now = datetime.now()
+ next_wakeup_time = self._process_jobs(now)
+
+ # Sleep until the next job is scheduled to be run,
+ # a new job is added or the scheduler is stopped
+ if next_wakeup_time is not None:
+ wait_seconds = time_difference(next_wakeup_time, now)
+ logger.debug('Next wakeup is due at %s (in %f seconds)',
+ next_wakeup_time, wait_seconds)
+ try:
+ self._wakeup.wait(wait_seconds)
+ except IOError: # Catch errno 514 on some Linux kernels
+ pass
+ self._wakeup.clear()
+ elif self.standalone:
+ logger.debug('No jobs left; shutting down scheduler')
+ self.shutdown()
+ break
+ else:
+ logger.debug('No jobs; waiting until a job is added')
+ try:
+ self._wakeup.wait()
+ except IOError: # Catch errno 514 on some Linux kernels
+ pass
+ self._wakeup.clear()
+
+ logger.info('Scheduler has been shut down')
+ self._notify_listeners(SchedulerEvent(EVENT_SCHEDULER_SHUTDOWN))
http://git-wip-us.apache.org/repos/asf/ambari/blob/9e529edd/ambari-agent/src/main/python/ambari_agent/apscheduler/threadpool.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/apscheduler/threadpool.py b/ambari-agent/src/main/python/ambari_agent/apscheduler/threadpool.py
new file mode 100644
index 0000000..8ec47da
--- /dev/null
+++ b/ambari-agent/src/main/python/ambari_agent/apscheduler/threadpool.py
@@ -0,0 +1,133 @@
+"""
+Generic thread pool class. Modeled after Java's ThreadPoolExecutor.
+Please note that this ThreadPool does *not* fully implement the PEP 3148
+(futures) Executor interface!
+"""
+
+from threading import Thread, Lock, currentThread
+from weakref import ref
+import logging
+import atexit
+
+try:
+ from queue import Queue, Empty
+except ImportError:
+ from Queue import Queue, Empty
+
+logger = logging.getLogger(__name__)
+_threadpools = set()
+
+
+# Worker threads are daemonic in order to let the interpreter exit without
+# an explicit shutdown of the thread pool. The following trick is necessary
+# to allow worker threads to finish cleanly.
+def _shutdown_all():
+ for pool_ref in tuple(_threadpools):
+ pool = pool_ref()
+ if pool:
+ pool.shutdown()
+
+atexit.register(_shutdown_all)
+
+
+class ThreadPool(object):
+ def __init__(self, core_threads=0, max_threads=20, keepalive=1):
+ """
+ :param core_threads: maximum number of persistent threads in the pool
+ :param max_threads: maximum number of total threads in the pool
+ :param keepalive: seconds to keep non-core worker threads waiting
+ for new tasks
+ """
+ self.core_threads = core_threads
+ self.max_threads = max(max_threads, core_threads, 1)
+ self.keepalive = keepalive
+ self._queue = Queue()
+ self._threads_lock = Lock()
+ self._threads = set()
+ self._shutdown = False
+
+ _threadpools.add(ref(self))
+ logger.info('Started thread pool with %d core threads and %s maximum '
+ 'threads', core_threads, max_threads or 'unlimited')
+
+ def _adjust_threadcount(self):
+ self._threads_lock.acquire()
+ try:
+ if self.num_threads < self.max_threads:
+ self._add_thread(self.num_threads < self.core_threads)
+ finally:
+ self._threads_lock.release()
+
+ def _add_thread(self, core):
+ t = Thread(target=self._run_jobs, args=(core,))
+ t.setDaemon(True)
+ t.start()
+ self._threads.add(t)
+
+ def _run_jobs(self, core):
+ logger.debug('Started worker thread')
+ block = True
+ timeout = None
+ if not core:
+ block = self.keepalive > 0
+ timeout = self.keepalive
+
+ while True:
+ try:
+ func, args, kwargs = self._queue.get(block, timeout)
+ except Empty:
+ break
+
+ if self._shutdown:
+ break
+
+ try:
+ func(*args, **kwargs)
+ except:
+ logger.exception('Error in worker thread')
+
+ self._threads_lock.acquire()
+ self._threads.remove(currentThread())
+ self._threads_lock.release()
+
+ logger.debug('Exiting worker thread')
+
+ @property
+ def num_threads(self):
+ return len(self._threads)
+
+ def submit(self, func, *args, **kwargs):
+ if self._shutdown:
+ raise RuntimeError('Cannot schedule new tasks after shutdown')
+
+ self._queue.put((func, args, kwargs))
+ self._adjust_threadcount()
+
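+ # Example (illustrative sketch; "task" is any placeholder callable):
+ #
+ # pool = ThreadPool(core_threads=2, max_threads=10)
+ # pool.submit(task, 'arg1', retries=3)
+ # pool.shutdown(wait=True)
+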
+ def shutdown(self, wait=True):
+ if self._shutdown:
+ return
+
+ logger.info('Shutting down thread pool')
+ self._shutdown = True
+ _threadpools.remove(ref(self))
+
+ self._threads_lock.acquire()
+ for _ in range(self.num_threads):
+ self._queue.put((None, None, None))
+ self._threads_lock.release()
+
+ if wait:
+ self._threads_lock.acquire()
+ threads = tuple(self._threads)
+ self._threads_lock.release()
+ for thread in threads:
+ thread.join()
+
+ def __repr__(self):
+ if self.max_threads:
+ threadcount = '%d/%d' % (self.num_threads, self.max_threads)
+ else:
+ threadcount = '%d' % self.num_threads
+
+ return '<ThreadPool at %x; threads=%s>' % (id(self), threadcount)
http://git-wip-us.apache.org/repos/asf/ambari/blob/9e529edd/ambari-agent/src/main/python/ambari_agent/apscheduler/triggers/__init__.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/apscheduler/triggers/__init__.py b/ambari-agent/src/main/python/ambari_agent/apscheduler/triggers/__init__.py
new file mode 100644
index 0000000..74a9788
--- /dev/null
+++ b/ambari-agent/src/main/python/ambari_agent/apscheduler/triggers/__init__.py
@@ -0,0 +1,3 @@
+from apscheduler.triggers.cron import CronTrigger
+from apscheduler.triggers.interval import IntervalTrigger
+from apscheduler.triggers.simple import SimpleTrigger
http://git-wip-us.apache.org/repos/asf/ambari/blob/9e529edd/ambari-agent/src/main/python/ambari_agent/apscheduler/triggers/cron/__init__.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/apscheduler/triggers/cron/__init__.py b/ambari-agent/src/main/python/ambari_agent/apscheduler/triggers/cron/__init__.py
new file mode 100644
index 0000000..9e69f72
--- /dev/null
+++ b/ambari-agent/src/main/python/ambari_agent/apscheduler/triggers/cron/__init__.py
@@ -0,0 +1,144 @@
+from datetime import date, datetime
+
+from apscheduler.triggers.cron.fields import *
+from apscheduler.util import datetime_ceil, convert_to_datetime, iteritems
+
+
+class CronTrigger(object):
+ FIELD_NAMES = ('year', 'month', 'day', 'week', 'day_of_week', 'hour',
+ 'minute', 'second')
+ FIELDS_MAP = {'year': BaseField,
+ 'month': BaseField,
+ 'week': WeekField,
+ 'day': DayOfMonthField,
+ 'day_of_week': DayOfWeekField,
+ 'hour': BaseField,
+ 'minute': BaseField,
+ 'second': BaseField}
+
+ def __init__(self, **values):
+ self.start_date = values.pop('start_date', None)
+ if self.start_date:
+ self.start_date = convert_to_datetime(self.start_date)
+
+ # Check field names and yank out all None valued fields
+ for key, value in list(iteritems(values)):
+ if key not in self.FIELD_NAMES:
+ raise TypeError('Invalid field name: %s' % key)
+ if value is None:
+ del values[key]
+
+ self.fields = []
+ assign_defaults = False
+ for field_name in self.FIELD_NAMES:
+ if field_name in values:
+ exprs = values.pop(field_name)
+ is_default = False
+ assign_defaults = not values
+ elif assign_defaults:
+ exprs = DEFAULT_VALUES[field_name]
+ is_default = True
+ else:
+ exprs = '*'
+ is_default = True
+
+ field_class = self.FIELDS_MAP[field_name]
+ field = field_class(field_name, exprs, is_default)
+ self.fields.append(field)
+
+ def _increment_field_value(self, dateval, fieldnum):
+ """
+ Increments the designated field and resets all less significant fields
+ to their minimum values.
+
+ :type dateval: datetime
+ :type fieldnum: int
+ :rtype: tuple
+ :return: a tuple containing the new date, and the number of the field
+ that was actually incremented
+ """
+ i = 0
+ values = {}
+ while i < len(self.fields):
+ field = self.fields[i]
+ if not field.REAL:
+ if i == fieldnum:
+ fieldnum -= 1
+ i -= 1
+ else:
+ i += 1
+ continue
+
+ if i < fieldnum:
+ values[field.name] = field.get_value(dateval)
+ i += 1
+ elif i > fieldnum:
+ values[field.name] = field.get_min(dateval)
+ i += 1
+ else:
+ value = field.get_value(dateval)
+ maxval = field.get_max(dateval)
+ if value == maxval:
+ fieldnum -= 1
+ i -= 1
+ else:
+ values[field.name] = value + 1
+ i += 1
+
+ return datetime(**values), fieldnum
+
+ def _set_field_value(self, dateval, fieldnum, new_value):
+ values = {}
+ for i, field in enumerate(self.fields):
+ if field.REAL:
+ if i < fieldnum:
+ values[field.name] = field.get_value(dateval)
+ elif i > fieldnum:
+ values[field.name] = field.get_min(dateval)
+ else:
+ values[field.name] = new_value
+
+ return datetime(**values)
+
+ def get_next_fire_time(self, start_date):
+ if self.start_date:
+ start_date = max(start_date, self.start_date)
+ next_date = datetime_ceil(start_date)
+ fieldnum = 0
+ while 0 <= fieldnum < len(self.fields):
+ field = self.fields[fieldnum]
+ curr_value = field.get_value(next_date)
+ next_value = field.get_next_value(next_date)
+
+ if next_value is None:
+ # No valid value was found
+ next_date, fieldnum = self._increment_field_value(
+ next_date, fieldnum - 1)
+ elif next_value > curr_value:
+ # A valid value was found, but it is higher than the
+ # starting value
+ if field.REAL:
+ next_date = self._set_field_value(
+ next_date, fieldnum, next_value)
+ fieldnum += 1
+ else:
+ next_date, fieldnum = self._increment_field_value(
+ next_date, fieldnum)
+ else:
+ # A valid value was found, no changes necessary
+ fieldnum += 1
+
+ if fieldnum >= 0:
+ return next_date
+
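+ # Worked example (illustrative): CronTrigger(hour=3, minute=0) fires
+ # daily at 03:00:00, so
+ # trigger.get_next_fire_time(datetime(2014, 8, 18, 12, 0))
+ # returns datetime(2014, 8, 19, 3, 0).
+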
+ def __str__(self):
+ options = ["%s='%s'" % (f.name, str(f)) for f in self.fields
+ if not f.is_default]
+ return 'cron[%s]' % (', '.join(options))
+
+ def __repr__(self):
+ options = ["%s='%s'" % (f.name, str(f)) for f in self.fields
+ if not f.is_default]
+ if self.start_date:
+ options.append("start_date='%s'" % self.start_date.isoformat(' '))
+ return '<%s (%s)>' % (self.__class__.__name__, ', '.join(options))
http://git-wip-us.apache.org/repos/asf/ambari/blob/9e529edd/ambari-agent/src/main/python/ambari_agent/apscheduler/triggers/cron/expressions.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/apscheduler/triggers/cron/expressions.py b/ambari-agent/src/main/python/ambari_agent/apscheduler/triggers/cron/expressions.py
new file mode 100644
index 0000000..b5d2919
--- /dev/null
+++ b/ambari-agent/src/main/python/ambari_agent/apscheduler/triggers/cron/expressions.py
@@ -0,0 +1,194 @@
+"""
+This module contains the expressions applicable for CronTrigger's fields.
+"""
+
+from calendar import monthrange
+import re
+
+from apscheduler.util import asint
+
+__all__ = ('AllExpression', 'RangeExpression', 'WeekdayRangeExpression',
+ 'WeekdayPositionExpression', 'LastDayOfMonthExpression')
+
+
+WEEKDAYS = ['mon', 'tue', 'wed', 'thu', 'fri', 'sat', 'sun']
+
+
+class AllExpression(object):
+ value_re = re.compile(r'\*(?:/(?P<step>\d+))?$')
+
+ def __init__(self, step=None):
+ self.step = asint(step)
+ if self.step == 0:
+ raise ValueError('Increment must be higher than 0')
+
+ def get_next_value(self, date, field):
+ start = field.get_value(date)
+ minval = field.get_min(date)
+ maxval = field.get_max(date)
+ start = max(start, minval)
+
+ if not self.step:
+ next = start
+ else:
+ distance_to_next = (self.step - (start - minval)) % self.step
+ next = start + distance_to_next
+
+ if next <= maxval:
+ return next
+
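+ # Worked example (illustrative): for a minute field, '*/15' yields
+ # step=15; starting from minute 37, distance_to_next =
+ # (15 - (37 - 0)) % 15 = 8, so the next matching minute is 45.
+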
+ def __str__(self):
+ if self.step:
+ return '*/%d' % self.step
+ return '*'
+
+ def __repr__(self):
+ return "%s(%s)" % (self.__class__.__name__, self.step)
+
+
+class RangeExpression(AllExpression):
+ value_re = re.compile(
+ r'(?P<first>\d+)(?:-(?P<last>\d+))?(?:/(?P<step>\d+))?$')
+
+ def __init__(self, first, last=None, step=None):
+ AllExpression.__init__(self, step)
+ first = asint(first)
+ last = asint(last)
+ if last is None and step is None:
+ last = first
+ if last is not None and first > last:
+ raise ValueError('The minimum value in a range must not be '
+ 'higher than the maximum')
+ self.first = first
+ self.last = last
+
+ def get_next_value(self, date, field):
+ start = field.get_value(date)
+ minval = field.get_min(date)
+ maxval = field.get_max(date)
+
+ # Apply range limits
+ minval = max(minval, self.first)
+ if self.last is not None:
+ maxval = min(maxval, self.last)
+ start = max(start, minval)
+
+ if not self.step:
+ next = start
+ else:
+ distance_to_next = (self.step - (start - minval)) % self.step
+ next = start + distance_to_next
+
+ if next <= maxval:
+ return next
+
+ def __str__(self):
+ if self.last != self.first and self.last is not None:
+ range = '%d-%d' % (self.first, self.last)
+ else:
+ range = str(self.first)
+
+ if self.step:
+ return '%s/%d' % (range, self.step)
+ return range
+
+ def __repr__(self):
+ args = [str(self.first)]
+ if (self.last != self.first and self.last is not None) or self.step:
+ args.append(str(self.last))
+ if self.step:
+ args.append(str(self.step))
+ return "%s(%s)" % (self.__class__.__name__, ', '.join(args))
+
+
+class WeekdayRangeExpression(RangeExpression):
+ value_re = re.compile(r'(?P<first>[a-z]+)(?:-(?P<last>[a-z]+))?',
+ re.IGNORECASE)
+
+ def __init__(self, first, last=None):
+ try:
+ first_num = WEEKDAYS.index(first.lower())
+ except ValueError:
+ raise ValueError('Invalid weekday name "%s"' % first)
+
+ if last:
+ try:
+ last_num = WEEKDAYS.index(last.lower())
+ except ValueError:
+ raise ValueError('Invalid weekday name "%s"' % last)
+ else:
+ last_num = None
+
+ RangeExpression.__init__(self, first_num, last_num)
+
+ def __str__(self):
+ if self.last != self.first and self.last is not None:
+ return '%s-%s' % (WEEKDAYS[self.first], WEEKDAYS[self.last])
+ return WEEKDAYS[self.first]
+
+ def __repr__(self):
+ args = ["'%s'" % WEEKDAYS[self.first]]
+ if self.last != self.first and self.last is not None:
+ args.append("'%s'" % WEEKDAYS[self.last])
+ return "%s(%s)" % (self.__class__.__name__, ', '.join(args))
+
+
+class WeekdayPositionExpression(AllExpression):
+ options = ['1st', '2nd', '3rd', '4th', '5th', 'last']
+ value_re = re.compile(r'(?P<option_name>%s) +(?P<weekday_name>(?:\d+|\w+))'
+ % '|'.join(options), re.IGNORECASE)
+
+ def __init__(self, option_name, weekday_name):
+ try:
+ self.option_num = self.options.index(option_name.lower())
+ except ValueError:
+ raise ValueError('Invalid weekday position "%s"' % option_name)
+
+ try:
+ self.weekday = WEEKDAYS.index(weekday_name.lower())
+ except ValueError:
+ raise ValueError('Invalid weekday name "%s"' % weekday_name)
+
+ def get_next_value(self, date, field):
+ # Figure out the weekday of the month's first day and the number
+ # of days in that month
+ first_day_wday, last_day = monthrange(date.year, date.month)
+
+ # Calculate which day of the month is the first of the target weekdays
+ first_hit_day = self.weekday - first_day_wday + 1
+ if first_hit_day <= 0:
+ first_hit_day += 7
+
+ # Calculate what day of the month the target weekday would be
+ if self.option_num < 5:
+ target_day = first_hit_day + self.option_num * 7
+ else:
+ # Floor division keeps target_day an integer on Python 3 too
+ target_day = first_hit_day + ((last_day - first_hit_day) // 7) * 7
+
+ if date.day <= target_day <= last_day:
+ return target_day
+
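+ # Worked example (illustrative): for "2nd fri" in August 2014,
+ # monthrange(2014, 8) returns (4, 31), i.e. the month starts on a
+ # Friday (4) and has 31 days; first_hit_day = 4 - 4 + 1 = 1 and
+ # target_day = 1 + 1 * 7 = 8, so the second Friday is August 8.
+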
+ def __str__(self):
+ return '%s %s' % (self.options[self.option_num],
+ WEEKDAYS[self.weekday])
+
+ def __repr__(self):
+ return "%s('%s', '%s')" % (self.__class__.__name__,
+ self.options[self.option_num],
+ WEEKDAYS[self.weekday])
+
+
+class LastDayOfMonthExpression(AllExpression):
+ value_re = re.compile(r'last', re.IGNORECASE)
+
+ def __init__(self):
+ pass
+
+ def get_next_value(self, date, field):
+ return monthrange(date.year, date.month)[1]
+
+ def __str__(self):
+ return 'last'
+
+ def __repr__(self):
+ return "%s()" % self.__class__.__name__
http://git-wip-us.apache.org/repos/asf/ambari/blob/9e529edd/ambari-agent/src/main/python/ambari_agent/apscheduler/triggers/cron/fields.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/apscheduler/triggers/cron/fields.py b/ambari-agent/src/main/python/ambari_agent/apscheduler/triggers/cron/fields.py
new file mode 100644
index 0000000..be5e5e3
--- /dev/null
+++ b/ambari-agent/src/main/python/ambari_agent/apscheduler/triggers/cron/fields.py
@@ -0,0 +1,100 @@
+"""
+Fields represent CronTrigger options which map to :class:`~datetime.datetime`
+fields.
+"""
+
+from calendar import monthrange
+
+from apscheduler.triggers.cron.expressions import *
+
+__all__ = ('MIN_VALUES', 'MAX_VALUES', 'DEFAULT_VALUES', 'BaseField',
+ 'WeekField', 'DayOfMonthField', 'DayOfWeekField')
+
+
+MIN_VALUES = {'year': 1970, 'month': 1, 'day': 1, 'week': 1,
+ 'day_of_week': 0, 'hour': 0, 'minute': 0, 'second': 0}
+MAX_VALUES = {'year': 2 ** 63, 'month': 12, 'day': 31, 'week': 53,
+ 'day_of_week': 6, 'hour': 23, 'minute': 59, 'second': 59}
+DEFAULT_VALUES = {'year': '*', 'month': 1, 'day': 1, 'week': '*',
+ 'day_of_week': '*', 'hour': 0, 'minute': 0, 'second': 0}
+
+
+class BaseField(object):
+ REAL = True
+ COMPILERS = [AllExpression, RangeExpression]
+
+ def __init__(self, name, exprs, is_default=False):
+ self.name = name
+ self.is_default = is_default
+ self.compile_expressions(exprs)
+
+ def get_min(self, dateval):
+ return MIN_VALUES[self.name]
+
+ def get_max(self, dateval):
+ return MAX_VALUES[self.name]
+
+ def get_value(self, dateval):
+ return getattr(dateval, self.name)
+
+ def get_next_value(self, dateval):
+ smallest = None
+ for expr in self.expressions:
+ value = expr.get_next_value(dateval, self)
+ if smallest is None or (value is not None and value < smallest):
+ smallest = value
+
+ return smallest
+
+ def compile_expressions(self, exprs):
+ self.expressions = []
+
+ # Split a comma-separated expression list, if any
+ exprs = str(exprs).strip()
+ if ',' in exprs:
+ for expr in exprs.split(','):
+ self.compile_expression(expr)
+ else:
+ self.compile_expression(exprs)
+
+ def compile_expression(self, expr):
+ for compiler in self.COMPILERS:
+ match = compiler.value_re.match(expr)
+ if match:
+ compiled_expr = compiler(**match.groupdict())
+ self.expressions.append(compiled_expr)
+ return
+
+ raise ValueError('Unrecognized expression "%s" for field "%s"' %
+ (expr, self.name))
+
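+ # Worked example (illustrative): BaseField('minute', '0-30/5')
+ # compiles to a single RangeExpression(0, 30, 5), while '0,30'
+ # compiles to two RangeExpression instances, one per part.
+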
+ def __str__(self):
+ expr_strings = (str(e) for e in self.expressions)
+ return ','.join(expr_strings)
+
+ def __repr__(self):
+ return "%s('%s', '%s')" % (self.__class__.__name__, self.name,
+ str(self))
+
+
+class WeekField(BaseField):
+ REAL = False
+
+ def get_value(self, dateval):
+ return dateval.isocalendar()[1]
+
+
+class DayOfMonthField(BaseField):
+ COMPILERS = BaseField.COMPILERS + [WeekdayPositionExpression,
+ LastDayOfMonthExpression]
+
+ def get_max(self, dateval):
+ return monthrange(dateval.year, dateval.month)[1]
+
+
+class DayOfWeekField(BaseField):
+ REAL = False
+ COMPILERS = BaseField.COMPILERS + [WeekdayRangeExpression]
+
+ def get_value(self, dateval):
+ return dateval.weekday()
http://git-wip-us.apache.org/repos/asf/ambari/blob/9e529edd/ambari-agent/src/main/python/ambari_agent/apscheduler/triggers/interval.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/apscheduler/triggers/interval.py b/ambari-agent/src/main/python/ambari_agent/apscheduler/triggers/interval.py
new file mode 100644
index 0000000..dd16d77
--- /dev/null
+++ b/ambari-agent/src/main/python/ambari_agent/apscheduler/triggers/interval.py
@@ -0,0 +1,39 @@
+from datetime import datetime, timedelta
+from math import ceil
+
+from apscheduler.util import convert_to_datetime, timedelta_seconds
+
+
+class IntervalTrigger(object):
+ def __init__(self, interval, start_date=None):
+ if not isinstance(interval, timedelta):
+ raise TypeError('interval must be a timedelta')
+
+ self.interval = interval
+ self.interval_length = timedelta_seconds(self.interval)
+ if self.interval_length == 0:
+ self.interval = timedelta(seconds=1)
+ self.interval_length = 1
+
+ if start_date is None:
+ self.start_date = datetime.now() + self.interval
+ else:
+ self.start_date = convert_to_datetime(start_date)
+
+ def get_next_fire_time(self, start_date):
+ if start_date < self.start_date:
+ return self.start_date
+
+ timediff_seconds = timedelta_seconds(start_date - self.start_date)
+ next_interval_num = int(ceil(timediff_seconds / self.interval_length))
+ return self.start_date + self.interval * next_interval_num
+
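+ # Worked example (illustrative): with interval=timedelta(minutes=10)
+ # and start_date=datetime(2014, 8, 18, 0, 0), a call with
+ # start_date=datetime(2014, 8, 18, 0, 25) gives timediff_seconds=1500,
+ # ceil(1500 / 600.0) = 3, so the next fire time is 00:30:00.
+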
+ def __str__(self):
+ return 'interval[%s]' % str(self.interval)
+
+ def __repr__(self):
+ return "<%s (interval=%s, start_date=%s)>" % (
+ self.__class__.__name__, repr(self.interval),
+ repr(self.start_date))