You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@thrift.apache.org by je...@apache.org on 2014/11/11 01:41:37 UTC

[3/3] thrift git commit: THRIFT-2815 Support for Multiplexing Services on any Transport, Protocol and Server Client: Haxe Patch: Jens Geyer

THRIFT-2815 Support for Multiplexing Services on any Transport, Protocol and Server
Client: Haxe
Patch: Jens Geyer

This closes #262


Project: http://git-wip-us.apache.org/repos/asf/thrift/repo
Commit: http://git-wip-us.apache.org/repos/asf/thrift/commit/e5ff9a86
Tree: http://git-wip-us.apache.org/repos/asf/thrift/tree/e5ff9a86
Diff: http://git-wip-us.apache.org/repos/asf/thrift/diff/e5ff9a86

Branch: refs/heads/master
Commit: e5ff9a860d69fbd6cc6d6cb79833930e1148e4c0
Parents: 96dfcd5
Author: Jens Geyer <je...@apache.org>
Authored: Tue Nov 11 01:39:38 2014 +0100
Committer: Jens Geyer <je...@apache.org>
Committed: Tue Nov 11 01:39:38 2014 +0100

----------------------------------------------------------------------
 .../thrift/protocol/TMultiplexedProcessor.hx    | 174 ++++++++++++++
 .../thrift/protocol/TMultiplexedProtocol.hx     |  97 ++++++++
 .../thrift/protocol/TProtocolDecorator.hx       | 218 ++++++++++++++++++
 lib/haxe/test/HaxeTests.hxproj                  |   8 +-
 lib/haxe/test/Makefile.am                       |  12 +-
 lib/haxe/test/make_all.bat                      |   4 +-
 lib/haxe/test/make_all.sh                       |   2 +
 lib/haxe/test/src/Main.hx                       |  50 ++++-
 lib/haxe/test/src/MultiplexTest.hx              | 224 +++++++++++++++++++
 lib/haxe/test/src/StreamTest.hx                 |  32 +--
 lib/haxe/test/src/TestBase.hx                   |  10 +-
 11 files changed, 808 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/thrift/blob/e5ff9a86/lib/haxe/src/org/apache/thrift/protocol/TMultiplexedProcessor.hx
