You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ao...@apache.org on 2017/04/01 07:08:00 UTC

[16/17] ambari git commit: AMBARI-20645. Integrate coilmq stomp server as a mock server for ambari-agent unittests. (aonishuk)

http://git-wip-us.apache.org/repos/asf/ambari/blob/c7612bcf/ambari-common/src/test/python/coilmq/start.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/test/python/coilmq/start.py b/ambari-common/src/test/python/coilmq/start.py
new file mode 100644
index 0000000..f894bab
--- /dev/null
+++ b/ambari-common/src/test/python/coilmq/start.py
@@ -0,0 +1,226 @@
+#!python
+"""
+Entrypoint for starting the application.
+"""
+import os
+import logging
+
+
+import time
+import threading
+from contextlib import contextmanager
+
+is_nt = os.name == 'nt'
+
+if not is_nt:
+    import daemon as pydaemon
+    import pid
+else:
+    pydaemon = pid = None
+
+import click
+
+from coilmq.config import config as global_config, init_config, init_logging, resolve_name
+from coilmq.protocol import STOMP11
+from coilmq.topic import TopicManager
+from coilmq.queue import QueueManager
+from coilmq.server.socket_server import ThreadedStompServer
+
+__authors__ = ['"Hans Lellelid" <ha...@xmpl.org>']
+__copyright__ = "Copyright 2009 Hans Lellelid"
+__license__ = """Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+ 
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License."""
+
+logger = logging.getLogger(__name__)
+
+
+def server_from_config(config=None, server_class=None, additional_kwargs=None):
+    """
+    Gets a configured L{coilmq.server.StompServer} from specified config.
+
+    If `config` is None, global L{coilmq.config.config} var will be used instead.
+
+    The `server_class` and `additional_kwargs` are primarily hooks for using this method
+    from a testing environment.
+
+    @param config: A C{ConfigParser.ConfigParser} instance with loaded config values.
+    @type config: C{ConfigParser.ConfigParser}
+
+    @param server_class: Which class to use for the server.  (This doesn't come from config currently.)
+    @type server_class: C{class}
+
+    @param additional_kwargs: Any additional args that should be passed to class.
+    @type additional_kwargs: C{list}
+
+    @return: The configured StompServer.
+    @rtype: L{coilmq.server.StompServer}
+    """
+    global global_config
+    if not config:
+        config = global_config
+
+    queue_store_factory = resolve_name(config.get('coilmq', 'qstore.factory'))
+    subscriber_scheduler_factory = resolve_name(config.get(
+            'coilmq', 'scheduler.subscriber_priority_factory'))
+    queue_scheduler_factory = resolve_name(config.get(
+            'coilmq', 'scheduler.queue_priority_factory'))
+
+    if config.has_option('coilmq', 'auth.factory'):
+        authenticator_factory = resolve_name(
+                config.get('coilmq', 'auth.factory'))
+        authenticator = authenticator_factory()
+    else:
+        authenticator = None
+
+    server = ThreadedStompServer((config.get('coilmq', 'listen_addr'), config.getint('coilmq', 'listen_port')),
+                                 queue_manager=QueueManager(store=queue_store_factory(),
+                                                            subscriber_scheduler=subscriber_scheduler_factory(),
+                                                            queue_scheduler=queue_scheduler_factory()),
+                                 topic_manager=TopicManager(),
+                                 authenticator=authenticator,
+                                 protocol=STOMP11)
+    logger.info("Created server:%r" % server)
+    return server
+
+
+def context_serve(context, configfile, listen_addr, listen_port, logfile,
+                  debug, daemon, uid, gid, pidfile, umask, rundir):
+    """
+    Takes a context object, which implements the __enter__/__exit__ "with" interface 
+    and starts a server within that context.
+
+    This method is a refactored single-place for handling the server-run code whether
+    running in daemon or non-daemon mode.  It is invoked with a dummy (passthrough) 
+    context object for the non-daemon use case. 
+
+    @param options: The compiled collection of options that need to be parsed. 
+    @type options: C{ConfigParser}
+
+    @param context: The context object that implements __enter__/__exit__ "with" methods.
+    @type context: C{object}
+
+    @raise Exception: Any underlying exception will be logged but then re-raised.
+    @see: server_from_config()
+    """
+    global global_config
+
+    server = None
+    try:
+        with context:
+            # There's a possibility here that init_logging() will throw an exception.  If it does,
+            # AND we're in a daemon context, then we're not going to be able to do anything with it.
+            # We've got no stderr/stdout here; and so (to my knowledge) no reliable (& cross-platform),
+            # way to display errors.
+            level = logging.DEBUG if debug else logging.INFO
+            init_logging(logfile=logfile, loglevel=level,
+                         configfile=configfile)
+
+            server = server_from_config()
+            logger.info("Stomp server listening on %s:%s" % server.server_address)
+
+            if debug:
+                poll_interval = float(global_config.get(
+                        'coilmq', 'debug.stats_poll_interval'))
+                if poll_interval:  # Setting poll_interval to 0 effectively disables it.
+                    def diagnostic_loop(server):
+                        log = logger
+                        while True:
+                            log.debug(
+                                    "Stats heartbeat -------------------------------")
+                            store = server.queue_manager.store
+                            for dest in store.destinations():
+                                log.debug("Queue %s: size=%s, subscribers=%s" % (
+                                    dest, store.size(dest), server.queue_manager.subscriber_count(dest)))
+
+                            # TODO: Add number of subscribers?
+
+                            time.sleep(poll_interval)
+
+                    diagnostic_thread = threading.Thread(
+                            target=diagnostic_loop, name='DiagnosticThread', args=(server,))
+                    diagnostic_thread.daemon = True
+                    diagnostic_thread.start()
+
+            server.serve_forever()
+
+    except (KeyboardInterrupt, SystemExit):
+        logger.info("Stomp server stopped by user interrupt.")
+        raise SystemExit()
+    except Exception as e:
+        logger.error("Stomp server stopped due to error: %s" % e)
+        logger.exception(e)
+        raise SystemExit()
+    finally:
+        if server:
+            server.server_close()
+
+
+def _main(config=None, host=None, port=None, logfile=None, debug=None,
+          daemon=None, uid=None, gid=None, pidfile=None, umask=None, rundir=None):
+
+    # Note that we must initialize the configuration before we enter the context
+    # block; however, we _cannot_ initialize logging until we are in the context block
+    # (so we defer that until the context_serve call.)
+    init_config(config)
+
+    if host is not None:
+        global_config.set('coilmq', 'listen_addr', host)
+
+    if port is not None:
+        global_config.set('coilmq', 'listen_port', str(port))
+
+        if daemon and is_nt:
+            warnings.warn("Daemon context is not available for NT platform")
+
+    # in an on-daemon mode, we use a dummy context objectx
+    # so we can use the same run-server code as the daemon version.
+    context = pydaemon.DaemonContext(uid=uid,
+                                     gid=gid,
+                                     pidfile=pid.PidFile(pidname=pidfile) if pidfile else None,
+                                     umask=int(umask, 8),
+                                     working_directory=rundir) if daemon and pydaemon else contextmanager(lambda: (yield))()
+
+    context_serve(context, config, host, port, logfile, debug, daemon, uid, gid, pidfile, umask, rundir)
+
+
+@click.command()
+@click.option("-c", "--config", help="Read configuration from FILE. (CLI options override config file.)", metavar="FILE")
+@click.option("-b", "--host", help="Listen on specified address (default 127.0.0.1)", metavar="ADDR")
+@click.option("-p", "--port", help="Listen on specified port (default 61613)", type=int, metavar="PORT")
+@click.option("-l", "--logfile", help="Log to specified file (unless logging configured in config file).", metavar="FILE")
+@click.option("--debug", default=False, help="Sets logging to debug (unless logging configured in config file).")
+@click.option("-d", "--daemon", default=False, help="Run server as a daemon (default False).")
+@click.option("-u", "--uid", help="The user/UID to use for daemon process.", metavar="UID")
+@click.option("-g", "--gid", help="The group/GID to use for daemon process.", metavar="GID")
+@click.option("--pidfile",   help="The PID file to use.", metavar="FILE")
+@click.option("--umask", help="Umask (octal) to apply for daemonized process.", metavar="MASK")
+@click.option('--rundir', help="The working directory to use for the daemonized process (default /).", metavar="DIR")
+def main(config, host, port, logfile, debug, daemon, uid, gid, pidfile, umask, rundir):
+    """
+    Main entry point for running a socket server from the commandline.
+
+    This method will read in options from the commandline and call the L{config.init_config} method
+    to get everything setup.  Then, depending on whether deamon mode was specified or not, 
+    the process may be forked (or not) and the server will be started.
+    """
+
+    _main(**locals())
+
+
+if __name__ == '__main__':
+    try:
+        main()
+    except (KeyboardInterrupt, SystemExit):
+        pass
+    except Exception as e:
+        logger.error("Server terminated due to error: %s" % e)
+        logger.exception(e)

