You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2017/05/07 03:58:59 UTC
[3/3] incubator-rocketmq-externals git commit: Go-Client remoting and
RocketMqClient common method implement,
closes apache/incubator-rocketmq-externals#17
Go-Client remoting and RocketMqClient common method implement, closes apache/incubator-rocketmq-externals#17
Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/commit/aaa0758e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/tree/aaa0758e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/diff/aaa0758e
Branch: refs/heads/master
Commit: aaa0758e6cdf2d1020a900c236ab84ce4071f2ff
Parents: 7ceba1b
Author: StyleTang <st...@gmail.com>
Authored: Sun May 7 11:58:44 2017 +0800
Committer: yukon <yu...@apache.org>
Committed: Sun May 7 11:58:44 2017 +0800
----------------------------------------------------------------------
rocketmq-go/docs/checklist.md | 1 +
rocketmq-go/docs/package.puml | 20 +-
rocketmq-go/docs/roadmap.md | 48 +-
rocketmq-go/example/consumer_example.go | 35 ++
rocketmq-go/example/rocketmq_client_example.go | 40 ++
rocketmq-go/model/config/client_config.go | 3 +-
rocketmq-go/model/config/consumer_config.go | 79 +++
rocketmq-go/model/constant/config.go | 29 +
rocketmq-go/model/constant/message_const.go | 45 ++
rocketmq-go/model/constant/message_sys_flag.go | 26 +
rocketmq-go/model/constant/mix_all.go | 62 ++
rocketmq-go/model/constant/perm.go | 33 +
rocketmq-go/model/constant/pull_sys_flag.go | 24 +
.../model/consume_concurrently_result.go | 27 +
.../model/consume_message_directly_result.go | 31 +
rocketmq-go/model/consumer_running_info.go | 81 +++
...me_message_directly_result_request_header.go | 32 +
.../consumer_send_msg_back_request_header.go | 31 +
.../model/header/get_consumer_list_by_group.go | 33 +
.../get_consumer_running_info_request_header.go | 29 +
.../header/get_max_offset_request_header.go | 26 +
.../header/get_max_offset_response_header.go | 28 +
.../header/get_route_info_request_header.go | 25 +
.../query_consumer_offset_request_header.go | 27 +
.../model/header/reset_offset_request_header.go | 37 ++
.../header/search_offset_request_header.go | 33 +
.../update_consumer_offset_request_header.go | 34 +
rocketmq-go/model/heart_beat.go | 34 +
rocketmq-go/model/message.go | 134 ++++
rocketmq-go/model/message/message.go | 2 +-
rocketmq-go/model/message_ext.go | 71 +++
rocketmq-go/model/message_listener.go | 19 +
rocketmq-go/model/message_queue.go | 77 +++
rocketmq-go/model/process_queue.go | 614 +++++++++++++++++++
rocketmq-go/model/process_queue_info.go | 45 ++
rocketmq-go/model/pull_request.go | 24 +
rocketmq-go/model/reset_offset_body.go | 55 ++
rocketmq-go/model/response_code.go | 16 +
rocketmq-go/model/send_result.go | 2 +-
rocketmq-go/model/subscription_data.go | 26 +
rocketmq-go/model/topic_publishInfo.go | 108 ++--
rocketmq-go/model/topic_publish_info.go | 96 +++
rocketmq-go/model/topic_route_data.go | 216 ++++---
rocketmq-go/mq_client_manager.go | 56 +-
rocketmq-go/mq_consumer.go | 46 +-
rocketmq-go/mq_producer.go | 6 +-
rocketmq-go/remoting/custom_header.go | 1 +
rocketmq-go/remoting/event_executor.go | 144 -----
rocketmq-go/remoting/json_serializable.go | 42 ++
rocketmq-go/remoting/remoting_client.go | 534 ++++++++--------
rocketmq-go/remoting/remoting_command.go | 178 +-----
rocketmq-go/remoting/request_code.go | 111 ++++
rocketmq-go/remoting/request_processor.go | 26 +
rocketmq-go/remoting/response_code.go | 53 ++
rocketmq-go/remoting/response_future.go | 61 +-
rocketmq-go/remoting/rocketmq_serializable.go | 139 +++++
rocketmq-go/remoting/serializable.go | 80 +++
rocketmq-go/service/client_api.go | 39 +-
rocketmq-go/service/consume_message_service.go | 153 +++++
rocketmq-go/service/consume_messsage_service.go | 20 -
rocketmq-go/service/mq_client.go | 343 +++++++++++
rocketmq-go/util/concurrent_map.go | 278 +++++++++
rocketmq-go/util/ip.go | 73 +++
rocketmq-go/util/json_util.go | 157 +++++
rocketmq-go/util/message_client_id_generator.go | 110 ++++
rocketmq-go/util/string_util.go | 91 +++
rocketmq-go/util/structs/field.go | 141 +++++
rocketmq-go/util/structs/structs.go | 581 ++++++++++++++++++
rocketmq-go/util/structs/tags.go | 32 +
69 files changed, 5135 insertions(+), 818 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/docs/checklist.md
----------------------------------------------------------------------
diff --git a/rocketmq-go/docs/checklist.md b/rocketmq-go/docs/checklist.md
new file mode 100644
index 0000000..0412f86
--- /dev/null
+++ b/rocketmq-go/docs/checklist.md
@@ -0,0 +1 @@
+# Test Check List
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/docs/package.puml
----------------------------------------------------------------------
diff --git a/rocketmq-go/docs/package.puml b/rocketmq-go/docs/package.puml
index 9c78711..f34dd38 100644
--- a/rocketmq-go/docs/package.puml
+++ b/rocketmq-go/docs/package.puml
@@ -37,6 +37,18 @@ serviceState
clientIP
instanceName
}
+ class remoting.ClientRequestProcessor{
+ receive request and invoke the method.
+ GET_CONSUMER_STATUS_FROM_CLIENT
+ GET_CONSUMER_RUNNING_INFO
+ ...
+ }
+ class remoting.SerializerHandler{
+ JsonSerializer
+ RocketMqSerializer
+ }
+
+
namespace service{
@@ -61,8 +73,10 @@ namespace service{
}
namespace remoting {
- RemotingClient *-- RemotingCommand:contains
- RemotingClient *-- ClientConfig:contains
+ RemotingClient *-- RemotingCommand
+ RemotingClient *-- ClientConfig
+ RemotingClient *-- ClientRequestProcessor
+ RemotingClient *-- SerializerHandler
}
@@ -75,5 +89,5 @@ namespace rocketmq_go{
note top of remoting.RemotingClient :(sync|aysc|oneWay)
note top of remoting :net,serialize,connect,request response
-note top of service.MqClient :mq method
+note top of service.MqClient :mq common method
@enduml
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/docs/roadmap.md
----------------------------------------------------------------------
diff --git a/rocketmq-go/docs/roadmap.md b/rocketmq-go/docs/roadmap.md
index e6eabff..0db9033 100644
--- a/rocketmq-go/docs/roadmap.md
+++ b/rocketmq-go/docs/roadmap.md
@@ -1,19 +1,5 @@
# RoadMap-Milestone1
-## Producer
-- [ ] ProducerType
- - [ ] DefaultProducer
-- [ ] API
- - [ ] Send
- - [ ] Sync
-- [ ] Other
- - [ ] DelayMessage
- - [ ] Config
- - [ ] MessageId Generate
- - [ ] CompressMsg
- - [ ] TimeOut
- - [ ] LoadBalance
- - [ ] DefaultTopic
## Consumer
- [ ] ConsumerType
- [ ] PushConsumer
@@ -47,23 +33,23 @@
- [ ] UpdateTopicRouteInfoFromNameServer
- [ ] PersistAllConsumerOffset
- [ ] ClearExpiredMessage(form consumer consumeMessageService)
-
-
-## Remoting
-- [ ] MqClientRequest
- - [ ] InvokeSync
- - [ ] InvokeAsync
- - [ ] InvokeOneWay
- [ ] ClientRemotingProcessor
+ - [ ] CHECK_TRANSACTION_STATE
- [ ] NOTIFY_CONSUMER_IDS_CHANGED
- [ ] RESET_CONSUMER_CLIENT_OFFSET
- [ ] GET_CONSUMER_STATUS_FROM_CLIENT
- [ ] GET_CONSUMER_RUNNING_INFO
- [ ] CONSUME_MESSAGE_DIRECTLY
-- [ ] Serialize
- - [ ] JSON
- - [ ] ROCKETMQ
-- [ ] NamesrvAddrChoosed(HA)
+
+## Remoting
+- [x] MqClientRequest
+ - [x] InvokeSync
+ - [x] InvokeAsync
+ - [x] InvokeOneWay
+- [x] Serialize
+ - [x] JSON
+ - [x] ROCKETMQ
+- [x] NamesrvAddrChoosed(HA)
# RoadMap-ALL
@@ -143,13 +129,6 @@
- [ ] PersistAllConsumerOffset
- [ ] ClearExpiredMessage(form consumer consumeMessageService)
- [ ] UploadFilterClassSource(FromHeartBeat/But Golang Not Easy To do this(Java Source))
-
-
-## Remoting
-- [ ] MqClientRequest
- - [ ] InvokeSync
- - [ ] InvokeAsync
- - [ ] InvokeOneWay
- [ ] ClientRemotingProcessor
- [ ] CHECK_TRANSACTION_STATE
- [ ] NOTIFY_CONSUMER_IDS_CHANGED
@@ -157,6 +136,11 @@
- [ ] GET_CONSUMER_STATUS_FROM_CLIENT
- [ ] GET_CONSUMER_RUNNING_INFO
- [ ] CONSUME_MESSAGE_DIRECTLY
+## Remoting
+- [ ] MqClientRequest
+ - [ ] InvokeSync
+ - [ ] InvokeAsync
+ - [ ] InvokeOneWay
- [ ] Serialize
- [ ] JSON
- [ ] ROCKETMQ
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/example/consumer_example.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/example/consumer_example.go b/rocketmq-go/example/consumer_example.go
index 5c0044f..af74c01 100644
--- a/rocketmq-go/example/consumer_example.go
+++ b/rocketmq-go/example/consumer_example.go
@@ -16,5 +16,40 @@
*/
package main
+import (
+ "errors"
+ "github.com/apache/incubator-rocketmq-externals/rocketmq-go"
+ "github.com/apache/incubator-rocketmq-externals/rocketmq-go/model"
+ "github.com/golang/glog"
+)
+
func main() {
+
+ // create a mqClientManager instance
+ var mqClientConfig = &rocketmq.MqClientConfig{}
+ var mqClientManager = rocketmq.NewMqClientManager(mqClientConfig)
+
+ // create rocketMq consumer
+ var consumerConfig = &rocketmq.MqConsumerConfig{}
+ var consumer1 = rocketmq.NewDefaultMQPushConsumer("testGroup", consumerConfig)
+ consumer1.Subscribe("testTopic", "*")
+ consumer1.RegisterMessageListener(func(msgs []model.MessageExt) model.ConsumeConcurrentlyResult {
+ var index = -1
+ for i, msg := range msgs {
+ // your code here,for example,print msg
+ glog.Info(msg)
+ var err = errors.New("error")
+ if err != nil {
+ break
+ }
+ index = i
+ }
+ return model.ConsumeConcurrentlyResult{ConsumeConcurrentlyStatus: model.CONSUME_SUCCESS, AckIndex: index}
+ })
+
+ //register consumer to mqClientManager
+ mqClientManager.RegisterConsumer(consumer1)
+
+ //start it
+ mqClientManager.Start()
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/example/rocketmq_client_example.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/example/rocketmq_client_example.go b/rocketmq-go/example/rocketmq_client_example.go
new file mode 100644
index 0000000..c6828c8
--- /dev/null
+++ b/rocketmq-go/example/rocketmq_client_example.go
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package main
+
+import (
+ "fmt"
+ "github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/config"
+ "github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/constant"
+ "github.com/apache/incubator-rocketmq-externals/rocketmq-go/service"
+)
+
+func main() {
+
+ var clienConfig = config.NewClientConfig()
+ clienConfig.SetNameServerAddress("120.55.113.35:9876")
+
+ //use json serializer
+ var mqClient = service.MqClientInit(clienConfig, nil)
+ fmt.Println(mqClient.TryToFindTopicPublishInfo("GoLang"))
+ //&{false true [{GoLang broker-a 0} {GoLang broker-a 1} {GoLang broker-a 2} {GoLang broker-a 3}] 0xc420016800 0} <nil>
+
+ //use rocketmq serializer
+ constant.USE_HEADER_SERIALIZETYPE = constant.ROCKETMQ_SERIALIZE
+ var mqClient2 = service.MqClientInit(clienConfig, nil)
+ fmt.Println(mqClient2.TryToFindTopicPublishInfo("GoLang"))
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/model/config/client_config.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/config/client_config.go b/rocketmq-go/model/config/client_config.go
index d9ad88c..8a415d8 100644
--- a/rocketmq-go/model/config/client_config.go
+++ b/rocketmq-go/model/config/client_config.go
@@ -24,7 +24,8 @@ import (
// client common config
type ClientConfig struct {
- nameServerAddress string
+ nameServerAddress string // only this is in use
+
clientIP string
instanceName string
clientCallbackExecutorThreads int // TODO: clientCallbackExecutorThreads
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/model/config/consumer_config.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/config/consumer_config.go b/rocketmq-go/model/config/consumer_config.go
index a37eaa0..25f7585 100644
--- a/rocketmq-go/model/config/consumer_config.go
+++ b/rocketmq-go/model/config/consumer_config.go
@@ -16,5 +16,84 @@
*/
package config
+import "time"
+
type RocketMqConsumerConfig struct {
+ ConsumeFromWhere string
+ /**
+ * Minimum consumer thread number
+ */
+ //consumeThreadMin int
+ // /**
+ // * Max consumer thread number
+ // */
+ //consumeThreadMax int
+
+ /**
+ * Threshold for dynamic adjustment of the number of thread pool
+ */
+ //adjustThreadPoolNumsThreshold int // = 100000;
+
+ /**
+ * Concurrently max span offset.it has no effect on sequential consumption
+ */
+ ConsumeConcurrentlyMaxSpan int // = 2000;
+ /**
+ * Flow control threshold
+ */
+ PullThresholdForQueue int //= 1000;
+ /**
+ * Message pull Interval
+ */
+ PullInterval int64 //= 0;
+ /**
+ * Batch consumption size
+ */
+ ConsumeMessageBatchMaxSize int //= 1;
+ /**
+ * Batch pull size
+ */
+ PullBatchSize int //= 32;
+
+ /**
+ * Whether update subscription relationship when every pull
+ */
+ PostSubscriptionWhenPull bool //= false; //get subExpression
+
+ /**
+ * Whether the unit of subscription group
+ */
+ UnitMode bool // = false;
+ MaxReconsumeTimes int //= 16;
+ SuspendCurrentQueueTimeMillis int64 //= 1000;
+ ConsumeTimeout int64 //= 15 //minutes
+
+ //=========can not change
+ /**
+ * Delay some time when exception occur
+ */
+ PullTimeDelayMillsWhenException int64 //= 3000;
+ /**
+ * Flow control interval
+ */
+ PullTimeDelayMillsWhenFlowControl int64 //= 50;
+ /**
+ * Delay some time when suspend pull service
+ */
+ PullTimeDelayMillsWhenSuspend int64 //= 1000;
+ BrokerSuspendMaxTimeMillis int64 //1000 * 15;
+ ConsumerTimeoutMillisWhenSuspend int64 //= 1000 * 30;
+
+ /**
+ * Backtracking consumption time with second precision.time format is
+ * 20131223171201
+ * Implying Seventeen twelve and 01 seconds on December 23, 2013 year
+ * Default backtracking consumption time Half an hour ago
+ */
+ ConsumeTimestamp time.Time //when use CONSUME_FROM_TIMESTAMP
+}
+
+func NewRocketMqConsumerConfig() (consumerConfig *RocketMqConsumerConfig) {
+ consumerConfig = &RocketMqConsumerConfig{}
+ return
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/model/constant/config.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/constant/config.go b/rocketmq-go/model/constant/config.go
new file mode 100644
index 0000000..c48dfa5
--- /dev/null
+++ b/rocketmq-go/model/constant/config.go
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package constant
+
+//-------SerializeType-------
+var JSON_SERIALIZE byte = 0
+var ROCKETMQ_SERIALIZE byte = 1
+
+//-------SerializeType-------
+
+var USE_HEADER_SERIALIZETYPE = JSON_SERIALIZE
+
+var REMOTING_COMMAND_FLAG = 0
+var REMOTING_COMMAND_LANGUAGE = "OTHER"
+var REMOTING_COMMAND_VERSION int16 = 137
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/model/constant/message_const.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/constant/message_const.go b/rocketmq-go/model/constant/message_const.go
new file mode 100644
index 0000000..402d328
--- /dev/null
+++ b/rocketmq-go/model/constant/message_const.go
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package constant
+
+const (
+ PROPERTY_KEYS = "KEYS"
+ PROPERTY_TAGS = "TAGS"
+ PROPERTY_WAIT_STORE_MSG_OK = "WAIT"
+ PROPERTY_DELAY_TIME_LEVEL = "DELAY"
+ PROPERTY_RETRY_TOPIC = "RETRY_TOPIC"
+ PROPERTY_REAL_TOPIC = "REAL_TOPIC"
+ PROPERTY_REAL_QUEUE_ID = "REAL_QID"
+ PROPERTY_TRANSACTION_PREPARED = "TRAN_MSG"
+ PROPERTY_PRODUCER_GROUP = "PGROUP"
+ PROPERTY_MIN_OFFSET = "MIN_OFFSET"
+ PROPERTY_MAX_OFFSET = "MAX_OFFSET"
+ PROPERTY_BUYER_ID = "BUYER_ID"
+ PROPERTY_ORIGIN_MESSAGE_ID = "ORIGIN_MESSAGE_ID"
+ PROPERTY_TRANSFER_FLAG = "TRANSFER_FLAG"
+ PROPERTY_CORRECTION_FLAG = "CORRECTION_FLAG"
+ PROPERTY_MQ2_FLAG = "MQ2_FLAG"
+ PROPERTY_RECONSUME_TIME = "RECONSUME_TIME"
+ PROPERTY_MSG_REGION = "MSG_REGION"
+ PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX = "UNIQ_KEY"
+ PROPERTY_MAX_RECONSUME_TIMES = "MAX_RECONSUME_TIMES"
+ PROPERTY_CONSUME_START_TIMESTAMP = "CONSUME_START_TIME"
+
+ COMMUNICATIONMODE_SYNC = "SYNC"
+ COMMUNICATIONMODE_ASYNC = "ASYNC"
+ COMMUNICATIONMODE_ONEWAY = "ONEWAY"
+)
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/model/constant/message_sys_flag.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/constant/message_sys_flag.go b/rocketmq-go/model/constant/message_sys_flag.go
new file mode 100644
index 0000000..a53c4fd
--- /dev/null
+++ b/rocketmq-go/model/constant/message_sys_flag.go
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package constant
+
+const (
+ CompressedFlag int32 = (0x1 << 0)
+ MultiTagsFlag int32 = (0x1 << 1)
+ TransactionNotType int32 = (0x0 << 2)
+ TransactionPreparedType int32 = (0x1 << 2)
+ TransactionCommitType int32 = (0x2 << 2)
+ TransactionRollbackType int32 = (0x3 << 2)
+)
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/model/constant/mix_all.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/constant/mix_all.go b/rocketmq-go/model/constant/mix_all.go
new file mode 100644
index 0000000..6abaabe
--- /dev/null
+++ b/rocketmq-go/model/constant/mix_all.go
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package constant
+
+const (
+ ROCKETMQ_HOME_ENV = "ROCKETMQ_HOME"
+ ROCKETMQ_HOME_PROPERTY = "rocketmq.home.dir"
+ NAMESRV_ADDR_ENV = "NAMESRV_ADDR"
+ NAMESRV_ADDR_PROPERTY = "rocketmq.namesrv.addr"
+ MESSAGE_COMPRESS_LEVEL = "rocketmq.message.compressLevel"
+ //WS_DOMAIN_NAME = System.getProperty("rocketmq.namesrv.domain", "jmenv.tbsite.net")
+ //WS_DOMAIN_SUBGROUP = System.getProperty("rocketmq.namesrv.domain.subgroup", "nsaddr")
+ // http://jmenv.tbsite.net:8080/rocketmq/nsaddr
+ //WS_ADDR = "http://" + WS_DOMAIN_NAME + ":8080/rocketmq/" + WS_DOMAIN_SUBGROUP
+ DEFAULT_TOPIC = "TBW102"
+ BENCHMARK_TOPIC = "BenchmarkTest"
+ DEFAULT_PRODUCER_GROUP = "DEFAULT_PRODUCER"
+ DEFAULT_CONSUMER_GROUP = "DEFAULT_CONSUMER"
+ TOOLS_CONSUMER_GROUP = "TOOLS_CONSUMER"
+ FILTERSRV_CONSUMER_GROUP = "FILTERSRV_CONSUMER"
+ MONITOR_CONSUMER_GROUP = "__MONITOR_CONSUMER"
+ CLIENT_INNER_PRODUCER_GROUP = "CLIENT_INNER_PRODUCER"
+ SELF_TEST_PRODUCER_GROUP = "SELF_TEST_P_GROUP"
+ SELF_TEST_CONSUMER_GROUP = "SELF_TEST_C_GROUP"
+ SELF_TEST_TOPIC = "SELF_TEST_TOPIC"
+ OFFSET_MOVED_EVENT = "OFFSET_MOVED_EVENT"
+ ONS_HTTP_PROXY_GROUP = "CID_ONS-HTTP-PROXY"
+ CID_ONSAPI_PERMISSION_GROUP = "CID_ONSAPI_PERMISSION"
+ CID_ONSAPI_OWNER_GROUP = "CID_ONSAPI_OWNER"
+ CID_ONSAPI_PULL_GROUP = "CID_ONSAPI_PULL"
+ CID_RMQ_SYS_PREFIX = "CID_RMQ_SYS_"
+
+ //public static final List<String> LocalInetAddrs = getLocalInetAddress()
+ //Localhost = localhost()
+ //DEFAULT_CHARSET = "UTF-8"
+ MASTER_ID int64 = 0
+ CURRENT_JVM_PID
+
+ RETRY_GROUP_TOPIC_PREFIX = "%RETRY%"
+
+ DLQ_GROUP_TOPIC_PREFIX = "%DLQ%"
+ SYSTEM_TOPIC_PREFIX = "rmq_sys_"
+ UNIQUE_MSG_QUERY_FLAG = "_UNIQUE_KEY_QUERY"
+ MAX_MESSAGE_BODY_SIZE int = 4 * 1024 * 1024 //4m
+ MAX_MESSAGE_TOPIC_SIZE int = 255 //255char
+
+ DEFAULT_TOPIC_QUEUE_NUMS int32 = 4
+)
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/model/constant/perm.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/constant/perm.go b/rocketmq-go/model/constant/perm.go
new file mode 100644
index 0000000..962d989
--- /dev/null
+++ b/rocketmq-go/model/constant/perm.go
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package constant
+
+const (
+ PERM_PRIORITY = 0x1 << 3
+ PERM_READ = 0x1 << 2
+ PERM_WRITE = 0x1 << 1
+ PERM_INHERIT = 0x1 << 0
+)
+
+func WriteAble(perm int32) (ret bool) {
+ ret = ((perm & PERM_WRITE) == PERM_WRITE)
+ return
+}
+func ReadAble(perm int32) (ret bool) {
+ ret = ((perm & PERM_READ) == PERM_READ)
+ return
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/model/constant/pull_sys_flag.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/constant/pull_sys_flag.go b/rocketmq-go/model/constant/pull_sys_flag.go
new file mode 100644
index 0000000..0a2921c
--- /dev/null
+++ b/rocketmq-go/model/constant/pull_sys_flag.go
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package constant
+
+const (
+ FLAG_COMMIT_OFFSET int32 = 0x1 << 0
+ FLAG_SUSPEND int32 = 0x1 << 1
+ FLAG_SUBSCRIPTION int32 = 0x1 << 2
+ FLAG_CLASS_FILTER int32 = 0x1 << 3
+)
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/model/consume_concurrently_result.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/consume_concurrently_result.go b/rocketmq-go/model/consume_concurrently_result.go
new file mode 100644
index 0000000..6e4df7b
--- /dev/null
+++ b/rocketmq-go/model/consume_concurrently_result.go
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package model
+
+const (
+ CONSUME_SUCCESS = "CONSUME_SUCCESS"
+ RECONSUME_LATER = "RECONSUME_LATER"
+)
+
+type ConsumeConcurrentlyResult struct {
+ ConsumeConcurrentlyStatus string
+ AckIndex int
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/model/consume_message_directly_result.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/consume_message_directly_result.go b/rocketmq-go/model/consume_message_directly_result.go
new file mode 100644
index 0000000..a9af32e
--- /dev/null
+++ b/rocketmq-go/model/consume_message_directly_result.go
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package model
+
+type ConsumeMessageDirectlyResult struct {
+ Order bool `json:"order"`
+ AutoCommit bool `json:"autoCommit"`
+ //CR_SUCCESS,
+ //CR_LATER,
+ //CR_ROLLBACK,
+ //CR_COMMIT,
+ //CR_THROW_EXCEPTION,
+ //CR_RETURN_NULL,
+ ConsumeResult string `json:"consumeResult"`
+ Remark string `json:"remark"`
+ SpentTimeMills int64 `json:"spentTimeMills"`
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/model/consumer_running_info.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/consumer_running_info.go b/rocketmq-go/model/consumer_running_info.go
new file mode 100644
index 0000000..80c39ae
--- /dev/null
+++ b/rocketmq-go/model/consumer_running_info.go
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package model
+
+import "encoding/json"
+
+type ConsumerRunningInfo struct {
+ Properties map[string]string `json:"properties"`
+ MqTable map[MessageQueue]ProcessQueueInfo `json:"mqTable"`
+ // todo
+ //private TreeSet<SubscriptionData> subscriptionSet = new TreeSet<SubscriptionData>();
+ //
+ //private TreeMap<MessageQueue, ProcessQueueInfo> mqTable = new TreeMap<MessageQueue, ProcessQueueInfo>();
+ //
+ //private TreeMap<String/* Topic */, ConsumeStatus> statusTable = new TreeMap<String, ConsumeStatus>();
+ //
+ //private String jstack;
+}
+
+func (self *ConsumerRunningInfo) Encode() (jsonByte []byte, err error) {
+ mqTableJsonStr := "{"
+ first := true
+ var keyJson []byte
+ var valueJson []byte
+
+ for key, value := range self.MqTable {
+ keyJson, err = json.Marshal(key)
+ if err != nil {
+ return
+ }
+ valueJson, err = json.Marshal(value)
+ if err != nil {
+ return
+ }
+ if first == false {
+ mqTableJsonStr = mqTableJsonStr + ","
+ }
+ mqTableJsonStr = mqTableJsonStr + string(keyJson) + ":" + string(valueJson)
+ first = false
+ }
+ mqTableJsonStr = mqTableJsonStr + "}"
+ var propertiesJson []byte
+ propertiesJson, err = json.Marshal(self.Properties)
+ if err != nil {
+ return
+ }
+ jsonByte = self.formatEncode("\"properties\"", string(propertiesJson), "\"mqTable\"", string(mqTableJsonStr))
+ return
+}
+func (self *ConsumerRunningInfo) formatEncode(kVList ...string) []byte {
+ jsonStr := "{"
+ first := true
+ for i := 0; i+1 < len(kVList); i += 2 {
+ if first == false {
+ jsonStr = jsonStr + ","
+ }
+ keyJson := kVList[i]
+ valueJson := kVList[i+1]
+
+ jsonStr = jsonStr + string(keyJson) + ":" + string(valueJson)
+
+ first = false
+ }
+ jsonStr = jsonStr + "}"
+ return []byte(jsonStr)
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/model/header/consume_message_directly_result_request_header.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/header/consume_message_directly_result_request_header.go b/rocketmq-go/model/header/consume_message_directly_result_request_header.go
new file mode 100644
index 0000000..a593d51
--- /dev/null
+++ b/rocketmq-go/model/header/consume_message_directly_result_request_header.go
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package header
+
+type ConsumeMessageDirectlyResultRequestHeader struct {
+ ConsumerGroup string `json:"consumerGroup"`
+ ClientId string `json:"clientId"`
+ MsgId string `json:"msgId"`
+ BrokerName string `json:"brokerName"`
+}
+
+func (self *ConsumeMessageDirectlyResultRequestHeader) FromMap(headerMap map[string]interface{}) {
+ self.ConsumerGroup = headerMap["consumerGroup"].(string)
+ self.ClientId = headerMap["clientId"].(string)
+ self.MsgId = headerMap["msgId"].(string)
+ self.BrokerName = headerMap["brokerName"].(string)
+ return
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/model/header/consumer_send_msg_back_request_header.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/header/consumer_send_msg_back_request_header.go b/rocketmq-go/model/header/consumer_send_msg_back_request_header.go
new file mode 100644
index 0000000..4e101c6
--- /dev/null
+++ b/rocketmq-go/model/header/consumer_send_msg_back_request_header.go
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package header
+
+type ConsumerSendMsgBackRequestHeader struct {
+ Offset int64
+ Group string
+ DelayLevel int32
+ OriginMsgId string
+ OriginTopic string
+ UnitMode bool
+ MaxReconsumeTimes int32
+}
+
+func (self *ConsumerSendMsgBackRequestHeader) FromMap(headerMap map[string]interface{}) {
+ return
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/model/header/get_consumer_list_by_group.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/header/get_consumer_list_by_group.go b/rocketmq-go/model/header/get_consumer_list_by_group.go
new file mode 100644
index 0000000..e06e1fa
--- /dev/null
+++ b/rocketmq-go/model/header/get_consumer_list_by_group.go
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package header
+
+type GetConsumerListByGroupRequestHeader struct {
+ ConsumerGroup string `json:"consumerGroup"`
+}
+
+func (self *GetConsumerListByGroupRequestHeader) FromMap(headerMap map[string]interface{}) {
+ return
+}
+
+type GetConsumerListByGroupResponseBody struct {
+ ConsumerIdList []string
+}
+
+func (self *GetConsumerListByGroupResponseBody) FromMap(headerMap map[string]interface{}) {
+ return
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/model/header/get_consumer_running_info_request_header.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/header/get_consumer_running_info_request_header.go b/rocketmq-go/model/header/get_consumer_running_info_request_header.go
new file mode 100644
index 0000000..5e7487f
--- /dev/null
+++ b/rocketmq-go/model/header/get_consumer_running_info_request_header.go
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package header
+
+type GetConsumerRunningInfoRequestHeader struct {
+ ConsumerGroup string `json:"consumerGroup"`
+ ClientId string `json:"clientId"`
+ JstackEnable bool `json:"jstackEnable"`
+}
+
+func (self *GetConsumerRunningInfoRequestHeader) FromMap(headerMap map[string]interface{}) {
+ self.ConsumerGroup = headerMap["consumerGroup"].(string)
+ self.ClientId = headerMap["clientId"].(string)
+ return
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/model/header/get_max_offset_request_header.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/header/get_max_offset_request_header.go b/rocketmq-go/model/header/get_max_offset_request_header.go
new file mode 100644
index 0000000..6d4723e
--- /dev/null
+++ b/rocketmq-go/model/header/get_max_offset_request_header.go
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package header
+
+type GetMaxOffsetRequestHeader struct {
+ Topic string `json:"topic"`
+ QueueId int32 `json:"queueId"`
+}
+
+func (self *GetMaxOffsetRequestHeader) FromMap(headerMap map[string]interface{}) {
+ return
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/model/header/get_max_offset_response_header.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/header/get_max_offset_response_header.go b/rocketmq-go/model/header/get_max_offset_response_header.go
new file mode 100644
index 0000000..eea6c2c
--- /dev/null
+++ b/rocketmq-go/model/header/get_max_offset_response_header.go
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package header
+
+import "github.com/apache/incubator-rocketmq-externals/rocketmq-go/util"
+
+type QueryOffsetResponseHeader struct {
+ Offset int64 `json:"offset"`
+}
+
+func (self *QueryOffsetResponseHeader) FromMap(headerMap map[string]interface{}) {
+ self.Offset = util.StrToInt64WithDefaultValue(headerMap["offset"].(string), -1)
+ return
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/model/header/get_route_info_request_header.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/header/get_route_info_request_header.go b/rocketmq-go/model/header/get_route_info_request_header.go
new file mode 100644
index 0000000..7c33c25
--- /dev/null
+++ b/rocketmq-go/model/header/get_route_info_request_header.go
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package header
+
+type GetRouteInfoRequestHeader struct {
+ Topic string `json:"topic"`
+}
+
+func (self *GetRouteInfoRequestHeader) FromMap(headerMap map[string]interface{}) {
+ return
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/model/header/query_consumer_offset_request_header.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/header/query_consumer_offset_request_header.go b/rocketmq-go/model/header/query_consumer_offset_request_header.go
new file mode 100644
index 0000000..ed455e7
--- /dev/null
+++ b/rocketmq-go/model/header/query_consumer_offset_request_header.go
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package header
+
+type QueryConsumerOffsetRequestHeader struct {
+ ConsumerGroup string `json:"consumerGroup"`
+ Topic string `json:"topic"`
+ QueueId int32 `json:"queueId"`
+}
+
+func (self *QueryConsumerOffsetRequestHeader) FromMap(headerMap map[string]interface{}) {
+ return
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/model/header/reset_offset_request_header.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/header/reset_offset_request_header.go b/rocketmq-go/model/header/reset_offset_request_header.go
new file mode 100644
index 0000000..642b600
--- /dev/null
+++ b/rocketmq-go/model/header/reset_offset_request_header.go
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package header
+
+import (
+ "github.com/apache/incubator-rocketmq-externals/rocketmq-go/util"
+ "strconv"
+)
+
+type ResetOffsetRequestHeader struct {
+ Topic string `json:"topic"`
+ Group string `json:"group"`
+ Timestamp int64 `json:"timestamp"`
+ IsForce bool `json:"isForce"`
+}
+
+func (self *ResetOffsetRequestHeader) FromMap(headerMap map[string]interface{}) {
+ self.Group = headerMap["group"].(string)
+ self.Topic = headerMap["topic"].(string)
+ self.Timestamp = util.StrToInt64WithDefaultValue(headerMap["timestamp"].(string), -1)
+ self.IsForce, _ = strconv.ParseBool(headerMap["isForce"].(string))
+ return
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/model/header/search_offset_request_header.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/header/search_offset_request_header.go b/rocketmq-go/model/header/search_offset_request_header.go
new file mode 100644
index 0000000..5088eac
--- /dev/null
+++ b/rocketmq-go/model/header/search_offset_request_header.go
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package header
+
+import ()
+import "github.com/apache/incubator-rocketmq-externals/rocketmq-go/util"
+
+type SearchOffsetRequestHeader struct {
+ Topic string `json:"topic"`
+ QueueId int32 `json:"queueId"`
+ Timestamp int64 `json:"timestamp"`
+}
+
+func (self *SearchOffsetRequestHeader) FromMap(headerMap map[string]interface{}) {
+ self.Topic = headerMap["topic"].(string)
+ self.Topic = headerMap["queueId"].(string)
+ self.Timestamp = util.StrToInt64WithDefaultValue(headerMap["timestamp"].(string), -1)
+ return
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/model/header/update_consumer_offset_request_header.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/header/update_consumer_offset_request_header.go b/rocketmq-go/model/header/update_consumer_offset_request_header.go
new file mode 100644
index 0000000..42612db
--- /dev/null
+++ b/rocketmq-go/model/header/update_consumer_offset_request_header.go
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package header
+
+import "github.com/apache/incubator-rocketmq-externals/rocketmq-go/util"
+
+type UpdateConsumerOffsetRequestHeader struct {
+ ConsumerGroup string `json:"consumerGroup"`
+ Topic string `json:"topic"`
+ QueueId int32 `json:"queueId"`
+ CommitOffset int64 `json:"commitOffset"`
+}
+
+func (self *UpdateConsumerOffsetRequestHeader) FromMap(headerMap map[string]interface{}) {
+ self.ConsumerGroup = headerMap["consumerGroup"].(string)
+ self.QueueId = util.StrToInt32WithDefaultValue(util.ReadString(headerMap["queueId"]), 0)
+ self.CommitOffset = util.StrToInt64WithDefaultValue(headerMap["commitOffset"].(string), -1)
+ self.Topic = util.ReadString(headerMap["topic"])
+ return
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/model/heart_beat.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/heart_beat.go b/rocketmq-go/model/heart_beat.go
new file mode 100644
index 0000000..fc5eded
--- /dev/null
+++ b/rocketmq-go/model/heart_beat.go
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package model
+
+type ConsumerData struct {
+ GroupName string
+ ConsumeType string
+ MessageModel string
+ ConsumeFromWhere string
+ SubscriptionDataSet []*SubscriptionData
+ UnitMode bool
+}
+type ProducerData struct {
+ GroupName string
+}
+type HeartbeatData struct {
+ ClientId string
+ ConsumerDataSet []*ConsumerData
+ ProducerDataSet []*ProducerData
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/model/message.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/message.go b/rocketmq-go/model/message.go
new file mode 100644
index 0000000..0cb3d97
--- /dev/null
+++ b/rocketmq-go/model/message.go
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package model
+
+import (
+ "github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/constant"
+ "github.com/apache/incubator-rocketmq-externals/rocketmq-go/util"
+ "strconv"
+ "strings"
+)
+
+type Message struct {
+ Topic string
+ Flag int
+ Properties map[string]string
+ Body []byte
+}
+
+func (self *Message) SetTag(tag string) {
+ if self.Properties == nil {
+ self.Properties = make(map[string]string)
+ }
+ self.Properties[constant.PROPERTY_TAGS] = tag
+}
+func (self *Message) GetTag() (tag string) {
+ if self.Properties != nil {
+ tag = self.Properties[constant.PROPERTY_TAGS]
+ }
+ return
+}
+
+func (self *Message) SetKeys(keys []string) {
+ if self.Properties == nil {
+ self.Properties = make(map[string]string)
+ }
+ self.Properties[constant.PROPERTY_KEYS] = strings.Join(keys, KEY_SEPARATOR)
+}
+
+func (self *Message) SetDelayTimeLevel(delayTimeLevel int) {
+ if self.Properties == nil {
+ self.Properties = make(map[string]string)
+ }
+ self.Properties[constant.PROPERTY_DELAY_TIME_LEVEL] = util.IntToString(delayTimeLevel)
+}
+func (self *Message) SetWaitStoreMsgOK(waitStoreMsgOK bool) {
+ if self.Properties == nil {
+ self.Properties = make(map[string]string)
+ }
+ self.Properties[constant.PROPERTY_WAIT_STORE_MSG_OK] = strconv.FormatBool(waitStoreMsgOK)
+}
+func (self *Message) GeneratorMsgUniqueKey() {
+ if self.Properties == nil {
+ self.Properties = make(map[string]string)
+ }
+ if len(self.Properties[constant.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX]) > 0 {
+ return
+ }
+ self.Properties[constant.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX] = util.GeneratorMessageClientId()
+}
+
+func (self *MessageExt) GetMsgUniqueKey() string {
+ if self.Properties != nil {
+ originMessageId := self.Properties[constant.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX]
+ if len(originMessageId) > 0 {
+ return originMessageId
+ }
+ }
+ return self.MsgId
+}
+
+func (self *Message) SetOriginMessageId(messageId string) {
+ if self.Properties == nil {
+ self.Properties = make(map[string]string)
+ }
+ self.Properties[constant.PROPERTY_ORIGIN_MESSAGE_ID] = messageId
+}
+
+func (self *Message) SetRetryTopic(retryTopic string) {
+ if self.Properties == nil {
+ self.Properties = make(map[string]string)
+ }
+ self.Properties[constant.PROPERTY_RETRY_TOPIC] = retryTopic
+}
+func (self *Message) SetReconsumeTime(reConsumeTime int) {
+ if self.Properties == nil {
+ self.Properties = make(map[string]string)
+ }
+ self.Properties[constant.PROPERTY_RECONSUME_TIME] = util.IntToString(reConsumeTime)
+}
+
+func (self *Message) GetReconsumeTimes() (reConsumeTime int) {
+ reConsumeTime = 0
+ if self.Properties != nil {
+ reConsumeTimeStr := self.Properties[constant.PROPERTY_RECONSUME_TIME]
+ if len(reConsumeTimeStr) > 0 {
+ reConsumeTime = util.StrToIntWithDefaultValue(reConsumeTimeStr, 0)
+ }
+ }
+ return
+}
+
+func (self *Message) SetMaxReconsumeTimes(maxConsumeTime int) {
+ if self.Properties == nil {
+ self.Properties = make(map[string]string)
+ }
+ self.Properties[constant.PROPERTY_MAX_RECONSUME_TIMES] = util.IntToString(maxConsumeTime)
+}
+
+func (self *Message) GetMaxReconsumeTimes() (maxConsumeTime int) {
+ maxConsumeTime = 0
+ if self.Properties != nil {
+ reConsumeTimeStr := self.Properties[constant.PROPERTY_MAX_RECONSUME_TIMES]
+ if len(reConsumeTimeStr) > 0 {
+ maxConsumeTime = util.StrToIntWithDefaultValue(reConsumeTimeStr, 0)
+ }
+ }
+ return
+}
+
+var KEY_SEPARATOR string = " "
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/model/message/message.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/message/message.go b/rocketmq-go/model/message/message.go
index 633446c..1dcd365 100644
--- a/rocketmq-go/model/message/message.go
+++ b/rocketmq-go/model/message/message.go
@@ -255,7 +255,7 @@ func (msg *Message) removeProperty(k, v string) string {
delete(msg.properties, k)
return v
}
- return nil
+ return ""
}
func (msg *Message) String() string {
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/model/message_ext.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/message_ext.go b/rocketmq-go/model/message_ext.go
new file mode 100644
index 0000000..9a3aacb
--- /dev/null
+++ b/rocketmq-go/model/message_ext.go
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package model
+
+import (
+ "github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/constant"
+ "github.com/apache/incubator-rocketmq-externals/rocketmq-go/util"
+ "math"
+ "strconv"
+ "time"
+)
+
+type MessageExt struct {
+ *Message
+ QueueId int32
+ StoreSize int32
+ QueueOffset int64
+ SysFlag int32
+ BornTimestamp int64
+ BornHost string
+ StoreTimestamp int64
+ StoreHost string
+ MsgId string
+ CommitLogOffset int64
+ BodyCRC int32
+ ReconsumeTimes int32
+ PreparedTransactionOffset int64
+
+ propertyConsumeStartTimestamp string // race condition
+}
+
+func (self *MessageExt) GetOriginMessageId() string {
+ if self.Properties != nil {
+ originMessageId := self.Properties[constant.PROPERTY_ORIGIN_MESSAGE_ID]
+ if len(originMessageId) > 0 {
+ return originMessageId
+ }
+ }
+ return self.MsgId
+}
+
+func (self *MessageExt) GetConsumeStartTime() int64 {
+ if len(self.propertyConsumeStartTimestamp) > 0 {
+ return util.StrToInt64WithDefaultValue(self.propertyConsumeStartTimestamp, -1)
+ }
+ return math.MaxInt64
+}
+
+func (self *MessageExt) SetConsumeStartTime() {
+ if self.Properties == nil {
+ self.Properties = make(map[string]string)
+ }
+ nowTime := strconv.FormatInt(time.Now().UnixNano()/1000000, 10)
+ self.Properties[constant.PROPERTY_KEYS] = nowTime
+ self.propertyConsumeStartTimestamp = nowTime
+ return
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/model/message_listener.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/message_listener.go b/rocketmq-go/model/message_listener.go
new file mode 100644
index 0000000..7ad2054
--- /dev/null
+++ b/rocketmq-go/model/message_listener.go
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package model
+
+type MessageListener func(msgs []MessageExt) ConsumeConcurrentlyResult
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/model/message_queue.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/message_queue.go b/rocketmq-go/model/message_queue.go
new file mode 100644
index 0000000..27d70a6
--- /dev/null
+++ b/rocketmq-go/model/message_queue.go
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package model
+
+type MessageQueue struct {
+ Topic string `json:"topic"`
+ BrokerName string `json:"brokerName"`
+ QueueId int32 `json:"queueId"`
+}
+
+func (self *MessageQueue) clone() *MessageQueue {
+ no := new(MessageQueue)
+ no.Topic = self.Topic
+ no.QueueId = self.QueueId
+ no.BrokerName = self.BrokerName
+ return no
+}
+
+type MessageQueues []*MessageQueue
+
+func (self MessageQueues) Less(i, j int) bool {
+ imq := self[i]
+ jmq := self[j]
+
+ if imq.Topic < jmq.Topic {
+ return true
+ } else if imq.Topic < jmq.Topic {
+ return false
+ }
+
+ if imq.BrokerName < jmq.BrokerName {
+ return true
+ } else if imq.BrokerName < jmq.BrokerName {
+ return false
+ }
+
+ if imq.QueueId < jmq.QueueId {
+ return true
+ } else {
+ return false
+ }
+}
+
+func (self MessageQueues) Swap(i, j int) {
+ self[i], self[j] = self[j], self[i]
+}
+
+func (self MessageQueues) Len() int {
+ return len(self)
+}
+
+func (self MessageQueue) Equals(messageQueue *MessageQueue) bool {
+ if self.QueueId != messageQueue.QueueId {
+ return false
+ }
+ if self.Topic != messageQueue.Topic {
+ return false
+ }
+ if self.BrokerName != messageQueue.BrokerName {
+ return false
+ }
+ return true
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/model/process_queue.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/process_queue.go b/rocketmq-go/model/process_queue.go
new file mode 100644
index 0000000..285cbda
--- /dev/null
+++ b/rocketmq-go/model/process_queue.go
@@ -0,0 +1,614 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package model
+
+import (
+ "github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/constant"
+ "github.com/apache/incubator-rocketmq-externals/rocketmq-go/util"
+ "github.com/emirpasic/gods/maps/treemap"
+ "github.com/golang/glog"
+ "sync"
+ "time"
+)
+
+type ProcessQueue struct {
+ msgTreeMap *treemap.Map // int | MessageExt
+ msgCount int
+ lockTreeMap sync.RWMutex
+ locked bool
+ lastPullTimestamp time.Time
+ lastConsumeTimestamp time.Time
+ lastLockTimestamp time.Time
+ lockConsume sync.RWMutex
+ consuming bool
+ queueOffsetMax int64
+ dropped bool
+ msgAccCnt int64 //accumulation message count
+ tryUnlockTimes int64
+ msgTreeMapToBeConsume *treemap.Map
+}
+
+func NewProcessQueue() (processQueue *ProcessQueue) {
+ processQueue = new(ProcessQueue)
+ processQueue.dropped = false
+ processQueue.msgTreeMap = treemap.NewWithIntComparator()
+ processQueue.msgTreeMapToBeConsume = treemap.NewWithIntComparator()
+
+ return
+}
+func (self *ProcessQueue) GetMsgCount() int {
+ defer self.lockTreeMap.Unlock()
+ self.lockTreeMap.Lock()
+ return self.msgCount
+}
+
+func (self *ProcessQueue) Clear() {
+ defer self.lockTreeMap.Unlock()
+ self.lockTreeMap.Lock()
+ self.SetDrop(true)
+ self.msgTreeMap.Clear()
+ self.msgCount = 0
+ self.queueOffsetMax = 0
+
+}
+
+func (self *ProcessQueue) ChangeToProcessQueueInfo() (processQueueInfo ProcessQueueInfo) {
+ defer self.lockTreeMap.Unlock()
+ self.lockTreeMap.Lock()
+ processQueueInfo = ProcessQueueInfo{}
+ minOffset := -1
+ maxOffset := -1
+ minKey, _ := self.msgTreeMap.Min()
+ if minKey != nil {
+ minOffset = minKey.(int)
+ }
+ maxKey, _ := self.msgTreeMap.Max()
+ if maxKey != nil {
+ maxOffset = maxKey.(int)
+ }
+ processQueueInfo.CachedMsgCount = int32(self.msgCount)
+ processQueueInfo.CachedMsgMinOffset = int64(maxOffset)
+ processQueueInfo.CachedMsgMaxOffset = int64(minOffset)
+ //processQueueInfo.CommitOffset = -123 // todo
+ processQueueInfo.Droped = self.dropped
+ processQueueInfo.LastConsumeTimestamp = self.lastConsumeTimestamp.UnixNano()
+ processQueueInfo.LastPullTimestamp = self.lastPullTimestamp.UnixNano()
+ //processQueueInfo.
+
+ return
+}
+
+func (self *ProcessQueue) DeleteExpireMsg(queueOffset int) {
+ defer self.lockTreeMap.Unlock()
+ self.lockTreeMap.Lock()
+ key, _ := self.msgTreeMap.Min()
+ if key == nil {
+ return
+ }
+ offset := key.(int)
+ glog.Infof("look min key and offset %d %s", offset, queueOffset)
+ if queueOffset == offset {
+ self.msgTreeMap.Remove(queueOffset)
+ self.msgCount = self.msgTreeMap.Size()
+ }
+}
+
+func (self *ProcessQueue) GetMinMessageInTree() (offset int, messagePoint *MessageExt) {
+ defer self.lockTreeMap.Unlock()
+ self.lockTreeMap.Lock()
+ key, value := self.msgTreeMap.Min()
+ if key == nil || value == nil {
+ return
+ }
+ offset = key.(int)
+
+ message := value.(MessageExt)
+ messagePoint = &message
+ return
+}
+
+func (self *ProcessQueue) SetDrop(drop bool) {
+ self.dropped = drop
+}
+func (self *ProcessQueue) IsDropped() bool {
+ return self.dropped
+}
+func (self *ProcessQueue) GetMaxSpan() int {
+ defer self.lockTreeMap.Unlock()
+ self.lockTreeMap.Lock()
+ if self.msgTreeMap.Empty() {
+ return 0
+ }
+ minKey, _ := self.msgTreeMap.Min()
+ minOffset := minKey.(int)
+ maxKey, _ := self.msgTreeMap.Max()
+ maxOffset := maxKey.(int)
+ return maxOffset - minOffset
+}
+
+func (self *ProcessQueue) RemoveMessage(msgs []MessageExt) (offset int64) {
+ now := time.Now()
+ offset = -1
+ defer self.lockTreeMap.Unlock()
+ self.lockTreeMap.Lock()
+ self.lastConsumeTimestamp = now
+ if self.msgCount > 0 {
+ maxKey, _ := self.msgTreeMap.Max()
+ offset = int64(maxKey.(int)) + 1
+ for _, msg := range msgs {
+ self.msgTreeMap.Remove(int(msg.QueueOffset))
+ }
+ self.msgCount = self.msgTreeMap.Size()
+ if self.msgCount > 0 {
+ minKey, _ := self.msgTreeMap.Min()
+ offset = int64(minKey.(int))
+ }
+ }
+ return
+}
+
+func (self *ProcessQueue) PutMessage(msgs []MessageExt) (dispatchToConsume bool) {
+ dispatchToConsume = false
+ msgsLen := len(msgs)
+ if msgsLen == 0 {
+ return
+ }
+ defer self.lockTreeMap.Unlock()
+ self.lockTreeMap.Lock()
+
+ for _, msg := range msgs {
+ self.msgTreeMap.Put(int(msg.QueueOffset), msg)
+
+ }
+ self.msgCount = self.msgTreeMap.Size()
+ maxOffset, _ := self.msgTreeMap.Max()
+ self.queueOffsetMax = int64(maxOffset.(int))
+ if self.msgCount > 0 && !self.consuming {
+ dispatchToConsume = true
+ self.consuming = true
+ }
+ lastMsg := msgs[msgsLen-1]
+ remoteMaxOffset := util.StrToInt64WithDefaultValue(lastMsg.Properties[constant.PROPERTY_MAX_OFFSET], -1)
+ if remoteMaxOffset > 0 {
+ accTotal := remoteMaxOffset - lastMsg.QueueOffset
+ if accTotal > 0 {
+ self.msgAccCnt = accTotal
+ }
+ }
+ return
+}
+
+//func (self *ProcessQueue) TakeMessages(batchSize int) (messageToConsumeList []MessageExt) {
+// defer self.lockTreeMap.Unlock()
+// self.lockTreeMap.Lock()
+// self.lastConsumeTimestamp = time.Now()
+// it := self.msgTreeMap.Iterator()
+// nowIndex := 0
+// for it.Next() {
+// offset, message := it.Key(), it.Value()
+// if (nowIndex >= batchSize) {
+// break
+// }
+// self.msgTreeMap.Remove(offset)
+// self.msgTreeMapToBeConsume.Put(offset, message)
+// //messageToConsumeList = append(messageToConsumeList, message)
+// }
+// if (len(messageToConsumeList) == 0) {
+// self.consuming = false
+// }
+// return
+//}
+
+/**
+#
+public final static long RebalanceLockMaxLiveTime =Long.parseLong(System.getProperty("rocketmq.client.rebalance.lockMaxLiveTime", "30000"));
+public final static long RebalanceLockInterval = Long.parseLong(System.getProperty("rocketmq.client.rebalance.lockInterval", "20000"));
+#并发消费过期的
+ case CONSUME_PASSIVELY:
+ pq.setDropped(true);
+ if (this.removeUnnecessaryMessageQueue(mq, pq)) {
+ it.remove();
+ changed = true;
+ log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it",
+ consumerGroup, mq);
+ }
+ break;
+private final static long PullMaxIdleTime = Long.parseLong(System.getProperty("rocketmq.client.pull.pullMaxIdleTime", "120000"));
+private final Logger log = ClientLogger.getLog();
+private final ReadWriteLock lockTreeMap = new ReentrantReadWriteLock();
+
+private final TreeMap<Long, MessageExt> msgTreeMap = new TreeMap<Long, MessageExt>();
+private final AtomicLong msgCount = new AtomicLong();
+private final Lock lockConsume = new ReentrantLock();
+private final TreeMap<Long, MessageExt> msgTreeMapTemp = new TreeMap<Long, MessageExt>();
+private final AtomicLong tryUnlockTimes = new AtomicLong(0);
+private volatile long queueOffsetMax = 0L;
+private volatile boolean dropped = false;
+private volatile long lastPullTimestamp = System.currentTimeMillis();
+private volatile long lastConsumeTimestamp = System.currentTimeMillis();
+private volatile boolean locked = false;
+private volatile long lastLockTimestamp = System.currentTimeMillis();
+private volatile boolean consuming = false;
+private volatile long msgAccCnt = 0;
+
+ public boolean isLockExpired() {
+ boolean result = (System.currentTimeMillis() - this.lastLockTimestamp) > RebalanceLockMaxLiveTime;
+ return result;
+ }
+
+
+ public boolean isPullExpired() {
+ boolean result = (System.currentTimeMillis() - this.lastPullTimestamp) > PullMaxIdleTime;
+ return result;
+ }
+
+param pushConsumer
+cleanExpiredMsg(DefaultMQPushConsumer pushConsumer) {
+if (pushConsumer.getDefaultMQPushConsumerImpl().isConsumeOrderly()) {
+return;
+}
+
+int loop = msgTreeMap.size() < 16 ? msgTreeMap.size() : 16;
+for (int i = 0; i < loop; i++) {
+MessageExt msg = null;
+try {
+this.lockTreeMap.readLock().lockInterruptibly();
+try {
+if (!msgTreeMap.isEmpty() && System.currentTimeMillis() - Long.parseLong(MessageAccessor.getConsumeStartTimeStamp(msgTreeMap.firstEntry().getValue())) > pushConsumer.getConsumeTimeout() * 60 * 1000) {
+msg = msgTreeMap.firstEntry().getValue();
+} else {
+
+break;
+}
+} finally {
+this.lockTreeMap.readLock().unlock();
+}
+} catch (InterruptedException e) {
+log.error("getExpiredMsg exception", e);
+}
+
+try {
+
+pushConsumer.sendMessageBack(msg, 3);
+log.info("send expire msg back. topic={}, msgId={}, storeHost={}, queueId={}, queueOffset={}", msg.getTopic(), msg.getMsgId(), msg.getStoreHost(), msg.getQueueId(), msg.getQueueOffset());
+try {
+this.lockTreeMap.writeLock().lockInterruptibly();
+try {
+if (!msgTreeMap.isEmpty() && msg.getQueueOffset() == msgTreeMap.firstKey()) {
+try {
+msgTreeMap.remove(msgTreeMap.firstKey());
+} catch (Exception e) {
+log.error("send expired msg exception", e);
+}
+}
+} finally {
+this.lockTreeMap.writeLock().unlock();
+}
+} catch (InterruptedException e) {
+log.error("getExpiredMsg exception", e);
+}
+} catch (Exception e) {
+log.error("send expired msg exception", e);
+}
+}
+}
+
+
+public boolean putMessage(final List<MessageExt> msgs) {
+boolean dispatchToConsume = false;
+try {
+this.lockTreeMap.writeLock().lockInterruptibly();
+try {
+int validMsgCnt = 0;
+for (MessageExt msg : msgs) {
+MessageExt old = msgTreeMap.put(msg.getQueueOffset(), msg);
+if (null == old) {
+validMsgCnt++;
+this.queueOffsetMax = msg.getQueueOffset();
+}
+}
+msgCount.addAndGet(validMsgCnt);
+
+if (!msgTreeMap.isEmpty() && !this.consuming) {
+dispatchToConsume = true;
+this.consuming = true;
+}
+
+if (!msgs.isEmpty()) {
+MessageExt messageExt = msgs.get(msgs.size() - 1);
+String property = messageExt.getProperty(MessageConst.PROPERTY_MAX_OFFSET);
+if (property != null) {
+long accTotal = Long.parseLong(property) - messageExt.getQueueOffset();
+if (accTotal > 0) {
+this.msgAccCnt = accTotal;
+}
+}
+}
+} finally {
+this.lockTreeMap.writeLock().unlock();
+}
+} catch (InterruptedException e) {
+log.error("putMessage exception", e);
+}
+
+return dispatchToConsume;
+}
+
+
+public long getMaxSpan() {
+try {
+this.lockTreeMap.readLock().lockInterruptibly();
+try {
+if (!this.msgTreeMap.isEmpty()) {
+return this.msgTreeMap.lastKey() - this.msgTreeMap.firstKey();
+}
+} finally {
+this.lockTreeMap.readLock().unlock();
+}
+} catch (InterruptedException e) {
+log.error("getMaxSpan exception", e);
+}
+
+return 0;
+}
+
+
+public long removeMessage(final List<MessageExt> msgs) { //treeMap是维护了没有消费的 为了处理过期使用
+long result = -1;
+final long now = System.currentTimeMillis();
+try {
+this.lockTreeMap.writeLock().lockInterruptibly();
+this.lastConsumeTimestamp = now;
+try {
+if (!msgTreeMap.isEmpty()) {
+result = this.queueOffsetMax + 1;
+int removedCnt = 0;
+for (MessageExt msg : msgs) {
+MessageExt prev = msgTreeMap.remove(msg.getQueueOffset());
+if (prev != null) {
+removedCnt--;
+}
+}
+msgCount.addAndGet(removedCnt);
+
+if (!msgTreeMap.isEmpty()) {
+result = msgTreeMap.firstKey();
+}
+}
+} finally {
+this.lockTreeMap.writeLock().unlock();
+}
+} catch (Throwable t) {
+log.error("removeMessage exception", t);
+}
+
+return result;
+}
+
+
+public TreeMap<Long, MessageExt> getMsgTreeMap() {
+return msgTreeMap;
+}
+
+
+public AtomicLong getMsgCount() {
+return msgCount;
+}
+
+
+public boolean isDropped() {
+return dropped;
+}
+
+
+public void setDropped(boolean dropped) {
+this.dropped = dropped;
+}
+
+public boolean isLocked() {
+return locked;
+}
+
+public void setLocked(boolean locked) {
+this.locked = locked;
+}
+
+public void rollback() {
+try {
+this.lockTreeMap.writeLock().lockInterruptibly();
+try {
+this.msgTreeMap.putAll(this.msgTreeMapTemp);
+this.msgTreeMapTemp.clear();
+} finally {
+this.lockTreeMap.writeLock().unlock();
+}
+} catch (InterruptedException e) {
+log.error("rollback exception", e);
+}
+}
+
+
+public long commit() {
+try {
+this.lockTreeMap.writeLock().lockInterruptibly();
+try {
+Long offset = this.msgTreeMapTemp.lastKey();
+msgCount.addAndGet(this.msgTreeMapTemp.size() * (-1));
+this.msgTreeMapTemp.clear();
+if (offset != null) {
+return offset + 1;
+}
+} finally {
+this.lockTreeMap.writeLock().unlock();
+}
+} catch (InterruptedException e) {
+log.error("commit exception", e);
+}
+
+return -1;
+}
+
+
+public void makeMessageToCosumeAgain(List<MessageExt> msgs) {
+try {
+this.lockTreeMap.writeLock().lockInterruptibly();
+try {
+for (MessageExt msg : msgs) {
+this.msgTreeMapTemp.remove(msg.getQueueOffset());
+this.msgTreeMap.put(msg.getQueueOffset(), msg);
+}
+} finally {
+this.lockTreeMap.writeLock().unlock();
+}
+} catch (InterruptedException e) {
+log.error("makeMessageToCosumeAgain exception", e);
+}
+}
+
+
+public List<MessageExt> takeMessags(final int batchSize) {
+List<MessageExt> result = new ArrayList<MessageExt>(batchSize);
+final long now = System.currentTimeMillis();
+try {
+this.lockTreeMap.writeLock().lockInterruptibly();
+this.lastConsumeTimestamp = now;
+try {
+if (!this.msgTreeMap.isEmpty()) {
+for (int i = 0; i < batchSize; i++) {
+Map.Entry<Long, MessageExt> entry = this.msgTreeMap.pollFirstEntry();
+if (entry != null) {
+result.add(entry.getValue());
+msgTreeMapTemp.put(entry.getKey(), entry.getValue());
+} else {
+break;
+}
+}
+}
+
+if (result.isEmpty()) {
+consuming = false;
+}
+} finally {
+this.lockTreeMap.writeLock().unlock();
+}
+} catch (InterruptedException e) {
+log.error("take Messages exception", e);
+}
+
+return result;
+}
+
+
+public boolean hasTempMessage() {
+try {
+this.lockTreeMap.readLock().lockInterruptibly();
+try {
+return !this.msgTreeMap.isEmpty();
+} finally {
+this.lockTreeMap.readLock().unlock();
+}
+} catch (InterruptedException e) {
+}
+
+return true;
+}
+
+
+public void clear() {
+try {
+this.lockTreeMap.writeLock().lockInterruptibly();
+try {
+this.msgTreeMap.clear();
+this.msgTreeMapTemp.clear();
+this.msgCount.set(0);
+this.queueOffsetMax = 0L;
+} finally {
+this.lockTreeMap.writeLock().unlock();
+}
+} catch (InterruptedException e) {
+log.error("rollback exception", e);
+}
+}
+
+
+
+
+public void setLastLockTimestamp(long lastLockTimestamp) {
+this.lastLockTimestamp = lastLockTimestamp;
+}
+
+
+public Lock getLockConsume() {
+return lockConsume;
+}
+
+
+
+
+public void setLastPullTimestamp(long lastPullTimestamp) {
+this.lastPullTimestamp = lastPullTimestamp;
+}
+
+
+public long getMsgAccCnt() {
+return msgAccCnt;
+}
+
+
+
+public long getTryUnlockTimes() {
+return this.tryUnlockTimes.get();
+}
+
+
+public void incTryUnlockTimes() {
+this.tryUnlockTimes.incrementAndGet();
+}
+
+
+public void fillProcessQueueInfo(final ProcessQueueInfo info) {
+try {
+this.lockTreeMap.readLock().lockInterruptibly();
+
+if (!this.msgTreeMap.isEmpty()) {
+info.setCachedMsgMinOffset(this.msgTreeMap.firstKey());
+info.setCachedMsgMaxOffset(this.msgTreeMap.lastKey());
+info.setCachedMsgCount(this.msgTreeMap.size());
+}
+
+if (!this.msgTreeMapTemp.isEmpty()) {
+info.setTransactionMsgMinOffset(this.msgTreeMapTemp.firstKey());
+info.setTransactionMsgMaxOffset(this.msgTreeMapTemp.lastKey());
+info.setTransactionMsgCount(this.msgTreeMapTemp.size());
+}
+
+info.setLocked(this.locked);
+info.setTryUnlockTimes(this.tryUnlockTimes.get());
+info.setLastLockTimestamp(this.lastLockTimestamp);
+
+info.setDroped(this.dropped);
+info.setLastPullTimestamp(this.lastPullTimestamp);
+info.setLastConsumeTimestamp(this.lastConsumeTimestamp);
+} catch (Exception e) {
+} finally {
+this.lockTreeMap.readLock().unlock();
+}
+}
+
+
+
+*/
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/model/process_queue_info.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/process_queue_info.go b/rocketmq-go/model/process_queue_info.go
new file mode 100644
index 0000000..6bd71bd
--- /dev/null
+++ b/rocketmq-go/model/process_queue_info.go
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package model
+
+type ProcessQueueInfo struct {
+ CommitOffset int64 `json:"commitOffset"`
+
+ CachedMsgMinOffset int64 `json:"cachedMsgMinOffset"`
+ CachedMsgMaxOffset int64 `json:"cachedMsgMaxOffset"`
+ CachedMsgCount int32 `json:"cachedMsgCount"`
+
+ TransactionMsgMinOffset int64 `json:"transactionMsgMinOffset"`
+ TransactionMsgMaxOffset int64 `json:"transactionMsgMaxOffset"`
+ TransactionMsgCount int32 `json:"transactionMsgCount"`
+
+ Locked bool `json:"locked"`
+ TryUnlockTimes int64 `json:"tryUnlockTimes"`
+ LastLockTimestamp int64 `json:"lastLockTimestamp"`
+
+ Droped bool `json:"droped"`
+ LastPullTimestamp int64 `json:"lastPullTimestamp"`
+ LastConsumeTimestamp int64 `json:"lastConsumeTimestamp"`
+}
+
+//func (self ProcessQueueInfo) BuildFromProcessQueue(processQueue ProcessQueue) (processQueueInfo ProcessQueueInfo) {
+// processQueueInfo = ProcessQueueInfo{}
+// //processQueueInfo.CommitOffset =
+// processQueueInfo.CachedMsgCount = processQueue.GetMsgCount()
+// processQueueInfo.CachedMsgCount
+// return
+//}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/model/pull_request.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/pull_request.go b/rocketmq-go/model/pull_request.go
new file mode 100644
index 0000000..bc1a46f
--- /dev/null
+++ b/rocketmq-go/model/pull_request.go
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package model
+
+type PullRequest struct {
+ ConsumerGroup string
+ MessageQueue *MessageQueue
+ ProcessQueue *ProcessQueue
+ NextOffset int64
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/model/reset_offset_body.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/reset_offset_body.go b/rocketmq-go/model/reset_offset_body.go
new file mode 100644
index 0000000..1a0221d
--- /dev/null
+++ b/rocketmq-go/model/reset_offset_body.go
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package model
+
+import (
+ "encoding/json"
+ "github.com/apache/incubator-rocketmq-externals/rocketmq-go/util"
+ "github.com/golang/glog"
+)
+
+type ResetOffsetBody struct {
+ OffsetTable map[MessageQueue]int64 `json:"offsetTable"`
+}
+
+func (self *ResetOffsetBody) Decode(data []byte) (err error) {
+ self.OffsetTable = map[MessageQueue]int64{}
+ var kvMap map[string]string
+ kvMap, err = util.GetKvStringMap(string(data))
+ if err != nil {
+ return
+ }
+ glog.Info(kvMap)
+ kvMap, err = util.GetKvStringMap(kvMap["\"offsetTable\""])
+ if err != nil {
+ return
+ }
+ for k, v := range kvMap {
+ messageQueue := &MessageQueue{}
+ var offset int64
+ err = json.Unmarshal([]byte(k), messageQueue)
+ if err != nil {
+ return
+ }
+ offset, err = util.StrToInt64(v)
+ if err != nil {
+ return
+ }
+ self.OffsetTable[*messageQueue] = offset
+ }
+ return
+}