You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@batchee.apache.org by st...@apache.org on 2014/03/30 12:35:47 UTC
[4/6] unify PersistenceManagerSercie impl names
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/67e63c08/jbatch/src/main/java/org/apache/batchee/container/services/persistence/JPAPersistenceManagerService.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/services/persistence/JPAPersistenceManagerService.java b/jbatch/src/main/java/org/apache/batchee/container/services/persistence/JPAPersistenceManagerService.java
new file mode 100644
index 0000000..1fae89b
--- /dev/null
+++ b/jbatch/src/main/java/org/apache/batchee/container/services/persistence/JPAPersistenceManagerService.java
@@ -0,0 +1,849 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.batchee.container.services.persistence;
+
+import org.apache.batchee.container.exception.BatchContainerRuntimeException;
+import org.apache.batchee.container.exception.PersistenceException;
+import org.apache.batchee.container.impl.JobExecutionImpl;
+import org.apache.batchee.container.impl.JobInstanceImpl;
+import org.apache.batchee.container.impl.StepContextImpl;
+import org.apache.batchee.container.impl.StepExecutionImpl;
+import org.apache.batchee.container.impl.controller.PartitionedStepBuilder;
+import org.apache.batchee.container.impl.controller.chunk.CheckpointData;
+import org.apache.batchee.container.impl.controller.chunk.CheckpointDataKey;
+import org.apache.batchee.container.impl.controller.chunk.PersistentDataWrapper;
+import org.apache.batchee.container.impl.jobinstance.RuntimeFlowInSplitExecution;
+import org.apache.batchee.container.impl.jobinstance.RuntimeJobExecution;
+import org.apache.batchee.container.services.InternalJobExecution;
+import org.apache.batchee.container.services.persistence.jpa.EntityManagerProvider;
+import org.apache.batchee.container.services.persistence.jpa.TransactionProvider;
+import org.apache.batchee.container.services.persistence.jpa.domain.CheckpointEntity;
+import org.apache.batchee.container.services.persistence.jpa.domain.JobExecutionEntity;
+import org.apache.batchee.container.services.persistence.jpa.domain.JobInstanceEntity;
+import org.apache.batchee.container.services.persistence.jpa.domain.StepExecutionEntity;
+import org.apache.batchee.container.services.persistence.jpa.provider.DefaultEntityManagerProvider;
+import org.apache.batchee.container.services.persistence.jpa.provider.DefaultTransactionProvider;
+import org.apache.batchee.container.status.JobStatus;
+import org.apache.batchee.container.status.StepStatus;
+import org.apache.batchee.spi.PersistenceManagerService;
+
+import javax.batch.operations.NoSuchJobExecutionException;
+import javax.batch.runtime.BatchStatus;
+import javax.batch.runtime.JobInstance;
+import javax.batch.runtime.Metric;
+import javax.batch.runtime.StepExecution;
+import javax.persistence.EntityManager;
+import javax.persistence.NoResultException;
+import javax.persistence.TypedQuery;
+import java.io.IOException;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import static org.apache.batchee.container.util.Serializations.deserialize;
+import static org.apache.batchee.container.util.Serializations.serialize;
+
+public class JPAPersistenceManagerService implements PersistenceManagerService {
+ private static final String[] DELETE_QUERIES = {
+ StepExecutionEntity.Queries.DELETE_BY_INSTANCE_ID, CheckpointEntity.Queries.DELETE_BY_INSTANCE_ID,
+ JobExecutionEntity.Queries.DELETE_BY_INSTANCE_ID, JobInstanceEntity.Queries.DELETE_BY_INSTANCE_ID
+ };
+
+ private EntityManagerProvider emProvider;
+ private TransactionProvider txProvider;
+
+ @Override
+ public void cleanUp(final long instanceId) {
+ final EntityManager em = emProvider.newEntityManager();
+ try {
+ final Object tx = txProvider.start(em);
+ try {
+ for (final String query : DELETE_QUERIES) {
+ em.createQuery(query).setParameter("instanceId", instanceId).executeUpdate();
+ }
+ txProvider.commit(tx);
+ } catch (final Exception e) {
+ txProvider.rollback(tx, e);
+ throw new BatchContainerRuntimeException(e);
+ }
+ } finally {
+ emProvider.release(em);
+ }
+ }
+
+ @Override
+ public StepStatus getStepStatus(final long instanceId, final String stepName) {
+ final EntityManager em = emProvider.newEntityManager();
+ try {
+ final List<StepExecutionEntity> list = em.createNamedQuery(StepExecutionEntity.Queries.FIND_BY_INSTANCE_AND_NAME, StepExecutionEntity.class)
+ .setParameter("instanceId", instanceId)
+ .setParameter("step", stepName)
+ .getResultList();
+ if (list != null && !list.isEmpty()) {
+ final StepExecutionEntity entity = list.iterator().next();
+ final StepStatus status = new StepStatus(entity.getId(), entity.getStartCount());
+ status.setBatchStatus(entity.getBatchStatus());
+ status.setExitStatus(entity.getExitStatus());
+ status.setNumPartitions(entity.getNumPartitions());
+ status.setLastRunStepExecutionId(entity.getLastRunStepExecutionId());
+ if (entity.getPersistentData() != null) {
+ status.setPersistentUserData(new PersistentDataWrapper(entity.getPersistentData()));
+ }
+ return status;
+ }
+ return null;
+ } finally {
+ emProvider.release(em);
+ }
+ }
+
+ @Override
+ public Map<Long, String> jobOperatorGetExternalJobInstanceData() {
+ final Map<Long, String> data = new HashMap<Long, String>();
+ final EntityManager em = emProvider.newEntityManager();
+ try {
+ final List<JobInstanceEntity> list = em.createNamedQuery(JobInstanceEntity.Queries.FIND_EXTERNALS, JobInstanceEntity.class)
+ .setParameter("pattern", PartitionedStepBuilder.JOB_ID_SEPARATOR)
+ .getResultList();
+ if (list != null) {
+ for (final JobInstanceEntity elt : list) {
+ data.put(elt.getJobInstanceId(), elt.getName());
+ }
+ }
+ } finally {
+ emProvider.release(em);
+ }
+ return data;
+ }
+
+ @Override
+ public JobStatus getJobStatusFromExecution(final long executionId) {
+ final EntityManager em = emProvider.newEntityManager();
+ try {
+ final JobInstanceEntity entity = em.createNamedQuery(JobInstanceEntity.Queries.FIND_FROM_EXECUTION, JobInstanceEntity.class)
+ .setParameter("executionId", executionId)
+ .getSingleResult();
+ final JobStatus status = new JobStatus(entity.getJobInstanceId());
+ setJobStatusData(status, entity);
+ return status;
+ } finally {
+ emProvider.release(em);
+ }
+ }
+
+ @Override
+ public StepExecution getStepExecutionByStepExecutionId(final long stepExecId) {
+ final EntityManager em = emProvider.newEntityManager();
+ try {
+ final StepExecutionEntity entity = em.find(StepExecutionEntity.class, stepExecId);
+ final StepExecutionImpl stepEx = new StepExecutionImpl(entity.getExecution().getExecutionId(), stepExecId);
+ setStepExecutionData(entity, stepEx);
+ return stepEx;
+ } finally {
+ emProvider.release(em);
+ }
+ }
+
+ private void setStepExecutionData(final StepExecutionEntity entity, final StepExecutionImpl stepEx) {
+ stepEx.setBatchStatus(entity.getBatchStatus());
+ stepEx.setExitStatus(entity.getExitStatus());
+ stepEx.setStepName(entity.getStepName());
+ stepEx.setReadCount(entity.getRead());
+ stepEx.setWriteCount(entity.getWrite());
+ stepEx.setCommitCount(entity.getCommit());
+ stepEx.setRollbackCount(entity.getRollback());
+ stepEx.setReadSkipCount(entity.getReadSkip());
+ stepEx.setProcessSkipCount(entity.getProcessSkip());
+ stepEx.setFilterCount(entity.getFilter());
+ stepEx.setWriteSkipCount(entity.getWriteSkip());
+ stepEx.setStartTime(entity.getStartTime());
+ stepEx.setEndTime(entity.getEndTime());
+ try {
+ stepEx.setPersistentUserData(deserialize(entity.getPersistentData()));
+ } catch (final Exception e) {
+ throw new PersistenceException(e);
+ }
+ }
+
+ @Override
+ public List<StepExecution> getStepExecutionsForJobExecution(final long execid) {
+ final EntityManager em = emProvider.newEntityManager();
+ try {
+ final List<StepExecutionEntity> steps = em.createNamedQuery(StepExecutionEntity.Queries.FIND_BY_EXECUTION, StepExecutionEntity.class)
+ .setParameter("executionId", execid)
+ .getResultList();
+
+ if (steps == null) {
+ return Collections.emptyList();
+ }
+
+ final List<StepExecution> executions = new ArrayList<StepExecution>(steps.size());
+ for (final StepExecutionEntity entity : steps) {
+ final StepExecutionImpl stepEx = new StepExecutionImpl(execid, entity.getId());
+ setStepExecutionData(entity, stepEx);
+ executions.add(stepEx);
+ }
+
+ return executions;
+ } finally {
+ emProvider.release(em);
+ }
+ }
+
+ @Override
+ public void updateStepExecution(final long jobExecId, final StepContextImpl stepContext) {
+ final EntityManager em = emProvider.newEntityManager();
+ try {
+ final Object tx = txProvider.start(em);
+ try {
+ final StepExecutionEntity entity = em.find(StepExecutionEntity.class, stepContext.getStepInternalExecID());
+ setStepData(em, jobExecId, stepContext,
+ stepContext.metricsAsMap().get(Metric.MetricType.READ_COUNT.name()).getValue(),
+ stepContext.metricsAsMap().get(Metric.MetricType.WRITE_COUNT.name()).getValue(),
+ stepContext.metricsAsMap().get(Metric.MetricType.COMMIT_COUNT.name()).getValue(),
+ stepContext.metricsAsMap().get(Metric.MetricType.ROLLBACK_COUNT.name()).getValue(),
+ stepContext.metricsAsMap().get(Metric.MetricType.READ_SKIP_COUNT.name()).getValue(),
+ stepContext.metricsAsMap().get(Metric.MetricType.PROCESS_SKIP_COUNT.name()).getValue(),
+ stepContext.metricsAsMap().get(Metric.MetricType.FILTER_COUNT.name()).getValue(),
+ stepContext.metricsAsMap().get(Metric.MetricType.WRITE_SKIP_COUNT.name()).getValue(),
+ entity);
+ txProvider.commit(tx);
+ } catch (final Exception e) {
+ txProvider.rollback(tx, e);
+ throw new BatchContainerRuntimeException(e);
+ }
+ } finally {
+ emProvider.release(em);
+ }
+ }
+
+ @Override
+ public StepExecutionImpl createStepExecution(final long jobExecId, final StepContextImpl stepContext) {
+ final StepExecutionEntity entity = new StepExecutionEntity();
+ final EntityManager em = emProvider.newEntityManager();
+ try {
+ setStepData(em, jobExecId, stepContext,
+ stepContext.metricsAsMap().get(Metric.MetricType.READ_COUNT.name()).getValue(),
+ stepContext.metricsAsMap().get(Metric.MetricType.WRITE_COUNT.name()).getValue(),
+ stepContext.metricsAsMap().get(Metric.MetricType.COMMIT_COUNT.name()).getValue(),
+ stepContext.metricsAsMap().get(Metric.MetricType.ROLLBACK_COUNT.name()).getValue(),
+ stepContext.metricsAsMap().get(Metric.MetricType.READ_SKIP_COUNT.name()).getValue(),
+ stepContext.metricsAsMap().get(Metric.MetricType.PROCESS_SKIP_COUNT.name()).getValue(),
+ stepContext.metricsAsMap().get(Metric.MetricType.FILTER_COUNT.name()).getValue(),
+ stepContext.metricsAsMap().get(Metric.MetricType.WRITE_SKIP_COUNT.name()).getValue(),
+ entity);
+
+ final Object tx = txProvider.start(em);
+ try {
+ em.persist(entity);
+ txProvider.commit(tx);
+
+ final StepExecutionImpl stepExecution = new StepExecutionImpl(jobExecId, entity.getId());
+ stepExecution.setStepName(stepContext.getStepName());
+ return stepExecution;
+ } catch (final Exception e) {
+ txProvider.rollback(tx, e);
+ throw new BatchContainerRuntimeException(e);
+ }
+ } finally {
+ emProvider.release(em);
+ }
+ }
+
+//CHECKSTYLE:OFF
+ private void setStepData(final EntityManager em,
+ final long jobExecId, final StepContextImpl stepContext,
+ final long readCount, final long writeCount,
+ final long commitCount, final long rollbackCount,
+ final long readSkipCount, final long processSkipCount,
+ final long filterCount, final long writeSkipCount,
+ final StepExecutionEntity entity) {
+//CHECKSTYLE:ON
+ entity.setExecution(em.find(JobExecutionEntity.class, jobExecId));
+ entity.setBatchStatus(stepContext.getBatchStatus());
+ entity.setExitStatus(stepContext.getExitStatus());
+ entity.setStepName(stepContext.getStepName());
+ entity.setRead(readCount);
+ entity.setWrite(writeCount);
+ entity.setCommit(commitCount);
+ entity.setRollback(rollbackCount);
+ entity.setReadSkip(readSkipCount);
+ entity.setProcessSkip(processSkipCount);
+ entity.setFilter(filterCount);
+ entity.setWriteSkip(writeSkipCount);
+ entity.setStartTime(stepContext.getStartTimeTS());
+ entity.setEndTime(stepContext.getEndTimeTS());
+ try {
+ entity.setPersistentData(serialize(stepContext.getPersistentUserData()));
+ } catch (final IOException e) {
+ throw new PersistenceException(e);
+ }
+ }
+
+ @Override
+ public long getJobInstanceIdByExecutionId(final long executionId) throws NoSuchJobExecutionException {
+ final EntityManager em = emProvider.newEntityManager();
+ try {
+ return em.find(JobExecutionEntity.class, executionId).getInstance().getJobInstanceId();
+ } finally {
+ emProvider.release(em);
+ }
+ }
+
+ @Override
+ public Set<Long> jobOperatorGetRunningExecutions(final String jobName) {
+ final EntityManager em = emProvider.newEntityManager();
+ try {
+ final List<JobExecutionEntity> list = em.createNamedQuery(JobExecutionEntity.Queries.FIND_RUNNING, JobExecutionEntity.class)
+ .setParameter("name", jobName)
+ .setParameter("statuses", JobExecutionEntity.Queries.RUNNING_STATUSES)
+ .getResultList();
+
+ if (list == null) {
+ return Collections.emptySet();
+ }
+
+ final Set<Long> ids= new HashSet<Long>(list.size());
+ for (final JobExecutionEntity entity : list) {
+ ids.add(entity.getExecutionId());
+ }
+ return ids;
+ } finally {
+ emProvider.release(em);
+ }
+ }
+
+ @Override
+ public List<InternalJobExecution> jobOperatorGetJobExecutions(final long jobInstanceId) {
+ final EntityManager em = emProvider.newEntityManager();
+ try {
+ final List<JobExecutionEntity> list = em.createNamedQuery(JobExecutionEntity.Queries.FIND_BY_INSTANCE, JobExecutionEntity.class)
+ .setParameter("instanceId", jobInstanceId).getResultList();
+
+ if (list == null) {
+ return Collections.emptyList();
+ }
+
+ final List<InternalJobExecution> result = new ArrayList<InternalJobExecution>(list.size());
+ for (final JobExecutionEntity entity : list) {
+ final JobExecutionImpl jobEx = new JobExecutionImpl(entity.getExecutionId(), jobInstanceId, this);
+ jobEx.setCreateTime(entity.getCreateTime());
+ jobEx.setStartTime(entity.getStartTime());
+ jobEx.setEndTime(entity.getEndTime());
+ jobEx.setLastUpdateTime(entity.getUpdateTime());
+ jobEx.setBatchStatus(entity.getBatchStatus().name());
+ jobEx.setExitStatus(entity.getExitStatus());
+ jobEx.setJobName(entity.getInstance().getName());
+
+ result.add(jobEx);
+ }
+ return result;
+ } finally {
+ emProvider.release(em);
+ }
+ }
+
+ @Override
+ public void updateWithFinalExecutionStatusesAndTimestamps(final long key, final BatchStatus batchStatus, final String exitStatus, final Timestamp updatets) {
+ final EntityManager em = emProvider.newEntityManager();
+ try {
+ final Object tx = txProvider.start(em);
+ try {
+ final JobExecutionEntity instance = em.find(JobExecutionEntity.class, key);
+ instance.setBatchStatus(batchStatus);
+ instance.setUpdateTime(updatets);
+ instance.setEndTime(updatets);
+ instance.setExitStatus(exitStatus);
+
+ em.merge(instance);
+ txProvider.commit(tx);
+ } catch (final Exception e) {
+ txProvider.rollback(tx, e);
+ throw new BatchContainerRuntimeException(e);
+ }
+ } finally {
+ emProvider.release(em);
+ }
+ }
+
+ @Override
+ public InternalJobExecution jobOperatorGetJobExecution(final long jobExecutionId) {
+ final EntityManager em = emProvider.newEntityManager();
+ try {
+ final JobExecutionEntity instance = em.find(JobExecutionEntity.class, jobExecutionId);
+
+ final JobExecutionImpl jobEx = new JobExecutionImpl(jobExecutionId, instance.getInstance().getJobInstanceId(), this);
+ jobEx.setCreateTime(instance.getCreateTime());
+ jobEx.setStartTime(instance.getStartTime());
+ jobEx.setEndTime(instance.getEndTime());
+ jobEx.setJobParameters(instance.getJobProperties());
+ jobEx.setLastUpdateTime(instance.getUpdateTime());
+ if (instance.getBatchStatus() != null) {
+ jobEx.setBatchStatus(instance.getBatchStatus().name());
+ }
+ jobEx.setExitStatus(instance.getExitStatus());
+ jobEx.setJobName(instance.getInstance().getName());
+ return jobEx;
+ } finally {
+ emProvider.release(em);
+ }
+ }
+
+ @Override
+ public void markJobStarted(final long key, final Timestamp startTS) {
+ final EntityManager em = emProvider.newEntityManager();
+ try {
+ final Object tx = txProvider.start(em);
+ try {
+ final JobExecutionEntity instance = em.find(JobExecutionEntity.class, key);
+ instance.setBatchStatus(BatchStatus.STARTED);
+ instance.setStartTime(startTS);
+ instance.setUpdateTime(startTS);
+
+ em.merge(instance);
+ txProvider.commit(tx);
+ } catch (final Exception e) {
+ txProvider.rollback(tx, e);
+ throw new BatchContainerRuntimeException(e);
+ }
+ } finally {
+ emProvider.release(em);
+ }
+ }
+
+ @Override
+ public Properties getParameters(final long executionId) throws NoSuchJobExecutionException {
+ final EntityManager em = emProvider.newEntityManager();
+ try {
+ return em.find(JobExecutionEntity.class, executionId).getJobProperties();
+ } finally {
+ emProvider.release(em);
+ }
+ }
+
+ @Override
+ public String jobOperatorQueryJobExecutionExitStatus(final long key) {
+ final EntityManager em = emProvider.newEntityManager();
+ try {
+ return em.find(JobExecutionEntity.class, key).getExitStatus();
+ } finally {
+ emProvider.release(em);
+ }
+ }
+
+ @Override
+ public String jobOperatorQueryJobExecutionBatchStatus(final long key) {
+ final EntityManager em = emProvider.newEntityManager();
+ try {
+ return em.find(JobExecutionEntity.class, key).getBatchStatus().name();
+ } finally {
+ emProvider.release(em);
+ }
+ }
+
+ @Override
+ public Timestamp jobOperatorQueryJobExecutionTimestamp(final long key, final TimestampType timetype) {
+ final EntityManager em = emProvider.newEntityManager();
+ try {
+ final JobExecutionEntity entity = em.find(JobExecutionEntity.class, key);
+ if (timetype.equals(TimestampType.CREATE)) {
+ return entity.getCreateTime();
+ } else if (timetype.equals(TimestampType.END)) {
+ return entity.getEndTime();
+ } else if (timetype.equals(TimestampType.LAST_UPDATED)) {
+ return entity.getUpdateTime();
+ } else if (timetype.equals(TimestampType.STARTED)) {
+ return entity.getStartTime();
+ } else {
+ throw new IllegalArgumentException("Unexpected enum value.");
+ }
+ } finally {
+ emProvider.release(em);
+ }
+ }
+
+ @Override
+ public RuntimeFlowInSplitExecution createFlowInSplitExecution(final JobInstance jobInstance, final BatchStatus batchStatus) {
+ final EntityManager em = emProvider.newEntityManager();
+ try {
+ final JobExecutionEntity instance = new JobExecutionEntity();
+ instance.setCreateTime(new Timestamp(System.currentTimeMillis()));
+ instance.setUpdateTime(instance.getCreateTime());
+ instance.setBatchStatus(batchStatus);
+
+ final Object tx = txProvider.start(em);
+ try {
+ instance.setInstance(em.find(JobInstanceEntity.class, jobInstance.getInstanceId()));
+
+ em.persist(instance);
+ txProvider.commit(tx);
+
+ final RuntimeFlowInSplitExecution jobExecution = new RuntimeFlowInSplitExecution(jobInstance, instance.getExecutionId(), this);
+ jobExecution.setBatchStatus(batchStatus.name());
+ jobExecution.setCreateTime(instance.getCreateTime());
+ jobExecution.setLastUpdateTime(instance.getCreateTime());
+ return jobExecution;
+ } catch (final Exception e) {
+ txProvider.rollback(tx, e);
+ throw new BatchContainerRuntimeException(e);
+ }
+ } finally {
+ emProvider.release(em);
+ }
+ }
+
+ @Override
+ public long getMostRecentExecutionId(final long jobInstanceId) {
+ final EntityManager em = emProvider.newEntityManager();
+ try {
+ return em.createNamedQuery(JobExecutionEntity.Queries.MOST_RECENT, JobExecutionEntity.class)
+ .setParameter("instanceId", jobInstanceId)
+ .setMaxResults(1)
+ .getSingleResult().getExecutionId();
+ } finally {
+ emProvider.release(em);
+ }
+ }
+
+ @Override
+ public void updateBatchStatusOnly(final long executionId, final BatchStatus batchStatus, final Timestamp timestamp) {
+ final EntityManager em = emProvider.newEntityManager();
+ try {
+ final Object tx = txProvider.start(em);
+ final JobExecutionEntity instance = em.find(JobExecutionEntity.class, executionId);
+ instance.setBatchStatus(batchStatus);
+ instance.setUpdateTime(timestamp);
+
+ try {
+ em.merge(instance);
+ txProvider.commit(tx);
+ } catch (final Exception e) {
+ txProvider.rollback(tx, e);
+ throw new BatchContainerRuntimeException(e);
+ }
+ } finally {
+ emProvider.release(em);
+ }
+ }
+
+ @Override
+ public RuntimeJobExecution createJobExecution(final JobInstance jobInstance, final Properties jobParameters, final BatchStatus batchStatus) {
+ final EntityManager em = emProvider.newEntityManager();
+ try {
+ final JobExecutionEntity execution = new JobExecutionEntity();
+ execution.setJobProperties(jobParameters);
+ execution.setCreateTime(new Timestamp(System.currentTimeMillis()));
+ execution.setUpdateTime(execution.getCreateTime());
+ execution.setBatchStatus(batchStatus);
+
+ final Object tx = txProvider.start(em);
+ try {
+ execution.setInstance(em.find(JobInstanceEntity.class, jobInstance.getInstanceId()));
+
+ em.persist(execution);
+ txProvider.commit(tx);
+
+ final RuntimeJobExecution jobExecution = new RuntimeJobExecution(jobInstance, execution.getExecutionId(), this);
+ jobExecution.setBatchStatus(batchStatus.name());
+ jobExecution.setCreateTime(execution.getCreateTime());
+ jobExecution.setLastUpdateTime(execution.getCreateTime());
+ return jobExecution;
+ } catch (final Exception e) {
+ txProvider.rollback(tx, e);
+ throw new BatchContainerRuntimeException(e);
+ }
+ } finally {
+ emProvider.release(em);
+ }
+ }
+
+ @Override
+ public JobInstance createSubJobInstance(final String name, final String apptag) {
+ return createJobInstance(name, apptag, null);
+ }
+
+ @Override
+ public List<Long> jobOperatorGetJobInstanceIds(final String jobName, final int start, final int count) {
+ return jobOperatorGetJobInstanceIds(jobName, null, start, count);
+ }
+
+ @Override
+ public List<Long> jobOperatorGetJobInstanceIds(final String jobName, final String appTag, final int start, final int count) {
+ final EntityManager em = emProvider.newEntityManager();
+ try {
+ final TypedQuery<JobInstanceEntity> query;
+ if (appTag != null) {
+ query = em.createNamedQuery(JobInstanceEntity.Queries.FIND_BY_NAME, JobInstanceEntity.class).setParameter("tag", appTag);
+ } else {
+ query = em.createNamedQuery(JobInstanceEntity.Queries.FIND_BY_NAME, JobInstanceEntity.class);
+ }
+ query.setParameter("name", jobName);
+
+ final List<JobInstanceEntity> resultList = query
+ .setFirstResult(start)
+ .setMaxResults(count)
+ .getResultList();
+
+ if (resultList == null) {
+ return Collections.emptyList();
+ }
+
+ final List<Long> result = new ArrayList<Long>(resultList.size());
+ for (final JobInstanceEntity entity : resultList) {
+ result.add(entity.getJobInstanceId());
+ }
+ return result;
+ } finally {
+ emProvider.release(em);
+ }
+ }
+
+ @Override
+ public JobInstance createJobInstance(final String name, final String apptag, final String jobXml) {
+ final EntityManager em = emProvider.newEntityManager();
+ try {
+ final JobInstanceEntity instance = new JobInstanceEntity();
+ instance.setTag(apptag);
+ instance.setName(name);
+ instance.setJobXml(jobXml);
+
+ final Object tx = txProvider.start(em);
+ try {
+ em.persist(instance);
+ txProvider.commit(tx);
+
+ final JobInstanceImpl jobInstance = new JobInstanceImpl(instance.getJobInstanceId(), jobXml);
+ jobInstance.setJobName(name);
+ return jobInstance;
+ } catch (final Exception e) {
+ txProvider.rollback(tx, e);
+ throw new BatchContainerRuntimeException(e);
+ }
+ } finally {
+ emProvider.release(em);
+ }
+ }
+
+ @Override
+ public int jobOperatorGetJobInstanceCount(final String jobName) {
+ final EntityManager em = emProvider.newEntityManager();
+ try {
+ return em.createNamedQuery(JobInstanceEntity.Queries.COUNT_BY_NAME, Number.class)
+ .setParameter("name", jobName)
+ .getSingleResult().intValue();
+ } finally {
+ emProvider.release(em);
+ }
+ }
+
+ @Override
+ public int jobOperatorGetJobInstanceCount(final String jobName, final String appTag) {
+ final EntityManager em = emProvider.newEntityManager();
+ try {
+ return em.createNamedQuery(JobInstanceEntity.Queries.COUNT_BY_NAME, Number.class)
+ .setParameter("name", jobName)
+ .setParameter("tag", appTag)
+ .getSingleResult().intValue();
+ } finally {
+ emProvider.release(em);
+ }
+ }
+
+ @Override
+ public void updateStepStatus(final long stepExecutionId, final StepStatus stepStatus) {
+ final EntityManager em = emProvider.newEntityManager();
+ try {
+ final Object tx = txProvider.start(em);
+ try {
+ final StepExecutionEntity entity = em.find(StepExecutionEntity.class, stepExecutionId);
+ entity.setBatchStatus(stepStatus.getBatchStatus());
+ entity.setExitStatus(stepStatus.getExitStatus());
+ entity.setLastRunStepExecutionId(stepStatus.getLastRunStepExecutionId());
+ entity.setNumPartitions(stepStatus.getNumPartitions());
+ entity.setPersistentData(stepStatus.getRawPersistentUserData());
+ entity.setStartCount(stepStatus.getStartCount());
+
+ em.merge(entity);
+ txProvider.commit(tx);
+ } catch (final Exception e) {
+ txProvider.rollback(tx, e);
+ throw new BatchContainerRuntimeException(e);
+ }
+ } finally {
+ emProvider.release(em);
+ }
+ }
+
+ @Override
+ public StepStatus createStepStatus(final long stepExecId) {
+ return new StepStatus(stepExecId); // instance already created
+ }
+
+ @Override
+ public void updateJobStatus(final long instanceId, final JobStatus jobStatus) {
+ final EntityManager em = emProvider.newEntityManager();
+ try {
+ final Object tx = txProvider.start(em);
+ try {
+ final JobInstanceEntity entity = em.find(JobInstanceEntity.class, instanceId);
+ entity.setBatchStatus(jobStatus.getBatchStatus());
+ entity.setStep(jobStatus.getCurrentStepId());
+ entity.setLatestExecution(jobStatus.getLatestExecutionId());
+ entity.setExitStatus(jobStatus.getExitStatus());
+ entity.setRestartOn(jobStatus.getRestartOn());
+ if (jobStatus.getJobInstance() != null) {
+ entity.setName(jobStatus.getJobInstance().getJobName());
+ }
+ em.merge(entity);
+
+ final List<JobExecutionEntity> executions = em.createNamedQuery(JobExecutionEntity.Queries.FIND_BY_INSTANCE, JobExecutionEntity.class)
+ .setParameter("instanceId", instanceId)
+ .getResultList();
+ if (executions != null) {
+ for (final JobExecutionEntity e : executions) {
+ e.setInstance(entity);
+ em.merge(e);
+ }
+ }
+ txProvider.commit(tx);
+ } catch (final Exception e) {
+ txProvider.rollback(tx, e);
+ throw new BatchContainerRuntimeException(e);
+ }
+ } finally {
+ emProvider.release(em);
+ }
+ }
+
+ @Override
+ public JobStatus getJobStatus(final long instanceId) {
+ final JobStatus status = new JobStatus(instanceId);
+ final EntityManager em = emProvider.newEntityManager();
+ try {
+ final Object tx = txProvider.start(em);
+ try {
+ setJobStatusData(status, em.find(JobInstanceEntity.class, instanceId));
+ txProvider.commit(tx);
+ } catch (final Exception e) {
+ txProvider.rollback(tx, e);
+ throw new BatchContainerRuntimeException(e);
+ }
+ } finally {
+ emProvider.release(em);
+ }
+ return status;
+ }
+
+ private void setJobStatusData(final JobStatus status, final JobInstanceEntity entity) {
+ status.setBatchStatus(entity.getBatchStatus());
+ status.setCurrentStepId(entity.getStep());
+ status.setLatestExecutionId(entity.getLatestExecution());
+ status.setExitStatus(entity.getExitStatus());
+ status.setRestartOn(entity.getRestartOn());
+ status.setJobInstance(entity.toJobInstance());
+ }
+
+ @Override
+ public JobStatus createJobStatus(final long jobInstanceId) {
+ return new JobStatus(jobInstanceId); // instance already created
+ }
+
+ @Override
+ public void setCheckpointData(final CheckpointDataKey key, final CheckpointData value) {
+ final EntityManager em = emProvider.newEntityManager();
+ try {
+ final Object tx = txProvider.start(em);
+ try {
+ final List<CheckpointEntity> checkpoints = em.createNamedQuery(CheckpointEntity.Queries.FIND, CheckpointEntity.class)
+ .setParameter("jobInstanceId", key.getJobInstanceId())
+ .setParameter("stepName", key.getStepName())
+ .setParameter("type", key.getType())
+ .getResultList();
+
+ final CheckpointEntity checkpoint;
+ final boolean isNew = checkpoints == null || checkpoints.isEmpty();
+ if (isNew) {
+ checkpoint = new CheckpointEntity();
+ checkpoint.setInstance(em.find(JobInstanceEntity.class, key.getJobInstanceId()));
+ checkpoint.setStepName(key.getStepName());
+ checkpoint.setType(key.getType());
+ } else {
+ checkpoint = checkpoints.iterator().next();
+ }
+
+ checkpoint.setData(value.getRestartToken());
+
+ if (isNew) {
+ em.persist(checkpoint);
+ } else {
+ em.merge(checkpoint);
+ }
+ txProvider.commit(tx);
+ } catch (final Exception e) {
+ txProvider.rollback(tx, e);
+ throw new BatchContainerRuntimeException(e);
+ }
+ } finally {
+ emProvider.release(em);
+ }
+ }
+
+ @Override
+ public CheckpointData getCheckpointData(final CheckpointDataKey key) {
+ final EntityManager em = emProvider.newEntityManager();
+ try {
+ final CheckpointEntity checkpoint = em.createNamedQuery(CheckpointEntity.Queries.FIND, CheckpointEntity.class)
+ .setParameter("jobInstanceId", key.getJobInstanceId())
+ .setParameter("stepName", key.getStepName())
+ .setParameter("type", key.getType())
+ .getSingleResult();
+
+ final CheckpointData data = new CheckpointData(checkpoint.getInstance().getJobInstanceId(), checkpoint.getStepName(), checkpoint.getType());
+ data.setRestartToken(checkpoint.getData());
+ return data;
+ } catch (final NoResultException nre) {
+ return null;
+ } finally {
+ emProvider.release(em);
+ }
+ }
+
+ @Override
+ public void init(final Properties batchConfig) {
+ final String txProviderClass = batchConfig.getProperty("persistence.jpa.transaction-provider", DefaultTransactionProvider.class.getName());
+ try {
+ txProvider = TransactionProvider.class.cast(Thread.currentThread().getContextClassLoader().loadClass(txProviderClass).newInstance());
+ } catch (final Exception e) {
+ throw new BatchContainerRuntimeException(e);
+ }
+ txProvider.init(batchConfig);
+
+ final String providerClass = batchConfig.getProperty("persistence.jpa.entity-manager-provider", DefaultEntityManagerProvider.class.getName());
+ try {
+ emProvider = EntityManagerProvider.class.cast(Thread.currentThread().getContextClassLoader().loadClass(providerClass).newInstance());
+ } catch (final Exception e) {
+ throw new BatchContainerRuntimeException(e);
+ }
+ emProvider.init(batchConfig);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/67e63c08/jbatch/src/main/java/org/apache/batchee/container/services/persistence/JPAPersistenceService.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/services/persistence/JPAPersistenceService.java b/jbatch/src/main/java/org/apache/batchee/container/services/persistence/JPAPersistenceService.java
deleted file mode 100644
index 9071d9b..0000000
--- a/jbatch/src/main/java/org/apache/batchee/container/services/persistence/JPAPersistenceService.java
+++ /dev/null
@@ -1,849 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.batchee.container.services.persistence;
-
-import org.apache.batchee.container.exception.BatchContainerRuntimeException;
-import org.apache.batchee.container.exception.PersistenceException;
-import org.apache.batchee.container.impl.JobExecutionImpl;
-import org.apache.batchee.container.impl.JobInstanceImpl;
-import org.apache.batchee.container.impl.StepContextImpl;
-import org.apache.batchee.container.impl.StepExecutionImpl;
-import org.apache.batchee.container.impl.controller.PartitionedStepBuilder;
-import org.apache.batchee.container.impl.controller.chunk.CheckpointData;
-import org.apache.batchee.container.impl.controller.chunk.CheckpointDataKey;
-import org.apache.batchee.container.impl.controller.chunk.PersistentDataWrapper;
-import org.apache.batchee.container.impl.jobinstance.RuntimeFlowInSplitExecution;
-import org.apache.batchee.container.impl.jobinstance.RuntimeJobExecution;
-import org.apache.batchee.container.services.InternalJobExecution;
-import org.apache.batchee.container.services.persistence.jpa.EntityManagerProvider;
-import org.apache.batchee.container.services.persistence.jpa.TransactionProvider;
-import org.apache.batchee.container.services.persistence.jpa.domain.CheckpointEntity;
-import org.apache.batchee.container.services.persistence.jpa.domain.JobExecutionEntity;
-import org.apache.batchee.container.services.persistence.jpa.domain.JobInstanceEntity;
-import org.apache.batchee.container.services.persistence.jpa.domain.StepExecutionEntity;
-import org.apache.batchee.container.services.persistence.jpa.provider.DefaultEntityManagerProvider;
-import org.apache.batchee.container.services.persistence.jpa.provider.DefaultTransactionProvider;
-import org.apache.batchee.container.status.JobStatus;
-import org.apache.batchee.container.status.StepStatus;
-import org.apache.batchee.spi.PersistenceManagerService;
-
-import javax.batch.operations.NoSuchJobExecutionException;
-import javax.batch.runtime.BatchStatus;
-import javax.batch.runtime.JobInstance;
-import javax.batch.runtime.Metric;
-import javax.batch.runtime.StepExecution;
-import javax.persistence.EntityManager;
-import javax.persistence.NoResultException;
-import javax.persistence.TypedQuery;
-import java.io.IOException;
-import java.sql.Timestamp;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-
-import static org.apache.batchee.container.util.Serializations.deserialize;
-import static org.apache.batchee.container.util.Serializations.serialize;
-
-public class JPAPersistenceService implements PersistenceManagerService {
- private static final String[] DELETE_QUERIES = {
- StepExecutionEntity.Queries.DELETE_BY_INSTANCE_ID, CheckpointEntity.Queries.DELETE_BY_INSTANCE_ID,
- JobExecutionEntity.Queries.DELETE_BY_INSTANCE_ID, JobInstanceEntity.Queries.DELETE_BY_INSTANCE_ID
- };
-
- private EntityManagerProvider emProvider;
- private TransactionProvider txProvider;
-
- @Override
- public void cleanUp(final long instanceId) {
- final EntityManager em = emProvider.newEntityManager();
- try {
- final Object tx = txProvider.start(em);
- try {
- for (final String query : DELETE_QUERIES) {
- em.createQuery(query).setParameter("instanceId", instanceId).executeUpdate();
- }
- txProvider.commit(tx);
- } catch (final Exception e) {
- txProvider.rollback(tx, e);
- throw new BatchContainerRuntimeException(e);
- }
- } finally {
- emProvider.release(em);
- }
- }
-
- @Override
- public StepStatus getStepStatus(final long instanceId, final String stepName) {
- final EntityManager em = emProvider.newEntityManager();
- try {
- final List<StepExecutionEntity> list = em.createNamedQuery(StepExecutionEntity.Queries.FIND_BY_INSTANCE_AND_NAME, StepExecutionEntity.class)
- .setParameter("instanceId", instanceId)
- .setParameter("step", stepName)
- .getResultList();
- if (list != null && !list.isEmpty()) {
- final StepExecutionEntity entity = list.iterator().next();
- final StepStatus status = new StepStatus(entity.getId(), entity.getStartCount());
- status.setBatchStatus(entity.getBatchStatus());
- status.setExitStatus(entity.getExitStatus());
- status.setNumPartitions(entity.getNumPartitions());
- status.setLastRunStepExecutionId(entity.getLastRunStepExecutionId());
- if (entity.getPersistentData() != null) {
- status.setPersistentUserData(new PersistentDataWrapper(entity.getPersistentData()));
- }
- return status;
- }
- return null;
- } finally {
- emProvider.release(em);
- }
- }
-
- @Override
- public Map<Long, String> jobOperatorGetExternalJobInstanceData() {
- final Map<Long, String> data = new HashMap<Long, String>();
- final EntityManager em = emProvider.newEntityManager();
- try {
- final List<JobInstanceEntity> list = em.createNamedQuery(JobInstanceEntity.Queries.FIND_EXTERNALS, JobInstanceEntity.class)
- .setParameter("pattern", PartitionedStepBuilder.JOB_ID_SEPARATOR)
- .getResultList();
- if (list != null) {
- for (final JobInstanceEntity elt : list) {
- data.put(elt.getJobInstanceId(), elt.getName());
- }
- }
- } finally {
- emProvider.release(em);
- }
- return data;
- }
-
- @Override
- public JobStatus getJobStatusFromExecution(final long executionId) {
- final EntityManager em = emProvider.newEntityManager();
- try {
- final JobInstanceEntity entity = em.createNamedQuery(JobInstanceEntity.Queries.FIND_FROM_EXECUTION, JobInstanceEntity.class)
- .setParameter("executionId", executionId)
- .getSingleResult();
- final JobStatus status = new JobStatus(entity.getJobInstanceId());
- setJobStatusData(status, entity);
- return status;
- } finally {
- emProvider.release(em);
- }
- }
-
- @Override
- public StepExecution getStepExecutionByStepExecutionId(final long stepExecId) {
- final EntityManager em = emProvider.newEntityManager();
- try {
- final StepExecutionEntity entity = em.find(StepExecutionEntity.class, stepExecId);
- final StepExecutionImpl stepEx = new StepExecutionImpl(entity.getExecution().getExecutionId(), stepExecId);
- setStepExecutionData(entity, stepEx);
- return stepEx;
- } finally {
- emProvider.release(em);
- }
- }
-
- private void setStepExecutionData(final StepExecutionEntity entity, final StepExecutionImpl stepEx) {
- stepEx.setBatchStatus(entity.getBatchStatus());
- stepEx.setExitStatus(entity.getExitStatus());
- stepEx.setStepName(entity.getStepName());
- stepEx.setReadCount(entity.getRead());
- stepEx.setWriteCount(entity.getWrite());
- stepEx.setCommitCount(entity.getCommit());
- stepEx.setRollbackCount(entity.getRollback());
- stepEx.setReadSkipCount(entity.getReadSkip());
- stepEx.setProcessSkipCount(entity.getProcessSkip());
- stepEx.setFilterCount(entity.getFilter());
- stepEx.setWriteSkipCount(entity.getWriteSkip());
- stepEx.setStartTime(entity.getStartTime());
- stepEx.setEndTime(entity.getEndTime());
- try {
- stepEx.setPersistentUserData(deserialize(entity.getPersistentData()));
- } catch (final Exception e) {
- throw new PersistenceException(e);
- }
- }
-
- @Override
- public List<StepExecution> getStepExecutionsForJobExecution(final long execid) {
- final EntityManager em = emProvider.newEntityManager();
- try {
- final List<StepExecutionEntity> steps = em.createNamedQuery(StepExecutionEntity.Queries.FIND_BY_EXECUTION, StepExecutionEntity.class)
- .setParameter("executionId", execid)
- .getResultList();
-
- if (steps == null) {
- return Collections.emptyList();
- }
-
- final List<StepExecution> executions = new ArrayList<StepExecution>(steps.size());
- for (final StepExecutionEntity entity : steps) {
- final StepExecutionImpl stepEx = new StepExecutionImpl(execid, entity.getId());
- setStepExecutionData(entity, stepEx);
- executions.add(stepEx);
- }
-
- return executions;
- } finally {
- emProvider.release(em);
- }
- }
-
- @Override
- public void updateStepExecution(final long jobExecId, final StepContextImpl stepContext) {
- final EntityManager em = emProvider.newEntityManager();
- try {
- final Object tx = txProvider.start(em);
- try {
- final StepExecutionEntity entity = em.find(StepExecutionEntity.class, stepContext.getStepInternalExecID());
- setStepData(em, jobExecId, stepContext,
- stepContext.metricsAsMap().get(Metric.MetricType.READ_COUNT.name()).getValue(),
- stepContext.metricsAsMap().get(Metric.MetricType.WRITE_COUNT.name()).getValue(),
- stepContext.metricsAsMap().get(Metric.MetricType.COMMIT_COUNT.name()).getValue(),
- stepContext.metricsAsMap().get(Metric.MetricType.ROLLBACK_COUNT.name()).getValue(),
- stepContext.metricsAsMap().get(Metric.MetricType.READ_SKIP_COUNT.name()).getValue(),
- stepContext.metricsAsMap().get(Metric.MetricType.PROCESS_SKIP_COUNT.name()).getValue(),
- stepContext.metricsAsMap().get(Metric.MetricType.FILTER_COUNT.name()).getValue(),
- stepContext.metricsAsMap().get(Metric.MetricType.WRITE_SKIP_COUNT.name()).getValue(),
- entity);
- txProvider.commit(tx);
- } catch (final Exception e) {
- txProvider.rollback(tx, e);
- throw new BatchContainerRuntimeException(e);
- }
- } finally {
- emProvider.release(em);
- }
- }
-
- @Override
- public StepExecutionImpl createStepExecution(final long jobExecId, final StepContextImpl stepContext) {
- final StepExecutionEntity entity = new StepExecutionEntity();
- final EntityManager em = emProvider.newEntityManager();
- try {
- setStepData(em, jobExecId, stepContext,
- stepContext.metricsAsMap().get(Metric.MetricType.READ_COUNT.name()).getValue(),
- stepContext.metricsAsMap().get(Metric.MetricType.WRITE_COUNT.name()).getValue(),
- stepContext.metricsAsMap().get(Metric.MetricType.COMMIT_COUNT.name()).getValue(),
- stepContext.metricsAsMap().get(Metric.MetricType.ROLLBACK_COUNT.name()).getValue(),
- stepContext.metricsAsMap().get(Metric.MetricType.READ_SKIP_COUNT.name()).getValue(),
- stepContext.metricsAsMap().get(Metric.MetricType.PROCESS_SKIP_COUNT.name()).getValue(),
- stepContext.metricsAsMap().get(Metric.MetricType.FILTER_COUNT.name()).getValue(),
- stepContext.metricsAsMap().get(Metric.MetricType.WRITE_SKIP_COUNT.name()).getValue(),
- entity);
-
- final Object tx = txProvider.start(em);
- try {
- em.persist(entity);
- txProvider.commit(tx);
-
- final StepExecutionImpl stepExecution = new StepExecutionImpl(jobExecId, entity.getId());
- stepExecution.setStepName(stepContext.getStepName());
- return stepExecution;
- } catch (final Exception e) {
- txProvider.rollback(tx, e);
- throw new BatchContainerRuntimeException(e);
- }
- } finally {
- emProvider.release(em);
- }
- }
-
-//CHECKSTYLE:OFF
- private void setStepData(final EntityManager em,
- final long jobExecId, final StepContextImpl stepContext,
- final long readCount, final long writeCount,
- final long commitCount, final long rollbackCount,
- final long readSkipCount, final long processSkipCount,
- final long filterCount, final long writeSkipCount,
- final StepExecutionEntity entity) {
-//CHECKSTYLE:ON
- entity.setExecution(em.find(JobExecutionEntity.class, jobExecId));
- entity.setBatchStatus(stepContext.getBatchStatus());
- entity.setExitStatus(stepContext.getExitStatus());
- entity.setStepName(stepContext.getStepName());
- entity.setRead(readCount);
- entity.setWrite(writeCount);
- entity.setCommit(commitCount);
- entity.setRollback(rollbackCount);
- entity.setReadSkip(readSkipCount);
- entity.setProcessSkip(processSkipCount);
- entity.setFilter(filterCount);
- entity.setWriteSkip(writeSkipCount);
- entity.setStartTime(stepContext.getStartTimeTS());
- entity.setEndTime(stepContext.getEndTimeTS());
- try {
- entity.setPersistentData(serialize(stepContext.getPersistentUserData()));
- } catch (final IOException e) {
- throw new PersistenceException(e);
- }
- }
-
- @Override
- public long getJobInstanceIdByExecutionId(final long executionId) throws NoSuchJobExecutionException {
- final EntityManager em = emProvider.newEntityManager();
- try {
- return em.find(JobExecutionEntity.class, executionId).getInstance().getJobInstanceId();
- } finally {
- emProvider.release(em);
- }
- }
-
- @Override
- public Set<Long> jobOperatorGetRunningExecutions(final String jobName) {
- final EntityManager em = emProvider.newEntityManager();
- try {
- final List<JobExecutionEntity> list = em.createNamedQuery(JobExecutionEntity.Queries.FIND_RUNNING, JobExecutionEntity.class)
- .setParameter("name", jobName)
- .setParameter("statuses", JobExecutionEntity.Queries.RUNNING_STATUSES)
- .getResultList();
-
- if (list == null) {
- return Collections.emptySet();
- }
-
- final Set<Long> ids= new HashSet<Long>(list.size());
- for (final JobExecutionEntity entity : list) {
- ids.add(entity.getExecutionId());
- }
- return ids;
- } finally {
- emProvider.release(em);
- }
- }
-
- @Override
- public List<InternalJobExecution> jobOperatorGetJobExecutions(final long jobInstanceId) {
- final EntityManager em = emProvider.newEntityManager();
- try {
- final List<JobExecutionEntity> list = em.createNamedQuery(JobExecutionEntity.Queries.FIND_BY_INSTANCE, JobExecutionEntity.class)
- .setParameter("instanceId", jobInstanceId).getResultList();
-
- if (list == null) {
- return Collections.emptyList();
- }
-
- final List<InternalJobExecution> result = new ArrayList<InternalJobExecution>(list.size());
- for (final JobExecutionEntity entity : list) {
- final JobExecutionImpl jobEx = new JobExecutionImpl(entity.getExecutionId(), jobInstanceId, this);
- jobEx.setCreateTime(entity.getCreateTime());
- jobEx.setStartTime(entity.getStartTime());
- jobEx.setEndTime(entity.getEndTime());
- jobEx.setLastUpdateTime(entity.getUpdateTime());
- jobEx.setBatchStatus(entity.getBatchStatus().name());
- jobEx.setExitStatus(entity.getExitStatus());
- jobEx.setJobName(entity.getInstance().getName());
-
- result.add(jobEx);
- }
- return result;
- } finally {
- emProvider.release(em);
- }
- }
-
- @Override
- public void updateWithFinalExecutionStatusesAndTimestamps(final long key, final BatchStatus batchStatus, final String exitStatus, final Timestamp updatets) {
- final EntityManager em = emProvider.newEntityManager();
- try {
- final Object tx = txProvider.start(em);
- try {
- final JobExecutionEntity instance = em.find(JobExecutionEntity.class, key);
- instance.setBatchStatus(batchStatus);
- instance.setUpdateTime(updatets);
- instance.setEndTime(updatets);
- instance.setExitStatus(exitStatus);
-
- em.merge(instance);
- txProvider.commit(tx);
- } catch (final Exception e) {
- txProvider.rollback(tx, e);
- throw new BatchContainerRuntimeException(e);
- }
- } finally {
- emProvider.release(em);
- }
- }
-
- @Override
- public InternalJobExecution jobOperatorGetJobExecution(final long jobExecutionId) {
- final EntityManager em = emProvider.newEntityManager();
- try {
- final JobExecutionEntity instance = em.find(JobExecutionEntity.class, jobExecutionId);
-
- final JobExecutionImpl jobEx = new JobExecutionImpl(jobExecutionId, instance.getInstance().getJobInstanceId(), this);
- jobEx.setCreateTime(instance.getCreateTime());
- jobEx.setStartTime(instance.getStartTime());
- jobEx.setEndTime(instance.getEndTime());
- jobEx.setJobParameters(instance.getJobProperties());
- jobEx.setLastUpdateTime(instance.getUpdateTime());
- if (instance.getBatchStatus() != null) {
- jobEx.setBatchStatus(instance.getBatchStatus().name());
- }
- jobEx.setExitStatus(instance.getExitStatus());
- jobEx.setJobName(instance.getInstance().getName());
- return jobEx;
- } finally {
- emProvider.release(em);
- }
- }
-
- @Override
- public void markJobStarted(final long key, final Timestamp startTS) {
- final EntityManager em = emProvider.newEntityManager();
- try {
- final Object tx = txProvider.start(em);
- try {
- final JobExecutionEntity instance = em.find(JobExecutionEntity.class, key);
- instance.setBatchStatus(BatchStatus.STARTED);
- instance.setStartTime(startTS);
- instance.setUpdateTime(startTS);
-
- em.merge(instance);
- txProvider.commit(tx);
- } catch (final Exception e) {
- txProvider.rollback(tx, e);
- throw new BatchContainerRuntimeException(e);
- }
- } finally {
- emProvider.release(em);
- }
- }
-
- @Override
- public Properties getParameters(final long executionId) throws NoSuchJobExecutionException {
- final EntityManager em = emProvider.newEntityManager();
- try {
- return em.find(JobExecutionEntity.class, executionId).getJobProperties();
- } finally {
- emProvider.release(em);
- }
- }
-
- @Override
- public String jobOperatorQueryJobExecutionExitStatus(final long key) {
- final EntityManager em = emProvider.newEntityManager();
- try {
- return em.find(JobExecutionEntity.class, key).getExitStatus();
- } finally {
- emProvider.release(em);
- }
- }
-
- @Override
- public String jobOperatorQueryJobExecutionBatchStatus(final long key) {
- final EntityManager em = emProvider.newEntityManager();
- try {
- return em.find(JobExecutionEntity.class, key).getBatchStatus().name();
- } finally {
- emProvider.release(em);
- }
- }
-
- @Override
- public Timestamp jobOperatorQueryJobExecutionTimestamp(final long key, final TimestampType timetype) {
- final EntityManager em = emProvider.newEntityManager();
- try {
- final JobExecutionEntity entity = em.find(JobExecutionEntity.class, key);
- if (timetype.equals(TimestampType.CREATE)) {
- return entity.getCreateTime();
- } else if (timetype.equals(TimestampType.END)) {
- return entity.getEndTime();
- } else if (timetype.equals(TimestampType.LAST_UPDATED)) {
- return entity.getUpdateTime();
- } else if (timetype.equals(TimestampType.STARTED)) {
- return entity.getStartTime();
- } else {
- throw new IllegalArgumentException("Unexpected enum value.");
- }
- } finally {
- emProvider.release(em);
- }
- }
-
- @Override
- public RuntimeFlowInSplitExecution createFlowInSplitExecution(final JobInstance jobInstance, final BatchStatus batchStatus) {
- final EntityManager em = emProvider.newEntityManager();
- try {
- final JobExecutionEntity instance = new JobExecutionEntity();
- instance.setCreateTime(new Timestamp(System.currentTimeMillis()));
- instance.setUpdateTime(instance.getCreateTime());
- instance.setBatchStatus(batchStatus);
-
- final Object tx = txProvider.start(em);
- try {
- instance.setInstance(em.find(JobInstanceEntity.class, jobInstance.getInstanceId()));
-
- em.persist(instance);
- txProvider.commit(tx);
-
- final RuntimeFlowInSplitExecution jobExecution = new RuntimeFlowInSplitExecution(jobInstance, instance.getExecutionId(), this);
- jobExecution.setBatchStatus(batchStatus.name());
- jobExecution.setCreateTime(instance.getCreateTime());
- jobExecution.setLastUpdateTime(instance.getCreateTime());
- return jobExecution;
- } catch (final Exception e) {
- txProvider.rollback(tx, e);
- throw new BatchContainerRuntimeException(e);
- }
- } finally {
- emProvider.release(em);
- }
- }
-
- @Override
- public long getMostRecentExecutionId(final long jobInstanceId) {
- final EntityManager em = emProvider.newEntityManager();
- try {
- return em.createNamedQuery(JobExecutionEntity.Queries.MOST_RECENT, JobExecutionEntity.class)
- .setParameter("instanceId", jobInstanceId)
- .setMaxResults(1)
- .getSingleResult().getExecutionId();
- } finally {
- emProvider.release(em);
- }
- }
-
- @Override
- public void updateBatchStatusOnly(final long executionId, final BatchStatus batchStatus, final Timestamp timestamp) {
- final EntityManager em = emProvider.newEntityManager();
- try {
- final Object tx = txProvider.start(em);
- final JobExecutionEntity instance = em.find(JobExecutionEntity.class, executionId);
- instance.setBatchStatus(batchStatus);
- instance.setUpdateTime(timestamp);
-
- try {
- em.merge(instance);
- txProvider.commit(tx);
- } catch (final Exception e) {
- txProvider.rollback(tx, e);
- throw new BatchContainerRuntimeException(e);
- }
- } finally {
- emProvider.release(em);
- }
- }
-
- @Override
- public RuntimeJobExecution createJobExecution(final JobInstance jobInstance, final Properties jobParameters, final BatchStatus batchStatus) {
- final EntityManager em = emProvider.newEntityManager();
- try {
- final JobExecutionEntity execution = new JobExecutionEntity();
- execution.setJobProperties(jobParameters);
- execution.setCreateTime(new Timestamp(System.currentTimeMillis()));
- execution.setUpdateTime(execution.getCreateTime());
- execution.setBatchStatus(batchStatus);
-
- final Object tx = txProvider.start(em);
- try {
- execution.setInstance(em.find(JobInstanceEntity.class, jobInstance.getInstanceId()));
-
- em.persist(execution);
- txProvider.commit(tx);
-
- final RuntimeJobExecution jobExecution = new RuntimeJobExecution(jobInstance, execution.getExecutionId(), this);
- jobExecution.setBatchStatus(batchStatus.name());
- jobExecution.setCreateTime(execution.getCreateTime());
- jobExecution.setLastUpdateTime(execution.getCreateTime());
- return jobExecution;
- } catch (final Exception e) {
- txProvider.rollback(tx, e);
- throw new BatchContainerRuntimeException(e);
- }
- } finally {
- emProvider.release(em);
- }
- }
-
- @Override
- public JobInstance createSubJobInstance(final String name, final String apptag) {
- return createJobInstance(name, apptag, null);
- }
-
- @Override
- public List<Long> jobOperatorGetJobInstanceIds(final String jobName, final int start, final int count) {
- return jobOperatorGetJobInstanceIds(jobName, null, start, count);
- }
-
- @Override
- public List<Long> jobOperatorGetJobInstanceIds(final String jobName, final String appTag, final int start, final int count) {
- final EntityManager em = emProvider.newEntityManager();
- try {
- final TypedQuery<JobInstanceEntity> query;
- if (appTag != null) {
- query = em.createNamedQuery(JobInstanceEntity.Queries.FIND_BY_NAME, JobInstanceEntity.class).setParameter("tag", appTag);
- } else {
- query = em.createNamedQuery(JobInstanceEntity.Queries.FIND_BY_NAME, JobInstanceEntity.class);
- }
- query.setParameter("name", jobName);
-
- final List<JobInstanceEntity> resultList = query
- .setFirstResult(start)
- .setMaxResults(count)
- .getResultList();
-
- if (resultList == null) {
- return Collections.emptyList();
- }
-
- final List<Long> result = new ArrayList<Long>(resultList.size());
- for (final JobInstanceEntity entity : resultList) {
- result.add(entity.getJobInstanceId());
- }
- return result;
- } finally {
- emProvider.release(em);
- }
- }
-
- @Override
- public JobInstance createJobInstance(final String name, final String apptag, final String jobXml) {
- final EntityManager em = emProvider.newEntityManager();
- try {
- final JobInstanceEntity instance = new JobInstanceEntity();
- instance.setTag(apptag);
- instance.setName(name);
- instance.setJobXml(jobXml);
-
- final Object tx = txProvider.start(em);
- try {
- em.persist(instance);
- txProvider.commit(tx);
-
- final JobInstanceImpl jobInstance = new JobInstanceImpl(instance.getJobInstanceId(), jobXml);
- jobInstance.setJobName(name);
- return jobInstance;
- } catch (final Exception e) {
- txProvider.rollback(tx, e);
- throw new BatchContainerRuntimeException(e);
- }
- } finally {
- emProvider.release(em);
- }
- }
-
- @Override
- public int jobOperatorGetJobInstanceCount(final String jobName) {
- final EntityManager em = emProvider.newEntityManager();
- try {
- return em.createNamedQuery(JobInstanceEntity.Queries.COUNT_BY_NAME, Number.class)
- .setParameter("name", jobName)
- .getSingleResult().intValue();
- } finally {
- emProvider.release(em);
- }
- }
-
- @Override
- public int jobOperatorGetJobInstanceCount(final String jobName, final String appTag) {
- final EntityManager em = emProvider.newEntityManager();
- try {
- return em.createNamedQuery(JobInstanceEntity.Queries.COUNT_BY_NAME, Number.class)
- .setParameter("name", jobName)
- .setParameter("tag", appTag)
- .getSingleResult().intValue();
- } finally {
- emProvider.release(em);
- }
- }
-
- @Override
- public void updateStepStatus(final long stepExecutionId, final StepStatus stepStatus) {
- final EntityManager em = emProvider.newEntityManager();
- try {
- final Object tx = txProvider.start(em);
- try {
- final StepExecutionEntity entity = em.find(StepExecutionEntity.class, stepExecutionId);
- entity.setBatchStatus(stepStatus.getBatchStatus());
- entity.setExitStatus(stepStatus.getExitStatus());
- entity.setLastRunStepExecutionId(stepStatus.getLastRunStepExecutionId());
- entity.setNumPartitions(stepStatus.getNumPartitions());
- entity.setPersistentData(stepStatus.getRawPersistentUserData());
- entity.setStartCount(stepStatus.getStartCount());
-
- em.merge(entity);
- txProvider.commit(tx);
- } catch (final Exception e) {
- txProvider.rollback(tx, e);
- throw new BatchContainerRuntimeException(e);
- }
- } finally {
- emProvider.release(em);
- }
- }
-
- @Override
- public StepStatus createStepStatus(final long stepExecId) {
- return new StepStatus(stepExecId); // instance already created
- }
-
- @Override
- public void updateJobStatus(final long instanceId, final JobStatus jobStatus) {
- final EntityManager em = emProvider.newEntityManager();
- try {
- final Object tx = txProvider.start(em);
- try {
- final JobInstanceEntity entity = em.find(JobInstanceEntity.class, instanceId);
- entity.setBatchStatus(jobStatus.getBatchStatus());
- entity.setStep(jobStatus.getCurrentStepId());
- entity.setLatestExecution(jobStatus.getLatestExecutionId());
- entity.setExitStatus(jobStatus.getExitStatus());
- entity.setRestartOn(jobStatus.getRestartOn());
- if (jobStatus.getJobInstance() != null) {
- entity.setName(jobStatus.getJobInstance().getJobName());
- }
- em.merge(entity);
-
- final List<JobExecutionEntity> executions = em.createNamedQuery(JobExecutionEntity.Queries.FIND_BY_INSTANCE, JobExecutionEntity.class)
- .setParameter("instanceId", instanceId)
- .getResultList();
- if (executions != null) {
- for (final JobExecutionEntity e : executions) {
- e.setInstance(entity);
- em.merge(e);
- }
- }
- txProvider.commit(tx);
- } catch (final Exception e) {
- txProvider.rollback(tx, e);
- throw new BatchContainerRuntimeException(e);
- }
- } finally {
- emProvider.release(em);
- }
- }
-
- @Override
- public JobStatus getJobStatus(final long instanceId) {
- final JobStatus status = new JobStatus(instanceId);
- final EntityManager em = emProvider.newEntityManager();
- try {
- final Object tx = txProvider.start(em);
- try {
- setJobStatusData(status, em.find(JobInstanceEntity.class, instanceId));
- txProvider.commit(tx);
- } catch (final Exception e) {
- txProvider.rollback(tx, e);
- throw new BatchContainerRuntimeException(e);
- }
- } finally {
- emProvider.release(em);
- }
- return status;
- }
-
- private void setJobStatusData(final JobStatus status, final JobInstanceEntity entity) {
- status.setBatchStatus(entity.getBatchStatus());
- status.setCurrentStepId(entity.getStep());
- status.setLatestExecutionId(entity.getLatestExecution());
- status.setExitStatus(entity.getExitStatus());
- status.setRestartOn(entity.getRestartOn());
- status.setJobInstance(entity.toJobInstance());
- }
-
- @Override
- public JobStatus createJobStatus(final long jobInstanceId) {
- return new JobStatus(jobInstanceId); // instance already created
- }
-
- @Override
- public void setCheckpointData(final CheckpointDataKey key, final CheckpointData value) {
- final EntityManager em = emProvider.newEntityManager();
- try {
- final Object tx = txProvider.start(em);
- try {
- final List<CheckpointEntity> checkpoints = em.createNamedQuery(CheckpointEntity.Queries.FIND, CheckpointEntity.class)
- .setParameter("jobInstanceId", key.getJobInstanceId())
- .setParameter("stepName", key.getStepName())
- .setParameter("type", key.getType())
- .getResultList();
-
- final CheckpointEntity checkpoint;
- final boolean isNew = checkpoints == null || checkpoints.isEmpty();
- if (isNew) {
- checkpoint = new CheckpointEntity();
- checkpoint.setInstance(em.find(JobInstanceEntity.class, key.getJobInstanceId()));
- checkpoint.setStepName(key.getStepName());
- checkpoint.setType(key.getType());
- } else {
- checkpoint = checkpoints.iterator().next();
- }
-
- checkpoint.setData(value.getRestartToken());
-
- if (isNew) {
- em.persist(checkpoint);
- } else {
- em.merge(checkpoint);
- }
- txProvider.commit(tx);
- } catch (final Exception e) {
- txProvider.rollback(tx, e);
- throw new BatchContainerRuntimeException(e);
- }
- } finally {
- emProvider.release(em);
- }
- }
-
- @Override
- public CheckpointData getCheckpointData(final CheckpointDataKey key) {
- final EntityManager em = emProvider.newEntityManager();
- try {
- final CheckpointEntity checkpoint = em.createNamedQuery(CheckpointEntity.Queries.FIND, CheckpointEntity.class)
- .setParameter("jobInstanceId", key.getJobInstanceId())
- .setParameter("stepName", key.getStepName())
- .setParameter("type", key.getType())
- .getSingleResult();
-
- final CheckpointData data = new CheckpointData(checkpoint.getInstance().getJobInstanceId(), checkpoint.getStepName(), checkpoint.getType());
- data.setRestartToken(checkpoint.getData());
- return data;
- } catch (final NoResultException nre) {
- return null;
- } finally {
- emProvider.release(em);
- }
- }
-
- @Override
- public void init(final Properties batchConfig) {
- final String txProviderClass = batchConfig.getProperty("persistence.jpa.transaction-provider", DefaultTransactionProvider.class.getName());
- try {
- txProvider = TransactionProvider.class.cast(Thread.currentThread().getContextClassLoader().loadClass(txProviderClass).newInstance());
- } catch (final Exception e) {
- throw new BatchContainerRuntimeException(e);
- }
- txProvider.init(batchConfig);
-
- final String providerClass = batchConfig.getProperty("persistence.jpa.entity-manager-provider", DefaultEntityManagerProvider.class.getName());
- try {
- emProvider = EntityManagerProvider.class.cast(Thread.currentThread().getContextClassLoader().loadClass(providerClass).newInstance());
- } catch (final Exception e) {
- throw new BatchContainerRuntimeException(e);
- }
- emProvider.init(batchConfig);
- }
-}