You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2017/05/07 03:58:59 UTC

[3/3] incubator-rocketmq-externals git commit: Go-Client remoting and RocketMqClient common method implement, closes apache/incubator-rocketmq-externals#17

Go-Client remoting and RocketMqClient common method implement, closes apache/incubator-rocketmq-externals#17


Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/commit/aaa0758e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/tree/aaa0758e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/diff/aaa0758e

Branch: refs/heads/master
Commit: aaa0758e6cdf2d1020a900c236ab84ce4071f2ff
Parents: 7ceba1b
Author: StyleTang <st...@gmail.com>
Authored: Sun May 7 11:58:44 2017 +0800
Committer: yukon <yu...@apache.org>
Committed: Sun May 7 11:58:44 2017 +0800

----------------------------------------------------------------------
 rocketmq-go/docs/checklist.md                   |   1 +
 rocketmq-go/docs/package.puml                   |  20 +-
 rocketmq-go/docs/roadmap.md                     |  48 +-
 rocketmq-go/example/consumer_example.go         |  35 ++
 rocketmq-go/example/rocketmq_client_example.go  |  40 ++
 rocketmq-go/model/config/client_config.go       |   3 +-
 rocketmq-go/model/config/consumer_config.go     |  79 +++
 rocketmq-go/model/constant/config.go            |  29 +
 rocketmq-go/model/constant/message_const.go     |  45 ++
 rocketmq-go/model/constant/message_sys_flag.go  |  26 +
 rocketmq-go/model/constant/mix_all.go           |  62 ++
 rocketmq-go/model/constant/perm.go              |  33 +
 rocketmq-go/model/constant/pull_sys_flag.go     |  24 +
 .../model/consume_concurrently_result.go        |  27 +
 .../model/consume_message_directly_result.go    |  31 +
 rocketmq-go/model/consumer_running_info.go      |  81 +++
 ...me_message_directly_result_request_header.go |  32 +
 .../consumer_send_msg_back_request_header.go    |  31 +
 .../model/header/get_consumer_list_by_group.go  |  33 +
 .../get_consumer_running_info_request_header.go |  29 +
 .../header/get_max_offset_request_header.go     |  26 +
 .../header/get_max_offset_response_header.go    |  28 +
 .../header/get_route_info_request_header.go     |  25 +
 .../query_consumer_offset_request_header.go     |  27 +
 .../model/header/reset_offset_request_header.go |  37 ++
 .../header/search_offset_request_header.go      |  33 +
 .../update_consumer_offset_request_header.go    |  34 +
 rocketmq-go/model/heart_beat.go                 |  34 +
 rocketmq-go/model/message.go                    | 134 ++++
 rocketmq-go/model/message/message.go            |   2 +-
 rocketmq-go/model/message_ext.go                |  71 +++
 rocketmq-go/model/message_listener.go           |  19 +
 rocketmq-go/model/message_queue.go              |  77 +++
 rocketmq-go/model/process_queue.go              | 614 +++++++++++++++++++
 rocketmq-go/model/process_queue_info.go         |  45 ++
 rocketmq-go/model/pull_request.go               |  24 +
 rocketmq-go/model/reset_offset_body.go          |  55 ++
 rocketmq-go/model/response_code.go              |  16 +
 rocketmq-go/model/send_result.go                |   2 +-
 rocketmq-go/model/subscription_data.go          |  26 +
 rocketmq-go/model/topic_publishInfo.go          | 108 ++--
 rocketmq-go/model/topic_publish_info.go         |  96 +++
 rocketmq-go/model/topic_route_data.go           | 216 ++++---
 rocketmq-go/mq_client_manager.go                |  56 +-
 rocketmq-go/mq_consumer.go                      |  46 +-
 rocketmq-go/mq_producer.go                      |   6 +-
 rocketmq-go/remoting/custom_header.go           |   1 +
 rocketmq-go/remoting/event_executor.go          | 144 -----
 rocketmq-go/remoting/json_serializable.go       |  42 ++
 rocketmq-go/remoting/remoting_client.go         | 534 ++++++++--------
 rocketmq-go/remoting/remoting_command.go        | 178 +-----
 rocketmq-go/remoting/request_code.go            | 111 ++++
 rocketmq-go/remoting/request_processor.go       |  26 +
 rocketmq-go/remoting/response_code.go           |  53 ++
 rocketmq-go/remoting/response_future.go         |  61 +-
 rocketmq-go/remoting/rocketmq_serializable.go   | 139 +++++
 rocketmq-go/remoting/serializable.go            |  80 +++
 rocketmq-go/service/client_api.go               |  39 +-
 rocketmq-go/service/consume_message_service.go  | 153 +++++
 rocketmq-go/service/consume_messsage_service.go |  20 -
 rocketmq-go/service/mq_client.go                | 343 +++++++++++
 rocketmq-go/util/concurrent_map.go              | 278 +++++++++
 rocketmq-go/util/ip.go                          |  73 +++
 rocketmq-go/util/json_util.go                   | 157 +++++
 rocketmq-go/util/message_client_id_generator.go | 110 ++++
 rocketmq-go/util/string_util.go                 |  91 +++
 rocketmq-go/util/structs/field.go               | 141 +++++
 rocketmq-go/util/structs/structs.go             | 581 ++++++++++++++++++
 rocketmq-go/util/structs/tags.go                |  32 +
 69 files changed, 5135 insertions(+), 818 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/docs/checklist.md
----------------------------------------------------------------------
diff --git a/rocketmq-go/docs/checklist.md b/rocketmq-go/docs/checklist.md
new file mode 100644
index 0000000..0412f86
--- /dev/null
+++ b/rocketmq-go/docs/checklist.md
@@ -0,0 +1 @@
+# Test Check List

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/docs/package.puml
----------------------------------------------------------------------
diff --git a/rocketmq-go/docs/package.puml b/rocketmq-go/docs/package.puml
index 9c78711..f34dd38 100644
--- a/rocketmq-go/docs/package.puml
+++ b/rocketmq-go/docs/package.puml
@@ -37,6 +37,18 @@ serviceState
    clientIP
    instanceName
   }
