Posted to commits@airavata.apache.org by sm...@apache.org on 2017/04/06 19:02:49 UTC

[1/2] airavata-sandbox git commit: adding jeff kinnison gsoc project

Repository: airavata-sandbox
Updated Branches:
  refs/heads/master 690b52977 -> ef2916920


http://git-wip-us.apache.org/repos/asf/airavata-sandbox/blob/ef291692/gsoc2016/simstream/simstream/eventmonitor.py
----------------------------------------------------------------------
diff --git a/gsoc2016/simstream/simstream/eventmonitor.py b/gsoc2016/simstream/simstream/eventmonitor.py
new file mode 100755
index 0000000..f4cbf5d
--- /dev/null
+++ b/gsoc2016/simstream/simstream/eventmonitor.py
@@ -0,0 +1,66 @@
+#
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+"""
+Utility for monitoring collected data.
+
+Author: Jeff Kinnison (jkinniso@nd.edu)
+"""
+
+# TODO: Add method to add handlers
+# TODO: Add method to create PikaProducer
+# TODO: Add method to use PikaProducer to respond to events
+# TODO: Add method to deactivate monitor
+
+
+class EventCheckerNotCallableException(Exception):
+    pass
+
+
+class EventHandlerNotCallableException(Exception):
+    pass
+
+
+class EventHandlerDoesNotExistException(Exception):
+    pass
+
+
+class EventMonitor(object):
+    """Checks data for user-defined bounds violations.
+
+    Instance variables:
+    handlers -- a dict of EventHandler objects indexed by name
+    """
+    def __init__(self, event_check, handlers=None):
+        self._event_check = event_check
+        self.handlers = handlers if handlers is not None else {}
+
+    def __call__(self, val):
+        if not callable(self._event_check):
+            raise EventCheckerNotCallableException
+        self._run_handler(self._event_check(val))
+
+    def _run_handler(self, handler_names):
+        for name in handler_names:
+            if name not in self.handlers:
+                raise EventHandlerDoesNotExistException
+            if not callable(self.handlers[name]):
+                raise EventHandlerNotCallableException
+            self.handlers[name]()

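A minimal sketch of how this EventMonitor is driven (the checker and handler below are hypothetical, not part of this commit): the event_check callable maps an incoming value to a list of handler names, and each named handler is then looked up in the handlers dict and invoked with no arguments.

    def out_of_bounds(val):
        # Hypothetical checker: name the handlers that should fire.
        return ["alert"] if val > 100.0 else []

    def alert():
        # Hypothetical handler run on a bounds violation.
        print("bounds violation detected")

    monitor = EventMonitor(out_of_bounds, handlers={"alert": alert})
    monitor(128.0)  # fires the "alert" handler
    monitor(7.0)    # checker returns no names, so nothing runs
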
http://git-wip-us.apache.org/repos/asf/airavata-sandbox/blob/ef291692/gsoc2016/simstream/simstream/pikaasyncconsumer.py
----------------------------------------------------------------------
diff --git a/gsoc2016/simstream/simstream/pikaasyncconsumer.py b/gsoc2016/simstream/simstream/pikaasyncconsumer.py
new file mode 100755
index 0000000..3500d81
--- /dev/null
+++ b/gsoc2016/simstream/simstream/pikaasyncconsumer.py
@@ -0,0 +1,223 @@
+#
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+"""
+Streaming utility for system and simulation data.
+
+author: Jeff Kinnison (jkinniso@nd.edu)
+"""
+
+import json
+import pika
+
+class PikaAsyncConsumer(object):
+    """
+    The primary entry point for routing incoming messages to the proper handler.
+    """
+
+    def __init__(self, rabbitmq_url, exchange_name, queue_name, message_handler,
+                 exchange_type="direct", routing_key="#"):
+        """
+        Create a new instance of PikaAsyncConsumer.
+
+        Arguments:
+        rabbitmq_url -- URL to RabbitMQ server
+        exchange_name -- name of RabbitMQ exchange to join
+        queue_name -- name of RabbitMQ queue to join
+        message_handler -- callable invoked with the body of each message
+
+        Keyword Arguments:
+        exchange_type -- one of 'direct', 'topic', 'fanout', 'headers'
+                         (default 'direct')
+        routing_key -- the routing key that this consumer listens for
+                       (default '#', which matches all messages on a topic
+                       exchange)
+        """
+        self._connection = None
+        self._channel = None
+        self._shut_down = False
+        self._consumer_tag = None
+        self._url = rabbitmq_url
+        self._message_handler = message_handler
+
+        # The following are necessary to guarantee that both the RabbitMQ
+        # server and this consumer know where to look for messages. These
+        # names will be decided before dispatch and should be recorded in a
+        # config file or else on a per-job basis.
+        self._exchange = exchange_name
+        self._exchange_type = exchange_type
+        self._queue = queue_name
+        self._routing_key = routing_key
+
+    def connect(self):
+        """
+        Create an asynchronous connection to the RabbitMQ server at URL.
+        """
+        return pika.SelectConnection(pika.URLParameters(self._url),
+                                     on_open_callback=self.on_connection_open,
+                                     on_close_callback=self.on_connection_close,
+                                     stop_ioloop_on_close=False)
+
+    def on_connection_open(self, unused_connection):
+        """
+        Actions to perform when the connection opens. This may not happen
+        immediately, so defer action to this callback.
+
+        Arguments:
+        unused_connection -- the created connection (by this point already
+                             available as self._connection)
+        """
+        self._connection.channel(on_open_callback=self.on_channel_open)
+
+    def on_connection_close(self, connection, code, text):
+        """
+        Actions to perform when the connection is unexpectedly closed by the
+        RabbitMQ server.
+
+        Arguments:
+        connection -- the connection that was closed (same as self._connection)
+        code -- response code from the RabbitMQ server
+        text -- response body from the RabbitMQ server
+        """
+        self._channel = None
+        if self._shut_down:
+            self._connection.ioloop.stop()
+        else:
+            self._connection.add_timeout(5, self.reconnect)
+
+    def reconnect(self):
+        """
+        Attempt to reestablish a connection with the RabbitMQ server.
+        """
+        self._connection.ioloop.stop() # Stop the ioloop to completely close
+
+        if not self._shut_down: # Connect and restart the ioloop
+            self._connection = self.connect()
+            self._connection.ioloop.start()
+
+    def on_channel_open(self, channel):
+        """
+        Store the opened channel for future use and set up the exchange and
+        queue to be used.
+
+        Arguments:
+        channel -- the Channel instance opened by the Channel.Open RPC
+        """
+        self._channel = channel
+        self._channel.add_on_close_callback(self.on_channel_close)
+        self.declare_exchange()
+
+
+    def on_channel_close(self, channel, code, text):
+        """
+        Actions to perform when the channel is unexpectedly closed by the
+        RabbitMQ server.
+
+        Arguments:
+        channel -- the channel that was closed (same as self._channel)
+        code -- response code from the RabbitMQ server
+        text -- response body from the RabbitMQ server
+        """
+        self._connection.close()
+
+    def declare_exchange(self):
+        """
+        Set up the exchange that will route messages to this consumer. Each
+        RabbitMQ exchange is uniquely identified by its name, so it does not
+        matter if the exchange has already been declared.
+        """
+        self._channel.exchange_declare(self.declare_exchange_success,
+                                        self._exchange,
+                                        self._exchange_type)
+
+    def declare_exchange_success(self, unused_frame):
+        """
+        Actions to perform on successful exchange declaration.
+        """
+        self.declare_queue()
+
+    def declare_queue(self):
+        """
+        Set up the queue that will route messages to this consumer. Each
+        RabbitMQ queue can be defined with routing keys to use only one
+        queue for multiple jobs.
+        """
+        self._channel.queue_declare(self.declare_queue_success,
+                                    self._queue)
+
+    def declare_queue_success(self, method_frame):
+        """
+        Actions to perform on successful queue declaration.
+        """
+        self._channel.queue_bind(self.munch,
+                                 self._queue,
+                                 self._exchange,
+                                 self._routing_key
+                                )
+
+    def munch(self, unused):
+        """
+        Begin consuming messages from the RabbitMQ server.
+        """
+        self._channel.add_on_cancel_callback(self.cancel_channel)
+        self._consumer_tag = self._channel.basic_consume(self._process_message)
+
+    def cancel_channel(self, method_frame):
+        """
+        Close the channel if the consumer is cancelled remotely.
+        """
+        if self._channel is not None:
+            self._channel.close()
+
+    def _process_message(self, ch, method, properties, body):
+        """
+        Receive and verify a message, then pass it to the router.
+
+        Arguments:
+        ch -- the channel that routed the message
+        method -- delivery information
+        properties -- message properties
+        body -- the message
+        """
+        print("Received Message: %s" % body)
+        self._message_handler(body)
+        #self._channel.basic_ack(delivery_tag=method.delivery_tag)
+
+    def stop_consuming(self):
+        """
+        Stop the consumer if active.
+        """
+        if self._channel:
+            self._channel.basic_cancel(self.close_channel, self._consumer_tag)
+
+    def close_channel(self, unused_frame=None):
+        """
+        Close the channel to shut down the consumer and connection.
+        """
+        self._channel.close()
+
+    def start(self):
+        """
+        Start a connection with the RabbitMQ server.
+        """
+        self._connection = self.connect()
+        self._connection.ioloop.start()
+
+    def stop(self):
+        """
+        Stop an active connection with the RabbitMQ server.
+        """
+        self._shut_down = True
+        self.stop_consuming()

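A minimal usage sketch for the consumer (the broker URL, exchange, queue, and routing key are assumptions mirroring the example settings later in this commit): construct it with a handler that receives each raw message body, then call start() to run the blocking ioloop.

    from simstream import PikaAsyncConsumer

    def handle(body):
        # Hypothetical handler: echo the raw message body.
        print(body)

    consumer = PikaAsyncConsumer("amqp://guest:guest@localhost:5672",
                                 "simstream",       # exchange name
                                 "simstream_test",  # queue name
                                 handle,
                                 exchange_type="topic",
                                 routing_key="test_consumer")

    try:
        consumer.start()  # blocks until interrupted
    except KeyboardInterrupt:
        consumer.stop()
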
http://git-wip-us.apache.org/repos/asf/airavata-sandbox/blob/ef291692/gsoc2016/simstream/simstream/pikaproducer.py
----------------------------------------------------------------------
diff --git a/gsoc2016/simstream/simstream/pikaproducer.py b/gsoc2016/simstream/simstream/pikaproducer.py
new file mode 100755
index 0000000..3302d67
--- /dev/null
+++ b/gsoc2016/simstream/simstream/pikaproducer.py
@@ -0,0 +1,222 @@
+#
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+"""
+Utilities for sending data.
+
+Author: Jeff Kinnison (jkinniso@nd.edu)
+"""
+
+import json
+import pika
+
+
+class PikaProducer(object):
+    """
+    Utility for sending job data to a set of endpoints.
+    """
+
+    def __init__(self, rabbitmq_url, exchange, exchange_type="direct", routing_keys=None):
+        """
+        Instantiate a new PikaProducer.
+
+        Arguments:
+        rabbitmq_url -- the url of the RabbitMQ server to send to
+        exchange -- the name of the exchange to send to
+
+        Keyword Arguments:
+        exchange_type -- one of 'direct', 'topic', 'fanout', 'headers'
+                         (default 'direct')
+        routing_keys -- the routing keys to the endpoints for this producer
+                        (default None, treated as an empty list)
+        """
+        self._url = rabbitmq_url
+        self._exchange = exchange
+        self._exchange_type = exchange_type
+        self._routing_keys = routing_keys if routing_keys is not None else []
+
+        self._connection = None # RabbitMQ connection object
+        self._channel = None    # RabbitMQ channel object
+
+        import random  # local import; used only to tag this producer in debug output
+        self._name = random.randint(0, 100)
+
+    def __call__(self, data):
+        """
+        Publish data to the RabbitMQ server.
+
+        Arguments:
+        data -- JSON serializable data to send
+        """
+        if self._connection is None: # Start the connection if it is inactive
+            self.start()
+        message = self.pack_data(data) # Serialize and send the data
+        self.send_data(message)
+
+    def add_routing_key(self, key):
+        """
+        Add a new endpoint that will receive this data.
+
+        Arguments:
+        key -- the routing key for the new endpoint
+        """
+        if key not in self._routing_keys:
+            #print("Adding key %s to %s" % (key, self._name))
+            self._routing_keys.append(key)
+            #print(self._routing_keys)
+
+    def remove_routing_key(self, key):
+        """
+        Stop sending data to an existing endpoint.
+
+        Arguments:
+        key -- the routing key for the existing endpoint
+        """
+        try:
+            self._routing_keys.remove(key)
+        except ValueError:
+            pass
+
+    def pack_data(self, data):
+        """
+        JSON-serialize the data for transport.
+
+        Arguments:
+        data -- JSON-serializable data
+        """
+        try: # Generate a JSON string from the data
+            msg = json.dumps(data)
+        except TypeError as e: # Fall back to an error message if serialization fails
+            msg = json.dumps({"err": str(e)})
+        return msg
+
+    def send_data(self, data):
+        """
+        Send the data to all active endpoints.
+
+        Arguments:
+        data -- the message to send
+        """
+        if self._channel is not None: # Make sure the connection is active
+            for key in self._routing_keys: # Send to all endpoints
+                #print(self._exchange, key, self._name)
+                self._channel.basic_publish(exchange = self._exchange,
+                                            routing_key=key,
+                                            body=data)
+
+    def start(self):
+        """
+        Open a connection if one does not exist.
+        """
+        print("Starting new connection")
+        if self._connection is None:
+            print("Creating connection object")
+            self._connection = pika.BlockingConnection(pika.URLParameters(self._url))
+            self._channel = self._connection.channel()
+            self._channel.exchange_declare(exchange=self._exchange,
+                                           type=self._exchange_type)
+
+    def shutdown(self):
+        """
+        Close an existing connection and reset the producer state.
+        """
+        if self._channel is not None:
+            self._channel.close()
+        if self._connection is not None:
+            self._connection.close()
+        self._channel = None
+        self._connection = None
+
+    def _on_connection_open(self, unused_connection):
+        """
+        Create a new channel if the connection opens successfully.
+
+        Arguments:
+        unused_connection -- a reference to self._connection
+        """
+        print("Connection is open")
+        self._connection.channel(on_open_callback=self._on_channel_open)
+
+    def _on_connection_close(self, connection, code, text):
+        """
+        Actions to take when the connection is closed for any reason.
+
+        Arguments:
+        connection -- the connection that was closed (same as self._connection)
+        code -- response code from the RabbitMQ server
+        text -- response body from the RabbitMQ server
+        """
+        print("Connection is closed")
+        self._channel = None
+        self._connection = None
+
+    def _on_channel_open(self, channel):
+        """
+        Actions to take when the channel opens.
+
+        Arguments:
+        channel -- the newly opened channel
+        """
+        print("Channel is open")
+        self._channel = channel
+        self._channel.add_on_close_callback(self._on_channel_close)
+        self._declare_exchange()
+
+    def _on_channel_close(self, channel, code, text):
+        """
+        Actions to take when the channel closes for any reason.
+
+        Arguments:
+        channel -- the channel that was closed (same as self._channel)
+        code -- response code from the RabbitMQ server
+        text -- response body from the RabbitMQ server
+        """
+        print("Channel is closed")
+        self._connection.close()
+
+    def _declare_exchange(self):
+        """
+        Declare the exchange to publish to. Redeclaring an existing
+        exchange with the same parameters is harmless.
+        """
+        print("Declaring exchange")
+        self._channel.exchange_declare(exchange=self._exchange,
+                                       type=self._exchange_type)
+
+if __name__ == "__main__":
+    import time
+
+    config = {
+        "url": "amqp://guest:guest@localhost:5672",
+        "exchange": "simstream",
+        "routing_key": "test_consumer",
+        "exchange_type": "topic"
+    }
+
+    producer = PikaProducer(config["url"],
+                            config["exchange"],
+                            exchange_type=config["exchange_type"],
+                            routing_keys=[config["routing_key"]])
+    producer.start()
+
+    while True:
+        try:
+            time.sleep(5)
+            data = str(time.time()) + ": Hello SimStream"
+            producer.send_data(data)
+        except KeyboardInterrupt:
+            producer.shutdown()
+            break

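The TODOs in eventmonitor.py point at using a PikaProducer to respond to events; a sketch of that wiring, assuming the package exports EventMonitor and PikaProducer the same way it exports PikaAsyncConsumer, and reusing the hypothetical checker idea from above:

    from simstream import EventMonitor, PikaProducer

    producer = PikaProducer("amqp://guest:guest@localhost:5672",
                            "simstream",
                            exchange_type="topic",
                            routing_keys=["alerts"])
    producer.start()  # open the blocking connection up front

    def check(val):
        # Hypothetical checker: large values trigger the "notify" handler.
        return ["notify"] if val > 100.0 else []

    monitor = EventMonitor(check,
                           handlers={"notify": lambda: producer("value out of bounds")})
    monitor(128.0)  # publishes the alert on every registered routing key
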
http://git-wip-us.apache.org/repos/asf/airavata-sandbox/blob/ef291692/gsoc2016/simstream/simstream/simstream.py
----------------------------------------------------------------------
diff --git a/gsoc2016/simstream/simstream/simstream.py b/gsoc2016/simstream/simstream/simstream.py
new file mode 100755
index 0000000..501f7c0
--- /dev/null
+++ b/gsoc2016/simstream/simstream/simstream.py
@@ -0,0 +1,187 @@
+#
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import pika
+
+from .pikaasyncconsumer import PikaAsyncConsumer
+from .datacollector import DataCollector
+from .datareporter import DataReporter
+from .eventhandler import EventHandler
+from .eventmonitor import EventMonitor
+
+
+class ReporterExistsException(Exception):
+    """Thrown when attempting to add a DataReporter with a conflicting name"""
+    pass
+
+
+class SimStream(object):
+    """
+    Manager for routing messages to their correct reporter.
+    """
+
+    DEFAULT_CONFIG_PATH = "simstream.cnf"
+
+
+    class MessageParser(object):
+        """
+        Internal message parsing facilities.
+        """
+
+        def __init__(self):
+            self.parsed = None
+
+        def __call__(self, message):
+            pass
+
+
+    def __init__(self, reporters=None, config=None):
+        self.reporters = reporters if reporters is not None else {}
+        self.consumer = None
+        self.config = config if config is not None else {}
+
+    def add_data_reporter(self, reporter):
+        """
+        Add a new DataReporter object.
+
+        Arguments:
+        reporter -- the DataReporter to add
+        """
+        if reporter.name in self.reporters:
+            raise ReporterExistsException
+        self.reporters[reporter.name] = reporter
+
+    def parse_config(self):
+        """
+        Read the config file and set up the specified data collection and
+        event handling resources.
+        """
+        # TODO: Read in config
+        # TODO: Set up configuration dict
+        pass
+
+    def route_message(self, message):
+        """
+        Send a message to the correct reporter.
+        """
+        # TODO: Create new MessageParser
+        # TODO: Run message through MessageParser
+        # TODO: Route message to the correct DataReporter/EventMonitor
+        parser = self.MessageParser()
+        parser(message)
+        if parser.reporter_name in self.reporters:
+            self.reporters[parser.reporter_name].start_streaming(
+                    parser.collector_name,
+                    parser.routing_key
+                )
+
+    def start_collecting(self):
+        """
+        Begin collecting data and monitoring for events.
+        """
+        for reporter in self.reporters:
+            self.reporters[reporter].start_collecting()
+
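+    def stop_collecting(self):
+        """
+        Stop data collection on every reporter.
+
+        Note: this assumes DataReporter implements stop_collecting();
+        the DataReporter source lives elsewhere in this commit.
+        """
+        for reporter in self.reporters:
+            self.reporters[reporter].stop_collecting()
+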
+    def setup(self):
+        """
+        Set up the SimStream instance: create DataCollectors, create
+        EventMonitors, configure AMQP consumer.
+        """
+        self.parse_config()
+        #self.setup_consumer()
+        self.setup_data_collection()
+        self.setup_event_monitoring()
+
+    def setup_data_collection(self):
+        """
+        Set up all DataReporters and DataCollectors.
+        """
+        # TODO: Create and configure all DataReporters
+        # TODO: Create and configure all DataCollectors
+        # TODO: Assign each DataCollector to the correct DataReporter
+        if "reporters" in self.config:
+            for reporter in self.config.reporters:
+                pass
+            for collector in self.config.collectors:
+                pass
+
+    def setup_event_monitoring(self):
+        #TODO: Create and configure all EventMonitors
+        #TODO: Create and configure all EventHandlers
+        #TODO: Assign each EventHandler to the correct EventMonitor
+        #TODO: Assign each EventMonitor to the correct DataCollector
+        pass
+
+    def setup_consumer(self):
+        """
+        Set up and configure the consumer.
+        """
+        if len(self.config) > 0 and self.consumer is None:
+            if "message_handler" in self.config:
+                message_handler = self.config["message_handler"]
+            else:
+                message_handler = self.route_message
+            self.consumer = PikaAsyncConsumer(self.config["url"],
+                                              self.config["exchange"],
+                                              self.config["queue"],
+                                              message_handler,
+                                              exchange_type=self.config["exchange_type"],
+                                              routing_key=self.config["routing_key"]
+                                             )
+
+    def start(self):
+        """
+        Configure and start SimStream.
+        """
+        if self.consumer is None:
+            self.setup()
+        self.start_collecting()
+        #self.consumer.start()
+
+    def stop(self):
+        """
+        Stop all data collection, event monitoring, and message consumption.
+        """
+        self.consumer.stop()
+        self.stop_collecting()
+
+
+if __name__ == "__main__":
+    def print_message(message):
+        with open("test.out", "a") as f:
+            f.write(str(message) + "\n")
+
+    print(SimStream.DEFAULT_CONFIG_PATH)
+
+    config = {
+        "url": "amqp://guest:guest@localhost:5672",
+        "exchange": "simstream",
+        "queue": "simstream_test",
+        "message_handler": print_message,
+        "routing_key": "test_consumer",
+        "exchange_type": "topic"
+    }
+
+    streamer = SimStream(config=config)
+
+    try:
+        streamer.start()
+    except KeyboardInterrupt:
+        streamer.stop()

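route_message above expects the parser to expose reporter_name, collector_name, and routing_key, all of which are still TODO; one plausible shape for MessageParser.__call__, assuming JSON messages that carry those three fields (purely illustrative, not part of this commit):

    import json

    class MessageParser(object):
        def __init__(self):
            self.parsed = None

        def __call__(self, message):
            # Hypothetical format: a JSON object naming the target
            # reporter and collector plus the key to stream results on.
            self.parsed = json.loads(message)
            self.reporter_name = self.parsed["reporter_name"]
            self.collector_name = self.parsed["collector_name"]
            self.routing_key = self.parsed["routing_key"]
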

[2/2] airavata-sandbox git commit: adding jeff kinnison gsoc project

Posted by sm...@apache.org.
adding jeff kinnison gsoc project


Project: http://git-wip-us.apache.org/repos/asf/airavata-sandbox/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata-sandbox/commit/ef291692
Tree: http://git-wip-us.apache.org/repos/asf/airavata-sandbox/tree/ef291692
Diff: http://git-wip-us.apache.org/repos/asf/airavata-sandbox/diff/ef291692

Branch: refs/heads/master
Commit: ef2916920f9dee215610aed828ccb35046e4925b
Parents: 690b529
Author: Suresh Marru <sm...@apache.org>
Authored: Thu Apr 6 15:02:42 2017 -0400
Committer: Suresh Marru <sm...@apache.org>
Committed: Thu Apr 6 15:02:42 2017 -0400

----------------------------------------------------------------------
 .../mock-airavata-php-sdk.iml                   |   2 +-
 gsoc2016/simstream/README.md                    |  18 +
 gsoc2016/simstream/example/README.md            |   9 +
 .../simstream/example/logfile_checker/README.md |  23 +
 .../example/logfile_checker/generate_logs.sh    |  42 ++
 .../example/logfile_checker/log_consumer.py     |  63 ++
 .../example/logfile_checker/log_streamer.py     | 131 ++++
 .../example/logfile_checker/remote_log.slurm    |  21 +
 .../simstream/example/logfile_checker/test.txt  | 677 +++++++++++++++++++
 .../simstream/example/mem_streamer/README.md    |  17 +
 .../example/mem_streamer/memory_consumption.py  | 103 +++
 .../example/mem_streamer/memory_streamer.py     |  66 ++
 .../simstream/example/openmm_example/README.md  |  33 +
 .../application/alanine_dipeptide.py            |  75 ++
 .../openmm_example/application/input.pdb        |  24 +
 .../openmm_example/application/trajectory.dcd   |   0
 .../example/openmm_example/openmm_consumer.py   |  28 +
 .../openmm_example/openmm_log_consumer.py       |  52 ++
 .../openmm_example/openmm_rmsd_consumer.py      |  56 ++
 .../example/openmm_example/openmm_stream.slurm  |  19 +
 .../example/openmm_example/openmm_streamer.py   | 150 ++++
 .../simstream/example/openmm_example/test.txt   |  20 +
 gsoc2016/simstream/example/settings.json        |   6 +
 gsoc2016/simstream/setup.py                     |  39 ++
 gsoc2016/simstream/simstream/__init__.py        |  31 +
 gsoc2016/simstream/simstream/datacollector.py   | 130 ++++
 gsoc2016/simstream/simstream/datareporter.py    | 189 ++++++
 gsoc2016/simstream/simstream/eventhandler.py    |  37 +
 gsoc2016/simstream/simstream/eventmonitor.py    |  66 ++
 .../simstream/simstream/pikaasyncconsumer.py    | 223 ++++++
 gsoc2016/simstream/simstream/pikaproducer.py    | 222 ++++++
 gsoc2016/simstream/simstream/simstream.py       | 187 +++++
 32 files changed, 2758 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata-sandbox/blob/ef291692/airavata-mock-multiplexed-api/mock-airavata-api-php-stubs/mock-airavata-php-sdk.iml
----------------------------------------------------------------------
diff --git a/airavata-mock-multiplexed-api/mock-airavata-api-php-stubs/mock-airavata-php-sdk.iml b/airavata-mock-multiplexed-api/mock-airavata-api-php-stubs/mock-airavata-php-sdk.iml
index 2e2238c..2ff16b5 100644
--- a/airavata-mock-multiplexed-api/mock-airavata-api-php-stubs/mock-airavata-php-sdk.iml
+++ b/airavata-mock-multiplexed-api/mock-airavata-api-php-stubs/mock-airavata-php-sdk.iml
@@ -1,5 +1,5 @@
 <?xml version="1.0" encoding="UTF-8"?>
-<module type="JAVA_MODULE" version="4">
+<module org.jetbrains.idea.maven.project.MavenProjectsManager.isMavenModule="true" type="JAVA_MODULE" version="4">
   <component name="NewModuleRootManager" LANGUAGE_LEVEL="JDK_1_8" inherit-compiler-output="false">
     <output url="file://$MODULE_DIR$/target/classes" />
     <output-test url="file://$MODULE_DIR$/target/test-classes" />

http://git-wip-us.apache.org/repos/asf/airavata-sandbox/blob/ef291692/gsoc2016/simstream/README.md
----------------------------------------------------------------------
diff --git a/gsoc2016/simstream/README.md b/gsoc2016/simstream/README.md
new file mode 100755
index 0000000..9ab1379
--- /dev/null
+++ b/gsoc2016/simstream/README.md
@@ -0,0 +1,18 @@
+# simstream
+A utility for user-defined remote system and simulation data monitoring.
+
+## Dependencies
+* pika >= 0.10.0 (`pip install pika`)
+* A running, accessible instance of RabbitMQ server
+
+## Installation
+1. Clone this repository
+2. `python setup.py install`
+
+## Running the Example
+The example runs a simple collector that records the maximum resident memory (MB) used by the monitored process along with a timestamp. It also generates a plot of the results.
+
+1. Edit `example/memory_consumption.py` and `example/memory_streamer.py` with the correct RabbitMQ settings
+2. From the repository root, run `python example/memory_consumption.py`
+3. Open a new terminal session and run `python example/memory_streamer.py`
+4. Memory usage information should now be collected in the current terminal and received in the original terminal

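The "correct RabbitMQ settings" in step 1 amount to a broker URL, an exchange, and a routing key; a minimal settings block, assuming a local RabbitMQ instance with the default guest account (as used by the examples elsewhere in this commit):

    settings = {
        "url": "amqp://guest:guest@localhost:5672",
        "exchange": "simstream",
        "queue": "simstream_test",
        "routing_key": "memory",
        "exchange_type": "topic"
    }
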
http://git-wip-us.apache.org/repos/asf/airavata-sandbox/blob/ef291692/gsoc2016/simstream/example/README.md
----------------------------------------------------------------------
diff --git a/gsoc2016/simstream/example/README.md b/gsoc2016/simstream/example/README.md
new file mode 100755
index 0000000..23f36d5
--- /dev/null
+++ b/gsoc2016/simstream/example/README.md
@@ -0,0 +1,9 @@
+# SimStream Examples
+
+This directory contains several examples showcasing the functionality of SimStream. To run them, install Python 2.7 or 3.5, install SimStream using setup.py, and modify the settings.json file to match your RabbitMQ server settings.
+
+## The Examples
+
+* mem_streamer: Stream max RSS memory consumed by a basic SimStream utility
+* logfile_checker: Collect, filter, and stream tagged log file entries
+* openmm_example: Run a molecular dynamics simulation and return log information and system state measured by root-mean-square deviation (RMSD)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/airavata-sandbox/blob/ef291692/gsoc2016/simstream/example/logfile_checker/README.md
----------------------------------------------------------------------
diff --git a/gsoc2016/simstream/example/logfile_checker/README.md b/gsoc2016/simstream/example/logfile_checker/README.md
new file mode 100755
index 0000000..30ed071
--- /dev/null
+++ b/gsoc2016/simstream/example/logfile_checker/README.md
@@ -0,0 +1,23 @@
+# SimStream Example: Logfile Streaming
+
+This example filters log file entries by starting tag and sends them to a remote listener. The listener prints the logs it receives to terminal.
+
+## Instructions
+
+### Start the Publisher
+1. Open a terminal
+2. `cd path/to/simstream/examples/logfile_checker`
+3. `python log_streamer.py test.txt`
+
+### Start the Consumer
+1. Open a terminal
+2. `cd path/to/simstream/examples/logfile_checker`
+3. `python log_consumer.py`
+
+### Write Some Logs
+1. Open a terminal
+2. `cd path/to/simstream/examples/logfile_checker`
+3. `chmod 700 generate_logs.sh`
+4. `./generate_logs.sh`
+
+This will write logs to `test.txt`. The Publisher will continuously check for new logs, filter based on the [STATUS] and [ERROR] tags, and send the filtered results to the RabbitMQ server. The Consumer will receive the filtered log entries and print them to the terminal.
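
Both log_streamer.py and log_consumer.py read their connection settings from ../settings.json; a minimal file matching the commented-out defaults in those scripts (the guest credentials are RabbitMQ's defaults and an assumption about your broker):

    {
        "url": "amqp://guest:guest@localhost:5672",
        "exchange": "simstream",
        "queue": "test",
        "routing_key": "logfile",
        "exchange_type": "topic"
    }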

http://git-wip-us.apache.org/repos/asf/airavata-sandbox/blob/ef291692/gsoc2016/simstream/example/logfile_checker/generate_logs.sh
----------------------------------------------------------------------
diff --git a/gsoc2016/simstream/example/logfile_checker/generate_logs.sh b/gsoc2016/simstream/example/logfile_checker/generate_logs.sh
new file mode 100755
index 0000000..b752a8e
--- /dev/null
+++ b/gsoc2016/simstream/example/logfile_checker/generate_logs.sh
@@ -0,0 +1,42 @@
+#!/usr/bin/env bash
+#
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+
+outfile="test.txt"
+
+echo "[STATUS] Starting logfile generator" >> $outfile
+
+sleep 2
+
+echo "[STATUS] Doing stuff" >> $outfile
+echo "Stuff that doesn't need to be reported" >> $outfile
+echo "Stuff that also doesn't need to be reported" >> $outfile
+echo "[DATA] 7.267" >> $outfile
+
+sleep 2
+
+echo "[STATUS] Doing more stuff" >> $outfile
+echo "Yet more stuff that doesn't need to be reported" >> $outfile
+echo "[ERROR] Some non-fatal error that the user should know about" >> $outfile
+
+sleep 2
+
+echo "[STATUS] Finished generating logs" >> $outfile
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/airavata-sandbox/blob/ef291692/gsoc2016/simstream/example/logfile_checker/log_consumer.py
----------------------------------------------------------------------
diff --git a/gsoc2016/simstream/example/logfile_checker/log_consumer.py b/gsoc2016/simstream/example/logfile_checker/log_consumer.py
new file mode 100755
index 0000000..bba46c7
--- /dev/null
+++ b/gsoc2016/simstream/example/logfile_checker/log_consumer.py
@@ -0,0 +1,63 @@
+#
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import json
+from simstream import PikaAsyncConsumer
+
+#settings = {
+#    "url": "amqp://guest:guest@localhost:5672",
+#    "exchange": "simstream",
+#    "queue": "test",
+#    "routing_key": "logfile",
+#    "exchange_type": "topic"
+#}
+
+settings = {}
+
+with open("../settings.json", 'r') as f:
+    settings = json.load(f)
+    settings["routing_key"] = "memory"
+
+def print_log_line(body):
+    try:
+        lines = json.loads(body.decode())
+        if lines is not None:
+            for line in lines:
+                print(line)
+    except UnicodeError as e:
+        print("[Error]: Could not decode from bytes to string: %s" % (e.reason))
+    except ValueError: # json raises ValueError on Python 2, JSONDecodeError (a subclass) on 3
+        print("[Error]: Could not decode %s" % (body))
+
+
+consumer = PikaAsyncConsumer(
+                            settings["url"],
+                            settings["exchange"],
+                            settings["queue"],
+                            print_log_line,
+                            exchange_type=settings["exchange_type"],
+                            routing_key=settings["routing_key"]
+                            )
+
+if __name__ == "__main__":
+    try:
+        consumer.start()
+    except KeyboardInterrupt:
+        consumer.stop()

http://git-wip-us.apache.org/repos/asf/airavata-sandbox/blob/ef291692/gsoc2016/simstream/example/logfile_checker/log_streamer.py
----------------------------------------------------------------------
diff --git a/gsoc2016/simstream/example/logfile_checker/log_streamer.py b/gsoc2016/simstream/example/logfile_checker/log_streamer.py
new file mode 100755
index 0000000..d4790ce
--- /dev/null
+++ b/gsoc2016/simstream/example/logfile_checker/log_streamer.py
@@ -0,0 +1,131 @@
+#
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from simstream import SimStream, DataReporter
+
+import sys, json
+
+class LogMonitor(object):
+    """
+    A callable class that returns unprocessed lines in an open logfile.
+
+    Instance Variables:
+    logfile -- the path to the logfile to monitor
+    """
+
+    def __init__(self, logfile):
+        """
+        Set up a monitor for a logfile.
+
+        Arguments:
+        logfile -- the path to the logfile to monitor
+        """
+        self.logfile = logfile
+        self._generator = None
+
+    def __call__(self):
+        """
+        Get the next line from the logfile.
+        """
+        if not self._generator:
+            self._generator = self._monitor_logfile()
+
+        lines = []
+
+        line = self._next()
+        while line is not None:
+            lines.append(line)
+            line = self._next()
+
+        return lines
+
+    def _monitor_logfile(self):
+        """
+        Yield the next set of lines from the logfile.
+        """
+        try:
+            # Make the file persistent for the lifetime of the generator
+            with open(self.logfile) as f:
+                f.seek(0,2) # Move to the end of the file
+                while True:
+                    # Get the next line or indicate the end of the file
+                    line = f.readline()
+                    if line:
+                        yield line.strip()
+                    else:
+                        yield None
+
+        except EnvironmentError as e:
+            # Handle I/O exceptions in an OS-agnostic way
+            print("Error: Could not open file %s: %s" % (self.logfile, e))
+
+    def _next(self):
+        """
+        Python 2/3 agnostic retrieval of generator values.
+        """
+        return next(self._generator)
+
+
+def get_relevant_log_lines(log_lines):
+    import re
+    relevant_lines = []
+    pattern = r'^\[(STATUS|ERROR)\]'
+    for line in log_lines:
+        if re.match(pattern, line) is not None:
+            relevant_lines.append(line)
+    return relevant_lines
+
+
+#settings = {
+#    "url": "amqp://guest:guest@localhost:5672",
+#    "exchange": "simstream",
+#    "queue": "test",
+#    "routing_key": "logfile",
+#    "exchange_type": "topic"
+#}
+
+settings = {}
+
+with open("../settings.json", 'r') as f:
+    settings = json.load(f)
+    settings["routing_key"] = "memory"
+
+if __name__ == "__main__":
+    logfile = sys.argv[1]
+    log_reporter = DataReporter()
+    log_reporter.add_collector("logger",
+                               LogMonitor(logfile),
+                               settings["url"],
+                               settings["exchange"],
+                               limit=10,
+                               interval=2,
+                               exchange_type=settings["exchange_type"],
+                               postprocessor=get_relevant_log_lines)
+
+    log_reporter.start_streaming("logger", settings["routing_key"])
+
+    streamer = SimStream(config=settings, reporters={"log_reporter": log_reporter})
+    streamer.setup()
+
+    try:
+        streamer.start()
+    except KeyboardInterrupt:
+        streamer.stop()

http://git-wip-us.apache.org/repos/asf/airavata-sandbox/blob/ef291692/gsoc2016/simstream/example/logfile_checker/remote_log.slurm
----------------------------------------------------------------------
diff --git a/gsoc2016/simstream/example/logfile_checker/remote_log.slurm b/gsoc2016/simstream/example/logfile_checker/remote_log.slurm
new file mode 100644
index 0000000..55834e9
--- /dev/null
+++ b/gsoc2016/simstream/example/logfile_checker/remote_log.slurm
@@ -0,0 +1,21 @@
+#!/usr/bin/env bash
+
+#SBATCH -J remote_logger     # Job name
+#SBATCH -o remote_logger.o%j # Name of stdout output file (%j expands to jobId)
+#SBATCH -e remote_logger.o%j # Name of stderr output file (%j expands to jobId)
+#SBATCH -p development       # Submit to the development queue
+#SBATCH -t 00:10:00          # Run time (hh:mm:ss) - 10 minutes
+#SBATCH -n 1                 # Total number of tasks
+
+module use "/home1/03947/tg832463/modulefiles"
+module load openmm
+
+touch test.txt
+
+python log_streamer.py test.txt &
+
+while true; do
+    bash generate_logs.sh
+    sleep 5
+done
+

http://git-wip-us.apache.org/repos/asf/airavata-sandbox/blob/ef291692/gsoc2016/simstream/example/logfile_checker/test.txt
----------------------------------------------------------------------
diff --git a/gsoc2016/simstream/example/logfile_checker/test.txt b/gsoc2016/simstream/example/logfile_checker/test.txt
new file mode 100755
index 0000000..a6bb619
--- /dev/null
+++ b/gsoc2016/simstream/example/logfile_checker/test.txt
@@ -0,0 +1,677 @@
+====
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+====
+
+[STATUS] Starting logfile generator
+[STATUS] Doing stuff
+Stuff that doesn't need to be reported
+Stuff that also doesn't need to be reported
+[DATA] 7.267
+[STATUS] Doing more stuff
+Yet more stuff that doesn't need to be reported
+[ERROR] Some non-fatal error that the user should know about
+[STATUS] Finished generating logs
+[STATUS] Starting logfile generator
+[STATUS] Doing stuff
+Stuff that doesn't need to be reported
+Stuff that also doesn't need to be reported
+[DATA] 7.267
+[STATUS] Doing more stuff
+Yet more stuff that doesn't need to be reported
+[ERROR] Some non-fatal error that the user should know about
+[STATUS] Finished generating logs
+[STATUS] Starting logfile generator
+[STATUS] Doing stuff
+Stuff that doesn't need to be reported
+Stuff that also doesn't need to be reported
+[DATA] 7.267
+[STATUS] Doing more stuff
+Yet more stuff that doesn't need to be reported
+[ERROR] Some non-fatal error that the user should know about
+[STATUS] Finished generating logs
+[STATUS] Starting logfile generator
+[STATUS] Doing stuff
+Stuff that doesn't need to be reported
+Stuff that also doesn't need to be reported
+[DATA] 7.267
+[STATUS] Doing more stuff
+Yet more stuff that doesn't need to be reported
+[ERROR] Some non-fatal error that the user should know about
+[STATUS] Finished generating logs
+[STATUS] Starting logfile generator
+[STATUS] Doing stuff
+Stuff that doesn't need to be reported
+Stuff that also doesn't need to be reported
+[DATA] 7.267
+[STATUS] Doing more stuff
+Yet more stuff that doesn't need to be reported
+[ERROR] Some non-fatal error that the user should know about
+[STATUS] Finished generating logs
+[STATUS] Starting logfile generator
+[STATUS] Doing stuff
+Stuff that doesn't need to be reported
+Stuff that also doesn't need to be reported
+[DATA] 7.267
+[STATUS] Doing more stuff
+Yet more stuff that doesn't need to be reported
+[ERROR] Some non-fatal error that the user should know about
+[STATUS] Finished generating logs
+[STATUS] Starting logfile generator
+[STATUS] Doing stuff
+Stuff that doesn't need to be reported
+Stuff that also doesn't need to be reported
+[DATA] 7.267
+[STATUS] Doing more stuff
+Yet more stuff that doesn't need to be reported
+[ERROR] Some non-fatal error that the user should know about
+[STATUS] Finished generating logs
+[STATUS] Starting logfile generator
+[STATUS] Doing stuff
+Stuff that doesn't need to be reported
+Stuff that also doesn't need to be reported
+[DATA] 7.267
+[STATUS] Doing more stuff
+Yet more stuff that doesn't need to be reported
+[ERROR] Some non-fatal error that the user should know about
+[STATUS] Finished generating logs
+[STATUS] Starting logfile generator
+[STATUS] Doing stuff
+Stuff that doesn't need to be reported
+Stuff that also doesn't need to be reported
+[DATA] 7.267
+[STATUS] Doing more stuff
+Yet more stuff that doesn't need to be reported
+[ERROR] Some non-fatal error that the user should know about
+[STATUS] Finished generating logs
+[STATUS] Starting logfile generator
+[STATUS] Doing stuff
+Stuff that doesn't need to be reported
+Stuff that also doesn't need to be reported
+[DATA] 7.267
+[STATUS] Doing more stuff
+Yet more stuff that doesn't need to be reported
+[ERROR] Some non-fatal error that the user should know about
+[STATUS] Finished generating logs
+[STATUS] Starting logfile generator
+[STATUS] Doing stuff
+Stuff that doesn't need to be reported
+Stuff that also doesn't need to be reported
+[DATA] 7.267
+[STATUS] Doing more stuff
+Yet more stuff that doesn't need to be reported
+[ERROR] Some non-fatal error that the user should know about
+[STATUS] Finished generating logs
+[STATUS] Starting logfile generator
+[STATUS] Doing stuff
+Stuff that doesn't need to be reported
+Stuff that also doesn't need to be reported
+[DATA] 7.267
+[STATUS] Doing more stuff
+Yet more stuff that doesn't need to be reported
+[ERROR] Some non-fatal error that the user should know about
+[STATUS] Finished generating logs
+[STATUS] Starting logfile generator
+[STATUS] Doing stuff
+Stuff that doesn't need to be reported
+Stuff that also doesn't need to be reported
+[DATA] 7.267
+[STATUS] Doing more stuff
+Yet more stuff that doesn't need to be reported
+[ERROR] Some non-fatal error that the user should know about
+[STATUS] Finished generating logs
+[STATUS] Starting logfile generator
+[STATUS] Doing stuff
+Stuff that doesn't need to be reported
+Stuff that also doesn't need to be reported
+[DATA] 7.267
+[STATUS] Doing more stuff
+Yet more stuff that doesn't need to be reported
+[ERROR] Some non-fatal error that the user should know about
+[STATUS] Finished generating logs
+[STATUS] Starting logfile generator
+[STATUS] Doing stuff
+Stuff that doesn't need to be reported
+Stuff that also doesn't need to be reported
+[DATA] 7.267
+[STATUS] Doing more stuff
+Yet more stuff that doesn't need to be reported
+[ERROR] Some non-fatal error that the user should know about
+[STATUS] Finished generating logs
+[STATUS] Starting logfile generator
+[STATUS] Doing stuff
+Stuff that doesn't need to be reported
+Stuff that also doesn't need to be reported
+[DATA] 7.267
+[STATUS] Doing more stuff
+Yet more stuff that doesn't need to be reported
+[ERROR] Some non-fatal error that the user should know about
+[STATUS] Finished generating logs
+[STATUS] Starting logfile generator
+[STATUS] Doing stuff
+Stuff that doesn't need to be reported
+Stuff that also doesn't need to be reported
+[DATA] 7.267
+[STATUS] Doing more stuff
+Yet more stuff that doesn't need to be reported
+[ERROR] Some non-fatal error that the user should know about
+[STATUS] Finished generating logs
+[STATUS] Starting logfile generator
+[STATUS] Doing stuff
+Stuff that doesn't need to be reported
+Stuff that also doesn't need to be reported
+[DATA] 7.267
+[STATUS] Doing more stuff
+Yet more stuff that doesn't need to be reported
+[ERROR] Some non-fatal error that the user should know about
+[STATUS] Finished generating logs
+[STATUS] Starting logfile generator
+[STATUS] Doing stuff
+Stuff that doesn't need to be reported
+Stuff that also doesn't need to be reported
+[DATA] 7.267
+[STATUS] Doing more stuff
+Yet more stuff that doesn't need to be reported
+[ERROR] Some non-fatal error that the user should know about
+[STATUS] Finished generating logs
+[STATUS] Starting logfile generator
+[STATUS] Doing stuff
+Stuff that doesn't need to be reported
+Stuff that also doesn't need to be reported
+[DATA] 7.267
+[STATUS] Doing more stuff
+Yet more stuff that doesn't need to be reported
+[ERROR] Some non-fatal error that the user should know about
+[STATUS] Finished generating logs
+[STATUS] Starting logfile generator
+[STATUS] Doing stuff
+Stuff that doesn't need to be reported
+Stuff that also doesn't need to be reported
+[DATA] 7.267
+[STATUS] Doing more stuff
+Yet more stuff that doesn't need to be reported
+[ERROR] Some non-fatal error that the user should know about
+[STATUS] Finished generating logs
+[STATUS] Starting logfile generator
+[STATUS] Doing stuff
+Stuff that doesn't need to be reported
+Stuff that also doesn't need to be reported
+[DATA] 7.267
+[STATUS] Doing more stuff
+Yet more stuff that doesn't need to be reported
+[ERROR] Some non-fatal error that the user should know about
+[STATUS] Finished generating logs
+[STATUS] Starting logfile generator
+[STATUS] Doing stuff
+Stuff that doesn't need to be reported
+Stuff that also doesn't need to be reported
+[DATA] 7.267
+[STATUS] Doing more stuff
+Yet more stuff that doesn't need to be reported
+[ERROR] Some non-fatal error that the user should know about
+[STATUS] Finished generating logs
+[STATUS] Starting logfile generator
+[STATUS] Doing stuff
+Stuff that doesn't need to be reported
+Stuff that also doesn't need to be reported
+[DATA] 7.267
+[STATUS] Doing more stuff
+Yet more stuff that doesn't need to be reported
+[ERROR] Some non-fatal error that the user should know about
+[STATUS] Finished generating logs
+[STATUS] Starting logfile generator
+[STATUS] Doing stuff
+Stuff that doesn't need to be reported
+Stuff that also doesn't need to be reported
+[DATA] 7.267
+[STATUS] Doing more stuff
+Yet more stuff that doesn't need to be reported
+[ERROR] Some non-fatal error that the user should know about
+[STATUS] Finished generating logs
+[STATUS] Starting logfile generator
+[STATUS] Doing stuff
+Stuff that doesn't need to be reported
+Stuff that also doesn't need to be reported
+[DATA] 7.267
+[STATUS] Doing more stuff
+Yet more stuff that doesn't need to be reported
+[ERROR] Some non-fatal error that the user should know about
+[STATUS] Finished generating logs
+[STATUS] Starting logfile generator
+[STATUS] Doing stuff
+Stuff that doesn't need to be reported
+Stuff that also doesn't need to be reported
+[DATA] 7.267
+[STATUS] Doing more stuff
+Yet more stuff that doesn't need to be reported
+[ERROR] Some non-fatal error that the user should know about
+[STATUS] Finished generating logs
+[STATUS] Starting logfile generator
+[STATUS] Doing stuff
+Stuff that doesn't need to be reported
+Stuff that also doesn't need to be reported
+[DATA] 7.267
+[STATUS] Doing more stuff
+Yet more stuff that doesn't need to be reported
+[ERROR] Some non-fatal error that the user should know about
+[STATUS] Finished generating logs
+[STATUS] Starting logfile generator
+[STATUS] Doing stuff
+Stuff that doesn't need to be reported
+Stuff that also doesn't need to be reported
+[DATA] 7.267
+[STATUS] Doing more stuff
+Yet more stuff that doesn't need to be reported
+[ERROR] Some non-fatal error that the user should know about
+[STATUS] Finished generating logs

http://git-wip-us.apache.org/repos/asf/airavata-sandbox/blob/ef291692/gsoc2016/simstream/example/mem_streamer/README.md
----------------------------------------------------------------------
diff --git a/gsoc2016/simstream/example/mem_streamer/README.md b/gsoc2016/simstream/example/mem_streamer/README.md
new file mode 100755
index 0000000..897b77a
--- /dev/null
+++ b/gsoc2016/simstream/example/mem_streamer/README.md
@@ -0,0 +1,17 @@
+# SimStream Example: Memory Usage Streamer
+
+This example collects data on the memory used by the Publisher and sends that data to the Consumer.
+
+## Instructions
+
+### Start the Consumer
+1. Open a terminal
+2. `cd path/to/simstream/examples/mem_streamer`
+3. `python memory_consumption.py`
+
+### Start the Producer
+1. Open a new terminal
+2. `cd path/to/simstream/examples/mem_streamer`
+3. `python memory_streamer.py`
+
+The Consumer should receive the memory used by the Publisher (its maximum resident set size) and the time each data point was collected (in milliseconds since the UNIX epoch) at a 2-second interval.
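+
+For reference, the Consumer expects each message body to be a JSON object with an `x` (timestamp) field and a `y` (memory reading) field, matching what `mem_callback` in `memory_streamer.py` produces. A minimal sketch of the decoding step, with an invented sample payload:
+
+```python
+import json
+
+# Invented sample payload; real values come from memory_streamer.py.
+body = b'{"x": 1467051230000.0, "y": 0.048}'
+
+point = json.loads(body.decode())
+print("%s: %s" % (point["x"], point["y"]))
+```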

http://git-wip-us.apache.org/repos/asf/airavata-sandbox/blob/ef291692/gsoc2016/simstream/example/mem_streamer/memory_consumption.py
----------------------------------------------------------------------
diff --git a/gsoc2016/simstream/example/mem_streamer/memory_consumption.py b/gsoc2016/simstream/example/mem_streamer/memory_consumption.py
new file mode 100755
index 0000000..8f07794
--- /dev/null
+++ b/gsoc2016/simstream/example/mem_streamer/memory_consumption.py
@@ -0,0 +1,103 @@
+#
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import tornado.ioloop
+import tornado.web
+import tornado.websocket
+
+import json
+
+from simstream import PikaAsyncConsumer, PikaProducer
+
+#settings = {
+#    "url": "amqp://localhost:5672",
+#    "exchange": "simstream",
+#    "queue": "remote_node",
+#    "routing_key": "test",
+#    "exchange_type": "topic"
+#}
+
+settings = {}
+
+with open("../settings.json", 'r') as f:
+    settings = json.load(f)
+    settings["routing_key"] = "memory"
+
+
+def print_result(body):
+    try:
+        data = json.loads(body.decode())
+        print("%s: %s" % (data["x"], data["y"]))
+    except json.decoder.JSONDecodeError as e:
+        print("[ERROR] Could not decode JSON %s: %s", (body, e))
+    except UnicodeError as e:
+        print("[ERROR] Could not decode message %s: %s" % (body, e.reason))
+
+consumer = PikaAsyncConsumer(settings['url'],
+                             settings['exchange'],
+                             settings['queue'],
+                             print_result,
+                             exchange_type=settings['exchange_type'],
+                             routing_key=settings['routing_key'])
+
+if __name__ == "__main__":
+    try:
+        consumer.start()
+    except KeyboardInterrupt:
+        consumer.stop()
+
+# class PlotHandler(tornado.web.RequestHandler):
+
+#     def get(self):
+#         pass
+
+
+# class StreamingHandler(tornado.websocket.WebSocketHandler):
+
+#     def open(self):
+#         self.consumer = PikaAsyncConsumer(settings.url,
+#                                           settings.exchange,
+#                                           settings.queue,
+#                                           self.send_data,
+#                                           routing_keys=settings.routing_key,
+#                                           exchange_type=settings.exchange_type
+#                                           )
+#         self.producer = PikaProducer("",
+#                                      remote_settings.url,
+#                                      remote_settings.exchange,
+#                                      remote_settings.queue,
+#                                      remote_settings.routing_key)
+
+#     def on_message(self, message):
+#         if hasattr(self, "producer") and self.producer is not None:
+#             self.producer.send_data(message)
+
+#     def on_close(self):
+#         self.consumer.stop()
+#         self.producer.shutdown()
+#         self.consumer = None
+#         self.producer = None
+
+#     def send_data(self, ch, method, properties, body):
+#         self.write_message(body)
+
+# if __name__ == "__main__":
+#     app = tornado.web.Application([
+#             (r"/plot/(.*)", )
+#             (r"/stream/(.*)", StreamingHandler)
+#         ])
+#     app.listen(8888)
+#     tornado.ioloop.IOLoop.current().start()

http://git-wip-us.apache.org/repos/asf/airavata-sandbox/blob/ef291692/gsoc2016/simstream/example/mem_streamer/memory_streamer.py
----------------------------------------------------------------------
diff --git a/gsoc2016/simstream/example/mem_streamer/memory_streamer.py b/gsoc2016/simstream/example/mem_streamer/memory_streamer.py
new file mode 100755
index 0000000..754ea2d
--- /dev/null
+++ b/gsoc2016/simstream/example/mem_streamer/memory_streamer.py
@@ -0,0 +1,66 @@
+#
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import resource
+import time
+import json
+
+from simstream import SimStream, DataReporter, DataCollector
+
+#settings = {
+#    "url": "amqp://localhost:5672",
+#    "exchange": "simstream",
+#    "queue": "remote_node",
+#    "routing_key": "stream_sender",
+#    "exchange_type": "topic"
+#}
+
+settings = {}
+
+with open("../settings.json", 'r') as f:
+    settings = json.load(f)
+    settings["routing_key"] = "memory"
+
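+# mem_callback samples this process's peak memory usage; an illustrative
+# return value is {'x': 1467051230000.0, 'y': 48512}.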
+def mem_callback():
+    return {'x': time.time() * 1000,
+            'y': resource.getrusage(resource.RUSAGE_SELF).ru_maxrss}
+
+
+def mem_postprocessor(rss):
+    # The reading is a dict, so index it rather than using attribute access.
+    # ru_maxrss is reported in kilobytes on Linux (bytes on macOS).
+    rss["y"] = rss["y"] / 1000000
+    return rss
+
+mem_reporter = DataReporter()
+mem_reporter.add_collector("rss",
+                           mem_callback,
+                           settings["url"],
+                           settings["exchange"],
+                           limit=100,
+                           interval=2,
+                           postprocessor=mem_postprocessor,
+                           )
+
+mem_reporter.start_streaming("rss", "test")
+
+if __name__ == "__main__":
+    resource_streamer = SimStream(reporters={"memory": mem_reporter},
+                                  config=settings)
+    resource_streamer.setup()
+    resource_streamer.start()

http://git-wip-us.apache.org/repos/asf/airavata-sandbox/blob/ef291692/gsoc2016/simstream/example/openmm_example/README.md
----------------------------------------------------------------------
diff --git a/gsoc2016/simstream/example/openmm_example/README.md b/gsoc2016/simstream/example/openmm_example/README.md
new file mode 100644
index 0000000..59a0588
--- /dev/null
+++ b/gsoc2016/simstream/example/openmm_example/README.md
@@ -0,0 +1,33 @@
+# SimStream Example: Simulating Alanine Dipeptide
+
+This example runs a simulation of the small molecule alanine dipeptide and streams its logs and RMSD. RMSD (root-mean-square deviation) is a metric for judging how similar two conformations of the same model are.
+
+## Instructions
+
+### Installing OpenMM
+The easiest way to install OpenMM is to use the Anaconda distribution of Python and run
+`conda install -c https://conda.anaconda.org/omnia openmm`
+
+If you do not wish to use Anaconda, install OpenMM from source by following the instructions in the [OpenMM docs](http://docs.openmm.org/7.0.0/userguide/application.html#installing-openmm "OpenMM documentation").
+
+### Start the Logfile Consumer
+1. Open a terminal
+2. `cd path/to/simstream/examples/openmm_example`
+3. `python openmm_log_consumer.py`
+
+### Start the RMSD Consumer
+1. Open a terminal
+2. `cd path/to/simstream/examples/openmm_example`
+3. `python openmm_rmsd_consumer.py`
+
+### Start the Producer
+1. Open a new terminal
+2. `cd path/to/simstream/examples/openmm_example`
+3. `python openmm_streamer.py application/sim.out application/trajectory.dcd application/input.pdb application/input.pdb`
+
+### Start the Simulation
+1. Open a new terminal
+2. `cd path/to/simstream/examples/openmm_example/application`
+3. `python alanine_dipeptide.py > sim.out`
+
+The Logfile Consumer should now be printing tagged log entries to the screen; the RMSD Consumer should be printing the calculated RMSD each time the trajectory file is written.
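+
+The streamer forwards only tagged log lines: `openmm_streamer.py` keeps lines that begin with a `[...]` tag (`[STATUS]`, `[ERROR]`, `[START]`, and so on) and drops everything else. A minimal sketch of that filter, using the same pattern as `get_relevant_log_lines` (the sample lines are invented):
+
+```python
+import re
+
+sample = [
+    "[STATUS] Minimizing...",
+    "untagged line that is dropped",
+    "[ERROR] Some non-fatal error",
+]
+
+# Keep only lines that start with a bracketed tag.
+relevant = [line for line in sample if re.match(r'^\[.+\]', line)]
+print(relevant)  # ['[STATUS] Minimizing...', '[ERROR] Some non-fatal error']
+```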

http://git-wip-us.apache.org/repos/asf/airavata-sandbox/blob/ef291692/gsoc2016/simstream/example/openmm_example/application/alanine_dipeptide.py
----------------------------------------------------------------------
diff --git a/gsoc2016/simstream/example/openmm_example/application/alanine_dipeptide.py b/gsoc2016/simstream/example/openmm_example/application/alanine_dipeptide.py
new file mode 100644
index 0000000..503096c
--- /dev/null
+++ b/gsoc2016/simstream/example/openmm_example/application/alanine_dipeptide.py
@@ -0,0 +1,75 @@
+#
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+##########################################################################
+# this script was generated by openmm-builder. to customize it further,
+# you can save the file to disk and edit it with your favorite editor.
+##########################################################################
+
+from __future__ import print_function
+from simtk.openmm import app
+import simtk.openmm as mm
+from simtk import unit
+from sys import stdout
+
+print("[START] Application is now running")
+
+pdb = app.PDBFile('input.pdb')
+print("[STATUS] Loaded model")
+forcefield = app.ForceField('amber03.xml', 'amber03_obc.xml')
+print("[STATUS] Loaded force field")
+
+system = forcefield.createSystem(pdb.topology, nonbondedMethod=app.NoCutoff, 
+     constraints=None, rigidWater=False)
+print("[STATUS] Created system")
+integrator = mm.LangevinIntegrator(300*unit.kelvin, 91/unit.picoseconds, 
+    1.0*unit.femtoseconds)
+print("[STATUS] Created integrator")
+
+try:
+    platform = mm.Platform.getPlatformByName('CPU')
+except Exception as e:
+    print("[ERROR] Could not load platform CPU. Running Reference")
+    platform = mm.Platform.getPlatformByName("Reference")
+
+simulation = app.Simulation(pdb.topology, system, integrator, platform)
+print("[STATUS] Set up compute platform")
+simulation.context.setPositions(pdb.positions)
+print("[STATUS] Set atomic positions")
+
+print('[STATUS] Minimizing...')
+simulation.minimizeEnergy()
+print('[STATUS] Equilibrating...')
+simulation.step(100)
+
+simulation.reporters.append(app.DCDReporter('trajectory.dcd', 1000))
+simulation.reporters.append(app.StateDataReporter(stdout, 1000, step=True, 
+    potentialEnergy=True, totalEnergy=True, temperature=True, separator='\t'))
+print("[STATUS] Set up reporters")
+
+print('[STATUS] Running Production...')
+
+increment = 1000
+
+for i in range(0,100000,increment): 
+    print("[STATUS] Step %s" % (i))
+    simulation.step(increment)
+
+print('[END] Done!')

http://git-wip-us.apache.org/repos/asf/airavata-sandbox/blob/ef291692/gsoc2016/simstream/example/openmm_example/application/input.pdb
----------------------------------------------------------------------
diff --git a/gsoc2016/simstream/example/openmm_example/application/input.pdb b/gsoc2016/simstream/example/openmm_example/application/input.pdb
new file mode 100644
index 0000000..a47f196
--- /dev/null
+++ b/gsoc2016/simstream/example/openmm_example/application/input.pdb
@@ -0,0 +1,24 @@
+ATOM      1 1HH3 ACE     1       4.300  13.100   8.600  1.00  0.00            
+ATOM      2  CH3 ACE     1       5.200  13.600   8.800  1.00  0.00            
+ATOM      3 2HH3 ACE     1       4.900  14.300   9.600  1.00  0.00            
+ATOM      4 3HH3 ACE     1       5.600  14.200   7.900  1.00  0.00            
+ATOM      5  C   ACE     1       6.100  12.500   9.400  1.00  0.00            
+ATOM      6  O   ACE     1       6.400  12.500  10.600  1.00  0.00            
+ATOM      7  N   ALA     2       6.600  11.600   8.500  1.00  0.00            
+ATOM      8  H   ALA     2       6.500  11.600   7.500  1.00  0.00            
+ATOM      9  CA  ALA     2       7.300  10.400   9.100  1.00  0.00            
+ATOM     10  HA  ALA     2       7.900  10.700  10.000  1.00  0.00            
+ATOM     11  CB  ALA     2       6.200   9.500   9.600  1.00  0.00            
+ATOM     12  HB1 ALA     2       5.700   9.100   8.800  1.00  0.00            
+ATOM     13  HB2 ALA     2       6.600   8.700  10.200  1.00  0.00            
+ATOM     14  HB3 ALA     2       5.400  10.000  10.200  1.00  0.00            
+ATOM     15  C   ALA     2       8.400   9.800   8.200  1.00  0.00            
+ATOM     16  O   ALA     2       8.400   9.900   7.000  1.00  0.00            
+ATOM     17  N   NME     3       9.300   9.000   8.800  1.00  0.00            
+ATOM     18  H   NME     3       9.100   9.000   9.800  1.00  0.00            
+ATOM     19  CH3 NME     3      10.500   8.400   8.300  1.00  0.00            
+ATOM     20 1HH3 NME     3      10.700   7.700   9.100  1.00  0.00            
+ATOM     21 2HH3 NME     3      10.400   8.000   7.300  1.00  0.00            
+ATOM     22 3HH3 NME     3      11.300   9.100   8.300  1.00  0.00            
+TER
+ENDMDL

http://git-wip-us.apache.org/repos/asf/airavata-sandbox/blob/ef291692/gsoc2016/simstream/example/openmm_example/application/trajectory.dcd
----------------------------------------------------------------------
diff --git a/gsoc2016/simstream/example/openmm_example/application/trajectory.dcd b/gsoc2016/simstream/example/openmm_example/application/trajectory.dcd
new file mode 100644
index 0000000..e69de29

http://git-wip-us.apache.org/repos/asf/airavata-sandbox/blob/ef291692/gsoc2016/simstream/example/openmm_example/openmm_consumer.py
----------------------------------------------------------------------
diff --git a/gsoc2016/simstream/example/openmm_example/openmm_consumer.py b/gsoc2016/simstream/example/openmm_example/openmm_consumer.py
new file mode 100644
index 0000000..b19cbd7
--- /dev/null
+++ b/gsoc2016/simstream/example/openmm_example/openmm_consumer.py
@@ -0,0 +1,28 @@
+#
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import json
+from simstream import PikaAsyncConsumer
+
+def recv_log(body):
+    try:
+        logs = json.loads(body.decode())
+        for log in logs:
+            print(log)
+    except json.decoder.JSONDecodeError:
+        print("[Error]: Could not decode %s" % (body))
+    except UnicodeError as e:
+        print("[Error]: Could not decode from bytes to string: %s" % (e.reason))

http://git-wip-us.apache.org/repos/asf/airavata-sandbox/blob/ef291692/gsoc2016/simstream/example/openmm_example/openmm_log_consumer.py
----------------------------------------------------------------------
diff --git a/gsoc2016/simstream/example/openmm_example/openmm_log_consumer.py b/gsoc2016/simstream/example/openmm_example/openmm_log_consumer.py
new file mode 100644
index 0000000..e1280bf
--- /dev/null
+++ b/gsoc2016/simstream/example/openmm_example/openmm_log_consumer.py
@@ -0,0 +1,52 @@
+#
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import json
+from simstream import PikaAsyncConsumer
+
+settings = {}
+
+with open("../settings.json", 'r') as f:
+    settings = json.load(f)
+settings["routing_key"] = "openmm.log"
+
+def print_log_line(body):
+    try:
+        lines = json.loads(body.decode())
+        if lines is not None:
+            for line in lines:
+                print(line)
+    except json.decoder.JSONDecodeError as e:
+        print("[Error]: Could not decode %s" % (body))
+    except UnicodeError as e:
+        print("[Error]: Could not decode from bytes to string: %s" % (e.reason))
+
+consumer = PikaAsyncConsumer(settings["url"],
+                             settings["exchange"],
+                             "openmm.log", # settings["queue"],
+                             message_handler=print_log_line,
+                             routing_key=settings["routing_key"],
+                             exchange_type=settings["exchange_type"])
+
+if __name__ == "__main__":
+    try:
+        consumer.start()
+    except KeyboardInterrupt:
+        consumer.stop()

http://git-wip-us.apache.org/repos/asf/airavata-sandbox/blob/ef291692/gsoc2016/simstream/example/openmm_example/openmm_rmsd_consumer.py
----------------------------------------------------------------------
diff --git a/gsoc2016/simstream/example/openmm_example/openmm_rmsd_consumer.py b/gsoc2016/simstream/example/openmm_example/openmm_rmsd_consumer.py
new file mode 100644
index 0000000..4cdf6d5
--- /dev/null
+++ b/gsoc2016/simstream/example/openmm_example/openmm_rmsd_consumer.py
@@ -0,0 +1,56 @@
+#
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import json
+from simstream import PikaAsyncConsumer
+
+settings = {}
+
+with open("../settings.json", 'r') as f:
+    settings = json.load(f)
+settings["routing_key"] = "openmm.rmsd"
+
+def print_rmsd(body):
+    try:
+        lines = json.loads(body.decode())
+        if lines is not None:
+            for line in lines:
+                print(line[0])
+    except json.decoder.JSONDecodeError as e:
+        print("[Error]: Could not decode %s" % (body))
+    except UnicodeError as e:
+        print("[Error]: Could not decode from bytes to string: %s" % (e.reason))
+    except IndexError as e:
+        print("[Error]: List is empty")
+    except KeyError:
+        print(lines)
+
+consumer = PikaAsyncConsumer(settings["url"],
+                             settings["exchange"],
+                             "openmm.rmsd", # settings["queue"],
+                             message_handler=print_rmsd,
+                             routing_key=settings["routing_key"],
+                             exchange_type=settings["exchange_type"])
+
+if __name__ == "__main__":
+    try:
+        consumer.start()
+    except KeyboardInterrupt:
+        consumer.stop()

http://git-wip-us.apache.org/repos/asf/airavata-sandbox/blob/ef291692/gsoc2016/simstream/example/openmm_example/openmm_stream.slurm
----------------------------------------------------------------------
diff --git a/gsoc2016/simstream/example/openmm_example/openmm_stream.slurm b/gsoc2016/simstream/example/openmm_example/openmm_stream.slurm
new file mode 100644
index 0000000..837e4d4
--- /dev/null
+++ b/gsoc2016/simstream/example/openmm_example/openmm_stream.slurm
@@ -0,0 +1,19 @@
+#!/usr/bin/env bash
+
+#SBATCH -J remote_logger     # Job name
+#SBATCH -o remote_logger.o%j # Name of stdout output file (%j expands to jobId)
+#SBATCH -e remote_logger.o%j # Name of stderr output file (%j expands to jobId)
+#SBATCH -p development       # Queue (partition) to submit to
+#SBATCH -t 00:10:00          # Run time (hh:mm:ss) - 10 minutes
+#SBATCH -n 1                 # Total number of tasks to run
+
+#module use "/home1/03947/tg832463/modulefiles"
+#module load openmm
+
+touch test.txt
+
+python openmm_streamer.py ./application/sim.out ./application/trajectory.dcd ./application/input.pdb ./application/input.pdb &
+
+cd application
+python alanine_dipeptide.py > sim.out
+sleep 5

http://git-wip-us.apache.org/repos/asf/airavata-sandbox/blob/ef291692/gsoc2016/simstream/example/openmm_example/openmm_streamer.py
----------------------------------------------------------------------
diff --git a/gsoc2016/simstream/example/openmm_example/openmm_streamer.py b/gsoc2016/simstream/example/openmm_example/openmm_streamer.py
new file mode 100644
index 0000000..36ba838
--- /dev/null
+++ b/gsoc2016/simstream/example/openmm_example/openmm_streamer.py
@@ -0,0 +1,150 @@
+#
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from simstream import SimStream, DataReporter
+
+import sys, json
+
+class LogMonitor(object):
+    """
+    A callable class that returns unprocessed lines in an open logfile.
+
+    Instance Variables:
+    logfile -- the path to the logfile to monitor
+    """
+
+    def __init__(self, logfile):
+        """
+        Set up a monitor for a logfile.
+
+        Arguments:
+        logfile -- the path to the logfile to monitor
+        """
+        self.logfile = logfile
+        self._generator = None
+        self._version = sys.version_info[0]
+
+    def __call__(self):
+        """
+        Get the next line from the logfile.
+        """
+        if not self._generator:
+            self._generator = self._monitor_logfile()
+
+        lines = []
+
+        line = self._next()
+        while line is not None:
+            lines.append(line)
+            line = self._next()
+        return lines
+
+    def _monitor_logfile(self):
+        """
+        Yield the next set of lines from the logfile.
+        """
+        try:
+            # Make the file persistent for the lifetime of the generator
+            with open(self.logfile) as f:
+                f.seek(0,2) # Move to the end of the file
+                while True:
+                    # Get the next line or indicate the end of the file
+                    line = f.readline()
+                    if line:
+                        yield line.strip()
+                    else:
+                        yield None
+
+        except EnvironmentError as e:
+            # Handle I/O exceptions in an OS-agnostic way
+            print("Error: Could not open file %s: %s" % (self.logfile, e))
+
+    def _next(self):
+        """
+        Python 2/3 agnostic retrieval of generator values.
+        """
+        return self._generator.__next__() if self._version >= 3 else self._generator.next()
+
+
+def get_relevant_log_lines(log_lines):
+    import re
+    relevant_lines = []
+    pattern = r'^\[.+\]'
+    for line in log_lines:
+        if re.match(pattern, line) is not None:
+            relevant_lines.append(line)
+    return relevant_lines
+
+
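+# calculate_rmsd loads the trajectory written so far and reports the RMSD of
+# its final frame; an illustrative return value is
+# {"step": "100", "rmsd": "0.0834"}.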
+def calculate_rmsd(trajectory, topology, reference):
+    import mdtraj
+    traj = mdtraj.load(trajectory, top=topology)
+    ref = mdtraj.load(reference)
+    rmsd = mdtraj.rmsd(traj, ref)
+    data = {"step": str(traj.n_frames), "rmsd": str(rmsd[-1])}
+    return data
+
+settings = {}
+
+with open("../settings.json", 'r') as f:
+    settings = json.load(f)
+
+
+if __name__ == "__main__":
+    logfile = sys.argv[1]
+    trajectory = sys.argv[2]
+    topology = sys.argv[3]
+    reference = sys.argv[4]
+
+    open(logfile, 'a').close()
+    open(trajectory, 'a').close()
+
+    log_reporter = DataReporter()
+    log_reporter.add_collector("logger",
+                               LogMonitor(logfile),
+                               settings["url"],
+                               settings["exchange"],
+                               limit=10,
+                               interval=2,
+                               exchange_type="direct", # settings["exchange_type"],
+                               postprocessor=get_relevant_log_lines)
+
+    log_reporter.start_streaming("logger", "openmm.log")
+
+    rmsd_reporter = DataReporter()
+    rmsd_reporter.add_collector("rmsd",
+                                calculate_rmsd,
+                                settings["url"],
+                                settings["exchange"],
+                                limit=1,
+                                interval=2,
+                                exchange_type="direct",  # settings["exchange_type"],
+                                callback_args=[trajectory, topology, reference])
+
+    rmsd_reporter.start_streaming("rmsd", "openmm.rmsd")
+
+    streamer = SimStream(config=settings, reporters={"log_reporter": log_reporter, "rmsd_reporter": rmsd_reporter})
+    streamer.setup()
+
+    try:
+        streamer.start()
+    except KeyboardInterrupt:
+        streamer.stop()

http://git-wip-us.apache.org/repos/asf/airavata-sandbox/blob/ef291692/gsoc2016/simstream/example/openmm_example/test.txt
----------------------------------------------------------------------
diff --git a/gsoc2016/simstream/example/openmm_example/test.txt b/gsoc2016/simstream/example/openmm_example/test.txt
new file mode 100644
index 0000000..029e53e
--- /dev/null
+++ b/gsoc2016/simstream/example/openmm_example/test.txt
@@ -0,0 +1,20 @@
+====
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+====
+

http://git-wip-us.apache.org/repos/asf/airavata-sandbox/blob/ef291692/gsoc2016/simstream/example/settings.json
----------------------------------------------------------------------
diff --git a/gsoc2016/simstream/example/settings.json b/gsoc2016/simstream/example/settings.json
new file mode 100644
index 0000000..d354d46
--- /dev/null
+++ b/gsoc2016/simstream/example/settings.json
@@ -0,0 +1,6 @@
+{
+    "url": "amqp://guest:guest@localhost:5672",
+    "exchange": "simstream",
+    "queue": "test",
+    "exchange_type": "topic"
+}

http://git-wip-us.apache.org/repos/asf/airavata-sandbox/blob/ef291692/gsoc2016/simstream/setup.py
----------------------------------------------------------------------
diff --git a/gsoc2016/simstream/setup.py b/gsoc2016/simstream/setup.py
new file mode 100755
index 0000000..6aad3a7
--- /dev/null
+++ b/gsoc2016/simstream/setup.py
@@ -0,0 +1,39 @@
+#
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+"""
+    Setup for simstream module.
+
+    Author: Jeff Kinnison (jkinniso@nd.edu)
+"""
+
+from setuptools import setup, find_packages
+
+setup(
+    name="simstream",
+    version="0.1dev",
+    author="Jeff Kinnison",
+    author_email="jkinniso@nd.edu",
+    packages=find_packages(),
+    description="",
+    install_requires=[
+        "pika >= 0.10.0"
+    ],
+)

http://git-wip-us.apache.org/repos/asf/airavata-sandbox/blob/ef291692/gsoc2016/simstream/simstream/__init__.py
----------------------------------------------------------------------
diff --git a/gsoc2016/simstream/simstream/__init__.py b/gsoc2016/simstream/simstream/__init__.py
new file mode 100755
index 0000000..385d330
--- /dev/null
+++ b/gsoc2016/simstream/simstream/__init__.py
@@ -0,0 +1,31 @@
+#
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+"""
+Utilities for collecting and distributing system data.
+
+Author: Jeff Kinnison (jkinniso@nd.edu)
+"""
+
+from .simstream import SimStream
+from .datareporter import DataReporter, CollectorExistsException, CollectorDoesNotExistException
+from .datacollector import DataCollector
+from .pikaasyncconsumer import PikaAsyncConsumer
+from .pikaproducer import PikaProducer

http://git-wip-us.apache.org/repos/asf/airavata-sandbox/blob/ef291692/gsoc2016/simstream/simstream/datacollector.py
----------------------------------------------------------------------
diff --git a/gsoc2016/simstream/simstream/datacollector.py b/gsoc2016/simstream/simstream/datacollector.py
new file mode 100755
index 0000000..2aa35ec
--- /dev/null
+++ b/gsoc2016/simstream/simstream/datacollector.py
@@ -0,0 +1,130 @@
+#
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+"""
+Utilities for collecting system data.
+
+Author: Jeff Kinnison (jkinniso@nd.edu)
+"""
+
+from .pikaproducer import PikaProducer
+
+from threading import Thread, Lock, Event
+
+import copy
+
+
+class DataCollector(Thread):
+    """Collects data by running user-specified routines.
+
+    Inherits from: threading.Thread
+
+    Instance variables:
+    name -- the name of the collector
+    limit -- the maximum number of maintained data points
+    interval -- the interval (in seconds) at which data collection is performed
+
+    Public methods:
+    activate -- start collecting data
+    add_routing_key -- add a new streaming endpoint
+    deactivate -- stop further data collection
+    remove_routing_key -- remove a streaming endpoint
+    run -- collect data if active
+    """
+    def __init__(self, name, callback, rabbitmq_url, exchange, exchange_type="direct", limit=250, interval=10,
+                 postprocessor=None, callback_args=[], postprocessor_args=[]):
+        """
+        Arguments:
+        name -- the name of the collector
+        callback -- the data collection function to run
+
+        Keyword arguments:
+        limit -- the maximum number of maintained data points (default 250)
+        interval -- the time interval in seconds at which to collect data
+                    (default: 10)
+        postprocessor -- a function to run on the return value of callback
+                         (default None)
+        callback_args -- the list of arguments to pass to the callback
+                         (default [])
+        postprocessor_args -- the list of arguments to pass to the
+                              postprocessor (default [])
+        """
+        super(DataCollector, self).__init__()
+        self.name = name if name else "Unknown Resource"
+        self.limit = limit
+        self.interval = interval
+        self._callback = callback
+        self._callback_args = callback_args
+        self._postprocessor = postprocessor
+        self._postprocessor_args = postprocessor_args
+        self._data = []
+        self._data_lock = Lock()
+        self._active = False
+        self._producer = PikaProducer(rabbitmq_url, exchange, exchange_type=exchange_type, routing_keys=[])
+
+    def activate(self):
+        """
+        Start collecting data.
+        """
+        self._active = True
+
+    def add_routing_key(self, key):
+        """
+        Add a new producer endpoint.
+        """
+        self._producer.add_routing_key(key)
+
+
+    def deactivate(self):
+        """
+        Stop collecting data.
+        """
+        self._active = False
+
+    def remove_routing_key(self, key):
+        self._producer.remove_routing_key(key)
+        if len(self._producer.endpoints) == 0:
+            self._producer.shutdown()
+
+    def run(self):
+        """
+        Run the callback and postprocessing subroutines and record result.
+
+        Catches generic exceptions because the function being run is not
+        known beforehand.
+        """
+        self._collection_event = Event()
+        while self._active and not self._collection_event.wait(timeout=self.interval):
+            try:
+                result = self._callback(*self._callback_args)
+                result = self._postprocessor(result, *self._postprocessor_args) if self._postprocessor else result
+                #print("Found the value ", result, " in ", self.name)
+                self._data.append(result)
+                if len(self._data) > self.limit:
+                    self._data.pop(0)
+                self._producer(copy.copy(self._data))
+
+            except Exception as e:
+                print("[ERROR] %s" % (e))
+
+    def stop(self):
+        """
+        Remove every routing key, shutting down the producer.
+        """
+        # Iterate over a copy because remove_routing_key mutates the key list.
+        for key in list(self._producer.routing_keys):
+            self.remove_routing_key(key)
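+
+# A minimal usage sketch (the broker URL is an assumption and my_callback is a
+# placeholder for any zero-argument callable that returns a data point):
+#
+#     collector = DataCollector("rss", my_callback,
+#                               "amqp://guest:guest@localhost:5672", "simstream",
+#                               interval=2)
+#     collector.add_routing_key("memory")
+#     collector.activate()
+#     collector.start()  # Thread.start(); run() then collects every 2 seconds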

http://git-wip-us.apache.org/repos/asf/airavata-sandbox/blob/ef291692/gsoc2016/simstream/simstream/datareporter.py
----------------------------------------------------------------------
diff --git a/gsoc2016/simstream/simstream/datareporter.py b/gsoc2016/simstream/simstream/datareporter.py
new file mode 100755
index 0000000..53e1387
--- /dev/null
+++ b/gsoc2016/simstream/simstream/datareporter.py
@@ -0,0 +1,189 @@
+#
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+"""
+Utilities for collecting system data.
+
+Author: Jeff Kinnison (jkinniso@nd.edu)
+"""
+
+# TODO: Refactor to iterate over producers, not collectors. Collectors should
+#       execute concurrently.
+# TODO: Add method to deactivate reporter
+
+from threading import Thread, Event
+
+from .datacollector import DataCollector
+
+
+class CollectorExistsException(Exception):
+    """Thrown when attempting to add a collector with a conflicting name."""
+    pass
+
+
+class CollectorDoesNotExistException(Exception):
+    """Thrown when attempting to access a collector that does not exist."""
+    pass
+
+
+class DataReporter(object):
+    """Manages a set of named DataCollectors.
+
+    Instance variables:
+    collectors -- a dict of DataCollectors indexed by name
+
+    Public methods:
+    add_collector -- add a new DataCollector to the reporter
+    start_collecting -- begin data collection for all collectors
+    start_collector -- begin data collection for a specific collector
+    stop_collecting -- stop all data collection
+    stop_collector -- stop a running DataCollector
+    start_streaming -- route a collector's data to a routing key
+    stop_streaming -- stop routing a collector's data to a routing key
+    """
+
+    def __init__(self, collectors={}):
+        super(DataReporter, self).__init__()
+        self.collectors = {}
+        for key, value in collectors.items():
+            self.add_collector(
+                key,
+                value.callback,
+                value.url,
+                value.exchange,
+                limit=value.limit,
+                postprocessor=value.postprocessor,
+                callback_args=value.callback_args,
+                postprocessor_args=value.postprocessor_args
+            )
+
+    def add_collector(self, name, callback, rabbitmq_url, exchange, limit=250, interval=10, postprocessor=None,
+                      exchange_type="direct", callback_args=[], postprocessor_args=[]):
+        """Add a new collector.
+
+        Arguments:
+        name -- name of the new DataCollector
+        callback -- the data collection callback to run
+
+        Keyword arguments:
+        limit -- the number of data points to store (default 100)
+        postprocessor -- a postprocessing function to run on each data point
+                         (default None)
+        callback_args -- a list of arguments to pass to the callback
+                         (default [])
+        postprocessor_args -- a list of arguments to pass to the postprocessor
+                              (default [])
+
+        Raises:
+        CollectorExistsException if a collector named name already exists
+        """
+        if name in self.collectors:
+            raise CollectorExistsException
+
+        self.collectors[name] = DataCollector(
+            name,
+            callback,
+            rabbitmq_url,
+            exchange,
+            limit=limit,
+            interval=interval,
+            postprocessor=postprocessor,
+            exchange_type=exchange_type,
+            callback_args=callback_args,
+            postprocessor_args=postprocessor_args
+        )
+
+    def start_collecting(self):
+        """
+        Start data collection for all associated collectors.
+        """
+        for collector in self.collectors:
+            self.start_collector(collector)
+
+    def start_collector(self, name):
+        """
+        Activate the specified collector.
+
+        Arguments:
+        name -- the name of the collector to start
+
+        Raises:
+        RuntimeError if the collector has already been started.
+        """
+        try:
+            self.collectors[name].activate()
+            self.collectors[name].start()
+        except RuntimeError as e:
+            print("Error starting collector ", name)
+            print(e)
+
+    def stop_collecting(self):
+        """
+        Stop all collectors.
+        """
+        for collector in self.collectors:
+            self.stop_collector(collector)
+
+    def stop_collector(self, name):
+        """Deactivate the specified collector.
+
+        Arguments:
+        name -- the name of the collector to stop
+
+        Raises:
+        CollectorDoesNotExistException if no collector named name exists
+        """
+        if name not in self.collectors:
+            raise CollectorDoesNotExistException
+
+        try:
+            self.collectors[name].deactivate()
+            self.collectors[name].join()
+        except RuntimeError as e: # Catch deadlock
+            print(e)
+
+
+    def start_streaming(self, collector_name, routing_key):
+        """
+        Begin streaming data from a collector to a particular recipient.
+
+        Arguments:
+        collector_name -- the name of the collector whose data to stream
+        routing_key -- the routing key to reach the intended recipient
+        """
+        if collector_name not in self.collectors: # Make sure collector exists
+            raise CollectorDoesNotExistException
+        self.collectors[collector_name].add_routing_key(routing_key)
+
+    def stop_streaming(self, collector_name, routing_key):
+        """
+        Stop a particular stream.
+
+        Arguments:
+        collector_name -- the collector associated with the producer to stop
+        routing_key -- the routing key to reach the intended recipient
+
+        Raises:
+        CollectorDoesNotExistException if no collector named collector_name
+                                       exists
+        """
+        if collector_name not in self.collectors:
+            raise CollectorDoesNotExistException
+        self.collectors[collector_name].remove_routing_key(routing_key)
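+
+# A minimal usage sketch (broker URL and my_callback are placeholders; this
+# mirrors the pattern used by the examples):
+#
+#     reporter = DataReporter()
+#     reporter.add_collector("rss", my_callback,
+#                            "amqp://guest:guest@localhost:5672", "simstream",
+#                            interval=2)
+#     reporter.start_streaming("rss", "memory")
+#     reporter.start_collecting()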

http://git-wip-us.apache.org/repos/asf/airavata-sandbox/blob/ef291692/gsoc2016/simstream/simstream/eventhandler.py
----------------------------------------------------------------------
diff --git a/gsoc2016/simstream/simstream/eventhandler.py b/gsoc2016/simstream/simstream/eventhandler.py
new file mode 100755
index 0000000..c7faabd
--- /dev/null
+++ b/gsoc2016/simstream/simstream/eventhandler.py
@@ -0,0 +1,37 @@
+#
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+"""
+A utility for responding to user-defined events.
+
+Author: Jeff Kinnison (jkinniso@nd.edu)
+"""
+
+
+class EventHandler(object):
+    """A named wrapper around a user-defined event response.
+
+    Instance variables:
+    name -- the name of the handler
+    """
+    def __init__(self, name, handler, handler_args=[]):
+        self.name = name
+        self._handler = handler
+        self._handler_args = handler_args
+
+    def __call__(self):
+        return self._handler(*self._handler_args)
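+
+# A minimal usage sketch (the handler and its argument are invented examples):
+#
+#     notify = EventHandler("notify", print, handler_args=["threshold crossed"])
+#     notify()  # prints "threshold crossed"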