You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@activemq.apache.org by "Justin Bertram (Jira)" <ji...@apache.org> on 2021/09/15 21:48:00 UTC
[jira] [Resolved] (ARTEMIS-2546) Message loss when consumers
consume and disapper without acknowledging
[ https://issues.apache.org/jira/browse/ARTEMIS-2546?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Justin Bertram resolved ARTEMIS-2546.
-------------------------------------
Resolution: Not A Problem
> Message loss when consumers consume and disapper without acknowledging
> ----------------------------------------------------------------------
>
> Key: ARTEMIS-2546
> URL: https://issues.apache.org/jira/browse/ARTEMIS-2546
> Project: ActiveMQ Artemis
> Issue Type: Bug
> Affects Versions: 2.10.1
> Environment: I'm on Windows 10, Oracle JVM 11.0.1
> Reporter: Viliam Durina
> Priority: Major
>
> I do a stress testing of my product where I check that I commit messages properly and that I get exactly-once delivery. I have multiple parallel consumers of a queue and I restart them and check for correct results. But occasionally I miss the initial messages. By checking the logs I found out that those were never returned from the `MessageConsumer.receive()` method, so I thought it's not me incorrectly committing and rolling back. I managed to write a standalone test that reproduces this issue. On my machine it fails in about 50% of cases, but I surmise that it might be micro-timing dependent, so you might tweak sleep times and what not.
> The test code:
> {noformat}
> import org.apache.activemq.artemis.jms.client.ActiveMQXAConnectionFactory;
> import org.apache.activemq.artemis.junit.EmbeddedActiveMQResource;
> import org.junit.ClassRule;
> import org.junit.Test;
> import javax.jms.Connection;
> import javax.jms.ConnectionFactory;
> import javax.jms.JMSException;
> import javax.jms.MessageConsumer;
> import javax.jms.MessageProducer;
> import javax.jms.Session;
> import javax.jms.TextMessage;
> import javax.jms.XAConnection;
> import javax.jms.XAConnectionFactory;
> import javax.jms.XASession;
> import javax.transaction.xa.XAException;
> import javax.transaction.xa.XAResource;
> import javax.transaction.xa.Xid;
> import java.util.Arrays;
> import java.util.concurrent.ThreadLocalRandom;
> import java.util.concurrent.atomic.AtomicBoolean;
> import static java.util.concurrent.TimeUnit.NANOSECONDS;
> import static javax.jms.Session.AUTO_ACKNOWLEDGE;
> import static org.junit.Assert.assertTrue;
> public class ArtemisTest_NoInterrupts {
> @ClassRule
> public static EmbeddedActiveMQResource resource = new EmbeddedActiveMQResource();
> private volatile boolean receivedMsg0;
> @Test
> public void test() throws InterruptedException {
> AtomicBoolean driverShutdownFlag = new AtomicBoolean();
> Thread consumerDriverThread = new Thread(() -> consumerDriver(driverShutdownFlag));
> Thread producerThread = new Thread(this::producer);
> producerThread.start();
> consumerDriverThread.start();
> producerThread.join();
> driverShutdownFlag.set(true);
> consumerDriverThread.join();
> assertTrue("msg-0 not received", receivedMsg0);
> }
> private void consumerDriver(AtomicBoolean driverShutdownFlag) {
> try {
> Xid xid1 = new MyXid(new byte[] {1});
> Xid xid2 = new MyXid(new byte[] {2});
> for (int id = 0; !driverShutdownFlag.get(); id++) {
> int finalId = id;
> Thread thread1 = null;
> Thread thread2 = null;
> try {
> AtomicBoolean shutdownFlag = new AtomicBoolean();
> thread1 = new Thread(() -> consumer(xid1, finalId, shutdownFlag));
> thread2 = new Thread(() -> consumer(xid2, finalId, shutdownFlag));
> thread1.start();
> thread2.start();
> Thread.sleep(15 + id < 4 ? 0 : ThreadLocalRandom.current().nextInt(100));
> shutdownFlag.set(true);
> } finally {
> if (thread1 != null) {
> thread1.join();
> }
> if (thread2 != null) {
> thread2.join();
> }
> }
> }
> } catch (Exception e) {
> throw new RuntimeException(e);
> }
> }
> private void consumer(Xid xid, int id, AtomicBoolean shutdownFlag) {
> try (
> XAConnection conn = getConnectionFactory().createXAConnection();
> XASession sess = conn.createXASession()
> ) {
> conn.start();
> XAResource res = sess.getXAResource();
> // rollback the xid first
> try {
> res.rollback(xid);
> System.out.println("consumer " + id + " rolled back, xid=" + xid);
> } catch (XAException e) {
> if (e.errorCode == XAException.XAER_NOTA) {
> System.out.println("consumer " + id + " ignoring rollback, XAER_NOTA, xid=" + xid);
> }
> else throw e;
> }
> res.start(xid, XAResource.TMNOFLAGS);
> MessageConsumer consumer = sess.createConsumer(sess.createQueue("queue"));
> for (int msgCnt = 0; !shutdownFlag.get(); ) {
> TextMessage msg = (TextMessage) consumer.receiveNoWait();
> if (msg == null) {
> continue;
> }
> msgCnt++;
> if (msg.getText().equals("msg-0")) {
> receivedMsg0 = true;
> }
> System.out.println("consumer " + id + " received: " + msg.getText() + ", xid=" + xid);
> if (msgCnt % 10 == 9) {
> res.end(xid, XAResource.TMSUCCESS);
> res.prepare(xid);
> System.out.println("consumer " + id + " going to commit..., xid=" + xid);
> res.commit(xid, false);
> System.out.println("consumer " + id + " committed, xid=" + xid);
> res.start(xid, XAResource.TMNOFLAGS);
> }
> }
> System.out.println("consumer " + id + " shut down, xid=" + xid);
> } catch (XAException e) {
> throw new RuntimeException("XAException, errorCode=" + e.errorCode, e);
> } catch (JMSException e) {
> throw new RuntimeException(e);
> }
> }
> private void producer() {
> try (
> Connection conn = ((ConnectionFactory) getConnectionFactory()).createConnection();
> Session session = conn.createSession(false, AUTO_ACKNOWLEDGE);
> MessageProducer producer = session.createProducer(session.createQueue("queue"))
> ) {
> long startTime = System.nanoTime();
> for (int i = 0; i < 1_000; i++) {
> producer.send(session.createTextMessage("msg-" + i));
> Thread.sleep(Math.max(0, i - NANOSECONDS.toMillis(System.nanoTime() - startTime)));
> }
> } catch (Exception e) {
> throw new RuntimeException(e);
> }
> }
> private static XAConnectionFactory getConnectionFactory() {
> return new ActiveMQXAConnectionFactory(resource.getVmURL());
> }
> private static final class MyXid implements Xid {
> private final byte[] gtrid;
> private MyXid(byte[] gtrid) {
> this.gtrid = gtrid;
> }
> @Override
> public int getFormatId() {
> return 9999;
> }
> @Override
> public byte[] getGlobalTransactionId() {
> return gtrid;
> }
> @Override
> public byte[] getBranchQualifier() {
> return new byte[0];
> }
> @Override
> public String toString() {
> return Arrays.toString(gtrid);
> }
> }
> }
> {noformat}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)