+  class remoting.ClientRequestProcessor{
+     receive request and invoke the method.
+     GET_CONSUMER_STATUS_FROM_CLIENT
+     GET_CONSUMER_RUNNING_INFO
+     ...
+    }
+    class remoting.SerializerHandler{
+       JsonSerializer
+       RocketMqSerializer
+    }
+
+
 namespace service{
 
 
@@ -61,8 +73,10 @@ namespace service{
 }
 
 namespace remoting {
-  RemotingClient  *-- RemotingCommand:contains
-  RemotingClient *-- ClientConfig:contains
+  RemotingClient  *-- RemotingCommand
+  RemotingClient *-- ClientConfig
+    RemotingClient  *-- ClientRequestProcessor
+    RemotingClient *-- SerializerHandler
 
 
 }
@@ -75,5 +89,5 @@ namespace rocketmq_go{
 
 note top of remoting.RemotingClient :(sync|aysc|oneWay)
 note top of remoting :net,serialize,connect,request response
-note top of service.MqClient :mq method
+note top of service.MqClient :mq common method
 @enduml
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/docs/roadmap.md
----------------------------------------------------------------------
diff --git a/rocketmq-go/docs/roadmap.md b/rocketmq-go/docs/roadmap.md
index e6eabff..0db9033 100644
--- a/rocketmq-go/docs/roadmap.md
+++ b/rocketmq-go/docs/roadmap.md
@@ -1,19 +1,5 @@
 # RoadMap-Milestone1
 
-## Producer
-- [ ] ProducerType
-    - [ ] DefaultProducer
-- [ ] API
-    - [ ] Send
-        - [ ] Sync
-- [ ] Other
-    - [ ] DelayMessage
-    - [ ] Config
-    - [ ] MessageId Generate
-    - [ ] CompressMsg
-    - [ ] TimeOut
-    - [ ] LoadBalance
-    - [ ] DefaultTopic
 ## Consumer
 - [ ] ConsumerType
     - [ ] PushConsumer
@@ -47,23 +33,23 @@
     - [ ] UpdateTopicRouteInfoFromNameServer
     - [ ] PersistAllConsumerOffset
     - [ ] ClearExpiredMessage(form consumer consumeMessageService)
-
-
-## Remoting
-- [ ] MqClientRequest
-    - [ ] InvokeSync
-    - [ ] InvokeAsync
-    - [ ] InvokeOneWay
 - [ ] ClientRemotingProcessor
+    - [ ] CHECK_TRANSACTION_STATE
     - [ ] NOTIFY_CONSUMER_IDS_CHANGED
     - [ ] RESET_CONSUMER_CLIENT_OFFSET
     - [ ] GET_CONSUMER_STATUS_FROM_CLIENT
     - [ ] GET_CONSUMER_RUNNING_INFO
     - [ ] CONSUME_MESSAGE_DIRECTLY
-- [ ] Serialize
-    - [ ] JSON
-    - [ ] ROCKETMQ
-- [ ] NamesrvAddrChoosed(HA)
+
+## Remoting
+- [x] MqClientRequest
+    - [x] InvokeSync
+    - [x] InvokeAsync
+    - [x] InvokeOneWay
+- [x] Serialize
+    - [x] JSON
+    - [x] ROCKETMQ
+- [x] NamesrvAddrChoosed(HA)
 
 
 # RoadMap-ALL
@@ -143,13 +129,6 @@
     - [ ] PersistAllConsumerOffset
     - [ ] ClearExpiredMessage(form consumer consumeMessageService)
     - [ ] UploadFilterClassSource(FromHeartBeat/But Golang Not Easy To do this(Java Source))
-
-
-## Remoting
-- [ ] MqClientRequest
-    - [ ] InvokeSync
-    - [ ] InvokeAsync
-    - [ ] InvokeOneWay
 - [ ] ClientRemotingProcessor
     - [ ] CHECK_TRANSACTION_STATE
     - [ ] NOTIFY_CONSUMER_IDS_CHANGED
@@ -157,6 +136,11 @@
     - [ ] GET_CONSUMER_STATUS_FROM_CLIENT
     - [ ] GET_CONSUMER_RUNNING_INFO
     - [ ] CONSUME_MESSAGE_DIRECTLY
+## Remoting
+- [ ] MqClientRequest
+    - [ ] InvokeSync
+    - [ ] InvokeAsync
+    - [ ] InvokeOneWay
 - [ ] Serialize
     - [ ] JSON
     - [ ] ROCKETMQ

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/example/consumer_example.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/example/consumer_example.go b/rocketmq-go/example/consumer_example.go
index 5c0044f..af74c01 100644
--- a/rocketmq-go/example/consumer_example.go
+++ b/rocketmq-go/example/consumer_example.go
@@ -16,5 +16,40 @@
  */
 package main
 
+import (
+	"errors"
+	"github.com/apache/incubator-rocketmq-externals/rocketmq-go"
+	"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model"
+	"github.com/golang/glog"
+)
+
 func main() {
+
+	// create a mqClientManager instance
+	var mqClientConfig = &rocketmq.MqClientConfig{}
+	var mqClientManager = rocketmq.NewMqClientManager(mqClientConfig)
+
+	// create rocketMq consumer
+	var consumerConfig = &rocketmq.MqConsumerConfig{}
+	var consumer1 = rocketmq.NewDefaultMQPushConsumer("testGroup", consumerConfig)
+	consumer1.Subscribe("testTopic", "*")
+	consumer1.RegisterMessageListener(func(msgs []model.MessageExt) model.ConsumeConcurrentlyResult {
+		var index = -1
+		for i, msg := range msgs {
+			// your code here,for example,print msg
+			glog.Info(msg)
+			var err = errors.New("error")
+			if err != nil {
+				break
+			}
+			index = i
+		}
+		return model.ConsumeConcurrentlyResult{ConsumeConcurrentlyStatus: model.CONSUME_SUCCESS, AckIndex: index}
+	})
+
+	//register consumer to mqClientManager
+	mqClientManager.RegisterConsumer(consumer1)
+
+	//start it
+	mqClientManager.Start()
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/example/rocketmq_client_example.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/example/rocketmq_client_example.go b/rocketmq-go/example/rocketmq_client_example.go
new file mode 100644
index 0000000..c6828c8
--- /dev/null
+++ b/rocketmq-go/example/rocketmq_client_example.go
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package main
+
+import (
+	"fmt"
+	"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/config"
+	"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/constant"
+	"github.com/apache/incubator-rocketmq-externals/rocketmq-go/service"
+)
+
+func main() {
+
+	var clienConfig = config.NewClientConfig()
+	clienConfig.SetNameServerAddress("120.55.113.35:9876")
+
+	//use json serializer
+	var mqClient = service.MqClientInit(clienConfig, nil)
+	fmt.Println(mqClient.TryToFindTopicPublishInfo("GoLang"))
+	//&{false true [{GoLang broker-a 0} {GoLang broker-a 1} {GoLang broker-a 2} {GoLang broker-a 3}] 0xc420016800 0} <nil>
+
+	//use rocketmq serializer
+	constant.USE_HEADER_SERIALIZETYPE = constant.ROCKETMQ_SERIALIZE
+	var mqClient2 = service.MqClientInit(clienConfig, nil)
+	fmt.Println(mqClient2.TryToFindTopicPublishInfo("GoLang"))
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/model/config/client_config.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/config/client_config.go b/rocketmq-go/model/config/client_config.go
index d9ad88c..8a415d8 100644
--- a/rocketmq-go/model/config/client_config.go
+++ b/rocketmq-go/model/config/client_config.go
@@ -24,7 +24,8 @@ import (
 
 // client common config
 type ClientConfig struct {
-	nameServerAddress             string
+	nameServerAddress string // only this is in use
+
 	clientIP                      string
 	instanceName                  string
 	clientCallbackExecutorThreads int // TODO: clientCallbackExecutorThreads

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/model/config/consumer_config.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/config/consumer_config.go b/rocketmq-go/model/config/consumer_config.go
index a37eaa0..25f7585 100644
--- a/rocketmq-go/model/config/consumer_config.go
+++ b/rocketmq-go/model/config/consumer_config.go
@@ -16,5 +16,84 @@
  */
 package config
 
+import "time"
+
 type RocketMqConsumerConfig struct {
+	ConsumeFromWhere string
+	/**
+	 * Minimum consumer thread number
+	 */
+	//consumeThreadMin                  int
+	//					/**
+	//					 * Max consumer thread number
+	//					 */
+	//consumeThreadMax                  int
+
+	/**
+	 * Threshold for dynamic adjustment of the number of thread pool
+	 */
+	//adjustThreadPoolNumsThreshold     int   // = 100000;
+
+	/**
+	 * Concurrently max span offset.it has no effect on sequential consumption
+	 */
+	ConsumeConcurrentlyMaxSpan int // = 2000;
+	/**
+	 * Flow control threshold
+	 */
+	PullThresholdForQueue int //= 1000;
+	/**
+	 * Message pull Interval
+	 */
+	PullInterval int64 //= 0;
+	/**
+	 * Batch consumption size
+	 */
+	ConsumeMessageBatchMaxSize int //= 1;
+	/**
+	 * Batch pull size
+	 */
+	PullBatchSize int //= 32;
+
+	/**
+	 * Whether update subscription relationship when every pull
+	 */
+	PostSubscriptionWhenPull bool //= false; //get subExpression
+
+	/**
+	 * Whether the unit of subscription group
+	 */
+	UnitMode                      bool  // = false;
+	MaxReconsumeTimes             int   //= 16;
+	SuspendCurrentQueueTimeMillis int64 //= 1000;
+	ConsumeTimeout                int64 //= 15 //minutes
+
+	//=========can not change
+	/**
+	 * Delay some time when exception occur
+	 */
+	PullTimeDelayMillsWhenException int64 //= 3000;
+	/**
+	 * Flow control interval
+	 */
+	PullTimeDelayMillsWhenFlowControl int64 //= 50;
+	/**
+	 * Delay some time when suspend pull service
+	 */
+	PullTimeDelayMillsWhenSuspend    int64 //= 1000;
+	BrokerSuspendMaxTimeMillis       int64 //1000 * 15;
+	ConsumerTimeoutMillisWhenSuspend int64 //= 1000 * 30;
+
+	/**
+	* Backtracking consumption time with second precision.time format is
+	* 20131223171201
+	* Implying Seventeen twelve and 01 seconds on December 23, 2013 year
+	* Default backtracking consumption time Half an hour ago
+	 */
+	ConsumeTimestamp time.Time //when use CONSUME_FROM_TIMESTAMP
+}
+
+func NewRocketMqConsumerConfig() (consumerConfig *RocketMqConsumerConfig) {
+	consumerConfig = &RocketMqConsumerConfig{}
+	return
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/model/constant/config.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/constant/config.go b/rocketmq-go/model/constant/config.go
new file mode 100644
index 0000000..c48dfa5
--- /dev/null
+++ b/rocketmq-go/model/constant/config.go
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package constant
+
+//-------SerializeType-------
+var JSON_SERIALIZE byte = 0
+var ROCKETMQ_SERIALIZE byte = 1
+
+//-------SerializeType-------
+
+var USE_HEADER_SERIALIZETYPE = JSON_SERIALIZE
+
+var REMOTING_COMMAND_FLAG = 0
+var REMOTING_COMMAND_LANGUAGE = "OTHER"
+var REMOTING_COMMAND_VERSION int16 = 137

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/model/constant/message_const.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/constant/message_const.go b/rocketmq-go/model/constant/message_const.go
new file mode 100644
index 0000000..402d328
--- /dev/null
+++ b/rocketmq-go/model/constant/message_const.go
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package constant
+
+const (
+	PROPERTY_KEYS                          = "KEYS"
+	PROPERTY_TAGS                          = "TAGS"
+	PROPERTY_WAIT_STORE_MSG_OK             = "WAIT"
+	PROPERTY_DELAY_TIME_LEVEL              = "DELAY"
+	PROPERTY_RETRY_TOPIC                   = "RETRY_TOPIC"
+	PROPERTY_REAL_TOPIC                    = "REAL_TOPIC"
+	PROPERTY_REAL_QUEUE_ID                 = "REAL_QID"
+	PROPERTY_TRANSACTION_PREPARED          = "TRAN_MSG"
+	PROPERTY_PRODUCER_GROUP                = "PGROUP"
+	PROPERTY_MIN_OFFSET                    = "MIN_OFFSET"
+	PROPERTY_MAX_OFFSET                    = "MAX_OFFSET"
+	PROPERTY_BUYER_ID                      = "BUYER_ID"
+	PROPERTY_ORIGIN_MESSAGE_ID             = "ORIGIN_MESSAGE_ID"
+	PROPERTY_TRANSFER_FLAG                 = "TRANSFER_FLAG"
+	PROPERTY_CORRECTION_FLAG               = "CORRECTION_FLAG"
+	PROPERTY_MQ2_FLAG                      = "MQ2_FLAG"
+	PROPERTY_RECONSUME_TIME                = "RECONSUME_TIME"
+	PROPERTY_MSG_REGION                    = "MSG_REGION"
+	PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX = "UNIQ_KEY"
+	PROPERTY_MAX_RECONSUME_TIMES           = "MAX_RECONSUME_TIMES"
+	PROPERTY_CONSUME_START_TIMESTAMP       = "CONSUME_START_TIME"
+
+	COMMUNICATIONMODE_SYNC   = "SYNC"
+	COMMUNICATIONMODE_ASYNC  = "ASYNC"
+	COMMUNICATIONMODE_ONEWAY = "ONEWAY"
+)

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/model/constant/message_sys_flag.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/constant/message_sys_flag.go b/rocketmq-go/model/constant/message_sys_flag.go
new file mode 100644
index 0000000..a53c4fd
--- /dev/null
+++ b/rocketmq-go/model/constant/message_sys_flag.go
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package constant
+
+const (
+	CompressedFlag          int32 = (0x1 << 0)
+	MultiTagsFlag           int32 = (0x1 << 1)
+	TransactionNotType      int32 = (0x0 << 2)
+	TransactionPreparedType int32 = (0x1 << 2)
+	TransactionCommitType   int32 = (0x2 << 2)
+	TransactionRollbackType int32 = (0x3 << 2)
+)

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/model/constant/mix_all.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/constant/mix_all.go b/rocketmq-go/model/constant/mix_all.go
new file mode 100644
index 0000000..6abaabe
--- /dev/null
+++ b/rocketmq-go/model/constant/mix_all.go
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package constant
+
+const (
+	ROCKETMQ_HOME_ENV      = "ROCKETMQ_HOME"
+	ROCKETMQ_HOME_PROPERTY = "rocketmq.home.dir"
+	NAMESRV_ADDR_ENV       = "NAMESRV_ADDR"
+	NAMESRV_ADDR_PROPERTY  = "rocketmq.namesrv.addr"
+	MESSAGE_COMPRESS_LEVEL = "rocketmq.message.compressLevel"
+	//WS_DOMAIN_NAME = System.getProperty("rocketmq.namesrv.domain", "jmenv.tbsite.net")
+	//WS_DOMAIN_SUBGROUP = System.getProperty("rocketmq.namesrv.domain.subgroup", "nsaddr")
+	// http://jmenv.tbsite.net:8080/rocketmq/nsaddr
+	//WS_ADDR = "http://" + WS_DOMAIN_NAME + ":8080/rocketmq/" + WS_DOMAIN_SUBGROUP
+	DEFAULT_TOPIC               = "TBW102"
+	BENCHMARK_TOPIC             = "BenchmarkTest"
+	DEFAULT_PRODUCER_GROUP      = "DEFAULT_PRODUCER"
+	DEFAULT_CONSUMER_GROUP      = "DEFAULT_CONSUMER"
+	TOOLS_CONSUMER_GROUP        = "TOOLS_CONSUMER"
+	FILTERSRV_CONSUMER_GROUP    = "FILTERSRV_CONSUMER"
+	MONITOR_CONSUMER_GROUP      = "__MONITOR_CONSUMER"
+	CLIENT_INNER_PRODUCER_GROUP = "CLIENT_INNER_PRODUCER"
+	SELF_TEST_PRODUCER_GROUP    = "SELF_TEST_P_GROUP"
+	SELF_TEST_CONSUMER_GROUP    = "SELF_TEST_C_GROUP"
+	SELF_TEST_TOPIC             = "SELF_TEST_TOPIC"
+	OFFSET_MOVED_EVENT          = "OFFSET_MOVED_EVENT"
+	ONS_HTTP_PROXY_GROUP        = "CID_ONS-HTTP-PROXY"
+	CID_ONSAPI_PERMISSION_GROUP = "CID_ONSAPI_PERMISSION"
+	CID_ONSAPI_OWNER_GROUP      = "CID_ONSAPI_OWNER"
+	CID_ONSAPI_PULL_GROUP       = "CID_ONSAPI_PULL"
+	CID_RMQ_SYS_PREFIX          = "CID_RMQ_SYS_"
+
+	//public static final List<String> LocalInetAddrs = getLocalInetAddress()
+	//Localhost = localhost()
+	//DEFAULT_CHARSET = "UTF-8"
+	MASTER_ID int64 = 0
+	CURRENT_JVM_PID
+
+	RETRY_GROUP_TOPIC_PREFIX = "%RETRY%"
+
+	DLQ_GROUP_TOPIC_PREFIX     = "%DLQ%"
+	SYSTEM_TOPIC_PREFIX        = "rmq_sys_"
+	UNIQUE_MSG_QUERY_FLAG      = "_UNIQUE_KEY_QUERY"
+	MAX_MESSAGE_BODY_SIZE  int = 4 * 1024 * 1024 //4m
+	MAX_MESSAGE_TOPIC_SIZE int = 255             //255char
+
+	DEFAULT_TOPIC_QUEUE_NUMS int32 = 4
+)

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/model/constant/perm.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/constant/perm.go b/rocketmq-go/model/constant/perm.go
new file mode 100644
index 0000000..962d989
--- /dev/null
+++ b/rocketmq-go/model/constant/perm.go
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package constant
+
+const (
+	PERM_PRIORITY = 0x1 << 3
+	PERM_READ     = 0x1 << 2
+	PERM_WRITE    = 0x1 << 1
+	PERM_INHERIT  = 0x1 << 0
+)
+
+func WriteAble(perm int32) (ret bool) {
+	ret = ((perm & PERM_WRITE) == PERM_WRITE)
+	return
+}
+func ReadAble(perm int32) (ret bool) {
+	ret = ((perm & PERM_READ) == PERM_READ)
+	return
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/model/constant/pull_sys_flag.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/constant/pull_sys_flag.go b/rocketmq-go/model/constant/pull_sys_flag.go
new file mode 100644
index 0000000..0a2921c
--- /dev/null
+++ b/rocketmq-go/model/constant/pull_sys_flag.go
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package constant
+
+const (
+	FLAG_COMMIT_OFFSET int32 = 0x1 << 0
+	FLAG_SUSPEND       int32 = 0x1 << 1
+	FLAG_SUBSCRIPTION  int32 = 0x1 << 2
+	FLAG_CLASS_FILTER  int32 = 0x1 << 3
+)

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/model/consume_concurrently_result.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/consume_concurrently_result.go b/rocketmq-go/model/consume_concurrently_result.go
new file mode 100644
index 0000000..6e4df7b
--- /dev/null
+++ b/rocketmq-go/model/consume_concurrently_result.go
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package model
+
+const (
+	CONSUME_SUCCESS = "CONSUME_SUCCESS"
+	RECONSUME_LATER = "RECONSUME_LATER"
+)
+
+type ConsumeConcurrentlyResult struct {
+	ConsumeConcurrentlyStatus string
+	AckIndex                  int
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/model/consume_message_directly_result.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/consume_message_directly_result.go b/rocketmq-go/model/consume_message_directly_result.go
new file mode 100644
index 0000000..a9af32e
--- /dev/null
+++ b/rocketmq-go/model/consume_message_directly_result.go
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package model
+
+type ConsumeMessageDirectlyResult struct {
+	Order      bool `json:"order"`
+	AutoCommit bool `json:"autoCommit"`
+	//CR_SUCCESS,
+	//CR_LATER,
+	//CR_ROLLBACK,
+	//CR_COMMIT,
+	//CR_THROW_EXCEPTION,
+	//CR_RETURN_NULL,
+	ConsumeResult  string `json:"consumeResult"`
+	Remark         string `json:"remark"`
+	SpentTimeMills int64  `json:"spentTimeMills"`
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/model/consumer_running_info.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/consumer_running_info.go b/rocketmq-go/model/consumer_running_info.go
new file mode 100644
index 0000000..80c39ae
--- /dev/null
+++ b/rocketmq-go/model/consumer_running_info.go
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package model
+
+import "encoding/json"
+
+type ConsumerRunningInfo struct {
+	Properties map[string]string                 `json:"properties"`
+	MqTable    map[MessageQueue]ProcessQueueInfo `json:"mqTable"`
+	// todo
+	//private TreeSet<SubscriptionData> subscriptionSet = new TreeSet<SubscriptionData>();
+	//
+	//private TreeMap<MessageQueue, ProcessQueueInfo> mqTable = new TreeMap<MessageQueue, ProcessQueueInfo>();
+	//
+	//private TreeMap<String/* Topic */, ConsumeStatus> statusTable = new TreeMap<String, ConsumeStatus>();
+	//
+	//private String jstack;
+}
+
+func (self *ConsumerRunningInfo) Encode() (jsonByte []byte, err error) {
+	mqTableJsonStr := "{"
+	first := true
+	var keyJson []byte
+	var valueJson []byte
+
+	for key, value := range self.MqTable {
+		keyJson, err = json.Marshal(key)
+		if err != nil {
+			return
+		}
+		valueJson, err = json.Marshal(value)
+		if err != nil {
+			return
+		}
+		if first == false {
+			mqTableJsonStr = mqTableJsonStr + ","
+		}
+		mqTableJsonStr = mqTableJsonStr + string(keyJson) + ":" + string(valueJson)
+		first = false
+	}
+	mqTableJsonStr = mqTableJsonStr + "}"
+	var propertiesJson []byte
+	propertiesJson, err = json.Marshal(self.Properties)
+	if err != nil {
+		return
+	}
+	jsonByte = self.formatEncode("\"properties\"", string(propertiesJson), "\"mqTable\"", string(mqTableJsonStr))
+	return
+}
+func (self *ConsumerRunningInfo) formatEncode(kVList ...string) []byte {
+	jsonStr := "{"
+	first := true
+	for i := 0; i+1 < len(kVList); i += 2 {
+		if first == false {
+			jsonStr = jsonStr + ","
+		}
+		keyJson := kVList[i]
+		valueJson := kVList[i+1]
+
+		jsonStr = jsonStr + string(keyJson) + ":" + string(valueJson)
+
+		first = false
+	}
+	jsonStr = jsonStr + "}"
+	return []byte(jsonStr)
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/model/header/consume_message_directly_result_request_header.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/header/consume_message_directly_result_request_header.go b/rocketmq-go/model/header/consume_message_directly_result_request_header.go
new file mode 100644
index 0000000..a593d51
--- /dev/null
+++ b/rocketmq-go/model/header/consume_message_directly_result_request_header.go
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package header
+
+type ConsumeMessageDirectlyResultRequestHeader struct {
+	ConsumerGroup string `json:"consumerGroup"`
+	ClientId      string `json:"clientId"`
+	MsgId         string `json:"msgId"`
+	BrokerName    string `json:"brokerName"`
+}
+
+func (self *ConsumeMessageDirectlyResultRequestHeader) FromMap(headerMap map[string]interface{}) {
+	self.ConsumerGroup = headerMap["consumerGroup"].(string)
+	self.ClientId = headerMap["clientId"].(string)
+	self.MsgId = headerMap["msgId"].(string)
+	self.BrokerName = headerMap["brokerName"].(string)
+	return
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/model/header/consumer_send_msg_back_request_header.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/header/consumer_send_msg_back_request_header.go b/rocketmq-go/model/header/consumer_send_msg_back_request_header.go
new file mode 100644
index 0000000..4e101c6
--- /dev/null
+++ b/rocketmq-go/model/header/consumer_send_msg_back_request_header.go
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package header
+
+type ConsumerSendMsgBackRequestHeader struct {
+	Offset            int64
+	Group             string
+	DelayLevel        int32
+	OriginMsgId       string
+	OriginTopic       string
+	UnitMode          bool
+	MaxReconsumeTimes int32
+}
+
+func (self *ConsumerSendMsgBackRequestHeader) FromMap(headerMap map[string]interface{}) {
+	return
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/model/header/get_consumer_list_by_group.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/header/get_consumer_list_by_group.go b/rocketmq-go/model/header/get_consumer_list_by_group.go
new file mode 100644
index 0000000..e06e1fa
--- /dev/null
+++ b/rocketmq-go/model/header/get_consumer_list_by_group.go
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package header
+
+type GetConsumerListByGroupRequestHeader struct {
+	ConsumerGroup string `json:"consumerGroup"`
+}
+
+func (self *GetConsumerListByGroupRequestHeader) FromMap(headerMap map[string]interface{}) {
+	return
+}
+
+type GetConsumerListByGroupResponseBody struct {
+	ConsumerIdList []string
+}
+
+func (self *GetConsumerListByGroupResponseBody) FromMap(headerMap map[string]interface{}) {
+	return
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/model/header/get_consumer_running_info_request_header.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/header/get_consumer_running_info_request_header.go b/rocketmq-go/model/header/get_consumer_running_info_request_header.go
new file mode 100644
index 0000000..5e7487f
--- /dev/null
+++ b/rocketmq-go/model/header/get_consumer_running_info_request_header.go
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package header
+
+type GetConsumerRunningInfoRequestHeader struct {
+	ConsumerGroup string `json:"consumerGroup"`
+	ClientId      string `json:"clientId"`
+	JstackEnable  bool   `json:"jstackEnable"`
+}
+
+func (self *GetConsumerRunningInfoRequestHeader) FromMap(headerMap map[string]interface{}) {
+	self.ConsumerGroup = headerMap["consumerGroup"].(string)
+	self.ClientId = headerMap["clientId"].(string)
+	return
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/model/header/get_max_offset_request_header.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/header/get_max_offset_request_header.go b/rocketmq-go/model/header/get_max_offset_request_header.go
new file mode 100644
index 0000000..6d4723e
--- /dev/null
+++ b/rocketmq-go/model/header/get_max_offset_request_header.go
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package header
+
+type GetMaxOffsetRequestHeader struct {
+	Topic   string `json:"topic"`
+	QueueId int32  `json:"queueId"`
+}
+
+func (self *GetMaxOffsetRequestHeader) FromMap(headerMap map[string]interface{}) {
+	return
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/model/header/get_max_offset_response_header.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/header/get_max_offset_response_header.go b/rocketmq-go/model/header/get_max_offset_response_header.go
new file mode 100644
index 0000000..eea6c2c
--- /dev/null
+++ b/rocketmq-go/model/header/get_max_offset_response_header.go
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package header
+
+import "github.com/apache/incubator-rocketmq-externals/rocketmq-go/util"
+
+type QueryOffsetResponseHeader struct {
+	Offset int64 `json:"offset"`
+}
+
+func (self *QueryOffsetResponseHeader) FromMap(headerMap map[string]interface{}) {
+	self.Offset = util.StrToInt64WithDefaultValue(headerMap["offset"].(string), -1)
+	return
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/model/header/get_route_info_request_header.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/header/get_route_info_request_header.go b/rocketmq-go/model/header/get_route_info_request_header.go
new file mode 100644
index 0000000..7c33c25
--- /dev/null
+++ b/rocketmq-go/model/header/get_route_info_request_header.go
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package header
+
+type GetRouteInfoRequestHeader struct {
+	Topic string `json:"topic"`
+}
+
+func (self *GetRouteInfoRequestHeader) FromMap(headerMap map[string]interface{}) {
+	return
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/model/header/query_consumer_offset_request_header.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/header/query_consumer_offset_request_header.go b/rocketmq-go/model/header/query_consumer_offset_request_header.go
new file mode 100644
index 0000000..ed455e7
--- /dev/null
+++ b/rocketmq-go/model/header/query_consumer_offset_request_header.go
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package header
+
+type QueryConsumerOffsetRequestHeader struct {
+	ConsumerGroup string `json:"consumerGroup"`
+	Topic         string `json:"topic"`
+	QueueId       int32  `json:"queueId"`
+}
+
+func (self *QueryConsumerOffsetRequestHeader) FromMap(headerMap map[string]interface{}) {
+	return
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/model/header/reset_offset_request_header.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/header/reset_offset_request_header.go b/rocketmq-go/model/header/reset_offset_request_header.go
new file mode 100644
index 0000000..642b600
--- /dev/null
+++ b/rocketmq-go/model/header/reset_offset_request_header.go
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package header
+
+import (
+	"github.com/apache/incubator-rocketmq-externals/rocketmq-go/util"
+	"strconv"
+)
+
+type ResetOffsetRequestHeader struct {
+	Topic     string `json:"topic"`
+	Group     string `json:"group"`
+	Timestamp int64  `json:"timestamp"`
+	IsForce   bool   `json:"isForce"`
+}
+
+func (self *ResetOffsetRequestHeader) FromMap(headerMap map[string]interface{}) {
+	self.Group = headerMap["group"].(string)
+	self.Topic = headerMap["topic"].(string)
+	self.Timestamp = util.StrToInt64WithDefaultValue(headerMap["timestamp"].(string), -1)
+	self.IsForce, _ = strconv.ParseBool(headerMap["isForce"].(string))
+	return
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/model/header/search_offset_request_header.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/header/search_offset_request_header.go b/rocketmq-go/model/header/search_offset_request_header.go
new file mode 100644
index 0000000..5088eac
--- /dev/null
+++ b/rocketmq-go/model/header/search_offset_request_header.go
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package header
+
+import ()
+import "github.com/apache/incubator-rocketmq-externals/rocketmq-go/util"
+
+type SearchOffsetRequestHeader struct {
+	Topic     string `json:"topic"`
+	QueueId   int32  `json:"queueId"`
+	Timestamp int64  `json:"timestamp"`
+}
+
+func (self *SearchOffsetRequestHeader) FromMap(headerMap map[string]interface{}) {
+	self.Topic = headerMap["topic"].(string)
+	self.Topic = headerMap["queueId"].(string)
+	self.Timestamp = util.StrToInt64WithDefaultValue(headerMap["timestamp"].(string), -1)
+	return
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/model/header/update_consumer_offset_request_header.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/header/update_consumer_offset_request_header.go b/rocketmq-go/model/header/update_consumer_offset_request_header.go
new file mode 100644
index 0000000..42612db
--- /dev/null
+++ b/rocketmq-go/model/header/update_consumer_offset_request_header.go
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package header
+
+import "github.com/apache/incubator-rocketmq-externals/rocketmq-go/util"
+
+type UpdateConsumerOffsetRequestHeader struct {
+	ConsumerGroup string `json:"consumerGroup"`
+	Topic         string `json:"topic"`
+	QueueId       int32  `json:"queueId"`
+	CommitOffset  int64  `json:"commitOffset"`
+}
+
+func (self *UpdateConsumerOffsetRequestHeader) FromMap(headerMap map[string]interface{}) {
+	self.ConsumerGroup = headerMap["consumerGroup"].(string)
+	self.QueueId = util.StrToInt32WithDefaultValue(util.ReadString(headerMap["queueId"]), 0)
+	self.CommitOffset = util.StrToInt64WithDefaultValue(headerMap["commitOffset"].(string), -1)
+	self.Topic = util.ReadString(headerMap["topic"])
+	return
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/model/heart_beat.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/heart_beat.go b/rocketmq-go/model/heart_beat.go
new file mode 100644
index 0000000..fc5eded
--- /dev/null
+++ b/rocketmq-go/model/heart_beat.go
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package model
+
+type ConsumerData struct {
+	GroupName           string
+	ConsumeType         string
+	MessageModel        string
+	ConsumeFromWhere    string
+	SubscriptionDataSet []*SubscriptionData
+	UnitMode            bool
+}
+type ProducerData struct {
+	GroupName string
+}
+type HeartbeatData struct {
+	ClientId        string
+	ConsumerDataSet []*ConsumerData
+	ProducerDataSet []*ProducerData
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/model/message.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/message.go b/rocketmq-go/model/message.go
new file mode 100644
index 0000000..0cb3d97
--- /dev/null
+++ b/rocketmq-go/model/message.go
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package model
+
+import (
+	"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/constant"
+	"github.com/apache/incubator-rocketmq-externals/rocketmq-go/util"
+	"strconv"
+	"strings"
+)
+
+type Message struct {
+	Topic      string
+	Flag       int
+	Properties map[string]string
+	Body       []byte
+}
+
+func (self *Message) SetTag(tag string) {
+	if self.Properties == nil {
+		self.Properties = make(map[string]string)
+	}
+	self.Properties[constant.PROPERTY_TAGS] = tag
+}
+func (self *Message) GetTag() (tag string) {
+	if self.Properties != nil {
+		tag = self.Properties[constant.PROPERTY_TAGS]
+	}
+	return
+}
+
+func (self *Message) SetKeys(keys []string) {
+	if self.Properties == nil {
+		self.Properties = make(map[string]string)
+	}
+	self.Properties[constant.PROPERTY_KEYS] = strings.Join(keys, KEY_SEPARATOR)
+}
+
+func (self *Message) SetDelayTimeLevel(delayTimeLevel int) {
+	if self.Properties == nil {
+		self.Properties = make(map[string]string)
+	}
+	self.Properties[constant.PROPERTY_DELAY_TIME_LEVEL] = util.IntToString(delayTimeLevel)
+}
+func (self *Message) SetWaitStoreMsgOK(waitStoreMsgOK bool) {
+	if self.Properties == nil {
+		self.Properties = make(map[string]string)
+	}
+	self.Properties[constant.PROPERTY_WAIT_STORE_MSG_OK] = strconv.FormatBool(waitStoreMsgOK)
+}
+func (self *Message) GeneratorMsgUniqueKey() {
+	if self.Properties == nil {
+		self.Properties = make(map[string]string)
+	}
+	if len(self.Properties[constant.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX]) > 0 {
+		return
+	}
+	self.Properties[constant.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX] = util.GeneratorMessageClientId()
+}
+
+func (self *MessageExt) GetMsgUniqueKey() string {
+	if self.Properties != nil {
+		originMessageId := self.Properties[constant.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX]
+		if len(originMessageId) > 0 {
+			return originMessageId
+		}
+	}
+	return self.MsgId
+}
+
+func (self *Message) SetOriginMessageId(messageId string) {
+	if self.Properties == nil {
+		self.Properties = make(map[string]string)
+	}
+	self.Properties[constant.PROPERTY_ORIGIN_MESSAGE_ID] = messageId
+}
+
+func (self *Message) SetRetryTopic(retryTopic string) {
+	if self.Properties == nil {
+		self.Properties = make(map[string]string)
+	}
+	self.Properties[constant.PROPERTY_RETRY_TOPIC] = retryTopic
+}
+func (self *Message) SetReconsumeTime(reConsumeTime int) {
+	if self.Properties == nil {
+		self.Properties = make(map[string]string)
+	}
+	self.Properties[constant.PROPERTY_RECONSUME_TIME] = util.IntToString(reConsumeTime)
+}
+
+func (self *Message) GetReconsumeTimes() (reConsumeTime int) {
+	reConsumeTime = 0
+	if self.Properties != nil {
+		reConsumeTimeStr := self.Properties[constant.PROPERTY_RECONSUME_TIME]
+		if len(reConsumeTimeStr) > 0 {
+			reConsumeTime = util.StrToIntWithDefaultValue(reConsumeTimeStr, 0)
+		}
+	}
+	return
+}
+
+func (self *Message) SetMaxReconsumeTimes(maxConsumeTime int) {
+	if self.Properties == nil {
+		self.Properties = make(map[string]string)
+	}
+	self.Properties[constant.PROPERTY_MAX_RECONSUME_TIMES] = util.IntToString(maxConsumeTime)
+}
+
+func (self *Message) GetMaxReconsumeTimes() (maxConsumeTime int) {
+	maxConsumeTime = 0
+	if self.Properties != nil {
+		reConsumeTimeStr := self.Properties[constant.PROPERTY_MAX_RECONSUME_TIMES]
+		if len(reConsumeTimeStr) > 0 {
+			maxConsumeTime = util.StrToIntWithDefaultValue(reConsumeTimeStr, 0)
+		}
+	}
+	return
+}
+
+var KEY_SEPARATOR string = " "

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/model/message/message.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/message/message.go b/rocketmq-go/model/message/message.go
index 633446c..1dcd365 100644
--- a/rocketmq-go/model/message/message.go
+++ b/rocketmq-go/model/message/message.go
@@ -255,7 +255,7 @@ func (msg *Message) removeProperty(k, v string) string {
 		delete(msg.properties, k)
 		return v
 	}
-	return nil
+	return ""
 }
 
 func (msg *Message) String() string {

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/model/message_ext.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/message_ext.go b/rocketmq-go/model/message_ext.go
new file mode 100644
index 0000000..9a3aacb
--- /dev/null
+++ b/rocketmq-go/model/message_ext.go
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package model
+
+import (
+	"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/constant"
+	"github.com/apache/incubator-rocketmq-externals/rocketmq-go/util"
+	"math"
+	"strconv"
+	"time"
+)
+
+type MessageExt struct {
+	*Message
+	QueueId                   int32
+	StoreSize                 int32
+	QueueOffset               int64
+	SysFlag                   int32
+	BornTimestamp             int64
+	BornHost                  string
+	StoreTimestamp            int64
+	StoreHost                 string
+	MsgId                     string
+	CommitLogOffset           int64
+	BodyCRC                   int32
+	ReconsumeTimes            int32
+	PreparedTransactionOffset int64
+
+	propertyConsumeStartTimestamp string // race condition
+}
+
+func (self *MessageExt) GetOriginMessageId() string {
+	if self.Properties != nil {
+		originMessageId := self.Properties[constant.PROPERTY_ORIGIN_MESSAGE_ID]
+		if len(originMessageId) > 0 {
+			return originMessageId
+		}
+	}
+	return self.MsgId
+}
+
+func (self *MessageExt) GetConsumeStartTime() int64 {
+	if len(self.propertyConsumeStartTimestamp) > 0 {
+		return util.StrToInt64WithDefaultValue(self.propertyConsumeStartTimestamp, -1)
+	}
+	return math.MaxInt64
+}
+
+func (self *MessageExt) SetConsumeStartTime() {
+	if self.Properties == nil {
+		self.Properties = make(map[string]string)
+	}
+	nowTime := strconv.FormatInt(time.Now().UnixNano()/1000000, 10)
+	self.Properties[constant.PROPERTY_KEYS] = nowTime
+	self.propertyConsumeStartTimestamp = nowTime
+	return
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/model/message_listener.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/message_listener.go b/rocketmq-go/model/message_listener.go
new file mode 100644
index 0000000..7ad2054
--- /dev/null
+++ b/rocketmq-go/model/message_listener.go
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package model
+
+type MessageListener func(msgs []MessageExt) ConsumeConcurrentlyResult

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/model/message_queue.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/message_queue.go b/rocketmq-go/model/message_queue.go
new file mode 100644
index 0000000..27d70a6
--- /dev/null
+++ b/rocketmq-go/model/message_queue.go
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package model
+
+type MessageQueue struct {
+	Topic      string `json:"topic"`
+	BrokerName string `json:"brokerName"`
+	QueueId    int32  `json:"queueId"`
+}
+
+func (self *MessageQueue) clone() *MessageQueue {
+	no := new(MessageQueue)
+	no.Topic = self.Topic
+	no.QueueId = self.QueueId
+	no.BrokerName = self.BrokerName
+	return no
+}
+
+type MessageQueues []*MessageQueue
+
+func (self MessageQueues) Less(i, j int) bool {
+	imq := self[i]
+	jmq := self[j]
+
+	if imq.Topic < jmq.Topic {
+		return true
+	} else if imq.Topic < jmq.Topic {
+		return false
+	}
+
+	if imq.BrokerName < jmq.BrokerName {
+		return true
+	} else if imq.BrokerName < jmq.BrokerName {
+		return false
+	}
+
+	if imq.QueueId < jmq.QueueId {
+		return true
+	} else {
+		return false
+	}
+}
+
+func (self MessageQueues) Swap(i, j int) {
+	self[i], self[j] = self[j], self[i]
+}
+
+func (self MessageQueues) Len() int {
+	return len(self)
+}
+
+func (self MessageQueue) Equals(messageQueue *MessageQueue) bool {
+	if self.QueueId != messageQueue.QueueId {
+		return false
+	}
+	if self.Topic != messageQueue.Topic {
+		return false
+	}
+	if self.BrokerName != messageQueue.BrokerName {
+		return false
+	}
+	return true
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/model/process_queue.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/process_queue.go b/rocketmq-go/model/process_queue.go
new file mode 100644
index 0000000..285cbda
--- /dev/null
+++ b/rocketmq-go/model/process_queue.go
@@ -0,0 +1,614 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package model
+
+import (
+	"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/constant"
+	"github.com/apache/incubator-rocketmq-externals/rocketmq-go/util"
+	"github.com/emirpasic/gods/maps/treemap"
+	"github.com/golang/glog"
+	"sync"
+	"time"
+)
+
+type ProcessQueue struct {
+	msgTreeMap            *treemap.Map // int | MessageExt
+	msgCount              int
+	lockTreeMap           sync.RWMutex
+	locked                bool
+	lastPullTimestamp     time.Time
+	lastConsumeTimestamp  time.Time
+	lastLockTimestamp     time.Time
+	lockConsume           sync.RWMutex
+	consuming             bool
+	queueOffsetMax        int64
+	dropped               bool
+	msgAccCnt             int64 //accumulation message count
+	tryUnlockTimes        int64
+	msgTreeMapToBeConsume *treemap.Map
+}
+
+func NewProcessQueue() (processQueue *ProcessQueue) {
+	processQueue = new(ProcessQueue)
+	processQueue.dropped = false
+	processQueue.msgTreeMap = treemap.NewWithIntComparator()
+	processQueue.msgTreeMapToBeConsume = treemap.NewWithIntComparator()
+
+	return
+}
+func (self *ProcessQueue) GetMsgCount() int {
+	defer self.lockTreeMap.Unlock()
+	self.lockTreeMap.Lock()
+	return self.msgCount
+}
+
+func (self *ProcessQueue) Clear() {
+	defer self.lockTreeMap.Unlock()
+	self.lockTreeMap.Lock()
+	self.SetDrop(true)
+	self.msgTreeMap.Clear()
+	self.msgCount = 0
+	self.queueOffsetMax = 0
+
+}
+
+func (self *ProcessQueue) ChangeToProcessQueueInfo() (processQueueInfo ProcessQueueInfo) {
+	defer self.lockTreeMap.Unlock()
+	self.lockTreeMap.Lock()
+	processQueueInfo = ProcessQueueInfo{}
+	minOffset := -1
+	maxOffset := -1
+	minKey, _ := self.msgTreeMap.Min()
+	if minKey != nil {
+		minOffset = minKey.(int)
+	}
+	maxKey, _ := self.msgTreeMap.Max()
+	if maxKey != nil {
+		maxOffset = maxKey.(int)
+	}
+	processQueueInfo.CachedMsgCount = int32(self.msgCount)
+	processQueueInfo.CachedMsgMinOffset = int64(maxOffset)
+	processQueueInfo.CachedMsgMaxOffset = int64(minOffset)
+	//processQueueInfo.CommitOffset = -123 // todo
+	processQueueInfo.Droped = self.dropped
+	processQueueInfo.LastConsumeTimestamp = self.lastConsumeTimestamp.UnixNano()
+	processQueueInfo.LastPullTimestamp = self.lastPullTimestamp.UnixNano()
+	//processQueueInfo.
+
+	return
+}
+
+func (self *ProcessQueue) DeleteExpireMsg(queueOffset int) {
+	defer self.lockTreeMap.Unlock()
+	self.lockTreeMap.Lock()
+	key, _ := self.msgTreeMap.Min()
+	if key == nil {
+		return
+	}
+	offset := key.(int)
+	glog.Infof("look min key and offset  %d  %s", offset, queueOffset)
+	if queueOffset == offset {
+		self.msgTreeMap.Remove(queueOffset)
+		self.msgCount = self.msgTreeMap.Size()
+	}
+}
+
+func (self *ProcessQueue) GetMinMessageInTree() (offset int, messagePoint *MessageExt) {
+	defer self.lockTreeMap.Unlock()
+	self.lockTreeMap.Lock()
+	key, value := self.msgTreeMap.Min()
+	if key == nil || value == nil {
+		return
+	}
+	offset = key.(int)
+
+	message := value.(MessageExt)
+	messagePoint = &message
+	return
+}
+
+func (self *ProcessQueue) SetDrop(drop bool) {
+	self.dropped = drop
+}
+func (self *ProcessQueue) IsDropped() bool {
+	return self.dropped
+}
+func (self *ProcessQueue) GetMaxSpan() int {
+	defer self.lockTreeMap.Unlock()
+	self.lockTreeMap.Lock()
+	if self.msgTreeMap.Empty() {
+		return 0
+	}
+	minKey, _ := self.msgTreeMap.Min()
+	minOffset := minKey.(int)
+	maxKey, _ := self.msgTreeMap.Max()
+	maxOffset := maxKey.(int)
+	return maxOffset - minOffset
+}
+
+func (self *ProcessQueue) RemoveMessage(msgs []MessageExt) (offset int64) {
+	now := time.Now()
+	offset = -1
+	defer self.lockTreeMap.Unlock()
+	self.lockTreeMap.Lock()
+	self.lastConsumeTimestamp = now
+	if self.msgCount > 0 {
+		maxKey, _ := self.msgTreeMap.Max()
+		offset = int64(maxKey.(int)) + 1
+		for _, msg := range msgs {
+			self.msgTreeMap.Remove(int(msg.QueueOffset))
+		}
+		self.msgCount = self.msgTreeMap.Size()
+		if self.msgCount > 0 {
+			minKey, _ := self.msgTreeMap.Min()
+			offset = int64(minKey.(int))
+		}
+	}
+	return
+}
+
+func (self *ProcessQueue) PutMessage(msgs []MessageExt) (dispatchToConsume bool) {
+	dispatchToConsume = false
+	msgsLen := len(msgs)
+	if msgsLen == 0 {
+		return
+	}
+	defer self.lockTreeMap.Unlock()
+	self.lockTreeMap.Lock()
+
+	for _, msg := range msgs {
+		self.msgTreeMap.Put(int(msg.QueueOffset), msg)
+
+	}
+	self.msgCount = self.msgTreeMap.Size()
+	maxOffset, _ := self.msgTreeMap.Max()
+	self.queueOffsetMax = int64(maxOffset.(int))
+	if self.msgCount > 0 && !self.consuming {
+		dispatchToConsume = true
+		self.consuming = true
+	}
+	lastMsg := msgs[msgsLen-1]
+	remoteMaxOffset := util.StrToInt64WithDefaultValue(lastMsg.Properties[constant.PROPERTY_MAX_OFFSET], -1)
+	if remoteMaxOffset > 0 {
+		accTotal := remoteMaxOffset - lastMsg.QueueOffset
+		if accTotal > 0 {
+			self.msgAccCnt = accTotal
+		}
+	}
+	return
+}
+
+//func (self *ProcessQueue) TakeMessages(batchSize int) (messageToConsumeList  []MessageExt) {
+//	defer self.lockTreeMap.Unlock()
+//	self.lockTreeMap.Lock()
+//	self.lastConsumeTimestamp = time.Now()
+//	it := self.msgTreeMap.Iterator()
+//	nowIndex := 0
+//	for it.Next() {
+//		offset, message := it.Key(), it.Value()
+//		if (nowIndex >= batchSize) {
+//			break
+//		}
+//		self.msgTreeMap.Remove(offset)
+//		self.msgTreeMapToBeConsume.Put(offset, message)
+//		//messageToConsumeList = append(messageToConsumeList, message)
+//	}
+//	if (len(messageToConsumeList) == 0) {
+//		self.consuming = false
+//	}
+//	return
+//}
+
+/**
+#
+public final static long RebalanceLockMaxLiveTime =Long.parseLong(System.getProperty("rocketmq.client.rebalance.lockMaxLiveTime", "30000"));
+public final static long RebalanceLockInterval = Long.parseLong(System.getProperty("rocketmq.client.rebalance.lockInterval", "20000"));
+#并发消费过期的
+        case CONSUME_PASSIVELY:
+                            pq.setDropped(true);
+                            if (this.removeUnnecessaryMessageQueue(mq, pq)) {
+                                it.remove();
+                                changed = true;
+                                log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it",
+                                        consumerGroup, mq);
+                            }
+                            break;
+private final static long PullMaxIdleTime = Long.parseLong(System.getProperty("rocketmq.client.pull.pullMaxIdleTime", "120000"));
+private final Logger log = ClientLogger.getLog();
+private final ReadWriteLock lockTreeMap = new ReentrantReadWriteLock();
+
+private final TreeMap<Long, MessageExt> msgTreeMap = new TreeMap<Long, MessageExt>();
+private final AtomicLong msgCount = new AtomicLong();
+private final Lock lockConsume = new ReentrantLock();
+private final TreeMap<Long, MessageExt> msgTreeMapTemp = new TreeMap<Long, MessageExt>();
+private final AtomicLong tryUnlockTimes = new AtomicLong(0);
+private volatile long queueOffsetMax = 0L;
+private volatile boolean dropped = false;
+private volatile long lastPullTimestamp = System.currentTimeMillis();
+private volatile long lastConsumeTimestamp = System.currentTimeMillis();
+private volatile boolean locked = false;
+private volatile long lastLockTimestamp = System.currentTimeMillis();
+private volatile boolean consuming = false;
+private volatile long msgAccCnt = 0;
+
+  public boolean isLockExpired() {
+        boolean result = (System.currentTimeMillis() - this.lastLockTimestamp) > RebalanceLockMaxLiveTime;
+        return result;
+    }
+
+
+    public boolean isPullExpired() {
+        boolean result = (System.currentTimeMillis() - this.lastPullTimestamp) > PullMaxIdleTime;
+        return result;
+    }
+
+param pushConsumer
+cleanExpiredMsg(DefaultMQPushConsumer pushConsumer) {
+if (pushConsumer.getDefaultMQPushConsumerImpl().isConsumeOrderly()) {
+return;
+}
+
+int loop = msgTreeMap.size() < 16 ? msgTreeMap.size() : 16;
+for (int i = 0; i < loop; i++) {
+MessageExt msg = null;
+try {
+this.lockTreeMap.readLock().lockInterruptibly();
+try {
+if (!msgTreeMap.isEmpty() && System.currentTimeMillis() - Long.parseLong(MessageAccessor.getConsumeStartTimeStamp(msgTreeMap.firstEntry().getValue())) > pushConsumer.getConsumeTimeout() * 60 * 1000) {
+msg = msgTreeMap.firstEntry().getValue();
+} else {
+
+break;
+}
+} finally {
+this.lockTreeMap.readLock().unlock();
+}
+} catch (InterruptedException e) {
+log.error("getExpiredMsg exception", e);
+}
+
+try {
+
+pushConsumer.sendMessageBack(msg, 3);
+log.info("send expire msg back. topic={}, msgId={}, storeHost={}, queueId={}, queueOffset={}", msg.getTopic(), msg.getMsgId(), msg.getStoreHost(), msg.getQueueId(), msg.getQueueOffset());
+try {
+this.lockTreeMap.writeLock().lockInterruptibly();
+try {
+if (!msgTreeMap.isEmpty() && msg.getQueueOffset() == msgTreeMap.firstKey()) {
+try {
+msgTreeMap.remove(msgTreeMap.firstKey());
+} catch (Exception e) {
+log.error("send expired msg exception", e);
+}
+}
+} finally {
+this.lockTreeMap.writeLock().unlock();
+}
+} catch (InterruptedException e) {
+log.error("getExpiredMsg exception", e);
+}
+} catch (Exception e) {
+log.error("send expired msg exception", e);
+}
+}
+}
+
+
+public boolean putMessage(final List<MessageExt> msgs) {
+boolean dispatchToConsume = false;
+try {
+this.lockTreeMap.writeLock().lockInterruptibly();
+try {
+int validMsgCnt = 0;
+for (MessageExt msg : msgs) {
+MessageExt old = msgTreeMap.put(msg.getQueueOffset(), msg);
+if (null == old) {
+validMsgCnt++;
+this.queueOffsetMax = msg.getQueueOffset();
+}
+}
+msgCount.addAndGet(validMsgCnt);
+
+if (!msgTreeMap.isEmpty() && !this.consuming) {
+dispatchToConsume = true;
+this.consuming = true;
+}
+
+if (!msgs.isEmpty()) {
+MessageExt messageExt = msgs.get(msgs.size() - 1);
+String property = messageExt.getProperty(MessageConst.PROPERTY_MAX_OFFSET);
+if (property != null) {
+long accTotal = Long.parseLong(property) - messageExt.getQueueOffset();
+if (accTotal > 0) {
+this.msgAccCnt = accTotal;
+}
+}
+}
+} finally {
+this.lockTreeMap.writeLock().unlock();
+}
+} catch (InterruptedException e) {
+log.error("putMessage exception", e);
+}
+
+return dispatchToConsume;
+}
+
+
+public long getMaxSpan() {
+try {
+this.lockTreeMap.readLock().lockInterruptibly();
+try {
+if (!this.msgTreeMap.isEmpty()) {
+return this.msgTreeMap.lastKey() - this.msgTreeMap.firstKey();
+}
+} finally {
+this.lockTreeMap.readLock().unlock();
+}
+} catch (InterruptedException e) {
+log.error("getMaxSpan exception", e);
+}
+
+return 0;
+}
+
+
+public long removeMessage(final List<MessageExt> msgs) { //treeMap是维护了没有消费的 为了处理过期使用
+long result = -1;
+final long now = System.currentTimeMillis();
+try {
+this.lockTreeMap.writeLock().lockInterruptibly();
+this.lastConsumeTimestamp = now;
+try {
+if (!msgTreeMap.isEmpty()) {
+result = this.queueOffsetMax + 1;
+int removedCnt = 0;
+for (MessageExt msg : msgs) {
+MessageExt prev = msgTreeMap.remove(msg.getQueueOffset());
+if (prev != null) {
+removedCnt--;
+}
+}
+msgCount.addAndGet(removedCnt);
+
+if (!msgTreeMap.isEmpty()) {
+result = msgTreeMap.firstKey();
+}
+}
+} finally {
+this.lockTreeMap.writeLock().unlock();
+}
+} catch (Throwable t) {
+log.error("removeMessage exception", t);
+}
+
+return result;
+}
+
+
+public TreeMap<Long, MessageExt> getMsgTreeMap() {
+return msgTreeMap;
+}
+
+
+public AtomicLong getMsgCount() {
+return msgCount;
+}
+
+
+public boolean isDropped() {
+return dropped;
+}
+
+
+public void setDropped(boolean dropped) {
+this.dropped = dropped;
+}
+
+public boolean isLocked() {
+return locked;
+}
+
+public void setLocked(boolean locked) {
+this.locked = locked;
+}
+
+public void rollback() {
+try {
+this.lockTreeMap.writeLock().lockInterruptibly();
+try {
+this.msgTreeMap.putAll(this.msgTreeMapTemp);
+this.msgTreeMapTemp.clear();
+} finally {
+this.lockTreeMap.writeLock().unlock();
+}
+} catch (InterruptedException e) {
+log.error("rollback exception", e);
+}
+}
+
+
+public long commit() {
+try {
+this.lockTreeMap.writeLock().lockInterruptibly();
+try {
+Long offset = this.msgTreeMapTemp.lastKey();
+msgCount.addAndGet(this.msgTreeMapTemp.size() * (-1));
+this.msgTreeMapTemp.clear();
+if (offset != null) {
+return offset + 1;
+}
+} finally {
+this.lockTreeMap.writeLock().unlock();
+}
+} catch (InterruptedException e) {
+log.error("commit exception", e);
+}
+
+return -1;
+}
+
+
+public void makeMessageToCosumeAgain(List<MessageExt> msgs) {
+try {
+this.lockTreeMap.writeLock().lockInterruptibly();
+try {
+for (MessageExt msg : msgs) {
+this.msgTreeMapTemp.remove(msg.getQueueOffset());
+this.msgTreeMap.put(msg.getQueueOffset(), msg);
+}
+} finally {
+this.lockTreeMap.writeLock().unlock();
+}
+} catch (InterruptedException e) {
+log.error("makeMessageToCosumeAgain exception", e);
+}
+}
+
+
+public List<MessageExt> takeMessags(final int batchSize) {
+List<MessageExt> result = new ArrayList<MessageExt>(batchSize);
+final long now = System.currentTimeMillis();
+try {
+this.lockTreeMap.writeLock().lockInterruptibly();
+this.lastConsumeTimestamp = now;
+try {
+if (!this.msgTreeMap.isEmpty()) {
+for (int i = 0; i < batchSize; i++) {
+Map.Entry<Long, MessageExt> entry = this.msgTreeMap.pollFirstEntry();
+if (entry != null) {
+result.add(entry.getValue());
+msgTreeMapTemp.put(entry.getKey(), entry.getValue());
+} else {
+break;
+}
+}
+}
+
+if (result.isEmpty()) {
+consuming = false;
+}
+} finally {
+this.lockTreeMap.writeLock().unlock();
+}
+} catch (InterruptedException e) {
+log.error("take Messages exception", e);
+}
+
+return result;
+}
+
+
+public boolean hasTempMessage() {
+try {
+this.lockTreeMap.readLock().lockInterruptibly();
+try {
+return !this.msgTreeMap.isEmpty();
+} finally {
+this.lockTreeMap.readLock().unlock();
+}
+} catch (InterruptedException e) {
+}
+
+return true;
+}
+
+
+public void clear() {
+try {
+this.lockTreeMap.writeLock().lockInterruptibly();
+try {
+this.msgTreeMap.clear();
+this.msgTreeMapTemp.clear();
+this.msgCount.set(0);
+this.queueOffsetMax = 0L;
+} finally {
+this.lockTreeMap.writeLock().unlock();
+}
+} catch (InterruptedException e) {
+log.error("rollback exception", e);
+}
+}
+
+
+
+
+public void setLastLockTimestamp(long lastLockTimestamp) {
+this.lastLockTimestamp = lastLockTimestamp;
+}
+
+
+public Lock getLockConsume() {
+return lockConsume;
+}
+
+
+
+
+public void setLastPullTimestamp(long lastPullTimestamp) {
+this.lastPullTimestamp = lastPullTimestamp;
+}
+
+
+public long getMsgAccCnt() {
+return msgAccCnt;
+}
+
+
+
+public long getTryUnlockTimes() {
+return this.tryUnlockTimes.get();
+}
+
+
+public void incTryUnlockTimes() {
+this.tryUnlockTimes.incrementAndGet();
+}
+
+
+public void fillProcessQueueInfo(final ProcessQueueInfo info) {
+try {
+this.lockTreeMap.readLock().lockInterruptibly();
+
+if (!this.msgTreeMap.isEmpty()) {
+info.setCachedMsgMinOffset(this.msgTreeMap.firstKey());
+info.setCachedMsgMaxOffset(this.msgTreeMap.lastKey());
+info.setCachedMsgCount(this.msgTreeMap.size());
+}
+
+if (!this.msgTreeMapTemp.isEmpty()) {
+info.setTransactionMsgMinOffset(this.msgTreeMapTemp.firstKey());
+info.setTransactionMsgMaxOffset(this.msgTreeMapTemp.lastKey());
+info.setTransactionMsgCount(this.msgTreeMapTemp.size());
+}
+
+info.setLocked(this.locked);
+info.setTryUnlockTimes(this.tryUnlockTimes.get());
+info.setLastLockTimestamp(this.lastLockTimestamp);
+
+info.setDroped(this.dropped);
+info.setLastPullTimestamp(this.lastPullTimestamp);
+info.setLastConsumeTimestamp(this.lastConsumeTimestamp);
+} catch (Exception e) {
+} finally {
+this.lockTreeMap.readLock().unlock();
+}
+}
+
+
+
+*/

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/model/process_queue_info.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/process_queue_info.go b/rocketmq-go/model/process_queue_info.go
new file mode 100644
index 0000000..6bd71bd
--- /dev/null
+++ b/rocketmq-go/model/process_queue_info.go
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package model
+
+type ProcessQueueInfo struct {
+	CommitOffset int64 `json:"commitOffset"`
+
+	CachedMsgMinOffset int64 `json:"cachedMsgMinOffset"`
+	CachedMsgMaxOffset int64 `json:"cachedMsgMaxOffset"`
+	CachedMsgCount     int32 `json:"cachedMsgCount"`
+
+	TransactionMsgMinOffset int64 `json:"transactionMsgMinOffset"`
+	TransactionMsgMaxOffset int64 `json:"transactionMsgMaxOffset"`
+	TransactionMsgCount     int32 `json:"transactionMsgCount"`
+
+	Locked            bool  `json:"locked"`
+	TryUnlockTimes    int64 `json:"tryUnlockTimes"`
+	LastLockTimestamp int64 `json:"lastLockTimestamp"`
+
+	Droped               bool  `json:"droped"`
+	LastPullTimestamp    int64 `json:"lastPullTimestamp"`
+	LastConsumeTimestamp int64 `json:"lastConsumeTimestamp"`
+}
+
+//func (self ProcessQueueInfo) BuildFromProcessQueue(processQueue ProcessQueue) (processQueueInfo ProcessQueueInfo) {
+//	processQueueInfo = ProcessQueueInfo{}
+//	//processQueueInfo.CommitOffset =
+//	processQueueInfo.CachedMsgCount = processQueue.GetMsgCount()
+//	processQueueInfo.CachedMsgCount
+//	return
+//}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/model/pull_request.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/pull_request.go b/rocketmq-go/model/pull_request.go
new file mode 100644
index 0000000..bc1a46f
--- /dev/null
+++ b/rocketmq-go/model/pull_request.go
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package model
+
+type PullRequest struct {
+	ConsumerGroup string
+	MessageQueue  *MessageQueue
+	ProcessQueue  *ProcessQueue
+	NextOffset    int64
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/model/reset_offset_body.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/reset_offset_body.go b/rocketmq-go/model/reset_offset_body.go
new file mode 100644
index 0000000..1a0221d
--- /dev/null
+++ b/rocketmq-go/model/reset_offset_body.go
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package model
+
+import (
+	"encoding/json"
+	"github.com/apache/incubator-rocketmq-externals/rocketmq-go/util"
+	"github.com/golang/glog"
+)
+
+type ResetOffsetBody struct {
+	OffsetTable map[MessageQueue]int64 `json:"offsetTable"`
+}
+
+func (self *ResetOffsetBody) Decode(data []byte) (err error) {
+	self.OffsetTable = map[MessageQueue]int64{}
+	var kvMap map[string]string
+	kvMap, err = util.GetKvStringMap(string(data))
+	if err != nil {
+		return
+	}
+	glog.Info(kvMap)
+	kvMap, err = util.GetKvStringMap(kvMap["\"offsetTable\""])
+	if err != nil {
+		return
+	}
+	for k, v := range kvMap {
+		messageQueue := &MessageQueue{}
+		var offset int64
+		err = json.Unmarshal([]byte(k), messageQueue)
+		if err != nil {
+			return
+		}
+		offset, err = util.StrToInt64(v)
+		if err != nil {
+			return
+		}
+		self.OffsetTable[*messageQueue] = offset
+	}
+	return
+}