http://git-wip-us.apache.org/repos/asf/ambari/blob/c7612bcf/ambari-common/src/test/python/coilmq/store/__init__.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/test/python/coilmq/store/__init__.py b/ambari-common/src/test/python/coilmq/store/__init__.py
new file mode 100644
index 0000000..a0daf9d
--- /dev/null
+++ b/ambari-common/src/test/python/coilmq/store/__init__.py
@@ -0,0 +1,189 @@
+"""
+Storage containers for durable queues and (planned) durable topics.
+"""
+import abc
+import logging
+import threading
+
+from coilmq.util.concurrency import synchronized
+
+__authors__ = ['"Hans Lellelid" <ha...@xmpl.org>']
+__copyright__ = "Copyright 2009 Hans Lellelid"
+__license__ = """Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+ 
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License."""
+
+lock = threading.RLock()
+
+
+class QueueStore(object):
+    """
+    Abstract base class for queue storage. 
+
+    Extensions/implementations of this class must be thread-safe.
+
+    @ivar log: A logger for this class.
+    @type log: C{logging.Logger}
+    """
+    __metaclass__ = abc.ABCMeta
+
+    def __init__(self):
+        """
+        A base constructor that sets up logging.
+
+        If you extend this class, you should either call this method or at minimum make sure these values
+        get set.
+        """
+        self.log = logging.getLogger('%s.%s' % (
+            self.__module__, self.__class__.__name__))
+
+    @abc.abstractmethod
+    @synchronized(lock)
+    def enqueue(self, destination, frame):
+        """
+        Store message (frame) for specified destinationination.
+
+        @param destination: The destinationination queue name for this message (frame).
+        @type destination: C{str}
+
+        @param frame: The message (frame) to send to specified destinationination.
+        @type frame: C{stompclient.frame.Frame}
+        """
+
+    @abc.abstractmethod
+    @synchronized(lock)
+    def dequeue(self, destination):
+        """
+        Removes and returns an item from the queue (or C{None} if no items in queue).
+
+        @param destination: The queue name (destinationination).
+        @type destination: C{str}
+
+        @return: The first frame in the specified queue, or C{None} if there are none.
+        @rtype: C{stompclient.frame.Frame} 
+        """
+
+    @synchronized(lock)
+    def requeue(self, destination, frame):
+        """
+        Requeue a message (frame) for storing at specified destinationination.
+
+        @param destination: The destinationination queue name for this message (frame).
+        @type destination: C{str}
+
+        @param frame: The message (frame) to send to specified destinationination.
+        @type frame: C{stompclient.frame.Frame}
+        """
+        self.enqueue(destination, frame)
+
+    @synchronized(lock)
+    def size(self, destination):
+        """
+        Size of the queue for specified destination.
+
+        @param destination: The queue destination (e.g. /queue/foo)
+        @type destination: C{str}
+
+        @return: The number of frames in specified queue.
+        @rtype: C{int}
+        """
+        raise NotImplementedError()
+
+    @synchronized(lock)
+    def has_frames(self, destination):
+        """
+        Whether specified destination has any frames.
+
+        Default implementation uses L{QueueStore.size} to determine if there
+        are any frames in queue.  Subclasses may choose to optimize this.
+
+        @param destination: The queue destination (e.g. /queue/foo)
+        @type destination: C{str}
+
+        @return: The number of frames in specified queue.
+        @rtype: C{int}
+        """
+        return self.size(destination) > 0
+
+    @synchronized(lock)
+    def destinations(self):
+        """
+        Provides a set of destinations (queue "addresses") available.
+
+        @return: A list of the detinations available.
+        @rtype: C{set}
+        """
+        raise NotImplementedError
+
+    @synchronized(lock)
+    def close(self):
+        """
+        May be implemented to perform any necessary cleanup operations when store is closed.
+        """
+        pass
+
+    # This is intentionally not synchronized, since it does not directly
+    # expose any shared data.
+    def frames(self, destination):
+        """
+        Returns an iterator for frames in specified queue.
+
+        The iterator simply wraps calls to L{dequeue} method, so the order of the 
+        frames from the iterator will be the reverse of the order in which the
+        frames were enqueued.
+
+        @param destination: The queue destination (e.g. /queue/foo)
+        @type destination: C{str}
+        """
+        return QueueFrameIterator(self, destination)
+
+
+class QueueFrameIterator(object):
+    """
+    Provides an C{iterable} over the frames for a specified destination in a queue.
+
+    @ivar store: The queue store.
+    @type store: L{coilmq.store.QueueStore}
+
+    @ivar destination: The destination for this iterator.
+    @type destination: C{str}
+    """
+
+    def __init__(self, store, destination):
+        self.store = store
+        self.destination = destination
+
+    def __iter__(self):
+        return self
+
+    def next(self):
+        return self.__next__()
+
+    def __next__(self):
+        frame = self.store.dequeue(self.destination)
+        if not frame:
+            raise StopIteration()
+        return frame
+
+    def __len__(self):
+        return self.store.size(self.destination)
+
+
+class TopicStore(object):
+    """
+    Abstract base class for non-durable topic storage.
+    """
+
+
+class DurableTopicStore(TopicStore):
+    """
+    Abstract base class for durable topic storage.
+    """

