You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@batchee.apache.org by rm...@apache.org on 2014/01/02 10:27:55 UTC
[2/2] git commit: BATCHEE-9 ServicesManager shouldn't be contextual once an action started
BATCHEE-9 ServicesManager shouldn't be contextual once an action started
Project: http://git-wip-us.apache.org/repos/asf/incubator-batchee/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-batchee/commit/8ddf9f75
Tree: http://git-wip-us.apache.org/repos/asf/incubator-batchee/tree/8ddf9f75
Diff: http://git-wip-us.apache.org/repos/asf/incubator-batchee/diff/8ddf9f75
Branch: refs/heads/master
Commit: 8ddf9f754b731efe18ec48aedcc3cb0aa98814d3
Parents: b303e37
Author: Romain Manni-Bucau <rm...@apache.org>
Authored: Thu Jan 2 10:27:34 2014 +0100
Committer: Romain Manni-Bucau <rm...@apache.org>
Committed: Thu Jan 2 10:27:34 2014 +0100
----------------------------------------------------------------------
.../container/impl/JobExecutionImpl.java | 18 ++--
.../batchee/container/impl/JobOperatorImpl.java | 93 +++++++++++---------
.../impl/controller/BaseStepController.java | 42 +++++----
.../impl/controller/DecisionController.java | 11 ++-
.../ExecutionElementControllerFactory.java | 25 +++---
.../impl/controller/ExecutionTransitioner.java | 20 +++--
.../impl/controller/FlowController.java | 7 +-
.../FlowInSplitThreadRootController.java | 5 +-
.../impl/controller/JobController.java | 9 +-
.../controller/JobThreadRootController.java | 21 +++--
.../PartitionThreadRootController.java | 5 +-
.../controller/PartitionedStepController.java | 30 ++++---
.../SingleThreadedStepController.java | 14 ++-
.../impl/controller/SplitController.java | 6 +-
.../batchlet/BatchletStepController.java | 11 ++-
.../chunk/CheckpointAlgorithmFactory.java | 5 +-
.../controller/chunk/CheckpointManager.java | 5 +-
.../controller/chunk/ChunkStepController.java | 45 +++++-----
.../impl/jobinstance/JobExecutionHelper.java | 92 ++++++++++---------
.../RuntimeFlowInSplitExecution.java | 6 +-
.../impl/jobinstance/RuntimeJobExecution.java | 5 +-
.../container/proxy/ListenerFactory.java | 54 ++++++------
.../batchee/container/proxy/ProxyFactory.java | 46 +++++-----
.../container/services/ServicesManager.java | 38 +++-----
.../services/kernel/DefaultBatchKernel.java | 37 ++++----
.../services/locator/ClassLoaderLocator.java | 15 ++--
.../persistence/JDBCPersistenceManager.java | 8 +-
.../persistence/JPAPersistenceService.java | 8 +-
.../persistence/MemoryPersistenceManager.java | 7 +-
.../status/DefaultJobStatusManager.java | 8 +-
.../util/BatchFlowInSplitWorkUnit.java | 11 +--
.../container/util/BatchParallelWorkUnit.java | 6 +-
.../container/util/BatchPartitionWorkUnit.java | 11 +--
.../batchee/container/util/BatchWorkUnit.java | 11 +--
.../batchee/servlet/CleanUpWebappListener.java | 2 +-
.../org/apache/batchee/test/jmx/JMXTest.java | 4 +-
jbatch/src/test/resources/suites/dev-suite.xml | 6 +-
37 files changed, 405 insertions(+), 342 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/8ddf9f75/jbatch/src/main/java/org/apache/batchee/container/impl/JobExecutionImpl.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/JobExecutionImpl.java b/jbatch/src/main/java/org/apache/batchee/container/impl/JobExecutionImpl.java
index 0767168..68ecb5e 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/impl/JobExecutionImpl.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/JobExecutionImpl.java
@@ -19,7 +19,6 @@ package org.apache.batchee.container.impl;
import org.apache.batchee.container.services.InternalJobExecution;
import org.apache.batchee.spi.PersistenceManagerService;
import org.apache.batchee.spi.PersistenceManagerService.TimestampType;
-import org.apache.batchee.container.services.ServicesManager;
import javax.batch.runtime.BatchStatus;
import java.sql.Timestamp;
@@ -27,7 +26,7 @@ import java.util.Date;
import java.util.Properties;
public class JobExecutionImpl implements InternalJobExecution {
- private static final PersistenceManagerService PERSISTENCE_MANAGER_SERVICE = ServicesManager.service(PersistenceManagerService.class);
+ private final PersistenceManagerService persistenceManagerService;
private long executionID = 0L;
private long instanceID = 0L;
@@ -50,7 +49,8 @@ public class JobExecutionImpl implements InternalJobExecution {
this.jobContext = jobContext;
}
- public JobExecutionImpl(long executionId, long instanceId) {
+ public JobExecutionImpl(final long executionId, final long instanceId, final PersistenceManagerService persistenceManagerService) {
+ this.persistenceManagerService = persistenceManagerService;
this.executionID = executionId;
this.instanceID = instanceId;
}
@@ -61,7 +61,7 @@ public class JobExecutionImpl implements InternalJobExecution {
return this.jobContext.getBatchStatus();
} else {
// old job, retrieve from the backend
- final String name = PERSISTENCE_MANAGER_SERVICE.jobOperatorQueryJobExecutionBatchStatus(executionID);
+ final String name = persistenceManagerService.jobOperatorQueryJobExecutionBatchStatus(executionID);
if (name != null) {
return BatchStatus.valueOf(name);
}
@@ -71,7 +71,7 @@ public class JobExecutionImpl implements InternalJobExecution {
@Override
public Date getCreateTime() {
- final Timestamp ts = PERSISTENCE_MANAGER_SERVICE.jobOperatorQueryJobExecutionTimestamp(executionID, TimestampType.CREATE);
+ final Timestamp ts = persistenceManagerService.jobOperatorQueryJobExecutionTimestamp(executionID, TimestampType.CREATE);
if (ts != null) {
createTime = ts;
}
@@ -84,7 +84,7 @@ public class JobExecutionImpl implements InternalJobExecution {
@Override
public Date getEndTime() {
- final Timestamp ts = PERSISTENCE_MANAGER_SERVICE.jobOperatorQueryJobExecutionTimestamp(executionID, TimestampType.END);
+ final Timestamp ts = persistenceManagerService.jobOperatorQueryJobExecutionTimestamp(executionID, TimestampType.END);
if (ts != null) {
endTime = ts;
}
@@ -106,7 +106,7 @@ public class JobExecutionImpl implements InternalJobExecution {
return this.jobContext.getExitStatus();
}
- final String persistenceExitStatus = PERSISTENCE_MANAGER_SERVICE.jobOperatorQueryJobExecutionExitStatus(executionID);
+ final String persistenceExitStatus = persistenceManagerService.jobOperatorQueryJobExecutionExitStatus(executionID);
if (persistenceExitStatus != null) {
exitStatus = persistenceExitStatus;
}
@@ -116,7 +116,7 @@ public class JobExecutionImpl implements InternalJobExecution {
@Override
public Date getLastUpdatedTime() {
- final Timestamp ts = PERSISTENCE_MANAGER_SERVICE.jobOperatorQueryJobExecutionTimestamp(executionID, TimestampType.LAST_UPDATED);
+ final Timestamp ts = persistenceManagerService.jobOperatorQueryJobExecutionTimestamp(executionID, TimestampType.LAST_UPDATED);
if (ts != null) {
this.updateTime = ts;
}
@@ -129,7 +129,7 @@ public class JobExecutionImpl implements InternalJobExecution {
@Override
public Date getStartTime() {
- final Timestamp ts = PERSISTENCE_MANAGER_SERVICE.jobOperatorQueryJobExecutionTimestamp(executionID, TimestampType.STARTED);
+ final Timestamp ts = persistenceManagerService.jobOperatorQueryJobExecutionTimestamp(executionID, TimestampType.STARTED);
if (ts != null) {
startTime = ts;
}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/8ddf9f75/jbatch/src/main/java/org/apache/batchee/container/impl/JobOperatorImpl.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/JobOperatorImpl.java b/jbatch/src/main/java/org/apache/batchee/container/impl/JobOperatorImpl.java
index c970534..28fab55 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/impl/JobOperatorImpl.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/JobOperatorImpl.java
@@ -101,15 +101,24 @@ public class JobOperatorImpl implements JobOperator {
public static final String JBATCH_ADMIN = "admin";
- private static final BatchKernelService KERNEL_SERVICE = ServicesManager.service(BatchKernelService.class);
- private static final PersistenceManagerService PERSISTENCE_SERVICE = ServicesManager.service(PersistenceManagerService.class);
- private static final JobXMLLoaderService XML_LOADER_SERVICE = ServicesManager.service(JobXMLLoaderService.class);
- private static final JobStatusManagerService STATUS_MANAGER_SERVICE = ServicesManager.service(JobStatusManagerService.class);
- private static final SecurityService SECURITY_SERVICE= ServicesManager.service(SecurityService.class);
+ private final BatchKernelService kernelService;
+ private final PersistenceManagerService persistenceManagerService;
+ private final JobXMLLoaderService xmlLoaderService;
+ private final JobStatusManagerService statusManagerService;
+ private final SecurityService securityService;
+
+ public JobOperatorImpl() {
+ final ServicesManager servicesManager = ServicesManager.find();
+ kernelService = servicesManager.service(BatchKernelService.class);
+ persistenceManagerService = servicesManager.service(PersistenceManagerService.class);
+ xmlLoaderService = servicesManager.service(JobXMLLoaderService.class);
+ statusManagerService = servicesManager.service(JobStatusManagerService.class);
+ securityService = servicesManager.service(SecurityService.class);
+ }
@Override
public long start(final String jobXMLName, final Properties jobParameters) throws JobStartException, JobSecurityException {
- if (!SECURITY_SERVICE.isAuthorized(Permissions.START.name)) {
+ if (!securityService.isAuthorized(Permissions.START.name)) {
throw new JobSecurityException("The current user is not authorized to perform this operation");
}
@@ -138,18 +147,18 @@ public class JobOperatorImpl implements JobOperator {
jobParameterWriter.write("Job parameters on start = null");
}
- final String jobXML = XML_LOADER_SERVICE.loadJSL(jobXMLName);
- final InternalJobExecution execution = KERNEL_SERVICE.startJob(jobXML, jobParameters);
+ final String jobXML = xmlLoaderService.loadJSL(jobXMLName);
+ final InternalJobExecution execution = kernelService.startJob(jobXML, jobParameters);
return execution.getExecutionId();
}
@Override
public void abandon(final long executionId) throws NoSuchJobExecutionException, JobExecutionIsRunningException, JobSecurityException {
- if (!SECURITY_SERVICE.isAuthorized(PERSISTENCE_SERVICE.getJobInstanceIdByExecutionId(executionId))) {
+ if (!securityService.isAuthorized(persistenceManagerService.getJobInstanceIdByExecutionId(executionId))) {
throw new JobSecurityException("The current user is not authorized to perform this operation");
}
- final InternalJobExecution jobEx = PERSISTENCE_SERVICE.jobOperatorGetJobExecution(executionId);
+ final InternalJobExecution jobEx = persistenceManagerService.jobOperatorGetJobExecution(executionId);
// if it is not in STARTED or STARTING state, mark it as ABANDONED
if (jobEx.getBatchStatus().equals(BatchStatus.STARTED) || jobEx.getBatchStatus().equals(BatchStatus.STARTING)) {
@@ -157,31 +166,31 @@ public class JobOperatorImpl implements JobOperator {
}
// update table to reflect ABANDONED state
- PERSISTENCE_SERVICE.updateBatchStatusOnly(jobEx.getExecutionId(), BatchStatus.ABANDONED, new Timestamp(System.currentTimeMillis()));
+ persistenceManagerService.updateBatchStatusOnly(jobEx.getExecutionId(), BatchStatus.ABANDONED, new Timestamp(System.currentTimeMillis()));
// Don't forget to update JOBSTATUS table
- STATUS_MANAGER_SERVICE.updateJobBatchStatus(jobEx.getInstanceId(), BatchStatus.ABANDONED);
+ statusManagerService.updateJobBatchStatus(jobEx.getInstanceId(), BatchStatus.ABANDONED);
}
@Override
public InternalJobExecution getJobExecution(final long executionId)
throws NoSuchJobExecutionException, JobSecurityException {
- if (!SECURITY_SERVICE.isAuthorized(PERSISTENCE_SERVICE.getJobInstanceIdByExecutionId(executionId))) {
+ if (!securityService.isAuthorized(persistenceManagerService.getJobInstanceIdByExecutionId(executionId))) {
throw new JobSecurityException("The current user is not authorized to perform this operation");
}
- return KERNEL_SERVICE.getJobExecution(executionId);
+ return kernelService.getJobExecution(executionId);
}
@Override
public List<JobExecution> getJobExecutions(final JobInstance instance)
throws NoSuchJobInstanceException, JobSecurityException {
- if (!SECURITY_SERVICE.isAuthorized(instance.getInstanceId())) {
+ if (!securityService.isAuthorized(instance.getInstanceId())) {
throw new JobSecurityException("The current user is not authorized to perform this operation");
}
// Mediate between one
final List<JobExecution> executions = new ArrayList<JobExecution>();
- List<InternalJobExecution> executionImpls = PERSISTENCE_SERVICE.jobOperatorGetJobExecutions(instance.getInstanceId());
+ List<InternalJobExecution> executionImpls = persistenceManagerService.jobOperatorGetJobExecutions(instance.getInstanceId());
if (executionImpls.size() == 0) {
throw new NoSuchJobInstanceException("Job: " + instance.getJobName() + " does not exist");
}
@@ -194,20 +203,20 @@ public class JobOperatorImpl implements JobOperator {
@Override
public JobInstance getJobInstance(long executionId)
throws NoSuchJobExecutionException, JobSecurityException {
- if (!SECURITY_SERVICE.isAuthorized(PERSISTENCE_SERVICE.getJobInstanceIdByExecutionId(executionId))) {
+ if (!securityService.isAuthorized(persistenceManagerService.getJobInstanceIdByExecutionId(executionId))) {
throw new JobSecurityException("The current user is not authorized to perform this operation");
}
- return KERNEL_SERVICE.getJobInstance(executionId);
+ return kernelService.getJobInstance(executionId);
}
@Override
public int getJobInstanceCount(String jobName) throws NoSuchJobException, JobSecurityException {
int jobInstanceCount;
- if (SECURITY_SERVICE.isAuthorized(JBATCH_ADMIN)) {
+ if (securityService.isAuthorized(JBATCH_ADMIN)) {
// Do an unfiltered query
- jobInstanceCount = PERSISTENCE_SERVICE.jobOperatorGetJobInstanceCount(jobName);
+ jobInstanceCount = persistenceManagerService.jobOperatorGetJobInstanceCount(jobName);
} else {
- jobInstanceCount = PERSISTENCE_SERVICE.jobOperatorGetJobInstanceCount(jobName, SECURITY_SERVICE.getLoggedUser());
+ jobInstanceCount = persistenceManagerService.jobOperatorGetJobInstanceCount(jobName, securityService.getLoggedUser());
}
if (jobInstanceCount > 0) {
@@ -229,11 +238,11 @@ public class JobOperatorImpl implements JobOperator {
}
final List<Long> instanceIds;
- if (SECURITY_SERVICE.isAuthorized(JBATCH_ADMIN)) {
+ if (securityService.isAuthorized(JBATCH_ADMIN)) {
// Do an unfiltered query
- instanceIds = PERSISTENCE_SERVICE.jobOperatorGetJobInstanceIds(jobName, start, count);
+ instanceIds = persistenceManagerService.jobOperatorGetJobInstanceIds(jobName, start, count);
} else {
- instanceIds = PERSISTENCE_SERVICE.jobOperatorGetJobInstanceIds(jobName, SECURITY_SERVICE.getLoggedUser(), start, count);
+ instanceIds = persistenceManagerService.jobOperatorGetJobInstanceIds(jobName, securityService.getLoggedUser(), start, count);
}
// get the jobinstance ids associated with this job name
@@ -242,9 +251,9 @@ public class JobOperatorImpl implements JobOperator {
// for every job instance id
for (long id : instanceIds) {
// get the job instance obj, add it to the list
- final JobStatus jobStatus = STATUS_MANAGER_SERVICE.getJobStatus(id);
+ final JobStatus jobStatus = statusManagerService.getJobStatus(id);
final JobInstance jobInstance = jobStatus.getJobInstance();
- if (SECURITY_SERVICE.isAuthorized(jobInstance.getInstanceId())) {
+ if (securityService.isAuthorized(jobInstance.getInstanceId())) {
jobInstances.add(jobInstance);
}
}
@@ -262,10 +271,10 @@ public class JobOperatorImpl implements JobOperator {
@Override
public Set<String> getJobNames() throws JobSecurityException {
final Set<String> jobNames = new HashSet<String>();
- final Map<Long, String> data = PERSISTENCE_SERVICE.jobOperatorGetExternalJobInstanceData();
+ final Map<Long, String> data = persistenceManagerService.jobOperatorGetExternalJobInstanceData();
for (final Map.Entry<Long, String> entry : data.entrySet()) {
long instanceId = entry.getKey();
- if (SECURITY_SERVICE.isAuthorized(instanceId)) {
+ if (securityService.isAuthorized(instanceId)) {
jobNames.add(entry.getValue());
}
}
@@ -274,11 +283,11 @@ public class JobOperatorImpl implements JobOperator {
@Override
public Properties getParameters(final long executionId) throws NoSuchJobExecutionException, JobSecurityException {
- final JobInstance requestedJobInstance = KERNEL_SERVICE.getJobInstance(executionId);
- if (!SECURITY_SERVICE.isAuthorized(requestedJobInstance.getInstanceId())) {
+ final JobInstance requestedJobInstance = kernelService.getJobInstance(executionId);
+ if (!securityService.isAuthorized(requestedJobInstance.getInstanceId())) {
throw new JobSecurityException("The current user is not authorized to perform this operation");
}
- return PERSISTENCE_SERVICE.getParameters(executionId);
+ return persistenceManagerService.getParameters(executionId);
}
@@ -287,7 +296,7 @@ public class JobOperatorImpl implements JobOperator {
final List<Long> jobExecutions = new ArrayList<Long>();
// get the jobexecution ids associated with this job name
- final Set<Long> executionIds = PERSISTENCE_SERVICE.jobOperatorGetRunningExecutions(jobName);
+ final Set<Long> executionIds = persistenceManagerService.jobOperatorGetRunningExecutions(jobName);
if (executionIds.size() <= 0) {
throw new NoSuchJobException("Job Name " + jobName + " not found");
@@ -296,9 +305,9 @@ public class JobOperatorImpl implements JobOperator {
// for every job instance id
for (final long id : executionIds) {
try {
- if (SECURITY_SERVICE.isAuthorized(PERSISTENCE_SERVICE.getJobInstanceIdByExecutionId(id))) {
- if (KERNEL_SERVICE.isExecutionRunning(id)) {
- final InternalJobExecution jobEx = KERNEL_SERVICE.getJobExecution(id);
+ if (securityService.isAuthorized(persistenceManagerService.getJobInstanceIdByExecutionId(id))) {
+ if (kernelService.isExecutionRunning(id)) {
+ final InternalJobExecution jobEx = kernelService.getJobExecution(id);
jobExecutions.add(jobEx.getExecutionId());
}
}
@@ -313,12 +322,12 @@ public class JobOperatorImpl implements JobOperator {
public List<StepExecution> getStepExecutions(long executionId)
throws NoSuchJobExecutionException, JobSecurityException {
- final InternalJobExecution jobEx = KERNEL_SERVICE.getJobExecution(executionId);
+ final InternalJobExecution jobEx = kernelService.getJobExecution(executionId);
if (jobEx == null) {
throw new NoSuchJobExecutionException("Job Execution: " + executionId + " not found");
}
- if (SECURITY_SERVICE.isAuthorized(PERSISTENCE_SERVICE.getJobInstanceIdByExecutionId(executionId))) {
- return PERSISTENCE_SERVICE.getStepExecutionsForJobExecution(executionId);
+ if (securityService.isAuthorized(persistenceManagerService.getJobInstanceIdByExecutionId(executionId))) {
+ return persistenceManagerService.getStepExecutionsForJobExecution(executionId);
}
throw new JobSecurityException("The current user is not authorized to perform this operation");
}
@@ -345,7 +354,7 @@ public class JobOperatorImpl implements JobOperator {
}
private long restartInternal(final long oldExecutionId, final Properties restartParameters) throws JobExecutionAlreadyCompleteException, NoSuchJobExecutionException, JobExecutionNotMostRecentException, JobRestartException, JobSecurityException {
- if (!SECURITY_SERVICE.isAuthorized(PERSISTENCE_SERVICE.getJobInstanceIdByExecutionId(oldExecutionId))) {
+ if (!securityService.isAuthorized(persistenceManagerService.getJobInstanceIdByExecutionId(oldExecutionId))) {
throw new JobSecurityException("The current user is not authorized to perform this operation");
}
@@ -360,15 +369,15 @@ public class JobOperatorImpl implements JobOperator {
jobParameterWriter.write("Job parameters on restart = null");
}
- return KERNEL_SERVICE.restartJob(oldExecutionId, restartParameters).getExecutionId();
+ return kernelService.restartJob(oldExecutionId, restartParameters).getExecutionId();
}
@Override
public void stop(final long executionId) throws NoSuchJobExecutionException, JobExecutionNotRunningException, JobSecurityException {
- if (!SECURITY_SERVICE.isAuthorized(PERSISTENCE_SERVICE.getJobInstanceIdByExecutionId(executionId))) {
+ if (!securityService.isAuthorized(persistenceManagerService.getJobInstanceIdByExecutionId(executionId))) {
throw new JobSecurityException("The current user is not authorized to perform this operation");
}
- KERNEL_SERVICE.stopJob(executionId);
+ kernelService.stopJob(executionId);
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/8ddf9f75/jbatch/src/main/java/org/apache/batchee/container/impl/controller/BaseStepController.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/BaseStepController.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/BaseStepController.java
index 044df60..63636eb 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/BaseStepController.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/BaseStepController.java
@@ -73,13 +73,15 @@ public abstract class BaseStepController implements ExecutionElementController {
protected long rootJobExecutionId;
- protected static final BatchKernelService BATCH_KERNEL = ServicesManager.service(BatchKernelService.class);
- private static final PersistenceManagerService PERSISTENCE_MANAGER_SERVICE = ServicesManager.service(PersistenceManagerService.class);
- private static final JobStatusManagerService JOB_STATUS_MANAGER_SERVICE = ServicesManager.service(JobStatusManagerService.class);
+ protected final BatchKernelService kernelService;
+ private final PersistenceManagerService persistenceManagerService;
+ private final JobStatusManagerService statusManagerService;
protected TransactionManagerAdapter transactionManager = null;
+ private TransactionManagementService txService;
- protected BaseStepController(final RuntimeJobExecution jobExecution, final Step step, final StepContextImpl stepContext, final long rootJobExecutionId) {
+ protected BaseStepController(final RuntimeJobExecution jobExecution, final Step step, final StepContextImpl stepContext, final long rootJobExecutionId,
+ final ServicesManager servicesManager) {
this.jobExecutionImpl = jobExecution;
this.jobInstance = jobExecution.getJobInstance();
this.stepContext = stepContext;
@@ -88,13 +90,19 @@ public abstract class BaseStepController implements ExecutionElementController {
throw new IllegalArgumentException("Step parameter to ctor cannot be null.");
}
this.step = step;
+
+ this.txService = servicesManager.service(TransactionManagementService.class);
+ this.kernelService = servicesManager.service(BatchKernelService.class);
+ this.persistenceManagerService = servicesManager.service(PersistenceManagerService.class);
+ this.statusManagerService = servicesManager.service(JobStatusManagerService.class);
}
protected BaseStepController(final RuntimeJobExecution jobExecution,
final Step step, final StepContextImpl stepContext,
final long rootJobExecutionId,
- final BlockingQueue<PartitionDataWrapper> analyzerStatusQueue) {
- this(jobExecution, step, stepContext, rootJobExecutionId);
+ final BlockingQueue<PartitionDataWrapper> analyzerStatusQueue,
+ final ServicesManager servicesManager) {
+ this(jobExecution, step, stepContext, rootJobExecutionId, servicesManager);
this.analyzerStatusQueue = analyzerStatusQueue;
}
@@ -239,7 +247,7 @@ public abstract class BaseStepController implements ExecutionElementController {
Timestamp startTS = new Timestamp(time);
stepContext.setStartTime(startTS);
- PERSISTENCE_MANAGER_SERVICE.updateStepExecution(rootJobExecutionId, stepContext);
+ persistenceManagerService.updateStepExecution(rootJobExecutionId, stepContext);
}
@@ -262,17 +270,17 @@ public abstract class BaseStepController implements ExecutionElementController {
protected void updateBatchStatus(final BatchStatus updatedBatchStatus) {
stepStatus.setBatchStatus(updatedBatchStatus);
- JOB_STATUS_MANAGER_SERVICE.updateStepStatus(stepStatus.getStepExecutionId(), stepStatus);
+ statusManagerService.updateStepStatus(stepStatus.getStepExecutionId(), stepStatus);
stepContext.setBatchStatus(updatedBatchStatus);
}
protected boolean shouldStepBeExecuted() {
- this.stepStatus = JOB_STATUS_MANAGER_SERVICE.getStepStatus(jobInstance.getInstanceId(), step.getId());
+ this.stepStatus = statusManagerService.getStepStatus(jobInstance.getInstanceId(), step.getId());
if (stepStatus == null) {
// create new step execution
final StepExecutionImpl stepExecution = getNewStepExecution(rootJobExecutionId, stepContext);
// create new step status for this run
- stepStatus = JOB_STATUS_MANAGER_SERVICE.createStepStatus(stepExecution.getStepExecutionId());
+ stepStatus = statusManagerService.createStepStatus(stepExecution.getStepExecutionId());
stepContext.setStepExecutionId(stepExecution.getStepExecutionId());
return true;
} else {
@@ -332,8 +340,8 @@ public abstract class BaseStepController implements ExecutionElementController {
protected void statusStarting() {
stepStatus.setBatchStatus(BatchStatus.STARTING);
- JOB_STATUS_MANAGER_SERVICE.updateJobCurrentStep(jobInstance.getInstanceId(), step.getId());
- JOB_STATUS_MANAGER_SERVICE.updateStepStatus(stepStatus.getStepExecutionId(), stepStatus);
+ statusManagerService.updateJobCurrentStep(jobInstance.getInstanceId(), step.getId());
+ statusManagerService.updateStepStatus(stepStatus.getStepExecutionId(), stepStatus);
stepContext.setBatchStatus(BatchStatus.STARTING);
}
@@ -350,23 +358,23 @@ public abstract class BaseStepController implements ExecutionElementController {
}
stepStatus.setPersistentUserData(new PersistentDataWrapper(persistentBAOS.toByteArray()));
- JOB_STATUS_MANAGER_SERVICE.updateStepStatus(stepStatus.getStepExecutionId(), stepStatus);
+ statusManagerService.updateStepStatus(stepStatus.getStepExecutionId(), stepStatus);
}
protected void persistExitStatusAndEndTimestamp() {
stepStatus.setExitStatus(stepContext.getExitStatus());
- JOB_STATUS_MANAGER_SERVICE.updateStepStatus(stepStatus.getStepExecutionId(), stepStatus);
+ statusManagerService.updateStepStatus(stepStatus.getStepExecutionId(), stepStatus);
// set the end time metric before flushing
long time = System.currentTimeMillis();
Timestamp endTS = new Timestamp(time);
stepContext.setEndTime(endTS);
- PERSISTENCE_MANAGER_SERVICE.updateStepExecution(rootJobExecutionId, stepContext);
+ persistenceManagerService.updateStepExecution(rootJobExecutionId, stepContext);
}
private StepExecutionImpl getNewStepExecution(long rootJobExecutionId, StepContextImpl stepContext) {
- return PERSISTENCE_MANAGER_SERVICE.createStepExecution(rootJobExecutionId, stepContext);
+ return persistenceManagerService.createStepExecution(rootJobExecutionId, stepContext);
}
private void setContextProperties() {
@@ -389,7 +397,7 @@ public abstract class BaseStepController implements ExecutionElementController {
stepContext.addMetric(MetricImpl.MetricType.COMMIT_COUNT, 0);
stepContext.addMetric(MetricImpl.MetricType.ROLLBACK_COUNT, 0);
- transactionManager = ServicesManager.service(TransactionManagementService.class).getTransactionManager(stepContext);
+ transactionManager = txService.getTransactionManager(stepContext);
}
public void setStepContext(final StepContextImpl stepContext) {
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/8ddf9f75/jbatch/src/main/java/org/apache/batchee/container/impl/controller/DecisionController.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/DecisionController.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/DecisionController.java
index 498b596..bbd6c93 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/DecisionController.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/DecisionController.java
@@ -23,12 +23,13 @@ import org.apache.batchee.container.jsl.ExecutionElement;
import org.apache.batchee.container.proxy.DeciderProxy;
import org.apache.batchee.container.proxy.InjectionReferences;
import org.apache.batchee.container.proxy.ProxyFactory;
-import org.apache.batchee.spi.PersistenceManagerService;
import org.apache.batchee.container.services.ServicesManager;
import org.apache.batchee.container.status.ExecutionStatus;
import org.apache.batchee.container.status.ExtendedBatchStatus;
import org.apache.batchee.jaxb.Decision;
import org.apache.batchee.jaxb.Property;
+import org.apache.batchee.spi.BatchArtifactFactory;
+import org.apache.batchee.spi.PersistenceManagerService;
import javax.batch.runtime.StepExecution;
import java.util.List;
@@ -41,11 +42,13 @@ public class DecisionController implements ExecutionElementController {
private StepExecution[] previousStepExecutions = null;
private final PersistenceManagerService persistenceService;
+ private final BatchArtifactFactory factory;
- public DecisionController(final RuntimeJobExecution jobExecution, final Decision decision) {
+ public DecisionController(final RuntimeJobExecution jobExecution, final Decision decision, final ServicesManager manager) {
this.jobExecution = jobExecution;
this.decision = decision;
- this.persistenceService = ServicesManager.service(PersistenceManagerService.class);
+ this.persistenceService = manager.service(PersistenceManagerService.class);
+ this.factory = manager.service(BatchArtifactFactory.class);
}
@Override
@@ -61,7 +64,7 @@ public class DecisionController implements ExecutionElementController {
//so two of these contexts will always be null
final InjectionReferences injectionRef = new InjectionReferences(jobExecution.getJobContext(), null, propList);
- final DeciderProxy deciderProxy = ProxyFactory.createDeciderProxy(deciderId, injectionRef, jobExecution);
+ final DeciderProxy deciderProxy = ProxyFactory.createDeciderProxy(factory, deciderId, injectionRef, jobExecution);
final String exitStatus = deciderProxy.decide(this.previousStepExecutions);
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/8ddf9f75/jbatch/src/main/java/org/apache/batchee/container/impl/controller/ExecutionElementControllerFactory.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/ExecutionElementControllerFactory.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/ExecutionElementControllerFactory.java
index 631ea3a..1492875 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/ExecutionElementControllerFactory.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/ExecutionElementControllerFactory.java
@@ -20,6 +20,8 @@ import org.apache.batchee.container.impl.StepContextImpl;
import org.apache.batchee.container.impl.controller.batchlet.BatchletStepController;
import org.apache.batchee.container.impl.controller.chunk.ChunkStepController;
import org.apache.batchee.container.impl.jobinstance.RuntimeJobExecution;
+import org.apache.batchee.container.services.BatchKernelService;
+import org.apache.batchee.container.services.ServicesManager;
import org.apache.batchee.container.util.PartitionDataWrapper;
import org.apache.batchee.jaxb.Batchlet;
import org.apache.batchee.jaxb.Chunk;
@@ -34,17 +36,18 @@ import java.util.concurrent.BlockingQueue;
public class ExecutionElementControllerFactory {
public static BaseStepController getStepController(final RuntimeJobExecution jobExecutionImpl, final Step step,
final StepContextImpl stepContext, final long rootJobExecutionId,
- final BlockingQueue<PartitionDataWrapper> analyzerQueue) {
+ final BlockingQueue<PartitionDataWrapper> analyzerQueue,
+ final ServicesManager servicesManager) {
final Partition partition = step.getPartition();
if (partition != null) {
if (partition.getMapper() != null) {
- return new PartitionedStepController(jobExecutionImpl, step, stepContext, rootJobExecutionId);
+ return new PartitionedStepController(jobExecutionImpl, step, stepContext, rootJobExecutionId, servicesManager);
}
if (partition.getPlan() != null) {
if (partition.getPlan().getPartitions() != null) {
- return new PartitionedStepController(jobExecutionImpl, step, stepContext, rootJobExecutionId);
+ return new PartitionedStepController(jobExecutionImpl, step, stepContext, rootJobExecutionId, servicesManager);
}
}
}
@@ -54,26 +57,26 @@ public class ExecutionElementControllerFactory {
if (step.getChunk() != null) {
throw new IllegalArgumentException("Step contains both a batchlet and a chunk. Aborting.");
}
- return new BatchletStepController(jobExecutionImpl, step, stepContext, rootJobExecutionId, analyzerQueue);
+ return new BatchletStepController(jobExecutionImpl, step, stepContext, rootJobExecutionId, analyzerQueue, servicesManager);
} else {
final Chunk chunk = step.getChunk();
if (chunk == null) {
throw new IllegalArgumentException("Step does not contain either a batchlet or a chunk. Aborting.");
}
- return new ChunkStepController(jobExecutionImpl, step, stepContext, rootJobExecutionId, analyzerQueue);
+ return new ChunkStepController(jobExecutionImpl, step, stepContext, rootJobExecutionId, analyzerQueue, servicesManager);
}
}
- public static DecisionController getDecisionController(RuntimeJobExecution jobExecutionImpl, Decision decision) {
- return new DecisionController(jobExecutionImpl, decision);
+ public static DecisionController getDecisionController(final ServicesManager servicesManager, final RuntimeJobExecution jobExecutionImpl, final Decision decision) {
+ return new DecisionController(jobExecutionImpl, decision, servicesManager);
}
- public static FlowController getFlowController(RuntimeJobExecution jobExecutionImpl, Flow flow, long rootJobExecutionId) {
- return new FlowController(jobExecutionImpl, flow, rootJobExecutionId);
+ public static FlowController getFlowController(final ServicesManager servicesManager, final RuntimeJobExecution jobExecutionImpl, final Flow flow, final long rootJobExecutionId) {
+ return new FlowController(jobExecutionImpl, flow, rootJobExecutionId, servicesManager);
}
- public static SplitController getSplitController(RuntimeJobExecution jobExecutionImpl, Split split, long rootJobExecutionId) {
- return new SplitController(jobExecutionImpl, split, rootJobExecutionId);
+ public static SplitController getSplitController(final BatchKernelService kernel, final RuntimeJobExecution jobExecutionImpl, final Split split, final long rootJobExecutionId) {
+ return new SplitController(jobExecutionImpl, split, rootJobExecutionId, kernel);
}
private ExecutionElementControllerFactory() {
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/8ddf9f75/jbatch/src/main/java/org/apache/batchee/container/impl/controller/ExecutionTransitioner.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/ExecutionTransitioner.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/ExecutionTransitioner.java
index 15651c5..faf7669 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/ExecutionTransitioner.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/ExecutionTransitioner.java
@@ -27,6 +27,8 @@ import org.apache.batchee.container.jsl.IllegalTransitionException;
import org.apache.batchee.container.jsl.Transition;
import org.apache.batchee.container.jsl.TransitionElement;
import org.apache.batchee.container.navigator.ModelNavigator;
+import org.apache.batchee.container.services.BatchKernelService;
+import org.apache.batchee.container.services.ServicesManager;
import org.apache.batchee.container.status.ExecutionStatus;
import org.apache.batchee.container.status.ExtendedBatchStatus;
import org.apache.batchee.container.util.PartitionDataWrapper;
@@ -44,6 +46,7 @@ import java.util.List;
import java.util.concurrent.BlockingQueue;
public class ExecutionTransitioner {
+ private final ServicesManager manager;
private RuntimeJobExecution jobExecution;
private long rootJobExecutionId;
private ModelNavigator<?> modelNavigator;
@@ -60,19 +63,24 @@ public class ExecutionTransitioner {
private List<Long> stepExecIds;
- public ExecutionTransitioner(RuntimeJobExecution jobExecution, long rootJobExecutionId, ModelNavigator<?> modelNavigator) {
+ public ExecutionTransitioner(final RuntimeJobExecution jobExecution, final long rootJobExecutionId, final ModelNavigator<?> modelNavigator,
+ final ServicesManager servicesManager) {
this.jobExecution = jobExecution;
this.rootJobExecutionId = rootJobExecutionId;
this.modelNavigator = modelNavigator;
this.jobContext = jobExecution.getJobContext();
+ this.manager = servicesManager;
}
- public ExecutionTransitioner(RuntimeJobExecution jobExecution, long rootJobExecutionId, ModelNavigator<JSLJob> jobNavigator, BlockingQueue<PartitionDataWrapper> analyzerQueue) {
+ public ExecutionTransitioner(final RuntimeJobExecution jobExecution, final long rootJobExecutionId,
+ final ModelNavigator<JSLJob> jobNavigator, final BlockingQueue<PartitionDataWrapper> analyzerQueue,
+ final ServicesManager manager) {
this.jobExecution = jobExecution;
this.rootJobExecutionId = rootJobExecutionId;
this.modelNavigator = jobNavigator;
this.jobContext = jobExecution.getJobContext();
this.analyzerQueue = analyzerQueue;
+ this.manager = manager;
}
public ExecutionStatus doExecutionLoop() {
@@ -163,19 +171,19 @@ public class ExecutionTransitioner {
if (currentExecutionElement instanceof Decision) {
final Decision decision = (Decision) currentExecutionElement;
- elementController = ExecutionElementControllerFactory.getDecisionController(jobExecution, decision);
+ elementController = ExecutionElementControllerFactory.getDecisionController(manager, jobExecution, decision);
final DecisionController decisionController = (DecisionController) elementController;
decisionController.setPreviousStepExecutions(previousExecutionElement, previousElementController);
} else if (currentExecutionElement instanceof Flow) {
final Flow flow = (Flow) currentExecutionElement;
- elementController = ExecutionElementControllerFactory.getFlowController(jobExecution, flow, rootJobExecutionId);
+ elementController = ExecutionElementControllerFactory.getFlowController(manager, jobExecution, flow, rootJobExecutionId);
} else if (currentExecutionElement instanceof Split) {
final Split split = (Split) currentExecutionElement;
- elementController = ExecutionElementControllerFactory.getSplitController(jobExecution, split, rootJobExecutionId);
+ elementController = ExecutionElementControllerFactory.getSplitController(manager.service(BatchKernelService.class), jobExecution, split, rootJobExecutionId);
} else if (currentExecutionElement instanceof Step) {
final Step step = (Step) currentExecutionElement;
final StepContextImpl stepContext = new StepContextImpl(step.getId());
- elementController = ExecutionElementControllerFactory.getStepController(jobExecution, step, stepContext, rootJobExecutionId, analyzerQueue);
+ elementController = ExecutionElementControllerFactory.getStepController(jobExecution, step, stepContext, rootJobExecutionId, analyzerQueue, manager);
} else {
elementController = null;
}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/8ddf9f75/jbatch/src/main/java/org/apache/batchee/container/impl/controller/FlowController.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/FlowController.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/FlowController.java
index db0d4c6..a4b6fda 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/FlowController.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/FlowController.java
@@ -22,6 +22,7 @@ import org.apache.batchee.container.impl.JobContextImpl;
import org.apache.batchee.container.impl.jobinstance.RuntimeJobExecution;
import org.apache.batchee.container.navigator.ModelNavigator;
import org.apache.batchee.container.navigator.NavigatorFactory;
+import org.apache.batchee.container.services.ServicesManager;
import org.apache.batchee.container.status.ExecutionStatus;
import org.apache.batchee.container.status.ExtendedBatchStatus;
import org.apache.batchee.jaxb.Flow;
@@ -32,6 +33,7 @@ import java.util.List;
public class FlowController implements ExecutionElementController {
private final RuntimeJobExecution jobExecution;
private final JobContextImpl jobContext;
+ private final ServicesManager manager;
protected ModelNavigator<Flow> flowNavigator;
@@ -40,18 +42,19 @@ public class FlowController implements ExecutionElementController {
private ExecutionTransitioner transitioner;
- public FlowController(final RuntimeJobExecution jobExecution, final Flow flow, final long rootJobExecutionId) {
+ public FlowController(final RuntimeJobExecution jobExecution, final Flow flow, final long rootJobExecutionId, final ServicesManager manager) {
this.jobExecution = jobExecution;
this.jobContext = jobExecution.getJobContext();
this.flowNavigator = NavigatorFactory.createFlowNavigator(flow);
this.flow = flow;
this.rootJobExecutionId = rootJobExecutionId;
+ this.manager = manager;
}
@Override
public ExecutionStatus execute() {
if (!jobContext.getBatchStatus().equals(BatchStatus.STOPPING)) {
- transitioner = new ExecutionTransitioner(jobExecution, rootJobExecutionId, flowNavigator);
+ transitioner = new ExecutionTransitioner(jobExecution, rootJobExecutionId, flowNavigator, manager);
return transitioner.doExecutionLoop();
}
return new ExecutionStatus(ExtendedBatchStatus.JOB_OPERATOR_STOPPING);
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/8ddf9f75/jbatch/src/main/java/org/apache/batchee/container/impl/controller/FlowInSplitThreadRootController.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/FlowInSplitThreadRootController.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/FlowInSplitThreadRootController.java
index 93f6160..ea6f249 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/FlowInSplitThreadRootController.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/FlowInSplitThreadRootController.java
@@ -17,6 +17,7 @@
package org.apache.batchee.container.impl.controller;
import org.apache.batchee.container.impl.jobinstance.RuntimeFlowInSplitExecution;
+import org.apache.batchee.container.services.ServicesManager;
import org.apache.batchee.container.status.ExecutionStatus;
import org.apache.batchee.container.status.ExtendedBatchStatus;
import org.apache.batchee.container.util.FlowInSplitBuilderConfig;
@@ -25,8 +26,8 @@ public class FlowInSplitThreadRootController extends JobThreadRootController {
// Careful, we have a separately named reference to the same object in the parent class
private RuntimeFlowInSplitExecution flowInSplitExecution;
- public FlowInSplitThreadRootController(final RuntimeFlowInSplitExecution flowInSplitExecution, final FlowInSplitBuilderConfig config) {
- super(flowInSplitExecution, config.getRootJobExecutionId());
+ public FlowInSplitThreadRootController(final RuntimeFlowInSplitExecution flowInSplitExecution, final FlowInSplitBuilderConfig config, final ServicesManager manager) {
+ super(flowInSplitExecution, config.getRootJobExecutionId(), manager);
this.flowInSplitExecution = flowInSplitExecution;
}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/8ddf9f75/jbatch/src/main/java/org/apache/batchee/container/impl/controller/JobController.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/JobController.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/JobController.java
index 3446ba1..d671eb5 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/JobController.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/JobController.java
@@ -17,13 +17,14 @@
package org.apache.batchee.container.impl.controller;
import org.apache.batchee.container.impl.jobinstance.RuntimeJobExecution;
+import org.apache.batchee.container.services.ServicesManager;
public class JobController extends JobThreadRootController {
- private JobController(final RuntimeJobExecution jobExecution, final long rootJobExecutionId) {
- super(jobExecution, rootJobExecutionId);
+ private JobController(final RuntimeJobExecution jobExecution, final long rootJobExecutionId, final ServicesManager manager) {
+ super(jobExecution, rootJobExecutionId, manager);
}
- public JobController(final RuntimeJobExecution jobExecution) {
- this(jobExecution, jobExecution.getExecutionId());
+ public JobController(final RuntimeJobExecution jobExecution, final ServicesManager manager) {
+ this(jobExecution, jobExecution.getExecutionId(), manager);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/8ddf9f75/jbatch/src/main/java/org/apache/batchee/container/impl/controller/JobThreadRootController.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/JobThreadRootController.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/JobThreadRootController.java
index 1dd8c3b..feb3f1c 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/JobThreadRootController.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/JobThreadRootController.java
@@ -25,12 +25,13 @@ import org.apache.batchee.container.proxy.InjectionReferences;
import org.apache.batchee.container.proxy.JobListenerProxy;
import org.apache.batchee.container.proxy.ListenerFactory;
import org.apache.batchee.container.services.JobStatusManagerService;
-import org.apache.batchee.spi.PersistenceManagerService;
import org.apache.batchee.container.services.ServicesManager;
import org.apache.batchee.container.status.ExecutionStatus;
import org.apache.batchee.container.status.ExtendedBatchStatus;
import org.apache.batchee.container.util.PartitionDataWrapper;
import org.apache.batchee.jaxb.JSLJob;
+import org.apache.batchee.spi.BatchArtifactFactory;
+import org.apache.batchee.spi.PersistenceManagerService;
import javax.batch.runtime.BatchStatus;
import java.io.PrintWriter;
@@ -52,22 +53,25 @@ public abstract class JobThreadRootController implements ThreadRootController {
protected final ModelNavigator<JSLJob> jobNavigator;
protected final JobStatusManagerService jobStatusService;
protected final PersistenceManagerService persistenceService;
+ protected final ServicesManager manager;
private ExecutionTransitioner transitioner;
private BlockingQueue<PartitionDataWrapper> analyzerQueue;
- public JobThreadRootController(final RuntimeJobExecution jobExecution, final long rootJobExecutionId) {
+ public JobThreadRootController(final RuntimeJobExecution jobExecution, final long rootJobExecutionId,
+ final ServicesManager servicesManager) {
this.jobExecution = jobExecution;
this.jobContext = jobExecution.getJobContext();
this.rootJobExecutionId = rootJobExecutionId;
this.jobInstanceId = jobExecution.getInstanceId();
- this.jobStatusService = ServicesManager.service(JobStatusManagerService.class);
- this.persistenceService = ServicesManager.service(PersistenceManagerService.class);
+ this.jobStatusService = servicesManager.service(JobStatusManagerService.class);
+ this.persistenceService = servicesManager.service(PersistenceManagerService.class);
this.jobNavigator = jobExecution.getJobNavigator();
+ this.manager = servicesManager;
final JSLJob jobModel = jobExecution.getJobNavigator().getRootModelElement();
final InjectionReferences injectionRef = new InjectionReferences(jobContext, null, null);
- listenerFactory = new ListenerFactory(jobModel, injectionRef, jobExecution);
+ listenerFactory = new ListenerFactory(servicesManager.service(BatchArtifactFactory.class), jobModel, injectionRef, jobExecution);
jobExecution.setListenerFactory(listenerFactory);
}
@@ -75,8 +79,9 @@ public abstract class JobThreadRootController implements ThreadRootController {
* By not passing the rootJobExecutionId, we are "orphaning" the subjob execution and making it not findable from the parent.
* This is exactly what we want for getStepExecutions()... we don't want it to get extraneous entries for the partitions.
*/
- public JobThreadRootController(final RuntimeJobExecution jobExecution, final BlockingQueue<PartitionDataWrapper> analyzerQueue) {
- this(jobExecution, jobExecution.getExecutionId());
+ public JobThreadRootController(final RuntimeJobExecution jobExecution, final BlockingQueue<PartitionDataWrapper> analyzerQueue,
+ final ServicesManager servicesManager) {
+ this(jobExecution, jobExecution.getExecutionId(), servicesManager);
this.analyzerQueue = analyzerQueue;
}
@@ -96,7 +101,7 @@ public abstract class JobThreadRootController implements ThreadRootController {
// The BIG loop transitioning
// within the job !!!
// --------------------
- transitioner = new ExecutionTransitioner(jobExecution, rootJobExecutionId, jobNavigator, analyzerQueue);
+ transitioner = new ExecutionTransitioner(jobExecution, rootJobExecutionId, jobNavigator, analyzerQueue, manager);
retVal = transitioner.doExecutionLoop();
ExtendedBatchStatus extBatchStatus = retVal.getExtendedBatchStatus();
switch (extBatchStatus) {
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/8ddf9f75/jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionThreadRootController.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionThreadRootController.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionThreadRootController.java
index 920ca99..8cd3593 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionThreadRootController.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionThreadRootController.java
@@ -17,14 +17,15 @@
package org.apache.batchee.container.impl.controller;
import org.apache.batchee.container.impl.jobinstance.RuntimeJobExecution;
+import org.apache.batchee.container.services.ServicesManager;
import org.apache.batchee.container.util.PartitionsBuilderConfig;
/**
* Currently there's no special function on top of the subjob required of the partition.
*/
public class PartitionThreadRootController extends JobThreadRootController {
- public PartitionThreadRootController(RuntimeJobExecution jobExecution, PartitionsBuilderConfig config) {
- super(jobExecution, config.getAnalyzerQueue());
+ public PartitionThreadRootController(final RuntimeJobExecution jobExecution, final PartitionsBuilderConfig config, final ServicesManager servicesManager) {
+ super(jobExecution, config.getAnalyzerQueue(), servicesManager);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/8ddf9f75/jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionedStepController.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionedStepController.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionedStepController.java
index 9797233..01444ef 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionedStepController.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionedStepController.java
@@ -26,6 +26,7 @@ import org.apache.batchee.container.proxy.PartitionMapperProxy;
import org.apache.batchee.container.proxy.PartitionReducerProxy;
import org.apache.batchee.container.proxy.ProxyFactory;
import org.apache.batchee.container.proxy.StepListenerProxy;
+import org.apache.batchee.container.services.ServicesManager;
import org.apache.batchee.container.util.BatchPartitionPlan;
import org.apache.batchee.container.util.BatchPartitionWorkUnit;
import org.apache.batchee.container.util.BatchWorkUnit;
@@ -39,6 +40,7 @@ import org.apache.batchee.jaxb.PartitionMapper;
import org.apache.batchee.jaxb.PartitionReducer;
import org.apache.batchee.jaxb.Property;
import org.apache.batchee.jaxb.Step;
+import org.apache.batchee.spi.BatchArtifactFactory;
import javax.batch.api.partition.PartitionPlan;
import javax.batch.api.partition.PartitionReducer.PartitionStatus;
@@ -80,8 +82,12 @@ public class PartitionedStepController extends BaseStepController {
BlockingQueue<BatchPartitionWorkUnit> completedWorkQueue = null;
- protected PartitionedStepController(final RuntimeJobExecution jobExecutionImpl, final Step step, StepContextImpl stepContext, long rootJobExecutionId) {
- super(jobExecutionImpl, step, stepContext, rootJobExecutionId);
+ private final BatchArtifactFactory factory;
+
+ protected PartitionedStepController(final RuntimeJobExecution jobExecutionImpl, final Step step, final StepContextImpl stepContext,
+ final long rootJobExecutionId, final ServicesManager servicesManager) {
+ super(jobExecutionImpl, step, stepContext, rootJobExecutionId, servicesManager);
+ factory = servicesManager.service(BatchArtifactFactory.class);
}
@Override
@@ -96,7 +102,7 @@ public class PartitionedStepController extends BaseStepController {
if (parallelBatchWorkUnits != null) {
for (BatchWorkUnit subJob : parallelBatchWorkUnits) {
try {
- BATCH_KERNEL.stopJob(subJob.getJobExecutionImpl().getExecutionId());
+ kernelService.stopJob(subJob.getJobExecutionImpl().getExecutionId());
} catch (Exception e) {
// TODO - Is this what we want to know.
// Blow up if it happens to force the issue.
@@ -128,7 +134,7 @@ public class PartitionedStepController extends BaseStepController {
// Set all the contexts associated with this controller.
// Some of them may be null
final InjectionReferences injectionRef = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext, propertyList);
- final PartitionMapperProxy partitionMapperProxy = ProxyFactory.createPartitionMapperProxy(partitionMapper.getRef(), injectionRef, stepContext, jobExecutionImpl);
+ final PartitionMapperProxy partitionMapperProxy = ProxyFactory.createPartitionMapperProxy(factory, partitionMapper.getRef(), injectionRef, stepContext, jobExecutionImpl);
final PartitionPlan mapperPlan = partitionMapperProxy.mapPartitions();
@@ -280,9 +286,9 @@ public class PartitionedStepController extends BaseStepController {
PartitionsBuilderConfig config = new PartitionsBuilderConfig(subJobs, partitionProperties, analyzerStatusQueue, completedWorkQueue, jobExecutionImpl.getExecutionId());
// Then build all the subjobs but do not start them yet
if (stepStatus.getStartCount() > 1 && !plan.getPartitionsOverride()) {
- parallelBatchWorkUnits = BATCH_KERNEL.buildOnRestartParallelPartitions(config);
+ parallelBatchWorkUnits = kernelService.buildOnRestartParallelPartitions(config);
} else {
- parallelBatchWorkUnits = BATCH_KERNEL.buildNewParallelPartitions(config);
+ parallelBatchWorkUnits = kernelService.buildNewParallelPartitions(config);
}
// NOTE: At this point I might not have as many work units as I had partitions, since some may have already completed.
@@ -304,9 +310,9 @@ public class PartitionedStepController extends BaseStepController {
for (int i = 0; i < this.threads && i < numTotalForThisExecution; i++, numCurrentSubmitted++) {
final BatchWorkUnit workUnit = parallelBatchWorkUnits.get(i);
if (stepStatus.getStartCount() > 1 && !plan.getPartitionsOverride()) {
- BATCH_KERNEL.restartGeneratedJob(workUnit);
+ kernelService.restartGeneratedJob(workUnit);
} else {
- BATCH_KERNEL.startGeneratedJob(workUnit);
+ kernelService.startGeneratedJob(workUnit);
}
}
@@ -336,9 +342,9 @@ public class PartitionedStepController extends BaseStepController {
if (numCurrentCompleted < numTotalForThisExecution) {
if (numCurrentSubmitted < numTotalForThisExecution) {
if (stepStatus.getStartCount() > 1) {
- BATCH_KERNEL.startGeneratedJob(parallelBatchWorkUnits.get(numCurrentSubmitted++));
+ kernelService.startGeneratedJob(parallelBatchWorkUnits.get(numCurrentSubmitted++));
} else {
- BATCH_KERNEL.restartGeneratedJob(parallelBatchWorkUnits.get(numCurrentSubmitted++));
+ kernelService.restartGeneratedJob(parallelBatchWorkUnits.get(numCurrentSubmitted++));
}
}
} else {
@@ -388,14 +394,14 @@ public class PartitionedStepController extends BaseStepController {
if (analyzer != null) {
final List<Property> propList = analyzer.getProperties() == null ? null : analyzer.getProperties().getPropertyList();
injectionRef = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext, propList);
- analyzerProxy = ProxyFactory.createPartitionAnalyzerProxy(analyzer.getRef(), injectionRef, stepContext, jobExecutionImpl);
+ analyzerProxy = ProxyFactory.createPartitionAnalyzerProxy(factory, analyzer.getRef(), injectionRef, stepContext, jobExecutionImpl);
}
final PartitionReducer partitionReducer = step.getPartition().getReducer();
if (partitionReducer != null) {
final List<Property> propList = partitionReducer.getProperties() == null ? null : partitionReducer.getProperties().getPropertyList();
injectionRef = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext, propList);
- partitionReducerProxy = ProxyFactory.createPartitionReducerProxy(partitionReducer.getRef(), injectionRef, stepContext, jobExecutionImpl);
+ partitionReducerProxy = ProxyFactory.createPartitionReducerProxy(factory, partitionReducer.getRef(), injectionRef, stepContext, jobExecutionImpl);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/8ddf9f75/jbatch/src/main/java/org/apache/batchee/container/impl/controller/SingleThreadedStepController.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/SingleThreadedStepController.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/SingleThreadedStepController.java
index d085622..94738c0 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/SingleThreadedStepController.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/SingleThreadedStepController.java
@@ -23,11 +23,13 @@ import org.apache.batchee.container.proxy.InjectionReferences;
import org.apache.batchee.container.proxy.PartitionCollectorProxy;
import org.apache.batchee.container.proxy.ProxyFactory;
import org.apache.batchee.container.proxy.StepListenerProxy;
+import org.apache.batchee.container.services.ServicesManager;
import org.apache.batchee.container.util.PartitionDataWrapper;
import org.apache.batchee.container.util.PartitionDataWrapper.PartitionEventType;
import org.apache.batchee.jaxb.Collector;
import org.apache.batchee.jaxb.Property;
import org.apache.batchee.jaxb.Step;
+import org.apache.batchee.spi.BatchArtifactFactory;
import java.io.Serializable;
import java.util.List;
@@ -41,11 +43,17 @@ import java.util.concurrent.BlockingQueue;
* separate main thread with controller).
*/
public abstract class SingleThreadedStepController extends BaseStepController implements Controller {
+ private final BatchArtifactFactory factory;
+
// Collector only used from partition threads, not main thread
protected PartitionCollectorProxy collectorProxy = null;
- protected SingleThreadedStepController(RuntimeJobExecution jobExecutionImpl, Step step, StepContextImpl stepContext, long rootJobExecutionId, BlockingQueue<PartitionDataWrapper> analyzerStatusQueue) {
- super(jobExecutionImpl, step, stepContext, rootJobExecutionId, analyzerStatusQueue);
+ protected SingleThreadedStepController(final RuntimeJobExecution jobExecutionImpl, final Step step,
+ final StepContextImpl stepContext, final long rootJobExecutionId,
+ final BlockingQueue<PartitionDataWrapper> analyzerStatusQueue,
+ final ServicesManager servicesManager) {
+ super(jobExecutionImpl, step, stepContext, rootJobExecutionId, analyzerStatusQueue, servicesManager);
+ factory = servicesManager.service(BatchArtifactFactory.class);
}
List<StepListenerProxy> stepListeners = null;
@@ -67,7 +75,7 @@ public abstract class SingleThreadedStepController extends BaseStepController im
* contexts may be null
*/
injectionRef = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext, propList);
- this.collectorProxy = ProxyFactory.createPartitionCollectorProxy(collector.getRef(), injectionRef, this.stepContext, jobExecutionImpl);
+ this.collectorProxy = ProxyFactory.createPartitionCollectorProxy(factory, collector.getRef(), injectionRef, this.stepContext, jobExecutionImpl);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/8ddf9f75/jbatch/src/main/java/org/apache/batchee/container/impl/controller/SplitController.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/SplitController.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/SplitController.java
index b882de6..43c26bb 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/SplitController.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/SplitController.java
@@ -60,13 +60,13 @@ public class SplitController implements ExecutionElementController {
protected Split split;
- public SplitController(RuntimeJobExecution jobExecution, Split split, long rootJobExecutionId) {
+ public SplitController(final RuntimeJobExecution jobExecution, final Split split, final long rootJobExecutionId,
+ final BatchKernelService kernelService) {
this.jobExecution = jobExecution;
this.jobContext = jobExecution.getJobContext();
this.rootJobExecutionId = rootJobExecutionId;
this.split = split;
-
- batchKernel = ServicesManager.service(BatchKernelService.class);
+ this.batchKernel = kernelService;
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/8ddf9f75/jbatch/src/main/java/org/apache/batchee/container/impl/controller/batchlet/BatchletStepController.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/batchlet/BatchletStepController.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/batchlet/BatchletStepController.java
index a0f0cbb..13b60df 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/batchlet/BatchletStepController.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/batchlet/BatchletStepController.java
@@ -23,29 +23,34 @@ import org.apache.batchee.container.impl.jobinstance.RuntimeJobExecution;
import org.apache.batchee.container.proxy.BatchletProxy;
import org.apache.batchee.container.proxy.InjectionReferences;
import org.apache.batchee.container.proxy.ProxyFactory;
+import org.apache.batchee.container.services.ServicesManager;
import org.apache.batchee.container.util.PartitionDataWrapper;
import org.apache.batchee.jaxb.Batchlet;
import org.apache.batchee.jaxb.Property;
import org.apache.batchee.jaxb.Step;
+import org.apache.batchee.spi.BatchArtifactFactory;
import javax.batch.runtime.BatchStatus;
import java.util.List;
import java.util.concurrent.BlockingQueue;
public class BatchletStepController extends SingleThreadedStepController {
+ private final BatchArtifactFactory factory;
private BatchletProxy batchletProxy;
public BatchletStepController(final RuntimeJobExecution jobExecutionImpl, final Step step,
final StepContextImpl stepContext, final long rootJobExecutionId,
- final BlockingQueue<PartitionDataWrapper> analyzerStatusQueue) {
- super(jobExecutionImpl, step, stepContext, rootJobExecutionId, analyzerStatusQueue);
+ final BlockingQueue<PartitionDataWrapper> analyzerStatusQueue,
+ final ServicesManager manager) {
+ super(jobExecutionImpl, step, stepContext, rootJobExecutionId, analyzerStatusQueue, manager);
+ factory = manager.service(BatchArtifactFactory.class);
}
private void invokeBatchlet(final Batchlet batchlet) throws BatchContainerServiceException {
final String batchletId = batchlet.getRef();
final List<Property> propList = (batchlet.getProperties() == null) ? null : batchlet.getProperties().getPropertyList();
final InjectionReferences injectionRef = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext, propList);
- batchletProxy = ProxyFactory.createBatchletProxy(batchletId, injectionRef, stepContext, jobExecutionImpl);
+ batchletProxy = ProxyFactory.createBatchletProxy(factory, batchletId, injectionRef, stepContext, jobExecutionImpl);
if (!wasStopIssued()) {
final String processRetVal = batchletProxy.process();
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/8ddf9f75/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/CheckpointAlgorithmFactory.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/CheckpointAlgorithmFactory.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/CheckpointAlgorithmFactory.java
index 34f2fae..781f41f 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/CheckpointAlgorithmFactory.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/CheckpointAlgorithmFactory.java
@@ -24,14 +24,15 @@ import org.apache.batchee.container.proxy.InjectionReferences;
import org.apache.batchee.container.proxy.ProxyFactory;
import org.apache.batchee.jaxb.Chunk;
import org.apache.batchee.jaxb.Step;
+import org.apache.batchee.spi.BatchArtifactFactory;
public final class CheckpointAlgorithmFactory {
- public static CheckpointAlgorithmProxy getCheckpointAlgorithmProxy(final Step step, final InjectionReferences injectionReferences, final StepContextImpl stepContext, final RuntimeJobExecution jobExecution) {
+ public static CheckpointAlgorithmProxy getCheckpointAlgorithmProxy(final BatchArtifactFactory factory, final Step step, final InjectionReferences injectionReferences, final StepContextImpl stepContext, final RuntimeJobExecution jobExecution) {
final Chunk chunk = step.getChunk();
final String checkpointType = chunk.getCheckpointPolicy();
final CheckpointAlgorithmProxy proxy;
if ("custom".equalsIgnoreCase(checkpointType)) {
- proxy = ProxyFactory.createCheckpointAlgorithmProxy(chunk.getCheckpointAlgorithm().getRef(), injectionReferences, stepContext, jobExecution);
+ proxy = ProxyFactory.createCheckpointAlgorithmProxy(factory, chunk.getCheckpointAlgorithm().getRef(), injectionReferences, stepContext, jobExecution);
} else /* "item" */ {
proxy = new CheckpointAlgorithmProxy(new ItemCheckpointAlgorithm());
}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/8ddf9f75/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/CheckpointManager.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/CheckpointManager.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/CheckpointManager.java
index 76cae58..f7f01bf 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/CheckpointManager.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/CheckpointManager.java
@@ -38,14 +38,15 @@ public class CheckpointManager {
public CheckpointManager(final ItemReaderProxy reader, final ItemWriterProxy writer,
final CheckpointAlgorithm chkptAlg,
- final long jobInstanceID, final String stepId) {
+ final long jobInstanceID, final String stepId,
+ final PersistenceManagerService persistenceManagerService) {
this.readerProxy = reader;
this.writerProxy = writer;
this.checkpointAlgorithm = chkptAlg;
this.stepId = stepId;
this.jobInstanceID = jobInstanceID;
- this.persistenceManagerService = ServicesManager.service(PersistenceManagerService.class);
+ this.persistenceManagerService = persistenceManagerService;
}
public boolean applyCheckPointPolicy() {
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/8ddf9f75/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/ChunkStepController.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/ChunkStepController.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/ChunkStepController.java
index 7c8eec7..61cadfe 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/ChunkStepController.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/ChunkStepController.java
@@ -47,6 +47,7 @@ import org.apache.batchee.jaxb.ItemReader;
import org.apache.batchee.jaxb.ItemWriter;
import org.apache.batchee.jaxb.Property;
import org.apache.batchee.jaxb.Step;
+import org.apache.batchee.spi.BatchArtifactFactory;
import org.apache.batchee.spi.PersistenceManagerService;
import javax.batch.api.chunk.CheckpointAlgorithm;
@@ -64,7 +65,8 @@ public class ChunkStepController extends SingleThreadedStepController {
private final static String sourceClass = ChunkStepController.class.getName();
private final static Logger logger = Logger.getLogger(sourceClass);
- private final PersistenceManagerService persistenceManagerService = ServicesManager.service(PersistenceManagerService.class);
+ private final PersistenceManagerService persistenceManagerService;
+ private final BatchArtifactFactory artifactFactory;
private Chunk chunk = null;
private ItemReaderProxy readerProxy = null;
@@ -84,8 +86,11 @@ public class ChunkStepController extends SingleThreadedStepController {
private boolean rollbackRetry = false;
public ChunkStepController(final RuntimeJobExecution jobExecutionImpl, final Step step, final StepContextImpl stepContext,
- final long rootJobExecutionId, final BlockingQueue<PartitionDataWrapper> analyzerStatusQueue) {
- super(jobExecutionImpl, step, stepContext, rootJobExecutionId, analyzerStatusQueue);
+ final long rootJobExecutionId, final BlockingQueue<PartitionDataWrapper> analyzerStatusQueue,
+ final ServicesManager servicesManager) {
+ super(jobExecutionImpl, step, stepContext, rootJobExecutionId, analyzerStatusQueue, servicesManager);
+ this.persistenceManagerService = servicesManager.service(PersistenceManagerService.class);
+ this.artifactFactory = servicesManager.service(BatchArtifactFactory.class);
}
/**
@@ -472,7 +477,6 @@ public class ChunkStepController extends SingleThreadedStepController {
int timeInterval = ChunkHelper.getTimeLimit(chunk);
boolean checkPointed = true;
boolean rollback = false;
- Throwable caughtThrowable = null;
// begin new transaction at first iteration or after a checkpoint commit
@@ -498,7 +502,7 @@ public class ChunkStepController extends SingleThreadedStepController {
positionWriterAtCheckpoint();
checkpointManager = new CheckpointManager(readerProxy, writerProxy,
getCheckpointAlgorithm(itemCount, timeInterval), jobExecutionImpl
- .getJobInstance().getInstanceId(), step.getId());
+ .getJobInstance().getInstanceId(), step.getId(), persistenceManagerService);
}
}
@@ -581,7 +585,6 @@ public class ChunkStepController extends SingleThreadedStepController {
}
} catch (final Exception e) {
- caughtThrowable = e;
logger.log(Level.SEVERE, "Failure in Read-Process-Write Loop", e);
// Only try to call onError() if we have an Exception, but not an Error.
for (ChunkListenerProxy chunkProxy : chunkListeners) {
@@ -591,20 +594,20 @@ public class ChunkStepController extends SingleThreadedStepController {
logger.log(Level.SEVERE, e1.getMessage(), e1);
}
}
+ rollback(e);
} catch (final Throwable t) {
- caughtThrowable = t;
- logger.log(Level.SEVERE, t.getMessage(), t);
- } finally {
- if (caughtThrowable != null) {
- transactionManager.setRollbackOnly();
- readerProxy.close();
- writerProxy.close();
- transactionManager.rollback();
- throw new BatchContainerRuntimeException("Failure in Read-Process-Write Loop", caughtThrowable);
- }
+ rollback(t);
}
}
+ private void rollback(final Throwable t) {
+ transactionManager.setRollbackOnly();
+ readerProxy.close();
+ writerProxy.close();
+ transactionManager.rollback();
+ throw new BatchContainerRuntimeException("Failure in Read-Process-Write Loop", t);
+ }
+
protected void invokeCoreStep() throws BatchContainerServiceException {
this.chunk = step.getChunk();
@@ -637,7 +640,7 @@ public class ChunkStepController extends SingleThreadedStepController {
final ItemReader itemReader = chunk.getReader();
final List<Property> itemReaderProps = itemReader.getProperties() == null ? null : itemReader.getProperties().getPropertyList();
final InjectionReferences injectionRef = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext, itemReaderProps);
- readerProxy = ProxyFactory.createItemReaderProxy(itemReader.getRef(), injectionRef, stepContext, jobExecutionImpl);
+ readerProxy = ProxyFactory.createItemReaderProxy(artifactFactory, itemReader.getRef(), injectionRef, stepContext, jobExecutionImpl);
}
{
@@ -645,7 +648,7 @@ public class ChunkStepController extends SingleThreadedStepController {
if (itemProcessor != null) {
final List<Property> itemProcessorProps = itemProcessor.getProperties() == null ? null : itemProcessor.getProperties().getPropertyList();
final InjectionReferences injectionRef = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext, itemProcessorProps);
- processorProxy = ProxyFactory.createItemProcessorProxy(itemProcessor.getRef(), injectionRef, stepContext, jobExecutionImpl);
+ processorProxy = ProxyFactory.createItemProcessorProxy(artifactFactory, itemProcessor.getRef(), injectionRef, stepContext, jobExecutionImpl);
}
}
@@ -653,7 +656,7 @@ public class ChunkStepController extends SingleThreadedStepController {
final ItemWriter itemWriter = chunk.getWriter();
final List<Property> itemWriterProps = itemWriter.getProperties() == null ? null : itemWriter.getProperties().getPropertyList();
final InjectionReferences injectionRef = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext, itemWriterProps);
- writerProxy = ProxyFactory.createItemWriterProxy(itemWriter.getRef(), injectionRef, stepContext, jobExecutionImpl);
+ writerProxy = ProxyFactory.createItemWriterProxy(artifactFactory, itemWriter.getRef(), injectionRef, stepContext, jobExecutionImpl);
}
{
@@ -665,7 +668,7 @@ public class ChunkStepController extends SingleThreadedStepController {
}
final InjectionReferences injectionRef = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext, propList);
- checkpointProxy = CheckpointAlgorithmFactory.getCheckpointAlgorithmProxy(step, injectionRef, stepContext, jobExecutionImpl);
+ checkpointProxy = CheckpointAlgorithmFactory.getCheckpointAlgorithmProxy(artifactFactory, step, injectionRef, stepContext, jobExecutionImpl);
}
{
@@ -689,7 +692,7 @@ public class ChunkStepController extends SingleThreadedStepController {
chkptAlg = checkpointProxy;
}
- checkpointManager = new CheckpointManager(readerProxy, writerProxy, chkptAlg, jobExecutionImpl.getJobInstance().getInstanceId(), step.getId());
+ checkpointManager = new CheckpointManager(readerProxy, writerProxy, chkptAlg, jobExecutionImpl.getJobInstance().getInstanceId(), step.getId(), persistenceManagerService);
skipHandler = new SkipHandler(chunk);
skipHandler.addSkipProcessListener(skipProcessListeners);