You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by kl...@apache.org on 2018/10/16 12:42:21 UTC

[mesos] branch master updated: Added Record-IO encoder and decoder to Python library.

This is an automated email from the ASF dual-hosted git repository.

klueska pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git


The following commit(s) were added to refs/heads/master by this push:
     new e553216  Added Record-IO encoder and decoder to Python library.
e553216 is described below

commit e5532167c91ebabaa3add336a787f8192af59ec3
Author: Armand Grillet <ag...@mesosphere.io>
AuthorDate: Tue Oct 16 08:26:51 2018 -0400

    Added Record-IO encoder and decoder to Python library.
    
    This code was pulled directly from:
    https://github.com/dcos/dcos-core-cli/blob/
        7fd55421939a7782c237e2b8719c0fe2f543acd7/
            python/lib/dcos/dcos/recordio.py
    https://github.com/dcos/dcos-core-cli/blob/
        7fd55421939a7782c237e2b8719c0fe2f543acd7/
            python/lib/dcos/tests/test_recordio.py
    
    It will be used by the new CLI for commands such as `task exec`.
    
    Review: https://reviews.apache.org/r/68977/
---
 src/python/lib/mesos/__init__.py      |   3 +-
 src/python/lib/mesos/recordio.py      | 175 ++++++++++++++++++++++++++++++++++
 src/python/lib/tests/test_recordio.py | 100 +++++++++++++++++++
 3 files changed, 277 insertions(+), 1 deletion(-)

diff --git a/src/python/lib/mesos/__init__.py b/src/python/lib/mesos/__init__.py
index bbe82e2..968a1f2 100644
--- a/src/python/lib/mesos/__init__.py
+++ b/src/python/lib/mesos/__init__.py
@@ -20,6 +20,7 @@ Client library for the Mesos HTTP ReST API
 
 __version__ = '0.0.0.dev'
 
-__all__ = ['exceptions', 'http']
+__all__ = ['exceptions', 'http', 'recordio']
 from . import exceptions
 from . import http