http://git-wip-us.apache.org/repos/asf/ambari/blob/c7612bcf/ambari-common/src/test/python/coilmq/store/dbm.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/test/python/coilmq/store/dbm.py b/ambari-common/src/test/python/coilmq/store/dbm.py
new file mode 100644
index 0000000..ab4fb00
--- /dev/null
+++ b/ambari-common/src/test/python/coilmq/store/dbm.py
@@ -0,0 +1,262 @@
+"""
+Queue storage module that stores the queue information and frames in a DBM-style database.
+
+The current implementation uses the Python `shelve` module, which uses a DBM implementation
+under the hood (specifically the `anydbm` module, aka `dbm` in Python 3.x).
+
+Because of how the `shelve` module works (and how we're using it) and caveats in the Python 
+documentation this is likely a BAD storage module to use if you are expecting to traffic in
+large frames.
+"""
+import threading
+import logging
+import os
+import os.path
+import shelve
+from collections import deque
+from datetime import datetime, timedelta
+
+try:
+    from configparser import ConfigParser
+except ImportError:
+    from ConfigParser import ConfigParser
+
+
+from coilmq.store import QueueStore
+from coilmq.config import config
+from coilmq.exception import ConfigError
+from coilmq.util.concurrency import synchronized
+
+__authors__ = ['"Hans Lellelid" <ha...@xmpl.org>']
+__copyright__ = "Copyright 2009 Hans Lellelid"
+__license__ = """Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+ 
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License."""
+
+lock = threading.RLock()
+
+
+def make_dbm():
+    """
+    Creates a DBM queue store, pulling config values from the CoilMQ configuration.
+    """
+    try:
+        data_dir = config.get('coilmq', 'qstore.dbm.data_dir')
+        cp_ops = config.getint('coilmq', 'qstore.dbm.checkpoint_operations')
+        cp_timeout = config.getint('coilmq', 'qstore.dbm.checkpoint_timeout')
+    except ConfigParser.NoOptionError as e:
+        raise ConfigError('Missing configuration parameter: %s' % e)
+
+    if not os.path.exists(data_dir):
+        raise ConfigError('DBM directory does not exist: %s' % data_dir)
+    # FIXME: how do these get applied? Is OR appropriate?
+    if not os.access(data_dir, os.W_OK | os.R_OK):
+        raise ConfigError('Cannot read and write DBM directory: %s' % data_dir)
+
+    store = DbmQueue(data_dir, checkpoint_operations=cp_ops,
+                     checkpoint_timeout=cp_timeout)
+    return store
+
+
+class DbmQueue(QueueStore):
+    """
+    A QueueStore implementation that stores messages and queue information in DBM-style
+    database.
+
+    Several database files will be used to support this functionality: metadata about the
+    queues will be stored in its own database and each queue will also have its own
+    database file.
+
+    This classes uses a C{threading.RLock} to guard access to the memory store, since it
+    appears that at least some of the underlying implementations that anydbm uses are not
+    thread-safe
+
+    Due to some impedence mismatch between the types of data we need to store in queues
+    (specifically lists) and the types of data that are best stored in DBM databases
+    (specifically dicts), this class uses the `shelve` module to abstract away some
+    of the ugliness.  The consequence of this is that we only persist objects periodically
+    to the datastore, for performance reasons.  How periodic is determined by the 
+    `checkpoint_operations` and `checkpoint_timeout` instance variables (and params to 
+    L{__init__}).
+
+    @ivar data_dir: The directory where DBM files will be stored.
+    @type data_dir: C{str}
+
+    @ivar queue_metadata: A Shelf (DBM) database that tracks stats & delivered message ids 
+                            for all the queues.
+    @ivar queue_metadata: C{shelve.Shelf}
+
+    @ivar frame_store: A Shelf (DBM) database that contains frame contents indexed by message id.
+    @type frame_store: C{shelve.Shelf}
+
+    @ivar _opcount: Internal counter for keeping track of unpersisted operations.
+    @type _opcount: C{int}
+
+    @ivar checkpoint_operations: Number of operations between syncs.
+    @type checkpoint_operations: C{int}
+
+    @ivar checkpoint_timeout: Max time (in seconds) that can elapse between sync of cache.
+    @type checkpoint_timeout: C{float}
+    """
+
+    def __init__(self, data_dir, checkpoint_operations=100, checkpoint_timeout=30):
+        """
+        @param data_dir: The directory where DBM files will be stored.
+        @param data_dir: C{str}
+
+        @param checkpoint_operations: Number of operations between syncs.
+        @type checkpoint_operations: C{int}
+
+        @param checkpoint_timeout: Max time (in seconds) that can elapse between sync of cache.
+        @type checkpoint_timeout: C{float}
+        """
+        QueueStore.__init__(self)
+
+        self._opcount = 0
+        self._last_sync = datetime.now()
+
+        self.data_dir = data_dir
+        self.checkpoint_operations = checkpoint_operations
+        self.checkpoint_timeout = timedelta(seconds=checkpoint_timeout)
+
+        # Should this be in constructor?
+
+        # The queue metadata stores mutable (dict) objects.  For this reason we set
+        # writeback=True and rely on the sync() method to keep the cache & disk
+        # in-sync.
+        self.queue_metadata = shelve.open(os.path.join(
+            self.data_dir, 'metadata'), writeback=True)
+
+        # Since we do not need mutable objects on the frame stores (we don't modify them, we just
+        # put/get values), we do NOT use writeback=True here.  This should also conserve on memory
+        # usage, since apparently that can get hefty with the caching when
+        # writeback=True.
+        self.frame_store = shelve.open(os.path.join(
+            self.data_dir, 'frames'), writeback=False)
+
+    @synchronized(lock)
+    def enqueue(self, destination, frame):
+        """
+        Store message (frame) for specified destinationination.
+
+        @param destination: The destinationination queue name for this message (frame).
+        @type destination: C{str}
+
+        @param frame: The message (frame) to send to specified destinationination.
+        @type frame: C{stompclient.frame.Frame}
+        """
+        message_id = frame.headers.get('message-id')
+        if not message_id:
+            raise ValueError("Cannot queue a frame without message-id set.")
+
+        if not destination in self.queue_metadata:
+            self.log.info(
+                "Destination %s not in metadata; creating new entry and queue database." % destination)
+            self.queue_metadata[destination] = {
+                'frames': deque(), 'enqueued': 0, 'dequeued': 0, 'size': 0}
+
+        self.queue_metadata[destination]['frames'].appendleft(message_id)
+        self.queue_metadata[destination]['enqueued'] += 1
+
+        self.frame_store[message_id] = frame
+
+        self._opcount += 1
+        self._sync()
+
+    @synchronized(lock)
+    def dequeue(self, destination):
+        """
+        Removes and returns an item from the queue (or C{None} if no items in queue).
+
+        @param destination: The queue name (destinationination).
+        @type destination: C{str}
+
+        @return: The first frame in the specified queue, or C{None} if there are none.
+        @rtype: C{stompclient.frame.Frame} 
+        """
+        if not self.has_frames(destination):
+            return None
+
+        message_id = self.queue_metadata[destination]['frames'].pop()
+        self.queue_metadata[destination]['dequeued'] += 1
+
+        frame = self.frame_store[message_id]
+        del self.frame_store[message_id]
+
+        self._opcount += 1
+        self._sync()
+
+        return frame
+
+    @synchronized(lock)
+    def has_frames(self, destination):
+        """
+        Whether specified queue has any frames.
+
+        @param destination: The queue name (destinationination).
+        @type destination: C{str}
+
+        @return: Whether there are any frames in the specified queue.
+        @rtype: C{bool}
+        """
+        return (destination in self.queue_metadata) and bool(self.queue_metadata[destination]['frames'])
+
+    @synchronized(lock)
+    def size(self, destination):
+        """
+        Size of the queue for specified destination.
+
+        @param destination: The queue destination (e.g. /queue/foo)
+        @type destination: C{str}
+
+        @return: The number of frames in specified queue.
+        @rtype: C{int}
+        """
+        if not destination in self.queue_metadata:
+            return 0
+        else:
+            return len(self.queue_metadata[destination]['frames'])
+
+    @synchronized(lock)
+    def close(self):
+        """
+        Closes the databases, freeing any resources (and flushing any unsaved changes to disk).
+        """
+        self.queue_metadata.close()
+        self.frame_store.close()
+
+    @synchronized(lock)
+    def destinations(self):
+        """
+        Provides a list of destinations (queue "addresses") available.
+
+        @return: A list of the detinations available.
+        @rtype: C{set}
+        """
+        return set(self.queue_metadata.keys())
+
+    def _sync(self):
+        """
+        Synchronize the cached data with the underlyind database.
+
+        Uses an internal transaction counter and compares to the checkpoint_operations
+        and checkpoint_timeout paramters to determine whether to persist the memory store.
+
+        In this implementation, this method wraps calls to C{shelve.Shelf#sync}. 
+        """
+        if (self._opcount > self.checkpoint_operations or
+                datetime.now() > self._last_sync + self.checkpoint_timeout):
+            self.log.debug("Synchronizing queue metadata.")
+            self.queue_metadata.sync()
+            self._last_sync = datetime.now()
+            self._opcount = 0
+        else:
+            self.log.debug("NOT synchronizing queue metadata.")

