You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@fineract.apache.org by ar...@apache.org on 2023/02/07 10:38:47 UTC
[fineract] branch develop updated: FINERACT-1724: Batch worker JMS message acknowledgement support
This is an automated email from the ASF dual-hosted git repository.
arnold pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/fineract.git
The following commit(s) were added to refs/heads/develop by this push:
new 395b7057c FINERACT-1724: Batch worker JMS message acknowledgement support
395b7057c is described below
commit 395b7057cfc9cd166e088a81a841a90193ab5040
Author: Arnold Galovics <ga...@gmail.com>
AuthorDate: Thu Feb 2 14:19:44 2023 +0100
FINERACT-1724: Batch worker JMS message acknowledgement support
---
.../springbatch/InputChannelInterceptor.java | 8 ++-
.../JmsBatchWorkerMessageListener.java | 68 +++++++++++++++++++
.../messagehandler/JmsManagerConfig.java | 4 +-
.../messagehandler/JmsWorkerConfig.java | 53 +++++++++------
.../StepExecutionRequestHandler.java | 77 ++++++++++++++++++++++
5 files changed, 186 insertions(+), 24 deletions(-)
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/springbatch/InputChannelInterceptor.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/springbatch/InputChannelInterceptor.java
index e3e71a2b9..19f5d5da8 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/springbatch/InputChannelInterceptor.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/springbatch/InputChannelInterceptor.java
@@ -21,6 +21,7 @@ package org.apache.fineract.infrastructure.springbatch;
import org.apache.fineract.infrastructure.core.domain.ActionContext;
import org.apache.fineract.infrastructure.core.service.ThreadLocalContextUtil;
import org.jetbrains.annotations.NotNull;
+import org.springframework.batch.integration.partition.StepExecutionRequest;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
@@ -30,7 +31,12 @@ import org.springframework.messaging.support.GenericMessage;
public class InputChannelInterceptor implements ExecutorChannelInterceptor {
@Override
- public Message<?> beforeHandle(Message<?> message, @NotNull MessageChannel channel, @NotNull MessageHandler handler) {
+ public Message<StepExecutionRequest> beforeHandle(Message<?> message, @NotNull MessageChannel channel,
+ @NotNull MessageHandler handler) {
+ return beforeHandleMessage(message);
+ }
+
+ public Message<StepExecutionRequest> beforeHandleMessage(Message<?> message) {
ContextualMessage castedMessage = (ContextualMessage) message.getPayload();
ThreadLocalContextUtil.init(castedMessage.getContext());
ThreadLocalContextUtil.setActionContext(ActionContext.COB);
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/springbatch/messagehandler/JmsBatchWorkerMessageListener.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/springbatch/messagehandler/JmsBatchWorkerMessageListener.java
new file mode 100644
index 000000000..0e4fb2547
--- /dev/null
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/springbatch/messagehandler/JmsBatchWorkerMessageListener.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.fineract.infrastructure.springbatch.messagehandler;
+
+import javax.jms.JMSException;
+import javax.jms.MessageListener;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.fineract.infrastructure.springbatch.ContextualMessage;
+import org.apache.fineract.infrastructure.springbatch.InputChannelInterceptor;
+import org.apache.fineract.infrastructure.springbatch.messagehandler.conditions.JmsWorkerCondition;
+import org.springframework.batch.integration.partition.StepExecutionRequest;
+import org.springframework.beans.factory.InitializingBean;
+import org.springframework.context.annotation.Conditional;
+import org.springframework.jms.support.converter.MessagingMessageConverter;
+import org.springframework.messaging.Message;
+import org.springframework.stereotype.Component;
+
+@Slf4j
+@Component
+@RequiredArgsConstructor
+@Conditional(JmsWorkerCondition.class)
+public class JmsBatchWorkerMessageListener implements MessageListener, InitializingBean {
+
+ private final StepExecutionRequestHandler stepExecutionRequestHandler;
+ private final InputChannelInterceptor inputInterceptor;
+ private MessagingMessageConverter converter;
+
+ @Override
+ public void afterPropertiesSet() throws Exception {
+ converter = new MessagingMessageConverter();
+ }
+
+ @Override
+ @SuppressWarnings({ "unchecked" })
+ public void onMessage(javax.jms.Message message) {
+ try {
+ Message<ContextualMessage> msg = (Message<ContextualMessage>) converter.fromMessage(message);
+ log.debug("Received JMS partition message {}", msg);
+ Message<StepExecutionRequest> requestMessage = inputInterceptor.beforeHandleMessage(msg);
+ stepExecutionRequestHandler.handle(requestMessage.getPayload());
+ } catch (Exception e) {
+ log.error("Exception while processing JMS message", e);
+ }
+
+ try {
+ message.acknowledge();
+ } catch (JMSException e) {
+ throw new RuntimeException("Unable to acknowledge message", e);
+ }
+ }
+}
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/springbatch/messagehandler/JmsManagerConfig.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/springbatch/messagehandler/JmsManagerConfig.java
index a0bd8c384..475f53d89 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/springbatch/messagehandler/JmsManagerConfig.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/springbatch/messagehandler/JmsManagerConfig.java
@@ -18,7 +18,7 @@
*/
package org.apache.fineract.infrastructure.springbatch.messagehandler;
-import org.apache.activemq.ActiveMQConnectionFactory;
+import javax.jms.ConnectionFactory;
import org.apache.fineract.infrastructure.core.config.FineractProperties;
import org.apache.fineract.infrastructure.springbatch.OutputChannelInterceptor;
import org.apache.fineract.infrastructure.springbatch.messagehandler.conditions.JmsManagerCondition;
@@ -48,7 +48,7 @@ public class JmsManagerConfig {
private FineractProperties fineractProperties;
@Bean
- public IntegrationFlow outboundFlow(ActiveMQConnectionFactory connectionFactory) {
+ public IntegrationFlow outboundFlow(ConnectionFactory connectionFactory) {
return IntegrationFlows.from(outboundRequests) //
.intercept(outputInterceptor) //
.log(LoggingHandler.Level.DEBUG) //
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/springbatch/messagehandler/JmsWorkerConfig.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/springbatch/messagehandler/JmsWorkerConfig.java
index 3571718f7..636b96182 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/springbatch/messagehandler/JmsWorkerConfig.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/springbatch/messagehandler/JmsWorkerConfig.java
@@ -18,42 +18,53 @@
*/
package org.apache.fineract.infrastructure.springbatch.messagehandler;
-import org.apache.activemq.ActiveMQConnectionFactory;
+import javax.jms.ConnectionFactory;
+import javax.jms.Session;
import org.apache.fineract.infrastructure.core.config.FineractProperties;
-import org.apache.fineract.infrastructure.springbatch.InputChannelInterceptor;
import org.apache.fineract.infrastructure.springbatch.messagehandler.conditions.JmsWorkerCondition;
-import org.springframework.batch.integration.config.annotation.EnableBatchIntegration;
+import org.springframework.batch.core.step.StepLocator;
+import org.springframework.batch.integration.partition.BeanFactoryStepLocator;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Conditional;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
-import org.springframework.integration.channel.QueueChannel;
-import org.springframework.integration.dsl.IntegrationFlow;
-import org.springframework.integration.dsl.IntegrationFlows;
-import org.springframework.integration.handler.LoggingHandler;
-import org.springframework.integration.jms.dsl.Jms;
+import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
+import org.springframework.jms.config.SimpleJmsListenerEndpoint;
+import org.springframework.jms.listener.DefaultMessageListenerContainer;
-@Configuration
-@EnableBatchIntegration
+@Configuration(proxyBeanMethods = false)
@Conditional(JmsWorkerCondition.class)
@Import(value = { JmsBrokerConfiguration.class })
public class JmsWorkerConfig {
- @Autowired
- private QueueChannel inboundRequests;
- @Autowired
- private InputChannelInterceptor inputInterceptor;
@Autowired
private FineractProperties fineractProperties;
@Bean
- public IntegrationFlow inboundFlow(ActiveMQConnectionFactory connectionFactory) {
- return IntegrationFlows.from(Jms.messageDrivenChannelAdapter(connectionFactory) //
- .configureListenerContainer(c -> c.subscriptionDurable(false)) //
- .destination(fineractProperties.getRemoteJobMessageHandler().getJms().getRequestQueueName())) //
- .channel(inboundRequests) //
- .intercept(inputInterceptor).log(LoggingHandler.Level.DEBUG) //
- .get();
+ public DefaultJmsListenerContainerFactory jmsBatchWorkerListenerContainerFactory(ConnectionFactory connectionFactory) {
+ DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
+ factory.setConcurrency("1-1"); // at least one consumer and at most one consumer
+ factory.setConnectionFactory(connectionFactory);
+ factory.setPubSubDomain(false);
+ factory.setSessionTransacted(false);
+ factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
+ return factory;
+ }
+
+ @Bean
+ public DefaultMessageListenerContainer jmsBatchWorkerMessageListenerContainer(
+ @Qualifier("jmsBatchWorkerListenerContainerFactory") DefaultJmsListenerContainerFactory factory,
+ JmsBatchWorkerMessageListener messageListener) {
+ SimpleJmsListenerEndpoint endpoint = new SimpleJmsListenerEndpoint();
+ endpoint.setDestination(fineractProperties.getRemoteJobMessageHandler().getJms().getRequestQueueName());
+ endpoint.setMessageListener(messageListener);
+ return factory.createListenerContainer(endpoint);
+ }
+
+ @Bean
+ public StepLocator stepLocator() {
+ return new BeanFactoryStepLocator();
}
}
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/springbatch/messagehandler/StepExecutionRequestHandler.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/springbatch/messagehandler/StepExecutionRequestHandler.java
new file mode 100644
index 000000000..60844eb5c
--- /dev/null
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/springbatch/messagehandler/StepExecutionRequestHandler.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.fineract.infrastructure.springbatch.messagehandler;
+
+import lombok.RequiredArgsConstructor;
+import org.apache.fineract.infrastructure.springbatch.messagehandler.conditions.JmsWorkerCondition;
+import org.springframework.batch.core.BatchStatus;
+import org.springframework.batch.core.JobInterruptedException;
+import org.springframework.batch.core.Step;
+import org.springframework.batch.core.StepExecution;
+import org.springframework.batch.core.explore.JobExplorer;
+import org.springframework.batch.core.repository.JobRepository;
+import org.springframework.batch.core.step.StepLocator;
+import org.springframework.batch.integration.partition.StepExecutionRequest;
+import org.springframework.context.annotation.Conditional;
+import org.springframework.dao.OptimisticLockingFailureException;
+import org.springframework.stereotype.Component;
+
+@Component
+@RequiredArgsConstructor
+@Conditional(JmsWorkerCondition.class)
+public class StepExecutionRequestHandler {
+
+ private final JobRepository jobRepository;
+ private final StepLocator stepLocator;
+ private final JobExplorer jobExplorer;
+
+ public void handle(StepExecutionRequest request) {
+
+ Long jobExecutionId = request.getJobExecutionId();
+ Long stepExecutionId = request.getStepExecutionId();
+ String stepName = request.getStepName();
+
+ StepExecution stepExecution = jobExplorer.getStepExecution(jobExecutionId, stepExecutionId);
+ if (stepExecution == null) {
+ throw new IllegalStateException("stepExecution cannot be null");
+ }
+
+ /*
+ * no need to check the status of the StepExecution because only a single worker can work on a particular
+ * partition due to the fact that a JMS queue is used and not a topic (i.e. only one consumer receives a single
+ * message)
+ */
+ Step step = stepLocator.getStep(stepName);
+ try {
+ step.execute(stepExecution);
+ } catch (JobInterruptedException e) {
+ // based on org.springframework.batch.core.step.AbstractStep.determineBatchStatus
+ stepExecution.addFailureException(e);
+ stepExecution.setStatus(BatchStatus.STOPPED);
+ } catch (OptimisticLockingFailureException e) {
+ // no need to do anything, just another worker picked up a partition that's being processed
+ // since we're using queues instead of topics, only a single worker should receive the msg
+ } catch (Exception e) {
+ stepExecution.addFailureException(e);
+ stepExecution.setStatus(BatchStatus.FAILED);
+ } finally {
+ jobRepository.update(stepExecution);
+ }
+ }
+}