You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@fineract.apache.org by ar...@apache.org on 2023/01/19 09:34:18 UTC

[fineract] branch develop updated: FINERACT-1724: Implemented support for multiple parallel JMS producers for external events

This is an automated email from the ASF dual-hosted git repository.

arnold pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/fineract.git


The following commit(s) were added to refs/heads/develop by this push:
     new 7d28646c7 FINERACT-1724: Implemented support for multiple parallel JMS producers for external events
7d28646c7 is described below

commit 7d28646c7d97088c9e89d11a394333a2d26ed50f
Author: Arnold Galovics <ga...@gmail.com>
AuthorDate: Tue Jan 17 12:16:43 2023 +0100

    FINERACT-1724: Implemented support for multiple parallel JMS producers for external events
---
 .../LoanAccountsStayedLockedBusinessEvent.java     |   5 +
 .../core/config/FineractProperties.java            |   1 +
 .../messaging/jms/ActiveMQMessageFactory.java}     |  23 ++-
 .../messaging/jms/MessageFactory.java}             |  11 +-
 .../service/HashingService.java}                   |  14 +-
 .../infrastructure/core/service/MeasuringUtil.java |  50 ++++++
 .../event/business/domain/BulkBusinessEvent.java   |  17 ++
 .../event/business/domain/BusinessEvent.java       |   2 +
 .../domain/client/ClientBusinessEvent.java         |   5 +
 .../deposit/FixedDepositAccountBusinessEvent.java  |   5 +
 .../RecurringDepositAccountBusinessEvent.java      |   5 +
 .../business/domain/group/GroupsBusinessEvent.java |   5 +
 .../loan/LoanAdjustTransactionBusinessEvent.java   |   5 +
 .../business/domain/loan/LoanBusinessEvent.java    |   5 +
 .../loan/charge/LoanChargeBusinessEvent.java       |   5 +
 .../loan/product/LoanProductBusinessEvent.java     |   5 +
 .../loan/repayment/LoanRepaymentBusinessEvent.java |   5 +
 .../transaction/LoanTransactionBusinessEvent.java  |   5 +
 .../savings/SavingsAccountBusinessEvent.java       |   5 +
 .../SavingsAccountTransactionBusinessEvent.java    |   5 +
 .../domain/share/ShareAccountBusinessEvent.java    |   5 +
 .../ShareProductDividentsCreateBusinessEvent.java  |   5 +
 ...ion.java => ExternalEventJMSConfiguration.java} |  12 +-
 .../ExternalEventJMSProducerConfiguration.java     |  55 ------
 .../config/ExternalEventProducerConfiguration.java |  36 ----
 .../jobs/SendAsynchronousEventsConfig.java         |   3 +-
 .../jobs/SendAsynchronousEventsTasklet.java        |  82 +++++++--
 .../external/producer/ExternalEventProducer.java   |  10 +-
 ...cerImpl.java => NoopExternalEventProducer.java} |  16 +-
 .../jms/JMSMultiExternalEventProducer.java         | 153 ++++++++++++++++
 .../repository/ExternalEventRepository.java        |   8 +-
 .../external/repository/domain/ExternalEvent.java  |   5 +-
 .../repository/domain/ExternalEventView.java}      |  25 ++-
 .../external/service/ExternalEventService.java     |   5 +-
 .../external/service/message/MessageFactory.java   |   4 +-
 .../infrastructure/jobs/service/StepName.java      |   2 +-
 .../src/main/resources/application.properties      |   1 +
 .../db/changelog/tenant/changelog-tenant.xml       |   1 +
 .../0085_add_aggregate_root_id_external_events.xml |  32 ++++
 .../core/service/HashingServiceTest.java           |  62 +++++++
 .../business/domain/BulkBusinessEventTest.java     |  64 +++++++
 .../BusinessEventNotifierServiceImplTest.java      |   5 +
 .../jobs/SendAsynchronousEventsTaskletTest.java    |  82 ++++++---
 .../producer/EventsJMSIntegrationTest.java         | 120 -------------
 .../jms/JMSMultiExternalEventProducerTest.java     | 200 +++++++++++++++++++++
 lombok.config                                      |   4 +-
 46 files changed, 882 insertions(+), 298 deletions(-)

diff --git a/fineract-provider/src/main/java/org/apache/fineract/cob/loan/LoanAccountsStayedLockedBusinessEvent.java b/fineract-provider/src/main/java/org/apache/fineract/cob/loan/LoanAccountsStayedLockedBusinessEvent.java
index 274210414..d9280ae56 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/cob/loan/LoanAccountsStayedLockedBusinessEvent.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/cob/loan/LoanAccountsStayedLockedBusinessEvent.java
@@ -39,4 +39,9 @@ public class LoanAccountsStayedLockedBusinessEvent extends AbstractBusinessEvent
     public String getCategory() {
         return CATEGORY;
     }
