You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@thrift.apache.org by dr...@apache.org on 2010/08/31 00:05:38 UTC

svn commit: r990986 - in /incubator/thrift/trunk/lib/erl/src: thrift_processor.erl thrift_server.erl thrift_socket_server.erl

Author: dreiss
Date: Mon Aug 30 22:05:38 2010
New Revision: 990986

URL: http://svn.apache.org/viewvc?rev=990986&view=rev
Log:
erlang: Refactor the processor

Now the server works.

Modified:
    incubator/thrift/trunk/lib/erl/src/thrift_processor.erl
    incubator/thrift/trunk/lib/erl/src/thrift_server.erl
    incubator/thrift/trunk/lib/erl/src/thrift_socket_server.erl

Modified: incubator/thrift/trunk/lib/erl/src/thrift_processor.erl
URL: http://svn.apache.org/viewvc/incubator/thrift/trunk/lib/erl/src/thrift_processor.erl?rev=990986&r1=990985&r2=990986&view=diff
==============================================================================
--- incubator/thrift/trunk/lib/erl/src/thrift_processor.erl (original)
+++ incubator/thrift/trunk/lib/erl/src/thrift_processor.erl Mon Aug 30 22:05:38 2010
@@ -24,53 +24,53 @@
 -include("thrift_constants.hrl").
 -include("thrift_protocol.hrl").
 
--record(thrift_processor, {handler, in_protocol, out_protocol, service}).
+-record(thrift_processor, {handler, protocol, service}).
 
 init({Server, ProtoGen, Service, Handler}) when is_function(ProtoGen, 0) ->
-    {ok, IProt, OProt} = ProtoGen(),
-    loop(#thrift_processor{in_protocol = IProt,
-                           out_protocol = OProt,
+    {ok, Proto} = ProtoGen(),
+    loop(#thrift_processor{protocol = Proto,
                            service = Service,
                            handler = Handler}).
 
-loop(State = #thrift_processor{in_protocol  = IProto,
-                               out_protocol = OProto}) ->
-    case thrift_protocol:read(IProto, message_begin) of
+loop(State0 = #thrift_processor{protocol  = Proto0}) ->
+    {Proto1, MessageBegin} = thrift_protocol:read(Proto0, message_begin),
+    State1 = State0#thrift_processor{protocol = Proto1},
+    case MessageBegin of
         #protocol_message_begin{name = Function,
                                 type = ?tMessageType_CALL} ->
-            ok = handle_function(State, list_to_atom(Function)),
-            loop(State);
+            {State2, ok} = handle_function(State1, list_to_atom(Function)),
+            loop(State2);
         #protocol_message_begin{name = Function,
                                 type = ?tMessageType_ONEWAY} ->
-            ok = handle_function(State, list_to_atom(Function)),
-            loop(State);
+            {State2, ok} = handle_function(State1, list_to_atom(Function)),
+            loop(State2);
         {error, timeout} ->
-            thrift_protocol:close_transport(OProto),
+            thrift_protocol:close_transport(Proto1),
             ok;
         {error, closed} ->
             %% error_logger:info_msg("Client disconnected~n"),
-            thrift_protocol:close_transport(OProto),
+            thrift_protocol:close_transport(Proto1),
             exit(shutdown)
     end.
 
-handle_function(State=#thrift_processor{in_protocol = IProto,
-                                        out_protocol = OProto,
-                                        handler = Handler,
-                                        service = Service},
+handle_function(State0=#thrift_processor{protocol = Proto0,
+                                         handler = Handler,
+                                         service = Service},
                 Function) ->
     InParams = Service:function_info(Function, params_type),
 
-    {ok, Params} = thrift_protocol:read(IProto, InParams),
+    {Proto1, {ok, Params}} = thrift_protocol:read(Proto0, InParams),
+    State1 = State0#thrift_processor{protocol = Proto1},
 
     try
         Result = Handler:handle_function(Function, Params),
         %% {Micro, Result} = better_timer(Handler, handle_function, [Function, Params]),
         %% error_logger:info_msg("Processed ~p(~p) in ~.4fms~n",
         %%                       [Function, Params, Micro/1000.0]),
-        handle_success(State, Function, Result)
+        handle_success(State1, Function, Result)
     catch
         Type:Data when Type =:= throw orelse Type =:= error ->
