You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@batchee.apache.org by rm...@apache.org on 2014/01/02 10:27:54 UTC

[1/2] BATCHEE-9 ServicesManager shouldn't be contextual once an action has started

Updated Branches:
  refs/heads/master b303e37e6 -> 8ddf9f754


http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/8ddf9f75/jbatch/src/main/java/org/apache/batchee/container/impl/jobinstance/JobExecutionHelper.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/jobinstance/JobExecutionHelper.java b/jbatch/src/main/java/org/apache/batchee/container/impl/jobinstance/JobExecutionHelper.java
index 5b89a88..6230a76 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/impl/jobinstance/JobExecutionHelper.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/jobinstance/JobExecutionHelper.java
@@ -42,10 +42,6 @@ import javax.batch.runtime.JobInstance;
 import java.util.Properties;
 
 public class JobExecutionHelper {
-    private static final JobStatusManagerService JOB_STATUS_MANAGER_SERVICE = ServicesManager.service(JobStatusManagerService.class);
-    private static final PersistenceManagerService PERSISTENCE_MANAGER_SERVICE = ServicesManager.service(PersistenceManagerService.class);
-    private static final SecurityService SECURITY_SERVICE = ServicesManager.service(SecurityService.class);
-
     private static ModelNavigator<JSLJob> getResolvedJobNavigator(final String jobXml, final Properties jobParameters, final boolean parallelExecution) {
         final JSLJob jobModel = new JobModelResolver().resolveModel(jobXml);
         final PropertyResolver<JSLJob> propResolver = PropertyResolverFactory.createJobPropertyResolver(parallelExecution);
@@ -69,17 +65,19 @@ public class JobExecutionHelper {
         return new JobContextImpl(jobNavigator, jslProperties);
     }
 
-    private static JobInstance getNewJobInstance(final String name, final String jobXml) {
-        return PERSISTENCE_MANAGER_SERVICE.createJobInstance(name, SECURITY_SERVICE.getLoggedUser(), jobXml);
+    private static JobInstance getNewJobInstance(final ServicesManager servicesManager, final String name, final String jobXml) {
+        return servicesManager.service(PersistenceManagerService.class).createJobInstance(
+                name, servicesManager.service(SecurityService.class).getLoggedUser(), jobXml);
     }
 
-    private static JobInstance getNewSubJobInstance(final String name) {
-        return PERSISTENCE_MANAGER_SERVICE.createSubJobInstance(name, SECURITY_SERVICE.getLoggedUser());
+    private static JobInstance getNewSubJobInstance(final ServicesManager servicesManager, final String name) {
+        return servicesManager.service(PersistenceManagerService.class).createSubJobInstance(
+                name, servicesManager.service(SecurityService.class).getLoggedUser());
     }
 
-    private static JobStatus createNewJobStatus(final JobInstance jobInstance) {
+    private static JobStatus createNewJobStatus(final JobStatusManagerService statusManagerService, final JobInstance jobInstance) {
         final long instanceId = jobInstance.getInstanceId();
-        final JobStatus jobStatus = JOB_STATUS_MANAGER_SERVICE.createJobStatus(instanceId);
+        final JobStatus jobStatus = statusManagerService.createJobStatus(instanceId);
         jobStatus.setJobInstance(jobInstance);
         return jobStatus;
     }
@@ -91,47 +89,50 @@ public class JobExecutionHelper {
         }
     }
 
-    public static RuntimeJobExecution startJob(final String jobXML, final Properties jobParameters) throws JobStartException {
+    public static RuntimeJobExecution startJob(final ServicesManager servicesManager, final String jobXML, final Properties jobParameters) throws JobStartException {
         final JSLJob jobModel = new JobModelResolver().resolveModel(jobXML);
         final ModelNavigator<JSLJob> jobNavigator = getResolvedJobNavigator(jobModel, jobParameters, false);
         final JobContextImpl jobContext = getJobContext(jobNavigator);
-        final JobInstance jobInstance = getNewJobInstance(jobNavigator.getRootModelElement().getId(), jobXML);
-        final RuntimeJobExecution executionHelper = PERSISTENCE_MANAGER_SERVICE.createJobExecution(jobInstance, jobParameters, jobContext.getBatchStatus());
+        final JobInstance jobInstance = getNewJobInstance(servicesManager, jobNavigator.getRootModelElement().getId(), jobXML);
+        final RuntimeJobExecution executionHelper = servicesManager.service(PersistenceManagerService.class).createJobExecution(jobInstance, jobParameters, jobContext.getBatchStatus());
 
         executionHelper.prepareForExecution(jobContext);
 
-        final JobStatus jobStatus = createNewJobStatus(jobInstance);
-        JOB_STATUS_MANAGER_SERVICE.updateJobStatus(jobStatus);
+        final JobStatusManagerService statusManagerService = servicesManager.service(JobStatusManagerService.class);
+        final JobStatus jobStatus = createNewJobStatus(statusManagerService, jobInstance);
+        statusManagerService.updateJobStatus(jobStatus);
 
         return executionHelper;
     }
 
-    public static RuntimeFlowInSplitExecution startFlowInSplit(final JSLJob jobModel) throws JobStartException {
+    public static RuntimeFlowInSplitExecution startFlowInSplit(final ServicesManager servicesManager, final JSLJob jobModel) throws JobStartException {
         final ModelNavigator<JSLJob> jobNavigator = getResolvedJobNavigator(jobModel, null, true);
         final JobContextImpl jobContext = getJobContext(jobNavigator);
-        final JobInstance jobInstance = getNewSubJobInstance(jobNavigator.getRootModelElement().getId());
-        final RuntimeFlowInSplitExecution executionHelper = PERSISTENCE_MANAGER_SERVICE.createFlowInSplitExecution(jobInstance, jobContext.getBatchStatus());
+        final JobInstance jobInstance = getNewSubJobInstance(servicesManager, jobNavigator.getRootModelElement().getId());
+        final RuntimeFlowInSplitExecution executionHelper = servicesManager.service(PersistenceManagerService.class).createFlowInSplitExecution(jobInstance, jobContext.getBatchStatus());
 
         executionHelper.prepareForExecution(jobContext);
 
-        final JobStatus jobStatus = createNewJobStatus(jobInstance);
-        JOB_STATUS_MANAGER_SERVICE.updateJobStatus(jobStatus);
+        final JobStatusManagerService statusManagerService = servicesManager.service(JobStatusManagerService.class);
+        final JobStatus jobStatus = createNewJobStatus(statusManagerService, jobInstance);
+        statusManagerService.updateJobStatus(jobStatus);
 
         return executionHelper;
     }
 
-    public static RuntimeJobExecution startPartition(JSLJob jobModel, Properties jobParameters) throws JobStartException {
+    public static RuntimeJobExecution startPartition(final ServicesManager servicesManager, final JSLJob jobModel, final Properties jobParameters) throws JobStartException {
         final ModelNavigator<JSLJob> jobNavigator = getResolvedJobNavigator(jobModel, jobParameters, true);
         final JobContextImpl jobContext = getJobContext(jobNavigator);
 
-        final JobInstance jobInstance = getNewSubJobInstance(jobNavigator.getRootModelElement().getId());
+        final JobInstance jobInstance = getNewSubJobInstance(servicesManager, jobNavigator.getRootModelElement().getId());
 
-        final RuntimeJobExecution executionHelper = PERSISTENCE_MANAGER_SERVICE.createJobExecution(jobInstance, jobParameters, jobContext.getBatchStatus());
+        final RuntimeJobExecution executionHelper = servicesManager.service(PersistenceManagerService.class).createJobExecution(jobInstance, jobParameters, jobContext.getBatchStatus());
 
         executionHelper.prepareForExecution(jobContext);
 
-        final JobStatus jobStatus = createNewJobStatus(jobInstance);
-        JOB_STATUS_MANAGER_SERVICE.updateJobStatus(jobStatus);
+        final JobStatusManagerService statusManagerService = servicesManager.service(JobStatusManagerService.class);
+        final JobStatus jobStatus = createNewJobStatus(statusManagerService, jobInstance);
+        statusManagerService.updateJobStatus(jobStatus);
 
         return executionHelper;
     }
@@ -148,35 +149,38 @@ public class JobExecutionHelper {
         }
     }
 
-    private static void validateJobExecutionIsMostRecent(final long jobInstanceId, final long executionId) throws JobExecutionNotMostRecentException {
-        final long mostRecentExecutionId = PERSISTENCE_MANAGER_SERVICE.getMostRecentExecutionId(jobInstanceId);
+    private static void validateJobExecutionIsMostRecent(final PersistenceManagerService persistenceManagerService, final long jobInstanceId, final long executionId) throws JobExecutionNotMostRecentException {
+        final long mostRecentExecutionId = persistenceManagerService.getMostRecentExecutionId(jobInstanceId);
         if (mostRecentExecutionId != executionId) {
             throw new JobExecutionNotMostRecentException("ExecutionId: " + executionId + " is not the most recent execution.");
         }
     }
 
-    public static RuntimeJobExecution restartPartition(final long execId, final JSLJob gennedJobModel, final Properties partitionProps) throws JobRestartException,
+    public static RuntimeJobExecution restartPartition(final ServicesManager servicesManager, final long execId, final JSLJob gennedJobModel, final Properties partitionProps) throws JobRestartException,
         JobExecutionAlreadyCompleteException, JobExecutionNotMostRecentException, NoSuchJobExecutionException {
-        return restartExecution(execId, gennedJobModel, partitionProps, true, false);
+        return restartExecution(servicesManager, execId, gennedJobModel, partitionProps, true, false);
     }
 
-    public static RuntimeFlowInSplitExecution restartFlowInSplit(final long execId, final JSLJob gennedJobModel) throws JobRestartException,
+    public static RuntimeFlowInSplitExecution restartFlowInSplit(final ServicesManager servicesManager, final long execId, final JSLJob gennedJobModel) throws JobRestartException,
         JobExecutionAlreadyCompleteException, JobExecutionNotMostRecentException, NoSuchJobExecutionException {
-        return (RuntimeFlowInSplitExecution) restartExecution(execId, gennedJobModel, null, true, true);
+        return (RuntimeFlowInSplitExecution) restartExecution(servicesManager, execId, gennedJobModel, null, true, true);
     }
 
-    public static RuntimeJobExecution restartJob(final long executionId, final Properties restartJobParameters) throws JobRestartException,
+    public static RuntimeJobExecution restartJob(final ServicesManager servicesManager, final long executionId, final Properties restartJobParameters) throws JobRestartException,
         JobExecutionAlreadyCompleteException, JobExecutionNotMostRecentException, NoSuchJobExecutionException {
-        return restartExecution(executionId, null, restartJobParameters, false, false);
+        return restartExecution(servicesManager, executionId, null, restartJobParameters, false, false);
     }
 
-    private static RuntimeJobExecution restartExecution(final long executionId, final JSLJob gennedJobModel, final Properties restartJobParameters, final boolean parallelExecution, final boolean flowInSplit) throws JobRestartException,
+    private static RuntimeJobExecution restartExecution(final ServicesManager servicesManager, final long executionId, final JSLJob gennedJobModel, final Properties restartJobParameters, final boolean parallelExecution, final boolean flowInSplit) throws JobRestartException,
         JobExecutionAlreadyCompleteException, JobExecutionNotMostRecentException, NoSuchJobExecutionException {
 
-        final long jobInstanceId = PERSISTENCE_MANAGER_SERVICE.getJobInstanceIdByExecutionId(executionId);
-        final JobStatus jobStatus = JOB_STATUS_MANAGER_SERVICE.getJobStatus(jobInstanceId);
+        final PersistenceManagerService persistenceManagerService = servicesManager.service(PersistenceManagerService.class);
+        final JobStatusManagerService jobStatusManagerService = servicesManager.service(JobStatusManagerService.class);
+
+        final long jobInstanceId = persistenceManagerService.getJobInstanceIdByExecutionId(executionId);
+        final JobStatus jobStatus = jobStatusManagerService.getJobStatus(jobInstanceId);
 
-        validateJobExecutionIsMostRecent(jobInstanceId, executionId);
+        validateJobExecutionIsMostRecent(persistenceManagerService, jobInstanceId, executionId);
 
         validateJobInstanceNotCompleteOrAbandonded(jobStatus);
 
@@ -196,22 +200,22 @@ public class JobExecutionHelper {
 
         final RuntimeJobExecution executionHelper;
         if (flowInSplit) {
-            executionHelper = PERSISTENCE_MANAGER_SERVICE.createFlowInSplitExecution(jobInstance, jobContext.getBatchStatus());
+            executionHelper = persistenceManagerService.createFlowInSplitExecution(jobInstance, jobContext.getBatchStatus());
         } else {
-            executionHelper = PERSISTENCE_MANAGER_SERVICE.createJobExecution(jobInstance, restartJobParameters, jobContext.getBatchStatus());
+            executionHelper = persistenceManagerService.createJobExecution(jobInstance, restartJobParameters, jobContext.getBatchStatus());
         }
         executionHelper.prepareForExecution(jobContext, jobStatus.getRestartOn());
-        JOB_STATUS_MANAGER_SERVICE.updateJobStatusWithNewExecution(jobInstance.getInstanceId(), executionHelper.getExecutionId());
+        jobStatusManagerService.updateJobStatusWithNewExecution(jobInstance.getInstanceId(), executionHelper.getExecutionId());
 
         return executionHelper;
     }
 
-    public static InternalJobExecution getPersistedJobOperatorJobExecution(final long jobExecutionId) throws NoSuchJobExecutionException {
-        return PERSISTENCE_MANAGER_SERVICE.jobOperatorGetJobExecution(jobExecutionId);
+    public static InternalJobExecution getPersistedJobOperatorJobExecution(final PersistenceManagerService persistenceManagerService, final long jobExecutionId) throws NoSuchJobExecutionException {
+        return persistenceManagerService.jobOperatorGetJobExecution(jobExecutionId);
     }
 
 
-    public static JobInstance getJobInstance(final long executionId) {
-        return JOB_STATUS_MANAGER_SERVICE.getJobStatusFromExecutionId(executionId).getJobInstance();
+    public static JobInstance getJobInstance(final JobStatusManagerService statusManagerService, final long executionId) {
+        return statusManagerService.getJobStatusFromExecutionId(executionId).getJobInstance();
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/8ddf9f75/jbatch/src/main/java/org/apache/batchee/container/impl/jobinstance/RuntimeFlowInSplitExecution.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/jobinstance/RuntimeFlowInSplitExecution.java b/jbatch/src/main/java/org/apache/batchee/container/impl/jobinstance/RuntimeFlowInSplitExecution.java
index 8570351..bf065bf 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/impl/jobinstance/RuntimeFlowInSplitExecution.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/jobinstance/RuntimeFlowInSplitExecution.java
@@ -16,13 +16,15 @@
  */
 package org.apache.batchee.container.impl.jobinstance;
 
+import org.apache.batchee.container.services.ServicesManager;
 import org.apache.batchee.container.status.ExecutionStatus;
+import org.apache.batchee.spi.PersistenceManagerService;
 
 import javax.batch.runtime.JobInstance;
 
 public class RuntimeFlowInSplitExecution extends RuntimeJobExecution {
-    public RuntimeFlowInSplitExecution(final JobInstance jobInstance, final long executionId) {
-        super(jobInstance, executionId);
+    public RuntimeFlowInSplitExecution(final JobInstance jobInstance, final long executionId, final PersistenceManagerService manager) {
+        super(jobInstance, executionId, manager);
     }
 
     private ExecutionStatus flowStatus;

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/8ddf9f75/jbatch/src/main/java/org/apache/batchee/container/impl/jobinstance/RuntimeJobExecution.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/jobinstance/RuntimeJobExecution.java b/jbatch/src/main/java/org/apache/batchee/container/impl/jobinstance/RuntimeJobExecution.java
index d9fdb84..b1b81c4 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/impl/jobinstance/RuntimeJobExecution.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/jobinstance/RuntimeJobExecution.java
@@ -22,6 +22,7 @@ import org.apache.batchee.container.navigator.ModelNavigator;
 import org.apache.batchee.container.proxy.ListenerFactory;
 import org.apache.batchee.container.services.InternalJobExecution;
 import org.apache.batchee.jaxb.JSLJob;
+import org.apache.batchee.spi.PersistenceManagerService;
 
 import javax.batch.runtime.BatchStatus;
 import javax.batch.runtime.JobInstance;
@@ -43,10 +44,10 @@ public class RuntimeJobExecution {
     private Integer partitionInstance = null;
     private Collection<Closeable> releasables = new ArrayList<Closeable>();
 
-    public RuntimeJobExecution(final JobInstance jobInstance, final long executionId) {
+    public RuntimeJobExecution(final JobInstance jobInstance, final long executionId, final PersistenceManagerService persistenceManagerService) {
         this.jobInstance = jobInstance;
         this.executionId = executionId;
-        this.operatorJobExecution = new JobExecutionImpl(executionId, jobInstance.getInstanceId());
+        this.operatorJobExecution = new JobExecutionImpl(executionId, jobInstance.getInstanceId(), persistenceManagerService);
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/8ddf9f75/jbatch/src/main/java/org/apache/batchee/container/proxy/ListenerFactory.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/proxy/ListenerFactory.java b/jbatch/src/main/java/org/apache/batchee/container/proxy/ListenerFactory.java
index f568205..a872c30 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/proxy/ListenerFactory.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/proxy/ListenerFactory.java
@@ -24,6 +24,7 @@ import org.apache.batchee.jaxb.Listener;
 import org.apache.batchee.jaxb.Listeners;
 import org.apache.batchee.jaxb.Property;
 import org.apache.batchee.jaxb.Step;
+import org.apache.batchee.spi.BatchArtifactFactory;
 
 import javax.batch.api.chunk.listener.ChunkListener;
 import javax.batch.api.chunk.listener.ItemProcessListener;
@@ -46,29 +47,30 @@ import java.util.concurrent.ConcurrentHashMap;
 
 
 public class ListenerFactory {
-    private List<ListenerInfo> jobLevelListenerInfo = null;
-
-    private Map<String, List<ListenerInfo>> stepLevelListenerInfo = new ConcurrentHashMap<String, List<ListenerInfo>>();
+    private final BatchArtifactFactory factory;
+    private final List<ListenerInfo> jobLevelListenerInfo;
+    private final Map<String, List<ListenerInfo>> stepLevelListenerInfo = new ConcurrentHashMap<String, List<ListenerInfo>>();
 
     /*
      * Build job-level ListenerInfo(s) up-front, but build step-level ones
      * lazily.
      */
-    public ListenerFactory(final JSLJob jobModel, final InjectionReferences injectionRefs, final RuntimeJobExecution execution) {
+    public ListenerFactory(final BatchArtifactFactory factory, final JSLJob jobModel, final InjectionReferences injectionRefs, final RuntimeJobExecution execution) {
         jobLevelListenerInfo = new ArrayList<ListenerInfo>();
+        this.factory = factory;
 
         Listeners jobLevelListeners = jobModel.getListeners();
 
-        jobLevelListenerInfo.addAll(globalListeners("org.apache.batchee.job.listeners.before", injectionRefs, execution));
+        jobLevelListenerInfo.addAll(globalListeners(factory, "org.apache.batchee.job.listeners.before", injectionRefs, execution));
         if (jobLevelListeners != null) {
             for (final Listener listener : jobLevelListeners.getListenerList()) {
-                jobLevelListenerInfo.add(buildListenerInfo(listener, injectionRefs, execution));
+                jobLevelListenerInfo.add(buildListenerInfo(factory, listener, injectionRefs, execution));
             }
         }
-        jobLevelListenerInfo.addAll(globalListeners("org.apache.batchee.job.listeners.after", injectionRefs, execution));
+        jobLevelListenerInfo.addAll(globalListeners(factory, "org.apache.batchee.job.listeners.after", injectionRefs, execution));
     }
 
-    private static Collection<ListenerInfo> globalListeners(final String key, final InjectionReferences injectionRefs, final RuntimeJobExecution execution) {
+    private static Collection<ListenerInfo> globalListeners(final BatchArtifactFactory factory, final String key, final InjectionReferences injectionRefs, final RuntimeJobExecution execution) {
         final String globalListeners = ServicesManager.value(key, null);
         if (globalListeners != null) {
             final String[] refs = globalListeners.split(",");
@@ -78,7 +80,7 @@ public class ListenerFactory {
                 final Listener listener = new Listener();
                 listener.setRef(ref);
 
-                list.add(buildListenerInfo(listener, injectionRefs, execution));
+                list.add(buildListenerInfo(factory, listener, injectionRefs, execution));
             }
 
             return list;
@@ -91,7 +93,7 @@ public class ListenerFactory {
      * 
      * @JobListener, even if that is the only type of listener annotation found.
      */
-    private List<ListenerInfo> getStepListenerInfo(final Step step, final InjectionReferences injectionRefs, final RuntimeJobExecution execution) {
+    private List<ListenerInfo> getStepListenerInfo(final BatchArtifactFactory factory, final Step step, final InjectionReferences injectionRefs, final RuntimeJobExecution execution) {
         List<ListenerInfo> stepListenerInfoList = stepLevelListenerInfo.get(step.getId());
         if (stepListenerInfoList == null) {
             synchronized (this) {
@@ -100,28 +102,28 @@ public class ListenerFactory {
                     stepListenerInfoList = new ArrayList<ListenerInfo>();
                     stepLevelListenerInfo.put(step.getId(), stepListenerInfoList);
 
-                    stepListenerInfoList.addAll(globalListeners("org.apache.batchee.step.listeners.before", injectionRefs, execution));
+                    stepListenerInfoList.addAll(globalListeners(factory, "org.apache.batchee.step.listeners.before", injectionRefs, execution));
 
                     final Listeners stepLevelListeners = step.getListeners();
                     if (stepLevelListeners != null) {
                         for (final Listener listener : stepLevelListeners.getListenerList()) {
-                            stepListenerInfoList.add(buildListenerInfo(listener, injectionRefs, execution));
+                            stepListenerInfoList.add(buildListenerInfo(factory, listener, injectionRefs, execution));
                         }
                     }
 
-                    stepListenerInfoList.addAll(globalListeners("org.apache.batchee.step.listeners.after", injectionRefs, execution));
+                    stepListenerInfoList.addAll(globalListeners(factory, "org.apache.batchee.step.listeners.after", injectionRefs, execution));
                 }
             }
         }
         return stepListenerInfoList;
     }
 
-    private static ListenerInfo buildListenerInfo(final Listener listener, final InjectionReferences injectionRefs, final RuntimeJobExecution execution) {
+    private static ListenerInfo buildListenerInfo(final BatchArtifactFactory factory, final Listener listener, final InjectionReferences injectionRefs, final RuntimeJobExecution execution) {
         final String id = listener.getRef();
         final List<Property> propList = (listener.getProperties() == null) ? null : listener.getProperties().getPropertyList();
 
         injectionRefs.setProps(propList);
-        final Object listenerArtifact = ProxyFactory.loadArtifact(id, injectionRefs, execution);
+        final Object listenerArtifact = ProxyFactory.loadArtifact(factory, id, injectionRefs, execution);
         if (listenerArtifact == null) {
             throw new IllegalArgumentException("Load of artifact id: " + id + " returned <null>.");
         }
@@ -140,7 +142,7 @@ public class ListenerFactory {
     }
 
     public List<ChunkListenerProxy> getChunkListeners(final Step step, final InjectionReferences injectionRefs, final StepContextImpl stepContext, final RuntimeJobExecution execution) {
-        final List<ListenerInfo> stepListenerInfo = getStepListenerInfo(step, injectionRefs, execution);
+        final List<ListenerInfo> stepListenerInfo = getStepListenerInfo(factory, step, injectionRefs, execution);
         final List<ChunkListenerProxy> retVal = new ArrayList<ChunkListenerProxy>();
         for (final ListenerInfo li : stepListenerInfo) {
             if (li.isChunkListener()) {
@@ -154,7 +156,7 @@ public class ListenerFactory {
     }
 
     public List<ItemProcessListenerProxy> getItemProcessListeners(final Step step, final InjectionReferences injectionRefs, final StepContextImpl stepContext, final RuntimeJobExecution execution) {
-        final List<ListenerInfo> stepListenerInfo = getStepListenerInfo(step, injectionRefs, execution);
+        final List<ListenerInfo> stepListenerInfo = getStepListenerInfo(factory, step, injectionRefs, execution);
         final List<ItemProcessListenerProxy> retVal = new ArrayList<ItemProcessListenerProxy>();
         for (final ListenerInfo li : stepListenerInfo) {
             if (li.isItemProcessListener()) {
@@ -168,7 +170,7 @@ public class ListenerFactory {
     }
 
     public List<ItemReadListenerProxy> getItemReadListeners(final Step step, final InjectionReferences injectionRefs, final StepContextImpl stepContext, final RuntimeJobExecution execution) {
-        final List<ListenerInfo> stepListenerInfo = getStepListenerInfo(step, injectionRefs, execution);
+        final List<ListenerInfo> stepListenerInfo = getStepListenerInfo(factory, step, injectionRefs, execution);
         final List<ItemReadListenerProxy> retVal = new ArrayList<ItemReadListenerProxy>();
         for (final ListenerInfo li : stepListenerInfo) {
             if (li.isItemReadListener()) {
@@ -182,7 +184,7 @@ public class ListenerFactory {
     }
 
     public List<ItemWriteListenerProxy> getItemWriteListeners(final Step step, final InjectionReferences injectionRefs, final StepContextImpl stepContext, final RuntimeJobExecution execution) {
-        final List<ListenerInfo> stepListenerInfo = getStepListenerInfo(step, injectionRefs, execution);
+        final List<ListenerInfo> stepListenerInfo = getStepListenerInfo(factory, step, injectionRefs, execution);
         final List<ItemWriteListenerProxy> retVal = new ArrayList<ItemWriteListenerProxy>();
         for (final ListenerInfo li : stepListenerInfo) {
             if (li.isItemWriteListener()) {
@@ -196,7 +198,7 @@ public class ListenerFactory {
     }
 
     public List<RetryProcessListenerProxy> getRetryProcessListeners(final Step step, final InjectionReferences injectionRefs, final StepContextImpl stepContext, final RuntimeJobExecution execution) {
-        final List<ListenerInfo> stepListenerInfo = getStepListenerInfo(step, injectionRefs, execution);
+        final List<ListenerInfo> stepListenerInfo = getStepListenerInfo(factory, step, injectionRefs, execution);
         final List<RetryProcessListenerProxy> retVal = new ArrayList<RetryProcessListenerProxy>();
         for (final ListenerInfo li : stepListenerInfo) {
             if (li.isRetryProcessListener()) {
@@ -210,7 +212,7 @@ public class ListenerFactory {
     }
 
     public List<RetryReadListenerProxy> getRetryReadListeners(final Step step, final InjectionReferences injectionRefs, final StepContextImpl stepContext, final RuntimeJobExecution execution) {
-        final List<ListenerInfo> stepListenerInfo = getStepListenerInfo(step, injectionRefs, execution);
+        final List<ListenerInfo> stepListenerInfo = getStepListenerInfo(factory, step, injectionRefs, execution);
         final List<RetryReadListenerProxy> retVal = new ArrayList<RetryReadListenerProxy>();
         for (final ListenerInfo li : stepListenerInfo) {
             if (li.isRetryReadListener()) {
@@ -224,7 +226,7 @@ public class ListenerFactory {
     }
 
     public List<RetryWriteListenerProxy> getRetryWriteListeners(final Step step, final InjectionReferences injectionRefs, final StepContextImpl stepContext, final RuntimeJobExecution execution) {
-        final List<ListenerInfo> stepListenerInfo = getStepListenerInfo(step, injectionRefs, execution);
+        final List<ListenerInfo> stepListenerInfo = getStepListenerInfo(factory, step, injectionRefs, execution);
         final List<RetryWriteListenerProxy> retVal = new ArrayList<RetryWriteListenerProxy>();
         for (final ListenerInfo li : stepListenerInfo) {
             if (li.isRetryWriteListener()) {
@@ -238,7 +240,7 @@ public class ListenerFactory {
     }
 
     public List<SkipProcessListenerProxy> getSkipProcessListeners(final Step step, final InjectionReferences injectionRefs, final StepContextImpl stepContext, final RuntimeJobExecution execution) {
-        final List<ListenerInfo> stepListenerInfo = getStepListenerInfo(step, injectionRefs, execution);
+        final List<ListenerInfo> stepListenerInfo = getStepListenerInfo(factory, step, injectionRefs, execution);
         final List<SkipProcessListenerProxy> retVal = new ArrayList<SkipProcessListenerProxy>();
         for (final ListenerInfo li : stepListenerInfo) {
             if (li.isSkipProcessListener()) {
@@ -252,7 +254,7 @@ public class ListenerFactory {
     }
 
     public List<SkipReadListenerProxy> getSkipReadListeners(final Step step, final InjectionReferences injectionRefs, final StepContextImpl stepContext, final RuntimeJobExecution execution) {
-        final List<ListenerInfo> stepListenerInfo = getStepListenerInfo(step, injectionRefs, execution);
+        final List<ListenerInfo> stepListenerInfo = getStepListenerInfo(factory, step, injectionRefs, execution);
         final List<SkipReadListenerProxy> retVal = new ArrayList<SkipReadListenerProxy>();
         for (final ListenerInfo li : stepListenerInfo) {
             if (li.isSkipReadListener()) {
@@ -266,7 +268,7 @@ public class ListenerFactory {
     }
 
     public List<SkipWriteListenerProxy> getSkipWriteListeners(final Step step, final InjectionReferences injectionRefs, final StepContextImpl stepContext, final RuntimeJobExecution execution) {
-        final List<ListenerInfo> stepListenerInfo = getStepListenerInfo(step, injectionRefs, execution);
+        final List<ListenerInfo> stepListenerInfo = getStepListenerInfo(factory, step, injectionRefs, execution);
         final List<SkipWriteListenerProxy> retVal = new ArrayList<SkipWriteListenerProxy>();
         for (final ListenerInfo li : stepListenerInfo) {
             if (li.isSkipWriteListener()) {
@@ -280,7 +282,7 @@ public class ListenerFactory {
     }
 
     public List<StepListenerProxy> getStepListeners(final Step step, final InjectionReferences injectionRefs, final StepContextImpl stepContext, final RuntimeJobExecution execution) {
-        final List<ListenerInfo> stepListenerInfo = getStepListenerInfo(step, injectionRefs, execution);
+        final List<ListenerInfo> stepListenerInfo = getStepListenerInfo(factory, step, injectionRefs, execution);
         final List<StepListenerProxy> retVal = new ArrayList<StepListenerProxy>();
         for (final ListenerInfo li : stepListenerInfo) {
             if (li.isStepListener()) {

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/8ddf9f75/jbatch/src/main/java/org/apache/batchee/container/proxy/ProxyFactory.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/proxy/ProxyFactory.java b/jbatch/src/main/java/org/apache/batchee/container/proxy/ProxyFactory.java
index de27560..3a1c487 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/proxy/ProxyFactory.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/proxy/ProxyFactory.java
@@ -18,7 +18,6 @@ package org.apache.batchee.container.proxy;
 
 import org.apache.batchee.container.impl.StepContextImpl;
 import org.apache.batchee.container.impl.jobinstance.RuntimeJobExecution;
-import org.apache.batchee.container.services.ServicesManager;
 import org.apache.batchee.spi.BatchArtifactFactory;
 
 import javax.batch.api.Batchlet;
@@ -36,13 +35,12 @@ import javax.batch.api.partition.PartitionReducer;
  * Introduce a level of indirection so proxies are not instantiated directly by newing them up.
  */
 public class ProxyFactory {
-    private static final BatchArtifactFactory ARTIFACT_FACTORY = ServicesManager.service(BatchArtifactFactory.class);
     private static final ThreadLocal<InjectionReferences> INJECTION_CONTEXT = new ThreadLocal<InjectionReferences>();
 
-    protected static Object loadArtifact(final String id, final InjectionReferences injectionReferences, final RuntimeJobExecution execution) {
+    protected static Object loadArtifact(final BatchArtifactFactory factory, final String id, final InjectionReferences injectionReferences, final RuntimeJobExecution execution) {
         INJECTION_CONTEXT.set(injectionReferences);
         try {
-            final BatchArtifactFactory.Instance instance = ARTIFACT_FACTORY.load(id);
+            final BatchArtifactFactory.Instance instance = factory.load(id);
             if (instance == null) {
                 return null;
             }
@@ -65,15 +63,15 @@ public class ProxyFactory {
     /*
      * Decider
      */
-    public static DeciderProxy createDeciderProxy(final String id, final InjectionReferences injectionRefs, final RuntimeJobExecution execution) {
-        return new DeciderProxy(Decider.class.cast(loadArtifact(id, injectionRefs, execution)));
+    public static DeciderProxy createDeciderProxy(final BatchArtifactFactory factory, final String id, final InjectionReferences injectionRefs, final RuntimeJobExecution execution) {
+        return new DeciderProxy(Decider.class.cast(loadArtifact(factory, id, injectionRefs, execution)));
     }
 
     /*
      * Batchlet artifact
      */
-    public static BatchletProxy createBatchletProxy(final String id, final InjectionReferences injectionRefs, final StepContextImpl stepContext, final RuntimeJobExecution execution) {
-        final Batchlet loadedArtifact = (Batchlet) loadArtifact(id, injectionRefs, execution);
+    public static BatchletProxy createBatchletProxy(final BatchArtifactFactory factory, final String id, final InjectionReferences injectionRefs, final StepContextImpl stepContext, final RuntimeJobExecution execution) {
+        final Batchlet loadedArtifact = (Batchlet) loadArtifact(factory, id, injectionRefs, execution);
         final BatchletProxy proxy = new BatchletProxy(loadedArtifact);
         proxy.setStepContext(stepContext);
         return proxy;
@@ -83,29 +81,29 @@ public class ProxyFactory {
      * The four main chunk-related artifacts
      */
 
-    public static CheckpointAlgorithmProxy createCheckpointAlgorithmProxy(final String id, final InjectionReferences injectionRefs, final StepContextImpl stepContext, final RuntimeJobExecution execution) {
-        final CheckpointAlgorithm loadedArtifact = (CheckpointAlgorithm) loadArtifact(id, injectionRefs, execution);
+    public static CheckpointAlgorithmProxy createCheckpointAlgorithmProxy(final BatchArtifactFactory factory, final String id, final InjectionReferences injectionRefs, final StepContextImpl stepContext, final RuntimeJobExecution execution) {
+        final CheckpointAlgorithm loadedArtifact = (CheckpointAlgorithm) loadArtifact(factory, id, injectionRefs, execution);
         final CheckpointAlgorithmProxy proxy = new CheckpointAlgorithmProxy(loadedArtifact);
         proxy.setStepContext(stepContext);
         return proxy;
     }
 
-    public static ItemReaderProxy createItemReaderProxy(final String id, final InjectionReferences injectionRefs, final StepContextImpl stepContext, final RuntimeJobExecution execution) {
-        final ItemReader loadedArtifact = (ItemReader) loadArtifact(id, injectionRefs, execution);
+    public static ItemReaderProxy createItemReaderProxy(final BatchArtifactFactory factory, final String id, final InjectionReferences injectionRefs, final StepContextImpl stepContext, final RuntimeJobExecution execution) {
+        final ItemReader loadedArtifact = (ItemReader) loadArtifact(factory, id, injectionRefs, execution);
         final ItemReaderProxy proxy = new ItemReaderProxy(loadedArtifact);
         proxy.setStepContext(stepContext);
         return proxy;
     }
 
-    public static ItemProcessorProxy createItemProcessorProxy(final String id, final InjectionReferences injectionRefs, final StepContextImpl stepContext, final RuntimeJobExecution execution) {
-        final ItemProcessor loadedArtifact = (ItemProcessor) loadArtifact(id, injectionRefs, execution);
+    public static ItemProcessorProxy createItemProcessorProxy(final BatchArtifactFactory factory, final String id, final InjectionReferences injectionRefs, final StepContextImpl stepContext, final RuntimeJobExecution execution) {
+        final ItemProcessor loadedArtifact = (ItemProcessor) loadArtifact(factory, id, injectionRefs, execution);
         final ItemProcessorProxy proxy = new ItemProcessorProxy(loadedArtifact);
         proxy.setStepContext(stepContext);
         return proxy;
     }
 
-    public static ItemWriterProxy createItemWriterProxy(final String id, final InjectionReferences injectionRefs, final StepContextImpl stepContext, final RuntimeJobExecution execution) {
-        final ItemWriter loadedArtifact = (ItemWriter) loadArtifact(id, injectionRefs, execution);
+    public static ItemWriterProxy createItemWriterProxy(final BatchArtifactFactory factory, final String id, final InjectionReferences injectionRefs, final StepContextImpl stepContext, final RuntimeJobExecution execution) {
+        final ItemWriter loadedArtifact = (ItemWriter) loadArtifact(factory, id, injectionRefs, execution);
         final ItemWriterProxy proxy = new ItemWriterProxy(loadedArtifact);
         proxy.setStepContext(stepContext);
         return proxy;
@@ -115,29 +113,29 @@ public class ProxyFactory {
      * The four partition-related artifacts
      */
 
-    public static PartitionReducerProxy createPartitionReducerProxy(final String id, final InjectionReferences injectionRefs, final StepContextImpl stepContext, final RuntimeJobExecution execution) {
-        final PartitionReducer loadedArtifact = (PartitionReducer) loadArtifact(id, injectionRefs, execution);
+    public static PartitionReducerProxy createPartitionReducerProxy(final BatchArtifactFactory factory, final String id, final InjectionReferences injectionRefs, final StepContextImpl stepContext, final RuntimeJobExecution execution) {
+        final PartitionReducer loadedArtifact = (PartitionReducer) loadArtifact(factory, id, injectionRefs, execution);
         final PartitionReducerProxy proxy = new PartitionReducerProxy(loadedArtifact);
         proxy.setStepContext(stepContext);
         return proxy;
     }
 
-    public static PartitionMapperProxy createPartitionMapperProxy(final String id, final InjectionReferences injectionRefs, final StepContextImpl stepContext, final RuntimeJobExecution execution) {
-        final PartitionMapper loadedArtifact = (PartitionMapper) loadArtifact(id, injectionRefs, execution);
+    public static PartitionMapperProxy createPartitionMapperProxy(final BatchArtifactFactory factory, final String id, final InjectionReferences injectionRefs, final StepContextImpl stepContext, final RuntimeJobExecution execution) {
+        final PartitionMapper loadedArtifact = (PartitionMapper) loadArtifact(factory, id, injectionRefs, execution);
         final PartitionMapperProxy proxy = new PartitionMapperProxy(loadedArtifact);
         proxy.setStepContext(stepContext);
         return proxy;
     }
 
-    public static PartitionAnalyzerProxy createPartitionAnalyzerProxy(final String id, final InjectionReferences injectionRefs, final StepContextImpl stepContext, final RuntimeJobExecution execution) {
-        final PartitionAnalyzer loadedArtifact = (PartitionAnalyzer) loadArtifact(id, injectionRefs, execution);
+    public static PartitionAnalyzerProxy createPartitionAnalyzerProxy(final BatchArtifactFactory factory, final String id, final InjectionReferences injectionRefs, final StepContextImpl stepContext, final RuntimeJobExecution execution) {
+        final PartitionAnalyzer loadedArtifact = (PartitionAnalyzer) loadArtifact(factory, id, injectionRefs, execution);
         final PartitionAnalyzerProxy proxy = new PartitionAnalyzerProxy(loadedArtifact);
         proxy.setStepContext(stepContext);
         return proxy;
     }
 
-    public static PartitionCollectorProxy createPartitionCollectorProxy(final String id, final InjectionReferences injectionRefs, final StepContextImpl stepContext, final RuntimeJobExecution execution) {
-        final PartitionCollector loadedArtifact = (PartitionCollector) loadArtifact(id, injectionRefs, execution);
+    public static PartitionCollectorProxy createPartitionCollectorProxy(final BatchArtifactFactory factory, final String id, final InjectionReferences injectionRefs, final StepContextImpl stepContext, final RuntimeJobExecution execution) {
+        final PartitionCollector loadedArtifact = (PartitionCollector) loadArtifact(factory, id, injectionRefs, execution);
         final PartitionCollectorProxy proxy = new PartitionCollectorProxy(loadedArtifact);
         proxy.setStepContext(stepContext);
         return proxy;

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/8ddf9f75/jbatch/src/main/java/org/apache/batchee/container/services/ServicesManager.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/services/ServicesManager.java b/jbatch/src/main/java/org/apache/batchee/container/services/ServicesManager.java
index 46cfaac..8418127 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/services/ServicesManager.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/services/ServicesManager.java
@@ -40,10 +40,7 @@ import org.apache.batchee.spi.TransactionManagementService;
 
 import java.io.IOException;
 import java.io.InputStream;
-import java.lang.reflect.InvocationHandler;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.lang.reflect.Proxy;
+import java.lang.reflect.Constructor;
 import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.ConcurrentHashMap;
@@ -82,8 +79,8 @@ public class ServicesManager implements BatchContainerConstants {
         servicesManagerLocator = locator;
     }
 
-    public static <T extends BatchService> T service(final Class<T> api) {
-        return api.cast(Proxy.newProxyInstance(ServicesManager.class.getClassLoader(), new Class<?>[]{ api }, new ServiceHandler<T>(api)));
+    public static ServicesManager find() {
+        return servicesManagerLocator.find();
     }
 
     public static String value(final String key, final String defaultValue) {
@@ -149,7 +146,7 @@ public class ServicesManager implements BatchContainerConstants {
         }
     }
 
-    private <T extends BatchService> T getService(final Class<T> clazz) throws BatchContainerServiceException {
+    public <T extends BatchService> T service(final Class<T> clazz) throws BatchContainerServiceException {
         T service = clazz.cast(serviceRegistry.get(clazz.getName()));
         if (service == null) {
             // Probably don't want to be loading two on two different threads so lock the whole table.
@@ -192,9 +189,15 @@ public class ServicesManager implements BatchContainerConstants {
         return service;
     }
 
-    private static <T> T load(final Class<T> expected, final String className) throws Exception {
+    private <T> T load(final Class<T> expected, final String className) throws Exception {
         final Class<?> cls = Class.forName(className);
         if (cls != null) {
+            try {
+                final Constructor<?> constructor = cls.getConstructor(ServicesManager.class);
+                return expected.cast(constructor.newInstance(this));
+            } catch (final Throwable th) {
+                // try no arg constructor
+            }
             if (cls.getConstructor() != null) {
                 return expected.cast(cls.newInstance());
             }
@@ -202,24 +205,5 @@ public class ServicesManager implements BatchContainerConstants {
         }
         throw new Exception("Exception loading Service class " + className + " make sure it exists");
     }
-
-    // just an handler getting the right service instance using the ServicesManagerLocator
-    private static class ServiceHandler<T extends BatchService> implements InvocationHandler {
-        private final Class<T> service;
-
-        public ServiceHandler(final Class<T> api) {
-            this.service = api;
-        }
-
-        @Override
-        public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable {
-            final T instance = servicesManagerLocator.find().getService(service);
-            try {
-                return method.invoke(instance, args);
-            } catch (final InvocationTargetException ite) {
-                throw ite.getCause();
-            }
-        }
-    }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/8ddf9f75/jbatch/src/main/java/org/apache/batchee/container/services/kernel/DefaultBatchKernel.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/services/kernel/DefaultBatchKernel.java b/jbatch/src/main/java/org/apache/batchee/container/services/kernel/DefaultBatchKernel.java
index b44a0b4..7dc488a 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/services/kernel/DefaultBatchKernel.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/services/kernel/DefaultBatchKernel.java
@@ -23,6 +23,7 @@ import org.apache.batchee.container.impl.jobinstance.RuntimeFlowInSplitExecution
 import org.apache.batchee.container.impl.jobinstance.RuntimeJobExecution;
 import org.apache.batchee.container.services.BatchKernelService;
 import org.apache.batchee.container.services.InternalJobExecution;
+import org.apache.batchee.container.services.JobStatusManagerService;
 import org.apache.batchee.container.services.ServicesManager;
 import org.apache.batchee.container.util.BatchFlowInSplitWorkUnit;
 import org.apache.batchee.container.util.BatchPartitionWorkUnit;
@@ -56,10 +57,12 @@ public class DefaultBatchKernel implements BatchKernelService {
 
     private final BatchThreadPoolService executorService;
     private final PersistenceManagerService persistenceService;
+    private final ServicesManager servicesManager;
 
-    public DefaultBatchKernel() {
-        executorService = ServicesManager.service(BatchThreadPoolService.class);
-        persistenceService = ServicesManager.service(PersistenceManagerService.class);
+    public DefaultBatchKernel(final ServicesManager servicesManager) {
+        this.servicesManager = servicesManager;
+        executorService = servicesManager.service(BatchThreadPoolService.class);
+        persistenceService = servicesManager.service(PersistenceManagerService.class);
     }
 
     @Override
@@ -69,11 +72,11 @@ public class DefaultBatchKernel implements BatchKernelService {
 
     @Override
     public InternalJobExecution startJob(final String jobXML, final Properties jobParameters) throws JobStartException {
-        final RuntimeJobExecution jobExecution = JobExecutionHelper.startJob(jobXML, jobParameters);
+        final RuntimeJobExecution jobExecution = JobExecutionHelper.startJob(servicesManager, jobXML, jobParameters);
 
         // TODO - register with status manager
 
-        final BatchWorkUnit batchWork = new BatchWorkUnit(this, jobExecution);
+        final BatchWorkUnit batchWork = new BatchWorkUnit(servicesManager, jobExecution);
         registerCurrentInstanceAndExecution(jobExecution, batchWork.getController());
 
         executorService.executeTask(batchWork, null);
@@ -93,8 +96,8 @@ public class DefaultBatchKernel implements BatchKernelService {
 
     @Override
     public InternalJobExecution restartJob(final long executionId, final Properties jobOverrideProps) throws JobRestartException, JobExecutionAlreadyCompleteException, JobExecutionNotMostRecentException, NoSuchJobExecutionException {
-        final RuntimeJobExecution jobExecution = JobExecutionHelper.restartJob(executionId, jobOverrideProps);
-        final BatchWorkUnit batchWork = new BatchWorkUnit(this, jobExecution);
+        final RuntimeJobExecution jobExecution = JobExecutionHelper.restartJob(servicesManager, executionId, jobOverrideProps);
+        final BatchWorkUnit batchWork = new BatchWorkUnit(servicesManager, jobExecution);
 
         registerCurrentInstanceAndExecution(jobExecution, batchWork.getController());
 
@@ -124,7 +127,7 @@ public class DefaultBatchKernel implements BatchKernelService {
     }
 
     public InternalJobExecution getJobExecution(final long executionId) throws NoSuchJobExecutionException {
-        return JobExecutionHelper.getPersistedJobOperatorJobExecution(executionId);
+        return JobExecutionHelper.getPersistedJobOperatorJobExecution(servicesManager.service(PersistenceManagerService.class), executionId);
     }
 
     @Override
@@ -139,7 +142,7 @@ public class DefaultBatchKernel implements BatchKernelService {
 
     @Override
     public JobInstance getJobInstance(final long executionId) {
-        return JobExecutionHelper.getJobInstance(executionId);
+        return JobExecutionHelper.getJobInstance(servicesManager.service(JobStatusManagerService.class), executionId);
     }
 
 
@@ -158,10 +161,10 @@ public class DefaultBatchKernel implements BatchKernelService {
         int instance = 0;
         for (final JSLJob parallelJob : jobModels) {
             final Properties partitionProps = (partitionPropertiesArray == null) ? null : partitionPropertiesArray[instance];
-            final RuntimeJobExecution jobExecution = JobExecutionHelper.startPartition(parallelJob, partitionProps);
+            final RuntimeJobExecution jobExecution = JobExecutionHelper.startPartition(servicesManager, parallelJob, partitionProps);
             jobExecution.setPartitionInstance(instance);
 
-            final BatchPartitionWorkUnit batchWork = new BatchPartitionWorkUnit(this, jobExecution, config);
+            final BatchPartitionWorkUnit batchWork = new BatchPartitionWorkUnit(jobExecution, config, servicesManager);
 
             registerCurrentInstanceAndExecution(jobExecution, batchWork.getController());
 
@@ -190,13 +193,13 @@ public class DefaultBatchKernel implements BatchKernelService {
                 final long execId = getMostRecentExecutionId(parallelJob);
                 final RuntimeJobExecution jobExecution;
                 try {
-                    jobExecution = JobExecutionHelper.restartPartition(execId, parallelJob, partitionProps);
+                    jobExecution = JobExecutionHelper.restartPartition(servicesManager, execId, parallelJob, partitionProps);
                     jobExecution.setPartitionInstance(instance);
                 } catch (final NoSuchJobExecutionException e) {
                     throw new IllegalStateException("Caught NoSuchJobExecutionException but this is an internal JobExecution so this shouldn't have happened: execId =" + execId, e);
                 }
 
-                final BatchPartitionWorkUnit batchWork = new BatchPartitionWorkUnit(this, jobExecution, config);
+                final BatchPartitionWorkUnit batchWork = new BatchPartitionWorkUnit(jobExecution, config, servicesManager);
                 registerCurrentInstanceAndExecution(jobExecution, batchWork.getController());
 
                 batchWorkUnits.add(batchWork);
@@ -219,8 +222,8 @@ public class DefaultBatchKernel implements BatchKernelService {
     public BatchFlowInSplitWorkUnit buildNewFlowInSplitWorkUnit(final FlowInSplitBuilderConfig config) {
         final JSLJob parallelJob = config.getJobModel();
 
-        final RuntimeFlowInSplitExecution execution = JobExecutionHelper.startFlowInSplit(parallelJob);
-        final BatchFlowInSplitWorkUnit batchWork = new BatchFlowInSplitWorkUnit(this, execution, config);
+        final RuntimeFlowInSplitExecution execution = JobExecutionHelper.startFlowInSplit(servicesManager, parallelJob);
+        final BatchFlowInSplitWorkUnit batchWork = new BatchFlowInSplitWorkUnit(execution, config, servicesManager);
 
         registerCurrentInstanceAndExecution(execution, batchWork.getController());
         return batchWork;
@@ -256,12 +259,12 @@ public class DefaultBatchKernel implements BatchKernelService {
         final long execId = getMostRecentExecutionId(jobModel);
         final RuntimeFlowInSplitExecution jobExecution;
         try {
-            jobExecution = JobExecutionHelper.restartFlowInSplit(execId, jobModel);
+            jobExecution = JobExecutionHelper.restartFlowInSplit(servicesManager, execId, jobModel);
         } catch (final NoSuchJobExecutionException e) {
             throw new IllegalStateException("Caught NoSuchJobExecutionException but this is an internal JobExecution so this shouldn't have happened: execId =" + execId, e);
         }
 
-        final BatchFlowInSplitWorkUnit batchWork = new BatchFlowInSplitWorkUnit(this, jobExecution, config);
+        final BatchFlowInSplitWorkUnit batchWork = new BatchFlowInSplitWorkUnit(jobExecution, config, servicesManager);
 
         registerCurrentInstanceAndExecution(jobExecution, batchWork.getController());
         return batchWork;

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/8ddf9f75/jbatch/src/main/java/org/apache/batchee/container/services/locator/ClassLoaderLocator.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/services/locator/ClassLoaderLocator.java b/jbatch/src/main/java/org/apache/batchee/container/services/locator/ClassLoaderLocator.java
index dbea45f..3255ff9 100644
--- a/jbatch/src/main/java/org/apache/batchee/container/services/locator/ClassLoaderLocator.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/services/locator/ClassLoaderLocator.java
@@ -1,13 +1,12 @@
 /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
  *
- * Copyright 2012,2013 International Business Machines Corp.
- *
- * See the NOTICE file distributed with this work for additional information
- * regarding copyright ownership. Licensed under the Apache License,
- * Version 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/8ddf9f75/jbatch/src/main/java/org/apache/batchee/container/services/persistence/JDBCPersistenceManager.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/services/persistence/JDBCPersistenceManager.java b/jbatch/src/main/java/org/apache/batchee/container/services/persistence/JDBCPersistenceManager.java
index b99e881..056b05e 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/services/persistence/JDBCPersistenceManager.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/services/persistence/JDBCPersistenceManager.java
@@ -934,7 +934,7 @@ public class JDBCPersistenceManager implements PersistenceManagerService {
 
                 final String jobName = rs.getString(dictionary.jobInstanceColumns(3));
 
-                final JobExecutionImpl jobEx = new JobExecutionImpl(jobExecutionId, instanceId);
+                final JobExecutionImpl jobEx = new JobExecutionImpl(jobExecutionId, instanceId, this);
                 jobEx.setCreateTime(createtime);
                 jobEx.setStartTime(starttime);
                 jobEx.setEndTime(endtime);
@@ -979,7 +979,7 @@ public class JDBCPersistenceManager implements PersistenceManagerService {
                 final String exitStatus = rs.getString(dictionary.jobExecutionColumns(4));
                 final String jobName = rs.getString(dictionary.jobInstanceColumns(3));
 
-                final JobExecutionImpl jobEx = new JobExecutionImpl(jobExecutionId, jobInstanceId);
+                final JobExecutionImpl jobEx = new JobExecutionImpl(jobExecutionId, jobInstanceId, this);
                 jobEx.setCreateTime(createtime);
                 jobEx.setStartTime(starttime);
                 jobEx.setEndTime(endtime);
@@ -1154,7 +1154,7 @@ public class JDBCPersistenceManager implements PersistenceManagerService {
     public RuntimeJobExecution createJobExecution(final JobInstance jobInstance, final Properties jobParameters, final BatchStatus batchStatus) {
         final Timestamp now = new Timestamp(System.currentTimeMillis());
         final long newExecutionId = createRuntimeJobExecutionEntry(jobInstance, jobParameters, batchStatus, now);
-        final RuntimeJobExecution jobExecution = new RuntimeJobExecution(jobInstance, newExecutionId);
+        final RuntimeJobExecution jobExecution = new RuntimeJobExecution(jobInstance, newExecutionId, this);
         jobExecution.setBatchStatus(batchStatus.name());
         jobExecution.setCreateTime(now);
         jobExecution.setLastUpdateTime(now);
@@ -1195,7 +1195,7 @@ public class JDBCPersistenceManager implements PersistenceManagerService {
     public RuntimeFlowInSplitExecution createFlowInSplitExecution(final JobInstance jobInstance, final BatchStatus batchStatus) {
         final Timestamp now = new Timestamp(System.currentTimeMillis());
         final long newExecutionId = createRuntimeJobExecutionEntry(jobInstance, null, batchStatus, now);
-        final RuntimeFlowInSplitExecution flowExecution = new RuntimeFlowInSplitExecution(jobInstance, newExecutionId);
+        final RuntimeFlowInSplitExecution flowExecution = new RuntimeFlowInSplitExecution(jobInstance, newExecutionId, this);
         flowExecution.setBatchStatus(batchStatus.name());
         flowExecution.setCreateTime(now);
         flowExecution.setLastUpdateTime(now);

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/8ddf9f75/jbatch/src/main/java/org/apache/batchee/container/services/persistence/JPAPersistenceService.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/services/persistence/JPAPersistenceService.java b/jbatch/src/main/java/org/apache/batchee/container/services/persistence/JPAPersistenceService.java
index 8c0f6f0..5d8a4fe 100644
--- a/jbatch/src/main/java/org/apache/batchee/container/services/persistence/JPAPersistenceService.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/services/persistence/JPAPersistenceService.java
@@ -344,7 +344,7 @@ public class JPAPersistenceService implements PersistenceManagerService {
 
             final List<InternalJobExecution> result = new ArrayList<InternalJobExecution>(list.size());
             for (final JobExecutionEntity entity : list) {
-                final JobExecutionImpl jobEx = new JobExecutionImpl(entity.getExecutionId(), jobInstanceId);
+                final JobExecutionImpl jobEx = new JobExecutionImpl(entity.getExecutionId(), jobInstanceId, this);
                 jobEx.setCreateTime(entity.getCreateTime());
                 jobEx.setStartTime(entity.getStartTime());
                 jobEx.setEndTime(entity.getEndTime());
@@ -390,7 +390,7 @@ public class JPAPersistenceService implements PersistenceManagerService {
         try {
             final JobExecutionEntity instance = em.find(JobExecutionEntity.class, jobExecutionId);
 
-            final JobExecutionImpl jobEx = new JobExecutionImpl(jobExecutionId, instance.getInstance().getJobInstanceId());
+            final JobExecutionImpl jobEx = new JobExecutionImpl(jobExecutionId, instance.getInstance().getJobInstanceId(), this);
             jobEx.setCreateTime(instance.getCreateTime());
             jobEx.setStartTime(instance.getStartTime());
             jobEx.setEndTime(instance.getEndTime());
@@ -496,7 +496,7 @@ public class JPAPersistenceService implements PersistenceManagerService {
                 em.persist(instance);
                 txProvider.commit(tx);
 
-                final RuntimeFlowInSplitExecution jobExecution = new RuntimeFlowInSplitExecution(jobInstance, instance.getExecutionId());
+                final RuntimeFlowInSplitExecution jobExecution = new RuntimeFlowInSplitExecution(jobInstance, instance.getExecutionId(), this);
                 jobExecution.setBatchStatus(batchStatus.name());
                 jobExecution.setCreateTime(instance.getCreateTime());
                 jobExecution.setLastUpdateTime(instance.getCreateTime());
@@ -561,7 +561,7 @@ public class JPAPersistenceService implements PersistenceManagerService {
                 em.persist(execution);
                 txProvider.commit(tx);
 
-                final RuntimeJobExecution jobExecution = new RuntimeJobExecution(jobInstance, execution.getExecutionId());
+                final RuntimeJobExecution jobExecution = new RuntimeJobExecution(jobInstance, execution.getExecutionId(), this);
                 jobExecution.setBatchStatus(batchStatus.name());
                 jobExecution.setCreateTime(execution.getCreateTime());
                 jobExecution.setLastUpdateTime(execution.getCreateTime());

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/8ddf9f75/jbatch/src/main/java/org/apache/batchee/container/services/persistence/MemoryPersistenceManager.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/services/persistence/MemoryPersistenceManager.java b/jbatch/src/main/java/org/apache/batchee/container/services/persistence/MemoryPersistenceManager.java
index 48e9ee5..0fbc623 100644
--- a/jbatch/src/main/java/org/apache/batchee/container/services/persistence/MemoryPersistenceManager.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/services/persistence/MemoryPersistenceManager.java
@@ -51,7 +51,6 @@ import java.util.Properties;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -324,7 +323,7 @@ public class MemoryPersistenceManager implements PersistenceManagerService {
         final Structures.ExecutionInstanceData executionInstanceData = createRuntimeJobExecutionEntry(jobInstance, jobParameters, batchStatus, now);
         executionInstanceData.execution.setJobName(jobInstance.getJobName());
 
-        final RuntimeJobExecution jobExecution = new RuntimeJobExecution(jobInstance, executionInstanceData.execution.getExecutionId());
+        final RuntimeJobExecution jobExecution = new RuntimeJobExecution(jobInstance, executionInstanceData.execution.getExecutionId(), this);
         jobExecution.setBatchStatus(batchStatus.name());
         jobExecution.setCreateTime(now);
         jobExecution.setLastUpdateTime(now);
@@ -335,7 +334,7 @@ public class MemoryPersistenceManager implements PersistenceManagerService {
     private Structures.ExecutionInstanceData createRuntimeJobExecutionEntry(final JobInstance jobInstance, final Properties jobParameters, final BatchStatus batchStatus, final Timestamp now) {
         final Structures.ExecutionInstanceData executionInstanceData = new Structures.ExecutionInstanceData();
         final long id = data.executionInstanceIdGenerator.getAndIncrement();
-        executionInstanceData.execution = new JobExecutionImpl(id, jobInstance.getInstanceId());
+        executionInstanceData.execution = new JobExecutionImpl(id, jobInstance.getInstanceId(), this);
         executionInstanceData.execution.setExecutionId(id);
         executionInstanceData.execution.setInstanceId(jobInstance.getInstanceId());
         executionInstanceData.execution.setBatchStatus(batchStatus.name());
@@ -608,7 +607,7 @@ public class MemoryPersistenceManager implements PersistenceManagerService {
     public RuntimeFlowInSplitExecution createFlowInSplitExecution(final JobInstance jobInstance, final BatchStatus batchStatus) {
         final Timestamp now = new Timestamp(System.currentTimeMillis());
         final Structures.ExecutionInstanceData executionInstanceData = createRuntimeJobExecutionEntry(jobInstance, null, batchStatus, now);
-        final RuntimeFlowInSplitExecution flowExecution = new RuntimeFlowInSplitExecution(jobInstance, executionInstanceData.execution.getExecutionId());
+        final RuntimeFlowInSplitExecution flowExecution = new RuntimeFlowInSplitExecution(jobInstance, executionInstanceData.execution.getExecutionId(), this);
         flowExecution.setBatchStatus(batchStatus.name());
         flowExecution.setCreateTime(now);
         flowExecution.setLastUpdateTime(now);

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/8ddf9f75/jbatch/src/main/java/org/apache/batchee/container/services/status/DefaultJobStatusManager.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/services/status/DefaultJobStatusManager.java b/jbatch/src/main/java/org/apache/batchee/container/services/status/DefaultJobStatusManager.java
index f7a71b2..c0e15e6 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/services/status/DefaultJobStatusManager.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/services/status/DefaultJobStatusManager.java
@@ -18,10 +18,10 @@ package org.apache.batchee.container.services.status;
 
 import org.apache.batchee.container.exception.BatchContainerServiceException;
 import org.apache.batchee.container.services.JobStatusManagerService;
-import org.apache.batchee.spi.PersistenceManagerService;
 import org.apache.batchee.container.services.ServicesManager;
 import org.apache.batchee.container.status.JobStatus;
 import org.apache.batchee.container.status.StepStatus;
+import org.apache.batchee.spi.PersistenceManagerService;
 
 import javax.batch.runtime.BatchStatus;
 import java.util.Properties;
@@ -29,6 +29,10 @@ import java.util.Properties;
 public class DefaultJobStatusManager implements JobStatusManagerService {
     private PersistenceManagerService persistenceManager;
 
+    public DefaultJobStatusManager(final ServicesManager servicesManager) {
+        this.persistenceManager = servicesManager.service(PersistenceManagerService.class);
+    }
+
     @Override
     public JobStatus createJobStatus(long jobInstanceId) throws BatchContainerServiceException {
         return persistenceManager.createJobStatus(jobInstanceId);
@@ -117,7 +121,7 @@ public class DefaultJobStatusManager implements JobStatusManagerService {
 
     @Override
     public void init(final Properties batchConfig) throws BatchContainerServiceException {
-        persistenceManager = ServicesManager.service(PersistenceManagerService.class);
+        // no-op
     }
 
     /*

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/8ddf9f75/jbatch/src/main/java/org/apache/batchee/container/util/BatchFlowInSplitWorkUnit.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/util/BatchFlowInSplitWorkUnit.java b/jbatch/src/main/java/org/apache/batchee/container/util/BatchFlowInSplitWorkUnit.java
index 2f2bd7c..6f8fb90 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/util/BatchFlowInSplitWorkUnit.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/util/BatchFlowInSplitWorkUnit.java
@@ -19,18 +19,19 @@ package org.apache.batchee.container.util;
 import org.apache.batchee.container.impl.controller.FlowInSplitThreadRootController;
 import org.apache.batchee.container.impl.jobinstance.RuntimeFlowInSplitExecution;
 import org.apache.batchee.container.services.BatchKernelService;
+import org.apache.batchee.container.services.ServicesManager;
 
 import java.util.concurrent.BlockingQueue;
 
 public class BatchFlowInSplitWorkUnit extends BatchParallelWorkUnit {
     protected final BlockingQueue<BatchFlowInSplitWorkUnit> completedThreadQueue;
 
-    public BatchFlowInSplitWorkUnit(final BatchKernelService batchKernelService,
-                                    final RuntimeFlowInSplitExecution jobExecution,
-                                    final FlowInSplitBuilderConfig config) {
-        super(batchKernelService, jobExecution, true);
+    public BatchFlowInSplitWorkUnit(final RuntimeFlowInSplitExecution jobExecution,
+                                    final FlowInSplitBuilderConfig config,
+                                    final ServicesManager manager) {
+        super(jobExecution, true, manager);
         this.completedThreadQueue = config.getCompletedQueue();
-        this.controller = new FlowInSplitThreadRootController(jobExecution, config);
+        this.controller = new FlowInSplitThreadRootController(jobExecution, config, manager);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/8ddf9f75/jbatch/src/main/java/org/apache/batchee/container/util/BatchParallelWorkUnit.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/util/BatchParallelWorkUnit.java b/jbatch/src/main/java/org/apache/batchee/container/util/BatchParallelWorkUnit.java
index f25a4c7..2aeb1db 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/util/BatchParallelWorkUnit.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/util/BatchParallelWorkUnit.java
@@ -18,6 +18,7 @@ package org.apache.batchee.container.util;
 
 import org.apache.batchee.container.impl.jobinstance.RuntimeJobExecution;
 import org.apache.batchee.container.services.BatchKernelService;
+import org.apache.batchee.container.services.ServicesManager;
 
 /*
  * I took out the 'work type' constant since I don't see that we want to use
@@ -26,7 +27,8 @@ import org.apache.batchee.container.services.BatchKernelService;
  * perspective, as it returns a 'success' boolean.
  */
 public abstract class BatchParallelWorkUnit extends BatchWorkUnit {
-    public BatchParallelWorkUnit(final BatchKernelService batchKernel, final RuntimeJobExecution jobExecutionImpl, final boolean notifyCallbackWhenDone) {
-        super(batchKernel, jobExecutionImpl, notifyCallbackWhenDone);
+    public BatchParallelWorkUnit(final RuntimeJobExecution jobExecutionImpl,
+                                 final boolean notifyCallbackWhenDone, final ServicesManager manager) {
+        super(manager, jobExecutionImpl, notifyCallbackWhenDone);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/8ddf9f75/jbatch/src/main/java/org/apache/batchee/container/util/BatchPartitionWorkUnit.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/util/BatchPartitionWorkUnit.java b/jbatch/src/main/java/org/apache/batchee/container/util/BatchPartitionWorkUnit.java
index f139c08..cde223f 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/util/BatchPartitionWorkUnit.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/util/BatchPartitionWorkUnit.java
@@ -19,18 +19,19 @@ package org.apache.batchee.container.util;
 import org.apache.batchee.container.impl.controller.PartitionThreadRootController;
 import org.apache.batchee.container.impl.jobinstance.RuntimeJobExecution;
 import org.apache.batchee.container.services.BatchKernelService;
+import org.apache.batchee.container.services.ServicesManager;
 
 import java.util.concurrent.BlockingQueue;
 
 public class BatchPartitionWorkUnit extends BatchParallelWorkUnit {
     protected final BlockingQueue<BatchPartitionWorkUnit> completedThreadQueue;
 
-    public BatchPartitionWorkUnit(BatchKernelService batchKernelService,
-                                  RuntimeJobExecution jobExecution,
-                                  PartitionsBuilderConfig config) {
-        super(batchKernelService, jobExecution, true);
+    public BatchPartitionWorkUnit(final RuntimeJobExecution jobExecution,
+                                  final PartitionsBuilderConfig config,
+                                  final ServicesManager manager) {
+        super(jobExecution, true, manager);
         this.completedThreadQueue = config.getCompletedQueue();
-        this.controller = new PartitionThreadRootController(jobExecution, config);
+        this.controller = new PartitionThreadRootController(jobExecution, config, manager);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/8ddf9f75/jbatch/src/main/java/org/apache/batchee/container/util/BatchWorkUnit.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/util/BatchWorkUnit.java b/jbatch/src/main/java/org/apache/batchee/container/util/BatchWorkUnit.java
index 3ddf59b..096e78f 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/util/BatchWorkUnit.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/util/BatchWorkUnit.java
@@ -21,6 +21,7 @@ import org.apache.batchee.container.exception.BatchContainerRuntimeException;
 import org.apache.batchee.container.impl.controller.JobController;
 import org.apache.batchee.container.impl.jobinstance.RuntimeJobExecution;
 import org.apache.batchee.container.services.BatchKernelService;
+import org.apache.batchee.container.services.ServicesManager;
 
 import javax.batch.runtime.BatchStatus;
 import java.io.PrintWriter;
@@ -39,16 +40,16 @@ public class BatchWorkUnit implements Runnable {
 
     protected boolean notifyCallbackWhenDone;
 
-    public BatchWorkUnit(final BatchKernelService batchKernel, final RuntimeJobExecution jobExecutionImpl) {
-        this(batchKernel, jobExecutionImpl, true);
+    public BatchWorkUnit(final ServicesManager manager, final RuntimeJobExecution jobExecutionImpl) {
+        this(manager, jobExecutionImpl, true);
     }
 
-    public BatchWorkUnit(final BatchKernelService batchKernel, final RuntimeJobExecution jobExecutionImpl,
+    public BatchWorkUnit(final ServicesManager manager, final RuntimeJobExecution jobExecutionImpl,
                          final boolean notifyCallbackWhenDone) {
-        this.setBatchKernel(batchKernel);
+        this.setBatchKernel(manager.service(BatchKernelService.class));
         this.setJobExecutionImpl(jobExecutionImpl);
         this.setNotifyCallbackWhenDone(notifyCallbackWhenDone);
-        this.controller = new JobController(jobExecutionImpl);
+        this.controller = new JobController(jobExecutionImpl, manager);
     }
 
     public ThreadRootController getController() {

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/8ddf9f75/jbatch/src/main/java/org/apache/batchee/servlet/CleanUpWebappListener.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/servlet/CleanUpWebappListener.java b/jbatch/src/main/java/org/apache/batchee/servlet/CleanUpWebappListener.java
index 75baa24..9beef4c 100644
--- a/jbatch/src/main/java/org/apache/batchee/servlet/CleanUpWebappListener.java
+++ b/jbatch/src/main/java/org/apache/batchee/servlet/CleanUpWebappListener.java
@@ -36,7 +36,7 @@ public class CleanUpWebappListener implements ServletContextListener {
 
     @Override
     public void contextDestroyed(final ServletContextEvent sce) {
-        final BatchThreadPoolService threadPoolService = ServicesManager.service(BatchThreadPoolService.class);
+        final BatchThreadPoolService threadPoolService = ServicesManager.find().service(BatchThreadPoolService.class);
         if (CleanUpWebappListener.class.getClassLoader() == sce.getServletContext().getClassLoader()) {
             threadPoolService.shutdown();
 

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/8ddf9f75/jbatch/src/test/java/org/apache/batchee/test/jmx/JMXTest.java
----------------------------------------------------------------------
diff --git a/jbatch/src/test/java/org/apache/batchee/test/jmx/JMXTest.java b/jbatch/src/test/java/org/apache/batchee/test/jmx/JMXTest.java
index 0c454fd..39bcfb6 100644
--- a/jbatch/src/test/java/org/apache/batchee/test/jmx/JMXTest.java
+++ b/jbatch/src/test/java/org/apache/batchee/test/jmx/JMXTest.java
@@ -56,7 +56,7 @@ public class JMXTest {
     }
 
     private static void clearPersistence(final JobOperator jobOperator) {
-        final PersistenceManagerService service = ServicesManager.service(PersistenceManagerService.class);
+        final PersistenceManagerService service = ServicesManager.find().service(PersistenceManagerService.class);
         for (final String name : jobOperator.getJobNames()) {
             for (final JobInstance id : jobOperator.getJobInstances(name, 0, Integer.MAX_VALUE)) {
                 service.cleanUp(id.getInstanceId());
@@ -66,7 +66,7 @@ public class JMXTest {
 
     @AfterClass
     public static void deleteJob() throws Exception {
-        ServicesManager.service(PersistenceManagerService.class).cleanUp(id);
+        ServicesManager.find().service(PersistenceManagerService.class).cleanUp(id);
     }
 
     private static Object attr(final String name) throws Exception {

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/8ddf9f75/jbatch/src/test/resources/suites/dev-suite.xml
----------------------------------------------------------------------
diff --git a/jbatch/src/test/resources/suites/dev-suite.xml b/jbatch/src/test/resources/suites/dev-suite.xml
index f497102..4c30caa 100644
--- a/jbatch/src/test/resources/suites/dev-suite.xml
+++ b/jbatch/src/test/resources/suites/dev-suite.xml
@@ -27,11 +27,7 @@
     <classes>
       <class name="com.ibm.jbatch.tck.tests.jslxml.ChunkTests">
         <methods>
-          <include name="testChunkSkipReadNoSkipChildEx"/>
-          <!--
-          <include name="testChunkRetryMultipleExceptions"/>
-          <include name="testChunkSkipMultipleExceptions"/>
-          -->
+          <include name="testChunkOnErrorListener"/>
         </methods>
       </class>
     </classes>


[2/2] git commit: BATCHEE-9 ServicesManager shouldn't be contextual once an action started

Posted by rm...@apache.org.
BATCHEE-9 ServicesManager shouldn't be contextual once an action started


Project: http://git-wip-us.apache.org/repos/asf/incubator-batchee/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-batchee/commit/8ddf9f75
Tree: http://git-wip-us.apache.org/repos/asf/incubator-batchee/tree/8ddf9f75
Diff: http://git-wip-us.apache.org/repos/asf/incubator-batchee/diff/8ddf9f75

Branch: refs/heads/master
Commit: 8ddf9f754b731efe18ec48aedcc3cb0aa98814d3
Parents: b303e37
Author: Romain Manni-Bucau <rm...@apache.org>
Authored: Thu Jan 2 10:27:34 2014 +0100
Committer: Romain Manni-Bucau <rm...@apache.org>
Committed: Thu Jan 2 10:27:34 2014 +0100

----------------------------------------------------------------------
 .../container/impl/JobExecutionImpl.java        | 18 ++--
 .../batchee/container/impl/JobOperatorImpl.java | 93 +++++++++++---------
 .../impl/controller/BaseStepController.java     | 42 +++++----
 .../impl/controller/DecisionController.java     | 11 ++-
 .../ExecutionElementControllerFactory.java      | 25 +++---
 .../impl/controller/ExecutionTransitioner.java  | 20 +++--
 .../impl/controller/FlowController.java         |  7 +-
 .../FlowInSplitThreadRootController.java        |  5 +-
 .../impl/controller/JobController.java          |  9 +-
 .../controller/JobThreadRootController.java     | 21 +++--
 .../PartitionThreadRootController.java          |  5 +-
 .../controller/PartitionedStepController.java   | 30 ++++---
 .../SingleThreadedStepController.java           | 14 ++-
 .../impl/controller/SplitController.java        |  6 +-
 .../batchlet/BatchletStepController.java        | 11 ++-
 .../chunk/CheckpointAlgorithmFactory.java       |  5 +-
 .../controller/chunk/CheckpointManager.java     |  5 +-
 .../controller/chunk/ChunkStepController.java   | 45 +++++-----
 .../impl/jobinstance/JobExecutionHelper.java    | 92 ++++++++++---------
 .../RuntimeFlowInSplitExecution.java            |  6 +-
 .../impl/jobinstance/RuntimeJobExecution.java   |  5 +-
 .../container/proxy/ListenerFactory.java        | 54 ++++++------
 .../batchee/container/proxy/ProxyFactory.java   | 46 +++++-----
 .../container/services/ServicesManager.java     | 38 +++-----
 .../services/kernel/DefaultBatchKernel.java     | 37 ++++----
 .../services/locator/ClassLoaderLocator.java    | 15 ++--
 .../persistence/JDBCPersistenceManager.java     |  8 +-
 .../persistence/JPAPersistenceService.java      |  8 +-
 .../persistence/MemoryPersistenceManager.java   |  7 +-
 .../status/DefaultJobStatusManager.java         |  8 +-
 .../util/BatchFlowInSplitWorkUnit.java          | 11 +--
 .../container/util/BatchParallelWorkUnit.java   |  6 +-
 .../container/util/BatchPartitionWorkUnit.java  | 11 +--
 .../batchee/container/util/BatchWorkUnit.java   | 11 +--
 .../batchee/servlet/CleanUpWebappListener.java  |  2 +-
 .../org/apache/batchee/test/jmx/JMXTest.java    |  4 +-
 jbatch/src/test/resources/suites/dev-suite.xml  |  6 +-
 37 files changed, 405 insertions(+), 342 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/8ddf9f75/jbatch/src/main/java/org/apache/batchee/container/impl/JobExecutionImpl.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/JobExecutionImpl.java b/jbatch/src/main/java/org/apache/batchee/container/impl/JobExecutionImpl.java
index 0767168..68ecb5e 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/impl/JobExecutionImpl.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/JobExecutionImpl.java
@@ -19,7 +19,6 @@ package org.apache.batchee.container.impl;
 import org.apache.batchee.container.services.InternalJobExecution;
 import org.apache.batchee.spi.PersistenceManagerService;
 import org.apache.batchee.spi.PersistenceManagerService.TimestampType;
-import org.apache.batchee.container.services.ServicesManager;
 
 import javax.batch.runtime.BatchStatus;
 import java.sql.Timestamp;
@@ -27,7 +26,7 @@ import java.util.Date;
 import java.util.Properties;
 
 public class JobExecutionImpl implements InternalJobExecution {
-    private static final PersistenceManagerService PERSISTENCE_MANAGER_SERVICE = ServicesManager.service(PersistenceManagerService.class);
+    private final PersistenceManagerService persistenceManagerService;
 
     private long executionID = 0L;
     private long instanceID = 0L;
@@ -50,7 +49,8 @@ public class JobExecutionImpl implements InternalJobExecution {
         this.jobContext = jobContext;
     }
 
-    public JobExecutionImpl(long executionId, long instanceId) {
+    public JobExecutionImpl(final long executionId, final long instanceId, final PersistenceManagerService persistenceManagerService) {
+        this.persistenceManagerService = persistenceManagerService;
         this.executionID = executionId;
         this.instanceID = instanceId;
     }
@@ -61,7 +61,7 @@ public class JobExecutionImpl implements InternalJobExecution {
             return this.jobContext.getBatchStatus();
         } else {
             // old job, retrieve from the backend
-            final String name = PERSISTENCE_MANAGER_SERVICE.jobOperatorQueryJobExecutionBatchStatus(executionID);
+            final String name = persistenceManagerService.jobOperatorQueryJobExecutionBatchStatus(executionID);
             if (name != null) {
                 return BatchStatus.valueOf(name);
             }
@@ -71,7 +71,7 @@ public class JobExecutionImpl implements InternalJobExecution {
 
     @Override
     public Date getCreateTime() {
-        final Timestamp ts = PERSISTENCE_MANAGER_SERVICE.jobOperatorQueryJobExecutionTimestamp(executionID, TimestampType.CREATE);
+        final Timestamp ts = persistenceManagerService.jobOperatorQueryJobExecutionTimestamp(executionID, TimestampType.CREATE);
         if (ts != null) {
             createTime = ts;
         }
@@ -84,7 +84,7 @@ public class JobExecutionImpl implements InternalJobExecution {
 
     @Override
     public Date getEndTime() {
-        final Timestamp ts = PERSISTENCE_MANAGER_SERVICE.jobOperatorQueryJobExecutionTimestamp(executionID, TimestampType.END);
+        final Timestamp ts = persistenceManagerService.jobOperatorQueryJobExecutionTimestamp(executionID, TimestampType.END);
         if (ts != null) {
             endTime = ts;
         }
@@ -106,7 +106,7 @@ public class JobExecutionImpl implements InternalJobExecution {
             return this.jobContext.getExitStatus();
         }
 
-        final String persistenceExitStatus = PERSISTENCE_MANAGER_SERVICE.jobOperatorQueryJobExecutionExitStatus(executionID);
+        final String persistenceExitStatus = persistenceManagerService.jobOperatorQueryJobExecutionExitStatus(executionID);
         if (persistenceExitStatus != null) {
             exitStatus = persistenceExitStatus;
         }
@@ -116,7 +116,7 @@ public class JobExecutionImpl implements InternalJobExecution {
 
     @Override
     public Date getLastUpdatedTime() {
-        final Timestamp ts = PERSISTENCE_MANAGER_SERVICE.jobOperatorQueryJobExecutionTimestamp(executionID, TimestampType.LAST_UPDATED);
+        final Timestamp ts = persistenceManagerService.jobOperatorQueryJobExecutionTimestamp(executionID, TimestampType.LAST_UPDATED);
         if (ts != null) {
             this.updateTime = ts;
         }
@@ -129,7 +129,7 @@ public class JobExecutionImpl implements InternalJobExecution {
 
     @Override
     public Date getStartTime() {
-        final Timestamp ts = PERSISTENCE_MANAGER_SERVICE.jobOperatorQueryJobExecutionTimestamp(executionID, TimestampType.STARTED);
+        final Timestamp ts = persistenceManagerService.jobOperatorQueryJobExecutionTimestamp(executionID, TimestampType.STARTED);
         if (ts != null) {
             startTime = ts;
         }

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/8ddf9f75/jbatch/src/main/java/org/apache/batchee/container/impl/JobOperatorImpl.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/JobOperatorImpl.java b/jbatch/src/main/java/org/apache/batchee/container/impl/JobOperatorImpl.java
index c970534..28fab55 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/impl/JobOperatorImpl.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/JobOperatorImpl.java
@@ -101,15 +101,24 @@ public class JobOperatorImpl implements JobOperator {
 
     public static final String JBATCH_ADMIN = "admin";
 
-    private static final BatchKernelService KERNEL_SERVICE = ServicesManager.service(BatchKernelService.class);
-    private static final PersistenceManagerService PERSISTENCE_SERVICE = ServicesManager.service(PersistenceManagerService.class);
-    private static final JobXMLLoaderService XML_LOADER_SERVICE = ServicesManager.service(JobXMLLoaderService.class);
-    private static final JobStatusManagerService STATUS_MANAGER_SERVICE = ServicesManager.service(JobStatusManagerService.class);
-    private static final SecurityService SECURITY_SERVICE= ServicesManager.service(SecurityService.class);
+    private final BatchKernelService kernelService;
+    private final PersistenceManagerService persistenceManagerService;
+    private final JobXMLLoaderService xmlLoaderService;
+    private final JobStatusManagerService statusManagerService;
+    private final SecurityService securityService;
+
+    public JobOperatorImpl() {
+        final ServicesManager servicesManager = ServicesManager.find();
+        kernelService = servicesManager.service(BatchKernelService.class);
+        persistenceManagerService = servicesManager.service(PersistenceManagerService.class);
+        xmlLoaderService = servicesManager.service(JobXMLLoaderService.class);
+        statusManagerService = servicesManager.service(JobStatusManagerService.class);
+        securityService = servicesManager.service(SecurityService.class);
+    }
 
     @Override
     public long start(final String jobXMLName, final Properties jobParameters) throws JobStartException, JobSecurityException {
-        if (!SECURITY_SERVICE.isAuthorized(Permissions.START.name)) {
+        if (!securityService.isAuthorized(Permissions.START.name)) {
             throw new JobSecurityException("The current user is not authorized to perform this operation");
         }
 
@@ -138,18 +147,18 @@ public class JobOperatorImpl implements JobOperator {
             jobParameterWriter.write("Job parameters on start = null");
         }
 
-        final String jobXML = XML_LOADER_SERVICE.loadJSL(jobXMLName);
-        final InternalJobExecution execution = KERNEL_SERVICE.startJob(jobXML, jobParameters);
+        final String jobXML = xmlLoaderService.loadJSL(jobXMLName);
+        final InternalJobExecution execution = kernelService.startJob(jobXML, jobParameters);
         return execution.getExecutionId();
     }
 
     @Override
     public void abandon(final long executionId) throws NoSuchJobExecutionException, JobExecutionIsRunningException, JobSecurityException {
-        if (!SECURITY_SERVICE.isAuthorized(PERSISTENCE_SERVICE.getJobInstanceIdByExecutionId(executionId))) {
+        if (!securityService.isAuthorized(persistenceManagerService.getJobInstanceIdByExecutionId(executionId))) {
             throw new JobSecurityException("The current user is not authorized to perform this operation");
         }
 
-        final InternalJobExecution jobEx = PERSISTENCE_SERVICE.jobOperatorGetJobExecution(executionId);
+        final InternalJobExecution jobEx = persistenceManagerService.jobOperatorGetJobExecution(executionId);
 
         // if it is not in STARTED or STARTING state, mark it as ABANDONED
         if (jobEx.getBatchStatus().equals(BatchStatus.STARTED) || jobEx.getBatchStatus().equals(BatchStatus.STARTING)) {
@@ -157,31 +166,31 @@ public class JobOperatorImpl implements JobOperator {
         }
 
         // update table to reflect ABANDONED state
-        PERSISTENCE_SERVICE.updateBatchStatusOnly(jobEx.getExecutionId(), BatchStatus.ABANDONED, new Timestamp(System.currentTimeMillis()));
+        persistenceManagerService.updateBatchStatusOnly(jobEx.getExecutionId(), BatchStatus.ABANDONED, new Timestamp(System.currentTimeMillis()));
 
         // Don't forget to update JOBSTATUS table
-        STATUS_MANAGER_SERVICE.updateJobBatchStatus(jobEx.getInstanceId(), BatchStatus.ABANDONED);
+        statusManagerService.updateJobBatchStatus(jobEx.getInstanceId(), BatchStatus.ABANDONED);
     }
 
     @Override
     public InternalJobExecution getJobExecution(final long executionId)
         throws NoSuchJobExecutionException, JobSecurityException {
-        if (!SECURITY_SERVICE.isAuthorized(PERSISTENCE_SERVICE.getJobInstanceIdByExecutionId(executionId))) {
+        if (!securityService.isAuthorized(persistenceManagerService.getJobInstanceIdByExecutionId(executionId))) {
             throw new JobSecurityException("The current user is not authorized to perform this operation");
         }
-        return KERNEL_SERVICE.getJobExecution(executionId);
+        return kernelService.getJobExecution(executionId);
     }
 
     @Override
     public List<JobExecution> getJobExecutions(final JobInstance instance)
         throws NoSuchJobInstanceException, JobSecurityException {
-        if (!SECURITY_SERVICE.isAuthorized(instance.getInstanceId())) {
+        if (!securityService.isAuthorized(instance.getInstanceId())) {
             throw new JobSecurityException("The current user is not authorized to perform this operation");
         }
 
         // Mediate between one
         final List<JobExecution> executions = new ArrayList<JobExecution>();
-        List<InternalJobExecution> executionImpls = PERSISTENCE_SERVICE.jobOperatorGetJobExecutions(instance.getInstanceId());
+        List<InternalJobExecution> executionImpls = persistenceManagerService.jobOperatorGetJobExecutions(instance.getInstanceId());
         if (executionImpls.size() == 0) {
             throw new NoSuchJobInstanceException("Job: " + instance.getJobName() + " does not exist");
         }
@@ -194,20 +203,20 @@ public class JobOperatorImpl implements JobOperator {
     @Override
     public JobInstance getJobInstance(long executionId)
         throws NoSuchJobExecutionException, JobSecurityException {
-        if (!SECURITY_SERVICE.isAuthorized(PERSISTENCE_SERVICE.getJobInstanceIdByExecutionId(executionId))) {
+        if (!securityService.isAuthorized(persistenceManagerService.getJobInstanceIdByExecutionId(executionId))) {
             throw new JobSecurityException("The current user is not authorized to perform this operation");
         }
-        return KERNEL_SERVICE.getJobInstance(executionId);
+        return kernelService.getJobInstance(executionId);
     }
 
     @Override
     public int getJobInstanceCount(String jobName) throws NoSuchJobException, JobSecurityException {
         int jobInstanceCount;
-        if (SECURITY_SERVICE.isAuthorized(JBATCH_ADMIN)) {
+        if (securityService.isAuthorized(JBATCH_ADMIN)) {
             // Do an unfiltered query
-            jobInstanceCount = PERSISTENCE_SERVICE.jobOperatorGetJobInstanceCount(jobName);
+            jobInstanceCount = persistenceManagerService.jobOperatorGetJobInstanceCount(jobName);
         } else {
-            jobInstanceCount = PERSISTENCE_SERVICE.jobOperatorGetJobInstanceCount(jobName, SECURITY_SERVICE.getLoggedUser());
+            jobInstanceCount = persistenceManagerService.jobOperatorGetJobInstanceCount(jobName, securityService.getLoggedUser());
         }
 
         if (jobInstanceCount > 0) {
@@ -229,11 +238,11 @@ public class JobOperatorImpl implements JobOperator {
         }
 
         final List<Long> instanceIds;
-        if (SECURITY_SERVICE.isAuthorized(JBATCH_ADMIN)) {
+        if (securityService.isAuthorized(JBATCH_ADMIN)) {
             // Do an unfiltered query
-            instanceIds = PERSISTENCE_SERVICE.jobOperatorGetJobInstanceIds(jobName, start, count);
+            instanceIds = persistenceManagerService.jobOperatorGetJobInstanceIds(jobName, start, count);
         } else {
-            instanceIds = PERSISTENCE_SERVICE.jobOperatorGetJobInstanceIds(jobName, SECURITY_SERVICE.getLoggedUser(), start, count);
+            instanceIds = persistenceManagerService.jobOperatorGetJobInstanceIds(jobName, securityService.getLoggedUser(), start, count);
         }
 
         // get the jobinstance ids associated with this job name
@@ -242,9 +251,9 @@ public class JobOperatorImpl implements JobOperator {
             // for every job instance id
             for (long id : instanceIds) {
                 // get the job instance obj, add it to the list
-                final JobStatus jobStatus = STATUS_MANAGER_SERVICE.getJobStatus(id);
+                final JobStatus jobStatus = statusManagerService.getJobStatus(id);
                 final JobInstance jobInstance = jobStatus.getJobInstance();
-                if (SECURITY_SERVICE.isAuthorized(jobInstance.getInstanceId())) {
+                if (securityService.isAuthorized(jobInstance.getInstanceId())) {
                     jobInstances.add(jobInstance);
                 }
             }
@@ -262,10 +271,10 @@ public class JobOperatorImpl implements JobOperator {
     @Override
     public Set<String> getJobNames() throws JobSecurityException {
         final Set<String> jobNames = new HashSet<String>();
-        final Map<Long, String> data = PERSISTENCE_SERVICE.jobOperatorGetExternalJobInstanceData();
+        final Map<Long, String> data = persistenceManagerService.jobOperatorGetExternalJobInstanceData();
         for (final Map.Entry<Long, String> entry : data.entrySet()) {
             long instanceId = entry.getKey();
-            if (SECURITY_SERVICE.isAuthorized(instanceId)) {
+            if (securityService.isAuthorized(instanceId)) {
                 jobNames.add(entry.getValue());
             }
         }
@@ -274,11 +283,11 @@ public class JobOperatorImpl implements JobOperator {
 
     @Override
     public Properties getParameters(final long executionId) throws NoSuchJobExecutionException, JobSecurityException {
-        final JobInstance requestedJobInstance = KERNEL_SERVICE.getJobInstance(executionId);
-        if (!SECURITY_SERVICE.isAuthorized(requestedJobInstance.getInstanceId())) {
+        final JobInstance requestedJobInstance = kernelService.getJobInstance(executionId);
+        if (!securityService.isAuthorized(requestedJobInstance.getInstanceId())) {
             throw new JobSecurityException("The current user is not authorized to perform this operation");
         }
-        return PERSISTENCE_SERVICE.getParameters(executionId);
+        return persistenceManagerService.getParameters(executionId);
     }
 
 
@@ -287,7 +296,7 @@ public class JobOperatorImpl implements JobOperator {
         final List<Long> jobExecutions = new ArrayList<Long>();
 
         // get the jobexecution ids associated with this job name
-        final Set<Long> executionIds = PERSISTENCE_SERVICE.jobOperatorGetRunningExecutions(jobName);
+        final Set<Long> executionIds = persistenceManagerService.jobOperatorGetRunningExecutions(jobName);
 
         if (executionIds.size() <= 0) {
             throw new NoSuchJobException("Job Name " + jobName + " not found");
@@ -296,9 +305,9 @@ public class JobOperatorImpl implements JobOperator {
         // for every job instance id
         for (final long id : executionIds) {
             try {
-                if (SECURITY_SERVICE.isAuthorized(PERSISTENCE_SERVICE.getJobInstanceIdByExecutionId(id))) {
-                    if (KERNEL_SERVICE.isExecutionRunning(id)) {
-                        final InternalJobExecution jobEx = KERNEL_SERVICE.getJobExecution(id);
+                if (securityService.isAuthorized(persistenceManagerService.getJobInstanceIdByExecutionId(id))) {
+                    if (kernelService.isExecutionRunning(id)) {
+                        final InternalJobExecution jobEx = kernelService.getJobExecution(id);
                         jobExecutions.add(jobEx.getExecutionId());
                     }
                 }
@@ -313,12 +322,12 @@ public class JobOperatorImpl implements JobOperator {
     public List<StepExecution> getStepExecutions(long executionId)
         throws NoSuchJobExecutionException, JobSecurityException {
 
-        final InternalJobExecution jobEx = KERNEL_SERVICE.getJobExecution(executionId);
+        final InternalJobExecution jobEx = kernelService.getJobExecution(executionId);
         if (jobEx == null) {
             throw new NoSuchJobExecutionException("Job Execution: " + executionId + " not found");
         }
-        if (SECURITY_SERVICE.isAuthorized(PERSISTENCE_SERVICE.getJobInstanceIdByExecutionId(executionId))) {
-            return PERSISTENCE_SERVICE.getStepExecutionsForJobExecution(executionId);
+        if (securityService.isAuthorized(persistenceManagerService.getJobInstanceIdByExecutionId(executionId))) {
+            return persistenceManagerService.getStepExecutionsForJobExecution(executionId);
         }
         throw new JobSecurityException("The current user is not authorized to perform this operation");
     }
@@ -345,7 +354,7 @@ public class JobOperatorImpl implements JobOperator {
     }
 
     private long restartInternal(final long oldExecutionId, final Properties restartParameters) throws JobExecutionAlreadyCompleteException, NoSuchJobExecutionException, JobExecutionNotMostRecentException, JobRestartException, JobSecurityException {
-        if (!SECURITY_SERVICE.isAuthorized(PERSISTENCE_SERVICE.getJobInstanceIdByExecutionId(oldExecutionId))) {
+        if (!securityService.isAuthorized(persistenceManagerService.getJobInstanceIdByExecutionId(oldExecutionId))) {
             throw new JobSecurityException("The current user is not authorized to perform this operation");
         }
 
@@ -360,15 +369,15 @@ public class JobOperatorImpl implements JobOperator {
             jobParameterWriter.write("Job parameters on restart = null");
         }
 
-        return KERNEL_SERVICE.restartJob(oldExecutionId, restartParameters).getExecutionId();
+        return kernelService.restartJob(oldExecutionId, restartParameters).getExecutionId();
     }
 
     @Override
     public void stop(final long executionId) throws NoSuchJobExecutionException, JobExecutionNotRunningException, JobSecurityException {
-        if (!SECURITY_SERVICE.isAuthorized(PERSISTENCE_SERVICE.getJobInstanceIdByExecutionId(executionId))) {
+        if (!securityService.isAuthorized(persistenceManagerService.getJobInstanceIdByExecutionId(executionId))) {
             throw new JobSecurityException("The current user is not authorized to perform this operation");
         }
 
-        KERNEL_SERVICE.stopJob(executionId);
+        kernelService.stopJob(executionId);
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/8ddf9f75/jbatch/src/main/java/org/apache/batchee/container/impl/controller/BaseStepController.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/BaseStepController.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/BaseStepController.java
index 044df60..63636eb 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/BaseStepController.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/BaseStepController.java
@@ -73,13 +73,15 @@ public abstract class BaseStepController implements ExecutionElementController {
 
     protected long rootJobExecutionId;
 
-    protected static final BatchKernelService BATCH_KERNEL = ServicesManager.service(BatchKernelService.class);
-    private static final PersistenceManagerService PERSISTENCE_MANAGER_SERVICE = ServicesManager.service(PersistenceManagerService.class);
-    private static final JobStatusManagerService JOB_STATUS_MANAGER_SERVICE = ServicesManager.service(JobStatusManagerService.class);
+    protected final BatchKernelService kernelService;
+    private final PersistenceManagerService persistenceManagerService;
+    private final JobStatusManagerService statusManagerService;
 
     protected TransactionManagerAdapter transactionManager = null;
+    private TransactionManagementService txService;
 
-    protected BaseStepController(final RuntimeJobExecution jobExecution, final Step step, final StepContextImpl stepContext, final long rootJobExecutionId) {
+    protected BaseStepController(final RuntimeJobExecution jobExecution, final Step step, final StepContextImpl stepContext, final long rootJobExecutionId,
+                                 final ServicesManager servicesManager) {
         this.jobExecutionImpl = jobExecution;
         this.jobInstance = jobExecution.getJobInstance();
         this.stepContext = stepContext;
@@ -88,13 +90,19 @@ public abstract class BaseStepController implements ExecutionElementController {
             throw new IllegalArgumentException("Step parameter to ctor cannot be null.");
         }
         this.step = step;
+
+        this.txService = servicesManager.service(TransactionManagementService.class);
+        this.kernelService = servicesManager.service(BatchKernelService.class);
+        this.persistenceManagerService = servicesManager.service(PersistenceManagerService.class);
+        this.statusManagerService = servicesManager.service(JobStatusManagerService.class);
     }
 
     protected BaseStepController(final RuntimeJobExecution jobExecution,
                                  final Step step, final StepContextImpl stepContext,
                                  final long rootJobExecutionId,
-                                 final BlockingQueue<PartitionDataWrapper> analyzerStatusQueue) {
-        this(jobExecution, step, stepContext, rootJobExecutionId);
+                                 final BlockingQueue<PartitionDataWrapper> analyzerStatusQueue,
+                                 final ServicesManager servicesManager) {
+        this(jobExecution, step, stepContext, rootJobExecutionId, servicesManager);
         this.analyzerStatusQueue = analyzerStatusQueue;
     }
 
@@ -239,7 +247,7 @@ public abstract class BaseStepController implements ExecutionElementController {
         Timestamp startTS = new Timestamp(time);
         stepContext.setStartTime(startTS);
 
-        PERSISTENCE_MANAGER_SERVICE.updateStepExecution(rootJobExecutionId, stepContext);
+        persistenceManagerService.updateStepExecution(rootJobExecutionId, stepContext);
     }
 
 
@@ -262,17 +270,17 @@ public abstract class BaseStepController implements ExecutionElementController {
 
     protected void updateBatchStatus(final BatchStatus updatedBatchStatus) {
         stepStatus.setBatchStatus(updatedBatchStatus);
-        JOB_STATUS_MANAGER_SERVICE.updateStepStatus(stepStatus.getStepExecutionId(), stepStatus);
+        statusManagerService.updateStepStatus(stepStatus.getStepExecutionId(), stepStatus);
         stepContext.setBatchStatus(updatedBatchStatus);
     }
 
     protected boolean shouldStepBeExecuted() {
-        this.stepStatus = JOB_STATUS_MANAGER_SERVICE.getStepStatus(jobInstance.getInstanceId(), step.getId());
+        this.stepStatus = statusManagerService.getStepStatus(jobInstance.getInstanceId(), step.getId());
         if (stepStatus == null) {
             // create new step execution
             final StepExecutionImpl stepExecution = getNewStepExecution(rootJobExecutionId, stepContext);
             // create new step status for this run
-            stepStatus = JOB_STATUS_MANAGER_SERVICE.createStepStatus(stepExecution.getStepExecutionId());
+            stepStatus = statusManagerService.createStepStatus(stepExecution.getStepExecutionId());
             stepContext.setStepExecutionId(stepExecution.getStepExecutionId());
             return true;
         } else {
@@ -332,8 +340,8 @@ public abstract class BaseStepController implements ExecutionElementController {
 
     protected void statusStarting() {
         stepStatus.setBatchStatus(BatchStatus.STARTING);
-        JOB_STATUS_MANAGER_SERVICE.updateJobCurrentStep(jobInstance.getInstanceId(), step.getId());
-        JOB_STATUS_MANAGER_SERVICE.updateStepStatus(stepStatus.getStepExecutionId(), stepStatus);
+        statusManagerService.updateJobCurrentStep(jobInstance.getInstanceId(), step.getId());
+        statusManagerService.updateStepStatus(stepStatus.getStepExecutionId(), stepStatus);
         stepContext.setBatchStatus(BatchStatus.STARTING);
     }
 
@@ -350,23 +358,23 @@ public abstract class BaseStepController implements ExecutionElementController {
         }
 
         stepStatus.setPersistentUserData(new PersistentDataWrapper(persistentBAOS.toByteArray()));
-        JOB_STATUS_MANAGER_SERVICE.updateStepStatus(stepStatus.getStepExecutionId(), stepStatus);
+        statusManagerService.updateStepStatus(stepStatus.getStepExecutionId(), stepStatus);
     }
 
     protected void persistExitStatusAndEndTimestamp() {
         stepStatus.setExitStatus(stepContext.getExitStatus());
-        JOB_STATUS_MANAGER_SERVICE.updateStepStatus(stepStatus.getStepExecutionId(), stepStatus);
+        statusManagerService.updateStepStatus(stepStatus.getStepExecutionId(), stepStatus);
 
         // set the end time metric before flushing
         long time = System.currentTimeMillis();
         Timestamp endTS = new Timestamp(time);
         stepContext.setEndTime(endTS);
 
-        PERSISTENCE_MANAGER_SERVICE.updateStepExecution(rootJobExecutionId, stepContext);
+        persistenceManagerService.updateStepExecution(rootJobExecutionId, stepContext);
     }
 
     private StepExecutionImpl getNewStepExecution(long rootJobExecutionId, StepContextImpl stepContext) {
-        return PERSISTENCE_MANAGER_SERVICE.createStepExecution(rootJobExecutionId, stepContext);
+        return persistenceManagerService.createStepExecution(rootJobExecutionId, stepContext);
     }
 
     private void setContextProperties() {
@@ -389,7 +397,7 @@ public abstract class BaseStepController implements ExecutionElementController {
         stepContext.addMetric(MetricImpl.MetricType.COMMIT_COUNT, 0);
         stepContext.addMetric(MetricImpl.MetricType.ROLLBACK_COUNT, 0);
 
-        transactionManager = ServicesManager.service(TransactionManagementService.class).getTransactionManager(stepContext);
+        transactionManager = txService.getTransactionManager(stepContext);
     }
 
     public void setStepContext(final StepContextImpl stepContext) {

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/8ddf9f75/jbatch/src/main/java/org/apache/batchee/container/impl/controller/DecisionController.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/DecisionController.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/DecisionController.java
index 498b596..bbd6c93 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/DecisionController.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/DecisionController.java
@@ -23,12 +23,13 @@ import org.apache.batchee.container.jsl.ExecutionElement;
 import org.apache.batchee.container.proxy.DeciderProxy;
 import org.apache.batchee.container.proxy.InjectionReferences;
 import org.apache.batchee.container.proxy.ProxyFactory;
-import org.apache.batchee.spi.PersistenceManagerService;
 import org.apache.batchee.container.services.ServicesManager;
 import org.apache.batchee.container.status.ExecutionStatus;
 import org.apache.batchee.container.status.ExtendedBatchStatus;
 import org.apache.batchee.jaxb.Decision;
 import org.apache.batchee.jaxb.Property;
+import org.apache.batchee.spi.BatchArtifactFactory;
+import org.apache.batchee.spi.PersistenceManagerService;
 
 import javax.batch.runtime.StepExecution;
 import java.util.List;
@@ -41,11 +42,13 @@ public class DecisionController implements ExecutionElementController {
     private StepExecution[] previousStepExecutions = null;
 
     private final PersistenceManagerService persistenceService;
+    private final BatchArtifactFactory factory;
 
-    public DecisionController(final RuntimeJobExecution jobExecution, final Decision decision) {
+    public DecisionController(final RuntimeJobExecution jobExecution, final Decision decision, final ServicesManager manager) {
         this.jobExecution = jobExecution;
         this.decision = decision;
-        this.persistenceService = ServicesManager.service(PersistenceManagerService.class);
+        this.persistenceService = manager.service(PersistenceManagerService.class);
+        this.factory = manager.service(BatchArtifactFactory.class);
     }
 
     @Override
@@ -61,7 +64,7 @@ public class DecisionController implements ExecutionElementController {
         //so two of these contexts will always be null
         final InjectionReferences injectionRef = new InjectionReferences(jobExecution.getJobContext(), null, propList);
 
-        final DeciderProxy deciderProxy = ProxyFactory.createDeciderProxy(deciderId, injectionRef, jobExecution);
+        final DeciderProxy deciderProxy = ProxyFactory.createDeciderProxy(factory, deciderId, injectionRef, jobExecution);
 
         final String exitStatus = deciderProxy.decide(this.previousStepExecutions);
 

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/8ddf9f75/jbatch/src/main/java/org/apache/batchee/container/impl/controller/ExecutionElementControllerFactory.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/ExecutionElementControllerFactory.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/ExecutionElementControllerFactory.java
index 631ea3a..1492875 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/ExecutionElementControllerFactory.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/ExecutionElementControllerFactory.java
@@ -20,6 +20,8 @@ import org.apache.batchee.container.impl.StepContextImpl;
 import org.apache.batchee.container.impl.controller.batchlet.BatchletStepController;
 import org.apache.batchee.container.impl.controller.chunk.ChunkStepController;
 import org.apache.batchee.container.impl.jobinstance.RuntimeJobExecution;
+import org.apache.batchee.container.services.BatchKernelService;
+import org.apache.batchee.container.services.ServicesManager;
 import org.apache.batchee.container.util.PartitionDataWrapper;
 import org.apache.batchee.jaxb.Batchlet;
 import org.apache.batchee.jaxb.Chunk;
@@ -34,17 +36,18 @@ import java.util.concurrent.BlockingQueue;
 public class ExecutionElementControllerFactory {
     public static BaseStepController getStepController(final RuntimeJobExecution jobExecutionImpl, final Step step,
                                                            final StepContextImpl stepContext, final long rootJobExecutionId,
-                                                           final BlockingQueue<PartitionDataWrapper> analyzerQueue) {
+                                                           final BlockingQueue<PartitionDataWrapper> analyzerQueue,
+                                                           final ServicesManager servicesManager) {
         final Partition partition = step.getPartition();
         if (partition != null) {
 
             if (partition.getMapper() != null) {
-                return new PartitionedStepController(jobExecutionImpl, step, stepContext, rootJobExecutionId);
+                return new PartitionedStepController(jobExecutionImpl, step, stepContext, rootJobExecutionId, servicesManager);
             }
 
             if (partition.getPlan() != null) {
                 if (partition.getPlan().getPartitions() != null) {
-                    return new PartitionedStepController(jobExecutionImpl, step, stepContext, rootJobExecutionId);
+                    return new PartitionedStepController(jobExecutionImpl, step, stepContext, rootJobExecutionId, servicesManager);
                 }
             }
         }
@@ -54,26 +57,26 @@ public class ExecutionElementControllerFactory {
             if (step.getChunk() != null) {
                 throw new IllegalArgumentException("Step contains both a batchlet and a chunk.  Aborting.");
             }
-            return new BatchletStepController(jobExecutionImpl, step, stepContext, rootJobExecutionId, analyzerQueue);
+            return new BatchletStepController(jobExecutionImpl, step, stepContext, rootJobExecutionId, analyzerQueue, servicesManager);
         } else {
             final Chunk chunk = step.getChunk();
             if (chunk == null) {
                 throw new IllegalArgumentException("Step does not contain either a batchlet or a chunk.  Aborting.");
             }
-            return new ChunkStepController(jobExecutionImpl, step, stepContext, rootJobExecutionId, analyzerQueue);
+            return new ChunkStepController(jobExecutionImpl, step, stepContext, rootJobExecutionId, analyzerQueue, servicesManager);
         }
     }
 
-    public static DecisionController getDecisionController(RuntimeJobExecution jobExecutionImpl, Decision decision) {
-        return new DecisionController(jobExecutionImpl, decision);
+    public static DecisionController getDecisionController(final ServicesManager servicesManager, final RuntimeJobExecution jobExecutionImpl, final Decision decision) {
+        return new DecisionController(jobExecutionImpl, decision, servicesManager);
     }
 
-    public static FlowController getFlowController(RuntimeJobExecution jobExecutionImpl, Flow flow, long rootJobExecutionId) {
-        return new FlowController(jobExecutionImpl, flow, rootJobExecutionId);
+    public static FlowController getFlowController(final ServicesManager servicesManager, final RuntimeJobExecution jobExecutionImpl, final Flow flow, final long rootJobExecutionId) {
+        return new FlowController(jobExecutionImpl, flow, rootJobExecutionId, servicesManager);
     }
 
-    public static SplitController getSplitController(RuntimeJobExecution jobExecutionImpl, Split split, long rootJobExecutionId) {
-        return new SplitController(jobExecutionImpl, split, rootJobExecutionId);
+    public static SplitController getSplitController(final BatchKernelService kernel, final RuntimeJobExecution jobExecutionImpl, final Split split, final long rootJobExecutionId) {
+        return new SplitController(jobExecutionImpl, split, rootJobExecutionId, kernel);
     }
 
     private ExecutionElementControllerFactory() {

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/8ddf9f75/jbatch/src/main/java/org/apache/batchee/container/impl/controller/ExecutionTransitioner.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/ExecutionTransitioner.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/ExecutionTransitioner.java
index 15651c5..faf7669 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/ExecutionTransitioner.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/ExecutionTransitioner.java
@@ -27,6 +27,8 @@ import org.apache.batchee.container.jsl.IllegalTransitionException;
 import org.apache.batchee.container.jsl.Transition;
 import org.apache.batchee.container.jsl.TransitionElement;
 import org.apache.batchee.container.navigator.ModelNavigator;
+import org.apache.batchee.container.services.BatchKernelService;
+import org.apache.batchee.container.services.ServicesManager;
 import org.apache.batchee.container.status.ExecutionStatus;
 import org.apache.batchee.container.status.ExtendedBatchStatus;
 import org.apache.batchee.container.util.PartitionDataWrapper;
@@ -44,6 +46,7 @@ import java.util.List;
 import java.util.concurrent.BlockingQueue;
 
 public class ExecutionTransitioner {
+    private final ServicesManager manager;
     private RuntimeJobExecution jobExecution;
     private long rootJobExecutionId;
     private ModelNavigator<?> modelNavigator;
@@ -60,19 +63,24 @@ public class ExecutionTransitioner {
 
     private List<Long> stepExecIds;
 
-    public ExecutionTransitioner(RuntimeJobExecution jobExecution, long rootJobExecutionId, ModelNavigator<?> modelNavigator) {
+    public ExecutionTransitioner(final RuntimeJobExecution jobExecution, final long rootJobExecutionId, final ModelNavigator<?> modelNavigator,
+                                 final ServicesManager servicesManager) {
         this.jobExecution = jobExecution;
         this.rootJobExecutionId = rootJobExecutionId;
         this.modelNavigator = modelNavigator;
         this.jobContext = jobExecution.getJobContext();
+        this.manager = servicesManager;
     }
 
-    public ExecutionTransitioner(RuntimeJobExecution jobExecution, long rootJobExecutionId, ModelNavigator<JSLJob> jobNavigator, BlockingQueue<PartitionDataWrapper> analyzerQueue) {
+    public ExecutionTransitioner(final RuntimeJobExecution jobExecution, final long rootJobExecutionId,
+                                 final ModelNavigator<JSLJob> jobNavigator, final BlockingQueue<PartitionDataWrapper> analyzerQueue,
+                                 final ServicesManager manager) {
         this.jobExecution = jobExecution;
         this.rootJobExecutionId = rootJobExecutionId;
         this.modelNavigator = jobNavigator;
         this.jobContext = jobExecution.getJobContext();
         this.analyzerQueue = analyzerQueue;
+        this.manager = manager;
     }
 
     public ExecutionStatus doExecutionLoop() {
@@ -163,19 +171,19 @@ public class ExecutionTransitioner {
 
         if (currentExecutionElement instanceof Decision) {
             final Decision decision = (Decision) currentExecutionElement;
-            elementController = ExecutionElementControllerFactory.getDecisionController(jobExecution, decision);
+            elementController = ExecutionElementControllerFactory.getDecisionController(manager, jobExecution, decision);
             final DecisionController decisionController = (DecisionController) elementController;
             decisionController.setPreviousStepExecutions(previousExecutionElement, previousElementController);
         } else if (currentExecutionElement instanceof Flow) {
             final Flow flow = (Flow) currentExecutionElement;
-            elementController = ExecutionElementControllerFactory.getFlowController(jobExecution, flow, rootJobExecutionId);
+            elementController = ExecutionElementControllerFactory.getFlowController(manager, jobExecution, flow, rootJobExecutionId);
         } else if (currentExecutionElement instanceof Split) {
             final Split split = (Split) currentExecutionElement;
-            elementController = ExecutionElementControllerFactory.getSplitController(jobExecution, split, rootJobExecutionId);
+            elementController = ExecutionElementControllerFactory.getSplitController(manager.service(BatchKernelService.class), jobExecution, split, rootJobExecutionId);
         } else if (currentExecutionElement instanceof Step) {
             final Step step = (Step) currentExecutionElement;
             final StepContextImpl stepContext = new StepContextImpl(step.getId());
-            elementController = ExecutionElementControllerFactory.getStepController(jobExecution, step, stepContext, rootJobExecutionId, analyzerQueue);
+            elementController = ExecutionElementControllerFactory.getStepController(jobExecution, step, stepContext, rootJobExecutionId, analyzerQueue, manager);
         } else {
             elementController = null;
         }

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/8ddf9f75/jbatch/src/main/java/org/apache/batchee/container/impl/controller/FlowController.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/FlowController.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/FlowController.java
index db0d4c6..a4b6fda 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/FlowController.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/FlowController.java
@@ -22,6 +22,7 @@ import org.apache.batchee.container.impl.JobContextImpl;
 import org.apache.batchee.container.impl.jobinstance.RuntimeJobExecution;
 import org.apache.batchee.container.navigator.ModelNavigator;
 import org.apache.batchee.container.navigator.NavigatorFactory;
+import org.apache.batchee.container.services.ServicesManager;
 import org.apache.batchee.container.status.ExecutionStatus;
 import org.apache.batchee.container.status.ExtendedBatchStatus;
 import org.apache.batchee.jaxb.Flow;
@@ -32,6 +33,7 @@ import java.util.List;
 public class FlowController implements ExecutionElementController {
     private final RuntimeJobExecution jobExecution;
     private final JobContextImpl jobContext;
+    private final ServicesManager manager;
 
     protected ModelNavigator<Flow> flowNavigator;
 
@@ -40,18 +42,19 @@ public class FlowController implements ExecutionElementController {
 
     private ExecutionTransitioner transitioner;
 
-    public FlowController(final RuntimeJobExecution jobExecution, final Flow flow, final long rootJobExecutionId) {
+    public FlowController(final RuntimeJobExecution jobExecution, final Flow flow, final long rootJobExecutionId, final ServicesManager manager) {
         this.jobExecution = jobExecution;
         this.jobContext = jobExecution.getJobContext();
         this.flowNavigator = NavigatorFactory.createFlowNavigator(flow);
         this.flow = flow;
         this.rootJobExecutionId = rootJobExecutionId;
+        this.manager = manager;
     }
 
     @Override
     public ExecutionStatus execute() {
         if (!jobContext.getBatchStatus().equals(BatchStatus.STOPPING)) {
-            transitioner = new ExecutionTransitioner(jobExecution, rootJobExecutionId, flowNavigator);
+            transitioner = new ExecutionTransitioner(jobExecution, rootJobExecutionId, flowNavigator, manager);
             return transitioner.doExecutionLoop();
         }
         return new ExecutionStatus(ExtendedBatchStatus.JOB_OPERATOR_STOPPING);

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/8ddf9f75/jbatch/src/main/java/org/apache/batchee/container/impl/controller/FlowInSplitThreadRootController.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/FlowInSplitThreadRootController.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/FlowInSplitThreadRootController.java
index 93f6160..ea6f249 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/FlowInSplitThreadRootController.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/FlowInSplitThreadRootController.java
@@ -17,6 +17,7 @@
 package org.apache.batchee.container.impl.controller;
 
 import org.apache.batchee.container.impl.jobinstance.RuntimeFlowInSplitExecution;
+import org.apache.batchee.container.services.ServicesManager;
 import org.apache.batchee.container.status.ExecutionStatus;
 import org.apache.batchee.container.status.ExtendedBatchStatus;
 import org.apache.batchee.container.util.FlowInSplitBuilderConfig;
@@ -25,8 +26,8 @@ public class FlowInSplitThreadRootController extends JobThreadRootController {
     // Careful, we have a separately named reference to the same object in the parent class
     private RuntimeFlowInSplitExecution flowInSplitExecution;
 
-    public FlowInSplitThreadRootController(final RuntimeFlowInSplitExecution flowInSplitExecution, final FlowInSplitBuilderConfig config) {
-        super(flowInSplitExecution, config.getRootJobExecutionId());
+    public FlowInSplitThreadRootController(final RuntimeFlowInSplitExecution flowInSplitExecution, final FlowInSplitBuilderConfig config, final ServicesManager manager) {
+        super(flowInSplitExecution, config.getRootJobExecutionId(), manager);
         this.flowInSplitExecution = flowInSplitExecution;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/8ddf9f75/jbatch/src/main/java/org/apache/batchee/container/impl/controller/JobController.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/JobController.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/JobController.java
index 3446ba1..d671eb5 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/JobController.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/JobController.java
@@ -17,13 +17,14 @@
 package org.apache.batchee.container.impl.controller;
 
 import org.apache.batchee.container.impl.jobinstance.RuntimeJobExecution;
+import org.apache.batchee.container.services.ServicesManager;
 
 public class JobController extends JobThreadRootController {
-    private JobController(final RuntimeJobExecution jobExecution, final long rootJobExecutionId) {
-        super(jobExecution, rootJobExecutionId);
+    private JobController(final RuntimeJobExecution jobExecution, final long rootJobExecutionId, final ServicesManager manager) {
+        super(jobExecution, rootJobExecutionId, manager);
     }
 
-    public JobController(final RuntimeJobExecution jobExecution) {
-        this(jobExecution, jobExecution.getExecutionId());
+    public JobController(final RuntimeJobExecution jobExecution, final ServicesManager manager) {
+        this(jobExecution, jobExecution.getExecutionId(), manager);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/8ddf9f75/jbatch/src/main/java/org/apache/batchee/container/impl/controller/JobThreadRootController.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/JobThreadRootController.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/JobThreadRootController.java
index 1dd8c3b..feb3f1c 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/JobThreadRootController.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/JobThreadRootController.java
@@ -25,12 +25,13 @@ import org.apache.batchee.container.proxy.InjectionReferences;
 import org.apache.batchee.container.proxy.JobListenerProxy;
 import org.apache.batchee.container.proxy.ListenerFactory;
 import org.apache.batchee.container.services.JobStatusManagerService;
-import org.apache.batchee.spi.PersistenceManagerService;
 import org.apache.batchee.container.services.ServicesManager;
 import org.apache.batchee.container.status.ExecutionStatus;
 import org.apache.batchee.container.status.ExtendedBatchStatus;
 import org.apache.batchee.container.util.PartitionDataWrapper;
 import org.apache.batchee.jaxb.JSLJob;
+import org.apache.batchee.spi.BatchArtifactFactory;
+import org.apache.batchee.spi.PersistenceManagerService;
 
 import javax.batch.runtime.BatchStatus;
 import java.io.PrintWriter;
@@ -52,22 +53,25 @@ public abstract class JobThreadRootController implements ThreadRootController {
     protected final ModelNavigator<JSLJob> jobNavigator;
     protected final JobStatusManagerService jobStatusService;
     protected final PersistenceManagerService persistenceService;
+    protected final ServicesManager manager;
 
     private ExecutionTransitioner transitioner;
     private BlockingQueue<PartitionDataWrapper> analyzerQueue;
 
-    public JobThreadRootController(final RuntimeJobExecution jobExecution, final long rootJobExecutionId) {
+    public JobThreadRootController(final RuntimeJobExecution jobExecution, final long rootJobExecutionId,
+                                   final ServicesManager servicesManager) {
         this.jobExecution = jobExecution;
         this.jobContext = jobExecution.getJobContext();
         this.rootJobExecutionId = rootJobExecutionId;
         this.jobInstanceId = jobExecution.getInstanceId();
-        this.jobStatusService = ServicesManager.service(JobStatusManagerService.class);
-        this.persistenceService = ServicesManager.service(PersistenceManagerService.class);
+        this.jobStatusService = servicesManager.service(JobStatusManagerService.class);
+        this.persistenceService = servicesManager.service(PersistenceManagerService.class);
         this.jobNavigator = jobExecution.getJobNavigator();
+        this.manager = servicesManager;
 
         final JSLJob jobModel = jobExecution.getJobNavigator().getRootModelElement();
         final InjectionReferences injectionRef = new InjectionReferences(jobContext, null, null);
-        listenerFactory = new ListenerFactory(jobModel, injectionRef, jobExecution);
+        listenerFactory = new ListenerFactory(servicesManager.service(BatchArtifactFactory.class), jobModel, injectionRef, jobExecution);
         jobExecution.setListenerFactory(listenerFactory);
     }
 
@@ -75,8 +79,9 @@ public abstract class JobThreadRootController implements ThreadRootController {
      * By not passing the rootJobExecutionId, we are "orphaning" the subjob execution and making it not findable from the parent.
      * This is exactly what we want for getStepExecutions()... we don't want it to get extraneous entries for the partitions.
      */
-    public JobThreadRootController(final RuntimeJobExecution jobExecution, final BlockingQueue<PartitionDataWrapper> analyzerQueue) {
-        this(jobExecution, jobExecution.getExecutionId());
+    public JobThreadRootController(final RuntimeJobExecution jobExecution, final BlockingQueue<PartitionDataWrapper> analyzerQueue,
+                                   final ServicesManager servicesManager) {
+        this(jobExecution, jobExecution.getExecutionId(), servicesManager);
         this.analyzerQueue = analyzerQueue;
     }
 
@@ -96,7 +101,7 @@ public abstract class JobThreadRootController implements ThreadRootController {
                 // The BIG loop transitioning
                 // within the job !!!
                 // --------------------
-                transitioner = new ExecutionTransitioner(jobExecution, rootJobExecutionId, jobNavigator, analyzerQueue);
+                transitioner = new ExecutionTransitioner(jobExecution, rootJobExecutionId, jobNavigator, analyzerQueue, manager);
                 retVal = transitioner.doExecutionLoop();
                 ExtendedBatchStatus extBatchStatus = retVal.getExtendedBatchStatus();
                 switch (extBatchStatus) {

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/8ddf9f75/jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionThreadRootController.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionThreadRootController.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionThreadRootController.java
index 920ca99..8cd3593 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionThreadRootController.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionThreadRootController.java
@@ -17,14 +17,15 @@
 package org.apache.batchee.container.impl.controller;
 
 import org.apache.batchee.container.impl.jobinstance.RuntimeJobExecution;
+import org.apache.batchee.container.services.ServicesManager;
 import org.apache.batchee.container.util.PartitionsBuilderConfig;
 
 /**
  * Currently there's no special function on top of the subjob required of the partition.
  */
 public class PartitionThreadRootController extends JobThreadRootController {
-    public PartitionThreadRootController(RuntimeJobExecution jobExecution, PartitionsBuilderConfig config) {
-        super(jobExecution, config.getAnalyzerQueue());
+    public PartitionThreadRootController(final RuntimeJobExecution jobExecution, final PartitionsBuilderConfig config, final ServicesManager servicesManager) {
+        super(jobExecution, config.getAnalyzerQueue(), servicesManager);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/8ddf9f75/jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionedStepController.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionedStepController.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionedStepController.java
index 9797233..01444ef 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionedStepController.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionedStepController.java
@@ -26,6 +26,7 @@ import org.apache.batchee.container.proxy.PartitionMapperProxy;
 import org.apache.batchee.container.proxy.PartitionReducerProxy;
 import org.apache.batchee.container.proxy.ProxyFactory;
 import org.apache.batchee.container.proxy.StepListenerProxy;
+import org.apache.batchee.container.services.ServicesManager;
 import org.apache.batchee.container.util.BatchPartitionPlan;
 import org.apache.batchee.container.util.BatchPartitionWorkUnit;
 import org.apache.batchee.container.util.BatchWorkUnit;
@@ -39,6 +40,7 @@ import org.apache.batchee.jaxb.PartitionMapper;
 import org.apache.batchee.jaxb.PartitionReducer;
 import org.apache.batchee.jaxb.Property;
 import org.apache.batchee.jaxb.Step;
+import org.apache.batchee.spi.BatchArtifactFactory;
 
 import javax.batch.api.partition.PartitionPlan;
 import javax.batch.api.partition.PartitionReducer.PartitionStatus;
@@ -80,8 +82,12 @@ public class PartitionedStepController extends BaseStepController {
 
     BlockingQueue<BatchPartitionWorkUnit> completedWorkQueue = null;
 
-    protected PartitionedStepController(final RuntimeJobExecution jobExecutionImpl, final Step step, StepContextImpl stepContext, long rootJobExecutionId) {
-        super(jobExecutionImpl, step, stepContext, rootJobExecutionId);
+    private final BatchArtifactFactory factory;
+
+    protected PartitionedStepController(final RuntimeJobExecution jobExecutionImpl, final Step step, final StepContextImpl stepContext,
+                                        final long rootJobExecutionId, final ServicesManager servicesManager) {
+        super(jobExecutionImpl, step, stepContext, rootJobExecutionId, servicesManager);
+        factory = servicesManager.service(BatchArtifactFactory.class);
     }
 
     @Override
@@ -96,7 +102,7 @@ public class PartitionedStepController extends BaseStepController {
             if (parallelBatchWorkUnits != null) {
                 for (BatchWorkUnit subJob : parallelBatchWorkUnits) {
                     try {
-                        BATCH_KERNEL.stopJob(subJob.getJobExecutionImpl().getExecutionId());
+                        kernelService.stopJob(subJob.getJobExecutionImpl().getExecutionId());
                     } catch (Exception e) {
                         // TODO - Is this what we want to know.
                         // Blow up if it happens to force the issue.
@@ -128,7 +134,7 @@ public class PartitionedStepController extends BaseStepController {
             // Set all the contexts associated with this controller.
             // Some of them may be null
             final InjectionReferences injectionRef = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext, propertyList);
-            final PartitionMapperProxy partitionMapperProxy = ProxyFactory.createPartitionMapperProxy(partitionMapper.getRef(), injectionRef, stepContext, jobExecutionImpl);
+            final PartitionMapperProxy partitionMapperProxy = ProxyFactory.createPartitionMapperProxy(factory, partitionMapper.getRef(), injectionRef, stepContext, jobExecutionImpl);
 
 
             final PartitionPlan mapperPlan = partitionMapperProxy.mapPartitions();
@@ -280,9 +286,9 @@ public class PartitionedStepController extends BaseStepController {
             PartitionsBuilderConfig config = new PartitionsBuilderConfig(subJobs, partitionProperties, analyzerStatusQueue, completedWorkQueue, jobExecutionImpl.getExecutionId());
             // Then build all the subjobs but do not start them yet
             if (stepStatus.getStartCount() > 1 && !plan.getPartitionsOverride()) {
-                parallelBatchWorkUnits = BATCH_KERNEL.buildOnRestartParallelPartitions(config);
+                parallelBatchWorkUnits = kernelService.buildOnRestartParallelPartitions(config);
             } else {
-                parallelBatchWorkUnits = BATCH_KERNEL.buildNewParallelPartitions(config);
+                parallelBatchWorkUnits = kernelService.buildNewParallelPartitions(config);
             }
 
             // NOTE:  At this point I might not have as many work units as I had partitions, since some may have already completed.
@@ -304,9 +310,9 @@ public class PartitionedStepController extends BaseStepController {
         for (int i = 0; i < this.threads && i < numTotalForThisExecution; i++, numCurrentSubmitted++) {
             final BatchWorkUnit workUnit = parallelBatchWorkUnits.get(i);
             if (stepStatus.getStartCount() > 1 && !plan.getPartitionsOverride()) {
-                BATCH_KERNEL.restartGeneratedJob(workUnit);
+                kernelService.restartGeneratedJob(workUnit);
             } else {
-                BATCH_KERNEL.startGeneratedJob(workUnit);
+                kernelService.startGeneratedJob(workUnit);
             }
         }
 
@@ -336,9 +342,9 @@ public class PartitionedStepController extends BaseStepController {
             if (numCurrentCompleted < numTotalForThisExecution) {
                 if (numCurrentSubmitted < numTotalForThisExecution) {
                     if (stepStatus.getStartCount() > 1) {
-                        BATCH_KERNEL.startGeneratedJob(parallelBatchWorkUnits.get(numCurrentSubmitted++));
+                        kernelService.startGeneratedJob(parallelBatchWorkUnits.get(numCurrentSubmitted++));
                     } else {
-                        BATCH_KERNEL.restartGeneratedJob(parallelBatchWorkUnits.get(numCurrentSubmitted++));
+                        kernelService.restartGeneratedJob(parallelBatchWorkUnits.get(numCurrentSubmitted++));
                     }
                 }
             } else {
@@ -388,14 +394,14 @@ public class PartitionedStepController extends BaseStepController {
         if (analyzer != null) {
             final List<Property> propList = analyzer.getProperties() == null ? null : analyzer.getProperties().getPropertyList();
             injectionRef = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext, propList);
-            analyzerProxy = ProxyFactory.createPartitionAnalyzerProxy(analyzer.getRef(), injectionRef, stepContext, jobExecutionImpl);
+            analyzerProxy = ProxyFactory.createPartitionAnalyzerProxy(factory, analyzer.getRef(), injectionRef, stepContext, jobExecutionImpl);
         }
 
         final PartitionReducer partitionReducer = step.getPartition().getReducer();
         if (partitionReducer != null) {
             final List<Property> propList = partitionReducer.getProperties() == null ? null : partitionReducer.getProperties().getPropertyList();
             injectionRef = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext, propList);
-            partitionReducerProxy = ProxyFactory.createPartitionReducerProxy(partitionReducer.getRef(), injectionRef, stepContext, jobExecutionImpl);
+            partitionReducerProxy = ProxyFactory.createPartitionReducerProxy(factory, partitionReducer.getRef(), injectionRef, stepContext, jobExecutionImpl);
         }
 
     }

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/8ddf9f75/jbatch/src/main/java/org/apache/batchee/container/impl/controller/SingleThreadedStepController.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/SingleThreadedStepController.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/SingleThreadedStepController.java
index d085622..94738c0 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/SingleThreadedStepController.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/SingleThreadedStepController.java
@@ -23,11 +23,13 @@ import org.apache.batchee.container.proxy.InjectionReferences;
 import org.apache.batchee.container.proxy.PartitionCollectorProxy;
 import org.apache.batchee.container.proxy.ProxyFactory;
 import org.apache.batchee.container.proxy.StepListenerProxy;
+import org.apache.batchee.container.services.ServicesManager;
 import org.apache.batchee.container.util.PartitionDataWrapper;
 import org.apache.batchee.container.util.PartitionDataWrapper.PartitionEventType;
 import org.apache.batchee.jaxb.Collector;
 import org.apache.batchee.jaxb.Property;
 import org.apache.batchee.jaxb.Step;
+import org.apache.batchee.spi.BatchArtifactFactory;
 
 import java.io.Serializable;
 import java.util.List;
@@ -41,11 +43,17 @@ import java.util.concurrent.BlockingQueue;
  * separate main thread with controller).
  */
 public abstract class SingleThreadedStepController extends BaseStepController implements Controller {
+    private final BatchArtifactFactory factory;
+
     // Collector only used from partition threads, not main thread
     protected PartitionCollectorProxy collectorProxy = null;
 
-    protected SingleThreadedStepController(RuntimeJobExecution jobExecutionImpl, Step step, StepContextImpl stepContext, long rootJobExecutionId, BlockingQueue<PartitionDataWrapper> analyzerStatusQueue) {
-        super(jobExecutionImpl, step, stepContext, rootJobExecutionId, analyzerStatusQueue);
+    protected SingleThreadedStepController(final RuntimeJobExecution jobExecutionImpl, final Step step,
+                                           final StepContextImpl stepContext, final long rootJobExecutionId,
+                                           final BlockingQueue<PartitionDataWrapper> analyzerStatusQueue,
+                                           final ServicesManager servicesManager) {
+        super(jobExecutionImpl, step, stepContext, rootJobExecutionId, analyzerStatusQueue, servicesManager);
+        factory = servicesManager.service(BatchArtifactFactory.class);
     }
 
     List<StepListenerProxy> stepListeners = null;
@@ -67,7 +75,7 @@ public abstract class SingleThreadedStepController extends BaseStepController im
                  * contexts may be null
                  */
                 injectionRef = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext, propList);
-                this.collectorProxy = ProxyFactory.createPartitionCollectorProxy(collector.getRef(), injectionRef, this.stepContext, jobExecutionImpl);
+                this.collectorProxy = ProxyFactory.createPartitionCollectorProxy(factory, collector.getRef(), injectionRef, this.stepContext, jobExecutionImpl);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/8ddf9f75/jbatch/src/main/java/org/apache/batchee/container/impl/controller/SplitController.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/SplitController.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/SplitController.java
index b882de6..43c26bb 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/SplitController.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/SplitController.java
@@ -60,13 +60,13 @@ public class SplitController implements ExecutionElementController {
 
     protected Split split;
 
-    public SplitController(RuntimeJobExecution jobExecution, Split split, long rootJobExecutionId) {
+    public SplitController(final RuntimeJobExecution jobExecution, final Split split, final long rootJobExecutionId,
+                           final BatchKernelService kernelService) {
         this.jobExecution = jobExecution;
         this.jobContext = jobExecution.getJobContext();
         this.rootJobExecutionId = rootJobExecutionId;
         this.split = split;
-
-        batchKernel = ServicesManager.service(BatchKernelService.class);
+        this.batchKernel = kernelService;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/8ddf9f75/jbatch/src/main/java/org/apache/batchee/container/impl/controller/batchlet/BatchletStepController.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/batchlet/BatchletStepController.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/batchlet/BatchletStepController.java
index a0f0cbb..13b60df 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/batchlet/BatchletStepController.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/batchlet/BatchletStepController.java
@@ -23,29 +23,34 @@ import org.apache.batchee.container.impl.jobinstance.RuntimeJobExecution;
 import org.apache.batchee.container.proxy.BatchletProxy;
 import org.apache.batchee.container.proxy.InjectionReferences;
 import org.apache.batchee.container.proxy.ProxyFactory;
+import org.apache.batchee.container.services.ServicesManager;
 import org.apache.batchee.container.util.PartitionDataWrapper;
 import org.apache.batchee.jaxb.Batchlet;
 import org.apache.batchee.jaxb.Property;
 import org.apache.batchee.jaxb.Step;
+import org.apache.batchee.spi.BatchArtifactFactory;
 
 import javax.batch.runtime.BatchStatus;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
 
 public class BatchletStepController extends SingleThreadedStepController {
+    private final BatchArtifactFactory factory;
     private BatchletProxy batchletProxy;
 
     public BatchletStepController(final RuntimeJobExecution jobExecutionImpl, final Step step,
                                   final StepContextImpl stepContext, final long rootJobExecutionId,
-                                  final BlockingQueue<PartitionDataWrapper> analyzerStatusQueue) {
-        super(jobExecutionImpl, step, stepContext, rootJobExecutionId, analyzerStatusQueue);
+                                  final BlockingQueue<PartitionDataWrapper> analyzerStatusQueue,
+                                  final ServicesManager manager) {
+        super(jobExecutionImpl, step, stepContext, rootJobExecutionId, analyzerStatusQueue, manager);
+        factory = manager.service(BatchArtifactFactory.class);
     }
 
     private void invokeBatchlet(final Batchlet batchlet) throws BatchContainerServiceException {
         final String batchletId = batchlet.getRef();
         final List<Property> propList = (batchlet.getProperties() == null) ? null : batchlet.getProperties().getPropertyList();
         final InjectionReferences injectionRef = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext, propList);
-        batchletProxy = ProxyFactory.createBatchletProxy(batchletId, injectionRef, stepContext, jobExecutionImpl);
+        batchletProxy = ProxyFactory.createBatchletProxy(factory, batchletId, injectionRef, stepContext, jobExecutionImpl);
 
         if (!wasStopIssued()) {
             final String processRetVal = batchletProxy.process();

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/8ddf9f75/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/CheckpointAlgorithmFactory.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/CheckpointAlgorithmFactory.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/CheckpointAlgorithmFactory.java
index 34f2fae..781f41f 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/CheckpointAlgorithmFactory.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/CheckpointAlgorithmFactory.java
@@ -24,14 +24,15 @@ import org.apache.batchee.container.proxy.InjectionReferences;
 import org.apache.batchee.container.proxy.ProxyFactory;
 import org.apache.batchee.jaxb.Chunk;
 import org.apache.batchee.jaxb.Step;
+import org.apache.batchee.spi.BatchArtifactFactory;
 
 public final class CheckpointAlgorithmFactory {
-    public static CheckpointAlgorithmProxy getCheckpointAlgorithmProxy(final Step step, final InjectionReferences injectionReferences, final StepContextImpl stepContext, final RuntimeJobExecution jobExecution) {
+    public static CheckpointAlgorithmProxy getCheckpointAlgorithmProxy(final BatchArtifactFactory factory, final Step step, final InjectionReferences injectionReferences, final StepContextImpl stepContext, final RuntimeJobExecution jobExecution) {
         final Chunk chunk = step.getChunk();
         final String checkpointType = chunk.getCheckpointPolicy();
         final CheckpointAlgorithmProxy proxy;
         if ("custom".equalsIgnoreCase(checkpointType)) {
-            proxy = ProxyFactory.createCheckpointAlgorithmProxy(chunk.getCheckpointAlgorithm().getRef(), injectionReferences, stepContext, jobExecution);
+            proxy = ProxyFactory.createCheckpointAlgorithmProxy(factory, chunk.getCheckpointAlgorithm().getRef(), injectionReferences, stepContext, jobExecution);
         } else /* "item" */ {
             proxy = new CheckpointAlgorithmProxy(new ItemCheckpointAlgorithm());
         }

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/8ddf9f75/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/CheckpointManager.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/CheckpointManager.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/CheckpointManager.java
index 76cae58..f7f01bf 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/CheckpointManager.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/CheckpointManager.java
@@ -38,14 +38,15 @@ public class CheckpointManager {
 
     public CheckpointManager(final ItemReaderProxy reader, final ItemWriterProxy writer,
                              final CheckpointAlgorithm chkptAlg,
-                             final long jobInstanceID, final String stepId) {
+                             final long jobInstanceID, final String stepId,
+                             final PersistenceManagerService persistenceManagerService) {
         this.readerProxy = reader;
         this.writerProxy = writer;
         this.checkpointAlgorithm = chkptAlg;
         this.stepId = stepId;
         this.jobInstanceID = jobInstanceID;
 
-        this.persistenceManagerService = ServicesManager.service(PersistenceManagerService.class);
+        this.persistenceManagerService = persistenceManagerService;
     }
 
     public boolean applyCheckPointPolicy() {

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/8ddf9f75/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/ChunkStepController.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/ChunkStepController.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/ChunkStepController.java
index 7c8eec7..61cadfe 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/ChunkStepController.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/ChunkStepController.java
@@ -47,6 +47,7 @@ import org.apache.batchee.jaxb.ItemReader;
 import org.apache.batchee.jaxb.ItemWriter;
 import org.apache.batchee.jaxb.Property;
 import org.apache.batchee.jaxb.Step;
+import org.apache.batchee.spi.BatchArtifactFactory;
 import org.apache.batchee.spi.PersistenceManagerService;
 
 import javax.batch.api.chunk.CheckpointAlgorithm;
@@ -64,7 +65,8 @@ public class ChunkStepController extends SingleThreadedStepController {
     private final static String sourceClass = ChunkStepController.class.getName();
     private final static Logger logger = Logger.getLogger(sourceClass);
 
-    private final PersistenceManagerService persistenceManagerService = ServicesManager.service(PersistenceManagerService.class);
+    private final PersistenceManagerService persistenceManagerService;
+    private final BatchArtifactFactory artifactFactory;
 
     private Chunk chunk = null;
     private ItemReaderProxy readerProxy = null;
@@ -84,8 +86,11 @@ public class ChunkStepController extends SingleThreadedStepController {
     private boolean rollbackRetry = false;
 
     public ChunkStepController(final RuntimeJobExecution jobExecutionImpl, final Step step, final StepContextImpl stepContext,
-                               final long rootJobExecutionId, final BlockingQueue<PartitionDataWrapper> analyzerStatusQueue) {
-        super(jobExecutionImpl, step, stepContext, rootJobExecutionId, analyzerStatusQueue);
+                               final long rootJobExecutionId, final BlockingQueue<PartitionDataWrapper> analyzerStatusQueue,
+                               final ServicesManager servicesManager) {
+        super(jobExecutionImpl, step, stepContext, rootJobExecutionId, analyzerStatusQueue, servicesManager);
+        this.persistenceManagerService = servicesManager.service(PersistenceManagerService.class);
+        this.artifactFactory = servicesManager.service(BatchArtifactFactory.class);
     }
 
     /**
@@ -472,7 +477,6 @@ public class ChunkStepController extends SingleThreadedStepController {
         int timeInterval = ChunkHelper.getTimeLimit(chunk);
         boolean checkPointed = true;
         boolean rollback = false;
-        Throwable caughtThrowable = null;
 
         // begin new transaction at first iteration or after a checkpoint commit
 
@@ -498,7 +502,7 @@ public class ChunkStepController extends SingleThreadedStepController {
                         positionWriterAtCheckpoint();
                         checkpointManager = new CheckpointManager(readerProxy, writerProxy,
                             getCheckpointAlgorithm(itemCount, timeInterval), jobExecutionImpl
-                            .getJobInstance().getInstanceId(), step.getId());
+                            .getJobInstance().getInstanceId(), step.getId(), persistenceManagerService);
                     }
                 }
 
@@ -581,7 +585,6 @@ public class ChunkStepController extends SingleThreadedStepController {
 
             }
         } catch (final Exception e) {
-            caughtThrowable = e;
             logger.log(Level.SEVERE, "Failure in Read-Process-Write Loop", e);
             // Only try to call onError() if we have an Exception, but not an Error.
             for (ChunkListenerProxy chunkProxy : chunkListeners) {
@@ -591,20 +594,20 @@ public class ChunkStepController extends SingleThreadedStepController {
                     logger.log(Level.SEVERE, e1.getMessage(), e1);
                 }
             }
+            rollback(e);
         } catch (final Throwable t) {
-            caughtThrowable = t;
-            logger.log(Level.SEVERE, t.getMessage(), t);
-        } finally {
-            if (caughtThrowable != null) {
-                transactionManager.setRollbackOnly();
-                readerProxy.close();
-                writerProxy.close();
-                transactionManager.rollback();
-                throw new BatchContainerRuntimeException("Failure in Read-Process-Write Loop", caughtThrowable);
-            }
+            rollback(t);
         }
     }
 
+    private void rollback(final Throwable t) { // shared failure path invoked from both catch blocks of the read-process-write loop
+        transactionManager.setRollbackOnly(); // mark the transaction rollback-only before touching the reader/writer
+        readerProxy.close(); // NOTE(review): no try/finally around the close() calls - if either one throws, transactionManager.rollback() below is never reached; confirm the proxies cannot throw here
+        writerProxy.close();
+        transactionManager.rollback();
+        throw new BatchContainerRuntimeException("Failure in Read-Process-Write Loop", t); // always throws, keeping t as the cause; this method never returns normally
+    }
+
     protected void invokeCoreStep() throws BatchContainerServiceException {
 
         this.chunk = step.getChunk();
@@ -637,7 +640,7 @@ public class ChunkStepController extends SingleThreadedStepController {
             final ItemReader itemReader = chunk.getReader();
             final List<Property> itemReaderProps = itemReader.getProperties() == null ? null : itemReader.getProperties().getPropertyList();
             final InjectionReferences injectionRef = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext, itemReaderProps);
-            readerProxy = ProxyFactory.createItemReaderProxy(itemReader.getRef(), injectionRef, stepContext, jobExecutionImpl);
+            readerProxy = ProxyFactory.createItemReaderProxy(artifactFactory, itemReader.getRef(), injectionRef, stepContext, jobExecutionImpl);
         }
 
         {
@@ -645,7 +648,7 @@ public class ChunkStepController extends SingleThreadedStepController {
             if (itemProcessor != null) {
                 final List<Property> itemProcessorProps = itemProcessor.getProperties() == null ? null : itemProcessor.getProperties().getPropertyList();
                 final InjectionReferences injectionRef = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext, itemProcessorProps);
-                processorProxy = ProxyFactory.createItemProcessorProxy(itemProcessor.getRef(), injectionRef, stepContext, jobExecutionImpl);
+                processorProxy = ProxyFactory.createItemProcessorProxy(artifactFactory, itemProcessor.getRef(), injectionRef, stepContext, jobExecutionImpl);
             }
         }
 
@@ -653,7 +656,7 @@ public class ChunkStepController extends SingleThreadedStepController {
             final ItemWriter itemWriter = chunk.getWriter();
             final List<Property> itemWriterProps = itemWriter.getProperties() == null ? null : itemWriter.getProperties().getPropertyList();
             final InjectionReferences injectionRef = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext, itemWriterProps);
-            writerProxy = ProxyFactory.createItemWriterProxy(itemWriter.getRef(), injectionRef, stepContext, jobExecutionImpl);
+            writerProxy = ProxyFactory.createItemWriterProxy(artifactFactory, itemWriter.getRef(), injectionRef, stepContext, jobExecutionImpl);
         }
 
         {
@@ -665,7 +668,7 @@ public class ChunkStepController extends SingleThreadedStepController {
             }
 
             final InjectionReferences injectionRef = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext, propList);
-            checkpointProxy = CheckpointAlgorithmFactory.getCheckpointAlgorithmProxy(step, injectionRef, stepContext, jobExecutionImpl);
+            checkpointProxy = CheckpointAlgorithmFactory.getCheckpointAlgorithmProxy(artifactFactory, step, injectionRef, stepContext, jobExecutionImpl);
         }
 
         {
@@ -689,7 +692,7 @@ public class ChunkStepController extends SingleThreadedStepController {
                 chkptAlg = checkpointProxy;
             }
 
-            checkpointManager = new CheckpointManager(readerProxy, writerProxy, chkptAlg, jobExecutionImpl.getJobInstance().getInstanceId(), step.getId());
+            checkpointManager = new CheckpointManager(readerProxy, writerProxy, chkptAlg, jobExecutionImpl.getJobInstance().getInstanceId(), step.getId(), persistenceManagerService);
 
             skipHandler = new SkipHandler(chunk);
             skipHandler.addSkipProcessListener(skipProcessListeners);