You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@fineract.apache.org by ad...@apache.org on 2023/06/14 13:06:37 UTC

[fineract] branch develop updated: [FINERACT-1941] change task executors to threadpool task executor

This is an automated email from the ASF dual-hosted git repository.

adamsaghy pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/fineract.git


The following commit(s) were added to refs/heads/develop by this push:
     new 78d9a0201 [FINERACT-1941] change task executors to threadpool task executor
78d9a0201 is described below

commit 78d9a0201eb069986cb33e821a1a5e350461eba1
Author: taskain7 <ta...@gmail.com>
AuthorDate: Wed Jun 14 02:57:47 2023 +0200

    [FINERACT-1941] change task executors to threadpool task executor
---
 .../core/config/FineractProperties.java            | 12 +++++
 .../service/AsyncLoanCOBExecutorServiceImpl.java   |  3 +-
 .../SendMessageToSmsGatewayConfig.java             |  8 +++-
 .../SendMessageToSmsGatewayTasklet.java            | 15 ++----
 .../configuration/async/SpringAsyncConfig.java     |  3 +-
 .../core/config/TaskExecutorConfig.java            | 50 +++++++++++++++++++
 .../core/config/TaskExecutorConstant.java          | 31 ++++++++++++
 .../config/ExternalEventJMSConfiguration.java      |  9 ++--
 .../jms/JMSMultiExternalEventProducer.java         |  3 +-
 .../SmsMessageScheduledJobServiceImpl.java         | 22 ++++-----
 .../RecalculateInterestForLoanConfig.java          |  9 +++-
 .../RecalculateInterestForLoanTasklet.java         | 26 +++++-----
 .../PostInterestForSavingTasklet.java              | 56 +++++++++++-----------
 .../src/main/resources/application.properties      |  5 ++
 .../src/test/resources/application-test.properties |  5 ++
 15 files changed, 182 insertions(+), 75 deletions(-)