http://git-wip-us.apache.org/repos/asf/ambari/blob/c7612bcf/ambari-common/src/test/python/coilmq/store/memory.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/test/python/coilmq/store/memory.py b/ambari-common/src/test/python/coilmq/store/memory.py
new file mode 100644
index 0000000..783967c
--- /dev/null
+++ b/ambari-common/src/test/python/coilmq/store/memory.py
@@ -0,0 +1,76 @@
+"""
+Queue storage module that uses thread-safe, in-memory data structures.  
+"""
+import threading
+from collections import defaultdict, deque
+
+from coilmq.store import QueueStore
+from coilmq.util.concurrency import synchronized
+
+__authors__ = ['"Hans Lellelid" <ha...@xmpl.org>']
+__copyright__ = "Copyright 2009 Hans Lellelid"
+__license__ = """Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+ 
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License."""
+
+lock = threading.RLock()
+
+
+class MemoryQueue(QueueStore):
+    """
+    A QueueStore implementation that stores messages in memory.
+
+    This classes uses a C{threading.RLock} to guard access to the memory store.  
+    The locks on this class are probably excessive given that the 
+    L{coilmq.queue.QueueManager} is already implementing coarse-grained locking 
+    on the methods that access this storage backend.  That said, we'll start
+    over-protective and refactor later it if proves unecessary. 
+    """
+
+    def __init__(self):
+        QueueStore.__init__(self)
+        self._messages = defaultdict(deque)
+
+    @synchronized(lock)
+    def enqueue(self, destination, frame):
+        self._messages[destination].appendleft(frame)
+
+    @synchronized(lock)
+    def dequeue(self, destination):
+        try:
+            return self._messages[destination].pop()
+        except IndexError:
+            return None
+
+    @synchronized(lock)
+    def size(self, destination):
+        """
+        Size of the queue for specified destination.
+
+        @param destination: The queue destination (e.g. /queue/foo)
+        @type destination: C{str}
+        """
+        return len(self._messages[destination])
+
+    @synchronized(lock)
+    def has_frames(self, destination):
+        """ Whether this queue has frames for the specified destination. """
+        return bool(self._messages[destination])
+
+    @synchronized(lock)
+    def destinations(self):
+        """
+        Provides a list of destinations (queue "addresses") available.
+
+        @return: A list of the detinations available.
+        @rtype: C{set}
+        """
+        return set(self._messages.keys())

http://git-wip-us.apache.org/repos/asf/ambari/blob/c7612bcf/ambari-common/src/test/python/coilmq/store/rds.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/test/python/coilmq/store/rds.py b/ambari-common/src/test/python/coilmq/store/rds.py
new file mode 100644
index 0000000..f21bee3
--- /dev/null
+++ b/ambari-common/src/test/python/coilmq/store/rds.py
@@ -0,0 +1,69 @@
+try:
+    import redis
+except ImportError:  # pragma: no cover
+    import sys; sys.exit('please, install redis-py package to use redis-store')
+import threading
+try:
+    import cPickle as pickle
+except ImportError:
+    import pickle
+
+from coilmq.store import QueueStore
+from coilmq.util.concurrency import synchronized
+from coilmq.config import config
+
+__authors__ = ('"Hans Lellelid" <ha...@xmpl.org>', '"Alexander Zhukov" <zh...@gmail.com>')
+__copyright__ = "Copyright 2009 Hans Lellelid"
+__license__ = """Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License."""
+
+lock = threading.RLock()
+
+
+def make_redis_store(cfg=None):
+    return RedisQueueStore(
+        redis_conn=redis.Redis(**dict((cfg or config).items('redis'))))
+
+
+class RedisQueueStore(QueueStore):
+    """Simple Queue with Redis Backend"""
+    def __init__(self, redis_conn=None):
+        """The default connection parameters are: host='localhost', port=6379, db=0"""
+        self.__db = redis_conn or redis.Redis()
+        # self.key = '{0}:{1}'.format(namespace, name)
+        super(RedisQueueStore, self).__init__()
+
+    @synchronized(lock)
+    def enqueue(self, destination, frame):
+        self.__db.rpush(destination, pickle.dumps(frame))
+
+    @synchronized(lock)
+    def dequeue(self, destination):
+        item = self.__db.lpop(destination)
+        if item:
+            return pickle.loads(item)
+
+    @synchronized(lock)
+    def requeue(self, destination, frame):
+        self.enqueue(destination, frame)
+
+    @synchronized(lock)
+    def size(self, destination):
+        return self.__db.llen(destination)
+
+    @synchronized(lock)
+    def has_frames(self, destination):
+        return self.size(destination) > 0
+
+    @synchronized(lock)
+    def destinations(self):
+        return self.__db.keys()

