You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by ge...@apache.org on 2022/07/11 16:11:02 UTC

[dubbo-go] branch feat/remoting_rocketmq created (now 56d3ff588)

This is an automated email from the ASF dual-hosted git repository.

georgehao pushed a change to branch feat/remoting_rocketmq
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git


      at 56d3ff588 feat: init rocketmq go client

This branch includes the following new commits:

     new 56d3ff588 feat: init rocketmq go client

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[dubbo-go] 01/01: feat: init rocketmq go client

Posted by ge...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

georgehao pushed a commit to branch feat/remoting_rocketmq
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git

commit 56d3ff5888a5ae48dc7c4ef78dd64f562d1b4246
Author: haohongfan1 <ha...@jd.com>
AuthorDate: Tue Jul 12 00:10:35 2022 +0800

    feat: init rocketmq go client
---
 go.mod                                |   1 +
 go.sum                                |  15 ++++
 protocol/mq/doc.go                    |  19 +++++
 protocol/mq/rocketmq_exporter.go      |  58 +++++++++++++
 protocol/mq/rocketmq_invoker.go       | 155 ++++++++++++++++++++++++++++++++++
 protocol/mq/rocketmq_invoker_test.go  |   1 +
 protocol/mq/rocketmq_protocol.go      | 131 ++++++++++++++++++++++++++++
 protocol/mq/rocketmq_protocol_test.go |   1 +
 8 files changed, 381 insertions(+)

