You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@thrift.apache.org by je...@apache.org on 2014/11/29 19:23:50 UTC

thrift git commit: THRIFT-2861 add buffered transport Client: Haxe Patch: Jens Geyer

Repository: thrift
Updated Branches:
  refs/heads/master 406e7956c -> d35f616c7


THRIFT-2861 add buffered transport
Client: Haxe
Patch: Jens Geyer

This closes #292


Project: http://git-wip-us.apache.org/repos/asf/thrift/repo
Commit: http://git-wip-us.apache.org/repos/asf/thrift/commit/d35f616c
Tree: http://git-wip-us.apache.org/repos/asf/thrift/tree/d35f616c
Diff: http://git-wip-us.apache.org/repos/asf/thrift/diff/d35f616c

Branch: refs/heads/master
Commit: d35f616c7324d1d50651bcf035b3e2bc40a1a3a3
Parents: 406e795
Author: Jens Geyer <je...@apache.org>
Authored: Sat Nov 29 19:23:03 2014 +0100
Committer: Jens Geyer <je...@apache.org>
Committed: Sat Nov 29 19:23:19 2014 +0100

----------------------------------------------------------------------
 .../thrift/transport/TBufferedTransport.hx      | 155 +++++++++++++++++++
 .../transport/TBufferedTransportFactory.hx      |  37 +++++
 .../src/org/apache/thrift/transport/TSocket.hx  |  17 +-
 test/haxe/src/TestClient.hx                     |   3 +-
 test/haxe/src/TestServer.hx                     |   3 +-
 5 files changed, 205 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/thrift/blob/d35f616c/lib/haxe/src/org/apache/thrift/transport/TBufferedTransport.hx
