Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2020/12/09 06:02:36 UTC

[GitHub] [rocketmq] largeTree opened a new issue #2496: when i use the `TransactionSendResult` to call `org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.endTransaction(SendResult, LocalTransactionState, Throwable)` i got an exception

largeTree opened a new issue #2496:
URL: https://github.com/apache/rocketmq/issues/2496


   **BUG REPORT**
   1. `TransactionSendResult.offsetMsgId` is not set in the method `org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendMessageInTransaction(Message, LocalTransactionExecuter, Object)`.
   2. When I pass the `TransactionSendResult` to `org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.endTransaction(SendResult, LocalTransactionState, Throwable)`, I get an exception like this:
   ``` java
   java.lang.IllegalArgumentException: port out of range:1393301684
   	at java.net.InetSocketAddress.checkPort(InetSocketAddress.java:143)
   	at java.net.InetSocketAddress.<init>(InetSocketAddress.java:188)
   	at org.apache.rocketmq.common.message.MessageDecoder.decodeMessageId(MessageDecoder.java:87)
   	at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.endTransaction(DefaultMQProducerImpl.java:1236)
   	at com.qiuxs.boot.demo.mq.ProducerUtils.endTransaction(ProducerUtils.java:58)
   	at com.qiuxs.boot.demo.mq.ProducerUtils.commit(ProducerUtils.java:46)
   	at com.qiuxs.boot.demo.service.TxService$1.afterCompletion(TxService.java:21)
   	at org.springframework.transaction.support.TransactionSynchronizationUtils.invokeAfterCompletion(TransactionSynchronizationUtils.java:171)
   	at org.springframework.transaction.support.AbstractPlatformTransactionManager.invokeAfterCompletion(AbstractPlatformTransactionManager.java:989)
   	at org.springframework.transaction.support.AbstractPlatformTransactionManager.triggerAfterCompletion(AbstractPlatformTransactionManager.java:964)
   	at org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:785)
   	at org.springframework.transaction.support.AbstractPlatformTransactionManager.commit(AbstractPlatformTransactionManager.java:711)
   	at org.springframework.transaction.interceptor.TransactionAspectSupport.commitTransactionAfterReturning(TransactionAspectSupport.java:633)
   	at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:386)
   	at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:118)
   	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
   	at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:749)
   	at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:691)
   	at com.qiuxs.boot.demo.service.TxService$$EnhancerBySpringCGLIB$$4f492874.doInTx(<generated>)
   	at com.qiuxs.boot.demo.ctrl.MqController.send(MqController.java:20)
   	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
   	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
   	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   	at java.lang.reflect.Method.invoke(Method.java:498)
   ```
   ![image](https://user-images.githubusercontent.com/15036686/101591228-fcd64080-3a26-11eb-8880-5d22e3fb1663.png)
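The `port out of range` value in the trace is what one would expect if an id that is not a broker-assigned offset id were decoded as though it were one. The following is a minimal sketch under stated assumptions: it assumes a 32-character hex id laid out as 8 characters of IPv4 address, 8 characters of port, and 16 characters of offset. That layout, the class `MsgIdLayoutDemo`, and both sample ids are hypothetical illustrations, not taken from the RocketMQ source; the second id was chosen only so that its port field decodes to the number shown in the trace.

```java
import java.nio.ByteBuffer;

public class MsgIdLayoutDemo {

    // Hypothetical helper: convert a hex string into raw bytes.
    public static byte[] hexToBytes(String hex) {
        byte[] out = new byte[hex.length() / 2];
        for (int i = 0; i < out.length; i++) {
            out[i] = (byte) Integer.parseInt(hex.substring(2 * i, 2 * i + 2), 16);
        }
        return out;
    }

    // Assumed layout: chars 0-7 address, chars 8-15 port (big-endian int), chars 16-31 offset.
    public static int decodePort(String id) {
        return ByteBuffer.wrap(hexToBytes(id.substring(8, 16))).getInt();
    }

    // Same range check that a socket address constructor would apply to a port.
    public static boolean isValidPort(int port) {
        return port >= 0 && port <= 0xFFFF;
    }

    public static void main(String[] args) {
        // Well-formed under the assumed layout: 192.168.0.1, port 10911, offset 0.
        String offsetStyleId = "C0A8000100002A9F0000000000000000";
        // Arbitrary hex string standing in for an id with a different format.
        String otherStyleId = "AC1E0C01530C18B40000000000000001";

        for (String id : new String[] {offsetStyleId, otherStyleId}) {
            int port = decodePort(id);
            System.out.println(id + " -> port " + port + ", valid=" + isValidPort(port));
        }
    }
}
```

Under this assumption, any id whose characters 8 to 15 do not encode a value between 0 and 65535 would be rejected by the port check before the request is sent.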
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [rocketmq] francisoliverlee edited a comment on issue #2496: when i use the `TransactionSendResult` to call `org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.endTransaction(SendResult, LocalTransactionState, Throwable)` i got an exception

Posted by GitBox <gi...@apache.org>.
francisoliverlee edited a comment on issue #2496:
URL: https://github.com/apache/rocketmq/issues/2496#issuecomment-745722822


   If you want to end the transaction, return `LocalTransactionState.ROLLBACK_MESSAGE` from `executeLocalTransaction()`.
   
   What do you mean by the following?
   ```
   when i use the TransactionSendResult to call org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.endTransaction(SendResult, LocalTransactionState, Throwable) i got an exception, like this
   ```





[GitHub] [rocketmq] largeTree edited a comment on issue #2496: when i use the `TransactionSendResult` to call `org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.endTransaction(SendResult, LocalTransactionState, Throwable)` i got an exception

Posted by GitBox <gi...@apache.org>.
largeTree edited a comment on issue #2496:
URL: https://github.com/apache/rocketmq/issues/2496#issuecomment-741552908


   I want to manually commit or roll back my transaction message before the broker sends a check message to query the transaction status.





[GitHub] [rocketmq] largeTree edited a comment on issue #2496: when i use the `TransactionSendResult` to call `org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.endTransaction(SendResult, LocalTransactionState, Throwable)` i got an exception

Posted by GitBox <gi...@apache.org>.
largeTree edited a comment on issue #2496:
URL: https://github.com/apache/rocketmq/issues/2496#issuecomment-747846243


   I use this because I need to construct and send messages during the transaction, but I need them to be delivered only after the transaction is committed.
   
   See the demo below:
   
   ``` java
   public class ProducerUtils {
   
   	private static TransactionMQProducer producer;
   	private static ThreadLocal<Map<String, SendResult>> txMessageCache = new ThreadLocal<>();
   
   	private static void init() {
   		try {
   			producer = new TransactionMQProducer("transactionMQProducer");
   			producer.setNamesrvAddr("ip:port");
   			producer.setTransactionListener(new SelfTransactionListener());
   			producer.start();
   		} catch (MQClientException e) {
   			e.printStackTrace();
   		}
   	}
   
   	/**
   	 * Send the message in a transaction and cache the SendResult.
   	 * 
   	 * @param msg
   	 * @param bizKey
   	 */
   	public static void sendPrepare(String msg, String bizKey) {
   		if (producer == null) {
   			init();
   		}
   		Message message = new Message("TransactionTopic", "transactionTest", bizKey, msg.getBytes());
   		try {
   			TransactionSendResult sendResult = producer.sendMessageInTransaction(message, bizKey);
   			
   			// cached in threadlocal 
   			Map<String, SendResult> map = txMessageCache.get();
   			if (map == null) {
   				map = new HashMap<>();
   				txMessageCache.set(map);
   			}
   			map.put(sendResult.getMsgId(), sendResult);
   			
   			System.out.println("cache sendResult:" + sendResult.getMsgId() + ", txId = " + sendResult.getTransactionId());
   		} catch (MQClientException e) {
   			e.printStackTrace();
   		}
   	}
   
   	public static void commit() {
   		endTransaction(LocalTransactionState.COMMIT_MESSAGE);
   	}
   
   	public static void rollback() {
   		endTransaction(LocalTransactionState.ROLLBACK_MESSAGE);
   	}
   	
   	public static Map<String, SendResult> getTxMessageCacheMap() {
   		return txMessageCache.get();
   	}
   
   	private static void endTransaction(LocalTransactionState localTransactionState) {
   		Map<String, SendResult> cacheMap = txMessageCache.get();
   		if (cacheMap == null) {
   			return;
   		}
   		
   		DefaultMQProducerImpl defaultMQProducerImpl = producer.getDefaultMQProducerImpl();
   		for (Iterator<Map.Entry<String, SendResult>> iter = cacheMap.entrySet().iterator(); iter.hasNext();) {
   			try {
   				
   				Entry<String, SendResult> entry = iter.next();
   				System.out.println("doEndTransaction : " + localTransactionState);
   				// Manually commit or roll back
   				defaultMQProducerImpl.endTransaction(entry.getValue(), localTransactionState, null);
   				iter.remove();
   			} catch (Exception e) {
   				e.printStackTrace();
   			}
   		}
   	}
   }
   
   public class SelfTransactionListener implements TransactionListener {
   
   	@Override
   	public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
   		return LocalTransactionState.UNKNOW;
   	}
   
   	@Override
   	public LocalTransactionState checkLocalTransaction(MessageExt msg) {
   		System.out.println("check local transaction status :" + new String(msg.getBody()));
   		String transactionId = msg.getTransactionId();
   		System.out.println("transactionId:" + transactionId);
   
   		Map<String, SendResult> cacheMap = ProducerUtils.getTxMessageCacheMap();
   		SendResult sendResult = null;
   		if (cacheMap != null) {
   			sendResult = cacheMap.get(msg.getMsgId());
   		}
   
   		LocalTransactionState state = LocalTransactionState.UNKNOW;
   		if (sendResult == null && checkTxCommited(msg)) {
   			state = LocalTransactionState.COMMIT_MESSAGE;
   		} else if (sendResult == null) { // sendResult not in the cache anymore
   			state = LocalTransactionState.ROLLBACK_MESSAGE;
   		}
   
   		return state;
   	}
   
   	/**
   	 * check tx state from database
   	 * 
   	 * @param msg
   	 * @return
   	 */
   	private boolean checkTxCommited(MessageExt msg) {
   		return false;  // or true 
   	}
   }
   
   @Service
   public class TxService {
   
   	@Transactional
   	public void doInTx(String msg, String key) {
   		
   		// Do some operations on the database with transaction 
   		
   		// Send the transaction message; executeLocalTransaction always returns LocalTransactionState.UNKNOW
   		// because the local transaction is still in progress at this point.
   		ProducerUtils.sendPrepare(msg, key);
   
   		TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
   			// When the spring transaction is complete, I will manually commit or roll back the transaction message sent earlier
   			@Override
   			public void afterCompletion(int state) {
   				if (TransactionSynchronization.STATUS_COMMITTED == state) {
   					ProducerUtils.commit(); // commit 
   				} else if (TransactionSynchronization.STATUS_ROLLED_BACK == state) {
   					ProducerUtils.rollback(); // rollback
   				}
   			}
   		});
   	}
   }
   ```
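One thing to watch in the demo above: the send results are cached in a `ThreadLocal`, so only the sending thread can see them. If the check callback runs on a different thread (an assumption about the client's threading that this thread does not confirm), the lookup in `checkLocalTransaction` would find nothing. The following is a minimal sketch of a shared registry keyed by transaction id; `TxOutcomeRegistry` and its `Outcome` enum are hypothetical stand-ins and do not use any RocketMQ types.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class TxOutcomeRegistry {

    // Hypothetical stand-in for the three states used in the demo.
    public enum Outcome { COMMIT, ROLLBACK, UNKNOWN }

    // Shared across threads, unlike a ThreadLocal cache.
    private final ConcurrentMap<String, Outcome> outcomes = new ConcurrentHashMap<>();

    // Record that a transaction has started but is not yet resolved.
    public void begin(String txId) {
        outcomes.put(txId, Outcome.UNKNOWN);
    }

    // Record the final result once the local transaction completes.
    public void complete(String txId, boolean committed) {
        outcomes.put(txId, committed ? Outcome.COMMIT : Outcome.ROLLBACK);
    }

    // Safe to call from any thread; unknown ids are reported as UNKNOWN.
    public Outcome check(String txId) {
        return outcomes.getOrDefault(txId, Outcome.UNKNOWN);
    }

    public static void main(String[] args) throws InterruptedException {
        TxOutcomeRegistry registry = new TxOutcomeRegistry();
        ThreadLocal<String> local = new ThreadLocal<>();

        // The "sender" thread stores state in both places.
        local.set("tx-1");
        registry.begin("tx-1");
        registry.complete("tx-1", true);

        // A second thread plays the role of the check callback.
        Thread checker = new Thread(() -> {
            System.out.println("ThreadLocal value seen by checker: " + local.get());
            System.out.println("Registry outcome seen by checker: " + registry.check("tx-1"));
        });
        checker.start();
        checker.join();
    }
}
```

In this sketch the checker thread reads `null` from the `ThreadLocal` but still sees the recorded outcome in the registry. A real implementation would also need to evict completed entries and to fall back to the database when the process has restarted.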





[GitHub] [rocketmq] vongosling closed issue #2496: when i use the `TransactionSendResult` to call `org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.endTransaction(SendResult, LocalTransactionState, Throwable)` i got an exception

Posted by GitBox <gi...@apache.org>.
vongosling closed issue #2496:
URL: https://github.com/apache/rocketmq/issues/2496


   