+
+    @Override
+    public Long getAggregateRootId() {
+        return null;
+    }
 }
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/core/config/FineractProperties.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/core/config/FineractProperties.java
index b28722c43..99b50c5fd 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/core/config/FineractProperties.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/core/config/FineractProperties.java
@@ -165,6 +165,7 @@ public class FineractProperties {
         private String brokerUrl;
         private String brokerUsername;
         private String brokerPassword;
+        private int producerCount;
 
         public boolean isBrokerPasswordProtected() {
             return StringUtils.isNotBlank(brokerUsername) || StringUtils.isNotBlank(brokerPassword);
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/loan/LoanBusinessEvent.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/core/messaging/jms/ActiveMQMessageFactory.java
similarity index 62%
copy from fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/loan/LoanBusinessEvent.java
copy to fineract-provider/src/main/java/org/apache/fineract/infrastructure/core/messaging/jms/ActiveMQMessageFactory.java
index 1ff3eafd5..cc982c8c0 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/loan/LoanBusinessEvent.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/core/messaging/jms/ActiveMQMessageFactory.java
@@ -16,21 +16,20 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.fineract.infrastructure.event.business.domain.loan;
+package org.apache.fineract.infrastructure.core.messaging.jms;
 
-import org.apache.fineract.infrastructure.event.business.domain.AbstractBusinessEvent;
-import org.apache.fineract.portfolio.loanaccount.domain.Loan;
+import javax.jms.BytesMessage;
+import javax.jms.JMSException;
+import org.apache.activemq.command.ActiveMQBytesMessage;
+import org.springframework.stereotype.Component;
 
-public abstract class LoanBusinessEvent extends AbstractBusinessEvent<Loan> {
-
-    private static final String CATEGORY = "Loan";
-
-    public LoanBusinessEvent(Loan value) {
-        super(value);
-    }
+@Component
+public class ActiveMQMessageFactory implements MessageFactory {
 
     @Override
-    public String getCategory() {
-        return CATEGORY;
+    public BytesMessage createByteMessage(byte[] msg) throws JMSException {
+        ActiveMQBytesMessage result = new ActiveMQBytesMessage();
+        result.writeBytes(msg);
+        return result;
     }
 }
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/BusinessEvent.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/core/messaging/jms/MessageFactory.java
similarity index 78%
copy from fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/BusinessEvent.java
copy to fineract-provider/src/main/java/org/apache/fineract/infrastructure/core/messaging/jms/MessageFactory.java
index c2876066b..1f3ed3927 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/BusinessEvent.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/core/messaging/jms/MessageFactory.java
@@ -16,13 +16,12 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.fineract.infrastructure.event.business.domain;
+package org.apache.fineract.infrastructure.core.messaging.jms;
 
-public interface BusinessEvent<T> {
+import javax.jms.BytesMessage;
+import javax.jms.JMSException;
 
-    T get();
+public interface MessageFactory {
 
-    String getType();
-
-    String getCategory();
+    BytesMessage createByteMessage(byte[] msg) throws JMSException;
 }
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/BusinessEvent.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/core/service/HashingService.java
similarity index 72%
copy from fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/BusinessEvent.java
copy to fineract-provider/src/main/java/org/apache/fineract/infrastructure/core/service/HashingService.java
index c2876066b..148ad7980 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/BusinessEvent.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/core/service/HashingService.java
@@ -16,13 +16,15 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.fineract.infrastructure.event.business.domain;
+package org.apache.fineract.infrastructure.core.service;
 
-public interface BusinessEvent<T> {
+import com.google.common.hash.Hashing;
+import org.springframework.stereotype.Component;
 
-    T get();
+@Component
+public class HashingService {
 
-    String getType();
-
-    String getCategory();
+    public int consistentHash(long input, int buckets) {
+        return Hashing.consistentHash(input, buckets);
+    }
 }
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/core/service/MeasuringUtil.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/core/service/MeasuringUtil.java
new file mode 100644
index 000000000..29f7f4c6a
--- /dev/null
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/core/service/MeasuringUtil.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.fineract.infrastructure.core.service;
+
+import java.time.Duration;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+import org.springframework.util.StopWatch;
+
+public final class MeasuringUtil {
+
+    private MeasuringUtil() {}
+
+    public static void measure(Runnable r, Consumer<Duration> c) {
+        measure(() -> {
+            r.run();
+            return null;
+        }, c);
+    }
+
+    public static <T> T measure(Supplier<T> s, Consumer<Duration> c) {
+        return measure(s, (result, timeTaken) -> c.accept(timeTaken));
+    }
+
+    public static <T> T measure(Supplier<T> s, BiConsumer<T, Duration> c) {
+        StopWatch sw = new StopWatch();
+        sw.start();
+        T result = s.get();
+        sw.stop();
+        c.accept(result, Duration.ofMillis(sw.getTotalTimeMillis()));
+        return result;
+    }
+}
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/BulkBusinessEvent.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/BulkBusinessEvent.java
index 32f842dc6..b1e6eae27 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/BulkBusinessEvent.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/BulkBusinessEvent.java
@@ -18,7 +18,11 @@
  */
 package org.apache.fineract.infrastructure.event.business.domain;
 
+import static java.util.stream.Collectors.toSet;
+
 import java.util.List;
+import java.util.Objects;
+import java.util.Set;
 
 public class BulkBusinessEvent extends AbstractBusinessEvent<List<BusinessEvent<?>>> {
 
@@ -27,6 +31,14 @@ public class BulkBusinessEvent extends AbstractBusinessEvent<List<BusinessEvent<
 
     public BulkBusinessEvent(List<BusinessEvent<?>> value) {
         super(value);
+        verifySameAggregate(value);
+    }
+
+    private void verifySameAggregate(List<BusinessEvent<?>> events) {
+        Set<Long> aggregateRootIds = events.stream().map(BusinessEvent::getAggregateRootId).filter(Objects::nonNull).collect(toSet());
+        if (aggregateRootIds.size() > 1) {
+            throw new IllegalArgumentException("The business events are related to multiple aggregate roots which is not allowed");
+        }
     }
 
     @Override
@@ -38,4 +50,9 @@ public class BulkBusinessEvent extends AbstractBusinessEvent<List<BusinessEvent<
     public String getCategory() {
         return CATEGORY;
     }
+
+    @Override
+    public Long getAggregateRootId() {
+        return get().iterator().next().getAggregateRootId();
+    }
 }
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/BusinessEvent.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/BusinessEvent.java
index c2876066b..65c2a9ebb 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/BusinessEvent.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/BusinessEvent.java
@@ -25,4 +25,6 @@ public interface BusinessEvent<T> {
     String getType();
 
     String getCategory();
+
+    Long getAggregateRootId();
 }
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/client/ClientBusinessEvent.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/client/ClientBusinessEvent.java
index 78acd5432..cbe497a3a 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/client/ClientBusinessEvent.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/client/ClientBusinessEvent.java
@@ -33,4 +33,9 @@ public abstract class ClientBusinessEvent extends AbstractBusinessEvent<Client>
     public String getCategory() {
         return CATEGORY;
     }
+
+    @Override
+    public Long getAggregateRootId() {
+        return get().getId();
+    }
 }
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/deposit/FixedDepositAccountBusinessEvent.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/deposit/FixedDepositAccountBusinessEvent.java
index ae38e0ccc..770ccb86f 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/deposit/FixedDepositAccountBusinessEvent.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/deposit/FixedDepositAccountBusinessEvent.java
@@ -33,4 +33,9 @@ public abstract class FixedDepositAccountBusinessEvent extends AbstractBusinessE
     public String getCategory() {
         return CATEGORY;
     }
+
+    @Override
+    public Long getAggregateRootId() {
+        return get().getId();
+    }
 }
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/deposit/RecurringDepositAccountBusinessEvent.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/deposit/RecurringDepositAccountBusinessEvent.java
index 1c2b60b53..d2c57ea59 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/deposit/RecurringDepositAccountBusinessEvent.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/deposit/RecurringDepositAccountBusinessEvent.java
@@ -33,4 +33,9 @@ public abstract class RecurringDepositAccountBusinessEvent extends AbstractBusin
     public String getCategory() {
         return CATEGORY;
     }
+
+    @Override
+    public Long getAggregateRootId() {
+        return get().getId();
+    }
 }
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/group/GroupsBusinessEvent.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/group/GroupsBusinessEvent.java
index 1c0f0030d..70172f48d 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/group/GroupsBusinessEvent.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/group/GroupsBusinessEvent.java
@@ -33,4 +33,9 @@ public abstract class GroupsBusinessEvent extends AbstractBusinessEvent<CommandP
     public String getCategory() {
         return CATEGORY;
     }
+
+    @Override
+    public Long getAggregateRootId() {
+        return get().getResourceId();
+    }
 }
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/loan/LoanAdjustTransactionBusinessEvent.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/loan/LoanAdjustTransactionBusinessEvent.java
index dc4b98907..c8629beb4 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/loan/LoanAdjustTransactionBusinessEvent.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/loan/LoanAdjustTransactionBusinessEvent.java
@@ -43,6 +43,11 @@ public class LoanAdjustTransactionBusinessEvent extends AbstractBusinessEvent<Lo
         return CATEGORY;
     }
 
+    @Override
+    public Long getAggregateRootId() {
+        return get().getTransactionToAdjust().getLoan().getId();
+    }
+
     @RequiredArgsConstructor
     @Getter
     public static class Data {
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/loan/LoanBusinessEvent.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/loan/LoanBusinessEvent.java
index 1ff3eafd5..62efeab55 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/loan/LoanBusinessEvent.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/loan/LoanBusinessEvent.java
@@ -33,4 +33,9 @@ public abstract class LoanBusinessEvent extends AbstractBusinessEvent<Loan> {
     public String getCategory() {
         return CATEGORY;
     }
+
+    @Override
+    public Long getAggregateRootId() {
+        return get().getId();
+    }
 }
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/loan/charge/LoanChargeBusinessEvent.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/loan/charge/LoanChargeBusinessEvent.java
index 71381e6a6..be42639be 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/loan/charge/LoanChargeBusinessEvent.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/loan/charge/LoanChargeBusinessEvent.java
@@ -33,4 +33,9 @@ public abstract class LoanChargeBusinessEvent extends AbstractBusinessEvent<Loan
     public String getCategory() {
         return CATEGORY;
     }
+
+    @Override
+    public Long getAggregateRootId() {
+        return get().getLoan().getId();
+    }
 }
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/loan/product/LoanProductBusinessEvent.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/loan/product/LoanProductBusinessEvent.java
index 352315b26..46f2d7071 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/loan/product/LoanProductBusinessEvent.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/loan/product/LoanProductBusinessEvent.java
@@ -33,4 +33,9 @@ public abstract class LoanProductBusinessEvent extends AbstractBusinessEvent<Loa
     public String getCategory() {
         return CATEGORY;
     }
+
+    @Override
+    public Long getAggregateRootId() {
+        return get().getId();
+    }
 }
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/loan/repayment/LoanRepaymentBusinessEvent.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/loan/repayment/LoanRepaymentBusinessEvent.java
index 4ae53fad8..a25909ff6 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/loan/repayment/LoanRepaymentBusinessEvent.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/loan/repayment/LoanRepaymentBusinessEvent.java
@@ -33,4 +33,9 @@ public abstract class LoanRepaymentBusinessEvent extends AbstractBusinessEvent<L
     public String getCategory() {
         return CATEGORY;
     }
+
+    @Override
+    public Long getAggregateRootId() {
+        return get().getLoan().getId();
+    }
 }
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/loan/transaction/LoanTransactionBusinessEvent.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/loan/transaction/LoanTransactionBusinessEvent.java
index 541f302c4..5a6b434b2 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/loan/transaction/LoanTransactionBusinessEvent.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/loan/transaction/LoanTransactionBusinessEvent.java
@@ -33,4 +33,9 @@ public abstract class LoanTransactionBusinessEvent extends AbstractBusinessEvent
     public String getCategory() {
         return CATEGORY;
     }
+
+    @Override
+    public Long getAggregateRootId() {
+        return get().getLoan().getId();
+    }
 }
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/savings/SavingsAccountBusinessEvent.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/savings/SavingsAccountBusinessEvent.java
index e09b45c98..6cc16da41 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/savings/SavingsAccountBusinessEvent.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/savings/SavingsAccountBusinessEvent.java
@@ -33,4 +33,9 @@ public abstract class SavingsAccountBusinessEvent extends AbstractBusinessEvent<
     public String getCategory() {
         return CATEGORY;
     }
+
+    @Override
+    public Long getAggregateRootId() {
+        return get().getId();
+    }
 }
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/savings/transaction/SavingsAccountTransactionBusinessEvent.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/savings/transaction/SavingsAccountTransactionBusinessEvent.java
index fe56ce09f..5b92a41d8 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/savings/transaction/SavingsAccountTransactionBusinessEvent.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/savings/transaction/SavingsAccountTransactionBusinessEvent.java
@@ -33,4 +33,9 @@ public abstract class SavingsAccountTransactionBusinessEvent extends AbstractBus
     public String getCategory() {
         return CATEGORY;
     }
+
+    @Override
+    public Long getAggregateRootId() {
+        return get().getSavingsAccount().getId();
+    }
 }
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/share/ShareAccountBusinessEvent.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/share/ShareAccountBusinessEvent.java
index b4f90febf..da524deb9 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/share/ShareAccountBusinessEvent.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/share/ShareAccountBusinessEvent.java
@@ -33,4 +33,9 @@ public abstract class ShareAccountBusinessEvent extends AbstractBusinessEvent<Sh
     public String getCategory() {
         return CATEGORY;
     }
+
+    @Override
+    public Long getAggregateRootId() {
+        return get().getId();
+    }
 }
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/share/ShareProductDividentsCreateBusinessEvent.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/share/ShareProductDividentsCreateBusinessEvent.java
index 0e25d7ece..f790d4dae 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/share/ShareProductDividentsCreateBusinessEvent.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/share/ShareProductDividentsCreateBusinessEvent.java
@@ -38,4 +38,9 @@ public class ShareProductDividentsCreateBusinessEvent extends AbstractBusinessEv
     public String getCategory() {
         return CATEGORY;
     }
+
+    @Override
+    public Long getAggregateRootId() {
+        return get();
+    }
 }
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/config/ExternalEventJMSBrokerConfiguration.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/config/ExternalEventJMSConfiguration.java
similarity index 84%
rename from fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/config/ExternalEventJMSBrokerConfiguration.java
rename to fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/config/ExternalEventJMSConfiguration.java
index 2f3c89f25..b38f90e58 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/config/ExternalEventJMSBrokerConfiguration.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/config/ExternalEventJMSConfiguration.java
@@ -28,10 +28,11 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Conditional;
 import org.springframework.context.annotation.Configuration;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 
 @Configuration
 @ConditionalOnProperty(value = "fineract.events.external.producer.jms.enabled", havingValue = "true")
