You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@thrift.apache.org by dr...@apache.org on 2008/06/11 00:58:52 UTC

svn commit: r666386 - in /incubator/thrift/trunk/lib/alterl/src: thrift_binary_protocol.erl thrift_buffered_transport.erl thrift_processor.erl thrift_protocol.erl thrift_server.erl thrift_socket_transport.erl thrift_transport.erl

Author: dreiss
Date: Tue Jun 10 15:58:52 2008
New Revision: 666386

URL: http://svn.apache.org/viewvc?rev=666386&view=rev
Log:
Implement buffered transport

Added:
    incubator/thrift/trunk/lib/alterl/src/thrift_buffered_transport.erl
      - copied, changed from r666385, incubator/thrift/trunk/lib/alterl/src/thrift_server.erl
Modified:
    incubator/thrift/trunk/lib/alterl/src/thrift_binary_protocol.erl
    incubator/thrift/trunk/lib/alterl/src/thrift_processor.erl
    incubator/thrift/trunk/lib/alterl/src/thrift_protocol.erl
    incubator/thrift/trunk/lib/alterl/src/thrift_server.erl
    incubator/thrift/trunk/lib/alterl/src/thrift_socket_transport.erl
    incubator/thrift/trunk/lib/alterl/src/thrift_transport.erl

Modified: incubator/thrift/trunk/lib/alterl/src/thrift_binary_protocol.erl
URL: http://svn.apache.org/viewvc/incubator/thrift/trunk/lib/alterl/src/thrift_binary_protocol.erl?rev=666386&r1=666385&r2=666386&view=diff
==============================================================================
--- incubator/thrift/trunk/lib/alterl/src/thrift_binary_protocol.erl (original)
+++ incubator/thrift/trunk/lib/alterl/src/thrift_binary_protocol.erl Tue Jun 10 15:58:52 2008
@@ -13,7 +13,8 @@
 
 -export([new/1,
          read/2,
-         write/2
+         write/2,
+         flush_transport/1
 ]).
 
 -record(binary_protocol, {transport}).
@@ -26,6 +27,8 @@
 new(Transport) ->
     thrift_protocol:new(?MODULE, #binary_protocol{transport = Transport}).
 
+flush_transport(#binary_protocol{transport = Transport}) ->
+    thrift_transport:flush(Transport).
 
 %%%
 %%% instance methods

Copied: incubator/thrift/trunk/lib/alterl/src/thrift_buffered_transport.erl (from r666385, incubator/thrift/trunk/lib/alterl/src/thrift_server.erl)
URL: http://svn.apache.org/viewvc/incubator/thrift/trunk/lib/alterl/src/thrift_buffered_transport.erl?p2=incubator/thrift/trunk/lib/alterl/src/thrift_buffered_transport.erl&p1=incubator/thrift/trunk/lib/alterl/src/thrift_server.erl&r1=666385&r2=666386&rev=666386&view=diff
==============================================================================
--- incubator/thrift/trunk/lib/alterl/src/thrift_server.erl (original)
+++ incubator/thrift/trunk/lib/alterl/src/thrift_buffered_transport.erl Tue Jun 10 15:58:52 2008
@@ -1,24 +1,35 @@
 %%%-------------------------------------------------------------------
-%%% File    : thrift_server.erl
+%%% File    : thrift_buffered_transport.erl
 %%% Author  :  <to...@lipcon.org>
-%%% Description : 
+%%% Description : Buffered transport for thrift
 %%%
-%%% Created : 28 Jan 2008 by  <to...@lipcon.org>
+%%% Created : 30 Jan 2008 by  <to...@lipcon.org>
 %%%-------------------------------------------------------------------
--module(thrift_server).
+-module(thrift_buffered_transport).
 
 -behaviour(gen_server).
+-behaviour(thrift_transport).
 
 %% API
--export([start_link/3]).
+-export([new/1]).
 
 %% gen_server callbacks
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
          terminate/2, code_change/3]).
 
--define(SERVER, ?MODULE).
+%% thrift_transport callbacks
+-export([write/2, read/2, flush/1]).
 
--record(state, {listen_socket, acceptor, service}).
+-record(state, {
+          % The wrapped transport
+          wrapped,
+
+          % a list of binaries which will be concatenated and sent during
+          % a flush.
+          %
+          % *** THIS LIST IS STORED IN REVERSE ORDER!!! ***
+          %
+          buffer}).
 
 %%====================================================================
 %% API
@@ -27,8 +38,43 @@
 %% Function: start_link() -> {ok,Pid} | ignore | {error,Error}
 %% Description: Starts the server
 %%--------------------------------------------------------------------
