You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sc...@apache.org on 2016/08/23 03:37:01 UTC
[1/4] airavata git commit: add simstream module to airavata
Repository: airavata
Updated Branches:
refs/heads/develop ef310d3ca -> ff18106ab
http://git-wip-us.apache.org/repos/asf/airavata/blob/24ced800/modules/simstream/simstream/pikaproducer.py
----------------------------------------------------------------------
diff --git a/modules/simstream/simstream/pikaproducer.py b/modules/simstream/simstream/pikaproducer.py
new file mode 100755
index 0000000..6ffaf3d
--- /dev/null
+++ b/modules/simstream/simstream/pikaproducer.py
@@ -0,0 +1,202 @@
+"""
+Utilities for sending data.
+
+Author: Jeff Kinnison (jkinniso@nd.edu)
+"""
+
+import json
+import pika
+
+
+class PikaProducer(object):
+    """
+    Utility for sending job data to a set of endpoints.
+
+    Each message is published once per registered routing key on a single
+    exchange, over a blocking RabbitMQ connection opened by start().
+    """
+
+    def __init__(self, rabbitmq_url, exchange, exchange_type="direct", routing_keys=None):
+        """
+        Instantiate a new PikaProducer.
+
+        Arguments:
+        rabbitmq_url -- the url of the RabbitMQ server to send to
+        exchange -- the name of the exchange to send to
+
+        Keyword Arguments:
+        exchange_type -- one of 'direct', 'topic', 'fanout', 'headers'
+                         (default 'direct')
+        routing_keys -- the routing keys to the endpoints for this producer
+                        (default None, meaning no endpoints yet)
+        """
+        import random
+
+        self._url = rabbitmq_url
+        self._exchange = exchange
+        self._exchange_type = exchange_type
+        # Copy the keys: a shared default list (or the caller's own list)
+        # must not be mutated by add_routing_key/remove_routing_key.
+        self._routing_keys = [] if routing_keys is None else list(routing_keys)
+
+        self._connection = None  # RabbitMQ connection object
+        self._channel = None  # RabbitMQ channel object
+
+        # Random tag used only by the debugging output below
+        self._name = random.randint(0, 100)
+
+    def __call__(self, data):
+        """
+        Publish data to the RabbitMQ server.
+
+        The connection is opened on first use, and the data is then sent in
+        the same call so that the first message is not dropped.
+
+        Arguments:
+        data -- JSON serializable data to send
+        """
+        if self._connection is None:  # Start the connection if it is inactive
+            self.start()
+        self.send_data(self.pack_data(data))
+
+    def add_routing_key(self, key):
+        """
+        Add a new endpoint that will receive this data.
+
+        Arguments:
+        key -- the routing key for the new endpoint
+        """
+        if key not in self._routing_keys:
+            self._routing_keys.append(key)
+
+    def remove_routing_key(self, key):
+        """
+        Stop sending data to an existing endpoint.
+
+        Removing a key that is not registered is a no-op.
+
+        Arguments:
+        key -- the routing key for the existing endpoint
+        """
+        try:
+            self._routing_keys.remove(key)
+        except ValueError:
+            pass
+
+    def pack_data(self, data):
+        """
+        JSON-serialize the data for transport.
+
+        Arguments:
+        data -- JSON-serializable data
+
+        Returns the JSON string, or a JSON object of the form
+        {"err": <reason>} if the data cannot be serialized.
+        """
+        try:
+            return json.dumps(data)
+        except (TypeError, ValueError) as e:
+            # TypeError: unsupported type; ValueError: circular reference
+            return json.dumps({"err": str(e)})
+
+    def send_data(self, data):
+        """
+        Send the data to all active endpoints.
+
+        Nothing is sent when no channel is open.
+
+        Arguments:
+        data -- the message to send
+        """
+        if self._channel is not None:  # Make sure the connection is active
+            for key in self._routing_keys:  # Send to all endpoints
+                self._channel.basic_publish(exchange=self._exchange,
+                                            routing_key=key,
+                                            body=data)
+
+    def start(self):
+        """
+        Open a connection if one does not exist.
+        """
+        print("Starting new connection")
+        if self._connection is None:
+            print("Creating connection object")
+            self._connection = pika.BlockingConnection(pika.URLParameters(self._url))
+            self._channel = self._connection.channel()
+            self._channel.exchange_declare(exchange=self._exchange,
+                                           type=self._exchange_type)
+
+    def shutdown(self):
+        """
+        Close an existing channel and connection.
+
+        Both references are cleared afterwards so that a later start() (or
+        call) opens a fresh connection instead of reusing a closed one.
+        """
+        if self._channel is not None:
+            self._channel.close()
+            self._channel = None
+        if self._connection is not None:
+            self._connection.close()
+            self._connection = None
+
+    def _on_connection_open(self, unused_connection):
+        """
+        Create a new channel if the connection opens successfully.
+
+        Arguments:
+        unused_connection -- a reference to self._connection
+        """
+        print("Connection is open")
+        self._connection.channel(on_open_callback=self._on_channel_open)
+
+    def _on_connection_close(self, connection, code, text):
+        """
+        Actions to take when the connection is closed for any reason.
+
+        Arguments:
+        connection -- the connection that was closed (same as self._connection)
+        code -- response code from the RabbitMQ server
+        text -- response body from the RabbitMQ server
+        """
+        print("Connection is closed")
+        self._channel = None
+        self._connection = None
+
+    def _on_channel_open(self, channel):
+        """
+        Actions to take when the channel opens.
+
+        Arguments:
+        channel -- the newly opened channel
+        """
+        print("Channel is open")
+        self._channel = channel
+        self._channel.add_on_close_callback(self._on_channel_close)
+        self._declare_exchange()
+
+    def _on_channel_close(self, channel, code, text):
+        """
+        Actions to take when the channel closes for any reason.
+
+        Arguments:
+        channel -- the channel that was closed (same as self._channel)
+        code -- response code from the RabbitMQ server
+        text -- response body from the RabbitMQ server
+        """
+        print("Channel is closed")
+        if self._connection is not None:
+            self._connection.close()
+
+    def _declare_exchange(self):
+        """
+        Set up the exchange to publish to even if it already exists.
+        """
+        print("Exchange is declared")
+        self._channel.exchange_declare(exchange=self._exchange,
+                                       type=self._exchange_type)
+
+if __name__ == "__main__":
+    # Manual smoke test: publish a timestamped greeting every five seconds
+    # to a local RabbitMQ server until interrupted with Ctrl-C.
+    import time
+
+    config = {
+        "url": "amqp://guest:guest@localhost:5672",
+        "exchange": "simstream",
+        "routing_key": "test_consumer",
+        "exchange_type": "topic"
+    }
+
+    producer = PikaProducer(config["url"],
+                            config["exchange"],
+                            exchange_type=config["exchange_type"],
+                            routing_keys=[config["routing_key"]])
+    producer.start()
+
+    try:
+        while True:
+            time.sleep(5)
+            data = str(time.time()) + ": Hello SimStream"
+            producer.send_data(data)
+    except KeyboardInterrupt:
+        # Leave the loop instead of publishing on a closed channel
+        pass
+    finally:
+        producer.shutdown()
http://git-wip-us.apache.org/repos/asf/airavata/blob/24ced800/modules/simstream/simstream/simstream.py
----------------------------------------------------------------------
diff --git a/modules/simstream/simstream/simstream.py b/modules/simstream/simstream/simstream.py
new file mode 100755
index 0000000..499a8c3
--- /dev/null
+++ b/modules/simstream/simstream/simstream.py
@@ -0,0 +1,167 @@
+import pika
+
+from .pikaasyncconsumer import PikaAsyncConsumer
+from .datacollector import DataCollector
+from .datareporter import DataReporter
+from .eventhandler import EventHandler
+from .eventmonitor import EventMonitor
+
+
+class ReporterExistsException(Exception):
+    """Raised when a DataReporter is added under a name that is already taken."""
+
+
+class SimStream(object):
+    """
+    Manager for routing messages to their correct reporter.
+
+    Holds a name -> DataReporter mapping, an optional AMQP consumer and the
+    configuration dict used to build them.
+    """
+
+    DEFAULT_CONFIG_PATH = "simstream.cnf"
+
+    class MessageParser(object):
+        """
+        Internal message parsing facilities.
+
+        Parsing is not implemented yet, so every field stays None after the
+        parser is called.
+        """
+
+        def __init__(self):
+            self.parsed = None
+            # Fields read by SimStream.route_message
+            self.reporter_name = None
+            self.collector_name = None
+            self.routing_key = None
+
+        def __call__(self, message):
+            # TODO: Fill in reporter_name, collector_name and routing_key
+            pass
+
+    def __init__(self, reporters=None, config=None):
+        """
+        Create a new SimStream.
+
+        Keyword Arguments:
+        reporters -- mapping of reporter name to DataReporter (default None,
+                     meaning a new empty mapping)
+        config -- configuration dict (default None, meaning a new empty dict)
+        """
+        # Fresh dicts per instance: a shared default would leak reporters
+        # registered on one SimStream into every other one.
+        self.reporters = {} if reporters is None else reporters
+        self.consumer = None
+        self.config = {} if config is None else config
+
+    def add_data_reporter(self, reporter):
+        """
+        Add a new DataReporter object.
+
+        Raises ReporterExistsException if a reporter with the same name is
+        already registered.
+
+        Arguments:
+        reporter -- the DataReporter to add
+        """
+        if reporter.name in self.reporters:
+            raise ReporterExistsException
+        self.reporters[reporter.name] = reporter
+
+    def parse_config(self):
+        """
+        Read the config file and set up the specified data collection and
+        event handling resources.
+        """
+        # TODO: Read in config
+        # TODO: Set up configuration dict
+        pass
+
+    def route_message(self, message):
+        """
+        Send a message to the correct reporter.
+
+        Messages naming an unknown reporter are ignored.
+
+        Arguments:
+        message -- the raw message to parse and route
+        """
+        # TODO: Route message to the correct DataReporter/EventMonitor
+        # MessageParser is a nested class and must be reached through self
+        parser = self.MessageParser()
+        parser(message)
+        if parser.reporter_name in self.reporters:
+            self.reporters[parser.reporter_name].start_streaming(
+                parser.collector_name,
+                parser.routing_key
+            )
+
+    def start_collecting(self):
+        """
+        Begin collecting data and monitoring for events.
+        """
+        for reporter in self.reporters.values():
+            reporter.start_collecting()
+
+    def stop_collecting(self):
+        """
+        Stop collecting data on every registered reporter.
+        """
+        # NOTE(review): assumes DataReporter exposes stop_collecting() as the
+        # counterpart of start_collecting() -- confirm against datareporter.py
+        for reporter in self.reporters.values():
+            reporter.stop_collecting()
+
+    def setup(self):
+        """
+        Set up the SimStream instance: create DataCollectors, create
+        EventMonitors, configure AMQP consumer.
+        """
+        self.parse_config()
+        #self.setup_consumer()
+        self.setup_data_collection()
+        self.setup_event_monitoring()
+
+    def setup_data_collection(self):
+        """
+        Set up all DataReporters and DataCollectors.
+        """
+        # TODO: Create and configure all DataReporters
+        # TODO: Create and configure all DataCollectors
+        # TODO: Assign each DataCollector to the correct DataReporter
+        # config is a dict, so entries are looked up by key; a missing
+        # section is treated as empty.
+        for reporter in self.config.get("reporters", []):
+            pass
+        for collector in self.config.get("collectors", []):
+            pass
+
+    def setup_event_monitoring(self):
+        """
+        Set up all EventMonitors and EventHandlers (not implemented yet).
+        """
+        #TODO: Create and configure all EventMonitors
+        #TODO: Create and configure all EventHandlers
+        #TODO: Assign each EventHandler to the correct EventMonitor
+        #TODO: Assign each EventMonitor to the correct DataCollector
+        pass
+
+    def setup_consumer(self):
+        """
+        Set up and configure the consumer.
+
+        Does nothing when the config is empty or a consumer already exists.
+        Uses config["message_handler"] when present, otherwise
+        self.route_message.
+        """
+        if self.config and self.consumer is None:
+            message_handler = self.config.get("message_handler",
+                                              self.route_message)
+            self.consumer = PikaAsyncConsumer(self.config["url"],
+                                              self.config["exchange"],
+                                              self.config["queue"],
+                                              message_handler,
+                                              exchange_type=self.config["exchange_type"],
+                                              routing_key=self.config["routing_key"]
+                                              )
+
+    def start(self):
+        """
+        Configure and start SimStream.
+        """
+        if self.consumer is None:
+            self.setup()
+        self.start_collecting()
+        #self.consumer.start()
+
+    def stop(self):
+        """
+        Stop all data collection, event monitoring, and message consumption.
+        """
+        # The consumer is only created by setup_consumer(), which may not
+        # have run.
+        if self.consumer is not None:
+            self.consumer.stop()
+        self.stop_collecting()
+
+
+if __name__ == "__main__":
+    # Manual smoke test against a local RabbitMQ server.
+    def print_message(message):
+        """Echo a consumed message and append it to test.out."""
+        print(message)
+        # Append so that earlier messages are kept; the file used to be
+        # opened (and truncated) without ever being written to.
+        with open("test.out", "a") as f:
+            f.write("%s\n" % (message,))
+
+    print(SimStream.DEFAULT_CONFIG_PATH)
+
+    config = {
+        "url": "amqp://guest:guest@localhost:5672",
+        "exchange": "simstream",
+        "queue": "simstream_test",
+        "message_handler": print_message,
+        "routing_key": "test_consumer",
+        "exchange_type": "topic"
+    }
+
+    streamer = SimStream(config=config)
+
+    try:
+        streamer.start()
+    except KeyboardInterrupt:
+        streamer.stop()
[3/4] airavata git commit: Addition of AMQPWSTunnel as an Airavata
module
Posted by sc...@apache.org.
Addition of AMQPWSTunnel as an Airavata module
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/7b14e0fa
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/7b14e0fa
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/7b14e0fa
Branch: refs/heads/develop
Commit: 7b14e0fa95e22db5ad20a2f93273195339d628ed
Parents: 24ced80
Author: Jeff Kinnison <je...@gmail.com>
Authored: Fri Aug 19 17:25:53 2016 -0400
Committer: Jeff Kinnison <je...@gmail.com>
Committed: Fri Aug 19 17:25:53 2016 -0400
----------------------------------------------------------------------
modules/amqpwstunnel/python/amqpwstunnel.py | 583 +++++++++++++++++++++++
modules/amqpwstunnel/python/config.json | 10 +
modules/amqpwstunnel/wstest.html | 157 ++++++
3 files changed, 750 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/7b14e0fa/modules/amqpwstunnel/python/amqpwstunnel.py
----------------------------------------------------------------------
diff --git a/modules/amqpwstunnel/python/amqpwstunnel.py b/modules/amqpwstunnel/python/amqpwstunnel.py
new file mode 100644
index 0000000..af5d68a
--- /dev/null
+++ b/modules/amqpwstunnel/python/amqpwstunnel.py
@@ -0,0 +1,583 @@
+import argparse
+import base64
+import functools
+import json
+import sys
+import uuid
+import weakref
+
+from threading import Thread, Lock
+
+try:
+ from urllib.parse import urlencode
+except ImportError:
+ from urllib import urlencode
+
+import pika
+import tornado.websocket
+import tornado.ioloop
+import tornado.auth
+import tornado.escape
+import tornado.concurrent
+
+
+SETTINGS = {}
+
+
+class Error(Exception):
+    """Root of the exception hierarchy defined by this module."""
+
+
+class ConsumerConfigError(Error):
+    """Signals a problem with the consumer configuration."""
+
+    def __init__(self, message):
+        self.message = message
+
+
+class ConsumerKeyError(Error):
+    """Signals a problem with a consumer key; carries the key in question."""
+
+    def __init__(self, message, key):
+        self.message = message
+        self.key = key
+
+
+class AuthError(Error):
+    """Signals a failure during authentication; carries the response code."""
+
+    def __init__(self, error, code):
+        self.message = error
+        self.code = code
+
+
+
+class PikaAsyncConsumer(Thread):
+
+    """
+    The primary entry point for routing incoming messages to the proper handler.
+
+    Runs an asynchronous RabbitMQ connection on its own daemon thread and
+    forwards every consumed message body to each registered client by
+    calling the client's write_message().
+    """
+
+    def __init__(self, rabbitmq_url, exchange_name, queue_name,
+                 exchange_type="direct", routing_key="#"):
+        """
+        Create a new instance of PikaAsyncConsumer.
+
+        Arguments:
+        rabbitmq_url -- URL to RabbitMQ server
+        exchange_name -- name of RabbitMQ exchange to join
+        queue_name -- name of RabbitMQ queue to join
+
+        Keyword Arguments:
+        exchange_type -- one of 'direct', 'topic', 'fanout', 'headers'
+                         (default 'direct')
+        routing_key -- the routing key that this consumer listens for
+                       (default '#', receives all messages)
+
+        """
+        print("Creating new consumer")
+        super(PikaAsyncConsumer, self).__init__(daemon=True)
+        self._connection = None
+        self._channel = None
+        self._shut_down = False
+        self._consumer_tag = None
+        self._url = rabbitmq_url
+        self._client_list = []  # weak references to the recipient clients
+        self._lock = Lock()  # guards _client_list
+
+        # The following are necessary to guarantee that both the RabbitMQ
+        # server and the consumer know where to look for messages. These
+        # names will be decided before dispatch and should be recorded in a
+        # config file or else on a per-job basis.
+        self._exchange = exchange_name
+        self._exchange_type = exchange_type
+        self._queue = queue_name
+        self._routing_key = routing_key
+
+    def add_client(self, client):
+        """Add a new client to the recipient list.
+
+        Arguments:
+        client -- a reference to the client object to add
+        """
+        with self._lock:
+            # Create a weakref to ensure that cyclic references to
+            # WebSocketHandler objects do not cause problems for garbage
+            # collection
+            self._client_list.append(weakref.ref(client))
+
+    def remove_client(self, client):
+        """Remove a client from the recipient list.
+
+        Removing a client that is not registered is a no-op.
+
+        Arguments:
+        client -- a reference to the client object to remove
+        """
+        with self._lock:
+            for index, ref in enumerate(self._client_list):
+                # Calling the weakref dereferences it to its strong reference
+                if ref() is client:
+                    del self._client_list[index]
+                    break
+
+    def connect(self):
+        """
+        Create an asynchronous connection to the RabbitMQ server at URL.
+
+        """
+        return pika.SelectConnection(pika.URLParameters(self._url),
+                                     on_open_callback=self.on_connection_open,
+                                     on_close_callback=self.on_connection_close,
+                                     stop_ioloop_on_close=False)
+
+    def on_connection_open(self, unused_connection):
+        """
+        Actions to perform when the connection opens. This may not happen
+        immediately, so defer action to this callback.
+
+        Arguments:
+        unused_connection -- the created connection (by this point already
+                             available as self._connection)
+
+        """
+        self._connection.channel(on_open_callback=self.on_channel_open)
+
+    def on_connection_close(self, connection, code, text):
+        """
+        Actions to perform when the connection is closed. After stop() the
+        ioloop is halted; otherwise a reconnect is scheduled in 5 seconds.
+
+        Arguments:
+        connection -- the connection that was closed (same as self._connection)
+        code -- response code from the RabbitMQ server
+        text -- response body from the RabbitMQ server
+
+        """
+        self._channel = None
+        if self._shut_down:
+            self._connection.ioloop.stop()
+        else:
+            self._connection.add_timeout(5, self.reconnect)
+
+    def reconnect(self):
+        """
+        Attempt to reestablish a connection with the RabbitMQ server.
+        """
+        self._connection.ioloop.stop()  # Stop the ioloop to completely close
+
+        if not self._shut_down:  # Connect and restart the ioloop
+            self._connection = self.connect()
+            self._connection.ioloop.start()
+
+    def on_channel_open(self, channel):
+        """
+        Store the opened channel for future use and set up the exchange and
+        queue to be used.
+
+        Arguments:
+        channel -- the Channel instance opened by the Channel.Open RPC
+        """
+        self._channel = channel
+        self._channel.add_on_close_callback(self.on_channel_close)
+        self.declare_exchange()
+
+    def on_channel_close(self, channel, code, text):
+        """
+        Actions to perform when the channel is closed: close the connection.
+
+        Arguments:
+        channel -- the channel that was closed (same as self._channel)
+        code -- response code from the RabbitMQ server
+        text -- response body from the RabbitMQ server
+        """
+        self._connection.close()
+
+    def declare_exchange(self):
+        """
+        Set up the exchange that will route messages to this consumer. Each
+        RabbitMQ exchange is uniquely identified by its name, so it does not
+        matter if the exchange has already been declared.
+        """
+        self._channel.exchange_declare(self.declare_exchange_success,
+                                       self._exchange,
+                                       self._exchange_type)
+
+    def declare_exchange_success(self, unused_connection):
+        """
+        Actions to perform on successful exchange declaration.
+        """
+        self.declare_queue()
+
+    def declare_queue(self):
+        """
+        Set up the queue that will route messages to this consumer. Each
+        RabbitMQ queue can be defined with routing keys to use only one
+        queue for multiple jobs.
+        """
+        self._channel.queue_declare(self.declare_queue_success,
+                                    self._queue)
+
+    def declare_queue_success(self, method_frame):
+        """
+        Actions to perform on successful queue declaration.
+        """
+        self._channel.queue_bind(self.munch,
+                                 self._queue,
+                                 self._exchange,
+                                 self._routing_key
+                                 )
+
+    def munch(self, unused):
+        """
+        Begin consuming messages from the Airavata API server.
+        """
+        self._channel.add_on_cancel_callback(self.cancel_channel)
+        # Name the queue explicitly rather than relying on the default
+        self._consumer_tag = self._channel.basic_consume(self._process_message,
+                                                         queue=self._queue)
+
+    def cancel_channel(self, method_frame):
+        """
+        Close the channel when the server cancels the consumer.
+        """
+        if self._channel is not None:
+            self._channel.close()
+
+    def _process_message(self, ch, method, properties, body):
+        """
+        Receive a message, forward it to every live client, then acknowledge
+        it. Clients that have been garbage-collected are dropped from the
+        recipient list.
+
+        Arguments:
+        ch -- the channel that routed the message
+        method -- delivery information
+        properties -- message properties
+        body -- the message
+        """
+        print("Received Message: %s" % body)
+        # NOTE(review): this runs on the consumer thread, while Tornado
+        # documents WebSocketHandler.write_message as not thread-safe;
+        # consider handing the write to the IOLoop with add_callback.
+        with self._lock:
+            live = []
+            for ref in self._client_list:
+                client = ref()
+                if client is None:
+                    continue  # the client is gone; drop its dead weakref
+                live.append(ref)
+                client.write_message(body)
+            self._client_list = live
+        self._channel.basic_ack(delivery_tag=method.delivery_tag)
+
+    def stop_consuming(self):
+        """
+        Stop the consumer if active.
+        """
+        if self._channel:
+            self._channel.basic_cancel(self.close_channel, self._consumer_tag)
+
+    def close_channel(self, unused):
+        """
+        Close the channel to shut down the consumer and connection.
+        """
+        self._channel.queue_delete(queue=self._queue)
+        self._channel.close()
+
+    def run(self):
+        """
+        Start a connection with the RabbitMQ server.
+        """
+        self._connection = self.connect()
+        self._connection.ioloop.start()
+
+    def stop(self):
+        """
+        Stop an active connection with the RabbitMQ server.
+        """
+        # Must be the flag that on_connection_close/reconnect test, or the
+        # consumer reconnects right after being stopped.
+        self._shut_down = True
+        self.stop_consuming()
+
+
+class Wso2OAuth2Mixin(tornado.auth.OAuth2Mixin):
+    """
+    OAuth2 password-grant authentication against the SciGaP WSO2 identity
+    server. Client credentials and grant type are read from SETTINGS.
+    """
+
+    _OAUTH_AUTHORIZE_URL = "https://idp.scigap.org:9443/oauth2/authorize"
+    _OAUTH_ACCESS_TOKEN_URL = "https://idp.scigap.org:9443/oauth2/token"
+
+    @tornado.auth._auth_return_future
+    def get_authenticated_user(self, username, password, callback=None):
+        """
+        Request an access token for the given user.
+
+        The future resolves to the JSON-decoded token response, or fails
+        with AuthError if the token endpoint reports an error.
+
+        Arguments:
+        username -- the user to authenticate
+        password -- the user's password
+        """
+        print("Authenticating user %s" % (username))
+        http = self.get_auth_http_client()
+        body = urlencode({
+            "client_id": SETTINGS["oauth_client_key"],
+            "client_secret": SETTINGS["oauth_client_secret"],
+            "grant_type": SETTINGS["oauth_grant_type"],
+            "username": username,
+            "password": password
+        })
+        on_response = functools.partial(self._on_access_token, callback)
+        http.fetch(self._OAUTH_ACCESS_TOKEN_URL, on_response,
+                   method="POST", body=body)
+
+    def _on_access_token(self, future, response):
+        """
+        Resolve the authentication future from the token endpoint response.
+
+        Arguments:
+        future -- the future returned to the caller
+        response -- the HTTP response from the token endpoint
+        """
+        if response.error:
+            print(str(response))
+            print(response.body)
+            print(response.error)
+            future.set_exception(AuthError(response.error, response.code))
+            return
+
+        # The successful body carries the access token, so it is
+        # deliberately not printed.
+        future.set_result(tornado.escape.json_decode(response.body))
+
+class AuthHandler(tornado.web.RequestHandler, Wso2OAuth2Mixin):
+    """
+    HTTP endpoint that authenticates a user and stores the resulting access
+    token in secure cookies.
+    """
+
+    def get_current_user(self):
+        """
+        Return the stored access token, or None when there is no valid one.
+
+        The "expires-in" cookie holds the token lifetime in days and bounds
+        the accepted age of the "ws-auth-token" cookie.
+        """
+        expires_in = self.get_secure_cookie("expires-in", max_age_days=SETTINGS['maximum_cookie_age'])
+        print(expires_in)
+        if expires_in:
+            return self.get_secure_cookie("ws-auth-token", max_age_days=float(expires_in))
+        return None
+
+    def set_default_headers(self):
+        """Send plain text by default and allow cross-origin requests."""
+        self.set_header("Content-Type", "text/plain")
+        self.set_header("Access-Control-Allow-Origin", "*")
+        self.set_header("Access-Control-Allow-Headers", "x-requested-with")
+        self.set_header('Access-Control-Allow-Methods', 'POST, GET, OPTIONS')
+
+    def get(self):
+        """Report whether the request carries a valid access token."""
+        if self.get_current_user():
+            self.set_status(200)
+            print("Authenticated")
+            self.write("Authenticated")
+        else:
+            self.set_status(403)
+            print("Not Authenticated")
+            self.write("Not Authenticated")
+
+    @tornado.gen.coroutine
+    def post(self):
+        """
+        Authenticate with the posted "username" and "password" and, on
+        success, set the auth cookies and redirect to the posted "redirect".
+
+        Responds 400 when an argument is missing or empty, and with the
+        token endpoint's status code when authentication fails.
+        """
+        try:
+            username = self.get_body_argument("username")
+            password = self.get_body_argument("password")
+            redirect = self.get_body_argument("redirect")
+            # MissingArgumentError requires the name of the argument
+            if username == "":
+                raise tornado.web.MissingArgumentError("username")
+            if password == "":
+                raise tornado.web.MissingArgumentError("password")
+
+            access = yield self.get_authenticated_user(username, password)
+        except tornado.web.MissingArgumentError:
+            print("Missing an argument")
+            self.set_status(400)
+            self.write("Authentication information missing")
+            return  # do not fall through to the success path
+        except AuthError as e:
+            print("The future freaks me out")
+            self.set_status(e.code)
+            self.set_header("Content-Type", "text/html")
+            # e.message is the HTTP client's error object; write() needs
+            # text, escaped because the response is served as HTML.
+            self.write(tornado.escape.xhtml_escape(str(e.message)))
+            return
+
+        days = (access["expires_in"] / 3600) / 24  # Convert to days
+        print(days)
+        self.set_secure_cookie("ws-auth-token",
+                               access["access_token"],
+                               expires_days=days)
+        # Store the real token lifetime: get_current_user uses this value as
+        # the maximum accepted age of the token cookie.
+        self.set_secure_cookie("expires-in",
+                               str(days),
+                               expires_days=SETTINGS['maximum_cookie_age'])
+        self.write("Success")
+        # NOTE(review): the redirect target comes straight from the request
+        # body (open redirect) -- consider checking it against an allow-list.
+        self.redirect(redirect)
+
+
+
+class AMQPWSHandler(tornado.websocket.WebSocketHandler):
+
+    """
+    Pass messages to a connected WebSockets client.
+
+    A subclass of the Tornado WebSocketHandler class, this class takes no
+    action when receiving a message from the client. Instead, it is associated
+    with an AMQP consumer and writes a message to the client each time one is
+    consumed in the queue.
+    """
+
+    def check_origin(self, origin):
+        """Check the domain origin of the connection request.
+
+        Every origin is currently accepted. This can be made more robust to
+        ensure that connections are only accepted from verified PGAs.
+
+        Arguments:
+        origin -- the value of the Origin HTTP header
+        """
+        return True
+
+    def open(self, resource_type, resource_id):
+        """Associate a new connection with a consumer.
+
+        When a new connection is opened, it is a request to retrieve data
+        from an AMQP queue. The open operation should also do some kind of
+        authentication.
+
+        Arguments:
+        resource_type -- "experiment" or "project" or "data"
+        resource_id -- the Airavata id for the resource
+        """
+        self.stream.set_nodelay(True)
+        self.resource_id = resource_id
+        self.write_message("Opened the connection")
+
+        # TODO: Authenticate first (e.g. require a valid "ws-auth-token"
+        # secure cookie) and close the connection when that fails.
+        self.add_to_consumer()
+
+    def on_message(self, message):
+        """Handle incoming messages from the client.
+
+        Tornado requires subclasses to override this method, however in this
+        case we do not wish to take any action when receiving a message from
+        the client. The purpose of this class is only to push messages to the
+        client.
+        """
+        # Intentionally a no-op. The previous body contained a `yield`
+        # without a coroutine decorator, so none of it ever ran, and it
+        # called get_authenticated_user(), which this class does not define.
+        pass
+
+    def on_close(self):
+        """Detach this client from its consumer when the connection closes."""
+        try:
+            print("Closing connection")
+            self.application.remove_client_from_consumer(self.resource_id, self)
+        except KeyError:
+            print("Error: resource %s does not exist" % self.resource_id)
+        finally:
+            self.close()
+
+    def add_to_consumer(self):
+        """Register this client with the consumer for its resource."""
+        try:
+            self.application.add_client_to_consumer(self.resource_id, self)
+        except AttributeError as e:
+            print("Error: tornado.web.Application object is not AMQPWSTunnel")
+            print(e)
+
+
+class AMQPWSTunnel(tornado.web.Application):
+
+    """
+    Send messages from an AMQP queue to WebSockets clients.
+
+    In addition to the standard Tornado Application class functionality, this
+    class maintains a list of active AMQP consumers and maps WebSocketHandlers
+    to the correct consumers.
+    """
+
+    def __init__(self, consumer_list=None, consumer_config=None, handlers=None,
+                 default_host='', transforms=None, **settings):
+        """
+        Create the application.
+
+        Raises ConsumerConfigError when no consumer configuration is given.
+
+        Keyword Arguments:
+        consumer_list -- mapping of resource id to consumer (default None,
+                         meaning a new empty mapping)
+        consumer_config -- dict with the keys "rabbitmq_url",
+                           "exchange_name", "queue_name" and "exchange_type"
+        handlers, default_host, transforms, settings -- passed through to
+                         tornado.web.Application
+        """
+        # Validate before building the Tornado application
+        if consumer_config is None:
+            raise ConsumerConfigError("No consumer configuration provided")
+
+        print("Starting AMQP-WS-Tunnel application")
+        super(AMQPWSTunnel, self).__init__(handlers=handlers,
+                                           default_host=default_host,
+                                           transforms=transforms,
+                                           **settings)
+
+        self.consumer_list = {} if consumer_list is None else consumer_list
+        self.consumer_config = consumer_config
+
+    def consumer_exists(self, resource_id):
+        """Determine if a consumer exists for a particular resource.
+
+        Arguments:
+        resource_id -- the consumer to find
+        """
+        return resource_id in self.consumer_list
+
+    def add_client_to_consumer(self, resource_id, client):
+        """Add a new client to a consumer's messaging list.
+
+        A consumer for the resource is created and started on first use,
+        with the resource id as its routing key.
+
+        Arguments:
+        resource_id -- the consumer to add to
+        client -- the client to add
+        """
+        if not self.consumer_exists(resource_id):
+            # The configuration is not printed: it holds the RabbitMQ
+            # password and the OAuth client secret.
+            print("Creating new consumer")
+            # NOTE(review): every consumer is given the same queue name, so
+            # consumers for different resources share one queue -- confirm
+            # whether a queue per resource is intended.
+            consumer = PikaAsyncConsumer(self.consumer_config["rabbitmq_url"],
+                                         self.consumer_config["exchange_name"],
+                                         self.consumer_config["queue_name"],
+                                         exchange_type=self.consumer_config["exchange_type"],
+                                         routing_key=resource_id)
+            print("Adding to consumer list")
+            self.consumer_list[resource_id] = consumer
+            print("Starting consumer")
+            consumer.start()
+
+        print("Adding new client to %s" % (resource_id))
+        self.consumer_list[resource_id].add_client(client)
+
+    def remove_client_from_consumer(self, resource_id, client):
+        """Remove a client from a consumer's messaging list.
+
+        Unknown resource ids are ignored.
+
+        Arguments:
+        resource_id -- the consumer to remove from
+        client -- the client to remove
+        """
+        if self.consumer_exists(resource_id):
+            print("Removing client from %s" % (resource_id))
+            self.consumer_list[resource_id].remove_client(client)
+
+    def shutdown(self):
+        """Shut down the application by stopping every consumer."""
+        for consumer in self.consumer_list.values():
+            consumer.stop()
+
+
+
+if __name__ == "__main__":
+    # Usage: amqpwstunnel.py CONFIG_JSON
+    if len(sys.argv) < 2:
+        sys.exit("usage: amqpwstunnel.py CONFIG_JSON")
+
+    with open(sys.argv[1]) as config_file:
+        config = json.load(config_file)
+
+    for key in ("oauth_client_key", "oauth_client_secret",
+                "oauth_grant_type", "maximum_cookie_age"):
+        SETTINGS[key] = config[key]
+
+    settings = {
+        # A fresh secret per process: cookies signed by an earlier run are
+        # no longer valid after a restart.
+        "cookie_secret": base64.b64encode(uuid.uuid4().bytes + uuid.uuid4().bytes),
+        #"xsrf_cookies": True
+    }
+
+    # NOTE(review): debug=True is a development setting -- confirm it is
+    # disabled for deployment.
+    application = AMQPWSTunnel(handlers=[
+                                   (r"/auth", AuthHandler),
+                                   (r"/(experiment)/(.+)", AMQPWSHandler)
+                               ],
+                               consumer_config=config,
+                               debug=True,
+                               **settings)
+
+    application.listen(8888)
+
+    try:
+        tornado.ioloop.IOLoop.current().start()
+    except KeyboardInterrupt:
+        application.shutdown()
http://git-wip-us.apache.org/repos/asf/airavata/blob/7b14e0fa/modules/amqpwstunnel/python/config.json
----------------------------------------------------------------------
diff --git a/modules/amqpwstunnel/python/config.json b/modules/amqpwstunnel/python/config.json
new file mode 100644
index 0000000..b092001
--- /dev/null
+++ b/modules/amqpwstunnel/python/config.json
@@ -0,0 +1,10 @@
+{
+ "rabbitmq_url": "amqp://airavata:airavata@gw56.iu.xsede.org:5672/messaging",
+ "exchange_name": "simstream",
+ "queue_name": "test",
+ "exchange_type": "direct",
+ "oauth_client_key": "y7xgdnNUx6ifOswJTPcqtzw4aOEa",
+ "oauth_client_secret": "CgfbuupAPhaOBSBPSScZUWHNANwa",
+ "oauth_grant_type": "password",
+ "maximum_cookie_age": 1
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/7b14e0fa/modules/amqpwstunnel/wstest.html
----------------------------------------------------------------------
diff --git a/modules/amqpwstunnel/wstest.html b/modules/amqpwstunnel/wstest.html
new file mode 100644
index 0000000..eedbf78
--- /dev/null
+++ b/modules/amqpwstunnel/wstest.html
@@ -0,0 +1,157 @@
+<!DOCTYPE html>
+
+<html lang="en">
+
+<head>
+ <meta charset="utf-8" />
+ <title>AMQP Websockets Test</title>
+
+ <style>
+ #content {
+ width: 100%;
+ min-height: 250px;
+ }
+
+ #ws-url-label, #ws-url-input, #ws-connect-button {
+ display: inline;
+ float: left;
+ margin-right: 10px;
+ }
+
+ #logs {
+ background-color: #888888;
+ width: 50%;
+ overflow-y: auto;
+ list-style: none;
+ padding: 3px;
+ margin-left: auto;
+ margin-right: auto;
+ text-align: center;
+ }
+
+ .log {
+ background-color: #cccccc;
+ display: inline-block;
+ min-height: 30px;
+ width: 90%;
+ border: 1px solid #000000;
+ padding: 3px;
+ margin-left: auto;
+ margin-right: auto;
+ margin-bottom: 5px;
+ text-align: left;
+ }
+ </style>
+</head>
+
+<body>
+
<div id="content">
    <div id="ws-url">
        <label id="ws-url-label" for="ws-url-input">WebSockets URL</label>
        <input type="text" name="ws-url-input" id="ws-url-input" />
        <!-- Credentials read by the page script when "Send" is clicked -->
        <input type="text" name="username" id="username" placeholder="Username" />
        <input type="password" name="password" id="password" placeholder="Password" />
        <button id="ws-connect-button" class="open">Connect</button>
        <button id="ws-send-credentials">Send</button><br />
        <!-- Plain form POST to the auth endpoint. The field names are
             unchanged; the ids carry an "auth-" prefix so that every id in
             the document is unique. The former <frame> wrapper was dropped:
             it is not valid outside a <frameset>. -->
        <form action="http://localhost:8888/auth" method="post">
            <input type="text" name="username" id="auth-username" placeholder="Username" />
            <input type="password" name="password" id="auth-password" placeholder="Password" />
            <!-- NOTE(review): redirect target is a path on the author's machine; adjust before use -->
            <input type="hidden" name="redirect" id="redirect" value="file:///Users/jeffkinnison/development/amqp-ws-tunnel/wstest.html" />
            <input type="submit" value="Auth" />
        </form>
    </div>

    <ul id="logs"><p>Logs</p></ul>
