You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@fineract.apache.org by av...@apache.org on 2021/10/14 10:54:34 UTC

[fineract] branch develop updated: Feat: Node aware job scheduler (#1886)

This is an automated email from the ASF dual-hosted git repository.

avikg pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/fineract.git


The following commit(s) were added to refs/heads/develop by this push:
     new e4b270c  Feat: Node aware job scheduler (#1886)
e4b270c is described below

commit e4b270c0f5be571d7268a87a2cd612acd4ef84d8
Author: Benura Abeywardena <43...@users.noreply.github.com>
AuthorDate: Thu Oct 14 16:24:21 2021 +0530

    Feat: Node aware job scheduler (#1886)
---
 docker-compose.yml                                 |  1 +
 .../glaccount/domain/GLAccountRepository.java      |  3 ++-
 .../domain/GLAccountRepositoryWrapper.java         |  3 ++-
 .../journalentry/data/JournalEntryData.java        |  3 ++-
 .../constants/ChartOfAcountsConstants.java         |  4 +--
 .../chartofaccounts/ChartOfAccountsWorkbook.java   | 17 +++++++------
 .../jobs/domain/ScheduledJobDetail.java            | 18 ++++++++++++++
 .../jobs/domain/ScheduledJobDetailRepository.java  |  7 ++++++
 .../exception/JobNodeIdMismatchingException.java}  | 18 ++++++++------
 .../infrastructure/jobs/service/JobName.java       |  3 ++-
 .../jobs/service/JobRegisterServiceImpl.java       | 29 +++++++++++++++++++---
 .../service/SchedularWritePlatformService.java     |  2 +-
 ...dularWritePlatformServiceJpaRepositoryImpl.java |  4 +--
 .../portfolio/loanaccount/domain/Loan.java         |  3 ++-
 .../service/ScheduledJobRunnerService.java         |  2 ++
 .../service/ScheduledJobRunnerServiceImpl.java     | 26 ++++++++++++++++++-
 .../core_db/V373__node_aware_scheduler_jobs.sql    | 26 +++++++++++++++++++
 kubernetes/fineract-server-deployment.yml          |  2 ++
 18 files changed, 141 insertions(+), 30 deletions(-)

diff --git a/docker-compose.yml b/docker-compose.yml
index 3724c34..2513129 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -45,6 +45,7 @@ services:
     environment:
       - DRIVERCLASS_NAME=org.drizzle.jdbc.DrizzleDriver
       - PROTOCOL=jdbc
+      - node_id=1
       - SUB_PROTOCOL=mysql:thin
       - fineract_tenants_driver=org.drizzle.jdbc.DrizzleDriver
       - fineract_tenants_url=jdbc:mysql:thin://fineractmysql:3306/fineract_tenants
diff --git a/fineract-provider/src/main/java/org/apache/fineract/accounting/glaccount/domain/GLAccountRepository.java b/fineract-provider/src/main/java/org/apache/fineract/accounting/glaccount/domain/GLAccountRepository.java
index cf258bd..59dd985 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/accounting/glaccount/domain/GLAccountRepository.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/accounting/glaccount/domain/GLAccountRepository.java
@@ -23,7 +23,8 @@ import org.springframework.data.jpa.repository.JpaRepository;
 import org.springframework.data.jpa.repository.JpaSpecificationExecutor;
 
 public interface GLAccountRepository extends JpaRepository<GLAccount, Long>, JpaSpecificationExecutor<GLAccount> {
+
     // no added behaviour
-    //adding behaviour to fetch id by glcode for opening balance bulk import
+    // adding behaviour to fetch id by glcode for opening balance bulk import
     Optional<GLAccount> findOneByGlCode(String glCode);
 }
diff --git a/fineract-provider/src/main/java/org/apache/fineract/accounting/glaccount/domain/GLAccountRepositoryWrapper.java b/fineract-provider/src/main/java/org/apache/fineract/accounting/glaccount/domain/GLAccountRepositoryWrapper.java
index 1482c84..b66345e 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/accounting/glaccount/domain/GLAccountRepositoryWrapper.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/accounting/glaccount/domain/GLAccountRepositoryWrapper.java
@@ -40,7 +40,8 @@ public class GLAccountRepositoryWrapper {
     public GLAccount findOneWithNotFoundDetection(final Long id) {
         return this.repository.findById(id).orElseThrow(() -> new GLAccountNotFoundException(id));
     }
-    //finding account id by glcode for opening balance bulk import
+
+    // finding account id by glcode for opening balance bulk import
     public GLAccount findOneByGlCodeWithNotFoundDetection(final String glCode) {
         return this.repository.findOneByGlCode(glCode).orElseThrow(() -> new GLAccountNotFoundException(glCode));
     }
diff --git a/fineract-provider/src/main/java/org/apache/fineract/accounting/journalentry/data/JournalEntryData.java b/fineract-provider/src/main/java/org/apache/fineract/accounting/journalentry/data/JournalEntryData.java
index 22865ed..42cb2dd 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/accounting/journalentry/data/JournalEntryData.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/accounting/journalentry/data/JournalEntryData.java
@@ -90,7 +90,8 @@ public class JournalEntryData {
     private String routingCode;
     private String receiptNumber;
     private String bankNumber;
-    //for opening bal bulk import
+
+    // for opening bal bulk import
     public JournalEntryData(Long officeId, LocalDate transactionDate, String currencyCode, List<CreditDebit> credits,
             List<CreditDebit> debits, String locale, String dateFormat) {
         this.officeId = officeId;
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/bulkimport/constants/ChartOfAcountsConstants.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/bulkimport/constants/ChartOfAcountsConstants.java
index 72b5414..0dd414b 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/bulkimport/constants/ChartOfAcountsConstants.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/bulkimport/constants/ChartOfAcountsConstants.java
@@ -34,7 +34,7 @@ public final class ChartOfAcountsConstants {
     public static final int TAG_COL = 7;// H
     public static final int TAG_ID_COL = 8;// I
     public static final int DESCRIPTION_COL = 9;// J
-    //adding for opening balance bulk import
+    // adding for opening balance bulk import
     public static final int OFFICE_COL = 10; // K
     public static final int OFFICE_COL_ID = 11; // L
     public static final int CURRENCY_CODE = 12; // M
@@ -47,7 +47,7 @@ public final class ChartOfAcountsConstants {
     public static final int LOOKUP_TAG_COL = 21; // V
     public static final int LOOKUP_TAG_ID_COL = 22; // W
 
-    //adding for opening balance bulk import
+    // adding for opening balance bulk import
     public static final int LOOKUP_OFFICE_COL = 23; // X
     public static final int LOOKUP_OFFICE_ID_COL = 24; // Y
 
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/bulkimport/populator/chartofaccounts/ChartOfAccountsWorkbook.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/bulkimport/populator/chartofaccounts/ChartOfAccountsWorkbook.java
index 68a775d..5f96176 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/bulkimport/populator/chartofaccounts/ChartOfAccountsWorkbook.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/bulkimport/populator/chartofaccounts/ChartOfAccountsWorkbook.java
@@ -113,9 +113,12 @@ public class ChartOfAccountsWorkbook extends AbstractWorkbookPopulator {
         CellRangeAddressList tagRange = new CellRangeAddressList(1, SpreadsheetVersion.EXCEL97.getLastRowIndex(),
                 ChartOfAcountsConstants.TAG_COL, ChartOfAcountsConstants.TAG_COL);
         CellRangeAddressList officeNameRange = new CellRangeAddressList(1, SpreadsheetVersion.EXCEL97.getLastRowIndex(),
-                ChartOfAcountsConstants.OFFICE_COL, ChartOfAcountsConstants.OFFICE_COL); // validation for opening bal office column
+                ChartOfAcountsConstants.OFFICE_COL, ChartOfAcountsConstants.OFFICE_COL); // validation for opening bal
+                                                                                         // office column
         CellRangeAddressList currencyCodeRange = new CellRangeAddressList(1, SpreadsheetVersion.EXCEL97.getLastRowIndex(),
-                ChartOfAcountsConstants.CURRENCY_CODE, ChartOfAcountsConstants.CURRENCY_CODE);// validation for currency code for opening balance
+                ChartOfAcountsConstants.CURRENCY_CODE, ChartOfAcountsConstants.CURRENCY_CODE);// validation for currency
+                                                                                              // code for opening
+                                                                                              // balance
 
         DataValidationHelper validationHelper = new HSSFDataValidationHelper((HSSFSheet) chartOfAccountsSheet);
         setNames(chartOfAccountsSheet, accountTypesNoDuplicatesList, offices);
@@ -186,7 +189,7 @@ public class ChartOfAccountsWorkbook extends AbstractWorkbookPopulator {
                 writeFormula(ChartOfAcountsConstants.TAG_ID_COL, row,
                         "IF(ISERROR(VLOOKUP($H" + (rowNo + 1) + ",$V$2:$W$" + (glAccounts.size() + 1) + ",2,FALSE))," + "\"\",(VLOOKUP($H"
                                 + (rowNo + 1) + ",$V$2:$W$" + (glAccounts.size() + 1) + ",2,FALSE)))");
-                //auto populate office id for bulk import of opening balance
+                // auto populate office id for bulk import of opening balance
                 writeFormula(ChartOfAcountsConstants.OFFICE_COL_ID, row,
                         "IF(ISERROR(VLOOKUP($K" + (rowNo + 1) + ",$X$2:$Y$" + (offices.size() + 1) + ",2,FALSE)),\"\",(VLOOKUP($K"
                                 + (rowNo + 1) + ",$X$2:$Y$" + (offices.size() + 1) + ",2,FALSE)))");
@@ -236,7 +239,7 @@ public class ChartOfAccountsWorkbook extends AbstractWorkbookPopulator {
                 accountTypeIndex++;
             }
         }
-        //opening balance lookup table of offices
+        // opening balance lookup table of offices
         startIndex = 1;
         rowIndex = 1;
         for (OfficeData office : offices) {
@@ -283,7 +286,7 @@ public class ChartOfAccountsWorkbook extends AbstractWorkbookPopulator {
         chartOfAccountsSheet.setColumnWidth(ChartOfAcountsConstants.LOOKUP_ACCOUNT_ID_COL, TemplatePopulateImportConstants.MEDIUM_COL_SIZE);
         chartOfAccountsSheet.setColumnWidth(ChartOfAcountsConstants.LOOKUP_TAG_COL, TemplatePopulateImportConstants.SMALL_COL_SIZE);
         chartOfAccountsSheet.setColumnWidth(ChartOfAcountsConstants.LOOKUP_TAG_ID_COL, TemplatePopulateImportConstants.SMALL_COL_SIZE);
-        //adding lookup for opening balance bulk import
+        // adding lookup for opening balance bulk import
         chartOfAccountsSheet.setColumnWidth(ChartOfAcountsConstants.LOOKUP_OFFICE_COL, TemplatePopulateImportConstants.MEDIUM_COL_SIZE);
         chartOfAccountsSheet.setColumnWidth(ChartOfAcountsConstants.LOOKUP_OFFICE_ID_COL, TemplatePopulateImportConstants.SMALL_COL_SIZE);
 
@@ -297,7 +300,7 @@ public class ChartOfAccountsWorkbook extends AbstractWorkbookPopulator {
         writeString(ChartOfAcountsConstants.TAG_COL, rowHeader, "Tag");
         writeString(ChartOfAcountsConstants.TAG_ID_COL, rowHeader, "Tag Id");
         writeString(ChartOfAcountsConstants.DESCRIPTION_COL, rowHeader, "Description *");
-        //adding data for opening balance bulk import
+        // adding data for opening balance bulk import
         writeString(ChartOfAcountsConstants.OFFICE_COL, rowHeader, "Parent Office for Opening Balance");
         writeString(ChartOfAcountsConstants.OFFICE_COL_ID, rowHeader, "Parent Office Code Opening Balance");
         writeString(ChartOfAcountsConstants.CURRENCY_CODE, rowHeader, "Currency Code");
@@ -309,7 +312,7 @@ public class ChartOfAccountsWorkbook extends AbstractWorkbookPopulator {
         writeString(ChartOfAcountsConstants.LOOKUP_TAG_ID_COL, rowHeader, "Lookup Tag Id");
         writeString(ChartOfAcountsConstants.LOOKUP_ACCOUNT_NAME_COL, rowHeader, "Lookup Account name *");
         writeString(ChartOfAcountsConstants.LOOKUP_ACCOUNT_ID_COL, rowHeader, "Lookup Account Id");
-        //adding lookup for opening balance bulk import
+        // adding lookup for opening balance bulk import
         writeString(ChartOfAcountsConstants.LOOKUP_OFFICE_COL, rowHeader, "Lookup Office Name");
         writeString(ChartOfAcountsConstants.LOOKUP_OFFICE_ID_COL, rowHeader, "Lookup Office Id");
 
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/domain/ScheduledJobDetail.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/domain/ScheduledJobDetail.java
index 67a384c..f3f4462 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/domain/ScheduledJobDetail.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/domain/ScheduledJobDetail.java
@@ -41,6 +41,12 @@ public class ScheduledJobDetail extends AbstractPersistableCustom {
     @Column(name = "display_name")
     private String jobDisplayName;
 
+    @Column(name = "node_id")
+    private Integer nodeId;
+
+    @Column(name = "is_mismatched_job")
+    private boolean isMismatchedJob;
+
     @Column(name = "cron_expression")
     private String cronExpression;
 
@@ -135,6 +141,14 @@ public class ScheduledJobDetail extends AbstractPersistableCustom {
         this.jobKey = jobKey;
     }
 
+    public boolean getIsMismatchedJob() {
+        return this.isMismatchedJob;
+    }
+
+    public void setIsMismatchedJob(final boolean isMismatchedJob) {
+        this.isMismatchedJob = isMismatchedJob;
+    }
+
     public void updateErrorLog(final String errorLog) {
         this.errorLog = errorLog;
     }
@@ -147,6 +161,10 @@ public class ScheduledJobDetail extends AbstractPersistableCustom {
         this.currentlyRunning = currentlyRunning;
     }
 
+    public Integer getNodeId() {
+        return this.nodeId;
+    }
+
     public Map<String, Object> update(final JsonCommand command) {
         final Map<String, Object> actualChanges = new LinkedHashMap<>(9);
 
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/domain/ScheduledJobDetailRepository.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/domain/ScheduledJobDetailRepository.java
index 216a362..c22cc17 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/domain/ScheduledJobDetailRepository.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/domain/ScheduledJobDetailRepository.java
@@ -18,6 +18,7 @@
  */
 package org.apache.fineract.infrastructure.jobs.domain;
 
+import java.util.List;
 import javax.persistence.LockModeType;
 import org.springframework.data.jpa.repository.JpaRepository;
 import org.springframework.data.jpa.repository.JpaSpecificationExecutor;
@@ -38,4 +39,10 @@ public interface ScheduledJobDetailRepository
     @Query("select jobDetail from ScheduledJobDetail jobDetail where jobDetail.jobKey = :jobKey")
     ScheduledJobDetail findByJobKeyWithLock(@Param("jobKey") String jobKey);
 
+    @Query("select jobDetail from ScheduledJobDetail jobDetail where jobDetail.isMismatchedJob = :isMismatchedJob")
+    List<ScheduledJobDetail> findAllMismatchedJobs(@Param("isMismatchedJob") boolean isMismatchedJob);
+
+    @Query("select jobDetail from ScheduledJobDetail jobDetail where jobDetail.nodeId = :nodeId or jobDetail.nodeId = 0")
+    List<ScheduledJobDetail> findAllJobs(@Param("nodeId") Integer nodeId);
+
 }
diff --git a/fineract-provider/src/main/java/org/apache/fineract/accounting/glaccount/domain/GLAccountRepository.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/exception/JobNodeIdMismatchingException.java
similarity index 59%
copy from fineract-provider/src/main/java/org/apache/fineract/accounting/glaccount/domain/GLAccountRepository.java
copy to fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/exception/JobNodeIdMismatchingException.java
index cf258bd..1bb5c6d 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/accounting/glaccount/domain/GLAccountRepository.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/exception/JobNodeIdMismatchingException.java
@@ -16,14 +16,16 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.fineract.accounting.glaccount.domain;
 
-import java.util.Optional;
-import org.springframework.data.jpa.repository.JpaRepository;
-import org.springframework.data.jpa.repository.JpaSpecificationExecutor;
+package org.apache.fineract.infrastructure.jobs.exception;
+
+import org.apache.fineract.infrastructure.core.exception.AbstractPlatformDomainRuleException;
+
+public class JobNodeIdMismatchingException extends AbstractPlatformDomainRuleException {
+
+    public JobNodeIdMismatchingException(final String nodeId, final String nodeIdProvided) {
+        super("error.msg.job.cannot.execute.on.node." + nodeIdProvided,
+                "The node id provided `" + nodeIdProvided + "`" + "` does not match with the configured nodeId.", new Object[] { nodeId });
+    }
 
-public interface GLAccountRepository extends JpaRepository<GLAccount, Long>, JpaSpecificationExecutor<GLAccount> {
-    // no added behaviour
-    //adding behaviour to fetch id by glcode for opening balance bulk import
-    Optional<GLAccount> findOneByGlCode(String glCode);
 }
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/service/JobName.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/service/JobName.java
index 7417cc6..2bbc911 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/service/JobName.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/service/JobName.java
@@ -46,7 +46,8 @@ public enum JobName {
                                                                                                                                                                                             "Generate AdhocClient Schedule"), UPDATE_EMAIL_OUTBOUND_WITH_CAMPAIGN_MESSAGE(
                                                                                                                                                                                                     "Update Email Outbound with campaign message"), EXECUTE_EMAIL(
                                                                                                                                                                                                             "Execute Email"), UPDATE_TRAIL_BALANCE_DETAILS(
-                                                                                                                                                                                                                    "Update Trial Balance Details");
+                                                                                                                                                                                                                    "Update Trial Balance Details"), EXECUTE_DIRTY_JOBS(
+                                                                                                                                                                                                                            "Execute All Dirty Jobs");
 
     private final String name;
 
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/service/JobRegisterServiceImpl.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/service/JobRegisterServiceImpl.java
index 3d07344..6bb2061 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/service/JobRegisterServiceImpl.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/service/JobRegisterServiceImpl.java
@@ -37,6 +37,7 @@ import org.apache.fineract.infrastructure.jobs.domain.JobParameter;
 import org.apache.fineract.infrastructure.jobs.domain.JobParameterRepository;
 import org.apache.fineract.infrastructure.jobs.domain.ScheduledJobDetail;
 import org.apache.fineract.infrastructure.jobs.domain.SchedulerDetail;
+import org.apache.fineract.infrastructure.jobs.exception.JobNodeIdMismatchingException;
 import org.apache.fineract.infrastructure.jobs.exception.JobNotFoundException;
 import org.apache.fineract.infrastructure.security.service.TenantDetailsService;
 import org.quartz.JobDataMap;
@@ -50,6 +51,7 @@ import org.quartz.TriggerListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.context.ApplicationContext;
 import org.springframework.context.ApplicationListener;
 import org.springframework.context.event.ContextClosedEvent;
@@ -118,12 +120,15 @@ public class JobRegisterServiceImpl implements JobRegisterService, ApplicationLi
         this.jobParameterRepository = jobParameterRepository;
     }
 
+    @Value("${node_id:1}")
+    private String nodeId;
+
     @PostConstruct
     public void loadAllJobs() {
         final List<FineractPlatformTenant> allTenants = this.tenantDetailsService.findAllTenants();
         for (final FineractPlatformTenant tenant : allTenants) {
             ThreadLocalContextUtil.setTenant(tenant);
-            final List<ScheduledJobDetail> scheduledJobDetails = this.schedularWritePlatformService.retrieveAllJobs();
+            final List<ScheduledJobDetail> scheduledJobDetails = this.schedularWritePlatformService.retrieveAllJobs(nodeId);
             for (final ScheduledJobDetail jobDetails : scheduledJobDetails) {
                 scheduleJob(jobDetails);
                 jobDetails.updateTriggerMisfired(false);
@@ -203,11 +208,12 @@ public class JobRegisterServiceImpl implements JobRegisterService, ApplicationLi
             schedulerDetail.updateSuspendedState(false);
             this.schedularWritePlatformService.updateSchedulerDetail(schedulerDetail);
             if (schedulerDetail.isExecuteInstructionForMisfiredJobs()) {
-                final List<ScheduledJobDetail> scheduledJobDetails = this.schedularWritePlatformService.retrieveAllJobs();
+                final List<ScheduledJobDetail> scheduledJobDetails = this.schedularWritePlatformService.retrieveAllJobs(this.nodeId);
                 for (final ScheduledJobDetail jobDetail : scheduledJobDetails) {
                     if (jobDetail.isTriggerMisfired()) {
                         if (jobDetail.isActiveSchedular()) {
                             executeJob(jobDetail, SchedulerServiceConstants.TRIGGER_TYPE_CRON);
+                            jobDetail.setIsMismatchedJob(false);
                         }
                         final String schedulerName = getSchedulerName(jobDetail);
                         final Scheduler scheduler = this.schedulers.get(schedulerName);
@@ -236,7 +242,14 @@ public class JobRegisterServiceImpl implements JobRegisterService, ApplicationLi
     @Override
     public void rescheduleJob(final Long jobId) {
         final ScheduledJobDetail scheduledJobDetail = this.schedularWritePlatformService.findByJobId(jobId);
-        rescheduleJob(scheduledJobDetail);
+        final String nodeIdStored = scheduledJobDetail.getNodeId().toString();
+        if (nodeIdStored.equals(this.nodeId) || nodeIdStored.equals("0")) {
+            rescheduleJob(scheduledJobDetail);
+        } else {
+            scheduledJobDetail.setIsMismatchedJob(true);
+            this.schedularWritePlatformService.saveOrUpdate(scheduledJobDetail);
+            throw new JobNodeIdMismatchingException(nodeIdStored, this.nodeId);
+        }
     }
 
     @Override
@@ -245,7 +258,15 @@ public class JobRegisterServiceImpl implements JobRegisterService, ApplicationLi
         if (scheduledJobDetail == null) {
             throw new JobNotFoundException(String.valueOf(jobId));
         }
-        executeJob(scheduledJobDetail, null);
+        final String nodeIdStored = scheduledJobDetail.getNodeId().toString();
+
+        if (nodeIdStored.equals(this.nodeId) || nodeIdStored.equals("0")) {
+            executeJob(scheduledJobDetail, null);
+        } else {
+            scheduledJobDetail.setIsMismatchedJob(true);
+            this.schedularWritePlatformService.saveOrUpdate(scheduledJobDetail);
+            throw new JobNodeIdMismatchingException(nodeIdStored, this.nodeId);
+        }
     }
 
     @Override
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/service/SchedularWritePlatformService.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/service/SchedularWritePlatformService.java
index 9924303..388b778 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/service/SchedularWritePlatformService.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/service/SchedularWritePlatformService.java
@@ -27,7 +27,7 @@ import org.apache.fineract.infrastructure.jobs.domain.SchedulerDetail;
 
 public interface SchedularWritePlatformService {
 
-    List<ScheduledJobDetail> retrieveAllJobs();
+    List<ScheduledJobDetail> retrieveAllJobs(String nodeId);
 
     ScheduledJobDetail findByJobKey(String triggerKey);
 
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/service/SchedularWritePlatformServiceJpaRepositoryImpl.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/service/SchedularWritePlatformServiceJpaRepositoryImpl.java
index 0c99d34..51bbf5a 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/service/SchedularWritePlatformServiceJpaRepositoryImpl.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/service/SchedularWritePlatformServiceJpaRepositoryImpl.java
@@ -58,8 +58,8 @@ public class SchedularWritePlatformServiceJpaRepositoryImpl implements Schedular
     }
 
     @Override
-    public List<ScheduledJobDetail> retrieveAllJobs() {
-        return this.scheduledJobDetailsRepository.findAll();
+    public List<ScheduledJobDetail> retrieveAllJobs(final String nodeId) {
+        return this.scheduledJobDetailsRepository.findAllJobs(Integer.parseInt(nodeId));
     }
 
     @Override
diff --git a/fineract-provider/src/main/java/org/apache/fineract/portfolio/loanaccount/domain/Loan.java b/fineract-provider/src/main/java/org/apache/fineract/portfolio/loanaccount/domain/Loan.java
index 9f93db5..b0c5625 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/portfolio/loanaccount/domain/Loan.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/portfolio/loanaccount/domain/Loan.java
@@ -1647,7 +1647,8 @@ public class Loan extends AbstractPersistableCustom {
                 this.fixedPrincipalPercentagePerInstallment)) {
             this.fixedPrincipalPercentagePerInstallment = command
                     .bigDecimalValueOfParameterNamed(LoanApiConstants.fixedPrincipalPercentagePerInstallmentParamName);
-            actualChanges.put(LoanApiConstants.fixedPrincipalPercentagePerInstallmentParamName, this.fixedPrincipalPercentagePerInstallment);
+            actualChanges.put(LoanApiConstants.fixedPrincipalPercentagePerInstallmentParamName,
+                    this.fixedPrincipalPercentagePerInstallment);
         }
 
         return actualChanges;
diff --git a/fineract-provider/src/main/java/org/apache/fineract/scheduledjobs/service/ScheduledJobRunnerService.java b/fineract-provider/src/main/java/org/apache/fineract/scheduledjobs/service/ScheduledJobRunnerService.java
index 3c63230..e0f0cc5 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/scheduledjobs/service/ScheduledJobRunnerService.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/scheduledjobs/service/ScheduledJobRunnerService.java
@@ -39,4 +39,6 @@ public interface ScheduledJobRunnerService {
     void postDividends() throws JobExecutionException;
 
     void updateTrialBalanceDetails() throws JobExecutionException;
+
+    void executeMissMatchedJobs() throws JobExecutionException;
 }
diff --git a/fineract-provider/src/main/java/org/apache/fineract/scheduledjobs/service/ScheduledJobRunnerServiceImpl.java b/fineract-provider/src/main/java/org/apache/fineract/scheduledjobs/service/ScheduledJobRunnerServiceImpl.java
index 4dfe186..a9260c7 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/scheduledjobs/service/ScheduledJobRunnerServiceImpl.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/scheduledjobs/service/ScheduledJobRunnerServiceImpl.java
@@ -38,8 +38,11 @@ import org.apache.fineract.infrastructure.core.service.DateUtils;
 import org.apache.fineract.infrastructure.core.service.RoutingDataSourceServiceFactory;
 import org.apache.fineract.infrastructure.core.service.ThreadLocalContextUtil;
 import org.apache.fineract.infrastructure.jobs.annotation.CronTarget;
+import org.apache.fineract.infrastructure.jobs.domain.ScheduledJobDetail;
+import org.apache.fineract.infrastructure.jobs.domain.ScheduledJobDetailRepository;
 import org.apache.fineract.infrastructure.jobs.exception.JobExecutionException;
 import org.apache.fineract.infrastructure.jobs.service.JobName;
+import org.apache.fineract.infrastructure.jobs.service.JobRegisterService;
 import org.apache.fineract.portfolio.savings.DepositAccountType;
 import org.apache.fineract.portfolio.savings.DepositAccountUtils;
 import org.apache.fineract.portfolio.savings.data.DepositAccountData;
@@ -53,6 +56,7 @@ import org.apache.fineract.portfolio.shareaccounts.service.ShareAccountSchedular
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.jdbc.core.JdbcTemplate;
 import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Transactional;
@@ -74,6 +78,11 @@ public class ScheduledJobRunnerServiceImpl implements ScheduledJobRunnerService
     private final ShareAccountDividendReadPlatformService shareAccountDividendReadPlatformService;
     private final ShareAccountSchedularService shareAccountSchedularService;
     private final TrialBalanceRepositoryWrapper trialBalanceRepositoryWrapper;
+    private final JobRegisterService jobRegisterService;
+    private final ScheduledJobDetailRepository scheduledJobDetailsRepository;
+
+    @Value("${node_id:1}")
+    private String nodeId;
 
     @Autowired
     public ScheduledJobRunnerServiceImpl(final RoutingDataSourceServiceFactory dataSourceServiceFactory,
@@ -83,7 +92,8 @@ public class ScheduledJobRunnerServiceImpl implements ScheduledJobRunnerService
             final DepositAccountWritePlatformService depositAccountWritePlatformService,
             final ShareAccountDividendReadPlatformService shareAccountDividendReadPlatformService,
             final ShareAccountSchedularService shareAccountSchedularService,
-            final TrialBalanceRepositoryWrapper trialBalanceRepositoryWrapper) {
+            final TrialBalanceRepositoryWrapper trialBalanceRepositoryWrapper, final JobRegisterService jobRegisterService,
+            final ScheduledJobDetailRepository scheduledJobDetailsRepository) {
         this.dataSourceServiceFactory = dataSourceServiceFactory;
         this.savingsAccountWritePlatformService = savingsAccountWritePlatformService;
         this.savingsAccountChargeReadPlatformService = savingsAccountChargeReadPlatformService;
@@ -92,6 +102,8 @@ public class ScheduledJobRunnerServiceImpl implements ScheduledJobRunnerService
         this.shareAccountDividendReadPlatformService = shareAccountDividendReadPlatformService;
         this.shareAccountSchedularService = shareAccountSchedularService;
         this.trialBalanceRepositoryWrapper = trialBalanceRepositoryWrapper;
+        this.jobRegisterService = jobRegisterService;
+        this.scheduledJobDetailsRepository = scheduledJobDetailsRepository;
     }
 
     @Transactional
@@ -482,4 +494,16 @@ public class ScheduledJobRunnerServiceImpl implements ScheduledJobRunnerService
 
     }
 
+    @Override
+    @CronTarget(jobName = JobName.EXECUTE_DIRTY_JOBS)
+    public void executeMissMatchedJobs() throws JobExecutionException {
+        List<ScheduledJobDetail> jobDetails = this.scheduledJobDetailsRepository.findAllMismatchedJobs(true);
+
+        for (ScheduledJobDetail scheduledJobDetail : jobDetails) {
+            if (scheduledJobDetail.getNodeId().toString().equals(this.nodeId)) {
+                jobRegisterService.executeJob(scheduledJobDetail.getId());
+            }
+        }
+    }
+
 }
diff --git a/fineract-provider/src/main/resources/sql/migrations/core_db/V373__node_aware_scheduler_jobs.sql b/fineract-provider/src/main/resources/sql/migrations/core_db/V373__node_aware_scheduler_jobs.sql
new file mode 100644
index 0000000..9f35521
--- /dev/null
+++ b/fineract-provider/src/main/resources/sql/migrations/core_db/V373__node_aware_scheduler_jobs.sql
@@ -0,0 +1,26 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements. See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership. The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing,
+-- software distributed under the License is distributed on an
+-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+-- KIND, either express or implied. See the License for the
+-- specific language governing permissions and limitations
+-- under the License.
+--
+
+ALTER TABLE job ADD COLUMN node_id int;
+
+UPDATE job SET node_id = 1;
+
+ALTER TABLE job ADD COLUMN is_mismatched_job TINYINT(1) DEFAULT 1;
+
+INSERT INTO `job` (`node_id`, `name`, `display_name`, `cron_expression`, `create_time`, `task_priority`, `group_name`, `previous_run_start_time`, `next_run_time`, `job_key`, `initializing_errorlog`, `is_active`, `currently_running`, `updates_allowed`, `scheduler_group`, `is_misfired`,`is_mismatched_job`) VALUES (0,'Execute All Dirty Jobs', 'Execute All Dirty Jobs', '0 1 0 1/1 * ? *', now(), 5, NULL, NULL, NULL, 'Execute All Dirty JobsJobDetail1 _ DEFAULT', NULL, 1, 0, 1, 0, 0,0);
diff --git a/kubernetes/fineract-server-deployment.yml b/kubernetes/fineract-server-deployment.yml
index b11d60a..1616318 100644
--- a/kubernetes/fineract-server-deployment.yml
+++ b/kubernetes/fineract-server-deployment.yml
@@ -87,6 +87,8 @@ spec:
           value: jdbc
         - name: SUB_PROTOCOL
           value: mysql:thin
+        - name: node_id
+          value: 1
         - name: fineract_tenants_driver
           value: org.drizzle.jdbc.DrizzleDriver
         - name: fineract_tenants_url