You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@fineract.apache.org by ar...@apache.org on 2023/04/24 12:15:24 UTC

[fineract] branch develop updated: FINERACT-1724-Regular-loan-cob-job-having-catch-up-true-catchup-api

This is an automated email from the ASF dual-hosted git repository.

arnold pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/fineract.git


The following commit(s) were added to refs/heads/develop by this push:
     new d2082fe2c FINERACT-1724-Regular-loan-cob-job-having-catch-up-true-catchup-api
d2082fe2c is described below

commit d2082fe2cbf351976de7649078b53945e0e3cc36
Author: Ruchi Dhamankar <ru...@gmail.com>
AuthorDate: Fri Apr 14 13:24:19 2023 +0530

    FINERACT-1724-Regular-loan-cob-job-having-catch-up-true-catchup-api
---
 .../InlineLoanCOBBuildExecutionContextTasklet.java | 15 +++-
 .../apache/fineract/cob/loan/LoanCOBConstant.java  |  2 +
 .../service/AsyncLoanCOBExecutorServiceImpl.java   | 15 ++--
 .../service/InlineLoanCOBExecutorServiceImpl.java  | 17 +++--
 .../cob/service/LoanCOBCatchUpServiceImpl.java     |  4 +-
 .../database/DatabaseSpecificSQLGenerator.java     | 11 +++
 .../jobs/domain/CustomJobParameter.java            |  2 +-
 .../jobs/domain/CustomJobParameterRepository.java  | 11 ++-
 .../domain/CustomJobParameterRepositoryImpl.java   | 82 ++++++++++++++++++++++
 .../jobs/domain/JobExecutionRepository.java        | 43 +++++++++++-
 .../LoanCOBJobParameterProvider.java               | 20 +-----
 .../db/changelog/tenant/changelog-tenant.xml       |  1 +
 ...parameter_json_column_custom_job_parameters.xml | 54 ++++++++++++++
 13 files changed, 234 insertions(+), 43 deletions(-)

diff --git a/fineract-provider/src/main/java/org/apache/fineract/cob/loan/InlineLoanCOBBuildExecutionContextTasklet.java b/fineract-provider/src/main/java/org/apache/fineract/cob/loan/InlineLoanCOBBuildExecutionContextTasklet.java
index 3eb01ef41..79868b76c 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/cob/loan/InlineLoanCOBBuildExecutionContextTasklet.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/cob/loan/InlineLoanCOBBuildExecutionContextTasklet.java
@@ -24,16 +24,19 @@ import java.time.LocalDate;
 import java.time.format.DateTimeFormatter;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.fineract.cob.COBBusinessStepService;
 import org.apache.fineract.cob.data.BusinessStepNameAndOrder;
+import org.apache.fineract.cob.exceptions.CustomJobParameterNotFoundException;
 import org.apache.fineract.infrastructure.businessdate.domain.BusinessDateType;
 import org.apache.fineract.infrastructure.core.domain.ActionContext;
 import org.apache.fineract.infrastructure.core.serialization.GoogleGsonSerializerHelper;
 import org.apache.fineract.infrastructure.core.service.ThreadLocalContextUtil;
+import org.apache.fineract.infrastructure.jobs.data.JobParameterDTO;
 import org.apache.fineract.infrastructure.jobs.domain.CustomJobParameter;
 import org.apache.fineract.infrastructure.jobs.domain.CustomJobParameterRepository;
 import org.apache.fineract.infrastructure.springbatch.SpringBatchJobConstants;
