You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2017/05/07 03:58:58 UTC

[2/3] incubator-rocketmq-externals git commit: Go-Client remoting and RocketMqClient common method implement, closes apache/incubator-rocketmq-externals#17

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/model/response_code.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/response_code.go b/rocketmq-go/model/response_code.go
index 957c1e9..ed40a6d 100644
--- a/rocketmq-go/model/response_code.go
+++ b/rocketmq-go/model/response_code.go
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
 package model
 
 const (

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/model/send_result.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/send_result.go b/rocketmq-go/model/send_result.go
index b1af4fb..4d3b31f 100644
--- a/rocketmq-go/model/send_result.go
+++ b/rocketmq-go/model/send_result.go
@@ -52,7 +52,7 @@ func NewSendResult(status SendStatus, msgID, offsetID string, queue *message.Mes
 }
 
 func EncoderSendResultToJson(obj interface{}) string {
-	return nil // TODO
+	return "" // TODO
 }
 
 func DecoderSendResultFromJson(json string) *SendResult {

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/model/subscription_data.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/subscription_data.go b/rocketmq-go/model/subscription_data.go
new file mode 100644
index 0000000..ce5ae74
--- /dev/null
+++ b/rocketmq-go/model/subscription_data.go
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package model
+
+// SubscriptionData groups the values that describe one topic subscription.
+// NOTE(review): nothing visible here reads or writes these fields; presumably SubString
+// is the raw subscription expression and TagsSet/CodeSet are derived from it — confirm
+// against the code that builds this struct.
+type SubscriptionData struct {
+	Topic           string
+	SubString       string
+	ClassFilterMode bool
+	TagsSet         []string
+	CodeSet         []string
+	SubVersion      int64
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/model/topic_publishInfo.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/topic_publishInfo.go b/rocketmq-go/model/topic_publishInfo.go
index b2f711b..b5f9e37 100644
--- a/rocketmq-go/model/topic_publishInfo.go
+++ b/rocketmq-go/model/topic_publishInfo.go
@@ -18,59 +18,59 @@
 package model
 
 import (
-	"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/message"
+//"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/message"
 )
 
-type TopicPublishInfo struct {
-	orderTopic         bool
-	havaTopicRouteInfo bool
-	messageQueueList   []*message.MessageQueue
-	topicRouteData     *TopicRouteData
-}
-
-func (info *TopicPublishInfo) SetOrderTopic(b bool) {
-	info.orderTopic = b
-}
-
-func (info *TopicPublishInfo) Ok() bool {
-	return false
-}
-
-func (info *TopicPublishInfo) MessageQueueList() []*message.MessageQueue {
-	return info.messageQueueList
-}
-
-func (info *TopicPublishInfo) HaveTopicRouteInfo() bool {
-	return info.havaTopicRouteInfo
-}
-
-func (info *TopicPublishInfo) SetHaveTopicRouteInfo(b bool) {
-	info.havaTopicRouteInfo = b
-}
-
-func (info *TopicPublishInfo) TopicRouteData() *TopicRouteData {
-	return info.topicRouteData
-}
-
-func (info *TopicPublishInfo) SetTopicRouteData(routeDate *TopicRouteData) {
-	info.topicRouteData = routeDate
-}
-
-func (info *TopicPublishInfo) SelectOneMessageQueue() *message.MessageQueue {
-	return nil //TODO
-}
-
-func (info *TopicPublishInfo) selectOneMessageQueueWithBroker(brokerName string) *message.MessageQueue {
-	if brokerName == "" {
-		return info.SelectOneMessageQueue()
-	}
-	return nil //TODO
-}
-
-func (info *TopicPublishInfo) QueueIdByBroker(brokerName string) int {
-	return nil //TODO
-}
-
-func (info *TopicPublishInfo) String() string {
-	return nil
-}
+//type TopicPublishInfo struct {
+//	orderTopic         bool
+//	havaTopicRouteInfo bool
+//	messageQueueList   []*message.MessageQueue
+//	topicRouteData     *TopicRouteData
+//}
+//
+//func (info *TopicPublishInfo) SetOrderTopic(b bool) {
+//	info.orderTopic = b
+//}
+//
+//func (info *TopicPublishInfo) Ok() bool {
+//	return false
+//}
+//
+//func (info *TopicPublishInfo) MessageQueueList() []*message.MessageQueue {
+//	return info.messageQueueList
+//}
+//
+//func (info *TopicPublishInfo) HaveTopicRouteInfo() bool {
+//	return info.havaTopicRouteInfo
+//}
+//
+//func (info *TopicPublishInfo) SetHaveTopicRouteInfo(b bool) {
+//	info.havaTopicRouteInfo = b
+//}
+//
+//func (info *TopicPublishInfo) TopicRouteData() *TopicRouteData {
+//	return info.topicRouteData
+//}
+//
+//func (info *TopicPublishInfo) SetTopicRouteData(routeDate *TopicRouteData) {
+//	info.topicRouteData = routeDate
+//}
+//
+//func (info *TopicPublishInfo) SelectOneMessageQueue() *message.MessageQueue {
+//	return nil //TODO
+//}
+//
+//func (info *TopicPublishInfo) selectOneMessageQueueWithBroker(brokerName string) *message.MessageQueue {
+//	if brokerName == "" {
+//		return info.SelectOneMessageQueue()
+//	}
+//	return nil //TODO
+//}
+//
+//func (info *TopicPublishInfo) QueueIdByBroker(brokerName string) int {
+//	return 0 //TODO
+//}
+//
+//func (info *TopicPublishInfo) String() string {
+//	return ""
+//}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/model/topic_publish_info.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/topic_publish_info.go b/rocketmq-go/model/topic_publish_info.go
new file mode 100644
index 0000000..14ec088
--- /dev/null
+++ b/rocketmq-go/model/topic_publish_info.go
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package model
+
+import (
+	"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/constant"
+	"sync/atomic"
+)
+
+// TopicPublishInfo holds the queues a producer may publish to for one topic, together
+// with the route data they were built from (see BuildTopicPublishInfoFromTopicRoteData).
+type TopicPublishInfo struct {
+	OrderTopic             bool
+	HaveTopicRouterInfo    bool
+	MessageQueueList       []MessageQueue
+	TopicRouteDataInstance *TopicRouteData
+	topicQueueIndex        int32 // counter advanced atomically by FetchQueueIndex
+}
+
+//private boolean orderTopic = false;
+//private boolean haveTopicRouterInfo = false;
+//private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();
+//private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex(0); // todo
+//private TopicRouteData topicRouteData;
+
+// JudgeTopicPublishInfoOk reports whether MessageQueueList contains at least one queue.
+func (info *TopicPublishInfo) JudgeTopicPublishInfoOk() (bIsTopicOk bool) {
+	return len(info.MessageQueueList) != 0
+}
+// FetchQueueIndex atomically advances the queue counter and returns it reduced modulo len(MessageQueueList).
+// It returns 0 when the list is empty; the uint32 reduction keeps the index non-negative after int32 wraparound.
+func (self *TopicPublishInfo) FetchQueueIndex() (index int) {
+	if qLen := len(self.MessageQueueList); qLen > 0 {
+		next := atomic.AddInt32(&self.topicQueueIndex, 1)
+		index = int(uint32(next) % uint32(qLen))
+	}
+	return
+}
+func BuildTopicSubscribeInfoFromRoteData(topic string, topicRouteData *TopicRouteData) (mqList []*MessageQueue) {
+	mqList = make([]*MessageQueue, 0)
+	for _, queueData := range topicRouteData.QueueDatas {
+		if !constant.ReadAble(queueData.Perm) {
+			continue
+		}
+		var i int32
+		for i = 0; i < queueData.ReadQueueNums; i++ {
+			mq := &MessageQueue{
+				Topic:      topic,
+				BrokerName: queueData.BrokerName,
+				QueueId:    i,
+			}
+			mqList = append(mqList, mq)
+		}
+	}
+	return
+}
+
+// BuildTopicPublishInfoFromTopicRoteData derives the publishable queues of topic from topicRouteData.
+// For every QueueData entry whose Perm passes constant.WriteAble, the first BrokerData with the same
+// BrokerName is looked up; if that broker has a non-empty address under key "0", one MessageQueue is
+// appended per write queue (QueueId 0..WriteQueueNums-1). HaveTopicRouterInfo becomes true once at
+// least one queue has been appended.
+func BuildTopicPublishInfoFromTopicRoteData(topic string, topicRouteData *TopicRouteData) (topicPublishInfo *TopicPublishInfo) {
+	// TODO: every topic is currently treated as non-ordered.
+	topicPublishInfo = &TopicPublishInfo{
+		TopicRouteDataInstance: topicRouteData,
+		OrderTopic:             false,
+		MessageQueueList:       []MessageQueue{},
+	}
+	for _, queueData := range topicRouteData.QueueDatas {
+		if !constant.WriteAble(queueData.Perm) {
+			continue
+		}
+		// Only the first broker entry with a matching name is considered.
+		var owner *BrokerData
+		for _, brokerData := range topicRouteData.BrokerDatas {
+			if brokerData.BrokerName == queueData.BrokerName {
+				owner = brokerData
+				break
+			}
+		}
+		// Unknown broker, or no address registered under "0": contribute no queues.
+		if owner == nil || len(owner.BrokerAddrs["0"]) == 0 {
+			continue
+		}
+		for queueID := int32(0); queueID < queueData.WriteQueueNums; queueID++ {
+			topicPublishInfo.MessageQueueList = append(topicPublishInfo.MessageQueueList,
+				MessageQueue{Topic: topic, BrokerName: queueData.BrokerName, QueueId: queueID})
+			topicPublishInfo.HaveTopicRouterInfo = true
+		}
+	}
+	return
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/model/topic_route_data.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/topic_route_data.go b/rocketmq-go/model/topic_route_data.go
index f387529..348479f 100644
--- a/rocketmq-go/model/topic_route_data.go
+++ b/rocketmq-go/model/topic_route_data.go
@@ -18,108 +18,128 @@
 package model
 
 import (
-	"fmt"
-	"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/message"
+	//"fmt"
+	//"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/message"
+	"sync"
 )
 
-type BrokerData struct {
-}
+//
+//type BrokerData struct {
+//}
+//
+//type TopicRouteData struct {
+//	orderTopicConf    string
+//	queueDatas        []*message.MessageQueue
+//	brokerDatas       []*BrokerData
+//	filterServerTable map[string][]string
+//}
+//
+//func NewTopicRouteData() *TopicRouteData {
+//	return &TopicRouteData{}
+//}
+//
+//func (route *TopicRouteData) CloneTopicRouteData() (clonedRouteData *TopicRouteData) {
+//	clonedRouteData = &TopicRouteData{
+//		route.orderTopicConf,
+//		route.queueDatas,
+//		route.brokerDatas,
+//		route.filterServerTable,
+//	}
+//	// TODO: to complete
+//	return
+//}
+//
+//func (route *TopicRouteData) QueueDatas() []*message.MessageQueue {
+//	return route.queueDatas
+//}
+//
+//func (route *TopicRouteData) SetQueueDatas(data []*message.MessageQueue) {
+//	route.queueDatas = data
+//}
+//
+//func (route *TopicRouteData) BrokerDatas() []*BrokerData {
+//	return route.brokerDatas
+//}
+//
+//func (route *TopicRouteData) SetBrokerDatas(data []*BrokerData) {
+//	route.brokerDatas = data
+//}
+//
+//func (route *TopicRouteData) FilterServerTable() map[string][]string {
+//	return route.filterServerTable
+//}
+//
+//func (route *TopicRouteData) SetFilterServerTable(data map[string][]string) {
+//	route.filterServerTable = data
+//}
+//
+//func (route *TopicRouteData) OrderTopicConf() string {
+//	return route.orderTopicConf
+//}
+//
+//func (route *TopicRouteData) SetOrderTopicConf(s string) {
+//	route.orderTopicConf = s
+//}
+//
+//func (route *TopicRouteData) HashCode() (result int) {
+//	prime := 31
+//	result = 1
+//	result *= prime
+//	// TODO
+//
+//	return
+//}
+//
+//func (route *TopicRouteData) Equals(route1 interface{}) bool {
+//	if route == nil {
+//		return true
+//	}
+//	if route1 == nil {
+//		return false
+//	}
+//	//value, ok := route1.(TopicRouteData)
+//	//if !ok {
+//	//	return false
+//	//}
+//	// TODO
+//	//if route.brokerDatas == nil && value.brokerDatas != nil || len(route.brokerDatas) != len(value.brokerDatas) {
+//	//	return false
+//	//}
+//	//
+//	//if route.orderTopicConf == "" && value.orderTopicConf != "" || route.orderTopicConf != value.orderTopicConf {
+//	//	return false
+//	//}
+//	//
+//	//if route.queueDatas == nil && value.queueDatas != nil || route.queueDatas != value.queueDatas {
+//	//	return false
+//	//}
+//	//
+//	//if route.filterServerTable == nil && value.filterServerTable != nil ||
+//	//	route.filterServerTable != value.filterServerTable {
+//	//	return false
+//	//}
+//	return true
+//}
+//
+//func (route *TopicRouteData) String() string {
+//	return fmt.Sprintf("TopicRouteData [orderTopicConf=%s, queueDatas=%s, brokerDatas=%s, filterServerTable=%s]",
+//		route.orderTopicConf, route.queueDatas, route.brokerDatas, route.filterServerTable)
+//}
 
 type TopicRouteData struct {
-	orderTopicConf    string
-	queueDatas        []*message.MessageQueue
-	brokerDatas       []*BrokerData
-	filterServerTable map[string][]string
-}
-
-func NewTopicRouteData() *TopicRouteData {
-	return &TopicRouteData{}
-}
-
-func (route *TopicRouteData) CloneTopicRouteData() (clonedRouteData *TopicRouteData) {
-	clonedRouteData = &TopicRouteData{
-		route.orderTopicConf,
-		route.queueDatas,
-		route.brokerDatas,
-		route.filterServerTable,
-	}
-	// TODO: to complete
-	return
-}
-
-func (route *TopicRouteData) QueueDatas() []*message.MessageQueue {
-	return route.queueDatas
-}
-
-func (route *TopicRouteData) SetQueueDatas(data []*message.MessageQueue) {
-	route.queueDatas = data
-}
-
-func (route *TopicRouteData) BrokerDatas() []*BrokerData {
-	return route.brokerDatas
-}
-
-func (route *TopicRouteData) SetBrokerDatas(data []*BrokerData) {
-	route.brokerDatas = data
+	OrderTopicConf string
+	QueueDatas     []*QueueData
+	BrokerDatas    []*BrokerData
 }
-
-func (route *TopicRouteData) FilterServerTable() map[string][]string {
-	return route.filterServerTable
-}
-
-func (route *TopicRouteData) SetFilterServerTable(data map[string][]string) {
-	route.filterServerTable = data
-}
-
-func (route *TopicRouteData) OrderTopicConf() string {
-	return route.orderTopicConf
-}
-
-func (route *TopicRouteData) SetOrderTopicConf(s string) {
-	route.orderTopicConf = s
+type QueueData struct {
+	BrokerName     string
+	ReadQueueNums  int32
+	WriteQueueNums int32
+	Perm           int32
+	TopicSynFlag   int32
 }
-
-func (route *TopicRouteData) HashCode() (result int) {
-	prime := 31
-	result = 1
-	result *= prime
-	// TODO
-
-	return
-}
-
-func (route *TopicRouteData) Equals(route1 interface{}) bool {
-	if route == nil {
-		return true
-	}
-	if route1 == nil {
-		return false
-	}
-	//value, ok := route1.(TopicRouteData)
-	//if !ok {
-	//	return false
-	//}
-	// TODO
-	//if route.brokerDatas == nil && value.brokerDatas != nil || len(route.brokerDatas) != len(value.brokerDatas) {
-	//	return false
-	//}
-	//
-	//if route.orderTopicConf == "" && value.orderTopicConf != "" || route.orderTopicConf != value.orderTopicConf {
-	//	return false
-	//}
-	//
-	//if route.queueDatas == nil && value.queueDatas != nil || route.queueDatas != value.queueDatas {
-	//	return false
-	//}
-	//
-	//if route.filterServerTable == nil && value.filterServerTable != nil ||
-	//	route.filterServerTable != value.filterServerTable {
-	//	return false
-	//}
-	return true
-}
-
-func (route *TopicRouteData) String() string {
-	return fmt.Sprintf("TopicRouteData [orderTopicConf=%s, queueDatas=%s, brokerDatas=%s, filterServerTable=%s]",
-		route.orderTopicConf, route.queueDatas, route.brokerDatas, route.filterServerTable)
+type BrokerData struct {
+	BrokerName      string
+	BrokerAddrs     map[string]string
+	BrokerAddrsLock sync.RWMutex
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/mq_client_manager.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/mq_client_manager.go b/rocketmq-go/mq_client_manager.go
index c07dcfd..731158f 100644
--- a/rocketmq-go/mq_client_manager.go
+++ b/rocketmq-go/mq_client_manager.go
@@ -16,13 +16,67 @@
  */
 package rocketmq
 
-import "github.com/apache/incubator-rocketmq-externals/rocketmq-go/service"
+import (
+	"github.com/apache/incubator-rocketmq-externals/rocketmq-go/service"
+	"sync"
+	"time"
+)
 
 type MqClientManager struct {
 	clientFactory          *ClientFactory
 	rocketMqClient         service.RocketMqClient
 	pullMessageController  *PullMessageController
 	defaultProducerService RocketMQProducer //for send back message
+
+	rocketMqManagerLock sync.Mutex
+	//ClientId            string
+	BootTimestamp int64
+
+	NamesrvLock   sync.Mutex
+	HeartBeatLock sync.Mutex
+	//rebalanceControllr       *RebalanceController
+}
+
+// MqClientConfig is the configuration type accepted by NewMqClientManager.
+// It declares no fields yet.
+type MqClientConfig struct {
+}
+
+// NewMqClientManager returns a manager whose BootTimestamp is the current Unix time in seconds
+// and whose client factory has empty producer/consumer tables. clientConfig is not read yet;
+// the controllers listed in the comments below are still to be wired in.
+func NewMqClientManager(clientConfig *MqClientConfig) (rocketMqManager *MqClientManager) {
+	rocketMqManager = &MqClientManager{
+		BootTimestamp: time.Now().Unix(),
+		clientFactory: clientFactoryInit(),
+	}
+	//rocketMqManager.rocketMqClient =
+	//rocketMqManager.pullMessageController = NewPullMessageController(rocketMqManager.mqClient, rocketMqManager.clientFactory)
+	//rocketMqManager.cleanExpireMsgController = NewCleanExpireMsgController(rocketMqManager.mqClient, rocketMqManager.clientFactory)
+	//rocketMqManager.rebalanceControllr = NewRebalanceController(rocketMqManager.clientFactory)
+	return
+}
+
+// RegisterProducer is a placeholder: producer is currently ignored.
+func (m *MqClientManager) RegisterProducer(producer *DefaultMQProducer) {
+}
+
+// RegisterConsumer is a placeholder: consumer is currently ignored. The commented-out code
+// sketches the intended lazy creation of the inner producer service.
+func (m *MqClientManager) RegisterConsumer(consumer RocketMQConsumer) {
+	// todo check config
+	//if (m.defaultProducerService == nil) {
+	//	m.defaultProducerService = service.NewDefaultProducerService(constant.CLIENT_INNER_PRODUCER_GROUP, mq_config.NewProducerConfig(), m.mqClient)
+	//}
+}
+
+// Start launches the manager by starting its scheduled tasks.
+func (m *MqClientManager) Start() {
+	//m.SendHeartbeatToAllBrokerWithLock() // we should send heartbeat first
+	m.startAllScheduledTask()
+}
+// startAllScheduledTask is a placeholder: no scheduled tasks are started yet.
+func (m *MqClientManager) startAllScheduledTask() {
+}
+
+// clientFactoryInit returns a ClientFactory whose ProducerTable and ConsumerTable are
+// empty, non-nil maps.
+func clientFactoryInit() (clientFactory *ClientFactory) {
+	return &ClientFactory{
+		ProducerTable: map[string]RocketMQProducer{},
+		ConsumerTable: map[string]RocketMQConsumer{},
+	}
 }
 
 type ClientFactory struct {

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/mq_consumer.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/mq_consumer.go b/rocketmq-go/mq_consumer.go
index 7712ee1..7112537 100644
--- a/rocketmq-go/mq_consumer.go
+++ b/rocketmq-go/mq_consumer.go
@@ -16,7 +16,10 @@
  */
 package rocketmq
 
-import "github.com/apache/incubator-rocketmq-externals/rocketmq-go/service"
+import (
+	"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model"
+	"github.com/apache/incubator-rocketmq-externals/rocketmq-go/service"
+)
 
 type RocketMQConsumer interface {
 }
@@ -28,5 +31,44 @@ type DefaultMQPushConsumer struct {
 	mqClient              service.RocketMqClient
 	rebalance             *service.Rebalance //Rebalance's impl depend on offsetStore
 	consumeMessageService service.ConsumeMessageService
-	ConsumerConfig        *MqConsumerConfig
+	consumerConfig        *MqConsumerConfig
+
+	consumerGroup string
+	//consumeFromWhere      string
+	consumeType  string
+	messageModel string
+	unitMode     bool
+
+	subscription    map[string]string   //topic|subExpression
+	subscriptionTag map[string][]string // we use it filter again
+	// allocation strategy
+	pause bool //when reset offset we need pause
+}
+
+// NewDefaultMQPushConsumer creates a push consumer for consumerGroup configured by mqConsumerConfig.
+// All other fields are left at their zero values.
+func NewDefaultMQPushConsumer(consumerGroup string, mqConsumerConfig *MqConsumerConfig) (defaultMQPushConsumer *DefaultMQPushConsumer) {
+	// Store the group name as well as the config; the consumerGroup argument must not be dropped.
+	defaultMQPushConsumer = &DefaultMQPushConsumer{
+		consumerConfig: mqConsumerConfig,
+		consumerGroup:  consumerGroup,
+	}
+	return
+}
+
+// RegisterMessageListener passes messageListener to service.NewConsumeMessageConcurrentlyServiceImpl
+// and stores the result as the consumer's consume-message service, replacing any earlier one.
+func (c *DefaultMQPushConsumer) RegisterMessageListener(messageListener model.MessageListener) {
+	c.consumeMessageService = service.NewConsumeMessageConcurrentlyServiceImpl(messageListener)
+}
+// Subscribe is currently a no-op: its implementation is commented out, so topic and
+// subExpression are ignored. The commented code sketches the intended behavior: record the
+// expression per topic and, unless it is empty or "*", split it on "||" into trimmed tags.
+func (self *DefaultMQPushConsumer) Subscribe(topic string, subExpression string) {
+	//self.subscription[topic] = subExpression
+	//if len(subExpression) == 0 || subExpression == "*" {
+	//	return
+	//}
+	//tags := strings.Split(subExpression, "||")
+	//tagsList := []string{}
+	//for _, tag := range tags {
+	//	t := strings.TrimSpace(tag)
+	//	if len(t) == 0 {
+	//		continue
+	//	}
+	//	tagsList = append(tagsList, t)
+	//}
+	//if len(tagsList) > 0 {
+	//	self.subscriptionTag[topic] = tagsList
+	//}
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/mq_producer.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/mq_producer.go b/rocketmq-go/mq_producer.go
index 3677939..d1a011b 100644
--- a/rocketmq-go/mq_producer.go
+++ b/rocketmq-go/mq_producer.go
@@ -25,7 +25,7 @@ type MqProducerConfig struct {
 }
 
 type DefaultMQProducer struct {
-	producerGroup   string
-	ProducerConfig  *MqProducerConfig
-	producerService service.ProducerService
+	producerGroup    string
+	mqProducerConfig *MqProducerConfig
+	producerService  service.ProducerService
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/remoting/custom_header.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/remoting/custom_header.go b/rocketmq-go/remoting/custom_header.go
index 40feade..04a46be 100644
--- a/rocketmq-go/remoting/custom_header.go
+++ b/rocketmq-go/remoting/custom_header.go
@@ -18,4 +18,5 @@ package remoting
 
 type CustomerHeader interface {
 	FromMap(headerMap map[string]interface{})
+	//ToMap()(headerMap map[string]interface{})
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/remoting/event_executor.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/remoting/event_executor.go b/rocketmq-go/remoting/event_executor.go
deleted file mode 100644
index 38e1ee6..0000000
--- a/rocketmq-go/remoting/event_executor.go
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package remoting
-
-import (
-	"fmt"
-	"github.com/golang/glog"
-	"net"
-	"sync"
-)
-
-type Runnable interface {
-	Run()
-}
-
-type NetEventType int
-
-const (
-	Connect NetEventType = iota
-	Close
-	Idle
-	Exception // TODO error?
-)
-
-type NetEvent struct {
-	eType         NetEventType
-	remoteAddress string
-	conn          net.Conn
-}
-
-func NewEventType(eType NetEventType, remoteAddr string, conn net.Conn) *NetEvent {
-	return &NetEvent{eType, remoteAddr, conn}
-}
-
-func (event *NetEvent) Type() NetEventType {
-	return event.eType
-}
-
-func (event *NetEvent) RemoteAddress() string {
-	return event.remoteAddress
-}
-
-func (event *NetEvent) Conn() net.Conn {
-	return event.conn
-}
-
-func (event *NetEvent) String() string {
-	return fmt.Sprintf("NettyEvent [type=%s, remoteAddr=%s, channel=%s]",
-		event.eType, event.remoteAddress, event.conn)
-}
-
-type NetEventExecutor struct {
-	hasNotified bool
-	running     bool
-	stopped     chan int
-	mu          sync.RWMutex // TODO need init?
-	client      *RemotingClient
-
-	eventQueue chan *NetEvent
-	maxSize    int
-}
-
-func NewNetEventExecutor(client *RemotingClient) *NetEventExecutor {
-	return &NetEventExecutor{
-		hasNotified: false,
-		running:     false,
-		stopped:     make(chan int),
-		client:      client,
-		eventQueue:  make(chan *NetEvent, 100), // TODO confirm size
-		maxSize:     10000,
-	}
-}
-
-func (executor *NetEventExecutor) Start() {
-	go executor.run()
-}
-
-func (executor *NetEventExecutor) Shutdown() {
-	executor.stopped <- 0
-}
-
-func (executor *NetEventExecutor) PutEvent(event *NetEvent) {
-	if len(executor.eventQueue) <= executor.maxSize {
-		executor.eventQueue <- event //append(executor.eventQueue, event)
-	} else {
-		fmt.Sprintf("event queue size[%s] enough, so drop this event %s", len(executor.eventQueue), event.String())
-	}
-}
-
-func (executor *NetEventExecutor) ServiceName() string {
-	// TODO
-	return nil
-}
-
-func (executor *NetEventExecutor) run() {
-	glog.Infof("%s service started", executor.ServiceName())
-
-	executor.mu.Lock()
-	executor.running = true
-	executor.mu.Unlock()
-
-	listener := executor.client.ConnEventListener()
-	for executor.running { // TODO optimize
-		select {
-		case event := <-executor.eventQueue:
-			if event != nil && listener != nil {
-				switch event.Type() {
-				case Connect:
-					listener.OnConnConnect(event.remoteAddress, event.Conn())
-				case Close:
-					listener.OnConnClose(event.remoteAddress, event.Conn())
-				case Idle:
-					listener.OnConnIdle(event.remoteAddress, event.Conn())
-				case Exception:
-					listener.OnConnException(event.remoteAddress, event.Conn())
-				default:
-					break
-				}
-			}
-		case <-executor.stopped:
-			executor.mu.Lock()
-			executor.running = false
-			executor.mu.Unlock()
-			break
-		}
-	}
-
-	glog.Infof("%s service exit.", executor.ServiceName())
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/remoting/json_serializable.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/remoting/json_serializable.go b/rocketmq-go/remoting/json_serializable.go
new file mode 100644
index 0000000..c2c5ea0
--- /dev/null
+++ b/rocketmq-go/remoting/json_serializable.go
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package remoting
+
+import (
+	"encoding/json"
+)
+
+// JsonSerializer encodes and decodes RemotingCommand values as JSON. It holds no state.
+type JsonSerializer struct {
+}
+
+// EncodeHeaderData marshals command to JSON. It returns nil when marshalling fails;
+// the underlying error is not reported to the caller.
+func (s *JsonSerializer) EncodeHeaderData(command *RemotingCommand) []byte {
+	if encoded, err := json.Marshal(command); err == nil {
+		return encoded
+	}
+	return nil
+}
+// DecodeRemoteCommand unmarshals the JSON header into a new RemotingCommand and attaches body to it.
+// ExtFields is pre-initialised to an empty map before decoding. It returns nil when header is not
+// valid JSON for a RemotingCommand.
+func (s *JsonSerializer) DecodeRemoteCommand(header, body []byte) *RemotingCommand {
+	decoded := &RemotingCommand{ExtFields: map[string]interface{}{}}
+	if err := json.Unmarshal(header, decoded); err != nil {
+		return nil
+	}
+	decoded.Body = body
+	return decoded
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/remoting/remoting_client.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/remoting/remoting_client.go b/rocketmq-go/remoting/remoting_client.go
index 38685cb..206fdcf 100644
--- a/rocketmq-go/remoting/remoting_client.go
+++ b/rocketmq-go/remoting/remoting_client.go
@@ -14,305 +14,357 @@
  *  See the License for the specific language governing permissions and
  *  limitations under the License.
  */
-
 package remoting
 
 import (
+	"bytes"
+	"encoding/binary"
 	"errors"
-	"fmt"
+	"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/config"
+	"github.com/apache/incubator-rocketmq-externals/rocketmq-go/util"
 	"github.com/golang/glog"
 	"math/rand"
 	"net"
+	"strconv"
+	"strings"
 	"sync"
 	"time"
 )
 
-type ConnEventListener interface {
-	OnConnConnect(remoteAddress string, conn net.Conn)
-	OnConnClose(remoteAddress string, conn net.Conn)
-	OnConnIdle(remoteAddress string, conn net.Conn)
-	OnConnException(remoteAddress string, conn net.Conn)
+type RemotingClient interface {
+	InvokeSync(addr string, request *RemotingCommand, timeoutMillis int64) (remotingCommand *RemotingCommand, err error)
+	InvokeAsync(addr string, request *RemotingCommand, timeoutMillis int64, invokeCallback InvokeCallback) error
+	InvokeOneWay(addr string, request *RemotingCommand, timeoutMillis int64) error
 }
-
-type NetRequestProcessor struct {
+type DefalutRemotingClient struct {
+	clientId     string
+	clientConfig *config.ClientConfig
+
+	connTable     map[string]net.Conn
+	connTableLock sync.RWMutex
+
+	responseTable  util.ConcurrentMap //opaque (as a decimal string) -> *ResponseFuture
+	processorTable util.ConcurrentMap //map[int]ClientRequestProcessor //requestCode|ClientRequestProcessor
+	//	protected final HashMap<Integer/* request code */, Pair<NettyRequestProcessor, ExecutorService>> processorTable =
+	//new HashMap<Integer, Pair<NettyRequestProcessor, ExecutorService>>(64);
+	namesrvAddrList          []string
+	namesrvAddrSelectedAddr  string
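+	namesrvAddrSelectedIndex int                    //index into namesrvAddrList of the selected name server; -1 until one is chosen
+	namesvrLockRW            sync.RWMutex           //guards namesrvAddrSelectedAddr and namesrvAddrSelectedIndex
+	clientRequestProcessor   ClientRequestProcessor //handles requests initiated by the remote peer
+	serializerHandler        SerializerHandler      //encodes and decodes RocketMQ remoting commands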
 }
 
-type NetConfig struct {
-	clientWorkerNumber           int
-	clientCallbackExecutorNumber int
-	clientOneWaySemaphoreValue   int
-	clientAsyncSemaphoreValue    int
-	connectTimeoutMillis         time.Duration
-	channelNotActiveInterval     time.Duration
-
-	clientChannelMaxIdleTimeSeconds    time.Duration
-	clientSocketSndBufSize             int
-	clientSocketRcvBufSize             int
-	clientPooledByteBufAllocatorEnable bool
-	clientCloseSocketIfTimeout         bool
+func RemotingClientInit(clientConfig *config.ClientConfig, clientRequestProcessor ClientRequestProcessor) (client *DefalutRemotingClient) {
+	client = &DefalutRemotingClient{}
+	client.connTable = map[string]net.Conn{}
+	client.responseTable = util.New()
+	client.clientConfig = clientConfig
+
+	client.namesrvAddrList = strings.Split(clientConfig.NameServerAddress(), ";")
+	client.namesrvAddrSelectedIndex = -1
+	client.clientRequestProcessor = clientRequestProcessor
+	client.serializerHandler = NewSerializerHandler()
+	return
 }
 
-type Pair struct {
-	o1 *NetRequestProcessor
-	o2 *ExecutorService
+func (self *DefalutRemotingClient) InvokeSync(addr string, request *RemotingCommand, timeoutMillis int64) (remotingCommand *RemotingCommand, err error) {
+	var conn net.Conn
+	conn, err = self.GetOrCreateConn(addr)
+	response := &ResponseFuture{
+		SendRequestOK:  false,
+		Opaque:         request.Opaque,
+		TimeoutMillis:  timeoutMillis,
+		BeginTimestamp: time.Now().Unix(),
+		Done:           make(chan bool),
+	}
+	header := self.serializerHandler.EncodeHeader(request)
+	body := request.Body
+	self.SetResponse(request.Opaque, response)
+	err = self.sendRequest(header, body, conn, addr)
+	if err != nil {
+		glog.Error(err)
+		return
+	}
+	select {
+	case <-response.Done:
+		remotingCommand = response.ResponseCommand
+		return
+	case <-time.After(time.Duration(timeoutMillis) * time.Millisecond):
+		err = errors.New("invoke sync timeout:" + strconv.FormatInt(timeoutMillis, 10) + " Millisecond")
+		return
+	}
 }
-
-type RemotingClient struct {
-	semaphoreOneWay         sync.Mutex // TODO right? use chan?
-	semaphoreAsync          sync.Mutex
-	processorTable          map[int]*Pair
-	netEventExecutor        *NetEventExecutor
-	defaultRequestProcessor *Pair
-
-	config             NetConfig
-	connTable          map[string]net.Conn
-	connTableLock      sync.RWMutex
-	timer              *time.Timer
-	namesrvAddrList    []string
-	namesrvAddrChoosed string
-
-	callBackExecutor  *ExecutorService
-	listener          ConnEventListener
-	rpcHook           RPCHook
-	responseTable     map[int32]*ResponseFuture
-	responseTableLock sync.RWMutex
+func (self *DefalutRemotingClient) InvokeAsync(addr string, request *RemotingCommand, timeoutMillis int64, invokeCallback InvokeCallback) error {
+	conn, err := self.GetOrCreateConn(addr)
+	if err != nil {
+		return err
+	}
+	response := &ResponseFuture{
+		SendRequestOK:  false,
+		Opaque:         request.Opaque,
+		TimeoutMillis:  timeoutMillis,
+		BeginTimestamp: time.Now().Unix(),
+		InvokeCallback: invokeCallback,
+	}
+	self.SetResponse(request.Opaque, response)
+	header := self.serializerHandler.EncodeHeader(request)
+	body := request.Body
+	err = self.sendRequest(header, body, conn, addr)
+	if err != nil {
+		glog.Error(err)
+		return err
+	}
+	return err
 }
-
-type ConnHandlerContext struct {
+func (self *DefalutRemotingClient) InvokeOneWay(addr string, request *RemotingCommand, timeoutMillis int64) error {
+	conn, err := self.GetOrCreateConn(addr)
+	if err != nil {
+		return err
+	}
+	header := self.serializerHandler.EncodeHeader(request)
+	body := request.Body
+	err = self.sendRequest(header, body, conn, addr)
+	if err != nil {
+		glog.Error(err)
+		return err
+	}
+	return err
 }
 
-type ExecutorService struct {
-	callBackChannel chan func()
-	quit            chan bool
+func (self *DefalutRemotingClient) sendRequest(header, body []byte, conn net.Conn, addr string) error {
+	var requestBytes []byte
+	requestBytes = append(requestBytes, header...)
+	if body != nil && len(body) > 0 {
+		requestBytes = append(requestBytes, body...)
+	}
+	_, err := conn.Write(requestBytes)
+	if err != nil {
+		glog.Error(err)
+		if len(addr) > 0 {
+			self.ReleaseConn(addr, conn)
+		}
+		return err
+	}
+	return nil
 }
-
-func (exec *ExecutorService) submit(callback func()) {
-	exec.callBackChannel <- callback
+func (self *DefalutRemotingClient) GetNamesrvAddrList() []string {
+	return self.namesrvAddrList
 }
 
-func (exec *ExecutorService) run() {
-	go func() {
-		glog.Info("Callback Executor routing start.")
-		for {
-			select {
-			case invoke := <-exec.callBackChannel:
-				invoke()
-			case <-exec.quit:
-				break
-			}
-		}
-		glog.Info("Callback Executor routing quit.")
-	}()
+func (self *DefalutRemotingClient) SetResponse(index int32, response *ResponseFuture) {
+	self.responseTable.Set(strconv.Itoa(int(index)), response)
 }
-
-func NewRemotingClient(cfg NetConfig) *RemotingClient {
-	client := &RemotingClient{
-		config:        cfg,
-		connTable:     make(map[string]net.Conn),
-		timer:         time.NewTimer(10 * time.Second),
-		responseTable: make(map[int32]*ResponseFuture),
+func (self *DefalutRemotingClient) getResponse(index int32) (response *ResponseFuture, err error) {
+	obj, ok := self.responseTable.Get(strconv.Itoa(int(index)))
+	if !ok {
+		err = errors.New("get conn from responseTable error")
+		return
 	}
-	// java: super(xxxx)
-	return client
+	response = obj.(*ResponseFuture)
+	return
 }
-
-func (rc *RemotingClient) PutNetEvent(event *NetEvent) {
-	rc.netEventExecutor.PutEvent(event)
+func (self *DefalutRemotingClient) removeResponse(index int32) {
+	self.responseTable.Remove(strconv.Itoa(int(index)))
 }
-
-func (rc *RemotingClient) executeInvokeCallback(future *ResponseFuture) {
-	executor := rc.CallbackExecutor()
-	if executor != nil {
-		executor.submit(func() {
-			future.invokeCallback(future)
-		})
+func (self *DefalutRemotingClient) GetOrCreateConn(address string) (conn net.Conn, err error) {
+	if len(address) == 0 {
+		conn, err = self.getNamesvrConn()
 		return
 	}
-	future.executeInvokeCallback()
-}
-
-// check timeout future
-func (rc *RemotingClient) scanResponseTable() {
-	rfMap := make(map[int]*ResponseFuture)
-	for k, future := range rc.responseTable { // TODO safety?
-		if int64(future.beginTimestamp)+int64(future.timeoutMillis)+1e9 <= time.Now().Unix() {
-			future.Done()
-			delete(rc.responseTable, k)
-			rfMap[int(k)] = future
-			glog.Warningf("remove timeout request, ", future.String())
-		}
+	conn = self.GetConn(address)
+	if conn != nil {
+		return
 	}
-
-	go func() {
-		for _, future := range rfMap {
-			rc.executeInvokeCallback(future) // TODO if still failed, how to deal with the message ?
-		}
-	}()
+	conn, err = self.CreateConn(address)
+	return
 }
-
-func (rc *RemotingClient) CallbackExecutor() *ExecutorService {
-	return rc.callBackExecutor
-}
-
-func (rc *RemotingClient) RPCHook() RPCHook {
-	return rc.rpcHook
+func (self *DefalutRemotingClient) GetConn(address string) (conn net.Conn) {
+	self.connTableLock.RLock()
+	conn = self.connTable[address]
+	self.connTableLock.RUnlock()
+	return
 }
-
-func (rc *RemotingClient) ConnEventListener() ConnEventListener {
-	return rc.listener
-}
-
-func (rc *RemotingClient) invokeSync(addr string, request *RemotingCommand,
-	timeout time.Duration) (*RemotingCommand, error) {
-	conn := rc.getAndCreateConn(addr)
+func (self *DefalutRemotingClient) CreateConn(address string) (conn net.Conn, err error) {
+	defer self.connTableLock.Unlock()
+	self.connTableLock.Lock()
+	conn = self.connTable[address]
 	if conn != nil {
-		if rc.rpcHook != nil {
-			rc.rpcHook.DoBeforeRequest(addr, request)
-		}
-		opaque := request.opaque
-		//defer delete(rc.responseTable, opaque) TODO should in listener
-
-		future := &ResponseFuture{
-			opaque:        opaque,
-			timeoutMillis: timeout,
-		}
-		rc.responseTable[opaque] = future
-
-		conn.Write(request.encode()) // TODO register listener
-
-		response := future.WaitResponse(timeout)
-		if response == nil {
-			if future.sendRequestOK {
-				return nil, errors.New(fmt.Sprintf("RemotingTimeout error: %s", future.err.Error()))
-			} else {
-				return nil, errors.New(fmt.Sprintf("RemotingSend error: %s", future.err.Error()))
-			}
-		}
-
-		if rc.rpcHook != nil {
-			rc.rpcHook.DoBeforeResponse(addr, response)
-		}
-		return response, nil
-	} else {
-		rc.CloseConn(addr) // TODO
-		return nil, errors.New(fmt.Sprintf("Connection to %s ERROR!", addr))
+		return
 	}
+	conn, err = self.createAndHandleTcpConn(address)
+	self.connTable[address] = conn
+	return
 }
 
-func (rc *RemotingClient) invokeAsync(addr string, request *RemotingCommand,
-	timeout time.Duration, callback InvokeCallback) error {
-	conn := rc.getAndCreateConn(addr)
-	if conn != nil { // TODO how to confirm conn active?
-		if rc.rpcHook != nil {
-			rc.rpcHook.DoBeforeRequest(addr, request)
+func (self *DefalutRemotingClient) getNamesvrConn() (conn net.Conn, err error) {
+	self.namesvrLockRW.RLock()
+	address := self.namesrvAddrSelectedAddr
+	self.namesvrLockRW.RUnlock()
+	if len(address) != 0 {
+		conn = self.GetConn(address)
+		if conn != nil {
+			return
 		}
+	}
 
-		opaque := request.opaque
-		acquired := false // TODO semaphore.tryAcquire...
-		if acquired {
-			//final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync);
-
-			future := NewResponseFuture(opaque, timeout, callback)
-			rc.responseTable[opaque] = future
-
-			future.WaitResponse(timeout) // TODO add listener
+	defer self.namesvrLockRW.Unlock()
+	self.namesvrLockRW.Lock()
+	//re-check under the write lock: another goroutine may already have connected
+	address = self.namesrvAddrSelectedAddr
+	if len(address) != 0 {
+		conn = self.GetConn(address)
+		if conn != nil {
+			return
 		}
-		return nil
-	} else {
-		rc.CloseConn(addr) // TODO
-		return errors.New(fmt.Sprintf("Connection to %s ERROR!", addr))
 	}
-}
 
-func (rc *RemotingClient) invokeOneWay(addr string, request *RemotingCommand,
-	timeout time.Duration) error {
-	conn := rc.getAndCreateConn(addr)
-	if conn != nil {
-		if rc.rpcHook != nil {
-			rc.rpcHook.DoBeforeRequest(addr, request)
+	addressCount := len(self.namesrvAddrList)
+	if self.namesrvAddrSelectedIndex < 0 {
+		self.namesrvAddrSelectedIndex = rand.Intn(addressCount)
+	}
+	for i := 1; i <= addressCount; i++ {
+		selectedIndex := (self.namesrvAddrSelectedIndex + i) % addressCount
+		selectAddress := self.namesrvAddrList[selectedIndex]
+		if len(selectAddress) == 0 {
+			continue
+		}
+		conn, err = self.CreateConn(selectAddress)
+		if err == nil {
+			self.namesrvAddrSelectedAddr = selectAddress
+			self.namesrvAddrSelectedIndex = selectedIndex
+			return
 		}
-
-		request.MarkOneWayRpc()
-		// TODO boolean acquired = this.semaphoreOneway.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
-		return nil
-	} else {
-		rc.CloseConn(addr) // TODO
-		return errors.New(fmt.Sprintf("Connection to %s ERROR!", addr))
 	}
+	err = errors.New("all namesvrAddress can't use!,address:" + self.clientConfig.NameServerAddress())
+	return
 }
-
-func initValueIndex() int {
-	r := rand.Int()
-	if r < 0 { // math.Abs para is float64
-		r = -r
+func (self *DefalutRemotingClient) createAndHandleTcpConn(address string) (conn net.Conn, err error) {
+	conn, err = net.Dial("tcp", address)
+	if err != nil {
+		glog.Error(err)
+		return nil, err
 	}
-	return r % 999 % 999
+	go self.handlerReceiveLoop(conn, address) //handle the connection: process the data returned on it
+	return
 }
-
-func (rc *RemotingClient) Start() {
-	// TODO
-}
-
-func (rc *RemotingClient) Shutdown() {
-	// TODO
-	rc.timer.Stop()
-}
-
-func (rc *RemotingClient) registerRPCHook(hk RPCHook) {
-	rc.rpcHook = hk
-}
-
-func (rc *RemotingClient) CloseConn(addr string) {
-	// TODO
+func (self *DefalutRemotingClient) ReleaseConn(addr string, conn net.Conn) {
+	defer self.connTableLock.Unlock()
+	conn.Close()
+	self.connTableLock.Lock()
+	delete(self.connTable, addr)
 }
 
-func (rc *RemotingClient) updateNameServerAddressList(addrs []string) {
-	old, update := rc.namesrvAddrList, false
-
-	if addrs != nil && len(addrs) > 0 {
-		if old == nil || len(addrs) != len(old) {
-			update = true
-		} else {
-			for i := 0; i < len(addrs) && !update; i++ {
-				if contains(old, addrs[i]) {
-					update = true
+func (self *DefalutRemotingClient) handlerReceiveLoop(conn net.Conn, addr string) (err error) {
+	defer func() {
+		//when the read loop exits, release the connection
+		glog.Error(err, addr)
+		self.ReleaseConn(addr, conn)
+	}()
+	b := make([]byte, 1024)
+	var length, headerLength, bodyLength int32
+	var buf = bytes.NewBuffer([]byte{})
+	var header, body []byte
+	var readTotalLengthFlag = true //true: read the 4-byte total length next; false: read the frame data
+	for {
+		var n int
+		n, err = conn.Read(b)
+		if err != nil {
+			return
+		}
+		_, err = buf.Write(b[:n])
+		if err != nil {
+			return
+		}
+		for {
+			if readTotalLengthFlag {
+				//we read 4 bytes of allDataLength
+				if buf.Len() >= 4 {
+					err = binary.Read(buf, binary.BigEndian, &length)
+					if err != nil {
+						return
+					}
+					readTotalLengthFlag = false //now turn to read data
+				} else {
+					break //wait for the bytes we have not received yet
+				}
+			}
+			if !readTotalLengthFlag {
+				if buf.Len() < int(length) {
+					// the whole frame has not been received yet; break out and wait for more data
+					break
 				}
 			}
+			//now all data received, we can read totalLen again
+			readTotalLengthFlag = true
+
+			//get the data,and handler it
+			//header len
+			err = binary.Read(buf, binary.BigEndian, &headerLength)
+			var realHeaderLen = (headerLength & 0x00ffffff)
+			//the high byte of headerLength carries the serialization type
+			var headerSerializableType = byte(headerLength >> 24)
+			header = make([]byte, realHeaderLen)
+			_, err = buf.Read(header)
+			bodyLength = length - 4 - realHeaderLen
+			body = make([]byte, int(bodyLength))
+			if bodyLength == 0 {
+				// no body
+			} else {
+				_, err = buf.Read(body)
+			}
+			go self.handlerReceivedMessage(conn, headerSerializableType, header, body)
 		}
 	}
-
-	if update {
-		rc.namesrvAddrList = addrs // TODO safe?
-	}
-
 }
-
-func (rc *RemotingClient) getAndCreateConn(addr string) net.Conn {
-	return nil
-}
-
-func (rc *RemotingClient) getAndCreateNamesrvConn() net.Conn {
-	return nil
-}
-
-func (rc *RemotingClient) createConn(addr string) net.Conn {
-
-	return nil
+func (self *DefalutRemotingClient) handlerReceivedMessage(conn net.Conn, headerSerializableType byte, headBytes []byte, bodyBytes []byte) {
+	cmd := self.serializerHandler.DecodeRemoteCommand(headerSerializableType, headBytes, bodyBytes)
+	if cmd.IsResponseType() {
+		self.handlerResponse(cmd)
+		return
+	}
+	go self.handlerRequest(conn, cmd)
 }
-
-func (rc *RemotingClient) RegisterProcessor(requestCode int, processor *NetRequestProcessor, executor ExecutorService) {
-	// TODO
+func (self *DefalutRemotingClient) handlerRequest(conn net.Conn, cmd *RemotingCommand) {
+	responseCommand := self.clientRequestProcessor(cmd)
+	if responseCommand == nil {
+		return
+	}
+	responseCommand.Opaque = cmd.Opaque
+	responseCommand.MarkResponseType()
+	header := self.serializerHandler.EncodeHeader(responseCommand)
+	body := responseCommand.Body
+	err := self.sendRequest(header, body, conn, "")
+	if err != nil {
+		glog.Error(err)
+	}
 }
+func (self *DefalutRemotingClient) handlerResponse(cmd *RemotingCommand) {
+	response, err := self.getResponse(cmd.Opaque)
+	self.removeResponse(cmd.Opaque)
+	if err != nil {
+		return
+	}
+	response.ResponseCommand = cmd
+	if response.InvokeCallback != nil {
+		response.InvokeCallback(response)
+	}
 
-func (rc *RemotingClient) String() string {
-	return nil // TODO
+	if response.Done != nil {
+		response.Done <- true
+	}
 }
 
-func contains(s []string, o string) bool { // TODO optimize
-	for _, v := range s {
-		if o == v {
-			return true
+func (self *DefalutRemotingClient) ClearExpireResponse() {
+	for seq, responseObj := range self.responseTable.Items() {
+		response := responseObj.(*ResponseFuture)
+		if (response.BeginTimestamp + 30) <= time.Now().Unix() {
+			//expire after 30 seconds (BeginTimestamp is in Unix seconds)
+			self.responseTable.Remove(seq)
+			if response.InvokeCallback != nil {
+				response.InvokeCallback(nil)
+				glog.Warningf("remove time out request %v", response)
+			}
 		}
 	}
-	return false
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/remoting/remoting_command.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/remoting/remoting_command.go b/rocketmq-go/remoting/remoting_command.go
index b57f1e9..a8361bd 100644
--- a/rocketmq-go/remoting/remoting_command.go
+++ b/rocketmq-go/remoting/remoting_command.go
@@ -14,168 +14,56 @@
  *  See the License for the specific language governing permissions and
  *  limitations under the License.
  */
-
 package remoting
 
-// TODO: refactor
 import (
-	"bytes"
-	"encoding/binary"
-	"encoding/json"
-	"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/header"
-	"log"
-	"os"
-	"strconv"
-	"sync"
+	"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/constant"
+	"github.com/apache/incubator-rocketmq-externals/rocketmq-go/util/structs"
 	"sync/atomic"
 )
 
-func init() {
-	// TODO
-}
-
-const (
-	SerializeTypeProperty = "rocketmq.serialize.type"
-	SerializeTypeEnv      = "ROCKETMQ_SERIALIZE_TYPE"
-	RemotingVersionKey    = "rocketmq.remoting.version"
-	rpcType               = 0 // 0, request command
-	rpcOneWay             = 1 // 0, RPC
-)
-
-type RemotingCommandType int
+var opaque int32
 
-const (
-	ResponseCommand RemotingCommandType = iota
-	RqeusetCommand
-)
+var RPC_TYPE int = 0   // 0, REQUEST_COMMAND
+var RPC_ONEWAY int = 1 // 0, RPC
 
-var configVersion int = -1
-var requestId int32
-var decodeLock sync.Mutex
+//var RESPONSE_TYPE int= 1 << RPC_TYPE
+var RESPONSE_TYPE int = 1
 
 type RemotingCommand struct {
 	//header
-	code      int               `json:"code"`
-	language  string            `json:"language"`
-	version   int               `json:"version"`
-	opaque    int32             `json:"opaque"`
-	flag      int               `json:"flag"`
-	remark    string            `json:"remark"`
-	extFields map[string]string `json:"extFields"`
-	header    CustomerHeader    // transient
-	//body
-	body []byte `json:"body,omitempty"`
+	Code      int16                  `json:"code"`
+	Language  string                 `json:"language"` //int 8
+	Version   int16                  `json:"version"`
+	Opaque    int32                  `json:"opaque"`
+	Flag      int                    `json:"flag"`
+	Remark    string                 `json:"remark"`
+	ExtFields map[string]interface{} `json:"extFields"` //Java's extFields and customHeader both use this key
+	Body      []byte                 `json:"body,omitempty"`
 }
 
-func NewRemotingCommand(code int, header CustomerHeader) *RemotingCommand {
-	cmd := &RemotingCommand{
-		code:   code,
-		header: header,
-	}
-	setCmdVersion(cmd)
-	return cmd
+func NewRemotingCommand(commandCode int16, customerHeader CustomerHeader) *RemotingCommand {
+	return NewRemotingCommandWithBody(commandCode, customerHeader, nil)
 }
 
-func setCmdVersion(cmd *RemotingCommand) {
-	if configVersion >= 0 {
-		cmd.version = configVersion // safety
-	} else if v := os.Getenv(RemotingVersionKey); v != "" {
-		value, err := strconv.Atoi(v)
-		if err != nil {
-			// TODO log
-		}
-		cmd.version = value
-		configVersion = value
+func NewRemotingCommandWithBody(commandCode int16, customerHeader CustomerHeader, body []byte) *RemotingCommand {
+	remotingCommand := new(RemotingCommand)
+	remotingCommand.Code = commandCode
+	currOpaque := atomic.AddInt32(&opaque, 1)
+	remotingCommand.Opaque = currOpaque
+	remotingCommand.Flag = constant.REMOTING_COMMAND_FLAG
+	remotingCommand.Language = constant.REMOTING_COMMAND_LANGUAGE
+	remotingCommand.Version = constant.REMOTING_COMMAND_VERSION
+	if customerHeader != nil {
+		remotingCommand.ExtFields = structs.Map(customerHeader)
 	}
+	remotingCommand.Body = body
+	return remotingCommand
 }
 
-func (cmd *RemotingCommand) encodeHeader() []byte {
-	length := 4
-	headerData := cmd.buildHeader()
-	length += len(headerData)
-
-	if cmd.body != nil {
-		length += len(cmd.body)
-	}
-
-	buf := bytes.NewBuffer([]byte{})
-	binary.Write(buf, binary.BigEndian, length)
-	binary.Write(buf, binary.BigEndian, len(cmd.body))
-	buf.Write(headerData)
-
-	return buf.Bytes()
+func (self *RemotingCommand) IsResponseType() bool {
+	return self.Flag&(RESPONSE_TYPE) == RESPONSE_TYPE
 }
-
-func (cmd *RemotingCommand) buildHeader() []byte {
-	buf, err := json.Marshal(cmd)
-	if err != nil {
-		return nil
-	}
-	return buf
-}
-
-func (cmd *RemotingCommand) encode() []byte {
-	length := 4
-
-	headerData := cmd.buildHeader()
-	length += len(headerData)
-
-	if cmd.body != nil {
-		length += len(cmd.body)
-	}
-
-	buf := bytes.NewBuffer([]byte{})
-	binary.Write(buf, binary.LittleEndian, length)
-	binary.Write(buf, binary.LittleEndian, len(cmd.body))
-	buf.Write(headerData)
-
-	if cmd.body != nil {
-		buf.Write(cmd.body)
-	}
-
-	return buf.Bytes()
-}
-
-func decodeRemoteCommand(header, body []byte) *RemotingCommand {
-	decodeLock.Lock()
-	defer decodeLock.Unlock()
-
-	cmd := &RemotingCommand{}
-	cmd.extFields = make(map[string]string)
-	err := json.Unmarshal(header, cmd)
-	if err != nil {
-		log.Print(err)
-		return nil
-	}
-	cmd.body = body
-	return cmd
-}
-
-func CreateRemotingCommand(code int, requestHeader *header.SendMessageRequestHeader) *RemotingCommand {
-	cmd := &RemotingCommand{}
-	cmd.code = code
-	cmd.header = requestHeader
-	cmd.version = 1
-	cmd.opaque = atomic.AddInt32(&requestId, 1) // TODO: safety?
-	return cmd
-}
-
-func (cmd *RemotingCommand) SetBody(body []byte) {
-	cmd.body = body
-}
-
-func (cmd *RemotingCommand) Type() RemotingCommandType {
-	bits := 1 << rpcType
-	if (cmd.flag & bits) == bits {
-		return ResponseCommand
-	}
-	return RqeusetCommand
-}
-
-func (cmd *RemotingCommand) MarkOneWayRpc() {
-	cmd.flag |= (1 << rpcOneWay)
-}
-
-func (cmd *RemotingCommand) String() string {
-	return nil // TODO
+func (self *RemotingCommand) MarkResponseType() {
+	self.Flag = (self.Flag | RESPONSE_TYPE)
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/remoting/request_code.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/remoting/request_code.go b/rocketmq-go/remoting/request_code.go
new file mode 100644
index 0000000..52965d5
--- /dev/null
+++ b/rocketmq-go/remoting/request_code.go
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package remoting
+
+const (
+	SEND_MESSAGE                        = 10
+	PULL_MESSAGE                        = 11
+	QUERY_MESSAGE                       = 12
+	QUERY_BROKER_OFFSET                 = 13
+	QUERY_CONSUMER_OFFSET               = 14
+	UPDATE_CONSUMER_OFFSET              = 15
+	UPDATE_AND_CREATE_TOPIC             = 17
+	GET_ALL_TOPIC_CONFIG                = 21
+	GET_TOPIC_CONFIG_LIST               = 22
+	GET_TOPIC_NAME_LIST                 = 23
+	UPDATE_BROKER_CONFIG                = 25
+	GET_BROKER_CONFIG                   = 26
+	TRIGGER_DELETE_FILES                = 27
+	GET_BROKER_RUNTIME_INFO             = 28
+	SEARCH_OFFSET_BY_TIMESTAMP          = 29
+	GET_MAX_OFFSET                      = 30
+	GET_MIN_OFFSET                      = 31
+	GET_EARLIEST_MSG_STORETIME          = 32
+	VIEW_MESSAGE_BY_ID                  = 33
+	HEART_BEAT                          = 34
+	UNREGISTER_CLIENT                   = 35
+	CONSUMER_SEND_MSG_BACK              = 36
+	END_TRANSACTION                     = 37
+	GET_CONSUMER_LIST_BY_GROUP          = 38
+	CHECK_TRANSACTION_STATE             = 39
+	NOTIFY_CONSUMER_IDS_CHANGED         = 40
+	LOCK_BATCH_MQ                       = 41
+	UNLOCK_BATCH_MQ                     = 42
+	GET_ALL_CONSUMER_OFFSET             = 43
+	GET_ALL_DELAY_OFFSET                = 45
+	PUT_KV_CONFIG                       = 100
+	GET_KV_CONFIG                       = 101
+	DELETE_KV_CONFIG                    = 102
+	REGISTER_BROKER                     = 103
+	UNREGISTER_BROKER                   = 104
+	GET_ROUTEINTO_BY_TOPIC              = 105
+	GET_BROKER_CLUSTER_INFO             = 106
+	UPDATE_AND_CREATE_SUBSCRIPTIONGROUP = 200
+	GET_ALL_SUBSCRIPTIONGROUP_CONFIG    = 201
+	GET_TOPIC_STATS_INFO                = 202
+	GET_CONSUMER_CONNECTION_LIST        = 203
+	GET_PRODUCER_CONNECTION_LIST        = 204
+	WIPE_WRITE_PERM_OF_BROKER           = 205
+
+	GET_ALL_TOPIC_LIST_FROM_NAMESERVER = 206
+	DELETE_SUBSCRIPTIONGROUP           = 207
+	GET_CONSUME_STATS                  = 208
+	SUSPEND_CONSUMER                   = 209
+	RESUME_CONSUMER                    = 210
+	RESET_CONSUMER_OFFSET_IN_CONSUMER  = 211
+	RESET_CONSUMER_OFFSET_IN_BROKER    = 212
+	ADJUST_CONSUMER_THREAD_POOL        = 213
+	WHO_CONSUME_THE_MESSAGE            = 214
+
+	DELETE_TOPIC_IN_BROKER    = 215
+	DELETE_TOPIC_IN_NAMESRV   = 216
+	GET_KV_CONFIG_BY_VALUE    = 217
+	DELETE_KV_CONFIG_BY_VALUE = 218
+	GET_KVLIST_BY_NAMESPACE   = 219
+
+	RESET_CONSUMER_CLIENT_OFFSET         = 220
+	GET_CONSUMER_STATUS_FROM_CLIENT      = 221
+	INVOKE_BROKER_TO_RESET_OFFSET        = 222
+	INVOKE_BROKER_TO_GET_CONSUMER_STATUS = 223
+
+	QUERY_TOPIC_CONSUME_BY_WHO = 300
+
+	GET_TOPICS_BY_CLUSTER = 224
+
+	REGISTER_FILTER_SERVER            = 301
+	REGISTER_MESSAGE_FILTER_CLASS     = 302
+	QUERY_CONSUME_TIME_SPAN           = 303
+	GET_SYSTEM_TOPIC_LIST_FROM_NS     = 304
+	GET_SYSTEM_TOPIC_LIST_FROM_BROKER = 305
+
+	CLEAN_EXPIRED_CONSUMEQUEUE = 306
+
+	GET_CONSUMER_RUNNING_INFO = 307
+
+	QUERY_CORRECTION_OFFSET = 308
+
+	CONSUME_MESSAGE_DIRECTLY = 309
+
+	SEND_MESSAGE_V2 = 310
+
+	GET_UNIT_TOPIC_LIST                = 311
+	GET_HAS_UNIT_SUB_TOPIC_LIST        = 312
+	GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST = 313
+	CLONE_GROUP_OFFSET                 = 314
+
+	VIEW_BROKER_STATS_DATA = 315
+)

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/remoting/request_processor.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/remoting/request_processor.go b/rocketmq-go/remoting/request_processor.go
new file mode 100644
index 0000000..eec8cd8
--- /dev/null
+++ b/rocketmq-go/remoting/request_processor.go
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package remoting
+
+type ClientRequestProcessor func(remotingCommand *RemotingCommand) (responseCommand *RemotingCommand)
+
+//CHECK_TRANSACTION_STATE
+//NOTIFY_CONSUMER_IDS_CHANGED
+//RESET_CONSUMER_CLIENT_OFFSET
+//GET_CONSUMER_STATUS_FROM_CLIENT
+//GET_CONSUMER_RUNNING_INFO
+//CONSUME_MESSAGE_DIRECTLY

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/remoting/response_code.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/remoting/response_code.go b/rocketmq-go/remoting/response_code.go
new file mode 100644
index 0000000..6a49c77
--- /dev/null
+++ b/rocketmq-go/remoting/response_code.go
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package remoting
+
+const (
+	SUCCESS                       = 0
+	SYSTEM_ERROR                  = 1
+	SYSTEM_BUSY                   = 2
+	REQUEST_CODE_NOT_SUPPORTED    = 3
+	TRANSACTION_FAILED            = 4
+	FLUSH_DISK_TIMEOUT            = 10
+	SLAVE_NOT_AVAILABLE           = 11
+	FLUSH_SLAVE_TIMEOUT           = 12
+	MESSAGE_ILLEGAL               = 13
+	SERVICE_NOT_AVAILABLE         = 14
+	VERSION_NOT_SUPPORTED         = 15
+	NO_PERMISSION                 = 16
+	TOPIC_NOT_EXIST               = 17
+	TOPIC_EXIST_ALREADY           = 18
+	PULL_NOT_FOUND                = 19
+	PULL_RETRY_IMMEDIATELY        = 20
+	PULL_OFFSET_MOVED             = 21
+	QUERY_NOT_FOUND               = 22
+	SUBSCRIPTION_PARSE_FAILED     = 23
+	SUBSCRIPTION_NOT_EXIST        = 24
+	SUBSCRIPTION_NOT_LATEST       = 25
+	SUBSCRIPTION_GROUP_NOT_EXIST  = 26
+	TRANSACTION_SHOULD_COMMIT     = 200
+	TRANSACTION_SHOULD_ROLLBACK   = 201
+	TRANSACTION_STATE_UNKNOW      = 202
+	TRANSACTION_STATE_GROUP_WRONG = 203
+	NO_BUYER_ID                   = 204
+
+	NOT_IN_CURRENT_UNIT = 205
+
+	CONSUMER_NOT_ONLINE = 206
+
+	CONSUME_MSG_TIMEOUT = 207
+)

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/remoting/response_future.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/remoting/response_future.go b/rocketmq-go/remoting/response_future.go
index a8c2aec..1bece6d 100644
--- a/rocketmq-go/remoting/response_future.go
+++ b/rocketmq-go/remoting/response_future.go
@@ -16,57 +16,14 @@
  */
 package remoting
 
-import (
-	"sync"
-	"time"
-)
-
-type InvokeCallback func(responseFuture *ResponseFuture)
-
 type ResponseFuture struct {
-	opaque          int32
-	timeoutMillis   time.Duration
-	invokeCallback  InvokeCallback
-	beginTimestamp  int64
-	responseCommand *RemotingCommand
-	sendRequestOK   bool
-	done            chan bool
-	latch           sync.WaitGroup
-	err             error
-}
-
-func NewResponseFuture(opaque int32, timeout time.Duration, callback InvokeCallback) *ResponseFuture {
-	future := &ResponseFuture{
-		opaque:         opaque,
-		timeoutMillis:  timeout,
-		invokeCallback: callback,
-		latch:          sync.WaitGroup{},
-	}
-	future.latch.Add(1)
-	return future
-}
-func (future *ResponseFuture) SetResponseFuture(cmd *RemotingCommand) {
-	future.responseCommand = cmd
-}
-
-func (future *ResponseFuture) Done() {
-	future.latch.Done()
-	future.done <- true
-}
-
-func (future *ResponseFuture) executeInvokeCallback() {
-	future.invokeCallback(nil) // TODO
-}
-
-func (future *ResponseFuture) WaitResponse(timeout time.Duration) *RemotingCommand {
-	go func() { // TODO optimize
-		time.Sleep(timeout)
-		future.latch.Add(-1) // TODO whats happened when counter less than 0
-	}()
-	future.latch.Wait()
-	return future.responseCommand
-}
-
-func (future *ResponseFuture) String() string {
-	return nil
+	ResponseCommand *RemotingCommand
+	SendRequestOK   bool
+	Rrr             error
+	Opaque          int32
+	TimeoutMillis   int64
+	InvokeCallback  InvokeCallback
+	BeginTimestamp  int64
+	Done            chan bool
 }
+type InvokeCallback func(responseFuture *ResponseFuture)

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/remoting/rocketmq_serializable.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/remoting/rocketmq_serializable.go b/rocketmq-go/remoting/rocketmq_serializable.go
new file mode 100644
index 0000000..22dc41f
--- /dev/null
+++ b/rocketmq-go/remoting/rocketmq_serializable.go
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package remoting
+
+import (
+	"bytes"
+	"encoding/binary"
+	"fmt"
+	"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/constant"
+)
+
+// RocketMqSerializer encodes and decodes RemotingCommand headers in the
+// big-endian binary layout implemented by its methods. It holds no state.
+type RocketMqSerializer struct {
+}
+
+// EncodeHeaderData serializes the header of cmd into the binary layout
+//
+//	code(int16) | language(int8) | version(int16) | opaque(int32) | flag(int32) |
+//	remarkLen(int32) | remark | extFieldsLen(int32) | extFields
+//
+// with all integers written big-endian. The language byte is always written
+// as 0. The body of cmd is not part of the returned bytes.
+func (self *RocketMqSerializer) EncodeHeaderData(cmd *RemotingCommand) []byte {
+	var remarkBytes, extFieldsBytes []byte
+	if len(cmd.Remark) > 0 {
+		remarkBytes = []byte(cmd.Remark)
+	}
+	if cmd.ExtFields != nil {
+		extFieldsBytes = rocketMqCustomHeaderSerialize(cmd.ExtFields)
+	}
+
+	buf := bytes.NewBuffer([]byte{})
+	// bytes.Buffer writes of fixed-size values do not fail, so the results of
+	// binary.Write are deliberately not checked.
+	binary.Write(buf, binary.BigEndian, int16(cmd.Code))    // code, 2 bytes
+	binary.Write(buf, binary.BigEndian, int8(0))            // language flag, 1 byte (0, marked JAVA in the original code)
+	binary.Write(buf, binary.BigEndian, int16(cmd.Version)) // version, 2 bytes
+	binary.Write(buf, binary.BigEndian, int32(cmd.Opaque))  // opaque, 4 bytes
+	binary.Write(buf, binary.BigEndian, int32(cmd.Flag))    // flag, 4 bytes
+
+	// Each variable-length section is prefixed with its 4-byte length; an
+	// absent section is encoded as length 0 with no payload.
+	binary.Write(buf, binary.BigEndian, int32(len(remarkBytes)))
+	buf.Write(remarkBytes)
+	binary.Write(buf, binary.BigEndian, int32(len(extFieldsBytes)))
+	buf.Write(extFieldsBytes)
+
+	// No debug printing here: this runs for every outgoing request.
+	return buf.Bytes()
+}
+
+// DecodeRemoteCommand rebuilds a RemotingCommand from a binary header and an
+// already separated body. Header fields are consumed in wire order: code,
+// language, version, opaque, flag, remark, extFields. The language byte is
+// read and discarded; Language is always set to
+// constant.REMOTING_COMMAND_LANGUAGE. Read errors are not checked, so a
+// truncated header yields a partially filled command.
+func (self *RocketMqSerializer) DecodeRemoteCommand(headerArray, body []byte) (cmd *RemotingCommand) {
+	reader := bytes.NewBuffer(headerArray)
+	// next decodes one big-endian value from the header into target.
+	next := func(target interface{}) {
+		binary.Read(reader, binary.BigEndian, target)
+	}
+
+	cmd = &RemotingCommand{Body: body}
+	next(&cmd.Code) // int code(~32767)
+	var remoteLanguage byte
+	next(&remoteLanguage)
+	cmd.Language = constant.REMOTING_COMMAND_LANGUAGE //todo use code from remote
+	next(&cmd.Version) // int version(~32767)
+	next(&cmd.Opaque)
+	next(&cmd.Flag)
+
+	// String remark, prefixed with its int32 length.
+	var remarkLen int32
+	next(&remarkLen)
+	if remarkLen > 0 {
+		remark := make([]byte, remarkLen)
+		next(&remark)
+		cmd.Remark = string(remark)
+	}
+
+	// Ext fields map, prefixed with its int32 length.
+	var extFieldsLen int32
+	next(&extFieldsLen)
+	if extFieldsLen > 0 {
+		extFields := make([]byte, extFieldsLen)
+		next(&extFields)
+		cmd.ExtFields = customHeaderDeserialize(extFields)
+	}
+	return cmd
+}
+
+// rocketMqCustomHeaderSerialize flattens extFiled into a sequence of entries,
+// each encoded as
+//
+//	keyLen(int16) | key | valueLen(int32) | value
+//
+// with big-endian lengths. Values are rendered with fmt's default formatting.
+// Entries follow Go's map iteration order, which is unspecified.
+func rocketMqCustomHeaderSerialize(extFiled map[string]interface{}) (byteData []byte) {
+	out := bytes.NewBuffer([]byte{})
+	for name, raw := range extFiled {
+		k := []byte(name)
+		v := []byte(fmt.Sprint(raw))
+		binary.Write(out, binary.BigEndian, int16(len(k)))
+		out.Write(k)
+		binary.Write(out, binary.BigEndian, int32(len(v)))
+		out.Write(v)
+	}
+	return out.Bytes()
+}
+
+// customHeaderDeserialize parses a sequence of length-prefixed key/value
+// entries into a map, reading a key and then a value until the input is
+// exhausted. Values are always stored as strings. A key that appears more
+// than once keeps its last value.
+func customHeaderDeserialize(extFiledDataBytes []byte) (extFiledMap map[string]interface{}) {
+	fields := map[string]interface{}{}
+	for pending := bytes.NewBuffer(extFiledDataBytes); pending.Len() > 0; {
+		name := getItemFormExtFiledDataBytes(pending, "key")
+		fields[name] = getItemFormExtFiledDataBytes(pending, "value")
+	}
+	return fields
+}
+// getItemFormExtFiledDataBytes consumes one length-prefixed item from buff
+// and returns it as a string. itemType selects the width of the big-endian
+// length prefix: "key" uses int16, "value" uses int32. Any other itemType
+// reads nothing and returns "".
+//
+// A negative length, or one larger than the bytes left in buff, marks the
+// input as malformed: the rest of buff is discarded and "" is returned,
+// instead of panicking in make or allocating a size chosen by the peer.
+func getItemFormExtFiledDataBytes(buff *bytes.Buffer, itemType string) (item string) {
+	var size int
+	switch itemType {
+	case "key":
+		var n int16
+		if err := binary.Read(buff, binary.BigEndian, &n); err != nil {
+			return ""
+		}
+		size = int(n)
+	case "value":
+		var n int32
+		if err := binary.Read(buff, binary.BigEndian, &n); err != nil {
+			return ""
+		}
+		size = int(n)
+	default:
+		return ""
+	}
+	if size < 0 || size > buff.Len() {
+		// Drain the buffer so a caller looping on buff.Len() terminates.
+		buff.Reset()
+		return ""
+	}
+	return string(buff.Next(size))
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/remoting/serializable.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/remoting/serializable.go b/rocketmq-go/remoting/serializable.go
new file mode 100644
index 0000000..24420cd
--- /dev/null
+++ b/rocketmq-go/remoting/serializable.go
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package remoting
+
+import (
+	"bytes"
+	"encoding/binary"
+	"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/constant"
+	"github.com/golang/glog"
+)
+
+// SerializerHandler frames outgoing command headers with the client's
+// configured Serializer and decodes incoming commands with the Serializer
+// that matches the serialize type passed to DecodeRemoteCommand.
+type SerializerHandler struct {
+	serializer Serializer //which serializer this client use, depend on  constant.USE_HEADER_SERIALIZETYPE
+}
+
+// Serializer converts between a RemotingCommand header and its serialized
+// byte form.
+type Serializer interface {
+	// EncodeHeaderData returns the serialized header of request.
+	EncodeHeaderData(request *RemotingCommand) []byte
+	// DecodeRemoteCommand builds a command from serialized header bytes and a body.
+	DecodeRemoteCommand(header, body []byte) *RemotingCommand
+}
+
+// Shared serializer instances, one per supported header serialize type.
+var JSON_SERIALIZER = &JsonSerializer{}
+var ROCKETMQ_SERIALIZER = &RocketMqSerializer{}
+
+// NewSerializerHandler returns a handler whose outgoing serializer is chosen
+// by constant.USE_HEADER_SERIALIZETYPE. It panics when that constant names
+// neither the JSON nor the RocketMQ serialize type.
+func NewSerializerHandler() SerializerHandler {
+	var chosen Serializer
+	switch constant.USE_HEADER_SERIALIZETYPE {
+	case constant.JSON_SERIALIZE:
+		chosen = JSON_SERIALIZER
+	case constant.ROCKETMQ_SERIALIZE:
+		chosen = ROCKETMQ_SERIALIZER
+	default:
+		panic("illeage serializer type")
+	}
+	return SerializerHandler{serializer: chosen}
+}
+// EncodeHeader builds the frame prefix for request: a big-endian int32 total
+// length (4 + header length + body length), then a big-endian int32 whose top
+// byte is constant.USE_HEADER_SERIALIZETYPE and whose remaining bits are the
+// header length, then the serialized header. The body itself is not appended.
+func (self *SerializerHandler) EncodeHeader(request *RemotingCommand) []byte {
+	header := self.serializer.EncodeHeaderData(request)
+	headerLen := len(header)
+	// len of a nil body is 0, so no separate nil check is needed.
+	total := 4 + headerLen + len(request.Body)
+	typeMark := int(constant.USE_HEADER_SERIALIZETYPE) << 24
+
+	out := bytes.NewBuffer([]byte{})
+	binary.Write(out, binary.BigEndian, int32(total))              // len
+	binary.Write(out, binary.BigEndian, int32(headerLen|typeMark)) // header len
+	out.Write(header)
+	return out.Bytes()
+}
+
+// DecodeRemoteCommand decodes header and body with the serializer selected
+// by headerSerializableType. The type is supplied by the caller, so it need
+// not match the serializer this handler uses for encoding.
+//
+// It returns nil when headerSerializableType is not a known serialize type;
+// callers must check for nil before using the result.
+func (self *SerializerHandler) DecodeRemoteCommand(headerSerializableType byte, header, body []byte) *RemotingCommand {
+	var serializer Serializer
+	switch headerSerializableType {
+	case constant.JSON_SERIALIZE:
+		serializer = JSON_SERIALIZER
+	case constant.ROCKETMQ_SERIALIZE:
+		serializer = ROCKETMQ_SERIALIZER
+	default:
+		// No serializer matches. Falling through would call a method on a
+		// nil interface and panic, so report the problem and stop here.
+		glog.Error("Unknow headerSerializableType", headerSerializableType)
+		return nil
+	}
+	return serializer.DecodeRemoteCommand(header, body)
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/service/client_api.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/service/client_api.go b/rocketmq-go/service/client_api.go
index ff750ba..6901f4c 100644
--- a/rocketmq-go/service/client_api.go
+++ b/rocketmq-go/service/client_api.go
@@ -20,7 +20,6 @@ package service
 import (
 	"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model"
 	"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/config"
-	"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/header"
 	"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/message"
 	"github.com/apache/incubator-rocketmq-externals/rocketmq-go/remoting"
 )
@@ -45,25 +44,25 @@ type MQClientAPI struct {
 	config            *config.ClientConfig
 }
 
-func NewMQClientAPI(cfg *config.ClientConfig, processor *ClientRemotingProcessor, hook remoting.RPCHook) *MQClientAPI {
-	api := &MQClientAPI{
-		remotingClient: &remoting.RemotingClient{}, //TODO
-		topAddress:     &TopAddress{},              // TODO
-		crp:            processor,
-		config:         cfg,
-	}
-
-	// TODO register
-	return api
-}
-
-func (api *MQClientAPI) SendMessage(addr, brokerName string,
-	msg message.Message, requestHeader header.SendMessageRequestHeader, timeout int64) *model.SendResult {
-	var request *remoting.RemotingCommand
-	request = remoting.CreateRemotingCommand(model.SendMsg, &requestHeader)
-	request.SetBody(msg.Body)
-	return api.sendMessageSync(addr, brokerName, msg, timeout, request)
-}
+//func NewMQClientAPI(cfg *config.ClientConfig, processor *ClientRemotingProcessor, hook remoting.RPCHook) *MQClientAPI {
+//	api := &MQClientAPI{
+//		remotingClient: &remoting.RemotingClient{}, //TODO
+//		topAddress:     &TopAddress{},              // TODO
+//		crp:            processor,
+//		config:         cfg,
+//	}
+//
+//	// TODO register
+//	return api
+//}
+//
+//func (api *MQClientAPI) SendMessage(addr, brokerName string,
+//	msg message.Message, requestHeader header.SendMessageRequestHeader, timeout int64) *model.SendResult {
+//	var request *remoting.RemotingCommand
+//	request = remoting.CreateRemotingCommand(model.SendMsg, &requestHeader)
+//	request.SetBody(msg.Body)
+//	return api.sendMessageSync(addr, brokerName, msg, timeout, request)
+//}
 
 func (api *MQClientAPI) sendMessageSync(addr, brokerName string,
 	msg message.Message,

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/service/consume_message_service.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/service/consume_message_service.go b/rocketmq-go/service/consume_message_service.go
new file mode 100644
index 0000000..09be61c
--- /dev/null
+++ b/rocketmq-go/service/consume_message_service.go
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package service
+
+import (
+	"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model"
+	"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/config"
+	"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/constant"
+	"github.com/golang/glog"
+	"time"
+)
+
+// ConsumeMessageService is the set of operations a message consuming service
+// provides: initialisation, asynchronous submission of messages for
+// consumption, sending a message back, and consuming one message directly.
+type ConsumeMessageService interface {
+	//ConsumeMessageDirectlyResult consumeMessageDirectly(final MessageExt msg, final String brokerName);
+
+	// Init supplies the service with its consumer group, collaborators and configuration.
+	Init(consumerGroup string, mqClient RocketMqClient, offsetStore OffsetStore, defaultProducerService *DefaultProducerService, consumerConfig *config.RocketMqConsumerConfig)
+	// SubmitConsumeRequest hands msgs, which belong to messageQueue/processQueue, to the service for consumption.
+	SubmitConsumeRequest(msgs []model.MessageExt, processQueue *model.ProcessQueue,
+		messageQueue *model.MessageQueue, dispathToConsume bool)
+	// SendMessageBack returns messageExt to the named broker.
+	// NOTE(review): presumably for delayed redelivery at delayLayLevel — confirm.
+	SendMessageBack(messageExt *model.MessageExt, delayLayLevel int, brokerName string) (err error)
+	// ConsumeMessageDirectly consumes a single message and reports the outcome.
+	ConsumeMessageDirectly(messageExt *model.MessageExt, brokerName string) (consumeMessageDirectlyResult model.ConsumeMessageDirectlyResult, err error)
+}
+
+// ConsumeMessageConcurrentlyServiceImpl is a ConsumeMessageService that runs
+// the message listener on batches of messages, each batch in its own
+// goroutine.
+type ConsumeMessageConcurrentlyServiceImpl struct {
+	consumerGroup   string                // set by Init
+	messageListener model.MessageListener // user callback invoked with each batch
+	//sendMessageBackProducerService SendMessageBackProducerService //for send retry Message
+	offsetStore    OffsetStore                    // set by Init
+	consumerConfig *config.RocketMqConsumerConfig // set by Init; supplies ConsumeMessageBatchMaxSize
+}
+
+// NewConsumeMessageConcurrentlyServiceImpl returns a concurrent consume
+// service that delivers messages to messageListener. Init must be called
+// before messages are submitted, because the consumer group, offset store
+// and consumer config are only set there.
+func NewConsumeMessageConcurrentlyServiceImpl(messageListener model.MessageListener) (consumeService ConsumeMessageService) {
+	// The send-message-back producer is not wired in yet (its struct field is
+	// still commented out). The listener must be kept regardless: returning
+	// the zero interface value made every method call on the result panic.
+	return &ConsumeMessageConcurrentlyServiceImpl{messageListener: messageListener}
+}
+
+// Init stores the consumer group, offset store and consumer configuration on
+// the service. mqClient and defaultProducerService are accepted but not used
+// yet.
+func (self *ConsumeMessageConcurrentlyServiceImpl) Init(consumerGroup string, mqClient RocketMqClient, offsetStore OffsetStore, defaultProducerService *DefaultProducerService, consumerConfig *config.RocketMqConsumerConfig) {
+	self.consumerGroup, self.offsetStore, self.consumerConfig = consumerGroup, offsetStore, consumerConfig
+	// TODO: initialise the send-message-back producer once it exists:
+	//self.sendMessageBackProducerService.InitSendMessageBackProducerService(consumerGroup, mqClient,defaultProducerService,consumerConfig)
+}
+
+// SubmitConsumeRequest splits msgs into batches of at most
+// consumerConfig.ConsumeMessageBatchMaxSize messages and consumes every batch
+// in its own goroutine: the batch is prepared with transformMessageToConsume,
+// passed to the message listener, and the listener's result is handed to
+// processConsumeResult. The call returns without waiting for the goroutines.
+// dispathToConsume is currently unused.
+//
+// A configured batch size below 1 is treated as 1; without that guard the
+// loop index never advances and goroutines are spawned without end.
+func (self *ConsumeMessageConcurrentlyServiceImpl) SubmitConsumeRequest(msgs []model.MessageExt, processQueue *model.ProcessQueue, messageQueue *model.MessageQueue, dispathToConsume bool) {
+	batchSize := self.consumerConfig.ConsumeMessageBatchMaxSize
+	if batchSize < 1 {
+		batchSize = 1
+	}
+	msgsLen := len(msgs)
+	for begin := 0; begin < msgsLen; begin += batchSize {
+		end := begin + batchSize
+		if end > msgsLen {
+			end = msgsLen
+		}
+		// Pass the bounds as arguments so every goroutine owns its own copy.
+		go func(begin, end int) {
+			glog.V(2).Infof("look slice begin %d end %d msgsLen %d", begin, end, msgsLen)
+			batchMsgs := transformMessageToConsume(self.consumerGroup, msgs[begin:end])
+			consumeState := self.messageListener(batchMsgs)
+			self.processConsumeResult(consumeState, batchMsgs, messageQueue, processQueue)
+		}(begin, end)
+	}
+}
+
+// SendMessageBack is a placeholder: it does not send anything yet and always
+// returns a nil error.
+func (self *ConsumeMessageConcurrentlyServiceImpl) SendMessageBack(messageExt *model.MessageExt, delayLayLevel int, brokerName string) (err error) {
+	// TODO: delegate to the send-message-back producer once it is available:
+	//err = self.sendMessageBackProducerService.SendMessageBack(messageExt, 0, brokerName)
+	return nil
+}
+
+// ConsumeMessageDirectly passes the single message to the message listener
+// and reports the outcome. The result always has AutoCommit set and Order
+// cleared, and carries the elapsed time in milliseconds. ConsumeResult is
+// "CR_SUCCESS" when the listener reports "CONSUME_SUCCESS" with a
+// non-negative AckIndex, otherwise "CR_THROW_EXCEPTION". brokerName is unused
+// and the returned error is always nil.
+func (self *ConsumeMessageConcurrentlyServiceImpl) ConsumeMessageDirectly(messageExt *model.MessageExt, brokerName string) (consumeMessageDirectlyResult model.ConsumeMessageDirectlyResult, err error) {
+	const nanosPerMilli = 1000000
+	startMillis := time.Now().UnixNano() / nanosPerMilli
+	outcome := self.messageListener([]model.MessageExt{*messageExt})
+
+	consumeMessageDirectlyResult.AutoCommit = true
+	consumeMessageDirectlyResult.Order = false
+	consumeMessageDirectlyResult.SpentTimeMills = time.Now().UnixNano()/nanosPerMilli - startMillis
+
+	// Start from the failure value and upgrade it only on a confirmed success.
+	consumeMessageDirectlyResult.ConsumeResult = "CR_THROW_EXCEPTION"
+	if outcome.ConsumeConcurrentlyStatus == "CONSUME_SUCCESS" && outcome.AckIndex >= 0 {
+		consumeMessageDirectlyResult.ConsumeResult = "CR_SUCCESS"
+	}
+	return consumeMessageDirectlyResult, err
+}
+
+// processConsumeResult handles the listener's result for one batch. Messages
+// up to and including the acknowledged index count as consumed; every later
+// message is sent back via SendMessageBack. Messages whose send-back fails
+// get ReconsumeTimes incremented and are resubmitted for local consumption.
+// Nothing is done when processQueue is dropped or msgs is empty.
+//
+// Only a CONSUME_SUCCESS result can acknowledge messages, and its AckIndex is
+// clamped to [-1, len(msgs)-1]. For any other status no message is
+// acknowledged. The raw AckIndex is never used unclamped, since that could
+// index outside msgs or mark messages of a failed batch as consumed.
+func (self *ConsumeMessageConcurrentlyServiceImpl) processConsumeResult(result model.ConsumeConcurrentlyResult, msgs []model.MessageExt, messageQueue *model.MessageQueue, processQueue *model.ProcessQueue) {
+	if processQueue.IsDropped() {
+		glog.Warning("processQueue is dropped without process consume result. ", msgs)
+		return
+	}
+	if len(msgs) == 0 {
+		return
+	}
+
+	// Index of the last acknowledged message; -1 means none.
+	ackIndex := -1
+	if model.CONSUME_SUCCESS == result.ConsumeConcurrentlyStatus {
+		ackIndex = result.AckIndex
+		if ackIndex >= len(msgs) {
+			ackIndex = len(msgs) - 1
+		} else if ackIndex < 0 {
+			ackIndex = -1
+		}
+	}
+
+	// Cap the capacity so the appends below allocate a new array instead of
+	// overwriting elements of msgs.
+	successMessages := msgs[: ackIndex+1 : ackIndex+1]
+	var failedMessages []model.MessageExt
+	for i := ackIndex + 1; i < len(msgs); i++ {
+		if err := self.SendMessageBack(&msgs[i], 0, messageQueue.BrokerName); err != nil {
+			msgs[i].ReconsumeTimes = msgs[i].ReconsumeTimes + 1
+			failedMessages = append(failedMessages, msgs[i])
+			continue
+		}
+		successMessages = append(successMessages, msgs[i])
+	}
+	if len(failedMessages) > 0 {
+		self.SubmitConsumeRequest(failedMessages, processQueue, messageQueue, true)
+	}
+	// TODO: commit the consumed offset once ProcessQueue.RemoveMessage exists:
+	//commitOffset := processQueue.RemoveMessage(successMessages)
+	//if (commitOffset > 0 && ! processQueue.IsDropped()) {
+	//	self.offsetStore.UpdateOffset(messageQueue, commitOffset, true)
+	//}
+}
+
+// transformMessageToConsume prepares msgs for the listener, in place, and
+// returns the same slice. A message whose topic is the consumer group's retry
+// topic (constant.RETRY_GROUP_TOPIC_PREFIX + consumerGroup) gets its topic
+// replaced by the constant.PROPERTY_RETRY_TOPIC property when that property
+// is non-empty. SetConsumeStartTime is then called on every message.
+func transformMessageToConsume(consumerGroup string, msgs []model.MessageExt) []model.MessageExt {
+	retryTopicName := constant.RETRY_GROUP_TOPIC_PREFIX + consumerGroup
+
+	// Work through a pointer to each element: ranging by value would apply
+	// the changes to a throwaway copy and leave msgs unchanged.
+	for i := range msgs {
+		msg := &msgs[i]
+		//reset retry topic name
+		if msg.Message.Topic == retryTopicName {
+			if retryTopic := msg.Properties[constant.PROPERTY_RETRY_TOPIC]; len(retryTopic) > 0 {
+				msg.Message.Topic = retryTopic
+			}
+		}
+		//set consume start time
+		msg.SetConsumeStartTime()
+	}
+	return msgs
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/service/consume_messsage_service.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/service/consume_messsage_service.go b/rocketmq-go/service/consume_messsage_service.go
deleted file mode 100644
index d3b28fc..0000000
--- a/rocketmq-go/service/consume_messsage_service.go
+++ /dev/null
@@ -1,20 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package service
-
-type ConsumeMessageService struct {
-}