You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@thrift.apache.org by dr...@apache.org on 2010/08/31 00:05:01 UTC

svn commit: r990957 [1/2] - in /incubator/thrift/trunk: compiler/cpp/src/generate/ lib/erl/ lib/erl/build/ lib/erl/include/ lib/erl/src/ test/erl/ test/erl/src/ tutorial/erl/

Author: dreiss
Date: Mon Aug 30 22:05:00 2010
New Revision: 990957

URL: http://svn.apache.org/viewvc?rev=990957&view=rev
Log:
Rollback a few recent Erlang changes to fix blame data

My combined patch for THRIFT-599 was committed, but it is preferable to
commit the individual patches to preserve the more detailed log and
blame data.  I'll recommit r987018 as a sequence of patches and r988722
as its own rev.

Added:
    incubator/thrift/trunk/lib/erl/src/test_handler.erl
      - copied, changed from r990955, incubator/thrift/trunk/lib/erl/include/thrift_transport_behaviour.hrl
    incubator/thrift/trunk/lib/erl/src/test_service.erl
      - copied, changed from r990955, incubator/thrift/trunk/lib/erl/include/thrift_transport_behaviour.hrl
    incubator/thrift/trunk/test/erl/src/test_tether.erl
Removed:
    incubator/thrift/trunk/lib/erl/include/thrift_protocol_behaviour.hrl
    incubator/thrift/trunk/lib/erl/include/thrift_transport_behaviour.hrl
    incubator/thrift/trunk/lib/erl/src/thrift_client_util.erl
    incubator/thrift/trunk/lib/erl/src/thrift_transport_state_test.erl
    incubator/thrift/trunk/test/erl/src/test_client.erl
Modified:
    incubator/thrift/trunk/compiler/cpp/src/generate/t_erl_generator.cc
    incubator/thrift/trunk/lib/erl/README
    incubator/thrift/trunk/lib/erl/build/otp.mk
    incubator/thrift/trunk/lib/erl/include/thrift_protocol.hrl
    incubator/thrift/trunk/lib/erl/src/Makefile
    incubator/thrift/trunk/lib/erl/src/thrift_base64_transport.erl
    incubator/thrift/trunk/lib/erl/src/thrift_binary_protocol.erl
    incubator/thrift/trunk/lib/erl/src/thrift_buffered_transport.erl
    incubator/thrift/trunk/lib/erl/src/thrift_client.erl
    incubator/thrift/trunk/lib/erl/src/thrift_disk_log_transport.erl
    incubator/thrift/trunk/lib/erl/src/thrift_file_transport.erl
    incubator/thrift/trunk/lib/erl/src/thrift_framed_transport.erl
    incubator/thrift/trunk/lib/erl/src/thrift_http_transport.erl
    incubator/thrift/trunk/lib/erl/src/thrift_memory_buffer.erl
    incubator/thrift/trunk/lib/erl/src/thrift_processor.erl
    incubator/thrift/trunk/lib/erl/src/thrift_protocol.erl
    incubator/thrift/trunk/lib/erl/src/thrift_server.erl
    incubator/thrift/trunk/lib/erl/src/thrift_socket_server.erl
    incubator/thrift/trunk/lib/erl/src/thrift_socket_transport.erl
    incubator/thrift/trunk/lib/erl/src/thrift_transport.erl
    incubator/thrift/trunk/test/erl/Makefile
    incubator/thrift/trunk/test/erl/src/test_disklog.erl
    incubator/thrift/trunk/test/erl/src/test_membuffer.erl
    incubator/thrift/trunk/test/erl/src/test_server.erl
    incubator/thrift/trunk/tutorial/erl/client.erl

Modified: incubator/thrift/trunk/compiler/cpp/src/generate/t_erl_generator.cc
URL: http://svn.apache.org/viewvc/incubator/thrift/trunk/compiler/cpp/src/generate/t_erl_generator.cc?rev=990957&r1=990956&r2=990957&view=diff
==============================================================================
--- incubator/thrift/trunk/compiler/cpp/src/generate/t_erl_generator.cc (original)
+++ incubator/thrift/trunk/compiler/cpp/src/generate/t_erl_generator.cc Mon Aug 30 22:05:00 2010
@@ -649,8 +649,8 @@ void t_erl_generator::generate_service_i
                          << "_thrift:function_info(Function, InfoType)." << endl;
       indent_down();
   } else {
-      // Use a special return code for nonexistent functions
-      indent(f_service_) << "function_info(_Func, _Info) -> no_function." << endl;
+      // Dummy function_info so we don't worry about the ;s
+      indent(f_service_) << "function_info(xxx, dummy) -> dummy." << endl;
   }
 
   indent(f_service_) << endl;

Modified: incubator/thrift/trunk/lib/erl/README
URL: http://svn.apache.org/viewvc/incubator/thrift/trunk/lib/erl/README?rev=990957&r1=990956&r2=990957&view=diff
==============================================================================
--- incubator/thrift/trunk/lib/erl/README (original)
+++ incubator/thrift/trunk/lib/erl/README Mon Aug 30 22:05:00 2010
@@ -25,19 +25,32 @@ Example
 
 Example session using thrift_client:
 
-1> {ok, C0} = thrift_client_util:new("localhost", 9090, thriftTest_thrift, []), ok.
-ok
-2> {C1, R1} = thrift_client:call(C0, testVoid, []), R1.
+118> f(), {ok, C} = thrift_client:start_link("localhost", 9090, thriftTest_thrif
+t).
+{ok,<0.271.0>}
+119> thrift_client:call(C, testVoid, []).
 {ok,ok}
-3> {C2, R2} = thrift_client:call(C1, testVoid, [asdf]), R2.
+120> thrift_client:call(C, testVoid, [asdf]).
 {error,{bad_args,testVoid,[asdf]}}
-4> {C3, R3} = thrift_client:call(C2, testI32, [123]), R3.
+121> thrift_client:call(C, testI32, [123]).
 {ok,123}
-5> {C4, R4} = thrift_client:call(C3, testOneway, [1]), R4.
+122> thrift_client:call(C, testOneway, [1]).
 {ok,ok}
-6> {C5, R5} = thrift_client:call(C4, testXception, ["foo"]), R5.
+123> catch thrift_client:call(C, testXception, ["foo"]).
 {error,{no_function,testXception}}
-7> {C6, R6} = thrift_client:call(C5, testException, ["foo"]), R6.
+124> catch thrift_client:call(C, testException, ["foo"]).
 {ok,ok}
-8> {C7, R7} = (catch thrift_client:call(C6, testException, ["Xception"])), R7.
-{exception,{xception,1001,<<"Xception">>}}
+125> catch thrift_client:call(C, testException, ["Xception"]).
+{xception,1001,"This is an Xception"}
+126> thrift_client:call(C, testException, ["Xception"]).
+
+=ERROR REPORT==== 24-Feb-2008::23:00:23 ===
+Error in process <0.269.0> with exit value: {{nocatch,{xception,1001,"This is an
+ Xception"}},[{thrift_client,call,3},{erl_eval,do_apply,5},{shell,exprs,6},{shel
+l,eval_loop,3}]}
+
+** exited: {{nocatch,{xception,1001,"This is an Xception"}},
+            [{thrift_client,call,3},
+             {erl_eval,do_apply,5},
+             {shell,exprs,6},
+             {shell,eval_loop,3}]} **

Modified: incubator/thrift/trunk/lib/erl/build/otp.mk
URL: http://svn.apache.org/viewvc/incubator/thrift/trunk/lib/erl/build/otp.mk?rev=990957&r1=990956&r2=990957&view=diff
==============================================================================
--- incubator/thrift/trunk/lib/erl/build/otp.mk (original)
+++ incubator/thrift/trunk/lib/erl/build/otp.mk Mon Aug 30 22:05:00 2010
@@ -25,6 +25,7 @@ OS_TYPE=${shell uname}
 
 # MHOST is the host where this Makefile runs.
 MHOST=${shell hostname -s}
+ERL_COMPILE_FLAGS+=-W0
 
 # The location of the erlang runtime system.
 ifndef ERL_RUN_TOP

Modified: incubator/thrift/trunk/lib/erl/include/thrift_protocol.hrl
URL: http://svn.apache.org/viewvc/incubator/thrift/trunk/lib/erl/include/thrift_protocol.hrl?rev=990957&r1=990956&r2=990957&view=diff
==============================================================================
--- incubator/thrift/trunk/lib/erl/include/thrift_protocol.hrl (original)
+++ incubator/thrift/trunk/lib/erl/include/thrift_protocol.hrl Mon Aug 30 22:05:00 2010
@@ -18,7 +18,7 @@
 %%
 
 -ifndef(THRIFT_PROTOCOL_INCLUDED).
--define(THRIFT_PROTOCOL_INCLUDED, true).
+-define(THRIFT_PROTOCOL_INCLUDED, yea).
 
 -record(protocol_message_begin, {name, type, seqid}).
 -record(protocol_struct_begin, {name}).
@@ -27,40 +27,5 @@
 -record(protocol_list_begin, {etype, size}).
 -record(protocol_set_begin, {etype, size}).
 
--type tprot_header_val() :: #protocol_message_begin{}
-                          | #protocol_struct_begin{}
-                          | #protocol_field_begin{}
-                          | #protocol_map_begin{}
-                          | #protocol_list_begin{}
-                          | #protocol_set_begin{}
-                          .
--type tprot_empty_tag() :: message_end
-                         | struct_begin
-                         | struct_end
-                         | field_end
-                         | map_end
-                         | list_end
-                         | set_end
-                         .
--type tprot_header_tag() :: message_begin
-                          | field_begin
-                          | map_begin
-                          | list_begin
-                          | set_begin
-                          .
--type tprot_data_tag() :: ui32
-                        | bool
-                        | byte
-                        | i16
-                        | i32
-                        | i64
-                        | double
-                        | string
-                        .
--type tprot_cont_tag() :: {list, _Type}
-                        | {map, _KType, _VType}
-                        | {set, _Type}
-                        .
-
 
 -endif.

