You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@batchee.apache.org by rm...@apache.org on 2014/01/02 10:27:54 UTC
[1/2] BATCHEE-9 ServicesManager shouldn't be contextual once an
action started
Updated Branches:
refs/heads/master b303e37e6 -> 8ddf9f754
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/8ddf9f75/jbatch/src/main/java/org/apache/batchee/container/impl/jobinstance/JobExecutionHelper.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/jobinstance/JobExecutionHelper.java b/jbatch/src/main/java/org/apache/batchee/container/impl/jobinstance/JobExecutionHelper.java
index 5b89a88..6230a76 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/impl/jobinstance/JobExecutionHelper.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/jobinstance/JobExecutionHelper.java
@@ -42,10 +42,6 @@ import javax.batch.runtime.JobInstance;
import java.util.Properties;
public class JobExecutionHelper {
- private static final JobStatusManagerService JOB_STATUS_MANAGER_SERVICE = ServicesManager.service(JobStatusManagerService.class);
- private static final PersistenceManagerService PERSISTENCE_MANAGER_SERVICE = ServicesManager.service(PersistenceManagerService.class);
- private static final SecurityService SECURITY_SERVICE = ServicesManager.service(SecurityService.class);
-
private static ModelNavigator<JSLJob> getResolvedJobNavigator(final String jobXml, final Properties jobParameters, final boolean parallelExecution) {
final JSLJob jobModel = new JobModelResolver().resolveModel(jobXml);
final PropertyResolver<JSLJob> propResolver = PropertyResolverFactory.createJobPropertyResolver(parallelExecution);
@@ -69,17 +65,19 @@ public class JobExecutionHelper {
return new JobContextImpl(jobNavigator, jslProperties);
}
- private static JobInstance getNewJobInstance(final String name, final String jobXml) {
- return PERSISTENCE_MANAGER_SERVICE.createJobInstance(name, SECURITY_SERVICE.getLoggedUser(), jobXml);
+ private static JobInstance getNewJobInstance(final ServicesManager servicesManager, final String name, final String jobXml) {
+ return servicesManager.service(PersistenceManagerService.class).createJobInstance(
+ name, servicesManager.service(SecurityService.class).getLoggedUser(), jobXml);
}
- private static JobInstance getNewSubJobInstance(final String name) {
- return PERSISTENCE_MANAGER_SERVICE.createSubJobInstance(name, SECURITY_SERVICE.getLoggedUser());
+ private static JobInstance getNewSubJobInstance(final ServicesManager servicesManager, final String name) {
+ return servicesManager.service(PersistenceManagerService.class).createSubJobInstance(
+ name, servicesManager.service(SecurityService.class).getLoggedUser());
}
- private static JobStatus createNewJobStatus(final JobInstance jobInstance) {
+ private static JobStatus createNewJobStatus(final JobStatusManagerService statusManagerService, final JobInstance jobInstance) {
final long instanceId = jobInstance.getInstanceId();
- final JobStatus jobStatus = JOB_STATUS_MANAGER_SERVICE.createJobStatus(instanceId);
+ final JobStatus jobStatus = statusManagerService.createJobStatus(instanceId);
jobStatus.setJobInstance(jobInstance);
return jobStatus;
}
@@ -91,47 +89,50 @@ public class JobExecutionHelper {
}
}
- public static RuntimeJobExecution startJob(final String jobXML, final Properties jobParameters) throws JobStartException {
+ public static RuntimeJobExecution startJob(final ServicesManager servicesManager, final String jobXML, final Properties jobParameters) throws JobStartException {
final JSLJob jobModel = new JobModelResolver().resolveModel(jobXML);
final ModelNavigator<JSLJob> jobNavigator = getResolvedJobNavigator(jobModel, jobParameters, false);
final JobContextImpl jobContext = getJobContext(jobNavigator);
- final JobInstance jobInstance = getNewJobInstance(jobNavigator.getRootModelElement().getId(), jobXML);
- final RuntimeJobExecution executionHelper = PERSISTENCE_MANAGER_SERVICE.createJobExecution(jobInstance, jobParameters, jobContext.getBatchStatus());
+ final JobInstance jobInstance = getNewJobInstance(servicesManager, jobNavigator.getRootModelElement().getId(), jobXML);
+ final RuntimeJobExecution executionHelper = servicesManager.service(PersistenceManagerService.class).createJobExecution(jobInstance, jobParameters, jobContext.getBatchStatus());
executionHelper.prepareForExecution(jobContext);
- final JobStatus jobStatus = createNewJobStatus(jobInstance);
- JOB_STATUS_MANAGER_SERVICE.updateJobStatus(jobStatus);
+ final JobStatusManagerService statusManagerService = servicesManager.service(JobStatusManagerService.class);
+ final JobStatus jobStatus = createNewJobStatus(statusManagerService, jobInstance);
+ statusManagerService.updateJobStatus(jobStatus);
return executionHelper;
}
- public static RuntimeFlowInSplitExecution startFlowInSplit(final JSLJob jobModel) throws JobStartException {
+ public static RuntimeFlowInSplitExecution startFlowInSplit(final ServicesManager servicesManager, final JSLJob jobModel) throws JobStartException {
final ModelNavigator<JSLJob> jobNavigator = getResolvedJobNavigator(jobModel, null, true);
final JobContextImpl jobContext = getJobContext(jobNavigator);
- final JobInstance jobInstance = getNewSubJobInstance(jobNavigator.getRootModelElement().getId());
- final RuntimeFlowInSplitExecution executionHelper = PERSISTENCE_MANAGER_SERVICE.createFlowInSplitExecution(jobInstance, jobContext.getBatchStatus());
+ final JobInstance jobInstance = getNewSubJobInstance(servicesManager, jobNavigator.getRootModelElement().getId());
+ final RuntimeFlowInSplitExecution executionHelper = servicesManager.service(PersistenceManagerService.class).createFlowInSplitExecution(jobInstance, jobContext.getBatchStatus());
executionHelper.prepareForExecution(jobContext);
- final JobStatus jobStatus = createNewJobStatus(jobInstance);
- JOB_STATUS_MANAGER_SERVICE.updateJobStatus(jobStatus);
+ final JobStatusManagerService statusManagerService = servicesManager.service(JobStatusManagerService.class);
+ final JobStatus jobStatus = createNewJobStatus(statusManagerService, jobInstance);
+ statusManagerService.updateJobStatus(jobStatus);
return executionHelper;
}
- public static RuntimeJobExecution startPartition(JSLJob jobModel, Properties jobParameters) throws JobStartException {
+ public static RuntimeJobExecution startPartition(final ServicesManager servicesManager, final JSLJob jobModel, final Properties jobParameters) throws JobStartException {
final ModelNavigator<JSLJob> jobNavigator = getResolvedJobNavigator(jobModel, jobParameters, true);
final JobContextImpl jobContext = getJobContext(jobNavigator);
- final JobInstance jobInstance = getNewSubJobInstance(jobNavigator.getRootModelElement().getId());
+ final JobInstance jobInstance = getNewSubJobInstance(servicesManager, jobNavigator.getRootModelElement().getId());
- final RuntimeJobExecution executionHelper = PERSISTENCE_MANAGER_SERVICE.createJobExecution(jobInstance, jobParameters, jobContext.getBatchStatus());
+ final RuntimeJobExecution executionHelper = servicesManager.service(PersistenceManagerService.class).createJobExecution(jobInstance, jobParameters, jobContext.getBatchStatus());
executionHelper.prepareForExecution(jobContext);
- final JobStatus jobStatus = createNewJobStatus(jobInstance);
- JOB_STATUS_MANAGER_SERVICE.updateJobStatus(jobStatus);
+ final JobStatusManagerService statusManagerService = servicesManager.service(JobStatusManagerService.class);
+ final JobStatus jobStatus = createNewJobStatus(statusManagerService, jobInstance);
+ statusManagerService.updateJobStatus(jobStatus);
return executionHelper;
}
@@ -148,35 +149,38 @@ public class JobExecutionHelper {
}
}
- private static void validateJobExecutionIsMostRecent(final long jobInstanceId, final long executionId) throws JobExecutionNotMostRecentException {
- final long mostRecentExecutionId = PERSISTENCE_MANAGER_SERVICE.getMostRecentExecutionId(jobInstanceId);
+ private static void validateJobExecutionIsMostRecent(final PersistenceManagerService persistenceManagerService, final long jobInstanceId, final long executionId) throws JobExecutionNotMostRecentException {
+ final long mostRecentExecutionId = persistenceManagerService.getMostRecentExecutionId(jobInstanceId);
if (mostRecentExecutionId != executionId) {
throw new JobExecutionNotMostRecentException("ExecutionId: " + executionId + " is not the most recent execution.");
}
}
- public static RuntimeJobExecution restartPartition(final long execId, final JSLJob gennedJobModel, final Properties partitionProps) throws JobRestartException,
+ public static RuntimeJobExecution restartPartition(final ServicesManager servicesManager, final long execId, final JSLJob gennedJobModel, final Properties partitionProps) throws JobRestartException,
JobExecutionAlreadyCompleteException, JobExecutionNotMostRecentException, NoSuchJobExecutionException {
- return restartExecution(execId, gennedJobModel, partitionProps, true, false);
+ return restartExecution(servicesManager, execId, gennedJobModel, partitionProps, true, false);
}
- public static RuntimeFlowInSplitExecution restartFlowInSplit(final long execId, final JSLJob gennedJobModel) throws JobRestartException,
+ public static RuntimeFlowInSplitExecution restartFlowInSplit(final ServicesManager servicesManager, final long execId, final JSLJob gennedJobModel) throws JobRestartException,
JobExecutionAlreadyCompleteException, JobExecutionNotMostRecentException, NoSuchJobExecutionException {
- return (RuntimeFlowInSplitExecution) restartExecution(execId, gennedJobModel, null, true, true);
+ return (RuntimeFlowInSplitExecution) restartExecution(servicesManager, execId, gennedJobModel, null, true, true);
}
- public static RuntimeJobExecution restartJob(final long executionId, final Properties restartJobParameters) throws JobRestartException,
+ public static RuntimeJobExecution restartJob(final ServicesManager servicesManager, final long executionId, final Properties restartJobParameters) throws JobRestartException,
JobExecutionAlreadyCompleteException, JobExecutionNotMostRecentException, NoSuchJobExecutionException {
- return restartExecution(executionId, null, restartJobParameters, false, false);
+ return restartExecution(servicesManager, executionId, null, restartJobParameters, false, false);
}
- private static RuntimeJobExecution restartExecution(final long executionId, final JSLJob gennedJobModel, final Properties restartJobParameters, final boolean parallelExecution, final boolean flowInSplit) throws JobRestartException,
+ private static RuntimeJobExecution restartExecution(final ServicesManager servicesManager, final long executionId, final JSLJob gennedJobModel, final Properties restartJobParameters, final boolean parallelExecution, final boolean flowInSplit) throws JobRestartException,
JobExecutionAlreadyCompleteException, JobExecutionNotMostRecentException, NoSuchJobExecutionException {
- final long jobInstanceId = PERSISTENCE_MANAGER_SERVICE.getJobInstanceIdByExecutionId(executionId);
- final JobStatus jobStatus = JOB_STATUS_MANAGER_SERVICE.getJobStatus(jobInstanceId);
+ final PersistenceManagerService persistenceManagerService = servicesManager.service(PersistenceManagerService.class);
+ final JobStatusManagerService jobStatusManagerService = servicesManager.service(JobStatusManagerService.class);
+
+ final long jobInstanceId = persistenceManagerService.getJobInstanceIdByExecutionId(executionId);
+ final JobStatus jobStatus = jobStatusManagerService.getJobStatus(jobInstanceId);
- validateJobExecutionIsMostRecent(jobInstanceId, executionId);
+ validateJobExecutionIsMostRecent(persistenceManagerService, jobInstanceId, executionId);
validateJobInstanceNotCompleteOrAbandonded(jobStatus);
@@ -196,22 +200,22 @@ public class JobExecutionHelper {
final RuntimeJobExecution executionHelper;
if (flowInSplit) {
- executionHelper = PERSISTENCE_MANAGER_SERVICE.createFlowInSplitExecution(jobInstance, jobContext.getBatchStatus());
+ executionHelper = persistenceManagerService.createFlowInSplitExecution(jobInstance, jobContext.getBatchStatus());
} else {
- executionHelper = PERSISTENCE_MANAGER_SERVICE.createJobExecution(jobInstance, restartJobParameters, jobContext.getBatchStatus());
+ executionHelper = persistenceManagerService.createJobExecution(jobInstance, restartJobParameters, jobContext.getBatchStatus());
}
executionHelper.prepareForExecution(jobContext, jobStatus.getRestartOn());
- JOB_STATUS_MANAGER_SERVICE.updateJobStatusWithNewExecution(jobInstance.getInstanceId(), executionHelper.getExecutionId());
+ jobStatusManagerService.updateJobStatusWithNewExecution(jobInstance.getInstanceId(), executionHelper.getExecutionId());
return executionHelper;
}
- public static InternalJobExecution getPersistedJobOperatorJobExecution(final long jobExecutionId) throws NoSuchJobExecutionException {
- return PERSISTENCE_MANAGER_SERVICE.jobOperatorGetJobExecution(jobExecutionId);
+ public static InternalJobExecution getPersistedJobOperatorJobExecution(final PersistenceManagerService persistenceManagerService, final long jobExecutionId) throws NoSuchJobExecutionException {
+ return persistenceManagerService.jobOperatorGetJobExecution(jobExecutionId);
}
- public static JobInstance getJobInstance(final long executionId) {
- return JOB_STATUS_MANAGER_SERVICE.getJobStatusFromExecutionId(executionId).getJobInstance();
+ public static JobInstance getJobInstance(final JobStatusManagerService statusManagerService, final long executionId) {
+ return statusManagerService.getJobStatusFromExecutionId(executionId).getJobInstance();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/8ddf9f75/jbatch/src/main/java/org/apache/batchee/container/impl/jobinstance/RuntimeFlowInSplitExecution.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/jobinstance/RuntimeFlowInSplitExecution.java b/jbatch/src/main/java/org/apache/batchee/container/impl/jobinstance/RuntimeFlowInSplitExecution.java
index 8570351..bf065bf 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/impl/jobinstance/RuntimeFlowInSplitExecution.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/jobinstance/RuntimeFlowInSplitExecution.java
@@ -16,13 +16,15 @@
*/
package org.apache.batchee.container.impl.jobinstance;
+import org.apache.batchee.container.services.ServicesManager;
import org.apache.batchee.container.status.ExecutionStatus;
+import org.apache.batchee.spi.PersistenceManagerService;
import javax.batch.runtime.JobInstance;
public class RuntimeFlowInSplitExecution extends RuntimeJobExecution {
- public RuntimeFlowInSplitExecution(final JobInstance jobInstance, final long executionId) {
- super(jobInstance, executionId);
+ public RuntimeFlowInSplitExecution(final JobInstance jobInstance, final long executionId, final PersistenceManagerService manager) {
+ super(jobInstance, executionId, manager);
}
private ExecutionStatus flowStatus;
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/8ddf9f75/jbatch/src/main/java/org/apache/batchee/container/impl/jobinstance/RuntimeJobExecution.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/jobinstance/RuntimeJobExecution.java b/jbatch/src/main/java/org/apache/batchee/container/impl/jobinstance/RuntimeJobExecution.java
index d9fdb84..b1b81c4 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/impl/jobinstance/RuntimeJobExecution.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/jobinstance/RuntimeJobExecution.java
@@ -22,6 +22,7 @@ import org.apache.batchee.container.navigator.ModelNavigator;
import org.apache.batchee.container.proxy.ListenerFactory;
import org.apache.batchee.container.services.InternalJobExecution;
import org.apache.batchee.jaxb.JSLJob;
+import org.apache.batchee.spi.PersistenceManagerService;
import javax.batch.runtime.BatchStatus;
import javax.batch.runtime.JobInstance;
@@ -43,10 +44,10 @@ public class RuntimeJobExecution {
private Integer partitionInstance = null;
private Collection<Closeable> releasables = new ArrayList<Closeable>();
- public RuntimeJobExecution(final JobInstance jobInstance, final long executionId) {
+ public RuntimeJobExecution(final JobInstance jobInstance, final long executionId, final PersistenceManagerService persistenceManagerService) {
this.jobInstance = jobInstance;
this.executionId = executionId;
- this.operatorJobExecution = new JobExecutionImpl(executionId, jobInstance.getInstanceId());
+ this.operatorJobExecution = new JobExecutionImpl(executionId, jobInstance.getInstanceId(), persistenceManagerService);
}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/8ddf9f75/jbatch/src/main/java/org/apache/batchee/container/proxy/ListenerFactory.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/proxy/ListenerFactory.java b/jbatch/src/main/java/org/apache/batchee/container/proxy/ListenerFactory.java
index f568205..a872c30 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/proxy/ListenerFactory.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/proxy/ListenerFactory.java
@@ -24,6 +24,7 @@ import org.apache.batchee.jaxb.Listener;
import org.apache.batchee.jaxb.Listeners;
import org.apache.batchee.jaxb.Property;
import org.apache.batchee.jaxb.Step;
+import org.apache.batchee.spi.BatchArtifactFactory;
import javax.batch.api.chunk.listener.ChunkListener;
import javax.batch.api.chunk.listener.ItemProcessListener;
@@ -46,29 +47,30 @@ import java.util.concurrent.ConcurrentHashMap;
public class ListenerFactory {
- private List<ListenerInfo> jobLevelListenerInfo = null;
-
- private Map<String, List<ListenerInfo>> stepLevelListenerInfo = new ConcurrentHashMap<String, List<ListenerInfo>>();
+ private final BatchArtifactFactory factory;
+ private final List<ListenerInfo> jobLevelListenerInfo;
+ private final Map<String, List<ListenerInfo>> stepLevelListenerInfo = new ConcurrentHashMap<String, List<ListenerInfo>>();
/*
* Build job-level ListenerInfo(s) up-front, but build step-level ones
* lazily.
*/
- public ListenerFactory(final JSLJob jobModel, final InjectionReferences injectionRefs, final RuntimeJobExecution execution) {
+ public ListenerFactory(final BatchArtifactFactory factory, final JSLJob jobModel, final InjectionReferences injectionRefs, final RuntimeJobExecution execution) {
jobLevelListenerInfo = new ArrayList<ListenerInfo>();
+ this.factory = factory;
Listeners jobLevelListeners = jobModel.getListeners();
- jobLevelListenerInfo.addAll(globalListeners("org.apache.batchee.job.listeners.before", injectionRefs, execution));
+ jobLevelListenerInfo.addAll(globalListeners(factory, "org.apache.batchee.job.listeners.before", injectionRefs, execution));
if (jobLevelListeners != null) {
for (final Listener listener : jobLevelListeners.getListenerList()) {
- jobLevelListenerInfo.add(buildListenerInfo(listener, injectionRefs, execution));
+ jobLevelListenerInfo.add(buildListenerInfo(factory, listener, injectionRefs, execution));
}
}
- jobLevelListenerInfo.addAll(globalListeners("org.apache.batchee.job.listeners.after", injectionRefs, execution));
+ jobLevelListenerInfo.addAll(globalListeners(factory, "org.apache.batchee.job.listeners.after", injectionRefs, execution));
}
- private static Collection<ListenerInfo> globalListeners(final String key, final InjectionReferences injectionRefs, final RuntimeJobExecution execution) {
+ private static Collection<ListenerInfo> globalListeners(final BatchArtifactFactory factory, final String key, final InjectionReferences injectionRefs, final RuntimeJobExecution execution) {
final String globalListeners = ServicesManager.value(key, null);
if (globalListeners != null) {
final String[] refs = globalListeners.split(",");
@@ -78,7 +80,7 @@ public class ListenerFactory {
final Listener listener = new Listener();
listener.setRef(ref);
- list.add(buildListenerInfo(listener, injectionRefs, execution));
+ list.add(buildListenerInfo(factory, listener, injectionRefs, execution));
}
return list;
@@ -91,7 +93,7 @@ public class ListenerFactory {
*
* @JobListener, even if that is the only type of listener annotation found.
*/
- private List<ListenerInfo> getStepListenerInfo(final Step step, final InjectionReferences injectionRefs, final RuntimeJobExecution execution) {
+ private List<ListenerInfo> getStepListenerInfo(final BatchArtifactFactory factory, final Step step, final InjectionReferences injectionRefs, final RuntimeJobExecution execution) {
List<ListenerInfo> stepListenerInfoList = stepLevelListenerInfo.get(step.getId());
if (stepListenerInfoList == null) {
synchronized (this) {
@@ -100,28 +102,28 @@ public class ListenerFactory {
stepListenerInfoList = new ArrayList<ListenerInfo>();
stepLevelListenerInfo.put(step.getId(), stepListenerInfoList);
- stepListenerInfoList.addAll(globalListeners("org.apache.batchee.step.listeners.before", injectionRefs, execution));
+ stepListenerInfoList.addAll(globalListeners(factory, "org.apache.batchee.step.listeners.before", injectionRefs, execution));
final Listeners stepLevelListeners = step.getListeners();
if (stepLevelListeners != null) {
for (final Listener listener : stepLevelListeners.getListenerList()) {
- stepListenerInfoList.add(buildListenerInfo(listener, injectionRefs, execution));
+ stepListenerInfoList.add(buildListenerInfo(factory, listener, injectionRefs, execution));
}
}
- stepListenerInfoList.addAll(globalListeners("org.apache.batchee.step.listeners.after", injectionRefs, execution));
+ stepListenerInfoList.addAll(globalListeners(factory, "org.apache.batchee.step.listeners.after", injectionRefs, execution));
}
}
}
return stepListenerInfoList;
}
- private static ListenerInfo buildListenerInfo(final Listener listener, final InjectionReferences injectionRefs, final RuntimeJobExecution execution) {
+ private static ListenerInfo buildListenerInfo(final BatchArtifactFactory factory, final Listener listener, final InjectionReferences injectionRefs, final RuntimeJobExecution execution) {
final String id = listener.getRef();
final List<Property> propList = (listener.getProperties() == null) ? null : listener.getProperties().getPropertyList();
injectionRefs.setProps(propList);
- final Object listenerArtifact = ProxyFactory.loadArtifact(id, injectionRefs, execution);
+ final Object listenerArtifact = ProxyFactory.loadArtifact(factory, id, injectionRefs, execution);
if (listenerArtifact == null) {
throw new IllegalArgumentException("Load of artifact id: " + id + " returned <null>.");
}
@@ -140,7 +142,7 @@ public class ListenerFactory {
}
public List<ChunkListenerProxy> getChunkListeners(final Step step, final InjectionReferences injectionRefs, final StepContextImpl stepContext, final RuntimeJobExecution execution) {
- final List<ListenerInfo> stepListenerInfo = getStepListenerInfo(step, injectionRefs, execution);
+ final List<ListenerInfo> stepListenerInfo = getStepListenerInfo(factory, step, injectionRefs, execution);
final List<ChunkListenerProxy> retVal = new ArrayList<ChunkListenerProxy>();
for (final ListenerInfo li : stepListenerInfo) {
if (li.isChunkListener()) {
@@ -154,7 +156,7 @@ public class ListenerFactory {
}
public List<ItemProcessListenerProxy> getItemProcessListeners(final Step step, final InjectionReferences injectionRefs, final StepContextImpl stepContext, final RuntimeJobExecution execution) {
- final List<ListenerInfo> stepListenerInfo = getStepListenerInfo(step, injectionRefs, execution);
+ final List<ListenerInfo> stepListenerInfo = getStepListenerInfo(factory, step, injectionRefs, execution);
final List<ItemProcessListenerProxy> retVal = new ArrayList<ItemProcessListenerProxy>();
for (final ListenerInfo li : stepListenerInfo) {
if (li.isItemProcessListener()) {
@@ -168,7 +170,7 @@ public class ListenerFactory {
}
public List<ItemReadListenerProxy> getItemReadListeners(final Step step, final InjectionReferences injectionRefs, final StepContextImpl stepContext, final RuntimeJobExecution execution) {
- final List<ListenerInfo> stepListenerInfo = getStepListenerInfo(step, injectionRefs, execution);
+ final List<ListenerInfo> stepListenerInfo = getStepListenerInfo(factory, step, injectionRefs, execution);
final List<ItemReadListenerProxy> retVal = new ArrayList<ItemReadListenerProxy>();
for (final ListenerInfo li : stepListenerInfo) {
if (li.isItemReadListener()) {
@@ -182,7 +184,7 @@ public class ListenerFactory {
}
public List<ItemWriteListenerProxy> getItemWriteListeners(final Step step, final InjectionReferences injectionRefs, final StepContextImpl stepContext, final RuntimeJobExecution execution) {
- final List<ListenerInfo> stepListenerInfo = getStepListenerInfo(step, injectionRefs, execution);
+ final List<ListenerInfo> stepListenerInfo = getStepListenerInfo(factory, step, injectionRefs, execution);
final List<ItemWriteListenerProxy> retVal = new ArrayList<ItemWriteListenerProxy>();
for (final ListenerInfo li : stepListenerInfo) {
if (li.isItemWriteListener()) {
@@ -196,7 +198,7 @@ public class ListenerFactory {
}
public List<RetryProcessListenerProxy> getRetryProcessListeners(final Step step, final InjectionReferences injectionRefs, final StepContextImpl stepContext, final RuntimeJobExecution execution) {
- final List<ListenerInfo> stepListenerInfo = getStepListenerInfo(step, injectionRefs, execution);
+ final List<ListenerInfo> stepListenerInfo = getStepListenerInfo(factory, step, injectionRefs, execution);
final List<RetryProcessListenerProxy> retVal = new ArrayList<RetryProcessListenerProxy>();
for (final ListenerInfo li : stepListenerInfo) {
if (li.isRetryProcessListener()) {
@@ -210,7 +212,7 @@ public class ListenerFactory {
}
public List<RetryReadListenerProxy> getRetryReadListeners(final Step step, final InjectionReferences injectionRefs, final StepContextImpl stepContext, final RuntimeJobExecution execution) {
- final List<ListenerInfo> stepListenerInfo = getStepListenerInfo(step, injectionRefs, execution);
+ final List<ListenerInfo> stepListenerInfo = getStepListenerInfo(factory, step, injectionRefs, execution);
final List<RetryReadListenerProxy> retVal = new ArrayList<RetryReadListenerProxy>();
for (final ListenerInfo li : stepListenerInfo) {
if (li.isRetryReadListener()) {
@@ -224,7 +226,7 @@ public class ListenerFactory {
}
public List<RetryWriteListenerProxy> getRetryWriteListeners(final Step step, final InjectionReferences injectionRefs, final StepContextImpl stepContext, final RuntimeJobExecution execution) {
- final List<ListenerInfo> stepListenerInfo = getStepListenerInfo(step, injectionRefs, execution);
+ final List<ListenerInfo> stepListenerInfo = getStepListenerInfo(factory, step, injectionRefs, execution);
final List<RetryWriteListenerProxy> retVal = new ArrayList<RetryWriteListenerProxy>();
for (final ListenerInfo li : stepListenerInfo) {
if (li.isRetryWriteListener()) {
@@ -238,7 +240,7 @@ public class ListenerFactory {
}
public List<SkipProcessListenerProxy> getSkipProcessListeners(final Step step, final InjectionReferences injectionRefs, final StepContextImpl stepContext, final RuntimeJobExecution execution) {
- final List<ListenerInfo> stepListenerInfo = getStepListenerInfo(step, injectionRefs, execution);
+ final List<ListenerInfo> stepListenerInfo = getStepListenerInfo(factory, step, injectionRefs, execution);
final List<SkipProcessListenerProxy> retVal = new ArrayList<SkipProcessListenerProxy>();
for (final ListenerInfo li : stepListenerInfo) {
if (li.isSkipProcessListener()) {
@@ -252,7 +254,7 @@ public class ListenerFactory {
}
public List<SkipReadListenerProxy> getSkipReadListeners(final Step step, final InjectionReferences injectionRefs, final StepContextImpl stepContext, final RuntimeJobExecution execution) {
- final List<ListenerInfo> stepListenerInfo = getStepListenerInfo(step, injectionRefs, execution);
+ final List<ListenerInfo> stepListenerInfo = getStepListenerInfo(factory, step, injectionRefs, execution);
final List<SkipReadListenerProxy> retVal = new ArrayList<SkipReadListenerProxy>();
for (final ListenerInfo li : stepListenerInfo) {
if (li.isSkipReadListener()) {
@@ -266,7 +268,7 @@ public class ListenerFactory {
}
public List<SkipWriteListenerProxy> getSkipWriteListeners(final Step step, final InjectionReferences injectionRefs, final StepContextImpl stepContext, final RuntimeJobExecution execution) {
- final List<ListenerInfo> stepListenerInfo = getStepListenerInfo(step, injectionRefs, execution);
+ final List<ListenerInfo> stepListenerInfo = getStepListenerInfo(factory, step, injectionRefs, execution);
final List<SkipWriteListenerProxy> retVal = new ArrayList<SkipWriteListenerProxy>();
for (final ListenerInfo li : stepListenerInfo) {
if (li.isSkipWriteListener()) {
@@ -280,7 +282,7 @@ public class ListenerFactory {
}
public List<StepListenerProxy> getStepListeners(final Step step, final InjectionReferences injectionRefs, final StepContextImpl stepContext, final RuntimeJobExecution execution) {
- final List<ListenerInfo> stepListenerInfo = getStepListenerInfo(step, injectionRefs, execution);
+ final List<ListenerInfo> stepListenerInfo = getStepListenerInfo(factory, step, injectionRefs, execution);
final List<StepListenerProxy> retVal = new ArrayList<StepListenerProxy>();
for (final ListenerInfo li : stepListenerInfo) {
if (li.isStepListener()) {
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/8ddf9f75/jbatch/src/main/java/org/apache/batchee/container/proxy/ProxyFactory.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/proxy/ProxyFactory.java b/jbatch/src/main/java/org/apache/batchee/container/proxy/ProxyFactory.java
index de27560..3a1c487 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/proxy/ProxyFactory.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/proxy/ProxyFactory.java
@@ -18,7 +18,6 @@ package org.apache.batchee.container.proxy;
import org.apache.batchee.container.impl.StepContextImpl;
import org.apache.batchee.container.impl.jobinstance.RuntimeJobExecution;
-import org.apache.batchee.container.services.ServicesManager;
import org.apache.batchee.spi.BatchArtifactFactory;
import javax.batch.api.Batchlet;
@@ -36,13 +35,12 @@ import javax.batch.api.partition.PartitionReducer;
* Introduce a level of indirection so proxies are not instantiated directly by newing them up.
*/
public class ProxyFactory {
- private static final BatchArtifactFactory ARTIFACT_FACTORY = ServicesManager.service(BatchArtifactFactory.class);
private static final ThreadLocal<InjectionReferences> INJECTION_CONTEXT = new ThreadLocal<InjectionReferences>();
- protected static Object loadArtifact(final String id, final InjectionReferences injectionReferences, final RuntimeJobExecution execution) {
+ protected static Object loadArtifact(final BatchArtifactFactory factory, final String id, final InjectionReferences injectionReferences, final RuntimeJobExecution execution) {
INJECTION_CONTEXT.set(injectionReferences);
try {
- final BatchArtifactFactory.Instance instance = ARTIFACT_FACTORY.load(id);
+ final BatchArtifactFactory.Instance instance = factory.load(id);
if (instance == null) {
return null;
}
@@ -65,15 +63,15 @@ public class ProxyFactory {
/*
* Decider
*/
- public static DeciderProxy createDeciderProxy(final String id, final InjectionReferences injectionRefs, final RuntimeJobExecution execution) {
- return new DeciderProxy(Decider.class.cast(loadArtifact(id, injectionRefs, execution)));
+ public static DeciderProxy createDeciderProxy(final BatchArtifactFactory factory, final String id, final InjectionReferences injectionRefs, final RuntimeJobExecution execution) {
+ return new DeciderProxy(Decider.class.cast(loadArtifact(factory, id, injectionRefs, execution)));
}
/*
* Batchlet artifact
*/
- public static BatchletProxy createBatchletProxy(final String id, final InjectionReferences injectionRefs, final StepContextImpl stepContext, final RuntimeJobExecution execution) {
- final Batchlet loadedArtifact = (Batchlet) loadArtifact(id, injectionRefs, execution);
+ public static BatchletProxy createBatchletProxy(final BatchArtifactFactory factory, final String id, final InjectionReferences injectionRefs, final StepContextImpl stepContext, final RuntimeJobExecution execution) {
+ final Batchlet loadedArtifact = (Batchlet) loadArtifact(factory, id, injectionRefs, execution);
final BatchletProxy proxy = new BatchletProxy(loadedArtifact);
proxy.setStepContext(stepContext);
return proxy;
@@ -83,29 +81,29 @@ public class ProxyFactory {
* The four main chunk-related artifacts
*/
- public static CheckpointAlgorithmProxy createCheckpointAlgorithmProxy(final String id, final InjectionReferences injectionRefs, final StepContextImpl stepContext, final RuntimeJobExecution execution) {
- final CheckpointAlgorithm loadedArtifact = (CheckpointAlgorithm) loadArtifact(id, injectionRefs, execution);
+ public static CheckpointAlgorithmProxy createCheckpointAlgorithmProxy(final BatchArtifactFactory factory, final String id, final InjectionReferences injectionRefs, final StepContextImpl stepContext, final RuntimeJobExecution execution) {
+ final CheckpointAlgorithm loadedArtifact = (CheckpointAlgorithm) loadArtifact(factory, id, injectionRefs, execution);
final CheckpointAlgorithmProxy proxy = new CheckpointAlgorithmProxy(loadedArtifact);
proxy.setStepContext(stepContext);
return proxy;
}
- public static ItemReaderProxy createItemReaderProxy(final String id, final InjectionReferences injectionRefs, final StepContextImpl stepContext, final RuntimeJobExecution execution) {
- final ItemReader loadedArtifact = (ItemReader) loadArtifact(id, injectionRefs, execution);
+ public static ItemReaderProxy createItemReaderProxy(final BatchArtifactFactory factory, final String id, final InjectionReferences injectionRefs, final StepContextImpl stepContext, final RuntimeJobExecution execution) {
+ final ItemReader loadedArtifact = (ItemReader) loadArtifact(factory, id, injectionRefs, execution);
final ItemReaderProxy proxy = new ItemReaderProxy(loadedArtifact);
proxy.setStepContext(stepContext);
return proxy;
}
- public static ItemProcessorProxy createItemProcessorProxy(final String id, final InjectionReferences injectionRefs, final StepContextImpl stepContext, final RuntimeJobExecution execution) {
- final ItemProcessor loadedArtifact = (ItemProcessor) loadArtifact(id, injectionRefs, execution);
+ public static ItemProcessorProxy createItemProcessorProxy(final BatchArtifactFactory factory, final String id, final InjectionReferences injectionRefs, final StepContextImpl stepContext, final RuntimeJobExecution execution) {
+ final ItemProcessor loadedArtifact = (ItemProcessor) loadArtifact(factory, id, injectionRefs, execution);
final ItemProcessorProxy proxy = new ItemProcessorProxy(loadedArtifact);
proxy.setStepContext(stepContext);
return proxy;
}
- public static ItemWriterProxy createItemWriterProxy(final String id, final InjectionReferences injectionRefs, final StepContextImpl stepContext, final RuntimeJobExecution execution) {
- final ItemWriter loadedArtifact = (ItemWriter) loadArtifact(id, injectionRefs, execution);
+ public static ItemWriterProxy createItemWriterProxy(final BatchArtifactFactory factory, final String id, final InjectionReferences injectionRefs, final StepContextImpl stepContext, final RuntimeJobExecution execution) {
+ final ItemWriter loadedArtifact = (ItemWriter) loadArtifact(factory, id, injectionRefs, execution);
final ItemWriterProxy proxy = new ItemWriterProxy(loadedArtifact);
proxy.setStepContext(stepContext);
return proxy;
@@ -115,29 +113,29 @@ public class ProxyFactory {
* The four partition-related artifacts
*/
- public static PartitionReducerProxy createPartitionReducerProxy(final String id, final InjectionReferences injectionRefs, final StepContextImpl stepContext, final RuntimeJobExecution execution) {
- final PartitionReducer loadedArtifact = (PartitionReducer) loadArtifact(id, injectionRefs, execution);
+ public static PartitionReducerProxy createPartitionReducerProxy(final BatchArtifactFactory factory, final String id, final InjectionReferences injectionRefs, final StepContextImpl stepContext, final RuntimeJobExecution execution) {
+ final PartitionReducer loadedArtifact = (PartitionReducer) loadArtifact(factory, id, injectionRefs, execution);
final PartitionReducerProxy proxy = new PartitionReducerProxy(loadedArtifact);
proxy.setStepContext(stepContext);
return proxy;
}
- public static PartitionMapperProxy createPartitionMapperProxy(final String id, final InjectionReferences injectionRefs, final StepContextImpl stepContext, final RuntimeJobExecution execution) {
- final PartitionMapper loadedArtifact = (PartitionMapper) loadArtifact(id, injectionRefs, execution);
+ public static PartitionMapperProxy createPartitionMapperProxy(final BatchArtifactFactory factory, final String id, final InjectionReferences injectionRefs, final StepContextImpl stepContext, final RuntimeJobExecution execution) {
+ final PartitionMapper loadedArtifact = (PartitionMapper) loadArtifact(factory, id, injectionRefs, execution);
final PartitionMapperProxy proxy = new PartitionMapperProxy(loadedArtifact);
proxy.setStepContext(stepContext);
return proxy;
}
- public static PartitionAnalyzerProxy createPartitionAnalyzerProxy(final String id, final InjectionReferences injectionRefs, final StepContextImpl stepContext, final RuntimeJobExecution execution) {
- final PartitionAnalyzer loadedArtifact = (PartitionAnalyzer) loadArtifact(id, injectionRefs, execution);
+ public static PartitionAnalyzerProxy createPartitionAnalyzerProxy(final BatchArtifactFactory factory, final String id, final InjectionReferences injectionRefs, final StepContextImpl stepContext, final RuntimeJobExecution execution) {
+ final PartitionAnalyzer loadedArtifact = (PartitionAnalyzer) loadArtifact(factory, id, injectionRefs, execution);
final PartitionAnalyzerProxy proxy = new PartitionAnalyzerProxy(loadedArtifact);
proxy.setStepContext(stepContext);
return proxy;
}
- public static PartitionCollectorProxy createPartitionCollectorProxy(final String id, final InjectionReferences injectionRefs, final StepContextImpl stepContext, final RuntimeJobExecution execution) {
- final PartitionCollector loadedArtifact = (PartitionCollector) loadArtifact(id, injectionRefs, execution);
+ public static PartitionCollectorProxy createPartitionCollectorProxy(final BatchArtifactFactory factory, final String id, final InjectionReferences injectionRefs, final StepContextImpl stepContext, final RuntimeJobExecution execution) {
+ final PartitionCollector loadedArtifact = (PartitionCollector) loadArtifact(factory, id, injectionRefs, execution);
final PartitionCollectorProxy proxy = new PartitionCollectorProxy(loadedArtifact);
proxy.setStepContext(stepContext);
return proxy;
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/8ddf9f75/jbatch/src/main/java/org/apache/batchee/container/services/ServicesManager.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/services/ServicesManager.java b/jbatch/src/main/java/org/apache/batchee/container/services/ServicesManager.java
index 46cfaac..8418127 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/services/ServicesManager.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/services/ServicesManager.java
@@ -40,10 +40,7 @@ import org.apache.batchee.spi.TransactionManagementService;
import java.io.IOException;
import java.io.InputStream;
-import java.lang.reflect.InvocationHandler;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.lang.reflect.Proxy;
+import java.lang.reflect.Constructor;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
@@ -82,8 +79,8 @@ public class ServicesManager implements BatchContainerConstants {
servicesManagerLocator = locator;
}
- public static <T extends BatchService> T service(final Class<T> api) {
- return api.cast(Proxy.newProxyInstance(ServicesManager.class.getClassLoader(), new Class<?>[]{ api }, new ServiceHandler<T>(api)));
+ public static ServicesManager find() {
+ return servicesManagerLocator.find();
}
public static String value(final String key, final String defaultValue) {
@@ -149,7 +146,7 @@ public class ServicesManager implements BatchContainerConstants {
}
}
- private <T extends BatchService> T getService(final Class<T> clazz) throws BatchContainerServiceException {
+ public <T extends BatchService> T service(final Class<T> clazz) throws BatchContainerServiceException {
T service = clazz.cast(serviceRegistry.get(clazz.getName()));
if (service == null) {
// Probably don't want to be loading two on two different threads so lock the whole table.
@@ -192,9 +189,15 @@ public class ServicesManager implements BatchContainerConstants {
return service;
}
- private static <T> T load(final Class<T> expected, final String className) throws Exception {
+ private <T> T load(final Class<T> expected, final String className) throws Exception {
final Class<?> cls = Class.forName(className);
if (cls != null) {
+ try {
+ final Constructor<?> constructor = cls.getConstructor(ServicesManager.class);
+ return expected.cast(constructor.newInstance(this));
+ } catch (final Throwable th) {
+ // try no arg constructor
+ }
if (cls.getConstructor() != null) {
return expected.cast(cls.newInstance());
}
@@ -202,24 +205,5 @@ public class ServicesManager implements BatchContainerConstants {
}
throw new Exception("Exception loading Service class " + className + " make sure it exists");
}
-
- // just an handler getting the right service instance using the ServicesManagerLocator
- private static class ServiceHandler<T extends BatchService> implements InvocationHandler {
- private final Class<T> service;
-
- public ServiceHandler(final Class<T> api) {
- this.service = api;
- }
-
- @Override
- public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable {
- final T instance = servicesManagerLocator.find().getService(service);
- try {
- return method.invoke(instance, args);
- } catch (final InvocationTargetException ite) {
- throw ite.getCause();
- }
- }
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/8ddf9f75/jbatch/src/main/java/org/apache/batchee/container/services/kernel/DefaultBatchKernel.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/services/kernel/DefaultBatchKernel.java b/jbatch/src/main/java/org/apache/batchee/container/services/kernel/DefaultBatchKernel.java
index b44a0b4..7dc488a 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/services/kernel/DefaultBatchKernel.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/services/kernel/DefaultBatchKernel.java
@@ -23,6 +23,7 @@ import org.apache.batchee.container.impl.jobinstance.RuntimeFlowInSplitExecution
import org.apache.batchee.container.impl.jobinstance.RuntimeJobExecution;
import org.apache.batchee.container.services.BatchKernelService;
import org.apache.batchee.container.services.InternalJobExecution;
+import org.apache.batchee.container.services.JobStatusManagerService;
import org.apache.batchee.container.services.ServicesManager;
import org.apache.batchee.container.util.BatchFlowInSplitWorkUnit;
import org.apache.batchee.container.util.BatchPartitionWorkUnit;
@@ -56,10 +57,12 @@ public class DefaultBatchKernel implements BatchKernelService {
private final BatchThreadPoolService executorService;
private final PersistenceManagerService persistenceService;
+ private final ServicesManager servicesManager;
- public DefaultBatchKernel() {
- executorService = ServicesManager.service(BatchThreadPoolService.class);
- persistenceService = ServicesManager.service(PersistenceManagerService.class);
+ public DefaultBatchKernel(final ServicesManager servicesManager) {
+ this.servicesManager = servicesManager;
+ executorService = servicesManager.service(BatchThreadPoolService.class);
+ persistenceService = servicesManager.service(PersistenceManagerService.class);
}
@Override
@@ -69,11 +72,11 @@ public class DefaultBatchKernel implements BatchKernelService {
@Override
public InternalJobExecution startJob(final String jobXML, final Properties jobParameters) throws JobStartException {
- final RuntimeJobExecution jobExecution = JobExecutionHelper.startJob(jobXML, jobParameters);
+ final RuntimeJobExecution jobExecution = JobExecutionHelper.startJob(servicesManager, jobXML, jobParameters);
// TODO - register with status manager
- final BatchWorkUnit batchWork = new BatchWorkUnit(this, jobExecution);
+ final BatchWorkUnit batchWork = new BatchWorkUnit(servicesManager, jobExecution);
registerCurrentInstanceAndExecution(jobExecution, batchWork.getController());
executorService.executeTask(batchWork, null);
@@ -93,8 +96,8 @@ public class DefaultBatchKernel implements BatchKernelService {
@Override
public InternalJobExecution restartJob(final long executionId, final Properties jobOverrideProps) throws JobRestartException, JobExecutionAlreadyCompleteException, JobExecutionNotMostRecentException, NoSuchJobExecutionException {
- final RuntimeJobExecution jobExecution = JobExecutionHelper.restartJob(executionId, jobOverrideProps);
- final BatchWorkUnit batchWork = new BatchWorkUnit(this, jobExecution);
+ final RuntimeJobExecution jobExecution = JobExecutionHelper.restartJob(servicesManager, executionId, jobOverrideProps);
+ final BatchWorkUnit batchWork = new BatchWorkUnit(servicesManager, jobExecution);
registerCurrentInstanceAndExecution(jobExecution, batchWork.getController());
@@ -124,7 +127,7 @@ public class DefaultBatchKernel implements BatchKernelService {
}
public InternalJobExecution getJobExecution(final long executionId) throws NoSuchJobExecutionException {
- return JobExecutionHelper.getPersistedJobOperatorJobExecution(executionId);
+ return JobExecutionHelper.getPersistedJobOperatorJobExecution(servicesManager.service(PersistenceManagerService.class), executionId);
}
@Override
@@ -139,7 +142,7 @@ public class DefaultBatchKernel implements BatchKernelService {
@Override
public JobInstance getJobInstance(final long executionId) {
- return JobExecutionHelper.getJobInstance(executionId);
+ return JobExecutionHelper.getJobInstance(servicesManager.service(JobStatusManagerService.class), executionId);
}
@@ -158,10 +161,10 @@ public class DefaultBatchKernel implements BatchKernelService {
int instance = 0;
for (final JSLJob parallelJob : jobModels) {
final Properties partitionProps = (partitionPropertiesArray == null) ? null : partitionPropertiesArray[instance];
- final RuntimeJobExecution jobExecution = JobExecutionHelper.startPartition(parallelJob, partitionProps);
+ final RuntimeJobExecution jobExecution = JobExecutionHelper.startPartition(servicesManager, parallelJob, partitionProps);
jobExecution.setPartitionInstance(instance);
- final BatchPartitionWorkUnit batchWork = new BatchPartitionWorkUnit(this, jobExecution, config);
+ final BatchPartitionWorkUnit batchWork = new BatchPartitionWorkUnit(jobExecution, config, servicesManager);
registerCurrentInstanceAndExecution(jobExecution, batchWork.getController());
@@ -190,13 +193,13 @@ public class DefaultBatchKernel implements BatchKernelService {
final long execId = getMostRecentExecutionId(parallelJob);
final RuntimeJobExecution jobExecution;
try {
- jobExecution = JobExecutionHelper.restartPartition(execId, parallelJob, partitionProps);
+ jobExecution = JobExecutionHelper.restartPartition(servicesManager, execId, parallelJob, partitionProps);
jobExecution.setPartitionInstance(instance);
} catch (final NoSuchJobExecutionException e) {
throw new IllegalStateException("Caught NoSuchJobExecutionException but this is an internal JobExecution so this shouldn't have happened: execId =" + execId, e);
}
- final BatchPartitionWorkUnit batchWork = new BatchPartitionWorkUnit(this, jobExecution, config);
+ final BatchPartitionWorkUnit batchWork = new BatchPartitionWorkUnit(jobExecution, config, servicesManager);
registerCurrentInstanceAndExecution(jobExecution, batchWork.getController());
batchWorkUnits.add(batchWork);
@@ -219,8 +222,8 @@ public class DefaultBatchKernel implements BatchKernelService {
public BatchFlowInSplitWorkUnit buildNewFlowInSplitWorkUnit(final FlowInSplitBuilderConfig config) {
final JSLJob parallelJob = config.getJobModel();
- final RuntimeFlowInSplitExecution execution = JobExecutionHelper.startFlowInSplit(parallelJob);
- final BatchFlowInSplitWorkUnit batchWork = new BatchFlowInSplitWorkUnit(this, execution, config);
+ final RuntimeFlowInSplitExecution execution = JobExecutionHelper.startFlowInSplit(servicesManager, parallelJob);
+ final BatchFlowInSplitWorkUnit batchWork = new BatchFlowInSplitWorkUnit(execution, config, servicesManager);
registerCurrentInstanceAndExecution(execution, batchWork.getController());
return batchWork;
@@ -256,12 +259,12 @@ public class DefaultBatchKernel implements BatchKernelService {
final long execId = getMostRecentExecutionId(jobModel);
final RuntimeFlowInSplitExecution jobExecution;
try {
- jobExecution = JobExecutionHelper.restartFlowInSplit(execId, jobModel);
+ jobExecution = JobExecutionHelper.restartFlowInSplit(servicesManager, execId, jobModel);
} catch (final NoSuchJobExecutionException e) {
throw new IllegalStateException("Caught NoSuchJobExecutionException but this is an internal JobExecution so this shouldn't have happened: execId =" + execId, e);
}
- final BatchFlowInSplitWorkUnit batchWork = new BatchFlowInSplitWorkUnit(this, jobExecution, config);
+ final BatchFlowInSplitWorkUnit batchWork = new BatchFlowInSplitWorkUnit(jobExecution, config, servicesManager);
registerCurrentInstanceAndExecution(jobExecution, batchWork.getController());
return batchWork;
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/8ddf9f75/jbatch/src/main/java/org/apache/batchee/container/services/locator/ClassLoaderLocator.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/services/locator/ClassLoaderLocator.java b/jbatch/src/main/java/org/apache/batchee/container/services/locator/ClassLoaderLocator.java
index dbea45f..3255ff9 100644
--- a/jbatch/src/main/java/org/apache/batchee/container/services/locator/ClassLoaderLocator.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/services/locator/ClassLoaderLocator.java
@@ -1,13 +1,12 @@
/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
*
- * Copyright 2012,2013 International Business Machines Corp.
- *
- * See the NOTICE file distributed with this work for additional information
- * regarding copyright ownership. Licensed under the Apache License,
- * Version 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/8ddf9f75/jbatch/src/main/java/org/apache/batchee/container/services/persistence/JDBCPersistenceManager.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/services/persistence/JDBCPersistenceManager.java b/jbatch/src/main/java/org/apache/batchee/container/services/persistence/JDBCPersistenceManager.java
index b99e881..056b05e 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/services/persistence/JDBCPersistenceManager.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/services/persistence/JDBCPersistenceManager.java
@@ -934,7 +934,7 @@ public class JDBCPersistenceManager implements PersistenceManagerService {
final String jobName = rs.getString(dictionary.jobInstanceColumns(3));
- final JobExecutionImpl jobEx = new JobExecutionImpl(jobExecutionId, instanceId);
+ final JobExecutionImpl jobEx = new JobExecutionImpl(jobExecutionId, instanceId, this);
jobEx.setCreateTime(createtime);
jobEx.setStartTime(starttime);
jobEx.setEndTime(endtime);
@@ -979,7 +979,7 @@ public class JDBCPersistenceManager implements PersistenceManagerService {
final String exitStatus = rs.getString(dictionary.jobExecutionColumns(4));
final String jobName = rs.getString(dictionary.jobInstanceColumns(3));
- final JobExecutionImpl jobEx = new JobExecutionImpl(jobExecutionId, jobInstanceId);
+ final JobExecutionImpl jobEx = new JobExecutionImpl(jobExecutionId, jobInstanceId, this);
jobEx.setCreateTime(createtime);
jobEx.setStartTime(starttime);
jobEx.setEndTime(endtime);
@@ -1154,7 +1154,7 @@ public class JDBCPersistenceManager implements PersistenceManagerService {
public RuntimeJobExecution createJobExecution(final JobInstance jobInstance, final Properties jobParameters, final BatchStatus batchStatus) {
final Timestamp now = new Timestamp(System.currentTimeMillis());
final long newExecutionId = createRuntimeJobExecutionEntry(jobInstance, jobParameters, batchStatus, now);
- final RuntimeJobExecution jobExecution = new RuntimeJobExecution(jobInstance, newExecutionId);
+ final RuntimeJobExecution jobExecution = new RuntimeJobExecution(jobInstance, newExecutionId, this);
jobExecution.setBatchStatus(batchStatus.name());
jobExecution.setCreateTime(now);
jobExecution.setLastUpdateTime(now);
@@ -1195,7 +1195,7 @@ public class JDBCPersistenceManager implements PersistenceManagerService {
public RuntimeFlowInSplitExecution createFlowInSplitExecution(final JobInstance jobInstance, final BatchStatus batchStatus) {
final Timestamp now = new Timestamp(System.currentTimeMillis());
final long newExecutionId = createRuntimeJobExecutionEntry(jobInstance, null, batchStatus, now);
- final RuntimeFlowInSplitExecution flowExecution = new RuntimeFlowInSplitExecution(jobInstance, newExecutionId);
+ final RuntimeFlowInSplitExecution flowExecution = new RuntimeFlowInSplitExecution(jobInstance, newExecutionId, this);
flowExecution.setBatchStatus(batchStatus.name());
flowExecution.setCreateTime(now);
flowExecution.setLastUpdateTime(now);
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/8ddf9f75/jbatch/src/main/java/org/apache/batchee/container/services/persistence/JPAPersistenceService.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/services/persistence/JPAPersistenceService.java b/jbatch/src/main/java/org/apache/batchee/container/services/persistence/JPAPersistenceService.java
index 8c0f6f0..5d8a4fe 100644
--- a/jbatch/src/main/java/org/apache/batchee/container/services/persistence/JPAPersistenceService.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/services/persistence/JPAPersistenceService.java
@@ -344,7 +344,7 @@ public class JPAPersistenceService implements PersistenceManagerService {
final List<InternalJobExecution> result = new ArrayList<InternalJobExecution>(list.size());
for (final JobExecutionEntity entity : list) {
- final JobExecutionImpl jobEx = new JobExecutionImpl(entity.getExecutionId(), jobInstanceId);
+ final JobExecutionImpl jobEx = new JobExecutionImpl(entity.getExecutionId(), jobInstanceId, this);
jobEx.setCreateTime(entity.getCreateTime());
jobEx.setStartTime(entity.getStartTime());
jobEx.setEndTime(entity.getEndTime());
@@ -390,7 +390,7 @@ public class JPAPersistenceService implements PersistenceManagerService {
try {
final JobExecutionEntity instance = em.find(JobExecutionEntity.class, jobExecutionId);
- final JobExecutionImpl jobEx = new JobExecutionImpl(jobExecutionId, instance.getInstance().getJobInstanceId());
+ final JobExecutionImpl jobEx = new JobExecutionImpl(jobExecutionId, instance.getInstance().getJobInstanceId(), this);
jobEx.setCreateTime(instance.getCreateTime());
jobEx.setStartTime(instance.getStartTime());
jobEx.setEndTime(instance.getEndTime());
@@ -496,7 +496,7 @@ public class JPAPersistenceService implements PersistenceManagerService {
em.persist(instance);
txProvider.commit(tx);
- final RuntimeFlowInSplitExecution jobExecution = new RuntimeFlowInSplitExecution(jobInstance, instance.getExecutionId());
+ final RuntimeFlowInSplitExecution jobExecution = new RuntimeFlowInSplitExecution(jobInstance, instance.getExecutionId(), this);
jobExecution.setBatchStatus(batchStatus.name());
jobExecution.setCreateTime(instance.getCreateTime());
jobExecution.setLastUpdateTime(instance.getCreateTime());
@@ -561,7 +561,7 @@ public class JPAPersistenceService implements PersistenceManagerService {
em.persist(execution);
txProvider.commit(tx);
- final RuntimeJobExecution jobExecution = new RuntimeJobExecution(jobInstance, execution.getExecutionId());
+ final RuntimeJobExecution jobExecution = new RuntimeJobExecution(jobInstance, execution.getExecutionId(), this);
jobExecution.setBatchStatus(batchStatus.name());
jobExecution.setCreateTime(execution.getCreateTime());
jobExecution.setLastUpdateTime(execution.getCreateTime());
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/8ddf9f75/jbatch/src/main/java/org/apache/batchee/container/services/persistence/MemoryPersistenceManager.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/services/persistence/MemoryPersistenceManager.java b/jbatch/src/main/java/org/apache/batchee/container/services/persistence/MemoryPersistenceManager.java
index 48e9ee5..0fbc623 100644
--- a/jbatch/src/main/java/org/apache/batchee/container/services/persistence/MemoryPersistenceManager.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/services/persistence/MemoryPersistenceManager.java
@@ -51,7 +51,6 @@ import java.util.Properties;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicLong;
@@ -324,7 +323,7 @@ public class MemoryPersistenceManager implements PersistenceManagerService {
final Structures.ExecutionInstanceData executionInstanceData = createRuntimeJobExecutionEntry(jobInstance, jobParameters, batchStatus, now);
executionInstanceData.execution.setJobName(jobInstance.getJobName());
- final RuntimeJobExecution jobExecution = new RuntimeJobExecution(jobInstance, executionInstanceData.execution.getExecutionId());
+ final RuntimeJobExecution jobExecution = new RuntimeJobExecution(jobInstance, executionInstanceData.execution.getExecutionId(), this);
jobExecution.setBatchStatus(batchStatus.name());
jobExecution.setCreateTime(now);
jobExecution.setLastUpdateTime(now);
@@ -335,7 +334,7 @@ public class MemoryPersistenceManager implements PersistenceManagerService {
private Structures.ExecutionInstanceData createRuntimeJobExecutionEntry(final JobInstance jobInstance, final Properties jobParameters, final BatchStatus batchStatus, final Timestamp now) {
final Structures.ExecutionInstanceData executionInstanceData = new Structures.ExecutionInstanceData();
final long id = data.executionInstanceIdGenerator.getAndIncrement();
- executionInstanceData.execution = new JobExecutionImpl(id, jobInstance.getInstanceId());
+ executionInstanceData.execution = new JobExecutionImpl(id, jobInstance.getInstanceId(), this);
executionInstanceData.execution.setExecutionId(id);
executionInstanceData.execution.setInstanceId(jobInstance.getInstanceId());
executionInstanceData.execution.setBatchStatus(batchStatus.name());
@@ -608,7 +607,7 @@ public class MemoryPersistenceManager implements PersistenceManagerService {
public RuntimeFlowInSplitExecution createFlowInSplitExecution(final JobInstance jobInstance, final BatchStatus batchStatus) {
final Timestamp now = new Timestamp(System.currentTimeMillis());
final Structures.ExecutionInstanceData executionInstanceData = createRuntimeJobExecutionEntry(jobInstance, null, batchStatus, now);
- final RuntimeFlowInSplitExecution flowExecution = new RuntimeFlowInSplitExecution(jobInstance, executionInstanceData.execution.getExecutionId());
+ final RuntimeFlowInSplitExecution flowExecution = new RuntimeFlowInSplitExecution(jobInstance, executionInstanceData.execution.getExecutionId(), this);
flowExecution.setBatchStatus(batchStatus.name());
flowExecution.setCreateTime(now);
flowExecution.setLastUpdateTime(now);
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/8ddf9f75/jbatch/src/main/java/org/apache/batchee/container/services/status/DefaultJobStatusManager.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/services/status/DefaultJobStatusManager.java b/jbatch/src/main/java/org/apache/batchee/container/services/status/DefaultJobStatusManager.java
index f7a71b2..c0e15e6 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/services/status/DefaultJobStatusManager.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/services/status/DefaultJobStatusManager.java
@@ -18,10 +18,10 @@ package org.apache.batchee.container.services.status;
import org.apache.batchee.container.exception.BatchContainerServiceException;
import org.apache.batchee.container.services.JobStatusManagerService;
-import org.apache.batchee.spi.PersistenceManagerService;
import org.apache.batchee.container.services.ServicesManager;
import org.apache.batchee.container.status.JobStatus;
import org.apache.batchee.container.status.StepStatus;
+import org.apache.batchee.spi.PersistenceManagerService;
import javax.batch.runtime.BatchStatus;
import java.util.Properties;
@@ -29,6 +29,10 @@ import java.util.Properties;
public class DefaultJobStatusManager implements JobStatusManagerService {
private PersistenceManagerService persistenceManager;
+ public DefaultJobStatusManager(final ServicesManager servicesManager) {
+ this.persistenceManager = servicesManager.service(PersistenceManagerService.class);
+ }
+
@Override
public JobStatus createJobStatus(long jobInstanceId) throws BatchContainerServiceException {
return persistenceManager.createJobStatus(jobInstanceId);
@@ -117,7 +121,7 @@ public class DefaultJobStatusManager implements JobStatusManagerService {
@Override
public void init(final Properties batchConfig) throws BatchContainerServiceException {
- persistenceManager = ServicesManager.service(PersistenceManagerService.class);
+ // no-op
}
/*
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/8ddf9f75/jbatch/src/main/java/org/apache/batchee/container/util/BatchFlowInSplitWorkUnit.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/util/BatchFlowInSplitWorkUnit.java b/jbatch/src/main/java/org/apache/batchee/container/util/BatchFlowInSplitWorkUnit.java
index 2f2bd7c..6f8fb90 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/util/BatchFlowInSplitWorkUnit.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/util/BatchFlowInSplitWorkUnit.java
@@ -19,18 +19,19 @@ package org.apache.batchee.container.util;
import org.apache.batchee.container.impl.controller.FlowInSplitThreadRootController;
import org.apache.batchee.container.impl.jobinstance.RuntimeFlowInSplitExecution;
import org.apache.batchee.container.services.BatchKernelService;
+import org.apache.batchee.container.services.ServicesManager;
import java.util.concurrent.BlockingQueue;
public class BatchFlowInSplitWorkUnit extends BatchParallelWorkUnit {
protected final BlockingQueue<BatchFlowInSplitWorkUnit> completedThreadQueue;
- public BatchFlowInSplitWorkUnit(final BatchKernelService batchKernelService,
- final RuntimeFlowInSplitExecution jobExecution,
- final FlowInSplitBuilderConfig config) {
- super(batchKernelService, jobExecution, true);
+ public BatchFlowInSplitWorkUnit(final RuntimeFlowInSplitExecution jobExecution,
+ final FlowInSplitBuilderConfig config,
+ final ServicesManager manager) {
+ super(jobExecution, true, manager);
this.completedThreadQueue = config.getCompletedQueue();
- this.controller = new FlowInSplitThreadRootController(jobExecution, config);
+ this.controller = new FlowInSplitThreadRootController(jobExecution, config, manager);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/8ddf9f75/jbatch/src/main/java/org/apache/batchee/container/util/BatchParallelWorkUnit.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/util/BatchParallelWorkUnit.java b/jbatch/src/main/java/org/apache/batchee/container/util/BatchParallelWorkUnit.java
index f25a4c7..2aeb1db 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/util/BatchParallelWorkUnit.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/util/BatchParallelWorkUnit.java
@@ -18,6 +18,7 @@ package org.apache.batchee.container.util;
import org.apache.batchee.container.impl.jobinstance.RuntimeJobExecution;
import org.apache.batchee.container.services.BatchKernelService;
+import org.apache.batchee.container.services.ServicesManager;
/*
* I took out the 'work type' constant since I don't see that we want to use
@@ -26,7 +27,8 @@ import org.apache.batchee.container.services.BatchKernelService;
* perspective, as it returns a 'success' boolean.
*/
public abstract class BatchParallelWorkUnit extends BatchWorkUnit {
- public BatchParallelWorkUnit(final BatchKernelService batchKernel, final RuntimeJobExecution jobExecutionImpl, final boolean notifyCallbackWhenDone) {
- super(batchKernel, jobExecutionImpl, notifyCallbackWhenDone);
+ public BatchParallelWorkUnit(final RuntimeJobExecution jobExecutionImpl,
+ final boolean notifyCallbackWhenDone, final ServicesManager manager) {
+ super(manager, jobExecutionImpl, notifyCallbackWhenDone);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/8ddf9f75/jbatch/src/main/java/org/apache/batchee/container/util/BatchPartitionWorkUnit.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/util/BatchPartitionWorkUnit.java b/jbatch/src/main/java/org/apache/batchee/container/util/BatchPartitionWorkUnit.java
index f139c08..cde223f 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/util/BatchPartitionWorkUnit.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/util/BatchPartitionWorkUnit.java
@@ -19,18 +19,19 @@ package org.apache.batchee.container.util;
import org.apache.batchee.container.impl.controller.PartitionThreadRootController;
import org.apache.batchee.container.impl.jobinstance.RuntimeJobExecution;
import org.apache.batchee.container.services.BatchKernelService;
+import org.apache.batchee.container.services.ServicesManager;
import java.util.concurrent.BlockingQueue;
public class BatchPartitionWorkUnit extends BatchParallelWorkUnit {
protected final BlockingQueue<BatchPartitionWorkUnit> completedThreadQueue;
- public BatchPartitionWorkUnit(BatchKernelService batchKernelService,
- RuntimeJobExecution jobExecution,
- PartitionsBuilderConfig config) {
- super(batchKernelService, jobExecution, true);
+ public BatchPartitionWorkUnit(final RuntimeJobExecution jobExecution,
+ final PartitionsBuilderConfig config,
+ final ServicesManager manager) {
+ super(jobExecution, true, manager);
this.completedThreadQueue = config.getCompletedQueue();
- this.controller = new PartitionThreadRootController(jobExecution, config);
+ this.controller = new PartitionThreadRootController(jobExecution, config, manager);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/8ddf9f75/jbatch/src/main/java/org/apache/batchee/container/util/BatchWorkUnit.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/util/BatchWorkUnit.java b/jbatch/src/main/java/org/apache/batchee/container/util/BatchWorkUnit.java
index 3ddf59b..096e78f 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/util/BatchWorkUnit.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/util/BatchWorkUnit.java
@@ -21,6 +21,7 @@ import org.apache.batchee.container.exception.BatchContainerRuntimeException;
import org.apache.batchee.container.impl.controller.JobController;
import org.apache.batchee.container.impl.jobinstance.RuntimeJobExecution;
import org.apache.batchee.container.services.BatchKernelService;
+import org.apache.batchee.container.services.ServicesManager;
import javax.batch.runtime.BatchStatus;
import java.io.PrintWriter;
@@ -39,16 +40,16 @@ public class BatchWorkUnit implements Runnable {
protected boolean notifyCallbackWhenDone;
- public BatchWorkUnit(final BatchKernelService batchKernel, final RuntimeJobExecution jobExecutionImpl) {
- this(batchKernel, jobExecutionImpl, true);
+ public BatchWorkUnit(final ServicesManager manager, final RuntimeJobExecution jobExecutionImpl) {
+ this(manager, jobExecutionImpl, true);
}
- public BatchWorkUnit(final BatchKernelService batchKernel, final RuntimeJobExecution jobExecutionImpl,
+ public BatchWorkUnit(final ServicesManager manager, final RuntimeJobExecution jobExecutionImpl,
final boolean notifyCallbackWhenDone) {
- this.setBatchKernel(batchKernel);
+ this.setBatchKernel(manager.service(BatchKernelService.class));
this.setJobExecutionImpl(jobExecutionImpl);
this.setNotifyCallbackWhenDone(notifyCallbackWhenDone);
- this.controller = new JobController(jobExecutionImpl);
+ this.controller = new JobController(jobExecutionImpl, manager);
}
public ThreadRootController getController() {
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/8ddf9f75/jbatch/src/main/java/org/apache/batchee/servlet/CleanUpWebappListener.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/servlet/CleanUpWebappListener.java b/jbatch/src/main/java/org/apache/batchee/servlet/CleanUpWebappListener.java
index 75baa24..9beef4c 100644
--- a/jbatch/src/main/java/org/apache/batchee/servlet/CleanUpWebappListener.java
+++ b/jbatch/src/main/java/org/apache/batchee/servlet/CleanUpWebappListener.java
@@ -36,7 +36,7 @@ public class CleanUpWebappListener implements ServletContextListener {
@Override
public void contextDestroyed(final ServletContextEvent sce) {
- final BatchThreadPoolService threadPoolService = ServicesManager.service(BatchThreadPoolService.class);
+ final BatchThreadPoolService threadPoolService = ServicesManager.find().service(BatchThreadPoolService.class);
if (CleanUpWebappListener.class.getClassLoader() == sce.getServletContext().getClassLoader()) {
threadPoolService.shutdown();
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/8ddf9f75/jbatch/src/test/java/org/apache/batchee/test/jmx/JMXTest.java
----------------------------------------------------------------------
diff --git a/jbatch/src/test/java/org/apache/batchee/test/jmx/JMXTest.java b/jbatch/src/test/java/org/apache/batchee/test/jmx/JMXTest.java
index 0c454fd..39bcfb6 100644
--- a/jbatch/src/test/java/org/apache/batchee/test/jmx/JMXTest.java
+++ b/jbatch/src/test/java/org/apache/batchee/test/jmx/JMXTest.java
@@ -56,7 +56,7 @@ public class JMXTest {
}
private static void clearPersistence(final JobOperator jobOperator) {
- final PersistenceManagerService service = ServicesManager.service(PersistenceManagerService.class);
+ final PersistenceManagerService service = ServicesManager.find().service(PersistenceManagerService.class);
for (final String name : jobOperator.getJobNames()) {
for (final JobInstance id : jobOperator.getJobInstances(name, 0, Integer.MAX_VALUE)) {
service.cleanUp(id.getInstanceId());
@@ -66,7 +66,7 @@ public class JMXTest {
@AfterClass
public static void deleteJob() throws Exception {
- ServicesManager.service(PersistenceManagerService.class).cleanUp(id);
+ ServicesManager.find().service(PersistenceManagerService.class).cleanUp(id);
}
private static Object attr(final String name) throws Exception {
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/8ddf9f75/jbatch/src/test/resources/suites/dev-suite.xml
----------------------------------------------------------------------
diff --git a/jbatch/src/test/resources/suites/dev-suite.xml b/jbatch/src/test/resources/suites/dev-suite.xml
index f497102..4c30caa 100644
--- a/jbatch/src/test/resources/suites/dev-suite.xml
+++ b/jbatch/src/test/resources/suites/dev-suite.xml
@@ -27,11 +27,7 @@
<classes>
<class name="com.ibm.jbatch.tck.tests.jslxml.ChunkTests">
<methods>
- <include name="testChunkSkipReadNoSkipChildEx"/>
- <!--
- <include name="testChunkRetryMultipleExceptions"/>
- <include name="testChunkSkipMultipleExceptions"/>
- -->
+ <include name="testChunkOnErrorListener"/>
</methods>
</class>
</classes>
[2/2] git commit: BATCHEE-9 ServicesManager shouldn't be contextual
once an action started
Posted by rm...@apache.org.
BATCHEE-9 ServicesManager shouldn't be contextual once an action started
Project: http://git-wip-us.apache.org/repos/asf/incubator-batchee/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-batchee/commit/8ddf9f75
Tree: http://git-wip-us.apache.org/repos/asf/incubator-batchee/tree/8ddf9f75
Diff: http://git-wip-us.apache.org/repos/asf/incubator-batchee/diff/8ddf9f75
Branch: refs/heads/master
Commit: 8ddf9f754b731efe18ec48aedcc3cb0aa98814d3
Parents: b303e37
Author: Romain Manni-Bucau <rm...@apache.org>
Authored: Thu Jan 2 10:27:34 2014 +0100
Committer: Romain Manni-Bucau <rm...@apache.org>
Committed: Thu Jan 2 10:27:34 2014 +0100
----------------------------------------------------------------------
.../container/impl/JobExecutionImpl.java | 18 ++--
.../batchee/container/impl/JobOperatorImpl.java | 93 +++++++++++---------
.../impl/controller/BaseStepController.java | 42 +++++----
.../impl/controller/DecisionController.java | 11 ++-
.../ExecutionElementControllerFactory.java | 25 +++---
.../impl/controller/ExecutionTransitioner.java | 20 +++--
.../impl/controller/FlowController.java | 7 +-
.../FlowInSplitThreadRootController.java | 5 +-
.../impl/controller/JobController.java | 9 +-
.../controller/JobThreadRootController.java | 21 +++--
.../PartitionThreadRootController.java | 5 +-
.../controller/PartitionedStepController.java | 30 ++++---
.../SingleThreadedStepController.java | 14 ++-
.../impl/controller/SplitController.java | 6 +-
.../batchlet/BatchletStepController.java | 11 ++-
.../chunk/CheckpointAlgorithmFactory.java | 5 +-
.../controller/chunk/CheckpointManager.java | 5 +-
.../controller/chunk/ChunkStepController.java | 45 +++++-----
.../impl/jobinstance/JobExecutionHelper.java | 92 ++++++++++---------
.../RuntimeFlowInSplitExecution.java | 6 +-
.../impl/jobinstance/RuntimeJobExecution.java | 5 +-
.../container/proxy/ListenerFactory.java | 54 ++++++------
.../batchee/container/proxy/ProxyFactory.java | 46 +++++-----
.../container/services/ServicesManager.java | 38 +++-----
.../services/kernel/DefaultBatchKernel.java | 37 ++++----
.../services/locator/ClassLoaderLocator.java | 15 ++--
.../persistence/JDBCPersistenceManager.java | 8 +-
.../persistence/JPAPersistenceService.java | 8 +-
.../persistence/MemoryPersistenceManager.java | 7 +-
.../status/DefaultJobStatusManager.java | 8 +-
.../util/BatchFlowInSplitWorkUnit.java | 11 +--
.../container/util/BatchParallelWorkUnit.java | 6 +-
.../container/util/BatchPartitionWorkUnit.java | 11 +--
.../batchee/container/util/BatchWorkUnit.java | 11 +--
.../batchee/servlet/CleanUpWebappListener.java | 2 +-
.../org/apache/batchee/test/jmx/JMXTest.java | 4 +-
jbatch/src/test/resources/suites/dev-suite.xml | 6 +-
37 files changed, 405 insertions(+), 342 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/8ddf9f75/jbatch/src/main/java/org/apache/batchee/container/impl/JobExecutionImpl.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/JobExecutionImpl.java b/jbatch/src/main/java/org/apache/batchee/container/impl/JobExecutionImpl.java
index 0767168..68ecb5e 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/impl/JobExecutionImpl.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/JobExecutionImpl.java
@@ -19,7 +19,6 @@ package org.apache.batchee.container.impl;
import org.apache.batchee.container.services.InternalJobExecution;
import org.apache.batchee.spi.PersistenceManagerService;
import org.apache.batchee.spi.PersistenceManagerService.TimestampType;
-import org.apache.batchee.container.services.ServicesManager;
import javax.batch.runtime.BatchStatus;
import java.sql.Timestamp;
@@ -27,7 +26,7 @@ import java.util.Date;
import java.util.Properties;
public class JobExecutionImpl implements InternalJobExecution {
- private static final PersistenceManagerService PERSISTENCE_MANAGER_SERVICE = ServicesManager.service(PersistenceManagerService.class);
+ private final PersistenceManagerService persistenceManagerService;
private long executionID = 0L;
private long instanceID = 0L;
@@ -50,7 +49,8 @@ public class JobExecutionImpl implements InternalJobExecution {
this.jobContext = jobContext;
}
- public JobExecutionImpl(long executionId, long instanceId) {
+ public JobExecutionImpl(final long executionId, final long instanceId, final PersistenceManagerService persistenceManagerService) {
+ this.persistenceManagerService = persistenceManagerService;
this.executionID = executionId;
this.instanceID = instanceId;
}
@@ -61,7 +61,7 @@ public class JobExecutionImpl implements InternalJobExecution {
return this.jobContext.getBatchStatus();
} else {
// old job, retrieve from the backend
- final String name = PERSISTENCE_MANAGER_SERVICE.jobOperatorQueryJobExecutionBatchStatus(executionID);
+ final String name = persistenceManagerService.jobOperatorQueryJobExecutionBatchStatus(executionID);
if (name != null) {
return BatchStatus.valueOf(name);
}
@@ -71,7 +71,7 @@ public class JobExecutionImpl implements InternalJobExecution {
@Override
public Date getCreateTime() {
- final Timestamp ts = PERSISTENCE_MANAGER_SERVICE.jobOperatorQueryJobExecutionTimestamp(executionID, TimestampType.CREATE);
+ final Timestamp ts = persistenceManagerService.jobOperatorQueryJobExecutionTimestamp(executionID, TimestampType.CREATE);
if (ts != null) {
createTime = ts;
}
@@ -84,7 +84,7 @@ public class JobExecutionImpl implements InternalJobExecution {
@Override
public Date getEndTime() {
- final Timestamp ts = PERSISTENCE_MANAGER_SERVICE.jobOperatorQueryJobExecutionTimestamp(executionID, TimestampType.END);
+ final Timestamp ts = persistenceManagerService.jobOperatorQueryJobExecutionTimestamp(executionID, TimestampType.END);
if (ts != null) {
endTime = ts;
}
@@ -106,7 +106,7 @@ public class JobExecutionImpl implements InternalJobExecution {
return this.jobContext.getExitStatus();
}
- final String persistenceExitStatus = PERSISTENCE_MANAGER_SERVICE.jobOperatorQueryJobExecutionExitStatus(executionID);
+ final String persistenceExitStatus = persistenceManagerService.jobOperatorQueryJobExecutionExitStatus(executionID);
if (persistenceExitStatus != null) {
exitStatus = persistenceExitStatus;
}
@@ -116,7 +116,7 @@ public class JobExecutionImpl implements InternalJobExecution {
@Override
public Date getLastUpdatedTime() {
- final Timestamp ts = PERSISTENCE_MANAGER_SERVICE.jobOperatorQueryJobExecutionTimestamp(executionID, TimestampType.LAST_UPDATED);
+ final Timestamp ts = persistenceManagerService.jobOperatorQueryJobExecutionTimestamp(executionID, TimestampType.LAST_UPDATED);
if (ts != null) {
this.updateTime = ts;
}
@@ -129,7 +129,7 @@ public class JobExecutionImpl implements InternalJobExecution {
@Override
public Date getStartTime() {
- final Timestamp ts = PERSISTENCE_MANAGER_SERVICE.jobOperatorQueryJobExecutionTimestamp(executionID, TimestampType.STARTED);
+ final Timestamp ts = persistenceManagerService.jobOperatorQueryJobExecutionTimestamp(executionID, TimestampType.STARTED);
if (ts != null) {
startTime = ts;
}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/8ddf9f75/jbatch/src/main/java/org/apache/batchee/container/impl/JobOperatorImpl.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/JobOperatorImpl.java b/jbatch/src/main/java/org/apache/batchee/container/impl/JobOperatorImpl.java
index c970534..28fab55 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/impl/JobOperatorImpl.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/JobOperatorImpl.java
@@ -101,15 +101,24 @@ public class JobOperatorImpl implements JobOperator {
public static final String JBATCH_ADMIN = "admin";
- private static final BatchKernelService KERNEL_SERVICE = ServicesManager.service(BatchKernelService.class);
- private static final PersistenceManagerService PERSISTENCE_SERVICE = ServicesManager.service(PersistenceManagerService.class);
- private static final JobXMLLoaderService XML_LOADER_SERVICE = ServicesManager.service(JobXMLLoaderService.class);
- private static final JobStatusManagerService STATUS_MANAGER_SERVICE = ServicesManager.service(JobStatusManagerService.class);
- private static final SecurityService SECURITY_SERVICE= ServicesManager.service(SecurityService.class);
+ private final BatchKernelService kernelService;
+ private final PersistenceManagerService persistenceManagerService;
+ private final JobXMLLoaderService xmlLoaderService;
+ private final JobStatusManagerService statusManagerService;
+ private final SecurityService securityService;
+
+ public JobOperatorImpl() {
+ final ServicesManager servicesManager = ServicesManager.find();
+ kernelService = servicesManager.service(BatchKernelService.class);
+ persistenceManagerService = servicesManager.service(PersistenceManagerService.class);
+ xmlLoaderService = servicesManager.service(JobXMLLoaderService.class);
+ statusManagerService = servicesManager.service(JobStatusManagerService.class);
+ securityService = servicesManager.service(SecurityService.class);
+ }
@Override
public long start(final String jobXMLName, final Properties jobParameters) throws JobStartException, JobSecurityException {
- if (!SECURITY_SERVICE.isAuthorized(Permissions.START.name)) {
+ if (!securityService.isAuthorized(Permissions.START.name)) {
throw new JobSecurityException("The current user is not authorized to perform this operation");
}
@@ -138,18 +147,18 @@ public class JobOperatorImpl implements JobOperator {
jobParameterWriter.write("Job parameters on start = null");
}
- final String jobXML = XML_LOADER_SERVICE.loadJSL(jobXMLName);
- final InternalJobExecution execution = KERNEL_SERVICE.startJob(jobXML, jobParameters);
+ final String jobXML = xmlLoaderService.loadJSL(jobXMLName);
+ final InternalJobExecution execution = kernelService.startJob(jobXML, jobParameters);
return execution.getExecutionId();
}
@Override
public void abandon(final long executionId) throws NoSuchJobExecutionException, JobExecutionIsRunningException, JobSecurityException {
- if (!SECURITY_SERVICE.isAuthorized(PERSISTENCE_SERVICE.getJobInstanceIdByExecutionId(executionId))) {
+ if (!securityService.isAuthorized(persistenceManagerService.getJobInstanceIdByExecutionId(executionId))) {
throw new JobSecurityException("The current user is not authorized to perform this operation");
}
- final InternalJobExecution jobEx = PERSISTENCE_SERVICE.jobOperatorGetJobExecution(executionId);
+ final InternalJobExecution jobEx = persistenceManagerService.jobOperatorGetJobExecution(executionId);
// if it is not in STARTED or STARTING state, mark it as ABANDONED
if (jobEx.getBatchStatus().equals(BatchStatus.STARTED) || jobEx.getBatchStatus().equals(BatchStatus.STARTING)) {
@@ -157,31 +166,31 @@ public class JobOperatorImpl implements JobOperator {
}
// update table to reflect ABANDONED state
- PERSISTENCE_SERVICE.updateBatchStatusOnly(jobEx.getExecutionId(), BatchStatus.ABANDONED, new Timestamp(System.currentTimeMillis()));
+ persistenceManagerService.updateBatchStatusOnly(jobEx.getExecutionId(), BatchStatus.ABANDONED, new Timestamp(System.currentTimeMillis()));
// Don't forget to update JOBSTATUS table
- STATUS_MANAGER_SERVICE.updateJobBatchStatus(jobEx.getInstanceId(), BatchStatus.ABANDONED);
+ statusManagerService.updateJobBatchStatus(jobEx.getInstanceId(), BatchStatus.ABANDONED);
}
@Override
public InternalJobExecution getJobExecution(final long executionId)
throws NoSuchJobExecutionException, JobSecurityException {
- if (!SECURITY_SERVICE.isAuthorized(PERSISTENCE_SERVICE.getJobInstanceIdByExecutionId(executionId))) {
+ if (!securityService.isAuthorized(persistenceManagerService.getJobInstanceIdByExecutionId(executionId))) {
throw new JobSecurityException("The current user is not authorized to perform this operation");
}
- return KERNEL_SERVICE.getJobExecution(executionId);
+ return kernelService.getJobExecution(executionId);
}
@Override
public List<JobExecution> getJobExecutions(final JobInstance instance)
throws NoSuchJobInstanceException, JobSecurityException {
- if (!SECURITY_SERVICE.isAuthorized(instance.getInstanceId())) {
+ if (!securityService.isAuthorized(instance.getInstanceId())) {
throw new JobSecurityException("The current user is not authorized to perform this operation");
}
// Mediate between one
final List<JobExecution> executions = new ArrayList<JobExecution>();
- List<InternalJobExecution> executionImpls = PERSISTENCE_SERVICE.jobOperatorGetJobExecutions(instance.getInstanceId());
+ List<InternalJobExecution> executionImpls = persistenceManagerService.jobOperatorGetJobExecutions(instance.getInstanceId());
if (executionImpls.size() == 0) {
throw new NoSuchJobInstanceException("Job: " + instance.getJobName() + " does not exist");
}
@@ -194,20 +203,20 @@ public class JobOperatorImpl implements JobOperator {
@Override
public JobInstance getJobInstance(long executionId)
throws NoSuchJobExecutionException, JobSecurityException {
- if (!SECURITY_SERVICE.isAuthorized(PERSISTENCE_SERVICE.getJobInstanceIdByExecutionId(executionId))) {
+ if (!securityService.isAuthorized(persistenceManagerService.getJobInstanceIdByExecutionId(executionId))) {
throw new JobSecurityException("The current user is not authorized to perform this operation");
}
- return KERNEL_SERVICE.getJobInstance(executionId);
+ return kernelService.getJobInstance(executionId);
}
@Override
public int getJobInstanceCount(String jobName) throws NoSuchJobException, JobSecurityException {
int jobInstanceCount;
- if (SECURITY_SERVICE.isAuthorized(JBATCH_ADMIN)) {
+ if (securityService.isAuthorized(JBATCH_ADMIN)) {
// Do an unfiltered query
- jobInstanceCount = PERSISTENCE_SERVICE.jobOperatorGetJobInstanceCount(jobName);
+ jobInstanceCount = persistenceManagerService.jobOperatorGetJobInstanceCount(jobName);
} else {
- jobInstanceCount = PERSISTENCE_SERVICE.jobOperatorGetJobInstanceCount(jobName, SECURITY_SERVICE.getLoggedUser());
+ jobInstanceCount = persistenceManagerService.jobOperatorGetJobInstanceCount(jobName, securityService.getLoggedUser());
}
if (jobInstanceCount > 0) {
@@ -229,11 +238,11 @@ public class JobOperatorImpl implements JobOperator {
}
final List<Long> instanceIds;
- if (SECURITY_SERVICE.isAuthorized(JBATCH_ADMIN)) {
+ if (securityService.isAuthorized(JBATCH_ADMIN)) {
// Do an unfiltered query
- instanceIds = PERSISTENCE_SERVICE.jobOperatorGetJobInstanceIds(jobName, start, count);
+ instanceIds = persistenceManagerService.jobOperatorGetJobInstanceIds(jobName, start, count);
} else {
- instanceIds = PERSISTENCE_SERVICE.jobOperatorGetJobInstanceIds(jobName, SECURITY_SERVICE.getLoggedUser(), start, count);
+ instanceIds = persistenceManagerService.jobOperatorGetJobInstanceIds(jobName, securityService.getLoggedUser(), start, count);
}
// get the jobinstance ids associated with this job name
@@ -242,9 +251,9 @@ public class JobOperatorImpl implements JobOperator {
// for every job instance id
for (long id : instanceIds) {
// get the job instance obj, add it to the list
- final JobStatus jobStatus = STATUS_MANAGER_SERVICE.getJobStatus(id);
+ final JobStatus jobStatus = statusManagerService.getJobStatus(id);
final JobInstance jobInstance = jobStatus.getJobInstance();
- if (SECURITY_SERVICE.isAuthorized(jobInstance.getInstanceId())) {
+ if (securityService.isAuthorized(jobInstance.getInstanceId())) {
jobInstances.add(jobInstance);
}
}
@@ -262,10 +271,10 @@ public class JobOperatorImpl implements JobOperator {
@Override
public Set<String> getJobNames() throws JobSecurityException {
final Set<String> jobNames = new HashSet<String>();
- final Map<Long, String> data = PERSISTENCE_SERVICE.jobOperatorGetExternalJobInstanceData();
+ final Map<Long, String> data = persistenceManagerService.jobOperatorGetExternalJobInstanceData();
for (final Map.Entry<Long, String> entry : data.entrySet()) {
long instanceId = entry.getKey();
- if (SECURITY_SERVICE.isAuthorized(instanceId)) {
+ if (securityService.isAuthorized(instanceId)) {
jobNames.add(entry.getValue());
}
}
@@ -274,11 +283,11 @@ public class JobOperatorImpl implements JobOperator {
@Override
public Properties getParameters(final long executionId) throws NoSuchJobExecutionException, JobSecurityException {
- final JobInstance requestedJobInstance = KERNEL_SERVICE.getJobInstance(executionId);
- if (!SECURITY_SERVICE.isAuthorized(requestedJobInstance.getInstanceId())) {
+ final JobInstance requestedJobInstance = kernelService.getJobInstance(executionId);
+ if (!securityService.isAuthorized(requestedJobInstance.getInstanceId())) {
throw new JobSecurityException("The current user is not authorized to perform this operation");
}
- return PERSISTENCE_SERVICE.getParameters(executionId);
+ return persistenceManagerService.getParameters(executionId);
}
@@ -287,7 +296,7 @@ public class JobOperatorImpl implements JobOperator {
final List<Long> jobExecutions = new ArrayList<Long>();
// get the jobexecution ids associated with this job name
- final Set<Long> executionIds = PERSISTENCE_SERVICE.jobOperatorGetRunningExecutions(jobName);
+ final Set<Long> executionIds = persistenceManagerService.jobOperatorGetRunningExecutions(jobName);
if (executionIds.size() <= 0) {
throw new NoSuchJobException("Job Name " + jobName + " not found");
@@ -296,9 +305,9 @@ public class JobOperatorImpl implements JobOperator {
// for every job instance id
for (final long id : executionIds) {
try {
- if (SECURITY_SERVICE.isAuthorized(PERSISTENCE_SERVICE.getJobInstanceIdByExecutionId(id))) {
- if (KERNEL_SERVICE.isExecutionRunning(id)) {
- final InternalJobExecution jobEx = KERNEL_SERVICE.getJobExecution(id);
+ if (securityService.isAuthorized(persistenceManagerService.getJobInstanceIdByExecutionId(id))) {
+ if (kernelService.isExecutionRunning(id)) {
+ final InternalJobExecution jobEx = kernelService.getJobExecution(id);
jobExecutions.add(jobEx.getExecutionId());
}
}
@@ -313,12 +322,12 @@ public class JobOperatorImpl implements JobOperator {
public List<StepExecution> getStepExecutions(long executionId)
throws NoSuchJobExecutionException, JobSecurityException {
- final InternalJobExecution jobEx = KERNEL_SERVICE.getJobExecution(executionId);
+ final InternalJobExecution jobEx = kernelService.getJobExecution(executionId);
if (jobEx == null) {
throw new NoSuchJobExecutionException("Job Execution: " + executionId + " not found");
}
- if (SECURITY_SERVICE.isAuthorized(PERSISTENCE_SERVICE.getJobInstanceIdByExecutionId(executionId))) {
- return PERSISTENCE_SERVICE.getStepExecutionsForJobExecution(executionId);
+ if (securityService.isAuthorized(persistenceManagerService.getJobInstanceIdByExecutionId(executionId))) {
+ return persistenceManagerService.getStepExecutionsForJobExecution(executionId);
}
throw new JobSecurityException("The current user is not authorized to perform this operation");
}
@@ -345,7 +354,7 @@ public class JobOperatorImpl implements JobOperator {
}
private long restartInternal(final long oldExecutionId, final Properties restartParameters) throws JobExecutionAlreadyCompleteException, NoSuchJobExecutionException, JobExecutionNotMostRecentException, JobRestartException, JobSecurityException {
- if (!SECURITY_SERVICE.isAuthorized(PERSISTENCE_SERVICE.getJobInstanceIdByExecutionId(oldExecutionId))) {
+ if (!securityService.isAuthorized(persistenceManagerService.getJobInstanceIdByExecutionId(oldExecutionId))) {
throw new JobSecurityException("The current user is not authorized to perform this operation");
}
@@ -360,15 +369,15 @@ public class JobOperatorImpl implements JobOperator {
jobParameterWriter.write("Job parameters on restart = null");
}
- return KERNEL_SERVICE.restartJob(oldExecutionId, restartParameters).getExecutionId();
+ return kernelService.restartJob(oldExecutionId, restartParameters).getExecutionId();
}
@Override
public void stop(final long executionId) throws NoSuchJobExecutionException, JobExecutionNotRunningException, JobSecurityException {
- if (!SECURITY_SERVICE.isAuthorized(PERSISTENCE_SERVICE.getJobInstanceIdByExecutionId(executionId))) {
+ if (!securityService.isAuthorized(persistenceManagerService.getJobInstanceIdByExecutionId(executionId))) {
throw new JobSecurityException("The current user is not authorized to perform this operation");
}
- KERNEL_SERVICE.stopJob(executionId);
+ kernelService.stopJob(executionId);
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/8ddf9f75/jbatch/src/main/java/org/apache/batchee/container/impl/controller/BaseStepController.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/BaseStepController.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/BaseStepController.java
index 044df60..63636eb 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/BaseStepController.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/BaseStepController.java
@@ -73,13 +73,15 @@ public abstract class BaseStepController implements ExecutionElementController {
protected long rootJobExecutionId;
- protected static final BatchKernelService BATCH_KERNEL = ServicesManager.service(BatchKernelService.class);
- private static final PersistenceManagerService PERSISTENCE_MANAGER_SERVICE = ServicesManager.service(PersistenceManagerService.class);
- private static final JobStatusManagerService JOB_STATUS_MANAGER_SERVICE = ServicesManager.service(JobStatusManagerService.class);
+ protected final BatchKernelService kernelService;
+ private final PersistenceManagerService persistenceManagerService;
+ private final JobStatusManagerService statusManagerService;
protected TransactionManagerAdapter transactionManager = null;
+ private TransactionManagementService txService;
- protected BaseStepController(final RuntimeJobExecution jobExecution, final Step step, final StepContextImpl stepContext, final long rootJobExecutionId) {
+ protected BaseStepController(final RuntimeJobExecution jobExecution, final Step step, final StepContextImpl stepContext, final long rootJobExecutionId,
+ final ServicesManager servicesManager) {
this.jobExecutionImpl = jobExecution;
this.jobInstance = jobExecution.getJobInstance();
this.stepContext = stepContext;
@@ -88,13 +90,19 @@ public abstract class BaseStepController implements ExecutionElementController {
throw new IllegalArgumentException("Step parameter to ctor cannot be null.");
}
this.step = step;
+
+ this.txService = servicesManager.service(TransactionManagementService.class);
+ this.kernelService = servicesManager.service(BatchKernelService.class);
+ this.persistenceManagerService = servicesManager.service(PersistenceManagerService.class);
+ this.statusManagerService = servicesManager.service(JobStatusManagerService.class);
}
protected BaseStepController(final RuntimeJobExecution jobExecution,
final Step step, final StepContextImpl stepContext,
final long rootJobExecutionId,
- final BlockingQueue<PartitionDataWrapper> analyzerStatusQueue) {
- this(jobExecution, step, stepContext, rootJobExecutionId);
+ final BlockingQueue<PartitionDataWrapper> analyzerStatusQueue,
+ final ServicesManager servicesManager) {
+ this(jobExecution, step, stepContext, rootJobExecutionId, servicesManager);
this.analyzerStatusQueue = analyzerStatusQueue;
}
@@ -239,7 +247,7 @@ public abstract class BaseStepController implements ExecutionElementController {
Timestamp startTS = new Timestamp(time);
stepContext.setStartTime(startTS);
- PERSISTENCE_MANAGER_SERVICE.updateStepExecution(rootJobExecutionId, stepContext);
+ persistenceManagerService.updateStepExecution(rootJobExecutionId, stepContext);
}
@@ -262,17 +270,17 @@ public abstract class BaseStepController implements ExecutionElementController {
protected void updateBatchStatus(final BatchStatus updatedBatchStatus) {
stepStatus.setBatchStatus(updatedBatchStatus);
- JOB_STATUS_MANAGER_SERVICE.updateStepStatus(stepStatus.getStepExecutionId(), stepStatus);
+ statusManagerService.updateStepStatus(stepStatus.getStepExecutionId(), stepStatus);
stepContext.setBatchStatus(updatedBatchStatus);
}
protected boolean shouldStepBeExecuted() {
- this.stepStatus = JOB_STATUS_MANAGER_SERVICE.getStepStatus(jobInstance.getInstanceId(), step.getId());
+ this.stepStatus = statusManagerService.getStepStatus(jobInstance.getInstanceId(), step.getId());
if (stepStatus == null) {
// create new step execution
final StepExecutionImpl stepExecution = getNewStepExecution(rootJobExecutionId, stepContext);
// create new step status for this run
- stepStatus = JOB_STATUS_MANAGER_SERVICE.createStepStatus(stepExecution.getStepExecutionId());
+ stepStatus = statusManagerService.createStepStatus(stepExecution.getStepExecutionId());
stepContext.setStepExecutionId(stepExecution.getStepExecutionId());
return true;
} else {
@@ -332,8 +340,8 @@ public abstract class BaseStepController implements ExecutionElementController {
protected void statusStarting() {
stepStatus.setBatchStatus(BatchStatus.STARTING);
- JOB_STATUS_MANAGER_SERVICE.updateJobCurrentStep(jobInstance.getInstanceId(), step.getId());
- JOB_STATUS_MANAGER_SERVICE.updateStepStatus(stepStatus.getStepExecutionId(), stepStatus);
+ statusManagerService.updateJobCurrentStep(jobInstance.getInstanceId(), step.getId());
+ statusManagerService.updateStepStatus(stepStatus.getStepExecutionId(), stepStatus);
stepContext.setBatchStatus(BatchStatus.STARTING);
}
@@ -350,23 +358,23 @@ public abstract class BaseStepController implements ExecutionElementController {
}
stepStatus.setPersistentUserData(new PersistentDataWrapper(persistentBAOS.toByteArray()));
- JOB_STATUS_MANAGER_SERVICE.updateStepStatus(stepStatus.getStepExecutionId(), stepStatus);
+ statusManagerService.updateStepStatus(stepStatus.getStepExecutionId(), stepStatus);
}
protected void persistExitStatusAndEndTimestamp() {
stepStatus.setExitStatus(stepContext.getExitStatus());
- JOB_STATUS_MANAGER_SERVICE.updateStepStatus(stepStatus.getStepExecutionId(), stepStatus);
+ statusManagerService.updateStepStatus(stepStatus.getStepExecutionId(), stepStatus);
// set the end time metric before flushing
long time = System.currentTimeMillis();
Timestamp endTS = new Timestamp(time);
stepContext.setEndTime(endTS);
- PERSISTENCE_MANAGER_SERVICE.updateStepExecution(rootJobExecutionId, stepContext);
+ persistenceManagerService.updateStepExecution(rootJobExecutionId, stepContext);
}
private StepExecutionImpl getNewStepExecution(long rootJobExecutionId, StepContextImpl stepContext) {
- return PERSISTENCE_MANAGER_SERVICE.createStepExecution(rootJobExecutionId, stepContext);
+ return persistenceManagerService.createStepExecution(rootJobExecutionId, stepContext);
}
private void setContextProperties() {
@@ -389,7 +397,7 @@ public abstract class BaseStepController implements ExecutionElementController {
stepContext.addMetric(MetricImpl.MetricType.COMMIT_COUNT, 0);
stepContext.addMetric(MetricImpl.MetricType.ROLLBACK_COUNT, 0);
- transactionManager = ServicesManager.service(TransactionManagementService.class).getTransactionManager(stepContext);
+ transactionManager = txService.getTransactionManager(stepContext);
}
public void setStepContext(final StepContextImpl stepContext) {
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/8ddf9f75/jbatch/src/main/java/org/apache/batchee/container/impl/controller/DecisionController.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/DecisionController.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/DecisionController.java
index 498b596..bbd6c93 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/DecisionController.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/DecisionController.java
@@ -23,12 +23,13 @@ import org.apache.batchee.container.jsl.ExecutionElement;
import org.apache.batchee.container.proxy.DeciderProxy;
import org.apache.batchee.container.proxy.InjectionReferences;
import org.apache.batchee.container.proxy.ProxyFactory;
-import org.apache.batchee.spi.PersistenceManagerService;
import org.apache.batchee.container.services.ServicesManager;
import org.apache.batchee.container.status.ExecutionStatus;
import org.apache.batchee.container.status.ExtendedBatchStatus;
import org.apache.batchee.jaxb.Decision;
import org.apache.batchee.jaxb.Property;
+import org.apache.batchee.spi.BatchArtifactFactory;
+import org.apache.batchee.spi.PersistenceManagerService;
import javax.batch.runtime.StepExecution;
import java.util.List;
@@ -41,11 +42,13 @@ public class DecisionController implements ExecutionElementController {
private StepExecution[] previousStepExecutions = null;
private final PersistenceManagerService persistenceService;
+ private final BatchArtifactFactory factory;
- public DecisionController(final RuntimeJobExecution jobExecution, final Decision decision) {
+ public DecisionController(final RuntimeJobExecution jobExecution, final Decision decision, final ServicesManager manager) {
this.jobExecution = jobExecution;
this.decision = decision;
- this.persistenceService = ServicesManager.service(PersistenceManagerService.class);
+ this.persistenceService = manager.service(PersistenceManagerService.class);
+ this.factory = manager.service(BatchArtifactFactory.class);
}
@Override
@@ -61,7 +64,7 @@ public class DecisionController implements ExecutionElementController {
//so two of these contexts will always be null
final InjectionReferences injectionRef = new InjectionReferences(jobExecution.getJobContext(), null, propList);
- final DeciderProxy deciderProxy = ProxyFactory.createDeciderProxy(deciderId, injectionRef, jobExecution);
+ final DeciderProxy deciderProxy = ProxyFactory.createDeciderProxy(factory, deciderId, injectionRef, jobExecution);
final String exitStatus = deciderProxy.decide(this.previousStepExecutions);
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/8ddf9f75/jbatch/src/main/java/org/apache/batchee/container/impl/controller/ExecutionElementControllerFactory.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/ExecutionElementControllerFactory.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/ExecutionElementControllerFactory.java
index 631ea3a..1492875 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/ExecutionElementControllerFactory.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/ExecutionElementControllerFactory.java
@@ -20,6 +20,8 @@ import org.apache.batchee.container.impl.StepContextImpl;
import org.apache.batchee.container.impl.controller.batchlet.BatchletStepController;
import org.apache.batchee.container.impl.controller.chunk.ChunkStepController;
import org.apache.batchee.container.impl.jobinstance.RuntimeJobExecution;
+import org.apache.batchee.container.services.BatchKernelService;
+import org.apache.batchee.container.services.ServicesManager;
import org.apache.batchee.container.util.PartitionDataWrapper;
import org.apache.batchee.jaxb.Batchlet;
import org.apache.batchee.jaxb.Chunk;
@@ -34,17 +36,18 @@ import java.util.concurrent.BlockingQueue;
public class ExecutionElementControllerFactory {
public static BaseStepController getStepController(final RuntimeJobExecution jobExecutionImpl, final Step step,
final StepContextImpl stepContext, final long rootJobExecutionId,
- final BlockingQueue<PartitionDataWrapper> analyzerQueue) {
+ final BlockingQueue<PartitionDataWrapper> analyzerQueue,
+ final ServicesManager servicesManager) {
final Partition partition = step.getPartition();
if (partition != null) {
if (partition.getMapper() != null) {
- return new PartitionedStepController(jobExecutionImpl, step, stepContext, rootJobExecutionId);
+ return new PartitionedStepController(jobExecutionImpl, step, stepContext, rootJobExecutionId, servicesManager);
}
if (partition.getPlan() != null) {
if (partition.getPlan().getPartitions() != null) {
- return new PartitionedStepController(jobExecutionImpl, step, stepContext, rootJobExecutionId);
+ return new PartitionedStepController(jobExecutionImpl, step, stepContext, rootJobExecutionId, servicesManager);
}
}
}
@@ -54,26 +57,26 @@ public class ExecutionElementControllerFactory {
if (step.getChunk() != null) {
throw new IllegalArgumentException("Step contains both a batchlet and a chunk. Aborting.");
}
- return new BatchletStepController(jobExecutionImpl, step, stepContext, rootJobExecutionId, analyzerQueue);
+ return new BatchletStepController(jobExecutionImpl, step, stepContext, rootJobExecutionId, analyzerQueue, servicesManager);
} else {
final Chunk chunk = step.getChunk();
if (chunk == null) {
throw new IllegalArgumentException("Step does not contain either a batchlet or a chunk. Aborting.");
}
- return new ChunkStepController(jobExecutionImpl, step, stepContext, rootJobExecutionId, analyzerQueue);
+ return new ChunkStepController(jobExecutionImpl, step, stepContext, rootJobExecutionId, analyzerQueue, servicesManager);
}
}
- public static DecisionController getDecisionController(RuntimeJobExecution jobExecutionImpl, Decision decision) {
- return new DecisionController(jobExecutionImpl, decision);
+ public static DecisionController getDecisionController(final ServicesManager servicesManager, final RuntimeJobExecution jobExecutionImpl, final Decision decision) {
+ return new DecisionController(jobExecutionImpl, decision, servicesManager);
}
- public static FlowController getFlowController(RuntimeJobExecution jobExecutionImpl, Flow flow, long rootJobExecutionId) {
- return new FlowController(jobExecutionImpl, flow, rootJobExecutionId);
+ public static FlowController getFlowController(final ServicesManager servicesManager, final RuntimeJobExecution jobExecutionImpl, final Flow flow, final long rootJobExecutionId) {
+ return new FlowController(jobExecutionImpl, flow, rootJobExecutionId, servicesManager);
}
- public static SplitController getSplitController(RuntimeJobExecution jobExecutionImpl, Split split, long rootJobExecutionId) {
- return new SplitController(jobExecutionImpl, split, rootJobExecutionId);
+ public static SplitController getSplitController(final BatchKernelService kernel, final RuntimeJobExecution jobExecutionImpl, final Split split, final long rootJobExecutionId) {
+ return new SplitController(jobExecutionImpl, split, rootJobExecutionId, kernel);
}
private ExecutionElementControllerFactory() {
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/8ddf9f75/jbatch/src/main/java/org/apache/batchee/container/impl/controller/ExecutionTransitioner.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/ExecutionTransitioner.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/ExecutionTransitioner.java
index 15651c5..faf7669 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/ExecutionTransitioner.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/ExecutionTransitioner.java
@@ -27,6 +27,8 @@ import org.apache.batchee.container.jsl.IllegalTransitionException;
import org.apache.batchee.container.jsl.Transition;
import org.apache.batchee.container.jsl.TransitionElement;
import org.apache.batchee.container.navigator.ModelNavigator;
+import org.apache.batchee.container.services.BatchKernelService;
+import org.apache.batchee.container.services.ServicesManager;
import org.apache.batchee.container.status.ExecutionStatus;
import org.apache.batchee.container.status.ExtendedBatchStatus;
import org.apache.batchee.container.util.PartitionDataWrapper;
@@ -44,6 +46,7 @@ import java.util.List;
import java.util.concurrent.BlockingQueue;
public class ExecutionTransitioner {
+ private final ServicesManager manager;
private RuntimeJobExecution jobExecution;
private long rootJobExecutionId;
private ModelNavigator<?> modelNavigator;
@@ -60,19 +63,24 @@ public class ExecutionTransitioner {
private List<Long> stepExecIds;
- public ExecutionTransitioner(RuntimeJobExecution jobExecution, long rootJobExecutionId, ModelNavigator<?> modelNavigator) {
+ public ExecutionTransitioner(final RuntimeJobExecution jobExecution, final long rootJobExecutionId, final ModelNavigator<?> modelNavigator,
+ final ServicesManager servicesManager) {
this.jobExecution = jobExecution;
this.rootJobExecutionId = rootJobExecutionId;
this.modelNavigator = modelNavigator;
this.jobContext = jobExecution.getJobContext();
+ this.manager = servicesManager;
}
- public ExecutionTransitioner(RuntimeJobExecution jobExecution, long rootJobExecutionId, ModelNavigator<JSLJob> jobNavigator, BlockingQueue<PartitionDataWrapper> analyzerQueue) {
+ public ExecutionTransitioner(final RuntimeJobExecution jobExecution, final long rootJobExecutionId,
+ final ModelNavigator<JSLJob> jobNavigator, final BlockingQueue<PartitionDataWrapper> analyzerQueue,
+ final ServicesManager manager) {
this.jobExecution = jobExecution;
this.rootJobExecutionId = rootJobExecutionId;
this.modelNavigator = jobNavigator;
this.jobContext = jobExecution.getJobContext();
this.analyzerQueue = analyzerQueue;
+ this.manager = manager;
}
public ExecutionStatus doExecutionLoop() {
@@ -163,19 +171,19 @@ public class ExecutionTransitioner {
if (currentExecutionElement instanceof Decision) {
final Decision decision = (Decision) currentExecutionElement;
- elementController = ExecutionElementControllerFactory.getDecisionController(jobExecution, decision);
+ elementController = ExecutionElementControllerFactory.getDecisionController(manager, jobExecution, decision);
final DecisionController decisionController = (DecisionController) elementController;
decisionController.setPreviousStepExecutions(previousExecutionElement, previousElementController);
} else if (currentExecutionElement instanceof Flow) {
final Flow flow = (Flow) currentExecutionElement;
- elementController = ExecutionElementControllerFactory.getFlowController(jobExecution, flow, rootJobExecutionId);
+ elementController = ExecutionElementControllerFactory.getFlowController(manager, jobExecution, flow, rootJobExecutionId);
} else if (currentExecutionElement instanceof Split) {
final Split split = (Split) currentExecutionElement;
- elementController = ExecutionElementControllerFactory.getSplitController(jobExecution, split, rootJobExecutionId);
+ elementController = ExecutionElementControllerFactory.getSplitController(manager.service(BatchKernelService.class), jobExecution, split, rootJobExecutionId);
} else if (currentExecutionElement instanceof Step) {
final Step step = (Step) currentExecutionElement;
final StepContextImpl stepContext = new StepContextImpl(step.getId());
- elementController = ExecutionElementControllerFactory.getStepController(jobExecution, step, stepContext, rootJobExecutionId, analyzerQueue);
+ elementController = ExecutionElementControllerFactory.getStepController(jobExecution, step, stepContext, rootJobExecutionId, analyzerQueue, manager);
} else {
elementController = null;
}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/8ddf9f75/jbatch/src/main/java/org/apache/batchee/container/impl/controller/FlowController.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/FlowController.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/FlowController.java
index db0d4c6..a4b6fda 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/FlowController.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/FlowController.java
@@ -22,6 +22,7 @@ import org.apache.batchee.container.impl.JobContextImpl;
import org.apache.batchee.container.impl.jobinstance.RuntimeJobExecution;
import org.apache.batchee.container.navigator.ModelNavigator;
import org.apache.batchee.container.navigator.NavigatorFactory;
+import org.apache.batchee.container.services.ServicesManager;
import org.apache.batchee.container.status.ExecutionStatus;
import org.apache.batchee.container.status.ExtendedBatchStatus;
import org.apache.batchee.jaxb.Flow;
@@ -32,6 +33,7 @@ import java.util.List;
public class FlowController implements ExecutionElementController {
private final RuntimeJobExecution jobExecution;
private final JobContextImpl jobContext;
+ private final ServicesManager manager;
protected ModelNavigator<Flow> flowNavigator;
@@ -40,18 +42,19 @@ public class FlowController implements ExecutionElementController {
private ExecutionTransitioner transitioner;
- public FlowController(final RuntimeJobExecution jobExecution, final Flow flow, final long rootJobExecutionId) {
+ public FlowController(final RuntimeJobExecution jobExecution, final Flow flow, final long rootJobExecutionId, final ServicesManager manager) {
this.jobExecution = jobExecution;
this.jobContext = jobExecution.getJobContext();
this.flowNavigator = NavigatorFactory.createFlowNavigator(flow);
this.flow = flow;
this.rootJobExecutionId = rootJobExecutionId;
+ this.manager = manager;
}
@Override
public ExecutionStatus execute() {
if (!jobContext.getBatchStatus().equals(BatchStatus.STOPPING)) {
- transitioner = new ExecutionTransitioner(jobExecution, rootJobExecutionId, flowNavigator);
+ transitioner = new ExecutionTransitioner(jobExecution, rootJobExecutionId, flowNavigator, manager);
return transitioner.doExecutionLoop();
}
return new ExecutionStatus(ExtendedBatchStatus.JOB_OPERATOR_STOPPING);
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/8ddf9f75/jbatch/src/main/java/org/apache/batchee/container/impl/controller/FlowInSplitThreadRootController.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/FlowInSplitThreadRootController.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/FlowInSplitThreadRootController.java
index 93f6160..ea6f249 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/FlowInSplitThreadRootController.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/FlowInSplitThreadRootController.java
@@ -17,6 +17,7 @@
package org.apache.batchee.container.impl.controller;
import org.apache.batchee.container.impl.jobinstance.RuntimeFlowInSplitExecution;
+import org.apache.batchee.container.services.ServicesManager;
import org.apache.batchee.container.status.ExecutionStatus;
import org.apache.batchee.container.status.ExtendedBatchStatus;
import org.apache.batchee.container.util.FlowInSplitBuilderConfig;
@@ -25,8 +26,8 @@ public class FlowInSplitThreadRootController extends JobThreadRootController {
// Careful, we have a separately named reference to the same object in the parent class
private RuntimeFlowInSplitExecution flowInSplitExecution;
- public FlowInSplitThreadRootController(final RuntimeFlowInSplitExecution flowInSplitExecution, final FlowInSplitBuilderConfig config) {
- super(flowInSplitExecution, config.getRootJobExecutionId());
+ public FlowInSplitThreadRootController(final RuntimeFlowInSplitExecution flowInSplitExecution, final FlowInSplitBuilderConfig config, final ServicesManager manager) {
+ super(flowInSplitExecution, config.getRootJobExecutionId(), manager);
this.flowInSplitExecution = flowInSplitExecution;
}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/8ddf9f75/jbatch/src/main/java/org/apache/batchee/container/impl/controller/JobController.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/JobController.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/JobController.java
index 3446ba1..d671eb5 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/JobController.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/JobController.java
@@ -17,13 +17,14 @@
package org.apache.batchee.container.impl.controller;
import org.apache.batchee.container.impl.jobinstance.RuntimeJobExecution;
+import org.apache.batchee.container.services.ServicesManager;
public class JobController extends JobThreadRootController {
- private JobController(final RuntimeJobExecution jobExecution, final long rootJobExecutionId) {
- super(jobExecution, rootJobExecutionId);
+ private JobController(final RuntimeJobExecution jobExecution, final long rootJobExecutionId, final ServicesManager manager) {
+ super(jobExecution, rootJobExecutionId, manager);
}
- public JobController(final RuntimeJobExecution jobExecution) {
- this(jobExecution, jobExecution.getExecutionId());
+ public JobController(final RuntimeJobExecution jobExecution, final ServicesManager manager) {
+ this(jobExecution, jobExecution.getExecutionId(), manager);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/8ddf9f75/jbatch/src/main/java/org/apache/batchee/container/impl/controller/JobThreadRootController.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/JobThreadRootController.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/JobThreadRootController.java
index 1dd8c3b..feb3f1c 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/JobThreadRootController.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/JobThreadRootController.java
@@ -25,12 +25,13 @@ import org.apache.batchee.container.proxy.InjectionReferences;
import org.apache.batchee.container.proxy.JobListenerProxy;
import org.apache.batchee.container.proxy.ListenerFactory;
import org.apache.batchee.container.services.JobStatusManagerService;
-import org.apache.batchee.spi.PersistenceManagerService;
import org.apache.batchee.container.services.ServicesManager;
import org.apache.batchee.container.status.ExecutionStatus;
import org.apache.batchee.container.status.ExtendedBatchStatus;
import org.apache.batchee.container.util.PartitionDataWrapper;
import org.apache.batchee.jaxb.JSLJob;
+import org.apache.batchee.spi.BatchArtifactFactory;
+import org.apache.batchee.spi.PersistenceManagerService;
import javax.batch.runtime.BatchStatus;
import java.io.PrintWriter;
@@ -52,22 +53,25 @@ public abstract class JobThreadRootController implements ThreadRootController {
protected final ModelNavigator<JSLJob> jobNavigator;
protected final JobStatusManagerService jobStatusService;
protected final PersistenceManagerService persistenceService;
+ protected final ServicesManager manager;
private ExecutionTransitioner transitioner;
private BlockingQueue<PartitionDataWrapper> analyzerQueue;
- public JobThreadRootController(final RuntimeJobExecution jobExecution, final long rootJobExecutionId) {
+ public JobThreadRootController(final RuntimeJobExecution jobExecution, final long rootJobExecutionId,
+ final ServicesManager servicesManager) {
this.jobExecution = jobExecution;
this.jobContext = jobExecution.getJobContext();
this.rootJobExecutionId = rootJobExecutionId;
this.jobInstanceId = jobExecution.getInstanceId();
- this.jobStatusService = ServicesManager.service(JobStatusManagerService.class);
- this.persistenceService = ServicesManager.service(PersistenceManagerService.class);
+ this.jobStatusService = servicesManager.service(JobStatusManagerService.class);
+ this.persistenceService = servicesManager.service(PersistenceManagerService.class);
this.jobNavigator = jobExecution.getJobNavigator();
+ this.manager = servicesManager;
final JSLJob jobModel = jobExecution.getJobNavigator().getRootModelElement();
final InjectionReferences injectionRef = new InjectionReferences(jobContext, null, null);
- listenerFactory = new ListenerFactory(jobModel, injectionRef, jobExecution);
+ listenerFactory = new ListenerFactory(servicesManager.service(BatchArtifactFactory.class), jobModel, injectionRef, jobExecution);
jobExecution.setListenerFactory(listenerFactory);
}
@@ -75,8 +79,9 @@ public abstract class JobThreadRootController implements ThreadRootController {
* By not passing the rootJobExecutionId, we are "orphaning" the subjob execution and making it not findable from the parent.
* This is exactly what we want for getStepExecutions()... we don't want it to get extraneous entries for the partitions.
*/
- public JobThreadRootController(final RuntimeJobExecution jobExecution, final BlockingQueue<PartitionDataWrapper> analyzerQueue) {
- this(jobExecution, jobExecution.getExecutionId());
+ public JobThreadRootController(final RuntimeJobExecution jobExecution, final BlockingQueue<PartitionDataWrapper> analyzerQueue,
+ final ServicesManager servicesManager) {
+ this(jobExecution, jobExecution.getExecutionId(), servicesManager);
this.analyzerQueue = analyzerQueue;
}
@@ -96,7 +101,7 @@ public abstract class JobThreadRootController implements ThreadRootController {
// The BIG loop transitioning
// within the job !!!
// --------------------
- transitioner = new ExecutionTransitioner(jobExecution, rootJobExecutionId, jobNavigator, analyzerQueue);
+ transitioner = new ExecutionTransitioner(jobExecution, rootJobExecutionId, jobNavigator, analyzerQueue, manager);
retVal = transitioner.doExecutionLoop();
ExtendedBatchStatus extBatchStatus = retVal.getExtendedBatchStatus();
switch (extBatchStatus) {
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/8ddf9f75/jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionThreadRootController.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionThreadRootController.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionThreadRootController.java
index 920ca99..8cd3593 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionThreadRootController.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionThreadRootController.java
@@ -17,14 +17,15 @@
package org.apache.batchee.container.impl.controller;
import org.apache.batchee.container.impl.jobinstance.RuntimeJobExecution;
+import org.apache.batchee.container.services.ServicesManager;
import org.apache.batchee.container.util.PartitionsBuilderConfig;
/**
* Currently there's no special function on top of the subjob required of the partition.
*/
public class PartitionThreadRootController extends JobThreadRootController {
- public PartitionThreadRootController(RuntimeJobExecution jobExecution, PartitionsBuilderConfig config) {
- super(jobExecution, config.getAnalyzerQueue());
+ public PartitionThreadRootController(final RuntimeJobExecution jobExecution, final PartitionsBuilderConfig config, final ServicesManager servicesManager) {
+ super(jobExecution, config.getAnalyzerQueue(), servicesManager);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/8ddf9f75/jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionedStepController.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionedStepController.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionedStepController.java
index 9797233..01444ef 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionedStepController.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionedStepController.java
@@ -26,6 +26,7 @@ import org.apache.batchee.container.proxy.PartitionMapperProxy;
import org.apache.batchee.container.proxy.PartitionReducerProxy;
import org.apache.batchee.container.proxy.ProxyFactory;
import org.apache.batchee.container.proxy.StepListenerProxy;
+import org.apache.batchee.container.services.ServicesManager;
import org.apache.batchee.container.util.BatchPartitionPlan;
import org.apache.batchee.container.util.BatchPartitionWorkUnit;
import org.apache.batchee.container.util.BatchWorkUnit;
@@ -39,6 +40,7 @@ import org.apache.batchee.jaxb.PartitionMapper;
import org.apache.batchee.jaxb.PartitionReducer;
import org.apache.batchee.jaxb.Property;
import org.apache.batchee.jaxb.Step;
+import org.apache.batchee.spi.BatchArtifactFactory;
import javax.batch.api.partition.PartitionPlan;
import javax.batch.api.partition.PartitionReducer.PartitionStatus;
@@ -80,8 +82,12 @@ public class PartitionedStepController extends BaseStepController {
BlockingQueue<BatchPartitionWorkUnit> completedWorkQueue = null;
- protected PartitionedStepController(final RuntimeJobExecution jobExecutionImpl, final Step step, StepContextImpl stepContext, long rootJobExecutionId) {
- super(jobExecutionImpl, step, stepContext, rootJobExecutionId);
+ private final BatchArtifactFactory factory;
+
+ protected PartitionedStepController(final RuntimeJobExecution jobExecutionImpl, final Step step, final StepContextImpl stepContext,
+ final long rootJobExecutionId, final ServicesManager servicesManager) {
+ super(jobExecutionImpl, step, stepContext, rootJobExecutionId, servicesManager);
+ factory = servicesManager.service(BatchArtifactFactory.class);
}
@Override
@@ -96,7 +102,7 @@ public class PartitionedStepController extends BaseStepController {
if (parallelBatchWorkUnits != null) {
for (BatchWorkUnit subJob : parallelBatchWorkUnits) {
try {
- BATCH_KERNEL.stopJob(subJob.getJobExecutionImpl().getExecutionId());
+ kernelService.stopJob(subJob.getJobExecutionImpl().getExecutionId());
} catch (Exception e) {
// TODO - Is this what we want to know.
// Blow up if it happens to force the issue.
@@ -128,7 +134,7 @@ public class PartitionedStepController extends BaseStepController {
// Set all the contexts associated with this controller.
// Some of them may be null
final InjectionReferences injectionRef = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext, propertyList);
- final PartitionMapperProxy partitionMapperProxy = ProxyFactory.createPartitionMapperProxy(partitionMapper.getRef(), injectionRef, stepContext, jobExecutionImpl);
+ final PartitionMapperProxy partitionMapperProxy = ProxyFactory.createPartitionMapperProxy(factory, partitionMapper.getRef(), injectionRef, stepContext, jobExecutionImpl);
final PartitionPlan mapperPlan = partitionMapperProxy.mapPartitions();
@@ -280,9 +286,9 @@ public class PartitionedStepController extends BaseStepController {
PartitionsBuilderConfig config = new PartitionsBuilderConfig(subJobs, partitionProperties, analyzerStatusQueue, completedWorkQueue, jobExecutionImpl.getExecutionId());
// Then build all the subjobs but do not start them yet
if (stepStatus.getStartCount() > 1 && !plan.getPartitionsOverride()) {
- parallelBatchWorkUnits = BATCH_KERNEL.buildOnRestartParallelPartitions(config);
+ parallelBatchWorkUnits = kernelService.buildOnRestartParallelPartitions(config);
} else {
- parallelBatchWorkUnits = BATCH_KERNEL.buildNewParallelPartitions(config);
+ parallelBatchWorkUnits = kernelService.buildNewParallelPartitions(config);
}
// NOTE: At this point I might not have as many work units as I had partitions, since some may have already completed.
@@ -304,9 +310,9 @@ public class PartitionedStepController extends BaseStepController {
for (int i = 0; i < this.threads && i < numTotalForThisExecution; i++, numCurrentSubmitted++) {
final BatchWorkUnit workUnit = parallelBatchWorkUnits.get(i);
if (stepStatus.getStartCount() > 1 && !plan.getPartitionsOverride()) {
- BATCH_KERNEL.restartGeneratedJob(workUnit);
+ kernelService.restartGeneratedJob(workUnit);
} else {
- BATCH_KERNEL.startGeneratedJob(workUnit);
+ kernelService.startGeneratedJob(workUnit);
}
}
@@ -336,9 +342,9 @@ public class PartitionedStepController extends BaseStepController {
if (numCurrentCompleted < numTotalForThisExecution) {
if (numCurrentSubmitted < numTotalForThisExecution) {
if (stepStatus.getStartCount() > 1) {
- BATCH_KERNEL.startGeneratedJob(parallelBatchWorkUnits.get(numCurrentSubmitted++));
+ kernelService.startGeneratedJob(parallelBatchWorkUnits.get(numCurrentSubmitted++));
} else {
- BATCH_KERNEL.restartGeneratedJob(parallelBatchWorkUnits.get(numCurrentSubmitted++));
+ kernelService.restartGeneratedJob(parallelBatchWorkUnits.get(numCurrentSubmitted++));
}
}
} else {
@@ -388,14 +394,14 @@ public class PartitionedStepController extends BaseStepController {
if (analyzer != null) {
final List<Property> propList = analyzer.getProperties() == null ? null : analyzer.getProperties().getPropertyList();
injectionRef = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext, propList);
- analyzerProxy = ProxyFactory.createPartitionAnalyzerProxy(analyzer.getRef(), injectionRef, stepContext, jobExecutionImpl);
+ analyzerProxy = ProxyFactory.createPartitionAnalyzerProxy(factory, analyzer.getRef(), injectionRef, stepContext, jobExecutionImpl);
}
final PartitionReducer partitionReducer = step.getPartition().getReducer();
if (partitionReducer != null) {
final List<Property> propList = partitionReducer.getProperties() == null ? null : partitionReducer.getProperties().getPropertyList();
injectionRef = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext, propList);
- partitionReducerProxy = ProxyFactory.createPartitionReducerProxy(partitionReducer.getRef(), injectionRef, stepContext, jobExecutionImpl);
+ partitionReducerProxy = ProxyFactory.createPartitionReducerProxy(factory, partitionReducer.getRef(), injectionRef, stepContext, jobExecutionImpl);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/8ddf9f75/jbatch/src/main/java/org/apache/batchee/container/impl/controller/SingleThreadedStepController.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/SingleThreadedStepController.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/SingleThreadedStepController.java
index d085622..94738c0 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/SingleThreadedStepController.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/SingleThreadedStepController.java
@@ -23,11 +23,13 @@ import org.apache.batchee.container.proxy.InjectionReferences;
import org.apache.batchee.container.proxy.PartitionCollectorProxy;
import org.apache.batchee.container.proxy.ProxyFactory;
import org.apache.batchee.container.proxy.StepListenerProxy;
+import org.apache.batchee.container.services.ServicesManager;
import org.apache.batchee.container.util.PartitionDataWrapper;
import org.apache.batchee.container.util.PartitionDataWrapper.PartitionEventType;
import org.apache.batchee.jaxb.Collector;
import org.apache.batchee.jaxb.Property;
import org.apache.batchee.jaxb.Step;
+import org.apache.batchee.spi.BatchArtifactFactory;
import java.io.Serializable;
import java.util.List;
@@ -41,11 +43,17 @@ import java.util.concurrent.BlockingQueue;
* separate main thread with controller).
*/
public abstract class SingleThreadedStepController extends BaseStepController implements Controller {
+ private final BatchArtifactFactory factory;
+
// Collector only used from partition threads, not main thread
protected PartitionCollectorProxy collectorProxy = null;
- protected SingleThreadedStepController(RuntimeJobExecution jobExecutionImpl, Step step, StepContextImpl stepContext, long rootJobExecutionId, BlockingQueue<PartitionDataWrapper> analyzerStatusQueue) {
- super(jobExecutionImpl, step, stepContext, rootJobExecutionId, analyzerStatusQueue);
+ protected SingleThreadedStepController(final RuntimeJobExecution jobExecutionImpl, final Step step,
+ final StepContextImpl stepContext, final long rootJobExecutionId,
+ final BlockingQueue<PartitionDataWrapper> analyzerStatusQueue,
+ final ServicesManager servicesManager) {
+ super(jobExecutionImpl, step, stepContext, rootJobExecutionId, analyzerStatusQueue, servicesManager);
+ factory = servicesManager.service(BatchArtifactFactory.class);
}
List<StepListenerProxy> stepListeners = null;
@@ -67,7 +75,7 @@ public abstract class SingleThreadedStepController extends BaseStepController im
* contexts may be null
*/
injectionRef = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext, propList);
- this.collectorProxy = ProxyFactory.createPartitionCollectorProxy(collector.getRef(), injectionRef, this.stepContext, jobExecutionImpl);
+ this.collectorProxy = ProxyFactory.createPartitionCollectorProxy(factory, collector.getRef(), injectionRef, this.stepContext, jobExecutionImpl);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/8ddf9f75/jbatch/src/main/java/org/apache/batchee/container/impl/controller/SplitController.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/SplitController.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/SplitController.java
index b882de6..43c26bb 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/SplitController.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/SplitController.java
@@ -60,13 +60,13 @@ public class SplitController implements ExecutionElementController {
protected Split split;
- public SplitController(RuntimeJobExecution jobExecution, Split split, long rootJobExecutionId) {
+ public SplitController(final RuntimeJobExecution jobExecution, final Split split, final long rootJobExecutionId,
+ final BatchKernelService kernelService) {
this.jobExecution = jobExecution;
this.jobContext = jobExecution.getJobContext();
this.rootJobExecutionId = rootJobExecutionId;
this.split = split;
-
- batchKernel = ServicesManager.service(BatchKernelService.class);
+ this.batchKernel = kernelService;
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/8ddf9f75/jbatch/src/main/java/org/apache/batchee/container/impl/controller/batchlet/BatchletStepController.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/batchlet/BatchletStepController.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/batchlet/BatchletStepController.java
index a0f0cbb..13b60df 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/batchlet/BatchletStepController.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/batchlet/BatchletStepController.java
@@ -23,29 +23,34 @@ import org.apache.batchee.container.impl.jobinstance.RuntimeJobExecution;
import org.apache.batchee.container.proxy.BatchletProxy;
import org.apache.batchee.container.proxy.InjectionReferences;
import org.apache.batchee.container.proxy.ProxyFactory;
+import org.apache.batchee.container.services.ServicesManager;
import org.apache.batchee.container.util.PartitionDataWrapper;
import org.apache.batchee.jaxb.Batchlet;
import org.apache.batchee.jaxb.Property;
import org.apache.batchee.jaxb.Step;
+import org.apache.batchee.spi.BatchArtifactFactory;
import javax.batch.runtime.BatchStatus;
import java.util.List;
import java.util.concurrent.BlockingQueue;
public class BatchletStepController extends SingleThreadedStepController {
+ private final BatchArtifactFactory factory;
private BatchletProxy batchletProxy;
public BatchletStepController(final RuntimeJobExecution jobExecutionImpl, final Step step,
final StepContextImpl stepContext, final long rootJobExecutionId,
- final BlockingQueue<PartitionDataWrapper> analyzerStatusQueue) {
- super(jobExecutionImpl, step, stepContext, rootJobExecutionId, analyzerStatusQueue);
+ final BlockingQueue<PartitionDataWrapper> analyzerStatusQueue,
+ final ServicesManager manager) {
+ super(jobExecutionImpl, step, stepContext, rootJobExecutionId, analyzerStatusQueue, manager);
+ factory = manager.service(BatchArtifactFactory.class);
}
private void invokeBatchlet(final Batchlet batchlet) throws BatchContainerServiceException {
final String batchletId = batchlet.getRef();
final List<Property> propList = (batchlet.getProperties() == null) ? null : batchlet.getProperties().getPropertyList();
final InjectionReferences injectionRef = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext, propList);
- batchletProxy = ProxyFactory.createBatchletProxy(batchletId, injectionRef, stepContext, jobExecutionImpl);
+ batchletProxy = ProxyFactory.createBatchletProxy(factory, batchletId, injectionRef, stepContext, jobExecutionImpl);
if (!wasStopIssued()) {
final String processRetVal = batchletProxy.process();
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/8ddf9f75/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/CheckpointAlgorithmFactory.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/CheckpointAlgorithmFactory.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/CheckpointAlgorithmFactory.java
index 34f2fae..781f41f 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/CheckpointAlgorithmFactory.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/CheckpointAlgorithmFactory.java
@@ -24,14 +24,15 @@ import org.apache.batchee.container.proxy.InjectionReferences;
import org.apache.batchee.container.proxy.ProxyFactory;
import org.apache.batchee.jaxb.Chunk;
import org.apache.batchee.jaxb.Step;
+import org.apache.batchee.spi.BatchArtifactFactory;
public final class CheckpointAlgorithmFactory {
- public static CheckpointAlgorithmProxy getCheckpointAlgorithmProxy(final Step step, final InjectionReferences injectionReferences, final StepContextImpl stepContext, final RuntimeJobExecution jobExecution) {
+ public static CheckpointAlgorithmProxy getCheckpointAlgorithmProxy(final BatchArtifactFactory factory, final Step step, final InjectionReferences injectionReferences, final StepContextImpl stepContext, final RuntimeJobExecution jobExecution) {
final Chunk chunk = step.getChunk();
final String checkpointType = chunk.getCheckpointPolicy();
final CheckpointAlgorithmProxy proxy;
if ("custom".equalsIgnoreCase(checkpointType)) {
- proxy = ProxyFactory.createCheckpointAlgorithmProxy(chunk.getCheckpointAlgorithm().getRef(), injectionReferences, stepContext, jobExecution);
+ proxy = ProxyFactory.createCheckpointAlgorithmProxy(factory, chunk.getCheckpointAlgorithm().getRef(), injectionReferences, stepContext, jobExecution);
} else /* "item" */ {
proxy = new CheckpointAlgorithmProxy(new ItemCheckpointAlgorithm());
}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/8ddf9f75/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/CheckpointManager.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/CheckpointManager.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/CheckpointManager.java
index 76cae58..f7f01bf 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/CheckpointManager.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/CheckpointManager.java
@@ -38,14 +38,15 @@ public class CheckpointManager {
public CheckpointManager(final ItemReaderProxy reader, final ItemWriterProxy writer,
final CheckpointAlgorithm chkptAlg,
- final long jobInstanceID, final String stepId) {
+ final long jobInstanceID, final String stepId,
+ final PersistenceManagerService persistenceManagerService) {
this.readerProxy = reader;
this.writerProxy = writer;
this.checkpointAlgorithm = chkptAlg;
this.stepId = stepId;
this.jobInstanceID = jobInstanceID;
- this.persistenceManagerService = ServicesManager.service(PersistenceManagerService.class);
+ this.persistenceManagerService = persistenceManagerService;
}
public boolean applyCheckPointPolicy() {
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/8ddf9f75/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/ChunkStepController.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/ChunkStepController.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/ChunkStepController.java
index 7c8eec7..61cadfe 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/ChunkStepController.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/ChunkStepController.java
@@ -47,6 +47,7 @@ import org.apache.batchee.jaxb.ItemReader;
import org.apache.batchee.jaxb.ItemWriter;
import org.apache.batchee.jaxb.Property;
import org.apache.batchee.jaxb.Step;
+import org.apache.batchee.spi.BatchArtifactFactory;
import org.apache.batchee.spi.PersistenceManagerService;
import javax.batch.api.chunk.CheckpointAlgorithm;
@@ -64,7 +65,8 @@ public class ChunkStepController extends SingleThreadedStepController {
private final static String sourceClass = ChunkStepController.class.getName();
private final static Logger logger = Logger.getLogger(sourceClass);
- private final PersistenceManagerService persistenceManagerService = ServicesManager.service(PersistenceManagerService.class);
+ private final PersistenceManagerService persistenceManagerService;
+ private final BatchArtifactFactory artifactFactory;
private Chunk chunk = null;
private ItemReaderProxy readerProxy = null;
@@ -84,8 +86,11 @@ public class ChunkStepController extends SingleThreadedStepController {
private boolean rollbackRetry = false;
public ChunkStepController(final RuntimeJobExecution jobExecutionImpl, final Step step, final StepContextImpl stepContext,
- final long rootJobExecutionId, final BlockingQueue<PartitionDataWrapper> analyzerStatusQueue) {
- super(jobExecutionImpl, step, stepContext, rootJobExecutionId, analyzerStatusQueue);
+ final long rootJobExecutionId, final BlockingQueue<PartitionDataWrapper> analyzerStatusQueue,
+ final ServicesManager servicesManager) {
+ super(jobExecutionImpl, step, stepContext, rootJobExecutionId, analyzerStatusQueue, servicesManager);
+ this.persistenceManagerService = servicesManager.service(PersistenceManagerService.class);
+ this.artifactFactory = servicesManager.service(BatchArtifactFactory.class);
}
/**
@@ -472,7 +477,6 @@ public class ChunkStepController extends SingleThreadedStepController {
int timeInterval = ChunkHelper.getTimeLimit(chunk);
boolean checkPointed = true;
boolean rollback = false;
- Throwable caughtThrowable = null;
// begin new transaction at first iteration or after a checkpoint commit
@@ -498,7 +502,7 @@ public class ChunkStepController extends SingleThreadedStepController {
positionWriterAtCheckpoint();
checkpointManager = new CheckpointManager(readerProxy, writerProxy,
getCheckpointAlgorithm(itemCount, timeInterval), jobExecutionImpl
- .getJobInstance().getInstanceId(), step.getId());
+ .getJobInstance().getInstanceId(), step.getId(), persistenceManagerService);
}
}
@@ -581,7 +585,6 @@ public class ChunkStepController extends SingleThreadedStepController {
}
} catch (final Exception e) {
- caughtThrowable = e;
logger.log(Level.SEVERE, "Failure in Read-Process-Write Loop", e);
// Only try to call onError() if we have an Exception, but not an Error.
for (ChunkListenerProxy chunkProxy : chunkListeners) {
@@ -591,20 +594,20 @@ public class ChunkStepController extends SingleThreadedStepController {
logger.log(Level.SEVERE, e1.getMessage(), e1);
}
}
+ rollback(e);
} catch (final Throwable t) {
- caughtThrowable = t;
- logger.log(Level.SEVERE, t.getMessage(), t);
- } finally {
- if (caughtThrowable != null) {
- transactionManager.setRollbackOnly();
- readerProxy.close();
- writerProxy.close();
- transactionManager.rollback();
- throw new BatchContainerRuntimeException("Failure in Read-Process-Write Loop", caughtThrowable);
- }
+ rollback(t);
}
}
+ private void rollback(final Throwable t) {
+ transactionManager.setRollbackOnly();
+ readerProxy.close();
+ writerProxy.close();
+ transactionManager.rollback();
+ throw new BatchContainerRuntimeException("Failure in Read-Process-Write Loop", t);
+ }
+
protected void invokeCoreStep() throws BatchContainerServiceException {
this.chunk = step.getChunk();
@@ -637,7 +640,7 @@ public class ChunkStepController extends SingleThreadedStepController {
final ItemReader itemReader = chunk.getReader();
final List<Property> itemReaderProps = itemReader.getProperties() == null ? null : itemReader.getProperties().getPropertyList();
final InjectionReferences injectionRef = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext, itemReaderProps);
- readerProxy = ProxyFactory.createItemReaderProxy(itemReader.getRef(), injectionRef, stepContext, jobExecutionImpl);
+ readerProxy = ProxyFactory.createItemReaderProxy(artifactFactory, itemReader.getRef(), injectionRef, stepContext, jobExecutionImpl);
}
{
@@ -645,7 +648,7 @@ public class ChunkStepController extends SingleThreadedStepController {
if (itemProcessor != null) {
final List<Property> itemProcessorProps = itemProcessor.getProperties() == null ? null : itemProcessor.getProperties().getPropertyList();
final InjectionReferences injectionRef = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext, itemProcessorProps);
- processorProxy = ProxyFactory.createItemProcessorProxy(itemProcessor.getRef(), injectionRef, stepContext, jobExecutionImpl);
+ processorProxy = ProxyFactory.createItemProcessorProxy(artifactFactory, itemProcessor.getRef(), injectionRef, stepContext, jobExecutionImpl);
}
}
@@ -653,7 +656,7 @@ public class ChunkStepController extends SingleThreadedStepController {
final ItemWriter itemWriter = chunk.getWriter();
final List<Property> itemWriterProps = itemWriter.getProperties() == null ? null : itemWriter.getProperties().getPropertyList();
final InjectionReferences injectionRef = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext, itemWriterProps);
- writerProxy = ProxyFactory.createItemWriterProxy(itemWriter.getRef(), injectionRef, stepContext, jobExecutionImpl);
+ writerProxy = ProxyFactory.createItemWriterProxy(artifactFactory, itemWriter.getRef(), injectionRef, stepContext, jobExecutionImpl);
}
{
@@ -665,7 +668,7 @@ public class ChunkStepController extends SingleThreadedStepController {
}
final InjectionReferences injectionRef = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext, propList);
- checkpointProxy = CheckpointAlgorithmFactory.getCheckpointAlgorithmProxy(step, injectionRef, stepContext, jobExecutionImpl);
+ checkpointProxy = CheckpointAlgorithmFactory.getCheckpointAlgorithmProxy(artifactFactory, step, injectionRef, stepContext, jobExecutionImpl);
}
{
@@ -689,7 +692,7 @@ public class ChunkStepController extends SingleThreadedStepController {
chkptAlg = checkpointProxy;
}
- checkpointManager = new CheckpointManager(readerProxy, writerProxy, chkptAlg, jobExecutionImpl.getJobInstance().getInstanceId(), step.getId());
+ checkpointManager = new CheckpointManager(readerProxy, writerProxy, chkptAlg, jobExecutionImpl.getJobInstance().getInstanceId(), step.getId(), persistenceManagerService);
skipHandler = new SkipHandler(chunk);
skipHandler.addSkipProcessListener(skipProcessListeners);