You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@fineract.apache.org by ad...@apache.org on 2023/04/12 12:54:00 UTC

[fineract] branch develop updated: FINERACT-1724-Regular-loan-cob-job-having-catch-up-true

This is an automated email from the ASF dual-hosted git repository.

adamsaghy pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/fineract.git


The following commit(s) were added to refs/heads/develop by this push:
     new 63fa315f1 FINERACT-1724-Regular-loan-cob-job-having-catch-up-true
63fa315f1 is described below

commit 63fa315f15c56cd769d484e2918b0fd4012001b7
Author: Ruchi Dhamankar <ru...@gmail.com>
AuthorDate: Fri Apr 7 20:59:15 2023 +0530

    FINERACT-1724-Regular-loan-cob-job-having-catch-up-true
---
 .../apache/fineract/cob/loan/LoanCOBConstant.java  |  1 +
 .../service/AsyncLoanCOBExecutorServiceImpl.java   | 15 ++---
 .../service/InlineLoanCOBExecutorServiceImpl.java  |  9 +--
 .../cob/service/LoanCOBCatchUpServiceImpl.java     |  4 +-
 .../database/DatabaseSpecificSQLGenerator.java     | 11 ++++
 .../jobs/domain/CustomJobParameter.java            |  2 +-
 .../jobs/domain/CustomJobParameterRepository.java  |  9 ++-
 .../domain/CustomJobParameterRepositoryImpl.java   | 74 ++++++++++++++++++++++
 .../jobs/domain/JobExecutionRepository.java        | 43 ++++++++++++-
 .../LoanCOBJobParameterProvider.java               |  8 +--
 .../db/changelog/tenant/changelog-tenant.xml       |  1 +
 ...parameter_json_column_custom_job_parameters.xml | 30 +++++++++
 12 files changed, 178 insertions(+), 29 deletions(-)

diff --git a/fineract-provider/src/main/java/org/apache/fineract/cob/loan/LoanCOBConstant.java b/fineract-provider/src/main/java/org/apache/fineract/cob/loan/LoanCOBConstant.java
index 8a1e4d024..35563e02b 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/cob/loan/LoanCOBConstant.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/cob/loan/LoanCOBConstant.java
@@ -33,6 +33,7 @@ public final class LoanCOBConstant {
     public static final String IS_CATCH_UP_PARAMETER_NAME = "IS_CATCH_UP";
 
     public static final String LOAN_COB_PARTITIONER_STEP = "Loan COB partition - Step";
+    public static final String LOAN_COB_CUSTOM_JOB_PARAMETER_KEY = "CUSTOM_JOB_PARAMETER_ID";
 
     private LoanCOBConstant() {
 
diff --git a/fineract-provider/src/main/java/org/apache/fineract/cob/service/AsyncLoanCOBExecutorServiceImpl.java b/fineract-provider/src/main/java/org/apache/fineract/cob/service/AsyncLoanCOBExecutorServiceImpl.java
index 871ea2e39..72d09a774 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/cob/service/AsyncLoanCOBExecutorServiceImpl.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/cob/service/AsyncLoanCOBExecutorServiceImpl.java
@@ -21,6 +21,7 @@ package org.apache.fineract.cob.service;
 import java.time.LocalDate;
 import java.time.format.DateTimeFormatter;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import lombok.RequiredArgsConstructor;
@@ -31,7 +32,6 @@ import org.apache.fineract.infrastructure.businessdate.domain.BusinessDateType;
 import org.apache.fineract.infrastructure.core.domain.FineractContext;
 import org.apache.fineract.infrastructure.core.service.ThreadLocalContextUtil;
 import org.apache.fineract.infrastructure.jobs.data.JobParameterDTO;
-import org.apache.fineract.infrastructure.jobs.domain.JobParameter;
 import org.apache.fineract.infrastructure.jobs.domain.JobParameterRepository;
 import org.apache.fineract.infrastructure.jobs.domain.ScheduledJobDetail;
 import org.apache.fineract.infrastructure.jobs.domain.ScheduledJobDetailRepository;
@@ -92,18 +92,11 @@ public class AsyncLoanCOBExecutorServiceImpl implements AsyncLoanCOBExecutorServ
         while (!executingBusinessDate.isAfter(cobBusinessDate)) {
             JobParameterDTO jobParameterDTO = new JobParameterDTO(LoanCOBConstant.BUSINESS_DATE_PARAMETER_NAME,
                     executingBusinessDate.format(DateTimeFormatter.ISO_DATE));
-            Set<JobParameterDTO> jobParameters = Collections.singleton(jobParameterDTO);
-            saveCatchUpJobParameter(scheduledJobDetail);
+            JobParameterDTO jobParameterCatchUpDTO = new JobParameterDTO(LoanCOBConstant.IS_CATCH_UP_PARAMETER_NAME, "true");
+            Set<JobParameterDTO> jobParameters = new HashSet<>();
+            Collections.addAll(jobParameters, jobParameterDTO, jobParameterCatchUpDTO);
             jobStarter.run(job, scheduledJobDetail, jobParameters);
             executingBusinessDate = executingBusinessDate.plusDays(1);
         }
     }
-
-    private void saveCatchUpJobParameter(ScheduledJobDetail scheduledJobDetail) {
-        JobParameter jobParameter = new JobParameter();
-        jobParameter.setJobId(scheduledJobDetail.getId());
-        jobParameter.setParameterName(LoanCOBConstant.IS_CATCH_UP_PARAMETER_NAME);
-        jobParameter.setParameterValue("true");
-        jobParameterRepository.save(jobParameter);
-    }
 }
