Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2021/06/30 08:27:37 UTC

[GitHub] [rocketmq-client-go] smiletrl commented on issue #668: [Master]How could I pass other arguments to ExecuteLocalTransaction function when I send transaction message

smiletrl commented on issue #668:
URL: https://github.com/apache/rocketmq-client-go/issues/668#issuecomment-871201564


   @ThreeBearsDan This project tends to be slow to review new pull requests.
   
   Also, depending on the mq transaction message directly is sometimes a bad idea. If mq is temporarily down, some of the most critical business features go down with it, just because a transaction message could not be sent. For example, the order service breaks completely when it cannot send the order paid/canceled transaction message. Another problem case is when the transaction message content depends on data committed by the local transaction.
   
   Here's one solution that still makes use of the transaction message.
   
   To keep strong consistency, instead of moving all the business logic into the transaction listener's `ExecuteLocalTransaction`, keep the business logic in its original place. Once the local transaction has succeeded, the mq sender can subscribe to the local database change and send a transaction message. For example, if an order's status has changed from `pending_payment` to `success_paid`, a new mq message should be sent out.
   
   It might be too much work to write a subscription system that follows database binlog changes. So the idea is to add an intermediate table, `mq_pending_messages`, like this:
   
   ```
   id, tag, topic, property, created_at
   
   1, order_paid, order, {order_id: 23, user_id: 12}, 2021-02-12
   ```
    
   When the local transaction makes the order-paid change, it also inserts a new record into the table above, within the same local transaction. Column `property` is a JSON field that holds the message fields to be sent out. The mq transaction message also carries the `id` of this record, for example as a `pending_id` field.
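
   As a rough illustration of how such a record could be built, here is a minimal sketch using only the standard library. The names `PendingMessage`, `OrderPaidProperty` and `NewPendingMessage` are made up for this example and are not part of rocketmq-client-go; the actual insert into the database inside the local transaction is left out.

   ```go
   package main

   import (
   	"encoding/json"
   	"fmt"
   	"time"
   )

   // OrderPaidProperty holds the fields the outgoing message needs.
   // Hypothetical type, used only for this sketch.
   type OrderPaidProperty struct {
   	OrderID int64 `json:"order_id"`
   	UserID  int64 `json:"user_id"`
   }

   // PendingMessage mirrors one row of mq_pending_messages.
   // ID is left at zero here; the database would assign it on insert.
   type PendingMessage struct {
   	ID        int64
   	Tag       string
   	Topic     string
   	Property  json.RawMessage
   	CreatedAt time.Time
   }

   // NewPendingMessage encodes property as JSON and returns the row that
   // should be inserted in the same local transaction as the business change.
   func NewPendingMessage(tag, topic string, property interface{}) (PendingMessage, error) {
   	raw, err := json.Marshal(property)
   	if err != nil {
   		return PendingMessage{}, fmt.Errorf("marshal property for tag %q: %w", tag, err)
   	}
   	return PendingMessage{Tag: tag, Topic: topic, Property: raw, CreatedAt: time.Now()}, nil
   }

   func main() {
   	row, err := NewPendingMessage("order_paid", "order", OrderPaidProperty{OrderID: 23, UserID: 12})
   	if err != nil {
   		panic(err)
   	}
   	// prints: order_paid order {"order_id":23,"user_id":12}
   	fmt.Println(row.Tag, row.Topic, string(row.Property))
   }
   ```

   Keeping `Property` as raw JSON means the table does not need a new column for every message type.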
   
   Then send a transaction message built from the record in `mq_pending_messages`. The message's `ExecuteLocalTransaction` deletes that record, and `CheckLocalTransaction` looks up the table and checks whether the `pending_id` carried in the message has been deleted from `mq_pending_messages`.
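
   The decision logic behind those two callbacks could look roughly like the sketch below. It is deliberately not tied to the rocketmq-client-go types: `TxState`, `PendingStore` and `memStore` are stand-ins invented for this example, and in a real listener the two functions would be called from `ExecuteLocalTransaction` and `CheckLocalTransaction` with the `pending_id` read from the message. Retrying the delete when the check still finds the record is my own assumption, not something described above.

   ```go
   package main

   import (
   	"errors"
   	"fmt"
   )

   // TxState stands in for the client's local transaction states (hypothetical).
   type TxState string

   const (
   	Commit  TxState = "commit"
   	Unknown TxState = "unknown"
   )

   // PendingStore abstracts access to mq_pending_messages (hypothetical).
   type PendingStore interface {
   	Delete(id int64) error
   	Exists(id int64) (bool, error)
   }

   var errDown = errors.New("store unavailable")

   // memStore is an in-memory PendingStore; down simulates a database outage.
   type memStore struct {
   	rows map[int64]bool
   	down bool
   }

   func (m *memStore) Delete(id int64) error {
   	if m.down {
   		return errDown
   	}
   	delete(m.rows, id)
   	return nil
   }

   func (m *memStore) Exists(id int64) (bool, error) {
   	if m.down {
   		return false, errDown
   	}
   	return m.rows[id], nil
   }

   // ExecuteLocal deletes the pending record. If the delete fails it reports
   // Unknown, so the broker is expected to check back later.
   func ExecuteLocal(s PendingStore, pendingID int64) TxState {
   	if err := s.Delete(pendingID); err != nil {
   		return Unknown
   	}
   	return Commit
   }

   // CheckLocal commits when the record is already gone. If the record is
   // still there, it retries the delete (an assumption of this sketch).
   func CheckLocal(s PendingStore, pendingID int64) TxState {
   	exists, err := s.Exists(pendingID)
   	if err != nil {
   		return Unknown
   	}
   	if !exists {
   		return Commit
   	}
   	return ExecuteLocal(s, pendingID)
   }

   func main() {
   	s := &memStore{rows: map[int64]bool{1: true}, down: true}
   	fmt.Println(ExecuteLocal(s, 1)) // unknown: the delete failed, record kept
   	s.down = false
   	fmt.Println(CheckLocal(s, 1)) // commit: the retry deleted the record
   	fmt.Println(CheckLocal(s, 1)) // commit: the record is already gone
   }
   ```

   Since the business change is already committed before the message is sent, neither function ever needs to roll the message back in this sketch; the only open question is whether the pending record has been cleaned up yet.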
   
   Here's the code sample
   
   ```
   func (c creation) defaultAfterSuccessPayment() Invoker {
   	return func(ctx Context, order *entity.Order) error {
   		// other local transaction
   		// ...
   
   		// now set order status to be paid
   		status := constants.GetOrderPaidStatus(c.Method())
   		tx := ctx.TX()
   		if err := tx.Model(entity.Order{}).
   			Where("id = ?", order.ID).
   			Update("status", status).Error; err != nil {
   			return errors.Wrapf(err, "error updating order: %d status to be status: %s after success payment", order.ID, status)
   		}
   
   		// create a new record in table `mq_pending_messages`
   		property := OrderPaidMessageProperty{
   			OrderID:       order.ID,
   			PaymentMethod: ctx.PaymentMethod(),
   			UserID:        ctx.UserID(),
   			Amount:        order.ActualAmount,
   		}
   
   		bytes, err := json.Marshal(property)
   		if err != nil {
   			return errors.Wrapf(err, "error marshal mq pending message property in order creation with %v", property)
   		}
   
   		mm := entity.MqPendingMessage{
   			Tag:       constants.RocketMQTagOrderPaid.String(),
   			Property:  bytes,
   			Topic:     "API",
   			CreatedAt: time.Now(),
   		}
   		if err := tx.Create(&mm).Error; err != nil {
   			return errors.Wrapf(err, "error creation mq pending message in order creation with %v", mm)
   		}
   
   		// commit transaction
   		if err := tx.Commit().Error; err != nil {
   			return errors.Wrapf(err, "error committing in order creation with user id: %d", ctx.UserID())
   		}
   
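   		// send transaction message here; it should also carry the id of the
   		// pending record created above (e.g. as `pending_id`) so the
   		// transaction listener can find and delete that record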
   		msg, err := rocketmq.NewMessage(ctx.Context()).
   			SetInt64("order_id", order.ID).
   			SetString("payment_method", ctx.PaymentMethod()).
   			SetInt64("user_id", ctx.UserID()).
   			SetInt("amount", order.ActualAmount).
   			Encode(constants.RocketMQTagPaymentSucceed)
   		if err != nil {
   			logger.Logger.Errorf("order creation: error creating message with order id: %d and err: %v", order.ID, err)
   			return nil
   		}
   		producer := MQProducers[constants.RocketMQProducerGroupPayment]
   		if _, err = producer.SendMessageInTransaction(ctx.Context(), msg); err != nil {
   			logger.Logger.Errorf("order creation: error sending message with order id: %d and err: %v", order.ID, err)
   			return nil
   		}
   		return nil
   	}
   }
   ```