diff --git a/go.mod b/go.mod
index 3b21aa488..e44e4449a 100644
--- a/go.mod
+++ b/go.mod
@@ -10,6 +10,7 @@ require (
 	github.com/alibaba/sentinel-golang v1.0.4
 	github.com/apache/dubbo-getty v1.4.8
 	github.com/apache/dubbo-go-hessian2 v1.11.0
+	github.com/apache/rocketmq-client-go/v2 v2.1.0 // indirect
 	github.com/cespare/xxhash/v2 v2.1.2
 	github.com/cncf/udpa/go v0.0.0-20210930031921-04548b0d99d4
 	github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1
diff --git a/go.sum b/go.sum
index cbecb919a..0ca800529 100644
--- a/go.sum
+++ b/go.sum
@@ -73,6 +73,8 @@ github.com/apache/dubbo-go-hessian2 v1.9.1/go.mod h1:xQUjE7F8PX49nm80kChFvepA/Av
 github.com/apache/dubbo-go-hessian2 v1.9.3/go.mod h1:xQUjE7F8PX49nm80kChFvepA/AvqAZ0oh/UaB6+6pBE=
 github.com/apache/dubbo-go-hessian2 v1.11.0 h1:VTdT6NStuEqNmyT3AdSN2DLDBqhXvAAyAAAoh9hLavk=
 github.com/apache/dubbo-go-hessian2 v1.11.0/go.mod h1:7rEw9guWABQa6Aqb8HeZcsYPHsOS7XT1qtJvkmI6c5w=
+github.com/apache/rocketmq-client-go/v2 v2.1.0 h1:3eABKfxc1WmS2lLTTbKMe1gZfZV6u1Sx9orFnOfABV0=
+github.com/apache/rocketmq-client-go/v2 v2.1.0/go.mod h1:oEZKFDvS7sz/RWU0839+dQBupazyBV7WX5cP6nrio0Q=
 github.com/apache/thrift v0.12.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=
 github.com/apache/thrift v0.13.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=
 github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o=
@@ -192,6 +194,8 @@ github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFP
 github.com/edsrzf/mmap-go v1.0.0/go.mod h1:YO35OhQPt3KJa3ryjFM5Bs14WD66h8eGKpfaBNrHW5M=
 github.com/emicklei/go-restful/v3 v3.8.0 h1:eCZ8ulSerjdAiaNpF7GxXIE7ZCMo1moN1qX+S609eVw=
 github.com/emicklei/go-restful/v3 v3.8.0/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc=
+github.com/emirpasic/gods v1.12.0 h1:QAUIPSaCu4G+POclxeqb3F+WPpdKqFGlw36+yOzGlrg=
+github.com/emirpasic/gods v1.12.0/go.mod h1:YfzfFFoVP/catgzJb4IKIqXjX78Ha8FMSDh3ymbK86o=
 github.com/envoyproxy/go-control-plane v0.6.9/go.mod h1:SBwIajubJHhxtWwsL9s8ss4safvEdbitLhGGK48rN6g=
 github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
 github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
@@ -682,6 +686,7 @@ github.com/shirou/gopsutil/v3 v3.21.6 h1:vU7jrp1Ic/2sHB7w6UNs7MIkn7ebVtTb5D9j45o
 github.com/shirou/gopsutil/v3 v3.21.6/go.mod h1:JfVbDpIBLVzT8oKbvMg9P3wEIMDDpVn+LwHTKj0ST88=
 github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
 github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
+github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q=
 github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
 github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88=
 github.com/sirupsen/logrus v1.7.0 h1:ShrD1U9pZB12TX0cVy0DtePoCH97K8EtX+mg7ZARUtM=
@@ -689,6 +694,7 @@ github.com/sirupsen/logrus v1.7.0/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic
 github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d h1:zE9ykElWQ6/NYmHa3jpm/yHnI4xSofP+UP6SpjHcSeM=
 github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
 github.com/smartystreets/goconvey v0.0.0-20190330032615-68dc04aab96a/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
+github.com/smartystreets/goconvey v0.0.0-20190710185942-9d28bd7c0945/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
 github.com/smartystreets/goconvey v1.6.4 h1:fv0U8FUIMPNf1L9lnHLvLhgicrIVChEkdzIKYqbNC9s=
 github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
 github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM=
@@ -726,6 +732,12 @@ github.com/stretchr/testify v1.7.3 h1:dAm0YRdRQlWojc3CrCRgPBzG5f941d0zvAKu7qY4e+
 github.com/stretchr/testify v1.7.3/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
 github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=
 github.com/tebeka/strftime v0.1.3/go.mod h1:7wJm3dZlpr4l/oVK0t1HYIc4rMzQ2XJlOMIUJUJH6XQ=
+github.com/tidwall/gjson v1.2.1 h1:j0efZLrZUvNerEf6xqoi0NjWMK5YlLrR7Guo/dxY174=
+github.com/tidwall/gjson v1.2.1/go.mod h1:c/nTNbUr0E0OrXEhq1pwa8iEgc2DOt4ZZqAt1HtCkPA=
+github.com/tidwall/match v1.0.1 h1:PnKP62LPNxHKTwvHHZZzdOAOCtsJTjo6dZLCwpKm5xc=
+github.com/tidwall/match v1.0.1/go.mod h1:LujAq0jyVjBy028G1WhWfIzbpQfMO8bBZ6Tyb0+pL9E=
+github.com/tidwall/pretty v0.0.0-20190325153808-1166b9ac2b65 h1:rQ229MBgvW68s1/g6f1/63TgYwYxfF4E+bi/KC19P8g=
+github.com/tidwall/pretty v0.0.0-20190325153808-1166b9ac2b65/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
 github.com/tklauser/go-sysconf v0.3.6 h1:oc1sJWvKkmvIxhDHeKWvZS4f6AW+YcoguSfRF2/Hmo4=
 github.com/tklauser/go-sysconf v0.3.6/go.mod h1:MkWzOF4RMCshBAMXuhXJs64Rte09mITnppBXY/rYEFI=
 github.com/tklauser/numcpus v0.2.2 h1:oyhllyrScuYI6g+h/zUvNXNp1wy7x8qQy3t/piefldA=
@@ -793,6 +805,7 @@ go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqe
 go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
 go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
 go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
+go.uber.org/atomic v1.5.1/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
 go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
 go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
 go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE=
@@ -1247,3 +1260,5 @@ sigs.k8s.io/yaml v1.1.0/go.mod h1:UJmg0vDUVViEyp3mgSv9WPwZCDxu4rQW1olrI1uml+o=
 sigs.k8s.io/yaml v1.2.0 h1:kr/MCeFWJWTwyaHoR9c8EjH9OumOmoF9YGiZd7lFm/Q=
 sigs.k8s.io/yaml v1.2.0/go.mod h1:yfXDCHCao9+ENCvLSE62v9VSji2MKu5jeNfTrofGhJc=
 sourcegraph.com/sourcegraph/appdash v0.0.0-20190731080439-ebfcffb1b5c0/go.mod h1:hI742Nqp5OhwiqlzhgfbWU4mW4yO10fP+LoT9WOswdU=
+stathat.com/c/consistent v1.0.0 h1:ezyc51EGcRPJUxfHGSgJjWzJdj3NiMU9pNfLNGiXV0c=
+stathat.com/c/consistent v1.0.0/go.mod h1:QkzMWzcbB+yQBL2AttO6sgsQS/JSTapcDISJalmCDS0=
diff --git a/protocol/mq/doc.go b/protocol/mq/doc.go
new file mode 100644
index 000000000..206e27957
--- /dev/null
+++ b/protocol/mq/doc.go
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Package mq implements mq rpc protocol.
+package mq
diff --git a/protocol/mq/rocketmq_exporter.go b/protocol/mq/rocketmq_exporter.go
new file mode 100644
index 000000000..9904823ed
--- /dev/null
+++ b/protocol/mq/rocketmq_exporter.go
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package mq
+
+import (
+	"sync"
+)
+
+import (
+	tripleConstant "github.com/dubbogo/triple/pkg/common/constant"
+)
+
+import (
+	"dubbo.apache.org/dubbo-go/v3/common"
+	"dubbo.apache.org/dubbo-go/v3/common/constant"
+	"dubbo.apache.org/dubbo-go/v3/common/logger"
+	"dubbo.apache.org/dubbo-go/v3/protocol"
+)
+
+// RocketMQExporter is rocketmq service exporter
+type RocketMQExporter struct {
+	protocol.BaseExporter
+	serviceMap *sync.Map
+}
+
+func NewRocketMQExporter(key string, invoker protocol.Invoker, exporterMap *sync.Map, serviceMap *sync.Map) *RocketMQExporter {
+	return &RocketMQExporter{
+		BaseExporter: *protocol.NewBaseExporter(key, invoker, exporterMap),
+		serviceMap:   serviceMap,
+	}
+}
+
+func (exporter *RocketMQExporter) Unexport() {
+	url := exporter.GetInvoker().GetURL()
+	interfaceName := url.GetParam(constant.InterfaceKey, "")
+	exporter.BaseExporter.Unexport()
+
+	err := common.ServiceMap.UnRegister(interfaceName, tripleConstant.TRIPLE, url.ServiceKey())
+	if err != nil {
+		logger.Errorf("[RocketMQExporter.Unexport] error: %v", err)
+	}
+	exporter.serviceMap.Delete(interfaceName)
+}
diff --git a/protocol/mq/rocketmq_invoker.go b/protocol/mq/rocketmq_invoker.go
new file mode 100644
index 000000000..b55430fec
--- /dev/null
+++ b/protocol/mq/rocketmq_invoker.go
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package mq
+
+import (
+	"context"
+	"reflect"
+	"sync"
+)
+
+import (
+	"dubbo.apache.org/dubbo-go/v3/common/constant"
+	"github.com/apache/rocketmq-client-go/v2"
+	"github.com/apache/rocketmq-client-go/v2/primitive"
+	"github.com/apache/rocketmq-client-go/v2/producer"
+	"github.com/dubbogo/grpc-go/metadata"
+	tripleConstant "github.com/dubbogo/triple/pkg/common/constant"
+)
+
+import (
+	"dubbo.apache.org/dubbo-go/v3/common"
+	"dubbo.apache.org/dubbo-go/v3/common/logger"
+	"dubbo.apache.org/dubbo-go/v3/protocol"
+)
+
+// same as dubbo_invoker.go attachmentKey
+var attachmentKey = []string{
+	constant.InterfaceKey, constant.GroupKey, constant.TokenKey, constant.TimeoutKey,
+	constant.VersionKey,
+}
+
+// RocketMQInvoker is implement of protocol.Invoker, a RocketMQInvoker refer to one service and ip.
+type RocketMQInvoker struct {
+	protocol.BaseInvoker
+
+	topic    string
+	producer rocketmq.Producer
+	quitOnce sync.Once
+	mtx      sync.RWMutex
+}
+
+// NewRocketMQInvoker constructor
+func NewRocketMQInvoker(url *common.URL) (*RocketMQInvoker, error) {
+	// todo 从配置文件中获取
+	groupName := "please_rename_unique_group_name"
+	nameserverIpPort := []string{"127.0.0.1:9876"}
+	nameserverResolver := primitive.NewPassthroughResolver(nameserverIpPort)
+	retry := 2
+
+	rocketmqProducer, err := rocketmq.NewProducer(
+		producer.WithGroupName(groupName),
+		producer.WithNsResolver(nameserverResolver),
+		producer.WithRetry(retry),
+	)
+
+	if err != nil {
+		logger.Warnf("init rocketmq producer groupName:%s nameserver:%v error:%v", groupName, nameserverIpPort, err)
+		return nil, err
+	}
+
+	if err := rocketmqProducer.Start(); err != nil {
+		logger.Warnf("start rocketmq producer groupName:%s nameserver:%v error:%v", groupName, nameserverIpPort, err)
+		return nil, err
+	}
+
+	return &RocketMQInvoker{
+		BaseInvoker: *protocol.NewBaseInvoker(url),
+		producer:    rocketmqProducer,
+	}, nil
+}
+
+func (invoker *RocketMQInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
+	var result protocol.RPCResult
+
+	if invoker.BaseInvoker.IsAvailable() {
+		logger.Warnf("rocketmq invoker destroyed")
+		result.Err = protocol.ErrDestroyedInvoker
+		return &result
+	}
+
+	invoker.mtx.RLock()
+	defer invoker.mtx.RUnlock()
+	if invoker.producer == nil {
+		result.Err = protocol.ErrClientClosed
+		return &result
+	}
+
+	if invoker.BaseInvoker.IsAvailable() {
+		logger.Warnf("rocketmq invoker destroyed")
+		result.Err = protocol.ErrDestroyedInvoker
+		return &result
+	}
+
+	for _, k := range attachmentKey {
+		if v := di.GetURL().GetParam(k, ""); len(v) > 0 {
+			invocation.SetAttachment(k, v)
+		}
+	}
+
+	// append interface id to ctx
+	gRPCMD := make(metadata.MD, 0)
+	for k, v := range invocation.Attachments() {
+		if str, ok := v.(string); ok {
+			gRPCMD.Set(k, str)
+			continue
+		}
+		if str, ok := v.([]string); ok {
+			gRPCMD.Set(k, str...)
+			continue
+		}
+		logger.Warnf("triple attachment value with key = %s is invalid, which should be string or []string", k)
+	}
+	ctx = metadata.NewOutgoingContext(ctx, gRPCMD)
+	ctx = context.WithValue(ctx, tripleConstant.InterfaceKey, invoker.BaseInvoker.GetURL().GetParam(constant.InterfaceKey, ""))
+	in := make([]reflect.Value, 0, 16)
+	in = append(in, reflect.ValueOf(ctx))
+
+	if len(invocation.ParameterValues()) > 0 {
+		in = append(in, invocation.ParameterValues()...)
+	}
+
+	methodName := invocation.MethodName()
+	triAttachmentWithErr := invoker.pro(methodName, in, invocation.Reply())
+	result.Err = triAttachmentWithErr.GetError()
+	result.Attrs = make(map[string]interface{})
+	for k, v := range triAttachmentWithErr.GetAttachments() {
+		result.Attrs[k] = v
+	}
+	result.Rest = invocation.Reply()
+	return &result
+
+	return nil
+}
+
+func (invoker *RocketMQInvoker) IsAvailable() bool {
+	return false
+}
+
+func (invoker *RocketMQInvoker) Destroy() {
+}
diff --git a/protocol/mq/rocketmq_invoker_test.go b/protocol/mq/rocketmq_invoker_test.go
new file mode 100644
index 000000000..71893fd01
--- /dev/null
+++ b/protocol/mq/rocketmq_invoker_test.go
@@ -0,0 +1 @@
+package mq
diff --git a/protocol/mq/rocketmq_protocol.go b/protocol/mq/rocketmq_protocol.go
new file mode 100644
index 000000000..cc0fffba9
--- /dev/null
+++ b/protocol/mq/rocketmq_protocol.go
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package mq
+
+import (
+	"sync"
+)
+
+import (
+	"github.com/dubbogo/triple/pkg/triple"
+)
+
+import (
+	"dubbo.apache.org/dubbo-go/v3/common"
+	"dubbo.apache.org/dubbo-go/v3/common/extension"
+	"dubbo.apache.org/dubbo-go/v3/common/logger"
+	"dubbo.apache.org/dubbo-go/v3/protocol"
+	"dubbo.apache.org/dubbo-go/v3/protocol/invocation"
+	"dubbo.apache.org/dubbo-go/v3/remoting"
+	"dubbo.apache.org/dubbo-go/v3/remoting/getty"
+)
+
+const RocketMQ = "rocketmq"
+
+var protocolOnce sync.Once
+
+func init() {
+	extension.SetProtocol(RocketMQ, GetProtocol)
+	protocolOnce = sync.Once{}
+}
+
+var (
+	rocketmqProtocol *RocketMQProtocol
+)
+
+// RocketMQProtocol supports dubbo 3.0 protocol. It implements Protocol interface for dubbo protocol.
+type RocketMQProtocol struct {
+	protocol.BaseProtocol
+	serverLock sync.Mutex
+
+	serviceMap *sync.Map                       // serviceMap is used to export multiple service by one server
+	serverMap  map[string]*triple.TripleServer // serverMap stores all exported server
+}
+
+func NewRocketMQProtocol() *RocketMQProtocol {
+	return &RocketMQProtocol{
+		BaseProtocol: protocol.NewBaseProtocol(),
+	}
+}
+
+func (rp *RocketMQProtocol) Export(invoker protocol.Invoker) protocol.Exporter {
+	url := invoker.GetURL()
+	serviceKey := url.ServiceKey()
+	// todo
+	exporter := NewRocketMQExporter(serviceKey, invoker, nil, nil)
+	rp.SetExporterMap(serviceKey, exporter)
+	logger.Infof("[rocketmq Protocol] Export service: %s", url.String())
+
+	// start server
+	rp.openServer(url)
+	return exporter
+}
+
+func (rp *RocketMQProtocol) Refer(url *common.URL) protocol.Invoker {
+	invoker, err := NewRocketMQInvoker(url)
+	if err != nil {
+		logger.Errorf("rocketmq protocol Refer url = %+v, with error = %s", url, err.Error())
+		return nil
+	}
+	rp.SetInvokers(invoker)
+	logger.Infof("[Rocketmq Protocol] Refer service: %s", url.String())
+	return invoker
+}
+
+// Destroy rocketmq service.
+func (rp *RocketMQProtocol) Destroy() {
+	logger.Infof("RocketMQ Protocol destroy.")
+
+	rp.BaseProtocol.Destroy()
+
+	// stop server
+	for key, server := range rp.serverMap {
+		delete(rp.serverMap, key)
+		server.Stop()
+	}
+}
+
+func (rp *RocketMQProtocol) openServer(url *common.URL) {
+	_, ok := rp.serverMap[url.Location]
+	if !ok {
+		_, ok := rp.ExporterMap().Load(url.ServiceKey())
+		if !ok {
+			panic("[RocketMQProtocol]" + url.Key() + "is not existing")
+		}
+
+		rp.serverLock.Lock()
+		_, ok = rp.serverMap[url.Location]
+		if !ok {
+			handler := func(invocation *invocation.RPCInvocation) protocol.RPCResult {
+				return doHandleRequest(invocation)
+			}
+			srv := remoting.NewExchangeServer(url, getty.NewServer(url, handler))
+			rp.serverMap[url.Location] = srv
+			srv.Start()
+		}
+		rp.serverLock.Unlock()
+	}
+}
+
+// GetProtocol get a single rocketmq protocol.
+func GetProtocol() protocol.Protocol {
+	if rocketmqProtocol == nil {
+		rocketmqProtocol = NewRocketMQProtocol()
+	}
+	return rocketmqProtocol
+}
diff --git a/protocol/mq/rocketmq_protocol_test.go b/protocol/mq/rocketmq_protocol_test.go
new file mode 100644
index 000000000..71893fd01
--- /dev/null
+++ b/protocol/mq/rocketmq_protocol_test.go
@@ -0,0 +1 @@
+package mq