Modified: incubator/thrift/trunk/lib/erl/src/Makefile
URL: http://svn.apache.org/viewvc/incubator/thrift/trunk/lib/erl/src/Makefile?rev=990957&r1=990956&r2=990957&view=diff
==============================================================================
--- incubator/thrift/trunk/lib/erl/src/Makefile (original)
+++ incubator/thrift/trunk/lib/erl/src/Makefile Mon Aug 30 22:05:00 2010
@@ -27,7 +27,6 @@ INSTALL_DST = $(ERLANG_OTP)/lib/$(APP_NA
 
 MODULES = $(shell find . -name \*.erl | sed 's:^\./::' | sed 's/\.erl//')
 MODULES_STRING_LIST = $(shell find . -name \*.erl | sed 's:^\./:":' | sed 's/\.erl/",/')
-BEHAV_MODULES = $(shell find . -name \*.erl | xargs grep -l behaviour_info | sed 's:^\./::' | sed 's/\.erl//')
 
 HRL_FILES=
 INTERNAL_HRL_FILES= $(APP_NAME).hrl
@@ -44,8 +43,7 @@ APP_TARGET= $(EBIN)/$(APP_FILE)
 APPUP_TARGET= $(EBIN)/$(APPUP_FILE)
 
 BEAMS= $(MODULES:%=$(EBIN)/%.$(EMULATOR))
-BEHAV_BEAMS= $(BEHAV_MODULES:%=$(EBIN)/%.$(EMULATOR))
-TARGET_FILES= $(BEHAV_BEAMS) $(BEAMS) $(APP_TARGET) $(APPUP_TARGET)
+TARGET_FILES= $(BEAMS) $(APP_TARGET) $(APPUP_TARGET)
 
 WEB_TARGET=/var/yaws/www/$(APP_NAME)
 
@@ -55,8 +53,7 @@ WEB_TARGET=/var/yaws/www/$(APP_NAME)
 
 ERL_FLAGS +=
 ERL_INCLUDE = -I../include -I../../fslib/include -I../../system_status/include
-ERL_BEHAV_PATH = -pz ../ebin
-ERL_COMPILE_FLAGS += $(ERL_INCLUDE) $(ERL_BEHAV_PATH)
+ERL_COMPILE_FLAGS += $(ERL_INCLUDE)
 
 # ----------------------------------------------------
 # Targets

Copied: incubator/thrift/trunk/lib/erl/src/test_handler.erl (from r990955, incubator/thrift/trunk/lib/erl/include/thrift_transport_behaviour.hrl)
URL: http://svn.apache.org/viewvc/incubator/thrift/trunk/lib/erl/src/test_handler.erl?p2=incubator/thrift/trunk/lib/erl/src/test_handler.erl&p1=incubator/thrift/trunk/lib/erl/include/thrift_transport_behaviour.hrl&r1=990955&r2=990957&rev=990957&view=diff
==============================================================================
--- incubator/thrift/trunk/lib/erl/include/thrift_transport_behaviour.hrl (original)
+++ incubator/thrift/trunk/lib/erl/src/test_handler.erl Mon Aug 30 22:05:00 2010
@@ -17,15 +17,10 @@
 %% under the License.
 %%
 
-%% Signature specifications for transport implementations.
+-module(test_handler).
 
--ifndef(THRIFT_TRANSPORT_BEHAVIOUR_INCLUDED).
--define(THRIFT_TRANSPORT_BEHAVIOUR_INCLUDED, true).
+-export([handle_function/2]).
 
--spec write(state(), iolist() | binary()) -> {state(), ok | {error, _Reason}}.
--spec read(state(), non_neg_integer()) -> {state(), {ok, binary()} | {error, _Reason}}.
--spec flush(state()) -> {state(), ok | {error, _Reason}}.
--spec close(state()) -> {state(), ok | {error, _Reason}}.
-
-
--endif.
+handle_function(add, Params = {A, B}) ->
+    io:format("Got params: ~p~n", [Params]),
+    {reply, A + B}.

Copied: incubator/thrift/trunk/lib/erl/src/test_service.erl (from r990955, incubator/thrift/trunk/lib/erl/include/thrift_transport_behaviour.hrl)
URL: http://svn.apache.org/viewvc/incubator/thrift/trunk/lib/erl/src/test_service.erl?p2=incubator/thrift/trunk/lib/erl/src/test_service.erl&p1=incubator/thrift/trunk/lib/erl/include/thrift_transport_behaviour.hrl&r1=990955&r2=990957&rev=990957&view=diff
==============================================================================
--- incubator/thrift/trunk/lib/erl/include/thrift_transport_behaviour.hrl (original)
+++ incubator/thrift/trunk/lib/erl/src/test_service.erl Mon Aug 30 22:05:00 2010
@@ -17,15 +17,13 @@
 %% under the License.
 %%
 
-%% Signature specifications for transport implementations.
+-module(test_service).
+%
+% Test service definition
 
--ifndef(THRIFT_TRANSPORT_BEHAVIOUR_INCLUDED).
--define(THRIFT_TRANSPORT_BEHAVIOUR_INCLUDED, true).
+-export([function_info/2]).
 
--spec write(state(), iolist() | binary()) -> {state(), ok | {error, _Reason}}.
--spec read(state(), non_neg_integer()) -> {state(), {ok, binary()} | {error, _Reason}}.
--spec flush(state()) -> {state(), ok | {error, _Reason}}.
--spec close(state()) -> {state(), ok | {error, _Reason}}.
-
-
--endif.
+function_info(add, params_type) ->
+    {struct, [{1, i32},
+              {2, i32}]};
+function_info(add, reply_type) -> i32.

Modified: incubator/thrift/trunk/lib/erl/src/thrift_base64_transport.erl
URL: http://svn.apache.org/viewvc/incubator/thrift/trunk/lib/erl/src/thrift_base64_transport.erl?rev=990957&r1=990956&r2=990957&view=diff
==============================================================================
--- incubator/thrift/trunk/lib/erl/src/thrift_base64_transport.erl (original)
+++ incubator/thrift/trunk/lib/erl/src/thrift_base64_transport.erl Mon Aug 30 22:05:00 2010
@@ -29,35 +29,30 @@
 
 %% State
 -record(b64_transport, {wrapped}).
--type state() :: #b64_transport{}.
--include("thrift_transport_behaviour.hrl").
 
 new(Wrapped) ->
     State = #b64_transport{wrapped = Wrapped},
     thrift_transport:new(?MODULE, State).
 
 
-write(This = #b64_transport{wrapped = Wrapped}, Data) ->
-    {NewWrapped, Result} = thrift_transport:write(Wrapped, base64:encode(iolist_to_binary(Data))),
-    {This#b64_transport{wrapped = NewWrapped}, Result}.
+write(#b64_transport{wrapped = Wrapped}, Data) ->
+    thrift_transport:write(Wrapped, base64:encode(iolist_to_binary(Data))).
 
 
 %% base64 doesn't support reading quite yet since it would involve
 %% nasty buffering and such
-read(This = #b64_transport{}, _Data) ->
-    {This, {error, no_reads_allowed}}.
+read(#b64_transport{wrapped = Wrapped}, Data) ->
+    {error, no_reads_allowed}.
 
 
-flush(This = #b64_transport{wrapped = Wrapped0}) ->
-    {Wrapped1, ok} = thrift_transport:write(Wrapped0, <<"\n">>),
-    {Wrapped2, ok} = thrift_transport:flush(Wrapped1),
-    {This#b64_transport{wrapped = Wrapped2}, ok}.
+flush(#b64_transport{wrapped = Wrapped}) ->
+    thrift_transport:write(Wrapped, <<"\n">>),
+    thrift_transport:flush(Wrapped).
 
 
-close(This0) ->
-    {This1 = #b64_transport{wrapped = Wrapped}, ok} = flush(This0),
-    {NewWrapped, ok} = thrift_transport:close(Wrapped),
-    {This1#b64_transport{wrapped = NewWrapped}, ok}.
+close(Me = #b64_transport{wrapped = Wrapped}) ->
+    flush(Me),
+    thrift_transport:close(Wrapped).
 
 
 %%%% FACTORY GENERATION %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

Modified: incubator/thrift/trunk/lib/erl/src/thrift_binary_protocol.erl
URL: http://svn.apache.org/viewvc/incubator/thrift/trunk/lib/erl/src/thrift_binary_protocol.erl?rev=990957&r1=990956&r2=990957&view=diff
==============================================================================
--- incubator/thrift/trunk/lib/erl/src/thrift_binary_protocol.erl (original)
+++ incubator/thrift/trunk/lib/erl/src/thrift_binary_protocol.erl Mon Aug 30 22:05:00 2010
@@ -19,7 +19,7 @@
 
 -module(thrift_binary_protocol).
 
--behaviour(thrift_protocol).
+-behavior(thrift_protocol).
 
 -include("thrift_constants.hrl").
 -include("thrift_protocol.hrl").
@@ -37,8 +37,6 @@
                           strict_read=true,
                           strict_write=true
                          }).
--type state() :: #binary_protocol{}.
--include("thrift_protocol_behaviour.hrl").
 
 -define(VERSION_MASK, 16#FFFF0000).
 -define(VERSION_1, 16#80010000).
@@ -60,81 +58,79 @@ parse_options([{strict_write, Bool} | Re
     parse_options(Rest, State#binary_protocol{strict_write=Bool}).
 
 
-flush_transport(This = #binary_protocol{transport = Transport}) ->
-    {NewTransport, Result} = thrift_transport:flush(Transport),
-    {This#binary_protocol{transport = NewTransport}, Result}.
-
-close_transport(This = #binary_protocol{transport = Transport}) ->
-    {NewTransport, Result} = thrift_transport:close(Transport),
-    {This#binary_protocol{transport = NewTransport}, Result}.
+flush_transport(#binary_protocol{transport = Transport}) ->
+    thrift_transport:flush(Transport).
+
+close_transport(#binary_protocol{transport = Transport}) ->
+    thrift_transport:close(Transport).
 
 %%%
 %%% instance methods
 %%%
 
-write(This0, #protocol_message_begin{
+write(This, #protocol_message_begin{
         name = Name,
         type = Type,
         seqid = Seqid}) ->
-    case This0#binary_protocol.strict_write of
+    case This#binary_protocol.strict_write of
         true ->
-            {This1, ok} = write(This0, {i32, ?VERSION_1 bor Type}),
-            {This2, ok} = write(This1, {string, Name}),
-            {This3, ok} = write(This2, {i32, Seqid}),
-            {This3, ok};
+            write(This, {i32, ?VERSION_1 bor Type}),
+            write(This, {string, Name}),
+            write(This, {i32, Seqid});
         false ->
-            {This1, ok} = write(This0, {string, Name}),
-            {This2, ok} = write(This1, {byte, Type}),
-            {This3, ok} = write(This2, {i32, Seqid}),
-            {This3, ok}
-    end;
+            write(This, {string, Name}),
+            write(This, {byte, Type}),
+            write(This, {i32, Seqid})
+    end,
+    ok;
 
-write(This, message_end) -> {This, ok};
+write(This, message_end) -> ok;
 
-write(This0, #protocol_field_begin{
+write(This, #protocol_field_begin{
        name = _Name,
        type = Type,
        id = Id}) ->
-    {This1, ok} = write(This0, {byte, Type}),
-    {This2, ok} = write(This1, {i16, Id}),
-    {This2, ok};
+    write(This, {byte, Type}),
+    write(This, {i16, Id}),
+    ok;
 
 write(This, field_stop) ->
-    write(This, {byte, ?tType_STOP});
+    write(This, {byte, ?tType_STOP}),
+    ok;
 
-write(This, field_end) -> {This, ok};
+write(This, field_end) -> ok;
 
-write(This0, #protocol_map_begin{
+write(This, #protocol_map_begin{
        ktype = Ktype,
        vtype = Vtype,
        size = Size}) ->
-    {This1, ok} = write(This0, {byte, Ktype}),
-    {This2, ok} = write(This1, {byte, Vtype}),
-    {This3, ok} = write(This2, {i32, Size}),
-    {This3, ok};
+    write(This, {byte, Ktype}),
+    write(This, {byte, Vtype}),
+    write(This, {i32, Size}),
+    ok;
 
-write(This, map_end) -> {This, ok};
+write(This, map_end) -> ok;
 
-write(This0, #protocol_list_begin{
+write(This, #protocol_list_begin{
         etype = Etype,
         size = Size}) ->
-    {This1, ok} = write(This0, {byte, Etype}),
-    {This2, ok} = write(This1, {i32, Size}),
-    {This2, ok};
+    write(This, {byte, Etype}),
+    write(This, {i32, Size}),
+    ok;
 
-write(This, list_end) -> {This, ok};
+write(This, list_end) -> ok;
 
-write(This0, #protocol_set_begin{
+write(This, #protocol_set_begin{
         etype = Etype,
         size = Size}) ->
-    {This1, ok} = write(This0, {byte, Etype}),
-    {This2, ok} = write(This1, {i32, Size}),
-    {This2, ok};
+    write(This, {byte, Etype}),
+    write(This, {i32, Size}),
+    ok;
 
-write(This, set_end) -> {This, ok};
+write(This, set_end) -> ok;
 
-write(This, #protocol_struct_begin{}) -> {This, ok};
-write(This, struct_end) -> {This, ok};
+write(This, #protocol_struct_begin{}) -> ok;
+write(This, struct_end) -> ok;
 
 write(This, {bool, true})  -> write(This, {byte, 1});
 write(This, {bool, false}) -> write(This, {byte, 0});
@@ -154,166 +150,152 @@ write(This, {i64, I64}) ->
 write(This, {double, Double}) ->
     write(This, <<Double:64/big-signed-float>>);
 
-write(This0, {string, Str}) when is_list(Str) ->
-    {This1, ok} = write(This0, {i32, length(Str)}),
-    {This2, ok} = write(This1, list_to_binary(Str)),
-    {This2, ok};
-
-write(This0, {string, Bin}) when is_binary(Bin) ->
-    {This1, ok} = write(This0, {i32, size(Bin)}),
-    {This2, ok} = write(This1, Bin),
-    {This2, ok};
+write(This, {string, Str}) when is_list(Str) ->
+    write(This, {i32, length(Str)}),
+    write(This, list_to_binary(Str));
+
+write(This, {string, Bin}) when is_binary(Bin) ->
+    write(This, {i32, size(Bin)}),
+    write(This, Bin);
 
 %% Data :: iolist()
-write(This = #binary_protocol{transport = Trans}, Data) ->
-    {NewTransport, Result} = thrift_transport:write(Trans, Data),
-    {This#binary_protocol{transport = NewTransport}, Result}.
+write(This, Data) ->
+    thrift_transport:write(This#binary_protocol.transport, Data).
 
 %%
 
-read(This0, message_begin) ->
-    {This1, Initial} = read(This0, ui32),
-    case Initial of
+read(This, message_begin) ->
+    case read(This, ui32) of
         {ok, Sz} when Sz band ?VERSION_MASK =:= ?VERSION_1 ->
             %% we're at version 1
-            {This2, {ok, Name}}  = read(This1, string),
-            {This3, {ok, SeqId}} = read(This2, i32),
-            Type                 = Sz band ?TYPE_MASK,
-            {This3, #protocol_message_begin{name  = binary_to_list(Name),
-                                            type  = Type,
-                                            seqid = SeqId}};
+            {ok, Name}  = read(This, string),
+            Type        = Sz band ?TYPE_MASK,
+            {ok, SeqId} = read(This, i32),
+            #protocol_message_begin{name  = binary_to_list(Name),
+                                    type  = Type,
+                                    seqid = SeqId};
 
         {ok, Sz} when Sz < 0 ->
             %% there's a version number but it's unexpected
-            {This1, {error, {bad_binary_protocol_version, Sz}}};
+            {error, {bad_binary_protocol_version, Sz}};
 
-        {ok, _Sz} when This1#binary_protocol.strict_read =:= true ->
+        {ok, Sz} when This#binary_protocol.strict_read =:= true ->
             %% strict_read is true and there's no version header; that's an error
-            {This1, {error, no_binary_protocol_version}};
+            {error, no_binary_protocol_version};
 
-        {ok, Sz} when This1#binary_protocol.strict_read =:= false ->
+        {ok, Sz} when This#binary_protocol.strict_read =:= false ->
             %% strict_read is false, so just read the old way
-            {This2, {ok, Name}}  = read_data(This1, Sz),
-            {This3, {ok, Type}}  = read(This2, byte),
-            {This4, {ok, SeqId}} = read(This3, i32),
-            {This4, #protocol_message_begin{name  = binary_to_list(Name),
-                                            type  = Type,
-                                            seqid = SeqId}};
-
-        Else ->
-            {This1, Else}
+            {ok, Name}  = read(This, Sz),
+            {ok, Type}  = read(This, byte),
+            {ok, SeqId} = read(This, i32),
+            #protocol_message_begin{name  = binary_to_list(Name),
+                                    type  = Type,
+                                    seqid = SeqId};
+
+        Err = {error, closed} -> Err;
+        Err = {error, timeout}-> Err;
+        Err = {error, ebadf}  -> Err
     end;
 
-read(This, message_end) -> {This, ok};
+read(This, message_end) -> ok;
 
-read(This, struct_begin) -> {This, ok};
-read(This, struct_end) -> {This, ok};
+read(This, struct_begin) -> ok;
+read(This, struct_end) -> ok;
 
-read(This0, field_begin) ->
-    {This1, Result} = read(This0, byte),
-    case Result of
+read(This, field_begin) ->
+    case read(This, byte) of
         {ok, Type = ?tType_STOP} ->
-            {This1, #protocol_field_begin{type = Type}};
+            #protocol_field_begin{type = Type};
         {ok, Type} ->
-            {This2, {ok, Id}} = read(This1, i16),
-            {This2, #protocol_field_begin{type = Type,
-                                          id = Id}}
+            {ok, Id} = read(This, i16),
+            #protocol_field_begin{type = Type,
+                                  id = Id}
     end;
 
-read(This, field_end) -> {This, ok};
+read(This, field_end) -> ok;
 
-read(This0, map_begin) ->
-    {This1, {ok, Ktype}} = read(This0, byte),
-    {This2, {ok, Vtype}} = read(This1, byte),
-    {This3, {ok, Size}}  = read(This2, i32),
-    {This3, #protocol_map_begin{ktype = Ktype,
-                                vtype = Vtype,
-                                size = Size}};
-read(This, map_end) -> {This, ok};
-
-read(This0, list_begin) ->
-    {This1, {ok, Etype}} = read(This0, byte),
-    {This2, {ok, Size}}  = read(This1, i32),
-    {This2, #protocol_list_begin{etype = Etype,
-                                 size = Size}};
-read(This, list_end) -> {This, ok};
-
-read(This0, set_begin) ->
-    {This1, {ok, Etype}} = read(This0, byte),
-    {This2, {ok, Size}}  = read(This1, i32),
-    {This2, #protocol_set_begin{etype = Etype,
-                                 size = Size}};
-read(This, set_end) -> {This, ok};
-
-read(This0, field_stop) ->
-    {This1, {ok, ?tType_STOP}} = read(This0, byte),
-    {This1, ok};
+read(This, map_begin) ->
+    {ok, Ktype} = read(This, byte),
+    {ok, Vtype} = read(This, byte),
+    {ok, Size}  = read(This, i32),
+    #protocol_map_begin{ktype = Ktype,
+                        vtype = Vtype,
+                        size = Size};
+read(This, map_end) -> ok;
+
+read(This, list_begin) ->
+    {ok, Etype} = read(This, byte),
+    {ok, Size}  = read(This, i32),
+    #protocol_list_begin{etype = Etype,
+                         size = Size};
+read(This, list_end) -> ok;
+
+read(This, set_begin) ->
+    {ok, Etype} = read(This, byte),
+    {ok, Size}  = read(This, i32),
+    #protocol_set_begin{etype = Etype,
+                        size = Size};
+read(This, set_end) -> ok;
+
+read(This, field_stop) ->
+    {ok, ?tType_STOP} =  read(This, byte),
+    ok;
 
 %%
 
-read(This0, bool) ->
-    {This1, Result} = read(This0, byte),
-    case Result of
-        {ok, Byte} -> {This1, {ok, Byte /= 0}};
-        Else -> {This1, Else}
+read(This, bool) ->
+    case read(This, byte) of
+        {ok, Byte} -> {ok, Byte /= 0};
+        Else -> Else
     end;
 
-read(This0, byte) ->
-    {This1, Bytes} = read_data(This0, 1),
-    case Bytes of
-        {ok, <<Val:8/integer-signed-big, _/binary>>} -> {This1, {ok, Val}};
-        Else -> {This1, Else}
+read(This, byte) ->
+    case read(This, 1) of
+        {ok, <<Val:8/integer-signed-big, _/binary>>} -> {ok, Val};
+        Else -> Else
     end;
 
-read(This0, i16) ->
-    {This1, Bytes} = read_data(This0, 2),
-    case Bytes of
-        {ok, <<Val:16/integer-signed-big, _/binary>>} -> {This1, {ok, Val}};
-        Else -> {This1, Else}
+read(This, i16) ->
+    case read(This, 2) of
+        {ok, <<Val:16/integer-signed-big, _/binary>>} -> {ok, Val};
+        Else -> Else
     end;
 
-read(This0, i32) ->
-    {This1, Bytes} = read_data(This0, 4),
-    case Bytes of
-        {ok, <<Val:32/integer-signed-big, _/binary>>} -> {This1, {ok, Val}};
-        Else -> {This1, Else}
+read(This, i32) ->
+    case read(This, 4) of
+        {ok, <<Val:32/integer-signed-big, _/binary>>} -> {ok, Val};
+        Else -> Else
     end;
 
 %% unsigned ints aren't used by thrift itself, but it's used for the parsing
 %% of the packet version header. Without this special function BEAM works fine
 %% but hipe thinks it received a bad version header.
-read(This0, ui32) ->
-    {This1, Bytes} = read_data(This0, 4),
-    case Bytes of
-        {ok, <<Val:32/integer-unsigned-big, _/binary>>} -> {This1, {ok, Val}};
-        Else -> {This1, Else}
+read(This, ui32) ->
+    case read(This, 4) of
+        {ok, <<Val:32/integer-unsigned-big, _/binary>>} -> {ok, Val};
+        Else -> Else
     end;
 
-read(This0, i64) ->
-    {This1, Bytes} = read_data(This0, 8),
-    case Bytes of
-        {ok, <<Val:64/integer-signed-big, _/binary>>} -> {This1, {ok, Val}};
-        Else -> {This1, Else}
+read(This, i64) ->
+    case read(This, 8) of
+        {ok, <<Val:64/integer-signed-big, _/binary>>} -> {ok, Val};
+        Else -> Else
     end;
 
-read(This0, double) ->
-    {This1, Bytes} = read_data(This0, 8),
-    case Bytes of
-        {ok, <<Val:64/float-signed-big, _/binary>>} -> {This1, {ok, Val}};
-        Else -> {This1, Else}
+read(This, double) ->
+    case read(This, 8) of
+        {ok, <<Val:64/float-signed-big, _/binary>>} -> {ok, Val};
+        Else -> Else
     end;
 
 % returns a binary directly, call binary_to_list if necessary
-read(This0, string) ->
-    {This1, {ok, Sz}}  = read(This0, i32),
-    read_data(This1, Sz).
-
--spec read_data(#binary_protocol{}, non_neg_integer()) ->
-    {#binary_protocol{}, {ok, binary()} | {error, _Reason}}.
-read_data(This, 0) -> {This, {ok, <<>>}};
-read_data(This = #binary_protocol{transport = Trans}, Len) when is_integer(Len) andalso Len > 0 ->
-    {NewTransport, Result} = thrift_transport:read(Trans, Len),
-    {This#binary_protocol{transport = NewTransport}, Result}.
+read(This, string) ->
+    {ok, Sz}  = read(This, i32),
+    {ok, Bin} = read(This, Sz);
+
+read(This, 0) -> {ok, <<>>};
+read(This, Len) when is_integer(Len), Len >= 0 ->
+    thrift_transport:read(This#binary_protocol.transport, Len).
 
 
 %%%% FACTORY GENERATION %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

Modified: incubator/thrift/trunk/lib/erl/src/thrift_buffered_transport.erl
URL: http://svn.apache.org/viewvc/incubator/thrift/trunk/lib/erl/src/thrift_buffered_transport.erl?rev=990957&r1=990956&r2=990957&view=diff
==============================================================================
--- incubator/thrift/trunk/lib/erl/src/thrift_buffered_transport.erl (original)
+++ incubator/thrift/trunk/lib/erl/src/thrift_buffered_transport.erl Mon Aug 30 22:05:00 2010
@@ -19,51 +19,154 @@
 
 -module(thrift_buffered_transport).
 
+-behaviour(gen_server).
 -behaviour(thrift_transport).
 
 %% API
 -export([new/1, new_transport_factory/1]).
 
+%% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+         terminate/2, code_change/3]).
+
 %% thrift_transport callbacks
 -export([write/2, read/2, flush/1, close/1]).
 
 -record(buffered_transport, {wrapped, % a thrift_transport
                              write_buffer % iolist()
                             }).
--type state() :: #buffered_transport{}.
--include("thrift_transport_behaviour.hrl").
-
 
+%%====================================================================
+%% API
+%%====================================================================
+%%--------------------------------------------------------------------
+%% Function: new(WrappedTransport) -> thrift_transport:new(?MODULE, Pid) | ignore | {error,Error}
+%% Description: Starts a linked buffering server and wraps its pid as a transport
+%%--------------------------------------------------------------------
 new(WrappedTransport) ->
-    State = #buffered_transport{wrapped = WrappedTransport,
-                                write_buffer = []},
-    thrift_transport:new(?MODULE, State).
-
-
-%% Writes data into the buffer
-write(State = #buffered_transport{write_buffer = WBuf}, Data) ->
-    {State#buffered_transport{write_buffer = [WBuf, Data]}, ok}.
-
-%% Flushes the buffer through to the wrapped transport
-flush(State = #buffered_transport{write_buffer = WBuf,
-                                  wrapped = Wrapped0}) ->
-    {Wrapped1, Response} = thrift_transport:write(Wrapped0, WBuf),
-    {Wrapped2, _} = thrift_transport:flush(Wrapped1),
-    NewState = State#buffered_transport{write_buffer = [],
-                                        wrapped = Wrapped2},
-    {NewState, Response}.
-
-%% Closes the transport and the wrapped transport
-close(State = #buffered_transport{wrapped = Wrapped0}) ->
-    {Wrapped1, Result} = thrift_transport:close(Wrapped0),
-    NewState = State#buffered_transport{wrapped = Wrapped1},
-    {NewState, Result}.
-
-%% Reads data through from the wrapped transport
-read(State = #buffered_transport{wrapped = Wrapped0}, Len) when is_integer(Len) ->
-    {Wrapped1, Response} = thrift_transport:read(Wrapped0, Len),
-    NewState = State#buffered_transport{wrapped = Wrapped1},
-    {NewState, Response}.
+    case gen_server:start_link(?MODULE, [WrappedTransport], []) of
+        {ok, Pid} ->
+            thrift_transport:new(?MODULE, Pid);
+        Else ->
+            Else
+    end.
+
+
+
+%%--------------------------------------------------------------------
+%% Function: write(Transport, Data) -> ok
+%%
+%% Data = iolist()
+%%
+%% Description: Writes data into the buffer
+%%--------------------------------------------------------------------
+write(Transport, Data) ->
+    gen_server:call(Transport, {write, Data}).
+
+%%--------------------------------------------------------------------
+%% Function: flush(Transport) -> ok
+%%
+%% Description: Flushes the buffer through to the wrapped transport
+%%--------------------------------------------------------------------
+flush(Transport) ->
+    gen_server:call(Transport, flush).
+
+%%--------------------------------------------------------------------
+%% Function: close(Transport) -> ok
+%%
+%% Description: Closes the transport and the wrapped transport
+%%--------------------------------------------------------------------
+close(Transport) ->
+    gen_server:cast(Transport, close).
+
+%%--------------------------------------------------------------------
+%% Function: read(Transport, Len) -> {ok, Data}
+%%
+%% Data = binary()
+%%
+%% Description: Reads data through from the wrapped transport
+%%--------------------------------------------------------------------
+read(Transport, Len) when is_integer(Len) ->
+    gen_server:call(Transport, {read, Len}, _Timeout=10000).
+
+%%====================================================================
+%% gen_server callbacks
+%%====================================================================
+
+%%--------------------------------------------------------------------
+%% Function: init(Args) -> {ok, State} |
+%%                         {ok, State, Timeout} |
+%%                         ignore               |
+%%                         {stop, Reason}
+%% Description: Initiates the server
+%%--------------------------------------------------------------------
+init([Wrapped]) ->
+    {ok, #buffered_transport{wrapped = Wrapped,
+                             write_buffer = []}}.
+
+%%--------------------------------------------------------------------
+%% Function: handle_call(Request, From, State) -> {reply, Reply, State} |
+%%                                      {reply, Reply, State, Timeout} |
+%%                                      {noreply, State} |
+%%                                      {noreply, State, Timeout} |
+%%                                      {stop, Reason, Reply, State} |
+%%                                      {stop, Reason, State}
+%% Description: Handling call messages
+%%--------------------------------------------------------------------
+handle_call({write, Data}, _From, State = #buffered_transport{write_buffer = WBuf}) ->
+    {reply, ok, State#buffered_transport{write_buffer = [WBuf, Data]}};
+
+handle_call({read, Len}, _From, State = #buffered_transport{wrapped = Wrapped}) ->
+    Response = thrift_transport:read(Wrapped, Len),
+    {reply, Response, State};
+
+handle_call(flush, _From, State = #buffered_transport{write_buffer = WBuf,
+                                                      wrapped = Wrapped}) ->
+    Response = thrift_transport:write(Wrapped, WBuf),
+    thrift_transport:flush(Wrapped),
+    {reply, Response, State#buffered_transport{write_buffer = []}}.
+
+%%--------------------------------------------------------------------
+%% Function: handle_cast(Msg, State) -> {noreply, State} |
+%%                                      {noreply, State, Timeout} |
+%%                                      {stop, Reason, State}
+%% Description: Handling cast messages
+%%--------------------------------------------------------------------
+handle_cast(close, State = #buffered_transport{write_buffer = WBuf,
+                                               wrapped = Wrapped}) ->
+    thrift_transport:write(Wrapped, WBuf),
+    %% Wrapped is closed by terminate/2
+    %%  error_logger:info_msg("thrift_buffered_transport ~p: closing", [self()]),
+    {stop, normal, State};
+handle_cast(Msg, State=#buffered_transport{}) ->
+    {noreply, State}.
+
+%%--------------------------------------------------------------------
+%% Function: handle_info(Info, State) -> {noreply, State} |
+%%                                       {noreply, State, Timeout} |
+%%                                       {stop, Reason, State}
+%% Description: Handling all non call/cast messages
+%%--------------------------------------------------------------------
+handle_info(_Info, State) ->
+    {noreply, State}.
+
+%%--------------------------------------------------------------------
+%% Function: terminate(Reason, State) -> void()
+%% Description: This function is called by a gen_server when it is about to
+%% terminate. It should be the opposite of Module:init/1 and do any necessary
+%% cleaning up. When it returns, the gen_server terminates with Reason.
+%% The return value is ignored.
+%%--------------------------------------------------------------------
+terminate(_Reason, State = #buffered_transport{wrapped=Wrapped}) ->
+    thrift_transport:close(Wrapped),
+    ok.
+
+%%--------------------------------------------------------------------
+%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState}
+%% Description: Convert process state when code is changed
+%%--------------------------------------------------------------------
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
 
 %%--------------------------------------------------------------------
 %%% Internal functions

Modified: incubator/thrift/trunk/lib/erl/src/thrift_client.erl
URL: http://svn.apache.org/viewvc/incubator/thrift/trunk/lib/erl/src/thrift_client.erl?rev=990957&r1=990956&r2=990957&view=diff
==============================================================================
--- incubator/thrift/trunk/lib/erl/src/thrift_client.erl (original)
+++ incubator/thrift/trunk/lib/erl/src/thrift_client.erl Mon Aug 30 22:05:00 2010
@@ -19,127 +19,366 @@
 
 -module(thrift_client).
 
+-behaviour(gen_server).
+
 %% API
--export([new/2, call/3, send_call/3, close/1]).
+-export([start_link/2, start_link/3, start_link/4,
+         start/3, start/4,
+         call/3, send_call/3, close/1]).
+
+%% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+         terminate/2, code_change/3]).
+
 
 -include("thrift_constants.hrl").
 -include("thrift_protocol.hrl").
 
--record(tclient, {service, protocol, seqid}).
+-record(state, {service, protocol, seqid}).
+
+%%====================================================================
+%% API
+%%====================================================================
+%%--------------------------------------------------------------------
+%% Function: start_link() -> {ok,Pid} | ignore | {error,Error}
+%% Description: Starts the server as a linked process.
+%%--------------------------------------------------------------------
+start_link(Host, Port, Service) when is_integer(Port), is_atom(Service) ->
+    start_link(Host, Port, Service, []).
+
+start_link(Host, Port, Service, Options) ->
+    start(Host, Port, Service, [{monitor, link} | Options]).
+
+start_link(ProtocolFactory, Service) ->
+    start(ProtocolFactory, Service, [{monitor, link}]).
+
+%%
+%% Splits options into client options, protocol options and transport options
+%%
+%% split_options([Options...]) -> {ClientOptions, ProtocolOptions, TransportOptions}
+%%
+split_options(Options) ->
+    split_options(Options, [], [], []).
+
+split_options([], ClientIn, ProtoIn, TransIn) ->
+    {ClientIn, ProtoIn, TransIn};
+
+split_options([Opt = {OptKey, _} | Rest], ClientIn, ProtoIn, TransIn)
+  when OptKey =:= monitor ->
+    split_options(Rest, [Opt | ClientIn], ProtoIn, TransIn);
+
+split_options([Opt = {OptKey, _} | Rest], ClientIn, ProtoIn, TransIn)
+  when OptKey =:= strict_read;
+       OptKey =:= strict_write ->
+    split_options(Rest, ClientIn, [Opt | ProtoIn], TransIn);
+
+split_options([Opt = {OptKey, _} | Rest], ClientIn, ProtoIn, TransIn)
+  when OptKey =:= framed;
+       OptKey =:= connect_timeout;
+       OptKey =:= sockopts ->
+    split_options(Rest, ClientIn, ProtoIn, [Opt | TransIn]).
+
 
+%%--------------------------------------------------------------------
+%% Function: start() -> {ok,Pid} | ignore | {error,Error}
+%% Description: Starts the server as an unlinked process.
+%%--------------------------------------------------------------------
 
-new(Protocol, Service)
-  when is_atom(Service) ->
-    {ok, #tclient{protocol = Protocol,
-                  service = Service,
-                  seqid = 0}}.
-
--spec call(#tclient{}, atom(), list()) -> {#tclient{}, {ok, any()} | {error, any()}}.
-call(Client = #tclient{}, Function, Args)
-  when is_atom(Function), is_list(Args) ->
-    case send_function_call(Client, Function, Args) of
-        {Client1, ok} ->
-            receive_function_result(Client1, Function);
-        Else ->
-            Else
+%% Backwards-compatible starter for the common-case of socket transports
+start(Host, Port, Service, Options)
+  when is_integer(Port), is_atom(Service), is_list(Options) ->
+    {ClientOpts, ProtoOpts, TransOpts} = split_options(Options),
+
+    {ok, TransportFactory} =
+        thrift_socket_transport:new_transport_factory(Host, Port, TransOpts),
+
+    {ok, ProtocolFactory} = thrift_binary_protocol:new_protocol_factory(
+                              TransportFactory, ProtoOpts),
+
+    start(ProtocolFactory, Service, ClientOpts).
+
+
+%% ProtocolFactory :: fun() -> thrift_protocol()
+start(ProtocolFactory, Service, ClientOpts)
+  when is_function(ProtocolFactory), is_atom(Service) ->
+    {Starter, Opts} =
+        case lists:keysearch(monitor, 1, ClientOpts) of
+            {value, {monitor, link}} ->
+                {start_link, []};
+            {value, {monitor, tether}} ->
+                {start, [{tether, self()}]};
+            _ ->
+                {start, []}
+        end,
+
+    Connect =
+        case lists:keysearch(connect, 1, ClientOpts) of
+            {value, {connect, Choice}} ->
+                Choice;
+            _ ->
+                %% By default, connect at creation-time.
+                true
+        end,
+
+
+    Started = gen_server:Starter(?MODULE, [Service, Opts], []),
+
+    if
+        Connect ->
+            case Started of
+                {ok, Pid} ->
+                    case gen_server:call(Pid, {connect, ProtocolFactory}) of
+                        ok ->
+                            {ok, Pid};
+                        Error ->
+                            Error
+                    end;
+                Else ->
+                    Else
+            end;
+        true ->
+            Started
     end.
 
+call(Client, Function, Args)
+  when is_pid(Client), is_atom(Function), is_list(Args) ->
+    case gen_server:call(Client, {call, Function, Args}) of
+        R = {ok, _} -> R;
+        R = {error, _} -> R;
+        {exception, Exception} -> throw(Exception)
+    end.
+
+cast(Client, Function, Args)
+  when is_pid(Client), is_atom(Function), is_list(Args) ->
+    gen_server:cast(Client, {call, Function, Args}).
 
 %% Sends a function call but does not read the result. This is useful
 %% if you're trying to log non-oneway function calls to write-only
 %% transports like thrift_disk_log_transport.
--spec send_call(#tclient{}, atom(), list()) -> {#tclient{}, ok}.
-send_call(Client = #tclient{}, Function, Args)
-  when is_atom(Function), is_list(Args) ->
-    send_function_call(Client, Function, Args).
-
--spec close(#tclient{}) -> ok.
-close(#tclient{protocol=Protocol}) ->
-    thrift_protocol:close_transport(Protocol).
+send_call(Client, Function, Args)
+  when is_pid(Client), is_atom(Function), is_list(Args) ->
+    gen_server:call(Client, {send_call, Function, Args}).
+
+close(Client) when is_pid(Client) ->
+    gen_server:cast(Client, close).
+
+%%====================================================================
+%% gen_server callbacks
+%%====================================================================
 
+%%--------------------------------------------------------------------
+%% Function: init(Args) -> {ok, State} |
+%%                         {ok, State, Timeout} |
+%%                         ignore               |
+%%                         {stop, Reason}
+%% Description: Initiates the server
+%%--------------------------------------------------------------------
+init([Service, Opts]) ->
+    case lists:keysearch(tether, 1, Opts) of
+        {value, {tether, Pid}} ->
+            erlang:monitor(process, Pid);
+        _Else ->
+            ok
+    end,
+    {ok, #state{service = Service}}.
+
+%%--------------------------------------------------------------------
+%% Function: handle_call(Request, From, State) -> {reply, Reply, State} |
+%%                                      {reply, Reply, State, Timeout} |
+%%                                      {noreply, State} |
+%%                                      {noreply, State, Timeout} |
+%%                                      {stop, Reason, Reply, State} |
+%%                                      {stop, Reason, State}
+%% Description: Handling call messages
+%%--------------------------------------------------------------------
+handle_call({connect, ProtocolFactory}, _From,
+            State = #state{service = Service}) ->
+    case ProtocolFactory() of
+        {ok, Protocol} ->
+            {reply, ok, State#state{protocol = Protocol,
+                                    seqid = 0}};
+        Error ->
+            {stop, normal, Error, State}
+    end;
+
+handle_call({call, Function, Args}, _From, State = #state{service = Service}) ->
+    Result = catch_function_exceptions(
+               fun() ->
+                       ok = send_function_call(State, Function, Args),
+                       receive_function_result(State, Function)
+               end,
+               Service),
+    {reply, Result, State};
+
+
+handle_call({send_call, Function, Args}, _From, State = #state{service = Service}) ->
+    Result = catch_function_exceptions(
+               fun() ->
+                       send_function_call(State, Function, Args)
+               end,
+               Service),
+    {reply, Result, State}.
+
+
+%% Helper function that catches exceptions thrown by sending or receiving
+%% a function and returns the correct response for call or send_call above.
+catch_function_exceptions(Fun, Service) ->
+    try
+        Fun()
+    catch
+        throw:{return, Return} ->
+            Return;
+          error:function_clause ->
+            ST = erlang:get_stacktrace(),
+            case hd(ST) of
+                {Service, function_info, [Function, _]} ->
+                    {error, {no_function, Function}};
+                _ -> throw({error, {function_clause, ST}})
+            end
+    end.
+
+
+%%--------------------------------------------------------------------
+%% Function: handle_cast(Msg, State) -> {noreply, State} |
+%%                                      {noreply, State, Timeout} |
+%%                                      {stop, Reason, State}
+%% Description: Handling cast messages
+%%--------------------------------------------------------------------
+handle_cast({call, Function, Args}, State = #state{service = Service,
+                                                   protocol = Protocol,
+                                                   seqid = SeqId}) ->
+    _Result =
+        try
+            ok = send_function_call(State, Function, Args),
+            receive_function_result(State, Function)
+        catch
+            Class:Reason ->
+                error_logger:error_msg("error ignored in handle_cast({cast,...},...): ~p:~p~n", [Class, Reason])
+        end,
+
+    {noreply, State};
+
+handle_cast(close, State=#state{protocol = Protocol}) ->
+%%     error_logger:info_msg("thrift_client ~p received close", [self()]),
+    {stop,normal,State};
+handle_cast(_Msg, State) ->
+    {noreply, State}.
+
+%%--------------------------------------------------------------------
+%% Function: handle_info(Info, State) -> {noreply, State} |
+%%                                       {noreply, State, Timeout} |
+%%                                       {stop, Reason, State}
+%% Description: Handling all non call/cast messages
+%%--------------------------------------------------------------------
+handle_info({'DOWN', MonitorRef, process, Pid, _Info}, State)
+  when is_reference(MonitorRef), is_pid(Pid) ->
+    %% We don't actually verify the correctness of the DOWN message.
+    {stop, parent_died, State};
+
+handle_info(_Info, State) ->
+    {noreply, State}.
+
+%%--------------------------------------------------------------------
+%% Function: terminate(Reason, State) -> void()
+%% Description: This function is called by a gen_server when it is about to
+%% terminate. It should be the opposite of Module:init/1 and do any necessary
+%% cleaning up. When it returns, the gen_server terminates with Reason.
+%% The return value is ignored.
+%%--------------------------------------------------------------------
+terminate(Reason, State = #state{protocol=undefined}) ->
+    ok;
+terminate(Reason, State = #state{protocol=Protocol}) ->
+    thrift_protocol:close_transport(Protocol),
+    ok.
+
+%%--------------------------------------------------------------------
+%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState}
+%% Description: Convert process state when code is changed
+%%--------------------------------------------------------------------
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
 
 %%--------------------------------------------------------------------
 %%% Internal functions
 %%--------------------------------------------------------------------
--spec send_function_call(#tclient{}, atom(), list()) -> {#tclient{}, ok | {error, any()}}.
-send_function_call(Client = #tclient{protocol = Proto0,
-                                     service  = Service,
-                                     seqid    = SeqId},
+send_function_call(#state{protocol = Proto,
+                          service  = Service,
+                          seqid    = SeqId},
                    Function,
                    Args) ->
     Params = Service:function_info(Function, params_type),
-    case Params of
-        no_function ->
-            {Client, {error, {no_function, Function}}};
-        {struct, PList} when length(PList) =/= length(Args) ->
-            {Client, {error, {bad_args, Function, Args}}};
-        {struct, _PList} ->
-            Begin = #protocol_message_begin{name = atom_to_list(Function),
-                                            type = ?tMessageType_CALL,
-                                            seqid = SeqId},
-            {Proto1, ok} = thrift_protocol:write(Proto0, Begin),
-            {Proto2, ok} = thrift_protocol:write(Proto1, {Params, list_to_tuple([Function | Args])}),
-            {Proto3, ok} = thrift_protocol:write(Proto2, message_end),
-            {Proto4, ok} = thrift_protocol:flush_transport(Proto3),
-            {Client#tclient{protocol = Proto4}, ok}
-    end.
-
--spec receive_function_result(#tclient{}, atom()) -> {#tclient{}, {ok, any()} | {error, any()}}.
-receive_function_result(Client = #tclient{service = Service}, Function) ->
+    {struct, PList} = Params,
+    if
+        length(PList) =/= length(Args) ->
+            throw({return, {error, {bad_args, Function, Args}}});
+        true -> ok
+    end,
+
+    Begin = #protocol_message_begin{name = atom_to_list(Function),
+                                    type = ?tMessageType_CALL,
+                                    seqid = SeqId},
+    ok = thrift_protocol:write(Proto, Begin),
+    ok = thrift_protocol:write(Proto, {Params, list_to_tuple([Function | Args])}),
+    ok = thrift_protocol:write(Proto, message_end),
+    thrift_protocol:flush_transport(Proto),
+    ok.
+
+receive_function_result(State = #state{protocol = Proto,
+                                       service = Service},
+                        Function) ->
     ResultType = Service:function_info(Function, reply_type),
-    read_result(Client, Function, ResultType).
+    read_result(State, Function, ResultType).
 
-read_result(Client, _Function, oneway_void) ->
-    {Client, {ok, ok}};
+read_result(_State,
+            _Function,
+            oneway_void) ->
+    {ok, ok};
 
-read_result(Client = #tclient{protocol = Proto0,
-                              seqid    = SeqId},
+read_result(State = #state{protocol = Proto,
+                           seqid    = SeqId},
             Function,
             ReplyType) ->
-    {Proto1, MessageBegin} = thrift_protocol:read(Proto0, message_begin),
-    NewClient = Client#tclient{protocol = Proto1},
-    case MessageBegin of
+    case thrift_protocol:read(Proto, message_begin) of
         #protocol_message_begin{seqid = RetSeqId} when RetSeqId =/= SeqId ->
-            {NewClient, {error, {bad_seq_id, SeqId}}};
+            {error, {bad_seq_id, SeqId}};
 
         #protocol_message_begin{type = ?tMessageType_EXCEPTION} ->
-            handle_application_exception(NewClient);
+            handle_application_exception(State);
 
         #protocol_message_begin{type = ?tMessageType_REPLY} ->
-            handle_reply(NewClient, Function, ReplyType)
+            handle_reply(State, Function, ReplyType)
     end.
 
-
-handle_reply(Client = #tclient{protocol = Proto0,
-                               service = Service},
+handle_reply(State = #state{protocol = Proto,
+                            service = Service},
              Function,
              ReplyType) ->
     {struct, ExceptionFields} = Service:function_info(Function, exceptions),
     ReplyStructDef = {struct, [{0, ReplyType}] ++ ExceptionFields},
-    {Proto1, {ok, Reply}} = thrift_protocol:read(Proto0, ReplyStructDef),
-    {Proto2, ok} = thrift_protocol:read(Proto1, message_end),
-    NewClient = Client#tclient{protocol = Proto2},
+    {ok, Reply} = thrift_protocol:read(Proto, ReplyStructDef),
     ReplyList = tuple_to_list(Reply),
     true = length(ReplyList) == length(ExceptionFields) + 1,
     ExceptionVals = tl(ReplyList),
     Thrown = [X || X <- ExceptionVals,
                    X =/= undefined],
-    case Thrown of
-        [] when ReplyType == {struct, []} ->
-            {NewClient, {ok, ok}};
-        [] ->
-            {NewClient, {ok, hd(ReplyList)}};
-        [Exception] ->
-            throw({NewClient, {exception, Exception}})
-    end.
-
-handle_application_exception(Client = #tclient{protocol = Proto0}) ->
-    {Proto1, {ok, Exception}} =
-        thrift_protocol:read(Proto0, ?TApplicationException_Structure),
-    {Proto2, ok} = thrift_protocol:read(Proto1, message_end),
+    Result =
+        case Thrown of
+            [] when ReplyType == {struct, []} ->
+                {ok, ok};
+            [] ->
+                {ok, hd(ReplyList)};
+            [Exception] ->
+                {exception, Exception}
+        end,
+    ok = thrift_protocol:read(Proto, message_end),
+    Result.
+
+handle_application_exception(State = #state{protocol = Proto}) ->
+    {ok, Exception} = thrift_protocol:read(Proto,
+                                           ?TApplicationException_Structure),
+    ok = thrift_protocol:read(Proto, message_end),
     XRecord = list_to_tuple(
                 ['TApplicationException' | tuple_to_list(Exception)]),
     error_logger:error_msg("X: ~p~n", [XRecord]),
     true = is_record(XRecord, 'TApplicationException'),
-    NewClient = Client#tclient{protocol = Proto2},
-    throw({NewClient, {exception, XRecord}}).
+    {exception, XRecord}.

Modified: incubator/thrift/trunk/lib/erl/src/thrift_disk_log_transport.erl
URL: http://svn.apache.org/viewvc/incubator/thrift/trunk/lib/erl/src/thrift_disk_log_transport.erl?rev=990957&r1=990956&r2=990957&view=diff
==============================================================================
--- incubator/thrift/trunk/lib/erl/src/thrift_disk_log_transport.erl (original)
+++ incubator/thrift/trunk/lib/erl/src/thrift_disk_log_transport.erl Mon Aug 30 22:05:00 2010
@@ -35,8 +35,6 @@
                        close_on_close = false,
                        sync_every = infinity,
                        sync_tref}).
--type state() :: #dl_transport{}.
--include("thrift_transport_behaviour.hrl").
 
 
 %% Create a transport attached to an already open log.
@@ -49,7 +47,7 @@ new(LogName, Opts) when is_atom(LogName)
     State2 =
         case State#dl_transport.sync_every of
             N when is_integer(N), N > 0 ->
-                {ok, TRef} = timer:apply_interval(N, ?MODULE, force_flush, [State]),
+                {ok, TRef} = timer:apply_interval(N, ?MODULE, force_flush, State),
                 State#dl_transport{sync_tref = TRef};
             _ -> State
         end,
@@ -60,41 +58,38 @@ new(LogName, Opts) when is_atom(LogName)
 parse_opts([], State) ->
     State;
 parse_opts([{close_on_close, Bool} | Rest], State) when is_boolean(Bool) ->
-    parse_opts(Rest, State#dl_transport{close_on_close = Bool});
+    State#dl_transport{close_on_close = Bool};
 parse_opts([{sync_every, Int} | Rest], State) when is_integer(Int), Int > 0 ->
-    parse_opts(Rest, State#dl_transport{sync_every = Int}).
+    State#dl_transport{sync_every = Int}.
 
 
 %%%% TRANSPORT IMPLENTATION %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
 
 %% disk_log_transport is write-only
-read(State, _Len) ->
-    {State, {error, no_read_from_disk_log}}.
+read(_State, Len) ->
+    {error, no_read_from_disk_log}.
 
-write(This = #dl_transport{log = Log}, Data) ->
-    {This, disk_log:balog(Log, erlang:iolist_to_binary(Data))}.
+write(#dl_transport{log = Log}, Data) ->
+    disk_log:balog(Log, erlang:iolist_to_binary(Data)).
 
 force_flush(#dl_transport{log = Log}) ->
     error_logger:info_msg("~p syncing~n", [?MODULE]),
     disk_log:sync(Log).
 
-flush(This = #dl_transport{log = Log, sync_every = SE}) ->
+flush(#dl_transport{log = Log, sync_every = SE}) ->
     case SE of
         undefined -> % no time-based sync
             disk_log:sync(Log);
         _Else ->     % sync will happen automagically
             ok
-    end,
-    {This, ok}.
-
-
+    end.
 
 
 %% On close, close the underlying log if we're configured to do so.
-close(This = #dl_transport{close_on_close = false}) ->
-    {This, ok};
-close(This = #dl_transport{log = Log}) ->
-    {This, disk_log:lclose(Log)}.
+close(#dl_transport{close_on_close = false}) ->
+    ok;
+close(#dl_transport{log = Log}) ->
+    disk_log:lclose(Log).
 
 
 %%%% FACTORY GENERATION %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
@@ -114,10 +109,10 @@ factory_impl(Name, ExtraLogOpts, Transpo
                ExtraLogOpts],
     Log =
         case disk_log:open(LogOpts) of
-            {ok, LogS} ->
-                LogS;
-            {repaired, LogS, Info1, Info2} ->
-                error_logger:info_msg("Disk log ~p repaired: ~p, ~p~n", [LogS, Info1, Info2]),
-                LogS
+            {ok, Log} ->
+                Log;
+            {repaired, Log, Info1, Info2} ->
+                error_logger:info_msg("Disk log ~p repaired: ~p, ~p~n", [Log, Info1, Info2]),
+                Log
         end,
     new(Log, TransportOpts).

Modified: incubator/thrift/trunk/lib/erl/src/thrift_file_transport.erl
URL: http://svn.apache.org/viewvc/incubator/thrift/trunk/lib/erl/src/thrift_file_transport.erl?rev=990957&r1=990956&r2=990957&view=diff
==============================================================================
--- incubator/thrift/trunk/lib/erl/src/thrift_file_transport.erl (original)
+++ incubator/thrift/trunk/lib/erl/src/thrift_file_transport.erl Mon Aug 30 22:05:00 2010
@@ -29,8 +29,6 @@
 -record(t_file_transport, {device,
                            should_close = true,
                            mode = write}).
--type state() :: #t_file_transport{}.
--include("thrift_transport_behaviour.hrl").
 
 %%%% CONSTRUCTION   %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
 
@@ -65,25 +63,25 @@ parse_opts([], State) ->
 
 %%%% TRANSPORT IMPL %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
 
-write(This = #t_file_transport{device = Device, mode = write}, Data) ->
-    {This, file:write(Device, Data)};
-write(This, _D) ->
-    {This, {error, read_mode}}.
+write(#t_file_transport{device = Device, mode = write}, Data) ->
+    file:write(Device, Data);
+write(_T, _D) ->
+    {error, read_mode}.
 
 
-read(This = #t_file_transport{device = Device, mode = read}, Len)
+read(#t_file_transport{device = Device, mode = read}, Len)
   when is_integer(Len), Len >= 0 ->
-    {This, file:read(Device, Len)};
-read(This, _D) ->
-    {This, {error, read_mode}}.
+    file:read(Device, Len);
+read(_T, _D) ->
+    {error, read_mode}.
 
-flush(This = #t_file_transport{device = Device, mode = write}) ->
-    {This, file:sync(Device)}.
+flush(#t_file_transport{device = Device, mode = write}) ->
+    file:sync(Device).
 
-close(This = #t_file_transport{device = Device, should_close = SC}) ->
+close(#t_file_transport{device = Device, should_close = SC}) ->
     case SC of
         true ->
-            {This, file:close(Device)};
+            file:close(Device);
         false ->
-            {This, ok}
+            ok
     end.

Modified: incubator/thrift/trunk/lib/erl/src/thrift_framed_transport.erl
URL: http://svn.apache.org/viewvc/incubator/thrift/trunk/lib/erl/src/thrift_framed_transport.erl?rev=990957&r1=990956&r2=990957&view=diff
==============================================================================
--- incubator/thrift/trunk/lib/erl/src/thrift_framed_transport.erl (original)
+++ incubator/thrift/trunk/lib/erl/src/thrift_framed_transport.erl Mon Aug 30 22:05:00 2010
@@ -19,11 +19,16 @@
 
 -module(thrift_framed_transport).
 
+-behaviour(gen_server).
 -behaviour(thrift_transport).
 
 %% API
 -export([new/1]).
 
+%% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+         terminate/2, code_change/3]).
+
 %% thrift_transport callbacks
 -export([write/2, read/2, flush/1, close/1]).
 
@@ -31,55 +36,102 @@
                            read_buffer, % iolist()
                            write_buffer % iolist()
                           }).
--type state() :: #framed_transport{}.
--include("thrift_transport_behaviour.hrl").
 
+%%====================================================================
+%% API
+%%====================================================================
+%%--------------------------------------------------------------------
+%% Function: new(WrappedTransport) -> {ok, Transport} | ignore | {error,Error}
+%% Description: Starts the server
+%%--------------------------------------------------------------------
 new(WrappedTransport) ->
-    State = #framed_transport{wrapped = WrappedTransport,
-                              read_buffer = [],
-                              write_buffer = []},
-    thrift_transport:new(?MODULE, State).
-
-%% Writes data into the buffer
-write(State = #framed_transport{write_buffer = WBuf}, Data) ->
-    {State#framed_transport{write_buffer = [WBuf, Data]}, ok}.
-
-%% Flushes the buffer through to the wrapped transport
-flush(State0 = #framed_transport{write_buffer = Buffer,
-                                   wrapped = Wrapped0}) ->
-    FrameLen = iolist_size(Buffer),
-    Data     = [<<FrameLen:32/integer-signed-big>>, Buffer],
+    case gen_server:start_link(?MODULE, [WrappedTransport], []) of
+        {ok, Pid} ->
+            thrift_transport:new(?MODULE, Pid);
+        Else ->
+            Else
+    end.
+
+%%--------------------------------------------------------------------
+%% Function: write(Transport, Data) -> ok
+%%
+%% Data = iolist()
+%%
+%% Description: Writes data into the buffer
+%%--------------------------------------------------------------------
+write(Transport, Data) ->
+    gen_server:call(Transport, {write, Data}).
+
+%%--------------------------------------------------------------------
+%% Function: flush(Transport) -> ok
+%%
+%% Description: Flushes the buffer through to the wrapped transport
+%%--------------------------------------------------------------------
+flush(Transport) ->
+    gen_server:call(Transport, flush).
+
+%%--------------------------------------------------------------------
+%% Function: close(Transport) -> ok
+%%
+%% Description: Closes the transport and the wrapped transport
+%%--------------------------------------------------------------------
+close(Transport) ->
+    gen_server:cast(Transport, close).
 
-    {Wrapped1, Response} = thrift_transport:write(Wrapped0, Data),
+%%--------------------------------------------------------------------
+%% Function: read(Transport, Len) -> {ok, Data}
+%%
+%% Data = binary()
+%%
+%% Description: Reads data through from the wrapped transport
+%%--------------------------------------------------------------------
+read(Transport, Len) when is_integer(Len) ->
+    gen_server:call(Transport, {read, Len}).
+
+%%====================================================================
+%% gen_server callbacks
+%%====================================================================
 
-    {Wrapped2, _} = thrift_transport:flush(Wrapped1),
+%%--------------------------------------------------------------------
+%% Function: init(Args) -> {ok, State} |
+%%                         {ok, State, Timeout} |
+%%                         ignore               |
+%%                         {stop, Reason}
+%% Description: Initiates the server
+%%--------------------------------------------------------------------
+init([Wrapped]) ->
+    {ok, #framed_transport{wrapped = Wrapped,
+                           read_buffer = [],
+                           write_buffer = []}}.
 
-    State1 = State0#framed_transport{wrapped = Wrapped2, write_buffer = []},
-    {State1, Response}.
+%%--------------------------------------------------------------------
+%% Function: handle_call(Request, From, State) -> {reply, Reply, State} |
+%%                                      {reply, Reply, State, Timeout} |
+%%                                      {noreply, State} |
+%%                                      {noreply, State, Timeout} |
+%%                                      {stop, Reason, Reply, State} |
+%%                                      {stop, Reason, State}
+%% Description: Handling call messages
+%%--------------------------------------------------------------------
+handle_call({write, Data}, _From, State = #framed_transport{write_buffer = WBuf}) ->
+    {reply, ok, State#framed_transport{write_buffer = [WBuf, Data]}};
 
-%% Closes the transport and the wrapped transport
-close(State = #framed_transport{wrapped = Wrapped0}) ->
-    {Wrapped1, Result} = thrift_transport:close(Wrapped0),
-    NewState = State#framed_transport{wrapped = Wrapped1},
-    {NewState, Result}.
-
-%% Reads data through from the wrapped transport
-read(State0 = #framed_transport{wrapped = Wrapped0, read_buffer = RBuf},
-     Len) when is_integer(Len) ->
-    {Wrapped1, {RBuf1, RBuf1Size}} =
+handle_call({read, Len}, _From, State = #framed_transport{wrapped = Wrapped,
+                                                          read_buffer = RBuf}) ->
+    {RBuf1, RBuf1Size} =
         %% if the read buffer is empty, read another frame
         %% otherwise, just read from what's left in the buffer
         case iolist_size(RBuf) of
             0 ->
                 %% read the frame length
-                {WrappedS1, {ok, <<FrameLen:32/integer-signed-big, _/binary>>}} =
-                    thrift_transport:read(Wrapped0, 4),
+                {ok, <<FrameLen:32/integer-signed-big, _/binary>>} =
+                    thrift_transport:read(Wrapped, 4),
                 %% then read the data
-                {WrappedS2, {ok, Bin}} =
-                    thrift_transport:read(WrappedS1, FrameLen),
-                {WrappedS2, {Bin, erlang:byte_size(Bin)}};
+                {ok, Bin} =
+                    thrift_transport:read(Wrapped, FrameLen),
+                {Bin, erlang:byte_size(Bin)};
             Sz ->
-                {Wrapped0, {RBuf, Sz}}
+                {RBuf, Sz}
         end,
 
     %% pull off Give bytes, return them to the user, leave the rest in the buffer
@@ -87,13 +139,69 @@ read(State0 = #framed_transport{wrapped 
     <<Data:Give/binary, RBuf2/binary>> = iolist_to_binary(RBuf1),
 
     Response = {ok, Data},
-    State1 = State0#framed_transport{wrapped = Wrapped1, read_buffer=RBuf2},
+    State1 = State#framed_transport{read_buffer=RBuf2},
+
+    {reply, Response, State1};
+
+handle_call(flush, _From, State) ->
+    {Response, State1} = do_flush(State),
+    {reply, Response, State1}.
+
+%%--------------------------------------------------------------------
+%% Function: handle_cast(Msg, State) -> {noreply, State} |
+%%                                      {noreply, State, Timeout} |
+%%                                      {stop, Reason, State}
+%% Description: Handling cast messages
+%%--------------------------------------------------------------------
+handle_cast(close, State) ->
+    {_, State1} = do_flush(State),
+    %% Wrapped is closed by terminate/2
+    %%  error_logger:info_msg("thrift_framed_transport ~p: closing", [self()]),
+    {stop, normal, State};
+handle_cast(Msg, State=#framed_transport{}) ->
+    {noreply, State}.
+
+%%--------------------------------------------------------------------
+%% Function: handle_info(Info, State) -> {noreply, State} |
+%%                                       {noreply, State, Timeout} |
+%%                                       {stop, Reason, State}
+%% Description: Handling all non call/cast messages
+%%--------------------------------------------------------------------
+handle_info(_Info, State) ->
+    {noreply, State}.
 
-    {State1, Response}.
+%%--------------------------------------------------------------------
+%% Function: terminate(Reason, State) -> void()
+%% Description: This function is called by a gen_server when it is about to
+%% terminate. It should be the opposite of Module:init/1 and do any necessary
+%% cleaning up. When it returns, the gen_server terminates with Reason.
+%% The return value is ignored.
+%%--------------------------------------------------------------------
+terminate(_Reason, State = #framed_transport{wrapped=Wrapped}) ->
+    thrift_transport:close(Wrapped),
+    ok.
+
+%%--------------------------------------------------------------------
+%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState}
+%% Description: Convert process state when code is changed
+%%--------------------------------------------------------------------
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
 
 %%--------------------------------------------------------------------
-%% Internal functions
+%%% Internal functions
 %%--------------------------------------------------------------------
+do_flush(State = #framed_transport{write_buffer = Buffer,
+                                   wrapped = Wrapped}) ->
+    FrameLen = iolist_size(Buffer),
+    Data     = [<<FrameLen:32/integer-signed-big>>, Buffer],
+
+    Response = thrift_transport:write(Wrapped, Data),
+
+    thrift_transport:flush(Wrapped),
+
+    State1 = State#framed_transport{write_buffer = []},
+    {Response, State1}.
 
 min(A,B) when A<B -> A;
 min(_,B)          -> B.

Modified: incubator/thrift/trunk/lib/erl/src/thrift_http_transport.erl
URL: http://svn.apache.org/viewvc/incubator/thrift/trunk/lib/erl/src/thrift_http_transport.erl?rev=990957&r1=990956&r2=990957&view=diff
==============================================================================
--- incubator/thrift/trunk/lib/erl/src/thrift_http_transport.erl (original)
+++ incubator/thrift/trunk/lib/erl/src/thrift_http_transport.erl Mon Aug 30 22:05:00 2010
@@ -19,11 +19,20 @@
 
 -module(thrift_http_transport).
 
+-behaviour(gen_server).
 -behaviour(thrift_transport).
 
 %% API
 -export([new/2, new/3]).
 
+%% gen_server callbacks
+-export([init/1,
+         handle_call/3,
+         handle_cast/2,
+         handle_info/2,
+         terminate/2,
+         code_change/3]).
+
 %% thrift_transport callbacks
 -export([write/2, read/2, flush/1, close/1]).
 
@@ -34,9 +43,14 @@
                          http_options, % see http(3)
                          extra_headers % [{str(), str()}, ...]
                         }).
--type state() :: pid().
--include("thrift_transport_behaviour.hrl").
 
+%%====================================================================
+%% API
+%%====================================================================
+%%--------------------------------------------------------------------
+%% Function: new() -> {ok, Transport} | ignore | {error,Error}
+%% Description: Starts the server
+%%--------------------------------------------------------------------
 new(Host, Path) ->
     new(Host, Path, _Options = []).
 
@@ -46,6 +60,54 @@ new(Host, Path) ->
 %%   {extra_headers, ExtraHeaders}  = List of extra HTTP headers
 %%--------------------------------------------------------------------
 new(Host, Path, Options) ->
+    case gen_server:start_link(?MODULE, {Host, Path, Options}, []) of
+        {ok, Pid} ->
+            thrift_transport:new(?MODULE, Pid);
+        Else ->
+            Else
+    end.
+
+%%--------------------------------------------------------------------
+%% Function: write(Transport, Data) -> ok
+%%
+%% Data = iolist()
+%%
+%% Description: Writes data into the buffer
+%%--------------------------------------------------------------------
+write(Transport, Data) ->
+    gen_server:call(Transport, {write, Data}).
+
+%%--------------------------------------------------------------------
+%% Function: flush(Transport) -> ok
+%%
+%% Description: Flushes the buffer, making a request
+%%--------------------------------------------------------------------
+flush(Transport) ->
+    gen_server:call(Transport, flush).
+
+%%--------------------------------------------------------------------
+%% Function: close(Transport) -> ok
+%%
+%% Description: Closes the transport
+%%--------------------------------------------------------------------
+close(Transport) ->
+    gen_server:cast(Transport, close).
+
+%%--------------------------------------------------------------------
+%% Function: read(Transport, Len) -> {ok, Data}
+%%
+%% Data = binary()
+%%
+%% Description: Reads up to Len bytes from the read buffer
+%%--------------------------------------------------------------------
+read(Transport, Len) when is_integer(Len) ->
+    gen_server:call(Transport, {read, Len}).
+
+%%====================================================================
+%% gen_server callbacks
+%%====================================================================
+
+init({Host, Path, Options}) ->
     State1 = #http_transport{host = Host,
                              path = Path,
                              read_buffer = [],
@@ -65,17 +127,50 @@ new(Host, Path, Options) ->
         end,
     case lists:foldl(ApplyOption, State1, Options) of
         State2 = #http_transport{} ->
-            thrift_transport:new(?MODULE, State2);
+            {ok, State2};
         Else ->
-            {error, Else}
+            {stop, Else}
     end.
 
-%% Writes data into the buffer
-write(State = #http_transport{write_buffer = WBuf}, Data) ->
-    {State#http_transport{write_buffer = [WBuf, Data]}, ok}.
+handle_call({write, Data}, _From, State = #http_transport{write_buffer = WBuf}) ->
+    {reply, ok, State#http_transport{write_buffer = [WBuf, Data]}};
+
+handle_call({read, Len}, _From, State = #http_transport{read_buffer = RBuf}) ->
+    %% Pull off Give bytes, return them to the user, leave the rest in the buffer.
+    Give = min(iolist_size(RBuf), Len),
+    case iolist_to_binary(RBuf) of
+        <<Data:Give/binary, RBuf1/binary>> ->
+            Response = {ok, Data},
+            State1 = State#http_transport{read_buffer=RBuf1},
+            {reply, Response, State1};
+        _ ->
+            {reply, {error, 'EOF'}, State}
+    end;
+
+handle_call(flush, _From, State) ->
+    {Response, State1} = do_flush(State),
+    {reply, Response, State1}.
 
-%% Flushes the buffer, making a request
-flush(State = #http_transport{host = Host,
+handle_cast(close, State) ->
+    {_, State1} = do_flush(State),
+    {stop, normal, State1};
+
+handle_cast(_Msg, State=#http_transport{}) ->
+    {noreply, State}.
+
+handle_info(_Info, State) ->
+    {noreply, State}.
+
+terminate(_Reason, _State) ->
+    ok.
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+%%--------------------------------------------------------------------
+%%% Internal functions
+%%--------------------------------------------------------------------
+do_flush(State = #http_transport{host = Host,
                                  path = Path,
                                  read_buffer = Rbuf,
                                  write_buffer = Wbuf,
@@ -84,7 +179,7 @@ flush(State = #http_transport{host = Hos
     case iolist_to_binary(Wbuf) of
         <<>> ->
             %% Don't bother flushing empty buffers.
-            {State, ok};
+            {ok, State};
         WBinary ->
             {ok, {{_Version, 200, _ReasonPhrase}, _Headers, Body}} =
               http:request(post,
@@ -97,22 +192,7 @@ flush(State = #http_transport{host = Hos
 
             State1 = State#http_transport{read_buffer = [Rbuf, Body],
                                           write_buffer = []},
-            {State1, ok}
-    end.
-
-close(State) ->
-    {State, ok}.
-
-read(State = #http_transport{read_buffer = RBuf}, Len) when is_integer(Len) ->
-    %% Pull off Give bytes, return them to the user, leave the rest in the buffer.
-    Give = min(iolist_size(RBuf), Len),
-    case iolist_to_binary(RBuf) of
-        <<Data:Give/binary, RBuf1/binary>> ->
-            Response = {ok, Data},
-            State1 = State#http_transport{read_buffer=RBuf1},
-            {State1, Response};
-        _ ->
-            {State, {error, 'EOF'}}
+            {ok, State1}
     end.
 
 min(A,B) when A<B -> A;

Modified: incubator/thrift/trunk/lib/erl/src/thrift_memory_buffer.erl
URL: http://svn.apache.org/viewvc/incubator/thrift/trunk/lib/erl/src/thrift_memory_buffer.erl?rev=990957&r1=990956&r2=990957&view=diff
==============================================================================
--- incubator/thrift/trunk/lib/erl/src/thrift_memory_buffer.erl (original)
+++ incubator/thrift/trunk/lib/erl/src/thrift_memory_buffer.erl Mon Aug 30 22:05:00 2010
@@ -19,43 +19,145 @@
 
 -module(thrift_memory_buffer).
 
+-behaviour(gen_server).
 -behaviour(thrift_transport).
 
 %% API
 -export([new/0, new_transport_factory/0]).
 
+%% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+         terminate/2, code_change/3]).
+
 %% thrift_transport callbacks
 -export([write/2, read/2, flush/1, close/1]).
 
 -record(memory_buffer, {buffer}).
--type state() :: #memory_buffer{}.
--include("thrift_transport_behaviour.hrl").
 
+%%====================================================================
+%% API
+%%====================================================================
 new() ->
-    State = #memory_buffer{buffer = []},
-    thrift_transport:new(?MODULE, State).
+    case gen_server:start_link(?MODULE, [], []) of
+        {ok, Pid} ->
+            thrift_transport:new(?MODULE, Pid);
+        Else ->
+            Else
+    end.
 
 new_transport_factory() ->
     {ok, fun() -> new() end}.
 
-%% Writes data into the buffer
-write(State = #memory_buffer{buffer = Buf}, Data) ->
-    {State#memory_buffer{buffer = [Buf, Data]}, ok}.
+%%--------------------------------------------------------------------
+%% Function: write(Transport, Data) -> ok
+%%
+%% Data = iolist()
+%%
+%% Description: Writes data into the buffer
+%%--------------------------------------------------------------------
+write(Transport, Data) ->
+    gen_server:call(Transport, {write, Data}).
+
+%%--------------------------------------------------------------------
+%% Function: flush(Transport) -> ok
+%%
+%% Description: No-op for the memory buffer; always replies ok
+%%--------------------------------------------------------------------
+flush(Transport) ->
+    gen_server:call(Transport, flush).
+
+%%--------------------------------------------------------------------
+%% Function: close(Transport) -> ok
+%%
+%% Description: Closes the transport by stopping its server process
+%%--------------------------------------------------------------------
+close(Transport) ->
+    gen_server:cast(Transport, close).
+
+%%--------------------------------------------------------------------
+%% Function: read(Transport, Len) -> {ok, Data}
+%%
+%% Data = binary()
+%%
+%% Description: Reads up to Len bytes from the buffer
+%%--------------------------------------------------------------------
+read(Transport, Len) when is_integer(Len) ->
+    gen_server:call(Transport, {read, Len}).
 
-flush(State) ->
-    {State, ok}.
+%%====================================================================
+%% gen_server callbacks
+%%====================================================================
 
-close(State) ->
-    {State, ok}.
+%%--------------------------------------------------------------------
+%% Function: init(Args) -> {ok, State} |
+%%                         {ok, State, Timeout} |
+%%                         ignore               |
+%%                         {stop, Reason}
+%% Description: Initiates the server
+%%--------------------------------------------------------------------
+init([]) ->
+    {ok, #memory_buffer{buffer = []}}.
 
-read(State = #memory_buffer{buffer = Buf}, Len) when is_integer(Len) ->
+%%--------------------------------------------------------------------
+%% Function: handle_call(Request, From, State) -> {reply, Reply, State} |
+%%                                      {reply, Reply, State, Timeout} |
+%%                                      {noreply, State} |
+%%                                      {noreply, State, Timeout} |
+%%                                      {stop, Reason, Reply, State} |
+%%                                      {stop, Reason, State}
+%% Description: Handling call messages
+%%--------------------------------------------------------------------
+handle_call({write, Data}, _From, State = #memory_buffer{buffer = Buf}) ->
+    {reply, ok, State#memory_buffer{buffer = [Buf, Data]}};
+
+handle_call({read, Len}, _From, State = #memory_buffer{buffer = Buf}) ->
     Binary = iolist_to_binary(Buf),
     Give = min(iolist_size(Binary), Len),
     {Result, Remaining} = split_binary(Binary, Give),
-    {State#memory_buffer{buffer = Remaining}, {ok, Result}}.
+    {reply, {ok, Result}, State#memory_buffer{buffer = Remaining}};
+
+handle_call(flush, _From, State) ->
+    {reply, ok, State}.
+
+%%--------------------------------------------------------------------
+%% Function: handle_cast(Msg, State) -> {noreply, State} |
+%%                                      {noreply, State, Timeout} |
+%%                                      {stop, Reason, State}
+%% Description: Handling cast messages
+%%--------------------------------------------------------------------
+handle_cast(close, State) ->
+    {stop, normal, State};
+handle_cast(Msg, State=#memory_buffer{}) ->
+    {noreply, State}.
+
+%%--------------------------------------------------------------------
+%% Function: handle_info(Info, State) -> {noreply, State} |
+%%                                       {noreply, State, Timeout} |
+%%                                       {stop, Reason, State}
+%% Description: Handling all non call/cast messages
+%%--------------------------------------------------------------------
+handle_info(_Info, State) ->
+    {noreply, State}.
+
+%%--------------------------------------------------------------------
+%% Function: terminate(Reason, State) -> void()
+%% Description: This function is called by a gen_server when it is about to
+%% terminate. It should be the opposite of Module:init/1 and do any necessary
+%% cleaning up. When it returns, the gen_server terminates with Reason.
+%% The return value is ignored.
+%%--------------------------------------------------------------------
+terminate(_Reason, _State) ->
+    ok.
+
+%%--------------------------------------------------------------------
+%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState}
+%% Description: Convert process state when code is changed
+%%--------------------------------------------------------------------
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
 
 %%--------------------------------------------------------------------
-%% Internal functions
+%%% Internal functions
 %%--------------------------------------------------------------------
 min(A,B) when A<B -> A;
 min(_,B)          -> B.