http://git-wip-us.apache.org/repos/asf/ambari/blob/c7612bcf/ambari-common/src/test/python/coilmq/store/sa/__init__.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/test/python/coilmq/store/sa/__init__.py b/ambari-common/src/test/python/coilmq/store/sa/__init__.py
new file mode 100644
index 0000000..77c6ce9
--- /dev/null
+++ b/ambari-common/src/test/python/coilmq/store/sa/__init__.py
@@ -0,0 +1,205 @@
+"""
+Queue storage module that uses SQLAlchemy to access queue information and frames in a database.
+
+
+"""
+import threading
+import logging
+import os
+import os.path
+import shelve
+from collections import deque
+from datetime import datetime, timedelta
+
+# try:
+#     from configparser import ConfigParser
+# except ImportError:
+#     from ConfigParser import ConfigParser
+
+from sqlalchemy import engine_from_config, MetaData
+from sqlalchemy.orm import scoped_session, sessionmaker
+from sqlalchemy.sql import select, func, distinct
+
+from coilmq.store import QueueStore
+from coilmq.config import config
+from coilmq.exception import ConfigError
+from coilmq.util.concurrency import synchronized
+from coilmq.store.sa import meta, model
+
+__authors__ = ['"Hans Lellelid" <ha...@xmpl.org>']
+__copyright__ = "Copyright 2009 Hans Lellelid"
+__license__ = """Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+ 
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License."""
+
+
+def make_sa():
+    """
+    Factory to creates a SQLAlchemy queue store, pulling config values from the CoilMQ configuration.
+    """
+    configuration = dict(config.items('coilmq'))
+    engine = engine_from_config(configuration, 'qstore.sqlalchemy.')
+    init_model(engine)
+    store = SAQueue()
+    return store
+
+
+def init_model(engine, create=True, drop=False):
+    """
+    Initializes the shared SQLAlchemy state in the L{coilmq.store.sa.model} module.
+
+    @param engine: The SQLAlchemy engine instance.
+    @type engine: C{sqlalchemy.Engine}
+
+    @param create: Whether to create the tables (if they do not exist).
+    @type create: C{bool}
+
+    @param drop: Whether to drop the tables (if they exist).
+    @type drop: C{bool}
+    """
+    meta.engine = engine
+    meta.metadata = MetaData(bind=meta.engine)
+    meta.Session = scoped_session(sessionmaker(bind=meta.engine))
+    model.setup_tables(create=create, drop=drop)
+
+
+class SAQueue(QueueStore):
+    """
+    A QueueStore implementation that stores messages in a database and uses SQLAlchemy to interface
+    with the database.
+
+    Note that this implementation does not actually use the ORM capabilities of SQLAlchemy, but simply
+    uses SQLAlchemy for the DB abstraction for SQL building and DDL (table creation).
+
+    This L{coilmq.store.sa.model.setup_tables} function is used to actually define (& create) the 
+    database tables.  This class also depends on the L{init_model} method have been called to 
+    define the L{coilmq.store.sa.model.Session} class-like callable (and the engine & metadata).
+
+    Finally, this class does not explicitly use shared data (db connections); a new Session is created
+    in each method.  The actual implementation is handled using SQLAlchemy scoped sessions, which provide
+    thread-local Session class-like callables. As a result of deferring that to the SA layer, we don't 
+    need to use synchronization locks to guard calls to the methods in this store implementation.
+    """
+
+    def enqueue(self, destination, frame):
+        """
+        Store message (frame) for specified destinationination.
+
+        @param destination: The destinationination queue name for this message (frame).
+        @type destination: C{str}
+
+        @param frame: The message (frame) to send to specified destinationination.
+        @type frame: C{stompclient.frame.Frame}
+        """
+        session = meta.Session()
+        message_id = frame.headers.get('message-id')
+        if not message_id:
+            raise ValueError("Cannot queue a frame without message-id set.")
+        ins = model.frames_table.insert().values(
+            message_id=message_id, destination=destination, frame=frame)
+        session.execute(ins)
+        session.commit()
+
+    def dequeue(self, destination):
+        """
+        Removes and returns an item from the queue (or C{None} if no items in queue).
+
+        @param destination: The queue name (destinationination).
+        @type destination: C{str}
+
+        @return: The first frame in the specified queue, or C{None} if there are none.
+        @rtype: C{stompclient.frame.Frame} 
+        """
+        session = meta.Session()
+
+        try:
+
+            selstmt = select(
+                [model.frames_table.c.message_id, model.frames_table.c.frame])
+            selstmt = selstmt.where(
+                model.frames_table.c.destination == destination)
+            selstmt = selstmt.order_by(
+                model.frames_table.c.queued, model.frames_table.c.sequence)
+
+            result = session.execute(selstmt)
+
+            first = result.fetchone()
+            if not first:
+                return None
+
+            delstmt = model.frames_table.delete().where(model.frames_table.c.message_id ==
+                                                        first[model.frames_table.c.message_id])
+            session.execute(delstmt)
+
+            frame = first[model.frames_table.c.frame]
+
+        except:
+            session.rollback()
+            raise
+        else:
+            session.commit()
+            return frame
+
+    def has_frames(self, destination):
+        """
+        Whether specified queue has any frames.
+
+        @param destination: The queue name (destinationination).
+        @type destination: C{str}
+
+        @return: Whether there are any frames in the specified queue.
+        @rtype: C{bool}
+        """
+        session = meta.Session()
+        sel = select([model.frames_table.c.message_id]).where(
+            model.frames_table.c.destination == destination)
+        result = session.execute(sel)
+
+        first = result.fetchone()
+        return first is not None
+
+    def size(self, destination):
+        """
+        Size of the queue for specified destination.
+
+        @param destination: The queue destination (e.g. /queue/foo)
+        @type destination: C{str}
+
+        @return: The number of frames in specified queue.
+        @rtype: C{int}
+        """
+        session = meta.Session()
+        sel = select([func.count(model.frames_table.c.message_id)]).where(
+            model.frames_table.c.destination == destination)
+        result = session.execute(sel)
+        first = result.fetchone()
+        if not first:
+            return 0
+        else:
+            return int(first[0])
+
+    def destinations(self):
+        """
+        Provides a list of destinations (queue "addresses") available.
+
+        @return: A list of the detinations available.
+        @rtype: C{set}
+        """
+        session = meta.Session()
+        sel = select([distinct(model.frames_table.c.destination)])
+        result = session.execute(sel)
+        return set([r[0] for r in result.fetchall()])
+
+    def close(self):
+        """
+        Closes the databases, freeing any resources (and flushing any unsaved changes to disk).
+        """
+        meta.Session.remove()

http://git-wip-us.apache.org/repos/asf/ambari/blob/c7612bcf/ambari-common/src/test/python/coilmq/store/sa/meta.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/test/python/coilmq/store/sa/meta.py b/ambari-common/src/test/python/coilmq/store/sa/meta.py
new file mode 100644
index 0000000..bf47ca9
--- /dev/null
+++ b/ambari-common/src/test/python/coilmq/store/sa/meta.py
@@ -0,0 +1,9 @@
+"""
+Module to hold shared SQLAlchemy state.
+
+These objects are set by the L{coilmq.store.sa.init_model} function.
+"""
+engine = None  # : The SA engine
+Session = None  # : The SA Session (or Session-like callable)
+# : The SA C{sqlalchemy.orm.MetaData} instance bound to the engine.
+metadata = None

http://git-wip-us.apache.org/repos/asf/ambari/blob/c7612bcf/ambari-common/src/test/python/coilmq/store/sa/model.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/test/python/coilmq/store/sa/model.py b/ambari-common/src/test/python/coilmq/store/sa/model.py
new file mode 100644
index 0000000..43e5009
--- /dev/null
+++ b/ambari-common/src/test/python/coilmq/store/sa/model.py
@@ -0,0 +1,53 @@
+"""
+Definition of the datamodel required for SA storage backend.
+"""
+
+from sqlalchemy import Table, Column, BigInteger, String, PickleType, DateTime
+from sqlalchemy.sql import func
+
+from coilmq.store.sa import meta
+
+__authors__ = ['"Hans Lellelid" <ha...@xmpl.org>']
+__copyright__ = "Copyright 2009 Hans Lellelid"
+__license__ = """Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+ 
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License."""
+
+frames_table = None  # : The C{sqlalchemy.Table} set by L{setup_tables}
+
+
+def setup_tables(create=True, drop=False):
+    """
+    Binds the model classes to registered metadata and engine and (potentially) 
+    creates the db tables.
+
+    This function expects that you have bound the L{meta.metadata} and L{meta.engine}.
+
+    @param create: Whether to create the tables (if they do not exist).
+    @type create: C{bool}
+
+    @param drop: Whether to drop the tables (if they exist).
+    @type drop: C{bool}
+    """
+    global frames_table
+    frames_table = Table('frames', meta.metadata,
+                         Column('message_id', String(255), primary_key=True),
+                         Column('sequence', BigInteger,
+                                primary_key=False, autoincrement=True),
+                         Column('destination', String(255), index=True),
+                         Column('frame', PickleType),
+                         Column('queued', DateTime, default=func.now()))
+
+    if drop:
+        meta.metadata.drop_all()
+
+    if drop or create:
+        meta.metadata.create_all()