-public class ExternalEventJMSBrokerConfiguration {
+public class ExternalEventJMSConfiguration {
 
     @Autowired
     private FineractProperties fineractProperties;
@@ -60,4 +61,13 @@ public class ExternalEventJMSBrokerConfiguration {
     public ActiveMQQueue activeMqQueue() {
         return new ActiveMQQueue(fineractProperties.getEvents().getExternal().getProducer().getJms().getEventQueueName());
     }
+
+    @Bean("externalEventJmsProducerExecutor")
+    public ThreadPoolTaskExecutor externalEventJmsProducerExecutor() {
+        ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
+        threadPoolTaskExecutor.setCorePoolSize(10);
+        threadPoolTaskExecutor.setMaxPoolSize(100);
+        threadPoolTaskExecutor.setThreadNamePrefix("externalEventJms");
+        return threadPoolTaskExecutor;
+    }
 }
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/config/ExternalEventJMSProducerConfiguration.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/config/ExternalEventJMSProducerConfiguration.java
deleted file mode 100644
index 8ed972584..000000000
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/config/ExternalEventJMSProducerConfiguration.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.fineract.infrastructure.event.external.config;
-
-import javax.jms.ConnectionFactory;
-import javax.jms.Destination;
-import org.apache.fineract.infrastructure.core.config.FineractProperties;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Qualifier;
-import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.context.annotation.Import;
-import org.springframework.integration.channel.DirectChannel;
-import org.springframework.integration.config.EnableIntegration;
-import org.springframework.integration.dsl.IntegrationFlow;
-import org.springframework.integration.dsl.IntegrationFlows;
-import org.springframework.integration.jms.dsl.Jms;
-
-@Configuration
-@EnableIntegration
-@ConditionalOnProperty(value = "fineract.events.external.producer.jms.enabled", havingValue = "true")
-@Import(value = { ExternalEventJMSBrokerConfiguration.class })
-public class ExternalEventJMSProducerConfiguration {
-
-    @Autowired
-    private DirectChannel outboundRequestsEvents;
-
-    @Autowired
-    private FineractProperties fineractProperties;
-
-    @Bean
-    public IntegrationFlow outboundFlowEvents(ConnectionFactory connectionFactory,
-            @Qualifier("eventDestination") Destination eventDestination) {
-        return IntegrationFlows.from(outboundRequestsEvents) //
-                .handle(Jms.outboundAdapter(connectionFactory).destination(eventDestination)).get();
-    }
-
-}
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/config/ExternalEventProducerConfiguration.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/config/ExternalEventProducerConfiguration.java
deleted file mode 100644
index b99275830..000000000
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/config/ExternalEventProducerConfiguration.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.fineract.infrastructure.event.external.config;
-
-import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.integration.channel.DirectChannel;
-import org.springframework.integration.config.EnableIntegration;
-
-@Configuration
-@EnableIntegration
-@ConditionalOnProperty(value = "fineract.events.external.enabled", havingValue = "true")
-public class ExternalEventProducerConfiguration {
-
-    @Bean
-    public DirectChannel outboundRequestsEvents() {
-        return new DirectChannel();
-    }
-}
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/jobs/SendAsynchronousEventsConfig.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/jobs/SendAsynchronousEventsConfig.java
index 30810773a..55cc4a46d 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/jobs/SendAsynchronousEventsConfig.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/jobs/SendAsynchronousEventsConfig.java
@@ -19,6 +19,7 @@
 package org.apache.fineract.infrastructure.event.external.jobs;
 
 import org.apache.fineract.infrastructure.jobs.service.JobName;
+import org.apache.fineract.infrastructure.jobs.service.StepName;
 import org.springframework.batch.core.Job;
 import org.springframework.batch.core.Step;
 import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
@@ -40,7 +41,7 @@ public class SendAsynchronousEventsConfig {
 
     @Bean
     protected Step sendAsynchronousEventsStep() {
-        return steps.get(JobName.SEND_ASYNCHRONOUS_EVENTS.name()).tasklet(tasklet).build();
+        return steps.get(StepName.SEND_ASYNCHRONOUS_EVENTS_STEP.name()).tasklet(tasklet).build();
     }
 
     @Bean
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/jobs/SendAsynchronousEventsTasklet.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/jobs/SendAsynchronousEventsTasklet.java
index 289f20591..35733e664 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/jobs/SendAsynchronousEventsTasklet.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/jobs/SendAsynchronousEventsTasklet.java
@@ -18,8 +18,17 @@
  */
 package org.apache.fineract.infrastructure.event.external.jobs;
 
+import static java.util.stream.Collectors.groupingBy;
+import static java.util.stream.Collectors.toMap;
+import static org.apache.fineract.infrastructure.core.service.MeasuringUtil.measure;
+
+import com.google.common.collect.Lists;
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.time.OffsetDateTime;
+import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.fineract.avro.MessageV1;
@@ -28,8 +37,8 @@ import org.apache.fineract.infrastructure.core.config.FineractProperties;
 import org.apache.fineract.infrastructure.core.service.DateUtils;
 import org.apache.fineract.infrastructure.event.external.producer.ExternalEventProducer;
 import org.apache.fineract.infrastructure.event.external.repository.ExternalEventRepository;
-import org.apache.fineract.infrastructure.event.external.repository.domain.ExternalEvent;
 import org.apache.fineract.infrastructure.event.external.repository.domain.ExternalEventStatus;
+import org.apache.fineract.infrastructure.event.external.repository.domain.ExternalEventView;
 import org.apache.fineract.infrastructure.event.external.service.message.MessageFactory;
 import org.apache.fineract.infrastructure.event.external.service.support.ByteBufferConverter;
 import org.springframework.batch.core.StepContribution;
@@ -56,8 +65,9 @@ public class SendAsynchronousEventsTasklet implements Tasklet {
     public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) {
         try {
             if (isDownstreamChannelEnabled()) {
-                List<ExternalEvent> events = getQueuedEventsBatch();
-                processEvents(events);
+                List<ExternalEventView> events = getQueuedEventsBatch();
+                log.debug("Queued events size: {}", events.size());
+                sendEvents(events);
             }
         } catch (Exception e) {
             log.error("Error occurred while processing events: ", e);
@@ -69,20 +79,78 @@ public class SendAsynchronousEventsTasklet implements Tasklet {
         return fineractProperties.getEvents().getExternal().getProducer().getJms().isEnabled();
     }
 
-    private List<ExternalEvent> getQueuedEventsBatch() {
+    /**
+     * Reads the next batch of events in {@link ExternalEventStatus#TO_BE_SENT} status ordered by id and logs how long
+     * the lookup took.
+     */
+    private List<ExternalEventView> getQueuedEventsBatch() {
         int readBatchSize = getBatchSize();
         Pageable batchSize = PageRequest.ofSize(readBatchSize);
-        return repository.findByStatusOrderById(ExternalEventStatus.TO_BE_SENT, batchSize);
+        return measure(() -> repository.findByStatusOrderById(ExternalEventStatus.TO_BE_SENT, batchSize),
+                (loadedEvents, elapsed) -> log.debug("Loaded {} events in {}ms", loadedEvents.size(), elapsed.toMillis()));
+    }
+
+    /**
+     * Serializes the queued events into partitions, hands them to the producer and afterwards flags them as sent.
+     */
+    private void sendEvents(List<ExternalEventView> queuedEvents) {
+        Map<Long, List<byte[]>> messagesByPartition = generatePartitions(queuedEvents);
+        List<Long> queuedEventIds = queuedEvents.stream().map(ExternalEventView::getId).toList();
+        sendEventsToProducer(messagesByPartition);
+        markEventsAsSent(queuedEventIds);
     }
 
-    private void processEvents(List<ExternalEvent> queuedEvents) throws IOException {
-        for (ExternalEvent event : queuedEvents) {
-            MessageV1 message = messageFactory.createMessage(event);
-            byte[] byteMessage = byteBufferConverter.convert(message.toByteBuffer());
-            eventProducer.sendEvent(byteMessage);
-            event.setStatus(ExternalEventStatus.SENT);
-            event.setSentAt(DateUtils.getOffsetDateTimeOfTenant());
-            repository.save(event);
+    private void sendEventsToProducer(Map<Long, List<byte[]>> partitions) {
+        eventProducer.sendEvents(partitions);
+    }
+
+    /**
+     * Marks the given events as sent with one common timestamp, issuing one repository update per chunk of 5,000 ids.
+     */
+    private void markEventsAsSent(List<Long> eventIds) {
+        OffsetDateTime sentAt = DateUtils.getOffsetDateTimeOfTenant();
+
+        // Partitioning dataset to avoid exception: PreparedStatement can have at most 65,535 parameters
+        for (List<Long> idChunk : Lists.partition(eventIds, 5_000)) {
+            measure(() -> {
+                repository.markEventsSent(idChunk, sentAt);
+            }, elapsed -> {
+                log.debug("Took {}ms to update {} events", elapsed.toMillis(), idChunk.size());
+            });
+        }
+    }
+
+    /**
+     * Groups the events by aggregate root id and serializes every group into its list of messages. Events without
+     * an aggregate root id share the partition key -1.
+     */
+    private Map<Long, List<byte[]>> generatePartitions(List<ExternalEventView> queuedEvents) {
+        Map<Long, List<ExternalEventView>> groupedEvents = queuedEvents.stream().collect(groupingBy(this::partitionKeyOf));
+        return measure(
+                () -> groupedEvents.entrySet().stream().collect(toMap(Map.Entry::getKey, entry -> createMessages(entry.getValue()))),
+                elapsed -> {
+                    log.debug("Took {}ms to create message partitions", elapsed.toMillis());
+                });
+    }
+
+    private Long partitionKeyOf(ExternalEventView event) {
+        Long aggregateRootId = event.getAggregateRootId();
+        return aggregateRootId == null ? Long.valueOf(-1L) : aggregateRootId;
+    }
+
+    /**
+     * Builds a {@link MessageV1} for each event and converts its byte buffer into a byte array, preserving list order.
+     */
+    private List<byte[]> createMessages(List<ExternalEventView> events) {
+        List<byte[]> serializedMessages = new ArrayList<>();
+        try {
+            for (ExternalEventView event : events) {
+                ByteBuffer buffer = messageFactory.createMessage(event).toByteBuffer();
+                serializedMessages.add(byteBufferConverter.convert(buffer));
+            }
+        } catch (IOException e) {
+            throw new RuntimeException("Error while serializing the message", e);
         }
+        return serializedMessages;
     }
 
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/producer/ExternalEventProducer.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/producer/ExternalEventProducer.java
index daf5b0151..14dd60ce5 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/producer/ExternalEventProducer.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/producer/ExternalEventProducer.java
@@ -18,15 +18,23 @@
  */
 package org.apache.fineract.infrastructure.event.external.producer;
 
+import java.util.List;
+import java.util.Map;
 import org.apache.fineract.infrastructure.event.external.exception.AcknowledgementTimeoutException;
-import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
-import org.springframework.integration.annotation.Gateway;
-import org.springframework.integration.annotation.MessagingGateway;
 
-@MessagingGateway(name = "externalEventGateway")
-@ConditionalOnProperty(value = "fineract.events.external.enabled", havingValue = "false")
+/**
+ * Publishes serialized external event messages; the concrete transport is up to the implementation.
+ */
 public interface ExternalEventProducer {
 
-    @Gateway(requestChannel = "outboundRequestsEvents", replyTimeout = 2, requestTimeout = 200)
-    void sendEvent(byte[] message) throws AcknowledgementTimeoutException;
+    /**
+     * Sends the given messages.
+     *
+     * @param partitions
+     *            serialized messages grouped under a {@code Long} partition key
+     * @throws AcknowledgementTimeoutException
+     *             presumably when the delivery is not acknowledged in time (inferred from the exception name; confirm
+     *             against the implementations)
+     */
+    void sendEvents(Map<Long, List<byte[]>> partitions) throws AcknowledgementTimeoutException;
 }
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/producer/DummyExternalEventProducerImpl.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/producer/NoopExternalEventProducer.java
similarity index 71%
rename from fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/producer/DummyExternalEventProducerImpl.java
rename to fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/producer/NoopExternalEventProducer.java
index 4f60f0a31..8e6227eb1 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/producer/DummyExternalEventProducerImpl.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/producer/NoopExternalEventProducer.java
@@ -18,16 +18,25 @@
  */
 package org.apache.fineract.infrastructure.event.external.producer;
 
+import java.util.List;
+import java.util.Map;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.fineract.infrastructure.event.external.exception.AcknowledgementTimeoutException;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
-import org.springframework.stereotype.Service;
+import org.springframework.stereotype.Component;
 
-@Service
-@ConditionalOnProperty(value = "fineract.events.external.enabled", havingValue = "false")
-public class DummyExternalEventProducerImpl implements ExternalEventProducer {
+/**
+ * {@link ExternalEventProducer} that discards every message. The condition below matches only when
+ * {@code fineract.events.external.producer.jms.enabled} is present and set to {@code false}.
+ */
+@Component
+@ConditionalOnProperty(value = "fineract.events.external.producer.jms.enabled", havingValue = "false")
+@Slf4j
+public class NoopExternalEventProducer implements ExternalEventProducer {
 
+    /**
+     * Does nothing; the given messages are dropped.
+     */
     @Override
-    public void sendEvent(byte[] message) throws AcknowledgementTimeoutException {
-        return;
-    }
+    public void sendEvents(Map<Long, List<byte[]>> messages) throws AcknowledgementTimeoutException {}
 }
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/producer/jms/JMSMultiExternalEventProducer.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/producer/jms/JMSMultiExternalEventProducer.java
new file mode 100644
index 000000000..e063e44f4
--- /dev/null
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/producer/jms/JMSMultiExternalEventProducer.java
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.fineract.infrastructure.event.external.producer.jms;
+
+import static org.apache.fineract.infrastructure.core.service.MeasuringUtil.measure;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Future;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.fineract.infrastructure.core.config.FineractProperties;
+import org.apache.fineract.infrastructure.core.messaging.jms.MessageFactory;
+import org.apache.fineract.infrastructure.core.service.HashingService;
+import org.apache.fineract.infrastructure.event.external.exception.AcknowledgementTimeoutException;
+import org.apache.fineract.infrastructure.event.external.producer.ExternalEventProducer;
+import org.springframework.beans.factory.DisposableBean;
+import org.springframework.beans.factory.InitializingBean;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.core.task.AsyncTaskExecutor;
+import org.springframework.stereotype.Component;
+
+/**
+ * {@link ExternalEventProducer} that publishes external events to the configured JMS destination through a
+ * configurable number of parallel {@link MessageProducer}s.
+ * <p>
+ * Each partition key is assigned to a producer index via {@link HashingService}, so the messages of one partition are
+ * sent sequentially by a single producer while the different producers send concurrently on the task executor.
+ * <p>
+ * NOTE(review): the {@code @Qualifier} annotations on the fields only reach the Lombok-generated constructor if
+ * {@code @Qualifier} is listed in {@code lombok.copyableAnnotations} - confirm in lombok.config.
+ */
+@Component
+@Slf4j
+@RequiredArgsConstructor
+@ConditionalOnProperty(value = "fineract.events.external.producer.jms.enabled", havingValue = "true")
+public class JMSMultiExternalEventProducer implements ExternalEventProducer, InitializingBean, DisposableBean {
+
+    @Qualifier("eventDestination")
+    private final Destination destination;
+
+    private final ConnectionFactory connectionFactory;
+
+    private final MessageFactory messageFactory;
+
+    @Qualifier("externalEventJmsProducerExecutor")
+    private final AsyncTaskExecutor taskExecutor;
+
+    private final HashingService hashingService;
+
+    private final FineractProperties fineractProperties;
+
+    private final List<MessageProducer> producers = new ArrayList<>();
+
+    // Kept open for the lifetime of the bean: closing a JMS connection also closes the sessions and producers created
+    // from it, so it must not be closed before the producers are done sending.
+    private Connection connection;
+
+    /**
+     * Opens one JMS connection and creates the configured number of producers on it, each on its own session.
+     *
+     * @throws Exception
+     *             if the connection, a session or a producer cannot be created; the connection is closed again in
+     *             that case
+     */
+    @Override
+    public void afterPropertiesSet() throws Exception {
+        int producerCount = getProducerCount();
+        Connection newConnection = connectionFactory.createConnection();
+        try {
+            for (int i = 0; i < producerCount; i++) {
+                // It's crucial to create the session within the loop, otherwise the producers won't be handled as
+                // parallel producers
+                Session session = newConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                producers.add(session.createProducer(destination));
+            }
+        } catch (JMSException | RuntimeException e) {
+            // Don't leak the connection (and the sessions created so far) when the initialization fails half-way
+            try {
+                newConnection.close();
+            } catch (JMSException closeException) {
+                e.addSuppressed(closeException);
+            }
+            throw e;
+        }
+        connection = newConnection;
+        log.info("Initialized JMS multi producer for external events with {} parallel producers", producerCount);
+    }
+
+    /**
+     * Closes the JMS connection, and with it the sessions and producers, when the bean is destroyed.
+     */
+    @Override
+    public void destroy() throws Exception {
+        if (connection != null) {
+            connection.close();
+            connection = null;
+        }
+    }
+
+    private int getProducerCount() {
+        return fineractProperties.getEvents().getExternal().getProducer().getJms().getProducerCount();
+    }
+
+    /**
+     * Sends all messages of the given partitions and blocks until every sending task has finished.
+     *
+     * @param partitions
+     *            serialized messages grouped by partition key; the key decides which producer sends the group
+     * @throws RuntimeException
+     *             from the completion wait, if a sending task failed or the wait was interrupted
+     */
+    @Override
+    public void sendEvents(Map<Long, List<byte[]>> partitions) throws AcknowledgementTimeoutException {
+        Map<Integer, List<byte[]>> indexedPartitions = mapPartitionsToProducers(partitions);
+        measure(() -> {
+            List<Future<?>> tasks = sendPartitions(indexedPartitions);
+            waitForSendingCompletion(tasks);
+        }, timeTaken -> {
+            if (log.isDebugEnabled()) {
+                // in case execution is faster than 1sec
+                long seconds = Math.max(timeTaken.toSeconds(), 1L);
+                int eventCount = partitions.values().stream().mapToInt(Collection::size).sum();
+                log.debug("Sent messages with {} msg/s", (eventCount / seconds));
+            }
+        });
+    }
+
+    /** Submits one sending task per producer index and returns the futures of those tasks. */
+    private List<Future<?>> sendPartitions(Map<Integer, List<byte[]>> indexedPartitions) {
+        List<Future<?>> tasks = new ArrayList<>();
+        for (Map.Entry<Integer, List<byte[]>> entry : indexedPartitions.entrySet()) {
+            tasks.add(createSendingTask(entry.getKey(), entry.getValue()));
+        }
+        return tasks;
+    }
+
+    /** Schedules a task that sends the messages one by one, in list order, with the producer at the given index. */
+    private Future<?> createSendingTask(Integer producerIndex, List<byte[]> messages) {
+        return taskExecutor.submit(() -> {
+            MessageProducer messageProducer = producers.get(producerIndex);
+
+            for (byte[] message : messages) {
+                try {
+                    messageProducer.send(destination, messageFactory.createByteMessage(message));
+                } catch (JMSException e) {
+                    throw new RuntimeException("Error while sending the message", e);
+                }
+            }
+        });
+    }
+
+    /** Merges the partitions that hash to the same producer index into one message list per producer. */
+    private Map<Integer, List<byte[]>> mapPartitionsToProducers(Map<Long, List<byte[]>> partitions) {
+        int producerCount = getProducerCount();
+        Map<Integer, List<byte[]>> indexedPartitions = new HashMap<>();
+        for (Map.Entry<Long, List<byte[]>> partition : partitions.entrySet()) {
+            int producerIndex = hashingService.consistentHash(partition.getKey(), producerCount);
+            indexedPartitions.computeIfAbsent(producerIndex, index -> new ArrayList<>()).addAll(partition.getValue());
+        }
+        return indexedPartitions;
+    }
+
+    /** Blocks until all tasks are done; the first failure is rethrown wrapped in a {@link RuntimeException}. */
+    private void waitForSendingCompletion(List<Future<?>> tasks) {
+        try {
+            for (Future<?> task : tasks) {
+                task.get();
+            }
+        } catch (InterruptedException e) {
+            // Restore the interrupt flag so the calling thread can still observe the interruption
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(e);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/repository/ExternalEventRepository.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/repository/ExternalEventRepository.java
index 4a04bfa84..648e22565 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/repository/ExternalEventRepository.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/repository/ExternalEventRepository.java
@@ -19,9 +19,11 @@
 package org.apache.fineract.infrastructure.event.external.repository;
 
 import java.time.LocalDate;
+import java.time.OffsetDateTime;
 import java.util.List;
 import org.apache.fineract.infrastructure.event.external.repository.domain.ExternalEvent;
 import org.apache.fineract.infrastructure.event.external.repository.domain.ExternalEventStatus;
+import org.apache.fineract.infrastructure.event.external.repository.domain.ExternalEventView;
 import org.springframework.data.domain.Pageable;
 import org.springframework.data.jpa.repository.JpaRepository;
 import org.springframework.data.jpa.repository.Modifying;
@@ -29,9 +31,22 @@ import org.springframework.data.jpa.repository.Query;
 
 public interface ExternalEventRepository extends JpaRepository<ExternalEvent, Long> {
 
-    List<ExternalEvent> findByStatusOrderById(ExternalEventStatus status, Pageable batchSize);
+    List<ExternalEventView> findByStatusOrderById(ExternalEventStatus status, Pageable batchSize);
 
     @Modifying(flushAutomatically = true)
     @Query("delete from ExternalEvent e where e.status = :status and e.businessDate <= :dateForPurgeCriteria")
     void deleteOlderEventsWithSentStatus(ExternalEventStatus status, LocalDate dateForPurgeCriteria);
+
+    /**
+     * Bulk-updates the events with the given ids: sets their status to {@link ExternalEventStatus#SENT} and their
+     * {@code sentAt} to the given timestamp.
+     *
+     * @param ids
+     *            ids of the events to update
+     * @param sentAt
+     *            timestamp stored as the sending time of every updated event
+     */
+    @Modifying
+    @Query("UPDATE ExternalEvent e SET e.status = org.apache.fineract.infrastructure.event.external.repository.domain.ExternalEventStatus.SENT, e.sentAt = :sentAt WHERE e.id IN :ids")
+    void markEventsSent(List<Long> ids, OffsetDateTime sentAt);
 }
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/repository/domain/ExternalEvent.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/repository/domain/ExternalEvent.java
index e237a38e7..2ab426a55 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/repository/domain/ExternalEvent.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/repository/domain/ExternalEvent.java
@@ -70,7 +70,10 @@ public class ExternalEvent extends AbstractPersistableCustom {
     @Column(name = "business_date", nullable = false)
     private LocalDate businessDate;
 
-    public ExternalEvent(String type, String category, String schema, byte[] data, String idempotencyKey) {
+    @Column(name = "aggregate_root_id", nullable = true)
+    private Long aggregateRootId;
+
+    public ExternalEvent(String type, String category, String schema, byte[] data, String idempotencyKey, Long aggregateRootId) {
         this.type = type;
         this.category = category;
         this.schema = schema;
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/BusinessEvent.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/repository/domain/ExternalEventView.java
similarity index 65%
copy from fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/BusinessEvent.java
copy to fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/repository/domain/ExternalEventView.java
index c2876066b..4c7b43caf 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/BusinessEvent.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/repository/domain/ExternalEventView.java
@@ -16,13 +16,37 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.fineract.infrastructure.event.business.domain;
+package org.apache.fineract.infrastructure.event.external.repository.domain;
 
-public interface BusinessEvent<T> {
+import java.time.LocalDate;
+import java.time.OffsetDateTime;
 
-    T get();
+/**
+ * Getter-only view over the fields of an {@link ExternalEvent}; it is the element type returned by
+ * {@code ExternalEventRepository#findByStatusOrderById}.
+ */
+public interface ExternalEventView {
+
+    Long getId();
 
     String getType();
 
     String getCategory();
+
+    String getSchema();
+
+    byte[] getData();
+
+    OffsetDateTime getCreatedAt();
+
+    ExternalEventStatus getStatus();
+
+    OffsetDateTime getSentAt();
+
+    String getIdempotencyKey();
+
+    LocalDate getBusinessDate();
+
+    /** May be {@code null}: the backing {@code aggregate_root_id} column is nullable. */
+    Long getAggregateRootId();
 }
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/ExternalEventService.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/ExternalEventService.java
index b158eee6b..c3c3690b4 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/ExternalEventService.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/ExternalEventService.java
@@ -85,7 +85,7 @@ public class ExternalEventService {
         byte[] data = byteBufferConverter.convert(avroDto.toByteBuffer());
 
         return new ExternalEvent(bulkBusinessEvent.getType(), bulkBusinessEvent.getCategory(), BulkMessagePayloadV1.class.getName(), data,
-                idempotencyKey);
+                idempotencyKey, bulkBusinessEvent.getAggregateRootId());
     }
 
     private <T> ExternalEvent handleRegularBusinessEvent(BusinessEvent<T> event) throws IOException {
@@ -95,8 +95,9 @@ public class ExternalEventService {
         BusinessEventSerializer serializer = serializerFactory.create(event);
         String schema = serializer.getSupportedSchema().getName();
         byte[] data = serializer.serialize(event);
+        Long aggregateRootId = event.getAggregateRootId();
 
-        return new ExternalEvent(eventType, eventCategory, schema, data, idempotencyKey);
+        return new ExternalEvent(eventType, eventCategory, schema, data, idempotencyKey, aggregateRootId);
     }
 
     private void flushChangesBeforeSerialization() {
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/message/MessageFactory.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/message/MessageFactory.java
index 2218cd458..389a1ab63 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/message/MessageFactory.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/message/MessageFactory.java
@@ -27,7 +27,7 @@ import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.fineract.avro.MessageV1;
 import org.apache.fineract.infrastructure.core.service.ThreadLocalContextUtil;
-import org.apache.fineract.infrastructure.event.external.repository.domain.ExternalEvent;
+import org.apache.fineract.infrastructure.event.external.repository.domain.ExternalEventView;
 import org.apache.fineract.infrastructure.event.external.service.message.domain.MessageBusinessDate;
 import org.apache.fineract.infrastructure.event.external.service.message.domain.MessageCategory;
 import org.apache.fineract.infrastructure.event.external.service.message.domain.MessageCreatedAt;
@@ -66,7 +66,7 @@ public class MessageFactory implements InitializingBean {
         return result;
     }
 
-    public MessageV1 createMessage(ExternalEvent event) {
+    public MessageV1 createMessage(ExternalEventView event) {
         MessageId id = new MessageId(event.getId().intValue());
         MessageSource source = new MessageSource(SOURCE_UUID);
         MessageType type = new MessageType(event.getType());
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/service/StepName.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/service/StepName.java
index 2dbac082c..bdf0abc16 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/service/StepName.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/service/StepName.java
@@ -19,5 +19,5 @@
 package org.apache.fineract.infrastructure.jobs.service;
 
 public enum StepName {
-    PURGE_PROCESSED_COMMANDS_STEP;
+    PURGE_PROCESSED_COMMANDS_STEP, SEND_ASYNCHRONOUS_EVENTS_STEP
 }
diff --git a/fineract-provider/src/main/resources/application.properties b/fineract-provider/src/main/resources/application.properties
index 385400e3c..a322b44b4 100644
--- a/fineract-provider/src/main/resources/application.properties
+++ b/fineract-provider/src/main/resources/application.properties
@@ -62,6 +62,7 @@ fineract.events.external.producer.jms.event-topic-name=${FINERACT_EXTERNAL_EVENT
 fineract.events.external.producer.jms.broker-url=${FINERACT_EXTERNAL_EVENTS_PRODUCER_JMS_BROKER_URL:tcp://127.0.0.1:61616}
 fineract.events.external.producer.jms.broker-username=${FINERACT_EXTERNAL_EVENTS_PRODUCER_JMS_BROKER_USERNAME:}
 fineract.events.external.producer.jms.broker-password=${FINERACT_EXTERNAL_EVENTS_PRODUCER_JMS_BROKER_PASSWORD:}
+fineract.events.external.producer.jms.producer-count=${FINERACT_EXTERNAL_EVENTS_PRODUCER_JMS_PRODUCER_COUNT:1}
 
 fineract.idempotency-key-header-name=${FINERACT_IDEMPOTENCY_KEY_HEADER_NAME:Idempotency-Key}
 
diff --git a/fineract-provider/src/main/resources/db/changelog/tenant/changelog-tenant.xml b/fineract-provider/src/main/resources/db/changelog/tenant/changelog-tenant.xml
index 932f1fec5..d81bba3f1 100644
--- a/fineract-provider/src/main/resources/db/changelog/tenant/changelog-tenant.xml
+++ b/fineract-provider/src/main/resources/db/changelog/tenant/changelog-tenant.xml
@@ -104,4 +104,5 @@
     <include file="parts/0082_add_external_event_default_configuration.xml" relativeToChangelogFile="true" />
     <include file="parts/0083_add_loan_transaction_enum_values.xml" relativeToChangelogFile="true" />
     <include file="parts/0084_add_general_accounting_table_reports.xml" relativeToChangelogFile="true" />
+    <include file="parts/0085_add_aggregate_root_id_external_events.xml" relativeToChangelogFile="true" />
 </databaseChangeLog>
diff --git a/fineract-provider/src/main/resources/db/changelog/tenant/parts/0085_add_aggregate_root_id_external_events.xml b/fineract-provider/src/main/resources/db/changelog/tenant/parts/0085_add_aggregate_root_id_external_events.xml
new file mode 100644
index 000000000..906fc50b3
--- /dev/null
+++ b/fineract-provider/src/main/resources/db/changelog/tenant/parts/0085_add_aggregate_root_id_external_events.xml
@@ -0,0 +1,32 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements. See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership. The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License. You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied. See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<databaseChangeLog xmlns="http://www.liquibase.org/xml/ns/dbchangelog"
+                   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+                   xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-4.3.xsd">
+    <changeSet author="fineract" id="1">
+        <addColumn tableName="m_external_event">
+            <column name="aggregate_root_id" type="BIGINT">
+                <constraints nullable="true"/>
+            </column>
+        </addColumn>
+    </changeSet>
+</databaseChangeLog>
diff --git a/fineract-provider/src/test/java/org/apache/fineract/infrastructure/core/service/HashingServiceTest.java b/fineract-provider/src/test/java/org/apache/fineract/infrastructure/core/service/HashingServiceTest.java
new file mode 100644
index 000000000..305e32fc6
--- /dev/null
+++ b/fineract-provider/src/test/java/org/apache/fineract/infrastructure/core/service/HashingServiceTest.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.fineract.infrastructure.core.service;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.Random;
+import org.junit.jupiter.api.Test;
+
+class HashingServiceTest {
+
+    private final Random rnd = new Random();
+
+    private final HashingService underTest = new HashingService();
+
+    @Test
+    public void testConsistentHashGeneratesHashesConsistentlyForTheSameValue() {
+        // given the bucket computed once for value 1 with 20 buckets
+        int initialResult = underTest.consistentHash(1L, 20);
+        // when & then every repeated call with the same arguments yields that same bucket
+        for (int i = 0; i < 20_000; i++) {
+            int result = underTest.consistentHash(1L, 20);
+            assertThat(result).isEqualTo(initialResult);
+        }
+    }
+
+    @Test
+    public void testConsistentHashWorksForNegativeValues() {
+        // given a negative input value
+        // when
+        int result = underTest.consistentHash(-1L, 20);
+        // then the bucket is still a valid index in [0, 19]
+        assertThat(result).isBetween(0, 19);
+    }
+
+    @Test
+    public void testConsistentHashGeneratesHashesWithinBuckets() {
+        // given
+        int buckets = 10;
+        // when & then random longs (negative ones included) must map to an index in [0, buckets - 1]
+        for (int i = 0; i < 20_000; i++) {
+            int result = underTest.consistentHash(rnd.nextLong(), buckets);
+            assertThat(result).isBetween(0, buckets - 1);
+        }
+    }
+}
diff --git a/fineract-provider/src/test/java/org/apache/fineract/infrastructure/event/business/domain/BulkBusinessEventTest.java b/fineract-provider/src/test/java/org/apache/fineract/infrastructure/event/business/domain/BulkBusinessEventTest.java
new file mode 100644
index 000000000..e2eb0c389
--- /dev/null
+++ b/fineract-provider/src/test/java/org/apache/fineract/infrastructure/event/business/domain/BulkBusinessEventTest.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.fineract.infrastructure.event.business.domain;
+
+import static org.mockito.BDDMockito.given;
+import static org.mockito.Mockito.mock;
+
+import java.util.List;
+import org.apache.fineract.infrastructure.event.business.domain.client.ClientActivateBusinessEvent;
+import org.apache.fineract.infrastructure.event.business.domain.client.ClientCreateBusinessEvent;
+import org.apache.fineract.portfolio.client.domain.Client;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+class BulkBusinessEventTest {
+
+    @Test
+    public void testConstructorWorksForSameAggregateId() {
+        // given two client events whose mocked clients both report id 1
+        // when the bulk event is constructed from them
+        new BulkBusinessEvent(List.of(new ClientCreateBusinessEvent(client(1L)), new ClientActivateBusinessEvent(client(1L))));
+        // then no exception thrown
+    }
+
+    @Test
+    public void testConstructorThrowsExceptionForDifferentAggregateId() {
+        // given two client events whose mocked clients report different ids (1 and 2)
+        // when & then constructing the bulk event must throw IllegalArgumentException
+        Assertions.assertThrows(IllegalArgumentException.class, () -> new BulkBusinessEvent(
+                List.of(new ClientCreateBusinessEvent(client(1L)), new ClientActivateBusinessEvent(client(2L)))));
+        // (the expected exception is asserted by assertThrows above)
+    }
+
+    @Test
+    public void testConstructorWorksForNullAggregateId() {
+        // given one mocked client reporting id 1 and one reporting a null id
+        // when the bulk event is constructed from them
+        new BulkBusinessEvent(List.of(new ClientCreateBusinessEvent(client(1L)), new ClientActivateBusinessEvent(client(null))));
+        // then no exception thrown
+    }
+
+    private Client client(Long id) {
+        Client client = mock(Client.class);
+        given(client.getId()).willReturn(id);
+        return client;
+    }
+
+}
diff --git a/fineract-provider/src/test/java/org/apache/fineract/infrastructure/event/business/service/BusinessEventNotifierServiceImplTest.java b/fineract-provider/src/test/java/org/apache/fineract/infrastructure/event/business/service/BusinessEventNotifierServiceImplTest.java
index 81da91e2f..56984cc84 100644
--- a/fineract-provider/src/test/java/org/apache/fineract/infrastructure/event/business/service/BusinessEventNotifierServiceImplTest.java
+++ b/fineract-provider/src/test/java/org/apache/fineract/infrastructure/event/business/service/BusinessEventNotifierServiceImplTest.java
@@ -226,6 +226,11 @@ class BusinessEventNotifierServiceImplTest {
         public String getCategory() {
             return null;
         }
+
+        @Override
+        public Long getAggregateRootId() {
+            return null; // this test-only event reports no aggregate root id
+        }
     }
 
 }
diff --git a/fineract-provider/src/test/java/org/apache/fineract/infrastructure/event/external/jobs/SendAsynchronousEventsTaskletTest.java b/fineract-provider/src/test/java/org/apache/fineract/infrastructure/event/external/jobs/SendAsynchronousEventsTaskletTest.java
index 0fdfca0fd..f505856d9 100644
--- a/fineract-provider/src/test/java/org/apache/fineract/infrastructure/event/external/jobs/SendAsynchronousEventsTaskletTest.java
+++ b/fineract-provider/src/test/java/org/apache/fineract/infrastructure/event/external/jobs/SendAsynchronousEventsTaskletTest.java
@@ -20,7 +20,6 @@ package org.apache.fineract.infrastructure.event.external.jobs;
 
 import static com.google.common.truth.Truth.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -34,6 +33,7 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 import org.apache.fineract.avro.MessageV1;
 import org.apache.fineract.infrastructure.businessdate.domain.BusinessDateType;
 import org.apache.fineract.infrastructure.configuration.domain.ConfigurationDomainService;
@@ -43,8 +43,7 @@ import org.apache.fineract.infrastructure.core.service.ThreadLocalContextUtil;
 import org.apache.fineract.infrastructure.event.external.exception.AcknowledgementTimeoutException;
 import org.apache.fineract.infrastructure.event.external.producer.ExternalEventProducer;
 import org.apache.fineract.infrastructure.event.external.repository.ExternalEventRepository;
-import org.apache.fineract.infrastructure.event.external.repository.domain.ExternalEvent;
-import org.apache.fineract.infrastructure.event.external.repository.domain.ExternalEventStatus;
+import org.apache.fineract.infrastructure.event.external.repository.domain.ExternalEventView;
 import org.apache.fineract.infrastructure.event.external.service.message.MessageFactory;
 import org.apache.fineract.infrastructure.event.external.service.support.ByteBufferConverter;
 import org.junit.jupiter.api.BeforeEach;
@@ -54,12 +53,15 @@ import org.mockito.ArgumentCaptor;
 import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.junit.jupiter.MockitoSettings;
+import org.mockito.quality.Strictness;
 import org.springframework.batch.core.StepContribution;
 import org.springframework.batch.core.scope.context.ChunkContext;
 import org.springframework.batch.repeat.RepeatStatus;
 import org.springframework.data.domain.Pageable;
 
 @ExtendWith(MockitoExtension.class)
+@MockitoSettings(strictness = Strictness.LENIENT)
 class SendAsynchronousEventsTaskletTest {
 
     @Mock
@@ -81,6 +83,8 @@ class SendAsynchronousEventsTaskletTest {
     private SendAsynchronousEventsTasklet underTest;
     private RepeatStatus resultStatus;
 
+    private Random rnd = new Random();
+
     @BeforeEach
     public void setUp() {
         ThreadLocalContextUtil.setTenant(new FineractPlatformTenant(1L, "default", "Default", "Asia/Kolkata", null));
@@ -108,8 +112,9 @@ class SendAsynchronousEventsTaskletTest {
     @Test
     public void givenBatchSize2WhenTaskExecutionThenSend2Events() throws Exception {
         // given
-        List<ExternalEvent> events = Arrays.asList(new ExternalEvent("aType", "aCategory", "aSchema", new byte[0], "aIdempotencyKey"),
-                new ExternalEvent("aType", "aCategory", "aSchema", new byte[0], "aIdempotencyKey"));
+        List<ExternalEventView> events = Arrays.asList(
+                createExternalEventView("aType", "aCategory", "aSchema", new byte[0], "aIdempotencyKey", 1L),
+                createExternalEventView("aType", "aCategory", "aSchema", new byte[0], "aIdempotencyKey", 1L));
         // Dummy Message
         MessageV1 dummyMessage = new MessageV1(1, "aSource", "aType", "nocategory", "aCreateDate", "aBusinessDate", "aTenantId",
                 "anidempotencyKey", "aSchema", Mockito.mock(ByteBuffer.class));
@@ -117,64 +122,95 @@ class SendAsynchronousEventsTaskletTest {
         when(repository.findByStatusOrderById(Mockito.any(), Mockito.any())).thenReturn(events);
         when(messageFactory.createMessage(Mockito.any())).thenReturn(dummyMessage);
         when(byteBufferConverter.convert(Mockito.any(ByteBuffer.class))).thenReturn(new byte[0]);
-        doNothing().when(eventProducer).sendEvent(Mockito.any());
         // when
-        resultStatus = this.underTest.execute(stepContribution, chunkContext);
+        resultStatus = underTest.execute(stepContribution, chunkContext);
         // then
-        verify(eventProducer, times(2)).sendEvent(new byte[0]);
-        verify(repository, times(2)).save(Mockito.any(ExternalEvent.class));
+        verify(eventProducer).sendEvents(Mockito.any());
+        verify(repository).markEventsSent(Mockito.eq(events.stream().map(ExternalEventView::getId).toList()), Mockito.any());
         assertEquals(RepeatStatus.FINISHED, resultStatus);
     }
 
     @Test
     public void givenBatchSize2WhenEventSendFailsThenExecutionStops() throws Exception {
         // given
-        List<ExternalEvent> events = Arrays.asList(new ExternalEvent("aType", "aCategory", "aSchema", new byte[0], "aIdempotencyKey"),
-                new ExternalEvent("aType", "aCategory", "aSchema", new byte[0], "aIdempotencyKey"));
+        List<ExternalEventView> events = Arrays.asList(
+                createExternalEventView("aType", "aCategory", "aSchema", new byte[0], "aIdempotencyKey", 1L),
+                createExternalEventView("aType", "aCategory", "aSchema", new byte[0], "aIdempotencyKey", 1L));
         MessageV1 dummyMessage = new MessageV1(1, "aSource", "aType", "nocategory", "aCreateDate", "aBusinessDate", "aTenantId",
                 "anidempotencyKey", "aSchema", Mockito.mock(ByteBuffer.class));
         when(repository.findByStatusOrderById(Mockito.any(), Mockito.any())).thenReturn(events);
         when(messageFactory.createMessage(Mockito.any())).thenReturn(dummyMessage);
         when(byteBufferConverter.convert(Mockito.any(ByteBuffer.class))).thenReturn(new byte[0]);
         doThrow(new AcknowledgementTimeoutException("Event Send Exception", new RuntimeException())).when(eventProducer)
-                .sendEvent(Mockito.any());
+                .sendEvents(Mockito.any());
         // when
-        resultStatus = this.underTest.execute(stepContribution, chunkContext);
+        resultStatus = underTest.execute(stepContribution, chunkContext);
         // then
-        verify(repository, times(0)).save(Mockito.any(ExternalEvent.class));
+        verify(repository, times(0)).markEventsSent(Mockito.any(), Mockito.any());
         assertEquals(RepeatStatus.FINISHED, resultStatus);
     }
 
     @Test
     public void givenOneEventWhenEventSentThenEventStatusUpdates() throws Exception {
         // given
-        ArgumentCaptor<ExternalEvent> externalEventArgumentCaptor = ArgumentCaptor.forClass(ExternalEvent.class);
-        List<ExternalEvent> events = Arrays.asList(new ExternalEvent("aType", "aCategory", "aSchema", new byte[0], "aIdempotencyKey"));
+        List<ExternalEventView> events = Arrays
+                .asList(createExternalEventView("aType", "aCategory", "aSchema", new byte[0], "aIdempotencyKey", 1L));
         MessageV1 dummyMessage = new MessageV1(1, "aSource", "aType", "nocategory", "aCreateDate", "aBusinessDate", "aTenantId",
                 "anidempotencyKey", "aSchema", Mockito.mock(ByteBuffer.class));
         when(repository.findByStatusOrderById(Mockito.any(), Mockito.any())).thenReturn(events);
         when(messageFactory.createMessage(Mockito.any())).thenReturn(dummyMessage);
         when(byteBufferConverter.convert(Mockito.any(ByteBuffer.class))).thenReturn(new byte[0]);
-        doNothing().when(eventProducer).sendEvent(Mockito.any());
         // when
-        resultStatus = this.underTest.execute(stepContribution, chunkContext);
+        resultStatus = underTest.execute(stepContribution, chunkContext);
         // then
         verify(messageFactory).createMessage(Mockito.any());
-        verify(repository).save(externalEventArgumentCaptor.capture());
-        ExternalEvent externalEvent = externalEventArgumentCaptor.getValue();
-        assertThat(externalEvent.getStatus()).isEqualTo(ExternalEventStatus.SENT);
+        verify(eventProducer).sendEvents(Mockito.any());
+        verify(repository).markEventsSent(Mockito.eq(events.stream().map(ExternalEventView::getId).toList()), Mockito.any());
+        assertEquals(RepeatStatus.FINISHED, resultStatus);
+    }
+
+    @Test
+    public void testExecuteShouldHandleNullAggregateId() throws Exception {
+        // given a single event view whose aggregate root id is null
+        List<ExternalEventView> events = Arrays
+                .asList(createExternalEventView("aType", "aCategory", "aSchema", new byte[0], "aIdempotencyKey", null));
+        MessageV1 dummyMessage = new MessageV1(1, "aSource", "aType", "nocategory", "aCreateDate", "aBusinessDate", "aTenantId",
+                "anidempotencyKey", "aSchema", Mockito.mock(ByteBuffer.class));
+        when(repository.findByStatusOrderById(Mockito.any(), Mockito.any())).thenReturn(events);
+        when(messageFactory.createMessage(Mockito.any())).thenReturn(dummyMessage);
+        byte[] byteMsg = new byte[0];
+        when(byteBufferConverter.convert(Mockito.any(ByteBuffer.class))).thenReturn(byteMsg);
+        // when
+        resultStatus = underTest.execute(stepContribution, chunkContext);
+        // then it is sent under map key -1; byteMsg is reused because byte[] equality is by reference
+        verify(messageFactory).createMessage(Mockito.any());
+        verify(eventProducer).sendEvents(Map.of(-1L, List.of(byteMsg)));
+        verify(repository).markEventsSent(Mockito.eq(events.stream().map(ExternalEventView::getId).toList()), Mockito.any());
         assertEquals(RepeatStatus.FINISHED, resultStatus);
     }
 
     @Test
     public void givenEventBatchSizeIsConfiguredAs10WhenTaskExecutionThenEventReadPageSizeIsCorrect() {
         ArgumentCaptor<Pageable> externalEventPageSizeArgumentCaptor = ArgumentCaptor.forClass(Pageable.class);
-        List<ExternalEvent> events = new ArrayList<>();
+        List<ExternalEventView> events = new ArrayList<>();
         when(repository.findByStatusOrderById(Mockito.any(), Mockito.any())).thenReturn(events);
         // when
-        resultStatus = this.underTest.execute(stepContribution, chunkContext);
+        resultStatus = underTest.execute(stepContribution, chunkContext);
         // then
         verify(repository).findByStatusOrderById(Mockito.any(), externalEventPageSizeArgumentCaptor.capture());
         assertThat(externalEventPageSizeArgumentCaptor.getValue().getPageSize()).isEqualTo(10);
     }
+
+    private ExternalEventView createExternalEventView(String type, String category, String schema, byte[] data, String idempotencyKey,
+            Long aggregateRootId) {
+        ExternalEventView result = Mockito.mock(ExternalEventView.class); // Mockito mock; only the getters below are stubbed
+        Mockito.when(result.getId()).thenReturn(rnd.nextLong()); // random id; tests compare these ids in markEventsSent
+        Mockito.when(result.getType()).thenReturn(type);
+        Mockito.when(result.getCategory()).thenReturn(category);
+        Mockito.when(result.getSchema()).thenReturn(schema);
+        Mockito.when(result.getData()).thenReturn(data);
+        Mockito.when(result.getIdempotencyKey()).thenReturn(idempotencyKey);
+        Mockito.when(result.getAggregateRootId()).thenReturn(aggregateRootId); // may be null
+        return result;
+    }
 }
diff --git a/fineract-provider/src/test/java/org/apache/fineract/infrastructure/event/external/producer/EventsJMSIntegrationTest.java b/fineract-provider/src/test/java/org/apache/fineract/infrastructure/event/external/producer/EventsJMSIntegrationTest.java
deleted file mode 100644
index bc666406e..000000000
--- a/fineract-provider/src/test/java/org/apache/fineract/infrastructure/event/external/producer/EventsJMSIntegrationTest.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.fineract.infrastructure.event.external.producer;
-
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-import java.util.concurrent.atomic.AtomicInteger;
-import javax.jms.ConnectionFactory;
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Qualifier;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.integration.annotation.IntegrationComponentScan;
-import org.springframework.integration.channel.DirectChannel;
-import org.springframework.integration.config.EnableIntegration;
-import org.springframework.integration.config.GlobalChannelInterceptor;
-import org.springframework.integration.dsl.IntegrationFlow;
-import org.springframework.integration.dsl.IntegrationFlows;
-import org.springframework.integration.handler.LoggingHandler;
-import org.springframework.integration.jms.dsl.Jms;
-import org.springframework.messaging.Message;
-import org.springframework.messaging.MessageChannel;
-import org.springframework.messaging.support.ChannelInterceptor;
-import org.springframework.stereotype.Component;
-import org.springframework.test.annotation.DirtiesContext;
-import org.springframework.test.context.ContextConfiguration;
-import org.springframework.test.context.TestPropertySource;
-import org.springframework.test.context.junit.jupiter.SpringExtension;
-
-@ExtendWith(SpringExtension.class)
-@ContextConfiguration
-@DirtiesContext
-@TestPropertySource(properties = { "fineract.events.external.enabled=true" })
-public class EventsJMSIntegrationTest {
-
-    @Autowired
-    @Qualifier("outboundRequestsEvents")
-    private DirectChannel outboundRequestsEvents;
-
-    @Autowired
-    private ContextConfiguration.TestChannelInterceptor testChannelInterceptor;
-
-    @Autowired
-    private ExternalEventProducer underTest;
-
-    @Test
-    public void testJmsDownstreamChannelIntegration() {
-        assertThat(outboundRequestsEvents.getSubscriberCount()).isEqualTo(1);
-    }
-
-    @Test
-    void given2EventsThenOutBoundChannelIsInvokedTwice() {
-        // when
-        underTest.sendEvent(new byte[0]);
-        underTest.sendEvent(new byte[0]);
-        // then
-        assertTrue(outboundRequestsEvents.getInterceptors().contains(this.testChannelInterceptor));
-        assertThat(testChannelInterceptor.getInvoked()).isEqualTo(2);
-
-    }
-
-    @Configuration
-    @IntegrationComponentScan
-    @EnableIntegration
-    public static class ContextConfiguration {
-
-        private static ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
-
-        @Bean
-        public DirectChannel outboundRequestsEvents() {
-            return new DirectChannel();
-        }
-
-        @Bean
-        public IntegrationFlow outboundFlow() {
-            return IntegrationFlows.from("outboundRequestsEvents") //
-                    .log(LoggingHandler.Level.DEBUG) //
-                    .handle(Jms.outboundAdapter(connectionFactory).destination("destinationChannel")).get();
-        }
-
-        @Component
-        @GlobalChannelInterceptor(patterns = "outboundRequestsEvents")
-        public static class TestChannelInterceptor implements ChannelInterceptor {
-
-            private final AtomicInteger invoked = new AtomicInteger();
-
-            public int getInvoked() {
-                return invoked.get();
-            }
-
-            @Override
-            public Message<?> preSend(Message<?> message, MessageChannel channel) {
-                this.invoked.incrementAndGet();
-                return message;
-            }
-
-        }
-
-    }
-}
diff --git a/fineract-provider/src/test/java/org/apache/fineract/infrastructure/event/external/producer/jms/JMSMultiExternalEventProducerTest.java b/fineract-provider/src/test/java/org/apache/fineract/infrastructure/event/external/producer/jms/JMSMultiExternalEventProducerTest.java
new file mode 100644
index 000000000..503717ed8
--- /dev/null
+++ b/fineract-provider/src/test/java/org/apache/fineract/infrastructure/event/external/producer/jms/JMSMultiExternalEventProducerTest.java
@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.fineract.infrastructure.event.external.producer.jms;
+
+import static org.mockito.BDDMockito.given;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.fineract.avro.MessageV1;
+import org.apache.fineract.infrastructure.core.config.FineractProperties;
+import org.apache.fineract.infrastructure.core.messaging.jms.MessageFactory;
+import org.apache.fineract.infrastructure.core.service.HashingService;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.junit.jupiter.MockitoSettings;
+import org.mockito.quality.Strictness;
+import org.springframework.core.task.AsyncTaskExecutor;
+import org.springframework.core.task.SimpleAsyncTaskExecutor;
+
+@ExtendWith(MockitoExtension.class)
+@MockitoSettings(strictness = Strictness.LENIENT)
+class JMSMultiExternalEventProducerTest {
+
+    private static final int PRODUCER_COUNT = 3;
+    private final Random rnd = new Random();
+
+    @Mock
+    private Destination destination;
+    @Mock
+    private ActiveMQConnectionFactory connectionFactory;
+    @Mock
+    private MessageFactory messageFactory;
+
+    @Mock
+    private HashingService hashingService;
+
+    private final AsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
+
+    @Mock
+    private Connection connection;
+    @Mock
+    private Session session1;
+    @Mock
+    private Session session2;
+    @Mock
+    private Session session3;
+
+    @Mock
+    private MessageProducer producer1;
+    @Mock
+    private MessageProducer producer2;
+    @Mock
+    private MessageProducer producer3;
+
+    private JMSMultiExternalEventProducer underTest;
+
+    private FineractProperties fineractProperties;
+
+    private final List<MessageProducer> producers = new ArrayList<>();
+
+    @Test
+    public void testAfterPropertiesShouldCreateMultipleSessions() throws Exception {
+        // given a producer configured with PRODUCER_COUNT JMS producers
+        initializeMocks();
+        // when
+        underTest.afterPropertiesSet();
+        // then one session per configured producer is opened, each creates a producer, and the connection is closed
+        verify(connection, times(PRODUCER_COUNT)).createSession(false, Session.AUTO_ACKNOWLEDGE);
+        verify(session1).createProducer(destination);
+        verify(session2).createProducer(destination);
+        verify(session3).createProducer(destination);
+        verify(connection).close();
+    }
+
+    @Test
+    public void testSendEventsShouldWork() throws Exception {
+        // given an initialized producer and one message stored under map key 1, which is hashed to index 0
+        initializeMocks();
+        underTest.afterPropertiesSet();
+
+        byte[] msg1 = createMessage();
+        List<byte[]> messages = new ArrayList<>();
+        messages.add(msg1);
+        Map<Long, List<byte[]>> partitions = Map.of(1L, messages);
+
+        BytesMessage bytesMsg1 = Mockito.mock(BytesMessage.class);
+        given(messageFactory.createByteMessage(msg1)).willReturn(bytesMsg1);
+        given(hashingService.consistentHash(1L, producers.size())).willReturn(0);
+        // when
+        underTest.sendEvents(partitions);
+        // then the message goes out through the first producer
+        verify(producer1).send(destination, bytesMsg1);
+    }
+
+    @Test
+    public void testSendEventsBalancesBetweenProducers() throws Exception {
+        // given an initialized producer and three map keys (1, 2, 3) hashed to indexes 0, 1 and 2 respectively
+        initializeMocks();
+        underTest.afterPropertiesSet();
+
+        byte[] msg1 = createMessage();
+        byte[] msg2 = createMessage();
+        byte[] msg3 = createMessage();
+        List<byte[]> messages1 = new ArrayList<>();
+        messages1.add(msg1);
+        List<byte[]> messages2 = new ArrayList<>();
+        messages2.add(msg2);
+        List<byte[]> messages3 = new ArrayList<>();
+        messages3.add(msg3);
+        Map<Long, List<byte[]>> partitions = Map.of(1L, messages1, 2L, messages2, 3L, messages3);
+
+        BytesMessage bytesMsg1 = Mockito.mock(BytesMessage.class);
+        BytesMessage bytesMsg2 = Mockito.mock(BytesMessage.class);
+        BytesMessage bytesMsg3 = Mockito.mock(BytesMessage.class);
+        given(messageFactory.createByteMessage(msg1)).willReturn(bytesMsg1);
+        given(messageFactory.createByteMessage(msg2)).willReturn(bytesMsg2);
+        given(messageFactory.createByteMessage(msg3)).willReturn(bytesMsg3);
+        given(hashingService.consistentHash(1L, producers.size())).willReturn(0);
+        given(hashingService.consistentHash(2L, producers.size())).willReturn(1);
+        given(hashingService.consistentHash(3L, producers.size())).willReturn(2);
+        // when
+        underTest.sendEvents(partitions);
+        // then each message is sent by the producer at the index its key was hashed to
+        verify(producer1).send(destination, bytesMsg1);
+        verify(producer2).send(destination, bytesMsg2);
+        verify(producer3).send(destination, bytesMsg3);
+    }
+
+    private byte[] createMessage() throws IOException {
+        MessageV1 messageV1 = new MessageV1();
+        messageV1.setId(rnd.nextInt());
+        messageV1.setSource("");
+        messageV1.setBusinessDate("");
+        messageV1.setCategory("");
+        messageV1.setCreatedAt("");
+        messageV1.setDataschema("");
+        messageV1.setIdempotencyKey("");
+        messageV1.setTenantId("");
+        messageV1.setType("");
+        messageV1.setData(ByteBuffer.wrap(new byte[0]));
+        return messageV1.toByteBuffer().array();
+    }
+
+    private void initializeMocks() throws JMSException {
+        FineractProperties.FineractExternalEventsProducerJmsProperties jms = new FineractProperties.FineractExternalEventsProducerJmsProperties();
+        jms.setProducerCount(PRODUCER_COUNT);
+        FineractProperties.FineractExternalEventsProducerProperties producer = new FineractProperties.FineractExternalEventsProducerProperties();
+        producer.setJms(jms);
+        FineractProperties.FineractExternalEventsProperties external = new FineractProperties.FineractExternalEventsProperties();
+        external.setProducer(producer);
+        FineractProperties.FineractEventsProperties events = new FineractProperties.FineractEventsProperties();
+        events.setExternal(external);
+        fineractProperties = new FineractProperties();
+        fineractProperties.setEvents(events);
+        underTest = new JMSMultiExternalEventProducer(destination, connectionFactory, messageFactory, taskExecutor, hashingService,
+                fineractProperties);
+
+        given(connectionFactory.createConnection()).willReturn(connection);
+        given(connection.createSession(false, Session.AUTO_ACKNOWLEDGE)).willReturn(session1, session2, session3);
+        given(session1.createProducer(destination)).willReturn(producer1);
+        given(session2.createProducer(destination)).willReturn(producer2);
+        given(session3.createProducer(destination)).willReturn(producer3);
+
+        producers.add(producer1);
+        producers.add(producer2);
+        producers.add(producer3);
+    }
+}
diff --git a/lombok.config b/lombok.config
index aa692161e..f542c6381 100644
--- a/lombok.config
+++ b/lombok.config
@@ -18,4 +18,6 @@
 #
 config.stopBubbling = true
 
-lombok.addLombokGeneratedAnnotation = true
\ No newline at end of file
+lombok.addLombokGeneratedAnnotation = true
+
+lombok.copyableAnnotations += org.springframework.beans.factory.annotation.Qualifier
\ No newline at end of file