@@ -76,7 +79,11 @@ public class InlineLoanCOBBuildExecutionContextTasklet implements Tasklet {
         CustomJobParameter customJobParameter = customJobParameterRepository.findById(customJobParameterId)
                 .orElseThrow(() -> new LoanNotFoundException(customJobParameterId));
         String parameterJson = customJobParameter.getParameterJson();
-        return gson.fromJson(parameterJson, new TypeToken<String>() {}.getType());
+        Set<JobParameterDTO> jobParameters = gson.fromJson(parameterJson, new TypeToken<HashSet<JobParameterDTO>>() {}.getType());
+        JobParameterDTO businessDateParameter = jobParameters.stream()
+                .filter(jobParameterDTO -> jobParameterDTO.getParameterName().equals(LoanCOBConstant.BUSINESS_DATE_PARAMETER_NAME))
+                .findFirst().orElseThrow(() -> new CustomJobParameterNotFoundException(LoanCOBConstant.BUSINESS_DATE_PARAMETER_NAME));
+        return businessDateParameter.getParameterValue();
     }
 
     private List<Long> getLoanIdsFromJobParameters(ChunkContext chunkContext) {
@@ -85,6 +92,10 @@ public class InlineLoanCOBBuildExecutionContextTasklet implements Tasklet {
         CustomJobParameter customJobParameter = customJobParameterRepository.findById(customJobParameterId)
                 .orElseThrow(() -> new LoanNotFoundException(customJobParameterId));
         String parameterJson = customJobParameter.getParameterJson();
-        return gson.fromJson(parameterJson, new TypeToken<ArrayList<Long>>() {}.getType());
+        Set<JobParameterDTO> jobParameters = gson.fromJson(parameterJson, new TypeToken<HashSet<JobParameterDTO>>() {}.getType());
+        JobParameterDTO loanIdsParameter = jobParameters.stream()
+                .filter(jobParameterDTO -> jobParameterDTO.getParameterName().equals(LoanCOBConstant.LOAN_IDS_PARAMETER_NAME)).findFirst()
+                .orElseThrow(() -> new CustomJobParameterNotFoundException(LoanCOBConstant.LOAN_IDS_PARAMETER_NAME));
+        return gson.fromJson(loanIdsParameter.getParameterValue(), new TypeToken<ArrayList<Long>>() {}.getType());
     }
 }
diff --git a/fineract-provider/src/main/java/org/apache/fineract/cob/loan/LoanCOBConstant.java b/fineract-provider/src/main/java/org/apache/fineract/cob/loan/LoanCOBConstant.java
index af53bab8d..97f09d7bb 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/cob/loan/LoanCOBConstant.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/cob/loan/LoanCOBConstant.java
@@ -30,8 +30,10 @@ public final class LoanCOBConstant {
     public static final String INLINE_LOAN_COB_JOB_NAME = "INLINE_LOAN_COB";
     public static final String BUSINESS_DATE_PARAMETER_NAME = "BusinessDate";
     public static final String IS_CATCH_UP_PARAMETER_NAME = "IS_CATCH_UP";
+    public static final String LOAN_IDS_PARAMETER_NAME = "LoanIds";
 
     public static final String LOAN_COB_PARTITIONER_STEP = "Loan COB partition - Step";
+    public static final String LOAN_COB_CUSTOM_JOB_PARAMETER_KEY = "CUSTOM_JOB_PARAMETER_ID";
 
     public static final Long NUMBER_OF_DAYS_BEHIND = 1L;
 
diff --git a/fineract-provider/src/main/java/org/apache/fineract/cob/service/AsyncLoanCOBExecutorServiceImpl.java b/fineract-provider/src/main/java/org/apache/fineract/cob/service/AsyncLoanCOBExecutorServiceImpl.java
index 871ea2e39..72d09a774 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/cob/service/AsyncLoanCOBExecutorServiceImpl.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/cob/service/AsyncLoanCOBExecutorServiceImpl.java
@@ -21,6 +21,7 @@ package org.apache.fineract.cob.service;
 import java.time.LocalDate;
 import java.time.format.DateTimeFormatter;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import lombok.RequiredArgsConstructor;
@@ -31,7 +32,6 @@ import org.apache.fineract.infrastructure.businessdate.domain.BusinessDateType;
 import org.apache.fineract.infrastructure.core.domain.FineractContext;
 import org.apache.fineract.infrastructure.core.service.ThreadLocalContextUtil;
 import org.apache.fineract.infrastructure.jobs.data.JobParameterDTO;
-import org.apache.fineract.infrastructure.jobs.domain.JobParameter;
 import org.apache.fineract.infrastructure.jobs.domain.JobParameterRepository;
 import org.apache.fineract.infrastructure.jobs.domain.ScheduledJobDetail;
 import org.apache.fineract.infrastructure.jobs.domain.ScheduledJobDetailRepository;
@@ -92,18 +92,11 @@ public class AsyncLoanCOBExecutorServiceImpl implements AsyncLoanCOBExecutorServ
         while (!executingBusinessDate.isAfter(cobBusinessDate)) {
             JobParameterDTO jobParameterDTO = new JobParameterDTO(LoanCOBConstant.BUSINESS_DATE_PARAMETER_NAME,
                     executingBusinessDate.format(DateTimeFormatter.ISO_DATE));
-            Set<JobParameterDTO> jobParameters = Collections.singleton(jobParameterDTO);
-            saveCatchUpJobParameter(scheduledJobDetail);
+            JobParameterDTO jobParameterCatchUpDTO = new JobParameterDTO(LoanCOBConstant.IS_CATCH_UP_PARAMETER_NAME, "true");
+            Set<JobParameterDTO> jobParameters = new HashSet<>();
+            Collections.addAll(jobParameters, jobParameterDTO, jobParameterCatchUpDTO);
             jobStarter.run(job, scheduledJobDetail, jobParameters);
             executingBusinessDate = executingBusinessDate.plusDays(1);
         }
     }
