You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ao...@apache.org on 2017/04/06 09:34:02 UTC

[41/41] ambari git commit: AMBARI-20684. Implement a websocket adapter for stomp.py (aonishuk)

AMBARI-20684. Implement a websocket adapter for stomp.py (aonishuk)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/8de3961b
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/8de3961b
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/8de3961b

Branch: refs/heads/branch-3.0-perf
Commit: 8de3961b62dcf23826e16134b9dd5c5c461ab50c
Parents: fb4637b
Author: Andrew Onishuk <ao...@hortonworks.com>
Authored: Thu Apr 6 12:33:00 2017 +0300
Committer: Andrew Onishuk <ao...@hortonworks.com>
Committed: Thu Apr 6 12:33:00 2017 +0300

----------------------------------------------------------------------
 LICENSE.txt                                     |  28 +
 NOTICE.txt                                      |   5 +
 ambari-agent/conf/unix/install-helper.sh        |  11 +
 ambari-agent/pom.xml                            |   1 +
 .../main/python/ambari_agent/client_example.py  |  69 +++
 ambari-agent/src/packages/tarball/all.xml       |   5 +
 .../python/ambari_stomp/adapter/websocket.py    | 106 ++++
 .../src/main/python/ambari_ws4py/__init__.py    |  67 +++
 .../main/python/ambari_ws4py/client/__init__.py | 339 ++++++++++++
 .../python/ambari_ws4py/client/geventclient.py  |  92 ++++
 .../ambari_ws4py/client/threadedclient.py       |  98 ++++
 .../python/ambari_ws4py/client/tornadoclient.py | 155 ++++++
 .../src/main/python/ambari_ws4py/compat.py      |  46 ++
 .../src/main/python/ambari_ws4py/exc.py         |  27 +
 .../src/main/python/ambari_ws4py/framing.py     | 273 ++++++++++
 .../src/main/python/ambari_ws4py/manager.py     | 368 +++++++++++++
 .../src/main/python/ambari_ws4py/messaging.py   | 169 ++++++
 .../src/main/python/ambari_ws4py/streaming.py   | 319 +++++++++++
 .../main/python/ambari_ws4py/utf8validator.py   | 117 ++++
 .../src/main/python/ambari_ws4py/websocket.py   | 535 +++++++++++++++++++
 ambari-project/pom.xml                          | 100 +++-
 ambari-server/pom.xml                           |  82 ++-
 pom.xml                                         |   2 +
 23 files changed, 3009 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/8de3961b/LICENSE.txt