+from . import recordio
diff --git a/src/python/lib/mesos/recordio.py b/src/python/lib/mesos/recordio.py
new file mode 100644
index 0000000..7cb62b3
--- /dev/null
+++ b/src/python/lib/mesos/recordio.py
@@ -0,0 +1,231 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+r"""
+Provides facilities for "Record-IO" encoding of data.
+"Record-IO" encoding allows one to encode a sequence
+of variable-length records by prefixing each record
+with its size in bytes, written as a base 10 integer
+followed by a newline. For example, the two records
+b"hello" and b"world!" are encoded as the single byte
+string:
+
+    b"5\nhello6\nworld!"
+
+Note that a record is not terminated by a newline: the
+next header starts right after the last byte of the
+previous record.
+
+Note that this currently only supports record lengths
+encoded as base 10 integer values with newlines as a
+delimiter. This is to provide better language
+portability: parsing a base 10 integer is simple. Most
+other "Record-IO" implementations use a fixed-size
+header of 4 bytes to directly encode an unsigned 32 bit
+length.
+"""
+
+from mesos.exceptions import MesosException
+
+
+class Encoder:
+    """
+    Encode an arbitrary message type into a 'RecordIO' message.
+
+    This class encapsulates the process of encoding an
+    arbitrary message into a 'RecordIO' message. Its
+    constructor takes a serialization function of the form
+    'serialize(message)'. This serialization function is
+    responsible for knowing how to take whatever message type
+    is passed to 'encode()' and serializing it to a 'UTF-8'
+    encoded byte array.
+
+    Once 'encode(message)' is called, it will use the
+    serialization function to convert 'message' into a 'UTF-8'
+    encoded byte array, wrap it in a 'RecordIO' frame,
+    and return it.
+
+    :param serialize: a function to serialize any message
+                      passed to 'encode()' into a 'UTF-8'
+                      encoded byte array
+    :type serialize: function
+    """
+
+    def __init__(self, serialize):
+        self.serialize = serialize
+
+    def encode(self, message):
+        """
+        Encode a message into 'RecordIO' format.
+
+        :param message: a message to serialize and then wrap in
+                        a 'RecordIO' frame.
+        :type message: object
+        :returns: a serialized message wrapped in a 'RecordIO' frame
+        :rtype: bytes
+        :raises MesosException: if 'serialize(message)' does not
+                                return a 'bytes' object
+        """
+
+        record = self.serialize(message)
+
+        if not isinstance(record, bytes):
+            raise MesosException("Calling 'serialize(message)' must"
+                                 " return a 'bytes' object")
+
+        # The header is the record length in bytes, written as an
+        # ASCII base 10 integer and terminated by a newline.
+        header = str(len(record)).encode("ascii") + b"\n"
+        return header + record
+
+
+class Decoder:
+    """
+    Decode a 'RecordIO' message back to an arbitrary message type.
+
+    This class encapsulates the process of decoding a message
+    previously encoded with 'RecordIO' back to an arbitrary
+    message type. Its constructor takes a deserialization
+    function of the form 'deserialize(data)'. This
+    deserialization function is responsible for knowing how to
+    take a fully constructed 'RecordIO' message containing a
+    'UTF-8' encoded byte array and deserialize it back into the
+    original message type.
+
+    The 'decode(data)' method takes a 'UTF-8' encoded byte array
+    as input and buffers it across subsequent calls to
+    construct a set of fully constructed 'RecordIO' messages that
+    are decoded and returned in a list.
+
+    Once a malformed length header has been seen, the decoder
+    enters the FAILED state and every later call to 'decode()'
+    raises a 'MesosException'.
+
+    :param deserialize: a function to deserialize from 'RecordIO'
+                        messages built up by subsequent calls
+                        to 'decode(data)'
+    :type deserialize: function
+    """
+
+    # Decoder states.
+    HEADER = 0  # accumulating a length header up to its newline
+    RECORD = 1  # accumulating 'self.length' bytes of record data
+    FAILED = 2  # a malformed header was seen; decoding is aborted
+
+    def __init__(self, deserialize):
+        self.deserialize = deserialize
+        self.state = self.HEADER
+        # Bytes of the header or record currently being assembled.
+        self.buffer = b""
+        # Length in bytes of the record currently being assembled.
+        self.length = 0
+
+    def decode(self, data):
+        """
+        Decode a 'RecordIO' formatted message to its original type.
+
+        :param data: an array of 'UTF-8' encoded bytes that make up a
+                     partial 'RecordIO' message. Subsequent calls to this
+                     function maintain state to build up a full 'RecordIO'
+                     message and decode it
+        :type data: bytes
+        :returns: a list of deserialized messages
+        :rtype: list
+        :raises MesosException: if 'data' is not 'bytes', if the decoder
+                                is in the FAILED state, or if a length
+                                header is not a non-negative integer
+        """
+
+        if not isinstance(data, bytes):
+            raise MesosException("Parameter 'data' must be of type 'bytes'")
+
+        if self.state == self.FAILED:
+            raise MesosException("Decoder is in a FAILED state")
+
+        records = []
+        offset = 0
+
+        # Consume 'data' one slice at a time instead of one byte at a
+        # time: appending single bytes to an immutable 'bytes' buffer
+        # copies it on every append, which is quadratic in record size.
+        while offset < len(data):
+            if self.state == self.HEADER:
+                newline = data.find(b"\n", offset)
+                if newline == -1:
+                    # Incomplete header; keep it until more data arrives.
+                    self.buffer += data[offset:]
+                    break
+
+                header = self.buffer + data[offset:newline]
+                offset = newline + 1
+                self.buffer = b""
+                self.length = self._parse_length(header)
+
+                if self.length == 0:
+                    # Zero-length records have no payload, so they are
+                    # decoded as soon as their header is complete.
+                    records.append(self.deserialize(b""))
+                else:
+                    self.state = self.RECORD
+
+            else:  # self.state == self.RECORD
+                missing = self.length - len(self.buffer)
+                chunk = data[offset:offset + missing]
+                self.buffer += chunk
+                offset += len(chunk)
+
+                if len(self.buffer) == self.length:
+                    record = self.buffer
+                    # Reset first so that a failing 'deserialize' does
+                    # not leave the decoder stuck mid-record.
+                    self.buffer = b""
+                    self.state = self.HEADER
+                    records.append(self.deserialize(record))
+
+        return records
+
+    def _parse_length(self, header):
+        """
+        Parse a 'RecordIO' length header.
+
+        The decoder is moved to the FAILED state before raising so
+        that later calls to 'decode()' are rejected.
+
+        :param header: the header bytes, without the trailing newline
+        :type header: bytes
+        :returns: the record length in bytes
+        :rtype: int
+        :raises MesosException: if 'header' is not a non-negative
+                                base 10 integer
+        """
+
+        try:
+            length = int(header.decode("UTF-8"))
+        except ValueError as exception:
+            # Also covers 'UnicodeDecodeError', a 'ValueError' subclass.
+            self.state = self.FAILED
+            raise MesosException("Failed to decode length"
+                                 " '{buffer}': {error}"
+                                 .format(buffer=header,
+                                         error=exception)) from exception
+
+        if length < 0:
+            self.state = self.FAILED
+            raise MesosException("Failed to decode length"
+                                 " '{buffer}': length must not be"
+                                 " negative".format(buffer=header))
+
+        return length
diff --git a/src/python/lib/tests/test_recordio.py b/src/python/lib/tests/test_recordio.py
new file mode 100644
index 0000000..06b33ac
--- /dev/null
+++ b/src/python/lib/tests/test_recordio.py
@@ -0,0 +1,115 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Tests that test the RecordIO encoder and decoder.
+"""
+
+import json
+
+from mesos import recordio
+from mesos.exceptions import MesosException
+
+
+MESSAGE = {
+    "type": "ATTACH_CONTAINER_OUTPUT",
+    "containerId": "123456789"
+}
+
+
+def _json_encoder():
+    """
+    Return a 'RecordIO' encoder that serializes messages to JSON.
+    """
+    return recordio.Encoder(lambda s: bytes(json.dumps(s), "UTF-8"))
+
+
+def _json_decoder():
+    """
+    Return a 'RecordIO' decoder that deserializes messages from JSON.
+    """
+    return recordio.Decoder(lambda s: json.loads(s.decode("UTF-8")))
+
+
+def _raises_mesos_exception(function, *args):
+    """
+    Return True if calling 'function(*args)' raises a 'MesosException'.
+    """
+    try:
+        function(*args)
+    except MesosException:
+        return True
+    return False
+
+
+def test_encode():
+    """
+    Test encoding of a message into 'RecordIO' format.
+    """
+    encoded = _json_encoder().encode(MESSAGE)
+
+    string = json.dumps(MESSAGE)
+    assert encoded == bytes(str(len(string)) + "\n" + string, "UTF-8")
+
+
+def test_encode_non_bytes():
+    """
+    Test that a serializer which does not return 'bytes' is rejected.
+    """
+    encoder = recordio.Encoder(lambda s: s)
+    assert _raises_mesos_exception(encoder.encode, "not bytes")
+
+
+def test_encode_decode():
+    """
+    Test encoding/decoding of a message and records into 'RecordIO' format.
+    """
+    total_messages = 10
+
+    encoder = _json_encoder()
+    encoded = b"".join(encoder.encode(MESSAGE)
+                       for _ in range(total_messages))
+
+    # Feed the decoder in chunks of several sizes so that headers and
+    # records get split across calls to 'decode()'.
+    for chunk_size in (1, 5, len(encoded)):
+        decoder = _json_decoder()
+        all_records = []
+        for offset in range(0, len(encoded), chunk_size):
+            all_records.extend(
+                decoder.decode(encoded[offset:offset + chunk_size]))
+
+        assert all_records == [MESSAGE] * total_messages
+
+
+def test_decode_empty_record():
+    """
+    Test that zero-length records are decoded.
+    """
+    decoder = recordio.Decoder(lambda s: s)
+    assert decoder.decode(b"0\n3\nabc") == [b"", b"abc"]
+
+
+def test_decode_invalid():
+    """
+    Test that invalid input is rejected and fails the decoder.
+    """
+    decoder = recordio.Decoder(lambda s: s)
+    assert _raises_mesos_exception(decoder.decode, "not bytes")
+
+    assert _raises_mesos_exception(decoder.decode, b"abc\n")
+    # Once failed, the decoder rejects any further input.
+    assert _raises_mesos_exception(decoder.decode, b"3\nabc")