You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2019/12/25 04:14:29 UTC
[GitHub] [rocketmq-client-go] bobo888 opened a new issue #341: [Bug]Use
comsumer crash
bobo888 opened a new issue #341: [Bug]Use comsumer crash
URL: https://github.com/apache/rocketmq-client-go/issues/341
I use comsumer crash.
crash info:
runtime.throw(0xa34bcb, 0x21)
/mnt/go/src/runtime/panic.go:619 +0x81 fp=0xc4215e5a88 sp=0xc4215e5a68 pc=0x42b331
runtime.mapaccess2_faststr(0x965140, 0xc420141e00, 0xc42159a1c0, 0x13, 0xc420141e00, 0xc4217fa801)
/mnt/go/src/runtime/hashmap_fast.go:270 +0x461 fp=0xc4215e5af8 sp=0xc4215e5a88 pc=0x40c5e1
github.com/apache/rocketmq-client-go/internal.(*namesrvs).findBrokerVersion(0xc4200cc000, 0xc4200248c0, 0xa, 0xc42159a1c0, 0x13, 0x92fa01)
/mnt/iss/.jenkins/workspace/ClePackerBigdata/dispatching-dcmgr/src/lib/third/src/github.com/apache/rocketmq-client-go/internal/route.go:307 +0xa0 fp=0xc4215e5b48 sp=0xc4215e5af8 pc=0x8790a0
github.com/apache/rocketmq-client-go/internal.(*namesrvs).FindBrokerAddressInSubscribe(0xc4200cc000, 0xc4200248c0, 0xa, 0x0, 0x2f2ceb00, 0xc421935f80)
/mnt/iss/.jenkins/workspace/ClePackerBigdata/dispatching-dcmgr/src/lib/third/src/github.com/apache/rocketmq-client-go/internal/route.go:234 +0x11c fp=0xc4215e5bb0 sp=0xc4215e5b48 pc=0x87868c
github.com/apache/rocketmq-client-go/consumer.(*defaultConsumer).tryFindBroker(0xc42116cb00, 0xc421610960, 0x0)
/mnt/iss/.jenkins/workspace/ClePackerBigdata/dispatching-dcmgr/src/lib/third/src/github.com/apache/rocketmq-client-go/consumer/consumer.go:1045 +0x72 fp=0xc4215e5bf0 sp=0xc4215e5bb0 pc=0x889602
github.com/apache/rocketmq-client-go/consumer.(*pushConsumer).pullMessage(0xc42016c960, 0xc421674210)
/mnt/iss/.jenkins/workspace/ClePackerBigdata/dispatching-dcmgr/src/lib/third/src/github.com/apache/rocketmq-client-go/consumer/push_consumer.go:563 +0x103e fp=0xc4215e5fb0 sp=0xc4215e5bf0 pc=0x8904be
github.com/apache/rocketmq-client-go/consumer.(*pushConsumer).Start.func1.2.1(0xc42016c960, 0xc421674210)
/mnt/iss/.jenkins/workspace/ClePackerBigdata/dispatching-dcmgr/src/lib/third/src/github.com/apache/rocketmq-client-go/consumer/push_consumer.go:168 +0x35 fp=0xc4215e5fd0 sp=0xc4215e5fb0 pc=0x899255
runtime.goexit()
/mnt/go/src/runtime/asm_amd64.s:2361 +0x1 fp=0xc4215e5fd8 sp=0xc4215e5fd0 pc=0x4582e1
created by github.com/apache/rocketmq-client-go/consumer.(*pushConsumer).Start.func1.2
/mnt/iss/.jenkins/workspace/ClePackerBigdata/dispatching-dcmgr/src/lib/third/src/github.com/apache/rocketmq-client-go/consumer/push_consumer.go:167 +0x63
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services
[GitHub] [rocketmq-client-go] bobo888 commented on issue #341: [Bug]Use
comsumer crash
Posted by GitBox <gi...@apache.org>.
bobo888 commented on issue #341: [Bug]Use comsumer crash
URL: https://github.com/apache/rocketmq-client-go/issues/341#issuecomment-568852600
I modify like this.
rocketmq-client-go/internal/namesrv.go
```go
func (s *namesrvs) updateBrokerVersion(brokerName string, version int32) {
s.lock.Lock()
defer s.lock.Unlock()
v, exist := s.brokerVersionMap.Load(brokerName)
var m map[string]int32
if exist {
m = v.(map[string]int32)
} else {
m = make(map[string]int32, 4)
s.brokerVersionMap.Store(brokerName, m)
}
m[brokerName] = version
}
func (s *namesrvs) findBrokerVersion(brokerName, brokerAddr string) int32 {
s.lock.Lock()
defer s.lock.Unlock()
versions, exist := s.brokerVersionMap.Load(brokerName)
if !exist {
return 0
}
v, exist := versions.(map[string]int32)[brokerAddr]
if exist {
return v
}
return 0
}
```
rocketmq-client-go/internal/client.go
```go
func (c *rmqClient) SendHeartbeatToAllBrokerWithLock() {
c.hbMutex.Lock()
defer c.hbMutex.Unlock()
hbData := NewHeartbeatData(c.ClientID())
c.producerMap.Range(func(key, value interface{}) bool {
pData := producerData{
GroupName: key.(string),
}
hbData.ProducerDatas.Add(pData)
return true
})
c.consumerMap.Range(func(key, value interface{}) bool {
consumer := value.(InnerConsumer)
cData := consumerData{
GroupName: key.(string),
CType: "PUSH",
MessageModel: "CLUSTERING",
Where: "CONSUME_FROM_FIRST_OFFSET",
UnitMode: consumer.IsUnitMode(),
SubscriptionDatas: consumer.SubscriptionDataList(),
}
hbData.ConsumerDatas.Add(cData)
return true
})
if hbData.ProducerDatas.Len() == 0 && hbData.ConsumerDatas.Len() == 0 {
rlog.Info("sending heartbeat, but no producer and no consumer", nil)
return
}
c.namesrvs.brokerAddressesMap.Range(func(key, value interface{}) bool {
brokerName := key.(string)
data := value.(*BrokerData)
for id, addr := range data.BrokerAddresses {
cmd := remote.NewRemotingCommand(ReqHeartBeat, nil, hbData.encode())
response, err := c.remoteClient.InvokeSync(context.Background(), addr, cmd, 3*time.Second)
if err != nil {
rlog.Warning("send heart beat to broker error", map[string]interface{}{
rlog.LogKeyUnderlayError: err,
})
return true
}
if response.Code == ResSuccess {
c.namesrvs.updateBrokerVersion(brokerName, int32(response.Version))
rlog.Debug("send heart beat to broker success", map[string]interface{}{
"brokerName": brokerName,
"brokerId": id,
"brokerAddr": addr,
})
}
}
return true
})
}
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services