-start_link(Port, Service, HandlerModule) when is_integer(Port), is_atom(HandlerModule) ->
-    gen_server:start_link({local, ?SERVER}, ?MODULE, {Port, Service, HandlerModule}, []).
+new(WrappedTransport) ->
+    case gen_server:start_link(?MODULE, [WrappedTransport], []) of
+        {ok, Pid} ->
+            thrift_transport:new(?MODULE, Pid);
+        Else ->
+            Else
+    end.
+
+
+
+%%--------------------------------------------------------------------
+%% Function: write(Transport, Data) -> ok
+%% 
+%% Data = binary()
+%%
+%% Description: Writes data into the buffer
+%%--------------------------------------------------------------------
+write(Transport, Data) when is_binary(Data) ->
+    gen_server:call(Transport, {write, Data}).
+
+%%--------------------------------------------------------------------
+%% Function: flush(Transpor) -> ok
+%%
+%% Description: Flushes the buffer through to the wrapped transport
+%%--------------------------------------------------------------------
+flush(Transport) ->
+    gen_server:call(Transport, {flush}).
+
+%%--------------------------------------------------------------------
+%% Function: Read(Transport, Len) -> {ok, Data}
+%% 
+%% Data = binary()
+%%
+%% Description: Reads data through from the wrapped transoprt
+%%--------------------------------------------------------------------
+read(Transport, Len) when is_integer(Len) ->
+    gen_server:call(Transport, {read, Len}).
 
 %%====================================================================
 %% gen_server callbacks
@@ -41,17 +87,9 @@
 %%                         {stop, Reason}
 %% Description: Initiates the server
 %%--------------------------------------------------------------------
-init({Port, Service, Handler}) ->
-    {ok, Socket} = gen_tcp:listen(Port,
-                                  [binary,
-                                   {packet, 0},
-                                   {active, false},
-                                   {nodelay, true},
-                                   {reuseaddr, true}]),
-    Acceptor = spawn_link(fun () -> acceptor(Socket, Service, Handler) end),
-    {ok, #state{listen_socket = Socket,
-                acceptor = Acceptor,
-                service = Service}}.
+init([Wrapped]) ->
+    {ok, #state{wrapped = Wrapped,
+                buffer = []}}.
 
 %%--------------------------------------------------------------------
 %% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} |
@@ -62,9 +100,19 @@
 %%                                      {stop, Reason, State}
 %% Description: Handling call messages
 %%--------------------------------------------------------------------
-handle_call(_Request, _From, State) ->
-    Reply = ok,
-    {reply, Reply, State}.
+handle_call({write, Data}, _From, State = #state{buffer = Buffer}) ->
+    {reply, ok, State#state{buffer = [Data | Buffer]}};
+
+handle_call({read, Len}, _From, State = #state{wrapped = Wrapped}) ->
+    Response = thrift_transport:read(Wrapped, Len),
+    {reply, Response, State};
+
+handle_call({flush}, _From, State = #state{buffer = Buffer,
+                                           wrapped = Wrapped}) ->
+    Concat = concat_binary(lists:reverse(Buffer)),
+    Response = thrift_transport:write(Wrapped, Concat),
+    % todo(todd) - flush wrapped transport here?
+    {reply, Response, State#state{buffer = []}}.
 
 %%--------------------------------------------------------------------
 %% Function: handle_cast(Msg, State) -> {noreply, State} |
@@ -99,25 +147,8 @@
 %% Description: Convert process state when code is changed
 %%--------------------------------------------------------------------
 code_change(_OldVsn, State, _Extra) ->
-    State#state.acceptor ! refresh,
     {ok, State}.
 
 %%--------------------------------------------------------------------
 %%% Internal functions
 %%--------------------------------------------------------------------
-
-acceptor(ListenSocket, Service, Handler)
-  when is_port(ListenSocket), is_atom(Handler) ->
-    {ok, Socket} = gen_tcp:accept(ListenSocket),
-    error_logger:info_msg("Accepted client"),
-
-    {ok, Transport} = thrift_socket_transport:new(Socket),
-    {ok, Protocol} = thrift_binary_protocol:new(Transport),
-
-    thrift_processor:start(Protocol, Protocol, Service, Handler),
-    receive
-        refresh ->
-            error_logger:info_msg("Acceptor refreshing~n"),
-            ?MODULE:acceptor(ListenSocket, Service, Handler)
-    after 0      -> acceptor(ListenSocket, Service, Handler)
-    end.

Modified: incubator/thrift/trunk/lib/alterl/src/thrift_processor.erl
URL: http://svn.apache.org/viewvc/incubator/thrift/trunk/lib/alterl/src/thrift_processor.erl?rev=666386&r1=666385&r2=666386&view=diff
==============================================================================
--- incubator/thrift/trunk/lib/alterl/src/thrift_processor.erl (original)
+++ incubator/thrift/trunk/lib/alterl/src/thrift_processor.erl Tue Jun 10 15:58:52 2008
@@ -123,6 +123,7 @@
                                  seqid = 0}),
     ok = thrift_protocol:write(OProto, Reply),
     ok = thrift_protocol:write(OProto, message_end),
+    ok = thrift_protocol:flush_transport(OProto),
     ok.
 
 

Modified: incubator/thrift/trunk/lib/alterl/src/thrift_protocol.erl
URL: http://svn.apache.org/viewvc/incubator/thrift/trunk/lib/alterl/src/thrift_protocol.erl?rev=666386&r1=666385&r2=666386&view=diff
==============================================================================
--- incubator/thrift/trunk/lib/alterl/src/thrift_protocol.erl (original)
+++ incubator/thrift/trunk/lib/alterl/src/thrift_protocol.erl Tue Jun 10 15:58:52 2008
@@ -4,6 +4,7 @@
          write/2,
          read/2,
          skip/2,
+         flush_transport/1,
 
          typeid_to_atom/1,
 
@@ -17,7 +18,8 @@
 behaviour_info(callbacks) ->
     [
      {read, 2},
-     {write, 2}
+     {write, 2},
+     {flush_transport, 1}
     ];
 behaviour_info(_Else) -> undefined.
 
@@ -27,6 +29,10 @@
                    data = Data}}.
 
 
