You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by "kaybinwong (via GitHub)" <gi...@apache.org> on 2023/03/20 15:12:17 UTC

[GitHub] [rocketmq-client-go] kaybinwong opened a new issue, #1016: PullBrokerTimeout happed frequently when pulling msg

kaybinwong opened a new issue, #1016:
URL: https://github.com/apache/rocketmq-client-go/issues/1016

   **BUG REPORT**  
   1. Please describe the issue you observed:
   
   Pulling messages with client 1.2.4 returns a PullBrokerTimeout error; this didn't happen with the Spring client.
   ```
   ……
   
   				err := p.pullMQ(market)
   				if err != nil {
   					log.Warnf("StartConsume pullMQ fail, symbol: %s, err: %v, market:%+v", symbol, err, *market)
   					time.Sleep(ERRSLEEP)
   				}
   ……
   
   ```
   
   ```golang
   // pullMQ pulls one batch of messages for the market's order topic, starting
   // at market.offset, and forwards the decoded orders on market.orderCh in
   // groups. Pulling by explicit offset is used to avoid consuming a message
   // twice.
   //
   // Offset handling per pull status:
   //   - PullNoNewMsg: sleeps p.noMsgSleep milliseconds and returns nil; the
   //     offset is left unchanged.
   //   - PullFound: orders are sent on market.orderCh, then the offset advances
   //     to pr.NextBeginOffset.
   //   - PullNoMatchedMsg / PullOffsetIllegal: the offset advances to
   //     pr.NextBeginOffset and an error is returned.
   //   - PullBrokerTimeout: an error is returned and the offset is left
   //     unchanged, so the next call retries from the same position.
   //
   // An error is also returned when the topic does not expose exactly one queue.
   func (p *rocketMQProvider) pullMQ(market *Market) error {
   	symbol := market.symbol
   	topic := TopicNameOrderProvider(symbol)
   	//----start---------------
   	mqs := p.consumer.FetchSubscriptionMessageQueues(topic)
   	// Alternative call left by the reporter (appears to be the v2 client API):
   	//mqs := p.consumer.MessageQueues(topic)
   	//----end---------------
   	log.Debugf("StartConsume fetch subscription topic:%s, mqs:%v", topic, mqs)
   	if len(mqs) != 1 {
   		// To guarantee ordering, the topic must not use more than one queue.
   		return fmt.Errorf("topic queue num error, topic:%s, num: %d", topic, len(mqs))
   	}
   
   	mq := mqs[0]
   	offset := market.offset
   
   	//---------------start-----------------
   	// NOTE(review): 800 is presumably the maximum number of messages per pull — confirm against the client API.
   	pr := p.consumer.Pull(mq, "*", offset, 800)
   	// Alternative call left by the reporter (appears to be the v2 client API):
   	//pr, _ := p.consumer.PullFrom(context.TODO(), mq, offset, 800)
   	//---------------end-----------------
   
   	log.Debugf("Leader pull after, offset: %d, mq: %s, result: %+v", offset, mq.String(), pr)
   
   	switch pr.Status {
   	case rocketmq.PullNoNewMsg:
   		//case primitive.PullNoNewMsg:
   		log.Debugf("rocketmq pull nonewmasg, market:%+v", market)
   		time.Sleep(time.Millisecond * time.Duration(p.noMsgSleep))
   		return nil
   	case rocketmq.PullFound:
   		//case primitive.PullFound:
   		// offsetOrders stays nil until the first message decodes successfully.
   		var offsetOrders *OffsetOrders
   		//for _, msg := range pr.GetMessageExts() {
   		for _, msg := range pr.Messages {
   			order, err := PbToOrder(msg)
   			if err != nil {
   				// Undecodable messages are logged and skipped.
   				log.Warnf("rocketmq formatOrder failed, offset:%d, err: %v", msg.QueueOffset, err)
   				continue
   			}
   			if offsetOrders == nil {
   				offsetOrders = &OffsetOrders{}
   				offsetOrders.BeginOffset = order.Offset
   			}
   			// Once the current group holds OffsetOrdersSize orders, send it
   			// and start a new group beginning at this order.
   			if len(offsetOrders.Orders) >= OffsetOrdersSize {
   				market.orderCh <- offsetOrders
   				offsetOrders = &OffsetOrders{}
   				offsetOrders.BeginOffset = order.Offset
   			}
   			offsetOrders.Orders = append(offsetOrders.Orders, order)
   			offsetOrders.EndOffset = order.Offset
   			log.Debugf("provider send orderCh item offsetOrders:%+v, order:%+v", *offsetOrders, *order)
   		}
   
   		// Flush the last, possibly partial, group.
   		if offsetOrders != nil && len(offsetOrders.Orders) > 0 {
   			market.orderCh <- offsetOrders
   		}
   	case rocketmq.PullNoMatchedMsg:
   		//case primitive.PullNoMsgMatched:
   		market.offset = pr.NextBeginOffset
   		log.Errorf("broker PullNoMatchedMsg, offset:%d, pr:%+v", offset, pr)
   		return errors.New("broker PullNoMatchedMsg")
   	case rocketmq.PullOffsetIllegal:
   		//case primitive.PullOffsetIllegal:
   		market.offset = pr.NextBeginOffset
   		log.Errorf("broker PullOffsetIllegal, offset:%d, pr:%+v", offset, pr)
   		return errors.New("broker PullOffsetIllegal")
   	case rocketmq.PullBrokerTimeout:
   		//case primitive.PullBrokerTimeout:
   		// market.offset is deliberately not advanced here.
   		log.Errorf("broker timeout occur")
   		return errors.New("broker timeout occur")
   	}
   	market.offset = pr.NextBeginOffset
   	maxOffset := pr.MaxOffset
   	// Report the backlog measured from the advanced offset, so the logged
   	// blockNum is the same value that is compared against the threshold.
   	backlog := maxOffset - market.offset
   	if backlog > 1000 {
   		log.Warnf("MatchMessageBlocked, symbol:%s, blockNum:%d, maxOffset:%d, offset:%d", market.symbol, backlog, maxOffset, market.offset)
   	}
   
   	return nil
   }
   ```
   
   2. Please tell us about your environment:
   
        - What is your OS?
        - kubernetes 1.24
   
        - What is your client version?
        - 1.2.4
   
        - What is your RocketMQ version?
        - 4.7.1
   
   3. Other information (e.g. detailed explanation, logs, related issues, suggestions on how to fix, etc):
   
   ```
   2023-03-20 22:48:24	
   time="2023-03-20T22:48:24+08:00" level=warning msg="StartConsume pullMQ fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} consumeCmdChan:0xc00091aa80 a:map[]}"
   2023-03-20 22:48:17	
   time="2023-03-20T22:48:17+08:00" level=warning msg="StartConsume pullMQ fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} consumeCmdChan:0xc00091aa80 a:map[]}"
   2023-03-20 22:48:09	
   time="2023-03-20T22:48:09+08:00" level=warning msg="StartConsume pullMQ fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} consumeCmdChan:0xc00091aa80 a:map[]}"
   2023-03-20 22:48:04	
   time="2023-03-20T22:48:04+08:00" level=warning msg="StartConsume pullMQ fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} consumeCmdChan:0xc00091aa80 a:map[]}"
   2023-03-20 22:46:38	
   time="2023-03-20T22:46:38+08:00" level=warning msg="StartConsume pullMQ fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} consumeCmdChan:0xc00091aa80 a:map[]}"
   2023-03-20 22:45:29	
   time="2023-03-20T22:45:29+08:00" level=warning msg="StartConsume pullMQ fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} consumeCmdChan:0xc00091aa80 a:map[]}"
   2023-03-20 22:45:06	
   time="2023-03-20T22:45:06+08:00" level=warning msg="StartConsume pullMQ fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} consumeCmdChan:0xc00091aa80 a:map[]}"
   2023-03-20 22:43:57	
   time="2023-03-20T22:43:57+08:00" level=warning msg="StartConsume pullMQ fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} consumeCmdChan:0xc00091aa80 a:map[]}"
   2023-03-20 22:43:41	
   time="2023-03-20T22:43:41+08:00" level=warning msg="StartConsume pullMQ fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} consumeCmdChan:0xc00091aa80 a:map[]}"
   2023-03-20 22:43:23	
   time="2023-03-20T22:43:23+08:00" level=warning msg="StartConsume pullMQ fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} consumeCmdChan:0xc00091aa80 a:map[]}"
   2023-03-20 22:40:56	
   time="2023-03-20T22:40:56+08:00" level=warning msg="StartConsume pullMQ fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} consumeCmdChan:0xc00091aa80 a:map[]}"
   2023-03-20 22:39:34	
   time="2023-03-20T22:39:34+08:00" level=warning msg="StartConsume pullMQ fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} consumeCmdChan:0xc00091aa80 a:map[]}"
   2023-03-20 22:38:18	
   time="2023-03-20T22:38:18+08:00" level=warning msg="StartConsume pullMQ fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} consumeCmdChan:0xc00091aa80 a:map[]}"
   2023-03-20 22:36:48	
   time="2023-03-20T22:36:48+08:00" level=warning msg="StartConsume pullMQ fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} consumeCmdChan:0xc00091aa80 a:map[]}"
   2023-03-20 22:35:01	
   time="2023-03-20T22:35:01+08:00" level=warning msg="StartConsume pullMQ fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} consumeCmdChan:0xc00091aa80 a:map[]}"
   2023-03-20 22:33:19	
   time="2023-03-20T22:33:19+08:00" level=warning msg="StartConsume pullMQ fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} consumeCmdChan:0xc00091aa80 a:map[]}"
   2023-03-20 22:32:56	
   time="2023-03-20T22:32:56+08:00" level=warning msg="StartConsume pullMQ fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} consumeCmdChan:0xc00091aa80 a:map[]}"
   2023-03-20 22:31:15	
   time="2023-03-20T22:31:15+08:00" level=warning msg="StartConsume pullMQ fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} consumeCmdChan:0xc00091aa80 a:map[]}"
   2023-03-20 22:31:01	
   time="2023-03-20T22:31:01+08:00" level=warning msg="StartConsume pullMQ fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} consumeCmdChan:0xc00091aa80 a:map[]}"
   2023-03-20 22:30:34	
   time="2023-03-20T22:30:34+08:00" level=warning msg="StartConsume pullMQ fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} consumeCmdChan:0xc00091aa80 a:map[]}"
   2023-03-20 22:30:13	
   time="2023-03-20T22:30:13+08:00" level=warning msg="StartConsume pullMQ fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} consumeCmdChan:0xc00091aa80 a:map[]}"
   2023-03-20 22:29:41	
   time="2023-03-20T22:29:41+08:00" level=warning msg="StartConsume pullMQ fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} consumeCmdChan:0xc00091aa80 a:map[]}"
   2023-03-20 22:29:34	
   time="2023-03-20T22:29:34+08:00" level=warning msg="StartConsume pullMQ fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} consumeCmdChan:0xc00091aa80 a:map[]}"
   2023-03-20 22:29:16	
   time="2023-03-20T22:29:16+08:00" level=warning msg="StartConsume pullMQ fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} consumeCmdChan:0xc00091aa80 a:map[]}"
   2023-03-20 22:28:03	
   time="2023-03-20T22:28:03+08:00" level=warning msg="StartConsume pullMQ fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} consumeCmdChan:0xc00091aa80 a:map[]}"
   2023-03-20 22:27:25	
   time="2023-03-20T22:27:25+08:00" level=warning msg="StartConsume pullMQ fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} consumeCmdChan:0xc00091aa80 a:map[]}"
   2023-03-20 22:26:58	
   time="2023-03-20T22:26:58+08:00" level=warning msg="StartConsume pullMQ fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} consumeCmdChan:0xc00091aa80 a:map[]}"
   2023-03-20 22:26:06	
   time="2023-03-20T22:26:06+08:00" level=warning msg="StartConsume pullMQ fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} consumeCmdChan:0xc00091aa80 a:map[]}"
   2023-03-20 22:25:12	
   time="2023-03-20T22:25:12+08:00" level=warning msg="StartConsume pullMQ fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} consumeCmdChan:0xc00091aa80 a:map[]}"
   2023-03-20 22:23:47	
   time="2023-03-20T22:23:47+08:00" level=warning msg="StartConsume pullMQ fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} consumeCmdChan:0xc00091aa80 a:map[]}"
   2023-03-20 22:23:03	
   time="2023-03-20T22:23:03+08:00" level=warning msg="StartConsume pullMQ fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} consumeCmdChan:0xc00091aa80 a:map[]}"
   2023-03-20 22:22:53	
   time="2023-03-20T22:22:53+08:00" level=warning msg="StartConsume pullMQ fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} consumeCmdChan:0xc00091aa80 a:map[]}"
   2023-03-20 22:21:53	
   time="2023-03-20T22:21:53+08:00" level=warning msg="StartConsume pullMQ fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} consumeCmdChan:0xc00091aa80 a:map[]}"
   
   
   2023-03-20 22:21:29	
   time="2023-03-20T22:21:29+08:00" level=warning msg="StartConsume pullMQ fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} consumeCmdChan:0xc00091aa80 a:map[]}"
   2023-03-20 22:20:13	
   time="2023-03-20T22:20:13+08:00" level=warning msg="StartConsume pullMQ fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} consumeCmdChan:0xc00091aa80 a:map[]}"
   
   ```
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [rocketmq-client-go] francisoliverlee commented on issue #1016: PullBrokerTimeout happed frequently when pulling msg

Posted by "francisoliverlee (via GitHub)" <gi...@apache.org>.
francisoliverlee commented on issue #1016:
URL: https://github.com/apache/rocketmq-client-go/issues/1016#issuecomment-1520285455

   Please upgrade to the latest version; 1.2.4 is still built with CGO.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org