You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2019/12/25 06:59:25 UTC

[GitHub] [rocketmq-client-go] bobo888 commented on issue #341: [Bug]Use comsumer crash

bobo888 commented on issue #341: [Bug]Use comsumer crash
URL: https://github.com/apache/rocketmq-client-go/issues/341#issuecomment-568852600
 
 
   I modify like this.
   
   rocketmq-client-go/internal/namesrv.go
   
   ```go
   func (s *namesrvs) updateBrokerVersion(brokerName string, version int32) {
   	s.lock.Lock()
   	defer s.lock.Unlock()
   
   	v, exist := s.brokerVersionMap.Load(brokerName)
   	var m map[string]int32
   	if exist {
   		m = v.(map[string]int32)
   	} else {
   		m = make(map[string]int32, 4)
   		s.brokerVersionMap.Store(brokerName, m)
   	}
   	m[brokerName] = version
   }
   
   func (s *namesrvs) findBrokerVersion(brokerName, brokerAddr string) int32 {
   	s.lock.Lock()
   	defer s.lock.Unlock()
   
   	versions, exist := s.brokerVersionMap.Load(brokerName)
   	if !exist {
   		return 0
   	}
   
   	v, exist := versions.(map[string]int32)[brokerAddr]
   	if exist {
   		return v
   	}
   
   	return 0
   }
   ```
   
   rocketmq-client-go/internal/client.go
   
   ```go
   func (c *rmqClient) SendHeartbeatToAllBrokerWithLock() {
   	c.hbMutex.Lock()
   	defer c.hbMutex.Unlock()
   	hbData := NewHeartbeatData(c.ClientID())
   
   	c.producerMap.Range(func(key, value interface{}) bool {
   		pData := producerData{
   			GroupName: key.(string),
   		}
   		hbData.ProducerDatas.Add(pData)
   		return true
   	})
   
   	c.consumerMap.Range(func(key, value interface{}) bool {
   		consumer := value.(InnerConsumer)
   		cData := consumerData{
   			GroupName:         key.(string),
   			CType:             "PUSH",
   			MessageModel:      "CLUSTERING",
   			Where:             "CONSUME_FROM_FIRST_OFFSET",
   			UnitMode:          consumer.IsUnitMode(),
   			SubscriptionDatas: consumer.SubscriptionDataList(),
   		}
   		hbData.ConsumerDatas.Add(cData)
   		return true
   	})
   	if hbData.ProducerDatas.Len() == 0 && hbData.ConsumerDatas.Len() == 0 {
   		rlog.Info("sending heartbeat, but no producer and no consumer", nil)
   		return
   	}
   	c.namesrvs.brokerAddressesMap.Range(func(key, value interface{}) bool {
   		brokerName := key.(string)
   		data := value.(*BrokerData)
   		for id, addr := range data.BrokerAddresses {
   			cmd := remote.NewRemotingCommand(ReqHeartBeat, nil, hbData.encode())
   			response, err := c.remoteClient.InvokeSync(context.Background(), addr, cmd, 3*time.Second)
   			if err != nil {
   				rlog.Warning("send heart beat to broker error", map[string]interface{}{
   					rlog.LogKeyUnderlayError: err,
   				})
   				return true
   			}
   			if response.Code == ResSuccess {
   				c.namesrvs.updateBrokerVersion(brokerName, int32(response.Version))
   				rlog.Debug("send heart beat to broker success", map[string]interface{}{
   					"brokerName": brokerName,
   					"brokerId":   id,
   					"brokerAddr": addr,
   				})
   			}
   		}
   		return true
   	})
   }
   ```
   
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services