You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@batchee.apache.org by st...@apache.org on 2014/03/30 12:35:47 UTC

[4/6] unify PersistenceManagerService impl names

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/67e63c08/jbatch/src/main/java/org/apache/batchee/container/services/persistence/JPAPersistenceManagerService.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/services/persistence/JPAPersistenceManagerService.java b/jbatch/src/main/java/org/apache/batchee/container/services/persistence/JPAPersistenceManagerService.java
new file mode 100644
index 0000000..1fae89b
--- /dev/null
+++ b/jbatch/src/main/java/org/apache/batchee/container/services/persistence/JPAPersistenceManagerService.java
@@ -0,0 +1,849 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.batchee.container.services.persistence;
+
+import org.apache.batchee.container.exception.BatchContainerRuntimeException;
+import org.apache.batchee.container.exception.PersistenceException;
+import org.apache.batchee.container.impl.JobExecutionImpl;
+import org.apache.batchee.container.impl.JobInstanceImpl;
+import org.apache.batchee.container.impl.StepContextImpl;
+import org.apache.batchee.container.impl.StepExecutionImpl;
+import org.apache.batchee.container.impl.controller.PartitionedStepBuilder;
+import org.apache.batchee.container.impl.controller.chunk.CheckpointData;
+import org.apache.batchee.container.impl.controller.chunk.CheckpointDataKey;
+import org.apache.batchee.container.impl.controller.chunk.PersistentDataWrapper;
+import org.apache.batchee.container.impl.jobinstance.RuntimeFlowInSplitExecution;
+import org.apache.batchee.container.impl.jobinstance.RuntimeJobExecution;
+import org.apache.batchee.container.services.InternalJobExecution;
+import org.apache.batchee.container.services.persistence.jpa.EntityManagerProvider;
+import org.apache.batchee.container.services.persistence.jpa.TransactionProvider;
+import org.apache.batchee.container.services.persistence.jpa.domain.CheckpointEntity;
+import org.apache.batchee.container.services.persistence.jpa.domain.JobExecutionEntity;
+import org.apache.batchee.container.services.persistence.jpa.domain.JobInstanceEntity;
+import org.apache.batchee.container.services.persistence.jpa.domain.StepExecutionEntity;
+import org.apache.batchee.container.services.persistence.jpa.provider.DefaultEntityManagerProvider;
+import org.apache.batchee.container.services.persistence.jpa.provider.DefaultTransactionProvider;
+import org.apache.batchee.container.status.JobStatus;
+import org.apache.batchee.container.status.StepStatus;
+import org.apache.batchee.spi.PersistenceManagerService;
+
+import javax.batch.operations.NoSuchJobExecutionException;
+import javax.batch.runtime.BatchStatus;
+import javax.batch.runtime.JobInstance;
+import javax.batch.runtime.Metric;
+import javax.batch.runtime.StepExecution;
+import javax.persistence.EntityManager;
+import javax.persistence.NoResultException;
+import javax.persistence.TypedQuery;
+import java.io.IOException;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import static org.apache.batchee.container.util.Serializations.deserialize;
+import static org.apache.batchee.container.util.Serializations.serialize;
+
+public class JPAPersistenceManagerService implements PersistenceManagerService {
+    // Statements executed by cleanUp(long), in this order: step executions,
+    // checkpoints, job executions, then job instances. Each one is bound with
+    // an "instanceId" parameter.
+    // NOTE(review): the order presumably removes dependent rows first - confirm against the entity mappings.
+    private static final String[] DELETE_QUERIES = {
+        StepExecutionEntity.Queries.DELETE_BY_INSTANCE_ID, CheckpointEntity.Queries.DELETE_BY_INSTANCE_ID,
+        JobExecutionEntity.Queries.DELETE_BY_INSTANCE_ID, JobInstanceEntity.Queries.DELETE_BY_INSTANCE_ID
+    };
+
+    // Source of the EntityManager that every operation obtains and releases.
+    // NOTE(review): not assigned in the lines visible here - presumably set during service initialization.
+    private EntityManagerProvider emProvider;
+    // Starts, commits and rolls back the transactions that wrap write operations.
+    private TransactionProvider txProvider;
+
+    /**
+     * Deletes everything stored for a job instance.
+     * <p>
+     * Every entry of {@code DELETE_QUERIES} is executed in array order with the
+     * {@code instanceId} parameter bound, all inside one transaction.
+     *
+     * @param instanceId id of the job instance whose data is removed
+     * @throws BatchContainerRuntimeException wrapping any failure of a delete or of the commit;
+     *                                        the transaction provider is asked to roll back first
+     */
+    @Override
+    public void cleanUp(final long instanceId) {
+        final EntityManager entityManager = emProvider.newEntityManager();
+        try {
+            final Object transaction = txProvider.start(entityManager);
+            try {
+                // NOTE(review): the other Queries constants used in this class go through
+                // createNamedQuery; confirm DELETE_BY_INSTANCE_ID holds query text, not a query name.
+                for (int i = 0; i < DELETE_QUERIES.length; i++) {
+                    entityManager.createQuery(DELETE_QUERIES[i])
+                        .setParameter("instanceId", instanceId)
+                        .executeUpdate();
+                }
+                txProvider.commit(transaction);
+            } catch (final Exception failure) {
+                txProvider.rollback(transaction, failure);
+                throw new BatchContainerRuntimeException(failure);
+            }
+        } finally {
+            emProvider.release(entityManager);
+        }
+    }
+
+    /**
+     * Loads the status of a named step within a job instance.
+     *
+     * @param instanceId id of the job instance
+     * @param stepName   name of the step
+     * @return a status built from the first step execution returned by the query,
+     *         or {@code null} when the query yields nothing
+     */
+    @Override
+    public StepStatus getStepStatus(final long instanceId, final String stepName) {
+        final EntityManager entityManager = emProvider.newEntityManager();
+        try {
+            final List<StepExecutionEntity> matches = entityManager
+                .createNamedQuery(StepExecutionEntity.Queries.FIND_BY_INSTANCE_AND_NAME, StepExecutionEntity.class)
+                .setParameter("instanceId", instanceId)
+                .setParameter("step", stepName)
+                .getResultList();
+            if (matches == null || matches.isEmpty()) {
+                return null;
+            }
+
+            // only the first row is used, any further rows are ignored
+            final StepExecutionEntity first = matches.get(0);
+            final StepStatus result = new StepStatus(first.getId(), first.getStartCount());
+            result.setBatchStatus(first.getBatchStatus());
+            result.setExitStatus(first.getExitStatus());
+            result.setNumPartitions(first.getNumPartitions());
+            result.setLastRunStepExecutionId(first.getLastRunStepExecutionId());
+            if (first.getPersistentData() != null) {
+                result.setPersistentUserData(new PersistentDataWrapper(first.getPersistentData()));
+            }
+            return result;
+        } finally {
+            emProvider.release(entityManager);
+        }
+    }
+
+    /**
+     * Collects the id and name of every job instance returned by the
+     * {@code FIND_EXTERNALS} named query, which receives
+     * {@code PartitionedStepBuilder.JOB_ID_SEPARATOR} as its {@code pattern} parameter.
+     *
+     * @return a mutable map of job instance id to job name, empty when nothing is found
+     */
+    @Override
+    public Map<Long, String> jobOperatorGetExternalJobInstanceData() {
+        final Map<Long, String> namesById = new HashMap<Long, String>();
+        final EntityManager entityManager = emProvider.newEntityManager();
+        try {
+            final List<JobInstanceEntity> externals = entityManager
+                .createNamedQuery(JobInstanceEntity.Queries.FIND_EXTERNALS, JobInstanceEntity.class)
+                .setParameter("pattern", PartitionedStepBuilder.JOB_ID_SEPARATOR)
+                .getResultList();
+            if (externals == null) {
+                return namesById;
+            }
+            for (final JobInstanceEntity instance : externals) {
+                namesById.put(instance.getJobInstanceId(), instance.getName());
+            }
+            return namesById;
+        } finally {
+            emProvider.release(entityManager);
+        }
+    }
+
+    /**
+     * Builds the job status of the job instance that owns the given execution.
+     * <p>
+     * The instance is fetched with {@code getSingleResult()}, so the JPA provider
+     * raises an exception when no single instance matches.
+     *
+     * @param executionId id of a job execution
+     * @return a new status created from the instance id and filled by {@code setJobStatusData}
+     */
+    @Override
+    public JobStatus getJobStatusFromExecution(final long executionId) {
+        final EntityManager entityManager = emProvider.newEntityManager();
+        try {
+            final TypedQuery<JobInstanceEntity> lookup = entityManager
+                .createNamedQuery(JobInstanceEntity.Queries.FIND_FROM_EXECUTION, JobInstanceEntity.class)
+                .setParameter("executionId", executionId);
+            final JobInstanceEntity owner = lookup.getSingleResult();
+
+            final JobStatus result = new JobStatus(owner.getJobInstanceId());
+            setJobStatusData(result, owner);
+            return result;
+        } finally {
+            emProvider.release(entityManager);
+        }
+    }
+
+    /**
+     * Loads one step execution by its id.
+     *
+     * @param stepExecId id of the step execution
+     * @return a {@link StepExecutionImpl} carrying the owning job execution id and the stored step data
+     */
+    @Override
+    public StepExecution getStepExecutionByStepExecutionId(final long stepExecId) {
+        final EntityManager entityManager = emProvider.newEntityManager();
+        try {
+            // NOTE(review): find() yields null for an unknown id, which makes the next line fail with a NullPointerException
+            final StepExecutionEntity stored = entityManager.find(StepExecutionEntity.class, stepExecId);
+            final long jobExecutionId = stored.getExecution().getExecutionId();
+
+            final StepExecutionImpl result = new StepExecutionImpl(jobExecutionId, stepExecId);
+            setStepExecutionData(stored, result);
+            return result;
+        } finally {
+            emProvider.release(entityManager);
+        }
+    }
+
+    /**
+     * Copies the stored state of a step execution entity onto its runtime representation.
+     *
+     * @param entity persisted step execution, the source of the values
+     * @param stepEx runtime object receiving statuses, name, counters, timestamps and user data
+     * @throws PersistenceException wrapping any failure while deserializing the persistent user data
+     */
+    private void setStepExecutionData(final StepExecutionEntity entity, final StepExecutionImpl stepEx) {
+        // statuses and identity
+        stepEx.setBatchStatus(entity.getBatchStatus());
+        stepEx.setExitStatus(entity.getExitStatus());
+        stepEx.setStepName(entity.getStepName());
+
+        // metric counters
+        stepEx.setReadCount(entity.getRead());
+        stepEx.setWriteCount(entity.getWrite());
+        stepEx.setCommitCount(entity.getCommit());
+        stepEx.setRollbackCount(entity.getRollback());
+        stepEx.setReadSkipCount(entity.getReadSkip());
+        stepEx.setProcessSkipCount(entity.getProcessSkip());
+        stepEx.setFilterCount(entity.getFilter());
+        stepEx.setWriteSkipCount(entity.getWriteSkip());
+
+        // timestamps
+        stepEx.setStartTime(entity.getStartTime());
+        stepEx.setEndTime(entity.getEndTime());
+
+        // user data is stored serialized and is restored last
+        try {
+            stepEx.setPersistentUserData(deserialize(entity.getPersistentData()));
+        } catch (final Exception failure) {
+            throw new PersistenceException(failure);
+        }
+    }
+
+    /**
+     * Lists the step executions recorded for a job execution.
+     *
+     * @param execid id of the job execution
+     * @return one {@link StepExecutionImpl} per entity returned by the {@code FIND_BY_EXECUTION}
+     *         query, in query order; an empty list when the query result is {@code null}
+     */
+    @Override
+    public List<StepExecution> getStepExecutionsForJobExecution(final long execid) {
+        final EntityManager entityManager = emProvider.newEntityManager();
+        try {
+            final List<StepExecutionEntity> stored = entityManager
+                .createNamedQuery(StepExecutionEntity.Queries.FIND_BY_EXECUTION, StepExecutionEntity.class)
+                .setParameter("executionId", execid)
+                .getResultList();
+            if (stored == null) {
+                return Collections.emptyList();
+            }
+
+            final List<StepExecution> result = new ArrayList<StepExecution>(stored.size());
+            for (final StepExecutionEntity step : stored) {
+                final StepExecutionImpl converted = new StepExecutionImpl(execid, step.getId());
+                setStepExecutionData(step, converted);
+                result.add(converted);
+            }
+            return result;
+        } finally {
+            emProvider.release(entityManager);
+        }
+    }
+
+    /**
+     * Writes the current state of a step context onto its existing step execution row.
+     * <p>
+     * The entity is looked up by {@code stepContext.getStepInternalExecID()} and updated
+     * through {@code setStepData} inside a transaction.
+     *
+     * @param jobExecId   id of the owning job execution
+     * @param stepContext step context supplying statuses, metrics, timestamps and user data
+     * @throws BatchContainerRuntimeException wrapping any failure; the transaction provider
+     *                                        is asked to roll back first
+     */
+    @Override
+    public void updateStepExecution(final long jobExecId, final StepContextImpl stepContext) {
+        final EntityManager entityManager = emProvider.newEntityManager();
+        try {
+            final Object transaction = txProvider.start(entityManager);
+            try {
+                final StepExecutionEntity stored = entityManager.find(StepExecutionEntity.class, stepContext.getStepInternalExecID());
+
+                // each metric is read from the context's metric map by its MetricType name
+                final long read = stepContext.metricsAsMap().get(Metric.MetricType.READ_COUNT.name()).getValue();
+                final long write = stepContext.metricsAsMap().get(Metric.MetricType.WRITE_COUNT.name()).getValue();
+                final long commit = stepContext.metricsAsMap().get(Metric.MetricType.COMMIT_COUNT.name()).getValue();
+                final long rollback = stepContext.metricsAsMap().get(Metric.MetricType.ROLLBACK_COUNT.name()).getValue();
+                final long readSkip = stepContext.metricsAsMap().get(Metric.MetricType.READ_SKIP_COUNT.name()).getValue();
+                final long processSkip = stepContext.metricsAsMap().get(Metric.MetricType.PROCESS_SKIP_COUNT.name()).getValue();
+                final long filter = stepContext.metricsAsMap().get(Metric.MetricType.FILTER_COUNT.name()).getValue();
+                final long writeSkip = stepContext.metricsAsMap().get(Metric.MetricType.WRITE_SKIP_COUNT.name()).getValue();
+
+                setStepData(entityManager, jobExecId, stepContext,
+                    read, write, commit, rollback, readSkip, processSkip, filter, writeSkip,
+                    stored);
+                txProvider.commit(transaction);
+            } catch (final Exception failure) {
+                txProvider.rollback(transaction, failure);
+                throw new BatchContainerRuntimeException(failure);
+            }
+        } finally {
+            emProvider.release(entityManager);
+        }
+    }
+
+    /**
+     * Persists a new step execution for the given job execution.
+     * <p>
+     * A fresh entity is filled through {@code setStepData} before the transaction is
+     * started, then persisted and committed.
+     *
+     * @param jobExecId   id of the owning job execution
+     * @param stepContext step context supplying statuses, metrics, timestamps and user data
+     * @return a {@link StepExecutionImpl} holding the job execution id, the entity id
+     *         and the step name
+     * @throws BatchContainerRuntimeException wrapping any failure of persist or commit;
+     *                                        the transaction provider is asked to roll back first
+     */
+    @Override
+    public StepExecutionImpl createStepExecution(final long jobExecId, final StepContextImpl stepContext) {
+        final StepExecutionEntity created = new StepExecutionEntity();
+        final EntityManager entityManager = emProvider.newEntityManager();
+        try {
+            // each metric is read from the context's metric map by its MetricType name
+            final long read = stepContext.metricsAsMap().get(Metric.MetricType.READ_COUNT.name()).getValue();
+            final long write = stepContext.metricsAsMap().get(Metric.MetricType.WRITE_COUNT.name()).getValue();
+            final long commit = stepContext.metricsAsMap().get(Metric.MetricType.COMMIT_COUNT.name()).getValue();
+            final long rollback = stepContext.metricsAsMap().get(Metric.MetricType.ROLLBACK_COUNT.name()).getValue();
+            final long readSkip = stepContext.metricsAsMap().get(Metric.MetricType.READ_SKIP_COUNT.name()).getValue();
+            final long processSkip = stepContext.metricsAsMap().get(Metric.MetricType.PROCESS_SKIP_COUNT.name()).getValue();
+            final long filter = stepContext.metricsAsMap().get(Metric.MetricType.FILTER_COUNT.name()).getValue();
+            final long writeSkip = stepContext.metricsAsMap().get(Metric.MetricType.WRITE_SKIP_COUNT.name()).getValue();
+
+            setStepData(entityManager, jobExecId, stepContext,
+                read, write, commit, rollback, readSkip, processSkip, filter, writeSkip,
+                created);
+
+            final Object transaction = txProvider.start(entityManager);
+            try {
+                entityManager.persist(created);
+                txProvider.commit(transaction);
+
+                // the entity id is read only after the commit
+                final StepExecutionImpl result = new StepExecutionImpl(jobExecId, created.getId());
+                result.setStepName(stepContext.getStepName());
+                return result;
+            } catch (final Exception failure) {
+                txProvider.rollback(transaction, failure);
+                throw new BatchContainerRuntimeException(failure);
+            }
+        } finally {
+            emProvider.release(entityManager);
+        }
+    }
+
+//CHECKSTYLE:OFF
+    private void setStepData(final EntityManager em,
+                             final long jobExecId, final StepContextImpl stepContext,
+                             final long readCount, final long writeCount,
+                             final long commitCount, final long rollbackCount,
+                             final long readSkipCount, final long processSkipCount,
+                             final long filterCount, final long writeSkipCount,
+                             final StepExecutionEntity entity) {
+//CHECKSTYLE:ON
+        entity.setExecution(em.find(JobExecutionEntity.class, jobExecId));
+        entity.setBatchStatus(stepContext.getBatchStatus());
+        entity.setExitStatus(stepContext.getExitStatus());
+        entity.setStepName(stepContext.getStepName());
+        entity.setRead(readCount);
+        entity.setWrite(writeCount);
+        entity.setCommit(commitCount);
+        entity.setRollback(rollbackCount);
+        entity.setReadSkip(readSkipCount);
+        entity.setProcessSkip(processSkipCount);
+        entity.setFilter(filterCount);
+        entity.setWriteSkip(writeSkipCount);
+        entity.setStartTime(stepContext.getStartTimeTS());
+        entity.setEndTime(stepContext.getEndTimeTS());
+        try {
+            entity.setPersistentData(serialize(stepContext.getPersistentUserData()));
+        } catch (final IOException e) {
+            throw new PersistenceException(e);
+        }
+    }
+
+    /**
+     * Resolves the job instance that a job execution belongs to.
+     *
+     * @param executionId id of the job execution
+     * @return id of the owning job instance
+     * @throws NoSuchJobExecutionException when no job execution is stored under {@code executionId}
+     */
+    @Override
+    public long getJobInstanceIdByExecutionId(final long executionId) throws NoSuchJobExecutionException {
+        final EntityManager em = emProvider.newEntityManager();
+        try {
+            final JobExecutionEntity execution = em.find(JobExecutionEntity.class, executionId);
+            if (execution == null) {
+                // find() returns null for an unknown id; report it with the declared exception instead of a NullPointerException
+                throw new NoSuchJobExecutionException("No job execution found for id " + executionId);
+            }
+            return execution.getInstance().getJobInstanceId();
+        } finally {
+            emProvider.release(em);
+        }
+    }
+
+    /**
+     * Collects the ids of the executions returned by the {@code FIND_RUNNING} named query
+     * for a job name, with {@code JobExecutionEntity.Queries.RUNNING_STATUSES} bound as
+     * the {@code statuses} parameter.
+     *
+     * @param jobName name of the job
+     * @return the execution ids; an empty set when the query result is {@code null}
+     */
+    @Override
+    public Set<Long> jobOperatorGetRunningExecutions(final String jobName) {
+        final EntityManager entityManager = emProvider.newEntityManager();
+        try {
+            final List<JobExecutionEntity> running = entityManager
+                .createNamedQuery(JobExecutionEntity.Queries.FIND_RUNNING, JobExecutionEntity.class)
+                .setParameter("name", jobName)
+                .setParameter("statuses", JobExecutionEntity.Queries.RUNNING_STATUSES)
+                .getResultList();
+            if (running == null) {
+                return Collections.emptySet();
+            }
+
+            final Set<Long> executionIds = new HashSet<Long>(running.size());
+            for (final JobExecutionEntity execution : running) {
+                executionIds.add(execution.getExecutionId());
+            }
+            return executionIds;
+        } finally {
+            emProvider.release(entityManager);
+        }
+    }
+
+    /**
+     * Lists the executions recorded for a job instance.
+     *
+     * @param jobInstanceId id of the job instance
+     * @return one {@link JobExecutionImpl} per entity returned by the {@code FIND_BY_INSTANCE}
+     *         query, in query order; an empty list when the query result is {@code null}
+     */
+    @Override
+    public List<InternalJobExecution> jobOperatorGetJobExecutions(final long jobInstanceId) {
+        final EntityManager em = emProvider.newEntityManager();
+        try {
+            final List<JobExecutionEntity> list = em.createNamedQuery(JobExecutionEntity.Queries.FIND_BY_INSTANCE, JobExecutionEntity.class)
+                .setParameter("instanceId", jobInstanceId).getResultList();
+
+            if (list == null) {
+                return Collections.emptyList();
+            }
+
+            final List<InternalJobExecution> result = new ArrayList<InternalJobExecution>(list.size());
+            for (final JobExecutionEntity entity : list) {
+                final JobExecutionImpl jobEx = new JobExecutionImpl(entity.getExecutionId(), jobInstanceId, this);
+                jobEx.setCreateTime(entity.getCreateTime());
+                jobEx.setStartTime(entity.getStartTime());
+                jobEx.setEndTime(entity.getEndTime());
+                jobEx.setLastUpdateTime(entity.getUpdateTime());
+                // same guard as jobOperatorGetJobExecution: an unset batch status must not break the whole listing
+                if (entity.getBatchStatus() != null) {
+                    jobEx.setBatchStatus(entity.getBatchStatus().name());
+                }
+                jobEx.setExitStatus(entity.getExitStatus());
+                jobEx.setJobName(entity.getInstance().getName());
+
+                result.add(jobEx);
+            }
+            return result;
+        } finally {
+            emProvider.release(em);
+        }
+    }
+
+    /**
+     * Records the final state of a job execution.
+     * <p>
+     * The given timestamp is stored both as update time and as end time.
+     *
+     * @param key         id of the job execution
+     * @param batchStatus final batch status
+     * @param exitStatus  final exit status
+     * @param updatets    timestamp written to the update and end time
+     * @throws BatchContainerRuntimeException wrapping any failure; the transaction provider
+     *                                        is asked to roll back first
+     */
+    @Override
+    public void updateWithFinalExecutionStatusesAndTimestamps(final long key, final BatchStatus batchStatus, final String exitStatus, final Timestamp updatets) {
+        final EntityManager entityManager = emProvider.newEntityManager();
+        try {
+            final Object transaction = txProvider.start(entityManager);
+            try {
+                final JobExecutionEntity execution = entityManager.find(JobExecutionEntity.class, key);
+                execution.setBatchStatus(batchStatus);
+                execution.setUpdateTime(updatets);
+                execution.setEndTime(updatets);
+                execution.setExitStatus(exitStatus);
+
+                entityManager.merge(execution);
+                txProvider.commit(transaction);
+            } catch (final Exception failure) {
+                txProvider.rollback(transaction, failure);
+                throw new BatchContainerRuntimeException(failure);
+            }
+        } finally {
+            emProvider.release(entityManager);
+        }
+    }
+
+    /**
+     * Loads one job execution by its id.
+     *
+     * @param jobExecutionId id of the job execution
+     * @return a {@link JobExecutionImpl} carrying timestamps, job parameters, statuses and job name;
+     *         the batch status is left untouched when none is stored
+     */
+    @Override
+    public InternalJobExecution jobOperatorGetJobExecution(final long jobExecutionId) {
+        final EntityManager entityManager = emProvider.newEntityManager();
+        try {
+            final JobExecutionEntity stored = entityManager.find(JobExecutionEntity.class, jobExecutionId);
+
+            final JobExecutionImpl result = new JobExecutionImpl(jobExecutionId, stored.getInstance().getJobInstanceId(), this);
+            result.setCreateTime(stored.getCreateTime());
+            result.setStartTime(stored.getStartTime());
+            result.setEndTime(stored.getEndTime());
+            result.setJobParameters(stored.getJobProperties());
+            result.setLastUpdateTime(stored.getUpdateTime());
+            if (stored.getBatchStatus() != null) {
+                result.setBatchStatus(stored.getBatchStatus().name());
+            }
+            result.setExitStatus(stored.getExitStatus());
+            result.setJobName(stored.getInstance().getName());
+            return result;
+        } finally {
+            emProvider.release(entityManager);
+        }
+    }
+
+    /**
+     * Marks a job execution as {@code STARTED}.
+     * <p>
+     * The given timestamp is stored both as start time and as update time.
+     *
+     * @param key     id of the job execution
+     * @param startTS timestamp of the start
+     * @throws BatchContainerRuntimeException wrapping any failure; the transaction provider
+     *                                        is asked to roll back first
+     */
+    @Override
+    public void markJobStarted(final long key, final Timestamp startTS) {
+        final EntityManager entityManager = emProvider.newEntityManager();
+        try {
+            final Object transaction = txProvider.start(entityManager);
+            try {
+                final JobExecutionEntity execution = entityManager.find(JobExecutionEntity.class, key);
+                execution.setBatchStatus(BatchStatus.STARTED);
+                execution.setStartTime(startTS);
+                execution.setUpdateTime(startTS);
+
+                entityManager.merge(execution);
+                txProvider.commit(transaction);
+            } catch (final Exception failure) {
+                txProvider.rollback(transaction, failure);
+                throw new BatchContainerRuntimeException(failure);
+            }
+        } finally {
+            emProvider.release(entityManager);
+        }
+    }
+
+    /**
+     * Returns the job properties stored with a job execution.
+     *
+     * @param executionId id of the job execution
+     * @return the stored job properties of that execution
+     * @throws NoSuchJobExecutionException when no job execution is stored under {@code executionId}
+     */
+    @Override
+    public Properties getParameters(final long executionId) throws NoSuchJobExecutionException {
+        final EntityManager em = emProvider.newEntityManager();
+        try {
+            final JobExecutionEntity execution = em.find(JobExecutionEntity.class, executionId);
+            if (execution == null) {
+                // find() returns null for an unknown id; report it with the declared exception instead of a NullPointerException
+                throw new NoSuchJobExecutionException("No job execution found for id " + executionId);
+            }
+            return execution.getJobProperties();
+        } finally {
+            emProvider.release(em);
+        }
+    }
+
+    /**
+     * Reads the exit status stored for a job execution.
+     *
+     * @param key id of the job execution
+     * @return the stored exit status
+     */
+    @Override
+    public String jobOperatorQueryJobExecutionExitStatus(final long key) {
+        final EntityManager entityManager = emProvider.newEntityManager();
+        try {
+            final JobExecutionEntity execution = entityManager.find(JobExecutionEntity.class, key);
+            return execution.getExitStatus();
+        } finally {
+            emProvider.release(entityManager);
+        }
+    }
+
+    /**
+     * Reads the batch status stored for a job execution.
+     *
+     * @param key id of the job execution
+     * @return the {@code name()} of the stored batch status
+     */
+    @Override
+    public String jobOperatorQueryJobExecutionBatchStatus(final long key) {
+        final EntityManager entityManager = emProvider.newEntityManager();
+        try {
+            final JobExecutionEntity execution = entityManager.find(JobExecutionEntity.class, key);
+            return execution.getBatchStatus().name();
+        } finally {
+            emProvider.release(entityManager);
+        }
+    }
+
+    /**
+     * Reads one of the timestamps stored for a job execution.
+     *
+     * @param key      id of the job execution
+     * @param timetype which timestamp to return: {@code CREATE}, {@code END},
+     *                 {@code LAST_UPDATED} or {@code STARTED}
+     * @return the requested timestamp as stored on the execution
+     * @throws IllegalArgumentException when {@code timetype} is none of the four handled values
+     */
+    @Override
+    public Timestamp jobOperatorQueryJobExecutionTimestamp(final long key, final TimestampType timetype) {
+        final EntityManager entityManager = emProvider.newEntityManager();
+        try {
+            final JobExecutionEntity execution = entityManager.find(JobExecutionEntity.class, key);
+            if (timetype.equals(TimestampType.CREATE)) {
+                return execution.getCreateTime();
+            }
+            if (timetype.equals(TimestampType.END)) {
+                return execution.getEndTime();
+            }
+            if (timetype.equals(TimestampType.LAST_UPDATED)) {
+                return execution.getUpdateTime();
+            }
+            if (timetype.equals(TimestampType.STARTED)) {
+                return execution.getStartTime();
+            }
+            throw new IllegalArgumentException("Unexpected enum value.");
+        } finally {
+            emProvider.release(entityManager);
+        }
+    }
+
+    /**
+     * Persists a new job execution for a job instance and wraps it as a flow-in-split execution.
+     * <p>
+     * Create and update time are both set to the current time.
+     *
+     * @param jobInstance job instance the execution is attached to
+     * @param batchStatus initial batch status
+     * @return a {@link RuntimeFlowInSplitExecution} carrying the new execution id, the batch
+     *         status and the creation time (also used as last update time)
+     * @throws BatchContainerRuntimeException wrapping any failure; the transaction provider
+     *                                        is asked to roll back first
+     */
+    @Override
+    public RuntimeFlowInSplitExecution createFlowInSplitExecution(final JobInstance jobInstance, final BatchStatus batchStatus) {
+        final EntityManager entityManager = emProvider.newEntityManager();
+        try {
+            final JobExecutionEntity created = new JobExecutionEntity();
+            created.setCreateTime(new Timestamp(System.currentTimeMillis()));
+            created.setUpdateTime(created.getCreateTime());
+            created.setBatchStatus(batchStatus);
+
+            final Object transaction = txProvider.start(entityManager);
+            try {
+                final JobInstanceEntity owner = entityManager.find(JobInstanceEntity.class, jobInstance.getInstanceId());
+                created.setInstance(owner);
+
+                entityManager.persist(created);
+                txProvider.commit(transaction);
+
+                // the execution id is read only after the commit
+                final RuntimeFlowInSplitExecution result = new RuntimeFlowInSplitExecution(jobInstance, created.getExecutionId(), this);
+                result.setBatchStatus(batchStatus.name());
+                result.setCreateTime(created.getCreateTime());
+                result.setLastUpdateTime(created.getCreateTime());
+                return result;
+            } catch (final Exception failure) {
+                txProvider.rollback(transaction, failure);
+                throw new BatchContainerRuntimeException(failure);
+            }
+        } finally {
+            emProvider.release(entityManager);
+        }
+    }
+
+    /**
+     * Returns the id of the first execution produced by the {@code MOST_RECENT} named query
+     * for a job instance.
+     * <p>
+     * The query is limited to one row and read with {@code getSingleResult()}, so the JPA
+     * provider raises an exception when the instance has no execution.
+     *
+     * @param jobInstanceId id of the job instance
+     * @return the execution id of that single result
+     */
+    @Override
+    public long getMostRecentExecutionId(final long jobInstanceId) {
+        final EntityManager entityManager = emProvider.newEntityManager();
+        try {
+            final JobExecutionEntity latest = entityManager
+                .createNamedQuery(JobExecutionEntity.Queries.MOST_RECENT, JobExecutionEntity.class)
+                .setParameter("instanceId", jobInstanceId)
+                .setMaxResults(1)
+                .getSingleResult();
+            return latest.getExecutionId();
+        } finally {
+            emProvider.release(entityManager);
+        }
+    }
+
+    /**
+     * Updates only the batch status and update time of a job execution.
+     *
+     * @param executionId id of the job execution
+     * @param batchStatus new batch status
+     * @param timestamp   new update time
+     * @throws BatchContainerRuntimeException wrapping any failure, including an unknown
+     *                                        execution id; the transaction provider is asked
+     *                                        to roll back first
+     */
+    @Override
+    public void updateBatchStatusOnly(final long executionId, final BatchStatus batchStatus, final Timestamp timestamp) {
+        final EntityManager em = emProvider.newEntityManager();
+        try {
+            final Object tx = txProvider.start(em);
+            try {
+                // lookup and mutation sit inside the guarded section so that a failure here
+                // (e.g. unknown id) still rolls the started transaction back
+                final JobExecutionEntity instance = em.find(JobExecutionEntity.class, executionId);
+                instance.setBatchStatus(batchStatus);
+                instance.setUpdateTime(timestamp);
+
+                em.merge(instance);
+                txProvider.commit(tx);
+            } catch (final Exception e) {
+                txProvider.rollback(tx, e);
+                throw new BatchContainerRuntimeException(e);
+            }
+        } finally {
+            emProvider.release(em);
+        }
+    }
+
+    /**
+     * Persists a new job execution for a job instance.
+     * <p>
+     * Create and update time are both set to the current time.
+     *
+     * @param jobInstance   job instance the execution is attached to
+     * @param jobParameters job properties stored with the execution
+     * @param batchStatus   initial batch status
+     * @return a {@link RuntimeJobExecution} carrying the new execution id, the batch status
+     *         and the creation time (also used as last update time)
+     * @throws BatchContainerRuntimeException wrapping any failure; the transaction provider
+     *                                        is asked to roll back first
+     */
+    @Override
+    public RuntimeJobExecution createJobExecution(final JobInstance jobInstance, final Properties jobParameters, final BatchStatus batchStatus) {
+        final EntityManager entityManager = emProvider.newEntityManager();
+        try {
+            final JobExecutionEntity created = new JobExecutionEntity();
+            created.setJobProperties(jobParameters);
+            created.setCreateTime(new Timestamp(System.currentTimeMillis()));
+            created.setUpdateTime(created.getCreateTime());
+            created.setBatchStatus(batchStatus);
+
+            final Object transaction = txProvider.start(entityManager);
+            try {
+                final JobInstanceEntity owner = entityManager.find(JobInstanceEntity.class, jobInstance.getInstanceId());
+                created.setInstance(owner);
+
+                entityManager.persist(created);
+                txProvider.commit(transaction);
+
+                // the execution id is read only after the commit
+                final RuntimeJobExecution result = new RuntimeJobExecution(jobInstance, created.getExecutionId(), this);
+                result.setBatchStatus(batchStatus.name());
+                result.setCreateTime(created.getCreateTime());
+                result.setLastUpdateTime(created.getCreateTime());
+                return result;
+            } catch (final Exception failure) {
+                txProvider.rollback(transaction, failure);
+                throw new BatchContainerRuntimeException(failure);
+            }
+        } finally {
+            emProvider.release(entityManager);
+        }
+    }
+
+    /**
+     * Creates a job instance that carries no job XML.
+     *
+     * @param name   job name
+     * @param apptag application tag
+     * @return the result of {@code createJobInstance(name, apptag, null)}
+     */
+    @Override
+    public JobInstance createSubJobInstance(final String name, final String apptag) {
+        final String noJobXml = null;
+        return createJobInstance(name, apptag, noJobXml);
+    }
+
+    /**
+     * Pages through the ids of the job instances of a job, without an application tag.
+     *
+     * @param jobName job name
+     * @param start   index of the first result
+     * @param count   maximum number of results
+     * @return the result of the four-argument overload called with a {@code null} tag
+     */
+    @Override
+    public List<Long> jobOperatorGetJobInstanceIds(final String jobName, final int start, final int count) {
+        final String noTag = null;
+        return jobOperatorGetJobInstanceIds(jobName, noTag, start, count);
+    }
+
+    /**
+     * Pages through the ids of the job instances returned by the {@code FIND_BY_NAME} named query.
+     *
+     * @param jobName job name, bound as {@code name}
+     * @param appTag  application tag; when not {@code null} it is bound as {@code tag}
+     * @param start   index of the first result
+     * @param count   maximum number of results
+     * @return the instance ids in query order; an empty list when the query result is {@code null}
+     */
+    @Override
+    public List<Long> jobOperatorGetJobInstanceIds(final String jobName, final String appTag, final int start, final int count) {
+        final EntityManager entityManager = emProvider.newEntityManager();
+        try {
+            // NOTE(review): the same FIND_BY_NAME query is used with and without a tag;
+            // confirm that it really declares a "tag" parameter.
+            final TypedQuery<JobInstanceEntity> byName =
+                entityManager.createNamedQuery(JobInstanceEntity.Queries.FIND_BY_NAME, JobInstanceEntity.class);
+            if (appTag != null) {
+                byName.setParameter("tag", appTag);
+            }
+
+            final List<JobInstanceEntity> page = byName
+                .setParameter("name", jobName)
+                .setFirstResult(start)
+                .setMaxResults(count)
+                .getResultList();
+            if (page == null) {
+                return Collections.emptyList();
+            }
+
+            final List<Long> instanceIds = new ArrayList<Long>(page.size());
+            for (final JobInstanceEntity instance : page) {
+                instanceIds.add(instance.getJobInstanceId());
+            }
+            return instanceIds;
+        } finally {
+            emProvider.release(entityManager);
+        }
+    }
+
+    /**
+     * Persists a new job instance.
+     *
+     * @param name   job name
+     * @param apptag application tag
+     * @param jobXml job XML stored with the instance, may be {@code null}
+     * @return a {@link JobInstanceImpl} built from the new instance id and {@code jobXml},
+     *         with its job name set
+     * @throws BatchContainerRuntimeException wrapping any failure of persist or commit;
+     *                                        the transaction provider is asked to roll back first
+     */
+    @Override
+    public JobInstance createJobInstance(final String name, final String apptag, final String jobXml) {
+        final EntityManager entityManager = emProvider.newEntityManager();
+        try {
+            final JobInstanceEntity created = new JobInstanceEntity();
+            created.setTag(apptag);
+            created.setName(name);
+            created.setJobXml(jobXml);
+
+            final Object transaction = txProvider.start(entityManager);
+            try {
+                entityManager.persist(created);
+                txProvider.commit(transaction);
+
+                // the instance id is read only after the commit
+                final JobInstanceImpl result = new JobInstanceImpl(created.getJobInstanceId(), jobXml);
+                result.setJobName(name);
+                return result;
+            } catch (final Exception failure) {
+                txProvider.rollback(transaction, failure);
+                throw new BatchContainerRuntimeException(failure);
+            }
+        } finally {
+            emProvider.release(entityManager);
+        }
+    }
+
+    /**
+     * Counts the job instances of a job through the {@code COUNT_BY_NAME} named query.
+     *
+     * @param jobName job name, bound as {@code name}
+     * @return the single query result converted with {@code intValue()}
+     */
+    @Override
+    public int jobOperatorGetJobInstanceCount(final String jobName) {
+        final EntityManager entityManager = emProvider.newEntityManager();
+        try {
+            final Number total = entityManager
+                .createNamedQuery(JobInstanceEntity.Queries.COUNT_BY_NAME, Number.class)
+                .setParameter("name", jobName)
+                .getSingleResult();
+            return total.intValue();
+        } finally {
+            emProvider.release(entityManager);
+        }
+    }
+
+    /**
+     * Counts the job instances of a job for an application tag through the
+     * {@code COUNT_BY_NAME} named query.
+     *
+     * @param jobName job name, bound as {@code name}
+     * @param appTag  application tag, bound as {@code tag}
+     * @return the single query result converted with {@code intValue()}
+     */
+    @Override
+    public int jobOperatorGetJobInstanceCount(final String jobName, final String appTag) {
+        final EntityManager entityManager = emProvider.newEntityManager();
+        try {
+            // NOTE(review): this is the same query the single-argument overload uses;
+            // confirm that it really declares a "tag" parameter.
+            final Number total = entityManager
+                .createNamedQuery(JobInstanceEntity.Queries.COUNT_BY_NAME, Number.class)
+                .setParameter("name", jobName)
+                .setParameter("tag", appTag)
+                .getSingleResult();
+            return total.intValue();
+        } finally {
+            emProvider.release(entityManager);
+        }
+    }
+
+    /**
+     * Copies a step status onto the stored step execution with the given id.
+     *
+     * @param stepExecutionId id of the step execution entity to update
+     * @param stepStatus      source of batch/exit status, last run step execution id,
+     *                        partition count, raw persistent user data and start count
+     * @throws BatchContainerRuntimeException wrapping any failure; the transaction provider
+     *                                        is asked to roll back first
+     */
+    @Override
+    public void updateStepStatus(final long stepExecutionId, final StepStatus stepStatus) {
+        final EntityManager entityManager = emProvider.newEntityManager();
+        try {
+            final Object transaction = txProvider.start(entityManager);
+            try {
+                final StepExecutionEntity stored = entityManager.find(StepExecutionEntity.class, stepExecutionId);
+                stored.setBatchStatus(stepStatus.getBatchStatus());
+                stored.setExitStatus(stepStatus.getExitStatus());
+                stored.setLastRunStepExecutionId(stepStatus.getLastRunStepExecutionId());
+                stored.setNumPartitions(stepStatus.getNumPartitions());
+                // the user data is written in the raw form exposed by the status
+                stored.setPersistentData(stepStatus.getRawPersistentUserData());
+                stored.setStartCount(stepStatus.getStartCount());
+
+                entityManager.merge(stored);
+                txProvider.commit(transaction);
+            } catch (final Exception failure) {
+                txProvider.rollback(transaction, failure);
+                throw new BatchContainerRuntimeException(failure);
+            }
+        } finally {
+            emProvider.release(entityManager);
+        }
+    }
+
+    /**
+     * Creates an in-memory step status for a step execution id; nothing is written to the database here.
+     *
+     * @param stepExecId id of the step execution
+     * @return a new {@link StepStatus} for that id
+     */
+    @Override
+    public StepStatus createStepStatus(final long stepExecId) {
+        // instance already created
+        final StepStatus created = new StepStatus(stepExecId);
+        return created;
+    }
+
+    /**
+     * Writes the values of the given {@link JobStatus} onto the {@link JobInstanceEntity}
+     * looked up by {@code instanceId}, then points every {@link JobExecutionEntity} returned
+     * by the {@code FIND_BY_INSTANCE} query at that instance entity. Everything happens in a
+     * single transaction obtained from the transaction provider.
+     *
+     * @param instanceId primary key of the job instance entity, also bound to the
+     *                   "instanceId" query parameter
+     * @param jobStatus source of the batch status, current step id, latest execution id,
+     *                  exit status, restart-on value and (when a job instance is set) job name
+     * @throws BatchContainerRuntimeException wrapping any exception raised during the update,
+     *                                        after the transaction provider was asked to roll back
+     */
+    @Override
+    public void updateJobStatus(final long instanceId, final JobStatus jobStatus) {
+        final EntityManager entityManager = emProvider.newEntityManager();
+        try {
+            final Object transaction = txProvider.start(entityManager);
+            try {
+                final JobInstanceEntity instance = entityManager.find(JobInstanceEntity.class, instanceId);
+                instance.setBatchStatus(jobStatus.getBatchStatus());
+                instance.setStep(jobStatus.getCurrentStepId());
+                instance.setLatestExecution(jobStatus.getLatestExecutionId());
+                instance.setExitStatus(jobStatus.getExitStatus());
+                instance.setRestartOn(jobStatus.getRestartOn());
+                // the name is only overwritten when the status carries a job instance
+                if (jobStatus.getJobInstance() != null) {
+                    instance.setName(jobStatus.getJobInstance().getJobName());
+                }
+                entityManager.merge(instance);
+
+                final List<JobExecutionEntity> linkedExecutions = entityManager
+                    .createNamedQuery(JobExecutionEntity.Queries.FIND_BY_INSTANCE, JobExecutionEntity.class)
+                    .setParameter("instanceId", instanceId)
+                    .getResultList();
+                if (linkedExecutions != null) {
+                    for (final JobExecutionEntity execution : linkedExecutions) {
+                        execution.setInstance(instance);
+                        entityManager.merge(execution);
+                    }
+                }
+                txProvider.commit(transaction);
+            } catch (final Exception failure) {
+                txProvider.rollback(transaction, failure);
+                throw new BatchContainerRuntimeException(failure);
+            }
+        } finally {
+            emProvider.release(entityManager);
+        }
+    }
+
+    /**
+     * Creates a {@link JobStatus} for {@code instanceId} and fills it from the
+     * {@link JobInstanceEntity} found under the same id. The lookup runs inside a
+     * transaction obtained from the transaction provider.
+     *
+     * @param instanceId id of the job instance whose status is wanted
+     * @return the populated status
+     * @throws BatchContainerRuntimeException wrapping any exception raised while loading,
+     *                                        after the transaction provider was asked to roll back
+     */
+    @Override
+    public JobStatus getJobStatus(final long instanceId) {
+        final JobStatus result = new JobStatus(instanceId);
+        final EntityManager entityManager = emProvider.newEntityManager();
+        try {
+            final Object transaction = txProvider.start(entityManager);
+            try {
+                final JobInstanceEntity instance = entityManager.find(JobInstanceEntity.class, instanceId);
+                setJobStatusData(result, instance);
+                txProvider.commit(transaction);
+            } catch (final Exception failure) {
+                txProvider.rollback(transaction, failure);
+                throw new BatchContainerRuntimeException(failure);
+            }
+        } finally {
+            emProvider.release(entityManager);
+        }
+        return result;
+    }
+
+    /**
+     * Copies batch status, current step, latest execution id, exit status, restart-on value
+     * and the job instance (via {@code toJobInstance()}) from the entity onto the status.
+     *
+     * @param target status object receiving the values
+     * @param source entity the values are read from
+     */
+    private void setJobStatusData(final JobStatus target, final JobInstanceEntity source) {
+        target.setBatchStatus(source.getBatchStatus());
+        target.setCurrentStepId(source.getStep());
+        target.setLatestExecutionId(source.getLatestExecution());
+        target.setExitStatus(source.getExitStatus());
+        target.setRestartOn(source.getRestartOn());
+        target.setJobInstance(source.toJobInstance());
+    }
+
+    /**
+     * Builds a new {@link JobStatus} object for the given job instance id.
+     * No entity manager is involved in this method.
+     *
+     * @param jobInstanceId id handed to the {@link JobStatus} constructor
+     * @return the newly constructed status
+     */
+    @Override
+    public JobStatus createJobStatus(final long jobInstanceId) {
+        // instance already created
+        final JobStatus status = new JobStatus(jobInstanceId);
+        return status;
+    }
+
+    /**
+     * Stores the restart token of {@code value} in the checkpoint identified by {@code key}.
+     * The {@code FIND} named query is run with the key's job instance id, step name and type;
+     * when it yields nothing a new {@link CheckpointEntity} is persisted, otherwise the first
+     * returned entity is updated and merged. The work runs in one transaction obtained from
+     * the transaction provider.
+     *
+     * @param key supplies the job instance id, step name and type of the checkpoint
+     * @param value supplies the restart token to store
+     * @throws BatchContainerRuntimeException wrapping any exception raised while storing,
+     *                                        after the transaction provider was asked to roll back
+     */
+    @Override
+    public void setCheckpointData(final CheckpointDataKey key, final CheckpointData value) {
+        final EntityManager entityManager = emProvider.newEntityManager();
+        try {
+            final Object transaction = txProvider.start(entityManager);
+            try {
+                final List<CheckpointEntity> matches = entityManager
+                    .createNamedQuery(CheckpointEntity.Queries.FIND, CheckpointEntity.class)
+                    .setParameter("jobInstanceId", key.getJobInstanceId())
+                    .setParameter("stepName", key.getStepName())
+                    .setParameter("type", key.getType())
+                    .getResultList();
+
+                if (matches == null || matches.isEmpty()) {
+                    // nothing stored yet for this key: create and persist a new row
+                    final CheckpointEntity created = new CheckpointEntity();
+                    created.setInstance(entityManager.find(JobInstanceEntity.class, key.getJobInstanceId()));
+                    created.setStepName(key.getStepName());
+                    created.setType(key.getType());
+                    created.setData(value.getRestartToken());
+                    entityManager.persist(created);
+                } else {
+                    // reuse the first match and overwrite its data
+                    final CheckpointEntity existing = matches.iterator().next();
+                    existing.setData(value.getRestartToken());
+                    entityManager.merge(existing);
+                }
+                txProvider.commit(transaction);
+            } catch (final Exception failure) {
+                txProvider.rollback(transaction, failure);
+                throw new BatchContainerRuntimeException(failure);
+            }
+        } finally {
+            emProvider.release(entityManager);
+        }
+    }
+
+    /**
+     * Loads the checkpoint identified by {@code key} through the {@code FIND} named query and
+     * converts it into a {@link CheckpointData} carrying the stored data as restart token.
+     *
+     * NOTE(review): only {@link NoResultException} is handled here; if the query can match
+     * more than one row, {@code getSingleResult()} is expected to fail instead - confirm
+     * that the key is unique.
+     *
+     * @param key supplies the job instance id, step name and type of the checkpoint
+     * @return the checkpoint data, or {@code null} when the query finds no result
+     */
+    @Override
+    public CheckpointData getCheckpointData(final CheckpointDataKey key) {
+        final EntityManager entityManager = emProvider.newEntityManager();
+        try {
+            final CheckpointEntity stored = entityManager
+                .createNamedQuery(CheckpointEntity.Queries.FIND, CheckpointEntity.class)
+                .setParameter("jobInstanceId", key.getJobInstanceId())
+                .setParameter("stepName", key.getStepName())
+                .setParameter("type", key.getType())
+                .getSingleResult();
+
+            final CheckpointData result = new CheckpointData(
+                stored.getInstance().getJobInstanceId(), stored.getStepName(), stored.getType());
+            result.setRestartToken(stored.getData());
+            return result;
+        } catch (final NoResultException noCheckpoint) {
+            // no checkpoint stored for this key
+            return null;
+        } finally {
+            emProvider.release(entityManager);
+        }
+    }
+
+    /**
+     * Instantiates and initialises the transaction provider and the entity manager provider.
+     * The implementation classes are read from the {@code persistence.jpa.transaction-provider}
+     * and {@code persistence.jpa.entity-manager-provider} properties; when a property is absent
+     * {@link DefaultTransactionProvider} or {@link DefaultEntityManagerProvider} is used.
+     * The transaction provider is created and initialised before the entity manager provider.
+     *
+     * @param batchConfig batch configuration, also passed to each provider's {@code init}
+     * @throws BatchContainerRuntimeException if a provider class cannot be loaded,
+     *                                        instantiated or cast to the expected type
+     */
+    @Override
+    public void init(final Properties batchConfig) {
+        final String txProviderClass = batchConfig.getProperty("persistence.jpa.transaction-provider", DefaultTransactionProvider.class.getName());
+        txProvider = instantiateProvider(TransactionProvider.class, txProviderClass);
+        txProvider.init(batchConfig);
+
+        final String providerClass = batchConfig.getProperty("persistence.jpa.entity-manager-provider", DefaultEntityManagerProvider.class.getName());
+        emProvider = instantiateProvider(EntityManagerProvider.class, providerClass);
+        emProvider.init(batchConfig);
+    }
+
+    /**
+     * Loads {@code className}, creates an instance through its no-arg constructor and casts
+     * it to {@code type}.
+     *
+     * @param type expected provider type
+     * @param className fully qualified name of the implementation class
+     * @return the new provider instance
+     * @throws BatchContainerRuntimeException wrapping any loading, instantiation or cast failure
+     */
+    private static <T> T instantiateProvider(final Class<T> type, final String className) {
+        ClassLoader loader = Thread.currentThread().getContextClassLoader();
+        if (loader == null) {
+            // a thread may have no context class loader; use the loader of the provider type
+            loader = type.getClassLoader();
+        }
+        try {
+            return type.cast(loader.loadClass(className).newInstance());
+        } catch (final Exception e) {
+            throw new BatchContainerRuntimeException(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/67e63c08/jbatch/src/main/java/org/apache/batchee/container/services/persistence/JPAPersistenceService.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/services/persistence/JPAPersistenceService.java b/jbatch/src/main/java/org/apache/batchee/container/services/persistence/JPAPersistenceService.java
deleted file mode 100644
index 9071d9b..0000000
--- a/jbatch/src/main/java/org/apache/batchee/container/services/persistence/JPAPersistenceService.java
+++ /dev/null
@@ -1,849 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.batchee.container.services.persistence;
-
-import org.apache.batchee.container.exception.BatchContainerRuntimeException;
-import org.apache.batchee.container.exception.PersistenceException;
-import org.apache.batchee.container.impl.JobExecutionImpl;
-import org.apache.batchee.container.impl.JobInstanceImpl;
-import org.apache.batchee.container.impl.StepContextImpl;
-import org.apache.batchee.container.impl.StepExecutionImpl;
-import org.apache.batchee.container.impl.controller.PartitionedStepBuilder;
-import org.apache.batchee.container.impl.controller.chunk.CheckpointData;
-import org.apache.batchee.container.impl.controller.chunk.CheckpointDataKey;
-import org.apache.batchee.container.impl.controller.chunk.PersistentDataWrapper;
-import org.apache.batchee.container.impl.jobinstance.RuntimeFlowInSplitExecution;
-import org.apache.batchee.container.impl.jobinstance.RuntimeJobExecution;
-import org.apache.batchee.container.services.InternalJobExecution;
-import org.apache.batchee.container.services.persistence.jpa.EntityManagerProvider;
-import org.apache.batchee.container.services.persistence.jpa.TransactionProvider;
-import org.apache.batchee.container.services.persistence.jpa.domain.CheckpointEntity;
-import org.apache.batchee.container.services.persistence.jpa.domain.JobExecutionEntity;
-import org.apache.batchee.container.services.persistence.jpa.domain.JobInstanceEntity;
-import org.apache.batchee.container.services.persistence.jpa.domain.StepExecutionEntity;
-import org.apache.batchee.container.services.persistence.jpa.provider.DefaultEntityManagerProvider;
-import org.apache.batchee.container.services.persistence.jpa.provider.DefaultTransactionProvider;
-import org.apache.batchee.container.status.JobStatus;
-import org.apache.batchee.container.status.StepStatus;
-import org.apache.batchee.spi.PersistenceManagerService;
-
-import javax.batch.operations.NoSuchJobExecutionException;
-import javax.batch.runtime.BatchStatus;
-import javax.batch.runtime.JobInstance;
-import javax.batch.runtime.Metric;
-import javax.batch.runtime.StepExecution;
-import javax.persistence.EntityManager;
-import javax.persistence.NoResultException;
-import javax.persistence.TypedQuery;
-import java.io.IOException;
-import java.sql.Timestamp;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-
-import static org.apache.batchee.container.util.Serializations.deserialize;
-import static org.apache.batchee.container.util.Serializations.serialize;
-
-public class JPAPersistenceService implements PersistenceManagerService {
-    private static final String[] DELETE_QUERIES = {
-        StepExecutionEntity.Queries.DELETE_BY_INSTANCE_ID, CheckpointEntity.Queries.DELETE_BY_INSTANCE_ID,
-        JobExecutionEntity.Queries.DELETE_BY_INSTANCE_ID, JobInstanceEntity.Queries.DELETE_BY_INSTANCE_ID
-    };
-
-    private EntityManagerProvider emProvider;
-    private TransactionProvider txProvider;
-
-    @Override
-    public void cleanUp(final long instanceId) {
-        final EntityManager em = emProvider.newEntityManager();
-        try {
-            final Object tx = txProvider.start(em);
-            try {
-                for (final String query : DELETE_QUERIES) {
-                    em.createQuery(query).setParameter("instanceId", instanceId).executeUpdate();
-                }
-                txProvider.commit(tx);
-            } catch (final Exception e) {
-                txProvider.rollback(tx, e);
-                throw new BatchContainerRuntimeException(e);
-            }
-        } finally {
-            emProvider.release(em);
-        }
-    }
-
-    @Override
-    public StepStatus getStepStatus(final long instanceId, final String stepName) {
-        final EntityManager em = emProvider.newEntityManager();
-        try {
-            final List<StepExecutionEntity> list = em.createNamedQuery(StepExecutionEntity.Queries.FIND_BY_INSTANCE_AND_NAME, StepExecutionEntity.class)
-                .setParameter("instanceId", instanceId)
-                .setParameter("step", stepName)
-                .getResultList();
-            if (list != null && !list.isEmpty()) {
-                final StepExecutionEntity entity = list.iterator().next();
-                final StepStatus status = new StepStatus(entity.getId(), entity.getStartCount());
-                status.setBatchStatus(entity.getBatchStatus());
-                status.setExitStatus(entity.getExitStatus());
-                status.setNumPartitions(entity.getNumPartitions());
-                status.setLastRunStepExecutionId(entity.getLastRunStepExecutionId());
-                if (entity.getPersistentData() != null) {
-                    status.setPersistentUserData(new PersistentDataWrapper(entity.getPersistentData()));
-                }
-                return status;
-            }
-            return null;
-        } finally {
-            emProvider.release(em);
-        }
-    }
-
-    @Override
-    public Map<Long, String> jobOperatorGetExternalJobInstanceData() {
-        final Map<Long, String> data = new HashMap<Long, String>();
-        final EntityManager em = emProvider.newEntityManager();
-        try {
-            final List<JobInstanceEntity> list = em.createNamedQuery(JobInstanceEntity.Queries.FIND_EXTERNALS, JobInstanceEntity.class)
-                .setParameter("pattern", PartitionedStepBuilder.JOB_ID_SEPARATOR)
-                .getResultList();
-            if (list != null) {
-                for (final JobInstanceEntity elt : list) {
-                    data.put(elt.getJobInstanceId(), elt.getName());
-                }
-            }
-        } finally {
-            emProvider.release(em);
-        }
-        return data;
-    }
-
-    @Override
-    public JobStatus getJobStatusFromExecution(final long executionId) {
-        final EntityManager em = emProvider.newEntityManager();
-        try {
-            final JobInstanceEntity entity = em.createNamedQuery(JobInstanceEntity.Queries.FIND_FROM_EXECUTION, JobInstanceEntity.class)
-                .setParameter("executionId", executionId)
-                .getSingleResult();
-            final JobStatus status = new JobStatus(entity.getJobInstanceId());
-            setJobStatusData(status, entity);
-            return status;
-        } finally {
-            emProvider.release(em);
-        }
-    }
-
-    @Override
-    public StepExecution getStepExecutionByStepExecutionId(final long stepExecId) {
-        final EntityManager em = emProvider.newEntityManager();
-        try {
-            final StepExecutionEntity entity = em.find(StepExecutionEntity.class, stepExecId);
-            final StepExecutionImpl stepEx = new StepExecutionImpl(entity.getExecution().getExecutionId(), stepExecId);
-            setStepExecutionData(entity, stepEx);
-            return stepEx;
-        } finally {
-            emProvider.release(em);
-        }
-    }
-
-    private void setStepExecutionData(final StepExecutionEntity entity, final StepExecutionImpl stepEx) {
-        stepEx.setBatchStatus(entity.getBatchStatus());
-        stepEx.setExitStatus(entity.getExitStatus());
-        stepEx.setStepName(entity.getStepName());
-        stepEx.setReadCount(entity.getRead());
-        stepEx.setWriteCount(entity.getWrite());
-        stepEx.setCommitCount(entity.getCommit());
-        stepEx.setRollbackCount(entity.getRollback());
-        stepEx.setReadSkipCount(entity.getReadSkip());
-        stepEx.setProcessSkipCount(entity.getProcessSkip());
-        stepEx.setFilterCount(entity.getFilter());
-        stepEx.setWriteSkipCount(entity.getWriteSkip());
-        stepEx.setStartTime(entity.getStartTime());
-        stepEx.setEndTime(entity.getEndTime());
-        try {
-            stepEx.setPersistentUserData(deserialize(entity.getPersistentData()));
-        } catch (final Exception e) {
-            throw new PersistenceException(e);
-        }
-    }
-
-    @Override
-    public List<StepExecution> getStepExecutionsForJobExecution(final long execid) {
-        final EntityManager em = emProvider.newEntityManager();
-        try {
-            final List<StepExecutionEntity> steps = em.createNamedQuery(StepExecutionEntity.Queries.FIND_BY_EXECUTION, StepExecutionEntity.class)
-                .setParameter("executionId", execid)
-                .getResultList();
-
-            if (steps == null) {
-                return Collections.emptyList();
-            }
-
-            final List<StepExecution> executions = new ArrayList<StepExecution>(steps.size());
-            for (final StepExecutionEntity entity : steps) {
-                final StepExecutionImpl stepEx = new StepExecutionImpl(execid, entity.getId());
-                setStepExecutionData(entity, stepEx);
-                executions.add(stepEx);
-            }
-
-            return executions;
-        } finally {
-            emProvider.release(em);
-        }
-    }
-
-    @Override
-    public void updateStepExecution(final long jobExecId, final StepContextImpl stepContext) {
-        final EntityManager em = emProvider.newEntityManager();
-        try {
-            final Object tx = txProvider.start(em);
-            try {
-                final StepExecutionEntity entity = em.find(StepExecutionEntity.class, stepContext.getStepInternalExecID());
-                setStepData(em, jobExecId, stepContext,
-                    stepContext.metricsAsMap().get(Metric.MetricType.READ_COUNT.name()).getValue(),
-                    stepContext.metricsAsMap().get(Metric.MetricType.WRITE_COUNT.name()).getValue(),
-                    stepContext.metricsAsMap().get(Metric.MetricType.COMMIT_COUNT.name()).getValue(),
-                    stepContext.metricsAsMap().get(Metric.MetricType.ROLLBACK_COUNT.name()).getValue(),
-                    stepContext.metricsAsMap().get(Metric.MetricType.READ_SKIP_COUNT.name()).getValue(),
-                    stepContext.metricsAsMap().get(Metric.MetricType.PROCESS_SKIP_COUNT.name()).getValue(),
-                    stepContext.metricsAsMap().get(Metric.MetricType.FILTER_COUNT.name()).getValue(),
-                    stepContext.metricsAsMap().get(Metric.MetricType.WRITE_SKIP_COUNT.name()).getValue(),
-                    entity);
-                txProvider.commit(tx);
-            } catch (final Exception e) {
-                txProvider.rollback(tx, e);
-                throw new BatchContainerRuntimeException(e);
-            }
-        } finally {
-            emProvider.release(em);
-        }
-    }
-
-    @Override
-    public StepExecutionImpl createStepExecution(final long jobExecId, final StepContextImpl stepContext) {
-        final StepExecutionEntity entity = new StepExecutionEntity();
-        final EntityManager em = emProvider.newEntityManager();
-        try {
-            setStepData(em, jobExecId, stepContext,
-                stepContext.metricsAsMap().get(Metric.MetricType.READ_COUNT.name()).getValue(),
-                stepContext.metricsAsMap().get(Metric.MetricType.WRITE_COUNT.name()).getValue(),
-                stepContext.metricsAsMap().get(Metric.MetricType.COMMIT_COUNT.name()).getValue(),
-                stepContext.metricsAsMap().get(Metric.MetricType.ROLLBACK_COUNT.name()).getValue(),
-                stepContext.metricsAsMap().get(Metric.MetricType.READ_SKIP_COUNT.name()).getValue(),
-                stepContext.metricsAsMap().get(Metric.MetricType.PROCESS_SKIP_COUNT.name()).getValue(),
-                stepContext.metricsAsMap().get(Metric.MetricType.FILTER_COUNT.name()).getValue(),
-                stepContext.metricsAsMap().get(Metric.MetricType.WRITE_SKIP_COUNT.name()).getValue(),
-                entity);
-
-            final Object tx = txProvider.start(em);
-            try {
-                em.persist(entity);
-                txProvider.commit(tx);
-
-                final StepExecutionImpl stepExecution = new StepExecutionImpl(jobExecId, entity.getId());
-                stepExecution.setStepName(stepContext.getStepName());
-                return stepExecution;
-            } catch (final Exception e) {
-                txProvider.rollback(tx, e);
-                throw new BatchContainerRuntimeException(e);
-            }
-        } finally {
-            emProvider.release(em);
-        }
-    }
-
-//CHECKSTYLE:OFF
-    private void setStepData(final EntityManager em,
-                             final long jobExecId, final StepContextImpl stepContext,
-                             final long readCount, final long writeCount,
-                             final long commitCount, final long rollbackCount,
-                             final long readSkipCount, final long processSkipCount,
-                             final long filterCount, final long writeSkipCount,
-                             final StepExecutionEntity entity) {
-//CHECKSTYLE:ON
-        entity.setExecution(em.find(JobExecutionEntity.class, jobExecId));
-        entity.setBatchStatus(stepContext.getBatchStatus());
-        entity.setExitStatus(stepContext.getExitStatus());
-        entity.setStepName(stepContext.getStepName());
-        entity.setRead(readCount);
-        entity.setWrite(writeCount);
-        entity.setCommit(commitCount);
-        entity.setRollback(rollbackCount);
-        entity.setReadSkip(readSkipCount);
-        entity.setProcessSkip(processSkipCount);
-        entity.setFilter(filterCount);
-        entity.setWriteSkip(writeSkipCount);
-        entity.setStartTime(stepContext.getStartTimeTS());
-        entity.setEndTime(stepContext.getEndTimeTS());
-        try {
-            entity.setPersistentData(serialize(stepContext.getPersistentUserData()));
-        } catch (final IOException e) {
-            throw new PersistenceException(e);
-        }
-    }
-
-    @Override
-    public long getJobInstanceIdByExecutionId(final long executionId) throws NoSuchJobExecutionException {
-        final EntityManager em = emProvider.newEntityManager();
-        try {
-            return em.find(JobExecutionEntity.class, executionId).getInstance().getJobInstanceId();
-        } finally {
-            emProvider.release(em);
-        }
-    }
-
-    @Override
-    public Set<Long> jobOperatorGetRunningExecutions(final String jobName) {
-        final EntityManager em = emProvider.newEntityManager();
-        try {
-            final List<JobExecutionEntity> list = em.createNamedQuery(JobExecutionEntity.Queries.FIND_RUNNING, JobExecutionEntity.class)
-                .setParameter("name", jobName)
-                .setParameter("statuses", JobExecutionEntity.Queries.RUNNING_STATUSES)
-                .getResultList();
-
-            if (list == null) {
-                return Collections.emptySet();
-            }
-
-            final Set<Long> ids= new HashSet<Long>(list.size());
-            for (final JobExecutionEntity entity : list) {
-                ids.add(entity.getExecutionId());
-            }
-            return ids;
-        } finally {
-            emProvider.release(em);
-        }
-    }
-
-    @Override
-    public List<InternalJobExecution> jobOperatorGetJobExecutions(final long jobInstanceId) {
-        final EntityManager em = emProvider.newEntityManager();
-        try {
-            final List<JobExecutionEntity> list = em.createNamedQuery(JobExecutionEntity.Queries.FIND_BY_INSTANCE, JobExecutionEntity.class)
-                .setParameter("instanceId", jobInstanceId).getResultList();
-
-            if (list == null) {
-                return Collections.emptyList();
-            }
-
-            final List<InternalJobExecution> result = new ArrayList<InternalJobExecution>(list.size());
-            for (final JobExecutionEntity entity : list) {
-                final JobExecutionImpl jobEx = new JobExecutionImpl(entity.getExecutionId(), jobInstanceId, this);
-                jobEx.setCreateTime(entity.getCreateTime());
-                jobEx.setStartTime(entity.getStartTime());
-                jobEx.setEndTime(entity.getEndTime());
-                jobEx.setLastUpdateTime(entity.getUpdateTime());
-                jobEx.setBatchStatus(entity.getBatchStatus().name());
-                jobEx.setExitStatus(entity.getExitStatus());
-                jobEx.setJobName(entity.getInstance().getName());
-
-                result.add(jobEx);
-            }
-            return result;
-        } finally {
-            emProvider.release(em);
-        }
-    }
-
-    @Override
-    public void updateWithFinalExecutionStatusesAndTimestamps(final long key, final BatchStatus batchStatus, final String exitStatus, final Timestamp updatets) {
-        final EntityManager em = emProvider.newEntityManager();
-        try {
-            final Object tx = txProvider.start(em);
-            try {
-                final JobExecutionEntity instance = em.find(JobExecutionEntity.class, key);
-                instance.setBatchStatus(batchStatus);
-                instance.setUpdateTime(updatets);
-                instance.setEndTime(updatets);
-                instance.setExitStatus(exitStatus);
-
-                em.merge(instance);
-                txProvider.commit(tx);
-            } catch (final Exception e) {
-                txProvider.rollback(tx, e);
-                throw new BatchContainerRuntimeException(e);
-            }
-        } finally {
-            emProvider.release(em);
-        }
-    }
-
-    @Override
-    public InternalJobExecution jobOperatorGetJobExecution(final long jobExecutionId) {
-        final EntityManager em = emProvider.newEntityManager();
-        try {
-            final JobExecutionEntity instance = em.find(JobExecutionEntity.class, jobExecutionId);
-
-            final JobExecutionImpl jobEx = new JobExecutionImpl(jobExecutionId, instance.getInstance().getJobInstanceId(), this);
-            jobEx.setCreateTime(instance.getCreateTime());
-            jobEx.setStartTime(instance.getStartTime());
-            jobEx.setEndTime(instance.getEndTime());
-            jobEx.setJobParameters(instance.getJobProperties());
-            jobEx.setLastUpdateTime(instance.getUpdateTime());
-            if (instance.getBatchStatus() != null) {
-                jobEx.setBatchStatus(instance.getBatchStatus().name());
-            }
-            jobEx.setExitStatus(instance.getExitStatus());
-            jobEx.setJobName(instance.getInstance().getName());
-            return jobEx;
-        } finally {
-            emProvider.release(em);
-        }
-    }
-
-    @Override
-    public void markJobStarted(final long key, final Timestamp startTS) {
-        final EntityManager em = emProvider.newEntityManager();
-        try {
-            final Object tx = txProvider.start(em);
-            try {
-                final JobExecutionEntity instance = em.find(JobExecutionEntity.class, key);
-                instance.setBatchStatus(BatchStatus.STARTED);
-                instance.setStartTime(startTS);
-                instance.setUpdateTime(startTS);
-
-                em.merge(instance);
-                txProvider.commit(tx);
-            } catch (final Exception e) {
-                txProvider.rollback(tx, e);
-                throw new BatchContainerRuntimeException(e);
-            }
-        } finally {
-            emProvider.release(em);
-        }
-    }
-
-    @Override
-    public Properties getParameters(final long executionId) throws NoSuchJobExecutionException {
-        final EntityManager em = emProvider.newEntityManager();
-        try {
-            return em.find(JobExecutionEntity.class, executionId).getJobProperties();
-        } finally {
-            emProvider.release(em);
-        }
-    }
-
-    @Override
-    public String jobOperatorQueryJobExecutionExitStatus(final long key) {
-        final EntityManager em = emProvider.newEntityManager();
-        try {
-            return em.find(JobExecutionEntity.class, key).getExitStatus();
-        } finally {
-            emProvider.release(em);
-        }
-    }
-
-    @Override
-    public String jobOperatorQueryJobExecutionBatchStatus(final long key) {
-        final EntityManager em = emProvider.newEntityManager();
-        try {
-            return em.find(JobExecutionEntity.class, key).getBatchStatus().name();
-        } finally {
-            emProvider.release(em);
-        }
-    }
-
-    @Override
-    public Timestamp jobOperatorQueryJobExecutionTimestamp(final long key, final TimestampType timetype) {
-        final EntityManager em = emProvider.newEntityManager();
-        try {
-            final JobExecutionEntity entity = em.find(JobExecutionEntity.class, key);
-            if (timetype.equals(TimestampType.CREATE)) {
-                return entity.getCreateTime();
-            } else if (timetype.equals(TimestampType.END)) {
-                return entity.getEndTime();
-            } else if (timetype.equals(TimestampType.LAST_UPDATED)) {
-                return entity.getUpdateTime();
-            } else if (timetype.equals(TimestampType.STARTED)) {
-                return entity.getStartTime();
-            } else {
-                throw new IllegalArgumentException("Unexpected enum value.");
-            }
-        } finally {
-            emProvider.release(em);
-        }
-    }
-
-    @Override
-    public RuntimeFlowInSplitExecution createFlowInSplitExecution(final JobInstance jobInstance, final BatchStatus batchStatus) {
-        final EntityManager em = emProvider.newEntityManager();
-        try {
-            final JobExecutionEntity instance = new JobExecutionEntity();
-            instance.setCreateTime(new Timestamp(System.currentTimeMillis()));
-            instance.setUpdateTime(instance.getCreateTime());
-            instance.setBatchStatus(batchStatus);
-
-            final Object tx = txProvider.start(em);
-            try {
-                instance.setInstance(em.find(JobInstanceEntity.class, jobInstance.getInstanceId()));
-
-                em.persist(instance);
-                txProvider.commit(tx);
-
-                final RuntimeFlowInSplitExecution jobExecution = new RuntimeFlowInSplitExecution(jobInstance, instance.getExecutionId(), this);
-                jobExecution.setBatchStatus(batchStatus.name());
-                jobExecution.setCreateTime(instance.getCreateTime());
-                jobExecution.setLastUpdateTime(instance.getCreateTime());
-                return jobExecution;
-            } catch (final Exception e) {
-                txProvider.rollback(tx, e);
-                throw new BatchContainerRuntimeException(e);
-            }
-        } finally {
-            emProvider.release(em);
-        }
-    }
-
-    @Override
-    public long getMostRecentExecutionId(final long jobInstanceId) {
-        final EntityManager em = emProvider.newEntityManager();
-        try {
-            return em.createNamedQuery(JobExecutionEntity.Queries.MOST_RECENT, JobExecutionEntity.class)
-                .setParameter("instanceId", jobInstanceId)
-                .setMaxResults(1)
-                .getSingleResult().getExecutionId();
-        } finally {
-            emProvider.release(em);
-        }
-    }
-
-    /**
-     * Writes a new batch status and update time onto a stored job execution.
-     *
-     * @param executionId primary key used to load the {@code JobExecutionEntity}
-     * @param batchStatus status stored on the entity
-     * @param timestamp   value stored as the entity's update time
-     * @throws BatchContainerRuntimeException if loading, updating or committing fails;
-     *                                        the transaction is rolled back before it is thrown
-     */
-    @Override
-    public void updateBatchStatusOnly(final long executionId, final BatchStatus batchStatus, final Timestamp timestamp) {
-        final EntityManager em = emProvider.newEntityManager();
-        try {
-            final Object tx = txProvider.start(em);
-            try {
-                // Load and mutate inside the guarded section: if find() yields null for an unknown id
-                // (or a setter fails) the started transaction must be rolled back, not left open.
-                final JobExecutionEntity instance = em.find(JobExecutionEntity.class, executionId);
-                instance.setBatchStatus(batchStatus);
-                instance.setUpdateTime(timestamp);
-
-                em.merge(instance);
-                txProvider.commit(tx);
-            } catch (final Exception e) {
-                txProvider.rollback(tx, e);
-                throw new BatchContainerRuntimeException(e);
-            }
-        } finally {
-            emProvider.release(em);
-        }
-    }
-
-    /**
-     * Persists a new {@code JobExecutionEntity} for the given job instance and returns its runtime view.
-     *
-     * @param jobInstance   instance whose id is used to look up the owning {@code JobInstanceEntity}
-     * @param jobParameters properties stored on the new execution
-     * @param batchStatus   initial status stored on the entity and on the returned execution
-     * @return a {@code RuntimeJobExecution} carrying the generated execution id, the status and the creation time
-     * @throws BatchContainerRuntimeException if anything fails after the transaction was started
-     */
-    @Override
-    public RuntimeJobExecution createJobExecution(final JobInstance jobInstance, final Properties jobParameters, final BatchStatus batchStatus) {
-        final EntityManager entityManager = emProvider.newEntityManager();
-        try {
-            // Populate the entity before any transaction is opened.
-            final JobExecutionEntity entity = new JobExecutionEntity();
-            entity.setJobProperties(jobParameters);
-            entity.setCreateTime(new Timestamp(System.currentTimeMillis()));
-            entity.setUpdateTime(entity.getCreateTime());
-            entity.setBatchStatus(batchStatus);
-
-            final Object transaction = txProvider.start(entityManager);
-            try {
-                final JobInstanceEntity owner = entityManager.find(JobInstanceEntity.class, jobInstance.getInstanceId());
-                entity.setInstance(owner);
-
-                entityManager.persist(entity);
-                txProvider.commit(transaction);
-
-                // Mirror the persisted values on the runtime object handed to the caller.
-                final RuntimeJobExecution created = new RuntimeJobExecution(jobInstance, entity.getExecutionId(), this);
-                created.setBatchStatus(batchStatus.name());
-                created.setCreateTime(entity.getCreateTime());
-                created.setLastUpdateTime(entity.getCreateTime());
-                return created;
-            } catch (final Exception failure) {
-                txProvider.rollback(transaction, failure);
-                throw new BatchContainerRuntimeException(failure);
-            }
-        } finally {
-            emProvider.release(entityManager);
-        }
-    }
-
-    /**
-     * Creates a job instance through {@link #createJobInstance(String, String, String)} without any job XML.
-     *
-     * @param name   job name forwarded unchanged
-     * @param apptag application tag forwarded unchanged
-     * @return whatever {@code createJobInstance} returns for a {@code null} job XML
-     */
-    @Override
-    public JobInstance createSubJobInstance(final String name, final String apptag) {
-        final String noJobXml = null;
-        return createJobInstance(name, apptag, noJobXml);
-    }
-
-    /**
-     * Lists job instance ids for a job name by delegating to the four-argument overload with a {@code null} tag.
-     *
-     * @param jobName job name forwarded unchanged
-     * @param start   first-result offset forwarded unchanged
-     * @param count   maximum number of results forwarded unchanged
-     * @return the ids returned by the four-argument overload
-     */
-    @Override
-    public List<Long> jobOperatorGetJobInstanceIds(final String jobName, final int start, final int count) {
-        final String noTag = null;
-        return jobOperatorGetJobInstanceIds(jobName, noTag, start, count);
-    }
-
-    /**
-     * Pages through the {@code FIND_BY_NAME} named query and returns the ids of the matching job instances.
-     *
-     * @param jobName value bound to the query's {@code name} parameter
-     * @param appTag  when non-null, additionally bound to a {@code tag} parameter
-     * @param start   offset of the first result
-     * @param count   maximum number of results
-     * @return the job instance ids of the returned entities, in query order; empty if the result list is {@code null}
-     */
-    @Override
-    public List<Long> jobOperatorGetJobInstanceIds(final String jobName, final String appTag, final int start, final int count) {
-        final EntityManager entityManager = emProvider.newEntityManager();
-        try {
-            // NOTE(review): the same FIND_BY_NAME query is used with and without a tag -- confirm that
-            // query really declares a "tag" parameter, otherwise the tagged path cannot work.
-            TypedQuery<JobInstanceEntity> query =
-                entityManager.createNamedQuery(JobInstanceEntity.Queries.FIND_BY_NAME, JobInstanceEntity.class);
-            if (appTag != null) {
-                query = query.setParameter("tag", appTag);
-            }
-            query.setParameter("name", jobName);
-
-            final List<JobInstanceEntity> page = query
-                .setFirstResult(start)
-                .setMaxResults(count)
-                .getResultList();
-            if (page == null) {
-                return Collections.emptyList();
-            }
-
-            // Project each entity onto its id, keeping the query order.
-            final List<Long> ids = new ArrayList<Long>(page.size());
-            for (final JobInstanceEntity found : page) {
-                ids.add(found.getJobInstanceId());
-            }
-            return ids;
-        } finally {
-            emProvider.release(entityManager);
-        }
-    }
-
-    /**
-     * Persists a new {@code JobInstanceEntity} and returns a {@code JobInstanceImpl} describing it.
-     *
-     * @param name   job name stored on the entity and set on the returned instance
-     * @param apptag tag stored on the entity
-     * @param jobXml job XML stored on the entity and passed to the returned instance
-     * @return a {@code JobInstanceImpl} built from the persisted entity's id, the job XML and the job name
-     * @throws BatchContainerRuntimeException if anything fails after the transaction was started
-     */
-    @Override
-    public JobInstance createJobInstance(final String name, final String apptag, final String jobXml) {
-        final EntityManager entityManager = emProvider.newEntityManager();
-        try {
-            final JobInstanceEntity entity = new JobInstanceEntity();
-            entity.setTag(apptag);
-            entity.setName(name);
-            entity.setJobXml(jobXml);
-
-            final Object transaction = txProvider.start(entityManager);
-            try {
-                entityManager.persist(entity);
-                txProvider.commit(transaction);
-
-                // The id is read from the entity only after the commit.
-                final JobInstanceImpl created = new JobInstanceImpl(entity.getJobInstanceId(), jobXml);
-                created.setJobName(name);
-                return created;
-            } catch (final Exception failure) {
-                txProvider.rollback(transaction, failure);
-                throw new BatchContainerRuntimeException(failure);
-            }
-        } finally {
-            emProvider.release(entityManager);
-        }
-    }
-
-    /**
-     * Runs the {@code COUNT_BY_NAME} named query for a job name.
-     *
-     * @param jobName value bound to the query's {@code name} parameter
-     * @return the query's single numeric result, narrowed with {@code intValue()}
-     */
-    @Override
-    public int jobOperatorGetJobInstanceCount(final String jobName) {
-        final EntityManager entityManager = emProvider.newEntityManager();
-        try {
-            final Number total = entityManager
-                .createNamedQuery(JobInstanceEntity.Queries.COUNT_BY_NAME, Number.class)
-                .setParameter("name", jobName)
-                .getSingleResult();
-            return total.intValue();
-        } finally {
-            emProvider.release(entityManager);
-        }
-    }
-
-    /**
-     * Runs the {@code COUNT_BY_NAME} named query with both a name and a tag bound.
-     *
-     * @param jobName value bound to the query's {@code name} parameter
-     * @param appTag  value bound to the query's {@code tag} parameter
-     * @return the query's single numeric result, narrowed with {@code intValue()}
-     */
-    @Override
-    public int jobOperatorGetJobInstanceCount(final String jobName, final String appTag) {
-        final EntityManager entityManager = emProvider.newEntityManager();
-        try {
-            // NOTE(review): this is the same COUNT_BY_NAME query the name-only overload uses --
-            // confirm it declares a "tag" parameter.
-            final Number total = entityManager
-                .createNamedQuery(JobInstanceEntity.Queries.COUNT_BY_NAME, Number.class)
-                .setParameter("name", jobName)
-                .setParameter("tag", appTag)
-                .getSingleResult();
-            return total.intValue();
-        } finally {
-            emProvider.release(entityManager);
-        }
-    }
-
-    /**
-     * Copies the fields of a {@code StepStatus} onto the stored {@code StepExecutionEntity} and commits the change.
-     *
-     * @param stepExecutionId primary key used to load the entity
-     * @param stepStatus      source of the batch status, exit status, last-run step execution id,
-     *                        partition count, raw persistent user data and start count
-     * @throws BatchContainerRuntimeException if loading, updating or committing fails; the transaction is rolled back first
-     */
-    @Override
-    public void updateStepStatus(final long stepExecutionId, final StepStatus stepStatus) {
-        final EntityManager entityManager = emProvider.newEntityManager();
-        try {
-            final Object transaction = txProvider.start(entityManager);
-            try {
-                final StepExecutionEntity stored = entityManager.find(StepExecutionEntity.class, stepExecutionId);
-
-                // Field-by-field copy from the in-memory status to the entity.
-                stored.setBatchStatus(stepStatus.getBatchStatus());
-                stored.setExitStatus(stepStatus.getExitStatus());
-                stored.setLastRunStepExecutionId(stepStatus.getLastRunStepExecutionId());
-                stored.setNumPartitions(stepStatus.getNumPartitions());
-                stored.setPersistentData(stepStatus.getRawPersistentUserData());
-                stored.setStartCount(stepStatus.getStartCount());
-
-                entityManager.merge(stored);
-                txProvider.commit(transaction);
-            } catch (final Exception failure) {
-                txProvider.rollback(transaction, failure);
-                throw new BatchContainerRuntimeException(failure);
-            }
-        } finally {
-            emProvider.release(entityManager);
-        }
-    }
-
-    /**
-     * Wraps a step execution id in a new {@code StepStatus}; this method makes no entity-manager calls.
-     *
-     * @param stepExecId id passed to the {@code StepStatus} constructor
-     * @return the newly constructed status object
-     */
-    @Override
-    public StepStatus createStepStatus(final long stepExecId) {
-        // instance already created
-        final StepStatus status = new StepStatus(stepExecId);
-        return status;
-    }
-
-    /**
-     * Copies a {@code JobStatus} onto the stored {@code JobInstanceEntity} and re-links its executions.
-     * <p>
-     * Batch status, current step id, latest execution id, exit status and restart-on value are always copied;
-     * the job name is copied only when the status carries a job instance. Every execution returned by the
-     * {@code FIND_BY_INSTANCE} query is then pointed at the entity and merged, all within one transaction.
-     *
-     * @param instanceId primary key of the job instance entity, also bound to the query's {@code instanceId} parameter
-     * @param jobStatus  source of the values to store
-     * @throws BatchContainerRuntimeException if any step fails; the transaction is rolled back first
-     */
-    @Override
-    public void updateJobStatus(final long instanceId, final JobStatus jobStatus) {
-        final EntityManager entityManager = emProvider.newEntityManager();
-        try {
-            final Object transaction = txProvider.start(entityManager);
-            try {
-                final JobInstanceEntity stored = entityManager.find(JobInstanceEntity.class, instanceId);
-                stored.setBatchStatus(jobStatus.getBatchStatus());
-                stored.setStep(jobStatus.getCurrentStepId());
-                stored.setLatestExecution(jobStatus.getLatestExecutionId());
-                stored.setExitStatus(jobStatus.getExitStatus());
-                stored.setRestartOn(jobStatus.getRestartOn());
-                if (jobStatus.getJobInstance() != null) {
-                    stored.setName(jobStatus.getJobInstance().getJobName());
-                }
-                entityManager.merge(stored);
-
-                final TypedQuery<JobExecutionEntity> byInstance = entityManager
-                    .createNamedQuery(JobExecutionEntity.Queries.FIND_BY_INSTANCE, JobExecutionEntity.class)
-                    .setParameter("instanceId", instanceId);
-                final List<JobExecutionEntity> linked = byInstance.getResultList();
-                if (linked != null) {
-                    for (final JobExecutionEntity execution : linked) {
-                        execution.setInstance(stored);
-                        entityManager.merge(execution);
-                    }
-                }
-                txProvider.commit(transaction);
-            } catch (final Exception failure) {
-                txProvider.rollback(transaction, failure);
-                throw new BatchContainerRuntimeException(failure);
-            }
-        } finally {
-            emProvider.release(entityManager);
-        }
-    }
-
-    /**
-     * Builds a {@code JobStatus} for a job instance from the stored {@code JobInstanceEntity}.
-     *
-     * @param instanceId id given to the new status and used as primary key for the lookup
-     * @return the status populated by {@code setJobStatusData}
-     * @throws BatchContainerRuntimeException if the lookup, the copy or the commit fails; the transaction is rolled back first
-     */
-    @Override
-    public JobStatus getJobStatus(final long instanceId) {
-        final JobStatus result = new JobStatus(instanceId);
-        final EntityManager entityManager = emProvider.newEntityManager();
-        try {
-            final Object transaction = txProvider.start(entityManager);
-            try {
-                final JobInstanceEntity stored = entityManager.find(JobInstanceEntity.class, instanceId);
-                setJobStatusData(result, stored);
-                txProvider.commit(transaction);
-            } catch (final Exception failure) {
-                txProvider.rollback(transaction, failure);
-                throw new BatchContainerRuntimeException(failure);
-            }
-        } finally {
-            emProvider.release(entityManager);
-        }
-        return result;
-    }
-
-    /**
-     * Copies the persisted state of a job instance entity onto a {@code JobStatus}.
-     *
-     * @param target status object receiving the values
-     * @param source entity supplying batch status, step, latest execution, exit status,
-     *               restart-on value and the job instance obtained from {@code toJobInstance()}
-     */
-    private void setJobStatusData(final JobStatus target, final JobInstanceEntity source) {
-        target.setBatchStatus(source.getBatchStatus());
-        target.setCurrentStepId(source.getStep());
-        target.setLatestExecutionId(source.getLatestExecution());
-        target.setExitStatus(source.getExitStatus());
-        target.setRestartOn(source.getRestartOn());
-        target.setJobInstance(source.toJobInstance());
-    }
-
-    /**
-     * Wraps a job instance id in a new {@code JobStatus}; this method makes no entity-manager calls.
-     *
-     * @param jobInstanceId id passed to the {@code JobStatus} constructor
-     * @return the newly constructed status object
-     */
-    @Override
-    public JobStatus createJobStatus(final long jobInstanceId) {
-        // instance already created
-        final JobStatus status = new JobStatus(jobInstanceId);
-        return status;
-    }
-
-    /**
-     * Stores the restart token of a checkpoint, updating the existing row or creating a new one.
-     * <p>
-     * The {@code FIND} named query is run with the key's job instance id, step name and type. If it returns
-     * at least one entity, the first one receives the token and is merged; otherwise a new
-     * {@code CheckpointEntity} is filled from the key, receives the token and is persisted.
-     *
-     * @param key   identifies the checkpoint (job instance id, step name, type)
-     * @param value supplies the restart token to store
-     * @throws BatchContainerRuntimeException if any step fails; the transaction is rolled back first
-     */
-    @Override
-    public void setCheckpointData(final CheckpointDataKey key, final CheckpointData value) {
-        final EntityManager entityManager = emProvider.newEntityManager();
-        try {
-            final Object transaction = txProvider.start(entityManager);
-            try {
-                final List<CheckpointEntity> existing = entityManager
-                    .createNamedQuery(CheckpointEntity.Queries.FIND, CheckpointEntity.class)
-                    .setParameter("jobInstanceId", key.getJobInstanceId())
-                    .setParameter("stepName", key.getStepName())
-                    .setParameter("type", key.getType())
-                    .getResultList();
-
-                if (existing != null && !existing.isEmpty()) {
-                    // Update path: reuse the first matching row.
-                    final CheckpointEntity stored = existing.iterator().next();
-                    stored.setData(value.getRestartToken());
-                    entityManager.merge(stored);
-                } else {
-                    // Insert path: build the row from the key.
-                    final CheckpointEntity fresh = new CheckpointEntity();
-                    fresh.setInstance(entityManager.find(JobInstanceEntity.class, key.getJobInstanceId()));
-                    fresh.setStepName(key.getStepName());
-                    fresh.setType(key.getType());
-                    fresh.setData(value.getRestartToken());
-                    entityManager.persist(fresh);
-                }
-                txProvider.commit(transaction);
-            } catch (final Exception failure) {
-                txProvider.rollback(transaction, failure);
-                throw new BatchContainerRuntimeException(failure);
-            }
-        } finally {
-            emProvider.release(entityManager);
-        }
-    }
-
-    /**
-     * Loads the checkpoint matching a key through the {@code FIND} named query.
-     *
-     * @param key supplies the job instance id, step name and type bound to the query
-     * @return a {@code CheckpointData} rebuilt from the entity (instance id, step name, type, restart token),
-     *         or {@code null} when the query raises {@code NoResultException}
-     */
-    @Override
-    public CheckpointData getCheckpointData(final CheckpointDataKey key) {
-        final EntityManager entityManager = emProvider.newEntityManager();
-        try {
-            final TypedQuery<CheckpointEntity> lookup = entityManager
-                .createNamedQuery(CheckpointEntity.Queries.FIND, CheckpointEntity.class)
-                .setParameter("jobInstanceId", key.getJobInstanceId())
-                .setParameter("stepName", key.getStepName())
-                .setParameter("type", key.getType());
-            final CheckpointEntity stored = lookup.getSingleResult();
-
-            final CheckpointData loaded =
-                new CheckpointData(stored.getInstance().getJobInstanceId(), stored.getStepName(), stored.getType());
-            loaded.setRestartToken(stored.getData());
-            return loaded;
-        } catch (final NoResultException notFound) {
-            // No stored checkpoint for this key.
-            return null;
-        } finally {
-            emProvider.release(entityManager);
-        }
-    }
-
-    /**
-     * Instantiates and initialises the transaction provider and the entity-manager provider.
-     * <p>
-     * The class names come from the {@code persistence.jpa.transaction-provider} and
-     * {@code persistence.jpa.entity-manager-provider} properties, defaulting to
-     * {@code DefaultTransactionProvider} and {@code DefaultEntityManagerProvider}. Each class is loaded through
-     * the current thread's context class loader, created with its no-arg constructor and then given
-     * {@code batchConfig} via its own {@code init}.
-     *
-     * @param batchConfig configuration read here and passed on to both providers
-     * @throws BatchContainerRuntimeException if a provider class cannot be loaded, instantiated or cast
-     */
-    @Override
-    public void init(final Properties batchConfig) {
-        final String transactionProviderName =
-            batchConfig.getProperty("persistence.jpa.transaction-provider", DefaultTransactionProvider.class.getName());
-        try {
-            final ClassLoader loader = Thread.currentThread().getContextClassLoader();
-            final Object candidate = loader.loadClass(transactionProviderName).newInstance();
-            txProvider = TransactionProvider.class.cast(candidate);
-        } catch (final Exception failure) {
-            throw new BatchContainerRuntimeException(failure);
-        }
-        txProvider.init(batchConfig);
-
-        final String entityManagerProviderName =
-            batchConfig.getProperty("persistence.jpa.entity-manager-provider", DefaultEntityManagerProvider.class.getName());
-        try {
-            // The context class loader is looked up again rather than reused from above.
-            final ClassLoader loader = Thread.currentThread().getContextClassLoader();
-            final Object candidate = loader.loadClass(entityManagerProviderName).newInstance();
-            emProvider = EntityManagerProvider.class.cast(candidate);
-        } catch (final Exception failure) {
-            throw new BatchContainerRuntimeException(failure);
-        }
-        emProvider.init(batchConfig);
-    }
-}