You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@batchee.apache.org by rm...@apache.org on 2013/11/05 08:39:02 UTC

[25/62] importing batchee from github - a fork from the IBM RI

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/jbatch/src/main/java/org/apache/batchee/container/services/factory/CDIBatchArtifactFactory.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/services/factory/CDIBatchArtifactFactory.java b/jbatch/src/main/java/org/apache/batchee/container/services/factory/CDIBatchArtifactFactory.java
new file mode 100755
index 0000000..e817479
--- /dev/null
+++ b/jbatch/src/main/java/org/apache/batchee/container/services/factory/CDIBatchArtifactFactory.java
@@ -0,0 +1,76 @@
+/*
+ * Copyright 2012 International Business Machines Corp.
+ * 
+ * See the NOTICE file distributed with this work for additional information
+ * regarding copyright ownership. Licensed under the Apache License, 
+ * Version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.batchee.container.services.factory;
+
+import org.apache.batchee.container.cdi.BatchCDIInjectionExtension;
+import org.apache.batchee.container.exception.BatchContainerServiceException;
+import org.apache.batchee.spi.BatchArtifactFactory;
+
+import javax.enterprise.context.Dependent;
+import javax.enterprise.context.spi.CreationalContext;
+import javax.enterprise.inject.spi.Bean;
+import javax.enterprise.inject.spi.BeanManager;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Properties;
+import java.util.Set;
+
+/**
+ * {@link BatchArtifactFactory} which resolves batch artifacts as named CDI beans and
+ * falls back to {@link DefaultBatchArtifactFactory} when no CDI container or no
+ * matching bean is available.
+ */
+public class CDIBatchArtifactFactory extends DefaultBatchArtifactFactory implements BatchArtifactFactory {
+    /**
+     * Loads the artifact registered under {@code batchId}.
+     *
+     * The id is first looked up as a CDI bean name. When there is no {@link BeanManager}
+     * or no bean resolves for that name, the lookup is delegated to the parent factory.
+     *
+     * Failures are no longer swallowed: exceptions raised by the CDI lookup or by the
+     * parent factory propagate to the caller, exactly as they do when
+     * {@link DefaultBatchArtifactFactory#load(String)} is used directly. Previously any
+     * exception was silently turned into a {@code null} return value, hiding the cause.
+     *
+     * @param batchId the artifact reference (CDI bean name, batch.xml id or class name)
+     * @return the artifact instance, with a release hook when the bean is not normal-scoped
+     */
+    @Override
+    public Instance load(final String batchId) {
+        final BeanManager bm = getBeanManager();
+        if (bm == null) {
+            return super.load(batchId);
+        }
+
+        final Set<Bean<?>> beans = bm.getBeans(batchId);
+        final Bean<?> bean = bm.resolve(beans);
+        if (bean == null) { // fallback to try to instantiate it from TCCL as per the spec
+            return super.load(batchId);
+        }
+
+        final Class<?> clazz = bean.getBeanClass();
+        final CreationalContext<?> creationalContext = bm.createCreationalContext(bean);
+        final Object artifactInstance = bm.getReference(bean, clazz, creationalContext);
+        if (Dependent.class.equals(bean.getScope()) || !bm.isNormalScope(bean.getScope())) { // need to be released
+            // the creational context is kept so the caller can release the instance once done with it
+            return new Instance(artifactInstance, new Closeable() {
+                @Override
+                public void close() throws IOException {
+                    creationalContext.release();
+                }
+            });
+        }
+        // no release hook is attached for normal-scoped beans
+        return new Instance(artifactInstance, null);
+    }
+
+    /**
+     * No configuration is needed by this factory.
+     *
+     * @param batchConfig ignored
+     */
+    @Override
+    public void init(final Properties batchConfig) throws BatchContainerServiceException {
+        // no-op
+    }
+
+    /**
+     * Looks up the {@link BeanManager} through {@link BatchCDIInjectionExtension}.
+     *
+     * @return the bean manager, or {@code null} when the extension instance is not available
+     */
+    protected BeanManager getBeanManager() {
+        final BatchCDIInjectionExtension instance = BatchCDIInjectionExtension.getInstance();
+        if (instance == null) {
+            return null;
+        }
+        return instance.getBeanManager();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/jbatch/src/main/java/org/apache/batchee/container/services/factory/DefaultBatchArtifactFactory.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/services/factory/DefaultBatchArtifactFactory.java b/jbatch/src/main/java/org/apache/batchee/container/services/factory/DefaultBatchArtifactFactory.java
new file mode 100755
index 0000000..5171aa8
--- /dev/null
+++ b/jbatch/src/main/java/org/apache/batchee/container/services/factory/DefaultBatchArtifactFactory.java
@@ -0,0 +1,246 @@
+/*
+ * Copyright 2012 International Business Machines Corp.
+ * 
+ * See the NOTICE file distributed with this work for additional information
+ * regarding copyright ownership. Licensed under the Apache License, 
+ * Version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.batchee.container.services.factory;
+
+import org.apache.batchee.container.exception.BatchContainerRuntimeException;
+import org.apache.batchee.container.exception.BatchContainerServiceException;
+import org.apache.batchee.container.proxy.ProxyFactory;
+import org.apache.batchee.container.util.DependencyInjections;
+import org.apache.batchee.spi.BatchArtifactFactory;
+
+import javax.xml.namespace.QName;
+import javax.xml.stream.XMLInputFactory;
+import javax.xml.stream.XMLStreamConstants;
+import javax.xml.stream.XMLStreamReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+public class DefaultBatchArtifactFactory implements BatchArtifactFactory, XMLStreamConstants {
+    private final static String BATCH_XML = "META-INF/batch.xml";
+    private final static String BATCHEE_XML = "META-INF/batchee.xml"; // used for out extensions to get short names, spec doesn't impose to read multiple batch.xml so using it as a workaround
+    private final static QName BATCH_ROOT_ELEM = new QName("http://xmlns.jcp.org/xml/ns/javaee", "batch-artifacts");
+
+    // Uses TCCL
+    @Override
+    public Instance load(final String batchId) {
+        final ClassLoader tccl = Thread.currentThread().getContextClassLoader();
+        final ArtifactMap artifactMap = createArtifactsMap(tccl);
+
+        Object loadedArtifact = artifactMap.getArtifactById(batchId);
+        if (loadedArtifact == null) {
+            try {
+                final Class<?> artifactClass = tccl.loadClass(batchId);
+                if (artifactClass != null) {
+                    loadedArtifact = artifactClass.newInstance();
+                }
+            } catch (final ClassNotFoundException e) {
+                throw new BatchContainerRuntimeException("Tried but failed to load artifact with id: " + batchId, e);
+            } catch (final InstantiationException e) {
+                throw new BatchContainerRuntimeException("Tried but failed to load artifact with id: " + batchId, e);
+            } catch (final IllegalAccessException e) {
+                throw new BatchContainerRuntimeException("Tried but failed to load artifact with id: " + batchId, e);
+            }
+        }
+
+        if (ProxyFactory.getInjectionReferences() != null) {
+            DependencyInjections.injectReferences(loadedArtifact, ProxyFactory.getInjectionReferences());
+        }
+
+        return new Instance(loadedArtifact, null);
+    }
+
+    private ArtifactMap createArtifactsMap(final ClassLoader tccl) {
+        final ArtifactMap artifactMap = new ArtifactMap();
+        initArtifactMapFromClassLoader(artifactMap, tccl, BATCH_XML);
+        initArtifactMapFromClassLoader(artifactMap, tccl, BATCHEE_XML);
+        return artifactMap;
+    }
+
+    private ArtifactMap initArtifactMapFromClassLoader(final ArtifactMap map, final ClassLoader loader, final String name) {
+        final Enumeration<URL> urls;
+        try {
+            urls = loader.getResources(name);
+        } catch (final IOException e) {
+            // try it as fallback
+            final InputStream is = loader.getResourceAsStream(name);
+            if (is == null) {
+                return null;
+            }
+            populateArtifactMapFromStream(map, is);
+            return map;
+        }
+
+        final Collection<URL> parsedUrls = new LinkedList<URL>();
+        while (urls.hasMoreElements()) {
+            final URL url = urls.nextElement();
+            if (parsedUrls.contains(url)) { // can happen in weird classloaders graphs
+                continue;
+            }
+            parsedUrls.add(url);
+
+            final InputStream is;
+            try {
+                is = url.openStream();
+            } catch (final IOException e) {
+                throw new BatchContainerRuntimeException(e);
+            }
+
+            populateArtifactMapFromStream(map, is);
+        }
+        return map;
+    }
+
+    protected void populateArtifactMapFromStream(final ArtifactMap tempMap, final InputStream is) {
+        final XMLInputFactory xmlInputFactory = XMLInputFactory.newInstance();
+        try {
+            final XMLStreamReader xmlStreamReader = xmlInputFactory.createXMLStreamReader(is);
+
+            boolean processedRoot = false;
+
+            // We are going to take advantage of the simplified structure of a
+            // line
+            // E.g.:
+            // <batch-artifacts>
+            //   <item-processor id=MyItemProcessor class=jsr352/sample/MyItemProcessorImpl/>
+            //   ..
+            // </batch-artifacts>
+            //
+            // and have much simpler logic than general-purpose parsing would
+            // require.
+            while (xmlStreamReader.hasNext()) {
+                int event = xmlStreamReader.next();
+
+                // Until we reach end of document
+                if (event == END_DOCUMENT) {
+                    break;
+                }
+
+                // At this point we have either:
+                //    A) just passed START_DOCUMENT, and are at START_ELEMENT for the root,
+                //       <batch-artifacts>, or
+                //    B) we have just passed END_ELEMENT for one of the artifacts which is a child of
+                //       <batch-artifacts>.
+                //
+                //  Only handle START_ELEMENT now so we can skip whitespace CHARACTERS events.
+                //
+                if (event == START_ELEMENT) {
+                    if (!processedRoot) {
+                        QName rootQName = xmlStreamReader.getName();
+                        if (!rootQName.equals(BATCH_ROOT_ELEM)) {
+                            throw new IllegalStateException("Expecting document with root element QName: " + BATCH_ROOT_ELEM
+                                + ", but found root element with QName: " + rootQName);
+                        } else {
+                            processedRoot = true;
+                        }
+                    } else {
+
+                        // Should only need localName
+                        final String annotationShortName = xmlStreamReader.getLocalName();
+                        final String id = xmlStreamReader.getAttributeValue(null, "id");
+                        final String className = xmlStreamReader.getAttributeValue(null, "class");
+                        tempMap.addEntry(annotationShortName, id, className);
+
+                        // Ignore anything else (text/whitespace) within this
+                        // element
+                        while (event != END_ELEMENT) {
+                            event = xmlStreamReader.next();
+                        }
+                    }
+                }
+            }
+            xmlStreamReader.close();
+        } catch (final Exception e) {
+            throw new RuntimeException(e);
+        } finally {
+            try {
+                is.close();
+            } catch (final IOException e) {
+                // no-op
+            }
+        }
+    }
+
+    private class ArtifactMap {
+        private Map<String, Class<?>> idToArtifactClassMap = new HashMap<String, Class<?>>();
+
+        // Maps to a list of types not a single type since there's no reason a single artifact couldn't be annotated
+        // with >1 batch artifact annotation type.
+        private Map<String, List<String>> idToArtifactTypeListMap = new HashMap<String, List<String>>();
+
+        /*
+         * Init already synchronized, so no need to synch further
+         */
+        private void addEntry(final String batchTypeName, final String id, final String className) {
+            try {
+                final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
+                if (!idToArtifactClassMap.containsKey(id)) {
+                    Class<?> artifactClass = contextClassLoader.loadClass(className);
+
+                    idToArtifactClassMap.put(id, artifactClass);
+                    List<String> typeList = new ArrayList<String>();
+                    typeList.add(batchTypeName);
+                    idToArtifactTypeListMap.put(id, typeList);
+                } else {
+                    final Class<?> artifactClass = contextClassLoader.loadClass(className);
+
+                    // Already contains entry for this 'id', let's make sure it's the same Class
+                    // which thus must implement >1 batch artifact "type" (i.e. contains >1 batch artifact annotation).
+                    if (!idToArtifactClassMap.get(id).equals(artifactClass)) {
+                        throw new IllegalArgumentException("Already loaded a different class for id = " + id);
+                    }
+                    List<String> typeList = idToArtifactTypeListMap.get(id);
+                    typeList.add(batchTypeName);
+                }
+            } catch (final Exception e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        private Object getArtifactById(final String id) {
+            Object artifactInstance = null;
+
+            try {
+                final Class<?> clazz = idToArtifactClassMap.get(id);
+                if (clazz != null) {
+                    artifactInstance = (idToArtifactClassMap.get(id)).newInstance();
+                }
+            } catch (final IllegalAccessException e) {
+                throw new BatchContainerRuntimeException("Tried but failed to load artifact with id: " + id, e);
+            } catch (final InstantiationException e) {
+                throw new BatchContainerRuntimeException("Tried but failed to load artifact with id: " + id, e);
+            }
+
+
+            return artifactInstance;
+        }
+    }
+
+    @Override
+    public void init(final Properties batchConfig) throws BatchContainerServiceException {
+        // no-op
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/jbatch/src/main/java/org/apache/batchee/container/services/kernel/DefaultBatchKernel.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/services/kernel/DefaultBatchKernel.java b/jbatch/src/main/java/org/apache/batchee/container/services/kernel/DefaultBatchKernel.java
new file mode 100755
index 0000000..1015739
--- /dev/null
+++ b/jbatch/src/main/java/org/apache/batchee/container/services/kernel/DefaultBatchKernel.java
@@ -0,0 +1,288 @@
+/*
+ * Copyright 2012 International Business Machines Corp.
+ * 
+ * See the NOTICE file distributed with this work for additional information
+ * regarding copyright ownership. Licensed under the Apache License, 
+ * Version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.batchee.container.services.kernel;
+
+import org.apache.batchee.container.ThreadRootController;
+import org.apache.batchee.container.exception.BatchContainerServiceException;
+import org.apache.batchee.container.impl.jobinstance.JobExecutionHelper;
+import org.apache.batchee.container.impl.jobinstance.RuntimeFlowInSplitExecution;
+import org.apache.batchee.container.impl.jobinstance.RuntimeJobExecution;
+import org.apache.batchee.container.services.BatchKernelService;
+import org.apache.batchee.container.services.InternalJobExecution;
+import org.apache.batchee.container.services.ServicesManager;
+import org.apache.batchee.container.util.BatchFlowInSplitWorkUnit;
+import org.apache.batchee.container.util.BatchPartitionWorkUnit;
+import org.apache.batchee.container.util.BatchWorkUnit;
+import org.apache.batchee.container.util.FlowInSplitBuilderConfig;
+import org.apache.batchee.container.util.PartitionsBuilderConfig;
+import org.apache.batchee.jaxb.JSLJob;
+import org.apache.batchee.spi.BatchThreadPoolService;
+import org.apache.batchee.spi.PersistenceManagerService;
+
+import javax.batch.operations.JobExecutionAlreadyCompleteException;
+import javax.batch.operations.JobExecutionNotMostRecentException;
+import javax.batch.operations.JobExecutionNotRunningException;
+import javax.batch.operations.JobRestartException;
+import javax.batch.operations.JobStartException;
+import javax.batch.operations.NoSuchJobExecutionException;
+import javax.batch.runtime.JobInstance;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Default {@link BatchKernelService}: creates job, partition and flow executions,
+ * tracks which executions and job instances are currently registered, and hands the
+ * resulting work units to the {@link BatchThreadPoolService}.
+ */
+public class DefaultBatchKernel implements BatchKernelService {
+    private final Map<Long, ThreadRootController> executionId2jobControllerMap = new ConcurrentHashMap<Long, ThreadRootController>();
+
+    // Not a concurrent collection: it is only read or modified while holding its own monitor,
+    // which also makes the "check then register" sequence atomic.
+    // NOTE(review): work units are handed to the thread pool service, so completion callbacks
+    // presumably arrive on other threads than the ones registering executions.
+    private final Set<Long> instanceIdExecutingSet = new HashSet<Long>();
+
+    private final BatchThreadPoolService executorService;
+    private final PersistenceManagerService persistenceService;
+
+    /**
+     * Looks up the thread pool and persistence services through {@link ServicesManager}.
+     */
+    public DefaultBatchKernel() {
+        executorService = ServicesManager.service(BatchThreadPoolService.class);
+        persistenceService = ServicesManager.service(PersistenceManagerService.class);
+    }
+
+    /**
+     * No configuration is needed by this service.
+     *
+     * @param pgcConfig ignored
+     */
+    @Override
+    public void init(final Properties pgcConfig) throws BatchContainerServiceException {
+        // no-op
+    }
+
+    /**
+     * Creates a new execution for the given job XML, registers it and submits it for execution.
+     *
+     * @return the job operator view of the new execution
+     * @throws IllegalStateException if the execution or its instance is already registered
+     */
+    @Override
+    public InternalJobExecution startJob(final String jobXML, final Properties jobParameters) throws JobStartException {
+        final RuntimeJobExecution jobExecution = JobExecutionHelper.startJob(jobXML, jobParameters);
+
+        // TODO - register with status manager
+
+        final BatchWorkUnit batchWork = new BatchWorkUnit(this, jobExecution);
+        registerCurrentInstanceAndExecution(jobExecution, batchWork.getController());
+
+        executorService.executeTask(batchWork, null);
+
+        return jobExecution.getJobOperatorJobExecution();
+    }
+
+    /**
+     * Asks the controller of a registered execution to stop.
+     *
+     * @throws JobExecutionNotRunningException if no controller is registered for {@code executionId}
+     */
+    @Override
+    public void stopJob(final long executionId) throws NoSuchJobExecutionException, JobExecutionNotRunningException {
+        final ThreadRootController controller = this.executionId2jobControllerMap.get(executionId);
+        if (controller == null) {
+            throw new JobExecutionNotRunningException("JobExecution with execution id of " + executionId + " is not running.");
+        }
+        controller.stop();
+    }
+
+    /**
+     * Creates a restart execution for {@code executionId}, registers it and submits it for execution.
+     *
+     * @return the job operator view of the restarted execution
+     * @throws IllegalStateException if the execution or its instance is already registered
+     */
+    @Override
+    public InternalJobExecution restartJob(final long executionId, final Properties jobOverrideProps) throws JobRestartException, JobExecutionAlreadyCompleteException, JobExecutionNotMostRecentException, NoSuchJobExecutionException {
+        final RuntimeJobExecution jobExecution = JobExecutionHelper.restartJob(executionId, jobOverrideProps);
+        final BatchWorkUnit batchWork = new BatchWorkUnit(this, jobExecution);
+
+        registerCurrentInstanceAndExecution(jobExecution, batchWork.getController());
+
+        executorService.executeTask(batchWork, null);
+
+        return jobExecution.getJobOperatorJobExecution();
+    }
+
+    /**
+     * Unregisters a finished execution and closes the resources attached to it.
+     */
+    @Override
+    public void jobExecutionDone(final RuntimeJobExecution jobExecution) {
+        // Remove from executionId, instanceId map,set after job is done
+        synchronized (instanceIdExecutingSet) {
+            this.executionId2jobControllerMap.remove(jobExecution.getExecutionId());
+            this.instanceIdExecutingSet.remove(jobExecution.getInstanceId());
+        }
+
+        for (final Closeable closeable : jobExecution.getReleasables()) { // release CDI beans for instance
+            try {
+                closeable.close();
+            } catch (final IOException e) {
+                // no-op: best effort, keep releasing the remaining resources
+            }
+        }
+
+        // AJM: ah - purge jobExecution from map here and flush to DB?
+        // edit: no long want a 2 tier for the jobexecution...do want it for step execution
+        // renamed method to flushAndRemoveStepExecution
+    }
+
+    /**
+     * @return the persisted job operator view of the execution
+     */
+    public InternalJobExecution getJobExecution(final long executionId) throws NoSuchJobExecutionException {
+        return JobExecutionHelper.getPersistedJobOperatorJobExecution(executionId);
+    }
+
+    /**
+     * Submits an already built (and registered) work unit for execution.
+     */
+    @Override
+    public void startGeneratedJob(final BatchWorkUnit batchWork) {
+        executorService.executeTask(batchWork, null);
+    }
+
+    /**
+     * @return the number of instances the persistence service reports for {@code jobName}
+     */
+    @Override
+    public int getJobInstanceCount(final String jobName) {
+        return persistenceService.jobOperatorGetJobInstanceCount(jobName);
+    }
+
+    /**
+     * @return the job instance owning the given execution
+     */
+    @Override
+    public JobInstance getJobInstance(final long executionId) {
+        return JobExecutionHelper.getJobInstance(executionId);
+    }
+
+    /**
+     * Build a list of batch work units and set them up in STARTING state but don't start them yet.
+     *
+     * One work unit is created and registered per job model; the n-th model gets the n-th
+     * entry of the partition properties when they are provided.
+     */
+    @Override
+    public List<BatchPartitionWorkUnit> buildNewParallelPartitions(final PartitionsBuilderConfig config)
+        throws JobRestartException, JobStartException {
+
+        final List<JSLJob> jobModels = config.getJobModels();
+        final Properties[] partitionPropertiesArray = config.getPartitionProperties();
+        final List<BatchPartitionWorkUnit> batchWorkUnits = new ArrayList<BatchPartitionWorkUnit>(jobModels.size());
+
+        int instance = 0;
+        for (final JSLJob parallelJob : jobModels) {
+            final Properties partitionProps = (partitionPropertiesArray == null) ? null : partitionPropertiesArray[instance];
+            final RuntimeJobExecution jobExecution = JobExecutionHelper.startPartition(parallelJob, partitionProps);
+            jobExecution.setPartitionInstance(instance);
+
+            final BatchPartitionWorkUnit batchWork = new BatchPartitionWorkUnit(this, jobExecution, config);
+
+            registerCurrentInstanceAndExecution(jobExecution, batchWork.getController());
+
+            batchWorkUnits.add(batchWork);
+            instance++;
+        }
+
+        return batchWorkUnits;
+    }
+
+    /**
+     * Same as {@link #buildNewParallelPartitions(PartitionsBuilderConfig)} but restarting the most
+     * recent execution of each partition. Partitions which already completed are skipped, so
+     * the returned list can be shorter than the list of job models.
+     */
+    @Override
+    public List<BatchPartitionWorkUnit> buildOnRestartParallelPartitions(final PartitionsBuilderConfig config) throws JobRestartException, JobExecutionAlreadyCompleteException, JobExecutionNotMostRecentException {
+
+        final List<JSLJob> jobModels = config.getJobModels();
+        final Properties[] partitionProperties = config.getPartitionProperties();
+        final List<BatchPartitionWorkUnit> batchWorkUnits = new ArrayList<BatchPartitionWorkUnit>(jobModels.size());
+
+        //for now let always use a Properties array. We can add some more convenience methods later for null properties and what not
+
+        int instance = 0;
+        for (final JSLJob parallelJob : jobModels) {
+
+            final Properties partitionProps = (partitionProperties == null) ? null : partitionProperties[instance];
+
+            try {
+                final long execId = getMostRecentExecutionId(parallelJob);
+                final RuntimeJobExecution jobExecution;
+                try {
+                    jobExecution = JobExecutionHelper.restartPartition(execId, parallelJob, partitionProps);
+                    jobExecution.setPartitionInstance(instance);
+                } catch (final NoSuchJobExecutionException e) {
+                    throw new IllegalStateException("Caught NoSuchJobExecutionException but this is an internal JobExecution so this shouldn't have happened: execId =" + execId, e);
+                }
+
+                final BatchPartitionWorkUnit batchWork = new BatchPartitionWorkUnit(this, jobExecution, config);
+                registerCurrentInstanceAndExecution(jobExecution, batchWork.getController());
+
+                batchWorkUnits.add(batchWork);
+            } catch (final JobExecutionAlreadyCompleteException e) {
+                // no-op: this partition already completed, nothing to restart for it
+            }
+
+            instance++;
+        }
+
+        return batchWorkUnits;
+    }
+
+    /**
+     * Submits an already built (and registered) restart work unit for execution.
+     */
+    @Override
+    public void restartGeneratedJob(final BatchWorkUnit batchWork) throws JobRestartException {
+        executorService.executeTask(batchWork, null);
+    }
+
+    /**
+     * Creates and registers, without starting it, the work unit of a flow belonging to a split.
+     */
+    @Override
+    public BatchFlowInSplitWorkUnit buildNewFlowInSplitWorkUnit(final FlowInSplitBuilderConfig config) {
+        final JSLJob parallelJob = config.getJobModel();
+
+        final RuntimeFlowInSplitExecution execution = JobExecutionHelper.startFlowInSplit(parallelJob);
+        final BatchFlowInSplitWorkUnit batchWork = new BatchFlowInSplitWorkUnit(this, execution, config);
+
+        registerCurrentInstanceAndExecution(execution, batchWork.getController());
+        return batchWork;
+    }
+
+    /**
+     * Finds the highest execution id recorded for the single job instance of a generated sub-job.
+     *
+     * @return the highest execution id, or {@link Long#MIN_VALUE} if the instance has no execution
+     * @throws IllegalStateException if zero or more than one instance exist for the sub-job id
+     */
+    private long getMostRecentExecutionId(final JSLJob jobModel) {
+
+        //There can only be one instance associated with a subjob's id since it is generated from an unique
+        //job instance id. So there should be no way to directly start a subjob with particular
+        final List<Long> instanceIds = persistenceService.jobOperatorGetJobInstanceIds(jobModel.getId(), 0, 2);
+
+        if (instanceIds.size() > 1) {
+            throw new IllegalStateException("Found " + instanceIds.size() + " entries for instance id = " + jobModel.getId() + ", which should not have happened.  Blowing up.");
+        }
+        if (instanceIds.isEmpty()) {
+            // report the inconsistency explicitly instead of failing on get(0) below
+            throw new IllegalStateException("Found no entry for instance id = " + jobModel.getId() + ", which should not have happened.  Blowing up.");
+        }
+
+        final List<InternalJobExecution> partitionExecs = persistenceService.jobOperatorGetJobExecutions(instanceIds.get(0));
+
+        long execId = Long.MIN_VALUE;
+        for (final InternalJobExecution partitionExec : partitionExecs) {
+            if (partitionExec.getExecutionId() > execId) {
+                execId = partitionExec.getExecutionId();
+            }
+        }
+        return execId;
+    }
+
+    /**
+     * Creates and registers, without starting it, the work unit restarting the most recent
+     * execution of a flow belonging to a split.
+     */
+    @Override
+    public BatchFlowInSplitWorkUnit buildOnRestartFlowInSplitWorkUnit(final FlowInSplitBuilderConfig config)
+        throws JobRestartException, JobExecutionAlreadyCompleteException, JobExecutionNotMostRecentException {
+
+        final JSLJob jobModel = config.getJobModel();
+        final long execId = getMostRecentExecutionId(jobModel);
+        final RuntimeFlowInSplitExecution jobExecution;
+        try {
+            jobExecution = JobExecutionHelper.restartFlowInSplit(execId, jobModel);
+        } catch (final NoSuchJobExecutionException e) {
+            throw new IllegalStateException("Caught NoSuchJobExecutionException but this is an internal JobExecution so this shouldn't have happened: execId =" + execId, e);
+        }
+
+        final BatchFlowInSplitWorkUnit batchWork = new BatchFlowInSplitWorkUnit(this, jobExecution, config);
+
+        registerCurrentInstanceAndExecution(jobExecution, batchWork.getController());
+        return batchWork;
+    }
+
+    /**
+     * Records the execution and its instance as registered.
+     * The check and the registration are done atomically under the same monitor used by
+     * {@link #jobExecutionDone(RuntimeJobExecution)}.
+     *
+     * @throws IllegalStateException if the execution id or the instance id is already registered
+     */
+    private void registerCurrentInstanceAndExecution(final RuntimeJobExecution jobExecution, final ThreadRootController controller) {
+        final long execId = jobExecution.getExecutionId();
+        final long instanceId = jobExecution.getInstanceId();
+        final String errorPrefix = "Tried to execute with Job executionId = " + execId + " and instanceId = " + instanceId + " ";
+        synchronized (instanceIdExecutingSet) {
+            if (executionId2jobControllerMap.get(execId) != null) {
+                throw new IllegalStateException(errorPrefix + "but executionId is already currently executing.");
+            }
+            if (instanceIdExecutingSet.contains(instanceId)) {
+                throw new IllegalStateException(errorPrefix + "but another execution with this instanceId is already currently executing.");
+            }
+            instanceIdExecutingSet.add(instanceId);
+            executionId2jobControllerMap.put(execId, controller);
+        }
+    }
+
+    /**
+     * @return {@code true} while a controller is registered for {@code executionId}
+     */
+    @Override
+    public boolean isExecutionRunning(final long executionId) {
+        return executionId2jobControllerMap.containsKey(executionId);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/jbatch/src/main/java/org/apache/batchee/container/services/loader/DefaultJobXMLLoaderService.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/services/loader/DefaultJobXMLLoaderService.java b/jbatch/src/main/java/org/apache/batchee/container/services/loader/DefaultJobXMLLoaderService.java
new file mode 100755
index 0000000..be8fc04
--- /dev/null
+++ b/jbatch/src/main/java/org/apache/batchee/container/services/loader/DefaultJobXMLLoaderService.java
@@ -0,0 +1,75 @@
+/*
+ * Copyright 2012 International Business Machines Corp.
+ * 
+ * See the NOTICE file distributed with this work for additional information
+ * regarding copyright ownership. Licensed under the Apache License, 
+ * Version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.batchee.container.services.loader;
+
+import org.apache.batchee.container.exception.BatchContainerRuntimeException;
+import org.apache.batchee.container.exception.BatchContainerServiceException;
+import org.apache.batchee.spi.JobXMLLoaderService;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Properties;
+
+/**
+ * {@link JobXMLLoaderService} reading job definitions from
+ * {@code META-INF/batch-jobs/<id>.xml} through the thread context classloader.
+ */
+public class DefaultJobXMLLoaderService implements JobXMLLoaderService {
+    private static final String PREFIX = "META-INF/batch-jobs/";
+
+    /**
+     * Loads the job XML registered under {@code id}.
+     *
+     * @param id the job name, i.e. the descriptor file name without the {@code .xml} suffix
+     * @return the content of the job descriptor
+     * @throws BatchContainerRuntimeException if no descriptor exists for {@code id}
+     * @throws BatchContainerServiceException if the descriptor cannot be read
+     */
+    @Override
+    public String loadJSL(final String id) {
+        final String jobXML = loadJobFromBatchJobs(id);
+        if (jobXML == null) { // defensive only: the lookup below throws instead of returning null
+            throw new BatchContainerServiceException("Could not load job xml with id: " + id);
+        }
+        return jobXML;
+    }
+
+    /**
+     * Locates {@code META-INF/batch-jobs/<id>.xml} with the thread context classloader and reads it.
+     */
+    private static String loadJobFromBatchJobs(final String id) {
+        final ClassLoader tccl = Thread.currentThread().getContextClassLoader();
+        final String relativePath = PREFIX + id + ".xml";
+        final InputStream stream = tccl.getResourceAsStream(relativePath);
+        if (stream == null) {
+            throw new BatchContainerRuntimeException(new FileNotFoundException(
+                "Cannot find an XML file under " + PREFIX + " with the following name " + id + ".xml"));
+        }
+
+        return readJobXML(stream);
+    }
+
+    /**
+     * Reads the whole stream into a string and closes it.
+     *
+     * All bytes are collected before being decoded so that a multi-byte character which
+     * straddles two read chunks is not corrupted. Decoding still relies on the platform
+     * default charset, as before.
+     * NOTE(review): consider decoding with an explicit charset if job files are always UTF-8.
+     *
+     * @throws BatchContainerServiceException if reading fails
+     */
+    private static String readJobXML(final InputStream stream) {
+        final java.io.ByteArrayOutputStream content = new java.io.ByteArrayOutputStream();
+        try {
+            final byte[] b = new byte[4096];
+            for (int i; (i = stream.read(b)) != -1; ) {
+                content.write(b, 0, i);
+            }
+        } catch (final IOException e) { // includes FileNotFoundException
+            throw new BatchContainerServiceException(e);
+        } finally {
+            try {
+                stream.close();
+            } catch (final IOException e) {
+                // no-op: nothing useful can be done if closing fails
+            }
+        }
+        return content.toString();
+    }
+
+    /**
+     * No configuration is needed by this service.
+     *
+     * @param batchConfig ignored
+     */
+    @Override
+    public void init(final Properties batchConfig) throws BatchContainerServiceException {
+        // no-op
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/jbatch/src/main/java/org/apache/batchee/container/services/locator/ClassLoaderLocator.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/services/locator/ClassLoaderLocator.java b/jbatch/src/main/java/org/apache/batchee/container/services/locator/ClassLoaderLocator.java
new file mode 100644
index 0000000..dbea45f
--- /dev/null
+++ b/jbatch/src/main/java/org/apache/batchee/container/services/locator/ClassLoaderLocator.java
@@ -0,0 +1,65 @@
+/*
+ *
+ * Copyright 2012,2013 International Business Machines Corp.
+ *
+ * See the NOTICE file distributed with this work for additional information
+ * regarding copyright ownership. Licensed under the Apache License,
+ * Version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.batchee.container.services.locator;
+
+import org.apache.batchee.container.services.ServicesManager;
+
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Locator keeping one {@link ServicesManager} per registered classloader.
+ * <p>
+ * It is not the default locator since it could create memory leaks: managers
+ * stay registered in a static map until they are explicitly removed.
+ * In a server it is quite easy to pair setServiceManager/resetServiceManager
+ * to handle that (either from the server itself or from an app in lightweight containers).
+ * Note: initializeServiceManager can be used instead of setServiceManager to get the default behavior.
+ */
+public class ClassLoaderLocator extends SingletonLocator {
+    private static final Map<ClassLoader, ServicesManager> MANAGERS = new ConcurrentHashMap<ClassLoader, ServicesManager>();
+
+    /**
+     * Registers {@code manager} for the classloader {@code key}, replacing any previous registration.
+     */
+    public static void setServiceManager(final ClassLoader key, final ServicesManager manager) {
+        MANAGERS.put(key, manager);
+    }
+
+    /**
+     * Creates a new manager, initializes it with {@code props} and registers it for {@code key}.
+     */
+    public static void initializeServiceManager(final ClassLoader key, final Properties props) {
+        final ServicesManager created = new ServicesManager();
+        created.init(props);
+        MANAGERS.put(key, created);
+    }
+
+    /**
+     * Unregisters the manager bound to {@code key}.
+     *
+     * @return the manager that was registered, or null if there was none
+     */
+    public static ServicesManager resetServiceManager(final ClassLoader key) {
+        return MANAGERS.remove(key);
+    }
+
+    /**
+     * Walks from the thread context classloader up through its parents and returns the
+     * first registered manager; falls back to the parent class lookup when none matches
+     * (including when there is no context classloader).
+     */
+    @Override
+    public ServicesManager find() {
+        ClassLoader loader = Thread.currentThread().getContextClassLoader();
+        for (; loader != null; loader = loader.getParent()) {
+            final ServicesManager registered = MANAGERS.get(loader);
+            if (registered != null) {
+                return registered;
+            }
+        }
+        return super.find();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/jbatch/src/main/java/org/apache/batchee/container/services/locator/SingletonLocator.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/services/locator/SingletonLocator.java b/jbatch/src/main/java/org/apache/batchee/container/services/locator/SingletonLocator.java
new file mode 100644
index 0000000..2dfa61a
--- /dev/null
+++ b/jbatch/src/main/java/org/apache/batchee/container/services/locator/SingletonLocator.java
@@ -0,0 +1,35 @@
+/*
+ *
+ * Copyright 2012,2013 International Business Machines Corp.
+ *
+ * See the NOTICE file distributed with this work for additional information
+ * regarding copyright ownership. Licensed under the Apache License,
+ * Version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.batchee.container.services.locator;
+
+import org.apache.batchee.container.services.ServicesManager;
+import org.apache.batchee.container.services.ServicesManagerLocator;
+
+/**
+ * {@link ServicesManagerLocator} that always hands out the same statically held
+ * {@link ServicesManager}.
+ */
+public class SingletonLocator implements ServicesManagerLocator {
+    /** Shared locator instance. */
+    public static final ServicesManagerLocator INSTANCE = new SingletonLocator();
+
+    // The manager is created when this class is initialized and then initialized with
+    // null properties. The field is assigned before init(null) runs.
+    // NOTE(review): presumably that order matters if init looks the manager up through
+    // this locator - confirm before reordering.
+    private static final ServicesManager MANAGER = new ServicesManager();
+    static {
+        MANAGER.init(null);
+    }
+
+    /**
+     * @return the statically held manager, the same instance on every call
+     */
+    @Override
+    public ServicesManager find() {
+        return MANAGER;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/jbatch/src/main/java/org/apache/batchee/container/services/package-info.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/services/package-info.java b/jbatch/src/main/java/org/apache/batchee/container/services/package-info.java
new file mode 100755
index 0000000..8698f13
--- /dev/null
+++ b/jbatch/src/main/java/org/apache/batchee/container/services/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2013 International Business Machines Corp.
+ * 
+ * See the NOTICE file distributed with this work for additional information
+ * regarding copyright ownership. Licensed under the Apache License, 
+ * Version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+/**
+ * These interfaces are tightly coupled to the existing
+ * batch implementation.
+ */
+package org.apache.batchee.container.services;
\ No newline at end of file