You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@fineract.apache.org by ar...@apache.org on 2022/11/30 13:03:01 UTC
[fineract] branch develop updated: [FINERACT-1678] Resilient manager instance
This is an automated email from the ASF dual-hosted git repository.
arnold pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/fineract.git
The following commit(s) were added to refs/heads/develop by this push:
new de0910f33 [FINERACT-1678] Resilient manager instance
de0910f33 is described below
commit de0910f33705c8bc6f579617ce2c774eee5b3115
Author: taskain7 <ta...@gmail.com>
AuthorDate: Mon Nov 28 03:36:05 2022 +0100
[FINERACT-1678] Resilient manager instance
---
.../apache/fineract/cob/loan/LoanCOBConstant.java | 2 +
.../cob/loan/LoanCOBManagerConfiguration.java | 4 +-
.../external/service/JdbcTemplateFactory.java | 6 ++
.../data/partitionedjobs/PartitionedJob.java} | 28 +++---
.../jobs/domain/JobExecutionRepository.java | 87 ++++++++++++++++++
.../infrastructure/jobs/service/JobStarter.java | 2 +
.../jobs/service/StuckJobExecutorService.java} | 18 +---
.../jobs/service/StuckJobExecutorServiceImpl.java | 102 +++++++++++++++++++++
.../jobs/service/StuckJobListener.java | 89 ++++++++++++++++++
9 files changed, 309 insertions(+), 29 deletions(-)
diff --git a/fineract-provider/src/main/java/org/apache/fineract/cob/loan/LoanCOBConstant.java b/fineract-provider/src/main/java/org/apache/fineract/cob/loan/LoanCOBConstant.java
index 8c7a4f66d..f8a142826 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/cob/loan/LoanCOBConstant.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/cob/loan/LoanCOBConstant.java
@@ -30,6 +30,8 @@ public final class LoanCOBConstant {
public static final String INLINE_LOAN_COB_JOB_NAME = "INLINE_LOAN_COB";
public static final String BUSINESS_DATE_PARAMETER_NAME = "BusinessDate";
+ public static final String LOAN_COB_PARTITIONER_STEP = "Loan COB partition - Step";
+
private LoanCOBConstant() {
}
diff --git a/fineract-provider/src/main/java/org/apache/fineract/cob/loan/LoanCOBManagerConfiguration.java b/fineract-provider/src/main/java/org/apache/fineract/cob/loan/LoanCOBManagerConfiguration.java
index 0f86c67a0..1990bbdd5 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/cob/loan/LoanCOBManagerConfiguration.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/cob/loan/LoanCOBManagerConfiguration.java
@@ -85,8 +85,8 @@ public class LoanCOBManagerConfiguration {
@Bean
public Step loanCOBStep() {
- return stepBuilderFactory.get("Loan COB partition - Step").partitioner(LoanCOBConstant.LOAN_COB_WORKER_STEP, partitioner(null))
- .outputChannel(outboundRequests).build();
+ return stepBuilderFactory.get(LoanCOBConstant.LOAN_COB_PARTITIONER_STEP)
+ .partitioner(LoanCOBConstant.LOAN_COB_WORKER_STEP, partitioner(null)).outputChannel(outboundRequests).build();
}
@Bean
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/JdbcTemplateFactory.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/JdbcTemplateFactory.java
index 4929e050e..af162baa7 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/JdbcTemplateFactory.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/JdbcTemplateFactory.java
@@ -23,6 +23,7 @@ import lombok.RequiredArgsConstructor;
import org.apache.fineract.infrastructure.core.domain.FineractPlatformTenant;
import org.apache.fineract.infrastructure.core.service.migration.TenantDataSourceFactory;
import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
import org.springframework.stereotype.Component;
@RequiredArgsConstructor
@@ -35,4 +36,9 @@ public class JdbcTemplateFactory {
DataSource tenantDataSource = tenantDataSourceFactory.create(tenant);
return new JdbcTemplate(tenantDataSource);
}
+
+ public NamedParameterJdbcTemplate createNamedParameterJdbcTemplate(FineractPlatformTenant tenant) {
+ DataSource tenantDataSource = tenantDataSourceFactory.create(tenant);
+ return new NamedParameterJdbcTemplate(tenantDataSource);
+ }
}
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/JdbcTemplateFactory.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/data/partitionedjobs/PartitionedJob.java
similarity index 56%
copy from fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/JdbcTemplateFactory.java
copy to fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/data/partitionedjobs/PartitionedJob.java
index 4929e050e..4e476b0bc 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/JdbcTemplateFactory.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/data/partitionedjobs/PartitionedJob.java
@@ -16,23 +16,27 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.fineract.infrastructure.event.external.service;
+package org.apache.fineract.infrastructure.jobs.data.partitionedjobs;
-import javax.sql.DataSource;
+import lombok.Getter;
import lombok.RequiredArgsConstructor;
-import org.apache.fineract.infrastructure.core.domain.FineractPlatformTenant;
-import org.apache.fineract.infrastructure.core.service.migration.TenantDataSourceFactory;
-import org.springframework.jdbc.core.JdbcTemplate;
-import org.springframework.stereotype.Component;
+import org.apache.fineract.cob.loan.LoanCOBConstant;
@RequiredArgsConstructor
-@Component
-public class JdbcTemplateFactory {
+public enum PartitionedJob {
- private final TenantDataSourceFactory tenantDataSourceFactory;
+ LOAN_COB(LoanCOBConstant.LOAN_COB_PARTITIONER_STEP);
- public JdbcTemplate create(FineractPlatformTenant tenant) {
- DataSource tenantDataSource = tenantDataSourceFactory.create(tenant);
- return new JdbcTemplate(tenantDataSource);
+ @Getter
+ private final String partitionerStepName;
+
+ public static boolean existsByJobName(String jobName) {
+ PartitionedJob partitionedJob = null;
+ for (PartitionedJob job : values()) {
+ if (jobName.equalsIgnoreCase(job.name())) {
+ partitionedJob = job;
+ }
+ }
+ return partitionedJob != null;
}
}
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/domain/JobExecutionRepository.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/domain/JobExecutionRepository.java
new file mode 100644
index 000000000..620961d0d
--- /dev/null
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/domain/JobExecutionRepository.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.fineract.infrastructure.jobs.domain;
+
+import static org.springframework.batch.core.BatchStatus.COMPLETED;
+import static org.springframework.batch.core.BatchStatus.FAILED;
+import static org.springframework.batch.core.BatchStatus.STARTED;
+
+import java.util.List;
+import java.util.Map;
+import lombok.RequiredArgsConstructor;
+import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
+import org.springframework.stereotype.Component;
+
+@Component
+@RequiredArgsConstructor
+public class JobExecutionRepository {
+
+ private final NamedParameterJdbcTemplate namedParameterJdbcTemplate;
+
+ public List<String> getStuckJobNames(NamedParameterJdbcTemplate jdbcTemplate) {
+ return jdbcTemplate.queryForList(
+ "SELECT bji.JOB_NAME as STUCK_JOB_NAME FROM BATCH_JOB_INSTANCE bji "
+ + "INNER JOIN BATCH_JOB_EXECUTION bje ON bji.JOB_INSTANCE_ID = bje.JOB_INSTANCE_ID "
+ + "WHERE bje.STATUS IN (:statuses) AND bje.JOB_INSTANCE_ID NOT IN ("
+ + "SELECT bje.JOB_INSTANCE_ID FROM BATCH_JOB_INSTANCE bji "
+ + "INNER JOIN BATCH_JOB_EXECUTION bje ON bji.JOB_INSTANCE_ID = bje.JOB_INSTANCE_ID "
+ + "WHERE bje.STATUS = :completedStatus)",
+ Map.of("statuses", List.of(STARTED.name(), FAILED.name()), "completedStatus", COMPLETED.name()), String.class);
+ }
+
+ public Long getStuckJobCountByJobName(String jobName) {
+ return namedParameterJdbcTemplate.queryForObject(
+ "SELECT COUNT(*) as STUCK_JOB_COUNT FROM BATCH_JOB_INSTANCE bji "
+ + "INNER JOIN BATCH_JOB_EXECUTION bje ON bji.JOB_INSTANCE_ID = bje.JOB_INSTANCE_ID "
+ + "WHERE bje.STATUS IN (:statuses) AND bji.JOB_NAME = :jobName AND bje.JOB_INSTANCE_ID NOT IN ("
+ + "SELECT bje.JOB_INSTANCE_ID FROM BATCH_JOB_INSTANCE bji "
+ + "INNER JOIN BATCH_JOB_EXECUTION bje ON bji.JOB_INSTANCE_ID = bje.JOB_INSTANCE_ID "
+ + "WHERE bje.STATUS = :completedStatus AND bji.JOB_NAME = :jobName)",
+ Map.of("statuses", List.of(STARTED.name(), FAILED.name()), "jobName", jobName, "completedStatus", COMPLETED.name()),
+ Long.class);
+ }
+
+ public List<Long> getStuckJobIdsByJobName(String jobName) {
+ return namedParameterJdbcTemplate.queryForList(
+ "SELECT bje.JOB_EXECUTION_ID FROM BATCH_JOB_INSTANCE bji "
+ + "INNER JOIN BATCH_JOB_EXECUTION bje ON bji.JOB_INSTANCE_ID = bje.JOB_INSTANCE_ID "
+ + "WHERE bje.STATUS IN (:statuses) AND bji.JOB_NAME = :jobName AND bje.JOB_INSTANCE_ID NOT IN ("
+ + "SELECT bje.JOB_INSTANCE_ID FROM BATCH_JOB_INSTANCE bji "
+ + "INNER JOIN BATCH_JOB_EXECUTION bje ON bji.JOB_INSTANCE_ID = bje.JOB_INSTANCE_ID "
+ + "WHERE bje.STATUS = :completedStatus AND bji.JOB_NAME = :jobName)",
+ Map.of("statuses", List.of(STARTED.name(), FAILED.name()), "jobName", jobName, "completedStatus", COMPLETED.name()),
+ Long.class);
+ }
+
+ public Long getNotCompletedPartitionsCount(Long jobExecutionId, String partitionerStepName) {
+ return namedParameterJdbcTemplate.queryForObject(
+ "SELECT COUNT(bse.STEP_EXECUTION_ID) FROM BATCH_STEP_EXECUTION bse "
+ + "WHERE bse.JOB_EXECUTION_ID = :jobExecutionId AND bse.STEP_NAME <> :stepName AND bse.status <> :status",
+ Map.of("jobExecutionId", jobExecutionId, "stepName", partitionerStepName, "status", COMPLETED.name()), Long.class);
+ }
+
+ public void updateJobStatusToFailed(Long stuckJobId, String partitionerStepName) {
+ namedParameterJdbcTemplate.update(
+ "UPDATE BATCH_STEP_EXECUTION SET STATUS = :status WHERE JOB_EXECUTION_ID = :jobExecutionId AND STEP_NAME = :stepName",
+ Map.of("status", FAILED.name(), "jobExecutionId", stuckJobId, "stepName", partitionerStepName));
+ namedParameterJdbcTemplate.update(
+ "UPDATE BATCH_JOB_EXECUTION SET STATUS = :status, START_TIME = null, END_TIME = null WHERE JOB_EXECUTION_ID = :jobExecutionId",
+ Map.of("status", FAILED.name(), "jobExecutionId", stuckJobId));
+ }
+}
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/service/JobStarter.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/service/JobStarter.java
index 43e0e3df5..3b2f59921 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/service/JobStarter.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/service/JobStarter.java
@@ -25,6 +25,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
import org.apache.fineract.infrastructure.core.domain.FineractContext;
import org.apache.fineract.infrastructure.core.service.ThreadLocalContextUtil;
import org.apache.fineract.infrastructure.jobs.data.JobParameterDTO;
@@ -45,6 +46,7 @@ import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.stereotype.Component;
@Component
+@Slf4j
@RequiredArgsConstructor
public class JobStarter {
diff --git a/fineract-provider/src/main/java/org/apache/fineract/cob/loan/LoanCOBConstant.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/service/StuckJobExecutorService.java
similarity index 52%
copy from fineract-provider/src/main/java/org/apache/fineract/cob/loan/LoanCOBConstant.java
copy to fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/service/StuckJobExecutorService.java
index 8c7a4f66d..70e365308 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/cob/loan/LoanCOBConstant.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/service/StuckJobExecutorService.java
@@ -16,21 +16,9 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.fineract.cob.loan;
+package org.apache.fineract.infrastructure.jobs.service;
-public final class LoanCOBConstant {
+public interface StuckJobExecutorService {
- public static final String JOB_NAME = "LOAN_COB";
- public static final String LOAN_COB_JOB_NAME = "LOAN_CLOSE_OF_BUSINESS";
- public static final String LOAN_IDS = "loanIds";
- public static final String BUSINESS_STEP_MAP = "businessStepMap";
- public static final String LOAN_COB_WORKER_STEP = "loanCOBWorkerStep";
-
- public static final String ALREADY_LOCKED_BY_INLINE_COB_OR_PROCESSED_LOAN_IDS = "alreadyLockedOrProcessedLoanIds";
- public static final String INLINE_LOAN_COB_JOB_NAME = "INLINE_LOAN_COB";
- public static final String BUSINESS_DATE_PARAMETER_NAME = "BusinessDate";
-
- private LoanCOBConstant() {
-
- }
+ void executeStuckJob(String jobName, Long jobId);
}
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/service/StuckJobExecutorServiceImpl.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/service/StuckJobExecutorServiceImpl.java
new file mode 100644
index 000000000..d69aed9f5
--- /dev/null
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/service/StuckJobExecutorServiceImpl.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.fineract.infrastructure.jobs.service;
+
+import static org.springframework.transaction.TransactionDefinition.PROPAGATION_REQUIRES_NEW;
+
+import java.util.List;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.fineract.infrastructure.jobs.data.partitionedjobs.PartitionedJob;
+import org.apache.fineract.infrastructure.jobs.domain.JobExecutionRepository;
+import org.jetbrains.annotations.NotNull;
+import org.springframework.batch.core.launch.JobOperator;
+import org.springframework.stereotype.Service;
+import org.springframework.transaction.TransactionStatus;
+import org.springframework.transaction.support.TransactionCallbackWithoutResult;
+import org.springframework.transaction.support.TransactionTemplate;
+
+@Service
+@Slf4j
+@RequiredArgsConstructor
+public class StuckJobExecutorServiceImpl implements StuckJobExecutorService {
+
+ private final JobExecutionRepository jobExecutionRepository;
+ private final TransactionTemplate transactionTemplate;
+ private final JobOperator jobOperator;
+ private final JobRegisterService jobRegisterService;
+
+ @Override
+ public void executeStuckJob(String jobName, Long jobId) {
+ if (isPartitionedJob(jobName) && areThereStuckJobs(jobName)) {
+ // Restarting stuck Partitioned jobs
+ List<Long> stuckJobIds = getStuckJobIds(jobName);
+ stuckJobIds.forEach(stuckJobId -> handleStuckJob(stuckJobId, getPartitionerStepName(jobName)));
+ } else {
+ // Executing stuck Tasklet jobs
+ jobRegisterService.executeJobWithParameters(jobId, null);
+ }
+ }
+
+ private boolean isPartitionedJob(String jobName) {
+ return PartitionedJob.existsByJobName(jobName);
+ }
+
+ private String getPartitionerStepName(String name) {
+ return PartitionedJob.valueOf(name).getPartitionerStepName();
+ }
+
+ private boolean areThereStuckJobs(String jobName) {
+ Long stuckJobCount = jobExecutionRepository.getStuckJobCountByJobName(jobName);
+ return stuckJobCount != 0L;
+ }
+
+ private List<Long> getStuckJobIds(String jobName) {
+ return jobExecutionRepository.getStuckJobIdsByJobName(jobName);
+ }
+
+ private void handleStuckJob(Long stuckJobId, String partitionerStepName) {
+ try {
+ waitUntilAllPartitionsFinished(stuckJobId, partitionerStepName);
+ transactionTemplate.setPropagationBehavior(PROPAGATION_REQUIRES_NEW);
+ transactionTemplate.execute(new TransactionCallbackWithoutResult() {
+
+ @Override
+ protected void doInTransactionWithoutResult(@NotNull TransactionStatus status) {
+ jobExecutionRepository.updateJobStatusToFailed(stuckJobId, partitionerStepName);
+ }
+ });
+ jobOperator.restart(stuckJobId);
+ } catch (Exception e) {
+ throw new RuntimeException("Exception while handling a stuck job", e);
+ }
+ }
+
+ private void waitUntilAllPartitionsFinished(Long stuckJobId, String partitionerStepName) throws InterruptedException {
+ while (!areAllPartitionsCompleted(stuckJobId, partitionerStepName)) {
+ log.info("Sleeping for a second to wait for the partitions to complete for job {}", stuckJobId);
+ Thread.sleep(1000);
+ }
+ }
+
+ private boolean areAllPartitionsCompleted(Long stuckJobId, String partitionerStepName) {
+ Long notCompletedPartitions = jobExecutionRepository.getNotCompletedPartitionsCount(stuckJobId, partitionerStepName);
+ return notCompletedPartitions == 0L;
+ }
+}
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/service/StuckJobListener.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/service/StuckJobListener.java
new file mode 100644
index 000000000..948f7be59
--- /dev/null
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/service/StuckJobListener.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.fineract.infrastructure.jobs.service;
+
+import java.time.LocalDate;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import lombok.RequiredArgsConstructor;
+import org.apache.fineract.infrastructure.businessdate.domain.BusinessDateType;
+import org.apache.fineract.infrastructure.businessdate.service.BusinessDateReadPlatformService;
+import org.apache.fineract.infrastructure.core.domain.ActionContext;
+import org.apache.fineract.infrastructure.core.domain.FineractPlatformTenant;
+import org.apache.fineract.infrastructure.core.service.ThreadLocalContextUtil;
+import org.apache.fineract.infrastructure.event.external.service.JdbcTemplateFactory;
+import org.apache.fineract.infrastructure.jobs.domain.JobExecutionRepository;
+import org.apache.fineract.infrastructure.jobs.service.jobname.JobNameData;
+import org.apache.fineract.infrastructure.jobs.service.jobname.JobNameProvider;
+import org.apache.fineract.infrastructure.security.service.TenantDetailsService;
+import org.springframework.batch.core.configuration.JobRegistry;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.context.ApplicationListener;
+import org.springframework.context.event.ContextRefreshedEvent;
+import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
+import org.springframework.stereotype.Service;
+
+@Service
+@RequiredArgsConstructor
+@ConditionalOnProperty(value = "fineract.mode.batch-manager-enabled", havingValue = "true")
+public class StuckJobListener implements ApplicationListener<ContextRefreshedEvent> {
+
+ private final JobExecutionRepository jobExecutionRepository;
+ private final JdbcTemplateFactory jdbcTemplateFactory;
+ private final TenantDetailsService tenantDetailsService;
+ private final JobNameProvider jobNameProvider;
+ private final JobRegistry jobRegistry;
+ private final BusinessDateReadPlatformService businessDateReadPlatformService;
+ private final StuckJobExecutorService stuckJobExecutorService;
+
+ @Override
+ public void onApplicationEvent(ContextRefreshedEvent event) {
+ if (!jobRegistry.getJobNames().isEmpty()) {
+ List<FineractPlatformTenant> allTenants = tenantDetailsService.findAllTenants();
+ allTenants.forEach(tenant -> {
+ NamedParameterJdbcTemplate namedParameterJdbcTemplate = jdbcTemplateFactory.createNamedParameterJdbcTemplate(tenant);
+ List<String> stuckJobNames = jobExecutionRepository.getStuckJobNames(namedParameterJdbcTemplate);
+ if (!stuckJobNames.isEmpty()) {
+ try {
+ ThreadLocalContextUtil.setTenant(tenant);
+ HashMap<BusinessDateType, LocalDate> businessDates = businessDateReadPlatformService.getBusinessDates();
+ ThreadLocalContextUtil.setActionContext(ActionContext.DEFAULT);
+ ThreadLocalContextUtil.setBusinessDates(businessDates);
+ stuckJobNames.forEach(stuckJobName -> {
+ String sql = "select id from job where name = :jobName";
+ Optional<JobNameData> jobNameDataOptional = jobNameProvider.provide().stream()
+ .filter(jobNameData -> stuckJobName.equals(jobNameData.getEnumStyleName())).findFirst();
+ JobNameData jobNameData = jobNameDataOptional
+ .orElseThrow(() -> new IllegalArgumentException("Job not found by name: " + stuckJobName));
+ Long stuckJobId = namedParameterJdbcTemplate.queryForObject(sql,
+ Map.of("jobName", jobNameData.getHumanReadableName()), Long.class);
+ stuckJobExecutorService.executeStuckJob(stuckJobName, stuckJobId);
+ });
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ } finally {
+ ThreadLocalContextUtil.reset();
+ }
+ }
+ });
+ }
+ }
+}