You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@thrift.apache.org by mo...@apache.org on 2011/07/12 20:56:15 UTC

svn commit: r1145719 - /thrift/trunk/lib/erl/src/thrift_reconnecting_client.erl

Author: molinaro
Date: Tue Jul 12 18:56:15 2011
New Revision: 1145719

URL: http://svn.apache.org/viewvc?rev=1145719&view=rev
Log:
THRIFT-1236 - adding reconnecting client

Added:
    thrift/trunk/lib/erl/src/thrift_reconnecting_client.erl

Added: thrift/trunk/lib/erl/src/thrift_reconnecting_client.erl
URL: http://svn.apache.org/viewvc/thrift/trunk/lib/erl/src/thrift_reconnecting_client.erl?rev=1145719&view=auto
==============================================================================
--- thrift/trunk/lib/erl/src/thrift_reconnecting_client.erl (added)
+++ thrift/trunk/lib/erl/src/thrift_reconnecting_client.erl Tue Jul 12 18:56:15 2011
@@ -0,0 +1,240 @@
+%%
+%% Licensed to the Apache Software Foundation (ASF) under one
+%% or more contributor license agreements. See the NOTICE file
+%% distributed with this work for additional information
+%% regarding copyright ownership. The ASF licenses this file
+%% to you under the Apache License, Version 2.0 (the
+%% "License"); you may not use this file except in compliance
+%% with the License. You may obtain a copy of the License at
+%%
+%%   http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+
+-module(thrift_reconnecting_client).
+
+-behaviour(gen_server).
+
+%% API
+-export([ call/3,
+          get_stats/1,
+          get_and_reset_stats/1 ]).
+
+-export([ start_link/6 ]).
+
+%% gen_server callbacks
+-export([ init/1,
+          handle_call/3,
+          handle_cast/2,
+          handle_info/2,
+          terminate/2,
+          code_change/3 ]).
+
+-record( state, { client = nil, 
+                  host,
+                  port,
+                  thrift_svc,
+                  thrift_opts,
+                  reconn_min,
+                  reconn_max,
+                  reconn_time,
+                  op_cnt_dict,
+                  op_time_dict } ).
+
+%%====================================================================
+%% API
+%%====================================================================
+%%--------------------------------------------------------------------
+%% Function: start_link() -> {ok,Pid} | ignore | {error,Error}
+%% Description: Starts the server
+%%--------------------------------------------------------------------
+start_link( Host, Port,
+            ThriftSvc, ThriftOpts,
+            ReconnMin, ReconnMax ) ->
+  gen_server:start_link( ?MODULE,
+                         [ Host, Port,
+                           ThriftSvc, ThriftOpts,
+                           ReconnMin, ReconnMax ],
+                         [] ).
+
+call( Pid, Op, Args ) ->
+  gen_server:call( Pid, { call, Op, Args } ).
+
+get_stats( Pid ) ->
+  gen_server:call( Pid, get_stats ).
+
+get_and_reset_stats( Pid ) ->
+  gen_server:call( Pid, get_and_reset_stats ).
+
+%%====================================================================
+%% gen_server callbacks
+%%====================================================================
+
+%%--------------------------------------------------------------------
+%% Function: init(Args) -> {ok, State} |
+%%                         {ok, State, Timeout} |
+%%                         ignore               |
+%%                         {stop, Reason}
+%% Description: Start the server.
+%%--------------------------------------------------------------------
+init( [ Host, Port, TSvc, TOpts, ReconnMin, ReconnMax ] ) ->
+  process_flag( trap_exit, true ),
+
+  State = #state{ host         = Host,
+                  port         = Port,
+                  thrift_svc   = TSvc,
+                  thrift_opts  = TOpts,
+                  reconn_min   = ReconnMin,
+                  reconn_max   = ReconnMax,
+                  op_cnt_dict  = dict:new(),
+                  op_time_dict = dict:new() },
+
+  { ok, try_connect( State ) }.
+
+%%--------------------------------------------------------------------
+%% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} |
+%%                                                   {reply, Reply, State, Timeout} |
+%%                                                   {noreply, State} |
+%%                                                   {noreply, State, Timeout} |
+%%                                                   {stop, Reason, Reply, State} |
+%%                                                   {stop, Reason, State}
+%% Description: Handling call messages
+%%--------------------------------------------------------------------
+handle_call( { call, Op, _ },
+             _From,
+             State = #state{ client = nil } ) ->
+  { reply, { error, noconn }, incr_stats( Op, "failfast", 1, State ) };
+
+handle_call( { call, Op, Args },
+             _From,
+             State=#state{ client = Client } ) ->
+
+  Start = now(),
+  Result = ( catch thrift_client:call( Client, Op, Args) ),
+  Time = timer:now_diff( now(), Start ),
+
+  case Result of
+    { C, { ok, Reply } } ->
+      S = incr_stats( Op, "success", Time, State#state{ client = C } ),
+      { reply, {ok, Reply }, S };
+    { _, { E, Msg } } when E == error; E == exception ->
+      S = incr_stats( Op, "error", Time, try_connect( State ) ),
+      { reply, { E, Msg }, S };
+    Other ->
+      S = incr_stats( Op, "error", Time, try_connect( State ) ),
+      { reply, Other, S }
+  end;
+
+handle_call( get_stats,
+             _From,
+             State = #state{} ) ->
+  { reply, stats( State ), State };
+
+handle_call( get_and_reset_stats,
+             _From,
+             State = #state{} ) ->
+  { reply, stats( State ), reset_stats( State ) }.
+
+%%--------------------------------------------------------------------
+%% Function: handle_cast(Msg, State) -> {noreply, State} |
+%%                                      {noreply, State, Timeout} |
+%%                                      {stop, Reason, State}
+%% Description: Handling cast messages
+%%--------------------------------------------------------------------
+handle_cast( _Msg, State ) ->
+  { noreply, State }.
+
+%%--------------------------------------------------------------------
+%% Function: handle_info(Info, State) -> {noreply, State} |
+%%                                       {noreply, State, Timeout} |
+%%                                       {stop, Reason, State}
+%% Description: Handling all non call/cast messages
+%%--------------------------------------------------------------------
+handle_info( _Info, State ) ->
+  { noreply, State }.
+
+%%--------------------------------------------------------------------
+%% Function: terminate(Reason, State) -> void()
+%% Description: This function is called by a gen_server when it is about to
+%% terminate. It should be the opposite of Module:init/1 and do any necessary
+%% cleaning up. When it returns, the gen_server terminates with Reason.
+%% The return value is ignored.
+%%--------------------------------------------------------------------
+terminate( _Reason, #state{ client = Client } ) ->
+  thrift_client:close( Client ),
+  ok.
+
+%%--------------------------------------------------------------------
+%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState}
+%% Description: Convert process state when code is changed
+%%--------------------------------------------------------------------
+code_change( _OldVsn, State, _Extra ) ->
+  { ok, State }.
+
+%%--------------------------------------------------------------------
+%%% Internal functions
+%%--------------------------------------------------------------------
+try_connect( State = #state{ client      = OldClient,
+                             host        = Host,
+                             port        = Port,
+                             thrift_svc  = TSvc,
+                             thrift_opts = TOpts } ) ->
+
+  case OldClient of
+    nil -> ok;
+    _   -> ( catch thrift_client:close( OldClient ) )
+  end,
+
+  case catch thrift_client_util:new( Host, Port, TSvc, TOpts ) of
+    { ok, Client } ->
+      State#state{ client = Client, reconn_time = 0 };
+    { E, Msg } when E == error; E == exception ->
+      ReconnTime = reconn_time( State ),
+      error_logger:error_msg( "[~w] ~w connect failed (~w), trying again in ~w ms~n",
+                              [ self(), TSvc, Msg, ReconnTime ] ),
+      erlang:send_after( ReconnTime, self(), try_connect ),
+      State#state{ client = nil, reconn_time = ReconnTime }
+  end.
+
+
+reconn_time( #state{ reconn_min = ReconnMin, reconn_time = 0 } ) ->
+  ReconnMin;
+reconn_time( #state{ reconn_max = ReconnMax, reconn_time = ReconnMax } ) ->
+  ReconnMax;
+reconn_time( #state{ reconn_max = ReconnMax, reconn_time = R } ) ->
+  Backoff = 2 * R,
+  case Backoff > ReconnMax of
+    true  -> ReconnMax;
+    false -> Backoff
+  end.
+
+
+incr_stats( Op, Result, Time,
+            State = #state{ op_cnt_dict  = OpCntDict,
+                            op_time_dict = OpTimeDict } ) ->
+  Key = lists:flatten( [ atom_to_list( Op ), [ "_" | Result ] ] ),
+  State#state{ op_cnt_dict  = dict:update_counter( Key, 1, OpCntDict ),
+               op_time_dict = dict:update_counter( Key, Time, OpTimeDict ) }.
+
+
+stats( #state{ thrift_svc   = TSvc,
+               op_cnt_dict  = OpCntDict,
+               op_time_dict = OpTimeDict } ) ->
+  Svc = atom_to_list(TSvc),
+
+  F = fun( Key, Count, Stats ) ->
+        Name = lists:flatten( [ Svc, [ "_" | Key ] ] ),
+        Micros = dict:fetch( Key, OpTimeDict ),
+        [ { Name, Count, Micros } | Stats ]
+      end,
+
+  dict:fold( F, [], OpCntDict ).
+
+reset_stats( State = #state{} ) ->
+  State#state{ op_cnt_dict = dict:new(), op_time_dict = dict:new() }.