You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by dl...@apache.org on 2019/07/23 17:08:31 UTC
[dubbo-erlang] 04/09: dev client pool
This is an automated email from the ASF dual-hosted git repository.
dlive pushed a commit to branch 0.4.0
in repository https://gitbox.apache.org/repos/asf/dubbo-erlang.git
commit 1f64b4b0ca14f3178e1b590b1dd6611cf92f81ab
Author: DLive <xs...@163.com>
AuthorDate: Fri Jun 21 10:28:25 2019 +0800
dev client pool
---
include/dubbo.hrl | 2 +-
...o_netty_client.erl => dubbo_client_default.erl} | 21 +-
src/dubbo_consumer_pool.erl | 305 --------------------
.../dubbo_exchanger.erl | 35 ++-
src/dubbo_invoker_old.erl | 2 +-
src/dubbo_netty_client.erl | 2 +-
src/dubbo_protocol_dubbo.erl | 54 +++-
src/dubbo_provider_consumer_reg_table.erl | 320 ++++++++++++++++++++-
src/dubbo_registry_zookeeper.erl | 2 +-
...r_pool_sup.erl => dubbo_transport_pool_sup.erl} | 13 +-
src/dubbo_zookeeper.erl | 2 +-
src/dubboerl_sup.erl | 4 +-
test/dubbo_consumer_pool_tests.erl | 6 +-
13 files changed, 415 insertions(+), 353 deletions(-)
diff --git a/include/dubbo.hrl b/include/dubbo.hrl
index ad2277a..727c71e 100644
--- a/include/dubbo.hrl
+++ b/include/dubbo.hrl
@@ -102,7 +102,7 @@
-record(interface_list, {interface, pid, connection_info}).
--record(provider_node_list, {host_flag, connection_info}).
+-record(provider_node_list, {host_flag, pid, weight, readonly = false}).
-record(connection_info, {connection_id, pid, weight, host_flag, readonly = false}).
-type dubbo_request() :: #dubbo_request{}.
diff --git a/src/dubbo_netty_client.erl b/src/dubbo_client_default.erl
similarity index 97%
copy from src/dubbo_netty_client.erl
copy to src/dubbo_client_default.erl
index 0181d33..220c30f 100644
--- a/src/dubbo_netty_client.erl
+++ b/src/dubbo_client_default.erl
@@ -14,13 +14,12 @@
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%------------------------------------------------------------------------------
--module(dubbo_netty_client).
+-module(dubbo_client_default).
-behaviour(gen_server).
+
-include("dubbo.hrl").
-%% API
--export([start_link/4]).
%% gen_server callbacks
-export([init/1,
@@ -29,6 +28,8 @@
handle_info/2,
terminate/2,
code_change/3]).
+-export([start_link/1]).
+
-export([check_recv_data/2]).
-define(SERVER, ?MODULE).
@@ -38,7 +39,8 @@
heartbeat = #heartbeat{},
recv_buffer = <<>>, %%从服务端接收的数据
host_flag,
- reconnection_timer
+ reconnection_timer,
+ handler
}).
%%%===================================================================
@@ -51,10 +53,10 @@
%%
%% @end
%%--------------------------------------------------------------------
--spec(start_link(Name :: binary(), HostFlag :: binary(), ProviderConfig :: #provider_config{}, integer()) ->
+-spec(start_link(Name :: binary(), ProviderConfig :: #provider_config{}) ->
{ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
-start_link(Name, HostFlag, ProviderConfig, Index) ->
- gen_server:start_link({local, Name}, ?MODULE, [HostFlag, ProviderConfig, Index], []).
+start_link(ProviderConfig) ->
+ gen_server:start_link(?MODULE, [ProviderConfig], []).
%%%===================================================================
%%% gen_server callbacks
@@ -74,8 +76,7 @@ start_link(Name, HostFlag, ProviderConfig, Index) ->
-spec(init(Args :: term()) ->
{ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
{stop, Reason :: term()} | ignore).
-init([HostFlag, ProviderConfig, Index]) ->
- erlang:process_flag(min_bin_vheap_size, 1024 * 1024),
+init([HostFlag, ProviderConfig]) ->
#provider_config{host = Host, port = Port} = ProviderConfig,
State = case open(Host, Port) of
{ok, Socket} ->
@@ -415,7 +416,7 @@ process_response(true, _ResponseInfo, _RestData, State) ->
{ok, State}.
process_request(true, #dubbo_request{data = <<"R">>}, State) ->
- {ok, _} = dubbo_consumer_pool:update_connection_readonly(self(), true),
+ {ok, _} = dubbo_provider_consumer_reg_table:update_connection_readonly(self(), true),
{ok, State};
process_request(true, Request, State) ->
{ok, NewState} = send_heartbeat_msg(Request#dubbo_request.mid, false, State),
diff --git a/src/dubbo_consumer_pool.erl b/src/dubbo_consumer_pool.erl
deleted file mode 100644
index 0a01d38..0000000
--- a/src/dubbo_consumer_pool.erl
+++ /dev/null
@@ -1,305 +0,0 @@
-%%------------------------------------------------------------------------------
-%% Licensed to the Apache Software Foundation (ASF) under one or more
-%% contributor license agreements. See the NOTICE file distributed with
-%% this work for additional information regarding copyright ownership.
-%% The ASF licenses this file to You under the Apache License, Version 2.0
-%% (the "License"); you may not use this file except in compliance with
-%% the License. You may obtain a copy of the License at
-%%
-%% http://www.apache.org/licenses/LICENSE-2.0
-%%
-%% Unless required by applicable law or agreed to in writing, software
-%% distributed under the License is distributed on an "AS IS" BASIS,
-%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-%% See the License for the specific language governing permissions and
-%% limitations under the License.
-%%------------------------------------------------------------------------------
--module(dubbo_consumer_pool).
-
--behaviour(gen_server).
-
-%% API
--export([start_link/0, start_consumer/2]).
-
-%% gen_server callbacks
--export([init/1,
- handle_call/3,
- handle_cast/2,
- handle_info/2,
- terminate/2,
- code_change/3]).
-
--export([select_connection/1, select_connection/2, update_connection_readonly/2]).
-
--include("dubbo.hrl").
--define(SERVER, ?MODULE).
-
--define(INTERFCE_LIST_TABLE, interface_list).
--define(PROVIDER_NODE_LIST_TABLE, provider_node_list).
-
--record(state, {}).
-
--ifdef(TEST).
--compile([export_all]).
--endif.
-
-
-%%%===================================================================
-%%% API
-%%%===================================================================
-
-%%--------------------------------------------------------------------
-%% @doc
-%% Starts the server
-%%
-%% @end
-%%--------------------------------------------------------------------
--spec(start_link() ->
- {ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
-start_link() ->
- gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
-
-%%%===================================================================
-%%% gen_server callbacks
-%%%===================================================================
-
-%%--------------------------------------------------------------------
-%% @private
-%% @doc
-%% Initializes the server
-%%
-%% @spec init(Args) -> {ok, State} |
-%% {ok, State, Timeout} |
-%% ignore |
-%% {stop, Reason}
-%% @end
-%%--------------------------------------------------------------------
--spec(init(Args :: term()) ->
- {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
- {stop, Reason :: term()} | ignore).
-init([]) ->
- init_ets_table(),
- {ok, #state{}}.
-init_ets_table() ->
- try ets:new(?INTERFCE_LIST_TABLE, [bag, public, named_table, {keypos, 2}]) of
- ?INTERFCE_LIST_TABLE ->
- ok
- catch
- _Type:Reason ->
- logger:error("new ets table error ~p", [Reason]),
- error
- end,
- try ets:new(?PROVIDER_NODE_LIST_TABLE, [bag, public, named_table, {keypos, 2}]) of
- ?PROVIDER_NODE_LIST_TABLE ->
- ok
- catch
- _Type1:Reason1 ->
- logger:error("new ets table error ~p", [Reason1]),
- error
- end,
- ok.
-%%--------------------------------------------------------------------
-%% @private
-%% @doc
-%% Handling call messages
-%%
-%% @end
-%%--------------------------------------------------------------------
--spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()},
- State :: #state{}) ->
- {reply, Reply :: term(), NewState :: #state{}} |
- {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} |
- {noreply, NewState :: #state{}} |
- {noreply, NewState :: #state{}, timeout() | hibernate} |
- {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
- {stop, Reason :: term(), NewState :: #state{}}).
-
-handle_call({add_consumer, Interface, ProviderNodeList}, _From, State) ->
-
- OldProviderList = get_interface_provider_node(Interface),
- NewProviderList = add_consumer(ProviderNodeList, []),
- DeleteProverList = OldProviderList -- NewProviderList,
- clean_invalid_provider(DeleteProverList),
- {reply, ok, State};
-handle_call(_Request, _From, State) ->
- {reply, ok, State}.
-
-%%--------------------------------------------------------------------
-%% @private
-%% @doc
-%% Handling cast messages
-%%
-%% @end
-%%--------------------------------------------------------------------
--spec(handle_cast(Request :: term(), State :: #state{}) ->
- {noreply, NewState :: #state{}} |
- {noreply, NewState :: #state{}, timeout() | hibernate} |
- {stop, Reason :: term(), NewState :: #state{}}).
-handle_cast(_Request, State) ->
- {noreply, State}.
-
-%%--------------------------------------------------------------------
-%% @private
-%% @doc
-%% Handling all non call/cast messages
-%%
-%% @spec handle_info(Info, State) -> {noreply, State} |
-%% {noreply, State, Timeout} |
-%% {stop, Reason, State}
-%% @end
-%%--------------------------------------------------------------------
--spec(handle_info(Info :: timeout() | term(), State :: #state{}) ->
- {noreply, NewState :: #state{}} |
- {noreply, NewState :: #state{}, timeout() | hibernate} |
- {stop, Reason :: term(), NewState :: #state{}}).
-handle_info(_Info, State) ->
- {noreply, State}.
-
-%%--------------------------------------------------------------------
-%% @private
-%% @doc
-%% This function is called by a gen_server when it is about to
-%% terminate. It should be the opposite of Module:init/1 and do any
-%% necessary cleaning up. When it returns, the gen_server terminates
-%% with Reason. The return value is ignored.
-%%
-%% @spec terminate(Reason, State) -> void()
-%% @end
-%%--------------------------------------------------------------------
--spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
- State :: #state{}) -> term()).
-terminate(_Reason, _State) ->
- ok.
-
-%%--------------------------------------------------------------------
-%% @private
-%% @doc
-%% Convert process state when code is changed
-%%
-%% @spec code_change(OldVsn, State, Extra) -> {ok, NewState}
-%% @end
-%%--------------------------------------------------------------------
--spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{},
- Extra :: term()) ->
- {ok, NewState :: #state{}} | {error, Reason :: term()}).
-code_change(_OldVsn, State, _Extra) ->
- {ok, State}.
-
-start_consumer(Interface, ProviderNodeInfo) ->
- gen_server:call(?SERVER, {add_consumer, Interface, ProviderNodeInfo}).
-
-
-%%%===================================================================
-%%% Internal functions
-%%%===================================================================
-add_consumer([], RegisterList) ->
- RegisterList;
-add_consumer([ProviderNodeInfo | ProviderList], RegisterList) ->
- case dubbo_node_config_util:parse_provider_info(ProviderNodeInfo) of
- {ok, ProviderConfig} ->
- HostFlag = get_host_flag(ProviderConfig),
- case ets:lookup(?PROVIDER_NODE_LIST_TABLE, HostFlag) of
- [] ->
- ConnectionList = start_provider_process(HostFlag, 30, ProviderConfig),
- ok = update_connection_info(ProviderConfig#provider_config.interface, HostFlag, ConnectionList, true),
- ok;
- List ->
- List2 = lists:map(fun(#provider_node_list{connection_info = ConnectionItem}) ->
- ConnectionItem
- end, List),
- ok = update_connection_info(ProviderConfig#provider_config.interface, HostFlag, List2, false),
- ok
- end,
- add_consumer(ProviderList, [HostFlag] ++ RegisterList);
- {error, R1} ->
- logger:error("parse provider info error reason ~p", [R1]),
- add_consumer(ProviderList, RegisterList)
- end.
-
-start_provider_process(HostFlag, Weight, ProviderConfig) ->
- ExecutesList = lists:seq(1, ProviderConfig#provider_config.executes),
- ConnectionList = lists:map(fun(Item) ->
- ConnectionFlag = <<HostFlag/binary, (integer_to_binary(Item))/binary>>,
- ConnectionFlagTerm = binary_to_atom(ConnectionFlag, utf8),
- AChild = {ConnectionFlagTerm, {dubbo_netty_client, start_link, [ConnectionFlagTerm, HostFlag, ProviderConfig, Item]}, permanent, 2000, worker, [dubbo_netty_client]},
- {ok, Pid} = dubbo_consumer_pool_sup:add_children(AChild),
- logger:info("start provider ~p pid info ~p~n", [HostFlag, Pid]),
- #connection_info{connection_id = ConnectionFlagTerm, pid = Pid, weight = Weight, host_flag = HostFlag}
- end, ExecutesList),
- ConnectionList.
-get_host_flag(ProviderConfig) ->
- HostFlag = <<(list_to_binary(ProviderConfig#provider_config.host))/binary, <<"_">>/binary, (integer_to_binary(ProviderConfig#provider_config.port))/binary>>,
- HostFlag.
-
-update_connection_info(Interface, HostFlag, ConnectionList, IsUpdateProvideNode) ->
- lists:map(fun(Item) ->
- I1 = ets:insert(?INTERFCE_LIST_TABLE, #interface_list{interface = Interface, pid = Item#connection_info.pid, connection_info = Item}),
- logger:debug("insert interface conection info ~p ~p ~p", [Interface, Item#connection_info.pid, I1]),
- case IsUpdateProvideNode of
- true ->
- I2 = ets:insert(?PROVIDER_NODE_LIST_TABLE, #provider_node_list{host_flag = HostFlag, connection_info = Item}),
- logger:debug("insert PROVIDER_NODE_LIST_TABLE ~p info:~p", [HostFlag, I2]);
- false ->
- ok
- end,
- ok
- end, ConnectionList),
- ok.
-
-get_interface_provider_node(Interface) ->
- case ets:lookup(?INTERFCE_LIST_TABLE, Interface) of
- [] ->
- [];
- List ->
- ListRet = [Item#interface_list.connection_info#connection_info.host_flag || Item <- List],
- dubbo_lists_util:del_duplicate(ListRet)
- end.
-
-select_connection(Interface) ->
- RandNum = rand:uniform(2048),
- select_connection(Interface, RandNum).
-select_connection(Interface, RandNum) ->
- case ets:lookup(?INTERFCE_LIST_TABLE, Interface) of
- [] ->
- {error, none};
- List ->
- Len = length(List),
- RemNum = (RandNum rem Len) + 1,
- InterfaceListItem = lists:nth(RemNum, List),
- {ok, InterfaceListItem#interface_list.connection_info}
- end.
-
--spec(update_connection_readonly(pid(), boolean()) -> ok).
-update_connection_readonly(ConnectionPid, Readonly) ->
- Pattern = #interface_list{pid = ConnectionPid, _ = '_'},
- Objects = ets:match_object(?INTERFCE_LIST_TABLE, Pattern),
- lists:map(fun(#interface_list{interface = Interface, pid = Pid, connection_info = ConnectionInfo} = InterferConnection) ->
- logger:debug("[dubbo] update interface ~p ~p readonly", [Interface, Pid]),
- NewConnectionInfo = ConnectionInfo#connection_info{readonly = Readonly},
- NewObject = InterferConnection#interface_list{connection_info = NewConnectionInfo},
- ets:delete_object(?INTERFCE_LIST_TABLE, InterferConnection),
- ets:insert(?INTERFCE_LIST_TABLE, NewObject)
- end, Objects),
- {ok, length(Objects)}.
-
-clean_invalid_provider([]) ->
- ok;
-clean_invalid_provider([HostFlag | DeleteProverList]) ->
- case ets:lookup(?PROVIDER_NODE_LIST_TABLE, HostFlag) of
- [] ->
- ok;
- ProviderNodeList ->
- ProviderNodeList1 = dubbo_lists_util:del_duplicate(ProviderNodeList),
- clean_connection_info(ProviderNodeList1)
- end,
- clean_invalid_provider(DeleteProverList).
-
-clean_connection_info(ProviderNodeList) ->
- lists:map(fun(Item) ->
- Pid = Item#provider_node_list.connection_info#connection_info.pid,
- ConnectionId = Item#provider_node_list.connection_info#connection_info.connection_id,
- Pattern = #interface_list{pid = Pid, _ = '_'},
- ets:delete_object(?INTERFCE_LIST_TABLE, Pattern),
- dubbo_consumer_pool_sup:stop_children(ConnectionId)
- end, ProviderNodeList),
- ok.
\ No newline at end of file
diff --git a/test/dubbo_consumer_pool_tests.erl b/src/dubbo_exchanger.erl
similarity index 57%
copy from test/dubbo_consumer_pool_tests.erl
copy to src/dubbo_exchanger.erl
index 740ed84..09a4833 100644
--- a/test/dubbo_consumer_pool_tests.erl
+++ b/src/dubbo_exchanger.erl
@@ -14,20 +14,27 @@
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%------------------------------------------------------------------------------
--module(dubbo_consumer_pool_tests).
--author("dlive").
+-module(dubbo_exchanger).
--include_lib("eunit/include/eunit.hrl").
-include("dubbo.hrl").
-update_readonly_test() ->
- dubbo_consumer_pool:start_link(),
- InterfaceName= <<"testinterfacename">>,
- HostFalg= <<"127.0.0.1/20880">>,
- ConnectionList = [
- #connection_info{connection_id=1,pid= testpid,weight = 30,host_flag = HostFalg},
- #connection_info{connection_id=2,pid= testpid2,weight = 30,host_flag = HostFalg}
- ],
- dubbo_consumer_pool:update_connection_info(InterfaceName,HostFalg,ConnectionList,true),
- {ok,Size} = dubbo_consumer_pool:update_connection_readonly(testpid,false),
- ?assertEqual(1,Size).
+%% API
+-export([connect/2]).
+
+connect(Url,Handler) ->
+ case dubbo_node_config_util:parse_provider_info(Url) of
+ {ok, ProviderConfig} ->
+ HostFlag= dubbo_provider_consumer_reg_table:get_host_flag(ProviderConfig),
+ {ok, Pid} = dubbo_transport_pool_sup:add_children(ProviderConfig,Handler),
+ logger:info("start provider ~p pid info ~p~n", [HostFlag, Pid]),
+ {ok,#connection_info{ pid = Pid, weight = get_weight(ProviderConfig), host_flag = HostFlag}};
+ {error, R1} ->
+ logger:error("parse provider info error reason ~p", [R1]),
+ {error,R1}
+ end.
+
+
+
+get_weight(_ProviderConfig)->
+ %% todo get weight from provider info
+ 30.
\ No newline at end of file
diff --git a/src/dubbo_invoker_old.erl b/src/dubbo_invoker_old.erl
index c878656..354cef9 100644
--- a/src/dubbo_invoker_old.erl
+++ b/src/dubbo_invoker_old.erl
@@ -40,7 +40,7 @@ invoke_request(Interface, Request, RequestOption) ->
{ok, reference(), Data :: any(), RpcContent :: list()}|
{error, Reason :: timeout|no_provider|request_full|any()}.
invoke_request(Interface, Request, RpcContext, RequestState, CallBackPid) ->
- case dubbo_consumer_pool:select_connection(Interface, Request#dubbo_request.mid) of
+ case dubbo_provider_consumer_reg_table:select_connection(Interface, Request#dubbo_request.mid) of
{ok, #connection_info{pid = Pid, host_flag = HostFlag}} ->
case dubbo_traffic_control:check_goon(HostFlag, 199) of
ok ->
diff --git a/src/dubbo_netty_client.erl b/src/dubbo_netty_client.erl
index 0181d33..06f9d1e 100644
--- a/src/dubbo_netty_client.erl
+++ b/src/dubbo_netty_client.erl
@@ -415,7 +415,7 @@ process_response(true, _ResponseInfo, _RestData, State) ->
{ok, State}.
process_request(true, #dubbo_request{data = <<"R">>}, State) ->
- {ok, _} = dubbo_consumer_pool:update_connection_readonly(self(), true),
+ {ok, _} = dubbo_provider_consumer_reg_table:update_connection_readonly(self(), true),
{ok, State};
process_request(true, Request, State) ->
{ok, NewState} = send_heartbeat_msg(Request#dubbo_request.mid, false, State),
diff --git a/src/dubbo_protocol_dubbo.erl b/src/dubbo_protocol_dubbo.erl
index 06c36e6..1ede1d8 100644
--- a/src/dubbo_protocol_dubbo.erl
+++ b/src/dubbo_protocol_dubbo.erl
@@ -17,19 +17,61 @@
-module(dubbo_protocol_dubbo).
-include("dubboerl.hrl").
+-include("dubbo.hrl").
%% API
-export([refer/2]).
-refer(Url,Acc)->
- {ok,UrlInfo} = dubbo_common_fun:parse_url(Url),
+refer(Url, Acc) ->
+ {ok, UrlInfo} = dubbo_common_fun:parse_url(Url),
case UrlInfo#dubbo_url.scheme of
<<"dubbo">> ->
- {ok,todo};
+ do_refer(UrlInfo),
+ {ok, todo};
_ ->
- {skip,Acc}
+ {skip, Acc}
end.
-do_refer(UrlInfo)->
+do_refer(UrlInfo) ->
+
+ ok.
+
+
+getClients(ProviderUrl) ->
+ case new_transport(ProviderUrl) of
+ {ok,ConnectionInfoList} ->
+ ConnectionList = start_provider_process(HostFlag, 30, ProviderConfig),
+ ok;
+ {error,Reason} ->
+ {error,Reason}
+ end.
+
+
+
+%%ok = update_connection_info(ProviderConfig#provider_config.interface, HostFlag, ConnectionList, true),
+
+
+new_transport(ProviderUrl)->
+ case dubbo_node_config_util:parse_provider_info(ProviderUrl) of
+ {ok, ProviderConfig} ->
+ HostFlag = get_host_flag(ProviderConfig),
+ case dubbo_provider_consumer_reg_table:get_host_connections(ProviderConfig#provider_config) of
+ [] ->
+ case dubbo_exchanger:connect(ProviderUrl,?MODULE) of
+ {ok,ConnectionInfo} ->
+ {ok,[ConnectionInfo]};
+ {error,Reason} ->
+ logger:warning("start client fail ~p ~p",[Reason,HostFlag]),
+ {error,Reason}
+ end;
+ ConnectionInfoList ->
+ {ok,ConnectionInfoList}
+ end;
+ {error, R1} ->
+ logger:error("parse provider info error reason ~p", [R1]),
+ {error,R1}
+ end.
+
+
+
- ok.
\ No newline at end of file
diff --git a/src/dubbo_provider_consumer_reg_table.erl b/src/dubbo_provider_consumer_reg_table.erl
index 3386cdc..c7a8dfa 100644
--- a/src/dubbo_provider_consumer_reg_table.erl
+++ b/src/dubbo_provider_consumer_reg_table.erl
@@ -15,7 +15,323 @@
%% limitations under the License.
%%------------------------------------------------------------------------------
-module(dubbo_provider_consumer_reg_table).
--author("dlive").
+
+-behaviour(gen_server).
%% API
--export([]).
+-export([start_link/0, start_consumer/2]).
+
+%% gen_server callbacks
+-export([init/1,
+ handle_call/3,
+ handle_cast/2,
+ handle_info/2,
+ terminate/2,
+ code_change/3]).
+
+-export([update_consumer_connections/2,get_host_connections/2, select_connection/1, select_connection/2, update_connection_readonly/2, get_host_flag/1, get_host_flag/2]).
+
+-include("dubbo.hrl").
+-define(SERVER, ?MODULE).
+
+-define(INTERFCE_LIST_TABLE, interface_list).
+-define(PROVIDER_NODE_LIST_TABLE, provider_node_list).
+
+-record(state, {}).
+
+-ifdef(TEST).
+-compile([export_all]).
+-endif.
+
+
+%%%===================================================================
+%%% API
+%%%===================================================================
+
+%%--------------------------------------------------------------------
+%% @doc
+%% Starts the server
+%%
+%% @end
+%%--------------------------------------------------------------------
+-spec(start_link() ->
+ {ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
+start_link() ->
+ gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
+
+%%%===================================================================
+%%% gen_server callbacks
+%%%===================================================================
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% Initializes the server
+%%
+%% @spec init(Args) -> {ok, State} |
+%% {ok, State, Timeout} |
+%% ignore |
+%% {stop, Reason}
+%% @end
+%%--------------------------------------------------------------------
+-spec(init(Args :: term()) ->
+ {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
+ {stop, Reason :: term()} | ignore).
+init([]) ->
+ init_ets_table(),
+ {ok, #state{}}.
+init_ets_table() ->
+ try ets:new(?INTERFCE_LIST_TABLE, [bag, public, named_table, {keypos, 2}]) of
+ ?INTERFCE_LIST_TABLE ->
+ ok
+ catch
+ _Type:Reason ->
+ logger:error("new ets table error ~p", [Reason]),
+ error
+ end,
+ try ets:new(?PROVIDER_NODE_LIST_TABLE, [bag, public, named_table, {keypos, 2}]) of
+ ?PROVIDER_NODE_LIST_TABLE ->
+ ok
+ catch
+ _Type1:Reason1 ->
+ logger:error("new ets table error ~p", [Reason1]),
+ error
+ end,
+ ok.
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% Handling call messages
+%%
+%% @end
+%%--------------------------------------------------------------------
+-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()},
+ State :: #state{}) ->
+ {reply, Reply :: term(), NewState :: #state{}} |
+ {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} |
+ {noreply, NewState :: #state{}} |
+ {noreply, NewState :: #state{}, timeout() | hibernate} |
+ {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
+ {stop, Reason :: term(), NewState :: #state{}}).
+
+handle_call({add_consumer, Interface, ProviderNodeList}, _From, State) ->
+
+ OldProviderList = get_interface_provider_node(Interface),
+ NewProviderList = add_consumer(ProviderNodeList, []),
+ DeleteProverList = OldProviderList -- NewProviderList,
+ clean_invalid_provider(DeleteProverList),
+ {reply, ok, State};
+handle_call(_Request, _From, State) ->
+ {reply, ok, State}.
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% Handling cast messages
+%%
+%% @end
+%%--------------------------------------------------------------------
+-spec(handle_cast(Request :: term(), State :: #state{}) ->
+ {noreply, NewState :: #state{}} |
+ {noreply, NewState :: #state{}, timeout() | hibernate} |
+ {stop, Reason :: term(), NewState :: #state{}}).
+handle_cast(_Request, State) ->
+ {noreply, State}.
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% Handling all non call/cast messages
+%%
+%% @spec handle_info(Info, State) -> {noreply, State} |
+%% {noreply, State, Timeout} |
+%% {stop, Reason, State}
+%% @end
+%%--------------------------------------------------------------------
+-spec(handle_info(Info :: timeout() | term(), State :: #state{}) ->
+ {noreply, NewState :: #state{}} |
+ {noreply, NewState :: #state{}, timeout() | hibernate} |
+ {stop, Reason :: term(), NewState :: #state{}}).
+handle_info(_Info, State) ->
+ {noreply, State}.
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% This function is called by a gen_server when it is about to
+%% terminate. It should be the opposite of Module:init/1 and do any
+%% necessary cleaning up. When it returns, the gen_server terminates
+%% with Reason. The return value is ignored.
+%%
+%% @spec terminate(Reason, State) -> void()
+%% @end
+%%--------------------------------------------------------------------
+-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
+ State :: #state{}) -> term()).
+terminate(_Reason, _State) ->
+ ok.
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% Convert process state when code is changed
+%%
+%% @spec code_change(OldVsn, State, Extra) -> {ok, NewState}
+%% @end
+%%--------------------------------------------------------------------
+-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{},
+ Extra :: term()) ->
+ {ok, NewState :: #state{}} | {error, Reason :: term()}).
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+start_consumer(Interface, ProviderNodeInfo) ->
+ gen_server:call(?SERVER, {add_consumer, Interface, ProviderNodeInfo}).
+
+
+
+get_host_connections(Host, Port) ->
+ HostFlag = get_host_flag(Host, Port),
+ List = ets:lookup(?PROVIDER_NODE_LIST_TABLE, HostFlag),
+ List2 = lists:map(
+ fun(#provider_node_list{host_flag = HostFlag,pid = Pid,readonly = Readonly}) ->
+ #connection_info{host_flag = HostFlag,pid = Pid,readonly = Readonly}
+ end, List),
+ List2.
+
+
+%%%===================================================================
+%%% Internal functions
+%%%===================================================================
+%%add_consumer([], RegisterList) ->
+%% RegisterList;
+%%add_consumer([ProviderNodeInfo | ProviderList], RegisterList) ->
+%% case dubbo_node_config_util:parse_provider_info(ProviderNodeInfo) of
+%% {ok, ProviderConfig} ->
+%% HostFlag = get_host_flag(ProviderConfig),
+%% case ets:lookup(?PROVIDER_NODE_LIST_TABLE, HostFlag) of
+%% [] ->
+%% ConnectionList = start_provider_process(HostFlag, 30, ProviderConfig),
+%% ok = update_connection_info(ProviderConfig#provider_config.interface, HostFlag, ConnectionList, true),
+%% ok;
+%% List ->
+%% List2 = lists:map(fun(#provider_node_list{connection_info = ConnectionItem}) ->
+%% ConnectionItem
+%% end, List),
+%% ok = update_connection_info(ProviderConfig#provider_config.interface, HostFlag, List2, false),
+%% ok
+%% end,
+%% add_consumer(ProviderList, [HostFlag] ++ RegisterList);
+%% {error, R1} ->
+%% logger:error("parse provider info error reason ~p", [R1]),
+%% add_consumer(ProviderList, RegisterList)
+%% end.
+%%
+%%start_provider_process(HostFlag, Weight, ProviderConfig) ->
+%% ExecutesList = lists:seq(1, ProviderConfig#provider_config.executes),
+%% ConnectionList = lists:map(fun(Item) ->
+%% ConnectionFlag = <<HostFlag/binary, (integer_to_binary(Item))/binary>>,
+%% ConnectionFlagTerm = binary_to_atom(ConnectionFlag, utf8),
+%% AChild = {ConnectionFlagTerm, {dubbo_netty_client, start_link, [ConnectionFlagTerm, HostFlag, ProviderConfig, Item]}, permanent, 2000, worker, [dubbo_netty_client]},
+%% {ok, Pid} = dubbo_transport_pool_sup:add_children(AChild),
+%% logger:info("start provider ~p pid info ~p~n", [HostFlag, Pid]),
+%% #connection_info{connection_id = ConnectionFlagTerm, pid = Pid, weight = Weight, host_flag = HostFlag}
+%% end, ExecutesList),
+%% ConnectionList.
+
+update_consumer_connections(Interface, Connections) ->
+ lists:map(
+ fun(Item) ->
+ HostFlag= Item#connection_info.host_flag,
+
+ case ets:lookup_element(?PROVIDER_NODE_LIST_TABLE,#provider_node_list{host_flag = HostFlag,pid = Item#connection_info.pid,_="_"}) of
+ '$end_of_table' ->
+ I2 = ets:insert(?PROVIDER_NODE_LIST_TABLE, #provider_node_list{host_flag = HostFlag,pid = Item#connection_info.pid}),
+ logger:debug("insert PROVIDER_NODE_LIST_TABLE ~p info:~p", [HostFlag, I2]);
+ {_ObjectList,_Continuation} ->
+ ok
+ end,
+ I1 = ets:insert(?INTERFCE_LIST_TABLE, #interface_list{interface = Interface, pid = Item#connection_info.pid, connection_info = Item}),
+ logger:debug("insert interface conection info ~p ~p ~p", [Interface, Item#connection_info.pid, I1]),
+ ok
+ end, Connections),
+ ok.
+
+get_host_flag(ProviderConfig) ->
+ HostFlag = <<(list_to_binary(ProviderConfig#provider_config.host))/binary, <<"_">>/binary, (integer_to_binary(ProviderConfig#provider_config.port))/binary>>,
+ HostFlag.
+get_host_flag(Host, Port) ->
+ <<(list_to_binary(Host))/binary, <<"_">>/binary, (integer_to_binary(Port))/binary>>.
+
+update_connection_info(Interface, HostFlag, ConnectionList, IsUpdateProvideNode) ->
+ lists:map(fun(Item) ->
+ I1 = ets:insert(?INTERFCE_LIST_TABLE, #interface_list{interface = Interface, pid = Item#connection_info.pid, connection_info = Item}),
+ logger:debug("insert interface conection info ~p ~p ~p", [Interface, Item#connection_info.pid, I1]),
+ case IsUpdateProvideNode of
+ true ->
+ I2 = ets:insert(?PROVIDER_NODE_LIST_TABLE, #provider_node_list{host_flag = HostFlag, connection_info = Item}),
+ logger:debug("insert PROVIDER_NODE_LIST_TABLE ~p info:~p", [HostFlag, I2]);
+ false ->
+ ok
+ end,
+ ok
+ end, ConnectionList),
+ ok.
+
+get_interface_provider_node(Interface) ->
+ case ets:lookup(?INTERFCE_LIST_TABLE, Interface) of
+ [] ->
+ [];
+ List ->
+ ListRet = [Item#interface_list.connection_info#connection_info.host_flag || Item <- List],
+ dubbo_lists_util:del_duplicate(ListRet)
+ end.
+
+select_connection(Interface) ->
+ RandNum = rand:uniform(2048),
+ select_connection(Interface, RandNum).
+select_connection(Interface, RandNum) ->
+ case ets:lookup(?INTERFCE_LIST_TABLE, Interface) of
+ [] ->
+ {error, none};
+ List ->
+ Len = length(List),
+ RemNum = (RandNum rem Len) + 1,
+ InterfaceListItem = lists:nth(RemNum, List),
+ {ok, InterfaceListItem#interface_list.connection_info}
+ end.
+
+-spec(update_connection_readonly(pid(), boolean()) -> ok).
+update_connection_readonly(ConnectionPid, Readonly) ->
+ Pattern = #interface_list{pid = ConnectionPid, _ = '_'},
+ Objects = ets:match_object(?INTERFCE_LIST_TABLE, Pattern),
+ lists:map(fun(#interface_list{interface = Interface, pid = Pid, connection_info = ConnectionInfo} = InterferConnection) ->
+ logger:debug("[dubbo] update interface ~p ~p readonly", [Interface, Pid]),
+ NewConnectionInfo = ConnectionInfo#connection_info{readonly = Readonly},
+ NewObject = InterferConnection#interface_list{connection_info = NewConnectionInfo},
+ ets:delete_object(?INTERFCE_LIST_TABLE, InterferConnection),
+ ets:insert(?INTERFCE_LIST_TABLE, NewObject)
+ end, Objects),
+ {ok, length(Objects)}.
+
+clean_invalid_provider([]) ->
+ ok;
+clean_invalid_provider([HostFlag | DeleteProverList]) ->
+ case ets:lookup(?PROVIDER_NODE_LIST_TABLE, HostFlag) of
+ [] ->
+ ok;
+ ProviderNodeList ->
+ ProviderNodeList1 = dubbo_lists_util:del_duplicate(ProviderNodeList),
+ clean_connection_info(ProviderNodeList1)
+ end,
+ clean_invalid_provider(DeleteProverList).
+
+clean_connection_info(ProviderNodeList) ->
+ lists:map(fun(Item) ->
+ Pid = Item#provider_node_list.connection_info#connection_info.pid,
+ ConnectionId = Item#provider_node_list.connection_info#connection_info.connection_id,
+ Pattern = #interface_list{pid = Pid, _ = '_'},
+ ets:delete_object(?INTERFCE_LIST_TABLE, Pattern),
+ dubbo_transport_pool_sup:stop_children(ConnectionId)
+ end, ProviderNodeList),
+ ok.
\ No newline at end of file
diff --git a/src/dubbo_registry_zookeeper.erl b/src/dubbo_registry_zookeeper.erl
index 0b5b3f5..5d14588 100644
--- a/src/dubbo_registry_zookeeper.erl
+++ b/src/dubbo_registry_zookeeper.erl
@@ -311,4 +311,4 @@ gen_consumer_node_info(Consumer) ->
%%dubbo_zookeeper:register_consumer(<<"com.ifcoder.abcd">>,[]).
start_provider_process(Interface, ProviderList) ->
- dubbo_consumer_pool:start_consumer(Interface, ProviderList).
\ No newline at end of file
+ dubbo_provider_consumer_reg_table:start_consumer(Interface, ProviderList).
\ No newline at end of file
diff --git a/src/dubbo_consumer_pool_sup.erl b/src/dubbo_transport_pool_sup.erl
similarity index 87%
rename from src/dubbo_consumer_pool_sup.erl
rename to src/dubbo_transport_pool_sup.erl
index 77a6dbe..019c57d 100644
--- a/src/dubbo_consumer_pool_sup.erl
+++ b/src/dubbo_transport_pool_sup.erl
@@ -14,12 +14,12 @@
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%------------------------------------------------------------------------------
--module(dubbo_consumer_pool_sup).
+-module(dubbo_transport_pool_sup).
-behaviour(supervisor).
%% API
--export([start_link/0, add_children/1, stop_children/1]).
+-export([start_link/0, add_children/2, stop_children/1]).
%% Supervisor callbacks
-export([init/1]).
@@ -63,17 +63,18 @@ start_link() ->
ignore |
{error, Reason :: term()}).
init([]) ->
- RestartStrategy = one_for_one,
+ RestartStrategy = simple_one_for_one,
MaxRestarts = 1000,
MaxSecondsBetweenRestarts = 3600,
SupFlags = {RestartStrategy, MaxRestarts, MaxSecondsBetweenRestarts},
+ Child = {dubbo_client_default, {dubbo_client_default, start_link, []}, permanent, 2000, worker, [dubbo_client_default]},
+ {ok, {SupFlags, [Child]}}.
- {ok, {SupFlags, []}}.
+add_children(ProvideConfig, Handler) ->
+ supervisor:start_child(?SERVER, [ProvideConfig, Handler]).
-add_children(ChildSpec) ->
- supervisor:start_child(?SERVER, ChildSpec).
stop_children(ChildID) ->
supervisor:terminate_child(?SERVER, ChildID).
%%%===================================================================
diff --git a/src/dubbo_zookeeper.erl b/src/dubbo_zookeeper.erl
index f62ace1..84a95d6 100644
--- a/src/dubbo_zookeeper.erl
+++ b/src/dubbo_zookeeper.erl
@@ -272,5 +272,5 @@ gen_consumer_node_info(Consumer) ->
%%dubbo_zookeeper:register_consumer(<<"com.ifcoder.abcd">>,[]).
start_provider_process(Interface, ProviderList) ->
- dubbo_consumer_pool:start_consumer(Interface, ProviderList).
+ dubbo_provider_consumer_reg_table:start_consumer(Interface, ProviderList).
diff --git a/src/dubboerl_sup.erl b/src/dubboerl_sup.erl
index a829015..a0d2fb0 100644
--- a/src/dubboerl_sup.erl
+++ b/src/dubboerl_sup.erl
@@ -45,8 +45,8 @@ init([]) ->
%% NettySer = {dubbo_netty_client,{dubbo_netty_client, start_link, []},transient,5000,worker,[dubbo_netty_client]},
Id_count = {dubbo_id_generator, {dubbo_id_generator, start_link, []}, transient, 5000, worker, [dubbo_id_generator]},
ProviderPoolSup = {dubbo_provider_worker_sup, {dubbo_provider_worker_sup, start_link, []}, transient, 5000, supervisor, [dubbo_provider_worker_sup]},
- ConsumerPoolSup = {dubbo_consumer_pool_sup, {dubbo_consumer_pool_sup, start_link, []}, transient, 5000, supervisor, [dubbo_consumer_pool_sup]},
- ConsumerPool = {dubbo_consumer_pool, {dubbo_consumer_pool, start_link, []}, transient, 5000, worker, [dubbo_consumer_pool]},
+ ConsumerPoolSup = {dubbo_transport_pool_sup, {dubbo_transport_pool_sup, start_link, []}, transient, 5000, supervisor, [dubbo_transport_pool_sup]},
+ ConsumerPool = {dubbo_provider_consumer_reg_table, {dubbo_provider_consumer_reg_table, start_link, []}, transient, 5000, worker, [dubbo_provider_consumer_reg_table]},
ListNew1 =
case application:get_env(dubboerl, registry, false) of
true ->
diff --git a/test/dubbo_consumer_pool_tests.erl b/test/dubbo_consumer_pool_tests.erl
index 740ed84..0a67f1a 100644
--- a/test/dubbo_consumer_pool_tests.erl
+++ b/test/dubbo_consumer_pool_tests.erl
@@ -21,13 +21,13 @@
-include("dubbo.hrl").
update_readonly_test() ->
- dubbo_consumer_pool:start_link(),
+ dubbo_provider_consumer_reg_table:start_link(),
InterfaceName= <<"testinterfacename">>,
HostFalg= <<"127.0.0.1/20880">>,
ConnectionList = [
#connection_info{connection_id=1,pid= testpid,weight = 30,host_flag = HostFalg},
#connection_info{connection_id=2,pid= testpid2,weight = 30,host_flag = HostFalg}
],
- dubbo_consumer_pool:update_connection_info(InterfaceName,HostFalg,ConnectionList,true),
- {ok,Size} = dubbo_consumer_pool:update_connection_readonly(testpid,false),
+ dubbo_provider_consumer_reg_table:update_connection_info(InterfaceName,HostFalg,ConnectionList,true),
+ {ok,Size} = dubbo_provider_consumer_reg_table:update_connection_readonly(testpid,false),
?assertEqual(1,Size).