-            handle_function_catch(State, Function, Type, Data)
+            handle_function_catch(State1, Function, Type, Data)
     end.
 
 handle_function_catch(State = #thrift_processor{service = Service},
@@ -83,39 +83,37 @@ handle_function_catch(State = #thrift_pr
             error_logger:warning_msg(
               "oneway void ~p threw error which must be ignored: ~p",
               [Function, {ErrType, ErrData, Stack}]),
-            ok;
+            {State, ok};
 
         {throw, Exception} when is_tuple(Exception), size(Exception) > 0 ->
             %error_logger:warning_msg("~p threw exception: ~p~n", [Function, Exception]),
-            handle_exception(State, Function, Exception),
-            ok;   % we still want to accept more requests from this client
+            handle_exception(State, Function, Exception);
+            % we still want to accept more requests from this client
 
         {error, Error} ->
-            ok = handle_error(State, Function, Error)
+            handle_error(State, Function, Error)
     end.
 
-handle_success(State = #thrift_processor{out_protocol = OProto,
-                                         service = Service},
+handle_success(State = #thrift_processor{service = Service},
                Function,
                Result) ->
     ReplyType  = Service:function_info(Function, reply_type),
     StructName = atom_to_list(Function) ++ "_result",
 
-    ok = case Result of
-             {reply, ReplyData} ->
-                 Reply = {{struct, [{0, ReplyType}]}, {StructName, ReplyData}},
-                 send_reply(OProto, Function, ?tMessageType_REPLY, Reply);
-
-             ok when ReplyType == {struct, []} ->
-                 send_reply(OProto, Function, ?tMessageType_REPLY, {ReplyType, {StructName}});
-
-             ok when ReplyType == oneway_void ->
-                 %% no reply for oneway void
-                 ok
-         end.
+    case Result of
+        {reply, ReplyData} ->
+            Reply = {{struct, [{0, ReplyType}]}, {StructName, ReplyData}},
+            send_reply(State, Function, ?tMessageType_REPLY, Reply);
+
+        ok when ReplyType == {struct, []} ->
+            send_reply(State, Function, ?tMessageType_REPLY, {ReplyType, {StructName}});
+
+        ok when ReplyType == oneway_void ->
+            %% no reply for oneway void
+            {State, ok}
+    end.
 
-handle_exception(State = #thrift_processor{out_protocol = OProto,
-                                           service = Service},
+handle_exception(State = #thrift_processor{service = Service},
                  Function,
                  Exception) ->
     ExceptionType = element(1, Exception),
@@ -140,9 +138,9 @@ handle_exception(State = #thrift_process
                                                 % Make sure we got at least one defined
     case lists:all(fun(X) -> X =:= undefined end, ExceptionList) of
         true ->
-            ok = handle_unknown_exception(State, Function, Exception);
+            handle_unknown_exception(State, Function, Exception);
         false ->
-            ok = send_reply(OProto, Function, ?tMessageType_REPLY, {ReplySpec, ExceptionTuple})
+            send_reply(State, Function, ?tMessageType_REPLY, {ReplySpec, ExceptionTuple})
     end.
 
 %%
@@ -153,7 +151,7 @@ handle_unknown_exception(State, Function
     handle_error(State, Function, {exception_not_declared_as_thrown,
                                    Exception}).
 
-handle_error(#thrift_processor{out_protocol = OProto}, Function, Error) ->
+handle_error(State, Function, Error) ->
     Stack = erlang:get_stacktrace(),
     error_logger:error_msg("~p had an error: ~p~n", [Function, {Error, Stack}]),
 
@@ -169,14 +167,14 @@ handle_error(#thrift_processor{out_proto
              #'TApplicationException'{
                 message = Message,
                 type = ?TApplicationException_UNKNOWN}},
-    send_reply(OProto, Function, ?tMessageType_EXCEPTION, Reply).
+    send_reply(State, Function, ?tMessageType_EXCEPTION, Reply).
 
-send_reply(OProto, Function, ReplyMessageType, Reply) ->
-    ok = thrift_protocol:write(OProto, #protocol_message_begin{
-                                 name = atom_to_list(Function),
-                                 type = ReplyMessageType,
-                                 seqid = 0}),
-    ok = thrift_protocol:write(OProto, Reply),
-    ok = thrift_protocol:write(OProto, message_end),
-    ok = thrift_protocol:flush_transport(OProto),
-    ok.
+send_reply(State = #thrift_processor{protocol = Proto0}, Function, ReplyMessageType, Reply) ->
+    {Proto1, ok} = thrift_protocol:write(Proto0, #protocol_message_begin{
+                                           name = atom_to_list(Function),
+                                           type = ReplyMessageType,
+                                           seqid = 0}),
+    {Proto2, ok} = thrift_protocol:write(Proto1, Reply),
+    {Proto3, ok} = thrift_protocol:write(Proto2, message_end),
+    {Proto4, ok} = thrift_protocol:flush_transport(Proto3),
+    {State#thrift_processor{protocol = Proto4}, ok}.

Modified: incubator/thrift/trunk/lib/erl/src/thrift_server.erl
URL: http://svn.apache.org/viewvc/incubator/thrift/trunk/lib/erl/src/thrift_server.erl?rev=990986&r1=990985&r2=990986&view=diff
==============================================================================
--- incubator/thrift/trunk/lib/erl/src/thrift_server.erl (original)
+++ incubator/thrift/trunk/lib/erl/src/thrift_server.erl Mon Aug 30 22:05:38 2010
@@ -177,7 +177,7 @@ start_processor(Socket, Service, Handler
                        {ok, SocketTransport} = thrift_socket_transport:new(Socket),
                        {ok, BufferedTransport} = thrift_buffered_transport:new(SocketTransport),
                        {ok, Protocol} = thrift_binary_protocol:new(BufferedTransport),
-                       {ok, Protocol, Protocol}
+                       {ok, Protocol}
                end,
 
     spawn(thrift_processor, init, [{Server, ProtoGen, Service, Handler}]).

Modified: incubator/thrift/trunk/lib/erl/src/thrift_socket_server.erl
URL: http://svn.apache.org/viewvc/incubator/thrift/trunk/lib/erl/src/thrift_socket_server.erl?rev=990986&r1=990985&r2=990986&view=diff
==============================================================================
--- incubator/thrift/trunk/lib/erl/src/thrift_socket_server.erl (original)
+++ incubator/thrift/trunk/lib/erl/src/thrift_socket_server.erl Mon Aug 30 22:05:38 2010
@@ -188,7 +188,7 @@ acceptor_loop({Server, Listen, Service, 
                                        false -> thrift_buffered_transport:new(SocketTransport)
                                    end,
                                {ok, Protocol}          = thrift_binary_protocol:new(Transport),
-                               {ok, IProt=Protocol, OProt=Protocol}
+                               {ok, Protocol}
                        end,
             thrift_processor:init({Server, ProtoGen, Service, Handler});
         {error, closed} ->