diff --git a/fineract-core/src/main/java/org/apache/fineract/infrastructure/core/config/FineractProperties.java b/fineract-core/src/main/java/org/apache/fineract/infrastructure/core/config/FineractProperties.java
index 8f15543a6..cbfae0298 100644
--- a/fineract-core/src/main/java/org/apache/fineract/infrastructure/core/config/FineractProperties.java
+++ b/fineract-core/src/main/java/org/apache/fineract/infrastructure/core/config/FineractProperties.java
@@ -46,6 +46,8 @@ public class FineractProperties {
 
     private FineractEventsProperties events;
 
+    private FineractTaskExecutor taskExecutor;
+
     private FineractContentProperties content;
 
     private FineractReportProperties report;
@@ -190,6 +192,14 @@ public class FineractProperties {
         private FineractExternalEventsProperties external;
     }
 
+    @Getter
+    @Setter
+    public static class FineractTaskExecutor {
+
+        private int defaultTaskExecutorCorePoolSize;
+        private int defaultTaskExecutorMaxPoolSize;
+    }
+
     @Getter
     @Setter
     public static class FineractExternalEventsProperties {
@@ -217,6 +227,8 @@ public class FineractProperties {
         private String brokerPassword;
         private int producerCount;
         private boolean asyncSendEnabled;
+        private int threadPoolTaskExecutorCorePoolSize;
+        private int threadPoolTaskExecutorMaxPoolSize;
 
         public boolean isBrokerPasswordProtected() {
             return StringUtils.isNotBlank(brokerUsername) || StringUtils.isNotBlank(brokerPassword);
diff --git a/fineract-provider/src/main/java/org/apache/fineract/cob/service/AsyncLoanCOBExecutorServiceImpl.java b/fineract-provider/src/main/java/org/apache/fineract/cob/service/AsyncLoanCOBExecutorServiceImpl.java
index 6f3fc7fba..d92538cc4 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/cob/service/AsyncLoanCOBExecutorServiceImpl.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/cob/service/AsyncLoanCOBExecutorServiceImpl.java
@@ -29,6 +29,7 @@ import org.apache.fineract.cob.data.LoanIdAndLastClosedBusinessDate;
 import org.apache.fineract.cob.loan.LoanCOBConstant;
 import org.apache.fineract.cob.loan.RetrieveLoanIdService;
 import org.apache.fineract.infrastructure.businessdate.domain.BusinessDateType;
+import org.apache.fineract.infrastructure.core.config.TaskExecutorConstant;
 import org.apache.fineract.infrastructure.core.domain.FineractContext;
 import org.apache.fineract.infrastructure.core.service.ThreadLocalContextUtil;
 import org.apache.fineract.infrastructure.jobs.data.JobParameterDTO;
@@ -58,7 +59,7 @@ public class AsyncLoanCOBExecutorServiceImpl implements AsyncLoanCOBExecutorServ
     private final RetrieveLoanIdService retrieveLoanIdService;
 
     @Override
-    @Async("loanCOBCatchUpThreadPoolTaskExecutor")
+    @Async(TaskExecutorConstant.LOAN_COB_CATCH_UP_TASK_EXECUTOR_BEAN_NAME)
     public void executeLoanCOBCatchUpAsync(FineractContext context) {
         try {
             ThreadLocalContextUtil.init(context);
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/campaigns/jobs/sendmessagetosmsgateway/SendMessageToSmsGatewayConfig.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/campaigns/jobs/sendmessagetosmsgateway/SendMessageToSmsGatewayConfig.java
index 0b4fbb8db..5fff6f6b3 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/campaigns/jobs/sendmessagetosmsgateway/SendMessageToSmsGatewayConfig.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/campaigns/jobs/sendmessagetosmsgateway/SendMessageToSmsGatewayConfig.java
@@ -19,6 +19,7 @@
 package org.apache.fineract.infrastructure.campaigns.jobs.sendmessagetosmsgateway;
 
 import org.apache.fineract.infrastructure.campaigns.helper.SmsConfigUtils;
+import org.apache.fineract.infrastructure.core.config.TaskExecutorConstant;
 import org.apache.fineract.infrastructure.gcm.service.NotificationSenderService;
 import org.apache.fineract.infrastructure.jobs.service.JobName;
 import org.apache.fineract.infrastructure.sms.domain.SmsMessageRepository;
@@ -29,8 +30,10 @@ import org.springframework.batch.core.launch.support.RunIdIncrementer;
 import org.springframework.batch.core.repository.JobRepository;
 import org.springframework.batch.core.step.builder.StepBuilder;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 import org.springframework.transaction.PlatformTransactionManager;
 
 @Configuration
@@ -46,6 +49,9 @@ public class SendMessageToSmsGatewayConfig {
     private NotificationSenderService notificationSenderService;
     @Autowired
     private SmsConfigUtils smsConfigUtils;
+    @Autowired
+    @Qualifier(TaskExecutorConstant.DEFAULT_TASK_EXECUTOR_BEAN_NAME)
+    private ThreadPoolTaskExecutor taskExecutor;
 
     @Bean
     protected Step sendMessageToSmsGatewayStep() {
@@ -61,6 +67,6 @@ public class SendMessageToSmsGatewayConfig {
 
     @Bean
     public SendMessageToSmsGatewayTasklet sendMessageToSmsGatewayTasklet() {
-        return new SendMessageToSmsGatewayTasklet(smsMessageRepository, notificationSenderService, smsConfigUtils);
+        return new SendMessageToSmsGatewayTasklet(smsMessageRepository, notificationSenderService, smsConfigUtils, taskExecutor);
     }
 }
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/campaigns/jobs/sendmessagetosmsgateway/SendMessageToSmsGatewayTasklet.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/campaigns/jobs/sendmessagetosmsgateway/SendMessageToSmsGatewayTasklet.java
index a263ac6e7..edd9abbc7 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/campaigns/jobs/sendmessagetosmsgateway/SendMessageToSmsGatewayTasklet.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/campaigns/jobs/sendmessagetosmsgateway/SendMessageToSmsGatewayTasklet.java
@@ -18,15 +18,12 @@
  */
 package org.apache.fineract.infrastructure.campaigns.jobs.sendmessagetosmsgateway;
 
-import jakarta.annotation.PostConstruct;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.collections4.CollectionUtils;
@@ -52,6 +49,7 @@ import org.springframework.http.HttpEntity;
 import org.springframework.http.HttpMethod;
 import org.springframework.http.HttpStatus;
 import org.springframework.http.ResponseEntity;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 import org.springframework.web.client.RestTemplate;
 
 @Slf4j
@@ -59,16 +57,11 @@ import org.springframework.web.client.RestTemplate;
 public class SendMessageToSmsGatewayTasklet implements Tasklet {
 
     private final SmsMessageRepository smsMessageRepository;
-    private ExecutorService genericExecutorService;
     private final NotificationSenderService notificationSenderService;
     private final SmsConfigUtils smsConfigUtils;
+    private final ThreadPoolTaskExecutor taskExecutor;
     private final RestTemplate restTemplate = new RestTemplate();
 
-    @PostConstruct
-    public void initializeExecutorService() {
-        genericExecutorService = Executors.newSingleThreadExecutor();
-    }
-
     @Override
     public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
         int pageLimit = 200;
@@ -102,7 +95,7 @@ public class SendMessageToSmsGatewayTasklet implements Tasklet {
                     if (!toSaveMessages.isEmpty()) {
                         smsMessageRepository.saveAll(toSaveMessages);
                         smsMessageRepository.flush();
-                        genericExecutorService.execute(new SmsTask(ThreadLocalContextUtil.getTenant(), apiQueueResourceDataCollection));
+                        taskExecutor.execute(new SmsTask(ThreadLocalContextUtil.getTenant(), apiQueueResourceDataCollection));
                     }
                     if (!toSendNotificationMessages.isEmpty()) {
                         notificationSenderService.sendNotification(toSendNotificationMessages);
@@ -135,7 +128,7 @@ public class SendMessageToSmsGatewayTasklet implements Tasklet {
 
         @Override
         public void onApplicationEvent(ContextClosedEvent event) {
-            genericExecutorService.shutdown();
+            taskExecutor.shutdown();
             log.info("Shutting down the ExecutorService");
         }
     }
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/configuration/async/SpringAsyncConfig.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/configuration/async/SpringAsyncConfig.java
index a731937e5..12c1b0a9b 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/configuration/async/SpringAsyncConfig.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/configuration/async/SpringAsyncConfig.java
@@ -18,6 +18,7 @@
  */
 package org.apache.fineract.infrastructure.configuration.async;
 
+import org.apache.fineract.infrastructure.core.config.TaskExecutorConstant;
 import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
@@ -29,7 +30,7 @@ import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 @EnableAsync
 public class SpringAsyncConfig implements AsyncConfigurer {
 
-    @Bean(name = "loanCOBCatchUpThreadPoolTaskExecutor")
+    @Bean(name = TaskExecutorConstant.LOAN_COB_CATCH_UP_TASK_EXECUTOR_BEAN_NAME)
     public ThreadPoolTaskExecutor loanCOBCatchUpThreadPoolTaskExecutor() {
         ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
         threadPoolTaskExecutor.setMaxPoolSize(1);
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/core/config/TaskExecutorConfig.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/core/config/TaskExecutorConfig.java
new file mode 100644
index 000000000..9a1b1cdc0
--- /dev/null
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/core/config/TaskExecutorConfig.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.fineract.infrastructure.core.config;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.config.ConfigurableBeanFactory;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.Scope;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+
+@Configuration
+public class TaskExecutorConfig {
+
+    @Autowired
+    private FineractProperties fineractProperties;
+
+    @Bean(TaskExecutorConstant.DEFAULT_TASK_EXECUTOR_BEAN_NAME)
+    public ThreadPoolTaskExecutor fineractDefaultThreadPoolTaskExecutor() {
+        ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
+        threadPoolTaskExecutor.setCorePoolSize(fineractProperties.getTaskExecutor().getDefaultTaskExecutorCorePoolSize());
+        threadPoolTaskExecutor.setMaxPoolSize(fineractProperties.getTaskExecutor().getDefaultTaskExecutorMaxPoolSize());
+        return threadPoolTaskExecutor;
+    }
+
+    @Bean(TaskExecutorConstant.CONFIGURABLE_TASK_EXECUTOR_BEAN_NAME)
+    @Scope(scopeName = ConfigurableBeanFactory.SCOPE_PROTOTYPE)
+    public ThreadPoolTaskExecutor fineractConfigurableThreadPoolTaskExecutor() {
+        ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
+        threadPoolTaskExecutor.setCorePoolSize(fineractProperties.getTaskExecutor().getDefaultTaskExecutorCorePoolSize());
+        threadPoolTaskExecutor.setMaxPoolSize(fineractProperties.getTaskExecutor().getDefaultTaskExecutorMaxPoolSize());
+        return threadPoolTaskExecutor;
+    }
+}
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/core/config/TaskExecutorConstant.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/core/config/TaskExecutorConstant.java
new file mode 100644
index 000000000..59c066824
--- /dev/null
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/core/config/TaskExecutorConstant.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.fineract.infrastructure.core.config;
+
+public final class TaskExecutorConstant {
+
+    private TaskExecutorConstant() {
+
+    }
+
+    public static final String DEFAULT_TASK_EXECUTOR_BEAN_NAME = "fineractDefaultThreadPoolTaskExecutor";
+    public static final String CONFIGURABLE_TASK_EXECUTOR_BEAN_NAME = "fineractConfigurableThreadPoolTaskExecutor";
+    public static final String EVENT_TASK_EXECUTOR_BEAN_NAME = "externalEventJmsProducerExecutor";
+    public static final String LOAN_COB_CATCH_UP_TASK_EXECUTOR_BEAN_NAME = "loanCOBCatchUpThreadPoolTaskExecutor";
+}
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/config/ExternalEventJMSConfiguration.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/config/ExternalEventJMSConfiguration.java
index 228b155da..5ec6b5b59 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/config/ExternalEventJMSConfiguration.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/config/ExternalEventJMSConfiguration.java
@@ -23,6 +23,7 @@ import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.fineract.infrastructure.core.config.FineractProperties;
 import org.apache.fineract.infrastructure.core.config.FineractProperties.FineractExternalEventsProducerJmsProperties;
+import org.apache.fineract.infrastructure.core.config.TaskExecutorConstant;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.context.annotation.Bean;
@@ -68,11 +69,13 @@ public class ExternalEventJMSConfiguration {
         return new ActiveMQQueue(fineractProperties.getEvents().getExternal().getProducer().getJms().getEventQueueName());
     }
 
-    @Bean("externalEventJmsProducerExecutor")
+    @Bean(TaskExecutorConstant.EVENT_TASK_EXECUTOR_BEAN_NAME)
     public ThreadPoolTaskExecutor externalEventJmsProducerExecutor() {
         ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
-        threadPoolTaskExecutor.setCorePoolSize(10);
-        threadPoolTaskExecutor.setMaxPoolSize(100);
+        threadPoolTaskExecutor.setCorePoolSize(
+                fineractProperties.getEvents().getExternal().getProducer().getJms().getThreadPoolTaskExecutorCorePoolSize());
+        threadPoolTaskExecutor
+                .setMaxPoolSize(fineractProperties.getEvents().getExternal().getProducer().getJms().getThreadPoolTaskExecutorMaxPoolSize());
         threadPoolTaskExecutor.setThreadNamePrefix("externalEventJms");
         return threadPoolTaskExecutor;
     }
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/producer/jms/JMSMultiExternalEventProducer.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/producer/jms/JMSMultiExternalEventProducer.java
index 233ece737..f6fbb1123 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/producer/jms/JMSMultiExternalEventProducer.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/producer/jms/JMSMultiExternalEventProducer.java
@@ -38,6 +38,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.fineract.infrastructure.core.config.FineractProperties;
+import org.apache.fineract.infrastructure.core.config.TaskExecutorConstant;
 import org.apache.fineract.infrastructure.core.messaging.jms.MessageFactory;
 import org.apache.fineract.infrastructure.core.service.HashingService;
 import org.apache.fineract.infrastructure.event.external.exception.AcknowledgementTimeoutException;
@@ -61,7 +62,7 @@ public class JMSMultiExternalEventProducer implements ExternalEventProducer {
 
     private final MessageFactory messageFactory;
 
-    @Qualifier("externalEventJmsProducerExecutor")
+    @Qualifier(TaskExecutorConstant.EVENT_TASK_EXECUTOR_BEAN_NAME)
     private final AsyncTaskExecutor taskExecutor;
 
     private final HashingService hashingService;
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/sms/scheduler/SmsMessageScheduledJobServiceImpl.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/sms/scheduler/SmsMessageScheduledJobServiceImpl.java
index daea2192e..3dfbc46ec 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/sms/scheduler/SmsMessageScheduledJobServiceImpl.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/sms/scheduler/SmsMessageScheduledJobServiceImpl.java
@@ -18,21 +18,19 @@
  */
 package org.apache.fineract.infrastructure.sms.scheduler;
 
-import jakarta.annotation.PostConstruct;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.fineract.infrastructure.campaigns.helper.SmsConfigUtils;
 import org.apache.fineract.infrastructure.campaigns.sms.constants.SmsCampaignConstants;
 import org.apache.fineract.infrastructure.campaigns.sms.domain.SmsCampaign;
 import org.apache.fineract.infrastructure.campaigns.sms.exception.ConnectionFailureException;
+import org.apache.fineract.infrastructure.core.config.TaskExecutorConstant;
 import org.apache.fineract.infrastructure.core.domain.FineractContext;
 import org.apache.fineract.infrastructure.core.service.ThreadLocalContextUtil;
 import org.apache.fineract.infrastructure.gcm.service.NotificationSenderService;
@@ -40,6 +38,7 @@ import org.apache.fineract.infrastructure.sms.data.SmsMessageApiQueueResourceDat
 import org.apache.fineract.infrastructure.sms.domain.SmsMessage;
 import org.apache.fineract.infrastructure.sms.domain.SmsMessageRepository;
 import org.apache.fineract.infrastructure.sms.domain.SmsMessageStatusType;
+import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.context.ApplicationListener;
 import org.springframework.context.event.ContextClosedEvent;
 import org.springframework.core.ParameterizedTypeReference;
@@ -47,6 +46,7 @@ import org.springframework.http.HttpEntity;
 import org.springframework.http.HttpMethod;
 import org.springframework.http.HttpStatus;
 import org.springframework.http.ResponseEntity;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 import org.springframework.stereotype.Service;
 import org.springframework.web.client.RestTemplate;
 
@@ -62,14 +62,8 @@ public class SmsMessageScheduledJobServiceImpl implements SmsMessageScheduledJob
     private final RestTemplate restTemplate = new RestTemplate();
     private final SmsConfigUtils smsConfigUtils;
     private final NotificationSenderService notificationSenderService;
-    private ExecutorService genericExecutorService;
-    private ExecutorService triggeredExecutorService;
-
-    @PostConstruct
-    public void initializeExecutorService() {
-        genericExecutorService = Executors.newSingleThreadExecutor();
-        triggeredExecutorService = Executors.newSingleThreadExecutor();
-    }
+    @Qualifier(TaskExecutorConstant.DEFAULT_TASK_EXECUTOR_BEAN_NAME)
+    private final ThreadPoolTaskExecutor taskExecutor;
 
     private void connectAndSendToIntermediateServer(Collection<SmsMessageApiQueueResourceData> apiQueueResourceDatas) {
         Map<String, Object> hostConfig = this.smsConfigUtils.getMessageGateWayRequestURI("sms",
@@ -114,7 +108,7 @@ public class SmsMessageScheduledJobServiceImpl implements SmsMessageScheduledJob
                     if (toSaveMessages.size() > 0) {
                         this.smsMessageRepository.saveAll(toSaveMessages);
                         this.smsMessageRepository.flush();
-                        this.triggeredExecutorService.execute(new SmsTask(apiQueueResourceDatas, ThreadLocalContextUtil.getContext()));
+                        this.taskExecutor.execute(new SmsTask(apiQueueResourceDatas, ThreadLocalContextUtil.getContext()));
                     }
                     if (!toSendNotificationMessages.isEmpty()) {
                         this.notificationSenderService.sendNotification(toSendNotificationMessages);
@@ -141,7 +135,7 @@ public class SmsMessageScheduledJobServiceImpl implements SmsMessageScheduledJob
             this.smsMessageRepository.saveAll(smsMessages);
             request.append(SmsMessageApiQueueResourceData.toJsonString(apiQueueResourceDatas));
             log.debug("Sending triggered SMS to specific provider with request - {}", request);
-            this.triggeredExecutorService.execute(new SmsTask(apiQueueResourceDatas, ThreadLocalContextUtil.getContext()));
+            this.taskExecutor.execute(new SmsTask(apiQueueResourceDatas, ThreadLocalContextUtil.getContext()));
         } catch (Exception e) {
             log.error("Error occured.", e);
         }
@@ -165,7 +159,7 @@ public class SmsMessageScheduledJobServiceImpl implements SmsMessageScheduledJob
 
         @Override
         public void onApplicationEvent(ContextClosedEvent event) {
-            genericExecutorService.shutdown();
+            taskExecutor.shutdown();
             log.info("Shutting down the ExecutorService");
         }
     }
diff --git a/fineract-provider/src/main/java/org/apache/fineract/portfolio/loanaccount/jobs/recalculateinterestforloan/RecalculateInterestForLoanConfig.java b/fineract-provider/src/main/java/org/apache/fineract/portfolio/loanaccount/jobs/recalculateinterestforloan/RecalculateInterestForLoanConfig.java
index eb02c057c..1a84505b1 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/portfolio/loanaccount/jobs/recalculateinterestforloan/RecalculateInterestForLoanConfig.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/portfolio/loanaccount/jobs/recalculateinterestforloan/RecalculateInterestForLoanConfig.java
@@ -18,6 +18,7 @@
  */
 package org.apache.fineract.portfolio.loanaccount.jobs.recalculateinterestforloan;
 
+import org.apache.fineract.infrastructure.core.config.TaskExecutorConstant;
 import org.apache.fineract.infrastructure.jobs.service.JobName;
 import org.apache.fineract.organisation.office.service.OfficeReadPlatformService;
 import org.apache.fineract.portfolio.loanaccount.service.LoanReadPlatformService;
@@ -30,8 +31,10 @@ import org.springframework.batch.core.launch.support.RunIdIncrementer;
 import org.springframework.batch.core.repository.JobRepository;
 import org.springframework.batch.core.step.builder.StepBuilder;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 import org.springframework.transaction.PlatformTransactionManager;
 
 @Configuration
@@ -54,6 +57,10 @@ public class RecalculateInterestForLoanConfig {
     @Autowired
     private OfficeReadPlatformService officeReadPlatformService;
 
+    @Autowired
+    @Qualifier(TaskExecutorConstant.DEFAULT_TASK_EXECUTOR_BEAN_NAME)
+    private ThreadPoolTaskExecutor taskExecutor;
+
     @Bean
     protected Step recalculateInterestForLoanStep() {
         return new StepBuilder(JobName.RECALCULATE_INTEREST_FOR_LOAN.name(), jobRepository)
@@ -69,6 +76,6 @@ public class RecalculateInterestForLoanConfig {
     @Bean
     public RecalculateInterestForLoanTasklet recalculateInterestForLoanTasklet() {
         return new RecalculateInterestForLoanTasklet(loanReadPlatformService, loanWritePlatformService, recalculateInterestPoster,
-                officeReadPlatformService);
+                officeReadPlatformService, taskExecutor);
     }
 }
diff --git a/fineract-provider/src/main/java/org/apache/fineract/portfolio/loanaccount/jobs/recalculateinterestforloan/RecalculateInterestForLoanTasklet.java b/fineract-provider/src/main/java/org/apache/fineract/portfolio/loanaccount/jobs/recalculateinterestforloan/RecalculateInterestForLoanTasklet.java
index 586dff06a..5ab55574d 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/portfolio/loanaccount/jobs/recalculateinterestforloan/RecalculateInterestForLoanTasklet.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/portfolio/loanaccount/jobs/recalculateinterestforloan/RecalculateInterestForLoanTasklet.java
@@ -25,12 +25,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.collections4.CollectionUtils;
+import org.apache.fineract.infrastructure.core.config.TaskExecutorConstant;
 import org.apache.fineract.infrastructure.jobs.exception.JobExecutionException;
 import org.apache.fineract.organisation.office.data.OfficeData;
 import org.apache.fineract.organisation.office.exception.OfficeNotFoundException;
@@ -43,6 +42,8 @@ import org.springframework.batch.core.StepContribution;
 import org.springframework.batch.core.scope.context.ChunkContext;
 import org.springframework.batch.core.step.tasklet.Tasklet;
 import org.springframework.batch.repeat.RepeatStatus;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 
 @Slf4j
 @RequiredArgsConstructor
@@ -52,6 +53,8 @@ public class RecalculateInterestForLoanTasklet implements Tasklet {
     private final LoanWritePlatformService loanWritePlatformService;
     private final RecalculateInterestPoster recalculateInterestPoster;
     private final OfficeReadPlatformService officeReadPlatformService;
+    @Qualifier(TaskExecutorConstant.CONFIGURABLE_TASK_EXECUTOR_BEAN_NAME)
+    private final ThreadPoolTaskExecutor taskExecutor;
 
     @Override
     public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
@@ -91,8 +94,8 @@ public class RecalculateInterestForLoanTasklet implements Tasklet {
 
     private void recalculateInterest(OfficeData office, int threadPoolSize, int batchSize) {
         final int pageSize = batchSize * threadPoolSize;
-
-        final ExecutorService executorService = Executors.newFixedThreadPool(threadPoolSize);
+        taskExecutor.setCorePoolSize(threadPoolSize);
+        taskExecutor.setMaxPoolSize(threadPoolSize);
 
         Long maxLoanIdInList = 0L;
         final String officeHierarchy = office.getHierarchy() + "%";
@@ -103,16 +106,14 @@ public class RecalculateInterestForLoanTasklet implements Tasklet {
         do {
             int totalFilteredRecords = loanIds.size();
             log.debug("Starting accrual - total filtered records - {}", totalFilteredRecords);
-            recalculateInterest(loanIds, threadPoolSize, batchSize, executorService);
+            recalculateInterest(loanIds, threadPoolSize, batchSize);
             maxLoanIdInList += pageSize + 1;
             loanIds = Collections.synchronizedList(
                     this.loanReadPlatformService.fetchLoansForInterestRecalculation(pageSize, maxLoanIdInList, officeHierarchy));
         } while (!CollectionUtils.isEmpty(loanIds));
-
-        executorService.shutdownNow();
     }
 
-    private void recalculateInterest(List<Long> loanIds, int threadPoolSize, int batchSize, final ExecutorService executorService) {
+    private void recalculateInterest(List<Long> loanIds, int threadPoolSize, int batchSize) {
 
         List<Callable<Void>> posters = new ArrayList<>();
         int fromIndex = 0;
@@ -149,12 +150,9 @@ public class RecalculateInterestForLoanTasklet implements Tasklet {
             }
         }
 
-        try {
-            List<Future<Void>> responses = executorService.invokeAll(posters);
-            checkCompletion(responses);
-        } catch (InterruptedException e1) {
-            log.error("Interrupted while recalculateInterest", e1);
-        }
+        List<Future<Void>> responses = new ArrayList<>();
+        posters.forEach(poster -> responses.add(taskExecutor.submit(poster)));
+        checkCompletion(responses);
     }
 
     private <T> List<T> safeSubList(List<T> list, int fromIndex, int toIndex) {
diff --git a/fineract-provider/src/main/java/org/apache/fineract/portfolio/savings/jobs/postinterestforsavings/PostInterestForSavingTasklet.java b/fineract-provider/src/main/java/org/apache/fineract/portfolio/savings/jobs/postinterestforsavings/PostInterestForSavingTasklet.java
index 209667244..90407ae23 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/portfolio/savings/jobs/postinterestforsavings/PostInterestForSavingTasklet.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/portfolio/savings/jobs/postinterestforsavings/PostInterestForSavingTasklet.java
@@ -27,13 +27,12 @@ import java.util.List;
 import java.util.Queue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.fineract.infrastructure.configuration.domain.ConfigurationDomainService;
+import org.apache.fineract.infrastructure.core.config.TaskExecutorConstant;
 import org.apache.fineract.infrastructure.core.domain.FineractContext;
 import org.apache.fineract.infrastructure.core.service.ThreadLocalContextUtil;
 import org.apache.fineract.portfolio.savings.data.SavingsAccountData;
@@ -43,7 +42,9 @@ import org.springframework.batch.core.StepContribution;
 import org.springframework.batch.core.scope.context.ChunkContext;
 import org.springframework.batch.core.step.tasklet.Tasklet;
 import org.springframework.batch.repeat.RepeatStatus;
+import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.context.ApplicationContext;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 import org.springframework.stereotype.Component;
 
 @RequiredArgsConstructor
@@ -55,15 +56,18 @@ public class PostInterestForSavingTasklet implements Tasklet {
     private final ConfigurationDomainService configurationDomainService;
     private final Queue<List<SavingsAccountData>> queue = new ArrayDeque<>();
     private final ApplicationContext applicationContext;
+    @Qualifier(TaskExecutorConstant.CONFIGURABLE_TASK_EXECUTOR_BEAN_NAME)
+    private final ThreadPoolTaskExecutor taskExecutor;
     private final int queueSize = 1;
 
     @Override
     public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
         final int threadPoolSize = Integer.parseInt((String) chunkContext.getStepContext().getJobParameters().get("thread-pool-size"));
+        taskExecutor.setCorePoolSize(threadPoolSize);
+        taskExecutor.setMaxPoolSize(threadPoolSize);
         final int batchSize = Integer.parseInt((String) chunkContext.getStepContext().getJobParameters().get("batch-size"));
         final int pageSize = batchSize * threadPoolSize;
         Long maxSavingsIdInList = 0L;
-        final ExecutorService executorService = Executors.newFixedThreadPool(threadPoolSize);
         final boolean backdatedTxnsAllowedTill = this.configurationDomainService.retrievePivotDateConfig();
 
         long start = System.currentTimeMillis();
@@ -84,16 +88,15 @@ public class PostInterestForSavingTasklet implements Tasklet {
                     log.debug("Starting Interest posting - total records - {}", totalFilteredRecords);
                     List<SavingsAccountData> queueElement = queue.element();
                     maxSavingsIdInList = queueElement.get(queueElement.size() - 1).getId();
-                    postInterest(queue.remove(), threadPoolSize, executorService, backdatedTxnsAllowedTill, pageSize, maxSavingsIdInList);
+                    postInterest(queue.remove(), threadPoolSize, backdatedTxnsAllowedTill, pageSize, maxSavingsIdInList);
                 } while (!CollectionUtils.isEmpty(queue));
             }
-            executorService.shutdownNow();
         }
         return RepeatStatus.FINISHED;
     }
 
-    private void postInterest(List<SavingsAccountData> savingsAccounts, int threadPoolSize, ExecutorService executorService,
-            final boolean backdatedTxnsAllowedTill, final int pageSize, Long maxSavingsIdInList) {
+    private void postInterest(List<SavingsAccountData> savingsAccounts, int threadPoolSize, final boolean backdatedTxnsAllowedTill,
+            final int pageSize, Long maxSavingsIdInList) {
         List<Callable<Void>> posters = new ArrayList<>();
         int fromIndex = 0;
         int size = savingsAccounts.size();
@@ -156,30 +159,27 @@ public class PostInterestForSavingTasklet implements Tasklet {
             }
         }
 
-        try {
-            List<Future<Void>> responses = executorService.invokeAll(posters);
-            Long maxId = maxSavingsIdInList;
-            if (!queue.isEmpty()) {
-                maxId = Math.max(maxSavingsIdInList, queue.element().get(queue.element().size() - 1).getId());
-            }
+        List<Future<Void>> responses = new ArrayList<>();
+        posters.forEach(poster -> responses.add(taskExecutor.submit(poster)));
+        Long maxId = maxSavingsIdInList;
+        if (!queue.isEmpty()) {
+            maxId = Math.max(maxSavingsIdInList, queue.element().get(queue.element().size() - 1).getId());
+        }
 
-            while (queue.size() <= queueSize) {
-                log.debug("Fetching while threads are running!..:: this is not supposed to run........");
-                savingsAccounts = Collections.synchronizedList(this.savingAccountReadPlatformService
-                        .retrieveAllSavingsDataForInterestPosting(backdatedTxnsAllowedTill, pageSize, ACTIVE.getValue(), maxId));
-                if (savingsAccounts.isEmpty()) {
-                    break;
-                }
-                maxId = savingsAccounts.get(savingsAccounts.size() - 1).getId();
-                log.debug("Add to the Queue");
-                queue.add(savingsAccounts);
+        while (queue.size() <= queueSize) {
+            log.debug("Fetching while threads are running!..:: this is not supposed to run........");
+            savingsAccounts = Collections.synchronizedList(this.savingAccountReadPlatformService
+                    .retrieveAllSavingsDataForInterestPosting(backdatedTxnsAllowedTill, pageSize, ACTIVE.getValue(), maxId));
+            if (savingsAccounts.isEmpty()) {
+                break;
             }
-
-            checkCompletion(responses);
-            log.debug("Queue size {}", queue.size());
-        } catch (InterruptedException e1) {
-            log.error("Interrupted while postInterest", e1);
+            maxId = savingsAccounts.get(savingsAccounts.size() - 1).getId();
+            log.debug("Add to the Queue");
+            queue.add(savingsAccounts);
         }
+
+        checkCompletion(responses);
+        log.debug("Queue size {}", queue.size());
     }
 
     private <T> List<T> safeSubList(List<T> list, int fromIndex, int toIndex) {
diff --git a/fineract-provider/src/main/resources/application.properties b/fineract-provider/src/main/resources/application.properties
index fd3832da3..eaacad14f 100644
--- a/fineract-provider/src/main/resources/application.properties
+++ b/fineract-provider/src/main/resources/application.properties
@@ -83,6 +83,11 @@ fineract.events.external.producer.jms.broker-url=${FINERACT_EXTERNAL_EVENTS_PROD
 fineract.events.external.producer.jms.broker-username=${FINERACT_EXTERNAL_EVENTS_PRODUCER_JMS_BROKER_USERNAME:}
 fineract.events.external.producer.jms.broker-password=${FINERACT_EXTERNAL_EVENTS_PRODUCER_JMS_BROKER_PASSWORD:}
 fineract.events.external.producer.jms.producer-count=${FINERACT_EXTERNAL_EVENTS_PRODUCER_JMS_PRODUCER_COUNT:1}
+fineract.events.external.producer.jms.thread-pool-task-executor-core-pool-size=${FINERACT_EVENT_TASK_EXECUTOR_CORE_POOL_SIZE:10}
+fineract.events.external.producer.jms.thread-pool-task-executor-max-pool-size=${FINERACT_EVENT_TASK_EXECUTOR_MAX_POOL_SIZE:100}
+
+fineract.task-executor.default-task-executor-core-pool-size=${FINERACT_DEFAULT_TASK_EXECUTOR_CORE_POOL_SIZE:10}
+fineract.task-executor.default-task-executor-max-pool-size=${FINERACT_DEFAULT_TASK_EXECUTOR_MAX_POOL_SIZE:100}
 
 fineract.idempotency-key-header-name=${FINERACT_IDEMPOTENCY_KEY_HEADER_NAME:Idempotency-Key}
 
diff --git a/fineract-provider/src/test/resources/application-test.properties b/fineract-provider/src/test/resources/application-test.properties
index f81768f7b..584ff5a36 100644
--- a/fineract-provider/src/test/resources/application-test.properties
+++ b/fineract-provider/src/test/resources/application-test.properties
@@ -53,6 +53,11 @@ fineract.events.external.producer.read-batch-size=${FINERACT_EXTERNAL_EVENTS_PRO
 fineract.events.external.producer.jms.enabled=${FINERACT_EXTERNAL_EVENTS_PRODUCER_JMS_ENABLED:false}
 fineract.events.external.producer.jms.event-queue-name=${FINERACT_EXTERNAL_EVENTS_PRODUCER_JMS_QUEUE_NAME:JMS-event-queue}
 fineract.events.external.producer.jms.broker-url=${FINERACT_EXTERNAL_EVENTS_PRODUCER_JMS_BROKER_URL:tcp://127.0.0.1:61616}
+fineract.events.external.producer.jms.thread-pool-task-executor-core-pool-size=${FINERACT_EVENT_TASK_EXECUTOR_CORE_POOL_SIZE:10}
+fineract.events.external.producer.jms.thread-pool-task-executor-max-pool-size=${FINERACT_EVENT_TASK_EXECUTOR_MAX_POOL_SIZE:100}
+
+fineract.task-executor.default-task-executor-core-pool-size=${FINERACT_DEFAULT_TASK_EXECUTOR_CORE_POOL_SIZE:10}
+fineract.task-executor.default-task-executor-max-pool-size=${FINERACT_DEFAULT_TASK_EXECUTOR_MAX_POOL_SIZE:100}
 
 fineract.loan.transactionprocessor.creocore.enabled=true
 fineract.loan.transactionprocessor.early-repayment.enabled=true