You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by kl...@apache.org on 2018/10/16 12:42:21 UTC
[mesos] branch master updated: Added Record-IO encoder and decoder
to Python library.
This is an automated email from the ASF dual-hosted git repository.
klueska pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git
The following commit(s) were added to refs/heads/master by this push:
new e553216 Added Record-IO encoder and decoder to Python library.
e553216 is described below
commit e5532167c91ebabaa3add336a787f8192af59ec3
Author: Armand Grillet <ag...@mesosphere.io>
AuthorDate: Tue Oct 16 08:26:51 2018 -0400
Added Record-IO encoder and decoder to Python library.
This code was pulled directly from:
https://github.com/dcos/dcos-core-cli/blob/
7fd55421939a7782c237e2b8719c0fe2f543acd7/
python/lib/dcos/dcos/recordio.py
https://github.com/dcos/dcos-core-cli/blob/
7fd55421939a7782c237e2b8719c0fe2f543acd7/
python/lib/dcos/tests/test_recordio.py
It will be used by the new CLI for commands such as `task exec`.
Review: https://reviews.apache.org/r/68977/
---
src/python/lib/mesos/__init__.py | 3 +-
src/python/lib/mesos/recordio.py | 175 ++++++++++++++++++++++++++++++++++
src/python/lib/tests/test_recordio.py | 100 +++++++++++++++++++
3 files changed, 277 insertions(+), 1 deletion(-)
diff --git a/src/python/lib/mesos/__init__.py b/src/python/lib/mesos/__init__.py
index bbe82e2..968a1f2 100644
--- a/src/python/lib/mesos/__init__.py
+++ b/src/python/lib/mesos/__init__.py
@@ -20,6 +20,7 @@ Client library for the Mesos HTTP ReST API
__version__ = '0.0.0.dev'
-__all__ = ['exceptions', 'http']
+__all__ = ['exceptions', 'http', 'recordio']
from . import exceptions
from . import http
+from . import recordio
diff --git a/src/python/lib/mesos/recordio.py b/src/python/lib/mesos/recordio.py
new file mode 100644
index 0000000..7cb62b3
--- /dev/null
+++ b/src/python/lib/mesos/recordio.py
@@ -0,0 +1,175 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Provides facilities for "Record-IO" encoding of data.
+"Record-IO" encoding allows one to encode a sequence
+of variable-length records by prefixing each record
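+with its size in bytes followed by a newline. For
+example, the records 'hello' and 'world!' are encoded
+back to back as the bytes 5, newline, hello, 6,
+newline, world! -- nothing separates the end of one
+record's payload from the next record's header.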
+
+Note that this currently only supports record lengths
+encoded as base 10 integer values with newlines as a
+delimiter. This is to provide better language
+portability: parsing a base 10 integer is simple. Most
+other "Record-IO" implementations use a fixed-size
+header of 4 bytes to directly encode an unsigned 32 bit
+length.
+"""
+
+from mesos.exceptions import MesosException
+
+
class Encoder:
    """
    Wrap serialized messages in 'RecordIO' frames.

    An 'Encoder' is built around a serialization callable of
    the form 'serialize(message)' that must turn whatever is
    handed to 'encode()' into a 'bytes' payload (for example
    the 'UTF-8' encoding of a JSON document).

    'encode(message)' serializes 'message' with that callable
    and returns the payload prefixed by its length in bytes,
    written as a base 10 integer and terminated by a newline.

    :param serialize: a function converting any message passed
                      to 'encode()' into a 'bytes' payload
    :type serialize: function
    """

    def __init__(self, serialize):
        # Callable producing the 'bytes' payload of each frame.
        self.serialize = serialize

    def encode(self, message):
        """
        Serialize 'message' and wrap the result in a 'RecordIO' frame.

        :param message: the message to serialize and frame
        :type message: object
        :returns: '<length>\\n' followed by the serialized payload
        :rtype: bytes
        :raises MesosException: if 'serialize' did not return 'bytes'
        """

        payload = self.serialize(message)

        if isinstance(payload, bytes):
            # The header is the decimal payload length plus a newline.
            header = "{}\n".format(len(payload)).encode("UTF-8")
            return header + payload

        raise MesosException(
            "Calling 'serialize(message)' must return a 'bytes' object")
+
+
class Decoder:
    """
    Decode a 'RecordIO' message back to an arbitrary message type.

    This class encapsulates the process of decoding a message
    previously encoded with 'RecordIO' back to an arbitrary
    message type. Its constructor takes a deserialization
    function of the form 'deserialize(data)'. This
    deserialization function is responsible for knowing how to
    take the payload of a fully received 'RecordIO' record (a
    'bytes' object) and deserialize it back into the original
    message type.

    The 'decode(data)' method takes a chunk of 'bytes' as input
    and buffers partial headers and payloads across subsequent
    calls. Every record completed by a call is passed to
    'deserialize()' and the results are returned in a list.

    :param deserialize: a function to deserialize the payload of
                        each 'RecordIO' record built up by
                        subsequent calls to 'decode(data)'
    :type deserialize: function
    """

    # Decoder states: reading a length header, reading a record
    # payload, or permanently failed after a malformed header.
    HEADER = 0
    RECORD = 1
    FAILED = 2

    def __init__(self, deserialize):
        self.deserialize = deserialize
        self.state = self.HEADER
        # Bytes of the current, still incomplete header or payload.
        self.buffer = b""
        # Payload length announced by the most recently parsed header.
        self.length = 0

    def decode(self, data):
        """
        Decode 'RecordIO' formatted data to its original messages.

        :param data: a chunk of a 'RecordIO' stream; it may end in
                     the middle of a header or a payload. State is
                     kept across calls so that records split over
                     several chunks are reassembled
        :type data: bytes
        :returns: the deserialized messages of every record completed
                  by this chunk, in stream order
        :rtype: list
        :raises MesosException: if 'data' is not 'bytes', if a length
                                header is malformed or negative, or if
                                the decoder has already failed
        """

        if not isinstance(data, bytes):
            raise MesosException("Parameter 'data' must be of type 'bytes'")

        if self.state == self.FAILED:
            raise MesosException("Decoder is in a FAILED state")

        records = []
        position = 0
        end = len(data)

        while position < end:
            if self.state == self.HEADER:
                newline = data.find(b"\n", position)
                if newline == -1:
                    # The header continues in a later chunk.
                    self.buffer += data[position:]
                    break

                self.buffer += data[position:newline]
                position = newline + 1
                self._parse_header()

                # Zero-length records have no payload: decode them now.
                if self.length == 0:
                    self.state = self.HEADER
                    records.append(self.deserialize(b""))
            else:
                # RECORD state: take as much of the payload as this
                # chunk holds in one slice instead of byte by byte.
                # 'needed' is always > 0 here, so progress is made.
                needed = self.length - len(self.buffer)
                chunk = data[position:position + needed]
                self.buffer += chunk
                position += len(chunk)

                if len(self.buffer) == self.length:
                    # Reset before deserializing so that an exception
                    # raised by 'deserialize' leaves a usable decoder.
                    record, self.buffer = self.buffer, b""
                    self.state = self.HEADER
                    records.append(self.deserialize(record))

        return records

    def _parse_header(self):
        """
        Parse the buffered header into 'self.length' and enter RECORD.

        On a malformed or negative length the decoder enters the
        FAILED state and a 'MesosException' is raised.
        """
        header = self.buffer

        try:
            length = int(header.decode("UTF-8"))
        except ValueError as exception:
            # 'UnicodeDecodeError' is a subclass of 'ValueError'.
            self.state = self.FAILED
            raise MesosException(
                "Failed to decode length '{buffer}': {error}"
                .format(buffer=header.decode("UTF-8", "replace"),
                        error=exception)) from exception

        if length < 0:
            self.state = self.FAILED
            raise MesosException(
                "Failed to decode length '{length}': "
                "length must not be negative".format(length=length))

        self.length = length
        self.buffer = b""
        self.state = self.RECORD
diff --git a/src/python/lib/tests/test_recordio.py b/src/python/lib/tests/test_recordio.py
new file mode 100644
index 0000000..06b33ac
--- /dev/null
+++ b/src/python/lib/tests/test_recordio.py
@@ -0,0 +1,100 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Tests that test the RecordIO encoder and decoder.
+"""
+
+import json
+
+from mesos import recordio
+from mesos.exceptions import MesosException
+
+
def test_encode():
    """
    Test encoding of a message into 'RecordIO' format.

    Exceptions and assertion failures propagate unchanged so that
    pytest reports the real failure and its assertion details.
    """
    encoder = recordio.Encoder(lambda s: bytes(json.dumps(s), "UTF-8"))

    message = {
        "type": "ATTACH_CONTAINER_OUTPUT",
        "containerId": "123456789"
    }

    encoded = encoder.encode(message)

    # A frame is the payload length in bytes (decimal), a newline,
    # and then the payload itself.
    payload = json.dumps(message).encode("UTF-8")
    assert encoded == str(len(payload)).encode("UTF-8") + b"\n" + payload
+
+
def test_encode_decode():
    """
    Test encoding/decoding of a message and records into 'RecordIO' format.

    The encoded stream is fed to a fresh decoder in chunks of several
    sizes, so headers and payloads are split across 'decode()' calls.
    Exceptions and assertion failures propagate unchanged so that
    pytest reports the real failure and its assertion details.
    """
    total_messages = 10

    encoder = recordio.Encoder(lambda s: bytes(json.dumps(s), "UTF-8"))

    message = {
        "type": "ATTACH_CONTAINER_OUTPUT",
        "containerId": "123456789"
    }

    encoded = b"".join(encoder.encode(message)
                       for _ in range(total_messages))

    for chunk_size in (1, 5, 7, len(encoded)):
        decoder = recordio.Decoder(lambda s: json.loads(s.decode("UTF-8")))

        all_records = []
        for offset in range(0, len(encoded), chunk_size):
            chunk = encoded[offset:offset + chunk_size]
            all_records.extend(decoder.decode(chunk))

        assert all_records == [message] * total_messages