You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by ge...@apache.org on 2022/07/11 16:11:03 UTC

[dubbo-go] 01/01: feat: init rocketmq go client

This is an automated email from the ASF dual-hosted git repository.

georgehao pushed a commit to branch feat/remoting_rocketmq
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git

commit 56d3ff5888a5ae48dc7c4ef78dd64f562d1b4246
Author: haohongfan1 <ha...@jd.com>
AuthorDate: Tue Jul 12 00:10:35 2022 +0800

    feat: init rocketmq go client
---
 go.mod                                |   1 +
 go.sum                                |  15 ++++
 protocol/mq/doc.go                    |  19 +++++
 protocol/mq/rocketmq_exporter.go      |  58 +++++++++++++
 protocol/mq/rocketmq_invoker.go       | 155 ++++++++++++++++++++++++++++++++++
 protocol/mq/rocketmq_invoker_test.go  |   1 +
 protocol/mq/rocketmq_protocol.go      | 131 ++++++++++++++++++++++++++++
 protocol/mq/rocketmq_protocol_test.go |   1 +
 8 files changed, 381 insertions(+)

diff --git a/go.mod b/go.mod
index 3b21aa488..e44e4449a 100644
--- a/go.mod
+++ b/go.mod
@@ -10,6 +10,7 @@ require (
 	github.com/alibaba/sentinel-golang v1.0.4
 	github.com/apache/dubbo-getty v1.4.8
 	github.com/apache/dubbo-go-hessian2 v1.11.0
+	github.com/apache/rocketmq-client-go/v2 v2.1.0 // indirect
 	github.com/cespare/xxhash/v2 v2.1.2
 	github.com/cncf/udpa/go v0.0.0-20210930031921-04548b0d99d4
 	github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1
diff --git a/go.sum b/go.sum
index cbecb919a..0ca800529 100644
--- a/go.sum
+++ b/go.sum
@@ -73,6 +73,8 @@ github.com/apache/dubbo-go-hessian2 v1.9.1/go.mod h1:xQUjE7F8PX49nm80kChFvepA/Av
 github.com/apache/dubbo-go-hessian2 v1.9.3/go.mod h1:xQUjE7F8PX49nm80kChFvepA/AvqAZ0oh/UaB6+6pBE=
 github.com/apache/dubbo-go-hessian2 v1.11.0 h1:VTdT6NStuEqNmyT3AdSN2DLDBqhXvAAyAAAoh9hLavk=
 github.com/apache/dubbo-go-hessian2 v1.11.0/go.mod h1:7rEw9guWABQa6Aqb8HeZcsYPHsOS7XT1qtJvkmI6c5w=
+github.com/apache/rocketmq-client-go/v2 v2.1.0 h1:3eABKfxc1WmS2lLTTbKMe1gZfZV6u1Sx9orFnOfABV0=
+github.com/apache/rocketmq-client-go/v2 v2.1.0/go.mod h1:oEZKFDvS7sz/RWU0839+dQBupazyBV7WX5cP6nrio0Q=
 github.com/apache/thrift v0.12.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=
 github.com/apache/thrift v0.13.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=
 github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o=
@@ -192,6 +194,8 @@ github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFP
 github.com/edsrzf/mmap-go v1.0.0/go.mod h1:YO35OhQPt3KJa3ryjFM5Bs14WD66h8eGKpfaBNrHW5M=
 github.com/emicklei/go-restful/v3 v3.8.0 h1:eCZ8ulSerjdAiaNpF7GxXIE7ZCMo1moN1qX+S609eVw=
 github.com/emicklei/go-restful/v3 v3.8.0/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc=
+github.com/emirpasic/gods v1.12.0 h1:QAUIPSaCu4G+POclxeqb3F+WPpdKqFGlw36+yOzGlrg=
+github.com/emirpasic/gods v1.12.0/go.mod h1:YfzfFFoVP/catgzJb4IKIqXjX78Ha8FMSDh3ymbK86o=
 github.com/envoyproxy/go-control-plane v0.6.9/go.mod h1:SBwIajubJHhxtWwsL9s8ss4safvEdbitLhGGK48rN6g=
 github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
 github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
@@ -682,6 +686,7 @@ github.com/shirou/gopsutil/v3 v3.21.6 h1:vU7jrp1Ic/2sHB7w6UNs7MIkn7ebVtTb5D9j45o
 github.com/shirou/gopsutil/v3 v3.21.6/go.mod h1:JfVbDpIBLVzT8oKbvMg9P3wEIMDDpVn+LwHTKj0ST88=
 github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
 github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
+github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q=
 github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
 github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88=
 github.com/sirupsen/logrus v1.7.0 h1:ShrD1U9pZB12TX0cVy0DtePoCH97K8EtX+mg7ZARUtM=
@@ -689,6 +694,7 @@ github.com/sirupsen/logrus v1.7.0/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic
 github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d h1:zE9ykElWQ6/NYmHa3jpm/yHnI4xSofP+UP6SpjHcSeM=
 github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
 github.com/smartystreets/goconvey v0.0.0-20190330032615-68dc04aab96a/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
+github.com/smartystreets/goconvey v0.0.0-20190710185942-9d28bd7c0945/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
 github.com/smartystreets/goconvey v1.6.4 h1:fv0U8FUIMPNf1L9lnHLvLhgicrIVChEkdzIKYqbNC9s=
 github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
 github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM=
@@ -726,6 +732,12 @@ github.com/stretchr/testify v1.7.3 h1:dAm0YRdRQlWojc3CrCRgPBzG5f941d0zvAKu7qY4e+
 github.com/stretchr/testify v1.7.3/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
 github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=
 github.com/tebeka/strftime v0.1.3/go.mod h1:7wJm3dZlpr4l/oVK0t1HYIc4rMzQ2XJlOMIUJUJH6XQ=
+github.com/tidwall/gjson v1.2.1 h1:j0efZLrZUvNerEf6xqoi0NjWMK5YlLrR7Guo/dxY174=
+github.com/tidwall/gjson v1.2.1/go.mod h1:c/nTNbUr0E0OrXEhq1pwa8iEgc2DOt4ZZqAt1HtCkPA=
+github.com/tidwall/match v1.0.1 h1:PnKP62LPNxHKTwvHHZZzdOAOCtsJTjo6dZLCwpKm5xc=
+github.com/tidwall/match v1.0.1/go.mod h1:LujAq0jyVjBy028G1WhWfIzbpQfMO8bBZ6Tyb0+pL9E=
+github.com/tidwall/pretty v0.0.0-20190325153808-1166b9ac2b65 h1:rQ229MBgvW68s1/g6f1/63TgYwYxfF4E+bi/KC19P8g=
+github.com/tidwall/pretty v0.0.0-20190325153808-1166b9ac2b65/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
 github.com/tklauser/go-sysconf v0.3.6 h1:oc1sJWvKkmvIxhDHeKWvZS4f6AW+YcoguSfRF2/Hmo4=
 github.com/tklauser/go-sysconf v0.3.6/go.mod h1:MkWzOF4RMCshBAMXuhXJs64Rte09mITnppBXY/rYEFI=
 github.com/tklauser/numcpus v0.2.2 h1:oyhllyrScuYI6g+h/zUvNXNp1wy7x8qQy3t/piefldA=
@@ -793,6 +805,7 @@ go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqe
 go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
 go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
 go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
+go.uber.org/atomic v1.5.1/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
 go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
 go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
 go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE=
@@ -1247,3 +1260,5 @@ sigs.k8s.io/yaml v1.1.0/go.mod h1:UJmg0vDUVViEyp3mgSv9WPwZCDxu4rQW1olrI1uml+o=
 sigs.k8s.io/yaml v1.2.0 h1:kr/MCeFWJWTwyaHoR9c8EjH9OumOmoF9YGiZd7lFm/Q=
 sigs.k8s.io/yaml v1.2.0/go.mod h1:yfXDCHCao9+ENCvLSE62v9VSji2MKu5jeNfTrofGhJc=
 sourcegraph.com/sourcegraph/appdash v0.0.0-20190731080439-ebfcffb1b5c0/go.mod h1:hI742Nqp5OhwiqlzhgfbWU4mW4yO10fP+LoT9WOswdU=
+stathat.com/c/consistent v1.0.0 h1:ezyc51EGcRPJUxfHGSgJjWzJdj3NiMU9pNfLNGiXV0c=
+stathat.com/c/consistent v1.0.0/go.mod h1:QkzMWzcbB+yQBL2AttO6sgsQS/JSTapcDISJalmCDS0=
diff --git a/protocol/mq/doc.go b/protocol/mq/doc.go
new file mode 100644
index 000000000..206e27957
--- /dev/null
+++ b/protocol/mq/doc.go
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Package mq implements mq rpc protocol.
+package mq
diff --git a/protocol/mq/rocketmq_exporter.go b/protocol/mq/rocketmq_exporter.go
new file mode 100644
index 000000000..9904823ed
--- /dev/null
+++ b/protocol/mq/rocketmq_exporter.go
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package mq
+
+import (
+	"sync"
+)
+
+import (
+	tripleConstant "github.com/dubbogo/triple/pkg/common/constant"
+)
+
+import (
+	"dubbo.apache.org/dubbo-go/v3/common"
+	"dubbo.apache.org/dubbo-go/v3/common/constant"
+	"dubbo.apache.org/dubbo-go/v3/common/logger"
+	"dubbo.apache.org/dubbo-go/v3/protocol"
+)
+
+// RocketMQExporter is rocketmq service exporter
+type RocketMQExporter struct {
+	protocol.BaseExporter
+	serviceMap *sync.Map
+}
+
+func NewRocketMQExporter(key string, invoker protocol.Invoker, exporterMap *sync.Map, serviceMap *sync.Map) *RocketMQExporter {
+	return &RocketMQExporter{
+		BaseExporter: *protocol.NewBaseExporter(key, invoker, exporterMap),
+		serviceMap:   serviceMap,
+	}
+}
+
+func (exporter *RocketMQExporter) Unexport() {
+	url := exporter.GetInvoker().GetURL()
+	interfaceName := url.GetParam(constant.InterfaceKey, "")
+	exporter.BaseExporter.Unexport()
+
+	err := common.ServiceMap.UnRegister(interfaceName, tripleConstant.TRIPLE, url.ServiceKey())
+	if err != nil {
+		logger.Errorf("[RocketMQExporter.Unexport] error: %v", err)
+	}
+	exporter.serviceMap.Delete(interfaceName)
+}
diff --git a/protocol/mq/rocketmq_invoker.go b/protocol/mq/rocketmq_invoker.go
new file mode 100644
index 000000000..b55430fec
--- /dev/null
+++ b/protocol/mq/rocketmq_invoker.go
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package mq
+
+import (
+	"context"
+	"reflect"
+	"sync"
+)
+
+import (
+	"dubbo.apache.org/dubbo-go/v3/common/constant"
+	"github.com/apache/rocketmq-client-go/v2"
+	"github.com/apache/rocketmq-client-go/v2/primitive"
+	"github.com/apache/rocketmq-client-go/v2/producer"
+	"github.com/dubbogo/grpc-go/metadata"
+	tripleConstant "github.com/dubbogo/triple/pkg/common/constant"
+)
+
+import (
+	"dubbo.apache.org/dubbo-go/v3/common"
+	"dubbo.apache.org/dubbo-go/v3/common/logger"
+	"dubbo.apache.org/dubbo-go/v3/protocol"
+)
+
+// same as dubbo_invoker.go attachmentKey
+var attachmentKey = []string{
+	constant.InterfaceKey, constant.GroupKey, constant.TokenKey, constant.TimeoutKey,
+	constant.VersionKey,
+}
+
+// RocketMQInvoker is implement of protocol.Invoker, a RocketMQInvoker refer to one service and ip.
+type RocketMQInvoker struct {
+	protocol.BaseInvoker
+
+	topic    string
+	producer rocketmq.Producer
+	quitOnce sync.Once
+	mtx      sync.RWMutex
+}
+
+// NewRocketMQInvoker constructor
+func NewRocketMQInvoker(url *common.URL) (*RocketMQInvoker, error) {
+	// todo 从配置文件中获取
+	groupName := "please_rename_unique_group_name"
+	nameserverIpPort := []string{"127.0.0.1:9876"}
+	nameserverResolver := primitive.NewPassthroughResolver(nameserverIpPort)
+	retry := 2
+
+	rocketmqProducer, err := rocketmq.NewProducer(
+		producer.WithGroupName(groupName),
+		producer.WithNsResolver(nameserverResolver),
+		producer.WithRetry(retry),
+	)
+
+	if err != nil {
+		logger.Warnf("init rocketmq producer groupName:%s nameserver:%v error:%v", groupName, nameserverIpPort, err)
+		return nil, err
+	}
+
+	if err := rocketmqProducer.Start(); err != nil {
+		logger.Warnf("start rocketmq producer groupName:%s nameserver:%v error:%v", groupName, nameserverIpPort, err)
+		return nil, err
+	}
+
+	return &RocketMQInvoker{
+		BaseInvoker: *protocol.NewBaseInvoker(url),
+		producer:    rocketmqProducer,
+	}, nil
+}
+
+func (invoker *RocketMQInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
+	var result protocol.RPCResult
+
+	if invoker.BaseInvoker.IsAvailable() {
+		logger.Warnf("rocketmq invoker destroyed")
+		result.Err = protocol.ErrDestroyedInvoker
+		return &result
+	}
+
+	invoker.mtx.RLock()
+	defer invoker.mtx.RUnlock()
+	if invoker.producer == nil {
+		result.Err = protocol.ErrClientClosed
+		return &result
+	}
+
+	if invoker.BaseInvoker.IsAvailable() {
+		logger.Warnf("rocketmq invoker destroyed")
+		result.Err = protocol.ErrDestroyedInvoker
+		return &result
+	}
+
+	for _, k := range attachmentKey {
+		if v := di.GetURL().GetParam(k, ""); len(v) > 0 {
+			invocation.SetAttachment(k, v)
+		}
+	}
+
+	// append interface id to ctx
+	gRPCMD := make(metadata.MD, 0)
+	for k, v := range invocation.Attachments() {
+		if str, ok := v.(string); ok {
+			gRPCMD.Set(k, str)
+			continue
+		}
+		if str, ok := v.([]string); ok {
+			gRPCMD.Set(k, str...)
+			continue
+		}
+		logger.Warnf("triple attachment value with key = %s is invalid, which should be string or []string", k)
+	}
+	ctx = metadata.NewOutgoingContext(ctx, gRPCMD)
+	ctx = context.WithValue(ctx, tripleConstant.InterfaceKey, invoker.BaseInvoker.GetURL().GetParam(constant.InterfaceKey, ""))
+	in := make([]reflect.Value, 0, 16)
+	in = append(in, reflect.ValueOf(ctx))
+
+	if len(invocation.ParameterValues()) > 0 {
+		in = append(in, invocation.ParameterValues()...)
+	}
+
+	methodName := invocation.MethodName()
+	triAttachmentWithErr := invoker.pro(methodName, in, invocation.Reply())
+	result.Err = triAttachmentWithErr.GetError()
+	result.Attrs = make(map[string]interface{})
+	for k, v := range triAttachmentWithErr.GetAttachments() {
+		result.Attrs[k] = v
+	}
+	result.Rest = invocation.Reply()
+	return &result
+
+	return nil
+}
+
+func (invoker *RocketMQInvoker) IsAvailable() bool {
+	return false
+}
+
+func (invoker *RocketMQInvoker) Destroy() {
+}
diff --git a/protocol/mq/rocketmq_invoker_test.go b/protocol/mq/rocketmq_invoker_test.go
new file mode 100644
index 000000000..71893fd01
--- /dev/null
+++ b/protocol/mq/rocketmq_invoker_test.go
@@ -0,0 +1 @@
+package mq
diff --git a/protocol/mq/rocketmq_protocol.go b/protocol/mq/rocketmq_protocol.go
new file mode 100644
index 000000000..cc0fffba9
--- /dev/null
+++ b/protocol/mq/rocketmq_protocol.go
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package mq
+
+import (
+	"sync"
+)
+
+import (
+	"github.com/dubbogo/triple/pkg/triple"
+)
+
+import (
+	"dubbo.apache.org/dubbo-go/v3/common"
+	"dubbo.apache.org/dubbo-go/v3/common/extension"
+	"dubbo.apache.org/dubbo-go/v3/common/logger"
+	"dubbo.apache.org/dubbo-go/v3/protocol"
+	"dubbo.apache.org/dubbo-go/v3/protocol/invocation"
+	"dubbo.apache.org/dubbo-go/v3/remoting"
+	"dubbo.apache.org/dubbo-go/v3/remoting/getty"
+)
+
+const RocketMQ = "rocketmq"
+
+var protocolOnce sync.Once
+
+func init() {
+	extension.SetProtocol(RocketMQ, GetProtocol)
+	protocolOnce = sync.Once{}
+}
+
+var (
+	rocketmqProtocol *RocketMQProtocol
+)
+
+// RocketMQProtocol supports dubbo 3.0 protocol. It implements Protocol interface for dubbo protocol.
+type RocketMQProtocol struct {
+	protocol.BaseProtocol
+	serverLock sync.Mutex
+
+	serviceMap *sync.Map                       // serviceMap is used to export multiple service by one server
+	serverMap  map[string]*triple.TripleServer // serverMap stores all exported server
+}
+
+func NewRocketMQProtocol() *RocketMQProtocol {
+	return &RocketMQProtocol{
+		BaseProtocol: protocol.NewBaseProtocol(),
+	}
+}
+
+func (rp *RocketMQProtocol) Export(invoker protocol.Invoker) protocol.Exporter {
+	url := invoker.GetURL()
+	serviceKey := url.ServiceKey()
+	// todo
+	exporter := NewRocketMQExporter(serviceKey, invoker, nil, nil)
+	rp.SetExporterMap(serviceKey, exporter)
+	logger.Infof("[rocketmq Protocol] Export service: %s", url.String())
+
+	// start server
+	rp.openServer(url)
+	return exporter
+}
+
+func (rp *RocketMQProtocol) Refer(url *common.URL) protocol.Invoker {
+	invoker, err := NewRocketMQInvoker(url)
+	if err != nil {
+		logger.Errorf("rocketmq protocol Refer url = %+v, with error = %s", url, err.Error())
+		return nil
+	}
+	rp.SetInvokers(invoker)
+	logger.Infof("[Rocketmq Protocol] Refer service: %s", url.String())
+	return invoker
+}
+
+// Destroy rocketmq service.
+func (rp *RocketMQProtocol) Destroy() {
+	logger.Infof("RocketMQ Protocol destroy.")
+
+	rp.BaseProtocol.Destroy()
+
+	// stop server
+	for key, server := range rp.serverMap {
+		delete(rp.serverMap, key)
+		server.Stop()
+	}
+}
+
+func (rp *RocketMQProtocol) openServer(url *common.URL) {
+	_, ok := rp.serverMap[url.Location]
+	if !ok {
+		_, ok := rp.ExporterMap().Load(url.ServiceKey())
+		if !ok {
+			panic("[RocketMQProtocol]" + url.Key() + "is not existing")
+		}
+
+		rp.serverLock.Lock()
+		_, ok = rp.serverMap[url.Location]
+		if !ok {
+			handler := func(invocation *invocation.RPCInvocation) protocol.RPCResult {
+				return doHandleRequest(invocation)
+			}
+			srv := remoting.NewExchangeServer(url, getty.NewServer(url, handler))
+			rp.serverMap[url.Location] = srv
+			srv.Start()
+		}
+		rp.serverLock.Unlock()
+	}
+}
+
+// GetProtocol get a single rocketmq protocol.
+func GetProtocol() protocol.Protocol {
+	if rocketmqProtocol == nil {
+		rocketmqProtocol = NewRocketMQProtocol()
+	}
+	return rocketmqProtocol
+}
diff --git a/protocol/mq/rocketmq_protocol_test.go b/protocol/mq/rocketmq_protocol_test.go
new file mode 100644
index 000000000..71893fd01
--- /dev/null
+++ b/protocol/mq/rocketmq_protocol_test.go
@@ -0,0 +1 @@
+package mq