You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@thrift.apache.org by jk...@apache.org on 2019/02/08 22:16:26 UTC

[thrift] branch master updated: THRIFT-4780: finish the server implementation of multi in python server - Add default processor handling to python multi

This is an automated email from the ASF dual-hosted git repository.

jking pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/thrift.git


The following commit(s) were added to refs/heads/master by this push:
     new 9804ab9  THRIFT-4780: finish the server implementation of multi in python server - Add default processor handling to python multi
9804ab9 is described below

commit 9804ab983a6c43dec56e0dcc4155cc2c21073822
Author: James E. King III <jk...@apache.org>
AuthorDate: Thu Feb 7 16:59:05 2019 -0500

    THRIFT-4780: finish the server implementation of multi in python server
    - Add default processor handling to python multi
---
 lib/py/src/TMultiplexedProcessor.py |  22 +++++--
 lib/py/src/protocol/__init__.py     |   2 +-
 test/known_failures_Linux.json      | 122 ++++++++++++++++++++++++++++++++++++
 test/py/TestClient.py               |  19 ++++--
 test/py/TestServer.py               |  92 +++++++++++++++++++++++----
 test/rs/src/bin/test_client.rs      |   2 +-
 test/tests.json                     |  42 +++++++------
 7 files changed, 260 insertions(+), 41 deletions(-)

diff --git a/lib/py/src/TMultiplexedProcessor.py b/lib/py/src/TMultiplexedProcessor.py
index 8d929ac..bd10d9b 100644
--- a/lib/py/src/TMultiplexedProcessor.py
+++ b/lib/py/src/TMultiplexedProcessor.py
@@ -24,8 +24,18 @@ from thrift.protocol.TProtocol import TProtocolException
 
 class TMultiplexedProcessor(TProcessor):
     def __init__(self):
+        self.defaultProcessor = None
         self.services = {}
 
+    def registerDefault(self, processor):
+        """
+        If a non-multiplexed client connects to the server and wants to
+        communicate, use the given processor to handle it.  This mechanism
+        allows servers to upgrade from non-multiplexed to multiplexed in a
+        backwards-compatible way and still handle old clients.
+        """
+        self.defaultProcessor = processor
+
     def registerProcessor(self, serviceName, processor):
         self.services[serviceName] = processor
 
@@ -38,10 +48,14 @@ class TMultiplexedProcessor(TProcessor):
 
         index = name.find(TMultiplexedProtocol.SEPARATOR)
         if index < 0:
-            raise TProtocolException(
-                TProtocolException.NOT_IMPLEMENTED,
-                "Service name not found in message name: " + name + ".  " +
-                "Did you forget to use TMultiplexedProtocol in your client?")
+            if self.defaultProcessor:
+                return self.defaultProcessor.process(
+                    StoredMessageProtocol(iprot, (name, type, seqid)), oprot)
+            else:
+                raise TProtocolException(
+                    TProtocolException.NOT_IMPLEMENTED,
+                    "Service name not found in message name: " + name + ".  " +
+                    "Did you forget to use TMultiplexedProtocol in your client?")
 
         serviceName = name[0:index]
         call = name[index + len(TMultiplexedProtocol.SEPARATOR):]
diff --git a/lib/py/src/protocol/__init__.py b/lib/py/src/protocol/__init__.py
index 7148f66..06647a2 100644
--- a/lib/py/src/protocol/__init__.py
+++ b/lib/py/src/protocol/__init__.py
@@ -18,4 +18,4 @@
 #
 
 __all__ = ['fastbinary', 'TBase', 'TBinaryProtocol', 'TCompactProtocol',
-           'TJSONProtocol', 'TProtocol']
+           'TJSONProtocol', 'TProtocol', 'TProtocolDecorator']
diff --git a/test/known_failures_Linux.json b/test/known_failures_Linux.json
index 1ab2af5..dd7fb6b 100644
--- a/test/known_failures_Linux.json
+++ b/test/known_failures_Linux.json
@@ -139,6 +139,8 @@
   "cpp-py_multic_http-ip-ssl",
   "cpp-py_multih-header_http-ip",
   "cpp-py_multih-header_http-ip-ssl",
