You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2022/10/14 07:48:35 UTC

[GitHub] [rocketmq-client-go] yangjian102621 opened a new issue, #946: Orderly message does not work

yangjian102621 opened a new issue, #946:
URL: https://github.com/apache/rocketmq-client-go/issues/946

   
   
   
   
   **BUG REPORT**  
   When i send some orderly message with producer, there are not in ordered when the consumer pull the message.
   
   **ENV**
        - OS: Ubuntu22.04
        - Golang Version: go1.17.10 linux/amd64
        - rocketmq-client-go version: v2.1.1
        - RocketMQ version: 4.9.4
   
   **Producer code**
   
   ```go
   p, err := rocketmq.NewProducer(
   		producer.WithNameServer(config.NameSrvAddr),
   		producer.WithGroupName(config.ProducerGroup),
   		producer.WithRetry(2),
   	)
   	if err != nil {
   		panic(err)
   	}
   
   	err = p.Start()
   	if err != nil {
   		panic(err)
   	}
   
   	for i := 0; i < 3; i++ {
   		orderId := strconv.Itoa(i)
   		for j := 0; j < 5; j++ {
   			msg := &primitive.Message{
   				Topic: config.Topic,
   				Body:  []byte("Ordered Message Step -> " + strconv.Itoa(j)),
   			}
   			msg.WithShardingKey(orderId)
   			res, err := p.SendSync(context.Background(), msg)
   			if err != nil {
   				fmt.Errorf("send message success: result=%s\n", res.String())
                                   continue
   			}
   
   			fmt.Printf("send message success: result=%s\n", res.String())
   		}
   	}
   
   	// close producer
   	err = p.Shutdown()
   	if err != nil {
   		fmt.Printf("shutdown producer error: %s\n", err.Error())
   	}
   ```
   
   **Consumer code**
   
   ```go
   c, err := rocketmq.NewPushConsumer(
   		consumer.WithGroupName(config.ConsumerGroup),
   		consumer.WithNameServer(config.NameSrvAddr),
   		consumer.WithConsumerModel(consumer.Clustering),
   		consumer.WithConsumeFromWhere(consumer.ConsumeFromFirstOffset),
   		consumer.WithConsumerOrder(true),
   	)
   	if err != nil {
   		panic(err)
   	}
   
   	err = c.Subscribe(config.Topic, consumer.MessageSelector{}, func(ctx context.Context,
   		msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
   		orderlyCtx, _ := primitive.GetOrderlyCtx(ctx)
   		logrus.Infof("orderly context: %v\n", orderlyCtx)
   		for i := range msgs {
   			logrus.Infof("Receive message: %s, orderId: %s", msgs[i].Body, msgs[i].GetShardingKey())
   		}
   		return consumer.ConsumeSuccess, nil
   	})
   	if err != nil {
   		panic(err)
   	}
   
   	// Note: start after subscribe
   	err = c.Start()
   	if err != nil {
   		panic(err)
   	}
   
   	logrus.Info("Consumer Started.")
   
   	stop := make(chan os.Signal)
   	signal.Notify(stop, os.Interrupt, os.Kill)
   	select {
   	case <-stop:
   		err = c.Shutdown()
   		if err != nil {
   			fmt.Printf("shutdown Consumer error: %s", err.Error())
   		}
   	}
   ```
   
   I expected the steps of the message for the same order to be in order, but it's not actually
   
   ```
   INFO[0003] Receive message: Ordered Message Step -> 2, orderId: 0 
   INFO[0003] Receive message: Ordered Message Step -> 1, orderId: 1 
   INFO[0003] Receive message: Ordered Message Step -> 0, orderId: 2 
   INFO[0003] Receive message: Ordered Message Step -> 4, orderId: 2 
   INFO[0003] Receive message: Ordered Message Step -> 1, orderId: 0 
   INFO[0003] Receive message: Ordered Message Step -> 0, orderId: 1 
   INFO[0003] Receive message: Ordered Message Step -> 4, orderId: 1 
   INFO[0003] Receive message: Ordered Message Step -> 3, orderId: 2 
   INFO[0003] Receive message: Ordered Message Step -> 3, orderId: 0 
   INFO[0003] Receive message: Ordered Message Step -> 2, orderId: 1 
   INFO[0003] Receive message: Ordered Message Step -> 1, orderId: 2 
   INFO[0003] Receive message: Ordered Message Step -> 0, orderId: 0 
   INFO[0003] Receive message: Ordered Message Step -> 4, orderId: 0 
   INFO[0003] Receive message: Ordered Message Step -> 3, orderId: 1 
   INFO[0003] Receive message: Ordered Message Step -> 2, orderId: 2 
   ```
   
   Some thing else is that i send orderly message with java SDK work OK, the go client consumer could receive orderly message.
   
   **Java SDK producer**
   
   ```java
   ublic class OrderProducer {
   	public static void main(String[] args) throws Exception
   	{
   		DefaultMQProducer producer = new DefaultMQProducer(Config.PRODUCER_GROUP_NAME);
   		producer.setNamesrvAddr(Config.NAME_SRV_ADDR);
   		producer.start();
   
   		for (int i = 1; i < 3; i++) {
   			int orderId = i;
   			for (int j = 1; j <= 5;j++){
   				Message msg = new Message(
   						Config.TOPIC,
   						"Order_"+orderId,
   						"KEY" + orderId,
   						("Order Step " + j).getBytes(RemotingHelper.DEFAULT_CHARSET)
   				);
   
   				SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
   					@Override
   					public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
   						Integer id = (Integer) arg;
   						int index = id % mqs.size();
   						return mqs.get(index);
   					}
   				}, orderId);
   
   				System.out.printf("%s%n", sendResult);
   			}
   		}
   
   		// close producer
   		producer.shutdown();
   	}
   }
   ```
   
   The golang consumer client received ordered messages:
   
   ```
   INFO[0000] Consumer Started.                            
   INFO[0063] Receive message: Order Step 1, orderId: KEY2 
   INFO[0063] Receive message: Order Step 2, orderId: KEY2 
   INFO[0063] Receive message: Order Step 1, orderId: KEY4 
   INFO[0063] Receive message: Order Step 2, orderId: KEY4 
   INFO[0063] Receive message: Order Step 3, orderId: KEY4 
   INFO[0063] Receive message: Order Step 4, orderId: KEY4 
   INFO[0063] Receive message: Order Step 5, orderId: KEY4 
   INFO[0063] Receive message: Order Step 3, orderId: KEY2 
   INFO[0063] Receive message: Order Step 4, orderId: KEY2 
   INFO[0063] Receive message: Order Step 5, orderId: KEY2 
   INFO[0063] Receive message: Order Step 1, orderId: KEY1 
   INFO[0063] Receive message: Order Step 2, orderId: KEY1 
   INFO[0063] Receive message: Order Step 3, orderId: KEY1 
   INFO[0063] Receive message: Order Step 1, orderId: KEY3 
   INFO[0063] Receive message: Order Step 2, orderId: KEY3 
   INFO[0063] Receive message: Order Step 3, orderId: KEY3 
   INFO[0063] Receive message: Order Step 4, orderId: KEY3 
   INFO[0063] Receive message: Order Step 5, orderId: KEY3 
   INFO[0063] Receive message: Order Step 4, orderId: KEY1 
   INFO[0063] Receive message: Order Step 5, orderId: KEY1 
   INFO[0063] Receive message: Order Step 1, orderId: KEY5 
   INFO[0063] Receive message: Order Step 2, orderId: KEY5 
   INFO[0063] Receive message: Order Step 3, orderId: KEY5 
   INFO[0063] Receive message: Order Step 4, orderId: KEY5 
   INFO[0063] Receive message: Order Step 5, orderId: KEY5 
   ```
   
   I do not know what the matter is , i need help, thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [rocketmq-client-go] yangjian102621 commented on issue #946: Orderly message does not work

