Posted to gitbox@activemq.apache.org by GitBox <gi...@apache.org> on 2022/06/20 20:36:07 UTC

[GitHub] [activemq] NikitaShupletsov opened a new pull request, #848: Feature/replica broker

NikitaShupletsov opened a new pull request, #848:
URL: https://github.com/apache/activemq/pull/848

   AMQ-8354
   
   Asynchronous replication plugin


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscribe@activemq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [activemq] NikitaShupletsov commented on pull request #848: Feature/replica broker

Posted by GitBox <gi...@apache.org>.
NikitaShupletsov commented on PR #848:
URL: https://github.com/apache/activemq/pull/848#issuecomment-1377611153

   >I haven't gone through much of this yet but the number 1 thing is anything here needs to be isolated and as Matt pointed out I have some concerns with internal changes. This needs to not impact anything in the broker if it's not running so any changes that would impact performance or could introduce other issues/bugs would be a problem. Essentially if it's disabled (which it will be by default) then the broker shouldn't behave any differently than today.
   
   I totally agree. We changed the internal classes only to add a couple of methods (and modified one that hasn't been used since pure master-slave was removed). Hence nothing should be impacted by the changes when the plugin is turned off.




[GitHub] [activemq] NikitaShupletsov commented on a diff in pull request #848: Feature/replica broker

Posted by GitBox <gi...@apache.org>.
NikitaShupletsov commented on code in PR #848:
URL: https://github.com/apache/activemq/pull/848#discussion_r1065082722


##########
activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaRole.java:
##########
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.replica;
+
+public enum ReplicaRole {
+    source, replica, dual

Review Comment:
   Yes, it's used here: https://github.com/apache/activemq/pull/848/files#diff-7aeef5b9b38e464d9a560c33dff459f2fe95f8cb5cc0ab3d45424e0a50092fc8R56
   
   Also, we are working on real roles now, so it will be used more in the near future.





[GitHub] [activemq] NikitaShupletsov commented on a diff in pull request #848: Feature/replica broker

Posted by GitBox <gi...@apache.org>.
NikitaShupletsov commented on code in PR #848:
URL: https://github.com/apache/activemq/pull/848#discussion_r1065068052


##########
activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java:
##########
@@ -2347,6 +2436,25 @@ public void processDispatchNotification(MessageDispatchNotification messageDispa
         Subscription sub = getMatchingSubscription(messageDispatchNotification);
         if (sub != null) {
             MessageReference message = getMatchingMessage(messageDispatchNotification);
+
+            pagedInMessagesLock.writeLock().lock();

Review Comment:
   This method isn't used right now. It was introduced for the pure master-slave feature, which was removed a long time ago. I noticed that the method wasn't working correctly, so I fixed it.





[GitHub] [activemq] NikitaShupletsov commented on pull request #848: Feature/replica broker

Posted by GitBox <gi...@apache.org>.
NikitaShupletsov commented on PR #848:
URL: https://github.com/apache/activemq/pull/848#issuecomment-1371544323

   Sorry for the late reply.
   We've come up with the idea of so-called "message compaction": we scan the replication queue and delete events that cancel each other out (like a send and its ack).
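
A minimal sketch of the compaction idea described above. This is not the plugin's `ReplicaCompactor`; the `CompactionSketch` class, its `Event` type, and the `compact` method are hypothetical names used only for illustration, and the sketch assumes each replication event carries the id of the original message it refers to.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical illustration only; not the code from the pull request.
final class CompactionSketch {

    enum EventType { MESSAGE_SEND, MESSAGE_ACK }

    // A replication event that refers to one original message id.
    static final class Event {
        final EventType type;
        final String messageId;

        Event(EventType type, String messageId) {
            this.type = type;
            this.messageId = messageId;
        }
    }

    // Returns the events left after dropping every send whose message is
    // also acknowledged in the same batch, together with that ack.
    static List<Event> compact(List<Event> events) {
        Set<String> sent = new HashSet<>();
        Set<String> acked = new HashSet<>();
        for (Event e : events) {
            if (e.type == EventType.MESSAGE_SEND) {
                sent.add(e.messageId);
            } else {
                acked.add(e.messageId);
            }
        }

        // Ids seen as both a send and an ack cancel each other out.
        Set<String> cancelled = new HashSet<>(sent);
        cancelled.retainAll(acked);

        List<Event> remaining = new ArrayList<>();
        for (Event e : events) {
            if (!cancelled.contains(e.messageId)) {
                remaining.add(e);
            }
        }
        return remaining;
    }
}
```

In this sketch an ack whose send is not in the scanned batch is kept, because the replica would still need to apply it.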




[GitHub] [activemq] lucastetreault commented on a diff in pull request #848: Feature/replica broker

Posted by GitBox <gi...@apache.org>.
lucastetreault commented on code in PR #848:
URL: https://github.com/apache/activemq/pull/848#discussion_r1065123856


##########
activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaAckHelper.java:
##########
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.replica;
+
+import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.broker.region.PrefetchSubscription;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageId;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class ReplicaAckHelper {
+
+    private final Broker broker;
+
+    public ReplicaAckHelper(Broker broker) {
+        this.broker = broker;
+    }
+
+    public List<MessageReference> getMessagesToAck(MessageAck ack, Destination destination) {
+        PrefetchSubscription prefetchSubscription = getPrefetchSubscription(destination, ack.getConsumerId());
+        if (prefetchSubscription == null) {
+            return null;

Review Comment:
   [Tired of Null Pointer Exceptions? Consider Using Java SE 8's "Optional"!](https://www.oracle.com/technical-resources/articles/java/java8-optional.html)





[GitHub] [activemq] NikitaShupletsov commented on pull request #848: Feature/replica broker

Posted by GitBox <gi...@apache.org>.
NikitaShupletsov commented on PR #848:
URL: https://github.com/apache/activemq/pull/848#issuecomment-1377786693

   Due to the concerns regarding the internal changes, I extracted them into a separate PR so that they are easier to review: https://github.com/apache/activemq/pull/953




[GitHub] [activemq] NikitaShupletsov commented on pull request #848: Feature/replica broker

Posted by GitBox <gi...@apache.org>.
NikitaShupletsov commented on PR #848:
URL: https://github.com/apache/activemq/pull/848#issuecomment-1376313000

   >A bit I'm missing: I'm struggling to see how the acks back from a mirror work reliably here when clients can be connected to either. I'm assuming you're trying to mimic Artemis here.
   
   The replica is not meant to be used at the same time as the primary. We are working on changes to block connections on the replica side. The feature is designed as a hot-warm pair, not a hot-hot one.
   
   >The other bit, assuming JMS 2 is still a target for the classic ActiveMQ 5.x stream, is how this fits into the design for supporting shared subs in that roadmap, e.g. ensuring this doesn't trip up that initiative/effort at all.
   
   The plugin will need to be updated to support new features.
   Right now I don't know how much work is needed. But when/if this change is merged, we plan to support it, so if it starts blocking feature releases, we will try to work through it.




[GitHub] [activemq] cshannon commented on pull request #848: Feature/replica broker

Posted by GitBox <gi...@apache.org>.
cshannon commented on PR #848:
URL: https://github.com/apache/activemq/pull/848#issuecomment-1377150392

   I haven't gone through much of this yet but the number 1 thing is anything here needs to be isolated and as Matt pointed out I have some concerns with internal changes. This needs to not impact anything in the broker if it's not running so any changes that would impact performance or could introduce other issues/bugs would be a problem. Essentially if it's disabled (which it will be by default) then the broker shouldn't behave any differently than today. 
   
   I also wonder whether or not this should be part of the core broker. At this point I'm not really convinced this should be included and may be better as a stand alone plugin that is maintained separately.




[GitHub] [activemq] michaelandrepearce commented on pull request #848: Feature/replica broker

Posted by GitBox <gi...@apache.org>.
michaelandrepearce commented on PR #848:
URL: https://github.com/apache/activemq/pull/848#issuecomment-1371647102

   This is a big feature. I would have expected some discussion on the mailing lists around it and its design. Maybe I missed it?




[GitHub] [activemq] NikitaShupletsov commented on a diff in pull request #848: Feature/replica broker

Posted by GitBox <gi...@apache.org>.
NikitaShupletsov commented on code in PR #848:
URL: https://github.com/apache/activemq/pull/848#discussion_r1080646856


##########
activemq-broker/src/test/java/org/apache/activemq/replica/ReplicaCompactorTest.java:
##########
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.replica;
+
+import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.broker.region.PrefetchSubscription;
+import org.apache.activemq.broker.region.Queue;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.store.MessageStore;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+import java.util.List;
+import java.util.Set;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class ReplicaCompactorTest {
+
+    private final ConnectionContext connectionContext = mock(ConnectionContext.class);
+    private final Broker broker = mock(Broker.class);
+    private final ReplicaReplicationQueueSupplier queueProvider = mock(ReplicaReplicationQueueSupplier.class);
+    private final MessageStore messageStore = mock(MessageStore.class);
+
+    private final ActiveMQQueue intermediateQueueDestination = new ActiveMQQueue(ReplicaSupport.INTERMEDIATE_REPLICATION_QUEUE_NAME);
+    private final Queue intermediateQueue = mock(Queue.class);
+
+    private ReplicaCompactor replicaCompactor;
+
+    @Before
+    public void setUp() throws Exception {
+        ConnectionContext adminConnectionContext = mock(ConnectionContext.class);
+        when(adminConnectionContext.copy()).thenReturn(connectionContext);
+        when(broker.getAdminConnectionContext()).thenReturn(adminConnectionContext);
+
+        when(queueProvider.getIntermediateQueue()).thenReturn(intermediateQueueDestination);
+        when(broker.getDestinations(intermediateQueueDestination)).thenReturn(Set.of(intermediateQueue));
+        when(intermediateQueue.getMessageStore()).thenReturn(messageStore);
+
+        ConsumerInfo consumerInfo = new ConsumerInfo();
+        PrefetchSubscription originalSubscription = mock(PrefetchSubscription.class);
+        when(originalSubscription.getConsumerInfo()).thenReturn(consumerInfo);
+
+        replicaCompactor = new ReplicaCompactor(broker, connectionContext, queueProvider, originalSubscription);
+    }
+
+    @Test
+    public void compactWhenSendAndAck() throws Exception {
+        MessageId messageId1 = new MessageId("1:0:0:1");
+        MessageId messageId2 = new MessageId("1:0:0:2");
+        MessageId messageId3 = new MessageId("1:0:0:3");
+
+        String messageIdToAck = "2:1";
+
+        ActiveMQMessage message1 = new ActiveMQMessage();
+        message1.setMessageId(messageId1);
+        message1.setBooleanProperty(ReplicaSupport.IS_ORIGINAL_MESSAGE_SENT_TO_QUEUE_PROPERTY, true);
+        message1.setStringProperty(ReplicaEventType.EVENT_TYPE_PROPERTY, ReplicaEventType.MESSAGE_SEND.toString());
+        message1.setStringProperty(ReplicaSupport.MESSAGE_ID_PROPERTY, messageIdToAck);

Review Comment:
   `ReplicaSupport.MESSAGE_ID_PROPERTY` is used for the selector https://github.com/apache/activemq/pull/848/files/ec191dae604c3e51230ac88da8fb383dcd27d168#diff-eb08a6397c7e082f3d1aab53880eeb062d837f235dc870f910819a5b2d63c30aR102





[GitHub] [activemq] NikitaShupletsov commented on pull request #848: Feature/replica broker

Posted by GitBox <gi...@apache.org>.
NikitaShupletsov commented on PR #848:
URL: https://github.com/apache/activemq/pull/848#issuecomment-1377949707

   >I also wonder whether or not this should be part of the core broker. At this point I'm not really convinced this should be included and may be better as a stand alone plugin that is maintained separately.
   
   This feature has broad value and we are seeing demand from lots of ActiveMQ customers.  Similar to Artemis where it is part of the core broker, we see similar needs for ActiveMQ Classic. Having it as a part of the core broker will bring more visibility and support for the plugin for the broader community.




[GitHub] [activemq] michaelandrepearce commented on pull request #848: Feature/replica broker

Posted by GitBox <gi...@apache.org>.
michaelandrepearce commented on PR #848:
URL: https://github.com/apache/activemq/pull/848#issuecomment-1371733447

   A bit I'm missing: I'm struggling to see how the acks back from a mirror work reliably here when clients can be connected to either. I'm assuming you're trying to mimic Artemis here.




[GitHub] [activemq] lucastetreault commented on pull request #848: Feature/replica broker

Posted by GitBox <gi...@apache.org>.
lucastetreault commented on PR #848:
URL: https://github.com/apache/activemq/pull/848#issuecomment-1371729518

   @michaelandrepearce this work was started quite some time ago by @ehossack-aws when he was with the Amazon MQ team. He published [this design](https://github.com/ehossack-aws/activemq-replication-design/tree/initial-design) and attempted to get feedback on the dev mailing list but I don't think there was much discussion. 
   
   I haven't seen Nikita's latest changes, so I was planning on doing a review and leaving some comments. Is there anything more you think I can do to help get the ball rolling? I know this is an important feature for the Amazon MQ team, so it would be great if we can figure out how to get the community behind it!




[GitHub] [activemq] NikitaShupletsov commented on a diff in pull request #848: Feature/replica broker

Posted by GitBox <gi...@apache.org>.
NikitaShupletsov commented on code in PR #848:
URL: https://github.com/apache/activemq/pull/848#discussion_r1068833603


##########
activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java:
##########
@@ -1296,9 +1297,36 @@ public QueueMessageReference getMessage(String id) {
         return null;
     }
 
+    public List<MessageId> getAllMessageIds() throws Exception {

Review Comment:
   addressed there: https://github.com/apache/activemq/pull/953





[GitHub] [activemq] NikitaShupletsov commented on a diff in pull request #848: Feature/replica broker

Posted by GitBox <gi...@apache.org>.
NikitaShupletsov commented on code in PR #848:
URL: https://github.com/apache/activemq/pull/848#discussion_r1065085416


##########
activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaSequenceStorage.java:
##########
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.replica;
+
+import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.ConsumerBrokerExchange;
+import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.broker.region.PrefetchSubscription;
+import org.apache.activemq.broker.region.Queue;
+import org.apache.activemq.broker.region.QueueMessageReference;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.command.ConnectionId;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.LocalTransactionId;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.ProducerId;
+import org.apache.activemq.command.SessionId;
+import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.util.IdGenerator;
+import org.apache.activemq.util.LongSequenceGenerator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static java.util.Objects.requireNonNull;
+
+public class ReplicaSequenceStorage {
+
+    private final Logger logger = LoggerFactory.getLogger(ReplicaSequenceStorage.class);
+
+    static final String SEQUENCE_NAME_PROPERTY = "SequenceName";
+    private final LongSequenceGenerator eventMessageIdGenerator = new LongSequenceGenerator();
+    private final ProducerId replicationProducerId = new ProducerId();
+    private final Broker broker;
+    private final ConnectionContext connectionContext;
+    private final ReplicaInternalMessageProducer replicaInternalMessageProducer;
+    private final String sequenceName;
+    private final ReplicaReplicationQueueSupplier queueProvider;
+
+    private Queue sequenceQueue;
+    private PrefetchSubscription subscription;
+
+    public ReplicaSequenceStorage(Broker broker, ConnectionContext connectionContext, ReplicaReplicationQueueSupplier queueProvider,
+                                  ReplicaInternalMessageProducer replicaInternalMessageProducer, String sequenceName) {
+        this.broker = requireNonNull(broker);
+        this.connectionContext = connectionContext;
+        this.replicaInternalMessageProducer = replicaInternalMessageProducer;
+        this.sequenceName = requireNonNull(sequenceName);
+        this.queueProvider = queueProvider;
+
+        replicationProducerId.setConnectionId(new IdGenerator().generateId());
+    }
+
+    public String initialize() throws Exception {
+        sequenceQueue = broker.getDestinations(queueProvider.getSequenceQueue()).stream().findFirst()
+            .map(DestinationExtractor::extractQueue).orElseThrow();
+
+        String selector = String.format("%s LIKE '%s'", SEQUENCE_NAME_PROPERTY, sequenceName);
+
+        ConnectionId connectionId = new ConnectionId(new IdGenerator("ReplicationPlugin.ReplicaSequenceStorage").generateId());
+        SessionId sessionId = new SessionId(connectionId, new LongSequenceGenerator().getNextSequenceId());
+        ConsumerId consumerId = new ConsumerId(sessionId, new LongSequenceGenerator().getNextSequenceId());
+        ConsumerInfo consumerInfo = new ConsumerInfo();
+        consumerInfo.setConsumerId(consumerId);
+        consumerInfo.setPrefetchSize(10);
+        consumerInfo.setDestination(queueProvider.getSequenceQueue());
+        consumerInfo.setSelector(selector);
+        subscription = (PrefetchSubscription) broker.addConsumer(connectionContext, consumerInfo);
+
+        List<ActiveMQTextMessage> allMessages = new ArrayList<>();
+        for (MessageId messageId : sequenceQueue.getAllMessageIds()) {
+            ActiveMQTextMessage message = getMessageByMessageId(messageId);
+            if (message.getStringProperty(SEQUENCE_NAME_PROPERTY).equals(sequenceName)) {

Review Comment:
   Yes, but it's a different piece of code. Here we are making sure that there is only one message for the sequence; if there is more than one, we need to delete all except the latest.
   It's just a safety mechanism.
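
A minimal sketch of the safety check described above: among the stored messages for one sequence name, keep the latest and collect the rest for deletion. The `SequenceCleanupSketch` class, the `StoredMessage` type, and its `order` field are hypothetical stand-ins, and the sketch assumes a higher `order` value means a newer message; the real code works on `ActiveMQTextMessage` instances read from the sequence queue.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; not the code from the pull request.
final class SequenceCleanupSketch {

    // Stand-in for a message stored in the sequence queue.
    static final class StoredMessage {
        final String sequenceName; // may be null if the property is missing
        final long order;          // assumption: higher value means newer

        StoredMessage(String sequenceName, long order) {
            this.sequenceName = sequenceName;
            this.order = order;
        }
    }

    // Returns the messages for sequenceName that should be deleted,
    // i.e. all matching messages except the newest one.
    static List<StoredMessage> findStale(List<StoredMessage> all, String sequenceName) {
        List<StoredMessage> matching = new ArrayList<>();
        StoredMessage latest = null;
        for (StoredMessage m : all) {
            // Calling equals on the known non-null name tolerates a null property.
            if (sequenceName.equals(m.sequenceName)) {
                matching.add(m);
                if (latest == null || m.order > latest.order) {
                    latest = m;
                }
            }
        }
        matching.remove(latest); // no-op when nothing matched
        return matching;
    }
}
```

Comparing as `sequenceName.equals(...)` rather than the other way round means a message without the property is skipped instead of causing a `NullPointerException`.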





[GitHub] [activemq] lucastetreault commented on a diff in pull request #848: Feature/replica broker

Posted by GitBox <gi...@apache.org>.
lucastetreault commented on code in PR #848:
URL: https://github.com/apache/activemq/pull/848#discussion_r1065124586


##########
activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java:
##########
@@ -2347,6 +2436,25 @@ public void processDispatchNotification(MessageDispatchNotification messageDispa
         Subscription sub = getMatchingSubscription(messageDispatchNotification);
         if (sub != null) {
             MessageReference message = getMatchingMessage(messageDispatchNotification);
+
+            pagedInMessagesLock.writeLock().lock();

Review Comment:
   Oh - maybe a separate PR to clean up the unused code instead of fixing it? 





[GitHub] [activemq] mattrpav commented on pull request #848: Feature/replica broker

Posted by GitBox <gi...@apache.org>.
mattrpav commented on PR #848:
URL: https://github.com/apache/activemq/pull/848#issuecomment-1371770835

   Are there testing numbers available to review? I recall a previous conversation (maybe on the mailing list?) about that, but I don't see anything in this thread or the JIRA.




[GitHub] [activemq] NikitaShupletsov commented on pull request #848: Feature/replica broker

Posted by GitBox <gi...@apache.org>.
NikitaShupletsov commented on PR #848:
URL: https://github.com/apache/activemq/pull/848#issuecomment-1371655511

   > This is a big feature. I would have expected some discussion on the mailing lists around it and its design. Maybe I missed it?
   
   I tried to initiate a discussion a couple of times, but it didn't get very far. The thread: https://lists.apache.org/thread/kzt173o56qdok1m0hghgo0564812nyzd




[GitHub] [activemq] NikitaShupletsov commented on a diff in pull request #848: Feature/replica broker

Posted by GitBox <gi...@apache.org>.
NikitaShupletsov commented on code in PR #848:
URL: https://github.com/apache/activemq/pull/848#discussion_r1065071927


##########
activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaAckHelper.java:
##########
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.replica;
+
+import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.broker.region.PrefetchSubscription;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageId;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class ReplicaAckHelper {
+
+    private final Broker broker;
+
+    public ReplicaAckHelper(Broker broker) {
+        this.broker = broker;
+    }
+
+    public List<MessageReference> getMessagesToAck(MessageAck ack, Destination destination) {
+        PrefetchSubscription prefetchSubscription = getPrefetchSubscription(destination, ack.getConsumerId());
+        if (prefetchSubscription == null) {
+            return null;

Review Comment:
   Okay. I am not sure `Optional<List<MessageReference>>` will look better, but I can do it
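
A minimal sketch of the trade-off discussed in this comment, not the PR's code: `AckLookupSketch`, the `String` message ids, and the map-backed lookup are hypothetical stand-ins for `ReplicaAckHelper`, `MessageReference`, and the `PrefetchSubscription` lookup. It only illustrates how an `Optional` return could separate "no subscription found" from "subscription found, nothing to ack".

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

public class AckLookupSketch {
    // Hypothetical stand-in for the per-consumer dispatched messages.
    private final Map<String, List<String>> dispatchedByConsumer;

    public AckLookupSketch(Map<String, List<String>> dispatchedByConsumer) {
        this.dispatchedByConsumer = dispatchedByConsumer;
    }

    // Empty Optional: no subscription for this consumer.
    // Present but empty list: subscription exists, nothing to ack.
    public Optional<List<String>> getMessagesToAck(String consumerId) {
        return Optional.ofNullable(dispatchedByConsumer.get(consumerId));
    }
}
```

A caller would then write something like `helper.getMessagesToAck(id).ifPresent(this::ack)` instead of a null check.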





[GitHub] [activemq] NikitaShupletsov commented on a diff in pull request #848: Feature/replica broker

Posted by GitBox <gi...@apache.org>.
NikitaShupletsov commented on code in PR #848:
URL: https://github.com/apache/activemq/pull/848#discussion_r1065166803


##########
activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java:
##########
@@ -2347,6 +2436,25 @@ public void processDispatchNotification(MessageDispatchNotification messageDispa
         Subscription sub = getMatchingSubscription(messageDispatchNotification);
         if (sub != null) {
             MessageReference message = getMatchingMessage(messageDispatchNotification);
+
+            pagedInMessagesLock.writeLock().lock();

Review Comment:
   We started using it in the plugin. By `it's method isn't used right now` I meant that it isn't used in the main branch.





[GitHub] [activemq] mattrpav commented on pull request #848: Feature/replica broker

Posted by GitBox <gi...@apache.org>.
mattrpav commented on PR #848:
URL: https://github.com/apache/activemq/pull/848#issuecomment-1373698509

   On first pass, I have concerns about an approach that needs this many internal changes to things like Queue.java and Topic.java. There may very well be a need to change internal APIs, but I think it would be best to separate those changes out as a broker API/SPI change set in order to support different scenarios.




[GitHub] [activemq] mattrpav commented on a diff in pull request #848: Feature/replica broker

Posted by GitBox <gi...@apache.org>.
mattrpav commented on code in PR #848:
URL: https://github.com/apache/activemq/pull/848#discussion_r1063465789


##########
activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java:
##########
@@ -1296,9 +1297,36 @@ public QueueMessageReference getMessage(String id) {
         return null;
     }
 
+    public List<MessageId> getAllMessageIds() throws Exception {

Review Comment:
   +1





[GitHub] [activemq] jbonofre commented on pull request #848: Feature/replica broker

Posted by GitBox <gi...@apache.org>.
jbonofre commented on PR #848:
URL: https://github.com/apache/activemq/pull/848#issuecomment-1373180990

   From a community standpoint, I don't see any issue with this PR:
   - we started to work on the design with Etienne a while ago, and we shared a proposal on the mailing list
   - a document with details has been shared on the mailing list too
   - as this is a new feature/addition, with limited impact on the existing behavior, I don't see any blocker to moving forward.
   
   I'm definitely a supporter of this (that's why I already worked with the guys on the design, and I will do a review pass): it makes sense to have this in addition to the Spring 6, Jakarta, JMS 2, bookkeeper, ... new features we are planning for ActiveMQ.
   
   I will do a new pass on this one then.




[GitHub] [activemq] trex-amazon commented on pull request #848: Feature/replica broker

Posted by GitBox <gi...@apache.org>.
trex-amazon commented on PR #848:
URL: https://github.com/apache/activemq/pull/848#issuecomment-1290041162

   What will be the behavior when the replication queue is full and the replica broker is unreachable? Have you considered this case?




[GitHub] [activemq] NikitaShupletsov commented on a diff in pull request #848: Feature/replica broker

Posted by GitBox <gi...@apache.org>.
NikitaShupletsov commented on code in PR #848:
URL: https://github.com/apache/activemq/pull/848#discussion_r1065079995


##########
activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaCompactor.java:
##########
@@ -0,0 +1,303 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.replica;
+
+import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.ConsumerBrokerExchange;
+import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.broker.region.PrefetchSubscription;
+import org.apache.activemq.broker.region.Queue;
+import org.apache.activemq.broker.region.QueueMessageReference;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ConnectionId;
+import org.apache.activemq.command.LocalTransactionId;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageDispatchNotification;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.util.LongSequenceGenerator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class ReplicaCompactor {
+    private static final Logger logger = LoggerFactory.getLogger(ReplicaCompactor.class);
+    private static final String CONSUMER_SELECTOR = String.format("%s LIKE '%s'", ReplicaEventType.EVENT_TYPE_PROPERTY, ReplicaEventType.MESSAGE_ACK);
+    public static final int MAXIMUM_MESSAGES = 1_000;
+
+    private final Broker broker;
+    private final ConnectionContext connectionContext;
+    private final ReplicaReplicationQueueSupplier queueProvider;
+    private final PrefetchSubscription subscription;
+
+    private final Queue intermediateQueue;
+
+    public ReplicaCompactor(Broker broker, ConnectionContext connectionContext, ReplicaReplicationQueueSupplier queueProvider, PrefetchSubscription subscription) {
+        this.broker = broker;
+        this.connectionContext = connectionContext;
+        this.queueProvider = queueProvider;
+        this.subscription = subscription;
+
+        intermediateQueue = broker.getDestinations(queueProvider.getIntermediateQueue()).stream().findFirst()
+                .map(DestinationExtractor::extractQueue).orElseThrow();
+    }
+
+    List<MessageReference> compactAndFilter(List<MessageReference> list, boolean withAdditionalMessages) throws Exception {
+        List<DeliveredMessageReference> toProcess = list.stream()
+                .map(DeliveredMessageReference::new)
+                .collect(Collectors.toList());
+
+        int prefetchSize = subscription.getPrefetchSize();
+        try {
+            if (withAdditionalMessages) {
+                subscription.setPrefetchSize(0);
+                toProcess.addAll(getAdditionalMessages());
+            }
+
+            List<DeliveredMessageReference> processed = compactAndFilter0(toProcess);
+
+            Set<MessageId> messageIds = list.stream().map(MessageReference::getMessageId).collect(Collectors.toSet());
+
+            return processed.stream()
+                    .map(dmr -> dmr.messageReference)
+                    .filter(mr -> messageIds.contains(mr.getMessageId()))
+                    .collect(Collectors.toList());
+        } finally {
+            subscription.setPrefetchSize(prefetchSize);
+        }
+    }
+
+    private List<DeliveredMessageReference> getAdditionalMessages() throws Exception {
+        List<DeliveredMessageReference> result = new ArrayList<>();
+        List<QueueMessageReference> additionalMessages = intermediateQueue.getMatchingMessages(connectionContext, CONSUMER_SELECTOR, MAXIMUM_MESSAGES);

Review Comment:
   It will do a full scan only if there are fewer than 1000 messages matching this selector. But yes, I agree it may be expensive. I will take a look at what we can do about it, but any ideas are highly appreciated.





[GitHub] [activemq] NikitaShupletsov commented on a diff in pull request #848: Feature/replica broker

Posted by GitBox <gi...@apache.org>.
NikitaShupletsov commented on code in PR #848:
URL: https://github.com/apache/activemq/pull/848#discussion_r1065073940


##########
activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaCompactor.java:
##########
@@ -0,0 +1,303 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.replica;
+
+import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.ConsumerBrokerExchange;
+import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.broker.region.PrefetchSubscription;
+import org.apache.activemq.broker.region.Queue;
+import org.apache.activemq.broker.region.QueueMessageReference;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ConnectionId;
+import org.apache.activemq.command.LocalTransactionId;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageDispatchNotification;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.util.LongSequenceGenerator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class ReplicaCompactor {
+    private static final Logger logger = LoggerFactory.getLogger(ReplicaCompactor.class);
+    private static final String CONSUMER_SELECTOR = String.format("%s LIKE '%s'", ReplicaEventType.EVENT_TYPE_PROPERTY, ReplicaEventType.MESSAGE_ACK);
+    public static final int MAXIMUM_MESSAGES = 1_000;
+
+    private final Broker broker;
+    private final ConnectionContext connectionContext;
+    private final ReplicaReplicationQueueSupplier queueProvider;
+    private final PrefetchSubscription subscription;
+
+    private final Queue intermediateQueue;
+
+    public ReplicaCompactor(Broker broker, ConnectionContext connectionContext, ReplicaReplicationQueueSupplier queueProvider, PrefetchSubscription subscription) {
+        this.broker = broker;
+        this.connectionContext = connectionContext;
+        this.queueProvider = queueProvider;
+        this.subscription = subscription;
+
+        intermediateQueue = broker.getDestinations(queueProvider.getIntermediateQueue()).stream().findFirst()
+                .map(DestinationExtractor::extractQueue).orElseThrow();
+    }
+
+    List<MessageReference> compactAndFilter(List<MessageReference> list, boolean withAdditionalMessages) throws Exception {
+        List<DeliveredMessageReference> toProcess = list.stream()
+                .map(DeliveredMessageReference::new)
+                .collect(Collectors.toList());
+
+        int prefetchSize = subscription.getPrefetchSize();
+        try {
+            if (withAdditionalMessages) {
+                subscription.setPrefetchSize(0);
+                toProcess.addAll(getAdditionalMessages());
+            }
+
+            List<DeliveredMessageReference> processed = compactAndFilter0(toProcess);
+
+            Set<MessageId> messageIds = list.stream().map(MessageReference::getMessageId).collect(Collectors.toSet());
+
+            return processed.stream()
+                    .map(dmr -> dmr.messageReference)
+                    .filter(mr -> messageIds.contains(mr.getMessageId()))
+                    .collect(Collectors.toList());
+        } finally {
+            subscription.setPrefetchSize(prefetchSize);
+        }
+    }
+
+    private List<DeliveredMessageReference> getAdditionalMessages() throws Exception {
+        List<DeliveredMessageReference> result = new ArrayList<>();
+        List<QueueMessageReference> additionalMessages = intermediateQueue.getMatchingMessages(connectionContext, CONSUMER_SELECTOR, MAXIMUM_MESSAGES);

Review Comment:
   We expect to have only 2 messages here: one for the source and one for the replica. So it shouldn't impact the performance.





[GitHub] [activemq] lucastetreault commented on a diff in pull request #848: Feature/replica broker

Posted by GitBox <gi...@apache.org>.
lucastetreault commented on code in PR #848:
URL: https://github.com/apache/activemq/pull/848#discussion_r1062161446


##########
activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java:
##########
@@ -1296,9 +1297,36 @@ public QueueMessageReference getMessage(String id) {
         return null;
     }
 
+    public List<MessageId> getAllMessageIds() throws Exception {

Review Comment:
   Seems like there is a risk of an out-of-memory error if you're going to load all the messages from a queue into memory?



##########
activemq-broker/src/main/java/org/apache/activemq/broker/Broker.java:
##########
@@ -412,5 +412,5 @@ public interface Broker extends Region, Service {
 
     void networkBridgeStopped(BrokerInfo brokerInfo);
 
-
+    void queuePurged(ConnectionContext context, ActiveMQDestination destination);

Review Comment:
   The verb makes me think this should be an event listener as opposed to a method we call everywhere. Will check how this gets used later.
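
To make the "event listener" alternative concrete, here is a rough sketch. Everything in it is hypothetical (`QueuePurgeEventsSketch`, `QueuePurgeListener`, and the `String` destination name are illustrative only); the PR itself adds `queuePurged(ConnectionContext, ActiveMQDestination)` to the `Broker` interface.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class QueuePurgeEventsSketch {
    /** Hypothetical callback type; not part of ActiveMQ. */
    public interface QueuePurgeListener {
        void onQueuePurged(String destinationName);
    }

    // CopyOnWriteArrayList lets listeners register while events are being fired.
    private final List<QueuePurgeListener> listeners = new CopyOnWriteArrayList<>();

    public void addListener(QueuePurgeListener listener) {
        listeners.add(listener);
    }

    // Called from the single place where a purge happens;
    // interested components subscribe instead of being called explicitly.
    public void firePurged(String destinationName) {
        for (QueuePurgeListener listener : listeners) {
            listener.onQueuePurged(destinationName);
        }
    }
}
```

With this shape the purge path only calls `firePurged(...)`, and a replication plugin would register a listener when it is enabled.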



##########
activemq-broker/src/main/java/org/apache/activemq/broker/ConnectionContext.java:
##########
@@ -245,7 +245,7 @@ public String getUserName() {
         return userName;
     }
 
-    protected void setUserName(String userName) {
+    public void setUserName(String userName) {

Review Comment:
   Interested to see why this needs to be public. Will try to remember to come back. 



##########
activemq-broker/src/main/java/org/apache/activemq/replica/PeriodAcknowledge.java:
##########
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.replica;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQSession;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class PeriodAcknowledge {

Review Comment:
   Rename to PeriodicAcknowledge? 
   



##########
activemq-broker/src/main/java/org/apache/activemq/replica/PeriodAcknowledge.java:
##########
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.replica;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQSession;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class PeriodAcknowledge {
+
+    private static final int MAX_ACK_BATCH_SIZE = 100;

Review Comment:
   Should this be configurable? 
   



##########
activemq-broker/src/main/java/org/apache/activemq/replica/PeriodAcknowledge.java:
##########
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.replica;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQSession;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class PeriodAcknowledge {
+
+    private static final int MAX_ACK_BATCH_SIZE = 100;
+    private boolean safeToAck = true;
+    private final AtomicLong lastAckTime = new AtomicLong();
+    private final AtomicInteger pendingAckCount = new AtomicInteger();
+    private final AtomicReference<ActiveMQConnection> connection = new AtomicReference<>();
+    private final AtomicReference<ActiveMQSession> connectionSession = new AtomicReference<>();
+    private final long replicaAckPeriod;
+    private final Object periodicCommitLock = new Object();
+
+
+    public PeriodAcknowledge(long replicaAckPeriod) {
+        this.replicaAckPeriod = replicaAckPeriod;
+    }
+
+    public void setConnection(ActiveMQConnection activeMQConnection) {
+        connection.set(activeMQConnection);
+    }
+
+    public void setConnectionSession(ActiveMQSession activeMQSession) {
+        connectionSession.set(activeMQSession);
+    }
+
+    public void setSafeToAck(boolean safeToAck) {
+        this.safeToAck = safeToAck;
+    }
+
+    private boolean shouldPeriodicallyCommit() {

Review Comment:
   Class is already called PeriodAcknowledge and I think it's actually about acknowledging so maybe rename to `shouldAcknowledge`? 



##########
activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaBatcher.java:
##########
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.replica;
+
+import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.command.ActiveMQMessage;
+
+import javax.jms.JMSException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class ReplicaBatcher {
+
+    static final int MAX_BATCH_LENGTH = 500;
+    static final int MAX_BATCH_SIZE = 5_000_000; // 5 Mb
+
+    @SuppressWarnings("unchecked")
+    static List<List<MessageReference>> batches(List<MessageReference> list) throws Exception {
+        List<List<MessageReference>> result = new ArrayList<>();
+
+        Map<String, Set<String>> destination2eventType = new HashMap<>();
+        List<MessageReference> batch = new ArrayList<>();
+        int batchSize = 0;
+        for (MessageReference reference : list) {
+            ActiveMQMessage message = (ActiveMQMessage) reference.getMessage();
+            String originalDestination = message.getStringProperty(ReplicaSupport.ORIGINAL_MESSAGE_DESTINATION_PROPERTY);
+            ReplicaEventType currentEventType =
+                    ReplicaEventType.valueOf(message.getStringProperty(ReplicaEventType.EVENT_TYPE_PROPERTY));
+
+            boolean eventTypeSwitch = false;
+            if (originalDestination != null) {
+                Set<String> sends = destination2eventType.computeIfAbsent(originalDestination, k -> new HashSet<>());
+                if (currentEventType == ReplicaEventType.MESSAGE_SEND) {
+                    sends.add(message.getStringProperty(ReplicaSupport.MESSAGE_ID_PROPERTY));
+                }
+                if (currentEventType == ReplicaEventType.MESSAGE_ACK) {
+                    List<String> stringProperty = (List<String>) message.getProperty(ReplicaSupport.MESSAGE_IDS_PROPERTY);
+                    if (sends.stream().anyMatch(stringProperty::contains)) {

Review Comment:
   Could be expensive for many sends and many acks in the stringProperty. Is it worth converting stringProperty to a set? 
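
A small sketch of the suggested change, under the assumption that both sides are plain message-id strings. `AckOverlapSketch` and `anyAcked` are hypothetical names; the point is only that `List.contains` is linear per lookup, while a `HashSet` built once gives expected constant-time lookups.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class AckOverlapSketch {
    // Returns true if any sent message id also appears among the acked ids.
    static boolean anyAcked(Set<String> sends, List<String> ackedIds) {
        // Build the set once: O(m) instead of O(m) per contains() call on the list.
        Set<String> acked = new HashSet<>(ackedIds);
        return sends.stream().anyMatch(acked::contains);
    }
}
```

For n sends and m acked ids this is roughly O(n + m) instead of O(n * m); for very small lists the difference is unlikely to matter.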



##########
activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaBatcher.java:
##########
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.replica;
+
+import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.command.ActiveMQMessage;
+
+import javax.jms.JMSException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class ReplicaBatcher {
+
+    static final int MAX_BATCH_LENGTH = 500;

Review Comment:
   Should these be configurable? 



##########
activemq-broker/src/main/java/org/apache/activemq/replica/PeriodAcknowledge.java:
##########
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.replica;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQSession;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class PeriodAcknowledge {
+
+    private static final int MAX_ACK_BATCH_SIZE = 100;
+    private boolean safeToAck = true;
+    private final AtomicLong lastAckTime = new AtomicLong();
+    private final AtomicInteger pendingAckCount = new AtomicInteger();
+    private final AtomicReference<ActiveMQConnection> connection = new AtomicReference<>();
+    private final AtomicReference<ActiveMQSession> connectionSession = new AtomicReference<>();
+    private final long replicaAckPeriod;
+    private final Object periodicCommitLock = new Object();
+
+
+    public PeriodAcknowledge(long replicaAckPeriod) {
+        this.replicaAckPeriod = replicaAckPeriod;
+    }
+
+    public void setConnection(ActiveMQConnection activeMQConnection) {
+        connection.set(activeMQConnection);
+    }
+
+    public void setConnectionSession(ActiveMQSession activeMQSession) {
+        connectionSession.set(activeMQSession);
+    }
+
+    public void setSafeToAck(boolean safeToAck) {
+        this.safeToAck = safeToAck;
+    }
+
+    private boolean shouldPeriodicallyCommit() {
+        return System.currentTimeMillis() - lastAckTime.get() >= replicaAckPeriod;
+    }
+
+    private boolean reachedMaxAckBatchSize() {
+        return pendingAckCount.incrementAndGet() >= MAX_ACK_BATCH_SIZE;

Review Comment:
   Is the increment an intended side-effect of this method? 
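
If the side effect is not intended, one possible shape is to split counting from checking, as in the sketch below. `AckCounterSketch`, `recordPendingAck`, and `resetPendingAcks` are hypothetical names, not the PR's code; only `MAX_ACK_BATCH_SIZE` and `pendingAckCount` mirror the quoted diff.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class AckCounterSketch {
    private static final int MAX_ACK_BATCH_SIZE = 100;
    private final AtomicInteger pendingAckCount = new AtomicInteger();

    // Explicit mutation: call once per message that is waiting to be acked.
    void recordPendingAck() {
        pendingAckCount.incrementAndGet();
    }

    // Pure query: safe to call any number of times without changing state.
    boolean reachedMaxAckBatchSize() {
        return pendingAckCount.get() >= MAX_ACK_BATCH_SIZE;
    }

    // Call after the batch has been acknowledged.
    void resetPendingAcks() {
        pendingAckCount.set(0);
    }
}
```

Keeping the predicate free of side effects means it can be logged or evaluated twice without skewing the batch size.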
   



##########
activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java:
##########
@@ -2347,6 +2436,25 @@ public void processDispatchNotification(MessageDispatchNotification messageDispa
         Subscription sub = getMatchingSubscription(messageDispatchNotification);
         if (sub != null) {
             MessageReference message = getMatchingMessage(messageDispatchNotification);
+
+            pagedInMessagesLock.writeLock().lock();

Review Comment:
   Is this necessary if the replication plugin isn't enabled? 
   



##########
activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaBroker.java:
##########
@@ -0,0 +1,229 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.replica;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQMessageConsumer;
+import org.apache.activemq.ActiveMQPrefetchPolicy;
+import org.apache.activemq.ActiveMQSession;
+import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.BrokerFilter;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.MessageDispatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.JMSException;
+import java.lang.reflect.Method;
+import java.text.MessageFormat;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static java.util.Objects.requireNonNull;
+
+public class ReplicaBroker extends BrokerFilter {
+
+    private final static long REPLICA_ACK_PERIOD = 5_000;

Review Comment:
   Could this be made configurable?
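
A minimal sketch of what that could look like, assuming the period is in milliseconds (the hunk does not state the unit) and that a plain bean-style property is acceptable. The class name and the getter/setter are hypothetical and not part of the PR.

```java
// Hypothetical holder for the setting; not a class from the PR.
final class ReplicaAckPeriodSketch {
    // The value currently hard-coded in ReplicaBroker, kept as the default.
    static final long DEFAULT_REPLICA_ACK_PERIOD = 5_000;

    // Assumed to be milliseconds.
    private long replicaAckPeriod = DEFAULT_REPLICA_ACK_PERIOD;

    long getReplicaAckPeriod() {
        return replicaAckPeriod;
    }

    void setReplicaAckPeriod(long replicaAckPeriod) {
        // Reject values that would make a periodic task meaningless.
        if (replicaAckPeriod <= 0) {
            throw new IllegalArgumentException("replicaAckPeriod must be positive");
        }
        this.replicaAckPeriod = replicaAckPeriod;
    }
}
```

Keeping the constant as the default means existing behaviour is unchanged unless someone sets the property.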
   



##########
activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaCompactor.java:
##########
@@ -0,0 +1,303 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.replica;
+
+import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.ConsumerBrokerExchange;
+import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.broker.region.PrefetchSubscription;
+import org.apache.activemq.broker.region.Queue;
+import org.apache.activemq.broker.region.QueueMessageReference;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ConnectionId;
+import org.apache.activemq.command.LocalTransactionId;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageDispatchNotification;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.util.LongSequenceGenerator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class ReplicaCompactor {
+    private static final Logger logger = LoggerFactory.getLogger(ReplicaCompactor.class);
+    private static final String CONSUMER_SELECTOR = String.format("%s LIKE '%s'", ReplicaEventType.EVENT_TYPE_PROPERTY, ReplicaEventType.MESSAGE_ACK);
+    public static final int MAXIMUM_MESSAGES = 1_000;
+
+    private final Broker broker;
+    private final ConnectionContext connectionContext;
+    private final ReplicaReplicationQueueSupplier queueProvider;
+    private final PrefetchSubscription subscription;
+
+    private final Queue intermediateQueue;
+
+    public ReplicaCompactor(Broker broker, ConnectionContext connectionContext, ReplicaReplicationQueueSupplier queueProvider, PrefetchSubscription subscription) {
+        this.broker = broker;
+        this.connectionContext = connectionContext;
+        this.queueProvider = queueProvider;
+        this.subscription = subscription;
+
+        intermediateQueue = broker.getDestinations(queueProvider.getIntermediateQueue()).stream().findFirst()
+                .map(DestinationExtractor::extractQueue).orElseThrow();
+    }
+
+    List<MessageReference> compactAndFilter(List<MessageReference> list, boolean withAdditionalMessages) throws Exception {
+        List<DeliveredMessageReference> toProcess = list.stream()
+                .map(DeliveredMessageReference::new)
+                .collect(Collectors.toList());
+
+        int prefetchSize = subscription.getPrefetchSize();
+        try {
+            if (withAdditionalMessages) {
+                subscription.setPrefetchSize(0);
+                toProcess.addAll(getAdditionalMessages());
+            }
+
+            List<DeliveredMessageReference> processed = compactAndFilter0(toProcess);
+
+            Set<MessageId> messageIds = list.stream().map(MessageReference::getMessageId).collect(Collectors.toSet());
+
+            return processed.stream()
+                    .map(dmr -> dmr.messageReference)
+                    .filter(mr -> messageIds.contains(mr.getMessageId()))
+                    .collect(Collectors.toList());
+        } finally {
+            subscription.setPrefetchSize(prefetchSize);
+        }
+    }
+
+    private List<DeliveredMessageReference> getAdditionalMessages() throws Exception {
+        List<DeliveredMessageReference> result = new ArrayList<>();
+        List<QueueMessageReference> additionalMessages = intermediateQueue.getMatchingMessages(connectionContext, CONSUMER_SELECTOR, MAXIMUM_MESSAGES);
+        if (additionalMessages.isEmpty()) {
+            return result;
+        }
+
+        String selector = String.format("%s IN %s", ReplicaSupport.MESSAGE_ID_PROPERTY, getAckedMessageIds(additionalMessages));
+        additionalMessages.addAll(intermediateQueue.getMatchingMessages(connectionContext, selector, MAXIMUM_MESSAGES));
+
+        Set<MessageId> dispatchedMessageIds = subscription.getDispatched().stream()
+                .map(MessageReference::getMessageId)
+                .collect(Collectors.toSet());
+
+        for (MessageReference messageReference : additionalMessages) {
+            if (!dispatchedMessageIds.contains(messageReference.getMessageId())) {
+                result.add(new DeliveredMessageReference(messageReference, false));
+            }
+        }
+
+        return result;
+    }
+
+    private List<DeliveredMessageReference> compactAndFilter0(List<DeliveredMessageReference> list) throws Exception {

Review Comment:
   Just call it `compactAndFilter` since the signature is different? 
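
To illustrate the suggestion with a self-contained sketch (the `Wrapped` type and the filtering logic are simplified placeholders, not the PR's classes): the two methods can share a name because they differ in parameter count. Note that the element type of the `List` alone would not be enough to tell them apart, since both parameters erase to `List`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for ReplicaCompactor.
final class OverloadSketch {
    // Placeholder for DeliveredMessageReference.
    static final class Wrapped {
        final String value;
        final boolean delivered;

        Wrapped(String value, boolean delivered) {
            this.value = value;
            this.delivered = delivered;
        }
    }

    // Entry point: two parameters.
    static List<String> compactAndFilter(List<String> list, boolean withAdditionalMessages) {
        List<Wrapped> toProcess = new ArrayList<>();
        for (String s : list) {
            toProcess.add(new Wrapped(s, true));
        }
        if (withAdditionalMessages) {
            toProcess.add(new Wrapped("extra", false));
        }
        List<String> out = new ArrayList<>();
        // Resolves to the one-argument overload below.
        for (Wrapped w : compactAndFilter(toProcess)) {
            out.add(w.value);
        }
        return out;
    }

    // Same name, one parameter: a legal overload, no "0" suffix needed.
    private static List<Wrapped> compactAndFilter(List<Wrapped> list) {
        List<Wrapped> result = new ArrayList<>();
        for (Wrapped w : list) {
            if (w.delivered) {
                result.add(w);
            }
        }
        return result;
    }
}
```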



##########
activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaBrokerEventListener.java:
##########
@@ -0,0 +1,482 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.replica;
+
+import org.apache.activemq.ScheduledMessage;
+import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.ConsumerBrokerExchange;
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.DurableTopicSubscription;
+import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.broker.region.MessageReferenceFilter;
+import org.apache.activemq.broker.region.Queue;
+import org.apache.activemq.broker.region.Subscription;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ConnectionId;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.LocalTransactionId;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageDispatchNotification;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.TransactionId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import java.io.IOException;
+import java.math.BigInteger;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static java.util.Objects.requireNonNull;
+
+public class ReplicaBrokerEventListener implements MessageListener {
+
+    private static final String REPLICATION_CONSUMER_CLIENT_ID = "DUMMY_REPLICATION_CONSUMER";
+    private static final String SEQUENCE_NAME = "replicaSeq";
+    private final Logger logger = LoggerFactory.getLogger(ReplicaBrokerEventListener.class);
+    private final ReplicaEventSerializer eventSerializer = new ReplicaEventSerializer();
+    private final Broker broker;
+    private final ConnectionContext connectionContext;
+    private final ReplicaInternalMessageProducer replicaInternalMessageProducer;
+
+    private final PeriodAcknowledge acknowledgeCallback;
+    final ReplicaSequenceStorage sequenceStorage;
+    BigInteger sequence;
+    MessageId sequenceMessageId;
+
+    ReplicaBrokerEventListener(Broker broker, ReplicaReplicationQueueSupplier queueProvider, PeriodAcknowledge acknowledgeCallback) {
+        this.broker = requireNonNull(broker);
+        this.acknowledgeCallback = requireNonNull(acknowledgeCallback);
+        connectionContext = broker.getAdminConnectionContext().copy();
+        connectionContext.setUserName(ReplicaSupport.REPLICATION_PLUGIN_USER_NAME);
+        connectionContext.setClientId(REPLICATION_CONSUMER_CLIENT_ID);
+        connectionContext.setConnection(new DummyConnection());
+        replicaInternalMessageProducer = new ReplicaInternalMessageProducer(broker, connectionContext);
+
+        createTransactionMapIfNotExist();
+
+        this.sequenceStorage = new ReplicaSequenceStorage(broker, connectionContext,
+                queueProvider, replicaInternalMessageProducer, SEQUENCE_NAME);
+    }
+
+    public void initialize() throws Exception {
+        String savedSequence = sequenceStorage.initialize();
+        sequence = savedSequence == null ? null : new BigInteger(savedSequence);
+    }
+
+    @Override
+    public void onMessage(Message jmsMessage) {
+        logger.trace("Received replication message from replica source");
+        ActiveMQMessage message = (ActiveMQMessage) jmsMessage;
+
+        processMessageWithRetries(message, null);
+    }
+
+    private synchronized void processMessageWithRetries(ActiveMQMessage message, TransactionId transactionId) {
+        new ReplicaEventRetrier(() -> {
+            boolean commit = false;
+            TransactionId tid = transactionId;
+            if (tid == null) {
+                tid = new LocalTransactionId(
+                        new ConnectionId(ReplicaSupport.REPLICATION_PLUGIN_CONNECTION_ID),
+                        ReplicaSupport.LOCAL_TRANSACTION_ID_GENERATOR.getNextSequenceId());
+
+                broker.beginTransaction(connectionContext, tid);
+
+                commit = true;
+            }
+
+            try {
+                ReplicaEventType eventType = getEventType(message);
+                if (eventType == ReplicaEventType.BATCH) {
+                    processBatch(message, tid);
+                } else {
+                    processMessage(message, eventType, tid);
+                }
+
+                if (commit) {
+                    sequenceStorage.enqueue(tid, sequence.toString());
+
+                    broker.commitTransaction(connectionContext, tid, true);
+                    acknowledgeCallback.setSafeToAck(true);
+                }
+            } catch (Exception e) {
+                if (commit) {
+                    broker.rollbackTransaction(connectionContext, tid);
+                }
+                acknowledgeCallback.setSafeToAck(false);
+                throw e;
+            }
+            return null;
+        }).process();
+    }
+
+    private void processMessage(ActiveMQMessage message, ReplicaEventType eventType, TransactionId transactionId) throws Exception {
+        Object deserializedData = eventSerializer.deserializeMessageData(message.getContent());
+        BigInteger newSequence = new BigInteger(message.getStringProperty(ReplicaSupport.SEQUENCE_PROPERTY));
+
+        long sequenceDifference = sequence == null ? 0 : newSequence.subtract(sequence).longValue();
+        MessageId messageId = message.getMessageId();
+        if (sequence == null || sequenceDifference == 1) {
+            processMessage(message, eventType, deserializedData, transactionId);
+
+            sequence = newSequence;
+            sequenceMessageId = messageId;
+
+        } else if (sequenceDifference > 0) {
+            throw new IllegalStateException(String.format(
+                    "Replication event is out of order. Current sequence: %s, the sequence of the event: %s",
+                    sequence, newSequence));
+        } else if (sequenceDifference < 0) {
+            logger.info("Replication message duplicate.");
+        } else if (!sequenceMessageId.equals(messageId)) {
+            throw new IllegalStateException(String.format(
+                    "Replication event is out of order. Current sequence %s belongs to message with id %s," +
+                            "but the id of the event is %s", sequence, sequenceMessageId, messageId));
+        }
+    }
+
+    private void processMessage(ActiveMQMessage message, ReplicaEventType eventType, Object deserializedData,
+            TransactionId transactionId) throws Exception {
+        switch (eventType) {
+            case DESTINATION_UPSERT:
+                logger.trace("Processing replicated destination");
+                upsertDestination((ActiveMQDestination) deserializedData);
+                return;
+            case DESTINATION_DELETE:
+                logger.trace("Processing replicated destination deletion");
+                deleteDestination((ActiveMQDestination) deserializedData);
+                return;
+            case MESSAGE_SEND:
+                logger.trace("Processing replicated message send");
+                persistMessage((ActiveMQMessage) deserializedData, transactionId);
+                return;
+            case MESSAGE_ACK:
+                logger.trace("Processing replicated messages dropped");
+                try {
+                    messageAck((MessageAck) deserializedData,
+                            (List<String>) message.getObjectProperty(ReplicaSupport.MESSAGE_IDS_PROPERTY), transactionId);
+                } catch (JMSException e) {
+                    logger.error("Failed to extract property to replicate messages dropped [{}]", deserializedData, e);
+                    throw new Exception(e);
+                }
+                return;
+            case QUEUE_PURGED:
+                logger.trace("Processing queue purge");
+                purgeQueue((ActiveMQDestination) deserializedData);
+                return;
+            case TRANSACTION_BEGIN:
+                logger.trace("Processing replicated transaction begin");
+                beginTransaction((TransactionId) deserializedData);
+                return;
+            case TRANSACTION_PREPARE:
+                logger.trace("Processing replicated transaction prepare");
+                prepareTransaction((TransactionId) deserializedData);
+                return;
+            case TRANSACTION_FORGET:
+                logger.trace("Processing replicated transaction forget");
+                forgetTransaction((TransactionId) deserializedData);
+                return;
+            case TRANSACTION_ROLLBACK:
+                logger.trace("Processing replicated transaction rollback");
+                rollbackTransaction((TransactionId) deserializedData);
+                return;
+            case TRANSACTION_COMMIT:
+                logger.trace("Processing replicated transaction commit");
+                try {
+                    commitTransaction(
+                            (TransactionId) deserializedData,
+                            message.getBooleanProperty(ReplicaSupport.TRANSACTION_ONE_PHASE_PROPERTY));
+                } catch (JMSException e) {
+                    logger.error("Failed to extract property to replicate transaction commit with id [{}]", deserializedData, e);
+                    throw new Exception(e);
+                }
+                return;
+            case ADD_DURABLE_CONSUMER:
+                logger.trace("Processing replicated add consumer");
+                try {
+                    addDurableConsumer((ConsumerInfo) deserializedData,
+                            message.getStringProperty(ReplicaSupport.CLIENT_ID_PROPERTY));
+                } catch (JMSException e) {
+                    logger.error("Failed to extract property to replicate add consumer [{}]", deserializedData, e);
+                    throw new Exception(e);
+                }
+                return;
+            case REMOVE_DURABLE_CONSUMER:
+                logger.trace("Processing replicated remove consumer");
+                removeDurableConsumer((ConsumerInfo) deserializedData);
+                return;
+            default:
+                throw new IllegalStateException(
+                        String.format("Unhandled event type \"%s\" for replication message id: %s",
+                                eventType, message.getJMSMessageID()));
+        }
+    }
+
+    private void processBatch(ActiveMQMessage message, TransactionId tid) throws Exception {
+        List<Object> objects = eventSerializer.deserializeListOfObjects(message.getContent().getData());
+        for (Object o : objects) {
+            processMessageWithRetries((ActiveMQMessage) o, tid);
+        }
+    }
+
+    private void upsertDestination(ActiveMQDestination destination) throws Exception {
+        try {
+            boolean isExistingDestination = Arrays.stream(broker.getDestinations())
+                    .anyMatch(d -> d.getQualifiedName().equals(destination.getQualifiedName()));
+            if (isExistingDestination) {
+                logger.debug("Destination [{}] already exists, no action to take", destination);
+                return;
+            }
+        } catch (Exception e) {
+            logger.error("Unable to determine if [{}] is an existing destination", destination, e);
+            throw e;
+        }
+        try {
+            broker.addDestination(connectionContext, destination, true);
+        } catch (Exception e) {
+            logger.error("Unable to add destination [{}]", destination, e);
+            throw e;
+        }
+    }
+
+    private void deleteDestination(ActiveMQDestination destination) throws Exception {
+        try {
+            boolean isNonExtantDestination = Arrays.stream(broker.getDestinations())

Review Comment:
   nit: typo
   



##########
activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaCompactor.java:
##########
@@ -0,0 +1,303 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.replica;
+
+import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.ConsumerBrokerExchange;
+import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.broker.region.PrefetchSubscription;
+import org.apache.activemq.broker.region.Queue;
+import org.apache.activemq.broker.region.QueueMessageReference;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ConnectionId;
+import org.apache.activemq.command.LocalTransactionId;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageDispatchNotification;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.util.LongSequenceGenerator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class ReplicaCompactor {
+    private static final Logger logger = LoggerFactory.getLogger(ReplicaCompactor.class);
+    private static final String CONSUMER_SELECTOR = String.format("%s LIKE '%s'", ReplicaEventType.EVENT_TYPE_PROPERTY, ReplicaEventType.MESSAGE_ACK);
+    public static final int MAXIMUM_MESSAGES = 1_000;
+
+    private final Broker broker;
+    private final ConnectionContext connectionContext;
+    private final ReplicaReplicationQueueSupplier queueProvider;
+    private final PrefetchSubscription subscription;
+
+    private final Queue intermediateQueue;
+
+    public ReplicaCompactor(Broker broker, ConnectionContext connectionContext, ReplicaReplicationQueueSupplier queueProvider, PrefetchSubscription subscription) {
+        this.broker = broker;
+        this.connectionContext = connectionContext;
+        this.queueProvider = queueProvider;
+        this.subscription = subscription;
+
+        intermediateQueue = broker.getDestinations(queueProvider.getIntermediateQueue()).stream().findFirst()
+                .map(DestinationExtractor::extractQueue).orElseThrow();
+    }
+
+    List<MessageReference> compactAndFilter(List<MessageReference> list, boolean withAdditionalMessages) throws Exception {
+        List<DeliveredMessageReference> toProcess = list.stream()
+                .map(DeliveredMessageReference::new)
+                .collect(Collectors.toList());
+
+        int prefetchSize = subscription.getPrefetchSize();
+        try {
+            if (withAdditionalMessages) {
+                subscription.setPrefetchSize(0);
+                toProcess.addAll(getAdditionalMessages());
+            }
+
+            List<DeliveredMessageReference> processed = compactAndFilter0(toProcess);
+
+            Set<MessageId> messageIds = list.stream().map(MessageReference::getMessageId).collect(Collectors.toSet());
+
+            return processed.stream()
+                    .map(dmr -> dmr.messageReference)
+                    .filter(mr -> messageIds.contains(mr.getMessageId()))
+                    .collect(Collectors.toList());
+        } finally {
+            subscription.setPrefetchSize(prefetchSize);
+        }
+    }
+
+    private List<DeliveredMessageReference> getAdditionalMessages() throws Exception {
+        List<DeliveredMessageReference> result = new ArrayList<>();
+        List<QueueMessageReference> additionalMessages = intermediateQueue.getMatchingMessages(connectionContext, CONSUMER_SELECTOR, MAXIMUM_MESSAGES);
+        if (additionalMessages.isEmpty()) {
+            return result;
+        }
+
+        String selector = String.format("%s IN %s", ReplicaSupport.MESSAGE_ID_PROPERTY, getAckedMessageIds(additionalMessages));
+        additionalMessages.addAll(intermediateQueue.getMatchingMessages(connectionContext, selector, MAXIMUM_MESSAGES));
+
+        Set<MessageId> dispatchedMessageIds = subscription.getDispatched().stream()
+                .map(MessageReference::getMessageId)
+                .collect(Collectors.toSet());
+
+        for (MessageReference messageReference : additionalMessages) {
+            if (!dispatchedMessageIds.contains(messageReference.getMessageId())) {
+                result.add(new DeliveredMessageReference(messageReference, false));
+            }
+        }
+
+        return result;
+    }
+
+    private List<DeliveredMessageReference> compactAndFilter0(List<DeliveredMessageReference> list) throws Exception {
+        List<DeliveredMessageReference> result = new ArrayList<>(list);
+
+        List<Destination> destinations = combineByDestination(list);
+
+        List<DeliveredMessageId> toDelete = compact(destinations);
+
+        if (toDelete.isEmpty()) {
+            return result;
+        }
+
+        acknowledge(toDelete);
+
+        List<MessageId> messageIds = toDelete.stream().map(dmid -> dmid.messageId).collect(Collectors.toList());
+        result.removeIf(reference -> messageIds.contains(reference.messageReference.getMessageId()));
+
+        return result;
+    }
+
+    private void acknowledge(List<DeliveredMessageId> list) throws Exception {
+        TransactionId transactionId = new LocalTransactionId(
+                new ConnectionId(ReplicaSupport.REPLICATION_PLUGIN_CONNECTION_ID),
+                ReplicaSupport.LOCAL_TRANSACTION_ID_GENERATOR.getNextSequenceId());
+
+        synchronized (ReplicaSupport.INTERMEDIATE_QUEUE_MUTEX) {
+            broker.beginTransaction(connectionContext, transactionId);
+
+            ConsumerBrokerExchange consumerExchange = new ConsumerBrokerExchange();
+            consumerExchange.setConnectionContext(connectionContext);
+
+            for (DeliveredMessageId deliveredMessageId : list) {
+                if (!deliveredMessageId.delivered) {
+                    messageDispatch(deliveredMessageId.messageId);
+                }
+
+                MessageAck messageAck = new MessageAck();
+                messageAck.setMessageID(deliveredMessageId.messageId);
+                messageAck.setMessageCount(1);
+                messageAck.setAckType(MessageAck.INDIVIDUAL_ACK_TYPE);
+                messageAck.setDestination(queueProvider.getIntermediateQueue());
+
+                consumerExchange.setSubscription(subscription);
+
+                broker.acknowledge(consumerExchange, messageAck);
+            }
+
+            broker.commitTransaction(connectionContext, transactionId, true);
+        }
+    }
+
+    private List<Destination> combineByDestination(List<DeliveredMessageReference> list) throws Exception {
+        Map<String, Destination> result = new HashMap<>();
+        for (DeliveredMessageReference reference : list) {
+            ActiveMQMessage message = (ActiveMQMessage) reference.messageReference.getMessage();
+
+            ReplicaEventType eventType =
+                    ReplicaEventType.valueOf(message.getStringProperty(ReplicaEventType.EVENT_TYPE_PROPERTY));
+            if (eventType != ReplicaEventType.MESSAGE_SEND && eventType != ReplicaEventType.MESSAGE_ACK) {
+                continue;
+            }
+
+            if (!message.getBooleanProperty(ReplicaSupport.IS_ORIGINAL_MESSAGE_SENT_TO_QUEUE_PROPERTY)
+                    || message.getBooleanProperty(ReplicaSupport.IS_ORIGINAL_MESSAGE_IN_XA_TRANSACTION_PROPERTY)) {
+                continue;
+            }
+
+            Destination destination =
+                    result.computeIfAbsent(message.getStringProperty(ReplicaSupport.ORIGINAL_MESSAGE_DESTINATION_PROPERTY),
+                            k -> new Destination());
+
+            if (eventType == ReplicaEventType.MESSAGE_SEND) {
+                destination.sendMap.put(message.getStringProperty(ReplicaSupport.MESSAGE_ID_PROPERTY),
+                        new DeliveredMessageId(message.getMessageId(), reference.delivered));
+            }
+            if (eventType == ReplicaEventType.MESSAGE_ACK) {
+                List<String> messageIds = getAckMessageIds(message);
+                destination.acks.add(new Ack(messageIds, message, reference.delivered));
+            }
+        }
+
+        return new ArrayList<>(result.values());
+    }
+
+    private List<DeliveredMessageId> compact(List<Destination> destinations) throws IOException {
+        List<DeliveredMessageId> result = new ArrayList<>();
+        for (Destination destination : destinations) {
+            for (Ack ack : destination.acks) {
+                List<String> sends = new ArrayList<>();
+                for (String id : ack.messageIdsToAck) {
+                    if (destination.sendMap.containsKey(id)) {
+                        sends.add(id);
+                        result.add(destination.sendMap.get(id));
+                    }
+                }
+                if (sends.size() == 0) {
+                    continue;
+                }
+
+                if (ack.messageIdsToAck.size() == sends.size() && new HashSet<>(ack.messageIdsToAck).containsAll(sends)) {
+                    result.add(ack);
+                } else {
+                    updateMessage(ack.message, ack.messageIdsToAck, sends);
+                }
+            }
+        }
+
+        return result;
+    }
+
+    private void updateMessage(ActiveMQMessage message, List<String> messageIdsToAck, List<String> sends) throws IOException {
+        message.setProperty(ReplicaSupport.ORIGINAL_MESSAGE_IDS_PROPERTY, messageIdsToAck);
+        ArrayList<String> newList = new ArrayList<>(messageIdsToAck);
+        newList.removeAll(sends);
+        message.setProperty(ReplicaSupport.MESSAGE_IDS_PROPERTY, newList);
+
+        synchronized (ReplicaSupport.INTERMEDIATE_QUEUE_MUTEX) {
+            intermediateQueue.getMessageStore().updateMessage(message);
+        }
+    }
+
+    private String getAckedMessageIds(List<QueueMessageReference> ackMessages) throws IOException {
+        List<String> messageIds = new ArrayList<>();
+        for (QueueMessageReference messageReference : ackMessages) {
+            ActiveMQMessage message = (ActiveMQMessage) messageReference.getMessage();
+
+            messageIds.addAll(getAckMessageIds(message));
+        }
+
+        return messageIds.stream().collect(Collectors.joining("','", "('", "')"));
+    }
+
+    private void messageDispatch(MessageId messageId) throws Exception {
+        MessageDispatchNotification mdn = new MessageDispatchNotification();
+        mdn.setConsumerId(subscription.getConsumerInfo().getConsumerId());
+        mdn.setDestination(queueProvider.getIntermediateQueue());
+        mdn.setMessageId(messageId);
+        broker.processDispatchNotification(mdn);
+    }
+
+    @SuppressWarnings("unchecked")
+    private static List<String> getAckMessageIds(ActiveMQMessage message) throws IOException {
+        return (List<String>)
+                Optional.ofNullable(message.getProperty(ReplicaSupport.ORIGINAL_MESSAGE_IDS_PROPERTY))
+                        .orElse(message.getProperty(ReplicaSupport.MESSAGE_IDS_PROPERTY));
+    }
+
+    private static class DeliveredMessageReference {
+        final MessageReference messageReference;
+        final boolean delivered;
+
+        public DeliveredMessageReference(MessageReference messageReference) {
+            this(messageReference, true);
+        }
+
+        public DeliveredMessageReference(MessageReference messageReference, boolean delivered) {
+            this.messageReference = messageReference;
+            this.delivered = delivered;
+        }
+    }
+
+    private static class Destination {

Review Comment:
   This name confused me while reading the code above, because other Destination classes such as org.apache.activemq.broker.region.Destination and javax.jms.Destination are used throughout the code base. Maybe rename it to something that describes what it is used for?



##########
activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaCompactor.java:
##########
@@ -0,0 +1,303 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.replica;
+
+import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.ConsumerBrokerExchange;
+import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.broker.region.PrefetchSubscription;
+import org.apache.activemq.broker.region.Queue;
+import org.apache.activemq.broker.region.QueueMessageReference;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ConnectionId;
+import org.apache.activemq.command.LocalTransactionId;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageDispatchNotification;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.util.LongSequenceGenerator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class ReplicaCompactor {
+    private static final Logger logger = LoggerFactory.getLogger(ReplicaCompactor.class);
+    private static final String CONSUMER_SELECTOR = String.format("%s LIKE '%s'", ReplicaEventType.EVENT_TYPE_PROPERTY, ReplicaEventType.MESSAGE_ACK);
+    public static final int MAXIMUM_MESSAGES = 1_000;
+
+    private final Broker broker;
+    private final ConnectionContext connectionContext;
+    private final ReplicaReplicationQueueSupplier queueProvider;
+    private final PrefetchSubscription subscription;
+
+    private final Queue intermediateQueue;
+
+    public ReplicaCompactor(Broker broker, ConnectionContext connectionContext, ReplicaReplicationQueueSupplier queueProvider, PrefetchSubscription subscription) {
+        this.broker = broker;
+        this.connectionContext = connectionContext;
+        this.queueProvider = queueProvider;
+        this.subscription = subscription;
+
+        intermediateQueue = broker.getDestinations(queueProvider.getIntermediateQueue()).stream().findFirst()
+                .map(DestinationExtractor::extractQueue).orElseThrow();
+    }
+
+    List<MessageReference> compactAndFilter(List<MessageReference> list, boolean withAdditionalMessages) throws Exception {
+        List<DeliveredMessageReference> toProcess = list.stream()
+                .map(DeliveredMessageReference::new)
+                .collect(Collectors.toList());
+
+        int prefetchSize = subscription.getPrefetchSize();
+        try {
+            if (withAdditionalMessages) {
+                subscription.setPrefetchSize(0);
+                toProcess.addAll(getAdditionalMessages());
+            }
+
+            List<DeliveredMessageReference> processed = compactAndFilter0(toProcess);
+
+            Set<MessageId> messageIds = list.stream().map(MessageReference::getMessageId).collect(Collectors.toSet());
+
+            return processed.stream()
+                    .map(dmr -> dmr.messageReference)
+                    .filter(mr -> messageIds.contains(mr.getMessageId()))
+                    .collect(Collectors.toList());
+        } finally {
+            subscription.setPrefetchSize(prefetchSize);
+        }
+    }
+
+    private List<DeliveredMessageReference> getAdditionalMessages() throws Exception {
+        List<DeliveredMessageReference> result = new ArrayList<>();
+        List<QueueMessageReference> additionalMessages = intermediateQueue.getMatchingMessages(connectionContext, CONSUMER_SELECTOR, MAXIMUM_MESSAGES);
+        if (additionalMessages.isEmpty()) {
+            return result;
+        }
+
+        String selector = String.format("%s IN %s", ReplicaSupport.MESSAGE_ID_PROPERTY, getAckedMessageIds(additionalMessages));
+        additionalMessages.addAll(intermediateQueue.getMatchingMessages(connectionContext, selector, MAXIMUM_MESSAGES));

Review Comment:
   Using a selector here means you're doing a full scan of the queue every time. That seems expensive! 
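
One possible way to avoid the scan, shown as a minimal sketch with plain collections rather than broker types: keep an index from message ID to the pending send event, so the sends that a batch of acks refers to can be looked up directly. `PendingSendIndex`, `onSend` and `takeAcked` are hypothetical names for illustration only; nothing like this exists in the PR or in ActiveMQ, and a real version would also have to stay consistent with the message store.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory index; not part of the PR or of ActiveMQ.
final class PendingSendIndex {
    // Insertion-ordered map from replicated message ID to its payload.
    private final Map<String, String> payloadById = new LinkedHashMap<>();

    // Record a send event when it enters the intermediate queue.
    void onSend(String messageId, String payload) {
        payloadById.put(messageId, payload);
    }

    // Look up the sends that the given acks refer to, one map lookup per ID
    // instead of a scan over the queue. Matched entries are removed.
    List<String> takeAcked(Collection<String> ackedIds) {
        List<String> found = new ArrayList<>();
        for (String id : ackedIds) {
            String payload = payloadById.remove(id);
            if (payload != null) {
                found.add(payload);
            }
        }
        return found;
    }

    int size() {
        return payloadById.size();
    }
}
```

The cost of a lookup then depends on the number of acked IDs rather than on the depth of the queue, at the price of extra memory and of keeping the index in step with the queue.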



##########
activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaBrokerEventListener.java:
##########
@@ -0,0 +1,482 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.replica;
+
+import org.apache.activemq.ScheduledMessage;
+import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.ConsumerBrokerExchange;
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.DurableTopicSubscription;
+import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.broker.region.MessageReferenceFilter;
+import org.apache.activemq.broker.region.Queue;
+import org.apache.activemq.broker.region.Subscription;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ConnectionId;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.LocalTransactionId;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageDispatchNotification;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.TransactionId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import java.io.IOException;
+import java.math.BigInteger;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static java.util.Objects.requireNonNull;
+
+public class ReplicaBrokerEventListener implements MessageListener {
+
+    private static final String REPLICATION_CONSUMER_CLIENT_ID = "DUMMY_REPLICATION_CONSUMER";
+    private static final String SEQUENCE_NAME = "replicaSeq";
+    private final Logger logger = LoggerFactory.getLogger(ReplicaBrokerEventListener.class);
+    private final ReplicaEventSerializer eventSerializer = new ReplicaEventSerializer();
+    private final Broker broker;
+    private final ConnectionContext connectionContext;
+    private final ReplicaInternalMessageProducer replicaInternalMessageProducer;
+
+    private final PeriodAcknowledge acknowledgeCallback;
+    final ReplicaSequenceStorage sequenceStorage;
+    BigInteger sequence;
+    MessageId sequenceMessageId;
+
+    ReplicaBrokerEventListener(Broker broker, ReplicaReplicationQueueSupplier queueProvider, PeriodAcknowledge acknowledgeCallback) {
+        this.broker = requireNonNull(broker);
+        this.acknowledgeCallback = requireNonNull(acknowledgeCallback);
+        connectionContext = broker.getAdminConnectionContext().copy();
+        connectionContext.setUserName(ReplicaSupport.REPLICATION_PLUGIN_USER_NAME);
+        connectionContext.setClientId(REPLICATION_CONSUMER_CLIENT_ID);
+        connectionContext.setConnection(new DummyConnection());
+        replicaInternalMessageProducer = new ReplicaInternalMessageProducer(broker, connectionContext);
+
+        createTransactionMapIfNotExist();
+
+        this.sequenceStorage = new ReplicaSequenceStorage(broker, connectionContext,
+                queueProvider, replicaInternalMessageProducer, SEQUENCE_NAME);
+    }
+
+    public void initialize() throws Exception {
+        String savedSequence = sequenceStorage.initialize();
+        sequence = savedSequence == null ? null : new BigInteger(savedSequence);
+    }
+
+    @Override
+    public void onMessage(Message jmsMessage) {
+        logger.trace("Received replication message from replica source");
+        ActiveMQMessage message = (ActiveMQMessage) jmsMessage;
+
+        processMessageWithRetries(message, null);
+    }
+
+    private synchronized void processMessageWithRetries(ActiveMQMessage message, TransactionId transactionId) {
+        new ReplicaEventRetrier(() -> {
+            boolean commit = false;
+            TransactionId tid = transactionId;
+            if (tid == null) {
+                tid = new LocalTransactionId(
+                        new ConnectionId(ReplicaSupport.REPLICATION_PLUGIN_CONNECTION_ID),
+                        ReplicaSupport.LOCAL_TRANSACTION_ID_GENERATOR.getNextSequenceId());
+
+                broker.beginTransaction(connectionContext, tid);
+
+                commit = true;
+            }
+
+            try {
+                ReplicaEventType eventType = getEventType(message);
+                if (eventType == ReplicaEventType.BATCH) {
+                    processBatch(message, tid);
+                } else {
+                    processMessage(message, eventType, tid);
+                }
+
+                if (commit) {
+                    sequenceStorage.enqueue(tid, sequence.toString());
+
+                    broker.commitTransaction(connectionContext, tid, true);
+                    acknowledgeCallback.setSafeToAck(true);
+                }
+            } catch (Exception e) {
+                if (commit) {
+                    broker.rollbackTransaction(connectionContext, tid);
+                }
+                acknowledgeCallback.setSafeToAck(false);
+                throw e;
+            }
+            return null;
+        }).process();
+    }
+
+    private void processMessage(ActiveMQMessage message, ReplicaEventType eventType, TransactionId transactionId) throws Exception {
+        Object deserializedData = eventSerializer.deserializeMessageData(message.getContent());
+        BigInteger newSequence = new BigInteger(message.getStringProperty(ReplicaSupport.SEQUENCE_PROPERTY));
+
+        long sequenceDifference = sequence == null ? 0 : newSequence.subtract(sequence).longValue();
+        MessageId messageId = message.getMessageId();
+        if (sequence == null || sequenceDifference == 1) {
+            processMessage(message, eventType, deserializedData, transactionId);
+
+            sequence = newSequence;
+            sequenceMessageId = messageId;
+
+        } else if (sequenceDifference > 0) {
+            throw new IllegalStateException(String.format(
+                    "Replication event is out of order. Current sequence: %s, the sequence of the event: %s",
+                    sequence, newSequence));
+        } else if (sequenceDifference < 0) {
+            logger.info("Replication message duplicate.");
+        } else if (!sequenceMessageId.equals(messageId)) {
+            throw new IllegalStateException(String.format(
+                    "Replication event is out of order. Current sequence %s belongs to message with id %s," +
+                            "but the id of the event is %s", sequence, sequenceMessageId, messageId));
+        }
+    }
+
+    private void processMessage(ActiveMQMessage message, ReplicaEventType eventType, Object deserializedData,
+            TransactionId transactionId) throws Exception {
+        switch (eventType) {
+            case DESTINATION_UPSERT:
+                logger.trace("Processing replicated destination");
+                upsertDestination((ActiveMQDestination) deserializedData);
+                return;
+            case DESTINATION_DELETE:
+                logger.trace("Processing replicated destination deletion");
+                deleteDestination((ActiveMQDestination) deserializedData);
+                return;
+            case MESSAGE_SEND:
+                logger.trace("Processing replicated message send");
+                persistMessage((ActiveMQMessage) deserializedData, transactionId);
+                return;
+            case MESSAGE_ACK:
+                logger.trace("Processing replicated messages dropped");
+                try {
+                    messageAck((MessageAck) deserializedData,
+                            (List<String>) message.getObjectProperty(ReplicaSupport.MESSAGE_IDS_PROPERTY), transactionId);
+                } catch (JMSException e) {
+                    logger.error("Failed to extract property to replicate messages dropped [{}]", deserializedData, e);
+                    throw new Exception(e);
+                }
+                return;
+            case QUEUE_PURGED:
+                logger.trace("Processing queue purge");
+                purgeQueue((ActiveMQDestination) deserializedData);
+                return;
+            case TRANSACTION_BEGIN:
+                logger.trace("Processing replicated transaction begin");
+                beginTransaction((TransactionId) deserializedData);
+                return;
+            case TRANSACTION_PREPARE:
+                logger.trace("Processing replicated transaction prepare");
+                prepareTransaction((TransactionId) deserializedData);
+                return;
+            case TRANSACTION_FORGET:
+                logger.trace("Processing replicated transaction forget");
+                forgetTransaction((TransactionId) deserializedData);
+                return;
+            case TRANSACTION_ROLLBACK:
+                logger.trace("Processing replicated transaction rollback");
+                rollbackTransaction((TransactionId) deserializedData);
+                return;
+            case TRANSACTION_COMMIT:
+                logger.trace("Processing replicated transaction commit");
+                try {
+                    commitTransaction(
+                            (TransactionId) deserializedData,
+                            message.getBooleanProperty(ReplicaSupport.TRANSACTION_ONE_PHASE_PROPERTY));
+                } catch (JMSException e) {
+                    logger.error("Failed to extract property to replicate transaction commit with id [{}]", deserializedData, e);
+                    throw new Exception(e);
+                }
+                return;
+            case ADD_DURABLE_CONSUMER:
+                logger.trace("Processing replicated add consumer");
+                try {
+                    addDurableConsumer((ConsumerInfo) deserializedData,
+                            message.getStringProperty(ReplicaSupport.CLIENT_ID_PROPERTY));
+                } catch (JMSException e) {
+                    logger.error("Failed to extract property to replicate add consumer [{}]", deserializedData, e);
+                    throw new Exception(e);
+                }
+                return;
+            case REMOVE_DURABLE_CONSUMER:
+                logger.trace("Processing replicated remove consumer");
+                removeDurableConsumer((ConsumerInfo) deserializedData);
+                return;
+            default:
+                throw new IllegalStateException(
+                        String.format("Unhandled event type \"%s\" for replication message id: %s",
+                                eventType, message.getJMSMessageID()));
+        }
+    }
+
+    private void processBatch(ActiveMQMessage message, TransactionId tid) throws Exception {
+        List<Object> objects = eventSerializer.deserializeListOfObjects(message.getContent().getData());
+        for (Object o : objects) {
+            processMessageWithRetries((ActiveMQMessage) o, tid);
+        }
+    }
+
+    private void upsertDestination(ActiveMQDestination destination) throws Exception {
+        try {
+            boolean isExistingDestination = Arrays.stream(broker.getDestinations())
+                    .anyMatch(d -> d.getQualifiedName().equals(destination.getQualifiedName()));
+            if (isExistingDestination) {
+                logger.debug("Destination [{}] already exists, no action to take", destination);
+                return;
+            }
+        } catch (Exception e) {
+            logger.error("Unable to determine if [{}] is an existing destination", destination, e);
+            throw e;
+        }
+        try {
+            broker.addDestination(connectionContext, destination, true);
+        } catch (Exception e) {
+            logger.error("Unable to add destination [{}]", destination, e);
+            throw e;
+        }
+    }
+
+    private void deleteDestination(ActiveMQDestination destination) throws Exception {
+        try {
+            boolean isNonExtantDestination = Arrays.stream(broker.getDestinations())
+                    .noneMatch(d -> d.getQualifiedName().equals(destination.getQualifiedName()));
+            if (isNonExtantDestination) {
+                logger.debug("Destination [{}] does not exist, no action to take", destination);
+                return;
+            }
+        } catch (Exception e) {
+            logger.error("Unable to determine if [{}] is an existing destination", destination, e);
+            throw e;
+        }
+        try {
+            broker.removeDestination(connectionContext, destination, 1000);

Review Comment:
   What is the `1000` here? If it is the timeout argument of `removeDestination`, a named constant would make that clear.



##########
activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaSequenceStorage.java:
##########
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.replica;
+
+import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.ConsumerBrokerExchange;
+import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.broker.region.PrefetchSubscription;
+import org.apache.activemq.broker.region.Queue;
+import org.apache.activemq.broker.region.QueueMessageReference;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.command.ConnectionId;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.LocalTransactionId;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.ProducerId;
+import org.apache.activemq.command.SessionId;
+import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.util.IdGenerator;
+import org.apache.activemq.util.LongSequenceGenerator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static java.util.Objects.requireNonNull;
+
+public class ReplicaSequenceStorage {
+
+    private final Logger logger = LoggerFactory.getLogger(ReplicaSequenceStorage.class);
+
+    static final String SEQUENCE_NAME_PROPERTY = "SequenceName";
+    private final LongSequenceGenerator eventMessageIdGenerator = new LongSequenceGenerator();
+    private final ProducerId replicationProducerId = new ProducerId();
+    private final Broker broker;
+    private final ConnectionContext connectionContext;
+    private final ReplicaInternalMessageProducer replicaInternalMessageProducer;
+    private final String sequenceName;
+    private final ReplicaReplicationQueueSupplier queueProvider;
+
+    private Queue sequenceQueue;
+    private PrefetchSubscription subscription;
+
+    public ReplicaSequenceStorage(Broker broker, ConnectionContext connectionContext, ReplicaReplicationQueueSupplier queueProvider,
+                                  ReplicaInternalMessageProducer replicaInternalMessageProducer, String sequenceName) {
+        this.broker = requireNonNull(broker);
+        this.connectionContext = connectionContext;
+        this.replicaInternalMessageProducer = replicaInternalMessageProducer;
+        this.sequenceName = requireNonNull(sequenceName);
+        this.queueProvider = queueProvider;
+
+        replicationProducerId.setConnectionId(new IdGenerator().generateId());
+    }
+
+    public String initialize() throws Exception {
+        sequenceQueue = broker.getDestinations(queueProvider.getSequenceQueue()).stream().findFirst()
+            .map(DestinationExtractor::extractQueue).orElseThrow();
+
+        String selector = String.format("%s LIKE '%s'", SEQUENCE_NAME_PROPERTY, sequenceName);
+
+        ConnectionId connectionId = new ConnectionId(new IdGenerator("ReplicationPlugin.ReplicaSequenceStorage").generateId());
+        SessionId sessionId = new SessionId(connectionId, new LongSequenceGenerator().getNextSequenceId());
+        ConsumerId consumerId = new ConsumerId(sessionId, new LongSequenceGenerator().getNextSequenceId());
+        ConsumerInfo consumerInfo = new ConsumerInfo();
+        consumerInfo.setConsumerId(consumerId);
+        consumerInfo.setPrefetchSize(10);
+        consumerInfo.setDestination(queueProvider.getSequenceQueue());
+        consumerInfo.setSelector(selector);
+        subscription = (PrefetchSubscription) broker.addConsumer(connectionContext, consumerInfo);
+
+        List<ActiveMQTextMessage> allMessages = new ArrayList<>();
+        for (MessageId messageId : sequenceQueue.getAllMessageIds()) {
+            ActiveMQTextMessage message = getMessageByMessageId(messageId);
+            if (message.getStringProperty(SEQUENCE_NAME_PROPERTY).equals(sequenceName)) {

Review Comment:
   You already have the selector checking this, right?



##########
activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaSequenceStorage.java:
##########
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.replica;
+
+import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.ConsumerBrokerExchange;
+import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.broker.region.PrefetchSubscription;
+import org.apache.activemq.broker.region.Queue;
+import org.apache.activemq.broker.region.QueueMessageReference;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.command.ConnectionId;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.LocalTransactionId;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.ProducerId;
+import org.apache.activemq.command.SessionId;
+import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.util.IdGenerator;
+import org.apache.activemq.util.LongSequenceGenerator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static java.util.Objects.requireNonNull;
+
+public class ReplicaSequenceStorage {
+
+    private final Logger logger = LoggerFactory.getLogger(ReplicaSequenceStorage.class);
+
+    static final String SEQUENCE_NAME_PROPERTY = "SequenceName";
+    private final LongSequenceGenerator eventMessageIdGenerator = new LongSequenceGenerator();
+    private final ProducerId replicationProducerId = new ProducerId();
+    private final Broker broker;
+    private final ConnectionContext connectionContext;
+    private final ReplicaInternalMessageProducer replicaInternalMessageProducer;
+    private final String sequenceName;
+    private final ReplicaReplicationQueueSupplier queueProvider;
+
+    private Queue sequenceQueue;
+    private PrefetchSubscription subscription;
+
+    public ReplicaSequenceStorage(Broker broker, ConnectionContext connectionContext, ReplicaReplicationQueueSupplier queueProvider,
+                                  ReplicaInternalMessageProducer replicaInternalMessageProducer, String sequenceName) {
+        this.broker = requireNonNull(broker);
+        this.connectionContext = connectionContext;
+        this.replicaInternalMessageProducer = replicaInternalMessageProducer;
+        this.sequenceName = requireNonNull(sequenceName);
+        this.queueProvider = queueProvider;
+
+        replicationProducerId.setConnectionId(new IdGenerator().generateId());
+    }
+
+    public String initialize() throws Exception {
+        sequenceQueue = broker.getDestinations(queueProvider.getSequenceQueue()).stream().findFirst()
+            .map(DestinationExtractor::extractQueue).orElseThrow();
+
+        String selector = String.format("%s LIKE '%s'", SEQUENCE_NAME_PROPERTY, sequenceName);
+
+        ConnectionId connectionId = new ConnectionId(new IdGenerator("ReplicationPlugin.ReplicaSequenceStorage").generateId());
+        SessionId sessionId = new SessionId(connectionId, new LongSequenceGenerator().getNextSequenceId());
+        ConsumerId consumerId = new ConsumerId(sessionId, new LongSequenceGenerator().getNextSequenceId());
+        ConsumerInfo consumerInfo = new ConsumerInfo();
+        consumerInfo.setConsumerId(consumerId);
+        consumerInfo.setPrefetchSize(10);
+        consumerInfo.setDestination(queueProvider.getSequenceQueue());
+        consumerInfo.setSelector(selector);
+        subscription = (PrefetchSubscription) broker.addConsumer(connectionContext, consumerInfo);
+
+        List<ActiveMQTextMessage> allMessages = new ArrayList<>();
+        for (MessageId messageId : sequenceQueue.getAllMessageIds()) {
+            ActiveMQTextMessage message = getMessageByMessageId(messageId);
+            if (message.getStringProperty(SEQUENCE_NAME_PROPERTY).equals(sequenceName)) {
+                allMessages.add(message);
+            }
+        }
+
+        if (allMessages.size() == 0) {
+            return null;
+        }
+
+        if (allMessages.size() > 1) {
+            for (int i = 0; i < allMessages.size() - 1; i++) {
+                sequenceQueue.removeMessage(allMessages.get(i).getMessageId().toString());
+            }
+        }
+
+        return allMessages.get(0).getText();
+    }
+
+    public void enqueue(TransactionId tid, String message) throws Exception {
+        // before enqueue message, we acknowledge all messages currently in queue.
+        acknowledgeAll(tid);
+
+        send(tid, message);
+    }
+
+    private void acknowledgeAll(TransactionId tid) throws Exception {
+        List<MessageReference> dispatched = subscription.getDispatched();
+        ConsumerBrokerExchange consumerExchange = new ConsumerBrokerExchange();
+        consumerExchange.setConnectionContext(connectionContext);
+        consumerExchange.setSubscription(subscription);
+
+        for(MessageReference messageReference: dispatched) {

Review Comment:
   Is it any cheaper to send one ack for the whole range?

   ack.setFirstMessageId(...)
   ack.setLastMessageId(...)
   ack.setMessageCount(...)
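
The range idea can be sketched without broker types. The class below is a hypothetical stand-in that only holds the three values a range ack would carry; it assumes the dispatched list is in dispatch order and has no gaps. In the real code these values would presumably go into a `MessageAck` via `setFirstMessageId`, `setLastMessageId` and `setMessageCount`. Whether the broker actually handles one range ack more cheaply than a loop of individual acks is not established here.

```java
import java.util.List;

// Hypothetical stand-in for the fields of a range ack; not an ActiveMQ class.
final class RangeAck {
    final String firstMessageId;
    final String lastMessageId;
    final int messageCount;

    RangeAck(String firstMessageId, String lastMessageId, int messageCount) {
        this.firstMessageId = firstMessageId;
        this.lastMessageId = lastMessageId;
        this.messageCount = messageCount;
    }

    // Build one ack that covers every dispatched ID, or return null when
    // there is nothing to acknowledge. Assumes the list is in dispatch order.
    static RangeAck forDispatched(List<String> dispatchedIds) {
        if (dispatchedIds.isEmpty()) {
            return null;
        }
        return new RangeAck(
                dispatchedIds.get(0),
                dispatchedIds.get(dispatchedIds.size() - 1),
                dispatchedIds.size());
    }
}
```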



##########
activemq-broker/src/main/java/org/apache/activemq/replica/PeriodAcknowledge.java:
##########
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.replica;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQSession;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class PeriodAcknowledge {
+
+    private static final int MAX_ACK_BATCH_SIZE = 100;
+    private boolean safeToAck = true;
+    private final AtomicLong lastAckTime = new AtomicLong();
+    private final AtomicInteger pendingAckCount = new AtomicInteger();
+    private final AtomicReference<ActiveMQConnection> connection = new AtomicReference<>();
+    private final AtomicReference<ActiveMQSession> connectionSession = new AtomicReference<>();
+    private final long replicaAckPeriod;
+    private final Object periodicCommitLock = new Object();
+
+
+    public PeriodAcknowledge(long replicaAckPeriod) {
+        this.replicaAckPeriod = replicaAckPeriod;
+    }
+
+    public void setConnection(ActiveMQConnection activeMQConnection) {
+        connection.set(activeMQConnection);
+    }
+
+    public void setConnectionSession(ActiveMQSession activeMQSession) {
+        connectionSession.set(activeMQSession);
+    }
+
+    public void setSafeToAck(boolean safeToAck) {
+        this.safeToAck = safeToAck;
+    }
+
+    private boolean shouldPeriodicallyCommit() {
+        return System.currentTimeMillis() - lastAckTime.get() >= replicaAckPeriod;
+    }
+
+    private boolean reachedMaxAckBatchSize() {
+        return pendingAckCount.incrementAndGet() >= MAX_ACK_BATCH_SIZE;
+    }
+
+    public void acknowledge() throws Exception {
+        if (connection.get() == null || connectionSession.get() == null || !safeToAck) {
+            return;
+        }
+
+        synchronized (periodicCommitLock) {

Review Comment:
   I think there would be less contention if you only took the lock when `reachedMaxAckBatchSize() || shouldPeriodicallyCommit()` is true. You'd have to re-check the condition once you hold the lock, though 🤔
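
   A minimal sketch of that check, lock, re-check pattern follows. It is an illustration under assumptions, not the PR's code: `BatchedAcker`, `dueForCommit` and `commitCount` are hypothetical names, and the `commits++` line stands in for the real session acknowledge call. Note that in the quoted hunk `reachedMaxAckBatchSize()` increments the counter as a side effect, so calling it twice would double count; the sketch increments once up front and keeps the check itself side-effect free.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for PeriodAcknowledge; names are illustrative only.
class BatchedAcker {
    private static final int MAX_ACK_BATCH_SIZE = 100;

    private final AtomicInteger pendingAckCount = new AtomicInteger();
    private final AtomicLong lastAckTime = new AtomicLong(System.currentTimeMillis());
    private final Object commitLock = new Object();
    private final long ackPeriodMillis;
    private int commits; // guarded by commitLock

    BatchedAcker(long ackPeriodMillis) {
        this.ackPeriodMillis = ackPeriodMillis;
    }

    // Side-effect-free check, so it is safe to evaluate twice.
    private boolean dueForCommit() {
        return pendingAckCount.get() >= MAX_ACK_BATCH_SIZE
                || System.currentTimeMillis() - lastAckTime.get() >= ackPeriodMillis;
    }

    void acknowledge() {
        pendingAckCount.incrementAndGet();
        if (!dueForCommit()) {
            return; // fast path: no lock taken
        }
        synchronized (commitLock) {
            if (!dueForCommit()) {
                return; // another thread committed while we waited for the lock
            }
            commits++; // assumption: the real code would acknowledge the session here
            pendingAckCount.set(0);
            lastAckTime.set(System.currentTimeMillis());
        }
    }

    int commitCount() {
        synchronized (commitLock) {
            return commits;
        }
    }
}
```

   With this shape, most calls return before touching the lock, and only the call that crosses the batch size or the period pays for synchronization.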



##########
activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaAckHelper.java:
##########
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.replica;
+
+import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.broker.region.PrefetchSubscription;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageId;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class ReplicaAckHelper {
+
+    private final Broker broker;
+
+    public ReplicaAckHelper(Broker broker) {
+        this.broker = broker;
+    }
+
+    public List<MessageReference> getMessagesToAck(MessageAck ack, Destination destination) {
+        PrefetchSubscription prefetchSubscription = getPrefetchSubscription(destination, ack.getConsumerId());
+        if (prefetchSubscription == null) {
+            return null;

Review Comment:
   Can we return an `Optional` here instead of `null`?
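
   As a rough illustration of the suggestion, assuming the caller only needs to distinguish "no subscription found" from "here are the messages": the sketch below uses plain strings and a map in place of the broker types, and `AckLookup` is a hypothetical class, not part of the PR.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for ReplicaAckHelper; strings replace the broker types.
class AckLookup {
    private final Map<String, List<String>> dispatchedByConsumer;

    AckLookup(Map<String, List<String>> dispatchedByConsumer) {
        this.dispatchedByConsumer = dispatchedByConsumer;
    }

    // Returns Optional.empty() instead of null when the consumer is unknown.
    Optional<List<String>> getMessagesToAck(String consumerId) {
        return Optional.ofNullable(dispatchedByConsumer.get(consumerId));
    }
}
```

   A caller can then write `lookup.getMessagesToAck(id).orElse(List.of())` or use `ifPresent(...)`, which makes the missing-subscription case explicit at the call site.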



##########
activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaCompactor.java:
##########
@@ -0,0 +1,303 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.replica;
+
+import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.ConsumerBrokerExchange;
+import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.broker.region.PrefetchSubscription;
+import org.apache.activemq.broker.region.Queue;
+import org.apache.activemq.broker.region.QueueMessageReference;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ConnectionId;
+import org.apache.activemq.command.LocalTransactionId;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageDispatchNotification;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.util.LongSequenceGenerator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class ReplicaCompactor {
+    private static final Logger logger = LoggerFactory.getLogger(ReplicaCompactor.class);
+    private static final String CONSUMER_SELECTOR = String.format("%s LIKE '%s'", ReplicaEventType.EVENT_TYPE_PROPERTY, ReplicaEventType.MESSAGE_ACK);
+    public static final int MAXIMUM_MESSAGES = 1_000;
+
+    private final Broker broker;
+    private final ConnectionContext connectionContext;
+    private final ReplicaReplicationQueueSupplier queueProvider;
+    private final PrefetchSubscription subscription;
+
+    private final Queue intermediateQueue;
+
+    public ReplicaCompactor(Broker broker, ConnectionContext connectionContext, ReplicaReplicationQueueSupplier queueProvider, PrefetchSubscription subscription) {
+        this.broker = broker;
+        this.connectionContext = connectionContext;
+        this.queueProvider = queueProvider;
+        this.subscription = subscription;
+
+        intermediateQueue = broker.getDestinations(queueProvider.getIntermediateQueue()).stream().findFirst()
+                .map(DestinationExtractor::extractQueue).orElseThrow();
+    }
+
+    List<MessageReference> compactAndFilter(List<MessageReference> list, boolean withAdditionalMessages) throws Exception {
+        List<DeliveredMessageReference> toProcess = list.stream()
+                .map(DeliveredMessageReference::new)
+                .collect(Collectors.toList());
+
+        int prefetchSize = subscription.getPrefetchSize();
+        try {
+            if (withAdditionalMessages) {
+                subscription.setPrefetchSize(0);
+                toProcess.addAll(getAdditionalMessages());
+            }
+
+            List<DeliveredMessageReference> processed = compactAndFilter0(toProcess);
+
+            Set<MessageId> messageIds = list.stream().map(MessageReference::getMessageId).collect(Collectors.toSet());
+
+            return processed.stream()
+                    .map(dmr -> dmr.messageReference)
+                    .filter(mr -> messageIds.contains(mr.getMessageId()))
+                    .collect(Collectors.toList());
+        } finally {
+            subscription.setPrefetchSize(prefetchSize);
+        }
+    }
+
+    private List<DeliveredMessageReference> getAdditionalMessages() throws Exception {
+        List<DeliveredMessageReference> result = new ArrayList<>();
+        List<QueueMessageReference> additionalMessages = intermediateQueue.getMatchingMessages(connectionContext, CONSUMER_SELECTOR, MAXIMUM_MESSAGES);

Review Comment:
   Using a selector here means you're doing a full scan of the queue every time. That seems expensive!
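
   One possible direction, sketched under assumptions and not something the PR does: keep a small index of message IDs per event type, updated as events are enqueued and removed, so that finding ack events does not require evaluating a selector against every message. `EventTypeIndex` and its methods are hypothetical, and the sketch is not thread-safe.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical secondary index: event type -> message IDs in arrival order.
class EventTypeIndex {
    private final Map<String, Set<String>> idsByEventType = new HashMap<>();

    void onEnqueue(String eventType, String messageId) {
        idsByEventType.computeIfAbsent(eventType, k -> new LinkedHashSet<>()).add(messageId);
    }

    void onRemove(String eventType, String messageId) {
        Set<String> ids = idsByEventType.get(eventType);
        if (ids != null) {
            ids.remove(messageId);
        }
    }

    // Returns at most max IDs of the given type without touching other messages.
    List<String> firstMatching(String eventType, int max) {
        List<String> out = new ArrayList<>();
        for (String id : idsByEventType.getOrDefault(eventType, Set.of())) {
            if (out.size() == max) {
                break;
            }
            out.add(id);
        }
        return out;
    }
}
```

   The lookup cost then depends on the number of matching events requested rather than on the queue depth, at the price of keeping the index in sync with the queue.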
   



##########
activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaCompactor.java:
##########
@@ -0,0 +1,303 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.replica;
+
+import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.ConsumerBrokerExchange;
+import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.broker.region.PrefetchSubscription;
+import org.apache.activemq.broker.region.Queue;
+import org.apache.activemq.broker.region.QueueMessageReference;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ConnectionId;
+import org.apache.activemq.command.LocalTransactionId;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageDispatchNotification;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.util.LongSequenceGenerator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class ReplicaCompactor {
+    private static final Logger logger = LoggerFactory.getLogger(ReplicaCompactor.class);
+    private static final String CONSUMER_SELECTOR = String.format("%s LIKE '%s'", ReplicaEventType.EVENT_TYPE_PROPERTY, ReplicaEventType.MESSAGE_ACK);
+    public static final int MAXIMUM_MESSAGES = 1_000;
+
+    private final Broker broker;
+    private final ConnectionContext connectionContext;
+    private final ReplicaReplicationQueueSupplier queueProvider;
+    private final PrefetchSubscription subscription;
+
+    private final Queue intermediateQueue;
+
+    public ReplicaCompactor(Broker broker, ConnectionContext connectionContext, ReplicaReplicationQueueSupplier queueProvider, PrefetchSubscription subscription) {
+        this.broker = broker;
+        this.connectionContext = connectionContext;
+        this.queueProvider = queueProvider;
+        this.subscription = subscription;
+
+        intermediateQueue = broker.getDestinations(queueProvider.getIntermediateQueue()).stream().findFirst()
+                .map(DestinationExtractor::extractQueue).orElseThrow();
+    }
+
+    List<MessageReference> compactAndFilter(List<MessageReference> list, boolean withAdditionalMessages) throws Exception {
+        List<DeliveredMessageReference> toProcess = list.stream()
+                .map(DeliveredMessageReference::new)
+                .collect(Collectors.toList());
+
+        int prefetchSize = subscription.getPrefetchSize();
+        try {
+            if (withAdditionalMessages) {
+                subscription.setPrefetchSize(0);
+                toProcess.addAll(getAdditionalMessages());
+            }
+
+            List<DeliveredMessageReference> processed = compactAndFilter0(toProcess);
+
+            Set<MessageId> messageIds = list.stream().map(MessageReference::getMessageId).collect(Collectors.toSet());
+
+            return processed.stream()
+                    .map(dmr -> dmr.messageReference)
+                    .filter(mr -> messageIds.contains(mr.getMessageId()))
+                    .collect(Collectors.toList());
+        } finally {
+            subscription.setPrefetchSize(prefetchSize);
+        }
+    }
+
+    private List<DeliveredMessageReference> getAdditionalMessages() throws Exception {
+        List<DeliveredMessageReference> result = new ArrayList<>();
+        List<QueueMessageReference> additionalMessages = intermediateQueue.getMatchingMessages(connectionContext, CONSUMER_SELECTOR, MAXIMUM_MESSAGES);
+        if (additionalMessages.isEmpty()) {
+            return result;
+        }
+
+        String selector = String.format("%s IN %s", ReplicaSupport.MESSAGE_ID_PROPERTY, getAckedMessageIds(additionalMessages));
+        additionalMessages.addAll(intermediateQueue.getMatchingMessages(connectionContext, selector, MAXIMUM_MESSAGES));
+
+        Set<MessageId> dispatchedMessageIds = subscription.getDispatched().stream()
+                .map(MessageReference::getMessageId)
+                .collect(Collectors.toSet());
+
+        for (MessageReference messageReference : additionalMessages) {
+            if (!dispatchedMessageIds.contains(messageReference.getMessageId())) {
+                result.add(new DeliveredMessageReference(messageReference, false));
+            }
+        }
+
+        return result;
+    }
+
+    private List<DeliveredMessageReference> compactAndFilter0(List<DeliveredMessageReference> list) throws Exception {
+        List<DeliveredMessageReference> result = new ArrayList<>(list);
+
+        List<Destination> destinations = combineByDestination(list);
+
+        List<DeliveredMessageId> toDelete = compact(destinations);
+
+        if (toDelete.isEmpty()) {
+            return result;
+        }
+
+        acknowledge(toDelete);
+
+        List<MessageId> messageIds = toDelete.stream().map(dmid -> dmid.messageId).collect(Collectors.toList());

Review Comment:
   Use a `Set` here?
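
   A small sketch of why a set helps, assuming the collected IDs are only used for membership checks afterwards (the quoted hunk is cut off before that point, so this is a guess). `RemoveDeleted` and `withoutDeleted` are hypothetical names, and strings stand in for `MessageId`.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical helper: drop every ID that appears in toDelete.
class RemoveDeleted {
    static List<String> withoutDeleted(List<String> all, List<String> toDelete) {
        // Collecting into a Set makes each contains() call a hash lookup
        // instead of a linear scan over a List.
        Set<String> deleted = toDelete.stream().collect(Collectors.toSet());
        return all.stream()
                .filter(id -> !deleted.contains(id))
                .collect(Collectors.toList());
    }
}
```

   In the PR this would amount to swapping `Collectors.toList()` for `Collectors.toSet()` on that line, provided nothing downstream relies on ordering or duplicates.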



##########
activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaBroker.java:
##########
@@ -0,0 +1,229 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.replica;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQMessageConsumer;
+import org.apache.activemq.ActiveMQPrefetchPolicy;
+import org.apache.activemq.ActiveMQSession;
+import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.BrokerFilter;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.MessageDispatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.JMSException;
+import java.lang.reflect.Method;
+import java.text.MessageFormat;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static java.util.Objects.requireNonNull;
+
+public class ReplicaBroker extends BrokerFilter {
+
+    private final static long REPLICA_ACK_PERIOD = 5_000;
+
+    private final Logger logger = LoggerFactory.getLogger(ReplicaBroker.class);
+    private final ScheduledExecutorService brokerConnectionPoller = Executors.newSingleThreadScheduledExecutor();
+    private final ScheduledExecutorService periodicAckPoller = Executors.newSingleThreadScheduledExecutor();
+    private final AtomicBoolean isConnecting = new AtomicBoolean();
+    private final AtomicReference<ActiveMQConnection> connection = new AtomicReference<>();
+    private final AtomicReference<ActiveMQSession> connectionSession = new AtomicReference<>();
+    private final AtomicReference<ActiveMQMessageConsumer> eventConsumer = new AtomicReference<>();
+    private final ReplicaReplicationQueueSupplier queueProvider;
+    private final ActiveMQConnectionFactory replicaSourceConnectionFactory;
+    private final PeriodAcknowledge periodAcknowledgeCallBack;
+
+    public ReplicaBroker(Broker next, ReplicaReplicationQueueSupplier queueProvider, ActiveMQConnectionFactory replicaSourceConnectionFactory) {
+        super(next);
+        this.queueProvider = queueProvider;
+        this.periodAcknowledgeCallBack = new PeriodAcknowledge(REPLICA_ACK_PERIOD);
+        this.replicaSourceConnectionFactory = requireNonNull(replicaSourceConnectionFactory, "Need connection details of replica source for this broker");
+        requireNonNull(replicaSourceConnectionFactory.getBrokerURL(), "Need connection URI of replica source for this broker");
+        validateUser(replicaSourceConnectionFactory);
+    }
+
+    private void validateUser(ActiveMQConnectionFactory replicaSourceConnectionFactory) {
+        if (replicaSourceConnectionFactory.getUserName() != null) {
+            requireNonNull(replicaSourceConnectionFactory.getPassword(), "Both userName and password or none of them should be configured for replica broker");
+        }
+        if (replicaSourceConnectionFactory.getPassword() != null) {
+            requireNonNull(replicaSourceConnectionFactory.getUserName(), "Both userName and password or none of them should be configured for replica broker");
+        }
+    }
+
+    @Override
+    public void start() throws Exception {
+        super.start();
+        queueProvider.initializeSequenceQueue();
+        brokerConnectionPoller.scheduleAtFixedRate(this::beginReplicationIdempotent, 5, 5, TimeUnit.SECONDS);

Review Comment:
   Is 5 seconds meaningful here? Can you use `REPLICA_ACK_PERIOD`? 
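
   For illustration, scheduling with a named constant could look like the sketch below. `ConnectionPolling`, `POLL_PERIOD_MILLIS` and `schedule` are hypothetical names. One detail to watch if `REPLICA_ACK_PERIOD` is reused directly: in the hunk it is `5_000`, so the unit would have to become `TimeUnit.MILLISECONDS`; passing it with `TimeUnit.SECONDS` would schedule the task every 5,000 seconds. Whether the connection poll and the ack period should share one constant is a separate question for the author.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical helper showing a named period instead of the literals 5, 5.
class ConnectionPolling {
    static final long POLL_PERIOD_MILLIS = 5_000; // same value as REPLICA_ACK_PERIOD in the hunk

    static ScheduledFuture<?> schedule(ScheduledExecutorService executor, Runnable task, long periodMillis) {
        // Initial delay and period use the same value, expressed in milliseconds.
        return executor.scheduleAtFixedRate(task, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
    }
}
```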



##########
activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaCompactor.java:
##########
@@ -0,0 +1,303 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.replica;
+
+import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.ConsumerBrokerExchange;
+import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.broker.region.PrefetchSubscription;
+import org.apache.activemq.broker.region.Queue;
+import org.apache.activemq.broker.region.QueueMessageReference;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ConnectionId;
+import org.apache.activemq.command.LocalTransactionId;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageDispatchNotification;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.util.LongSequenceGenerator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class ReplicaCompactor {
+    private static final Logger logger = LoggerFactory.getLogger(ReplicaCompactor.class);
+    private static final String CONSUMER_SELECTOR = String.format("%s LIKE '%s'", ReplicaEventType.EVENT_TYPE_PROPERTY, ReplicaEventType.MESSAGE_ACK);
+    public static final int MAXIMUM_MESSAGES = 1_000;

Review Comment:
   Should this be configurable?
   



##########
activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaBatcher.java:
##########
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.replica;
+
+import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.command.ActiveMQMessage;
+
+import javax.jms.JMSException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class ReplicaBatcher {
+
+    static final int MAX_BATCH_LENGTH = 500;
+    static final int MAX_BATCH_SIZE = 5_000_000; // 5 Mb
+
+    @SuppressWarnings("unchecked")
+    static List<List<MessageReference>> batches(List<MessageReference> list) throws Exception {
+        List<List<MessageReference>> result = new ArrayList<>();
+
+        Map<String, Set<String>> destination2eventType = new HashMap<>();

Review Comment:
   `destinationToEventType`?



##########
activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaRole.java:
##########
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.replica;
+
+public enum ReplicaRole {
+    source, replica, dual

Review Comment:
   Is `replica` used? It looks like only `source` and `dual` are supported in `ReplicaPlugin`.



##########
activemq-broker/src/test/java/org/apache/activemq/replica/ReplicaCompactorTest.java:
##########
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.replica;
+
+import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.broker.region.PrefetchSubscription;
+import org.apache.activemq.broker.region.Queue;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.store.MessageStore;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+import java.util.List;
+import java.util.Set;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class ReplicaCompactorTest {
+
+    private final ConnectionContext connectionContext = mock(ConnectionContext.class);
+    private final Broker broker = mock(Broker.class);
+    private final ReplicaReplicationQueueSupplier queueProvider = mock(ReplicaReplicationQueueSupplier.class);
+    private final MessageStore messageStore = mock(MessageStore.class);
+
+    private final ActiveMQQueue intermediateQueueDestination = new ActiveMQQueue(ReplicaSupport.INTERMEDIATE_REPLICATION_QUEUE_NAME);
+    private final Queue intermediateQueue = mock(Queue.class);
+
+    private ReplicaCompactor replicaCompactor;
+
+    @Before
+    public void setUp() throws Exception {
+        ConnectionContext adminConnectionContext = mock(ConnectionContext.class);
+        when(adminConnectionContext.copy()).thenReturn(connectionContext);
+        when(broker.getAdminConnectionContext()).thenReturn(adminConnectionContext);
+
+        when(queueProvider.getIntermediateQueue()).thenReturn(intermediateQueueDestination);
+        when(broker.getDestinations(intermediateQueueDestination)).thenReturn(Set.of(intermediateQueue));
+        when(intermediateQueue.getMessageStore()).thenReturn(messageStore);
+
+        ConsumerInfo consumerInfo = new ConsumerInfo();
+        PrefetchSubscription originalSubscription = mock(PrefetchSubscription.class);
+        when(originalSubscription.getConsumerInfo()).thenReturn(consumerInfo);
+
+        replicaCompactor = new ReplicaCompactor(broker, connectionContext, queueProvider, originalSubscription);
+    }
+
+    @Test
+    public void compactWhenSendAndAck() throws Exception {
+        MessageId messageId1 = new MessageId("1:0:0:1");
+        MessageId messageId2 = new MessageId("1:0:0:2");
+        MessageId messageId3 = new MessageId("1:0:0:3");
+
+        String messageIdToAck = "2:1";
+
+        ActiveMQMessage message1 = new ActiveMQMessage();
+        message1.setMessageId(messageId1);
+        message1.setBooleanProperty(ReplicaSupport.IS_ORIGINAL_MESSAGE_SENT_TO_QUEUE_PROPERTY, true);
+        message1.setStringProperty(ReplicaEventType.EVENT_TYPE_PROPERTY, ReplicaEventType.MESSAGE_SEND.toString());
+        message1.setStringProperty(ReplicaSupport.MESSAGE_ID_PROPERTY, messageIdToAck);

Review Comment:
   Why is there a separate `ReplicaSupport.MESSAGE_ID_PROPERTY`? Why not just use the message ID you're already setting?
   





[GitHub] [activemq] michaelandrepearce commented on pull request #848: Feature/replica broker

Posted by GitBox <gi...@apache.org>.
michaelandrepearce commented on PR #848:
URL: https://github.com/apache/activemq/pull/848#issuecomment-1371719963

   It's important to first gain support for the feature from the community, committers, and PMC on the dev list.