+  "cpp-py_multih_http-ip",
+  "cpp-py_multih_http-ip-ssl",
   "cpp-py_multij-json_http-ip",
   "cpp-py_multij-json_http-ip-ssl",
   "cpp-py_multij_http-ip",
@@ -397,6 +399,38 @@
   "py-cpp_header_http-ip-ssl",
   "py-cpp_json_http-ip",
   "py-cpp_json_http-ip-ssl",
+  "py-cpp_multi-binary_http-ip",
+  "py-cpp_multi-binary_http-ip-ssl",
+  "py-cpp_multi_http-ip",
+  "py-cpp_multi_http-ip-ssl",
+  "py-cpp_multia-binary_http-ip",
+  "py-cpp_multia-binary_http-ip-ssl",
+  "py-cpp_multia-binary_zlib-ip",
+  "py-cpp_multia-binary_zlib-ip-ssl",
+  "py-cpp_multia-multi_http-ip",
+  "py-cpp_multia-multi_http-ip-ssl",
+  "py-cpp_multia-multi_zlib-ip",
+  "py-cpp_multia-multi_zlib-ip-ssl",
+  "py-cpp_multiac-compact_http-ip",
+  "py-cpp_multiac-compact_http-ip-ssl",
+  "py-cpp_multiac-compact_zlib-ip",
+  "py-cpp_multiac-compact_zlib-ip-ssl",
+  "py-cpp_multiac-multic_http-ip",
+  "py-cpp_multiac-multic_http-ip-ssl",
+  "py-cpp_multiac-multic_zlib-ip",
+  "py-cpp_multiac-multic_zlib-ip-ssl",
+  "py-cpp_multic-compact_http-ip",
+  "py-cpp_multic-compact_http-ip-ssl",
+  "py-cpp_multic_http-ip",
+  "py-cpp_multic_http-ip-ssl",
+  "py-cpp_multih-header_http-ip",
+  "py-cpp_multih-header_http-ip-ssl",
+  "py-cpp_multih_http-ip",
+  "py-cpp_multih_http-ip-ssl",
+  "py-cpp_multij-json_http-ip",
+  "py-cpp_multij-json_http-ip-ssl",
+  "py-cpp_multij_http-ip",
+  "py-cpp_multij_http-ip-ssl",
   "py-d_accel-binary_http-ip",
   "py-d_accel-binary_http-ip-ssl",
   "py-d_accelc-compact_http-ip",
@@ -428,11 +462,39 @@
   "py-java_compact_http-ip-ssl",
   "py-java_json_http-ip",
   "py-java_json_http-ip-ssl",
+  "py-java_multi-binary_http-ip",
+  "py-java_multi-binary_http-ip-ssl",
+  "py-java_multi_http-ip",
+  "py-java_multi_http-ip-ssl",
+  "py-java_multia-binary_http-ip",
+  "py-java_multia-binary_http-ip-ssl",
+  "py-java_multia-multi_http-ip",
+  "py-java_multia-multi_http-ip-ssl",
+  "py-java_multiac-compact_http-ip",
+  "py-java_multiac-compact_http-ip-ssl",
+  "py-java_multiac-multic_http-ip",
+  "py-java_multiac-multic_http-ip-ssl",
+  "py-java_multic-compact_http-ip",
+  "py-java_multic-compact_http-ip-ssl",
+  "py-java_multic_http-ip",
+  "py-java_multic_http-ip-ssl",
+  "py-java_multij-json_http-ip",
+  "py-java_multij-json_http-ip-ssl",
+  "py-java_multij_http-ip",
+  "py-java_multij_http-ip-ssl",
   "py-lua_accel-binary_http-ip",
   "py-lua_accelc-compact_http-ip",
   "py-lua_binary_http-ip",
   "py-lua_compact_http-ip",
   "py-lua_json_http-ip",
+  "py-rs_multi_buffered-ip",
+  "py-rs_multi_framed-ip",
+  "py-rs_multia-multi_buffered-ip",
+  "py-rs_multia-multi_framed-ip",
+  "py-rs_multiac-multic_buffered-ip",
+  "py-rs_multiac-multic_framed-ip",
+  "py-rs_multic_buffered-ip",
+  "py-rs_multic_framed-ip",
   "py3-cpp_accel-binary_http-ip",
   "py3-cpp_accel-binary_http-ip-ssl",
   "py3-cpp_accel-binary_zlib-ip",
