You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2017/07/07 01:56:42 UTC
[2/2] incubator-rocketmq-externals git commit: [ROCKETMQ-198]
Go-Client's incomplete implement.
[ROCKETMQ-198] Go-Client's incomplete implement.
Author: tangjie <st...@gmail.com>
Closes #22 from StyleTang/go-client-all.
Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/commit/28b98b09
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/tree/28b98b09
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/diff/28b98b09
Branch: refs/heads/master
Commit: 28b98b096f7104f08658a116525b0812b9a14367
Parents: c98a770
Author: tangjie <st...@gmail.com>
Authored: Fri Jul 7 09:56:15 2017 +0800
Committer: yukon <yu...@apache.org>
Committed: Fri Jul 7 09:56:15 2017 +0800
----------------------------------------------------------------------
rocketmq-go/clean_expire_msg_controller.go | 49 +++
rocketmq-go/docs/roadmap.md | 105 +++--
rocketmq-go/example/consumer_example.go | 50 ++-
.../example/producer_consumer_example.go | 77 ++++
rocketmq-go/example/producer_example.go | 40 ++
rocketmq-go/model/config/consumer_config.go | 58 ++-
rocketmq-go/model/config/producer_config.go | 46 ++
rocketmq-go/model/config/rocketmq_config.go | 20 -
rocketmq-go/model/constant/config.go | 2 +-
rocketmq-go/model/constant/mix_all.go | 3 -
.../model/header/pull_message_request_header.go | 12 +-
.../model/header/send_message_request_header.go | 29 +-
rocketmq-go/model/message/message_queue.go | 84 ----
rocketmq-go/model/process_queue.go | 421 -------------------
rocketmq-go/model/process_queue_info.go | 8 -
rocketmq-go/model/response_code.go | 4 +-
rocketmq-go/model/send_result.go | 13 +-
rocketmq-go/model/topic_publishInfo.go | 76 ----
rocketmq-go/model/topic_publish_info.go | 6 -
rocketmq-go/model/topic_route_data.go | 105 -----
rocketmq-go/mq_client_manage.go | 261 ++++++++++++
rocketmq-go/mq_client_manager.go | 90 ----
rocketmq-go/mq_consumer.go | 74 ----
rocketmq-go/mq_producer.go | 53 ++-
rocketmq-go/mq_push_consumer.go | 153 +++++++
rocketmq-go/pull_message_controller.go | 329 +++++++++++++++
rocketmq-go/rebalance_controller.go | 33 ++
.../allocate_message_averagely.go | 80 ++++
.../allocate_message_averagely_by_circle.go | 79 ++++
.../allocate_message_by_config.go | 27 ++
.../allocate_message_by_machine_room.go | 80 ++++
.../allocate_message_queue_strategy.go | 27 ++
rocketmq-go/service/consume_message_service.go | 24 +-
rocketmq-go/service/mq_client.go | 12 +-
rocketmq-go/service/mq_fault_strategy.go | 49 +++
rocketmq-go/service/offset_store.go | 163 +++++++
rocketmq-go/service/offset_store_service.go | 21 -
rocketmq-go/service/producer_service.go | 222 +++++++++-
.../service/producer_service_for_send_back.go | 115 +++++
rocketmq-go/service/rebalance.go | 307 ++++++++++++++
rocketmq-go/service/rebalance_service.go | 25 --
rocketmq-go/tasks.go | 68 +++
rocketmq-go/util/compress_util.go | 63 +++
rocketmq-go/util/concurrent_map.go | 16 +
rocketmq-go/util/message_client_id_generator.go | 4 +-
rocketmq-go/util/message_properties.go | 47 +++
rocketmq-go/util/regex_util.go | 33 ++
47 files changed, 2572 insertions(+), 1091 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/28b98b09/rocketmq-go/clean_expire_msg_controller.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/clean_expire_msg_controller.go b/rocketmq-go/clean_expire_msg_controller.go
new file mode 100644
index 0000000..73bb35b
--- /dev/null
+++ b/rocketmq-go/clean_expire_msg_controller.go
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package rocketmq
+
+import (
+ "github.com/apache/incubator-rocketmq-externals/rocketmq-go/service"
+ "time"
+)
+
+type CleanExpireMsgController struct {
+ mqClient service.RocketMqClient // stored by NewCleanExpireMsgController; Start does not read it
+ clientFactory *ClientFactory // Start ranges over its ConsumerTable to find the consumers to clean
+}
+
+// NewCleanExpireMsgController builds a controller that keeps the given client
+// and client factory for later use by Start.
+func NewCleanExpireMsgController(mqClient service.RocketMqClient, clientFactory *ClientFactory) *CleanExpireMsgController {
+	controller := new(CleanExpireMsgController)
+	controller.mqClient = mqClient
+	controller.clientFactory = clientFactory
+	return controller
+}
+
+// Start launches one background goroutine per consumer found in the client
+// factory's ConsumerTable. Each goroutine waits ConsumeTimeout minutes, calls
+// CleanExpireMsg on its consumer, re-arms the timer and repeats forever.
+// Start returns immediately; nothing in this method stops the goroutines.
+func (self *CleanExpireMsgController) Start() {
+	for _, consumer := range self.clientFactory.ConsumerTable {
+		// Take a per-iteration copy: before Go 1.22 the range variable is shared
+		// by all iterations, so without this every goroutine could end up
+		// cleaning the same (last) consumer.
+		consumer := consumer
+		go func() {
+			// ConsumeTimeout is a number of minutes. It is re-read on every
+			// cycle, so a changed value applies from the next re-arm.
+			cleanExpireMsgTimer := time.NewTimer(time.Duration(consumer.ConsumerConfig.ConsumeTimeout) * time.Minute)
+			for {
+				<-cleanExpireMsgTimer.C
+				consumer.CleanExpireMsg()
+				cleanExpireMsgTimer.Reset(time.Duration(consumer.ConsumerConfig.ConsumeTimeout) * time.Minute)
+			}
+		}()
+	}
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/28b98b09/rocketmq-go/docs/roadmap.md
----------------------------------------------------------------------
diff --git a/rocketmq-go/docs/roadmap.md b/rocketmq-go/docs/roadmap.md
index 0db9033..a69c561 100644
--- a/rocketmq-go/docs/roadmap.md
+++ b/rocketmq-go/docs/roadmap.md
@@ -1,45 +1,64 @@
# RoadMap-Milestone1
## Consumer
-- [ ] ConsumerType
- - [ ] PushConsumer
-- [ ] MessageListener
- - [ ] Concurrently
-- [ ] MessageModel
- - [ ] CLUSTERING
-- [ ] OffsetStore
- - [ ] RemoteBrokerOffsetStore
-- [ ] RebalanceService
-- [ ] PullMessageService
-- [ ] ConsumeMessageService
-- [ ] AllocateMessageQueueStrategy
- - [ ] AllocateMessageQueueAveragely
-- [ ] Other
- - [ ] Config
- - [ ] ZIP
- - [ ] ConsumeFromWhere
- - [ ] CONSUME_FROM_LAST_OFFSET
- - [ ] CONSUME_FROM_FIRST_OFFSET
- - [ ] CONSUME_FROM_TIMESTAMP
- - [ ] Retry(sendMessageBack)
- - [ ] TimeOut(clearExpiredMessage)
- - [ ] ACK(partSuccess)
- - [ ] FlowControl(messageCanNotConsume)
+- [x] ConsumerType
+ - [x] PushConsumer
+- [x] MessageListener
+ - [x] Concurrently
+- [x] MessageModel
+ - [x] CLUSTERING
+- [x] OffsetStore
+ - [x] RemoteBrokerOffsetStore
+- [x] RebalanceService
+- [x] PullMessageService
+- [x] ConsumeMessageService
+- [x] AllocateMessageQueueStrategy
+ - [x] AllocateMessageQueueAveragely
+- [x] Other
+ - [x] Config
+ - [x] ZIP
+ - [x] ConsumeFromWhere
+ - [x] CONSUME_FROM_LAST_OFFSET
+ - [x] CONSUME_FROM_FIRST_OFFSET
+ - [x] CONSUME_FROM_TIMESTAMP
+ - [x] Retry(sendMessageBack)
+ - [x] TimeOut(clearExpiredMessage)
+ - [x] ACK(partSuccess)
+ - [x] FlowControl(messageCanNotConsume)
+
+## Producer
+- [x] ProducerType
+ - [x] DefaultProducer
+- [x] API
+ - [x] Send
+ - [x] Sync
+- [x] Other
+ - [x] DelayMessage
+ - [x] Config
+ - [x] MessageId Generate
+ - [x] CompressMsg
+ - [x] TimeOut
+ - [x] LoadBalance
+ - [x] DefaultTopic
+ - [x] VipChannel
+ - [x] MQFaultStrategy
+
## Manager
-- [ ] Controller
- - [ ] PullMessageController
-- [ ] Task
- - [ ] Heartbeat
- - [ ] UpdateTopicRouteInfoFromNameServer
- - [ ] PersistAllConsumerOffset
- - [ ] ClearExpiredMessage(form consumer consumeMessageService)
-- [ ] ClientRemotingProcessor
- - [ ] CHECK_TRANSACTION_STATE
- - [ ] NOTIFY_CONSUMER_IDS_CHANGED
- - [ ] RESET_CONSUMER_CLIENT_OFFSET
- - [ ] GET_CONSUMER_STATUS_FROM_CLIENT
- - [ ] GET_CONSUMER_RUNNING_INFO
- - [ ] CONSUME_MESSAGE_DIRECTLY
+- [x] Controller
+ - [x] PullMessageController
+- [x] Task
+ - [x] UpdateTopicRouteInfo
+ - [x] Heartbeat
+ - [x] Rebalance
+ - [x] PullMessage
+ - [x] CleanExpireMsg
+- [x] ClientRemotingProcessor
+ - [x] CHECK_TRANSACTION_STATE
+ - [x] NOTIFY_CONSUMER_IDS_CHANGED
+ - [x] RESET_CONSUMER_CLIENT_OFFSET
+ - [x] GET_CONSUMER_STATUS_FROM_CLIENT
+ - [x] GET_CONSUMER_RUNNING_INFO
+ - [x] CONSUME_MESSAGE_DIRECTLY
## Remoting
- [x] MqClientRequest
@@ -122,13 +141,11 @@
- [ ] RebalanceController
- [ ] PullMessageController
- [ ] Task
- - [ ] PollNameServer
+ - [ ] UpdateTopicRouteInfo
- [ ] Heartbeat
- - [ ] UpdateTopicRouteInfoFromNameServer
- - [ ] CleanOfflineBroker
- - [ ] PersistAllConsumerOffset
- - [ ] ClearExpiredMessage(form consumer consumeMessageService)
- - [ ] UploadFilterClassSource(FromHeartBeat/But Golang Not Easy To do this(Java Source))
+ - [ ] Rebalance
+ - [ ] PullMessage
+ - [ ] CleanExpireMsg
- [ ] ClientRemotingProcessor
- [ ] CHECK_TRANSACTION_STATE
- [ ] NOTIFY_CONSUMER_IDS_CHANGED
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/28b98b09/rocketmq-go/example/consumer_example.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/example/consumer_example.go b/rocketmq-go/example/consumer_example.go
index af74c01..7c94e58 100644
--- a/rocketmq-go/example/consumer_example.go
+++ b/rocketmq-go/example/consumer_example.go
@@ -17,39 +17,37 @@
package main
import (
- "errors"
"github.com/apache/incubator-rocketmq-externals/rocketmq-go"
"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model"
+ "github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/config"
"github.com/golang/glog"
+ "time"
)
func main() {
- // create a mqClientManager instance
- var mqClientConfig = &rocketmq.MqClientConfig{}
- var mqClientManager = rocketmq.NewMqClientManager(mqClientConfig)
-
- // create rocketMq consumer
- var consumerConfig = &rocketmq.MqConsumerConfig{}
- var consumer1 = rocketmq.NewDefaultMQPushConsumer("testGroup", consumerConfig)
- consumer1.Subscribe("testTopic", "*")
- consumer1.RegisterMessageListener(func(msgs []model.MessageExt) model.ConsumeConcurrentlyResult {
- var index = -1
- for i, msg := range msgs {
- // your code here,for example,print msg
- glog.Info(msg)
- var err = errors.New("error")
- if err != nil {
- break
- }
- index = i
+ var (
+ testTopic = "GoLang"
+ )
+ var comsumer1 = rocketmq.NewDefaultMQPushConsumer(testTopic + "-StyleTang") // consumer group name is derived from the topic
+ comsumer1.ConsumerConfig.PullInterval = 0
+ comsumer1.ConsumerConfig.ConsumeTimeout = 1
+ comsumer1.ConsumerConfig.ConsumeMessageBatchMaxSize = 16
+ comsumer1.ConsumerConfig.ConsumeFromWhere = "CONSUME_FROM_TIMESTAMP"
+ comsumer1.ConsumerConfig.ConsumeTimestamp = time.Now()
+ comsumer1.Subscribe(testTopic, "*")
+ comsumer1.RegisterMessageListener(func(msgs []model.MessageExt) model.ConsumeConcurrentlyResult {
+ for _, msg := range msgs {
+ glog.Info(msg.BornTimestamp)
}
- return model.ConsumeConcurrentlyResult{ConsumeConcurrentlyStatus: model.CONSUME_SUCCESS, AckIndex: index}
+ glog.Info("look message len ", len(msgs))
+ return model.ConsumeConcurrentlyResult{ConsumeConcurrentlyStatus: model.CONSUME_SUCCESS, AckIndex: len(msgs)}
})
-
- //register consumer to mqClientManager
- mqClientManager.RegisterConsumer(consumer1)
-
- //start it
- mqClientManager.Start()
+ var clienConfig = &config.ClientConfig{}
+ clienConfig.SetNameServerAddress("120.55.113.35:9876") // name server address is hard-coded for this demo
+ rocketMqManager := rocketmq.MqClientManagerInit(clienConfig)
+ rocketMqManager.RegistConsumer(comsumer1)
+ rocketMqManager.Start()
+ select {} // blocks forever, so the ShutDown call below is never reached
+ rocketMqManager.ShutDown()
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/28b98b09/rocketmq-go/example/producer_consumer_example.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/example/producer_consumer_example.go b/rocketmq-go/example/producer_consumer_example.go
new file mode 100644
index 0000000..0d8e455
--- /dev/null
+++ b/rocketmq-go/example/producer_consumer_example.go
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package main
+
+import (
+ "github.com/apache/incubator-rocketmq-externals/rocketmq-go" // TODO: ideally this would be the only import callers need
+ "github.com/apache/incubator-rocketmq-externals/rocketmq-go/model"
+ "github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/config"
+ "github.com/apache/incubator-rocketmq-externals/rocketmq-go/util"
+ "github.com/golang/glog"
+ "net/http"
+ _ "net/http/pprof"
+ "time"
+)
+
+// main runs a combined producer/consumer demo. It serves the pprof handlers
+// on localhost:6060, registers two producers and one push consumer with a
+// client manager, starts the manager, and then sends test messages to the
+// "GoLang" topic through producer1.
+func main() {
+	go func() {
+		// ListenAndServe only returns on failure (e.g. the port is taken);
+		// log it so the demo does not silently run without pprof.
+		if err := http.ListenAndServe("localhost:6060", nil); err != nil {
+			glog.Error(err)
+		}
+	}()
+	var (
+		testTopic = "GoLang"
+	)
+	var producer1 = rocketmq.NewDefaultMQProducer("Test1")
+	// Threshold of 1 so that the compression path is exercised by the demo bodies.
+	producer1.ProducerConfig.CompressMsgBodyOverHowMuch = 1
+	var producer2 = rocketmq.NewDefaultMQProducer("Test2")
+	var comsumer1 = rocketmq.NewDefaultMQPushConsumer(testTopic + "-StyleTang")
+	comsumer1.ConsumerConfig.PullInterval = 0
+	comsumer1.ConsumerConfig.ConsumeTimeout = 1
+	comsumer1.ConsumerConfig.ConsumeMessageBatchMaxSize = 16
+	comsumer1.ConsumerConfig.ConsumeFromWhere = "CONSUME_FROM_TIMESTAMP"
+	comsumer1.ConsumerConfig.ConsumeTimestamp = time.Now()
+	comsumer1.Subscribe(testTopic, "*")
+	comsumer1.RegisterMessageListener(func(msgs []model.MessageExt) model.ConsumeConcurrentlyResult {
+		for _, msg := range msgs {
+			glog.Info(msg.BornTimestamp)
+		}
+		glog.Info("look message len ", len(msgs))
+		return model.ConsumeConcurrentlyResult{ConsumeConcurrentlyStatus: model.CONSUME_SUCCESS, AckIndex: len(msgs)}
+	})
+	var clienConfig = &config.ClientConfig{}
+	clienConfig.SetNameServerAddress("120.55.113.35:9876")
+	rocketMqManager := rocketmq.MqClientManagerInit(clienConfig)
+	rocketMqManager.RegistProducer(producer1)
+	rocketMqManager.RegistProducer(producer2)
+	rocketMqManager.RegistConsumer(comsumer1)
+	rocketMqManager.Start()
+	for i := 0; i < 10000000; i++ {
+		var message = &model.Message{}
+		message.Topic = testTopic
+		message.SetKeys([]string{"xxx"})
+		message.SetTag("1122")
+		message.Body = []byte("hellAXXWord" + util.IntToString(i))
+
+		xx, ee := producer1.Send(message)
+		if ee != nil {
+			glog.Error(ee)
+			continue
+		}
+		// ee is always nil on this path; %s would render it as "%!s(<nil>)",
+		// so format it with %v instead.
+		glog.V(0).Infof("sendMessageResutl messageId[%s] err[%v]", xx.MsgID(), ee)
+	}
+	// NOTE(review): select {} blocks forever, so the ShutDown call below is
+	// unreachable as written - confirm whether the demo is meant to exit.
+	select {}
+	rocketMqManager.ShutDown()
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/28b98b09/rocketmq-go/example/producer_example.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/example/producer_example.go b/rocketmq-go/example/producer_example.go
index bda2941..acc2011 100644
--- a/rocketmq-go/example/producer_example.go
+++ b/rocketmq-go/example/producer_example.go
@@ -15,3 +15,43 @@
* limitations under the License.
*/
package main
+
+import (
+ "github.com/apache/incubator-rocketmq-externals/rocketmq-go"
+ "github.com/apache/incubator-rocketmq-externals/rocketmq-go/model"
+ "github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/config"
+ "github.com/apache/incubator-rocketmq-externals/rocketmq-go/util"
+ "github.com/golang/glog"
+ _ "net/http/pprof"
+)
+
+// main runs a producer-only demo. It registers two producers with a client
+// manager, starts the manager, and sends 1000 test messages to the "GoLang"
+// topic through producer1, logging each result.
+func main() {
+	var (
+		testTopic = "GoLang"
+	)
+	var producer1 = rocketmq.NewDefaultMQProducer("Test1")
+	// Threshold of 1 so that the compression path is exercised by the demo bodies.
+	producer1.ProducerConfig.CompressMsgBodyOverHowMuch = 1
+	var producer2 = rocketmq.NewDefaultMQProducer("Test2")
+	var clienConfig = &config.ClientConfig{}
+	clienConfig.SetNameServerAddress("120.55.113.35:9876")
+	rocketMqManager := rocketmq.MqClientManagerInit(clienConfig)
+	rocketMqManager.RegistProducer(producer1)
+	rocketMqManager.RegistProducer(producer2)
+	rocketMqManager.Start()
+	for i := 0; i < 1000; i++ {
+		var message = &model.Message{}
+		message.Topic = testTopic
+		message.SetKeys([]string{"xxx"})
+		message.SetTag("1122")
+		message.Body = []byte("hellAXXWord" + util.IntToString(i))
+
+		xx, ee := producer1.Send(message)
+		if ee != nil {
+			glog.Error(ee)
+			continue
+		}
+		// ee is always nil on this path; %s would render it as "%!s(<nil>)",
+		// so format it with %v instead.
+		glog.V(0).Infof("sendMessageResutl messageId[%s] err[%v]", xx.MsgID(), ee)
+	}
+	// NOTE(review): select {} blocks forever, so the ShutDown call below is
+	// unreachable as written - confirm whether the demo is meant to exit.
+	select {}
+	rocketMqManager.ShutDown()
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/28b98b09/rocketmq-go/model/config/consumer_config.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/config/consumer_config.go b/rocketmq-go/model/config/consumer_config.go
index 25f7585..b6a6f32 100644
--- a/rocketmq-go/model/config/consumer_config.go
+++ b/rocketmq-go/model/config/consumer_config.go
@@ -18,22 +18,30 @@ package config
import "time"
-type RocketMqConsumerConfig struct {
- ConsumeFromWhere string
- /**
- * Minimum consumer thread number
- */
- //consumeThreadMin int
- // /**
- // * Max consumer thread number
- // */
- //consumeThreadMax int
+/**
+ * Pull delay, in milliseconds, applied when an exception occurs
+ */
+const PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION int64 = 3000
- /**
- * Threshold for dynamic adjustment of the number of thread pool
- */
- //adjustThreadPoolNumsThreshold int // = 100000;
+/**
+ * Flow control interval
+ */
+const PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL int64 = 50
+//consume from where
+//first consume from the last offset
+const CONSUME_FROM_LAST_OFFSET string = "CONSUME_FROM_LAST_OFFSET"
+
+//first consume from the first offset
+const CONSUME_FROM_FIRST_OFFSET string = "CONSUME_FROM_FIRST_OFFSET"
+
+//first consume from the time
+const CONSUME_FROM_TIMESTAMP string = "CONSUME_FROM_TIMESTAMP"
+
+//consume from where
+
+type RocketMqConsumerConfig struct {
+ ConsumeFromWhere string
/**
* Concurrently max span offset.it has no effect on sequential consumption
*/
@@ -94,6 +102,26 @@ type RocketMqConsumerConfig struct {
}
func NewRocketMqConsumerConfig() (consumerConfig *RocketMqConsumerConfig) {
- consumerConfig = &RocketMqConsumerConfig{}
+ consumerConfig = &RocketMqConsumerConfig{
+ ConsumeFromWhere: CONSUME_FROM_LAST_OFFSET,
+ ConsumeConcurrentlyMaxSpan: 2000,
+ PullThresholdForQueue: 1000,
+ PullInterval: 0,
+ ConsumeMessageBatchMaxSize: 1,
+ PullBatchSize: 32,
+ PostSubscriptionWhenPull: false,
+ UnitMode: false,
+ MaxReconsumeTimes: 16,
+ SuspendCurrentQueueTimeMillis: 1000,
+ ConsumeTimeout: 15,
+ ConsumeTimestamp: time.Now().Add(-30 * time.Minute),
+
+ // use custom or constants.don't suggest to change
+ PullTimeDelayMillsWhenException: PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION,
+ PullTimeDelayMillsWhenFlowControl: PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL,
+ PullTimeDelayMillsWhenSuspend: 1000,
+ BrokerSuspendMaxTimeMillis: 1000 * 15,
+ ConsumerTimeoutMillisWhenSuspend: 1000 * 30,
+ }
return
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/28b98b09/rocketmq-go/model/config/producer_config.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/config/producer_config.go b/rocketmq-go/model/config/producer_config.go
index ce109fb..3bf4844 100644
--- a/rocketmq-go/model/config/producer_config.go
+++ b/rocketmq-go/model/config/producer_config.go
@@ -17,4 +17,50 @@
package config
type RocketMqProducerConfig struct { // producer settings; NewProducerConfig fills in the defaults
+ SendMsgTimeout int64 // done; NewProducerConfig sets 3000
+ //private int sendMsgTimeout = 3000;
+ CompressMsgBodyOverHowMuch int // done; NewProducerConfig sets 1024 * 4
+ //private int compressMsgBodyOverHowmuch = 1024 * 4;
+ ZipCompressLevel int // done; NewProducerConfig sets 5
+ //private int zipCompressLevel = Integer.parseInt(System.getProperty(MixAll.MESSAGE_COMPRESS_LEVEL, "5"));
+ /**
+ * Just for testing or demo program
+ */
+ // private String createTopicKey = MixAll.DEFAULT_TOPIC;
+
+ //DefaultTopicQueueNums int
+ ////private volatile int defaultTopicQueueNums = 4;
+
+ RetryTimesWhenSendFailed int // NewProducerConfig sets 2
+ //private int retryTimesWhenSendFailed = 2;
+ RetryTimesWhenSendAsyncFailed int // NewProducerConfig sets 2
+ //private int retryTimesWhenSendAsyncFailed = 2;
+ //
+ RetryAnotherBrokerWhenNotStoreOK bool // NewProducerConfig sets false
+ //private boolean retryAnotherBrokerWhenNotStoreOK = false;
+ MaxMessageSize int // NewProducerConfig sets 1024 * 1024 * 4
+ //private int maxMessageSize = 1024 * 1024 * 4; // 4M
+
+ // Settings for MQFaultStrategy. TODO: to be done.
+ SendLatencyFaultEnable bool // NewProducerConfig sets false
+ LatencyMax []int64 // NewProducerConfig sets {50, 100, 550, 1000, 2000, 3000, 15000}
+ NotAvailableDuration []int64 // NewProducerConfig sets {0, 0, 30000, 60000, 120000, 180000, 600000}
+}
+
+// NewProducerConfig returns a producer configuration with every field set to
+// its default value.
+func NewProducerConfig() (producerConfig *RocketMqProducerConfig) {
+	producerConfig = new(RocketMqProducerConfig)
+
+	// Sending and message-size limits.
+	producerConfig.SendMsgTimeout = 3000
+	producerConfig.CompressMsgBodyOverHowMuch = 1024 * 4
+	producerConfig.ZipCompressLevel = 5
+	producerConfig.MaxMessageSize = 1024 * 1024 * 4 // 4M
+
+	// Retry behaviour.
+	producerConfig.RetryTimesWhenSendFailed = 2
+	producerConfig.RetryTimesWhenSendAsyncFailed = 2
+	producerConfig.RetryAnotherBrokerWhenNotStoreOK = false
+
+	// MQFaultStrategy settings.
+	producerConfig.SendLatencyFaultEnable = false
+	producerConfig.LatencyMax = []int64{50, 100, 550, 1000, 2000, 3000, 15000}
+	producerConfig.NotAvailableDuration = []int64{0, 0, 30000, 60000, 120000, 180000, 600000}
+	return
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/28b98b09/rocketmq-go/model/config/rocketmq_config.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/config/rocketmq_config.go b/rocketmq-go/model/config/rocketmq_config.go
deleted file mode 100644
index 56e89b9..0000000
--- a/rocketmq-go/model/config/rocketmq_config.go
+++ /dev/null
@@ -1,20 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package config
-
-type RocketMqClientConfig struct {
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/28b98b09/rocketmq-go/model/constant/config.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/constant/config.go b/rocketmq-go/model/constant/config.go
index c48dfa5..5f7f5db 100644
--- a/rocketmq-go/model/constant/config.go
+++ b/rocketmq-go/model/constant/config.go
@@ -26,4 +26,4 @@ var USE_HEADER_SERIALIZETYPE = JSON_SERIALIZE
var REMOTING_COMMAND_FLAG = 0
var REMOTING_COMMAND_LANGUAGE = "OTHER"
-var REMOTING_COMMAND_VERSION int16 = 137
+var REMOTING_COMMAND_VERSION int16 = 213
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/28b98b09/rocketmq-go/model/constant/mix_all.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/constant/mix_all.go b/rocketmq-go/model/constant/mix_all.go
index 6abaabe..2c8f0a7 100644
--- a/rocketmq-go/model/constant/mix_all.go
+++ b/rocketmq-go/model/constant/mix_all.go
@@ -44,9 +44,6 @@ const (
CID_ONSAPI_PULL_GROUP = "CID_ONSAPI_PULL"
CID_RMQ_SYS_PREFIX = "CID_RMQ_SYS_"
- //public static final List<String> LocalInetAddrs = getLocalInetAddress()
- //Localhost = localhost()
- //DEFAULT_CHARSET = "UTF-8"
MASTER_ID int64 = 0
CURRENT_JVM_PID
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/28b98b09/rocketmq-go/model/header/pull_message_request_header.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/header/pull_message_request_header.go b/rocketmq-go/model/header/pull_message_request_header.go
index 0133796..f341b49 100644
--- a/rocketmq-go/model/header/pull_message_request_header.go
+++ b/rocketmq-go/model/header/pull_message_request_header.go
@@ -17,8 +17,18 @@
package header
type PullMessageRequestHeader struct {
+ ConsumerGroup string `json:"consumerGroup"`
+ Topic string `json:"topic"`
+ QueueId int32 `json:"queueId"`
+ QueueOffset int64 `json:"queueOffset"`
+ MaxMsgNums int32 `json:"maxMsgNums"`
+ SysFlag int32 `json:"sysFlag"`
+ CommitOffset int64 `json:"commitOffset"`
+ SuspendTimeoutMillis int64 `json:"suspendTimeoutMillis"`
+ Subscription string `json:"subscription"`
+ SubVersion int64 `json:"subVersion"`
}
-func (header *PullMessageRequestHeader) FromMap(headerMap map[string]interface{}) {
+func (self *PullMessageRequestHeader) FromMap(headerMap map[string]interface{}) {
return
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/28b98b09/rocketmq-go/model/header/send_message_request_header.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/header/send_message_request_header.go b/rocketmq-go/model/header/send_message_request_header.go
index 5c828a8..80f17cc 100644
--- a/rocketmq-go/model/header/send_message_request_header.go
+++ b/rocketmq-go/model/header/send_message_request_header.go
@@ -17,21 +17,20 @@
package header
type SendMessageRequestHeader struct {
- //CommandCustomHeader
- ProducerGroup string
- Topic string
- DefaultTopic string
- DefaultTopicQueueNum int
- QueueID int
- SysFlag int
- BornTimestamp int
- Flag int
- Properties string
- ReconsumeTimes int
- UnitMode bool
- MaxReconsumeTimes int
+ ProducerGroup string `json:"producerGroup"`
+ Topic string `json:"topic"`
+ DefaultTopic string `json:"defaultTopic"`
+ DefaultTopicQueueNums int `json:"defaultTopicQueueNums"`
+ QueueId int32 `json:"queueId"`
+ SysFlag int `json:"sysFlag"`
+ BornTimestamp int64 `json:"bornTimestamp"`
+ Flag int `json:"flag"`
+ Properties string `json:"properties"`
+ ReconsumeTimes int `json:"reconsumeTimes"`
+ UnitMode bool `json:"unitMode"`
+ MaxReconsumeTimes int `json:"maxReconsumeTimes"`
}
-func (header *SendMessageRequestHeader) FromMap(headerMap map[string]interface{}) {
- //TODO
+func (self *SendMessageRequestHeader) FromMap(headerMap map[string]interface{}) {
+ return
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/28b98b09/rocketmq-go/model/message/message_queue.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/message/message_queue.go b/rocketmq-go/model/message/message_queue.go
deleted file mode 100644
index 20b47be..0000000
--- a/rocketmq-go/model/message/message_queue.go
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package message
-
-type MessageQueue struct {
- topic string
- brokerName string
- queueId int32
-}
-
-func NewMessageQueue(topic string, brokerName string, queueId int32) *MessageQueue {
- return &MessageQueue{
- topic: topic,
- brokerName: brokerName,
- queueId: queueId,
- }
-}
-
-func (queue *MessageQueue) clone() *MessageQueue {
- no := new(MessageQueue)
- no.topic = queue.topic
- no.queueId = queue.queueId
- no.brokerName = queue.brokerName
- return no
-}
-
-func (queue MessageQueue) BrokerName() string {
- return queue.brokerName
-}
-
-func (queue *MessageQueue) QueueID() int32 {
- return queue.queueId
-}
-
-type MessageQueues []*MessageQueue
-
-func (queues MessageQueues) Less(i, j int) bool {
- imq := queues[i]
- jmq := queues[j]
-
- if imq.topic < jmq.topic {
- return true
- }
-
- if imq.topic < jmq.topic {
- return false
- }
-
- if imq.brokerName < jmq.brokerName {
- return true
- }
-
- if imq.brokerName < jmq.brokerName {
- return false
- }
-
- if imq.queueId < jmq.queueId {
- return true
- }
-
- return false
-}
-
-func (queues MessageQueues) Swap(i, j int) {
- queues[i], queues[j] = queues[j], queues[i]
-}
-
-func (queues MessageQueues) Len() int {
- return len(queues)
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/28b98b09/rocketmq-go/model/process_queue.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/process_queue.go b/rocketmq-go/model/process_queue.go
index 285cbda..e0075e0 100644
--- a/rocketmq-go/model/process_queue.go
+++ b/rocketmq-go/model/process_queue.go
@@ -191,424 +191,3 @@ func (self *ProcessQueue) PutMessage(msgs []MessageExt) (dispatchToConsume bool)
}
return
}
-
-//func (self *ProcessQueue) TakeMessages(batchSize int) (messageToConsumeList []MessageExt) {
-// defer self.lockTreeMap.Unlock()
-// self.lockTreeMap.Lock()
-// self.lastConsumeTimestamp = time.Now()
-// it := self.msgTreeMap.Iterator()
-// nowIndex := 0
-// for it.Next() {
-// offset, message := it.Key(), it.Value()
-// if (nowIndex >= batchSize) {
-// break
-// }
-// self.msgTreeMap.Remove(offset)
-// self.msgTreeMapToBeConsume.Put(offset, message)
-// //messageToConsumeList = append(messageToConsumeList, message)
-// }
-// if (len(messageToConsumeList) == 0) {
-// self.consuming = false
-// }
-// return
-//}
-
-/**
-#
-public final static long RebalanceLockMaxLiveTime =Long.parseLong(System.getProperty("rocketmq.client.rebalance.lockMaxLiveTime", "30000"));
-public final static long RebalanceLockInterval = Long.parseLong(System.getProperty("rocketmq.client.rebalance.lockInterval", "20000"));
-#并发消费过期的
- case CONSUME_PASSIVELY:
- pq.setDropped(true);
- if (this.removeUnnecessaryMessageQueue(mq, pq)) {
- it.remove();
- changed = true;
- log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it",
- consumerGroup, mq);
- }
- break;
-private final static long PullMaxIdleTime = Long.parseLong(System.getProperty("rocketmq.client.pull.pullMaxIdleTime", "120000"));
-private final Logger log = ClientLogger.getLog();
-private final ReadWriteLock lockTreeMap = new ReentrantReadWriteLock();
-
-private final TreeMap<Long, MessageExt> msgTreeMap = new TreeMap<Long, MessageExt>();
-private final AtomicLong msgCount = new AtomicLong();
-private final Lock lockConsume = new ReentrantLock();
-private final TreeMap<Long, MessageExt> msgTreeMapTemp = new TreeMap<Long, MessageExt>();
-private final AtomicLong tryUnlockTimes = new AtomicLong(0);
-private volatile long queueOffsetMax = 0L;
-private volatile boolean dropped = false;
-private volatile long lastPullTimestamp = System.currentTimeMillis();
-private volatile long lastConsumeTimestamp = System.currentTimeMillis();
-private volatile boolean locked = false;
-private volatile long lastLockTimestamp = System.currentTimeMillis();
-private volatile boolean consuming = false;
-private volatile long msgAccCnt = 0;
-
- public boolean isLockExpired() {
- boolean result = (System.currentTimeMillis() - this.lastLockTimestamp) > RebalanceLockMaxLiveTime;
- return result;
- }
-
-
- public boolean isPullExpired() {
- boolean result = (System.currentTimeMillis() - this.lastPullTimestamp) > PullMaxIdleTime;
- return result;
- }
-
-param pushConsumer
-cleanExpiredMsg(DefaultMQPushConsumer pushConsumer) {
-if (pushConsumer.getDefaultMQPushConsumerImpl().isConsumeOrderly()) {
-return;
-}
-
-int loop = msgTreeMap.size() < 16 ? msgTreeMap.size() : 16;
-for (int i = 0; i < loop; i++) {
-MessageExt msg = null;
-try {
-this.lockTreeMap.readLock().lockInterruptibly();
-try {
-if (!msgTreeMap.isEmpty() && System.currentTimeMillis() - Long.parseLong(MessageAccessor.getConsumeStartTimeStamp(msgTreeMap.firstEntry().getValue())) > pushConsumer.getConsumeTimeout() * 60 * 1000) {
-msg = msgTreeMap.firstEntry().getValue();
-} else {
-
-break;
-}
-} finally {
-this.lockTreeMap.readLock().unlock();
-}
-} catch (InterruptedException e) {
-log.error("getExpiredMsg exception", e);
-}
-
-try {
-
-pushConsumer.sendMessageBack(msg, 3);
-log.info("send expire msg back. topic={}, msgId={}, storeHost={}, queueId={}, queueOffset={}", msg.getTopic(), msg.getMsgId(), msg.getStoreHost(), msg.getQueueId(), msg.getQueueOffset());
-try {
-this.lockTreeMap.writeLock().lockInterruptibly();
-try {
-if (!msgTreeMap.isEmpty() && msg.getQueueOffset() == msgTreeMap.firstKey()) {
-try {
-msgTreeMap.remove(msgTreeMap.firstKey());
-} catch (Exception e) {
-log.error("send expired msg exception", e);
-}
-}
-} finally {
-this.lockTreeMap.writeLock().unlock();
-}
-} catch (InterruptedException e) {
-log.error("getExpiredMsg exception", e);
-}
-} catch (Exception e) {
-log.error("send expired msg exception", e);
-}
-}
-}
-
-
-public boolean putMessage(final List<MessageExt> msgs) {
-boolean dispatchToConsume = false;
-try {
-this.lockTreeMap.writeLock().lockInterruptibly();
-try {
-int validMsgCnt = 0;
-for (MessageExt msg : msgs) {
-MessageExt old = msgTreeMap.put(msg.getQueueOffset(), msg);
-if (null == old) {
-validMsgCnt++;
-this.queueOffsetMax = msg.getQueueOffset();
-}
-}
-msgCount.addAndGet(validMsgCnt);
-
-if (!msgTreeMap.isEmpty() && !this.consuming) {
-dispatchToConsume = true;
-this.consuming = true;
-}
-
-if (!msgs.isEmpty()) {
-MessageExt messageExt = msgs.get(msgs.size() - 1);
-String property = messageExt.getProperty(MessageConst.PROPERTY_MAX_OFFSET);
-if (property != null) {
-long accTotal = Long.parseLong(property) - messageExt.getQueueOffset();
-if (accTotal > 0) {
-this.msgAccCnt = accTotal;
-}
-}
-}
-} finally {
-this.lockTreeMap.writeLock().unlock();
-}
-} catch (InterruptedException e) {
-log.error("putMessage exception", e);
-}
-
-return dispatchToConsume;
-}
-
-
-public long getMaxSpan() {
-try {
-this.lockTreeMap.readLock().lockInterruptibly();
-try {
-if (!this.msgTreeMap.isEmpty()) {
-return this.msgTreeMap.lastKey() - this.msgTreeMap.firstKey();
-}
-} finally {
-this.lockTreeMap.readLock().unlock();
-}
-} catch (InterruptedException e) {
-log.error("getMaxSpan exception", e);
-}
-
-return 0;
-}
-
-
-public long removeMessage(final List<MessageExt> msgs) { //treeMap是维护了没有消费的 为了处理过期使用
-long result = -1;
-final long now = System.currentTimeMillis();
-try {
-this.lockTreeMap.writeLock().lockInterruptibly();
-this.lastConsumeTimestamp = now;
-try {
-if (!msgTreeMap.isEmpty()) {
-result = this.queueOffsetMax + 1;
-int removedCnt = 0;
-for (MessageExt msg : msgs) {
-MessageExt prev = msgTreeMap.remove(msg.getQueueOffset());
-if (prev != null) {
-removedCnt--;
-}
-}
-msgCount.addAndGet(removedCnt);
-
-if (!msgTreeMap.isEmpty()) {
-result = msgTreeMap.firstKey();
-}
-}
-} finally {
-this.lockTreeMap.writeLock().unlock();
-}
-} catch (Throwable t) {
-log.error("removeMessage exception", t);
-}
-
-return result;
-}
-
-
-public TreeMap<Long, MessageExt> getMsgTreeMap() {
-return msgTreeMap;
-}
-
-
-public AtomicLong getMsgCount() {
-return msgCount;
-}
-
-
-public boolean isDropped() {
-return dropped;
-}
-
-
-public void setDropped(boolean dropped) {
-this.dropped = dropped;
-}
-
-public boolean isLocked() {
-return locked;
-}
-
-public void setLocked(boolean locked) {
-this.locked = locked;
-}
-
-public void rollback() {
-try {
-this.lockTreeMap.writeLock().lockInterruptibly();
-try {
-this.msgTreeMap.putAll(this.msgTreeMapTemp);
-this.msgTreeMapTemp.clear();
-} finally {
-this.lockTreeMap.writeLock().unlock();
-}
-} catch (InterruptedException e) {
-log.error("rollback exception", e);
-}
-}
-
-
-public long commit() {
-try {
-this.lockTreeMap.writeLock().lockInterruptibly();
-try {
-Long offset = this.msgTreeMapTemp.lastKey();
-msgCount.addAndGet(this.msgTreeMapTemp.size() * (-1));
-this.msgTreeMapTemp.clear();
-if (offset != null) {
-return offset + 1;
-}
-} finally {
-this.lockTreeMap.writeLock().unlock();
-}
-} catch (InterruptedException e) {
-log.error("commit exception", e);
-}
-
-return -1;
-}
-
-
-public void makeMessageToCosumeAgain(List<MessageExt> msgs) {
-try {
-this.lockTreeMap.writeLock().lockInterruptibly();
-try {
-for (MessageExt msg : msgs) {
-this.msgTreeMapTemp.remove(msg.getQueueOffset());
-this.msgTreeMap.put(msg.getQueueOffset(), msg);
-}
-} finally {
-this.lockTreeMap.writeLock().unlock();
-}
-} catch (InterruptedException e) {
-log.error("makeMessageToCosumeAgain exception", e);
-}
-}
-
-
-public List<MessageExt> takeMessags(final int batchSize) {
-List<MessageExt> result = new ArrayList<MessageExt>(batchSize);
-final long now = System.currentTimeMillis();
-try {
-this.lockTreeMap.writeLock().lockInterruptibly();
-this.lastConsumeTimestamp = now;
-try {
-if (!this.msgTreeMap.isEmpty()) {
-for (int i = 0; i < batchSize; i++) {
-Map.Entry<Long, MessageExt> entry = this.msgTreeMap.pollFirstEntry();
-if (entry != null) {
-result.add(entry.getValue());
-msgTreeMapTemp.put(entry.getKey(), entry.getValue());
-} else {
-break;
-}
-}
-}
-
-if (result.isEmpty()) {
-consuming = false;
-}
-} finally {
-this.lockTreeMap.writeLock().unlock();
-}
-} catch (InterruptedException e) {
-log.error("take Messages exception", e);
-}
-
-return result;
-}
-
-
-public boolean hasTempMessage() {
-try {
-this.lockTreeMap.readLock().lockInterruptibly();
-try {
-return !this.msgTreeMap.isEmpty();
-} finally {
-this.lockTreeMap.readLock().unlock();
-}
-} catch (InterruptedException e) {
-}
-
-return true;
-}
-
-
-public void clear() {
-try {
-this.lockTreeMap.writeLock().lockInterruptibly();
-try {
-this.msgTreeMap.clear();
-this.msgTreeMapTemp.clear();
-this.msgCount.set(0);
-this.queueOffsetMax = 0L;
-} finally {
-this.lockTreeMap.writeLock().unlock();
-}
-} catch (InterruptedException e) {
-log.error("rollback exception", e);
-}
-}
-
-
-
-
-public void setLastLockTimestamp(long lastLockTimestamp) {
-this.lastLockTimestamp = lastLockTimestamp;
-}
-
-
-public Lock getLockConsume() {
-return lockConsume;
-}
-
-
-
-
-public void setLastPullTimestamp(long lastPullTimestamp) {
-this.lastPullTimestamp = lastPullTimestamp;
-}
-
-
-public long getMsgAccCnt() {
-return msgAccCnt;
-}
-
-
-
-public long getTryUnlockTimes() {
-return this.tryUnlockTimes.get();
-}
-
-
-public void incTryUnlockTimes() {
-this.tryUnlockTimes.incrementAndGet();
-}
-
-
-public void fillProcessQueueInfo(final ProcessQueueInfo info) {
-try {
-this.lockTreeMap.readLock().lockInterruptibly();
-
-if (!this.msgTreeMap.isEmpty()) {
-info.setCachedMsgMinOffset(this.msgTreeMap.firstKey());
-info.setCachedMsgMaxOffset(this.msgTreeMap.lastKey());
-info.setCachedMsgCount(this.msgTreeMap.size());
-}
-
-if (!this.msgTreeMapTemp.isEmpty()) {
-info.setTransactionMsgMinOffset(this.msgTreeMapTemp.firstKey());
-info.setTransactionMsgMaxOffset(this.msgTreeMapTemp.lastKey());
-info.setTransactionMsgCount(this.msgTreeMapTemp.size());
-}
-
-info.setLocked(this.locked);
-info.setTryUnlockTimes(this.tryUnlockTimes.get());
-info.setLastLockTimestamp(this.lastLockTimestamp);
-
-info.setDroped(this.dropped);
-info.setLastPullTimestamp(this.lastPullTimestamp);
-info.setLastConsumeTimestamp(this.lastConsumeTimestamp);
-} catch (Exception e) {
-} finally {
-this.lockTreeMap.readLock().unlock();
-}
-}
-
-
-
-*/
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/28b98b09/rocketmq-go/model/process_queue_info.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/process_queue_info.go b/rocketmq-go/model/process_queue_info.go
index 6bd71bd..c221ef8 100644
--- a/rocketmq-go/model/process_queue_info.go
+++ b/rocketmq-go/model/process_queue_info.go
@@ -35,11 +35,3 @@ type ProcessQueueInfo struct {
LastPullTimestamp int64 `json:"lastPullTimestamp"`
LastConsumeTimestamp int64 `json:"lastConsumeTimestamp"`
}
-
-//func (self ProcessQueueInfo) BuildFromProcessQueue(processQueue ProcessQueue) (processQueueInfo ProcessQueueInfo) {
-// processQueueInfo = ProcessQueueInfo{}
-// //processQueueInfo.CommitOffset =
-// processQueueInfo.CachedMsgCount = processQueue.GetMsgCount()
-// processQueueInfo.CachedMsgCount
-// return
-//}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/28b98b09/rocketmq-go/model/response_code.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/response_code.go b/rocketmq-go/model/response_code.go
index ed40a6d..a42120e 100644
--- a/rocketmq-go/model/response_code.go
+++ b/rocketmq-go/model/response_code.go
@@ -28,11 +28,11 @@ const (
// transaction failed, because of add db failed
TransactionFailed = 4
// Broker flush disk timeout
- FlushDiskTimeout = 10
+ //FlushDiskTimeout = 10
// Broker slave unavailable, just for sync double write
SlaveNotAvailable = 11
// Broker write slave timeout, just for sync double write
- FlushSlaveTimeout = 12
+ //FlushSlaveTimeout = 12
// Broker illegal message
MessageIllegal = 13
// Broker, Namesrv not available,maybe service is closing or incorrect permission
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/28b98b09/rocketmq-go/model/send_result.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/send_result.go b/rocketmq-go/model/send_result.go
index 4d3b31f..857b6c4 100644
--- a/rocketmq-go/model/send_result.go
+++ b/rocketmq-go/model/send_result.go
@@ -18,22 +18,21 @@ package model
import (
"fmt"
- "github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/message"
)
type SendStatus int
const (
SendOK SendStatus = iota
- //FlushDiskTimeout
- //FlushSlaveTimeout
+ FlushDiskTimeout
+ FlushSlaveTimeout
SlaveNotAvaliable
)
type SendResult struct {
sendStatus SendStatus
msgID string
- messageQueue *message.MessageQueue
+ messageQueue MessageQueue
queueOffset int64
transactionID string
offsetMsgID string
@@ -41,7 +40,7 @@ type SendResult struct {
traceOn bool
}
-func NewSendResult(status SendStatus, msgID, offsetID string, queue *message.MessageQueue, queueOffset int64) *SendResult {
+func NewSendResult(status SendStatus, msgID, offsetID string, queue MessageQueue, queueOffset int64) *SendResult {
return &SendResult{
sendStatus: status,
msgID: msgID,
@@ -87,11 +86,11 @@ func (result *SendResult) SetSendStatus(status SendStatus) {
result.sendStatus = status
}
-func (result *SendResult) MessageQueue() *message.MessageQueue {
+func (result *SendResult) MessageQueue() MessageQueue {
return result.messageQueue
}
-func (result *SendResult) SetMessageQueue(queue *message.MessageQueue) {
+func (result *SendResult) SetMessageQueue(queue MessageQueue) {
result.messageQueue = queue
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/28b98b09/rocketmq-go/model/topic_publishInfo.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/topic_publishInfo.go b/rocketmq-go/model/topic_publishInfo.go
deleted file mode 100644
index b5f9e37..0000000
--- a/rocketmq-go/model/topic_publishInfo.go
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package model
-
-import (
-//"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/message"
-)
-
-//type TopicPublishInfo struct {
-// orderTopic bool
-// havaTopicRouteInfo bool
-// messageQueueList []*message.MessageQueue
-// topicRouteData *TopicRouteData
-//}
-//
-//func (info *TopicPublishInfo) SetOrderTopic(b bool) {
-// info.orderTopic = b
-//}
-//
-//func (info *TopicPublishInfo) Ok() bool {
-// return false
-//}
-//
-//func (info *TopicPublishInfo) MessageQueueList() []*message.MessageQueue {
-// return info.messageQueueList
-//}
-//
-//func (info *TopicPublishInfo) HaveTopicRouteInfo() bool {
-// return info.havaTopicRouteInfo
-//}
-//
-//func (info *TopicPublishInfo) SetHaveTopicRouteInfo(b bool) {
-// info.havaTopicRouteInfo = b
-//}
-//
-//func (info *TopicPublishInfo) TopicRouteData() *TopicRouteData {
-// return info.topicRouteData
-//}
-//
-//func (info *TopicPublishInfo) SetTopicRouteData(routeDate *TopicRouteData) {
-// info.topicRouteData = routeDate
-//}
-//
-//func (info *TopicPublishInfo) SelectOneMessageQueue() *message.MessageQueue {
-// return nil //TODO
-//}
-//
-//func (info *TopicPublishInfo) selectOneMessageQueueWithBroker(brokerName string) *message.MessageQueue {
-// if brokerName == "" {
-// return info.SelectOneMessageQueue()
-// }
-// return nil //TODO
-//}
-//
-//func (info *TopicPublishInfo) QueueIdByBroker(brokerName string) int {
-// return 0 //TODO
-//}
-//
-//func (info *TopicPublishInfo) String() string {
-// return ""
-//}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/28b98b09/rocketmq-go/model/topic_publish_info.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/topic_publish_info.go b/rocketmq-go/model/topic_publish_info.go
index 14ec088..26a541c 100644
--- a/rocketmq-go/model/topic_publish_info.go
+++ b/rocketmq-go/model/topic_publish_info.go
@@ -29,12 +29,6 @@ type TopicPublishInfo struct {
topicQueueIndex int32
}
-//private boolean orderTopic = false;
-//private boolean haveTopicRouterInfo = false;
-//private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();
-//private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex(0); // todo
-//private TopicRouteData topicRouteData;
-
func (self *TopicPublishInfo) JudgeTopicPublishInfoOk() (bIsTopicOk bool) {
bIsTopicOk = (len(self.MessageQueueList) > 0)
return
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/28b98b09/rocketmq-go/model/topic_route_data.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/topic_route_data.go b/rocketmq-go/model/topic_route_data.go
index 348479f..9c1ab27 100644
--- a/rocketmq-go/model/topic_route_data.go
+++ b/rocketmq-go/model/topic_route_data.go
@@ -18,114 +18,9 @@
package model
import (
- //"fmt"
- //"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/message"
"sync"
)
-//
-//type BrokerData struct {
-//}
-//
-//type TopicRouteData struct {
-// orderTopicConf string
-// queueDatas []*message.MessageQueue
-// brokerDatas []*BrokerData
-// filterServerTable map[string][]string
-//}
-//
-//func NewTopicRouteData() *TopicRouteData {
-// return &TopicRouteData{}
-//}
-//
-//func (route *TopicRouteData) CloneTopicRouteData() (clonedRouteData *TopicRouteData) {
-// clonedRouteData = &TopicRouteData{
-// route.orderTopicConf,
-// route.queueDatas,
-// route.brokerDatas,
-// route.filterServerTable,
-// }
-// // TODO: to complete
-// return
-//}
-//
-//func (route *TopicRouteData) QueueDatas() []*message.MessageQueue {
-// return route.queueDatas
-//}
-//
-//func (route *TopicRouteData) SetQueueDatas(data []*message.MessageQueue) {
-// route.queueDatas = data
-//}
-//
-//func (route *TopicRouteData) BrokerDatas() []*BrokerData {
-// return route.brokerDatas
-//}
-//
-//func (route *TopicRouteData) SetBrokerDatas(data []*BrokerData) {
-// route.brokerDatas = data
-//}
-//
-//func (route *TopicRouteData) FilterServerTable() map[string][]string {
-// return route.filterServerTable
-//}
-//
-//func (route *TopicRouteData) SetFilterServerTable(data map[string][]string) {
-// route.filterServerTable = data
-//}
-//
-//func (route *TopicRouteData) OrderTopicConf() string {
-// return route.orderTopicConf
-//}
-//
-//func (route *TopicRouteData) SetOrderTopicConf(s string) {
-// route.orderTopicConf = s
-//}
-//
-//func (route *TopicRouteData) HashCode() (result int) {
-// prime := 31
-// result = 1
-// result *= prime
-// // TODO
-//
-// return
-//}
-//
-//func (route *TopicRouteData) Equals(route1 interface{}) bool {
-// if route == nil {
-// return true
-// }
-// if route1 == nil {
-// return false
-// }
-// //value, ok := route1.(TopicRouteData)
-// //if !ok {
-// // return false
-// //}
-// // TODO
-// //if route.brokerDatas == nil && value.brokerDatas != nil || len(route.brokerDatas) != len(value.brokerDatas) {
-// // return false
-// //}
-// //
-// //if route.orderTopicConf == "" && value.orderTopicConf != "" || route.orderTopicConf != value.orderTopicConf {
-// // return false
-// //}
-// //
-// //if route.queueDatas == nil && value.queueDatas != nil || route.queueDatas != value.queueDatas {
-// // return false
-// //}
-// //
-// //if route.filterServerTable == nil && value.filterServerTable != nil ||
-// // route.filterServerTable != value.filterServerTable {
-// // return false
-// //}
-// return true
-//}
-//
-//func (route *TopicRouteData) String() string {
-// return fmt.Sprintf("TopicRouteData [orderTopicConf=%s, queueDatas=%s, brokerDatas=%s, filterServerTable=%s]",
-// route.orderTopicConf, route.queueDatas, route.brokerDatas, route.filterServerTable)
-//}
-
type TopicRouteData struct {
OrderTopicConf string
QueueDatas []*QueueData
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/28b98b09/rocketmq-go/mq_client_manage.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/mq_client_manage.go b/rocketmq-go/mq_client_manage.go
new file mode 100644
index 0000000..7903116
--- /dev/null
+++ b/rocketmq-go/mq_client_manage.go
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package rocketmq
+
+import (
+ "encoding/json"
+ "errors"
+ "fmt"
+ "github.com/apache/incubator-rocketmq-externals/rocketmq-go/model"
+ "github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/config"
+ "github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/constant"
+ "github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/header"
+ "github.com/apache/incubator-rocketmq-externals/rocketmq-go/remoting"
+ "github.com/apache/incubator-rocketmq-externals/rocketmq-go/service"
+ "github.com/apache/incubator-rocketmq-externals/rocketmq-go/util/structs"
+ "github.com/golang/glog"
+ "strings"
+ "sync"
+ "time"
+)
+
+// MqClientManager holds the state shared by every producer and consumer registered with it.
+//@see com.alibaba.rocketmq.client.impl.factory.MQClientInstance
+type MqClientManager struct {
+ rocketMqManagerLock sync.Mutex
+ //ClientId string
+ BootTimestamp int64
+
+ clientFactory *ClientFactory
+
+ NamesrvLock sync.Mutex
+ HeartBeatLock sync.Mutex
+ // Shared client: handed to every registered producer service and consumer.
+ mqClient service.RocketMqClient
+ // Fields of the Java MQClientInstance, kept for reference:
+ //private final ClientRemotingProcessor clientRemotingProcessor;
+ // private final PullMessageService pullMessageService;
+ //private final RebalanceService rebalanceService;
+ // private final ConsumerStatsManager consumerStatsManager;
+ // private final AtomicLong storeTimesTotal = new AtomicLong(0);
+ ServiceState int
+
+ // The controllers live here because they are built from the shared client factory.
+ pullMessageController *PullMessageController
+ cleanExpireMsgController *CleanExpireMsgController
+ rebalanceControllr *RebalanceController
+ // Created lazily by RegistConsumer and passed to each consumer's message service.
+ defaultProducerService *service.DefaultProducerService // used to send messages back
+}
+
+// MqClientManagerInit builds a MqClientManager from clientConfig. It stamps
+// the boot time, creates the producer/consumer registry, creates the shared
+// client (handing it this manager's request processor) and finally constructs
+// the pull-message, clean-expired-message and rebalance controllers.
+func MqClientManagerInit(clientConfig *config.ClientConfig) (rocketMqManager *MqClientManager) {
+	rocketMqManager = &MqClientManager{
+		BootTimestamp: time.Now().Unix(),
+		clientFactory: ClientFactoryInit(),
+	}
+
+	// The processor is a closure over the manager, so the manager has to
+	// exist before the client that receives the processor is created.
+	processor := rocketMqManager.InitClientRequestProcessor()
+	rocketMqManager.mqClient = service.MqClientInit(clientConfig, processor) // todo todo todo
+
+	client, factory := rocketMqManager.mqClient, rocketMqManager.clientFactory
+	rocketMqManager.pullMessageController = NewPullMessageController(client, factory)
+	rocketMqManager.cleanExpireMsgController = NewCleanExpireMsgController(client, factory)
+	rocketMqManager.rebalanceControllr = NewRebalanceController(factory)
+
+	return rocketMqManager
+}
+
+//CHECK_TRANSACTION_STATE
+//NOTIFY_CONSUMER_IDS_CHANGED
+//RESET_CONSUMER_CLIENT_OFFSET
+//GET_CONSUMER_STATUS_FROM_CLIENT
+//GET_CONSUMER_RUNNING_INFO
+//CONSUME_MESSAGE_DIRECTLY
+func (self *MqClientManager) InitClientRequestProcessor() (clientRequestProcessor remoting.ClientRequestProcessor) {
+ clientRequestProcessor = func(cmd *remoting.RemotingCommand) (response *remoting.RemotingCommand) {
+ switch cmd.Code {
+ case remoting.CHECK_TRANSACTION_STATE:
+ glog.V(2).Info("receive_request_code CHECK_TRANSACTION_STATE")
+ // todo this version don't impl this
+ break
+ case remoting.NOTIFY_CONSUMER_IDS_CHANGED:
+ glog.V(1).Info("receive_request_code NOTIFY_CONSUMER_IDS_CHANGED")
+ self.rebalanceControllr.doRebalance()
+ break
+ case remoting.RESET_CONSUMER_CLIENT_OFFSET: // struct json key supported
+ glog.V(2).Info("receive_request_code RESET_CONSUMER_CLIENT_OFFSET")
+ glog.V(2).Info("op=look cmd body", string(cmd.Body))
+ var resetOffsetRequestHeader = &header.ResetOffsetRequestHeader{}
+ if cmd.ExtFields != nil {
+ resetOffsetRequestHeader.FromMap(cmd.ExtFields) //change map[string]interface{} into CustomerHeader struct
+ glog.V(2).Info("op=look ResetOffsetRequestHeader", resetOffsetRequestHeader)
+ resetOffsetBody := &model.ResetOffsetBody{}
+ err := resetOffsetBody.Decode(cmd.Body)
+ if err != nil {
+ return
+ }
+ glog.V(2).Info("op=look resetOffsetBody xxxxx", resetOffsetBody)
+ self.resetConsumerOffset(resetOffsetRequestHeader.Topic, resetOffsetRequestHeader.Group, resetOffsetBody.OffsetTable)
+ }
+ break
+ case remoting.GET_CONSUMER_STATUS_FROM_CLIENT: // useless we can use GET_CONSUMER_RUNNING_INFO instead
+ glog.V(2).Info("receive_request_code GET_CONSUMER_STATUS_FROM_CLIENT")
+ break
+ case remoting.GET_CONSUMER_RUNNING_INFO:
+ glog.V(2).Info("receive_request_code GET_CONSUMER_RUNNING_INFO")
+ var getConsumerRunningInfoRequestHeader = &header.GetConsumerRunningInfoRequestHeader{}
+ if cmd.ExtFields != nil {
+ getConsumerRunningInfoRequestHeader.FromMap(cmd.ExtFields) //change map[string]interface{} into CustomerHeader struct
+ consumerRunningInfo := model.ConsumerRunningInfo{}
+ consumerRunningInfo.Properties = map[string]string{}
+ defaultMQPushConsumer := self.clientFactory.ConsumerTable[getConsumerRunningInfoRequestHeader.ConsumerGroup]
+ consumerConfigMap := structs.Map(defaultMQPushConsumer.ConsumerConfig) // todo test
+ for key, value := range consumerConfigMap {
+ consumerRunningInfo.Properties[key] = fmt.Sprintf("%v", value)
+ }
+
+ consumerRunningInfo.Properties["PROP_NAMESERVER_ADDR"] = strings.Join(defaultMQPushConsumer.mqClient.GetRemotingClient().GetNamesrvAddrList(), ";")
+ consumerRunningInfo.MqTable = defaultMQPushConsumer.rebalance.GetMqTableInfo()
+
+ glog.V(2).Info("op=look consumerRunningInfo", consumerRunningInfo)
+ jsonByte, err := consumerRunningInfo.Encode()
+ glog.V(2).Info("op=enCode jsonByte", string(jsonByte))
+ if err != nil {
+ glog.Error(err)
+ return
+ }
+ response = remoting.NewRemotingCommandWithBody(remoting.SUCCESS, nil, jsonByte)
+ }
+
+ break
+ case remoting.CONSUME_MESSAGE_DIRECTLY:
+ glog.V(2).Info("receive_request_code CONSUME_MESSAGE_DIRECTLY")
+ var consumeMessageDirectlyResultRequestHeader = &header.ConsumeMessageDirectlyResultRequestHeader{}
+ if cmd.ExtFields != nil {
+ consumeMessageDirectlyResultRequestHeader.FromMap(cmd.ExtFields)
+ messageExt := &DecodeMessage(cmd.Body)[0]
+ glog.V(2).Info("op=look", messageExt)
+ defaultMQPushConsumer := self.clientFactory.ConsumerTable[consumeMessageDirectlyResultRequestHeader.ConsumerGroup]
+ consumeResult, err := defaultMQPushConsumer.consumeMessageService.ConsumeMessageDirectly(messageExt, consumeMessageDirectlyResultRequestHeader.BrokerName)
+ if err != nil {
+ return
+ }
+ jsonByte, err := json.Marshal(consumeResult)
+ if err != nil {
+ glog.Error(err)
+ return
+ }
+ response = remoting.NewRemotingCommandWithBody(remoting.SUCCESS, nil, jsonByte)
+ }
+ default:
+ glog.Error("illeage requestCode ", cmd.Code)
+ }
+ return
+ }
+ return
+}
+// RegistProducer gives producer a new DefaultProducerService built on the
+// shared client and records the producer in the producer table under its
+// group name.
+func (self *MqClientManager) RegistProducer(producer *DefaultMQProducer) {
+	group := producer.producerGroup
+	producer.producerService = service.NewDefaultProducerService(group, producer.ProducerConfig, self.mqClient)
+	self.clientFactory.ProducerTable[group] = producer
+}
+
+// resetConsumerOffset forwards offsetTable to the consumer registered for
+// group. When no such consumer is registered it logs an error and does
+// nothing. The topic argument is currently unused.
+func (self *MqClientManager) resetConsumerOffset(topic, group string, offsetTable map[model.MessageQueue]int64) {
+	if consumer := self.clientFactory.ConsumerTable[group]; consumer != nil {
+		consumer.resetOffset(offsetTable)
+		return
+	}
+	glog.Error("resetConsumerOffset beacuse consumer not online,group=", group)
+}
+// RegistConsumer attaches consumer to this manager. It lazily creates the
+// inner producer service on first use, then gives the consumer the shared
+// client, a remote offset store and a rebalance instance, records it in the
+// consumer table under its group name and initialises its message service.
+func (self *MqClientManager) RegistConsumer(consumer *DefaultMQPushConsumer) {
+	if self.defaultProducerService == nil {
+		self.defaultProducerService = service.NewDefaultProducerService(constant.CLIENT_INNER_PRODUCER_GROUP, config.NewProducerConfig(), self.mqClient)
+	}
+	consumer.mqClient = self.mqClient
+	consumer.offsetStore = service.RemoteOffsetStoreInit(consumer.consumerGroup, self.mqClient)
+	self.clientFactory.ConsumerTable[consumer.consumerGroup] = consumer
+	consumer.rebalance = service.NewRebalance(consumer.consumerGroup, consumer.subscription, consumer.mqClient, consumer.offsetStore, consumer.ConsumerConfig)
+
+	// Diagnostic output goes through the verbose log rather than stdout, so
+	// registering a consumer no longer prints into the host application's output.
+	glog.V(2).Info("op=look consumeMessageService", consumer.consumeMessageService)
+
+	consumer.consumeMessageService.Init(consumer.consumerGroup, self.mqClient, consumer.offsetStore, self.defaultProducerService, consumer.ConsumerConfig)
+}
+
+// Start delegates to StartAllScheduledTask.
+func (self *MqClientManager) Start() {
+	// TODO: a heartbeat should be sent first; the call is still disabled:
+	// self.SendHeartbeatToAllBrokerWithLock()
+	self.StartAllScheduledTask()
+}
+
+// ShutDown is a placeholder; it currently does nothing.
+//
+// It takes a pointer receiver because MqClientManager contains sync.Mutex
+// fields, which must not be copied.
+func (self *MqClientManager) ShutDown() {
+}
+
+// ClientFactory is the registry of producers and consumers, keyed by group name.
+type ClientFactory struct {
+ ProducerTable map[string]*DefaultMQProducer // producer group name -> producer
+ ConsumerTable map[string]*DefaultMQPushConsumer // consumer group name -> consumer
+}
+
+// ClientFactoryInit returns a ClientFactory whose producer and consumer
+// tables are both allocated and empty.
+func ClientFactoryInit() (clientFactory *ClientFactory) {
+	return &ClientFactory{
+		ProducerTable: make(map[string]*DefaultMQProducer),
+		ConsumerTable: make(map[string]*DefaultMQPushConsumer),
+	}
+}
+
+// SendHeartbeatToAllBrokerWithLock builds the heartbeat payload for every
+// registered producer and consumer and hands it to the shared client.
+//
+// It returns an error, without sending anything, when no consumer is
+// registered.
+//
+// It takes a pointer receiver because MqClientManager contains sync.Mutex
+// fields, which must not be copied.
+//
+// NOTE(review): despite the name, HeartBeatLock is not taken here; confirm
+// whether callers are expected to hold it.
+// NOTE(review): a manager with producers only never sends a heartbeat;
+// confirm this is intended.
+func (self *MqClientManager) SendHeartbeatToAllBrokerWithLock() error {
+	heartbeatData := self.prepareHeartbeatData()
+	if len(heartbeatData.ConsumerDataSet) == 0 {
+		return errors.New("send heartbeat error")
+	}
+	self.mqClient.SendHeartbeatToAllBroker(heartbeatData)
+	return nil
+}
+
+// UpdateTopicRouteInfoFromNameServer asks the shared client to refresh the
+// route info of every topic subscribed to by a registered consumer, followed
+// by every topic in the client's publish list. A topic that appears more than
+// once is refreshed once per occurrence.
+//
+// It takes a pointer receiver because MqClientManager contains sync.Mutex
+// fields, which must not be copied.
+func (self *MqClientManager) UpdateTopicRouteInfoFromNameServer() {
+	var topicSet []string
+	for _, consumer := range self.clientFactory.ConsumerTable {
+		for topic := range consumer.subscription {
+			topicSet = append(topicSet, topic)
+		}
+	}
+	topicSet = append(topicSet, self.mqClient.GetPublishTopicList()...)
+	for _, topic := range topicSet {
+		self.mqClient.UpdateTopicRouteInfoFromNameServer(topic)
+	}
+}
+
+// prepareHeartbeatData assembles the heartbeat payload: the client id plus one
+// ConsumerData entry per registered consumer group and one ProducerData entry
+// per registered producer group.
+//
+// Both data sets are always non-nil, even when empty.
+//
+// It takes a pointer receiver because MqClientManager contains sync.Mutex
+// fields, which must not be copied.
+func (self *MqClientManager) prepareHeartbeatData() *model.HeartbeatData {
+	consumers := self.clientFactory.ConsumerTable
+	producers := self.clientFactory.ProducerTable
+
+	heartbeatData := new(model.HeartbeatData)
+	heartbeatData.ClientId = self.mqClient.GetClientId()
+	// The final sizes are known, so reserve the capacity up front.
+	heartbeatData.ConsumerDataSet = make([]*model.ConsumerData, 0, len(consumers))
+	heartbeatData.ProducerDataSet = make([]*model.ProducerData, 0, len(producers))
+
+	for group, consumer := range consumers {
+		consumerData := new(model.ConsumerData)
+		consumerData.GroupName = group
+		consumerData.ConsumeType = consumer.consumeType
+		consumerData.ConsumeFromWhere = consumer.ConsumerConfig.ConsumeFromWhere
+		consumerData.MessageModel = consumer.messageModel
+		consumerData.SubscriptionDataSet = consumer.Subscriptions()
+		consumerData.UnitMode = consumer.unitMode
+		heartbeatData.ConsumerDataSet = append(heartbeatData.ConsumerDataSet, consumerData)
+	}
+	for group := range producers {
+		producerData := new(model.ProducerData)
+		producerData.GroupName = group
+		heartbeatData.ProducerDataSet = append(heartbeatData.ProducerDataSet, producerData)
+	}
+	return heartbeatData
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/28b98b09/rocketmq-go/mq_client_manager.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/mq_client_manager.go b/rocketmq-go/mq_client_manager.go
deleted file mode 100644
index 731158f..0000000
--- a/rocketmq-go/mq_client_manager.go
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package rocketmq
-
-import (
- "github.com/apache/incubator-rocketmq-externals/rocketmq-go/service"
- "sync"
- "time"
-)
-
-type MqClientManager struct {
- clientFactory *ClientFactory
- rocketMqClient service.RocketMqClient
- pullMessageController *PullMessageController
- defaultProducerService RocketMQProducer //for send back message
-
- rocketMqManagerLock sync.Mutex
- //ClientId string
- BootTimestamp int64
-
- NamesrvLock sync.Mutex
- HeartBeatLock sync.Mutex
- //rebalanceControllr *RebalanceController
-}
-
-type MqClientConfig struct {
-}
-
-func NewMqClientManager(clientConfig *MqClientConfig) (rocketMqManager *MqClientManager) {
- rocketMqManager = &MqClientManager{}
- rocketMqManager.BootTimestamp = time.Now().Unix()
- rocketMqManager.clientFactory = clientFactoryInit()
- //rocketMqManager.rocketMqClient =
- //rocketMqManager.pullMessageController = NewPullMessageController(rocketMqManager.mqClient, rocketMqManager.clientFactory)
- //rocketMqManager.cleanExpireMsgController = NewCleanExpireMsgController(rocketMqManager.mqClient, rocketMqManager.clientFactory)
- //rocketMqManager.rebalanceControllr = NewRebalanceController(rocketMqManager.clientFactory)
-
- return
-}
-
-func (self *MqClientManager) RegisterProducer(producer *DefaultMQProducer) {
- return
-}
-
-func (self *MqClientManager) RegisterConsumer(consumer RocketMQConsumer) {
- // todo check config
- //if (self.defaultProducerService == nil) {
- // self.defaultProducerService = service.NewDefaultProducerService(constant.CLIENT_INNER_PRODUCER_GROUP, mq_config.NewProducerConfig(), self.mqClient)
- //}
- return
-}
-
-func (self *MqClientManager) Start() {
- //self.SendHeartbeatToAllBrokerWithLock()//we should send heartbeat first
- self.startAllScheduledTask()
-}
-func (manager *MqClientManager) startAllScheduledTask() {
-
-}
-
-func clientFactoryInit() (clientFactory *ClientFactory) {
- clientFactory = &ClientFactory{}
- clientFactory.ProducerTable = make(map[string]RocketMQProducer)
- clientFactory.ConsumerTable = make(map[string]RocketMQConsumer)
- return
-}
-
-type ClientFactory struct {
- ProducerTable map[string]RocketMQProducer //group|RocketMQProducer
- ConsumerTable map[string]RocketMQConsumer //group|Consumer
-}
-
-type PullMessageController struct {
- rocketMqClient service.RocketMqClient
- clientFactory *ClientFactory
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/28b98b09/rocketmq-go/mq_consumer.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/mq_consumer.go b/rocketmq-go/mq_consumer.go
deleted file mode 100644
index 7112537..0000000
--- a/rocketmq-go/mq_consumer.go
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package rocketmq
-
-import (
- "github.com/apache/incubator-rocketmq-externals/rocketmq-go/model"
- "github.com/apache/incubator-rocketmq-externals/rocketmq-go/service"
-)
-
-type RocketMQConsumer interface {
-}
-
-type MqConsumerConfig struct {
-}
-type DefaultMQPushConsumer struct {
- offsetStore service.OffsetStore //for consumer's offset
- mqClient service.RocketMqClient
- rebalance *service.Rebalance //Rebalance's impl depend on offsetStore
- consumeMessageService service.ConsumeMessageService
- consumerConfig *MqConsumerConfig
-
- consumerGroup string
- //consumeFromWhere string
- consumeType string
- messageModel string
- unitMode bool
-
- subscription map[string]string //topic|subExpression
- subscriptionTag map[string][]string // we use it filter again
- // 分配策略
- pause bool //when reset offset we need pause
-}
-
-func NewDefaultMQPushConsumer(consumerGroup string, mqConsumerConfig *MqConsumerConfig) (defaultMQPushConsumer *DefaultMQPushConsumer) {
- defaultMQPushConsumer = &DefaultMQPushConsumer{}
- defaultMQPushConsumer.consumerConfig = mqConsumerConfig
- return
-}
-
-func (self *DefaultMQPushConsumer) RegisterMessageListener(messageListener model.MessageListener) {
- self.consumeMessageService = service.NewConsumeMessageConcurrentlyServiceImpl(messageListener)
-}
-func (self *DefaultMQPushConsumer) Subscribe(topic string, subExpression string) {
- //self.subscription[topic] = subExpression
- //if len(subExpression) == 0 || subExpression == "*" {
- // return
- //}
- //tags := strings.Split(subExpression, "||")
- //tagsList := []string{}
- //for _, tag := range tags {
- // t := strings.TrimSpace(tag)
- // if len(t) == 0 {
- // continue
- // }
- // tagsList = append(tagsList, t)
- //}
- //if len(tagsList) > 0 {
- // self.subscriptionTag[topic] = tagsList
- //}
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/28b98b09/rocketmq-go/mq_producer.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/mq_producer.go b/rocketmq-go/mq_producer.go
index d1a011b..098377d 100644
--- a/rocketmq-go/mq_producer.go
+++ b/rocketmq-go/mq_producer.go
@@ -1,31 +1,40 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
package rocketmq
-import "github.com/apache/incubator-rocketmq-externals/rocketmq-go/service"
+import ()
+import (
+ "github.com/apache/incubator-rocketmq-externals/rocketmq-go/model"
+ "github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/config"
+ "github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/constant"
+ "github.com/apache/incubator-rocketmq-externals/rocketmq-go/service"
+)
type RocketMQProducer interface {
+ Send(message *model.Message) (sendResult *model.SendResult, err error)
+ SendWithTimeout(message *model.Message, timeout int) (sendResult *model.SendResult, err error)
+ //SendAsync(message *model.Message) (sendResult *model.SendResult,err error)
+ //SendAsyncWithTimeout(message *model.Message) (sendResult *model.SendResult,err error)
+ //SendOneWay(message *model.Message) (sendResult *model.SendResult,err error)
}
+// DefaultMQProducer sends messages by delegating to a service.ProducerService.
+type DefaultMQProducer struct {
+ // producerGroup is the group name passed to NewDefaultMQProducer.
+ producerGroup string
+ // ProducerConfig is initialised by NewDefaultMQProducer with
+ // config.NewProducerConfig(); Send reads its SendMsgTimeout.
+ ProducerConfig *config.RocketMqProducerConfig
-type MqProducerConfig struct {
+ // producerService performs the actual send. NewDefaultMQProducer does not
+ // set it. NOTE(review): presumably injected when the producer is
+ // registered with the client manager - confirm.
+ producerService service.ProducerService
}
-type DefaultMQProducer struct {
- producerGroup string
- mqProducerConfig *MqProducerConfig
- producerService service.ProducerService
+// NewDefaultMQProducer creates a producer for producerGroup with a fresh
+// configuration from config.NewProducerConfig(). The producer service is left
+// unset by this constructor.
+func NewDefaultMQProducer(producerGroup string) (rocketMQProducer *DefaultMQProducer) {
+	rocketMQProducer = new(DefaultMQProducer)
+	rocketMQProducer.producerGroup = producerGroup
+	rocketMQProducer.ProducerConfig = config.NewProducerConfig()
+	return rocketMQProducer
+}
+
+// Send hands message to the producer service in COMMUNICATIONMODE_SYNC mode,
+// using the timeout configured in ProducerConfig.SendMsgTimeout.
+func (self *DefaultMQProducer) Send(message *model.Message) (sendResult *model.SendResult, err error) {
+	timeout := self.ProducerConfig.SendMsgTimeout
+	return self.producerService.SendDefaultImpl(message, constant.COMMUNICATIONMODE_SYNC, "", timeout)
+}
+// SendWithTimeout hands message to the producer service in
+// COMMUNICATIONMODE_SYNC mode, forwarding the caller-supplied timeout
+// unchanged instead of the configured default.
+func (self *DefaultMQProducer) SendWithTimeout(message *model.Message, timeout int64) (sendResult *model.SendResult, err error) {
+	return self.producerService.SendDefaultImpl(message, constant.COMMUNICATIONMODE_SYNC, "", timeout)
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/28b98b09/rocketmq-go/mq_push_consumer.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/mq_push_consumer.go b/rocketmq-go/mq_push_consumer.go
new file mode 100644
index 0000000..245bbe4
--- /dev/null
+++ b/rocketmq-go/mq_push_consumer.go
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package rocketmq
+
+import (
+ "github.com/apache/incubator-rocketmq-externals/rocketmq-go/model"
+ "github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/config"
+ "github.com/apache/incubator-rocketmq-externals/rocketmq-go/service"
+ "github.com/golang/glog"
+ "strings"
+ "time"
+)
+
+// Consumer is the user-facing consumer API: register a listener that receives
+// messages and declare which topics to consume.
+type Consumer interface {
+ // RegisterMessageListener installs the listener used to consume messages.
+ RegisterMessageListener(listener model.MessageListener)
+ // Subscribe subscribes to topic with the given sub-expression.
+ Subscribe(topic string, subExpression string)
+}
+
+// DefaultMQPushConsumer holds the state of one consumer group: its
+// subscriptions, its collaborators (offset store, client, rebalance, consume
+// service) and its configuration.
+type DefaultMQPushConsumer struct {
+ // consumerGroup is the group name passed to NewDefaultMQPushConsumer.
+ consumerGroup string
+ //consumeFromWhere string
+ // consumeType and messageModel are set by NewDefaultMQPushConsumer to
+ // "CONSUME_PASSIVELY" and "CLUSTERING" respectively.
+ consumeType string
+ messageModel string
+ unitMode bool
+
+ // subscription and subscriptionTag are filled in by Subscribe.
+ subscription map[string]string //topic|subExpression
+ subscriptionTag map[string][]string // we use it filter again
+ // offsetStore, mqClient and rebalance are not set by the constructor.
+ // NOTE(review): presumably wired up when the consumer is registered with
+ // the client manager - confirm.
+ offsetStore service.OffsetStore
+ mqClient service.RocketMqClient
+ rebalance *service.Rebalance
+ pause bool //when reset offset we need pause
+ // consumeMessageService is created by RegisterMessageListener.
+ consumeMessageService service.ConsumeMessageService
+ // ConsumerConfig is initialised with config.NewRocketMqConsumerConfig().
+ ConsumerConfig *config.RocketMqConsumerConfig
+}
+
+// NewDefaultMQPushConsumer creates a push consumer for consumerGroup with
+// consume type "CONSUME_PASSIVELY", message model "CLUSTERING", empty
+// subscription tables and a default consumer configuration. The consumer
+// starts un-paused (pause keeps its zero value, false).
+func NewDefaultMQPushConsumer(consumerGroup string) (defaultMQPushConsumer *DefaultMQPushConsumer) {
+	//consumeFromWhere:"CONSUME_FROM_FIRST_OFFSET", //todo use config
+	return &DefaultMQPushConsumer{
+		consumerGroup:   consumerGroup,
+		consumeType:     "CONSUME_PASSIVELY",
+		messageModel:    "CLUSTERING",
+		subscription:    map[string]string{},
+		subscriptionTag: map[string][]string{},
+		ConsumerConfig:  config.NewRocketMqConsumerConfig(),
+	}
+}
+func (self *DefaultMQPushConsumer) Subscribe(topic string, subExpression string) {
+ self.subscription[topic] = subExpression
+ if len(subExpression) == 0 || subExpression == "*" {
+ return
+ }
+ tags := strings.Split(subExpression, "||")
+ tagsList := []string{}
+ for _, tag := range tags {
+ t := strings.TrimSpace(tag)
+ if len(t) == 0 {
+ continue
+ }
+ tagsList = append(tagsList, t)
+ }
+ if len(tagsList) > 0 {
+ self.subscriptionTag[topic] = tagsList
+ }
+}
+
+// RegisterMessageListener wraps messageListener in a concurrent consume
+// service and stores it as this consumer's consumeMessageService.
+func (self *DefaultMQPushConsumer) RegisterMessageListener(messageListener model.MessageListener) {
+	concurrentService := service.NewConsumeMessageConcurrentlyServiceImpl(messageListener)
+	self.consumeMessageService = concurrentService
+}
+
+// resetOffset pauses the consumer, clears the process queues named in
+// offsetTable and then, in a background goroutine, waits 10 seconds before
+// storing the new offsets and removing the affected process queues.
+//
+// When the goroutine finishes, the consumer is un-paused and a rebalance is
+// triggered. Entries with a negative offset, or whose message queue has no
+// process queue, are skipped.
+//
+// NOTE(review): pause is written from both the caller's goroutine and the
+// background goroutine without synchronisation visible in this block.
+func (self *DefaultMQPushConsumer) resetOffset(offsetTable map[model.MessageQueue]int64) {
+	self.pause = true
+	glog.Info("now we ClearProcessQueue 0 ", offsetTable)
+
+	self.rebalance.ClearProcessQueue(offsetTable)
+	glog.Info("now we ClearProcessQueue", offsetTable)
+	go func() {
+		time.Sleep(10 * time.Second)
+		defer func() {
+			self.pause = false
+			self.rebalance.DoRebalance()
+		}()
+
+		for messageQueue, offset := range offsetTable {
+			processQueue := self.rebalance.GetProcessQueue(messageQueue)
+			if processQueue == nil || offset < 0 {
+				continue
+			}
+			// Take a per-iteration copy before handing out its address:
+			// before Go 1.22 the range variable is shared by all iterations,
+			// so a callee that keeps the pointer would see later keys.
+			mq := messageQueue
+			glog.Info("now we UpdateOffset", messageQueue, offset)
+			self.offsetStore.UpdateOffset(&mq, offset, false)
+			self.rebalance.RemoveProcessQueue(&mq)
+		}
+	}()
+}
+
+// Subscriptions returns every value of rebalance.SubscriptionInner as a
+// slice. The result is non-nil even when there are no subscriptions.
+func (self *DefaultMQPushConsumer) Subscriptions() []*model.SubscriptionData {
+	inner := self.rebalance.SubscriptionInner
+	result := make([]*model.SubscriptionData, 0, len(inner))
+	for _, data := range inner {
+		result = append(result, data)
+	}
+	return result
+}
+
+func (self *DefaultMQPushConsumer) CleanExpireMsg() {
+ nowTime := int64(time.Now().UnixNano()) / 1000000 //will cause nowTime - consumeStartTime <0 ,but no matter
+ messageQueueList, processQueueList := self.rebalance.GetProcessQueueList()
+ for messageQueueIndex, processQueue := range processQueueList {
+ loop := processQueue.GetMsgCount()
+ if loop > 16 {
+ loop = 16
+ }
+ for i := 0; i < loop; i++ {
+ _, message := processQueue.GetMinMessageInTree()
+ if message == nil {
+ break
+ }
+ consumeStartTime := message.GetConsumeStartTime()
+ maxDiffTime := self.ConsumerConfig.ConsumeTimeout * 1000 * 60
+ //maxDiffTime := self.ConsumerConfig.ConsumeTimeout
+ glog.V(2).Info("look message.GetConsumeStartTime()", consumeStartTime)
+ glog.V(2).Infof("look diff %d %d", nowTime-consumeStartTime, maxDiffTime)
+ //if(nowTime - consumeStartTime <0){
+ // panic("nowTime - consumeStartTime <0")
+ //}
+ if nowTime-consumeStartTime < maxDiffTime {
+ break
+ }
+ glog.Info("look now we send expire message back", message.Topic, message.MsgId)
+ err := self.consumeMessageService.SendMessageBack(message, 3, messageQueueList[messageQueueIndex].BrokerName)
+ if err != nil {
+ glog.Error("op=send_expire_message_back_error", err)
+ continue
+ }
+ processQueue.DeleteExpireMsg(int(message.QueueOffset))
+ }
+ }
+ return
+}