</div>
+
+<script src="https://code.jquery.com/jquery-3.1.0.min.js"
+ integrity="sha256-cCueBR6CsyA4/9szpPfrX3s49M9vUU5BgtiJj06wt/s="
+ crossorigin="anonymous"></script>
+<script type="text/javascript">
+ var ws, open_handler, message_handler, error_handler, close_handler;
+
+ console.log(document.cookie);
+
+ $("#ws-connect-button").on("click", function() {
+ ws = new WebSocket("ws://localhost:8888/experiment/test");
+
+ ws.onopen = function() {
+ var username, password;
+ $("#ws-connect-button").toggleClass("open close").text("Disconnect");
+ $("#ws-url-input").prop("disabled", true);
+ ws.send("hi");
+ // username = $("#username").val();
+ // password = $("#password").val();
+ // ws.send(JSON.stringify({username: username, password: password}));
+ }
+
+ ws.onmessage = function(e) {
+ var msg;
+
+ console.log(e.data);
+
+ //msg = JSON.parse(e.data);
+ // if (msg.hasOwnProperty("logs")) {
+ // for (log in msg.logs) {
+ // if (msg.logs.hasOwnProperty(log)) {
+ // $("#logs").append($('<li class="log">' + log + '</li>'));
+ // }
+ // }
+ // }
+ }
+
+ ws.onclose = function(e) {
+ $("#ws-connect-button").toggleClass("open close").text("Connect");
+ $("#ws-url-input").prop("disabled", false);
+ }
+ });
+
+ $("#ws-send-credentials").on("click", function(e) {
+ uname = $("#username").val();
+ pass = $("#password").val();
+ console.log("Sending credentials");
+ //ws.send("moop");
+ //ws.send(JSON.stringify({username: uname, password: pass}));
+ $.ajax({
+ url: "http://localhost:8888/auth",
+ method: "post",
+ data: {username: uname, password: pass},
+ crossDomain: true,
+ success: function(data) {
+ console.log("Success");
+ console.log(document.cookie);
+ },
+ error: function(e) {
+ console.log(e);
+ },
+ complete: function() {
+ $.ajax({
+ url: "http://localhost:8888/auth",
+ method: "get",
+ crossDomain: true,
+ xhrHeaders: {
+
+ },
+ success: function(data) {
+ console.log(data);
+ },
+ error: function(xhr) {
+ console.log(xhr);
+ }
+ });
+ }
+ });
+ });
+
+ // $("form").on("submit", function(e) {
+ // e.preventDefault();
+ // });
+
+</script>
+
+</body>
+
+</html>
[2/4] airavata git commit: add simstream module to airavata
Posted by sc...@apache.org.
add simstream module to airavata
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/24ced800
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/24ced800
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/24ced800
Branch: refs/heads/develop
Commit: 24ced80054f084a78d86fff36672c834e144d3e9
Parents: 2c7da5e
Author: Jeff Kinnison <je...@gmail.com>
Authored: Fri Jun 10 16:46:02 2016 -0400
Committer: Jeff Kinnison <je...@gmail.com>
Committed: Fri Jun 10 16:46:02 2016 -0400
----------------------------------------------------------------------
modules/simstream/README.md | 18 +
modules/simstream/example/README.md | 9 +
.../simstream/example/logfile_checker/README.md | 23 +
.../example/logfile_checker/generate_logs.sh | 22 +
.../example/logfile_checker/log_consumer.py | 43 ++
.../example/logfile_checker/log_streamer.py | 111 ++++
.../example/logfile_checker/remote_log.slurm | 21 +
.../simstream/example/logfile_checker/test.txt | 657 +++++++++++++++++++
.../simstream/example/mem_streamer/README.md | 17 +
.../example/mem_streamer/memory_consumption.py | 83 +++
.../example/mem_streamer/memory_streamer.py | 46 ++
.../simstream/example/openmm_example/README.md | 33 +
.../application/alanine_dipeptide.py | 55 ++
.../openmm_example/application/input.pdb | 24 +
.../openmm_example/application/trajectory.dcd | 0
.../example/openmm_example/openmm_consumer.py | 8 +
.../openmm_example/openmm_log_consumer.py | 32 +
.../openmm_example/openmm_rmsd_consumer.py | 36 +
.../example/openmm_example/openmm_stream.slurm | 19 +
.../example/openmm_example/openmm_streamer.py | 130 ++++
.../simstream/example/openmm_example/test.txt | 0
modules/simstream/example/settings.json | 6 +
modules/simstream/setup.py | 19 +
modules/simstream/simstream/__init__.py | 11 +
modules/simstream/simstream/datacollector.py | 110 ++++
modules/simstream/simstream/datareporter.py | 169 +++++
modules/simstream/simstream/eventhandler.py | 17 +
modules/simstream/simstream/eventmonitor.py | 46 ++
.../simstream/simstream/pikaasyncconsumer.py | 203 ++++++
modules/simstream/simstream/pikaproducer.py | 202 ++++++
modules/simstream/simstream/simstream.py | 167 +++++
31 files changed, 2337 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/24ced800/modules/simstream/README.md
----------------------------------------------------------------------
diff --git a/modules/simstream/README.md b/modules/simstream/README.md
new file mode 100755
index 0000000..9ab1379
--- /dev/null
+++ b/modules/simstream/README.md
@@ -0,0 +1,18 @@
+# simstream
+A utility for user-defined remote system and simulation data monitoring.
+
+## Dependencies
+* pika >= 0.10.0 (`pip install pika`)
+* A running, accessible instance of RabbitMQ server
+
+## Installation
+1. Clone this repository
+2. `python setup.py install`
+
+## Running the Example
+The example runs a simple collector that records the maximum memory used by the server (MB) and a timestamp. It also generates a plot of the results.
+
+1. Edit `example/mem_streamer/memory_consumption.py` and `example/mem_streamer/memory_streamer.py` with the correct RabbitMQ settings
+2. From the repository root, run `python example/mem_streamer/memory_consumption.py`
+3. Open a new terminal session and run `python example/mem_streamer/memory_streamer.py`
+4. Memory usage information should now be collected in the current terminal and received in the original terminal
http://git-wip-us.apache.org/repos/asf/airavata/blob/24ced800/modules/simstream/example/README.md
----------------------------------------------------------------------
diff --git a/modules/simstream/example/README.md b/modules/simstream/example/README.md
new file mode 100755
index 0000000..23f36d5
--- /dev/null
+++ b/modules/simstream/example/README.md
@@ -0,0 +1,9 @@
+# SimStream Examples
+
+This directory contains several examples showcasing the functionality of SimStream. To run them, download and install Python 2.7/3.5, install SimStream using setup.py, and modify the settings.json file to match your RabbitMQ server settings.
+
+## The Examples
+
+* mem_streamer: Stream max RSS memory consumed by a basic SimStream utility
+* logfile_checker: Collect, filter, and stream tagged log file entries
+* openmm_example: Run a molecular dynamics simulation and return log information and system state measured by root mean squared deviation
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/airavata/blob/24ced800/modules/simstream/example/logfile_checker/README.md
----------------------------------------------------------------------
diff --git a/modules/simstream/example/logfile_checker/README.md b/modules/simstream/example/logfile_checker/README.md
new file mode 100755
index 0000000..30ed071
--- /dev/null
+++ b/modules/simstream/example/logfile_checker/README.md
@@ -0,0 +1,23 @@
+# SimStream Example: Logfile Streaming
+
+This example filters log file entries by starting tag and sends them to a remote listener. The listener prints the logs it receives to terminal.
+
+## Instructions
+
+### Start the Publisher
+1. Open a terminal
+2. `cd path/to/simstream/example/logfile_checker`
+3. `python log_streamer.py test.txt` (the path of the logfile to watch is a required argument)
+
+### Start the Consumer
+1. Open a terminal
+2. `cd path/to/simstream/example/logfile_checker`
+3. `python log_consumer.py`
+
+### Write Some Logs
+1. Open a terminal
+2. `cd path/to/simstream/example/logfile_checker`
+3. `chmod 700 generate_logs.sh`
+4. `./generate_logs.sh`
+
+This will write logs to `test.txt`. The Publisher will continuously check for new logs, filter based on the [STATUS] and [ERROR] tags, and send the filtered results to the RabbitMQ server. The Consumer will receive the filtered log entries and print them to the terminal.
http://git-wip-us.apache.org/repos/asf/airavata/blob/24ced800/modules/simstream/example/logfile_checker/generate_logs.sh
----------------------------------------------------------------------
diff --git a/modules/simstream/example/logfile_checker/generate_logs.sh b/modules/simstream/example/logfile_checker/generate_logs.sh
new file mode 100755
index 0000000..5fb7aa0
--- /dev/null
+++ b/modules/simstream/example/logfile_checker/generate_logs.sh
@@ -0,0 +1,22 @@
#!/usr/bin/env bash
#
# Append a short burst of sample log entries to a logfile, pausing between
# groups so that a process polling the file sees them arrive over time.
#
# Usage: ./generate_logs.sh [outfile]    (outfile defaults to test.txt)