http://git-wip-us.apache.org/repos/asf/ambari/blob/c7612bcf/ambari-common/src/test/python/coilmq/topic.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/test/python/coilmq/topic.py b/ambari-common/src/test/python/coilmq/topic.py
new file mode 100644
index 0000000..d6436be
--- /dev/null
+++ b/ambari-common/src/test/python/coilmq/topic.py
@@ -0,0 +1,144 @@
+"""
+Non-durable topic support functionality.
+
+This code is inspired by the design of the Ruby stompserver project, by 
+Patrick Hurley and Lionel Bouton.  See http://stompserver.rubyforge.org/
+"""
+import logging
+import threading
+import uuid
+from collections import defaultdict
+
+from coilmq.util.concurrency import synchronized
+
+__authors__ = ['"Hans Lellelid" <ha...@xmpl.org>']
+__copyright__ = "Copyright 2009 Hans Lellelid"
+__license__ = """Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+ 
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License."""
+
+lock = threading.RLock()
+
+
+class TopicManager(object):
+    """
+    Class that manages distribution of messages to topic subscribers.
+
+    This class uses C{threading.RLock} to guard the public methods.  This is probably
+    a bit excessive, given 1) the actomic nature of basic C{dict} read/write operations 
+    and  2) the fact that most of the internal data structures are keying off of the 
+    STOMP connection, which is going to be thread-isolated.  That said, this seems like 
+    the technically correct approach and should increase the chance of this code being
+    portable to non-GIL systems.
+
+    @ivar _topics: A dict of registered topics, keyed by destination.
+    @type _topics: C{dict} of C{str} to C{set} of L{coilmq.server.StompConnection}
+    """
+
+    def __init__(self):
+        self.log = logging.getLogger(
+            '%s.%s' % (__name__, self.__class__.__name__))
+
+        # Lock var is required for L{synchornized} decorator.
+        self._lock = threading.RLock()
+
+        self._topics = defaultdict(set)
+
+        # TODO: If we want durable topics, we'll need a store for topics.
+
+    @synchronized(lock)
+    def close(self):
+        """
+        Closes all resources associated with this topic manager.
+
+        (Currently this is simply here for API conformity w/ L{coilmq.queue.QueueManager}.)
+        """
+        self.log.info("Shutting down topic manager.")  # pragma: no cover
+
+    @synchronized(lock)
+    def subscribe(self, connection, destination):
+        """
+        Subscribes a connection to the specified topic destination. 
+
+        @param connection: The client connection to subscribe.
+        @type connection: L{coilmq.server.StompConnection}
+
+        @param destination: The topic destination (e.g. '/topic/foo')
+        @type destination: C{str} 
+        """
+        self.log.debug("Subscribing %s to %s" % (connection, destination))
+        self._topics[destination].add(connection)
+
+    @synchronized(lock)
+    def unsubscribe(self, connection, destination):
+        """
+        Unsubscribes a connection from the specified topic destination. 
+
+        @param connection: The client connection to unsubscribe.
+        @type connection: L{coilmq.server.StompConnection}
+
+        @param destination: The topic destination (e.g. '/topic/foo')
+        @type destination: C{str} 
+        """
+        self.log.debug("Unsubscribing %s from %s" % (connection, destination))
+        if connection in self._topics[destination]:
+            self._topics[destination].remove(connection)
+
+        if not self._topics[destination]:
+            del self._topics[destination]
+
+    @synchronized(lock)
+    def disconnect(self, connection):
+        """
+        Removes a subscriber connection.
+
+        @param connection: The client connection to unsubscribe.
+        @type connection: L{coilmq.server.StompConnection}
+        """
+        self.log.debug("Disconnecting %s" % connection)
+        for dest in list(self._topics.keys()):
+            if connection in self._topics[dest]:
+                self._topics[dest].remove(connection)
+            if not self._topics[dest]:
+                # This won't trigger RuntimeError, since we're using keys()
+                del self._topics[dest]
+
+    @synchronized(lock)
+    def send(self, message):
+        """
+        Sends a message to all subscribers of destination.
+
+        @param message: The message frame.  (The frame will be modified to set command 
+                            to MESSAGE and set a message id.)
+        @type message: L{stompclient.frame.Frame}
+        """
+        dest = message.headers.get('destination')
+        if not dest:
+            raise ValueError(
+                "Cannot send frame with no destination: %s" % message)
+
+        message.cmd = 'message'
+
+        message.headers.setdefault('message-id', str(uuid.uuid4()))
+
+        bad_subscribers = set()
+        for subscriber in self._topics[dest]:
+            try:
+                subscriber.send_frame(message)
+            except:
+                self.log.exception(
+                    "Error delivering message to subscriber %s; client will be disconnected." % subscriber)
+                # We queue for deletion so we are not modifying the topics dict
+                # while iterating over it.
+                bad_subscribers.add(subscriber)
+
+        for subscriber in bad_subscribers:
+            self.disconnect(subscriber)

http://git-wip-us.apache.org/repos/asf/ambari/blob/c7612bcf/ambari-common/src/test/python/coilmq/util/__init__.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/test/python/coilmq/util/__init__.py b/ambari-common/src/test/python/coilmq/util/__init__.py
new file mode 100644
index 0000000..ae25f74
--- /dev/null
+++ b/ambari-common/src/test/python/coilmq/util/__init__.py
@@ -0,0 +1,16 @@
+"""
+CoilMQ utility modules.
+"""
+__authors__ = ['"Hans Lellelid" <ha...@xmpl.org>']
+__copyright__ = "Copyright 2009 Hans Lellelid"
+__license__ = """Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+ 
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License."""