----------------------------------------------------------------------
diff --git a/LICENSE.txt b/LICENSE.txt
index f05016f..f2dc400 100644
--- a/LICENSE.txt
+++ b/LICENSE.txt
@@ -344,6 +344,34 @@ THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 
+For ws4py websocket python library.
+
+Copyright (c) 2011-2016, Sylvain Hellegouarch
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are met:
+
+ * Redistributions of source code must retain the above copyright notice,
+   this list of conditions and the following disclaimer.
+ * Redistributions in binary form must reproduce the above copyright
+   notice, this list of conditions and the following disclaimer in the
+   documentation and/or other materials provided with the distribution.
+ * Neither the name of ws4py nor the names of its contributors may be used
+   to endorse or promote products derived from this software without
+   specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+POSSIBILITY OF SUCH DAMAGE.
 
 For jQuery 1.9.1 (ambari-web/vendor/scripts.jquery-1.9.1.js):
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/8de3961b/NOTICE.txt
----------------------------------------------------------------------
diff --git a/NOTICE.txt b/NOTICE.txt
index 50f982c..7429765 100644
--- a/NOTICE.txt
+++ b/NOTICE.txt
@@ -28,4 +28,9 @@ Some rights reserved.
 This product includes Simplejson, library fast encoding and decoding of json. (https://github.com/simplejson/simplejson - MIT license)
 
 Copyright (c) 2006 Bob Ippolito.
+All rights reserved.
+
+Ws4py, python library for websocket connectivity
+
+Copyright (c) 2011-2016, Sylvain Hellegouarch
 All rights reserved.
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/8de3961b/ambari-agent/conf/unix/install-helper.sh
----------------------------------------------------------------------
diff --git a/ambari-agent/conf/unix/install-helper.sh b/ambari-agent/conf/unix/install-helper.sh
index 35aec15..a790160 100644
--- a/ambari-agent/conf/unix/install-helper.sh
+++ b/ambari-agent/conf/unix/install-helper.sh
@@ -23,6 +23,7 @@ RESOURCE_MANAGEMENT_DIR="/usr/lib/python2.6/site-packages/resource_management"
 JINJA_DIR="/usr/lib/python2.6/site-packages/ambari_jinja2"
 SIMPLEJSON_DIR="/usr/lib/python2.6/site-packages/ambari_simplejson"
 STOMP_DIR="/usr/lib/python2.6/site-packages/ambari_stomp"
+WS4PY_DIR="/usr/lib/python2.6/site-packages/ambari_ws4py"
 OLD_COMMON_DIR="/usr/lib/python2.6/site-packages/common_functions"
 INSTALL_HELPER_SERVER="/var/lib/ambari-server/install-helper.sh"
 COMMON_DIR_AGENT="/usr/lib/ambari-agent/lib/ambari_commons"
@@ -30,6 +31,7 @@ RESOURCE_MANAGEMENT_DIR_AGENT="/usr/lib/ambari-agent/lib/resource_management"
 JINJA_AGENT_DIR="/usr/lib/ambari-agent/lib/ambari_jinja2"
 SIMPLEJSON_AGENT_DIR="/usr/lib/ambari-agent/lib/ambari_simplejson"
 STOMP_AGENT_DIR="/usr/lib/ambari-agent/lib/ambari_stomp"
+WS4PY_AGENT_DIR="/usr/lib/ambari-agent/lib/ambari_ws4py"
 AMBARI_AGENT="/usr/lib/python2.6/site-packages/ambari_agent"
 PYTHON_WRAPER_TARGET="/usr/bin/ambari-python-wrap"
 AMBARI_AGENT_VAR="/var/lib/ambari-agent"
@@ -70,6 +72,11 @@ do_install(){
   if [ ! -d "$STOMP_DIR" ]; then
     ln -s "$STOMP_AGENT_DIR" "$STOMP_DIR"
   fi
+
+  # setting ws4py shared resource
+  if [ ! -d "$WS4PY_DIR" ]; then
+    ln -s "$WS4PY_AGENT_DIR" "$WS4PY_DIR"
+  fi
   
   # on nano Ubuntu, when umask=027 those folders are created without 'x' bit for 'others'.
   # which causes failures when hadoop users try to access tmp_dir
@@ -159,6 +166,10 @@ do_remove(){
     rm -f $STOMP_DIR
   fi
 
+  if [ -d "$WS4PY_DIR" ]; then
+    rm -f $WS4PY_DIR
+  fi
+
   if [ -d "$OLD_COMMON_DIR" ]; then
     rm -f $OLD_COMMON_DIR
   fi

http://git-wip-us.apache.org/repos/asf/ambari/blob/8de3961b/ambari-agent/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-agent/pom.xml b/ambari-agent/pom.xml
index 4807a35..c232947 100644
--- a/ambari-agent/pom.xml
+++ b/ambari-agent/pom.xml
@@ -42,6 +42,7 @@
     <jinja.install.dir>/usr/lib/ambari-agent/lib/ambari_jinja2</jinja.install.dir>
     <simplejson.install.dir>/usr/lib/ambari-agent/lib/ambari_simplejson</simplejson.install.dir>
     <stomp.install.dir>/usr/lib/ambari-agent/lib/ambari_stomp</stomp.install.dir>
+    <ws4py.install.dir>/usr/lib/ambari-agent/lib/ambari_ws4py</ws4py.install.dir>
     <lib.dir>/usr/lib/ambari-agent/lib</lib.dir>
     <deb.architecture>amd64</deb.architecture>
     <ambari.server.module>../ambari-server</ambari.server.module>

http://git-wip-us.apache.org/repos/asf/ambari/blob/8de3961b/ambari-agent/src/main/python/ambari_agent/client_example.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/client_example.py b/ambari-agent/src/main/python/ambari_agent/client_example.py
new file mode 100644
index 0000000..96e76be
--- /dev/null
+++ b/ambari-agent/src/main/python/ambari_agent/client_example.py
@@ -0,0 +1,69 @@
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+# TODO: remove this
+
+import time
+import ambari_stomp
+from ambari_stomp.adapter import websocket
+import base64
+
+correlationId = 0
+
+def get_headers():
+  global correlationId
+  correlationId += 1
+  headers = {
+    "content-type": "text/plain",
+    "correlationId": correlationId
+  }
+  return headers
+
+class MyListener(ambari_stomp.ConnectionListener):
+  def on_message(self, headers, message):
+    print('MyListener:\nreceived a message "{0}"\n'.format(message))
+    global read_messages
+    print headers
+    print message
+    read_messages.append({'id': headers['message-id'], 'subscription':headers['subscription']})
+
+
+class MyStatsListener(ambari_stomp.StatsListener):
+  def on_disconnected(self):
+    super(MyStatsListener, self).on_disconnected()
+    print('MyStatsListener:\n{0}\n'.format(self))
+
+read_messages = []
+
+conn = websocket.WsConnection('ws://gc6401:8080/api/stomp/v1')
+conn.transport.ws.extra_headers = [("Authorization", "Basic " + base64.b64encode('admin:admin'))]
+conn.set_listener('my_listener', MyListener())
+conn.set_listener('stats_listener', MyStatsListener())
+conn.start()
+
+conn.connect(wait=True, headers=get_headers())
+
+conn.subscribe(destination='/user/', id='sub-0', ack='client-individual')
+
+#conn.send(body="", destination='/test/time', headers=get_headers())
+conn.send(body="some message", destination='/test/echo', headers=get_headers())
+time.sleep(1)
+for message in read_messages:
+  conn.ack(message['id'], message['subscription'])
+
+conn.disconnect()
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/8de3961b/ambari-agent/src/packages/tarball/all.xml
----------------------------------------------------------------------
diff --git a/ambari-agent/src/packages/tarball/all.xml b/ambari-agent/src/packages/tarball/all.xml
index a22f0bb..c830dd1 100644
--- a/ambari-agent/src/packages/tarball/all.xml
+++ b/ambari-agent/src/packages/tarball/all.xml
@@ -77,6 +77,11 @@
     </fileSet>
     <fileSet>
       <directoryMode>755</directoryMode>
+      <directory>${project.basedir}/../ambari-common/src/main/python/ambari_ws4py</directory>
+      <outputDirectory>${ws4py.install.dir}</outputDirectory>
+    </fileSet>
+    <fileSet>
+      <directoryMode>755</directoryMode>
       <directory>src/examples</directory>
       <outputDirectory>${lib.dir}/examples</outputDirectory>
     </fileSet>

http://git-wip-us.apache.org/repos/asf/ambari/blob/8de3961b/ambari-common/src/main/python/ambari_stomp/adapter/websocket.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/ambari_stomp/adapter/websocket.py b/ambari-common/src/main/python/ambari_stomp/adapter/websocket.py
new file mode 100644
index 0000000..8416a27
--- /dev/null
+++ b/ambari-common/src/main/python/ambari_stomp/adapter/websocket.py
@@ -0,0 +1,106 @@
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+import copy
+import logging
+
+from Queue import Queue
+
+from ambari_stomp.connect import BaseConnection
+from ambari_stomp.protocol import Protocol12
+from ambari_stomp.transport import Transport, DEFAULT_SSL_VERSION
+
+from ambari_ws4py.client.threadedclient import WebSocketClient
+
+logger = logging.getLogger(__name__)
+
+class QueuedWebSocketClient(WebSocketClient):
+  def __init__(self, *args, **kwargs):
+    WebSocketClient.__init__(self, *args, **kwargs)
+    self.messages = Queue()
+
+  def received_message(self, message):
+    """
+    Override the base class to store the incoming message
+    in the `messages` queue.
+    """
+    self.messages.put(copy.deepcopy(message))
+
+  def receive(self):
+    """
+    Returns messages that were stored into the
+    `messages` queue and returns `None` when the
+    websocket is terminated or closed.
+    """
+    # If the websocket was terminated and there are no messages
+    # left in the queue, return None immediately otherwise the client
+    # will block forever
+    if self.terminated and self.messages.empty():
+      return None
+    message = self.messages.get()
+    if message is StopIteration:
+      return None
+    return message
+
+  def closed(self, code, reason=None):
+    self.messages.put(StopIteration)
+
+class WsTransport(Transport):
+  def __init__(self, url):
+    Transport.__init__(self, (0, 0), False, False, 0.0, 0.0, 0.0, 0.0, 0, False, None, None, None, None, False,
+    DEFAULT_SSL_VERSION, None, None, None)
+    self.current_host_and_port = (0, 0) # mocking
+    self.ws = QueuedWebSocketClient(url, protocols=['http-only', 'chat'])
+    self.ws.daemon = False
+
+  def is_connected(self):
+    return self.connected
+
+  def attempt_connection(self):
+    self.ws.connect()
+
+  def send(self, encoded_frame):
+    logger.debug("Outgoing STOMP message:\n>>> " + encoded_frame)
+    self.ws.send(encoded_frame)
+
+  def receive(self):
+    try:
+      msg = str(self.ws.receive())
+      logger.debug("Incoming STOMP message:\n<<< " + msg)
+      return msg
+    except:
+      # exceptions from this method are hidden by the framework so implementing logging by ourselves
+      logger.exception("Exception while handling incoming STOMP message:")
+    return None
+
+  def stop(self):
+    self.running = False
+    self.ws.close_connection()
+    self.disconnect_socket()
+    Transport.stop(self)
+
+class WsConnection(BaseConnection, Protocol12):
+  def __init__(self, url, wait_on_receipt=False):
+    self.transport = WsTransport(url)
+    self.transport.set_listener('ws-listener', self)
+    self.transactions = {}
+    Protocol12.__init__(self, self.transport, (0, 0))
+
+  def disconnect(self, receipt=None, headers=None, **keyword_headers):
+    Protocol12.disconnect(self, receipt, headers, **keyword_headers)
+    self.transport.stop()

http://git-wip-us.apache.org/repos/asf/ambari/blob/8de3961b/ambari-common/src/main/python/ambari_ws4py/__init__.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/ambari_ws4py/__init__.py b/ambari-common/src/main/python/ambari_ws4py/__init__.py
new file mode 100644
index 0000000..81d30b5
--- /dev/null
+++ b/ambari-common/src/main/python/ambari_ws4py/__init__.py
@@ -0,0 +1,67 @@
+# -*- coding: utf-8 -*-
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+#     * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+#     * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+#     * Neither the name of ambari_ws4py nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+#
+import logging
+import logging.handlers as handlers
+
+__author__ = "Sylvain Hellegouarch"
+__version__ = "0.4.2"
+__all__ = ['WS_KEY', 'WS_VERSION', 'configure_logger', 'format_addresses']
+
+WS_KEY = b"258EAFA5-E914-47DA-95CA-C5AB0DC85B11"
+WS_VERSION = (8, 13)
+
+def configure_logger(stdout=True, filepath=None, level=logging.INFO):
+    logger = logging.getLogger('ambari_ws4py')
+    logger.setLevel(level)
+    logfmt = logging.Formatter("[%(asctime)s] %(levelname)s %(message)s")
+
+    if filepath:
+        h = handlers.RotatingFileHandler(filepath, maxBytes=10485760, backupCount=3)
+        h.setLevel(level)
+        h.setFormatter(logfmt)
+        logger.addHandler(h)
+
+    if stdout:
+        import sys
+        h = logging.StreamHandler(sys.stdout)
+        h.setLevel(level)
+        h.setFormatter(logfmt)
+        logger.addHandler(h)
+
+    return logger
+
+def format_addresses(ws):
+    me = ws.local_address
+    peer = ws.peer_address
+    if isinstance(me, tuple) and isinstance(peer, tuple):
+        me_ip, me_port = ws.local_address
+        peer_ip, peer_port = ws.peer_address
+        return "[Local => %s:%d | Remote => %s:%d]" % (me_ip, me_port, peer_ip, peer_port)
+
+    return "[Bound to '%s']" % me

http://git-wip-us.apache.org/repos/asf/ambari/blob/8de3961b/ambari-common/src/main/python/ambari_ws4py/client/__init__.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/ambari_ws4py/client/__init__.py b/ambari-common/src/main/python/ambari_ws4py/client/__init__.py
new file mode 100644
index 0000000..89598ab
--- /dev/null
+++ b/ambari-common/src/main/python/ambari_ws4py/client/__init__.py
@@ -0,0 +1,339 @@
+# -*- coding: utf-8 -*-
+from base64 import b64encode
+from hashlib import sha1
+import os
+import socket
+import ssl
+
+from ambari_ws4py import WS_KEY, WS_VERSION
+from ambari_ws4py.exc import HandshakeError
+from ambari_ws4py.websocket import WebSocket
+from ambari_ws4py.compat import urlsplit
+
+__all__ = ['WebSocketBaseClient']
+
+class WebSocketBaseClient(WebSocket):
+    def __init__(self, url, protocols=None, extensions=None,
+                 heartbeat_freq=None, ssl_options=None, headers=None):
+        """
+        A websocket client that implements :rfc:`6455` and provides a simple
+        interface to communicate with a websocket server.
+
+        This class works on its own but will block if not run in
+        its own thread.
+
+        When an instance of this class is created, a :py:mod:`socket`
+        is created. If the connection is a TCP socket,
+        the nagle's algorithm is disabled.
+
+        The address of the server will be extracted from the given
+        websocket url.
+
+        The websocket key is randomly generated, reset the
+        `key` attribute if you want to provide yours.
+
+        For instance to create a TCP client:
+
+        .. code-block:: python
+
+           >>> from websocket.client import WebSocketBaseClient
+           >>> ws = WebSocketBaseClient('ws://localhost/ws')
+
+
+        Here is an example for a TCP client over SSL:
+
+        .. code-block:: python
+
+           >>> from websocket.client import WebSocketBaseClient
+           >>> ws = WebSocketBaseClient('wss://localhost/ws')
+
+
+        Finally an example of a Unix-domain connection:
+
+        .. code-block:: python
+
+           >>> from websocket.client import WebSocketBaseClient
+           >>> ws = WebSocketBaseClient('ws+unix:///tmp/my.sock')
+
+        Note that in this case, the initial Upgrade request
+        will be sent to ``/``. You may need to change this
+        by setting the resource explicitely before connecting:
+
+        .. code-block:: python
+
+           >>> from websocket.client import WebSocketBaseClient
+           >>> ws = WebSocketBaseClient('ws+unix:///tmp/my.sock')
+           >>> ws.resource = '/ws'
+           >>> ws.connect()
+
+        You may provide extra headers by passing a list of tuples
+        which must be unicode objects.
+
+        """
+        self.url = url
+        self.host = None
+        self.scheme = None
+        self.port = None
+        self.unix_socket_path = None
+        self.resource = None
+        self.ssl_options = ssl_options or {}
+        self.extra_headers = headers or []
+
+        if self.scheme == "wss":
+            # Prevent check_hostname requires server_hostname (ref #187)
+            if "cert_reqs" not in self.ssl_options:
+                self.ssl_options["cert_reqs"] = ssl.CERT_NONE
+
+        self._parse_url()
+
+        if self.unix_socket_path:
+            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
+        else:
+            # Let's handle IPv4 and IPv6 addresses
+            # Simplified from CherryPy's code
+            try:
+                family, socktype, proto, canonname, sa = socket.getaddrinfo(self.host, self.port,
+                                                                            socket.AF_UNSPEC,
+                                                                            socket.SOCK_STREAM,
+                                                                            0, socket.AI_PASSIVE)[0]
+            except socket.gaierror:
+                family = socket.AF_INET
+                if self.host.startswith('::'):
+                    family = socket.AF_INET6
+
+                socktype = socket.SOCK_STREAM
+                proto = 0
+                canonname = ""
+                sa = (self.host, self.port, 0, 0)
+
+            sock = socket.socket(family, socktype, proto)
+            sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
+            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+            if hasattr(socket, 'AF_INET6') and family == socket.AF_INET6 and \
+              self.host.startswith('::'):
+                try:
+                    sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0)
+                except (AttributeError, socket.error):
+                    pass
+
+        WebSocket.__init__(self, sock, protocols=protocols,
+                           extensions=extensions,
+                           heartbeat_freq=heartbeat_freq)
+
+        self.stream.always_mask = True
+        self.stream.expect_masking = False
+        self.key = b64encode(os.urandom(16))
+
+    # Adpated from: https://github.com/liris/websocket-client/blob/master/websocket.py#L105
+    def _parse_url(self):
+        """
+        Parses a URL which must have one of the following forms:
+
+        - ws://host[:port][path]
+        - wss://host[:port][path]
+        - ws+unix:///path/to/my.socket
+
+        In the first two cases, the ``host`` and ``port``
+        attributes will be set to the parsed values. If no port
+        is explicitely provided, it will be either 80 or 443
+        based on the scheme. Also, the ``resource`` attribute is
+        set to the path segment of the URL (alongside any querystring).
+
+        In addition, if the scheme is ``ws+unix``, the
+        ``unix_socket_path`` attribute is set to the path to
+        the Unix socket while the ``resource`` attribute is
+        set to ``/``.
+        """
+        # Python 2.6.1 and below don't parse ws or wss urls properly. netloc is empty.
+        # See: https://github.com/Lawouach/WebSocket-for-Python/issues/59
+        scheme, url = self.url.split(":", 1)
+
+        parsed = urlsplit(url, scheme="http")
+        if parsed.hostname:
+            self.host = parsed.hostname
+        elif '+unix' in scheme:
+            self.host = 'localhost'
+        else:
+            raise ValueError("Invalid hostname from: %s", self.url)
+
+        if parsed.port:
+            self.port = parsed.port
+
+        if scheme == "ws":
+            if not self.port:
+                self.port = 80
+        elif scheme == "wss":
+            if not self.port:
+                self.port = 443
+        elif scheme in ('ws+unix', 'wss+unix'):
+            pass
+        else:
+            raise ValueError("Invalid scheme: %s" % scheme)
+
+        if parsed.path:
+            resource = parsed.path
+        else:
+            resource = "/"
+
+        if '+unix' in scheme:
+            self.unix_socket_path = resource
+            resource = '/'
+
+        if parsed.query:
+            resource += "?" + parsed.query
+
+        self.scheme = scheme
+        self.resource = resource
+
+    @property
+    def bind_addr(self):
+        """
+        Returns the Unix socket path if or a tuple
+        ``(host, port)`` depending on the initial
+        URL's scheme.
+        """
+        return self.unix_socket_path or (self.host, self.port)
+
+    def close(self, code=1000, reason=''):
+        """
+        Initiate the closing handshake with the server.
+        """
+        if not self.client_terminated:
+            self.client_terminated = True
+            self._write(self.stream.close(code=code, reason=reason).single(mask=True))
+
+    def connect(self):
+        """
+        Connects this websocket and starts the upgrade handshake
+        with the remote endpoint.
+        """
+        if self.scheme == "wss":
+            # default port is now 443; upgrade self.sender to send ssl
+            self.sock = ssl.wrap_socket(self.sock, **self.ssl_options)
+            self._is_secure = True
+            
+        self.sock.connect(self.bind_addr)
+
+        self._write(self.handshake_request)
+
+        response = b''
+        doubleCLRF = b'\r\n\r\n'
+        while True:
+            bytes = self.sock.recv(128)
+            if not bytes:
+                break
+            response += bytes
+            if doubleCLRF in response:
+                break
+
+        if not response:
+            self.close_connection()
+            raise HandshakeError("Invalid response")
+
+        headers, _, body = response.partition(doubleCLRF)
+        response_line, _, headers = headers.partition(b'\r\n')
+
+        try:
+            self.process_response_line(response_line)
+            self.protocols, self.extensions = self.process_handshake_header(headers)
+        except HandshakeError:
+            self.close_connection()
+            raise
+
+        self.handshake_ok()
+        if body:
+            self.process(body)
+
+    @property
+    def handshake_headers(self):
+        """
+        List of headers appropriate for the upgrade
+        handshake.
+        """
+        headers = [
+            ('Host', '%s:%s' % (self.host, self.port)),
+            ('Connection', 'Upgrade'),
+            ('Upgrade', 'websocket'),
+            ('Sec-WebSocket-Key', self.key.decode('utf-8')),
+            ('Sec-WebSocket-Version', str(max(WS_VERSION)))
+            ]
+        
+        if self.protocols:
+            headers.append(('Sec-WebSocket-Protocol', ','.join(self.protocols)))
+
+        if self.extra_headers:
+            headers.extend(self.extra_headers)
+
+        if not any(x for x in headers if x[0].lower() == 'origin'):
+
+            scheme, url = self.url.split(":", 1)
+            parsed = urlsplit(url, scheme="http")
+            if parsed.hostname:
+                self.host = parsed.hostname
+            else:
+                self.host = 'localhost'
+            origin = scheme + '://' + self.host
+            if parsed.port:
+                origin = origin + ':' + str(parsed.port)
+            headers.append(('Origin', origin))
+
+        return headers
+
+    @property
+    def handshake_request(self):
+        """
+        Prepare the request to be sent for the upgrade handshake.
+        """
+        headers = self.handshake_headers
+        request = [("GET %s HTTP/1.1" % self.resource).encode('utf-8')]
+        for header, value in headers:
+            request.append(("%s: %s" % (header, value)).encode('utf-8'))
+        request.append(b'\r\n')
+
+        return b'\r\n'.join(request)
+
+    def process_response_line(self, response_line):
+        """
+        Ensure that we received a HTTP `101` status code in
+        response to our request and if not raises :exc:`HandshakeError`.
+        """
+        protocol, code, status = response_line.split(b' ', 2)
+        if code != b'101':
+            raise HandshakeError("Invalid response status: %s %s" % (code, status))
+
+    def process_handshake_header(self, headers):
+        """
+        Read the upgrade handshake's response headers and
+        validate them against :rfc:`6455`.
+        """
+        protocols = []
+        extensions = []
+
+        headers = headers.strip()
+
+        for header_line in headers.split(b'\r\n'):
+            header, value = header_line.split(b':', 1)
+            header = header.strip().lower()
+            value = value.strip().lower()
+
+            if header == b'upgrade' and value != b'websocket':
+                raise HandshakeError("Invalid Upgrade header: %s" % value)
+
+            elif header == b'connection' and value != b'upgrade':
+                raise HandshakeError("Invalid Connection header: %s" % value)
+
+            elif header == b'sec-websocket-accept':
+                match = b64encode(sha1(self.key + WS_KEY).digest())
+                if value != match.lower():
+                    raise HandshakeError("Invalid challenge response: %s" % value)
+
+            elif header == b'sec-websocket-protocol':
+                protocols = ','.join(value)
+
+            elif header == b'sec-websocket-extensions':
+                extensions = ','.join(value)
+
+        return protocols, extensions
+
+    def handshake_ok(self):
+        self.opened()

http://git-wip-us.apache.org/repos/asf/ambari/blob/8de3961b/ambari-common/src/main/python/ambari_ws4py/client/geventclient.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/ambari_ws4py/client/geventclient.py b/ambari-common/src/main/python/ambari_ws4py/client/geventclient.py
new file mode 100644
index 0000000..a1527b0
--- /dev/null
+++ b/ambari-common/src/main/python/ambari_ws4py/client/geventclient.py
@@ -0,0 +1,92 @@
+# -*- coding: utf-8 -*-
+import copy
+
+import gevent
+from gevent import Greenlet
+from gevent.queue import Queue
+
+from ambari_ws4py.client import WebSocketBaseClient
+
+__all__ = ['WebSocketClient']
+
+class WebSocketClient(WebSocketBaseClient):
+    def __init__(self, url, protocols=None, extensions=None, heartbeat_freq=None, ssl_options=None, headers=None):
+        """
+        WebSocket client that executes the
+        :meth:`run() <ambari_ws4py.websocket.WebSocket.run>` into a gevent greenlet.
+
+        .. code-block:: python
+
+          ws = WebSocketClient('ws://localhost:9000/echo', protocols=['http-only', 'chat'])
+          ws.connect()
+
+          ws.send("Hello world")
+
+          def incoming():
+            while True:
+               m = ws.receive()
+               if m is not None:
+                  print str(m)
+               else:
+                  break
+
+          def outgoing():
+            for i in range(0, 40, 5):
+               ws.send("*" * i)
+
+          greenlets = [
+             gevent.spawn(incoming),
+             gevent.spawn(outgoing),
+          ]
+          gevent.joinall(greenlets)
+        """
+        WebSocketBaseClient.__init__(self, url, protocols, extensions, heartbeat_freq,
+                                     ssl_options=ssl_options, headers=headers)
+        self._th = Greenlet(self.run)
+
+        self.messages = Queue()
+        """
+        Queue that will hold received messages.
+        """
+
+    def handshake_ok(self):
+        """
+        Called when the upgrade handshake has completed
+        successfully.
+
+        Starts the client's thread.
+        """
+        self._th.start()
+
+    def received_message(self, message):
+        """
+        Override the base class to store the incoming message
+        in the `messages` queue.
+        """
+        self.messages.put(copy.deepcopy(message))
+
+    def closed(self, code, reason=None):
+        """
+        Puts a :exc:`StopIteration` as a message into the
+        `messages` queue.
+        """
+        # When the connection is closed, put a StopIteration
+        # on the message queue to signal there's nothing left
+        # to wait for
+        self.messages.put(StopIteration)
+
+    def receive(self):
+        """
+        Returns messages that were stored into the
+        `messages` queue and returns `None` when the
+        websocket is terminated or closed.
+        """
+        # If the websocket was terminated and there are no messages
+        # left in the queue, return None immediately otherwise the client
+        # will block forever
+        if self.terminated and self.messages.empty():
+            return None
+        message = self.messages.get()
+        if message is StopIteration:
+            return None
+        return message

http://git-wip-us.apache.org/repos/asf/ambari/blob/8de3961b/ambari-common/src/main/python/ambari_ws4py/client/threadedclient.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/ambari_ws4py/client/threadedclient.py b/ambari-common/src/main/python/ambari_ws4py/client/threadedclient.py
new file mode 100644
index 0000000..c6ebbbc
--- /dev/null
+++ b/ambari-common/src/main/python/ambari_ws4py/client/threadedclient.py
@@ -0,0 +1,98 @@
+# -*- coding: utf-8 -*-
+import threading
+
+from ambari_ws4py.client import WebSocketBaseClient
+
+__all__ = ['WebSocketClient']
+
+class WebSocketClient(WebSocketBaseClient):
+    def __init__(self, url, protocols=None, extensions=None, heartbeat_freq=None,
+                 ssl_options=None, headers=None):
+        """
+        .. code-block:: python
+
+           from ambari_ws4py.client.threadedclient import WebSocketClient
+
+           class EchoClient(WebSocketClient):
+               def opened(self):
+                  for i in range(0, 200, 25):
+                     self.send("*" * i)
+
+               def closed(self, code, reason):
+                  print(("Closed down", code, reason))
+
+               def received_message(self, m):
+                  print("=> %d %s" % (len(m), str(m)))
+
+           try:
+               ws = EchoClient('ws://localhost:9000/echo', protocols=['http-only', 'chat'])
+               ws.connect()
+           except KeyboardInterrupt:
+              ws.close()
+
+        """
+        WebSocketBaseClient.__init__(self, url, protocols, extensions, heartbeat_freq,
+                                     ssl_options, headers=headers)
+        self._th = threading.Thread(target=self.run, name='WebSocketClient')
+        self._th.daemon = True
+
+    @property
+    def daemon(self):
+        """
+        `True` if the client's thread is set to be a daemon thread.
+        """
+        return self._th.daemon
+
+    @daemon.setter
+    def daemon(self, flag):
+        """
+        Set to `True` if the client's thread should be a daemon.
+        """
+        self._th.daemon = flag
+
+    def run_forever(self):
+        """
+        Simply blocks the thread until the
+        websocket has terminated.
+        """
+        while not self.terminated:
+            self._th.join(timeout=0.1)
+
+    def handshake_ok(self):
+        """
+        Called when the upgrade handshake has completed
+        successfully.
+
+        Starts the client's thread.
+        """
+        self._th.start()
+
+if __name__ == '__main__':
+    from ambari_ws4py.client.threadedclient import WebSocketClient
+
+    class EchoClient(WebSocketClient):
+        def opened(self):
+            def data_provider():
+                for i in range(0, 200, 25):
+                    yield "#" * i
+
+            self.send(data_provider())
+
+            for i in range(0, 200, 25):
+                self.send("*" * i)
+
+        def closed(self, code, reason):
+            print(("Closed down", code, reason))
+
+        def received_message(self, m):
+            print("#%d" % len(m))
+            if len(m) == 175:
+                self.close(reason='bye bye')
+
+    try:
+        ws = EchoClient('ws://localhost:9000/ws', protocols=['http-only', 'chat'],
+                        headers=[('X-Test', 'hello there')])
+        ws.connect()
+        ws.run_forever()
+    except KeyboardInterrupt:
+        ws.close()

http://git-wip-us.apache.org/repos/asf/ambari/blob/8de3961b/ambari-common/src/main/python/ambari_ws4py/client/tornadoclient.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/ambari_ws4py/client/tornadoclient.py b/ambari-common/src/main/python/ambari_ws4py/client/tornadoclient.py
new file mode 100644
index 0000000..b99cc54
--- /dev/null
+++ b/ambari-common/src/main/python/ambari_ws4py/client/tornadoclient.py
@@ -0,0 +1,155 @@
+# -*- coding: utf-8 -*-
+import ssl
+
+from tornado import iostream, escape
+from ambari_ws4py.client import WebSocketBaseClient
+from ambari_ws4py.exc import HandshakeError
+
+__all__ = ['TornadoWebSocketClient']
+
+class TornadoWebSocketClient(WebSocketBaseClient):
+    def __init__(self, url, protocols=None, extensions=None,
+                 io_loop=None, ssl_options=None, headers=None):
+        """
+        .. code-block:: python
+
+            from tornado import ioloop
+
+            class MyClient(TornadoWebSocketClient):
+                def opened(self):
+                    for i in range(0, 200, 25):
+                        self.send("*" * i)
+
+                def received_message(self, m):
+                    print((m, len(str(m))))
+
+                def closed(self, code, reason=None):
+                    ioloop.IOLoop.instance().stop()
+
+            ws = MyClient('ws://localhost:9000/echo', protocols=['http-only', 'chat'])
+            ws.connect()
+
+            ioloop.IOLoop.instance().start()
+        """
+        WebSocketBaseClient.__init__(self, url, protocols, extensions,
+                                     ssl_options=ssl_options, headers=headers)
+        if self.scheme == "wss":
+            self.sock = ssl.wrap_socket(self.sock, do_handshake_on_connect=False, **self.ssl_options)
+            self._is_secure = True
+            self.io = iostream.SSLIOStream(self.sock, io_loop, ssl_options=self.ssl_options)
+        else:
+            self.io = iostream.IOStream(self.sock, io_loop)
+        self.io_loop = io_loop
+
+    def connect(self):
+        """
+        Connects the websocket and initiate the upgrade handshake.
+        """
+        self.io.set_close_callback(self.__connection_refused)
+        self.io.connect((self.host, int(self.port)), self.__send_handshake)
+
+    def _write(self, b):
+        """
+        Trying to prevent a write operation
+        on an already closed websocket stream.
+
+        This cannot be bullet proof but hopefully
+        will catch almost all use cases.
+        """
+        if self.terminated:
+            raise RuntimeError("Cannot send on a terminated websocket")
+
+        self.io.write(b)
+
+    def __connection_refused(self, *args, **kwargs):
+        self.server_terminated = True
+        self.closed(1005, 'Connection refused')
+
+    def __send_handshake(self):
+        self.io.set_close_callback(self.__connection_closed)
+        self.io.write(escape.utf8(self.handshake_request),
+                      self.__handshake_sent)
+
+    def __connection_closed(self, *args, **kwargs):
+        self.server_terminated = True
+        self.closed(1006, 'Connection closed during handshake')
+
+    def __handshake_sent(self):
+        self.io.read_until(b"\r\n\r\n", self.__handshake_completed)
+
+    def __handshake_completed(self, data):
+        self.io.set_close_callback(None)
+        try:
+            response_line, _, headers = data.partition(b'\r\n')
+            self.process_response_line(response_line)
+            protocols, extensions = self.process_handshake_header(headers)
+        except HandshakeError:
+            self.close_connection()
+            raise
+
+        self.opened()
+        self.io.set_close_callback(self.__stream_closed)
+        self.io.read_bytes(self.reading_buffer_size, self.__fetch_more)
+
+    def __fetch_more(self, bytes):
+        try:
+            should_continue = self.process(bytes)
+        except:
+            should_continue = False
+
+        if should_continue:
+            self.io.read_bytes(self.reading_buffer_size, self.__fetch_more)
+        else:
+            self.__gracefully_terminate()
+
+    def __gracefully_terminate(self):
+        self.client_terminated = self.server_terminated = True
+
+        try:
+            if not self.stream.closing:
+                self.closed(1006)
+        finally:
+            self.close_connection()
+
+    def __stream_closed(self, *args, **kwargs):
+        self.io.set_close_callback(None)
+        code = 1006
+        reason = None
+        if self.stream.closing:
+            code, reason = self.stream.closing.code, self.stream.closing.reason
+        self.closed(code, reason)
+        self.stream._cleanup()
+
+    def close_connection(self):
+        """
+        Close the underlying connection
+        """
+        self.io.close()
+
+if __name__ == '__main__':
+    from tornado import ioloop
+
+    class MyClient(TornadoWebSocketClient):
+        def opened(self):
+            def data_provider():
+                for i in range(0, 200, 25):
+                    yield "#" * i
+
+            self.send(data_provider())
+
+            for i in range(0, 200, 25):
+                self.send("*" * i)
+
+        def received_message(self, m):
+            print("#%d" % len(m))
+            if len(m) == 175:
+                self.close()
+
+        def closed(self, code, reason=None):
+            ioloop.IOLoop.instance().stop()
+            print(("Closed down", code, reason))
+
+    ws = MyClient('ws://localhost:9000/ws', protocols=['http-only', 'chat'])
+    ws.connect()
+
+    ioloop.IOLoop.instance().start()

http://git-wip-us.apache.org/repos/asf/ambari/blob/8de3961b/ambari-common/src/main/python/ambari_ws4py/compat.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/ambari_ws4py/compat.py b/ambari-common/src/main/python/ambari_ws4py/compat.py
new file mode 100644
index 0000000..e5c299c
--- /dev/null
+++ b/ambari-common/src/main/python/ambari_ws4py/compat.py
@@ -0,0 +1,46 @@
+# -*- coding: utf-8 -*-
+__doc__ = """
+This compatibility module is inspired by the one found
+in CherryPy. It provides a common entry point for the various
+functions and types that are used with ambari_ws4py but which
+differ from Python 2.x to Python 3.x
+
+There are likely better ways for some of them so feel
+free to provide patches.
+
+Note this has been tested against 2.7 and 3.3 only but
+should hopefully work fine with other versions too.
+"""
+import sys
+
+if sys.version_info >= (3, 0):
+    py3k = True
+    from urllib.parse import urlsplit
+    range = range
+    unicode = str
+    basestring = (bytes, str)
+    _ord = ord
+
+    def get_connection(fileobj):
+        return fileobj.raw._sock
+
+    def detach_connection(fileobj):
+        fileobj.detach()
+
+    def ord(c):
+        if isinstance(c, int):
+            return c
+        return _ord(c)
+else:
+    py3k = False
+    from urlparse import urlsplit
+    range = xrange
+    unicode = unicode
+    basestring = basestring
+    ord = ord
+
+    def get_connection(fileobj):
+        return fileobj._sock
+
+    def detach_connection(fileobj):
+        fileobj._sock = None

http://git-wip-us.apache.org/repos/asf/ambari/blob/8de3961b/ambari-common/src/main/python/ambari_ws4py/exc.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/ambari_ws4py/exc.py b/ambari-common/src/main/python/ambari_ws4py/exc.py
new file mode 100644
index 0000000..bfefea4
--- /dev/null
+++ b/ambari-common/src/main/python/ambari_ws4py/exc.py
@@ -0,0 +1,27 @@
+# -*- coding: utf-8 -*-
+
+__all__ = ['WebSocketException', 'FrameTooLargeException', 'ProtocolException',
+           'UnsupportedFrameTypeException', 'TextFrameEncodingException',
+           'UnsupportedFrameTypeException', 'TextFrameEncodingException',
+           'StreamClosed', 'HandshakeError', 'InvalidBytesError']
+
+class WebSocketException(Exception): pass
+
+class ProtocolException(WebSocketException): pass
+
+class FrameTooLargeException(WebSocketException): pass
+
+class UnsupportedFrameTypeException(WebSocketException): pass
+
+class TextFrameEncodingException(WebSocketException): pass
+
+class InvalidBytesError(WebSocketException): pass
+
+class StreamClosed(Exception): pass
+
+class HandshakeError(WebSocketException):
+    def __init__(self, msg):
+        self.msg = msg
+
+    def __str__(self):
+        return self.msg

http://git-wip-us.apache.org/repos/asf/ambari/blob/8de3961b/ambari-common/src/main/python/ambari_ws4py/framing.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/ambari_ws4py/framing.py b/ambari-common/src/main/python/ambari_ws4py/framing.py
new file mode 100644
index 0000000..a7f62c8
--- /dev/null
+++ b/ambari-common/src/main/python/ambari_ws4py/framing.py
@@ -0,0 +1,273 @@
+# -*- coding: utf-8 -*-
+from struct import pack, unpack
+
+from ambari_ws4py.exc import FrameTooLargeException, ProtocolException
+from ambari_ws4py.compat import py3k, ord, range
+
+# Frame opcodes defined in the spec.
+OPCODE_CONTINUATION = 0x0
+OPCODE_TEXT = 0x1
+OPCODE_BINARY = 0x2
+OPCODE_CLOSE = 0x8
+OPCODE_PING = 0x9
+OPCODE_PONG = 0xa
+
+__all__ = ['Frame']
+
+class Frame(object):
+    def __init__(self, opcode=None, body=b'', masking_key=None, fin=0, rsv1=0, rsv2=0, rsv3=0):
+        """
+        Implements the framing protocol as defined by RFC 6455.
+
+        .. code-block:: python
+           :linenos:
+
+           >>> test_mask = 'XXXXXX' # perhaps from os.urandom(4)
+           >>> f = Frame(OPCODE_TEXT, 'hello world', masking_key=test_mask, fin=1)
+           >>> bytes = f.build()
+           >>> bytes.encode('hex')
+           '818bbe04e66ad6618a06d1249105cc6882'
+           >>> f = Frame()
+           >>> f.parser.send(bytes[0])
+           1
+           >>> f.parser.send(bytes[1])
+           4
+
+        .. seealso:: Data Framing http://tools.ietf.org/html/rfc6455#section-5.2
+        """
+        if not isinstance(body, bytes):
+            raise TypeError("The body must be properly encoded")
+
+        self.opcode = opcode
+        self.body = body
+        self.masking_key = masking_key
+        self.fin = fin
+        self.rsv1 = rsv1
+        self.rsv2 = rsv2
+        self.rsv3 = rsv3
+        self.payload_length = len(body)
+
+        self._parser = None
+
+    @property
+    def parser(self):
+        if self._parser is None:
+            self._parser = self._parsing()
+            # Python generators must be initialized once.
+            next(self.parser)
+        return self._parser
+
+    def _cleanup(self):
+        if self._parser:
+            self._parser.close()
+            self._parser = None
+
+    def build(self):
+        """
+        Builds a frame from the instance's attributes and returns
+        its bytes representation.
+        """
+        header = b''
+
+        if self.fin > 0x1:
+            raise ValueError('FIN bit parameter must be 0 or 1')
+
+        if 0x3 <= self.opcode <= 0x7 or 0xB <= self.opcode:
+            raise ValueError('Opcode cannot be a reserved opcode')
+
+        ## +-+-+-+-+-------+
+        ## |F|R|R|R| opcode|
+        ## |I|S|S|S|  (4)  |
+        ## |N|V|V|V|       |
+        ## | |1|2|3|       |
+        ## +-+-+-+-+-------+
+        header = pack('!B', ((self.fin << 7)
+                             | (self.rsv1 << 6)
+                             | (self.rsv2 << 5)
+                             | (self.rsv3 << 4)
+                             | self.opcode))
+
+        ##                 +-+-------------+-------------------------------+
+        ##                 |M| Payload len |    Extended payload length    |
+        ##                 |A|     (7)     |             (16/63)           |
+        ##                 |S|             |   (if payload len==126/127)   |
+        ##                 |K|             |                               |
+        ## +-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - +
+        ## |     Extended payload length continued, if payload len == 127  |
+        ## + - - - - - - - - - - - - - - - +-------------------------------+
+        if self.masking_key: mask_bit = 1 << 7
+        else: mask_bit = 0
+
+        length = self.payload_length
+        if length < 126:
+            header += pack('!B', (mask_bit | length))
+        elif length < (1 << 16):
+            header += pack('!B', (mask_bit | 126)) + pack('!H', length)
+        elif length < (1 << 63):
+            header += pack('!B', (mask_bit | 127)) + pack('!Q', length)
+        else:
+            raise FrameTooLargeException()
+        
+        ## + - - - - - - - - - - - - - - - +-------------------------------+
+        ## |                               |Masking-key, if MASK set to 1  |
+        ## +-------------------------------+-------------------------------+
+        ## | Masking-key (continued)       |          Payload Data         |
+        ## +-------------------------------- - - - - - - - - - - - - - - - +
+        ## :                     Payload Data continued ...                :
+        ## + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +
+        ## |                     Payload Data continued ...                |
+        ## +---------------------------------------------------------------+
+        body = self.body
+        if not self.masking_key:
+            return bytes(header + body)
+
+        return bytes(header + self.masking_key + self.mask(body))
+
+    def _parsing(self):
+        """
+        Generator to parse bytes into a frame. Yields until
+        enough bytes have been read or an error is met.
+        """
+        buf = b''
+        some_bytes = b''
+
+        # yield until we get the first header's byte
+        while not some_bytes:
+            some_bytes = (yield 1)
+
+        first_byte = some_bytes[0] if isinstance(some_bytes, bytearray) else ord(some_bytes[0])
+        # frame-fin = %x0 ; more frames of this message follow
+        #           / %x1 ; final frame of this message
+        self.fin = (first_byte >> 7) & 1
+        self.rsv1 = (first_byte >> 6) & 1
+        self.rsv2 = (first_byte >> 5) & 1
+        self.rsv3 = (first_byte >> 4) & 1
+        self.opcode = first_byte & 0xf
+
+        # frame-rsv1 = %x0 ; 1 bit, MUST be 0 unless negotiated otherwise
+        # frame-rsv2 = %x0 ; 1 bit, MUST be 0 unless negotiated otherwise
+        # frame-rsv3 = %x0 ; 1 bit, MUST be 0 unless negotiated otherwise
+        if self.rsv1 or self.rsv2 or self.rsv3:
+            raise ProtocolException()
+
+        # control frames between 3 and 7 as well as above 0xA are currently reserved
+        if 2 < self.opcode < 8 or self.opcode > 0xA:
+            raise ProtocolException()
+
+        # control frames cannot be fragmented
+        if self.opcode > 0x7 and self.fin == 0:
+            raise ProtocolException()
+
+        # do we already have enough some_bytes to continue?
+        some_bytes = some_bytes[1:] if some_bytes and len(some_bytes) > 1 else b''
+
+        # Yield until we get the second header's byte
+        while not some_bytes:
+            some_bytes = (yield 1)
+
+        second_byte = some_bytes[0] if isinstance(some_bytes, bytearray) else ord(some_bytes[0])
+        mask = (second_byte >> 7) & 1
+        self.payload_length = second_byte & 0x7f
+
+        # All control frames MUST have a payload length of 125 some_bytes or less
+        if self.opcode > 0x7 and self.payload_length > 125:
+            raise FrameTooLargeException()
+
+        if some_bytes and len(some_bytes) > 1:
+            buf = some_bytes[1:]
+            some_bytes = buf
+        else:
+            buf = b''
+            some_bytes = b''
+
+        if self.payload_length == 127:
+            # This will compute the actual application data size
+            if len(buf) < 8:
+                nxt_buf_size = 8 - len(buf)
+                some_bytes = (yield nxt_buf_size)
+                some_bytes = buf + (some_bytes or b'')
+                while len(some_bytes) < 8:
+                    b = (yield 8 - len(some_bytes))
+                    if b is not None:
+                        some_bytes = some_bytes + b
+                if len(some_bytes) > 8:
+                    buf = some_bytes[8:]
+                    some_bytes = some_bytes[:8]
+            else:
+                some_bytes = buf[:8]
+                buf = buf[8:]
+            extended_payload_length = some_bytes
+            self.payload_length = unpack(
+                '!Q', extended_payload_length)[0]
+            if self.payload_length > 0x7FFFFFFFFFFFFFFF:
+                raise FrameTooLargeException()
+        elif self.payload_length == 126:
+            if len(buf) < 2:
+                nxt_buf_size = 2 - len(buf)
+                some_bytes = (yield nxt_buf_size)
+                some_bytes = buf + (some_bytes or b'')
+                while len(some_bytes) < 2:
+                    b = (yield 2 - len(some_bytes))
+                    if b is not None:
+                        some_bytes = some_bytes + b
+                if len(some_bytes) > 2:
+                    buf = some_bytes[2:]
+                    some_bytes = some_bytes[:2]
+            else:
+                some_bytes = buf[:2]
+                buf = buf[2:]
+            extended_payload_length = some_bytes
+            self.payload_length = unpack(
+                '!H', extended_payload_length)[0]
+
+        if mask:
+            if len(buf) < 4:
+                nxt_buf_size = 4 - len(buf)
+                some_bytes = (yield nxt_buf_size)
+                some_bytes = buf + (some_bytes or b'')
+                while not some_bytes or len(some_bytes) < 4:
+                    b = (yield 4 - len(some_bytes))
+                    if b is not None:
+                        some_bytes = some_bytes + b
+                if len(some_bytes) > 4:
+                    buf = some_bytes[4:]
+            else:
+                some_bytes = buf[:4]
+                buf = buf[4:]
+            self.masking_key = some_bytes
+
+        if len(buf) < self.payload_length:
+            nxt_buf_size = self.payload_length - len(buf)
+            some_bytes = (yield nxt_buf_size)
+            some_bytes = buf + (some_bytes or b'')
+            while len(some_bytes) < self.payload_length:
+                l = self.payload_length - len(some_bytes)
+                b = (yield l)
+                if b is not None:
+                    some_bytes = some_bytes + b
+        else:
+            if self.payload_length == len(buf):
+                some_bytes = buf
+            else:
+                some_bytes = buf[:self.payload_length]
+
+        self.body = some_bytes
+        yield
+
+    def mask(self, data):
+        """
+        Performs the masking or unmasking operation on data
+        using the simple masking algorithm:
+
+        ..
+           j                   = i MOD 4
+           transformed-octet-i = original-octet-i XOR masking-key-octet-j
+
+        """
+        masked = bytearray(data)
+        if py3k: key = self.masking_key
+        else: key = map(ord, self.masking_key)
+        for i in range(len(data)):
+            masked[i] = masked[i] ^ key[i%4]
+        return masked
+    unmask = mask

http://git-wip-us.apache.org/repos/asf/ambari/blob/8de3961b/ambari-common/src/main/python/ambari_ws4py/manager.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/ambari_ws4py/manager.py b/ambari-common/src/main/python/ambari_ws4py/manager.py
new file mode 100644
index 0000000..23fd8e1
--- /dev/null
+++ b/ambari-common/src/main/python/ambari_ws4py/manager.py
@@ -0,0 +1,368 @@
+# -*- coding: utf-8 -*-
+__doc__ = """
+The manager module provides a selected classes to
+handle websocket's execution.
+
+Initially the rationale was to:
+
+- Externalize the way the CherryPy server had been setup
+  as its websocket management was too tightly coupled with
+  the plugin implementation.
+- Offer a management that could be used by other
+  server or client implementations.
+- Move away from the threaded model to the event-based
+  model by relying on `select` or `epoll` (when available).
+
+
+A simple usage for handling websocket clients:
+
+.. code-block:: python
+
+    from ambari_ws4py.client import WebSocketBaseClient
+    from ambari_ws4py.manager import WebSocketManager
+
+    m = WebSocketManager()
+
+    class EchoClient(WebSocketBaseClient):
+        def handshake_ok(self):
+            m.add(self)  # register the client once the handshake is done
+
+        def received_message(self, msg):
+            print str(msg)
+
+    m.start()
+
+    client = EchoClient('ws://localhost:9000/ws')
+    client.connect()
+
+    m.join()  # blocks forever
+
+Managers are not compulsory but hopefully will help your
+workflow. For clients, you can still rely on threaded, gevent or
+tornado based implementations of course.
+"""
+import logging
+import select
+import threading
+import time
+
+from ambari_ws4py import format_addresses
+from ambari_ws4py.compat import py3k
+
+logger = logging.getLogger('ambari_ws4py')
+
+class SelectPoller(object):
+    def __init__(self, timeout=0.1):
+        """
+        A socket poller that uses the `select`
+        implementation to determines which
+        file descriptors have data available to read.
+
+        It is available on all platforms.
+        """
+        self._fds = []
+        self.timeout = timeout
+
+    def release(self):
+        """
+        Cleanup resources.
+        """
+        self._fds = []
+
+    def register(self, fd):
+        """
+        Register a new file descriptor to be
+        part of the select polling next time around.
+        """
+        if fd not in self._fds:
+            self._fds.append(fd)
+
+    def unregister(self, fd):
+        """
+        Unregister the given file descriptor.
+        """
+        if fd in self._fds:
+            self._fds.remove(fd)
+
+    def poll(self):
+        """
+        Polls once and returns a list of
+        ready-to-be-read file descriptors.
+        """
+        if not self._fds:
+            time.sleep(self.timeout)
+            return []
+        try:
+            r, w, x = select.select(self._fds, [], [], self.timeout)
+        except IOError as e:
+            return []
+        return r
+
+class EPollPoller(object):
+    def __init__(self, timeout=0.1):
+        """
+        An epoll poller that uses the ``epoll``
+        implementation to determines which
+        file descriptors have data available to read.
+
+        Available on Unix flavors mostly.
+        """
+        self.poller = select.epoll()
+        self.timeout = timeout
+
+    def release(self):
+        """
+        Cleanup resources.
+        """
+        self.poller.close()
+
+    def register(self, fd):
+        """
+        Register a new file descriptor to be
+        part of the select polling next time around.
+        """
+        try:
+            self.poller.register(fd, select.EPOLLIN | select.EPOLLPRI)
+        except IOError:
+            pass
+
+    def unregister(self, fd):
+        """
+        Unregister the given file descriptor.
+        """
+        self.poller.unregister(fd)
+
+    def poll(self):
+        """
+        Polls once and yields each ready-to-be-read
+        file-descriptor
+        """
+        try:
+            events = self.poller.poll(timeout=self.timeout)
+        except IOError:
+            events = []
+
+        for fd, event in events:
+            if event | select.EPOLLIN | select.EPOLLPRI:
+                yield fd
+
+class KQueuePoller(object):
+    def __init__(self, timeout=0.1):
+        """
+        An epoll poller that uses the ``epoll``
+        implementation to determines which
+        file descriptors have data available to read.
+
+        Available on Unix flavors mostly.
+        """
+        self.poller = select.epoll()
+        self.timeout = timeout
+
+    def release(self):
+        """
+        Cleanup resources.
+        """
+        self.poller.close()
+
+    def register(self, fd):
+        """
+        Register a new file descriptor to be
+        part of the select polling next time around.
+        """
+        try:
+            self.poller.register(fd, select.EPOLLIN | select.EPOLLPRI)
+        except IOError:
+            pass
+
+    def unregister(self, fd):
+        """
+        Unregister the given file descriptor.
+        """
+        self.poller.unregister(fd)
+
+    def poll(self):
+        """
+        Polls once and yields each ready-to-be-read
+        file-descriptor
+        """
+        try:
+            events = self.poller.poll(timeout=self.timeout)
+        except IOError:
+            events = []
+        for fd, event in events:
+            if event | select.EPOLLIN | select.EPOLLPRI:
+                yield fd
+
+class WebSocketManager(threading.Thread):
+    def __init__(self, poller=None):
+        """
+        An event-based websocket manager. By event-based, we mean
+        that the websockets will be called when their
+        sockets have data to be read from.
+
+        The manager itself runs in its own thread as not to
+        be the blocking mainloop of your application.
+
+        The poller's implementation is automatically chosen
+        with ``epoll`` if available else ``select`` unless you
+        provide your own ``poller``.
+        """
+        threading.Thread.__init__(self)
+        self.name = "WebSocketManager"
+        self.lock = threading.Lock()
+        self.websockets = {}
+        self.running = False
+
+        if poller:
+            self.poller = poller
+        else:
+            if hasattr(select, "epoll"):
+                self.poller = EPollPoller()
+                logger.info("Using epoll")
+            else:
+                self.poller = SelectPoller()
+                logger.info("Using select as epoll is not available")
+
+    def __len__(self):
+        return len(self.websockets)
+
+    def __iter__(self):
+        if py3k:
+            return iter(self.websockets.values())
+        else:
+            return self.websockets.itervalues()
+
+    def __contains__(self, ws):
+        fd = ws.sock.fileno()
+        # just in case the file descriptor was reused
+        # we actually check the instance (well, this might
+        # also have been reused...)
+        return self.websockets.get(fd) is ws
+
+    def add(self, websocket):
+        """
+        Manage a new websocket.
+
+        First calls its :meth:`opened() <ambari_ws4py.websocket.WebSocket.opened>`
+        method and register its socket against the poller
+        for reading events.
+        """
+        if websocket in self:
+            return
+
+        logger.info("Managing websocket %s" % format_addresses(websocket))
+        websocket.opened()
+        with self.lock:
+            fd = websocket.sock.fileno()
+            self.websockets[fd] = websocket
+            self.poller.register(fd)
+
+    def remove(self, websocket):
+        """
+        Remove the given ``websocket`` from the manager.
+
+        This does not call its :meth:`closed() <ambari_ws4py.websocket.WebSocket.closed>`
+        method as it's out-of-band by your application
+        or from within the manager's run loop.
+        """
+        if websocket not in self:
+            return
+
+        logger.info("Removing websocket %s" % format_addresses(websocket))
+        with self.lock:
+            fd = websocket.sock.fileno()
+            self.websockets.pop(fd, None)
+            self.poller.unregister(fd)
+
+    def stop(self):
+        """
+        Mark the manager as terminated and
+        releases its resources.
+        """
+        self.running = False
+        with self.lock:
+            self.websockets.clear()
+            self.poller.release()
+
+    def run(self):
+        """
+        Manager's mainloop executed from within a thread.
+
+        Constantly poll for read events and, when available,
+        call related websockets' `once` method to
+        read and process the incoming data.
+
+        If the :meth:`once() <ambari_ws4py.websocket.WebSocket.once>`
+        method returns a `False` value, its :meth:`terminate() <ambari_ws4py.websocket.WebSocket.terminate>`
+        method is also applied to properly close
+        the websocket and its socket is unregistered from the poller.
+
+        Note that websocket shouldn't take long to process
+        their data or they will block the remaining
+        websockets with data to be handled. As for what long means,
+        it's up to your requirements.
+        """
+        self.running = True
+        while self.running:
+            with self.lock:
+                polled = self.poller.poll()
+            if not self.running:
+                break
+
+            for fd in polled:
+                if not self.running:
+                    break
+
+                ws = self.websockets.get(fd)
+                if ws and not ws.terminated:
+                    # I don't know what kind of errors might spew out of here
+                    # but they probably shouldn't crash the entire server.
+                    try:
+                        x = ws.once()
+                    # Treat the error as if once() had returned None
+                    except Exception as e:
+                        x = None
+                        logger.error("Terminating websocket %s due to exception: %s in once method" % (format_addresses(ws), repr(e)) )
+                    if not x:
+                        with self.lock:
+                            self.websockets.pop(fd, None)
+                            self.poller.unregister(fd)
+
+                        if not ws.terminated:
+                            logger.info("Terminating websocket %s" % format_addresses(ws))
+                            ws.terminate()
+
+
+    def close_all(self, code=1001, message='Server is shutting down'):
+        """
+        Execute the :meth:`close() <ambari_ws4py.websocket.WebSocket.close>`
+        method of each registered websockets to initiate the closing handshake.
+        It doesn't wait for the handshake to complete properly.
+        """
+        with self.lock:
+            logger.info("Closing all websockets with [%d] '%s'" % (code, message))
+            for ws in iter(self):
+                ws.close(code=code, reason=message)
+
+    def broadcast(self, message, binary=False):
+        """
+        Broadcasts the given message to all registered
+        websockets, at the time of the call.
+
+        Broadcast may fail on a given registered peer
+        but this is silent as it's not the method's
+        purpose to handle websocket's failures.
+        """
+        with self.lock:
+            websockets = self.websockets.copy()
+            if py3k:
+                ws_iter = iter(websockets.values())
+            else:
+                ws_iter = websockets.itervalues()
+
+        for ws in ws_iter:
+            if not ws.terminated:
+                try:
+                    ws.send(message, binary)
+                except:
+                    pass

http://git-wip-us.apache.org/repos/asf/ambari/blob/8de3961b/ambari-common/src/main/python/ambari_ws4py/messaging.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/ambari_ws4py/messaging.py b/ambari-common/src/main/python/ambari_ws4py/messaging.py
new file mode 100644
index 0000000..d94ee6e
--- /dev/null
+++ b/ambari-common/src/main/python/ambari_ws4py/messaging.py
@@ -0,0 +1,169 @@
+# -*- coding: utf-8 -*-
+import os
+import struct
+
+from ambari_ws4py.framing import Frame, OPCODE_CONTINUATION, OPCODE_TEXT, \
+     OPCODE_BINARY, OPCODE_CLOSE, OPCODE_PING, OPCODE_PONG
+from ambari_ws4py.compat import unicode, py3k
+
+__all__ = ['Message', 'TextMessage', 'BinaryMessage', 'CloseControlMessage',
+           'PingControlMessage', 'PongControlMessage']
+
+class Message(object):
+    def __init__(self, opcode, data=b'', encoding='utf-8'):
+        """
+        A message is a application level entity. It's usually built
+        from one or many frames. The protocol defines several kind
+        of messages which are grouped into two sets:
+
+        * data messages which can be text or binary typed
+        * control messages which provide a mechanism to perform
+          in-band control communication between peers
+
+        The ``opcode`` indicates the message type and ``data`` is
+        the possible message payload.
+
+        The payload is held internally as a a :func:`bytearray` as they are
+        faster than pure strings for append operations.
+
+        Unicode data will be encoded using the provided ``encoding``.
+        """
+        self.opcode = opcode
+        self._completed = False
+        self.encoding = encoding
+
+        if isinstance(data, unicode):
+            if not encoding:
+                raise TypeError("unicode data without an encoding")
+            data = data.encode(encoding)
+        elif isinstance(data, bytearray):
+            data = bytes(data)
+        elif not isinstance(data, bytes):
+            raise TypeError("%s is not a supported data type" % type(data))
+
+        self.data = data
+
+    def single(self, mask=False):
+        """
+        Returns a frame bytes with the fin bit set and a random mask.
+
+        If ``mask`` is set, automatically mask the frame
+        using a generated 4-byte token.
+        """
+        mask = os.urandom(4) if mask else None
+        return Frame(body=self.data, opcode=self.opcode,
+                     masking_key=mask, fin=1).build()
+
+    def fragment(self, first=False, last=False, mask=False):
+        """
+        Returns a :class:`ambari_ws4py.framing.Frame` bytes.
+
+        The behavior depends on the given flags:
+
+        * ``first``: the frame uses ``self.opcode`` else a continuation opcode
+        * ``last``: the frame has its ``fin`` bit set
+        * ``mask``: the frame is masked using a automatically generated 4-byte token
+        """
+        fin = 1 if last is True else 0
+        opcode = self.opcode if first is True else OPCODE_CONTINUATION
+        mask = os.urandom(4) if mask else None
+        return Frame(body=self.data,
+                     opcode=opcode, masking_key=mask,
+                     fin=fin).build()
+
+    @property
+    def completed(self):
+        """
+        Indicates the the message is complete, meaning
+        the frame's ``fin`` bit was set.
+        """
+        return self._completed
+
+    @completed.setter
+    def completed(self, state):
+        """
+        Sets the state for this message. Usually
+        set by the stream's parser.
+        """
+        self._completed = state
+
+    def extend(self, data):
+        """
+        Add more ``data`` to the message.
+        """
+        if isinstance(data, bytes):
+            self.data += data
+        elif isinstance(data, bytearray):
+            self.data += bytes(data)
+        elif isinstance(data, unicode):
+            self.data += data.encode(self.encoding)
+        else:
+            raise TypeError("%s is not a supported data type" % type(data))
+
+    def __len__(self):
+        return len(self.__unicode__())
+
+    def __str__(self):
+        if py3k:
+            return self.data.decode(self.encoding)
+        return self.data
+
+    def __unicode__(self):
+        return self.data.decode(self.encoding)
+
+class TextMessage(Message):
+    def __init__(self, text=None):
+        Message.__init__(self, OPCODE_TEXT, text)
+
+    @property
+    def is_binary(self):
+        return False
+
+    @property
+    def is_text(self):
+        return True
+
+class BinaryMessage(Message):
+    def __init__(self, bytes=None):
+        Message.__init__(self, OPCODE_BINARY, bytes, encoding=None)
+
+    @property
+    def is_binary(self):
+        return True
+
+    @property
+    def is_text(self):
+        return False
+
+    def __len__(self):
+        return len(self.data)
+
+class CloseControlMessage(Message):
+    def __init__(self, code=1000, reason=''):
+        data = b""
+        if code:
+            data += struct.pack("!H", code)
+        if reason is not None:
+            if isinstance(reason, unicode):
+                reason = reason.encode('utf-8')
+            data += reason
+
+        Message.__init__(self, OPCODE_CLOSE, data, 'utf-8')
+        self.code = code
+        self.reason = reason
+
+    def __str__(self):
+        if py3k:
+            return self.reason.decode('utf-8')
+        return self.reason
+
+    def __unicode__(self):
+        return self.reason.decode(self.encoding)
+
+class PingControlMessage(Message):
+    def __init__(self, data=None):
+        Message.__init__(self, OPCODE_PING, data)
+
+class PongControlMessage(Message):
+    def __init__(self, data):
+        Message.__init__(self, OPCODE_PONG, data)

http://git-wip-us.apache.org/repos/asf/ambari/blob/8de3961b/ambari-common/src/main/python/ambari_ws4py/streaming.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/ambari_ws4py/streaming.py b/ambari-common/src/main/python/ambari_ws4py/streaming.py
new file mode 100644
index 0000000..61063ae
--- /dev/null
+++ b/ambari-common/src/main/python/ambari_ws4py/streaming.py
@@ -0,0 +1,319 @@
+# -*- coding: utf-8 -*-
+import struct
+from struct import unpack
+
+from ambari_ws4py.utf8validator import Utf8Validator
+from ambari_ws4py.messaging import TextMessage, BinaryMessage, CloseControlMessage,\
+     PingControlMessage, PongControlMessage
+from ambari_ws4py.framing import Frame, OPCODE_CONTINUATION, OPCODE_TEXT, \
+     OPCODE_BINARY, OPCODE_CLOSE, OPCODE_PING, OPCODE_PONG
+from ambari_ws4py.exc import FrameTooLargeException, ProtocolException, InvalidBytesError,\
+     TextFrameEncodingException, UnsupportedFrameTypeException, StreamClosed
+from ambari_ws4py.compat import py3k
+
+VALID_CLOSING_CODES = [1000, 1001, 1002, 1003, 1007, 1008, 1009, 1010, 1011]
+
+class Stream(object):
+    def __init__(self, always_mask=False, expect_masking=True):
+        """ Represents a websocket stream of bytes flowing in and out.
+
+        The stream doesn't know about the data provider itself and
+        doesn't even know about sockets. Instead the stream simply
+        yields for more bytes whenever it requires them. The stream owner
+        is responsible to provide the stream with those bytes until
+        a frame can be interpreted.
+
+        .. code-block:: python
+           :linenos:
+
+           >>> s = Stream()
+           >>> s.parser.send(BYTES)
+           >>> s.has_messages
+           False
+           >>> s.parser.send(MORE_BYTES)
+           >>> s.has_messages
+           True
+           >>> s.message
+           <TextMessage ... >
+
+        Set ``always_mask`` to mask all frames built.
+
+        Set ``expect_masking`` to indicate masking will be
+        checked on all parsed frames.
+        """
+
+        self.message = None
+        """
+        Parsed test or binary messages. Whenever the parser
+        reads more bytes from a fragment message, those bytes
+        are appended to the most recent message.
+        """
+
+        self.pings = []
+        """
+        Parsed ping control messages. They are instances of
+        :class:`ambari_ws4py.messaging.PingControlMessage`
+        """
+
+        self.pongs = []
+        """
+        Parsed pong control messages. They are instances of
+        :class:`ambari_ws4py.messaging.PongControlMessage`
+        """
+
+        self.closing = None
+        """
+        Parsed close control messsage. Instance of
+        :class:`ambari_ws4py.messaging.CloseControlMessage`
+        """
+
+        self.errors = []
+        """
+        Detected errors while parsing. Instances of
+        :class:`ambari_ws4py.messaging.CloseControlMessage`
+        """
+
+        self._parser = None
+        """
+        Parser in charge to process bytes it is fed with.
+        """
+
+        self.always_mask = always_mask
+        self.expect_masking = expect_masking
+
+    @property
+    def parser(self):
+        if self._parser is None:
+            self._parser = self.receiver()
+            # Python generators must be initialized once.
+            next(self.parser)
+        return self._parser
+
+    def _cleanup(self):
+        """
+        Frees the stream's resources rendering it unusable.
+        """
+        self.message = None
+        if self._parser is not None:
+            if not self._parser.gi_running:
+                self._parser.close()
+            self._parser = None
+        self.errors = None
+        self.pings = None
+        self.pongs = None
+        self.closing = None
+
+    def text_message(self, text):
+        """
+        Returns a :class:`ambari_ws4py.messaging.TextMessage` instance
+        ready to be built. Convenience method so
+        that the caller doesn't need to import the
+        :class:`ambari_ws4py.messaging.TextMessage` class itself.
+        """
+        return TextMessage(text=text)
+
+    def binary_message(self, bytes):
+        """
+        Returns a :class:`ambari_ws4py.messaging.BinaryMessage` instance
+        ready to be built. Convenience method so
+        that the caller doesn't need to import the
+        :class:`ambari_ws4py.messaging.BinaryMessage` class itself.
+        """
+        return BinaryMessage(bytes)
+
+    @property
+    def has_message(self):
+        """
+        Checks if the stream has received any message
+        which, if fragmented, is now completed.
+        """
+        if self.message is not None:
+            return self.message.completed
+
+        return False
+
+    def close(self, code=1000, reason=''):
+        """
+        Returns a close control message built from
+        a :class:`ambari_ws4py.messaging.CloseControlMessage` instance,
+        using the given status ``code`` and ``reason`` message.
+        """
+        return CloseControlMessage(code=code, reason=reason)
+
+    def ping(self, data=''):
+        """
+        Returns a ping control message built from
+        a :class:`ambari_ws4py.messaging.PingControlMessage` instance.
+        """
+        return PingControlMessage(data).single(mask=self.always_mask)
+
+    def pong(self, data=''):
+        """
+        Returns a ping control message built from
+        a :class:`ambari_ws4py.messaging.PongControlMessage` instance.
+        """
+        return PongControlMessage(data).single(mask=self.always_mask)
+
+    def receiver(self):
+        """
+        Parser that keeps trying to interpret bytes it is fed with as
+        incoming frames part of a message.
+
+        Control message are single frames only while data messages, like text
+        and binary, may be fragmented accross frames.
+
+        The way it works is by instanciating a :class:`wspy.framing.Frame` object,
+        then running its parser generator which yields how much bytes
+        it requires to performs its task. The stream parser yields this value
+        to its caller and feeds the frame parser.
+
+        When the frame parser raises :exc:`StopIteration`, the stream parser
+        tries to make sense of the parsed frame. It dispatches the frame's bytes
+        to the most appropriate message type based on the frame's opcode.
+
+        Overall this makes the stream parser totally agonstic to
+        the data provider.
+        """
+        utf8validator = Utf8Validator()
+        running = True
+        frame = None
+        while running:
+            frame = Frame()
+            while 1:
+                try:
+                    some_bytes = (yield next(frame.parser))
+                    frame.parser.send(some_bytes)
+                except GeneratorExit:
+                    running = False
+                    break
+                except StopIteration:
+                    frame._cleanup()
+                    some_bytes = frame.body
+
+                    # Let's avoid unmasking when there is no payload
+                    if some_bytes:
+                        if frame.masking_key and self.expect_masking:
+                            some_bytes = frame.unmask(some_bytes)
+                        elif not frame.masking_key and self.expect_masking:
+                            msg = CloseControlMessage(code=1002, reason='Missing masking when expected')
+                            self.errors.append(msg)
+                            break
+                        elif frame.masking_key and not self.expect_masking:
+                            msg = CloseControlMessage(code=1002, reason='Masked when not expected')
+                            self.errors.append(msg)
+                            break
+                        else:
+                            # If we reach this stage, it's because
+                            # the frame wasn't masked and we didn't expect
+                            # it anyway. Therefore, on py2k, the bytes
+                            # are actually a str object and can't be used
+                            # in the utf8 validator as we need integers
+                            # when we get each byte one by one.
+                            # Our only solution here is to convert our
+                            # string to a bytearray.
+                            some_bytes = bytearray(some_bytes)
+
+                    if frame.opcode == OPCODE_TEXT:
+                        if self.message and not self.message.completed:
+                            # We got a text frame before we completed the previous one
+                            msg = CloseControlMessage(code=1002, reason='Received a new message before completing previous')
+                            self.errors.append(msg)
+                            break
+
+                        m = TextMessage(some_bytes)
+                        m.completed = (frame.fin == 1)
+                        self.message = m
+
+                        if some_bytes:
+                            is_valid, end_on_code_point, _, _ = utf8validator.validate(some_bytes)
+
+                            if not is_valid or (m.completed and not end_on_code_point):
+                                self.errors.append(CloseControlMessage(code=1007, reason='Invalid UTF-8 bytes'))
+                                break
+
+                    elif frame.opcode == OPCODE_BINARY:
+                        if self.message and not self.message.completed:
+                            # We got a text frame before we completed the previous one
+                            msg = CloseControlMessage(code=1002, reason='Received a new message before completing previous')
+                            self.errors.append(msg)
+                            break
+
+                        m = BinaryMessage(some_bytes)
+                        m.completed = (frame.fin == 1)
+                        self.message = m
+
+                    elif frame.opcode == OPCODE_CONTINUATION:
+                        m = self.message
+                        if m is None:
+                            self.errors.append(CloseControlMessage(code=1002, reason='Message not started yet'))
+                            break
+
+                        m.extend(some_bytes)
+                        m.completed = (frame.fin == 1)
+                        if m.opcode == OPCODE_TEXT:
+                            if some_bytes:
+                                is_valid, end_on_code_point, _, _ = utf8validator.validate(some_bytes)
+
+                                if not is_valid or (m.completed and not end_on_code_point):
+                                    self.errors.append(CloseControlMessage(code=1007, reason='Invalid UTF-8 bytes'))
+                                    break
+
+                    elif frame.opcode == OPCODE_CLOSE:
+                        code = 1005
+                        reason = ""
+                        if frame.payload_length == 0:
+                            self.closing = CloseControlMessage(code=1005)
+                        elif frame.payload_length == 1:
+                            self.closing = CloseControlMessage(code=1005, reason='Payload has invalid length')
+                        else:
+                            try:
+                                # at this stage, some_bytes have been unmasked
+                                # so actually are held in a bytearray
+                                code = int(unpack("!H", bytes(some_bytes[0:2]))[0])
+                            except struct.error:
+                                reason = 'Failed at decoding closing code'
+                            else:
+                                # Those codes are reserved or plainly forbidden
+                                if code not in VALID_CLOSING_CODES and not (2999 < code < 5000):
+                                    reason = 'Invalid Closing Frame Code: %d' % code
+                                    code = 1005
+                                elif frame.payload_length > 1:
+                                    reason = some_bytes[2:] if frame.masking_key else frame.body[2:]
+
+                                    if not py3k: reason = bytearray(reason)
+                                    is_valid, end_on_code_point, _, _ = utf8validator.validate(reason)
+                                    if not is_valid or not end_on_code_point:
+                                        self.errors.append(CloseControlMessage(code=1007, reason='Invalid UTF-8 bytes'))
+                                        break
+                                    reason = bytes(reason)
+                            self.closing = CloseControlMessage(code=code, reason=reason)
+
+                    elif frame.opcode == OPCODE_PING:
+                        self.pings.append(PingControlMessage(some_bytes))
+
+                    elif frame.opcode == OPCODE_PONG:
+                        self.pongs.append(PongControlMessage(some_bytes))
+
+                    else:
+                        self.errors.append(CloseControlMessage(code=1003))
+
+                    break
+
+                except ProtocolException:
+                    self.errors.append(CloseControlMessage(code=1002))
+                    break
+                except FrameTooLargeException:
+                    self.errors.append(CloseControlMessage(code=1002, reason="Frame was too large"))
+                    break
+
+            frame._cleanup()
+            frame.body = None
+            frame = None
+
+            if self.message is not None and self.message.completed:
+                utf8validator.reset()
+
+        utf8validator.reset()
+        utf8validator = None
+
+        self._cleanup()