outfile="${1:-test.txt}"

# The variable is quoted everywhere so paths containing spaces work.
echo "[STATUS] Starting logfile generator" >> "$outfile"

sleep 2

{
    echo "[STATUS] Doing stuff"
    echo "Stuff that doesn't need to be reported"
    echo "Stuff that also doesn't need to be reported"
    echo "[DATA] 7.267"
} >> "$outfile"

sleep 2

{
    echo "[STATUS] Doing more stuff"
    echo "Yet more stuff that doesn't need to be reported"
    echo "[ERROR] Some non-fatal error that the user should know about"
} >> "$outfile"

sleep 2

echo "[STATUS] Finished generating logs" >> "$outfile"
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/airavata/blob/24ced800/modules/simstream/example/logfile_checker/log_consumer.py
----------------------------------------------------------------------
diff --git a/modules/simstream/example/logfile_checker/log_consumer.py b/modules/simstream/example/logfile_checker/log_consumer.py
new file mode 100755
index 0000000..bf3beac
--- /dev/null
+++ b/modules/simstream/example/logfile_checker/log_consumer.py
@@ -0,0 +1,43 @@
+import json
+from simstream import PikaAsyncConsumer
+
+#settings = {
+# "url": "amqp://guest:guest@localhost:5672",
+# "exchange": "simstream",
+# "queue": "test",
+# "routing_key": "logfile",
+# "exchange_type": "topic"
+#}
+
# Read the connection settings shared by the examples. The path is relative
# to the current working directory, so run this script from its own folder.
with open("../settings.json", 'r') as settings_file:
    settings = json.load(settings_file)

# NOTE(review): "memory" matches the key log_streamer.py publishes with; it
# looks inherited from the memory example - change both scripts together.
settings["routing_key"] = "memory"
+
def print_log_line(body):
    """
    Print each log line contained in a received message.

    Arguments:
    body -- the raw message bytes; expected to hold a JSON-encoded list of
            log lines (a JSON null is accepted and prints nothing)

    Decoding problems are reported on stdout rather than raised.
    """
    try:
        lines = json.loads(body.decode())
    except UnicodeError as e:
        # Must precede ValueError: UnicodeError is a subclass of it.
        print("[Error]: Could not decode from bytes to string: %s"
              % (getattr(e, "reason", e),))
        return
    except ValueError:
        # json raises ValueError on Python 2 and JSONDecodeError (a ValueError
        # subclass) on Python 3.5+, so this one clause covers both.
        print("[Error]: Could not decode %s" % (body,))
        return

    if lines is not None:
        for line in lines:
            print(line)
+
+
# Optional consumer arguments taken from the loaded settings.
consumer_options = {
    "exchange_type": settings["exchange_type"],
    "routing_key": settings["routing_key"],
}

# The consumer is created at import time and hands every received message
# body to print_log_line.
consumer = PikaAsyncConsumer(settings["url"],
                             settings["exchange"],
                             settings["queue"],
                             print_log_line,
                             **consumer_options)

if __name__ == "__main__":
    # Consume until interrupted; Ctrl-C stops the consumer.
    try:
        consumer.start()
    except KeyboardInterrupt:
        consumer.stop()
