You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ao...@apache.org on 2017/04/01 07:08:00 UTC
[16/17] ambari git commit: AMBARI-20645. Integrate coilmq stomp
server as a mock server for ambari-agent unittests. (aonishuk)
http://git-wip-us.apache.org/repos/asf/ambari/blob/c7612bcf/ambari-common/src/test/python/coilmq/start.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/test/python/coilmq/start.py b/ambari-common/src/test/python/coilmq/start.py
new file mode 100644
index 0000000..f894bab
--- /dev/null
+++ b/ambari-common/src/test/python/coilmq/start.py
@@ -0,0 +1,226 @@
+#!python
+"""
+Entrypoint for starting the application.
+"""
+import os
+import logging
+
+
+import time
+import threading
+from contextlib import contextmanager
+
+is_nt = os.name == 'nt'
+
+if not is_nt:
+ import daemon as pydaemon
+ import pid
+else:
+ pydaemon = pid = None
+
+import click
+
+from coilmq.config import config as global_config, init_config, init_logging, resolve_name
+from coilmq.protocol import STOMP11
+from coilmq.topic import TopicManager
+from coilmq.queue import QueueManager
+from coilmq.server.socket_server import ThreadedStompServer
+
+__authors__ = ['"Hans Lellelid" <ha...@xmpl.org>']
+__copyright__ = "Copyright 2009 Hans Lellelid"
+__license__ = """Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License."""
+
+logger = logging.getLogger(__name__)
+
+
+def server_from_config(config=None, server_class=None, additional_kwargs=None):
+    """
+    Gets a configured L{coilmq.server.StompServer} from specified config.
+
+    If `config` is None, global L{coilmq.config.config} var will be used instead.
+
+    The `server_class` and `additional_kwargs` are primarily hooks for using this method
+    from a testing environment.
+
+    @param config: A C{ConfigParser.ConfigParser} instance with loaded config values.
+    @type config: C{ConfigParser.ConfigParser}
+
+    @param server_class: Which class to use for the server; defaults to
+        L{ThreadedStompServer}. (This doesn't come from config currently.)
+    @type server_class: C{class}
+
+    @param additional_kwargs: Extra keyword arguments passed to the server class.
+        They are applied last, so they override the values built from `config`.
+    @type additional_kwargs: C{dict}
+
+    @return: The configured StompServer.
+    @rtype: L{coilmq.server.StompServer}
+    """
+    if config is None:
+        config = global_config
+    if server_class is None:
+        server_class = ThreadedStompServer
+
+    queue_store_factory = resolve_name(config.get('coilmq', 'qstore.factory'))
+    subscriber_scheduler_factory = resolve_name(config.get(
+        'coilmq', 'scheduler.subscriber_priority_factory'))
+    queue_scheduler_factory = resolve_name(config.get(
+        'coilmq', 'scheduler.queue_priority_factory'))
+
+    # Authentication is optional: without an 'auth.factory' option the server
+    # is created with authenticator=None.
+    if config.has_option('coilmq', 'auth.factory'):
+        authenticator_factory = resolve_name(
+            config.get('coilmq', 'auth.factory'))
+        authenticator = authenticator_factory()
+    else:
+        authenticator = None
+
+    server_kwargs = {
+        'queue_manager': QueueManager(store=queue_store_factory(),
+                                      subscriber_scheduler=subscriber_scheduler_factory(),
+                                      queue_scheduler=queue_scheduler_factory()),
+        'topic_manager': TopicManager(),
+        'authenticator': authenticator,
+        'protocol': STOMP11,
+    }
+    # Previously these two hooks were accepted but silently ignored.
+    if additional_kwargs:
+        server_kwargs.update(additional_kwargs)
+
+    listen_on = (config.get('coilmq', 'listen_addr'),
+                 config.getint('coilmq', 'listen_port'))
+    server = server_class(listen_on, **server_kwargs)
+    logger.info("Created server:%r", server)
+    return server
+
+
+def context_serve(context, configfile, listen_addr, listen_port, logfile,
+ debug, daemon, uid, gid, pidfile, umask, rundir):
+ """
+ Takes a context object, which implements the __enter__/__exit__ "with" interface
+ and starts a server within that context.
+
+ This method is a refactored single-place for handling the server-run code whether
+ running in daemon or non-daemon mode. It is invoked with a dummy (passthrough)
+ context object for the non-daemon use case.
+
+ Inside the context it initializes logging, builds the server from the global
+ configuration via L{server_from_config}, optionally starts a diagnostic thread
+ (debug mode only) and then blocks in C{serve_forever()}.
+
+ @param context: The context object that implements __enter__/__exit__ "with" methods.
+ @type context: C{object}
+
+ @param configfile: Passed to C{init_logging} as its C{configfile} argument.
+
+ @param logfile: Passed to C{init_logging} as its C{logfile} argument.
+
+ @param debug: When true, logging uses C{logging.DEBUG} (otherwise C{logging.INFO})
+ and, if the 'debug.stats_poll_interval' option is non-zero, a daemon
+ thread periodically logs the size and subscriber count of each queue.
+
+ @param listen_addr, listen_port, daemon, uid, gid, pidfile, umask, rundir:
+ Accepted for signature compatibility; not read by this function.
+
+ @raise SystemExit: Raised after logging when the server stops, whether because of
+ a KeyboardInterrupt/SystemExit or because of any other exception.
+ @see: server_from_config()
+ """
+ global global_config
+
+ server = None
+ try:
+ with context:
+ # There's a possibility here that init_logging() will throw an exception. If it does,
+ # AND we're in a daemon context, then we're not going to be able to do anything with it.
+ # We've got no stderr/stdout here; and so (to my knowledge) no reliable (& cross-platform),
+ # way to display errors.
+ level = logging.DEBUG if debug else logging.INFO
+ init_logging(logfile=logfile, loglevel=level,
+ configfile=configfile)
+
+ server = server_from_config()
+ logger.info("Stomp server listening on %s:%s" % server.server_address)
+
+ if debug:
+ poll_interval = float(global_config.get(
+ 'coilmq', 'debug.stats_poll_interval'))
+ if poll_interval: # Setting poll_interval to 0 effectively disables it.
+ def diagnostic_loop(server):
+ log = logger
+ while True:
+ log.debug(
+ "Stats heartbeat -------------------------------")
+ store = server.queue_manager.store
+ for dest in store.destinations():
+ log.debug("Queue %s: size=%s, subscribers=%s" % (
+ dest, store.size(dest), server.queue_manager.subscriber_count(dest)))
+
+ # TODO: Add number of subscribers?
+
+ time.sleep(poll_interval)
+
+ # Daemon thread: it never returns, so it must not keep the process alive.
+ diagnostic_thread = threading.Thread(
+ target=diagnostic_loop, name='DiagnosticThread', args=(server,))
+ diagnostic_thread.daemon = True
+ diagnostic_thread.start()
+
+ server.serve_forever()
+
+ except (KeyboardInterrupt, SystemExit):
+ logger.info("Stomp server stopped by user interrupt.")
+ raise SystemExit()
+ except Exception as e:
+ logger.error("Stomp server stopped due to error: %s" % e)
+ logger.exception(e)
+ raise SystemExit()
+ finally:
+ # server stays None if startup failed before server_from_config() returned.
+ if server:
+ server.server_close()
+
+
+def _main(config=None, host=None, port=None, logfile=None, debug=None,
+          daemon=None, uid=None, gid=None, pidfile=None, umask=None, rundir=None):
+    """
+    Initialize the configuration, build the run context and start the server.
+
+    @param config: Configuration file passed to C{init_config} (and on to L{context_serve}).
+    @param host: If not None, overrides the 'listen_addr' option of the global config.
+    @param port: If not None, overrides the 'listen_port' option of the global config.
+    @param logfile: Forwarded to L{context_serve}.
+    @param debug: Forwarded to L{context_serve}.
+    @param daemon: Whether to run inside a daemon context (ignored, with a warning,
+        on NT where the daemon modules are not imported).
+    @param uid: User for the daemon process.
+    @param gid: Group for the daemon process.
+    @param pidfile: Name of the PID file for the daemon process, if any.
+    @param umask: Octal umask string for the daemon process, or None to keep the
+        daemon library's default.
+    @param rundir: Working directory for the daemon process, or None to keep the
+        daemon library's default.
+    """
+    # Imported here because the module-level imports do not provide `warnings`;
+    # without it the NT warning below raised NameError.
+    import warnings
+
+    # Note that we must initialize the configuration before we enter the context
+    # block; however, we _cannot_ initialize logging until we are in the context block
+    # (so we defer that until the context_serve call.)
+    init_config(config)
+
+    if host is not None:
+        global_config.set('coilmq', 'listen_addr', host)
+
+    if port is not None:
+        global_config.set('coilmq', 'listen_port', str(port))
+
+    if daemon and is_nt:
+        warnings.warn("Daemon context is not available for NT platform")
+
+    if daemon and pydaemon:
+        daemon_kwargs = {
+            'uid': uid,
+            'gid': gid,
+            'pidfile': pid.PidFile(pidname=pidfile) if pidfile else None,
+        }
+        # Only pass umask / working directory when they were actually supplied:
+        # int(None, 8) raises TypeError, and forwarding None would replace the
+        # daemon context's own defaults.
+        if umask is not None:
+            daemon_kwargs['umask'] = int(umask, 8)
+        if rundir is not None:
+            daemon_kwargs['working_directory'] = rundir
+        context = pydaemon.DaemonContext(**daemon_kwargs)
+    else:
+        # In non-daemon mode, we use a dummy (passthrough) context object
+        # so we can use the same run-server code as the daemon version.
+        context = contextmanager(lambda: (yield))()
+
+    context_serve(context, config, host, port, logfile, debug, daemon, uid, gid, pidfile, umask, rundir)
+
+
+@click.command()
+@click.option("-c", "--config", help="Read configuration from FILE. (CLI options override config file.)", metavar="FILE")
+@click.option("-b", "--host", help="Listen on specified address (default 127.0.0.1)", metavar="ADDR")
+@click.option("-p", "--port", help="Listen on specified port (default 61613)", type=int, metavar="PORT")
+@click.option("-l", "--logfile", help="Log to specified file (unless logging configured in config file).", metavar="FILE")
+@click.option("--debug", default=False, help="Sets logging to debug (unless logging configured in config file).")
+@click.option("-d", "--daemon", default=False, help="Run server as a daemon (default False).")
+@click.option("-u", "--uid", help="The user/UID to use for daemon process.", metavar="UID")
+@click.option("-g", "--gid", help="The group/GID to use for daemon process.", metavar="GID")
+@click.option("--pidfile", help="The PID file to use.", metavar="FILE")
+@click.option("--umask", help="Umask (octal) to apply for daemonized process.", metavar="MASK")
+@click.option('--rundir', help="The working directory to use for the daemonized process (default /).", metavar="DIR")
+def main(config, host, port, logfile, debug, daemon, uid, gid, pidfile, umask, rundir):
+    """
+    Main entry point for running a socket server from the commandline.
+
+    This method will read in options from the commandline and call the L{config.init_config} method
+    to get everything setup. Then, depending on whether deamon mode was specified or not,
+    the process may be forked (or not) and the server will be started.
+    """
+    # The docstring above is shown by click as the command's help text, so its
+    # wording is left untouched. Every parsed option is forwarded by name.
+    _main(config=config, host=host, port=port, logfile=logfile, debug=debug,
+          daemon=daemon, uid=uid, gid=gid, pidfile=pidfile, umask=umask,
+          rundir=rundir)
+
+
+# Script entry point: run the click command when this file is executed directly.
+if __name__ == '__main__':
+ try:
+ main()
+ except (KeyboardInterrupt, SystemExit):
+ # Interrupts and explicit exits end the script without further logging here.
+ pass
+ except Exception as e:
+ # Anything else is logged twice: once as a message, once with the traceback.
+ logger.error("Server terminated due to error: %s" % e)
+ logger.exception(e)
http://git-wip-us.apache.org/repos/asf/ambari/blob/c7612bcf/ambari-common/src/test/python/coilmq/store/__init__.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/test/python/coilmq/store/__init__.py b/ambari-common/src/test/python/coilmq/store/__init__.py
new file mode 100644
index 0000000..a0daf9d
--- /dev/null
+++ b/ambari-common/src/test/python/coilmq/store/__init__.py
@@ -0,0 +1,189 @@
+"""
+Storage containers for durable queues and (planned) durable topics.
+"""
+import abc
+import logging
+import threading
+
+from coilmq.util.concurrency import synchronized
+
+__authors__ = ['"Hans Lellelid" <ha...@xmpl.org>']
+__copyright__ = "Copyright 2009 Hans Lellelid"
+__license__ = """Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License."""
+
+lock = threading.RLock()
+
+
+class QueueStore(object):
+ """
+ Abstract base class for queue storage.
+
+ Extensions/implementations of this class must be thread-safe.
+
+ @ivar log: A logger for this class.
+ @type log: C{logging.Logger}
+ """
+ # NOTE(review): this is the Python 2 metaclass spelling; Python 3 does not treat a
+ # __metaclass__ attribute as the metaclass, so the abstract methods below are
+ # presumably not enforced there -- confirm before relying on it.
+ __metaclass__ = abc.ABCMeta
+
+ def __init__(self):
+ """
+ A base constructor that sets up logging.
+
+ The logger is named after the concrete class ("<module>.<ClassName>").
+
+ If you extend this class, you should either call this method or at minimum make sure these values
+ get set.
+ """
+ self.log = logging.getLogger('%s.%s' % (
+ self.__module__, self.__class__.__name__))
+
+ @abc.abstractmethod
+ @synchronized(lock)
+ def enqueue(self, destination, frame):
+ """
+ Store message (frame) for specified destination.
+
+ @param destination: The destination queue name for this message (frame).
+ @type destination: C{str}
+
+ @param frame: The message (frame) to send to specified destination.
+ @type frame: C{stompclient.frame.Frame}
+ """
+
+ @abc.abstractmethod
+ @synchronized(lock)
+ def dequeue(self, destination):
+ """
+ Removes and returns an item from the queue (or C{None} if no items in queue).
+
+ @param destination: The queue name (destination).
+ @type destination: C{str}
+
+ @return: The first frame in the specified queue, or C{None} if there are none.
+ @rtype: C{stompclient.frame.Frame}
+ """
+
+ @synchronized(lock)
+ def requeue(self, destination, frame):
+ """
+ Requeue a message (frame) for storing at specified destination.
+
+ The default implementation simply delegates to L{enqueue}.
+
+ @param destination: The destination queue name for this message (frame).
+ @type destination: C{str}
+
+ @param frame: The message (frame) to send to specified destination.
+ @type frame: C{stompclient.frame.Frame}
+ """
+ self.enqueue(destination, frame)
+
+ @synchronized(lock)
+ def size(self, destination):
+ """
+ Size of the queue for specified destination.
+
+ Must be overridden; this base implementation always raises.
+
+ @param destination: The queue destination (e.g. /queue/foo)
+ @type destination: C{str}
+
+ @return: The number of frames in specified queue.
+ @rtype: C{int}
+
+ @raise NotImplementedError: Always, in this base class.
+ """
+ raise NotImplementedError()
+
+ @synchronized(lock)
+ def has_frames(self, destination):
+ """
+ Whether specified destination has any frames.
+
+ Default implementation uses L{QueueStore.size} to determine if there
+ are any frames in queue. Subclasses may choose to optimize this.
+
+ @param destination: The queue destination (e.g. /queue/foo)
+ @type destination: C{str}
+
+ @return: Whether there are any frames in the specified queue.
+ @rtype: C{bool}
+ """
+ return self.size(destination) > 0
+
+ @synchronized(lock)
+ def destinations(self):
+ """
+ Provides a set of destinations (queue "addresses") available.
+
+ Must be overridden; this base implementation always raises.
+
+ @return: A set of the destinations available.
+ @rtype: C{set}
+
+ @raise NotImplementedError: Always, in this base class.
+ """
+ raise NotImplementedError
+
+ @synchronized(lock)
+ def close(self):
+ """
+ May be implemented to perform any necessary cleanup operations when store is closed.
+
+ The default implementation does nothing.
+ """
+ pass
+
+ # This is intentionally not synchronized, since it does not directly
+ # expose any shared data.
+ def frames(self, destination):
+ """
+ Returns an iterator for frames in specified queue.
+
+ The iterator simply wraps calls to the L{dequeue} method, so frames are
+ yielded in whatever order L{dequeue} returns them, and iterating removes
+ the frames from the store.
+
+ @param destination: The queue destination (e.g. /queue/foo)
+ @type destination: C{str}
+
+ @return: An iterator over the frames of the queue.
+ @rtype: L{QueueFrameIterator}
+ """
+ return QueueFrameIterator(self, destination)
+
+
+class QueueFrameIterator(object):
+    """
+    Iterator that drains the frames of one destination from a queue store.
+
+    Each step dequeues a frame from the store, so iterating consumes the queue.
+
+    @ivar store: The queue store.
+    @type store: L{coilmq.store.QueueStore}
+
+    @ivar destination: The destination for this iterator.
+    @type destination: C{str}
+    """
+
+    def __init__(self, store, destination):
+        self.destination = destination
+        self.store = store
+
+    def __iter__(self):
+        # The object is its own iterator.
+        return self
+
+    def next(self):
+        # Python 2 spelling of the iterator protocol.
+        return self.__next__()
+
+    def __next__(self):
+        item = self.store.dequeue(self.destination)
+        if item:
+            return item
+        # A falsy result (e.g. None) means the queue is exhausted.
+        raise StopIteration()
+
+    def __len__(self):
+        # Number of frames currently left in the store for this destination.
+        return self.store.size(self.destination)
+
+
+class TopicStore(object):
+ """
+ Abstract base class for non-durable topic storage.
+
+ Currently a placeholder: it defines no attributes or methods.
+ """
+
+
+class DurableTopicStore(TopicStore):
+ """
+ Abstract base class for durable topic storage.
+
+ Currently a placeholder: it adds nothing to L{TopicStore}.
+ """
http://git-wip-us.apache.org/repos/asf/ambari/blob/c7612bcf/ambari-common/src/test/python/coilmq/store/dbm.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/test/python/coilmq/store/dbm.py b/ambari-common/src/test/python/coilmq/store/dbm.py
new file mode 100644
index 0000000..ab4fb00
--- /dev/null
+++ b/ambari-common/src/test/python/coilmq/store/dbm.py
@@ -0,0 +1,262 @@
+"""
+Queue storage module that stores the queue information and frames in a DBM-style database.
+
+The current implementation uses the Python `shelve` module, which uses a DBM implementation
+under the hood (specifically the `anydbm` module, aka `dbm` in Python 3.x).
+
+Because of how the `shelve` module works (and how we're using it) and caveats in the Python
+documentation this is likely a BAD storage module to use if you are expecting to traffic in
+large frames.
+"""
+import threading
+import logging
+import os
+import os.path
+import shelve
+from collections import deque
+from datetime import datetime, timedelta
+
+try:
+ from configparser import ConfigParser
+except ImportError:
+ from ConfigParser import ConfigParser
+
+
+from coilmq.store import QueueStore
+from coilmq.config import config
+from coilmq.exception import ConfigError
+from coilmq.util.concurrency import synchronized
+
+__authors__ = ['"Hans Lellelid" <ha...@xmpl.org>']
+__copyright__ = "Copyright 2009 Hans Lellelid"
+__license__ = """Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License."""
+
+lock = threading.RLock()
+
+
+def make_dbm():
+    """
+    Creates a DBM queue store, pulling config values from the CoilMQ configuration.
+
+    Reads 'qstore.dbm.data_dir', 'qstore.dbm.checkpoint_operations' and
+    'qstore.dbm.checkpoint_timeout' from the 'coilmq' section.
+
+    @return: The configured queue store.
+    @rtype: L{DbmQueue}
+
+    @raise ConfigError: If a required option (or the section) is missing, or if the
+        data directory does not exist or is not both readable and writable.
+    """
+    # The module-level name `ConfigParser` is the parser *class*, which has no
+    # NoOptionError attribute; the exception classes live on the module, so
+    # import them directly (Python 3 name first, Python 2 fallback).
+    try:
+        from configparser import NoOptionError, NoSectionError
+    except ImportError:
+        from ConfigParser import NoOptionError, NoSectionError
+
+    try:
+        data_dir = config.get('coilmq', 'qstore.dbm.data_dir')
+        cp_ops = config.getint('coilmq', 'qstore.dbm.checkpoint_operations')
+        cp_timeout = config.getint('coilmq', 'qstore.dbm.checkpoint_timeout')
+    except (NoOptionError, NoSectionError) as e:
+        raise ConfigError('Missing configuration parameter: %s' % e)
+
+    if not os.path.exists(data_dir):
+        raise ConfigError('DBM directory does not exist: %s' % data_dir)
+    # os.access() takes the inclusive OR of the mode flags and succeeds only if
+    # every requested permission (here: read AND write) is granted.
+    if not os.access(data_dir, os.W_OK | os.R_OK):
+        raise ConfigError('Cannot read and write DBM directory: %s' % data_dir)
+
+    store = DbmQueue(data_dir, checkpoint_operations=cp_ops,
+                     checkpoint_timeout=cp_timeout)
+    return store
+
+
+class DbmQueue(QueueStore):
+    """
+    A QueueStore implementation backed by two `shelve` (DBM-style) databases.
+
+    Both databases live in C{data_dir}: 'metadata' holds one entry per destination
+    (the deque of pending message ids plus enqueue/dequeue counters) and 'frames'
+    holds the frame contents indexed by message id.
+
+    Public methods are guarded by a module-level C{threading.RLock}, since it
+    appears that at least some of the underlying DBM implementations are not
+    thread-safe.
+
+    The metadata shelf is opened with writeback caching, so it is only persisted
+    periodically; how often is controlled by `checkpoint_operations` and
+    `checkpoint_timeout` (see L{_sync}).
+
+    @ivar data_dir: The directory where DBM files will be stored.
+    @type data_dir: C{str}
+
+    @ivar queue_metadata: A Shelf (DBM) database that tracks stats & pending message ids
+                          for all the queues.
+    @type queue_metadata: C{shelve.Shelf}
+
+    @ivar frame_store: A Shelf (DBM) database that contains frame contents indexed by message id.
+    @type frame_store: C{shelve.Shelf}
+
+    @ivar _opcount: Internal counter for keeping track of unpersisted operations.
+    @type _opcount: C{int}
+
+    @ivar checkpoint_operations: Number of operations between syncs.
+    @type checkpoint_operations: C{int}
+
+    @ivar checkpoint_timeout: Max time that can elapse between sync of cache.
+    @type checkpoint_timeout: C{datetime.timedelta}
+    """
+
+    def __init__(self, data_dir, checkpoint_operations=100, checkpoint_timeout=30):
+        """
+        @param data_dir: The directory where DBM files will be stored.
+        @type data_dir: C{str}
+
+        @param checkpoint_operations: Number of operations between syncs.
+        @type checkpoint_operations: C{int}
+
+        @param checkpoint_timeout: Max time (in seconds) that can elapse between sync of cache.
+        @type checkpoint_timeout: C{float}
+        """
+        QueueStore.__init__(self)
+
+        self._opcount = 0
+        self._last_sync = datetime.now()
+
+        self.data_dir = data_dir
+        self.checkpoint_operations = checkpoint_operations
+        self.checkpoint_timeout = timedelta(seconds=checkpoint_timeout)
+
+        metadata_path = os.path.join(self.data_dir, 'metadata')
+        frames_path = os.path.join(self.data_dir, 'frames')
+
+        # Metadata entries are mutable dicts that get modified in place, hence
+        # writeback=True; _sync() is what flushes the cache to disk.
+        self.queue_metadata = shelve.open(metadata_path, writeback=True)
+
+        # Frames are only ever put/get/deleted as whole values, so no writeback
+        # cache is needed (which also keeps memory usage down).
+        self.frame_store = shelve.open(frames_path, writeback=False)
+
+    @synchronized(lock)
+    def enqueue(self, destination, frame):
+        """
+        Store message (frame) for specified destination.
+
+        @param destination: The destination queue name for this message (frame).
+        @type destination: C{str}
+
+        @param frame: The message (frame) to send to specified destination.
+        @type frame: C{stompclient.frame.Frame}
+
+        @raise ValueError: If the frame has no 'message-id' header.
+        """
+        message_id = frame.headers.get('message-id')
+        if not message_id:
+            raise ValueError("Cannot queue a frame without message-id set.")
+
+        if destination not in self.queue_metadata:
+            self.log.info(
+                "Destination %s not in metadata; creating new entry and queue database." % destination)
+            self.queue_metadata[destination] = {
+                'frames': deque(), 'enqueued': 0, 'dequeued': 0, 'size': 0}
+
+        # New ids go in on the left; dequeue() pops from the right.
+        entry = self.queue_metadata[destination]
+        entry['frames'].appendleft(message_id)
+        entry['enqueued'] += 1
+
+        self.frame_store[message_id] = frame
+
+        self._opcount += 1
+        self._sync()
+
+    @synchronized(lock)
+    def dequeue(self, destination):
+        """
+        Removes and returns an item from the queue (or C{None} if no items in queue).
+
+        @param destination: The queue name (destination).
+        @type destination: C{str}
+
+        @return: The first frame in the specified queue, or C{None} if there are none.
+        @rtype: C{stompclient.frame.Frame}
+        """
+        if not self.has_frames(destination):
+            return None
+
+        entry = self.queue_metadata[destination]
+        message_id = entry['frames'].pop()
+        entry['dequeued'] += 1
+
+        # Fetch the stored frame, then drop it from the frame database.
+        frame = self.frame_store[message_id]
+        del self.frame_store[message_id]
+
+        self._opcount += 1
+        self._sync()
+
+        return frame
+
+    @synchronized(lock)
+    def has_frames(self, destination):
+        """
+        Whether specified queue has any frames.
+
+        @param destination: The queue name (destination).
+        @type destination: C{str}
+
+        @return: Whether there are any frames in the specified queue.
+        @rtype: C{bool}
+        """
+        if destination not in self.queue_metadata:
+            return False
+        return bool(self.queue_metadata[destination]['frames'])
+
+    @synchronized(lock)
+    def size(self, destination):
+        """
+        Size of the queue for specified destination.
+
+        @param destination: The queue destination (e.g. /queue/foo)
+        @type destination: C{str}
+
+        @return: The number of frames in specified queue (0 for unknown destinations).
+        @rtype: C{int}
+        """
+        if destination in self.queue_metadata:
+            return len(self.queue_metadata[destination]['frames'])
+        return 0
+
+    @synchronized(lock)
+    def close(self):
+        """
+        Closes both databases (metadata first, then the frame store).
+        """
+        self.queue_metadata.close()
+        self.frame_store.close()
+
+    @synchronized(lock)
+    def destinations(self):
+        """
+        Provides the destinations (queue "addresses") available.
+
+        @return: The destinations that have a metadata entry.
+        @rtype: C{set}
+        """
+        known = self.queue_metadata.keys()
+        return set(known)
+
+    def _sync(self):
+        """
+        Synchronize the cached metadata with the underlying database when due.
+
+        A sync is due when more than `checkpoint_operations` operations have been
+        counted since the last sync, or when more than `checkpoint_timeout` has
+        elapsed since it. Syncing calls C{sync()} on the metadata shelf and resets
+        both the operation counter and the timestamp.
+        """
+        due = self._opcount > self.checkpoint_operations
+        if not due:
+            # The clock is only consulted when the operation count did not trigger.
+            due = datetime.now() > self._last_sync + self.checkpoint_timeout
+
+        if not due:
+            self.log.debug("NOT synchronizing queue metadata.")
+            return
+
+        self.log.debug("Synchronizing queue metadata.")
+        self.queue_metadata.sync()
+        self._last_sync = datetime.now()
+        self._opcount = 0
http://git-wip-us.apache.org/repos/asf/ambari/blob/c7612bcf/ambari-common/src/test/python/coilmq/store/memory.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/test/python/coilmq/store/memory.py b/ambari-common/src/test/python/coilmq/store/memory.py
new file mode 100644
index 0000000..783967c
--- /dev/null
+++ b/ambari-common/src/test/python/coilmq/store/memory.py
@@ -0,0 +1,76 @@
+"""
+Queue storage module that uses thread-safe, in-memory data structures.
+"""
+import threading
+from collections import defaultdict, deque
+
+from coilmq.store import QueueStore
+from coilmq.util.concurrency import synchronized
+
+__authors__ = ['"Hans Lellelid" <ha...@xmpl.org>']
+__copyright__ = "Copyright 2009 Hans Lellelid"
+__license__ = """Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License."""
+
+lock = threading.RLock()
+
+
+class MemoryQueue(QueueStore):
+    """
+    A QueueStore implementation that stores messages in memory.
+
+    This class uses a C{threading.RLock} to guard access to the memory store.
+    The locks on this class are probably excessive given that the
+    L{coilmq.queue.QueueManager} is already implementing coarse-grained locking
+    on the methods that access this storage backend. That said, we'll start
+    over-protective and refactor later if it proves unnecessary.
+
+    Only L{enqueue} creates a destination. The read-style methods (L{dequeue},
+    L{size}, L{has_frames}) use C{dict.get} rather than indexing, because indexing
+    the underlying C{defaultdict} would insert an empty queue for every destination
+    that is merely asked about and make it show up in L{destinations}.
+    """
+
+    def __init__(self):
+        QueueStore.__init__(self)
+        # Maps destination -> deque of frames; new frames enter on the left and
+        # are taken from the right.
+        self._messages = defaultdict(deque)
+
+    @synchronized(lock)
+    def enqueue(self, destination, frame):
+        """
+        Store message (frame) for specified destination.
+
+        @param destination: The destination queue name for this message (frame).
+        @type destination: C{str}
+
+        @param frame: The message (frame) to store.
+        """
+        self._messages[destination].appendleft(frame)
+
+    @synchronized(lock)
+    def dequeue(self, destination):
+        """
+        Removes and returns the oldest frame of the queue.
+
+        @param destination: The queue name (destination).
+        @type destination: C{str}
+
+        @return: The oldest frame, or C{None} if the queue is empty or unknown.
+        """
+        frames = self._messages.get(destination)
+        if not frames:
+            return None
+        return frames.pop()
+
+    @synchronized(lock)
+    def size(self, destination):
+        """
+        Size of the queue for specified destination.
+
+        @param destination: The queue destination (e.g. /queue/foo)
+        @type destination: C{str}
+
+        @return: The number of frames in specified queue (0 for unknown destinations).
+        @rtype: C{int}
+        """
+        return len(self._messages.get(destination, ()))
+
+    @synchronized(lock)
+    def has_frames(self, destination):
+        """ Whether this queue has frames for the specified destination. """
+        return bool(self._messages.get(destination))
+
+    @synchronized(lock)
+    def destinations(self):
+        """
+        Provides the destinations (queue "addresses") available.
+
+        @return: The destinations that have had at least one frame enqueued.
+        @rtype: C{set}
+        """
+        return set(self._messages)
http://git-wip-us.apache.org/repos/asf/ambari/blob/c7612bcf/ambari-common/src/test/python/coilmq/store/rds.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/test/python/coilmq/store/rds.py b/ambari-common/src/test/python/coilmq/store/rds.py
new file mode 100644
index 0000000..f21bee3
--- /dev/null
+++ b/ambari-common/src/test/python/coilmq/store/rds.py
@@ -0,0 +1,69 @@
+try:
+ import redis
+except ImportError: # pragma: no cover
+ import sys; sys.exit('please, install redis-py package to use redis-store')
+import threading
+try:
+ import cPickle as pickle
+except ImportError:
+ import pickle
+
+from coilmq.store import QueueStore
+from coilmq.util.concurrency import synchronized
+from coilmq.config import config
+
+__authors__ = ('"Hans Lellelid" <ha...@xmpl.org>', '"Alexander Zhukov" <zh...@gmail.com>')
+__copyright__ = "Copyright 2009 Hans Lellelid"
+__license__ = """Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License."""
+
+lock = threading.RLock()
+
+
+def make_redis_store(cfg=None):
+    """
+    Factory that creates a Redis-backed queue store.
+
+    The items of the 'redis' config section are passed as keyword arguments to
+    C{redis.Redis}.
+
+    @param cfg: Configuration to read from; the global CoilMQ config is used when
+        this is falsy.
+
+    @return: The queue store wrapping the new connection.
+    @rtype: L{RedisQueueStore}
+    """
+    source = cfg or config
+    connection_params = dict(source.items('redis'))
+    connection = redis.Redis(**connection_params)
+    return RedisQueueStore(redis_conn=connection)
+
+
+class RedisQueueStore(QueueStore):
+    """
+    Simple Queue with Redis Backend.
+
+    Each destination is a Redis list keyed by the destination name; frames are
+    pickled, pushed on the right and popped from the left.
+
+    NOTE(security): frames are restored with C{pickle.loads}, which can execute
+    arbitrary code when fed crafted data. Only point this store at a Redis
+    instance whose contents are trusted.
+    """
+
+    def __init__(self, redis_conn=None):
+        """The default connection parameters are: host='localhost', port=6379, db=0"""
+        self.__db = redis_conn or redis.Redis()
+        # self.key = '{0}:{1}'.format(namespace, name)
+        super(RedisQueueStore, self).__init__()
+
+    @synchronized(lock)
+    def enqueue(self, destination, frame):
+        """Append the pickled frame to the list stored under `destination`."""
+        self.__db.rpush(destination, pickle.dumps(frame))
+
+    @synchronized(lock)
+    def dequeue(self, destination):
+        """
+        Remove and return the first frame of the list stored under `destination`.
+
+        @return: The unpickled frame, or C{None} if nothing was popped.
+        """
+        item = self.__db.lpop(destination)
+        if not item:
+            return None
+        return pickle.loads(item)
+
+    @synchronized(lock)
+    def requeue(self, destination, frame):
+        """Put a frame back; identical to L{enqueue}."""
+        self.enqueue(destination, frame)
+
+    @synchronized(lock)
+    def size(self, destination):
+        """Return the length of the list stored under `destination`."""
+        return self.__db.llen(destination)
+
+    @synchronized(lock)
+    def has_frames(self, destination):
+        """Whether the list stored under `destination` is non-empty."""
+        return self.size(destination) > 0
+
+    @synchronized(lock)
+    def destinations(self):
+        """
+        Provides the destinations (queue "addresses") available.
+
+        Every key of the connected Redis database is reported.
+
+        @return: The keys of the database, as a set to match the other queue stores.
+        @rtype: C{set}
+        """
+        return set(self.__db.keys())
http://git-wip-us.apache.org/repos/asf/ambari/blob/c7612bcf/ambari-common/src/test/python/coilmq/store/sa/__init__.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/test/python/coilmq/store/sa/__init__.py b/ambari-common/src/test/python/coilmq/store/sa/__init__.py
new file mode 100644
index 0000000..77c6ce9
--- /dev/null
+++ b/ambari-common/src/test/python/coilmq/store/sa/__init__.py
@@ -0,0 +1,205 @@
+"""
+Queue storage module that uses SQLAlchemy to access queue information and frames in a database.
+
+
+"""
+import threading
+import logging
+import os
+import os.path
+import shelve
+from collections import deque
+from datetime import datetime, timedelta
+
+# try:
+# from configparser import ConfigParser
+# except ImportError:
+# from ConfigParser import ConfigParser
+
+from sqlalchemy import engine_from_config, MetaData
+from sqlalchemy.orm import scoped_session, sessionmaker
+from sqlalchemy.sql import select, func, distinct
+
+from coilmq.store import QueueStore
+from coilmq.config import config
+from coilmq.exception import ConfigError
+from coilmq.util.concurrency import synchronized
+from coilmq.store.sa import meta, model
+
+__authors__ = ['"Hans Lellelid" <ha...@xmpl.org>']
+__copyright__ = "Copyright 2009 Hans Lellelid"
+__license__ = """Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License."""
+
+
+def make_sa():
+ """
+ Factory to creates a SQLAlchemy queue store, pulling config values from the CoilMQ configuration.
+ """
+ configuration = dict(config.items('coilmq'))
+ engine = engine_from_config(configuration, 'qstore.sqlalchemy.')
+ init_model(engine)
+ store = SAQueue()
+ return store
+
+
+def init_model(engine, create=True, drop=False):
+ """
+ Initializes the shared SQLAlchemy state in the L{coilmq.store.sa.model} module.
+
+ @param engine: The SQLAlchemy engine instance.
+ @type engine: C{sqlalchemy.Engine}
+
+ @param create: Whether to create the tables (if they do not exist).
+ @type create: C{bool}
+
+ @param drop: Whether to drop the tables (if they exist).
+ @type drop: C{bool}
+ """
+ meta.engine = engine
+ meta.metadata = MetaData(bind=meta.engine)
+ meta.Session = scoped_session(sessionmaker(bind=meta.engine))
+ model.setup_tables(create=create, drop=drop)
+
+
+class SAQueue(QueueStore):
+ """
+ A QueueStore implementation that stores messages in a database and uses SQLAlchemy to interface
+ with the database.
+
+ Note that this implementation does not actually use the ORM capabilities of SQLAlchemy, but simply
+ uses SQLAlchemy for the DB abstraction for SQL building and DDL (table creation).
+
+ This L{coilmq.store.sa.model.setup_tables} function is used to actually define (& create) the
+ database tables. This class also depends on the L{init_model} method have been called to
+ define the L{coilmq.store.sa.model.Session} class-like callable (and the engine & metadata).
+
+ Finally, this class does not explicitly use shared data (db connections); a new Session is created
+ in each method. The actual implementation is handled using SQLAlchemy scoped sessions, which provide
+ thread-local Session class-like callables. As a result of deferring that to the SA layer, we don't
+ need to use synchronization locks to guard calls to the methods in this store implementation.
+ """
+
+ def enqueue(self, destination, frame):
+ """
+ Store message (frame) for specified destinationination.
+
+ @param destination: The destinationination queue name for this message (frame).
+ @type destination: C{str}
+
+ @param frame: The message (frame) to send to specified destinationination.
+ @type frame: C{stompclient.frame.Frame}
+ """
+ session = meta.Session()
+ message_id = frame.headers.get('message-id')
+ if not message_id:
+ raise ValueError("Cannot queue a frame without message-id set.")
+ ins = model.frames_table.insert().values(
+ message_id=message_id, destination=destination, frame=frame)
+ session.execute(ins)
+ session.commit()
+
+ def dequeue(self, destination):
+ """
+ Removes and returns an item from the queue (or C{None} if no items in queue).
+
+ @param destination: The queue name (destinationination).
+ @type destination: C{str}
+
+ @return: The first frame in the specified queue, or C{None} if there are none.
+ @rtype: C{stompclient.frame.Frame}
+ """
+ session = meta.Session()
+
+ try:
+
+ selstmt = select(
+ [model.frames_table.c.message_id, model.frames_table.c.frame])
+ selstmt = selstmt.where(
+ model.frames_table.c.destination == destination)
+ selstmt = selstmt.order_by(
+ model.frames_table.c.queued, model.frames_table.c.sequence)
+
+ result = session.execute(selstmt)
+
+ first = result.fetchone()
+ if not first:
+ return None
+
+ delstmt = model.frames_table.delete().where(model.frames_table.c.message_id ==
+ first[model.frames_table.c.message_id])
+ session.execute(delstmt)
+
+ frame = first[model.frames_table.c.frame]
+
+ except:
+ session.rollback()
+ raise
+ else:
+ session.commit()
+ return frame
+
+ def has_frames(self, destination):
+ """
+ Whether specified queue has any frames.
+
+ @param destination: The queue name (destinationination).
+ @type destination: C{str}
+
+ @return: Whether there are any frames in the specified queue.
+ @rtype: C{bool}
+ """
+ session = meta.Session()
+ sel = select([model.frames_table.c.message_id]).where(
+ model.frames_table.c.destination == destination)
+ result = session.execute(sel)
+
+ first = result.fetchone()
+ return first is not None
+
+ def size(self, destination):
+ """
+ Size of the queue for specified destination.
+
+ @param destination: The queue destination (e.g. /queue/foo)
+ @type destination: C{str}
+
+ @return: The number of frames in specified queue.
+ @rtype: C{int}
+ """
+ session = meta.Session()
+ sel = select([func.count(model.frames_table.c.message_id)]).where(
+ model.frames_table.c.destination == destination)
+ result = session.execute(sel)
+ first = result.fetchone()
+ if not first:
+ return 0
+ else:
+ return int(first[0])
+
+ def destinations(self):
+ """
+ Provides a list of destinations (queue "addresses") available.
+
+ @return: A list of the detinations available.
+ @rtype: C{set}
+ """
+ session = meta.Session()
+ sel = select([distinct(model.frames_table.c.destination)])
+ result = session.execute(sel)
+ return set([r[0] for r in result.fetchall()])
+
+ def close(self):
+ """
+ Closes the databases, freeing any resources (and flushing any unsaved changes to disk).
+ """
+ meta.Session.remove()
http://git-wip-us.apache.org/repos/asf/ambari/blob/c7612bcf/ambari-common/src/test/python/coilmq/store/sa/meta.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/test/python/coilmq/store/sa/meta.py b/ambari-common/src/test/python/coilmq/store/sa/meta.py
new file mode 100644
index 0000000..bf47ca9
--- /dev/null
+++ b/ambari-common/src/test/python/coilmq/store/sa/meta.py
@@ -0,0 +1,9 @@
+"""
+Module to hold shared SQLAlchemy state.
+
+These objects are set by the L{coilmq.store.sa.init_model} function.
+"""
+engine = None # : The SA engine
+Session = None # : The SA Session (or Session-like callable)
+# : The SA C{sqlalchemy.orm.MetaData} instance bound to the engine.
+metadata = None
http://git-wip-us.apache.org/repos/asf/ambari/blob/c7612bcf/ambari-common/src/test/python/coilmq/store/sa/model.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/test/python/coilmq/store/sa/model.py b/ambari-common/src/test/python/coilmq/store/sa/model.py
new file mode 100644
index 0000000..43e5009
--- /dev/null
+++ b/ambari-common/src/test/python/coilmq/store/sa/model.py
@@ -0,0 +1,53 @@
+"""
+Definition of the datamodel required for SA storage backend.
+"""
+
+from sqlalchemy import Table, Column, BigInteger, String, PickleType, DateTime
+from sqlalchemy.sql import func
+
+from coilmq.store.sa import meta
+
+__authors__ = ['"Hans Lellelid" <ha...@xmpl.org>']
+__copyright__ = "Copyright 2009 Hans Lellelid"
+__license__ = """Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License."""
+
+frames_table = None # : The C{sqlalchemy.Table} set by L{setup_tables}
+
+
+def setup_tables(create=True, drop=False):
+ """
+ Binds the model classes to registered metadata and engine and (potentially)
+ creates the db tables.
+
+ This function expects that you have bound the L{meta.metadata} and L{meta.engine}.
+
+ @param create: Whether to create the tables (if they do not exist).
+ @type create: C{bool}
+
+ @param drop: Whether to drop the tables (if they exist).
+ @type drop: C{bool}
+ """
+ global frames_table
+ frames_table = Table('frames', meta.metadata,
+ Column('message_id', String(255), primary_key=True),
+ Column('sequence', BigInteger,
+ primary_key=False, autoincrement=True),
+ Column('destination', String(255), index=True),
+ Column('frame', PickleType),
+ Column('queued', DateTime, default=func.now()))
+
+ if drop:
+ meta.metadata.drop_all()
+
+ if drop or create:
+ meta.metadata.create_all()
http://git-wip-us.apache.org/repos/asf/ambari/blob/c7612bcf/ambari-common/src/test/python/coilmq/topic.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/test/python/coilmq/topic.py b/ambari-common/src/test/python/coilmq/topic.py
new file mode 100644
index 0000000..d6436be
--- /dev/null
+++ b/ambari-common/src/test/python/coilmq/topic.py
@@ -0,0 +1,144 @@
+"""
+Non-durable topic support functionality.
+
+This code is inspired by the design of the Ruby stompserver project, by
+Patrick Hurley and Lionel Bouton. See http://stompserver.rubyforge.org/
+"""
+import logging
+import threading
+import uuid
+from collections import defaultdict
+
+from coilmq.util.concurrency import synchronized
+
+__authors__ = ['"Hans Lellelid" <ha...@xmpl.org>']
+__copyright__ = "Copyright 2009 Hans Lellelid"
+__license__ = """Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License."""
+
+lock = threading.RLock()
+
+
+class TopicManager(object):
+ """
+ Class that manages distribution of messages to topic subscribers.
+
+ This class uses C{threading.RLock} to guard the public methods. This is probably
+ a bit excessive, given 1) the actomic nature of basic C{dict} read/write operations
+ and 2) the fact that most of the internal data structures are keying off of the
+ STOMP connection, which is going to be thread-isolated. That said, this seems like
+ the technically correct approach and should increase the chance of this code being
+ portable to non-GIL systems.
+
+ @ivar _topics: A dict of registered topics, keyed by destination.
+ @type _topics: C{dict} of C{str} to C{set} of L{coilmq.server.StompConnection}
+ """
+
+ def __init__(self):
+ self.log = logging.getLogger(
+ '%s.%s' % (__name__, self.__class__.__name__))
+
+ # Lock var is required for L{synchornized} decorator.
+ self._lock = threading.RLock()
+
+ self._topics = defaultdict(set)
+
+ # TODO: If we want durable topics, we'll need a store for topics.
+
+ @synchronized(lock)
+ def close(self):
+ """
+ Closes all resources associated with this topic manager.
+
+ (Currently this is simply here for API conformity w/ L{coilmq.queue.QueueManager}.)
+ """
+ self.log.info("Shutting down topic manager.") # pragma: no cover
+
+ @synchronized(lock)
+ def subscribe(self, connection, destination):
+ """
+ Subscribes a connection to the specified topic destination.
+
+ @param connection: The client connection to subscribe.
+ @type connection: L{coilmq.server.StompConnection}
+
+ @param destination: The topic destination (e.g. '/topic/foo')
+ @type destination: C{str}
+ """
+ self.log.debug("Subscribing %s to %s" % (connection, destination))
+ self._topics[destination].add(connection)
+
+ @synchronized(lock)
+ def unsubscribe(self, connection, destination):
+ """
+ Unsubscribes a connection from the specified topic destination.
+
+ @param connection: The client connection to unsubscribe.
+ @type connection: L{coilmq.server.StompConnection}
+
+ @param destination: The topic destination (e.g. '/topic/foo')
+ @type destination: C{str}
+ """
+ self.log.debug("Unsubscribing %s from %s" % (connection, destination))
+ if connection in self._topics[destination]:
+ self._topics[destination].remove(connection)
+
+ if not self._topics[destination]:
+ del self._topics[destination]
+
+ @synchronized(lock)
+ def disconnect(self, connection):
+ """
+ Removes a subscriber connection.
+
+ @param connection: The client connection to unsubscribe.
+ @type connection: L{coilmq.server.StompConnection}
+ """
+ self.log.debug("Disconnecting %s" % connection)
+ for dest in list(self._topics.keys()):
+ if connection in self._topics[dest]:
+ self._topics[dest].remove(connection)
+ if not self._topics[dest]:
+ # This won't trigger RuntimeError, since we're using keys()
+ del self._topics[dest]
+
+ @synchronized(lock)
+ def send(self, message):
+ """
+ Sends a message to all subscribers of destination.
+
+ @param message: The message frame. (The frame will be modified to set command
+ to MESSAGE and set a message id.)
+ @type message: L{stompclient.frame.Frame}
+ """
+ dest = message.headers.get('destination')
+ if not dest:
+ raise ValueError(
+ "Cannot send frame with no destination: %s" % message)
+
+ message.cmd = 'message'
+
+ message.headers.setdefault('message-id', str(uuid.uuid4()))
+
+ bad_subscribers = set()
+ for subscriber in self._topics[dest]:
+ try:
+ subscriber.send_frame(message)
+ except:
+ self.log.exception(
+ "Error delivering message to subscriber %s; client will be disconnected." % subscriber)
+ # We queue for deletion so we are not modifying the topics dict
+ # while iterating over it.
+ bad_subscribers.add(subscriber)
+
+ for subscriber in bad_subscribers:
+ self.disconnect(subscriber)
http://git-wip-us.apache.org/repos/asf/ambari/blob/c7612bcf/ambari-common/src/test/python/coilmq/util/__init__.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/test/python/coilmq/util/__init__.py b/ambari-common/src/test/python/coilmq/util/__init__.py
new file mode 100644
index 0000000..ae25f74
--- /dev/null
+++ b/ambari-common/src/test/python/coilmq/util/__init__.py
@@ -0,0 +1,16 @@
+"""
+CoilMQ utility modules.
+"""
+__authors__ = ['"Hans Lellelid" <ha...@xmpl.org>']
+__copyright__ = "Copyright 2009 Hans Lellelid"
+__license__ = """Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License."""
http://git-wip-us.apache.org/repos/asf/ambari/blob/c7612bcf/ambari-common/src/test/python/coilmq/util/concurrency.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/test/python/coilmq/util/concurrency.py b/ambari-common/src/test/python/coilmq/util/concurrency.py
new file mode 100644
index 0000000..3eb539a
--- /dev/null
+++ b/ambari-common/src/test/python/coilmq/util/concurrency.py
@@ -0,0 +1,96 @@
+"""
+Tools to facilitate developing thread-safe components.
+"""
+
+import abc
+import threading
+import functools
+
+__authors__ = ['"Hans Lellelid" <ha...@xmpl.org>']
+__copyright__ = "Copyright 2009 Hans Lellelid"
+__license__ = """Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License."""
+
+
+def synchronized(lock):
+ def synchronize(func):
+ """
+ Decorator to lock and unlock a method (Phillip J. Eby).
+
+ This function is to be used with object instance methods; the object must
+ have a _lock variable (of type C{threading.Lock} or C{threading.RLock}).
+
+ @param func: Method to decorate
+ @type func: C{callable}
+ """
+ @functools.wraps(func)
+ def wrapper(*args, **kwargs):
+ with lock:
+ return func(*args, **kwargs)
+ return wrapper
+ return synchronize
+
+
+class CoilTimerBase(object):
+
+ __metaclass__ = abc.ABCMeta
+
+ def __init__(self):
+ self.jobs = []
+
+ def schedule(self, period, callback):
+ self.jobs.append((period, callback))
+
+ @abc.abstractmethod
+ def run(self):
+ raise NotImplementedError
+
+ @abc.abstractmethod
+ def start(self):
+ raise NotImplementedError
+
+ @abc.abstractmethod
+ def stop(self):
+ raise NotImplementedError
+
+ def __enter__(self):
+ self.start()
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ self.stop()
+
+
+# TODO: check against following notes
+# <http://stackoverflow.com/questions/2124540/how-does-timer-in-python-work-regarding-mutlithreading>
+# <http://stackoverflow.com/questions/12435211/python-threading-timer-repeat-function-every-n-seconds>
+class CoilThreadingTimer(CoilTimerBase):
+ # Timer implementation that re-arms a threading.Timer for every registered job.
+
+ def __init__(self, *args, **kwargs):
+ # NOTE(review): the arguments are forwarded to CoilTimerBase.__init__, which
+ # (as defined in this file) accepts none besides self.
+ super(CoilThreadingTimer, self).__init__(*args, **kwargs)
+ # Flag checked by run_job; set by start() and cleared by stop().
+ self._running = False
+
+ def run(self):
+ # Kicks off run_job once for every (period, callback) pair in self.jobs.
+ def run_job(interval, callback):
+ # While the flag is set, a new threading.Timer is started that calls
+ # run_job again with the same arguments after `interval`.
+ # NOTE(review): indentation was lost in this copy, so it cannot be told
+ # from here whether callback() is guarded by the `if` as well - confirm
+ # against the original file.
+ if self._running:
+ threading.Timer(interval, run_job, args=(interval, callback)).start()
+ callback()
+ for period, job in self.jobs:
+ run_job(period, job)
+
+ def start(self):
+ # Sets the running flag, then runs every job via run().
+ self._running = True
+ self.run()
+
+ def stop(self):
+ # Only clears the flag; timers that were already started are not cancelled here.
+ self._running = False
+
+
http://git-wip-us.apache.org/repos/asf/ambari/blob/c7612bcf/ambari-common/src/test/python/coilmq/util/frames.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/test/python/coilmq/util/frames.py b/ambari-common/src/test/python/coilmq/util/frames.py
new file mode 100644
index 0000000..3026ee0
--- /dev/null
+++ b/ambari-common/src/test/python/coilmq/util/frames.py
@@ -0,0 +1,359 @@
+from functools import partial
+import re
+import logging
+from collections import OrderedDict
+import io
+
+import six
+
+SEND = 'SEND'
+CONNECT = 'CONNECT'
+MESSAGE = 'MESSAGE'
+ERROR = 'ERROR'
+CONNECTED = 'CONNECTED'
+SUBSCRIBE = 'SUBSCRIBE'
+UNSUBSCRIBE = 'UNSUBSCRIBE'
+BEGIN = 'BEGIN'
+COMMIT = 'COMMIT'
+ABORT = 'ABORT'
+ACK = 'ACK'
+NACK = 'NACK'
+DISCONNECT = 'DISCONNECT'
+
+VALID_COMMANDS = ['message', 'connect', 'connected', 'error', 'send',
+ 'subscribe', 'unsubscribe', 'begin', 'commit', 'abort', 'ack', 'disconnect', 'nack']
+
+TEXT_PLAIN = 'text/plain'
+
+
+class IncompleteFrame(Exception):
+ """The frame has incomplete body"""
+
+
+class BodyNotTerminated(Exception):
+ """The frame's body is not terminated with the NULL character"""
+
+
+class EmptyBuffer(Exception):
+ """The buffer is empty"""
+
+
+def parse_headers(buff):
+ """
+ Parses buffer and returns command and headers as strings
+ """
+ preamble_lines = list(map(
+ lambda x: six.u(x).decode(),
+ iter(lambda: buff.readline().strip(), b''))
+ )
+ if not preamble_lines:
+ raise EmptyBuffer()
+ return preamble_lines[0], OrderedDict([l.split(':') for l in preamble_lines[1:]])
+
+
+def parse_body(buff, headers):
+ content_length = int(headers.get('content-length', -1))
+ body = buff.read(content_length)
+ if content_length >= 0:
+ if len(body) < content_length:
+ raise IncompleteFrame()
+ terminator = six.u(buff.read(1)).decode()
+ if not terminator:
+ raise BodyNotTerminated()
+ else:
+ # no content length
+ body, terminator, rest = body.partition(b'\x00')
+ if not terminator:
+ raise BodyNotTerminated()
+ else:
+ buff.seek(-len(rest), 2)
+
+ return body
+
+
+class Frame(object):
+ """
+ A STOMP frame (or message).
+
+ :param cmd: the protocol command
+ :param headers: a map of headers for the frame
+ :param body: the content of the frame.
+ """
+
+ def __init__(self, cmd, headers=None, body=None):
+ self.cmd = cmd
+ self.headers = headers or {}
+ self.body = body or ''
+
+ def __str__(self):
+ return '{{cmd={0},headers=[{1}],body={2}}}'.format(
+ self.cmd,
+ self.headers,
+ self.body if isinstance(
+ self.body, six.binary_type) else six.b(self.body)
+ )
+
+ def __eq__(self, other):
+ """ Override equality checking to test for matching command, headers, and body. """
+ return all([isinstance(other, Frame),
+ self.cmd == other.cmd,
+ self.headers == other.headers,
+ self.body == other.body])
+
+ @property
+ def transaction(self):
+ return self.headers.get('transaction')
+
+ @classmethod
+ def from_buffer(cls, buff):
+ cmd, headers = parse_headers(buff)
+ body = parse_body(buff, headers)
+ return cls(cmd, headers=headers, body=body)
+
+ def pack(self):
+ """
+ Create a string representation from object state.
+
+ @return: The string (bytes) for this stomp frame.
+ @rtype: C{str}
+ """
+
+ self.headers.setdefault('content-length', len(self.body))
+
+ # Convert and append any existing headers to a string as the
+ # protocol describes.
+ headerparts = ("{0}:{1}\n".format(key, value)
+ for key, value in self.headers.items())
+
+ # Frame is Command + Header + EOF marker.
+ return six.b("{0}\n{1}\n".format(self.cmd, "".join(headerparts))) + (self.body if isinstance(self.body, six.binary_type) else six.b(self.body)) + six.b('\x00')
+
+
+class ConnectedFrame(Frame):
+ """ A CONNECTED server frame (response to CONNECT).
+
+ @ivar session: The (throw-away) session ID to include in response.
+ @type session: C{str}
+ """
+
+ def __init__(self, session, extra_headers=None):
+ """
+ @param session: The (throw-away) session ID to include in response.
+ @type session: C{str}
+ """
+ super(ConnectedFrame, self).__init__(
+ cmd='connected', headers=extra_headers or {})
+ self.headers['session'] = session
+
+
+class HeaderValue(object):
+ """
+ An descriptor class that can be used when a calculated header value is needed.
+
+ This class is a descriptor, implementing __get__ to return the calculated value.
+ While according to U{http://docs.codehaus.org/display/STOMP/Character+Encoding} there
+ seems to some general idea about having UTF-8 as the character encoding for headers;
+ however the C{stomper} lib does not support this currently.
+
+ For example, to use this class to generate the content-length header:
+
+ >>> body = 'asdf'
+ >>> headers = {}
+ >>> headers['content-length'] = HeaderValue(calculator=lambda: len(body))
+ >>> str(headers['content-length'])
+ '4'
+
+ @ivar calc: The calculator function.
+ @type calc: C{callable}
+ """
+
+ def __init__(self, calculator):
+ """
+ @param calculator: The calculator callable that will yield the desired value.
+ @type calculator: C{callable}
+ """
+ if not callable(calculator):
+ raise ValueError("Non-callable param: %s" % calculator)
+ self.calc = calculator
+
+ def __get__(self, obj, objtype):
+ return self.calc()
+
+ def __str__(self):
+ return str(self.calc())
+
+ def __set__(self, obj, value):
+ self.calc = value
+
+ def __repr__(self):
+ return '<%s calculator=%s>' % (self.__class__.__name__, self.calc)
+
+
+class ErrorFrame(Frame):
+ """ An ERROR server frame. """
+
+ def __init__(self, message, body=None, extra_headers=None):
+ """
+ @param body: The message body bytes.
+ @type body: C{str}
+ """
+ super(ErrorFrame, self).__init__(cmd='error',
+ headers=extra_headers or {}, body=body)
+ self.headers['message'] = message
+ self.headers[
+ 'content-length'] = HeaderValue(calculator=lambda: len(self.body))
+
+ def __repr__(self):
+ return '<%s message=%r>' % (self.__class__.__name__, self.headers['message'])
+
+
+class ReceiptFrame(Frame):
+ """ A RECEIPT server frame. """
+
+ def __init__(self, receipt, extra_headers=None):
+ """
+ @param receipt: The receipt message ID.
+ @type receipt: C{str}
+ """
+ super(ReceiptFrame, self).__init__(
+ 'RECEIPT', headers=extra_headers or {})
+ self.headers['receipt-id'] = receipt
+
+
+class FrameBuffer(object):
+ """
+ A customized version of the StompBuffer class from Stomper project that returns frame objects
+ and supports iteration.
+
+ This version of the parser also assumes that stomp messages with no content-lengh
+ end in a simple \\x00 char, not \\x00\\n as is assumed by
+ C{stomper.stompbuffer.StompBuffer}. Additionally, this class differs from Stomper version
+ by conforming to PEP-8 coding style.
+
+ This class can be used to smooth over a transport that may provide partial frames (or
+ may provide multiple frames in one data buffer).
+
+ @ivar _buffer: The internal byte buffer.
+ @type _buffer: C{str}
+
+ @ivar debug: Log extra parsing debug (logs will be DEBUG level).
+ @type debug: C{bool}
+ """
+
+ # regexp to check that the buffer starts with a command.
+ command_re = re.compile('^(.+?)\n')
+
+ # regexp to remove everything up to and including the first
+ # instance of '\x00' (used in resynching the buffer).
+ sync_re = re.compile('^.*?\x00')
+
+ # regexp to determine the content length. The buffer should always start
+ # with a command followed by the headers, so the content-length header will
+ # always be preceded by a newline. It may not always proceeded by a
+ # newline, though!
+ content_length_re = re.compile('\ncontent-length\s*:\s*(\d+)\s*(\n|$)')
+
+ def __init__(self):
+ self._buffer = io.BytesIO()
+ self._pointer = 0
+ self.debug = False
+ self.log = logging.getLogger('%s.%s' % (
+ self.__module__, self.__class__.__name__))
+
+ def clear(self):
+ """
+ Clears (empties) the internal buffer.
+ """
+ self._buffer = io
+
+ def buffer_len(self):
+ """
+ @return: Number of bytes in the internal buffer.
+ @rtype: C{int}
+ """
+ return len(self._buffer)
+
+ def buffer_empty(self):
+ """
+ @return: C{True} if buffer is empty, C{False} otherwise.
+ @rtype: C{bool}
+ """
+ return not bool(self._buffer)
+
+ def append(self, data):
+ """
+ Appends bytes to the internal buffer (may or may not contain full stomp frames).
+
+ @param data: The bytes to append.
+ @type data: C{str}
+ """
+ self._buffer.write(data)
+
+ def extract_frame(self):
+ """
+ Pulls one complete frame off the buffer and returns it.
+
+ If there is no complete message in the buffer, returns None.
+
+ Note that the buffer can contain more than once message. You
+ should therefore call this method in a loop (or use iterator
+ functionality exposed by class) until None returned.
+
+ @return: The next complete frame in the buffer.
+ @rtype: L{stomp.frame.Frame}
+ """
+ # (mbytes, hbytes) = self._find_message_bytes(self.buffer)
+ # if not mbytes:
+ # return None
+ #
+ # msgdata = self.buffer[:mbytes]
+ # self.buffer = self.buffer[mbytes:]
+ # hdata = msgdata[:hbytes]
+ # # Strip off any leading whitespace from headers; this is necessary, because
+ # # we do not (any longer) expect a trailing \n after the \x00 byte (which means
+ # # it will become a leading \n to the next frame).
+ # hdata = hdata.lstrip()
+ # elems = hdata.split('\n')
+ # cmd = elems.pop(0)
+ # headers = {}
+ #
+ # for e in elems:
+ # try:
+ # (k,v) = e.split(':', 1) # header values may contain ':' so specify maxsplit
+ # except ValueError:
+ # continue
+ # headers[k.strip()] = v.strip()
+ #
+ # # hbytes points to the start of the '\n\n' at the end of the header,
+ # # so 2 bytes beyond this is the start of the body. The body EXCLUDES
+ # # the final byte, which is '\x00'.
+ # body = msgdata[hbytes + 2:-1]
+ self._buffer.seek(self._pointer, 0)
+ try:
+ f = Frame.from_buffer(self._buffer)
+ self._pointer = self._buffer.tell()
+ except (IncompleteFrame, EmptyBuffer):
+ self._buffer.seek(self._pointer, 0)
+ return None
+
+ return f
+
+ def __iter__(self):
+ """
+ Returns an iterator object.
+ """
+ return self
+
+ def __next__(self):
+ """
+ Return the next STOMP message in the buffer (supporting iteration).
+
+ @rtype: L{stomp.frame.Frame}
+ """
+ msg = self.extract_frame()
+ if not msg:
+ raise StopIteration()
+ return msg
+
+ def next(self):
+ return self.__next__()
http://git-wip-us.apache.org/repos/asf/ambari/blob/c7612bcf/ambari-common/src/test/python/coilmq/util/six.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/test/python/coilmq/util/six.py b/ambari-common/src/test/python/coilmq/util/six.py
new file mode 100644
index 0000000..0a381fd
--- /dev/null
+++ b/ambari-common/src/test/python/coilmq/util/six.py
@@ -0,0 +1,16 @@
+import sys
+
+# Minimal stand-in for the parts of the "six" compatibility library used here:
+# b(), u() and binary_type are defined differently per major Python version.
+# True when the interpreter's major version is 3.
+PY3 = sys.version_info[0] == 3
+
+if PY3:
+ # Python 3: b() encodes a text string to bytes using latin-1.
+ def b(s):
+ return s.encode("latin-1")
+ # Python 3: u() returns its argument unchanged.
+ def u(s):
+ return s
+ binary_type = bytes
+else:
+ # Otherwise: b() returns its argument unchanged.
+ def b(s):
+ return s
+ # Otherwise: u() rewrites the backslash sequences matched by replace() and then
+ # decodes the result to unicode with the "unicode_escape" codec.
+ def u(s):
+ return unicode(s.replace(r'\\', r'\\\\'), "unicode_escape")
+ binary_type = str
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/c7612bcf/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 6f9bfdd..f192895 100644
--- a/pom.xml
+++ b/pom.xml
@@ -297,6 +297,8 @@
<!--Python Mock library (BSD license)-->
<exclude>ambari-common/src/test/python/mock/**</exclude>
+ <!--Coilmq Mock library (Apache license)-->
+ <exclude>ambari-common/src/test/python/coilmq/**</exclude>
<!--Jinja2 library (BSD license)-->
<exclude>ambari-common/src/main/python/ambari_jinja2/**</exclude>
<exclude>ambari-common/src/main/python/jinja2/**</exclude>