You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@batchee.apache.org by rm...@apache.org on 2013/11/05 08:39:02 UTC
[25/62] importing batchee from github - a fork from the IBm RI
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/jbatch/src/main/java/org/apache/batchee/container/services/factory/CDIBatchArtifactFactory.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/services/factory/CDIBatchArtifactFactory.java b/jbatch/src/main/java/org/apache/batchee/container/services/factory/CDIBatchArtifactFactory.java
new file mode 100755
index 0000000..e817479
--- /dev/null
+++ b/jbatch/src/main/java/org/apache/batchee/container/services/factory/CDIBatchArtifactFactory.java
@@ -0,0 +1,76 @@
+/*
+ * Copyright 2012 International Business Machines Corp.
+ *
+ * See the NOTICE file distributed with this work for additional information
+ * regarding copyright ownership. Licensed under the Apache License,
+ * Version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.batchee.container.services.factory;
+
+import org.apache.batchee.container.cdi.BatchCDIInjectionExtension;
+import org.apache.batchee.container.exception.BatchContainerServiceException;
+import org.apache.batchee.spi.BatchArtifactFactory;
+
+import javax.enterprise.context.Dependent;
+import javax.enterprise.context.spi.CreationalContext;
+import javax.enterprise.inject.spi.Bean;
+import javax.enterprise.inject.spi.BeanManager;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Properties;
+import java.util.Set;
+
+public class CDIBatchArtifactFactory extends DefaultBatchArtifactFactory implements BatchArtifactFactory {
+ @Override
+ public Instance load(final String batchId) {
+ try {
+ final BeanManager bm = getBeanManager();
+ if (bm == null) {
+ return super.load(batchId);
+ }
+
+ final Set<Bean<?>> beans = bm.getBeans(batchId);
+ final Bean<?> bean = bm.resolve(beans);
+ if (bean == null) { // fallback to try to instantiate it from TCCL as per the spec
+ return super.load(batchId);
+ }
+ final Class<?> clazz = bean.getBeanClass();
+ final CreationalContext creationalContext = bm.createCreationalContext(bean);
+ final Object artifactInstance = bm.getReference(bean, clazz, creationalContext);
+ if (Dependent.class.equals(bean.getScope()) || !bm.isNormalScope(bean.getScope())) { // need to be released
+ return new Instance(artifactInstance, new Closeable() {
+ @Override
+ public void close() throws IOException {
+ creationalContext.release();
+ }
+ });
+ }
+ return new Instance(artifactInstance, null);
+ } catch (final Exception e) {
+ // no-op
+ }
+ return null;
+ }
+
+ @Override
+ public void init(final Properties batchConfig) throws BatchContainerServiceException {
+ // no-op
+ }
+
+ protected BeanManager getBeanManager() {
+ final BatchCDIInjectionExtension instance = BatchCDIInjectionExtension.getInstance();
+ if (instance == null) {
+ return null;
+ }
+ return instance.getBeanManager();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/jbatch/src/main/java/org/apache/batchee/container/services/factory/DefaultBatchArtifactFactory.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/services/factory/DefaultBatchArtifactFactory.java b/jbatch/src/main/java/org/apache/batchee/container/services/factory/DefaultBatchArtifactFactory.java
new file mode 100755
index 0000000..5171aa8
--- /dev/null
+++ b/jbatch/src/main/java/org/apache/batchee/container/services/factory/DefaultBatchArtifactFactory.java
@@ -0,0 +1,246 @@
+/*
+ * Copyright 2012 International Business Machines Corp.
+ *
+ * See the NOTICE file distributed with this work for additional information
+ * regarding copyright ownership. Licensed under the Apache License,
+ * Version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.batchee.container.services.factory;
+
+import org.apache.batchee.container.exception.BatchContainerRuntimeException;
+import org.apache.batchee.container.exception.BatchContainerServiceException;
+import org.apache.batchee.container.proxy.ProxyFactory;
+import org.apache.batchee.container.util.DependencyInjections;
+import org.apache.batchee.spi.BatchArtifactFactory;
+
+import javax.xml.namespace.QName;
+import javax.xml.stream.XMLInputFactory;
+import javax.xml.stream.XMLStreamConstants;
+import javax.xml.stream.XMLStreamReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+public class DefaultBatchArtifactFactory implements BatchArtifactFactory, XMLStreamConstants {
+ private final static String BATCH_XML = "META-INF/batch.xml";
+ private final static String BATCHEE_XML = "META-INF/batchee.xml"; // used for out extensions to get short names, spec doesn't impose to read multiple batch.xml so using it as a workaround
+ private final static QName BATCH_ROOT_ELEM = new QName("http://xmlns.jcp.org/xml/ns/javaee", "batch-artifacts");
+
+ // Uses TCCL
+ @Override
+ public Instance load(final String batchId) {
+ final ClassLoader tccl = Thread.currentThread().getContextClassLoader();
+ final ArtifactMap artifactMap = createArtifactsMap(tccl);
+
+ Object loadedArtifact = artifactMap.getArtifactById(batchId);
+ if (loadedArtifact == null) {
+ try {
+ final Class<?> artifactClass = tccl.loadClass(batchId);
+ if (artifactClass != null) {
+ loadedArtifact = artifactClass.newInstance();
+ }
+ } catch (final ClassNotFoundException e) {
+ throw new BatchContainerRuntimeException("Tried but failed to load artifact with id: " + batchId, e);
+ } catch (final InstantiationException e) {
+ throw new BatchContainerRuntimeException("Tried but failed to load artifact with id: " + batchId, e);
+ } catch (final IllegalAccessException e) {
+ throw new BatchContainerRuntimeException("Tried but failed to load artifact with id: " + batchId, e);
+ }
+ }
+
+ if (ProxyFactory.getInjectionReferences() != null) {
+ DependencyInjections.injectReferences(loadedArtifact, ProxyFactory.getInjectionReferences());
+ }
+
+ return new Instance(loadedArtifact, null);
+ }
+
+ private ArtifactMap createArtifactsMap(final ClassLoader tccl) {
+ final ArtifactMap artifactMap = new ArtifactMap();
+ initArtifactMapFromClassLoader(artifactMap, tccl, BATCH_XML);
+ initArtifactMapFromClassLoader(artifactMap, tccl, BATCHEE_XML);
+ return artifactMap;
+ }
+
+ private ArtifactMap initArtifactMapFromClassLoader(final ArtifactMap map, final ClassLoader loader, final String name) {
+ final Enumeration<URL> urls;
+ try {
+ urls = loader.getResources(name);
+ } catch (final IOException e) {
+ // try it as fallback
+ final InputStream is = loader.getResourceAsStream(name);
+ if (is == null) {
+ return null;
+ }
+ populateArtifactMapFromStream(map, is);
+ return map;
+ }
+
+ final Collection<URL> parsedUrls = new LinkedList<URL>();
+ while (urls.hasMoreElements()) {
+ final URL url = urls.nextElement();
+ if (parsedUrls.contains(url)) { // can happen in weird classloaders graphs
+ continue;
+ }
+ parsedUrls.add(url);
+
+ final InputStream is;
+ try {
+ is = url.openStream();
+ } catch (final IOException e) {
+ throw new BatchContainerRuntimeException(e);
+ }
+
+ populateArtifactMapFromStream(map, is);
+ }
+ return map;
+ }
+
+ protected void populateArtifactMapFromStream(final ArtifactMap tempMap, final InputStream is) {
+ final XMLInputFactory xmlInputFactory = XMLInputFactory.newInstance();
+ try {
+ final XMLStreamReader xmlStreamReader = xmlInputFactory.createXMLStreamReader(is);
+
+ boolean processedRoot = false;
+
+ // We are going to take advantage of the simplified structure of a
+ // line
+ // E.g.:
+ // <batch-artifacts>
+ // <item-processor id=MyItemProcessor class=jsr352/sample/MyItemProcessorImpl/>
+ // ..
+ // </batch-artifacts>
+ //
+ // and have much simpler logic than general-purpose parsing would
+ // require.
+ while (xmlStreamReader.hasNext()) {
+ int event = xmlStreamReader.next();
+
+ // Until we reach end of document
+ if (event == END_DOCUMENT) {
+ break;
+ }
+
+ // At this point we have either:
+ // A) just passed START_DOCUMENT, and are at START_ELEMENT for the root,
+ // <batch-artifacts>, or
+ // B) we have just passed END_ELEMENT for one of the artifacts which is a child of
+ // <batch-artifacts>.
+ //
+ // Only handle START_ELEMENT now so we can skip whitespace CHARACTERS events.
+ //
+ if (event == START_ELEMENT) {
+ if (!processedRoot) {
+ QName rootQName = xmlStreamReader.getName();
+ if (!rootQName.equals(BATCH_ROOT_ELEM)) {
+ throw new IllegalStateException("Expecting document with root element QName: " + BATCH_ROOT_ELEM
+ + ", but found root element with QName: " + rootQName);
+ } else {
+ processedRoot = true;
+ }
+ } else {
+
+ // Should only need localName
+ final String annotationShortName = xmlStreamReader.getLocalName();
+ final String id = xmlStreamReader.getAttributeValue(null, "id");
+ final String className = xmlStreamReader.getAttributeValue(null, "class");
+ tempMap.addEntry(annotationShortName, id, className);
+
+ // Ignore anything else (text/whitespace) within this
+ // element
+ while (event != END_ELEMENT) {
+ event = xmlStreamReader.next();
+ }
+ }
+ }
+ }
+ xmlStreamReader.close();
+ } catch (final Exception e) {
+ throw new RuntimeException(e);
+ } finally {
+ try {
+ is.close();
+ } catch (final IOException e) {
+ // no-op
+ }
+ }
+ }
+
+ private class ArtifactMap {
+ private Map<String, Class<?>> idToArtifactClassMap = new HashMap<String, Class<?>>();
+
+ // Maps to a list of types not a single type since there's no reason a single artifact couldn't be annotated
+ // with >1 batch artifact annotation type.
+ private Map<String, List<String>> idToArtifactTypeListMap = new HashMap<String, List<String>>();
+
+ /*
+ * Init already synchronized, so no need to synch further
+ */
+ private void addEntry(final String batchTypeName, final String id, final String className) {
+ try {
+ final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
+ if (!idToArtifactClassMap.containsKey(id)) {
+ Class<?> artifactClass = contextClassLoader.loadClass(className);
+
+ idToArtifactClassMap.put(id, artifactClass);
+ List<String> typeList = new ArrayList<String>();
+ typeList.add(batchTypeName);
+ idToArtifactTypeListMap.put(id, typeList);
+ } else {
+ final Class<?> artifactClass = contextClassLoader.loadClass(className);
+
+ // Already contains entry for this 'id', let's make sure it's the same Class
+ // which thus must implement >1 batch artifact "type" (i.e. contains >1 batch artifact annotation).
+ if (!idToArtifactClassMap.get(id).equals(artifactClass)) {
+ throw new IllegalArgumentException("Already loaded a different class for id = " + id);
+ }
+ List<String> typeList = idToArtifactTypeListMap.get(id);
+ typeList.add(batchTypeName);
+ }
+ } catch (final Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private Object getArtifactById(final String id) {
+ Object artifactInstance = null;
+
+ try {
+ final Class<?> clazz = idToArtifactClassMap.get(id);
+ if (clazz != null) {
+ artifactInstance = (idToArtifactClassMap.get(id)).newInstance();
+ }
+ } catch (final IllegalAccessException e) {
+ throw new BatchContainerRuntimeException("Tried but failed to load artifact with id: " + id, e);
+ } catch (final InstantiationException e) {
+ throw new BatchContainerRuntimeException("Tried but failed to load artifact with id: " + id, e);
+ }
+
+
+ return artifactInstance;
+ }
+ }
+
+ @Override
+ public void init(final Properties batchConfig) throws BatchContainerServiceException {
+ // no-op
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/jbatch/src/main/java/org/apache/batchee/container/services/kernel/DefaultBatchKernel.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/services/kernel/DefaultBatchKernel.java b/jbatch/src/main/java/org/apache/batchee/container/services/kernel/DefaultBatchKernel.java
new file mode 100755
index 0000000..1015739
--- /dev/null
+++ b/jbatch/src/main/java/org/apache/batchee/container/services/kernel/DefaultBatchKernel.java
@@ -0,0 +1,288 @@
+/*
+ * Copyright 2012 International Business Machines Corp.
+ *
+ * See the NOTICE file distributed with this work for additional information
+ * regarding copyright ownership. Licensed under the Apache License,
+ * Version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.batchee.container.services.kernel;
+
+import org.apache.batchee.container.ThreadRootController;
+import org.apache.batchee.container.exception.BatchContainerServiceException;
+import org.apache.batchee.container.impl.jobinstance.JobExecutionHelper;
+import org.apache.batchee.container.impl.jobinstance.RuntimeFlowInSplitExecution;
+import org.apache.batchee.container.impl.jobinstance.RuntimeJobExecution;
+import org.apache.batchee.container.services.BatchKernelService;
+import org.apache.batchee.container.services.InternalJobExecution;
+import org.apache.batchee.container.services.ServicesManager;
+import org.apache.batchee.container.util.BatchFlowInSplitWorkUnit;
+import org.apache.batchee.container.util.BatchPartitionWorkUnit;
+import org.apache.batchee.container.util.BatchWorkUnit;
+import org.apache.batchee.container.util.FlowInSplitBuilderConfig;
+import org.apache.batchee.container.util.PartitionsBuilderConfig;
+import org.apache.batchee.jaxb.JSLJob;
+import org.apache.batchee.spi.BatchThreadPoolService;
+import org.apache.batchee.spi.PersistenceManagerService;
+
+import javax.batch.operations.JobExecutionAlreadyCompleteException;
+import javax.batch.operations.JobExecutionNotMostRecentException;
+import javax.batch.operations.JobExecutionNotRunningException;
+import javax.batch.operations.JobRestartException;
+import javax.batch.operations.JobStartException;
+import javax.batch.operations.NoSuchJobExecutionException;
+import javax.batch.runtime.JobInstance;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class DefaultBatchKernel implements BatchKernelService {
+ private final Map<Long, ThreadRootController> executionId2jobControllerMap = new ConcurrentHashMap<Long, ThreadRootController>();
+ private final Set<Long> instanceIdExecutingSet = new HashSet<Long>();
+
+ private final BatchThreadPoolService executorService;
+ private final PersistenceManagerService persistenceService;
+
+ public DefaultBatchKernel() {
+ executorService = ServicesManager.service(BatchThreadPoolService.class);
+ persistenceService = ServicesManager.service(PersistenceManagerService.class);
+ }
+
+ @Override
+ public void init(final Properties pgcConfig) throws BatchContainerServiceException {
+ // no-op
+ }
+
+ @Override
+ public InternalJobExecution startJob(final String jobXML, final Properties jobParameters) throws JobStartException {
+ final RuntimeJobExecution jobExecution = JobExecutionHelper.startJob(jobXML, jobParameters);
+
+ // TODO - register with status manager
+
+ final BatchWorkUnit batchWork = new BatchWorkUnit(this, jobExecution);
+ registerCurrentInstanceAndExecution(jobExecution, batchWork.getController());
+
+ executorService.executeTask(batchWork, null);
+
+ return jobExecution.getJobOperatorJobExecution();
+ }
+
+ @Override
+ public void stopJob(final long executionId) throws NoSuchJobExecutionException, JobExecutionNotRunningException {
+
+ final ThreadRootController controller = this.executionId2jobControllerMap.get(executionId);
+ if (controller == null) {
+ throw new JobExecutionNotRunningException("JobExecution with execution id of " + executionId + "is not running.");
+ }
+ controller.stop();
+ }
+
+ @Override
+ public InternalJobExecution restartJob(final long executionId, final Properties jobOverrideProps) throws JobRestartException, JobExecutionAlreadyCompleteException, JobExecutionNotMostRecentException, NoSuchJobExecutionException {
+ final RuntimeJobExecution jobExecution = JobExecutionHelper.restartJob(executionId, jobOverrideProps);
+ final BatchWorkUnit batchWork = new BatchWorkUnit(this, jobExecution);
+
+ registerCurrentInstanceAndExecution(jobExecution, batchWork.getController());
+
+ executorService.executeTask(batchWork, null);
+
+ return jobExecution.getJobOperatorJobExecution();
+ }
+
+ @Override
+ public void jobExecutionDone(final RuntimeJobExecution jobExecution) {
+ // Remove from executionId, instanceId map,set after job is done
+ this.executionId2jobControllerMap.remove(jobExecution.getExecutionId());
+ this.instanceIdExecutingSet.remove(jobExecution.getInstanceId());
+
+ for (final Closeable closeable : jobExecution.getReleasables()) { // release CDI beans for instance
+ try {
+ closeable.close();
+ } catch (final IOException e) {
+ // no-op
+ }
+ }
+
+ // AJM: ah - purge jobExecution from map here and flush to DB?
+ // edit: no long want a 2 tier for the jobexecution...do want it for step execution
+ // renamed method to flushAndRemoveStepExecution
+
+ }
+
+ public InternalJobExecution getJobExecution(final long executionId) throws NoSuchJobExecutionException {
+ return JobExecutionHelper.getPersistedJobOperatorJobExecution(executionId);
+ }
+
+ @Override
+ public void startGeneratedJob(final BatchWorkUnit batchWork) {
+ executorService.executeTask(batchWork, null);
+ }
+
+ @Override
+ public int getJobInstanceCount(final String jobName) {
+ return persistenceService.jobOperatorGetJobInstanceCount(jobName);
+ }
+
+ @Override
+ public JobInstance getJobInstance(final long executionId) {
+ return JobExecutionHelper.getJobInstance(executionId);
+ }
+
+
+ /**
+ * Build a list of batch work units and set them up in STARTING state but don't start them yet.
+ */
+
+ @Override
+ public List<BatchPartitionWorkUnit> buildNewParallelPartitions(final PartitionsBuilderConfig config)
+ throws JobRestartException, JobStartException {
+
+ final List<JSLJob> jobModels = config.getJobModels();
+ final Properties[] partitionPropertiesArray = config.getPartitionProperties();
+ final List<BatchPartitionWorkUnit> batchWorkUnits = new ArrayList<BatchPartitionWorkUnit>(jobModels.size());
+
+ int instance = 0;
+ for (final JSLJob parallelJob : jobModels) {
+ final Properties partitionProps = (partitionPropertiesArray == null) ? null : partitionPropertiesArray[instance];
+ final RuntimeJobExecution jobExecution = JobExecutionHelper.startPartition(parallelJob, partitionProps);
+ jobExecution.setPartitionInstance(instance);
+
+ final BatchPartitionWorkUnit batchWork = new BatchPartitionWorkUnit(this, jobExecution, config);
+
+ registerCurrentInstanceAndExecution(jobExecution, batchWork.getController());
+
+ batchWorkUnits.add(batchWork);
+ instance++;
+ }
+
+ return batchWorkUnits;
+ }
+
+ @Override
+ public List<BatchPartitionWorkUnit> buildOnRestartParallelPartitions(final PartitionsBuilderConfig config) throws JobRestartException, JobExecutionAlreadyCompleteException, JobExecutionNotMostRecentException {
+
+ final List<JSLJob> jobModels = config.getJobModels();
+ final Properties[] partitionProperties = config.getPartitionProperties();
+ final List<BatchPartitionWorkUnit> batchWorkUnits = new ArrayList<BatchPartitionWorkUnit>(jobModels.size());
+
+ //for now let always use a Properties array. We can add some more convenience methods later for null properties and what not
+
+ int instance = 0;
+ for (final JSLJob parallelJob : jobModels) {
+
+ final Properties partitionProps = (partitionProperties == null) ? null : partitionProperties[instance];
+
+ try {
+ final long execId = getMostRecentExecutionId(parallelJob);
+ final RuntimeJobExecution jobExecution;
+ try {
+ jobExecution = JobExecutionHelper.restartPartition(execId, parallelJob, partitionProps);
+ jobExecution.setPartitionInstance(instance);
+ } catch (final NoSuchJobExecutionException e) {
+ throw new IllegalStateException("Caught NoSuchJobExecutionException but this is an internal JobExecution so this shouldn't have happened: execId =" + execId, e);
+ }
+
+ final BatchPartitionWorkUnit batchWork = new BatchPartitionWorkUnit(this, jobExecution, config);
+ registerCurrentInstanceAndExecution(jobExecution, batchWork.getController());
+
+ batchWorkUnits.add(batchWork);
+ } catch (final JobExecutionAlreadyCompleteException e) {
+ // no-op
+ }
+
+ instance++;
+ }
+
+ return batchWorkUnits;
+ }
+
+ @Override
+ public void restartGeneratedJob(final BatchWorkUnit batchWork) throws JobRestartException {
+ executorService.executeTask(batchWork, null);
+ }
+
+ @Override
+ public BatchFlowInSplitWorkUnit buildNewFlowInSplitWorkUnit(final FlowInSplitBuilderConfig config) {
+ final JSLJob parallelJob = config.getJobModel();
+
+ final RuntimeFlowInSplitExecution execution = JobExecutionHelper.startFlowInSplit(parallelJob);
+ final BatchFlowInSplitWorkUnit batchWork = new BatchFlowInSplitWorkUnit(this, execution, config);
+
+ registerCurrentInstanceAndExecution(execution, batchWork.getController());
+ return batchWork;
+ }
+
+ private long getMostRecentExecutionId(final JSLJob jobModel) {
+
+ //There can only be one instance associated with a subjob's id since it is generated from an unique
+ //job instance id. So there should be no way to directly start a subjob with particular
+ final List<Long> instanceIds = persistenceService.jobOperatorGetJobInstanceIds(jobModel.getId(), 0, 2);
+
+ // Maybe we should blow up on '0' too?
+ if (instanceIds.size() > 1) {
+ throw new IllegalStateException("Found " + instanceIds.size() + " entries for instance id = " + jobModel.getId() + ", which should not have happened. Blowing up.");
+ }
+
+ final List<InternalJobExecution> partitionExecs = persistenceService.jobOperatorGetJobExecutions(instanceIds.get(0));
+
+ Long execId = Long.MIN_VALUE;
+ for (final InternalJobExecution partitionExec : partitionExecs) {
+ if (partitionExec.getExecutionId() > execId) {
+ execId = partitionExec.getExecutionId();
+ }
+ }
+ return execId;
+ }
+
+ @Override
+ public BatchFlowInSplitWorkUnit buildOnRestartFlowInSplitWorkUnit(final FlowInSplitBuilderConfig config)
+ throws JobRestartException, JobExecutionAlreadyCompleteException, JobExecutionNotMostRecentException {
+
+ final JSLJob jobModel = config.getJobModel();
+ final long execId = getMostRecentExecutionId(jobModel);
+ final RuntimeFlowInSplitExecution jobExecution;
+ try {
+ jobExecution = JobExecutionHelper.restartFlowInSplit(execId, jobModel);
+ } catch (final NoSuchJobExecutionException e) {
+ throw new IllegalStateException("Caught NoSuchJobExecutionException but this is an internal JobExecution so this shouldn't have happened: execId =" + execId, e);
+ }
+
+ final BatchFlowInSplitWorkUnit batchWork = new BatchFlowInSplitWorkUnit(this, jobExecution, config);
+
+ registerCurrentInstanceAndExecution(jobExecution, batchWork.getController());
+ return batchWork;
+ }
+
+ private void registerCurrentInstanceAndExecution(final RuntimeJobExecution jobExecution, final ThreadRootController controller) {
+ final long execId = jobExecution.getExecutionId();
+ final long instanceId = jobExecution.getInstanceId();
+ final String errorPrefix = "Tried to execute with Job executionId = " + execId + " and instanceId = " + instanceId + " ";
+ if (executionId2jobControllerMap.get(execId) != null) {
+ throw new IllegalStateException(errorPrefix + "but executionId is already currently executing.");
+ } else if (instanceIdExecutingSet.contains(instanceId)) {
+ throw new IllegalStateException(errorPrefix + "but another execution with this instanceId is already currently executing.");
+ } else {
+ instanceIdExecutingSet.add(instanceId);
+ executionId2jobControllerMap.put(jobExecution.getExecutionId(), controller);
+ }
+ }
+
+ @Override
+ public boolean isExecutionRunning(final long executionId) {
+ return executionId2jobControllerMap.containsKey(executionId);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/jbatch/src/main/java/org/apache/batchee/container/services/loader/DefaultJobXMLLoaderService.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/services/loader/DefaultJobXMLLoaderService.java b/jbatch/src/main/java/org/apache/batchee/container/services/loader/DefaultJobXMLLoaderService.java
new file mode 100755
index 0000000..be8fc04
--- /dev/null
+++ b/jbatch/src/main/java/org/apache/batchee/container/services/loader/DefaultJobXMLLoaderService.java
@@ -0,0 +1,75 @@
+/*
+ * Copyright 2012 International Business Machines Corp.
+ *
+ * See the NOTICE file distributed with this work for additional information
+ * regarding copyright ownership. Licensed under the Apache License,
+ * Version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.batchee.container.services.loader;
+
+import org.apache.batchee.container.exception.BatchContainerRuntimeException;
+import org.apache.batchee.container.exception.BatchContainerServiceException;
+import org.apache.batchee.spi.JobXMLLoaderService;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Properties;
+
+public class DefaultJobXMLLoaderService implements JobXMLLoaderService {
+ private static final String PREFIX = "META-INF/batch-jobs/";
+
+ @Override
+ public String loadJSL(final String id) {
+ final String jobXML = loadJobFromBatchJobs(id);
+ if (jobXML == null) {
+ throw new BatchContainerServiceException("Could not load job xml with id: " + id);
+ }
+ return jobXML;
+
+ }
+
+
+ private static String loadJobFromBatchJobs(final String id) {
+ final ClassLoader tccl = Thread.currentThread().getContextClassLoader();
+ final String relativePath = PREFIX + id + ".xml";
+ final InputStream stream = tccl.getResourceAsStream(relativePath);
+ if (stream == null) {
+ throw new BatchContainerRuntimeException(new FileNotFoundException(
+ "Cannot find an XML file under " + PREFIX + " with the following name " + id + ".xml"));
+ }
+
+ return readJobXML(stream);
+ }
+
+
+ private static String readJobXML(final InputStream stream) {
+ final StringBuilder out = new StringBuilder();
+ try {
+ final byte[] b = new byte[4096];
+ for (int i; (i = stream.read(b)) != -1; ) {
+ out.append(new String(b, 0, i));
+ }
+ } catch (final FileNotFoundException e) {
+ throw new BatchContainerServiceException(e);
+ } catch (final IOException e) {
+ throw new BatchContainerServiceException(e);
+ }
+ return out.toString();
+ }
+
+
+ @Override
+ public void init(final Properties batchConfig) throws BatchContainerServiceException {
+ // no-op
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/jbatch/src/main/java/org/apache/batchee/container/services/locator/ClassLoaderLocator.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/services/locator/ClassLoaderLocator.java b/jbatch/src/main/java/org/apache/batchee/container/services/locator/ClassLoaderLocator.java
new file mode 100644
index 0000000..dbea45f
--- /dev/null
+++ b/jbatch/src/main/java/org/apache/batchee/container/services/locator/ClassLoaderLocator.java
@@ -0,0 +1,65 @@
+/*
+ *
+ * Copyright 2012,2013 International Business Machines Corp.
+ *
+ * See the NOTICE file distributed with this work for additional information
+ * regarding copyright ownership. Licensed under the Apache License,
+ * Version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.batchee.container.services.locator;
+
+import org.apache.batchee.container.services.ServicesManager;
+
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * This is not the default since it could create memory leaks
+ * But in a server it is quite easy to use setServiceManager/resetServiceManager
+ * to handle it (either from the server itself or from an app in lightweight containers).
+ * Note: initializeServiceManager can be used instead of setServiceManager to use default behavior
+ */
+public class ClassLoaderLocator extends SingletonLocator {
+ private static final Map<ClassLoader, ServicesManager> MANAGERS = new ConcurrentHashMap<ClassLoader, ServicesManager>();
+
+ public static void setServiceManager(final ClassLoader key, final ServicesManager manager) {
+ MANAGERS.put(key, manager);
+ }
+
+ public static void initializeServiceManager(final ClassLoader key, final Properties props) {
+ final ServicesManager mgr = new ServicesManager();
+ mgr.init(props);
+ setServiceManager(key, mgr);
+ }
+
+ public static ServicesManager resetServiceManager(final ClassLoader key) {
+ return MANAGERS.remove(key);
+ }
+
+ @Override
+ public ServicesManager find() {
+ ClassLoader tccl = Thread.currentThread().getContextClassLoader();
+ if (tccl == null) {
+ return super.find();
+ }
+
+ do {
+ final ServicesManager mgr = MANAGERS.get(tccl);
+ if (mgr != null) {
+ return mgr;
+ }
+ tccl = tccl.getParent();
+ } while (tccl != null);
+ return super.find();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/jbatch/src/main/java/org/apache/batchee/container/services/locator/SingletonLocator.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/services/locator/SingletonLocator.java b/jbatch/src/main/java/org/apache/batchee/container/services/locator/SingletonLocator.java
new file mode 100644
index 0000000..2dfa61a
--- /dev/null
+++ b/jbatch/src/main/java/org/apache/batchee/container/services/locator/SingletonLocator.java
@@ -0,0 +1,35 @@
+/*
+ *
+ * Copyright 2012,2013 International Business Machines Corp.
+ *
+ * See the NOTICE file distributed with this work for additional information
+ * regarding copyright ownership. Licensed under the Apache License,
+ * Version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.batchee.container.services.locator;
+
+import org.apache.batchee.container.services.ServicesManager;
+import org.apache.batchee.container.services.ServicesManagerLocator;
+
+public class SingletonLocator implements ServicesManagerLocator {
+ public static final ServicesManagerLocator INSTANCE = new SingletonLocator();
+
+ private static final ServicesManager MANAGER = new ServicesManager();
+ static {
+ MANAGER.init(null);
+ }
+
+ @Override
+ public ServicesManager find() {
+ return MANAGER;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/jbatch/src/main/java/org/apache/batchee/container/services/package-info.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/services/package-info.java b/jbatch/src/main/java/org/apache/batchee/container/services/package-info.java
new file mode 100755
index 0000000..8698f13
--- /dev/null
+++ b/jbatch/src/main/java/org/apache/batchee/container/services/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2013 International Business Machines Corp.
+ *
+ * See the NOTICE file distributed with this work for additional information
+ * regarding copyright ownership. Licensed under the Apache License,
+ * Version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+/**
+ * This interfaces are tightly coupled to the existing
+ * batch implementation.
+ */
+package org.apache.batchee.container.services;
\ No newline at end of file