http://git-wip-us.apache.org/repos/asf/airavata/blob/24ced800/modules/simstream/example/logfile_checker/log_streamer.py
----------------------------------------------------------------------
diff --git a/modules/simstream/example/logfile_checker/log_streamer.py b/modules/simstream/example/logfile_checker/log_streamer.py
new file mode 100755
index 0000000..65f84f0
--- /dev/null
+++ b/modules/simstream/example/logfile_checker/log_streamer.py
@@ -0,0 +1,111 @@
+from simstream import SimStream, DataReporter
+
+import sys, json
+
class LogMonitor(object):
    """
    A callable class that returns unprocessed lines in an open logfile.

    Only lines appended after the first call are reported: the file is opened
    lazily on that call and reading starts at its current end.

    Instance Variables:
    logfile -- the path to the logfile to monitor
    """

    def __init__(self, logfile):
        """
        Set up a monitor for a logfile.

        Arguments:
        logfile -- the path to the logfile to monitor
        """
        self.logfile = logfile
        self._generator = None  # created on the first call
        self._version = sys.version_info[0]  # no longer used; kept for compatibility

    def __call__(self):
        """
        Return the list of lines added to the logfile since the last call.

        The list is empty when nothing new was written, and also when the
        logfile could not be opened.
        """
        if self._generator is None:
            self._generator = self._monitor_logfile()

        # Drain the generator until it signals "nothing more for now" (None).
        return list(iter(self._next, None))

    def _monitor_logfile(self):
        """
        Yield each new stripped line from the logfile, or None at end of file.

        If the file cannot be opened or read, the error is printed and the
        generator finishes.
        """
        try:
            # Make the file persistent for the lifetime of the generator
            with open(self.logfile) as f:
                f.seek(0, 2)  # Move to the end of the file
                while True:
                    # Get the next line or indicate the end of the file
                    line = f.readline()
                    if line:
                        yield line.strip()
                    else:
                        yield None

        except EnvironmentError as e:
            # Handle I/O exceptions in an OS-agnostic way
            print("Error: Could not open file %s: %s" % (self.logfile, e))

    def _next(self):
        """
        Return the generator's next value, or None once it has finished.

        The builtin next() works on both Python 2 and 3, and its default
        keeps a finished generator (unreadable logfile) from raising
        StopIteration into __call__.
        """
        return next(self._generator, None)
+
+
def get_relevant_log_lines(log_lines):
    """
    Keep only the log lines that start with a [STATUS] or [ERROR] tag.

    Arguments:
    log_lines -- an iterable of log line strings (None or empty is allowed)

    Returns a new list with the matching lines in their original order.
    """
    if not log_lines:
        return []

    # A fixed-prefix test needs no regular expression; startswith accepts a
    # tuple of alternatives.
    relevant_tags = ("[STATUS]", "[ERROR]")
    return [line for line in log_lines if line.startswith(relevant_tags)]
+
+
+#settings = {
+# "url": "amqp://guest:guest@localhost:5672",
+# "exchange": "simstream",
+# "queue": "test",
+# "routing_key": "logfile",
+# "exchange_type": "topic"
+#}
+
# Read the connection settings shared by the examples. The path is relative
# to the current working directory, so run this script from its own folder.
with open("../settings.json", 'r') as settings_file:
    settings = json.load(settings_file)

# NOTE(review): "memory" matches the key log_consumer.py binds with; it looks
# inherited from the memory example - change both scripts together.
settings["routing_key"] = "memory"
+
if __name__ == "__main__":
    # Entry point: watch the logfile named on the command line and stream its
    # [STATUS]/[ERROR] lines using the loaded settings.
    if len(sys.argv) < 2:
        sys.exit("usage: %s LOGFILE" % sys.argv[0])
    logfile = sys.argv[1]

    log_reporter = DataReporter()
    # LogMonitor collects the new lines; get_relevant_log_lines filters them
    # before they are sent.
    log_reporter.add_collector("logger",
                               LogMonitor(logfile),
                               settings["url"],
                               settings["exchange"],
                               limit=10,
                               interval=2,
                               exchange_type=settings["exchange_type"],
                               postprocessor=get_relevant_log_lines)

    log_reporter.start_streaming("logger", settings["routing_key"])

    streamer = SimStream(config=settings, reporters={"log_reporter": log_reporter})
    streamer.setup()

    try:
        streamer.start()
    except KeyboardInterrupt:
        # Ctrl-C: stop the streamer.
        streamer.stop()
http://git-wip-us.apache.org/repos/asf/airavata/blob/24ced800/modules/simstream/example/logfile_checker/remote_log.slurm
----------------------------------------------------------------------
diff --git a/modules/simstream/example/logfile_checker/remote_log.slurm b/modules/simstream/example/logfile_checker/remote_log.slurm
new file mode 100644
index 0000000..55834e9
--- /dev/null
+++ b/modules/simstream/example/logfile_checker/remote_log.slurm
@@ -0,0 +1,21 @@
#!/usr/bin/env bash
#
# Slurm batch script for the logfile_checker example: starts log_streamer.py
# in the background on test.txt, then keeps appending sample log entries with
# generate_logs.sh until the job is stopped.

#SBATCH -J remote_logger # Job name
#SBATCH -o remote_logger.o%j # Name of stdout output file(%j expands to jobId)
#SBATCH -e remote_logger.o%j # Name of stderr output file; same name as -o, so both streams share one file
#SBATCH -p development # Partition (queue) to submit to
#SBATCH -t 00:10:00 # Run time limit (hh:mm:ss) - 10 minutes
#SBATCH -n 1 # Number of tasks

# NOTE(review): user-specific modulefiles path; presumably this supplies the
# Python environment the streamer needs - adjust for your own account.
module use "/home1/03947/tg832463/modulefiles"
module load openmm

# Make sure the logfile exists before the streamer tries to open it.
touch test.txt

# Run the streamer in the background so the generator loop below can proceed.
python log_streamer.py test.txt &

# Endless loop: one burst of sample logs, then a 5 second pause. Nothing in
# this script ends the loop or stops the background streamer.
while true; do
    bash generate_logs.sh
    sleep 5