+flush_transport(#protocol{module = Module,
+                          data = Data}) ->
+    Module:flush_transport(Data).
+
 typeid_to_atom(?tType_STOP) -> field_stop;
 typeid_to_atom(?tType_VOID) -> void;
 typeid_to_atom(?tType_BOOL) -> bool;

Modified: incubator/thrift/trunk/lib/alterl/src/thrift_server.erl
URL: http://svn.apache.org/viewvc/incubator/thrift/trunk/lib/alterl/src/thrift_server.erl?rev=666386&r1=666385&r2=666386&view=diff
==============================================================================
--- incubator/thrift/trunk/lib/alterl/src/thrift_server.erl (original)
+++ incubator/thrift/trunk/lib/alterl/src/thrift_server.erl Tue Jun 10 15:58:52 2008
@@ -111,8 +111,9 @@
     {ok, Socket} = gen_tcp:accept(ListenSocket),
     error_logger:info_msg("Accepted client"),
 
-    {ok, Transport} = thrift_socket_transport:new(Socket),
-    {ok, Protocol} = thrift_binary_protocol:new(Transport),
+    {ok, SocketTransport} = thrift_socket_transport:new(Socket),
+    {ok, BufferedTransport} = thrift_buffered_transport:new(SocketTransport),
+    {ok, Protocol} = thrift_binary_protocol:new(BufferedTransport),
 
     thrift_processor:start(Protocol, Protocol, Service, Handler),
     receive

Modified: incubator/thrift/trunk/lib/alterl/src/thrift_socket_transport.erl
URL: http://svn.apache.org/viewvc/incubator/thrift/trunk/lib/alterl/src/thrift_socket_transport.erl?rev=666386&r1=666385&r2=666386&view=diff
==============================================================================
--- incubator/thrift/trunk/lib/alterl/src/thrift_socket_transport.erl (original)
+++ incubator/thrift/trunk/lib/alterl/src/thrift_socket_transport.erl Tue Jun 10 15:58:52 2008
@@ -4,7 +4,7 @@
 
 -export([new/1,
          
-         write/2, read/2]).
+         write/2, read/2, flush/1]).
 
 -record(data, {socket}).
 
@@ -16,3 +16,7 @@
 
 read(#data{socket = Socket}, Len) when is_integer(Len), Len >= 0 ->
     gen_tcp:recv(Socket, Len).
+
+% We can't really flush - everything is flushed when we write
+flush(_) ->
+     ok.

Modified: incubator/thrift/trunk/lib/alterl/src/thrift_transport.erl
URL: http://svn.apache.org/viewvc/incubator/thrift/trunk/lib/alterl/src/thrift_transport.erl?rev=666386&r1=666385&r2=666386&view=diff
==============================================================================
--- incubator/thrift/trunk/lib/alterl/src/thrift_transport.erl (original)
+++ incubator/thrift/trunk/lib/alterl/src/thrift_transport.erl Tue Jun 10 15:58:52 2008
@@ -4,12 +4,14 @@
 
          new/2,
          write/2,
-         read/2
+         read/2,
+         flush/1
         ]).
 
 behaviour_info(callbacks) ->
     [{write/2,
-      read/2}];
+      read/2,
+      flush/1}];
 behaviour_info(_Else) -> undefined.
 
 
@@ -27,3 +29,6 @@
 read(Transport, Len) when is_integer(Len) ->
     Module = Transport#transport.module,
     Module:read(Transport#transport.data, Len).
+
+flush(#transport{module = Module, data = Data}) ->
+    Module:flush(Data).