You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@fineract.apache.org by ar...@apache.org on 2023/02/07 10:38:47 UTC

[fineract] branch develop updated: FINERACT-1724: Batch worker JMS message acknowledgement support

This is an automated email from the ASF dual-hosted git repository.

arnold pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/fineract.git


The following commit(s) were added to refs/heads/develop by this push:
     new 395b7057c FINERACT-1724: Batch worker JMS message acknowledgement support
395b7057c is described below

commit 395b7057cfc9cd166e088a81a841a90193ab5040
Author: Arnold Galovics <ga...@gmail.com>
AuthorDate: Thu Feb 2 14:19:44 2023 +0100

    FINERACT-1724: Batch worker JMS message acknowledgement support
---
 .../springbatch/InputChannelInterceptor.java       |  8 ++-
 .../JmsBatchWorkerMessageListener.java             | 68 +++++++++++++++++++
 .../messagehandler/JmsManagerConfig.java           |  4 +-
 .../messagehandler/JmsWorkerConfig.java            | 53 +++++++++------
 .../StepExecutionRequestHandler.java               | 77 ++++++++++++++++++++++
 5 files changed, 186 insertions(+), 24 deletions(-)

diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/springbatch/InputChannelInterceptor.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/springbatch/InputChannelInterceptor.java
index e3e71a2b9..19f5d5da8 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/springbatch/InputChannelInterceptor.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/springbatch/InputChannelInterceptor.java
@@ -21,6 +21,7 @@ package org.apache.fineract.infrastructure.springbatch;
 import org.apache.fineract.infrastructure.core.domain.ActionContext;
 import org.apache.fineract.infrastructure.core.service.ThreadLocalContextUtil;
 import org.jetbrains.annotations.NotNull;
+import org.springframework.batch.integration.partition.StepExecutionRequest;
 import org.springframework.messaging.Message;
 import org.springframework.messaging.MessageChannel;
 import org.springframework.messaging.MessageHandler;
