You are viewing a plain text version of this content. The canonical link for it is here.
Posted to gitbox@activemq.apache.org by GitBox <gi...@apache.org> on 2021/05/17 13:44:13 UTC

[GitHub] [activemq-artemis] jbertram opened a new pull request #3583: ARTEMIS-3285 potential duplicate messages with LVQ + non-destructive

jbertram opened a new pull request #3583:
URL: https://github.com/apache/activemq-artemis/pull/3583


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [activemq-artemis] jbertram commented on a change in pull request #3583: ARTEMIS-3285 potential duplicate messages with LVQ + non-destructive

Posted by GitBox <gi...@apache.org>.
jbertram commented on a change in pull request #3583:
URL: https://github.com/apache/activemq-artemis/pull/3583#discussion_r633554319



##########
File path: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
##########
@@ -259,6 +251,8 @@ public QueueConfiguration getQueueConfiguration() {
    private void replaceLVQMessage(MessageReference ref, HolderReference hr) {
       MessageReference oldRef = hr.getReference();
 
+//      logger.info("Replacing " + oldRef + " with " + ref);

Review comment:
       fixed

##########
File path: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
##########
@@ -133,7 +133,7 @@
    protected static final int CRITICAL_CONSUMER = 3;
    protected static final int CRITICAL_CHECK_DEPAGE = 4;
 
-   private static final Logger logger = Logger.getLogger(QueueImpl.class);
+   protected static final Logger logger = Logger.getLogger(QueueImpl.class);

Review comment:
       fixed

##########
File path: tests/config/logging.properties
##########
@@ -68,4 +68,5 @@ handler.TEST.formatter=PATTERN
 # Formatter pattern configuration
 formatter.PATTERN=org.jboss.logmanager.formatters.PatternFormatter
 formatter.PATTERN.properties=pattern
-formatter.PATTERN.pattern=[%t] %d{HH:mm:ss,SSS} %-5p [%c] %s%E%n
+#formatter.PATTERN.pattern=[%t] %d{HH:mm:ss,SSS} %-5p [%c] %s%E%n
+formatter.PATTERN.pattern=%d{HH:mm:ss,SSS} %-5p %s%E%n

Review comment:
       No. I just fixed it.

##########
File path: tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSNonDestructiveTest.java
##########
@@ -588,4 +599,87 @@ private void sendLVQTombstone(ConnectionSupplier producerConnectionSupplier, Str
          producer.send(message1, javax.jms.Message.DEFAULT_DELIVERY_MODE, javax.jms.Message.DEFAULT_PRIORITY, tombstoneTimeToLive);
       }
    }
+
+   @Test
+   public void testMultipleLastValuesCore() throws Exception {
+      testMultipleLastValues(CoreConnection);
+   }
+
+   @Test
+   public void testMultipleLastValuesAMQP() throws Exception {
+      testMultipleLastValues(AMQPConnection);
+   }
+
+   private void testMultipleLastValues(ConnectionSupplier connectionSupplier) throws Exception {
+      HashMap<String, List<String>> results = new HashMap<>();
+      results.put("0", new ArrayList<>());
+      results.put("1", new ArrayList<>());
+      results.put("2", new ArrayList<>());
+      HashMap<String, Integer> dups = new HashMap<>();
+
+      try (Connection connection = connectionSupplier.createConnection();
+           Connection connection2 = connectionSupplier.createConnection()) {
+         Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+         Session session2 = connection2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+         Queue queue = session.createQueue(NON_DESTRUCTIVE_LVQ_QUEUE_NAME);
+
+         MessageProducer producer = session2.createProducer(queue);
+         MessageConsumer consumer = session.createConsumer(queue);
+         connection.start();
+
+         Thread t = new Thread(() -> {
+            for (int i = 0; i < 100; i++) {
+               String lastval = "" + (i % 3);
+               try {
+                  TextMessage message = session2.createTextMessage();
+                  message.setText("" + i);
+                  message.setStringProperty("data", "" + i);
+                  message.setStringProperty("lastval", lastval);
+                  message.setStringProperty(Message.HDR_LAST_VALUE_NAME.toString(), lastval);
+                  producer.send(message);
+               } catch (JMSException e) {
+                  e.printStackTrace();
+               }
+            }
+         });
+
+         t.start();
+         while (true) {
+            TextMessage tm = (TextMessage) consumer.receive(500);
+            if (tm == null) {
+               break;
+            }
+            results.get(tm.getStringProperty("lastval")).add(tm.getText());
+            tm.acknowledge();
+         }
+      }
+      for (Map.Entry<String, List<String>> entry : results.entrySet()) {
+         StringBuilder message = new StringBuilder();
+         message.append("Messages received with lastval=" + entry.getKey() + " (");
+         for (String s : entry.getValue()) {
+            int occurrences = Collections.frequency(entry.getValue(), s);
+            if (occurrences > 1 && !dups.containsValue(Integer.parseInt(s))) {
+               dups.put(s, occurrences);
+            }
+            message.append(s + ",");
+         }
+         logger.info(message + ")");
+
+      }
+      if (dups.size() > 0) {
+         StringBuffer sb = new StringBuffer();
+         for (Map.Entry<String, Integer> stringIntegerEntry : dups.entrySet()) {
+            sb.append(stringIntegerEntry.getKey() + "(" + stringIntegerEntry.getValue() + "),");
+         }
+         Assert.fail("Duplicate messages received " + sb);
+      }
+
+      if (persistenceEnabled) {
+         Wait.assertTrue(() -> {

Review comment:
       The test class is using `Parameterized` and if persistence is disabled then `countJournal` throws a bunch of exceptions.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [activemq-artemis] jbertram commented on a change in pull request #3583: ARTEMIS-3285 potential duplicate messages with LVQ + non-destructive

Posted by GitBox <gi...@apache.org>.
jbertram commented on a change in pull request #3583:
URL: https://github.com/apache/activemq-artemis/pull/3583#discussion_r633618636



##########
File path: tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSNonDestructiveTest.java
##########
@@ -588,4 +599,87 @@ private void sendLVQTombstone(ConnectionSupplier producerConnectionSupplier, Str
          producer.send(message1, javax.jms.Message.DEFAULT_DELIVERY_MODE, javax.jms.Message.DEFAULT_PRIORITY, tombstoneTimeToLive);
       }
    }
+
+   @Test
+   public void testMultipleLastValuesCore() throws Exception {
+      testMultipleLastValues(CoreConnection);
+   }
+
+   @Test
+   public void testMultipleLastValuesAMQP() throws Exception {
+      testMultipleLastValues(AMQPConnection);
+   }
+
+   private void testMultipleLastValues(ConnectionSupplier connectionSupplier) throws Exception {
+      HashMap<String, List<String>> results = new HashMap<>();
+      results.put("0", new ArrayList<>());
+      results.put("1", new ArrayList<>());
+      results.put("2", new ArrayList<>());
+      HashMap<String, Integer> dups = new HashMap<>();
+
+      try (Connection connection = connectionSupplier.createConnection();
+           Connection connection2 = connectionSupplier.createConnection()) {
+         Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+         Session session2 = connection2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+         Queue queue = session.createQueue(NON_DESTRUCTIVE_LVQ_QUEUE_NAME);
+
+         MessageProducer producer = session2.createProducer(queue);
+         MessageConsumer consumer = session.createConsumer(queue);
+         connection.start();
+
+         Thread t = new Thread(() -> {
+            for (int i = 0; i < 100; i++) {
+               String lastval = "" + (i % 3);
+               try {
+                  TextMessage message = session2.createTextMessage();
+                  message.setText("" + i);
+                  message.setStringProperty("data", "" + i);
+                  message.setStringProperty("lastval", lastval);
+                  message.setStringProperty(Message.HDR_LAST_VALUE_NAME.toString(), lastval);
+                  producer.send(message);
+               } catch (JMSException e) {
+                  e.printStackTrace();
+               }
+            }
+         });
+
+         t.start();
+         while (true) {
+            TextMessage tm = (TextMessage) consumer.receive(500);
+            if (tm == null) {
+               break;
+            }
+            results.get(tm.getStringProperty("lastval")).add(tm.getText());
+            tm.acknowledge();
+         }
+      }
+      for (Map.Entry<String, List<String>> entry : results.entrySet()) {
+         StringBuilder message = new StringBuilder();
+         message.append("Messages received with lastval=" + entry.getKey() + " (");
+         for (String s : entry.getValue()) {
+            int occurrences = Collections.frequency(entry.getValue(), s);
+            if (occurrences > 1 && !dups.containsValue(Integer.parseInt(s))) {
+               dups.put(s, occurrences);
+            }
+            message.append(s + ",");
+         }
+         logger.info(message + ")");
+
+      }
+      if (dups.size() > 0) {
+         StringBuffer sb = new StringBuffer();
+         for (Map.Entry<String, Integer> stringIntegerEntry : dups.entrySet()) {
+            sb.append(stringIntegerEntry.getKey() + "(" + stringIntegerEntry.getValue() + "),");
+         }
+         Assert.fail("Duplicate messages received " + sb);
+      }
+
+      if (persistenceEnabled) {
+         Wait.assertTrue(() -> {

Review comment:
       The test class is using `Parameterized` and if persistence is disabled then `countJournal` throws a bunch of exceptions.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [activemq-artemis] jbertram commented on a change in pull request #3583: ARTEMIS-3285 potential duplicate messages with LVQ + non-destructive

Posted by GitBox <gi...@apache.org>.
jbertram commented on a change in pull request #3583:
URL: https://github.com/apache/activemq-artemis/pull/3583#discussion_r638234272



##########
File path: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
##########
@@ -56,9 +58,27 @@
 @SuppressWarnings("ALL")
 public class LastValueQueue extends QueueImpl {
 
+   private static final Logger logger = Logger.getLogger(LastValueQueue.class);
    private final Map<SimpleString, HolderReference> map = new ConcurrentHashMap<>();
    private final SimpleString lastValueKey;
 
+   // only use this within synchronized methods or synchronized(this) blocks
+   protected volatile LinkedList<MessageReference> nextDeliveries = new LinkedList<>();

Review comment:
       Should be `final` now.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [activemq-artemis] clebertsuconic commented on pull request #3583: ARTEMIS-3285 potential duplicate messages with LVQ + non-destructive

Posted by GitBox <gi...@apache.org>.
clebertsuconic commented on pull request #3583:
URL: https://github.com/apache/activemq-artemis/pull/3583#issuecomment-847320962


   nice job on this one... merging it.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [activemq-artemis] michaelandrepearce commented on a change in pull request #3583: ARTEMIS-3285 potential duplicate messages with LVQ + non-destructive

Posted by GitBox <gi...@apache.org>.
michaelandrepearce commented on a change in pull request #3583:
URL: https://github.com/apache/activemq-artemis/pull/3583#discussion_r633548577



##########
File path: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
##########
@@ -259,6 +251,8 @@ public QueueConfiguration getQueueConfiguration() {
    private void replaceLVQMessage(MessageReference ref, HolderReference hr) {
       MessageReference oldRef = hr.getReference();
 
+//      logger.info("Replacing " + oldRef + " with " + ref);

Review comment:
       Commented code




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [activemq-artemis] clebertsuconic commented on a change in pull request #3583: ARTEMIS-3285 potential duplicate messages with LVQ + non-destructive

Posted by GitBox <gi...@apache.org>.
clebertsuconic commented on a change in pull request #3583:
URL: https://github.com/apache/activemq-artemis/pull/3583#discussion_r633612532



##########
File path: tests/config/logging.properties
##########
@@ -68,4 +68,5 @@ handler.TEST.formatter=PATTERN
 # Formatter pattern configuration
 formatter.PATTERN=org.jboss.logmanager.formatters.PatternFormatter
 formatter.PATTERN.properties=pattern
-formatter.PATTERN.pattern=[%t] %d{HH:mm:ss,SSS} %-5p [%c] %s%E%n
+#formatter.PATTERN.pattern=[%t] %d{HH:mm:ss,SSS} %-5p [%c] %s%E%n
+formatter.PATTERN.pattern=%d{HH:mm:ss,SSS} %-5p %s%E%n

Review comment:
       @jbertram was this change intentional?
   
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [activemq-artemis] jbertram commented on a change in pull request #3583: ARTEMIS-3285 potential duplicate messages with LVQ + non-destructive

Posted by GitBox <gi...@apache.org>.
jbertram commented on a change in pull request #3583:
URL: https://github.com/apache/activemq-artemis/pull/3583#discussion_r638236618



##########
File path: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
##########
@@ -151,25 +171,20 @@ public synchronized void addTail(final MessageReference ref, final boolean direc
          HolderReference hr = map.get(prop);
 
          if (hr != null) {
-            // We need to overwrite the old ref with the new one and ack the old one
-
-            replaceLVQMessage(ref, hr);
-
-            if (isNonDestructive() && hr.isDelivered()) {
-               hr.resetDelivered();
-               // --------------------------------------------------------------------------------
-               // If non Destructive, and if a reference was previously delivered
-               // we would not be able to receive this message again
-               // unless we reset the iterators
-               // The message is not removed, so we can't actually remove it
-               // a result of this operation is that previously delivered messages
-               // will probably be delivered again.
-               // if we ever want to avoid other redeliveries we would have to implement a reset or redeliver
-               // operation on the iterator for a single message
-               resetAllIterators();
-               deliverAsync();
-            }
+            if (isNonDestructive() && hr.isInDelivery()) {
+               // if the ref is already being delivered we'll do the replace in the postAcknowledge
+               hr.setReplacementRef(ref);
+            } else {
+               // We need to overwrite the old ref with the new one and ack the old one
+               replaceLVQMessage(ref, hr);
 
+               if (isNonDestructive() && hr.isDelivered()) {
+                  hr.resetDelivered();
+                  // since we're replacing a ref that was already delivered we want to trigger a "special" delivery for this new replacement

Review comment:
       I fixed "special" in the comment.
   
   I didn't change `repeatNextDelivery`. I used "repeat" in the name because it serves the same purpose as `repeat` on the `LinkedListIterator` in `QueueImpl`. I wanted that to be clear so its easy to see that if the consumer is "busy" we just repeat the delivery no matter what.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [activemq-artemis] jbertram commented on a change in pull request #3583: ARTEMIS-3285 potential duplicate messages with LVQ + non-destructive

Posted by GitBox <gi...@apache.org>.
jbertram commented on a change in pull request #3583:
URL: https://github.com/apache/activemq-artemis/pull/3583#discussion_r633554319



##########
File path: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
##########
@@ -259,6 +251,8 @@ public QueueConfiguration getQueueConfiguration() {
    private void replaceLVQMessage(MessageReference ref, HolderReference hr) {
       MessageReference oldRef = hr.getReference();
 
+//      logger.info("Replacing " + oldRef + " with " + ref);

Review comment:
       fixed

##########
File path: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
##########
@@ -133,7 +133,7 @@
    protected static final int CRITICAL_CONSUMER = 3;
    protected static final int CRITICAL_CHECK_DEPAGE = 4;
 
-   private static final Logger logger = Logger.getLogger(QueueImpl.class);
+   protected static final Logger logger = Logger.getLogger(QueueImpl.class);

Review comment:
       fixed




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [activemq-artemis] jbertram commented on a change in pull request #3583: ARTEMIS-3285 potential duplicate messages with LVQ + non-destructive

Posted by GitBox <gi...@apache.org>.
jbertram commented on a change in pull request #3583:
URL: https://github.com/apache/activemq-artemis/pull/3583#discussion_r638234004



##########
File path: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
##########
@@ -928,7 +927,7 @@ public Queue getQueue() {
 
             acks++;
          }
-         while (ref.getMessageID() != messageID);
+         while (ref != null && ref.getMessageID() != messageID);

Review comment:
       This was also not on purpose.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [activemq-artemis] michaelandrepearce commented on a change in pull request #3583: ARTEMIS-3285 potential duplicate messages with LVQ + non-destructive

Posted by GitBox <gi...@apache.org>.
michaelandrepearce commented on a change in pull request #3583:
URL: https://github.com/apache/activemq-artemis/pull/3583#discussion_r633548577



##########
File path: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
##########
@@ -259,6 +251,8 @@ public QueueConfiguration getQueueConfiguration() {
    private void replaceLVQMessage(MessageReference ref, HolderReference hr) {
       MessageReference oldRef = hr.getReference();
 
+//      logger.info("Replacing " + oldRef + " with " + ref);

Review comment:
       Commented code

##########
File path: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
##########
@@ -133,7 +133,7 @@
    protected static final int CRITICAL_CONSUMER = 3;
    protected static final int CRITICAL_CHECK_DEPAGE = 4;
 
-   private static final Logger logger = Logger.getLogger(QueueImpl.class);
+   protected static final Logger logger = Logger.getLogger(QueueImpl.class);

Review comment:
       Should keep private and if a subclass wants to log then it makes its own logger , so its logged against the right class




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [activemq-artemis] clebertsuconic commented on a change in pull request #3583: ARTEMIS-3285 potential duplicate messages with LVQ + non-destructive

Posted by GitBox <gi...@apache.org>.
clebertsuconic commented on a change in pull request #3583:
URL: https://github.com/apache/activemq-artemis/pull/3583#discussion_r633610097



##########
File path: tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSNonDestructiveTest.java
##########
@@ -588,4 +599,87 @@ private void sendLVQTombstone(ConnectionSupplier producerConnectionSupplier, Str
          producer.send(message1, javax.jms.Message.DEFAULT_DELIVERY_MODE, javax.jms.Message.DEFAULT_PRIORITY, tombstoneTimeToLive);
       }
    }
+
+   @Test
+   public void testMultipleLastValuesCore() throws Exception {
+      testMultipleLastValues(CoreConnection);
+   }
+
+   @Test
+   public void testMultipleLastValuesAMQP() throws Exception {
+      testMultipleLastValues(AMQPConnection);
+   }
+
+   private void testMultipleLastValues(ConnectionSupplier connectionSupplier) throws Exception {
+      HashMap<String, List<String>> results = new HashMap<>();
+      results.put("0", new ArrayList<>());
+      results.put("1", new ArrayList<>());
+      results.put("2", new ArrayList<>());
+      HashMap<String, Integer> dups = new HashMap<>();
+
+      try (Connection connection = connectionSupplier.createConnection();
+           Connection connection2 = connectionSupplier.createConnection()) {
+         Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+         Session session2 = connection2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+         Queue queue = session.createQueue(NON_DESTRUCTIVE_LVQ_QUEUE_NAME);
+
+         MessageProducer producer = session2.createProducer(queue);
+         MessageConsumer consumer = session.createConsumer(queue);
+         connection.start();
+
+         Thread t = new Thread(() -> {
+            for (int i = 0; i < 100; i++) {
+               String lastval = "" + (i % 3);
+               try {
+                  TextMessage message = session2.createTextMessage();
+                  message.setText("" + i);
+                  message.setStringProperty("data", "" + i);
+                  message.setStringProperty("lastval", lastval);
+                  message.setStringProperty(Message.HDR_LAST_VALUE_NAME.toString(), lastval);
+                  producer.send(message);
+               } catch (JMSException e) {
+                  e.printStackTrace();
+               }
+            }
+         });
+
+         t.start();
+         while (true) {
+            TextMessage tm = (TextMessage) consumer.receive(500);
+            if (tm == null) {
+               break;
+            }
+            results.get(tm.getStringProperty("lastval")).add(tm.getText());
+            tm.acknowledge();
+         }
+      }
+      for (Map.Entry<String, List<String>> entry : results.entrySet()) {
+         StringBuilder message = new StringBuilder();
+         message.append("Messages received with lastval=" + entry.getKey() + " (");
+         for (String s : entry.getValue()) {
+            int occurrences = Collections.frequency(entry.getValue(), s);
+            if (occurrences > 1 && !dups.containsValue(Integer.parseInt(s))) {
+               dups.put(s, occurrences);
+            }
+            message.append(s + ",");
+         }
+         logger.info(message + ")");
+
+      }
+      if (dups.size() > 0) {
+         StringBuffer sb = new StringBuffer();
+         for (Map.Entry<String, Integer> stringIntegerEntry : dups.entrySet()) {
+            sb.append(stringIntegerEntry.getKey() + "(" + stringIntegerEntry.getValue() + "),");
+         }
+         Assert.fail("Duplicate messages received " + sb);
+      }
+
+      if (persistenceEnabled) {
+         Wait.assertTrue(() -> {

Review comment:
       why the check on persistenceEnabled? Is the test running more than once?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [activemq-artemis] clebertsuconic commented on a change in pull request #3583: ARTEMIS-3285 potential duplicate messages with LVQ + non-destructive

Posted by GitBox <gi...@apache.org>.
clebertsuconic commented on a change in pull request #3583:
URL: https://github.com/apache/activemq-artemis/pull/3583#discussion_r633610097



##########
File path: tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSNonDestructiveTest.java
##########
@@ -588,4 +599,87 @@ private void sendLVQTombstone(ConnectionSupplier producerConnectionSupplier, Str
          producer.send(message1, javax.jms.Message.DEFAULT_DELIVERY_MODE, javax.jms.Message.DEFAULT_PRIORITY, tombstoneTimeToLive);
       }
    }
+
+   @Test
+   public void testMultipleLastValuesCore() throws Exception {
+      testMultipleLastValues(CoreConnection);
+   }
+
+   @Test
+   public void testMultipleLastValuesAMQP() throws Exception {
+      testMultipleLastValues(AMQPConnection);
+   }
+
+   private void testMultipleLastValues(ConnectionSupplier connectionSupplier) throws Exception {
+      HashMap<String, List<String>> results = new HashMap<>();
+      results.put("0", new ArrayList<>());
+      results.put("1", new ArrayList<>());
+      results.put("2", new ArrayList<>());
+      HashMap<String, Integer> dups = new HashMap<>();
+
+      try (Connection connection = connectionSupplier.createConnection();
+           Connection connection2 = connectionSupplier.createConnection()) {
+         Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+         Session session2 = connection2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+         Queue queue = session.createQueue(NON_DESTRUCTIVE_LVQ_QUEUE_NAME);
+
+         MessageProducer producer = session2.createProducer(queue);
+         MessageConsumer consumer = session.createConsumer(queue);
+         connection.start();
+
+         Thread t = new Thread(() -> {
+            for (int i = 0; i < 100; i++) {
+               String lastval = "" + (i % 3);
+               try {
+                  TextMessage message = session2.createTextMessage();
+                  message.setText("" + i);
+                  message.setStringProperty("data", "" + i);
+                  message.setStringProperty("lastval", lastval);
+                  message.setStringProperty(Message.HDR_LAST_VALUE_NAME.toString(), lastval);
+                  producer.send(message);
+               } catch (JMSException e) {
+                  e.printStackTrace();
+               }
+            }
+         });
+
+         t.start();
+         while (true) {
+            TextMessage tm = (TextMessage) consumer.receive(500);
+            if (tm == null) {
+               break;
+            }
+            results.get(tm.getStringProperty("lastval")).add(tm.getText());
+            tm.acknowledge();
+         }
+      }
+      for (Map.Entry<String, List<String>> entry : results.entrySet()) {
+         StringBuilder message = new StringBuilder();
+         message.append("Messages received with lastval=" + entry.getKey() + " (");
+         for (String s : entry.getValue()) {
+            int occurrences = Collections.frequency(entry.getValue(), s);
+            if (occurrences > 1 && !dups.containsValue(Integer.parseInt(s))) {
+               dups.put(s, occurrences);
+            }
+            message.append(s + ",");
+         }
+         logger.info(message + ")");
+
+      }
+      if (dups.size() > 0) {
+         StringBuffer sb = new StringBuffer();
+         for (Map.Entry<String, Integer> stringIntegerEntry : dups.entrySet()) {
+            sb.append(stringIntegerEntry.getKey() + "(" + stringIntegerEntry.getValue() + "),");
+         }
+         Assert.fail("Duplicate messages received " + sb);
+      }
+
+      if (persistenceEnabled) {
+         Wait.assertTrue(() -> {

Review comment:
       why the check on persistenceEnabled? Is the test running more than once?

##########
File path: tests/config/logging.properties
##########
@@ -68,4 +68,5 @@ handler.TEST.formatter=PATTERN
 # Formatter pattern configuration
 formatter.PATTERN=org.jboss.logmanager.formatters.PatternFormatter
 formatter.PATTERN.properties=pattern
-formatter.PATTERN.pattern=[%t] %d{HH:mm:ss,SSS} %-5p [%c] %s%E%n
+#formatter.PATTERN.pattern=[%t] %d{HH:mm:ss,SSS} %-5p [%c] %s%E%n
+formatter.PATTERN.pattern=%d{HH:mm:ss,SSS} %-5p %s%E%n

Review comment:
       @jbertram was this change intentional?
   
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [activemq-artemis] jbertram commented on a change in pull request #3583: ARTEMIS-3285 potential duplicate messages with LVQ + non-destructive

Posted by GitBox <gi...@apache.org>.
jbertram commented on a change in pull request #3583:
URL: https://github.com/apache/activemq-artemis/pull/3583#discussion_r633616905



##########
File path: tests/config/logging.properties
##########
@@ -68,4 +68,5 @@ handler.TEST.formatter=PATTERN
 # Formatter pattern configuration
 formatter.PATTERN=org.jboss.logmanager.formatters.PatternFormatter
 formatter.PATTERN.properties=pattern
-formatter.PATTERN.pattern=[%t] %d{HH:mm:ss,SSS} %-5p [%c] %s%E%n
+#formatter.PATTERN.pattern=[%t] %d{HH:mm:ss,SSS} %-5p [%c] %s%E%n
+formatter.PATTERN.pattern=%d{HH:mm:ss,SSS} %-5p %s%E%n

Review comment:
       No. I just fixed it.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [activemq-artemis] jbertram commented on a change in pull request #3583: ARTEMIS-3285 potential duplicate messages with LVQ + non-destructive

Posted by GitBox <gi...@apache.org>.
jbertram commented on a change in pull request #3583:
URL: https://github.com/apache/activemq-artemis/pull/3583#discussion_r638233886



##########
File path: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
##########
@@ -918,7 +917,7 @@ public Queue getQueue() {
             }
 
             if (ref == null) {
-               ActiveMQIllegalStateException ils = ActiveMQMessageBundle.BUNDLE.consumerNoReference(id, messageID, messageQueue.getName());
+               ActiveMQIllegalStateException ils = new ActiveMQIllegalStateException("Cannot find ref to ack " + messageID);

Review comment:
       This was not on purpose.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [activemq-artemis] michaelandrepearce commented on a change in pull request #3583: ARTEMIS-3285 potential duplicate messages with LVQ + non-destructive

Posted by GitBox <gi...@apache.org>.
michaelandrepearce commented on a change in pull request #3583:
URL: https://github.com/apache/activemq-artemis/pull/3583#discussion_r633549314



##########
File path: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
##########
@@ -133,7 +133,7 @@
    protected static final int CRITICAL_CONSUMER = 3;
    protected static final int CRITICAL_CHECK_DEPAGE = 4;
 
-   private static final Logger logger = Logger.getLogger(QueueImpl.class);
+   protected static final Logger logger = Logger.getLogger(QueueImpl.class);

Review comment:
       Should keep private and if a subclass wants to log then it makes its own logger , so its logged against the right class




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [activemq-artemis] clebertsuconic merged pull request #3583: ARTEMIS-3285 potential duplicate messages with LVQ + non-destructive

Posted by GitBox <gi...@apache.org>.
clebertsuconic merged pull request #3583:
URL: https://github.com/apache/activemq-artemis/pull/3583


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [activemq-artemis] clebertsuconic commented on a change in pull request #3583: ARTEMIS-3285 potential duplicate messages with LVQ + non-destructive

Posted by GitBox <gi...@apache.org>.
clebertsuconic commented on a change in pull request #3583:
URL: https://github.com/apache/activemq-artemis/pull/3583#discussion_r638231866



##########
File path: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
##########
@@ -56,9 +58,27 @@
 @SuppressWarnings("ALL")
 public class LastValueQueue extends QueueImpl {
 
+   private static final Logger logger = Logger.getLogger(LastValueQueue.class);
    private final Map<SimpleString, HolderReference> map = new ConcurrentHashMap<>();
    private final SimpleString lastValueKey;
 
+   // only use this within synchronized methods or synchronized(this) blocks
+   protected volatile LinkedList<MessageReference> nextDeliveries = new LinkedList<>();

Review comment:
       I almost forgot.. replace volatile by final here please?
   
   it used to be an element on your logic. .but as it's a collection now it needs to be a final linkedList (at least on the way it is implemented now.

##########
File path: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
##########
@@ -56,9 +58,27 @@
 @SuppressWarnings("ALL")
 public class LastValueQueue extends QueueImpl {
 
+   private static final Logger logger = Logger.getLogger(LastValueQueue.class);
    private final Map<SimpleString, HolderReference> map = new ConcurrentHashMap<>();
    private final SimpleString lastValueKey;
 
+   // only use this within synchronized methods or synchronized(this) blocks
+   protected volatile LinkedList<MessageReference> nextDeliveries = new LinkedList<>();

Review comment:
       nice! that's what I thought you were going to need!

##########
File path: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
##########
@@ -928,7 +927,7 @@ public Queue getQueue() {
 
             acks++;
          }
-         while (ref.getMessageID() != messageID);
+         while (ref != null && ref.getMessageID() != messageID);

Review comment:
       was this on purpose? it seems a left over from a debug, since it was never supposed to hit null here (the throw exception is done when ref==null)

##########
File path: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
##########
@@ -918,7 +917,7 @@ public Queue getQueue() {
             }
 
             if (ref == null) {
-               ActiveMQIllegalStateException ils = ActiveMQMessageBundle.BUNDLE.consumerNoReference(id, messageID, messageQueue.getName());
+               ActiveMQIllegalStateException ils = new ActiveMQIllegalStateException("Cannot find ref to ack " + messageID);

Review comment:
       Was this on purpose? The former seemed more complete to me.

##########
File path: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
##########
@@ -151,25 +171,20 @@ public synchronized void addTail(final MessageReference ref, final boolean direc
          HolderReference hr = map.get(prop);
 
          if (hr != null) {
-            // We need to overwrite the old ref with the new one and ack the old one
-
-            replaceLVQMessage(ref, hr);
-
-            if (isNonDestructive() && hr.isDelivered()) {
-               hr.resetDelivered();
-               // --------------------------------------------------------------------------------
-               // If non Destructive, and if a reference was previously delivered
-               // we would not be able to receive this message again
-               // unless we reset the iterators
-               // The message is not removed, so we can't actually remove it
-               // a result of this operation is that previously delivered messages
-               // will probably be delivered again.
-               // if we ever want to avoid other redeliveries we would have to implement a reset or redeliver
-               // operation on the iterator for a single message
-               resetAllIterators();
-               deliverAsync();
-            }
+            if (isNonDestructive() && hr.isInDelivery()) {
+               // if the ref is already being delivered we'll do the replace in the postAcknowledge
+               hr.setReplacementRef(ref);
+            } else {
+               // We need to overwrite the old ref with the new one and ack the old one
+               replaceLVQMessage(ref, hr);
 
+               if (isNonDestructive() && hr.isDelivered()) {
+                  hr.resetDelivered();
+                  // since we're replacing a ref that was already delivered we want to trigger a "special" delivery for this new replacement

Review comment:
       I - stale comment... the "special" word...
   
   ii - just an idea... what about rename repeatNextDelivery(messageReference) as nextDelivery(messageReference)
   
   and call it here instead of using the collection directly? 
   
   just a suggestion!
   

##########
File path: tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSNonDestructiveTest.java
##########
@@ -588,4 +599,87 @@ private void sendLVQTombstone(ConnectionSupplier producerConnectionSupplier, Str
          producer.send(message1, javax.jms.Message.DEFAULT_DELIVERY_MODE, javax.jms.Message.DEFAULT_PRIORITY, tombstoneTimeToLive);
       }
    }
+
+   @Test
+   public void testMultipleLastValuesCore() throws Exception {
+      testMultipleLastValues(CoreConnection);
+   }
+
+   @Test
+   public void testMultipleLastValuesAMQP() throws Exception {
+      testMultipleLastValues(AMQPConnection);
+   }
+
+   private void testMultipleLastValues(ConnectionSupplier connectionSupplier) throws Exception {
+      HashMap<String, List<String>> results = new HashMap<>();
+      results.put("0", new ArrayList<>());
+      results.put("1", new ArrayList<>());
+      results.put("2", new ArrayList<>());
+      HashMap<String, Integer> dups = new HashMap<>();
+
+      try (Connection connection = connectionSupplier.createConnection();
+           Connection connection2 = connectionSupplier.createConnection()) {
+         Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+         Session session2 = connection2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+         Queue queue = session.createQueue(NON_DESTRUCTIVE_LVQ_QUEUE_NAME);
+
+         MessageProducer producer = session2.createProducer(queue);
+         MessageConsumer consumer = session.createConsumer(queue);
+         connection.start();
+
+         Thread t = new Thread(() -> {
+            for (int i = 0; i < 100; i++) {
+               String lastval = "" + (i % 3);
+               try {
+                  TextMessage message = session2.createTextMessage();
+                  message.setText("" + i);
+                  message.setStringProperty("data", "" + i);
+                  message.setStringProperty("lastval", lastval);
+                  message.setStringProperty(Message.HDR_LAST_VALUE_NAME.toString(), lastval);
+                  producer.send(message);
+               } catch (JMSException e) {
+                  e.printStackTrace();
+               }
+            }
+         });
+
+         t.start();
+         while (true) {
+            TextMessage tm = (TextMessage) consumer.receive(500);
+            if (tm == null) {
+               break;
+            }
+            results.get(tm.getStringProperty("lastval")).add(tm.getText());
+            tm.acknowledge();
+         }
+      }
+      for (Map.Entry<String, List<String>> entry : results.entrySet()) {
+         StringBuilder message = new StringBuilder();
+         message.append("Messages received with lastval=" + entry.getKey() + " (");
+         for (String s : entry.getValue()) {
+            int occurrences = Collections.frequency(entry.getValue(), s);
+            if (occurrences > 1 && !dups.containsValue(Integer.parseInt(s))) {
+               dups.put(s, occurrences);
+            }
+            message.append(s + ",");
+         }
+         logger.info(message + ")");
+
+      }
+      if (dups.size() > 0) {
+         StringBuffer sb = new StringBuffer();
+         for (Map.Entry<String, Integer> stringIntegerEntry : dups.entrySet()) {
+            sb.append(stringIntegerEntry.getKey() + "(" + stringIntegerEntry.getValue() + "),");
+         }
+         Assert.fail("Duplicate messages received " + sb);
+      }
+
+      if (persistenceEnabled) {
+         Wait.assertTrue(() -> {

Review comment:
       Ohh.. I see. I didn't know it was running non persistently. thanks




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org