You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@batchee.apache.org by rm...@apache.org on 2014/01/02 10:27:55 UTC

[2/2] git commit: BATCHEE-9 ServicesManager shouldn't be contextual once an action started

BATCHEE-9 ServicesManager shouldn't be contextual once an action started


Project: http://git-wip-us.apache.org/repos/asf/incubator-batchee/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-batchee/commit/8ddf9f75
Tree: http://git-wip-us.apache.org/repos/asf/incubator-batchee/tree/8ddf9f75
Diff: http://git-wip-us.apache.org/repos/asf/incubator-batchee/diff/8ddf9f75

Branch: refs/heads/master
Commit: 8ddf9f754b731efe18ec48aedcc3cb0aa98814d3
Parents: b303e37
Author: Romain Manni-Bucau <rm...@apache.org>
Authored: Thu Jan 2 10:27:34 2014 +0100
Committer: Romain Manni-Bucau <rm...@apache.org>
Committed: Thu Jan 2 10:27:34 2014 +0100

----------------------------------------------------------------------
 .../container/impl/JobExecutionImpl.java        | 18 ++--
 .../batchee/container/impl/JobOperatorImpl.java | 93 +++++++++++---------
 .../impl/controller/BaseStepController.java     | 42 +++++----
 .../impl/controller/DecisionController.java     | 11 ++-
 .../ExecutionElementControllerFactory.java      | 25 +++---
 .../impl/controller/ExecutionTransitioner.java  | 20 +++--
 .../impl/controller/FlowController.java         |  7 +-
 .../FlowInSplitThreadRootController.java        |  5 +-
 .../impl/controller/JobController.java          |  9 +-
 .../controller/JobThreadRootController.java     | 21 +++--
 .../PartitionThreadRootController.java          |  5 +-
 .../controller/PartitionedStepController.java   | 30 ++++---
 .../SingleThreadedStepController.java           | 14 ++-
 .../impl/controller/SplitController.java        |  6 +-
 .../batchlet/BatchletStepController.java        | 11 ++-
 .../chunk/CheckpointAlgorithmFactory.java       |  5 +-
 .../controller/chunk/CheckpointManager.java     |  5 +-
 .../controller/chunk/ChunkStepController.java   | 45 +++++-----
 .../impl/jobinstance/JobExecutionHelper.java    | 92 ++++++++++---------
 .../RuntimeFlowInSplitExecution.java            |  6 +-
 .../impl/jobinstance/RuntimeJobExecution.java   |  5 +-
 .../container/proxy/ListenerFactory.java        | 54 ++++++------
 .../batchee/container/proxy/ProxyFactory.java   | 46 +++++-----
 .../container/services/ServicesManager.java     | 38 +++-----
 .../services/kernel/DefaultBatchKernel.java     | 37 ++++----
 .../services/locator/ClassLoaderLocator.java    | 15 ++--
 .../persistence/JDBCPersistenceManager.java     |  8 +-
 .../persistence/JPAPersistenceService.java      |  8 +-
 .../persistence/MemoryPersistenceManager.java   |  7 +-
 .../status/DefaultJobStatusManager.java         |  8 +-
 .../util/BatchFlowInSplitWorkUnit.java          | 11 +--
 .../container/util/BatchParallelWorkUnit.java   |  6 +-
 .../container/util/BatchPartitionWorkUnit.java  | 11 +--
 .../batchee/container/util/BatchWorkUnit.java   | 11 +--
 .../batchee/servlet/CleanUpWebappListener.java  |  2 +-
 .../org/apache/batchee/test/jmx/JMXTest.java    |  4 +-
 jbatch/src/test/resources/suites/dev-suite.xml  |  6 +-
 37 files changed, 405 insertions(+), 342 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/8ddf9f75/jbatch/src/main/java/org/apache/batchee/container/impl/JobExecutionImpl.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/JobExecutionImpl.java b/jbatch/src/main/java/org/apache/batchee/container/impl/JobExecutionImpl.java
index 0767168..68ecb5e 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/impl/JobExecutionImpl.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/JobExecutionImpl.java
@@ -19,7 +19,6 @@ package org.apache.batchee.container.impl;
 import org.apache.batchee.container.services.InternalJobExecution;
 import org.apache.batchee.spi.PersistenceManagerService;
 import org.apache.batchee.spi.PersistenceManagerService.TimestampType;
-import org.apache.batchee.container.services.ServicesManager;
 
 import javax.batch.runtime.BatchStatus;
 import java.sql.Timestamp;
