Posted to commits@ambari.apache.org by nc...@apache.org on 2014/08/18 17:33:07 UTC

[1/2] AMBARI-6887. Alerts: groundwork for alert collection (ncole)

Repository: ambari
Updated Branches:
  refs/heads/branch-alerts-dev 26b162af8 -> 9e529eddc


http://git-wip-us.apache.org/repos/asf/ambari/blob/9e529edd/ambari-agent/src/main/python/ambari_agent/apscheduler/triggers/simple.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/apscheduler/triggers/simple.py b/ambari-agent/src/main/python/ambari_agent/apscheduler/triggers/simple.py
new file mode 100644
index 0000000..ea61b3f
--- /dev/null
+++ b/ambari-agent/src/main/python/ambari_agent/apscheduler/triggers/simple.py
@@ -0,0 +1,17 @@
+from apscheduler.util import convert_to_datetime
+
+
+class SimpleTrigger(object):
+    def __init__(self, run_date):
+        self.run_date = convert_to_datetime(run_date)
+
+    def get_next_fire_time(self, start_date):
+        if self.run_date >= start_date:
+            return self.run_date
+
+    def __str__(self):
+        return 'date[%s]' % str(self.run_date)
+
+    def __repr__(self):
+        return '<%s (run_date=%s)>' % (
+            self.__class__.__name__, repr(self.run_date))
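
A note on behavior, since the trigger protocol matters for the scheduler code
below: SimpleTrigger fires exactly once. get_next_fire_time() returns the
stored run_date while it is not earlier than start_date, and implicitly
returns None afterwards, which is how the scheduler detects that the job is
exhausted. A minimal sketch (dates are illustrative):

    from datetime import datetime

    trigger = SimpleTrigger('2014-08-18 12:00:00')
    print trigger.get_next_fire_time(datetime(2014, 8, 18, 11, 0))  # 2014-08-18 12:00:00
    print trigger.get_next_fire_time(datetime(2014, 8, 18, 13, 0))  # None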

http://git-wip-us.apache.org/repos/asf/ambari/blob/9e529edd/ambari-agent/src/main/python/ambari_agent/apscheduler/util.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/apscheduler/util.py b/ambari-agent/src/main/python/ambari_agent/apscheduler/util.py
new file mode 100644
index 0000000..dcede4c
--- /dev/null
+++ b/ambari-agent/src/main/python/ambari_agent/apscheduler/util.py
@@ -0,0 +1,230 @@
+"""
+This module contains several handy functions primarily meant for internal use.
+"""
+
+from datetime import date, datetime, timedelta
+from time import mktime
+import re
+import sys
+
+__all__ = ('asint', 'asbool', 'convert_to_datetime', 'timedelta_seconds',
+           'time_difference', 'datetime_ceil', 'combine_opts',
+           'get_callable_name', 'obj_to_ref', 'ref_to_obj', 'maybe_ref',
+           'to_unicode', 'iteritems', 'itervalues', 'xrange')
+
+
+def asint(text):
+    """
+    Safely converts a string to an integer, returning None if the string
+    is None.
+
+    :type text: str
+    :rtype: int
+    """
+    if text is not None:
+        return int(text)
+
+
+def asbool(obj):
+    """
+    Interprets an object as a boolean value.
+
+    :rtype: bool
+    """
+    if isinstance(obj, str):
+        obj = obj.strip().lower()
+        if obj in ('true', 'yes', 'on', 'y', 't', '1'):
+            return True
+        if obj in ('false', 'no', 'off', 'n', 'f', '0'):
+            return False
+        raise ValueError('Unable to interpret value "%s" as boolean' % obj)
+    return bool(obj)
+
+
+_DATE_REGEX = re.compile(
+    r'(?P<year>\d{4})-(?P<month>\d{1,2})-(?P<day>\d{1,2})'
+    r'(?: (?P<hour>\d{1,2}):(?P<minute>\d{1,2}):(?P<second>\d{1,2})'
+    r'(?:\.(?P<microsecond>\d{1,6}))?)?')
+
+
+def convert_to_datetime(input):
+    """
+    Converts the given object to a datetime object, if possible.
+    If an actual datetime object is passed, it is returned unmodified.
+    If the input is a string, it is parsed as a datetime.
+
+    Date strings are accepted in three different forms: date only (Y-m-d),
+    date with time (Y-m-d H:M:S) or with date+time with microseconds
+    (Y-m-d H:M:S.micro).
+
+    :rtype: datetime
+    """
+    if isinstance(input, datetime):
+        return input
+    elif isinstance(input, date):
+        return datetime.fromordinal(input.toordinal())
+    elif isinstance(input, basestring):
+        m = _DATE_REGEX.match(input)
+        if not m:
+            raise ValueError('Invalid date string')
+        values = [(k, int(v or 0)) for k, v in m.groupdict().items()]
+        values = dict(values)
+        return datetime(**values)
+    raise TypeError('Unsupported input type: %s' % type(input))
+
+
+def timedelta_seconds(delta):
+    """
+    Converts the given timedelta to seconds.
+
+    :type delta: timedelta
+    :rtype: float
+    """
+    return delta.days * 24 * 60 * 60 + delta.seconds + \
+        delta.microseconds / 1000000.0
+
+
+def time_difference(date1, date2):
+    """
+    Returns the time difference in seconds between the given two
+    datetime objects. The difference is calculated as: date1 - date2.
+
+    :param date1: the later datetime
+    :type date1: datetime
+    :param date2: the earlier datetime
+    :type date2: datetime
+    :rtype: float
+    """
+    later = mktime(date1.timetuple()) + date1.microsecond / 1000000.0
+    earlier = mktime(date2.timetuple()) + date2.microsecond / 1000000.0
+    return later - earlier
+
+
+def datetime_ceil(dateval):
+    """
+    Rounds the given datetime object upwards.
+
+    :type dateval: datetime
+    """
+    if dateval.microsecond > 0:
+        return dateval + timedelta(seconds=1,
+                                   microseconds=-dateval.microsecond)
+    return dateval
+
+
+def combine_opts(global_config, prefix, local_config={}):
+    """
+    Returns a subdictionary of ``global_config`` built from the keys that
+    start with the given prefix, merged with the options from
+    ``local_config``. The keys in the subdictionary have the prefix removed.
+
+    :type global_config: dict
+    :type prefix: str
+    :type local_config: dict
+    :rtype: dict
+    """
+    prefixlen = len(prefix)
+    subconf = {}
+    for key, value in global_config.items():
+        if key.startswith(prefix):
+            key = key[prefixlen:]
+            subconf[key] = value
+    subconf.update(local_config)
+    return subconf
+
+
+def get_callable_name(func):
+    """
+    Returns the best available display name for the given function/callable.
+    """
+    f_self = getattr(func, '__self__', None) or getattr(func, 'im_self', None)
+
+    if f_self and hasattr(func, '__name__'):
+        if isinstance(f_self, type):
+            # class method
+            clsname = getattr(f_self, '__qualname__', None) or f_self.__name__
+            return '%s.%s' % (clsname, func.__name__)
+        # bound method
+        return '%s.%s' % (f_self.__class__.__name__, func.__name__)
+
+    if hasattr(func, '__call__'):
+        if hasattr(func, '__name__'):
+            # function, unbound method or a class with a __call__ method
+            return func.__name__
+        # instance of a class with a __call__ method
+        return func.__class__.__name__
+
+    raise TypeError('Unable to determine a name for %s -- '
+                    'maybe it is not a callable?' % repr(func))
+
+
+def obj_to_ref(obj):
+    """
+    Returns the path to the given object.
+    """
+    ref = '%s:%s' % (obj.__module__, get_callable_name(obj))
+    try:
+        obj2 = ref_to_obj(ref)
+        if obj != obj2:
+            raise ValueError
+    except Exception:
+        raise ValueError('Cannot determine the reference to %s' % repr(obj))
+
+    return ref
+
+
+def ref_to_obj(ref):
+    """
+    Returns the object pointed to by ``ref``.
+    """
+    if not isinstance(ref, basestring):
+        raise TypeError('References must be strings')
+    if ':' not in ref:
+        raise ValueError('Invalid reference')
+
+    modulename, rest = ref.split(':', 1)
+    try:
+        obj = __import__(modulename)
+    except ImportError:
+        raise LookupError('Error resolving reference %s: '
+                          'could not import module' % ref)
+
+    try:
+        for name in modulename.split('.')[1:] + rest.split('.'):
+            obj = getattr(obj, name)
+        return obj
+    except Exception:
+        raise LookupError('Error resolving reference %s: '
+                          'error looking up object' % ref)
+
+
+def maybe_ref(ref):
+    """
+    Returns the object that the given reference points to, if it is indeed
+    a reference. If it is not a reference, the object is returned as-is.
+    """
+    if not isinstance(ref, str):
+        return ref
+    return ref_to_obj(ref)
+
+
+def to_unicode(string, encoding='ascii'):
+    """
+    Safely converts a string to a unicode representation on any
+    Python version.
+    """
+    if hasattr(string, 'decode'):
+        return string.decode(encoding, 'ignore')
+    return string  # pragma: nocover
+
+
+if sys.version_info < (3, 0):  # pragma: nocover
+    iteritems = lambda d: d.iteritems()
+    itervalues = lambda d: d.itervalues()
+    xrange = xrange
+    basestring = basestring
+else:  # pragma: nocover
+    iteritems = lambda d: d.items()
+    itervalues = lambda d: d.values()
+    xrange = range
+    basestring = str
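
For reference, a sketch of how the parsing and option helpers above behave
(not part of the commit; the import path assumes the bundled location):

    from ambari_agent.apscheduler.util import convert_to_datetime, asbool, combine_opts

    convert_to_datetime('2014-08-18')                  # date only
    convert_to_datetime('2014-08-18 17:33:07')         # date with time
    convert_to_datetime('2014-08-18 17:33:07.123456')  # with microseconds
    asbool('yes')   # True
    asbool('0')     # False
    combine_opts({'apscheduler.coalesce': 'true', 'other.key': 1}, 'apscheduler.')
    # -> {'coalesce': 'true'} -- keys without the prefix are dropped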

http://git-wip-us.apache.org/repos/asf/ambari/blob/9e529edd/ambari-agent/src/test/python/ambari_agent/TestAlerts.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestAlerts.py b/ambari-agent/src/test/python/ambari_agent/TestAlerts.py
new file mode 100644
index 0000000..51c3af9
--- /dev/null
+++ b/ambari-agent/src/test/python/ambari_agent/TestAlerts.py
@@ -0,0 +1,73 @@
+#!/usr/bin/env python
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+import os
+import sys
+from ambari_agent.AlertSchedulerHandler import AlertSchedulerHandler
+from ambari_agent.apscheduler.scheduler import Scheduler
+from ambari_agent.alerts.port_alert import PortAlert
+from mock.mock import patch
+from unittest import TestCase
+
+class TestAlerts(TestCase):
+
+  def setUp(self):
+    pass
+
+  def tearDown(self):
+    sys.stdout = sys.__stdout__
+
+  @patch.object(Scheduler, "add_interval_job")
+  def test_build(self, aps_add_interval_job_mock):
+    test_file_path = os.path.join('ambari_agent', 'dummy_files', 'alert_definitions.json')
+
+    ash = AlertSchedulerHandler(test_file_path)
+
+    self.assertTrue(aps_add_interval_job_mock.called)
+
+  def test_port_alert(self):
+    json = { "name": "namenode_process",
+      "service": "HDFS",
+      "component": "NAMENODE",
+      "label": "NameNode process",
+      "interval": 6,
+      "scope": "host",
+      "source": {
+        "type": "PORT",
+        "uri": "http://c6401.ambari.apache.org:50070",
+        "default_port": 50070,
+        "reporting": {
+          "ok": {
+            "text": "TCP OK - {0:.4f} response time on port {1}"
+          },
+          "critical": {
+            "text": "Could not load process info: {0}"
+          }
+        }
+      }
+    }
+
+    pa = PortAlert(json, json['source'])
+    self.assertEquals(6, pa.interval())
+
+    pa.collect()
+

http://git-wip-us.apache.org/repos/asf/ambari/blob/9e529edd/ambari-agent/src/test/python/ambari_agent/dummy_files/alert_definitions.json
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/dummy_files/alert_definitions.json b/ambari-agent/src/test/python/ambari_agent/dummy_files/alert_definitions.json
new file mode 100644
index 0000000..6c55966
--- /dev/null
+++ b/ambari-agent/src/test/python/ambari_agent/dummy_files/alert_definitions.json
@@ -0,0 +1,46 @@
+{
+  "c1": [
+    {
+      "name": "namenode_cpu",
+      "label": "NameNode host CPU Utilization",
+      "scope": "host",
+      "source": {
+        "type": "METRIC",
+        "jmx": "java.lang:type=OperatingSystem/SystemCpuLoad",
+        "host": "{{hdfs-site/dfs.namenode.secondary.http-address}}"
+      }
+    },
+    {
+      "name": "namenode_process",
+      "service": "HDFS",
+      "component": "NAMENODE",
+      "label": "NameNode process",
+      "interval": 6,
+      "scope": "host",
+      "source": {
+        "type": "PORT",
+        "uri": "http://c6401.ambari.apache.org:50070",
+        "default_port": 50070,
+        "reporting": {
+          "ok": {
+            "text": "TCP OK - {0:.4f} response time on port {1}"
+          },
+          "critical": {
+            "text": "Could not load process info: {0}"
+          }
+        }
+      }
+    },
+    {
+      "name": "hdfs_last_checkpoint",
+      "label": "Last Checkpoint Time",
+      "interval": 1,
+      "scope": "service",
+      "enabled": false,
+      "source": {
+        "type": "SCRIPT",
+        "path": "scripts/alerts/last_checkpoint.py"
+      }
+    }
+  ]
+}
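
To tie the fixture back to the handler: the top-level key ("c1") is a cluster
name, each entry is one alert definition, and source.type selects the alert
implementation (only PORT is wired up in this commit; METRIC and SCRIPT are
stubs). A sketch of consuming the file the same way __load_alerts does:

    import json

    with open('alert_definitions.json') as fp:
      cluster_defs = json.load(fp)
    for cluster, definitions in cluster_defs.items():
      for definition in definitions:
        print definition['name'], definition['source']['type']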


[2/2] git commit: AMBARI-6887. Alerts: groundwork for alert collection (ncole)

Posted by nc...@apache.org.
AMBARI-6887. Alerts: groundwork for alert collection (ncole)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/9e529edd
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/9e529edd
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/9e529edd

Branch: refs/heads/branch-alerts-dev
Commit: 9e529eddcf0423293046f6c2d85bc59f66b52c6a
Parents: 26b162a
Author: Nate Cole <nc...@hortonworks.com>
Authored: Sun Aug 17 19:26:50 2014 -0400
Committer: Nate Cole <nc...@hortonworks.com>
Committed: Mon Aug 18 11:10:59 2014 -0400

----------------------------------------------------------------------
 ambari-agent/pom.xml                            |   1 +
 .../ambari_agent/AlertSchedulerHandler.py       | 127 ++++
 .../src/main/python/ambari_agent/Controller.py  |  12 +-
 .../main/python/ambari_agent/alerts/__init__.py |  18 +
 .../python/ambari_agent/alerts/base_alert.py    |  72 +++
 .../python/ambari_agent/alerts/port_alert.py    |  96 +++
 .../python/ambari_agent/apscheduler/__init__.py |   3 +
 .../python/ambari_agent/apscheduler/events.py   |  64 ++
 .../main/python/ambari_agent/apscheduler/job.py | 137 +++++
 .../apscheduler/jobstores/__init__.py           |   0
 .../ambari_agent/apscheduler/jobstores/base.py  |  25 +
 .../apscheduler/jobstores/mongodb_store.py      |  84 +++
 .../apscheduler/jobstores/ram_store.py          |  25 +
 .../apscheduler/jobstores/redis_store.py        |  91 +++
 .../apscheduler/jobstores/shelve_store.py       |  74 +++
 .../apscheduler/jobstores/sqlalchemy_store.py   |  91 +++
 .../ambari_agent/apscheduler/scheduler.py       | 607 +++++++++++++++++++
 .../ambari_agent/apscheduler/threadpool.py      | 133 ++++
 .../apscheduler/triggers/__init__.py            |   3 +
 .../apscheduler/triggers/cron/__init__.py       | 144 +++++
 .../apscheduler/triggers/cron/expressions.py    | 194 ++++++
 .../apscheduler/triggers/cron/fields.py         | 100 +++
 .../apscheduler/triggers/interval.py            |  39 ++
 .../ambari_agent/apscheduler/triggers/simple.py |  17 +
 .../python/ambari_agent/apscheduler/util.py     | 230 +++++++
 .../src/test/python/ambari_agent/TestAlerts.py  |  73 +++
 .../dummy_files/alert_definitions.json          |  46 ++
 27 files changed, 2505 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/9e529edd/ambari-agent/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-agent/pom.xml b/ambari-agent/pom.xml
index c98af34..2475b22 100644
--- a/ambari-agent/pom.xml
+++ b/ambari-agent/pom.xml
@@ -624,6 +624,7 @@
             <exclude>src/test/python/ambari_agent/dummy_files/*</exclude>
             <exclude>src/test/python/ambari_agent/dummy*.txt</exclude>
             <exclude>src/main/python/ambari_agent/imports.txt</exclude>
+            <exclude>src/main/python/ambari_agent/apscheduler/**</exclude>
             <exclude>**/*.erb</exclude>
             <exclude>**/*.json</exclude>
             <exclude>**/*.pydevproject</exclude>

http://git-wip-us.apache.org/repos/asf/ambari/blob/9e529edd/ambari-agent/src/main/python/ambari_agent/AlertSchedulerHandler.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/AlertSchedulerHandler.py b/ambari-agent/src/main/python/ambari_agent/AlertSchedulerHandler.py
new file mode 100644
index 0000000..cd0605f
--- /dev/null
+++ b/ambari-agent/src/main/python/ambari_agent/AlertSchedulerHandler.py
@@ -0,0 +1,127 @@
+#!/usr/bin/env python
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+'''
+http://apscheduler.readthedocs.org/en/v2.1.2
+'''
+from apscheduler.scheduler import Scheduler
+from alerts.port_alert import PortAlert
+import json
+import logging
+import sys
+import time
+
+logger = logging.getLogger()
+
+class AlertSchedulerHandler():
+
+  def __init__(self, filename, in_minutes=True):
+    self.filename = filename
+    
+    # note: Scheduler strips the 'apscheduler.' prefix from gconfig keys,
+    # so the keys need the prefix or combine_opts() silently drops them
+    config = {
+      'apscheduler.threadpool.core_threads': 3,
+      'apscheduler.coalesce': True,
+      'apscheduler.standalone': False
+    }
+
+    self.scheduler = Scheduler(config)
+
+    alert_callables = self.__load_alerts()
+
+    for _callable in alert_callables:
+      if in_minutes:
+        self.scheduler.add_interval_job(self.__make_function(_callable),
+          minutes=_callable.interval())
+      else:
+        self.scheduler.add_interval_job(self.__make_function(_callable),
+          seconds=_callable.interval())
+      
+  def __make_function(self, alert_def):
+    return lambda: alert_def.collect()
+
+  def start(self):
+    if self.scheduler is not None:
+      self.scheduler.start()
+
+  def stop(self):
+    if self.scheduler is not None:
+      self.scheduler.shutdown(wait=False)
+      self.scheduler = None
+
+  def __load_alerts(self):
+    definitions = []
+    try:
+      # FIXME make location configurable
+      with open(self.filename) as fp: 
+        cluster_defs = json.load(fp)
+        for deflist in cluster_defs.values():
+          for definition in deflist:
+            obj = self.__json_to_callable(definition)
+            if obj is not None:
+              definitions.append(obj)
+    except Exception:
+      import traceback
+      traceback.print_exc()
+    return definitions
+
+  def __json_to_callable(self, json_definition):
+    source = json_definition['source']
+    source_type = source.get('type', '')
+
+    alert = None
+
+    if source_type == 'METRIC':
+      pass
+    elif source_type == 'PORT':
+      alert = PortAlert(json_definition, source)
+    elif source_type == 'SCRIPT':
+      pass
+
+    return alert
+    
+  def __json_to_meta(self, json_definition):
+    pass
+
+def main():
+  args = list(sys.argv)
+  del args[0]
+
+  logger.setLevel(logging.DEBUG)
+
+  ash = AlertSchedulerHandler(args[0], False)
+  ash.start()
+
+  i = 0
+  try:
+    while i < 10:
+      time.sleep(1)
+      i += 1
+  except KeyboardInterrupt:
+    pass
+  ash.stop()
+
+if __name__ == "__main__":
+  main()
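
For embedding (as Controller.py does below), the handler is constructed with a
definitions file and started once; intervals are minutes by default and
seconds when in_minutes=False, which main() uses for quick local runs. A
sketch, assuming the cache file exists:

    ash = AlertSchedulerHandler('/var/lib/ambari-agent/cache/alerts/alert_definitions.json')
    ash.start()  # schedules one interval job per PORT definition
    # ... agent runs ...
    ash.stop()   # shuts the scheduler down; the handler cannot be restarted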

http://git-wip-us.apache.org/repos/asf/ambari/blob/9e529edd/ambari-agent/src/main/python/ambari_agent/Controller.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/Controller.py b/ambari-agent/src/main/python/ambari_agent/Controller.py
index 87af939..3be54c2 100644
--- a/ambari-agent/src/main/python/ambari_agent/Controller.py
+++ b/ambari-agent/src/main/python/ambari_agent/Controller.py
@@ -39,7 +39,7 @@ import security
 from NetUtil import NetUtil
 import ssl
 from LiveStatus import LiveStatus
-
+from AlertSchedulerHandler import AlertSchedulerHandler
 
 logger = logging.getLogger()
 
@@ -73,6 +73,14 @@ class Controller(threading.Thread):
     self.heartbeat_wait_event = threading.Event()
     # List of callbacks that are called at agent registration
     self.registration_listeners = []
+    
+    # pull config directory out of config
+    cache_dir = config.get('agent', 'cache_dir')
+    if cache_dir is None:
+      cache_dir = '/var/lib/ambari-agent/cache'
+      
+    self.alert_scheduler_handler = AlertSchedulerHandler(
+     os.path.join(cache_dir, 'alerts', 'alert_definitions.json'))
 
 
   def __del__(self):
@@ -317,6 +325,8 @@ class Controller(threading.Thread):
     message = registerResponse['response']
     logger.info("Registration response from %s was %s", self.serverHostname, message)
 
+    self.alert_scheduler_handler.start()
+
     if self.isRegistered:
       # Clearing command queue to stop executing "stale" commands
       # after registration

http://git-wip-us.apache.org/repos/asf/ambari/blob/9e529edd/ambari-agent/src/main/python/ambari_agent/alerts/__init__.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/alerts/__init__.py b/ambari-agent/src/main/python/ambari_agent/alerts/__init__.py
new file mode 100644
index 0000000..0a0e1ca
--- /dev/null
+++ b/ambari-agent/src/main/python/ambari_agent/alerts/__init__.py
@@ -0,0 +1,18 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/9e529edd/ambari-agent/src/main/python/ambari_agent/alerts/base_alert.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/alerts/base_alert.py b/ambari-agent/src/main/python/ambari_agent/alerts/base_alert.py
new file mode 100644
index 0000000..e102d56
--- /dev/null
+++ b/ambari-agent/src/main/python/ambari_agent/alerts/base_alert.py
@@ -0,0 +1,72 @@
+#!/usr/bin/env python
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+import logging
+
+logger = logging.getLogger()
+
+class BaseAlert(object):
+  RESULT_OK = 'OK'
+  RESULT_WARNING = 'WARNING'
+  RESULT_CRITICAL = 'CRITICAL'
+  RESULT_UNKNOWN = 'UNKNOWN'
+  
+  def __init__(self, alert_meta, alert_source_meta):
+    self.alert_meta = alert_meta
+    self.alert_source_meta = alert_source_meta
+    
+  def interval(self):
+    return self.alert_meta.get('interval', 1)
+  
+  def collect(self):
+    res = (BaseAlert.RESULT_UNKNOWN, [])
+    try:
+      res = self._collect()
+    except Exception as e:
+      res = (BaseAlert.RESULT_CRITICAL, [str(e)])
+
+    res_base_text = self.alert_source_meta['reporting'][res[0].lower()]['text']
+      
+    data = {}
+    data['name'] = self._find_value('name')
+    data['state'] = res[0]
+    data['text'] = res_base_text.format(*res[1])
+    # data['cluster'] = self._find_value('cluster') # not sure how to get this yet
+    data['service'] = self._find_value('service')
+    data['component'] = self._find_value('component')
+    
+    print str(data)
+  
+  def _find_value(self, meta_key):
+    return self.alert_meta.get(meta_key)
+  
+  def _collect(self):
+    '''
+    Low level function to collect alert data.  The result is a tuple as:
+    res[0] = the result code
+    res[1] = the list of arguments supplied to the reporting text for the result code
+    '''  
+    raise NotImplementedError
\ No newline at end of file
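
Concrete alerts only implement _collect(); collect() picks the reporting text
matching the returned result code and formats the argument list into it. A
minimal hypothetical subclass:

    class AlwaysOkAlert(BaseAlert):
      def _collect(self):
        # the list fills the {0}, {1}, ... placeholders in reporting['ok']['text']
        return (BaseAlert.RESULT_OK, [0.001, 50070])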

http://git-wip-us.apache.org/repos/asf/ambari/blob/9e529edd/ambari-agent/src/main/python/ambari_agent/alerts/port_alert.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/alerts/port_alert.py b/ambari-agent/src/main/python/ambari_agent/alerts/port_alert.py
new file mode 100644
index 0000000..165f890
--- /dev/null
+++ b/ambari-agent/src/main/python/ambari_agent/alerts/port_alert.py
@@ -0,0 +1,96 @@
+#!/usr/bin/env python
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+import logging
+import re
+import socket
+import time
+import traceback
+from alerts.base_alert import BaseAlert
+from resource_management.libraries.functions.get_port_from_url import get_port_from_url
+
+logger = logging.getLogger()
+
+class PortAlert(BaseAlert):
+
+  def __init__(self, alert_meta, alert_source_meta):
+    super(PortAlert, self).__init__(alert_meta, alert_source_meta)
+    
+    default_port = alert_source_meta['default_port']
+    uri = alert_source_meta['uri']
+    
+    self.port = default_port
+    self.host = get_host_from_url(uri)
+
+    try:
+      self.port = int(get_port_from_url(uri))
+    except Exception:
+      traceback.print_exc()
+
+    
+  def _collect(self):
+    s = None
+    try:
+      s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+      s.settimeout(1.5)
+      t = time.time()
+      s.connect((self.host, self.port))
+      millis = time.time() - t
+      return (self.RESULT_OK, [millis/1000, self.port])
+    finally:
+      if s is not None:
+        try:
+          s.close()
+        except:
+          pass
+
+'''
+See RFC3986, Appendix B
+Tested on the following cases:
+  "192.168.54.1"
+  "192.168.54.2:7661"
+  "hdfs://192.168.54.3/foo/bar"
+  "ftp://192.168.54.4:7842/foo/bar"
+'''
+def get_host_from_url(uri):
+  # RFC3986, Appendix B
+  parts = re.findall(r'^(([^:/?#]+):)?(//([^/?#]*))?([^?#]*)(\?([^#]*))?(#(.*))?', uri)
+
+  # index of parts
+  # scheme    = 1
+  # authority = 3
+  # path      = 4
+  # query     = 6
+  # fragment  = 8
+ 
+  host_and_port = uri
+  if 0 == len(parts[0][1]):
+    host_and_port = parts[0][4]
+  elif 0 == len(parts[0][2]):
+    host_and_port = parts[0][1]
+  elif parts[0][2].startswith("//"):
+    host_and_port = parts[0][3]
+
+  if -1 == host_and_port.find(':'):
+    return host_and_port
+  else:
+    return host_and_port.split(':')[0]
+
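
Working through the docstring's test cases, a sketch of the expected results:

    get_host_from_url('192.168.54.1')                     # '192.168.54.1'
    get_host_from_url('192.168.54.2:7661')                # '192.168.54.2'
    get_host_from_url('hdfs://192.168.54.3/foo/bar')      # '192.168.54.3'
    get_host_from_url('ftp://192.168.54.4:7842/foo/bar')  # '192.168.54.4'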

http://git-wip-us.apache.org/repos/asf/ambari/blob/9e529edd/ambari-agent/src/main/python/ambari_agent/apscheduler/__init__.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/apscheduler/__init__.py b/ambari-agent/src/main/python/ambari_agent/apscheduler/__init__.py
new file mode 100644
index 0000000..71cc53d
--- /dev/null
+++ b/ambari-agent/src/main/python/ambari_agent/apscheduler/__init__.py
@@ -0,0 +1,3 @@
+version_info = (2, 1, 2)
+version = '.'.join(str(n) for n in version_info[:3])
+release = '.'.join(str(n) for n in version_info)

http://git-wip-us.apache.org/repos/asf/ambari/blob/9e529edd/ambari-agent/src/main/python/ambari_agent/apscheduler/events.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/apscheduler/events.py b/ambari-agent/src/main/python/ambari_agent/apscheduler/events.py
new file mode 100644
index 0000000..80bde8e
--- /dev/null
+++ b/ambari-agent/src/main/python/ambari_agent/apscheduler/events.py
@@ -0,0 +1,64 @@
+__all__ = ('EVENT_SCHEDULER_START', 'EVENT_SCHEDULER_SHUTDOWN',
+           'EVENT_JOBSTORE_ADDED', 'EVENT_JOBSTORE_REMOVED',
+           'EVENT_JOBSTORE_JOB_ADDED', 'EVENT_JOBSTORE_JOB_REMOVED',
+           'EVENT_JOB_EXECUTED', 'EVENT_JOB_ERROR', 'EVENT_JOB_MISSED',
+           'EVENT_ALL', 'SchedulerEvent', 'JobStoreEvent', 'JobEvent')
+
+
+EVENT_SCHEDULER_START = 1        # The scheduler was started
+EVENT_SCHEDULER_SHUTDOWN = 2     # The scheduler was shut down
+EVENT_JOBSTORE_ADDED = 4         # A job store was added to the scheduler
+EVENT_JOBSTORE_REMOVED = 8       # A job store was removed from the scheduler
+EVENT_JOBSTORE_JOB_ADDED = 16    # A job was added to a job store
+EVENT_JOBSTORE_JOB_REMOVED = 32  # A job was removed from a job store
+EVENT_JOB_EXECUTED = 64          # A job was executed successfully
+EVENT_JOB_ERROR = 128            # A job raised an exception during execution
+EVENT_JOB_MISSED = 256           # A job's execution was missed
+EVENT_ALL = (EVENT_SCHEDULER_START | EVENT_SCHEDULER_SHUTDOWN |
+             EVENT_JOBSTORE_ADDED | EVENT_JOBSTORE_REMOVED |
+             EVENT_JOBSTORE_JOB_ADDED | EVENT_JOBSTORE_JOB_REMOVED |
+             EVENT_JOB_EXECUTED | EVENT_JOB_ERROR | EVENT_JOB_MISSED)
+
+
+class SchedulerEvent(object):
+    """
+    An event that concerns the scheduler itself.
+
+    :var code: the type code of this event
+    """
+    def __init__(self, code):
+        self.code = code
+
+
+class JobStoreEvent(SchedulerEvent):
+    """
+    An event that concerns job stores.
+
+    :var alias: the alias of the job store involved
+    :var job: the new job if a job was added
+    """
+    def __init__(self, code, alias, job=None):
+        SchedulerEvent.__init__(self, code)
+        self.alias = alias
+        if job:
+            self.job = job
+
+
+class JobEvent(SchedulerEvent):
+    """
+    An event that concerns the execution of individual jobs.
+
+    :var job: the job instance in question
+    :var scheduled_run_time: the time when the job was scheduled to be run
+    :var retval: the return value of the successfully executed job
+    :var exception: the exception raised by the job
+    :var traceback: the traceback object associated with the exception
+    """
+    def __init__(self, code, job, scheduled_run_time, retval=None,
+                 exception=None, traceback=None):
+        SchedulerEvent.__init__(self, code)
+        self.job = job
+        self.scheduled_run_time = scheduled_run_time
+        self.retval = retval
+        self.exception = exception
+        self.traceback = traceback
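
The event codes above are bit flags, so a listener can subscribe with a mask
and dispatch on the event it receives. A sketch, assuming the standard
APScheduler 2.x add_listener() API on the Scheduler:

    def on_job_done(event):
      if event.exception:
        print 'job failed:', event.job
      else:
        print 'job finished:', event.job

    scheduler.add_listener(on_job_done, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)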

http://git-wip-us.apache.org/repos/asf/ambari/blob/9e529edd/ambari-agent/src/main/python/ambari_agent/apscheduler/job.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/apscheduler/job.py b/ambari-agent/src/main/python/ambari_agent/apscheduler/job.py
new file mode 100644
index 0000000..cfc09a2
--- /dev/null
+++ b/ambari-agent/src/main/python/ambari_agent/apscheduler/job.py
@@ -0,0 +1,137 @@
+"""
+Jobs represent scheduled tasks.
+"""
+
+from threading import Lock
+from datetime import timedelta
+
+from apscheduler.util import to_unicode, ref_to_obj, get_callable_name,\
+    obj_to_ref
+
+
+class MaxInstancesReachedError(Exception):
+    pass
+
+
+class Job(object):
+    """
+    Encapsulates the actual Job along with its metadata. Job instances
+    are created by the scheduler when adding jobs, and should not be
+    directly instantiated. These options can be set when adding jobs
+    to the scheduler (see :ref:`job_options`).
+
+    :var trigger: trigger that determines the execution times
+    :var func: callable to call when the trigger is triggered
+    :var args: list of positional arguments to call func with
+    :var kwargs: dict of keyword arguments to call func with
+    :var name: name of the job
+    :var misfire_grace_time: seconds after the designated run time that
+        the job is still allowed to be run
+    :var coalesce: run once instead of many times if the scheduler determines
+        that the job should be run more than once in succession
+    :var max_runs: maximum number of times this job is allowed to be
+        triggered
+    :var max_instances: maximum number of concurrently running
+        instances allowed for this job
+    :var runs: number of times this job has been triggered
+    :var instances: number of concurrently running instances of this job
+    """
+    id = None
+    next_run_time = None
+
+    def __init__(self, trigger, func, args, kwargs, misfire_grace_time,
+                 coalesce, name=None, max_runs=None, max_instances=1):
+        if not trigger:
+            raise ValueError('The trigger must not be None')
+        if not hasattr(func, '__call__'):
+            raise TypeError('func must be callable')
+        if not hasattr(args, '__getitem__'):
+            raise TypeError('args must be a list-like object')
+        if not hasattr(kwargs, '__getitem__'):
+            raise TypeError('kwargs must be a dict-like object')
+        if misfire_grace_time <= 0:
+            raise ValueError('misfire_grace_time must be a positive value')
+        if max_runs is not None and max_runs <= 0:
+            raise ValueError('max_runs must be a positive value')
+        if max_instances <= 0:
+            raise ValueError('max_instances must be a positive value')
+
+        self._lock = Lock()
+
+        self.trigger = trigger
+        self.func = func
+        self.args = args
+        self.kwargs = kwargs
+        self.name = to_unicode(name or get_callable_name(func))
+        self.misfire_grace_time = misfire_grace_time
+        self.coalesce = coalesce
+        self.max_runs = max_runs
+        self.max_instances = max_instances
+        self.runs = 0
+        self.instances = 0
+
+    def compute_next_run_time(self, now):
+        if self.runs == self.max_runs:
+            self.next_run_time = None
+        else:
+            self.next_run_time = self.trigger.get_next_fire_time(now)
+
+        return self.next_run_time
+
+    def get_run_times(self, now):
+        """
+        Computes the scheduled run times between ``next_run_time`` and ``now``.
+        """
+        run_times = []
+        run_time = self.next_run_time
+        increment = timedelta(microseconds=1)
+        while ((not self.max_runs or self.runs < self.max_runs) and
+               run_time and run_time <= now):
+            run_times.append(run_time)
+            run_time = self.trigger.get_next_fire_time(run_time + increment)
+
+        return run_times
+
+    def add_instance(self):
+        self._lock.acquire()
+        try:
+            if self.instances == self.max_instances:
+                raise MaxInstancesReachedError
+            self.instances += 1
+        finally:
+            self._lock.release()
+
+    def remove_instance(self):
+        self._lock.acquire()
+        try:
+            assert self.instances > 0, 'Already at 0 instances'
+            self.instances -= 1
+        finally:
+            self._lock.release()
+
+    def __getstate__(self):
+        # Prevents the unwanted pickling of transient or unpicklable variables
+        state = self.__dict__.copy()
+        state.pop('instances', None)
+        state.pop('func', None)
+        state.pop('_lock', None)
+        state['func_ref'] = obj_to_ref(self.func)
+        return state
+
+    def __setstate__(self, state):
+        state['instances'] = 0
+        state['func'] = ref_to_obj(state.pop('func_ref'))
+        state['_lock'] = Lock()
+        self.__dict__ = state
+
+    def __eq__(self, other):
+        if isinstance(other, Job):
+            return (self.id is not None and other.id == self.id) or self is other
+        return NotImplemented
+
+    def __repr__(self):
+        return '<Job (name=%s, trigger=%s)>' % (self.name, repr(self.trigger))
+
+    def __str__(self):
+        return '%s (trigger: %s, next run at: %s)' % (
+            self.name, str(self.trigger), str(self.next_run_time))
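
Because __getstate__() swaps func for an obj_to_ref() string and drops the
lock and instance count, a Job survives pickling as long as its callable is
importable; the persistent job stores below all restore jobs this way. A
sketch, assuming a module-level function my_task and any trigger instance:

    import pickle

    job = Job(trigger, my_task, [], {}, misfire_grace_time=1, coalesce=True)
    state = pickle.loads(pickle.dumps(job.__getstate__()))
    restored = Job.__new__(Job)
    restored.__setstate__(state)  # re-imports my_task via its 'module:name' ref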

http://git-wip-us.apache.org/repos/asf/ambari/blob/9e529edd/ambari-agent/src/main/python/ambari_agent/apscheduler/jobstores/__init__.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/apscheduler/jobstores/__init__.py b/ambari-agent/src/main/python/ambari_agent/apscheduler/jobstores/__init__.py
new file mode 100644
index 0000000..e69de29

http://git-wip-us.apache.org/repos/asf/ambari/blob/9e529edd/ambari-agent/src/main/python/ambari_agent/apscheduler/jobstores/base.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/apscheduler/jobstores/base.py b/ambari-agent/src/main/python/ambari_agent/apscheduler/jobstores/base.py
new file mode 100644
index 0000000..f0a16dd
--- /dev/null
+++ b/ambari-agent/src/main/python/ambari_agent/apscheduler/jobstores/base.py
@@ -0,0 +1,25 @@
+"""
+Abstract base class that provides the interface needed by all job stores.
+Job store methods are also documented here.
+"""
+
+
+class JobStore(object):
+    def add_job(self, job):
+        """Adds the given job from this store."""
+        raise NotImplementedError
+
+    def update_job(self, job):
+        """Persists the running state of the given job."""
+        raise NotImplementedError
+
+    def remove_job(self, job):
+        """Removes the given jobs from this store."""
+        raise NotImplementedError
+
+    def load_jobs(self):
+        """Loads jobs from this store into memory."""
+        raise NotImplementedError
+
+    def close(self):
+        """Frees any resources still bound to this job store."""

http://git-wip-us.apache.org/repos/asf/ambari/blob/9e529edd/ambari-agent/src/main/python/ambari_agent/apscheduler/jobstores/mongodb_store.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/apscheduler/jobstores/mongodb_store.py b/ambari-agent/src/main/python/ambari_agent/apscheduler/jobstores/mongodb_store.py
new file mode 100644
index 0000000..3f522c2
--- /dev/null
+++ b/ambari-agent/src/main/python/ambari_agent/apscheduler/jobstores/mongodb_store.py
@@ -0,0 +1,84 @@
+"""
+Stores jobs in a MongoDB database.
+"""
+import logging
+
+from apscheduler.jobstores.base import JobStore
+from apscheduler.job import Job
+
+try:
+    import cPickle as pickle
+except ImportError:  # pragma: nocover
+    import pickle
+
+try:
+    from bson.binary import Binary
+    from pymongo.connection import Connection
+except ImportError:  # pragma: nocover
+    raise ImportError('MongoDBJobStore requires PyMongo installed')
+
+logger = logging.getLogger(__name__)
+
+
+class MongoDBJobStore(JobStore):
+    def __init__(self, database='apscheduler', collection='jobs',
+                 connection=None, pickle_protocol=pickle.HIGHEST_PROTOCOL,
+                 **connect_args):
+        self.jobs = []
+        self.pickle_protocol = pickle_protocol
+
+        if not database:
+            raise ValueError('The "database" parameter must not be empty')
+        if not collection:
+            raise ValueError('The "collection" parameter must not be empty')
+
+        if connection:
+            self.connection = connection
+        else:
+            self.connection = Connection(**connect_args)
+
+        self.collection = self.connection[database][collection]
+
+    def add_job(self, job):
+        job_dict = job.__getstate__()
+        job_dict['trigger'] = Binary(pickle.dumps(job.trigger,
+                                                  self.pickle_protocol))
+        job_dict['args'] = Binary(pickle.dumps(job.args,
+                                               self.pickle_protocol))
+        job_dict['kwargs'] = Binary(pickle.dumps(job.kwargs,
+                                                 self.pickle_protocol))
+        job.id = self.collection.insert(job_dict)
+        self.jobs.append(job)
+
+    def remove_job(self, job):
+        self.collection.remove(job.id)
+        self.jobs.remove(job)
+
+    def load_jobs(self):
+        jobs = []
+        for job_dict in self.collection.find():
+            try:
+                job = Job.__new__(Job)
+                job_dict['id'] = job_dict.pop('_id')
+                job_dict['trigger'] = pickle.loads(job_dict['trigger'])
+                job_dict['args'] = pickle.loads(job_dict['args'])
+                job_dict['kwargs'] = pickle.loads(job_dict['kwargs'])
+                job.__setstate__(job_dict)
+                jobs.append(job)
+            except Exception:
+                job_name = job_dict.get('name', '(unknown)')
+                logger.exception('Unable to restore job "%s"', job_name)
+        self.jobs = jobs
+
+    def update_job(self, job):
+        spec = {'_id': job.id}
+        document = {'$set': {'next_run_time': job.next_run_time},
+                    '$inc': {'runs': 1}}
+        self.collection.update(spec, document)
+
+    def close(self):
+        self.connection.disconnect()
+
+    def __repr__(self):
+        connection = self.collection.database.connection
+        return '<%s (connection=%s)>' % (self.__class__.__name__, connection)

http://git-wip-us.apache.org/repos/asf/ambari/blob/9e529edd/ambari-agent/src/main/python/ambari_agent/apscheduler/jobstores/ram_store.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/apscheduler/jobstores/ram_store.py b/ambari-agent/src/main/python/ambari_agent/apscheduler/jobstores/ram_store.py
new file mode 100644
index 0000000..60458fb
--- /dev/null
+++ b/ambari-agent/src/main/python/ambari_agent/apscheduler/jobstores/ram_store.py
@@ -0,0 +1,25 @@
+"""
+Stores jobs in an array in RAM. Provides no persistence support.
+"""
+
+from apscheduler.jobstores.base import JobStore
+
+
+class RAMJobStore(JobStore):
+    def __init__(self):
+        self.jobs = []
+
+    def add_job(self, job):
+        self.jobs.append(job)
+
+    def update_job(self, job):
+        pass
+
+    def remove_job(self, job):
+        self.jobs.remove(job)
+
+    def load_jobs(self):
+        pass
+
+    def __repr__(self):
+        return '<%s>' % (self.__class__.__name__)

http://git-wip-us.apache.org/repos/asf/ambari/blob/9e529edd/ambari-agent/src/main/python/ambari_agent/apscheduler/jobstores/redis_store.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/apscheduler/jobstores/redis_store.py b/ambari-agent/src/main/python/ambari_agent/apscheduler/jobstores/redis_store.py
new file mode 100644
index 0000000..5eabf4b
--- /dev/null
+++ b/ambari-agent/src/main/python/ambari_agent/apscheduler/jobstores/redis_store.py
@@ -0,0 +1,91 @@
+"""
+Stores jobs in a Redis database.
+"""
+from uuid import uuid4
+from datetime import datetime
+import logging
+
+from apscheduler.jobstores.base import JobStore
+from apscheduler.job import Job
+
+try:
+    import cPickle as pickle
+except ImportError:  # pragma: nocover
+    import pickle
+
+try:
+    from redis import StrictRedis
+except ImportError:  # pragma: nocover
+    raise ImportError('RedisJobStore requires redis installed')
+
+try:
+    long = long
+except NameError:
+    long = int
+
+logger = logging.getLogger(__name__)
+
+
+class RedisJobStore(JobStore):
+    def __init__(self, db=0, key_prefix='jobs.',
+                 pickle_protocol=pickle.HIGHEST_PROTOCOL, **connect_args):
+        self.jobs = []
+        self.pickle_protocol = pickle_protocol
+        self.key_prefix = key_prefix
+
+        if db is None:
+            raise ValueError('The "db" parameter must not be empty')
+        if not key_prefix:
+            raise ValueError('The "key_prefix" parameter must not be empty')
+
+        self.redis = StrictRedis(db=db, **connect_args)
+
+    def add_job(self, job):
+        job.id = str(uuid4())
+        job_state = job.__getstate__()
+        job_dict = {
+            'job_state': pickle.dumps(job_state, self.pickle_protocol),
+            'runs': '0',
+            'next_run_time': job_state.pop('next_run_time').isoformat()}
+        self.redis.hmset(self.key_prefix + job.id, job_dict)
+        self.jobs.append(job)
+
+    def remove_job(self, job):
+        self.redis.delete(self.key_prefix + job.id)
+        self.jobs.remove(job)
+
+    def load_jobs(self):
+        jobs = []
+        keys = self.redis.keys(self.key_prefix + '*')
+        pipeline = self.redis.pipeline()
+        for key in keys:
+            pipeline.hgetall(key)
+        results = pipeline.execute()
+
+        for job_dict in results:
+            job_state = {}
+            try:
+                job = Job.__new__(Job)
+                job_state = pickle.loads(job_dict['job_state'.encode()])
+                job_state['runs'] = long(job_dict['runs'.encode()])
+                dateval = job_dict['next_run_time'.encode()].decode()
+                job_state['next_run_time'] = datetime.strptime(
+                    dateval, '%Y-%m-%dT%H:%M:%S')
+                job.__setstate__(job_state)
+                jobs.append(job)
+            except Exception:
+                job_name = job_state.get('name', '(unknown)')
+                logger.exception('Unable to restore job "%s"', job_name)
+        self.jobs = jobs
+
+    def update_job(self, job):
+        attrs = {
+            'next_run_time': job.next_run_time.isoformat(),
+            'runs': job.runs}
+        self.redis.hmset(self.key_prefix + job.id, attrs)
+
+    def close(self):
+        self.redis.connection_pool.disconnect()
+
+    def __repr__(self):
+        return '<%s>' % self.__class__.__name__

http://git-wip-us.apache.org/repos/asf/ambari/blob/9e529edd/ambari-agent/src/main/python/ambari_agent/apscheduler/jobstores/shelve_store.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/apscheduler/jobstores/shelve_store.py b/ambari-agent/src/main/python/ambari_agent/apscheduler/jobstores/shelve_store.py
new file mode 100644
index 0000000..d1be58f
--- /dev/null
+++ b/ambari-agent/src/main/python/ambari_agent/apscheduler/jobstores/shelve_store.py
@@ -0,0 +1,74 @@
+"""
+Stores jobs in a file governed by the :mod:`shelve` module.
+"""
+
+import shelve
+import pickle
+import random
+import logging
+
+from apscheduler.jobstores.base import JobStore
+from apscheduler.job import Job
+from apscheduler.util import itervalues
+
+logger = logging.getLogger(__name__)
+
+
+class ShelveJobStore(JobStore):
+    MAX_ID = 1000000
+
+    def __init__(self, path, pickle_protocol=pickle.HIGHEST_PROTOCOL):
+        self.jobs = []
+        self.path = path
+        self.pickle_protocol = pickle_protocol
+        self._open_store()
+
+    def _open_store(self):
+        self.store = shelve.open(self.path, 'c', self.pickle_protocol)
+
+    def _generate_id(self):
+        id = None
+        while not id:
+            id = str(random.randint(1, self.MAX_ID))
+            if id not in self.store:
+                return id
+
+    def add_job(self, job):
+        job.id = self._generate_id()
+        self.store[job.id] = job.__getstate__()
+        self.store.close()
+        self._open_store()
+        self.jobs.append(job)
+
+    def update_job(self, job):
+        job_dict = self.store[job.id]
+        job_dict['next_run_time'] = job.next_run_time
+        job_dict['runs'] = job.runs
+        self.store[job.id] = job_dict
+        self.store.close()
+        self._open_store()
+
+    def remove_job(self, job):
+        del self.store[job.id]
+        self.store.close()
+        self._open_store()
+        self.jobs.remove(job)
+
+    def load_jobs(self):
+        jobs = []
+        for job_dict in itervalues(self.store):
+            try:
+                job = Job.__new__(Job)
+                job.__setstate__(job_dict)
+                jobs.append(job)
+            except Exception:
+                job_name = job_dict.get('name', '(unknown)')
+                logger.exception('Unable to restore job "%s"', job_name)
+
+        self.jobs = jobs
+
+    def close(self):
+        self.store.close()
+
+    def __repr__(self):
+        return '<%s (path=%s)>' % (self.__class__.__name__, self.path)

http://git-wip-us.apache.org/repos/asf/ambari/blob/9e529edd/ambari-agent/src/main/python/ambari_agent/apscheduler/jobstores/sqlalchemy_store.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/apscheduler/jobstores/sqlalchemy_store.py b/ambari-agent/src/main/python/ambari_agent/apscheduler/jobstores/sqlalchemy_store.py
new file mode 100644
index 0000000..5b64a35
--- /dev/null
+++ b/ambari-agent/src/main/python/ambari_agent/apscheduler/jobstores/sqlalchemy_store.py
@@ -0,0 +1,91 @@
+"""
+Stores jobs in a database table using SQLAlchemy.
+"""
+import pickle
+import logging
+
+import sqlalchemy
+
+from apscheduler.jobstores.base import JobStore
+from apscheduler.job import Job
+
+try:
+    from sqlalchemy import *
+except ImportError:  # pragma: nocover
+    raise ImportError('SQLAlchemyJobStore requires SQLAlchemy installed')
+
+logger = logging.getLogger(__name__)
+
+
+class SQLAlchemyJobStore(JobStore):
+    def __init__(self, url=None, engine=None, tablename='apscheduler_jobs',
+                 metadata=None, pickle_protocol=pickle.HIGHEST_PROTOCOL):
+        self.jobs = []
+        self.pickle_protocol = pickle_protocol
+
+        if engine:
+            self.engine = engine
+        elif url:
+            self.engine = create_engine(url)
+        else:
+            raise ValueError('Need either "engine" or "url" defined')
+
+        if sqlalchemy.__version__ < '0.7':
+            pickle_coltype = PickleType(pickle_protocol, mutable=False)
+        else:
+            pickle_coltype = PickleType(pickle_protocol)
+        self.jobs_t = Table(
+            tablename, metadata or MetaData(),
+            Column('id', Integer,
+                   Sequence(tablename + '_id_seq', optional=True),
+                   primary_key=True),
+            Column('trigger', pickle_coltype, nullable=False),
+            Column('func_ref', String(1024), nullable=False),
+            Column('args', pickle_coltype, nullable=False),
+            Column('kwargs', pickle_coltype, nullable=False),
+            Column('name', Unicode(1024)),
+            Column('misfire_grace_time', Integer, nullable=False),
+            Column('coalesce', Boolean, nullable=False),
+            Column('max_runs', Integer),
+            Column('max_instances', Integer),
+            Column('next_run_time', DateTime, nullable=False),
+            Column('runs', BigInteger))
+
+        self.jobs_t.create(self.engine, True)
+
+    def add_job(self, job):
+        job_dict = job.__getstate__()
+        result = self.engine.execute(self.jobs_t.insert().values(**job_dict))
+        job.id = result.inserted_primary_key[0]
+        self.jobs.append(job)
+
+    def remove_job(self, job):
+        delete = self.jobs_t.delete().where(self.jobs_t.c.id == job.id)
+        self.engine.execute(delete)
+        self.jobs.remove(job)
+
+    def load_jobs(self):
+        jobs = []
+        for row in self.engine.execute(select([self.jobs_t])):
+            try:
+                job = Job.__new__(Job)
+                job_dict = dict(row.items())
+                job.__setstate__(job_dict)
+                jobs.append(job)
+            except Exception:
+                job_name = job_dict.get('name', '(unknown)')
+                logger.exception('Unable to restore job "%s"', job_name)
+        self.jobs = jobs
+
+    def update_job(self, job):
+        job_dict = job.__getstate__()
+        update = self.jobs_t.update().where(self.jobs_t.c.id == job.id).\
+            values(next_run_time=job_dict['next_run_time'],
+                   runs=job_dict['runs'])
+        self.engine.execute(update)
+
+    def close(self):
+        self.engine.dispose()
+
+    def __repr__(self):
+        return '<%s (url=%s)>' % (self.__class__.__name__, self.engine.url)

http://git-wip-us.apache.org/repos/asf/ambari/blob/9e529edd/ambari-agent/src/main/python/ambari_agent/apscheduler/scheduler.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/apscheduler/scheduler.py b/ambari-agent/src/main/python/ambari_agent/apscheduler/scheduler.py
new file mode 100644
index 0000000..319037a
--- /dev/null
+++ b/ambari-agent/src/main/python/ambari_agent/apscheduler/scheduler.py
@@ -0,0 +1,607 @@
+"""
+This module is the main part of the library. It houses the Scheduler class
+and related exceptions.
+"""
+
+from threading import Thread, Event, Lock
+from datetime import datetime, timedelta
+from logging import getLogger
+import os
+import sys
+
+from apscheduler.util import *
+from apscheduler.triggers import SimpleTrigger, IntervalTrigger, CronTrigger
+from apscheduler.jobstores.ram_store import RAMJobStore
+from apscheduler.job import Job, MaxInstancesReachedError
+from apscheduler.events import *
+from apscheduler.threadpool import ThreadPool
+
+logger = getLogger(__name__)
+
+
+class SchedulerAlreadyRunningError(Exception):
+    """
+    Raised when attempting to start or configure the scheduler when it's
+    already running.
+    """
+
+    def __str__(self):
+        return 'Scheduler is already running'
+
+
+class Scheduler(object):
+    """
+    This class is responsible for scheduling jobs and triggering
+    their execution.
+    """
+
+    _stopped = True
+    _thread = None
+
+    def __init__(self, gconfig={}, **options):
+        self._wakeup = Event()
+        self._jobstores = {}
+        self._jobstores_lock = Lock()
+        self._listeners = []
+        self._listeners_lock = Lock()
+        self._pending_jobs = []
+        self.configure(gconfig, **options)
+
+    def configure(self, gconfig={}, **options):
+        """
+        Reconfigures the scheduler with the given options. Can only be done
+        when the scheduler isn't running.
+        """
+        if self.running:
+            raise SchedulerAlreadyRunningError
+
+        # Set general options
+        config = combine_opts(gconfig, 'apscheduler.', options)
+        self.misfire_grace_time = int(config.pop('misfire_grace_time', 1))
+        self.coalesce = asbool(config.pop('coalesce', True))
+        self.daemonic = asbool(config.pop('daemonic', True))
+        self.standalone = asbool(config.pop('standalone', False))
+
+        # Configure the thread pool
+        if 'threadpool' in config:
+            self._threadpool = maybe_ref(config['threadpool'])
+        else:
+            threadpool_opts = combine_opts(config, 'threadpool.')
+            self._threadpool = ThreadPool(**threadpool_opts)
+
+        # Configure job stores
+        jobstore_opts = combine_opts(config, 'jobstore.')
+        jobstores = {}
+        for key, value in jobstore_opts.items():
+            store_name, option = key.split('.', 1)
+            opts_dict = jobstores.setdefault(store_name, {})
+            opts_dict[option] = value
+
+        for alias, opts in jobstores.items():
+            classname = opts.pop('class')
+            cls = maybe_ref(classname)
+            jobstore = cls(**opts)
+            self.add_jobstore(jobstore, alias, True)
+
+    def start(self):
+        """
+        Starts the scheduler in a new thread.
+
+        In threaded mode (the default), this method will return immediately
+        after starting the scheduler thread.
+
+        In standalone mode, this method will block until there are no more
+        scheduled jobs.
+        """
+        if self.running:
+            raise SchedulerAlreadyRunningError
+
+        # Create a RAMJobStore as the default if there is no default job store
+        if 'default' not in self._jobstores:
+            self.add_jobstore(RAMJobStore(), 'default', True)
+
+        # Schedule all pending jobs
+        for job, jobstore in self._pending_jobs:
+            self._real_add_job(job, jobstore, False)
+        del self._pending_jobs[:]
+
+        self._stopped = False
+        if self.standalone:
+            self._main_loop()
+        else:
+            self._thread = Thread(target=self._main_loop, name='APScheduler')
+            self._thread.setDaemon(self.daemonic)
+            self._thread.start()
+
+    def shutdown(self, wait=True, shutdown_threadpool=True,
+                 close_jobstores=True):
+        """
+        Shuts down the scheduler and terminates the thread.
+        Does not interrupt any currently running jobs.
+
+        :param wait: ``True`` to wait until all currently executing jobs have
+                     finished (if ``shutdown_threadpool`` is also ``True``)
+        :param shutdown_threadpool: ``True`` to shut down the thread pool
+        :param close_jobstores: ``True`` to close all job stores after shutdown
+        """
+        if not self.running:
+            return
+
+        self._stopped = True
+        self._wakeup.set()
+
+        # Shut down the thread pool
+        if shutdown_threadpool:
+            self._threadpool.shutdown(wait)
+
+        # Wait until the scheduler thread terminates
+        if self._thread:
+            self._thread.join()
+
+        # Close all job stores
+        if close_jobstores:
+            for jobstore in itervalues(self._jobstores):
+                jobstore.close()
+
+    @property
+    def running(self):
+        thread_alive = self._thread and self._thread.isAlive()
+        standalone = getattr(self, 'standalone', False)
+        return not self._stopped and (standalone or thread_alive)
+
+    def add_jobstore(self, jobstore, alias, quiet=False):
+        """
+        Adds a job store to this scheduler.
+
+        :param jobstore: job store to be added
+        :param alias: alias for the job store
+        :param quiet: True to suppress scheduler thread wakeup
+        :type jobstore: instance of
+            :class:`~apscheduler.jobstores.base.JobStore`
+        :type alias: str
+        """
+        self._jobstores_lock.acquire()
+        try:
+            if alias in self._jobstores:
+                raise KeyError('Alias "%s" is already in use' % alias)
+            self._jobstores[alias] = jobstore
+            jobstore.load_jobs()
+        finally:
+            self._jobstores_lock.release()
+
+        # Notify listeners that a new job store has been added
+        self._notify_listeners(JobStoreEvent(EVENT_JOBSTORE_ADDED, alias))
+
+        # Notify the scheduler so it can scan the new job store for jobs
+        if not quiet:
+            self._wakeup.set()
+
+    def remove_jobstore(self, alias, close=True):
+        """
+        Removes the job store with the given alias from this scheduler.
+
+        :param close: ``True`` to close the job store after removing it
+        :type alias: str
+        """
+        self._jobstores_lock.acquire()
+        try:
+            jobstore = self._jobstores.pop(alias, None)
+            if not jobstore:
+                raise KeyError('No such job store: %s' % alias)
+        finally:
+            self._jobstores_lock.release()
+
+        # Close the job store if requested
+        if close:
+            jobstore.close()
+
+        # Notify listeners that a job store has been removed
+        self._notify_listeners(JobStoreEvent(EVENT_JOBSTORE_REMOVED, alias))
+
+    def add_listener(self, callback, mask=EVENT_ALL):
+        """
+        Adds a listener for scheduler events. When a matching event occurs,
+        ``callback`` is executed with the event object as its sole argument.
+        If the ``mask`` parameter is not provided, the callback will receive
+        events of all types.
+
+        :param callback: any callable that takes one argument
+        :param mask: bitmask that indicates which events should be listened to
+        """
+        self._listeners_lock.acquire()
+        try:
+            self._listeners.append((callback, mask))
+        finally:
+            self._listeners_lock.release()
+
+    def remove_listener(self, callback):
+        """
+        Removes a previously added event listener.
+        """
+        self._listeners_lock.acquire()
+        try:
+            # Rebuild the list instead of deleting entries while iterating,
+            # which could skip the element after a removed one
+            self._listeners[:] = [(cb, mask) for cb, mask in self._listeners
+                                  if cb != callback]
+        finally:
+            self._listeners_lock.release()
+
+    def _notify_listeners(self, event):
+        self._listeners_lock.acquire()
+        try:
+            listeners = tuple(self._listeners)
+        finally:
+            self._listeners_lock.release()
+
+        for cb, mask in listeners:
+            if event.code & mask:
+                try:
+                    cb(event)
+                except Exception:
+                    logger.exception('Error notifying listener')
+
+    def _real_add_job(self, job, jobstore, wakeup):
+        job.compute_next_run_time(datetime.now())
+        if not job.next_run_time:
+            raise ValueError('Not adding job since it would never be run')
+
+        self._jobstores_lock.acquire()
+        try:
+            try:
+                store = self._jobstores[jobstore]
+            except KeyError:
+                raise KeyError('No such job store: %s' % jobstore)
+            store.add_job(job)
+        finally:
+            self._jobstores_lock.release()
+
+        # Notify listeners that a new job has been added
+        event = JobStoreEvent(EVENT_JOBSTORE_JOB_ADDED, jobstore, job)
+        self._notify_listeners(event)
+
+        logger.info('Added job "%s" to job store "%s"', job, jobstore)
+
+        # Notify the scheduler about the new job
+        if wakeup:
+            self._wakeup.set()
+
+    def add_job(self, trigger, func, args, kwargs, jobstore='default',
+                **options):
+        """
+        Adds the given job to the job list and notifies the scheduler thread.
+        Any extra keyword arguments are passed along to the constructor of the
+        :class:`~apscheduler.job.Job` class (see :ref:`job_options`).
+
+        :param trigger: trigger that determines when ``func`` is called
+        :param func: callable to run at the given time
+        :param args: list of positional arguments to call func with
+        :param kwargs: dict of keyword arguments to call func with
+        :param jobstore: alias of the job store to store the job in
+        :rtype: :class:`~apscheduler.job.Job`
+        """
+        job = Job(trigger, func, args or [], kwargs or {},
+                  options.pop('misfire_grace_time', self.misfire_grace_time),
+                  options.pop('coalesce', self.coalesce), **options)
+        if not self.running:
+            self._pending_jobs.append((job, jobstore))
+            logger.info('Adding job tentatively -- it will be properly '
+                        'scheduled when the scheduler starts')
+        else:
+            self._real_add_job(job, jobstore, True)
+        return job
+
+    def _remove_job(self, job, alias, jobstore):
+        jobstore.remove_job(job)
+
+        # Notify listeners that a job has been removed
+        event = JobStoreEvent(EVENT_JOBSTORE_JOB_REMOVED, alias, job)
+        self._notify_listeners(event)
+
+        logger.info('Removed job "%s"', job)
+
+    def add_date_job(self, func, date, args=None, kwargs=None, **options):
+        """
+        Schedules a job to be run once at a specific date and time.
+        Any extra keyword arguments are passed along to the constructor of the
+        :class:`~apscheduler.job.Job` class (see :ref:`job_options`).
+
+        :param func: callable to run at the given time
+        :param date: the date/time to run the job at
+        :param name: name of the job
+        :param jobstore: alias of the job store to store the job in
+        :param misfire_grace_time: seconds after the designated run time that
+            the job is still allowed to be run
+        :type date: :class:`datetime.date`
+        :rtype: :class:`~apscheduler.job.Job`
+        """
+        trigger = SimpleTrigger(date)
+        return self.add_job(trigger, func, args, kwargs, **options)
+
+    def add_interval_job(self, func, weeks=0, days=0, hours=0, minutes=0,
+                         seconds=0, start_date=None, args=None, kwargs=None,
+                         **options):
+        """
+        Schedules a job to be run repeatedly at the given interval.
+        Any extra keyword arguments are passed along to the constructor of the
+        :class:`~apscheduler.job.Job` class (see :ref:`job_options`).
+
+        :param func: callable to run
+        :param weeks: number of weeks to wait
+        :param days: number of days to wait
+        :param hours: number of hours to wait
+        :param minutes: number of minutes to wait
+        :param seconds: number of seconds to wait
+        :param start_date: when to first execute the job and start the
+            counter (default is after the given interval)
+        :param args: list of positional arguments to call func with
+        :param kwargs: dict of keyword arguments to call func with
+        :param name: name of the job
+        :param jobstore: alias of the job store to add the job to
+        :param misfire_grace_time: seconds after the designated run time that
+            the job is still allowed to be run
+        :rtype: :class:`~apscheduler.job.Job`
+        """
+        interval = timedelta(weeks=weeks, days=days, hours=hours,
+                             minutes=minutes, seconds=seconds)
+        trigger = IntervalTrigger(interval, start_date)
+        return self.add_job(trigger, func, args, kwargs, **options)
+
+    def add_cron_job(self, func, year=None, month=None, day=None, week=None,
+                     day_of_week=None, hour=None, minute=None, second=None,
+                     start_date=None, args=None, kwargs=None, **options):
+        """
+        Schedules a job to be run whenever the current time matches the
+        given expressions.
+        Any extra keyword arguments are passed along to the constructor of the
+        :class:`~apscheduler.job.Job` class (see :ref:`job_options`).
+
+        :param func: callable to run
+        :param year: year to run on
+        :param month: month to run on
+        :param day: day of month to run on
+        :param week: week of the year to run on
+        :param day_of_week: weekday to run on (0 = Monday)
+        :param hour: hour to run on
+        :param minute: minute to run on
+        :param second: second to run on
+        :param args: list of positional arguments to call func with
+        :param kwargs: dict of keyword arguments to call func with
+        :param name: name of the job
+        :param jobstore: alias of the job store to add the job to
+        :param misfire_grace_time: seconds after the designated run time that
+            the job is still allowed to be run
+        :return: the scheduled job
+        :rtype: :class:`~apscheduler.job.Job`
+        """
+        trigger = CronTrigger(year=year, month=month, day=day, week=week,
+                              day_of_week=day_of_week, hour=hour,
+                              minute=minute, second=second,
+                              start_date=start_date)
+        return self.add_job(trigger, func, args, kwargs, **options)
+
+    def cron_schedule(self, **options):
+        """
+        Decorator version of :meth:`add_cron_job`.
+        This decorator does not wrap its host function.
+        Unscheduling decorated functions is possible by passing the ``job``
+        attribute of the scheduled function to :meth:`unschedule_job`.
+        Any extra keyword arguments are passed along to the constructor of the
+        :class:`~apscheduler.job.Job` class (see :ref:`job_options`).
+        """
+        def inner(func):
+            func.job = self.add_cron_job(func, **options)
+            return func
+        return inner
+
+    def interval_schedule(self, **options):
+        """
+        Decorator version of :meth:`add_interval_job`.
+        This decorator does not wrap its host function.
+        Unscheduling decorated functions is possible by passing the ``job``
+        attribute of the scheduled function to :meth:`unschedule_job`.
+        Any extra keyword arguments are passed along to the constructor of the
+        :class:`~apscheduler.job.Job` class (see :ref:`job_options`).
+        """
+        def inner(func):
+            func.job = self.add_interval_job(func, **options)
+            return func
+        return inner
+
+    def get_jobs(self):
+        """
+        Returns a list of all scheduled jobs.
+
+        :return: list of :class:`~apscheduler.job.Job` objects
+        """
+        self._jobstores_lock.acquire()
+        try:
+            jobs = []
+            for jobstore in itervalues(self._jobstores):
+                jobs.extend(jobstore.jobs)
+            return jobs
+        finally:
+            self._jobstores_lock.release()
+
+    def unschedule_job(self, job):
+        """
+        Removes a job, preventing it from being run any more.
+        """
+        self._jobstores_lock.acquire()
+        try:
+            for alias, jobstore in iteritems(self._jobstores):
+                if job in list(jobstore.jobs):
+                    self._remove_job(job, alias, jobstore)
+                    return
+        finally:
+            self._jobstores_lock.release()
+
+        raise KeyError('Job "%s" is not scheduled in any job store' % job)
+
+    def unschedule_func(self, func):
+        """
+        Removes all jobs that would execute the given function.
+        """
+        found = False
+        self._jobstores_lock.acquire()
+        try:
+            for alias, jobstore in iteritems(self._jobstores):
+                for job in list(jobstore.jobs):
+                    if job.func == func:
+                        self._remove_job(job, alias, jobstore)
+                        found = True
+        finally:
+            self._jobstores_lock.release()
+
+        if not found:
+            raise KeyError('The given function is not scheduled in this '
+                           'scheduler')
+
+    def print_jobs(self, out=None):
+        """
+        Prints out a textual listing of all jobs currently scheduled on this
+        scheduler.
+
+        :param out: a file-like object to print to (defaults to **sys.stdout**
+                    if nothing is given)
+        """
+        out = out or sys.stdout
+        job_strs = []
+        self._jobstores_lock.acquire()
+        try:
+            for alias, jobstore in iteritems(self._jobstores):
+                job_strs.append('Jobstore %s:' % alias)
+                if jobstore.jobs:
+                    for job in jobstore.jobs:
+                        job_strs.append('    %s' % job)
+                else:
+                    job_strs.append('    No scheduled jobs')
+        finally:
+            self._jobstores_lock.release()
+
+        out.write(os.linesep.join(job_strs) + os.linesep)
+
+    def _run_job(self, job, run_times):
+        """
+        Acts as a harness that runs the actual job code in a thread.
+        """
+        for run_time in run_times:
+            # See if the job missed its run time window, and handle possible
+            # misfires accordingly
+            difference = datetime.now() - run_time
+            grace_time = timedelta(seconds=job.misfire_grace_time)
+            if difference > grace_time:
+                # Notify listeners about a missed run
+                event = JobEvent(EVENT_JOB_MISSED, job, run_time)
+                self._notify_listeners(event)
+                logger.warning('Run time of job "%s" was missed by %s',
+                               job, difference)
+            else:
+                try:
+                    job.add_instance()
+                except MaxInstancesReachedError:
+                    event = JobEvent(EVENT_JOB_MISSED, job, run_time)
+                    self._notify_listeners(event)
+                    logger.warning('Execution of job "%s" skipped: '
+                                   'maximum number of running instances '
+                                   'reached (%d)', job, job.max_instances)
+                    break
+
+                logger.info('Running job "%s" (scheduled at %s)', job,
+                            run_time)
+
+                try:
+                    retval = job.func(*job.args, **job.kwargs)
+                except Exception:
+                    # Notify listeners about the exception
+                    exc, tb = sys.exc_info()[1:]
+                    event = JobEvent(EVENT_JOB_ERROR, job, run_time,
+                                     exception=exc, traceback=tb)
+                    self._notify_listeners(event)
+
+                    logger.exception('Job "%s" raised an exception', job)
+                else:
+                    # Notify listeners about successful execution
+                    event = JobEvent(EVENT_JOB_EXECUTED, job, run_time,
+                                     retval=retval)
+                    self._notify_listeners(event)
+
+                    logger.info('Job "%s" executed successfully', job)
+
+                job.remove_instance()
+
+                # If coalescing is enabled, don't attempt any further runs
+                if job.coalesce:
+                    break
+
+    def _process_jobs(self, now):
+        """
+        Iterates through jobs in every jobstore, starts jobs that are due
+        and figures out the next wakeup time.
+        """
+        next_wakeup_time = None
+        self._jobstores_lock.acquire()
+        try:
+            for alias, jobstore in iteritems(self._jobstores):
+                for job in tuple(jobstore.jobs):
+                    run_times = job.get_run_times(now)
+                    if run_times:
+                        self._threadpool.submit(self._run_job, job, run_times)
+
+                        # Increase the job's run count
+                        if job.coalesce:
+                            job.runs += 1
+                        else:
+                            job.runs += len(run_times)
+
+                        # Update the job, but don't keep finished jobs around
+                        if job.compute_next_run_time(
+                                now + timedelta(microseconds=1)):
+                            jobstore.update_job(job)
+                        else:
+                            self._remove_job(job, alias, jobstore)
+
+                    if not next_wakeup_time:
+                        next_wakeup_time = job.next_run_time
+                    elif job.next_run_time:
+                        next_wakeup_time = min(next_wakeup_time,
+                                               job.next_run_time)
+            return next_wakeup_time
+        finally:
+            self._jobstores_lock.release()
+
+    def _main_loop(self):
+        """Executes jobs on schedule."""
+
+        logger.info('Scheduler started')
+        self._notify_listeners(SchedulerEvent(EVENT_SCHEDULER_START))
+
+        self._wakeup.clear()
+        while not self._stopped:
+            logger.debug('Looking for jobs to run')
+            now = datetime.now()
+            next_wakeup_time = self._process_jobs(now)
+
+            # Sleep until the next job is scheduled to be run,
+            # a new job is added or the scheduler is stopped
+            if next_wakeup_time is not None:
+                wait_seconds = time_difference(next_wakeup_time, now)
+                logger.debug('Next wakeup is due at %s (in %f seconds)',
+                             next_wakeup_time, wait_seconds)
+                try:
+                    self._wakeup.wait(wait_seconds)
+                except IOError:  # Catch errno 514 on some Linux kernels
+                    pass
+                self._wakeup.clear()
+            elif self.standalone:
+                logger.debug('No jobs left; shutting down scheduler')
+                self.shutdown()
+                break
+            else:
+                logger.debug('No jobs; waiting until a job is added')
+                try:
+                    self._wakeup.wait()
+                except IOError:  # Catch errno 514 on some Linux kernels
+                    pass
+                self._wakeup.clear()
+
+        logger.info('Scheduler has been shut down')
+        self._notify_listeners(SchedulerEvent(EVENT_SCHEDULER_SHUTDOWN))

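For reference, a minimal usage sketch of the Scheduler defined above; the
job and listener functions are illustrative placeholders, not part of this
patch:

    from datetime import datetime
    from apscheduler.scheduler import Scheduler
    from apscheduler.events import EVENT_JOB_ERROR, EVENT_JOB_MISSED

    sched = Scheduler(daemonic=True)

    def on_problem(event):
        # Invoked for missed runs and job exceptions, per the mask below
        print('scheduler event: %s' % event)

    sched.add_listener(on_problem, EVENT_JOB_ERROR | EVENT_JOB_MISSED)

    @sched.interval_schedule(seconds=30)
    def collect_alerts():
        print('collecting alerts at %s' % datetime.now())

    sched.start()
    # ... on shutdown ...
    sched.shutdown(wait=True)
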
http://git-wip-us.apache.org/repos/asf/ambari/blob/9e529edd/ambari-agent/src/main/python/ambari_agent/apscheduler/threadpool.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/apscheduler/threadpool.py b/ambari-agent/src/main/python/ambari_agent/apscheduler/threadpool.py
new file mode 100644
index 0000000..8ec47da
--- /dev/null
+++ b/ambari-agent/src/main/python/ambari_agent/apscheduler/threadpool.py
@@ -0,0 +1,133 @@
+"""
+Generic thread pool class. Modeled after Java's ThreadPoolExecutor.
+Please note that this ThreadPool does *not* fully implement the PEP 3148
+Executor interface!
+"""
+
+from threading import Thread, Lock, currentThread
+from weakref import ref
+import logging
+import atexit
+
+try:
+    from queue import Queue, Empty
+except ImportError:
+    from Queue import Queue, Empty
+
+logger = logging.getLogger(__name__)
+_threadpools = set()
+
+
+# Worker threads are daemonic in order to let the interpreter exit without
+# an explicit shutdown of the thread pool. The following trick is necessary
+# to allow worker threads to finish cleanly.
+def _shutdown_all():
+    for pool_ref in tuple(_threadpools):
+        pool = pool_ref()
+        if pool:
+            pool.shutdown()
+
+atexit.register(_shutdown_all)
+
+
+class ThreadPool(object):
+    def __init__(self, core_threads=0, max_threads=20, keepalive=1):
+        """
+        :param core_threads: maximum number of persistent threads in the pool
+        :param max_threads: maximum number of total threads in the pool
+        :param keepalive: seconds to keep non-core worker threads waiting
+            for new tasks
+        """
+        self.core_threads = core_threads
+        self.max_threads = max(max_threads, core_threads, 1)
+        self.keepalive = keepalive
+        self._queue = Queue()
+        self._threads_lock = Lock()
+        self._threads = set()
+        self._shutdown = False
+
+        _threadpools.add(ref(self))
+        logger.info('Started thread pool with %d core threads and %s maximum '
+                    'threads', core_threads, max_threads or 'unlimited')
+
+    def _adjust_threadcount(self):
+        self._threads_lock.acquire()
+        try:
+            if self.num_threads < self.max_threads:
+                self._add_thread(self.num_threads < self.core_threads)
+        finally:
+            self._threads_lock.release()
+
+    def _add_thread(self, core):
+        t = Thread(target=self._run_jobs, args=(core,))
+        t.setDaemon(True)
+        t.start()
+        self._threads.add(t)
+
+    def _run_jobs(self, core):
+        logger.debug('Started worker thread')
+        block = True
+        timeout = None
+        if not core:
+            block = self.keepalive > 0
+            timeout = self.keepalive
+
+        while True:
+            try:
+                func, args, kwargs = self._queue.get(block, timeout)
+            except Empty:
+                break
+
+            if self._shutdown:
+                break
+
+            try:
+                func(*args, **kwargs)
+            except Exception:
+                logger.exception('Error in worker thread')
+
+        self._threads_lock.acquire()
+        self._threads.remove(currentThread())
+        self._threads_lock.release()
+
+        logger.debug('Exiting worker thread')
+
+    @property
+    def num_threads(self):
+        return len(self._threads)
+
+    def submit(self, func, *args, **kwargs):
+        if self._shutdown:
+            raise RuntimeError('Cannot schedule new tasks after shutdown')
+
+        self._queue.put((func, args, kwargs))
+        self._adjust_threadcount()
+
+    def shutdown(self, wait=True):
+        if self._shutdown:
+            return
+
+        logger.info('Shutting down thread pool')
+        self._shutdown = True
+        _threadpools.remove(ref(self))
+
+        self._threads_lock.acquire()
+        for _ in range(self.num_threads):
+            self._queue.put((None, None, None))
+        self._threads_lock.release()
+
+        if wait:
+            self._threads_lock.acquire()
+            threads = tuple(self._threads)
+            self._threads_lock.release()
+            for thread in threads:
+                thread.join()
+
+    def __repr__(self):
+        if self.max_threads:
+            threadcount = '%d/%d' % (self.num_threads, self.max_threads)
+        else:
+            threadcount = '%d' % self.num_threads
+
+        return '<ThreadPool at %x; threads=%s>' % (id(self), threadcount)

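For reference, a minimal sketch of this pool's submit/shutdown cycle; the
work function is an illustrative placeholder:

    from time import sleep
    from apscheduler.threadpool import ThreadPool

    def work(n):
        print('running task %d' % n)

    pool = ThreadPool(core_threads=2, max_threads=5, keepalive=1)
    for i in range(10):
        pool.submit(work, i)
    sleep(1)  # give the daemonic workers a moment to drain the queue
    pool.shutdown(wait=True)
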
http://git-wip-us.apache.org/repos/asf/ambari/blob/9e529edd/ambari-agent/src/main/python/ambari_agent/apscheduler/triggers/__init__.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/apscheduler/triggers/__init__.py b/ambari-agent/src/main/python/ambari_agent/apscheduler/triggers/__init__.py
new file mode 100644
index 0000000..74a9788
--- /dev/null
+++ b/ambari-agent/src/main/python/ambari_agent/apscheduler/triggers/__init__.py
@@ -0,0 +1,3 @@
+from apscheduler.triggers.cron import CronTrigger
+from apscheduler.triggers.interval import IntervalTrigger
+from apscheduler.triggers.simple import SimpleTrigger

http://git-wip-us.apache.org/repos/asf/ambari/blob/9e529edd/ambari-agent/src/main/python/ambari_agent/apscheduler/triggers/cron/__init__.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/apscheduler/triggers/cron/__init__.py b/ambari-agent/src/main/python/ambari_agent/apscheduler/triggers/cron/__init__.py
new file mode 100644
index 0000000..9e69f72
--- /dev/null
+++ b/ambari-agent/src/main/python/ambari_agent/apscheduler/triggers/cron/__init__.py
@@ -0,0 +1,144 @@
+from datetime import date, datetime
+
+from apscheduler.triggers.cron.fields import *
+from apscheduler.util import datetime_ceil, convert_to_datetime, iteritems
+
+
+class CronTrigger(object):
+    FIELD_NAMES = ('year', 'month', 'day', 'week', 'day_of_week', 'hour',
+                   'minute', 'second')
+    FIELDS_MAP = {'year': BaseField,
+                  'month': BaseField,
+                  'week': WeekField,
+                  'day': DayOfMonthField,
+                  'day_of_week': DayOfWeekField,
+                  'hour': BaseField,
+                  'minute': BaseField,
+                  'second': BaseField}
+
+    def __init__(self, **values):
+        self.start_date = values.pop('start_date', None)
+        if self.start_date:
+            self.start_date = convert_to_datetime(self.start_date)
+
+        # Check field names and yank out all None valued fields
+        for key, value in list(iteritems(values)):
+            if key not in self.FIELD_NAMES:
+                raise TypeError('Invalid field name: %s' % key)
+            if value is None:
+                del values[key]
+
+        self.fields = []
+        assign_defaults = False
+        for field_name in self.FIELD_NAMES:
+            if field_name in values:
+                exprs = values.pop(field_name)
+                is_default = False
+                assign_defaults = not values
+            elif assign_defaults:
+                exprs = DEFAULT_VALUES[field_name]
+                is_default = True
+            else:
+                exprs = '*'
+                is_default = True
+
+            field_class = self.FIELDS_MAP[field_name]
+            field = field_class(field_name, exprs, is_default)
+            self.fields.append(field)
+
+    def _increment_field_value(self, dateval, fieldnum):
+        """
+        Increments the designated field and resets all less significant fields
+        to their minimum values.
+
+        :type dateval: datetime
+        :type fieldnum: int
+        :rtype: tuple
+        :return: a tuple containing the new date, and the number of the field
+                 that was actually incremented
+        """
+        i = 0
+        values = {}
+        while i < len(self.fields):
+            field = self.fields[i]
+            if not field.REAL:
+                if i == fieldnum:
+                    fieldnum -= 1
+                    i -= 1
+                else:
+                    i += 1
+                continue
+
+            if i < fieldnum:
+                values[field.name] = field.get_value(dateval)
+                i += 1
+            elif i > fieldnum:
+                values[field.name] = field.get_min(dateval)
+                i += 1
+            else:
+                value = field.get_value(dateval)
+                maxval = field.get_max(dateval)
+                if value == maxval:
+                    fieldnum -= 1
+                    i -= 1
+                else:
+                    values[field.name] = value + 1
+                    i += 1
+
+        return datetime(**values), fieldnum
+
+    def _set_field_value(self, dateval, fieldnum, new_value):
+        values = {}
+        for i, field in enumerate(self.fields):
+            if field.REAL:
+                if i < fieldnum:
+                    values[field.name] = field.get_value(dateval)
+                elif i > fieldnum:
+                    values[field.name] = field.get_min(dateval)
+                else:
+                    values[field.name] = new_value
+
+        return datetime(**values)
+
+    def get_next_fire_time(self, start_date):
+        if self.start_date:
+            start_date = max(start_date, self.start_date)
+        next_date = datetime_ceil(start_date)
+        fieldnum = 0
+        while 0 <= fieldnum < len(self.fields):
+            field = self.fields[fieldnum]
+            curr_value = field.get_value(next_date)
+            next_value = field.get_next_value(next_date)
+
+            if next_value is None:
+                # No valid value was found
+                next_date, fieldnum = self._increment_field_value(
+                    next_date, fieldnum - 1)
+            elif next_value > curr_value:
+                # A valid value higher than the starting value was found
+                if field.REAL:
+                    next_date = self._set_field_value(
+                        next_date, fieldnum, next_value)
+                    fieldnum += 1
+                else:
+                    next_date, fieldnum = self._increment_field_value(
+                        next_date, fieldnum)
+            else:
+                # A valid value was found, no changes necessary
+                fieldnum += 1
+
+        if fieldnum >= 0:
+            return next_date
+
+    def __str__(self):
+        options = ["%s='%s'" % (f.name, str(f)) for f in self.fields
+                   if not f.is_default]
+        return 'cron[%s]' % (', '.join(options))
+
+    def __repr__(self):
+        options = ["%s='%s'" % (f.name, str(f)) for f in self.fields
+                   if not f.is_default]
+        if self.start_date:
+            options.append("start_date='%s'" % self.start_date.isoformat(' '))
+        return '<%s (%s)>' % (self.__class__.__name__, ', '.join(options))

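For reference, a quick sketch of exercising the trigger directly. Once any
field is given, the finer-grained fields fall back to DEFAULT_VALUES, so
this trigger fires at 03:30:00 on weekdays:

    from datetime import datetime
    from apscheduler.triggers.cron import CronTrigger

    trigger = CronTrigger(day_of_week='mon-fri', hour=3, minute=30)
    print(trigger)                                     # cron[...]
    print(trigger.get_next_fire_time(datetime.now()))
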
http://git-wip-us.apache.org/repos/asf/ambari/blob/9e529edd/ambari-agent/src/main/python/ambari_agent/apscheduler/triggers/cron/expressions.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/apscheduler/triggers/cron/expressions.py b/ambari-agent/src/main/python/ambari_agent/apscheduler/triggers/cron/expressions.py
new file mode 100644
index 0000000..b5d2919
--- /dev/null
+++ b/ambari-agent/src/main/python/ambari_agent/apscheduler/triggers/cron/expressions.py
@@ -0,0 +1,194 @@
+"""
+This module contains the expressions applicable to CronTrigger's fields.
+"""
+
+from calendar import monthrange
+import re
+
+from apscheduler.util import asint
+
+__all__ = ('AllExpression', 'RangeExpression', 'WeekdayRangeExpression',
+           'WeekdayPositionExpression', 'LastDayOfMonthExpression')
+
+
+WEEKDAYS = ['mon', 'tue', 'wed', 'thu', 'fri', 'sat', 'sun']
+
+
+class AllExpression(object):
+    value_re = re.compile(r'\*(?:/(?P<step>\d+))?$')
+
+    def __init__(self, step=None):
+        self.step = asint(step)
+        if self.step == 0:
+            raise ValueError('Increment must be higher than 0')
+
+    def get_next_value(self, date, field):
+        start = field.get_value(date)
+        minval = field.get_min(date)
+        maxval = field.get_max(date)
+        start = max(start, minval)
+
+        if not self.step:
+            next_value = start
+        else:
+            distance_to_next = (self.step - (start - minval)) % self.step
+            next_value = start + distance_to_next
+
+        if next_value <= maxval:
+            return next_value
+
+    def __str__(self):
+        if self.step:
+            return '*/%d' % self.step
+        return '*'
+
+    def __repr__(self):
+        return "%s(%s)" % (self.__class__.__name__, self.step)
+
+
+class RangeExpression(AllExpression):
+    value_re = re.compile(
+        r'(?P<first>\d+)(?:-(?P<last>\d+))?(?:/(?P<step>\d+))?$')
+
+    def __init__(self, first, last=None, step=None):
+        AllExpression.__init__(self, step)
+        first = asint(first)
+        last = asint(last)
+        if last is None and step is None:
+            last = first
+        if last is not None and first > last:
+            raise ValueError('The minimum value in a range must not be '
+                             'higher than the maximum')
+        self.first = first
+        self.last = last
+
+    def get_next_value(self, date, field):
+        start = field.get_value(date)
+        minval = field.get_min(date)
+        maxval = field.get_max(date)
+
+        # Apply range limits
+        minval = max(minval, self.first)
+        if self.last is not None:
+            maxval = min(maxval, self.last)
+        start = max(start, minval)
+
+        if not self.step:
+            next_value = start
+        else:
+            distance_to_next = (self.step - (start - minval)) % self.step
+            next_value = start + distance_to_next
+
+        if next_value <= maxval:
+            return next_value
+
+    def __str__(self):
+        if self.last != self.first and self.last is not None:
+            range_str = '%d-%d' % (self.first, self.last)
+        else:
+            range_str = str(self.first)
+
+        if self.step:
+            return '%s/%d' % (range_str, self.step)
+        return range_str
+
+    def __repr__(self):
+        args = [str(self.first)]
+        if self.last != self.first and self.last is not None or self.step:
+            args.append(str(self.last))
+        if self.step:
+            args.append(str(self.step))
+        return "%s(%s)" % (self.__class__.__name__, ', '.join(args))
+
+
+class WeekdayRangeExpression(RangeExpression):
+    value_re = re.compile(r'(?P<first>[a-z]+)(?:-(?P<last>[a-z]+))?',
+                          re.IGNORECASE)
+
+    def __init__(self, first, last=None):
+        try:
+            first_num = WEEKDAYS.index(first.lower())
+        except ValueError:
+            raise ValueError('Invalid weekday name "%s"' % first)
+
+        if last:
+            try:
+                last_num = WEEKDAYS.index(last.lower())
+            except ValueError:
+                raise ValueError('Invalid weekday name "%s"' % last)
+        else:
+            last_num = None
+
+        RangeExpression.__init__(self, first_num, last_num)
+
+    def __str__(self):
+        if self.last != self.first and self.last is not None:
+            return '%s-%s' % (WEEKDAYS[self.first], WEEKDAYS[self.last])
+        return WEEKDAYS[self.first]
+
+    def __repr__(self):
+        args = ["'%s'" % WEEKDAYS[self.first]]
+        if self.last != self.first and self.last is not None:
+            args.append("'%s'" % WEEKDAYS[self.last])
+        return "%s(%s)" % (self.__class__.__name__, ', '.join(args))
+
+
+class WeekdayPositionExpression(AllExpression):
+    options = ['1st', '2nd', '3rd', '4th', '5th', 'last']
+    value_re = re.compile(r'(?P<option_name>%s) +(?P<weekday_name>(?:\d+|\w+))'
+                          % '|'.join(options), re.IGNORECASE)
+
+    def __init__(self, option_name, weekday_name):
+        try:
+            self.option_num = self.options.index(option_name.lower())
+        except ValueError:
+            raise ValueError('Invalid weekday position "%s"' % option_name)
+
+        try:
+            self.weekday = WEEKDAYS.index(weekday_name.lower())
+        except ValueError:
+            raise ValueError('Invalid weekday name "%s"' % weekday_name)
+
+    def get_next_value(self, date, field):
+        # Figure out the weekday of the month's first day and the number
+        # of days in that month
+        first_day_wday, last_day = monthrange(date.year, date.month)
+
+        # Calculate which day of the month is the first of the target weekdays
+        first_hit_day = self.weekday - first_day_wday + 1
+        if first_hit_day <= 0:
+            first_hit_day += 7
+
+        # Calculate what day of the month the target weekday would be
+        if self.option_num < 5:
+            target_day = first_hit_day + self.option_num * 7
+        else:
+            target_day = first_hit_day + ((last_day - first_hit_day) // 7) * 7
+
+        if date.day <= target_day <= last_day:
+            return target_day
+
+    def __str__(self):
+        return '%s %s' % (self.options[self.option_num],
+                          WEEKDAYS[self.weekday])
+
+    def __repr__(self):
+        return "%s('%s', '%s')" % (self.__class__.__name__,
+                                   self.options[self.option_num],
+                                   WEEKDAYS[self.weekday])
+
+
+class LastDayOfMonthExpression(AllExpression):
+    value_re = re.compile(r'last', re.IGNORECASE)
+
+    def __init__(self):
+        pass
+
+    def get_next_value(self, date, field):
+        return monthrange(date.year, date.month)[1]
+
+    def __str__(self):
+        return 'last'
+
+    def __repr__(self):
+        return "%s()" % self.__class__.__name__

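For reference, a sketch of how a single field expression string is matched
and compiled, mirroring what BaseField.compile_expression() does in
fields.py below:

    from apscheduler.triggers.cron.expressions import RangeExpression

    match = RangeExpression.value_re.match('10-30/5')
    # groupdict() yields first='10', last='30', step='5'
    expr = RangeExpression(**match.groupdict())
    print(expr)  # 10-30/5
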
http://git-wip-us.apache.org/repos/asf/ambari/blob/9e529edd/ambari-agent/src/main/python/ambari_agent/apscheduler/triggers/cron/fields.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/apscheduler/triggers/cron/fields.py b/ambari-agent/src/main/python/ambari_agent/apscheduler/triggers/cron/fields.py
new file mode 100644
index 0000000..be5e5e3
--- /dev/null
+++ b/ambari-agent/src/main/python/ambari_agent/apscheduler/triggers/cron/fields.py
@@ -0,0 +1,100 @@
+"""
+Fields represent CronTrigger options which map to :class:`~datetime.datetime`
+fields.
+"""
+
+from calendar import monthrange
+
+from apscheduler.triggers.cron.expressions import *
+
+__all__ = ('MIN_VALUES', 'MAX_VALUES', 'DEFAULT_VALUES', 'BaseField',
+           'WeekField', 'DayOfMonthField', 'DayOfWeekField')
+
+
+MIN_VALUES = {'year': 1970, 'month': 1, 'day': 1, 'week': 1,
+              'day_of_week': 0, 'hour': 0, 'minute': 0, 'second': 0}
+MAX_VALUES = {'year': 2 ** 63, 'month': 12, 'day': 31, 'week': 53,
+              'day_of_week': 6, 'hour': 23, 'minute': 59, 'second': 59}
+DEFAULT_VALUES = {'year': '*', 'month': 1, 'day': 1, 'week': '*',
+                  'day_of_week': '*', 'hour': 0, 'minute': 0, 'second': 0}
+
+
+class BaseField(object):
+    REAL = True
+    COMPILERS = [AllExpression, RangeExpression]
+
+    def __init__(self, name, exprs, is_default=False):
+        self.name = name
+        self.is_default = is_default
+        self.compile_expressions(exprs)
+
+    def get_min(self, dateval):
+        return MIN_VALUES[self.name]
+
+    def get_max(self, dateval):
+        return MAX_VALUES[self.name]
+
+    def get_value(self, dateval):
+        return getattr(dateval, self.name)
+
+    def get_next_value(self, dateval):
+        smallest = None
+        for expr in self.expressions:
+            value = expr.get_next_value(dateval, self)
+            if smallest is None or (value is not None and value < smallest):
+                smallest = value
+
+        return smallest
+
+    def compile_expressions(self, exprs):
+        self.expressions = []
+
+        # Split a comma-separated expression list, if any
+        exprs = str(exprs).strip()
+        if ',' in exprs:
+            for expr in exprs.split(','):
+                self.compile_expression(expr)
+        else:
+            self.compile_expression(exprs)
+
+    def compile_expression(self, expr):
+        for compiler in self.COMPILERS:
+            match = compiler.value_re.match(expr)
+            if match:
+                compiled_expr = compiler(**match.groupdict())
+                self.expressions.append(compiled_expr)
+                return
+
+        raise ValueError('Unrecognized expression "%s" for field "%s"' %
+                         (expr, self.name))
+
+    def __str__(self):
+        expr_strings = (str(e) for e in self.expressions)
+        return ','.join(expr_strings)
+
+    def __repr__(self):
+        return "%s('%s', '%s')" % (self.__class__.__name__, self.name,
+                                   str(self))
+
+
+class WeekField(BaseField):
+    REAL = False
+
+    def get_value(self, dateval):
+        return dateval.isocalendar()[1]
+
+
+class DayOfMonthField(BaseField):
+    COMPILERS = BaseField.COMPILERS + [WeekdayPositionExpression,
+                                       LastDayOfMonthExpression]
+
+    def get_max(self, dateval):
+        return monthrange(dateval.year, dateval.month)[1]
+
+
+class DayOfWeekField(BaseField):
+    REAL = False
+    COMPILERS = BaseField.COMPILERS + [WeekdayRangeExpression]
+
+    def get_value(self, dateval):
+        return dateval.weekday()

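For reference, a sketch showing a field compiling one of the special
day-of-month expressions and evaluating it against a date:

    from datetime import datetime
    from apscheduler.triggers.cron.fields import DayOfMonthField

    # 'last' compiles to LastDayOfMonthExpression
    field = DayOfMonthField('day', 'last')
    print(field.get_next_value(datetime(2014, 2, 10)))  # 28
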
http://git-wip-us.apache.org/repos/asf/ambari/blob/9e529edd/ambari-agent/src/main/python/ambari_agent/apscheduler/triggers/interval.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/apscheduler/triggers/interval.py b/ambari-agent/src/main/python/ambari_agent/apscheduler/triggers/interval.py
new file mode 100644
index 0000000..dd16d77
--- /dev/null
+++ b/ambari-agent/src/main/python/ambari_agent/apscheduler/triggers/interval.py
@@ -0,0 +1,39 @@
+from datetime import datetime, timedelta
+from math import ceil
+
+from apscheduler.util import convert_to_datetime, timedelta_seconds
+
+
+class IntervalTrigger(object):
+    def __init__(self, interval, start_date=None):
+        if not isinstance(interval, timedelta):
+            raise TypeError('interval must be a timedelta')
+
+        self.interval = interval
+        self.interval_length = timedelta_seconds(self.interval)
+        if self.interval_length == 0:
+            self.interval = timedelta(seconds=1)
+            self.interval_length = 1
+
+        if start_date is None:
+            self.start_date = datetime.now() + self.interval
+        else:
+            self.start_date = convert_to_datetime(start_date)
+
+    def get_next_fire_time(self, start_date):
+        if start_date < self.start_date:
+            return self.start_date
+
+        timediff_seconds = timedelta_seconds(start_date - self.start_date)
+        next_interval_num = int(ceil(timediff_seconds / self.interval_length))
+        return self.start_date + self.interval * next_interval_num
+
+    def __str__(self):
+        return 'interval[%s]' % str(self.interval)
+
+    def __repr__(self):
+        return "<%s (interval=%s, start_date=%s)>" % (
+            self.__class__.__name__, repr(self.interval),
+            repr(self.start_date))
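
For reference, a sketch of the interval arithmetic in get_next_fire_time():
fire times land on exact multiples of the interval past the start date,
assuming timedelta_seconds() returns fractional seconds:

    from datetime import datetime, timedelta
    from apscheduler.triggers.interval import IntervalTrigger

    trigger = IntervalTrigger(timedelta(minutes=5),
                              start_date=datetime(2014, 8, 18, 12, 0, 0))
    print(trigger.get_next_fire_time(datetime(2014, 8, 18, 12, 7, 0)))
    # -> 2014-08-18 12:10:00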