done
+
http://git-wip-us.apache.org/repos/asf/airavata/blob/24ced800/modules/simstream/example/logfile_checker/test.txt
----------------------------------------------------------------------
diff --git a/modules/simstream/example/logfile_checker/test.txt b/modules/simstream/example/logfile_checker/test.txt
new file mode 100755
index 0000000..2ffb48c
--- /dev/null
+++ b/modules/simstream/example/logfile_checker/test.txt
@@ -0,0 +1,657 @@
+[STATUS] Starting logfile generator
+[STATUS] Doing stuff
+Stuff that doesn't need to be reported
+Stuff that also doesn't need to be reported
+[DATA] 7.267
+[STATUS] Doing more stuff
+Yet more stuff that doesn't need to be reported
+[ERROR] Some non-fatal error that the user should know about
+[STATUS] Finished generating logs
+[STATUS] Starting logfile generator
+[STATUS] Doing stuff
+Stuff that doesn't need to be reported
+Stuff that also doesn't need to be reported
+[DATA] 7.267
+[STATUS] Doing more stuff
+Yet more stuff that doesn't need to be reported
+[ERROR] Some non-fatal error that the user should know about
+[STATUS] Finished generating logs
+[STATUS] Starting logfile generator
+[STATUS] Doing stuff
+Stuff that doesn't need to be reported
+Stuff that also doesn't need to be reported
+[DATA] 7.267
+[STATUS] Doing more stuff
+Yet more stuff that doesn't need to be reported
+[ERROR] Some non-fatal error that the user should know about
+[STATUS] Finished generating logs
+[STATUS] Starting logfile generator
+[STATUS] Doing stuff
+Stuff that doesn't need to be reported
+Stuff that also doesn't need to be reported
+[DATA] 7.267
+[STATUS] Doing more stuff
+Yet more stuff that doesn't need to be reported
+[ERROR] Some non-fatal error that the user should know about
+[STATUS] Finished generating logs
+[STATUS] Starting logfile generator
+[STATUS] Doing stuff
+Stuff that doesn't need to be reported
+Stuff that also doesn't need to be reported
+[DATA] 7.267
+[STATUS] Doing more stuff
+Yet more stuff that doesn't need to be reported
+[ERROR] Some non-fatal error that the user should know about
+[STATUS] Finished generating logs
+[STATUS] Starting logfile generator
+[STATUS] Doing stuff
+Stuff that doesn't need to be reported
+Stuff that also doesn't need to be reported
+[DATA] 7.267
+[STATUS] Doing more stuff
+Yet more stuff that doesn't need to be reported
+[ERROR] Some non-fatal error that the user should know about
+[STATUS] Finished generating logs
+[STATUS] Starting logfile generator
+[STATUS] Doing stuff
+Stuff that doesn't need to be reported
+Stuff that also doesn't need to be reported
+[DATA] 7.267
+[STATUS] Doing more stuff
+Yet more stuff that doesn't need to be reported
+[ERROR] Some non-fatal error that the user should know about
+[STATUS] Finished generating logs
+[STATUS] Starting logfile generator
+[STATUS] Doing stuff
+Stuff that doesn't need to be reported
+Stuff that also doesn't need to be reported
+[DATA] 7.267
+[STATUS] Doing more stuff
+Yet more stuff that doesn't need to be reported
+[ERROR] Some non-fatal error that the user should know about
+[STATUS] Finished generating logs
+[STATUS] Starting logfile generator
+[STATUS] Doing stuff
+Stuff that doesn't need to be reported
+Stuff that also doesn't need to be reported
+[DATA] 7.267
+[STATUS] Doing more stuff
+Yet more stuff that doesn't need to be reported
+[ERROR] Some non-fatal error that the user should know about
+[STATUS] Finished generating logs
+[STATUS] Starting logfile generator
+[STATUS] Doing stuff
+Stuff that doesn't need to be reported
+Stuff that also doesn't need to be reported
+[DATA] 7.267
+[STATUS] Doing more stuff
+Yet more stuff that doesn't need to be reported
+[ERROR] Some non-fatal error that the user should know about
+[STATUS] Finished generating logs
+[STATUS] Starting logfile generator
+[STATUS] Doing stuff
+Stuff that doesn't need to be reported
+Stuff that also doesn't need to be reported
+[DATA] 7.267
+[STATUS] Doing more stuff
+Yet more stuff that doesn't need to be reported
+[ERROR] Some non-fatal error that the user should know about
+[STATUS] Finished generating logs
+[STATUS] Starting logfile generator
+[STATUS] Doing stuff
+Stuff that doesn't need to be reported
+Stuff that also doesn't need to be reported
+[DATA] 7.267
+[STATUS] Doing more stuff
+Yet more stuff that doesn't need to be reported
+[ERROR] Some non-fatal error that the user should know about
+[STATUS] Finished generating logs
+[STATUS] Starting logfile generator
+[STATUS] Doing stuff
+Stuff that doesn't need to be reported
+Stuff that also doesn't need to be reported
+[DATA] 7.267
+[STATUS] Doing more stuff
+Yet more stuff that doesn't need to be reported
+[ERROR] Some non-fatal error that the user should know about
+[STATUS] Finished generating logs
+[STATUS] Starting logfile generator
+[STATUS] Doing stuff
+Stuff that doesn't need to be reported
+Stuff that also doesn't need to be reported
+[DATA] 7.267
+[STATUS] Doing more stuff
+Yet more stuff that doesn't need to be reported
+[ERROR] Some non-fatal error that the user should know about
+[STATUS] Finished generating logs
+[STATUS] Starting logfile generator
+[STATUS] Doing stuff
+Stuff that doesn't need to be reported
+Stuff that also doesn't need to be reported
+[DATA] 7.267
+[STATUS] Doing more stuff
+Yet more stuff that doesn't need to be reported
+[ERROR] Some non-fatal error that the user should know about
+[STATUS] Finished generating logs
+[STATUS] Starting logfile generator
+[STATUS] Doing stuff
+Stuff that doesn't need to be reported
+Stuff that also doesn't need to be reported
+[DATA] 7.267
+[STATUS] Doing more stuff
+Yet more stuff that doesn't need to be reported
+[ERROR] Some non-fatal error that the user should know about
+[STATUS] Finished generating logs
+[STATUS] Starting logfile generator
+[STATUS] Doing stuff
+Stuff that doesn't need to be reported
+Stuff that also doesn't need to be reported
+[DATA] 7.267
+[STATUS] Doing more stuff
+Yet more stuff that doesn't need to be reported
+[ERROR] Some non-fatal error that the user should know about
+[STATUS] Finished generating logs
+[STATUS] Starting logfile generator
+[STATUS] Doing stuff
+Stuff that doesn't need to be reported
+Stuff that also doesn't need to be reported
+[DATA] 7.267
+[STATUS] Doing more stuff
+Yet more stuff that doesn't need to be reported
+[ERROR] Some non-fatal error that the user should know about
+[STATUS] Finished generating logs
+[STATUS] Starting logfile generator
+[STATUS] Doing stuff
+Stuff that doesn't need to be reported
+Stuff that also doesn't need to be reported
+[DATA] 7.267
+[STATUS] Doing more stuff
+Yet more stuff that doesn't need to be reported
+[ERROR] Some non-fatal error that the user should know about
+[STATUS] Finished generating logs
+[STATUS] Starting logfile generator
+[STATUS] Doing stuff
+Stuff that doesn't need to be reported
+Stuff that also doesn't need to be reported
+[DATA] 7.267
+[STATUS] Doing more stuff
+Yet more stuff that doesn't need to be reported
+[ERROR] Some non-fatal error that the user should know about
+[STATUS] Finished generating logs
+[STATUS] Starting logfile generator
+[STATUS] Doing stuff
+Stuff that doesn't need to be reported
+Stuff that also doesn't need to be reported
+[DATA] 7.267
+[STATUS] Doing more stuff
+Yet more stuff that doesn't need to be reported
+[ERROR] Some non-fatal error that the user should know about
+[STATUS] Finished generating logs
+[STATUS] Starting logfile generator
+[STATUS] Doing stuff
+Stuff that doesn't need to be reported
+Stuff that also doesn't need to be reported
+[DATA] 7.267
+[STATUS] Doing more stuff
+Yet more stuff that doesn't need to be reported
+[ERROR] Some non-fatal error that the user should know about
+[STATUS] Finished generating logs
+[STATUS] Starting logfile generator
+[STATUS] Doing stuff
+Stuff that doesn't need to be reported
+Stuff that also doesn't need to be reported
+[DATA] 7.267
+[STATUS] Doing more stuff
+Yet more stuff that doesn't need to be reported
+[ERROR] Some non-fatal error that the user should know about
+[STATUS] Finished generating logs
+[STATUS] Starting logfile generator
+[STATUS] Doing stuff
+Stuff that doesn't need to be reported
+Stuff that also doesn't need to be reported
+[DATA] 7.267
+[STATUS] Doing more stuff
+Yet more stuff that doesn't need to be reported
+[ERROR] Some non-fatal error that the user should know about
+[STATUS] Finished generating logs
+[STATUS] Starting logfile generator
+[STATUS] Doing stuff
+Stuff that doesn't need to be reported
+Stuff that also doesn't need to be reported
+[DATA] 7.267
+[STATUS] Doing more stuff
+Yet more stuff that doesn't need to be reported
+[ERROR] Some non-fatal error that the user should know about
+[STATUS] Finished generating logs
+[STATUS] Starting logfile generator
+[STATUS] Doing stuff
+Stuff that doesn't need to be reported
+Stuff that also doesn't need to be reported
+[DATA] 7.267
+[STATUS] Doing more stuff
+Yet more stuff that doesn't need to be reported
+[ERROR] Some non-fatal error that the user should know about
+[STATUS] Finished generating logs
+[STATUS] Starting logfile generator
+[STATUS] Doing stuff
+Stuff that doesn't need to be reported
+Stuff that also doesn't need to be reported
+[DATA] 7.267
+[STATUS] Doing more stuff
+Yet more stuff that doesn't need to be reported
+[ERROR] Some non-fatal error that the user should know about
+[STATUS] Finished generating logs
+[STATUS] Starting logfile generator
+[STATUS] Doing stuff
+Stuff that doesn't need to be reported
+Stuff that also doesn't need to be reported
+[DATA] 7.267
+[STATUS] Doing more stuff
+Yet more stuff that doesn't need to be reported
+[ERROR] Some non-fatal error that the user should know about
+[STATUS] Finished generating logs
+[STATUS] Starting logfile generator
+[STATUS] Doing stuff
+Stuff that doesn't need to be reported
+Stuff that also doesn't need to be reported
+[DATA] 7.267
+[STATUS] Doing more stuff
+Yet more stuff that doesn't need to be reported
+[ERROR] Some non-fatal error that the user should know about
+[STATUS] Finished generating logs
+[STATUS] Starting logfile generator
+[STATUS] Doing stuff
+Stuff that doesn't need to be reported
+Stuff that also doesn't need to be reported
+[DATA] 7.267
+[STATUS] Doing more stuff
+Yet more stuff that doesn't need to be reported
+[ERROR] Some non-fatal error that the user should know about
+[STATUS] Finished generating logs
+[STATUS] Starting logfile generator
+[STATUS] Doing stuff
+Stuff that doesn't need to be reported
+Stuff that also doesn't need to be reported
+[DATA] 7.267
+[STATUS] Doing more stuff
+Yet more stuff that doesn't need to be reported
+[ERROR] Some non-fatal error that the user should know about
+[STATUS] Finished generating logs
+[STATUS] Starting logfile generator
+[STATUS] Doing stuff
+Stuff that doesn't need to be reported
+Stuff that also doesn't need to be reported
+[DATA] 7.267
+[STATUS] Doing more stuff
+Yet more stuff that doesn't need to be reported
+[ERROR] Some non-fatal error that the user should know about
+[STATUS] Finished generating logs
+[STATUS] Starting logfile generator
+[STATUS] Doing stuff
+Stuff that doesn't need to be reported
+Stuff that also doesn't need to be reported
+[DATA] 7.267
+[STATUS] Doing more stuff
+Yet more stuff that doesn't need to be reported
+[ERROR] Some non-fatal error that the user should know about
+[STATUS] Finished generating logs
+[STATUS] Starting logfile generator
+[STATUS] Doing stuff
+Stuff that doesn't need to be reported
+Stuff that also doesn't need to be reported
+[DATA] 7.267
+[STATUS] Doing more stuff
+Yet more stuff that doesn't need to be reported
+[ERROR] Some non-fatal error that the user should know about
+[STATUS] Finished generating logs
+[STATUS] Starting logfile generator
+[STATUS] Doing stuff
+Stuff that doesn't need to be reported
+Stuff that also doesn't need to be reported
+[DATA] 7.267
+[STATUS] Doing more stuff
+Yet more stuff that doesn't need to be reported
+[ERROR] Some non-fatal error that the user should know about
+[STATUS] Finished generating logs
+[STATUS] Starting logfile generator
+[STATUS] Doing stuff
+Stuff that doesn't need to be reported
+Stuff that also doesn't need to be reported
+[DATA] 7.267
+[STATUS] Doing more stuff
+Yet more stuff that doesn't need to be reported
+[ERROR] Some non-fatal error that the user should know about
+[STATUS] Finished generating logs
+[STATUS] Starting logfile generator
+[STATUS] Doing stuff
+Stuff that doesn't need to be reported
+Stuff that also doesn't need to be reported
+[DATA] 7.267
+[STATUS] Doing more stuff
+Yet more stuff that doesn't need to be reported
+[ERROR] Some non-fatal error that the user should know about
+[STATUS] Finished generating logs
+[STATUS] Starting logfile generator
+[STATUS] Doing stuff
+Stuff that doesn't need to be reported
+Stuff that also doesn't need to be reported
+[DATA] 7.267
+[STATUS] Doing more stuff
+Yet more stuff that doesn't need to be reported
+[ERROR] Some non-fatal error that the user should know about
+[STATUS] Finished generating logs
+[STATUS] Starting logfile generator
+[STATUS] Doing stuff
+Stuff that doesn't need to be reported
+Stuff that also doesn't need to be reported
+[DATA] 7.267
+[STATUS] Doing more stuff
+Yet more stuff that doesn't need to be reported
+[ERROR] Some non-fatal error that the user should know about
+[STATUS] Finished generating logs
+[STATUS] Starting logfile generator
+[STATUS] Doing stuff
+Stuff that doesn't need to be reported
+Stuff that also doesn't need to be reported
+[DATA] 7.267
+[STATUS] Doing more stuff
+Yet more stuff that doesn't need to be reported
+[ERROR] Some non-fatal error that the user should know about
+[STATUS] Finished generating logs
+[STATUS] Starting logfile generator
+[STATUS] Doing stuff
+Stuff that doesn't need to be reported
+Stuff that also doesn't need to be reported
+[DATA] 7.267
+[STATUS] Doing more stuff
+Yet more stuff that doesn't need to be reported
+[ERROR] Some non-fatal error that the user should know about
+[STATUS] Finished generating logs
+[STATUS] Starting logfile generator
+[STATUS] Doing stuff
+Stuff that doesn't need to be reported
+Stuff that also doesn't need to be reported
+[DATA] 7.267
+[STATUS] Doing more stuff
+Yet more stuff that doesn't need to be reported
+[ERROR] Some non-fatal error that the user should know about
+[STATUS] Finished generating logs
+[STATUS] Starting logfile generator
+[STATUS] Doing stuff
+Stuff that doesn't need to be reported
+Stuff that also doesn't need to be reported
+[DATA] 7.267
+[STATUS] Doing more stuff
+Yet more stuff that doesn't need to be reported
+[ERROR] Some non-fatal error that the user should know about
+[STATUS] Finished generating logs
+[STATUS] Starting logfile generator
+[STATUS] Doing stuff
+Stuff that doesn't need to be reported
+Stuff that also doesn't need to be reported
+[DATA] 7.267
+[STATUS] Doing more stuff
+Yet more stuff that doesn't need to be reported
+[ERROR] Some non-fatal error that the user should know about
+[STATUS] Finished generating logs
+[STATUS] Starting logfile generator
+[STATUS] Doing stuff
+Stuff that doesn't need to be reported
+Stuff that also doesn't need to be reported
+[DATA] 7.267
+[STATUS] Doing more stuff
+Yet more stuff that doesn't need to be reported
+[ERROR] Some non-fatal error that the user should know about
+[STATUS] Finished generating logs
+[STATUS] Starting logfile generator
+[STATUS] Doing stuff
+Stuff that doesn't need to be reported
+Stuff that also doesn't need to be reported
+[DATA] 7.267
+[STATUS] Doing more stuff
+Yet more stuff that doesn't need to be reported
+[ERROR] Some non-fatal error that the user should know about
+[STATUS] Finished generating logs
+[STATUS] Starting logfile generator
+[STATUS] Doing stuff
+Stuff that doesn't need to be reported
+Stuff that also doesn't need to be reported
+[DATA] 7.267
+[STATUS] Doing more stuff
+Yet more stuff that doesn't need to be reported
+[ERROR] Some non-fatal error that the user should know about
+[STATUS] Finished generating logs
+[STATUS] Starting logfile generator
+[STATUS] Doing stuff
+Stuff that doesn't need to be reported
+Stuff that also doesn't need to be reported
+[DATA] 7.267
+[STATUS] Doing more stuff
+Yet more stuff that doesn't need to be reported
+[ERROR] Some non-fatal error that the user should know about
+[STATUS] Finished generating logs
+[STATUS] Starting logfile generator
+[STATUS] Doing stuff
+Stuff that doesn't need to be reported
+Stuff that also doesn't need to be reported
+[DATA] 7.267
+[STATUS] Doing more stuff
+Yet more stuff that doesn't need to be reported
+[ERROR] Some non-fatal error that the user should know about
+[STATUS] Finished generating logs
+[STATUS] Starting logfile generator
+[STATUS] Doing stuff
+Stuff that doesn't need to be reported
+Stuff that also doesn't need to be reported
+[DATA] 7.267
+[STATUS] Doing more stuff
+Yet more stuff that doesn't need to be reported
+[ERROR] Some non-fatal error that the user should know about
+[STATUS] Finished generating logs
+[STATUS] Starting logfile generator
+[STATUS] Doing stuff
+Stuff that doesn't need to be reported
+Stuff that also doesn't need to be reported
+[DATA] 7.267
+[STATUS] Doing more stuff
+Yet more stuff that doesn't need to be reported
+[ERROR] Some non-fatal error that the user should know about
+[STATUS] Finished generating logs
+[STATUS] Starting logfile generator
+[STATUS] Doing stuff
+Stuff that doesn't need to be reported
+Stuff that also doesn't need to be reported
+[DATA] 7.267
+[STATUS] Doing more stuff
+Yet more stuff that doesn't need to be reported
+[ERROR] Some non-fatal error that the user should know about
+[STATUS] Finished generating logs
+[STATUS] Starting logfile generator
+[STATUS] Doing stuff
+Stuff that doesn't need to be reported
+Stuff that also doesn't need to be reported
+[DATA] 7.267
+[STATUS] Doing more stuff
+Yet more stuff that doesn't need to be reported
+[ERROR] Some non-fatal error that the user should know about
+[STATUS] Finished generating logs
+[STATUS] Starting logfile generator
+[STATUS] Doing stuff
+Stuff that doesn't need to be reported
+Stuff that also doesn't need to be reported
+[DATA] 7.267
+[STATUS] Doing more stuff
+Yet more stuff that doesn't need to be reported
+[ERROR] Some non-fatal error that the user should know about
+[STATUS] Finished generating logs
+[STATUS] Starting logfile generator
+[STATUS] Doing stuff
+Stuff that doesn't need to be reported
+Stuff that also doesn't need to be reported
+[DATA] 7.267
+[STATUS] Doing more stuff
+Yet more stuff that doesn't need to be reported
+[ERROR] Some non-fatal error that the user should know about
+[STATUS] Finished generating logs
+[STATUS] Starting logfile generator
+[STATUS] Doing stuff
+Stuff that doesn't need to be reported
+Stuff that also doesn't need to be reported
+[DATA] 7.267
+[STATUS] Doing more stuff
+Yet more stuff that doesn't need to be reported
+[ERROR] Some non-fatal error that the user should know about
+[STATUS] Finished generating logs
+[STATUS] Starting logfile generator
+[STATUS] Doing stuff
+Stuff that doesn't need to be reported
+Stuff that also doesn't need to be reported
+[DATA] 7.267
+[STATUS] Doing more stuff
+Yet more stuff that doesn't need to be reported
+[ERROR] Some non-fatal error that the user should know about
+[STATUS] Finished generating logs
+[STATUS] Starting logfile generator
+[STATUS] Doing stuff
+Stuff that doesn't need to be reported
+Stuff that also doesn't need to be reported
+[DATA] 7.267
+[STATUS] Doing more stuff
+Yet more stuff that doesn't need to be reported
+[ERROR] Some non-fatal error that the user should know about
+[STATUS] Finished generating logs
+[STATUS] Starting logfile generator
+[STATUS] Doing stuff
+Stuff that doesn't need to be reported
+Stuff that also doesn't need to be reported
+[DATA] 7.267
+[STATUS] Doing more stuff
+Yet more stuff that doesn't need to be reported
+[ERROR] Some non-fatal error that the user should know about
+[STATUS] Finished generating logs
+[STATUS] Starting logfile generator
+[STATUS] Doing stuff
+Stuff that doesn't need to be reported
+Stuff that also doesn't need to be reported
+[DATA] 7.267
+[STATUS] Doing more stuff
+Yet more stuff that doesn't need to be reported
+[ERROR] Some non-fatal error that the user should know about
+[STATUS] Finished generating logs
+[STATUS] Starting logfile generator
+[STATUS] Doing stuff
+Stuff that doesn't need to be reported
+Stuff that also doesn't need to be reported
+[DATA] 7.267
+[STATUS] Doing more stuff
+Yet more stuff that doesn't need to be reported
+[ERROR] Some non-fatal error that the user should know about
+[STATUS] Finished generating logs
+[STATUS] Starting logfile generator
+[STATUS] Doing stuff
+Stuff that doesn't need to be reported
+Stuff that also doesn't need to be reported
+[DATA] 7.267
+[STATUS] Doing more stuff
+Yet more stuff that doesn't need to be reported
+[ERROR] Some non-fatal error that the user should know about
+[STATUS] Finished generating logs
+[STATUS] Starting logfile generator
+[STATUS] Doing stuff
+Stuff that doesn't need to be reported
+Stuff that also doesn't need to be reported
+[DATA] 7.267
+[STATUS] Doing more stuff
+Yet more stuff that doesn't need to be reported
+[ERROR] Some non-fatal error that the user should know about
+[STATUS] Finished generating logs
+[STATUS] Starting logfile generator
+[STATUS] Doing stuff
+Stuff that doesn't need to be reported
+Stuff that also doesn't need to be reported
+[DATA] 7.267
+[STATUS] Doing more stuff
+Yet more stuff that doesn't need to be reported
+[ERROR] Some non-fatal error that the user should know about
+[STATUS] Finished generating logs
+[STATUS] Starting logfile generator
+[STATUS] Doing stuff
+Stuff that doesn't need to be reported
+Stuff that also doesn't need to be reported
+[DATA] 7.267
+[STATUS] Doing more stuff
+Yet more stuff that doesn't need to be reported
+[ERROR] Some non-fatal error that the user should know about
+[STATUS] Finished generating logs
+[STATUS] Starting logfile generator
+[STATUS] Doing stuff
+Stuff that doesn't need to be reported
+Stuff that also doesn't need to be reported
+[DATA] 7.267
+[STATUS] Doing more stuff
+Yet more stuff that doesn't need to be reported
+[ERROR] Some non-fatal error that the user should know about
+[STATUS] Finished generating logs
+[STATUS] Starting logfile generator
+[STATUS] Doing stuff
+Stuff that doesn't need to be reported
+Stuff that also doesn't need to be reported
+[DATA] 7.267
+[STATUS] Doing more stuff
+Yet more stuff that doesn't need to be reported
+[ERROR] Some non-fatal error that the user should know about
+[STATUS] Finished generating logs
+[STATUS] Starting logfile generator
+[STATUS] Doing stuff
+Stuff that doesn't need to be reported
+Stuff that also doesn't need to be reported
+[DATA] 7.267
+[STATUS] Doing more stuff
+Yet more stuff that doesn't need to be reported
+[ERROR] Some non-fatal error that the user should know about
+[STATUS] Finished generating logs
+[STATUS] Starting logfile generator
+[STATUS] Doing stuff
+Stuff that doesn't need to be reported
+Stuff that also doesn't need to be reported
+[DATA] 7.267
+[STATUS] Doing more stuff
+Yet more stuff that doesn't need to be reported
+[ERROR] Some non-fatal error that the user should know about
+[STATUS] Finished generating logs
+[STATUS] Starting logfile generator
+[STATUS] Doing stuff
+Stuff that doesn't need to be reported
+Stuff that also doesn't need to be reported
+[DATA] 7.267
+[STATUS] Doing more stuff
+Yet more stuff that doesn't need to be reported
+[ERROR] Some non-fatal error that the user should know about
+[STATUS] Finished generating logs
+[STATUS] Starting logfile generator
+[STATUS] Doing stuff
+Stuff that doesn't need to be reported
+Stuff that also doesn't need to be reported
+[DATA] 7.267
+[STATUS] Doing more stuff
+Yet more stuff that doesn't need to be reported
+[ERROR] Some non-fatal error that the user should know about
+[STATUS] Finished generating logs
+[STATUS] Starting logfile generator
+[STATUS] Doing stuff
+Stuff that doesn't need to be reported
+Stuff that also doesn't need to be reported
+[DATA] 7.267
+[STATUS] Doing more stuff
+Yet more stuff that doesn't need to be reported
+[ERROR] Some non-fatal error that the user should know about
+[STATUS] Finished generating logs
+[STATUS] Starting logfile generator
+[STATUS] Doing stuff
+Stuff that doesn't need to be reported
+Stuff that also doesn't need to be reported
+[DATA] 7.267
+[STATUS] Doing more stuff
+Yet more stuff that doesn't need to be reported
+[ERROR] Some non-fatal error that the user should know about
+[STATUS] Finished generating logs
http://git-wip-us.apache.org/repos/asf/airavata/blob/24ced800/modules/simstream/example/mem_streamer/README.md
----------------------------------------------------------------------
diff --git a/modules/simstream/example/mem_streamer/README.md b/modules/simstream/example/mem_streamer/README.md
new file mode 100755
index 0000000..897b77a
--- /dev/null
+++ b/modules/simstream/example/mem_streamer/README.md
@@ -0,0 +1,17 @@
+# SimStream Example: Memory Usage Streamer
+
+This example collects data on the memory used by the Publisher and sends that data to the Consumer.
+
+## Instructions
+
+### Start the Consumer
+1. Open a terminal
+2. `cd path/to/simstream/example/mem_streamer`
+3. `python memory_consumption.py`
+
+### Start the Producer
+1. Open a new terminal
+2. `cd path/to/simstream/example/mem_streamer`
+3. `python memory_streamer.py`
+
+The Consumer should receive the memory used by the Publisher (KB) and the time that the data was collected (s since UNIX epoch) at a 2-second interval.
http://git-wip-us.apache.org/repos/asf/airavata/blob/24ced800/modules/simstream/example/mem_streamer/memory_consumption.py
----------------------------------------------------------------------
diff --git a/modules/simstream/example/mem_streamer/memory_consumption.py b/modules/simstream/example/mem_streamer/memory_consumption.py
new file mode 100755
index 0000000..b67e975
--- /dev/null
+++ b/modules/simstream/example/mem_streamer/memory_consumption.py
@@ -0,0 +1,83 @@
+import tornado.ioloop
+import tornado.web
+import tornado.websocket
+
+import json
+
+from simstream import PikaAsyncConsumer, PikaProducer
+
+#settings = {
+# "url": "amqp://localhost:5672",
+# "exchange": "simstream",
+# "queue": "remote_node",
+# "routing_key": "test",
+# "exchange_type": "topic"
+#}
+
+# Read the shared example configuration and pin the routing key that this
+# consumer binds with.
+settings = {}
+
+with open("../settings.json", "r") as settings_file:
+    settings = json.load(settings_file)
+    settings["routing_key"] = "memory"
+
+
+def print_result(body):
+    """
+    Print one data point received from the message queue.
+
+    Arguments:
+    body -- the raw message bytes; expected to decode to a JSON object
+            with "x" and "y" entries
+
+    Malformed messages are reported on stdout instead of raising, so one
+    bad message does not propagate an exception into the consumer.
+    """
+    try:
+        data = json.loads(body.decode())
+        print("%s: %s" % (data["x"], data["y"]))
+    except json.decoder.JSONDecodeError as e:
+        # Interpolate with % -- passing the tuple as a second print()
+        # argument would print the placeholders literally.
+        print("[ERROR] Could not decode JSON %s: %s" % (body, e))
+    except UnicodeError as e:
+        print("[ERROR] Could not decode message %s: %s" % (body, e.reason))
+    except (KeyError, TypeError) as e:
+        # Valid JSON that is not an object carrying both "x" and "y".
+        print("[ERROR] Unexpected message format %s: %r" % (body, e))
+
+# Consume messages published to the configured exchange and hand each
+# message body to print_result.
+consumer = PikaAsyncConsumer(settings['url'],
+                             settings['exchange'],
+                             settings['queue'],
+                             print_result,
+                             exchange_type=settings['exchange_type'],
+                             routing_key=settings['routing_key'])
+
+# Shut the consumer down on Ctrl-C instead of dying with a traceback,
+# matching the other example consumers.
+try:
+    consumer.start()
+except KeyboardInterrupt:
+    consumer.stop()
+
+# class PlotHandler(tornado.web.RequestHandler):
+
+# def get(self):
+# pass
+
+
+# class StreamingHandler(tornado.websocket.WebSocketHandler):
+
+# def open(self):
+# self.consumer = PikaAsyncConsumer(settings.url,
+# settings.exchange,
+# settings.queue,
+# self.send_data,
+# routing_keys=settings.routing_key,
+# exchange_type=settings.exchange_type
+# )
+# self.producer = PikaProducer("",
+# remote_settings.url,
+# remote_settings.exchange,
+# remote_settings.queue,
+# remote_settings.routing_key)
+
+# def on_message(self, message):
+# if hasattr(self, producer) and producer is not None:
+# self.producer.send_data(message)
+
+# def on_close(self):
+# self.consumer.stop()
+# self.producer.shutdown()
+# self.consumer = None
+# self.producer = None
+
+# def send_data(self, ch, method, properties, body):
+# self.write_message(body)
+
+# if __name__ == "__main__":
+# app = tornado.web.Application([
+# (r"/plot/(.*)", )
+# (r"/stream/(.*)", StreamingHandler)
+# ])
+# app.listen(8888)
+# tornado.ioloop.IOLoop.current().start()
http://git-wip-us.apache.org/repos/asf/airavata/blob/24ced800/modules/simstream/example/mem_streamer/memory_streamer.py
----------------------------------------------------------------------
diff --git a/modules/simstream/example/mem_streamer/memory_streamer.py b/modules/simstream/example/mem_streamer/memory_streamer.py
new file mode 100755
index 0000000..88f0d9a
--- /dev/null
+++ b/modules/simstream/example/mem_streamer/memory_streamer.py
@@ -0,0 +1,46 @@
+import resource
+import time
+import json
+
+from simstream import SimStream, DataReporter, DataCollector
+
+#settings = {
+# "url": "amqp://localhost:5672",
+# "exchange": "simstream",
+# "queue": "remote_node",
+# "routing_key": "stream_sender",
+# "exchange_type": "topic"
+#}
+
+# Read the shared example configuration and pin the routing key used for
+# the memory stream.
+settings = {}
+
+with open("../settings.json", "r") as settings_file:
+    settings = json.load(settings_file)
+    settings["routing_key"] = "memory"
+
+def mem_callback():
+    """
+    Sample the current time and this process's resource usage.
+
+    Returns a dict with two entries:
+    x -- time.time() multiplied by 1000
+    y -- the ru_maxrss field reported by resource.getrusage for
+         RUSAGE_SELF
+    """
+    timestamp = time.time() * 1000
+    usage = resource.getrusage(resource.RUSAGE_SELF)
+    return {'x': timestamp, 'y': usage.ru_maxrss}
+
+
+def mem_postprocessor(rss):
+    """
+    Scale the memory reading of a collected data point.
+
+    Arguments:
+    rss -- the dict produced by mem_callback; its "y" entry is divided
+           by 1000000 in place
+
+    Returns the same dict that was passed in.
+    """
+    # The data point is a dict, so the reading must be accessed by key;
+    # attribute access (rss.y) raises AttributeError. The float divisor
+    # keeps the result fractional on Python 2 as well.
+    rss["y"] = rss["y"] / 1000000.0
+    return rss
+
+# Collect a memory sample through mem_callback and run every sample
+# through mem_postprocessor. limit and interval are forwarded to the
+# collector (presumably 100 retained points, sampled every 2 seconds --
+# confirm against DataReporter.add_collector).
+mem_reporter = DataReporter()
+mem_reporter.add_collector("rss",
+                           mem_callback,
+                           settings["url"],
+                           settings["exchange"],
+                           limit=100,
+                           interval=2,
+                           postprocessor=mem_postprocessor,
+                           )
+
+# Publish under the routing key configured above ("memory"). The example
+# consumer binds with that key, so streaming to a different key such as
+# "test" would never reach it.
+mem_reporter.start_streaming("rss", settings["routing_key"])
+
+if __name__ == "__main__":
+    resource_streamer = SimStream(reporters={"memory": mem_reporter},
+                                  config=settings)
+    resource_streamer.setup()
+
+    # Stop cleanly on Ctrl-C, as the OpenMM streamer example does.
+    try:
+        resource_streamer.start()
+    except KeyboardInterrupt:
+        resource_streamer.stop()
http://git-wip-us.apache.org/repos/asf/airavata/blob/24ced800/modules/simstream/example/openmm_example/README.md
----------------------------------------------------------------------
diff --git a/modules/simstream/example/openmm_example/README.md b/modules/simstream/example/openmm_example/README.md
new file mode 100644
index 0000000..59a0588
--- /dev/null
+++ b/modules/simstream/example/openmm_example/README.md
@@ -0,0 +1,33 @@
+# SimStream Example: Simulating Alanine Dipeptide
+
+This example runs a simulation of the small molecule Alanine Dipeptide and streams logs and RMSD. RMSD is a metric for judging how similar two molecular states are for the same model.
+
+## Instructions
+
+### Installing OpenMM
+The easiest way to install OpenMM is to use the Anaconda distribution of Python and run
+`conda install -c https://conda.anaconda.org/omnia openmm`
+
+If you do not wish to use Anaconda, install OpenMM from source by following the instructions in the [OpenMM docs](http://docs.openmm.org/7.0.0/userguide/application.html#installing-openmm "OpenMM documentation")
+
+### Start the Logfile Consumer
+1. Open a terminal
+2. `cd path/to/simstream/example/openmm_example`
+3. `python openmm_log_consumer.py`
+
+### Start the RMSD Consumer
+1. Open a terminal
+2. `cd path/to/simstream/example/openmm_example`
+3. `python openmm_rmsd_consumer.py`
+
+### Starting the Producer
+1. Open a new terminal
+2. `cd path/to/simstream/example/openmm_example`
+3. `python openmm_streamer.py application/sim.out application/trajectory.dcd application/input.pdb application/input.pdb`
+
+### Starting the Simulation
+1. Open a new terminal
+2. `cd path/to/simstream/example/openmm_example/application`
+3. `python alanine_dipeptide.py > sim.out`
+
+The Logfile Consumer should now be printing tagged log entries to the screen; the RMSD Consumer should be printing the calculated RMSD each time the trajectory file is written.
http://git-wip-us.apache.org/repos/asf/airavata/blob/24ced800/modules/simstream/example/openmm_example/application/alanine_dipeptide.py
----------------------------------------------------------------------
diff --git a/modules/simstream/example/openmm_example/application/alanine_dipeptide.py b/modules/simstream/example/openmm_example/application/alanine_dipeptide.py
new file mode 100644
index 0000000..8b22b16
--- /dev/null
+++ b/modules/simstream/example/openmm_example/application/alanine_dipeptide.py
@@ -0,0 +1,55 @@
+##########################################################################
+# this script was generated by openmm-builder. to customize it further,
+# you can save the file to disk and edit it with your favorite editor.
+##########################################################################
+
+from __future__ import print_function
+from simtk.openmm import app
+import simtk.openmm as mm
+from simtk import unit
+from sys import stdout
+
+# Every progress message below is printed with a bracketed tag
+# ([START], [STATUS], [ERROR], [END]). The order of these prints relative
+# to the simulation calls is the observable behaviour of this script.
+print("[START] Application is now running")
+
+# Load the molecular model and the force field definitions.
+pdb = app.PDBFile('input.pdb')
+print("[STATUS] Loaded model")
+forcefield = app.ForceField('amber03.xml', 'amber03_obc.xml')
+print("[STATUS] Loaded force field")
+
+# Build the system and the integrator that advances it.
+system = forcefield.createSystem(pdb.topology, nonbondedMethod=app.NoCutoff,
+ constraints=None, rigidWater=False)
+print("[STATUS] Created system")
+integrator = mm.LangevinIntegrator(300*unit.kelvin, 91/unit.picoseconds,
+ 1.0*unit.femtoseconds)
+print("[STATUS] Created integrator")
+
+# Prefer the CPU platform; if looking it up raises, report the problem
+# and fall back to the Reference platform.
+try:
+ platform = mm.Platform.getPlatformByName('CPU')
+except Exception as e:
+ print("[ERROR] Could not load platform CPU. Running Reference")
+ platform = mm.Platform.getPlatformByName("Reference")
+
+simulation = app.Simulation(pdb.topology, system, integrator, platform)
+print("[STATUS] Set up compute platform")
+simulation.context.setPositions(pdb.positions)
+print("[STATUS] Set atomic positions")
+
+# Minimize, then advance 100 steps before any reporter is attached.
+print('[STATUS] Minimizing...')
+simulation.minimizeEnergy()
+print('[STATUS] Equilibrating...')
+simulation.step(100)
+
+# Attach the reporters: a trajectory written to trajectory.dcd and
+# tab-separated state data written to stdout. The value 1000 is
+# presumably the reporting interval in steps -- confirm against the
+# OpenMM reporter documentation.
+simulation.reporters.append(app.DCDReporter('trajectory.dcd', 1000))
+simulation.reporters.append(app.StateDataReporter(stdout, 1000, step=True,
+ potentialEnergy=True, totalEnergy=True, temperature=True, separator='\t'))
+print("[STATUS] Set up reporters")
+
+print('[STATUS] Running Production...')
+
+# Number of steps advanced per loop iteration.
+increment = 1000
+
+# Advance 100000 steps in chunks of `increment`, printing a status line
+# with the starting step before each chunk.
+for i in range(0,100000,increment):
+ print("[STATUS] Step %s" % (i))
+ simulation.step(increment)
+
+print('[END] Done!')
http://git-wip-us.apache.org/repos/asf/airavata/blob/24ced800/modules/simstream/example/openmm_example/application/input.pdb
----------------------------------------------------------------------
diff --git a/modules/simstream/example/openmm_example/application/input.pdb b/modules/simstream/example/openmm_example/application/input.pdb
new file mode 100644
index 0000000..a47f196
--- /dev/null
+++ b/modules/simstream/example/openmm_example/application/input.pdb
@@ -0,0 +1,24 @@
+ATOM 1 1HH3 ACE 1 4.300 13.100 8.600 1.00 0.00
+ATOM 2 CH3 ACE 1 5.200 13.600 8.800 1.00 0.00
+ATOM 3 2HH3 ACE 1 4.900 14.300 9.600 1.00 0.00
+ATOM 4 3HH3 ACE 1 5.600 14.200 7.900 1.00 0.00
+ATOM 5 C ACE 1 6.100 12.500 9.400 1.00 0.00
+ATOM 6 O ACE 1 6.400 12.500 10.600 1.00 0.00
+ATOM 7 N ALA 2 6.600 11.600 8.500 1.00 0.00
+ATOM 8 H ALA 2 6.500 11.600 7.500 1.00 0.00
+ATOM 9 CA ALA 2 7.300 10.400 9.100 1.00 0.00
+ATOM 10 HA ALA 2 7.900 10.700 10.000 1.00 0.00
+ATOM 11 CB ALA 2 6.200 9.500 9.600 1.00 0.00
+ATOM 12 HB1 ALA 2 5.700 9.100 8.800 1.00 0.00
+ATOM 13 HB2 ALA 2 6.600 8.700 10.200 1.00 0.00
+ATOM 14 HB3 ALA 2 5.400 10.000 10.200 1.00 0.00
+ATOM 15 C ALA 2 8.400 9.800 8.200 1.00 0.00
+ATOM 16 O ALA 2 8.400 9.900 7.000 1.00 0.00
+ATOM 17 N NME 3 9.300 9.000 8.800 1.00 0.00
+ATOM 18 H NME 3 9.100 9.000 9.800 1.00 0.00
+ATOM 19 CH3 NME 3 10.500 8.400 8.300 1.00 0.00
+ATOM 20 1HH3 NME 3 10.700 7.700 9.100 1.00 0.00
+ATOM 21 2HH3 NME 3 10.400 8.000 7.300 1.00 0.00
+ATOM 22 3HH3 NME 3 11.300 9.100 8.300 1.00 0.00
+TER
+ENDMDL
http://git-wip-us.apache.org/repos/asf/airavata/blob/24ced800/modules/simstream/example/openmm_example/application/trajectory.dcd
----------------------------------------------------------------------
diff --git a/modules/simstream/example/openmm_example/application/trajectory.dcd b/modules/simstream/example/openmm_example/application/trajectory.dcd
new file mode 100644
index 0000000..e69de29
http://git-wip-us.apache.org/repos/asf/airavata/blob/24ced800/modules/simstream/example/openmm_example/openmm_consumer.py
----------------------------------------------------------------------
diff --git a/modules/simstream/example/openmm_example/openmm_consumer.py b/modules/simstream/example/openmm_example/openmm_consumer.py
new file mode 100644
index 0000000..4ba2763
--- /dev/null
+++ b/modules/simstream/example/openmm_example/openmm_consumer.py
@@ -0,0 +1,8 @@
+import json
+from simstream import PikaAsyncConsumer
+
+def recv_log(body):
+    """
+    Print each log entry contained in a received message.
+
+    Arguments:
+    body -- the raw message bytes; expected to decode to a JSON list of
+            log entries (or null when there is nothing to report)
+
+    Decoding failures are reported on stdout instead of raising.
+    """
+    # A try block needs at least one except/finally clause to be valid
+    # Python; the handlers mirror the ones in openmm_log_consumer.py.
+    try:
+        logs = json.loads(body.decode())
+    except json.decoder.JSONDecodeError:
+        print("[Error]: Could not decode %s" % (body))
+    except UnicodeError as e:
+        print("[Error]: Could not decode from bytes to string: %s" % (e.reason))
+    else:
+        if logs is not None:
+            for log in logs:
+                print(log)
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/airavata/blob/24ced800/modules/simstream/example/openmm_example/openmm_log_consumer.py
----------------------------------------------------------------------
diff --git a/modules/simstream/example/openmm_example/openmm_log_consumer.py b/modules/simstream/example/openmm_example/openmm_log_consumer.py
new file mode 100644
index 0000000..e28043f
--- /dev/null
+++ b/modules/simstream/example/openmm_example/openmm_log_consumer.py
@@ -0,0 +1,32 @@
+import json
+from simstream import PikaAsyncConsumer
+
+# Read the shared example configuration, then pin the routing key that
+# the log consumer binds with.
+settings = {}
+
+with open("../settings.json", "r") as settings_file:
+    settings = json.load(settings_file)
+settings["routing_key"] = "openmm.log"
+
+def print_log_line(body):
+    """
+    Print every log line contained in a received message.
+
+    Arguments:
+    body -- the raw message bytes; expected to decode to a JSON list of
+            lines, or null when there is nothing to print
+
+    Decoding failures are reported on stdout instead of raising.
+    """
+    try:
+        decoded = json.loads(body.decode())
+        if decoded is None:
+            return
+        for entry in decoded:
+            print(entry)
+    except json.decoder.JSONDecodeError as e:
+        print("[Error]: Could not decode %s" % (body))
+    except UnicodeError as e:
+        print("[Error]: Could not decode from bytes to string: %s" % (e.reason))
+
+# Consume from a dedicated "openmm.log" queue (rather than the queue
+# named in settings.json) and pass each message body to print_log_line.
+consumer = PikaAsyncConsumer(settings["url"],
+ settings["exchange"],
+ "openmm.log", # settings["queue"],
+ message_handler=print_log_line,
+ routing_key=settings["routing_key"],
+ exchange_type=settings["exchange_type"])
+
+# Only start consuming when run as a script; Ctrl-C stops the consumer.
+if __name__ == "__main__":
+ try:
+ consumer.start()
+ except KeyboardInterrupt:
+ consumer.stop()
http://git-wip-us.apache.org/repos/asf/airavata/blob/24ced800/modules/simstream/example/openmm_example/openmm_rmsd_consumer.py
----------------------------------------------------------------------
diff --git a/modules/simstream/example/openmm_example/openmm_rmsd_consumer.py b/modules/simstream/example/openmm_example/openmm_rmsd_consumer.py
new file mode 100644
index 0000000..f5d87c6
--- /dev/null
+++ b/modules/simstream/example/openmm_example/openmm_rmsd_consumer.py
@@ -0,0 +1,36 @@
+import json
+from simstream import PikaAsyncConsumer
+
+# Read the shared example configuration, then pin the routing key that
+# the RMSD consumer binds with.
+settings = {}
+
+with open("../settings.json", "r") as settings_file:
+    settings = json.load(settings_file)
+settings["routing_key"] = "openmm.rmsd"
+
+def print_rmsd(body):
+    """
+    Print the first element of every entry in a received message.
+
+    Arguments:
+    body -- the raw message bytes; expected to decode to a JSON list of
+            indexable entries, or null when there is nothing to print
+
+    Decoding failures and empty entries are reported on stdout. When an
+    entry cannot be indexed with 0 because it is a mapping (KeyError),
+    the whole decoded message is printed instead.
+    """
+    try:
+        decoded = json.loads(body.decode())
+        if decoded is None:
+            return
+        for entry in decoded:
+            print(entry[0])
+    except json.decoder.JSONDecodeError as e:
+        print("[Error]: Could not decode %s" % (body))
+    except UnicodeError as e:
+        print("[Error]: Could not decode from bytes to string: %s" % (e.reason))
+    except IndexError as e:
+        print("[Error]: List is empty")
+    except KeyError:
+        print(decoded)
+
+# Consume from a dedicated "openmm.rmsd" queue (rather than the queue
+# named in settings.json) and pass each message body to print_rmsd.
+consumer = PikaAsyncConsumer(settings["url"],
+ settings["exchange"],
+ "openmm.rmsd", # settings["queue"],
+ message_handler=print_rmsd,
+ routing_key=settings["routing_key"],
+ exchange_type=settings["exchange_type"])
+
+# Only start consuming when run as a script; Ctrl-C stops the consumer.
+if __name__ == "__main__":
+ try:
+ consumer.start()
+ except KeyboardInterrupt:
+ consumer.stop()
http://git-wip-us.apache.org/repos/asf/airavata/blob/24ced800/modules/simstream/example/openmm_example/openmm_stream.slurm
----------------------------------------------------------------------
diff --git a/modules/simstream/example/openmm_example/openmm_stream.slurm b/modules/simstream/example/openmm_example/openmm_stream.slurm
new file mode 100644
index 0000000..837e4d4
--- /dev/null
+++ b/modules/simstream/example/openmm_example/openmm_stream.slurm
@@ -0,0 +1,19 @@
+#!/usr/bin/env bash
+
+#SBATCH -J remote_logger # Job name
+#SBATCH -o remote_logger.o%j # Name of stdout output file (%j expands to jobId)
+#SBATCH -e remote_logger.o%j # Name of stderr output file (same file as stdout; %j expands to jobId)
+#SBATCH -p development # Partition (queue) to submit to
+#SBATCH -t 00:10:00 # Run time limit (hh:mm:ss) - 10 minutes
+#SBATCH -n 1 # Number of tasks
+
+# Site-specific module setup, left disabled.
+#module use "/home1/03947/tg832463/modulefiles"
+#module load openmm
+
+touch test.txt
+
+# Start the streamer in the background so it runs alongside the simulation.
+python openmm_streamer.py ./application/sim.out ./application/trajectory.dcd ./application/input.pdb ./application/input.pdb &
+
+# Run the simulation in the foreground with its stdout captured in sim.out,
+# the logfile the streamer was pointed at.
+cd application
+python alanine_dipeptide.py > sim.out
+# Presumably gives the streamer time to send the final output before the job ends.
+sleep 5
http://git-wip-us.apache.org/repos/asf/airavata/blob/24ced800/modules/simstream/example/openmm_example/openmm_streamer.py
----------------------------------------------------------------------
diff --git a/modules/simstream/example/openmm_example/openmm_streamer.py b/modules/simstream/example/openmm_example/openmm_streamer.py
new file mode 100644
index 0000000..da95614
--- /dev/null
+++ b/modules/simstream/example/openmm_example/openmm_streamer.py
@@ -0,0 +1,130 @@
+from simstream import SimStream, DataReporter
+
+import sys, json
+
+class LogMonitor(object):
+    """
+    A callable class that returns unprocessed lines in an open logfile.
+
+    Instance Variables:
+    logfile -- the path to the logfile to monitor
+    """
+
+    def __init__(self, logfile):
+        """
+        Set up a monitor for a logfile.
+
+        Arguments:
+        logfile -- the path to the logfile to monitor
+        """
+        self.logfile = logfile
+        self._generator = None
+        self._version = sys.version_info[0]
+
+    def __call__(self):
+        """
+        Get the lines appended to the logfile since the previous call.
+
+        Returns a list of stripped lines. The list is empty when nothing
+        new has been written or when the logfile could not be opened.
+        """
+        if not self._generator:
+            self._generator = self._monitor_logfile()
+
+        lines = []
+
+        # Drain the generator until it signals the current end of file.
+        line = self._next()
+        while line is not None:
+            lines.append(line)
+            line = self._next()
+        return lines
+
+    def _monitor_logfile(self):
+        """
+        Yield each new line from the logfile, or None at the end of file.
+
+        Only lines written after monitoring starts are yielded, because
+        the file position is moved to the end once the file is opened.
+        """
+        try:
+            # Make the file persistent for the lifetime of the generator
+            with open(self.logfile) as f:
+                f.seek(0, 2)  # Move to the end of the file
+                while True:
+                    # Get the next line or indicate the end of the file
+                    line = f.readline()
+                    if line:
+                        yield line.strip()
+                    else:
+                        yield None
+
+        except EnvironmentError as e:
+            # Handle I/O exceptions in an OS-agnostic way
+            print("Error: Could not open file %s: %s" % (self.logfile, e))
+
+    def _next(self):
+        """
+        Python 2/3 agnostic retrieval of generator values.
+
+        Returns None when the generator is exhausted (which happens after
+        the logfile could not be opened), so callers see "no more lines"
+        instead of a StopIteration.
+        """
+        return next(self._generator, None)
+
+
+def get_relevant_log_lines(log_lines):
+    """
+    Keep only the log lines that start with a bracketed tag.
+
+    Arguments:
+    log_lines -- an iterable of log line strings
+
+    Returns a list of the lines matching the tag pattern (a "[", at least
+    one character, then a "]" at the start of the line), in their
+    original order.
+    """
+    import re
+    tagged = re.compile(r'^\[.+\]')
+    return [entry for entry in log_lines if tagged.match(entry)]
+
+
+def calculate_rmsd(trajectory, topology, reference):
+    """
+    Compute the RMSD of a trajectory against a reference structure.
+
+    Arguments:
+    trajectory -- path of the trajectory file to load
+    topology -- path of the topology file used to load the trajectory
+    reference -- path of the reference structure file
+
+    Returns a dict with "step" (the number of frames in the loaded
+    trajectory) and "rmsd" (the last value returned by mdtraj.rmsd),
+    both converted to strings.
+    """
+    import mdtraj
+    frames = mdtraj.load(trajectory, top=topology)
+    reference_structure = mdtraj.load(reference)
+    deviations = mdtraj.rmsd(frames, reference_structure)
+    return {"step": str(frames.n_frames), "rmsd": str(deviations[-1])}
+
+# Read the shared example configuration used by the streamer.
+settings = {}
+
+with open("../settings.json", "r") as settings_file:
+    settings = json.load(settings_file)
+
+
+# Usage: python openmm_streamer.py LOGFILE TRAJECTORY TOPOLOGY REFERENCE
+if __name__ == "__main__":
+ logfile = sys.argv[1]
+ trajectory = sys.argv[2]
+ topology = sys.argv[3]
+ reference = sys.argv[4]
+
+ # Opening in append mode creates the files if they do not exist yet,
+ # without truncating anything already written to them.
+ open(logfile, 'a').close()
+ open(trajectory, 'a').close()
+
+ # Reporter for tagged log lines: LogMonitor collects new lines and
+ # get_relevant_log_lines filters them before they are sent.
+ log_reporter = DataReporter()
+ log_reporter.add_collector("logger",
+ LogMonitor(logfile),
+ settings["url"],
+ settings["exchange"],
+ limit=10,
+ interval=2,
+ exchange_type="direct", # settings["exchange_type"],
+ postprocessor=get_relevant_log_lines)
+
+ # Second argument is the routing key the log consumer binds with.
+ log_reporter.start_streaming("logger", "openmm.log")
+
+ # Reporter for RMSD: calculate_rmsd is called with the three file paths.
+ rmsd_reporter = DataReporter()
+ rmsd_reporter.add_collector("rmsd",
+ calculate_rmsd,
+ settings["url"],
+ settings["exchange"],
+ limit=1,
+ interval=2,
+ exchange_type="direct", # settings["exchange_type"],
+ callback_args=[trajectory, topology, reference])
+
+ # Second argument is the routing key the RMSD consumer binds with.
+ rmsd_reporter.start_streaming("rmsd", "openmm.rmsd")
+
+ streamer = SimStream(config=settings, reporters={"log_reporter": log_reporter, "rmsd_reporter": rmsd_reporter})
+ streamer.setup()
+
+ # Run until interrupted; Ctrl-C stops the streamer.
+ try:
+ streamer.start()
+ except KeyboardInterrupt:
+ streamer.stop()
http://git-wip-us.apache.org/repos/asf/airavata/blob/24ced800/modules/simstream/example/openmm_example/test.txt
----------------------------------------------------------------------
diff --git a/modules/simstream/example/openmm_example/test.txt b/modules/simstream/example/openmm_example/test.txt
new file mode 100644
index 0000000..e69de29
http://git-wip-us.apache.org/repos/asf/airavata/blob/24ced800/modules/simstream/example/settings.json
----------------------------------------------------------------------
diff --git a/modules/simstream/example/settings.json b/modules/simstream/example/settings.json
new file mode 100644
index 0000000..d354d46
--- /dev/null
+++ b/modules/simstream/example/settings.json
@@ -0,0 +1,6 @@
+{
+ "url": "amqp://guest:guest@localhost:5672",
+ "exchange": "simstream",
+ "queue": "test",
+ "exchange_type": "topic"
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/24ced800/modules/simstream/setup.py
----------------------------------------------------------------------
diff --git a/modules/simstream/setup.py b/modules/simstream/setup.py
new file mode 100755
index 0000000..2f3b3fd
--- /dev/null
+++ b/modules/simstream/setup.py
@@ -0,0 +1,19 @@
+"""
+ Setup for simstream module.
+
+ Author: Jeff Kinnison (jkinniso@nd.edu)
+"""
+
+from setuptools import setup, find_packages
+
+# Package metadata for the simstream distribution; pika is the only
+# declared runtime dependency.
+setup(
+    name="simstream",
+    version="0.1dev",
+    description="",
+    author="Jeff Kinnison",
+    author_email="jkinniso@nd.edu",
+    packages=find_packages(),
+    install_requires=["pika >= 0.10.0"],
+)
http://git-wip-us.apache.org/repos/asf/airavata/blob/24ced800/modules/simstream/simstream/__init__.py
----------------------------------------------------------------------
diff --git a/modules/simstream/simstream/__init__.py b/modules/simstream/simstream/__init__.py
new file mode 100755
index 0000000..9d403cb
--- /dev/null
+++ b/modules/simstream/simstream/__init__.py
@@ -0,0 +1,11 @@
+"""
+Utilities for collecting and distributing system data.
+
+Author: Jeff Kinnison (jkinniso@nd.edu)
+"""
+
+from .simstream import SimStream
+from .datareporter import DataReporter, CollectorExistsException, CollectorDoesNotExistException
+from .datacollector import DataCollector
+from .pikaasyncconsumer import PikaAsyncConsumer
+from .pikaproducer import PikaProducer
http://git-wip-us.apache.org/repos/asf/airavata/blob/24ced800/modules/simstream/simstream/datacollector.py
----------------------------------------------------------------------
diff --git a/modules/simstream/simstream/datacollector.py b/modules/simstream/simstream/datacollector.py
new file mode 100755
index 0000000..f7f99c1
--- /dev/null
+++ b/modules/simstream/simstream/datacollector.py
@@ -0,0 +1,110 @@
+"""
+Utilities for collecting system data.
+
+Author: Jeff Kinnison (jkinniso@nd.edu)
+"""
+
+from .pikaproducer import PikaProducer
+
+from threading import Thread, Lock, Event
+
+import copy
+
+# TODO: Refactor into subclass of Thread
+
class DataCollector(Thread):
    """Collects data by periodically running a user-specified routine.

    Inherits from: threading.Thread

    Instance variables:
    name -- the name of the collector
    limit -- the maximum number of maintained data points
    interval -- the interval (in seconds) at which data collection is performed

    Public methods:
    activate -- start collecting data
    add_routing_key -- add a new streaming endpoint
    deactivate -- stop further data collection
    remove_routing_key -- remove a streaming endpoint
    run -- collect data if active
    stop -- remove every streaming endpoint
    """

    def __init__(self, name, callback, rabbitmq_url, exchange, exchange_type="direct", limit=250, interval=10,
                 postprocessor=None, callback_args=None, postprocessor_args=None):
        """
        Arguments:
        name -- the name of the collector
        callback -- the data collection function to run
        rabbitmq_url -- the url handed to the PikaProducer
        exchange -- the exchange name handed to the PikaProducer

        Keyword arguments:
        exchange_type -- the exchange type handed to the PikaProducer
                         (default 'direct')
        limit -- the maximum number of maintained data points (default 250)
        interval -- the time interval in seconds at which to collect data
                    (default: 10)
        postprocessor -- a function to run on the return value of callback
                         (default None)
        callback_args -- the list of arguments to pass to the callback
                         (default None, meaning no arguments)
        postprocessor_args -- the list of arguments to pass to the
                              postprocessor (default None, meaning no
                              arguments)
        """
        super(DataCollector, self).__init__()
        self.name = name if name else "Unknown Resource"
        self.limit = limit
        self.interval = interval
        self._callback = callback
        # None replaces the former mutable [] defaults so that collectors
        # never share one argument list by accident.
        self._callback_args = [] if callback_args is None else callback_args
        self._postprocessor = postprocessor
        self._postprocessor_args = [] if postprocessor_args is None else postprocessor_args
        self._data = []
        self._data_lock = Lock()
        self._active = False
        # Set by deactivate() so a collector sleeping between two samples
        # wakes up at once instead of waiting out the whole interval.
        self._collection_event = Event()
        self._producer = PikaProducer(rabbitmq_url, exchange, exchange_type=exchange_type, routing_keys=[])

    def activate(self):
        """
        Start collecting data.
        """
        self._collection_event.clear()
        self._active = True

    def add_routing_key(self, key):
        """
        Add a new producer endpoint.

        Arguments:
        key -- the routing key to add to the producer
        """
        self._producer.add_routing_key(key)

    def deactivate(self):
        """
        Stop collecting data and wake the collection loop if it is waiting.
        """
        self._active = False
        self._collection_event.set()

    def remove_routing_key(self, key):
        """
        Remove a producer endpoint and shut the producer down once no
        endpoints remain.

        Arguments:
        key -- the routing key to remove from the producer
        """
        self._producer.remove_routing_key(key)
        # NOTE(review): relies on PikaProducer exposing `endpoints`; confirm
        # against pikaproducer.py.
        if len(self._producer.endpoints) == 0:
            self._producer.shutdown()

    def run(self):
        """
        Run the callback and postprocessing subroutines and record result.

        While the collector is active, waits `interval` seconds, runs the
        callback (and the postprocessor, if any), appends the result to the
        stored data, drops the oldest point when more than `limit` points
        are held, and sends a copy of the stored data through the producer.

        Catches generic exceptions because the function being run is not
        known beforehand.
        """
        # wait() returns True only when deactivate() has set the event, which
        # ends the loop without collecting a further sample.
        while self._active and not self._collection_event.wait(timeout=self.interval):
            try:
                result = self._callback(*self._callback_args)
                if self._postprocessor:
                    result = self._postprocessor(result, *self._postprocessor_args)
                self._data.append(result)
                if len(self._data) > self.limit:
                    self._data.pop(0)
                # Send a snapshot so later appends cannot alter the message.
                self._producer(copy.copy(self._data))

            except Exception as e:
                print("[ERROR] %s" % (e))

    def stop(self):
        """
        Remove every routing key from the producer.
        """
        # Iterate over a copy: remove_routing_key() changes the producer's
        # key collection while we walk it.
        # NOTE(review): relies on PikaProducer exposing `routing_keys`;
        # confirm against pikaproducer.py.
        for key in list(self._producer.routing_keys):
            self.remove_routing_key(key)
http://git-wip-us.apache.org/repos/asf/airavata/blob/24ced800/modules/simstream/simstream/datareporter.py
----------------------------------------------------------------------
diff --git a/modules/simstream/simstream/datareporter.py b/modules/simstream/simstream/datareporter.py
new file mode 100755
index 0000000..156cc08
--- /dev/null
+++ b/modules/simstream/simstream/datareporter.py
@@ -0,0 +1,169 @@
+"""
+Utilities for collecting system data.
+
+Author: Jeff Kinnison (jkinniso@nd.edu)
+"""
+
+# TODO: Refactor to iterate over producers, not collectors. Collectors should
+# execute concurrently.
+# TODO: Add method to deactivate reporter
+
+from threading import Thread, Event
+
+from .datacollector import DataCollector
+
+
class CollectorExistsException(Exception):
    """Raised when a collector is added under a name that is already in use."""
+
+
class CollectorDoesNotExistException(Exception):
    """Raised when a collector is looked up by a name that is not registered."""
+
+
class DataReporter(object):
    """Manages a set of named DataCollectors.

    Instance variables:
    collectors -- a dict of DataCollectors indexed by name

    Public methods:
    add_collector -- create and register a new DataCollector
    start_collecting -- begin data collection for all collectors
    start_collector -- begin data collection for a specific collector
    stop_collecting -- stop all data collection
    stop_collector -- stop a running DataCollector
    start_streaming -- add a routing key to a collector
    stop_streaming -- remove a routing key from a collector
    """

    def __init__(self, collectors=None):
        """
        Keyword arguments:
        collectors -- a dict mapping names to already constructed
                      DataCollectors to register (default None, meaning no
                      collectors)
        """
        super(DataReporter, self).__init__()
        self.collectors = {}
        if collectors:
            # Register the given collectors as they are. DataCollector does
            # not expose its constructor arguments, so they cannot be
            # rebuilt through add_collector().
            for name, collector in collectors.items():
                self.collectors[name] = collector

    def add_collector(self, name, callback, rabbitmq_url, exchange, limit=250, interval=10, postprocessor=None,
                      exchange_type="direct", callback_args=None, postprocessor_args=None):
        """Add a new collector.

        Arguments:
        name -- name of the new DataCollector
        callback -- the data collection callback to run
        rabbitmq_url -- the url passed on to the DataCollector
        exchange -- the exchange name passed on to the DataCollector

        Keyword arguments:
        limit -- the number of data points to store (default 250)
        interval -- the time interval in seconds at which to collect data
                    (default 10)
        postprocessor -- a postprocessing function to run on each data point
                         (default None)
        exchange_type -- the exchange type passed on to the DataCollector
                         (default 'direct')
        callback_args -- a list of arguments to pass to the callback
                         (default None, meaning no arguments)
        postprocessor_args -- a list of arguments to pass to the postprocessor
                              (default None, meaning no arguments)

        Raises:
        CollectorExistsException if a collector named name already exists
        """
        if name in self.collectors:
            raise CollectorExistsException

        # Always hand the collector real lists: None only replaces the
        # former mutable [] defaults of this method.
        self.collectors[name] = DataCollector(
            name,
            callback,
            rabbitmq_url,
            exchange,
            limit=limit,
            interval=interval,
            postprocessor=postprocessor,
            exchange_type=exchange_type,
            callback_args=[] if callback_args is None else callback_args,
            postprocessor_args=[] if postprocessor_args is None else postprocessor_args
        )

    def start_collecting(self):
        """
        Start data collection for all associated collectors.
        """
        for name in self.collectors:
            self.start_collector(name)

    def start_collector(self, name):
        """
        Activate the specified collector and start its thread.

        A RuntimeError raised while starting (for example because the
        collector has already been started) is reported on stdout and not
        propagated.

        Arguments:
        name -- the name of the collector to start

        Raises:
        KeyError if no collector named name exists
        """
        try:
            self.collectors[name].activate()
            self.collectors[name].start()
        except RuntimeError as e:
            print("Error starting collector ", name)
            print(e)

    def stop_collecting(self):
        """
        Stop all collectors.
        """
        for name in self.collectors:
            self.stop_collector(name)

    def stop_collector(self, name):
        """Deactivate the specified collector and wait for it to finish.

        Arguments:
        name -- the name of the collector to stop

        Raises:
        CollectorDoesNotExistException if no collector named name exists
        """
        if name not in self.collectors:
            raise CollectorDoesNotExistException

        try:
            self.collectors[name].deactivate()
            self.collectors[name].join()
        except RuntimeError as e:  # Catch deadlock
            print(e)

    def start_streaming(self, collector_name, routing_key):
        """
        Begin streaming data from a collector to a particular recipient.

        Arguments:
        collector_name -- the collector whose data should be streamed
        routing_key -- the routing key to reach the intended recipient

        Raises:
        CollectorDoesNotExistException if no collector named collector_name
        exists
        """
        if collector_name not in self.collectors:  # Make sure collector exists
            raise CollectorDoesNotExistException
        self.collectors[collector_name].add_routing_key(routing_key)

    def stop_streaming(self, collector_name, routing_key):
        """
        Stop a particular stream.

        Arguments:
        collector_name -- the collector associated with the producer to stop
        routing_key -- the routing key to reach the intended recipient

        Raises:
        CollectorDoesNotExistException if no collector named collector_name
        exists
        """
        if collector_name not in self.collectors:  # Make sure collector exists
            raise CollectorDoesNotExistException
        self.collectors[collector_name].remove_routing_key(routing_key)
http://git-wip-us.apache.org/repos/asf/airavata/blob/24ced800/modules/simstream/simstream/eventhandler.py
----------------------------------------------------------------------
diff --git a/modules/simstream/simstream/eventhandler.py b/modules/simstream/simstream/eventhandler.py
new file mode 100755
index 0000000..9f4f3f2
--- /dev/null
+++ b/modules/simstream/simstream/eventhandler.py
@@ -0,0 +1,17 @@
+"""
+A utility for responding to user-defined events.
+
+Author: Jeff Kinnison (jkinniso@nd.edu)
+"""
+
+
class EventHandler(object):
    """
    A named, callable wrapper around a handler function and its arguments.
    """

    def __init__(self, name, handler, handler_args=None):
        """
        Arguments:
        name -- the name of the handler
        handler -- the function to run when the handler is called

        Keyword arguments:
        handler_args -- the list of arguments to pass to handler
                        (default None, meaning no arguments)
        """
        self.name = name
        self._handler = handler
        # The original evaluated `self._handler_args` without assigning it,
        # which raised AttributeError on every construction.
        self._handler_args = [] if handler_args is None else handler_args

    def __call__(self):
        """
        Run the handler with the stored arguments and return its result.
        """
        return self._handler(*self._handler_args)
http://git-wip-us.apache.org/repos/asf/airavata/blob/24ced800/modules/simstream/simstream/eventmonitor.py
----------------------------------------------------------------------
diff --git a/modules/simstream/simstream/eventmonitor.py b/modules/simstream/simstream/eventmonitor.py
new file mode 100755
index 0000000..d8c79f4
--- /dev/null
+++ b/modules/simstream/simstream/eventmonitor.py
@@ -0,0 +1,46 @@
+"""
+Utility for monitoring collected data.
+
+Author: Jeff Kinnison (jkinniso@nd.edu)
+"""
+
+# TODO: Add method to add handlers
+# TODO: Add method to create PikaProducer
+# TODO: Add method to use PikaProducer to respond to events
+# TODO: Add method to deactivate monitor
+
+
class EventCheckerNotCallableException(Exception):
    """Raised when the event check given to an EventMonitor is not callable."""
+
+
class EventHandlerNotCallableException(Exception):
    """Raised when a handler selected by an EventMonitor is not callable."""
+
+
class EventHandlerDoesNotExistException(Exception):
    """Raised when an EventMonitor is asked to run an unregistered handler."""
+
+
class EventMonitor(object):
    """Checks data for user-defined bounds violations.

    Instance variables:
    handlers -- a dict of EventHandler objects indexed by name
    """

    def __init__(self, event_check, handlers=None):
        """
        Arguments:
        event_check -- a callable that receives a value and returns the
                       names of the handlers to run for it

        Keyword arguments:
        handlers -- a dict of callables indexed by name (default None,
                    meaning a new empty dict)
        """
        self._event_check = event_check
        # A fresh dict per instance: the former {} default was shared by
        # every monitor created without explicit handlers.
        self.handlers = {} if handlers is None else handlers

    def __call__(self, val):
        """
        Run the event check on val and call every handler it names.

        Arguments:
        val -- the value to check

        Raises:
        EventCheckerNotCallableException if the event check is not callable
        EventHandlerDoesNotExistException if a returned name is not a key of
        handlers
        EventHandlerNotCallableException if a selected handler is not callable
        """
        if not callable(self._event_check):
            raise EventCheckerNotCallableException
        self._run_handler(self._event_check(val))

    def _run_handler(self, handler_names):
        """
        Call the handlers registered under handler_names, in order.

        Arguments:
        handler_names -- an iterable of handler names
        """
        for name in handler_names:
            if name not in self.handlers:
                raise EventHandlerDoesNotExistException
            handler = self.handlers[name]
            if not callable(handler):
                raise EventHandlerNotCallableException
            handler()
http://git-wip-us.apache.org/repos/asf/airavata/blob/24ced800/modules/simstream/simstream/pikaasyncconsumer.py
----------------------------------------------------------------------
diff --git a/modules/simstream/simstream/pikaasyncconsumer.py b/modules/simstream/simstream/pikaasyncconsumer.py
new file mode 100755
index 0000000..1c58687
--- /dev/null
+++ b/modules/simstream/simstream/pikaasyncconsumer.py
@@ -0,0 +1,203 @@
+"""
+Streaming utility for system and simulation data.
+
+author: Jeff Kinnison (jkinniso@nd.edu)
+"""
+
+import json
+import pika
+
class PikaAsyncConsumer(object):
    """
    Asynchronous RabbitMQ consumer that passes the body of every received
    message to a user-supplied handler.
    """

    def __init__(self, rabbitmq_url, exchange_name, queue_name, message_handler,
                 exchange_type="direct", routing_key="#"):
        """
        Create a new instance of PikaAsyncConsumer.

        Arguments:
        rabbitmq_url -- URL to RabbitMQ server
        exchange_name -- name of RabbitMQ exchange to join
        queue_name -- name of RabbitMQ queue to join
        message_handler -- callable invoked with the body of each message

        Keyword Arguments:
        exchange_type -- one of 'direct', 'topic', 'fanout', 'headers'
                         (default 'direct')
        routing_key -- the routing key that this consumer listens for
                       (default '#')
        """
        self._connection = None
        self._channel = None
        self._shut_down = False
        self._consumer_tag = None
        self._url = rabbitmq_url
        self._message_handler = message_handler

        # The following are necessary to guarantee that both the RabbitMQ
        # server and the consumer know where to look for messages. These
        # names will be decided before dispatch and should be recorded in a
        # config file or else on a per-job basis.
        self._exchange = exchange_name
        self._exchange_type = exchange_type
        self._queue = queue_name
        self._routing_key = routing_key

    def connect(self):
        """
        Create an asynchronous connection to the RabbitMQ server at URL.

        Returns:
        the new pika.SelectConnection
        """
        return pika.SelectConnection(pika.URLParameters(self._url),
                                     on_open_callback=self.on_connection_open,
                                     on_close_callback=self.on_connection_close,
                                     stop_ioloop_on_close=False)

    def on_connection_open(self, unused_connection):
        """
        Actions to perform when the connection opens. This may not happen
        immediately, so defer action to this callback.

        Arguments:
        unused_connection -- the created connection (by this point already
                             available as self._connection)
        """
        self._connection.channel(on_open_callback=self.on_channel_open)

    def on_connection_close(self, connection, code, text):
        """
        Actions to perform when the connection is closed: stop the ioloop
        if the consumer was stopped on purpose, otherwise schedule a
        reconnection attempt in 5 seconds.

        Arguments:
        connection -- the connection that was closed (same as self._connection)
        code -- response code from the RabbitMQ server
        text -- response body from the RabbitMQ server
        """
        self._channel = None
        if self._shut_down:
            self._connection.ioloop.stop()
        else:
            self._connection.add_timeout(5, self.reconnect)

    def reconnect(self):
        """
        Attempt to reestablish a connection with the RabbitMQ server.
        """
        self._connection.ioloop.stop()  # Stop the ioloop to completely close

        if not self._shut_down:  # Connect and restart the ioloop
            self._connection = self.connect()
            self._connection.ioloop.start()

    def on_channel_open(self, channel):
        """
        Store the opened channel for future use and set up the exchange and
        queue to be used.

        Arguments:
        channel -- the Channel instance opened by the Channel.Open RPC
        """
        self._channel = channel
        self._channel.add_on_close_callback(self.on_channel_close)
        self.declare_exchange()

    def on_channel_close(self, channel, code, text):
        """
        Actions to perform when the channel is closed: close the connection.

        Arguments:
        channel -- the channel that was closed
        code -- response code from the RabbitMQ server
        text -- response body from the RabbitMQ server
        """
        self._connection.close()

    def declare_exchange(self):
        """
        Set up the exchange that will route messages to this consumer. Each
        RabbitMQ exchange is uniquely identified by its name, so it does not
        matter if the exchange has already been declared.
        """
        self._channel.exchange_declare(self.declare_exchange_success,
                                       self._exchange,
                                       self._exchange_type)

    def declare_exchange_success(self, unused_connection):
        """
        Actions to perform on successful exchange declaration.

        Arguments:
        unused_connection -- the value handed to the callback (ignored)
        """
        self.declare_queue()

    def declare_queue(self):
        """
        Set up the queue that will route messages to this consumer. Each
        RabbitMQ queue can be defined with routing keys to use only one
        queue for multiple jobs.
        """
        self._channel.queue_declare(self.declare_queue_success,
                                    self._queue)

    def declare_queue_success(self, method_frame):
        """
        Actions to perform on successful queue declaration: bind the queue
        to the exchange under the configured routing key.

        Arguments:
        method_frame -- the value handed to the callback (ignored)
        """
        self._channel.queue_bind(self.munch,
                                 self._queue,
                                 self._exchange,
                                 self._routing_key)

    def munch(self, unused):
        """
        Begin consuming messages from the Airavata API server.

        Arguments:
        unused -- the value handed to the callback (ignored)
        """
        self._channel.add_on_cancel_callback(self.cancel_channel)
        # Name the queue explicitly instead of relying on the default.
        self._consumer_tag = self._channel.basic_consume(self._process_message,
                                                         queue=self._queue)

    def cancel_channel(self, method_frame):
        """
        Close the channel when the consumer is cancelled.

        Arguments:
        method_frame -- the value handed to the callback (ignored)
        """
        if self._channel is not None:
            self._channel.close()

    def _process_message(self, ch, method, properties, body):
        """
        Receive a message and pass its body to the message handler.

        Arguments:
        ch -- the channel that routed the message
        method -- delivery information
        properties -- message properties
        body -- the message
        """
        print("Received Message: %s" % body)
        self._message_handler(body)
        # NOTE(review): deliveries are not acknowledged here (a basic_ack
        # call was disabled in the original); confirm whether that is
        # intended.

    def stop_consuming(self):
        """
        Stop the consumer if active.
        """
        if self._channel:
            self._channel.basic_cancel(self.close_channel, self._consumer_tag)

    def close_channel(self, unused_frame=None):
        """
        Close the channel to shut down the consumer and connection.

        Keyword Arguments:
        unused_frame -- the value handed over when this method runs as the
                        basic_cancel callback (ignored, default None)
        """
        self._channel.close()

    def start(self):
        """
        Start a connection with the RabbitMQ server.
        """
        self._connection = self.connect()
        self._connection.ioloop.start()

    def stop(self):
        """
        Stop an active connection with the RabbitMQ server.
        """
        # _shut_down is the flag that on_connection_close() and reconnect()
        # check; setting it keeps a deliberate stop from reconnecting.
        self._shut_down = True
        self.stop_consuming()
[4/4] airavata git commit: merging jeffs gsoc project
Posted by sc...@apache.org.
merging jeffs gsoc project
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/ff18106a
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/ff18106a
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/ff18106a
Branch: refs/heads/develop
Commit: ff18106ab2559d053392c6b3e8a2c6e3172f9ad4
Parents: ef310d3 7b14e0f
Author: scnakandala <su...@gmail.com>
Authored: Mon Aug 22 23:36:46 2016 -0400
Committer: scnakandala <su...@gmail.com>
Committed: Mon Aug 22 23:36:46 2016 -0400
----------------------------------------------------------------------
modules/amqpwstunnel/python/amqpwstunnel.py | 583 +++++++++++++++++++++++
modules/amqpwstunnel/python/config.json | 10 +
modules/amqpwstunnel/wstest.html | 157 ++++++
3 files changed, 750 insertions(+)
----------------------------------------------------------------------