@@ -449,6 +511,38 @@
   "py3-cpp_header_http-ip-ssl",
   "py3-cpp_json_http-ip",
   "py3-cpp_json_http-ip-ssl",
+  "py3-cpp_multi-binary_http-ip",
+  "py3-cpp_multi-binary_http-ip-ssl",
+  "py3-cpp_multi_http-ip",
+  "py3-cpp_multi_http-ip-ssl",
+  "py3-cpp_multia-binary_http-ip",
+  "py3-cpp_multia-binary_http-ip-ssl",
+  "py3-cpp_multia-binary_zlib-ip",
+  "py3-cpp_multia-binary_zlib-ip-ssl",
+  "py3-cpp_multia-multi_http-ip",
+  "py3-cpp_multia-multi_http-ip-ssl",
+  "py3-cpp_multia-multi_zlib-ip",
+  "py3-cpp_multia-multi_zlib-ip-ssl",
+  "py3-cpp_multiac-compact_http-ip",
+  "py3-cpp_multiac-compact_http-ip-ssl",
+  "py3-cpp_multiac-compact_zlib-ip",
+  "py3-cpp_multiac-compact_zlib-ip-ssl",
+  "py3-cpp_multiac-multic_http-ip",
+  "py3-cpp_multiac-multic_http-ip-ssl",
+  "py3-cpp_multiac-multic_zlib-ip",
+  "py3-cpp_multiac-multic_zlib-ip-ssl",
+  "py3-cpp_multic-compact_http-ip",
+  "py3-cpp_multic-compact_http-ip-ssl",
+  "py3-cpp_multic_http-ip",
+  "py3-cpp_multic_http-ip-ssl",
+  "py3-cpp_multih-header_http-ip",
+  "py3-cpp_multih-header_http-ip-ssl",
+  "py3-cpp_multih_http-ip",
+  "py3-cpp_multih_http-ip-ssl",
+  "py3-cpp_multij-json_http-ip",
+  "py3-cpp_multij-json_http-ip-ssl",
+  "py3-cpp_multij_http-ip",
+  "py3-cpp_multij_http-ip-ssl",
   "py3-d_accel-binary_http-ip",
   "py3-d_accel-binary_http-ip-ssl",
   "py3-d_accelc-compact_http-ip",
@@ -480,11 +574,39 @@
   "py3-java_compact_http-ip-ssl",
   "py3-java_json_http-ip",
   "py3-java_json_http-ip-ssl",
+  "py3-java_multi-binary_http-ip",
+  "py3-java_multi-binary_http-ip-ssl",
+  "py3-java_multi_http-ip",
+  "py3-java_multi_http-ip-ssl",
+  "py3-java_multia-binary_http-ip",
+  "py3-java_multia-binary_http-ip-ssl",
+  "py3-java_multia-multi_http-ip",
+  "py3-java_multia-multi_http-ip-ssl",
+  "py3-java_multiac-compact_http-ip",
+  "py3-java_multiac-compact_http-ip-ssl",
+  "py3-java_multiac-multic_http-ip",
+  "py3-java_multiac-multic_http-ip-ssl",
+  "py3-java_multic-compact_http-ip",
+  "py3-java_multic-compact_http-ip-ssl",
+  "py3-java_multic_http-ip",
+  "py3-java_multic_http-ip-ssl",
+  "py3-java_multij-json_http-ip",
+  "py3-java_multij-json_http-ip-ssl",
+  "py3-java_multij_http-ip",
+  "py3-java_multij_http-ip-ssl",
   "py3-lua_accel-binary_http-ip",
   "py3-lua_accelc-compact_http-ip",
   "py3-lua_binary_http-ip",
   "py3-lua_compact_http-ip",
   "py3-lua_json_http-ip",
+  "py3-rs_multi_buffered-ip",
+  "py3-rs_multi_framed-ip",
+  "py3-rs_multia-multi_buffered-ip",
+  "py3-rs_multia-multi_framed-ip",
+  "py3-rs_multiac-multic_buffered-ip",
+  "py3-rs_multiac-multic_framed-ip",
+  "py3-rs_multic_buffered-ip",
+  "py3-rs_multic_framed-ip",
   "rb-cpp_json_buffered-domain",
   "rb-cpp_json_buffered-ip",
   "rb-cpp_json_buffered-ip-ssl",
