You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@activemq.apache.org by "Justin Bertram (Jira)" <ji...@apache.org> on 2021/09/15 21:47:00 UTC

[jira] [Commented] (ARTEMIS-2546) Message loss when consumers consume and disappear without acknowledging

    [ https://issues.apache.org/jira/browse/ARTEMIS-2546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17415775#comment-17415775 ] 

Justin Bertram commented on ARTEMIS-2546:
-----------------------------------------

I ran your test and it failed as you suggested. However, I made a one-line change, and I couldn't make it fail after that despite running it hundreds of times. I simply enabled persistence on the embedded broker:
{code:java}
// Embedded broker for the test, configured explicitly so that persistence is switched on.
@ClassRule
public static EmbeddedActiveMQResource resource = new EmbeddedActiveMQResource(
        new ConfigurationImpl()
                .setPersistenceEnabled(true)
                .setSecurityEnabled(false)
                .addAcceptorConfiguration(
                        new TransportConfiguration(InVMAcceptorFactory.class.getName())));{code}

Given that XA transactions are essentially worthless without a persistent transaction log, I'd say that enabling persistence is a prerequisite for your test.

> Message loss when consumers consume and disappear without acknowledging
> -----------------------------------------------------------------------
>
>                 Key: ARTEMIS-2546
>                 URL: https://issues.apache.org/jira/browse/ARTEMIS-2546
>             Project: ActiveMQ Artemis
>          Issue Type: Bug
>    Affects Versions: 2.10.1
>         Environment: I'm on Windows 10, Oracle JVM 11.0.1
>            Reporter: Viliam Durina
>            Priority: Major
>
> I am stress testing my product to check that I commit messages properly and that I get exactly-once delivery. I have multiple parallel consumers of a queue, and I restart them and check for correct results. But occasionally the initial messages are missing. By checking the logs I found out that those messages were never returned from the `MessageConsumer.receive()` method, so I concluded that the cause is not me committing or rolling back incorrectly. I managed to write a standalone test that reproduces this issue. On my machine it fails in about 50% of cases, but I surmise that it might be micro-timing dependent, so you might need to tweak sleep times and whatnot.
> The test code:
> {noformat}
> import org.apache.activemq.artemis.jms.client.ActiveMQXAConnectionFactory;
> import org.apache.activemq.artemis.junit.EmbeddedActiveMQResource;
> import org.junit.ClassRule;
> import org.junit.Test;
> import javax.jms.Connection;
> import javax.jms.ConnectionFactory;
> import javax.jms.JMSException;
> import javax.jms.MessageConsumer;
> import javax.jms.MessageProducer;
> import javax.jms.Session;
> import javax.jms.TextMessage;
> import javax.jms.XAConnection;
> import javax.jms.XAConnectionFactory;
> import javax.jms.XASession;
> import javax.transaction.xa.XAException;
> import javax.transaction.xa.XAResource;
> import javax.transaction.xa.Xid;
> import java.util.Arrays;
> import java.util.concurrent.ThreadLocalRandom;
> import java.util.concurrent.atomic.AtomicBoolean;
> import static java.util.concurrent.TimeUnit.NANOSECONDS;
> import static javax.jms.Session.AUTO_ACKNOWLEDGE;
> import static org.junit.Assert.assertTrue;
> /**
>  * Stand-alone reproducer: a producer sends "msg-0" .. "msg-999" to "queue" while pairs of XA
>  * consumers are repeatedly started and then told to stop. A stopped consumer exits without
>  * ending or committing the transaction it has in progress. The test passes only if some
>  * consumer received "msg-0".
>  */
> public class ArtemisTest_NoInterrupts {
>     @ClassRule
>     public static EmbeddedActiveMQResource resource = new EmbeddedActiveMQResource();
>     /** Set by a consumer thread once "msg-0" is received; volatile so the test thread sees it. */
>     private volatile boolean receivedMsg0;
>     /** Runs the producer and the consumer driver concurrently, then checks that "msg-0" arrived. */
>     @Test
>     public void test() throws InterruptedException {
>         AtomicBoolean driverShutdownFlag = new AtomicBoolean();
>         Thread consumerDriverThread = new Thread(() -> consumerDriver(driverShutdownFlag));
>         Thread producerThread = new Thread(this::producer);
>         producerThread.start();
>         consumerDriverThread.start();
>         // The consumer driver keeps cycling until the producer has sent everything.
>         producerThread.join();
>         driverShutdownFlag.set(true);
>         consumerDriverThread.join();
>         assertTrue("msg-0 not received", receivedMsg0);
>     }
>     /**
>      * Starts two consumer threads per round (one per Xid), lets them run briefly, signals them to
>      * stop and waits for both. Repeats until {@code driverShutdownFlag} is set.
>      *
>      * @param driverShutdownFlag set by the test thread to end the loop after the current round
>      */
>     private void consumerDriver(AtomicBoolean driverShutdownFlag) {
>         try {
>             Xid xid1 = new MyXid(new byte[] {1});
>             Xid xid2 = new MyXid(new byte[] {2});
>             for (int id = 0; !driverShutdownFlag.get(); id++) {
>                 int finalId = id;
>                 Thread thread1 = null;
>                 Thread thread2 = null;
>                 try {
>                     AtomicBoolean shutdownFlag = new AtomicBoolean();
>                     thread1 = new Thread(() -> consumer(xid1, finalId, shutdownFlag));
>                     thread2 = new Thread(() -> consumer(xid2, finalId, shutdownFlag));
>                     thread1.start();
>                     thread2.start();
>                     // 15 ms for the first four rounds, then 15 ms plus a random 0-99 ms.
>                     // The conditional is evaluated separately on purpose: written inline as
>                     // "15 + id < 4 ? a : b" it would parse as "(15 + id) < 4 ? a : b".
>                     long extraMillis = id < 4 ? 0 : ThreadLocalRandom.current().nextInt(100);
>                     Thread.sleep(15 + extraMillis);
>                     shutdownFlag.set(true);
>                 } finally {
>                     // Always wait for both consumers before starting the next round.
>                     if (thread1 != null) {
>                         thread1.join();
>                     }
>                     if (thread2 != null) {
>                         thread2.join();
>                     }
>                 }
>             }
>         } catch (Exception e) {
>             throw new RuntimeException(e);
>         }
>     }
>     /**
>      * Consumes from "queue" inside an XA transaction identified by {@code xid} until
>      * {@code shutdownFlag} is set. Rolls back {@code xid} first, then commits (end, prepare,
>      * commit) on every 9th, 19th, 29th, ... received message. On shutdown the transaction in
>      * progress is neither ended nor committed; the session and connection are simply closed.
>      *
>      * @param xid          transaction id used for every transaction of this consumer
>      * @param id           round number, used only in log output
>      * @param shutdownFlag set by the driver to make this consumer stop polling
>      */
>     private void consumer(Xid xid, int id, AtomicBoolean shutdownFlag) {
>         try (
>                 XAConnection conn = getConnectionFactory().createXAConnection();
>                 XASession sess = conn.createXASession()
>         ) {
>             conn.start();
>             XAResource res = sess.getXAResource();
>             // rollback the xid first
>             try {
>                 res.rollback(xid);
>                 System.out.println("consumer " + id + " rolled back, xid=" + xid);
>             } catch (XAException e) {
>                 // XAER_NOTA is tolerated here; any other XA error is rethrown.
>                 if (e.errorCode != XAException.XAER_NOTA) {
>                     throw e;
>                 }
>                 System.out.println("consumer " + id + " ignoring rollback, XAER_NOTA, xid=" + xid);
>             }
>             res.start(xid, XAResource.TMNOFLAGS);
>             MessageConsumer consumer = sess.createConsumer(sess.createQueue("queue"));
>             for (int msgCnt = 0; !shutdownFlag.get(); ) {
>                 // Non-blocking poll so the shutdown flag is re-checked continuously.
>                 TextMessage msg = (TextMessage) consumer.receiveNoWait();
>                 if (msg == null) {
>                     continue;
>                 }
>                 msgCnt++;
>                 if (msg.getText().equals("msg-0")) {
>                     receivedMsg0 = true;
>                 }
>                 System.out.println("consumer " + id + " received: " + msg.getText() + ", xid=" + xid);
>                 if (msgCnt % 10 == 9) {
>                     // Two-phase commit, then reuse the same xid for the next transaction.
>                     res.end(xid, XAResource.TMSUCCESS);
>                     res.prepare(xid);
>                     System.out.println("consumer " + id + " going to commit..., xid=" + xid);
>                     res.commit(xid, false);
>                     System.out.println("consumer " + id + " committed, xid=" + xid);
>                     res.start(xid, XAResource.TMNOFLAGS);
>                 }
>             }
>             System.out.println("consumer " + id + " shut down, xid=" + xid);
>         } catch (XAException e) {
>             throw new RuntimeException("XAException, errorCode=" + e.errorCode, e);
>         } catch (JMSException e) {
>             throw new RuntimeException(e);
>         }
>     }
>     /** Sends "msg-0" .. "msg-999" to "queue" on a non-transacted, auto-acknowledge session. */
>     private void producer() {
>         try (
>                 Connection conn = ((ConnectionFactory) getConnectionFactory()).createConnection();
>                 Session session = conn.createSession(false, AUTO_ACKNOWLEDGE);
>                 MessageProducer producer = session.createProducer(session.createQueue("queue"))
>         ) {
>             long startTime = System.nanoTime();
>             for (int i = 0; i < 1_000; i++) {
>                 producer.send(session.createTextMessage("msg-" + i));
>                 // Pace the sends: after message i, wait until at least i ms have elapsed in total.
>                 Thread.sleep(Math.max(0, i - NANOSECONDS.toMillis(System.nanoTime() - startTime)));
>             }
>         } catch (Exception e) {
>             throw new RuntimeException(e);
>         }
>     }
>     /** Creates a new XA connection factory pointing at the embedded broker's VM URL. */
>     private static XAConnectionFactory getConnectionFactory() {
>         return new ActiveMQXAConnectionFactory(resource.getVmURL());
>     }
>     /** Minimal Xid: fixed format id 9999, caller-supplied global id, empty branch qualifier. */
>     private static final class MyXid implements Xid {
>         private final byte[] gtrid;
>         private MyXid(byte[] gtrid) {
>             this.gtrid = gtrid;
>         }
>         @Override
>         public int getFormatId() {
>             return 9999;
>         }
>         @Override
>         public byte[] getGlobalTransactionId() {
>             return gtrid;
>         }
>         @Override
>         public byte[] getBranchQualifier() {
>             return new byte[0];
>         }
>         @Override
>         public String toString() {
>             return Arrays.toString(gtrid);
>         }
>     }
> }
> {noformat}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)