You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@fineract.apache.org by al...@apache.org on 2022/09/27 15:06:35 UTC
[fineract] branch develop updated: FINERACT-1694 JMS ActiveMQ downstream channel integration for events
This is an automated email from the ASF dual-hosted git repository.
aleks pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/fineract.git
The following commit(s) were added to refs/heads/develop by this push:
new ee0184c0a FINERACT-1694 JMS ActiveMQ downstream channel integration for events
ee0184c0a is described below
commit ee0184c0a5823f648b80b5374d88c0851c559b6e
Author: Ruchi Dhamankar <ru...@gmail.com>
AuthorDate: Wed Sep 21 21:42:49 2022 +0530
FINERACT-1694 JMS ActiveMQ downstream channel integration for events
---
.../boot/AbstractApplicationConfiguration.java | 2 +
.../core/config/FineractProperties.java | 1 +
.../ExternalEventJMSBrokerConfiguration.java | 42 ++++++++
.../ExternalEventJMSProducerConfiguration.java | 56 ++++++++++
.../ExternalEventProducerConfiguration.java} | 22 ++--
.../jobs/SendAsynchronousEventsTasklet.java | 7 +-
...pl.java => DummyExternalEventProducerImpl.java} | 7 +-
.../external/producer/ExternalEventProducer.java | 9 +-
.../src/main/resources/application.properties | 1 +
.../jobs/SendAsynchronousEventsTaskletTest.java | 31 ++++--
.../producer/EventsJMSIntegrationTest.java | 120 +++++++++++++++++++++
.../src/test/resources/application-test.properties | 2 +
12 files changed, 276 insertions(+), 24 deletions(-)
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/core/boot/AbstractApplicationConfiguration.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/core/boot/AbstractApplicationConfiguration.java
index 0c0b6c4be..dcc5fe71d 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/core/boot/AbstractApplicationConfiguration.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/core/boot/AbstractApplicationConfiguration.java
@@ -30,6 +30,7 @@ import org.springframework.boot.autoconfigure.orm.jpa.HibernateJpaAutoConfigurat
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
+import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.security.config.annotation.web.configuration.EnableWebSecurity;
import org.springframework.transaction.annotation.EnableTransactionManagement;
@@ -45,6 +46,7 @@ import org.springframework.transaction.annotation.EnableTransactionManagement;
@EnableWebSecurity
@EnableConfigurationProperties({ FineractProperties.class, LiquibaseProperties.class })
@ComponentScan(basePackages = "org.apache.fineract.**")
+@IntegrationComponentScan(basePackages = "org.apache.fineract.**")
public abstract class AbstractApplicationConfiguration {
}
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/core/config/FineractProperties.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/core/config/FineractProperties.java
index a0639e6aa..996b0116b 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/core/config/FineractProperties.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/core/config/FineractProperties.java
@@ -151,5 +151,6 @@ public class FineractProperties {
private boolean enabled;
private String eventQueueName;
+ private String brokerUrl;
}
}
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/config/ExternalEventJMSBrokerConfiguration.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/config/ExternalEventJMSBrokerConfiguration.java
new file mode 100644
index 000000000..44efcb292
--- /dev/null
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/config/ExternalEventJMSBrokerConfiguration.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.fineract.infrastructure.event.external.config;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.fineract.infrastructure.core.config.FineractProperties;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+/**
+ * Provides the ActiveMQ connection factory used by the JMS external-event producer.
+ * <p>
+ * Only registered when {@code fineract.events.external.producer.jms.enabled} is {@code true}.
+ */
+@Configuration
+@ConditionalOnProperty(value = "fineract.events.external.producer.jms.enabled", havingValue = "true")
+public class ExternalEventJMSBrokerConfiguration {
+
+    @Autowired
+    private FineractProperties fineractProperties;
+
+    /**
+     * Creates the connection factory pointing at the broker URL configured under the JMS producer properties.
+     *
+     * @return the ActiveMQ connection factory for the configured broker
+     */
+    @Bean
+    public ActiveMQConnectionFactory connectionFactory() {
+        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
+        connectionFactory.setBrokerURL(fineractProperties.getEvents().getExternal().getProducer().getJms().getBrokerUrl());
+        // setTrustAllPackages(true) is intentionally not called: it disables ActiveMQ's allow-list for
+        // ObjectMessage deserialization, and the event gateway only sends byte[] payloads.
+        // NOTE(review): if some consumer sharing this factory relies on ObjectMessage, allow just the
+        // required packages via setTrustedPackages(...) instead of trusting everything.
+        return connectionFactory;
+    }
+}
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/config/ExternalEventJMSProducerConfiguration.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/config/ExternalEventJMSProducerConfiguration.java
new file mode 100644
index 000000000..bc765b146
--- /dev/null
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/config/ExternalEventJMSProducerConfiguration.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.fineract.infrastructure.event.external.config;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.fineract.infrastructure.core.config.FineractProperties;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.Import;
+import org.springframework.integration.channel.DirectChannel;
+import org.springframework.integration.config.EnableIntegration;
+import org.springframework.integration.dsl.IntegrationFlow;
+import org.springframework.integration.dsl.IntegrationFlows;
+import org.springframework.integration.handler.LoggingHandler;
+import org.springframework.integration.jms.dsl.Jms;
+
+/**
+ * Wires the {@code outboundRequests} channel to a JMS outbound adapter so that external events are
+ * published to the configured ActiveMQ queue.
+ * <p>
+ * The configuration requires both the external events feature and the JMS producer to be enabled: the
+ * {@code outboundRequests} channel it injects is only declared when
+ * {@code fineract.events.external.enabled} is {@code true}, so enabling the JMS flag alone would
+ * otherwise fail on an unsatisfied dependency.
+ */
+@Configuration
+@EnableIntegration
+@ConditionalOnProperty(value = { "fineract.events.external.enabled",
+        "fineract.events.external.producer.jms.enabled" }, havingValue = "true")
+@Import(value = { ExternalEventJMSBrokerConfiguration.class })
+public class ExternalEventJMSProducerConfiguration {
+
+    @Autowired
+    private DirectChannel outboundRequests;
+
+    @Autowired
+    private FineractProperties fineractProperties;
+
+    /**
+     * Builds the flow that takes every message from {@code outboundRequests}, logs it at DEBUG level and
+     * hands it to a JMS outbound adapter targeting the configured event queue.
+     *
+     * @param connectionFactory
+     *            the ActiveMQ connection factory used by the outbound adapter
+     * @return the integration flow from the channel to the JMS destination
+     */
+    @Bean
+    public IntegrationFlow outboundFlow(ActiveMQConnectionFactory connectionFactory) {
+        return IntegrationFlows.from(outboundRequests) //
+                .log(LoggingHandler.Level.DEBUG) //
+                .handle(Jms.outboundAdapter(connectionFactory)
+                        .destination(fineractProperties.getEvents().getExternal().getProducer().getJms().getEventQueueName()))
+                .get();
+    }
+
+}
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/producer/ExternalEventProducerImpl.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/config/ExternalEventProducerConfiguration.java
similarity index 55%
copy from fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/producer/ExternalEventProducerImpl.java
copy to fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/config/ExternalEventProducerConfiguration.java
index 067307b6e..6896228cb 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/producer/ExternalEventProducerImpl.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/config/ExternalEventProducerConfiguration.java
@@ -16,17 +16,21 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.fineract.infrastructure.event.external.producer;
+package org.apache.fineract.infrastructure.event.external.config;
-import org.apache.fineract.avro.MessageV1;
-import org.apache.fineract.infrastructure.event.external.exception.AcknowledgementTimeoutException;
-import org.springframework.stereotype.Service;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.integration.channel.DirectChannel;
+import org.springframework.integration.config.EnableIntegration;
-@Service
-public class ExternalEventProducerImpl implements ExternalEventProducer {
+@Configuration // declares the channel that carries outbound external events
+@EnableIntegration
+@ConditionalOnProperty(value = "fineract.events.external.enabled", havingValue = "true") // active only when external events are enabled
+public class ExternalEventProducerConfiguration {
- @Override
- public void sendEvent(MessageV1 message) throws AcknowledgementTimeoutException {
- return;
+ @Bean // method name "outboundRequests" matches the requestChannel used by the ExternalEventProducer gateway
+ public DirectChannel outboundRequests() {
+ return new DirectChannel();
}
}
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/jobs/SendAsynchronousEventsTasklet.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/jobs/SendAsynchronousEventsTasklet.java
index 27ad7b9ac..1dfe90157 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/jobs/SendAsynchronousEventsTasklet.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/jobs/SendAsynchronousEventsTasklet.java
@@ -18,6 +18,7 @@
*/
package org.apache.fineract.infrastructure.event.external.jobs;
+import java.io.IOException;
import java.util.List;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@@ -29,6 +30,7 @@ import org.apache.fineract.infrastructure.event.external.repository.ExternalEven
import org.apache.fineract.infrastructure.event.external.repository.domain.ExternalEvent;
import org.apache.fineract.infrastructure.event.external.repository.domain.ExternalEventStatus;
import org.apache.fineract.infrastructure.event.external.service.message.MessageFactory;
+import org.apache.fineract.infrastructure.event.external.service.support.ByteBufferConverter;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
@@ -46,6 +48,7 @@ public class SendAsynchronousEventsTasklet implements Tasklet {
private final ExternalEventRepository repository;
private final ExternalEventProducer eventProducer;
private final MessageFactory messageFactory;
+ private final ByteBufferConverter byteBufferConverter;
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) {
@@ -65,10 +68,10 @@ public class SendAsynchronousEventsTasklet implements Tasklet {
return events;
}
- private void processEvents(List<ExternalEvent> queuedEvents) {
+ private void processEvents(List<ExternalEvent> queuedEvents) throws IOException {
for (ExternalEvent event : queuedEvents) {
MessageV1 message = messageFactory.createMessage(event);
- eventProducer.sendEvent(message);
+ eventProducer.sendEvent(byteBufferConverter.convert(message.toByteBuffer()));
event.setStatus(ExternalEventStatus.SENT);
event.setSentAt(DateUtils.getOffsetDateTimeOfTenant());
repository.save(event);
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/producer/ExternalEventProducerImpl.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/producer/DummyExternalEventProducerImpl.java
similarity index 76%
rename from fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/producer/ExternalEventProducerImpl.java
rename to fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/producer/DummyExternalEventProducerImpl.java
index 067307b6e..4f60f0a31 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/producer/ExternalEventProducerImpl.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/producer/DummyExternalEventProducerImpl.java
@@ -18,15 +18,16 @@
*/
package org.apache.fineract.infrastructure.event.external.producer;
-import org.apache.fineract.avro.MessageV1;
import org.apache.fineract.infrastructure.event.external.exception.AcknowledgementTimeoutException;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Service;
+/**
+ * No-op {@link ExternalEventProducer} used whenever external events are not enabled.
+ * <p>
+ * {@code matchIfMissing = true} keeps a producer bean available even when
+ * {@code fineract.events.external.enabled} is not defined at all; the messaging gateway only matches
+ * when the property is explicitly {@code true}, so exactly one producer exists in every case.
+ */
@Service
-public class ExternalEventProducerImpl implements ExternalEventProducer {
+@ConditionalOnProperty(value = "fineract.events.external.enabled", havingValue = "false", matchIfMissing = true)
+public class DummyExternalEventProducerImpl implements ExternalEventProducer {
+ /** Discards the message: nothing is sent while external events are disabled. */
@Override
- public void sendEvent(MessageV1 message) throws AcknowledgementTimeoutException {
+ public void sendEvent(byte[] message) throws AcknowledgementTimeoutException {
return;
}
}
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/producer/ExternalEventProducer.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/producer/ExternalEventProducer.java
index 355f57bee..bf720ca29 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/producer/ExternalEventProducer.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/producer/ExternalEventProducer.java
@@ -18,10 +18,15 @@
*/
package org.apache.fineract.infrastructure.event.external.producer;
-import org.apache.fineract.avro.MessageV1;
import org.apache.fineract.infrastructure.event.external.exception.AcknowledgementTimeoutException;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.integration.annotation.Gateway;
+import org.springframework.integration.annotation.MessagingGateway;
+@MessagingGateway(name = "externalEventGateway") // Spring Integration gateway registered under the name "externalEventGateway"
+@ConditionalOnProperty(value = "fineract.events.external.enabled", havingValue = "true") // gateway exists only when external events are enabled
public interface ExternalEventProducer {
- void sendEvent(MessageV1 message) throws AcknowledgementTimeoutException;
+ @Gateway(requestChannel = "outboundRequests", replyTimeout = 2, requestTimeout = 200) // each call is sent to the "outboundRequests" channel
+ void sendEvent(byte[] message) throws AcknowledgementTimeoutException; // message: the already-serialized event bytes
}
diff --git a/fineract-provider/src/main/resources/application.properties b/fineract-provider/src/main/resources/application.properties
index 44a6c83f8..73a4864ff 100644
--- a/fineract-provider/src/main/resources/application.properties
+++ b/fineract-provider/src/main/resources/application.properties
@@ -57,6 +57,7 @@ fineract.events.external.enabled=${FINERACT_EXTERNAL_EVENTS_ENABLED:false}
fineract.events.external.producer.read-batch-size=${FINERACT_EXTERNAL_EVENTS_PRODUCER_READ_BATCH_SIZE:1000}
fineract.events.external.producer.jms.enabled=${FINERACT_EXTERNAL_EVENTS_PRODUCER_JMS_ENABLED:false}
fineract.events.external.producer.jms.event-queue-name=${FINERACT_EXTERNAL_EVENTS_PRODUCER_JMS_QUEUE_NAME:JMS-event-queue}
+fineract.events.external.producer.jms.broker-url=${FINERACT_EXTERNAL_EVENTS_PRODUCER_JMS_BROKER_URL:tcp://127.0.0.1:61616}
# Logging pattern for the console
diff --git a/fineract-provider/src/test/java/org/apache/fineract/infrastructure/event/external/jobs/SendAsynchronousEventsTaskletTest.java b/fineract-provider/src/test/java/org/apache/fineract/infrastructure/event/external/jobs/SendAsynchronousEventsTaskletTest.java
index 8550fbb5b..fe7c89778 100644
--- a/fineract-provider/src/test/java/org/apache/fineract/infrastructure/event/external/jobs/SendAsynchronousEventsTaskletTest.java
+++ b/fineract-provider/src/test/java/org/apache/fineract/infrastructure/event/external/jobs/SendAsynchronousEventsTaskletTest.java
@@ -26,6 +26,7 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import java.nio.ByteBuffer;
import java.time.LocalDate;
import java.time.ZoneId;
import java.util.Arrays;
@@ -43,6 +44,7 @@ import org.apache.fineract.infrastructure.event.external.repository.ExternalEven
import org.apache.fineract.infrastructure.event.external.repository.domain.ExternalEvent;
import org.apache.fineract.infrastructure.event.external.repository.domain.ExternalEventStatus;
import org.apache.fineract.infrastructure.event.external.service.message.MessageFactory;
+import org.apache.fineract.infrastructure.event.external.service.support.ByteBufferConverter;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -69,6 +71,8 @@ class SendAsynchronousEventsTaskletTest {
private StepContribution stepContribution;
@Mock
private ChunkContext chunkContext;
+ @Mock
+ private ByteBufferConverter byteBufferConverter;
private SendAsynchronousEventsTasklet underTest;
private RepeatStatus resultStatus;
@@ -78,7 +82,7 @@ class SendAsynchronousEventsTaskletTest {
ThreadLocalContextUtil
.setBusinessDates(new HashMap<>(Map.of(BusinessDateType.BUSINESS_DATE, LocalDate.now(ZoneId.systemDefault()))));
configureExternalEventsProducerReadBatchSizeProperty(1000);
- underTest = new SendAsynchronousEventsTasklet(fineractProperties, repository, eventProducer, messageFactory);
+ underTest = new SendAsynchronousEventsTasklet(fineractProperties, repository, eventProducer, messageFactory, byteBufferConverter);
}
private void configureExternalEventsProducerReadBatchSizeProperty(int readBatchSize) {
@@ -97,13 +101,18 @@ class SendAsynchronousEventsTaskletTest {
// given
List<ExternalEvent> events = Arrays.asList(new ExternalEvent("aType", "aSchema", new byte[0], "aIdemtpotencyKey"),
new ExternalEvent("aType", "aSchema", new byte[0], "aIdemtpotencyKey"));
+ // Dummy Message
+ MessageV1 dummyMessage = new MessageV1(1, "aSource", "aType", "nocategory", "aCreateDate", "aTennantId", "anidempotencyKey",
+ "aSchema", Mockito.mock(ByteBuffer.class));
+
when(repository.findByStatusOrderById(Mockito.any(), Mockito.any())).thenReturn(events);
- when(messageFactory.createMessage(Mockito.any())).thenReturn(new MessageV1());
- doNothing().when(eventProducer).sendEvent(Mockito.any(MessageV1.class));
+ when(messageFactory.createMessage(Mockito.any())).thenReturn(dummyMessage);
+ when(byteBufferConverter.convert(Mockito.any(ByteBuffer.class))).thenReturn(new byte[0]);
+ doNothing().when(eventProducer).sendEvent(Mockito.any());
// when
resultStatus = this.underTest.execute(stepContribution, chunkContext);
// then
- verify(eventProducer, times(2)).sendEvent(Mockito.any(MessageV1.class));
+ verify(eventProducer, times(2)).sendEvent(new byte[0]);
verify(repository, times(2)).save(Mockito.any(ExternalEvent.class));
assertEquals(RepeatStatus.FINISHED, resultStatus);
}
@@ -113,10 +122,13 @@ class SendAsynchronousEventsTaskletTest {
// given
List<ExternalEvent> events = Arrays.asList(new ExternalEvent("aType", "aSchema", new byte[0], "aIdemtpotencyKey"),
new ExternalEvent("aType", "aSchema", new byte[0], "aIdemtpotencyKey"));
+ MessageV1 dummyMessage = new MessageV1(1, "aSource", "aType", "nocategory", "aCreateDate", "aTennantId", "anidempotencyKey",
+ "aSchema", Mockito.mock(ByteBuffer.class));
when(repository.findByStatusOrderById(Mockito.any(), Mockito.any())).thenReturn(events);
- when(messageFactory.createMessage(Mockito.any())).thenReturn(new MessageV1());
+ when(messageFactory.createMessage(Mockito.any())).thenReturn(dummyMessage);
+ when(byteBufferConverter.convert(Mockito.any(ByteBuffer.class))).thenReturn(new byte[0]);
doThrow(new AcknowledgementTimeoutException("Event Send Exception", new RuntimeException())).when(eventProducer)
- .sendEvent(Mockito.any(MessageV1.class));
+ .sendEvent(Mockito.any());
// when
resultStatus = this.underTest.execute(stepContribution, chunkContext);
// then
@@ -129,9 +141,12 @@ class SendAsynchronousEventsTaskletTest {
// given
ArgumentCaptor<ExternalEvent> externalEventArgumentCaptor = ArgumentCaptor.forClass(ExternalEvent.class);
List<ExternalEvent> events = Arrays.asList(new ExternalEvent("aType", "aSchema", new byte[0], "aIdemtpotencyKey"));
+ MessageV1 dummyMessage = new MessageV1(1, "aSource", "aType", "nocategory", "aCreateDate", "aTennantId", "anidempotencyKey",
+ "aSchema", Mockito.mock(ByteBuffer.class));
when(repository.findByStatusOrderById(Mockito.any(), Mockito.any())).thenReturn(events);
- when(messageFactory.createMessage(Mockito.any())).thenReturn(new MessageV1());
- doNothing().when(eventProducer).sendEvent(Mockito.any(MessageV1.class));
+ when(messageFactory.createMessage(Mockito.any())).thenReturn(dummyMessage);
+ when(byteBufferConverter.convert(Mockito.any(ByteBuffer.class))).thenReturn(new byte[0]);
+ doNothing().when(eventProducer).sendEvent(Mockito.any());
// when
resultStatus = this.underTest.execute(stepContribution, chunkContext);
// then
diff --git a/fineract-provider/src/test/java/org/apache/fineract/infrastructure/event/external/producer/EventsJMSIntegrationTest.java b/fineract-provider/src/test/java/org/apache/fineract/infrastructure/event/external/producer/EventsJMSIntegrationTest.java
new file mode 100644
index 000000000..d7a611d93
--- /dev/null
+++ b/fineract-provider/src/test/java/org/apache/fineract/infrastructure/event/external/producer/EventsJMSIntegrationTest.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.fineract.infrastructure.event.external.producer;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.jms.ConnectionFactory;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.integration.annotation.IntegrationComponentScan;
+import org.springframework.integration.channel.DirectChannel;
+import org.springframework.integration.config.EnableIntegration;
+import org.springframework.integration.config.GlobalChannelInterceptor;
+import org.springframework.integration.dsl.IntegrationFlow;
+import org.springframework.integration.dsl.IntegrationFlows;
+import org.springframework.integration.handler.LoggingHandler;
+import org.springframework.integration.jms.dsl.Jms;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.MessageChannel;
+import org.springframework.messaging.support.ChannelInterceptor;
+import org.springframework.stereotype.Component;
+import org.springframework.test.annotation.DirtiesContext;
+import org.springframework.test.context.ContextConfiguration;
+import org.springframework.test.context.TestPropertySource;
+import org.springframework.test.context.junit.jupiter.SpringExtension;
+
+@ExtendWith(SpringExtension.class)
+@ContextConfiguration
+@DirtiesContext
+@TestPropertySource(properties = { "fineract.events.external.enabled=true" })
+public class EventsJMSIntegrationTest {
+
+ @Autowired
+ @Qualifier("outboundRequests")
+ private DirectChannel outboundRequests;
+
+ @Autowired
+ private ContextConfiguration.TestChannelInterceptor testChannelInterceptor;
+
+ @Autowired
+ private ExternalEventProducer underTest;
+
+ @Test
+ public void testJmsDownstreamChannelIntegration() {
+ assertThat(outboundRequests.getSubscriberCount()).isEqualTo(1);
+ }
+
+ @Test
+ void given2EventsThenOutBoundChannelIsInvokedTwice() {
+ // when
+ underTest.sendEvent(new byte[0]);
+ underTest.sendEvent(new byte[0]);
+ // then
+ assertTrue(outboundRequests.getInterceptors().contains(this.testChannelInterceptor));
+ assertThat(testChannelInterceptor.getInvoked()).isEqualTo(2);
+
+ }
+
+ @Configuration
+ @IntegrationComponentScan
+ @EnableIntegration
+ public static class ContextConfiguration {
+
+ private static ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
+
+ @Bean
+ public DirectChannel outboundRequests() {
+ return new DirectChannel();
+ }
+
+ @Bean
+ public IntegrationFlow outboundFlow() {
+ return IntegrationFlows.from("outboundRequests") //
+ .log(LoggingHandler.Level.DEBUG) //
+ .handle(Jms.outboundAdapter(connectionFactory).destination("destinationChannel")).get();
+ }
+
+ @Component
+ @GlobalChannelInterceptor(patterns = "outboundRequests")
+ public static class TestChannelInterceptor implements ChannelInterceptor {
+
+ private final AtomicInteger invoked = new AtomicInteger();
+
+ public int getInvoked() {
+ return invoked.get();
+ }
+
+ @Override
+ public Message<?> preSend(Message<?> message, MessageChannel channel) {
+ this.invoked.incrementAndGet();
+ return message;
+ }
+
+ }
+
+ }
+}
diff --git a/fineract-provider/src/test/resources/application-test.properties b/fineract-provider/src/test/resources/application-test.properties
index 3de58a747..b43d7260b 100644
--- a/fineract-provider/src/test/resources/application-test.properties
+++ b/fineract-provider/src/test/resources/application-test.properties
@@ -49,6 +49,8 @@ fineract.events.external.enabled=${FINERACT_EXTERNAL_EVENTS_ENABLED:false}
fineract.events.external.producer.read-batch-size=${FINERACT_EXTERNAL_EVENTS_PRODUCER_READ_BATCH_SIZE:1000}
fineract.events.external.producer.jms.enabled=${FINERACT_EXTERNAL_EVENTS_PRODUCER_JMS_ENABLED:false}
fineract.events.external.producer.jms.event-queue-name=${FINERACT_EXTERNAL_EVENTS_PRODUCER_JMS_QUEUE_NAME:JMS-event-queue}
+fineract.events.external.producer.jms.broker-url=${FINERACT_EXTERNAL_EVENTS_PRODUCER_JMS_BROKER_URL:tcp://127.0.0.1:61616}
+
management.health.jms.enabled=false