diff --git a/test/py/TestClient.py b/test/py/TestClient.py
index a85098e..e7a9a1a 100755
--- a/test/py/TestClient.py
+++ b/test/py/TestClient.py
@@ -27,8 +27,7 @@ import unittest
 from optparse import OptionParser
 from util import local_libpath
 sys.path.insert(0, local_libpath())
-from thrift.protocol import TProtocolDecorator
-from thrift.protocol import TProtocol
+from thrift.protocol import TProtocol, TProtocolDecorator
 
 SCRIPT_DIR = os.path.abspath(os.path.dirname(__file__))
 
@@ -272,7 +271,7 @@ class AbstractTest(unittest.TestCase):
 
 
 # LAST_SEQID is a global because we have one transport and multiple protocols
-# running on it (when multiplexec)
+# running on it (when multiplexed)
 LAST_SEQID = None
 
 
@@ -398,6 +397,16 @@ class HeaderTest(MultiplexedOptionalTest):
         return make_pedantic(factory.getProtocol(transport))
 
 
+class MultiplexedHeaderTest(MultiplexedOptionalTest):
+    def get_protocol(self, transport):
+        wrapped_proto = make_pedantic(THeaderProtocol.THeaderProtocolFactory().getProtocol(transport))
+        return TMultiplexedProtocol.TMultiplexedProtocol(wrapped_proto, "ThriftTest")
+
+    def get_protocol2(self, transport):
+        wrapped_proto = make_pedantic(THeaderProtocol.THeaderProtocolFactory().getProtocol(transport))
+        return TMultiplexedProtocol.TMultiplexedProtocol(wrapped_proto, "SecondService")
+
+
 def suite():
     suite = unittest.TestSuite()
     loader = unittest.TestLoader()
@@ -421,6 +430,8 @@ def suite():
         suite.addTest(loader.loadTestsFromTestCase(MultiplexedAcceleratedCompactTest))
     elif options.proto == 'multic':
         suite.addTest(loader.loadTestsFromTestCase(MultiplexedCompactTest))
+    elif options.proto == 'multih':
+        suite.addTest(loader.loadTestsFromTestCase(MultiplexedHeaderTest))
     elif options.proto == 'multij':
         suite.addTest(loader.loadTestsFromTestCase(MultiplexedJSONTest))
     else:
@@ -460,7 +471,7 @@ if __name__ == "__main__":
                       dest="verbose", const=0,
                       help="minimal output")
     parser.add_option('--protocol', dest="proto", type="string",
-                      help="protocol to use, one of: accel, accelc, binary, compact, header, json, multi, multia, multiac, multic, multij")
+                      help="protocol to use, one of: accel, accelc, binary, compact, header, json, multi, multia, multiac, multic, multih, multij")
     parser.add_option('--transport', dest="trans", type="string",
                       help="transport to use, one of: buffered, framed, http")
     parser.set_defaults(framed=False, http_path=None, verbose=1, host='localhost', port=9090, proto='binary')
diff --git a/test/py/TestServer.py b/test/py/TestServer.py
index aba0d42..d0a13e5 100755
--- a/test/py/TestServer.py
+++ b/test/py/TestServer.py
@@ -27,6 +27,8 @@ import time
 from optparse import OptionParser
 
 from util import local_libpath
+sys.path.insert(0, local_libpath())
+from thrift.protocol import TProtocol, TProtocolDecorator
 
 SCRIPT_DIR = os.path.abspath(os.path.dirname(__file__))
 
@@ -178,21 +180,79 @@ class TestHandler(object):
                       byte_thing=arg0, i32_thing=arg1, i64_thing=arg2)
 
 
