You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by dl...@apache.org on 2019/07/23 17:08:36 UTC

[dubbo-erlang] 09/09: feature: support protocol extension

This is an automated email from the ASF dual-hosted git repository.

dlive pushed a commit to branch 0.4.0
in repository https://gitbox.apache.org/repos/asf/dubbo-erlang.git

commit af4b9904242b34fa2fb15e4b767613272ff2a887
Author: DLive <xs...@163.com>
AuthorDate: Sun Jun 30 14:17:56 2019 +0800

    feature: support protocol extension
---
 config_example/sys.config                          |   7 +-
 src/dubbo_common.erl => include/constrans.hrl      |   9 +-
 include/dubbo.hrl                                  |  49 ++-
 include/dubboerl.hrl                               |  11 +-
 .../apache/dubbo/erlang/sample/service/App.java    |   2 +-
 .../src/main/resources/applicationConsumer.xml     |  28 ++
 .../src/main/resources/applicationProvider.xml     |   6 +-
 .../apps/dubbo_sample_service/src/userOperator.erl |   8 +-
 .../apps/dubboerl_demo/src/api_gateway_handle.erl  |  25 +-
 src/dubbo_adapter.erl                              |   2 +-
 src/dubbo_client_default.erl                       |  97 +----
 src/dubbo_cluster.erl                              |  10 +-
 src/dubbo_cluster_failfast.erl                     |  40 +-
 src/dubbo_common_fun.erl                           |  78 ++--
 src/dubbo_config_util.erl                          |   7 +-
 src/dubbo_directory.erl                            |  86 +++--
 src/dubbo_extension.erl                            |  95 +++--
 src/dubbo_filter.erl                               |   9 +-
 src/dubbo_invoker.erl                              |  29 +-
 src/dubbo_invoker_cluster.erl                      |  21 -
 src/dubbo_invoker_old.erl                          |  97 -----
 src/dubbo_loadbalance_random.erl                   |   8 +-
 src/dubbo_netty_client.erl                         | 430 ---------------------
 src/dubbo_node_config_util.erl                     |  66 ++--
 src/dubbo_protocol.erl                             |  11 +-
 src/dubbo_protocol_dubbo.erl                       | 122 +++++-
 src/dubbo_protocol_registry.erl                    |  89 ++++-
 src/dubbo_provider_consumer_reg_table.erl          |  78 ++--
 src/dubbo_provider_protocol.erl                    |  15 +-
 src/dubbo_reference_config.erl                     |  76 ++--
 src/dubbo_registry.erl                             |  35 +-
 src/dubbo_registry_zookeeper.erl                   | 201 ++++++----
 src/dubbo_service_config.erl                       | 110 ++++++
 src/{dubbo_zookeeper.erl => dubbo_shutdown.erl}    | 140 +------
 src/dubbo_traffic_control.erl                      |  24 +-
 src/dubbo_type_transfer.erl                        |  25 +-
 src/dubboerl.erl                                   |  21 +-
 src/dubboerl_app.erl                               |  13 +-
 src/dubboerl_sup.erl                               |  23 +-
 test/consumer_SUITE.erl                            |   4 +-
 test/dubbo_config_parser_tests.erl                 |   2 +-
 test/dubbo_consumer_pool_tests.erl                 |   5 +-
 ...er_SUITE.erl => dubbo_service_config_SUITE.erl} |  71 +---
 test/dubbo_zookeeper_tests.erl                     |  25 --
 ...nsumer_SUITE.erl => reference_config_SUITE.erl} |  50 +--
 test/userOperator.erl                              |   8 +-
 46 files changed, 1049 insertions(+), 1319 deletions(-)

diff --git a/config_example/sys.config b/config_example/sys.config
index 07cfaf9..c26fc54 100644
--- a/config_example/sys.config
+++ b/config_example/sys.config
@@ -20,11 +20,10 @@
         {registry,zookeeper},
         {zookeeper_list,[{"127.0.0.1",2181}]},
         {application,<<"testdubboerl">>},
-        {registry,true},
-        {protocol,hessian},
-        {port,20881},
+        {serialization,hessian},
+        {protocol, {dubbo, [{port, 20882}]}},
         {consumer,[
-           % {<<"org.apache.dubbo.erlang.sample.service.facade.UserOperator">>,[]}
+%%            {<<"org.apache.dubbo.erlang.sample.service.facade.UserOperator">>,[]}
         ]},
         {provider,[
             {dubbo_service_user_impl,userOperator,<<"org.apache.dubbo.erlang.sample.service.facade.UserOperator">>,[]}
diff --git a/src/dubbo_common.erl b/include/constrans.hrl
similarity index 87%
rename from src/dubbo_common.erl
rename to include/constrans.hrl
index 1a35b58..6cb4a89 100644
--- a/src/dubbo_common.erl
+++ b/include/constrans.hrl
@@ -14,7 +14,10 @@
 %% See the License for the specific language governing permissions and
 %% limitations under the License.
 %%------------------------------------------------------------------------------
--module(dubbo_common).
 
-%% API
--export([]).
+
+-define(CATEGORY_KEY, <<"category">>).
+
+-define(PROVIDERS_CATEGORY, <<"providers">>).
+
+-define(CONSUMERS_CATEGORY, <<"consumers">>).
diff --git a/include/dubbo.hrl b/include/dubbo.hrl
index 282e0da..b6a32e9 100644
--- a/include/dubbo.hrl
+++ b/include/dubbo.hrl
@@ -63,15 +63,37 @@
     decode_state
 }).
 
+
+-record(reference_config,{
+    interface,
+    application = <<"NoName">> :: binary(),
+    category = <<"consumers">> :: binary(),
+    check = false :: boolean(),
+    default_timeout = 500 :: integer(),
+    dubbo_version = <<"2.5.3">> :: binary(),
+    methods = [] :: list(),
+    revision = <<"">> :: binary(),
+    side = <<"consumers">> :: binary(),
+    sync = false ::boolean()
+
+}).
+
 -record(dubbo_rpc_invocation, {
     serialVersionUID = -4355285085441097045,
-    className :: string(),
-    classVersion :: string(),
-    methodName :: string(),
-    parameterDesc :: string(),
+    className :: binary(),
+    classVersion :: binary(),
+    methodName :: binary(),
+    parameterDesc :: binary(),
     parameterTypes = [] :: [#type_def{}],
     parameters = [] :: [term()],
-    attachments = [] :: [term()]
+    attachments = [] :: [term()],
+
+    call_ref :: atom(),
+    reference_ops :: #reference_config{},
+    loadbalance :: atom(),
+    source_pid :: pid(),
+    transport_pid :: pid()
+
 }).
 
 -record(consumer_config, {
@@ -96,15 +118,24 @@
     application,
     dubbo = <<"2.5.3">>,
     methods = [],
-    side = <<"provider">>
+    side = <<"provider">>,
+    impl_handle
+}).
+
+-record(invoker,{
+    url,
+    handler
 }).
 
 
--record(interface_info, {interface, loadbalance}).
+-record(interface_info, {interface, loadbalance, protocol}).
 
 -record(interface_list, {interface, pid, connection_info}).
-%%-record(provider_node_list, {host_flag, pid, weight, readonly = false}).
 -record(connection_info, {host_flag, pid, weight, readonly = false}).
 
 -type dubbo_request() :: #dubbo_request{}.
--type dubbo_response() :: #dubbo_response{}.
\ No newline at end of file
+-type dubbo_response() :: #dubbo_response{}.
+-type invocation():: #dubbo_rpc_invocation{}.
+
+%% @doc invoke return info
+-type invoke_result() :: {ok, reference()}| {ok, reference(), Data :: any(), RpcContent :: list()}| {error, Reason :: timeout|no_provider|any()}.
\ No newline at end of file
diff --git a/include/dubboerl.hrl b/include/dubboerl.hrl
index a9be5f1..7c8b1af 100644
--- a/include/dubboerl.hrl
+++ b/include/dubboerl.hrl
@@ -20,7 +20,16 @@
 
 -define(TRAFFIC_CONTROL, traffic_control).
 
+-define(SERVICE_EXPORT_TABLE,dubbo_service_export).
 
--record(dubbo_url, {scheme, user_info, host, port, path, parameters, fragment}).
+-record(dubbo_url, {
+    scheme :: binary() ,
+    user_info :: binary(),
+    host:: binary(),
+    port::integer(),
+    path:: binary(),
+    parameters::map(),
+    fragment::binary()
+}).
 
 -record(dubbo_invoker, {host_flag, handle}).
\ No newline at end of file
diff --git a/samples/dubbo-sample-service/src/main/java/org/apache/dubbo/erlang/sample/service/App.java b/samples/dubbo-sample-service/src/main/java/org/apache/dubbo/erlang/sample/service/App.java
index dc6deb1..52cd1d1 100644
--- a/samples/dubbo-sample-service/src/main/java/org/apache/dubbo/erlang/sample/service/App.java
+++ b/samples/dubbo-sample-service/src/main/java/org/apache/dubbo/erlang/sample/service/App.java
@@ -30,7 +30,7 @@ public class App {
     public static void main(String[] args) throws IOException {
         System.out.println("将要监听服务");
         ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
-                new String[]{"applicationProvider.xml"});
+                new String[]{"applicationConsumer.xml"});
         context.start();
         UserOperator userOperator = (UserOperator) context.getBean("userInterface");
         UserInfo result = userOperator.getUserInfo("hh-bb");
diff --git a/samples/dubbo-sample-service/src/main/resources/applicationConsumer.xml b/samples/dubbo-sample-service/src/main/resources/applicationConsumer.xml
new file mode 100644
index 0000000..296bc83
--- /dev/null
+++ b/samples/dubbo-sample-service/src/main/resources/applicationConsumer.xml
@@ -0,0 +1,28 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<beans xmlns="http://www.springframework.org/schema/beans"
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
+       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://code.alibabatech.com/schema/dubbo http://code.alibabatech.com/schema/dubbo/dubbo.xsd">
+    <dubbo:application name="hello-world"/><!-- 注册地址 -->
+    <dubbo:registry address="zookeeper://127.0.0.1:2181"/>
+    <dubbo:protocol name="dubbo" port="20880"/>
+
+    <dubbo:reference  id="userInterface" interface="org.apache.dubbo.erlang.sample.service.facade.UserOperator" retries="0" />
+</beans>
diff --git a/samples/dubbo-sample-service/src/main/resources/applicationProvider.xml b/samples/dubbo-sample-service/src/main/resources/applicationProvider.xml
index 425e41e..7a32064 100644
--- a/samples/dubbo-sample-service/src/main/resources/applicationProvider.xml
+++ b/samples/dubbo-sample-service/src/main/resources/applicationProvider.xml
@@ -10,8 +10,8 @@
 
     <dubbo:consumer check="false" timeout="300000" id="dubboConsumerConfig" retries="0"/>
 
-<!--    <bean id="userService" class="org.apache.dubbo.erlang.sample.service.impl.UserOperatorImpl"/>-->
-<!--    <dubbo:service interface="org.apache.dubbo.erlang.sample.service.facade.UserOperator" ref="userService"/>-->
+    <bean id="userService" class="org.apache.dubbo.erlang.sample.service.impl.UserOperatorImpl"/>
+    <dubbo:service interface="org.apache.dubbo.erlang.sample.service.facade.UserOperator" ref="userService"/>
 
-    <dubbo:reference  id="userInterface" interface="org.apache.dubbo.erlang.sample.service.facade.UserOperator" retries="0" />
+<!--    <dubbo:reference  id="userInterface" interface="org.apache.dubbo.erlang.sample.service.facade.UserOperator" retries="0" />-->
 </beans>
diff --git a/samples/dubboerl_demo/apps/dubbo_sample_service/src/userOperator.erl b/samples/dubboerl_demo/apps/dubbo_sample_service/src/userOperator.erl
index d7e0809..b92ca62 100644
--- a/samples/dubboerl_demo/apps/dubbo_sample_service/src/userOperator.erl
+++ b/samples/dubboerl_demo/apps/dubbo_sample_service/src/userOperator.erl
@@ -67,7 +67,7 @@ getUserInfo(Arg0, RequestOption)->
         ]
     },
     Request = dubbo_adapter:reference(Data),
-    dubbo_invoker_old:invoke_request(?CURRENT_CLASS_NAME,Request,RequestOption).
+    dubbo_invoker:invoke_request(?CURRENT_CLASS_NAME,Request,RequestOption).
 
 
 -spec genUserId()->
@@ -96,7 +96,7 @@ genUserId( RequestOption)->
         ]
     },
     Request = dubbo_adapter:reference(Data),
-    dubbo_invoker_old:invoke_request(?CURRENT_CLASS_NAME,Request,RequestOption).
+    dubbo_invoker:invoke_request(?CURRENT_CLASS_NAME,Request,RequestOption).
 
 
 -spec queryUserInfo(Arg0::#userInfoRequest{})->
@@ -127,7 +127,7 @@ queryUserInfo(Arg0, RequestOption)->
         ]
     },
     Request = dubbo_adapter:reference(Data),
-    dubbo_invoker_old:invoke_request(?CURRENT_CLASS_NAME,Request,RequestOption).
+    dubbo_invoker:invoke_request(?CURRENT_CLASS_NAME,Request,RequestOption).
 
 
 -spec queryUserList(Arg0::list())->
@@ -158,5 +158,5 @@ queryUserList(Arg0, RequestOption)->
         ]
     },
     Request = dubbo_adapter:reference(Data),
-    dubbo_invoker_old:invoke_request(?CURRENT_CLASS_NAME,Request,RequestOption).
+    dubbo_invoker:invoke_request(?CURRENT_CLASS_NAME,Request,RequestOption).
 
diff --git a/samples/dubboerl_demo/apps/dubboerl_demo/src/api_gateway_handle.erl b/samples/dubboerl_demo/apps/dubboerl_demo/src/api_gateway_handle.erl
index b89efb8..5430532 100644
--- a/samples/dubboerl_demo/apps/dubboerl_demo/src/api_gateway_handle.erl
+++ b/samples/dubboerl_demo/apps/dubboerl_demo/src/api_gateway_handle.erl
@@ -1,13 +1,20 @@
-%%%-------------------------------------------------------------------
-%%% @author dlive
-%%% @copyright (C) 2018, <COMPANY>
-%%% @doc
-%%%
-%%% @end
-%%% Created : 28. Feb 2018 10:57 PM
-%%%-------------------------------------------------------------------
+%%------------------------------------------------------------------------------
+%% Licensed to the Apache Software Foundation (ASF) under one or more
+%% contributor license agreements.  See the NOTICE file distributed with
+%% this work for additional information regarding copyright ownership.
+%% The ASF licenses this file to You under the Apache License, Version 2.0
+%% (the "License"); you may not use this file except in compliance with
+%% the License.  You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%------------------------------------------------------------------------------
 -module(api_gateway_handle).
--author("dlive").
 
 -include_lib("dubbo_sample_service/include/dubbo_sample_service.hrl").
 
diff --git a/src/dubbo_adapter.erl b/src/dubbo_adapter.erl
index 00d628f..cd07342 100644
--- a/src/dubbo_adapter.erl
+++ b/src/dubbo_adapter.erl
@@ -28,7 +28,7 @@ reference(Data) ->
         mid = dubbo_id_generator:gen_id(),
         data = Data,
         mversion = <<"0.0.0">>,
-        serialize_type = serialize_value(application:get_env(dubboerl, protocol, hessian))
+        serialize_type = serialize_value(application:get_env(dubboerl, serialization, hessian))
     }.
 
 serialize_value(json) ->
diff --git a/src/dubbo_client_default.erl b/src/dubbo_client_default.erl
index 220c30f..3fe08fa 100644
--- a/src/dubbo_client_default.erl
+++ b/src/dubbo_client_default.erl
@@ -28,7 +28,8 @@
     handle_info/2,
     terminate/2,
     code_change/3]).
--export([start_link/1]).
+
+-export([start_link/2]).
 
 -export([check_recv_data/2]).
 
@@ -38,7 +39,6 @@
 -record(state, {provider_config, socket = undefined,
     heartbeat = #heartbeat{},
     recv_buffer = <<>>,         %%从服务端接收的数据
-    host_flag,
     reconnection_timer,
     handler
 }).
@@ -53,10 +53,8 @@
 %%
 %% @end
 %%--------------------------------------------------------------------