@@ -27,7 +26,7 @@ import java.util.Date;
 import java.util.Properties;
 
 public class JobExecutionImpl implements InternalJobExecution {
-    private static final PersistenceManagerService PERSISTENCE_MANAGER_SERVICE = ServicesManager.service(PersistenceManagerService.class);
+    private final PersistenceManagerService persistenceManagerService;
 
     private long executionID = 0L;
     private long instanceID = 0L;
@@ -50,7 +49,8 @@ public class JobExecutionImpl implements InternalJobExecution {
         this.jobContext = jobContext;
     }
 
-    public JobExecutionImpl(long executionId, long instanceId) {
+    public JobExecutionImpl(final long executionId, final long instanceId, final PersistenceManagerService persistenceManagerService) {
+        this.persistenceManagerService = persistenceManagerService;
         this.executionID = executionId;
         this.instanceID = instanceId;
     }
@@ -61,7 +61,7 @@ public class JobExecutionImpl implements InternalJobExecution {
             return this.jobContext.getBatchStatus();
         } else {
             // old job, retrieve from the backend
-            final String name = PERSISTENCE_MANAGER_SERVICE.jobOperatorQueryJobExecutionBatchStatus(executionID);
+            final String name = persistenceManagerService.jobOperatorQueryJobExecutionBatchStatus(executionID);
             if (name != null) {
                 return BatchStatus.valueOf(name);
             }
@@ -71,7 +71,7 @@ public class JobExecutionImpl implements InternalJobExecution {
 
     @Override
     public Date getCreateTime() {
-        final Timestamp ts = PERSISTENCE_MANAGER_SERVICE.jobOperatorQueryJobExecutionTimestamp(executionID, TimestampType.CREATE);
+        final Timestamp ts = persistenceManagerService.jobOperatorQueryJobExecutionTimestamp(executionID, TimestampType.CREATE);
         if (ts != null) {
             createTime = ts;
         }
@@ -84,7 +84,7 @@ public class JobExecutionImpl implements InternalJobExecution {
 
     @Override
     public Date getEndTime() {
-        final Timestamp ts = PERSISTENCE_MANAGER_SERVICE.jobOperatorQueryJobExecutionTimestamp(executionID, TimestampType.END);
+        final Timestamp ts = persistenceManagerService.jobOperatorQueryJobExecutionTimestamp(executionID, TimestampType.END);
         if (ts != null) {
             endTime = ts;
         }
@@ -106,7 +106,7 @@ public class JobExecutionImpl implements InternalJobExecution {
             return this.jobContext.getExitStatus();
         }
 
-        final String persistenceExitStatus = PERSISTENCE_MANAGER_SERVICE.jobOperatorQueryJobExecutionExitStatus(executionID);
+        final String persistenceExitStatus = persistenceManagerService.jobOperatorQueryJobExecutionExitStatus(executionID);
         if (persistenceExitStatus != null) {
             exitStatus = persistenceExitStatus;
         }
@@ -116,7 +116,7 @@ public class JobExecutionImpl implements InternalJobExecution {
 
     @Override
     public Date getLastUpdatedTime() {
-        final Timestamp ts = PERSISTENCE_MANAGER_SERVICE.jobOperatorQueryJobExecutionTimestamp(executionID, TimestampType.LAST_UPDATED);
+        final Timestamp ts = persistenceManagerService.jobOperatorQueryJobExecutionTimestamp(executionID, TimestampType.LAST_UPDATED);
         if (ts != null) {
             this.updateTime = ts;
         }
@@ -129,7 +129,7 @@ public class JobExecutionImpl implements InternalJobExecution {
 
     @Override
     public Date getStartTime() {
-        final Timestamp ts = PERSISTENCE_MANAGER_SERVICE.jobOperatorQueryJobExecutionTimestamp(executionID, TimestampType.STARTED);
+        final Timestamp ts = persistenceManagerService.jobOperatorQueryJobExecutionTimestamp(executionID, TimestampType.STARTED);
         if (ts != null) {
             startTime = ts;
         }

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/8ddf9f75/jbatch/src/main/java/org/apache/batchee/container/impl/JobOperatorImpl.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/JobOperatorImpl.java b/jbatch/src/main/java/org/apache/batchee/container/impl/JobOperatorImpl.java
index c970534..28fab55 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/impl/JobOperatorImpl.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/JobOperatorImpl.java
@@ -101,15 +101,24 @@ public class JobOperatorImpl implements JobOperator {
 
     public static final String JBATCH_ADMIN = "admin";
 
-    private static final BatchKernelService KERNEL_SERVICE = ServicesManager.service(BatchKernelService.class);
-    private static final PersistenceManagerService PERSISTENCE_SERVICE = ServicesManager.service(PersistenceManagerService.class);
-    private static final JobXMLLoaderService XML_LOADER_SERVICE = ServicesManager.service(JobXMLLoaderService.class);
-    private static final JobStatusManagerService STATUS_MANAGER_SERVICE = ServicesManager.service(JobStatusManagerService.class);
-    private static final SecurityService SECURITY_SERVICE= ServicesManager.service(SecurityService.class);
+    private final BatchKernelService kernelService;
+    private final PersistenceManagerService persistenceManagerService;
+    private final JobXMLLoaderService xmlLoaderService;
+    private final JobStatusManagerService statusManagerService;
+    private final SecurityService securityService;
+
+    public JobOperatorImpl() {
+        final ServicesManager servicesManager = ServicesManager.find();
+        kernelService = servicesManager.service(BatchKernelService.class);
+        persistenceManagerService = servicesManager.service(PersistenceManagerService.class);
+        xmlLoaderService = servicesManager.service(JobXMLLoaderService.class);
+        statusManagerService = servicesManager.service(JobStatusManagerService.class);
+        securityService = servicesManager.service(SecurityService.class);
+    }
 
     @Override
     public long start(final String jobXMLName, final Properties jobParameters) throws JobStartException, JobSecurityException {
-        if (!SECURITY_SERVICE.isAuthorized(Permissions.START.name)) {
+        if (!securityService.isAuthorized(Permissions.START.name)) {
             throw new JobSecurityException("The current user is not authorized to perform this operation");
         }
 
@@ -138,18 +147,18 @@ public class JobOperatorImpl implements JobOperator {
             jobParameterWriter.write("Job parameters on start = null");
         }
 
-        final String jobXML = XML_LOADER_SERVICE.loadJSL(jobXMLName);
-        final InternalJobExecution execution = KERNEL_SERVICE.startJob(jobXML, jobParameters);
+        final String jobXML = xmlLoaderService.loadJSL(jobXMLName);
+        final InternalJobExecution execution = kernelService.startJob(jobXML, jobParameters);
         return execution.getExecutionId();
     }
 
     @Override
     public void abandon(final long executionId) throws NoSuchJobExecutionException, JobExecutionIsRunningException, JobSecurityException {
-        if (!SECURITY_SERVICE.isAuthorized(PERSISTENCE_SERVICE.getJobInstanceIdByExecutionId(executionId))) {
+        if (!securityService.isAuthorized(persistenceManagerService.getJobInstanceIdByExecutionId(executionId))) {
             throw new JobSecurityException("The current user is not authorized to perform this operation");
         }
 
-        final InternalJobExecution jobEx = PERSISTENCE_SERVICE.jobOperatorGetJobExecution(executionId);
+        final InternalJobExecution jobEx = persistenceManagerService.jobOperatorGetJobExecution(executionId);
 
         // if it is not in STARTED or STARTING state, mark it as ABANDONED
         if (jobEx.getBatchStatus().equals(BatchStatus.STARTED) || jobEx.getBatchStatus().equals(BatchStatus.STARTING)) {
@@ -157,31 +166,31 @@ public class JobOperatorImpl implements JobOperator {
         }
 
         // update table to reflect ABANDONED state
-        PERSISTENCE_SERVICE.updateBatchStatusOnly(jobEx.getExecutionId(), BatchStatus.ABANDONED, new Timestamp(System.currentTimeMillis()));
+        persistenceManagerService.updateBatchStatusOnly(jobEx.getExecutionId(), BatchStatus.ABANDONED, new Timestamp(System.currentTimeMillis()));
 
         // Don't forget to update JOBSTATUS table
-        STATUS_MANAGER_SERVICE.updateJobBatchStatus(jobEx.getInstanceId(), BatchStatus.ABANDONED);
+        statusManagerService.updateJobBatchStatus(jobEx.getInstanceId(), BatchStatus.ABANDONED);
     }
 
     @Override
     public InternalJobExecution getJobExecution(final long executionId)
         throws NoSuchJobExecutionException, JobSecurityException {
-        if (!SECURITY_SERVICE.isAuthorized(PERSISTENCE_SERVICE.getJobInstanceIdByExecutionId(executionId))) {
+        if (!securityService.isAuthorized(persistenceManagerService.getJobInstanceIdByExecutionId(executionId))) {
             throw new JobSecurityException("The current user is not authorized to perform this operation");
         }
-        return KERNEL_SERVICE.getJobExecution(executionId);
+        return kernelService.getJobExecution(executionId);
     }
 
     @Override
     public List<JobExecution> getJobExecutions(final JobInstance instance)
         throws NoSuchJobInstanceException, JobSecurityException {
-        if (!SECURITY_SERVICE.isAuthorized(instance.getInstanceId())) {
+        if (!securityService.isAuthorized(instance.getInstanceId())) {
             throw new JobSecurityException("The current user is not authorized to perform this operation");
         }
 
         // Mediate between one
         final List<JobExecution> executions = new ArrayList<JobExecution>();
-        List<InternalJobExecution> executionImpls = PERSISTENCE_SERVICE.jobOperatorGetJobExecutions(instance.getInstanceId());
+        List<InternalJobExecution> executionImpls = persistenceManagerService.jobOperatorGetJobExecutions(instance.getInstanceId());
         if (executionImpls.size() == 0) {
             throw new NoSuchJobInstanceException("Job: " + instance.getJobName() + " does not exist");
         }
@@ -194,20 +203,20 @@ public class JobOperatorImpl implements JobOperator {
     @Override
     public JobInstance getJobInstance(long executionId)
         throws NoSuchJobExecutionException, JobSecurityException {
-        if (!SECURITY_SERVICE.isAuthorized(PERSISTENCE_SERVICE.getJobInstanceIdByExecutionId(executionId))) {
+        if (!securityService.isAuthorized(persistenceManagerService.getJobInstanceIdByExecutionId(executionId))) {
             throw new JobSecurityException("The current user is not authorized to perform this operation");
         }
-        return KERNEL_SERVICE.getJobInstance(executionId);
+        return kernelService.getJobInstance(executionId);
     }
 
     @Override
     public int getJobInstanceCount(String jobName) throws NoSuchJobException, JobSecurityException {
         int jobInstanceCount;
-        if (SECURITY_SERVICE.isAuthorized(JBATCH_ADMIN)) {
+        if (securityService.isAuthorized(JBATCH_ADMIN)) {
             // Do an unfiltered query
-            jobInstanceCount = PERSISTENCE_SERVICE.jobOperatorGetJobInstanceCount(jobName);
+            jobInstanceCount = persistenceManagerService.jobOperatorGetJobInstanceCount(jobName);
         } else {
-            jobInstanceCount = PERSISTENCE_SERVICE.jobOperatorGetJobInstanceCount(jobName, SECURITY_SERVICE.getLoggedUser());
+            jobInstanceCount = persistenceManagerService.jobOperatorGetJobInstanceCount(jobName, securityService.getLoggedUser());
         }
 
         if (jobInstanceCount > 0) {
@@ -229,11 +238,11 @@ public class JobOperatorImpl implements JobOperator {
         }
 
         final List<Long> instanceIds;
-        if (SECURITY_SERVICE.isAuthorized(JBATCH_ADMIN)) {
+        if (securityService.isAuthorized(JBATCH_ADMIN)) {
             // Do an unfiltered query
-            instanceIds = PERSISTENCE_SERVICE.jobOperatorGetJobInstanceIds(jobName, start, count);
+            instanceIds = persistenceManagerService.jobOperatorGetJobInstanceIds(jobName, start, count);
         } else {
-            instanceIds = PERSISTENCE_SERVICE.jobOperatorGetJobInstanceIds(jobName, SECURITY_SERVICE.getLoggedUser(), start, count);
+            instanceIds = persistenceManagerService.jobOperatorGetJobInstanceIds(jobName, securityService.getLoggedUser(), start, count);
         }
 
         // get the jobinstance ids associated with this job name
@@ -242,9 +251,9 @@ public class JobOperatorImpl implements JobOperator {
             // for every job instance id
             for (long id : instanceIds) {
                 // get the job instance obj, add it to the list
-                final JobStatus jobStatus = STATUS_MANAGER_SERVICE.getJobStatus(id);
+                final JobStatus jobStatus = statusManagerService.getJobStatus(id);
                 final JobInstance jobInstance = jobStatus.getJobInstance();
-                if (SECURITY_SERVICE.isAuthorized(jobInstance.getInstanceId())) {
+                if (securityService.isAuthorized(jobInstance.getInstanceId())) {
                     jobInstances.add(jobInstance);
                 }
             }
@@ -262,10 +271,10 @@ public class JobOperatorImpl implements JobOperator {
     @Override
     public Set<String> getJobNames() throws JobSecurityException {
         final Set<String> jobNames = new HashSet<String>();
-        final Map<Long, String> data = PERSISTENCE_SERVICE.jobOperatorGetExternalJobInstanceData();
+        final Map<Long, String> data = persistenceManagerService.jobOperatorGetExternalJobInstanceData();
         for (final Map.Entry<Long, String> entry : data.entrySet()) {
             long instanceId = entry.getKey();
-            if (SECURITY_SERVICE.isAuthorized(instanceId)) {
+            if (securityService.isAuthorized(instanceId)) {
                 jobNames.add(entry.getValue());
             }
         }
@@ -274,11 +283,11 @@ public class JobOperatorImpl implements JobOperator {
 
     @Override
     public Properties getParameters(final long executionId) throws NoSuchJobExecutionException, JobSecurityException {
-        final JobInstance requestedJobInstance = KERNEL_SERVICE.getJobInstance(executionId);
-        if (!SECURITY_SERVICE.isAuthorized(requestedJobInstance.getInstanceId())) {
+        final JobInstance requestedJobInstance = kernelService.getJobInstance(executionId);
+        if (!securityService.isAuthorized(requestedJobInstance.getInstanceId())) {
             throw new JobSecurityException("The current user is not authorized to perform this operation");
         }
-        return PERSISTENCE_SERVICE.getParameters(executionId);
+        return persistenceManagerService.getParameters(executionId);
     }
 
 
@@ -287,7 +296,7 @@ public class JobOperatorImpl implements JobOperator {
         final List<Long> jobExecutions = new ArrayList<Long>();
 
         // get the jobexecution ids associated with this job name
-        final Set<Long> executionIds = PERSISTENCE_SERVICE.jobOperatorGetRunningExecutions(jobName);
+        final Set<Long> executionIds = persistenceManagerService.jobOperatorGetRunningExecutions(jobName);
 
         if (executionIds.size() <= 0) {
             throw new NoSuchJobException("Job Name " + jobName + " not found");
@@ -296,9 +305,9 @@ public class JobOperatorImpl implements JobOperator {
         // for every job instance id
         for (final long id : executionIds) {
             try {
-                if (SECURITY_SERVICE.isAuthorized(PERSISTENCE_SERVICE.getJobInstanceIdByExecutionId(id))) {
-                    if (KERNEL_SERVICE.isExecutionRunning(id)) {
-                        final InternalJobExecution jobEx = KERNEL_SERVICE.getJobExecution(id);
+                if (securityService.isAuthorized(persistenceManagerService.getJobInstanceIdByExecutionId(id))) {
+                    if (kernelService.isExecutionRunning(id)) {
+                        final InternalJobExecution jobEx = kernelService.getJobExecution(id);
                         jobExecutions.add(jobEx.getExecutionId());
                     }
                 }
@@ -313,12 +322,12 @@ public class JobOperatorImpl implements JobOperator {
     public List<StepExecution> getStepExecutions(long executionId)
         throws NoSuchJobExecutionException, JobSecurityException {
 
-        final InternalJobExecution jobEx = KERNEL_SERVICE.getJobExecution(executionId);
+        final InternalJobExecution jobEx = kernelService.getJobExecution(executionId);
         if (jobEx == null) {
             throw new NoSuchJobExecutionException("Job Execution: " + executionId + " not found");
         }
-        if (SECURITY_SERVICE.isAuthorized(PERSISTENCE_SERVICE.getJobInstanceIdByExecutionId(executionId))) {
-            return PERSISTENCE_SERVICE.getStepExecutionsForJobExecution(executionId);
+        if (securityService.isAuthorized(persistenceManagerService.getJobInstanceIdByExecutionId(executionId))) {
+            return persistenceManagerService.getStepExecutionsForJobExecution(executionId);
         }
         throw new JobSecurityException("The current user is not authorized to perform this operation");
     }
@@ -345,7 +354,7 @@ public class JobOperatorImpl implements JobOperator {
     }
 
     private long restartInternal(final long oldExecutionId, final Properties restartParameters) throws JobExecutionAlreadyCompleteException, NoSuchJobExecutionException, JobExecutionNotMostRecentException, JobRestartException, JobSecurityException {
-        if (!SECURITY_SERVICE.isAuthorized(PERSISTENCE_SERVICE.getJobInstanceIdByExecutionId(oldExecutionId))) {
+        if (!securityService.isAuthorized(persistenceManagerService.getJobInstanceIdByExecutionId(oldExecutionId))) {
             throw new JobSecurityException("The current user is not authorized to perform this operation");
         }
 
@@ -360,15 +369,15 @@ public class JobOperatorImpl implements JobOperator {
             jobParameterWriter.write("Job parameters on restart = null");
         }
 
-        return KERNEL_SERVICE.restartJob(oldExecutionId, restartParameters).getExecutionId();
+        return kernelService.restartJob(oldExecutionId, restartParameters).getExecutionId();
     }
 
     @Override
     public void stop(final long executionId) throws NoSuchJobExecutionException, JobExecutionNotRunningException, JobSecurityException {
-        if (!SECURITY_SERVICE.isAuthorized(PERSISTENCE_SERVICE.getJobInstanceIdByExecutionId(executionId))) {
+        if (!securityService.isAuthorized(persistenceManagerService.getJobInstanceIdByExecutionId(executionId))) {
             throw new JobSecurityException("The current user is not authorized to perform this operation");
         }
 
-        KERNEL_SERVICE.stopJob(executionId);
+        kernelService.stopJob(executionId);
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/8ddf9f75/jbatch/src/main/java/org/apache/batchee/container/impl/controller/BaseStepController.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/BaseStepController.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/BaseStepController.java
index 044df60..63636eb 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/BaseStepController.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/BaseStepController.java
@@ -73,13 +73,15 @@ public abstract class BaseStepController implements ExecutionElementController {
 
     protected long rootJobExecutionId;
 
-    protected static final BatchKernelService BATCH_KERNEL = ServicesManager.service(BatchKernelService.class);
-    private static final PersistenceManagerService PERSISTENCE_MANAGER_SERVICE = ServicesManager.service(PersistenceManagerService.class);
-    private static final JobStatusManagerService JOB_STATUS_MANAGER_SERVICE = ServicesManager.service(JobStatusManagerService.class);
+    protected final BatchKernelService kernelService;
+    private final PersistenceManagerService persistenceManagerService;
+    private final JobStatusManagerService statusManagerService;
 
     protected TransactionManagerAdapter transactionManager = null;
+    private TransactionManagementService txService;
 
-    protected BaseStepController(final RuntimeJobExecution jobExecution, final Step step, final StepContextImpl stepContext, final long rootJobExecutionId) {
+    protected BaseStepController(final RuntimeJobExecution jobExecution, final Step step, final StepContextImpl stepContext, final long rootJobExecutionId,
+                                 final ServicesManager servicesManager) {
         this.jobExecutionImpl = jobExecution;
         this.jobInstance = jobExecution.getJobInstance();
         this.stepContext = stepContext;
@@ -88,13 +90,19 @@ public abstract class BaseStepController implements ExecutionElementController {
             throw new IllegalArgumentException("Step parameter to ctor cannot be null.");
         }
         this.step = step;
+
+        this.txService = servicesManager.service(TransactionManagementService.class);
+        this.kernelService = servicesManager.service(BatchKernelService.class);
+        this.persistenceManagerService = servicesManager.service(PersistenceManagerService.class);
+        this.statusManagerService = servicesManager.service(JobStatusManagerService.class);
     }
 
     protected BaseStepController(final RuntimeJobExecution jobExecution,
                                  final Step step, final StepContextImpl stepContext,
                                  final long rootJobExecutionId,
-                                 final BlockingQueue<PartitionDataWrapper> analyzerStatusQueue) {
-        this(jobExecution, step, stepContext, rootJobExecutionId);
+                                 final BlockingQueue<PartitionDataWrapper> analyzerStatusQueue,
+                                 final ServicesManager servicesManager) {
+        this(jobExecution, step, stepContext, rootJobExecutionId, servicesManager);
         this.analyzerStatusQueue = analyzerStatusQueue;
     }
 
@@ -239,7 +247,7 @@ public abstract class BaseStepController implements ExecutionElementController {
         Timestamp startTS = new Timestamp(time);
         stepContext.setStartTime(startTS);
 
-        PERSISTENCE_MANAGER_SERVICE.updateStepExecution(rootJobExecutionId, stepContext);
+        persistenceManagerService.updateStepExecution(rootJobExecutionId, stepContext);
     }
 
 
@@ -262,17 +270,17 @@ public abstract class BaseStepController implements ExecutionElementController {
 
     protected void updateBatchStatus(final BatchStatus updatedBatchStatus) {
         stepStatus.setBatchStatus(updatedBatchStatus);
-        JOB_STATUS_MANAGER_SERVICE.updateStepStatus(stepStatus.getStepExecutionId(), stepStatus);
+        statusManagerService.updateStepStatus(stepStatus.getStepExecutionId(), stepStatus);
         stepContext.setBatchStatus(updatedBatchStatus);
     }
 
     protected boolean shouldStepBeExecuted() {
-        this.stepStatus = JOB_STATUS_MANAGER_SERVICE.getStepStatus(jobInstance.getInstanceId(), step.getId());
+        this.stepStatus = statusManagerService.getStepStatus(jobInstance.getInstanceId(), step.getId());
         if (stepStatus == null) {
             // create new step execution
             final StepExecutionImpl stepExecution = getNewStepExecution(rootJobExecutionId, stepContext);
             // create new step status for this run
-            stepStatus = JOB_STATUS_MANAGER_SERVICE.createStepStatus(stepExecution.getStepExecutionId());
+            stepStatus = statusManagerService.createStepStatus(stepExecution.getStepExecutionId());
             stepContext.setStepExecutionId(stepExecution.getStepExecutionId());
             return true;
         } else {
@@ -332,8 +340,8 @@ public abstract class BaseStepController implements ExecutionElementController {
 
     protected void statusStarting() {
         stepStatus.setBatchStatus(BatchStatus.STARTING);
-        JOB_STATUS_MANAGER_SERVICE.updateJobCurrentStep(jobInstance.getInstanceId(), step.getId());
-        JOB_STATUS_MANAGER_SERVICE.updateStepStatus(stepStatus.getStepExecutionId(), stepStatus);
+        statusManagerService.updateJobCurrentStep(jobInstance.getInstanceId(), step.getId());
+        statusManagerService.updateStepStatus(stepStatus.getStepExecutionId(), stepStatus);
         stepContext.setBatchStatus(BatchStatus.STARTING);
     }
 
@@ -350,23 +358,23 @@ public abstract class BaseStepController implements ExecutionElementController {
         }
 
         stepStatus.setPersistentUserData(new PersistentDataWrapper(persistentBAOS.toByteArray()));
-        JOB_STATUS_MANAGER_SERVICE.updateStepStatus(stepStatus.getStepExecutionId(), stepStatus);
+        statusManagerService.updateStepStatus(stepStatus.getStepExecutionId(), stepStatus);
     }
 
     protected void persistExitStatusAndEndTimestamp() {
         stepStatus.setExitStatus(stepContext.getExitStatus());
-        JOB_STATUS_MANAGER_SERVICE.updateStepStatus(stepStatus.getStepExecutionId(), stepStatus);
+        statusManagerService.updateStepStatus(stepStatus.getStepExecutionId(), stepStatus);
 
         // set the end time metric before flushing
         long time = System.currentTimeMillis();
         Timestamp endTS = new Timestamp(time);
         stepContext.setEndTime(endTS);
 
-        PERSISTENCE_MANAGER_SERVICE.updateStepExecution(rootJobExecutionId, stepContext);
+        persistenceManagerService.updateStepExecution(rootJobExecutionId, stepContext);
     }
 
     private StepExecutionImpl getNewStepExecution(long rootJobExecutionId, StepContextImpl stepContext) {
-        return PERSISTENCE_MANAGER_SERVICE.createStepExecution(rootJobExecutionId, stepContext);
+        return persistenceManagerService.createStepExecution(rootJobExecutionId, stepContext);
     }
 
     private void setContextProperties() {
@@ -389,7 +397,7 @@ public abstract class BaseStepController implements ExecutionElementController {
         stepContext.addMetric(MetricImpl.MetricType.COMMIT_COUNT, 0);
         stepContext.addMetric(MetricImpl.MetricType.ROLLBACK_COUNT, 0);
 
-        transactionManager = ServicesManager.service(TransactionManagementService.class).getTransactionManager(stepContext);
+        transactionManager = txService.getTransactionManager(stepContext);
     }
 
     public void setStepContext(final StepContextImpl stepContext) {

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/8ddf9f75/jbatch/src/main/java/org/apache/batchee/container/impl/controller/DecisionController.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/DecisionController.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/DecisionController.java
index 498b596..bbd6c93 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/DecisionController.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/DecisionController.java
@@ -23,12 +23,13 @@ import org.apache.batchee.container.jsl.ExecutionElement;
 import org.apache.batchee.container.proxy.DeciderProxy;
 import org.apache.batchee.container.proxy.InjectionReferences;
 import org.apache.batchee.container.proxy.ProxyFactory;
-import org.apache.batchee.spi.PersistenceManagerService;
 import org.apache.batchee.container.services.ServicesManager;
 import org.apache.batchee.container.status.ExecutionStatus;
 import org.apache.batchee.container.status.ExtendedBatchStatus;
 import org.apache.batchee.jaxb.Decision;
 import org.apache.batchee.jaxb.Property;
+import org.apache.batchee.spi.BatchArtifactFactory;
+import org.apache.batchee.spi.PersistenceManagerService;
 
 import javax.batch.runtime.StepExecution;
 import java.util.List;
@@ -41,11 +42,13 @@ public class DecisionController implements ExecutionElementController {
     private StepExecution[] previousStepExecutions = null;
 
     private final PersistenceManagerService persistenceService;
+    private final BatchArtifactFactory factory;
 
-    public DecisionController(final RuntimeJobExecution jobExecution, final Decision decision) {
+    public DecisionController(final RuntimeJobExecution jobExecution, final Decision decision, final ServicesManager manager) {
         this.jobExecution = jobExecution;
         this.decision = decision;
-        this.persistenceService = ServicesManager.service(PersistenceManagerService.class);
+        this.persistenceService = manager.service(PersistenceManagerService.class);
+        this.factory = manager.service(BatchArtifactFactory.class);
     }
 
     @Override
@@ -61,7 +64,7 @@ public class DecisionController implements ExecutionElementController {
         //so two of these contexts will always be null
         final InjectionReferences injectionRef = new InjectionReferences(jobExecution.getJobContext(), null, propList);
 
-        final DeciderProxy deciderProxy = ProxyFactory.createDeciderProxy(deciderId, injectionRef, jobExecution);
+        final DeciderProxy deciderProxy = ProxyFactory.createDeciderProxy(factory, deciderId, injectionRef, jobExecution);
 
         final String exitStatus = deciderProxy.decide(this.previousStepExecutions);
 

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/8ddf9f75/jbatch/src/main/java/org/apache/batchee/container/impl/controller/ExecutionElementControllerFactory.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/ExecutionElementControllerFactory.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/ExecutionElementControllerFactory.java
index 631ea3a..1492875 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/ExecutionElementControllerFactory.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/ExecutionElementControllerFactory.java
@@ -20,6 +20,8 @@ import org.apache.batchee.container.impl.StepContextImpl;
 import org.apache.batchee.container.impl.controller.batchlet.BatchletStepController;
 import org.apache.batchee.container.impl.controller.chunk.ChunkStepController;
 import org.apache.batchee.container.impl.jobinstance.RuntimeJobExecution;
+import org.apache.batchee.container.services.BatchKernelService;
+import org.apache.batchee.container.services.ServicesManager;
 import org.apache.batchee.container.util.PartitionDataWrapper;
 import org.apache.batchee.jaxb.Batchlet;
 import org.apache.batchee.jaxb.Chunk;
@@ -34,17 +36,18 @@ import java.util.concurrent.BlockingQueue;
 public class ExecutionElementControllerFactory {
     public static BaseStepController getStepController(final RuntimeJobExecution jobExecutionImpl, final Step step,
                                                            final StepContextImpl stepContext, final long rootJobExecutionId,
-                                                           final BlockingQueue<PartitionDataWrapper> analyzerQueue) {
+                                                           final BlockingQueue<PartitionDataWrapper> analyzerQueue,
+                                                           final ServicesManager servicesManager) {
         final Partition partition = step.getPartition();
         if (partition != null) {
 
             if (partition.getMapper() != null) {
-                return new PartitionedStepController(jobExecutionImpl, step, stepContext, rootJobExecutionId);
+                return new PartitionedStepController(jobExecutionImpl, step, stepContext, rootJobExecutionId, servicesManager);
             }
 
             if (partition.getPlan() != null) {
                 if (partition.getPlan().getPartitions() != null) {
-                    return new PartitionedStepController(jobExecutionImpl, step, stepContext, rootJobExecutionId);
+                    return new PartitionedStepController(jobExecutionImpl, step, stepContext, rootJobExecutionId, servicesManager);
                 }
             }
         }
@@ -54,26 +57,26 @@ public class ExecutionElementControllerFactory {
             if (step.getChunk() != null) {
                 throw new IllegalArgumentException("Step contains both a batchlet and a chunk.  Aborting.");
             }
-            return new BatchletStepController(jobExecutionImpl, step, stepContext, rootJobExecutionId, analyzerQueue);
+            return new BatchletStepController(jobExecutionImpl, step, stepContext, rootJobExecutionId, analyzerQueue, servicesManager);
         } else {
             final Chunk chunk = step.getChunk();
             if (chunk == null) {
                 throw new IllegalArgumentException("Step does not contain either a batchlet or a chunk.  Aborting.");
             }
-            return new ChunkStepController(jobExecutionImpl, step, stepContext, rootJobExecutionId, analyzerQueue);
+            return new ChunkStepController(jobExecutionImpl, step, stepContext, rootJobExecutionId, analyzerQueue, servicesManager);
         }
     }
 
-    public static DecisionController getDecisionController(RuntimeJobExecution jobExecutionImpl, Decision decision) {
-        return new DecisionController(jobExecutionImpl, decision);
+    public static DecisionController getDecisionController(final ServicesManager servicesManager, final RuntimeJobExecution jobExecutionImpl, final Decision decision) {
+        return new DecisionController(jobExecutionImpl, decision, servicesManager);
     }
 
-    public static FlowController getFlowController(RuntimeJobExecution jobExecutionImpl, Flow flow, long rootJobExecutionId) {
-        return new FlowController(jobExecutionImpl, flow, rootJobExecutionId);
+    public static FlowController getFlowController(final ServicesManager servicesManager, final RuntimeJobExecution jobExecutionImpl, final Flow flow, final long rootJobExecutionId) {
+        return new FlowController(jobExecutionImpl, flow, rootJobExecutionId, servicesManager);
     }
 
-    public static SplitController getSplitController(RuntimeJobExecution jobExecutionImpl, Split split, long rootJobExecutionId) {
-        return new SplitController(jobExecutionImpl, split, rootJobExecutionId);
+    public static SplitController getSplitController(final BatchKernelService kernel, final RuntimeJobExecution jobExecutionImpl, final Split split, final long rootJobExecutionId) {
+        return new SplitController(jobExecutionImpl, split, rootJobExecutionId, kernel);
     }
 
     private ExecutionElementControllerFactory() {

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/8ddf9f75/jbatch/src/main/java/org/apache/batchee/container/impl/controller/ExecutionTransitioner.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/ExecutionTransitioner.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/ExecutionTransitioner.java
index 15651c5..faf7669 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/ExecutionTransitioner.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/ExecutionTransitioner.java
@@ -27,6 +27,8 @@ import org.apache.batchee.container.jsl.IllegalTransitionException;
 import org.apache.batchee.container.jsl.Transition;
 import org.apache.batchee.container.jsl.TransitionElement;
 import org.apache.batchee.container.navigator.ModelNavigator;
+import org.apache.batchee.container.services.BatchKernelService;
+import org.apache.batchee.container.services.ServicesManager;
 import org.apache.batchee.container.status.ExecutionStatus;
 import org.apache.batchee.container.status.ExtendedBatchStatus;
 import org.apache.batchee.container.util.PartitionDataWrapper;
@@ -44,6 +46,7 @@ import java.util.List;
 import java.util.concurrent.BlockingQueue;
 
 public class ExecutionTransitioner {
+    private final ServicesManager manager;
     private RuntimeJobExecution jobExecution;
     private long rootJobExecutionId;
     private ModelNavigator<?> modelNavigator;
@@ -60,19 +63,24 @@ public class ExecutionTransitioner {
 
     private List<Long> stepExecIds;
 
-    public ExecutionTransitioner(RuntimeJobExecution jobExecution, long rootJobExecutionId, ModelNavigator<?> modelNavigator) {
+    public ExecutionTransitioner(final RuntimeJobExecution jobExecution, final long rootJobExecutionId, final ModelNavigator<?> modelNavigator,
+                                 final ServicesManager servicesManager) {
         this.jobExecution = jobExecution;
         this.rootJobExecutionId = rootJobExecutionId;
         this.modelNavigator = modelNavigator;
         this.jobContext = jobExecution.getJobContext();
+        this.manager = servicesManager;
     }
 
-    public ExecutionTransitioner(RuntimeJobExecution jobExecution, long rootJobExecutionId, ModelNavigator<JSLJob> jobNavigator, BlockingQueue<PartitionDataWrapper> analyzerQueue) {
+    public ExecutionTransitioner(final RuntimeJobExecution jobExecution, final long rootJobExecutionId,
+                                 final ModelNavigator<JSLJob> jobNavigator, final BlockingQueue<PartitionDataWrapper> analyzerQueue,
+                                 final ServicesManager manager) {
         this.jobExecution = jobExecution;
         this.rootJobExecutionId = rootJobExecutionId;
         this.modelNavigator = jobNavigator;
         this.jobContext = jobExecution.getJobContext();
         this.analyzerQueue = analyzerQueue;
+        this.manager = manager;
     }
 
     public ExecutionStatus doExecutionLoop() {
@@ -163,19 +171,19 @@ public class ExecutionTransitioner {
 
         if (currentExecutionElement instanceof Decision) {
             final Decision decision = (Decision) currentExecutionElement;
-            elementController = ExecutionElementControllerFactory.getDecisionController(jobExecution, decision);
+            elementController = ExecutionElementControllerFactory.getDecisionController(manager, jobExecution, decision);
             final DecisionController decisionController = (DecisionController) elementController;
             decisionController.setPreviousStepExecutions(previousExecutionElement, previousElementController);
         } else if (currentExecutionElement instanceof Flow) {
             final Flow flow = (Flow) currentExecutionElement;
-            elementController = ExecutionElementControllerFactory.getFlowController(jobExecution, flow, rootJobExecutionId);
+            elementController = ExecutionElementControllerFactory.getFlowController(manager, jobExecution, flow, rootJobExecutionId);
         } else if (currentExecutionElement instanceof Split) {
             final Split split = (Split) currentExecutionElement;
-            elementController = ExecutionElementControllerFactory.getSplitController(jobExecution, split, rootJobExecutionId);
+            elementController = ExecutionElementControllerFactory.getSplitController(manager.service(BatchKernelService.class), jobExecution, split, rootJobExecutionId);
         } else if (currentExecutionElement instanceof Step) {
             final Step step = (Step) currentExecutionElement;
             final StepContextImpl stepContext = new StepContextImpl(step.getId());
-            elementController = ExecutionElementControllerFactory.getStepController(jobExecution, step, stepContext, rootJobExecutionId, analyzerQueue);
+            elementController = ExecutionElementControllerFactory.getStepController(jobExecution, step, stepContext, rootJobExecutionId, analyzerQueue, manager);
         } else {
             elementController = null;
         }

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/8ddf9f75/jbatch/src/main/java/org/apache/batchee/container/impl/controller/FlowController.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/FlowController.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/FlowController.java
index db0d4c6..a4b6fda 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/FlowController.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/FlowController.java
@@ -22,6 +22,7 @@ import org.apache.batchee.container.impl.JobContextImpl;
 import org.apache.batchee.container.impl.jobinstance.RuntimeJobExecution;
 import org.apache.batchee.container.navigator.ModelNavigator;
 import org.apache.batchee.container.navigator.NavigatorFactory;
+import org.apache.batchee.container.services.ServicesManager;
 import org.apache.batchee.container.status.ExecutionStatus;
 import org.apache.batchee.container.status.ExtendedBatchStatus;
 import org.apache.batchee.jaxb.Flow;
@@ -32,6 +33,7 @@ import java.util.List;
 public class FlowController implements ExecutionElementController {
     private final RuntimeJobExecution jobExecution;
     private final JobContextImpl jobContext;
+    private final ServicesManager manager;
 
     protected ModelNavigator<Flow> flowNavigator;
 
@@ -40,18 +42,19 @@ public class FlowController implements ExecutionElementController {
 
     private ExecutionTransitioner transitioner;
 
-    public FlowController(final RuntimeJobExecution jobExecution, final Flow flow, final long rootJobExecutionId) {
+    public FlowController(final RuntimeJobExecution jobExecution, final Flow flow, final long rootJobExecutionId, final ServicesManager manager) {
         this.jobExecution = jobExecution;
         this.jobContext = jobExecution.getJobContext();
         this.flowNavigator = NavigatorFactory.createFlowNavigator(flow);
         this.flow = flow;
         this.rootJobExecutionId = rootJobExecutionId;
+        this.manager = manager;
     }
 
     @Override
     public ExecutionStatus execute() {
         if (!jobContext.getBatchStatus().equals(BatchStatus.STOPPING)) {
-            transitioner = new ExecutionTransitioner(jobExecution, rootJobExecutionId, flowNavigator);
+            transitioner = new ExecutionTransitioner(jobExecution, rootJobExecutionId, flowNavigator, manager);
             return transitioner.doExecutionLoop();
         }
         return new ExecutionStatus(ExtendedBatchStatus.JOB_OPERATOR_STOPPING);

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/8ddf9f75/jbatch/src/main/java/org/apache/batchee/container/impl/controller/FlowInSplitThreadRootController.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/FlowInSplitThreadRootController.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/FlowInSplitThreadRootController.java
index 93f6160..ea6f249 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/FlowInSplitThreadRootController.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/FlowInSplitThreadRootController.java
@@ -17,6 +17,7 @@
 package org.apache.batchee.container.impl.controller;
 
 import org.apache.batchee.container.impl.jobinstance.RuntimeFlowInSplitExecution;
+import org.apache.batchee.container.services.ServicesManager;
 import org.apache.batchee.container.status.ExecutionStatus;
 import org.apache.batchee.container.status.ExtendedBatchStatus;
 import org.apache.batchee.container.util.FlowInSplitBuilderConfig;
@@ -25,8 +26,8 @@ public class FlowInSplitThreadRootController extends JobThreadRootController {
     // Careful, we have a separately named reference to the same object in the parent class
     private RuntimeFlowInSplitExecution flowInSplitExecution;
 
-    public FlowInSplitThreadRootController(final RuntimeFlowInSplitExecution flowInSplitExecution, final FlowInSplitBuilderConfig config) {
-        super(flowInSplitExecution, config.getRootJobExecutionId());
+    public FlowInSplitThreadRootController(final RuntimeFlowInSplitExecution flowInSplitExecution, final FlowInSplitBuilderConfig config, final ServicesManager manager) {
+        super(flowInSplitExecution, config.getRootJobExecutionId(), manager);
         this.flowInSplitExecution = flowInSplitExecution;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/8ddf9f75/jbatch/src/main/java/org/apache/batchee/container/impl/controller/JobController.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/JobController.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/JobController.java
index 3446ba1..d671eb5 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/JobController.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/JobController.java
@@ -17,13 +17,14 @@
 package org.apache.batchee.container.impl.controller;
 
 import org.apache.batchee.container.impl.jobinstance.RuntimeJobExecution;
+import org.apache.batchee.container.services.ServicesManager;
 
 public class JobController extends JobThreadRootController {
-    private JobController(final RuntimeJobExecution jobExecution, final long rootJobExecutionId) {
-        super(jobExecution, rootJobExecutionId);
+    private JobController(final RuntimeJobExecution jobExecution, final long rootJobExecutionId, final ServicesManager manager) {
+        super(jobExecution, rootJobExecutionId, manager);
     }
 
-    public JobController(final RuntimeJobExecution jobExecution) {
-        this(jobExecution, jobExecution.getExecutionId());
+    public JobController(final RuntimeJobExecution jobExecution, final ServicesManager manager) {
+        this(jobExecution, jobExecution.getExecutionId(), manager);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/8ddf9f75/jbatch/src/main/java/org/apache/batchee/container/impl/controller/JobThreadRootController.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/JobThreadRootController.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/JobThreadRootController.java
index 1dd8c3b..feb3f1c 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/JobThreadRootController.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/JobThreadRootController.java
@@ -25,12 +25,13 @@ import org.apache.batchee.container.proxy.InjectionReferences;
 import org.apache.batchee.container.proxy.JobListenerProxy;
 import org.apache.batchee.container.proxy.ListenerFactory;
 import org.apache.batchee.container.services.JobStatusManagerService;
-import org.apache.batchee.spi.PersistenceManagerService;
 import org.apache.batchee.container.services.ServicesManager;
 import org.apache.batchee.container.status.ExecutionStatus;
 import org.apache.batchee.container.status.ExtendedBatchStatus;
 import org.apache.batchee.container.util.PartitionDataWrapper;
 import org.apache.batchee.jaxb.JSLJob;
+import org.apache.batchee.spi.BatchArtifactFactory;
+import org.apache.batchee.spi.PersistenceManagerService;
 
 import javax.batch.runtime.BatchStatus;
 import java.io.PrintWriter;
@@ -52,22 +53,25 @@ public abstract class JobThreadRootController implements ThreadRootController {
     protected final ModelNavigator<JSLJob> jobNavigator;
     protected final JobStatusManagerService jobStatusService;
     protected final PersistenceManagerService persistenceService;
+    protected final ServicesManager manager;
 
     private ExecutionTransitioner transitioner;
     private BlockingQueue<PartitionDataWrapper> analyzerQueue;
 
-    public JobThreadRootController(final RuntimeJobExecution jobExecution, final long rootJobExecutionId) {
+    public JobThreadRootController(final RuntimeJobExecution jobExecution, final long rootJobExecutionId,
+                                   final ServicesManager servicesManager) {
         this.jobExecution = jobExecution;
         this.jobContext = jobExecution.getJobContext();
         this.rootJobExecutionId = rootJobExecutionId;
         this.jobInstanceId = jobExecution.getInstanceId();
-        this.jobStatusService = ServicesManager.service(JobStatusManagerService.class);
-        this.persistenceService = ServicesManager.service(PersistenceManagerService.class);
+        this.jobStatusService = servicesManager.service(JobStatusManagerService.class);
+        this.persistenceService = servicesManager.service(PersistenceManagerService.class);
         this.jobNavigator = jobExecution.getJobNavigator();
+        this.manager = servicesManager;
 
         final JSLJob jobModel = jobExecution.getJobNavigator().getRootModelElement();
         final InjectionReferences injectionRef = new InjectionReferences(jobContext, null, null);
-        listenerFactory = new ListenerFactory(jobModel, injectionRef, jobExecution);
+        listenerFactory = new ListenerFactory(servicesManager.service(BatchArtifactFactory.class), jobModel, injectionRef, jobExecution);
         jobExecution.setListenerFactory(listenerFactory);
     }
 
@@ -75,8 +79,9 @@ public abstract class JobThreadRootController implements ThreadRootController {
      * By not passing the rootJobExecutionId, we are "orphaning" the subjob execution and making it not findable from the parent.
      * This is exactly what we want for getStepExecutions()... we don't want it to get extraneous entries for the partitions.
      */
-    public JobThreadRootController(final RuntimeJobExecution jobExecution, final BlockingQueue<PartitionDataWrapper> analyzerQueue) {
-        this(jobExecution, jobExecution.getExecutionId());
+    public JobThreadRootController(final RuntimeJobExecution jobExecution, final BlockingQueue<PartitionDataWrapper> analyzerQueue,
+                                   final ServicesManager servicesManager) {
+        this(jobExecution, jobExecution.getExecutionId(), servicesManager);
         this.analyzerQueue = analyzerQueue;
     }
 
@@ -96,7 +101,7 @@ public abstract class JobThreadRootController implements ThreadRootController {
                 // The BIG loop transitioning
                 // within the job !!!
                 // --------------------
-                transitioner = new ExecutionTransitioner(jobExecution, rootJobExecutionId, jobNavigator, analyzerQueue);
+                transitioner = new ExecutionTransitioner(jobExecution, rootJobExecutionId, jobNavigator, analyzerQueue, manager);
                 retVal = transitioner.doExecutionLoop();
                 ExtendedBatchStatus extBatchStatus = retVal.getExtendedBatchStatus();
                 switch (extBatchStatus) {

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/8ddf9f75/jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionThreadRootController.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionThreadRootController.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionThreadRootController.java
index 920ca99..8cd3593 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionThreadRootController.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionThreadRootController.java
@@ -17,14 +17,15 @@
 package org.apache.batchee.container.impl.controller;
 
 import org.apache.batchee.container.impl.jobinstance.RuntimeJobExecution;
+import org.apache.batchee.container.services.ServicesManager;
 import org.apache.batchee.container.util.PartitionsBuilderConfig;
 
 /**
  * Currently there's no special function on top of the subjob required of the partition.
  */
 public class PartitionThreadRootController extends JobThreadRootController {
-    public PartitionThreadRootController(RuntimeJobExecution jobExecution, PartitionsBuilderConfig config) {
-        super(jobExecution, config.getAnalyzerQueue());
+    public PartitionThreadRootController(final RuntimeJobExecution jobExecution, final PartitionsBuilderConfig config, final ServicesManager servicesManager) {
+        super(jobExecution, config.getAnalyzerQueue(), servicesManager);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/8ddf9f75/jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionedStepController.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionedStepController.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionedStepController.java
index 9797233..01444ef 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionedStepController.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionedStepController.java
@@ -26,6 +26,7 @@ import org.apache.batchee.container.proxy.PartitionMapperProxy;
 import org.apache.batchee.container.proxy.PartitionReducerProxy;
 import org.apache.batchee.container.proxy.ProxyFactory;
 import org.apache.batchee.container.proxy.StepListenerProxy;
+import org.apache.batchee.container.services.ServicesManager;
 import org.apache.batchee.container.util.BatchPartitionPlan;
 import org.apache.batchee.container.util.BatchPartitionWorkUnit;
 import org.apache.batchee.container.util.BatchWorkUnit;
@@ -39,6 +40,7 @@ import org.apache.batchee.jaxb.PartitionMapper;
 import org.apache.batchee.jaxb.PartitionReducer;
 import org.apache.batchee.jaxb.Property;
 import org.apache.batchee.jaxb.Step;
+import org.apache.batchee.spi.BatchArtifactFactory;
 
 import javax.batch.api.partition.PartitionPlan;
 import javax.batch.api.partition.PartitionReducer.PartitionStatus;
@@ -80,8 +82,12 @@ public class PartitionedStepController extends BaseStepController {
 
     BlockingQueue<BatchPartitionWorkUnit> completedWorkQueue = null;
 
-    protected PartitionedStepController(final RuntimeJobExecution jobExecutionImpl, final Step step, StepContextImpl stepContext, long rootJobExecutionId) {
-        super(jobExecutionImpl, step, stepContext, rootJobExecutionId);
+    private final BatchArtifactFactory factory;
+
+    protected PartitionedStepController(final RuntimeJobExecution jobExecutionImpl, final Step step, final StepContextImpl stepContext,
+                                        final long rootJobExecutionId, final ServicesManager servicesManager) {
+        super(jobExecutionImpl, step, stepContext, rootJobExecutionId, servicesManager);
+        factory = servicesManager.service(BatchArtifactFactory.class);
     }
 
     @Override
@@ -96,7 +102,7 @@ public class PartitionedStepController extends BaseStepController {
             if (parallelBatchWorkUnits != null) {
                 for (BatchWorkUnit subJob : parallelBatchWorkUnits) {
                     try {
-                        BATCH_KERNEL.stopJob(subJob.getJobExecutionImpl().getExecutionId());
+                        kernelService.stopJob(subJob.getJobExecutionImpl().getExecutionId());
                     } catch (Exception e) {
                         // TODO - Is this what we want to know.
                         // Blow up if it happens to force the issue.
@@ -128,7 +134,7 @@ public class PartitionedStepController extends BaseStepController {
             // Set all the contexts associated with this controller.
             // Some of them may be null
             final InjectionReferences injectionRef = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext, propertyList);
-            final PartitionMapperProxy partitionMapperProxy = ProxyFactory.createPartitionMapperProxy(partitionMapper.getRef(), injectionRef, stepContext, jobExecutionImpl);
+            final PartitionMapperProxy partitionMapperProxy = ProxyFactory.createPartitionMapperProxy(factory, partitionMapper.getRef(), injectionRef, stepContext, jobExecutionImpl);
 
 
             final PartitionPlan mapperPlan = partitionMapperProxy.mapPartitions();
@@ -280,9 +286,9 @@ public class PartitionedStepController extends BaseStepController {
             PartitionsBuilderConfig config = new PartitionsBuilderConfig(subJobs, partitionProperties, analyzerStatusQueue, completedWorkQueue, jobExecutionImpl.getExecutionId());
             // Then build all the subjobs but do not start them yet
             if (stepStatus.getStartCount() > 1 && !plan.getPartitionsOverride()) {
-                parallelBatchWorkUnits = BATCH_KERNEL.buildOnRestartParallelPartitions(config);
+                parallelBatchWorkUnits = kernelService.buildOnRestartParallelPartitions(config);
             } else {
-                parallelBatchWorkUnits = BATCH_KERNEL.buildNewParallelPartitions(config);
+                parallelBatchWorkUnits = kernelService.buildNewParallelPartitions(config);
             }
 
             // NOTE:  At this point I might not have as many work units as I had partitions, since some may have already completed.
@@ -304,9 +310,9 @@ public class PartitionedStepController extends BaseStepController {
         for (int i = 0; i < this.threads && i < numTotalForThisExecution; i++, numCurrentSubmitted++) {
             final BatchWorkUnit workUnit = parallelBatchWorkUnits.get(i);
             if (stepStatus.getStartCount() > 1 && !plan.getPartitionsOverride()) {
-                BATCH_KERNEL.restartGeneratedJob(workUnit);
+                kernelService.restartGeneratedJob(workUnit);
             } else {
-                BATCH_KERNEL.startGeneratedJob(workUnit);
+                kernelService.startGeneratedJob(workUnit);
             }
         }
 
@@ -336,9 +342,9 @@ public class PartitionedStepController extends BaseStepController {
             if (numCurrentCompleted < numTotalForThisExecution) {
                 if (numCurrentSubmitted < numTotalForThisExecution) {
                     if (stepStatus.getStartCount() > 1) {
-                        BATCH_KERNEL.startGeneratedJob(parallelBatchWorkUnits.get(numCurrentSubmitted++));
+                        kernelService.startGeneratedJob(parallelBatchWorkUnits.get(numCurrentSubmitted++));
                     } else {
-                        BATCH_KERNEL.restartGeneratedJob(parallelBatchWorkUnits.get(numCurrentSubmitted++));
+                        kernelService.restartGeneratedJob(parallelBatchWorkUnits.get(numCurrentSubmitted++));
                     }
                 }
             } else {
@@ -388,14 +394,14 @@ public class PartitionedStepController extends BaseStepController {
         if (analyzer != null) {
             final List<Property> propList = analyzer.getProperties() == null ? null : analyzer.getProperties().getPropertyList();
             injectionRef = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext, propList);
-            analyzerProxy = ProxyFactory.createPartitionAnalyzerProxy(analyzer.getRef(), injectionRef, stepContext, jobExecutionImpl);
+            analyzerProxy = ProxyFactory.createPartitionAnalyzerProxy(factory, analyzer.getRef(), injectionRef, stepContext, jobExecutionImpl);
         }
 
         final PartitionReducer partitionReducer = step.getPartition().getReducer();
         if (partitionReducer != null) {
             final List<Property> propList = partitionReducer.getProperties() == null ? null : partitionReducer.getProperties().getPropertyList();
             injectionRef = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext, propList);
-            partitionReducerProxy = ProxyFactory.createPartitionReducerProxy(partitionReducer.getRef(), injectionRef, stepContext, jobExecutionImpl);
+            partitionReducerProxy = ProxyFactory.createPartitionReducerProxy(factory, partitionReducer.getRef(), injectionRef, stepContext, jobExecutionImpl);
         }
 
     }

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/8ddf9f75/jbatch/src/main/java/org/apache/batchee/container/impl/controller/SingleThreadedStepController.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/SingleThreadedStepController.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/SingleThreadedStepController.java
index d085622..94738c0 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/SingleThreadedStepController.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/SingleThreadedStepController.java
@@ -23,11 +23,13 @@ import org.apache.batchee.container.proxy.InjectionReferences;
 import org.apache.batchee.container.proxy.PartitionCollectorProxy;
 import org.apache.batchee.container.proxy.ProxyFactory;
 import org.apache.batchee.container.proxy.StepListenerProxy;
+import org.apache.batchee.container.services.ServicesManager;
 import org.apache.batchee.container.util.PartitionDataWrapper;
 import org.apache.batchee.container.util.PartitionDataWrapper.PartitionEventType;
 import org.apache.batchee.jaxb.Collector;
 import org.apache.batchee.jaxb.Property;
 import org.apache.batchee.jaxb.Step;
+import org.apache.batchee.spi.BatchArtifactFactory;
 
 import java.io.Serializable;
 import java.util.List;
@@ -41,11 +43,17 @@ import java.util.concurrent.BlockingQueue;
  * separate main thread with controller).
  */
 public abstract class SingleThreadedStepController extends BaseStepController implements Controller {
+    private final BatchArtifactFactory factory;
+
     // Collector only used from partition threads, not main thread
     protected PartitionCollectorProxy collectorProxy = null;
 
-    protected SingleThreadedStepController(RuntimeJobExecution jobExecutionImpl, Step step, StepContextImpl stepContext, long rootJobExecutionId, BlockingQueue<PartitionDataWrapper> analyzerStatusQueue) {
-        super(jobExecutionImpl, step, stepContext, rootJobExecutionId, analyzerStatusQueue);
+    protected SingleThreadedStepController(final RuntimeJobExecution jobExecutionImpl, final Step step,
+                                           final StepContextImpl stepContext, final long rootJobExecutionId,
+                                           final BlockingQueue<PartitionDataWrapper> analyzerStatusQueue,
+                                           final ServicesManager servicesManager) {
+        super(jobExecutionImpl, step, stepContext, rootJobExecutionId, analyzerStatusQueue, servicesManager);
+        factory = servicesManager.service(BatchArtifactFactory.class);
     }
 
     List<StepListenerProxy> stepListeners = null;
@@ -67,7 +75,7 @@ public abstract class SingleThreadedStepController extends BaseStepController im
                  * contexts may be null
                  */
                 injectionRef = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext, propList);
-                this.collectorProxy = ProxyFactory.createPartitionCollectorProxy(collector.getRef(), injectionRef, this.stepContext, jobExecutionImpl);
+                this.collectorProxy = ProxyFactory.createPartitionCollectorProxy(factory, collector.getRef(), injectionRef, this.stepContext, jobExecutionImpl);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/8ddf9f75/jbatch/src/main/java/org/apache/batchee/container/impl/controller/SplitController.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/SplitController.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/SplitController.java
index b882de6..43c26bb 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/SplitController.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/SplitController.java
@@ -60,13 +60,13 @@ public class SplitController implements ExecutionElementController {
 
     protected Split split;
 
-    public SplitController(RuntimeJobExecution jobExecution, Split split, long rootJobExecutionId) {
+    public SplitController(final RuntimeJobExecution jobExecution, final Split split, final long rootJobExecutionId,
+                           final BatchKernelService kernelService) {
         this.jobExecution = jobExecution;
         this.jobContext = jobExecution.getJobContext();
         this.rootJobExecutionId = rootJobExecutionId;
         this.split = split;
-
-        batchKernel = ServicesManager.service(BatchKernelService.class);
+        this.batchKernel = kernelService;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/8ddf9f75/jbatch/src/main/java/org/apache/batchee/container/impl/controller/batchlet/BatchletStepController.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/batchlet/BatchletStepController.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/batchlet/BatchletStepController.java
index a0f0cbb..13b60df 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/batchlet/BatchletStepController.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/batchlet/BatchletStepController.java
@@ -23,29 +23,34 @@ import org.apache.batchee.container.impl.jobinstance.RuntimeJobExecution;
 import org.apache.batchee.container.proxy.BatchletProxy;
 import org.apache.batchee.container.proxy.InjectionReferences;
 import org.apache.batchee.container.proxy.ProxyFactory;
+import org.apache.batchee.container.services.ServicesManager;
 import org.apache.batchee.container.util.PartitionDataWrapper;
 import org.apache.batchee.jaxb.Batchlet;
 import org.apache.batchee.jaxb.Property;
 import org.apache.batchee.jaxb.Step;
+import org.apache.batchee.spi.BatchArtifactFactory;
 
 import javax.batch.runtime.BatchStatus;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
 
 public class BatchletStepController extends SingleThreadedStepController {
+    private final BatchArtifactFactory factory;
     private BatchletProxy batchletProxy;
 
     public BatchletStepController(final RuntimeJobExecution jobExecutionImpl, final Step step,
                                   final StepContextImpl stepContext, final long rootJobExecutionId,
-                                  final BlockingQueue<PartitionDataWrapper> analyzerStatusQueue) {
-        super(jobExecutionImpl, step, stepContext, rootJobExecutionId, analyzerStatusQueue);
+                                  final BlockingQueue<PartitionDataWrapper> analyzerStatusQueue,
+                                  final ServicesManager manager) {
+        super(jobExecutionImpl, step, stepContext, rootJobExecutionId, analyzerStatusQueue, manager);
+        factory = manager.service(BatchArtifactFactory.class);
     }
 
     private void invokeBatchlet(final Batchlet batchlet) throws BatchContainerServiceException {
         final String batchletId = batchlet.getRef();
         final List<Property> propList = (batchlet.getProperties() == null) ? null : batchlet.getProperties().getPropertyList();
         final InjectionReferences injectionRef = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext, propList);
-        batchletProxy = ProxyFactory.createBatchletProxy(batchletId, injectionRef, stepContext, jobExecutionImpl);
+        batchletProxy = ProxyFactory.createBatchletProxy(factory, batchletId, injectionRef, stepContext, jobExecutionImpl);
 
         if (!wasStopIssued()) {
             final String processRetVal = batchletProxy.process();

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/8ddf9f75/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/CheckpointAlgorithmFactory.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/CheckpointAlgorithmFactory.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/CheckpointAlgorithmFactory.java
index 34f2fae..781f41f 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/CheckpointAlgorithmFactory.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/CheckpointAlgorithmFactory.java
@@ -24,14 +24,15 @@ import org.apache.batchee.container.proxy.InjectionReferences;
 import org.apache.batchee.container.proxy.ProxyFactory;
 import org.apache.batchee.jaxb.Chunk;
 import org.apache.batchee.jaxb.Step;
+import org.apache.batchee.spi.BatchArtifactFactory;
 
 public final class CheckpointAlgorithmFactory {
-    public static CheckpointAlgorithmProxy getCheckpointAlgorithmProxy(final Step step, final InjectionReferences injectionReferences, final StepContextImpl stepContext, final RuntimeJobExecution jobExecution) {
+    public static CheckpointAlgorithmProxy getCheckpointAlgorithmProxy(final BatchArtifactFactory factory, final Step step, final InjectionReferences injectionReferences, final StepContextImpl stepContext, final RuntimeJobExecution jobExecution) {
         final Chunk chunk = step.getChunk();
         final String checkpointType = chunk.getCheckpointPolicy();
         final CheckpointAlgorithmProxy proxy;
         if ("custom".equalsIgnoreCase(checkpointType)) {
-            proxy = ProxyFactory.createCheckpointAlgorithmProxy(chunk.getCheckpointAlgorithm().getRef(), injectionReferences, stepContext, jobExecution);
+            proxy = ProxyFactory.createCheckpointAlgorithmProxy(factory, chunk.getCheckpointAlgorithm().getRef(), injectionReferences, stepContext, jobExecution);
         } else /* "item" */ {
             proxy = new CheckpointAlgorithmProxy(new ItemCheckpointAlgorithm());
         }

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/8ddf9f75/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/CheckpointManager.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/CheckpointManager.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/CheckpointManager.java
index 76cae58..f7f01bf 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/CheckpointManager.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/CheckpointManager.java
@@ -38,14 +38,15 @@ public class CheckpointManager {
 
     public CheckpointManager(final ItemReaderProxy reader, final ItemWriterProxy writer,
                              final CheckpointAlgorithm chkptAlg,
-                             final long jobInstanceID, final String stepId) {
+                             final long jobInstanceID, final String stepId,
+                             final PersistenceManagerService persistenceManagerService) {
         this.readerProxy = reader;
         this.writerProxy = writer;
         this.checkpointAlgorithm = chkptAlg;
         this.stepId = stepId;
         this.jobInstanceID = jobInstanceID;
 
-        this.persistenceManagerService = ServicesManager.service(PersistenceManagerService.class);
+        this.persistenceManagerService = persistenceManagerService;
     }
 
     public boolean applyCheckPointPolicy() {

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/8ddf9f75/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/ChunkStepController.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/ChunkStepController.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/ChunkStepController.java
index 7c8eec7..61cadfe 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/ChunkStepController.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/ChunkStepController.java
@@ -47,6 +47,7 @@ import org.apache.batchee.jaxb.ItemReader;
 import org.apache.batchee.jaxb.ItemWriter;
 import org.apache.batchee.jaxb.Property;
 import org.apache.batchee.jaxb.Step;
+import org.apache.batchee.spi.BatchArtifactFactory;
 import org.apache.batchee.spi.PersistenceManagerService;
 
 import javax.batch.api.chunk.CheckpointAlgorithm;
@@ -64,7 +65,8 @@ public class ChunkStepController extends SingleThreadedStepController {
     private final static String sourceClass = ChunkStepController.class.getName();
     private final static Logger logger = Logger.getLogger(sourceClass);
 
-    private final PersistenceManagerService persistenceManagerService = ServicesManager.service(PersistenceManagerService.class);
+    private final PersistenceManagerService persistenceManagerService;
+    private final BatchArtifactFactory artifactFactory;
 
     private Chunk chunk = null;
     private ItemReaderProxy readerProxy = null;
@@ -84,8 +86,11 @@ public class ChunkStepController extends SingleThreadedStepController {
     private boolean rollbackRetry = false;
 
     public ChunkStepController(final RuntimeJobExecution jobExecutionImpl, final Step step, final StepContextImpl stepContext,
-                               final long rootJobExecutionId, final BlockingQueue<PartitionDataWrapper> analyzerStatusQueue) {
-        super(jobExecutionImpl, step, stepContext, rootJobExecutionId, analyzerStatusQueue);
+                               final long rootJobExecutionId, final BlockingQueue<PartitionDataWrapper> analyzerStatusQueue,
+                               final ServicesManager servicesManager) {
+        super(jobExecutionImpl, step, stepContext, rootJobExecutionId, analyzerStatusQueue, servicesManager);
+        this.persistenceManagerService = servicesManager.service(PersistenceManagerService.class);
+        this.artifactFactory = servicesManager.service(BatchArtifactFactory.class);
     }
 
     /**
@@ -472,7 +477,6 @@ public class ChunkStepController extends SingleThreadedStepController {
         int timeInterval = ChunkHelper.getTimeLimit(chunk);
         boolean checkPointed = true;
         boolean rollback = false;
-        Throwable caughtThrowable = null;
 
         // begin new transaction at first iteration or after a checkpoint commit
 
@@ -498,7 +502,7 @@ public class ChunkStepController extends SingleThreadedStepController {
                         positionWriterAtCheckpoint();
                         checkpointManager = new CheckpointManager(readerProxy, writerProxy,
                             getCheckpointAlgorithm(itemCount, timeInterval), jobExecutionImpl
-                            .getJobInstance().getInstanceId(), step.getId());
+                            .getJobInstance().getInstanceId(), step.getId(), persistenceManagerService);
                     }
                 }
 
@@ -581,7 +585,6 @@ public class ChunkStepController extends SingleThreadedStepController {
 
             }
         } catch (final Exception e) {
-            caughtThrowable = e;
             logger.log(Level.SEVERE, "Failure in Read-Process-Write Loop", e);
             // Only try to call onError() if we have an Exception, but not an Error.
             for (ChunkListenerProxy chunkProxy : chunkListeners) {
@@ -591,20 +594,20 @@ public class ChunkStepController extends SingleThreadedStepController {
                     logger.log(Level.SEVERE, e1.getMessage(), e1);
                 }
             }
+            rollback(e);
         } catch (final Throwable t) {
-            caughtThrowable = t;
-            logger.log(Level.SEVERE, t.getMessage(), t);
-        } finally {
-            if (caughtThrowable != null) {
-                transactionManager.setRollbackOnly();
-                readerProxy.close();
-                writerProxy.close();
-                transactionManager.rollback();
-                throw new BatchContainerRuntimeException("Failure in Read-Process-Write Loop", caughtThrowable);
-            }
+            rollback(t);
         }
     }
 
+    private void rollback(final Throwable t) {
+        transactionManager.setRollbackOnly();
+        readerProxy.close();
+        writerProxy.close();
+        transactionManager.rollback();
+        throw new BatchContainerRuntimeException("Failure in Read-Process-Write Loop", t);
+    }
+
     protected void invokeCoreStep() throws BatchContainerServiceException {
 
         this.chunk = step.getChunk();
@@ -637,7 +640,7 @@ public class ChunkStepController extends SingleThreadedStepController {
             final ItemReader itemReader = chunk.getReader();
             final List<Property> itemReaderProps = itemReader.getProperties() == null ? null : itemReader.getProperties().getPropertyList();
             final InjectionReferences injectionRef = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext, itemReaderProps);
-            readerProxy = ProxyFactory.createItemReaderProxy(itemReader.getRef(), injectionRef, stepContext, jobExecutionImpl);
+            readerProxy = ProxyFactory.createItemReaderProxy(artifactFactory, itemReader.getRef(), injectionRef, stepContext, jobExecutionImpl);
         }
 
         {
@@ -645,7 +648,7 @@ public class ChunkStepController extends SingleThreadedStepController {
             if (itemProcessor != null) {
                 final List<Property> itemProcessorProps = itemProcessor.getProperties() == null ? null : itemProcessor.getProperties().getPropertyList();
                 final InjectionReferences injectionRef = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext, itemProcessorProps);
-                processorProxy = ProxyFactory.createItemProcessorProxy(itemProcessor.getRef(), injectionRef, stepContext, jobExecutionImpl);
+                processorProxy = ProxyFactory.createItemProcessorProxy(artifactFactory, itemProcessor.getRef(), injectionRef, stepContext, jobExecutionImpl);
             }
         }
 
@@ -653,7 +656,7 @@ public class ChunkStepController extends SingleThreadedStepController {
             final ItemWriter itemWriter = chunk.getWriter();
             final List<Property> itemWriterProps = itemWriter.getProperties() == null ? null : itemWriter.getProperties().getPropertyList();
             final InjectionReferences injectionRef = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext, itemWriterProps);
-            writerProxy = ProxyFactory.createItemWriterProxy(itemWriter.getRef(), injectionRef, stepContext, jobExecutionImpl);
+            writerProxy = ProxyFactory.createItemWriterProxy(artifactFactory, itemWriter.getRef(), injectionRef, stepContext, jobExecutionImpl);
         }
 
         {
@@ -665,7 +668,7 @@ public class ChunkStepController extends SingleThreadedStepController {
             }
 
             final InjectionReferences injectionRef = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext, propList);
-            checkpointProxy = CheckpointAlgorithmFactory.getCheckpointAlgorithmProxy(step, injectionRef, stepContext, jobExecutionImpl);
+            checkpointProxy = CheckpointAlgorithmFactory.getCheckpointAlgorithmProxy(artifactFactory, step, injectionRef, stepContext, jobExecutionImpl);
         }
 
         {
@@ -689,7 +692,7 @@ public class ChunkStepController extends SingleThreadedStepController {
                 chkptAlg = checkpointProxy;
             }
 
-            checkpointManager = new CheckpointManager(readerProxy, writerProxy, chkptAlg, jobExecutionImpl.getJobInstance().getInstanceId(), step.getId());
+            checkpointManager = new CheckpointManager(readerProxy, writerProxy, chkptAlg, jobExecutionImpl.getJobInstance().getInstanceId(), step.getId(), persistenceManagerService);
 
             skipHandler = new SkipHandler(chunk);
             skipHandler.addSkipProcessListener(skipProcessListeners);