You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@fineract.apache.org by av...@apache.org on 2021/10/14 10:54:34 UTC
[fineract] branch develop updated: Feat: Node aware job scheduler
(#1886)
This is an automated email from the ASF dual-hosted git repository.
avikg pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/fineract.git
The following commit(s) were added to refs/heads/develop by this push:
new e4b270c Feat: Node aware job scheduler (#1886)
e4b270c is described below
commit e4b270c0f5be571d7268a87a2cd612acd4ef84d8
Author: Benura Abeywardena <43...@users.noreply.github.com>
AuthorDate: Thu Oct 14 16:24:21 2021 +0530
Feat: Node aware job scheduler (#1886)
---
docker-compose.yml | 1 +
.../glaccount/domain/GLAccountRepository.java | 3 ++-
.../domain/GLAccountRepositoryWrapper.java | 3 ++-
.../journalentry/data/JournalEntryData.java | 3 ++-
.../constants/ChartOfAcountsConstants.java | 4 +--
.../chartofaccounts/ChartOfAccountsWorkbook.java | 17 +++++++------
.../jobs/domain/ScheduledJobDetail.java | 18 ++++++++++++++
.../jobs/domain/ScheduledJobDetailRepository.java | 7 ++++++
.../exception/JobNodeIdMismatchingException.java} | 18 ++++++++------
.../infrastructure/jobs/service/JobName.java | 3 ++-
.../jobs/service/JobRegisterServiceImpl.java | 29 +++++++++++++++++++---
.../service/SchedularWritePlatformService.java | 2 +-
...dularWritePlatformServiceJpaRepositoryImpl.java | 4 +--
.../portfolio/loanaccount/domain/Loan.java | 3 ++-
.../service/ScheduledJobRunnerService.java | 2 ++
.../service/ScheduledJobRunnerServiceImpl.java | 26 ++++++++++++++++++-
.../core_db/V373__node_aware_scheduler_jobs.sql | 26 +++++++++++++++++++
kubernetes/fineract-server-deployment.yml | 2 ++
18 files changed, 141 insertions(+), 30 deletions(-)
diff --git a/docker-compose.yml b/docker-compose.yml
index 3724c34..2513129 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -45,6 +45,7 @@ services:
environment:
- DRIVERCLASS_NAME=org.drizzle.jdbc.DrizzleDriver
- PROTOCOL=jdbc
+ - node_id=1
- SUB_PROTOCOL=mysql:thin
- fineract_tenants_driver=org.drizzle.jdbc.DrizzleDriver
- fineract_tenants_url=jdbc:mysql:thin://fineractmysql:3306/fineract_tenants
diff --git a/fineract-provider/src/main/java/org/apache/fineract/accounting/glaccount/domain/GLAccountRepository.java b/fineract-provider/src/main/java/org/apache/fineract/accounting/glaccount/domain/GLAccountRepository.java
index cf258bd..59dd985 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/accounting/glaccount/domain/GLAccountRepository.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/accounting/glaccount/domain/GLAccountRepository.java
@@ -23,7 +23,8 @@ import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.JpaSpecificationExecutor;
public interface GLAccountRepository extends JpaRepository<GLAccount, Long>, JpaSpecificationExecutor<GLAccount> {
+
// no added behaviour
- //adding behaviour to fetch id by glcode for opening balance bulk import
+ // adding behaviour to fetch id by glcode for opening balance bulk import
Optional<GLAccount> findOneByGlCode(String glCode);
}
diff --git a/fineract-provider/src/main/java/org/apache/fineract/accounting/glaccount/domain/GLAccountRepositoryWrapper.java b/fineract-provider/src/main/java/org/apache/fineract/accounting/glaccount/domain/GLAccountRepositoryWrapper.java
index 1482c84..b66345e 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/accounting/glaccount/domain/GLAccountRepositoryWrapper.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/accounting/glaccount/domain/GLAccountRepositoryWrapper.java
@@ -40,7 +40,8 @@ public class GLAccountRepositoryWrapper {
public GLAccount findOneWithNotFoundDetection(final Long id) {
return this.repository.findById(id).orElseThrow(() -> new GLAccountNotFoundException(id));
}
- //finding account id by glcode for opening balance bulk import
+
+ // finding account id by glcode for opening balance bulk import
public GLAccount findOneByGlCodeWithNotFoundDetection(final String glCode) {
return this.repository.findOneByGlCode(glCode).orElseThrow(() -> new GLAccountNotFoundException(glCode));
}
diff --git a/fineract-provider/src/main/java/org/apache/fineract/accounting/journalentry/data/JournalEntryData.java b/fineract-provider/src/main/java/org/apache/fineract/accounting/journalentry/data/JournalEntryData.java
index 22865ed..42cb2dd 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/accounting/journalentry/data/JournalEntryData.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/accounting/journalentry/data/JournalEntryData.java
@@ -90,7 +90,8 @@ public class JournalEntryData {
private String routingCode;
private String receiptNumber;
private String bankNumber;
- //for opening bal bulk import
+
+ // for opening bal bulk import
public JournalEntryData(Long officeId, LocalDate transactionDate, String currencyCode, List<CreditDebit> credits,
List<CreditDebit> debits, String locale, String dateFormat) {
this.officeId = officeId;
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/bulkimport/constants/ChartOfAcountsConstants.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/bulkimport/constants/ChartOfAcountsConstants.java
index 72b5414..0dd414b 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/bulkimport/constants/ChartOfAcountsConstants.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/bulkimport/constants/ChartOfAcountsConstants.java
@@ -34,7 +34,7 @@ public final class ChartOfAcountsConstants {
public static final int TAG_COL = 7;// H
public static final int TAG_ID_COL = 8;// I
public static final int DESCRIPTION_COL = 9;// J
- //adding for opening balance bulk import
+ // adding for opening balance bulk import
public static final int OFFICE_COL = 10; // K
public static final int OFFICE_COL_ID = 11; // L
public static final int CURRENCY_CODE = 12; // M
@@ -47,7 +47,7 @@ public final class ChartOfAcountsConstants {
public static final int LOOKUP_TAG_COL = 21; // V
public static final int LOOKUP_TAG_ID_COL = 22; // W
- //adding for opening balance bulk import
+ // adding for opening balance bulk import
public static final int LOOKUP_OFFICE_COL = 23; // X
public static final int LOOKUP_OFFICE_ID_COL = 24; // Y
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/bulkimport/populator/chartofaccounts/ChartOfAccountsWorkbook.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/bulkimport/populator/chartofaccounts/ChartOfAccountsWorkbook.java
index 68a775d..5f96176 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/bulkimport/populator/chartofaccounts/ChartOfAccountsWorkbook.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/bulkimport/populator/chartofaccounts/ChartOfAccountsWorkbook.java
@@ -113,9 +113,12 @@ public class ChartOfAccountsWorkbook extends AbstractWorkbookPopulator {
CellRangeAddressList tagRange = new CellRangeAddressList(1, SpreadsheetVersion.EXCEL97.getLastRowIndex(),
ChartOfAcountsConstants.TAG_COL, ChartOfAcountsConstants.TAG_COL);
CellRangeAddressList officeNameRange = new CellRangeAddressList(1, SpreadsheetVersion.EXCEL97.getLastRowIndex(),
- ChartOfAcountsConstants.OFFICE_COL, ChartOfAcountsConstants.OFFICE_COL); // validation for opening bal office column
+ ChartOfAcountsConstants.OFFICE_COL, ChartOfAcountsConstants.OFFICE_COL); // validation for opening bal
+ // office column
CellRangeAddressList currencyCodeRange = new CellRangeAddressList(1, SpreadsheetVersion.EXCEL97.getLastRowIndex(),
- ChartOfAcountsConstants.CURRENCY_CODE, ChartOfAcountsConstants.CURRENCY_CODE);// validation for currency code for opening balance
+ ChartOfAcountsConstants.CURRENCY_CODE, ChartOfAcountsConstants.CURRENCY_CODE);// validation for currency
+ // code for opening
+ // balance
DataValidationHelper validationHelper = new HSSFDataValidationHelper((HSSFSheet) chartOfAccountsSheet);
setNames(chartOfAccountsSheet, accountTypesNoDuplicatesList, offices);
@@ -186,7 +189,7 @@ public class ChartOfAccountsWorkbook extends AbstractWorkbookPopulator {
writeFormula(ChartOfAcountsConstants.TAG_ID_COL, row,
"IF(ISERROR(VLOOKUP($H" + (rowNo + 1) + ",$V$2:$W$" + (glAccounts.size() + 1) + ",2,FALSE))," + "\"\",(VLOOKUP($H"
+ (rowNo + 1) + ",$V$2:$W$" + (glAccounts.size() + 1) + ",2,FALSE)))");
- //auto populate office id for bulk import of opening balance
+ // auto populate office id for bulk import of opening balance
writeFormula(ChartOfAcountsConstants.OFFICE_COL_ID, row,
"IF(ISERROR(VLOOKUP($K" + (rowNo + 1) + ",$X$2:$Y$" + (offices.size() + 1) + ",2,FALSE)),\"\",(VLOOKUP($K"
+ (rowNo + 1) + ",$X$2:$Y$" + (offices.size() + 1) + ",2,FALSE)))");
@@ -236,7 +239,7 @@ public class ChartOfAccountsWorkbook extends AbstractWorkbookPopulator {
accountTypeIndex++;
}
}
- //opening balance lookup table of offices
+ // opening balance lookup table of offices
startIndex = 1;
rowIndex = 1;
for (OfficeData office : offices) {
@@ -283,7 +286,7 @@ public class ChartOfAccountsWorkbook extends AbstractWorkbookPopulator {
chartOfAccountsSheet.setColumnWidth(ChartOfAcountsConstants.LOOKUP_ACCOUNT_ID_COL, TemplatePopulateImportConstants.MEDIUM_COL_SIZE);
chartOfAccountsSheet.setColumnWidth(ChartOfAcountsConstants.LOOKUP_TAG_COL, TemplatePopulateImportConstants.SMALL_COL_SIZE);
chartOfAccountsSheet.setColumnWidth(ChartOfAcountsConstants.LOOKUP_TAG_ID_COL, TemplatePopulateImportConstants.SMALL_COL_SIZE);
- //adding lookup for opening balance bulk import
+ // adding lookup for opening balance bulk import
chartOfAccountsSheet.setColumnWidth(ChartOfAcountsConstants.LOOKUP_OFFICE_COL, TemplatePopulateImportConstants.MEDIUM_COL_SIZE);
chartOfAccountsSheet.setColumnWidth(ChartOfAcountsConstants.LOOKUP_OFFICE_ID_COL, TemplatePopulateImportConstants.SMALL_COL_SIZE);
@@ -297,7 +300,7 @@ public class ChartOfAccountsWorkbook extends AbstractWorkbookPopulator {
writeString(ChartOfAcountsConstants.TAG_COL, rowHeader, "Tag");
writeString(ChartOfAcountsConstants.TAG_ID_COL, rowHeader, "Tag Id");
writeString(ChartOfAcountsConstants.DESCRIPTION_COL, rowHeader, "Description *");
- //adding data for opening balance bulk import
+ // adding data for opening balance bulk import
writeString(ChartOfAcountsConstants.OFFICE_COL, rowHeader, "Parent Office for Opening Balance");
writeString(ChartOfAcountsConstants.OFFICE_COL_ID, rowHeader, "Parent Office Code Opening Balance");
writeString(ChartOfAcountsConstants.CURRENCY_CODE, rowHeader, "Currency Code");
@@ -309,7 +312,7 @@ public class ChartOfAccountsWorkbook extends AbstractWorkbookPopulator {
writeString(ChartOfAcountsConstants.LOOKUP_TAG_ID_COL, rowHeader, "Lookup Tag Id");
writeString(ChartOfAcountsConstants.LOOKUP_ACCOUNT_NAME_COL, rowHeader, "Lookup Account name *");
writeString(ChartOfAcountsConstants.LOOKUP_ACCOUNT_ID_COL, rowHeader, "Lookup Account Id");
- //adding lookup for opening balance bulk import
+ // adding lookup for opening balance bulk import
writeString(ChartOfAcountsConstants.LOOKUP_OFFICE_COL, rowHeader, "Lookup Office Name");
writeString(ChartOfAcountsConstants.LOOKUP_OFFICE_ID_COL, rowHeader, "Lookup Office Id");
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/domain/ScheduledJobDetail.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/domain/ScheduledJobDetail.java
index 67a384c..f3f4462 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/domain/ScheduledJobDetail.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/domain/ScheduledJobDetail.java
@@ -41,6 +41,12 @@ public class ScheduledJobDetail extends AbstractPersistableCustom {
@Column(name = "display_name")
private String jobDisplayName;
+ @Column(name = "node_id")
+ private Integer nodeId;
+
+ @Column(name = "is_mismatched_job")
+ private boolean isMismatchedJob;
+
@Column(name = "cron_expression")
private String cronExpression;
@@ -135,6 +141,14 @@ public class ScheduledJobDetail extends AbstractPersistableCustom {
this.jobKey = jobKey;
}
+ public boolean getIsMismatchedJob() {
+ return this.isMismatchedJob;
+ }
+
+ public void setIsMismatchedJob(final boolean isMismatchedJob) {
+ this.isMismatchedJob = isMismatchedJob;
+ }
+
public void updateErrorLog(final String errorLog) {
this.errorLog = errorLog;
}
@@ -147,6 +161,10 @@ public class ScheduledJobDetail extends AbstractPersistableCustom {
this.currentlyRunning = currentlyRunning;
}
+ public Integer getNodeId() {
+ return this.nodeId;
+ }
+
public Map<String, Object> update(final JsonCommand command) {
final Map<String, Object> actualChanges = new LinkedHashMap<>(9);
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/domain/ScheduledJobDetailRepository.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/domain/ScheduledJobDetailRepository.java
index 216a362..c22cc17 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/domain/ScheduledJobDetailRepository.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/domain/ScheduledJobDetailRepository.java
@@ -18,6 +18,7 @@
*/
package org.apache.fineract.infrastructure.jobs.domain;
+import java.util.List;
import javax.persistence.LockModeType;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.JpaSpecificationExecutor;
@@ -38,4 +39,10 @@ public interface ScheduledJobDetailRepository
@Query("select jobDetail from ScheduledJobDetail jobDetail where jobDetail.jobKey = :jobKey")
ScheduledJobDetail findByJobKeyWithLock(@Param("jobKey") String jobKey);
+ @Query("select jobDetail from ScheduledJobDetail jobDetail where jobDetail.isMismatchedJob = :isMismatchedJob")
+ List<ScheduledJobDetail> findAllMismatchedJobs(@Param("isMismatchedJob") boolean isMismatchedJob);
+
+ @Query("select jobDetail from ScheduledJobDetail jobDetail where jobDetail.nodeId = :nodeId or jobDetail.nodeId = 0")
+ List<ScheduledJobDetail> findAllJobs(@Param("nodeId") Integer nodeId);
+
}
diff --git a/fineract-provider/src/main/java/org/apache/fineract/accounting/glaccount/domain/GLAccountRepository.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/exception/JobNodeIdMismatchingException.java
similarity index 59%
copy from fineract-provider/src/main/java/org/apache/fineract/accounting/glaccount/domain/GLAccountRepository.java
copy to fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/exception/JobNodeIdMismatchingException.java
index cf258bd..1bb5c6d 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/accounting/glaccount/domain/GLAccountRepository.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/exception/JobNodeIdMismatchingException.java
@@ -16,14 +16,16 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.fineract.accounting.glaccount.domain;
-import java.util.Optional;
-import org.springframework.data.jpa.repository.JpaRepository;
-import org.springframework.data.jpa.repository.JpaSpecificationExecutor;
+package org.apache.fineract.infrastructure.jobs.exception;
+
+import org.apache.fineract.infrastructure.core.exception.AbstractPlatformDomainRuleException;
+
+public class JobNodeIdMismatchingException extends AbstractPlatformDomainRuleException {
+
+ public JobNodeIdMismatchingException(final String nodeId, final String nodeIdProvided) {
+ super("error.msg.job.cannot.execute.on.node." + nodeIdProvided,
+ "The node id provided `" + nodeIdProvided + "`" + "` does not match with the configured nodeId.", new Object[] { nodeId });
+ }
-public interface GLAccountRepository extends JpaRepository<GLAccount, Long>, JpaSpecificationExecutor<GLAccount> {
- // no added behaviour
- //adding behaviour to fetch id by glcode for opening balance bulk import
- Optional<GLAccount> findOneByGlCode(String glCode);
}
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/service/JobName.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/service/JobName.java
index 7417cc6..2bbc911 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/service/JobName.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/service/JobName.java
@@ -46,7 +46,8 @@ public enum JobName {
"Generate AdhocClient Schedule"), UPDATE_EMAIL_OUTBOUND_WITH_CAMPAIGN_MESSAGE(
"Update Email Outbound with campaign message"), EXECUTE_EMAIL(
"Execute Email"), UPDATE_TRAIL_BALANCE_DETAILS(
- "Update Trial Balance Details");
+ "Update Trial Balance Details"), EXECUTE_DIRTY_JOBS(
+ "Execute All Dirty Jobs");
private final String name;
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/service/JobRegisterServiceImpl.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/service/JobRegisterServiceImpl.java
index 3d07344..6bb2061 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/service/JobRegisterServiceImpl.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/service/JobRegisterServiceImpl.java
@@ -37,6 +37,7 @@ import org.apache.fineract.infrastructure.jobs.domain.JobParameter;
import org.apache.fineract.infrastructure.jobs.domain.JobParameterRepository;
import org.apache.fineract.infrastructure.jobs.domain.ScheduledJobDetail;
import org.apache.fineract.infrastructure.jobs.domain.SchedulerDetail;
+import org.apache.fineract.infrastructure.jobs.exception.JobNodeIdMismatchingException;
import org.apache.fineract.infrastructure.jobs.exception.JobNotFoundException;
import org.apache.fineract.infrastructure.security.service.TenantDetailsService;
import org.quartz.JobDataMap;
@@ -50,6 +51,7 @@ import org.quartz.TriggerListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextClosedEvent;
@@ -118,12 +120,15 @@ public class JobRegisterServiceImpl implements JobRegisterService, ApplicationLi
this.jobParameterRepository = jobParameterRepository;
}
+ @Value("${node_id:1}")
+ private String nodeId;
+
@PostConstruct
public void loadAllJobs() {
final List<FineractPlatformTenant> allTenants = this.tenantDetailsService.findAllTenants();
for (final FineractPlatformTenant tenant : allTenants) {
ThreadLocalContextUtil.setTenant(tenant);
- final List<ScheduledJobDetail> scheduledJobDetails = this.schedularWritePlatformService.retrieveAllJobs();
+ final List<ScheduledJobDetail> scheduledJobDetails = this.schedularWritePlatformService.retrieveAllJobs(nodeId);
for (final ScheduledJobDetail jobDetails : scheduledJobDetails) {
scheduleJob(jobDetails);
jobDetails.updateTriggerMisfired(false);
@@ -203,11 +208,12 @@ public class JobRegisterServiceImpl implements JobRegisterService, ApplicationLi
schedulerDetail.updateSuspendedState(false);
this.schedularWritePlatformService.updateSchedulerDetail(schedulerDetail);
if (schedulerDetail.isExecuteInstructionForMisfiredJobs()) {
- final List<ScheduledJobDetail> scheduledJobDetails = this.schedularWritePlatformService.retrieveAllJobs();
+ final List<ScheduledJobDetail> scheduledJobDetails = this.schedularWritePlatformService.retrieveAllJobs(this.nodeId);
for (final ScheduledJobDetail jobDetail : scheduledJobDetails) {
if (jobDetail.isTriggerMisfired()) {
if (jobDetail.isActiveSchedular()) {
executeJob(jobDetail, SchedulerServiceConstants.TRIGGER_TYPE_CRON);
+ jobDetail.setIsMismatchedJob(false);
}
final String schedulerName = getSchedulerName(jobDetail);
final Scheduler scheduler = this.schedulers.get(schedulerName);
@@ -236,7 +242,14 @@ public class JobRegisterServiceImpl implements JobRegisterService, ApplicationLi
@Override
public void rescheduleJob(final Long jobId) {
final ScheduledJobDetail scheduledJobDetail = this.schedularWritePlatformService.findByJobId(jobId);
- rescheduleJob(scheduledJobDetail);
+ final String nodeIdStored = scheduledJobDetail.getNodeId().toString();
+ if (nodeIdStored.equals(this.nodeId) || nodeIdStored.equals("0")) {
+ rescheduleJob(scheduledJobDetail);
+ } else {
+ scheduledJobDetail.setIsMismatchedJob(true);
+ this.schedularWritePlatformService.saveOrUpdate(scheduledJobDetail);
+ throw new JobNodeIdMismatchingException(nodeIdStored, this.nodeId);
+ }
}
@Override
@@ -245,7 +258,15 @@ public class JobRegisterServiceImpl implements JobRegisterService, ApplicationLi
if (scheduledJobDetail == null) {
throw new JobNotFoundException(String.valueOf(jobId));
}
- executeJob(scheduledJobDetail, null);
+ final String nodeIdStored = scheduledJobDetail.getNodeId().toString();
+
+ if (nodeIdStored.equals(this.nodeId) || nodeIdStored.equals("0")) {
+ executeJob(scheduledJobDetail, null);
+ } else {
+ scheduledJobDetail.setIsMismatchedJob(true);
+ this.schedularWritePlatformService.saveOrUpdate(scheduledJobDetail);
+ throw new JobNodeIdMismatchingException(nodeIdStored, this.nodeId);
+ }
}
@Override
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/service/SchedularWritePlatformService.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/service/SchedularWritePlatformService.java
index 9924303..388b778 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/service/SchedularWritePlatformService.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/service/SchedularWritePlatformService.java
@@ -27,7 +27,7 @@ import org.apache.fineract.infrastructure.jobs.domain.SchedulerDetail;
public interface SchedularWritePlatformService {
- List<ScheduledJobDetail> retrieveAllJobs();
+ List<ScheduledJobDetail> retrieveAllJobs(String nodeId);
ScheduledJobDetail findByJobKey(String triggerKey);
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/service/SchedularWritePlatformServiceJpaRepositoryImpl.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/service/SchedularWritePlatformServiceJpaRepositoryImpl.java
index 0c99d34..51bbf5a 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/service/SchedularWritePlatformServiceJpaRepositoryImpl.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/service/SchedularWritePlatformServiceJpaRepositoryImpl.java
@@ -58,8 +58,8 @@ public class SchedularWritePlatformServiceJpaRepositoryImpl implements Schedular
}
@Override
- public List<ScheduledJobDetail> retrieveAllJobs() {
- return this.scheduledJobDetailsRepository.findAll();
+ public List<ScheduledJobDetail> retrieveAllJobs(final String nodeId) {
+ return this.scheduledJobDetailsRepository.findAllJobs(Integer.parseInt(nodeId));
}
@Override
diff --git a/fineract-provider/src/main/java/org/apache/fineract/portfolio/loanaccount/domain/Loan.java b/fineract-provider/src/main/java/org/apache/fineract/portfolio/loanaccount/domain/Loan.java
index 9f93db5..b0c5625 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/portfolio/loanaccount/domain/Loan.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/portfolio/loanaccount/domain/Loan.java
@@ -1647,7 +1647,8 @@ public class Loan extends AbstractPersistableCustom {
this.fixedPrincipalPercentagePerInstallment)) {
this.fixedPrincipalPercentagePerInstallment = command
.bigDecimalValueOfParameterNamed(LoanApiConstants.fixedPrincipalPercentagePerInstallmentParamName);
- actualChanges.put(LoanApiConstants.fixedPrincipalPercentagePerInstallmentParamName, this.fixedPrincipalPercentagePerInstallment);
+ actualChanges.put(LoanApiConstants.fixedPrincipalPercentagePerInstallmentParamName,
+ this.fixedPrincipalPercentagePerInstallment);
}
return actualChanges;
diff --git a/fineract-provider/src/main/java/org/apache/fineract/scheduledjobs/service/ScheduledJobRunnerService.java b/fineract-provider/src/main/java/org/apache/fineract/scheduledjobs/service/ScheduledJobRunnerService.java
index 3c63230..e0f0cc5 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/scheduledjobs/service/ScheduledJobRunnerService.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/scheduledjobs/service/ScheduledJobRunnerService.java
@@ -39,4 +39,6 @@ public interface ScheduledJobRunnerService {
void postDividends() throws JobExecutionException;
void updateTrialBalanceDetails() throws JobExecutionException;
+
+ void executeMissMatchedJobs() throws JobExecutionException;
}
diff --git a/fineract-provider/src/main/java/org/apache/fineract/scheduledjobs/service/ScheduledJobRunnerServiceImpl.java b/fineract-provider/src/main/java/org/apache/fineract/scheduledjobs/service/ScheduledJobRunnerServiceImpl.java
index 4dfe186..a9260c7 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/scheduledjobs/service/ScheduledJobRunnerServiceImpl.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/scheduledjobs/service/ScheduledJobRunnerServiceImpl.java
@@ -38,8 +38,11 @@ import org.apache.fineract.infrastructure.core.service.DateUtils;
import org.apache.fineract.infrastructure.core.service.RoutingDataSourceServiceFactory;
import org.apache.fineract.infrastructure.core.service.ThreadLocalContextUtil;
import org.apache.fineract.infrastructure.jobs.annotation.CronTarget;
+import org.apache.fineract.infrastructure.jobs.domain.ScheduledJobDetail;
+import org.apache.fineract.infrastructure.jobs.domain.ScheduledJobDetailRepository;
import org.apache.fineract.infrastructure.jobs.exception.JobExecutionException;
import org.apache.fineract.infrastructure.jobs.service.JobName;
+import org.apache.fineract.infrastructure.jobs.service.JobRegisterService;
import org.apache.fineract.portfolio.savings.DepositAccountType;
import org.apache.fineract.portfolio.savings.DepositAccountUtils;
import org.apache.fineract.portfolio.savings.data.DepositAccountData;
@@ -53,6 +56,7 @@ import org.apache.fineract.portfolio.shareaccounts.service.ShareAccountSchedular
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@@ -74,6 +78,11 @@ public class ScheduledJobRunnerServiceImpl implements ScheduledJobRunnerService
private final ShareAccountDividendReadPlatformService shareAccountDividendReadPlatformService;
private final ShareAccountSchedularService shareAccountSchedularService;
private final TrialBalanceRepositoryWrapper trialBalanceRepositoryWrapper;
+ private final JobRegisterService jobRegisterService;
+ private final ScheduledJobDetailRepository scheduledJobDetailsRepository;
+
+ @Value("${node_id:1}")
+ private String nodeId;
@Autowired
public ScheduledJobRunnerServiceImpl(final RoutingDataSourceServiceFactory dataSourceServiceFactory,
@@ -83,7 +92,8 @@ public class ScheduledJobRunnerServiceImpl implements ScheduledJobRunnerService
final DepositAccountWritePlatformService depositAccountWritePlatformService,
final ShareAccountDividendReadPlatformService shareAccountDividendReadPlatformService,
final ShareAccountSchedularService shareAccountSchedularService,
- final TrialBalanceRepositoryWrapper trialBalanceRepositoryWrapper) {
+ final TrialBalanceRepositoryWrapper trialBalanceRepositoryWrapper, final JobRegisterService jobRegisterService,
+ final ScheduledJobDetailRepository scheduledJobDetailsRepository) {
this.dataSourceServiceFactory = dataSourceServiceFactory;
this.savingsAccountWritePlatformService = savingsAccountWritePlatformService;
this.savingsAccountChargeReadPlatformService = savingsAccountChargeReadPlatformService;
@@ -92,6 +102,8 @@ public class ScheduledJobRunnerServiceImpl implements ScheduledJobRunnerService
this.shareAccountDividendReadPlatformService = shareAccountDividendReadPlatformService;
this.shareAccountSchedularService = shareAccountSchedularService;
this.trialBalanceRepositoryWrapper = trialBalanceRepositoryWrapper;
+ this.jobRegisterService = jobRegisterService;
+ this.scheduledJobDetailsRepository = scheduledJobDetailsRepository;
}
@Transactional
@@ -482,4 +494,16 @@ public class ScheduledJobRunnerServiceImpl implements ScheduledJobRunnerService
}
+ @Override
+ @CronTarget(jobName = JobName.EXECUTE_DIRTY_JOBS)
+ public void executeMissMatchedJobs() throws JobExecutionException {
+ List<ScheduledJobDetail> jobDetails = this.scheduledJobDetailsRepository.findAllMismatchedJobs(true);
+
+ for (ScheduledJobDetail scheduledJobDetail : jobDetails) {
+ if (scheduledJobDetail.getNodeId().toString().equals(this.nodeId)) {
+ jobRegisterService.executeJob(scheduledJobDetail.getId());
+ }
+ }
+ }
+
}
diff --git a/fineract-provider/src/main/resources/sql/migrations/core_db/V373__node_aware_scheduler_jobs.sql b/fineract-provider/src/main/resources/sql/migrations/core_db/V373__node_aware_scheduler_jobs.sql
new file mode 100644
index 0000000..9f35521
--- /dev/null
+++ b/fineract-provider/src/main/resources/sql/migrations/core_db/V373__node_aware_scheduler_jobs.sql
@@ -0,0 +1,26 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements. See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership. The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing,
+-- software distributed under the License is distributed on an
+-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+-- KIND, either express or implied. See the License for the
+-- specific language governing permissions and limitations
+-- under the License.
+--
+
+ALTER TABLE job ADD COLUMN node_id int;
+
+UPDATE job SET node_id = 1;
+
+ALTER TABLE job ADD COLUMN is_mismatched_job TINYINT(1) DEFAULT 1;
+
+INSERT INTO `job` (`node_id`, `name`, `display_name`, `cron_expression`, `create_time`, `task_priority`, `group_name`, `previous_run_start_time`, `next_run_time`, `job_key`, `initializing_errorlog`, `is_active`, `currently_running`, `updates_allowed`, `scheduler_group`, `is_misfired`,`is_mismatched_job`) VALUES (0,'Execute All Dirty Jobs', 'Execute All Dirty Jobs', '0 1 0 1/1 * ? *', now(), 5, NULL, NULL, NULL, 'Execute All Dirty JobsJobDetail1 _ DEFAULT', NULL, 1, 0, 1, 0, 0,0);
diff --git a/kubernetes/fineract-server-deployment.yml b/kubernetes/fineract-server-deployment.yml
index b11d60a..1616318 100644
--- a/kubernetes/fineract-server-deployment.yml
+++ b/kubernetes/fineract-server-deployment.yml
@@ -87,6 +87,8 @@ spec:
value: jdbc
- name: SUB_PROTOCOL
value: mysql:thin
+ - name: node_id
+ value: 1
- name: fineract_tenants_driver
value: org.drizzle.jdbc.DrizzleDriver
- name: fineract_tenants_url