Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2018/09/03 09:05:53 UTC

[GitHub] joewee opened a new issue #447: When sending a transactional message, the local transaction method executeLocalTransaction returns LocalTransactionState.COMMIT_MESSAGE, but the check-back method checkLocalTransaction is still executed

joewee opened a new issue #447: When sending a transactional message, the local transaction method executeLocalTransaction returns LocalTransactionState.COMMIT_MESSAGE, but the check-back method checkLocalTransaction is still executed
URL: https://github.com/apache/rocketmq/issues/447
 
 
   **BUG REPORT**
   
   I send a transactional message with the following listener code:
   
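   import java.util.concurrent.atomic.AtomicInteger;
   import org.apache.rocketmq.client.producer.LocalTransactionState;
   import org.apache.rocketmq.client.producer.TransactionListener;
   import org.apache.rocketmq.common.message.Message;
   import org.apache.rocketmq.common.message.MessageExt;

   public class TransactionListenerImpl implements TransactionListener {
       // Counts how many times the broker checks back on the transaction.
       private final AtomicInteger count = new AtomicInteger(0);

       @Override
       public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
           System.out.println("Executing local transaction..." + msg.toString());
           return LocalTransactionState.COMMIT_MESSAGE;
       }

       @Override
       public LocalTransactionState checkLocalTransaction(MessageExt msg) {
           System.out.println("Checking local transaction..." + msg.toString());
           System.out.println("Transaction check #" + count.incrementAndGet());
           return LocalTransactionState.COMMIT_MESSAGE;
       }
   }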
   
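   As shown above, executeLocalTransaction returns LocalTransactionState.COMMIT_MESSAGE, yet the check-back method checkLocalTransaction is still executed. I was puzzled and unsure whether I had overlooked some other configuration, so I stepped through the source in a debugger and reached the following part of this method in the DefaultMQProducerImpl class:
   public TransactionSendResult sendMessageInTransaction(final Message msg,
                                                             final LocalTransactionExecuter localTransactionExecuter, final Object arg)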
   
   LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
           Throwable localException = null;
           switch (sendResult.getSendStatus()) {
               case SEND_OK: {
                   try {
                       if (sendResult.getTransactionId() != null) {
                           msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
                       }
                       String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
                       if (null != transactionId && !"".equals(transactionId)) {
                           msg.setTransactionId(transactionId);
                       }
                       if (null != localTransactionExecuter) {
                           localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
                       } else if (transactionListener != null) {
                           log.debug("Used new transaction API");
                           transactionListener.executeLocalTransaction(msg, arg);
                       }
                       if (null == localTransactionState) {
                           localTransactionState = LocalTransactionState.UNKNOW;
                       }
   
                       if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
                           log.info("executeLocalTransactionBranch return {}", localTransactionState);
                           log.info(msg.toString());
                       }
                   } catch (Throwable e) {
                       log.info("executeLocalTransactionBranch exception", e);
                       log.info(msg.toString());
                       localException = e;
                   }
               }
               break;
               case FLUSH_DISK_TIMEOUT:
               case FLUSH_SLAVE_TIMEOUT:
               case SLAVE_NOT_AVAILABLE:
                   localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
                   break;
               default:
                   break;
           }
   
   I noticed that this branch
   else if (transactionListener != null) {
                           log.debug("Used new transaction API");
                           transactionListener.executeLocalTransaction(msg, arg);
                       }
   does not assign the returned result to localTransactionState. Could this be the cause of the problem described above?
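
To make the suspected cause concrete, here is a minimal, self-contained sketch of the two variants. It does not use RocketMQ's classes: State is a stand-in for LocalTransactionState, the Supplier stands in for the listener call, and the names resultDiscarded and resultAssigned are hypothetical. It only illustrates that when the listener's return value is dropped, the local variable keeps its initial UNKNOW value, which would plausibly explain why the broker later calls checkLocalTransaction.

```java
import java.util.function.Supplier;

// Stand-in for RocketMQ's LocalTransactionState; not the real class.
enum State { COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOW }

final class TransactionStateSketch {

    // Mirrors the quoted branch: the listener runs, but its result is dropped.
    static State resultDiscarded(Supplier<State> listener) {
        State localTransactionState = State.UNKNOW;
        listener.get(); // return value ignored, as in the quoted code
        return localTransactionState;
    }

    // Hypothetical correction: keep the listener's result.
    static State resultAssigned(Supplier<State> listener) {
        State localTransactionState = listener.get();
        if (localTransactionState == null) {
            // Same null fallback as in the quoted code.
            localTransactionState = State.UNKNOW;
        }
        return localTransactionState;
    }

    public static void main(String[] args) {
        // Listener that always commits, like the one in this report.
        Supplier<State> listener = () -> State.COMMIT_MESSAGE;
        System.out.println("discarded: " + resultDiscarded(listener));
        System.out.println("assigned:  " + resultAssigned(listener));
    }
}
```

Running main prints UNKNOW for the first variant and COMMIT_MESSAGE for the second, even though the listener returns COMMIT_MESSAGE in both cases.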

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services