diff --git a/fineract-provider/src/main/java/org/apache/fineract/cob/service/InlineLoanCOBExecutorServiceImpl.java b/fineract-provider/src/main/java/org/apache/fineract/cob/service/InlineLoanCOBExecutorServiceImpl.java
index d378133fe..92b55528c 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/cob/service/InlineLoanCOBExecutorServiceImpl.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/cob/service/InlineLoanCOBExecutorServiceImpl.java
@@ -49,7 +49,6 @@ import org.apache.fineract.infrastructure.core.exception.PlatformInternalServerE
 import org.apache.fineract.infrastructure.core.exception.PlatformRequestBodyItemLimitValidationException;
 import org.apache.fineract.infrastructure.core.serialization.GoogleGsonSerializerHelper;
 import org.apache.fineract.infrastructure.core.service.ThreadLocalContextUtil;
-import org.apache.fineract.infrastructure.jobs.domain.CustomJobParameter;
 import org.apache.fineract.infrastructure.jobs.domain.CustomJobParameterRepository;
 import org.apache.fineract.infrastructure.jobs.exception.JobNotFoundException;
 import org.apache.fineract.infrastructure.jobs.service.InlineExecutorService;
@@ -195,12 +194,8 @@ public class InlineLoanCOBExecutorServiceImpl implements InlineExecutorService<L
     private Map<String, JobParameter> getJobParametersMap(List<Long> loanIds, LocalDate businessDate) {
         // TODO: refactor for a more generic solution
         String parameterJson = gson.toJson(loanIds);
-        CustomJobParameter loanIdsJobParameter = new CustomJobParameter();
-        loanIdsJobParameter.setParameterJson(parameterJson);
-        Long loanIdsJobParameterId = customJobParameterRepository.saveAndFlush(loanIdsJobParameter).getId();
-        CustomJobParameter businessDateJobParameter = new CustomJobParameter();
-        businessDateJobParameter.setParameterJson(gson.toJson(businessDate.format(DateTimeFormatter.ISO_DATE)));
-        Long businessDateJobParameterId = customJobParameterRepository.saveAndFlush(businessDateJobParameter).getId();
+        Long loanIdsJobParameterId = customJobParameterRepository.save(parameterJson);
+        Long businessDateJobParameterId = customJobParameterRepository.save(gson.toJson(businessDate.format(DateTimeFormatter.ISO_DATE)));
         Map<String, JobParameter> jobParameterMap = new HashMap<>();
         jobParameterMap.put(SpringBatchJobConstants.CUSTOM_JOB_PARAMETER_ID_KEY, new JobParameter(loanIdsJobParameterId));
         jobParameterMap.put(LoanCOBConstant.BUSINESS_DATE_PARAMETER_NAME, new JobParameter(businessDateJobParameterId));
diff --git a/fineract-provider/src/main/java/org/apache/fineract/cob/service/LoanCOBCatchUpServiceImpl.java b/fineract-provider/src/main/java/org/apache/fineract/cob/service/LoanCOBCatchUpServiceImpl.java
index f9267695a..223591e6b 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/cob/service/LoanCOBCatchUpServiceImpl.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/cob/service/LoanCOBCatchUpServiceImpl.java
@@ -66,8 +66,8 @@ public class LoanCOBCatchUpServiceImpl implements LoanCOBCatchUpService {
 
     @Override
     public IsCatchUpRunningDTO isCatchUpRunning() {
-        List<Long> runningCatchUpExecutionIds = jobExecutionRepository.getRunningJobsByExecutionParameter(LoanCOBConstant.JOB_NAME,
-                LoanCOBConstant.IS_CATCH_UP_PARAMETER_NAME, "true");
+        List<Long> runningCatchUpExecutionIds = jobExecutionRepository.getRunningJobsIdsByExecutionParameter(LoanCOBConstant.JOB_NAME,
+                LoanCOBConstant.LOAN_COB_CUSTOM_JOB_PARAMETER_KEY, LoanCOBConstant.IS_CATCH_UP_PARAMETER_NAME, "true");
         if (CollectionUtils.isNotEmpty(runningCatchUpExecutionIds)) {
             JobExecution jobExecution = jobExplorer.getJobExecution(runningCatchUpExecutionIds.get(0));
             String executionDateString = (String) jobExecution.getExecutionContext().get(LoanCOBConstant.BUSINESS_DATE_PARAMETER_NAME);
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/core/service/database/DatabaseSpecificSQLGenerator.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/core/service/database/DatabaseSpecificSQLGenerator.java
index c1936aff6..57d511577 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/core/service/database/DatabaseSpecificSQLGenerator.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/core/service/database/DatabaseSpecificSQLGenerator.java
@@ -28,6 +28,7 @@ import org.springframework.stereotype.Component;
 public class DatabaseSpecificSQLGenerator {
 
     private final DatabaseTypeResolver databaseTypeResolver;
+    public static final String SELECT_CLAUSE = "SELECT %s";
 
     @Autowired
     public DatabaseSpecificSQLGenerator(DatabaseTypeResolver databaseTypeResolver) {
@@ -168,4 +169,14 @@ public class DatabaseSpecificSQLGenerator {
             throw new IllegalStateException("Database type is not supported for current schema " + databaseTypeResolver.databaseType());
         }
     }
+
+    public String castJson(String sql) {
+        if (databaseTypeResolver.isMySQL()) { // MySQL: the expression is used as-is, no cast suffix
+            return format("%s", sql);
+        }
+        if (databaseTypeResolver.isPostgreSQL()) { // PostgreSQL: append an explicit cast to json
+            return format("%s ::json", sql);
+        }
+        throw new IllegalStateException("Database type is not supported for casting to json " + databaseTypeResolver.databaseType());
+    }
 }
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/domain/CustomJobParameter.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/domain/CustomJobParameter.java
index 11a8fd9dd..2292763f9 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/domain/CustomJobParameter.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/domain/CustomJobParameter.java
@@ -33,6 +33,6 @@ import org.apache.fineract.infrastructure.core.domain.AbstractPersistableCustom;
 @Setter
 public class CustomJobParameter extends AbstractPersistableCustom {
 
-    @Column(name = "parameter_json", nullable = false)
+    @Column(name = "parameter_json", nullable = false, columnDefinition = "json")
     private String parameterJson;
 }
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/domain/CustomJobParameterRepository.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/domain/CustomJobParameterRepository.java
index 2cb820e0e..52119523f 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/domain/CustomJobParameterRepository.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/domain/CustomJobParameterRepository.java
@@ -18,6 +18,11 @@
  */
 package org.apache.fineract.infrastructure.jobs.domain;
 
-import org.springframework.data.jpa.repository.JpaRepository;
+import java.util.Optional;
 
-public interface CustomJobParameterRepository extends JpaRepository<CustomJobParameter, Long> {}
+public interface CustomJobParameterRepository {
+    /** Stores the given JSON string as a new custom job parameter and returns the id of the stored row. */
+    Long save(String jsonString);
+    /** Looks up the custom job parameter with the given id; empty when no such row exists. */
+    Optional<CustomJobParameter> findById(Long id);
+}
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/domain/CustomJobParameterRepositoryImpl.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/domain/CustomJobParameterRepositoryImpl.java
new file mode 100644
index 000000000..0542ff007
--- /dev/null
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/domain/CustomJobParameterRepositoryImpl.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.fineract.infrastructure.jobs.domain;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Optional;
+import lombok.RequiredArgsConstructor;
+import org.apache.fineract.infrastructure.core.service.database.DatabaseSpecificSQLGenerator;
+import org.springframework.dao.DataAccessException;
+import org.springframework.jdbc.core.ResultSetExtractor;
+import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
+import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
+import org.springframework.jdbc.core.namedparam.SqlParameterSource;
+import org.springframework.stereotype.Component;
+
+/** JDBC-backed {@link CustomJobParameterRepository} working on the batch_custom_job_parameters table. */
+@RequiredArgsConstructor
+@Component
+public class CustomJobParameterRepositoryImpl implements CustomJobParameterRepository {
+
+    private final NamedParameterJdbcTemplate namedParameterJdbcTemplate;
+    private final DatabaseSpecificSQLGenerator databaseSpecificSQLGenerator;
+
+    /** Inserts jsonString (cast to JSON where the database needs it) and returns the generated row id. */
+    @Override
+    public Long save(String jsonString) {
+        final String insertSql = "INSERT INTO batch_custom_job_parameters (parameter_json) VALUES (%s)"
+                .formatted(databaseSpecificSQLGenerator.castJson(":jsonString"));
+        // Take the key from the INSERT itself: a follow-up "last insert id" query is only right on the same connection.
+        final var generatedKeys = new org.springframework.jdbc.support.GeneratedKeyHolder();
+        namedParameterJdbcTemplate.update(insertSql, new MapSqlParameterSource("jsonString", jsonString), generatedKeys,
+                new String[] { "id" });
+        final Number customParameterId = generatedKeys.getKey();
+        return customParameterId == null ? null : customParameterId.longValue();
+    }
+
+    /** Loads the parameter_json of the row with the given id; empty when no such row exists. */
+    @Override
+    public Optional<CustomJobParameter> findById(Long id) {
+        final String selectSql = "SELECT cjp.parameter_json AS parameter_json FROM batch_custom_job_parameters cjp WHERE cjp.id = :id";
+        SqlParameterSource parameters = new MapSqlParameterSource("id", id);
+        return namedParameterJdbcTemplate.query(selectSql, parameters, new CustomJobParameterExtractor());
+    }
+
+    /** Maps the first result row, if any, to a CustomJobParameter carrying only its JSON payload. */
+    private static final class CustomJobParameterExtractor implements ResultSetExtractor<Optional<CustomJobParameter>> {
+        @Override
+        public Optional<CustomJobParameter> extractData(ResultSet rs) throws SQLException, DataAccessException {
+            if (!rs.next()) {
+                return Optional.empty();
+            }
+            CustomJobParameter jobParameter = new CustomJobParameter();
+            jobParameter.setParameterJson(rs.getString("parameter_json"));
+            return Optional.of(jobParameter);
+        }
+    }
+}
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/domain/JobExecutionRepository.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/domain/JobExecutionRepository.java
index 0b4753059..b6f10b3e6 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/domain/JobExecutionRepository.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/domain/JobExecutionRepository.java
@@ -24,19 +24,32 @@ import static org.springframework.batch.core.BatchStatus.STARTED;
 import static org.springframework.batch.core.BatchStatus.STARTING;
 import static org.springframework.batch.core.BatchStatus.UNKNOWN;
 
+import com.google.gson.Gson;
 import java.util.List;
 import java.util.Map;
 import lombok.RequiredArgsConstructor;
 import org.apache.fineract.infrastructure.core.config.FineractProperties;
+import org.apache.fineract.infrastructure.core.serialization.GoogleGsonSerializerHelper;
+import org.apache.fineract.infrastructure.core.service.database.DatabaseTypeResolver;
+import org.apache.fineract.infrastructure.jobs.data.JobParameterDTO;
+import org.springframework.beans.factory.InitializingBean;
 import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
 import org.springframework.stereotype.Component;
 
 @Component
 @RequiredArgsConstructor
-public class JobExecutionRepository {
+public class JobExecutionRepository implements InitializingBean {
 
     private final NamedParameterJdbcTemplate namedParameterJdbcTemplate;
     private final FineractProperties fineractProperties;
+    private final DatabaseTypeResolver databaseTypeResolver;
+    private final GoogleGsonSerializerHelper gsonFactory;
+    private Gson gson;
+
+    @Override
+    public void afterPropertiesSet() throws Exception {
+        gson = gsonFactory.createSimpleGson(); // created once after injection; used to serialize job parameters for queries
+    }
 
     public List<String> getStuckJobNames(NamedParameterJdbcTemplate jdbcTemplate) {
         int threshold = fineractProperties.getJob().getStuckRetryThreshold();
@@ -180,4 +193,32 @@ public class JobExecutionRepository {
                 """, Map.of("statuses", List.of(STARTED.name(), STARTING.name()), "jobName", jobName, "completedStatus", COMPLETED.name(),
                 "parameterKeyName", parameterKeyName, "parameterValue", parameterValue), Long.class);
     }
+
+    /** Returns ids of STARTED/STARTING executions of jobName whose custom job parameter JSON contains the given pair. */
+    public List<Long> getRunningJobsIdsByExecutionParameter(String jobName, String jobCustomParamKeyName, String parameterKeyName,
+            String parameterValue) {
+        final String jsonString = gson.toJson(new JobParameterDTO(parameterKeyName, parameterValue));
+        // The trailing NOT IN clause drops job instances that already have a COMPLETED execution.
+        final String sql = "SELECT bje.JOB_EXECUTION_ID FROM BATCH_JOB_INSTANCE bji INNER JOIN BATCH_JOB_EXECUTION bje ON bji.JOB_INSTANCE_ID = bje.JOB_INSTANCE_ID INNER JOIN BATCH_JOB_EXECUTION_PARAMS bjep ON bje.JOB_EXECUTION_ID = bjep.JOB_EXECUTION_ID"
+                + " WHERE bje.STATUS IN (:statuses) AND bji.JOB_NAME = :jobName AND bjep.KEY_NAME = :jobCustomParamKeyName AND bjep.LONG_VAL IN ("
+                + getSubQueryForCustomJobParameters()
+                + ") AND bje.JOB_INSTANCE_ID NOT IN (SELECT bje.JOB_INSTANCE_ID FROM BATCH_JOB_INSTANCE bji INNER JOIN BATCH_JOB_EXECUTION bje ON bji.JOB_INSTANCE_ID = bje.JOB_INSTANCE_ID"
+                + " WHERE bje.STATUS = :completedStatus AND bji.JOB_NAME = :jobName)";
+        final Map<String, Object> queryParameters = Map.of("statuses", List.of(STARTED.name(), STARTING.name()), "jobName", jobName,
+                "completedStatus", COMPLETED.name(), "jobCustomParamKeyName", jobCustomParamKeyName, "jsonString",
+                jsonString);
+        return namedParameterJdbcTemplate.queryForList(sql, queryParameters, Long.class);
+    }
+
+    /** Builds the sub-select of batch_custom_job_parameters ids whose parameter_json contains the :jsonString object. */
+    private String getSubQueryForCustomJobParameters() {
+        if (databaseTypeResolver.isMySQL()) {
+            return "SELECT cjp.id FROM batch_custom_job_parameters cjp WHERE JSON_CONTAINS(cjp.parameter_json,:jsonString)";
+        } else if (databaseTypeResolver.isPostgreSQL()) {
+            // Whole-document containment: unlike json_array_elements() it does not fail on rows that store a JSON scalar.
+            return "SELECT cjp.id FROM batch_custom_job_parameters cjp WHERE (cjp.parameter_json ::jsonb @> jsonb_build_array(:jsonString ::jsonb))";
+        }
+        throw new IllegalStateException("Database type is not supported for json query " + databaseTypeResolver.databaseType());
+    }
+
 }
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/service/jobparameterprovider/LoanCOBJobParameterProvider.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/service/jobparameterprovider/LoanCOBJobParameterProvider.java
index f84be7816..8a7027e64 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/service/jobparameterprovider/LoanCOBJobParameterProvider.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/service/jobparameterprovider/LoanCOBJobParameterProvider.java
@@ -31,7 +31,6 @@ import org.apache.fineract.infrastructure.businessdate.domain.BusinessDateType;
 import org.apache.fineract.infrastructure.core.serialization.GoogleGsonSerializerHelper;
 import org.apache.fineract.infrastructure.core.service.ThreadLocalContextUtil;
 import org.apache.fineract.infrastructure.jobs.data.JobParameterDTO;
-import org.apache.fineract.infrastructure.jobs.domain.CustomJobParameter;
 import org.apache.fineract.infrastructure.jobs.domain.CustomJobParameterRepository;
 import org.apache.fineract.infrastructure.jobs.service.JobName;
 import org.apache.fineract.infrastructure.springbatch.SpringBatchJobConstants;
@@ -58,10 +57,9 @@ public class LoanCOBJobParameterProvider extends AbstractJobParameterProvider im
     @Transactional
     public Map<String, JobParameter> provide(Set<JobParameterDTO> jobParameterDTOSet) {
         Map<String, JobParameter> jobParameterMap = new HashMap<>();
-        CustomJobParameter customJobParameter = new CustomJobParameter();
-        customJobParameter.setParameterJson(gson.toJson(getJobParameterDTOListWithCorrectBusinessDate(jobParameterDTOSet)));
-        CustomJobParameter savedCustomJobParameter = customJobParameterRepository.saveAndFlush(customJobParameter);
-        jobParameterMap.put(SpringBatchJobConstants.CUSTOM_JOB_PARAMETER_ID_KEY, new JobParameter(savedCustomJobParameter.getId()));
+        Long customJobParameterId = customJobParameterRepository
+                .save(gson.toJson(getJobParameterDTOListWithCorrectBusinessDate(jobParameterDTOSet)));
+        jobParameterMap.put(SpringBatchJobConstants.CUSTOM_JOB_PARAMETER_ID_KEY, new JobParameter(customJobParameterId));
         return jobParameterMap;
     }
 
diff --git a/fineract-provider/src/main/resources/db/changelog/tenant/changelog-tenant.xml b/fineract-provider/src/main/resources/db/changelog/tenant/changelog-tenant.xml
index 84d8e7be6..9c6ad412d 100644
--- a/fineract-provider/src/main/resources/db/changelog/tenant/changelog-tenant.xml
+++ b/fineract-provider/src/main/resources/db/changelog/tenant/changelog-tenant.xml
@@ -122,4 +122,5 @@
     <include file="parts/0100_new_repayment_strategy.xml" relativeToChangelogFile="true" />
     <include file="parts/0101_update_transaction_summary_table_report.xml" relativeToChangelogFile="true" />
     <include file="parts/0102_add_external_event_for_loan_reschedule.xml" relativeToChangelogFile="true" />
+    <include file="parts/0103_modify_parameter_json_column_custom_job_parameters.xml" relativeToChangelogFile="true" />
 </databaseChangeLog>
diff --git a/fineract-provider/src/main/resources/db/changelog/tenant/parts/0103_modify_parameter_json_column_custom_job_parameters.xml b/fineract-provider/src/main/resources/db/changelog/tenant/parts/0103_modify_parameter_json_column_custom_job_parameters.xml
new file mode 100644
index 000000000..a17d17630
--- /dev/null
+++ b/fineract-provider/src/main/resources/db/changelog/tenant/parts/0103_modify_parameter_json_column_custom_job_parameters.xml
@@ -0,0 +1,30 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements. See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership. The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License. You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied. See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<databaseChangeLog xmlns="http://www.liquibase.org/xml/ns/dbchangelog"
+                   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+                   xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-4.1.xsd">
+    <changeSet id="1" author="fineract"> <!-- changes parameter_json to the json data type -->
+        <modifyDataType columnName="parameter_json"
+                        newDataType="json"
+                        tableName="batch_custom_job_parameters"/>
+    </changeSet>
+</databaseChangeLog>