http://git-wip-us.apache.org/repos/asf/ambari/blob/c7612bcf/ambari-common/src/test/python/coilmq/util/concurrency.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/test/python/coilmq/util/concurrency.py b/ambari-common/src/test/python/coilmq/util/concurrency.py
new file mode 100644
index 0000000..3eb539a
--- /dev/null
+++ b/ambari-common/src/test/python/coilmq/util/concurrency.py
@@ -0,0 +1,96 @@
+"""
+Tools to facilitate developing thread-safe components.
+"""
+
+import abc
+import threading
+import functools
+
+__authors__ = ['"Hans Lellelid" <ha...@xmpl.org>']
+__copyright__ = "Copyright 2009 Hans Lellelid"
+__license__ = """Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License."""
+
+
+def synchronized(lock):
+    def synchronize(func):
+        """
+        Decorator to lock and unlock a method (Phillip J. Eby).
+
+        This function is to be used with object instance methods; the object must
+        have a _lock variable (of type C{threading.Lock} or C{threading.RLock}).
+
+        @param func: Method to decorate
+        @type func: C{callable}
+        """
+        @functools.wraps(func)
+        def wrapper(*args, **kwargs):
+            with lock:
+                return func(*args, **kwargs)
+        return wrapper
+    return synchronize
+
+
+class CoilTimerBase(object):
+
+    __metaclass__ = abc.ABCMeta
+
+    def __init__(self):
+        self.jobs = []
+
+    def schedule(self, period, callback):
+        self.jobs.append((period, callback))
+
+    @abc.abstractmethod
+    def run(self):
+        raise NotImplementedError
+
+    @abc.abstractmethod
+    def start(self):
+        raise NotImplementedError
+
+    @abc.abstractmethod
+    def stop(self):
+        raise NotImplementedError
+
+    def __enter__(self):
+        self.start()
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        self.stop()
+
+
+# TODO: check against following notes
+# <http://stackoverflow.com/questions/2124540/how-does-timer-in-python-work-regarding-mutlithreading>
+# <http://stackoverflow.com/questions/12435211/python-threading-timer-repeat-function-every-n-seconds>
+class CoilThreadingTimer(CoilTimerBase):
+
+    def __init__(self, *args, **kwargs):
+        super(CoilThreadingTimer, self).__init__(*args, **kwargs)
+        self._running = False
+
+    def run(self):
+        def run_job(interval, callback):
+            if self._running:
+                threading.Timer(interval, run_job, args=(interval, callback)).start()
+                callback()
+        for period, job in self.jobs:
+            run_job(period, job)
+
+    def start(self):
+        self._running = True
+        self.run()
+
+    def stop(self):
+        self._running = False
+
+