+class SecondHandler(object):
+    def secondtestString(self, argument):
+        return "testString(\"" + argument + "\")"
+
+
+# LAST_SEQID is a global because we have one transport and multiple protocols
+# running on it (when multiplexed)
+LAST_SEQID = None
+
+
+class TPedanticSequenceIdProtocolWrapper(TProtocolDecorator.TProtocolDecorator):
+    """
+    Wraps any protocol with sequence ID checking: looks for outbound
+    uniqueness as well as request/response alignment.
+    """
+    def __init__(self, protocol):
+        # TProtocolDecorator.__new__ does all the heavy lifting
+        pass
+
+    def readMessageBegin(self):
+        global LAST_SEQID
+        (name, type, seqid) =\
+            super(TPedanticSequenceIdProtocolWrapper, self).readMessageBegin()
+        if LAST_SEQID is not None and LAST_SEQID == seqid:
+            raise TProtocol.TProtocolException(
+                TProtocol.TProtocolException.INVALID_DATA,
+                "We received the same seqid {0} twice in a row".format(seqid))
+        LAST_SEQID = seqid
+        return (name, type, seqid)
+
+
+def make_pedantic(proto):
+    """ Wrap a protocol in the pedantic sequence ID wrapper. """
+    # NOTE: this is disabled for now because many clients send a seqid
+    #       of zero, which is acceptable.  We need either a way to identify
+    #       clients that MUST send unique seqids to function correctly, or
+    #       to require unique seqids from all implementations (preferred).
+    return proto  # TPedanticSequenceIdProtocolWrapper(proto)
+
+
+class TPedanticSequenceIdProtocolFactory(TProtocol.TProtocolFactory):
+    def __init__(self, encapsulated):
+        super(TPedanticSequenceIdProtocolFactory, self).__init__()
+        self.encapsulated = encapsulated
+
+    def getProtocol(self, trans):
+        return make_pedantic(self.encapsulated.getProtocol(trans))
+
+
 def main(options):
+    # common header allowed client types
+    allowed_client_types = [
+        THeaderTransport.THeaderClientType.HEADERS,
+        THeaderTransport.THeaderClientType.FRAMED_BINARY,
+        THeaderTransport.THeaderClientType.UNFRAMED_BINARY,
+        THeaderTransport.THeaderClientType.FRAMED_COMPACT,
+        THeaderTransport.THeaderClientType.UNFRAMED_COMPACT,
+    ]
+
     # set up the protocol factory form the --protocol option
     prot_factories = {
         'accel': TBinaryProtocol.TBinaryProtocolAcceleratedFactory(),
+        'multia': TBinaryProtocol.TBinaryProtocolAcceleratedFactory(),
         'accelc': TCompactProtocol.TCompactProtocolAcceleratedFactory(),
-        'binary': TBinaryProtocol.TBinaryProtocolFactory(),
+        'multiac': TCompactProtocol.TCompactProtocolAcceleratedFactory(),
+        'binary': TPedanticSequenceIdProtocolFactory(TBinaryProtocol.TBinaryProtocolFactory()),
+        'multi': TPedanticSequenceIdProtocolFactory(TBinaryProtocol.TBinaryProtocolFactory()),
         'compact': TCompactProtocol.TCompactProtocolFactory(),
-        'header': THeaderProtocol.THeaderProtocolFactory(allowed_client_types=[
-            THeaderTransport.THeaderClientType.HEADERS,
-            THeaderTransport.THeaderClientType.FRAMED_BINARY,
-            THeaderTransport.THeaderClientType.UNFRAMED_BINARY,
-            THeaderTransport.THeaderClientType.FRAMED_COMPACT,
-            THeaderTransport.THeaderClientType.UNFRAMED_COMPACT,
-        ]),
+        'multic': TCompactProtocol.TCompactProtocolFactory(),
+        'header': THeaderProtocol.THeaderProtocolFactory(allowed_client_types),
+        'multih': THeaderProtocol.THeaderProtocolFactory(allowed_client_types),
         'json': TJSONProtocol.TJSONProtocolFactory(),
+        'multij': TJSONProtocol.TJSONProtocolFactory(),
     }
     pfactory = prot_factories.get(options.proto, None)
     if pfactory is None:
@@ -215,6 +275,16 @@ def main(options):
     handler = TestHandler()
     processor = ThriftTest.Processor(handler)
 