@@ -30,7 +31,12 @@ import org.springframework.messaging.support.GenericMessage;
 public class InputChannelInterceptor implements ExecutorChannelInterceptor {
 
     @Override
-    public Message<?> beforeHandle(Message<?> message, @NotNull MessageChannel channel, @NotNull MessageHandler handler) {
+    public Message<StepExecutionRequest> beforeHandle(Message<?> message, @NotNull MessageChannel channel,
+            @NotNull MessageHandler handler) {
+        return beforeHandleMessage(message);
+    }
+
+    public Message<StepExecutionRequest> beforeHandleMessage(Message<?> message) {
         ContextualMessage castedMessage = (ContextualMessage) message.getPayload();
         ThreadLocalContextUtil.init(castedMessage.getContext());
         ThreadLocalContextUtil.setActionContext(ActionContext.COB);
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/springbatch/messagehandler/JmsBatchWorkerMessageListener.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/springbatch/messagehandler/JmsBatchWorkerMessageListener.java
new file mode 100644
index 000000000..0e4fb2547
--- /dev/null
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/springbatch/messagehandler/JmsBatchWorkerMessageListener.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.fineract.infrastructure.springbatch.messagehandler;
+
+import javax.jms.JMSException;
+import javax.jms.MessageListener;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.fineract.infrastructure.springbatch.ContextualMessage;
+import org.apache.fineract.infrastructure.springbatch.InputChannelInterceptor;
+import org.apache.fineract.infrastructure.springbatch.messagehandler.conditions.JmsWorkerCondition;
+import org.springframework.batch.integration.partition.StepExecutionRequest;
+import org.springframework.beans.factory.InitializingBean;
+import org.springframework.context.annotation.Conditional;
+import org.springframework.jms.support.converter.MessagingMessageConverter;
+import org.springframework.messaging.Message;
+import org.springframework.stereotype.Component;
+
+@Slf4j
+@Component
+@RequiredArgsConstructor
+@Conditional(JmsWorkerCondition.class)
+public class JmsBatchWorkerMessageListener implements MessageListener, InitializingBean {
+
+    private final StepExecutionRequestHandler stepExecutionRequestHandler;
+    private final InputChannelInterceptor inputInterceptor;
+    private MessagingMessageConverter converter;
+
+    @Override
+    public void afterPropertiesSet() throws Exception {
+        converter = new MessagingMessageConverter();
+    }
+
+    @Override
+    @SuppressWarnings({ "unchecked" })
+    public void onMessage(javax.jms.Message message) {
+        try {
+            Message<ContextualMessage> msg = (Message<ContextualMessage>) converter.fromMessage(message);
+            log.debug("Received JMS partition message {}", msg);
+            Message<StepExecutionRequest> requestMessage = inputInterceptor.beforeHandleMessage(msg);
+            stepExecutionRequestHandler.handle(requestMessage.getPayload());
+        } catch (Exception e) {
+            log.error("Exception while processing JMS message", e);
+        }
+
+        try {
+            message.acknowledge();
+        } catch (JMSException e) {
+            throw new RuntimeException("Unable to acknowledge message", e);
+        }
+    }
+}
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/springbatch/messagehandler/JmsManagerConfig.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/springbatch/messagehandler/JmsManagerConfig.java
index a0bd8c384..475f53d89 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/springbatch/messagehandler/JmsManagerConfig.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/springbatch/messagehandler/JmsManagerConfig.java
@@ -18,7 +18,7 @@
  */
 package org.apache.fineract.infrastructure.springbatch.messagehandler;
 
-import org.apache.activemq.ActiveMQConnectionFactory;
+import javax.jms.ConnectionFactory;
 import org.apache.fineract.infrastructure.core.config.FineractProperties;
 import org.apache.fineract.infrastructure.springbatch.OutputChannelInterceptor;
 import org.apache.fineract.infrastructure.springbatch.messagehandler.conditions.JmsManagerCondition;
@@ -48,7 +48,7 @@ public class JmsManagerConfig {
     private FineractProperties fineractProperties;
 
     @Bean
-    public IntegrationFlow outboundFlow(ActiveMQConnectionFactory connectionFactory) {
+    public IntegrationFlow outboundFlow(ConnectionFactory connectionFactory) {
         return IntegrationFlows.from(outboundRequests) //
                 .intercept(outputInterceptor) //
                 .log(LoggingHandler.Level.DEBUG) //
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/springbatch/messagehandler/JmsWorkerConfig.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/springbatch/messagehandler/JmsWorkerConfig.java
index 3571718f7..636b96182 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/springbatch/messagehandler/JmsWorkerConfig.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/springbatch/messagehandler/JmsWorkerConfig.java
@@ -18,42 +18,53 @@
  */
 package org.apache.fineract.infrastructure.springbatch.messagehandler;
 
-import org.apache.activemq.ActiveMQConnectionFactory;
+import javax.jms.ConnectionFactory;
+import javax.jms.Session;
 import org.apache.fineract.infrastructure.core.config.FineractProperties;
-import org.apache.fineract.infrastructure.springbatch.InputChannelInterceptor;
 import org.apache.fineract.infrastructure.springbatch.messagehandler.conditions.JmsWorkerCondition;
-import org.springframework.batch.integration.config.annotation.EnableBatchIntegration;
+import org.springframework.batch.core.step.StepLocator;
+import org.springframework.batch.integration.partition.BeanFactoryStepLocator;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Conditional;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.context.annotation.Import;
-import org.springframework.integration.channel.QueueChannel;
-import org.springframework.integration.dsl.IntegrationFlow;
-import org.springframework.integration.dsl.IntegrationFlows;
-import org.springframework.integration.handler.LoggingHandler;
-import org.springframework.integration.jms.dsl.Jms;
+import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
+import org.springframework.jms.config.SimpleJmsListenerEndpoint;
+import org.springframework.jms.listener.DefaultMessageListenerContainer;
 
-@Configuration
-@EnableBatchIntegration
+@Configuration(proxyBeanMethods = false)
 @Conditional(JmsWorkerCondition.class)
 @Import(value = { JmsBrokerConfiguration.class })
 public class JmsWorkerConfig {
 
-    @Autowired
-    private QueueChannel inboundRequests;
-    @Autowired
-    private InputChannelInterceptor inputInterceptor;
     @Autowired
     private FineractProperties fineractProperties;
 
     @Bean
-    public IntegrationFlow inboundFlow(ActiveMQConnectionFactory connectionFactory) {
-        return IntegrationFlows.from(Jms.messageDrivenChannelAdapter(connectionFactory) //
-                .configureListenerContainer(c -> c.subscriptionDurable(false)) //
-                .destination(fineractProperties.getRemoteJobMessageHandler().getJms().getRequestQueueName())) //
-                .channel(inboundRequests) //
-                .intercept(inputInterceptor).log(LoggingHandler.Level.DEBUG) //
-                .get();
+    public DefaultJmsListenerContainerFactory jmsBatchWorkerListenerContainerFactory(ConnectionFactory connectionFactory) {
+        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
+        factory.setConcurrency("1-1"); // at least one consumer and at most one consumer
+        factory.setConnectionFactory(connectionFactory);
+        factory.setPubSubDomain(false);
+        factory.setSessionTransacted(false);
+        factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
+        return factory;
+    }
+
+    @Bean
+    public DefaultMessageListenerContainer jmsBatchWorkerMessageListenerContainer(
+            @Qualifier("jmsBatchWorkerListenerContainerFactory") DefaultJmsListenerContainerFactory factory,
+            JmsBatchWorkerMessageListener messageListener) {
+        SimpleJmsListenerEndpoint endpoint = new SimpleJmsListenerEndpoint();
+        endpoint.setDestination(fineractProperties.getRemoteJobMessageHandler().getJms().getRequestQueueName());
+        endpoint.setMessageListener(messageListener);
+        return factory.createListenerContainer(endpoint);
+    }
+
+    @Bean
+    public StepLocator stepLocator() {
+        return new BeanFactoryStepLocator();
     }
 }
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/springbatch/messagehandler/StepExecutionRequestHandler.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/springbatch/messagehandler/StepExecutionRequestHandler.java
new file mode 100644
index 000000000..60844eb5c
--- /dev/null
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/springbatch/messagehandler/StepExecutionRequestHandler.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.fineract.infrastructure.springbatch.messagehandler;
+
+import lombok.RequiredArgsConstructor;
+import org.apache.fineract.infrastructure.springbatch.messagehandler.conditions.JmsWorkerCondition;
+import org.springframework.batch.core.BatchStatus;
+import org.springframework.batch.core.JobInterruptedException;
+import org.springframework.batch.core.Step;
+import org.springframework.batch.core.StepExecution;
+import org.springframework.batch.core.explore.JobExplorer;
+import org.springframework.batch.core.repository.JobRepository;
+import org.springframework.batch.core.step.StepLocator;
+import org.springframework.batch.integration.partition.StepExecutionRequest;
+import org.springframework.context.annotation.Conditional;
+import org.springframework.dao.OptimisticLockingFailureException;
+import org.springframework.stereotype.Component;
+
+@Component
+@RequiredArgsConstructor
+@Conditional(JmsWorkerCondition.class)
+public class StepExecutionRequestHandler {
+
+    private final JobRepository jobRepository;
+    private final StepLocator stepLocator;
+    private final JobExplorer jobExplorer;
+
+    public void handle(StepExecutionRequest request) {
+
+        Long jobExecutionId = request.getJobExecutionId();
+        Long stepExecutionId = request.getStepExecutionId();
+        String stepName = request.getStepName();
+
+        StepExecution stepExecution = jobExplorer.getStepExecution(jobExecutionId, stepExecutionId);
+        if (stepExecution == null) {
+            throw new IllegalStateException("stepExecution cannot be null");
+        }
+
+        /*
+         * no need to check the status of the StepExecution: the request arrives on a JMS queue rather than a topic,
+         * so each message is received by only one consumer and therefore only a single worker can work on a
+         * particular partition
+         */
+        Step step = stepLocator.getStep(stepName);
+        try {
+            step.execute(stepExecution);
+        } catch (JobInterruptedException e) {
+            // based on org.springframework.batch.core.step.AbstractStep.determineBatchStatus
+            stepExecution.addFailureException(e);
+            stepExecution.setStatus(BatchStatus.STOPPED);
+        } catch (OptimisticLockingFailureException e) {
+            // nothing to do here: another worker has already picked up the partition that is being processed
+            // (a queue is used instead of a topic, so normally only a single worker should receive the message)
+        } catch (Exception e) {
+            stepExecution.addFailureException(e);
+            stepExecution.setStatus(BatchStatus.FAILED);
+        } finally {
+            jobRepository.update(stepExecution);
+        }
+    }
+}