You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@fineract.apache.org by al...@apache.org on 2022/09/27 15:06:35 UTC

[fineract] branch develop updated: FINERACT-1694 JMS ActiveMQ downstream channel integration for events

This is an automated email from the ASF dual-hosted git repository.

aleks pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/fineract.git


The following commit(s) were added to refs/heads/develop by this push:
     new ee0184c0a FINERACT-1694 JMS ActiveMQ downstream channel integration for events
ee0184c0a is described below

commit ee0184c0a5823f648b80b5374d88c0851c559b6e
Author: Ruchi Dhamankar <ru...@gmail.com>
AuthorDate: Wed Sep 21 21:42:49 2022 +0530

    FINERACT-1694 JMS ActiveMQ downstream channel integration for events
---
 .../boot/AbstractApplicationConfiguration.java     |   2 +
 .../core/config/FineractProperties.java            |   1 +
 .../ExternalEventJMSBrokerConfiguration.java       |  42 ++++++++
 .../ExternalEventJMSProducerConfiguration.java     |  56 ++++++++++
 .../ExternalEventProducerConfiguration.java}       |  22 ++--
 .../jobs/SendAsynchronousEventsTasklet.java        |   7 +-
 ...pl.java => DummyExternalEventProducerImpl.java} |   7 +-
 .../external/producer/ExternalEventProducer.java   |   9 +-
 .../src/main/resources/application.properties      |   1 +
 .../jobs/SendAsynchronousEventsTaskletTest.java    |  31 ++++--
 .../producer/EventsJMSIntegrationTest.java         | 120 +++++++++++++++++++++
 .../src/test/resources/application-test.properties |   2 +
 12 files changed, 276 insertions(+), 24 deletions(-)

diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/core/boot/AbstractApplicationConfiguration.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/core/boot/AbstractApplicationConfiguration.java
index 0c0b6c4be..dcc5fe71d 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/core/boot/AbstractApplicationConfiguration.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/core/boot/AbstractApplicationConfiguration.java
@@ -30,6 +30,7 @@ import org.springframework.boot.autoconfigure.orm.jpa.HibernateJpaAutoConfigurat
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
 import org.springframework.context.annotation.ComponentScan;
 import org.springframework.context.annotation.Configuration;
+import org.springframework.integration.annotation.IntegrationComponentScan;
 import org.springframework.security.config.annotation.web.configuration.EnableWebSecurity;
 import org.springframework.transaction.annotation.EnableTransactionManagement;
 
@@ -45,6 +46,7 @@ import org.springframework.transaction.annotation.EnableTransactionManagement;
 @EnableWebSecurity
 @EnableConfigurationProperties({ FineractProperties.class, LiquibaseProperties.class })
 @ComponentScan(basePackages = "org.apache.fineract.**")