+    if options.proto.startswith('multi'):
+        secondHandler = SecondHandler()
+        secondProcessor = SecondService.Processor(secondHandler)
+
+        multiplexedProcessor = TMultiplexedProcessor()
+        multiplexedProcessor.registerDefault(processor)
+        multiplexedProcessor.registerProcessor('ThriftTest', processor)
+        multiplexedProcessor.registerProcessor('SecondService', secondProcessor)
+        processor = multiplexedProcessor
+
     global server
 
     # Handle THttpServer as a special case
@@ -312,7 +382,7 @@ if __name__ == '__main__':
                       dest="verbose", const=0,
                       help="minimal output")
     parser.add_option('--protocol', dest="proto", type="string",
-                      help="protocol to use, one of: accel, accelc, binary, compact, json")
+                      help="protocol to use, one of: accel, accelc, binary, compact, json, multi, multia, multiac, multic, multih, multij")
     parser.add_option('--transport', dest="trans", type="string",
                       help="transport to use, one of: buffered, framed, http")
     parser.add_option('--container-limit', dest='container_limit', type='int', default=None)
@@ -324,11 +394,11 @@ if __name__ == '__main__':
     logging.basicConfig(level=options.verbose)
 
     sys.path.insert(0, os.path.join(SCRIPT_DIR, options.genpydir))
-    sys.path.insert(0, local_libpath())
 
-    from ThriftTest import ThriftTest
+    from ThriftTest import ThriftTest, SecondService
     from ThriftTest.ttypes import Xtruct, Xception, Xception2, Insanity
     from thrift.Thrift import TException
+    from thrift.TMultiplexedProcessor import TMultiplexedProcessor
     from thrift.transport import THeaderTransport
     from thrift.transport import TTransport
     from thrift.transport import TSocket
diff --git a/test/rs/src/bin/test_client.rs b/test/rs/src/bin/test_client.rs
index 29b5b88..8016ca6 100644
--- a/test/rs/src/bin/test_client.rs
+++ b/test/rs/src/bin/test_client.rs
@@ -65,7 +65,7 @@ fn run() -> thrift::Result<()> {
         (@arg host: --host +takes_value "Host on which the Thrift test server is located")
         (@arg port: --port +takes_value "Port on which the Thrift test server is listening")
         (@arg transport: --transport +takes_value "Thrift transport implementation to use (\"buffered\", \"framed\")")
-        (@arg protocol: --protocol +takes_value "Thrift protocol implementation to use (\"binary\", \"compact\")")
+        (@arg protocol: --protocol +takes_value "Thrift protocol implementation to use (\"binary\", \"compact\", \"multi\", \"multic\")")
         (@arg testloops: -n --testloops +takes_value "Number of times to run tests")
     )
         .get_matches();
diff --git a/test/tests.json b/test/tests.json
index 043b826..02ae28a 100644
--- a/test/tests.json
+++ b/test/tests.json
@@ -251,13 +251,6 @@
         "--verbose",
         "--host=localhost",
         "--genpydir=gen-py"
-      ],
-      "protocols": [
-        "multi",
-        "multi:multia",
-        "multic",
-        "multic:multiac",
-        "multij"
       ]
     },
     "transports": [
@@ -271,12 +264,20 @@
       "ip-ssl"
     ],
     "protocols": [
-      "compact",
       "binary",
-      "json",
       "binary:accel",
+      "compact",
       "compact:accelc",
-      "header"
+      "header",
+      "json",
+      "multi",
+      "multi:multia",
+      "multia",
+      "multiac",
+      "multic",
+      "multic:multiac",
+      "multih",
+      "multij"
     ],
     "workdir": "py"
   },
@@ -299,13 +300,6 @@
         "TestClient.py",
         "--host=localhost",
         "--genpydir=gen-py"
-      ],
-      "protocols": [
-        "multi",
-        "multi:multia",
-        "multic",
-        "multic:multiac",
-        "multij"
       ]
     },
     "transports": [
@@ -319,12 +313,20 @@
       "ip-ssl"
     ],
     "protocols": [
-      "compact",
       "binary",
-      "json",
       "binary:accel",
+      "compact",
       "compact:accelc",
-      "header"
+      "header",
+      "json",
+      "multi",
+      "multi:multia",
+      "multia",
+      "multiac",
+      "multic",
+      "multic:multiac",
+      "multih",
+      "multij"
     ],
     "workdir": "py"
   },