----------------------------------------------------------------------
diff --git a/lib/haxe/src/org/apache/thrift/protocol/TMultiplexedProcessor.hx b/lib/haxe/src/org/apache/thrift/protocol/TMultiplexedProcessor.hx
new file mode 100644
index 0000000..7354ff4
--- /dev/null
+++ b/lib/haxe/src/org/apache/thrift/protocol/TMultiplexedProcessor.hx
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.thrift.protocol;
+
+import haxe.ds.StringMap;
+import org.apache.thrift.TApplicationException;
+import org.apache.thrift.TProcessor;
+
+import org.apache.thrift.transport.TTransport;
+
+
+/**
+ * TMultiplexedProcessor is a TProcessor allowing a single TServer to provide multiple services.
+ * To do so, you instantiate the processor and then register additional processors with it,
+ * as sketched in the following pseudo-code example:
+ *
+ *     TMultiplexedProcessor processor = new TMultiplexedProcessor();
+ *
+ *     processor.RegisterProcessor(
+ *         "Calculator",
+ *         new Calculator.Processor(new CalculatorHandler()));
+ *
+ *     processor.RegisterProcessor(
+ *         "WeatherReport",
+ *         new WeatherReport.Processor(new WeatherReportHandler()));
+ *
+ *     TServerTransport t = new TServerSocket(9090);
+ *     TSimpleServer server = new TSimpleServer(processor, t);
+ *
+ *     server.Serve();
+ */
+class TMultiplexedProcessor implements TProcessor
+{
+    private var serviceProcessorMap : StringMap<TProcessor> = new StringMap<TProcessor>();  // service name -> processor
+    private var defaultProcessor : TProcessor = null;  // serves calls without a service name, if set
+
+    /**
+     * 'Register' a service with this TMultiplexedProcessor. This allows us to broker
+     * requests to individual services by using the service name to select them at request time.
+     *
+     * Args:
+     * - serviceName    Name of a service, has to be identical to the name
+     *                  declared in the Thrift IDL, e.g. "WeatherReport".
+     * - processor      Processor wrapping the service implementation (the "handler"),
+     *                  e.g. the generated processor created for a WeatherReportHandler.
+     * - asDefault      If true, the processor also serves calls that carry no service name.
+     *                  Only one default is allowed, a second one raises TApplicationException.
+     */
+    public function RegisterProcessor(serviceName : String, processor : TProcessor, asDefault : Bool = false) : Void {
+        // validate first, so a rejected registration leaves no partial state behind
+        if ( asDefault && (defaultProcessor != null)) {
+            throw new TApplicationException( TApplicationException.UNKNOWN, "Can't have multiple default processors");
+        }
+        serviceProcessorMap.set(serviceName, processor);
+        if ( asDefault) defaultProcessor = processor;
+    }
+
+    // Replies to 'message' with a TApplicationException (extype, etxt) and flushes the transport.
+    private function Fail( oprot : TProtocol, message : TMessage, extype : Int, etxt : String) : Void {
+        var appex = new TApplicationException( extype, etxt);
+
+        var newMessage = new TMessage(message.name, TMessageType.EXCEPTION, message.seqid);
+
+        oprot.writeMessageBegin(newMessage);
+        appex.write( oprot);
+        oprot.writeMessageEnd();
+        oprot.getTransport().flush();
+    }
+
+
+    /**
+     * This implementation of process performs the following steps:
+     *
+     * - Read the beginning of the message.
+     * - Extract the service name from the message.
+     * - Using the service name to locate the appropriate processor.
+     * - Dispatch to the processor, with a decorated instance of TProtocol
+     *    that allows readMessageBegin() to return the original TMessage.
+     *
+     * Sends a TApplicationException to the client and returns false if
+     * - the message type is not CALL or ONEWAY,
+     * - no service name was found in the message and no default processor is set, or
+     * - the service name has not been RegisterProcessor()ed.
+     */
+    public function process( iprot : TProtocol, oprot : TProtocol) : Bool {
+        /*  Use the actual underlying protocol (e.g. TBinaryProtocol) to read the
+            message header.  This pulls the message "off the wire", which we'll
+            deal with at the end of this method. */
+
+        var message : TMessage = iprot.readMessageBegin();
+        var methodName : String = "";
+
+        if ((message.type != TMessageType.CALL) && (message.type != TMessageType.ONEWAY))
+        {
+            Fail(oprot, message,
+                  TApplicationException.INVALID_MESSAGE_TYPE,
+                  "Message type CALL or ONEWAY expected");
+            return false;
+        }
+
+        // Extract the service name
+        var actualProcessor : TProcessor = null;
+        var index = message.name.indexOf(TMultiplexedProtocol.SEPARATOR);
+        if (index < 0) {
+            // fallback to default processor
+            methodName = message.name;
+            actualProcessor = defaultProcessor;
+            if( actualProcessor == null) {
+                Fail(oprot, message,
+                      TApplicationException.INVALID_PROTOCOL,
+                      "Service name not found in message name: " + message.name + " and no default processor defined. " +
+                      "Did you forget to use a TMultiplexedProtocol in your client?");
+                return false;
+            }
+
+        } else {
+            // service name given
+            var serviceName = message.name.substring(0, index);
+            methodName = message.name.substring( serviceName.length + TMultiplexedProtocol.SEPARATOR.length);
+            actualProcessor = serviceProcessorMap.get( serviceName);
+            if( actualProcessor == null) {
+                Fail(oprot, message,
+                      TApplicationException.INTERNAL_ERROR,
+                      "Service name not found: " + serviceName + ". " +
+                      "Did you forget to call RegisterProcessor()?");
+                return false;
+            }
+        }
+
+        // Create a new TMessage, removing the service name
+        // Dispatch processing to the stored processor
+        var newMessage = new TMessage( methodName, message.type, message.seqid);
+        var storedMsg = new StoredMessageProtocol( iprot, newMessage);
+        return actualProcessor.process( storedMsg, oprot);
+    }
+}
+
+
+/**
+ *  Decorator used by TMultiplexedProcessor: it hands out a prepared TMessage
+ *  (method name without the service prefix) from readMessageBegin(), so the
+ *  target processor sees the message header in exactly the standard format.
+ */
+class StoredMessageProtocol extends TProtocolDecorator
+{
+    private var storedMessage : TMessage;
+
+    public function new( protocol : TProtocol, messageBegin : TMessage) {
+        super( protocol);
+        storedMessage = messageBegin;
+    }
+
+    public override function readMessageBegin() : TMessage {
+        return this.storedMessage;
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/thrift/blob/e5ff9a86/lib/haxe/src/org/apache/thrift/protocol/TMultiplexedProtocol.hx
----------------------------------------------------------------------
diff --git a/lib/haxe/src/org/apache/thrift/protocol/TMultiplexedProtocol.hx b/lib/haxe/src/org/apache/thrift/protocol/TMultiplexedProtocol.hx
new file mode 100644
index 0000000..cacd1d7
--- /dev/null
+++ b/lib/haxe/src/org/apache/thrift/protocol/TMultiplexedProtocol.hx
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.thrift.protocol;
+
+import org.apache.thrift.transport.TTransport;
+
+
+/**
+ * TMultiplexedProtocol is a protocol-independent concrete decorator that lets a Thrift
+ * client talk to a multiplexing Thrift server: the service name is prepended
+ * to the function name of every outgoing CALL and ONEWAY message.
+ *
+ * NOTE: THIS IS NOT TO BE USED BY SERVERS.
+ * On the server, use TMultiplexedProcessor to handle requests from a multiplexing client.
+ *
+ * This pseudo-code example uses a single socket transport to invoke two services:
+ *
+ *     TSocket transport = new TSocket("localhost", 9090);
+ *     transport.open();
+ *
+ *     TBinaryProtocol protocol = new TBinaryProtocol(transport);
+ *
+ *     TMultiplexedProtocol mp = new TMultiplexedProtocol(protocol, "Calculator");
+ *     Calculator.Client service = new Calculator.Client(mp);
+ *
+ *     TMultiplexedProtocol mp2 = new TMultiplexedProtocol(protocol, "WeatherReport");
+ *     WeatherReport.Client service2 = new WeatherReport.Client(mp2);
+ *
+ *     System.out.println(service.add(2,2));
+ *     System.out.println(service2.getTemperature());
+ *
+ */
+class TMultiplexedProtocol extends TProtocolDecorator {
+
+    /** Delimiter between the service name and the function name */
+    public static inline var SEPARATOR : String = ":";
+
+    private var service : String;
+
+    /**
+     * Wraps the given protocol so that it can be used to communicate with a
+     * multiplexing server.  The <code>serviceName</code> is mandatory, because it
+     * gets prepended to the message header, which in turn enables the multiplexing
+     * server to route the function call to the proper service.
+     *
+     * Args:
+     *  protocol        Your communication protocol of choice, e.g. TBinaryProtocol
+     *  serviceName     The service name of the service communicating via this protocol.
+     */
+    public function new( protocol : TProtocol, serviceName : String)    {
+        super( protocol);
+        this.service = serviceName;
+    }
+
+    /**
+     * Writes the message header, with "service name + SEPARATOR" put in front of the name.
+     * Args:
+     *   message      The original message.
+     */
+    public override function writeMessageBegin( message : TMessage) : Void {
+        // only outgoing requests get the service prefix, everything else
+        // is passed through to the wrapped protocol unchanged
+        var isRequest = (message.type == TMessageType.CALL)
+                     || (message.type == TMessageType.ONEWAY);
+
+        if ( ! isRequest) {
+            super.writeMessageBegin(message);
+            return;
+        }
+
+        var prefixedName = service + SEPARATOR + message.name;
+        var prefixed = new TMessage(
+            prefixedName,
+            message.type,
+            message.seqid);
+
+        super.writeMessageBegin( prefixed);
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/thrift/blob/e5ff9a86/lib/haxe/src/org/apache/thrift/protocol/TProtocolDecorator.hx
----------------------------------------------------------------------
diff --git a/lib/haxe/src/org/apache/thrift/protocol/TProtocolDecorator.hx b/lib/haxe/src/org/apache/thrift/protocol/TProtocolDecorator.hx
new file mode 100644
index 0000000..e43d2d9
--- /dev/null
+++ b/lib/haxe/src/org/apache/thrift/protocol/TProtocolDecorator.hx
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.thrift.protocol;
+
+import haxe.io.Bytes;
+import haxe.Int64;
+
+import org.apache.thrift.transport.TTransport;
+
+
+/**
+ * TProtocolDecorator forwards all requests to an enclosed TProtocol instance,
+ * providing a way to author concise concrete decorator subclasses.  Its constructor
+ * is private, so it has to be subclassed; this is a reminder that by itself,
+ * it does not modify the behaviour of the enclosed TProtocol.
+ *
+ * See p.175 of Design Patterns (by Gamma et al.)
+ * See TMultiplexedProtocol
+ */
+class TProtocolDecorator implements TProtocol
+{
+    private var wrapped : TProtocol;
+
+    /**
+     * Encloses the specified protocol.
+     * @param protocol All operations will be forwarded to this protocol.  Must be non-null.
+     */
+    private function new( protocol : TProtocol)  // not to be instantiated, must derive a class
+    {
+        wrapped = protocol;
+    }
+
+    public function getTransport() : TTransport {
+      return wrapped.getTransport();
+    }
+
+    public function writeMessageBegin( value : TMessage) : Void    {
+        wrapped.writeMessageBegin( value);
+    }
+
+    public function writeMessageEnd() : Void {
+        wrapped.writeMessageEnd();
+    }
+
+    public function writeStructBegin(value : TStruct) : Void {
+        wrapped.writeStructBegin( value);
+    }
+
+    public function writeStructEnd() : Void {
+        wrapped.writeStructEnd();
+    }
+
+    public function writeFieldBegin(value : TField) : Void {
+        wrapped.writeFieldBegin( value);
+    }
+
+    public function writeFieldEnd() : Void {
+        wrapped.writeFieldEnd();
+    }
+
+    public function writeFieldStop() : Void {
+        wrapped.writeFieldStop();
+    }
+
+    public function writeMapBegin( value : TMap) : Void {
+        wrapped.writeMapBegin( value);
+    }
+
+    public function writeMapEnd() : Void {
+        wrapped.writeMapEnd();
+    }
+
+    public function writeListBegin( value : TList) : Void {
+        wrapped.writeListBegin( value);
+    }
+
+    public function writeListEnd() : Void {
+        wrapped.writeListEnd();
+    }
+
+    public function writeSetBegin( value : TSet) : Void {
+        wrapped.writeSetBegin( value);
+    }
+
+    public function writeSetEnd() : Void {
+        wrapped.writeSetEnd();
+    }
+
+    public function writeBool(value : Bool) : Void {
+        wrapped.writeBool( value);
+    }
+
+    public function writeByte(value : Int) : Void {
+        wrapped.writeByte( value);
+    }
+
+    public function writeI16(value : Int) : Void {
+        wrapped.writeI16( value);
+    }
+
+    public function writeI32(value : Int) : Void {
+        wrapped.writeI32( value);
+    }
+
+    public function writeI64(value : haxe.Int64) : Void {
+        wrapped.writeI64( value);
+    }
+
+    public function writeDouble(value : Float) : Void {
+        wrapped.writeDouble( value);
+    }
+
+    public function writeString(value : String) : Void {
+        wrapped.writeString( value);
+    }
+
+    public function writeBinary(value : Bytes ) : Void {
+        wrapped.writeBinary( value);
+    }
+
+    public function readMessageBegin() : TMessage {
+        return wrapped.readMessageBegin();
+    }
+
+    public function readMessageEnd() : Void {
+        wrapped.readMessageEnd();
+    }
+
+    public function readStructBegin() : TStruct {
+        return wrapped.readStructBegin();
+    }
+
+    public function readStructEnd() : Void {
+        wrapped.readStructEnd();
+    }
+
+    public function readFieldBegin() : TField {
+        return wrapped.readFieldBegin();
+    }
+
+    public function readFieldEnd() : Void {
+        wrapped.readFieldEnd();
+    }
+
+    public function readMapBegin() : TMap {
+        return wrapped.readMapBegin();
+    }
+
+    public function readMapEnd() : Void {
+        wrapped.readMapEnd();
+    }
+
+    public function readListBegin() : TList {
+        return wrapped.readListBegin();
+    }
+
+    public function readListEnd() : Void {
+        wrapped.readListEnd();
+    }
+
+    public function readSetBegin() : TSet {
+        return wrapped.readSetBegin();
+    }
+
+    public function readSetEnd() : Void {
+        wrapped.readSetEnd();
+    }
+
+    public function readBool() : Bool
+    {
+        return wrapped.readBool();
+    }
+
+    public function readByte() : Int {
+        return wrapped.readByte();
+    }
+
+    public function readI16() : Int {
+        return wrapped.readI16();
+    }
+
+    public function readI32() : Int {
+        return wrapped.readI32();
+    }
+
+    public function readI64() : haxe.Int64 {
+        return wrapped.readI64();
+    }
+
+    public function readDouble() : Float {
+        return wrapped.readDouble();
+    }
+
+    public function readString() : String {
+        return wrapped.readString();
+    }
+
+    public function readBinary() : Bytes {
+        return wrapped.readBinary();
+    }
+}

http://git-wip-us.apache.org/repos/asf/thrift/blob/e5ff9a86/lib/haxe/test/HaxeTests.hxproj
----------------------------------------------------------------------
diff --git a/lib/haxe/test/HaxeTests.hxproj b/lib/haxe/test/HaxeTests.hxproj
index 4e8929b..3beed82 100644
--- a/lib/haxe/test/HaxeTests.hxproj
+++ b/lib/haxe/test/HaxeTests.hxproj
@@ -53,14 +53,16 @@
     <hidden path="python.hxml" />
   </hiddenPaths>
   <!-- Executed before build -->
-  <preBuildCommand>thrift -r -gen haxe  ../../../test/ThriftTest.thrift</preBuildCommand>
+  <preBuildCommand>thrift -r -gen haxe  ../../../test/ThriftTest.thrift
+thrift -r -gen haxe  ../../../contrib/async-test/aggr.thrift
+thrift -r -gen haxe  ../../../lib/rb/benchmark/Benchmark.thrift</preBuildCommand>
   <!-- Executed after build -->
   <postBuildCommand alwaysRun="False" />
   <!-- Other project options -->
   <options>
     <option showHiddenPaths="False" />
-    <option testMovie="Unknown" />
-    <option testMovieCommand="" />
+    <option testMovie="Custom" />
+    <option testMovieCommand="bin/HaxeTests/Main.exe server multiplex" />
   </options>
   <!-- Plugin storage -->
   <storage />

http://git-wip-us.apache.org/repos/asf/thrift/blob/e5ff9a86/lib/haxe/test/Makefile.am
----------------------------------------------------------------------
diff --git a/lib/haxe/test/Makefile.am b/lib/haxe/test/Makefile.am
index 357436c..13b4266 100644
--- a/lib/haxe/test/Makefile.am
+++ b/lib/haxe/test/Makefile.am
@@ -20,15 +20,25 @@
 THRIFT = $(top_srcdir)/compiler/cpp/thrift
 THRIFTCMD = $(THRIFT) --gen haxe -r
 THRIFTTEST = $(top_srcdir)/test/ThriftTest.thrift
+AGGR = $(top_srcdir)/contrib/async-test/aggr.thrift
+BENCHMARK = $(top_srcdir)/lib/rb/benchmark/Benchmark.thrift
 
 BIN_CPP = bin/Main-debug
 
 gen-haxe/thrift/test/ThriftTest.hx: $(THRIFTTEST)
 	$(THRIFTCMD) $(THRIFTTEST)
 
+gen-haxe/thrift/test/Aggr.hx: $(AGGR)
+	$(THRIFTCMD) $(AGGR)
+
+gen-haxe/thrift/test/BenchmarkService.hx: $(BENCHMARK)
+	$(THRIFTCMD) $(BENCHMARK)
+
 all-local: $(BIN_CPP)
 
-$(BIN_CPP):    gen-haxe/thrift/test/ThriftTest.hx
+$(BIN_CPP): gen-haxe/thrift/test/ThriftTest.hx \
+			gen-haxe/thrift/test/Aggr.hx \
+			gen-haxe/thrift/test/BenchmarkService.hx
 	$(HAXE) --cwd .  cpp.hxml
 
 

http://git-wip-us.apache.org/repos/asf/thrift/blob/e5ff9a86/lib/haxe/test/make_all.bat
----------------------------------------------------------------------
diff --git a/lib/haxe/test/make_all.bat b/lib/haxe/test/make_all.bat
index ee18f10..0314e18 100644
--- a/lib/haxe/test/make_all.bat
+++ b/lib/haxe/test/make_all.bat
@@ -26,7 +26,9 @@ if "%HAXEPATH%"=="" goto NOTINSTALLED
 set path=%HAXEPATH%;%HAXEPATH%\..\neko;%path%
 
 rem # invoke Thrift comnpiler
-thrift -r -gen haxe   ..\..\..\test\ThriftTest.thrift
+thrift -r -gen haxe  ..\..\..\test\ThriftTest.thrift
+thrift -r -gen haxe  ..\..\..\contrib\async-test\aggr.thrift
+thrift -r -gen haxe  ..\..\..\lib\rb\benchmark\Benchmark.thrift
 if errorlevel 1 goto STOP
 
 rem # invoke Haxe compiler for all targets

http://git-wip-us.apache.org/repos/asf/thrift/blob/e5ff9a86/lib/haxe/test/make_all.sh
----------------------------------------------------------------------
diff --git a/lib/haxe/test/make_all.sh b/lib/haxe/test/make_all.sh
index 13b5754..512f5ec 100644
--- a/lib/haxe/test/make_all.sh
+++ b/lib/haxe/test/make_all.sh
@@ -20,6 +20,8 @@
 
 # invoke Thrift comnpiler
 thrift -r -gen haxe  ../../../test/ThriftTest.thrift
+thrift -r -gen haxe  ../../../contrib/async-test/aggr.thrift
+thrift -r -gen haxe  ../../../lib/rb/benchmark/Benchmark.thrift
 
 # output folder
 if [ ! -d bin ]; then

http://git-wip-us.apache.org/repos/asf/thrift/blob/e5ff9a86/lib/haxe/test/src/Main.hx
----------------------------------------------------------------------
diff --git a/lib/haxe/test/src/Main.hx b/lib/haxe/test/src/Main.hx
index da0a7f5..6c262d7 100644
--- a/lib/haxe/test/src/Main.hx
+++ b/lib/haxe/test/src/Main.hx
@@ -27,19 +27,67 @@ import org.apache.thrift.meta_data.*;
 
 import thrift.test.*;  // generated code
 
+
+enum WhatTests {
+    Normal;     // Main runs StreamTest
+    Multiplex;  // Main runs MultiplexTest
+}
+
 class Main
 {
+    static private var tests : WhatTests = Normal;  // which test suite to run
+    static private var server : Bool = false;       // run mode, default is client
+
+    static private inline var CMDLINEHELP : String
+        = "\nHaxeTests  [client|server]  [multiplex]\n"
+        + "  client|server  ... determines run mode for some tests, default is client\n"
+        + "  multiplex ........ run multiplex test server or client\n";
+    // Evaluates the command line (sys targets only); throws a String on unknown arguments.
+    static private function ParseArgs() {
+        #if sys
+
+        var args = Sys.args();
+        if ( args != null) {
+            for ( arg in args) {
+                switch(arg.toLowerCase()) {
+                    case "client":
+                        server = false;
+                    case "server" :
+                        server = true;
+                    case "multiplex" :
+                        tests = Multiplex;
+                    default:
+                        throw 'Invalid argument "$arg"\n'+CMDLINEHELP;
+                }
+            }
+        }
+
+        #end
+    }
+    // Entry point: runs the selected tests, any failure is traced and (on sys) sets exit code 1.
     static public function main()
     {
         try
         {
-            StreamTest.Run();
+            ParseArgs();
+
+            switch( tests) {
+                case Normal:
+                    StreamTest.Run(server);
+                case Multiplex:
+                    MultiplexTest.Run(server);
+                default:
+                    throw 'Unhandled test mode $tests';  // single quotes: only those interpolate
+            }
 
             trace("All tests completed.");
         }
         catch( e: Dynamic)
         {
             trace('$e');
+            #if sys
+            Sys.exit(1);  // indicate error
+            #end
         }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/thrift/blob/e5ff9a86/lib/haxe/test/src/MultiplexTest.hx
----------------------------------------------------------------------
diff --git a/lib/haxe/test/src/MultiplexTest.hx b/lib/haxe/test/src/MultiplexTest.hx
new file mode 100644
index 0000000..3818b66
--- /dev/null
+++ b/lib/haxe/test/src/MultiplexTest.hx
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package;
+
+import haxe.Int64;
+import haxe.Int32;
+
+import org.apache.thrift.*;
+import org.apache.thrift.protocol.*;
+import org.apache.thrift.transport.*;
+import org.apache.thrift.server.*;
+import org.apache.thrift.meta_data.*;
+
+// debug only
+import org.apache.thrift.protocol.TProtocolDecorator;
+import org.apache.thrift.protocol.TMultiplexedProtocol;
+import org.apache.thrift.protocol.TMultiplexedProcessor;
+
+// generated code imports
+import Aggr;
+import AggrImpl;
+import AggrProcessor;
+import BenchmarkService;
+import BenchmarkServiceImpl;
+import BenchmarkServiceProcessor;
+import Error;
+
+
+class BenchmarkServiceHandler implements BenchmarkService
+{
+    public function new() {
+    }
+
+    // Iteratively computes a Fibonacci-style number for n (yields 1 for n <= 1)
+    public function fibonacci(n : haxe.Int32) : haxe.Int32 {
+        trace('Benchmark.fibonacci($n)');
+        var older   = 0;
+        var current = 1;
+        while( n > 0)
+        {
+            var sum : Int = current + older;
+            older   = current;
+            current = sum;
+            --n;
+        }
+        return current;
+    }
+}
+
+
+class AggrServiceHandler implements Aggr  // test handler: collects the values it is given
+{
+    private var values : List<haxe.Int32> = new List<haxe.Int32>();  // everything added so far
+
+    public function new() {
+    }
+
+    public function addValue(value : haxe.Int32) : Void    {  // appends value to the list
+        trace('Aggr.addValue($value)');
+        values.add( value);
+    }
+
+    public function getValues() : List< haxe.Int32> {  // returns the internal list itself, not a copy
+        trace('Aggr.getValues()');
+        return values;
+    }
+}
+
+
+
+class MultiplexTest extends TestBase {
+    // Exercises TMultiplexedProcessor and TMultiplexedProtocol with two services on port 9090.
+    private inline static var NAME_BENCHMARKSERVICE : String = "BenchmarkService";
+    private inline static var NAME_AGGR             : String  = "Aggr";
+
+    // server == true starts the multiplex server, otherwise both client scenarios are run
+    public static override function Run(server : Bool) : Void {
+        if ( server) {
+            RunMultiplexServer();
+        } else {
+            RunMultiplexClient();
+            RunDefaultClient();
+        }
+    }
+
+
+    // run the multiplex server: both services registered, BenchmarkService is the default
+    public static override function RunMultiplexServer() : Void  {
+       try
+       {
+            var benchHandler : BenchmarkService = new BenchmarkServiceHandler();
+            var benchProcessor : TProcessor = new BenchmarkServiceProcessor( benchHandler);
+
+            var aggrHandler : Aggr = new AggrServiceHandler();
+            var aggrProcessor : TProcessor = new AggrProcessor( aggrHandler);
+
+            var multiplex : TMultiplexedProcessor = new TMultiplexedProcessor();
+            multiplex.RegisterProcessor( NAME_BENCHMARKSERVICE, benchProcessor, true);  // also the default (fallback) processor
+            multiplex.RegisterProcessor( NAME_AGGR, aggrProcessor);
+
+            // protocol+transport stack: binary protocol over framed transport, must match the clients
+            var protfact : TProtocolFactory = new TBinaryProtocolFactory(true,true);
+            var servertrans : TServerTransport = new TServerSocket( 9090, 5, false);
+            var transfact : TTransportFactory = new TFramedTransportFactory();
+
+            var server : TServer = new TSimpleServer( multiplex, servertrans, transfact, protfact);
+
+            trace("Starting the server ...");
+            server.Serve();
+       }
+        catch( e : TApplicationException)
+        {
+            TestBase.Expect(false,'${e.errorID} ${e.errorMsg}');  // turn the exception into a test failure
+        }
+        catch( e : TException)
+        {
+            TestBase.Expect(false,'$e');
+        }
+    }
+
+
+    // run multiplex client against multiplex server, both services share one protocol/transport
+    public static override function RunMultiplexClient() : Void  {
+        try
+        {
+            var trans : TTransport;
+            trans = new TSocket("localhost", 9090);
+            trans = new TFramedTransport(trans);
+            trans.open();
+
+            var protocol : TProtocol = new TBinaryProtocol(trans,true,true);
+            var multiplex : TMultiplexedProtocol;
+
+            multiplex = new TMultiplexedProtocol( protocol, NAME_BENCHMARKSERVICE);
+            var bench = new BenchmarkServiceImpl( multiplex);
+
+            multiplex = new TMultiplexedProtocol( protocol, NAME_AGGR);
+            var aggr = new AggrImpl( multiplex);
+
+            trace('calling aggr.add( bench.fibo())...');
+            for( i in 1 ... 10)  // i = 1..9, the upper bound is exclusive
+            {
+                trace('$i');
+                aggr.addValue( bench.fibonacci(i));
+            }
+
+            trace('calling aggr ...');
+            var i = 1;
+            var values = aggr.getValues();
+            TestBase.Expect(values != null,'aggr.getValues() == null');
+            for( k in values)
+            {
+                trace('fib($i) = $k');
+                ++i;
+            }
+
+            trans.close();
+            trace('done.');
+
+        }
+        catch( e : TApplicationException)
+        {
+            TestBase.Expect(false,'${e.errorID} ${e.errorMsg}');  // turn the exception into a test failure
+        }
+        catch( e : TException)
+        {
+            TestBase.Expect(false,'$e');
+        }
+    }
+
+
+    // run non-multiplex client against multiplex server to test default fallback
+    public static override function RunDefaultClient() : Void  {
+        try
+        {
+            var trans : TTransport;
+            trans = new TSocket("localhost", 9090);
+            trans = new TFramedTransport(trans);
+            trans.open();
+
+            var protocol : TProtocol = new TBinaryProtocol(trans,true,true);
+
+            var bench = new BenchmarkServiceImpl( protocol);  // plain protocol, no service name is sent
+
+            trace('calling bench (via default) ...');
+            for( i in 1 ... 10)  // i = 1..9, the upper bound is exclusive
+            {
+                var k = bench.fibonacci(i);
+                trace('fib($i) = $k');
+            }
+
+            trans.close();
+            trace('done.');
+        }
+        catch( e : TApplicationException)
+        {
+            TestBase.Expect(false,'${e.errorID} ${e.errorMsg}');  // turn the exception into a test failure
+        }
+        catch( e : TException)
+        {
+            TestBase.Expect(false,'$e');
+        }
+    }
+
+}
+
+

http://git-wip-us.apache.org/repos/asf/thrift/blob/e5ff9a86/lib/haxe/test/src/StreamTest.hx
----------------------------------------------------------------------
diff --git a/lib/haxe/test/src/StreamTest.hx b/lib/haxe/test/src/StreamTest.hx
index 7500eee..244f1ea 100644
--- a/lib/haxe/test/src/StreamTest.hx
+++ b/lib/haxe/test/src/StreamTest.hx
@@ -20,6 +20,7 @@
 package;
 
 import haxe.Int64;
+import sys.FileSystem;
 
 import org.apache.thrift.*;
 import org.apache.thrift.protocol.*;
@@ -33,15 +34,9 @@ import thrift.test.*;  // generated code
 class StreamTest extends TestBase {
 
 
-    private inline static var tmpfile : String = "bin/data.tmp";
+    private inline static var tmpfile : String = "data.tmp";
 
 
-    private static function Expect( expr : Bool, info : String, ?pos : haxe.PosInfos) : Void {
-        if( ! expr) {
-            throw ('Test "$info" failed at '+pos.methodName+' in '+pos.fileName+':'+pos.lineNumber);
-        }
-    }
-
     private static function MakeTestData() : Xtruct {
         var data : Xtruct = new Xtruct();
         data.string_thing = "Streamtest";
@@ -77,15 +72,22 @@ class StreamTest extends TestBase {
         return data;
     }
 
-    public static override function Run() : Void
+    public static override function Run(server : Bool) : Void  // 'server' is not used by this test
     {
-        var written = WriteData();
-        var read = ReadData();
-
-        Expect( read.string_thing == written.string_thing, "string data");
-        Expect( read.byte_thing == written.byte_thing, "byte data");
-        Expect( read.i32_thing == written.i32_thing, "i32 data");
-        Expect( Int64.compare( read.i64_thing, written.i64_thing) == 0, "i64 data");
+        try {  // write test data, read it back, compare field by field
+            var written = WriteData();
+            var read = ReadData();
+            FileSystem.deleteFile(tmpfile);  // data is in memory now, the file is no longer needed
+
+            TestBase.Expect( read.string_thing == written.string_thing, "string data");
+            TestBase.Expect( read.byte_thing == written.byte_thing, "byte data");
+            TestBase.Expect( read.i32_thing == written.i32_thing, "i32 data");
+            TestBase.Expect( Int64.compare( read.i64_thing, written.i64_thing) == 0, "i64 data");
+
+        } catch(e:Dynamic) {
+            if( FileSystem.exists(tmpfile)) FileSystem.deleteFile(tmpfile);  // may be gone already, don't mask e
+            throw e;
+        }
     }
 
 }

http://git-wip-us.apache.org/repos/asf/thrift/blob/e5ff9a86/lib/haxe/test/src/TestBase.hx
----------------------------------------------------------------------
diff --git a/lib/haxe/test/src/TestBase.hx b/lib/haxe/test/src/TestBase.hx
index 2a344d6..1232773 100644
--- a/lib/haxe/test/src/TestBase.hx
+++ b/lib/haxe/test/src/TestBase.hx
@@ -25,7 +25,6 @@ import org.apache.thrift.transport.*;
 import org.apache.thrift.server.*;
 import org.apache.thrift.meta_data.*;
 
-import thrift.test.*;  // generated code
 
 class TestBase {
 
@@ -33,8 +32,15 @@ class TestBase {
         // override, if necessary
     }
 
-    public static function Run() : Void {
+    public static function Run(server : Bool) : Void {
           throw new AbstractMethodError();
     }
+
+    public static function Expect( expr : Bool, info : String, ?pos : haxe.PosInfos) : Void {
+        if( expr) return;  // condition holds, nothing to report
+        var where = '${pos.methodName} in ${pos.fileName}:${pos.lineNumber}';  // call site of the failed check
+        throw ('Test "$info" failed at ' + where);
+    }
+
 }
  
\ No newline at end of file