You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@dubbo.apache.org by GitBox <gi...@apache.org> on 2020/07/26 08:44:08 UTC

[GitHub] [dubbo-go] fangyincheng opened a new pull request #673: [WIP]Rft: network & codec

fangyincheng opened a new pull request #673:
URL: https://github.com/apache/dubbo-go/pull/673


   **What this PR does**:
   
   * merge branch dubbo-protocol-restructure
   * refactor codec
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@dubbo.apache.org
For additional commands, e-mail: notifications-help@dubbo.apache.org


[GitHub] [dubbo-go] AlexStocks commented on a change in pull request #673: Rft: network & codec

Posted by GitBox <gi...@apache.org>.
AlexStocks commented on a change in pull request #673:
URL: https://github.com/apache/dubbo-go/pull/673#discussion_r491360725



##########
File path: remoting/getty/readwriter.go
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package getty
+
+import (
+	"reflect"
+)
+
+import (
+	"github.com/apache/dubbo-getty"
+	hessian "github.com/apache/dubbo-go-hessian2"
+	perrors "github.com/pkg/errors"
+)
+
+import (
+	"github.com/apache/dubbo-go/common/logger"
+	"github.com/apache/dubbo-go/remoting"
+)
+
+////////////////////////////////////////////
+// RpcClientPackageHandler
+////////////////////////////////////////////
+
+// RpcClientPackageHandler Read data from server and Write data to server
+type RpcClientPackageHandler struct {
+	client *Client
+}
+
+// NewRpcClientPackageHandler create a RpcClientPackageHandler
+func NewRpcClientPackageHandler(client *Client) *RpcClientPackageHandler {
+	return &RpcClientPackageHandler{client: client}
+}
+
+// Read data from server. if the package size from server is larger than 4096 byte, server will read 4096 byte
+// and send to client each time. the Read can assemble it.
+func (p *RpcClientPackageHandler) Read(ss getty.Session, data []byte) (interface{}, int, error) {
+	resp, length, err := (p.client.codec).Decode(data)
+	//err := pkg.Unmarshal(buf, p.client)
+	if err != nil {
+		err = perrors.Cause(err)
+		if err == hessian.ErrHeaderNotEnough || err == hessian.ErrBodyNotEnough {
+			return nil, 0, nil
+		}
+
+		logger.Errorf("pkg.Unmarshal(ss:%+v, len(@data):%d) = error:%+v", ss, len(data), err)
+
+		return nil, length, err
+	}
+
+	return resp, length, nil
+}
+
+// Write send the data to server
+func (p *RpcClientPackageHandler) Write(ss getty.Session, pkg interface{}) ([]byte, error) {
+	req, ok := pkg.(*remoting.Request)
+	if !ok {
+		logger.Errorf("illegal pkg:%+v\n", pkg)
+		return nil, perrors.New("invalid rpc request")
+	}
+
+	buf, err := (p.client.codec).EncodeRequest(req)
+	if err != nil {
+		logger.Warnf("binary.Write(req{%#v}) = err{%#v}", req, perrors.WithStack(err))
+		return nil, perrors.WithStack(err)
+	}
+
+	return buf.Bytes(), nil
+}
+
+////////////////////////////////////////////
+// RpcServerPackageHandler
+////////////////////////////////////////////
+
+//var (
+//	rpcServerPkgHandler = &RpcServerPackageHandler{}
+//)
+
+// RpcServerPackageHandler Read data from client and Write data to client
+type RpcServerPackageHandler struct {
+	server *Server
+}
+
+func NewRpcServerPackageHandler(server *Server) *RpcServerPackageHandler {
+	return &RpcServerPackageHandler{server: server}
+}
+
+// Read data from client. if the package size from client is larger than 4096 byte, client will read 4096 byte
+// and send to client each time. the Read can assemble it.
+func (p *RpcServerPackageHandler) Read(ss getty.Session, data []byte) (interface{}, int, error) {
+	req, length, err := (p.server.codec).Decode(data)
+	//resp,len, err := (*p.).DecodeResponse(buf)
+

Review comment:
       Please do not leave a blank line between the assignment and the if-condition check.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@dubbo.apache.org
For additional commands, e-mail: notifications-help@dubbo.apache.org


[GitHub] [dubbo-go] zouyx commented on a change in pull request #673: Rft: network & codec

Posted by GitBox <gi...@apache.org>.
zouyx commented on a change in pull request #673:
URL: https://github.com/apache/dubbo-go/pull/673#discussion_r486099772



##########
File path: remoting/getty/getty_server.go
##########
@@ -73,47 +76,58 @@ func init() {
 	if err := srvConf.CheckValidity(); err != nil {
 		panic(err)
 	}
-	setServerGrpool()
+	SetServerGrpool()
 }
 
-// SetServerConfig set dubbo server config.
+// SetServerConfig ...
 func SetServerConfig(s ServerConfig) {
 	srvConf = &s
 	err := srvConf.CheckValidity()
 	if err != nil {
 		logger.Warnf("[ServerConfig CheckValidity] error: %v", err)
 		return
 	}
-	setServerGrpool()
+	SetServerGrpool()
 }
 
-// GetServerConfig get dubbo server config.
+// GetServerConfig ...
 func GetServerConfig() ServerConfig {
 	return *srvConf
 }
 
-func setServerGrpool() {
+// SetServerGrpool ...

Review comment:
       add some comments

##########
File path: remoting/getty/getty_server.go
##########
@@ -73,47 +76,58 @@ func init() {
 	if err := srvConf.CheckValidity(); err != nil {
 		panic(err)
 	}
-	setServerGrpool()
+	SetServerGrpool()
 }
 
-// SetServerConfig set dubbo server config.
+// SetServerConfig ...

Review comment:
       ```suggestion
   // SetServerConfig set dubbo server config.
   ```

##########
File path: remoting/getty/getty_server.go
##########
@@ -170,18 +184,18 @@ func (s *Server) newSession(session getty.Session) error {
 	return nil
 }
 
-// Start start dubbo server.
-func (s *Server) Start(url common.URL) {
+// Start ...

Review comment:
       ```suggestion
   // Start dubbo server.
   ```

##########
File path: remoting/getty/getty_server.go
##########
@@ -73,47 +76,58 @@ func init() {
 	if err := srvConf.CheckValidity(); err != nil {
 		panic(err)
 	}
-	setServerGrpool()
+	SetServerGrpool()
 }
 
-// SetServerConfig set dubbo server config.
+// SetServerConfig ...
 func SetServerConfig(s ServerConfig) {
 	srvConf = &s
 	err := srvConf.CheckValidity()
 	if err != nil {
 		logger.Warnf("[ServerConfig CheckValidity] error: %v", err)
 		return
 	}
-	setServerGrpool()
+	SetServerGrpool()
 }
 
-// GetServerConfig get dubbo server config.
+// GetServerConfig ...

Review comment:
       revert code

##########
File path: remoting/getty/getty_server.go
##########
@@ -73,47 +76,58 @@ func init() {
 	if err := srvConf.CheckValidity(); err != nil {
 		panic(err)
 	}
-	setServerGrpool()
+	SetServerGrpool()
 }
 
-// SetServerConfig set dubbo server config.
+// SetServerConfig ...
 func SetServerConfig(s ServerConfig) {
 	srvConf = &s
 	err := srvConf.CheckValidity()
 	if err != nil {
 		logger.Warnf("[ServerConfig CheckValidity] error: %v", err)
 		return
 	}
-	setServerGrpool()
+	SetServerGrpool()
 }
 
-// GetServerConfig get dubbo server config.
+// GetServerConfig ...
 func GetServerConfig() ServerConfig {
 	return *srvConf
 }
 
-func setServerGrpool() {
+// SetServerGrpool ...
+func SetServerGrpool() {
 	if srvConf.GrPoolSize > 1 {
 		srvGrpool = gxsync.NewTaskPool(gxsync.WithTaskPoolTaskPoolSize(srvConf.GrPoolSize), gxsync.WithTaskPoolTaskQueueLength(srvConf.QueueLen),
 			gxsync.WithTaskPoolTaskQueueNumber(srvConf.QueueNumber))
 	}
 }
 
-// Server is dubbo protocol server.
+// Server ...
 type Server struct {
-	conf       ServerConfig
-	tcpServer  getty.Server
-	rpcHandler *RpcServerHandler
+	conf           ServerConfig
+	addr           string
+	codec          remoting.Codec
+	tcpServer      getty.Server
+	rpcHandler     *RpcServerHandler
+	requestHandler func(*invocation.RPCInvocation) protocol.RPCResult
 }
 
-// NewServer create a new Server.
-func NewServer() *Server {
+// NewServer ...

Review comment:
       revert code

##########
File path: remoting/getty/getty_server.go
##########
@@ -191,12 +205,12 @@ func (s *Server) Start(url common.URL) {
 		)
 	}
 	tcpServer.RunEventLoop(s.newSession)
-	logger.Debugf("s bind addr{%s} ok!", addr)
+	logger.Debugf("s bind addr{%s} ok!", s.addr)
 	s.tcpServer = tcpServer
 
 }
 
-// Stop stop dubbo server.
+// Stop ...

Review comment:
       as above

##########
File path: remoting/getty/config.go
##########
@@ -30,7 +30,7 @@ import (
 )
 
 type (
-	// GettySessionParam is session configuration for getty.
+	// GettySessionParam ...

Review comment:
       revert comment in this file

##########
File path: protocol/dubbo/dubbo_protocol.go
##########
@@ -131,3 +144,101 @@ func GetProtocol() protocol.Protocol {
 	}
 	return dubboProtocol
 }
+
+func doHandleRequest(rpcInvocation *invocation.RPCInvocation) protocol.RPCResult {
+	exporter, _ := dubboProtocol.ExporterMap().Load(rpcInvocation.ServiceKey())
+	result := protocol.RPCResult{}
+	if exporter == nil {
+		err := fmt.Errorf("don't have this exporter, key: %s", rpcInvocation.ServiceKey())
+		logger.Errorf(err.Error())
+		result.Err = err
+		//reply(session, p, hessian.PackageResponse)
+		return result
+	}
+	invoker := exporter.(protocol.Exporter).GetInvoker()
+	if invoker != nil {
+		// FIXME
+		ctx := rebuildCtx(rpcInvocation)
+
+		invokeResult := invoker.Invoke(ctx, rpcInvocation)
+		if err := invokeResult.Error(); err != nil {
+			result.Err = invokeResult.Error()
+			//p.Header.ResponseStatus = hessian.Response_OK
+			//p.Body = hessian.NewResponse(nil, err, result.Attachments())
+		} else {
+			result.Rest = invokeResult.Result()
+			//p.Header.ResponseStatus = hessian.Response_OK
+			//p.Body = hessian.NewResponse(res, nil, result.Attachments())
+		}
+	} else {
+		result.Err = fmt.Errorf("don't have the invoker, key: %s", rpcInvocation.ServiceKey())
+	}
+	return result
+}
+
+func getExchangeClient(url common.URL) *remoting.ExchangeClient {
+	clientTmp, ok := exchangeClientMap.Load(url.Location)
+	if !ok {
+		var exchangeClientTmp *remoting.ExchangeClient
+		func() {
+			// lock for NewExchangeClient and store into map.
+			_, loaded := exchangeLock.LoadOrStore(url.Location, 0x00)

Review comment:
       What is this lock used for?

##########
File path: protocol/dubbo/dubbo_protocol.go
##########
@@ -45,48 +60,43 @@ var (
 	dubboProtocol *DubboProtocol
 )
 
-// DubboProtocol is a dubbo protocol implement.
+// It support dubbo protocol. It implements Protocol interface for dubbo protocol.
 type DubboProtocol struct {
 	protocol.BaseProtocol
-	serverMap  map[string]*Server
+	// It is store relationship about serviceKey(group/interface:version) and ExchangeServer
+	// The ExchangeServer is introduced to replace of Server. Because Server is depend on getty directly.
+	serverMap  map[string]*remoting.ExchangeServer
 	serverLock sync.Mutex
 }
 
-// NewDubboProtocol create a dubbo protocol.
+// nolint
 func NewDubboProtocol() *DubboProtocol {
 	return &DubboProtocol{
 		BaseProtocol: protocol.NewBaseProtocol(),
-		serverMap:    make(map[string]*Server),
+		serverMap:    make(map[string]*remoting.ExchangeServer),
 	}
 }
 
-// Export export dubbo service.
+// nolint

Review comment:
       as above

##########
File path: protocol/dubbo/dubbo_protocol.go
##########
@@ -45,48 +60,43 @@ var (
 	dubboProtocol *DubboProtocol
 )
 
-// DubboProtocol is a dubbo protocol implement.
+// It support dubbo protocol. It implements Protocol interface for dubbo protocol.
 type DubboProtocol struct {
 	protocol.BaseProtocol
-	serverMap  map[string]*Server
+	// It is store relationship about serviceKey(group/interface:version) and ExchangeServer
+	// The ExchangeServer is introduced to replace of Server. Because Server is depend on getty directly.
+	serverMap  map[string]*remoting.ExchangeServer
 	serverLock sync.Mutex
 }
 
-// NewDubboProtocol create a dubbo protocol.
+// nolint

Review comment:
       why no lint?

##########
File path: protocol/jsonrpc/jsonrpc_invoker.go
##########
@@ -26,6 +26,7 @@ import (
 	"github.com/apache/dubbo-go/common/constant"
 	"github.com/apache/dubbo-go/common/logger"
 	"github.com/apache/dubbo-go/protocol"
+

Review comment:
       revert this line

##########
File path: protocol/dubbo/impl/proto.go
##########
@@ -0,0 +1,454 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package impl
+
+import (
+	"bytes"
+	"encoding/binary"
+	"fmt"
+	"github.com/apache/dubbo-go/common/logger"

Review comment:
       format 

##########
File path: protocol/jsonrpc/http_test.go
##########
@@ -75,7 +75,7 @@ func TestHTTPClientCall(t *testing.T) {
 
 	// call GetUser
 	ctx := context.WithValue(context.Background(), constant.DUBBOGO_CTX_KEY, map[string]string{
-		"X-Proxy-Id": "dubbogo",
+		"X-Proxy-ID": "dubbogo",

Review comment:
       If you change this ID, will it cause a problem when the consumer is an old version but the provider is a new version?

##########
File path: protocol/dubbo/dubbo_protocol.go
##########
@@ -131,3 +144,101 @@ func GetProtocol() protocol.Protocol {
 	}
 	return dubboProtocol
 }
+
+func doHandleRequest(rpcInvocation *invocation.RPCInvocation) protocol.RPCResult {
+	exporter, _ := dubboProtocol.ExporterMap().Load(rpcInvocation.ServiceKey())
+	result := protocol.RPCResult{}
+	if exporter == nil {
+		err := fmt.Errorf("don't have this exporter, key: %s", rpcInvocation.ServiceKey())
+		logger.Errorf(err.Error())
+		result.Err = err
+		//reply(session, p, hessian.PackageResponse)
+		return result
+	}
+	invoker := exporter.(protocol.Exporter).GetInvoker()
+	if invoker != nil {
+		// FIXME
+		ctx := rebuildCtx(rpcInvocation)
+
+		invokeResult := invoker.Invoke(ctx, rpcInvocation)
+		if err := invokeResult.Error(); err != nil {
+			result.Err = invokeResult.Error()
+			//p.Header.ResponseStatus = hessian.Response_OK
+			//p.Body = hessian.NewResponse(nil, err, result.Attachments())
+		} else {
+			result.Rest = invokeResult.Result()
+			//p.Header.ResponseStatus = hessian.Response_OK
+			//p.Body = hessian.NewResponse(res, nil, result.Attachments())
+		}
+	} else {
+		result.Err = fmt.Errorf("don't have the invoker, key: %s", rpcInvocation.ServiceKey())
+	}
+	return result
+}
+
+func getExchangeClient(url common.URL) *remoting.ExchangeClient {
+	clientTmp, ok := exchangeClientMap.Load(url.Location)
+	if !ok {
+		var exchangeClientTmp *remoting.ExchangeClient
+		func() {
+			// lock for NewExchangeClient and store into map.
+			_, loaded := exchangeLock.LoadOrStore(url.Location, 0x00)
+			// unlock
+			defer exchangeLock.Delete(url.Location)
+			if loaded {
+				// retry for 5 times.
+				for i := 0; i < 5; i++ {
+					if clientTmp, ok = exchangeClientMap.Load(url.Location); ok {
+						break
+					} else {
+						// if cannot get, sleep a while.
+						time.Sleep(time.Duration(i*100) * time.Millisecond)
+					}
+				}
+				return
+			}
+			// new ExchangeClient
+			exchangeClientTmp = remoting.NewExchangeClient(url, getty.NewClient(getty.Options{
+				ConnectTimeout: config.GetConsumerConfig().ConnectTimeout,
+				RequestTimeout: config.GetConsumerConfig().RequestTimeout,
+			}), config.GetConsumerConfig().ConnectTimeout, false)
+			// input store
+			if exchangeClientTmp != nil {
+				exchangeClientMap.Store(url.Location, exchangeClientTmp)
+			}
+		}()
+		if exchangeClientTmp != nil {
+			return exchangeClientTmp
+		}
+	}
+	// cannot dial the server
+	if clientTmp == nil {
+		return nil
+	}
+	exchangeClient, ok := clientTmp.(*remoting.ExchangeClient)
+	if !ok {
+		exchangeClientTmp := remoting.NewExchangeClient(url, getty.NewClient(getty.Options{
+			ConnectTimeout: config.GetConsumerConfig().ConnectTimeout,
+			RequestTimeout: config.GetConsumerConfig().RequestTimeout}), config.GetConsumerConfig().ConnectTimeout, false)
+		if exchangeClientTmp != nil {
+			exchangeClientMap.Store(url.Location, exchangeClientTmp)
+		}
+		return exchangeClientTmp
+	}
+	return exchangeClient
+}
+
+// rebuildCtx rebuild the context by attachment.
+// Once we decided to transfer more context's key-value, we should change this.
+// now we only support rebuild the tracing context
+func rebuildCtx(inv *invocation.RPCInvocation) context.Context {
+	ctx := context.WithValue(context.Background(), "attachment", inv.Attachments())
+
+	// actually, if user do not use any opentracing framework, the err will not be nil.
+	spanCtx, err := opentracing.GlobalTracer().Extract(opentracing.TextMap,
+		opentracing.TextMapCarrier(inv.Attachments()))
+	if err == nil {
+		ctx = context.WithValue(ctx, constant.TRACING_REMOTE_SPAN_CTX, spanCtx)
+	}
+	return ctx

Review comment:
       ```suggestion
   	if err != nil {
   		return ctx
   	}
   	return context.WithValue(ctx, constant.TRACING_REMOTE_SPAN_CTX, spanCtx)
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@dubbo.apache.org
For additional commands, e-mail: notifications-help@dubbo.apache.org


[GitHub] [dubbo-go] AlexStocks commented on a change in pull request #673: [WIP]Rft: network & codec

Posted by GitBox <gi...@apache.org>.
AlexStocks commented on a change in pull request #673:
URL: https://github.com/apache/dubbo-go/pull/673#discussion_r479644513



##########
File path: common/url.go
##########
@@ -658,6 +662,32 @@ func mergeNormalParam(mergedUrl *URL, referenceUrl *URL, paramKeys []string) []f
 	return methodConfigMergeFcn
 }
 
+// doesn't encode url reserve character, url.QueryEscape will do this work

Review comment:
       I do not know what you mean by this explanation. Maybe you'd better add some code examples.

##########
File path: common/url.go
##########
@@ -658,6 +662,32 @@ func mergeNormalParam(mergedUrl *URL, referenceUrl *URL, paramKeys []string) []f
 	return methodConfigMergeFcn
 }
 
+// doesn't encode url reserve character, url.QueryEscape will do this work
+// reference: https://github.com/golang/go.git, src/net/url/url.go, Encode method
+func ParamsUnescapeEncode(params url.Values) string {
+	if params == nil {
+		return ""
+	}
+	var buf strings.Builder
+	keys := make([]string, len(params))

Review comment:
       keys := make([]string, 0, len(params))   ?

##########
File path: protocol/dubbo/impl/codec.go
##########
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package impl
+
+import (
+	"bufio"
+	"encoding/binary"
+	"github.com/apache/dubbo-go/common/constant"

Review comment:
       omg
   

##########
File path: remoting/getty/listener.go
##########
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package getty
+
+import (
+	"fmt"
+	"sync"
+	"sync/atomic"
+	"time"
+)
+
+import (
+	hessian "github.com/apache/dubbo-go-hessian2"
+	"github.com/dubbogo/getty"
+	perrors "github.com/pkg/errors"
+)
+
+import (
+	"github.com/apache/dubbo-go/common/constant"
+	"github.com/apache/dubbo-go/common/logger"
+	"github.com/apache/dubbo-go/protocol/invocation"
+	"github.com/apache/dubbo-go/remoting"
+)
+
+// todo: WritePkg_Timeout will entry *.yml
+const (
+	// WritePkg_Timeout ...
+	WritePkg_Timeout = 5 * time.Second
+)
+
+var (
+	errTooManySessions = perrors.New("too many sessions")
+)
+
+type rpcSession struct {
+	session getty.Session
+	reqNum  int32
+}
+
+func (s *rpcSession) AddReqNum(num int32) {
+	atomic.AddInt32(&s.reqNum, num)
+}
+
+func (s *rpcSession) GetReqNum() int32 {
+	return atomic.LoadInt32(&s.reqNum)
+}
+
+// //////////////////////////////////////////
+// RpcClientHandler
+// //////////////////////////////////////////
+
+// RpcClientHandler ...
+type RpcClientHandler struct {
+	conn *gettyRPCClient
+}
+
+// NewRpcClientHandler ...
+func NewRpcClientHandler(client *gettyRPCClient) *RpcClientHandler {
+	return &RpcClientHandler{conn: client}
+}
+
+// OnOpen ...
+func (h *RpcClientHandler) OnOpen(session getty.Session) error {
+	h.conn.addSession(session)
+	return nil
+}
+
+// OnError ...
+func (h *RpcClientHandler) OnError(session getty.Session, err error) {
+	logger.Infof("session{%s} got error{%v}, will be closed.", session.Stat(), err)
+	h.conn.removeSession(session)
+}
+
+// OnClose ...
+func (h *RpcClientHandler) OnClose(session getty.Session) {
+	logger.Infof("session{%s} is closing......", session.Stat())
+	h.conn.removeSession(session)
+}
+
+// OnMessage ...
+func (h *RpcClientHandler) OnMessage(session getty.Session, pkg interface{}) {
+	result, ok := pkg.(remoting.DecodeResult)
+	if !ok {
+		logger.Errorf("illegal package")
+		return
+	}
+	// get heartbeart request from server
+	if result.IsRequest {
+		req := result.Result.(*remoting.Request)
+		if req.Event {
+			logger.Debugf("get rpc heartbeat request{%#v}", req)
+			resp := remoting.NewResponse(req.ID, req.Version)
+			resp.Status = hessian.Response_OK
+			resp.Event = req.Event
+			resp.SerialID = req.SerialID
+			resp.Version = "2.0.2"
+			reply(session, resp, hessian.PackageHeartbeat)
+			return
+		}
+		logger.Errorf("illegal request but not heartbeart. {%#v}", req)
+		return
+	}
+
+	p := result.Result.(*remoting.Response)
+	// get heartbeart
+	if p.Event {
+		logger.Debugf("get rpc heartbeat response{%#v}", p)
+		if p.Error != nil {
+			logger.Errorf("rpc heartbeat response{error: %#v}", p.Error)
+		}
+		h.conn.pool.rpcClient.responseHandler.Handler(p)
+		return
+	}
+	if result.IsRequest {
+		logger.Errorf("illegal package for it is response type. {%#v}", pkg)
+		return
+	}
+
+	logger.Debugf("get rpc response{%#v}", p)
+
+	h.conn.updateSession(session)
+
+	h.conn.pool.rpcClient.responseHandler.Handler(p)
+}
+
+// OnCron ...
+func (h *RpcClientHandler) OnCron(session getty.Session) {
+	rpcSession, err := h.conn.getClientRpcSession(session)
+	if err != nil {
+		logger.Errorf("client.getClientSession(session{%s}) = error{%v}",
+			session.Stat(), perrors.WithStack(err))
+		return
+	}
+	if h.conn.pool.rpcClient.conf.sessionTimeout.Nanoseconds() < time.Since(session.GetActive()).Nanoseconds() {
+		logger.Warnf("session{%s} timeout{%s}, reqNum{%d}",
+			session.Stat(), time.Since(session.GetActive()).String(), rpcSession.reqNum)
+		h.conn.removeSession(session) // -> h.conn.close() -> h.conn.pool.remove(h.conn)
+		return
+	}
+
+	h.conn.pool.rpcClient.heartbeat(session)
+}
+
+// //////////////////////////////////////////
+// RpcServerHandler
+// //////////////////////////////////////////
+
+// RpcServerHandler ...

Review comment:
       add comment

##########
File path: protocol/dubbo/dubbo_codec.go
##########
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package dubbo
+
+import (
+	"bytes"
+	"github.com/apache/dubbo-go/protocol/dubbo/impl"

Review comment:
       split it.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@dubbo.apache.org
For additional commands, e-mail: notifications-help@dubbo.apache.org


[GitHub] [dubbo-go] fangyincheng commented on a change in pull request #673: Rft: network & codec

Posted by GitBox <gi...@apache.org>.
fangyincheng commented on a change in pull request #673:
URL: https://github.com/apache/dubbo-go/pull/673#discussion_r484097336



##########
File path: common/url.go
##########
@@ -658,6 +662,32 @@ func mergeNormalParam(mergedUrl *URL, referenceUrl *URL, paramKeys []string) []f
 	return methodConfigMergeFcn
 }
 
+// doesn't encode url reserve character, url.QueryEscape will do this work

Review comment:
       see next line “reference: https://github.com/golang/go.git, src/net/url/url.go, Encode method” 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@dubbo.apache.org
For additional commands, e-mail: notifications-help@dubbo.apache.org


[GitHub] [dubbo-go] AlexStocks commented on a change in pull request #673: Rft: network & codec

Posted by GitBox <gi...@apache.org>.
AlexStocks commented on a change in pull request #673:
URL: https://github.com/apache/dubbo-go/pull/673#discussion_r487654867



##########
File path: protocol/dubbo/dubbo_codec.go
##########
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package dubbo
+
+import (
+	"bytes"
+	"strconv"
+	"time"
+)
+
+import (
+	hessian "github.com/apache/dubbo-go-hessian2"
+	perrors "github.com/pkg/errors"
+)
+
+import (
+	"github.com/apache/dubbo-go/common/constant"
+	"github.com/apache/dubbo-go/common/logger"
+	"github.com/apache/dubbo-go/protocol"
+	"github.com/apache/dubbo-go/protocol/dubbo/impl"
+	"github.com/apache/dubbo-go/protocol/invocation"
+	"github.com/apache/dubbo-go/remoting"
+)
+
+//SerialID serial ID
+type SerialID byte
+
+func init() {
+	codec := &DubboCodec{}
+	// this is for registry dubboCodec of dubbo protocol
+	remoting.RegistryCodec("dubbo", codec)
+}
+
+// DubboCodec. It is implements remoting.Codec
+type DubboCodec struct {
+}
+
+// encode request for transport
+func (c *DubboCodec) EncodeRequest(request *remoting.Request) (*bytes.Buffer, error) {
+	if request.Event {
+		return c.encodeHeartbeartReqeust(request)
+	}
+
+	invoc, ok := request.Data.(*protocol.Invocation)
+	if !ok {
+		err := perrors.Errorf("encode request failed for parameter type :%+v", request)
+		logger.Errorf(err.Error())
+		return nil, err
+	}
+	invocation := *invoc
+
+	svc := impl.Service{}
+	svc.Path = invocation.AttachmentsByKey(constant.PATH_KEY, "")
+	svc.Interface = invocation.AttachmentsByKey(constant.INTERFACE_KEY, "")
+	svc.Version = invocation.AttachmentsByKey(constant.VERSION_KEY, "")
+	svc.Group = invocation.AttachmentsByKey(constant.GROUP_KEY, "")
+	svc.Method = invocation.MethodName()
+	timeout, err := strconv.Atoi(invocation.AttachmentsByKey(constant.TIMEOUT_KEY, strconv.Itoa(constant.DEFAULT_REMOTING_TIMEOUT)))
+	if err != nil {
+		// it will be wrapped in readwrite.Write .
+		return nil, perrors.WithStack(err)
+	}
+	svc.Timeout = time.Duration(timeout)
+
+	header := impl.DubboHeader{}
+	serialization := invocation.AttachmentsByKey(constant.SERIALIZATION_KEY, constant.HESSIAN2_SERIALIZATION)
+	if serialization == constant.PROTOBUF_SERIALIZATION {
+		header.SerialID = constant.S_Proto
+	} else {
+		header.SerialID = constant.S_Hessian2
+	}
+	header.ID = request.ID
+	if request.TwoWay {
+		header.Type = impl.PackageRequest_TwoWay
+	} else {
+		header.Type = impl.PackageRequest
+	}
+
+	pkg := &impl.DubboPackage{
+		Header:  header,
+		Service: svc,
+		Body:    impl.NewRequestPayload(invocation.Arguments(), invocation.Attachments()),
+		Err:     nil,
+		Codec:   impl.NewDubboCodec(nil),
+	}
+
+	if err := impl.LoadSerializer(pkg); err != nil {
+		return nil, perrors.WithStack(err)
+	}
+
+	return pkg.Marshal()
+}
+
+// encode heartbeart request
+func (c *DubboCodec) encodeHeartbeartReqeust(request *remoting.Request) (*bytes.Buffer, error) {
+	header := impl.DubboHeader{
+		Type:     impl.PackageHeartbeat,
+		SerialID: constant.S_Hessian2,
+		ID:       request.ID,
+	}
+
+	pkg := &impl.DubboPackage{
+		Header:  header,
+		Service: impl.Service{},
+		Body:    impl.NewRequestPayload([]interface{}{}, nil),
+		Err:     nil,
+		Codec:   impl.NewDubboCodec(nil),
+	}
+
+	if err := impl.LoadSerializer(pkg); err != nil {
+		return nil, err
+	}
+	return pkg.Marshal()
+}
+
+// encode response
+func (c *DubboCodec) EncodeResponse(response *remoting.Response) (*bytes.Buffer, error) {
+	var ptype = impl.PackageResponse
+	if response.IsHeartbeat() {
+		ptype = impl.PackageHeartbeat
+	}
+	resp := &impl.DubboPackage{
+		Header: impl.DubboHeader{
+			SerialID:       response.SerialID,
+			Type:           ptype,
+			ID:             response.ID,
+			ResponseStatus: response.Status,
+		},
+	}
+	if !response.IsHeartbeat() {
+		resp.Body = &impl.ResponsePayload{
+			RspObj:      response.Result.(protocol.RPCResult).Rest,
+			Exception:   response.Result.(protocol.RPCResult).Err,
+			Attachments: response.Result.(protocol.RPCResult).Attrs,
+		}
+	}
+
+	codec := impl.NewDubboCodec(nil)
+
+	pkg, err := codec.Encode(*resp)
+	if err != nil {
+		return nil, perrors.WithStack(err)
+	}
+
+	return bytes.NewBuffer(pkg), nil
+}
+
+// Decode data, including request and response.
+func (c *DubboCodec) Decode(data []byte) (remoting.DecodeResult, int, error) {
+	if c.isRequest(data) {
+		req, len, err := c.decodeRequest(data)
+		if err != nil {
+			return remoting.DecodeResult{}, len, perrors.WithStack(err)
+		}
+		return remoting.DecodeResult{IsRequest: true, Result: req}, len, perrors.WithStack(err)
+	} else {

Review comment:
       Please delete the else branch.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@dubbo.apache.org
For additional commands, e-mail: notifications-help@dubbo.apache.org


[GitHub] [dubbo-go] fangyincheng commented on a change in pull request #673: Rft: network & codec

Posted by GitBox <gi...@apache.org>.
fangyincheng commented on a change in pull request #673:
URL: https://github.com/apache/dubbo-go/pull/673#discussion_r487384058



##########
File path: config/application_config.go
##########
@@ -42,6 +42,16 @@ func (*ApplicationConfig) Prefix() string {
 	return constant.DUBBO + ".application."
 }
 
+// nolint
+func (c *ApplicationConfig) Id() string {

Review comment:
       delete it

##########
File path: config/application_config.go
##########
@@ -42,6 +42,16 @@ func (*ApplicationConfig) Prefix() string {
 	return constant.DUBBO + ".application."
 }
 
+// nolint
+func (c *ApplicationConfig) Id() string {
+	return ""
+}
+
+// SetId ...
+func (c *ApplicationConfig) SetId(id string) {

Review comment:
       delete it




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@dubbo.apache.org
For additional commands, e-mail: notifications-help@dubbo.apache.org


[GitHub] [dubbo-go] codecov-commenter commented on pull request #673: Rft: network & codec

Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on pull request #673:
URL: https://github.com/apache/dubbo-go/pull/673#issuecomment-691684531


   # [Codecov](https://codecov.io/gh/apache/dubbo-go/pull/673?src=pr&el=h1) Report
   > Merging [#673](https://codecov.io/gh/apache/dubbo-go/pull/673?src=pr&el=desc) into [develop](https://codecov.io/gh/apache/dubbo-go/commit/e18e1ab63f23e06b10fa9e80333598c747c9aa0b?el=desc) will **decrease** coverage by `1.62%`.
   > The diff coverage is `47.70%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/dubbo-go/pull/673/graphs/tree.svg?width=650&height=150&src=pr&token=dcPE6RyFAL)](https://codecov.io/gh/apache/dubbo-go/pull/673?src=pr&el=tree)
   
   ```diff
   @@             Coverage Diff             @@
   ##           develop     #673      +/-   ##
   ===========================================
   - Coverage    63.42%   61.79%   -1.63%     
   ===========================================
     Files          252      252              
     Lines        14302    14263      -39     
   ===========================================
   - Hits          9071     8814     -257     
   - Misses        4306     4522     +216     
   - Partials       925      927       +2     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/dubbo-go/pull/673?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [common/url.go](https://codecov.io/gh/apache/dubbo-go/pull/673/diff?src=pr&el=tree#diff-Y29tbW9uL3VybC5nbw==) | `62.89% <0.00%> (+0.72%)` | :arrow_up: |
   | [protocol/dubbo/impl/response.go](https://codecov.io/gh/apache/dubbo-go/pull/673/diff?src=pr&el=tree#diff-cHJvdG9jb2wvZHViYm8vaW1wbC9yZXNwb25zZS5nbw==) | `0.00% <0.00%> (ø)` | |
   | [protocol/dubbo/impl/serialize.go](https://codecov.io/gh/apache/dubbo-go/pull/673/diff?src=pr&el=tree#diff-cHJvdG9jb2wvZHViYm8vaW1wbC9zZXJpYWxpemUuZ28=) | `0.00% <0.00%> (ø)` | |
   | [protocol/jsonrpc/jsonrpc\_invoker.go](https://codecov.io/gh/apache/dubbo-go/pull/673/diff?src=pr&el=tree#diff-cHJvdG9jb2wvanNvbnJwYy9qc29ucnBjX2ludm9rZXIuZ28=) | `100.00% <ø> (ø)` | |
   | [remoting/getty/config.go](https://codecov.io/gh/apache/dubbo-go/pull/673/diff?src=pr&el=tree#diff-cmVtb3RpbmcvZ2V0dHkvY29uZmlnLmdv) | `13.15% <ø> (ø)` | |
   | [protocol/dubbo/impl/hessian.go](https://codecov.io/gh/apache/dubbo-go/pull/673/diff?src=pr&el=tree#diff-cHJvdG9jb2wvZHViYm8vaW1wbC9oZXNzaWFuLmdv) | `31.68% <31.68%> (ø)` | |
   | [remoting/getty/listener.go](https://codecov.io/gh/apache/dubbo-go/pull/673/diff?src=pr&el=tree#diff-cmVtb3RpbmcvZ2V0dHkvbGlzdGVuZXIuZ28=) | `39.49% <39.49%> (ø)` | |
   | [protocol/dubbo/impl/codec.go](https://codecov.io/gh/apache/dubbo-go/pull/673/diff?src=pr&el=tree#diff-cHJvdG9jb2wvZHViYm8vaW1wbC9jb2RlYy5nbw==) | `41.13% <41.13%> (ø)` | |
   | [remoting/getty/getty\_client.go](https://codecov.io/gh/apache/dubbo-go/pull/673/diff?src=pr&el=tree#diff-cmVtb3RpbmcvZ2V0dHkvZ2V0dHlfY2xpZW50Lmdv) | `46.93% <46.93%> (ø)` | |
   | [remoting/getty/pool.go](https://codecov.io/gh/apache/dubbo-go/pull/673/diff?src=pr&el=tree#diff-cmVtb3RpbmcvZ2V0dHkvcG9vbC5nbw==) | `62.74% <55.55%> (ø)` | |
   | ... and [43 more](https://codecov.io/gh/apache/dubbo-go/pull/673/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/dubbo-go/pull/673?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/dubbo-go/pull/673?src=pr&el=footer). Last update [e18e1ab...b5b8c0e](https://codecov.io/gh/apache/dubbo-go/pull/673?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@dubbo.apache.org
For additional commands, e-mail: notifications-help@dubbo.apache.org


[GitHub] [dubbo-go] fangyincheng commented on a change in pull request #673: Rft: network & codec

Posted by GitBox <gi...@apache.org>.
fangyincheng commented on a change in pull request #673:
URL: https://github.com/apache/dubbo-go/pull/673#discussion_r487384147



##########
File path: protocol/dubbo/dubbo_codec.go
##########
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package dubbo
+
+import (
+	"bytes"
+	"strconv"
+	"time"
+)
+
+import (
+	hessian "github.com/apache/dubbo-go-hessian2"
+	perrors "github.com/pkg/errors"
+)
+
+import (
+	"github.com/apache/dubbo-go/common/constant"
+	"github.com/apache/dubbo-go/common/logger"
+	"github.com/apache/dubbo-go/protocol"
+	"github.com/apache/dubbo-go/protocol/dubbo/impl"
+	"github.com/apache/dubbo-go/protocol/invocation"
+	"github.com/apache/dubbo-go/remoting"
+)
+
+//SerialID serial ID
+type SerialID byte
+
+func init() {
+	codec := &DubboCodec{}
+	// this is for registry dubboCodec of dubbo protocol
+	remoting.RegistryCodec("dubbo", codec)
+}
+
+// DubboCodec. It is implements remoting.Codec
+type DubboCodec struct {
+}
+
+// encode request for transport
+func (c *DubboCodec) EncodeRequest(request *remoting.Request) (*bytes.Buffer, error) {
+	if request.Event {

Review comment:
       I think so




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@dubbo.apache.org
For additional commands, e-mail: notifications-help@dubbo.apache.org


[GitHub] [dubbo-go] AlexStocks merged pull request #673: Rft: network & codec

Posted by GitBox <gi...@apache.org>.
AlexStocks merged pull request #673:
URL: https://github.com/apache/dubbo-go/pull/673


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@dubbo.apache.org
For additional commands, e-mail: notifications-help@dubbo.apache.org


[GitHub] [dubbo-go] pantianying commented on a change in pull request #673: [WIP]Rft: network & codec

Posted by GitBox <gi...@apache.org>.
pantianying commented on a change in pull request #673:
URL: https://github.com/apache/dubbo-go/pull/673#discussion_r471865516



##########
File path: remoting/getty/listener.go
##########
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package getty
+
+import (
+	"fmt"
+	"sync"
+	"sync/atomic"
+	"time"
+)
+
+import (
+	hessian "github.com/apache/dubbo-go-hessian2"
+	"github.com/dubbogo/getty"
+	perrors "github.com/pkg/errors"
+)
+
+import (
+	"github.com/apache/dubbo-go/common/constant"
+	"github.com/apache/dubbo-go/common/logger"
+	"github.com/apache/dubbo-go/protocol/invocation"
+	"github.com/apache/dubbo-go/remoting"
+)
+
+// todo: WritePkg_Timeout will entry *.yml

Review comment:
       Here's a TODO; don't forget it.

##########
File path: remoting/getty/listener.go
##########
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package getty
+
+import (
+	"fmt"
+	"sync"
+	"sync/atomic"
+	"time"
+)
+
+import (
+	hessian "github.com/apache/dubbo-go-hessian2"
+	"github.com/dubbogo/getty"
+	perrors "github.com/pkg/errors"
+)
+
+import (
+	"github.com/apache/dubbo-go/common/constant"
+	"github.com/apache/dubbo-go/common/logger"
+	"github.com/apache/dubbo-go/protocol/invocation"
+	"github.com/apache/dubbo-go/remoting"
+)
+
+// todo: WritePkg_Timeout will entry *.yml
+const (
+	// WritePkg_Timeout ...
+	WritePkg_Timeout = 5 * time.Second
+)
+
+var (
+	errTooManySessions = perrors.New("too many sessions")
+)
+
+type rpcSession struct {
+	session getty.Session
+	reqNum  int32
+}
+
+func (s *rpcSession) AddReqNum(num int32) {
+	atomic.AddInt32(&s.reqNum, num)
+}
+
+func (s *rpcSession) GetReqNum() int32 {
+	return atomic.LoadInt32(&s.reqNum)
+}
+
+// //////////////////////////////////////////
+// RpcClientHandler
+// //////////////////////////////////////////
+
+// RpcClientHandler ...
+type RpcClientHandler struct {
+	conn *gettyRPCClient
+}
+
+// NewRpcClientHandler ...
+func NewRpcClientHandler(client *gettyRPCClient) *RpcClientHandler {
+	return &RpcClientHandler{conn: client}
+}
+
+// OnOpen ...
+func (h *RpcClientHandler) OnOpen(session getty.Session) error {
+	h.conn.addSession(session)
+	return nil
+}
+
+// OnError ...
+func (h *RpcClientHandler) OnError(session getty.Session, err error) {
+	logger.Infof("session{%s} got error{%v}, will be closed.", session.Stat(), err)
+	h.conn.removeSession(session)
+}
+
+// OnClose ...
+func (h *RpcClientHandler) OnClose(session getty.Session) {
+	logger.Infof("session{%s} is closing......", session.Stat())
+	h.conn.removeSession(session)
+}
+
+// OnMessage ...
+func (h *RpcClientHandler) OnMessage(session getty.Session, pkg interface{}) {
+	result, ok := pkg.(remoting.DecodeResult)
+	if !ok {
+		logger.Errorf("illegal package")
+		return
+	}
+	// get heartbeart request from server
+	if result.IsRequest {
+		req := result.Result.(*remoting.Request)
+		if req.Event {
+			logger.Debugf("get rpc heartbeat request{%#v}", req)
+			resp := remoting.NewResponse(req.ID, req.Version)
+			resp.Status = hessian.Response_OK
+			resp.Event = req.Event
+			resp.SerialID = req.SerialID
+			resp.Version = "2.0.2"
+			reply(session, resp, hessian.PackageHeartbeat)
+			return
+		}
+		logger.Errorf("illegal request but not heartbeart. {%#v}", req)
+		return
+	}
+
+	p := result.Result.(*remoting.Response)
+	// get heartbeart
+	if p.Event {
+		logger.Debugf("get rpc heartbeat response{%#v}", p)
+		if p.Error != nil {
+			logger.Errorf("rpc heartbeat response{error: %#v}", p.Error)
+		}
+		h.conn.pool.rpcClient.responseHandler.Handler(p)
+		return
+	}
+	if result.IsRequest {
+		logger.Errorf("illegal package for it is response type. {%#v}", pkg)
+		return
+	}
+
+	logger.Debugf("get rpc response{%#v}", p)
+
+	h.conn.updateSession(session)
+
+	h.conn.pool.rpcClient.responseHandler.Handler(p)
+}
+
+// OnCron ...
+func (h *RpcClientHandler) OnCron(session getty.Session) {
+	rpcSession, err := h.conn.getClientRpcSession(session)
+	if err != nil {
+		logger.Errorf("client.getClientSession(session{%s}) = error{%v}",
+			session.Stat(), perrors.WithStack(err))
+		return
+	}
+	if h.conn.pool.rpcClient.conf.sessionTimeout.Nanoseconds() < time.Since(session.GetActive()).Nanoseconds() {
+		logger.Warnf("session{%s} timeout{%s}, reqNum{%d}",
+			session.Stat(), time.Since(session.GetActive()).String(), rpcSession.reqNum)
+		h.conn.removeSession(session) // -> h.conn.close() -> h.conn.pool.remove(h.conn)
+		return
+	}
+
+	h.conn.pool.rpcClient.heartbeat(session)
+}
+
+// //////////////////////////////////////////
+// RpcServerHandler
+// //////////////////////////////////////////
+
+// RpcServerHandler ...
+type RpcServerHandler struct {
+	maxSessionNum  int
+	sessionTimeout time.Duration
+	sessionMap     map[getty.Session]*rpcSession
+	rwlock         sync.RWMutex
+	server         *Server
+}
+
+// NewRpcServerHandler ...
+func NewRpcServerHandler(maxSessionNum int, sessionTimeout time.Duration, serverP *Server) *RpcServerHandler {
+	return &RpcServerHandler{
+		maxSessionNum:  maxSessionNum,
+		sessionTimeout: sessionTimeout,
+		sessionMap:     make(map[getty.Session]*rpcSession),
+		server:         serverP,
+	}
+}
+
+// OnOpen ...
+func (h *RpcServerHandler) OnOpen(session getty.Session) error {
+	var err error
+	h.rwlock.RLock()
+	if h.maxSessionNum <= len(h.sessionMap) {
+		err = errTooManySessions
+	}
+	h.rwlock.RUnlock()
+	if err != nil {
+		return perrors.WithStack(err)
+	}
+
+	logger.Infof("got session:%s", session.Stat())
+	h.rwlock.Lock()
+	h.sessionMap[session] = &rpcSession{session: session}
+	h.rwlock.Unlock()
+	return nil
+}
+
+// OnError ...
+func (h *RpcServerHandler) OnError(session getty.Session, err error) {
+	logger.Infof("session{%s} got error{%v}, will be closed.", session.Stat(), err)
+	h.rwlock.Lock()
+	delete(h.sessionMap, session)
+	h.rwlock.Unlock()
+}
+
+// OnClose ...
+func (h *RpcServerHandler) OnClose(session getty.Session) {
+	logger.Infof("session{%s} is closing......", session.Stat())
+	h.rwlock.Lock()
+	delete(h.sessionMap, session)
+	h.rwlock.Unlock()
+}
+
+// OnMessage ...
+func (h *RpcServerHandler) OnMessage(session getty.Session, pkg interface{}) {
+	h.rwlock.Lock()

Review comment:
       I think it is better to use a lock than an rwlock here.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@dubbo.apache.org
For additional commands, e-mail: notifications-help@dubbo.apache.org


[GitHub] [dubbo-go] flycash commented on a change in pull request #673: Rft: network & codec

Posted by GitBox <gi...@apache.org>.
flycash commented on a change in pull request #673:
URL: https://github.com/apache/dubbo-go/pull/673#discussion_r486390859



##########
File path: common/url.go
##########
@@ -686,6 +690,32 @@ func mergeNormalParam(mergedUrl *URL, referenceUrl *URL, paramKeys []string) []f
 	return methodConfigMergeFcn
 }
 
+// ParamsUnescapeEncode doesn't encode url reserve character, url.QueryEscape will do this work
+// reference: https://github.com/golang/go.git, src/net/url/url.go, Encode method
+func ParamsUnescapeEncode(params url.Values) string {
+	if params == nil {
+		return ""
+	}
+	var buf strings.Builder
+	keys := make([]string, 0, len(params))
+	for k := range params {
+		keys = append(keys, k)
+	}
+	sort.Strings(keys)
+	for _, k := range keys {
+		vs := params[k]
+		for _, v := range vs {
+			if buf.Len() > 0 {
+				buf.WriteByte('&')
+			}
+			buf.WriteString(k)
+			buf.WriteByte('=')
+			buf.WriteString(v)

Review comment:
       Do we need to URL-encode the value? I mean, does it work if `value` contains special characters like `/`, `?`, or `%`?

##########
File path: protocol/dubbo/dubbo_codec.go
##########
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package dubbo
+
+import (
+	"bytes"
+	"strconv"
+	"time"
+)
+
+import (
+	hessian "github.com/apache/dubbo-go-hessian2"
+	perrors "github.com/pkg/errors"
+)
+
+import (
+	"github.com/apache/dubbo-go/common/constant"
+	"github.com/apache/dubbo-go/common/logger"
+	"github.com/apache/dubbo-go/protocol"
+	"github.com/apache/dubbo-go/protocol/dubbo/impl"
+	"github.com/apache/dubbo-go/protocol/invocation"
+	"github.com/apache/dubbo-go/remoting"
+)
+
+//SerialID serial ID
+type SerialID byte
+
+func init() {
+	codec := &DubboCodec{}
+	// this is for registry dubboCodec of dubbo protocol
+	remoting.RegistryCodec("dubbo", codec)
+}
+
+// DubboCodec. It is implements remoting.Codec
+type DubboCodec struct {
+}
+
+// encode request for transport
+func (c *DubboCodec) EncodeRequest(request *remoting.Request) (*bytes.Buffer, error) {
+	if request.Event {
+		return c.encodeHeartbeartReqeust(request)
+	}
+
+	invoc, ok := request.Data.(*protocol.Invocation)
+	if !ok {
+		logger.Errorf("encode request failed for parameter type :%+v", request)

Review comment:
       err := fmt.Sprintf("encode request failed for parameter type :%+v", request)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@dubbo.apache.org
For additional commands, e-mail: notifications-help@dubbo.apache.org


[GitHub] [dubbo-go] fangyincheng commented on a change in pull request #673: Rft: network & codec

Posted by GitBox <gi...@apache.org>.
fangyincheng commented on a change in pull request #673:
URL: https://github.com/apache/dubbo-go/pull/673#discussion_r484096586



##########
File path: remoting/getty/listener.go
##########
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package getty
+
+import (
+	"fmt"
+	"sync"
+	"sync/atomic"
+	"time"
+)
+
+import (
+	hessian "github.com/apache/dubbo-go-hessian2"
+	"github.com/dubbogo/getty"
+	perrors "github.com/pkg/errors"
+)
+
+import (
+	"github.com/apache/dubbo-go/common/constant"
+	"github.com/apache/dubbo-go/common/logger"
+	"github.com/apache/dubbo-go/protocol/invocation"
+	"github.com/apache/dubbo-go/remoting"
+)
+
+// todo: WritePkg_Timeout will entry *.yml

Review comment:
       ok




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@dubbo.apache.org
For additional commands, e-mail: notifications-help@dubbo.apache.org


[GitHub] [dubbo-go] fangyincheng commented on a change in pull request #673: Rft: network & codec

Posted by GitBox <gi...@apache.org>.
fangyincheng commented on a change in pull request #673:
URL: https://github.com/apache/dubbo-go/pull/673#discussion_r487384058



##########
File path: config/application_config.go
##########
@@ -42,6 +42,16 @@ func (*ApplicationConfig) Prefix() string {
 	return constant.DUBBO + ".application."
 }
 
+// nolint
+func (c *ApplicationConfig) Id() string {

Review comment:
       delete it

##########
File path: config/application_config.go
##########
@@ -42,6 +42,16 @@ func (*ApplicationConfig) Prefix() string {
 	return constant.DUBBO + ".application."
 }
 
+// nolint
+func (c *ApplicationConfig) Id() string {
+	return ""
+}
+
+// SetId ...
+func (c *ApplicationConfig) SetId(id string) {

Review comment:
       delete it

##########
File path: protocol/dubbo/dubbo_codec.go
##########
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package dubbo
+
+import (
+	"bytes"
+	"strconv"
+	"time"
+)
+
+import (
+	hessian "github.com/apache/dubbo-go-hessian2"
+	perrors "github.com/pkg/errors"
+)
+
+import (
+	"github.com/apache/dubbo-go/common/constant"
+	"github.com/apache/dubbo-go/common/logger"
+	"github.com/apache/dubbo-go/protocol"
+	"github.com/apache/dubbo-go/protocol/dubbo/impl"
+	"github.com/apache/dubbo-go/protocol/invocation"
+	"github.com/apache/dubbo-go/remoting"
+)
+
+//SerialID serial ID
+type SerialID byte
+
+func init() {
+	codec := &DubboCodec{}
+	// this is for registry dubboCodec of dubbo protocol
+	remoting.RegistryCodec("dubbo", codec)
+}
+
+// DubboCodec. It is implements remoting.Codec
+type DubboCodec struct {
+}
+
+// encode request for transport
+func (c *DubboCodec) EncodeRequest(request *remoting.Request) (*bytes.Buffer, error) {
+	if request.Event {

Review comment:
       I think so

##########
File path: protocol/dubbo/dubbo_invoker.go
##########
@@ -94,35 +110,57 @@ func (di *DubboInvoker) Invoke(ctx context.Context, invocation protocol.Invocati
 	di.appendCtx(ctx, inv)
 
 	url := di.GetUrl()
+	// default hessian2 serialization, compatible
+	if url.GetParam("serialization", "") == "" {
+		url.SetParam("serialization", constant.HESSIAN2_SERIALIZATION)

Review comment:
       modified

##########
File path: protocol/dubbo/dubbo_protocol.go
##########
@@ -131,3 +144,101 @@ func GetProtocol() protocol.Protocol {
 	}
 	return dubboProtocol
 }
+
+func doHandleRequest(rpcInvocation *invocation.RPCInvocation) protocol.RPCResult {
+	exporter, _ := dubboProtocol.ExporterMap().Load(rpcInvocation.ServiceKey())
+	result := protocol.RPCResult{}
+	if exporter == nil {
+		err := fmt.Errorf("don't have this exporter, key: %s", rpcInvocation.ServiceKey())
+		logger.Errorf(err.Error())
+		result.Err = err
+		//reply(session, p, hessian.PackageResponse)
+		return result
+	}
+	invoker := exporter.(protocol.Exporter).GetInvoker()
+	if invoker != nil {
+		// FIXME
+		ctx := rebuildCtx(rpcInvocation)
+
+		invokeResult := invoker.Invoke(ctx, rpcInvocation)
+		if err := invokeResult.Error(); err != nil {
+			result.Err = invokeResult.Error()
+			//p.Header.ResponseStatus = hessian.Response_OK
+			//p.Body = hessian.NewResponse(nil, err, result.Attachments())
+		} else {
+			result.Rest = invokeResult.Result()
+			//p.Header.ResponseStatus = hessian.Response_OK
+			//p.Body = hessian.NewResponse(res, nil, result.Attachments())
+		}
+	} else {
+		result.Err = fmt.Errorf("don't have the invoker, key: %s", rpcInvocation.ServiceKey())
+	}
+	return result
+}
+
+func getExchangeClient(url common.URL) *remoting.ExchangeClient {
+	clientTmp, ok := exchangeClientMap.Load(url.Location)
+	if !ok {
+		var exchangeClientTmp *remoting.ExchangeClient
+		func() {
+			// lock for NewExchangeClient and store into map.
+			_, loaded := exchangeLock.LoadOrStore(url.Location, 0x00)
+			// unlock
+			defer exchangeLock.Delete(url.Location)
+			if loaded {
+				// retry for 5 times.
+				for i := 0; i < 5; i++ {
+					if clientTmp, ok = exchangeClientMap.Load(url.Location); ok {
+						break
+					} else {
+						// if cannot get, sleep a while.
+						time.Sleep(time.Duration(i*100) * time.Millisecond)
+					}
+				}
+				return
+			}
+			// new ExchangeClient
+			exchangeClientTmp = remoting.NewExchangeClient(url, getty.NewClient(getty.Options{
+				ConnectTimeout: config.GetConsumerConfig().ConnectTimeout,
+				RequestTimeout: config.GetConsumerConfig().RequestTimeout,
+			}), config.GetConsumerConfig().ConnectTimeout, false)
+			// input store
+			if exchangeClientTmp != nil {
+				exchangeClientMap.Store(url.Location, exchangeClientTmp)
+			}
+		}()
+		if exchangeClientTmp != nil {
+			return exchangeClientTmp
+		}
+	}
+	// cannot dial the server
+	if clientTmp == nil {
+		return nil
+	}
+	exchangeClient, ok := clientTmp.(*remoting.ExchangeClient)
+	if !ok {
+		exchangeClientTmp := remoting.NewExchangeClient(url, getty.NewClient(getty.Options{
+			ConnectTimeout: config.GetConsumerConfig().ConnectTimeout,
+			RequestTimeout: config.GetConsumerConfig().RequestTimeout}), config.GetConsumerConfig().ConnectTimeout, false)
+		if exchangeClientTmp != nil {
+			exchangeClientMap.Store(url.Location, exchangeClientTmp)
+		}
+		return exchangeClientTmp
+	}
+	return exchangeClient
+}
+
+// rebuildCtx rebuild the context by attachment.
+// Once we decided to transfer more context's key-value, we should change this.
+// now we only support rebuild the tracing context
+func rebuildCtx(inv *invocation.RPCInvocation) context.Context {
+	ctx := context.WithValue(context.Background(), "attachment", inv.Attachments())
+
+	// actually, if user do not use any opentracing framework, the err will not be nil.
+	spanCtx, err := opentracing.GlobalTracer().Extract(opentracing.TextMap,
+		opentracing.TextMapCarrier(inv.Attachments()))
+	if err == nil {
+		ctx = context.WithValue(ctx, constant.TRACING_REMOTE_SPAN_CTX, spanCtx)
+	}
+	return ctx

Review comment:
       I think fewer "return" statements would be better.

##########
File path: config/application_config.go
##########
@@ -42,6 +42,16 @@ func (*ApplicationConfig) Prefix() string {
 	return constant.DUBBO + ".application."
 }
 
+// nolint
+func (c *ApplicationConfig) Id() string {

Review comment:
       delete it

##########
File path: config/application_config.go
##########
@@ -42,6 +42,16 @@ func (*ApplicationConfig) Prefix() string {
 	return constant.DUBBO + ".application."
 }
 
+// nolint
+func (c *ApplicationConfig) Id() string {
+	return ""
+}
+
+// SetId ...
+func (c *ApplicationConfig) SetId(id string) {

Review comment:
       delete it

##########
File path: protocol/dubbo/dubbo_codec.go
##########
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package dubbo
+
+import (
+	"bytes"
+	"strconv"
+	"time"
+)
+
+import (
+	hessian "github.com/apache/dubbo-go-hessian2"
+	perrors "github.com/pkg/errors"
+)
+
+import (
+	"github.com/apache/dubbo-go/common/constant"
+	"github.com/apache/dubbo-go/common/logger"
+	"github.com/apache/dubbo-go/protocol"
+	"github.com/apache/dubbo-go/protocol/dubbo/impl"
+	"github.com/apache/dubbo-go/protocol/invocation"
+	"github.com/apache/dubbo-go/remoting"
+)
+
+//SerialID serial ID
+type SerialID byte
+
+func init() {
+	codec := &DubboCodec{}
+	// this is for registry dubboCodec of dubbo protocol
+	remoting.RegistryCodec("dubbo", codec)
+}
+
+// DubboCodec. It is implements remoting.Codec
+type DubboCodec struct {
+}
+
+// encode request for transport
+func (c *DubboCodec) EncodeRequest(request *remoting.Request) (*bytes.Buffer, error) {
+	if request.Event {

Review comment:
       I think so

##########
File path: protocol/dubbo/dubbo_invoker.go
##########
@@ -94,35 +110,57 @@ func (di *DubboInvoker) Invoke(ctx context.Context, invocation protocol.Invocati
 	di.appendCtx(ctx, inv)
 
 	url := di.GetUrl()
+	// default hessian2 serialization, compatible
+	if url.GetParam("serialization", "") == "" {
+		url.SetParam("serialization", constant.HESSIAN2_SERIALIZATION)

Review comment:
       modified

##########
File path: protocol/dubbo/dubbo_protocol.go
##########
@@ -131,3 +144,101 @@ func GetProtocol() protocol.Protocol {
 	}
 	return dubboProtocol
 }
+
+func doHandleRequest(rpcInvocation *invocation.RPCInvocation) protocol.RPCResult {
+	exporter, _ := dubboProtocol.ExporterMap().Load(rpcInvocation.ServiceKey())
+	result := protocol.RPCResult{}
+	if exporter == nil {
+		err := fmt.Errorf("don't have this exporter, key: %s", rpcInvocation.ServiceKey())
+		logger.Errorf(err.Error())
+		result.Err = err
+		//reply(session, p, hessian.PackageResponse)
+		return result
+	}
+	invoker := exporter.(protocol.Exporter).GetInvoker()
+	if invoker != nil {
+		// FIXME
+		ctx := rebuildCtx(rpcInvocation)
+
+		invokeResult := invoker.Invoke(ctx, rpcInvocation)
+		if err := invokeResult.Error(); err != nil {
+			result.Err = invokeResult.Error()
+			//p.Header.ResponseStatus = hessian.Response_OK
+			//p.Body = hessian.NewResponse(nil, err, result.Attachments())
+		} else {
+			result.Rest = invokeResult.Result()
+			//p.Header.ResponseStatus = hessian.Response_OK
+			//p.Body = hessian.NewResponse(res, nil, result.Attachments())
+		}
+	} else {
+		result.Err = fmt.Errorf("don't have the invoker, key: %s", rpcInvocation.ServiceKey())
+	}
+	return result
+}
+
+func getExchangeClient(url common.URL) *remoting.ExchangeClient {
+	clientTmp, ok := exchangeClientMap.Load(url.Location)
+	if !ok {
+		var exchangeClientTmp *remoting.ExchangeClient
+		func() {
+			// lock for NewExchangeClient and store into map.
+			_, loaded := exchangeLock.LoadOrStore(url.Location, 0x00)
+			// unlock
+			defer exchangeLock.Delete(url.Location)
+			if loaded {
+				// retry for 5 times.
+				for i := 0; i < 5; i++ {
+					if clientTmp, ok = exchangeClientMap.Load(url.Location); ok {
+						break
+					} else {
+						// if cannot get, sleep a while.
+						time.Sleep(time.Duration(i*100) * time.Millisecond)
+					}
+				}
+				return
+			}
+			// new ExchangeClient
+			exchangeClientTmp = remoting.NewExchangeClient(url, getty.NewClient(getty.Options{
+				ConnectTimeout: config.GetConsumerConfig().ConnectTimeout,
+				RequestTimeout: config.GetConsumerConfig().RequestTimeout,
+			}), config.GetConsumerConfig().ConnectTimeout, false)
+			// input store
+			if exchangeClientTmp != nil {
+				exchangeClientMap.Store(url.Location, exchangeClientTmp)
+			}
+		}()
+		if exchangeClientTmp != nil {
+			return exchangeClientTmp
+		}
+	}
+	// cannot dial the server
+	if clientTmp == nil {
+		return nil
+	}
+	exchangeClient, ok := clientTmp.(*remoting.ExchangeClient)
+	if !ok {
+		exchangeClientTmp := remoting.NewExchangeClient(url, getty.NewClient(getty.Options{
+			ConnectTimeout: config.GetConsumerConfig().ConnectTimeout,
+			RequestTimeout: config.GetConsumerConfig().RequestTimeout}), config.GetConsumerConfig().ConnectTimeout, false)
+		if exchangeClientTmp != nil {
+			exchangeClientMap.Store(url.Location, exchangeClientTmp)
+		}
+		return exchangeClientTmp
+	}
+	return exchangeClient
+}
+
+// rebuildCtx rebuild the context by attachment.
+// Once we decided to transfer more context's key-value, we should change this.
+// now we only support rebuild the tracing context
+func rebuildCtx(inv *invocation.RPCInvocation) context.Context {
+	ctx := context.WithValue(context.Background(), "attachment", inv.Attachments())
+
+	// actually, if user do not use any opentracing framework, the err will not be nil.
+	spanCtx, err := opentracing.GlobalTracer().Extract(opentracing.TextMap,
+		opentracing.TextMapCarrier(inv.Attachments()))
+	if err == nil {
+		ctx = context.WithValue(ctx, constant.TRACING_REMOTE_SPAN_CTX, spanCtx)
+	}
+	return ctx

Review comment:
       I think fewer "return" statements would be better.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@dubbo.apache.org
For additional commands, e-mail: notifications-help@dubbo.apache.org


[GitHub] [dubbo-go] codecov-commenter edited a comment on pull request #673: Rft: network & codec

Posted by GitBox <gi...@apache.org>.
codecov-commenter edited a comment on pull request #673:
URL: https://github.com/apache/dubbo-go/pull/673#issuecomment-691684531


   # [Codecov](https://codecov.io/gh/apache/dubbo-go/pull/673?src=pr&el=h1) Report
   > Merging [#673](https://codecov.io/gh/apache/dubbo-go/pull/673?src=pr&el=desc) into [develop](https://codecov.io/gh/apache/dubbo-go/commit/163d645c62e87907a2ee48afbfc86eab103c13d8?el=desc) will **decrease** coverage by `1.65%`.
   > The diff coverage is `44.37%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/dubbo-go/pull/673/graphs/tree.svg?width=650&height=150&src=pr&token=dcPE6RyFAL)](https://codecov.io/gh/apache/dubbo-go/pull/673?src=pr&el=tree)
   
   ```diff
   @@             Coverage Diff             @@
   ##           develop     #673      +/-   ##
   ===========================================
   - Coverage    61.44%   59.78%   -1.66%     
   ===========================================
     Files          251      259       +8     
     Lines        12084    12722     +638     
   ===========================================
   + Hits          7425     7606     +181     
   - Misses        3766     4166     +400     
   - Partials       893      950      +57     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/dubbo-go/pull/673?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [common/url.go](https://codecov.io/gh/apache/dubbo-go/pull/673/diff?src=pr&el=tree#diff-Y29tbW9uL3VybC5nbw==) | `60.83% <0.00%> (+0.21%)` | :arrow_up: |
   | [protocol/dubbo/impl/response.go](https://codecov.io/gh/apache/dubbo-go/pull/673/diff?src=pr&el=tree#diff-cHJvdG9jb2wvZHViYm8vaW1wbC9yZXNwb25zZS5nbw==) | `0.00% <0.00%> (ø)` | |
   | [protocol/dubbo/impl/serialize.go](https://codecov.io/gh/apache/dubbo-go/pull/673/diff?src=pr&el=tree#diff-cHJvdG9jb2wvZHViYm8vaW1wbC9zZXJpYWxpemUuZ28=) | `0.00% <0.00%> (ø)` | |
   | [remoting/getty/config.go](https://codecov.io/gh/apache/dubbo-go/pull/673/diff?src=pr&el=tree#diff-cmVtb3RpbmcvZ2V0dHkvY29uZmlnLmdv) | `9.85% <ø> (ø)` | |
   | [protocol/dubbo/impl/hessian.go](https://codecov.io/gh/apache/dubbo-go/pull/673/diff?src=pr&el=tree#diff-cHJvdG9jb2wvZHViYm8vaW1wbC9oZXNzaWFuLmdv) | `29.44% <29.44%> (ø)` | |
   | [remoting/getty/listener.go](https://codecov.io/gh/apache/dubbo-go/pull/673/diff?src=pr&el=tree#diff-cmVtb3RpbmcvZ2V0dHkvbGlzdGVuZXIuZ28=) | `35.91% <35.91%> (ø)` | |
   | [protocol/dubbo/impl/codec.go](https://codecov.io/gh/apache/dubbo-go/pull/673/diff?src=pr&el=tree#diff-cHJvdG9jb2wvZHViYm8vaW1wbC9jb2RlYy5nbw==) | `39.33% <39.33%> (ø)` | |
   | [remoting/getty/getty\_server.go](https://codecov.io/gh/apache/dubbo-go/pull/673/diff?src=pr&el=tree#diff-cmVtb3RpbmcvZ2V0dHkvZ2V0dHlfc2VydmVyLmdv) | `41.96% <40.62%> (ø)` | |
   | [remoting/getty/getty\_client.go](https://codecov.io/gh/apache/dubbo-go/pull/673/diff?src=pr&el=tree#diff-cmVtb3RpbmcvZ2V0dHkvZ2V0dHlfY2xpZW50Lmdv) | `41.17% <41.17%> (ø)` | |
   | [remoting/getty/pool.go](https://codecov.io/gh/apache/dubbo-go/pull/673/diff?src=pr&el=tree#diff-cmVtb3RpbmcvZ2V0dHkvcG9vbC5nbw==) | `60.36% <46.66%> (ø)` | |
   | ... and [24 more](https://codecov.io/gh/apache/dubbo-go/pull/673/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/dubbo-go/pull/673?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/dubbo-go/pull/673?src=pr&el=footer). Last update [163d645...4f3d4ff](https://codecov.io/gh/apache/dubbo-go/pull/673?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@dubbo.apache.org
For additional commands, e-mail: notifications-help@dubbo.apache.org


[GitHub] [dubbo-go] fangyincheng commented on a change in pull request #673: Rft: network & codec

Posted by GitBox <gi...@apache.org>.
fangyincheng commented on a change in pull request #673:
URL: https://github.com/apache/dubbo-go/pull/673#discussion_r487384427



##########
File path: protocol/dubbo/dubbo_invoker.go
##########
@@ -94,35 +110,57 @@ func (di *DubboInvoker) Invoke(ctx context.Context, invocation protocol.Invocati
 	di.appendCtx(ctx, inv)
 
 	url := di.GetUrl()
+	// default hessian2 serialization, compatible
+	if url.GetParam("serialization", "") == "" {
+		url.SetParam("serialization", constant.HESSIAN2_SERIALIZATION)

Review comment:
       modified




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@dubbo.apache.org
For additional commands, e-mail: notifications-help@dubbo.apache.org


[GitHub] [dubbo-go] aliiohs commented on a change in pull request #673: [WIP]Rft: network & codec

Posted by GitBox <gi...@apache.org>.
aliiohs commented on a change in pull request #673:
URL: https://github.com/apache/dubbo-go/pull/673#discussion_r472048279



##########
File path: metadata/service/exporter/configurable/exporter_test.go
##########
@@ -18,6 +18,7 @@
 package configurable
 
 import (
+	"github.com/apache/dubbo-go/remoting/getty"

Review comment:
       Please split the imported packages into separate groups.

##########
File path: protocol/dubbo/dubbo_invoker.go
##########
@@ -94,35 +110,57 @@ func (di *DubboInvoker) Invoke(ctx context.Context, invocation protocol.Invocati
 	di.appendCtx(ctx, inv)
 
 	url := di.GetUrl()
+	// default hessian2 serialization, compatible
+	if url.GetParam("serialization", "") == "" {

Review comment:
       Please don't use a magic value here; use a named constant instead.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@dubbo.apache.org
For additional commands, e-mail: notifications-help@dubbo.apache.org


[GitHub] [dubbo-go] flycash commented on a change in pull request #673: Rft: network & codec

Posted by GitBox <gi...@apache.org>.
flycash commented on a change in pull request #673:
URL: https://github.com/apache/dubbo-go/pull/673#discussion_r485664385



##########
File path: protocol/dubbo/dubbo_protocol.go
##########
@@ -131,3 +144,101 @@ func GetProtocol() protocol.Protocol {
 	}
 	return dubboProtocol
 }
+
+func doHandleRequest(rpcInvocation *invocation.RPCInvocation) protocol.RPCResult {
+	exporter, _ := dubboProtocol.ExporterMap().Load(rpcInvocation.ServiceKey())
+	result := protocol.RPCResult{}
+	if exporter == nil {
+		err := fmt.Errorf("don't have this exporter, key: %s", rpcInvocation.ServiceKey())
+		logger.Errorf(err.Error())
+		result.Err = err
+		//reply(session, p, hessian.PackageResponse)
+		return result
+	}
+	invoker := exporter.(protocol.Exporter).GetInvoker()
+	if invoker != nil {
+		// FIXME
+		ctx := rebuildCtx(rpcInvocation)
+
+		invokeResult := invoker.Invoke(ctx, rpcInvocation)
+		if err := invokeResult.Error(); err != nil {
+			result.Err = invokeResult.Error()
+			//p.Header.ResponseStatus = hessian.Response_OK
+			//p.Body = hessian.NewResponse(nil, err, result.Attachments())
+		} else {
+			result.Rest = invokeResult.Result()
+			//p.Header.ResponseStatus = hessian.Response_OK
+			//p.Body = hessian.NewResponse(res, nil, result.Attachments())
+		}
+	} else {
+		result.Err = fmt.Errorf("don't have the invoker, key: %s", rpcInvocation.ServiceKey())
+	}
+	return result
+}
+
+func getExchangeClient(url common.URL) *remoting.ExchangeClient {
+	clientTmp, ok := exchangeClientMap.Load(url.Location)
+	if !ok {
+		var exchangeClientTmp *remoting.ExchangeClient
+		func() {
+			// lock for NewExchangeClient and store into map.
+			_, loaded := exchangeLock.LoadOrStore(url.Location, 0x00)
+			// unlock
+			defer exchangeLock.Delete(url.Location)
+			if loaded {
+				// retry for 5 times.
+				for i := 0; i < 5; i++ {
+					if clientTmp, ok = exchangeClientMap.Load(url.Location); ok {
+						break
+					} else {
+						// if cannot get, sleep a while.
+						time.Sleep(time.Duration(i*100) * time.Millisecond)
+					}
+				}
+				return
+			}
+			// new ExchangeClient
+			exchangeClientTmp = remoting.NewExchangeClient(url, getty.NewClient(getty.Options{
+				ConnectTimeout: config.GetConsumerConfig().ConnectTimeout,
+				RequestTimeout: config.GetConsumerConfig().RequestTimeout,
+			}), config.GetConsumerConfig().ConnectTimeout, false)
+			// input store
+			if exchangeClientTmp != nil {
+				exchangeClientMap.Store(url.Location, exchangeClientTmp)
+			}
+		}()

Review comment:
       It looks a little bit strange. It's a typical check-then-act problem.
   Why not just use:
   ```go
   clientTmp, ok := exchangeClientMap.Load(url.Location)
   if !ok {
       exchangeClient = NewXXX
       otherClient, loaded := exchangeClientMap.LoadOrStore(url.Location, exchangeClient)
       if loaded {
           // someone else already stored a client, so close the one we just created.
           exchangeClient.Close()
           exchangeClient = otherClient
       }
   }
   ```
   ```

##########
File path: protocol/dubbo/dubbo_protocol.go
##########
@@ -131,3 +144,101 @@ func GetProtocol() protocol.Protocol {
 	}
 	return dubboProtocol
 }
+
+func doHandleRequest(rpcInvocation *invocation.RPCInvocation) protocol.RPCResult {
+	exporter, _ := dubboProtocol.ExporterMap().Load(rpcInvocation.ServiceKey())
+	result := protocol.RPCResult{}
+	if exporter == nil {
+		err := fmt.Errorf("don't have this exporter, key: %s", rpcInvocation.ServiceKey())
+		logger.Errorf(err.Error())
+		result.Err = err
+		//reply(session, p, hessian.PackageResponse)
+		return result
+	}
+	invoker := exporter.(protocol.Exporter).GetInvoker()
+	if invoker != nil {
+		// FIXME
+		ctx := rebuildCtx(rpcInvocation)
+
+		invokeResult := invoker.Invoke(ctx, rpcInvocation)
+		if err := invokeResult.Error(); err != nil {
+			result.Err = invokeResult.Error()
+			//p.Header.ResponseStatus = hessian.Response_OK
+			//p.Body = hessian.NewResponse(nil, err, result.Attachments())
+		} else {
+			result.Rest = invokeResult.Result()
+			//p.Header.ResponseStatus = hessian.Response_OK
+			//p.Body = hessian.NewResponse(res, nil, result.Attachments())
+		}
+	} else {
+		result.Err = fmt.Errorf("don't have the invoker, key: %s", rpcInvocation.ServiceKey())
+	}
+	return result
+}
+
+func getExchangeClient(url common.URL) *remoting.ExchangeClient {
+	clientTmp, ok := exchangeClientMap.Load(url.Location)
+	if !ok {
+		var exchangeClientTmp *remoting.ExchangeClient
+		func() {
+			// lock for NewExchangeClient and store into map.
+			_, loaded := exchangeLock.LoadOrStore(url.Location, 0x00)
+			// unlock
+			defer exchangeLock.Delete(url.Location)
+			if loaded {
+				// retry for 5 times.
+				for i := 0; i < 5; i++ {
+					if clientTmp, ok = exchangeClientMap.Load(url.Location); ok {
+						break
+					} else {
+						// if cannot get, sleep a while.
+						time.Sleep(time.Duration(i*100) * time.Millisecond)
+					}
+				}
+				return
+			}
+			// new ExchangeClient
+			exchangeClientTmp = remoting.NewExchangeClient(url, getty.NewClient(getty.Options{

Review comment:
       if `clientTmp != nil`, should we create an exchangeClient? I mean, `clientTmp != nil` indicates that someone has created a client.  

##########
File path: protocol/dubbo/dubbo_protocol.go
##########
@@ -131,3 +144,101 @@ func GetProtocol() protocol.Protocol {
 	}
 	return dubboProtocol
 }
+
+func doHandleRequest(rpcInvocation *invocation.RPCInvocation) protocol.RPCResult {
+	exporter, _ := dubboProtocol.ExporterMap().Load(rpcInvocation.ServiceKey())
+	result := protocol.RPCResult{}
+	if exporter == nil {
+		err := fmt.Errorf("don't have this exporter, key: %s", rpcInvocation.ServiceKey())
+		logger.Errorf(err.Error())
+		result.Err = err
+		//reply(session, p, hessian.PackageResponse)
+		return result
+	}
+	invoker := exporter.(protocol.Exporter).GetInvoker()
+	if invoker != nil {
+		// FIXME
+		ctx := rebuildCtx(rpcInvocation)
+
+		invokeResult := invoker.Invoke(ctx, rpcInvocation)
+		if err := invokeResult.Error(); err != nil {
+			result.Err = invokeResult.Error()
+			//p.Header.ResponseStatus = hessian.Response_OK
+			//p.Body = hessian.NewResponse(nil, err, result.Attachments())
+		} else {
+			result.Rest = invokeResult.Result()
+			//p.Header.ResponseStatus = hessian.Response_OK
+			//p.Body = hessian.NewResponse(res, nil, result.Attachments())
+		}
+	} else {
+		result.Err = fmt.Errorf("don't have the invoker, key: %s", rpcInvocation.ServiceKey())
+	}
+	return result
+}
+
+func getExchangeClient(url common.URL) *remoting.ExchangeClient {
+	clientTmp, ok := exchangeClientMap.Load(url.Location)
+	if !ok {
+		var exchangeClientTmp *remoting.ExchangeClient
+		func() {
+			// lock for NewExchangeClient and store into map.
+			_, loaded := exchangeLock.LoadOrStore(url.Location, 0x00)
+			// unlock
+			defer exchangeLock.Delete(url.Location)
+			if loaded {
+				// retry for 5 times.
+				for i := 0; i < 5; i++ {
+					if clientTmp, ok = exchangeClientMap.Load(url.Location); ok {
+						break
+					} else {
+						// if cannot get, sleep a while.
+						time.Sleep(time.Duration(i*100) * time.Millisecond)
+					}
+				}
+				return
+			}
+			// new ExchangeClient
+			exchangeClientTmp = remoting.NewExchangeClient(url, getty.NewClient(getty.Options{
+				ConnectTimeout: config.GetConsumerConfig().ConnectTimeout,
+				RequestTimeout: config.GetConsumerConfig().RequestTimeout,
+			}), config.GetConsumerConfig().ConnectTimeout, false)
+			// input store
+			if exchangeClientTmp != nil {
+				exchangeClientMap.Store(url.Location, exchangeClientTmp)
+			}
+		}()
+		if exchangeClientTmp != nil {
+			return exchangeClientTmp
+		}
+	}
+	// cannot dial the server
+	if clientTmp == nil {
+		return nil
+	}
+	exchangeClient, ok := clientTmp.(*remoting.ExchangeClient)
+	if !ok {
+		exchangeClientTmp := remoting.NewExchangeClient(url, getty.NewClient(getty.Options{
+			ConnectTimeout: config.GetConsumerConfig().ConnectTimeout,
+			RequestTimeout: config.GetConsumerConfig().RequestTimeout}), config.GetConsumerConfig().ConnectTimeout, false)
+		if exchangeClientTmp != nil {

Review comment:
       There is a race condition. Two goroutines can both find `!ok`, and both of them create clients, say client1 and client2.
   
   If client1 is stored before client2, client1 is overwritten by client2 when we call `exchangeClientMap.Store(url.Location, client2)`, but client1 is never closed.
   Using `LoadOrStore` and then checking whether the store succeeded would be better.
   Don't forget to close the client if it's not stored in `exchangeClientMap`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@dubbo.apache.org
For additional commands, e-mail: notifications-help@dubbo.apache.org


[GitHub] [dubbo-go] fangyincheng commented on a change in pull request #673: Rft: network & codec

Posted by GitBox <gi...@apache.org>.
fangyincheng commented on a change in pull request #673:
URL: https://github.com/apache/dubbo-go/pull/673#discussion_r487384058



##########
File path: config/application_config.go
##########
@@ -42,6 +42,16 @@ func (*ApplicationConfig) Prefix() string {
 	return constant.DUBBO + ".application."
 }
 
+// nolint
+func (c *ApplicationConfig) Id() string {

Review comment:
       delete it

##########
File path: config/application_config.go
##########
@@ -42,6 +42,16 @@ func (*ApplicationConfig) Prefix() string {
 	return constant.DUBBO + ".application."
 }
 
+// nolint
+func (c *ApplicationConfig) Id() string {
+	return ""
+}
+
+// SetId ...
+func (c *ApplicationConfig) SetId(id string) {

Review comment:
       delete it

##########
File path: protocol/dubbo/dubbo_codec.go
##########
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package dubbo
+
+import (
+	"bytes"
+	"strconv"
+	"time"
+)
+
+import (
+	hessian "github.com/apache/dubbo-go-hessian2"
+	perrors "github.com/pkg/errors"
+)
+
+import (
+	"github.com/apache/dubbo-go/common/constant"
+	"github.com/apache/dubbo-go/common/logger"
+	"github.com/apache/dubbo-go/protocol"
+	"github.com/apache/dubbo-go/protocol/dubbo/impl"
+	"github.com/apache/dubbo-go/protocol/invocation"
+	"github.com/apache/dubbo-go/remoting"
+)
+
+//SerialID serial ID
+type SerialID byte
+
+func init() {
+	codec := &DubboCodec{}
+	// this is for registry dubboCodec of dubbo protocol
+	remoting.RegistryCodec("dubbo", codec)
+}
+
+// DubboCodec. It is implements remoting.Codec
+type DubboCodec struct {
+}
+
+// encode request for transport
+func (c *DubboCodec) EncodeRequest(request *remoting.Request) (*bytes.Buffer, error) {
+	if request.Event {

Review comment:
       I think so

##########
File path: protocol/dubbo/dubbo_invoker.go
##########
@@ -94,35 +110,57 @@ func (di *DubboInvoker) Invoke(ctx context.Context, invocation protocol.Invocati
 	di.appendCtx(ctx, inv)
 
 	url := di.GetUrl()
+	// default hessian2 serialization, compatible
+	if url.GetParam("serialization", "") == "" {
+		url.SetParam("serialization", constant.HESSIAN2_SERIALIZATION)

Review comment:
       modified

##########
File path: protocol/dubbo/dubbo_protocol.go
##########
@@ -131,3 +144,101 @@ func GetProtocol() protocol.Protocol {
 	}
 	return dubboProtocol
 }
+
+func doHandleRequest(rpcInvocation *invocation.RPCInvocation) protocol.RPCResult {
+	exporter, _ := dubboProtocol.ExporterMap().Load(rpcInvocation.ServiceKey())
+	result := protocol.RPCResult{}
+	if exporter == nil {
+		err := fmt.Errorf("don't have this exporter, key: %s", rpcInvocation.ServiceKey())
+		logger.Errorf(err.Error())
+		result.Err = err
+		//reply(session, p, hessian.PackageResponse)
+		return result
+	}
+	invoker := exporter.(protocol.Exporter).GetInvoker()
+	if invoker != nil {
+		// FIXME
+		ctx := rebuildCtx(rpcInvocation)
+
+		invokeResult := invoker.Invoke(ctx, rpcInvocation)
+		if err := invokeResult.Error(); err != nil {
+			result.Err = invokeResult.Error()
+			//p.Header.ResponseStatus = hessian.Response_OK
+			//p.Body = hessian.NewResponse(nil, err, result.Attachments())
+		} else {
+			result.Rest = invokeResult.Result()
+			//p.Header.ResponseStatus = hessian.Response_OK
+			//p.Body = hessian.NewResponse(res, nil, result.Attachments())
+		}
+	} else {
+		result.Err = fmt.Errorf("don't have the invoker, key: %s", rpcInvocation.ServiceKey())
+	}
+	return result
+}
+
+func getExchangeClient(url common.URL) *remoting.ExchangeClient {
+	clientTmp, ok := exchangeClientMap.Load(url.Location)
+	if !ok {
+		var exchangeClientTmp *remoting.ExchangeClient
+		func() {
+			// lock for NewExchangeClient and store into map.
+			_, loaded := exchangeLock.LoadOrStore(url.Location, 0x00)
+			// unlock
+			defer exchangeLock.Delete(url.Location)
+			if loaded {
+				// retry for 5 times.
+				for i := 0; i < 5; i++ {
+					if clientTmp, ok = exchangeClientMap.Load(url.Location); ok {
+						break
+					} else {
+						// if cannot get, sleep a while.
+						time.Sleep(time.Duration(i*100) * time.Millisecond)
+					}
+				}
+				return
+			}
+			// new ExchangeClient
+			exchangeClientTmp = remoting.NewExchangeClient(url, getty.NewClient(getty.Options{
+				ConnectTimeout: config.GetConsumerConfig().ConnectTimeout,
+				RequestTimeout: config.GetConsumerConfig().RequestTimeout,
+			}), config.GetConsumerConfig().ConnectTimeout, false)
+			// input store
+			if exchangeClientTmp != nil {
+				exchangeClientMap.Store(url.Location, exchangeClientTmp)
+			}
+		}()
+		if exchangeClientTmp != nil {
+			return exchangeClientTmp
+		}
+	}
+	// cannot dial the server
+	if clientTmp == nil {
+		return nil
+	}
+	exchangeClient, ok := clientTmp.(*remoting.ExchangeClient)
+	if !ok {
+		exchangeClientTmp := remoting.NewExchangeClient(url, getty.NewClient(getty.Options{
+			ConnectTimeout: config.GetConsumerConfig().ConnectTimeout,
+			RequestTimeout: config.GetConsumerConfig().RequestTimeout}), config.GetConsumerConfig().ConnectTimeout, false)
+		if exchangeClientTmp != nil {
+			exchangeClientMap.Store(url.Location, exchangeClientTmp)
+		}
+		return exchangeClientTmp
+	}
+	return exchangeClient
+}
+
+// rebuildCtx rebuild the context by attachment.
+// Once we decided to transfer more context's key-value, we should change this.
+// now we only support rebuild the tracing context
+func rebuildCtx(inv *invocation.RPCInvocation) context.Context {
+	ctx := context.WithValue(context.Background(), "attachment", inv.Attachments())
+
+	// actually, if user do not use any opentracing framework, the err will not be nil.
+	spanCtx, err := opentracing.GlobalTracer().Extract(opentracing.TextMap,
+		opentracing.TextMapCarrier(inv.Attachments()))
+	if err == nil {
+		ctx = context.WithValue(ctx, constant.TRACING_REMOTE_SPAN_CTX, spanCtx)
+	}
+	return ctx

Review comment:
       I think fewer "return" statements would be better.

##########
File path: config/application_config.go
##########
@@ -42,6 +42,16 @@ func (*ApplicationConfig) Prefix() string {
 	return constant.DUBBO + ".application."
 }
 
+// nolint
+func (c *ApplicationConfig) Id() string {

Review comment:
       delete it

##########
File path: config/application_config.go
##########
@@ -42,6 +42,16 @@ func (*ApplicationConfig) Prefix() string {
 	return constant.DUBBO + ".application."
 }
 
+// nolint
+func (c *ApplicationConfig) Id() string {
+	return ""
+}
+
+// SetId ...
+func (c *ApplicationConfig) SetId(id string) {

Review comment:
       delete it

##########
File path: protocol/dubbo/dubbo_codec.go
##########
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package dubbo
+
+import (
+	"bytes"
+	"strconv"
+	"time"
+)
+
+import (
+	hessian "github.com/apache/dubbo-go-hessian2"
+	perrors "github.com/pkg/errors"
+)
+
+import (
+	"github.com/apache/dubbo-go/common/constant"
+	"github.com/apache/dubbo-go/common/logger"
+	"github.com/apache/dubbo-go/protocol"
+	"github.com/apache/dubbo-go/protocol/dubbo/impl"
+	"github.com/apache/dubbo-go/protocol/invocation"
+	"github.com/apache/dubbo-go/remoting"
+)
+
+//SerialID serial ID
+type SerialID byte
+
+func init() {
+	codec := &DubboCodec{}
+	// this is for registry dubboCodec of dubbo protocol
+	remoting.RegistryCodec("dubbo", codec)
+}
+
+// DubboCodec. It is implements remoting.Codec
+type DubboCodec struct {
+}
+
+// encode request for transport
+func (c *DubboCodec) EncodeRequest(request *remoting.Request) (*bytes.Buffer, error) {
+	if request.Event {

Review comment:
       I think so

##########
File path: protocol/dubbo/dubbo_invoker.go
##########
@@ -94,35 +110,57 @@ func (di *DubboInvoker) Invoke(ctx context.Context, invocation protocol.Invocati
 	di.appendCtx(ctx, inv)
 
 	url := di.GetUrl()
+	// default hessian2 serialization, compatible
+	if url.GetParam("serialization", "") == "" {
+		url.SetParam("serialization", constant.HESSIAN2_SERIALIZATION)

Review comment:
       modified

##########
File path: protocol/dubbo/dubbo_protocol.go
##########
@@ -131,3 +144,101 @@ func GetProtocol() protocol.Protocol {
 	}
 	return dubboProtocol
 }
+
+func doHandleRequest(rpcInvocation *invocation.RPCInvocation) protocol.RPCResult {
+	exporter, _ := dubboProtocol.ExporterMap().Load(rpcInvocation.ServiceKey())
+	result := protocol.RPCResult{}
+	if exporter == nil {
+		err := fmt.Errorf("don't have this exporter, key: %s", rpcInvocation.ServiceKey())
+		logger.Errorf(err.Error())
+		result.Err = err
+		//reply(session, p, hessian.PackageResponse)
+		return result
+	}
+	invoker := exporter.(protocol.Exporter).GetInvoker()
+	if invoker != nil {
+		// FIXME
+		ctx := rebuildCtx(rpcInvocation)
+
+		invokeResult := invoker.Invoke(ctx, rpcInvocation)
+		if err := invokeResult.Error(); err != nil {
+			result.Err = invokeResult.Error()
+			//p.Header.ResponseStatus = hessian.Response_OK
+			//p.Body = hessian.NewResponse(nil, err, result.Attachments())
+		} else {
+			result.Rest = invokeResult.Result()
+			//p.Header.ResponseStatus = hessian.Response_OK
+			//p.Body = hessian.NewResponse(res, nil, result.Attachments())
+		}
+	} else {
+		result.Err = fmt.Errorf("don't have the invoker, key: %s", rpcInvocation.ServiceKey())
+	}
+	return result
+}
+
+func getExchangeClient(url common.URL) *remoting.ExchangeClient {
+	clientTmp, ok := exchangeClientMap.Load(url.Location)
+	if !ok {
+		var exchangeClientTmp *remoting.ExchangeClient
+		func() {
+			// lock for NewExchangeClient and store into map.
+			_, loaded := exchangeLock.LoadOrStore(url.Location, 0x00)
+			// unlock
+			defer exchangeLock.Delete(url.Location)
+			if loaded {
+				// retry for 5 times.
+				for i := 0; i < 5; i++ {
+					if clientTmp, ok = exchangeClientMap.Load(url.Location); ok {
+						break
+					} else {
+						// if cannot get, sleep a while.
+						time.Sleep(time.Duration(i*100) * time.Millisecond)
+					}
+				}
+				return
+			}
+			// new ExchangeClient
+			exchangeClientTmp = remoting.NewExchangeClient(url, getty.NewClient(getty.Options{
+				ConnectTimeout: config.GetConsumerConfig().ConnectTimeout,
+				RequestTimeout: config.GetConsumerConfig().RequestTimeout,
+			}), config.GetConsumerConfig().ConnectTimeout, false)
+			// input store
+			if exchangeClientTmp != nil {
+				exchangeClientMap.Store(url.Location, exchangeClientTmp)
+			}
+		}()
+		if exchangeClientTmp != nil {
+			return exchangeClientTmp
+		}
+	}
+	// cannot dial the server
+	if clientTmp == nil {
+		return nil
+	}
+	exchangeClient, ok := clientTmp.(*remoting.ExchangeClient)
+	if !ok {
+		exchangeClientTmp := remoting.NewExchangeClient(url, getty.NewClient(getty.Options{
+			ConnectTimeout: config.GetConsumerConfig().ConnectTimeout,
+			RequestTimeout: config.GetConsumerConfig().RequestTimeout}), config.GetConsumerConfig().ConnectTimeout, false)
+		if exchangeClientTmp != nil {
+			exchangeClientMap.Store(url.Location, exchangeClientTmp)
+		}
+		return exchangeClientTmp
+	}
+	return exchangeClient
+}
+
+// rebuildCtx rebuild the context by attachment.
+// Once we decided to transfer more context's key-value, we should change this.
+// now we only support rebuild the tracing context
+func rebuildCtx(inv *invocation.RPCInvocation) context.Context {
+	ctx := context.WithValue(context.Background(), "attachment", inv.Attachments())
+
+	// actually, if user do not use any opentracing framework, the err will not be nil.
+	spanCtx, err := opentracing.GlobalTracer().Extract(opentracing.TextMap,
+		opentracing.TextMapCarrier(inv.Attachments()))
+	if err == nil {
+		ctx = context.WithValue(ctx, constant.TRACING_REMOTE_SPAN_CTX, spanCtx)
+	}
+	return ctx

Review comment:
       I think fewer "return" statements would be better.

##########
File path: config/application_config.go
##########
@@ -42,6 +42,16 @@ func (*ApplicationConfig) Prefix() string {
 	return constant.DUBBO + ".application."
 }
 
+// nolint
+func (c *ApplicationConfig) Id() string {

Review comment:
       delete it

##########
File path: config/application_config.go
##########
@@ -42,6 +42,16 @@ func (*ApplicationConfig) Prefix() string {
 	return constant.DUBBO + ".application."
 }
 
+// nolint
+func (c *ApplicationConfig) Id() string {
+	return ""
+}
+
+// SetId ...
+func (c *ApplicationConfig) SetId(id string) {

Review comment:
       delete it

##########
File path: protocol/dubbo/dubbo_codec.go
##########
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package dubbo
+
+import (
+	"bytes"
+	"strconv"
+	"time"
+)
+
+import (
+	hessian "github.com/apache/dubbo-go-hessian2"
+	perrors "github.com/pkg/errors"
+)
+
+import (
+	"github.com/apache/dubbo-go/common/constant"
+	"github.com/apache/dubbo-go/common/logger"
+	"github.com/apache/dubbo-go/protocol"
+	"github.com/apache/dubbo-go/protocol/dubbo/impl"
+	"github.com/apache/dubbo-go/protocol/invocation"
+	"github.com/apache/dubbo-go/remoting"
+)
+
+//SerialID serial ID
+type SerialID byte
+
+func init() {
+	codec := &DubboCodec{}
+	// this is for registry dubboCodec of dubbo protocol
+	remoting.RegistryCodec("dubbo", codec)
+}
+
+// DubboCodec. It is implements remoting.Codec
+type DubboCodec struct {
+}
+
+// encode request for transport
+func (c *DubboCodec) EncodeRequest(request *remoting.Request) (*bytes.Buffer, error) {
+	if request.Event {

Review comment:
       I think so

##########
File path: protocol/dubbo/dubbo_invoker.go
##########
@@ -94,35 +110,57 @@ func (di *DubboInvoker) Invoke(ctx context.Context, invocation protocol.Invocati
 	di.appendCtx(ctx, inv)
 
 	url := di.GetUrl()
+	// default hessian2 serialization, compatible
+	if url.GetParam("serialization", "") == "" {
+		url.SetParam("serialization", constant.HESSIAN2_SERIALIZATION)

Review comment:
       modified

##########
File path: protocol/dubbo/dubbo_protocol.go
##########
@@ -131,3 +144,101 @@ func GetProtocol() protocol.Protocol {
 	}
 	return dubboProtocol
 }
+
+func doHandleRequest(rpcInvocation *invocation.RPCInvocation) protocol.RPCResult {
+	exporter, _ := dubboProtocol.ExporterMap().Load(rpcInvocation.ServiceKey())
+	result := protocol.RPCResult{}
+	if exporter == nil {
+		err := fmt.Errorf("don't have this exporter, key: %s", rpcInvocation.ServiceKey())
+		logger.Errorf(err.Error())
+		result.Err = err
+		//reply(session, p, hessian.PackageResponse)
+		return result
+	}
+	invoker := exporter.(protocol.Exporter).GetInvoker()
+	if invoker != nil {
+		// FIXME
+		ctx := rebuildCtx(rpcInvocation)
+
+		invokeResult := invoker.Invoke(ctx, rpcInvocation)
+		if err := invokeResult.Error(); err != nil {
+			result.Err = invokeResult.Error()
+			//p.Header.ResponseStatus = hessian.Response_OK
+			//p.Body = hessian.NewResponse(nil, err, result.Attachments())
+		} else {
+			result.Rest = invokeResult.Result()
+			//p.Header.ResponseStatus = hessian.Response_OK
+			//p.Body = hessian.NewResponse(res, nil, result.Attachments())
+		}
+	} else {
+		result.Err = fmt.Errorf("don't have the invoker, key: %s", rpcInvocation.ServiceKey())
+	}
+	return result
+}
+
+func getExchangeClient(url common.URL) *remoting.ExchangeClient {
+	clientTmp, ok := exchangeClientMap.Load(url.Location)
+	if !ok {
+		var exchangeClientTmp *remoting.ExchangeClient
+		func() {
+			// lock for NewExchangeClient and store into map.
+			_, loaded := exchangeLock.LoadOrStore(url.Location, 0x00)
+			// unlock
+			defer exchangeLock.Delete(url.Location)
+			if loaded {
+				// retry for 5 times.
+				for i := 0; i < 5; i++ {
+					if clientTmp, ok = exchangeClientMap.Load(url.Location); ok {
+						break
+					} else {
+						// if cannot get, sleep a while.
+						time.Sleep(time.Duration(i*100) * time.Millisecond)
+					}
+				}
+				return
+			}
+			// new ExchangeClient
+			exchangeClientTmp = remoting.NewExchangeClient(url, getty.NewClient(getty.Options{
+				ConnectTimeout: config.GetConsumerConfig().ConnectTimeout,
+				RequestTimeout: config.GetConsumerConfig().RequestTimeout,
+			}), config.GetConsumerConfig().ConnectTimeout, false)
+			// input store
+			if exchangeClientTmp != nil {
+				exchangeClientMap.Store(url.Location, exchangeClientTmp)
+			}
+		}()
+		if exchangeClientTmp != nil {
+			return exchangeClientTmp
+		}
+	}
+	// cannot dial the server
+	if clientTmp == nil {
+		return nil
+	}
+	exchangeClient, ok := clientTmp.(*remoting.ExchangeClient)
+	if !ok {
+		exchangeClientTmp := remoting.NewExchangeClient(url, getty.NewClient(getty.Options{
+			ConnectTimeout: config.GetConsumerConfig().ConnectTimeout,
+			RequestTimeout: config.GetConsumerConfig().RequestTimeout}), config.GetConsumerConfig().ConnectTimeout, false)
+		if exchangeClientTmp != nil {
+			exchangeClientMap.Store(url.Location, exchangeClientTmp)
+		}
+		return exchangeClientTmp
+	}
+	return exchangeClient
+}
+
+// rebuildCtx rebuild the context by attachment.
+// Once we decided to transfer more context's key-value, we should change this.
+// now we only support rebuild the tracing context
+func rebuildCtx(inv *invocation.RPCInvocation) context.Context {
+	ctx := context.WithValue(context.Background(), "attachment", inv.Attachments())
+
+	// actually, if user do not use any opentracing framework, the err will not be nil.
+	spanCtx, err := opentracing.GlobalTracer().Extract(opentracing.TextMap,
+		opentracing.TextMapCarrier(inv.Attachments()))
+	if err == nil {
+		ctx = context.WithValue(ctx, constant.TRACING_REMOTE_SPAN_CTX, spanCtx)
+	}
+	return ctx

Review comment:
       I think fewer "return" statements would be better.

##########
File path: config/application_config.go
##########
@@ -42,6 +42,16 @@ func (*ApplicationConfig) Prefix() string {
 	return constant.DUBBO + ".application."
 }
 
+// nolint
+func (c *ApplicationConfig) Id() string {

Review comment:
       delete it

##########
File path: config/application_config.go
##########
@@ -42,6 +42,16 @@ func (*ApplicationConfig) Prefix() string {
 	return constant.DUBBO + ".application."
 }
 
+// nolint
+func (c *ApplicationConfig) Id() string {
+	return ""
+}
+
+// SetId ...
+func (c *ApplicationConfig) SetId(id string) {

Review comment:
       delete it

##########
File path: protocol/dubbo/dubbo_codec.go
##########
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package dubbo
+
+import (
+	"bytes"
+	"strconv"
+	"time"
+)
+
+import (
+	hessian "github.com/apache/dubbo-go-hessian2"
+	perrors "github.com/pkg/errors"
+)
+
+import (
+	"github.com/apache/dubbo-go/common/constant"
+	"github.com/apache/dubbo-go/common/logger"
+	"github.com/apache/dubbo-go/protocol"
+	"github.com/apache/dubbo-go/protocol/dubbo/impl"
+	"github.com/apache/dubbo-go/protocol/invocation"
+	"github.com/apache/dubbo-go/remoting"
+)
+
+//SerialID serial ID
+type SerialID byte
+
+func init() {
+	codec := &DubboCodec{}
+	// this is for registry dubboCodec of dubbo protocol
+	remoting.RegistryCodec("dubbo", codec)
+}
+
+// DubboCodec. It is implements remoting.Codec
+type DubboCodec struct {
+}
+
+// encode request for transport
+func (c *DubboCodec) EncodeRequest(request *remoting.Request) (*bytes.Buffer, error) {
+	if request.Event {

Review comment:
       I think so

##########
File path: protocol/dubbo/dubbo_invoker.go
##########
@@ -94,35 +110,57 @@ func (di *DubboInvoker) Invoke(ctx context.Context, invocation protocol.Invocati
 	di.appendCtx(ctx, inv)
 
 	url := di.GetUrl()
+	// default hessian2 serialization, compatible
+	if url.GetParam("serialization", "") == "" {
+		url.SetParam("serialization", constant.HESSIAN2_SERIALIZATION)

Review comment:
       modified

##########
File path: protocol/dubbo/dubbo_protocol.go
##########
@@ -131,3 +144,101 @@ func GetProtocol() protocol.Protocol {
 	}
 	return dubboProtocol
 }
+
+func doHandleRequest(rpcInvocation *invocation.RPCInvocation) protocol.RPCResult {
+	exporter, _ := dubboProtocol.ExporterMap().Load(rpcInvocation.ServiceKey())
+	result := protocol.RPCResult{}
+	if exporter == nil {
+		err := fmt.Errorf("don't have this exporter, key: %s", rpcInvocation.ServiceKey())
+		logger.Errorf(err.Error())
+		result.Err = err
+		//reply(session, p, hessian.PackageResponse)
+		return result
+	}
+	invoker := exporter.(protocol.Exporter).GetInvoker()
+	if invoker != nil {
+		// FIXME
+		ctx := rebuildCtx(rpcInvocation)
+
+		invokeResult := invoker.Invoke(ctx, rpcInvocation)
+		if err := invokeResult.Error(); err != nil {
+			result.Err = invokeResult.Error()
+			//p.Header.ResponseStatus = hessian.Response_OK
+			//p.Body = hessian.NewResponse(nil, err, result.Attachments())
+		} else {
+			result.Rest = invokeResult.Result()
+			//p.Header.ResponseStatus = hessian.Response_OK
+			//p.Body = hessian.NewResponse(res, nil, result.Attachments())
+		}
+	} else {
+		result.Err = fmt.Errorf("don't have the invoker, key: %s", rpcInvocation.ServiceKey())
+	}
+	return result
+}
+
+func getExchangeClient(url common.URL) *remoting.ExchangeClient {
+	clientTmp, ok := exchangeClientMap.Load(url.Location)
+	if !ok {
+		var exchangeClientTmp *remoting.ExchangeClient
+		func() {
+			// lock for NewExchangeClient and store into map.
+			_, loaded := exchangeLock.LoadOrStore(url.Location, 0x00)
+			// unlock
+			defer exchangeLock.Delete(url.Location)
+			if loaded {
+				// retry for 5 times.
+				for i := 0; i < 5; i++ {
+					if clientTmp, ok = exchangeClientMap.Load(url.Location); ok {
+						break
+					} else {
+						// if cannot get, sleep a while.
+						time.Sleep(time.Duration(i*100) * time.Millisecond)
+					}
+				}
+				return
+			}
+			// new ExchangeClient
+			exchangeClientTmp = remoting.NewExchangeClient(url, getty.NewClient(getty.Options{
+				ConnectTimeout: config.GetConsumerConfig().ConnectTimeout,
+				RequestTimeout: config.GetConsumerConfig().RequestTimeout,
+			}), config.GetConsumerConfig().ConnectTimeout, false)
+			// input store
+			if exchangeClientTmp != nil {
+				exchangeClientMap.Store(url.Location, exchangeClientTmp)
+			}
+		}()
+		if exchangeClientTmp != nil {
+			return exchangeClientTmp
+		}
+	}
+	// cannot dial the server
+	if clientTmp == nil {
+		return nil
+	}
+	exchangeClient, ok := clientTmp.(*remoting.ExchangeClient)
+	if !ok {
+		exchangeClientTmp := remoting.NewExchangeClient(url, getty.NewClient(getty.Options{
+			ConnectTimeout: config.GetConsumerConfig().ConnectTimeout,
+			RequestTimeout: config.GetConsumerConfig().RequestTimeout}), config.GetConsumerConfig().ConnectTimeout, false)
+		if exchangeClientTmp != nil {
+			exchangeClientMap.Store(url.Location, exchangeClientTmp)
+		}
+		return exchangeClientTmp
+	}
+	return exchangeClient
+}
+
+// rebuildCtx rebuild the context by attachment.
+// Once we decided to transfer more context's key-value, we should change this.
+// now we only support rebuild the tracing context
+func rebuildCtx(inv *invocation.RPCInvocation) context.Context {
+	ctx := context.WithValue(context.Background(), "attachment", inv.Attachments())
+
+	// actually, if user do not use any opentracing framework, the err will not be nil.
+	spanCtx, err := opentracing.GlobalTracer().Extract(opentracing.TextMap,
+		opentracing.TextMapCarrier(inv.Attachments()))
+	if err == nil {
+		ctx = context.WithValue(ctx, constant.TRACING_REMOTE_SPAN_CTX, spanCtx)
+	}
+	return ctx

Review comment:
       I think fewer "return" statements would be better.

##########
File path: config/application_config.go
##########
@@ -42,6 +42,16 @@ func (*ApplicationConfig) Prefix() string {
 	return constant.DUBBO + ".application."
 }
 
+// nolint
+func (c *ApplicationConfig) Id() string {

Review comment:
       delete it

##########
File path: config/application_config.go
##########
@@ -42,6 +42,16 @@ func (*ApplicationConfig) Prefix() string {
 	return constant.DUBBO + ".application."
 }
 
+// nolint
+func (c *ApplicationConfig) Id() string {
+	return ""
+}
+
+// SetId ...
+func (c *ApplicationConfig) SetId(id string) {

Review comment:
       delete it

##########
File path: protocol/dubbo/dubbo_codec.go
##########
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package dubbo
+
+import (
+	"bytes"
+	"strconv"
+	"time"
+)
+
+import (
+	hessian "github.com/apache/dubbo-go-hessian2"
+	perrors "github.com/pkg/errors"
+)
+
+import (
+	"github.com/apache/dubbo-go/common/constant"
+	"github.com/apache/dubbo-go/common/logger"
+	"github.com/apache/dubbo-go/protocol"
+	"github.com/apache/dubbo-go/protocol/dubbo/impl"
+	"github.com/apache/dubbo-go/protocol/invocation"
+	"github.com/apache/dubbo-go/remoting"
+)
+
+//SerialID serial ID
+type SerialID byte
+
+func init() {
+	codec := &DubboCodec{}
+	// this is for registry dubboCodec of dubbo protocol
+	remoting.RegistryCodec("dubbo", codec)
+}
+
+// DubboCodec. It is implements remoting.Codec
+type DubboCodec struct {
+}
+
+// encode request for transport
+func (c *DubboCodec) EncodeRequest(request *remoting.Request) (*bytes.Buffer, error) {
+	if request.Event {

Review comment:
       I think so

##########
File path: protocol/dubbo/dubbo_invoker.go
##########
@@ -94,35 +110,57 @@ func (di *DubboInvoker) Invoke(ctx context.Context, invocation protocol.Invocati
 	di.appendCtx(ctx, inv)
 
 	url := di.GetUrl()
+	// default hessian2 serialization, compatible
+	if url.GetParam("serialization", "") == "" {
+		url.SetParam("serialization", constant.HESSIAN2_SERIALIZATION)

Review comment:
       modified

##########
File path: protocol/dubbo/dubbo_protocol.go
##########
@@ -131,3 +144,101 @@ func GetProtocol() protocol.Protocol {
 	}
 	return dubboProtocol
 }
+
+func doHandleRequest(rpcInvocation *invocation.RPCInvocation) protocol.RPCResult {
+	exporter, _ := dubboProtocol.ExporterMap().Load(rpcInvocation.ServiceKey())
+	result := protocol.RPCResult{}
+	if exporter == nil {
+		err := fmt.Errorf("don't have this exporter, key: %s", rpcInvocation.ServiceKey())
+		logger.Errorf(err.Error())
+		result.Err = err
+		//reply(session, p, hessian.PackageResponse)
+		return result
+	}
+	invoker := exporter.(protocol.Exporter).GetInvoker()
+	if invoker != nil {
+		// FIXME
+		ctx := rebuildCtx(rpcInvocation)
+
+		invokeResult := invoker.Invoke(ctx, rpcInvocation)
+		if err := invokeResult.Error(); err != nil {
+			result.Err = invokeResult.Error()
+			//p.Header.ResponseStatus = hessian.Response_OK
+			//p.Body = hessian.NewResponse(nil, err, result.Attachments())
+		} else {
+			result.Rest = invokeResult.Result()
+			//p.Header.ResponseStatus = hessian.Response_OK
+			//p.Body = hessian.NewResponse(res, nil, result.Attachments())
+		}
+	} else {
+		result.Err = fmt.Errorf("don't have the invoker, key: %s", rpcInvocation.ServiceKey())
+	}
+	return result
+}
+
+func getExchangeClient(url common.URL) *remoting.ExchangeClient {
+	clientTmp, ok := exchangeClientMap.Load(url.Location)
+	if !ok {
+		var exchangeClientTmp *remoting.ExchangeClient
+		func() {
+			// lock for NewExchangeClient and store into map.
+			_, loaded := exchangeLock.LoadOrStore(url.Location, 0x00)
+			// unlock
+			defer exchangeLock.Delete(url.Location)
+			if loaded {
+				// retry for 5 times.
+				for i := 0; i < 5; i++ {
+					if clientTmp, ok = exchangeClientMap.Load(url.Location); ok {
+						break
+					} else {
+						// if cannot get, sleep a while.
+						time.Sleep(time.Duration(i*100) * time.Millisecond)
+					}
+				}
+				return
+			}
+			// new ExchangeClient
+			exchangeClientTmp = remoting.NewExchangeClient(url, getty.NewClient(getty.Options{
+				ConnectTimeout: config.GetConsumerConfig().ConnectTimeout,
+				RequestTimeout: config.GetConsumerConfig().RequestTimeout,
+			}), config.GetConsumerConfig().ConnectTimeout, false)
+			// input store
+			if exchangeClientTmp != nil {
+				exchangeClientMap.Store(url.Location, exchangeClientTmp)
+			}
+		}()
+		if exchangeClientTmp != nil {
+			return exchangeClientTmp
+		}
+	}
+	// cannot dial the server
+	if clientTmp == nil {
+		return nil
+	}
+	exchangeClient, ok := clientTmp.(*remoting.ExchangeClient)
+	if !ok {
+		exchangeClientTmp := remoting.NewExchangeClient(url, getty.NewClient(getty.Options{
+			ConnectTimeout: config.GetConsumerConfig().ConnectTimeout,
+			RequestTimeout: config.GetConsumerConfig().RequestTimeout}), config.GetConsumerConfig().ConnectTimeout, false)
+		if exchangeClientTmp != nil {
+			exchangeClientMap.Store(url.Location, exchangeClientTmp)
+		}
+		return exchangeClientTmp
+	}
+	return exchangeClient
+}
+
+// rebuildCtx rebuild the context by attachment.
+// Once we decided to transfer more context's key-value, we should change this.
+// now we only support rebuild the tracing context
+func rebuildCtx(inv *invocation.RPCInvocation) context.Context {
+	ctx := context.WithValue(context.Background(), "attachment", inv.Attachments())
+
+	// actually, if user do not use any opentracing framework, the err will not be nil.
+	spanCtx, err := opentracing.GlobalTracer().Extract(opentracing.TextMap,
+		opentracing.TextMapCarrier(inv.Attachments()))
+	if err == nil {
+		ctx = context.WithValue(ctx, constant.TRACING_REMOTE_SPAN_CTX, spanCtx)
+	}
+	return ctx

Review comment:
       I think fewer "return" statements would be better.

##########
File path: config/application_config.go
##########
@@ -42,6 +42,16 @@ func (*ApplicationConfig) Prefix() string {
 	return constant.DUBBO + ".application."
 }
 
+// nolint
+func (c *ApplicationConfig) Id() string {

Review comment:
       delete it

##########
File path: config/application_config.go
##########
@@ -42,6 +42,16 @@ func (*ApplicationConfig) Prefix() string {
 	return constant.DUBBO + ".application."
 }
 
+// nolint
+func (c *ApplicationConfig) Id() string {
+	return ""
+}
+
+// SetId ...
+func (c *ApplicationConfig) SetId(id string) {

Review comment:
       delete it

##########
File path: protocol/dubbo/dubbo_codec.go
##########
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package dubbo
+
+import (
+	"bytes"
+	"strconv"
+	"time"
+)
+
+import (
+	hessian "github.com/apache/dubbo-go-hessian2"
+	perrors "github.com/pkg/errors"
+)
+
+import (
+	"github.com/apache/dubbo-go/common/constant"
+	"github.com/apache/dubbo-go/common/logger"
+	"github.com/apache/dubbo-go/protocol"
+	"github.com/apache/dubbo-go/protocol/dubbo/impl"
+	"github.com/apache/dubbo-go/protocol/invocation"
+	"github.com/apache/dubbo-go/remoting"
+)
+
+//SerialID serial ID
+type SerialID byte
+
+func init() {
+	codec := &DubboCodec{}
+	// this is for registry dubboCodec of dubbo protocol
+	remoting.RegistryCodec("dubbo", codec)
+}
+
+// DubboCodec. It is implements remoting.Codec
+type DubboCodec struct {
+}
+
+// encode request for transport
+func (c *DubboCodec) EncodeRequest(request *remoting.Request) (*bytes.Buffer, error) {
+	if request.Event {

Review comment:
       I think so

##########
File path: protocol/dubbo/dubbo_invoker.go
##########
@@ -94,35 +110,57 @@ func (di *DubboInvoker) Invoke(ctx context.Context, invocation protocol.Invocati
 	di.appendCtx(ctx, inv)
 
 	url := di.GetUrl()
+	// default hessian2 serialization, compatible
+	if url.GetParam("serialization", "") == "" {
+		url.SetParam("serialization", constant.HESSIAN2_SERIALIZATION)

Review comment:
       modified

##########
File path: protocol/dubbo/dubbo_protocol.go
##########
@@ -131,3 +144,101 @@ func GetProtocol() protocol.Protocol {
 	}
 	return dubboProtocol
 }
+
+func doHandleRequest(rpcInvocation *invocation.RPCInvocation) protocol.RPCResult {
+	exporter, _ := dubboProtocol.ExporterMap().Load(rpcInvocation.ServiceKey())
+	result := protocol.RPCResult{}
+	if exporter == nil {
+		err := fmt.Errorf("don't have this exporter, key: %s", rpcInvocation.ServiceKey())
+		logger.Errorf(err.Error())
+		result.Err = err
+		//reply(session, p, hessian.PackageResponse)
+		return result
+	}
+	invoker := exporter.(protocol.Exporter).GetInvoker()
+	if invoker != nil {
+		// FIXME
+		ctx := rebuildCtx(rpcInvocation)
+
+		invokeResult := invoker.Invoke(ctx, rpcInvocation)
+		if err := invokeResult.Error(); err != nil {
+			result.Err = invokeResult.Error()
+			//p.Header.ResponseStatus = hessian.Response_OK
+			//p.Body = hessian.NewResponse(nil, err, result.Attachments())
+		} else {
+			result.Rest = invokeResult.Result()
+			//p.Header.ResponseStatus = hessian.Response_OK
+			//p.Body = hessian.NewResponse(res, nil, result.Attachments())
+		}
+	} else {
+		result.Err = fmt.Errorf("don't have the invoker, key: %s", rpcInvocation.ServiceKey())
+	}
+	return result
+}
+
+func getExchangeClient(url common.URL) *remoting.ExchangeClient {
+	clientTmp, ok := exchangeClientMap.Load(url.Location)
+	if !ok {
+		var exchangeClientTmp *remoting.ExchangeClient
+		func() {
+			// lock for NewExchangeClient and store into map.
+			_, loaded := exchangeLock.LoadOrStore(url.Location, 0x00)
+			// unlock
+			defer exchangeLock.Delete(url.Location)
+			if loaded {
+				// retry for 5 times.
+				for i := 0; i < 5; i++ {
+					if clientTmp, ok = exchangeClientMap.Load(url.Location); ok {
+						break
+					} else {
+						// if cannot get, sleep a while.
+						time.Sleep(time.Duration(i*100) * time.Millisecond)
+					}
+				}
+				return
+			}
+			// new ExchangeClient
+			exchangeClientTmp = remoting.NewExchangeClient(url, getty.NewClient(getty.Options{
+				ConnectTimeout: config.GetConsumerConfig().ConnectTimeout,
+				RequestTimeout: config.GetConsumerConfig().RequestTimeout,
+			}), config.GetConsumerConfig().ConnectTimeout, false)
+			// input store
+			if exchangeClientTmp != nil {
+				exchangeClientMap.Store(url.Location, exchangeClientTmp)
+			}
+		}()
+		if exchangeClientTmp != nil {
+			return exchangeClientTmp
+		}
+	}
+	// cannot dial the server
+	if clientTmp == nil {
+		return nil
+	}
+	exchangeClient, ok := clientTmp.(*remoting.ExchangeClient)
+	if !ok {
+		exchangeClientTmp := remoting.NewExchangeClient(url, getty.NewClient(getty.Options{
+			ConnectTimeout: config.GetConsumerConfig().ConnectTimeout,
+			RequestTimeout: config.GetConsumerConfig().RequestTimeout}), config.GetConsumerConfig().ConnectTimeout, false)
+		if exchangeClientTmp != nil {
+			exchangeClientMap.Store(url.Location, exchangeClientTmp)
+		}
+		return exchangeClientTmp
+	}
+	return exchangeClient
+}
+
+// rebuildCtx rebuild the context by attachment.
+// Once we decided to transfer more context's key-value, we should change this.
+// now we only support rebuild the tracing context
+func rebuildCtx(inv *invocation.RPCInvocation) context.Context {
+	ctx := context.WithValue(context.Background(), "attachment", inv.Attachments())
+
+	// actually, if user do not use any opentracing framework, the err will not be nil.
+	spanCtx, err := opentracing.GlobalTracer().Extract(opentracing.TextMap,
+		opentracing.TextMapCarrier(inv.Attachments()))
+	if err == nil {
+		ctx = context.WithValue(ctx, constant.TRACING_REMOTE_SPAN_CTX, spanCtx)
+	}
+	return ctx

Review comment:
       I think fewer "return" statements would be better.

##########
File path: config/application_config.go
##########
@@ -42,6 +42,16 @@ func (*ApplicationConfig) Prefix() string {
 	return constant.DUBBO + ".application."
 }
 
+// nolint
+func (c *ApplicationConfig) Id() string {

Review comment:
       delete it

##########
File path: config/application_config.go
##########
@@ -42,6 +42,16 @@ func (*ApplicationConfig) Prefix() string {
 	return constant.DUBBO + ".application."
 }
 
+// nolint
+func (c *ApplicationConfig) Id() string {
+	return ""
+}
+
+// SetId ...
+func (c *ApplicationConfig) SetId(id string) {

Review comment:
       delete it

##########
File path: protocol/dubbo/dubbo_codec.go
##########
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package dubbo
+
+import (
+	"bytes"
+	"strconv"
+	"time"
+)
+
+import (
+	hessian "github.com/apache/dubbo-go-hessian2"
+	perrors "github.com/pkg/errors"
+)
+
+import (
+	"github.com/apache/dubbo-go/common/constant"
+	"github.com/apache/dubbo-go/common/logger"
+	"github.com/apache/dubbo-go/protocol"
+	"github.com/apache/dubbo-go/protocol/dubbo/impl"
+	"github.com/apache/dubbo-go/protocol/invocation"
+	"github.com/apache/dubbo-go/remoting"
+)
+
+//SerialID serial ID
+type SerialID byte
+
+func init() {
+	codec := &DubboCodec{}
+	// this is for registry dubboCodec of dubbo protocol
+	remoting.RegistryCodec("dubbo", codec)
+}
+
+// DubboCodec. It is implements remoting.Codec
+type DubboCodec struct {
+}
+
+// encode request for transport
+func (c *DubboCodec) EncodeRequest(request *remoting.Request) (*bytes.Buffer, error) {
+	if request.Event {

Review comment:
       I think so

##########
File path: protocol/dubbo/dubbo_invoker.go
##########
@@ -94,35 +110,57 @@ func (di *DubboInvoker) Invoke(ctx context.Context, invocation protocol.Invocati
 	di.appendCtx(ctx, inv)
 
 	url := di.GetUrl()
+	// default hessian2 serialization, compatible
+	if url.GetParam("serialization", "") == "" {
+		url.SetParam("serialization", constant.HESSIAN2_SERIALIZATION)

Review comment:
       modified

##########
File path: protocol/dubbo/dubbo_protocol.go
##########
@@ -131,3 +144,101 @@ func GetProtocol() protocol.Protocol {
 	}
 	return dubboProtocol
 }
+
+func doHandleRequest(rpcInvocation *invocation.RPCInvocation) protocol.RPCResult {
+	exporter, _ := dubboProtocol.ExporterMap().Load(rpcInvocation.ServiceKey())
+	result := protocol.RPCResult{}
+	if exporter == nil {
+		err := fmt.Errorf("don't have this exporter, key: %s", rpcInvocation.ServiceKey())
+		logger.Errorf(err.Error())
+		result.Err = err
+		//reply(session, p, hessian.PackageResponse)
+		return result
+	}
+	invoker := exporter.(protocol.Exporter).GetInvoker()
+	if invoker != nil {
+		// FIXME
+		ctx := rebuildCtx(rpcInvocation)
+
+		invokeResult := invoker.Invoke(ctx, rpcInvocation)
+		if err := invokeResult.Error(); err != nil {
+			result.Err = invokeResult.Error()
+			//p.Header.ResponseStatus = hessian.Response_OK
+			//p.Body = hessian.NewResponse(nil, err, result.Attachments())
+		} else {
+			result.Rest = invokeResult.Result()
+			//p.Header.ResponseStatus = hessian.Response_OK
+			//p.Body = hessian.NewResponse(res, nil, result.Attachments())
+		}
+	} else {
+		result.Err = fmt.Errorf("don't have the invoker, key: %s", rpcInvocation.ServiceKey())
+	}
+	return result
+}
+
+func getExchangeClient(url common.URL) *remoting.ExchangeClient {
+	clientTmp, ok := exchangeClientMap.Load(url.Location)
+	if !ok {
+		var exchangeClientTmp *remoting.ExchangeClient
+		func() {
+			// lock for NewExchangeClient and store into map.
+			_, loaded := exchangeLock.LoadOrStore(url.Location, 0x00)
+			// unlock
+			defer exchangeLock.Delete(url.Location)
+			if loaded {
+				// retry for 5 times.
+				for i := 0; i < 5; i++ {
+					if clientTmp, ok = exchangeClientMap.Load(url.Location); ok {
+						break
+					} else {
+						// if cannot get, sleep a while.
+						time.Sleep(time.Duration(i*100) * time.Millisecond)
+					}
+				}
+				return
+			}
+			// new ExchangeClient
+			exchangeClientTmp = remoting.NewExchangeClient(url, getty.NewClient(getty.Options{
+				ConnectTimeout: config.GetConsumerConfig().ConnectTimeout,
+				RequestTimeout: config.GetConsumerConfig().RequestTimeout,
+			}), config.GetConsumerConfig().ConnectTimeout, false)
+			// input store
+			if exchangeClientTmp != nil {
+				exchangeClientMap.Store(url.Location, exchangeClientTmp)
+			}
+		}()
+		if exchangeClientTmp != nil {
+			return exchangeClientTmp
+		}
+	}
+	// cannot dial the server
+	if clientTmp == nil {
+		return nil
+	}
+	exchangeClient, ok := clientTmp.(*remoting.ExchangeClient)
+	if !ok {
+		exchangeClientTmp := remoting.NewExchangeClient(url, getty.NewClient(getty.Options{
+			ConnectTimeout: config.GetConsumerConfig().ConnectTimeout,
+			RequestTimeout: config.GetConsumerConfig().RequestTimeout}), config.GetConsumerConfig().ConnectTimeout, false)
+		if exchangeClientTmp != nil {
+			exchangeClientMap.Store(url.Location, exchangeClientTmp)
+		}
+		return exchangeClientTmp
+	}
+	return exchangeClient
+}
+
+// rebuildCtx rebuild the context by attachment.
+// Once we decided to transfer more context's key-value, we should change this.
+// now we only support rebuild the tracing context
+func rebuildCtx(inv *invocation.RPCInvocation) context.Context {
+	ctx := context.WithValue(context.Background(), "attachment", inv.Attachments())
+
+	// actually, if user do not use any opentracing framework, the err will not be nil.
+	spanCtx, err := opentracing.GlobalTracer().Extract(opentracing.TextMap,
+		opentracing.TextMapCarrier(inv.Attachments()))
+	if err == nil {
+		ctx = context.WithValue(ctx, constant.TRACING_REMOTE_SPAN_CTX, spanCtx)
+	}
+	return ctx

Review comment:
       I think fewer "return" statements would be better.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@dubbo.apache.org
For additional commands, e-mail: notifications-help@dubbo.apache.org


[GitHub] [dubbo-go] zouyx commented on a change in pull request #673: Rft: network & codec

Posted by GitBox <gi...@apache.org>.
zouyx commented on a change in pull request #673:
URL: https://github.com/apache/dubbo-go/pull/673#discussion_r486011339



##########
File path: config/application_config.go
##########
@@ -42,6 +42,16 @@ func (*ApplicationConfig) Prefix() string {
 	return constant.DUBBO + ".application."
 }
 
+// nolint
+func (c *ApplicationConfig) Id() string {

Review comment:
       Useful or not?

##########
File path: protocol/dubbo/dubbo_codec.go
##########
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package dubbo
+
+import (
+	"bytes"
+	"strconv"
+	"time"
+)
+
+import (
+	hessian "github.com/apache/dubbo-go-hessian2"
+	perrors "github.com/pkg/errors"
+)
+
+import (
+	"github.com/apache/dubbo-go/common/constant"
+	"github.com/apache/dubbo-go/common/logger"
+	"github.com/apache/dubbo-go/protocol"
+	"github.com/apache/dubbo-go/protocol/dubbo/impl"
+	"github.com/apache/dubbo-go/protocol/invocation"
+	"github.com/apache/dubbo-go/remoting"
+)
+
+//SerialID serial ID
+type SerialID byte
+
+func init() {
+	codec := &DubboCodec{}
+	// this is for registry dubboCodec of dubbo protocol
+	remoting.RegistryCodec("dubbo", codec)
+}
+
+// DubboCodec. It is implements remoting.Codec
+type DubboCodec struct {
+}
+
+// encode request for transport
+func (c *DubboCodec) EncodeRequest(request *remoting.Request) (*bytes.Buffer, error) {
+	if request.Event {

Review comment:
       Is the request guaranteed to be non-nil here?

##########
File path: protocol/dubbo/dubbo_invoker.go
##########
@@ -94,35 +110,57 @@ func (di *DubboInvoker) Invoke(ctx context.Context, invocation protocol.Invocati
 	di.appendCtx(ctx, inv)
 
 	url := di.GetUrl()
+	// default hessian2 serialization, compatible
+	if url.GetParam("serialization", "") == "" {
+		url.SetParam("serialization", constant.HESSIAN2_SERIALIZATION)

Review comment:
       Where is the constant for this key?

##########
File path: config/application_config.go
##########
@@ -42,6 +42,16 @@ func (*ApplicationConfig) Prefix() string {
 	return constant.DUBBO + ".application."
 }
 
+// nolint
+func (c *ApplicationConfig) Id() string {
+	return ""
+}
+
+// SetId ...
+func (c *ApplicationConfig) SetId(id string) {

Review comment:
       Useful or not?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@dubbo.apache.org
For additional commands, e-mail: notifications-help@dubbo.apache.org


[GitHub] [dubbo-go] georgehao commented on a change in pull request #673: Rft: network & codec

Posted by GitBox <gi...@apache.org>.
georgehao commented on a change in pull request #673:
URL: https://github.com/apache/dubbo-go/pull/673#discussion_r489561858



##########
File path: protocol/dubbo/dubbo_protocol.go
##########
@@ -131,3 +144,101 @@ func GetProtocol() protocol.Protocol {
 	}
 	return dubboProtocol
 }
+
+func doHandleRequest(rpcInvocation *invocation.RPCInvocation) protocol.RPCResult {
+	exporter, _ := dubboProtocol.ExporterMap().Load(rpcInvocation.ServiceKey())
+	result := protocol.RPCResult{}
+	if exporter == nil {
+		err := fmt.Errorf("don't have this exporter, key: %s", rpcInvocation.ServiceKey())
+		logger.Errorf(err.Error())
+		result.Err = err
+		//reply(session, p, hessian.PackageResponse)
+		return result
+	}
+	invoker := exporter.(protocol.Exporter).GetInvoker()
+	if invoker != nil {
+		// FIXME
+		ctx := rebuildCtx(rpcInvocation)
+
+		invokeResult := invoker.Invoke(ctx, rpcInvocation)
+		if err := invokeResult.Error(); err != nil {
+			result.Err = invokeResult.Error()
+			//p.Header.ResponseStatus = hessian.Response_OK
+			//p.Body = hessian.NewResponse(nil, err, result.Attachments())
+		} else {
+			result.Rest = invokeResult.Result()
+			//p.Header.ResponseStatus = hessian.Response_OK
+			//p.Body = hessian.NewResponse(res, nil, result.Attachments())
+		}
+	} else {
+		result.Err = fmt.Errorf("don't have the invoker, key: %s", rpcInvocation.ServiceKey())
+	}
+	return result
+}
+
+func getExchangeClient(url common.URL) *remoting.ExchangeClient {
+	clientTmp, ok := exchangeClientMap.Load(url.Location)
+	if !ok {
+		var exchangeClientTmp *remoting.ExchangeClient
+		func() {
+			// lock for NewExchangeClient and store into map.
+			_, loaded := exchangeLock.LoadOrStore(url.Location, 0x00)

Review comment:
       Two goroutines arrive: goroutine1 comes first but has not created the client yet, so neither of them gets a client from exchangeClientMap.
   
   When goroutine1 creates the client, it must hold the lock to prevent goroutine2 from overwriting the client.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@dubbo.apache.org
For additional commands, e-mail: notifications-help@dubbo.apache.org


[GitHub] [dubbo-go] AlexStocks commented on a change in pull request #673: Rft: network & codec

Posted by GitBox <gi...@apache.org>.
AlexStocks commented on a change in pull request #673:
URL: https://github.com/apache/dubbo-go/pull/673#discussion_r487656013



##########
File path: remoting/getty/readwriter.go
##########
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package getty
+
+import (
+	"reflect"
+)
+
+import (
+	"github.com/apache/dubbo-getty"
+	hessian "github.com/apache/dubbo-go-hessian2"
+	perrors "github.com/pkg/errors"
+)
+
+import (
+	"github.com/apache/dubbo-go/common/logger"
+	"github.com/apache/dubbo-go/remoting"
+)
+
+////////////////////////////////////////////
+// RpcClientPackageHandler
+////////////////////////////////////////////
+
+// RpcClientPackageHandler ...

Review comment:
       Please add comments for the functions in this file.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@dubbo.apache.org
For additional commands, e-mail: notifications-help@dubbo.apache.org


[GitHub] [dubbo-go] fangyincheng commented on a change in pull request #673: Rft: network & codec

Posted by GitBox <gi...@apache.org>.
fangyincheng commented on a change in pull request #673:
URL: https://github.com/apache/dubbo-go/pull/673#discussion_r487385104



##########
File path: protocol/dubbo/dubbo_protocol.go
##########
@@ -131,3 +144,101 @@ func GetProtocol() protocol.Protocol {
 	}
 	return dubboProtocol
 }
+
+func doHandleRequest(rpcInvocation *invocation.RPCInvocation) protocol.RPCResult {
+	exporter, _ := dubboProtocol.ExporterMap().Load(rpcInvocation.ServiceKey())
+	result := protocol.RPCResult{}
+	if exporter == nil {
+		err := fmt.Errorf("don't have this exporter, key: %s", rpcInvocation.ServiceKey())
+		logger.Errorf(err.Error())
+		result.Err = err
+		//reply(session, p, hessian.PackageResponse)
+		return result
+	}
+	invoker := exporter.(protocol.Exporter).GetInvoker()
+	if invoker != nil {
+		// FIXME
+		ctx := rebuildCtx(rpcInvocation)
+
+		invokeResult := invoker.Invoke(ctx, rpcInvocation)
+		if err := invokeResult.Error(); err != nil {
+			result.Err = invokeResult.Error()
+			//p.Header.ResponseStatus = hessian.Response_OK
+			//p.Body = hessian.NewResponse(nil, err, result.Attachments())
+		} else {
+			result.Rest = invokeResult.Result()
+			//p.Header.ResponseStatus = hessian.Response_OK
+			//p.Body = hessian.NewResponse(res, nil, result.Attachments())
+		}
+	} else {
+		result.Err = fmt.Errorf("don't have the invoker, key: %s", rpcInvocation.ServiceKey())
+	}
+	return result
+}
+
+func getExchangeClient(url common.URL) *remoting.ExchangeClient {
+	clientTmp, ok := exchangeClientMap.Load(url.Location)
+	if !ok {
+		var exchangeClientTmp *remoting.ExchangeClient
+		func() {
+			// lock for NewExchangeClient and store into map.
+			_, loaded := exchangeLock.LoadOrStore(url.Location, 0x00)
+			// unlock
+			defer exchangeLock.Delete(url.Location)
+			if loaded {
+				// retry for 5 times.
+				for i := 0; i < 5; i++ {
+					if clientTmp, ok = exchangeClientMap.Load(url.Location); ok {
+						break
+					} else {
+						// if cannot get, sleep a while.
+						time.Sleep(time.Duration(i*100) * time.Millisecond)
+					}
+				}
+				return
+			}
+			// new ExchangeClient
+			exchangeClientTmp = remoting.NewExchangeClient(url, getty.NewClient(getty.Options{
+				ConnectTimeout: config.GetConsumerConfig().ConnectTimeout,
+				RequestTimeout: config.GetConsumerConfig().RequestTimeout,
+			}), config.GetConsumerConfig().ConnectTimeout, false)
+			// input store
+			if exchangeClientTmp != nil {
+				exchangeClientMap.Store(url.Location, exchangeClientTmp)
+			}
+		}()
+		if exchangeClientTmp != nil {
+			return exchangeClientTmp
+		}
+	}
+	// cannot dial the server
+	if clientTmp == nil {
+		return nil
+	}
+	exchangeClient, ok := clientTmp.(*remoting.ExchangeClient)
+	if !ok {
+		exchangeClientTmp := remoting.NewExchangeClient(url, getty.NewClient(getty.Options{
+			ConnectTimeout: config.GetConsumerConfig().ConnectTimeout,
+			RequestTimeout: config.GetConsumerConfig().RequestTimeout}), config.GetConsumerConfig().ConnectTimeout, false)
+		if exchangeClientTmp != nil {
+			exchangeClientMap.Store(url.Location, exchangeClientTmp)
+		}
+		return exchangeClientTmp
+	}
+	return exchangeClient
+}
+
+// rebuildCtx rebuild the context by attachment.
+// Once we decided to transfer more context's key-value, we should change this.
+// now we only support rebuild the tracing context
+func rebuildCtx(inv *invocation.RPCInvocation) context.Context {
+	ctx := context.WithValue(context.Background(), "attachment", inv.Attachments())
+
+	// actually, if user do not use any opentracing framework, the err will not be nil.
+	spanCtx, err := opentracing.GlobalTracer().Extract(opentracing.TextMap,
+		opentracing.TextMapCarrier(inv.Attachments()))
+	if err == nil {
+		ctx = context.WithValue(ctx, constant.TRACING_REMOTE_SPAN_CTX, spanCtx)
+	}
+	return ctx

Review comment:
       I think fewer "return" statements would be better.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@dubbo.apache.org
For additional commands, e-mail: notifications-help@dubbo.apache.org


[GitHub] [dubbo-go] watermelo commented on a change in pull request #673: [WIP]Rft: network & codec

Posted by GitBox <gi...@apache.org>.
watermelo commented on a change in pull request #673:
URL: https://github.com/apache/dubbo-go/pull/673#discussion_r483953879



##########
File path: protocol/dubbo/dubbo_codec.go
##########
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package dubbo
+
+import (
+	"bytes"
+	"github.com/apache/dubbo-go/protocol/dubbo/impl"
+	"strconv"
+	"time"
+)
+
+import (
+	hessian "github.com/apache/dubbo-go-hessian2"
+	perrors "github.com/pkg/errors"
+)
+
+import (
+	"github.com/apache/dubbo-go/common/constant"
+	"github.com/apache/dubbo-go/common/logger"
+	"github.com/apache/dubbo-go/protocol"
+	"github.com/apache/dubbo-go/protocol/invocation"
+	"github.com/apache/dubbo-go/remoting"
+)
+
+//SerialID serial ID
+type SerialID byte
+
+func init() {
+	codec := &DubboCodec{}
+	// this is for registry dubboCodec of dubbo protocol
+	remoting.RegistryCodec("dubbo", codec)
+}
+
+// DubboCodec. It is implements remoting.Codec
+type DubboCodec struct {
+}
+
+// encode request for transport
+func (c *DubboCodec) EncodeRequest(request *remoting.Request) (*bytes.Buffer, error) {
+	if request.Event {
+		return c.encodeHeartbeartReqeust(request)
+	}
+
+	invoc, ok := request.Data.(*protocol.Invocation)
+	if !ok {
+		logger.Errorf("encode request failed for parameter type :%+v", request)
+		return nil, perrors.Errorf("encode request failed for parameter type :%+v", request)
+	}
+	invocation := *invoc
+
+	svc := impl.Service{}
+	svc.Path = invocation.AttachmentsByKey(constant.PATH_KEY, "")
+	svc.Interface = invocation.AttachmentsByKey(constant.INTERFACE_KEY, "")
+	svc.Version = invocation.AttachmentsByKey(constant.VERSION_KEY, "")
+	svc.Group = invocation.AttachmentsByKey(constant.GROUP_KEY, "")
+	svc.Method = invocation.MethodName()
+	timeout, err := strconv.Atoi(invocation.AttachmentsByKey(constant.TIMEOUT_KEY, strconv.Itoa(constant.DEFAULT_REMOTING_TIMEOUT)))
+	if err != nil {
+		// it will be wrapped in readwrite.Write .
+		return nil, perrors.WithStack(err)
+	}
+	svc.Timeout = time.Duration(timeout)
+
+	header := impl.DubboHeader{}
+	serialization := invocation.AttachmentsByKey(constant.SERIALIZATION_KEY, constant.HESSIAN2_SERIALIZATION)
+	if serialization == constant.PROTOBUF_SERIALIZATION {
+		header.SerialID = constant.S_Proto
+	} else {
+		header.SerialID = constant.S_Hessian2
+	}
+	header.ID = request.ID
+	if request.TwoWay {
+		header.Type = impl.PackageRequest_TwoWay
+	} else {
+		header.Type = impl.PackageRequest
+	}
+
+	pkg := &impl.DubboPackage{
+		Header:  header,
+		Service: svc,
+		Body:    impl.NewRequestPayload(invocation.Arguments(), invocation.Attachments()),
+		Err:     nil,
+		Codec:   impl.NewDubboCodec(nil),
+	}
+
+	if err := impl.LoadSerializer(pkg); err != nil {
+		return nil, perrors.WithStack(err)
+	}
+
+	return pkg.Marshal()
+}
+
+// encode heartbeart request
+func (c *DubboCodec) encodeHeartbeartReqeust(request *remoting.Request) (*bytes.Buffer, error) {
+	header := impl.DubboHeader{
+		Type:     impl.PackageHeartbeat,
+		SerialID: constant.S_Hessian2,
+		ID:       request.ID,
+	}
+
+	pkg := &impl.DubboPackage{
+		Header:  header,
+		Service: impl.Service{},
+		Body:    impl.NewRequestPayload([]interface{}{}, nil),
+		Err:     nil,
+		Codec:   impl.NewDubboCodec(nil),
+	}
+
+	if err := impl.LoadSerializer(pkg); err != nil {
+		return nil, err
+	}
+	return pkg.Marshal()
+}
+
+// encode response
+func (c *DubboCodec) EncodeResponse(response *remoting.Response) (*bytes.Buffer, error) {
+	var ptype = impl.PackageResponse
+	if response.IsHeartbeat() {
+		ptype = impl.PackageHeartbeat
+	}
+	resp := &impl.DubboPackage{
+		Header: impl.DubboHeader{
+			SerialID:       response.SerialID,
+			Type:           ptype,
+			ID:             response.ID,
+			ResponseStatus: response.Status,
+		},
+	}
+	if !response.IsHeartbeat() {
+		resp.Body = &impl.ResponsePayload{
+			RspObj:      response.Result.(protocol.RPCResult).Rest,
+			Exception:   response.Result.(protocol.RPCResult).Err,
+			Attachments: response.Result.(protocol.RPCResult).Attrs,
+		}
+	}
+
+	codec := impl.NewDubboCodec(nil)
+
+	pkg, err := codec.Encode(*resp)
+	if err != nil {
+		return nil, perrors.WithStack(err)
+	}
+
+	return bytes.NewBuffer(pkg), nil
+}
+
+// Decode data, including request and response.
+func (c *DubboCodec) Decode(data []byte) (remoting.DecodeResult, int, error) {
+	if c.isRequest(data) {
+		req, len, err := c.decodeRequest(data)
+		if err != nil {
+			return remoting.DecodeResult{}, len, perrors.WithStack(err)
+		}
+		return remoting.DecodeResult{IsRequest: true, Result: req}, len, perrors.WithStack(err)
+	} else {
+		resp, len, err := c.decodeResponse(data)
+		if err != nil {
+			return remoting.DecodeResult{}, len, perrors.WithStack(err)
+		}
+		return remoting.DecodeResult{IsRequest: false, Result: resp}, len, perrors.WithStack(err)
+	}
+}
+
+func (c *DubboCodec) isRequest(data []byte) bool {
+	if data[2]&byte(0x80) == 0x00 {
+		return false
+	}
+	return true
+}
+
+// decode request
+func (c *DubboCodec) decodeRequest(data []byte) (*remoting.Request, int, error) {
+	var request *remoting.Request = nil
+	buf := bytes.NewBuffer(data)
+	pkg := impl.NewDubboPackage(buf)
+	pkg.SetBody(make([]interface{}, 7))
+	err := pkg.Unmarshal()
+	if err != nil {
+		originErr := perrors.Cause(err)
+		if originErr == hessian.ErrHeaderNotEnough || originErr == hessian.ErrBodyNotEnough {
+			//FIXME
+			return nil, 0, originErr
+		}
+		logger.Errorf("pkg.Unmarshal(len(@data):%d) = error:%+v", buf.Len(), err)
+
+		return request, 0, perrors.WithStack(err)
+	}
+	request = &remoting.Request{
+		ID:       pkg.Header.ID,
+		SerialID: pkg.Header.SerialID,
+		TwoWay:   pkg.Header.Type&impl.PackageRequest_TwoWay != 0x00,
+		Event:    pkg.Header.Type&impl.PackageHeartbeat != 0x00,
+	}
+	if (pkg.Header.Type & impl.PackageHeartbeat) == 0x00 {

Review comment:
       Maybe it's better to keep the format as follows `pkg.Header.Type&impl.PackageHeartbeat != 0x00`

##########
File path: common/url.go
##########
@@ -325,20 +327,22 @@ func (c URL) Key() string {
 
 // ServiceKey gets a unique key of a service.
 func (c URL) ServiceKey() string {
-	intf := c.GetParam(constant.INTERFACE_KEY, strings.TrimPrefix(c.Path, "/"))
+	return ServiceKey(c.GetParam(constant.INTERFACE_KEY, strings.TrimPrefix(c.Path, "/")),
+		c.GetParam(constant.GROUP_KEY, ""), c.GetParam(constant.VERSION_KEY, ""))
+}
+
+func ServiceKey(intf string, group string, version string) string {
 	if intf == "" {
 		return ""
 	}
-	var buf strings.Builder
-	group := c.GetParam(constant.GROUP_KEY, "")
+	buf := &bytes.Buffer{}

Review comment:
       strings.Builder is better

##########
File path: protocol/dubbo/dubbo_protocol_test.go
##########
@@ -74,12 +115,23 @@ func TestDubboProtocolExport(t *testing.T) {
 	assert.False(t, ok)
 }
 
-func TestDubboProtocolRefer(t *testing.T) {
+func TestDubboProtocol_Refer_No_connect(t *testing.T) {

Review comment:
       Maybe `TestDubboProtocolReferNoConnect` is better.

##########
File path: protocol/invocation/rpcinvocation.go
##########
@@ -18,11 +18,13 @@
 package invocation
 
 import (
+	"github.com/apache/dubbo-go/common"

Review comment:
       Please split the imported packages into separate import groups (standard library, third-party, dubbo-go).

##########
File path: protocol/jsonrpc/jsonrpc_invoker.go
##########
@@ -19,13 +19,12 @@ package jsonrpc
 
 import (
 	"context"
-)
 
-import (

Review comment:
       Please split the imported packages into separate import groups (standard library, third-party, dubbo-go).

##########
File path: protocol/jsonrpc/http_test.go
##########
@@ -22,19 +22,15 @@ import (
 	"strings"
 	"testing"
 	"time"
-)
 
-import (
 	"github.com/opentracing/opentracing-go"
-	perrors "github.com/pkg/errors"
-	"github.com/stretchr/testify/assert"
-)
 
-import (
 	"github.com/apache/dubbo-go/common"
 	"github.com/apache/dubbo-go/common/constant"
 	"github.com/apache/dubbo-go/common/proxy/proxy_factory"
 	"github.com/apache/dubbo-go/protocol"
+	perrors "github.com/pkg/errors"

Review comment:
       Please split the imported packages into separate import groups (standard library, third-party, dubbo-go).




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@dubbo.apache.org
For additional commands, e-mail: notifications-help@dubbo.apache.org


[GitHub] [dubbo-go] zouyx commented on a change in pull request #673: Rft: network & codec

Posted by GitBox <gi...@apache.org>.
zouyx commented on a change in pull request #673:
URL: https://github.com/apache/dubbo-go/pull/673#discussion_r491368602



##########
File path: protocol/dubbo/dubbo_codec.go
##########
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package dubbo
+
+import (
+	"bytes"
+	"strconv"
+	"time"
+)
+
+import (
+	hessian "github.com/apache/dubbo-go-hessian2"
+	perrors "github.com/pkg/errors"
+)
+
+import (
+	"github.com/apache/dubbo-go/common/constant"
+	"github.com/apache/dubbo-go/common/logger"
+	"github.com/apache/dubbo-go/protocol"
+	"github.com/apache/dubbo-go/protocol/dubbo/impl"
+	"github.com/apache/dubbo-go/protocol/invocation"
+	"github.com/apache/dubbo-go/remoting"
+)
+
+//SerialID serial ID
+type SerialID byte
+
+func init() {
+	codec := &DubboCodec{}
+	// this is for registry dubboCodec of dubbo protocol
+	remoting.RegistryCodec("dubbo", codec)
+}
+
+// DubboCodec. It is implements remoting.Codec
+type DubboCodec struct {

Review comment:
       Is this meant to be a singleton or not?

##########
File path: protocol/dubbo/dubbo_protocol.go
##########
@@ -131,3 +144,91 @@ func GetProtocol() protocol.Protocol {
 	}
 	return dubboProtocol
 }
+
+func doHandleRequest(rpcInvocation *invocation.RPCInvocation) protocol.RPCResult {
+	exporter, _ := dubboProtocol.ExporterMap().Load(rpcInvocation.ServiceKey())
+	result := protocol.RPCResult{}
+	if exporter == nil {
+		err := fmt.Errorf("don't have this exporter, key: %s", rpcInvocation.ServiceKey())
+		logger.Errorf(err.Error())
+		result.Err = err
+		//reply(session, p, hessian.PackageResponse)
+		return result
+	}
+	invoker := exporter.(protocol.Exporter).GetInvoker()
+	if invoker != nil {
+		// FIXME
+		ctx := rebuildCtx(rpcInvocation)
+
+		invokeResult := invoker.Invoke(ctx, rpcInvocation)
+		if err := invokeResult.Error(); err != nil {
+			result.Err = invokeResult.Error()
+			//p.Header.ResponseStatus = hessian.Response_OK

Review comment:
       Please delete these commented-out lines if they are no longer needed.

##########
File path: protocol/dubbo/dubbo_invoker.go
##########
@@ -46,24 +49,35 @@ var (
 )
 
 var (
-	attachmentKey = []string{constant.INTERFACE_KEY, constant.GROUP_KEY, constant.TOKEN_KEY, constant.TIMEOUT_KEY}
+	attachmentKey = []string{constant.INTERFACE_KEY, constant.GROUP_KEY, constant.TOKEN_KEY, constant.TIMEOUT_KEY,
+		constant.VERSION_KEY}
 )
 
-// DubboInvoker is dubbo client invoker.
+// DubboInvoker is implement of protocol.Invoker. A dubboInvoker refer to one service and ip.
 type DubboInvoker struct {
 	protocol.BaseInvoker
-	client   *Client
+	// the exchange layer, it is focus on network communication.
+	client   *remoting.ExchangeClient
 	quitOnce sync.Once
+	// timeout for service(interface) level.
+	timeout time.Duration
 	// Used to record the number of requests. -1 represent this DubboInvoker is destroyed
 	reqNum int64
 }
 
-// NewDubboInvoker create dubbo client invoker.
-func NewDubboInvoker(url common.URL, client *Client) *DubboInvoker {
+// NewDubboInvoker constructor
+func NewDubboInvoker(url common.URL, client *remoting.ExchangeClient) *DubboInvoker {
+	requestTimeout := config.GetConsumerConfig().RequestTimeout
+
+	requestTimeoutStr := url.GetParam(constant.TIMEOUT_KEY, config.GetConsumerConfig().Request_Timeout)
+	if t, err := time.ParseDuration(requestTimeoutStr); err == nil {

Review comment:
       I think you should log the error if it is not nil.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@dubbo.apache.org
For additional commands, e-mail: notifications-help@dubbo.apache.org