You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@fineract.apache.org by ar...@apache.org on 2023/01/19 09:34:18 UTC
[fineract] branch develop updated: FINERACT-1724: Implemented support for multiple parallel JMS producers for external events
This is an automated email from the ASF dual-hosted git repository.
arnold pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/fineract.git
The following commit(s) were added to refs/heads/develop by this push:
new 7d28646c7 FINERACT-1724: Implemented support for multiple parallel JMS producers for external events
7d28646c7 is described below
commit 7d28646c7d97088c9e89d11a394333a2d26ed50f
Author: Arnold Galovics <ga...@gmail.com>
AuthorDate: Tue Jan 17 12:16:43 2023 +0100
FINERACT-1724: Implemented support for multiple parallel JMS producers for external events
---
.../LoanAccountsStayedLockedBusinessEvent.java | 5 +
.../core/config/FineractProperties.java | 1 +
.../messaging/jms/ActiveMQMessageFactory.java} | 23 ++-
.../messaging/jms/MessageFactory.java} | 11 +-
.../service/HashingService.java} | 14 +-
.../infrastructure/core/service/MeasuringUtil.java | 50 ++++++
.../event/business/domain/BulkBusinessEvent.java | 17 ++
.../event/business/domain/BusinessEvent.java | 2 +
.../domain/client/ClientBusinessEvent.java | 5 +
.../deposit/FixedDepositAccountBusinessEvent.java | 5 +
.../RecurringDepositAccountBusinessEvent.java | 5 +
.../business/domain/group/GroupsBusinessEvent.java | 5 +
.../loan/LoanAdjustTransactionBusinessEvent.java | 5 +
.../business/domain/loan/LoanBusinessEvent.java | 5 +
.../loan/charge/LoanChargeBusinessEvent.java | 5 +
.../loan/product/LoanProductBusinessEvent.java | 5 +
.../loan/repayment/LoanRepaymentBusinessEvent.java | 5 +
.../transaction/LoanTransactionBusinessEvent.java | 5 +
.../savings/SavingsAccountBusinessEvent.java | 5 +
.../SavingsAccountTransactionBusinessEvent.java | 5 +
.../domain/share/ShareAccountBusinessEvent.java | 5 +
.../ShareProductDividentsCreateBusinessEvent.java | 5 +
...ion.java => ExternalEventJMSConfiguration.java} | 12 +-
.../ExternalEventJMSProducerConfiguration.java | 55 ------
.../config/ExternalEventProducerConfiguration.java | 36 ----
.../jobs/SendAsynchronousEventsConfig.java | 3 +-
.../jobs/SendAsynchronousEventsTasklet.java | 82 +++++++--
.../external/producer/ExternalEventProducer.java | 10 +-
...cerImpl.java => NoopExternalEventProducer.java} | 16 +-
.../jms/JMSMultiExternalEventProducer.java | 153 ++++++++++++++++
.../repository/ExternalEventRepository.java | 8 +-
.../external/repository/domain/ExternalEvent.java | 5 +-
.../repository/domain/ExternalEventView.java} | 25 ++-
.../external/service/ExternalEventService.java | 5 +-
.../external/service/message/MessageFactory.java | 4 +-
.../infrastructure/jobs/service/StepName.java | 2 +-
.../src/main/resources/application.properties | 1 +
.../db/changelog/tenant/changelog-tenant.xml | 1 +
.../0085_add_aggregate_root_id_external_events.xml | 32 ++++
.../core/service/HashingServiceTest.java | 62 +++++++
.../business/domain/BulkBusinessEventTest.java | 64 +++++++
.../BusinessEventNotifierServiceImplTest.java | 5 +
.../jobs/SendAsynchronousEventsTaskletTest.java | 82 ++++++---
.../producer/EventsJMSIntegrationTest.java | 120 -------------
.../jms/JMSMultiExternalEventProducerTest.java | 200 +++++++++++++++++++++
lombok.config | 4 +-
46 files changed, 882 insertions(+), 298 deletions(-)
diff --git a/fineract-provider/src/main/java/org/apache/fineract/cob/loan/LoanAccountsStayedLockedBusinessEvent.java b/fineract-provider/src/main/java/org/apache/fineract/cob/loan/LoanAccountsStayedLockedBusinessEvent.java
index 274210414..d9280ae56 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/cob/loan/LoanAccountsStayedLockedBusinessEvent.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/cob/loan/LoanAccountsStayedLockedBusinessEvent.java
@@ -39,4 +39,9 @@ public class LoanAccountsStayedLockedBusinessEvent extends AbstractBusinessEvent
public String getCategory() {
return CATEGORY;
}
+
+ @Override
+ public Long getAggregateRootId() {
+ return null;
+ }
}
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/core/config/FineractProperties.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/core/config/FineractProperties.java
index b28722c43..99b50c5fd 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/core/config/FineractProperties.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/core/config/FineractProperties.java
@@ -165,6 +165,7 @@ public class FineractProperties {
private String brokerUrl;
private String brokerUsername;
private String brokerPassword;
+ private int producerCount;
public boolean isBrokerPasswordProtected() {
return StringUtils.isNotBlank(brokerUsername) || StringUtils.isNotBlank(brokerPassword);
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/loan/LoanBusinessEvent.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/core/messaging/jms/ActiveMQMessageFactory.java
similarity index 62%
copy from fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/loan/LoanBusinessEvent.java
copy to fineract-provider/src/main/java/org/apache/fineract/infrastructure/core/messaging/jms/ActiveMQMessageFactory.java
index 1ff3eafd5..cc982c8c0 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/loan/LoanBusinessEvent.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/core/messaging/jms/ActiveMQMessageFactory.java
@@ -16,21 +16,20 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.fineract.infrastructure.event.business.domain.loan;
+package org.apache.fineract.infrastructure.core.messaging.jms;
-import org.apache.fineract.infrastructure.event.business.domain.AbstractBusinessEvent;
-import org.apache.fineract.portfolio.loanaccount.domain.Loan;
+import javax.jms.BytesMessage;
+import javax.jms.JMSException;
+import org.apache.activemq.command.ActiveMQBytesMessage;
+import org.springframework.stereotype.Component;
-public abstract class LoanBusinessEvent extends AbstractBusinessEvent<Loan> {
-
- private static final String CATEGORY = "Loan";
-
- public LoanBusinessEvent(Loan value) {
- super(value);
- }
+@Component
+public class ActiveMQMessageFactory implements MessageFactory {
@Override
- public String getCategory() {
- return CATEGORY;
+ public BytesMessage createByteMessage(byte[] msg) throws JMSException {
+ ActiveMQBytesMessage result = new ActiveMQBytesMessage();
+ result.writeBytes(msg);
+ return result;
}
}
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/BusinessEvent.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/core/messaging/jms/MessageFactory.java
similarity index 78%
copy from fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/BusinessEvent.java
copy to fineract-provider/src/main/java/org/apache/fineract/infrastructure/core/messaging/jms/MessageFactory.java
index c2876066b..1f3ed3927 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/BusinessEvent.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/core/messaging/jms/MessageFactory.java
@@ -16,13 +16,12 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.fineract.infrastructure.event.business.domain;
+package org.apache.fineract.infrastructure.core.messaging.jms;
-public interface BusinessEvent<T> {
+import javax.jms.BytesMessage;
+import javax.jms.JMSException;
- T get();
+public interface MessageFactory {
- String getType();
-
- String getCategory();
+ BytesMessage createByteMessage(byte[] msg) throws JMSException;
}
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/BusinessEvent.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/core/service/HashingService.java
similarity index 72%
copy from fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/BusinessEvent.java
copy to fineract-provider/src/main/java/org/apache/fineract/infrastructure/core/service/HashingService.java
index c2876066b..148ad7980 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/BusinessEvent.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/core/service/HashingService.java
@@ -16,13 +16,15 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.fineract.infrastructure.event.business.domain;
+package org.apache.fineract.infrastructure.core.service;
-public interface BusinessEvent<T> {
+import com.google.common.hash.Hashing;
+import org.springframework.stereotype.Component;
- T get();
+@Component
+public class HashingService {
- String getType();
-
- String getCategory();
+ public int consistentHash(long input, int buckets) {
+ return Hashing.consistentHash(input, buckets);
+ }
}
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/core/service/MeasuringUtil.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/core/service/MeasuringUtil.java
new file mode 100644
index 000000000..29f7f4c6a
--- /dev/null
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/core/service/MeasuringUtil.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.fineract.infrastructure.core.service;
+
+import java.time.Duration;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+import org.springframework.util.StopWatch;
+
+public final class MeasuringUtil {
+
+ private MeasuringUtil() {}
+
+ public static void measure(Runnable r, Consumer<Duration> c) {
+ measure(() -> {
+ r.run();
+ return null;
+ }, c);
+ }
+
+ public static <T> T measure(Supplier<T> s, Consumer<Duration> c) {
+ return measure(s, (result, timeTaken) -> c.accept(timeTaken));
+ }
+
+ public static <T> T measure(Supplier<T> s, BiConsumer<T, Duration> c) {
+ StopWatch sw = new StopWatch();
+ sw.start();
+ T result = s.get();
+ sw.stop();
+ c.accept(result, Duration.ofMillis(sw.getTotalTimeMillis()));
+ return result;
+ }
+}
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/BulkBusinessEvent.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/BulkBusinessEvent.java
index 32f842dc6..b1e6eae27 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/BulkBusinessEvent.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/BulkBusinessEvent.java
@@ -18,7 +18,11 @@
*/
package org.apache.fineract.infrastructure.event.business.domain;
+import static java.util.stream.Collectors.toSet;
+
import java.util.List;
+import java.util.Objects;
+import java.util.Set;
public class BulkBusinessEvent extends AbstractBusinessEvent<List<BusinessEvent<?>>> {
@@ -27,6 +31,14 @@ public class BulkBusinessEvent extends AbstractBusinessEvent<List<BusinessEvent<
public BulkBusinessEvent(List<BusinessEvent<?>> value) {
super(value);
+ verifySameAggregate(value);
+ }
+
+ private void verifySameAggregate(List<BusinessEvent<?>> events) {
+ Set<Long> aggregateRootIds = events.stream().map(BusinessEvent::getAggregateRootId).filter(Objects::nonNull).collect(toSet());
+ if (aggregateRootIds.size() > 1) {
+ throw new IllegalArgumentException("The business events are related to multiple aggregate roots which is not allowed");
+ }
}
@Override
@@ -38,4 +50,9 @@ public class BulkBusinessEvent extends AbstractBusinessEvent<List<BusinessEvent<
public String getCategory() {
return CATEGORY;
}
+
+ @Override
+ public Long getAggregateRootId() {
+ return get().iterator().next().getAggregateRootId();
+ }
}
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/BusinessEvent.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/BusinessEvent.java
index c2876066b..65c2a9ebb 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/BusinessEvent.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/BusinessEvent.java
@@ -25,4 +25,6 @@ public interface BusinessEvent<T> {
String getType();
String getCategory();
+
+ Long getAggregateRootId();
}
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/client/ClientBusinessEvent.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/client/ClientBusinessEvent.java
index 78acd5432..cbe497a3a 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/client/ClientBusinessEvent.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/client/ClientBusinessEvent.java
@@ -33,4 +33,9 @@ public abstract class ClientBusinessEvent extends AbstractBusinessEvent<Client>
public String getCategory() {
return CATEGORY;
}
+
+ @Override
+ public Long getAggregateRootId() {
+ return get().getId();
+ }
}
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/deposit/FixedDepositAccountBusinessEvent.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/deposit/FixedDepositAccountBusinessEvent.java
index ae38e0ccc..770ccb86f 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/deposit/FixedDepositAccountBusinessEvent.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/deposit/FixedDepositAccountBusinessEvent.java
@@ -33,4 +33,9 @@ public abstract class FixedDepositAccountBusinessEvent extends AbstractBusinessE
public String getCategory() {
return CATEGORY;
}
+
+ @Override
+ public Long getAggregateRootId() {
+ return get().getId();
+ }
}
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/deposit/RecurringDepositAccountBusinessEvent.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/deposit/RecurringDepositAccountBusinessEvent.java
index 1c2b60b53..d2c57ea59 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/deposit/RecurringDepositAccountBusinessEvent.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/deposit/RecurringDepositAccountBusinessEvent.java
@@ -33,4 +33,9 @@ public abstract class RecurringDepositAccountBusinessEvent extends AbstractBusin
public String getCategory() {
return CATEGORY;
}
+
+ @Override
+ public Long getAggregateRootId() {
+ return get().getId();
+ }
}
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/group/GroupsBusinessEvent.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/group/GroupsBusinessEvent.java
index 1c0f0030d..70172f48d 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/group/GroupsBusinessEvent.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/group/GroupsBusinessEvent.java
@@ -33,4 +33,9 @@ public abstract class GroupsBusinessEvent extends AbstractBusinessEvent<CommandP
public String getCategory() {
return CATEGORY;
}
+
+ @Override
+ public Long getAggregateRootId() {
+ return get().getResourceId();
+ }
}
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/loan/LoanAdjustTransactionBusinessEvent.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/loan/LoanAdjustTransactionBusinessEvent.java
index dc4b98907..c8629beb4 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/loan/LoanAdjustTransactionBusinessEvent.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/loan/LoanAdjustTransactionBusinessEvent.java
@@ -43,6 +43,11 @@ public class LoanAdjustTransactionBusinessEvent extends AbstractBusinessEvent<Lo
return CATEGORY;
}
+ @Override
+ public Long getAggregateRootId() {
+ return get().getTransactionToAdjust().getLoan().getId();
+ }
+
@RequiredArgsConstructor
@Getter
public static class Data {
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/loan/LoanBusinessEvent.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/loan/LoanBusinessEvent.java
index 1ff3eafd5..62efeab55 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/loan/LoanBusinessEvent.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/loan/LoanBusinessEvent.java
@@ -33,4 +33,9 @@ public abstract class LoanBusinessEvent extends AbstractBusinessEvent<Loan> {
public String getCategory() {
return CATEGORY;
}
+
+ @Override
+ public Long getAggregateRootId() {
+ return get().getId();
+ }
}
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/loan/charge/LoanChargeBusinessEvent.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/loan/charge/LoanChargeBusinessEvent.java
index 71381e6a6..be42639be 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/loan/charge/LoanChargeBusinessEvent.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/loan/charge/LoanChargeBusinessEvent.java
@@ -33,4 +33,9 @@ public abstract class LoanChargeBusinessEvent extends AbstractBusinessEvent<Loan
public String getCategory() {
return CATEGORY;
}
+
+ @Override
+ public Long getAggregateRootId() {
+ return get().getLoan().getId();
+ }
}
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/loan/product/LoanProductBusinessEvent.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/loan/product/LoanProductBusinessEvent.java
index 352315b26..46f2d7071 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/loan/product/LoanProductBusinessEvent.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/loan/product/LoanProductBusinessEvent.java
@@ -33,4 +33,9 @@ public abstract class LoanProductBusinessEvent extends AbstractBusinessEvent<Loa
public String getCategory() {
return CATEGORY;
}
+
+ @Override
+ public Long getAggregateRootId() {
+ return get().getId();
+ }
}
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/loan/repayment/LoanRepaymentBusinessEvent.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/loan/repayment/LoanRepaymentBusinessEvent.java
index 4ae53fad8..a25909ff6 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/loan/repayment/LoanRepaymentBusinessEvent.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/loan/repayment/LoanRepaymentBusinessEvent.java
@@ -33,4 +33,9 @@ public abstract class LoanRepaymentBusinessEvent extends AbstractBusinessEvent<L
public String getCategory() {
return CATEGORY;
}
+
+ @Override
+ public Long getAggregateRootId() {
+ return get().getLoan().getId();
+ }
}
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/loan/transaction/LoanTransactionBusinessEvent.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/loan/transaction/LoanTransactionBusinessEvent.java
index 541f302c4..5a6b434b2 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/loan/transaction/LoanTransactionBusinessEvent.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/loan/transaction/LoanTransactionBusinessEvent.java
@@ -33,4 +33,9 @@ public abstract class LoanTransactionBusinessEvent extends AbstractBusinessEvent
public String getCategory() {
return CATEGORY;
}
+
+ @Override
+ public Long getAggregateRootId() {
+ return get().getLoan().getId();
+ }
}
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/savings/SavingsAccountBusinessEvent.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/savings/SavingsAccountBusinessEvent.java
index e09b45c98..6cc16da41 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/savings/SavingsAccountBusinessEvent.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/savings/SavingsAccountBusinessEvent.java
@@ -33,4 +33,9 @@ public abstract class SavingsAccountBusinessEvent extends AbstractBusinessEvent<
public String getCategory() {
return CATEGORY;
}
+
+ @Override
+ public Long getAggregateRootId() {
+ return get().getId();
+ }
}
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/savings/transaction/SavingsAccountTransactionBusinessEvent.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/savings/transaction/SavingsAccountTransactionBusinessEvent.java
index fe56ce09f..5b92a41d8 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/savings/transaction/SavingsAccountTransactionBusinessEvent.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/savings/transaction/SavingsAccountTransactionBusinessEvent.java
@@ -33,4 +33,9 @@ public abstract class SavingsAccountTransactionBusinessEvent extends AbstractBus
public String getCategory() {
return CATEGORY;
}
+
+ @Override
+ public Long getAggregateRootId() {
+ return get().getSavingsAccount().getId();
+ }
}
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/share/ShareAccountBusinessEvent.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/share/ShareAccountBusinessEvent.java
index b4f90febf..da524deb9 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/share/ShareAccountBusinessEvent.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/share/ShareAccountBusinessEvent.java
@@ -33,4 +33,9 @@ public abstract class ShareAccountBusinessEvent extends AbstractBusinessEvent<Sh
public String getCategory() {
return CATEGORY;
}
+
+ @Override
+ public Long getAggregateRootId() {
+ return get().getId();
+ }
}
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/share/ShareProductDividentsCreateBusinessEvent.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/share/ShareProductDividentsCreateBusinessEvent.java
index 0e25d7ece..f790d4dae 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/share/ShareProductDividentsCreateBusinessEvent.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/share/ShareProductDividentsCreateBusinessEvent.java
@@ -38,4 +38,9 @@ public class ShareProductDividentsCreateBusinessEvent extends AbstractBusinessEv
public String getCategory() {
return CATEGORY;
}
+
+ @Override
+ public Long getAggregateRootId() {
+ return get();
+ }
}
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/config/ExternalEventJMSBrokerConfiguration.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/config/ExternalEventJMSConfiguration.java
similarity index 84%
rename from fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/config/ExternalEventJMSBrokerConfiguration.java
rename to fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/config/ExternalEventJMSConfiguration.java
index 2f3c89f25..b38f90e58 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/config/ExternalEventJMSBrokerConfiguration.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/config/ExternalEventJMSConfiguration.java
@@ -28,10 +28,11 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Conditional;
import org.springframework.context.annotation.Configuration;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
@Configuration
@ConditionalOnProperty(value = "fineract.events.external.producer.jms.enabled", havingValue = "true")
-public class ExternalEventJMSBrokerConfiguration {
+public class ExternalEventJMSConfiguration {
@Autowired
private FineractProperties fineractProperties;
@@ -60,4 +61,13 @@ public class ExternalEventJMSBrokerConfiguration {
public ActiveMQQueue activeMqQueue() {
return new ActiveMQQueue(fineractProperties.getEvents().getExternal().getProducer().getJms().getEventQueueName());
}
+
+ @Bean("externalEventJmsProducerExecutor")
+ public ThreadPoolTaskExecutor externalEventJmsProducerExecutor() {
+ ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
+ threadPoolTaskExecutor.setCorePoolSize(10);
+ threadPoolTaskExecutor.setMaxPoolSize(100);
+ threadPoolTaskExecutor.setThreadNamePrefix("externalEventJms");
+ return threadPoolTaskExecutor;
+ }
}
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/config/ExternalEventJMSProducerConfiguration.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/config/ExternalEventJMSProducerConfiguration.java
deleted file mode 100644
index 8ed972584..000000000
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/config/ExternalEventJMSProducerConfiguration.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.fineract.infrastructure.event.external.config;
-
-import javax.jms.ConnectionFactory;
-import javax.jms.Destination;
-import org.apache.fineract.infrastructure.core.config.FineractProperties;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Qualifier;
-import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.context.annotation.Import;
-import org.springframework.integration.channel.DirectChannel;
-import org.springframework.integration.config.EnableIntegration;
-import org.springframework.integration.dsl.IntegrationFlow;
-import org.springframework.integration.dsl.IntegrationFlows;
-import org.springframework.integration.jms.dsl.Jms;
-
-@Configuration
-@EnableIntegration
-@ConditionalOnProperty(value = "fineract.events.external.producer.jms.enabled", havingValue = "true")
-@Import(value = { ExternalEventJMSBrokerConfiguration.class })
-public class ExternalEventJMSProducerConfiguration {
-
- @Autowired
- private DirectChannel outboundRequestsEvents;
-
- @Autowired
- private FineractProperties fineractProperties;
-
- @Bean
- public IntegrationFlow outboundFlowEvents(ConnectionFactory connectionFactory,
- @Qualifier("eventDestination") Destination eventDestination) {
- return IntegrationFlows.from(outboundRequestsEvents) //
- .handle(Jms.outboundAdapter(connectionFactory).destination(eventDestination)).get();
- }
-
-}
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/config/ExternalEventProducerConfiguration.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/config/ExternalEventProducerConfiguration.java
deleted file mode 100644
index b99275830..000000000
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/config/ExternalEventProducerConfiguration.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.fineract.infrastructure.event.external.config;
-
-import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.integration.channel.DirectChannel;
-import org.springframework.integration.config.EnableIntegration;
-
-@Configuration
-@EnableIntegration
-@ConditionalOnProperty(value = "fineract.events.external.enabled", havingValue = "true")
-public class ExternalEventProducerConfiguration {
-
- @Bean
- public DirectChannel outboundRequestsEvents() {
- return new DirectChannel();
- }
-}
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/jobs/SendAsynchronousEventsConfig.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/jobs/SendAsynchronousEventsConfig.java
index 30810773a..55cc4a46d 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/jobs/SendAsynchronousEventsConfig.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/jobs/SendAsynchronousEventsConfig.java
@@ -19,6 +19,7 @@
package org.apache.fineract.infrastructure.event.external.jobs;
import org.apache.fineract.infrastructure.jobs.service.JobName;
+import org.apache.fineract.infrastructure.jobs.service.StepName;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
@@ -40,7 +41,7 @@ public class SendAsynchronousEventsConfig {
@Bean
protected Step sendAsynchronousEventsStep() {
- return steps.get(JobName.SEND_ASYNCHRONOUS_EVENTS.name()).tasklet(tasklet).build();
+ return steps.get(StepName.SEND_ASYNCHRONOUS_EVENTS_STEP.name()).tasklet(tasklet).build();
}
@Bean
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/jobs/SendAsynchronousEventsTasklet.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/jobs/SendAsynchronousEventsTasklet.java
index 289f20591..35733e664 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/jobs/SendAsynchronousEventsTasklet.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/jobs/SendAsynchronousEventsTasklet.java
@@ -18,8 +18,17 @@
*/
package org.apache.fineract.infrastructure.event.external.jobs;
+import static java.util.stream.Collectors.groupingBy;
+import static java.util.stream.Collectors.toMap;
+import static org.apache.fineract.infrastructure.core.service.MeasuringUtil.measure;
+
+import com.google.common.collect.Lists;
import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.time.OffsetDateTime;
+import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.fineract.avro.MessageV1;
@@ -28,8 +37,8 @@ import org.apache.fineract.infrastructure.core.config.FineractProperties;
import org.apache.fineract.infrastructure.core.service.DateUtils;
import org.apache.fineract.infrastructure.event.external.producer.ExternalEventProducer;
import org.apache.fineract.infrastructure.event.external.repository.ExternalEventRepository;
-import org.apache.fineract.infrastructure.event.external.repository.domain.ExternalEvent;
import org.apache.fineract.infrastructure.event.external.repository.domain.ExternalEventStatus;
+import org.apache.fineract.infrastructure.event.external.repository.domain.ExternalEventView;
import org.apache.fineract.infrastructure.event.external.service.message.MessageFactory;
import org.apache.fineract.infrastructure.event.external.service.support.ByteBufferConverter;
import org.springframework.batch.core.StepContribution;
@@ -56,8 +65,9 @@ public class SendAsynchronousEventsTasklet implements Tasklet {
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) {
try {
if (isDownstreamChannelEnabled()) {
- List<ExternalEvent> events = getQueuedEventsBatch();
- processEvents(events);
+ List<ExternalEventView> events = getQueuedEventsBatch();
+ log.debug("Queued events size: {}", events.size());
+ sendEvents(events);
}
} catch (Exception e) {
log.error("Error occurred while processing events: ", e);
@@ -69,20 +79,66 @@ public class SendAsynchronousEventsTasklet implements Tasklet {
return fineractProperties.getEvents().getExternal().getProducer().getJms().isEnabled();
}
- private List<ExternalEvent> getQueuedEventsBatch() {
+ private List<ExternalEventView> getQueuedEventsBatch() {
int readBatchSize = getBatchSize();
Pageable batchSize = PageRequest.ofSize(readBatchSize);
- return repository.findByStatusOrderById(ExternalEventStatus.TO_BE_SENT, batchSize);
+ return measure(() -> repository.findByStatusOrderById(ExternalEventStatus.TO_BE_SENT, batchSize),
+ (events, timeTaken) -> log.debug("Loaded {} events in {}ms", events.size(), timeTaken.toMillis()));
+ }
+
+ private void sendEvents(List<ExternalEventView> queuedEvents) {
+ Map<Long, List<byte[]>> partitions = generatePartitions(queuedEvents);
+ List<Long> eventIds = queuedEvents.stream().map(ExternalEventView::getId).toList();
+ sendEventsToProducer(partitions);
+ markEventsAsSent(eventIds);
}
- private void processEvents(List<ExternalEvent> queuedEvents) throws IOException {
- for (ExternalEvent event : queuedEvents) {
- MessageV1 message = messageFactory.createMessage(event);
- byte[] byteMessage = byteBufferConverter.convert(message.toByteBuffer());
- eventProducer.sendEvent(byteMessage);
- event.setStatus(ExternalEventStatus.SENT);
- event.setSentAt(DateUtils.getOffsetDateTimeOfTenant());
- repository.save(event);
+ private void sendEventsToProducer(Map<Long, List<byte[]>> partitions) {
+ eventProducer.sendEvents(partitions);
+ }
+
+    /**
+     * Flags the given events as sent in the repository. All of them receive the same timestamp, obtained once from
+     * {@code DateUtils.getOffsetDateTimeOfTenant()}.
+     *
+     * @param eventIds
+     *            ids of the events to update
+     */
+    private void markEventsAsSent(List<Long> eventIds) {
+        OffsetDateTime sentAt = DateUtils.getOffsetDateTimeOfTenant();
+
+        // The ids are updated in chunks to avoid the exception: PreparedStatement can have at most 65,535 parameters
+        for (List<Long> idChunk : Lists.partition(eventIds, 5_000)) {
+            measure(() -> {
+                repository.markEventsSent(idChunk, sentAt);
+            }, timeTaken -> {
+                log.debug("Took {}ms to update {} events", timeTaken.toMillis(), idChunk.size());
+            });
+        }
+    }
+
+    /**
+     * Groups the queued events by aggregate root id and serializes every group into its message payloads.
+     * Events without an aggregate root id all share the key {@code -1}.
+     *
+     * @param queuedEvents
+     *            events to be sent
+     * @return serialized messages keyed by aggregate root id
+     */
+    private Map<Long, List<byte[]>> generatePartitions(List<ExternalEventView> queuedEvents) {
+        Map<Long, List<ExternalEventView>> eventsByAggregateRoot = queuedEvents.stream().collect(groupingBy(event -> {
+            Long rootId = event.getAggregateRootId();
+            return rootId == null ? Long.valueOf(-1L) : rootId;
+        }));
+        return measure(
+                () -> eventsByAggregateRoot.entrySet().stream()
+                        .collect(toMap(Map.Entry::getKey, entry -> createMessages(entry.getValue()))),
+                timeTaken -> log.debug("Took {}ms to create message partitions", timeTaken.toMillis()));
+    }
+
+ private List<byte[]> createMessages(List<ExternalEventView> events) {
+ try {
+ List<byte[]> messages = new ArrayList<>();
+ for (ExternalEventView event : events) {
+ MessageV1 message = messageFactory.createMessage(event);
+ ByteBuffer toByteBuffer = message.toByteBuffer();
+ byte[] convert = byteBufferConverter.convert(toByteBuffer);
+ messages.add(convert);
+ }
+ return messages;
+ } catch (IOException e) {
+ throw new RuntimeException("Error while serializing the message", e);
}
}
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/producer/ExternalEventProducer.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/producer/ExternalEventProducer.java
index daf5b0151..14dd60ce5 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/producer/ExternalEventProducer.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/producer/ExternalEventProducer.java
@@ -18,15 +18,11 @@
*/
package org.apache.fineract.infrastructure.event.external.producer;
+import java.util.List;
+import java.util.Map;
import org.apache.fineract.infrastructure.event.external.exception.AcknowledgementTimeoutException;
-import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
-import org.springframework.integration.annotation.Gateway;
-import org.springframework.integration.annotation.MessagingGateway;
-@MessagingGateway(name = "externalEventGateway")
-@ConditionalOnProperty(value = "fineract.events.external.enabled", havingValue = "true")
public interface ExternalEventProducer {
- @Gateway(requestChannel = "outboundRequestsEvents", replyTimeout = 2, requestTimeout = 200)
- void sendEvent(byte[] message) throws AcknowledgementTimeoutException;
+ void sendEvents(Map<Long, List<byte[]>> partitions) throws AcknowledgementTimeoutException;
}
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/producer/DummyExternalEventProducerImpl.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/producer/NoopExternalEventProducer.java
similarity index 71%
rename from fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/producer/DummyExternalEventProducerImpl.java
rename to fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/producer/NoopExternalEventProducer.java
index 4f60f0a31..8e6227eb1 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/producer/DummyExternalEventProducerImpl.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/producer/NoopExternalEventProducer.java
@@ -18,16 +18,18 @@
*/
package org.apache.fineract.infrastructure.event.external.producer;
+import java.util.List;
+import java.util.Map;
+import lombok.extern.slf4j.Slf4j;
import org.apache.fineract.infrastructure.event.external.exception.AcknowledgementTimeoutException;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
-import org.springframework.stereotype.Service;
+import org.springframework.stereotype.Component;
-@Service
-@ConditionalOnProperty(value = "fineract.events.external.enabled", havingValue = "false")
-public class DummyExternalEventProducerImpl implements ExternalEventProducer {
+@Component
+@ConditionalOnProperty(value = "fineract.events.external.producer.jms.enabled", havingValue = "false")
+@Slf4j
+public class NoopExternalEventProducer implements ExternalEventProducer {
@Override
- public void sendEvent(byte[] message) throws AcknowledgementTimeoutException {
- return;
- }
+ public void sendEvents(Map<Long, List<byte[]>> messages) throws AcknowledgementTimeoutException {}
}
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/producer/jms/JMSMultiExternalEventProducer.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/producer/jms/JMSMultiExternalEventProducer.java
new file mode 100644
index 000000000..e063e44f4
--- /dev/null
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/producer/jms/JMSMultiExternalEventProducer.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.fineract.infrastructure.event.external.producer.jms;
+
+import static org.apache.fineract.infrastructure.core.service.MeasuringUtil.measure;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Future;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.fineract.infrastructure.core.config.FineractProperties;
+import org.apache.fineract.infrastructure.core.messaging.jms.MessageFactory;
+import org.apache.fineract.infrastructure.core.service.HashingService;
+import org.apache.fineract.infrastructure.event.external.exception.AcknowledgementTimeoutException;
+import org.apache.fineract.infrastructure.event.external.producer.ExternalEventProducer;
+import org.springframework.beans.factory.InitializingBean;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.core.task.AsyncTaskExecutor;
+import org.springframework.stereotype.Component;
+
+@Component
+@Slf4j
+@RequiredArgsConstructor
+@ConditionalOnProperty(value = "fineract.events.external.producer.jms.enabled", havingValue = "true")
+public class JMSMultiExternalEventProducer implements ExternalEventProducer, InitializingBean {
+
+ @Qualifier("eventDestination")
+ private final Destination destination;
+
+ private final ConnectionFactory connectionFactory;
+
+ private final MessageFactory messageFactory;
+
+ @Qualifier("externalEventJmsProducerExecutor")
+ private final AsyncTaskExecutor taskExecutor;
+
+ private final HashingService hashingService;
+
+ private final FineractProperties fineractProperties;
+
+ private final List<MessageProducer> producers = new ArrayList<>();
+
+ @Override
+ public void afterPropertiesSet() throws Exception {
+ int producerCount = getProducerCount();
+ try (Connection connection = connectionFactory.createConnection()) {
+ for (int i = 0; i < producerCount; i++) {
+ // It's crucial to create the session within the loop, otherwise the producers won't be handled as
+ // parallel
+ // producers
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = session.createProducer(destination);
+ producers.add(producer);
+ }
+ }
+ log.info("Initialized JMS multi producer for external events with {} parallel producers", producerCount);
+ }
+
+ private int getProducerCount() {
+ return fineractProperties.getEvents().getExternal().getProducer().getJms().getProducerCount();
+ }
+
+ @Override
+ public void sendEvents(Map<Long, List<byte[]>> partitions) throws AcknowledgementTimeoutException {
+ Map<Integer, List<byte[]>> indexedPartitions = mapPartitionsToProducers(partitions);
+ measure(() -> {
+ List<Future<?>> tasks = sendPartitions(indexedPartitions);
+ waitForSendingCompletion(tasks);
+ }, timeTaken -> {
+ if (log.isDebugEnabled()) {
+ // in case execution is faster than 1sec
+ long seconds = Math.max(timeTaken.toSeconds(), 1L);
+ Integer eventCount = partitions.values().stream().map(Collection::size).reduce(0, Integer::sum);
+ log.debug("Sent messages with {} msg/s", (eventCount / seconds));
+ }
+ });
+ }
+
+ private List<Future<?>> sendPartitions(Map<Integer, List<byte[]>> indexedPartitions) {
+ List<Future<?>> tasks = new ArrayList<>();
+ for (Map.Entry<Integer, List<byte[]>> entry : indexedPartitions.entrySet()) {
+ Integer producerIndex = entry.getKey();
+ List<byte[]> messages = entry.getValue();
+ Future<?> future = createSendingTask(producerIndex, messages);
+ tasks.add(future);
+ }
+ return tasks;
+ }
+
+ private Future<?> createSendingTask(Integer producerIndex, List<byte[]> messages) {
+ return taskExecutor.submit(() -> {
+ MessageProducer messageProducer = producers.get(producerIndex);
+
+ for (byte[] message : messages) {
+ try {
+ messageProducer.send(destination, messageFactory.createByteMessage(message));
+ } catch (JMSException e) {
+ throw new RuntimeException("Error while sending the message", e);
+ }
+ }
+ });
+ }
+
+ private Map<Integer, List<byte[]>> mapPartitionsToProducers(Map<Long, List<byte[]>> partitions) {
+ Map<Integer, List<byte[]>> indexedPartitions = new HashMap<>();
+ for (Map.Entry<Long, List<byte[]>> partition : partitions.entrySet()) {
+ Long key = partition.getKey();
+ List<byte[]> messages = partition.getValue();
+
+ int producerIndex = hashingService.consistentHash(key, getProducerCount());
+ indexedPartitions.putIfAbsent(producerIndex, new ArrayList<>());
+ indexedPartitions.get(producerIndex).addAll(messages);
+ }
+ return indexedPartitions;
+ }
+
+ private void waitForSendingCompletion(List<Future<?>> tasks) {
+ try {
+ for (Future<?> task : tasks) {
+ task.get();
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/repository/ExternalEventRepository.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/repository/ExternalEventRepository.java
index 4a04bfa84..648e22565 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/repository/ExternalEventRepository.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/repository/ExternalEventRepository.java
@@ -19,9 +19,11 @@
package org.apache.fineract.infrastructure.event.external.repository;
import java.time.LocalDate;
+import java.time.OffsetDateTime;
import java.util.List;
import org.apache.fineract.infrastructure.event.external.repository.domain.ExternalEvent;
import org.apache.fineract.infrastructure.event.external.repository.domain.ExternalEventStatus;
+import org.apache.fineract.infrastructure.event.external.repository.domain.ExternalEventView;
import org.springframework.data.domain.Pageable;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Modifying;
@@ -29,9 +31,13 @@ import org.springframework.data.jpa.repository.Query;
public interface ExternalEventRepository extends JpaRepository<ExternalEvent, Long> {
- List<ExternalEvent> findByStatusOrderById(ExternalEventStatus status, Pageable batchSize);
+ List<ExternalEventView> findByStatusOrderById(ExternalEventStatus status, Pageable batchSize);
@Modifying(flushAutomatically = true)
@Query("delete from ExternalEvent e where e.status = :status and e.businessDate <= :dateForPurgeCriteria")
void deleteOlderEventsWithSentStatus(ExternalEventStatus status, LocalDate dateForPurgeCriteria);
+
+ @Modifying
+ @Query("UPDATE ExternalEvent e SET e.status = org.apache.fineract.infrastructure.event.external.repository.domain.ExternalEventStatus.SENT, e.sentAt = :sentAt WHERE e.id IN :ids")
+ void markEventsSent(List<Long> ids, OffsetDateTime sentAt);
}
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/repository/domain/ExternalEvent.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/repository/domain/ExternalEvent.java
index e237a38e7..2ab426a55 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/repository/domain/ExternalEvent.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/repository/domain/ExternalEvent.java
@@ -70,7 +70,10 @@ public class ExternalEvent extends AbstractPersistableCustom {
@Column(name = "business_date", nullable = false)
private LocalDate businessDate;
- public ExternalEvent(String type, String category, String schema, byte[] data, String idempotencyKey) {
+ @Column(name = "aggregate_root_id", nullable = true)
+ private Long aggregateRootId;
+
+ public ExternalEvent(String type, String category, String schema, byte[] data, String idempotencyKey, Long aggregateRootId) {
this.type = type;
this.category = category;
this.schema = schema;
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/BusinessEvent.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/repository/domain/ExternalEventView.java
similarity index 65%
copy from fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/BusinessEvent.java
copy to fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/repository/domain/ExternalEventView.java
index c2876066b..4c7b43caf 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/business/domain/BusinessEvent.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/repository/domain/ExternalEventView.java
@@ -16,13 +16,32 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.fineract.infrastructure.event.business.domain;
+package org.apache.fineract.infrastructure.event.external.repository.domain;
-public interface BusinessEvent<T> {
+import java.time.LocalDate;
+import java.time.OffsetDateTime;
- T get();
+/**
+ * Read-only view of an external event: it exposes the event's values through getters only.
+ * <p>
+ * {@code ExternalEventRepository#findByStatusOrderById} returns its results as this type.
+ */
+public interface ExternalEventView {
+
+ Long getId();
String getType();
String getCategory();
+
+ String getSchema();
+
+ byte[] getData();
+
+ OffsetDateTime getCreatedAt();
+
+ ExternalEventStatus getStatus();
+
+ OffsetDateTime getSentAt();
+
+ String getIdempotencyKey();
+
+ LocalDate getBusinessDate();
+
+ /** May be {@code null}: the {@code aggregate_root_id} column of {@code ExternalEvent} is nullable. */
+ Long getAggregateRootId();
}
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/ExternalEventService.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/ExternalEventService.java
index b158eee6b..c3c3690b4 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/ExternalEventService.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/ExternalEventService.java
@@ -85,7 +85,7 @@ public class ExternalEventService {
byte[] data = byteBufferConverter.convert(avroDto.toByteBuffer());
return new ExternalEvent(bulkBusinessEvent.getType(), bulkBusinessEvent.getCategory(), BulkMessagePayloadV1.class.getName(), data,
- idempotencyKey);
+ idempotencyKey, bulkBusinessEvent.getAggregateRootId());
}
private <T> ExternalEvent handleRegularBusinessEvent(BusinessEvent<T> event) throws IOException {
@@ -95,8 +95,9 @@ public class ExternalEventService {
BusinessEventSerializer serializer = serializerFactory.create(event);
String schema = serializer.getSupportedSchema().getName();
byte[] data = serializer.serialize(event);
+ Long aggregateRootId = event.getAggregateRootId();
- return new ExternalEvent(eventType, eventCategory, schema, data, idempotencyKey);
+ return new ExternalEvent(eventType, eventCategory, schema, data, idempotencyKey, aggregateRootId);
}
private void flushChangesBeforeSerialization() {
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/message/MessageFactory.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/message/MessageFactory.java
index 2218cd458..389a1ab63 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/message/MessageFactory.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/message/MessageFactory.java
@@ -27,7 +27,7 @@ import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.fineract.avro.MessageV1;
import org.apache.fineract.infrastructure.core.service.ThreadLocalContextUtil;
-import org.apache.fineract.infrastructure.event.external.repository.domain.ExternalEvent;
+import org.apache.fineract.infrastructure.event.external.repository.domain.ExternalEventView;
import org.apache.fineract.infrastructure.event.external.service.message.domain.MessageBusinessDate;
import org.apache.fineract.infrastructure.event.external.service.message.domain.MessageCategory;
import org.apache.fineract.infrastructure.event.external.service.message.domain.MessageCreatedAt;
@@ -66,7 +66,7 @@ public class MessageFactory implements InitializingBean {
return result;
}
- public MessageV1 createMessage(ExternalEvent event) {
+ public MessageV1 createMessage(ExternalEventView event) {
MessageId id = new MessageId(event.getId().intValue());
MessageSource source = new MessageSource(SOURCE_UUID);
MessageType type = new MessageType(event.getType());
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/service/StepName.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/service/StepName.java
index 2dbac082c..bdf0abc16 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/service/StepName.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/service/StepName.java
@@ -19,5 +19,5 @@
package org.apache.fineract.infrastructure.jobs.service;
public enum StepName {
- PURGE_PROCESSED_COMMANDS_STEP;
+ PURGE_PROCESSED_COMMANDS_STEP, SEND_ASYNCHRONOUS_EVENTS_STEP
}
diff --git a/fineract-provider/src/main/resources/application.properties b/fineract-provider/src/main/resources/application.properties
index 385400e3c..a322b44b4 100644
--- a/fineract-provider/src/main/resources/application.properties
+++ b/fineract-provider/src/main/resources/application.properties
@@ -62,6 +62,7 @@ fineract.events.external.producer.jms.event-topic-name=${FINERACT_EXTERNAL_EVENT
fineract.events.external.producer.jms.broker-url=${FINERACT_EXTERNAL_EVENTS_PRODUCER_JMS_BROKER_URL:tcp://127.0.0.1:61616}
fineract.events.external.producer.jms.broker-username=${FINERACT_EXTERNAL_EVENTS_PRODUCER_JMS_BROKER_USERNAME:}
fineract.events.external.producer.jms.broker-password=${FINERACT_EXTERNAL_EVENTS_PRODUCER_JMS_BROKER_PASSWORD:}
+fineract.events.external.producer.jms.producer-count=${FINERACT_EXTERNAL_EVENTS_PRODUCER_JMS_PRODUCER_COUNT:1}
fineract.idempotency-key-header-name=${FINERACT_IDEMPOTENCY_KEY_HEADER_NAME:Idempotency-Key}
diff --git a/fineract-provider/src/main/resources/db/changelog/tenant/changelog-tenant.xml b/fineract-provider/src/main/resources/db/changelog/tenant/changelog-tenant.xml
index 932f1fec5..d81bba3f1 100644
--- a/fineract-provider/src/main/resources/db/changelog/tenant/changelog-tenant.xml
+++ b/fineract-provider/src/main/resources/db/changelog/tenant/changelog-tenant.xml
@@ -104,4 +104,5 @@
<include file="parts/0082_add_external_event_default_configuration.xml" relativeToChangelogFile="true" />
<include file="parts/0083_add_loan_transaction_enum_values.xml" relativeToChangelogFile="true" />
<include file="parts/0084_add_general_accounting_table_reports.xml" relativeToChangelogFile="true" />
+ <include file="parts/0085_add_aggregate_root_id_external_events.xml" relativeToChangelogFile="true" />
</databaseChangeLog>
diff --git a/fineract-provider/src/main/resources/db/changelog/tenant/parts/0085_add_aggregate_root_id_external_events.xml b/fineract-provider/src/main/resources/db/changelog/tenant/parts/0085_add_aggregate_root_id_external_events.xml
new file mode 100644
index 000000000..906fc50b3
--- /dev/null
+++ b/fineract-provider/src/main/resources/db/changelog/tenant/parts/0085_add_aggregate_root_id_external_events.xml
@@ -0,0 +1,32 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<databaseChangeLog xmlns="http://www.liquibase.org/xml/ns/dbchangelog"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-4.3.xsd">
+ <changeSet author="fineract" id="1">
+ <addColumn tableName="m_external_event">
+ <column name="aggregate_root_id" type="BIGINT">
+ <constraints nullable="true"/>
+ </column>
+ </addColumn>
+ </changeSet>
+</databaseChangeLog>
diff --git a/fineract-provider/src/test/java/org/apache/fineract/infrastructure/core/service/HashingServiceTest.java b/fineract-provider/src/test/java/org/apache/fineract/infrastructure/core/service/HashingServiceTest.java
new file mode 100644
index 000000000..305e32fc6
--- /dev/null
+++ b/fineract-provider/src/test/java/org/apache/fineract/infrastructure/core/service/HashingServiceTest.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.fineract.infrastructure.core.service;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.Random;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Unit tests for {@code HashingService#consistentHash}: repeated calls with the same input yield the same bucket,
+ * and the returned bucket index always lies within {@code [0, buckets - 1]}.
+ */
+class HashingServiceTest {
+
+    private final Random rnd = new Random();
+
+    private final HashingService underTest = new HashingService();
+
+    @Test
+    public void testConsistentHashGeneratesHashesConsistentlyForTheSameValue() {
+        // given
+        int initialResult = underTest.consistentHash(1L, 20);
+        // when & then
+        for (int i = 0; i < 20_000; i++) {
+            int result = underTest.consistentHash(1L, 20);
+            assertThat(result).isEqualTo(initialResult);
+        }
+    }
+
+    @Test
+    public void testConsistentHashWorksForNegativeValues() {
+        // given
+        // when
+        int result = underTest.consistentHash(-1L, 20);
+        // then
+        assertThat(result).isBetween(0, 19);
+    }
+
+    @Test
+    public void testConsistentHashGeneratesHashesWithinBuckets() {
+        // given
+        int buckets = 10;
+        // when & then
+        for (int i = 0; i < 20_000; i++) {
+            int result = underTest.consistentHash(rnd.nextLong(), buckets);
+            // random longs can be negative, so the lower bound has to be checked as well as the upper one
+            assertThat(result).isBetween(0, buckets - 1);
+        }
+    }
+}
diff --git a/fineract-provider/src/test/java/org/apache/fineract/infrastructure/event/business/domain/BulkBusinessEventTest.java b/fineract-provider/src/test/java/org/apache/fineract/infrastructure/event/business/domain/BulkBusinessEventTest.java
new file mode 100644
index 000000000..e2eb0c389
--- /dev/null
+++ b/fineract-provider/src/test/java/org/apache/fineract/infrastructure/event/business/domain/BulkBusinessEventTest.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.fineract.infrastructure.event.business.domain;
+
+import static org.mockito.BDDMockito.given;
+import static org.mockito.Mockito.mock;
+
+import java.util.List;
+import org.apache.fineract.infrastructure.event.business.domain.client.ClientActivateBusinessEvent;
+import org.apache.fineract.infrastructure.event.business.domain.client.ClientCreateBusinessEvent;
+import org.apache.fineract.portfolio.client.domain.Client;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests how the {@code BulkBusinessEvent} constructor reacts to the aggregate ids of the events it is given:
+ * equal ids and a mix of an id with {@code null} are accepted, differing ids are rejected.
+ */
+class BulkBusinessEventTest {
+
+ @Test
+ public void testConstructorWorksForSameAggregateId() {
+ // given
+ // when
+ new BulkBusinessEvent(List.of(new ClientCreateBusinessEvent(client(1L)), new ClientActivateBusinessEvent(client(1L))));
+ // then no exception thrown
+ }
+
+ @Test
+ public void testConstructorThrowsExceptionForDifferentAggregateId() {
+ // given
+ // when & then
+ Assertions.assertThrows(IllegalArgumentException.class, () -> new BulkBusinessEvent(
+ List.of(new ClientCreateBusinessEvent(client(1L)), new ClientActivateBusinessEvent(client(2L)))));
+ // IllegalArgumentException is expected because the two events carry different client ids
+ }
+
+ @Test
+ public void testConstructorWorksForNullAggregateId() {
+ // given
+ // when
+ new BulkBusinessEvent(List.of(new ClientCreateBusinessEvent(client(1L)), new ClientActivateBusinessEvent(client(null))));
+ // then no exception thrown
+ }
+
+ // Creates a Client mock whose getId() returns the given id (which may be null)
+ private Client client(Long id) {
+ Client client = mock(Client.class);
+ given(client.getId()).willReturn(id);
+ return client;
+ }
+
+}
diff --git a/fineract-provider/src/test/java/org/apache/fineract/infrastructure/event/business/service/BusinessEventNotifierServiceImplTest.java b/fineract-provider/src/test/java/org/apache/fineract/infrastructure/event/business/service/BusinessEventNotifierServiceImplTest.java
index 81da91e2f..56984cc84 100644
--- a/fineract-provider/src/test/java/org/apache/fineract/infrastructure/event/business/service/BusinessEventNotifierServiceImplTest.java
+++ b/fineract-provider/src/test/java/org/apache/fineract/infrastructure/event/business/service/BusinessEventNotifierServiceImplTest.java
@@ -226,6 +226,11 @@ class BusinessEventNotifierServiceImplTest {
public String getCategory() {
return null;
}
+
+ // This test event supplies no aggregate root id, so null is returned
+ @Override
+ public Long getAggregateRootId() {
+ return null;
+ }
}
}
diff --git a/fineract-provider/src/test/java/org/apache/fineract/infrastructure/event/external/jobs/SendAsynchronousEventsTaskletTest.java b/fineract-provider/src/test/java/org/apache/fineract/infrastructure/event/external/jobs/SendAsynchronousEventsTaskletTest.java
index 0fdfca0fd..f505856d9 100644
--- a/fineract-provider/src/test/java/org/apache/fineract/infrastructure/event/external/jobs/SendAsynchronousEventsTaskletTest.java
+++ b/fineract-provider/src/test/java/org/apache/fineract/infrastructure/event/external/jobs/SendAsynchronousEventsTaskletTest.java
@@ -20,7 +20,6 @@ package org.apache.fineract.infrastructure.event.external.jobs;
import static com.google.common.truth.Truth.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -34,6 +33,7 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Random;
import org.apache.fineract.avro.MessageV1;
import org.apache.fineract.infrastructure.businessdate.domain.BusinessDateType;
import org.apache.fineract.infrastructure.configuration.domain.ConfigurationDomainService;
@@ -43,8 +43,7 @@ import org.apache.fineract.infrastructure.core.service.ThreadLocalContextUtil;
import org.apache.fineract.infrastructure.event.external.exception.AcknowledgementTimeoutException;
import org.apache.fineract.infrastructure.event.external.producer.ExternalEventProducer;
import org.apache.fineract.infrastructure.event.external.repository.ExternalEventRepository;
-import org.apache.fineract.infrastructure.event.external.repository.domain.ExternalEvent;
-import org.apache.fineract.infrastructure.event.external.repository.domain.ExternalEventStatus;
+import org.apache.fineract.infrastructure.event.external.repository.domain.ExternalEventView;
import org.apache.fineract.infrastructure.event.external.service.message.MessageFactory;
import org.apache.fineract.infrastructure.event.external.service.support.ByteBufferConverter;
import org.junit.jupiter.api.BeforeEach;
@@ -54,12 +53,15 @@ import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.junit.jupiter.MockitoSettings;
+import org.mockito.quality.Strictness;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.data.domain.Pageable;
@ExtendWith(MockitoExtension.class)
+@MockitoSettings(strictness = Strictness.LENIENT)
class SendAsynchronousEventsTaskletTest {
@Mock
@@ -81,6 +83,8 @@ class SendAsynchronousEventsTaskletTest {
private SendAsynchronousEventsTasklet underTest;
private RepeatStatus resultStatus;
+ private Random rnd = new Random();
+
@BeforeEach
public void setUp() {
ThreadLocalContextUtil.setTenant(new FineractPlatformTenant(1L, "default", "Default", "Asia/Kolkata", null));
@@ -108,8 +112,9 @@ class SendAsynchronousEventsTaskletTest {
@Test
public void givenBatchSize2WhenTaskExecutionThenSend2Events() throws Exception {
// given
- List<ExternalEvent> events = Arrays.asList(new ExternalEvent("aType", "aCategory", "aSchema", new byte[0], "aIdempotencyKey"),
- new ExternalEvent("aType", "aCategory", "aSchema", new byte[0], "aIdempotencyKey"));
+ List<ExternalEventView> events = Arrays.asList(
+ createExternalEventView("aType", "aCategory", "aSchema", new byte[0], "aIdempotencyKey", 1L),
+ createExternalEventView("aType", "aCategory", "aSchema", new byte[0], "aIdempotencyKey", 1L));
// Dummy Message
MessageV1 dummyMessage = new MessageV1(1, "aSource", "aType", "nocategory", "aCreateDate", "aBusinessDate", "aTenantId",
"anidempotencyKey", "aSchema", Mockito.mock(ByteBuffer.class));
@@ -117,64 +122,95 @@ class SendAsynchronousEventsTaskletTest {
when(repository.findByStatusOrderById(Mockito.any(), Mockito.any())).thenReturn(events);
when(messageFactory.createMessage(Mockito.any())).thenReturn(dummyMessage);
when(byteBufferConverter.convert(Mockito.any(ByteBuffer.class))).thenReturn(new byte[0]);
- doNothing().when(eventProducer).sendEvent(Mockito.any());
// when
- resultStatus = this.underTest.execute(stepContribution, chunkContext);
+ resultStatus = underTest.execute(stepContribution, chunkContext);
// then
- verify(eventProducer, times(2)).sendEvent(new byte[0]);
- verify(repository, times(2)).save(Mockito.any(ExternalEvent.class));
+ verify(eventProducer).sendEvents(Mockito.any());
+ verify(repository).markEventsSent(Mockito.eq(events.stream().map(ExternalEventView::getId).toList()), Mockito.any());
assertEquals(RepeatStatus.FINISHED, resultStatus);
}
@Test
public void givenBatchSize2WhenEventSendFailsThenExecutionStops() throws Exception {
// given
- List<ExternalEvent> events = Arrays.asList(new ExternalEvent("aType", "aCategory", "aSchema", new byte[0], "aIdempotencyKey"),
- new ExternalEvent("aType", "aCategory", "aSchema", new byte[0], "aIdempotencyKey"));
+ List<ExternalEventView> events = Arrays.asList(
+ createExternalEventView("aType", "aCategory", "aSchema", new byte[0], "aIdempotencyKey", 1L),
+ createExternalEventView("aType", "aCategory", "aSchema", new byte[0], "aIdempotencyKey", 1L));
MessageV1 dummyMessage = new MessageV1(1, "aSource", "aType", "nocategory", "aCreateDate", "aBusinessDate", "aTenantId",
"anidempotencyKey", "aSchema", Mockito.mock(ByteBuffer.class));
when(repository.findByStatusOrderById(Mockito.any(), Mockito.any())).thenReturn(events);
when(messageFactory.createMessage(Mockito.any())).thenReturn(dummyMessage);
when(byteBufferConverter.convert(Mockito.any(ByteBuffer.class))).thenReturn(new byte[0]);
doThrow(new AcknowledgementTimeoutException("Event Send Exception", new RuntimeException())).when(eventProducer)
- .sendEvent(Mockito.any());
+ .sendEvents(Mockito.any());
// when
- resultStatus = this.underTest.execute(stepContribution, chunkContext);
+ resultStatus = underTest.execute(stepContribution, chunkContext);
// then
- verify(repository, times(0)).save(Mockito.any(ExternalEvent.class));
+ verify(repository, times(0)).markEventsSent(Mockito.any(), Mockito.any());
assertEquals(RepeatStatus.FINISHED, resultStatus);
}
@Test
public void givenOneEventWhenEventSentThenEventStatusUpdates() throws Exception {
// given
- ArgumentCaptor<ExternalEvent> externalEventArgumentCaptor = ArgumentCaptor.forClass(ExternalEvent.class);
- List<ExternalEvent> events = Arrays.asList(new ExternalEvent("aType", "aCategory", "aSchema", new byte[0], "aIdempotencyKey"));
+ List<ExternalEventView> events = Arrays
+ .asList(createExternalEventView("aType", "aCategory", "aSchema", new byte[0], "aIdempotencyKey", 1L));
MessageV1 dummyMessage = new MessageV1(1, "aSource", "aType", "nocategory", "aCreateDate", "aBusinessDate", "aTenantId",
"anidempotencyKey", "aSchema", Mockito.mock(ByteBuffer.class));
when(repository.findByStatusOrderById(Mockito.any(), Mockito.any())).thenReturn(events);
when(messageFactory.createMessage(Mockito.any())).thenReturn(dummyMessage);
when(byteBufferConverter.convert(Mockito.any(ByteBuffer.class))).thenReturn(new byte[0]);
- doNothing().when(eventProducer).sendEvent(Mockito.any());
// when
- resultStatus = this.underTest.execute(stepContribution, chunkContext);
+ resultStatus = underTest.execute(stepContribution, chunkContext);
// then
verify(messageFactory).createMessage(Mockito.any());
- verify(repository).save(externalEventArgumentCaptor.capture());
- ExternalEvent externalEvent = externalEventArgumentCaptor.getValue();
- assertThat(externalEvent.getStatus()).isEqualTo(ExternalEventStatus.SENT);
+ verify(eventProducer).sendEvents(Mockito.any());
+ verify(repository).markEventsSent(Mockito.eq(events.stream().map(ExternalEventView::getId).toList()), Mockito.any());
+ assertEquals(RepeatStatus.FINISHED, resultStatus);
+ }
+
+ @Test
+ public void testExecuteShouldHandleNullAggregateId() throws Exception {
+ // given
+ List<ExternalEventView> events = Arrays
+ .asList(createExternalEventView("aType", "aCategory", "aSchema", new byte[0], "aIdempotencyKey", null));
+ MessageV1 dummyMessage = new MessageV1(1, "aSource", "aType", "nocategory", "aCreateDate", "aBusinessDate", "aTenantId",
+ "anidempotencyKey", "aSchema", Mockito.mock(ByteBuffer.class));
+ when(repository.findByStatusOrderById(Mockito.any(), Mockito.any())).thenReturn(events);
+ when(messageFactory.createMessage(Mockito.any())).thenReturn(dummyMessage);
+ byte[] byteMsg = new byte[0];
+ when(byteBufferConverter.convert(Mockito.any(ByteBuffer.class))).thenReturn(byteMsg);
+ // when
+ resultStatus = underTest.execute(stepContribution, chunkContext);
+ // then
+ verify(messageFactory).createMessage(Mockito.any());
+ verify(eventProducer).sendEvents(Map.of(-1L, List.of(byteMsg)));
+ verify(repository).markEventsSent(Mockito.eq(events.stream().map(ExternalEventView::getId).toList()), Mockito.any());
assertEquals(RepeatStatus.FINISHED, resultStatus);
}
@Test
public void givenEventBatchSizeIsConfiguredAs10WhenTaskExecutionThenEventReadPageSizeIsCorrect() {
ArgumentCaptor<Pageable> externalEventPageSizeArgumentCaptor = ArgumentCaptor.forClass(Pageable.class);
- List<ExternalEvent> events = new ArrayList<>();
+ List<ExternalEventView> events = new ArrayList<>();
when(repository.findByStatusOrderById(Mockito.any(), Mockito.any())).thenReturn(events);
// when
- resultStatus = this.underTest.execute(stepContribution, chunkContext);
+ resultStatus = underTest.execute(stepContribution, chunkContext);
// then
verify(repository).findByStatusOrderById(Mockito.any(), externalEventPageSizeArgumentCaptor.capture());
assertThat(externalEventPageSizeArgumentCaptor.getValue().getPageSize()).isEqualTo(10);
}
+
+ /**
+ * Creates a mocked {@link ExternalEventView} whose getters return the supplied values and whose id is a
+ * random long drawn from {@code rnd}.
+ */
+ private ExternalEventView createExternalEventView(String type, String category, String schema, byte[] data, String idempotencyKey,
+ Long aggregateRootId) {
+ long generatedId = rnd.nextLong();
+ ExternalEventView view = Mockito.mock(ExternalEventView.class);
+ when(view.getId()).thenReturn(generatedId);
+ when(view.getAggregateRootId()).thenReturn(aggregateRootId);
+ when(view.getIdempotencyKey()).thenReturn(idempotencyKey);
+ when(view.getType()).thenReturn(type);
+ when(view.getCategory()).thenReturn(category);
+ when(view.getSchema()).thenReturn(schema);
+ when(view.getData()).thenReturn(data);
+ return view;
+ }
}
diff --git a/fineract-provider/src/test/java/org/apache/fineract/infrastructure/event/external/producer/EventsJMSIntegrationTest.java b/fineract-provider/src/test/java/org/apache/fineract/infrastructure/event/external/producer/EventsJMSIntegrationTest.java
deleted file mode 100644
index bc666406e..000000000
--- a/fineract-provider/src/test/java/org/apache/fineract/infrastructure/event/external/producer/EventsJMSIntegrationTest.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.fineract.infrastructure.event.external.producer;
-
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-import java.util.concurrent.atomic.AtomicInteger;
-import javax.jms.ConnectionFactory;
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Qualifier;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.integration.annotation.IntegrationComponentScan;
-import org.springframework.integration.channel.DirectChannel;
-import org.springframework.integration.config.EnableIntegration;
-import org.springframework.integration.config.GlobalChannelInterceptor;
-import org.springframework.integration.dsl.IntegrationFlow;
-import org.springframework.integration.dsl.IntegrationFlows;
-import org.springframework.integration.handler.LoggingHandler;
-import org.springframework.integration.jms.dsl.Jms;
-import org.springframework.messaging.Message;
-import org.springframework.messaging.MessageChannel;
-import org.springframework.messaging.support.ChannelInterceptor;
-import org.springframework.stereotype.Component;
-import org.springframework.test.annotation.DirtiesContext;
-import org.springframework.test.context.ContextConfiguration;
-import org.springframework.test.context.TestPropertySource;
-import org.springframework.test.context.junit.jupiter.SpringExtension;
-
-@ExtendWith(SpringExtension.class)
-@ContextConfiguration
-@DirtiesContext
-@TestPropertySource(properties = { "fineract.events.external.enabled=true" })
-public class EventsJMSIntegrationTest {
-
- @Autowired
- @Qualifier("outboundRequestsEvents")
- private DirectChannel outboundRequestsEvents;
-
- @Autowired
- private ContextConfiguration.TestChannelInterceptor testChannelInterceptor;
-
- @Autowired
- private ExternalEventProducer underTest;
-
- @Test
- public void testJmsDownstreamChannelIntegration() {
- assertThat(outboundRequestsEvents.getSubscriberCount()).isEqualTo(1);
- }
-
- @Test
- void given2EventsThenOutBoundChannelIsInvokedTwice() {
- // when
- underTest.sendEvent(new byte[0]);
- underTest.sendEvent(new byte[0]);
- // then
- assertTrue(outboundRequestsEvents.getInterceptors().contains(this.testChannelInterceptor));
- assertThat(testChannelInterceptor.getInvoked()).isEqualTo(2);
-
- }
-
- @Configuration
- @IntegrationComponentScan
- @EnableIntegration
- public static class ContextConfiguration {
-
- private static ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
-
- @Bean
- public DirectChannel outboundRequestsEvents() {
- return new DirectChannel();
- }
-
- @Bean
- public IntegrationFlow outboundFlow() {
- return IntegrationFlows.from("outboundRequestsEvents") //
- .log(LoggingHandler.Level.DEBUG) //
- .handle(Jms.outboundAdapter(connectionFactory).destination("destinationChannel")).get();
- }
-
- @Component
- @GlobalChannelInterceptor(patterns = "outboundRequestsEvents")
- public static class TestChannelInterceptor implements ChannelInterceptor {
-
- private final AtomicInteger invoked = new AtomicInteger();
-
- public int getInvoked() {
- return invoked.get();
- }
-
- @Override
- public Message<?> preSend(Message<?> message, MessageChannel channel) {
- this.invoked.incrementAndGet();
- return message;
- }
-
- }
-
- }
-}
diff --git a/fineract-provider/src/test/java/org/apache/fineract/infrastructure/event/external/producer/jms/JMSMultiExternalEventProducerTest.java b/fineract-provider/src/test/java/org/apache/fineract/infrastructure/event/external/producer/jms/JMSMultiExternalEventProducerTest.java
new file mode 100644
index 000000000..503717ed8
--- /dev/null
+++ b/fineract-provider/src/test/java/org/apache/fineract/infrastructure/event/external/producer/jms/JMSMultiExternalEventProducerTest.java
@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.fineract.infrastructure.event.external.producer.jms;
+
+import static org.mockito.BDDMockito.given;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.fineract.avro.MessageV1;
+import org.apache.fineract.infrastructure.core.config.FineractProperties;
+import org.apache.fineract.infrastructure.core.messaging.jms.MessageFactory;
+import org.apache.fineract.infrastructure.core.service.HashingService;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.junit.jupiter.MockitoSettings;
+import org.mockito.quality.Strictness;
+import org.springframework.core.task.AsyncTaskExecutor;
+import org.springframework.core.task.SimpleAsyncTaskExecutor;
+
+/**
+ * Unit tests for {@link JMSMultiExternalEventProducer} running against mocked JMS objects: one connection,
+ * {@code PRODUCER_COUNT} sessions and one message producer per session.
+ */
+@ExtendWith(MockitoExtension.class)
+@MockitoSettings(strictness = Strictness.LENIENT)
+class JMSMultiExternalEventProducerTest {
+
+ // Number of producers configured through FineractProperties; must match the number of mocked sessions below
+ private static final int PRODUCER_COUNT = 3;
+ private final Random rnd = new Random();
+
+ @Mock
+ private Destination destination;
+ @Mock
+ private ActiveMQConnectionFactory connectionFactory;
+ @Mock
+ private MessageFactory messageFactory;
+
+ @Mock
+ private HashingService hashingService;
+
+ private AsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
+
+ @Mock
+ private Connection connection;
+ @Mock
+ private Session session1;
+ @Mock
+ private Session session2;
+ @Mock
+ private Session session3;
+
+ @Mock
+ private MessageProducer producer1;
+ @Mock
+ private MessageProducer producer2;
+ @Mock
+ private MessageProducer producer3;
+
+ private JMSMultiExternalEventProducer underTest;
+
+ private FineractProperties fineractProperties;
+
+ // Mirrors the stubbed producers so the tests can stub the hashing service with the same bucket count
+ private List<MessageProducer> producers = new ArrayList<>();
+
+ /**
+ * Verifies that initialization creates one session per configured producer, creates a producer on each
+ * session and closes the connection.
+ */
+ @Test
+ public void testAfterPropertiesShouldCreateMultipleSessions() throws Exception {
+ // given
+ initializeMocks();
+ // when
+ underTest.afterPropertiesSet();
+ // then
+ // use the configured count instead of a literal so the check follows PRODUCER_COUNT
+ verify(connection, times(PRODUCER_COUNT)).createSession(false, Session.AUTO_ACKNOWLEDGE);
+ verify(session1).createProducer(destination);
+ verify(session2).createProducer(destination);
+ verify(session3).createProducer(destination);
+ verify(connection).close();
+ }
+
+ /** Verifies that a single partition whose key is hashed to bucket 0 is sent through the first producer. */
+ @Test
+ public void testSendEventsShouldWork() throws Exception {
+ // given
+ initializeMocks();
+ underTest.afterPropertiesSet();
+
+ byte[] msg1 = createMessage();
+ List<byte[]> messages = new ArrayList<>();
+ messages.add(msg1);
+ Map<Long, List<byte[]>> partitions = Map.of(1L, messages);
+
+ BytesMessage bytesMsg1 = Mockito.mock(BytesMessage.class);
+ given(messageFactory.createByteMessage(msg1)).willReturn(bytesMsg1);
+ given(hashingService.consistentHash(1L, producers.size())).willReturn(0);
+ // when
+ underTest.sendEvents(partitions);
+ // then
+ verify(producer1).send(destination, bytesMsg1);
+ }
+
+ /**
+ * Verifies that three partitions whose keys are hashed to buckets 0, 1 and 2 are sent through the first,
+ * second and third producer respectively.
+ */
+ @Test
+ public void testSendEventsBalancesBetweenProducers() throws Exception {
+ // given
+ initializeMocks();
+ underTest.afterPropertiesSet();
+
+ byte[] msg1 = createMessage();
+ byte[] msg2 = createMessage();
+ byte[] msg3 = createMessage();
+ List<byte[]> messages1 = new ArrayList<>();
+ messages1.add(msg1);
+ List<byte[]> messages2 = new ArrayList<>();
+ messages2.add(msg2);
+ List<byte[]> messages3 = new ArrayList<>();
+ messages3.add(msg3);
+ Map<Long, List<byte[]>> partitions = Map.of(1L, messages1, 2L, messages2, 3L, messages3);
+
+ BytesMessage bytesMsg1 = Mockito.mock(BytesMessage.class);
+ BytesMessage bytesMsg2 = Mockito.mock(BytesMessage.class);
+ BytesMessage bytesMsg3 = Mockito.mock(BytesMessage.class);
+ given(messageFactory.createByteMessage(msg1)).willReturn(bytesMsg1);
+ given(messageFactory.createByteMessage(msg2)).willReturn(bytesMsg2);
+ given(messageFactory.createByteMessage(msg3)).willReturn(bytesMsg3);
+ given(hashingService.consistentHash(1L, producers.size())).willReturn(0);
+ given(hashingService.consistentHash(2L, producers.size())).willReturn(1);
+ given(hashingService.consistentHash(3L, producers.size())).willReturn(2);
+ // when
+ underTest.sendEvents(partitions);
+ // then
+ verify(producer1).send(destination, bytesMsg1);
+ verify(producer2).send(destination, bytesMsg2);
+ verify(producer3).send(destination, bytesMsg3);
+ }
+
+ // Builds a MessageV1 with a random id, empty string fields and an empty payload, and returns the array
+ // backing its toByteBuffer() result
+ private byte[] createMessage() throws IOException {
+ MessageV1 messageV1 = new MessageV1();
+ messageV1.setId(rnd.nextInt());
+ messageV1.setSource("");
+ messageV1.setBusinessDate("");
+ messageV1.setCategory("");
+ messageV1.setCreatedAt("");
+ messageV1.setDataschema("");
+ messageV1.setIdempotencyKey("");
+ messageV1.setTenantId("");
+ messageV1.setType("");
+ messageV1.setData(ByteBuffer.wrap(new byte[0]));
+ return messageV1.toByteBuffer().array();
+ }
+
+ // Builds FineractProperties carrying PRODUCER_COUNT, creates the producer under test and stubs the JMS chain:
+ // connection factory -> connection -> session1..3 (returned on consecutive calls) -> producer1..3
+ private void initializeMocks() throws JMSException {
+ FineractProperties.FineractExternalEventsProducerJmsProperties jms = new FineractProperties.FineractExternalEventsProducerJmsProperties();
+ jms.setProducerCount(PRODUCER_COUNT);
+ FineractProperties.FineractExternalEventsProducerProperties producer = new FineractProperties.FineractExternalEventsProducerProperties();
+ producer.setJms(jms);
+ FineractProperties.FineractExternalEventsProperties external = new FineractProperties.FineractExternalEventsProperties();
+ external.setProducer(producer);
+ FineractProperties.FineractEventsProperties events = new FineractProperties.FineractEventsProperties();
+ events.setExternal(external);
+ fineractProperties = new FineractProperties();
+ fineractProperties.setEvents(events);
+ underTest = new JMSMultiExternalEventProducer(destination, connectionFactory, messageFactory, taskExecutor, hashingService,
+ fineractProperties);
+
+ given(connectionFactory.createConnection()).willReturn(connection);
+ given(connection.createSession(false, Session.AUTO_ACKNOWLEDGE)).willReturn(session1, session2, session3);
+ given(session1.createProducer(destination)).willReturn(producer1);
+ given(session2.createProducer(destination)).willReturn(producer2);
+ given(session3.createProducer(destination)).willReturn(producer3);
+
+ producers.add(producer1);
+ producers.add(producer2);
+ producers.add(producer3);
+ }
+}
diff --git a/lombok.config b/lombok.config
index aa692161e..f542c6381 100644
--- a/lombok.config
+++ b/lombok.config
@@ -18,4 +18,6 @@
#
config.stopBubbling = true
-lombok.addLombokGeneratedAnnotation = true
\ No newline at end of file
+lombok.addLombokGeneratedAnnotation = true
+
+lombok.copyableAnnotations += org.springframework.beans.factory.annotation.Qualifier
\ No newline at end of file