-
-    private void saveCatchUpJobParameter(ScheduledJobDetail scheduledJobDetail) {
-        JobParameter jobParameter = new JobParameter();
-        jobParameter.setJobId(scheduledJobDetail.getId());
-        jobParameter.setParameterName(LoanCOBConstant.IS_CATCH_UP_PARAMETER_NAME);
-        jobParameter.setParameterValue("true");
-        jobParameterRepository.save(jobParameter);
-    }
 }
diff --git a/fineract-provider/src/main/java/org/apache/fineract/cob/service/InlineLoanCOBExecutorServiceImpl.java b/fineract-provider/src/main/java/org/apache/fineract/cob/service/InlineLoanCOBExecutorServiceImpl.java
index d378133fe..7576f603f 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/cob/service/InlineLoanCOBExecutorServiceImpl.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/cob/service/InlineLoanCOBExecutorServiceImpl.java
@@ -25,11 +25,13 @@ import com.google.gson.Gson;
 import java.time.LocalDate;
 import java.time.format.DateTimeFormatter;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
@@ -49,7 +51,7 @@ import org.apache.fineract.infrastructure.core.exception.PlatformInternalServerE
 import org.apache.fineract.infrastructure.core.exception.PlatformRequestBodyItemLimitValidationException;
 import org.apache.fineract.infrastructure.core.serialization.GoogleGsonSerializerHelper;
 import org.apache.fineract.infrastructure.core.service.ThreadLocalContextUtil;
-import org.apache.fineract.infrastructure.jobs.domain.CustomJobParameter;
+import org.apache.fineract.infrastructure.jobs.data.JobParameterDTO;
 import org.apache.fineract.infrastructure.jobs.domain.CustomJobParameterRepository;
 import org.apache.fineract.infrastructure.jobs.exception.JobNotFoundException;
 import org.apache.fineract.infrastructure.jobs.service.InlineExecutorService;