http://git-wip-us.apache.org/repos/asf/ambari/blob/c7612bcf/ambari-common/src/test/python/coilmq/util/frames.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/test/python/coilmq/util/frames.py b/ambari-common/src/test/python/coilmq/util/frames.py
new file mode 100644
index 0000000..3026ee0
--- /dev/null
+++ b/ambari-common/src/test/python/coilmq/util/frames.py
@@ -0,0 +1,359 @@
+from functools import partial
+import re
+import logging
+from collections import OrderedDict
+import io
+
+import six
+
+SEND = 'SEND'
+CONNECT = 'CONNECT'
+MESSAGE = 'MESSAGE'
+ERROR = 'ERROR'
+CONNECTED = 'CONNECTED'
+SUBSCRIBE = 'SUBSCRIBE'
+UNSUBSCRIBE = 'UNSUBSCRIBE'
+BEGIN = 'BEGIN'
+COMMIT = 'COMMIT'
+ABORT = 'ABORT'
+ACK = 'ACK'
+NACK = 'NACK'
+DISCONNECT = 'DISCONNECT'
+
+VALID_COMMANDS = ['message', 'connect', 'connected', 'error', 'send',
+                  'subscribe', 'unsubscribe', 'begin', 'commit', 'abort', 'ack', 'disconnect', 'nack']
+
+TEXT_PLAIN = 'text/plain'
+
+
+class IncompleteFrame(Exception):
+    """The frame has incomplete body"""
+
+
+class BodyNotTerminated(Exception):
+    """The frame's body is not terminated with the NULL character"""
+
+
+class EmptyBuffer(Exception):
+    """The buffer is empty"""
+
+
+def parse_headers(buff):
+    """
+    Parses buffer and returns command and headers as strings
+    """
+    preamble_lines = list(map(
+        lambda x: six.u(x).decode(),
+        iter(lambda: buff.readline().strip(), b''))
+    )
+    if not preamble_lines:
+        raise EmptyBuffer()
+    return preamble_lines[0], OrderedDict([l.split(':') for l in preamble_lines[1:]])
+
+
+def parse_body(buff, headers):
+    content_length = int(headers.get('content-length', -1))
+    body = buff.read(content_length)
+    if content_length >= 0:
+        if len(body) < content_length:
+            raise IncompleteFrame()
+        terminator = six.u(buff.read(1)).decode()
+        if not terminator:
+            raise BodyNotTerminated()
+    else:
+        # no content length
+        body, terminator, rest = body.partition(b'\x00')
+        if not terminator:
+            raise BodyNotTerminated()
+        else:
+            buff.seek(-len(rest), 2)
+
+    return body
+
+
+class Frame(object):
+    """
+    A STOMP frame (or message).
+
+    :param cmd: the protocol command
+    :param headers: a map of headers for the frame
+    :param body: the content of the frame.
+    """
+
+    def __init__(self, cmd, headers=None, body=None):
+        self.cmd = cmd
+        self.headers = headers or {}
+        self.body = body or ''
+
+    def __str__(self):
+        return '{{cmd={0},headers=[{1}],body={2}}}'.format(
+            self.cmd,
+            self.headers,
+            self.body if isinstance(
+                self.body, six.binary_type) else six.b(self.body)
+        )
+
+    def __eq__(self, other):
+        """ Override equality checking to test for matching command, headers, and body. """
+        return all([isinstance(other, Frame),
+                    self.cmd == other.cmd,
+                    self.headers == other.headers,
+                    self.body == other.body])
+
+    @property
+    def transaction(self):
+        return self.headers.get('transaction')
+
+    @classmethod
+    def from_buffer(cls, buff):
+        cmd, headers = parse_headers(buff)
+        body = parse_body(buff, headers)
+        return cls(cmd, headers=headers, body=body)
+
+    def pack(self):
+        """
+        Create a string representation from object state.
+
+        @return: The string (bytes) for this stomp frame.
+        @rtype: C{str}
+        """
+
+        self.headers.setdefault('content-length', len(self.body))
+
+        # Convert and append any existing headers to a string as the
+        # protocol describes.
+        headerparts = ("{0}:{1}\n".format(key, value)
+                       for key, value in self.headers.items())
+
+        # Frame is Command + Header + EOF marker.
+        return six.b("{0}\n{1}\n".format(self.cmd, "".join(headerparts))) + (self.body if isinstance(self.body, six.binary_type) else six.b(self.body)) + six.b('\x00')
+
+
+class ConnectedFrame(Frame):
+    """ A CONNECTED server frame (response to CONNECT).
+
+    @ivar session: The (throw-away) session ID to include in response.
+    @type session: C{str}
+    """
+
+    def __init__(self, session, extra_headers=None):
+        """
+        @param session: The (throw-away) session ID to include in response.
+        @type session: C{str}
+        """
+        super(ConnectedFrame, self).__init__(
+            cmd='connected', headers=extra_headers or {})
+        self.headers['session'] = session
+
+
+class HeaderValue(object):
+    """
+    An descriptor class that can be used when a calculated header value is needed.
+
+    This class is a descriptor, implementing  __get__ to return the calculated value.
+    While according to  U{http://docs.codehaus.org/display/STOMP/Character+Encoding} there
+    seems to some general idea about having UTF-8 as the character encoding for headers;
+    however the C{stomper} lib does not support this currently.
+
+    For example, to use this class to generate the content-length header:
+
+        >>> body = 'asdf'
+        >>> headers = {}
+        >>> headers['content-length'] = HeaderValue(calculator=lambda: len(body))
+        >>> str(headers['content-length'])
+        '4'
+
+    @ivar calc: The calculator function.
+    @type calc: C{callable}
+    """
+
+    def __init__(self, calculator):
+        """
+        @param calculator: The calculator callable that will yield the desired value.
+        @type calculator: C{callable}
+        """
+        if not callable(calculator):
+            raise ValueError("Non-callable param: %s" % calculator)
+        self.calc = calculator
+
+    def __get__(self, obj, objtype):
+        return self.calc()
+
+    def __str__(self):
+        return str(self.calc())
+
+    def __set__(self, obj, value):
+        self.calc = value
+
+    def __repr__(self):
+        return '<%s calculator=%s>' % (self.__class__.__name__, self.calc)
+
+
+class ErrorFrame(Frame):
+    """ An ERROR server frame. """
+
+    def __init__(self, message, body=None, extra_headers=None):
+        """
+        @param body: The message body bytes.
+        @type body: C{str}
+        """
+        super(ErrorFrame, self).__init__(cmd='error',
+                                         headers=extra_headers or {}, body=body)
+        self.headers['message'] = message
+        self.headers[
+            'content-length'] = HeaderValue(calculator=lambda: len(self.body))
+
+    def __repr__(self):
+        return '<%s message=%r>' % (self.__class__.__name__, self.headers['message'])
+
+
+class ReceiptFrame(Frame):
+    """ A RECEIPT server frame. """
+
+    def __init__(self, receipt, extra_headers=None):
+        """
+        @param receipt: The receipt message ID.
+        @type receipt: C{str}
+        """
+        super(ReceiptFrame, self).__init__(
+            'RECEIPT', headers=extra_headers or {})
+        self.headers['receipt-id'] = receipt
+
+
+class FrameBuffer(object):
+    """
+    A customized version of the StompBuffer class from Stomper project that returns frame objects
+    and supports iteration.
+
+    This version of the parser also assumes that stomp messages with no content-lengh
+    end in a simple \\x00 char, not \\x00\\n as is assumed by
+    C{stomper.stompbuffer.StompBuffer}. Additionally, this class differs from Stomper version
+    by conforming to PEP-8 coding style.
+
+    This class can be used to smooth over a transport that may provide partial frames (or
+    may provide multiple frames in one data buffer).
+
+    @ivar _buffer: The internal byte buffer.
+    @type _buffer: C{str}
+
+    @ivar debug: Log extra parsing debug (logs will be DEBUG level).
+    @type debug: C{bool}
+    """
+
+    # regexp to check that the buffer starts with a command.
+    command_re = re.compile('^(.+?)\n')
+
+    # regexp to remove everything up to and including the first
+    # instance of '\x00' (used in resynching the buffer).
+    sync_re = re.compile('^.*?\x00')
+
+    # regexp to determine the content length. The buffer should always start
+    # with a command followed by the headers, so the content-length header will
+    # always be preceded by a newline.  It may not always proceeded by a
+    # newline, though!
+    content_length_re = re.compile('\ncontent-length\s*:\s*(\d+)\s*(\n|$)')
+
+    def __init__(self):
+        self._buffer = io.BytesIO()
+        self._pointer = 0
+        self.debug = False
+        self.log = logging.getLogger('%s.%s' % (
+            self.__module__, self.__class__.__name__))
+
+    def clear(self):
+        """
+        Clears (empties) the internal buffer.
+        """
+        self._buffer = io
+
+    def buffer_len(self):
+        """
+        @return: Number of bytes in the internal buffer.
+        @rtype: C{int}
+        """
+        return len(self._buffer)
+
+    def buffer_empty(self):
+        """
+        @return: C{True} if buffer is empty, C{False} otherwise.
+        @rtype: C{bool}
+        """
+        return not bool(self._buffer)
+
+    def append(self, data):
+        """
+        Appends bytes to the internal buffer (may or may not contain full stomp frames).
+
+        @param data: The bytes to append.
+        @type data: C{str}
+        """
+        self._buffer.write(data)
+
+    def extract_frame(self):
+        """
+        Pulls one complete frame off the buffer and returns it.
+
+        If there is no complete message in the buffer, returns None.
+
+        Note that the buffer can contain more than once message. You
+        should therefore call this method in a loop (or use iterator
+        functionality exposed by class) until None returned.
+
+        @return: The next complete frame in the buffer.
+        @rtype: L{stomp.frame.Frame}
+        """
+        # (mbytes, hbytes) = self._find_message_bytes(self.buffer)
+        # if not mbytes:
+        #     return None
+        #
+        # msgdata = self.buffer[:mbytes]
+        # self.buffer = self.buffer[mbytes:]
+        # hdata = msgdata[:hbytes]
+        # # Strip off any leading whitespace from headers; this is necessary, because
+        # # we do not (any longer) expect a trailing \n after the \x00 byte (which means
+        # # it will become a leading \n to the next frame).
+        # hdata = hdata.lstrip()
+        # elems = hdata.split('\n')
+        # cmd = elems.pop(0)
+        # headers = {}
+        #
+        # for e in elems:
+        #     try:
+        #         (k,v) = e.split(':', 1) # header values may contain ':' so specify maxsplit
+        #     except ValueError:
+        #         continue
+        #     headers[k.strip()] = v.strip()
+        #
+        # # hbytes points to the start of the '\n\n' at the end of the header,
+        # # so 2 bytes beyond this is the start of the body. The body EXCLUDES
+        # # the final byte, which is  '\x00'.
+        # body = msgdata[hbytes + 2:-1]
+        self._buffer.seek(self._pointer, 0)
+        try:
+            f = Frame.from_buffer(self._buffer)
+            self._pointer = self._buffer.tell()
+        except (IncompleteFrame, EmptyBuffer):
+            self._buffer.seek(self._pointer, 0)
+            return None
+
+        return f
+
+    def __iter__(self):
+        """
+        Returns an iterator object.
+        """
+        return self
+
+    def __next__(self):
+        """
+        Return the next STOMP message in the buffer (supporting iteration).
+
+        @rtype: L{stomp.frame.Frame}
+        """
+        msg = self.extract_frame()
+        if not msg:
+            raise StopIteration()
+        return msg
+
+    def next(self):
+        return self.__next__()

http://git-wip-us.apache.org/repos/asf/ambari/blob/c7612bcf/ambari-common/src/test/python/coilmq/util/six.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/test/python/coilmq/util/six.py b/ambari-common/src/test/python/coilmq/util/six.py
new file mode 100644
index 0000000..0a381fd
--- /dev/null
+++ b/ambari-common/src/test/python/coilmq/util/six.py
@@ -0,0 +1,16 @@
+import sys
+
+PY3 = sys.version_info[0] == 3
+
+if PY3:
+  def b(s):
+    return s.encode("latin-1")
+  def u(s):
+    return s
+  binary_type = bytes
+else:
+  def b(s):
+    return s
+  def u(s):
+    return unicode(s.replace(r'\\', r'\\\\'), "unicode_escape")
+  binary_type = str
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/c7612bcf/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 6f9bfdd..f192895 100644
--- a/pom.xml
+++ b/pom.xml
@@ -297,6 +297,8 @@
 
             <!--Python Mock library (BSD license)-->
             <exclude>ambari-common/src/test/python/mock/**</exclude>
+            <!--Coilmq Mock library (Apache license)-->
+            <exclude>ambari-common/src/test/python/coilmq/**</exclude>
             <!--Jinja2 library (BSD license)-->
             <exclude>ambari-common/src/main/python/ambari_jinja2/**</exclude>
             <exclude>ambari-common/src/main/python/jinja2/**</exclude>