+@IntegrationComponentScan(basePackages = "org.apache.fineract.**")
 public abstract class AbstractApplicationConfiguration {
 
 }
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/core/config/FineractProperties.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/core/config/FineractProperties.java
index a0639e6aa..996b0116b 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/core/config/FineractProperties.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/core/config/FineractProperties.java
@@ -151,5 +151,6 @@ public class FineractProperties {
 
         private boolean enabled;
         private String eventQueueName;
+        private String brokerUrl;
     }
 }
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/config/ExternalEventJMSBrokerConfiguration.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/config/ExternalEventJMSBrokerConfiguration.java
new file mode 100644
index 000000000..44efcb292
--- /dev/null
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/config/ExternalEventJMSBrokerConfiguration.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.fineract.infrastructure.event.external.config;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.fineract.infrastructure.core.config.FineractProperties;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+/** Registers the ActiveMQ connection factory for the JMS external event producer; active only when the JMS producer is enabled. */
+@Configuration
+@ConditionalOnProperty(value = "fineract.events.external.producer.jms.enabled", havingValue = "true")
+public class ExternalEventJMSBrokerConfiguration {
+
+    @Autowired
+    private FineractProperties fineractProperties;
+
+    /** Builds the factory for the configured broker URL. Trust-all-packages is deliberately not enabled: events are sent as byte[]. */
+    @Bean
+    public ActiveMQConnectionFactory connectionFactory() {
+        String brokerUrl = fineractProperties.getEvents().getExternal().getProducer().getJms().getBrokerUrl();
+        return new ActiveMQConnectionFactory(brokerUrl);
+    }
+}
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/config/ExternalEventJMSProducerConfiguration.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/config/ExternalEventJMSProducerConfiguration.java
new file mode 100644
index 000000000..bc765b146
--- /dev/null
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/config/ExternalEventJMSProducerConfiguration.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.fineract.infrastructure.event.external.config;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.fineract.infrastructure.core.config.FineractProperties;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.Import;
+import org.springframework.integration.channel.DirectChannel;
+import org.springframework.integration.config.EnableIntegration;
+import org.springframework.integration.dsl.IntegrationFlow;
+import org.springframework.integration.dsl.IntegrationFlows;
+import org.springframework.integration.handler.LoggingHandler;
+import org.springframework.integration.jms.dsl.Jms;
+
+/** Routes messages from the "outboundRequests" channel to the configured JMS event queue through ActiveMQ. */
+@Configuration
+@EnableIntegration
+@ConditionalOnProperty(value = { "fineract.events.external.enabled", "fineract.events.external.producer.jms.enabled" }, havingValue = "true")
+@Import(value = { ExternalEventJMSBrokerConfiguration.class })
+public class ExternalEventJMSProducerConfiguration {
+
+    // This channel bean is only declared when fineract.events.external.enabled=true, hence the two-property condition above.
+    @Autowired
+    private DirectChannel outboundRequests;
+
+    @Autowired
+    private FineractProperties fineractProperties;
+
+    /** Logs each outbound message at DEBUG and hands it to a JMS outbound adapter that targets the configured event queue. */
+    @Bean
+    public IntegrationFlow outboundFlow(ActiveMQConnectionFactory connectionFactory) {
+        String queueName = fineractProperties.getEvents().getExternal().getProducer().getJms().getEventQueueName();
+        return IntegrationFlows.from(outboundRequests).log(LoggingHandler.Level.DEBUG) //
+                .handle(Jms.outboundAdapter(connectionFactory).destination(queueName)).get();
+    }
+}
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/producer/ExternalEventProducerImpl.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/config/ExternalEventProducerConfiguration.java
similarity index 55%
copy from fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/producer/ExternalEventProducerImpl.java
copy to fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/config/ExternalEventProducerConfiguration.java
index 067307b6e..6896228cb 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/producer/ExternalEventProducerImpl.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/config/ExternalEventProducerConfiguration.java
@@ -16,17 +16,21 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.fineract.infrastructure.event.external.producer;
+package org.apache.fineract.infrastructure.event.external.config;
 
-import org.apache.fineract.avro.MessageV1;
-import org.apache.fineract.infrastructure.event.external.exception.AcknowledgementTimeoutException;
-import org.springframework.stereotype.Service;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.integration.channel.DirectChannel;
+import org.springframework.integration.config.EnableIntegration;
 