@@ -195,12 +197,13 @@ public class InlineLoanCOBExecutorServiceImpl implements InlineExecutorService<L
     private Map<String, JobParameter> getJobParametersMap(List<Long> loanIds, LocalDate businessDate) {
         // TODO: refactor for a more generic solution
         String parameterJson = gson.toJson(loanIds);
-        CustomJobParameter loanIdsJobParameter = new CustomJobParameter();
-        loanIdsJobParameter.setParameterJson(parameterJson);
-        Long loanIdsJobParameterId = customJobParameterRepository.saveAndFlush(loanIdsJobParameter).getId();
-        CustomJobParameter businessDateJobParameter = new CustomJobParameter();
-        businessDateJobParameter.setParameterJson(gson.toJson(businessDate.format(DateTimeFormatter.ISO_DATE)));
-        Long businessDateJobParameterId = customJobParameterRepository.saveAndFlush(businessDateJobParameter).getId();
+        JobParameterDTO loanIdsParameterDTO = new JobParameterDTO(LoanCOBConstant.LOAN_IDS_PARAMETER_NAME, parameterJson);
+        Set<JobParameterDTO> loanIdJobParameter = Collections.singleton(loanIdsParameterDTO);
+        Long loanIdsJobParameterId = customJobParameterRepository.save(loanIdJobParameter);
+        JobParameterDTO businessDateParameterDTO = new JobParameterDTO(LoanCOBConstant.BUSINESS_DATE_PARAMETER_NAME,
+                businessDate.format(DateTimeFormatter.ISO_DATE));
+        Set<JobParameterDTO> businessDateJobParameter = Collections.singleton(businessDateParameterDTO);
+        Long businessDateJobParameterId = customJobParameterRepository.save(businessDateJobParameter);
         Map<String, JobParameter> jobParameterMap = new HashMap<>();
         jobParameterMap.put(SpringBatchJobConstants.CUSTOM_JOB_PARAMETER_ID_KEY, new JobParameter(loanIdsJobParameterId));
         jobParameterMap.put(LoanCOBConstant.BUSINESS_DATE_PARAMETER_NAME, new JobParameter(businessDateJobParameterId));
diff --git a/fineract-provider/src/main/java/org/apache/fineract/cob/service/LoanCOBCatchUpServiceImpl.java b/fineract-provider/src/main/java/org/apache/fineract/cob/service/LoanCOBCatchUpServiceImpl.java
index e238abb4d..e5c9231de 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/cob/service/LoanCOBCatchUpServiceImpl.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/cob/service/LoanCOBCatchUpServiceImpl.java
@@ -66,8 +66,8 @@ public class LoanCOBCatchUpServiceImpl implements LoanCOBCatchUpService {
 
     @Override
     public IsCatchUpRunningDTO isCatchUpRunning() {
-        List<Long> runningCatchUpExecutionIds = jobExecutionRepository.getRunningJobsByExecutionParameter(LoanCOBConstant.JOB_NAME,
-                LoanCOBConstant.IS_CATCH_UP_PARAMETER_NAME, "true");
+        List<Long> runningCatchUpExecutionIds = jobExecutionRepository.getRunningJobsIdsByExecutionParameter(LoanCOBConstant.JOB_NAME,
+                LoanCOBConstant.LOAN_COB_CUSTOM_JOB_PARAMETER_KEY, LoanCOBConstant.IS_CATCH_UP_PARAMETER_NAME, "true");
         if (CollectionUtils.isNotEmpty(runningCatchUpExecutionIds)) {
             JobExecution jobExecution = jobExplorer.getJobExecution(runningCatchUpExecutionIds.get(0));
             String executionDateString = (String) jobExecution.getExecutionContext().get(LoanCOBConstant.BUSINESS_DATE_PARAMETER_NAME);
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/core/service/database/DatabaseSpecificSQLGenerator.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/core/service/database/DatabaseSpecificSQLGenerator.java
index c1936aff6..57d511577 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/core/service/database/DatabaseSpecificSQLGenerator.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/core/service/database/DatabaseSpecificSQLGenerator.java
@@ -28,6 +28,7 @@ import org.springframework.stereotype.Component;
 public class DatabaseSpecificSQLGenerator {
 
     private final DatabaseTypeResolver databaseTypeResolver;
+    public static final String SELECT_CLAUSE = "SELECT %s";
 
     @Autowired
     public DatabaseSpecificSQLGenerator(DatabaseTypeResolver databaseTypeResolver) {
@@ -168,4 +169,14 @@ public class DatabaseSpecificSQLGenerator {
             throw new IllegalStateException("Database type is not supported for current schema " + databaseTypeResolver.databaseType());
         }
     }
+
+    public String castJson(String sql) {
+        if (databaseTypeResolver.isMySQL()) {
+            return format("%s", sql);
+        } else if (databaseTypeResolver.isPostgreSQL()) {
+            return format("%s ::json", sql);
+        } else {
+            throw new IllegalStateException("Database type is not supported for casting to json " + databaseTypeResolver.databaseType());
+        }
+    }
 }
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/domain/CustomJobParameter.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/domain/CustomJobParameter.java
index 11a8fd9dd..2292763f9 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/domain/CustomJobParameter.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/domain/CustomJobParameter.java
@@ -33,6 +33,6 @@ import org.apache.fineract.infrastructure.core.domain.AbstractPersistableCustom;
 @Setter
 public class CustomJobParameter extends AbstractPersistableCustom {
 
-    @Column(name = "parameter_json", nullable = false)
+    @Column(name = "parameter_json", nullable = false, columnDefinition = "json")
     private String parameterJson;
 }
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/domain/CustomJobParameterRepository.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/domain/CustomJobParameterRepository.java
index 2cb820e0e..b502e7fcc 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/domain/CustomJobParameterRepository.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/domain/CustomJobParameterRepository.java
@@ -18,6 +18,13 @@
  */
 package org.apache.fineract.infrastructure.jobs.domain;
 
-import org.springframework.data.jpa.repository.JpaRepository;
+import java.util.Optional;
+import java.util.Set;
+import org.apache.fineract.infrastructure.jobs.data.JobParameterDTO;
 
-public interface CustomJobParameterRepository extends JpaRepository<CustomJobParameter, Long> {}
+public interface CustomJobParameterRepository {
+
+    Long save(Set<JobParameterDTO> customJobParameters);
+
+    Optional<CustomJobParameter> findById(Long Id);
+}
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/domain/CustomJobParameterRepositoryImpl.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/domain/CustomJobParameterRepositoryImpl.java
new file mode 100644
index 000000000..c2333c983
--- /dev/null
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/domain/CustomJobParameterRepositoryImpl.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.fineract.infrastructure.jobs.domain;
+
+import com.google.gson.Gson;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import lombok.RequiredArgsConstructor;
+import org.apache.fineract.infrastructure.core.serialization.GoogleGsonSerializerHelper;
+import org.apache.fineract.infrastructure.core.service.database.DatabaseSpecificSQLGenerator;
+import org.apache.fineract.infrastructure.jobs.data.JobParameterDTO;
+import org.springframework.dao.DataAccessException;
+import org.springframework.jdbc.core.ResultSetExtractor;
+import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
+import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
+import org.springframework.jdbc.core.namedparam.SqlParameterSource;
+import org.springframework.stereotype.Component;
+
+@RequiredArgsConstructor
+@Component
+public class CustomJobParameterRepositoryImpl implements CustomJobParameterRepository {
+
+    private final NamedParameterJdbcTemplate namedParameterJdbcTemplate;
+    private final DatabaseSpecificSQLGenerator databaseSpecificSQLGenerator;
+    private final Gson gson = GoogleGsonSerializerHelper.createSimpleGson();
+
+    @Override
+    public Long save(Set<JobParameterDTO> customJobParameters) {
+        Objects.requireNonNull(customJobParameters);
+        final String insertSQL = "INSERT INTO batch_custom_job_parameters (parameter_json) VALUES (%s)"
+                .formatted(databaseSpecificSQLGenerator.castJson(":jsonString"));
+        final String jsonString = gson.toJson(customJobParameters);
+        SqlParameterSource parameters = new MapSqlParameterSource("jsonString", jsonString);
+        namedParameterJdbcTemplate.update(insertSQL, parameters);
+        final Long customParameterId = namedParameterJdbcTemplate.getJdbcTemplate().queryForObject(
+                DatabaseSpecificSQLGenerator.SELECT_CLAUSE.formatted(databaseSpecificSQLGenerator.lastInsertId()), Long.class);
+        return customParameterId;
+    }
+
+    @Override
+    public Optional<CustomJobParameter> findById(Long id) {
+        Objects.requireNonNull(id);
+        CustomJobParameterExtractor customJobParameterExtractor = new CustomJobParameterExtractor();
+        SqlParameterSource parameters = new MapSqlParameterSource("id", id);
+        return Optional.ofNullable(namedParameterJdbcTemplate.query(
+                "SELECT cjp.parameter_json AS parameter_json FROM batch_custom_job_parameters cjp WHERE cjp.id = :id", parameters,
+                customJobParameterExtractor));
+    }
+
+    private static final class CustomJobParameterExtractor implements ResultSetExtractor<CustomJobParameter> {
+
+        @Override
+        public CustomJobParameter extractData(ResultSet rs) throws SQLException, DataAccessException {
+            CustomJobParameter jobParameter = null;
+            if (rs.next()) {
+                jobParameter = new CustomJobParameter();
+                jobParameter.setParameterJson(rs.getString("parameter_json"));
+            }
+            return jobParameter;
+        }
+    }
+
+}
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/domain/JobExecutionRepository.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/domain/JobExecutionRepository.java
index 0b4753059..b6f10b3e6 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/domain/JobExecutionRepository.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/domain/JobExecutionRepository.java
@@ -24,19 +24,32 @@ import static org.springframework.batch.core.BatchStatus.STARTED;
 import static org.springframework.batch.core.BatchStatus.STARTING;
 import static org.springframework.batch.core.BatchStatus.UNKNOWN;
 
+import com.google.gson.Gson;
 import java.util.List;
 import java.util.Map;
 import lombok.RequiredArgsConstructor;
 import org.apache.fineract.infrastructure.core.config.FineractProperties;
+import org.apache.fineract.infrastructure.core.serialization.GoogleGsonSerializerHelper;
+import org.apache.fineract.infrastructure.core.service.database.DatabaseTypeResolver;
+import org.apache.fineract.infrastructure.jobs.data.JobParameterDTO;
+import org.springframework.beans.factory.InitializingBean;
 import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
 import org.springframework.stereotype.Component;
 
 @Component
 @RequiredArgsConstructor
-public class JobExecutionRepository {
+public class JobExecutionRepository implements InitializingBean {
 
     private final NamedParameterJdbcTemplate namedParameterJdbcTemplate;
     private final FineractProperties fineractProperties;
+    private final DatabaseTypeResolver databaseTypeResolver;
+    private final GoogleGsonSerializerHelper gsonFactory;
+    private Gson gson;
+
+    @Override
+    public void afterPropertiesSet() throws Exception {
+        this.gson = gsonFactory.createSimpleGson();
+    }
 
     public List<String> getStuckJobNames(NamedParameterJdbcTemplate jdbcTemplate) {
         int threshold = fineractProperties.getJob().getStuckRetryThreshold();
@@ -180,4 +193,32 @@ public class JobExecutionRepository {
                 """, Map.of("statuses", List.of(STARTED.name(), STARTING.name()), "jobName", jobName, "completedStatus", COMPLETED.name(),
                 "parameterKeyName", parameterKeyName, "parameterValue", parameterValue), Long.class);
     }
+
+    public List<Long> getRunningJobsIdsByExecutionParameter(String jobName, String jobCustomParamKeyName, String parameterKeyName,
+            String parameterValue) {
+        final StringBuilder sqlStatementBuilder = new StringBuilder();
+        String jsonString = gson.toJson(new JobParameterDTO(parameterKeyName, parameterValue));
+        sqlStatementBuilder.append(
+                "SELECT bje.JOB_EXECUTION_ID FROM BATCH_JOB_INSTANCE bji INNER JOIN BATCH_JOB_EXECUTION bje ON bji.JOB_INSTANCE_ID = bje.JOB_INSTANCE_ID INNER JOIN BATCH_JOB_EXECUTION_PARAMS bjep ON bje.JOB_EXECUTION_ID = bjep.JOB_EXECUTION_ID"
+                        + " WHERE bje.STATUS IN (:statuses) AND bji.JOB_NAME = :jobName AND bjep.KEY_NAME = :jobCustomParamKeyName AND bjep.LONG_VAL IN ("
+                        + getSubQueryForCustomJobParameters()
+                        + ") AND bje.JOB_INSTANCE_ID NOT IN (SELECT bje.JOB_INSTANCE_ID FROM BATCH_JOB_INSTANCE bji INNER JOIN BATCH_JOB_EXECUTION bje ON bji.JOB_INSTANCE_ID = bje.JOB_INSTANCE_ID"
+                        + " WHERE bje.STATUS = :completedStatus AND bji.JOB_NAME = :jobName)");
+        return namedParameterJdbcTemplate.queryForList(
+                sqlStatementBuilder.toString(), Map.of("statuses", List.of(STARTED.name(), STARTING.name()), "jobName", jobName,
+                        "completedStatus", COMPLETED.name(), "jobCustomParamKeyName", jobCustomParamKeyName, "jsonString", jsonString),
+                Long.class);
+    }
+
+    private String getSubQueryForCustomJobParameters() {
+        if (databaseTypeResolver.isMySQL()) {
+            return "SELECT cjp.id FROM batch_custom_job_parameters cjp WHERE JSON_CONTAINS(cjp.parameter_json,:jsonString)";
+        } else if (databaseTypeResolver.isPostgreSQL()) {
+            return "SELECT cjp.id FROM (SELECT id,json_array_elements(parameter_json) AS json_data FROM batch_custom_job_parameters) AS cjp WHERE (cjp.json_data ::jsonb @> :jsonString ::jsonb)";
+        } else {
+            throw new IllegalStateException("Database type is not supported for json query " + databaseTypeResolver.databaseType());
+        }
+
+    }
+
 }
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/service/jobparameterprovider/LoanCOBJobParameterProvider.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/service/jobparameterprovider/LoanCOBJobParameterProvider.java
index f84be7816..7ea3ceb01 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/service/jobparameterprovider/LoanCOBJobParameterProvider.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/service/jobparameterprovider/LoanCOBJobParameterProvider.java
@@ -18,7 +18,6 @@
  */
 package org.apache.fineract.infrastructure.jobs.service.jobparameterprovider;
 
-import com.google.gson.Gson;
 import java.time.format.DateTimeFormatter;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -28,40 +27,27 @@ import java.util.Set;
 import lombok.RequiredArgsConstructor;
 import org.apache.fineract.cob.loan.LoanCOBConstant;
 import org.apache.fineract.infrastructure.businessdate.domain.BusinessDateType;
-import org.apache.fineract.infrastructure.core.serialization.GoogleGsonSerializerHelper;
 import org.apache.fineract.infrastructure.core.service.ThreadLocalContextUtil;
 import org.apache.fineract.infrastructure.jobs.data.JobParameterDTO;
-import org.apache.fineract.infrastructure.jobs.domain.CustomJobParameter;
 import org.apache.fineract.infrastructure.jobs.domain.CustomJobParameterRepository;
 import org.apache.fineract.infrastructure.jobs.service.JobName;
 import org.apache.fineract.infrastructure.springbatch.SpringBatchJobConstants;
 import org.springframework.batch.core.JobParameter;
-import org.springframework.beans.factory.InitializingBean;
 import org.springframework.stereotype.Component;
 import org.springframework.transaction.annotation.Transactional;
 
 @Component
 @RequiredArgsConstructor
-public class LoanCOBJobParameterProvider extends AbstractJobParameterProvider implements InitializingBean {
+public class LoanCOBJobParameterProvider extends AbstractJobParameterProvider {
 
     private final CustomJobParameterRepository customJobParameterRepository;
-    private final GoogleGsonSerializerHelper gsonFactory;
-
-    private Gson gson;
-
-    @Override
-    public void afterPropertiesSet() throws Exception {
-        this.gson = gsonFactory.createSimpleGson();
-    }
 
     @Override
     @Transactional
     public Map<String, JobParameter> provide(Set<JobParameterDTO> jobParameterDTOSet) {
         Map<String, JobParameter> jobParameterMap = new HashMap<>();
-        CustomJobParameter customJobParameter = new CustomJobParameter();
-        customJobParameter.setParameterJson(gson.toJson(getJobParameterDTOListWithCorrectBusinessDate(jobParameterDTOSet)));
-        CustomJobParameter savedCustomJobParameter = customJobParameterRepository.saveAndFlush(customJobParameter);
-        jobParameterMap.put(SpringBatchJobConstants.CUSTOM_JOB_PARAMETER_ID_KEY, new JobParameter(savedCustomJobParameter.getId()));
+        Long customJobParameterId = customJobParameterRepository.save(getJobParameterDTOListWithCorrectBusinessDate(jobParameterDTOSet));
+        jobParameterMap.put(SpringBatchJobConstants.CUSTOM_JOB_PARAMETER_ID_KEY, new JobParameter(customJobParameterId));
         return jobParameterMap;
     }
 
diff --git a/fineract-provider/src/main/resources/db/changelog/tenant/changelog-tenant.xml b/fineract-provider/src/main/resources/db/changelog/tenant/changelog-tenant.xml
index 84d8e7be6..9c6ad412d 100644
--- a/fineract-provider/src/main/resources/db/changelog/tenant/changelog-tenant.xml
+++ b/fineract-provider/src/main/resources/db/changelog/tenant/changelog-tenant.xml
@@ -122,4 +122,5 @@
     <include file="parts/0100_new_repayment_strategy.xml" relativeToChangelogFile="true" />
     <include file="parts/0101_update_transaction_summary_table_report.xml" relativeToChangelogFile="true" />
     <include file="parts/0102_add_external_event_for_loan_reschedule.xml" relativeToChangelogFile="true" />
+    <include file="parts/0103_modify_parameter_json_column_custom_job_parameters.xml" relativeToChangelogFile="true" />
 </databaseChangeLog>
diff --git a/fineract-provider/src/main/resources/db/changelog/tenant/parts/0103_modify_parameter_json_column_custom_job_parameters.xml b/fineract-provider/src/main/resources/db/changelog/tenant/parts/0103_modify_parameter_json_column_custom_job_parameters.xml
new file mode 100644
index 000000000..6b93fee16
--- /dev/null
+++ b/fineract-provider/src/main/resources/db/changelog/tenant/parts/0103_modify_parameter_json_column_custom_job_parameters.xml
@@ -0,0 +1,54 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements. See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership. The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License. You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied. See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<databaseChangeLog xmlns="http://www.liquibase.org/xml/ns/dbchangelog"
+                   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+                   xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-4.1.xsd">
+    <changeSet id="1" author="fineract">
+        <modifyDataType columnName="parameter_json"
+                        newDataType="json"
+                        tableName="batch_custom_job_parameters"/>
+    </changeSet>
+    <changeSet author="fineract" id="1-postgresql" context="postgresql">
+        <sql>
+            UPDATE batch_custom_job_parameters SET parameter_json = json_build_array(json_build_object('parameterName','LoanIds','parameterValue',parameter_json)) WHERE id IN (SELECT DISTINCT cjp.id FROM (SELECT id,json_array_elements(parameter_json) AS json_data FROM batch_custom_job_parameters cjp1 WHERE json_typeof(cjp1.parameter_json)='array') AS cjp WHERE (json_typeof(cjp.json_data) = 'number'));
+            UPDATE batch_custom_job_parameters SET parameter_json = json_build_array(json_build_object('parameterName','BusinessDate','parameterValue',parameter_json)) WHERE id IN (SELECT DISTINCT cjp.id FROM batch_custom_job_parameters cjp WHERE json_typeof(parameter_json) IS DISTINCT FROM 'array');
+        </sql>
+    </changeSet>
+    <changeSet author="fineract" id="2-mysql" context="mysql">
+        <sql>
+            UPDATE batch_custom_job_parameters SET parameter_json = JSON_ARRAY(JSON_OBJECT('parameterName','LoanIds','parameterValue',parameter_json)) WHERE id IN (SELECT DISTINCT cjp.id FROM (SELECT * FROM batch_custom_job_parameters) cjp WHERE JSON_TYPE(cjp.parameter_json) = 'ARRAY' AND JSON_TYPE(JSON_EXTRACT(cjp.parameter_json, '$[0]')) = 'INTEGER');
+            UPDATE batch_custom_job_parameters SET parameter_json = JSON_ARRAY(JSON_OBJECT('parameterName','BusinessDate','parameterValue',parameter_json)) WHERE id IN (SELECT DISTINCT cjp.id FROM (SELECT * FROM batch_custom_job_parameters) cjp WHERE JSON_TYPE(cjp.parameter_json) != 'ARRAY');
+        </sql>
+    </changeSet>
+    <changeSet author="fineract" id="4">
+        <!-- Delete existing rows in batch job execution params that have IS_CATCH_UP key, so that it is not picked up for next loan cob job execution -->
+        <delete tableName="BATCH_JOB_EXECUTION_PARAMS">
+            <where>KEY_NAME='IS_CATCH_UP'</where>
+        </delete>
+    </changeSet>
+    <changeSet author="fineract" id="5">
+        <!-- Delete existing rows in job parameter table that have IS_CATCH_UP key, so that it is not picked up for next loan cob job execution -->
+        <delete tableName="job_parameters">
+            <where>parameter_name='IS_CATCH_UP'</where>
+        </delete>
+    </changeSet>
+</databaseChangeLog>