Posted to gitbox@activemq.apache.org by GitBox <gi...@apache.org> on 2021/05/06 20:26:31 UTC

[GitHub] [activemq-artemis] jbertram opened a new pull request #3569: ARTEMIS-3285 potential duplicate messages with LVQ + non-destructive

jbertram opened a new pull request #3569:
URL: https://github.com/apache/activemq-artemis/pull/3569


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [activemq-artemis] jbertram closed pull request #3569: ARTEMIS-3285 potential duplicate messages with LVQ + non-destructive

Posted by GitBox <gi...@apache.org>.
jbertram closed pull request #3569:
URL: https://github.com/apache/activemq-artemis/pull/3569


   





[GitHub] [activemq-artemis] clebertsuconic commented on a change in pull request #3569: ARTEMIS-3285 potential duplicate messages with LVQ + non-destructive

Posted by GitBox <gi...@apache.org>.
clebertsuconic commented on a change in pull request #3569:
URL: https://github.com/apache/activemq-artemis/pull/3569#discussion_r630185992



##########
File path: tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSNonDestructiveTest.java
##########
@@ -588,4 +597,82 @@ private void sendLVQTombstone(ConnectionSupplier producerConnectionSupplier, Str
          producer.send(message1, javax.jms.Message.DEFAULT_DELIVERY_MODE, javax.jms.Message.DEFAULT_PRIORITY, tombstoneTimeToLive);
       }
    }
+
+   @Test
+   public void testMultipleLastValuesCore() throws Exception {
+      testMultipleLastValues(CoreConnection);
+   }
+
+   @Test
+   public void testMultipleLastValuesAMQP() throws Exception {
+      testMultipleLastValues(AMQPConnection);
+   }
+
+   private void testMultipleLastValues(ConnectionSupplier connectionSupplier) throws Exception {
+      HashMap<String, List<String>> results = new HashMap<>();
+      results.put("0", new ArrayList<>());
+      results.put("1", new ArrayList<>());
+      results.put("2", new ArrayList<>());
+      HashMap<String, Integer> dups = new HashMap<>();
+
+      try (Connection connection = connectionSupplier.createConnection();
+           Connection connection2 = connectionSupplier.createConnection()) {
+         Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+         Session session2 = connection2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+         Queue queue = session.createQueue(NON_DESTRUCTIVE_LVQ_QUEUE_NAME);
+
+         MessageProducer producer = session2.createProducer(queue);
+         MessageConsumer consumer = session.createConsumer(queue);
+         connection.start();
+
+         Thread t = new Thread(() -> {
+            for (int i = 0; i < 100; i++) {
+               String lastval = "" + (i % 3);
+               try {
+                  TextMessage message = session2.createTextMessage();
+                  message.setText("" + i);
+                  message.setStringProperty("data", "" + i);
+                  message.setStringProperty("lastval", lastval);
+                  message.setStringProperty(Message.HDR_LAST_VALUE_NAME.toString(), lastval);
+                  producer.send(message);
+               } catch (JMSException e) {
+                  e.printStackTrace();
+               }
+            }
+         });
+
+         t.start();
+         int received = 0;
+         while (true) {
+            TextMessage tm = (TextMessage) consumer.receive(500);
+            if (tm == null) {
+               break;
+            }
+            results.get(tm.getStringProperty("lastval")).add(tm.getText());
+            tm.acknowledge();
+         }
+      }
+      for (Map.Entry<String, List<String>> entry : results.entrySet()) {
+         StringBuilder message = new StringBuilder();
+         message.append("Messages received with lastval=" + entry.getKey() + " (");
+         for (String s : entry.getValue()) {
+            int occurrences = Collections.frequency(entry.getValue(), s);
+            if (occurrences > 1 && !dups.containsValue(Integer.parseInt(s))) {
+               dups.put(s, occurrences);
+            }
+            message.append(s + ",");
+         }
+         logger.info(message + ")");
+
+      }
+      if (dups.size() > 0) {
+         StringBuffer sb = new StringBuffer();
+         for (Map.Entry<String, Integer> stringIntegerEntry : dups.entrySet()) {
+            sb.append(stringIntegerEntry.getKey() + "(" + stringIntegerEntry.getValue() + "),");
+         }
+         Assert.fail("Duplicate messages received " + sb);
+      }
+

Review comment:
       I added the following code here:
   
   ```java
         HashMap<Integer, AtomicInteger> map = countJournal(server.getConfiguration());
   
         map.forEach((a, b) -> System.out.println("record type " + a + ", counts=" + b));
   
   ```
   
   That would be all you need to check on the journal...
   
   
   Right now the output here shows exactly what I predicted:
   
   
   ```
   record type 32, counts=100
   record type 33, counts=97
   record type 34, counts=3
   record type 45, counts=100
   ```
   
   You will need some assertions on those records. You need to ack the previous message that was already delivered.
   
   If you can do that in handle(MessageReference), specializing it in LastValueQueue would be better than the method you added on the consumer.
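
One possible shape for such an assertion, as a minimal sketch only. It assumes the map returned by countJournal is keyed by journal record type, as the printed output suggests. Reading types 32 and 33 as "reference added" and "reference acknowledged" is an assumption, as is the expected value of 3 (one surviving reference per last-value key). The class and helper names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

public class JournalCountCheck {

   // Returns the count recorded for a record type, or 0 if that type never appeared.
   static int count(Map<Integer, AtomicInteger> counts, int recordType) {
      AtomicInteger c = counts.get(recordType);
      return c == null ? 0 : c.get();
   }

   // Number of records of addType that have no matching record of removeType.
   static int outstanding(Map<Integer, AtomicInteger> counts, int addType, int removeType) {
      return count(counts, addType) - count(counts, removeType);
   }

   public static void main(String[] args) {
      // Values copied from the output quoted in the review comment.
      Map<Integer, AtomicInteger> counts = new HashMap<>();
      counts.put(32, new AtomicInteger(100));
      counts.put(33, new AtomicInteger(97));
      counts.put(34, new AtomicInteger(3));
      counts.put(45, new AtomicInteger(100));

      // Assumption: 32 = reference added, 33 = reference acknowledged.
      int left = outstanding(counts, 32, 33);
      if (left != 3) {
         throw new AssertionError("unexpected number of outstanding references: " + left);
      }
      System.out.println("outstanding references: " + left);
   }
}
```

In the real test the map would come from countJournal(server.getConfiguration()) and the check would use the test framework's assertions. Which counts are the right ones to assert on is for the PR author to decide.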







[GitHub] [activemq-artemis] clebertsuconic commented on a change in pull request #3569: ARTEMIS-3285 potential duplicate messages with LVQ + non-destructive

Posted by GitBox <gi...@apache.org>.
clebertsuconic commented on a change in pull request #3569:
URL: https://github.com/apache/activemq-artemis/pull/3569#discussion_r629600003



##########
File path: tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSNonDestructiveTest.java
##########
@@ -588,4 +594,82 @@ private void sendLVQTombstone(ConnectionSupplier producerConnectionSupplier, Str
          producer.send(message1, javax.jms.Message.DEFAULT_DELIVERY_MODE, javax.jms.Message.DEFAULT_PRIORITY, tombstoneTimeToLive);
       }
    }
+
+   @Test
+   public void testMultipleLastValuesCore() throws Exception {
+      testMultipleLastValues(CoreConnection);
+   }
+
+   @Test
+   public void testMultipleLastValuesAMQP() throws Exception {
+      testMultipleLastValues(AMQPConnection);
+   }
+
+   private void testMultipleLastValues(ConnectionSupplier connectionSupplier) throws Exception {
+      HashMap<String, List<String>> results = new HashMap<>();
+      results.put("0", new ArrayList<>());
+      results.put("1", new ArrayList<>());
+      results.put("2", new ArrayList<>());
+      HashMap<String, Integer> dups = new HashMap<>();
+
+      try (Connection connection = connectionSupplier.createConnection();
+           Connection connection2 = connectionSupplier.createConnection()) {
+         Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+         Session session2 = connection2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+         Queue queue = session.createQueue(NON_DESTRUCTIVE_LVQ_QUEUE_NAME);
+
+         MessageProducer producer = session2.createProducer(queue);
+         MessageConsumer consumer = session.createConsumer(queue);
+         connection.start();
+
+         Thread t = new Thread(() -> {
+            for (int i = 0; i < 100; i++) {
+               String lastval = "" + (i % 3);
+               try {
+                  TextMessage message = session2.createTextMessage();
+                  message.setText("" + i);
+                  message.setStringProperty("data", "" + i);
+                  message.setStringProperty("lastval", lastval);
+                  message.setStringProperty(Message.HDR_LAST_VALUE_NAME.toString(), lastval);
+                  producer.send(message);
+               } catch (JMSException e) {
+                  e.printStackTrace();
+               }
+            }
+         });
+
+         t.start();
+         int received = 0;
+         while (true) {
+            TextMessage tm = (TextMessage) consumer.receive(500);
+            if (tm == null) {
+               break;
+            }
+            System.out.println("Received: " + tm.getStringProperty("data") + "; total: " + ++received);
+            results.get(tm.getStringProperty("lastval")).add(tm.getText());
+            tm.acknowledge();
+         }
+      }
+      for (Map.Entry<String, List<String>> entry : results.entrySet()) {
+         System.out.print("Messages received with lastval=" + entry.getKey() + " (");
+         for (String s : entry.getValue()) {
+            int occurrences = Collections.frequency(entry.getValue(), s);
+            if (occurrences > 1 && !dups.containsValue(Integer.parseInt(s))) {
+               dups.put(s, occurrences);
+            }
+            System.out.print(s + ",");

Review comment:
       Can you use logger.info here instead of System.out.print?
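
A minimal sketch of the reporting loop with a logger in place of System.out. It uses java.util.logging as a stand-in, since the logger used by the test class is not shown in this thread. It also checks dups.containsKey(s) rather than dups.containsValue(Integer.parseInt(s)), on the assumption that the intent is "not recorded yet". The class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.logging.Logger;

public class DuplicateReport {

   private static final Logger logger = Logger.getLogger(DuplicateReport.class.getName());

   // Maps each value that occurs more than once to its number of occurrences.
   static Map<String, Integer> findDuplicates(List<String> received) {
      Map<String, Integer> dups = new LinkedHashMap<>();
      for (String s : received) {
         int occurrences = Collections.frequency(received, s);
         // containsKey: skip values that are already recorded as duplicates.
         if (occurrences > 1 && !dups.containsKey(s)) {
            dups.put(s, occurrences);
         }
      }
      return dups;
   }

   public static void main(String[] args) {
      List<String> received = Arrays.asList("0", "3", "3", "6");
      // Build the whole line first so it goes out in a single log call.
      logger.info("Messages received with lastval=0 (" + String.join(",", received) + ")");
      logger.info("Duplicates: " + findDuplicates(received));
   }
}
```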







[GitHub] [activemq-artemis] clebertsuconic commented on a change in pull request #3569: ARTEMIS-3285 potential duplicate messages with LVQ + non-destructive

Posted by GitBox <gi...@apache.org>.
clebertsuconic commented on a change in pull request #3569:
URL: https://github.com/apache/activemq-artemis/pull/3569#discussion_r630167338



##########
File path: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
##########
@@ -497,4 +497,8 @@ default void errorProcessing(Consumer consumer, Throwable t, MessageReference me
    default QueueConfiguration getQueueConfiguration() {
       return null;
    }
+
+   default boolean canBeDelivered(MessageReference ref) {

Review comment:
       This also breaks the contract of Queue.
   
   Queue::deliver already checks the message in handle(ref, consumer), so whatever happens here needs to be dealt with inside handle.
   
   Perhaps a specialized version of handle would be a better choice.
   
   Having this canBeDelivered check happen at proceedDeliver can create a lot of other issues, such as the one I'm predicting: references and messages left behind on the journal and in paging.
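
To illustrate the suggestion, here is a toy model of a "specialized handle". None of these types are Artemis classes: Ref, Consumer, BaseQueue, LastValueLikeQueue and the HandleStatus enum are hypothetical stand-ins. The sketch only shows the idea of keeping the "already delivered" decision inside handle, so that deliver never hands such a reference to the consumer.

```java
import java.util.ArrayList;
import java.util.List;

public class SpecializedHandleSketch {

   // Hypothetical result type for a delivery attempt.
   enum HandleStatus { HANDLED, NO_MATCH }

   // Hypothetical stand-in for a message reference.
   static class Ref {
      final String body;
      boolean delivered;

      Ref(String body) {
         this.body = body;
      }
   }

   // Hypothetical stand-in for a consumer.
   interface Consumer {
      void accept(Ref ref);
   }

   // Hypothetical base queue: deliver() asks handle() for a decision on every reference.
   static class BaseQueue {
      final List<Ref> refs = new ArrayList<>();

      void deliver(Consumer consumer) {
         for (Ref ref : refs) {
            handle(ref, consumer);
         }
      }

      protected HandleStatus handle(Ref ref, Consumer consumer) {
         consumer.accept(ref);
         return HandleStatus.HANDLED;
      }
   }

   // Hypothetical last-value style queue: the delivery rule lives in handle().
   static class LastValueLikeQueue extends BaseQueue {
      @Override
      protected HandleStatus handle(Ref ref, Consumer consumer) {
         if (ref.delivered) {
            return HandleStatus.NO_MATCH; // never reaches the consumer
         }
         ref.delivered = true;
         return super.handle(ref, consumer);
      }
   }

   // Delivers two references twice and returns how many deliveries the consumer saw.
   static int demo() {
      LastValueLikeQueue queue = new LastValueLikeQueue();
      queue.refs.add(new Ref("a"));
      queue.refs.add(new Ref("b"));
      List<String> seen = new ArrayList<>();
      Consumer consumer = ref -> seen.add(ref.body);
      queue.deliver(consumer);
      queue.deliver(consumer); // second pass delivers nothing new
      return seen.size();
   }

   public static void main(String[] args) {
      System.out.println("deliveries seen: " + demo());
   }
}
```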







[GitHub] [activemq-artemis] clebertsuconic commented on a change in pull request #3569: ARTEMIS-3285 potential duplicate messages with LVQ + non-destructive

Posted by GitBox <gi...@apache.org>.
clebertsuconic commented on a change in pull request #3569:
URL: https://github.com/apache/activemq-artemis/pull/3569#discussion_r627755378



##########
File path: tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSNonDestructiveTest.java
##########
@@ -588,4 +594,82 @@ private void sendLVQTombstone(ConnectionSupplier producerConnectionSupplier, Str
          producer.send(message1, javax.jms.Message.DEFAULT_DELIVERY_MODE, javax.jms.Message.DEFAULT_PRIORITY, tombstoneTimeToLive);
       }
    }
+
+   @Test
+   public void testMultipleLastValuesCore() throws Exception {
+      testMultipleLastValues(CoreConnection);

Review comment:
       Can you instead use CFUtil.createConnectionFactory("AMQP", "tcp://localhost:61616")
   
   or 
   
   CFUtil.createConnectionFactory("CORE", "tcp://localhost:61616")
   
   
   We should probably remove the ConnectionSupplier. Using CFUtil directly is a bit simpler to read.







[GitHub] [activemq-artemis] clebertsuconic commented on a change in pull request #3569: ARTEMIS-3285 potential duplicate messages with LVQ + non-destructive

Posted by GitBox <gi...@apache.org>.
clebertsuconic commented on a change in pull request #3569:
URL: https://github.com/apache/activemq-artemis/pull/3569#discussion_r629599326



##########
File path: tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSNonDestructiveTest.java
##########
@@ -588,4 +594,82 @@ private void sendLVQTombstone(ConnectionSupplier producerConnectionSupplier, Str
          producer.send(message1, javax.jms.Message.DEFAULT_DELIVERY_MODE, javax.jms.Message.DEFAULT_PRIORITY, tombstoneTimeToLive);
       }
    }
+
+   @Test
+   public void testMultipleLastValuesCore() throws Exception {
+      testMultipleLastValues(CoreConnection);
+   }
+
+   @Test
+   public void testMultipleLastValuesAMQP() throws Exception {
+      testMultipleLastValues(AMQPConnection);
+   }
+
+   private void testMultipleLastValues(ConnectionSupplier connectionSupplier) throws Exception {
+      HashMap<String, List<String>> results = new HashMap<>();
+      results.put("0", new ArrayList<>());
+      results.put("1", new ArrayList<>());
+      results.put("2", new ArrayList<>());
+      HashMap<String, Integer> dups = new HashMap<>();
+
+      try (Connection connection = connectionSupplier.createConnection();
+           Connection connection2 = connectionSupplier.createConnection()) {
+         Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+         Session session2 = connection2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+         Queue queue = session.createQueue(NON_DESTRUCTIVE_LVQ_QUEUE_NAME);
+
+         MessageProducer producer = session2.createProducer(queue);
+         MessageConsumer consumer = session.createConsumer(queue);
+         connection.start();
+
+         Thread t = new Thread(() -> {
+            for (int i = 0; i < 100; i++) {
+               String lastval = "" + (i % 3);
+               try {
+                  TextMessage message = session2.createTextMessage();
+                  message.setText("" + i);
+                  message.setStringProperty("data", "" + i);
+                  message.setStringProperty("lastval", lastval);
+                  message.setStringProperty(Message.HDR_LAST_VALUE_NAME.toString(), lastval);
+                  producer.send(message);
+               } catch (JMSException e) {
+                  e.printStackTrace();
+               }
+            }
+         });
+
+         t.start();
+         int received = 0;
+         while (true) {
+            TextMessage tm = (TextMessage) consumer.receive(500);
+            if (tm == null) {
+               break;
+            }
+            System.out.println("Received: " + tm.getStringProperty("data") + "; total: " + ++received);

Review comment:
       Please use logger.debug here.
   
   I recently cleaned up the test suite so that it no longer uses System.out.println.







[GitHub] [activemq-artemis] clebertsuconic commented on a change in pull request #3569: ARTEMIS-3285 potential duplicate messages with LVQ + non-destructive

Posted by GitBox <gi...@apache.org>.
clebertsuconic commented on a change in pull request #3569:
URL: https://github.com/apache/activemq-artemis/pull/3569#discussion_r629601039



##########
File path: tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSNonDestructiveTest.java
##########
@@ -588,4 +594,82 @@ private void sendLVQTombstone(ConnectionSupplier producerConnectionSupplier, Str
          producer.send(message1, javax.jms.Message.DEFAULT_DELIVERY_MODE, javax.jms.Message.DEFAULT_PRIORITY, tombstoneTimeToLive);
       }
    }
+
+   @Test
+   public void testMultipleLastValuesCore() throws Exception {
+      testMultipleLastValues(CoreConnection);

Review comment:
       I would rather deprecate it and move to CFUtil everywhere.







[GitHub] [activemq-artemis] jbertram commented on a change in pull request #3569: ARTEMIS-3285 potential duplicate messages with LVQ + non-destructive

Posted by GitBox <gi...@apache.org>.
jbertram commented on a change in pull request #3569:
URL: https://github.com/apache/activemq-artemis/pull/3569#discussion_r629596410



##########
File path: tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSNonDestructiveTest.java
##########
@@ -588,4 +594,82 @@ private void sendLVQTombstone(ConnectionSupplier producerConnectionSupplier, Str
          producer.send(message1, javax.jms.Message.DEFAULT_DELIVERY_MODE, javax.jms.Message.DEFAULT_PRIORITY, tombstoneTimeToLive);
       }
    }
+
+   @Test
+   public void testMultipleLastValuesCore() throws Exception {
+      testMultipleLastValues(CoreConnection);

Review comment:
       I used the `ConnectionSupplier` because that's what all the other tests in the class use and I find that consistency helps readability. I wouldn't necessarily mind removing the `ConnectionSupplier` altogether, but it's used in around 80 other places so that change is beyond the scope of this PR. All things being equal I think the `ConnectionSupplier` is at least as readable as the alternative you have proposed.







[GitHub] [activemq-artemis] clebertsuconic commented on pull request #3569: ARTEMIS-3285 potential duplicate messages with LVQ + non-destructive

Posted by GitBox <gi...@apache.org>.
clebertsuconic commented on pull request #3569:
URL: https://github.com/apache/activemq-artemis/pull/3569#issuecomment-838946445


   There are also a few test failures introduced by this PR:
   
   org.apache.activemq.artemis.tests.integration.amqp.JMSNonDestructiveTest.testNonDestructiveCoreProducerCoreConsumer[persistenceEnabled=false, scanPeriod=100]
   org.apache.activemq.artemis.tests.integration.amqp.JMSNonDestructiveTest.testNonDestructiveAMQPProducerAMQPConsumer[persistenceEnabled=false, scanPeriod=100]
   org.apache.activemq.artemis.tests.integration.amqp.JMSNonDestructiveTest.testNonDestructiveLVQWithConsumerFirstAMQP[persistenceEnabled=false, scanPeriod=100]
   org.apache.activemq.artemis.tests.integration.amqp.JMSNonDestructiveTest.testNonDestructiveLVQWithConsumerFirstCore[persistenceEnabled=false, scanPeriod=100]
   org.apache.activemq.artemis.tests.integration.amqp.JMSNonDestructiveTest.testNonDestructiveCoreProducerAMQPConsumer[persistenceEnabled=false, scanPeriod=100]
   org.apache.activemq.artemis.tests.integration.amqp.JMSNonDestructiveTest.testNonDestructiveAMQPProducerCoreConsumer[persistenceEnabled=false, scanPeriod=100]
   org.apache.activemq.artemis.tests.integration.amqp.JMSNonDestructiveTest.testNonDestructiveCoreProducerCoreConsumer[persistenceEnabled=true, scanPeriod=100]
   org.apache.activemq.artemis.tests.integration.amqp.JMSNonDestructiveTest.testNonDestructiveAMQPProducerAMQPConsumer[persistenceEnabled=true, scanPeriod=100]
   org.apache.activemq.artemis.tests.integration.amqp.JMSNonDestructiveTest.testNonDestructiveLVQWithConsumerFirstAMQP[persistenceEnabled=true, scanPeriod=100]
   org.apache.activemq.artemis.tests.integration.amqp.JMSNonDestructiveTest.testNonDestructiveLVQWithConsumerFirstCore[persistenceEnabled=true, scanPeriod=100]
   org.apache.activemq.artemis.tests.integration.amqp.JMSNonDestructiveTest.testNonDestructiveCoreProducerAMQPConsumer[persistenceEnabled=true, scanPeriod=100]
   org.apache.activemq.artemis.tests.integration.amqp.JMSNonDestructiveTest.testNonDestructiveAMQPProducerCoreConsumer[persistenceEnabled=true, scanPeriod=100]
   org.apache.activemq.artemis.tests.integration.amqp.JMSNonDestructiveTest.testNonDestructiveCoreProducerCoreConsumer[persistenceEnabled=true, scanPeriod=-1]
   org.apache.activemq.artemis.tests.integration.amqp.JMSNonDestructiveTest.testNonDestructiveAMQPProducerAMQPConsumer[persistenceEnabled=true, scanPeriod=-1]
   org.apache.activemq.artemis.tests.integration.amqp.JMSNonDestructiveTest.testNonDestructiveLVQWithConsumerFirstAMQP[persistenceEnabled=true, scanPeriod=-1]
   org.apache.activemq.artemis.tests.integration.amqp.JMSNonDestructiveTest.testNonDestructiveLVQWithConsumerFirstCore[persistenceEnabled=true, scanPeriod=-1]
   org.apache.activemq.artemis.tests.integration.amqp.JMSNonDestructiveTest.testNonDestructiveCoreProducerAMQPConsumer[persistenceEnabled=true, scanPeriod=-1]
   org.apache.activemq.artemis.tests.integration.amqp.JMSNonDestructiveTest.testNonDestructiveAMQPProducerCoreConsumer[persistenceEnabled=true, scanPeriod=-1]
   org.apache.activemq.artemis.tests.integration.cli.RecoverTest.org.apache.activemq.artemis.tests.integration.cli.RecoverTest
   org.apache.activemq.artemis.tests.integration.cluster.failover.ReplicatedMultipleServerFailoverExtraBackupsTest.testStartLiveFirst
   org.apache.activemq.artemis.tests.integration.jms.client.LVQTest.testLVQandNonDestructive
   





[GitHub] [activemq-artemis] clebertsuconic commented on a change in pull request #3569: ARTEMIS-3285 potential duplicate messages with LVQ + non-destructive

Posted by GitBox <gi...@apache.org>.
clebertsuconic commented on a change in pull request #3569:
URL: https://github.com/apache/activemq-artemis/pull/3569#discussion_r630162206



##########
File path: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
##########
@@ -1166,17 +1166,19 @@ private void resumeLargeMessage() {
     * @param message
     */
    private void deliverStandardMessage(final MessageReference ref, Message message) throws ActiveMQException {
-      applyPrefixForLegacyConsumer(message);
-      int packetSize = callback.sendMessage(ref, message, ServerConsumerImpl.this, ref.getDeliveryCount());
+      if (messageQueue.canBeDelivered(ref)) {
+         applyPrefixForLegacyConsumer(message);
+         int packetSize = callback.sendMessage(ref, message, ServerConsumerImpl.this, ref.getDeliveryCount());
 
-      if (availableCredits != null) {
-         availableCredits.addAndGet(-packetSize);
+         if (availableCredits != null) {
+            availableCredits.addAndGet(-packetSize);
 
-         if (logger.isTraceEnabled()) {
-            logger.trace(this + "::FlowControl::delivery standard taking " +
-                            packetSize +
-                            " from credits, available now is " +
-                            availableCredits);
+            if (logger.isTraceEnabled()) {
+               logger.trace(this + "::FlowControl::delivery standard taking " +
+                               packetSize +
+                               " from credits, available now is " +
+                               availableCredits);
+            }
          }

Review comment:
       You need to at least test what happens on the journal. Reproduce the issue, then check the output of ./artemis data print for duplicates on the journal. I believe you would hit worse issues here.
   
   I'm afraid you need better tests that look at the result on the journal.







[GitHub] [activemq-artemis] michaelandrepearce commented on a change in pull request #3569: ARTEMIS-3285 potential duplicate messages with LVQ + non-destructive

Posted by GitBox <gi...@apache.org>.
michaelandrepearce commented on a change in pull request #3569:
URL: https://github.com/apache/activemq-artemis/pull/3569#discussion_r630442808



##########
File path: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
##########
@@ -350,25 +342,33 @@ public SimpleString getLastValueKey() {
       return Collections.unmodifiableSet(map.keySet());
    }
 
+   @Override
+   public boolean canBeDelivered(MessageReference ref) {
+      if (isNonDestructive() && ref instanceof HolderReference) {
+         return !((HolderReference)ref).isDelivered();
+      } else {
+         return true;
+      }
+   }
+
    private static class HolderReference implements MessageReference {
 
       private final SimpleString prop;
 
-      private volatile boolean delivered = false;
+      private volatile AtomicBoolean delivered = new AtomicBoolean(false);

Review comment:
       There's a way to do this; I'll try to dig it out.







[GitHub] [activemq-artemis] jbertram commented on a change in pull request #3569: ARTEMIS-3285 potential duplicate messages with LVQ + non-destructive

Posted by GitBox <gi...@apache.org>.
jbertram commented on a change in pull request #3569:
URL: https://github.com/apache/activemq-artemis/pull/3569#discussion_r629608522



##########
File path: tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSNonDestructiveTest.java
##########
@@ -588,4 +594,82 @@ private void sendLVQTombstone(ConnectionSupplier producerConnectionSupplier, Str
          producer.send(message1, javax.jms.Message.DEFAULT_DELIVERY_MODE, javax.jms.Message.DEFAULT_PRIORITY, tombstoneTimeToLive);
       }
    }
+
+   @Test
+   public void testMultipleLastValuesCore() throws Exception {
+      testMultipleLastValues(CoreConnection);
+   }
+
+   @Test
+   public void testMultipleLastValuesAMQP() throws Exception {
+      testMultipleLastValues(AMQPConnection);
+   }
+
+   private void testMultipleLastValues(ConnectionSupplier connectionSupplier) throws Exception {
+      HashMap<String, List<String>> results = new HashMap<>();
+      results.put("0", new ArrayList<>());
+      results.put("1", new ArrayList<>());
+      results.put("2", new ArrayList<>());
+      HashMap<String, Integer> dups = new HashMap<>();
+
+      try (Connection connection = connectionSupplier.createConnection();
+           Connection connection2 = connectionSupplier.createConnection()) {
+         Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+         Session session2 = connection2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+         Queue queue = session.createQueue(NON_DESTRUCTIVE_LVQ_QUEUE_NAME);
+
+         MessageProducer producer = session2.createProducer(queue);
+         MessageConsumer consumer = session.createConsumer(queue);
+         connection.start();
+
+         Thread t = new Thread(() -> {
+            for (int i = 0; i < 100; i++) {
+               String lastval = "" + (i % 3);
+               try {
+                  TextMessage message = session2.createTextMessage();
+                  message.setText("" + i);
+                  message.setStringProperty("data", "" + i);
+                  message.setStringProperty("lastval", lastval);
+                  message.setStringProperty(Message.HDR_LAST_VALUE_NAME.toString(), lastval);
+                  producer.send(message);
+               } catch (JMSException e) {
+                  e.printStackTrace();
+               }
+            }
+         });
+
+         t.start();
+         int received = 0;
+         while (true) {
+            TextMessage tm = (TextMessage) consumer.receive(500);
+            if (tm == null) {
+               break;
+            }
+            System.out.println("Received: " + tm.getStringProperty("data") + "; total: " + ++received);

Review comment:
       Done.

##########
File path: tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSNonDestructiveTest.java
##########
@@ -588,4 +594,82 @@ private void sendLVQTombstone(ConnectionSupplier producerConnectionSupplier, Str
          producer.send(message1, javax.jms.Message.DEFAULT_DELIVERY_MODE, javax.jms.Message.DEFAULT_PRIORITY, tombstoneTimeToLive);
       }
    }
+
+   @Test
+   public void testMultipleLastValuesCore() throws Exception {
+      testMultipleLastValues(CoreConnection);
+   }
+
+   @Test
+   public void testMultipleLastValuesAMQP() throws Exception {
+      testMultipleLastValues(AMQPConnection);
+   }
+
+   private void testMultipleLastValues(ConnectionSupplier connectionSupplier) throws Exception {
+      HashMap<String, List<String>> results = new HashMap<>();
+      results.put("0", new ArrayList<>());
+      results.put("1", new ArrayList<>());
+      results.put("2", new ArrayList<>());
+      HashMap<String, Integer> dups = new HashMap<>();
+
+      try (Connection connection = connectionSupplier.createConnection();
+           Connection connection2 = connectionSupplier.createConnection()) {
+         Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+         Session session2 = connection2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+         Queue queue = session.createQueue(NON_DESTRUCTIVE_LVQ_QUEUE_NAME);
+
+         MessageProducer producer = session2.createProducer(queue);
+         MessageConsumer consumer = session.createConsumer(queue);
+         connection.start();
+
+         Thread t = new Thread(() -> {
+            for (int i = 0; i < 100; i++) {
+               String lastval = "" + (i % 3);
+               try {
+                  TextMessage message = session2.createTextMessage();
+                  message.setText("" + i);
+                  message.setStringProperty("data", "" + i);
+                  message.setStringProperty("lastval", lastval);
+                  message.setStringProperty(Message.HDR_LAST_VALUE_NAME.toString(), lastval);
+                  producer.send(message);
+               } catch (JMSException e) {
+                  e.printStackTrace();
+               }
+            }
+         });
+
+         t.start();
+         int received = 0;
+         while (true) {
+            TextMessage tm = (TextMessage) consumer.receive(500);
+            if (tm == null) {
+               break;
+            }
+            System.out.println("Received: " + tm.getStringProperty("data") + "; total: " + ++received);
+            results.get(tm.getStringProperty("lastval")).add(tm.getText());
+            tm.acknowledge();
+         }
+      }
+      for (Map.Entry<String, List<String>> entry : results.entrySet()) {
+         System.out.print("Messages received with lastval=" + entry.getKey() + " (");
+         for (String s : entry.getValue()) {
+            int occurrences = Collections.frequency(entry.getValue(), s);
+            if (occurrences > 1 && !dups.containsValue(Integer.parseInt(s))) {
+               dups.put(s, occurrences);
+            }
+            System.out.print(s + ",");

Review comment:
       Done.
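
For readers following the hunk above: the duplicate check guards on `dups.containsValue(Integer.parseInt(s))`, which compares a message number against occurrence counts and looks like it was meant to be a key check. A minimal sketch of one way to tally duplicates per last-value key is below. The class and method names (`DuplicateTally`, `findDuplicates`) are hypothetical and not part of the PR.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of the PR: tallies values received more than once.
class DuplicateTally {

   // Returns each value that occurs more than once, mapped to its occurrence count.
   static Map<String, Integer> findDuplicates(List<String> received) {
      Map<String, Integer> counts = new LinkedHashMap<>();
      for (String s : received) {
         counts.merge(s, 1, Integer::sum); // count every occurrence in one pass
      }
      counts.values().removeIf(n -> n < 2); // keep only values seen at least twice
      return counts;
   }

   public static void main(String[] args) {
      // prints {3=3, 9=2}
      System.out.println(findDuplicates(Arrays.asList("0", "3", "3", "6", "3", "9", "9")));
   }
}
```

This counts in a single pass instead of calling `Collections.frequency` once per element, and the returned map can be asserted empty at the end of the test.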







[GitHub] [activemq-artemis] clebertsuconic commented on a change in pull request #3569: ARTEMIS-3285 potential duplicate messages with LVQ + non-destructive

Posted by GitBox <gi...@apache.org>.
clebertsuconic commented on a change in pull request #3569:
URL: https://github.com/apache/activemq-artemis/pull/3569#discussion_r630160108



##########
File path: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
##########
@@ -350,25 +342,33 @@ public SimpleString getLastValueKey() {
       return Collections.unmodifiableSet(map.keySet());
    }
 
+   @Override
+   public boolean canBeDelivered(MessageReference ref) {
+      if (isNonDestructive() && ref instanceof HolderReference) {
+         return !((HolderReference)ref).isDelivered();
+      } else {
+         return true;
+      }
+   }
+
    private static class HolderReference implements MessageReference {
 
       private final SimpleString prop;
 
-      private volatile boolean delivered = false;
+      private volatile AtomicBoolean delivered = new AtomicBoolean(false);

Review comment:
       Could you use an `AtomicLongFieldUpdater` here instead?







[GitHub] [activemq-artemis] jbertram commented on a change in pull request #3569: ARTEMIS-3285 potential duplicate messages with LVQ + non-destructive

Posted by GitBox <gi...@apache.org>.
jbertram commented on a change in pull request #3569:
URL: https://github.com/apache/activemq-artemis/pull/3569#discussion_r630436218



##########
File path: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
##########
@@ -350,25 +342,33 @@ public SimpleString getLastValueKey() {
       return Collections.unmodifiableSet(map.keySet());
    }
 
+   @Override
+   public boolean canBeDelivered(MessageReference ref) {
+      if (isNonDestructive() && ref instanceof HolderReference) {
+         return !((HolderReference)ref).isDelivered();
+      } else {
+         return true;
+      }
+   }
+
    private static class HolderReference implements MessageReference {
 
       private final SimpleString prop;
 
-      private volatile boolean delivered = false;
+      private volatile AtomicBoolean delivered = new AtomicBoolean(false);

Review comment:
       As far as I can tell you can't use `AtomicLongFieldUpdater` on a `boolean`.
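
To illustrate the point: `java.util.concurrent.atomic` provides field updaters for `int`, `long`, and reference fields only, so a `boolean` flag would have to be stored as a `volatile int`. The sketch below shows that workaround under the assumption that 0 means "not delivered" and 1 means "delivered". `DeliveredFlag` is a hypothetical stand-in for `HolderReference`, not the code in this PR.

```java
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

// Hypothetical stand-in for HolderReference; only the delivered flag is modelled.
class DeliveredFlag {

   private static final AtomicIntegerFieldUpdater<DeliveredFlag> DELIVERED_UPDATER =
      AtomicIntegerFieldUpdater.newUpdater(DeliveredFlag.class, "delivered");

   // 0 = not delivered, 1 = delivered. Field updaters require a volatile int field.
   private volatile int delivered = 0;

   boolean isDelivered() {
      return delivered == 1;
   }

   // Returns true only for the caller that flips the flag from 0 to 1.
   boolean markDelivered() {
      return DELIVERED_UPDATER.compareAndSet(this, 0, 1);
   }

   // Clears the flag so the reference can be delivered again.
   void reset() {
      DELIVERED_UPDATER.set(this, 0);
   }
}
```

Compared with an `AtomicBoolean` per reference, a static updater avoids allocating an extra object for every instance, which is the usual reason for preferring it on objects created in large numbers.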







[GitHub] [activemq-artemis] jbertram commented on pull request #3569: ARTEMIS-3285 potential duplicate messages with LVQ + non-destructive

Posted by GitBox <gi...@apache.org>.
jbertram commented on pull request #3569:
URL: https://github.com/apache/activemq-artemis/pull/3569#issuecomment-839008476


   I'll go back to the drawing board and rework this.





[GitHub] [activemq-artemis] clebertsuconic commented on a change in pull request #3569: ARTEMIS-3285 potential duplicate messages with LVQ + non-destructive

Posted by GitBox <gi...@apache.org>.
clebertsuconic commented on a change in pull request #3569:
URL: https://github.com/apache/activemq-artemis/pull/3569#discussion_r630161241



##########
File path: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
##########
@@ -1166,17 +1166,19 @@ private void resumeLargeMessage() {
     * @param message
     */
    private void deliverStandardMessage(final MessageReference ref, Message message) throws ActiveMQException {
-      applyPrefixForLegacyConsumer(message);
-      int packetSize = callback.sendMessage(ref, message, ServerConsumerImpl.this, ref.getDeliveryCount());
+      if (messageQueue.canBeDelivered(ref)) {
+         applyPrefixForLegacyConsumer(message);
+         int packetSize = callback.sendMessage(ref, message, ServerConsumerImpl.this, ref.getDeliveryCount());
 
-      if (availableCredits != null) {
-         availableCredits.addAndGet(-packetSize);
+         if (availableCredits != null) {
+            availableCredits.addAndGet(-packetSize);
 
-         if (logger.isTraceEnabled()) {
-            logger.trace(this + "::FlowControl::delivery standard taking " +
-                            packetSize +
-                            " from credits, available now is " +
-                            availableCredits);
+            if (logger.isTraceEnabled()) {
+               logger.trace(this + "::FlowControl::delivery standard taking " +
+                               packetSize +
+                               " from credits, available now is " +
+                               availableCredits);
+            }
          }

Review comment:
       Don't you need to ack the message when it cannot be delivered?
   
   Otherwise this will leak records in the journal.

##########
File path: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
##########
@@ -1166,17 +1166,19 @@ private void resumeLargeMessage() {
     * @param message
     */
    private void deliverStandardMessage(final MessageReference ref, Message message) throws ActiveMQException {
-      applyPrefixForLegacyConsumer(message);
-      int packetSize = callback.sendMessage(ref, message, ServerConsumerImpl.this, ref.getDeliveryCount());
+      if (messageQueue.canBeDelivered(ref)) {
+         applyPrefixForLegacyConsumer(message);
+         int packetSize = callback.sendMessage(ref, message, ServerConsumerImpl.this, ref.getDeliveryCount());
 
-      if (availableCredits != null) {
-         availableCredits.addAndGet(-packetSize);
+         if (availableCredits != null) {
+            availableCredits.addAndGet(-packetSize);
 
-         if (logger.isTraceEnabled()) {
-            logger.trace(this + "::FlowControl::delivery standard taking " +
-                            packetSize +
-                            " from credits, available now is " +
-                            availableCredits);
+            if (logger.isTraceEnabled()) {
+               logger.trace(this + "::FlowControl::delivery standard taking " +
+                               packetSize +
+                               " from credits, available now is " +
+                               availableCredits);
+            }
          }

Review comment:
       ... and in paging as well.
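
As I read the concern in the two comments above, a reference that reaches the consumer but is silently skipped is never acknowledged, so whatever stored record backs it is never released. The toy model below illustrates only that bookkeeping problem. Every name in it (`ToyBroker`, `journal`, `superseded`, `drain`) is hypothetical, and it makes no claim about how the Artemis journal or paging actually work.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.Set;

// Toy model only: none of these names exist in Artemis.
class ToyBroker {

   final Set<Long> journal = new HashSet<>();     // ids of records still held in storage
   final Deque<Long> queue = new ArrayDeque<>();  // references waiting for delivery
   final Set<Long> superseded = new HashSet<>();  // references that must not be delivered

   void enqueue(long id) {
      journal.add(id);
      queue.add(id);
   }

   // Drains the queue and returns how many references were delivered.
   // When ackSkipped is false, skipped references keep their journal record.
   int drain(boolean ackSkipped) {
      int delivered = 0;
      Long id;
      while ((id = queue.poll()) != null) {
         if (superseded.contains(id)) {
            if (ackSkipped) {
               journal.remove(id); // release the record of the skipped reference
            }
            continue;
         }
         delivered++;
         journal.remove(id); // models the consumer's acknowledgement
      }
      return delivered;
   }

   public static void main(String[] args) {
      ToyBroker broker = new ToyBroker();
      broker.enqueue(1L);
      broker.enqueue(2L);
      broker.enqueue(3L);
      broker.superseded.add(2L);
      broker.drain(false);
      System.out.println("records left without ack of skipped refs: " + broker.journal.size());
   }
}
```

In this model, draining with `ackSkipped` set to `false` leaves the record for the skipped reference behind, while `true` releases it.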