-@Service
-public class ExternalEventProducerImpl implements ExternalEventProducer {
+@Configuration
+@EnableIntegration
+@ConditionalOnProperty(value = "fineract.events.external.enabled", havingValue = "true")
+public class ExternalEventProducerConfiguration {
 
-    @Override
-    public void sendEvent(MessageV1 message) throws AcknowledgementTimeoutException {
-        return;
+    @Bean
+    public DirectChannel outboundRequests() {
+        return new DirectChannel();
     }
 }
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/jobs/SendAsynchronousEventsTasklet.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/jobs/SendAsynchronousEventsTasklet.java
index 27ad7b9ac..1dfe90157 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/jobs/SendAsynchronousEventsTasklet.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/jobs/SendAsynchronousEventsTasklet.java
@@ -18,6 +18,7 @@
  */
 package org.apache.fineract.infrastructure.event.external.jobs;
 
+import java.io.IOException;
 import java.util.List;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
@@ -29,6 +30,7 @@ import org.apache.fineract.infrastructure.event.external.repository.ExternalEven
 import org.apache.fineract.infrastructure.event.external.repository.domain.ExternalEvent;
 import org.apache.fineract.infrastructure.event.external.repository.domain.ExternalEventStatus;
 import org.apache.fineract.infrastructure.event.external.service.message.MessageFactory;
+import org.apache.fineract.infrastructure.event.external.service.support.ByteBufferConverter;
 import org.springframework.batch.core.StepContribution;
 import org.springframework.batch.core.scope.context.ChunkContext;
 import org.springframework.batch.core.step.tasklet.Tasklet;
@@ -46,6 +48,7 @@ public class SendAsynchronousEventsTasklet implements Tasklet {
     private final ExternalEventRepository repository;
     private final ExternalEventProducer eventProducer;
     private final MessageFactory messageFactory;
+    private final ByteBufferConverter byteBufferConverter;
 
     @Override
     public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) {
@@ -65,10 +68,10 @@ public class SendAsynchronousEventsTasklet implements Tasklet {
         return events;
     }
 
-    private void processEvents(List<ExternalEvent> queuedEvents) {
+    private void processEvents(List<ExternalEvent> queuedEvents) throws IOException {
         for (ExternalEvent event : queuedEvents) {
             MessageV1 message = messageFactory.createMessage(event);
-            eventProducer.sendEvent(message);
+            eventProducer.sendEvent(byteBufferConverter.convert(message.toByteBuffer()));
             event.setStatus(ExternalEventStatus.SENT);
             event.setSentAt(DateUtils.getOffsetDateTimeOfTenant());
             repository.save(event);
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/producer/ExternalEventProducerImpl.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/producer/DummyExternalEventProducerImpl.java
similarity index 76%
rename from fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/producer/ExternalEventProducerImpl.java
rename to fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/producer/DummyExternalEventProducerImpl.java
index 067307b6e..4f60f0a31 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/producer/ExternalEventProducerImpl.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/producer/DummyExternalEventProducerImpl.java
@@ -18,15 +18,16 @@
  */
 package org.apache.fineract.infrastructure.event.external.producer;
 
-import org.apache.fineract.avro.MessageV1;
 import org.apache.fineract.infrastructure.event.external.exception.AcknowledgementTimeoutException;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.stereotype.Service;
 
 @Service
-public class ExternalEventProducerImpl implements ExternalEventProducer {
+@ConditionalOnProperty(value = "fineract.events.external.enabled", havingValue = "false")
+public class DummyExternalEventProducerImpl implements ExternalEventProducer {
 
     @Override
-    public void sendEvent(MessageV1 message) throws AcknowledgementTimeoutException {
+    public void sendEvent(byte[] message) throws AcknowledgementTimeoutException {
         return;
     }
 }
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/producer/ExternalEventProducer.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/producer/ExternalEventProducer.java
index 355f57bee..bf720ca29 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/producer/ExternalEventProducer.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/producer/ExternalEventProducer.java
@@ -18,10 +18,15 @@
  */
 package org.apache.fineract.infrastructure.event.external.producer;
 
-import org.apache.fineract.avro.MessageV1;
 import org.apache.fineract.infrastructure.event.external.exception.AcknowledgementTimeoutException;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.integration.annotation.Gateway;
+import org.springframework.integration.annotation.MessagingGateway;
 
+@MessagingGateway(name = "externalEventGateway")
+@ConditionalOnProperty(value = "fineract.events.external.enabled", havingValue = "true")
 public interface ExternalEventProducer {
 
-    void sendEvent(MessageV1 message) throws AcknowledgementTimeoutException;
+    @Gateway(requestChannel = "outboundRequests", replyTimeout = 2, requestTimeout = 200)
+    void sendEvent(byte[] message) throws AcknowledgementTimeoutException;
 }
diff --git a/fineract-provider/src/main/resources/application.properties b/fineract-provider/src/main/resources/application.properties
index 44a6c83f8..73a4864ff 100644
--- a/fineract-provider/src/main/resources/application.properties
+++ b/fineract-provider/src/main/resources/application.properties
@@ -57,6 +57,7 @@ fineract.events.external.enabled=${FINERACT_EXTERNAL_EVENTS_ENABLED:false}
 fineract.events.external.producer.read-batch-size=${FINERACT_EXTERNAL_EVENTS_PRODUCER_READ_BATCH_SIZE:1000}
 fineract.events.external.producer.jms.enabled=${FINERACT_EXTERNAL_EVENTS_PRODUCER_JMS_ENABLED:false}
 fineract.events.external.producer.jms.event-queue-name=${FINERACT_EXTERNAL_EVENTS_PRODUCER_JMS_QUEUE_NAME:JMS-event-queue}
+fineract.events.external.producer.jms.broker-url=${FINERACT_EXTERNAL_EVENTS_PRODUCER_JMS_BROKER_URL:tcp://127.0.0.1:61616}
 
 
 # Logging pattern for the console
diff --git a/fineract-provider/src/test/java/org/apache/fineract/infrastructure/event/external/jobs/SendAsynchronousEventsTaskletTest.java b/fineract-provider/src/test/java/org/apache/fineract/infrastructure/event/external/jobs/SendAsynchronousEventsTaskletTest.java
index 8550fbb5b..fe7c89778 100644
--- a/fineract-provider/src/test/java/org/apache/fineract/infrastructure/event/external/jobs/SendAsynchronousEventsTaskletTest.java
+++ b/fineract-provider/src/test/java/org/apache/fineract/infrastructure/event/external/jobs/SendAsynchronousEventsTaskletTest.java
@@ -26,6 +26,7 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import java.nio.ByteBuffer;
 import java.time.LocalDate;
 import java.time.ZoneId;
 import java.util.Arrays;
@@ -43,6 +44,7 @@ import org.apache.fineract.infrastructure.event.external.repository.ExternalEven
 import org.apache.fineract.infrastructure.event.external.repository.domain.ExternalEvent;
 import org.apache.fineract.infrastructure.event.external.repository.domain.ExternalEventStatus;
 import org.apache.fineract.infrastructure.event.external.service.message.MessageFactory;
+import org.apache.fineract.infrastructure.event.external.service.support.ByteBufferConverter;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
@@ -69,6 +71,8 @@ class SendAsynchronousEventsTaskletTest {
     private StepContribution stepContribution;
     @Mock
     private ChunkContext chunkContext;
+    @Mock
+    private ByteBufferConverter byteBufferConverter;
     private SendAsynchronousEventsTasklet underTest;
     private RepeatStatus resultStatus;
 
@@ -78,7 +82,7 @@ class SendAsynchronousEventsTaskletTest {
         ThreadLocalContextUtil
                 .setBusinessDates(new HashMap<>(Map.of(BusinessDateType.BUSINESS_DATE, LocalDate.now(ZoneId.systemDefault()))));
         configureExternalEventsProducerReadBatchSizeProperty(1000);
-        underTest = new SendAsynchronousEventsTasklet(fineractProperties, repository, eventProducer, messageFactory);
+        underTest = new SendAsynchronousEventsTasklet(fineractProperties, repository, eventProducer, messageFactory, byteBufferConverter);
     }
 
     private void configureExternalEventsProducerReadBatchSizeProperty(int readBatchSize) {
@@ -97,13 +101,18 @@ class SendAsynchronousEventsTaskletTest {
         // given
         List<ExternalEvent> events = Arrays.asList(new ExternalEvent("aType", "aSchema", new byte[0], "aIdemtpotencyKey"),
                 new ExternalEvent("aType", "aSchema", new byte[0], "aIdemtpotencyKey"));
+        // Dummy Message
+        MessageV1 dummyMessage = new MessageV1(1, "aSource", "aType", "nocategory", "aCreateDate", "aTennantId", "anidempotencyKey",
+                "aSchema", Mockito.mock(ByteBuffer.class));
+
         when(repository.findByStatusOrderById(Mockito.any(), Mockito.any())).thenReturn(events);
-        when(messageFactory.createMessage(Mockito.any())).thenReturn(new MessageV1());
-        doNothing().when(eventProducer).sendEvent(Mockito.any(MessageV1.class));
+        when(messageFactory.createMessage(Mockito.any())).thenReturn(dummyMessage);
+        when(byteBufferConverter.convert(Mockito.any(ByteBuffer.class))).thenReturn(new byte[0]);
+        doNothing().when(eventProducer).sendEvent(Mockito.any());
         // when
         resultStatus = this.underTest.execute(stepContribution, chunkContext);
         // then
-        verify(eventProducer, times(2)).sendEvent(Mockito.any(MessageV1.class));
+        verify(eventProducer, times(2)).sendEvent(new byte[0]);
         verify(repository, times(2)).save(Mockito.any(ExternalEvent.class));
         assertEquals(RepeatStatus.FINISHED, resultStatus);
     }
@@ -113,10 +122,13 @@ class SendAsynchronousEventsTaskletTest {
         // given
         List<ExternalEvent> events = Arrays.asList(new ExternalEvent("aType", "aSchema", new byte[0], "aIdemtpotencyKey"),
                 new ExternalEvent("aType", "aSchema", new byte[0], "aIdemtpotencyKey"));
+        MessageV1 dummyMessage = new MessageV1(1, "aSource", "aType", "nocategory", "aCreateDate", "aTennantId", "anidempotencyKey",
+                "aSchema", Mockito.mock(ByteBuffer.class));
         when(repository.findByStatusOrderById(Mockito.any(), Mockito.any())).thenReturn(events);
-        when(messageFactory.createMessage(Mockito.any())).thenReturn(new MessageV1());
+        when(messageFactory.createMessage(Mockito.any())).thenReturn(dummyMessage);
+        when(byteBufferConverter.convert(Mockito.any(ByteBuffer.class))).thenReturn(new byte[0]);
         doThrow(new AcknowledgementTimeoutException("Event Send Exception", new RuntimeException())).when(eventProducer)
-                .sendEvent(Mockito.any(MessageV1.class));
+                .sendEvent(Mockito.any());
         // when
         resultStatus = this.underTest.execute(stepContribution, chunkContext);
         // then
@@ -129,9 +141,12 @@ class SendAsynchronousEventsTaskletTest {
         // given
         ArgumentCaptor<ExternalEvent> externalEventArgumentCaptor = ArgumentCaptor.forClass(ExternalEvent.class);
         List<ExternalEvent> events = Arrays.asList(new ExternalEvent("aType", "aSchema", new byte[0], "aIdemtpotencyKey"));
+        MessageV1 dummyMessage = new MessageV1(1, "aSource", "aType", "nocategory", "aCreateDate", "aTennantId", "anidempotencyKey",
+                "aSchema", Mockito.mock(ByteBuffer.class));
         when(repository.findByStatusOrderById(Mockito.any(), Mockito.any())).thenReturn(events);
-        when(messageFactory.createMessage(Mockito.any())).thenReturn(new MessageV1());
-        doNothing().when(eventProducer).sendEvent(Mockito.any(MessageV1.class));
+        when(messageFactory.createMessage(Mockito.any())).thenReturn(dummyMessage);
+        when(byteBufferConverter.convert(Mockito.any(ByteBuffer.class))).thenReturn(new byte[0]);
+        doNothing().when(eventProducer).sendEvent(Mockito.any());
         // when
         resultStatus = this.underTest.execute(stepContribution, chunkContext);
         // then
diff --git a/fineract-provider/src/test/java/org/apache/fineract/infrastructure/event/external/producer/EventsJMSIntegrationTest.java b/fineract-provider/src/test/java/org/apache/fineract/infrastructure/event/external/producer/EventsJMSIntegrationTest.java
new file mode 100644
index 000000000..d7a611d93
--- /dev/null
+++ b/fineract-provider/src/test/java/org/apache/fineract/infrastructure/event/external/producer/EventsJMSIntegrationTest.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.fineract.infrastructure.event.external.producer;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.jms.ConnectionFactory;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.integration.annotation.IntegrationComponentScan;
+import org.springframework.integration.channel.DirectChannel;
+import org.springframework.integration.config.EnableIntegration;
+import org.springframework.integration.config.GlobalChannelInterceptor;
+import org.springframework.integration.dsl.IntegrationFlow;
+import org.springframework.integration.dsl.IntegrationFlows;
+import org.springframework.integration.handler.LoggingHandler;
+import org.springframework.integration.jms.dsl.Jms;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.MessageChannel;
+import org.springframework.messaging.support.ChannelInterceptor;
+import org.springframework.stereotype.Component;
+import org.springframework.test.annotation.DirtiesContext;
+import org.springframework.test.context.ContextConfiguration;
+import org.springframework.test.context.TestPropertySource;
+import org.springframework.test.context.junit.jupiter.SpringExtension;
+
+@ExtendWith(SpringExtension.class)
+@ContextConfiguration
+@DirtiesContext
+@TestPropertySource(properties = { "fineract.events.external.enabled=true" })
+public class EventsJMSIntegrationTest {
+    // Drives the ExternalEventProducer gateway and inspects the "outboundRequests" channel built by the nested configuration.
+    @Autowired
+    @Qualifier("outboundRequests")
+    private DirectChannel outboundRequests;
+
+    @Autowired
+    private ContextConfiguration.TestChannelInterceptor testChannelInterceptor;
+
+    @Autowired
+    private ExternalEventProducer underTest;
+    /** Expects exactly one subscriber on the outbound channel (presumably the outboundFlow defined below - confirm). */
+    @Test
+    public void testJmsDownstreamChannelIntegration() {
+        assertThat(outboundRequests.getSubscriberCount()).isEqualTo(1);
+    }
+    /** Two sendEvent calls must be seen twice by the interceptor registered on the outbound channel. */
+    @Test
+    void given2EventsThenOutBoundChannelIsInvokedTwice() {
+        // when
+        underTest.sendEvent(new byte[0]);
+        underTest.sendEvent(new byte[0]);
+        // then
+        assertTrue(outboundRequests.getInterceptors().contains(this.testChannelInterceptor));
+        assertThat(testChannelInterceptor.getInvoked()).isEqualTo(2);
+
+    }
+    /** Test-local configuration: the outbound channel, a JMS outbound flow and a counting channel interceptor. */
+    @Configuration
+    @IntegrationComponentScan
+    @EnableIntegration
+    public static class ContextConfiguration {
+        // NOTE(review): this class shares its simple name with the Spring @ContextConfiguration annotation on the outer class.
+        private static ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
+
+        @Bean
+        public DirectChannel outboundRequests() {
+            return new DirectChannel();
+        }
+
+        @Bean
+        public IntegrationFlow outboundFlow() {
+            return IntegrationFlows.from("outboundRequests") //
+                    .log(LoggingHandler.Level.DEBUG) //
+                    .handle(Jms.outboundAdapter(connectionFactory).destination("destinationChannel")).get();
+        }
+        /** Channel interceptor, registered for the "outboundRequests" pattern, that counts preSend invocations. */
+        @Component
+        @GlobalChannelInterceptor(patterns = "outboundRequests")
+        public static class TestChannelInterceptor implements ChannelInterceptor {
+
+            private final AtomicInteger invoked = new AtomicInteger();
+
+            public int getInvoked() {
+                return invoked.get();
+            }
+            /** Increments the invocation counter and returns the message unchanged. */
+            @Override
+            public Message<?> preSend(Message<?> message, MessageChannel channel) {
+                this.invoked.incrementAndGet();
+                return message;
+            }
+
+        }
+
+    }
+}
diff --git a/fineract-provider/src/test/resources/application-test.properties b/fineract-provider/src/test/resources/application-test.properties
index 3de58a747..b43d7260b 100644
--- a/fineract-provider/src/test/resources/application-test.properties
+++ b/fineract-provider/src/test/resources/application-test.properties
@@ -49,6 +49,8 @@ fineract.events.external.enabled=${FINERACT_EXTERNAL_EVENTS_ENABLED:false}
 fineract.events.external.producer.read-batch-size=${FINERACT_EXTERNAL_EVENTS_PRODUCER_READ_BATCH_SIZE:1000}
 fineract.events.external.producer.jms.enabled=${FINERACT_EXTERNAL_EVENTS_PRODUCER_JMS_ENABLED:false}
 fineract.events.external.producer.jms.event-queue-name=${FINERACT_EXTERNAL_EVENTS_PRODUCER_JMS_QUEUE_NAME:JMS-event-queue}
+fineract.events.external.producer.jms.broker-url=${FINERACT_EXTERNAL_EVENTS_PRODUCER_JMS_BROKER_URL:tcp://127.0.0.1:61616}
+
 
 management.health.jms.enabled=false