You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by ge...@apache.org on 2022/07/11 16:11:03 UTC
[dubbo-go] 01/01: feat: init rocketmq go client
This is an automated email from the ASF dual-hosted git repository.
georgehao pushed a commit to branch feat/remoting_rocketmq
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git
commit 56d3ff5888a5ae48dc7c4ef78dd64f562d1b4246
Author: haohongfan1 <ha...@jd.com>
AuthorDate: Tue Jul 12 00:10:35 2022 +0800
feat: init rocketmq go client
---
go.mod | 1 +
go.sum | 15 ++++
protocol/mq/doc.go | 19 +++++
protocol/mq/rocketmq_exporter.go | 58 +++++++++++++
protocol/mq/rocketmq_invoker.go | 155 ++++++++++++++++++++++++++++++++++
protocol/mq/rocketmq_invoker_test.go | 1 +
protocol/mq/rocketmq_protocol.go | 131 ++++++++++++++++++++++++++++
protocol/mq/rocketmq_protocol_test.go | 1 +
8 files changed, 381 insertions(+)
diff --git a/go.mod b/go.mod
index 3b21aa488..e44e4449a 100644
--- a/go.mod
+++ b/go.mod
@@ -10,6 +10,7 @@ require (
github.com/alibaba/sentinel-golang v1.0.4
github.com/apache/dubbo-getty v1.4.8
github.com/apache/dubbo-go-hessian2 v1.11.0
+ github.com/apache/rocketmq-client-go/v2 v2.1.0 // indirect
github.com/cespare/xxhash/v2 v2.1.2
github.com/cncf/udpa/go v0.0.0-20210930031921-04548b0d99d4
github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1
diff --git a/go.sum b/go.sum
index cbecb919a..0ca800529 100644
--- a/go.sum
+++ b/go.sum
@@ -73,6 +73,8 @@ github.com/apache/dubbo-go-hessian2 v1.9.1/go.mod h1:xQUjE7F8PX49nm80kChFvepA/Av
github.com/apache/dubbo-go-hessian2 v1.9.3/go.mod h1:xQUjE7F8PX49nm80kChFvepA/AvqAZ0oh/UaB6+6pBE=
github.com/apache/dubbo-go-hessian2 v1.11.0 h1:VTdT6NStuEqNmyT3AdSN2DLDBqhXvAAyAAAoh9hLavk=
github.com/apache/dubbo-go-hessian2 v1.11.0/go.mod h1:7rEw9guWABQa6Aqb8HeZcsYPHsOS7XT1qtJvkmI6c5w=
+github.com/apache/rocketmq-client-go/v2 v2.1.0 h1:3eABKfxc1WmS2lLTTbKMe1gZfZV6u1Sx9orFnOfABV0=
+github.com/apache/rocketmq-client-go/v2 v2.1.0/go.mod h1:oEZKFDvS7sz/RWU0839+dQBupazyBV7WX5cP6nrio0Q=
github.com/apache/thrift v0.12.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=
github.com/apache/thrift v0.13.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=
github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o=
@@ -192,6 +194,8 @@ github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFP
github.com/edsrzf/mmap-go v1.0.0/go.mod h1:YO35OhQPt3KJa3ryjFM5Bs14WD66h8eGKpfaBNrHW5M=
github.com/emicklei/go-restful/v3 v3.8.0 h1:eCZ8ulSerjdAiaNpF7GxXIE7ZCMo1moN1qX+S609eVw=
github.com/emicklei/go-restful/v3 v3.8.0/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc=
+github.com/emirpasic/gods v1.12.0 h1:QAUIPSaCu4G+POclxeqb3F+WPpdKqFGlw36+yOzGlrg=
+github.com/emirpasic/gods v1.12.0/go.mod h1:YfzfFFoVP/catgzJb4IKIqXjX78Ha8FMSDh3ymbK86o=
github.com/envoyproxy/go-control-plane v0.6.9/go.mod h1:SBwIajubJHhxtWwsL9s8ss4safvEdbitLhGGK48rN6g=
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
@@ -682,6 +686,7 @@ github.com/shirou/gopsutil/v3 v3.21.6 h1:vU7jrp1Ic/2sHB7w6UNs7MIkn7ebVtTb5D9j45o
github.com/shirou/gopsutil/v3 v3.21.6/go.mod h1:JfVbDpIBLVzT8oKbvMg9P3wEIMDDpVn+LwHTKj0ST88=
github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
+github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88=
github.com/sirupsen/logrus v1.7.0 h1:ShrD1U9pZB12TX0cVy0DtePoCH97K8EtX+mg7ZARUtM=
@@ -689,6 +694,7 @@ github.com/sirupsen/logrus v1.7.0/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d h1:zE9ykElWQ6/NYmHa3jpm/yHnI4xSofP+UP6SpjHcSeM=
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
github.com/smartystreets/goconvey v0.0.0-20190330032615-68dc04aab96a/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
+github.com/smartystreets/goconvey v0.0.0-20190710185942-9d28bd7c0945/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
github.com/smartystreets/goconvey v1.6.4 h1:fv0U8FUIMPNf1L9lnHLvLhgicrIVChEkdzIKYqbNC9s=
github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM=
@@ -726,6 +732,12 @@ github.com/stretchr/testify v1.7.3 h1:dAm0YRdRQlWojc3CrCRgPBzG5f941d0zvAKu7qY4e+
github.com/stretchr/testify v1.7.3/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=
github.com/tebeka/strftime v0.1.3/go.mod h1:7wJm3dZlpr4l/oVK0t1HYIc4rMzQ2XJlOMIUJUJH6XQ=
+github.com/tidwall/gjson v1.2.1 h1:j0efZLrZUvNerEf6xqoi0NjWMK5YlLrR7Guo/dxY174=
+github.com/tidwall/gjson v1.2.1/go.mod h1:c/nTNbUr0E0OrXEhq1pwa8iEgc2DOt4ZZqAt1HtCkPA=
+github.com/tidwall/match v1.0.1 h1:PnKP62LPNxHKTwvHHZZzdOAOCtsJTjo6dZLCwpKm5xc=
+github.com/tidwall/match v1.0.1/go.mod h1:LujAq0jyVjBy028G1WhWfIzbpQfMO8bBZ6Tyb0+pL9E=
+github.com/tidwall/pretty v0.0.0-20190325153808-1166b9ac2b65 h1:rQ229MBgvW68s1/g6f1/63TgYwYxfF4E+bi/KC19P8g=
+github.com/tidwall/pretty v0.0.0-20190325153808-1166b9ac2b65/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
github.com/tklauser/go-sysconf v0.3.6 h1:oc1sJWvKkmvIxhDHeKWvZS4f6AW+YcoguSfRF2/Hmo4=
github.com/tklauser/go-sysconf v0.3.6/go.mod h1:MkWzOF4RMCshBAMXuhXJs64Rte09mITnppBXY/rYEFI=
github.com/tklauser/numcpus v0.2.2 h1:oyhllyrScuYI6g+h/zUvNXNp1wy7x8qQy3t/piefldA=
@@ -793,6 +805,7 @@ go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqe
go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
+go.uber.org/atomic v1.5.1/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE=
@@ -1247,3 +1260,5 @@ sigs.k8s.io/yaml v1.1.0/go.mod h1:UJmg0vDUVViEyp3mgSv9WPwZCDxu4rQW1olrI1uml+o=
sigs.k8s.io/yaml v1.2.0 h1:kr/MCeFWJWTwyaHoR9c8EjH9OumOmoF9YGiZd7lFm/Q=
sigs.k8s.io/yaml v1.2.0/go.mod h1:yfXDCHCao9+ENCvLSE62v9VSji2MKu5jeNfTrofGhJc=
sourcegraph.com/sourcegraph/appdash v0.0.0-20190731080439-ebfcffb1b5c0/go.mod h1:hI742Nqp5OhwiqlzhgfbWU4mW4yO10fP+LoT9WOswdU=
+stathat.com/c/consistent v1.0.0 h1:ezyc51EGcRPJUxfHGSgJjWzJdj3NiMU9pNfLNGiXV0c=
+stathat.com/c/consistent v1.0.0/go.mod h1:QkzMWzcbB+yQBL2AttO6sgsQS/JSTapcDISJalmCDS0=
diff --git a/protocol/mq/doc.go b/protocol/mq/doc.go
new file mode 100644
index 000000000..206e27957
--- /dev/null
+++ b/protocol/mq/doc.go
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Package mq implements mq rpc protocol.
+package mq
diff --git a/protocol/mq/rocketmq_exporter.go b/protocol/mq/rocketmq_exporter.go
new file mode 100644
index 000000000..9904823ed
--- /dev/null
+++ b/protocol/mq/rocketmq_exporter.go
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package mq
+
+import (
+ "sync"
+)
+
+import (
+ tripleConstant "github.com/dubbogo/triple/pkg/common/constant"
+)
+
+import (
+ "dubbo.apache.org/dubbo-go/v3/common"
+ "dubbo.apache.org/dubbo-go/v3/common/constant"
+ "dubbo.apache.org/dubbo-go/v3/common/logger"
+ "dubbo.apache.org/dubbo-go/v3/protocol"
+)
+
+// RocketMQExporter is the exporter for services exposed through the rocketmq protocol.
+type RocketMQExporter struct {
+ protocol.BaseExporter // embedded base exporter; filled in by NewRocketMQExporter
+ serviceMap *sync.Map // Unexport deletes this service's interface-name entry from it
+}
+
+// NewRocketMQExporter builds a RocketMQExporter for the given service key and
+// invoker. key, invoker and exporterMap are handed to protocol.NewBaseExporter;
+// serviceMap is stored on the exporter and used by Unexport.
+func NewRocketMQExporter(key string, invoker protocol.Invoker, exporterMap *sync.Map, serviceMap *sync.Map) *RocketMQExporter {
+	base := protocol.NewBaseExporter(key, invoker, exporterMap)
+
+	exp := new(RocketMQExporter)
+	exp.BaseExporter = *base
+	exp.serviceMap = serviceMap
+	return exp
+}
+
+// Unexport withdraws the exported service. It unexports the embedded
+// BaseExporter, unregisters the service from common.ServiceMap (logging any
+// error instead of returning it) and removes the interface entry from the
+// exporter's serviceMap when one was supplied.
+func (exporter *RocketMQExporter) Unexport() {
+	url := exporter.GetInvoker().GetURL()
+	interfaceName := url.GetParam(constant.InterfaceKey, "")
+	exporter.BaseExporter.Unexport()
+
+	// NOTE(review): the service is unregistered under the triple protocol name;
+	// confirm this matches the protocol name it was registered with.
+	if err := common.ServiceMap.UnRegister(interfaceName, tripleConstant.TRIPLE, url.ServiceKey()); err != nil {
+		logger.Errorf("[RocketMQExporter.Unexport] error: %v", err)
+	}
+
+	// serviceMap is optional: the exporter may be constructed with a nil map,
+	// and calling Delete on a nil *sync.Map would panic.
+	if exporter.serviceMap != nil {
+		exporter.serviceMap.Delete(interfaceName)
+	}
+}
diff --git a/protocol/mq/rocketmq_invoker.go b/protocol/mq/rocketmq_invoker.go
new file mode 100644
index 000000000..b55430fec
--- /dev/null
+++ b/protocol/mq/rocketmq_invoker.go
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package mq
+
+import (
+ "context"
+ "reflect"
+ "sync"
+)
+
+import (
+ "dubbo.apache.org/dubbo-go/v3/common/constant"
+ "github.com/apache/rocketmq-client-go/v2"
+ "github.com/apache/rocketmq-client-go/v2/primitive"
+ "github.com/apache/rocketmq-client-go/v2/producer"
+ "github.com/dubbogo/grpc-go/metadata"
+ tripleConstant "github.com/dubbogo/triple/pkg/common/constant"
+)
+
+import (
+ "dubbo.apache.org/dubbo-go/v3/common"
+ "dubbo.apache.org/dubbo-go/v3/common/logger"
+ "dubbo.apache.org/dubbo-go/v3/protocol"
+)
+
+// attachmentKey lists the URL parameters that Invoke copies into the invocation attachments when non-empty (same as dubbo_invoker.go attachmentKey).
+var attachmentKey = []string{
+ constant.InterfaceKey, constant.GroupKey, constant.TokenKey, constant.TimeoutKey,
+ constant.VersionKey,
+}
+
+// RocketMQInvoker is an implementation of protocol.Invoker backed by a RocketMQ producer; a RocketMQInvoker refers to one service and ip.
+type RocketMQInvoker struct {
+ protocol.BaseInvoker
+
+ topic string // RocketMQ topic; NewRocketMQInvoker does not set it
+ producer rocketmq.Producer // producer created and started by NewRocketMQInvoker
+ quitOnce sync.Once // one-shot guard; presumably meant to make Destroy idempotent - TODO confirm
+ mtx sync.RWMutex // read-locked by Invoke while it checks producer
+}
+
+// NewRocketMQInvoker constructor
+func NewRocketMQInvoker(url *common.URL) (*RocketMQInvoker, error) {
+ // todo 从配置文件中获取
+ groupName := "please_rename_unique_group_name"
+ nameserverIpPort := []string{"127.0.0.1:9876"}
+ nameserverResolver := primitive.NewPassthroughResolver(nameserverIpPort)
+ retry := 2
+
+ rocketmqProducer, err := rocketmq.NewProducer(
+ producer.WithGroupName(groupName),
+ producer.WithNsResolver(nameserverResolver),
+ producer.WithRetry(retry),
+ )
+
+ if err != nil {
+ logger.Warnf("init rocketmq producer groupName:%s nameserver:%v error:%v", groupName, nameserverIpPort, err)
+ return nil, err
+ }
+
+ if err := rocketmqProducer.Start(); err != nil {
+ logger.Warnf("start rocketmq producer groupName:%s nameserver:%v error:%v", groupName, nameserverIpPort, err)
+ return nil, err
+ }
+
+ return &RocketMQInvoker{
+ BaseInvoker: *protocol.NewBaseInvoker(url),
+ producer: rocketmqProducer,
+ }, nil
+}
+
+// Invoke performs one call through this invoker and returns its result.
+//
+// It returns protocol.ErrDestroyedInvoker when the embedded BaseInvoker is not
+// available and protocol.ErrClientClosed when the producer is nil. Otherwise
+// it copies the non-empty URL parameters named in attachmentKey into the
+// invocation attachments, mirrors every string / []string attachment into
+// outgoing gRPC metadata on ctx, and dispatches the call with ctx as the first
+// argument followed by the invocation's parameter values. The returned
+// RPCResult carries the call error in Err, the call attachments in Attrs and
+// the invocation reply in Rest.
+func (invoker *RocketMQInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
+	var result protocol.RPCResult
+
+	// Fast path before taking the lock: an unavailable invoker must not be used.
+	if !invoker.BaseInvoker.IsAvailable() {
+		logger.Warnf("rocketmq invoker destroyed")
+		result.Err = protocol.ErrDestroyedInvoker
+		return &result
+	}
+
+	invoker.mtx.RLock()
+	defer invoker.mtx.RUnlock()
+	if invoker.producer == nil {
+		result.Err = protocol.ErrClientClosed
+		return &result
+	}
+
+	// Check again now that the read lock is held.
+	if !invoker.BaseInvoker.IsAvailable() {
+		logger.Warnf("rocketmq invoker destroyed")
+		result.Err = protocol.ErrDestroyedInvoker
+		return &result
+	}
+
+	for _, k := range attachmentKey {
+		if v := invoker.BaseInvoker.GetURL().GetParam(k, ""); len(v) > 0 {
+			invocation.SetAttachment(k, v)
+		}
+	}
+
+	// append interface id to ctx
+	gRPCMD := make(metadata.MD, 0)
+	for k, v := range invocation.Attachments() {
+		if str, ok := v.(string); ok {
+			gRPCMD.Set(k, str)
+			continue
+		}
+		if str, ok := v.([]string); ok {
+			gRPCMD.Set(k, str...)
+			continue
+		}
+		logger.Warnf("triple attachment value with key = %s is invalid, which should be string or []string", k)
+	}
+	ctx = metadata.NewOutgoingContext(ctx, gRPCMD)
+	ctx = context.WithValue(ctx, tripleConstant.InterfaceKey, invoker.BaseInvoker.GetURL().GetParam(constant.InterfaceKey, ""))
+
+	// Argument list for the call: ctx first, then the caller's parameters.
+	in := make([]reflect.Value, 0, 16)
+	in = append(in, reflect.ValueOf(ctx))
+	if len(invocation.ParameterValues()) > 0 {
+		in = append(in, invocation.ParameterValues()...)
+	}
+
+	methodName := invocation.MethodName()
+	// NOTE(review): pro is not declared on RocketMQInvoker in this file; the
+	// dispatch through the RocketMQ producer still has to be implemented.
+	triAttachmentWithErr := invoker.pro(methodName, in, invocation.Reply())
+	result.Err = triAttachmentWithErr.GetError()
+	result.Attrs = make(map[string]interface{})
+	for k, v := range triAttachmentWithErr.GetAttachments() {
+		result.Attrs[k] = v
+	}
+	result.Rest = invocation.Reply()
+	return &result
+}
+
+// IsAvailable reports whether the invoker can still be used: the producer must
+// be present and the embedded BaseInvoker must report itself available.
+func (invoker *RocketMQInvoker) IsAvailable() bool {
+	invoker.mtx.RLock()
+	defer invoker.mtx.RUnlock()
+
+	if invoker.producer == nil {
+		return false
+	}
+	return invoker.BaseInvoker.IsAvailable()
+}
+
+// Destroy calls BaseInvoker.Destroy and shuts down the RocketMQ producer so its
+// resources are released. Only the first call has any effect; a shutdown
+// error is logged because Destroy has no way to return it.
+func (invoker *RocketMQInvoker) Destroy() {
+	invoker.quitOnce.Do(func() {
+		invoker.BaseInvoker.Destroy()
+
+		// Detach the producer under the write lock so Invoke sees nil afterwards.
+		invoker.mtx.Lock()
+		p := invoker.producer
+		invoker.producer = nil
+		invoker.mtx.Unlock()
+
+		if p == nil {
+			return
+		}
+		if err := p.Shutdown(); err != nil {
+			logger.Warnf("shutdown rocketmq producer error:%v", err)
+		}
+	})
+}
diff --git a/protocol/mq/rocketmq_invoker_test.go b/protocol/mq/rocketmq_invoker_test.go
new file mode 100644
index 000000000..71893fd01
--- /dev/null
+++ b/protocol/mq/rocketmq_invoker_test.go
@@ -0,0 +1 @@
+package mq
diff --git a/protocol/mq/rocketmq_protocol.go b/protocol/mq/rocketmq_protocol.go
new file mode 100644
index 000000000..cc0fffba9
--- /dev/null
+++ b/protocol/mq/rocketmq_protocol.go
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package mq
+
+import (
+ "sync"
+)
+
+import (
+ "github.com/dubbogo/triple/pkg/triple"
+)
+
+import (
+ "dubbo.apache.org/dubbo-go/v3/common"
+ "dubbo.apache.org/dubbo-go/v3/common/extension"
+ "dubbo.apache.org/dubbo-go/v3/common/logger"
+ "dubbo.apache.org/dubbo-go/v3/protocol"
+ "dubbo.apache.org/dubbo-go/v3/protocol/invocation"
+ "dubbo.apache.org/dubbo-go/v3/remoting"
+ "dubbo.apache.org/dubbo-go/v3/remoting/getty"
+)
+
+// RocketMQ is the protocol name under which this package registers itself.
+const RocketMQ = "rocketmq"
+
+var (
+	// protocolOnce is a one-shot guard intended for the creation of rocketmqProtocol.
+	// Its zero value is ready to use, so it needs no explicit initialisation.
+	protocolOnce sync.Once
+
+	// rocketmqProtocol is the package-wide protocol instance returned by GetProtocol.
+	rocketmqProtocol *RocketMQProtocol
+)
+
+// init registers GetProtocol as the factory for the RocketMQ protocol name.
+func init() {
+	extension.SetProtocol(RocketMQ, GetProtocol)
+}
+
+// RocketMQProtocol implements the Protocol interface for the rocketmq protocol.
+type RocketMQProtocol struct {
+ protocol.BaseProtocol
+ serverLock sync.Mutex // held by openServer while it checks and fills serverMap
+
+ serviceMap *sync.Map // serviceMap is used to export multiple service by one server
+ serverMap map[string]*triple.TripleServer // serverMap stores all exported server, keyed by url.Location; NOTE(review): openServer stores the result of remoting.NewExchangeServer here - confirm the value type
+}
+
+// NewRocketMQProtocol creates a RocketMQProtocol whose service and server maps
+// are allocated up front, so openServer can store servers without writing to
+// a nil map.
+func NewRocketMQProtocol() *RocketMQProtocol {
+	return &RocketMQProtocol{
+		BaseProtocol: protocol.NewBaseProtocol(),
+		serviceMap:   &sync.Map{},
+		serverMap:    make(map[string]*triple.TripleServer),
+	}
+}
+
+// Export exposes the invoker's service through the rocketmq protocol. It
+// creates an exporter keyed by the URL's service key, records it in the
+// protocol's exporter map, opens the server for the URL and returns the
+// exporter.
+func (rp *RocketMQProtocol) Export(invoker protocol.Invoker) protocol.Exporter {
+	url := invoker.GetURL()
+	serviceKey := url.ServiceKey()
+	// Hand the protocol's own maps to the exporter instead of nil so the
+	// exporter works on the same maps the protocol uses.
+	exporter := NewRocketMQExporter(serviceKey, invoker, rp.ExporterMap(), rp.serviceMap)
+	rp.SetExporterMap(serviceKey, exporter)
+	logger.Infof("[rocketmq Protocol] Export service: %s", url.String())
+
+	// start server
+	rp.openServer(url)
+	return exporter
+}
+
+// Refer creates a RocketMQInvoker for url, records it on the protocol and
+// returns it. When the invoker cannot be created the error is logged and nil
+// is returned.
+func (rp *RocketMQProtocol) Refer(url *common.URL) protocol.Invoker {
+	inv, err := NewRocketMQInvoker(url)
+	if err == nil {
+		rp.SetInvokers(inv)
+		logger.Infof("[Rocketmq Protocol] Refer service: %s", url.String())
+		return inv
+	}
+
+	logger.Errorf("rocketmq protocol Refer url = %+v, with error = %s", url, err.Error())
+	return nil
+}
+
+// Destroy tears the protocol down: it destroys the embedded BaseProtocol and
+// then stops and forgets every server recorded in serverMap.
+func (rp *RocketMQProtocol) Destroy() {
+	logger.Infof("RocketMQ Protocol destroy.")
+
+	rp.BaseProtocol.Destroy()
+
+	// openServer fills serverMap while holding serverLock, so take the same
+	// lock here before removing entries.
+	rp.serverLock.Lock()
+	defer rp.serverLock.Unlock()
+
+	// stop server
+	for key, server := range rp.serverMap {
+		delete(rp.serverMap, key)
+		server.Stop()
+	}
+}
+
+// openServer starts a server for url.Location unless one is already recorded
+// in serverMap. It panics when no exporter has been stored for the URL's
+// service key. The whole check-and-create sequence runs under serverLock.
+func (rp *RocketMQProtocol) openServer(url *common.URL) {
+	rp.serverLock.Lock()
+	// Deferred so the lock is released even if the code below panics.
+	defer rp.serverLock.Unlock()
+
+	if _, ok := rp.serverMap[url.Location]; ok {
+		return
+	}
+
+	if _, ok := rp.ExporterMap().Load(url.ServiceKey()); !ok {
+		panic("[RocketMQProtocol]" + url.Key() + "is not existing")
+	}
+
+	// Writing to a nil map panics, so allocate the map on first use.
+	if rp.serverMap == nil {
+		rp.serverMap = make(map[string]*triple.TripleServer)
+	}
+
+	handler := func(invocation *invocation.RPCInvocation) protocol.RPCResult {
+		return doHandleRequest(invocation)
+	}
+	srv := remoting.NewExchangeServer(url, getty.NewServer(url, handler))
+	rp.serverMap[url.Location] = srv
+	srv.Start()
+}
+
+// GetProtocol returns the package-wide RocketMQProtocol, creating it on first
+// use. Creation goes through protocolOnce so concurrent callers all receive
+// the same instance.
+func GetProtocol() protocol.Protocol {
+	protocolOnce.Do(func() {
+		if rocketmqProtocol == nil {
+			rocketmqProtocol = NewRocketMQProtocol()
+		}
+	})
+	return rocketmqProtocol
+}
diff --git a/protocol/mq/rocketmq_protocol_test.go b/protocol/mq/rocketmq_protocol_test.go
new file mode 100644
index 000000000..71893fd01
--- /dev/null
+++ b/protocol/mq/rocketmq_protocol_test.go
@@ -0,0 +1 @@
+package mq