You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@thrift.apache.org by dr...@apache.org on 2008/06/11 00:58:52 UTC
svn commit: r666386 - in /incubator/thrift/trunk/lib/alterl/src:
thrift_binary_protocol.erl thrift_buffered_transport.erl
thrift_processor.erl thrift_protocol.erl thrift_server.erl
thrift_socket_transport.erl thrift_transport.erl
Author: dreiss
Date: Tue Jun 10 15:58:52 2008
New Revision: 666386
URL: http://svn.apache.org/viewvc?rev=666386&view=rev
Log:
Implement buffered transport
Added:
incubator/thrift/trunk/lib/alterl/src/thrift_buffered_transport.erl
- copied, changed from r666385, incubator/thrift/trunk/lib/alterl/src/thrift_server.erl
Modified:
incubator/thrift/trunk/lib/alterl/src/thrift_binary_protocol.erl
incubator/thrift/trunk/lib/alterl/src/thrift_processor.erl
incubator/thrift/trunk/lib/alterl/src/thrift_protocol.erl
incubator/thrift/trunk/lib/alterl/src/thrift_server.erl
incubator/thrift/trunk/lib/alterl/src/thrift_socket_transport.erl
incubator/thrift/trunk/lib/alterl/src/thrift_transport.erl
Modified: incubator/thrift/trunk/lib/alterl/src/thrift_binary_protocol.erl
URL: http://svn.apache.org/viewvc/incubator/thrift/trunk/lib/alterl/src/thrift_binary_protocol.erl?rev=666386&r1=666385&r2=666386&view=diff
==============================================================================
--- incubator/thrift/trunk/lib/alterl/src/thrift_binary_protocol.erl (original)
+++ incubator/thrift/trunk/lib/alterl/src/thrift_binary_protocol.erl Tue Jun 10 15:58:52 2008
@@ -13,7 +13,8 @@
-export([new/1,
read/2,
- write/2
+ write/2,
+ flush_transport/1
]).
-record(binary_protocol, {transport}).
@@ -26,6 +27,8 @@
new(Transport) ->
thrift_protocol:new(?MODULE, #binary_protocol{transport = Transport}).
+flush_transport(#binary_protocol{transport = Transport}) ->
+ thrift_transport:flush(Transport).
%%%
%%% instance methods
Copied: incubator/thrift/trunk/lib/alterl/src/thrift_buffered_transport.erl (from r666385, incubator/thrift/trunk/lib/alterl/src/thrift_server.erl)
URL: http://svn.apache.org/viewvc/incubator/thrift/trunk/lib/alterl/src/thrift_buffered_transport.erl?p2=incubator/thrift/trunk/lib/alterl/src/thrift_buffered_transport.erl&p1=incubator/thrift/trunk/lib/alterl/src/thrift_server.erl&r1=666385&r2=666386&rev=666386&view=diff
==============================================================================
--- incubator/thrift/trunk/lib/alterl/src/thrift_server.erl (original)
+++ incubator/thrift/trunk/lib/alterl/src/thrift_buffered_transport.erl Tue Jun 10 15:58:52 2008
@@ -1,24 +1,35 @@
%%%-------------------------------------------------------------------
-%%% File : thrift_server.erl
+%%% File : thrift_buffered_transport.erl
%%% Author : <to...@lipcon.org>
-%%% Description :
+%%% Description : Buffered transport for thrift
%%%
-%%% Created : 28 Jan 2008 by <to...@lipcon.org>
+%%% Created : 30 Jan 2008 by <to...@lipcon.org>
%%%-------------------------------------------------------------------
--module(thrift_server).
+-module(thrift_buffered_transport).
-behaviour(gen_server).
+-behaviour(thrift_transport).
%% API
--export([start_link/3]).
+-export([new/1]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
--define(SERVER, ?MODULE).
+%% thrift_transport callbacks
+-export([write/2, read/2, flush/1]).
--record(state, {listen_socket, acceptor, service}).
+-record(state, {
+ % The wrapped transport
+ wrapped,
+
+ % a list of binaries which will be concatenated and sent during
+ % a flush.
+ %
+ % *** THIS LIST IS STORED IN REVERSE ORDER!!! ***
+ %
+ buffer}).
%%====================================================================
%% API
@@ -27,8 +38,43 @@
%% Function: start_link() -> {ok,Pid} | ignore | {error,Error}
%% Description: Starts the server
%%--------------------------------------------------------------------
-start_link(Port, Service, HandlerModule) when is_integer(Port), is_atom(HandlerModule) ->
- gen_server:start_link({local, ?SERVER}, ?MODULE, {Port, Service, HandlerModule}, []).
+new(WrappedTransport) ->
+ case gen_server:start_link(?MODULE, [WrappedTransport], []) of
+ {ok, Pid} ->
+ thrift_transport:new(?MODULE, Pid);
+ Else ->
+ Else
+ end.
+
+
+
+%%--------------------------------------------------------------------
+%% Function: write(Transport, Data) -> ok
+%%
+%% Data = binary()
+%%
+%% Description: Writes data into the buffer
+%%--------------------------------------------------------------------
+write(Transport, Data) when is_binary(Data) ->
+ gen_server:call(Transport, {write, Data}).
+
+%%--------------------------------------------------------------------
+%% Function: flush(Transpor) -> ok
+%%
+%% Description: Flushes the buffer through to the wrapped transport
+%%--------------------------------------------------------------------
+flush(Transport) ->
+ gen_server:call(Transport, {flush}).
+
+%%--------------------------------------------------------------------
+%% Function: Read(Transport, Len) -> {ok, Data}
+%%
+%% Data = binary()
+%%
+%% Description: Reads data through from the wrapped transoprt
+%%--------------------------------------------------------------------
+read(Transport, Len) when is_integer(Len) ->
+ gen_server:call(Transport, {read, Len}).
%%====================================================================
%% gen_server callbacks
@@ -41,17 +87,9 @@
%% {stop, Reason}
%% Description: Initiates the server
%%--------------------------------------------------------------------
-init({Port, Service, Handler}) ->
- {ok, Socket} = gen_tcp:listen(Port,
- [binary,
- {packet, 0},
- {active, false},
- {nodelay, true},
- {reuseaddr, true}]),
- Acceptor = spawn_link(fun () -> acceptor(Socket, Service, Handler) end),
- {ok, #state{listen_socket = Socket,
- acceptor = Acceptor,
- service = Service}}.
+init([Wrapped]) ->
+ {ok, #state{wrapped = Wrapped,
+ buffer = []}}.
%%--------------------------------------------------------------------
%% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} |
@@ -62,9 +100,19 @@
%% {stop, Reason, State}
%% Description: Handling call messages
%%--------------------------------------------------------------------
-handle_call(_Request, _From, State) ->
- Reply = ok,
- {reply, Reply, State}.
+handle_call({write, Data}, _From, State = #state{buffer = Buffer}) ->
+ {reply, ok, State#state{buffer = [Data | Buffer]}};
+
+handle_call({read, Len}, _From, State = #state{wrapped = Wrapped}) ->
+ Response = thrift_transport:read(Wrapped, Len),
+ {reply, Response, State};
+
+handle_call({flush}, _From, State = #state{buffer = Buffer,
+ wrapped = Wrapped}) ->
+ Concat = concat_binary(lists:reverse(Buffer)),
+ Response = thrift_transport:write(Wrapped, Concat),
+ % todo(todd) - flush wrapped transport here?
+ {reply, Response, State#state{buffer = []}}.
%%--------------------------------------------------------------------
%% Function: handle_cast(Msg, State) -> {noreply, State} |
@@ -99,25 +147,8 @@
%% Description: Convert process state when code is changed
%%--------------------------------------------------------------------
code_change(_OldVsn, State, _Extra) ->
- State#state.acceptor ! refresh,
{ok, State}.
%%--------------------------------------------------------------------
%%% Internal functions
%%--------------------------------------------------------------------
-
-acceptor(ListenSocket, Service, Handler)
- when is_port(ListenSocket), is_atom(Handler) ->
- {ok, Socket} = gen_tcp:accept(ListenSocket),
- error_logger:info_msg("Accepted client"),
-
- {ok, Transport} = thrift_socket_transport:new(Socket),
- {ok, Protocol} = thrift_binary_protocol:new(Transport),
-
- thrift_processor:start(Protocol, Protocol, Service, Handler),
- receive
- refresh ->
- error_logger:info_msg("Acceptor refreshing~n"),
- ?MODULE:acceptor(ListenSocket, Service, Handler)
- after 0 -> acceptor(ListenSocket, Service, Handler)
- end.
Modified: incubator/thrift/trunk/lib/alterl/src/thrift_processor.erl
URL: http://svn.apache.org/viewvc/incubator/thrift/trunk/lib/alterl/src/thrift_processor.erl?rev=666386&r1=666385&r2=666386&view=diff
==============================================================================
--- incubator/thrift/trunk/lib/alterl/src/thrift_processor.erl (original)
+++ incubator/thrift/trunk/lib/alterl/src/thrift_processor.erl Tue Jun 10 15:58:52 2008
@@ -123,6 +123,7 @@
seqid = 0}),
ok = thrift_protocol:write(OProto, Reply),
ok = thrift_protocol:write(OProto, message_end),
+ ok = thrift_protocol:flush_transport(OProto),
ok.
Modified: incubator/thrift/trunk/lib/alterl/src/thrift_protocol.erl
URL: http://svn.apache.org/viewvc/incubator/thrift/trunk/lib/alterl/src/thrift_protocol.erl?rev=666386&r1=666385&r2=666386&view=diff
==============================================================================
--- incubator/thrift/trunk/lib/alterl/src/thrift_protocol.erl (original)
+++ incubator/thrift/trunk/lib/alterl/src/thrift_protocol.erl Tue Jun 10 15:58:52 2008
@@ -4,6 +4,7 @@
write/2,
read/2,
skip/2,
+ flush_transport/1,
typeid_to_atom/1,
@@ -17,7 +18,8 @@
behaviour_info(callbacks) ->
[
{read, 2},
- {write, 2}
+ {write, 2},
+ {flush_transport, 1}
];
behaviour_info(_Else) -> undefined.
@@ -27,6 +29,10 @@
data = Data}}.
+flush_transport(#protocol{module = Module,
+ data = Data}) ->
+ Module:flush_transport(Data).
+
typeid_to_atom(?tType_STOP) -> field_stop;
typeid_to_atom(?tType_VOID) -> void;
typeid_to_atom(?tType_BOOL) -> bool;
Modified: incubator/thrift/trunk/lib/alterl/src/thrift_server.erl
URL: http://svn.apache.org/viewvc/incubator/thrift/trunk/lib/alterl/src/thrift_server.erl?rev=666386&r1=666385&r2=666386&view=diff
==============================================================================
--- incubator/thrift/trunk/lib/alterl/src/thrift_server.erl (original)
+++ incubator/thrift/trunk/lib/alterl/src/thrift_server.erl Tue Jun 10 15:58:52 2008
@@ -111,8 +111,9 @@
{ok, Socket} = gen_tcp:accept(ListenSocket),
error_logger:info_msg("Accepted client"),
- {ok, Transport} = thrift_socket_transport:new(Socket),
- {ok, Protocol} = thrift_binary_protocol:new(Transport),
+ {ok, SocketTransport} = thrift_socket_transport:new(Socket),
+ {ok, BufferedTransport} = thrift_buffered_transport:new(SocketTransport),
+ {ok, Protocol} = thrift_binary_protocol:new(BufferedTransport),
thrift_processor:start(Protocol, Protocol, Service, Handler),
receive
Modified: incubator/thrift/trunk/lib/alterl/src/thrift_socket_transport.erl
URL: http://svn.apache.org/viewvc/incubator/thrift/trunk/lib/alterl/src/thrift_socket_transport.erl?rev=666386&r1=666385&r2=666386&view=diff
==============================================================================
--- incubator/thrift/trunk/lib/alterl/src/thrift_socket_transport.erl (original)
+++ incubator/thrift/trunk/lib/alterl/src/thrift_socket_transport.erl Tue Jun 10 15:58:52 2008
@@ -4,7 +4,7 @@
-export([new/1,
- write/2, read/2]).
+ write/2, read/2, flush/1]).
-record(data, {socket}).
@@ -16,3 +16,7 @@
read(#data{socket = Socket}, Len) when is_integer(Len), Len >= 0 ->
gen_tcp:recv(Socket, Len).
+
+% We can't really flush - everything is flushed when we write
+flush(_) ->
+ ok.
Modified: incubator/thrift/trunk/lib/alterl/src/thrift_transport.erl
URL: http://svn.apache.org/viewvc/incubator/thrift/trunk/lib/alterl/src/thrift_transport.erl?rev=666386&r1=666385&r2=666386&view=diff
==============================================================================
--- incubator/thrift/trunk/lib/alterl/src/thrift_transport.erl (original)
+++ incubator/thrift/trunk/lib/alterl/src/thrift_transport.erl Tue Jun 10 15:58:52 2008
@@ -4,12 +4,14 @@
new/2,
write/2,
- read/2
+ read/2,
+ flush/1
]).
behaviour_info(callbacks) ->
[{write/2,
- read/2}];
+ read/2,
+ flush/1}];
behaviour_info(_Else) -> undefined.
@@ -27,3 +29,6 @@
read(Transport, Len) when is_integer(Len) ->
Module = Transport#transport.module,
Module:read(Transport#transport.data, Len).
+
+flush(#transport{module = Module, data = Data}) ->
+ Module:flush(Data).