You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@fineract.apache.org by ad...@apache.org on 2023/06/14 13:06:37 UTC
[fineract] branch develop updated: [FINERACT-1941] change task executors to threadpool task executor
This is an automated email from the ASF dual-hosted git repository.
adamsaghy pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/fineract.git
The following commit(s) were added to refs/heads/develop by this push:
new 78d9a0201 [FINERACT-1941] change task executors to threadpool task executor
78d9a0201 is described below
commit 78d9a0201eb069986cb33e821a1a5e350461eba1
Author: taskain7 <ta...@gmail.com>
AuthorDate: Wed Jun 14 02:57:47 2023 +0200
[FINERACT-1941] change task executors to threadpool task executor
---
.../core/config/FineractProperties.java | 12 +++++
.../service/AsyncLoanCOBExecutorServiceImpl.java | 3 +-
.../SendMessageToSmsGatewayConfig.java | 8 +++-
.../SendMessageToSmsGatewayTasklet.java | 15 ++----
.../configuration/async/SpringAsyncConfig.java | 3 +-
.../core/config/TaskExecutorConfig.java | 50 +++++++++++++++++++
.../core/config/TaskExecutorConstant.java | 31 ++++++++++++
.../config/ExternalEventJMSConfiguration.java | 9 ++--
.../jms/JMSMultiExternalEventProducer.java | 3 +-
.../SmsMessageScheduledJobServiceImpl.java | 22 ++++-----
.../RecalculateInterestForLoanConfig.java | 9 +++-
.../RecalculateInterestForLoanTasklet.java | 26 +++++-----
.../PostInterestForSavingTasklet.java | 56 +++++++++++-----------
.../src/main/resources/application.properties | 5 ++
.../src/test/resources/application-test.properties | 5 ++
15 files changed, 182 insertions(+), 75 deletions(-)
diff --git a/fineract-core/src/main/java/org/apache/fineract/infrastructure/core/config/FineractProperties.java b/fineract-core/src/main/java/org/apache/fineract/infrastructure/core/config/FineractProperties.java
index 8f15543a6..cbfae0298 100644
--- a/fineract-core/src/main/java/org/apache/fineract/infrastructure/core/config/FineractProperties.java
+++ b/fineract-core/src/main/java/org/apache/fineract/infrastructure/core/config/FineractProperties.java
@@ -46,6 +46,8 @@ public class FineractProperties {
private FineractEventsProperties events;
+ private FineractTaskExecutor taskExecutor;
+
private FineractContentProperties content;
private FineractReportProperties report;
@@ -190,6 +192,14 @@ public class FineractProperties {
private FineractExternalEventsProperties external;
}
+ @Getter
+ @Setter
+ public static class FineractTaskExecutor {
+
+ private int defaultTaskExecutorCorePoolSize;
+ private int defaultTaskExecutorMaxPoolSize;
+ }
+
@Getter
@Setter
public static class FineractExternalEventsProperties {
@@ -217,6 +227,8 @@ public class FineractProperties {
private String brokerPassword;
private int producerCount;
private boolean asyncSendEnabled;
+ private int threadPoolTaskExecutorCorePoolSize;
+ private int threadPoolTaskExecutorMaxPoolSize;
public boolean isBrokerPasswordProtected() {
return StringUtils.isNotBlank(brokerUsername) || StringUtils.isNotBlank(brokerPassword);
diff --git a/fineract-provider/src/main/java/org/apache/fineract/cob/service/AsyncLoanCOBExecutorServiceImpl.java b/fineract-provider/src/main/java/org/apache/fineract/cob/service/AsyncLoanCOBExecutorServiceImpl.java
index 6f3fc7fba..d92538cc4 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/cob/service/AsyncLoanCOBExecutorServiceImpl.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/cob/service/AsyncLoanCOBExecutorServiceImpl.java
@@ -29,6 +29,7 @@ import org.apache.fineract.cob.data.LoanIdAndLastClosedBusinessDate;
import org.apache.fineract.cob.loan.LoanCOBConstant;
import org.apache.fineract.cob.loan.RetrieveLoanIdService;
import org.apache.fineract.infrastructure.businessdate.domain.BusinessDateType;
+import org.apache.fineract.infrastructure.core.config.TaskExecutorConstant;
import org.apache.fineract.infrastructure.core.domain.FineractContext;
import org.apache.fineract.infrastructure.core.service.ThreadLocalContextUtil;
import org.apache.fineract.infrastructure.jobs.data.JobParameterDTO;
@@ -58,7 +59,7 @@ public class AsyncLoanCOBExecutorServiceImpl implements AsyncLoanCOBExecutorServ
private final RetrieveLoanIdService retrieveLoanIdService;
@Override
- @Async("loanCOBCatchUpThreadPoolTaskExecutor")
+ @Async(TaskExecutorConstant.LOAN_COB_CATCH_UP_TASK_EXECUTOR_BEAN_NAME)
public void executeLoanCOBCatchUpAsync(FineractContext context) {
try {
ThreadLocalContextUtil.init(context);
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/campaigns/jobs/sendmessagetosmsgateway/SendMessageToSmsGatewayConfig.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/campaigns/jobs/sendmessagetosmsgateway/SendMessageToSmsGatewayConfig.java
index 0b4fbb8db..5fff6f6b3 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/campaigns/jobs/sendmessagetosmsgateway/SendMessageToSmsGatewayConfig.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/campaigns/jobs/sendmessagetosmsgateway/SendMessageToSmsGatewayConfig.java
@@ -19,6 +19,7 @@
package org.apache.fineract.infrastructure.campaigns.jobs.sendmessagetosmsgateway;
import org.apache.fineract.infrastructure.campaigns.helper.SmsConfigUtils;
+import org.apache.fineract.infrastructure.core.config.TaskExecutorConstant;
import org.apache.fineract.infrastructure.gcm.service.NotificationSenderService;
import org.apache.fineract.infrastructure.jobs.service.JobName;
import org.apache.fineract.infrastructure.sms.domain.SmsMessageRepository;
@@ -29,8 +30,10 @@ import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.transaction.PlatformTransactionManager;
@Configuration
@@ -46,6 +49,9 @@ public class SendMessageToSmsGatewayConfig {
private NotificationSenderService notificationSenderService;
@Autowired
private SmsConfigUtils smsConfigUtils;
+ @Autowired
+ @Qualifier(TaskExecutorConstant.DEFAULT_TASK_EXECUTOR_BEAN_NAME)
+ private ThreadPoolTaskExecutor taskExecutor;
@Bean
protected Step sendMessageToSmsGatewayStep() {
@@ -61,6 +67,6 @@ public class SendMessageToSmsGatewayConfig {
@Bean
public SendMessageToSmsGatewayTasklet sendMessageToSmsGatewayTasklet() {
- return new SendMessageToSmsGatewayTasklet(smsMessageRepository, notificationSenderService, smsConfigUtils);
+ return new SendMessageToSmsGatewayTasklet(smsMessageRepository, notificationSenderService, smsConfigUtils, taskExecutor);
}
}
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/campaigns/jobs/sendmessagetosmsgateway/SendMessageToSmsGatewayTasklet.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/campaigns/jobs/sendmessagetosmsgateway/SendMessageToSmsGatewayTasklet.java
index a263ac6e7..edd9abbc7 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/campaigns/jobs/sendmessagetosmsgateway/SendMessageToSmsGatewayTasklet.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/campaigns/jobs/sendmessagetosmsgateway/SendMessageToSmsGatewayTasklet.java
@@ -18,15 +18,12 @@
*/
package org.apache.fineract.infrastructure.campaigns.jobs.sendmessagetosmsgateway;
-import jakarta.annotation.PostConstruct;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
@@ -52,6 +49,7 @@ import org.springframework.http.HttpEntity;
import org.springframework.http.HttpMethod;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.web.client.RestTemplate;
@Slf4j
@@ -59,16 +57,11 @@ import org.springframework.web.client.RestTemplate;
public class SendMessageToSmsGatewayTasklet implements Tasklet {
private final SmsMessageRepository smsMessageRepository;
- private ExecutorService genericExecutorService;
private final NotificationSenderService notificationSenderService;
private final SmsConfigUtils smsConfigUtils;
+ private final ThreadPoolTaskExecutor taskExecutor;
private final RestTemplate restTemplate = new RestTemplate();
- @PostConstruct
- public void initializeExecutorService() {
- genericExecutorService = Executors.newSingleThreadExecutor();
- }
-
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
int pageLimit = 200;
@@ -102,7 +95,7 @@ public class SendMessageToSmsGatewayTasklet implements Tasklet {
if (!toSaveMessages.isEmpty()) {
smsMessageRepository.saveAll(toSaveMessages);
smsMessageRepository.flush();
- genericExecutorService.execute(new SmsTask(ThreadLocalContextUtil.getTenant(), apiQueueResourceDataCollection));
+ taskExecutor.execute(new SmsTask(ThreadLocalContextUtil.getTenant(), apiQueueResourceDataCollection));
}
if (!toSendNotificationMessages.isEmpty()) {
notificationSenderService.sendNotification(toSendNotificationMessages);
@@ -135,7 +128,7 @@ public class SendMessageToSmsGatewayTasklet implements Tasklet {
@Override
public void onApplicationEvent(ContextClosedEvent event) {
- genericExecutorService.shutdown();
+ taskExecutor.shutdown();
log.info("Shutting down the ExecutorService");
}
}
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/configuration/async/SpringAsyncConfig.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/configuration/async/SpringAsyncConfig.java
index a731937e5..12c1b0a9b 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/configuration/async/SpringAsyncConfig.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/configuration/async/SpringAsyncConfig.java
@@ -18,6 +18,7 @@
*/
package org.apache.fineract.infrastructure.configuration.async;
+import org.apache.fineract.infrastructure.core.config.TaskExecutorConstant;
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@@ -29,7 +30,7 @@ import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
@EnableAsync
public class SpringAsyncConfig implements AsyncConfigurer {
- @Bean(name = "loanCOBCatchUpThreadPoolTaskExecutor")
+ @Bean(name = TaskExecutorConstant.LOAN_COB_CATCH_UP_TASK_EXECUTOR_BEAN_NAME)
public ThreadPoolTaskExecutor loanCOBCatchUpThreadPoolTaskExecutor() {
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
threadPoolTaskExecutor.setMaxPoolSize(1);
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/core/config/TaskExecutorConfig.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/core/config/TaskExecutorConfig.java
new file mode 100644
index 000000000..9a1b1cdc0
--- /dev/null
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/core/config/TaskExecutorConfig.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.fineract.infrastructure.core.config;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.config.ConfigurableBeanFactory;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.Scope;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+
+@Configuration
+public class TaskExecutorConfig {
+
+ @Autowired
+ private FineractProperties fineractProperties;
+
+ @Bean(TaskExecutorConstant.DEFAULT_TASK_EXECUTOR_BEAN_NAME)
+ public ThreadPoolTaskExecutor fineractDefaultThreadPoolTaskExecutor() {
+ ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
+ threadPoolTaskExecutor.setCorePoolSize(fineractProperties.getTaskExecutor().getDefaultTaskExecutorCorePoolSize());
+ threadPoolTaskExecutor.setMaxPoolSize(fineractProperties.getTaskExecutor().getDefaultTaskExecutorMaxPoolSize());
+ return threadPoolTaskExecutor;
+ }
+
+ @Bean(TaskExecutorConstant.CONFIGURABLE_TASK_EXECUTOR_BEAN_NAME)
+ @Scope(scopeName = ConfigurableBeanFactory.SCOPE_PROTOTYPE)
+ public ThreadPoolTaskExecutor fineractConfigurableThreadPoolTaskExecutor() {
+ ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
+ threadPoolTaskExecutor.setCorePoolSize(fineractProperties.getTaskExecutor().getDefaultTaskExecutorCorePoolSize());
+ threadPoolTaskExecutor.setMaxPoolSize(fineractProperties.getTaskExecutor().getDefaultTaskExecutorMaxPoolSize());
+ return threadPoolTaskExecutor;
+ }
+}
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/core/config/TaskExecutorConstant.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/core/config/TaskExecutorConstant.java
new file mode 100644
index 000000000..59c066824
--- /dev/null
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/core/config/TaskExecutorConstant.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.fineract.infrastructure.core.config;
+
+public final class TaskExecutorConstant {
+
+ private TaskExecutorConstant() {
+
+ }
+
+ public static final String DEFAULT_TASK_EXECUTOR_BEAN_NAME = "fineractDefaultThreadPoolTaskExecutor";
+ public static final String CONFIGURABLE_TASK_EXECUTOR_BEAN_NAME = "fineractConfigurableThreadPoolTaskExecutor";
+ public static final String EVENT_TASK_EXECUTOR_BEAN_NAME = "externalEventJmsProducerExecutor";
+ public static final String LOAN_COB_CATCH_UP_TASK_EXECUTOR_BEAN_NAME = "loanCOBCatchUpThreadPoolTaskExecutor";
+}
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/config/ExternalEventJMSConfiguration.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/config/ExternalEventJMSConfiguration.java
index 228b155da..5ec6b5b59 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/config/ExternalEventJMSConfiguration.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/config/ExternalEventJMSConfiguration.java
@@ -23,6 +23,7 @@ import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.fineract.infrastructure.core.config.FineractProperties;
import org.apache.fineract.infrastructure.core.config.FineractProperties.FineractExternalEventsProducerJmsProperties;
+import org.apache.fineract.infrastructure.core.config.TaskExecutorConstant;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
@@ -68,11 +69,13 @@ public class ExternalEventJMSConfiguration {
return new ActiveMQQueue(fineractProperties.getEvents().getExternal().getProducer().getJms().getEventQueueName());
}
- @Bean("externalEventJmsProducerExecutor")
+ @Bean(TaskExecutorConstant.EVENT_TASK_EXECUTOR_BEAN_NAME)
public ThreadPoolTaskExecutor externalEventJmsProducerExecutor() {
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
- threadPoolTaskExecutor.setCorePoolSize(10);
- threadPoolTaskExecutor.setMaxPoolSize(100);
+ threadPoolTaskExecutor.setCorePoolSize(
+ fineractProperties.getEvents().getExternal().getProducer().getJms().getThreadPoolTaskExecutorCorePoolSize());
+ threadPoolTaskExecutor
+ .setMaxPoolSize(fineractProperties.getEvents().getExternal().getProducer().getJms().getThreadPoolTaskExecutorMaxPoolSize());
threadPoolTaskExecutor.setThreadNamePrefix("externalEventJms");
return threadPoolTaskExecutor;
}
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/producer/jms/JMSMultiExternalEventProducer.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/producer/jms/JMSMultiExternalEventProducer.java
index 233ece737..f6fbb1123 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/producer/jms/JMSMultiExternalEventProducer.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/producer/jms/JMSMultiExternalEventProducer.java
@@ -38,6 +38,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.fineract.infrastructure.core.config.FineractProperties;
+import org.apache.fineract.infrastructure.core.config.TaskExecutorConstant;
import org.apache.fineract.infrastructure.core.messaging.jms.MessageFactory;
import org.apache.fineract.infrastructure.core.service.HashingService;
import org.apache.fineract.infrastructure.event.external.exception.AcknowledgementTimeoutException;
@@ -61,7 +62,7 @@ public class JMSMultiExternalEventProducer implements ExternalEventProducer {
private final MessageFactory messageFactory;
- @Qualifier("externalEventJmsProducerExecutor")
+ @Qualifier(TaskExecutorConstant.EVENT_TASK_EXECUTOR_BEAN_NAME)
private final AsyncTaskExecutor taskExecutor;
private final HashingService hashingService;
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/sms/scheduler/SmsMessageScheduledJobServiceImpl.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/sms/scheduler/SmsMessageScheduledJobServiceImpl.java
index daea2192e..3dfbc46ec 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/sms/scheduler/SmsMessageScheduledJobServiceImpl.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/sms/scheduler/SmsMessageScheduledJobServiceImpl.java
@@ -18,21 +18,19 @@
*/
package org.apache.fineract.infrastructure.sms.scheduler;
-import jakarta.annotation.PostConstruct;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.fineract.infrastructure.campaigns.helper.SmsConfigUtils;
import org.apache.fineract.infrastructure.campaigns.sms.constants.SmsCampaignConstants;
import org.apache.fineract.infrastructure.campaigns.sms.domain.SmsCampaign;
import org.apache.fineract.infrastructure.campaigns.sms.exception.ConnectionFailureException;
+import org.apache.fineract.infrastructure.core.config.TaskExecutorConstant;
import org.apache.fineract.infrastructure.core.domain.FineractContext;
import org.apache.fineract.infrastructure.core.service.ThreadLocalContextUtil;
import org.apache.fineract.infrastructure.gcm.service.NotificationSenderService;
@@ -40,6 +38,7 @@ import org.apache.fineract.infrastructure.sms.data.SmsMessageApiQueueResourceDat
import org.apache.fineract.infrastructure.sms.domain.SmsMessage;
import org.apache.fineract.infrastructure.sms.domain.SmsMessageRepository;
import org.apache.fineract.infrastructure.sms.domain.SmsMessageStatusType;
+import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextClosedEvent;
import org.springframework.core.ParameterizedTypeReference;
@@ -47,6 +46,7 @@ import org.springframework.http.HttpEntity;
import org.springframework.http.HttpMethod;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;
@@ -62,14 +62,8 @@ public class SmsMessageScheduledJobServiceImpl implements SmsMessageScheduledJob
private final RestTemplate restTemplate = new RestTemplate();
private final SmsConfigUtils smsConfigUtils;
private final NotificationSenderService notificationSenderService;
- private ExecutorService genericExecutorService;
- private ExecutorService triggeredExecutorService;
-
- @PostConstruct
- public void initializeExecutorService() {
- genericExecutorService = Executors.newSingleThreadExecutor();
- triggeredExecutorService = Executors.newSingleThreadExecutor();
- }
+ @Qualifier(TaskExecutorConstant.DEFAULT_TASK_EXECUTOR_BEAN_NAME)
+ private final ThreadPoolTaskExecutor taskExecutor;
private void connectAndSendToIntermediateServer(Collection<SmsMessageApiQueueResourceData> apiQueueResourceDatas) {
Map<String, Object> hostConfig = this.smsConfigUtils.getMessageGateWayRequestURI("sms",
@@ -114,7 +108,7 @@ public class SmsMessageScheduledJobServiceImpl implements SmsMessageScheduledJob
if (toSaveMessages.size() > 0) {
this.smsMessageRepository.saveAll(toSaveMessages);
this.smsMessageRepository.flush();
- this.triggeredExecutorService.execute(new SmsTask(apiQueueResourceDatas, ThreadLocalContextUtil.getContext()));
+ this.taskExecutor.execute(new SmsTask(apiQueueResourceDatas, ThreadLocalContextUtil.getContext()));
}
if (!toSendNotificationMessages.isEmpty()) {
this.notificationSenderService.sendNotification(toSendNotificationMessages);
@@ -141,7 +135,7 @@ public class SmsMessageScheduledJobServiceImpl implements SmsMessageScheduledJob
this.smsMessageRepository.saveAll(smsMessages);
request.append(SmsMessageApiQueueResourceData.toJsonString(apiQueueResourceDatas));
log.debug("Sending triggered SMS to specific provider with request - {}", request);
- this.triggeredExecutorService.execute(new SmsTask(apiQueueResourceDatas, ThreadLocalContextUtil.getContext()));
+ this.taskExecutor.execute(new SmsTask(apiQueueResourceDatas, ThreadLocalContextUtil.getContext()));
} catch (Exception e) {
log.error("Error occured.", e);
}
@@ -165,7 +159,7 @@ public class SmsMessageScheduledJobServiceImpl implements SmsMessageScheduledJob
@Override
public void onApplicationEvent(ContextClosedEvent event) {
- genericExecutorService.shutdown();
+ taskExecutor.shutdown();
log.info("Shutting down the ExecutorService");
}
}
diff --git a/fineract-provider/src/main/java/org/apache/fineract/portfolio/loanaccount/jobs/recalculateinterestforloan/RecalculateInterestForLoanConfig.java b/fineract-provider/src/main/java/org/apache/fineract/portfolio/loanaccount/jobs/recalculateinterestforloan/RecalculateInterestForLoanConfig.java
index eb02c057c..1a84505b1 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/portfolio/loanaccount/jobs/recalculateinterestforloan/RecalculateInterestForLoanConfig.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/portfolio/loanaccount/jobs/recalculateinterestforloan/RecalculateInterestForLoanConfig.java
@@ -18,6 +18,7 @@
*/
package org.apache.fineract.portfolio.loanaccount.jobs.recalculateinterestforloan;
+import org.apache.fineract.infrastructure.core.config.TaskExecutorConstant;
import org.apache.fineract.infrastructure.jobs.service.JobName;
import org.apache.fineract.organisation.office.service.OfficeReadPlatformService;
import org.apache.fineract.portfolio.loanaccount.service.LoanReadPlatformService;
@@ -30,8 +31,10 @@ import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.transaction.PlatformTransactionManager;
@Configuration
@@ -54,6 +57,10 @@ public class RecalculateInterestForLoanConfig {
@Autowired
private OfficeReadPlatformService officeReadPlatformService;
+ @Autowired
+ @Qualifier(TaskExecutorConstant.DEFAULT_TASK_EXECUTOR_BEAN_NAME)
+ private ThreadPoolTaskExecutor taskExecutor;
+
@Bean
protected Step recalculateInterestForLoanStep() {
return new StepBuilder(JobName.RECALCULATE_INTEREST_FOR_LOAN.name(), jobRepository)
@@ -69,6 +76,6 @@ public class RecalculateInterestForLoanConfig {
@Bean
public RecalculateInterestForLoanTasklet recalculateInterestForLoanTasklet() {
return new RecalculateInterestForLoanTasklet(loanReadPlatformService, loanWritePlatformService, recalculateInterestPoster,
- officeReadPlatformService);
+ officeReadPlatformService, taskExecutor);
}
}
diff --git a/fineract-provider/src/main/java/org/apache/fineract/portfolio/loanaccount/jobs/recalculateinterestforloan/RecalculateInterestForLoanTasklet.java b/fineract-provider/src/main/java/org/apache/fineract/portfolio/loanaccount/jobs/recalculateinterestforloan/RecalculateInterestForLoanTasklet.java
index 586dff06a..5ab55574d 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/portfolio/loanaccount/jobs/recalculateinterestforloan/RecalculateInterestForLoanTasklet.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/portfolio/loanaccount/jobs/recalculateinterestforloan/RecalculateInterestForLoanTasklet.java
@@ -25,12 +25,11 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
+import org.apache.fineract.infrastructure.core.config.TaskExecutorConstant;
import org.apache.fineract.infrastructure.jobs.exception.JobExecutionException;
import org.apache.fineract.organisation.office.data.OfficeData;
import org.apache.fineract.organisation.office.exception.OfficeNotFoundException;
@@ -43,6 +42,8 @@ import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
@Slf4j
@RequiredArgsConstructor
@@ -52,6 +53,8 @@ public class RecalculateInterestForLoanTasklet implements Tasklet {
private final LoanWritePlatformService loanWritePlatformService;
private final RecalculateInterestPoster recalculateInterestPoster;
private final OfficeReadPlatformService officeReadPlatformService;
+ @Qualifier(TaskExecutorConstant.CONFIGURABLE_TASK_EXECUTOR_BEAN_NAME)
+ private final ThreadPoolTaskExecutor taskExecutor;
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
@@ -91,8 +94,8 @@ public class RecalculateInterestForLoanTasklet implements Tasklet {
private void recalculateInterest(OfficeData office, int threadPoolSize, int batchSize) {
final int pageSize = batchSize * threadPoolSize;
-
- final ExecutorService executorService = Executors.newFixedThreadPool(threadPoolSize);
+ taskExecutor.setCorePoolSize(threadPoolSize);
+ taskExecutor.setMaxPoolSize(threadPoolSize);
Long maxLoanIdInList = 0L;
final String officeHierarchy = office.getHierarchy() + "%";
@@ -103,16 +106,14 @@ public class RecalculateInterestForLoanTasklet implements Tasklet {
do {
int totalFilteredRecords = loanIds.size();
log.debug("Starting accrual - total filtered records - {}", totalFilteredRecords);
- recalculateInterest(loanIds, threadPoolSize, batchSize, executorService);
+ recalculateInterest(loanIds, threadPoolSize, batchSize);
maxLoanIdInList += pageSize + 1;
loanIds = Collections.synchronizedList(
this.loanReadPlatformService.fetchLoansForInterestRecalculation(pageSize, maxLoanIdInList, officeHierarchy));
} while (!CollectionUtils.isEmpty(loanIds));
-
- executorService.shutdownNow();
}
- private void recalculateInterest(List<Long> loanIds, int threadPoolSize, int batchSize, final ExecutorService executorService) {
+ private void recalculateInterest(List<Long> loanIds, int threadPoolSize, int batchSize) {
List<Callable<Void>> posters = new ArrayList<>();
int fromIndex = 0;
@@ -149,12 +150,9 @@ public class RecalculateInterestForLoanTasklet implements Tasklet {
}
}
- try {
- List<Future<Void>> responses = executorService.invokeAll(posters);
- checkCompletion(responses);
- } catch (InterruptedException e1) {
- log.error("Interrupted while recalculateInterest", e1);
- }
+ List<Future<Void>> responses = new ArrayList<>();
+ posters.forEach(poster -> responses.add(taskExecutor.submit(poster)));
+ checkCompletion(responses);
}
private <T> List<T> safeSubList(List<T> list, int fromIndex, int toIndex) {
diff --git a/fineract-provider/src/main/java/org/apache/fineract/portfolio/savings/jobs/postinterestforsavings/PostInterestForSavingTasklet.java b/fineract-provider/src/main/java/org/apache/fineract/portfolio/savings/jobs/postinterestforsavings/PostInterestForSavingTasklet.java
index 209667244..90407ae23 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/portfolio/savings/jobs/postinterestforsavings/PostInterestForSavingTasklet.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/portfolio/savings/jobs/postinterestforsavings/PostInterestForSavingTasklet.java
@@ -27,13 +27,12 @@ import java.util.List;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.fineract.infrastructure.configuration.domain.ConfigurationDomainService;
+import org.apache.fineract.infrastructure.core.config.TaskExecutorConstant;
import org.apache.fineract.infrastructure.core.domain.FineractContext;
import org.apache.fineract.infrastructure.core.service.ThreadLocalContextUtil;
import org.apache.fineract.portfolio.savings.data.SavingsAccountData;
@@ -43,7 +42,9 @@ import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
+import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.ApplicationContext;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
@RequiredArgsConstructor
@@ -55,15 +56,18 @@ public class PostInterestForSavingTasklet implements Tasklet {
private final ConfigurationDomainService configurationDomainService;
private final Queue<List<SavingsAccountData>> queue = new ArrayDeque<>();
private final ApplicationContext applicationContext;
+ @Qualifier(TaskExecutorConstant.CONFIGURABLE_TASK_EXECUTOR_BEAN_NAME)
+ private final ThreadPoolTaskExecutor taskExecutor;
private final int queueSize = 1;
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
final int threadPoolSize = Integer.parseInt((String) chunkContext.getStepContext().getJobParameters().get("thread-pool-size"));
+ taskExecutor.setCorePoolSize(threadPoolSize);
+ taskExecutor.setMaxPoolSize(threadPoolSize);
final int batchSize = Integer.parseInt((String) chunkContext.getStepContext().getJobParameters().get("batch-size"));
final int pageSize = batchSize * threadPoolSize;
Long maxSavingsIdInList = 0L;
- final ExecutorService executorService = Executors.newFixedThreadPool(threadPoolSize);
final boolean backdatedTxnsAllowedTill = this.configurationDomainService.retrievePivotDateConfig();
long start = System.currentTimeMillis();
@@ -84,16 +88,15 @@ public class PostInterestForSavingTasklet implements Tasklet {
log.debug("Starting Interest posting - total records - {}", totalFilteredRecords);
List<SavingsAccountData> queueElement = queue.element();
maxSavingsIdInList = queueElement.get(queueElement.size() - 1).getId();
- postInterest(queue.remove(), threadPoolSize, executorService, backdatedTxnsAllowedTill, pageSize, maxSavingsIdInList);
+ postInterest(queue.remove(), threadPoolSize, backdatedTxnsAllowedTill, pageSize, maxSavingsIdInList);
} while (!CollectionUtils.isEmpty(queue));
}
- executorService.shutdownNow();
}
return RepeatStatus.FINISHED;
}
- private void postInterest(List<SavingsAccountData> savingsAccounts, int threadPoolSize, ExecutorService executorService,
- final boolean backdatedTxnsAllowedTill, final int pageSize, Long maxSavingsIdInList) {
+ private void postInterest(List<SavingsAccountData> savingsAccounts, int threadPoolSize, final boolean backdatedTxnsAllowedTill,
+ final int pageSize, Long maxSavingsIdInList) {
List<Callable<Void>> posters = new ArrayList<>();
int fromIndex = 0;
int size = savingsAccounts.size();
@@ -156,30 +159,27 @@ public class PostInterestForSavingTasklet implements Tasklet {
}
}
- try {
- List<Future<Void>> responses = executorService.invokeAll(posters);
- Long maxId = maxSavingsIdInList;
- if (!queue.isEmpty()) {
- maxId = Math.max(maxSavingsIdInList, queue.element().get(queue.element().size() - 1).getId());
- }
+ List<Future<Void>> responses = new ArrayList<>();
+ posters.forEach(poster -> responses.add(taskExecutor.submit(poster)));
+ Long maxId = maxSavingsIdInList;
+ if (!queue.isEmpty()) {
+ maxId = Math.max(maxSavingsIdInList, queue.element().get(queue.element().size() - 1).getId());
+ }
- while (queue.size() <= queueSize) {
- log.debug("Fetching while threads are running!..:: this is not supposed to run........");
- savingsAccounts = Collections.synchronizedList(this.savingAccountReadPlatformService
- .retrieveAllSavingsDataForInterestPosting(backdatedTxnsAllowedTill, pageSize, ACTIVE.getValue(), maxId));
- if (savingsAccounts.isEmpty()) {
- break;
- }
- maxId = savingsAccounts.get(savingsAccounts.size() - 1).getId();
- log.debug("Add to the Queue");
- queue.add(savingsAccounts);
+ while (queue.size() <= queueSize) {
+ log.debug("Fetching while threads are running!..:: this is not supposed to run........");
+ savingsAccounts = Collections.synchronizedList(this.savingAccountReadPlatformService
+ .retrieveAllSavingsDataForInterestPosting(backdatedTxnsAllowedTill, pageSize, ACTIVE.getValue(), maxId));
+ if (savingsAccounts.isEmpty()) {
+ break;
}
-
- checkCompletion(responses);
- log.debug("Queue size {}", queue.size());
- } catch (InterruptedException e1) {
- log.error("Interrupted while postInterest", e1);
+ maxId = savingsAccounts.get(savingsAccounts.size() - 1).getId();
+ log.debug("Add to the Queue");
+ queue.add(savingsAccounts);
}
+
+ checkCompletion(responses);
+ log.debug("Queue size {}", queue.size());
}
private <T> List<T> safeSubList(List<T> list, int fromIndex, int toIndex) {
diff --git a/fineract-provider/src/main/resources/application.properties b/fineract-provider/src/main/resources/application.properties
index fd3832da3..eaacad14f 100644
--- a/fineract-provider/src/main/resources/application.properties
+++ b/fineract-provider/src/main/resources/application.properties
@@ -83,6 +83,11 @@ fineract.events.external.producer.jms.broker-url=${FINERACT_EXTERNAL_EVENTS_PROD
fineract.events.external.producer.jms.broker-username=${FINERACT_EXTERNAL_EVENTS_PRODUCER_JMS_BROKER_USERNAME:}
fineract.events.external.producer.jms.broker-password=${FINERACT_EXTERNAL_EVENTS_PRODUCER_JMS_BROKER_PASSWORD:}
fineract.events.external.producer.jms.producer-count=${FINERACT_EXTERNAL_EVENTS_PRODUCER_JMS_PRODUCER_COUNT:1}
+fineract.events.external.producer.jms.thread-pool-task-executor-core-pool-size=${FINERACT_EVENT_TASK_EXECUTOR_CORE_POOL_SIZE:10}
+fineract.events.external.producer.jms.thread-pool-task-executor-max-pool-size=${FINERACT_EVENT_TASK_EXECUTOR_MAX_POOL_SIZE:100}
+
+fineract.task-executor.default-task-executor-core-pool-size=${FINERACT_DEFAULT_TASK_EXECUTOR_CORE_POOL_SIZE:10}
+fineract.task-executor.default-task-executor-max-pool-size=${FINERACT_DEFAULT_TASK_EXECUTOR_MAX_POOL_SIZE:100}
fineract.idempotency-key-header-name=${FINERACT_IDEMPOTENCY_KEY_HEADER_NAME:Idempotency-Key}
diff --git a/fineract-provider/src/test/resources/application-test.properties b/fineract-provider/src/test/resources/application-test.properties
index f81768f7b..584ff5a36 100644
--- a/fineract-provider/src/test/resources/application-test.properties
+++ b/fineract-provider/src/test/resources/application-test.properties
@@ -53,6 +53,11 @@ fineract.events.external.producer.read-batch-size=${FINERACT_EXTERNAL_EVENTS_PRO
fineract.events.external.producer.jms.enabled=${FINERACT_EXTERNAL_EVENTS_PRODUCER_JMS_ENABLED:false}
fineract.events.external.producer.jms.event-queue-name=${FINERACT_EXTERNAL_EVENTS_PRODUCER_JMS_QUEUE_NAME:JMS-event-queue}
fineract.events.external.producer.jms.broker-url=${FINERACT_EXTERNAL_EVENTS_PRODUCER_JMS_BROKER_URL:tcp://127.0.0.1:61616}
+fineract.events.external.producer.jms.thread-pool-task-executor-core-pool-size=${FINERACT_EVENT_TASK_EXECUTOR_CORE_POOL_SIZE:10}
+fineract.events.external.producer.jms.thread-pool-task-executor-max-pool-size=${FINERACT_EVENT_TASK_EXECUTOR_MAX_POOL_SIZE:100}
+
+fineract.task-executor.default-task-executor-core-pool-size=${FINERACT_DEFAULT_TASK_EXECUTOR_CORE_POOL_SIZE:10}
+fineract.task-executor.default-task-executor-max-pool-size=${FINERACT_DEFAULT_TASK_EXECUTOR_MAX_POOL_SIZE:100}
fineract.loan.transactionprocessor.creocore.enabled=true
fineract.loan.transactionprocessor.early-repayment.enabled=true