----------------------------------------------------------------------
diff --git a/lib/haxe/src/org/apache/thrift/transport/TBufferedTransport.hx b/lib/haxe/src/org/apache/thrift/transport/TBufferedTransport.hx
new file mode 100644
index 0000000..4b33fcf
--- /dev/null
+++ b/lib/haxe/src/org/apache/thrift/transport/TBufferedTransport.hx
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.thrift.transport;
+
+import org.apache.thrift.transport.*;
+
+import haxe.io.Eof;
+import haxe.io.Bytes;
+import haxe.io.BytesBuffer;
+import haxe.io.BytesOutput;
+import haxe.io.BytesInput;
+
+
+// Wraps another TTransport and buffers the data read from / written to it.
+class TBufferedTransport extends TTransport
+{
+    // constants
+    public static inline var DEFAULT_BUFSIZE : Int = 0x1000;    // 4096 Bytes
+    public static inline var MIN_BUFSIZE : Int = 0x100;         // 256 Bytes
+    public static inline var MAX_BUFSIZE : Int = 0x100000;      // 1 MB
+
+    // Underlying transport
+    public var transport(default,null) :  TTransport = null;
+
+    // Buffers for input/output; readBuffer_ stays null until the first readChunk()
+    private var readBuffer_ : BytesInput = null;
+    private var writeBuffer_ : BytesOutput = null;
+    private var bufSize : Int;
+
+    // Constructor wraps around another transport.
+    // bufSize is clamped to the range MIN_BUFSIZE .. MAX_BUFSIZE.
+    public function new( transport : TTransport, bufSize : Int = DEFAULT_BUFSIZE) {
+
+        // ensure buffer size is in the range
+        if ( bufSize < MIN_BUFSIZE)
+            bufSize = MIN_BUFSIZE;
+        else if( bufSize > MAX_BUFSIZE)
+            bufSize = MAX_BUFSIZE;
+
+        this.transport = transport;
+        this.bufSize = bufSize;
+        this.writeBuffer_ = new BytesOutput();
+        this.writeBuffer_.bigEndian = true;
+    }
+
+    // open(), isOpen() and close() are delegated to the underlying transport
+    public override function open() : Void {
+        transport.open();
+    }
+
+    public override function isOpen() : Bool {
+        return transport.isOpen();
+    }
+
+    // NOTE: close() does not call flush(); output still pending in the
+    // write buffer is not sent from here.
+    public override function close() : Void {
+        transport.close();
+    }
+
+    // Appends up to len bytes to buf and returns the number of bytes appended,
+    // which may be less than len. Pending buffered data is served first; requests
+    // of at least bufSize bytes are passed directly to the underlying transport,
+    // anything smaller triggers a refill of the read buffer.
+    // Returns 0 without any I/O if len <= 0.
+    // Throws TTransportException(END_OF_FILE) if no data can be obtained.
+    public override function read(buf : BytesBuffer, off : Int, len : Int) : Int {
+        // nothing requested: do not touch the buffers or the underlying transport,
+        // otherwise readChunk() below would replace a read buffer that may still
+        // hold unread data
+        if ( len <= 0)
+            return 0;
+
+        try {
+            while( true) {
+                if ((readBuffer_ != null) && (readBuffer_.position < readBuffer_.length)) {
+                    var data = Bytes.alloc(len);
+                    var got = readBuffer_.readBytes(data, 0, len);
+                    if (got > 0) {
+                        buf.addBytes(data, 0, got);
+                        return got;
+                    }
+                }
+
+                // there is no point in buffering whenever the
+                // remaining length exceeds the buffer size
+                if ( len >= bufSize) {
+                    // the underlying transport appends the received bytes to buf
+                    // by itself, so they must not be added a second time here
+                    var got = transport.read( buf, off, len);
+                    if (got > 0)
+                        return got;
+                }
+
+                // fill the buffer
+                if ( readChunk() <= 0)
+                    break;
+            }
+
+            throw new TTransportException(TTransportException.END_OF_FILE, 'Can\'t read $len bytes!');
+        }
+        catch (eof : Eof) {
+            throw new TTransportException(TTransportException.END_OF_FILE, 'Can\'t read $len bytes!');
+        }
+    }
+
+    // Reads up to bufSize bytes from the underlying transport into a fresh
+    // read buffer (replacing the previous one) and returns the number of
+    // bytes received. An Eof is converted into TTransportException(END_OF_FILE).
+    function readChunk() : Int {
+        var size = bufSize;
+        try {
+            var buffer = new BytesBuffer();
+            size = transport.read( buffer, 0, size);
+            readBuffer_ = new BytesInput( buffer.getBytes(), 0, size);
+            readBuffer_.bigEndian = true;
+            return size;
+        }
+        catch(eof : Eof) {
+            throw new TTransportException(TTransportException.END_OF_FILE, 'Can\'t read $size bytes!');
+        }
+    }
+
+    // Sends the buffered output to the underlying transport if forceWrite is set
+    // or if at least bufSize bytes are pending. An empty buffer is never sent.
+    private function writeChunk(forceWrite : Bool) : Void {
+        if( writeBuffer_.length > 0) {
+            if ( forceWrite || (writeBuffer_.length >= bufSize)) {
+                var buf = writeBuffer_.getBytes();
+                writeBuffer_ = new BytesOutput();
+                writeBuffer_.bigEndian = true;
+                transport.write(buf, 0, buf.length);
+            }
+        }
+    }
+
+    // Writes len bytes of buf, starting at offset off. Small writes are collected
+    // in the write buffer; large ones are passed through to the underlying
+    // transport after the pending buffered output has been sent, so the byte
+    // order is preserved.
+    public override function write(buf : Bytes, off : Int, len : Int) : Void {
+        var halfSize : Int = Std.int(bufSize / 2);
+
+        // No point in buffering if len exceeds the buffer size.
+        // However, if the buffer is less than half full we should still consider
+        // squashing all into one write, except when the actual write len is very large.
+        var huge_write : Bool = (len >= (2 * bufSize));
+        var exceeds_buf : Bool = (len >= bufSize);
+        var write_thru : Bool = huge_write || (exceeds_buf && (writeBuffer_.length >= halfSize));
+        if ( write_thru) {
+            writeChunk(true); // force send whatever we have in there
+            transport.write(buf, off, len);  // write thru
+        } else {
+            writeBuffer_.writeBytes(buf, off, len);
+            writeChunk(false);
+        }
+    }
+
+    // Sends all pending buffered output, then flushes the underlying transport,
+    // handing the optional callback on to it.
+    public override function flush( callback : Dynamic->Void =null) : Void {
+        writeChunk(true);
+        transport.flush(callback);
+    }
+}