--spec(start_link(Name :: binary(), ProviderConfig :: #provider_config{}) ->
-    {ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
-start_link(ProviderConfig) ->
-    gen_server:start_link(?MODULE, [ProviderConfig], []).
+start_link(ProviderConfig, Handle) ->
+    gen_server:start_link(?MODULE, [ProviderConfig, Handle], []).
 
 %%%===================================================================
 %%% gen_server callbacks
@@ -76,7 +74,7 @@ start_link(ProviderConfig) ->
 -spec(init(Args :: term()) ->
     {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
     {stop, Reason :: term()} | ignore).
-init([HostFlag, ProviderConfig]) ->
+init([ProviderConfig, Handle]) ->
     #provider_config{host = Host, port = Port} = ProviderConfig,
     State = case open(Host, Port) of
                 {ok, Socket} ->
@@ -86,9 +84,9 @@ init([HostFlag, ProviderConfig]) ->
             end,
     NowStamp = dubbo_time_util:timestamp_ms(),
     HeartBeatInfo = #heartbeat{last_read = NowStamp, last_write = NowStamp},
-    logger:info("netty client start ~p", [HostFlag]),
+    logger:info("netty client start ~p ~p", [Host, Port]),
     start_heartbeat_timer(HeartBeatInfo),
-    {ok, State#state{provider_config = ProviderConfig, heartbeat = HeartBeatInfo, host_flag = HostFlag}}.
+    {ok, State#state{provider_config = ProviderConfig, heartbeat = HeartBeatInfo, handler = Handle}}.
 
 %%--------------------------------------------------------------------
 %% @private
@@ -120,11 +118,11 @@ handle_call(_Request, _From, State) ->
     {noreply, NewState :: #state{}, timeout() | hibernate} |
     {stop, Reason :: term(), NewState :: #state{}}).
 
-handle_cast({send_request, Ref, Request, Data, SourcePid, RequestState}, State) ->
+handle_cast({send_request, Ref, Request, Data, SourcePid, Invocation}, State) ->
     logger:debug("[send_request begin] send data to provider consumer mid ~p pid ~p sourcePid ~p", [Request#dubbo_request.mid, self(), SourcePid]),
     NewState = case send_msg(Data, State) of
                    ok ->
-                       save_request_info(Request, SourcePid, Ref, RequestState),
+                       save_request_info(Request, SourcePid, Ref, Invocation),
                        logger:debug("[send_request end] send data to provider consumer pid ~p state ok", [self()]),
                        State;
                    {error, closed} ->
@@ -162,7 +160,7 @@ handle_info({tcp, _Port, Data}, #state{recv_buffer = RecvBuffer} = State) ->
 %%    logger:debug("[INFO] recv one data ~w",[Data]),
     {ok, NextBuffer, NewState} = case check_recv_data(<<RecvBuffer/binary, Data/binary>>, State) of
                                      {next_buffer, NextBuffer2, State3} ->
-                                         logger:debug("[INFO] recv one data state wait next_buffer"),
+                                         logger:debug("recv one data state wait next_buffer"),
                                          {ok, NextBuffer2, State3}
                                  end,
 %%    HeartbeatInfo =update_heartbeat(write,NewState#state.heartbeat),
@@ -232,6 +230,8 @@ code_change(_OldVsn, State, _Extra) ->
 %%% Internal functions
 %%%===================================================================
 
+open(Host, Port) when is_binary(Host) ->
+    open(binary_to_list(Host), Port);
 open(Host, Port) ->
     logger:debug("will connect to provider ~p ~p", [Host, Port]),
     %
@@ -356,76 +356,17 @@ check_recv_data(<<>>, State) ->
     {next_buffer, <<>>, State}.
 
 
-process_data(Data, State) ->
-    <<Header:16/binary, RestData/binary>> = Data,
-    case dubbo_codec:decode_header(Header) of
-        {ok, response, ResponseInfo} ->
-            process_response(ResponseInfo#dubbo_response.is_event, ResponseInfo, RestData, State),
-%%            dubbo_traffic_control:decr_count(State#state.host_flag),
-%%            case get_earse_request_info(ResponseInfo#dubbo_response.mid) of
-%%                undefined->
-%%                    logger:error("dubbo response can't find request data,response ~p",[ResponseInfo]);
-%%                {SourcePid,Ref,_RequestState} ->
-%%                    {ok,Res} = dubbo_codec:decode_response(ResponseInfo,RestData),
-%%
-%%                    logger:info("got one response mid ~p, is_event ~p state ~p",[Res#dubbo_response.mid,Res#dubbo_response.is_event,Res#dubbo_response.state]),
-%%                    case Res#dubbo_response.is_event of
-%%                        false ->
-%%                            %% todo rpccontent need merge response with request
-%%                            RpcContent=[],
-%%                            ResponseData = dubbo_type_transfer:response_to_native(Res),
-%%                            gen_server:cast(SourcePid,{response_process,Ref,RpcContent,ResponseData});
-%%                        _->
-%%                            ok
-%%                    end
-%%            end,
-            {ok, State};
-        {ok, request, RequestInfo} ->
-            {ok, Req} = dubbo_codec:decode_request(RequestInfo, RestData),
-            logger:info("get one request mid ~p, is_event ~p", [Req#dubbo_request.mid, Req#dubbo_request.is_event]),
-            {ok, State2} = process_request(Req#dubbo_request.is_event, Req, State),
-            {ok, State2};
-        {error, Type, RelData} ->
-            logger:error("process_data error type ~p RelData ~p", [Type, RelData]),
-            {ok, State}
-    end.
-
-
-%% @doc process event
--spec process_response(IsEvent :: boolean(), #dubbo_response{}, #state{}, term()) -> ok.
-process_response(false, ResponseInfo, RestData, State) ->
-    dubbo_traffic_control:decr_count(State#state.host_flag),
-    case get_earse_request_info(ResponseInfo#dubbo_response.mid) of
-        undefined ->
-            logger:error("dubbo response can't find request data,response ~p", [ResponseInfo]);
-        {SourcePid, Ref, _RequestState} ->
-            {ok, Res} = dubbo_codec:decode_response(ResponseInfo, RestData),
-            logger:info("got one response mid ~p, is_event ~p state ~p", [Res#dubbo_response.mid, Res#dubbo_response.is_event, Res#dubbo_response.state]),
-            case Res#dubbo_response.is_event of
-                false ->
-                    %% todo rpccontent need merge response with request
-                    RpcContent = [],
-                    ResponseData = dubbo_type_transfer:response_to_native(Res),
-                    gen_server:cast(SourcePid, {response_process, Ref, RpcContent, ResponseData});
-                _ ->
-                    ok
-            end
+process_data(Data, #state{handler = ProtocolHandle} = State) ->
+    case ProtocolHandle:data_receive(Data) of
+        ok ->
+            ok;
+        {do_heartbeat, Mid} ->
+            send_heartbeat_msg(Mid, false,State),
+            ok
     end,
-    {ok, State};
-process_response(true, _ResponseInfo, _RestData, State) ->
     {ok, State}.
 
-process_request(true, #dubbo_request{data = <<"R">>}, State) ->
-    {ok, _} = dubbo_provider_consumer_reg_table:update_connection_readonly(self(), true),
-    {ok, State};
-process_request(true, Request, State) ->
-    {ok, NewState} = send_heartbeat_msg(Request#dubbo_request.mid, false, State),
-    {ok, NewState};
-process_request(false, Request, State) ->
-    {ok, State}.
 
 
 save_request_info(Request, SourcePid, Ref, RequestState) ->
     put(Request#dubbo_request.mid, {SourcePid, Ref, RequestState}).
-get_earse_request_info(Mid) ->
-    erase(Mid).
\ No newline at end of file
diff --git a/src/dubbo_cluster.erl b/src/dubbo_cluster.erl
index da4031e..3139b47 100644
--- a/src/dubbo_cluster.erl
+++ b/src/dubbo_cluster.erl
@@ -15,7 +15,11 @@
 %% limitations under the License.
 %%------------------------------------------------------------------------------
 -module(dubbo_cluster).
--author("dlive").
 
-%% API
--export([]).
+
+
+select(Interface) ->
+    ok.
+
+get_loadbalance() ->
+    ok.
\ No newline at end of file
diff --git a/src/dubbo_cluster_failfast.erl b/src/dubbo_cluster_failfast.erl
index 6078517..48809f8 100644
--- a/src/dubbo_cluster_failfast.erl
+++ b/src/dubbo_cluster_failfast.erl
@@ -15,7 +15,43 @@
 %% limitations under the License.
 %%------------------------------------------------------------------------------
 -module(dubbo_cluster_failfast).
--author("dlive").
+-behaviour(dubbo_filter).
 
+-include("dubbo.hrl").
 %% API
--export([]).
+-export([invoke/2, do_response/2]).
+
+
+invoke(#dubbo_rpc_invocation{className = Interface, loadbalance = LoadBalance} = Invocation, Acc) ->
+    case dubbo_provider_consumer_reg_table:select_connection(Invocation#dubbo_rpc_invocation.className) of
+        {ok, List} ->
+            Connection = loadbalance_select(LoadBalance, List),
+            #connection_info{pid = Pid, host_flag = HostFlag} = Connection,
+            {ok, Invocation#dubbo_rpc_invocation{transport_pid = Pid}, Acc};
+%%            case dubbo_traffic_control:check_goon(HostFlag, 199) of
+%%                ok ->
+%%
+%%%%                    Request2 = merge_attachments(Request, RpcContext), %% @todo need add rpc context to attachment
+%%
+%%%%                    {ok, RequestData} = dubbo_codec:encode_request(Request2),
+%%%%                    Ref = get_ref(RequestState),
+%%%%                    gen_server:cast(Pid, {send_request, Ref, Request2, RequestData, CallBackPid, RequestState}),
+%%%%                    case is_sync(RequestState) of
+%%%%                        true ->
+%%%%                            sync_receive(Ref, get_timeout(RequestState));
+%%%%                        false -> {ok, Ref}
+%%%%                    end;
+%%                full ->
+%%                    {error, request_full}
+%%            end;
+        {error, none} ->
+            logger:error("[INVOKE] ~p error Reason no_provider", [Interface]),
+            {stop, no_provider}
+    end.
+
+loadbalance_select(LoadBalance, ConnectionList) ->
+    Connection = LoadBalance:select(ConnectionList),
+    Connection.
+
+do_response(Invocation, Result) ->
+    {ok, Invocation, Result}.
diff --git a/src/dubbo_common_fun.erl b/src/dubbo_common_fun.erl
index 5f38fbd..b68ad99 100644
--- a/src/dubbo_common_fun.erl
+++ b/src/dubbo_common_fun.erl
@@ -18,7 +18,9 @@
 
 -include("dubboerl.hrl").
 %% API
--export([local_ip_v4/0, local_ip_v4_str/0, parse_url/1, url_to_binary/1]).
+-export([local_ip_v4/0, local_ip_v4_str/0, parse_url/1, url_to_binary/1, parse_url_parameter/1, binary_list_join/2]).
+
+-define(URL_PATH_SEPARATOR,47).  %% 47 == <<"/">>
 
 local_ip_v4() ->
     {ok, Addrs} = inet:getifaddrs(),
@@ -32,12 +34,12 @@ local_ip_v4_str() ->
     list_to_binary(io_lib:format("~p.~p.~p.~p", [V1, V2, V3, V4])).
 
 
--spec(parse_url(Url :: binary()|list()) -> {ok, #dubbo_url{}}).
+-spec(parse_url(Url :: binary()|list()) -> {ok, #dubbo_url{}}|{error, any()}).
 parse_url(Url) when is_binary(Url) ->
     parse_url(binary_to_list(Url));
 parse_url(Url) ->
     case http_uri:parse(Url, []) of
-        {ok, {Scheme, _UserInfo, Host, Port, _Path, Query}} ->
+        {ok, {Scheme, UserInfo, Host, Port, Path, Query}} ->
             QueryStr = case lists:prefix("?", Query) of
                            true ->
                                [_ | Query2] = Query,
@@ -45,22 +47,34 @@ parse_url(Url) ->
                            false ->
                                Query
                        end,
-            QueryListTmp = string:tokens(QueryStr, "&"),
-            Parameters = parse_url_parameter(QueryListTmp, #{}),
-            Result = #dubbo_url{scheme = Scheme, host = Host, port = Port, parameters = Parameters},
+            Parameters = parse_url_parameter(QueryStr),
+            Result = #dubbo_url{
+                scheme = atom_to_binary(Scheme, utf8),
+                user_info = UserInfo,
+                host = list_to_binary(Host),
+                port = Port,
+                path = Path,
+                parameters = Parameters
+            },
             {ok, Result};
         {error, R1} ->
             {error, R1}
     end.
 
 
+parse_url_parameter(ParameterStr) when is_binary(ParameterStr) ->
+    parse_url_parameter(binary_to_list(ParameterStr));
+parse_url_parameter(ParameterStr) ->
+    QueryListTmp = string:tokens(ParameterStr, "&"),
+    parse_url_parameter(QueryListTmp, #{}).
+
 parse_url_parameter([], Parameters) ->
     Parameters;
 parse_url_parameter([Item | Rest], Parameters) ->
     case string:tokens(Item, "=") of
         KeyPair when length(KeyPair) == 2 ->
             [Key, Value] = KeyPair,
-            parse_url_parameter(Rest, maps:put(Key, Value, Parameters));
+            parse_url_parameter(Rest, maps:put(list_to_binary(Key), list_to_binary(Value), Parameters));
         KeyPair2 ->
             logger:error("parse parameter error, keypair ~p", [KeyPair2]),
             parse_url_parameter(Rest, Parameters)
@@ -68,23 +82,43 @@ parse_url_parameter([Item | Rest], Parameters) ->
 
 
 url_to_binary(UrlInfo) ->
-    ParameterStr =
-        case UrlInfo#dubbo_url.parameters of
-            undefined ->
-                "";
-            Parameter ->
-                KeyValues = maps:to_list(Parameter),
-                KeyValues2 = [io_lib:format("~s=~s", [Key, http_uri:encode(Value)]) || {Key, Value} <= KeyValues],
-                ParameterStr1 = string:join(KeyValues2, "&"),
-                ParameterStr2 = ["?" | ParameterStr1],
-                list_to_binary(ParameterStr2)
-        end,
-    Value = io_lib:format(<<"~s://~s:~p/~s?~s">>,
+    ParameterStr = format_parameter(UrlInfo#dubbo_url.parameters),
+    Value = lists:flatten(io_lib:format(<<"~s://~s:~p/~s?~s">>,
         [
             UrlInfo#dubbo_url.scheme,
             UrlInfo#dubbo_url.host,
             UrlInfo#dubbo_url.port,
-            UrlInfo#dubbo_url.path,
+            format_path(UrlInfo#dubbo_url.path),
             ParameterStr
-        ]),
-    list_to_binary(Value).
\ No newline at end of file
+        ])),
+    list_to_binary(Value).
+format_path(<< ?URL_PATH_SEPARATOR:8,Rest/binary>>) ->
+    logger:debug("format_path1 ~p",[Rest]),
+    Rest;
+format_path([?URL_PATH_SEPARATOR|Rest]) ->
+    logger:debug("format_path3 ~p",[Rest]),
+    Rest;
+format_path(Value) ->
+    logger:debug("format_path2 ~p",[Value]),
+    Value.
+
+format_parameter(undefined) ->
+    "";
+format_parameter(Parameter) when is_map(Parameter) ->
+    KeyValues = maps:to_list(Parameter),
+    format_parameter(KeyValues);
+format_parameter(Parameters) ->
+    KeyValues2 = [io_lib:format("~s=~s", [Key, Value]) || {Key, Value} <- Parameters],
+    ParameterStr1 = string:join(KeyValues2, "&"),
+    ParameterStr1.
+
+binary_list_join([], _Separator) ->
+    <<"">>;
+binary_list_join([H | T], Separator) ->
+    binary_list_join1(H, T, Separator).
+
+binary_list_join1(Header, [], _Separator) ->
+    Header;
+binary_list_join1(Header, [Item | Rest], Separator) ->
+    binary_list_join1(<<Header/binary, Separator/binary, Item/binary>>, Rest, Separator).
+
diff --git a/src/dubbo_config_util.erl b/src/dubbo_config_util.erl
index 4db73d0..0ef0760 100644
--- a/src/dubbo_config_util.erl
+++ b/src/dubbo_config_util.erl
@@ -18,7 +18,7 @@
 
 -include("dubbo.hrl").
 %% API
--export([gen_consumer/3, gen_provider/5]).
+-export([gen_consumer/3, gen_provider/6]).
 
 
 gen_consumer(Application, Interface, Option) ->
@@ -34,7 +34,7 @@ gen_consumer(Application, Interface, Option) ->
         side = <<"consumers">>
     }.
 
-gen_provider(Application, Port, Interface, MethodList, Option) ->
+gen_provider(Application, Port, Interface, MethodList, ImplModuleName, _Option) ->
     Host = dubbo_network_tools:local_ipv4_binary(),
     MethodList2 = [atom_to_binary(Item, utf8) || Item <- MethodList],
     #provider_config{
@@ -46,5 +46,6 @@ gen_provider(Application, Port, Interface, MethodList, Option) ->
         executes = 10,
         application = Application,
         methods = MethodList2,
-        side = <<"provider">>
+        side = <<"provider">>,
+        impl_handle = ImplModuleName
     }.
\ No newline at end of file
diff --git a/src/dubbo_directory.erl b/src/dubbo_directory.erl
index c667f29..4945237 100644
--- a/src/dubbo_directory.erl
+++ b/src/dubbo_directory.erl
@@ -20,7 +20,7 @@
 -include("dubboerl.hrl").
 -include("dubbo.hrl").
 
--export([subscribe/2,notify/2]).
+-export([subscribe/2, notify/2]).
 %% API
 -export([start_link/0]).
 
@@ -72,59 +72,71 @@ start_link() ->
 init([]) ->
     {ok, #state{}}.
 
-subscribe(RegistryName,SubcribeUrl)->
-    try gen_server:call(?SERVER,{subscribe,RegistryName,SubcribeUrl},5000) of
-        ok->
-            ok
-    catch
-        Error:Reason->
-            %% todo improve erro type
-            {error,Reason}
-    end.
-
-notify(Interface,UrlList)->
-    %% @todo if UrlList size is 1, and protocol is empty ,need destroyAllInvokers
+subscribe(RegistryName, SubcribeUrl) ->
+    RegistryName:subscribe(SubcribeUrl, fun dubbo_directory:notify/2),
+    ok.
 
+notify(Interface, []) ->
+    logger:info("[DUBBO] directory get notify, interface provider list is empty"),
+    ok;
+notify(Interface, UrlList) ->
     refresh_invoker(UrlList),
-%%    dubbo_consumer_pool:start_consumer(Interface, UrlList),
     ok.
 
 
-refresh_invoker(UrlList)->
+refresh_invoker(UrlList) ->
     case pick_interface(UrlList) of
-        {error,Reason}->
+        {error, Reason} ->
             fail;
-        {"empty",Interface,_}->
+        {<<"empty">>, Interface,_} ->
+            OldProviderHosts = dubbo_provider_consumer_reg_table:get_interface_provider_node(Interface),
+            dubbo_provider_consumer_reg_table:clean_invalid_provider(OldProviderHosts),
             todo_destroy;
-        {_,Interface,LoadBalance} ->
+        {Schame, Interface, LoadBalance} ->
+            ProtocolModule = binary_to_existing_atom(<<<<"dubbo_protocol_">>/binary, Schame/binary>>, latin1),
+
+            logger:info("[DUBBO] refresh invoker for interface ~p loadbalance ~p protocol ~p", [Interface, LoadBalance, ProtocolModule]),
             OldProviderHosts = dubbo_provider_consumer_reg_table:get_interface_provider_node(Interface),
-            NewInvokers = refresh_invoker(UrlList,[]),
+            NewInvokers = refresh_invoker(UrlList, []),
             NewProviderHosts = [Item#dubbo_invoker.host_flag || Item <- NewInvokers],
             DeleteProverList = OldProviderHosts -- NewProviderHosts,
             dubbo_provider_consumer_reg_table:clean_invalid_provider(DeleteProverList),
-            dubbo_provider_consumer_reg_table:update_connection_info(#interface_info{interface = Interface,loadbalance = LoadBalance})
+
+            lists:map(
+                fun(NewHosts) ->
+                    NewHostConnections = dubbo_provider_consumer_reg_table:query_node_connections(NewHosts),
+                    dubbo_provider_consumer_reg_table:update_consumer_connections(Interface, NewHostConnections)
+                end, NewProviderHosts),
+
+
+%%            dubbo_provider_consumer_reg_table:update_connection_info(#interface_info{interface = Interface,loadbalance = LoadBalance})
+            dubbo_provider_consumer_reg_table:update_interface_info(#interface_info{interface = Interface, loadbalance = LoadBalance, protocol = ProtocolModule})
     end.
 %%    OldProviderHosts =
 
-
-refresh_invoker([Url|Rest],Acc)->
-    case dubbo_extension:run_fold(protocol,refer,[Url],undefined) of
+refresh_invoker([], Acc) ->
+    Acc;
+refresh_invoker([Url | Rest], Acc) ->
+    logger:info("refresh invoker ~s", [Url]),
+    case dubbo_extension:run_fold(protocol, refer, [Url], undefined) of
         undefined ->
-            refresh_invoker(Rest,Acc);
-        {ok,Invoker} ->
-            refresh_invoker(Rest,[Invoker|Acc]);
-        {stop,_}->
-            refresh_invoker(Rest,Acc)
+            refresh_invoker(Rest, Acc);
+        {ok, Invoker} ->
+            refresh_invoker(Rest, [Invoker | Acc]);
+        {stop, _} ->
+            refresh_invoker(Rest, Acc)
     end.
 
 pick_interface([Url | _]) ->
     case dubbo_common_fun:parse_url(Url) of
-        {ok,UrlInfo}->
-            Interface = maps:get("interface",UrlInfo#dubbo_url.parameters),
-            LoadBalance = list_to_atom("dubbo_loadbalance_" ++ maps:get("loadbalance",UrlInfo#dubbo_url.parameters,"random")),
-            {UrlInfo#dubbo_url.scheme,Interface,LoadBalance};
-        {error,Reason} ->
-            {error,Reason}
+        {ok, UrlInfo} ->
+            logger:debug("pick interface info from ~p", [Url]),
+            Interface = maps:get(<<"interface">>, UrlInfo#dubbo_url.parameters),
+            LoadBalanceName = maps:get(<<"loadbalance">>, UrlInfo#dubbo_url.parameters, <<"random">>),
+            LoadBalance = binary_to_existing_atom(<<<<"dubbo_loadbalance_">>/binary, LoadBalanceName/binary>>, latin1),
+            {UrlInfo#dubbo_url.scheme, Interface, LoadBalance};
+        {error, Reason} ->
+            {error, Reason}
     end.
 
 
@@ -143,9 +155,9 @@ pick_interface([Url | _]) ->
     {noreply, NewState :: #state{}, timeout() | hibernate} |
     {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
     {stop, Reason :: term(), NewState :: #state{}}).
-handle_call({subscribe,RegistryName,SubcribeUrl}, _From, State) ->
-    NotifyFun= fun dubbo_directory:notify/1,
-    apply(RegistryName,subscribe,[SubcribeUrl,NotifyFun]),
+handle_call({subscribe, RegistryName, SubcribeUrl}, _From, State) ->
+    NotifyFun = fun dubbo_directory:notify/1,
+    apply(RegistryName, subscribe, [SubcribeUrl, NotifyFun]),
     {reply, ok, State};
 handle_call(_Request, _From, State) ->
     {reply, ok, State}.
diff --git a/src/dubbo_extension.erl b/src/dubbo_extension.erl
index 4e5eb0c..ee4dae2 100644
--- a/src/dubbo_extension.erl
+++ b/src/dubbo_extension.erl
@@ -18,7 +18,7 @@
 -behaviour(gen_server).
 
 %% API
--export([run/3,run_fold/4,register/3,unregister/3]).
+-export([run/3, run_fold/4, run_fold/5, register/3, unregister/3, invoke/5, invoke_foldr/4]).
 
 
 -export([start_link/0]).
@@ -31,70 +31,104 @@
 -record(state, {}).
 
 
--spec reg(HookName::hookname(), Module::atom(),Priority::integer()) -> ok | {error, term()}.
-register(HookName, Module,Priority) ->
-    gen_server:call(?MODULE, {register, HookName, {Priority, {Module}}}).
+-spec register(HookName :: atom(), Module :: atom(), Priority :: integer()) -> ok | {error, term()}.
+register(HookName, Module, Priority) ->
+    gen_server:call(?MODULE, {register, HookName, {Priority, Module}}).
 
--spec unregister(HookName::hookname(), Module::atom(),Priority::integer()) -> ok.
-unregister(HookName, Module,Priority) ->
+-spec unregister(HookName :: atom(), Module :: atom(), Priority :: integer()) -> ok.
+unregister(HookName, Module, Priority) ->
     gen_server:call(?MODULE, {unregister, HookName, {Priority, Module}}).
 
-%% @doc run all hooks registered for the HookName.
-%% Execution can be interrupted if an hook return the atom `stop'.
--spec run(HookName::hookname(), Args::list()) -> ok.
-run(HookName,Fun, Args) ->
+-spec run(HookName :: atom(), Fun :: atom(), Args :: list()) -> ok.
+run(HookName, Fun, Args) ->
     case find_hooks(HookName) of
         no_hook -> ok;
-        Hooks -> run1(Hooks, HookName,Fun, Args)
+        Hooks ->
+            run1(Hooks, HookName, Fun, Args)
     end.
 
-run1([], _HookName,_Fun, _Args) ->
+run1([], _HookName, _Fun, _Args) ->
     ok;
 run1([M | Rest], HookName, Fun, Args) ->
     Ret = (catch apply(M, Fun, Args)),
     case Ret of
         {'EXIT', Reason} ->
+            io:format(user, "~p~n error running extension: ~p~n", [HookName, Reason]),
             logger:error("~p~n error running extension: ~p~n", [HookName, Reason]),
-            run1(Rest, HookName,Fun, Args);
+            run1(Rest, HookName, Fun, Args);
         stop ->
             ok;
         _ ->
-            run1(Rest, HookName,Fun, Args)
+            run1(Rest, HookName, Fun, Args)
     end.
 
--spec run_fold(HookName::hookname(), Args::list(), Acc::any()) -> Acc2::any().
+-spec run_fold(HookName :: atom(), Fun :: atom(), Args :: list(), Acc :: any()) -> Acc2 :: any().
 run_fold(HookName, Fun, Args, Acc) ->
     case find_hooks(HookName) of
         no_hook -> Acc;
-        Hooks -> run_fold1(Hooks,HookName, Fun, Args, Acc)
+        Hooks -> run_fold1(Hooks, HookName, Fun, Args, Acc)
     end.
 
+run_fold(HookName, Fun, Args, Acc, AppendExtension) ->
+    case find_hooks(HookName) of
+        no_hook -> Acc;
+        Hooks ->
+            run_fold1(Hooks ++ AppendExtension, HookName, Fun, Args, Acc)
+    end.
 
-run_fold1([], _HookName,_Fun, _Args,  Acc) ->
+run_fold1([], _HookName, _Fun, _Args, Acc) ->
     Acc;
-run_fold1([M | Rest], HookName,Fun, Args0,  Acc) ->
+run_fold1([M | Rest], HookName, Fun, Args0, Acc) ->
     Args = Args0 ++ [Acc],
     Ret = (catch apply(M, Fun, Args)),
     case Ret of
         {'EXIT', Reason} ->
-            error_logger:error_msg("~p~n error running hook: ~p~n",
-                [HookName, Reason]),
-            run_fold1(Rest, HookName,Fun,Args0, Acc);
+            logger:error("~p~n error running hook: ~p~n", [HookName, Reason]),
+            run_fold1(Rest, HookName, Fun, Args0, Acc);
         stop ->
             Acc;
         {stop, NewAcc} ->
             NewAcc;
         _ ->
-            run_fold1(Rest, HookName,Fun,Args0, Ret)
+            run_fold1(Rest, HookName, Fun, Args0, Ret)
     end.
 
+invoke_foldr(HookName, Fun, Args, Acc) ->
+    case find_hooks(HookName) of
+        no_hook -> Acc;
+        Hooks ->
+            do_invoke(lists:reverse(Hooks), HookName, Fun, Args, Acc)
+    end.
 
+invoke(HookName, Fun, Args, Acc, AppendExtension) ->
+    case find_hooks(HookName) of
+        no_hook -> Acc;
+        Hooks ->
+            do_invoke(Hooks ++ AppendExtension, HookName, Fun, Args, Acc)
+    end.
+
+do_invoke([], _HookName, _Fun, _Args, Acc) ->
+    Acc;
+do_invoke([M | Rest], HookName, Fun, Args0, Acc) ->
+    Args = Args0 ++ [Acc],
+    Ret = (catch apply(M, Fun, Args)),
+    case Ret of
+        {'EXIT', Reason} ->
+            logger:error("~p~n error running hook: ~p~n", [HookName, Reason]),
+            do_invoke(Rest, HookName, Fun, Args0, Acc);
+        stop ->
+            Acc;
+        {stop, NewAcc} ->
+            NewAcc;
+        {ok, Args2, NewAcc2} ->
+            do_invoke(Rest, HookName, Fun, [Args2], NewAcc2)
+    end.
 
 
 %% @doc retrieve the lists of registered functions for an hook.
--spec find(HookName::hookname()) -> {ok, [{atom(), atom()}]} | error.
+-spec find(HookName :: atom()) -> {ok, [{atom(), atom()}]} | error.
 find(HookName) ->
-    case ?find_hook(HookName) of
+    case find_hooks(HookName) of
         no_hook -> error;
         Hooks -> {ok, Hooks}
     end.
@@ -144,7 +178,7 @@ code_change(_OldVsn, State, _Extra) ->
 terminate(_Reason, _Srv) ->
     ok.
 
-do_register(HookName, {_Priority, ModuleName}=Hook) ->
+do_register(HookName, {_Priority, ModuleName} = Hook) ->
     check_module(ModuleName),
     update_hooks(HookName, [Hook]).
 
@@ -180,10 +214,11 @@ check_module(ModuleName) ->
     _ = code:ensure_loaded(ModuleName),
     ok.
 
-find_hooks(HookName)->
-    case ets:lookup(?TAB,HookName) of
-        []->
+find_hooks(HookName) ->
+    case ets:lookup(?TAB, HookName) of
+        [] ->
             no_hook;
-        [{_, Modules}]->
-            Modules
+        [{_, Modules}] ->
+            Modules1 = [Module || {_, Module} <- Modules],
+            Modules1
     end.
diff --git a/src/dubbo_filter.erl b/src/dubbo_filter.erl
index 7fa7950..c0c1a4f 100644
--- a/src/dubbo_filter.erl
+++ b/src/dubbo_filter.erl
@@ -15,7 +15,14 @@
 %% limitations under the License.
 %%------------------------------------------------------------------------------
 -module(dubbo_filter).
--author("dlive").
+-include("dubbo.hrl").
 
+-type filter_result() :: {stop, term()}|{error, term()}|term().
+
+-callback(invoke(Invocation :: #dubbo_rpc_invocation{}, Acc :: invoke_result()) -> filter_result()).
+
+-callback(on_response(Invocation :: invocation(), Result :: invoke_result()) -> filter_result()).
 %% API
 -export([]).
+
+
diff --git a/src/dubbo_invoker.erl b/src/dubbo_invoker.erl
index f01cec4..9981f2d 100644
--- a/src/dubbo_invoker.erl
+++ b/src/dubbo_invoker.erl
@@ -21,11 +21,8 @@
 -export([]).
 
 
--callback(invoke(Invoker,Invocation) -> ok).
-
-
 %% API
--export([invoke_request/2, invoke_request/3, invoke_request/5]).
+-export([invoke_request/2, invoke_request/3, invoke_request/5, invoke_response/2]).
 
 -spec invoke_request(Interface :: binary(), Request :: #dubbo_request{}) ->
     {ok, reference()}|
@@ -39,7 +36,8 @@ invoke_request(Interface, Request) ->
     {ok, reference(), Data :: any(), RpcContent :: list()}|
     {error, Reason :: timeout|no_provider|any()}.
 invoke_request(Interface, Request, RequestOption) ->
-    invoke_request(Interface, Request, maps:get(ctx, RequestOption, []), RequestOption, self()).
+    invoke_request(Interface, Request, RequestOption, self()).
+%%    invoke_request(Interface, Request, maps:get(ctx, RequestOption, []), RequestOption, self()).
 
 
 -spec invoke_request(Interface :: binary(), Request :: #dubbo_request{}, RpcContext :: list(), RequestState :: map(), CallBackPid :: pid()) ->
@@ -68,6 +66,27 @@ invoke_request(Interface, Request, RpcContext, RequestState, CallBackPid) ->
             {error, no_provider}
     end.
 
+invoke_request(Interface, Request, RequestOption, CallBackPid) ->
+    case dubbo_provider_consumer_reg_table:get_interface_info(Interface) of
+        undefined ->
+            {error, no_provider};
+        #interface_info{protocol = Protocol, loadbalance = LoadBalance} ->
+            ReferenceConfig = #reference_config{sync = is_sync(RequestOption)},
+            Ref = get_ref(RequestOption),
+            Invocation = Request#dubbo_request.data#dubbo_rpc_invocation{
+                loadbalance = LoadBalance,
+                call_ref = Ref,
+                reference_ops = ReferenceConfig,
+                source_pid = CallBackPid
+            },
+            Result = dubbo_extension:invoke(filter, invoke, [Invocation], {ok, Ref}, [Protocol]),
+            Result
+    end.
+
+invoke_response(Invocation, Result) ->
+    Result2 = dubbo_extension:invoke_foldr(filter, do_response, [Invocation], Result),
+    gen_server:cast(Invocation#dubbo_rpc_invocation.source_pid, {response_process, Invocation#dubbo_rpc_invocation.call_ref, Invocation#dubbo_rpc_invocation.attachments, Result2}),
+    ok.
 
 is_sync(Option) ->
     maps:is_key(sync, Option).
diff --git a/src/dubbo_invoker_cluster.erl b/src/dubbo_invoker_cluster.erl
deleted file mode 100644
index 906a0fa..0000000
--- a/src/dubbo_invoker_cluster.erl
+++ /dev/null
@@ -1,21 +0,0 @@
-%%------------------------------------------------------------------------------
-%% Licensed to the Apache Software Foundation (ASF) under one or more
-%% contributor license agreements.  See the NOTICE file distributed with
-%% this work for additional information regarding copyright ownership.
-%% The ASF licenses this file to You under the Apache License, Version 2.0
-%% (the "License"); you may not use this file except in compliance with
-%% the License.  You may obtain a copy of the License at
-%%
-%%     http://www.apache.org/licenses/LICENSE-2.0
-%%
-%% Unless required by applicable law or agreed to in writing, software
-%% distributed under the License is distributed on an "AS IS" BASIS,
-%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-%% See the License for the specific language governing permissions and
-%% limitations under the License.
-%%------------------------------------------------------------------------------
--module(dubbo_invoker_cluster).
--author("dlive").
-
-%% API
--export([]).
diff --git a/src/dubbo_invoker_old.erl b/src/dubbo_invoker_old.erl
deleted file mode 100644
index 354cef9..0000000
--- a/src/dubbo_invoker_old.erl
+++ /dev/null
@@ -1,97 +0,0 @@
-%%------------------------------------------------------------------------------
-%% Licensed to the Apache Software Foundation (ASF) under one or more
-%% contributor license agreements.  See the NOTICE file distributed with
-%% this work for additional information regarding copyright ownership.
-%% The ASF licenses this file to You under the Apache License, Version 2.0
-%% (the "License"); you may not use this file except in compliance with
-%% the License.  You may obtain a copy of the License at
-%%
-%%     http://www.apache.org/licenses/LICENSE-2.0
-%%
-%% Unless required by applicable law or agreed to in writing, software
-%% distributed under the License is distributed on an "AS IS" BASIS,
-%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-%% See the License for the specific language governing permissions and
-%% limitations under the License.
-%%------------------------------------------------------------------------------
--module(dubbo_invoker_old).
-
--include("dubbo.hrl").
-%% API
--export([invoke_request/2, invoke_request/3, invoke_request/5]).
-
--spec invoke_request(Interface :: binary(), Request :: #dubbo_request{}) ->
-    {ok, reference()}|
-    {ok, reference(), Data :: any(), RpcContent :: list()}|
-    {error, Reason :: timeout|no_provider|any()}.
-invoke_request(Interface, Request) ->
-    invoke_request(Interface, Request, [], #{}, self()).
-
--spec invoke_request(Interface :: binary(), Request :: #dubbo_request{}, RequestOption :: map()) ->
-    {ok, reference()}|
-    {ok, reference(), Data :: any(), RpcContent :: list()}|
-    {error, Reason :: timeout|no_provider|any()}.
-invoke_request(Interface, Request, RequestOption) ->
-    invoke_request(Interface, Request, maps:get(ctx, RequestOption, []), RequestOption, self()).
-
-
--spec invoke_request(Interface :: binary(), Request :: #dubbo_request{}, RpcContext :: list(), RequestState :: map(), CallBackPid :: pid()) ->
-    {ok, reference()}|
-    {ok, reference(), Data :: any(), RpcContent :: list()}|
-    {error, Reason :: timeout|no_provider|request_full|any()}.
-invoke_request(Interface, Request, RpcContext, RequestState, CallBackPid) ->
-    case dubbo_provider_consumer_reg_table:select_connection(Interface, Request#dubbo_request.mid) of
-        {ok, #connection_info{pid = Pid, host_flag = HostFlag}} ->
-            case dubbo_traffic_control:check_goon(HostFlag, 199) of
-                ok ->
-                    Request2 = merge_attachments(Request, RpcContext),
-                    {ok, RequestData} = dubbo_codec:encode_request(Request2),
-                    Ref = get_ref(RequestState),
-                    gen_server:cast(Pid, {send_request, Ref, Request2, RequestData, CallBackPid, RequestState}),
-                    case is_sync(RequestState) of
-                        true ->
-                            sync_receive(Ref, get_timeout(RequestState));
-                        false -> {ok, Ref}
-                    end;
-                full ->
-                    {error, request_full}
-            end;
-        {error, none} ->
-            logger:error("[INVOKE] ~p error Reason no_provider", [Interface]),
-            {error, no_provider}
-    end.
-
-
-is_sync(Option) ->
-    maps:is_key(sync, Option).
-get_ref(Option) ->
-    maps:get(ref, Option, make_ref()).
-
-get_timeout(Option) ->
-    maps:get(timeout, Option, ?REQUEST_TIME_OUT).
-
-
-sync_receive(Ref, TimeOut) ->
-    receive
-        {'$gen_cast', {response_process, Ref, RpcContent, Response}} ->
-            {ok, Ref, Response, RpcContent}
-    after
-        TimeOut ->
-            {error, timeout}
-    end.
-merge_attachments(#dubbo_request{data = null} = Request, _Option) ->
-    Request;
-merge_attachments(Request, Option) ->
-    Attachements = Request#dubbo_request.data#dubbo_rpc_invocation.attachments,
-    case lists:keyfind(attachments, 1, Option) of
-        false -> OptionAttachments = [];
-        {attachments, OptionAttachments} ->
-            OptionAttachments
-    end,
-    List = [
-        {<<"version">>, <<"0.0.0">>},
-        {<<"timeout">>, <<"5000">>}
-    ],
-    Attachements2 = lists:merge3(Attachements, OptionAttachments, List),
-    Data2 = Request#dubbo_request.data#dubbo_rpc_invocation{attachments = Attachements2},
-    Request#dubbo_request{data = Data2}.
diff --git a/src/dubbo_loadbalance_random.erl b/src/dubbo_loadbalance_random.erl
index 21d4f61..ebd9dd1 100644
--- a/src/dubbo_loadbalance_random.erl
+++ b/src/dubbo_loadbalance_random.erl
@@ -17,4 +17,10 @@
 -module(dubbo_loadbalance_random).
 
 %% API
--export([]).
+-export([select/1]).
+
+select(List) ->
+    RandNum = rand:uniform(65535),
+    Len = length(List),
+    RemNum = (RandNum rem Len) + 1,
+    lists:nth(RemNum, List).
diff --git a/src/dubbo_netty_client.erl b/src/dubbo_netty_client.erl
deleted file mode 100644
index 06f9d1e..0000000
--- a/src/dubbo_netty_client.erl
+++ /dev/null
@@ -1,430 +0,0 @@
-%%------------------------------------------------------------------------------
-%% Licensed to the Apache Software Foundation (ASF) under one or more
-%% contributor license agreements.  See the NOTICE file distributed with
-%% this work for additional information regarding copyright ownership.
-%% The ASF licenses this file to You under the Apache License, Version 2.0
-%% (the "License"); you may not use this file except in compliance with
-%% the License.  You may obtain a copy of the License at
-%%
-%%     http://www.apache.org/licenses/LICENSE-2.0
-%%
-%% Unless required by applicable law or agreed to in writing, software
-%% distributed under the License is distributed on an "AS IS" BASIS,
-%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-%% See the License for the specific language governing permissions and
-%% limitations under the License.
-%%------------------------------------------------------------------------------
--module(dubbo_netty_client).
-
--behaviour(gen_server).
-
--include("dubbo.hrl").
-%% API
--export([start_link/4]).
-
-%% gen_server callbacks
--export([init/1,
-    handle_call/3,
-    handle_cast/2,
-    handle_info/2,
-    terminate/2,
-    code_change/3]).
--export([check_recv_data/2]).
-
--define(SERVER, ?MODULE).
-
--record(heartbeat, {last_write = 0, last_read = 0, timeout = 60000, max_timeout = 180000}).
--record(state, {provider_config, socket = undefined,
-    heartbeat = #heartbeat{},
-    recv_buffer = <<>>,         %%从服务端接收的数据
-    host_flag,
-    reconnection_timer
-}).
-
-%%%===================================================================
-%%% API
-%%%===================================================================
-
-%%--------------------------------------------------------------------
-%% @doc
-%% Starts the server
-%%
-%% @end
-%%--------------------------------------------------------------------
--spec(start_link(Name :: binary(), HostFlag :: binary(), ProviderConfig :: #provider_config{}, integer()) ->
-    {ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
-start_link(Name, HostFlag, ProviderConfig, Index) ->
-    gen_server:start_link({local, Name}, ?MODULE, [HostFlag, ProviderConfig, Index], []).
-
-%%%===================================================================
-%%% gen_server callbacks
-%%%===================================================================
-
-%%--------------------------------------------------------------------
-%% @private
-%% @doc
-%% Initializes the server
-%%
-%% @spec init(Args) -> {ok, State} |
-%%                     {ok, State, Timeout} |
-%%                     ignore |
-%%                     {stop, Reason}
-%% @end
-%%--------------------------------------------------------------------
--spec(init(Args :: term()) ->
-    {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
-    {stop, Reason :: term()} | ignore).
-init([HostFlag, ProviderConfig, Index]) ->
-    erlang:process_flag(min_bin_vheap_size, 1024 * 1024),
-    #provider_config{host = Host, port = Port} = ProviderConfig,
-    State = case open(Host, Port) of
-                {ok, Socket} ->
-                    #state{socket = Socket};
-                {error, _Reason} ->
-                    #state{}
-            end,
-    NowStamp = dubbo_time_util:timestamp_ms(),
-    HeartBeatInfo = #heartbeat{last_read = NowStamp, last_write = NowStamp},
-    logger:info("netty client start ~p", [HostFlag]),
-    start_heartbeat_timer(HeartBeatInfo),
-    {ok, State#state{provider_config = ProviderConfig, heartbeat = HeartBeatInfo, host_flag = HostFlag}}.
-
-%%--------------------------------------------------------------------
-%% @private
-%% @doc
-%% Handling call messages
-%%
-%% @end
-%%--------------------------------------------------------------------
--spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()},
-    State :: #state{}) ->
-    {reply, Reply :: term(), NewState :: #state{}} |
-    {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} |
-    {noreply, NewState :: #state{}} |
-    {noreply, NewState :: #state{}, timeout() | hibernate} |
-    {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
-    {stop, Reason :: term(), NewState :: #state{}}).
-handle_call(_Request, _From, State) ->
-    {reply, ok, State}.
-
-%%--------------------------------------------------------------------
-%% @private
-%% @doc
-%% Handling cast messages
-%%
-%% @end
-%%--------------------------------------------------------------------
--spec(handle_cast(Request :: term(), State :: #state{}) ->
-    {noreply, NewState :: #state{}} |
-    {noreply, NewState :: #state{}, timeout() | hibernate} |
-    {stop, Reason :: term(), NewState :: #state{}}).
-
-handle_cast({send_request, Ref, Request, Data, SourcePid, RequestState}, State) ->
-    logger:debug("[send_request begin] send data to provider consumer mid ~p pid ~p sourcePid ~p", [Request#dubbo_request.mid, self(), SourcePid]),
-    NewState = case send_msg(Data, State) of
-                   ok ->
-                       save_request_info(Request, SourcePid, Ref, RequestState),
-                       logger:debug("[send_request end] send data to provider consumer pid ~p state ok", [self()]),
-                       State;
-                   {error, closed} ->
-                       logger:warning("send request error, connection is closed"),
-                       State2 = reconnect(State),
-                       State2;
-                   {error, R1} ->
-                       logger:error("[send_request end] send data to provider consumer pid error ~p ~p", [self(), R1]),
-                       State
-               end,
-    HeartbeatInfo = update_heartbeat(write, NewState#state.heartbeat),
-    {noreply, NewState#state{heartbeat = HeartbeatInfo}};
-
-handle_cast(_Request, State) ->
-    {noreply, State}.
-
-%%--------------------------------------------------------------------
-%% @private
-%% @doc
-%% Handling all non call/cast messages
-%%
-%% @spec handle_info(Info, State) -> {noreply, State} |
-%%                                   {noreply, State, Timeout} |
-%%                                   {stop, Reason, State}
-%% @end
-%%--------------------------------------------------------------------
--spec(handle_info(Info :: timeout() | term(), State :: #state{}) ->
-    {noreply, NewState :: #state{}} |
-    {noreply, NewState :: #state{}, timeout() | hibernate} |
-    {stop, Reason :: term(), NewState :: #state{}}).
-
-
-handle_info({tcp, _Port, Data}, #state{recv_buffer = RecvBuffer} = State) ->
-%%    inet:setopts(State#state.socket, [{active, once}]),
-%%    logger:debug("[INFO] recv one data ~w",[Data]),
-    {ok, NextBuffer, NewState} = case check_recv_data(<<RecvBuffer/binary, Data/binary>>, State) of
-                                     {next_buffer, NextBuffer2, State3} ->
-                                         logger:debug("[INFO] recv one data state wait next_buffer"),
-                                         {ok, NextBuffer2, State3}
-                                 end,
-%%    HeartbeatInfo =update_heartbeat(write,NewState#state.heartbeat),
-    {noreply, NewState#state{recv_buffer = NextBuffer}};
-handle_info({tcp_closed, Port}, State) ->
-    logger:info("dubbo connection closed ~p", [Port]),
-    NewState = reconnect(State),
-    {noreply, NewState};
-handle_info({timeout, _TimerRef, {reconnect}}, State) ->
-    NewState = reconnect(State#state{reconnection_timer = undefined}),
-    {noreply, NewState};
-handle_info({timeout, _TimerRef, {heartbeat_timer}}, State) ->
-    {ok, NewState} = case check_heartbeat_state(State) of
-                         {normal} -> {ok, State};
-                         {send_heart} ->
-                             send_heartbeat_msg(undefined, true, State);
-                         {reconnect} ->
-                             %% @todo reconnect
-                             {ok, State}
-                     end,
-    HeartbeatInfo = update_heartbeat(write, NewState#state.heartbeat),
-    start_heartbeat_timer(HeartbeatInfo),
-    {noreply, NewState#state{heartbeat = HeartbeatInfo}};
-handle_info(_Info, State) ->
-    logger:warning("[INFO] get one info:~p", [_Info]),
-%%    inet:setopts(State#state.socket, [{active, once}]),
-%%    case State#state.tmp_pid of
-%%        undefined  ->ok;
-%%        Pid ->
-%%            gen_server:cast(Pid,{msg_back})
-%%    end,
-    HeartbeatInfo = update_heartbeat(write, State#state.heartbeat),
-    {noreply, State#state{heartbeat = HeartbeatInfo}}.
-
-%%--------------------------------------------------------------------
-%% @private
-%% @doc
-%% This function is called by a gen_server when it is about to
-%% terminate. It should be the opposite of Module:init/1 and do any
-%% necessary cleaning up. When it returns, the gen_server terminates
-%% with Reason. The return value is ignored.
-%%
-%% @spec terminate(Reason, State) -> void()
-%% @end
-%%--------------------------------------------------------------------
--spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
-    State :: #state{}) -> term()).
-terminate(_Reason, _State) ->
-    logger:warning("terminate reason:~p", [_Reason]),
-    ok.
-
-%%--------------------------------------------------------------------
-%% @private
-%% @doc
-%% Convert process state when code is changed
-%%
-%% @spec code_change(OldVsn, State, Extra) -> {ok, NewState}
-%% @end
-%%--------------------------------------------------------------------
--spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{},
-    Extra :: term()) ->
-    {ok, NewState :: #state{}} | {error, Reason :: term()}).
-code_change(_OldVsn, State, _Extra) ->
-    {ok, State}.
-
-%%%===================================================================
-%%% Internal functions
-%%%===================================================================
-
-open(Host, Port) ->
-    logger:debug("will connect to provider ~p ~p", [Host, Port]),
-    %
-    case gen_tcp:connect(Host, Port, [
-        binary,
-        {packet, 0}, {active, false},
-        {reuseaddr, true},
-        {delay_send, true},
-        {nodelay, true},
-        {high_watermark, 512 * 1024},
-        {low_watermark, 256 * 1024},
-        {sndbuf, 512 * 1024},
-        {recbuf, 512 * 1024}
-    ]) of
-        {ok, Sockets} ->
-            inet:setopts(Sockets, [{active, true}]),
-            {ok, Sockets};
-        Info ->
-            logger:error("start client connection error ~p", [Info]),
-            {error, Info}
-    end.
-
-reconnect(#state{reconnection_timer = Timer} = State) when Timer /= undefined ->
-    State;
-reconnect(State) ->
-    #provider_config{host = Host, port = Port} = State#state.provider_config,
-    case State#state.socket of
-        undefined -> ok;
-        Socket ->
-            gen_tcp:close(Socket)
-    end,
-    case open(Host, Port) of
-        {ok, Socket2} ->
-            logger:warning("reconnect to provider ~p ~p success", [Host, Port]),
-            State#state{socket = Socket2, recv_buffer = <<>>};
-        {error, Reason} ->
-            logger:warning("connect to provider error ~p", [Reason]),
-            TimerRef = erlang:start_timer(2000, self(), {reconnect}),
-            State#state{socket = undefined, reconnection_timer = TimerRef}
-    end.
-
-send_msg(Msg, State) ->
-    case State#state.socket of
-        undefined ->
-            {error, closed};
-        Socket ->
-            case gen_tcp:send(Socket, Msg) of
-                ok ->
-                    ok;
-                {error, Reason} ->
-                    logger:error("send to server error,reason:~p", [Reason]),
-                    {error, Reason}
-            end
-    end.
-
-%%%=================================================================
-%%% 心跳检测
-%%%=================================================================
-start_heartbeat_timer(HeartbeatInfo) ->
-    erlang:start_timer(HeartbeatInfo#heartbeat.timeout, self(), {heartbeat_timer}),
-    ok.
-update_heartbeat(write, Info) ->
-    Info#heartbeat{last_write = dubbo_time_util:timestamp_ms()};
-update_heartbeat(read, Info) ->
-    Info#heartbeat{last_read = dubbo_time_util:timestamp_ms()}.
-
-
-check_heartbeat_state(#state{heartbeat = HeartBeatInfo} = _State) ->
-    Now = dubbo_time_util:timestamp_ms(),
-    #heartbeat{last_read = LastRead, last_write = LastWrite, timeout = Timeout, max_timeout = MaxTimeout} = HeartBeatInfo,
-    if
-        (Now - LastRead) > Timeout ->
-            {send_heart};
-        (Now - LastWrite) > Timeout ->
-            {send_heart};
-        (Now - LastRead) > MaxTimeout ->
-            {reconnect};
-        true ->
-            {normal}
-    end.
-
-
-send_heartbeat_msg(Mid, NeedResponse, State) ->
-    {ok, Bin} = dubbo_heartbeat:generate_request(Mid, NeedResponse),
-    NewState = case send_msg(Bin, State) of
-                   ok ->
-                       logger:info("send one heartbeat to server"),
-                       State;
-                   {error, Reason} ->
-                       logger:warning("dubbo connection send heartbeat error ~p", [Reason]),
-                       State2 = reconnect(State),
-                       State2
-               end,
-    {ok, NewState}.
-
-%%%=================================================================
-%%% 接收数据处理
-%%%=================================================================
--spec check_recv_data(Data :: binary(), State :: #state{}) -> {ready, ReadyData :: binary()} | {ready, ReadyData :: binary(), NextBuffer :: binary()}.
-check_recv_data(<<?DUBBO_MEGIC_HIGH, ?DUBBO_MEGIC_LOW, Rest/binary>> = Data, State) when byte_size(Rest) < 14 ->
-    {next_buffer, Data, State};
-check_recv_data(<<?DUBBO_MEGIC_HIGH, ?DUBBO_MEGIC_LOW, _OtherFlag:80, DataLen:32, Rest/binary>> = Data, State) ->
-    RestSize = byte_size(Rest),
-    if
-        DataLen == RestSize ->
-            {ok, State2} = process_data(Data, State),
-            {next_buffer, <<>>, State2};
-        DataLen > RestSize ->
-            logger:warning("need wait next buffer data ~p", [Data]),
-            {next_buffer, Data, State};
-        DataLen < RestSize ->
-            <<ReadyData:DataLen/binary, NextBuffer/binary>> = Rest,
-            OneData = <<?DUBBO_MEGIC_HIGH:8, ?DUBBO_MEGIC_LOW:8, _OtherFlag:80, DataLen:32, ReadyData/binary>>,
-            {ok, State3} = process_data(OneData, State),
-%%            logger:warning("recevi more data ~w ",[NextBuffer]),
-            check_recv_data(NextBuffer, State3)
-    end;
-check_recv_data(<<Error/integer, Data/binary>>, State) ->
-    logger:error("recv bad header data,Begin Byte:~p", [Error]),
-    check_recv_data(Data, State);
-check_recv_data(<<>>, State) ->
-    {next_buffer, <<>>, State}.
-
-
-process_data(Data, State) ->
-    <<Header:16/binary, RestData/binary>> = Data,
-    case dubbo_codec:decode_header(Header) of
-        {ok, response, ResponseInfo} ->
-            process_response(ResponseInfo#dubbo_response.is_event, ResponseInfo, RestData, State),
-%%            dubbo_traffic_control:decr_count(State#state.host_flag),
-%%            case get_earse_request_info(ResponseInfo#dubbo_response.mid) of
-%%                undefined->
-%%                    logger:error("dubbo response can't find request data,response ~p",[ResponseInfo]);
-%%                {SourcePid,Ref,_RequestState} ->
-%%                    {ok,Res} = dubbo_codec:decode_response(ResponseInfo,RestData),
-%%
-%%                    logger:info("got one response mid ~p, is_event ~p state ~p",[Res#dubbo_response.mid,Res#dubbo_response.is_event,Res#dubbo_response.state]),
-%%                    case Res#dubbo_response.is_event of
-%%                        false ->
-%%                            %% todo rpccontent need merge response with request
-%%                            RpcContent=[],
-%%                            ResponseData = dubbo_type_transfer:response_to_native(Res),
-%%                            gen_server:cast(SourcePid,{response_process,Ref,RpcContent,ResponseData});
-%%                        _->
-%%                            ok
-%%                    end
-%%            end,
-            {ok, State};
-        {ok, request, RequestInfo} ->
-            {ok, Req} = dubbo_codec:decode_request(RequestInfo, RestData),
-            logger:info("get one request mid ~p, is_event ~p", [Req#dubbo_request.mid, Req#dubbo_request.is_event]),
-            {ok, State2} = process_request(Req#dubbo_request.is_event, Req, State),
-            {ok, State2};
-        {error, Type, RelData} ->
-            logger:error("process_data error type ~p RelData ~p", [Type, RelData]),
-            {ok, State}
-    end.
-
-
-%% @doc process event
--spec process_response(IsEvent :: boolean(), #dubbo_response{}, #state{}, term()) -> ok.
-process_response(false, ResponseInfo, RestData, State) ->
-    dubbo_traffic_control:decr_count(State#state.host_flag),
-    case get_earse_request_info(ResponseInfo#dubbo_response.mid) of
-        undefined ->
-            logger:error("dubbo response can't find request data,response ~p", [ResponseInfo]);
-        {SourcePid, Ref, _RequestState} ->
-            {ok, Res} = dubbo_codec:decode_response(ResponseInfo, RestData),
-            logger:info("got one response mid ~p, is_event ~p state ~p", [Res#dubbo_response.mid, Res#dubbo_response.is_event, Res#dubbo_response.state]),
-            case Res#dubbo_response.is_event of
-                false ->
-                    %% todo rpccontent need merge response with request
-                    RpcContent = [],
-                    ResponseData = dubbo_type_transfer:response_to_native(Res),
-                    gen_server:cast(SourcePid, {response_process, Ref, RpcContent, ResponseData});
-                _ ->
-                    ok
-            end
-    end,
-    {ok, State};
-process_response(true, _ResponseInfo, _RestData, State) ->
-    {ok, State}.
-
-process_request(true, #dubbo_request{data = <<"R">>}, State) ->
-    {ok, _} = dubbo_provider_consumer_reg_table:update_connection_readonly(self(), true),
-    {ok, State};
-process_request(true, Request, State) ->
-    {ok, NewState} = send_heartbeat_msg(Request#dubbo_request.mid, false, State),
-    {ok, NewState};
-process_request(false, Request, State) ->
-    {ok, State}.
-
-
-save_request_info(Request, SourcePid, Ref, RequestState) ->
-    put(Request#dubbo_request.mid, {SourcePid, Ref, RequestState}).
-get_earse_request_info(Mid) ->
-    erase(Mid).
\ No newline at end of file
diff --git a/src/dubbo_node_config_util.erl b/src/dubbo_node_config_util.erl
index 32207da..9e3108a 100644
--- a/src/dubbo_node_config_util.erl
+++ b/src/dubbo_node_config_util.erl
@@ -17,30 +17,19 @@
 -module(dubbo_node_config_util).
 
 -include("dubbo.hrl").
+-include("dubboerl.hrl").
 %% API
 -export([parse_provider_info/1, gen_provider_info/1]).
 
-parse_provider_info(ProviderStr) when is_binary(ProviderStr) ->
-    parse_provider_info(binary_to_list(ProviderStr));
-parse_provider_info(ProviderStr) ->
-    case http_uri:parse(http_uri:decode(ProviderStr), [{scheme_defaults, [{dubbo, 20880}]}]) of
-        {ok, {Scheme, _UserInfo, Host, Port, _Path, Query}} ->
-            QueryStr = case lists:prefix("?", Query) of
-                           true ->
-                               [_ | Query2] = Query,
-                               Query2;
-                           false ->
-                               Query
-                       end,
-            QueryListTmp = string:tokens(QueryStr, "&"),
-            ProviderConfig = parse_parameter(QueryListTmp, #provider_config{protocol = Scheme, host = Host, port = Port}),
-            logger:debug("parse provider info string ~p,result: ~p", [ProviderStr, ProviderConfig]),
-            {ok, ProviderConfig};
-        {error, R1} ->
-            logger:debug("parse provider error string ~p, error ~p", [ProviderStr, R1]),
-            {error, R1}
-    end.
-
+parse_provider_info(#dubbo_url{scheme = Scheme, host = Host, port = Port, parameters = Parameters}) ->
+    ProviderInfo = #provider_config{protocol = Scheme, host = Host, port = Port},
+    maps:to_list(Parameters),
+    ProviderConfig = maps:fold(
+        fun(K, V, Acc1) ->
+            parse_parameter(K, V, Acc1)
+        end, ProviderInfo, Parameters),
+    logger:debug("parse provider,result: ~p", [ProviderConfig]),
+    {ok, ProviderConfig}.
 
 parse_parameter([], Config) ->
     Config;
@@ -54,24 +43,23 @@ parse_parameter([Item | Rest], Config) ->
             logger:error("parse parameter error, keypair ~p", [KeyPair2]),
             parse_parameter(Rest, Config)
     end.
-parse_parameter("anyhost", Value, Config) ->
-    Config#provider_config{anyhost = list_to_atom(Value)};
-parse_parameter("application", Value, Config) ->
-    Config#provider_config{application = list_to_binary(Value)};
-parse_parameter("dubbo", Value, Config) ->
-    Config#provider_config{dubbo = list_to_binary(Value)};
-parse_parameter("executes", Value, Config) ->
-    Config#provider_config{executes = list_to_integer(Value)};
-parse_parameter("interface", Value, Config) ->
-    Config#provider_config{interface = list_to_binary(Value)};
-parse_parameter("methods", Value, Config) ->
-    MethodList = string:tokens(Value, ","),
-    MethodList2 = [list_to_binary(Item) || Item <- MethodList],
-    Config#provider_config{methods = MethodList2};
-parse_parameter("side", Value, Config) ->
-    Config#provider_config{side = list_to_binary(Value)};
-parse_parameter("interface", Value, Config) ->
-    Config#provider_config{interface = list_to_binary(Value)};
+parse_parameter(<<"anyhost">>, Value, Config) ->
+    Config#provider_config{anyhost = binary_to_existing_atom(Value, latin1)};
+parse_parameter(<<"application">>, Value, Config) ->
+    Config#provider_config{application = Value};
+parse_parameter(<<"dubbo">>, Value, Config) ->
+    Config#provider_config{dubbo = Value};
+parse_parameter(<<"executes">>, Value, Config) ->
+    Config#provider_config{executes = binary_to_integer(Value)};
+parse_parameter(<<"interface">>, Value, Config) ->
+    Config#provider_config{interface = Value};
+parse_parameter(<<"methods">>, Value, Config) ->
+    MethodList = binary:split(Value, <<",">>, [global, trim_all]),
+    Config#provider_config{methods = MethodList};
+parse_parameter(<<"side">>, Value, Config) ->
+    Config#provider_config{side = Value};
+parse_parameter(<<"interface">>, Value, Config) ->
+    Config#provider_config{interface = Value};
 parse_parameter(_, _, Config) ->
     Config.
 
diff --git a/src/dubbo_protocol.erl b/src/dubbo_protocol.erl
index 7ecfcd8..9a5e659 100644
--- a/src/dubbo_protocol.erl
+++ b/src/dubbo_protocol.erl
@@ -16,11 +16,8 @@
 %%------------------------------------------------------------------------------
 -module(dubbo_protocol).
 
--callback refer(Url,Acc)->ok.
+-include("dubbo.hrl").
 
-%%%% API
-%%-export([refer/2]).
-%%
-%%
-%%refer(InterfaceClassInfo,Url)->
-%%    dubbo_hooker(protocol_wapper,refer,[InterfaceClassInfo,Url]).
\ No newline at end of file
+-callback refer(Url :: binary(), Acc :: term()) -> ok.
+
+-callback export(Invoker :: #invoker{}, Acc :: term()) -> ok.
diff --git a/src/dubbo_protocol_dubbo.erl b/src/dubbo_protocol_dubbo.erl
index 96947cc..fc6a224 100644
--- a/src/dubbo_protocol_dubbo.erl
+++ b/src/dubbo_protocol_dubbo.erl
@@ -20,13 +20,14 @@
 -include("dubbo.hrl").
 
 %% API
--export([refer/2]).
+-export([refer/2, invoke/2, data_receive/1]).
+-export([export/2]).
 
 refer(Url, Acc) ->
     {ok, UrlInfo} = dubbo_common_fun:parse_url(Url),
     case UrlInfo#dubbo_url.scheme of
         <<"dubbo">> ->
-            {ok,Invoker} = do_refer(UrlInfo),
+            {ok, Invoker} = do_refer(UrlInfo),
             {ok, Invoker};
         _ ->
             {skip, Acc}
@@ -38,9 +39,9 @@ do_refer(UrlInfo) ->
 %%            OldHostList = dubbo_provider_consumer_reg_table:get_interface_provider_node(ProviderConfig#provider_config.interface),
             case getClients(ProviderConfig) of
                 {ok, ConnectionInfoList} ->
-                    dubbo_provider_consumer_reg_table:update_node_conections(ProviderConfig#provider_config.interface,ConnectionInfoList),
+                    dubbo_provider_consumer_reg_table:update_node_conections(ProviderConfig#provider_config.interface, ConnectionInfoList),
                     HostFlag = dubbo_provider_consumer_reg_table:get_host_flag(ProviderConfig),
-                    {ok,#dubbo_invoker{host_flag = HostFlag,handle = ?MODULE}};
+                    {ok, #dubbo_invoker{host_flag = HostFlag, handle = ?MODULE}};
                 {error, Reason} ->
                     {error, Reason}
             end;
@@ -49,30 +50,30 @@ do_refer(UrlInfo) ->
             {error, R1}
     end.
 
+export(Invoker, _Acc) ->
+    registry_provider_impl_module(Invoker),
+    ok = service_listen_check_start(Invoker#invoker.url),
+    {ok, Invoker}.
+
 getClients(ProviderConfig) ->
     %% @todo if connections parameter > 1, need new spec transport
     case new_transport(ProviderConfig) of
         {ok, ConnectionInfoList} ->
-%%            ConnectionList = start_provider_process(HostFlag, 30, ProviderConfig),
             {ok, ConnectionInfoList};
         {error, Reason} ->
             {error, Reason}
     end.
 
 
-%%ok = update_connection_info(ProviderConfig#provider_config.interface, HostFlag, ConnectionList, true),
-
-
 new_transport(ProviderConfig) ->
 
-    HostFlag = get_host_flag(ProviderConfig),
-    case dubbo_provider_consumer_reg_table:get_host_connections(ProviderConfig#provider_config) of
+    case dubbo_provider_consumer_reg_table:get_host_connections(ProviderConfig#provider_config.host, ProviderConfig#provider_config.port) of
         [] ->
             case dubbo_exchanger:connect(ProviderConfig, ?MODULE) of
                 {ok, ConnectionInfo} ->
                     {ok, [ConnectionInfo]};
                 {error, Reason} ->
-                    logger:warning("start client fail ~p ~p", [Reason, HostFlag]),
+                    logger:warning("start client fail ~p ~p ~p", [Reason, ProviderConfig#provider_config.host, ProviderConfig#provider_config.port]),
                     {error, Reason}
             end;
         ConnectionInfoList ->
@@ -82,3 +83,102 @@ new_transport(ProviderConfig) ->
 
 
 
+invoke(#dubbo_rpc_invocation{source_pid = CallBackPid, transport_pid = TransportPid, call_ref = Ref} = Invocation, Acc) ->
+
+%%    Request2 = merge_attachments(Request, RpcContext), %% @todo need add rpc context to attachment
+    Request = dubbo_adapter:reference(Invocation),
+    {ok, RequestData} = dubbo_codec:encode_request(Request),
+    gen_server:cast(TransportPid, {send_request, Ref, Request, RequestData, CallBackPid, Invocation}),
+    {ok, Invocation, Acc}.
+%%    case is_sync(RequestState) of
+%%        true ->
+%%            sync_receive(Ref, get_timeout(RequestState));
+%%        false -> {ok, Ref}
+%%    end.
+
+
+
+-spec(data_receive(binary()) -> ok|{do_heartbeat, Mid :: integer()}).
+data_receive(Data) ->
+    <<Header:16/binary, RestData/binary>> = Data,
+    case dubbo_codec:decode_header(Header) of
+        {ok, response, ResponseInfo} ->
+            process_response(ResponseInfo#dubbo_response.is_event, ResponseInfo, RestData);
+        {ok, request, RequestInfo} ->
+            {ok, Req} = dubbo_codec:decode_request(RequestInfo, RestData),
+            logger:info("get one request mid ~p, is_event ~p", [Req#dubbo_request.mid, Req#dubbo_request.is_event]),
+            process_request(Req#dubbo_request.is_event, Req);
+        {error, Type, RelData} ->
+            logger:error("process_data error type ~p RelData ~p", [Type, RelData]),
+            ok
+    end.
+
+
+%% @doc process event
+-spec process_response(IsEvent :: boolean(), #dubbo_response{}, RestData :: binary()) -> ok.
+process_response(false, ResponseInfo, RestData) ->
+%%    dubbo_traffic_control:decr_count(State#state.host_flag),
+
+    %% @todo traffic need move limit filter
+    case get_earse_request_info(ResponseInfo#dubbo_response.mid) of
+        undefined ->
+            logger:error("dubbo response can't find request data,response ~p", [ResponseInfo]);
+        {SourcePid, Ref, Invocation} ->
+            {ok, Res} = dubbo_codec:decode_response(ResponseInfo, RestData),
+            logger:info("got one response mid ~p, is_event ~p state ~p", [Res#dubbo_response.mid, Res#dubbo_response.is_event, Res#dubbo_response.state]),
+            case Res#dubbo_response.is_event of
+                false ->
+                    %% @todo rpccontent need merge response with request
+                    ResponseData = dubbo_type_transfer:response_to_native(Res),
+                    dubbo_invoker:invoke_response(Invocation, ResponseData);
+                _ ->
+                    ok
+            end
+    end,
+    ok;
+process_response(true, _ResponseInfo, _RestData) ->
+    ok.
+
+process_request(true, #dubbo_request{data = <<"R">>}) ->
+    {ok, _} = dubbo_provider_consumer_reg_table:update_connection_readonly(self(), true),
+    ok;
+process_request(true, Request) ->
+    {do_heartbeat, Request#dubbo_request.mid};
+process_request(false, Request) ->
+    ok.
+
+get_earse_request_info(Mid) ->
+    erase(Mid).
+
+
+registry_provider_impl_module(Invoker) ->
+
+    case dubbo_common_fun:parse_url(Invoker#invoker.url) of
+        {ok, UrlInfo} ->
+            Interface = maps:get(<<"interface">>, UrlInfo#dubbo_url.parameters),
+            ok = dubbo_provider_protocol:register_impl_provider(Interface, Invoker#invoker.handler)
+    end.
+
+
+service_listen_check_start(Url) ->
+    case dubbo_common_fun:parse_url(Url) of
+        {ok,#dubbo_url{port = Port}} ->
+            case server_is_start() of
+                true ->
+                    ok;
+                false ->
+                    {ok, _} = ranch:start_listener(dubbo_provider, ranch_tcp, [{port, Port}], dubbo_provider_protocol, []),
+                    ok
+            end;
+        {error,Reason} ->
+            {error,Reason}
+    end.
+
+server_is_start() ->
+    try ranch:get_protocol_options(dubbo_provider) of
+        _ ->
+            true
+    catch
+        _:_ ->
+            false
+    end.
\ No newline at end of file
diff --git a/src/dubbo_protocol_registry.erl b/src/dubbo_protocol_registry.erl
index b2fde17..47174f6 100644
--- a/src/dubbo_protocol_registry.erl
+++ b/src/dubbo_protocol_registry.erl
@@ -18,36 +18,97 @@
 -behaviour(dubbo_protocol).
 
 -include("dubboerl.hrl").
+-include("dubbo.hrl").
+-include("constrans.hrl").
 
-%% API
--export([refer/1]).
 
-refer(Url)->
-    {ok,UrlInfo} =  dubbo_common_fun:parse_url(Url),
+%% API
+-export([refer/2, export/2, destroy/0]).
 
-    {ok,RegistryName} = dubbo_registry:setup_register(UrlInfo),
+refer(Url, Acc) ->
+    {ok, UrlInfo} = dubbo_common_fun:parse_url(Url),
+    RegistryUrlInfo = gen_registry_urlinfo(UrlInfo),
+    {ok, RegistryName} = dubbo_registry:setup_register(RegistryUrlInfo),
 
     ConsumerUrl = gen_consumer_url(UrlInfo),
-    %% 通知directory
-    dubbo_registry:register(RegistryName,ConsumerUrl),
+    dubbo_registry:register(RegistryName, ConsumerUrl),
+
+    dubbo_directory:subscribe(RegistryName, ConsumerUrl),
+
+    ok.
+
+export(Invoker, Acc) ->
+    {ok, UrlInfo} = dubbo_common_fun:parse_url(Invoker#invoker.url),
+    %% url = registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=hello-world&dubbo=2.0.2&export=dubbo%3A%2F%2F192.168.1.5%3A20880%2Forg.apache.dubbo.erlang.sample.service.facade.UserOperator%3Fanyhost%3Dtrue%26application%3Dhello-world%26bean.name%3Dorg.apache.dubbo.erlang.sample.service.facade.UserOperator%26bind.ip%3D192.168.1.5%26bind.port%3D20880%26default.deprecated%3Dfalse%26default.dynamic%3Dfalse%26default.register%3Dtrue%26deprecated%3Dfalse%26dubbo%3 [...]
+    ProtocolUrl = get_provider_url(UrlInfo),
+    {ok, InterfaceKey} = do_local_export(Invoker, ProtocolUrl),
+
+    RegistryUrlInfo = gen_registry_urlinfo(UrlInfo),
+    {ok, RegistryName} = dubbo_registry:setup_register(RegistryUrlInfo),
+    dubbo_registry:register(RegistryName, ProtocolUrl),
+
+    register_export_info(ProtocolUrl, RegistryName, InterfaceKey),
+    {ok, Invoker}.
+
+destroy() ->
+    List = ets:tab2list(?SERVICE_EXPORT_TABLE),
+    lists:map(
+        fun(Item) ->
 
-    dubbo_directory:subscribe(RegistryName,ConsumerUrl),
+            {ProtocolUrl, RegistryModule, _} = Item,
+            io:format(user, "destroy url ~p~n", [ProtocolUrl]),
+            unexport(RegistryModule, ProtocolUrl)
+        end, List),
+    ok.
 
-    %% return
+unexport(RegistryModule, Url) ->
+    dubbo_registry:unregister(RegistryModule, Url),
     ok.
 
+do_local_export(Invoker, Url) ->
+    %% Url = dubbo://127.0.0.1:20880/org.apache.dubbo.erlang.sample.service.facade.UserOperator?anyhost=true&application=hello-world&bean.name=org.apache.dubbo.erlang.sample.service.facade.UserOperator&bind.ip=127.0.0.1&bind.port=20880&default.deprecated=false&default.dynamic=false&default.register=true&deprecated=false&dubbo=2.0.2&dynamic=false&generic=false&interface=org.apache.dubbo.erlang.sample.service.facade.UserOperator&methods=queryUserInfo,queryUserList,genUserId,getUserInfo&pid [...]
+    {ok, UrlInfo} = dubbo_common_fun:parse_url(Url),
+    Protocol = UrlInfo#dubbo_url.scheme,
+    ProtocolModule = binary_to_existing_atom(<<<<"dubbo_protocol_">>/binary, Protocol/binary>>, latin1),
+    _Result = apply(ProtocolModule, export, [Invoker#invoker{url = Url}, ok]),
+
+    InterfaceKey = maps:get(<<"interface">>, UrlInfo#dubbo_url.parameters),
+
+    {ok, InterfaceKey}.
+
+register_export_info(ProtocolUrl, RegistryModule, InterfaceKey) ->
+    ets:insert(?SERVICE_EXPORT_TABLE, {ProtocolUrl, RegistryModule, InterfaceKey}),
+    ok.
 
-gen_consumer_url(UrlInfo)->
+
+gen_consumer_url(UrlInfo) ->
     Parameters = UrlInfo#dubbo_url.parameters,
     #{<<"refer">> := Refer} = Parameters,
     Refer2 = http_uri:decode(Refer),
-    Parameters2 = dubbo_common_fun:parse_url(Refer2,#{}),
-    #{<<"interface">> := Interface} = Parameters2,
+    Parameters2 = dubbo_common_fun:parse_url_parameter(Refer2),
+    Parameters3 = Parameters2#{
+        ?CATEGORY_KEY => ?CONSUMERS_CATEGORY
+    },
+    #{<<"interface">> := Interface} = Parameters3,
     ConsumerUrlInfo = UrlInfo#dubbo_url{
         scheme = <<"consumer">>,
         host = dubbo_common_fun:local_ip_v4_str(),
         path = Interface,
-        parameters = Parameters2
+        parameters = Parameters3
     },
     ConsumerUrl = dubbo_common_fun:url_to_binary(ConsumerUrlInfo),
-    ConsumerUrl.
\ No newline at end of file
+    ConsumerUrl.
+get_provider_url(UrlInfo) ->
+    ExportUrl = maps:get(<<"export">>, UrlInfo#dubbo_url.parameters),
+    ExportUrl2 = http_uri:decode(ExportUrl),
+    {ok,ExportUrlInfo} = dubbo_common_fun:parse_url(ExportUrl2),
+    ParameterNew = maps:put(?CATEGORY_KEY,?PROVIDERS_CATEGORY,ExportUrlInfo#dubbo_url.parameters),
+    ExportUrlInfoNew = ExportUrlInfo#dubbo_url{parameters = ParameterNew},
+    logger:debug("registry gen provider url info ~p",[ExportUrlInfoNew]),
+    dubbo_common_fun:url_to_binary(ExportUrlInfoNew).
+
+gen_registry_urlinfo(UrlInfo) ->
+    Parameters = UrlInfo#dubbo_url.parameters,
+    UrlInfo#dubbo_url{
+        scheme = maps:get(<<"registry">>, Parameters, <<"zookeeper">>)
+    }.
\ No newline at end of file
diff --git a/src/dubbo_provider_consumer_reg_table.erl b/src/dubbo_provider_consumer_reg_table.erl
index 1c193b8..31c862b 100644
--- a/src/dubbo_provider_consumer_reg_table.erl
+++ b/src/dubbo_provider_consumer_reg_table.erl
@@ -29,15 +29,15 @@
     terminate/2,
     code_change/3]).
 
--export([update_consumer_connections/2,update_node_conections/2,get_interface_provider_node/1,get_host_connections/2, select_connection/1,
-    select_connection/2, update_connection_readonly/2, get_host_flag/1, get_host_flag/2,clean_invalid_provider/1]).
+-export([update_consumer_connections/2, update_node_conections/2, query_node_connections/1, get_interface_provider_node/1, get_host_connections/2, select_connection/1,
+    update_connection_readonly/2, get_host_flag/1, get_host_flag/2, clean_invalid_provider/1, update_interface_info/1, get_interface_info/1]).
 
 -include("dubbo.hrl").
 -define(SERVER, ?MODULE).
 
 -define(INTERFCE_LIST_TABLE, interface_list).
 
--define(INTERFAE_INFO_TABLE,dubbo_interface_info).
+-define(INTERFACE_INFO_TABLE, dubbo_interface_info).
 
 -define(PROVIDER_NODE_LIST_TABLE, provider_node_list).
 
@@ -99,12 +99,12 @@ init_ets_table() ->
         _Type1:Reason1 ->
             logger:error("new ets table  PROVIDER_NODE_LIST_TABLE error ~p", [Reason1])
     end,
-    try ets:new(?INTERFAE_INFO_TABLE, [public, named_table, {keypos, 2}]) of
-        ?INTERFAE_INFO_TABLE ->
+    try ets:new(?INTERFACE_INFO_TABLE, [public, named_table, {keypos, 2}]) of
+        ?INTERFACE_INFO_TABLE ->
             ok
     catch
-        _Type1:Reason1 ->
-            logger:error("new ets table  PROVIDER_NODE_LIST_TABLE error ~p", [Reason1])
+        _Type2:Reason2 ->
+            logger:error("new ets table  INTERFACE_INFO_TABLE error ~p", [Reason2])
     end,
     ok.
 %%--------------------------------------------------------------------
@@ -123,13 +123,13 @@ init_ets_table() ->
     {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
     {stop, Reason :: term(), NewState :: #state{}}).
 
-handle_call({add_consumer, Interface, ProviderNodeList}, _From, State) ->
-
-    OldProviderList = get_interface_provider_node(Interface),
-    NewProviderList = add_consumer(ProviderNodeList, []),
-    DeleteProverList = OldProviderList -- NewProviderList,
-    clean_invalid_provider(DeleteProverList),
-    {reply, ok, State};
+%%handle_call({add_consumer, Interface, ProviderNodeList}, _From, State) ->
+%%
+%%    OldProviderList = get_interface_provider_node(Interface),
+%%    NewProviderList = add_consumer(ProviderNodeList, []),
+%%    DeleteProverList = OldProviderList -- NewProviderList,
+%%    clean_invalid_provider(DeleteProverList),
+%%    {reply, ok, State};
 handle_call(_Request, _From, State) ->
     {reply, ok, State}.
 
@@ -204,10 +204,20 @@ get_host_connections(Host, Port) ->
     List = ets:lookup(?PROVIDER_NODE_LIST_TABLE, HostFlag),
     List.
 
-update_interface_info(InterfaceInfo)->
-    ets:insert(?INTERFAE_INFO_TABLE,InterfaceInfo).
 
 
+update_interface_info(InterfaceInfo) ->
+    ets:insert(?INTERFACE_INFO_TABLE, InterfaceInfo).
+
+
+get_interface_info(Interface) ->
+    case ets:lookup(?INTERFACE_INFO_TABLE, Interface) of
+        [] ->
+            undefined;
+        [Result] ->
+            Result
+    end.
+
 %%%===================================================================
 %%% Internal functions
 %%%===================================================================
@@ -248,32 +258,27 @@ update_interface_info(InterfaceInfo)->
 %%    ConnectionList.
 
 
-update_node_conections(HostFlag,Connections)->
+update_node_conections(Interface, Connections) ->
     lists:map(
         fun(Item) ->
-            HostFlag= Item#connection_info.host_flag,
-            case ets:lookup_element(?PROVIDER_NODE_LIST_TABLE,#connection_info{host_flag = HostFlag,pid = Item#connection_info.pid,_="_"}) of
-                '$end_of_table' ->
+            HostFlag = Item#connection_info.host_flag,
+            case ets:match_object(?PROVIDER_NODE_LIST_TABLE, #connection_info{host_flag = HostFlag, pid = Item#connection_info.pid, _ = "_"}) of
+                [] ->
                     I2 = ets:insert(?PROVIDER_NODE_LIST_TABLE, Item),
-                    logger:debug("insert PROVIDER_NODE_LIST_TABLE ~p info:~p", [HostFlag, I2]);
+                    logger:debug("update_node_conections insert one record ~p result:~p", [HostFlag, I2]);
                 _ ->
+                    logger:debug("update_node_conections hostflag ~p already exit ", [HostFlag]),
                     ok
             end
         end, Connections),
     ok.
 
+query_node_connections(Hostflag) ->
+    ets:lookup(?PROVIDER_NODE_LIST_TABLE, Hostflag).
+
 update_consumer_connections(Interface, Connections) ->
     lists:map(
         fun(Item) ->
-            HostFlag= Item#connection_info.host_flag,
-
-            case ets:lookup_element(?PROVIDER_NODE_LIST_TABLE,#connection_info{host_flag = HostFlag,pid = Item#connection_info.pid,_="_"}) of
-                '$end_of_table' ->
-                    I2 = ets:insert(?PROVIDER_NODE_LIST_TABLE, Item),
-                    logger:debug("insert PROVIDER_NODE_LIST_TABLE ~p info:~p", [HostFlag, I2]);
-                {_ObjectList,_Continuation} ->
-                    ok
-            end,
             I1 = ets:insert(?INTERFCE_LIST_TABLE, #interface_list{interface = Interface, pid = Item#connection_info.pid, connection_info = Item}),
             logger:debug("insert interface conection info ~p ~p ~p", [Interface, Item#connection_info.pid, I1]),
             ok
@@ -281,10 +286,10 @@ update_consumer_connections(Interface, Connections) ->
     ok.
 
 get_host_flag(ProviderConfig) ->
-    HostFlag = <<(list_to_binary(ProviderConfig#provider_config.host))/binary, <<"_">>/binary, (integer_to_binary(ProviderConfig#provider_config.port))/binary>>,
+    HostFlag = <<(ProviderConfig#provider_config.host)/binary, <<"_">>/binary, (integer_to_binary(ProviderConfig#provider_config.port))/binary>>,
     HostFlag.
 get_host_flag(Host, Port) ->
-    <<(list_to_binary(Host))/binary, <<"_">>/binary, (integer_to_binary(Port))/binary>>.
+    <<Host/binary, <<"_">>/binary, (integer_to_binary(Port))/binary>>.
 
 update_connection_info(Interface, HostFlag, ConnectionList, IsUpdateProvideNode) ->
     lists:map(fun(Item) ->
@@ -311,17 +316,12 @@ get_interface_provider_node(Interface) ->
     end.
 
 select_connection(Interface) ->
-    RandNum = rand:uniform(2048),
-    select_connection(Interface, RandNum).
-select_connection(Interface, RandNum) ->
     case ets:lookup(?INTERFCE_LIST_TABLE, Interface) of
         [] ->
             {error, none};
         List ->
-            Len = length(List),
-            RemNum = (RandNum rem Len) + 1,
-            InterfaceListItem = lists:nth(RemNum, List),
-            {ok, InterfaceListItem#interface_list.connection_info}
+            Ret = [Item#interface_list.connection_info || Item <- List],
+            {ok, Ret}
     end.
 
 -spec(update_connection_readonly(pid(), boolean()) -> ok).
diff --git a/src/dubbo_provider_protocol.erl b/src/dubbo_provider_protocol.erl
index 11434b6..5615916 100644
--- a/src/dubbo_provider_protocol.erl
+++ b/src/dubbo_provider_protocol.erl
@@ -23,7 +23,7 @@
 
 
 %% API
--export([start_link/4, register_impl_provider/3, select_impl_provider/1]).
+-export([start_link/4, register_impl_provider/2, select_impl_provider/1]).
 
 %% gen_server callbacks
 -export([init/1,
@@ -58,10 +58,11 @@ start_link(Ref, Socket, Transport, Opts) ->
 %% we can use the -behaviour(gen_server) attribute.
 %init([]) -> {ok, undefined}.
 
-init({Ref, Socket, Transport, _Opts = []}) ->
-    logger:info("provider connected"),
+init({Ref, Socket, Transport, _Opts}) ->
+    {ok, {IP, Port}} = inet:peername(Socket),
+    logger:info("consumer ~p:~p connect the server", [IP, Port]),
     ok = ranch:accept_ack(Ref),
-%%    ok = Transport:setopts(Socket, [{active, once}]),
+
     ok = Transport:setopts(Socket, [{active, true}, {packet, 0}]),
     gen_server:enter_loop(?MODULE, [],
         #state{socket = Socket, transport = Transport},
@@ -114,8 +115,8 @@ code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
 
 
-register_impl_provider(Interface, ImplModuleName, ModuleName) ->
-    ets:insert(?PROVIDER_IMPL_TABLE, {Interface, ImplModuleName, ModuleName}),
+register_impl_provider(Interface, ImplModuleName) ->
+    ets:insert(?PROVIDER_IMPL_TABLE, {Interface, ImplModuleName}),
     ok.
 
 -spec select_impl_provider(Interface :: binary()) -> {ok, binary()} | {error, term()}.
@@ -123,7 +124,7 @@ select_impl_provider(Interface) ->
     case ets:lookup(?PROVIDER_IMPL_TABLE, Interface) of
         [] ->
             {error, no_provider};
-        [{Interface, ImplModuleName, ModuleName}] ->
+        [{Interface, ImplModuleName}] ->
             {ok, ImplModuleName}
     end.
 
diff --git a/src/dubbo_reference_config.erl b/src/dubbo_reference_config.erl
index 7ab7f86..2465831 100644
--- a/src/dubbo_reference_config.erl
+++ b/src/dubbo_reference_config.erl
@@ -19,12 +19,12 @@
 -include("dubbo.hrl").
 -include("dubboerl.hrl").
 
--record(dubbo_interface_info,{}).
+-record(dubbo_interface_info, {}).
 
 %% API
 -export([init_reference/1]).
 
-init_reference(ConsumerInfo)->
+init_reference(ConsumerInfo) ->
 %%    InitConfigMap= #{
 %%
 %%    },
@@ -33,25 +33,23 @@ init_reference(ConsumerInfo)->
     ok.
 
 
-create_proxy(ConsumerInfo)->
-
-
+create_proxy(ConsumerInfo) ->
 
     Para = gen_parameter(ConsumerInfo),
     Url = gen_registry_url(Para),
-    dubbo_extension:run(protocol_wapper,refer,[Url]),
+    ok = dubbo_extension:run_fold(protocol_wapper, refer, [Url], ok),
     ok.
 
-    %%application=hello-world&dubbo=2.0.2&pid=68901&refer=application=hello-world&default.check=false&default.lazy=false&default.retries=0&default.sticky=false&default.timeout=300000&dubbo=2.0.2&interface=org.apache.dubbo.erlang.sample.service.facade.UserOperator&lazy=false&methods=queryUserInfo,queryUserList,genUserId,getUserInfo&pid=68901&register.ip=127.0.0.1&release=2.7.1&retries=0&side=consumer&sticky=false&timestamp=1559727789953&registry=zookeeper&release=2.7.1&timestamp=1559727842451
+%%application=hello-world&dubbo=2.0.2&pid=68901&refer=application=hello-world&default.check=false&default.lazy=false&default.retries=0&default.sticky=false&default.timeout=300000&dubbo=2.0.2&interface=org.apache.dubbo.erlang.sample.service.facade.UserOperator&lazy=false&methods=queryUserInfo,queryUserList,genUserId,getUserInfo&pid=68901&register.ip=127.0.0.1&release=2.7.1&retries=0&side=consumer&sticky=false&timestamp=1559727789953&registry=zookeeper&release=2.7.1&timestamp=1559727842451
 
 
-gen_registry_url(Para)->
+gen_registry_url(Para) ->
     %%todo 组装para & url
-    {Host,Port} = get_registry_host_port(),
+    {Host, Port} = dubbo_registry:get_registry_host_port(),
     UrlInfo = #dubbo_url{
         scheme = <<"registry">>,
         host = list_to_binary(Host),
-        port = integer_to_binary(Port),
+        port = Port,
         path = <<"org.apache.dubbo.registry.RegistryService">>,
         parameters = Para
     },
@@ -59,53 +57,45 @@ gen_registry_url(Para)->
 %%    Url = "registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=hello-world&dubbo=2.0.2&pid=68901&refer=application%3Dhello-world%26default.check%3Dfalse%26default.lazy%3Dfalse%26default.retries%3D0%26default.sticky%3Dfalse%26default.timeout%3D300000%26dubbo%3D2.0.2%26interface%3Dorg.apache.dubbo.erlang.sample.service.facade.UserOperator%26lazy%3Dfalse%26methods%3DqueryUserInfo%2CqueryUserList%2CgenUserId%2CgetUserInfo%26pid%3D68901%26register.ip%3D127.0.0.1% [...]
 %%    Url.
 
-get_registry_host_port()->
-    %% @todo need adapter other registry
-    RegistryList = application:get_env(dubboerl,zookeeper_list,[{"127.0.0.1",2181}]),
-    [Item|_] = RegistryList,
-    Item.
 
-gen_parameter(ConsumerInfo)->
+gen_parameter(ConsumerInfo) ->
     Para = #{
         <<"application">> => get_appname(ConsumerInfo),
         <<"dubbo">> => <<"2.0.2">>,
-        <<"pid">> => get_pid(),
+        <<"pid">> => list_to_binary(get_pid()),
         <<"refer">> => get_refinfo(ConsumerInfo),
-        <<"registry">> => get_registry_type(),
+        <<"registry">> => dubbo_registry:get_registry_type(),
         <<"release">> => <<"2.7.1">>,
         <<"timestamp">> => integer_to_binary(dubbo_time_util:timestamp_ms())
     },
-
     Para.
 
-get_appname(ConsumerInfo)->
+get_appname(ConsumerInfo) ->
     ConsumerInfo#consumer_config.application.
-get_pid()->
+get_pid() ->
     os:getpid().
-get_refinfo(ConsumerInfo)->
-    KeyValues=[
-        {"application",ConsumerInfo#consumer_config.application},
-        {"default.check",ConsumerInfo#consumer_config.check},
-        {"default.lazy","false"},
-        {"default.retries","0"},
-        {"default.sticky","false"},
-        {"default.timeout","300000"},
-        {"dubbo","2.0.2"},
-        {"interface",ConsumerInfo#consumer_config.interface},
-        {"lazy","false"},
-        {"methods",ConsumerInfo#consumer_config.methods},
-        {"register.ip",ConsumerInfo#consumer_config.application},
-        {"release","2.7.1"},
-        {"pid",get_pid()},
-        {"side","consumer"},
-        {"sticky","false"},
-        {"timestamp",dubbo_time_util:timestamp_ms()}
+get_refinfo(ConsumerInfo) ->
+    KeyValues = [
+        {"application", ConsumerInfo#consumer_config.application},
+        {"default.check", atom_to_list(ConsumerInfo#consumer_config.check)},
+        {"default.lazy", "false"},
+        {"default.retries", "0"},
+        {"default.sticky", "false"},
+        {"default.timeout", "300000"},
+        {"dubbo", "2.0.2"},
+        {"interface", ConsumerInfo#consumer_config.interface},
+        {"lazy", "false"},
+        {"methods", string:join(ConsumerInfo#consumer_config.methods, ",")},
+        {"register.ip", ConsumerInfo#consumer_config.application},
+        {"release", "2.7.1"},
+        {"pid", get_pid()},
+        {"side", "consumer"},
+        {"sticky", "false"},
+        {"timestamp", integer_to_list(dubbo_time_util:timestamp_ms())}
     ],
-    KeyValues2 = [io_lib:format("~s=~p", [Key, Value]) || {Key, Value} <= KeyValues],
+    KeyValues2 = [io_lib:format("~s=~s", [Key, Value]) || {Key, Value} <- KeyValues],
     ParameterStr1 = string:join(KeyValues2, "&"),
     list_to_binary(http_uri:encode(ParameterStr1)).
 %%    <<"application%3Dhello-world%26default.check%3Dfalse%26default.lazy%3Dfalse%26default.retries%3D0%26default.sticky%3Dfalse%26default.timeout%3D300000%26dubbo%3D2.0.2%26interface%3Dorg.apache.dubbo.erlang.sample.service.facade.UserOperator%26lazy%3Dfalse%26methods%3DqueryUserInfo%2CqueryUserList%2CgenUserId%2CgetUserInfo%26pid%3D68901%26register.ip%3D127..0.1%26release%3D2.7.1%26retries%3D0%26side%3Dconsumer%26sticky%3Dfalse%26timestamp%3D1559727789953">>.
 
-get_registry_type()->
-    %%todo
-    atom_to_binary(application:get_env(dubboerl,registry,zookeeper)).
\ No newline at end of file
+
diff --git a/src/dubbo_registry.erl b/src/dubbo_registry.erl
index 89eb64b..01af02c 100644
--- a/src/dubbo_registry.erl
+++ b/src/dubbo_registry.erl
@@ -18,13 +18,13 @@
 -include("dubboerl.hrl").
 
 -callback start(Url :: binary) -> ok.
--callback register(Url::binary())-> term().
--callback subscribe(SubcribeUrl::binary(),NotifyFun::function())->ok.
+-callback register(Url :: binary()) -> term().
+-callback subscribe(SubcribeUrl :: binary(), NotifyFun :: function()) -> ok.
 
 %% API
--export([setup_register/1,register/2]).
+-export([setup_register/1, register/2, unregister/2, get_registry_host_port/0, get_registry_type/0, get_registry_module/1]).
 
--spec(setup_register(UrlInfo :: map()) -> {ok, RegistryProcessName :: atom()}|{error, term()}).
+-spec(setup_register(UrlInfo :: #dubbo_url{}) -> {ok, RegistryProcessName :: atom()}|{error, term()}).
 setup_register(UrlInfo) ->
     RegistryModuleName = get_registry_module(UrlInfo),
     case whereis(RegistryModuleName) of
@@ -35,13 +35,28 @@ setup_register(UrlInfo) ->
             {ok, RegistryModuleName}
     end.
 
-register(RegistryName,Url) ->
-    logger:info("call ~p register url ~p",[RegistryName,Url]),
-    Result = apply(RegistryName,register,[Url]),
+register(RegistryName, Url) ->
+    logger:info("call ~p register url ~p", [RegistryName, Url]),
+    Result = apply(RegistryName, register, [Url]),
+    Result.
+unregister(RegistryName, Url) ->
+    logger:info("call ~p unregister url ~p", [RegistryName, Url]),
+    Result = apply(RegistryName, unregister, [Url]),
     Result.
-
 
 get_registry_module(Info) ->
     RegistryName = Info#dubbo_url.scheme,
-    FullName = << <<"dubbo_registry_">>, RegistryName/binary>>,
-    binary_to_existing_atom(FullName).
\ No newline at end of file
+    FullName = <<<<"dubbo_registry_">>/binary, RegistryName/binary>>,
+    binary_to_existing_atom(FullName, latin1).
+
+
+
+get_registry_host_port() ->
+    %% @todo need adapter other registry
+    RegistryList = application:get_env(dubboerl, zookeeper_list, [{"127.0.0.1", 2181}]),
+    [Item | _] = RegistryList,
+    Item.
+
+get_registry_type() ->
+    %%todo
+    atom_to_binary(application:get_env(dubboerl, registry, zookeeper), utf8).
\ No newline at end of file
diff --git a/src/dubbo_registry_zookeeper.erl b/src/dubbo_registry_zookeeper.erl
index 5d14588..915bb71 100644
--- a/src/dubbo_registry_zookeeper.erl
+++ b/src/dubbo_registry_zookeeper.erl
@@ -21,9 +21,9 @@
 -include("dubbo.hrl").
 -include("dubboerl.hrl").
 %% API
--export([start_link/0, register_consumer/1, register_consumer/2, gen_consumer_node_info/1, register_provider/1, provider_watcher/1]).
+-export([start_link/0, register_provider/1, provider_watcher/1]).
 
--export([start/1,register/1,subscribe/2]).
+-export([start/1, register/1, unregister/1, subscribe/2]).
 %% gen_server callbacks
 -export([init/1,
     handle_call/3,
@@ -34,7 +34,7 @@
 
 -define(SERVER, ?MODULE).
 
--record(state, {zk_pid,notify_fun}).
+-record(state, {zk_pid, provider_notify_fun}).
 
 %%%===================================================================
 %%% API
@@ -88,16 +88,17 @@ init([]) ->
     {noreply, NewState :: #state{}, timeout() | hibernate} |
     {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
     {stop, Reason :: term(), NewState :: #state{}}).
-
-handle_call({add_consumer, Interface,ConsumerUrl}, _From, State) ->
-    add_consumer(Interface,ConsumerUrl, State),
+handle_call({do_register, Url}, _From, State) ->
+    do_register(State#state.zk_pid, Url),
     {reply, ok, State};
-handle_call({add_provider, Provider}, _From, State) ->
-    register_provider_path(Provider, State),
+handle_call({do_unregister, Url}, _From, State) ->
+    do_unregister(State#state.zk_pid, Url),
     {reply, ok, State};
-handle_call({subscribe_provider,InterfaceName,NotifyFun}, _From, #state{zk_pid = ZkPid} = State) ->
-    NewState=State#state{notify_fun = NotifyFun},
-    get_provider_list(InterfaceName,ZkPid,NotifyFun),
+handle_call({subscribe_provider, InterfaceName, NotifyFun}, _From, #state{zk_pid = ZkPid} = State) ->
+    logger:debug("subscribe provider ~p notify fun ~p",[InterfaceName,NotifyFun]),
+    NewState = State#state{provider_notify_fun = NotifyFun},
+    List = get_provider_list(InterfaceName, ZkPid),
+    notify_provider_change(NotifyFun, InterfaceName, List),
     {reply, ok, NewState};
 
 handle_call(_Request, _From, State) ->
@@ -114,8 +115,9 @@ handle_call(_Request, _From, State) ->
     {noreply, NewState :: #state{}} |
     {noreply, NewState :: #state{}, timeout() | hibernate} |
     {stop, Reason :: term(), NewState :: #state{}}).
-handle_cast({provider_node_change, Interface, Path}, #state{zk_pid = Pid} = State) ->
-    get_provider_and_start(Pid, Interface, Path),
+handle_cast({provider_node_change, Interface, Path}, #state{zk_pid = Pid, provider_notify_fun = NotifyFun} = State) ->
+    ProviderList = get_provider_and_start(Pid, Interface, Path),
+    notify_provider_change(NotifyFun, Interface, ProviderList),
     {noreply, State};
 handle_cast(_Request, State) ->
     {noreply, State}.
@@ -135,7 +137,6 @@ handle_cast(_Request, State) ->
     {noreply, NewState :: #state{}, timeout() | hibernate} |
     {stop, Reason :: term(), NewState :: #state{}}).
 handle_info(_Info, State) ->
-    logger:info("zk server recv msg:~p", [_Info]),
     {noreply, State}.
 
 %%--------------------------------------------------------------------
@@ -169,44 +170,84 @@ code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
 
 
-
 %%----------------------------------------------
 %% dubbo_registry
 %%----------------------------------------------
 start(Url) ->
     ok.
-register(Url)->
-    {ok,UrlInfo} = dubbo_common_fun:parse_url(Url),
-    InterfaceName = maps:get(<<"interface">>,UrlInfo#dubbo_url.parameters),
-    register(UrlInfo#dubbo_url.scheme,InterfaceName,Url),
+%%register(Url) ->
+%%    {ok, UrlInfo} = dubbo_common_fun:parse_url(Url),
+%%    InterfaceName = maps:get(<<"interface">>, UrlInfo#dubbo_url.parameters),
+%%    register(UrlInfo#dubbo_url.scheme, InterfaceName, Url),
+%%    ok.
+
+%%register(<<"consumer">>, InterfaceName, Url) ->
+%%    gen_server:call(?SERVER, {add_consumer, InterfaceName, Url}),
+%%    ok.
+
+register(Url) ->
+    gen_server:call(?SERVER, {do_register, Url}, 10000),
     ok.
 
-register(<<"consumer">>,InterfaceName,Url)->
-    gen_server:call(?SERVER, {add_consumer,InterfaceName, Url}),
-    ok;
-register(<<"provider">>,InterfaceName,Url)->
-
+unregister(Url) ->
+    gen_server:call(?SERVER, {do_unregister, Url}, 10000),
     ok.
 
-subscribe(SubcribeUrl,NotifyFun)->
-    {ok,UrlInfo} = dubbo_common_fun:parse_url(SubcribeUrl),
-    InterfaceName = maps:get(<<"interface">>,UrlInfo#dubbo_url.parameters),
-    try gen_server:call(?SERVER,{subscribe_provider,InterfaceName,NotifyFun},5000) of
-        ok->
+do_register(Pid, Url) ->
+    case dubbo_common_fun:parse_url(Url) of
+        {ok, UrlInfo} ->
+            CreateNodeList = [{get_register_node(Item, UrlInfo), p} || Item <- [root, service, category]],
+            UrlNode = {list_to_binary(edoc_lib:escape_uri(binary_to_list(Url))), get_dynamic(UrlInfo)},
+            CreateNodeList2 = CreateNodeList ++ [UrlNode],
+            RetFullNode = check_and_create_path(Pid, <<"">>, CreateNodeList2),
+            {ok, RetFullNode};
+        Reason ->
+            logger:error("zk parse url fail reason ~p", [Reason]),
+            {error, Reason}
+    end.
+do_unregister(Pid, Url) ->
+    case dubbo_common_fun:parse_url(Url) of
+        {ok, UrlInfo} ->
+            CreateNodeList = [get_register_node(Item, UrlInfo) || Item <- [root, service, category]],
+            UrlNode = list_to_binary(edoc_lib:escape_uri(binary_to_list(Url))),
+            CreateNodeList2 = CreateNodeList ++ [UrlNode],
+            Path = dubbo_common_fun:binary_list_join(CreateNodeList2, <<"/">>),
+            FullPath = <<<<"/">>/binary, Path/binary>>,
+            del_path(Pid, FullPath);
+        Reason ->
+            logger:error("zk parse url fail reason ~p", [Reason]),
+            {error, Reason}
+    end.
+
+get_dynamic(UrlInfo) ->
+    case maps:get(<<"dynamic">>, UrlInfo#dubbo_url.parameters, <<"true">>) of
+        <<"true">> ->
+            e;
+        _ ->
+            p
+    end.
+
+get_register_node(root, _UrlInfo) ->
+    <<"dubbo">>;
+get_register_node(service, UrlInfo) ->
+    maps:get(<<"interface">>, UrlInfo#dubbo_url.parameters);
+get_register_node(category, UrlInfo) ->
+    maps:get(<<"category">>, UrlInfo#dubbo_url.parameters, <<"providers">>).
+
+
+subscribe(SubcribeUrl, NotifyFun) ->
+    {ok, UrlInfo} = dubbo_common_fun:parse_url(SubcribeUrl),
+    InterfaceName = maps:get(<<"interface">>, UrlInfo#dubbo_url.parameters),
+    try gen_server:call(?SERVER, {subscribe_provider, InterfaceName, NotifyFun}, 5000) of
+        ok ->
             ok
     catch
-        Error:Reason->
+        _Error:Reason ->
             %%todo improve error type
-            {error,Reason}
+            {error, Reason}
     end.
 
-register_consumer(Consumer) ->
-    gen_server:call(?SERVER, {add_consumer, Consumer}),
-    ok.
-register_consumer(Name, Option) ->
-    Consumer = #consumer_config{interface = Name, methods = [<<"testa">>, <<"testb">>]},
-    register_consumer(Consumer),
-    ok.
+
 register_provider(Provider) ->
     gen_server:call(?SERVER, {add_provider, Provider}),
     ok.
@@ -222,9 +263,8 @@ connection() ->
         {monitor, self()}]),
     {ok, Pid}.
 
-add_consumer(InterfaceName,ConsumerUrl, State) ->
+add_consumer(InterfaceName, ConsumerUrl, State) ->
     Pid = State#state.zk_pid,
-%%    ConsumerNode = gen_consumer_node_info(Consumer),
     ConsumerNode2 = list_to_binary(edoc_lib:escape_uri(binary_to_list(ConsumerUrl))),
     check_and_create_path(Pid, <<"">>, [{<<"dubbo">>, p}, {InterfaceName, p}, {<<"consumers">>, p}, {ConsumerNode2, e}]),
     %% todo
@@ -237,19 +277,21 @@ register_provider_path(Provider, State) ->
     ok.
 
 
-get_provider_list(InterfaceName,ZkPid,NotifyFun) ->
+get_provider_list(InterfaceName, ZkPid) ->
     InterfacePath = <<<<"/dubbo/">>/binary, InterfaceName/binary, <<"/providers">>/binary>>,
-    ChildList= get_provider_and_start(ZkPid, InterfaceName, InterfacePath),
-    NotifyFun(InterfaceName,ChildList),
-    ok.
+    ChildList = get_provider_and_start(ZkPid, InterfaceName, InterfacePath),
+    ChildList.
 get_provider_and_start(Pid, Interface, Path) ->
     case erlzk:get_children(Pid, Path, spawn(dubbo_registry_zookeeper, provider_watcher, [Interface])) of
         {ok, ChildList} ->
             logger:debug("get provider list ~p", [ChildList]),
-%%            start_provider_process(Interface, ChildList),
             ChildList;
+        {error, no_node} ->
+            logger:warning("interface ~p provide zk node unexist", [Interface]),
+            check_and_create_path(Pid, <<"">>, [{<<"dubbo">>, p}, {Interface, p}, {<<"providers">>, p}]),
+            get_provider_and_start(Pid, Interface, Path);
         {error, R1} ->
-            logger:debug("[add_consumer] get_provider_list error ~p ~p", [R1]),
+            logger:debug("[add_consumer] get_provider_list error ~p", [R1]),
             []
     end.
 
@@ -259,56 +301,61 @@ provider_watcher(Interface) ->
             gen_server:cast(?SERVER, {provider_node_change, Interface, Path}),
             logger:debug("provider_watcher get event ~p ~p", [node_children_changed, Path]);
         {Event, Path} ->
-%%            Path = "/a",
-%%            Event = node_created
             logger:debug("provider_watcher get event ~p ~p", [Event, Path])
     end,
     ok.
 
+notify_provider_change(Fun, Interface, []) ->
+    UrlInfo = #dubbo_url{
+        scheme = <<"empty">>,
+        host = <<"127.0.0.1">>,
+        path = Interface,
+        port = 80,
+        parameters = #{
+            <<"interface">> => Interface
+        }
+    },
+    UrlInfoBin = dubbo_common_fun:url_to_binary(UrlInfo),
+    logger:debug("notify provider change fun ~p", [Fun]),
+    Fun(Interface, [UrlInfoBin]),
+    ok;
+notify_provider_change(Fun, Interface, List) ->
+    List2 = [http_uri:decode(Item) || Item <- List],
+    logger:debug("notify provider change fun ~p", [Fun]),
+    Fun(Interface, List2),
+    ok.
+
+del_path(Pid, Path) ->
+    case erlzk:delete(Pid, Path) of
+        ok ->
+            ok;
+        {error, Reason} ->
+            logger:warning("zookeeper registry del path error ~p path ~p", [Reason, Path]),
+            {error, Reason}
+    end.
 
 create_path(Pid, Path, CreateType) ->
     case erlzk:create(Pid, Path, CreateType) of
         {ok, ActualPath} ->
-            logger:debug("[add_consumer] create zk path  success ~p", [ActualPath]),
+            logger:debug("create zk path  success ~p", [ActualPath]),
             ok;
         {error, R1} ->
-            logger:debug("[add_consumer] create zk path error ~p ~p", [Path, R1])
+            logger:debug("create zk path error ~p ~p", [Path, R1])
     end,
     ok.
-check_and_create_path(_Pid, _RootPath, []) ->
-    ok;
+check_and_create_path(_Pid, RootPath, []) ->
+    RootPath;
 check_and_create_path(Pid, RootPath, [{Item, CreateType} | Rst]) ->
     CheckPath = <<RootPath/binary, <<"/">>/binary, Item/binary>>,
+
     case erlzk:exists(Pid, CheckPath) of
         {ok, Stat} ->
             check_and_create_path(Pid, CheckPath, Rst);
         {error, no_node} ->
-            logger:debug("[add_consumer] check_and_create_path unexist no_node ~p", [CheckPath]),
+            logger:debug("check_and_create_path unexist no_node ~p", [CheckPath]),
             create_path(Pid, CheckPath, CreateType),
             check_and_create_path(Pid, CheckPath, Rst);
         {error, R1} ->
-            logger:debug("[add_consumer] check_and_create_path unexist ~p", [R1]),
+            logger:debug("check_and_create_path unexist ~p", [R1]),
             check_and_create_path(Pid, CheckPath, Rst)
-    end.
-
-gen_consumer_node_info(Consumer) ->
-    %% revision参数字段的作用是什么? 暂时不添加
-    Methods = dubbo_lists_util:join(Consumer#consumer_config.methods, <<",">>),
-    Value = io_lib:format(<<"consumer://~s/~s?application=~s&category=~s&check=~p&default.timeout=~p&dubbo=~s&interface=~s&methods=~s&side=~s&timestamp=~p">>,
-        [dubbo_common_fun:local_ip_v4_str(),
-            Consumer#consumer_config.interface,
-            Consumer#consumer_config.application,
-            Consumer#consumer_config.category,
-            Consumer#consumer_config.check,
-            Consumer#consumer_config.default_timeout,
-            Consumer#consumer_config.dubbo_version,
-            Consumer#consumer_config.interface,
-            Methods,
-            Consumer#consumer_config.side,
-            dubbo_time_util:timestamp_ms()
-        ]),
-    list_to_binary(Value).
-
-%%dubbo_zookeeper:register_consumer(<<"com.ifcoder.abcd">>,[]).
-start_provider_process(Interface, ProviderList) ->
-    dubbo_provider_consumer_reg_table:start_consumer(Interface, ProviderList).
\ No newline at end of file
+    end.
\ No newline at end of file
diff --git a/src/dubbo_service_config.erl b/src/dubbo_service_config.erl
new file mode 100644
index 0000000..2f595ba
--- /dev/null
+++ b/src/dubbo_service_config.erl
@@ -0,0 +1,110 @@
+%%------------------------------------------------------------------------------
+%% Licensed to the Apache Software Foundation (ASF) under one or more
+%% contributor license agreements.  See the NOTICE file distributed with
+%% this work for additional information regarding copyright ownership.
+%% The ASF licenses this file to You under the Apache License, Version 2.0
+%% (the "License"); you may not use this file except in compliance with
+%% the License.  You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%------------------------------------------------------------------------------
+-module(dubbo_service_config).
+
+-include("dubbo.hrl").
+-include("dubboerl.hrl").
+%% API
+-export([export/1]).
+
+-spec(export(#provider_config{}) -> ok).
+export(ProviderInfo) ->
+    logger:debug("will export provider info ~p", [ProviderInfo]),
+    do_export(ProviderInfo),
+    ok.
+
+do_export(ProviderInfo) ->
+    do_export_protocol(ProviderInfo),
+    ok.
+
+do_export_protocol(ProviderInfo) ->
+    Url = get_registry_url(ProviderInfo),
+    logger:debug("do export protocol for url ~p", [Url]),
+    Invoker = #invoker{url = Url, handler = ProviderInfo#provider_config.impl_handle},
+    dubbo_extension:run_fold(protocol_wapper, export, [Invoker], ok),
+    ok.
+
+
+
+
+get_registry_url(ProviderInfo) ->
+    {Host, Port} = dubbo_registry:get_registry_host_port(),
+    UrlInfo = #dubbo_url{
+        scheme = <<"registry">>,
+        host = list_to_binary(Host),
+        port = Port,
+        path = <<"org.apache.dubbo.registry.RegistryService">>,
+        parameters = gen_registry_parameter(ProviderInfo)
+    },
+    dubbo_common_fun:url_to_binary(UrlInfo).
+
+gen_registry_parameter(ProviderInfo) ->
+    Para = #{
+        <<"application">> => ProviderInfo#provider_config.application,
+        <<"dubbo">> => <<"2.0.2">>,
+        <<"pid">> => list_to_binary(os:getpid()),
+        <<"export">> => get_export_info(ProviderInfo),
+        <<"registry">> => dubbo_registry:get_registry_type(),
+        <<"release">> => <<"2.7.1">>,
+        <<"timestamp">> => integer_to_binary(dubbo_time_util:timestamp_ms())
+    },
+    Para.
+
+get_export_info(ProviderInfo) ->
+    %%dubbo://127.0.0.1:20880/org.apache.dubbo.erlang.sample.service.facade.UserOperator?
+    %% anyhost=true&
+    %% application=hello-world&
+    %% bean.name=org.apache.dubbo.erlang.sample.service.facade.UserOperator&
+    %% bind.ip=127.0.0.1&bind.port=20880&default.deprecated=false&
+    %% default.dynamic=false&default.register=true&deprecated=false&dubbo=2.0.2&
+    %% dynamic=false&generic=false&
+    %% interface=org.apache.dubbo.erlang.sample.service.facade.UserOperator&
+    %% methods=queryUserInfo,queryUserList,genUserId,getUserInfo&pid=90956&register=true&release=2.7.1&side=provider&timestamp=1562725983984
+    Para = [
+        {"anyhost", "true"},
+        {"application", ProviderInfo#provider_config.application},
+        {"bean.name", ProviderInfo#provider_config.interface},
+        {"bind.ip", dubbo_common_fun:local_ip_v4_str()},
+        {"bind.port", integer_to_list(ProviderInfo#provider_config.port)},
+        {"default.deprecated", "false"},
+        {"default.dynamic", "false"},
+        {"default.register", "true"},
+        {"deprecated", "false"},
+        {"dynamic", "false"},
+        {"generic", "false"},
+        {"interface", ProviderInfo#provider_config.interface},
+        {"methods", format_methods_str(ProviderInfo#provider_config.methods)},
+        {"pid", os:getpid()},
+        {"register", "true"},
+        {"release", "2.7.1"},
+        {"side", "provider"},
+        {"dubbo", "2.0.2"},
+        {"timestamp", integer_to_list(dubbo_time_util:timestamp_ms())}
+    ],
+    UrlInfo = #dubbo_url{
+        scheme = ProviderInfo#provider_config.protocol,
+        host = dubbo_common_fun:local_ip_v4_str(),
+        port = ProviderInfo#provider_config.port,
+        path = ProviderInfo#provider_config.interface,
+        parameters = Para
+    },
+    Url = dubbo_common_fun:url_to_binary(UrlInfo),
+    list_to_binary(http_uri:encode(binary_to_list(Url))).
+
+format_methods_str(Methods) ->
+    Methods2 = [binary_to_list(Item) || Item <- Methods],
+    string:join(Methods2, ",").
\ No newline at end of file
diff --git a/src/dubbo_zookeeper.erl b/src/dubbo_shutdown.erl
similarity index 50%
rename from src/dubbo_zookeeper.erl
rename to src/dubbo_shutdown.erl
index 84a95d6..856bd41 100644
--- a/src/dubbo_zookeeper.erl
+++ b/src/dubbo_shutdown.erl
@@ -14,12 +14,12 @@
 %% See the License for the specific language governing permissions and
 %% limitations under the License.
 %%------------------------------------------------------------------------------
--module(dubbo_zookeeper).
+-module(dubbo_shutdown).
+
 -behaviour(gen_server).
 
--include("dubbo.hrl").
 %% API
--export([start_link/0, register_consumer/1, register_consumer/2, gen_consumer_node_info/1, register_provider/1, provider_watcher/1]).
+-export([start_link/0]).
 
 %% gen_server callbacks
 -export([init/1,
@@ -31,7 +31,7 @@
 
 -define(SERVER, ?MODULE).
 
--record(state, {zk_pid}).
+-record(state, {}).
 
 %%%===================================================================
 %%% API
@@ -67,8 +67,9 @@ start_link() ->
     {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
     {stop, Reason :: term()} | ignore).
 init([]) ->
-    {ok, Pid} = connection(),
-    {ok, #state{zk_pid = Pid}}.
+    process_flag(trap_exit, true),
+    io:format("start shutdown ser"),
+    {ok, #state{}}.
 
 %%--------------------------------------------------------------------
 %% @private
@@ -85,13 +86,6 @@ init([]) ->
     {noreply, NewState :: #state{}, timeout() | hibernate} |
     {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
     {stop, Reason :: term(), NewState :: #state{}}).
-
-handle_call({add_consumer, Consumer}, _From, State) ->
-    add_consumer(Consumer, State),
-    {reply, ok, State};
-handle_call({add_provider, Provider}, _From, State) ->
-    register_provider_path(Provider, State),
-    {reply, ok, State};
 handle_call(_Request, _From, State) ->
     {reply, ok, State}.
 
@@ -106,9 +100,6 @@ handle_call(_Request, _From, State) ->
     {noreply, NewState :: #state{}} |
     {noreply, NewState :: #state{}, timeout() | hibernate} |
     {stop, Reason :: term(), NewState :: #state{}}).
-handle_cast({provider_node_change, Interface, Path}, #state{zk_pid = Pid} = State) ->
-    get_provider_and_start(Pid, Interface, Path),
-    {noreply, State};
 handle_cast(_Request, State) ->
     {noreply, State}.
 
@@ -127,7 +118,6 @@ handle_cast(_Request, State) ->
     {noreply, NewState :: #state{}, timeout() | hibernate} |
     {stop, Reason :: term(), NewState :: #state{}}).
 handle_info(_Info, State) ->
-    logger:info("zk server recv msg:~p", [_Info]),
     {noreply, State}.
 
 %%--------------------------------------------------------------------
@@ -144,6 +134,7 @@ handle_info(_Info, State) ->
 -spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
     State :: #state{}) -> term()).
 terminate(_Reason, _State) ->
+    destroy(),
     ok.
 
 %%--------------------------------------------------------------------
@@ -160,117 +151,10 @@ terminate(_Reason, _State) ->
 code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
 
-
-register_consumer(Consumer) ->
-    gen_server:call(?SERVER, {add_consumer, Consumer}),
-    ok.
-register_consumer(Name, Option) ->
-    Consumer = #consumer_config{interface = Name, methods = [<<"testa">>, <<"testb">>]},
-    register_consumer(Consumer),
-    ok.
-register_provider(Provider) ->
-    gen_server:call(?SERVER, {add_provider, Provider}),
-    ok.
-
 %%%===================================================================
 %%% Internal functions
 %%%===================================================================
-
-connection() ->
-    {ok, List} = application:get_env(dubboerl, zookeeper_list),
-    {ok, Pid} = erlzk:connect(List, 30000, [
-        {chroot, "/"},
-        {monitor, self()}]),
-    {ok, Pid}.
-
-add_consumer(Consumer, State) ->
-    Pid = State#state.zk_pid,
-%%    InterfacePath= << <<"/dubbo/">>/binary,Name/binary ,<<"consumers">>/binary >>,
-    ConsumerNode = gen_consumer_node_info(Consumer),
-    ConsumerNode2 = list_to_binary(edoc_lib:escape_uri(binary_to_list(ConsumerNode))),
-    check_and_create_path(Pid, <<"">>, [{<<"dubbo">>, p}, {Consumer#consumer_config.interface, p}, {<<"consumers">>, p}, {ConsumerNode2, e}]),
-    get_provider_list(Consumer, State),
-    ok.
-register_provider_path(Provider, State) ->
-    #state{zk_pid = Pid} = State,
-    ProviderNode = dubbo_node_config_util:gen_provider_info(Provider),
-    check_and_create_path(Pid, <<"">>, [{<<"dubbo">>, p}, {Provider#provider_config.interface, p}, {<<"providers">>, p}, {ProviderNode, e}]),
-    ok.
-
-
-get_provider_list(Consumer, State) ->
-    Pid = State#state.zk_pid,
-    InterfacePath = <<<<"/dubbo/">>/binary, (Consumer#consumer_config.interface)/binary, <<"/providers">>/binary>>,
-    get_provider_and_start(Pid, Consumer#consumer_config.interface, InterfacePath),
-    ok.
-get_provider_and_start(Pid, Interface, Path) ->
-    case erlzk:get_children(Pid, Path, spawn(dubbo_zookeeper, provider_watcher, [Interface])) of
-        {ok, ChildList} ->
-            logger:debug("get provider list ~p", [ChildList]),
-            start_provider_process(Interface, ChildList),
-            ok;
-        {error, R1} ->
-            logger:debug("[add_consumer] get_provider_list error ~p ~p", [R1]),
-            ok
-    end.
-
-provider_watcher(Interface) ->
-    receive
-        {node_children_changed, Path} ->
-            gen_server:cast(?SERVER, {provider_node_change, Interface, Path}),
-            logger:debug("provider_watcher get event ~p ~p", [node_children_changed, Path]);
-        {Event, Path} ->
-%%            Path = "/a",
-%%            Event = node_created
-            logger:debug("provider_watcher get event ~p ~p", [Event, Path])
-    end,
-    ok.
-
-
-create_path(Pid, Path, CreateType) ->
-    case erlzk:create(Pid, Path, CreateType) of
-        {ok, ActualPath} ->
-            logger:debug("[add_consumer] create zk path  success ~p", [ActualPath]),
-            ok;
-        {error, R1} ->
-            logger:debug("[add_consumer] create zk path error ~p ~p", [Path, R1])
-    end,
-    ok.
-check_and_create_path(_Pid, _RootPath, []) ->
-    ok;
-check_and_create_path(Pid, RootPath, [{Item, CreateType} | Rst]) ->
-    CheckPath = <<RootPath/binary, <<"/">>/binary, Item/binary>>,
-    case erlzk:exists(Pid, CheckPath) of
-        {ok, Stat} ->
-            check_and_create_path(Pid, CheckPath, Rst);
-        {error, no_node} ->
-            logger:debug("[add_consumer] check_and_create_path unexist no_node ~p", [CheckPath]),
-            create_path(Pid, CheckPath, CreateType),
-            check_and_create_path(Pid, CheckPath, Rst);
-        {error, R1} ->
-            logger:debug("[add_consumer] check_and_create_path unexist ~p", [R1]),
-            check_and_create_path(Pid, CheckPath, Rst)
-    end.
-
-gen_consumer_node_info(Consumer) ->
-    %% revision参数字段的作用是什么? 暂时不添加
-    Methods = dubbo_lists_util:join(Consumer#consumer_config.methods, <<",">>),
-    Value = io_lib:format(<<"consumer://~s/~s?application=~s&category=~s&check=~p&default.timeout=~p&dubbo=~s&interface=~s&methods=~s&side=~s&timestamp=~p">>,
-        [dubbo_common_fun:local_ip_v4_str(),
-            Consumer#consumer_config.interface,
-            Consumer#consumer_config.application,
-            Consumer#consumer_config.category,
-            Consumer#consumer_config.check,
-            Consumer#consumer_config.default_timeout,
-            Consumer#consumer_config.dubbo_version,
-            Consumer#consumer_config.interface,
-            Methods,
-            Consumer#consumer_config.side,
-            dubbo_time_util:timestamp_ms()
-        ]),
-    list_to_binary(Value).
-
-%%dubbo_zookeeper:register_consumer(<<"com.ifcoder.abcd">>,[]).
-start_provider_process(Interface, ProviderList) ->
-    dubbo_provider_consumer_reg_table:start_consumer(Interface, ProviderList).
-
+destroy() ->
+    logger:info("dubbo hook shutdown event"),
+    dubbo_extension:run(protocol_wapper, destroy, []),
+    ok.
\ No newline at end of file
diff --git a/src/dubbo_traffic_control.erl b/src/dubbo_traffic_control.erl
index 0a75aa7..853ce82 100644
--- a/src/dubbo_traffic_control.erl
+++ b/src/dubbo_traffic_control.erl
@@ -1,11 +1,19 @@
-%%%-------------------------------------------------------------------
-%%% @author dlive
-%%% @copyright (C) 2018, <COMPANY>
-%%% @doc
-%%%
-%%% @end
-%%% Created : 22. May 2018 1:58 PM
-%%%-------------------------------------------------------------------
+%%------------------------------------------------------------------------------
+%% Licensed to the Apache Software Foundation (ASF) under one or more
+%% contributor license agreements.  See the NOTICE file distributed with
+%% this work for additional information regarding copyright ownership.
+%% The ASF licenses this file to You under the Apache License, Version 2.0
+%% (the "License"); you may not use this file except in compliance with
+%% the License.  You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%------------------------------------------------------------------------------
 -module(dubbo_traffic_control).
 -include("dubboerl.hrl").
 %% API
diff --git a/src/dubbo_type_transfer.erl b/src/dubbo_type_transfer.erl
index 827c3d2..3f1af96 100644
--- a/src/dubbo_type_transfer.erl
+++ b/src/dubbo_type_transfer.erl
@@ -1,11 +1,19 @@
-%%%-------------------------------------------------------------------
-%%% @author dlive
-%%% @copyright (C) 2016, <COMPANY>
-%%% @doc
-%%%
-%%% @end
-%%% Created : 27. 十月 2016 下午8:28
-%%%-------------------------------------------------------------------
+%%------------------------------------------------------------------------------
+%% Licensed to the Apache Software Foundation (ASF) under one or more
+%% contributor license agreements.  See the NOTICE file distributed with
+%% this work for additional information regarding copyright ownership.
+%% The ASF licenses this file to You under the Apache License, Version 2.0
+%% (the "License"); you may not use this file except in compliance with
+%% the License.  You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%------------------------------------------------------------------------------
 -module(dubbo_type_transfer).
 -include("hessian.hrl").
 -include("dubbo.hrl").
@@ -42,7 +50,6 @@ java_to_native(#list{values = ForeignData} = Data, State) ->
     ForeignDataNew = [java_to_native(ValueItem, State) || ValueItem <- ForeignData],
     ForeignDataNew;
 java_to_native(Data, _) ->
-    logger:debug("java_to_native unkonw type ~p", [Data]),
     Data.
 
 get_deftype(ForeignType) ->
diff --git a/src/dubboerl.erl b/src/dubboerl.erl
index 03cc8a0..e3b71e4 100644
--- a/src/dubboerl.erl
+++ b/src/dubboerl.erl
@@ -33,7 +33,6 @@ start_consumer() ->
     ApplicationName = application:get_env(dubboerl, application, <<"defaultApplication">>),
     lists:map(fun({Interface, Option}) ->
         ConsumerInfo = dubbo_config_util:gen_consumer(ApplicationName, Interface, Option),
-%%        dubbo_zookeeper:register_consumer(ConsumerInfo),
         dubbo_reference_config:init_reference(ConsumerInfo),
         logger:info("consumer refer success ~p", [Interface])
               end, ConsumerList),
@@ -43,21 +42,15 @@ start_provider() ->
     ProviderList = application:get_env(dubboerl, provider, []),
     ApplicationName = application:get_env(dubboerl, application, <<"defaultApplication">>),
     DubboServerPort = application:get_env(dubboerl, port, ?DUBBO_DEFAULT_PORT),
-    start_provider_listen(DubboServerPort),
-    lists:map(fun({ImplModuleName, BehaviourModuleName, Interface, Option}) ->
-        ok = dubbo_provider_protocol:register_impl_provider(Interface, ImplModuleName, BehaviourModuleName),
-        MethodList = apply(BehaviourModuleName, get_method_999_list, []),
-        ProviderInfo = dubbo_config_util:gen_provider(ApplicationName, DubboServerPort, Interface, MethodList, Option),
-        dubbo_zookeeper:register_provider(ProviderInfo),
-        logger:info("register provider success ~p ~p", [ImplModuleName, Interface])
-              end, ProviderList),
-    ok.
 
-start_provider_listen(Port) ->
-    {ok, _} = ranch:start_listener(tcp_reverse,
-        ranch_tcp, [{port, Port}], dubbo_provider_protocol, []),
+    lists:map(
+        fun({ImplModuleName, BehaviourModuleName, Interface, Option}) ->
+            MethodList = apply(BehaviourModuleName, get_method_999_list, []),
+            ProviderInfo = dubbo_config_util:gen_provider(ApplicationName, DubboServerPort, Interface, MethodList, ImplModuleName, Option),
+            dubbo_service_config:export(ProviderInfo),
+            logger:info("register provider success ~p ~p", [ImplModuleName, Interface])
+        end, ProviderList),
     ok.
 
 
 
-
diff --git a/src/dubboerl_app.erl b/src/dubboerl_app.erl
index 0f51768..d27b690 100644
--- a/src/dubboerl_app.erl
+++ b/src/dubboerl_app.erl
@@ -29,9 +29,9 @@
 start(_StartType, _StartArgs) ->
     logger:info("[START] dubbo framework server start"),
     case dubboerl_sup:start_link() of
-        {ok,Pid} ->
+        {ok, Pid} ->
             init_default_hooks(),
-            {ok,Pid};
+            {ok, Pid};
         Result ->
             Result
     end.
@@ -43,13 +43,14 @@ stop(_State) ->
 %%====================================================================
 %% Internal functions
 %%====================================================================
-init_default_hooks()->
-    dubbo_extension:register(protocol,dubbo_protocol_dubbo,10),
-    dubbo_extension:register(protocol_wapper,dubbo_protocol_registry,10),
-
+init_default_hooks() ->
+    dubbo_extension:register(protocol, dubbo_protocol_dubbo, 10),
+    dubbo_extension:register(protocol_wapper, dubbo_protocol_registry, 10),
+    dubbo_extension:register(filter, application:get_env(dubboerl, cluster, dubbo_cluster_failfast), 1),
     ok.
 env_init() ->
     ets:new(?PROVIDER_IMPL_TABLE, [public, named_table]),
+    ets:new(?SERVICE_EXPORT_TABLE, [public, named_table]),
     dubbo_traffic_control:init(),
     dubbo_type_register:init(),
     register_type_list().
diff --git a/src/dubboerl_sup.erl b/src/dubboerl_sup.erl
index a0d2fb0..a7b2bac 100644
--- a/src/dubboerl_sup.erl
+++ b/src/dubboerl_sup.erl
@@ -41,20 +41,23 @@ start_link() ->
 %% Child :: {Id,StartFunc,Restart,Shutdown,Type,Modules}
 init([]) ->
     dubboerl_app:env_init(),
-    ZK = {dubbo_zookeeper, {dubbo_zookeeper, start_link, []}, transient, 5000, worker, [dubbo_zookeeper]},
-%%    NettySer = {dubbo_netty_client,{dubbo_netty_client, start_link, []},transient,5000,worker,[dubbo_netty_client]},
+    %% @todo registry need move registry sup
+    ZK = {dubbo_registry_zookeeper, {dubbo_registry_zookeeper, start_link, []}, transient, 5000, worker, [dubbo_registry_zookeeper]},
+
+    ExtensionSer = {dubbo_extension, {dubbo_extension, start_link, []}, transient, 5000, worker, [dubbo_extension]},
     Id_count = {dubbo_id_generator, {dubbo_id_generator, start_link, []}, transient, 5000, worker, [dubbo_id_generator]},
     ProviderPoolSup = {dubbo_provider_worker_sup, {dubbo_provider_worker_sup, start_link, []}, transient, 5000, supervisor, [dubbo_provider_worker_sup]},
     ConsumerPoolSup = {dubbo_transport_pool_sup, {dubbo_transport_pool_sup, start_link, []}, transient, 5000, supervisor, [dubbo_transport_pool_sup]},
     ConsumerPool = {dubbo_provider_consumer_reg_table, {dubbo_provider_consumer_reg_table, start_link, []}, transient, 5000, worker, [dubbo_provider_consumer_reg_table]},
-    ListNew1 =
-        case application:get_env(dubboerl, registry, false) of
-            true ->
-                [ZK];
-            false ->
-                []
-        end,
-    ListNew = ListNew1 ++ [Id_count, ConsumerPool, ConsumerPoolSup, ProviderPoolSup],
+    ShutdownSer = {dubbo_shutdown, {dubbo_shutdown, start_link, []}, transient, 10000, worker, [dubbo_shutdown]},
+%%    ListNew1 =
+%%        case application:get_env(dubboerl, registry, false) of
+%%            true ->
+%%                [ZK];
+%%            false ->
+%%                []
+%%        end,
+    ListNew = [Id_count, ExtensionSer, ZK, ConsumerPool, ConsumerPoolSup, ProviderPoolSup, ShutdownSer],
     {ok, {{one_for_one, 60, 10}, ListNew}}.
 
 %%====================================================================
diff --git a/test/consumer_SUITE.erl b/test/consumer_SUITE.erl
index ade6032..ef1fe19 100644
--- a/test/consumer_SUITE.erl
+++ b/test/consumer_SUITE.erl
@@ -141,14 +141,14 @@ lib_type_register(_Config) ->
     ok.
 
 json_sync_invoker(_Config) ->
-    application:set_env(dubboerl, protocol, json),
+    application:set_env(dubboerl, serialization, json),
     R1 = userOperator:queryUserInfo(#userInfoRequest{username = "name", requestId = "111"}, #{sync=> true}),
     io:format(user, "json_sync_invoker result ~p ~n", [R1]),
     R2 = userOperator:genUserId(),
     io:format(user, "json_sync_invoker result2 ~p ~n", [R2]),
     ok.
 hessian_sync_invoker(_Config) ->
-    application:set_env(dubboerl, protocol, hessian),
+    application:set_env(dubboerl, serialization, hessian),
     R1 = userOperator:queryUserInfo(#userInfoRequest{username = "name", requestId = "111"}, #{sync=> true}),
     io:format(user, "json_sync_invoker result ~p ~n", [R1]),
     R2 = userOperator:genUserId(),
diff --git a/test/dubbo_config_parser_tests.erl b/test/dubbo_config_parser_tests.erl
index 9ff0be4..060d4f5 100644
--- a/test/dubbo_config_parser_tests.erl
+++ b/test/dubbo_config_parser_tests.erl
@@ -19,7 +19,7 @@
 -include("dubbo.hrl").
 
 gen_provice_config_test() ->
-    ProviderConfigInfo = dubbo_config_util:gen_provider(<<"defaultApp">>, 20880, <<"org.apache.dubbo.test.interface">>, [method1], []),
+    ProviderConfigInfo = dubbo_config_util:gen_provider(<<"defaultApp">>, 20880, <<"org.apache.dubbo.test.interface">>, [method1],dubbo_service_user_impl, []),
     ProvideNode = dubbo_node_config_util:gen_provider_info(ProviderConfigInfo),
     ?assert(is_binary(ProvideNode)).
 
diff --git a/test/dubbo_consumer_pool_tests.erl b/test/dubbo_consumer_pool_tests.erl
index 0a67f1a..1ed5a6a 100644
--- a/test/dubbo_consumer_pool_tests.erl
+++ b/test/dubbo_consumer_pool_tests.erl
@@ -15,7 +15,6 @@
 %% limitations under the License.
 %%------------------------------------------------------------------------------
 -module(dubbo_consumer_pool_tests).
--author("dlive").
 
 -include_lib("eunit/include/eunit.hrl").
 -include("dubbo.hrl").
@@ -25,8 +24,8 @@ update_readonly_test() ->
     InterfaceName= <<"testinterfacename">>,
     HostFalg= <<"127.0.0.1/20880">>,
     ConnectionList = [
-        #connection_info{connection_id=1,pid= testpid,weight = 30,host_flag = HostFalg},
-        #connection_info{connection_id=2,pid= testpid2,weight = 30,host_flag = HostFalg}
+        #connection_info{pid= testpid,weight = 30,host_flag = HostFalg},
+        #connection_info{pid= testpid2,weight = 30,host_flag = HostFalg}
     ],
     dubbo_provider_consumer_reg_table:update_connection_info(InterfaceName,HostFalg,ConnectionList,true),
     {ok,Size} = dubbo_provider_consumer_reg_table:update_connection_readonly(testpid,false),
diff --git a/test/consumer_SUITE.erl b/test/dubbo_service_config_SUITE.erl
similarity index 62%
copy from test/consumer_SUITE.erl
copy to test/dubbo_service_config_SUITE.erl
index ade6032..41d8399 100644
--- a/test/consumer_SUITE.erl
+++ b/test/dubbo_service_config_SUITE.erl
@@ -14,46 +14,31 @@
 %% See the License for the specific language governing permissions and
 %% limitations under the License.
 %%------------------------------------------------------------------------------
--module(consumer_SUITE).
-%% API
--export([]).
+-module(dubbo_service_config_SUITE).
 
 -compile(export_all).
 
--include_lib("common_test/include/ct.hrl").
--include("dubbo_sample_service.hrl").
-%%--------------------------------------------------------------------
-%% Function: suite() -> Info
-%% Info = [tuple()]
-%%--------------------------------------------------------------------
 suite() ->
     [{timetrap, {seconds, 60}}].
 
-%%--------------------------------------------------------------------
-%% Function: init_per_suite(Config0) ->
-%%               Config1 | {skip,Reason} | {skip_and_save,Reason,Config1}
-%% Config0 = Config1 = [tuple()]
-%% Reason = term()
-%%--------------------------------------------------------------------
+
+
 init_per_suite(Config) ->
     logger:add_handler(testttt, logger_std_h, #{
-        level => debug
+        level => all
     }),
     Start = application:ensure_all_started(dubboerl),
-%%  dubboerl:init(),
-    dubboerl:start_provider(),
-    timer:sleep(2000),
-    dubboerl:start_consumer(),
-    dubbo_sample_service_app:register_type_list(),
-    timer:sleep(5000),
-    io:format(user, "test case start info ~p~n", [Start]),
-    [{appid, 1}].
+%%    dubboerl:init(),
+    io:format(user, "test case start dubboerl info ~p~n", [Start]),
+    [].
 
 %%--------------------------------------------------------------------
 %% Function: end_per_suite(Config0) -> term() | {save_config,Config1}
 %% Config0 = Config1 = [tuple()]
 %%--------------------------------------------------------------------
 end_per_suite(_Config) ->
+    application:stop(dubboerl),
+    timer:sleep(1000),
     ok.
 
 %%--------------------------------------------------------------------
@@ -109,7 +94,7 @@ end_per_testcase(_TestCase, _Config) ->
 %%--------------------------------------------------------------------
 groups() ->
     [
-        {consumer1, [sequence], [lib_type_register, json_sync_invoker, hessian_sync_invoker]}
+        {service_test, [sequence], [export_interface]}
     ].
 
 %%--------------------------------------------------------------------
@@ -120,37 +105,11 @@ groups() ->
 %% Reason = term()
 %%--------------------------------------------------------------------
 all() ->
-    [{group, consumer1}].
+    [{group, service_test}].
 
-%%--------------------------------------------------------------------
-%% Function: TestCase() -> Info
-%% Info = [tuple()]
-%%--------------------------------------------------------------------
-lib_type_register() ->
-    [].
-
-%%--------------------------------------------------------------------
-%% Function: TestCase(Config0) ->
-%%               ok | exit() | {skip,Reason} | {comment,Comment} |
-%%               {save_config,Config1} | {skip_and_save,Reason,Config1}
-%% Config0 = Config1 = [tuple()]
-%% Reason = term()
-%% Comment = term()
-%%--------------------------------------------------------------------
-lib_type_register(_Config) ->
-    ok.
 
-json_sync_invoker(_Config) ->
-    application:set_env(dubboerl, protocol, json),
-    R1 = userOperator:queryUserInfo(#userInfoRequest{username = "name", requestId = "111"}, #{sync=> true}),
-    io:format(user, "json_sync_invoker result ~p ~n", [R1]),
-    R2 = userOperator:genUserId(),
-    io:format(user, "json_sync_invoker result2 ~p ~n", [R2]),
-    ok.
-hessian_sync_invoker(_Config) ->
-    application:set_env(dubboerl, protocol, hessian),
-    R1 = userOperator:queryUserInfo(#userInfoRequest{username = "name", requestId = "111"}, #{sync=> true}),
-    io:format(user, "json_sync_invoker result ~p ~n", [R1]),
-    R2 = userOperator:genUserId(),
-    io:format(user, "json_sync_invoker result2 ~p ~n", [R2]),
+export_interface(_Config) ->
+    MethodList = apply(userOperator, get_method_999_list, []),
+    ProviderInfo = dubbo_config_util:gen_provider(<<"test-application">>, 20880, <<"org.apache.dubbo.erlang.sample.service.facade.UserOperator">>, MethodList, dubbo_service_user_impl, []),
+    dubbo_service_config:export(ProviderInfo),
     ok.
\ No newline at end of file
diff --git a/test/dubbo_zookeeper_tests.erl b/test/dubbo_zookeeper_tests.erl
deleted file mode 100644
index 499feb6..0000000
--- a/test/dubbo_zookeeper_tests.erl
+++ /dev/null
@@ -1,25 +0,0 @@
-%%------------------------------------------------------------------------------
-%% Licensed to the Apache Software Foundation (ASF) under one or more
-%% contributor license agreements.  See the NOTICE file distributed with
-%% this work for additional information regarding copyright ownership.
-%% The ASF licenses this file to You under the Apache License, Version 2.0
-%% (the "License"); you may not use this file except in compliance with
-%% the License.  You may obtain a copy of the License at
-%%
-%%     http://www.apache.org/licenses/LICENSE-2.0
-%%
-%% Unless required by applicable law or agreed to in writing, software
-%% distributed under the License is distributed on an "AS IS" BASIS,
-%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-%% See the License for the specific language governing permissions and
-%% limitations under the License.
-%%------------------------------------------------------------------------------
--module(dubbo_zookeeper_tests).
--include_lib("eunit/include/eunit.hrl").
--include("dubbo.hrl").
-exist_test() ->
-    Consumer = #consumer_config{interface = <<"com.ifcoder.demo.facade.User">>,
-        methods = [<<"a">>, <<"b">>]},
-    V = dubbo_zookeeper:gen_consumer_node_info(Consumer),
-    ?debugFmt("consumer info ~p", [V]),
-    ?assert(is_binary(V)).
diff --git a/test/consumer_SUITE.erl b/test/reference_config_SUITE.erl
similarity index 73%
copy from test/consumer_SUITE.erl
copy to test/reference_config_SUITE.erl
index ade6032..4005311 100644
--- a/test/consumer_SUITE.erl
+++ b/test/reference_config_SUITE.erl
@@ -14,10 +14,9 @@
 %% See the License for the specific language governing permissions and
 %% limitations under the License.
 %%------------------------------------------------------------------------------
--module(consumer_SUITE).
-%% API
--export([]).
+-module(reference_config_SUITE).
 
+%% API
 -compile(export_all).
 
 -include_lib("common_test/include/ct.hrl").
@@ -40,12 +39,7 @@ init_per_suite(Config) ->
         level => debug
     }),
     Start = application:ensure_all_started(dubboerl),
-%%  dubboerl:init(),
-    dubboerl:start_provider(),
-    timer:sleep(2000),
-    dubboerl:start_consumer(),
-    dubbo_sample_service_app:register_type_list(),
-    timer:sleep(5000),
+%%    dubboerl:init(),
     io:format(user, "test case start info ~p~n", [Start]),
     [{appid, 1}].
 
@@ -109,7 +103,7 @@ end_per_testcase(_TestCase, _Config) ->
 %%--------------------------------------------------------------------
 groups() ->
     [
-        {consumer1, [sequence], [lib_type_register, json_sync_invoker, hessian_sync_invoker]}
+        {reference_test, [sequence], [registry_interface]}
     ].
 
 %%--------------------------------------------------------------------
@@ -120,37 +114,11 @@ groups() ->
 %% Reason = term()
 %%--------------------------------------------------------------------
 all() ->
-    [{group, consumer1}].
-
-%%--------------------------------------------------------------------
-%% Function: TestCase() -> Info
-%% Info = [tuple()]
-%%--------------------------------------------------------------------
-lib_type_register() ->
-    [].
+    [{group, reference_test}].
 
-%%--------------------------------------------------------------------
-%% Function: TestCase(Config0) ->
-%%               ok | exit() | {skip,Reason} | {comment,Comment} |
-%%               {save_config,Config1} | {skip_and_save,Reason,Config1}
-%% Config0 = Config1 = [tuple()]
-%% Reason = term()
-%% Comment = term()
-%%--------------------------------------------------------------------
-lib_type_register(_Config) ->
-    ok.
 
-json_sync_invoker(_Config) ->
-    application:set_env(dubboerl, protocol, json),
-    R1 = userOperator:queryUserInfo(#userInfoRequest{username = "name", requestId = "111"}, #{sync=> true}),
-    io:format(user, "json_sync_invoker result ~p ~n", [R1]),
-    R2 = userOperator:genUserId(),
-    io:format(user, "json_sync_invoker result2 ~p ~n", [R2]),
-    ok.
-hessian_sync_invoker(_Config) ->
-    application:set_env(dubboerl, protocol, hessian),
-    R1 = userOperator:queryUserInfo(#userInfoRequest{username = "name", requestId = "111"}, #{sync=> true}),
-    io:format(user, "json_sync_invoker result ~p ~n", [R1]),
-    R2 = userOperator:genUserId(),
-    io:format(user, "json_sync_invoker result2 ~p ~n", [R2]),
+registry_interface(_Config) ->
+    ConsumerInfo = dubbo_config_util:gen_consumer("test_app", <<"org.apache.dubbo.erlang.sample.service.facade.UserOperator">>, []),
+%%        dubbo_zookeeper:register_consumer(ConsumerInfo),
+    dubbo_reference_config:init_reference(ConsumerInfo),
     ok.
\ No newline at end of file
diff --git a/test/userOperator.erl b/test/userOperator.erl
index e8567ae..0d35917 100644
--- a/test/userOperator.erl
+++ b/test/userOperator.erl
@@ -67,7 +67,7 @@ getUserInfo(Arg0, RequestOption)->
         ]
     },
     Request = dubbo_adapter:reference(Data),
-    dubbo_invoker_old:invoke_request(?CURRENT_CLASS_NAME,Request,RequestOption).
+    dubbo_invoker:invoke_request(?CURRENT_CLASS_NAME,Request,RequestOption).
 
 
 -spec genUserId()->
@@ -96,7 +96,7 @@ genUserId( RequestOption)->
         ]
     },
     Request = dubbo_adapter:reference(Data),
-    dubbo_invoker_old:invoke_request(?CURRENT_CLASS_NAME,Request,RequestOption).
+    dubbo_invoker:invoke_request(?CURRENT_CLASS_NAME,Request,RequestOption).
 
 
 -spec queryUserInfo(Arg0::#userInfoRequest{})->
@@ -127,7 +127,7 @@ queryUserInfo(Arg0, RequestOption)->
         ]
     },
     Request = dubbo_adapter:reference(Data),
-    dubbo_invoker_old:invoke_request(?CURRENT_CLASS_NAME,Request,RequestOption).
+    dubbo_invoker:invoke_request(?CURRENT_CLASS_NAME,Request,RequestOption).
 
 
 -spec queryUserList(Arg0::list())->
@@ -158,7 +158,7 @@ queryUserList(Arg0, RequestOption)->
         ]
     },
     Request = dubbo_adapter:reference(Data),
-    dubbo_invoker_old:invoke_request(?CURRENT_CLASS_NAME,Request,RequestOption).
+    dubbo_invoker:invoke_request(?CURRENT_CLASS_NAME,Request,RequestOption).
 
 test() ->
     queryUserInfo(#userInfoRequest{username = "name", requestId = "111"}, #{sync=> true}).
\ No newline at end of file