You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by dl...@apache.org on 2019/07/23 17:08:27 UTC

[dubbo-erlang] branch 0.4.0 created (now af4b990)

This is an automated email from the ASF dual-hosted git repository.

dlive pushed a change to branch 0.4.0
in repository https://gitbox.apache.org/repos/asf/dubbo-erlang.git.


      at af4b990  feature: support protocol extension

This branch includes the following new commits:

     new 07e743a  mod defined for plugin design
     new 051bb2a  redesign the subscription process
     new fc0072d  redesign the protocol callback
     new 1f64b4b  dev client pool
     new 216b5a7  dev reference ref process
     new e0b122b  dev reference ref process
     new a5a0cde  redesign: invoker process
     new 91faffc  merge form master
     new af4b990  feature: support protocol extension

The 9 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[dubbo-erlang] 06/09: dev reference ref process

Posted by dl...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dlive pushed a commit to branch 0.4.0
in repository https://gitbox.apache.org/repos/asf/dubbo-erlang.git

commit e0b122b375aadbf1524789d54360728a7d90c027
Author: DLive <xs...@163.com>
AuthorDate: Sun Jun 23 23:39:36 2019 +0800

    dev reference ref process
---
 config_example/sys.config                 |  1 +
 include/dubbo.hrl                         |  1 +
 src/dubbo_common_fun.erl                  |  7 +--
 src/dubbo_directory.erl                   | 13 +++--
 src/dubbo_loadbalance_random.erl          | 20 ++++++++
 src/dubbo_protocol_registry.erl           |  2 +-
 src/dubbo_provider_consumer_reg_table.erl | 19 +++++--
 src/dubbo_reference_config.erl            | 83 ++++++++++++++++++++++---------
 src/dubboerl.erl                          |  5 +-
 9 files changed, 113 insertions(+), 38 deletions(-)

diff --git a/config_example/sys.config b/config_example/sys.config
index 0ed8a87..07cfaf9 100644
--- a/config_example/sys.config
+++ b/config_example/sys.config
@@ -17,6 +17,7 @@
         ]
     },
     {dubboerl,[
+        {registry,zookeeper},
         {zookeeper_list,[{"127.0.0.1",2181}]},
         {application,<<"testdubboerl">>},
         {registry,true},
diff --git a/include/dubbo.hrl b/include/dubbo.hrl
index f2d4048..282e0da 100644
--- a/include/dubbo.hrl
+++ b/include/dubbo.hrl
@@ -100,6 +100,7 @@
 }).
 
 
+-record(interface_info, {interface, loadbalance}).
 
 -record(interface_list, {interface, pid, connection_info}).
 %%-record(provider_node_list, {host_flag, pid, weight, readonly = false}).
diff --git a/src/dubbo_common_fun.erl b/src/dubbo_common_fun.erl
index 6717fac..5f38fbd 100644
--- a/src/dubbo_common_fun.erl
+++ b/src/dubbo_common_fun.erl
@@ -18,7 +18,7 @@
 
 -include("dubboerl.hrl").
 %% API
--export([local_ip_v4/0, local_ip_v4_str/0, parse_url/1, map_to_url/1]).
+-export([local_ip_v4/0, local_ip_v4_str/0, parse_url/1, url_to_binary/1]).
 
 local_ip_v4() ->
     {ok, Addrs} = inet:getifaddrs(),
@@ -67,7 +67,7 @@ parse_url_parameter([Item | Rest], Parameters) ->
     end.
 
 
-map_to_url(UrlInfo) ->
+url_to_binary(UrlInfo) ->
     ParameterStr =
         case UrlInfo#dubbo_url.parameters of
             undefined ->
@@ -79,10 +79,11 @@ map_to_url(UrlInfo) ->
                 ParameterStr2 = ["?" | ParameterStr1],
                 list_to_binary(ParameterStr2)
         end,
-    Value = io_lib:format(<<"~s://~s/~s?~s">>,
+    Value = io_lib:format(<<"~s://~s:~p/~s?~s">>,
         [
             UrlInfo#dubbo_url.scheme,
             UrlInfo#dubbo_url.host,
+            UrlInfo#dubbo_url.port,
             UrlInfo#dubbo_url.path,
             ParameterStr
         ]),
diff --git a/src/dubbo_directory.erl b/src/dubbo_directory.erl
index 07a6dff..c667f29 100644
--- a/src/dubbo_directory.erl
+++ b/src/dubbo_directory.erl
@@ -18,6 +18,8 @@
 
 -behaviour(gen_server).
 -include("dubboerl.hrl").
+-include("dubbo.hrl").
+
 -export([subscribe/2,notify/2]).
 %% API
 -export([start_link/0]).
@@ -92,15 +94,15 @@ refresh_invoker(UrlList)->
     case pick_interface(UrlList) of
         {error,Reason}->
             fail;
-        {"empty",Interface}->
+        {"empty",Interface,_}->
             todo_destroy;
-        {_,Interface} ->
+        {_,Interface,LoadBalance} ->
             OldProviderHosts = dubbo_provider_consumer_reg_table:get_interface_provider_node(Interface),
             NewInvokers = refresh_invoker(UrlList,[]),
             NewProviderHosts = [Item#dubbo_invoker.host_flag || Item <- NewInvokers],
             DeleteProverList = OldProviderHosts -- NewProviderHosts,
-            dubbo_provider_consumer_reg_table:clean_invalid_provider(DeleteProverList)
-
+            dubbo_provider_consumer_reg_table:clean_invalid_provider(DeleteProverList),
+            dubbo_provider_consumer_reg_table:update_connection_info(#interface_info{interface = Interface,loadbalance = LoadBalance})
     end.
 %%    OldProviderHosts =
 
@@ -119,7 +121,8 @@ pick_interface([Url | _]) ->
     case dubbo_common_fun:parse_url(Url) of
         {ok,UrlInfo}->
             Interface = maps:get("interface",UrlInfo#dubbo_url.parameters),
-            {UrlInfo#dubbo_url.scheme,Interface};
+            LoadBalance = list_to_atom("dubbo_loadbalance_" ++ maps:get("loadbalance",UrlInfo#dubbo_url.parameters,"random")),
+            {UrlInfo#dubbo_url.scheme,Interface,LoadBalance};
         {error,Reason} ->
             {error,Reason}
     end.
diff --git a/src/dubbo_loadbalance_random.erl b/src/dubbo_loadbalance_random.erl
new file mode 100644
index 0000000..21d4f61
--- /dev/null
+++ b/src/dubbo_loadbalance_random.erl
@@ -0,0 +1,20 @@
+%%------------------------------------------------------------------------------
+%% Licensed to the Apache Software Foundation (ASF) under one or more
+%% contributor license agreements.  See the NOTICE file distributed with
+%% this work for additional information regarding copyright ownership.
+%% The ASF licenses this file to You under the Apache License, Version 2.0
+%% (the "License"); you may not use this file except in compliance with
+%% the License.  You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%------------------------------------------------------------------------------
+-module(dubbo_loadbalance_random).
+
+%% API
+-export([]).
diff --git a/src/dubbo_protocol_registry.erl b/src/dubbo_protocol_registry.erl
index 8277c7e..b2fde17 100644
--- a/src/dubbo_protocol_registry.erl
+++ b/src/dubbo_protocol_registry.erl
@@ -49,5 +49,5 @@ gen_consumer_url(UrlInfo)->
         path = Interface,
         parameters = Parameters2
     },
-    ConsumerUrl = dubbo_common_fun:map_to_url(ConsumerUrlInfo),
+    ConsumerUrl = dubbo_common_fun:url_to_binary(ConsumerUrlInfo),
     ConsumerUrl.
\ No newline at end of file
diff --git a/src/dubbo_provider_consumer_reg_table.erl b/src/dubbo_provider_consumer_reg_table.erl
index d47260d..1c193b8 100644
--- a/src/dubbo_provider_consumer_reg_table.erl
+++ b/src/dubbo_provider_consumer_reg_table.erl
@@ -36,6 +36,9 @@
 -define(SERVER, ?MODULE).
 
 -define(INTERFCE_LIST_TABLE, interface_list).
+
+-define(INTERFAE_INFO_TABLE,dubbo_interface_info).
+
 -define(PROVIDER_NODE_LIST_TABLE, provider_node_list).
 
 -record(state, {}).
@@ -87,16 +90,21 @@ init_ets_table() ->
             ok
     catch
         _Type:Reason ->
-            logger:error("new ets table error ~p", [Reason]),
-            error
+            logger:error("new ets table INTERFCE_LIST_TABLE error ~p", [Reason])
     end,
     try ets:new(?PROVIDER_NODE_LIST_TABLE, [bag, public, named_table, {keypos, 2}]) of
         ?PROVIDER_NODE_LIST_TABLE ->
             ok
     catch
         _Type1:Reason1 ->
-            logger:error("new ets table error ~p", [Reason1]),
-            error
+            logger:error("new ets table  PROVIDER_NODE_LIST_TABLE error ~p", [Reason1])
+    end,
+    try ets:new(?INTERFAE_INFO_TABLE, [public, named_table, {keypos, 2}]) of
+        ?INTERFAE_INFO_TABLE ->
+            ok
+    catch
+        _Type1:Reason1 ->
+            logger:error("new ets table  PROVIDER_NODE_LIST_TABLE error ~p", [Reason1])
     end,
     ok.
 %%--------------------------------------------------------------------
@@ -196,6 +204,9 @@ get_host_connections(Host, Port) ->
     List = ets:lookup(?PROVIDER_NODE_LIST_TABLE, HostFlag),
     List.
 
+update_interface_info(InterfaceInfo)->
+    ets:insert(?INTERFAE_INFO_TABLE,InterfaceInfo).
+
 
 %%%===================================================================
 %%% Internal functions
diff --git a/src/dubbo_reference_config.erl b/src/dubbo_reference_config.erl
index 6c58a50..7ab7f86 100644
--- a/src/dubbo_reference_config.erl
+++ b/src/dubbo_reference_config.erl
@@ -16,24 +16,28 @@
 %%------------------------------------------------------------------------------
 -module(dubbo_reference_config).
 
+-include("dubbo.hrl").
+-include("dubboerl.hrl").
+
 -record(dubbo_interface_info,{}).
 
 %% API
--export([]).
-
-init_reference()->
-    InitConfigMap= #{
+-export([init_reference/1]).
 
-    },
+init_reference(ConsumerInfo)->
+%%    InitConfigMap= #{
+%%
+%%    },
     %% 组装各类需要数据
+    create_proxy(ConsumerInfo),
     ok.
 
 
-create_proxy(InitConfigMap)->
+create_proxy(ConsumerInfo)->
+
 
 
-    InterfaceClassInfo = #{},
-    Para = gen_parameter(),
+    Para = gen_parameter(ConsumerInfo),
     Url = gen_registry_url(Para),
     dubbo_extension:run(protocol_wapper,refer,[Url]),
     ok.
@@ -43,32 +47,65 @@ create_proxy(InitConfigMap)->
 
 gen_registry_url(Para)->
     %%todo 组装para & url
+    {Host,Port} = get_registry_host_port(),
+    UrlInfo = #dubbo_url{
+        scheme = <<"registry">>,
+        host = list_to_binary(Host),
+        port = integer_to_binary(Port),
+        path = <<"org.apache.dubbo.registry.RegistryService">>,
+        parameters = Para
+    },
+    dubbo_common_fun:url_to_binary(UrlInfo).
+%%    Url = "registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=hello-world&dubbo=2.0.2&pid=68901&refer=application%3Dhello-world%26default.check%3Dfalse%26default.lazy%3Dfalse%26default.retries%3D0%26default.sticky%3Dfalse%26default.timeout%3D300000%26dubbo%3D2.0.2%26interface%3Dorg.apache.dubbo.erlang.sample.service.facade.UserOperator%26lazy%3Dfalse%26methods%3DqueryUserInfo%2CqueryUserList%2CgenUserId%2CgetUserInfo%26pid%3D68901%26register.ip%3D127.0.0.1% [...]
+%%    Url.
+
+get_registry_host_port()->
+    %% @todo need adapter other registry
+    RegistryList = application:get_env(dubboerl,zookeeper_list,[{"127.0.0.1",2181}]),
+    [Item|_] = RegistryList,
+    Item.
 
-    Url = "registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=hello-world&dubbo=2.0.2&pid=68901&refer=application%3Dhello-world%26default.check%3Dfalse%26default.lazy%3Dfalse%26default.retries%3D0%26default.sticky%3Dfalse%26default.timeout%3D300000%26dubbo%3D2.0.2%26interface%3Dorg.apache.dubbo.erlang.sample.service.facade.UserOperator%26lazy%3Dfalse%26methods%3DqueryUserInfo%2CqueryUserList%2CgenUserId%2CgetUserInfo%26pid%3D68901%26register.ip%3D127.0.0.1%26 [...]
-    Url.
-gen_parameter()->
+gen_parameter(ConsumerInfo)->
     Para = #{
-        <<"application">> => get_appname(),
+        <<"application">> => get_appname(ConsumerInfo),
         <<"dubbo">> => <<"2.0.2">>,
         <<"pid">> => get_pid(),
-        <<"refer">> => get_refinfo(),
+        <<"refer">> => get_refinfo(ConsumerInfo),
         <<"registry">> => get_registry_type(),
         <<"release">> => <<"2.7.1">>,
-        <<"timestamp">> => <<"1559727842451">>
+        <<"timestamp">> => integer_to_binary(dubbo_time_util:timestamp_ms())
     },
 
     Para.
 
-get_appname()->
-    %%todo
-    <<"hello-world">>.
+get_appname(ConsumerInfo)->
+    ConsumerInfo#consumer_config.application.
 get_pid()->
-    %%todo
-    <<"68901">>.
-get_refinfo()->
-    %%todo
-    <<"application%3Dhello-world%26default.check%3Dfalse%26default.lazy%3Dfalse%26default.retries%3D0%26default.sticky%3Dfalse%26default.timeout%3D300000%26dubbo%3D2.0.2%26interface%3Dorg.apache.dubbo.erlang.sample.service.facade.UserOperator%26lazy%3Dfalse%26methods%3DqueryUserInfo%2CqueryUserList%2CgenUserId%2CgetUserInfo%26pid%3D68901%26register.ip%3D127..0.1%26release%3D2.7.1%26retries%3D0%26side%3Dconsumer%26sticky%3Dfalse%26timestamp%3D1559727789953">>.
+    os:getpid().
+get_refinfo(ConsumerInfo)->
+    KeyValues=[
+        {"application",ConsumerInfo#consumer_config.application},
+        {"default.check",ConsumerInfo#consumer_config.check},
+        {"default.lazy","false"},
+        {"default.retries","0"},
+        {"default.sticky","false"},
+        {"default.timeout","300000"},
+        {"dubbo","2.0.2"},
+        {"interface",ConsumerInfo#consumer_config.interface},
+        {"lazy","false"},
+        {"methods",ConsumerInfo#consumer_config.methods},
+        {"register.ip",ConsumerInfo#consumer_config.application},
+        {"release","2.7.1"},
+        {"pid",get_pid()},
+        {"side","consumer"},
+        {"sticky","false"},
+        {"timestamp",dubbo_time_util:timestamp_ms()}
+    ],
+    KeyValues2 = [io_lib:format("~s=~p", [Key, Value]) || {Key, Value} <= KeyValues],
+    ParameterStr1 = string:join(KeyValues2, "&"),
+    list_to_binary(http_uri:encode(ParameterStr1)).
+%%    <<"application%3Dhello-world%26default.check%3Dfalse%26default.lazy%3Dfalse%26default.retries%3D0%26default.sticky%3Dfalse%26default.timeout%3D300000%26dubbo%3D2.0.2%26interface%3Dorg.apache.dubbo.erlang.sample.service.facade.UserOperator%26lazy%3Dfalse%26methods%3DqueryUserInfo%2CqueryUserList%2CgenUserId%2CgetUserInfo%26pid%3D68901%26register.ip%3D127..0.1%26release%3D2.7.1%26retries%3D0%26side%3Dconsumer%26sticky%3Dfalse%26timestamp%3D1559727789953">>.
 
 get_registry_type()->
     %%todo
-    <<"zookeeper">>.
\ No newline at end of file
+    atom_to_binary(application:get_env(dubboerl,registry,zookeeper)).
\ No newline at end of file
diff --git a/src/dubboerl.erl b/src/dubboerl.erl
index 0a5f5bc..03cc8a0 100644
--- a/src/dubboerl.erl
+++ b/src/dubboerl.erl
@@ -33,8 +33,9 @@ start_consumer() ->
     ApplicationName = application:get_env(dubboerl, application, <<"defaultApplication">>),
     lists:map(fun({Interface, Option}) ->
         ConsumerInfo = dubbo_config_util:gen_consumer(ApplicationName, Interface, Option),
-        dubbo_zookeeper:register_consumer(ConsumerInfo),
-        logger:info("register consumer success ~p", [Interface])
+%%        dubbo_zookeeper:register_consumer(ConsumerInfo),
+        dubbo_reference_config:init_reference(ConsumerInfo),
+        logger:info("consumer refer success ~p", [Interface])
               end, ConsumerList),
     ok.
 


[dubbo-erlang] 08/09: merge form master

Posted by dl...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dlive pushed a commit to branch 0.4.0
in repository https://gitbox.apache.org/repos/asf/dubbo-erlang.git

commit 91faffcf749b1abd4610580fe38ea5ad81bc0e61
Merge: a5a0cde cdfcff9
Author: DLive <xs...@163.com>
AuthorDate: Thu Jun 27 18:58:29 2019 +0800

    merge form master

 .github/ISSUE_TEMPLATE/issue-report-template.md    | 35 ++++++++++++++++++++++
 .../PULL_REQUEST_TEMPLATE/pull_request_template.md | 20 +++++++++++++
 .travis.yml                                        |  2 +-
 README.md                                          |  4 +--
 rebar.config                                       | 13 ++++----
 rebar.lock                                         |  8 ++---
 src/dubbo_serializa_json.erl                       | 30 ++++++++-----------
 7 files changed, 80 insertions(+), 32 deletions(-)


[dubbo-erlang] 02/09: redesign the subscription process

Posted by dl...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dlive pushed a commit to branch 0.4.0
in repository https://gitbox.apache.org/repos/asf/dubbo-erlang.git

commit 051bb2a1f46cd55f221f2e4ce79c0517a98e5a0f
Author: DLive <xs...@163.com>
AuthorDate: Tue Jun 11 23:21:37 2019 +0800

    redesign the subscription process
---
 src/dubbo_directory.erl          | 27 ++++++++++++++++++++++++---
 src/dubbo_protocol.erl           |  2 +-
 src/dubbo_protocol_dubbo.erl     |  6 +++++-
 src/dubbo_protocol_registry.erl  |  4 ++--
 src/dubbo_reference_config.erl   |  2 +-
 src/dubbo_registry_zookeeper.erl |  2 +-
 6 files changed, 34 insertions(+), 9 deletions(-)

diff --git a/src/dubbo_directory.erl b/src/dubbo_directory.erl
index 2356e7c..08e959c 100644
--- a/src/dubbo_directory.erl
+++ b/src/dubbo_directory.erl
@@ -18,7 +18,7 @@
 
 -behaviour(gen_server).
 
--export([subscribe/2,notify/1]).
+-export([subscribe/2,notify/2]).
 %% API
 -export([start_link/0]).
 
@@ -80,10 +80,31 @@ subscribe(RegistryName,SubcribeUrl)->
             {error,Reason}
     end.
 
-notify(UrlList)->
-    dubbo_consumer_pool:start_consumer(Interface, UrlList),
+notify(Interface,UrlList)->
+    %% @todo if UrlList size is 1, and protocol is empty ,need destroyAllInvokers
+
+    case dubbo_extension:run_fold(protocol,refer,[UrlList],{error,no_protocol}) of
+        {ok,Invokers} ->
+            ok;
+        {error,no_protocol}->
+            error
+    end,
+%%    dubbo_consumer_pool:start_consumer(Interface, UrlList),
     ok.
 
+refresh_invoker(UrlList)->
+    NewInvokers = refresh_invoker(UrlList,[]).
+
+refresh_invoker([Url|Rest],Acc)->
+    case dubbo_extension:run_fold(protocol,refer,[Url],undefined) of
+        undefined ->
+            refresh_invoker(Rest,Acc);
+        {ok,Invoker} ->
+            refresh_invoker(Rest,[Invoker|Acc]);
+        {stop,_}->
+            refresh_invoker(Rest,Acc)
+    end.
+
 %%--------------------------------------------------------------------
 %% @private
 %% @doc
diff --git a/src/dubbo_protocol.erl b/src/dubbo_protocol.erl
index 3c82119..8808fc8 100644
--- a/src/dubbo_protocol.erl
+++ b/src/dubbo_protocol.erl
@@ -16,7 +16,7 @@
 %%------------------------------------------------------------------------------
 -module(dubbo_protocol).
 
--callback refer(InterfaceClassInfo,Url)->ok.
+-callback refer(Url)->ok.
 
 %% API
 -export([refer/2]).
diff --git a/src/dubbo_protocol_dubbo.erl b/src/dubbo_protocol_dubbo.erl
index 2242939..c2fb6dd 100644
--- a/src/dubbo_protocol_dubbo.erl
+++ b/src/dubbo_protocol_dubbo.erl
@@ -18,4 +18,8 @@
 -author("dlive").
 
 %% API
--export([]).
+-export([refer/1]).
+
+refer(Url)->
+
+    {ok,todo}.
\ No newline at end of file
diff --git a/src/dubbo_protocol_registry.erl b/src/dubbo_protocol_registry.erl
index f1d9bc4..8277c7e 100644
--- a/src/dubbo_protocol_registry.erl
+++ b/src/dubbo_protocol_registry.erl
@@ -20,9 +20,9 @@
 -include("dubboerl.hrl").
 
 %% API
--export([]).
+-export([refer/1]).
 
-refer(InterfaceClassInfo,Url)->
+refer(Url)->
     {ok,UrlInfo} =  dubbo_common_fun:parse_url(Url),
 
     {ok,RegistryName} = dubbo_registry:setup_register(UrlInfo),
diff --git a/src/dubbo_reference_config.erl b/src/dubbo_reference_config.erl
index f6e8bdd..6c58a50 100644
--- a/src/dubbo_reference_config.erl
+++ b/src/dubbo_reference_config.erl
@@ -35,7 +35,7 @@ create_proxy(InitConfigMap)->
     InterfaceClassInfo = #{},
     Para = gen_parameter(),
     Url = gen_registry_url(Para),
-    dubbo_extension:run(protoco_wapper,refer,[InterfaceClassInfo,Url]),
+    dubbo_extension:run(protocol_wapper,refer,[Url]),
     ok.
 
     %%application=hello-world&dubbo=2.0.2&pid=68901&refer=application=hello-world&default.check=false&default.lazy=false&default.retries=0&default.sticky=false&default.timeout=300000&dubbo=2.0.2&interface=org.apache.dubbo.erlang.sample.service.facade.UserOperator&lazy=false&methods=queryUserInfo,queryUserList,genUserId,getUserInfo&pid=68901&register.ip=127.0.0.1&release=2.7.1&retries=0&side=consumer&sticky=false&timestamp=1559727789953&registry=zookeeper&release=2.7.1&timestamp=1559727842451
diff --git a/src/dubbo_registry_zookeeper.erl b/src/dubbo_registry_zookeeper.erl
index ea9ef62..0b5b3f5 100644
--- a/src/dubbo_registry_zookeeper.erl
+++ b/src/dubbo_registry_zookeeper.erl
@@ -240,7 +240,7 @@ register_provider_path(Provider, State) ->
 get_provider_list(InterfaceName,ZkPid,NotifyFun) ->
     InterfacePath = <<<<"/dubbo/">>/binary, InterfaceName/binary, <<"/providers">>/binary>>,
     ChildList= get_provider_and_start(ZkPid, InterfaceName, InterfacePath),
-    NotifyFun(ChildList),
+    NotifyFun(InterfaceName,ChildList),
     ok.
 get_provider_and_start(Pid, Interface, Path) ->
     case erlzk:get_children(Pid, Path, spawn(dubbo_registry_zookeeper, provider_watcher, [Interface])) of


[dubbo-erlang] 01/09: mod defined for plugin design

Posted by dl...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dlive pushed a commit to branch 0.4.0
in repository https://gitbox.apache.org/repos/asf/dubbo-erlang.git

commit 07e743aa315bd5a08f9694337f0ebf8f4f3da507
Author: DLive <xs...@163.com>
AuthorDate: Mon Jun 10 23:31:34 2019 +0800

    mod defined for plugin design
---
 config_example/sys.config                          |   2 +-
 include/dubboerl.hrl                               |   5 +-
 rebar.config                                       |   5 +-
 rebar.lock                                         |   4 +
 .../src/main/resources/applicationProvider.xml     |   6 +-
 .../apps/dubbo_sample_service/src/userOperator.erl |   8 +-
 include/dubboerl.hrl => src/dubbo_cluster.erl      |   8 +-
 .../dubboerl.hrl => src/dubbo_cluster_failfast.erl |   8 +-
 src/dubbo_common_fun.erl                           |  60 +++-
 src/dubbo_directory.erl                            | 173 ++++++++++++
 src/dubbo_extension.erl                            | 189 +++++++++++++
 include/dubboerl.hrl => src/dubbo_filter.erl       |   8 +-
 src/dubbo_invoker.erl                              |  78 +----
 .../dubboerl.hrl => src/dubbo_invoker_cluster.erl  |   8 +-
 src/{dubbo_invoker.erl => dubbo_invoker_old.erl}   |   2 +-
 include/dubboerl.hrl => src/dubbo_protocol.erl     |  11 +-
 .../dubboerl.hrl => src/dubbo_protocol_dubbo.erl   |   8 +-
 src/dubbo_protocol_registry.erl                    |  53 ++++
 .../dubbo_provider_consumer_reg_table.erl          |   8 +-
 src/dubbo_reference_config.erl                     |  74 +++++
 src/{dubbo_common_fun.erl => dubbo_registry.erl}   |  38 ++-
 src/dubbo_registry_zookeeper.erl                   | 314 +++++++++++++++++++++
 test/userOperator.erl                              |   8 +-
 23 files changed, 947 insertions(+), 131 deletions(-)

diff --git a/config_example/sys.config b/config_example/sys.config
index b100088..0ed8a87 100644
--- a/config_example/sys.config
+++ b/config_example/sys.config
@@ -23,7 +23,7 @@
         {protocol,hessian},
         {port,20881},
         {consumer,[
-            {<<"org.apache.dubbo.erlang.sample.service.facade.UserOperator">>,[]}
+           % {<<"org.apache.dubbo.erlang.sample.service.facade.UserOperator">>,[]}
         ]},
         {provider,[
             {dubbo_service_user_impl,userOperator,<<"org.apache.dubbo.erlang.sample.service.facade.UserOperator">>,[]}
diff --git a/include/dubboerl.hrl b/include/dubboerl.hrl
index 31204a5..ce6baf1 100644
--- a/include/dubboerl.hrl
+++ b/include/dubboerl.hrl
@@ -18,4 +18,7 @@
 
 -define(PROVIDER_WORKER,provider_worker).
 
--define(TRAFFIC_CONTROL,traffic_control).
\ No newline at end of file
+-define(TRAFFIC_CONTROL,traffic_control).
+
+
+-record(dubbo_url,{scheme,user_info,host,port,path,parameters,fragment}).
\ No newline at end of file
diff --git a/rebar.config b/rebar.config
index aebfaa3..2eb91f2 100644
--- a/rebar.config
+++ b/rebar.config
@@ -21,9 +21,10 @@
 
 {deps, [
     {erlzk, ".*", {git, "https://github.com/huaban/erlzk.git", {tag, "v0.6.2"}}},
-    {ranch, ".*",  {git, "https://github.com/ninenines/ranch.git", {tag, "1.4.0"}}},
+	{ranch, ".*",  {git, "https://github.com/ninenines/ranch.git", {tag, "1.4.0"}}},
     {poolboy, ".*",  {git, "https://github.com/devinus/poolboy.git", {tag, "1.5.1"}}},
-    {jiffy, "0.15.1"}
+    {jiffy, "0.15.1"},
+	{hooks,{git,"https://github.com/benoitc/hooks.git",{tag,"2.1.0"}}}
 ]}.
 
 
diff --git a/rebar.lock b/rebar.lock
index f57b258..cc2515b 100644
--- a/rebar.lock
+++ b/rebar.lock
@@ -3,6 +3,10 @@
   {git,"https://github.com/huaban/erlzk.git",
        {ref,"aa7190ee2343ac1341cea3edc9b9eea36c591708"}},
   0},
+ {<<"hooks">>,
+  {git,"https://github.com/benoitc/hooks.git",
+       {ref,"d4872554a27c0ee9c2166d18000f725f8c3dc8a8"}},
+  0},
  {<<"jiffy">>,{pkg,<<"jiffy">>,<<"0.15.1">>},0},
  {<<"poolboy">>,
   {git,"https://github.com/devinus/poolboy.git",
diff --git a/samples/dubbo-sample-service/src/main/resources/applicationProvider.xml b/samples/dubbo-sample-service/src/main/resources/applicationProvider.xml
index 974fabc..425e41e 100644
--- a/samples/dubbo-sample-service/src/main/resources/applicationProvider.xml
+++ b/samples/dubbo-sample-service/src/main/resources/applicationProvider.xml
@@ -10,8 +10,8 @@
 
     <dubbo:consumer check="false" timeout="300000" id="dubboConsumerConfig" retries="0"/>
 
-    <bean id="userService" class="org.apache.dubbo.erlang.sample.service.impl.UserOperatorImpl"/>
-    <dubbo:service interface="org.apache.dubbo.erlang.sample.service.facade.UserOperator" ref="userService"/>
+<!--    <bean id="userService" class="org.apache.dubbo.erlang.sample.service.impl.UserOperatorImpl"/>-->
+<!--    <dubbo:service interface="org.apache.dubbo.erlang.sample.service.facade.UserOperator" ref="userService"/>-->
 
-    <!--    <dubbo:reference  id="userInterface" interface="UserOperator" retries="0" />-->
+    <dubbo:reference  id="userInterface" interface="org.apache.dubbo.erlang.sample.service.facade.UserOperator" retries="0" />
 </beans>
diff --git a/samples/dubboerl_demo/apps/dubbo_sample_service/src/userOperator.erl b/samples/dubboerl_demo/apps/dubbo_sample_service/src/userOperator.erl
index b92ca62..d7e0809 100644
--- a/samples/dubboerl_demo/apps/dubbo_sample_service/src/userOperator.erl
+++ b/samples/dubboerl_demo/apps/dubbo_sample_service/src/userOperator.erl
@@ -67,7 +67,7 @@ getUserInfo(Arg0, RequestOption)->
         ]
     },
     Request = dubbo_adapter:reference(Data),
-    dubbo_invoker:invoke_request(?CURRENT_CLASS_NAME,Request,RequestOption).
+    dubbo_invoker_old:invoke_request(?CURRENT_CLASS_NAME,Request,RequestOption).
 
 
 -spec genUserId()->
@@ -96,7 +96,7 @@ genUserId( RequestOption)->
         ]
     },
     Request = dubbo_adapter:reference(Data),
-    dubbo_invoker:invoke_request(?CURRENT_CLASS_NAME,Request,RequestOption).
+    dubbo_invoker_old:invoke_request(?CURRENT_CLASS_NAME,Request,RequestOption).
 
 
 -spec queryUserInfo(Arg0::#userInfoRequest{})->
@@ -127,7 +127,7 @@ queryUserInfo(Arg0, RequestOption)->
         ]
     },
     Request = dubbo_adapter:reference(Data),
-    dubbo_invoker:invoke_request(?CURRENT_CLASS_NAME,Request,RequestOption).
+    dubbo_invoker_old:invoke_request(?CURRENT_CLASS_NAME,Request,RequestOption).
 
 
 -spec queryUserList(Arg0::list())->
@@ -158,5 +158,5 @@ queryUserList(Arg0, RequestOption)->
         ]
     },
     Request = dubbo_adapter:reference(Data),
-    dubbo_invoker:invoke_request(?CURRENT_CLASS_NAME,Request,RequestOption).
+    dubbo_invoker_old:invoke_request(?CURRENT_CLASS_NAME,Request,RequestOption).
 
diff --git a/include/dubboerl.hrl b/src/dubbo_cluster.erl
similarity index 87%
copy from include/dubboerl.hrl
copy to src/dubbo_cluster.erl
index 31204a5..da4031e 100644
--- a/include/dubboerl.hrl
+++ b/src/dubbo_cluster.erl
@@ -14,8 +14,8 @@
 %% See the License for the specific language governing permissions and
 %% limitations under the License.
 %%------------------------------------------------------------------------------
--define(PROVIDER_IMPL_TABLE,provider_impl_table).
+-module(dubbo_cluster).
+-author("dlive").
 
--define(PROVIDER_WORKER,provider_worker).
-
--define(TRAFFIC_CONTROL,traffic_control).
\ No newline at end of file
+%% API
+-export([]).
diff --git a/include/dubboerl.hrl b/src/dubbo_cluster_failfast.erl
similarity index 87%
copy from include/dubboerl.hrl
copy to src/dubbo_cluster_failfast.erl
index 31204a5..6078517 100644
--- a/include/dubboerl.hrl
+++ b/src/dubbo_cluster_failfast.erl
@@ -14,8 +14,8 @@
 %% See the License for the specific language governing permissions and
 %% limitations under the License.
 %%------------------------------------------------------------------------------
--define(PROVIDER_IMPL_TABLE,provider_impl_table).
+-module(dubbo_cluster_failfast).
+-author("dlive").
 
--define(PROVIDER_WORKER,provider_worker).
-
--define(TRAFFIC_CONTROL,traffic_control).
\ No newline at end of file
+%% API
+-export([]).
diff --git a/src/dubbo_common_fun.erl b/src/dubbo_common_fun.erl
index 6744171..e74f57f 100644
--- a/src/dubbo_common_fun.erl
+++ b/src/dubbo_common_fun.erl
@@ -16,8 +16,9 @@
 %%------------------------------------------------------------------------------
 -module(dubbo_common_fun).
 
+-include("dubboerl.hrl").
 %% API
--export([local_ip_v4/0, local_ip_v4_str/0]).
+-export([local_ip_v4/0, local_ip_v4_str/0, parse_url/1, map_to_url/1]).
 
 local_ip_v4() ->
     {ok, Addrs} = inet:getifaddrs(),
@@ -29,3 +30,60 @@ local_ip_v4() ->
 local_ip_v4_str() ->
     {V1, V2, V3, V4} = local_ip_v4(),
     list_to_binary(io_lib:format("~p.~p.~p.~p", [V1, V2, V3, V4])).
+
+
+-spec(parse_url(Url :: binary()|list()) -> {ok, map()}).
+parse_url(Url) when is_binary(Url) ->
+    parse_url(binary_to_list(Url));
+parse_url(Url) ->
+    case http_uri:parse(Url, []) of
+        {ok, {Scheme, _UserInfo, Host, Port, _Path, Query}} ->
+            QueryStr = case lists:prefix("?", Query) of
+                           true ->
+                               [_ | Query2] = Query,
+                               Query2;
+                           false ->
+                               Query
+                       end,
+            QueryListTmp = string:tokens(QueryStr, "&"),
+            Parameters = parse_url_parameter(QueryListTmp, #{}),
+            Result = #dubbo_url{scheme = Scheme, host = Host, port = Port, parameters = Parameters},
+            {ok, Result};
+        {error, R1} ->
+            {error, R1}
+    end.
+
+
+parse_url_parameter([], Parameters) ->
+    Parameters;
+parse_url_parameter([Item | Rest], Parameters) ->
+    case string:tokens(Item, "=") of
+        KeyPair when length(KeyPair) == 2 ->
+            [Key, Value] = KeyPair,
+            parse_url_parameter(Rest, maps:put(Key, Value, Parameters));
+        KeyPair2 ->
+            logger:error("parse parameter error, keypair ~p", [KeyPair2]),
+            parse_url_parameter(Rest, Parameters)
+    end.
+
+
+map_to_url(UrlInfo) ->
+    ParameterStr =
+        case UrlInfo#dubbo_url.parameters of
+            undefined ->
+                "";
+            Parameter ->
+                KeyValues = maps:to_list(Parameter),
+                KeyValues2 = [io_lib:format("~s=~s", [Key, http_uri:encode(Value)]) || {Key, Value} <= KeyValues],
+                ParameterStr1 = string:join(KeyValues2, "&"),
+                ParameterStr2 = ["?" | ParameterStr1],
+                list_to_binary(ParameterStr2)
+        end,
+    Value = io_lib:format(<<"~s://~s/~s?~s">>,
+        [
+            UrlInfo#dubbo_url.scheme,
+            UrlInfo#dubbo_url.host,
+            UrlInfo#dubbo_url.path,
+            ParameterStr
+        ]),
+    list_to_binary(Value).
\ No newline at end of file
diff --git a/src/dubbo_directory.erl b/src/dubbo_directory.erl
new file mode 100644
index 0000000..2356e7c
--- /dev/null
+++ b/src/dubbo_directory.erl
@@ -0,0 +1,173 @@
+%%------------------------------------------------------------------------------
+%% Licensed to the Apache Software Foundation (ASF) under one or more
+%% contributor license agreements.  See the NOTICE file distributed with
+%% this work for additional information regarding copyright ownership.
+%% The ASF licenses this file to You under the Apache License, Version 2.0
+%% (the "License"); you may not use this file except in compliance with
+%% the License.  You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%------------------------------------------------------------------------------
+-module(dubbo_directory).
+
+-behaviour(gen_server).
+
+-export([subscribe/2,notify/1]).
+%% API
+-export([start_link/0]).
+
+%% gen_server callbacks
+-export([init/1,
+    handle_call/3,
+    handle_cast/2,
+    handle_info/2,
+    terminate/2,
+    code_change/3]).
+
+-define(SERVER, ?MODULE).
+
+-record(state, {}).
+
+%%%===================================================================
+%%% API
+%%%===================================================================
+
+%%--------------------------------------------------------------------
+%% @doc
+%% Starts the server
+%%
+%% @end
+%%--------------------------------------------------------------------
+-spec(start_link() ->
+    {ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
+start_link() ->
+    %% registered locally under the module name
+    RegisteredName = {local, ?SERVER},
+    gen_server:start_link(RegisteredName, ?MODULE, [], []).
+
+%%%===================================================================
+%%% gen_server callbacks
+%%%===================================================================
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% Initializes the server
+%%
+%% @spec init(Args) -> {ok, State} |
+%%                     {ok, State, Timeout} |
+%%                     ignore |
+%%                     {stop, Reason}
+%% @end
+%%--------------------------------------------------------------------
+-spec(init(Args :: term()) ->
+    {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
+    {stop, Reason :: term()} | ignore).
+init([]) ->
+    %% the directory server keeps no data of its own yet
+    InitialState = #state{},
+    {ok, InitialState}.
+
+%% Asks the directory server to subscribe to SubcribeUrl on the registry module RegistryName.
+%% Returns ok on success; any exception raised by the call (for example the 5000 ms call
+%% timeout) is converted into {error, Reason}.
+subscribe(RegistryName,SubcribeUrl)->
+    try gen_server:call(?SERVER,{subscribe,RegistryName,SubcribeUrl},5000) of
+        ok->
+            ok
+    catch
+        %% the exception class is not needed, only the reason is reported
+        _Class:Reason->
+            %% todo improve error type
+            {error,Reason}
+    end.
+
+%% Notification callback passed to the registry module by the subscribe handler of this server.
+%% Forwards the received URL list to dubbo_consumer_pool:start_consumer/2 and returns ok.
+%% NOTE(review): `Interface` is not bound anywhere in this clause, so the function does not
+%% compile as written - the interface name has to be passed in or derived from UrlList.
+notify(UrlList)->
+    dubbo_consumer_pool:start_consumer(Interface, UrlList),
+    ok.
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% Handling call messages
+%%
+%% @end
+%%--------------------------------------------------------------------
+-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()},
+    State :: #state{}) ->
+    {reply, Reply :: term(), NewState :: #state{}} |
+    {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} |
+    {noreply, NewState :: #state{}} |
+    {noreply, NewState :: #state{}, timeout() | hibernate} |
+    {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
+    {stop, Reason :: term(), NewState :: #state{}}).
+handle_call({subscribe,RegistryName,SubcribeUrl}, _From, State) ->
+    %% RegistryName is the registry callback module; it is handed notify/1 as the
+    %% function to call back with URL lists. Its return value is ignored.
+    RegistryName:subscribe(SubcribeUrl, fun dubbo_directory:notify/1),
+    {reply, ok, State};
+handle_call(_Other, _From, State) ->
+    %% every other request is acknowledged with ok
+    {reply, ok, State}.
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% Handling cast messages
+%%
+%% @end
+%%--------------------------------------------------------------------
+-spec(handle_cast(Request :: term(), State :: #state{}) ->
+    {noreply, NewState :: #state{}} |
+    {noreply, NewState :: #state{}, timeout() | hibernate} |
+    {stop, Reason :: term(), NewState :: #state{}}).
+handle_cast(_Msg, CurrentState) ->
+    %% no casts are handled; the state is left unchanged
+    {noreply, CurrentState}.
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% Handling all non call/cast messages
+%%
+%% @spec handle_info(Info, State) -> {noreply, State} |
+%%                                   {noreply, State, Timeout} |
+%%                                   {stop, Reason, State}
+%% @end
+%%--------------------------------------------------------------------
+-spec(handle_info(Info :: timeout() | term(), State :: #state{}) ->
+    {noreply, NewState :: #state{}} |
+    {noreply, NewState :: #state{}, timeout() | hibernate} |
+    {stop, Reason :: term(), NewState :: #state{}}).
+handle_info(_Message, CurrentState) ->
+    %% unexpected messages are dropped; the state is left unchanged
+    {noreply, CurrentState}.
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% This function is called by a gen_server when it is about to
+%% terminate. It should be the opposite of Module:init/1 and do any
+%% necessary cleaning up. When it returns, the gen_server terminates
+%% with Reason. The return value is ignored.
+%%
+%% @spec terminate(Reason, State) -> void()
+%% @end
+%%--------------------------------------------------------------------
+-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
+    State :: #state{}) -> term()).
+terminate(_Why, _FinalState) ->
+    %% nothing to clean up
+    ok.
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% Convert process state when code is changed
+%%
+%% @spec code_change(OldVsn, State, Extra) -> {ok, NewState}
+%% @end
+%%--------------------------------------------------------------------
+-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{},
+    Extra :: term()) ->
+    {ok, NewState :: #state{}} | {error, Reason :: term()}).
+code_change(_FromVsn, CurrentState, _Extra) ->
+    %% the state needs no conversion between versions
+    {ok, CurrentState}.
+
+%%%===================================================================
+%%% Internal functions
+%%%===================================================================
+
diff --git a/src/dubbo_extension.erl b/src/dubbo_extension.erl
new file mode 100644
index 0000000..4e5eb0c
--- /dev/null
+++ b/src/dubbo_extension.erl
@@ -0,0 +1,189 @@
+%%------------------------------------------------------------------------------
+%% Licensed to the Apache Software Foundation (ASF) under one or more
+%% contributor license agreements.  See the NOTICE file distributed with
+%% this work for additional information regarding copyright ownership.
+%% The ASF licenses this file to You under the Apache License, Version 2.0
+%% (the "License"); you may not use this file except in compliance with
+%% the License.  You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%------------------------------------------------------------------------------
+-module(dubbo_extension).
+-behaviour(gen_server).
+
+%% API
+-export([run/3,run_fold/4,register/3,unregister/3]).
+
+
+-export([start_link/0]).
+
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
+
+
+-define(TAB, ?MODULE).
+
+-record(state, {}).
+
+
+%% @doc register Module as an extension of HookName with the given Priority.
+%% The hook is sent as a {Priority, Module} pair, the shape the server's register
+%% handler and unregister/3 use (the module must be a plain atom for check_module/1).
+-spec register(HookName::atom(), Module::atom(),Priority::integer()) -> ok | {error, term()}.
+register(HookName, Module,Priority) ->
+    gen_server:call(?MODULE, {register, HookName, {Priority, Module}}).
+
+%% @doc remove the {Priority, Module} hook from HookName.
+%% Both Priority and Module must equal the values used at registration for the entry
+%% to be removed.
+-spec unregister(HookName::atom(), Module::atom(),Priority::integer()) -> ok.
+unregister(HookName, Module,Priority) ->
+    gen_server:call(?MODULE, {unregister, HookName, {Priority, Module}}).
+
+%% @doc run all hooks registered for the HookName by calling Fun with Args on each of them.
+%% Execution can be interrupted if a hook returns the atom `stop'.
+%% Returns ok, also when no hook is registered.
+-spec run(HookName::atom(), Fun::atom(), Args::list()) -> ok.
+run(HookName,Fun, Args) ->
+    case find_hooks(HookName) of
+        no_hook -> ok;
+        Hooks -> run1(Hooks, HookName,Fun, Args)
+    end.
+
+%% Calls Module:Fun(Args...) for each module in order.
+%% A crashing hook is logged and skipped; a hook returning `stop' ends the run early.
+run1([], _HookName,_Fun, _Args) ->
+    ok;
+run1([Module | Remaining], HookName, Fun, Args) ->
+    case (catch apply(Module, Fun, Args)) of
+        stop ->
+            ok;
+        {'EXIT', Reason} ->
+            logger:error("~p~n error running extension: ~p~n", [HookName, Reason]),
+            run1(Remaining, HookName, Fun, Args);
+        _Other ->
+            run1(Remaining, HookName, Fun, Args)
+    end.
+
+%% @doc fold over all hooks registered for the HookName.
+%% Each hook is called as Fun(Args..., Acc) and its result becomes the next Acc.
+%% Returns Acc unchanged when no hook is registered.
+-spec run_fold(HookName::atom(), Fun::atom(), Args::list(), Acc::any()) -> Acc2::any().
+run_fold(HookName, Fun, Args, Acc) ->
+    case find_hooks(HookName) of
+        no_hook -> Acc;
+        Hooks -> run_fold1(Hooks,HookName, Fun, Args, Acc)
+    end.
+
+
+%% Threads Acc through the hooks: each module is called with the base arguments plus the
+%% current accumulator appended as the last argument.
+%% - a crash is logged and the accumulator is kept;
+%% - `stop' returns the current accumulator, {stop, NewAcc} returns NewAcc;
+%% - any other result becomes the accumulator for the next hook.
+run_fold1([], _HookName,_Fun, _Args,  Acc) ->
+    Acc;
+run_fold1([Module | Remaining], HookName, Fun, BaseArgs, Acc) ->
+    case (catch apply(Module, Fun, BaseArgs ++ [Acc])) of
+        {'EXIT', Reason} ->
+            error_logger:error_msg("~p~n error running hook: ~p~n",
+                [HookName, Reason]),
+            run_fold1(Remaining, HookName, Fun, BaseArgs, Acc);
+        stop ->
+            Acc;
+        {stop, NewAcc} ->
+            NewAcc;
+        Result ->
+            run_fold1(Remaining, HookName, Fun, BaseArgs, Result)
+    end.
+
+
+
+
+%% @doc retrieve the list of hooks registered for a hook name.
+%% Returns {ok, Hooks} with whatever find_hooks/1 yields, or `error' when nothing is registered.
+-spec find(HookName::atom()) -> {ok, list()} | error.
+find(HookName) ->
+    %% find_hooks/1 is a plain function of this module, not a macro
+    case find_hooks(HookName) of
+        no_hook -> error;
+        Hooks -> {ok, Hooks}
+    end.
+
+%% @hidden
+start_link() ->
+    %% the hook table is created by the calling process before the server is started
+    _ = init_tabs(),
+    RegisteredName = {local, ?MODULE},
+    gen_server:start_link(RegisteredName, ?MODULE, [], []).
+
+
+%% Creates the named hook table unless it already exists.
+init_tabs() ->
+    TableExists = ets:info(?TAB, name) =/= undefined,
+    case TableExists of
+        true ->
+            true;
+        false ->
+            ets:new(?TAB, [ordered_set, public, named_table,
+                {read_concurrency, true},
+                {write_concurrency, true}])
+    end.
+
+%% @hidden
+init([]) ->
+    %% hooks live in the ETS table, so the server state is empty
+    InitialState = #state{},
+    {ok, InitialState}.
+
+%% @hidden
+%% Table writes are serialised through this server; unknown requests get `badarg'.
+handle_call({register, HookName, {_Priority, _Module} = Hook}, _From, State) ->
+    do_register(HookName, Hook),
+    {reply, ok, State};
+handle_call({unregister, HookName, {_Priority, _Module} = Hook}, _From, State) ->
+    do_unregister(HookName, Hook),
+    {reply, ok, State};
+handle_call(_Unknown, _From, State) ->
+    {reply, badarg, State}.
+
+%% @hidden
+%% casts are ignored
+handle_cast(_Cast, CurrentState) ->
+    {noreply, CurrentState}.
+
+%% other messages are ignored
+handle_info(_Message, CurrentState) ->
+    {noreply, CurrentState}.
+
+%% @hidden
+%% the state needs no conversion between versions
+code_change(_FromVsn, CurrentState, _Extra) ->
+    {ok, CurrentState}.
+
+%% @hidden
+%% nothing to clean up
+terminate(_Why, _FinalState) ->
+    ok.
+
+%% Tries to load the hook's module, then adds the hook to the entry stored for HookName.
+do_register(HookName, {_Priority, ModuleName}=Hook) ->
+    ok = check_module(ModuleName),
+    update_hooks(HookName, [Hook]).
+
+
+%% Removes Hook from the entry stored for HookName; always returns ok.
+do_unregister(HookName, Hook) ->
+    _ = remove_hooks(HookName, [Hook]),
+    ok.
+
+%% Stores HookFuns under HookName. When an entry already exists the new hooks are appended
+%% and the combined list is sorted on the first tuple element (the priority).
+update_hooks(HookName, HookFuns) ->
+    Merged =
+        case ets:lookup(?TAB, HookName) of
+            [] ->
+                HookFuns;
+            [{_, Existing}] ->
+                lists:keysort(1, Existing ++ HookFuns)
+        end,
+    true = ets:insert(?TAB, {HookName, Merged}).
+
+%% Removes HookFuns from the entry stored for HookName.
+%% Returns ok when there is no entry; otherwise the result of the ETS delete/insert.
+remove_hooks(HookName, HookFuns) ->
+    case ets:lookup(?TAB, HookName) of
+        [] ->
+            ok;
+        [{_, Existing}] ->
+            store_remaining_hooks(HookName, Existing -- HookFuns)
+    end.
+
+%% Drops the whole entry once no hook is left, otherwise writes back the remaining hooks.
+store_remaining_hooks(HookName, []) ->
+    ets:delete(?TAB, HookName);
+store_remaining_hooks(HookName, Remaining) ->
+    ets:insert(?TAB, {HookName, Remaining}).
+
+%% Attempts to load ModuleName; the load result is deliberately ignored and ok is returned.
+check_module(ModuleName) ->
+    _LoadResult = code:ensure_loaded(ModuleName),
+    ok.
+
+%% Looks up the modules registered for HookName.
+%% Hooks are stored as {Priority, Module} pairs sorted by priority, while run1/4 and
+%% run_fold1/5 apply every list element directly as a module. Only the module names are
+%% therefore returned, in stored order. Returns `no_hook' when nothing is registered.
+find_hooks(HookName)->
+    case ets:lookup(?TAB,HookName) of
+        []->
+            no_hook;
+        [{_, Hooks}]->
+            [Module || {_Priority, Module} <- Hooks]
+    end.
diff --git a/include/dubboerl.hrl b/src/dubbo_filter.erl
similarity index 87%
copy from include/dubboerl.hrl
copy to src/dubbo_filter.erl
index 31204a5..7fa7950 100644
--- a/include/dubboerl.hrl
+++ b/src/dubbo_filter.erl
@@ -14,8 +14,8 @@
 %% See the License for the specific language governing permissions and
 %% limitations under the License.
 %%------------------------------------------------------------------------------
--define(PROVIDER_IMPL_TABLE,provider_impl_table).
+-module(dubbo_filter).
+-author("dlive").
 
--define(PROVIDER_WORKER,provider_worker).
-
--define(TRAFFIC_CONTROL,traffic_control).
\ No newline at end of file
+%% API
+-export([]).
diff --git a/src/dubbo_invoker.erl b/src/dubbo_invoker.erl
index b29751c..0a3527b 100644
--- a/src/dubbo_invoker.erl
+++ b/src/dubbo_invoker.erl
@@ -16,82 +16,8 @@
 %%------------------------------------------------------------------------------
 -module(dubbo_invoker).
 
--include("dubbo.hrl").
 %% API
--export([invoke_request/2, invoke_request/3, invoke_request/5]).
+-export([]).
 
--spec invoke_request(Interface :: binary(), Request :: #dubbo_request{}) ->
-    {ok, reference()}|
-    {ok, reference(), Data :: any(), RpcContent :: list()}|
-    {error, Reason :: timeout|no_provider|any()}.
-invoke_request(Interface, Request) ->
-    invoke_request(Interface, Request, [], #{}, self()).
 
--spec invoke_request(Interface :: binary(), Request :: #dubbo_request{}, RequestOption :: map()) ->
-    {ok, reference()}|
-    {ok, reference(), Data :: any(), RpcContent :: list()}|
-    {error, Reason :: timeout|no_provider|any()}.
-invoke_request(Interface, Request, RequestOption) ->
-    invoke_request(Interface, Request, maps:get(ctx, RequestOption, []), RequestOption, self()).
-
-
--spec invoke_request(Interface :: binary(), Request :: #dubbo_request{}, RpcContext :: list(), RequestState :: map(), CallBackPid :: pid()) ->
-    {ok, reference()}|
-    {ok, reference(), Data :: any(), RpcContent :: list()}|
-    {error, Reason :: timeout|no_provider|request_full|any()}.
-invoke_request(Interface, Request, RpcContext, RequestState, CallBackPid) ->
-    case dubbo_consumer_pool:select_connection(Interface, Request#dubbo_request.mid) of
-        {ok, #connection_info{pid = Pid, host_flag = HostFlag}} ->
-            case dubbo_traffic_control:check_goon(HostFlag, 199) of
-                ok ->
-                    Request2 = merge_attachments(Request, RpcContext),
-                    {ok, RequestData} = dubbo_codec:encode_request(Request2),
-                    Ref = get_ref(RequestState),
-                    gen_server:cast(Pid, {send_request, Ref, Request2, RequestData, CallBackPid, RequestState}),
-                    case is_sync(RequestState) of
-                        true ->
-                            sync_receive(Ref, get_timeout(RequestState));
-                        false -> {ok, Ref}
-                    end;
-                full ->
-                    {error, request_full}
-            end;
-        {error, none} ->
-            logger:error("[INVOKE] ~p error Reason no_provider", [Interface]),
-            {error, no_provider}
-    end.
-
-
-is_sync(Option) ->
-    maps:is_key(sync, Option).
-get_ref(Option) ->
-    maps:get(ref, Option, make_ref()).
-
-get_timeout(Option) ->
-    maps:get(timeout, Option, ?REQUEST_TIME_OUT).
-
-
-sync_receive(Ref, TimeOut) ->
-    receive
-        {'$gen_cast', {response_process, Ref, RpcContent, Response}} ->
-            {ok, Ref, Response, RpcContent}
-    after
-        TimeOut ->
-            {error, timeout}
-    end.
-merge_attachments(#dubbo_request{data = null} = Request, _Option) ->
-    Request;
-merge_attachments(Request, Option) ->
-    Attachements = Request#dubbo_request.data#dubbo_rpc_invocation.attachments,
-    case lists:keyfind(attachments, 1, Option) of
-        false -> OptionAttachments = [];
-        {attachments, OptionAttachments} ->
-            OptionAttachments
-    end,
-    List = [
-        {<<"version">>, <<"0.0.0">>},
-        {<<"timeout">>, <<"5000">>}
-    ],
-    Attachements2 = lists:merge3(Attachements, OptionAttachments, List),
-    Data2 = Request#dubbo_request.data#dubbo_rpc_invocation{attachments = Attachements2},
-    Request#dubbo_request{data = Data2}.
+%% Behaviour callback: perform Invocation through Invoker.
+%% Bare type variables that occur only once are rejected in a callback spec, so the
+%% arguments are given explicit (for now unconstrained) types.
+-callback invoke(Invoker :: term(), Invocation :: term()) -> ok.
\ No newline at end of file
diff --git a/include/dubboerl.hrl b/src/dubbo_invoker_cluster.erl
similarity index 87%
copy from include/dubboerl.hrl
copy to src/dubbo_invoker_cluster.erl
index 31204a5..906a0fa 100644
--- a/include/dubboerl.hrl
+++ b/src/dubbo_invoker_cluster.erl
@@ -14,8 +14,8 @@
 %% See the License for the specific language governing permissions and
 %% limitations under the License.
 %%------------------------------------------------------------------------------
--define(PROVIDER_IMPL_TABLE,provider_impl_table).
+-module(dubbo_invoker_cluster).
+-author("dlive").
 
--define(PROVIDER_WORKER,provider_worker).
-
--define(TRAFFIC_CONTROL,traffic_control).
\ No newline at end of file
+%% API
+-export([]).
diff --git a/src/dubbo_invoker.erl b/src/dubbo_invoker_old.erl
similarity index 99%
copy from src/dubbo_invoker.erl
copy to src/dubbo_invoker_old.erl
index b29751c..c878656 100644
--- a/src/dubbo_invoker.erl
+++ b/src/dubbo_invoker_old.erl
@@ -14,7 +14,7 @@
 %% See the License for the specific language governing permissions and
 %% limitations under the License.
 %%------------------------------------------------------------------------------
--module(dubbo_invoker).
+-module(dubbo_invoker_old).
 
 -include("dubbo.hrl").
 %% API
diff --git a/include/dubboerl.hrl b/src/dubbo_protocol.erl
similarity index 83%
copy from include/dubboerl.hrl
copy to src/dubbo_protocol.erl
index 31204a5..3c82119 100644
--- a/include/dubboerl.hrl
+++ b/src/dubbo_protocol.erl
@@ -14,8 +14,13 @@
 %% See the License for the specific language governing permissions and
 %% limitations under the License.
 %%------------------------------------------------------------------------------
--define(PROVIDER_IMPL_TABLE,provider_impl_table).
+-module(dubbo_protocol).
 
--define(PROVIDER_WORKER,provider_worker).
+%% Behaviour callback implemented by protocol modules.
+%% Bare type variables that occur only once are rejected in a callback spec, so the
+%% arguments are given explicit (for now unconstrained) types.
+-callback refer(InterfaceClassInfo :: term(), Url :: term()) -> ok.
 
--define(TRAFFIC_CONTROL,traffic_control).
\ No newline at end of file
+%% API
+-export([refer/2]).
+
+
+%% Dispatches refer/2 to every module registered for the `protocol_wapper' hook.
+%% dubbo_extension:run/3 is the extension entry point; it returns ok.
+refer(InterfaceClassInfo,Url)->
+    dubbo_extension:run(protocol_wapper,refer,[InterfaceClassInfo,Url]).
\ No newline at end of file
diff --git a/include/dubboerl.hrl b/src/dubbo_protocol_dubbo.erl
similarity index 87%
copy from include/dubboerl.hrl
copy to src/dubbo_protocol_dubbo.erl
index 31204a5..2242939 100644
--- a/include/dubboerl.hrl
+++ b/src/dubbo_protocol_dubbo.erl
@@ -14,8 +14,8 @@
 %% See the License for the specific language governing permissions and
 %% limitations under the License.
 %%------------------------------------------------------------------------------
--define(PROVIDER_IMPL_TABLE,provider_impl_table).
+-module(dubbo_protocol_dubbo).
+-author("dlive").
 
--define(PROVIDER_WORKER,provider_worker).
-
--define(TRAFFIC_CONTROL,traffic_control).
\ No newline at end of file
+%% API
+-export([]).
diff --git a/src/dubbo_protocol_registry.erl b/src/dubbo_protocol_registry.erl
new file mode 100644
index 0000000..f1d9bc4
--- /dev/null
+++ b/src/dubbo_protocol_registry.erl
@@ -0,0 +1,53 @@
+%%------------------------------------------------------------------------------
+%% Licensed to the Apache Software Foundation (ASF) under one or more
+%% contributor license agreements.  See the NOTICE file distributed with
+%% this work for additional information regarding copyright ownership.
+%% The ASF licenses this file to You under the Apache License, Version 2.0
+%% (the "License"); you may not use this file except in compliance with
+%% the License.  You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%------------------------------------------------------------------------------
+-module(dubbo_protocol_registry).
+-behaviour(dubbo_protocol).
+
+-include("dubboerl.hrl").
+
+%% API
+%% refer/2 is the dubbo_protocol behaviour callback and must be exported to be callable.
+-export([refer/2]).
+
+%% Wires a consumer reference to the registry described by Url:
+%% parse the URL, set up the registry, register the generated consumer URL there and
+%% subscribe the directory to it. Returns ok; a failed parse or setup crashes with badmatch.
+refer(InterfaceClassInfo,Url)->
+    {ok,ParsedUrl} =  dubbo_common_fun:parse_url(Url),
+    {ok,Registry} = dubbo_registry:setup_register(ParsedUrl),
+    ConsumerUrl = gen_consumer_url(ParsedUrl),
+    %% publish the consumer URL, then let the directory subscribe to it
+    dubbo_registry:register(Registry,ConsumerUrl),
+    dubbo_directory:subscribe(Registry,ConsumerUrl),
+    ok.
+
+
+%% Builds the consumer URL for the given registry URL info.
+%% The <<"refer">> parameter is URI-decoded and parsed into the consumer parameters; its
+%% <<"interface">> entry becomes the path, the scheme is <<"consumer">> and the host comes
+%% from dubbo_common_fun:local_ip_v4_str/0. The result is what dubbo_common_fun:map_to_url/1
+%% returns. Crashes with badmatch when the <<"refer">> or <<"interface">> key is missing.
+gen_consumer_url(UrlInfo)->
+    Parameters = UrlInfo#dubbo_url.parameters,
+    #{<<"refer">> := Refer} = Parameters,
+    Refer2 = http_uri:decode(Refer),
+    %% NOTE(review): refer/2 in this module calls parse_url/1 and expects {ok, UrlInfo}, while
+    %% this call uses parse_url/2 with a map accumulator and matches the result as a map -
+    %% presumably a parameter-parsing helper was intended; confirm parse_url/2 exists.
+    Parameters2 = dubbo_common_fun:parse_url(Refer2,#{}),
+    #{<<"interface">> := Interface} = Parameters2,
+    ConsumerUrlInfo = UrlInfo#dubbo_url{
+        scheme = <<"consumer">>,
+        host = dubbo_common_fun:local_ip_v4_str(),
+        path = Interface,
+        parameters = Parameters2
+    },
+    ConsumerUrl = dubbo_common_fun:map_to_url(ConsumerUrlInfo),
+    ConsumerUrl.
\ No newline at end of file
diff --git a/include/dubboerl.hrl b/src/dubbo_provider_consumer_reg_table.erl
similarity index 87%
copy from include/dubboerl.hrl
copy to src/dubbo_provider_consumer_reg_table.erl
index 31204a5..3386cdc 100644
--- a/include/dubboerl.hrl
+++ b/src/dubbo_provider_consumer_reg_table.erl
@@ -14,8 +14,8 @@
 %% See the License for the specific language governing permissions and
 %% limitations under the License.
 %%------------------------------------------------------------------------------
--define(PROVIDER_IMPL_TABLE,provider_impl_table).
+-module(dubbo_provider_consumer_reg_table).
+-author("dlive").
 
--define(PROVIDER_WORKER,provider_worker).
-
--define(TRAFFIC_CONTROL,traffic_control).
\ No newline at end of file
+%% API
+-export([]).
diff --git a/src/dubbo_reference_config.erl b/src/dubbo_reference_config.erl
new file mode 100644
index 0000000..f6e8bdd
--- /dev/null
+++ b/src/dubbo_reference_config.erl
@@ -0,0 +1,74 @@
+%%------------------------------------------------------------------------------
+%% Licensed to the Apache Software Foundation (ASF) under one or more
+%% contributor license agreements.  See the NOTICE file distributed with
+%% this work for additional information regarding copyright ownership.
+%% The ASF licenses this file to You under the Apache License, Version 2.0
+%% (the "License"); you may not use this file except in compliance with
+%% the License.  You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%------------------------------------------------------------------------------
+-module(dubbo_reference_config).
+
+-record(dubbo_interface_info,{}).
+
+%% API
+-export([]).
+
+%% Placeholder for building the initial reference configuration; currently only returns ok.
+init_reference()->
+    InitConfigMap= #{},
+    %% TODO: assemble the various pieces of data that are needed
+    ok.
+
+
+%% Builds the registry URL for a reference and runs the `refer' function of every module
+%% registered for the `protocol_wapper' hook (the hook name used by dubbo_protocol:refer/2).
+%% InitConfigMap is not used yet. Returns ok.
+create_proxy(InitConfigMap)->
+
+
+    InterfaceClassInfo = #{},
+    Para = gen_parameter(),
+    Url = gen_registry_url(Para),
+    dubbo_extension:run(protocol_wapper,refer,[InterfaceClassInfo,Url]),
+    ok.
+
+    %%application=hello-world&dubbo=2.0.2&pid=68901&refer=application=hello-world&default.check=false&default.lazy=false&default.retries=0&default.sticky=false&default.timeout=300000&dubbo=2.0.2&interface=org.apache.dubbo.erlang.sample.service.facade.UserOperator&lazy=false&methods=queryUserInfo,queryUserList,genUserId,getUserInfo&pid=68901&register.ip=127.0.0.1&release=2.7.1&retries=0&side=consumer&sticky=false&timestamp=1559727789953&registry=zookeeper&release=2.7.1&timestamp=1559727842451
+
+
+%% Returns the registry URL for a reference. Para is not used yet: a hard-coded sample URL
+%% is returned instead.
+%% NOTE(review): the string literal below ends in "[...]" without a closing quote; it looks
+%% truncated in this copy of the patch - restore the full literal from the repository.
+gen_registry_url(Para)->
+    %% TODO: assemble Para and the URL
+
+    Url = "registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=hello-world&dubbo=2.0.2&pid=68901&refer=application%3Dhello-world%26default.check%3Dfalse%26default.lazy%3Dfalse%26default.retries%3D0%26default.sticky%3Dfalse%26default.timeout%3D300000%26dubbo%3D2.0.2%26interface%3Dorg.apache.dubbo.erlang.sample.service.facade.UserOperator%26lazy%3Dfalse%26methods%3DqueryUserInfo%2CqueryUserList%2CgenUserId%2CgetUserInfo%26pid%3D68901%26register.ip%3D127.0.0.1%26 [...]
+    Url.
+%% Returns the parameter map of the registry URL; the dynamic entries come from the
+%% get_*/0 helpers below, the rest are fixed values.
+gen_parameter()->
+    #{
+        <<"application">> => get_appname(),
+        <<"dubbo">> => <<"2.0.2">>,
+        <<"pid">> => get_pid(),
+        <<"refer">> => get_refinfo(),
+        <<"registry">> => get_registry_type(),
+        <<"release">> => <<"2.7.1">>,
+        <<"timestamp">> => <<"1559727842451">>
+    }.
+
+%% Application name placed in the registry URL; currently a fixed placeholder.
+get_appname()->
+    %%todo
+    <<"hello-world">>.
+%% Process id placed in the registry URL; currently a fixed placeholder.
+get_pid()->
+    %%todo
+    <<"68901">>.
+%% Percent-encoded consumer parameters for the `refer' entry; currently a fixed sample.
+%% The register.ip value is 127.0.0.1, matching the sample URL in gen_registry_url/1.
+get_refinfo()->
+    %%todo
+    <<"application%3Dhello-world%26default.check%3Dfalse%26default.lazy%3Dfalse%26default.retries%3D0%26default.sticky%3Dfalse%26default.timeout%3D300000%26dubbo%3D2.0.2%26interface%3Dorg.apache.dubbo.erlang.sample.service.facade.UserOperator%26lazy%3Dfalse%26methods%3DqueryUserInfo%2CqueryUserList%2CgenUserId%2CgetUserInfo%26pid%3D68901%26register.ip%3D127.0.0.1%26release%3D2.7.1%26retries%3D0%26side%3Dconsumer%26sticky%3Dfalse%26timestamp%3D1559727789953">>.
+
+%% Registry type placed in the registry URL; currently a fixed placeholder.
+get_registry_type()->
+    %%todo
+    <<"zookeeper">>.
\ No newline at end of file
diff --git a/src/dubbo_common_fun.erl b/src/dubbo_registry.erl
similarity index 50%
copy from src/dubbo_common_fun.erl
copy to src/dubbo_registry.erl
index 6744171..89eb64b 100644
--- a/src/dubbo_common_fun.erl
+++ b/src/dubbo_registry.erl
@@ -14,18 +14,34 @@
 %% See the License for the specific language governing permissions and
 %% limitations under the License.
 %%------------------------------------------------------------------------------
--module(dubbo_common_fun).
+-module(dubbo_registry).
+-include("dubboerl.hrl").
+
+%% Callbacks every registry implementation module has to provide.
+%% NOTE(review): setup_register/1 in this module passes its UrlInfo argument to start/1;
+%% confirm whether start/1 should take the binary URL or the parsed URL info.
+-callback start(Url :: binary()) -> ok.
+-callback register(Url::binary())-> term().
+-callback subscribe(SubcribeUrl::binary(),NotifyFun::function())->ok.
 
 %% API
--export([local_ip_v4/0, local_ip_v4_str/0]).
+-export([setup_register/1,register/2]).
+
+%% Resolves the registry implementation module for UrlInfo and starts it unless a process
+%% is already registered under the module name. The return value of start/1 is ignored.
+%% UrlInfo is a #dubbo_url{} record: get_registry_module/1 reads its `scheme' field.
+-spec(setup_register(UrlInfo :: #dubbo_url{}) -> {ok, RegistryProcessName :: atom()}|{error, term()}).
+setup_register(UrlInfo) ->
+    RegistryModuleName = get_registry_module(UrlInfo),
+    case whereis(RegistryModuleName) of
+        undefined ->
+            RegistryModuleName:start(UrlInfo);
+        _Pid ->
+            ok
+    end,
+    {ok, RegistryModuleName}.
+
+%% Logs the call and delegates to the register/1 callback of the registry module
+%% RegistryName, returning its result.
+register(RegistryName,Url) ->
+    logger:info("call ~p register url ~p",[RegistryName,Url]),
+    RegistryName:register(Url).
 
-local_ip_v4() ->
-    {ok, Addrs} = inet:getifaddrs(),
-    hd([
-        Addr || {_, Opts} <- Addrs, {addr, Addr} <- Opts,
-        size(Addr) == 4, Addr =/= {127, 0, 0, 1}
-    ]).
 
-local_ip_v4_str() ->
-    {V1, V2, V3, V4} = local_ip_v4(),
-    list_to_binary(io_lib:format("~p.~p.~p.~p", [V1, V2, V3, V4])).
+%% Maps the URL scheme to the registry implementation module name by prefixing it with
+%% "dubbo_registry_" (a scheme of <<"zookeeper">> gives dubbo_registry_zookeeper).
+%% The scheme is expected to be a binary. The atom must already exist, otherwise badarg is
+%% raised; no new atoms are created from URL data.
+get_registry_module(Info) ->
+    RegistryName = Info#dubbo_url.scheme,
+    %% The prefix is written as a literal segment: a nested <<...>> value without a /binary
+    %% type specifier is treated as an integer segment and fails with badarg.
+    FullName = <<"dubbo_registry_", RegistryName/binary>>,
+    %% the two-argument form is available on all OTP releases
+    binary_to_existing_atom(FullName, utf8).
\ No newline at end of file
diff --git a/src/dubbo_registry_zookeeper.erl b/src/dubbo_registry_zookeeper.erl
new file mode 100644
index 0000000..ea9ef62
--- /dev/null
+++ b/src/dubbo_registry_zookeeper.erl
@@ -0,0 +1,314 @@
+%%------------------------------------------------------------------------------
+%% Licensed to the Apache Software Foundation (ASF) under one or more
+%% contributor license agreements.  See the NOTICE file distributed with
+%% this work for additional information regarding copyright ownership.
+%% The ASF licenses this file to You under the Apache License, Version 2.0
+%% (the "License"); you may not use this file except in compliance with
+%% the License.  You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%------------------------------------------------------------------------------
+-module(dubbo_registry_zookeeper).
+-behaviour(gen_server).
+-behaviour(dubbo_registry).
+
+-include("dubbo.hrl").
+-include("dubboerl.hrl").
+%% API
+-export([start_link/0, register_consumer/1, register_consumer/2, gen_consumer_node_info/1, register_provider/1, provider_watcher/1]).
+
+-export([start/1,register/1,subscribe/2]).
+%% gen_server callbacks
+-export([init/1,
+    handle_call/3,
+    handle_cast/2,
+    handle_info/2,
+    terminate/2,
+    code_change/3]).
+
+-define(SERVER, ?MODULE).
+
+-record(state, {zk_pid,notify_fun}).
+
+%%%===================================================================
+%%% API
+%%%===================================================================
+
+%%--------------------------------------------------------------------
+%% @doc
+%% Starts the server
+%%
+%% @end
+%%--------------------------------------------------------------------
+-spec(start_link() ->
+    {ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
+start_link() ->
+    gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
+
+%%%===================================================================
+%%% gen_server callbacks
+%%%===================================================================
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% Initializes the server
+%%
+%% @spec init(Args) -> {ok, State} |
+%%                     {ok, State, Timeout} |
+%%                     ignore |
+%%                     {stop, Reason}
+%% @end
+%%--------------------------------------------------------------------
+-spec(init(Args :: term()) ->
+    {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
+    {stop, Reason :: term()} | ignore).
+init([]) ->
+    {ok, Pid} = connection(),
+    {ok, #state{zk_pid = Pid}}.
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% Handling call messages
+%%
+%% @end
+%%--------------------------------------------------------------------
+-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()},
+    State :: #state{}) ->
+    {reply, Reply :: term(), NewState :: #state{}} |
+    {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} |
+    {noreply, NewState :: #state{}} |
+    {noreply, NewState :: #state{}, timeout() | hibernate} |
+    {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
+    {stop, Reason :: term(), NewState :: #state{}}).
+
+handle_call({add_consumer, Interface,ConsumerUrl}, _From, State) ->
+    add_consumer(Interface,ConsumerUrl, State),
+    {reply, ok, State};
+handle_call({add_provider, Provider}, _From, State) ->
+    register_provider_path(Provider, State),
+    {reply, ok, State};
+handle_call({subscribe_provider,InterfaceName,NotifyFun}, _From, #state{zk_pid = ZkPid} = State) ->
+    NewState=State#state{notify_fun = NotifyFun},
+    get_provider_list(InterfaceName,ZkPid,NotifyFun),
+    {reply, ok, NewState};
+
+handle_call(_Request, _From, State) ->
+    {reply, ok, State}.
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% Handling cast messages
+%%
+%% @end
+%%--------------------------------------------------------------------
+-spec(handle_cast(Request :: term(), State :: #state{}) ->
+    {noreply, NewState :: #state{}} |
+    {noreply, NewState :: #state{}, timeout() | hibernate} |
+    {stop, Reason :: term(), NewState :: #state{}}).
+handle_cast({provider_node_change, Interface, Path}, #state{zk_pid = Pid} = State) ->
+    get_provider_and_start(Pid, Interface, Path),
+    {noreply, State};
+handle_cast(_Request, State) ->
+    {noreply, State}.
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% Handling all non call/cast messages
+%%
+%% @spec handle_info(Info, State) -> {noreply, State} |
+%%                                   {noreply, State, Timeout} |
+%%                                   {stop, Reason, State}
+%% @end
+%%--------------------------------------------------------------------
+-spec(handle_info(Info :: timeout() | term(), State :: #state{}) ->
+    {noreply, NewState :: #state{}} |
+    {noreply, NewState :: #state{}, timeout() | hibernate} |
+    {stop, Reason :: term(), NewState :: #state{}}).
+handle_info(_Info, State) ->
+    logger:info("zk server recv msg:~p", [_Info]),
+    {noreply, State}.
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% This function is called by a gen_server when it is about to
+%% terminate. It should be the opposite of Module:init/1 and do any
+%% necessary cleaning up. When it returns, the gen_server terminates
+%% with Reason. The return value is ignored.
+%%
+%% @spec terminate(Reason, State) -> void()
+%% @end
+%%--------------------------------------------------------------------
+-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
+    State :: #state{}) -> term()).
+terminate(_Reason, _State) ->
+    ok.
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% Convert process state when code is changed
+%%
+%% @spec code_change(OldVsn, State, Extra) -> {ok, NewState}
+%% @end
+%%--------------------------------------------------------------------
+-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{},
+    Extra :: term()) ->
+    {ok, NewState :: #state{}} | {error, Reason :: term()}).
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+
+
+%%----------------------------------------------
+%% dubbo_registry
+%%----------------------------------------------
+start(Url) ->
+    ok.
+register(Url)->
+    {ok,UrlInfo} = dubbo_common_fun:parse_url(Url),
+    InterfaceName = maps:get(<<"interface">>,UrlInfo#dubbo_url.parameters),
+    register(UrlInfo#dubbo_url.scheme,InterfaceName,Url),
+    ok.
+
+register(<<"consumer">>,InterfaceName,Url)->
+    gen_server:call(?SERVER, {add_consumer,InterfaceName, Url}),
+    ok;
+register(<<"provider">>,InterfaceName,Url)->
+
+    ok.
+
+subscribe(SubcribeUrl,NotifyFun)->
+    {ok,UrlInfo} = dubbo_common_fun:parse_url(SubcribeUrl),
+    InterfaceName = maps:get(<<"interface">>,UrlInfo#dubbo_url.parameters),
+    try gen_server:call(?SERVER,{subscribe_provider,InterfaceName,NotifyFun},5000) of
+        ok->
+            ok
+    catch
+        Error:Reason->
+            %%todo improve error type
+            {error,Reason}
+    end.
+
+register_consumer(Consumer) -> %% registers a #consumer_config{} by sending its interface and generated consumer:// URL to the server
+    gen_server:call(?SERVER, {add_consumer, Consumer#consumer_config.interface, gen_consumer_node_info(Consumer)}), %% handle_call only matches the 3-tuple {add_consumer, Interface, ConsumerUrl}
+    ok.
+register_consumer(Name, Option) ->
+    Consumer = #consumer_config{interface = Name, methods = [<<"testa">>, <<"testb">>]},
+    register_consumer(Consumer),
+    ok.
+register_provider(Provider) ->
+    gen_server:call(?SERVER, {add_provider, Provider}),
+    ok.
+
+%%%===================================================================
+%%% Internal functions
+%%%===================================================================
+
+connection() ->
+    {ok, List} = application:get_env(dubboerl, zookeeper_list),
+    {ok, Pid} = erlzk:connect(List, 30000, [
+        {chroot, "/"},
+        {monitor, self()}]),
+    {ok, Pid}.
+
+add_consumer(InterfaceName,ConsumerUrl, State) ->
+    Pid = State#state.zk_pid,
+%%    ConsumerNode = gen_consumer_node_info(Consumer),
+    ConsumerNode2 = list_to_binary(edoc_lib:escape_uri(binary_to_list(ConsumerUrl))),
+    check_and_create_path(Pid, <<"">>, [{<<"dubbo">>, p}, {InterfaceName, p}, {<<"consumers">>, p}, {ConsumerNode2, e}]),
+    %% todo
+%%    get_provider_list(Consumer, State),
+    ok.
+register_provider_path(Provider, State) ->
+    #state{zk_pid = Pid} = State,
+    ProviderNode = dubbo_node_config_util:gen_provider_info(Provider),
+    check_and_create_path(Pid, <<"">>, [{<<"dubbo">>, p}, {Provider#provider_config.interface, p}, {<<"providers">>, p}, {ProviderNode, e}]),
+    ok.
+
+
+get_provider_list(InterfaceName,ZkPid,NotifyFun) ->
+    InterfacePath = <<<<"/dubbo/">>/binary, InterfaceName/binary, <<"/providers">>/binary>>,
+    ChildList= get_provider_and_start(ZkPid, InterfaceName, InterfacePath),
+    NotifyFun(ChildList),
+    ok.
+get_provider_and_start(Pid, Interface, Path) -> %% returns the children of Path, or [] on error; a one-shot watcher process is spawned per call
+    case erlzk:get_children(Pid, Path, spawn(dubbo_registry_zookeeper, provider_watcher, [Interface])) of
+        {ok, ChildList} ->
+            logger:debug("get provider list ~p", [ChildList]),
+%%            start_provider_process(Interface, ChildList),
+            ChildList;
+        {error, R1} ->
+            logger:debug("[add_consumer] get_provider_list error ~p ~p", [Path, R1]), %% two ~p directives need two arguments
+            []
+    end.
+
+provider_watcher(Interface) ->
+    receive
+        {node_children_changed, Path} ->
+            gen_server:cast(?SERVER, {provider_node_change, Interface, Path}),
+            logger:debug("provider_watcher get event ~p ~p", [node_children_changed, Path]);
+        {Event, Path} ->
+%%            Path = "/a",
+%%            Event = node_created
+            logger:debug("provider_watcher get event ~p ~p", [Event, Path])
+    end,
+    ok.
+
+
+create_path(Pid, Path, CreateType) ->
+    case erlzk:create(Pid, Path, CreateType) of
+        {ok, ActualPath} ->
+            logger:debug("[add_consumer] create zk path  success ~p", [ActualPath]),
+            ok;
+        {error, R1} ->
+            logger:debug("[add_consumer] create zk path error ~p ~p", [Path, R1])
+    end,
+    ok.
+check_and_create_path(_Pid, _RootPath, []) ->
+    ok;
+check_and_create_path(Pid, RootPath, [{Item, CreateType} | Rst]) ->
+    CheckPath = <<RootPath/binary, <<"/">>/binary, Item/binary>>,
+    case erlzk:exists(Pid, CheckPath) of
+        {ok, Stat} ->
+            check_and_create_path(Pid, CheckPath, Rst);
+        {error, no_node} ->
+            logger:debug("[add_consumer] check_and_create_path unexist no_node ~p", [CheckPath]),
+            create_path(Pid, CheckPath, CreateType),
+            check_and_create_path(Pid, CheckPath, Rst);
+        {error, R1} ->
+            logger:debug("[add_consumer] check_and_create_path unexist ~p", [R1]),
+            check_and_create_path(Pid, CheckPath, Rst)
+    end.
+
+gen_consumer_node_info(Consumer) ->
+    %% Builds the consumer:// registration URL for a #consumer_config{} and returns it as a binary. What is the revision parameter for? Not added for now.
+    Methods = dubbo_lists_util:join(Consumer#consumer_config.methods, <<",">>),
+    Value = io_lib:format(<<"consumer://~s/~s?application=~s&category=~s&check=~p&default.timeout=~p&dubbo=~s&interface=~s&methods=~s&side=~s&timestamp=~p">>,
+        [dubbo_common_fun:local_ip_v4_str(),
+            Consumer#consumer_config.interface,
+            Consumer#consumer_config.application,
+            Consumer#consumer_config.category,
+            Consumer#consumer_config.check,
+            Consumer#consumer_config.default_timeout,
+            Consumer#consumer_config.dubbo_version,
+            Consumer#consumer_config.interface,
+            Methods,
+            Consumer#consumer_config.side,
+            dubbo_time_util:timestamp_ms()
+        ]),
+    list_to_binary(Value).
+
+%%dubbo_zookeeper:register_consumer(<<"com.ifcoder.abcd">>,[]).
+start_provider_process(Interface, ProviderList) ->
+    dubbo_consumer_pool:start_consumer(Interface, ProviderList).
\ No newline at end of file
diff --git a/test/userOperator.erl b/test/userOperator.erl
index 0d35917..e8567ae 100644
--- a/test/userOperator.erl
+++ b/test/userOperator.erl
@@ -67,7 +67,7 @@ getUserInfo(Arg0, RequestOption)->
         ]
     },
     Request = dubbo_adapter:reference(Data),
-    dubbo_invoker:invoke_request(?CURRENT_CLASS_NAME,Request,RequestOption).
+    dubbo_invoker_old:invoke_request(?CURRENT_CLASS_NAME,Request,RequestOption).
 
 
 -spec genUserId()->
@@ -96,7 +96,7 @@ genUserId( RequestOption)->
         ]
     },
     Request = dubbo_adapter:reference(Data),
-    dubbo_invoker:invoke_request(?CURRENT_CLASS_NAME,Request,RequestOption).
+    dubbo_invoker_old:invoke_request(?CURRENT_CLASS_NAME,Request,RequestOption).
 
 
 -spec queryUserInfo(Arg0::#userInfoRequest{})->
@@ -127,7 +127,7 @@ queryUserInfo(Arg0, RequestOption)->
         ]
     },
     Request = dubbo_adapter:reference(Data),
-    dubbo_invoker:invoke_request(?CURRENT_CLASS_NAME,Request,RequestOption).
+    dubbo_invoker_old:invoke_request(?CURRENT_CLASS_NAME,Request,RequestOption).
 
 
 -spec queryUserList(Arg0::list())->
@@ -158,7 +158,7 @@ queryUserList(Arg0, RequestOption)->
         ]
     },
     Request = dubbo_adapter:reference(Data),
-    dubbo_invoker:invoke_request(?CURRENT_CLASS_NAME,Request,RequestOption).
+    dubbo_invoker_old:invoke_request(?CURRENT_CLASS_NAME,Request,RequestOption).
 
 test() ->
     queryUserInfo(#userInfoRequest{username = "name", requestId = "111"}, #{sync=> true}).
\ No newline at end of file


[dubbo-erlang] 03/09: redesign the protocol callback

Posted by dl...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dlive pushed a commit to branch 0.4.0
in repository https://gitbox.apache.org/repos/asf/dubbo-erlang.git

commit fc0072d302c39140aef5bfd6ec10b8b8fbc1b93f
Author: DLive <xs...@163.com>
AuthorDate: Mon Jun 17 23:22:58 2019 +0800

    redesign the protocol callback
---
 src/dubbo_common_fun.erl     |  2 +-
 src/dubbo_directory.erl      |  7 +------
 src/dubbo_protocol.erl       | 14 +++++++-------
 src/dubbo_protocol_dubbo.erl | 18 ++++++++++++++----
 4 files changed, 23 insertions(+), 18 deletions(-)

diff --git a/src/dubbo_common_fun.erl b/src/dubbo_common_fun.erl
index e74f57f..6717fac 100644
--- a/src/dubbo_common_fun.erl
+++ b/src/dubbo_common_fun.erl
@@ -32,7 +32,7 @@ local_ip_v4_str() ->
     list_to_binary(io_lib:format("~p.~p.~p.~p", [V1, V2, V3, V4])).
 
 
--spec(parse_url(Url :: binary()|list()) -> {ok, map()}).
+-spec(parse_url(Url :: binary()|list()) -> {ok, #dubbo_url{}}).
 parse_url(Url) when is_binary(Url) ->
     parse_url(binary_to_list(Url));
 parse_url(Url) ->
diff --git a/src/dubbo_directory.erl b/src/dubbo_directory.erl
index 08e959c..3674a91 100644
--- a/src/dubbo_directory.erl
+++ b/src/dubbo_directory.erl
@@ -83,12 +83,7 @@ subscribe(RegistryName,SubcribeUrl)->
 notify(Interface,UrlList)->
     %% @todo if UrlList size is 1, and protocol is empty ,need destroyAllInvokers
 
-    case dubbo_extension:run_fold(protocol,refer,[UrlList],{error,no_protocol}) of
-        {ok,Invokers} ->
-            ok;
-        {error,no_protocol}->
-            error
-    end,
+    refresh_invoker(UrlList),
 %%    dubbo_consumer_pool:start_consumer(Interface, UrlList),
     ok.
 
diff --git a/src/dubbo_protocol.erl b/src/dubbo_protocol.erl
index 8808fc8..7ecfcd8 100644
--- a/src/dubbo_protocol.erl
+++ b/src/dubbo_protocol.erl
@@ -16,11 +16,11 @@
 %%------------------------------------------------------------------------------
 -module(dubbo_protocol).
 
--callback refer(Url)->ok.
+-callback refer(Url :: binary(), Acc :: term()) -> {ok, term()} | {skip, term()}. %% fold step: {ok, Invoker} when the scheme is handled, {skip, Acc} otherwise
 
-%% API
--export([refer/2]).
-
-
-refer(InterfaceClassInfo,Url)->
-    dubbo_hooker(protocol_wapper,refer,[InterfaceClassInfo,Url]).
\ No newline at end of file
+%%%% API
+%%-export([refer/2]).
+%%
+%%
+%%refer(InterfaceClassInfo,Url)->
+%%    dubbo_hooker(protocol_wapper,refer,[InterfaceClassInfo,Url]).
\ No newline at end of file
diff --git a/src/dubbo_protocol_dubbo.erl b/src/dubbo_protocol_dubbo.erl
index c2fb6dd..06c36e6 100644
--- a/src/dubbo_protocol_dubbo.erl
+++ b/src/dubbo_protocol_dubbo.erl
@@ -15,11 +15,21 @@
 %% limitations under the License.
 %%------------------------------------------------------------------------------
 -module(dubbo_protocol_dubbo).
--author("dlive").
+
+-include("dubboerl.hrl").
 
 %% API
--export([refer/1]).
+-export([refer/2]).
+
+refer(Url,Acc)->
+    {ok,UrlInfo} = dubbo_common_fun:parse_url(Url),
+    case UrlInfo#dubbo_url.scheme of
+        <<"dubbo">> ->
+            {ok,todo};
+        _ ->
+            {skip,Acc}
+    end.
 
-refer(Url)->
+do_refer(UrlInfo)->
 
-    {ok,todo}.
\ No newline at end of file
+    ok.
\ No newline at end of file


[dubbo-erlang] 05/09: dev reference ref process

Posted by dl...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dlive pushed a commit to branch 0.4.0
in repository https://gitbox.apache.org/repos/asf/dubbo-erlang.git

commit 216b5a7bffebcdeb922bc26bcfa96fca270290db
Author: DLive <xs...@163.com>
AuthorDate: Sat Jun 22 23:39:18 2019 +0800

    dev reference ref process
---
 include/dubbo.hrl                         |  4 +-
 include/dubboerl.hrl                      | 10 +++--
 src/dubbo_directory.erl                   | 29 ++++++++++++-
 src/dubbo_exchanger.erl                   | 18 +++-----
 src/dubbo_protocol_dubbo.erl              | 69 +++++++++++++++++--------------
 src/dubbo_provider_consumer_reg_table.erl | 45 ++++++++++++--------
 6 files changed, 107 insertions(+), 68 deletions(-)

diff --git a/include/dubbo.hrl b/include/dubbo.hrl
index 727c71e..f2d4048 100644
--- a/include/dubbo.hrl
+++ b/include/dubbo.hrl
@@ -102,8 +102,8 @@
 
 
 -record(interface_list, {interface, pid, connection_info}).
--record(provider_node_list, {host_flag, pid, weight, readonly = false}).
--record(connection_info, {connection_id, pid, weight, host_flag, readonly = false}).
+%%-record(provider_node_list, {host_flag, pid, weight, readonly = false}).
+-record(connection_info, {host_flag, pid, weight, readonly = false}).
 
 -type dubbo_request() :: #dubbo_request{}.
 -type dubbo_response() :: #dubbo_response{}.
\ No newline at end of file
diff --git a/include/dubboerl.hrl b/include/dubboerl.hrl
index ce6baf1..a9be5f1 100644
--- a/include/dubboerl.hrl
+++ b/include/dubboerl.hrl
@@ -14,11 +14,13 @@
 %% See the License for the specific language governing permissions and
 %% limitations under the License.
 %%------------------------------------------------------------------------------
--define(PROVIDER_IMPL_TABLE,provider_impl_table).
+-define(PROVIDER_IMPL_TABLE, provider_impl_table).
 
--define(PROVIDER_WORKER,provider_worker).
+-define(PROVIDER_WORKER, provider_worker).
 
--define(TRAFFIC_CONTROL,traffic_control).
+-define(TRAFFIC_CONTROL, traffic_control).
 
 
--record(dubbo_url,{scheme,user_info,host,port,path,parameters,fragment}).
\ No newline at end of file
+-record(dubbo_url, {scheme, user_info, host, port, path, parameters, fragment}).
+
+-record(dubbo_invoker, {host_flag, handle}).
\ No newline at end of file
diff --git a/src/dubbo_directory.erl b/src/dubbo_directory.erl
index 3674a91..07a6dff 100644
--- a/src/dubbo_directory.erl
+++ b/src/dubbo_directory.erl
@@ -17,7 +17,7 @@
 -module(dubbo_directory).
 
 -behaviour(gen_server).
-
+-include("dubboerl.hrl").
 -export([subscribe/2,notify/2]).
 %% API
 -export([start_link/0]).
@@ -87,8 +87,23 @@ notify(Interface,UrlList)->
 %%    dubbo_consumer_pool:start_consumer(Interface, UrlList),
     ok.
 
+
 refresh_invoker(UrlList)->
-    NewInvokers = refresh_invoker(UrlList,[]).
+    case pick_interface(UrlList) of %% the first URL decides the interface and scheme for the whole list
+        {error,_Reason}->
+            fail;
+        {<<"empty">>,_Interface}-> %% schemes are matched as binaries elsewhere (<<"dubbo">>, <<"consumer">>); a string would never match
+            todo_destroy;
+        {_,Interface} ->
+            OldProviderHosts = dubbo_provider_consumer_reg_table:get_interface_provider_node(Interface),
+            NewInvokers = refresh_invoker(UrlList,[]),
+            NewProviderHosts = [Item#dubbo_invoker.host_flag || Item <- NewInvokers],
+            DeleteProverList = OldProviderHosts -- NewProviderHosts, %% hosts that were known before but are missing from the new list
+            dubbo_provider_consumer_reg_table:clean_invalid_provider(DeleteProverList)
+
+    end.
+%%    OldProviderHosts =
+
 
 refresh_invoker([Url|Rest],Acc)->
     case dubbo_extension:run_fold(protocol,refer,[Url],undefined) of
@@ -100,6 +115,16 @@ refresh_invoker([Url|Rest],Acc)->
             refresh_invoker(Rest,Acc)
     end.
 
+pick_interface([Url | _]) -> %% returns {Scheme, Interface} taken from the first URL, or {error, Reason} if it cannot be parsed
+    case dubbo_common_fun:parse_url(Url) of
+        {ok,UrlInfo}->
+            Interface = maps:get(<<"interface">>,UrlInfo#dubbo_url.parameters), %% binary key, as read by dubbo_registry_zookeeper:register/1 and subscribe/2
+            {UrlInfo#dubbo_url.scheme,Interface};
+        {error,Reason} ->
+            {error,Reason}
+    end.
+
+
 %%--------------------------------------------------------------------
 %% @private
 %% @doc
diff --git a/src/dubbo_exchanger.erl b/src/dubbo_exchanger.erl
index 09a4833..6fb8496 100644
--- a/src/dubbo_exchanger.erl
+++ b/src/dubbo_exchanger.erl
@@ -21,20 +21,14 @@
 %% API
 -export([connect/2]).
 
-connect(Url,Handler) ->
-    case dubbo_node_config_util:parse_provider_info(Url) of
-        {ok, ProviderConfig} ->
-            HostFlag= dubbo_provider_consumer_reg_table:get_host_flag(ProviderConfig),
-            {ok, Pid} = dubbo_transport_pool_sup:add_children(ProviderConfig,Handler),
-            logger:info("start provider ~p pid info ~p~n", [HostFlag, Pid]),
-            {ok,#connection_info{ pid = Pid, weight = get_weight(ProviderConfig), host_flag = HostFlag}};
-        {error, R1} ->
-            logger:error("parse provider info error reason ~p", [R1]),
-            {error,R1}
-    end.
+connect(ProviderConfig, Handler) ->
+    HostFlag = dubbo_provider_consumer_reg_table:get_host_flag(ProviderConfig),
+    {ok, Pid} = dubbo_transport_pool_sup:add_children(ProviderConfig, Handler),
+    logger:info("start provider ~p pid info ~p~n", [HostFlag, Pid]),
+    {ok, #connection_info{pid = Pid, weight = get_weight(ProviderConfig), host_flag = HostFlag}}.
 
 
 
-get_weight(_ProviderConfig)->
+get_weight(_ProviderConfig) ->
     %% todo get weight from provider info
     30.
\ No newline at end of file
diff --git a/src/dubbo_protocol_dubbo.erl b/src/dubbo_protocol_dubbo.erl
index 1ede1d8..96947cc 100644
--- a/src/dubbo_protocol_dubbo.erl
+++ b/src/dubbo_protocol_dubbo.erl
@@ -26,50 +26,57 @@ refer(Url, Acc) ->
     {ok, UrlInfo} = dubbo_common_fun:parse_url(Url),
     case UrlInfo#dubbo_url.scheme of
         <<"dubbo">> ->
-            do_refer(UrlInfo),
-            {ok, todo};
+            {ok,Invoker} = do_refer(UrlInfo),
+            {ok, Invoker};
         _ ->
             {skip, Acc}
     end.
 
 do_refer(UrlInfo) ->
-
-    ok.
-
-
-getClients(ProviderUrl) ->
-    case new_transport(ProviderUrl) of
-        {ok,ConnectionInfoList} ->
-            ConnectionList = start_provider_process(HostFlag, 30, ProviderConfig),
-            ok;
-        {error,Reason} ->
-            {error,Reason}
+    case dubbo_node_config_util:parse_provider_info(UrlInfo) of
+        {ok, ProviderConfig} ->
+%%            OldHostList = dubbo_provider_consumer_reg_table:get_interface_provider_node(ProviderConfig#provider_config.interface),
+            case getClients(ProviderConfig) of
+                {ok, ConnectionInfoList} ->
+                    dubbo_provider_consumer_reg_table:update_node_conections(ProviderConfig#provider_config.interface,ConnectionInfoList),
+                    HostFlag = dubbo_provider_consumer_reg_table:get_host_flag(ProviderConfig),
+                    {ok,#dubbo_invoker{host_flag = HostFlag,handle = ?MODULE}};
+                {error, Reason} ->
+                    {error, Reason}
+            end;
+        {error, R1} ->
+            logger:error("parse provider info error reason ~p", [R1]),
+            {error, R1}
     end.
 
+getClients(ProviderConfig) ->
+    %% @todo if connections parameter > 1, need new spec transport
+    case new_transport(ProviderConfig) of
+        {ok, ConnectionInfoList} ->
+%%            ConnectionList = start_provider_process(HostFlag, 30, ProviderConfig),
+            {ok, ConnectionInfoList};
+        {error, Reason} ->
+            {error, Reason}
+    end.
 
 
 %%ok = update_connection_info(ProviderConfig#provider_config.interface, HostFlag, ConnectionList, true),
 
 
-new_transport(ProviderUrl)->
-    case dubbo_node_config_util:parse_provider_info(ProviderUrl) of
-        {ok, ProviderConfig} ->
-            HostFlag = get_host_flag(ProviderConfig),
-            case dubbo_provider_consumer_reg_table:get_host_connections(ProviderConfig#provider_config) of
-                [] ->
-                    case dubbo_exchanger:connect(ProviderUrl,?MODULE) of
-                        {ok,ConnectionInfo} ->
-                            {ok,[ConnectionInfo]};
-                        {error,Reason} ->
-                            logger:warning("start client fail ~p ~p",[Reason,HostFlag]),
-                            {error,Reason}
-                    end;
-                ConnectionInfoList ->
-                    {ok,ConnectionInfoList}
+new_transport(ProviderConfig) ->
+
+    HostFlag = get_host_flag(ProviderConfig),
+    case dubbo_provider_consumer_reg_table:get_host_connections(ProviderConfig#provider_config) of
+        [] ->
+            case dubbo_exchanger:connect(ProviderConfig, ?MODULE) of
+                {ok, ConnectionInfo} ->
+                    {ok, [ConnectionInfo]};
+                {error, Reason} ->
+                    logger:warning("start client fail ~p ~p", [Reason, HostFlag]),
+                    {error, Reason}
             end;
-        {error, R1} ->
-            logger:error("parse provider info error reason ~p", [R1]),
-            {error,R1}
+        ConnectionInfoList ->
+            {ok, ConnectionInfoList}
     end.
 
 
diff --git a/src/dubbo_provider_consumer_reg_table.erl b/src/dubbo_provider_consumer_reg_table.erl
index c7a8dfa..d47260d 100644
--- a/src/dubbo_provider_consumer_reg_table.erl
+++ b/src/dubbo_provider_consumer_reg_table.erl
@@ -29,7 +29,8 @@
     terminate/2,
     code_change/3]).
 
--export([update_consumer_connections/2,get_host_connections/2, select_connection/1, select_connection/2, update_connection_readonly/2, get_host_flag/1, get_host_flag/2]).
+-export([update_consumer_connections/2,update_node_conections/2,get_interface_provider_node/1,get_host_connections/2, select_connection/1,
+    select_connection/2, update_connection_readonly/2, get_host_flag/1, get_host_flag/2,clean_invalid_provider/1]).
 
 -include("dubbo.hrl").
 -define(SERVER, ?MODULE).
@@ -193,11 +194,7 @@ start_consumer(Interface, ProviderNodeInfo) ->
 get_host_connections(Host, Port) ->
     HostFlag = get_host_flag(Host, Port),
     List = ets:lookup(?PROVIDER_NODE_LIST_TABLE, HostFlag),
-    List2 = lists:map(
-        fun(#provider_node_list{host_flag = HostFlag,pid = Pid,readonly = Readonly}) ->
-            #connection_info{host_flag = HostFlag,pid = Pid,readonly = Readonly}
-        end, List),
-    List2.
+    List.
 
 
 %%%===================================================================
@@ -239,14 +236,29 @@ get_host_connections(Host, Port) ->
 %%                               end, ExecutesList),
 %%    ConnectionList.
 
+
+update_node_conections(_Interface,Connections)-> %% inserts each connection not yet stored; dubbo_protocol_dubbo:do_refer passes the interface name as first argument, so it must not be bound as HostFlag
+    lists:foreach(
+        fun(Item) ->
+            HostFlag= Item#connection_info.host_flag,
+            case ets:match_object(?PROVIDER_NODE_LIST_TABLE,#connection_info{host_flag = HostFlag,pid = Item#connection_info.pid,_='_'}) of %% ets:lookup_element/2 does not exist; the wildcard is the atom '_'
+                [] ->
+                    I2 = ets:insert(?PROVIDER_NODE_LIST_TABLE, Item),
+                    logger:debug("insert PROVIDER_NODE_LIST_TABLE ~p info:~p", [HostFlag, I2]);
+                _ ->
+                    ok
+            end
+        end, Connections),
+    ok.
+
 update_consumer_connections(Interface, Connections) ->
     lists:map(
         fun(Item) ->
             HostFlag= Item#connection_info.host_flag,
 
-            case ets:lookup_element(?PROVIDER_NODE_LIST_TABLE,#provider_node_list{host_flag = HostFlag,pid = Item#connection_info.pid,_="_"}) of
+            case ets:lookup_element(?PROVIDER_NODE_LIST_TABLE,#connection_info{host_flag = HostFlag,pid = Item#connection_info.pid,_="_"}) of
                 '$end_of_table' ->
-                    I2 = ets:insert(?PROVIDER_NODE_LIST_TABLE, #provider_node_list{host_flag = HostFlag,pid = Item#connection_info.pid}),
+                    I2 = ets:insert(?PROVIDER_NODE_LIST_TABLE, Item),
                     logger:debug("insert PROVIDER_NODE_LIST_TABLE ~p info:~p", [HostFlag, I2]);
                 {_ObjectList,_Continuation} ->
                     ok
@@ -269,7 +281,7 @@ update_connection_info(Interface, HostFlag, ConnectionList, IsUpdateProvideNode)
         logger:debug("insert interface conection info ~p ~p ~p", [Interface, Item#connection_info.pid, I1]),
         case IsUpdateProvideNode of
             true ->
-                I2 = ets:insert(?PROVIDER_NODE_LIST_TABLE, #provider_node_list{host_flag = HostFlag, connection_info = Item}),
+                I2 = ets:insert(?PROVIDER_NODE_LIST_TABLE, Item),
                 logger:debug("insert PROVIDER_NODE_LIST_TABLE ~p info:~p", [HostFlag, I2]);
             false ->
                 ok
@@ -320,18 +332,17 @@ clean_invalid_provider([HostFlag | DeleteProverList]) ->
     case ets:lookup(?PROVIDER_NODE_LIST_TABLE, HostFlag) of
         [] ->
             ok;
-        ProviderNodeList ->
-            ProviderNodeList1 = dubbo_lists_util:del_duplicate(ProviderNodeList),
-            clean_connection_info(ProviderNodeList1)
+        ProviderNodeConnections ->
+            ProviderNodeConnections1 = dubbo_lists_util:del_duplicate(ProviderNodeConnections),
+            clean_connection_info(ProviderNodeConnections1)
     end,
     clean_invalid_provider(DeleteProverList).
 
-clean_connection_info(ProviderNodeList) ->
+clean_connection_info(ProviderNodeConnections) ->
     lists:map(fun(Item) ->
-        Pid = Item#provider_node_list.connection_info#connection_info.pid,
-        ConnectionId = Item#provider_node_list.connection_info#connection_info.connection_id,
+        Pid = Item#connection_info.pid,
         Pattern = #interface_list{pid = Pid, _ = '_'},
         ets:delete_object(?INTERFCE_LIST_TABLE, Pattern),
-        dubbo_transport_pool_sup:stop_children(ConnectionId)
-              end, ProviderNodeList),
+        dubbo_transport_pool_sup:stop_children(Pid)
+              end, ProviderNodeConnections),
     ok.
\ No newline at end of file


[dubbo-erlang] 09/09: feature: support protocol extension

Posted by dl...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dlive pushed a commit to branch 0.4.0
in repository https://gitbox.apache.org/repos/asf/dubbo-erlang.git

commit af4b9904242b34fa2fb15e4b767613272ff2a887
Author: DLive <xs...@163.com>
AuthorDate: Sun Jun 30 14:17:56 2019 +0800

    feature: support protocol extension
---
 config_example/sys.config                          |   7 +-
 src/dubbo_common.erl => include/constrans.hrl      |   9 +-
 include/dubbo.hrl                                  |  49 ++-
 include/dubboerl.hrl                               |  11 +-
 .../apache/dubbo/erlang/sample/service/App.java    |   2 +-
 .../src/main/resources/applicationConsumer.xml     |  28 ++
 .../src/main/resources/applicationProvider.xml     |   6 +-
 .../apps/dubbo_sample_service/src/userOperator.erl |   8 +-
 .../apps/dubboerl_demo/src/api_gateway_handle.erl  |  25 +-
 src/dubbo_adapter.erl                              |   2 +-
 src/dubbo_client_default.erl                       |  97 +----
 src/dubbo_cluster.erl                              |  10 +-
 src/dubbo_cluster_failfast.erl                     |  40 +-
 src/dubbo_common_fun.erl                           |  78 ++--
 src/dubbo_config_util.erl                          |   7 +-
 src/dubbo_directory.erl                            |  86 +++--
 src/dubbo_extension.erl                            |  95 +++--
 src/dubbo_filter.erl                               |   9 +-
 src/dubbo_invoker.erl                              |  29 +-
 src/dubbo_invoker_cluster.erl                      |  21 -
 src/dubbo_invoker_old.erl                          |  97 -----
 src/dubbo_loadbalance_random.erl                   |   8 +-
 src/dubbo_netty_client.erl                         | 430 ---------------------
 src/dubbo_node_config_util.erl                     |  66 ++--
 src/dubbo_protocol.erl                             |  11 +-
 src/dubbo_protocol_dubbo.erl                       | 122 +++++-
 src/dubbo_protocol_registry.erl                    |  89 ++++-
 src/dubbo_provider_consumer_reg_table.erl          |  78 ++--
 src/dubbo_provider_protocol.erl                    |  15 +-
 src/dubbo_reference_config.erl                     |  76 ++--
 src/dubbo_registry.erl                             |  35 +-
 src/dubbo_registry_zookeeper.erl                   | 201 ++++++----
 src/dubbo_service_config.erl                       | 110 ++++++
 src/{dubbo_zookeeper.erl => dubbo_shutdown.erl}    | 140 +------
 src/dubbo_traffic_control.erl                      |  24 +-
 src/dubbo_type_transfer.erl                        |  25 +-
 src/dubboerl.erl                                   |  21 +-
 src/dubboerl_app.erl                               |  13 +-
 src/dubboerl_sup.erl                               |  23 +-
 test/consumer_SUITE.erl                            |   4 +-
 test/dubbo_config_parser_tests.erl                 |   2 +-
 test/dubbo_consumer_pool_tests.erl                 |   5 +-
 ...er_SUITE.erl => dubbo_service_config_SUITE.erl} |  71 +---
 test/dubbo_zookeeper_tests.erl                     |  25 --
 ...nsumer_SUITE.erl => reference_config_SUITE.erl} |  50 +--
 test/userOperator.erl                              |   8 +-
 46 files changed, 1049 insertions(+), 1319 deletions(-)

diff --git a/config_example/sys.config b/config_example/sys.config
index 07cfaf9..c26fc54 100644
--- a/config_example/sys.config
+++ b/config_example/sys.config
@@ -20,11 +20,10 @@
         {registry,zookeeper},
         {zookeeper_list,[{"127.0.0.1",2181}]},
         {application,<<"testdubboerl">>},
-        {registry,true},
-        {protocol,hessian},
-        {port,20881},
+        {serialization,hessian},
+        {protocol, {dubbo, [{port, 20882}]}},
         {consumer,[
-           % {<<"org.apache.dubbo.erlang.sample.service.facade.UserOperator">>,[]}
+%%            {<<"org.apache.dubbo.erlang.sample.service.facade.UserOperator">>,[]}
         ]},
         {provider,[
             {dubbo_service_user_impl,userOperator,<<"org.apache.dubbo.erlang.sample.service.facade.UserOperator">>,[]}
diff --git a/src/dubbo_common.erl b/include/constrans.hrl
similarity index 87%
rename from src/dubbo_common.erl
rename to include/constrans.hrl
index 1a35b58..6cb4a89 100644
--- a/src/dubbo_common.erl
+++ b/include/constrans.hrl
@@ -14,7 +14,10 @@
 %% See the License for the specific language governing permissions and
 %% limitations under the License.
 %%------------------------------------------------------------------------------
--module(dubbo_common).
 
-%% API
--export([]).
+
+-define(CATEGORY_KEY, <<"category">>).
+
+-define(PROVIDERS_CATEGORY, <<"providers">>).
+
+-define(CONSUMERS_CATEGORY, <<"consumers">>).
diff --git a/include/dubbo.hrl b/include/dubbo.hrl
index 282e0da..b6a32e9 100644
--- a/include/dubbo.hrl
+++ b/include/dubbo.hrl
@@ -63,15 +63,37 @@
     decode_state
 }).
 
+
+-record(reference_config,{
+    interface,
+    application = <<"NoName">> :: binary(),
+    category = <<"consumers">> :: binary(),
+    check = false :: boolean(),
+    default_timeout = 500 :: integer(),
+    dubbo_version = <<"2.5.3">> :: binary(),
+    methods = [] :: list(),
+    revision = <<"">> :: binary(),
+    side = <<"consumers">> :: binary(),
+    sync = false ::boolean()
+
+}).
+
 -record(dubbo_rpc_invocation, {
     serialVersionUID = -4355285085441097045,
-    className :: string(),
-    classVersion :: string(),
-    methodName :: string(),
-    parameterDesc :: string(),
+    className :: binary(),
+    classVersion :: binary(),
+    methodName :: binary(),
+    parameterDesc :: binary(),
     parameterTypes = [] :: [#type_def{}],
     parameters = [] :: [term()],
-    attachments = [] :: [term()]
+    attachments = [] :: [term()],
+
+    call_ref :: atom(),
+    reference_ops :: #reference_config{},
+    loadbalance :: atom(),
+    source_pid :: pid(),
+    transport_pid :: pid()
+
 }).
 
 -record(consumer_config, {
@@ -96,15 +118,24 @@
     application,
     dubbo = <<"2.5.3">>,
     methods = [],
-    side = <<"provider">>
+    side = <<"provider">>,
+    impl_handle
+}).
+
+-record(invoker,{
+    url,
+    handler
 }).
 
 
--record(interface_info, {interface, loadbalance}).
+-record(interface_info, {interface, loadbalance, protocol}).
 
 -record(interface_list, {interface, pid, connection_info}).
-%%-record(provider_node_list, {host_flag, pid, weight, readonly = false}).
 -record(connection_info, {host_flag, pid, weight, readonly = false}).
 
 -type dubbo_request() :: #dubbo_request{}.
--type dubbo_response() :: #dubbo_response{}.
\ No newline at end of file
+-type dubbo_response() :: #dubbo_response{}.
+-type invocation():: #dubbo_rpc_invocation{}.
+
+%% @doc invoke return info
+-type invoke_result() :: {ok, reference()}| {ok, reference(), Data :: any(), RpcContent :: list()}| {error, Reason :: timeout|no_provider|any()}.
\ No newline at end of file
diff --git a/include/dubboerl.hrl b/include/dubboerl.hrl
index a9be5f1..7c8b1af 100644
--- a/include/dubboerl.hrl
+++ b/include/dubboerl.hrl
@@ -20,7 +20,16 @@
 
 -define(TRAFFIC_CONTROL, traffic_control).
 
+-define(SERVICE_EXPORT_TABLE,dubbo_service_export).
 
--record(dubbo_url, {scheme, user_info, host, port, path, parameters, fragment}).
+-record(dubbo_url, {
+    scheme :: binary() ,
+    user_info :: binary(),
+    host:: binary(),
+    port::integer(),
+    path:: binary(),
+    parameters::map(),
+    fragment::binary()
+}).
 
 -record(dubbo_invoker, {host_flag, handle}).
\ No newline at end of file
diff --git a/samples/dubbo-sample-service/src/main/java/org/apache/dubbo/erlang/sample/service/App.java b/samples/dubbo-sample-service/src/main/java/org/apache/dubbo/erlang/sample/service/App.java
index dc6deb1..52cd1d1 100644
--- a/samples/dubbo-sample-service/src/main/java/org/apache/dubbo/erlang/sample/service/App.java
+++ b/samples/dubbo-sample-service/src/main/java/org/apache/dubbo/erlang/sample/service/App.java
@@ -30,7 +30,7 @@ public class App {
     public static void main(String[] args) throws IOException {
         System.out.println("将要监听服务");
         ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
-                new String[]{"applicationProvider.xml"});
+                new String[]{"applicationConsumer.xml"});
         context.start();
         UserOperator userOperator = (UserOperator) context.getBean("userInterface");
         UserInfo result = userOperator.getUserInfo("hh-bb");
diff --git a/samples/dubbo-sample-service/src/main/resources/applicationConsumer.xml b/samples/dubbo-sample-service/src/main/resources/applicationConsumer.xml
new file mode 100644
index 0000000..296bc83
--- /dev/null
+++ b/samples/dubbo-sample-service/src/main/resources/applicationConsumer.xml
@@ -0,0 +1,28 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<beans xmlns="http://www.springframework.org/schema/beans"
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
+       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://code.alibabatech.com/schema/dubbo http://code.alibabatech.com/schema/dubbo/dubbo.xsd">
+    <dubbo:application name="hello-world"/><!-- 注册地址 -->
+    <dubbo:registry address="zookeeper://127.0.0.1:2181"/>
+    <dubbo:protocol name="dubbo" port="20880"/>
+
+    <dubbo:reference  id="userInterface" interface="org.apache.dubbo.erlang.sample.service.facade.UserOperator" retries="0" />
+</beans>
diff --git a/samples/dubbo-sample-service/src/main/resources/applicationProvider.xml b/samples/dubbo-sample-service/src/main/resources/applicationProvider.xml
index 425e41e..7a32064 100644
--- a/samples/dubbo-sample-service/src/main/resources/applicationProvider.xml
+++ b/samples/dubbo-sample-service/src/main/resources/applicationProvider.xml
@@ -10,8 +10,8 @@
 
     <dubbo:consumer check="false" timeout="300000" id="dubboConsumerConfig" retries="0"/>
 
-<!--    <bean id="userService" class="org.apache.dubbo.erlang.sample.service.impl.UserOperatorImpl"/>-->
-<!--    <dubbo:service interface="org.apache.dubbo.erlang.sample.service.facade.UserOperator" ref="userService"/>-->
+    <bean id="userService" class="org.apache.dubbo.erlang.sample.service.impl.UserOperatorImpl"/>
+    <dubbo:service interface="org.apache.dubbo.erlang.sample.service.facade.UserOperator" ref="userService"/>
 
-    <dubbo:reference  id="userInterface" interface="org.apache.dubbo.erlang.sample.service.facade.UserOperator" retries="0" />
+<!--    <dubbo:reference  id="userInterface" interface="org.apache.dubbo.erlang.sample.service.facade.UserOperator" retries="0" />-->
 </beans>
diff --git a/samples/dubboerl_demo/apps/dubbo_sample_service/src/userOperator.erl b/samples/dubboerl_demo/apps/dubbo_sample_service/src/userOperator.erl
index d7e0809..b92ca62 100644
--- a/samples/dubboerl_demo/apps/dubbo_sample_service/src/userOperator.erl
+++ b/samples/dubboerl_demo/apps/dubbo_sample_service/src/userOperator.erl
@@ -67,7 +67,7 @@ getUserInfo(Arg0, RequestOption)->
         ]
     },
     Request = dubbo_adapter:reference(Data),
-    dubbo_invoker_old:invoke_request(?CURRENT_CLASS_NAME,Request,RequestOption).
+    dubbo_invoker:invoke_request(?CURRENT_CLASS_NAME,Request,RequestOption).
 
 
 -spec genUserId()->
@@ -96,7 +96,7 @@ genUserId( RequestOption)->
         ]
     },
     Request = dubbo_adapter:reference(Data),
-    dubbo_invoker_old:invoke_request(?CURRENT_CLASS_NAME,Request,RequestOption).
+    dubbo_invoker:invoke_request(?CURRENT_CLASS_NAME,Request,RequestOption).
 
 
 -spec queryUserInfo(Arg0::#userInfoRequest{})->
@@ -127,7 +127,7 @@ queryUserInfo(Arg0, RequestOption)->
         ]
     },
     Request = dubbo_adapter:reference(Data),
-    dubbo_invoker_old:invoke_request(?CURRENT_CLASS_NAME,Request,RequestOption).
+    dubbo_invoker:invoke_request(?CURRENT_CLASS_NAME,Request,RequestOption).
 
 
 -spec queryUserList(Arg0::list())->
@@ -158,5 +158,5 @@ queryUserList(Arg0, RequestOption)->
         ]
     },
     Request = dubbo_adapter:reference(Data),
-    dubbo_invoker_old:invoke_request(?CURRENT_CLASS_NAME,Request,RequestOption).
+    dubbo_invoker:invoke_request(?CURRENT_CLASS_NAME,Request,RequestOption).
 
diff --git a/samples/dubboerl_demo/apps/dubboerl_demo/src/api_gateway_handle.erl b/samples/dubboerl_demo/apps/dubboerl_demo/src/api_gateway_handle.erl
index b89efb8..5430532 100644
--- a/samples/dubboerl_demo/apps/dubboerl_demo/src/api_gateway_handle.erl
+++ b/samples/dubboerl_demo/apps/dubboerl_demo/src/api_gateway_handle.erl
@@ -1,13 +1,20 @@
-%%%-------------------------------------------------------------------
-%%% @author dlive
-%%% @copyright (C) 2018, <COMPANY>
-%%% @doc
-%%%
-%%% @end
-%%% Created : 28. Feb 2018 10:57 PM
-%%%-------------------------------------------------------------------
+%%------------------------------------------------------------------------------
+%% Licensed to the Apache Software Foundation (ASF) under one or more
+%% contributor license agreements.  See the NOTICE file distributed with
+%% this work for additional information regarding copyright ownership.
+%% The ASF licenses this file to You under the Apache License, Version 2.0
+%% (the "License"); you may not use this file except in compliance with
+%% the License.  You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%------------------------------------------------------------------------------
 -module(api_gateway_handle).
--author("dlive").
 
 -include_lib("dubbo_sample_service/include/dubbo_sample_service.hrl").
 
diff --git a/src/dubbo_adapter.erl b/src/dubbo_adapter.erl
index 00d628f..cd07342 100644
--- a/src/dubbo_adapter.erl
+++ b/src/dubbo_adapter.erl
@@ -28,7 +28,7 @@ reference(Data) ->
         mid = dubbo_id_generator:gen_id(),
         data = Data,
         mversion = <<"0.0.0">>,
-        serialize_type = serialize_value(application:get_env(dubboerl, protocol, hessian))
+        serialize_type = serialize_value(application:get_env(dubboerl, serialization, hessian))
     }.
 
 serialize_value(json) ->
diff --git a/src/dubbo_client_default.erl b/src/dubbo_client_default.erl
index 220c30f..3fe08fa 100644
--- a/src/dubbo_client_default.erl
+++ b/src/dubbo_client_default.erl
@@ -28,7 +28,8 @@
     handle_info/2,
     terminate/2,
     code_change/3]).
--export([start_link/1]).
+
+-export([start_link/2]).
 
 -export([check_recv_data/2]).
 
@@ -38,7 +39,6 @@
 -record(state, {provider_config, socket = undefined,
     heartbeat = #heartbeat{},
     recv_buffer = <<>>,         %%从服务端接收的数据
-    host_flag,
     reconnection_timer,
     handler
 }).
@@ -53,10 +53,8 @@
 %%
 %% @end
 %%--------------------------------------------------------------------
--spec(start_link(Name :: binary(), ProviderConfig :: #provider_config{}) ->
-    {ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
-start_link(ProviderConfig) ->
-    gen_server:start_link(?MODULE, [ProviderConfig], []).
+start_link(ProviderConfig, Handle) ->
+    gen_server:start_link(?MODULE, [ProviderConfig, Handle], []).
 
 %%%===================================================================
 %%% gen_server callbacks
@@ -76,7 +74,7 @@ start_link(ProviderConfig) ->
 -spec(init(Args :: term()) ->
     {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
     {stop, Reason :: term()} | ignore).
-init([HostFlag, ProviderConfig]) ->
+init([ProviderConfig, Handle]) ->
     #provider_config{host = Host, port = Port} = ProviderConfig,
     State = case open(Host, Port) of
                 {ok, Socket} ->
@@ -86,9 +84,9 @@ init([HostFlag, ProviderConfig]) ->
             end,
     NowStamp = dubbo_time_util:timestamp_ms(),
     HeartBeatInfo = #heartbeat{last_read = NowStamp, last_write = NowStamp},
-    logger:info("netty client start ~p", [HostFlag]),
+    logger:info("netty client start ~p ~p", [Host, Port]),
     start_heartbeat_timer(HeartBeatInfo),
-    {ok, State#state{provider_config = ProviderConfig, heartbeat = HeartBeatInfo, host_flag = HostFlag}}.
+    {ok, State#state{provider_config = ProviderConfig, heartbeat = HeartBeatInfo, handler = Handle}}.
 
 %%--------------------------------------------------------------------
 %% @private
@@ -120,11 +118,11 @@ handle_call(_Request, _From, State) ->
     {noreply, NewState :: #state{}, timeout() | hibernate} |
     {stop, Reason :: term(), NewState :: #state{}}).
 
-handle_cast({send_request, Ref, Request, Data, SourcePid, RequestState}, State) ->
+handle_cast({send_request, Ref, Request, Data, SourcePid, Invocation}, State) ->
     logger:debug("[send_request begin] send data to provider consumer mid ~p pid ~p sourcePid ~p", [Request#dubbo_request.mid, self(), SourcePid]),
     NewState = case send_msg(Data, State) of
                    ok ->
-                       save_request_info(Request, SourcePid, Ref, RequestState),
+                       save_request_info(Request, SourcePid, Ref, Invocation),
                        logger:debug("[send_request end] send data to provider consumer pid ~p state ok", [self()]),
                        State;
                    {error, closed} ->
@@ -162,7 +160,7 @@ handle_info({tcp, _Port, Data}, #state{recv_buffer = RecvBuffer} = State) ->
 %%    logger:debug("[INFO] recv one data ~w",[Data]),
     {ok, NextBuffer, NewState} = case check_recv_data(<<RecvBuffer/binary, Data/binary>>, State) of
                                      {next_buffer, NextBuffer2, State3} ->
-                                         logger:debug("[INFO] recv one data state wait next_buffer"),
+                                         logger:debug("recv one data state wait next_buffer"),
                                          {ok, NextBuffer2, State3}
                                  end,
 %%    HeartbeatInfo =update_heartbeat(write,NewState#state.heartbeat),
@@ -232,6 +230,8 @@ code_change(_OldVsn, State, _Extra) ->
 %%% Internal functions
 %%%===================================================================
 
+open(Host, Port) when is_binary(Host) ->
+    open(binary_to_list(Host), Port);
 open(Host, Port) ->
     logger:debug("will connect to provider ~p ~p", [Host, Port]),
     %
@@ -356,76 +356,17 @@ check_recv_data(<<>>, State) ->
     {next_buffer, <<>>, State}.
 
 
-process_data(Data, State) ->
-    <<Header:16/binary, RestData/binary>> = Data,
-    case dubbo_codec:decode_header(Header) of
-        {ok, response, ResponseInfo} ->
-            process_response(ResponseInfo#dubbo_response.is_event, ResponseInfo, RestData, State),
-%%            dubbo_traffic_control:decr_count(State#state.host_flag),
-%%            case get_earse_request_info(ResponseInfo#dubbo_response.mid) of
-%%                undefined->
-%%                    logger:error("dubbo response can't find request data,response ~p",[ResponseInfo]);
-%%                {SourcePid,Ref,_RequestState} ->
-%%                    {ok,Res} = dubbo_codec:decode_response(ResponseInfo,RestData),
-%%
-%%                    logger:info("got one response mid ~p, is_event ~p state ~p",[Res#dubbo_response.mid,Res#dubbo_response.is_event,Res#dubbo_response.state]),
-%%                    case Res#dubbo_response.is_event of
-%%                        false ->
-%%                            %% todo rpccontent need merge response with request
-%%                            RpcContent=[],
-%%                            ResponseData = dubbo_type_transfer:response_to_native(Res),
-%%                            gen_server:cast(SourcePid,{response_process,Ref,RpcContent,ResponseData});
-%%                        _->
-%%                            ok
-%%                    end
-%%            end,
-            {ok, State};
-        {ok, request, RequestInfo} ->
-            {ok, Req} = dubbo_codec:decode_request(RequestInfo, RestData),
-            logger:info("get one request mid ~p, is_event ~p", [Req#dubbo_request.mid, Req#dubbo_request.is_event]),
-            {ok, State2} = process_request(Req#dubbo_request.is_event, Req, State),
-            {ok, State2};
-        {error, Type, RelData} ->
-            logger:error("process_data error type ~p RelData ~p", [Type, RelData]),
-            {ok, State}
-    end.
-
-
-%% @doc process event
--spec process_response(IsEvent :: boolean(), #dubbo_response{}, #state{}, term()) -> ok.
-process_response(false, ResponseInfo, RestData, State) ->
-    dubbo_traffic_control:decr_count(State#state.host_flag),
-    case get_earse_request_info(ResponseInfo#dubbo_response.mid) of
-        undefined ->
-            logger:error("dubbo response can't find request data,response ~p", [ResponseInfo]);
-        {SourcePid, Ref, _RequestState} ->
-            {ok, Res} = dubbo_codec:decode_response(ResponseInfo, RestData),
-            logger:info("got one response mid ~p, is_event ~p state ~p", [Res#dubbo_response.mid, Res#dubbo_response.is_event, Res#dubbo_response.state]),
-            case Res#dubbo_response.is_event of
-                false ->
-                    %% todo rpccontent need merge response with request
-                    RpcContent = [],
-                    ResponseData = dubbo_type_transfer:response_to_native(Res),
-                    gen_server:cast(SourcePid, {response_process, Ref, RpcContent, ResponseData});
-                _ ->
-                    ok
-            end
+process_data(Data, #state{handler = ProtocolHandle} = State) ->
+    case ProtocolHandle:data_receive(Data) of
+        ok ->
+            ok;
+        {do_heartbeat, Mid} ->
+            send_heartbeat_msg(Mid, false,State),
+            ok
     end,
-    {ok, State};
-process_response(true, _ResponseInfo, _RestData, State) ->
     {ok, State}.
 
-process_request(true, #dubbo_request{data = <<"R">>}, State) ->
-    {ok, _} = dubbo_provider_consumer_reg_table:update_connection_readonly(self(), true),
-    {ok, State};
-process_request(true, Request, State) ->
-    {ok, NewState} = send_heartbeat_msg(Request#dubbo_request.mid, false, State),
-    {ok, NewState};
-process_request(false, Request, State) ->
-    {ok, State}.
 
 
 save_request_info(Request, SourcePid, Ref, RequestState) ->
     put(Request#dubbo_request.mid, {SourcePid, Ref, RequestState}).
-get_earse_request_info(Mid) ->
-    erase(Mid).
\ No newline at end of file
diff --git a/src/dubbo_cluster.erl b/src/dubbo_cluster.erl
index da4031e..3139b47 100644
--- a/src/dubbo_cluster.erl
+++ b/src/dubbo_cluster.erl
@@ -15,7 +15,11 @@
 %% limitations under the License.
 %%------------------------------------------------------------------------------
 -module(dubbo_cluster).
--author("dlive").
 
-%% API
--export([]).
+
+
+select(Interface) ->
+    ok.
+
+get_loadbalance() ->
+    ok.
\ No newline at end of file
diff --git a/src/dubbo_cluster_failfast.erl b/src/dubbo_cluster_failfast.erl
index 6078517..48809f8 100644
--- a/src/dubbo_cluster_failfast.erl
+++ b/src/dubbo_cluster_failfast.erl
@@ -15,7 +15,43 @@
 %% limitations under the License.
 %%------------------------------------------------------------------------------
 -module(dubbo_cluster_failfast).
--author("dlive").
+-behaviour(dubbo_filter).
 
+-include("dubbo.hrl").
 %% API
--export([]).
+-export([invoke/2, do_response/2]).
+
+
+invoke(#dubbo_rpc_invocation{className = Interface, loadbalance = LoadBalance} = Invocation, Acc) ->
+    case dubbo_provider_consumer_reg_table:select_connection(Invocation#dubbo_rpc_invocation.className) of
+        {ok, List} ->
+            Connection = loadbalance_select(LoadBalance, List),
+            #connection_info{pid = Pid, host_flag = HostFlag} = Connection,
+            {ok, Invocation#dubbo_rpc_invocation{transport_pid = Pid}, Acc};
+%%            case dubbo_traffic_control:check_goon(HostFlag, 199) of
+%%                ok ->
+%%
+%%%%                    Request2 = merge_attachments(Request, RpcContext), %% @todo need add rpc context to attachment
+%%
+%%%%                    {ok, RequestData} = dubbo_codec:encode_request(Request2),
+%%%%                    Ref = get_ref(RequestState),
+%%%%                    gen_server:cast(Pid, {send_request, Ref, Request2, RequestData, CallBackPid, RequestState}),
+%%%%                    case is_sync(RequestState) of
+%%%%                        true ->
+%%%%                            sync_receive(Ref, get_timeout(RequestState));
+%%%%                        false -> {ok, Ref}
+%%%%                    end;
+%%                full ->
+%%                    {error, request_full}
+%%            end;
+        {error, none} ->
+            logger:error("[INVOKE] ~p error Reason no_provider", [Interface]),
+            {stop, no_provider}
+    end.
+
+loadbalance_select(LoadBalance, ConnectionList) ->
+    Connection = LoadBalance:select(ConnectionList),
+    Connection.
+
+do_response(Invocation, Result) ->
+    {ok, Invocation, Result}.
diff --git a/src/dubbo_common_fun.erl b/src/dubbo_common_fun.erl
index 5f38fbd..b68ad99 100644
--- a/src/dubbo_common_fun.erl
+++ b/src/dubbo_common_fun.erl
@@ -18,7 +18,9 @@
 
 -include("dubboerl.hrl").
 %% API
--export([local_ip_v4/0, local_ip_v4_str/0, parse_url/1, url_to_binary/1]).
+-export([local_ip_v4/0, local_ip_v4_str/0, parse_url/1, url_to_binary/1, parse_url_parameter/1, binary_list_join/2]).
+
+-define(URL_PATH_SEPARATOR,47).  %% 47 == <<"/">>
 
 local_ip_v4() ->
     {ok, Addrs} = inet:getifaddrs(),
@@ -32,12 +34,12 @@ local_ip_v4_str() ->
     list_to_binary(io_lib:format("~p.~p.~p.~p", [V1, V2, V3, V4])).
 
 
--spec(parse_url(Url :: binary()|list()) -> {ok, #dubbo_url{}}).
+-spec(parse_url(Url :: binary()|list()) -> {ok, #dubbo_url{}}|{error, any()}).
 parse_url(Url) when is_binary(Url) ->
     parse_url(binary_to_list(Url));
 parse_url(Url) ->
     case http_uri:parse(Url, []) of
-        {ok, {Scheme, _UserInfo, Host, Port, _Path, Query}} ->
+        {ok, {Scheme, UserInfo, Host, Port, Path, Query}} ->
             QueryStr = case lists:prefix("?", Query) of
                            true ->
                                [_ | Query2] = Query,
@@ -45,22 +47,34 @@ parse_url(Url) ->
                            false ->
                                Query
                        end,
-            QueryListTmp = string:tokens(QueryStr, "&"),
-            Parameters = parse_url_parameter(QueryListTmp, #{}),
-            Result = #dubbo_url{scheme = Scheme, host = Host, port = Port, parameters = Parameters},
+            Parameters = parse_url_parameter(QueryStr),
+            Result = #dubbo_url{
+                scheme = atom_to_binary(Scheme, utf8),
+                user_info = UserInfo,
+                host = list_to_binary(Host),
+                port = Port,
+                path = Path,
+                parameters = Parameters
+            },
             {ok, Result};
         {error, R1} ->
             {error, R1}
     end.
 
 
+parse_url_parameter(ParameterStr) when is_binary(ParameterStr) ->
+    parse_url_parameter(binary_to_list(ParameterStr));
+parse_url_parameter(ParameterStr) ->
+    QueryListTmp = string:tokens(ParameterStr, "&"),
+    parse_url_parameter(QueryListTmp, #{}).
+
 parse_url_parameter([], Parameters) ->
     Parameters;
 parse_url_parameter([Item | Rest], Parameters) ->
     case string:tokens(Item, "=") of
         KeyPair when length(KeyPair) == 2 ->
             [Key, Value] = KeyPair,
-            parse_url_parameter(Rest, maps:put(Key, Value, Parameters));
+            parse_url_parameter(Rest, maps:put(list_to_binary(Key), list_to_binary(Value), Parameters));
         KeyPair2 ->
             logger:error("parse parameter error, keypair ~p", [KeyPair2]),
             parse_url_parameter(Rest, Parameters)
@@ -68,23 +82,43 @@ parse_url_parameter([Item | Rest], Parameters) ->
 
 
 url_to_binary(UrlInfo) ->
-    ParameterStr =
-        case UrlInfo#dubbo_url.parameters of
-            undefined ->
-                "";
-            Parameter ->
-                KeyValues = maps:to_list(Parameter),
-                KeyValues2 = [io_lib:format("~s=~s", [Key, http_uri:encode(Value)]) || {Key, Value} <= KeyValues],
-                ParameterStr1 = string:join(KeyValues2, "&"),
-                ParameterStr2 = ["?" | ParameterStr1],
-                list_to_binary(ParameterStr2)
-        end,
-    Value = io_lib:format(<<"~s://~s:~p/~s?~s">>,
+    ParameterStr = format_parameter(UrlInfo#dubbo_url.parameters),
+    Value = lists:flatten(io_lib:format(<<"~s://~s:~p/~s?~s">>,
         [
             UrlInfo#dubbo_url.scheme,
             UrlInfo#dubbo_url.host,
             UrlInfo#dubbo_url.port,
-            UrlInfo#dubbo_url.path,
+            format_path(UrlInfo#dubbo_url.path),
             ParameterStr
-        ]),
-    list_to_binary(Value).
\ No newline at end of file
+        ])),
+    list_to_binary(Value).
+format_path(<< ?URL_PATH_SEPARATOR:8,Rest/binary>>) ->
+    logger:debug("format_path1 ~p",[Rest]),
+    Rest;
+format_path([?URL_PATH_SEPARATOR|Rest]) ->
+    logger:debug("format_path3 ~p",[Rest]),
+    Rest;
+format_path(Value) ->
+    logger:debug("format_path2 ~p",[Value]),
+    Value.
+
+format_parameter(undefined) ->
+    "";
+format_parameter(Parameter) when is_map(Parameter) ->
+    KeyValues = maps:to_list(Parameter),
+    format_parameter(KeyValues);
+format_parameter(Parameters) ->
+    KeyValues2 = [io_lib:format("~s=~s", [Key, Value]) || {Key, Value} <- Parameters],
+    ParameterStr1 = string:join(KeyValues2, "&"),
+    ParameterStr1.
+
+binary_list_join([], _Separator) ->
+    <<"">>;
+binary_list_join([H | T], Separator) ->
+    binary_list_join1(H, T, Separator).
+
+binary_list_join1(Header, [], _Separator) ->
+    Header;
+binary_list_join1(Header, [Item | Rest], Separator) ->
+    binary_list_join1(<<Header/binary, Separator/binary, Item/binary>>, Rest, Separator).
+
diff --git a/src/dubbo_config_util.erl b/src/dubbo_config_util.erl
index 4db73d0..0ef0760 100644
--- a/src/dubbo_config_util.erl
+++ b/src/dubbo_config_util.erl
@@ -18,7 +18,7 @@
 
 -include("dubbo.hrl").
 %% API
--export([gen_consumer/3, gen_provider/5]).
+-export([gen_consumer/3, gen_provider/6]).
 
 
 gen_consumer(Application, Interface, Option) ->
@@ -34,7 +34,7 @@ gen_consumer(Application, Interface, Option) ->
         side = <<"consumers">>
     }.
 
-gen_provider(Application, Port, Interface, MethodList, Option) ->
+gen_provider(Application, Port, Interface, MethodList, ImplModuleName, _Option) ->
     Host = dubbo_network_tools:local_ipv4_binary(),
     MethodList2 = [atom_to_binary(Item, utf8) || Item <- MethodList],
     #provider_config{
@@ -46,5 +46,6 @@ gen_provider(Application, Port, Interface, MethodList, Option) ->
         executes = 10,
         application = Application,
         methods = MethodList2,
-        side = <<"provider">>
+        side = <<"provider">>,
+        impl_handle = ImplModuleName
     }.
\ No newline at end of file
diff --git a/src/dubbo_directory.erl b/src/dubbo_directory.erl
index c667f29..4945237 100644
--- a/src/dubbo_directory.erl
+++ b/src/dubbo_directory.erl
@@ -20,7 +20,7 @@
 -include("dubboerl.hrl").
 -include("dubbo.hrl").
 
--export([subscribe/2,notify/2]).
+-export([subscribe/2, notify/2]).
 %% API
 -export([start_link/0]).
 
@@ -72,59 +72,71 @@ start_link() ->
 init([]) ->
     {ok, #state{}}.
 
-subscribe(RegistryName,SubcribeUrl)->
-    try gen_server:call(?SERVER,{subscribe,RegistryName,SubcribeUrl},5000) of
-        ok->
-            ok
-    catch
-        Error:Reason->
-            %% todo improve erro type
-            {error,Reason}
-    end.
-
-notify(Interface,UrlList)->
-    %% @todo if UrlList size is 1, and protocol is empty ,need destroyAllInvokers
+subscribe(RegistryName, SubcribeUrl) ->
+    RegistryName:subscribe(SubcribeUrl, fun dubbo_directory:notify/2),
+    ok.
 
+notify(Interface, []) ->
+    logger:info("[DUBBO] directory get notify, interface provider list is empty"),
+    ok;
+notify(Interface, UrlList) ->
     refresh_invoker(UrlList),
-%%    dubbo_consumer_pool:start_consumer(Interface, UrlList),
     ok.
 
 
-refresh_invoker(UrlList)->
+refresh_invoker(UrlList) ->
     case pick_interface(UrlList) of
-        {error,Reason}->
+        {error, Reason} ->
             fail;
-        {"empty",Interface,_}->
+        {<<"empty">>, Interface,_} ->
+            OldProviderHosts = dubbo_provider_consumer_reg_table:get_interface_provider_node(Interface),
+            dubbo_provider_consumer_reg_table:clean_invalid_provider(OldProviderHosts),
             todo_destroy;
-        {_,Interface,LoadBalance} ->
+        {Schame, Interface, LoadBalance} ->
+            ProtocolModule = binary_to_existing_atom(<<<<"dubbo_protocol_">>/binary, Schame/binary>>, latin1),
+
+            logger:info("[DUBBO] refresh invoker for interface ~p loadbalance ~p protocol ~p", [Interface, LoadBalance, ProtocolModule]),
             OldProviderHosts = dubbo_provider_consumer_reg_table:get_interface_provider_node(Interface),
-            NewInvokers = refresh_invoker(UrlList,[]),
+            NewInvokers = refresh_invoker(UrlList, []),
             NewProviderHosts = [Item#dubbo_invoker.host_flag || Item <- NewInvokers],
             DeleteProverList = OldProviderHosts -- NewProviderHosts,
             dubbo_provider_consumer_reg_table:clean_invalid_provider(DeleteProverList),
-            dubbo_provider_consumer_reg_table:update_connection_info(#interface_info{interface = Interface,loadbalance = LoadBalance})
+
+            lists:map(
+                fun(NewHosts) ->
+                    NewHostConnections = dubbo_provider_consumer_reg_table:query_node_connections(NewHosts),
+                    dubbo_provider_consumer_reg_table:update_consumer_connections(Interface, NewHostConnections)
+                end, NewProviderHosts),
+
+
+%%            dubbo_provider_consumer_reg_table:update_connection_info(#interface_info{interface = Interface,loadbalance = LoadBalance})
+            dubbo_provider_consumer_reg_table:update_interface_info(#interface_info{interface = Interface, loadbalance = LoadBalance, protocol = ProtocolModule})
     end.
 %%    OldProviderHosts =
 
-
-refresh_invoker([Url|Rest],Acc)->
-    case dubbo_extension:run_fold(protocol,refer,[Url],undefined) of
+refresh_invoker([], Acc) ->
+    Acc;
+refresh_invoker([Url | Rest], Acc) ->
+    logger:info("refresh invoker ~s", [Url]),
+    case dubbo_extension:run_fold(protocol, refer, [Url], undefined) of
         undefined ->
-            refresh_invoker(Rest,Acc);
-        {ok,Invoker} ->
-            refresh_invoker(Rest,[Invoker|Acc]);
-        {stop,_}->
-            refresh_invoker(Rest,Acc)
+            refresh_invoker(Rest, Acc);
+        {ok, Invoker} ->
+            refresh_invoker(Rest, [Invoker | Acc]);
+        {stop, _} ->
+            refresh_invoker(Rest, Acc)
     end.
 
 pick_interface([Url | _]) ->
     case dubbo_common_fun:parse_url(Url) of
-        {ok,UrlInfo}->
-            Interface = maps:get("interface",UrlInfo#dubbo_url.parameters),
-            LoadBalance = list_to_atom("dubbo_loadbalance_" ++ maps:get("loadbalance",UrlInfo#dubbo_url.parameters,"random")),
-            {UrlInfo#dubbo_url.scheme,Interface,LoadBalance};
-        {error,Reason} ->
-            {error,Reason}
+        {ok, UrlInfo} ->
+            logger:debug("pick interface info from ~p", [Url]),
+            Interface = maps:get(<<"interface">>, UrlInfo#dubbo_url.parameters),
+            LoadBalanceName = maps:get(<<"loadbalance">>, UrlInfo#dubbo_url.parameters, <<"random">>),
+            LoadBalance = binary_to_existing_atom(<<<<"dubbo_loadbalance_">>/binary, LoadBalanceName/binary>>, latin1),
+            {UrlInfo#dubbo_url.scheme, Interface, LoadBalance};
+        {error, Reason} ->
+            {error, Reason}
     end.
 
 
@@ -143,9 +155,9 @@ pick_interface([Url | _]) ->
     {noreply, NewState :: #state{}, timeout() | hibernate} |
     {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
     {stop, Reason :: term(), NewState :: #state{}}).
-handle_call({subscribe,RegistryName,SubcribeUrl}, _From, State) ->
-    NotifyFun= fun dubbo_directory:notify/1,
-    apply(RegistryName,subscribe,[SubcribeUrl,NotifyFun]),
+handle_call({subscribe, RegistryName, SubcribeUrl}, _From, State) ->
+    NotifyFun = fun dubbo_directory:notify/1,
+    apply(RegistryName, subscribe, [SubcribeUrl, NotifyFun]),
     {reply, ok, State};
 handle_call(_Request, _From, State) ->
     {reply, ok, State}.
diff --git a/src/dubbo_extension.erl b/src/dubbo_extension.erl
index 4e5eb0c..ee4dae2 100644
--- a/src/dubbo_extension.erl
+++ b/src/dubbo_extension.erl
@@ -18,7 +18,7 @@
 -behaviour(gen_server).
 
 %% API
--export([run/3,run_fold/4,register/3,unregister/3]).
+-export([run/3, run_fold/4, run_fold/5, register/3, unregister/3, invoke/5, invoke_foldr/4]).
 
 
 -export([start_link/0]).
@@ -31,70 +31,104 @@
 -record(state, {}).
 
 
--spec reg(HookName::hookname(), Module::atom(),Priority::integer()) -> ok | {error, term()}.
-register(HookName, Module,Priority) ->
-    gen_server:call(?MODULE, {register, HookName, {Priority, {Module}}}).
+-spec register(HookName :: atom(), Module :: atom(), Priority :: integer()) -> ok | {error, term()}.
+register(HookName, Module, Priority) ->
+    gen_server:call(?MODULE, {register, HookName, {Priority, Module}}).
 
--spec unregister(HookName::hookname(), Module::atom(),Priority::integer()) -> ok.
-unregister(HookName, Module,Priority) ->
+-spec unregister(HookName :: atom(), Module :: atom(), Priority :: integer()) -> ok.
+unregister(HookName, Module, Priority) ->
     gen_server:call(?MODULE, {unregister, HookName, {Priority, Module}}).
 
-%% @doc run all hooks registered for the HookName.
-%% Execution can be interrupted if an hook return the atom `stop'.
--spec run(HookName::hookname(), Args::list()) -> ok.
-run(HookName,Fun, Args) ->
+-spec run(HookName :: atom(), Fun :: atom(), Args :: list()) -> ok.
+run(HookName, Fun, Args) ->
     case find_hooks(HookName) of
         no_hook -> ok;
-        Hooks -> run1(Hooks, HookName,Fun, Args)
+        Hooks ->
+            run1(Hooks, HookName, Fun, Args)
     end.
 
-run1([], _HookName,_Fun, _Args) ->
+run1([], _HookName, _Fun, _Args) ->
     ok;
 run1([M | Rest], HookName, Fun, Args) ->
     Ret = (catch apply(M, Fun, Args)),
     case Ret of
         {'EXIT', Reason} ->
+            io:format(user, "~p~n error running extension: ~p~n", [HookName, Reason]),
             logger:error("~p~n error running extension: ~p~n", [HookName, Reason]),
-            run1(Rest, HookName,Fun, Args);
+            run1(Rest, HookName, Fun, Args);
         stop ->
             ok;
         _ ->
-            run1(Rest, HookName,Fun, Args)
+            run1(Rest, HookName, Fun, Args)
     end.
 
--spec run_fold(HookName::hookname(), Args::list(), Acc::any()) -> Acc2::any().
+-spec run_fold(HookName :: atom(), Fun :: atom(), Args :: list(), Acc :: any()) -> Acc2 :: any().
 run_fold(HookName, Fun, Args, Acc) ->
     case find_hooks(HookName) of
         no_hook -> Acc;
-        Hooks -> run_fold1(Hooks,HookName, Fun, Args, Acc)
+        Hooks -> run_fold1(Hooks, HookName, Fun, Args, Acc)
     end.
 
+run_fold(HookName, Fun, Args, Acc, AppendExtension) ->
+    case find_hooks(HookName) of
+        no_hook -> Acc;
+        Hooks ->
+            run_fold1(Hooks ++ AppendExtension, HookName, Fun, Args, Acc)
+    end.
 
-run_fold1([], _HookName,_Fun, _Args,  Acc) ->
+run_fold1([], _HookName, _Fun, _Args, Acc) ->
     Acc;
-run_fold1([M | Rest], HookName,Fun, Args0,  Acc) ->
+run_fold1([M | Rest], HookName, Fun, Args0, Acc) ->
     Args = Args0 ++ [Acc],
     Ret = (catch apply(M, Fun, Args)),
     case Ret of
         {'EXIT', Reason} ->
-            error_logger:error_msg("~p~n error running hook: ~p~n",
-                [HookName, Reason]),
-            run_fold1(Rest, HookName,Fun,Args0, Acc);
+            logger:error("~p~n error running hook: ~p~n", [HookName, Reason]),
+            run_fold1(Rest, HookName, Fun, Args0, Acc);
         stop ->
             Acc;
         {stop, NewAcc} ->
             NewAcc;
         _ ->
-            run_fold1(Rest, HookName,Fun,Args0, Ret)
+            run_fold1(Rest, HookName, Fun, Args0, Ret)
     end.
 
+invoke_foldr(HookName, Fun, Args, Acc) ->
+    case find_hooks(HookName) of
+        no_hook -> Acc;
+        Hooks ->
+            do_invoke(lists:reverse(Hooks), HookName, Fun, Args, Acc)
+    end.
 
+invoke(HookName, Fun, Args, Acc, AppendExtension) ->
+    case find_hooks(HookName) of
+        no_hook -> Acc;
+        Hooks ->
+            do_invoke(Hooks ++ AppendExtension, HookName, Fun, Args, Acc)
+    end.
+
+do_invoke([], _HookName, _Fun, _Args, Acc) ->
+    Acc;
+do_invoke([M | Rest], HookName, Fun, Args0, Acc) ->
+    Args = Args0 ++ [Acc],
+    Ret = (catch apply(M, Fun, Args)),
+    case Ret of
+        {'EXIT', Reason} ->
+            logger:error("~p~n error running hook: ~p~n", [HookName, Reason]),
+            do_invoke(Rest, HookName, Fun, Args0, Acc);
+        stop ->
+            Acc;
+        {stop, NewAcc} ->
+            NewAcc;
+        {ok, Args2, NewAcc2} ->
+            do_invoke(Rest, HookName, Fun, [Args2], NewAcc2)
+    end.
 
 
 %% @doc retrieve the lists of registered functions for an hook.
--spec find(HookName::hookname()) -> {ok, [{atom(), atom()}]} | error.
+-spec find(HookName :: atom()) -> {ok, [{atom(), atom()}]} | error.
 find(HookName) ->
-    case ?find_hook(HookName) of
+    case find_hooks(HookName) of
         no_hook -> error;
         Hooks -> {ok, Hooks}
     end.
@@ -144,7 +178,7 @@ code_change(_OldVsn, State, _Extra) ->
 terminate(_Reason, _Srv) ->
     ok.
 
-do_register(HookName, {_Priority, ModuleName}=Hook) ->
+do_register(HookName, {_Priority, ModuleName} = Hook) ->
     check_module(ModuleName),
     update_hooks(HookName, [Hook]).
 
@@ -180,10 +214,11 @@ check_module(ModuleName) ->
     _ = code:ensure_loaded(ModuleName),
     ok.
 
-find_hooks(HookName)->
-    case ets:lookup(?TAB,HookName) of
-        []->
+find_hooks(HookName) ->
+    case ets:lookup(?TAB, HookName) of
+        [] ->
             no_hook;
-        [{_, Modules}]->
-            Modules
+        [{_, Modules}] ->
+            Modules1 = [Module || {_, Module} <- Modules],
+            Modules1
     end.
diff --git a/src/dubbo_filter.erl b/src/dubbo_filter.erl
index 7fa7950..c0c1a4f 100644
--- a/src/dubbo_filter.erl
+++ b/src/dubbo_filter.erl
@@ -15,7 +15,14 @@
 %% limitations under the License.
 %%------------------------------------------------------------------------------
 -module(dubbo_filter).
--author("dlive").
+-include("dubbo.hrl").
 
+-type filter_result() :: {stop, term()}|{error, term()}|term().
+
+-callback(invoke(Invocation :: #dubbo_rpc_invocation{}, Acc :: invoke_result()) -> filter_result()).
+
+-callback(on_response(Invocation :: invocation(), Result :: invoke_result()) -> filter_result()).
 %% API
 -export([]).
+
+
diff --git a/src/dubbo_invoker.erl b/src/dubbo_invoker.erl
index f01cec4..9981f2d 100644
--- a/src/dubbo_invoker.erl
+++ b/src/dubbo_invoker.erl
@@ -21,11 +21,8 @@
 -export([]).
 
 
--callback(invoke(Invoker,Invocation) -> ok).
-
-
 %% API
--export([invoke_request/2, invoke_request/3, invoke_request/5]).
+-export([invoke_request/2, invoke_request/3, invoke_request/5, invoke_response/2]).
 
 -spec invoke_request(Interface :: binary(), Request :: #dubbo_request{}) ->
     {ok, reference()}|
@@ -39,7 +36,8 @@ invoke_request(Interface, Request) ->
     {ok, reference(), Data :: any(), RpcContent :: list()}|
     {error, Reason :: timeout|no_provider|any()}.
 invoke_request(Interface, Request, RequestOption) ->
-    invoke_request(Interface, Request, maps:get(ctx, RequestOption, []), RequestOption, self()).
+    invoke_request(Interface, Request, RequestOption, self()).
+%%    invoke_request(Interface, Request, maps:get(ctx, RequestOption, []), RequestOption, self()).
 
 
 -spec invoke_request(Interface :: binary(), Request :: #dubbo_request{}, RpcContext :: list(), RequestState :: map(), CallBackPid :: pid()) ->
@@ -68,6 +66,27 @@ invoke_request(Interface, Request, RpcContext, RequestState, CallBackPid) ->
             {error, no_provider}
     end.
 
+invoke_request(Interface, Request, RequestOption, CallBackPid) ->
+    case dubbo_provider_consumer_reg_table:get_interface_info(Interface) of
+        undefined ->
+            {error, no_provider};
+        #interface_info{protocol = Protocol, loadbalance = LoadBalance} ->
+            ReferenceConfig = #reference_config{sync = is_sync(RequestOption)},
+            Ref = get_ref(RequestOption),
+            Invocation = Request#dubbo_request.data#dubbo_rpc_invocation{
+                loadbalance = LoadBalance,
+                call_ref = Ref,
+                reference_ops = ReferenceConfig,
+                source_pid = CallBackPid
+            },
+            Result = dubbo_extension:invoke(filter, invoke, [Invocation], {ok, Ref}, [Protocol]),
+            Result
+    end.
+
+invoke_response(Invocation, Result) ->
+    Result2 = dubbo_extension:invoke_foldr(filter, do_response, [Invocation], Result),
+    gen_server:cast(Invocation#dubbo_rpc_invocation.source_pid, {response_process, Invocation#dubbo_rpc_invocation.call_ref, Invocation#dubbo_rpc_invocation.attachments, Result2}),
+    ok.
 
 is_sync(Option) ->
     maps:is_key(sync, Option).
diff --git a/src/dubbo_invoker_cluster.erl b/src/dubbo_invoker_cluster.erl
deleted file mode 100644
index 906a0fa..0000000
--- a/src/dubbo_invoker_cluster.erl
+++ /dev/null
@@ -1,21 +0,0 @@
-%%------------------------------------------------------------------------------
-%% Licensed to the Apache Software Foundation (ASF) under one or more
-%% contributor license agreements.  See the NOTICE file distributed with
-%% this work for additional information regarding copyright ownership.
-%% The ASF licenses this file to You under the Apache License, Version 2.0
-%% (the "License"); you may not use this file except in compliance with
-%% the License.  You may obtain a copy of the License at
-%%
-%%     http://www.apache.org/licenses/LICENSE-2.0
-%%
-%% Unless required by applicable law or agreed to in writing, software
-%% distributed under the License is distributed on an "AS IS" BASIS,
-%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-%% See the License for the specific language governing permissions and
-%% limitations under the License.
-%%------------------------------------------------------------------------------
--module(dubbo_invoker_cluster).
--author("dlive").
-
-%% API
--export([]).
diff --git a/src/dubbo_invoker_old.erl b/src/dubbo_invoker_old.erl
deleted file mode 100644
index 354cef9..0000000
--- a/src/dubbo_invoker_old.erl
+++ /dev/null
@@ -1,97 +0,0 @@
-%%------------------------------------------------------------------------------
-%% Licensed to the Apache Software Foundation (ASF) under one or more
-%% contributor license agreements.  See the NOTICE file distributed with
-%% this work for additional information regarding copyright ownership.
-%% The ASF licenses this file to You under the Apache License, Version 2.0
-%% (the "License"); you may not use this file except in compliance with
-%% the License.  You may obtain a copy of the License at
-%%
-%%     http://www.apache.org/licenses/LICENSE-2.0
-%%
-%% Unless required by applicable law or agreed to in writing, software
-%% distributed under the License is distributed on an "AS IS" BASIS,
-%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-%% See the License for the specific language governing permissions and
-%% limitations under the License.
-%%------------------------------------------------------------------------------
--module(dubbo_invoker_old).
-
--include("dubbo.hrl").
-%% API
--export([invoke_request/2, invoke_request/3, invoke_request/5]).
-
--spec invoke_request(Interface :: binary(), Request :: #dubbo_request{}) ->
-    {ok, reference()}|
-    {ok, reference(), Data :: any(), RpcContent :: list()}|
-    {error, Reason :: timeout|no_provider|any()}.
-invoke_request(Interface, Request) ->
-    invoke_request(Interface, Request, [], #{}, self()).
-
--spec invoke_request(Interface :: binary(), Request :: #dubbo_request{}, RequestOption :: map()) ->
-    {ok, reference()}|
-    {ok, reference(), Data :: any(), RpcContent :: list()}|
-    {error, Reason :: timeout|no_provider|any()}.
-invoke_request(Interface, Request, RequestOption) ->
-    invoke_request(Interface, Request, maps:get(ctx, RequestOption, []), RequestOption, self()).
-
-
--spec invoke_request(Interface :: binary(), Request :: #dubbo_request{}, RpcContext :: list(), RequestState :: map(), CallBackPid :: pid()) ->
-    {ok, reference()}|
-    {ok, reference(), Data :: any(), RpcContent :: list()}|
-    {error, Reason :: timeout|no_provider|request_full|any()}.
-invoke_request(Interface, Request, RpcContext, RequestState, CallBackPid) ->
-    case dubbo_provider_consumer_reg_table:select_connection(Interface, Request#dubbo_request.mid) of
-        {ok, #connection_info{pid = Pid, host_flag = HostFlag}} ->
-            case dubbo_traffic_control:check_goon(HostFlag, 199) of
-                ok ->
-                    Request2 = merge_attachments(Request, RpcContext),
-                    {ok, RequestData} = dubbo_codec:encode_request(Request2),
-                    Ref = get_ref(RequestState),
-                    gen_server:cast(Pid, {send_request, Ref, Request2, RequestData, CallBackPid, RequestState}),
-                    case is_sync(RequestState) of
-                        true ->
-                            sync_receive(Ref, get_timeout(RequestState));
-                        false -> {ok, Ref}
-                    end;
-                full ->
-                    {error, request_full}
-            end;
-        {error, none} ->
-            logger:error("[INVOKE] ~p error Reason no_provider", [Interface]),
-            {error, no_provider}
-    end.
-
-
-is_sync(Option) ->
-    maps:is_key(sync, Option).
-get_ref(Option) ->
-    maps:get(ref, Option, make_ref()).
-
-get_timeout(Option) ->
-    maps:get(timeout, Option, ?REQUEST_TIME_OUT).
-
-
-sync_receive(Ref, TimeOut) ->
-    receive
-        {'$gen_cast', {response_process, Ref, RpcContent, Response}} ->
-            {ok, Ref, Response, RpcContent}
-    after
-        TimeOut ->
-            {error, timeout}
-    end.
-merge_attachments(#dubbo_request{data = null} = Request, _Option) ->
-    Request;
-merge_attachments(Request, Option) ->
-    Attachements = Request#dubbo_request.data#dubbo_rpc_invocation.attachments,
-    case lists:keyfind(attachments, 1, Option) of
-        false -> OptionAttachments = [];
-        {attachments, OptionAttachments} ->
-            OptionAttachments
-    end,
-    List = [
-        {<<"version">>, <<"0.0.0">>},
-        {<<"timeout">>, <<"5000">>}
-    ],
-    Attachements2 = lists:merge3(Attachements, OptionAttachments, List),
-    Data2 = Request#dubbo_request.data#dubbo_rpc_invocation{attachments = Attachements2},
-    Request#dubbo_request{data = Data2}.
diff --git a/src/dubbo_loadbalance_random.erl b/src/dubbo_loadbalance_random.erl
index 21d4f61..ebd9dd1 100644
--- a/src/dubbo_loadbalance_random.erl
+++ b/src/dubbo_loadbalance_random.erl
@@ -17,4 +17,10 @@
 -module(dubbo_loadbalance_random).
 
 %% API
--export([]).
+-export([select/1]).
+
+select(List) ->
+    RandNum = rand:uniform(65535),
+    Len = length(List),
+    RemNum = (RandNum rem Len) + 1,
+    lists:nth(RemNum, List).
diff --git a/src/dubbo_netty_client.erl b/src/dubbo_netty_client.erl
deleted file mode 100644
index 06f9d1e..0000000
--- a/src/dubbo_netty_client.erl
+++ /dev/null
@@ -1,430 +0,0 @@
-%%------------------------------------------------------------------------------
-%% Licensed to the Apache Software Foundation (ASF) under one or more
-%% contributor license agreements.  See the NOTICE file distributed with
-%% this work for additional information regarding copyright ownership.
-%% The ASF licenses this file to You under the Apache License, Version 2.0
-%% (the "License"); you may not use this file except in compliance with
-%% the License.  You may obtain a copy of the License at
-%%
-%%     http://www.apache.org/licenses/LICENSE-2.0
-%%
-%% Unless required by applicable law or agreed to in writing, software
-%% distributed under the License is distributed on an "AS IS" BASIS,
-%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-%% See the License for the specific language governing permissions and
-%% limitations under the License.
-%%------------------------------------------------------------------------------
--module(dubbo_netty_client).
-
--behaviour(gen_server).
-
--include("dubbo.hrl").
-%% API
--export([start_link/4]).
-
-%% gen_server callbacks
--export([init/1,
-    handle_call/3,
-    handle_cast/2,
-    handle_info/2,
-    terminate/2,
-    code_change/3]).
--export([check_recv_data/2]).
-
--define(SERVER, ?MODULE).
-
--record(heartbeat, {last_write = 0, last_read = 0, timeout = 60000, max_timeout = 180000}).
--record(state, {provider_config, socket = undefined,
-    heartbeat = #heartbeat{},
-    recv_buffer = <<>>,         %%从服务端接收的数据
-    host_flag,
-    reconnection_timer
-}).
-
-%%%===================================================================
-%%% API
-%%%===================================================================
-
-%%--------------------------------------------------------------------
-%% @doc
-%% Starts the server
-%%
-%% @end
-%%--------------------------------------------------------------------
--spec(start_link(Name :: binary(), HostFlag :: binary(), ProviderConfig :: #provider_config{}, integer()) ->
-    {ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
-start_link(Name, HostFlag, ProviderConfig, Index) ->
-    gen_server:start_link({local, Name}, ?MODULE, [HostFlag, ProviderConfig, Index], []).
-
-%%%===================================================================
-%%% gen_server callbacks
-%%%===================================================================
-
-%%--------------------------------------------------------------------
-%% @private
-%% @doc
-%% Initializes the server
-%%
-%% @spec init(Args) -> {ok, State} |
-%%                     {ok, State, Timeout} |
-%%                     ignore |
-%%                     {stop, Reason}
-%% @end
-%%--------------------------------------------------------------------
--spec(init(Args :: term()) ->
-    {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
-    {stop, Reason :: term()} | ignore).
-init([HostFlag, ProviderConfig, Index]) ->
-    erlang:process_flag(min_bin_vheap_size, 1024 * 1024),
-    #provider_config{host = Host, port = Port} = ProviderConfig,
-    State = case open(Host, Port) of
-                {ok, Socket} ->
-                    #state{socket = Socket};
-                {error, _Reason} ->
-                    #state{}
-            end,
-    NowStamp = dubbo_time_util:timestamp_ms(),
-    HeartBeatInfo = #heartbeat{last_read = NowStamp, last_write = NowStamp},
-    logger:info("netty client start ~p", [HostFlag]),
-    start_heartbeat_timer(HeartBeatInfo),
-    {ok, State#state{provider_config = ProviderConfig, heartbeat = HeartBeatInfo, host_flag = HostFlag}}.
-
-%%--------------------------------------------------------------------
-%% @private
-%% @doc
-%% Handling call messages
-%%
-%% @end
-%%--------------------------------------------------------------------
--spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()},
-    State :: #state{}) ->
-    {reply, Reply :: term(), NewState :: #state{}} |
-    {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} |
-    {noreply, NewState :: #state{}} |
-    {noreply, NewState :: #state{}, timeout() | hibernate} |
-    {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
-    {stop, Reason :: term(), NewState :: #state{}}).
-handle_call(_Request, _From, State) ->
-    {reply, ok, State}.
-
-%%--------------------------------------------------------------------
-%% @private
-%% @doc
-%% Handling cast messages
-%%
-%% @end
-%%--------------------------------------------------------------------
--spec(handle_cast(Request :: term(), State :: #state{}) ->
-    {noreply, NewState :: #state{}} |
-    {noreply, NewState :: #state{}, timeout() | hibernate} |
-    {stop, Reason :: term(), NewState :: #state{}}).
-
-handle_cast({send_request, Ref, Request, Data, SourcePid, RequestState}, State) ->
-    logger:debug("[send_request begin] send data to provider consumer mid ~p pid ~p sourcePid ~p", [Request#dubbo_request.mid, self(), SourcePid]),
-    NewState = case send_msg(Data, State) of
-                   ok ->
-                       save_request_info(Request, SourcePid, Ref, RequestState),
-                       logger:debug("[send_request end] send data to provider consumer pid ~p state ok", [self()]),
-                       State;
-                   {error, closed} ->
-                       logger:warning("send request error, connection is closed"),
-                       State2 = reconnect(State),
-                       State2;
-                   {error, R1} ->
-                       logger:error("[send_request end] send data to provider consumer pid error ~p ~p", [self(), R1]),
-                       State
-               end,
-    HeartbeatInfo = update_heartbeat(write, NewState#state.heartbeat),
-    {noreply, NewState#state{heartbeat = HeartbeatInfo}};
-
-handle_cast(_Request, State) ->
-    {noreply, State}.
-
-%%--------------------------------------------------------------------
-%% @private
-%% @doc
-%% Handling all non call/cast messages
-%%
-%% @spec handle_info(Info, State) -> {noreply, State} |
-%%                                   {noreply, State, Timeout} |
-%%                                   {stop, Reason, State}
-%% @end
-%%--------------------------------------------------------------------
--spec(handle_info(Info :: timeout() | term(), State :: #state{}) ->
-    {noreply, NewState :: #state{}} |
-    {noreply, NewState :: #state{}, timeout() | hibernate} |
-    {stop, Reason :: term(), NewState :: #state{}}).
-
-
-handle_info({tcp, _Port, Data}, #state{recv_buffer = RecvBuffer} = State) ->
-%%    inet:setopts(State#state.socket, [{active, once}]),
-%%    logger:debug("[INFO] recv one data ~w",[Data]),
-    {ok, NextBuffer, NewState} = case check_recv_data(<<RecvBuffer/binary, Data/binary>>, State) of
-                                     {next_buffer, NextBuffer2, State3} ->
-                                         logger:debug("[INFO] recv one data state wait next_buffer"),
-                                         {ok, NextBuffer2, State3}
-                                 end,
-%%    HeartbeatInfo =update_heartbeat(write,NewState#state.heartbeat),
-    {noreply, NewState#state{recv_buffer = NextBuffer}};
-handle_info({tcp_closed, Port}, State) ->
-    logger:info("dubbo connection closed ~p", [Port]),
-    NewState = reconnect(State),
-    {noreply, NewState};
-handle_info({timeout, _TimerRef, {reconnect}}, State) ->
-    NewState = reconnect(State#state{reconnection_timer = undefined}),
-    {noreply, NewState};
-handle_info({timeout, _TimerRef, {heartbeat_timer}}, State) ->
-    {ok, NewState} = case check_heartbeat_state(State) of
-                         {normal} -> {ok, State};
-                         {send_heart} ->
-                             send_heartbeat_msg(undefined, true, State);
-                         {reconnect} ->
-                             %% @todo reconnect
-                             {ok, State}
-                     end,
-    HeartbeatInfo = update_heartbeat(write, NewState#state.heartbeat),
-    start_heartbeat_timer(HeartbeatInfo),
-    {noreply, NewState#state{heartbeat = HeartbeatInfo}};
-handle_info(_Info, State) ->
-    logger:warning("[INFO] get one info:~p", [_Info]),
-%%    inet:setopts(State#state.socket, [{active, once}]),
-%%    case State#state.tmp_pid of
-%%        undefined  ->ok;
-%%        Pid ->
-%%            gen_server:cast(Pid,{msg_back})
-%%    end,
-    HeartbeatInfo = update_heartbeat(write, State#state.heartbeat),
-    {noreply, State#state{heartbeat = HeartbeatInfo}}.
-
-%%--------------------------------------------------------------------
-%% @private
-%% @doc
-%% This function is called by a gen_server when it is about to
-%% terminate. It should be the opposite of Module:init/1 and do any
-%% necessary cleaning up. When it returns, the gen_server terminates
-%% with Reason. The return value is ignored.
-%%
-%% @spec terminate(Reason, State) -> void()
-%% @end
-%%--------------------------------------------------------------------
--spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
-    State :: #state{}) -> term()).
-terminate(_Reason, _State) ->
-    logger:warning("terminate reason:~p", [_Reason]),
-    ok.
-
-%%--------------------------------------------------------------------
-%% @private
-%% @doc
-%% Convert process state when code is changed
-%%
-%% @spec code_change(OldVsn, State, Extra) -> {ok, NewState}
-%% @end
-%%--------------------------------------------------------------------
--spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{},
-    Extra :: term()) ->
-    {ok, NewState :: #state{}} | {error, Reason :: term()}).
-code_change(_OldVsn, State, _Extra) ->
-    {ok, State}.
-
-%%%===================================================================
-%%% Internal functions
-%%%===================================================================
-
-open(Host, Port) ->
-    logger:debug("will connect to provider ~p ~p", [Host, Port]),
-    %
-    case gen_tcp:connect(Host, Port, [
-        binary,
-        {packet, 0}, {active, false},
-        {reuseaddr, true},
-        {delay_send, true},
-        {nodelay, true},
-        {high_watermark, 512 * 1024},
-        {low_watermark, 256 * 1024},
-        {sndbuf, 512 * 1024},
-        {recbuf, 512 * 1024}
-    ]) of
-        {ok, Sockets} ->
-            inet:setopts(Sockets, [{active, true}]),
-            {ok, Sockets};
-        Info ->
-            logger:error("start client connection error ~p", [Info]),
-            {error, Info}
-    end.
-
-reconnect(#state{reconnection_timer = Timer} = State) when Timer /= undefined ->
-    State;
-reconnect(State) ->
-    #provider_config{host = Host, port = Port} = State#state.provider_config,
-    case State#state.socket of
-        undefined -> ok;
-        Socket ->
-            gen_tcp:close(Socket)
-    end,
-    case open(Host, Port) of
-        {ok, Socket2} ->
-            logger:warning("reconnect to provider ~p ~p success", [Host, Port]),
-            State#state{socket = Socket2, recv_buffer = <<>>};
-        {error, Reason} ->
-            logger:warning("connect to provider error ~p", [Reason]),
-            TimerRef = erlang:start_timer(2000, self(), {reconnect}),
-            State#state{socket = undefined, reconnection_timer = TimerRef}
-    end.
-
-send_msg(Msg, State) ->
-    case State#state.socket of
-        undefined ->
-            {error, closed};
-        Socket ->
-            case gen_tcp:send(Socket, Msg) of
-                ok ->
-                    ok;
-                {error, Reason} ->
-                    logger:error("send to server error,reason:~p", [Reason]),
-                    {error, Reason}
-            end
-    end.
-
-%%%=================================================================
-%%% 心跳检测
-%%%=================================================================
-start_heartbeat_timer(HeartbeatInfo) ->
-    erlang:start_timer(HeartbeatInfo#heartbeat.timeout, self(), {heartbeat_timer}),
-    ok.
-update_heartbeat(write, Info) ->
-    Info#heartbeat{last_write = dubbo_time_util:timestamp_ms()};
-update_heartbeat(read, Info) ->
-    Info#heartbeat{last_read = dubbo_time_util:timestamp_ms()}.
-
-
-check_heartbeat_state(#state{heartbeat = HeartBeatInfo} = _State) ->
-    Now = dubbo_time_util:timestamp_ms(),
-    #heartbeat{last_read = LastRead, last_write = LastWrite, timeout = Timeout, max_timeout = MaxTimeout} = HeartBeatInfo,
-    if
-        (Now - LastRead) > Timeout ->
-            {send_heart};
-        (Now - LastWrite) > Timeout ->
-            {send_heart};
-        (Now - LastRead) > MaxTimeout ->
-            {reconnect};
-        true ->
-            {normal}
-    end.
-
-
-send_heartbeat_msg(Mid, NeedResponse, State) ->
-    {ok, Bin} = dubbo_heartbeat:generate_request(Mid, NeedResponse),
-    NewState = case send_msg(Bin, State) of
-                   ok ->
-                       logger:info("send one heartbeat to server"),
-                       State;
-                   {error, Reason} ->
-                       logger:warning("dubbo connection send heartbeat error ~p", [Reason]),
-                       State2 = reconnect(State),
-                       State2
-               end,
-    {ok, NewState}.
-
-%%%=================================================================
-%%% 接收数据处理
-%%%=================================================================
--spec check_recv_data(Data :: binary(), State :: #state{}) -> {ready, ReadyData :: binary()} | {ready, ReadyData :: binary(), NextBuffer :: binary()}.
-check_recv_data(<<?DUBBO_MEGIC_HIGH, ?DUBBO_MEGIC_LOW, Rest/binary>> = Data, State) when byte_size(Rest) < 14 ->
-    {next_buffer, Data, State};
-check_recv_data(<<?DUBBO_MEGIC_HIGH, ?DUBBO_MEGIC_LOW, _OtherFlag:80, DataLen:32, Rest/binary>> = Data, State) ->
-    RestSize = byte_size(Rest),
-    if
-        DataLen == RestSize ->
-            {ok, State2} = process_data(Data, State),
-            {next_buffer, <<>>, State2};
-        DataLen > RestSize ->
-            logger:warning("need wait next buffer data ~p", [Data]),
-            {next_buffer, Data, State};
-        DataLen < RestSize ->
-            <<ReadyData:DataLen/binary, NextBuffer/binary>> = Rest,
-            OneData = <<?DUBBO_MEGIC_HIGH:8, ?DUBBO_MEGIC_LOW:8, _OtherFlag:80, DataLen:32, ReadyData/binary>>,
-            {ok, State3} = process_data(OneData, State),
-%%            logger:warning("recevi more data ~w ",[NextBuffer]),
-            check_recv_data(NextBuffer, State3)
-    end;
-check_recv_data(<<Error/integer, Data/binary>>, State) ->
-    logger:error("recv bad header data,Begin Byte:~p", [Error]),
-    check_recv_data(Data, State);
-check_recv_data(<<>>, State) ->
-    {next_buffer, <<>>, State}.
-
-
-process_data(Data, State) ->
-    <<Header:16/binary, RestData/binary>> = Data,
-    case dubbo_codec:decode_header(Header) of
-        {ok, response, ResponseInfo} ->
-            process_response(ResponseInfo#dubbo_response.is_event, ResponseInfo, RestData, State),
-%%            dubbo_traffic_control:decr_count(State#state.host_flag),
-%%            case get_earse_request_info(ResponseInfo#dubbo_response.mid) of
-%%                undefined->
-%%                    logger:error("dubbo response can't find request data,response ~p",[ResponseInfo]);
-%%                {SourcePid,Ref,_RequestState} ->
-%%                    {ok,Res} = dubbo_codec:decode_response(ResponseInfo,RestData),
-%%
-%%                    logger:info("got one response mid ~p, is_event ~p state ~p",[Res#dubbo_response.mid,Res#dubbo_response.is_event,Res#dubbo_response.state]),
-%%                    case Res#dubbo_response.is_event of
-%%                        false ->
-%%                            %% todo rpccontent need merge response with request
-%%                            RpcContent=[],
-%%                            ResponseData = dubbo_type_transfer:response_to_native(Res),
-%%                            gen_server:cast(SourcePid,{response_process,Ref,RpcContent,ResponseData});
-%%                        _->
-%%                            ok
-%%                    end
-%%            end,
-            {ok, State};
-        {ok, request, RequestInfo} ->
-            {ok, Req} = dubbo_codec:decode_request(RequestInfo, RestData),
-            logger:info("get one request mid ~p, is_event ~p", [Req#dubbo_request.mid, Req#dubbo_request.is_event]),
-            {ok, State2} = process_request(Req#dubbo_request.is_event, Req, State),
-            {ok, State2};
-        {error, Type, RelData} ->
-            logger:error("process_data error type ~p RelData ~p", [Type, RelData]),
-            {ok, State}
-    end.
-
-
-%% @doc process event
--spec process_response(IsEvent :: boolean(), #dubbo_response{}, #state{}, term()) -> ok.
-process_response(false, ResponseInfo, RestData, State) ->
-    dubbo_traffic_control:decr_count(State#state.host_flag),
-    case get_earse_request_info(ResponseInfo#dubbo_response.mid) of
-        undefined ->
-            logger:error("dubbo response can't find request data,response ~p", [ResponseInfo]);
-        {SourcePid, Ref, _RequestState} ->
-            {ok, Res} = dubbo_codec:decode_response(ResponseInfo, RestData),
-            logger:info("got one response mid ~p, is_event ~p state ~p", [Res#dubbo_response.mid, Res#dubbo_response.is_event, Res#dubbo_response.state]),
-            case Res#dubbo_response.is_event of
-                false ->
-                    %% todo rpccontent need merge response with request
-                    RpcContent = [],
-                    ResponseData = dubbo_type_transfer:response_to_native(Res),
-                    gen_server:cast(SourcePid, {response_process, Ref, RpcContent, ResponseData});
-                _ ->
-                    ok
-            end
-    end,
-    {ok, State};
-process_response(true, _ResponseInfo, _RestData, State) ->
-    {ok, State}.
-
-process_request(true, #dubbo_request{data = <<"R">>}, State) ->
-    {ok, _} = dubbo_provider_consumer_reg_table:update_connection_readonly(self(), true),
-    {ok, State};
-process_request(true, Request, State) ->
-    {ok, NewState} = send_heartbeat_msg(Request#dubbo_request.mid, false, State),
-    {ok, NewState};
-process_request(false, Request, State) ->
-    {ok, State}.
-
-
-save_request_info(Request, SourcePid, Ref, RequestState) ->
-    put(Request#dubbo_request.mid, {SourcePid, Ref, RequestState}).
-get_earse_request_info(Mid) ->
-    erase(Mid).
\ No newline at end of file
diff --git a/src/dubbo_node_config_util.erl b/src/dubbo_node_config_util.erl
index 32207da..9e3108a 100644
--- a/src/dubbo_node_config_util.erl
+++ b/src/dubbo_node_config_util.erl
@@ -17,30 +17,19 @@
 -module(dubbo_node_config_util).
 
 -include("dubbo.hrl").
+-include("dubboerl.hrl").
 %% API
 -export([parse_provider_info/1, gen_provider_info/1]).
 
-parse_provider_info(ProviderStr) when is_binary(ProviderStr) ->
-    parse_provider_info(binary_to_list(ProviderStr));
-parse_provider_info(ProviderStr) ->
-    case http_uri:parse(http_uri:decode(ProviderStr), [{scheme_defaults, [{dubbo, 20880}]}]) of
-        {ok, {Scheme, _UserInfo, Host, Port, _Path, Query}} ->
-            QueryStr = case lists:prefix("?", Query) of
-                           true ->
-                               [_ | Query2] = Query,
-                               Query2;
-                           false ->
-                               Query
-                       end,
-            QueryListTmp = string:tokens(QueryStr, "&"),
-            ProviderConfig = parse_parameter(QueryListTmp, #provider_config{protocol = Scheme, host = Host, port = Port}),
-            logger:debug("parse provider info string ~p,result: ~p", [ProviderStr, ProviderConfig]),
-            {ok, ProviderConfig};
-        {error, R1} ->
-            logger:debug("parse provider error string ~p, error ~p", [ProviderStr, R1]),
-            {error, R1}
-    end.
-
+parse_provider_info(#dubbo_url{scheme = Scheme, host = Host, port = Port, parameters = Parameters}) ->
+    ProviderInfo = #provider_config{protocol = Scheme, host = Host, port = Port},
+    %% Fold every URL parameter into the record; keys parse_parameter/3 does not know leave it unchanged.
+    ProviderConfig = maps:fold(
+        fun(K, V, Acc1) ->
+            parse_parameter(K, V, Acc1)
+        end, ProviderInfo, Parameters),
+    logger:debug("parse provider,result: ~p", [ProviderConfig]),
+    {ok, ProviderConfig}.
 
 parse_parameter([], Config) ->
     Config;
@@ -54,24 +43,23 @@ parse_parameter([Item | Rest], Config) ->
             logger:error("parse parameter error, keypair ~p", [KeyPair2]),
             parse_parameter(Rest, Config)
     end.
-parse_parameter("anyhost", Value, Config) ->
-    Config#provider_config{anyhost = list_to_atom(Value)};
-parse_parameter("application", Value, Config) ->
-    Config#provider_config{application = list_to_binary(Value)};
-parse_parameter("dubbo", Value, Config) ->
-    Config#provider_config{dubbo = list_to_binary(Value)};
-parse_parameter("executes", Value, Config) ->
-    Config#provider_config{executes = list_to_integer(Value)};
-parse_parameter("interface", Value, Config) ->
-    Config#provider_config{interface = list_to_binary(Value)};
-parse_parameter("methods", Value, Config) ->
-    MethodList = string:tokens(Value, ","),
-    MethodList2 = [list_to_binary(Item) || Item <- MethodList],
-    Config#provider_config{methods = MethodList2};
-parse_parameter("side", Value, Config) ->
-    Config#provider_config{side = list_to_binary(Value)};
-parse_parameter("interface", Value, Config) ->
-    Config#provider_config{interface = list_to_binary(Value)};
+parse_parameter(<<"anyhost">>, Value, Config) ->
+    Config#provider_config{anyhost = binary_to_existing_atom(Value, latin1)};
+parse_parameter(<<"application">>, Value, Config) ->
+    Config#provider_config{application = Value};
+parse_parameter(<<"dubbo">>, Value, Config) ->
+    Config#provider_config{dubbo = Value};
+parse_parameter(<<"executes">>, Value, Config) ->
+    Config#provider_config{executes = binary_to_integer(Value)};
+parse_parameter(<<"interface">>, Value, Config) ->
+    Config#provider_config{interface = Value};
+parse_parameter(<<"methods">>, Value, Config) ->
+    MethodList = binary:split(Value, <<",">>, [global, trim_all]),
+    Config#provider_config{methods = MethodList};
+parse_parameter(<<"side">>, Value, Config) ->
+    Config#provider_config{side = Value};
+%% Any key without a dedicated clause above is ignored: the record is
+%% returned unchanged rather than treated as an error.
 parse_parameter(_, _, Config) ->
     Config.
 
diff --git a/src/dubbo_protocol.erl b/src/dubbo_protocol.erl
index 7ecfcd8..9a5e659 100644
--- a/src/dubbo_protocol.erl
+++ b/src/dubbo_protocol.erl
@@ -16,11 +16,8 @@
 %%------------------------------------------------------------------------------
 -module(dubbo_protocol).
 
--callback refer(Url,Acc)->ok.
+-include("dubbo.hrl").
 
-%%%% API
-%%-export([refer/2]).
-%%
-%%
-%%refer(InterfaceClassInfo,Url)->
-%%    dubbo_hooker(protocol_wapper,refer,[InterfaceClassInfo,Url]).
\ No newline at end of file
+-callback refer(Url :: binary(), Acc :: term()) -> ok | {ok, Invoker :: term()} | {skip, term()}.
+
+-callback export(Invoker :: #invoker{}, Acc :: term()) -> ok | {ok, #invoker{}}.
diff --git a/src/dubbo_protocol_dubbo.erl b/src/dubbo_protocol_dubbo.erl
index 96947cc..fc6a224 100644
--- a/src/dubbo_protocol_dubbo.erl
+++ b/src/dubbo_protocol_dubbo.erl
@@ -20,13 +20,14 @@
 -include("dubbo.hrl").
 
 %% API
--export([refer/2]).
+-export([refer/2, invoke/2, data_receive/1]).
+-export([export/2]).
 
 refer(Url, Acc) ->
     {ok, UrlInfo} = dubbo_common_fun:parse_url(Url),
     case UrlInfo#dubbo_url.scheme of
         <<"dubbo">> ->
-            {ok,Invoker} = do_refer(UrlInfo),
+            {ok, Invoker} = do_refer(UrlInfo),
             {ok, Invoker};
         _ ->
             {skip, Acc}
@@ -38,9 +39,9 @@ do_refer(UrlInfo) ->
 %%            OldHostList = dubbo_provider_consumer_reg_table:get_interface_provider_node(ProviderConfig#provider_config.interface),
             case getClients(ProviderConfig) of
                 {ok, ConnectionInfoList} ->
-                    dubbo_provider_consumer_reg_table:update_node_conections(ProviderConfig#provider_config.interface,ConnectionInfoList),
+                    dubbo_provider_consumer_reg_table:update_node_conections(ProviderConfig#provider_config.interface, ConnectionInfoList),
                     HostFlag = dubbo_provider_consumer_reg_table:get_host_flag(ProviderConfig),
-                    {ok,#dubbo_invoker{host_flag = HostFlag,handle = ?MODULE}};
+                    {ok, #dubbo_invoker{host_flag = HostFlag, handle = ?MODULE}};
                 {error, Reason} ->
                     {error, Reason}
             end;
@@ -49,30 +50,30 @@ do_refer(UrlInfo) ->
             {error, R1}
     end.
 
+export(Invoker, _Acc) ->
+    registry_provider_impl_module(Invoker),
+    ok = service_listen_check_start(Invoker#invoker.url),
+    {ok, Invoker}.
+
 getClients(ProviderConfig) ->
     %% @todo if connections parameter > 1, need new spec transport
     case new_transport(ProviderConfig) of
         {ok, ConnectionInfoList} ->
-%%            ConnectionList = start_provider_process(HostFlag, 30, ProviderConfig),
             {ok, ConnectionInfoList};
         {error, Reason} ->
             {error, Reason}
     end.
 
 
-%%ok = update_connection_info(ProviderConfig#provider_config.interface, HostFlag, ConnectionList, true),
-
-
 new_transport(ProviderConfig) ->
 
-    HostFlag = get_host_flag(ProviderConfig),
-    case dubbo_provider_consumer_reg_table:get_host_connections(ProviderConfig#provider_config) of
+    case dubbo_provider_consumer_reg_table:get_host_connections(ProviderConfig#provider_config.host, ProviderConfig#provider_config.port) of
         [] ->
             case dubbo_exchanger:connect(ProviderConfig, ?MODULE) of
                 {ok, ConnectionInfo} ->
                     {ok, [ConnectionInfo]};
                 {error, Reason} ->
-                    logger:warning("start client fail ~p ~p", [Reason, HostFlag]),
+                    logger:warning("start client fail ~p ~p ~p", [Reason, ProviderConfig#provider_config.host, ProviderConfig#provider_config.port]),
                     {error, Reason}
             end;
         ConnectionInfoList ->
@@ -82,3 +83,102 @@ new_transport(ProviderConfig) ->
 
 
 
+invoke(#dubbo_rpc_invocation{source_pid = CallBackPid, transport_pid = TransportPid, call_ref = Ref} = Invocation, Acc) ->
+
+%%    Request2 = merge_attachments(Request, RpcContext), %% @todo need add rpc context to attachment
+    Request = dubbo_adapter:reference(Invocation),
+    {ok, RequestData} = dubbo_codec:encode_request(Request),
+    gen_server:cast(TransportPid, {send_request, Ref, Request, RequestData, CallBackPid, Invocation}),
+    {ok, Invocation, Acc}.
+%%    case is_sync(RequestState) of
+%%        true ->
+%%            sync_receive(Ref, get_timeout(RequestState));
+%%        false -> {ok, Ref}
+%%    end.
+
+
+
+-spec(data_receive(binary()) -> ok|{do_heartbeat, Mid :: integer()}).
+data_receive(Data) ->
+    <<Header:16/binary, RestData/binary>> = Data,
+    case dubbo_codec:decode_header(Header) of
+        {ok, response, ResponseInfo} ->
+            process_response(ResponseInfo#dubbo_response.is_event, ResponseInfo, RestData);
+        {ok, request, RequestInfo} ->
+            {ok, Req} = dubbo_codec:decode_request(RequestInfo, RestData),
+            logger:info("get one request mid ~p, is_event ~p", [Req#dubbo_request.mid, Req#dubbo_request.is_event]),
+            process_request(Req#dubbo_request.is_event, Req);
+        {error, Type, RelData} ->
+            logger:error("process_data error type ~p RelData ~p", [Type, RelData]),
+            ok
+    end.
+
+
+%% @doc process event
+-spec process_response(IsEvent :: boolean(), #dubbo_response{}, RestData :: binary()) -> ok.
+process_response(false, ResponseInfo, RestData) ->
+%%    dubbo_traffic_control:decr_count(State#state.host_flag),
+
+    %% @todo traffic need move limit filter
+    case get_earse_request_info(ResponseInfo#dubbo_response.mid) of
+        undefined ->
+            logger:error("dubbo response can't find request data,response ~p", [ResponseInfo]);
+        {SourcePid, Ref, Invocation} ->
+            {ok, Res} = dubbo_codec:decode_response(ResponseInfo, RestData),
+            logger:info("got one response mid ~p, is_event ~p state ~p", [Res#dubbo_response.mid, Res#dubbo_response.is_event, Res#dubbo_response.state]),
+            case Res#dubbo_response.is_event of
+                false ->
+                    %% @todo rpccontent need merge response with request
+                    ResponseData = dubbo_type_transfer:response_to_native(Res),
+                    dubbo_invoker:invoke_response(Invocation, ResponseData);
+                _ ->
+                    ok
+            end
+    end,
+    ok;
+process_response(true, _ResponseInfo, _RestData) ->
+    ok.
+
+process_request(true, #dubbo_request{data = <<"R">>}) -> %% event with payload <<"R">>: flag this connection read-only
+    {ok, _} = dubbo_provider_consumer_reg_table:update_connection_readonly(self(), true),
+    ok;
+process_request(true, Request) -> %% any other event: ask the caller to answer with a heartbeat for this mid
+    {do_heartbeat, Request#dubbo_request.mid};
+process_request(false, _Request) -> %% non-event requests are ignored here
+    ok.
+
+get_earse_request_info(Mid) ->
+    erase(Mid).
+
+
+registry_provider_impl_module(Invoker) ->
+
+    case dubbo_common_fun:parse_url(Invoker#invoker.url) of
+        {ok, UrlInfo} ->
+            Interface = maps:get(<<"interface">>, UrlInfo#dubbo_url.parameters),
+            ok = dubbo_provider_protocol:register_impl_provider(Interface, Invoker#invoker.handler)
+    end.
+
+
+service_listen_check_start(Url) ->
+    case dubbo_common_fun:parse_url(Url) of
+        {ok,#dubbo_url{port = Port}} ->
+            case server_is_start() of
+                true ->
+                    ok;
+                false ->
+                    {ok, _} = ranch:start_listener(dubbo_provider, ranch_tcp, [{port, Port}], dubbo_provider_protocol, []),
+                    ok
+            end;
+        {error,Reason} ->
+            {error,Reason}
+    end.
+
+server_is_start() ->
+    try ranch:get_protocol_options(dubbo_provider) of
+        _ ->
+            true
+    catch
+        _:_ ->
+            false
+    end.
\ No newline at end of file
diff --git a/src/dubbo_protocol_registry.erl b/src/dubbo_protocol_registry.erl
index b2fde17..47174f6 100644
--- a/src/dubbo_protocol_registry.erl
+++ b/src/dubbo_protocol_registry.erl
@@ -18,36 +18,97 @@
 -behaviour(dubbo_protocol).
 
 -include("dubboerl.hrl").
+-include("dubbo.hrl").
+-include("constrans.hrl").
 
-%% API
--export([refer/1]).
 
-refer(Url)->
-    {ok,UrlInfo} =  dubbo_common_fun:parse_url(Url),
+%% API
+-export([refer/2, export/2, destroy/0]).
 
-    {ok,RegistryName} = dubbo_registry:setup_register(UrlInfo),
+refer(Url, _Acc) -> %% the accumulator is not used by the registry protocol
+    {ok, UrlInfo} = dubbo_common_fun:parse_url(Url),
+    RegistryUrlInfo = gen_registry_urlinfo(UrlInfo),
+    {ok, RegistryName} = dubbo_registry:setup_register(RegistryUrlInfo),
 
     ConsumerUrl = gen_consumer_url(UrlInfo),
-    %% 通知directory
-    dubbo_registry:register(RegistryName,ConsumerUrl),
+    dubbo_registry:register(RegistryName, ConsumerUrl),
+
+    dubbo_directory:subscribe(RegistryName, ConsumerUrl),
+
+    ok.
+
+export(Invoker, _Acc) -> %% the accumulator is not used by the registry protocol
+    {ok, UrlInfo} = dubbo_common_fun:parse_url(Invoker#invoker.url),
+    %% url = registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=hello-world&dubbo=2.0.2&export=dubbo%3A%2F%2F192.168.1.5%3A20880%2Forg.apache.dubbo.erlang.sample.service.facade.UserOperator%3Fanyhost%3Dtrue%26application%3Dhello-world%26bean.name%3Dorg.apache.dubbo.erlang.sample.service.facade.UserOperator%26bind.ip%3D192.168.1.5%26bind.port%3D20880%26default.deprecated%3Dfalse%26default.dynamic%3Dfalse%26default.register%3Dtrue%26deprecated%3Dfalse%26dubbo%3 [...]
+    ProtocolUrl = get_provider_url(UrlInfo),
+    {ok, InterfaceKey} = do_local_export(Invoker, ProtocolUrl),
+
+    RegistryUrlInfo = gen_registry_urlinfo(UrlInfo),
+    {ok, RegistryName} = dubbo_registry:setup_register(RegistryUrlInfo),
+    dubbo_registry:register(RegistryName, ProtocolUrl),
+
+    register_export_info(ProtocolUrl, RegistryName, InterfaceKey),
+    {ok, Invoker}.
+
+destroy() ->
+    List = ets:tab2list(?SERVICE_EXPORT_TABLE),
+    lists:foreach( %% run only for the unregister side effect; no result list is needed
+        fun(Item) ->
 
-    dubbo_directory:subscribe(RegistryName,ConsumerUrl),
+            {ProtocolUrl, RegistryModule, _} = Item,
+            io:format(user, "destroy url ~p~n", [ProtocolUrl]),
+            unexport(RegistryModule, ProtocolUrl)
+        end, List),
+    ok.
 
-    %% return
+unexport(RegistryModule, Url) ->
+    dubbo_registry:unregister(RegistryModule, Url),
     ok.
 
+do_local_export(Invoker, Url) ->
+    %% Url = dubbo://127.0.0.1:20880/org.apache.dubbo.erlang.sample.service.facade.UserOperator?anyhost=true&application=hello-world&bean.name=org.apache.dubbo.erlang.sample.service.facade.UserOperator&bind.ip=127.0.0.1&bind.port=20880&default.deprecated=false&default.dynamic=false&default.register=true&deprecated=false&dubbo=2.0.2&dynamic=false&generic=false&interface=org.apache.dubbo.erlang.sample.service.facade.UserOperator&methods=queryUserInfo,queryUserList,genUserId,getUserInfo&pid [...]
+    {ok, UrlInfo} = dubbo_common_fun:parse_url(Url),
+    Protocol = UrlInfo#dubbo_url.scheme,
+    ProtocolModule = binary_to_existing_atom(<<<<"dubbo_protocol_">>/binary, Protocol/binary>>, latin1),
+    _Result = apply(ProtocolModule, export, [Invoker#invoker{url = Url}, ok]),
+
+    InterfaceKey = maps:get(<<"interface">>, UrlInfo#dubbo_url.parameters),
+
+    {ok, InterfaceKey}.
+
+register_export_info(ProtocolUrl, RegistryModule, InterfaceKey) ->
+    ets:insert(?SERVICE_EXPORT_TABLE, {ProtocolUrl, RegistryModule, InterfaceKey}),
+    ok.
 
-gen_consumer_url(UrlInfo)->
+
+gen_consumer_url(UrlInfo) ->
     Parameters = UrlInfo#dubbo_url.parameters,
     #{<<"refer">> := Refer} = Parameters,
     Refer2 = http_uri:decode(Refer),
-    Parameters2 = dubbo_common_fun:parse_url(Refer2,#{}),
-    #{<<"interface">> := Interface} = Parameters2,
+    Parameters2 = dubbo_common_fun:parse_url_parameter(Refer2),
+    Parameters3 = Parameters2#{
+        ?CATEGORY_KEY => ?CONSUMERS_CATEGORY
+    },
+    #{<<"interface">> := Interface} = Parameters3,
     ConsumerUrlInfo = UrlInfo#dubbo_url{
         scheme = <<"consumer">>,
         host = dubbo_common_fun:local_ip_v4_str(),
         path = Interface,
-        parameters = Parameters2
+        parameters = Parameters3
     },
     ConsumerUrl = dubbo_common_fun:url_to_binary(ConsumerUrlInfo),
-    ConsumerUrl.
\ No newline at end of file
+    ConsumerUrl.
+get_provider_url(UrlInfo) ->
+    ExportUrl = maps:get(<<"export">>, UrlInfo#dubbo_url.parameters),
+    ExportUrl2 = http_uri:decode(ExportUrl),
+    {ok,ExportUrlInfo} = dubbo_common_fun:parse_url(ExportUrl2),
+    ParameterNew = maps:put(?CATEGORY_KEY,?PROVIDERS_CATEGORY,ExportUrlInfo#dubbo_url.parameters),
+    ExportUrlInfoNew = ExportUrlInfo#dubbo_url{parameters = ParameterNew},
+    logger:debug("registry gen provider url info ~p",[ExportUrlInfoNew]),
+    dubbo_common_fun:url_to_binary(ExportUrlInfoNew).
+
+gen_registry_urlinfo(UrlInfo) ->
+    Parameters = UrlInfo#dubbo_url.parameters,
+    UrlInfo#dubbo_url{
+        scheme = maps:get(<<"registry">>, Parameters, <<"zookeeper">>)
+    }.
\ No newline at end of file
diff --git a/src/dubbo_provider_consumer_reg_table.erl b/src/dubbo_provider_consumer_reg_table.erl
index 1c193b8..31c862b 100644
--- a/src/dubbo_provider_consumer_reg_table.erl
+++ b/src/dubbo_provider_consumer_reg_table.erl
@@ -29,15 +29,15 @@
     terminate/2,
     code_change/3]).
 
--export([update_consumer_connections/2,update_node_conections/2,get_interface_provider_node/1,get_host_connections/2, select_connection/1,
-    select_connection/2, update_connection_readonly/2, get_host_flag/1, get_host_flag/2,clean_invalid_provider/1]).
+-export([update_consumer_connections/2, update_node_conections/2, query_node_connections/1, get_interface_provider_node/1, get_host_connections/2, select_connection/1,
+    update_connection_readonly/2, get_host_flag/1, get_host_flag/2, clean_invalid_provider/1, update_interface_info/1, get_interface_info/1]).
 
 -include("dubbo.hrl").
 -define(SERVER, ?MODULE).
 
 -define(INTERFCE_LIST_TABLE, interface_list).
 
--define(INTERFAE_INFO_TABLE,dubbo_interface_info).
+-define(INTERFACE_INFO_TABLE, dubbo_interface_info).
 
 -define(PROVIDER_NODE_LIST_TABLE, provider_node_list).
 
@@ -99,12 +99,12 @@ init_ets_table() ->
         _Type1:Reason1 ->
             logger:error("new ets table  PROVIDER_NODE_LIST_TABLE error ~p", [Reason1])
     end,
-    try ets:new(?INTERFAE_INFO_TABLE, [public, named_table, {keypos, 2}]) of
-        ?INTERFAE_INFO_TABLE ->
+    try ets:new(?INTERFACE_INFO_TABLE, [public, named_table, {keypos, 2}]) of
+        ?INTERFACE_INFO_TABLE ->
             ok
     catch
-        _Type1:Reason1 ->
-            logger:error("new ets table  PROVIDER_NODE_LIST_TABLE error ~p", [Reason1])
+        _Type2:Reason2 ->
+            logger:error("new ets table  INTERFACE_INFO_TABLE error ~p", [Reason2])
     end,
     ok.
 %%--------------------------------------------------------------------
@@ -123,13 +123,13 @@ init_ets_table() ->
     {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
     {stop, Reason :: term(), NewState :: #state{}}).
 
-handle_call({add_consumer, Interface, ProviderNodeList}, _From, State) ->
-
-    OldProviderList = get_interface_provider_node(Interface),
-    NewProviderList = add_consumer(ProviderNodeList, []),
-    DeleteProverList = OldProviderList -- NewProviderList,
-    clean_invalid_provider(DeleteProverList),
-    {reply, ok, State};
+%%handle_call({add_consumer, Interface, ProviderNodeList}, _From, State) ->
+%%
+%%    OldProviderList = get_interface_provider_node(Interface),
+%%    NewProviderList = add_consumer(ProviderNodeList, []),
+%%    DeleteProverList = OldProviderList -- NewProviderList,
+%%    clean_invalid_provider(DeleteProverList),
+%%    {reply, ok, State};
 handle_call(_Request, _From, State) ->
     {reply, ok, State}.
 
@@ -204,10 +204,20 @@ get_host_connections(Host, Port) ->
     List = ets:lookup(?PROVIDER_NODE_LIST_TABLE, HostFlag),
     List.
 
-update_interface_info(InterfaceInfo)->
-    ets:insert(?INTERFAE_INFO_TABLE,InterfaceInfo).
 
 
+update_interface_info(InterfaceInfo) ->
+    ets:insert(?INTERFACE_INFO_TABLE, InterfaceInfo).
+
+
+get_interface_info(Interface) ->
+    case ets:lookup(?INTERFACE_INFO_TABLE, Interface) of
+        [] ->
+            undefined;
+        [Result] ->
+            Result
+    end.
+
 %%%===================================================================
 %%% Internal functions
 %%%===================================================================
@@ -248,32 +258,27 @@ update_interface_info(InterfaceInfo)->
 %%    ConnectionList.
 
 
-update_node_conections(HostFlag,Connections)->
+update_node_conections(_Interface, Connections) ->
     lists:map(
         fun(Item) ->
-            HostFlag= Item#connection_info.host_flag,
-            case ets:lookup_element(?PROVIDER_NODE_LIST_TABLE,#connection_info{host_flag = HostFlag,pid = Item#connection_info.pid,_="_"}) of
-                '$end_of_table' ->
+            HostFlag = Item#connection_info.host_flag,
+            case ets:match_object(?PROVIDER_NODE_LIST_TABLE, #connection_info{host_flag = HostFlag, pid = Item#connection_info.pid, _ = '_'}) of
+                [] -> %% no row for this host_flag/pid yet ('_' wildcards every other field)
                     I2 = ets:insert(?PROVIDER_NODE_LIST_TABLE, Item),
-                    logger:debug("insert PROVIDER_NODE_LIST_TABLE ~p info:~p", [HostFlag, I2]);
+                    logger:debug("update_node_conections insert one record ~p result:~p", [HostFlag, I2]);
                 _ ->
+                    logger:debug("update_node_conections hostflag ~p already exit ", [HostFlag]),
                     ok
             end
         end, Connections),
     ok.
 
+query_node_connections(Hostflag) ->
+    ets:lookup(?PROVIDER_NODE_LIST_TABLE, Hostflag).
+
 update_consumer_connections(Interface, Connections) ->
     lists:map(
         fun(Item) ->
-            HostFlag= Item#connection_info.host_flag,
-
-            case ets:lookup_element(?PROVIDER_NODE_LIST_TABLE,#connection_info{host_flag = HostFlag,pid = Item#connection_info.pid,_="_"}) of
-                '$end_of_table' ->
-                    I2 = ets:insert(?PROVIDER_NODE_LIST_TABLE, Item),
-                    logger:debug("insert PROVIDER_NODE_LIST_TABLE ~p info:~p", [HostFlag, I2]);
-                {_ObjectList,_Continuation} ->
-                    ok
-            end,
             I1 = ets:insert(?INTERFCE_LIST_TABLE, #interface_list{interface = Interface, pid = Item#connection_info.pid, connection_info = Item}),
             logger:debug("insert interface conection info ~p ~p ~p", [Interface, Item#connection_info.pid, I1]),
             ok
@@ -281,10 +286,10 @@ update_consumer_connections(Interface, Connections) ->
     ok.
 
 get_host_flag(ProviderConfig) ->
-    HostFlag = <<(list_to_binary(ProviderConfig#provider_config.host))/binary, <<"_">>/binary, (integer_to_binary(ProviderConfig#provider_config.port))/binary>>,
+    HostFlag = <<(ProviderConfig#provider_config.host)/binary, <<"_">>/binary, (integer_to_binary(ProviderConfig#provider_config.port))/binary>>,
     HostFlag.
 get_host_flag(Host, Port) ->
-    <<(list_to_binary(Host))/binary, <<"_">>/binary, (integer_to_binary(Port))/binary>>.
+    <<Host/binary, <<"_">>/binary, (integer_to_binary(Port))/binary>>.
 
 update_connection_info(Interface, HostFlag, ConnectionList, IsUpdateProvideNode) ->
     lists:map(fun(Item) ->
@@ -311,17 +316,12 @@ get_interface_provider_node(Interface) ->
     end.
 
 select_connection(Interface) ->
-    RandNum = rand:uniform(2048),
-    select_connection(Interface, RandNum).
-select_connection(Interface, RandNum) ->
     case ets:lookup(?INTERFCE_LIST_TABLE, Interface) of
         [] ->
             {error, none};
         List ->
-            Len = length(List),
-            RemNum = (RandNum rem Len) + 1,
-            InterfaceListItem = lists:nth(RemNum, List),
-            {ok, InterfaceListItem#interface_list.connection_info}
+            Ret = [Item#interface_list.connection_info || Item <- List],
+            {ok, Ret}
     end.
 
 -spec(update_connection_readonly(pid(), boolean()) -> ok).
diff --git a/src/dubbo_provider_protocol.erl b/src/dubbo_provider_protocol.erl
index 11434b6..5615916 100644
--- a/src/dubbo_provider_protocol.erl
+++ b/src/dubbo_provider_protocol.erl
@@ -23,7 +23,7 @@
 
 
 %% API
--export([start_link/4, register_impl_provider/3, select_impl_provider/1]).
+-export([start_link/4, register_impl_provider/2, select_impl_provider/1]).
 
 %% gen_server callbacks
 -export([init/1,
@@ -58,10 +58,11 @@ start_link(Ref, Socket, Transport, Opts) ->
 %% we can use the -behaviour(gen_server) attribute.
 %init([]) -> {ok, undefined}.
 
-init({Ref, Socket, Transport, _Opts = []}) ->
-    logger:info("provider connected"),
+init({Ref, Socket, Transport, _Opts}) ->
+    {ok, {IP, Port}} = inet:peername(Socket),
+    logger:info("consumer ~p:~p connect the server", [IP, Port]),
     ok = ranch:accept_ack(Ref),
-%%    ok = Transport:setopts(Socket, [{active, once}]),
+
     ok = Transport:setopts(Socket, [{active, true}, {packet, 0}]),
     gen_server:enter_loop(?MODULE, [],
         #state{socket = Socket, transport = Transport},
@@ -114,8 +115,8 @@ code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
 
 
-register_impl_provider(Interface, ImplModuleName, ModuleName) ->
-    ets:insert(?PROVIDER_IMPL_TABLE, {Interface, ImplModuleName, ModuleName}),
+register_impl_provider(Interface, ImplModuleName) ->
+    ets:insert(?PROVIDER_IMPL_TABLE, {Interface, ImplModuleName}),
     ok.
 
 -spec select_impl_provider(Interface :: binary()) -> {ok, binary()} | {error, term()}.
@@ -123,7 +124,7 @@ select_impl_provider(Interface) ->
     case ets:lookup(?PROVIDER_IMPL_TABLE, Interface) of
         [] ->
             {error, no_provider};
-        [{Interface, ImplModuleName, ModuleName}] ->
+        [{Interface, ImplModuleName}] ->
             {ok, ImplModuleName}
     end.
 
diff --git a/src/dubbo_reference_config.erl b/src/dubbo_reference_config.erl
index 7ab7f86..2465831 100644
--- a/src/dubbo_reference_config.erl
+++ b/src/dubbo_reference_config.erl
@@ -19,12 +19,12 @@
 -include("dubbo.hrl").
 -include("dubboerl.hrl").
 
--record(dubbo_interface_info,{}).
+-record(dubbo_interface_info, {}).
 
 %% API
 -export([init_reference/1]).
 
-init_reference(ConsumerInfo)->
+init_reference(ConsumerInfo) ->
 %%    InitConfigMap= #{
 %%
 %%    },
@@ -33,25 +33,23 @@ init_reference(ConsumerInfo)->
     ok.
 
 
-create_proxy(ConsumerInfo)->
-
-
+create_proxy(ConsumerInfo) ->
 
     Para = gen_parameter(ConsumerInfo),
     Url = gen_registry_url(Para),
-    dubbo_extension:run(protocol_wapper,refer,[Url]),
+    ok = dubbo_extension:run_fold(protocol_wapper, refer, [Url], ok),
     ok.
 
-    %%application=hello-world&dubbo=2.0.2&pid=68901&refer=application=hello-world&default.check=false&default.lazy=false&default.retries=0&default.sticky=false&default.timeout=300000&dubbo=2.0.2&interface=org.apache.dubbo.erlang.sample.service.facade.UserOperator&lazy=false&methods=queryUserInfo,queryUserList,genUserId,getUserInfo&pid=68901&register.ip=127.0.0.1&release=2.7.1&retries=0&side=consumer&sticky=false&timestamp=1559727789953&registry=zookeeper&release=2.7.1&timestamp=1559727842451
+%%application=hello-world&dubbo=2.0.2&pid=68901&refer=application=hello-world&default.check=false&default.lazy=false&default.retries=0&default.sticky=false&default.timeout=300000&dubbo=2.0.2&interface=org.apache.dubbo.erlang.sample.service.facade.UserOperator&lazy=false&methods=queryUserInfo,queryUserList,genUserId,getUserInfo&pid=68901&register.ip=127.0.0.1&release=2.7.1&retries=0&side=consumer&sticky=false&timestamp=1559727789953&registry=zookeeper&release=2.7.1&timestamp=1559727842451
 
 
-gen_registry_url(Para)->
+gen_registry_url(Para) ->
     %%todo 组装para & url
-    {Host,Port} = get_registry_host_port(),
+    {Host, Port} = dubbo_registry:get_registry_host_port(),
     UrlInfo = #dubbo_url{
         scheme = <<"registry">>,
         host = list_to_binary(Host),
-        port = integer_to_binary(Port),
+        port = Port,
         path = <<"org.apache.dubbo.registry.RegistryService">>,
         parameters = Para
     },
@@ -59,53 +57,45 @@ gen_registry_url(Para)->
 %%    Url = "registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=hello-world&dubbo=2.0.2&pid=68901&refer=application%3Dhello-world%26default.check%3Dfalse%26default.lazy%3Dfalse%26default.retries%3D0%26default.sticky%3Dfalse%26default.timeout%3D300000%26dubbo%3D2.0.2%26interface%3Dorg.apache.dubbo.erlang.sample.service.facade.UserOperator%26lazy%3Dfalse%26methods%3DqueryUserInfo%2CqueryUserList%2CgenUserId%2CgetUserInfo%26pid%3D68901%26register.ip%3D127.0.0.1% [...]
 %%    Url.
 
-get_registry_host_port()->
-    %% @todo need adapter other registry
-    RegistryList = application:get_env(dubboerl,zookeeper_list,[{"127.0.0.1",2181}]),
-    [Item|_] = RegistryList,
-    Item.
 
-gen_parameter(ConsumerInfo)->
+gen_parameter(ConsumerInfo) ->
     Para = #{
         <<"application">> => get_appname(ConsumerInfo),
         <<"dubbo">> => <<"2.0.2">>,
-        <<"pid">> => get_pid(),
+        <<"pid">> => list_to_binary(get_pid()),
         <<"refer">> => get_refinfo(ConsumerInfo),
-        <<"registry">> => get_registry_type(),
+        <<"registry">> => dubbo_registry:get_registry_type(),
         <<"release">> => <<"2.7.1">>,
         <<"timestamp">> => integer_to_binary(dubbo_time_util:timestamp_ms())
     },
-
     Para.
 
-get_appname(ConsumerInfo)->
+get_appname(ConsumerInfo) ->
     ConsumerInfo#consumer_config.application.
-get_pid()->
+get_pid() ->
     os:getpid().
-get_refinfo(ConsumerInfo)->
-    KeyValues=[
-        {"application",ConsumerInfo#consumer_config.application},
-        {"default.check",ConsumerInfo#consumer_config.check},
-        {"default.lazy","false"},
-        {"default.retries","0"},
-        {"default.sticky","false"},
-        {"default.timeout","300000"},
-        {"dubbo","2.0.2"},
-        {"interface",ConsumerInfo#consumer_config.interface},
-        {"lazy","false"},
-        {"methods",ConsumerInfo#consumer_config.methods},
-        {"register.ip",ConsumerInfo#consumer_config.application},
-        {"release","2.7.1"},
-        {"pid",get_pid()},
-        {"side","consumer"},
-        {"sticky","false"},
-        {"timestamp",dubbo_time_util:timestamp_ms()}
+get_refinfo(ConsumerInfo) ->
+    KeyValues = [
+        {"application", ConsumerInfo#consumer_config.application},
+        {"default.check", atom_to_list(ConsumerInfo#consumer_config.check)},
+        {"default.lazy", "false"},
+        {"default.retries", "0"},
+        {"default.sticky", "false"},
+        {"default.timeout", "300000"},
+        {"dubbo", "2.0.2"},
+        {"interface", ConsumerInfo#consumer_config.interface},
+        {"lazy", "false"},
+        {"methods", string:join(ConsumerInfo#consumer_config.methods, ",")},
+        {"register.ip", ConsumerInfo#consumer_config.application},
+        {"release", "2.7.1"},
+        {"pid", get_pid()},
+        {"side", "consumer"},
+        {"sticky", "false"},
+        {"timestamp", integer_to_list(dubbo_time_util:timestamp_ms())}
     ],
-    KeyValues2 = [io_lib:format("~s=~p", [Key, Value]) || {Key, Value} <= KeyValues],
+    KeyValues2 = [io_lib:format("~s=~s", [Key, Value]) || {Key, Value} <- KeyValues],
     ParameterStr1 = string:join(KeyValues2, "&"),
     list_to_binary(http_uri:encode(ParameterStr1)).
 %%    <<"application%3Dhello-world%26default.check%3Dfalse%26default.lazy%3Dfalse%26default.retries%3D0%26default.sticky%3Dfalse%26default.timeout%3D300000%26dubbo%3D2.0.2%26interface%3Dorg.apache.dubbo.erlang.sample.service.facade.UserOperator%26lazy%3Dfalse%26methods%3DqueryUserInfo%2CqueryUserList%2CgenUserId%2CgetUserInfo%26pid%3D68901%26register.ip%3D127..0.1%26release%3D2.7.1%26retries%3D0%26side%3Dconsumer%26sticky%3Dfalse%26timestamp%3D1559727789953">>.
 
-get_registry_type()->
-    %%todo
-    atom_to_binary(application:get_env(dubboerl,registry,zookeeper)).
\ No newline at end of file
+
diff --git a/src/dubbo_registry.erl b/src/dubbo_registry.erl
index 89eb64b..01af02c 100644
--- a/src/dubbo_registry.erl
+++ b/src/dubbo_registry.erl
@@ -18,13 +18,13 @@
 -include("dubboerl.hrl").
 
 -callback start(Url :: binary) -> ok.
--callback register(Url::binary())-> term().
--callback subscribe(SubcribeUrl::binary(),NotifyFun::function())->ok.
+-callback register(Url :: binary()) -> term().
+-callback subscribe(SubcribeUrl :: binary(), NotifyFun :: function()) -> ok.
 
 %% API
--export([setup_register/1,register/2]).
+-export([setup_register/1, register/2, unregister/2, get_registry_host_port/0, get_registry_type/0, get_registry_module/1]).
 
--spec(setup_register(UrlInfo :: map()) -> {ok, RegistryProcessName :: atom()}|{error, term()}).
+-spec(setup_register(UrlInfo :: #dubbo_url{}) -> {ok, RegistryProcessName :: atom()}|{error, term()}).
 setup_register(UrlInfo) ->
     RegistryModuleName = get_registry_module(UrlInfo),
     case whereis(RegistryModuleName) of
@@ -35,13 +35,28 @@ setup_register(UrlInfo) ->
             {ok, RegistryModuleName}
     end.
 
-register(RegistryName,Url) ->
-    logger:info("call ~p register url ~p",[RegistryName,Url]),
-    Result = apply(RegistryName,register,[Url]),
+register(RegistryName, Url) ->
+    logger:info("call ~p register url ~p", [RegistryName, Url]),
+    Result = apply(RegistryName, register, [Url]),
+    Result.
+unregister(RegistryName, Url) ->
+    logger:info("call ~p unregister url ~p", [RegistryName, Url]),
+    Result = apply(RegistryName, unregister, [Url]),
     Result.
-
 
 get_registry_module(Info) ->
     RegistryName = Info#dubbo_url.scheme,
-    FullName = << <<"dubbo_registry_">>, RegistryName/binary>>,
-    binary_to_existing_atom(FullName).
\ No newline at end of file
+    FullName = <<<<"dubbo_registry_">>/binary, RegistryName/binary>>,
+    binary_to_existing_atom(FullName, latin1).
+
+
+
+get_registry_host_port() ->
+    %% @todo adapt this to registries other than zookeeper (only zookeeper_list is read)
+    RegistryList = application:get_env(dubboerl, zookeeper_list, [{"127.0.0.1", 2181}]),
+    [Item | _] = RegistryList,
+    Item.
+
+get_registry_type() ->
+    %%todo
+    atom_to_binary(application:get_env(dubboerl, registry, zookeeper), utf8).
\ No newline at end of file
diff --git a/src/dubbo_registry_zookeeper.erl b/src/dubbo_registry_zookeeper.erl
index 5d14588..915bb71 100644
--- a/src/dubbo_registry_zookeeper.erl
+++ b/src/dubbo_registry_zookeeper.erl
@@ -21,9 +21,9 @@
 -include("dubbo.hrl").
 -include("dubboerl.hrl").
 %% API
--export([start_link/0, register_consumer/1, register_consumer/2, gen_consumer_node_info/1, register_provider/1, provider_watcher/1]).
+-export([start_link/0, register_provider/1, provider_watcher/1]).
 
--export([start/1,register/1,subscribe/2]).
+-export([start/1, register/1, unregister/1, subscribe/2]).
 %% gen_server callbacks
 -export([init/1,
     handle_call/3,
@@ -34,7 +34,7 @@
 
 -define(SERVER, ?MODULE).
 
--record(state, {zk_pid,notify_fun}).
+-record(state, {zk_pid, provider_notify_fun}).
 
 %%%===================================================================
 %%% API
@@ -88,16 +88,17 @@ init([]) ->
     {noreply, NewState :: #state{}, timeout() | hibernate} |
     {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
     {stop, Reason :: term(), NewState :: #state{}}).
-
-handle_call({add_consumer, Interface,ConsumerUrl}, _From, State) ->
-    add_consumer(Interface,ConsumerUrl, State),
+handle_call({do_register, Url}, _From, State) ->
+    do_register(State#state.zk_pid, Url),
     {reply, ok, State};
-handle_call({add_provider, Provider}, _From, State) ->
-    register_provider_path(Provider, State),
+handle_call({do_unregister, Url}, _From, State) ->
+    do_unregister(State#state.zk_pid, Url),
     {reply, ok, State};
-handle_call({subscribe_provider,InterfaceName,NotifyFun}, _From, #state{zk_pid = ZkPid} = State) ->
-    NewState=State#state{notify_fun = NotifyFun},
-    get_provider_list(InterfaceName,ZkPid,NotifyFun),
+handle_call({subscribe_provider, InterfaceName, NotifyFun}, _From, #state{zk_pid = ZkPid} = State) ->
+    logger:debug("subscribe provider ~p notify fun ~p",[InterfaceName,NotifyFun]),
+    NewState = State#state{provider_notify_fun = NotifyFun},
+    List = get_provider_list(InterfaceName, ZkPid),
+    notify_provider_change(NotifyFun, InterfaceName, List),
     {reply, ok, NewState};
 
 handle_call(_Request, _From, State) ->
@@ -114,8 +115,9 @@ handle_call(_Request, _From, State) ->
     {noreply, NewState :: #state{}} |
     {noreply, NewState :: #state{}, timeout() | hibernate} |
     {stop, Reason :: term(), NewState :: #state{}}).
-handle_cast({provider_node_change, Interface, Path}, #state{zk_pid = Pid} = State) ->
-    get_provider_and_start(Pid, Interface, Path),
+handle_cast({provider_node_change, Interface, Path}, #state{zk_pid = Pid, provider_notify_fun = NotifyFun} = State) ->
+    ProviderList = get_provider_and_start(Pid, Interface, Path),
+    notify_provider_change(NotifyFun, Interface, ProviderList),
     {noreply, State};
 handle_cast(_Request, State) ->
     {noreply, State}.
@@ -135,7 +137,6 @@ handle_cast(_Request, State) ->
     {noreply, NewState :: #state{}, timeout() | hibernate} |
     {stop, Reason :: term(), NewState :: #state{}}).
 handle_info(_Info, State) ->
-    logger:info("zk server recv msg:~p", [_Info]),
     {noreply, State}.
 
 %%--------------------------------------------------------------------
@@ -169,44 +170,84 @@ code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
 
 
-
 %%----------------------------------------------
 %% dubbo_registry
 %%----------------------------------------------
 start(Url) ->
     ok.
-register(Url)->
-    {ok,UrlInfo} = dubbo_common_fun:parse_url(Url),
-    InterfaceName = maps:get(<<"interface">>,UrlInfo#dubbo_url.parameters),
-    register(UrlInfo#dubbo_url.scheme,InterfaceName,Url),
+%%register(Url) ->
+%%    {ok, UrlInfo} = dubbo_common_fun:parse_url(Url),
+%%    InterfaceName = maps:get(<<"interface">>, UrlInfo#dubbo_url.parameters),
+%%    register(UrlInfo#dubbo_url.scheme, InterfaceName, Url),
+%%    ok.
+
+%%register(<<"consumer">>, InterfaceName, Url) ->
+%%    gen_server:call(?SERVER, {add_consumer, InterfaceName, Url}),
+%%    ok.
+
+register(Url) ->
+    gen_server:call(?SERVER, {do_register, Url}, 10000),
     ok.
 
-register(<<"consumer">>,InterfaceName,Url)->
-    gen_server:call(?SERVER, {add_consumer,InterfaceName, Url}),
-    ok;
-register(<<"provider">>,InterfaceName,Url)->
-
+unregister(Url) ->
+    gen_server:call(?SERVER, {do_unregister, Url}, 10000),
     ok.
 
-subscribe(SubcribeUrl,NotifyFun)->
-    {ok,UrlInfo} = dubbo_common_fun:parse_url(SubcribeUrl),
-    InterfaceName = maps:get(<<"interface">>,UrlInfo#dubbo_url.parameters),
-    try gen_server:call(?SERVER,{subscribe_provider,InterfaceName,NotifyFun},5000) of
-        ok->
+do_register(Pid, Url) ->
+    case dubbo_common_fun:parse_url(Url) of
+        {ok, UrlInfo} ->
+            CreateNodeList = [{get_register_node(Item, UrlInfo), p} || Item <- [root, service, category]],
+            UrlNode = {list_to_binary(edoc_lib:escape_uri(binary_to_list(Url))), get_dynamic(UrlInfo)},
+            CreateNodeList2 = CreateNodeList ++ [UrlNode],
+            RetFullNode = check_and_create_path(Pid, <<"">>, CreateNodeList2),
+            {ok, RetFullNode};
+        Reason ->
+            logger:error("zk parse url fail reason ~p", [Reason]),
+            {error, Reason}
+    end.
+do_unregister(Pid, Url) ->
+    case dubbo_common_fun:parse_url(Url) of
+        {ok, UrlInfo} ->
+            CreateNodeList = [get_register_node(Item, UrlInfo) || Item <- [root, service, category]],
+            UrlNode = list_to_binary(edoc_lib:escape_uri(binary_to_list(Url))),
+            CreateNodeList2 = CreateNodeList ++ [UrlNode],
+            Path = dubbo_common_fun:binary_list_join(CreateNodeList2, <<"/">>),
+            FullPath = <<<<"/">>/binary, Path/binary>>,
+            del_path(Pid, FullPath);
+        Reason ->
+            logger:error("zk parse url fail reason ~p", [Reason]),
+            {error, Reason}
+    end.
+
+get_dynamic(UrlInfo) ->
+    case maps:get(<<"dynamic">>, UrlInfo#dubbo_url.parameters, <<"true">>) of
+        <<"true">> ->
+            e;
+        _ ->
+            p
+    end.
+
+get_register_node(root, _UrlInfo) ->
+    <<"dubbo">>;
+get_register_node(service, UrlInfo) ->
+    maps:get(<<"interface">>, UrlInfo#dubbo_url.parameters);
+get_register_node(category, UrlInfo) ->
+    maps:get(<<"category">>, UrlInfo#dubbo_url.parameters, <<"providers">>).
+
+
+subscribe(SubcribeUrl, NotifyFun) ->
+    {ok, UrlInfo} = dubbo_common_fun:parse_url(SubcribeUrl),
+    InterfaceName = maps:get(<<"interface">>, UrlInfo#dubbo_url.parameters),
+    try gen_server:call(?SERVER, {subscribe_provider, InterfaceName, NotifyFun}, 5000) of
+        ok ->
             ok
     catch
-        Error:Reason->
+        _Error:Reason ->
             %%todo improve error type
-            {error,Reason}
+            {error, Reason}
     end.
 
-register_consumer(Consumer) ->
-    gen_server:call(?SERVER, {add_consumer, Consumer}),
-    ok.
-register_consumer(Name, Option) ->
-    Consumer = #consumer_config{interface = Name, methods = [<<"testa">>, <<"testb">>]},
-    register_consumer(Consumer),
-    ok.
+
 register_provider(Provider) ->
     gen_server:call(?SERVER, {add_provider, Provider}),
     ok.
@@ -222,9 +263,8 @@ connection() ->
         {monitor, self()}]),
     {ok, Pid}.
 
-add_consumer(InterfaceName,ConsumerUrl, State) ->
+add_consumer(InterfaceName, ConsumerUrl, State) ->
     Pid = State#state.zk_pid,
-%%    ConsumerNode = gen_consumer_node_info(Consumer),
     ConsumerNode2 = list_to_binary(edoc_lib:escape_uri(binary_to_list(ConsumerUrl))),
     check_and_create_path(Pid, <<"">>, [{<<"dubbo">>, p}, {InterfaceName, p}, {<<"consumers">>, p}, {ConsumerNode2, e}]),
     %% todo
@@ -237,19 +277,21 @@ register_provider_path(Provider, State) ->
     ok.
 
 
-get_provider_list(InterfaceName,ZkPid,NotifyFun) ->
+get_provider_list(InterfaceName, ZkPid) ->
     InterfacePath = <<<<"/dubbo/">>/binary, InterfaceName/binary, <<"/providers">>/binary>>,
-    ChildList= get_provider_and_start(ZkPid, InterfaceName, InterfacePath),
-    NotifyFun(InterfaceName,ChildList),
-    ok.
+    ChildList = get_provider_and_start(ZkPid, InterfaceName, InterfacePath),
+    ChildList.
 get_provider_and_start(Pid, Interface, Path) ->
     case erlzk:get_children(Pid, Path, spawn(dubbo_registry_zookeeper, provider_watcher, [Interface])) of
         {ok, ChildList} ->
             logger:debug("get provider list ~p", [ChildList]),
-%%            start_provider_process(Interface, ChildList),
             ChildList;
+        {error, no_node} ->
+            logger:warning("interface ~p provide zk node unexist", [Interface]),
+            check_and_create_path(Pid, <<"">>, [{<<"dubbo">>, p}, {Interface, p}, {<<"providers">>, p}]),
+            get_provider_and_start(Pid, Interface, Path);
         {error, R1} ->
-            logger:debug("[add_consumer] get_provider_list error ~p ~p", [R1]),
+            logger:debug("[add_consumer] get_provider_list error ~p", [R1]),
             []
     end.
 
@@ -259,56 +301,61 @@ provider_watcher(Interface) ->
             gen_server:cast(?SERVER, {provider_node_change, Interface, Path}),
             logger:debug("provider_watcher get event ~p ~p", [node_children_changed, Path]);
         {Event, Path} ->
-%%            Path = "/a",
-%%            Event = node_created
             logger:debug("provider_watcher get event ~p ~p", [Event, Path])
     end,
     ok.
 
+notify_provider_change(Fun, Interface, []) ->
+    UrlInfo = #dubbo_url{
+        scheme = <<"empty">>,
+        host = <<"127.0.0.1">>,
+        path = Interface,
+        port = 80,
+        parameters = #{
+            <<"interface">> => Interface
+        }
+    },
+    UrlInfoBin = dubbo_common_fun:url_to_binary(UrlInfo),
+    logger:debug("notify provider change fun ~p", [Fun]),
+    Fun(Interface, [UrlInfoBin]),
+    ok;
+notify_provider_change(Fun, Interface, List) ->
+    List2 = [http_uri:decode(Item) || Item <- List],
+    logger:debug("notify provider change fun ~p", [Fun]),
+    Fun(Interface, List2),
+    ok.
+
+del_path(Pid, Path) ->
+    case erlzk:delete(Pid, Path) of
+        ok ->
+            ok;
+        {error, Reason} ->
+            logger:warning("zookeeper registry del path error ~p path ~p", [Reason, Path]),
+            {error, Reason}
+    end.
 
 create_path(Pid, Path, CreateType) ->
     case erlzk:create(Pid, Path, CreateType) of
         {ok, ActualPath} ->
-            logger:debug("[add_consumer] create zk path  success ~p", [ActualPath]),
+            logger:debug("create zk path  success ~p", [ActualPath]),
             ok;
         {error, R1} ->
-            logger:debug("[add_consumer] create zk path error ~p ~p", [Path, R1])
+            logger:debug("create zk path error ~p ~p", [Path, R1])
     end,
     ok.
-check_and_create_path(_Pid, _RootPath, []) ->
-    ok;
+check_and_create_path(_Pid, RootPath, []) ->
+    RootPath;
 check_and_create_path(Pid, RootPath, [{Item, CreateType} | Rst]) ->
     CheckPath = <<RootPath/binary, <<"/">>/binary, Item/binary>>,
+
     case erlzk:exists(Pid, CheckPath) of
         {ok, Stat} ->
             check_and_create_path(Pid, CheckPath, Rst);
         {error, no_node} ->
-            logger:debug("[add_consumer] check_and_create_path unexist no_node ~p", [CheckPath]),
+            logger:debug("check_and_create_path unexist no_node ~p", [CheckPath]),
             create_path(Pid, CheckPath, CreateType),
             check_and_create_path(Pid, CheckPath, Rst);
         {error, R1} ->
-            logger:debug("[add_consumer] check_and_create_path unexist ~p", [R1]),
+            logger:debug("check_and_create_path unexist ~p", [R1]),
             check_and_create_path(Pid, CheckPath, Rst)
-    end.
-
-gen_consumer_node_info(Consumer) ->
-    %% revision参数字段的作用是什么? 暂时不添加
-    Methods = dubbo_lists_util:join(Consumer#consumer_config.methods, <<",">>),
-    Value = io_lib:format(<<"consumer://~s/~s?application=~s&category=~s&check=~p&default.timeout=~p&dubbo=~s&interface=~s&methods=~s&side=~s&timestamp=~p">>,
-        [dubbo_common_fun:local_ip_v4_str(),
-            Consumer#consumer_config.interface,
-            Consumer#consumer_config.application,
-            Consumer#consumer_config.category,
-            Consumer#consumer_config.check,
-            Consumer#consumer_config.default_timeout,
-            Consumer#consumer_config.dubbo_version,
-            Consumer#consumer_config.interface,
-            Methods,
-            Consumer#consumer_config.side,
-            dubbo_time_util:timestamp_ms()
-        ]),
-    list_to_binary(Value).
-
-%%dubbo_zookeeper:register_consumer(<<"com.ifcoder.abcd">>,[]).
-start_provider_process(Interface, ProviderList) ->
-    dubbo_provider_consumer_reg_table:start_consumer(Interface, ProviderList).
\ No newline at end of file
+    end.
\ No newline at end of file
diff --git a/src/dubbo_service_config.erl b/src/dubbo_service_config.erl
new file mode 100644
index 0000000..2f595ba
--- /dev/null
+++ b/src/dubbo_service_config.erl
@@ -0,0 +1,110 @@
+%%------------------------------------------------------------------------------
+%% Licensed to the Apache Software Foundation (ASF) under one or more
+%% contributor license agreements.  See the NOTICE file distributed with
+%% this work for additional information regarding copyright ownership.
+%% The ASF licenses this file to You under the Apache License, Version 2.0
+%% (the "License"); you may not use this file except in compliance with
+%% the License.  You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%------------------------------------------------------------------------------
+-module(dubbo_service_config).
+
+-include("dubbo.hrl").
+-include("dubboerl.hrl").
+%% API
+-export([export/1]).
+
+-spec(export(#provider_config{}) -> ok).
+export(ProviderInfo) ->
+    logger:debug("will export provider info ~p", [ProviderInfo]),
+    do_export(ProviderInfo),
+    ok.
+
+do_export(ProviderInfo) ->
+    do_export_protocol(ProviderInfo),
+    ok.
+
+do_export_protocol(ProviderInfo) ->
+    Url = get_registry_url(ProviderInfo),
+    logger:debug("do export protocol for url ~p", [Url]),
+    Invoker = #invoker{url = Url, handler = ProviderInfo#provider_config.impl_handle},
+    dubbo_extension:run_fold(protocol_wapper, export, [Invoker], ok),
+    ok.
+
+
+
+
+get_registry_url(ProviderInfo) ->
+    {Host, Port} = dubbo_registry:get_registry_host_port(),
+    UrlInfo = #dubbo_url{
+        scheme = <<"registry">>,
+        host = list_to_binary(Host),
+        port = Port,
+        path = <<"org.apache.dubbo.registry.RegistryService">>,
+        parameters = gen_registry_parameter(ProviderInfo)
+    },
+    dubbo_common_fun:url_to_binary(UrlInfo).
+
+gen_registry_parameter(ProviderInfo) ->
+    Para = #{
+        <<"application">> => ProviderInfo#provider_config.application,
+        <<"dubbo">> => <<"2.0.2">>,
+        <<"pid">> => list_to_binary(os:getpid()),
+        <<"export">> => get_export_info(ProviderInfo),
+        <<"registry">> => dubbo_registry:get_registry_type(),
+        <<"release">> => <<"2.7.1">>,
+        <<"timestamp">> => integer_to_binary(dubbo_time_util:timestamp_ms())
+    },
+    Para.
+
+get_export_info(ProviderInfo) ->
+    %%dubbo://127.0.0.1:20880/org.apache.dubbo.erlang.sample.service.facade.UserOperator?
+    %% anyhost=true&
+    %% application=hello-world&
+    %% bean.name=org.apache.dubbo.erlang.sample.service.facade.UserOperator&
+    %% bind.ip=127.0.0.1&bind.port=20880&default.deprecated=false&
+    %% default.dynamic=false&default.register=true&deprecated=false&dubbo=2.0.2&
+    %% dynamic=false&generic=false&
+    %% interface=org.apache.dubbo.erlang.sample.service.facade.UserOperator&
+    %% methods=queryUserInfo,queryUserList,genUserId,getUserInfo&pid=90956&register=true&release=2.7.1&side=provider&timestamp=1562725983984
+    Para = [
+        {"anyhost", "true"},
+        {"application", ProviderInfo#provider_config.application},
+        {"bean.name", ProviderInfo#provider_config.interface},
+        {"bind.ip", dubbo_common_fun:local_ip_v4_str()},
+        {"bind.port", integer_to_list(ProviderInfo#provider_config.port)},
+        {"default.deprecated", "false"},
+        {"default.dynamic", "false"},
+        {"default.register", "true"},
+        {"deprecated", "false"},
+        {"dynamic", "false"},
+        {"generic", "false"},
+        {"interface", ProviderInfo#provider_config.interface},
+        {"methods", format_methods_str(ProviderInfo#provider_config.methods)},
+        {"pid", os:getpid()},
+        {"register", "true"},
+        {"release", "2.7.1"},
+        {"side", "provider"},
+        {"dubbo", "2.0.2"},
+        {"timestamp", integer_to_list(dubbo_time_util:timestamp_ms())}
+    ],
+    UrlInfo = #dubbo_url{
+        scheme = ProviderInfo#provider_config.protocol,
+        host = dubbo_common_fun:local_ip_v4_str(),
+        port = ProviderInfo#provider_config.port,
+        path = ProviderInfo#provider_config.interface,
+        parameters = Para
+    },
+    Url = dubbo_common_fun:url_to_binary(UrlInfo),
+    list_to_binary(http_uri:encode(binary_to_list(Url))).
+
+format_methods_str(Methods) ->
+    Methods2 = [binary_to_list(Item) || Item <- Methods],
+    string:join(Methods2, ",").
\ No newline at end of file
diff --git a/src/dubbo_zookeeper.erl b/src/dubbo_shutdown.erl
similarity index 50%
rename from src/dubbo_zookeeper.erl
rename to src/dubbo_shutdown.erl
index 84a95d6..856bd41 100644
--- a/src/dubbo_zookeeper.erl
+++ b/src/dubbo_shutdown.erl
@@ -14,12 +14,12 @@
 %% See the License for the specific language governing permissions and
 %% limitations under the License.
 %%------------------------------------------------------------------------------
--module(dubbo_zookeeper).
+-module(dubbo_shutdown).
+
 -behaviour(gen_server).
 
--include("dubbo.hrl").
 %% API
--export([start_link/0, register_consumer/1, register_consumer/2, gen_consumer_node_info/1, register_provider/1, provider_watcher/1]).
+-export([start_link/0]).
 
 %% gen_server callbacks
 -export([init/1,
@@ -31,7 +31,7 @@
 
 -define(SERVER, ?MODULE).
 
--record(state, {zk_pid}).
+-record(state, {}).
 
 %%%===================================================================
 %%% API
@@ -67,8 +67,9 @@ start_link() ->
     {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
     {stop, Reason :: term()} | ignore).
 init([]) ->
-    {ok, Pid} = connection(),
-    {ok, #state{zk_pid = Pid}}.
+    process_flag(trap_exit, true),
+    io:format("start shutdown ser"),
+    {ok, #state{}}.
 
 %%--------------------------------------------------------------------
 %% @private
@@ -85,13 +86,6 @@ init([]) ->
     {noreply, NewState :: #state{}, timeout() | hibernate} |
     {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
     {stop, Reason :: term(), NewState :: #state{}}).
-
-handle_call({add_consumer, Consumer}, _From, State) ->
-    add_consumer(Consumer, State),
-    {reply, ok, State};
-handle_call({add_provider, Provider}, _From, State) ->
-    register_provider_path(Provider, State),
-    {reply, ok, State};
 handle_call(_Request, _From, State) ->
     {reply, ok, State}.
 
@@ -106,9 +100,6 @@ handle_call(_Request, _From, State) ->
     {noreply, NewState :: #state{}} |
     {noreply, NewState :: #state{}, timeout() | hibernate} |
     {stop, Reason :: term(), NewState :: #state{}}).
-handle_cast({provider_node_change, Interface, Path}, #state{zk_pid = Pid} = State) ->
-    get_provider_and_start(Pid, Interface, Path),
-    {noreply, State};
 handle_cast(_Request, State) ->
     {noreply, State}.
 
@@ -127,7 +118,6 @@ handle_cast(_Request, State) ->
     {noreply, NewState :: #state{}, timeout() | hibernate} |
     {stop, Reason :: term(), NewState :: #state{}}).
 handle_info(_Info, State) ->
-    logger:info("zk server recv msg:~p", [_Info]),
     {noreply, State}.
 
 %%--------------------------------------------------------------------
@@ -144,6 +134,7 @@ handle_info(_Info, State) ->
 -spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
     State :: #state{}) -> term()).
 terminate(_Reason, _State) ->
+    destroy(),
     ok.
 
 %%--------------------------------------------------------------------
@@ -160,117 +151,10 @@ terminate(_Reason, _State) ->
 code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
 
-
-register_consumer(Consumer) ->
-    gen_server:call(?SERVER, {add_consumer, Consumer}),
-    ok.
-register_consumer(Name, Option) ->
-    Consumer = #consumer_config{interface = Name, methods = [<<"testa">>, <<"testb">>]},
-    register_consumer(Consumer),
-    ok.
-register_provider(Provider) ->
-    gen_server:call(?SERVER, {add_provider, Provider}),
-    ok.
-
 %%%===================================================================
 %%% Internal functions
 %%%===================================================================
-
-connection() ->
-    {ok, List} = application:get_env(dubboerl, zookeeper_list),
-    {ok, Pid} = erlzk:connect(List, 30000, [
-        {chroot, "/"},
-        {monitor, self()}]),
-    {ok, Pid}.
-
-add_consumer(Consumer, State) ->
-    Pid = State#state.zk_pid,
-%%    InterfacePath= << <<"/dubbo/">>/binary,Name/binary ,<<"consumers">>/binary >>,
-    ConsumerNode = gen_consumer_node_info(Consumer),
-    ConsumerNode2 = list_to_binary(edoc_lib:escape_uri(binary_to_list(ConsumerNode))),
-    check_and_create_path(Pid, <<"">>, [{<<"dubbo">>, p}, {Consumer#consumer_config.interface, p}, {<<"consumers">>, p}, {ConsumerNode2, e}]),
-    get_provider_list(Consumer, State),
-    ok.
-register_provider_path(Provider, State) ->
-    #state{zk_pid = Pid} = State,
-    ProviderNode = dubbo_node_config_util:gen_provider_info(Provider),
-    check_and_create_path(Pid, <<"">>, [{<<"dubbo">>, p}, {Provider#provider_config.interface, p}, {<<"providers">>, p}, {ProviderNode, e}]),
-    ok.
-
-
-get_provider_list(Consumer, State) ->
-    Pid = State#state.zk_pid,
-    InterfacePath = <<<<"/dubbo/">>/binary, (Consumer#consumer_config.interface)/binary, <<"/providers">>/binary>>,
-    get_provider_and_start(Pid, Consumer#consumer_config.interface, InterfacePath),
-    ok.
-get_provider_and_start(Pid, Interface, Path) ->
-    case erlzk:get_children(Pid, Path, spawn(dubbo_zookeeper, provider_watcher, [Interface])) of
-        {ok, ChildList} ->
-            logger:debug("get provider list ~p", [ChildList]),
-            start_provider_process(Interface, ChildList),
-            ok;
-        {error, R1} ->
-            logger:debug("[add_consumer] get_provider_list error ~p ~p", [R1]),
-            ok
-    end.
-
-provider_watcher(Interface) ->
-    receive
-        {node_children_changed, Path} ->
-            gen_server:cast(?SERVER, {provider_node_change, Interface, Path}),
-            logger:debug("provider_watcher get event ~p ~p", [node_children_changed, Path]);
-        {Event, Path} ->
-%%            Path = "/a",
-%%            Event = node_created
-            logger:debug("provider_watcher get event ~p ~p", [Event, Path])
-    end,
-    ok.
-
-
-create_path(Pid, Path, CreateType) ->
-    case erlzk:create(Pid, Path, CreateType) of
-        {ok, ActualPath} ->
-            logger:debug("[add_consumer] create zk path  success ~p", [ActualPath]),
-            ok;
-        {error, R1} ->
-            logger:debug("[add_consumer] create zk path error ~p ~p", [Path, R1])
-    end,
-    ok.
-check_and_create_path(_Pid, _RootPath, []) ->
-    ok;
-check_and_create_path(Pid, RootPath, [{Item, CreateType} | Rst]) ->
-    CheckPath = <<RootPath/binary, <<"/">>/binary, Item/binary>>,
-    case erlzk:exists(Pid, CheckPath) of
-        {ok, Stat} ->
-            check_and_create_path(Pid, CheckPath, Rst);
-        {error, no_node} ->
-            logger:debug("[add_consumer] check_and_create_path unexist no_node ~p", [CheckPath]),
-            create_path(Pid, CheckPath, CreateType),
-            check_and_create_path(Pid, CheckPath, Rst);
-        {error, R1} ->
-            logger:debug("[add_consumer] check_and_create_path unexist ~p", [R1]),
-            check_and_create_path(Pid, CheckPath, Rst)
-    end.
-
-gen_consumer_node_info(Consumer) ->
-    %% revision参数字段的作用是什么? 暂时不添加
-    Methods = dubbo_lists_util:join(Consumer#consumer_config.methods, <<",">>),
-    Value = io_lib:format(<<"consumer://~s/~s?application=~s&category=~s&check=~p&default.timeout=~p&dubbo=~s&interface=~s&methods=~s&side=~s&timestamp=~p">>,
-        [dubbo_common_fun:local_ip_v4_str(),
-            Consumer#consumer_config.interface,
-            Consumer#consumer_config.application,
-            Consumer#consumer_config.category,
-            Consumer#consumer_config.check,
-            Consumer#consumer_config.default_timeout,
-            Consumer#consumer_config.dubbo_version,
-            Consumer#consumer_config.interface,
-            Methods,
-            Consumer#consumer_config.side,
-            dubbo_time_util:timestamp_ms()
-        ]),
-    list_to_binary(Value).
-
-%%dubbo_zookeeper:register_consumer(<<"com.ifcoder.abcd">>,[]).
-start_provider_process(Interface, ProviderList) ->
-    dubbo_provider_consumer_reg_table:start_consumer(Interface, ProviderList).
-
+destroy() ->
+    logger:info("dubbo hook shutdown event"),
+    dubbo_extension:run(protocol_wapper, destroy, []),
+    ok.
\ No newline at end of file
diff --git a/src/dubbo_traffic_control.erl b/src/dubbo_traffic_control.erl
index 0a75aa7..853ce82 100644
--- a/src/dubbo_traffic_control.erl
+++ b/src/dubbo_traffic_control.erl
@@ -1,11 +1,19 @@
-%%%-------------------------------------------------------------------
-%%% @author dlive
-%%% @copyright (C) 2018, <COMPANY>
-%%% @doc
-%%%
-%%% @end
-%%% Created : 22. May 2018 1:58 PM
-%%%-------------------------------------------------------------------
+%%------------------------------------------------------------------------------
+%% Licensed to the Apache Software Foundation (ASF) under one or more
+%% contributor license agreements.  See the NOTICE file distributed with
+%% this work for additional information regarding copyright ownership.
+%% The ASF licenses this file to You under the Apache License, Version 2.0
+%% (the "License"); you may not use this file except in compliance with
+%% the License.  You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%------------------------------------------------------------------------------
 -module(dubbo_traffic_control).
 -include("dubboerl.hrl").
 %% API
diff --git a/src/dubbo_type_transfer.erl b/src/dubbo_type_transfer.erl
index 827c3d2..3f1af96 100644
--- a/src/dubbo_type_transfer.erl
+++ b/src/dubbo_type_transfer.erl
@@ -1,11 +1,19 @@
-%%%-------------------------------------------------------------------
-%%% @author dlive
-%%% @copyright (C) 2016, <COMPANY>
-%%% @doc
-%%%
-%%% @end
-%%% Created : 27. 十月 2016 下午8:28
-%%%-------------------------------------------------------------------
+%%------------------------------------------------------------------------------
+%% Licensed to the Apache Software Foundation (ASF) under one or more
+%% contributor license agreements.  See the NOTICE file distributed with
+%% this work for additional information regarding copyright ownership.
+%% The ASF licenses this file to You under the Apache License, Version 2.0
+%% (the "License"); you may not use this file except in compliance with
+%% the License.  You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%------------------------------------------------------------------------------
 -module(dubbo_type_transfer).
 -include("hessian.hrl").
 -include("dubbo.hrl").
@@ -42,7 +50,6 @@ java_to_native(#list{values = ForeignData} = Data, State) ->
     ForeignDataNew = [java_to_native(ValueItem, State) || ValueItem <- ForeignData],
     ForeignDataNew;
 java_to_native(Data, _) ->
-    logger:debug("java_to_native unkonw type ~p", [Data]),
     Data.
 
 get_deftype(ForeignType) ->
diff --git a/src/dubboerl.erl b/src/dubboerl.erl
index 03cc8a0..e3b71e4 100644
--- a/src/dubboerl.erl
+++ b/src/dubboerl.erl
@@ -33,7 +33,6 @@ start_consumer() ->
     ApplicationName = application:get_env(dubboerl, application, <<"defaultApplication">>),
     lists:map(fun({Interface, Option}) ->
         ConsumerInfo = dubbo_config_util:gen_consumer(ApplicationName, Interface, Option),
-%%        dubbo_zookeeper:register_consumer(ConsumerInfo),
         dubbo_reference_config:init_reference(ConsumerInfo),
         logger:info("consumer refer success ~p", [Interface])
               end, ConsumerList),
@@ -43,21 +42,15 @@ start_provider() ->
     ProviderList = application:get_env(dubboerl, provider, []),
     ApplicationName = application:get_env(dubboerl, application, <<"defaultApplication">>),
     DubboServerPort = application:get_env(dubboerl, port, ?DUBBO_DEFAULT_PORT),
-    start_provider_listen(DubboServerPort),
-    lists:map(fun({ImplModuleName, BehaviourModuleName, Interface, Option}) ->
-        ok = dubbo_provider_protocol:register_impl_provider(Interface, ImplModuleName, BehaviourModuleName),
-        MethodList = apply(BehaviourModuleName, get_method_999_list, []),
-        ProviderInfo = dubbo_config_util:gen_provider(ApplicationName, DubboServerPort, Interface, MethodList, Option),
-        dubbo_zookeeper:register_provider(ProviderInfo),
-        logger:info("register provider success ~p ~p", [ImplModuleName, Interface])
-              end, ProviderList),
-    ok.
 
-start_provider_listen(Port) ->
-    {ok, _} = ranch:start_listener(tcp_reverse,
-        ranch_tcp, [{port, Port}], dubbo_provider_protocol, []),
+    lists:map(
+        fun({ImplModuleName, BehaviourModuleName, Interface, Option}) ->
+            MethodList = apply(BehaviourModuleName, get_method_999_list, []),
+            ProviderInfo = dubbo_config_util:gen_provider(ApplicationName, DubboServerPort, Interface, MethodList, ImplModuleName, Option),
+            dubbo_service_config:export(ProviderInfo),
+            logger:info("register provider success ~p ~p", [ImplModuleName, Interface])
+        end, ProviderList),
     ok.
 
 
 
-
diff --git a/src/dubboerl_app.erl b/src/dubboerl_app.erl
index 0f51768..d27b690 100644
--- a/src/dubboerl_app.erl
+++ b/src/dubboerl_app.erl
@@ -29,9 +29,9 @@
 start(_StartType, _StartArgs) ->
     logger:info("[START] dubbo framework server start"),
     case dubboerl_sup:start_link() of
-        {ok,Pid} ->
+        {ok, Pid} ->
             init_default_hooks(),
-            {ok,Pid};
+            {ok, Pid};
         Result ->
             Result
     end.
@@ -43,13 +43,14 @@ stop(_State) ->
 %%====================================================================
 %% Internal functions
 %%====================================================================
-init_default_hooks()->
-    dubbo_extension:register(protocol,dubbo_protocol_dubbo,10),
-    dubbo_extension:register(protocol_wapper,dubbo_protocol_registry,10),
-
+init_default_hooks() ->
+    dubbo_extension:register(protocol, dubbo_protocol_dubbo, 10),
+    dubbo_extension:register(protocol_wapper, dubbo_protocol_registry, 10),
+    dubbo_extension:register(filter, application:get_env(dubboerl, cluster, dubbo_cluster_failfast), 1),
     ok.
 env_init() ->
     ets:new(?PROVIDER_IMPL_TABLE, [public, named_table]),
+    ets:new(?SERVICE_EXPORT_TABLE, [public, named_table]),
     dubbo_traffic_control:init(),
     dubbo_type_register:init(),
     register_type_list().
diff --git a/src/dubboerl_sup.erl b/src/dubboerl_sup.erl
index a0d2fb0..a7b2bac 100644
--- a/src/dubboerl_sup.erl
+++ b/src/dubboerl_sup.erl
@@ -41,20 +41,23 @@ start_link() ->
 %% Child :: {Id,StartFunc,Restart,Shutdown,Type,Modules}
 init([]) ->
     dubboerl_app:env_init(),
-    ZK = {dubbo_zookeeper, {dubbo_zookeeper, start_link, []}, transient, 5000, worker, [dubbo_zookeeper]},
-%%    NettySer = {dubbo_netty_client,{dubbo_netty_client, start_link, []},transient,5000,worker,[dubbo_netty_client]},
+    %% @todo the registry needs to be moved under a registry supervisor
+    ZK = {dubbo_registry_zookeeper, {dubbo_registry_zookeeper, start_link, []}, transient, 5000, worker, [dubbo_registry_zookeeper]},
+
+    ExtensionSer = {dubbo_extension, {dubbo_extension, start_link, []}, transient, 5000, worker, [dubbo_extension]},
     Id_count = {dubbo_id_generator, {dubbo_id_generator, start_link, []}, transient, 5000, worker, [dubbo_id_generator]},
     ProviderPoolSup = {dubbo_provider_worker_sup, {dubbo_provider_worker_sup, start_link, []}, transient, 5000, supervisor, [dubbo_provider_worker_sup]},
     ConsumerPoolSup = {dubbo_transport_pool_sup, {dubbo_transport_pool_sup, start_link, []}, transient, 5000, supervisor, [dubbo_transport_pool_sup]},
     ConsumerPool = {dubbo_provider_consumer_reg_table, {dubbo_provider_consumer_reg_table, start_link, []}, transient, 5000, worker, [dubbo_provider_consumer_reg_table]},
-    ListNew1 =
-        case application:get_env(dubboerl, registry, false) of
-            true ->
-                [ZK];
-            false ->
-                []
-        end,
-    ListNew = ListNew1 ++ [Id_count, ConsumerPool, ConsumerPoolSup, ProviderPoolSup],
+    ShutdownSer = {dubbo_shutdown, {dubbo_shutdown, start_link, []}, transient, 10000, worker, [dubbo_shutdown]},
+%%    ListNew1 =
+%%        case application:get_env(dubboerl, registry, false) of
+%%            true ->
+%%                [ZK];
+%%            false ->
+%%                []
+%%        end,
+    ListNew = [Id_count, ExtensionSer, ZK, ConsumerPool, ConsumerPoolSup, ProviderPoolSup, ShutdownSer],
     {ok, {{one_for_one, 60, 10}, ListNew}}.
 
 %%====================================================================
diff --git a/test/consumer_SUITE.erl b/test/consumer_SUITE.erl
index ade6032..ef1fe19 100644
--- a/test/consumer_SUITE.erl
+++ b/test/consumer_SUITE.erl
@@ -141,14 +141,14 @@ lib_type_register(_Config) ->
     ok.
 
 json_sync_invoker(_Config) ->
-    application:set_env(dubboerl, protocol, json),
+    application:set_env(dubboerl, serialization, json),
     R1 = userOperator:queryUserInfo(#userInfoRequest{username = "name", requestId = "111"}, #{sync=> true}),
     io:format(user, "json_sync_invoker result ~p ~n", [R1]),
     R2 = userOperator:genUserId(),
     io:format(user, "json_sync_invoker result2 ~p ~n", [R2]),
     ok.
 hessian_sync_invoker(_Config) ->
-    application:set_env(dubboerl, protocol, hessian),
+    application:set_env(dubboerl, serialization, hessian),
     R1 = userOperator:queryUserInfo(#userInfoRequest{username = "name", requestId = "111"}, #{sync=> true}),
     io:format(user, "json_sync_invoker result ~p ~n", [R1]),
     R2 = userOperator:genUserId(),
diff --git a/test/dubbo_config_parser_tests.erl b/test/dubbo_config_parser_tests.erl
index 9ff0be4..060d4f5 100644
--- a/test/dubbo_config_parser_tests.erl
+++ b/test/dubbo_config_parser_tests.erl
@@ -19,7 +19,7 @@
 -include("dubbo.hrl").
 
 gen_provice_config_test() ->
-    ProviderConfigInfo = dubbo_config_util:gen_provider(<<"defaultApp">>, 20880, <<"org.apache.dubbo.test.interface">>, [method1], []),
+    ProviderConfigInfo = dubbo_config_util:gen_provider(<<"defaultApp">>, 20880, <<"org.apache.dubbo.test.interface">>, [method1],dubbo_service_user_impl, []),
     ProvideNode = dubbo_node_config_util:gen_provider_info(ProviderConfigInfo),
     ?assert(is_binary(ProvideNode)).
 
diff --git a/test/dubbo_consumer_pool_tests.erl b/test/dubbo_consumer_pool_tests.erl
index 0a67f1a..1ed5a6a 100644
--- a/test/dubbo_consumer_pool_tests.erl
+++ b/test/dubbo_consumer_pool_tests.erl
@@ -15,7 +15,6 @@
 %% limitations under the License.
 %%------------------------------------------------------------------------------
 -module(dubbo_consumer_pool_tests).
--author("dlive").
 
 -include_lib("eunit/include/eunit.hrl").
 -include("dubbo.hrl").
@@ -25,8 +24,8 @@ update_readonly_test() ->
     InterfaceName= <<"testinterfacename">>,
     HostFalg= <<"127.0.0.1/20880">>,
     ConnectionList = [
-        #connection_info{connection_id=1,pid= testpid,weight = 30,host_flag = HostFalg},
-        #connection_info{connection_id=2,pid= testpid2,weight = 30,host_flag = HostFalg}
+        #connection_info{pid= testpid,weight = 30,host_flag = HostFalg},
+        #connection_info{pid= testpid2,weight = 30,host_flag = HostFalg}
     ],
     dubbo_provider_consumer_reg_table:update_connection_info(InterfaceName,HostFalg,ConnectionList,true),
     {ok,Size} = dubbo_provider_consumer_reg_table:update_connection_readonly(testpid,false),
diff --git a/test/consumer_SUITE.erl b/test/dubbo_service_config_SUITE.erl
similarity index 62%
copy from test/consumer_SUITE.erl
copy to test/dubbo_service_config_SUITE.erl
index ade6032..41d8399 100644
--- a/test/consumer_SUITE.erl
+++ b/test/dubbo_service_config_SUITE.erl
@@ -14,46 +14,31 @@
 %% See the License for the specific language governing permissions and
 %% limitations under the License.
 %%------------------------------------------------------------------------------
--module(consumer_SUITE).
-%% API
--export([]).
+-module(dubbo_service_config_SUITE).
 
 -compile(export_all).
 
--include_lib("common_test/include/ct.hrl").
--include("dubbo_sample_service.hrl").
-%%--------------------------------------------------------------------
-%% Function: suite() -> Info
-%% Info = [tuple()]
-%%--------------------------------------------------------------------
 suite() ->
     [{timetrap, {seconds, 60}}].
 
-%%--------------------------------------------------------------------
-%% Function: init_per_suite(Config0) ->
-%%               Config1 | {skip,Reason} | {skip_and_save,Reason,Config1}
-%% Config0 = Config1 = [tuple()]
-%% Reason = term()
-%%--------------------------------------------------------------------
+
+
 init_per_suite(Config) ->
     logger:add_handler(testttt, logger_std_h, #{
-        level => debug
+        level => all
     }),
     Start = application:ensure_all_started(dubboerl),
-%%  dubboerl:init(),
-    dubboerl:start_provider(),
-    timer:sleep(2000),
-    dubboerl:start_consumer(),
-    dubbo_sample_service_app:register_type_list(),
-    timer:sleep(5000),
-    io:format(user, "test case start info ~p~n", [Start]),
-    [{appid, 1}].
+%%    dubboerl:init(),
+    io:format(user, "test case start dubboerl info ~p~n", [Start]),
+    [].
 
 %%--------------------------------------------------------------------
 %% Function: end_per_suite(Config0) -> term() | {save_config,Config1}
 %% Config0 = Config1 = [tuple()]
 %%--------------------------------------------------------------------
 end_per_suite(_Config) ->
+    application:stop(dubboerl),
+    timer:sleep(1000),
     ok.
 
 %%--------------------------------------------------------------------
@@ -109,7 +94,7 @@ end_per_testcase(_TestCase, _Config) ->
 %%--------------------------------------------------------------------
 groups() ->
     [
-        {consumer1, [sequence], [lib_type_register, json_sync_invoker, hessian_sync_invoker]}
+        {service_test, [sequence], [export_interface]}
     ].
 
 %%--------------------------------------------------------------------
@@ -120,37 +105,11 @@ groups() ->
 %% Reason = term()
 %%--------------------------------------------------------------------
 all() ->
-    [{group, consumer1}].
+    [{group, service_test}].
 
-%%--------------------------------------------------------------------
-%% Function: TestCase() -> Info
-%% Info = [tuple()]
-%%--------------------------------------------------------------------
-lib_type_register() ->
-    [].
-
-%%--------------------------------------------------------------------
-%% Function: TestCase(Config0) ->
-%%               ok | exit() | {skip,Reason} | {comment,Comment} |
-%%               {save_config,Config1} | {skip_and_save,Reason,Config1}
-%% Config0 = Config1 = [tuple()]
-%% Reason = term()
-%% Comment = term()
-%%--------------------------------------------------------------------
-lib_type_register(_Config) ->
-    ok.
 
-json_sync_invoker(_Config) ->
-    application:set_env(dubboerl, protocol, json),
-    R1 = userOperator:queryUserInfo(#userInfoRequest{username = "name", requestId = "111"}, #{sync=> true}),
-    io:format(user, "json_sync_invoker result ~p ~n", [R1]),
-    R2 = userOperator:genUserId(),
-    io:format(user, "json_sync_invoker result2 ~p ~n", [R2]),
-    ok.
-hessian_sync_invoker(_Config) ->
-    application:set_env(dubboerl, protocol, hessian),
-    R1 = userOperator:queryUserInfo(#userInfoRequest{username = "name", requestId = "111"}, #{sync=> true}),
-    io:format(user, "json_sync_invoker result ~p ~n", [R1]),
-    R2 = userOperator:genUserId(),
-    io:format(user, "json_sync_invoker result2 ~p ~n", [R2]),
+export_interface(_Config) ->
+    MethodList = apply(userOperator, get_method_999_list, []),
+    ProviderInfo = dubbo_config_util:gen_provider(<<"test-application">>, 20880, <<"org.apache.dubbo.erlang.sample.service.facade.UserOperator">>, MethodList, dubbo_service_user_impl, []),
+    dubbo_service_config:export(ProviderInfo),
     ok.
\ No newline at end of file
diff --git a/test/dubbo_zookeeper_tests.erl b/test/dubbo_zookeeper_tests.erl
deleted file mode 100644
index 499feb6..0000000
--- a/test/dubbo_zookeeper_tests.erl
+++ /dev/null
@@ -1,25 +0,0 @@
-%%------------------------------------------------------------------------------
-%% Licensed to the Apache Software Foundation (ASF) under one or more
-%% contributor license agreements.  See the NOTICE file distributed with
-%% this work for additional information regarding copyright ownership.
-%% The ASF licenses this file to You under the Apache License, Version 2.0
-%% (the "License"); you may not use this file except in compliance with
-%% the License.  You may obtain a copy of the License at
-%%
-%%     http://www.apache.org/licenses/LICENSE-2.0
-%%
-%% Unless required by applicable law or agreed to in writing, software
-%% distributed under the License is distributed on an "AS IS" BASIS,
-%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-%% See the License for the specific language governing permissions and
-%% limitations under the License.
-%%------------------------------------------------------------------------------
--module(dubbo_zookeeper_tests).
--include_lib("eunit/include/eunit.hrl").
--include("dubbo.hrl").
-exist_test() ->
-    Consumer = #consumer_config{interface = <<"com.ifcoder.demo.facade.User">>,
-        methods = [<<"a">>, <<"b">>]},
-    V = dubbo_zookeeper:gen_consumer_node_info(Consumer),
-    ?debugFmt("consumer info ~p", [V]),
-    ?assert(is_binary(V)).
diff --git a/test/consumer_SUITE.erl b/test/reference_config_SUITE.erl
similarity index 73%
copy from test/consumer_SUITE.erl
copy to test/reference_config_SUITE.erl
index ade6032..4005311 100644
--- a/test/consumer_SUITE.erl
+++ b/test/reference_config_SUITE.erl
@@ -14,10 +14,9 @@
 %% See the License for the specific language governing permissions and
 %% limitations under the License.
 %%------------------------------------------------------------------------------
--module(consumer_SUITE).
-%% API
--export([]).
+-module(reference_config_SUITE).
 
+%% API
 -compile(export_all).
 
 -include_lib("common_test/include/ct.hrl").
@@ -40,12 +39,7 @@ init_per_suite(Config) ->
         level => debug
     }),
     Start = application:ensure_all_started(dubboerl),
-%%  dubboerl:init(),
-    dubboerl:start_provider(),
-    timer:sleep(2000),
-    dubboerl:start_consumer(),
-    dubbo_sample_service_app:register_type_list(),
-    timer:sleep(5000),
+%%    dubboerl:init(),
     io:format(user, "test case start info ~p~n", [Start]),
     [{appid, 1}].
 
@@ -109,7 +103,7 @@ end_per_testcase(_TestCase, _Config) ->
 %%--------------------------------------------------------------------
 groups() ->
     [
-        {consumer1, [sequence], [lib_type_register, json_sync_invoker, hessian_sync_invoker]}
+        {reference_test, [sequence], [registry_interface]}
     ].
 
 %%--------------------------------------------------------------------
@@ -120,37 +114,11 @@ groups() ->
 %% Reason = term()
 %%--------------------------------------------------------------------
 all() ->
-    [{group, consumer1}].
-
-%%--------------------------------------------------------------------
-%% Function: TestCase() -> Info
-%% Info = [tuple()]
-%%--------------------------------------------------------------------
-lib_type_register() ->
-    [].
+    [{group, reference_test}].
 
-%%--------------------------------------------------------------------
-%% Function: TestCase(Config0) ->
-%%               ok | exit() | {skip,Reason} | {comment,Comment} |
-%%               {save_config,Config1} | {skip_and_save,Reason,Config1}
-%% Config0 = Config1 = [tuple()]
-%% Reason = term()
-%% Comment = term()
-%%--------------------------------------------------------------------
-lib_type_register(_Config) ->
-    ok.
 
-json_sync_invoker(_Config) ->
-    application:set_env(dubboerl, protocol, json),
-    R1 = userOperator:queryUserInfo(#userInfoRequest{username = "name", requestId = "111"}, #{sync=> true}),
-    io:format(user, "json_sync_invoker result ~p ~n", [R1]),
-    R2 = userOperator:genUserId(),
-    io:format(user, "json_sync_invoker result2 ~p ~n", [R2]),
-    ok.
-hessian_sync_invoker(_Config) ->
-    application:set_env(dubboerl, protocol, hessian),
-    R1 = userOperator:queryUserInfo(#userInfoRequest{username = "name", requestId = "111"}, #{sync=> true}),
-    io:format(user, "json_sync_invoker result ~p ~n", [R1]),
-    R2 = userOperator:genUserId(),
-    io:format(user, "json_sync_invoker result2 ~p ~n", [R2]),
+registry_interface(_Config) ->
+    ConsumerInfo = dubbo_config_util:gen_consumer("test_app", <<"org.apache.dubbo.erlang.sample.service.facade.UserOperator">>, []),
+%%        dubbo_zookeeper:register_consumer(ConsumerInfo),
+    dubbo_reference_config:init_reference(ConsumerInfo),
     ok.
\ No newline at end of file
diff --git a/test/userOperator.erl b/test/userOperator.erl
index e8567ae..0d35917 100644
--- a/test/userOperator.erl
+++ b/test/userOperator.erl
@@ -67,7 +67,7 @@ getUserInfo(Arg0, RequestOption)->
         ]
     },
     Request = dubbo_adapter:reference(Data),
-    dubbo_invoker_old:invoke_request(?CURRENT_CLASS_NAME,Request,RequestOption).
+    dubbo_invoker:invoke_request(?CURRENT_CLASS_NAME,Request,RequestOption).
 
 
 -spec genUserId()->
@@ -96,7 +96,7 @@ genUserId( RequestOption)->
         ]
     },
     Request = dubbo_adapter:reference(Data),
-    dubbo_invoker_old:invoke_request(?CURRENT_CLASS_NAME,Request,RequestOption).
+    dubbo_invoker:invoke_request(?CURRENT_CLASS_NAME,Request,RequestOption).
 
 
 -spec queryUserInfo(Arg0::#userInfoRequest{})->
@@ -127,7 +127,7 @@ queryUserInfo(Arg0, RequestOption)->
         ]
     },
     Request = dubbo_adapter:reference(Data),
-    dubbo_invoker_old:invoke_request(?CURRENT_CLASS_NAME,Request,RequestOption).
+    dubbo_invoker:invoke_request(?CURRENT_CLASS_NAME,Request,RequestOption).
 
 
 -spec queryUserList(Arg0::list())->
@@ -158,7 +158,7 @@ queryUserList(Arg0, RequestOption)->
         ]
     },
     Request = dubbo_adapter:reference(Data),
-    dubbo_invoker_old:invoke_request(?CURRENT_CLASS_NAME,Request,RequestOption).
+    dubbo_invoker:invoke_request(?CURRENT_CLASS_NAME,Request,RequestOption).
 
 test() ->
     queryUserInfo(#userInfoRequest{username = "name", requestId = "111"}, #{sync=> true}).
\ No newline at end of file


[dubbo-erlang] 07/09: redesign: invoker process

Posted by dl...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dlive pushed a commit to branch 0.4.0
in repository https://gitbox.apache.org/repos/asf/dubbo-erlang.git

commit a5a0cde2469b17b3fc2e710de8c395ac97b8d2c1
Author: DLive <xs...@163.com>
AuthorDate: Wed Jun 26 18:05:16 2019 +0800

    redesign: invoker process
---
 src/dubbo_invoker.erl | 83 ++++++++++++++++++++++++++++++++++++++++++++++++++-
 src/dubboerl_app.erl  | 16 ++++++++--
 2 files changed, 95 insertions(+), 4 deletions(-)

diff --git a/src/dubbo_invoker.erl b/src/dubbo_invoker.erl
index 0a3527b..f01cec4 100644
--- a/src/dubbo_invoker.erl
+++ b/src/dubbo_invoker.erl
@@ -16,8 +16,89 @@
 %%------------------------------------------------------------------------------
 -module(dubbo_invoker).
 
+-include("dubbo.hrl").
 %% API
 -export([]).
 
 
--callback(invoke(Invoker,Invocation) -> ok).
\ No newline at end of file
+-callback(invoke(Invoker,Invocation) -> ok).
+
+
+%% API
+-export([invoke_request/2, invoke_request/3, invoke_request/5]).
+
+-spec invoke_request(Interface :: binary(), Request :: #dubbo_request{}) ->
+    {ok, reference()}|
+    {ok, reference(), Data :: any(), RpcContent :: list()}|
+    {error, Reason :: timeout|no_provider|any()}.
+invoke_request(Interface, Request) ->
+    invoke_request(Interface, Request, [], #{}, self()).
+
+-spec invoke_request(Interface :: binary(), Request :: #dubbo_request{}, RequestOption :: map()) ->
+    {ok, reference()}|
+    {ok, reference(), Data :: any(), RpcContent :: list()}|
+    {error, Reason :: timeout|no_provider|any()}.
+invoke_request(Interface, Request, RequestOption) ->
+    invoke_request(Interface, Request, maps:get(ctx, RequestOption, []), RequestOption, self()).
+
+
+-spec invoke_request(Interface :: binary(), Request :: #dubbo_request{}, RpcContext :: list(), RequestState :: map(), CallBackPid :: pid()) ->
+    {ok, reference()}|
+    {ok, reference(), Data :: any(), RpcContent :: list()}|
+    {error, Reason :: timeout|no_provider|request_full|any()}.
+invoke_request(Interface, Request, RpcContext, RequestState, CallBackPid) ->
+    case dubbo_provider_consumer_reg_table:select_connection(Interface, Request#dubbo_request.mid) of
+        {ok, #connection_info{pid = Pid, host_flag = HostFlag}} ->
+            case dubbo_traffic_control:check_goon(HostFlag, 199) of
+                ok ->
+                    Request2 = merge_attachments(Request, RpcContext),
+                    {ok, RequestData} = dubbo_codec:encode_request(Request2),
+                    Ref = get_ref(RequestState),
+                    gen_server:cast(Pid, {send_request, Ref, Request2, RequestData, CallBackPid, RequestState}),
+                    case is_sync(RequestState) of
+                        true ->
+                            sync_receive(Ref, get_timeout(RequestState));
+                        false -> {ok, Ref}
+                    end;
+                full ->
+                    {error, request_full}
+            end;
+        {error, none} ->
+            logger:error("[INVOKE] ~p error Reason no_provider", [Interface]),
+            {error, no_provider}
+    end.
+
+
+is_sync(Option) ->
+    maps:is_key(sync, Option).
+get_ref(Option) ->
+    maps:get(ref, Option, make_ref()).
+
+get_timeout(Option) ->
+    maps:get(timeout, Option, ?REQUEST_TIME_OUT).
+
+
+sync_receive(Ref, TimeOut) ->
+    receive
+        {'$gen_cast', {response_process, Ref, RpcContent, Response}} ->
+            {ok, Ref, Response, RpcContent}
+    after
+        TimeOut ->
+            {error, timeout}
+    end.
+merge_attachments(#dubbo_request{data = null} = Request, _Option) ->
+    Request;
+merge_attachments(Request, Option) ->
+    Attachements = Request#dubbo_request.data#dubbo_rpc_invocation.attachments,
+    case lists:keyfind(attachments, 1, Option) of
+        false -> OptionAttachments = [];
+        {attachments, OptionAttachments} ->
+            OptionAttachments
+    end,
+    List = [
+        {<<"version">>, <<"0.0.0">>},
+        {<<"timeout">>, <<"5000">>}
+    ],
+    Attachements2 = lists:merge3(Attachements, OptionAttachments, List),
+    Data2 = Request#dubbo_request.data#dubbo_rpc_invocation{attachments = Attachements2},
+    Request#dubbo_request{data = Data2}.
diff --git a/src/dubboerl_app.erl b/src/dubboerl_app.erl
index 223e842..0f51768 100644
--- a/src/dubboerl_app.erl
+++ b/src/dubboerl_app.erl
@@ -27,9 +27,14 @@
 %%====================================================================
 
 start(_StartType, _StartArgs) ->
-    io:format("[START] dubbo framework server start~n"),
-%%    env_init(),
-    dubboerl_sup:start_link().
+    logger:info("[START] dubbo framework server start"),
+    case dubboerl_sup:start_link() of
+        {ok,Pid} ->
+            init_default_hooks(),
+            {ok,Pid};
+        Result ->
+            Result
+    end.
 
 %%--------------------------------------------------------------------
 stop(_State) ->
@@ -38,6 +43,11 @@ stop(_State) ->
 %%====================================================================
 %% Internal functions
 %%====================================================================
+init_default_hooks()->
+    dubbo_extension:register(protocol,dubbo_protocol_dubbo,10),
+    dubbo_extension:register(protocol_wapper,dubbo_protocol_registry,10),
+
+    ok.
 env_init() ->
     ets:new(?PROVIDER_IMPL_TABLE, [public, named_table]),
     dubbo_traffic_control:init(),


[dubbo-erlang] 04/09: dev client pool

Posted by dl...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dlive pushed a commit to branch 0.4.0
in repository https://gitbox.apache.org/repos/asf/dubbo-erlang.git

commit 1f64b4b0ca14f3178e1b590b1dd6611cf92f81ab
Author: DLive <xs...@163.com>
AuthorDate: Fri Jun 21 10:28:25 2019 +0800

    dev client pool
---
 include/dubbo.hrl                                  |   2 +-
 ...o_netty_client.erl => dubbo_client_default.erl} |  21 +-
 src/dubbo_consumer_pool.erl                        | 305 --------------------
 .../dubbo_exchanger.erl                            |  35 ++-
 src/dubbo_invoker_old.erl                          |   2 +-
 src/dubbo_netty_client.erl                         |   2 +-
 src/dubbo_protocol_dubbo.erl                       |  54 +++-
 src/dubbo_provider_consumer_reg_table.erl          | 320 ++++++++++++++++++++-
 src/dubbo_registry_zookeeper.erl                   |   2 +-
 ...r_pool_sup.erl => dubbo_transport_pool_sup.erl} |  13 +-
 src/dubbo_zookeeper.erl                            |   2 +-
 src/dubboerl_sup.erl                               |   4 +-
 test/dubbo_consumer_pool_tests.erl                 |   6 +-
 13 files changed, 415 insertions(+), 353 deletions(-)

diff --git a/include/dubbo.hrl b/include/dubbo.hrl
index ad2277a..727c71e 100644
--- a/include/dubbo.hrl
+++ b/include/dubbo.hrl
@@ -102,7 +102,7 @@
 
 
 -record(interface_list, {interface, pid, connection_info}).
--record(provider_node_list, {host_flag, connection_info}).
+-record(provider_node_list, {host_flag, pid, weight, readonly = false}).
 -record(connection_info, {connection_id, pid, weight, host_flag, readonly = false}).
 
 -type dubbo_request() :: #dubbo_request{}.
diff --git a/src/dubbo_netty_client.erl b/src/dubbo_client_default.erl
similarity index 97%
copy from src/dubbo_netty_client.erl
copy to src/dubbo_client_default.erl
index 0181d33..220c30f 100644
--- a/src/dubbo_netty_client.erl
+++ b/src/dubbo_client_default.erl
@@ -14,13 +14,12 @@
 %% See the License for the specific language governing permissions and
 %% limitations under the License.
 %%------------------------------------------------------------------------------
--module(dubbo_netty_client).
+-module(dubbo_client_default).
 
 -behaviour(gen_server).
 
+
 -include("dubbo.hrl").
-%% API
--export([start_link/4]).
 
 %% gen_server callbacks
 -export([init/1,
@@ -29,6 +28,8 @@
     handle_info/2,
     terminate/2,
     code_change/3]).
+-export([start_link/1]).
+
 -export([check_recv_data/2]).
 
 -define(SERVER, ?MODULE).
@@ -38,7 +39,8 @@
     heartbeat = #heartbeat{},
     recv_buffer = <<>>,         %%从服务端接收的数据
     host_flag,
-    reconnection_timer
+    reconnection_timer,
+    handler
 }).
 
 %%%===================================================================
@@ -51,10 +53,10 @@
 %%
 %% @end
 %%--------------------------------------------------------------------
--spec(start_link(Name :: binary(), HostFlag :: binary(), ProviderConfig :: #provider_config{}, integer()) ->
+-spec(start_link(Name :: binary(), ProviderConfig :: #provider_config{}) ->
     {ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
-start_link(Name, HostFlag, ProviderConfig, Index) ->
-    gen_server:start_link({local, Name}, ?MODULE, [HostFlag, ProviderConfig, Index], []).
+start_link(ProviderConfig) ->
+    gen_server:start_link(?MODULE, [ProviderConfig], []).
 
 %%%===================================================================
 %%% gen_server callbacks
@@ -74,8 +76,7 @@ start_link(Name, HostFlag, ProviderConfig, Index) ->
 -spec(init(Args :: term()) ->
     {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
     {stop, Reason :: term()} | ignore).
-init([HostFlag, ProviderConfig, Index]) ->
-    erlang:process_flag(min_bin_vheap_size, 1024 * 1024),
+init([HostFlag, ProviderConfig]) ->
     #provider_config{host = Host, port = Port} = ProviderConfig,
     State = case open(Host, Port) of
                 {ok, Socket} ->
@@ -415,7 +416,7 @@ process_response(true, _ResponseInfo, _RestData, State) ->
     {ok, State}.
 
 process_request(true, #dubbo_request{data = <<"R">>}, State) ->
-    {ok, _} = dubbo_consumer_pool:update_connection_readonly(self(), true),
+    {ok, _} = dubbo_provider_consumer_reg_table:update_connection_readonly(self(), true),
     {ok, State};
 process_request(true, Request, State) ->
     {ok, NewState} = send_heartbeat_msg(Request#dubbo_request.mid, false, State),
diff --git a/src/dubbo_consumer_pool.erl b/src/dubbo_consumer_pool.erl
deleted file mode 100644
index 0a01d38..0000000
--- a/src/dubbo_consumer_pool.erl
+++ /dev/null
@@ -1,305 +0,0 @@
-%%------------------------------------------------------------------------------
-%% Licensed to the Apache Software Foundation (ASF) under one or more
-%% contributor license agreements.  See the NOTICE file distributed with
-%% this work for additional information regarding copyright ownership.
-%% The ASF licenses this file to You under the Apache License, Version 2.0
-%% (the "License"); you may not use this file except in compliance with
-%% the License.  You may obtain a copy of the License at
-%%
-%%     http://www.apache.org/licenses/LICENSE-2.0
-%%
-%% Unless required by applicable law or agreed to in writing, software
-%% distributed under the License is distributed on an "AS IS" BASIS,
-%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-%% See the License for the specific language governing permissions and
-%% limitations under the License.
-%%------------------------------------------------------------------------------
--module(dubbo_consumer_pool).
-
--behaviour(gen_server).
-
-%% API
--export([start_link/0, start_consumer/2]).
-
-%% gen_server callbacks
--export([init/1,
-    handle_call/3,
-    handle_cast/2,
-    handle_info/2,
-    terminate/2,
-    code_change/3]).
-
--export([select_connection/1, select_connection/2, update_connection_readonly/2]).
-
--include("dubbo.hrl").
--define(SERVER, ?MODULE).
-
--define(INTERFCE_LIST_TABLE, interface_list).
--define(PROVIDER_NODE_LIST_TABLE, provider_node_list).
-
--record(state, {}).
-
--ifdef(TEST).
--compile([export_all]).
--endif.
-
-
-%%%===================================================================
-%%% API
-%%%===================================================================
-
-%%--------------------------------------------------------------------
-%% @doc
-%% Starts the server
-%%
-%% @end
-%%--------------------------------------------------------------------
--spec(start_link() ->
-    {ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
-start_link() ->
-    gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
-
-%%%===================================================================
-%%% gen_server callbacks
-%%%===================================================================
-
-%%--------------------------------------------------------------------
-%% @private
-%% @doc
-%% Initializes the server
-%%
-%% @spec init(Args) -> {ok, State} |
-%%                     {ok, State, Timeout} |
-%%                     ignore |
-%%                     {stop, Reason}
-%% @end
-%%--------------------------------------------------------------------
--spec(init(Args :: term()) ->
-    {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
-    {stop, Reason :: term()} | ignore).
-init([]) ->
-    init_ets_table(),
-    {ok, #state{}}.
-init_ets_table() ->
-    try ets:new(?INTERFCE_LIST_TABLE, [bag, public, named_table, {keypos, 2}]) of
-        ?INTERFCE_LIST_TABLE ->
-            ok
-    catch
-        _Type:Reason ->
-            logger:error("new ets table error ~p", [Reason]),
-            error
-    end,
-    try ets:new(?PROVIDER_NODE_LIST_TABLE, [bag, public, named_table, {keypos, 2}]) of
-        ?PROVIDER_NODE_LIST_TABLE ->
-            ok
-    catch
-        _Type1:Reason1 ->
-            logger:error("new ets table error ~p", [Reason1]),
-            error
-    end,
-    ok.
-%%--------------------------------------------------------------------
-%% @private
-%% @doc
-%% Handling call messages
-%%
-%% @end
-%%--------------------------------------------------------------------
--spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()},
-    State :: #state{}) ->
-    {reply, Reply :: term(), NewState :: #state{}} |
-    {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} |
-    {noreply, NewState :: #state{}} |
-    {noreply, NewState :: #state{}, timeout() | hibernate} |
-    {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
-    {stop, Reason :: term(), NewState :: #state{}}).
-
-handle_call({add_consumer, Interface, ProviderNodeList}, _From, State) ->
-
-    OldProviderList = get_interface_provider_node(Interface),
-    NewProviderList = add_consumer(ProviderNodeList, []),
-    DeleteProverList = OldProviderList -- NewProviderList,
-    clean_invalid_provider(DeleteProverList),
-    {reply, ok, State};
-handle_call(_Request, _From, State) ->
-    {reply, ok, State}.
-
-%%--------------------------------------------------------------------
-%% @private
-%% @doc
-%% Handling cast messages
-%%
-%% @end
-%%--------------------------------------------------------------------
--spec(handle_cast(Request :: term(), State :: #state{}) ->
-    {noreply, NewState :: #state{}} |
-    {noreply, NewState :: #state{}, timeout() | hibernate} |
-    {stop, Reason :: term(), NewState :: #state{}}).
-handle_cast(_Request, State) ->
-    {noreply, State}.
-
-%%--------------------------------------------------------------------
-%% @private
-%% @doc
-%% Handling all non call/cast messages
-%%
-%% @spec handle_info(Info, State) -> {noreply, State} |
-%%                                   {noreply, State, Timeout} |
-%%                                   {stop, Reason, State}
-%% @end
-%%--------------------------------------------------------------------
--spec(handle_info(Info :: timeout() | term(), State :: #state{}) ->
-    {noreply, NewState :: #state{}} |
-    {noreply, NewState :: #state{}, timeout() | hibernate} |
-    {stop, Reason :: term(), NewState :: #state{}}).
-handle_info(_Info, State) ->
-    {noreply, State}.
-
-%%--------------------------------------------------------------------
-%% @private
-%% @doc
-%% This function is called by a gen_server when it is about to
-%% terminate. It should be the opposite of Module:init/1 and do any
-%% necessary cleaning up. When it returns, the gen_server terminates
-%% with Reason. The return value is ignored.
-%%
-%% @spec terminate(Reason, State) -> void()
-%% @end
-%%--------------------------------------------------------------------
--spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
-    State :: #state{}) -> term()).
-terminate(_Reason, _State) ->
-    ok.
-
-%%--------------------------------------------------------------------
-%% @private
-%% @doc
-%% Convert process state when code is changed
-%%
-%% @spec code_change(OldVsn, State, Extra) -> {ok, NewState}
-%% @end
-%%--------------------------------------------------------------------
--spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{},
-    Extra :: term()) ->
-    {ok, NewState :: #state{}} | {error, Reason :: term()}).
-code_change(_OldVsn, State, _Extra) ->
-    {ok, State}.
-
-start_consumer(Interface, ProviderNodeInfo) ->
-    gen_server:call(?SERVER, {add_consumer, Interface, ProviderNodeInfo}).
-
-
-%%%===================================================================
-%%% Internal functions
-%%%===================================================================
-add_consumer([], RegisterList) ->
-    RegisterList;
-add_consumer([ProviderNodeInfo | ProviderList], RegisterList) ->
-    case dubbo_node_config_util:parse_provider_info(ProviderNodeInfo) of
-        {ok, ProviderConfig} ->
-            HostFlag = get_host_flag(ProviderConfig),
-            case ets:lookup(?PROVIDER_NODE_LIST_TABLE, HostFlag) of
-                [] ->
-                    ConnectionList = start_provider_process(HostFlag, 30, ProviderConfig),
-                    ok = update_connection_info(ProviderConfig#provider_config.interface, HostFlag, ConnectionList, true),
-                    ok;
-                List ->
-                    List2 = lists:map(fun(#provider_node_list{connection_info = ConnectionItem}) ->
-                        ConnectionItem
-                                      end, List),
-                    ok = update_connection_info(ProviderConfig#provider_config.interface, HostFlag, List2, false),
-                    ok
-            end,
-            add_consumer(ProviderList, [HostFlag] ++ RegisterList);
-        {error, R1} ->
-            logger:error("parse provider info error reason ~p", [R1]),
-            add_consumer(ProviderList, RegisterList)
-    end.
-
-start_provider_process(HostFlag, Weight, ProviderConfig) ->
-    ExecutesList = lists:seq(1, ProviderConfig#provider_config.executes),
-    ConnectionList = lists:map(fun(Item) ->
-        ConnectionFlag = <<HostFlag/binary, (integer_to_binary(Item))/binary>>,
-        ConnectionFlagTerm = binary_to_atom(ConnectionFlag, utf8),
-        AChild = {ConnectionFlagTerm, {dubbo_netty_client, start_link, [ConnectionFlagTerm, HostFlag, ProviderConfig, Item]}, permanent, 2000, worker, [dubbo_netty_client]},
-        {ok, Pid} = dubbo_consumer_pool_sup:add_children(AChild),
-        logger:info("start provider ~p pid info ~p~n", [HostFlag, Pid]),
-        #connection_info{connection_id = ConnectionFlagTerm, pid = Pid, weight = Weight, host_flag = HostFlag}
-                               end, ExecutesList),
-    ConnectionList.
-get_host_flag(ProviderConfig) ->
-    HostFlag = <<(list_to_binary(ProviderConfig#provider_config.host))/binary, <<"_">>/binary, (integer_to_binary(ProviderConfig#provider_config.port))/binary>>,
-    HostFlag.
-
-update_connection_info(Interface, HostFlag, ConnectionList, IsUpdateProvideNode) ->
-    lists:map(fun(Item) ->
-        I1 = ets:insert(?INTERFCE_LIST_TABLE, #interface_list{interface = Interface, pid = Item#connection_info.pid, connection_info = Item}),
-        logger:debug("insert interface conection info ~p ~p ~p", [Interface, Item#connection_info.pid, I1]),
-        case IsUpdateProvideNode of
-            true ->
-                I2 = ets:insert(?PROVIDER_NODE_LIST_TABLE, #provider_node_list{host_flag = HostFlag, connection_info = Item}),
-                logger:debug("insert PROVIDER_NODE_LIST_TABLE ~p info:~p", [HostFlag, I2]);
-            false ->
-                ok
-        end,
-        ok
-              end, ConnectionList),
-    ok.
-
-get_interface_provider_node(Interface) ->
-    case ets:lookup(?INTERFCE_LIST_TABLE, Interface) of
-        [] ->
-            [];
-        List ->
-            ListRet = [Item#interface_list.connection_info#connection_info.host_flag || Item <- List],
-            dubbo_lists_util:del_duplicate(ListRet)
-    end.
-
-select_connection(Interface) ->
-    RandNum = rand:uniform(2048),
-    select_connection(Interface, RandNum).
-select_connection(Interface, RandNum) ->
-    case ets:lookup(?INTERFCE_LIST_TABLE, Interface) of
-        [] ->
-            {error, none};
-        List ->
-            Len = length(List),
-            RemNum = (RandNum rem Len) + 1,
-            InterfaceListItem = lists:nth(RemNum, List),
-            {ok, InterfaceListItem#interface_list.connection_info}
-    end.
-
--spec(update_connection_readonly(pid(), boolean()) -> ok).
-update_connection_readonly(ConnectionPid, Readonly) ->
-    Pattern = #interface_list{pid = ConnectionPid, _ = '_'},
-    Objects = ets:match_object(?INTERFCE_LIST_TABLE, Pattern),
-    lists:map(fun(#interface_list{interface = Interface, pid = Pid, connection_info = ConnectionInfo} = InterferConnection) ->
-        logger:debug("[dubbo] update interface ~p ~p readonly", [Interface, Pid]),
-        NewConnectionInfo = ConnectionInfo#connection_info{readonly = Readonly},
-        NewObject = InterferConnection#interface_list{connection_info = NewConnectionInfo},
-        ets:delete_object(?INTERFCE_LIST_TABLE, InterferConnection),
-        ets:insert(?INTERFCE_LIST_TABLE, NewObject)
-              end, Objects),
-    {ok, length(Objects)}.
-
-clean_invalid_provider([]) ->
-    ok;
-clean_invalid_provider([HostFlag | DeleteProverList]) ->
-    case ets:lookup(?PROVIDER_NODE_LIST_TABLE, HostFlag) of
-        [] ->
-            ok;
-        ProviderNodeList ->
-            ProviderNodeList1 = dubbo_lists_util:del_duplicate(ProviderNodeList),
-            clean_connection_info(ProviderNodeList1)
-    end,
-    clean_invalid_provider(DeleteProverList).
-
-clean_connection_info(ProviderNodeList) ->
-    lists:map(fun(Item) ->
-        Pid = Item#provider_node_list.connection_info#connection_info.pid,
-        ConnectionId = Item#provider_node_list.connection_info#connection_info.connection_id,
-        Pattern = #interface_list{pid = Pid, _ = '_'},
-        ets:delete_object(?INTERFCE_LIST_TABLE, Pattern),
-        dubbo_consumer_pool_sup:stop_children(ConnectionId)
-              end, ProviderNodeList),
-    ok.
\ No newline at end of file
diff --git a/test/dubbo_consumer_pool_tests.erl b/src/dubbo_exchanger.erl
similarity index 57%
copy from test/dubbo_consumer_pool_tests.erl
copy to src/dubbo_exchanger.erl
index 740ed84..09a4833 100644
--- a/test/dubbo_consumer_pool_tests.erl
+++ b/src/dubbo_exchanger.erl
@@ -14,20 +14,27 @@
 %% See the License for the specific language governing permissions and
 %% limitations under the License.
 %%------------------------------------------------------------------------------
--module(dubbo_consumer_pool_tests).
--author("dlive").
+-module(dubbo_exchanger).
 
--include_lib("eunit/include/eunit.hrl").
 -include("dubbo.hrl").
 
-update_readonly_test() ->
-    dubbo_consumer_pool:start_link(),
-    InterfaceName= <<"testinterfacename">>,
-    HostFalg= <<"127.0.0.1/20880">>,
-    ConnectionList = [
-        #connection_info{connection_id=1,pid= testpid,weight = 30,host_flag = HostFalg},
-        #connection_info{connection_id=2,pid= testpid2,weight = 30,host_flag = HostFalg}
-    ],
-    dubbo_consumer_pool:update_connection_info(InterfaceName,HostFalg,ConnectionList,true),
-    {ok,Size} = dubbo_consumer_pool:update_connection_readonly(testpid,false),
-    ?assertEqual(1,Size).
+%% API
+-export([connect/2]).
+
+%% Parse Url, start one client via dubbo_transport_pool_sup and return {ok, #connection_info{}} (pid, weight, host_flag); a parse error is logged and returned.
+connect(Url,Handler) ->
+    case dubbo_node_config_util:parse_provider_info(Url) of
+        {ok, ProviderConfig} ->
+            HostFlag= dubbo_provider_consumer_reg_table:get_host_flag(ProviderConfig),
+            %% NOTE(review): anything other than {ok, Pid} here crashes with badmatch instead of returning {error, _} — confirm that is intended.
+            {ok, Pid} = dubbo_transport_pool_sup:add_children(ProviderConfig,Handler),
+            logger:info("start provider ~p pid info ~p~n", [HostFlag, Pid]),
+            {ok,#connection_info{ pid = Pid, weight = get_weight(ProviderConfig), host_flag = HostFlag}};
+        {error, R1} ->
+            logger:error("parse provider info error reason ~p", [R1]),
+            {error,R1}
+    end.
+
+get_weight(_ProviderConfig)->
+    %% todo get weight from provider info
+    30.
\ No newline at end of file
diff --git a/src/dubbo_invoker_old.erl b/src/dubbo_invoker_old.erl
index c878656..354cef9 100644
--- a/src/dubbo_invoker_old.erl
+++ b/src/dubbo_invoker_old.erl
@@ -40,7 +40,7 @@ invoke_request(Interface, Request, RequestOption) ->
     {ok, reference(), Data :: any(), RpcContent :: list()}|
     {error, Reason :: timeout|no_provider|request_full|any()}.
 invoke_request(Interface, Request, RpcContext, RequestState, CallBackPid) ->
-    case dubbo_consumer_pool:select_connection(Interface, Request#dubbo_request.mid) of
+    case dubbo_provider_consumer_reg_table:select_connection(Interface, Request#dubbo_request.mid) of
         {ok, #connection_info{pid = Pid, host_flag = HostFlag}} ->
             case dubbo_traffic_control:check_goon(HostFlag, 199) of
                 ok ->
diff --git a/src/dubbo_netty_client.erl b/src/dubbo_netty_client.erl
index 0181d33..06f9d1e 100644
--- a/src/dubbo_netty_client.erl
+++ b/src/dubbo_netty_client.erl
@@ -415,7 +415,7 @@ process_response(true, _ResponseInfo, _RestData, State) ->
     {ok, State}.
 
 process_request(true, #dubbo_request{data = <<"R">>}, State) ->
-    {ok, _} = dubbo_consumer_pool:update_connection_readonly(self(), true),
+    {ok, _} = dubbo_provider_consumer_reg_table:update_connection_readonly(self(), true),
     {ok, State};
 process_request(true, Request, State) ->
     {ok, NewState} = send_heartbeat_msg(Request#dubbo_request.mid, false, State),
diff --git a/src/dubbo_protocol_dubbo.erl b/src/dubbo_protocol_dubbo.erl
index 06c36e6..1ede1d8 100644
--- a/src/dubbo_protocol_dubbo.erl
+++ b/src/dubbo_protocol_dubbo.erl
@@ -17,19 +17,61 @@
 -module(dubbo_protocol_dubbo).
 
 -include("dubboerl.hrl").
+-include("dubbo.hrl").
 
 %% API
 -export([refer/2]).
 
-refer(Url,Acc)->
-    {ok,UrlInfo} = dubbo_common_fun:parse_url(Url),
+refer(Url, Acc) ->
+    {ok, UrlInfo} = dubbo_common_fun:parse_url(Url),
     case UrlInfo#dubbo_url.scheme of
         <<"dubbo">> ->
-            {ok,todo};
+            do_refer(UrlInfo),
+            {ok, todo};
         _ ->
-            {skip,Acc}
+            {skip, Acc}
     end.
 
-do_refer(UrlInfo)->
+%% Stub: receives the parsed URL record, does nothing with it yet and returns ok.
+do_refer(UrlInfo) ->
+    ok.
+
+
+%% Return the connections serving ProviderUrl as {ok, [#connection_info{}]},
+%% or {error, Reason} when new_transport/1 cannot provide any.
+getClients(ProviderUrl) ->
+    case new_transport(ProviderUrl) of
+        {ok, ConnectionInfoList} ->
+            %% new_transport/1 already reuses or opens the connections, so hand them straight back.
+            {ok, ConnectionInfoList};
+        {error, Reason} ->
+            {error, Reason}
+    end.
+
+%%ok = update_connection_info(ProviderConfig#provider_config.interface, HostFlag, ConnectionList, true),
+
+
+%% Resolve ProviderUrl to {ok, [#connection_info{}]}: reuse the connections already registered
+%% for the provider's host and port, otherwise open one through dubbo_exchanger:connect/2.
+new_transport(ProviderUrl) ->
+    case dubbo_node_config_util:parse_provider_info(ProviderUrl) of
+        {ok, #provider_config{host = Host, port = Port}} ->
+            case dubbo_provider_consumer_reg_table:get_host_connections(Host, Port) of
+                [] ->
+                    case dubbo_exchanger:connect(ProviderUrl, ?MODULE) of
+                        {ok, ConnectionInfo} ->
+                            {ok, [ConnectionInfo]};
+                        {error, Reason} ->
+                            HostFlag = dubbo_provider_consumer_reg_table:get_host_flag(Host, Port),
+                            logger:warning("start client fail ~p ~p", [Reason, HostFlag]),
+                            {error, Reason}
+                    end;
+                ConnectionInfoList ->
+                    {ok, ConnectionInfoList}
+            end;
+        {error, R1} ->
+            logger:error("parse provider info error reason ~p", [R1]),
+            {error, R1}
+    end.
+
 
-    ok.
\ No newline at end of file
diff --git a/src/dubbo_provider_consumer_reg_table.erl b/src/dubbo_provider_consumer_reg_table.erl
index 3386cdc..c7a8dfa 100644
--- a/src/dubbo_provider_consumer_reg_table.erl
+++ b/src/dubbo_provider_consumer_reg_table.erl
@@ -15,7 +15,323 @@
 %% limitations under the License.
 %%------------------------------------------------------------------------------
 -module(dubbo_provider_consumer_reg_table).
--author("dlive").
+
+-behaviour(gen_server).
 
 %% API
--export([]).
+-export([start_link/0, start_consumer/2]).
+
+%% gen_server callbacks
+-export([init/1,
+    handle_call/3,
+    handle_cast/2,
+    handle_info/2,
+    terminate/2,
+    code_change/3]).
+
+-export([update_consumer_connections/2,get_host_connections/2, select_connection/1, select_connection/2, update_connection_readonly/2, get_host_flag/1, get_host_flag/2]).
+
+-include("dubbo.hrl").
+-define(SERVER, ?MODULE).
+
+-define(INTERFCE_LIST_TABLE, interface_list).
+-define(PROVIDER_NODE_LIST_TABLE, provider_node_list).
+
+-record(state, {}).
+
+-ifdef(TEST).
+-compile([export_all]).
+-endif.
+
+
+%%%===================================================================
+%%% API
+%%%===================================================================
+
+%%--------------------------------------------------------------------
+%% @doc
+%% Starts the server
+%%
+%% @end
+%%--------------------------------------------------------------------
+-spec(start_link() ->
+    {ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
+start_link() ->
+    gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
+
+%%%===================================================================
+%%% gen_server callbacks
+%%%===================================================================
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% Initializes the server
+%%
+%% @spec init(Args) -> {ok, State} |
+%%                     {ok, State, Timeout} |
+%%                     ignore |
+%%                     {stop, Reason}
+%% @end
+%%--------------------------------------------------------------------
+-spec(init(Args :: term()) ->
+    {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
+    {stop, Reason :: term()} | ignore).
+init([]) ->
+    init_ets_table(),
+    {ok, #state{}}.
+init_ets_table() ->
+    try ets:new(?INTERFCE_LIST_TABLE, [bag, public, named_table, {keypos, 2}]) of
+        ?INTERFCE_LIST_TABLE ->
+            ok
+    catch
+        _Type:Reason ->
+            logger:error("new ets table error ~p", [Reason]),
+            error
+    end,
+    try ets:new(?PROVIDER_NODE_LIST_TABLE, [bag, public, named_table, {keypos, 2}]) of
+        ?PROVIDER_NODE_LIST_TABLE ->
+            ok
+    catch
+        _Type1:Reason1 ->
+            logger:error("new ets table error ~p", [Reason1]),
+            error
+    end,
+    ok.
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% Handling call messages
+%%
+%% @end
+%%--------------------------------------------------------------------
+-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()},
+    State :: #state{}) ->
+    {reply, Reply :: term(), NewState :: #state{}} |
+    {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} |
+    {noreply, NewState :: #state{}} |
+    {noreply, NewState :: #state{}, timeout() | hibernate} |
+    {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
+    {stop, Reason :: term(), NewState :: #state{}}).
+
+handle_call({add_consumer, Interface, ProviderNodeList}, _From, State) ->
+    %% NOTE(review): add_consumer/2 exists only as commented-out code in this module — restore or replace it before relying on this clause.
+    OldProviderList = get_interface_provider_node(Interface),
+    NewProviderList = add_consumer(ProviderNodeList, []),
+    DeleteProverList = OldProviderList -- NewProviderList,
+    clean_invalid_provider(DeleteProverList),
+    {reply, ok, State};
+handle_call(_Request, _From, State) ->
+    {reply, ok, State}.
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% Handling cast messages
+%%
+%% @end
+%%--------------------------------------------------------------------
+-spec(handle_cast(Request :: term(), State :: #state{}) ->
+    {noreply, NewState :: #state{}} |
+    {noreply, NewState :: #state{}, timeout() | hibernate} |
+    {stop, Reason :: term(), NewState :: #state{}}).
+handle_cast(_Request, State) ->
+    {noreply, State}.
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% Handling all non call/cast messages
+%%
+%% @spec handle_info(Info, State) -> {noreply, State} |
+%%                                   {noreply, State, Timeout} |
+%%                                   {stop, Reason, State}
+%% @end
+%%--------------------------------------------------------------------
+-spec(handle_info(Info :: timeout() | term(), State :: #state{}) ->
+    {noreply, NewState :: #state{}} |
+    {noreply, NewState :: #state{}, timeout() | hibernate} |
+    {stop, Reason :: term(), NewState :: #state{}}).
+handle_info(_Info, State) ->
+    {noreply, State}.
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% This function is called by a gen_server when it is about to
+%% terminate. It should be the opposite of Module:init/1 and do any
+%% necessary cleaning up. When it returns, the gen_server terminates
+%% with Reason. The return value is ignored.
+%%
+%% @spec terminate(Reason, State) -> void()
+%% @end
+%%--------------------------------------------------------------------
+-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
+    State :: #state{}) -> term()).
+terminate(_Reason, _State) ->
+    ok.
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% Convert process state when code is changed
+%%
+%% @spec code_change(OldVsn, State, Extra) -> {ok, NewState}
+%% @end
+%%--------------------------------------------------------------------
+-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{},
+    Extra :: term()) ->
+    {ok, NewState :: #state{}} | {error, Reason :: term()}).
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+start_consumer(Interface, ProviderNodeInfo) ->
+    gen_server:call(?SERVER, {add_consumer, Interface, ProviderNodeInfo}).
+
+
+
+%% List the connections registered for Host:Port as #connection_info{} records;
+%% only host_flag, pid and readonly are copied from each stored row.
+get_host_connections(Host, Port) ->
+    HostFlag = get_host_flag(Host, Port),
+    Rows = ets:lookup(?PROVIDER_NODE_LIST_TABLE, HostFlag),
+    %% RowFlag is a fresh name: reusing HostFlag in the pattern would shadow the lookup key.
+    [#connection_info{host_flag = RowFlag, pid = Pid, readonly = Readonly}
+        || #provider_node_list{host_flag = RowFlag, pid = Pid, readonly = Readonly} <- Rows].
+
+
+%%%===================================================================
+%%% Internal functions
+%%%===================================================================
+%%add_consumer([], RegisterList) ->
+%%    RegisterList;
+%%add_consumer([ProviderNodeInfo | ProviderList], RegisterList) ->
+%%    case dubbo_node_config_util:parse_provider_info(ProviderNodeInfo) of
+%%        {ok, ProviderConfig} ->
+%%            HostFlag = get_host_flag(ProviderConfig),
+%%            case ets:lookup(?PROVIDER_NODE_LIST_TABLE, HostFlag) of
+%%                [] ->
+%%                    ConnectionList = start_provider_process(HostFlag, 30, ProviderConfig),
+%%                    ok = update_connection_info(ProviderConfig#provider_config.interface, HostFlag, ConnectionList, true),
+%%                    ok;
+%%                List ->
+%%                    List2 = lists:map(fun(#provider_node_list{connection_info = ConnectionItem}) ->
+%%                        ConnectionItem
+%%                                      end, List),
+%%                    ok = update_connection_info(ProviderConfig#provider_config.interface, HostFlag, List2, false),
+%%                    ok
+%%            end,
+%%            add_consumer(ProviderList, [HostFlag] ++ RegisterList);
+%%        {error, R1} ->
+%%            logger:error("parse provider info error reason ~p", [R1]),
+%%            add_consumer(ProviderList, RegisterList)
+%%    end.
+%%
+%%start_provider_process(HostFlag, Weight, ProviderConfig) ->
+%%    ExecutesList = lists:seq(1, ProviderConfig#provider_config.executes),
+%%    ConnectionList = lists:map(fun(Item) ->
+%%        ConnectionFlag = <<HostFlag/binary, (integer_to_binary(Item))/binary>>,
+%%        ConnectionFlagTerm = binary_to_atom(ConnectionFlag, utf8),
+%%        AChild = {ConnectionFlagTerm, {dubbo_netty_client, start_link, [ConnectionFlagTerm, HostFlag, ProviderConfig, Item]}, permanent, 2000, worker, [dubbo_netty_client]},
+%%        {ok, Pid} = dubbo_transport_pool_sup:add_children(AChild),
+%%        logger:info("start provider ~p pid info ~p~n", [HostFlag, Pid]),
+%%        #connection_info{connection_id = ConnectionFlagTerm, pid = Pid, weight = Weight, host_flag = HostFlag}
+%%                               end, ExecutesList),
+%%    ConnectionList.
+
+%% Register Connections for Interface: add a provider-node row for each connection that has none, then add its interface row. Returns ok.
+update_consumer_connections(Interface, Connections) ->
+    lists:foreach(
+        fun(#connection_info{host_flag = HostFlag, pid = Pid} = Item) ->
+            NodePattern = #provider_node_list{host_flag = HostFlag, pid = Pid, _ = '_'},
+            case ets:match_object(?PROVIDER_NODE_LIST_TABLE, NodePattern) of %% lookup_element/3 cannot take a match pattern
+                [] ->
+                    I2 = ets:insert(?PROVIDER_NODE_LIST_TABLE, #provider_node_list{host_flag = HostFlag, pid = Pid}),
+                    logger:debug("insert PROVIDER_NODE_LIST_TABLE ~p info:~p", [HostFlag, I2]);
+                [_ | _] ->
+                    ok
+            end,
+            I1 = ets:insert(?INTERFCE_LIST_TABLE, #interface_list{interface = Interface, pid = Pid, connection_info = Item}),
+            logger:debug("insert interface conection info ~p ~p ~p", [Interface, Pid, I1]),
+            ok
+        end, Connections),
+    ok.
+
+get_host_flag(ProviderConfig) ->
+    HostFlag = <<(list_to_binary(ProviderConfig#provider_config.host))/binary, <<"_">>/binary, (integer_to_binary(ProviderConfig#provider_config.port))/binary>>,
+    HostFlag.
+get_host_flag(Host, Port) ->
+    <<(list_to_binary(Host))/binary, <<"_">>/binary, (integer_to_binary(Port))/binary>>.
+
+update_connection_info(Interface, HostFlag, ConnectionList, IsUpdateProvideNode) ->
+    lists:map(fun(Item) ->
+        I1 = ets:insert(?INTERFCE_LIST_TABLE, #interface_list{interface = Interface, pid = Item#connection_info.pid, connection_info = Item}),
+        logger:debug("insert interface conection info ~p ~p ~p", [Interface, Item#connection_info.pid, I1]),
+        case IsUpdateProvideNode of
+            true ->
+                I2 = ets:insert(?PROVIDER_NODE_LIST_TABLE, #provider_node_list{host_flag = HostFlag, connection_info = Item}),
+                logger:debug("insert PROVIDER_NODE_LIST_TABLE ~p info:~p", [HostFlag, I2]);
+            false ->
+                ok
+        end,
+        ok
+              end, ConnectionList),
+    ok.
+
+get_interface_provider_node(Interface) ->
+    case ets:lookup(?INTERFCE_LIST_TABLE, Interface) of
+        [] ->
+            [];
+        List ->
+            ListRet = [Item#interface_list.connection_info#connection_info.host_flag || Item <- List],
+            dubbo_lists_util:del_duplicate(ListRet)
+    end.
+
+select_connection(Interface) ->
+    RandNum = rand:uniform(2048),
+    select_connection(Interface, RandNum).
+select_connection(Interface, RandNum) ->
+    case ets:lookup(?INTERFCE_LIST_TABLE, Interface) of
+        [] ->
+            {error, none};
+        List ->
+            Len = length(List),
+            RemNum = (RandNum rem Len) + 1,
+            InterfaceListItem = lists:nth(RemNum, List),
+            {ok, InterfaceListItem#interface_list.connection_info}
+    end.
+
+%% Set the readonly flag on every interface row owned by ConnectionPid; returns {ok, NumberOfRowsUpdated}.
+-spec(update_connection_readonly(pid(), boolean()) -> {ok, non_neg_integer()}).
+update_connection_readonly(ConnectionPid, Readonly) ->
+    Pattern = #interface_list{pid = ConnectionPid, _ = '_'},
+    Objects = ets:match_object(?INTERFCE_LIST_TABLE, Pattern),
+    lists:foreach(fun(#interface_list{interface = Interface, pid = Pid, connection_info = ConnectionInfo} = InterferConnection) ->
+        logger:debug("[dubbo] update interface ~p ~p readonly", [Interface, Pid]),
+        NewObject = InterferConnection#interface_list{connection_info = ConnectionInfo#connection_info{readonly = Readonly}},
+        ets:delete_object(?INTERFCE_LIST_TABLE, InterferConnection),
+        ets:insert(?INTERFCE_LIST_TABLE, NewObject)
+              end, Objects),
+    {ok, length(Objects)}.
+
+clean_invalid_provider([]) ->
+    ok;
+clean_invalid_provider([HostFlag | DeleteProverList]) ->
+    case ets:lookup(?PROVIDER_NODE_LIST_TABLE, HostFlag) of
+        [] ->
+            ok;
+        ProviderNodeList ->
+            ProviderNodeList1 = dubbo_lists_util:del_duplicate(ProviderNodeList),
+            clean_connection_info(ProviderNodeList1)
+    end,
+    clean_invalid_provider(DeleteProverList).
+
+%% Remove the interface rows owned by each node's connection pid, then stop that client process. Returns ok.
+clean_connection_info(ProviderNodeList) ->
+    lists:foreach(fun(Item) ->
+        Pid = Item#provider_node_list.connection_info#connection_info.pid,
+        %% delete_object/2 removes only an identical object, so a '_' pattern has to go through match_delete/2.
+        ets:match_delete(?INTERFCE_LIST_TABLE, #interface_list{pid = Pid, _ = '_'}),
+        dubbo_transport_pool_sup:stop_children(Pid) %% simple_one_for_one children are terminated by pid
+                  end, ProviderNodeList),
+    ok.
\ No newline at end of file
diff --git a/src/dubbo_registry_zookeeper.erl b/src/dubbo_registry_zookeeper.erl
index 0b5b3f5..5d14588 100644
--- a/src/dubbo_registry_zookeeper.erl
+++ b/src/dubbo_registry_zookeeper.erl
@@ -311,4 +311,4 @@ gen_consumer_node_info(Consumer) ->
 
 %%dubbo_zookeeper:register_consumer(<<"com.ifcoder.abcd">>,[]).
 start_provider_process(Interface, ProviderList) ->
-    dubbo_consumer_pool:start_consumer(Interface, ProviderList).
\ No newline at end of file
+    dubbo_provider_consumer_reg_table:start_consumer(Interface, ProviderList).
\ No newline at end of file
diff --git a/src/dubbo_consumer_pool_sup.erl b/src/dubbo_transport_pool_sup.erl
similarity index 87%
rename from src/dubbo_consumer_pool_sup.erl
rename to src/dubbo_transport_pool_sup.erl
index 77a6dbe..019c57d 100644
--- a/src/dubbo_consumer_pool_sup.erl
+++ b/src/dubbo_transport_pool_sup.erl
@@ -14,12 +14,12 @@
 %% See the License for the specific language governing permissions and
 %% limitations under the License.
 %%------------------------------------------------------------------------------
--module(dubbo_consumer_pool_sup).
+-module(dubbo_transport_pool_sup).
 
 -behaviour(supervisor).
 
 %% API
--export([start_link/0, add_children/1, stop_children/1]).
+-export([start_link/0, add_children/2, stop_children/1]).
 
 %% Supervisor callbacks
 -export([init/1]).
@@ -63,17 +63,18 @@ start_link() ->
     ignore |
     {error, Reason :: term()}).
 init([]) ->
-    RestartStrategy = one_for_one,
+    RestartStrategy = simple_one_for_one,
     MaxRestarts = 1000,
     MaxSecondsBetweenRestarts = 3600,
 
     SupFlags = {RestartStrategy, MaxRestarts, MaxSecondsBetweenRestarts},
+    Child = {dubbo_client_default, {dubbo_client_default, start_link, []}, permanent, 2000, worker, [dubbo_client_default]},
+    {ok, {SupFlags, [Child]}}.
 
-    {ok, {SupFlags, []}}.
 
+add_children(ProvideConfig, Handler) ->
+    supervisor:start_child(?SERVER, [ProvideConfig, Handler]).
 
-add_children(ChildSpec) ->
-    supervisor:start_child(?SERVER, ChildSpec).
 stop_children(ChildID) ->
     supervisor:terminate_child(?SERVER, ChildID).
 %%%===================================================================
diff --git a/src/dubbo_zookeeper.erl b/src/dubbo_zookeeper.erl
index f62ace1..84a95d6 100644
--- a/src/dubbo_zookeeper.erl
+++ b/src/dubbo_zookeeper.erl
@@ -272,5 +272,5 @@ gen_consumer_node_info(Consumer) ->
 
 %%dubbo_zookeeper:register_consumer(<<"com.ifcoder.abcd">>,[]).
 start_provider_process(Interface, ProviderList) ->
-    dubbo_consumer_pool:start_consumer(Interface, ProviderList).
+    dubbo_provider_consumer_reg_table:start_consumer(Interface, ProviderList).
 
diff --git a/src/dubboerl_sup.erl b/src/dubboerl_sup.erl
index a829015..a0d2fb0 100644
--- a/src/dubboerl_sup.erl
+++ b/src/dubboerl_sup.erl
@@ -45,8 +45,8 @@ init([]) ->
 %%    NettySer = {dubbo_netty_client,{dubbo_netty_client, start_link, []},transient,5000,worker,[dubbo_netty_client]},
     Id_count = {dubbo_id_generator, {dubbo_id_generator, start_link, []}, transient, 5000, worker, [dubbo_id_generator]},
     ProviderPoolSup = {dubbo_provider_worker_sup, {dubbo_provider_worker_sup, start_link, []}, transient, 5000, supervisor, [dubbo_provider_worker_sup]},
-    ConsumerPoolSup = {dubbo_consumer_pool_sup, {dubbo_consumer_pool_sup, start_link, []}, transient, 5000, supervisor, [dubbo_consumer_pool_sup]},
-    ConsumerPool = {dubbo_consumer_pool, {dubbo_consumer_pool, start_link, []}, transient, 5000, worker, [dubbo_consumer_pool]},
+    ConsumerPoolSup = {dubbo_transport_pool_sup, {dubbo_transport_pool_sup, start_link, []}, transient, 5000, supervisor, [dubbo_transport_pool_sup]},
+    ConsumerPool = {dubbo_provider_consumer_reg_table, {dubbo_provider_consumer_reg_table, start_link, []}, transient, 5000, worker, [dubbo_provider_consumer_reg_table]},
     ListNew1 =
         case application:get_env(dubboerl, registry, false) of
             true ->
diff --git a/test/dubbo_consumer_pool_tests.erl b/test/dubbo_consumer_pool_tests.erl
index 740ed84..0a67f1a 100644
--- a/test/dubbo_consumer_pool_tests.erl
+++ b/test/dubbo_consumer_pool_tests.erl
@@ -21,13 +21,13 @@
 -include("dubbo.hrl").
 
 update_readonly_test() ->
-    dubbo_consumer_pool:start_link(),
+    dubbo_provider_consumer_reg_table:start_link(),
     InterfaceName= <<"testinterfacename">>,
     HostFalg= <<"127.0.0.1/20880">>,
     ConnectionList = [
         #connection_info{connection_id=1,pid= testpid,weight = 30,host_flag = HostFalg},
         #connection_info{connection_id=2,pid= testpid2,weight = 30,host_flag = HostFalg}
     ],
-    dubbo_consumer_pool:update_connection_info(InterfaceName,HostFalg,ConnectionList,true),
-    {ok,Size} = dubbo_consumer_pool:update_connection_readonly(testpid,false),
+    dubbo_provider_consumer_reg_table:update_connection_info(InterfaceName,HostFalg,ConnectionList,true),
+    {ok,Size} = dubbo_provider_consumer_reg_table:update_connection_readonly(testpid,false),
     ?assertEqual(1,Size).