Posted by GitBox <gi...@apache.org>.
yangjian102621 commented on issue #946:
URL: https://github.com/apache/rocketmq-client-go/issues/946#issuecomment-1278808542

   I had find the reason. i should have specify a QueueSelector for the producer.
   
   ```go
   type MyQueueSelector struct{}
   
   func (q MyQueueSelector) Select(msg *primitive.Message, mqs []*primitive.MessageQueue) *primitive.MessageQueue {
   	orderId, err := strconv.Atoi(msg.GetShardingKey())
   	if err == nil {
   		return mqs[orderId%len(mqs)]
   	} else {
   		return mqs[0]
   	}
   }
   
   // then create producer with specified queue selector
   p, err := rocketmq.NewProducer(
   		producer.WithNameServer(config.NameSrvAddr),
   		producer.WithGroupName(config.ProducerGroup),
   		producer.WithRetry(2),
   		producer.WithQueueSelector(MyQueueSelector{}),
   	)
   ```
   Then it works.
   
   ```go
   INFO[0063] Receive message: Ordered Message Step -> 0, orderId: 0 
   INFO[0063] Receive message: Ordered Message Step -> 1, orderId: 0 
   INFO[0063] Receive message: Ordered Message Step -> 2, orderId: 0 
   INFO[0063] Receive message: Ordered Message Step -> 3, orderId: 0 
   INFO[0063] Receive message: Ordered Message Step -> 4, orderId: 0 
   INFO[0063] Receive message: Ordered Message Step -> 0, orderId: 1 
   INFO[0063] Receive message: Ordered Message Step -> 1, orderId: 1 
   INFO[0063] Receive message: Ordered Message Step -> 2, orderId: 1 
   INFO[0063] Receive message: Ordered Message Step -> 3, orderId: 1 
   INFO[0063] Receive message: Ordered Message Step -> 4, orderId: 1 
   INFO[0063] Receive message: Ordered Message Step -> 0, orderId: 2 
   INFO[0063] Receive message: Ordered Message Step -> 1, orderId: 2 
   INFO[0063] Receive message: Ordered Message Step -> 2, orderId: 2 
   INFO[0063] Receive message: Ordered Message Step -> 3, orderId: 2 
   INFO[0063] Receive message: Ordered Message Step -> 4, orderId: 2
   ```
   
   And i found it does not had a example for orderly message Producer,  considering add one?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [rocketmq-client-go] yangjian102621 commented on issue #946: Orderly message does not work

Posted by GitBox <gi...@apache.org>.
yangjian102621 commented on issue #946:
URL: https://github.com/apache/rocketmq-client-go/issues/946#issuecomment-1296639307

   > 
   
   @Victor1319 got it, thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [rocketmq-client-go] yangjian102621 closed issue #946: Orderly message does not work

Posted by GitBox <gi...@apache.org>.
yangjian102621 closed issue #946: Orderly message does not work
URL: https://github.com/apache/rocketmq-client-go/issues/946


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [rocketmq-client-go] Victor1319 commented on issue #946: Orderly message does not work

Posted by GitBox <gi...@apache.org>.
Victor1319 commented on issue #946:
URL: https://github.com/apache/rocketmq-client-go/issues/946#issuecomment-1294527126

   can use `NewHashQueueSelector` as your selector


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org