You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2019/12/25 04:14:29 UTC

[GitHub] [rocketmq-client-go] bobo888 opened a new issue #341: [Bug]Use comsumer crash

bobo888 opened a new issue #341: [Bug]Use comsumer crash
URL: https://github.com/apache/rocketmq-client-go/issues/341
 
 
   I use comsumer crash.
   crash info:
   
   runtime.throw(0xa34bcb, 0x21)
   	/mnt/go/src/runtime/panic.go:619 +0x81 fp=0xc4215e5a88 sp=0xc4215e5a68 pc=0x42b331
   runtime.mapaccess2_faststr(0x965140, 0xc420141e00, 0xc42159a1c0, 0x13, 0xc420141e00, 0xc4217fa801)
   	/mnt/go/src/runtime/hashmap_fast.go:270 +0x461 fp=0xc4215e5af8 sp=0xc4215e5a88 pc=0x40c5e1
   github.com/apache/rocketmq-client-go/internal.(*namesrvs).findBrokerVersion(0xc4200cc000, 0xc4200248c0, 0xa, 0xc42159a1c0, 0x13, 0x92fa01)
   	/mnt/iss/.jenkins/workspace/ClePackerBigdata/dispatching-dcmgr/src/lib/third/src/github.com/apache/rocketmq-client-go/internal/route.go:307 +0xa0 fp=0xc4215e5b48 sp=0xc4215e5af8 pc=0x8790a0
   github.com/apache/rocketmq-client-go/internal.(*namesrvs).FindBrokerAddressInSubscribe(0xc4200cc000, 0xc4200248c0, 0xa, 0x0, 0x2f2ceb00, 0xc421935f80)
   	/mnt/iss/.jenkins/workspace/ClePackerBigdata/dispatching-dcmgr/src/lib/third/src/github.com/apache/rocketmq-client-go/internal/route.go:234 +0x11c fp=0xc4215e5bb0 sp=0xc4215e5b48 pc=0x87868c
   github.com/apache/rocketmq-client-go/consumer.(*defaultConsumer).tryFindBroker(0xc42116cb00, 0xc421610960, 0x0)
   	/mnt/iss/.jenkins/workspace/ClePackerBigdata/dispatching-dcmgr/src/lib/third/src/github.com/apache/rocketmq-client-go/consumer/consumer.go:1045 +0x72 fp=0xc4215e5bf0 sp=0xc4215e5bb0 pc=0x889602
   github.com/apache/rocketmq-client-go/consumer.(*pushConsumer).pullMessage(0xc42016c960, 0xc421674210)
   	/mnt/iss/.jenkins/workspace/ClePackerBigdata/dispatching-dcmgr/src/lib/third/src/github.com/apache/rocketmq-client-go/consumer/push_consumer.go:563 +0x103e fp=0xc4215e5fb0 sp=0xc4215e5bf0 pc=0x8904be
   github.com/apache/rocketmq-client-go/consumer.(*pushConsumer).Start.func1.2.1(0xc42016c960, 0xc421674210)
   	/mnt/iss/.jenkins/workspace/ClePackerBigdata/dispatching-dcmgr/src/lib/third/src/github.com/apache/rocketmq-client-go/consumer/push_consumer.go:168 +0x35 fp=0xc4215e5fd0 sp=0xc4215e5fb0 pc=0x899255
   runtime.goexit()
   	/mnt/go/src/runtime/asm_amd64.s:2361 +0x1 fp=0xc4215e5fd8 sp=0xc4215e5fd0 pc=0x4582e1
   created by github.com/apache/rocketmq-client-go/consumer.(*pushConsumer).Start.func1.2
   	/mnt/iss/.jenkins/workspace/ClePackerBigdata/dispatching-dcmgr/src/lib/third/src/github.com/apache/rocketmq-client-go/consumer/push_consumer.go:167 +0x63

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [rocketmq-client-go] bobo888 commented on issue #341: [Bug]Use comsumer crash

Posted by GitBox <gi...@apache.org>.
bobo888 commented on issue #341: [Bug]Use comsumer crash
URL: https://github.com/apache/rocketmq-client-go/issues/341#issuecomment-568852600
 
 
   I modify like this.
   
   rocketmq-client-go/internal/namesrv.go
   
   ```go
   func (s *namesrvs) updateBrokerVersion(brokerName string, version int32) {
   	s.lock.Lock()
   	defer s.lock.Unlock()
   
   	v, exist := s.brokerVersionMap.Load(brokerName)
   	var m map[string]int32
   	if exist {
   		m = v.(map[string]int32)
   	} else {
   		m = make(map[string]int32, 4)
   		s.brokerVersionMap.Store(brokerName, m)
   	}
   	m[brokerName] = version
   }
   
   func (s *namesrvs) findBrokerVersion(brokerName, brokerAddr string) int32 {
   	s.lock.Lock()
   	defer s.lock.Unlock()
   
   	versions, exist := s.brokerVersionMap.Load(brokerName)
   	if !exist {
   		return 0
   	}
   
   	v, exist := versions.(map[string]int32)[brokerAddr]
   	if exist {
   		return v
   	}
   
   	return 0
   }
   ```
   
   rocketmq-client-go/internal/client.go
   
   ```go
   func (c *rmqClient) SendHeartbeatToAllBrokerWithLock() {
   	c.hbMutex.Lock()
   	defer c.hbMutex.Unlock()
   	hbData := NewHeartbeatData(c.ClientID())
   
   	c.producerMap.Range(func(key, value interface{}) bool {
   		pData := producerData{
   			GroupName: key.(string),
   		}
   		hbData.ProducerDatas.Add(pData)
   		return true
   	})
   
   	c.consumerMap.Range(func(key, value interface{}) bool {
   		consumer := value.(InnerConsumer)
   		cData := consumerData{
   			GroupName:         key.(string),
   			CType:             "PUSH",
   			MessageModel:      "CLUSTERING",
   			Where:             "CONSUME_FROM_FIRST_OFFSET",
   			UnitMode:          consumer.IsUnitMode(),
   			SubscriptionDatas: consumer.SubscriptionDataList(),
   		}
   		hbData.ConsumerDatas.Add(cData)
   		return true
   	})
   	if hbData.ProducerDatas.Len() == 0 && hbData.ConsumerDatas.Len() == 0 {
   		rlog.Info("sending heartbeat, but no producer and no consumer", nil)
   		return
   	}
   	c.namesrvs.brokerAddressesMap.Range(func(key, value interface{}) bool {
   		brokerName := key.(string)
   		data := value.(*BrokerData)
   		for id, addr := range data.BrokerAddresses {
   			cmd := remote.NewRemotingCommand(ReqHeartBeat, nil, hbData.encode())
   			response, err := c.remoteClient.InvokeSync(context.Background(), addr, cmd, 3*time.Second)
   			if err != nil {
   				rlog.Warning("send heart beat to broker error", map[string]interface{}{
   					rlog.LogKeyUnderlayError: err,
   				})
   				return true
   			}
   			if response.Code == ResSuccess {
   				c.namesrvs.updateBrokerVersion(brokerName, int32(response.Version))
   				rlog.Debug("send heart beat to broker success", map[string]interface{}{
   					"brokerName": brokerName,
   					"brokerId":   id,
   					"brokerAddr": addr,
   				})
   			}
   		}
   		return true
   	})
   }
   ```
   
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services