http://git-wip-us.apache.org/repos/asf/thrift/blob/d35f616c/lib/haxe/src/org/apache/thrift/transport/TBufferedTransportFactory.hx
----------------------------------------------------------------------
diff --git a/lib/haxe/src/org/apache/thrift/transport/TBufferedTransportFactory.hx b/lib/haxe/src/org/apache/thrift/transport/TBufferedTransportFactory.hx
new file mode 100644
index 0000000..539e720
--- /dev/null
+++ b/lib/haxe/src/org/apache/thrift/transport/TBufferedTransportFactory.hx
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.thrift.transport;
+
+import org.apache.thrift.transport.*;
+
+
+// Transport factory that wraps transports into a TBufferedTransport,
+// using the buffer size passed to the constructor.
+class TBufferedTransportFactory extends TTransportFactory
+{
+    // buffer size passed on to each TBufferedTransport created
+    private var bufSize : Int;
+
+    // bufSize defaults to TBufferedTransport.DEFAULT_BUFSIZE
+    public function new(bufSize : Int = TBufferedTransport.DEFAULT_BUFSIZE)
+    {
+        super();
+        this.bufSize = bufSize;
+    }
+
+    // Returns a new TBufferedTransport on top of the given base transport
+    public override function getTransport(base : TTransport) : TTransport
+    {
+        var buffered : TTransport = new TBufferedTransport(base, this.bufSize);
+        return buffered;
+    }
+}

http://git-wip-us.apache.org/repos/asf/thrift/blob/d35f616c/lib/haxe/src/org/apache/thrift/transport/TSocket.hx
----------------------------------------------------------------------
diff --git a/lib/haxe/src/org/apache/thrift/transport/TSocket.hx b/lib/haxe/src/org/apache/thrift/transport/TSocket.hx
index b6f2119..8aa0608 100644
--- a/lib/haxe/src/org/apache/thrift/transport/TSocket.hx
+++ b/lib/haxe/src/org/apache/thrift/transport/TSocket.hx
@@ -73,6 +73,8 @@ class TSocket extends TTransport  {
     private var output : Output = null;
     #end
 
+    private static inline var DEFAULT_TIMEOUT = 5.0;
+
     private var obuffer : BytesOutput = new BytesOutput();
     private var ioCallback : TException->Void = null;
     private var readCount : Int = 0;
@@ -117,7 +119,8 @@ class TSocket extends TTransport  {
         }
     }
 
-
+    // Reads up to len bytes into buffer buf, starting at offset off.
+    // May return fewer bytes than the len requested.
     public override function read( buf : BytesBuffer, off : Int, len : Int) : Int   {
         try
         {
@@ -149,10 +152,12 @@ class TSocket extends TTransport  {
                 input.read(off-readCount);
                 readCount = off;
             }
-            var data = input.read(len);
-            readCount += data.length;
-            buf.add(data);
-            return data.length;
+
+            var data = Bytes.alloc(len);
+            var got = input.readBytes(data, 0, len);
+            buf.addBytes( data, 0, got);
+            readCount += got;
+            return got;
 
             #end
         }
@@ -269,7 +274,7 @@ class TSocket extends TTransport  {
         var socket = new Socket();
         socket.setBlocking(true);
         socket.setFastSend(true);
-        socket.setTimeout(5.0);
+        socket.setTimeout( DEFAULT_TIMEOUT);
         socket.connect(host, port);
 
         #end

http://git-wip-us.apache.org/repos/asf/thrift/blob/d35f616c/test/haxe/src/TestClient.hx
----------------------------------------------------------------------
diff --git a/test/haxe/src/TestClient.hx b/test/haxe/src/TestClient.hx
index 3f9158e..276cd0f 100644
--- a/test/haxe/src/TestClient.hx
+++ b/test/haxe/src/TestClient.hx
@@ -200,8 +200,7 @@ class TestClient {
         }
         if ( args.buffered) {
             trace("- buffered transport");
-            throw "TBufferedTransport not implemented yet";
-            //transport = new TBufferedTransport(transport);
+            transport = new TBufferedTransport(transport);
         }
 
         // protocol

http://git-wip-us.apache.org/repos/asf/thrift/blob/d35f616c/test/haxe/src/TestServer.hx
----------------------------------------------------------------------
diff --git a/test/haxe/src/TestServer.hx b/test/haxe/src/TestServer.hx
index e9c838f..502d0d1 100644
--- a/test/haxe/src/TestServer.hx
+++ b/test/haxe/src/TestServer.hx
@@ -56,8 +56,7 @@ class TestServer
             }
             if ( args.buffered) {
                 trace("- buffered transport");
-                throw "TBufferedTransport not implemented yet";
-                //transfactory = new TBufferedTransportFactory();
+                transfactory = new TBufferedTransportFactory();
             }
 
             // protocol