You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2017/05/07 03:58:58 UTC
[2/3] incubator-rocketmq-externals git commit: Go-Client remoting and
RocketMqClient common method implement,
closes apache/incubator-rocketmq-externals#17
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/model/response_code.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/response_code.go b/rocketmq-go/model/response_code.go
index 957c1e9..ed40a6d 100644
--- a/rocketmq-go/model/response_code.go
+++ b/rocketmq-go/model/response_code.go
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package model
const (
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/model/send_result.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/send_result.go b/rocketmq-go/model/send_result.go
index b1af4fb..4d3b31f 100644
--- a/rocketmq-go/model/send_result.go
+++ b/rocketmq-go/model/send_result.go
@@ -52,7 +52,7 @@ func NewSendResult(status SendStatus, msgID, offsetID string, queue *message.Mes
}
func EncoderSendResultToJson(obj interface{}) string {
- return nil // TODO
+ return "" // TODO
}
func DecoderSendResultFromJson(json string) *SendResult {
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/model/subscription_data.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/subscription_data.go b/rocketmq-go/model/subscription_data.go
new file mode 100644
index 0000000..ce5ae74
--- /dev/null
+++ b/rocketmq-go/model/subscription_data.go
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package model
+
+type SubscriptionData struct {
+ Topic string
+ SubString string
+ ClassFilterMode bool
+ TagsSet []string
+ CodeSet []string
+ SubVersion int64
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/model/topic_publishInfo.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/topic_publishInfo.go b/rocketmq-go/model/topic_publishInfo.go
index b2f711b..b5f9e37 100644
--- a/rocketmq-go/model/topic_publishInfo.go
+++ b/rocketmq-go/model/topic_publishInfo.go
@@ -18,59 +18,59 @@
package model
import (
- "github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/message"
+//"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/message"
)
-type TopicPublishInfo struct {
- orderTopic bool
- havaTopicRouteInfo bool
- messageQueueList []*message.MessageQueue
- topicRouteData *TopicRouteData
-}
-
-func (info *TopicPublishInfo) SetOrderTopic(b bool) {
- info.orderTopic = b
-}
-
-func (info *TopicPublishInfo) Ok() bool {
- return false
-}
-
-func (info *TopicPublishInfo) MessageQueueList() []*message.MessageQueue {
- return info.messageQueueList
-}
-
-func (info *TopicPublishInfo) HaveTopicRouteInfo() bool {
- return info.havaTopicRouteInfo
-}
-
-func (info *TopicPublishInfo) SetHaveTopicRouteInfo(b bool) {
- info.havaTopicRouteInfo = b
-}
-
-func (info *TopicPublishInfo) TopicRouteData() *TopicRouteData {
- return info.topicRouteData
-}
-
-func (info *TopicPublishInfo) SetTopicRouteData(routeDate *TopicRouteData) {
- info.topicRouteData = routeDate
-}
-
-func (info *TopicPublishInfo) SelectOneMessageQueue() *message.MessageQueue {
- return nil //TODO
-}
-
-func (info *TopicPublishInfo) selectOneMessageQueueWithBroker(brokerName string) *message.MessageQueue {
- if brokerName == "" {
- return info.SelectOneMessageQueue()
- }
- return nil //TODO
-}
-
-func (info *TopicPublishInfo) QueueIdByBroker(brokerName string) int {
- return nil //TODO
-}
-
-func (info *TopicPublishInfo) String() string {
- return nil
-}
+//type TopicPublishInfo struct {
+// orderTopic bool
+// havaTopicRouteInfo bool
+// messageQueueList []*message.MessageQueue
+// topicRouteData *TopicRouteData
+//}
+//
+//func (info *TopicPublishInfo) SetOrderTopic(b bool) {
+// info.orderTopic = b
+//}
+//
+//func (info *TopicPublishInfo) Ok() bool {
+// return false
+//}
+//
+//func (info *TopicPublishInfo) MessageQueueList() []*message.MessageQueue {
+// return info.messageQueueList
+//}
+//
+//func (info *TopicPublishInfo) HaveTopicRouteInfo() bool {
+// return info.havaTopicRouteInfo
+//}
+//
+//func (info *TopicPublishInfo) SetHaveTopicRouteInfo(b bool) {
+// info.havaTopicRouteInfo = b
+//}
+//
+//func (info *TopicPublishInfo) TopicRouteData() *TopicRouteData {
+// return info.topicRouteData
+//}
+//
+//func (info *TopicPublishInfo) SetTopicRouteData(routeDate *TopicRouteData) {
+// info.topicRouteData = routeDate
+//}
+//
+//func (info *TopicPublishInfo) SelectOneMessageQueue() *message.MessageQueue {
+// return nil //TODO
+//}
+//
+//func (info *TopicPublishInfo) selectOneMessageQueueWithBroker(brokerName string) *message.MessageQueue {
+// if brokerName == "" {
+// return info.SelectOneMessageQueue()
+// }
+// return nil //TODO
+//}
+//
+//func (info *TopicPublishInfo) QueueIdByBroker(brokerName string) int {
+// return 0 //TODO
+//}
+//
+//func (info *TopicPublishInfo) String() string {
+// return ""
+//}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/model/topic_publish_info.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/topic_publish_info.go b/rocketmq-go/model/topic_publish_info.go
new file mode 100644
index 0000000..14ec088
--- /dev/null
+++ b/rocketmq-go/model/topic_publish_info.go
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package model
+
+import (
+ "github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/constant"
+ "sync/atomic"
+)
+
+type TopicPublishInfo struct {
+ OrderTopic bool
+ HaveTopicRouterInfo bool
+ MessageQueueList []MessageQueue
+ TopicRouteDataInstance *TopicRouteData
+ topicQueueIndex int32
+}
+
+//private boolean orderTopic = false;
+//private boolean haveTopicRouterInfo = false;
+//private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();
+//private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex(0); // todo
+//private TopicRouteData topicRouteData;
+
+// JudgeTopicPublishInfoOk reports whether MessageQueueList holds at least one queue.
+func (self *TopicPublishInfo) JudgeTopicPublishInfoOk() (bIsTopicOk bool) {
+	return len(self.MessageQueueList) != 0
+}
+// FetchQueueIndex returns the next round-robin index into MessageQueueList, or 0 when it is empty.
+func (self *TopicPublishInfo) FetchQueueIndex() (index int) {
+	if qLen := len(self.MessageQueueList); qLen > 0 {
+		// Take the modulo on the unsigned value so the index stays non-negative after int32 wrap-around.
+		next := uint32(atomic.AddInt32(&self.topicQueueIndex, 1))
+		index = int(next % uint32(qLen))
+	}
+	return index
+}
+func BuildTopicSubscribeInfoFromRoteData(topic string, topicRouteData *TopicRouteData) (mqList []*MessageQueue) {
+ mqList = make([]*MessageQueue, 0)
+ for _, queueData := range topicRouteData.QueueDatas {
+ if !constant.ReadAble(queueData.Perm) {
+ continue
+ }
+ var i int32
+ for i = 0; i < queueData.ReadQueueNums; i++ {
+ mq := &MessageQueue{
+ Topic: topic,
+ BrokerName: queueData.BrokerName,
+ QueueId: i,
+ }
+ mqList = append(mqList, mq)
+ }
+ }
+ return
+}
+
+func BuildTopicPublishInfoFromTopicRoteData(topic string, topicRouteData *TopicRouteData) (topicPublishInfo *TopicPublishInfo) {
+ // all order topic is false todo change
+ topicPublishInfo = &TopicPublishInfo{
+ TopicRouteDataInstance: topicRouteData,
+ OrderTopic: false,
+ MessageQueueList: []MessageQueue{}}
+ for _, queueData := range topicRouteData.QueueDatas {
+ if !constant.WriteAble(queueData.Perm) {
+ continue
+ }
+ for _, brokerData := range topicRouteData.BrokerDatas {
+ if brokerData.BrokerName == queueData.BrokerName {
+ if len(brokerData.BrokerAddrs["0"]) == 0 {
+ break
+ }
+ var i int32
+ for i = 0; i < queueData.WriteQueueNums; i++ {
+ messageQueue := MessageQueue{Topic: topic, BrokerName: queueData.BrokerName, QueueId: i}
+ topicPublishInfo.MessageQueueList = append(topicPublishInfo.MessageQueueList, messageQueue)
+ topicPublishInfo.HaveTopicRouterInfo = true
+ }
+ break
+ }
+ }
+ }
+ return
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/model/topic_route_data.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/topic_route_data.go b/rocketmq-go/model/topic_route_data.go
index f387529..348479f 100644
--- a/rocketmq-go/model/topic_route_data.go
+++ b/rocketmq-go/model/topic_route_data.go
@@ -18,108 +18,128 @@
package model
import (
- "fmt"
- "github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/message"
+ //"fmt"
+ //"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/message"
+ "sync"
)
-type BrokerData struct {
-}
+//
+//type BrokerData struct {
+//}
+//
+//type TopicRouteData struct {
+// orderTopicConf string
+// queueDatas []*message.MessageQueue
+// brokerDatas []*BrokerData
+// filterServerTable map[string][]string
+//}
+//
+//func NewTopicRouteData() *TopicRouteData {
+// return &TopicRouteData{}
+//}
+//
+//func (route *TopicRouteData) CloneTopicRouteData() (clonedRouteData *TopicRouteData) {
+// clonedRouteData = &TopicRouteData{
+// route.orderTopicConf,
+// route.queueDatas,
+// route.brokerDatas,
+// route.filterServerTable,
+// }
+// // TODO: to complete
+// return
+//}
+//
+//func (route *TopicRouteData) QueueDatas() []*message.MessageQueue {
+// return route.queueDatas
+//}
+//
+//func (route *TopicRouteData) SetQueueDatas(data []*message.MessageQueue) {
+// route.queueDatas = data
+//}
+//
+//func (route *TopicRouteData) BrokerDatas() []*BrokerData {
+// return route.brokerDatas
+//}
+//
+//func (route *TopicRouteData) SetBrokerDatas(data []*BrokerData) {
+// route.brokerDatas = data
+//}
+//
+//func (route *TopicRouteData) FilterServerTable() map[string][]string {
+// return route.filterServerTable
+//}
+//
+//func (route *TopicRouteData) SetFilterServerTable(data map[string][]string) {
+// route.filterServerTable = data
+//}
+//
+//func (route *TopicRouteData) OrderTopicConf() string {
+// return route.orderTopicConf
+//}
+//
+//func (route *TopicRouteData) SetOrderTopicConf(s string) {
+// route.orderTopicConf = s
+//}
+//
+//func (route *TopicRouteData) HashCode() (result int) {
+// prime := 31
+// result = 1
+// result *= prime
+// // TODO
+//
+// return
+//}
+//
+//func (route *TopicRouteData) Equals(route1 interface{}) bool {
+// if route == nil {
+// return true
+// }
+// if route1 == nil {
+// return false
+// }
+// //value, ok := route1.(TopicRouteData)
+// //if !ok {
+// // return false
+// //}
+// // TODO
+// //if route.brokerDatas == nil && value.brokerDatas != nil || len(route.brokerDatas) != len(value.brokerDatas) {
+// // return false
+// //}
+// //
+// //if route.orderTopicConf == "" && value.orderTopicConf != "" || route.orderTopicConf != value.orderTopicConf {
+// // return false
+// //}
+// //
+// //if route.queueDatas == nil && value.queueDatas != nil || route.queueDatas != value.queueDatas {
+// // return false
+// //}
+// //
+// //if route.filterServerTable == nil && value.filterServerTable != nil ||
+// // route.filterServerTable != value.filterServerTable {
+// // return false
+// //}
+// return true
+//}
+//
+//func (route *TopicRouteData) String() string {
+// return fmt.Sprintf("TopicRouteData [orderTopicConf=%s, queueDatas=%s, brokerDatas=%s, filterServerTable=%s]",
+// route.orderTopicConf, route.queueDatas, route.brokerDatas, route.filterServerTable)
+//}
type TopicRouteData struct {
- orderTopicConf string
- queueDatas []*message.MessageQueue
- brokerDatas []*BrokerData
- filterServerTable map[string][]string
-}
-
-func NewTopicRouteData() *TopicRouteData {
- return &TopicRouteData{}
-}
-
-func (route *TopicRouteData) CloneTopicRouteData() (clonedRouteData *TopicRouteData) {
- clonedRouteData = &TopicRouteData{
- route.orderTopicConf,
- route.queueDatas,
- route.brokerDatas,
- route.filterServerTable,
- }
- // TODO: to complete
- return
-}
-
-func (route *TopicRouteData) QueueDatas() []*message.MessageQueue {
- return route.queueDatas
-}
-
-func (route *TopicRouteData) SetQueueDatas(data []*message.MessageQueue) {
- route.queueDatas = data
-}
-
-func (route *TopicRouteData) BrokerDatas() []*BrokerData {
- return route.brokerDatas
-}
-
-func (route *TopicRouteData) SetBrokerDatas(data []*BrokerData) {
- route.brokerDatas = data
+ OrderTopicConf string
+ QueueDatas []*QueueData
+ BrokerDatas []*BrokerData
}
-
-func (route *TopicRouteData) FilterServerTable() map[string][]string {
- return route.filterServerTable
-}
-
-func (route *TopicRouteData) SetFilterServerTable(data map[string][]string) {
- route.filterServerTable = data
-}
-
-func (route *TopicRouteData) OrderTopicConf() string {
- return route.orderTopicConf
-}
-
-func (route *TopicRouteData) SetOrderTopicConf(s string) {
- route.orderTopicConf = s
+type QueueData struct {
+ BrokerName string
+ ReadQueueNums int32
+ WriteQueueNums int32
+ Perm int32
+ TopicSynFlag int32
}
-
-func (route *TopicRouteData) HashCode() (result int) {
- prime := 31
- result = 1
- result *= prime
- // TODO
-
- return
-}
-
-func (route *TopicRouteData) Equals(route1 interface{}) bool {
- if route == nil {
- return true
- }
- if route1 == nil {
- return false
- }
- //value, ok := route1.(TopicRouteData)
- //if !ok {
- // return false
- //}
- // TODO
- //if route.brokerDatas == nil && value.brokerDatas != nil || len(route.brokerDatas) != len(value.brokerDatas) {
- // return false
- //}
- //
- //if route.orderTopicConf == "" && value.orderTopicConf != "" || route.orderTopicConf != value.orderTopicConf {
- // return false
- //}
- //
- //if route.queueDatas == nil && value.queueDatas != nil || route.queueDatas != value.queueDatas {
- // return false
- //}
- //
- //if route.filterServerTable == nil && value.filterServerTable != nil ||
- // route.filterServerTable != value.filterServerTable {
- // return false
- //}
- return true
-}
-
-func (route *TopicRouteData) String() string {
- return fmt.Sprintf("TopicRouteData [orderTopicConf=%s, queueDatas=%s, brokerDatas=%s, filterServerTable=%s]",
- route.orderTopicConf, route.queueDatas, route.brokerDatas, route.filterServerTable)
+type BrokerData struct {
+ BrokerName string
+ BrokerAddrs map[string]string
+ BrokerAddrsLock sync.RWMutex
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/mq_client_manager.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/mq_client_manager.go b/rocketmq-go/mq_client_manager.go
index c07dcfd..731158f 100644
--- a/rocketmq-go/mq_client_manager.go
+++ b/rocketmq-go/mq_client_manager.go
@@ -16,13 +16,67 @@
*/
package rocketmq
-import "github.com/apache/incubator-rocketmq-externals/rocketmq-go/service"
+import (
+ "github.com/apache/incubator-rocketmq-externals/rocketmq-go/service"
+ "sync"
+ "time"
+)
type MqClientManager struct {
clientFactory *ClientFactory
rocketMqClient service.RocketMqClient
pullMessageController *PullMessageController
defaultProducerService RocketMQProducer //for send back message
+
+ rocketMqManagerLock sync.Mutex
+ //ClientId string
+ BootTimestamp int64
+
+ NamesrvLock sync.Mutex
+ HeartBeatLock sync.Mutex
+ //rebalanceControllr *RebalanceController
+}
+
+type MqClientConfig struct {
+}
+
+func NewMqClientManager(clientConfig *MqClientConfig) (rocketMqManager *MqClientManager) {
+ rocketMqManager = &MqClientManager{}
+ rocketMqManager.BootTimestamp = time.Now().Unix()
+ rocketMqManager.clientFactory = clientFactoryInit()
+ //rocketMqManager.rocketMqClient =
+ //rocketMqManager.pullMessageController = NewPullMessageController(rocketMqManager.mqClient, rocketMqManager.clientFactory)
+ //rocketMqManager.cleanExpireMsgController = NewCleanExpireMsgController(rocketMqManager.mqClient, rocketMqManager.clientFactory)
+ //rocketMqManager.rebalanceControllr = NewRebalanceController(rocketMqManager.clientFactory)
+
+ return
+}
+
+func (self *MqClientManager) RegisterProducer(producer *DefaultMQProducer) {
+ return
+}
+
+func (self *MqClientManager) RegisterConsumer(consumer RocketMQConsumer) {
+ // todo check config
+ //if (self.defaultProducerService == nil) {
+ // self.defaultProducerService = service.NewDefaultProducerService(constant.CLIENT_INNER_PRODUCER_GROUP, mq_config.NewProducerConfig(), self.mqClient)
+ //}
+ return
+}
+
+func (self *MqClientManager) Start() {
+ //self.SendHeartbeatToAllBrokerWithLock()//we should send heartbeat first
+ self.startAllScheduledTask()
+}
+func (manager *MqClientManager) startAllScheduledTask() {
+
+}
+
+func clientFactoryInit() (clientFactory *ClientFactory) {
+ clientFactory = &ClientFactory{}
+ clientFactory.ProducerTable = make(map[string]RocketMQProducer)
+ clientFactory.ConsumerTable = make(map[string]RocketMQConsumer)
+ return
}
type ClientFactory struct {
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/mq_consumer.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/mq_consumer.go b/rocketmq-go/mq_consumer.go
index 7712ee1..7112537 100644
--- a/rocketmq-go/mq_consumer.go
+++ b/rocketmq-go/mq_consumer.go
@@ -16,7 +16,10 @@
*/
package rocketmq
-import "github.com/apache/incubator-rocketmq-externals/rocketmq-go/service"
+import (
+ "github.com/apache/incubator-rocketmq-externals/rocketmq-go/model"
+ "github.com/apache/incubator-rocketmq-externals/rocketmq-go/service"
+)
type RocketMQConsumer interface {
}
@@ -28,5 +31,44 @@ type DefaultMQPushConsumer struct {
mqClient service.RocketMqClient
rebalance *service.Rebalance //Rebalance's impl depend on offsetStore
consumeMessageService service.ConsumeMessageService
- ConsumerConfig *MqConsumerConfig
+ consumerConfig *MqConsumerConfig
+
+ consumerGroup string
+ //consumeFromWhere string
+ consumeType string
+ messageModel string
+ unitMode bool
+
+ subscription map[string]string //topic|subExpression
+ subscriptionTag map[string][]string // we use it filter again
+ // allocation strategy
+ pause bool //when reset offset we need pause
+}
+
+// NewDefaultMQPushConsumer returns a push consumer that records both consumerGroup and mqConsumerConfig.
+func NewDefaultMQPushConsumer(consumerGroup string, mqConsumerConfig *MqConsumerConfig) (defaultMQPushConsumer *DefaultMQPushConsumer) {
+	defaultMQPushConsumer = &DefaultMQPushConsumer{consumerGroup: consumerGroup, consumerConfig: mqConsumerConfig}
+	return defaultMQPushConsumer
+}
+
+func (self *DefaultMQPushConsumer) RegisterMessageListener(messageListener model.MessageListener) {
+ self.consumeMessageService = service.NewConsumeMessageConcurrentlyServiceImpl(messageListener)
+}
+func (self *DefaultMQPushConsumer) Subscribe(topic string, subExpression string) {
+ //self.subscription[topic] = subExpression
+ //if len(subExpression) == 0 || subExpression == "*" {
+ // return
+ //}
+ //tags := strings.Split(subExpression, "||")
+ //tagsList := []string{}
+ //for _, tag := range tags {
+ // t := strings.TrimSpace(tag)
+ // if len(t) == 0 {
+ // continue
+ // }
+ // tagsList = append(tagsList, t)
+ //}
+ //if len(tagsList) > 0 {
+ // self.subscriptionTag[topic] = tagsList
+ //}
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/mq_producer.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/mq_producer.go b/rocketmq-go/mq_producer.go
index 3677939..d1a011b 100644
--- a/rocketmq-go/mq_producer.go
+++ b/rocketmq-go/mq_producer.go
@@ -25,7 +25,7 @@ type MqProducerConfig struct {
}
type DefaultMQProducer struct {
- producerGroup string
- ProducerConfig *MqProducerConfig
- producerService service.ProducerService
+ producerGroup string
+ mqProducerConfig *MqProducerConfig
+ producerService service.ProducerService
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/remoting/custom_header.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/remoting/custom_header.go b/rocketmq-go/remoting/custom_header.go
index 40feade..04a46be 100644
--- a/rocketmq-go/remoting/custom_header.go
+++ b/rocketmq-go/remoting/custom_header.go
@@ -18,4 +18,5 @@ package remoting
type CustomerHeader interface {
FromMap(headerMap map[string]interface{})
+ //ToMap()(headerMap map[string]interface{})
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/remoting/event_executor.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/remoting/event_executor.go b/rocketmq-go/remoting/event_executor.go
deleted file mode 100644
index 38e1ee6..0000000
--- a/rocketmq-go/remoting/event_executor.go
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package remoting
-
-import (
- "fmt"
- "github.com/golang/glog"
- "net"
- "sync"
-)
-
-type Runnable interface {
- Run()
-}
-
-type NetEventType int
-
-const (
- Connect NetEventType = iota
- Close
- Idle
- Exception // TODO error?
-)
-
-type NetEvent struct {
- eType NetEventType
- remoteAddress string
- conn net.Conn
-}
-
-func NewEventType(eType NetEventType, remoteAddr string, conn net.Conn) *NetEvent {
- return &NetEvent{eType, remoteAddr, conn}
-}
-
-func (event *NetEvent) Type() NetEventType {
- return event.eType
-}
-
-func (event *NetEvent) RemoteAddress() string {
- return event.remoteAddress
-}
-
-func (event *NetEvent) Conn() net.Conn {
- return event.conn
-}
-
-func (event *NetEvent) String() string {
- return fmt.Sprintf("NettyEvent [type=%s, remoteAddr=%s, channel=%s]",
- event.eType, event.remoteAddress, event.conn)
-}
-
-type NetEventExecutor struct {
- hasNotified bool
- running bool
- stopped chan int
- mu sync.RWMutex // TODO need init?
- client *RemotingClient
-
- eventQueue chan *NetEvent
- maxSize int
-}
-
-func NewNetEventExecutor(client *RemotingClient) *NetEventExecutor {
- return &NetEventExecutor{
- hasNotified: false,
- running: false,
- stopped: make(chan int),
- client: client,
- eventQueue: make(chan *NetEvent, 100), // TODO confirm size
- maxSize: 10000,
- }
-}
-
-func (executor *NetEventExecutor) Start() {
- go executor.run()
-}
-
-func (executor *NetEventExecutor) Shutdown() {
- executor.stopped <- 0
-}
-
-func (executor *NetEventExecutor) PutEvent(event *NetEvent) {
- if len(executor.eventQueue) <= executor.maxSize {
- executor.eventQueue <- event //append(executor.eventQueue, event)
- } else {
- fmt.Sprintf("event queue size[%s] enough, so drop this event %s", len(executor.eventQueue), event.String())
- }
-}
-
-func (executor *NetEventExecutor) ServiceName() string {
- // TODO
- return nil
-}
-
-func (executor *NetEventExecutor) run() {
- glog.Infof("%s service started", executor.ServiceName())
-
- executor.mu.Lock()
- executor.running = true
- executor.mu.Unlock()
-
- listener := executor.client.ConnEventListener()
- for executor.running { // TODO optimize
- select {
- case event := <-executor.eventQueue:
- if event != nil && listener != nil {
- switch event.Type() {
- case Connect:
- listener.OnConnConnect(event.remoteAddress, event.Conn())
- case Close:
- listener.OnConnClose(event.remoteAddress, event.Conn())
- case Idle:
- listener.OnConnIdle(event.remoteAddress, event.Conn())
- case Exception:
- listener.OnConnException(event.remoteAddress, event.Conn())
- default:
- break
- }
- }
- case <-executor.stopped:
- executor.mu.Lock()
- executor.running = false
- executor.mu.Unlock()
- break
- }
- }
-
- glog.Infof("%s service exit.", executor.ServiceName())
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/remoting/json_serializable.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/remoting/json_serializable.go b/rocketmq-go/remoting/json_serializable.go
new file mode 100644
index 0000000..c2c5ea0
--- /dev/null
+++ b/rocketmq-go/remoting/json_serializable.go
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package remoting
+
+import (
+ "encoding/json"
+)
+
+type JsonSerializer struct {
+}
+
+// EncodeHeaderData marshals command to JSON; it returns nil if json.Marshal fails.
+func (self *JsonSerializer) EncodeHeaderData(command *RemotingCommand) []byte {
+	if encoded, err := json.Marshal(command); err == nil {
+		return encoded
+	}
+	return nil
+}
+// DecodeRemoteCommand unmarshals the JSON header into a fresh RemotingCommand and attaches
+// body to it unchanged; it returns nil when json.Unmarshal rejects header.
+func (self *JsonSerializer) DecodeRemoteCommand(header, body []byte) *RemotingCommand {
+	decoded := &RemotingCommand{ExtFields: make(map[string]interface{})}
+	if err := json.Unmarshal(header, decoded); err != nil {
+		return nil
+	}
+	decoded.Body = body
+	return decoded
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/remoting/remoting_client.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/remoting/remoting_client.go b/rocketmq-go/remoting/remoting_client.go
index 38685cb..206fdcf 100644
--- a/rocketmq-go/remoting/remoting_client.go
+++ b/rocketmq-go/remoting/remoting_client.go
@@ -14,305 +14,357 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package remoting
import (
+ "bytes"
+ "encoding/binary"
"errors"
- "fmt"
+ "github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/config"
+ "github.com/apache/incubator-rocketmq-externals/rocketmq-go/util"
"github.com/golang/glog"
"math/rand"
"net"
+ "strconv"
+ "strings"
"sync"
"time"
)
-type ConnEventListener interface {
- OnConnConnect(remoteAddress string, conn net.Conn)
- OnConnClose(remoteAddress string, conn net.Conn)
- OnConnIdle(remoteAddress string, conn net.Conn)
- OnConnException(remoteAddress string, conn net.Conn)
+// RemotingClient is the request API of the remoting layer.
+// In DefalutRemotingClient an empty addr selects a name server connection.
+type RemotingClient interface {
+ // InvokeSync sends request and waits for the matching response or for
+ // timeoutMillis milliseconds to elapse.
+ InvokeSync(addr string, request *RemotingCommand, timeoutMillis int64) (remotingCommand *RemotingCommand, err error)
+ // InvokeAsync sends request and returns; invokeCallback is called later
+ // with the ResponseFuture once a response has been received.
+ InvokeAsync(addr string, request *RemotingCommand, timeoutMillis int64, invokeCallback InvokeCallback) error
+ // InvokeOneWay sends request without registering anything to receive a response.
+ InvokeOneWay(addr string, request *RemotingCommand, timeoutMillis int64) error
}
-
-type NetRequestProcessor struct {
+// DefalutRemotingClient is the RemotingClient implementation in this file.
+// It keeps one TCP connection per address and tracks in-flight requests so
+// that responses read from a connection can be matched to their request.
+type DefalutRemotingClient struct {
+ clientId string // not assigned anywhere in the code shown in this file
+ clientConfig *config.ClientConfig
+
+ connTable map[string]net.Conn // address -> connection, guarded by connTableLock
+ connTableLock sync.RWMutex
+
+ responseTable util.ConcurrentMap // decimal string of the request opaque -> *ResponseFuture
+ processorTable util.ConcurrentMap // intended as requestCode -> ClientRequestProcessor; NOTE(review): never initialised or used in the code shown here
+ // Java counterpart, kept for reference:
+ // protected final HashMap<Integer/* request code */, Pair<NettyRequestProcessor, ExecutorService>> processorTable =
+ //new HashMap<Integer, Pair<NettyRequestProcessor, ExecutorService>>(64);
+ namesrvAddrList []string // name server addresses, split from the configured ";"-separated string
+ namesrvAddrSelectedAddr string // address of the name server currently in use, "" until one is selected
+ namesrvAddrSelectedIndex int // index of that address in namesrvAddrList, -1 until one is selected
+ namesvrLockRW sync.RWMutex // guards the name server selection in getNamesvrConn
+ clientRequestProcessor ClientRequestProcessor // handles requests received from the remote side
+ serializerHandler SerializerHandler // encodes and decodes RemotingCommand frames
}
-type NetConfig struct {
- clientWorkerNumber int
- clientCallbackExecutorNumber int
- clientOneWaySemaphoreValue int
- clientAsyncSemaphoreValue int
- connectTimeoutMillis time.Duration
- channelNotActiveInterval time.Duration
-
- clientChannelMaxIdleTimeSeconds time.Duration
- clientSocketSndBufSize int
- clientSocketRcvBufSize int
- clientPooledByteBufAllocatorEnable bool
- clientCloseSocketIfTimeout bool
+// RemotingClientInit builds a DefalutRemotingClient for clientConfig.
+// The name server list is the configured address string split on ";", no name
+// server is selected yet (index -1), and clientRequestProcessor is stored as
+// the handler for requests received from the remote side.
+func RemotingClientInit(clientConfig *config.ClientConfig, clientRequestProcessor ClientRequestProcessor) (client *DefalutRemotingClient) {
+	client = &DefalutRemotingClient{
+		connTable:                map[string]net.Conn{},
+		responseTable:            util.New(),
+		clientConfig:             clientConfig,
+		namesrvAddrList:          strings.Split(clientConfig.NameServerAddress(), ";"),
+		namesrvAddrSelectedIndex: -1,
+		clientRequestProcessor:   clientRequestProcessor,
+		serializerHandler:        NewSerializerHandler(),
+	}
+	return client
}
-type Pair struct {
- o1 *NetRequestProcessor
- o2 *ExecutorService
+// InvokeSync sends request to addr (an empty addr selects a name server) and
+// blocks until the matching response arrives or timeoutMillis milliseconds
+// have passed.
+//
+// It returns the response command on success. It returns an error when no
+// connection can be obtained, when the write fails, or when the wait times
+// out. The pending entry in responseTable is always removed before returning.
+func (self *DefalutRemotingClient) InvokeSync(addr string, request *RemotingCommand, timeoutMillis int64) (remotingCommand *RemotingCommand, err error) {
+	var conn net.Conn
+	conn, err = self.GetOrCreateConn(addr)
+	if err != nil {
+		// Without this check a nil conn would reach sendRequest and panic.
+		return nil, err
+	}
+	response := &ResponseFuture{
+		SendRequestOK:  false,
+		Opaque:         request.Opaque,
+		TimeoutMillis:  timeoutMillis,
+		BeginTimestamp: time.Now().Unix(),
+		// Buffered so that handlerResponse never blocks on a waiter that has
+		// already given up because of the timeout.
+		Done: make(chan bool, 1),
+	}
+	header := self.serializerHandler.EncodeHeader(request)
+	self.SetResponse(request.Opaque, response)
+	// Send failures and timeouts would otherwise leave the future in
+	// responseTable; removing an already-removed key is harmless.
+	defer self.removeResponse(request.Opaque)
+	if err = self.sendRequest(header, request.Body, conn, addr); err != nil {
+		glog.Error(err)
+		return nil, err
+	}
+	timer := time.NewTimer(time.Duration(timeoutMillis) * time.Millisecond)
+	defer timer.Stop()
+	select {
+	case <-response.Done:
+		return response.ResponseCommand, nil
+	case <-timer.C:
+		return nil, errors.New("invoke sync timeout:" + strconv.FormatInt(timeoutMillis, 10) + " Millisecond")
+	}
}
-
-type RemotingClient struct {
- semaphoreOneWay sync.Mutex // TODO right? use chan?
- semaphoreAsync sync.Mutex
- processorTable map[int]*Pair
- netEventExecutor *NetEventExecutor
- defaultRequestProcessor *Pair
-
- config NetConfig
- connTable map[string]net.Conn
- connTableLock sync.RWMutex
- timer *time.Timer
- namesrvAddrList []string
- namesrvAddrChoosed string
-
- callBackExecutor *ExecutorService
- listener ConnEventListener
- rpcHook RPCHook
- responseTable map[int32]*ResponseFuture
- responseTableLock sync.RWMutex
+// InvokeAsync sends request to addr (an empty addr selects a name server) and
+// returns as soon as the bytes are written. invokeCallback is called with the
+// ResponseFuture when the matching response is received.
+//
+// It returns an error when no connection can be obtained or the write fails.
+// On a failed write the pending future is unregistered, so the caller is not
+// notified a second time through invokeCallback.
+func (self *DefalutRemotingClient) InvokeAsync(addr string, request *RemotingCommand, timeoutMillis int64, invokeCallback InvokeCallback) error {
+	conn, err := self.GetOrCreateConn(addr)
+	if err != nil {
+		return err
+	}
+	response := &ResponseFuture{
+		SendRequestOK:  false,
+		Opaque:         request.Opaque,
+		TimeoutMillis:  timeoutMillis,
+		BeginTimestamp: time.Now().Unix(),
+		InvokeCallback: invokeCallback,
+	}
+	// Register before writing so a fast response always finds its future.
+	self.SetResponse(request.Opaque, response)
+	header := self.serializerHandler.EncodeHeader(request)
+	if err = self.sendRequest(header, request.Body, conn, addr); err != nil {
+		// The request never left; no response can ever match this future.
+		self.removeResponse(request.Opaque)
+		glog.Error(err)
+		return err
+	}
+	return nil
}
-
-type ConnHandlerContext struct {
+// InvokeOneWay writes request to addr (an empty addr selects a name server)
+// and registers nothing to receive a response. timeoutMillis is accepted but
+// not used by this implementation.
+func (self *DefalutRemotingClient) InvokeOneWay(addr string, request *RemotingCommand, timeoutMillis int64) error {
+	conn, connErr := self.GetOrCreateConn(addr)
+	if connErr != nil {
+		return connErr
+	}
+	encodedHeader := self.serializerHandler.EncodeHeader(request)
+	if sendErr := self.sendRequest(encodedHeader, request.Body, conn, addr); sendErr != nil {
+		glog.Error(sendErr)
+		return sendErr
+	}
+	return nil
}
-type ExecutorService struct {
- callBackChannel chan func()
- quit chan bool
+// sendRequest writes header followed by body to conn in a single Write call.
+// On a write error it logs the error and, when addr is non-empty, releases
+// the connection registered under addr before returning the error.
+func (self *DefalutRemotingClient) sendRequest(header, body []byte, conn net.Conn, addr string) error {
+	frame := append([]byte(nil), header...)
+	frame = append(frame, body...) // appending a nil or empty body is a no-op
+	if _, writeErr := conn.Write(frame); writeErr != nil {
+		glog.Error(writeErr)
+		if addr != "" {
+			self.ReleaseConn(addr, conn)
+		}
+		return writeErr
+	}
+	return nil
}
-
-func (exec *ExecutorService) submit(callback func()) {
- exec.callBackChannel <- callback
+// GetNamesrvAddrList returns the client's name server address slice itself,
+// not a copy.
+func (self *DefalutRemotingClient) GetNamesrvAddrList() []string {
+ return self.namesrvAddrList
}
-func (exec *ExecutorService) run() {
- go func() {
- glog.Info("Callback Executor routing start.")
- for {
- select {
- case invoke := <-exec.callBackChannel:
- invoke()
- case <-exec.quit:
- break
- }
- }
- glog.Info("Callback Executor routing quit.")
- }()
+// SetResponse registers response in responseTable under the decimal string
+// form of index (the request opaque).
+func (self *DefalutRemotingClient) SetResponse(index int32, response *ResponseFuture) {
+ self.responseTable.Set(strconv.Itoa(int(index)), response)
}
-
-func NewRemotingClient(cfg NetConfig) *RemotingClient {
- client := &RemotingClient{
- config: cfg,
- connTable: make(map[string]net.Conn),
- timer: time.NewTimer(10 * time.Second),
- responseTable: make(map[int32]*ResponseFuture),
+// getResponse looks up the ResponseFuture registered for index (the request
+// opaque). It returns an error when no entry exists.
+// NOTE(review): the error text mentions "conn" although the lookup is for a
+// response future.
+func (self *DefalutRemotingClient) getResponse(index int32) (response *ResponseFuture, err error) {
+ obj, ok := self.responseTable.Get(strconv.Itoa(int(index)))
+ if !ok {
+ err = errors.New("get conn from responseTable error")
+ return
}
- // java: super(xxxx)
- return client
+ // Unchecked assertion: panics if the stored value is not a *ResponseFuture.
+ response = obj.(*ResponseFuture)
+ return
}
-
-func (rc *RemotingClient) PutNetEvent(event *NetEvent) {
- rc.netEventExecutor.PutEvent(event)
+// removeResponse deletes the responseTable entry stored for index (the
+// request opaque).
+func (self *DefalutRemotingClient) removeResponse(index int32) {
+ self.responseTable.Remove(strconv.Itoa(int(index)))
}
-
-func (rc *RemotingClient) executeInvokeCallback(future *ResponseFuture) {
- executor := rc.CallbackExecutor()
- if executor != nil {
- executor.submit(func() {
- future.invokeCallback(future)
- })
+// GetOrCreateConn returns a connection for address. An empty address selects
+// a name server connection; otherwise the cached connection is returned when
+// one exists, and a new one is created when it does not.
+func (self *DefalutRemotingClient) GetOrCreateConn(address string) (conn net.Conn, err error) {
+ if len(address) == 0 {
+ conn, err = self.getNamesvrConn()
return
}
- future.executeInvokeCallback()
-}
-
-// check timeout future
-func (rc *RemotingClient) scanResponseTable() {
- rfMap := make(map[int]*ResponseFuture)
- for k, future := range rc.responseTable { // TODO safety?
- if int64(future.beginTimestamp)+int64(future.timeoutMillis)+1e9 <= time.Now().Unix() {
- future.Done()
- delete(rc.responseTable, k)
- rfMap[int(k)] = future
- glog.Warningf("remove timeout request, ", future.String())
- }
+ // Fast path: read-locked lookup of an existing connection.
+ conn = self.GetConn(address)
+ if conn != nil {
+ return
}
-
- go func() {
- for _, future := range rfMap {
- rc.executeInvokeCallback(future) // TODO if still failed, how to deal with the message ?
- }
- }()
+ conn, err = self.CreateConn(address)
+ return
}
-
-func (rc *RemotingClient) CallbackExecutor() *ExecutorService {
- return rc.callBackExecutor
-}
-
-func (rc *RemotingClient) RPCHook() RPCHook {
- return rc.rpcHook
+// GetConn returns the connection cached for address, or nil when there is
+// none. The lookup is done under the read lock of connTableLock.
+func (self *DefalutRemotingClient) GetConn(address string) (conn net.Conn) {
+	self.connTableLock.RLock()
+	defer self.connTableLock.RUnlock()
+	return self.connTable[address]
}
-
-func (rc *RemotingClient) ConnEventListener() ConnEventListener {
- return rc.listener
-}
-
-func (rc *RemotingClient) invokeSync(addr string, request *RemotingCommand,
- timeout time.Duration) (*RemotingCommand, error) {
- conn := rc.getAndCreateConn(addr)
+// CreateConn returns the connection cached for address or, when there is
+// none, dials a new one and caches it. The whole operation runs under the
+// write lock of connTableLock, so the table is re-checked after locking.
+func (self *DefalutRemotingClient) CreateConn(address string) (conn net.Conn, err error) {
+ defer self.connTableLock.Unlock()
+ self.connTableLock.Lock()
+ conn = self.connTable[address]
if conn != nil {
- if rc.rpcHook != nil {
- rc.rpcHook.DoBeforeRequest(addr, request)
- }
- opaque := request.opaque
- //defer delete(rc.responseTable, opaque) TODO should in listener
-
- future := &ResponseFuture{
- opaque: opaque,
- timeoutMillis: timeout,
- }
- rc.responseTable[opaque] = future
-
- conn.Write(request.encode()) // TODO register listener
-
- response := future.WaitResponse(timeout)
- if response == nil {
- if future.sendRequestOK {
- return nil, errors.New(fmt.Sprintf("RemotingTimeout error: %s", future.err.Error()))
- } else {
- return nil, errors.New(fmt.Sprintf("RemotingSend error: %s", future.err.Error()))
- }
- }
-
- if rc.rpcHook != nil {
- rc.rpcHook.DoBeforeResponse(addr, response)
- }
- return response, nil
- } else {
- rc.CloseConn(addr) // TODO
- return nil, errors.New(fmt.Sprintf("Connection to %s ERROR!", addr))
+ return
}
+ conn, err = self.createAndHandleTcpConn(address)
+ // NOTE(review): when the dial fails conn is nil and a nil entry is stored;
+ // GetConn and the check above treat a nil entry the same as a missing one.
+ self.connTable[address] = conn
+ return
}
-func (rc *RemotingClient) invokeAsync(addr string, request *RemotingCommand,
- timeout time.Duration, callback InvokeCallback) error {
- conn := rc.getAndCreateConn(addr)
- if conn != nil { // TODO how to confirm conn active?
- if rc.rpcHook != nil {
- rc.rpcHook.DoBeforeRequest(addr, request)
+// getNamesvrConn returns a connection to a name server. It reuses the
+// connection of the currently selected name server when one is cached.
+// Otherwise it takes the write lock, re-checks, and then tries the configured
+// addresses in round-robin order starting after the selected index (a random
+// index on first use). The first address that connects becomes the selection.
+// An error is returned when no address can be connected.
+func (self *DefalutRemotingClient) getNamesvrConn() (conn net.Conn, err error) {
+ self.namesvrLockRW.RLock()
+ address := self.namesrvAddrSelectedAddr
+ self.namesvrLockRW.RUnlock()
+ if len(address) != 0 {
+ conn = self.GetConn(address)
+ if conn != nil {
+ return
}
+ }
- opaque := request.opaque
- acquired := false // TODO semaphore.tryAcquire...
- if acquired {
- //final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync);
-
- future := NewResponseFuture(opaque, timeout, callback)
- rc.responseTable[opaque] = future
-
- future.WaitResponse(timeout) // TODO add listener
+ defer self.namesvrLockRW.Unlock()
+ self.namesvrLockRW.Lock()
+ // Re-check under the write lock: another goroutine may have connected
+ // while this one was waiting for the lock.
+ address = self.namesrvAddrSelectedAddr
+ if len(address) != 0 {
+ conn = self.GetConn(address)
+ if conn != nil {
+ return
}
- return nil
- } else {
- rc.CloseConn(addr) // TODO
- return errors.New(fmt.Sprintf("Connection to %s ERROR!", addr))
}
-}
-func (rc *RemotingClient) invokeOneWay(addr string, request *RemotingCommand,
- timeout time.Duration) error {
- conn := rc.getAndCreateConn(addr)
- if conn != nil {
- if rc.rpcHook != nil {
- rc.rpcHook.DoBeforeRequest(addr, request)
+ addressCount := len(self.namesrvAddrList)
+ if self.namesrvAddrSelectedIndex < 0 {
+ // First selection: start from a random position in the list.
+ self.namesrvAddrSelectedIndex = rand.Intn(addressCount)
+ }
+ // Visit every address once, beginning with the one after the current index.
+ for i := 1; i <= addressCount; i++ {
+ selectedIndex := (self.namesrvAddrSelectedIndex + i) % addressCount
+ selectAddress := self.namesrvAddrList[selectedIndex]
+ if len(selectAddress) == 0 {
+ continue
+ }
+ conn, err = self.CreateConn(selectAddress)
+ if err == nil {
+ self.namesrvAddrSelectedAddr = selectAddress
+ self.namesrvAddrSelectedIndex = selectedIndex
+ return
}
-
- request.MarkOneWayRpc()
- // TODO boolean acquired = this.semaphoreOneway.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
- return nil
- } else {
- rc.CloseConn(addr) // TODO
- return errors.New(fmt.Sprintf("Connection to %s ERROR!", addr))
}
+ err = errors.New("all namesvrAddress can't use!,address:" + self.clientConfig.NameServerAddress())
+ return
}
-
-func initValueIndex() int {
- r := rand.Int()
- if r < 0 { // math.Abs para is float64
- r = -r
+// createAndHandleTcpConn dials address over TCP and starts a goroutine that
+// reads and dispatches everything received on the new connection. A dial
+// error is logged and returned with a nil connection.
+func (self *DefalutRemotingClient) createAndHandleTcpConn(address string) (conn net.Conn, err error) {
+ conn, err = net.Dial("tcp", address)
+ if err != nil {
+ glog.Error(err)
+ return nil, err
}
- return r % 999 % 999
+ go self.handlerReceiveLoop(conn, address) // handle the connection: process the results returned on it
+ return
}
-
-func (rc *RemotingClient) Start() {
- // TODO
-}
-
-func (rc *RemotingClient) Shutdown() {
- // TODO
- rc.timer.Stop()
-}
-
-func (rc *RemotingClient) registerRPCHook(hk RPCHook) {
- rc.rpcHook = hk
-}
-
-func (rc *RemotingClient) CloseConn(addr string) {
- // TODO
+// ReleaseConn closes conn and removes the connTable entry for addr, but only
+// when that entry still refers to conn (or is nil). A failed write and the end
+// of the receive loop can both release the same connection; without the
+// identity check the later call could evict a newer connection that was
+// created for addr in between. A nil conn is tolerated.
+func (self *DefalutRemotingClient) ReleaseConn(addr string, conn net.Conn) {
+	if conn != nil {
+		conn.Close()
+	}
+	self.connTableLock.Lock()
+	defer self.connTableLock.Unlock()
+	if current, ok := self.connTable[addr]; ok && (current == nil || current == conn) {
+		delete(self.connTable, addr)
+	}
}
-func (rc *RemotingClient) updateNameServerAddressList(addrs []string) {
- old, update := rc.namesrvAddrList, false
-
- if addrs != nil && len(addrs) > 0 {
- if old == nil || len(addrs) != len(old) {
- update = true
- } else {
- for i := 0; i < len(addrs) && !update; i++ {
- if contains(old, addrs[i]) {
- update = true
+// handlerReceiveLoop reads from conn until a read or buffer error occurs,
+// splits the byte stream into frames and hands each frame to
+// handlerReceivedMessage on its own goroutine. When the loop ends the error is
+// logged and the connection is released.
+//
+// Frame layout as parsed below (all integers big-endian):
+//   4 bytes  total length of the rest of the frame
+//   4 bytes  header word: top byte = serializer type, low 24 bits = header length
+//   header   header length bytes
+//   body     total length - 4 - header length bytes
+func (self *DefalutRemotingClient) handlerReceiveLoop(conn net.Conn, addr string) (err error) {
+ defer func() {
+ // the loop only exits on error: log it and release the connection
+ glog.Error(err, addr)
+ self.ReleaseConn(addr, conn)
+ }()
+ b := make([]byte, 1024)
+ var length, headerLength, bodyLength int32
+ var buf = bytes.NewBuffer([]byte{})
+ var header, body []byte
+ var readTotalLengthFlag = true // true: next step is reading the length prefix; false: waiting for frame data
+ for {
+ var n int
+ n, err = conn.Read(b)
+ if err != nil {
+ return
+ }
+ _, err = buf.Write(b[:n])
+ if err != nil {
+ return
+ }
+ // Drain every complete frame currently held in buf.
+ for {
+ if readTotalLengthFlag {
+ // read the 4-byte total length once enough bytes are buffered
+ if buf.Len() >= 4 {
+ err = binary.Read(buf, binary.BigEndian, &length)
+ if err != nil {
+ return
+ }
+ readTotalLengthFlag = false // now wait for the frame data
+ } else {
+ break // length prefix not complete yet; wait for more bytes
+ }
+ }
+ if !readTotalLengthFlag {
+ if buf.Len() < int(length) {
+ // the frame is not fully received yet; go back to reading
+ break
}
}
+ // the whole frame is buffered; the next iteration starts with a length prefix again
+ readTotalLengthFlag = true
+
+ // decode the frame and dispatch it
+ // header length word
+ // NOTE(review): the errors assigned from here to the end of the frame
+ // are overwritten and never checked.
+ err = binary.Read(buf, binary.BigEndian, &headerLength)
+ var realHeaderLen = (headerLength & 0x00ffffff)
+ // the top byte of the header word carries the serializer type
+ var headerSerializableType = byte(headerLength >> 24)
+ header = make([]byte, realHeaderLen)
+ _, err = buf.Read(header)
+ // NOTE(review): a malformed frame can make bodyLength negative, in
+ // which case the make below panics.
+ bodyLength = length - 4 - realHeaderLen
+ body = make([]byte, int(bodyLength))
+ if bodyLength == 0 {
+ // no body
+ } else {
+ _, err = buf.Read(body)
+ }
+ go self.handlerReceivedMessage(conn, headerSerializableType, header, body)
}
}
-
- if update {
- rc.namesrvAddrList = addrs // TODO safe?
- }
-
}
-
-func (rc *RemotingClient) getAndCreateConn(addr string) net.Conn {
- return nil
-}
-
-func (rc *RemotingClient) getAndCreateNamesrvConn() net.Conn {
- return nil
-}
-
-func (rc *RemotingClient) createConn(addr string) net.Conn {
-
- return nil
+// handlerReceivedMessage decodes one received frame and dispatches it.
+// Responses are matched to their pending request by handlerResponse; anything
+// else is treated as a request from the remote side and processed on a new
+// goroutine by handlerRequest.
+//
+// A frame that cannot be decoded is logged and dropped. The JSON serializer in
+// this package returns nil for undecodable headers, which previously caused a
+// nil dereference here.
+func (self *DefalutRemotingClient) handlerReceivedMessage(conn net.Conn, headerSerializableType byte, headBytes []byte, bodyBytes []byte) {
+	cmd := self.serializerHandler.DecodeRemoteCommand(headerSerializableType, headBytes, bodyBytes)
+	if cmd == nil {
+		glog.Error("decode remoting command failed, frame dropped")
+		return
+	}
+	if cmd.IsResponseType() {
+		self.handlerResponse(cmd)
+		return
+	}
+	go self.handlerRequest(conn, cmd)
}
-
-func (rc *RemotingClient) RegisterProcessor(requestCode int, processor *NetRequestProcessor, executor ExecutorService) {
- // TODO
+// handlerRequest passes a request received from the remote side to the
+// client's request processor. When the processor returns a command, it is
+// marked as a response, given the request's opaque and written back on conn.
+// A nil result means nothing is sent. Write errors are only logged.
+func (self *DefalutRemotingClient) handlerRequest(conn net.Conn, cmd *RemotingCommand) {
+	reply := self.clientRequestProcessor(cmd)
+	if reply == nil {
+		return
+	}
+	reply.Opaque = cmd.Opaque
+	reply.MarkResponseType()
+	encodedHeader := self.serializerHandler.EncodeHeader(reply)
+	// The empty addr tells sendRequest not to release the connection on failure.
+	if sendErr := self.sendRequest(encodedHeader, reply.Body, conn, ""); sendErr != nil {
+		glog.Error(sendErr)
+	}
}
+// handlerResponse completes the pending request whose opaque matches cmd.
+// The entry is removed from responseTable, cmd is stored on the future, the
+// async callback (if any) is invoked and the sync waiter (if any) is signalled
+// through Done. A response with no pending entry is ignored.
+func (self *DefalutRemotingClient) handlerResponse(cmd *RemotingCommand) {
+ response, err := self.getResponse(cmd.Opaque)
+ self.removeResponse(cmd.Opaque)
+ if err != nil {
+ return
+ }
+ response.ResponseCommand = cmd
+ if response.InvokeCallback != nil {
+ response.InvokeCallback(response)
+ }
+
-func (rc *RemotingClient) String() string {
- return nil // TODO
+ // NOTE(review): this send blocks until someone receives from Done unless
+ // the channel was created with a buffer.
+ if response.Done != nil {
+ response.Done <- true
+ }
}
-func contains(s []string, o string) bool { // TODO optimize
- for _, v := range s {
- if o == v {
- return true
+// ClearExpireResponse removes every pending response whose BeginTimestamp
+// (Unix seconds) is at least 30 seconds old. For removed entries that have an
+// InvokeCallback, the callback is invoked with nil and a warning is logged.
+// Entries without a callback are removed silently; their Done channel is not
+// signalled.
+func (self *DefalutRemotingClient) ClearExpireResponse() {
+ for seq, responseObj := range self.responseTable.Items() {
+ response := responseObj.(*ResponseFuture)
+ if (response.BeginTimestamp + 30) <= time.Now().Unix() {
+ // expires 30 seconds after BeginTimestamp
+ self.responseTable.Remove(seq)
+ if response.InvokeCallback != nil {
+ response.InvokeCallback(nil)
+ glog.Warningf("remove time out request %v", response)
+ }
}
}
- return false
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/remoting/remoting_command.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/remoting/remoting_command.go b/rocketmq-go/remoting/remoting_command.go
index b57f1e9..a8361bd 100644
--- a/rocketmq-go/remoting/remoting_command.go
+++ b/rocketmq-go/remoting/remoting_command.go
@@ -14,168 +14,56 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package remoting
-// TODO: refactor
import (
- "bytes"
- "encoding/binary"
- "encoding/json"
- "github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/header"
- "log"
- "os"
- "strconv"
- "sync"
+ "github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/constant"
+ "github.com/apache/incubator-rocketmq-externals/rocketmq-go/util/structs"
"sync/atomic"
)
-func init() {
- // TODO
-}
-
-const (
- SerializeTypeProperty = "rocketmq.serialize.type"
- SerializeTypeEnv = "ROCKETMQ_SERIALIZE_TYPE"
- RemotingVersionKey = "rocketmq.remoting.version"
- rpcType = 0 // 0, request command
- rpcOneWay = 1 // 0, RPC
-)
-
-type RemotingCommandType int
+// opaque is the package-wide request counter; NewRemotingCommandWithBody
+// increments it atomically to give every command a distinct Opaque.
+var opaque int32
-const (
- ResponseCommand RemotingCommandType = iota
- RqeusetCommand
-)
+// RPC_TYPE and RPC_ONEWAY are not referenced in the code shown here.
+// NOTE(review): presumably bit positions within RemotingCommand.Flag (see the
+// commented-out "1 << RPC_TYPE" below) - confirm against the Java client.
+var RPC_TYPE int = 0 // bit 0: request/response
+var RPC_ONEWAY int = 1 // bit 1: one-way RPC
-var configVersion int = -1
-var requestId int32
-var decodeLock sync.Mutex
+// RESPONSE_TYPE is the Flag bit mask tested by IsResponseType and set by
+// MarkResponseType.
+//var RESPONSE_TYPE int= 1 << RPC_TYPE
+var RESPONSE_TYPE int = 1
+// RemotingCommand is one protocol message: the header fields plus an optional
+// body. The JSON tags define the header encoding used by JsonSerializer.
type RemotingCommand struct {
//header
- code int `json:"code"`
- language string `json:"language"`
- version int `json:"version"`
- opaque int32 `json:"opaque"`
- flag int `json:"flag"`
- remark string `json:"remark"`
- extFields map[string]string `json:"extFields"`
- header CustomerHeader // transient
- //body
- body []byte `json:"body,omitempty"`
+ Code int16 `json:"code"`
+ Language string `json:"language"` //int 8
+ Version int16 `json:"version"`
+ Opaque int32 `json:"opaque"` // request id; a response carries the opaque of its request
+ Flag int `json:"flag"` // bit flags, see RESPONSE_TYPE
+ Remark string `json:"remark"`
+ ExtFields map[string]interface{} `json:"extFields"` // holds the custom header fields (filled from structs.Map); the Java client uses the same key for its extFields
+ Body []byte `json:"body,omitempty"`
}
-func NewRemotingCommand(code int, header CustomerHeader) *RemotingCommand {
- cmd := &RemotingCommand{
- code: code,
- header: header,
- }
- setCmdVersion(cmd)
- return cmd
+// NewRemotingCommand builds a command with the given code and custom header
+// and no body. See NewRemotingCommandWithBody.
+func NewRemotingCommand(commandCode int16, customerHeader CustomerHeader) *RemotingCommand {
+ return NewRemotingCommandWithBody(commandCode, customerHeader, nil)
}
-func setCmdVersion(cmd *RemotingCommand) {
- if configVersion >= 0 {
- cmd.version = configVersion // safety
- } else if v := os.Getenv(RemotingVersionKey); v != "" {
- value, err := strconv.Atoi(v)
- if err != nil {
- // TODO log
- }
- cmd.version = value
- configVersion = value
+// NewRemotingCommandWithBody builds a command with the given code, custom
+// header and body. Opaque is taken from the atomically incremented package
+// counter; Flag, Language and Version come from the constant package. When
+// customerHeader is non-nil it is converted with structs.Map and stored in
+// ExtFields; otherwise ExtFields stays nil.
+func NewRemotingCommandWithBody(commandCode int16, customerHeader CustomerHeader, body []byte) *RemotingCommand {
+ remotingCommand := new(RemotingCommand)
+ remotingCommand.Code = commandCode
+ currOpaque := atomic.AddInt32(&opaque, 1)
+ remotingCommand.Opaque = currOpaque
+ remotingCommand.Flag = constant.REMOTING_COMMAND_FLAG
+ remotingCommand.Language = constant.REMOTING_COMMAND_LANGUAGE
+ remotingCommand.Version = constant.REMOTING_COMMAND_VERSION
+ if customerHeader != nil {
+ remotingCommand.ExtFields = structs.Map(customerHeader)
}
+ remotingCommand.Body = body
+ return remotingCommand
}
-func (cmd *RemotingCommand) encodeHeader() []byte {
- length := 4
- headerData := cmd.buildHeader()
- length += len(headerData)
-
- if cmd.body != nil {
- length += len(cmd.body)
- }
-
- buf := bytes.NewBuffer([]byte{})
- binary.Write(buf, binary.BigEndian, length)
- binary.Write(buf, binary.BigEndian, len(cmd.body))
- buf.Write(headerData)
-
- return buf.Bytes()
+// IsResponseType reports whether the RESPONSE_TYPE bit is set in Flag.
+func (self *RemotingCommand) IsResponseType() bool {
+ return self.Flag&(RESPONSE_TYPE) == RESPONSE_TYPE
}
-
-func (cmd *RemotingCommand) buildHeader() []byte {
- buf, err := json.Marshal(cmd)
- if err != nil {
- return nil
- }
- return buf
-}
-
-func (cmd *RemotingCommand) encode() []byte {
- length := 4
-
- headerData := cmd.buildHeader()
- length += len(headerData)
-
- if cmd.body != nil {
- length += len(cmd.body)
- }
-
- buf := bytes.NewBuffer([]byte{})
- binary.Write(buf, binary.LittleEndian, length)
- binary.Write(buf, binary.LittleEndian, len(cmd.body))
- buf.Write(headerData)
-
- if cmd.body != nil {
- buf.Write(cmd.body)
- }
-
- return buf.Bytes()
-}
-
-func decodeRemoteCommand(header, body []byte) *RemotingCommand {
- decodeLock.Lock()
- defer decodeLock.Unlock()
-
- cmd := &RemotingCommand{}
- cmd.extFields = make(map[string]string)
- err := json.Unmarshal(header, cmd)
- if err != nil {
- log.Print(err)
- return nil
- }
- cmd.body = body
- return cmd
-}
-
-func CreateRemotingCommand(code int, requestHeader *header.SendMessageRequestHeader) *RemotingCommand {
- cmd := &RemotingCommand{}
- cmd.code = code
- cmd.header = requestHeader
- cmd.version = 1
- cmd.opaque = atomic.AddInt32(&requestId, 1) // TODO: safety?
- return cmd
-}
-
-func (cmd *RemotingCommand) SetBody(body []byte) {
- cmd.body = body
-}
-
-func (cmd *RemotingCommand) Type() RemotingCommandType {
- bits := 1 << rpcType
- if (cmd.flag & bits) == bits {
- return ResponseCommand
- }
- return RqeusetCommand
-}
-
-func (cmd *RemotingCommand) MarkOneWayRpc() {
- cmd.flag |= (1 << rpcOneWay)
-}
-
-func (cmd *RemotingCommand) String() string {
- return nil // TODO
+// MarkResponseType sets the RESPONSE_TYPE bit in Flag, leaving other bits
+// unchanged.
+func (self *RemotingCommand) MarkResponseType() {
+ self.Flag = (self.Flag | RESPONSE_TYPE)
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/remoting/request_code.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/remoting/request_code.go b/rocketmq-go/remoting/request_code.go
new file mode 100644
index 0000000..52965d5
--- /dev/null
+++ b/rocketmq-go/remoting/request_code.go
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package remoting
+
+// Request codes of the remoting protocol.
+// NOTE(review): presumably the values placed in RemotingCommand.Code of
+// requests, mirroring the Java client's RequestCode - confirm.
+// The values are not strictly ascending (GET_TOPICS_BY_CLUSTER = 224 follows 300).
+const (
+ SEND_MESSAGE = 10
+ PULL_MESSAGE = 11
+ QUERY_MESSAGE = 12
+ QUERY_BROKER_OFFSET = 13
+ QUERY_CONSUMER_OFFSET = 14
+ UPDATE_CONSUMER_OFFSET = 15
+ UPDATE_AND_CREATE_TOPIC = 17
+ GET_ALL_TOPIC_CONFIG = 21
+ GET_TOPIC_CONFIG_LIST = 22
+ GET_TOPIC_NAME_LIST = 23
+ UPDATE_BROKER_CONFIG = 25
+ GET_BROKER_CONFIG = 26
+ TRIGGER_DELETE_FILES = 27
+ GET_BROKER_RUNTIME_INFO = 28
+ SEARCH_OFFSET_BY_TIMESTAMP = 29
+ GET_MAX_OFFSET = 30
+ GET_MIN_OFFSET = 31
+ GET_EARLIEST_MSG_STORETIME = 32
+ VIEW_MESSAGE_BY_ID = 33
+ HEART_BEAT = 34
+ UNREGISTER_CLIENT = 35
+ CONSUMER_SEND_MSG_BACK = 36
+ END_TRANSACTION = 37
+ GET_CONSUMER_LIST_BY_GROUP = 38
+ CHECK_TRANSACTION_STATE = 39
+ NOTIFY_CONSUMER_IDS_CHANGED = 40
+ LOCK_BATCH_MQ = 41
+ UNLOCK_BATCH_MQ = 42
+ GET_ALL_CONSUMER_OFFSET = 43
+ GET_ALL_DELAY_OFFSET = 45
+ PUT_KV_CONFIG = 100
+ GET_KV_CONFIG = 101
+ DELETE_KV_CONFIG = 102
+ REGISTER_BROKER = 103
+ UNREGISTER_BROKER = 104
+ GET_ROUTEINTO_BY_TOPIC = 105
+ GET_BROKER_CLUSTER_INFO = 106
+ UPDATE_AND_CREATE_SUBSCRIPTIONGROUP = 200
+ GET_ALL_SUBSCRIPTIONGROUP_CONFIG = 201
+ GET_TOPIC_STATS_INFO = 202
+ GET_CONSUMER_CONNECTION_LIST = 203
+ GET_PRODUCER_CONNECTION_LIST = 204
+ WIPE_WRITE_PERM_OF_BROKER = 205
+
+ GET_ALL_TOPIC_LIST_FROM_NAMESERVER = 206
+ DELETE_SUBSCRIPTIONGROUP = 207
+ GET_CONSUME_STATS = 208
+ SUSPEND_CONSUMER = 209
+ RESUME_CONSUMER = 210
+ RESET_CONSUMER_OFFSET_IN_CONSUMER = 211
+ RESET_CONSUMER_OFFSET_IN_BROKER = 212
+ ADJUST_CONSUMER_THREAD_POOL = 213
+ WHO_CONSUME_THE_MESSAGE = 214
+
+ DELETE_TOPIC_IN_BROKER = 215
+ DELETE_TOPIC_IN_NAMESRV = 216
+ GET_KV_CONFIG_BY_VALUE = 217
+ DELETE_KV_CONFIG_BY_VALUE = 218
+ GET_KVLIST_BY_NAMESPACE = 219
+
+ RESET_CONSUMER_CLIENT_OFFSET = 220
+ GET_CONSUMER_STATUS_FROM_CLIENT = 221
+ INVOKE_BROKER_TO_RESET_OFFSET = 222
+ INVOKE_BROKER_TO_GET_CONSUMER_STATUS = 223
+
+ QUERY_TOPIC_CONSUME_BY_WHO = 300
+
+ GET_TOPICS_BY_CLUSTER = 224
+
+ REGISTER_FILTER_SERVER = 301
+ REGISTER_MESSAGE_FILTER_CLASS = 302
+ QUERY_CONSUME_TIME_SPAN = 303
+ GET_SYSTEM_TOPIC_LIST_FROM_NS = 304
+ GET_SYSTEM_TOPIC_LIST_FROM_BROKER = 305
+
+ CLEAN_EXPIRED_CONSUMEQUEUE = 306
+
+ GET_CONSUMER_RUNNING_INFO = 307
+
+ QUERY_CORRECTION_OFFSET = 308
+
+ CONSUME_MESSAGE_DIRECTLY = 309
+
+ SEND_MESSAGE_V2 = 310
+
+ GET_UNIT_TOPIC_LIST = 311
+ GET_HAS_UNIT_SUB_TOPIC_LIST = 312
+ GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST = 313
+ CLONE_GROUP_OFFSET = 314
+
+ VIEW_BROKER_STATS_DATA = 315
+)
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/remoting/request_processor.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/remoting/request_processor.go b/rocketmq-go/remoting/request_processor.go
new file mode 100644
index 0000000..eec8cd8
--- /dev/null
+++ b/rocketmq-go/remoting/request_processor.go
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package remoting
+
+// ClientRequestProcessor handles a request received from the remote side and
+// returns the command to send back. Returning nil means no reply is sent.
+type ClientRequestProcessor func(remotingCommand *RemotingCommand) (responseCommand *RemotingCommand)
+
+// NOTE(review): presumably the request codes a processor is expected to
+// handle - confirm:
+//CHECK_TRANSACTION_STATE
+//NOTIFY_CONSUMER_IDS_CHANGED
+//RESET_CONSUMER_CLIENT_OFFSET
+//GET_CONSUMER_STATUS_FROM_CLIENT
+//GET_CONSUMER_RUNNING_INFO
+//CONSUME_MESSAGE_DIRECTLY
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/remoting/response_code.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/remoting/response_code.go b/rocketmq-go/remoting/response_code.go
new file mode 100644
index 0000000..6a49c77
--- /dev/null
+++ b/rocketmq-go/remoting/response_code.go
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package remoting
+
+// Response codes of the remoting protocol.
+// NOTE(review): presumably the values carried in RemotingCommand.Code of
+// responses, mirroring the Java client's ResponseCode - confirm.
+const (
+ SUCCESS = 0
+ SYSTEM_ERROR = 1
+ SYSTEM_BUSY = 2
+ REQUEST_CODE_NOT_SUPPORTED = 3
+ TRANSACTION_FAILED = 4
+ FLUSH_DISK_TIMEOUT = 10
+ SLAVE_NOT_AVAILABLE = 11
+ FLUSH_SLAVE_TIMEOUT = 12
+ MESSAGE_ILLEGAL = 13
+ SERVICE_NOT_AVAILABLE = 14
+ VERSION_NOT_SUPPORTED = 15
+ NO_PERMISSION = 16
+ TOPIC_NOT_EXIST = 17
+ TOPIC_EXIST_ALREADY = 18
+ PULL_NOT_FOUND = 19
+ PULL_RETRY_IMMEDIATELY = 20
+ PULL_OFFSET_MOVED = 21
+ QUERY_NOT_FOUND = 22
+ SUBSCRIPTION_PARSE_FAILED = 23
+ SUBSCRIPTION_NOT_EXIST = 24
+ SUBSCRIPTION_NOT_LATEST = 25
+ SUBSCRIPTION_GROUP_NOT_EXIST = 26
+ TRANSACTION_SHOULD_COMMIT = 200
+ TRANSACTION_SHOULD_ROLLBACK = 201
+ TRANSACTION_STATE_UNKNOW = 202
+ TRANSACTION_STATE_GROUP_WRONG = 203
+ NO_BUYER_ID = 204
+
+ NOT_IN_CURRENT_UNIT = 205
+
+ CONSUMER_NOT_ONLINE = 206
+
+ CONSUME_MSG_TIMEOUT = 207
+)
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/remoting/response_future.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/remoting/response_future.go b/rocketmq-go/remoting/response_future.go
index a8c2aec..1bece6d 100644
--- a/rocketmq-go/remoting/response_future.go
+++ b/rocketmq-go/remoting/response_future.go
@@ -16,57 +16,14 @@
*/
package remoting
-import (
- "sync"
- "time"
-)
-
-type InvokeCallback func(responseFuture *ResponseFuture)
-
+// ResponseFuture tracks one in-flight request until its response arrives or
+// it is cleared as expired.
type ResponseFuture struct {
- opaque int32
- timeoutMillis time.Duration
- invokeCallback InvokeCallback
- beginTimestamp int64
- responseCommand *RemotingCommand
- sendRequestOK bool
- done chan bool
- latch sync.WaitGroup
- err error
-}
-
-func NewResponseFuture(opaque int32, timeout time.Duration, callback InvokeCallback) *ResponseFuture {
- future := &ResponseFuture{
- opaque: opaque,
- timeoutMillis: timeout,
- invokeCallback: callback,
- latch: sync.WaitGroup{},
- }
- future.latch.Add(1)
- return future
-}
-func (future *ResponseFuture) SetResponseFuture(cmd *RemotingCommand) {
- future.responseCommand = cmd
-}
-
-func (future *ResponseFuture) Done() {
- future.latch.Done()
- future.done <- true
-}
-
-func (future *ResponseFuture) executeInvokeCallback() {
- future.invokeCallback(nil) // TODO
-}
-
-func (future *ResponseFuture) WaitResponse(timeout time.Duration) *RemotingCommand {
- go func() { // TODO optimize
- time.Sleep(timeout)
- future.latch.Add(-1) // TODO whats happened when counter less than 0
- }()
- future.latch.Wait()
- return future.responseCommand
-}
-
-func (future *ResponseFuture) String() string {
- return nil
+ ResponseCommand *RemotingCommand // the received response, set when it arrives
+ SendRequestOK bool
+ Rrr error // NOTE(review): looks like a typo for Err; exported, so renaming would break callers
+ Opaque int32 // opaque of the request this future belongs to
+ TimeoutMillis int64
+ InvokeCallback InvokeCallback // async completion callback; nil for sync requests
+ BeginTimestamp int64 // creation time in Unix seconds
+ Done chan bool // signalled on response arrival; nil for async requests
}
+// InvokeCallback is called when an asynchronous request completes. It
+// receives the ResponseFuture holding the response, or nil when the request
+// was cleared as expired.
+type InvokeCallback func(responseFuture *ResponseFuture)
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/remoting/rocketmq_serializable.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/remoting/rocketmq_serializable.go b/rocketmq-go/remoting/rocketmq_serializable.go
new file mode 100644
index 0000000..22dc41f
--- /dev/null
+++ b/rocketmq-go/remoting/rocketmq_serializable.go
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package remoting
+
+import (
+ "bytes"
+ "encoding/binary"
+ "fmt"
+ "github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/constant"
+)
+
+type RocketMqSerializer struct {
+}
+
+func (self *RocketMqSerializer) EncodeHeaderData(cmd *RemotingCommand) []byte {
+ var (
+ remarkBytes []byte
+ remarkBytesLen int
+ extFieldsBytes []byte
+ extFieldsBytesLen int
+ )
+ remarkBytesLen = 0
+ if len(cmd.Remark) > 0 {
+ remarkBytes = []byte(cmd.Remark)
+ remarkBytesLen = len(remarkBytes)
+ }
+ if cmd.ExtFields != nil {
+ extFieldsBytes = rocketMqCustomHeaderSerialize(cmd.ExtFields)
+ extFieldsBytesLen = len(extFieldsBytes)
+ }
+ buf := bytes.NewBuffer([]byte{})
+ binary.Write(buf, binary.BigEndian, int16(cmd.Code)) //code(~32767) 2
+ binary.Write(buf, binary.BigEndian, int8(0)) //JAVA
+ binary.Write(buf, binary.BigEndian, int16(cmd.Version)) //2
+ binary.Write(buf, binary.BigEndian, int32(cmd.Opaque)) //opaque 4
+ binary.Write(buf, binary.BigEndian, int32(cmd.Flag)) //4
+ binary.Write(buf, binary.BigEndian, int32(remarkBytesLen)) //4
+ if remarkBytesLen > 0 {
+ buf.Write(remarkBytes)
+ }
+ binary.Write(buf, binary.BigEndian, int32(extFieldsBytesLen)) //4
+ if extFieldsBytesLen > 0 {
+ buf.Write(extFieldsBytes)
+ }
+ fmt.Println(buf.Bytes())
+ return buf.Bytes()
+}
+
+func (self *RocketMqSerializer) DecodeRemoteCommand(headerArray, body []byte) (cmd *RemotingCommand) {
+ cmd = &RemotingCommand{}
+ buf := bytes.NewBuffer(headerArray)
+ // int code(~32767)
+ binary.Read(buf, binary.BigEndian, &cmd.Code)
+ // LanguageCode language
+ var LanguageCodeNope byte
+ binary.Read(buf, binary.BigEndian, &LanguageCodeNope)
+ cmd.Language = constant.REMOTING_COMMAND_LANGUAGE //todo use code from remote
+ // int version(~32767)
+ binary.Read(buf, binary.BigEndian, &cmd.Version)
+ // int opaque
+ binary.Read(buf, binary.BigEndian, &cmd.Opaque)
+ // int flag
+ binary.Read(buf, binary.BigEndian, &cmd.Flag)
+ // String remark
+ var remarkLen, extFieldsLen int32
+ binary.Read(buf, binary.BigEndian, &remarkLen)
+ if remarkLen > 0 {
+ var remarkData = make([]byte, remarkLen)
+ binary.Read(buf, binary.BigEndian, &remarkData)
+ cmd.Remark = string(remarkData)
+ }
+ //map ext
+ // HashMap<String, String> extFields
+ binary.Read(buf, binary.BigEndian, &extFieldsLen)
+ if extFieldsLen > 0 {
+ var extFieldsData = make([]byte, extFieldsLen)
+ binary.Read(buf, binary.BigEndian, &extFieldsData)
+ extFiledMap := customHeaderDeserialize(extFieldsData)
+ cmd.ExtFields = extFiledMap
+ }
+ cmd.Body = body
+ return
+}
+
+func rocketMqCustomHeaderSerialize(extFiled map[string]interface{}) (byteData []byte) {
+ buf := bytes.NewBuffer([]byte{})
+ for key, value := range extFiled {
+ keyBytes := []byte(fmt.Sprintf("%v", key))
+ valueBytes := []byte(fmt.Sprintf("%v", value))
+ binary.Write(buf, binary.BigEndian, int16(len(keyBytes)))
+ buf.Write(keyBytes)
+ binary.Write(buf, binary.BigEndian, int32(len(valueBytes)))
+ buf.Write(valueBytes)
+ }
+ byteData = buf.Bytes()
+ return
+}
+
+func customHeaderDeserialize(extFiledDataBytes []byte) (extFiledMap map[string]interface{}) {
+ extFiledMap = make(map[string]interface{})
+ buf := bytes.NewBuffer(extFiledDataBytes)
+ for buf.Len() > 0 {
+ var key = getItemFormExtFiledDataBytes(buf, "key")
+ var value = getItemFormExtFiledDataBytes(buf, "value")
+ extFiledMap[key] = value
+ }
+ return
+}
+func getItemFormExtFiledDataBytes(buff *bytes.Buffer, itemType string) (item string) {
+ if itemType == "key" {
+ var len int16
+ binary.Read(buff, binary.BigEndian, &len)
+ var data = make([]byte, len)
+ binary.Read(buff, binary.BigEndian, &data)
+ item = string(data)
+ }
+ if itemType == "value" {
+ var len int32
+ binary.Read(buff, binary.BigEndian, &len)
+ var data = make([]byte, len)
+ binary.Read(buff, binary.BigEndian, &data)
+ item = string(data)
+ }
+ return
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/remoting/serializable.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/remoting/serializable.go b/rocketmq-go/remoting/serializable.go
new file mode 100644
index 0000000..24420cd
--- /dev/null
+++ b/rocketmq-go/remoting/serializable.go
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package remoting
+
+import (
+ "bytes"
+ "encoding/binary"
+ "github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/constant"
+ "github.com/golang/glog"
+)
+
+type SerializerHandler struct {
+ serializer Serializer //which serializer this client use, depend on constant.USE_HEADER_SERIALIZETYPE
+}
+
+type Serializer interface {
+ EncodeHeaderData(request *RemotingCommand) []byte
+ DecodeRemoteCommand(header, body []byte) *RemotingCommand
+}
+
+var JSON_SERIALIZER = &JsonSerializer{}
+var ROCKETMQ_SERIALIZER = &RocketMqSerializer{}
+
+func NewSerializerHandler() SerializerHandler {
+ serializerHandler := SerializerHandler{}
+ switch constant.USE_HEADER_SERIALIZETYPE {
+ case constant.JSON_SERIALIZE:
+ serializerHandler.serializer = JSON_SERIALIZER
+ break
+
+ case constant.ROCKETMQ_SERIALIZE:
+ serializerHandler.serializer = ROCKETMQ_SERIALIZER
+ break
+ default:
+ panic("illeage serializer type")
+ }
+ return serializerHandler
+}
+func (self *SerializerHandler) EncodeHeader(request *RemotingCommand) []byte {
+ length := 4
+ headerData := self.serializer.EncodeHeaderData(request)
+ length += len(headerData)
+ if request.Body != nil {
+ length += len(request.Body)
+ }
+ buf := bytes.NewBuffer([]byte{})
+ binary.Write(buf, binary.BigEndian, int32(length)) // len
+ binary.Write(buf, binary.BigEndian, int32(len(headerData)|(int(constant.USE_HEADER_SERIALIZETYPE)<<24))) // header len
+ buf.Write(headerData)
+ return buf.Bytes()
+}
+
+func (self *SerializerHandler) DecodeRemoteCommand(headerSerializableType byte, header, body []byte) *RemotingCommand {
+ var serializer Serializer
+ switch headerSerializableType {
+ case constant.JSON_SERIALIZE:
+ serializer = JSON_SERIALIZER
+ break
+ case constant.ROCKETMQ_SERIALIZE:
+ serializer = ROCKETMQ_SERIALIZER
+ break
+ default:
+ glog.Error("Unknow headerSerializableType", headerSerializableType)
+ }
+ return serializer.DecodeRemoteCommand(header, body)
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/service/client_api.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/service/client_api.go b/rocketmq-go/service/client_api.go
index ff750ba..6901f4c 100644
--- a/rocketmq-go/service/client_api.go
+++ b/rocketmq-go/service/client_api.go
@@ -20,7 +20,6 @@ package service
import (
"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model"
"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/config"
- "github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/header"
"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/message"
"github.com/apache/incubator-rocketmq-externals/rocketmq-go/remoting"
)
@@ -45,25 +44,25 @@ type MQClientAPI struct {
config *config.ClientConfig
}
-func NewMQClientAPI(cfg *config.ClientConfig, processor *ClientRemotingProcessor, hook remoting.RPCHook) *MQClientAPI {
- api := &MQClientAPI{
- remotingClient: &remoting.RemotingClient{}, //TODO
- topAddress: &TopAddress{}, // TODO
- crp: processor,
- config: cfg,
- }
-
- // TODO register
- return api
-}
-
-func (api *MQClientAPI) SendMessage(addr, brokerName string,
- msg message.Message, requestHeader header.SendMessageRequestHeader, timeout int64) *model.SendResult {
- var request *remoting.RemotingCommand
- request = remoting.CreateRemotingCommand(model.SendMsg, &requestHeader)
- request.SetBody(msg.Body)
- return api.sendMessageSync(addr, brokerName, msg, timeout, request)
-}
+//func NewMQClientAPI(cfg *config.ClientConfig, processor *ClientRemotingProcessor, hook remoting.RPCHook) *MQClientAPI {
+// api := &MQClientAPI{
+// remotingClient: &remoting.RemotingClient{}, //TODO
+// topAddress: &TopAddress{}, // TODO
+// crp: processor,
+// config: cfg,
+// }
+//
+// // TODO register
+// return api
+//}
+//
+//func (api *MQClientAPI) SendMessage(addr, brokerName string,
+// msg message.Message, requestHeader header.SendMessageRequestHeader, timeout int64) *model.SendResult {
+// var request *remoting.RemotingCommand
+// request = remoting.CreateRemotingCommand(model.SendMsg, &requestHeader)
+// request.SetBody(msg.Body)
+// return api.sendMessageSync(addr, brokerName, msg, timeout, request)
+//}
func (api *MQClientAPI) sendMessageSync(addr, brokerName string,
msg message.Message,
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/service/consume_message_service.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/service/consume_message_service.go b/rocketmq-go/service/consume_message_service.go
new file mode 100644
index 0000000..09be61c
--- /dev/null
+++ b/rocketmq-go/service/consume_message_service.go
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package service
+
+import (
+ "github.com/apache/incubator-rocketmq-externals/rocketmq-go/model"
+ "github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/config"
+ "github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/constant"
+ "github.com/golang/glog"
+ "time"
+)
+
+type ConsumeMessageService interface {
+ //ConsumeMessageDirectlyResult consumeMessageDirectly(final MessageExt msg, final String brokerName);
+
+ Init(consumerGroup string, mqClient RocketMqClient, offsetStore OffsetStore, defaultProducerService *DefaultProducerService, consumerConfig *config.RocketMqConsumerConfig)
+ SubmitConsumeRequest(msgs []model.MessageExt, processQueue *model.ProcessQueue,
+ messageQueue *model.MessageQueue, dispathToConsume bool)
+ SendMessageBack(messageExt *model.MessageExt, delayLayLevel int, brokerName string) (err error)
+ ConsumeMessageDirectly(messageExt *model.MessageExt, brokerName string) (consumeMessageDirectlyResult model.ConsumeMessageDirectlyResult, err error)
+}
+
+type ConsumeMessageConcurrentlyServiceImpl struct {
+ consumerGroup string
+ messageListener model.MessageListener
+ //sendMessageBackProducerService SendMessageBackProducerService //for send retry Message
+ offsetStore OffsetStore
+ consumerConfig *config.RocketMqConsumerConfig
+}
+
+func NewConsumeMessageConcurrentlyServiceImpl(messageListener model.MessageListener) (consumeService ConsumeMessageService) {
+ //consumeService = &ConsumeMessageConcurrentlyServiceImpl{messageListener:messageListener, sendMessageBackProducerService:&SendMessageBackProducerServiceImpl{}}
+ return
+}
+
+func (self *ConsumeMessageConcurrentlyServiceImpl) Init(consumerGroup string, mqClient RocketMqClient, offsetStore OffsetStore, defaultProducerService *DefaultProducerService, consumerConfig *config.RocketMqConsumerConfig) {
+ self.consumerGroup = consumerGroup
+ self.offsetStore = offsetStore
+ //self.sendMessageBackProducerService.InitSendMessageBackProducerService(consumerGroup, mqClient,defaultProducerService,consumerConfig)
+ self.consumerConfig = consumerConfig
+}
+
+func (self *ConsumeMessageConcurrentlyServiceImpl) SubmitConsumeRequest(msgs []model.MessageExt, processQueue *model.ProcessQueue, messageQueue *model.MessageQueue, dispathToConsume bool) {
+ msgsLen := len(msgs)
+ for i := 0; i < msgsLen; {
+ begin := i
+ end := i + self.consumerConfig.ConsumeMessageBatchMaxSize
+ if end > msgsLen {
+ end = msgsLen
+ }
+ go func() {
+ glog.V(2).Infof("look slice begin %d end %d msgsLen %d", begin, end, msgsLen)
+ batchMsgs := transformMessageToConsume(self.consumerGroup, msgs[begin:end])
+ consumeState := self.messageListener(batchMsgs)
+ self.processConsumeResult(consumeState, batchMsgs, messageQueue, processQueue)
+ }()
+ i = end
+ }
+ return
+}
+
+func (self *ConsumeMessageConcurrentlyServiceImpl) SendMessageBack(messageExt *model.MessageExt, delayLayLevel int, brokerName string) (err error) {
+ //err = self.sendMessageBackProducerService.SendMessageBack(messageExt, 0, brokerName)
+ return
+}
+
+func (self *ConsumeMessageConcurrentlyServiceImpl) ConsumeMessageDirectly(messageExt *model.MessageExt, brokerName string) (consumeMessageDirectlyResult model.ConsumeMessageDirectlyResult, err error) {
+ start := time.Now().UnixNano() / 1000000
+ consumeResult := self.messageListener([]model.MessageExt{*messageExt})
+ consumeMessageDirectlyResult.AutoCommit = true
+ consumeMessageDirectlyResult.Order = false
+ consumeMessageDirectlyResult.SpentTimeMills = time.Now().UnixNano()/1000000 - start
+ if consumeResult.ConsumeConcurrentlyStatus == "CONSUME_SUCCESS" && consumeResult.AckIndex >= 0 {
+ consumeMessageDirectlyResult.ConsumeResult = "CR_SUCCESS"
+
+ } else {
+ consumeMessageDirectlyResult.ConsumeResult = "CR_THROW_EXCEPTION"
+ }
+ return
+}
+
+func (self *ConsumeMessageConcurrentlyServiceImpl) processConsumeResult(result model.ConsumeConcurrentlyResult, msgs []model.MessageExt, messageQueue *model.MessageQueue, processQueue *model.ProcessQueue) {
+ if processQueue.IsDropped() {
+ glog.Warning("processQueue is dropped without process consume result. ", msgs)
+ return
+ }
+ if len(msgs) == 0 {
+ return
+ }
+ ackIndex := result.AckIndex
+ if model.CONSUME_SUCCESS == result.ConsumeConcurrentlyStatus {
+ if ackIndex >= len(msgs) {
+ ackIndex = len(msgs) - 1
+ } else {
+ if result.AckIndex < 0 {
+ ackIndex = -1
+ }
+ }
+ }
+ var failedMessages []model.MessageExt
+ successMessages := []model.MessageExt{}
+ if ackIndex >= 0 {
+ successMessages = msgs[:ackIndex+1]
+ }
+ for i := ackIndex + 1; i < len(msgs); i++ {
+ err := self.SendMessageBack(&msgs[i], 0, messageQueue.BrokerName)
+ if err != nil {
+ msgs[i].ReconsumeTimes = msgs[i].ReconsumeTimes + 1
+ failedMessages = append(failedMessages, msgs[i])
+ } else {
+ successMessages = append(successMessages, msgs[i])
+ }
+ }
+ if len(failedMessages) > 0 {
+ self.SubmitConsumeRequest(failedMessages, processQueue, messageQueue, true)
+ }
+ //commitOffset := processQueue.RemoveMessage(successMessages)
+ //if (commitOffset > 0 && ! processQueue.IsDropped()) {
+ // self.offsetStore.UpdateOffset(messageQueue, commitOffset, true)
+ //}
+
+}
+
+func transformMessageToConsume(consumerGroup string, msgs []model.MessageExt) []model.MessageExt {
+ retryTopicName := constant.RETRY_GROUP_TOPIC_PREFIX + consumerGroup
+
+ for _, msg := range msgs {
+ //reset retry topic name
+ if msg.Message.Topic == retryTopicName {
+ retryTopic := msg.Properties[constant.PROPERTY_RETRY_TOPIC]
+ if len(retryTopic) > 0 {
+ msg.Message.Topic = retryTopic
+ }
+ }
+ //set consume start time
+ msg.SetConsumeStartTime()
+ }
+ return msgs
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/service/consume_messsage_service.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/service/consume_messsage_service.go b/rocketmq-go/service/consume_messsage_service.go
deleted file mode 100644
index d3b28fc..0000000
--- a/rocketmq-go/service/consume_messsage_service.go
+++ /dev/null
@@ -1,20 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package service
-
-type ConsumeMessageService struct {
-}