You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2021/12/03 03:32:42 UTC

[GitHub] [rocketmq-client-go] tt-live commented on issue #741: BUG for shutdown?

tt-live commented on issue #741:
URL: https://github.com/apache/rocketmq-client-go/issues/741#issuecomment-985186805


   @wenfengwang 大佬,帮忙看看这个chan是否需要在dorebalance里面更新队列信息,删除队列的时候,同时关闭chan
   ```
   func (pq *processQueue) WithDropped(dropped bool) {
   	if dropped {
   		close(pq.msgCh)
   	}
   	pq.dropped.Store(dropped)
   }
   ```
   
   
   正常的应该是在队列删除的时候,就把这个chan关闭,不然会一直产生goroutine泄露的问题,另外我看了master分支的代码,在服务shutdown的时候,会关闭chan,那个地方的逻辑应该删除掉,不然服务重启的或者关闭的时候会引发panic,删除一个已经关闭的chan
   
   ```
   func (dc *defaultConsumer) shutdown() error {
   	atomic.StoreInt32(&dc.state, int32(internal.StateShutdown))
   
   	mqs := make([]*primitive.MessageQueue, 0)
   	dc.processQueueTable.Range(func(key, value interface{}) bool {
   		k := key.(primitive.MessageQueue)
   		pq := value.(*processQueue)
   		pq.WithDropped(true)
   		// close msg channel using RWMutex to make sure no data was writing
   		pq.mutex.Lock()
   		**// close(pq.msgCh) 这段代码应该删除掉**
   		pq.mutex.Unlock()
   		mqs = append(mqs, &k)
   		return true
   	})
   	dc.stat.ShutDownStat()
   	dc.storage.persist(mqs)
   	dc.client.Shutdown()
   	return nil
   }
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org