You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@batchee.apache.org by rm...@apache.org on 2015/11/30 21:30:00 UTC
incubator-batchee git commit: BATCHEE-77 adding cleanUp(Date) and
Eviction command to the cli to make it easier to remove old data
Repository: incubator-batchee
Updated Branches:
refs/heads/master 99972657e -> 02c322402
BATCHEE-77 adding cleanUp(Date) and Eviction command to the cli to make it easier to remove old data
Project: http://git-wip-us.apache.org/repos/asf/incubator-batchee/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-batchee/commit/02c32240
Tree: http://git-wip-us.apache.org/repos/asf/incubator-batchee/tree/02c32240
Diff: http://git-wip-us.apache.org/repos/asf/incubator-batchee/diff/02c32240
Branch: refs/heads/master
Commit: 02c322402e7cf37e20463c5d81c8ed92476c8b22
Parents: 9997265
Author: Romain Manni-Bucau <rm...@gmail.com>
Authored: Mon Nov 30 21:30:28 2015 +0100
Committer: Romain Manni-Bucau <rm...@gmail.com>
Committed: Mon Nov 30 21:30:28 2015 +0100
----------------------------------------------------------------------
.../JDBCPersistenceManagerService.java | 47 ++++++-
.../JPAPersistenceManagerService.java | 36 +++++-
.../MemoryPersistenceManagerService.java | 35 +++++-
.../services/persistence/jdbc/Dictionary.java | 46 +++++--
.../jpa/domain/CheckpointEntity.java | 6 +-
.../jpa/domain/JobExecutionEntity.java | 2 +
.../jpa/domain/JobInstanceEntity.java | 6 +-
.../jpa/domain/StepExecutionEntity.java | 2 +
.../batchee/spi/PersistenceManagerService.java | 6 +-
.../spi/PersistenceManagerServiceTest.java | 121 +++++++++++++++++++
.../apache/batchee/cli/command/Eviction.java | 42 +++++++
11 files changed, 329 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/02c32240/jbatch/src/main/java/org/apache/batchee/container/services/persistence/JDBCPersistenceManagerService.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/services/persistence/JDBCPersistenceManagerService.java b/jbatch/src/main/java/org/apache/batchee/container/services/persistence/JDBCPersistenceManagerService.java
index 765a60d..d09abef 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/services/persistence/JDBCPersistenceManagerService.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/services/persistence/JDBCPersistenceManagerService.java
@@ -67,6 +67,7 @@ import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Timestamp;
import java.util.ArrayList;
+import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -1588,11 +1589,49 @@ public class JDBCPersistenceManagerService implements PersistenceManagerService
}
}
+ @Override
+ public void cleanUp(final Date until) {
+ Connection conn = null;
+ try {
+ conn = getConnection();
+ deleteUntil(until, conn, dictionary.getDeleteStepExecutionUntil());
+ deleteUntil(until, conn, dictionary.getDeleteCheckpointUntil());
+ deleteUntil(until, conn, dictionary.getDeleteJobInstanceUntil());
+ deleteUntil(until, conn, dictionary.getDeleteJobExecutionUntil());
+ if (!conn.getAutoCommit()) {
+ conn.commit();
+ }
+ } catch (final SQLException e) {
+ throw new PersistenceException(e);
+ } finally {
+ cleanupConnection(conn, null, null);
+ }
+ }
+
+ private static void deleteUntil(final Date until, final Connection conn, final String delete) throws SQLException {
+ PreparedStatement statement = null;
+ try {
+ statement = conn.prepareStatement(delete);
+ statement.setTimestamp(1, new Timestamp(until.getTime()));
+ statement.executeUpdate();
+ } finally {
+ if (statement != null) {
+ statement.close();
+ }
+ }
+ }
+
private static void deleteFromInstanceId(final long instanceId, final Connection conn, final String delete) throws SQLException {
- final PreparedStatement statement = conn.prepareStatement(delete);
- statement.setLong(1, instanceId);
- statement.executeUpdate();
- statement.close();
+ PreparedStatement statement = null;
+ try {
+ statement = conn.prepareStatement(delete);
+ statement.setLong(1, instanceId);
+ statement.executeUpdate();
+ } finally {
+ if (statement != null) {
+ statement.close();
+ }
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/02c32240/jbatch/src/main/java/org/apache/batchee/container/services/persistence/JPAPersistenceManagerService.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/services/persistence/JPAPersistenceManagerService.java b/jbatch/src/main/java/org/apache/batchee/container/services/persistence/JPAPersistenceManagerService.java
index 6e2904e..a695327 100644
--- a/jbatch/src/main/java/org/apache/batchee/container/services/persistence/JPAPersistenceManagerService.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/services/persistence/JPAPersistenceManagerService.java
@@ -48,11 +48,13 @@ import javax.batch.runtime.Metric;
import javax.batch.runtime.StepExecution;
import javax.persistence.EntityManager;
import javax.persistence.NoResultException;
+import javax.persistence.TemporalType;
import javax.persistence.TypedQuery;
import java.io.IOException;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -68,10 +70,14 @@ import static org.apache.batchee.container.util.Serializations.serialize;
public class JPAPersistenceManagerService implements PersistenceManagerService {
private final static Logger LOGGER = Logger.getLogger(JPAPersistenceManagerService.class.getName());
- private static final String[] DELETE_QUERIES = {
+ private static final String[] DELETE_ID_QUERIES = {
StepExecutionEntity.Queries.DELETE_BY_INSTANCE_ID, CheckpointEntity.Queries.DELETE_BY_INSTANCE_ID,
JobExecutionEntity.Queries.DELETE_BY_INSTANCE_ID, JobInstanceEntity.Queries.DELETE_BY_INSTANCE_ID
};
+ private static final String[] DELETE_DATE_QUERIES = {
+ StepExecutionEntity.Queries.DELETE_BY_DATE, CheckpointEntity.Queries.DELETE_BY_DATE,
+ JobInstanceEntity.Queries.DELETE_BY_DATE, JobExecutionEntity.Queries.DELETE_BY_DATE
+ };
private EntityManagerProvider emProvider;
private TransactionProvider txProvider;
@@ -82,8 +88,26 @@ public class JPAPersistenceManagerService implements PersistenceManagerService {
try {
final Object tx = txProvider.start(em);
try {
- for (final String query : DELETE_QUERIES) {
- em.createQuery(query).setParameter("instanceId", instanceId).executeUpdate();
+ for (final String query : DELETE_ID_QUERIES) {
+ em.createNamedQuery(query).setParameter("instanceId", instanceId).executeUpdate();
+ }
+ txProvider.commit(tx);
+ } catch (final Exception e) {
+ throw new BatchContainerRuntimeException(performRollback(tx, e));
+ }
+ } finally {
+ emProvider.release(em);
+ }
+ }
+
+ @Override
+ public void cleanUp(final Date until) {
+ final EntityManager em = emProvider.newEntityManager();
+ try {
+ final Object tx = txProvider.start(em);
+ try {
+ for (final String query : DELETE_DATE_QUERIES) {
+ em.createNamedQuery(query).setParameter("date", until, TemporalType.TIMESTAMP).executeUpdate();
}
txProvider.commit(tx);
} catch (final Exception e) {
@@ -305,7 +329,11 @@ public class JPAPersistenceManagerService implements PersistenceManagerService {
public long getJobInstanceIdByExecutionId(final long executionId) throws NoSuchJobExecutionException {
final EntityManager em = emProvider.newEntityManager();
try {
- return em.find(JobExecutionEntity.class, executionId).getInstance().getJobInstanceId();
+ final JobExecutionEntity jobExecutionEntity = em.find(JobExecutionEntity.class, executionId);
+ if (jobExecutionEntity == null) {
+ throw new NoSuchJobExecutionException("Execution #" + executionId);
+ }
+ return jobExecutionEntity.getInstance().getJobInstanceId();
} finally {
emProvider.release(em);
}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/02c32240/jbatch/src/main/java/org/apache/batchee/container/services/persistence/MemoryPersistenceManagerService.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/services/persistence/MemoryPersistenceManagerService.java b/jbatch/src/main/java/org/apache/batchee/container/services/persistence/MemoryPersistenceManagerService.java
index d1f2274..a2165f8 100644
--- a/jbatch/src/main/java/org/apache/batchee/container/services/persistence/MemoryPersistenceManagerService.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/services/persistence/MemoryPersistenceManagerService.java
@@ -38,6 +38,7 @@ import javax.batch.runtime.Metric;
import javax.batch.runtime.StepExecution;
import java.io.Serializable;
import java.sql.Timestamp;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
@@ -192,6 +193,10 @@ public class MemoryPersistenceManagerService implements PersistenceManagerServic
@Override
public List<StepExecution> getStepExecutionsForJobExecution(final long execid) {
final Structures.ExecutionInstanceData executionInstanceData = data.executionInstanceData.get(execid);
+ if (executionInstanceData == null) {
+ return Collections.emptyList();
+ }
+
synchronized (executionInstanceData.stepExecutions) {
return executionInstanceData.stepExecutions;
}
@@ -296,7 +301,11 @@ public class MemoryPersistenceManagerService implements PersistenceManagerServic
@Override
public long getJobInstanceIdByExecutionId(final long executionId) throws NoSuchJobExecutionException {
- return data.executionInstanceData.get(executionId).execution.getInstanceId();
+ final Structures.ExecutionInstanceData executionInstanceData = data.executionInstanceData.get(executionId);
+ if (executionInstanceData == null) {
+ throw new NoSuchJobExecutionException("Execution #" + executionId);
+ }
+ return executionInstanceData.execution.getInstanceId();
}
@Override
@@ -408,6 +417,9 @@ public class MemoryPersistenceManagerService implements PersistenceManagerServic
final StepExecutionImpl stepExecution = new StepExecutionImpl(rootJobExecId, data.stepExecutionIdGenerator.getAndIncrement());
stepExecution.setStepName(stepName);
final Structures.ExecutionInstanceData executionInstanceData = data.executionInstanceData.get(rootJobExecId);
+ if (executionInstanceData == null) {
+ return null;
+ }
synchronized (executionInstanceData.stepExecutions) {
executionInstanceData.stepExecutions.add(stepExecution);
}
@@ -653,6 +665,27 @@ public class MemoryPersistenceManagerService implements PersistenceManagerServic
}
}
+ @Override
+ public void cleanUp(final Date until) {
+ final Collection<Long> instanceIdToRemove = new ArrayList<Long>();
+ for (final Map.Entry<Long, Structures.JobInstanceData> entry : data.jobInstanceData.entrySet()) {
+ boolean match = true;
+ for (final Structures.ExecutionInstanceData exec : entry.getValue().executions) {
+ if (exec.execution.getEndTime() == null || exec.execution.getEndTime().after(until)) {
+ match = false;
+ break;
+ }
+ }
+ if (match) {
+ instanceIdToRemove.add(entry.getKey());
+ }
+ }
+
+ for (final Long id : instanceIdToRemove) {
+ cleanUp(id);
+ }
+ }
+
private static class ReverseDateComparator implements Comparator<Date> {
public static final ReverseDateComparator INSTANCE = new ReverseDateComparator();
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/02c32240/jbatch/src/main/java/org/apache/batchee/container/services/persistence/jdbc/Dictionary.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/services/persistence/jdbc/Dictionary.java b/jbatch/src/main/java/org/apache/batchee/container/services/persistence/jdbc/Dictionary.java
index 258fe23..33b7608 100644
--- a/jbatch/src/main/java/org/apache/batchee/container/services/persistence/jdbc/Dictionary.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/services/persistence/jdbc/Dictionary.java
@@ -35,6 +35,7 @@ public class Dictionary {
String SELECT_CHECKPOINT = SELECT + "%s" + FROM + "%s" + WHERE + "%s = ? and %s = ? and %s = ?";
String UPDATE_CHECKPOINT = UPDATE + "%s set %s = ?" + WHERE + "%s = ? and %s = ? and %s = ?";
String DELETE_CHECKPOINT = DELETE + "%s" + WHERE + "%s = ?";
+ String DELETE_CHECKPOINT_UNTIL = DELETE + "%s WHERE %s IN (" + SELECT + " DISTINCT t1.%s FROM %s t1 WHERE (SELECT MAX(t0.%s) FROM %s t0 WHERE t0.%s = t1.%s) < ?)";
String[] JOB_INSTANCE_COLUMNS = { "jobInstanceId", "batchStatus", "exitStatus", "jobName", "jobXml", "latestExecution", "restartOn", "step", "tag" };
String CREATE_JOB_INSTANCE = CREATE_TABLE + "%s(%s %s %s, %s %s, %s %s, %s %s, %s %s, %s %s, %s %s, %s %s, %s %s, PRIMARY KEY (%s))";
@@ -48,6 +49,8 @@ public class Dictionary {
String JOB_INSTANCE_CREATE = INSERT_INTO + "%s" + "(%s, %s) VALUES(?, ?)";
String JOB_INSTANCE_CREATE_WITH_JOB_XML = INSERT_INTO + "%s" + "(%s, %s, %s) VALUES(?, ?, ?)";
String DELETE_JOB_INSTANCE = DELETE + "%s" + WHERE + "%s = ?";
+ String DELETE_JOB_INSTANCE_UNTIL = DELETE + "%s" + WHERE + "%s IN (" + SELECT + "distinct t1.%s" + FROM + "%s t1" + WHERE + "(" + SELECT + "max(t0.%s)" + FROM + "%s t0" +
+ WHERE + "t0.%s = t1.%s) < ?)";
String[] JOB_EXECUTION_COLUMNS = { "executionId", "batchStatus", "createTime", "endTime", "exitStatus", "jobProperties",
"startTime", "updateTime", "INSTANCE_JOBINSTANCEID" };
@@ -71,6 +74,7 @@ public class Dictionary {
String JOB_EXECUTION_CREATE = INSERT_INTO + "%s(%s, %s, %s, %s, %s) VALUES(?, ?, ?, ?, ?)";
String JOB_EXECUTION_MOST_RECENT = SELECT + "%s" + FROM + "%s" + WHERE + "%s = ? ORDER BY %s DESC";
String DELETE_JOB_EXECUTION = DELETE + "%s" + WHERE + "%s = ?";
+ String DELETE_JOB_EXECUTION_UNTIL = DELETE + "%s" + WHERE + "%s IN (" + SELECT + "distinct t0.%s" + FROM + "%s t0" + WHERE + "t0.%s < ?)";
String[] STEP_EXECUTION_COLUMNS = { "id", "batchStatus", "exec_commit", "endTime", "exitStatus", "exec_filter", "lastRunStepExecutionId", "numPartitions",
"persistentData", "exec_processskip", "exec_read", "exec_readskip", "exec_rollback", "startCount", "startTime", "stepName",
@@ -90,6 +94,7 @@ public class Dictionary {
"B.%s, B.%s, B.%s, B.%s, B.%s, B.%s, B.%s, B.%s" + FROM + "%s A inner join %s B ON A.%s = B.%s " + WHERE + "A.%s = ? and B.%s = ?";
String DELETE_STEP_EXECUTION = DELETE + "%s A inner join %s B ON A.%s = B.%s " + WHERE + "A.%s = ?";
+ String DELETE_STEP_EXECUTION_UNTIL = DELETE + "%s" + WHERE + "%s in (" + SELECT + "distinct t0.%s" + FROM + "%s t0 inner join %s t1 ON t0.%s=t1.%s" + WHERE + "t1.%s < ?)";
}
private final String checkpointTable;
@@ -98,6 +103,7 @@ public class Dictionary {
private final String selectCheckpoint;
private final String updateCheckpoint;
private final String deleteCheckpoint;
+ private final String deleteCheckpointUntil;
private final String jobInstanceTable;
private final String createJobInstanceTable;
private final String countJobInstanceByName;
@@ -110,6 +116,7 @@ public class Dictionary {
private final String createJobInstance;
private final String createJobInstanceWithJobXml;
private final String deleteJobInstance;
+ private final String deleteJobInstanceUntil;
private final String jobExecutionTable;
private final String createJobExecutionTable;
private final String findJobExecutionTimestamps;
@@ -127,6 +134,7 @@ public class Dictionary {
private final String createJobExecution;
private final String findMostRecentJobExecution;
private final String deleteJobExecution;
+ private final String deleteJobExecutionUntil;
private final String stepExecutionTable;
private final String createStepExecutionTable;
private final String finStepExecutionFromJobExecution;
@@ -136,6 +144,7 @@ public class Dictionary {
private final String updateStepExecution;
private final String findStepExecutionByJobInstanceAndStepName;
private final String deleteStepExecution;
+ private final String deleteStepExecutionUntil;
private final String[] checkpointColumns;
private final String[] jobInstanceColumns;
@@ -149,9 +158,14 @@ public class Dictionary {
this.jobExecutionTable = jobExecutionTable;
this.stepExecutionTable = stepExecutionTable;
- { // checkpoint
+ { // prebuild every table's column names up front so the join-based queries below can reference them
checkpointColumns = columns(database, SQL.CHECKPOINT_COLUMNS);
+ jobExecutionColumns = columns(database, SQL.JOB_EXECUTION_COLUMNS);
+ jobInstanceColumns = columns(database, SQL.JOB_INSTANCE_COLUMNS);
+ stepExecutionColumns = columns(database, SQL.STEP_EXECUTION_COLUMNS);
+ }
+ { // checkpoint
this.createCheckpointTable = String.format(SQL.CREATE_CHECKPOINT, checkpointTable,
checkpointColumns[0], database.bigint(), database.autoIncrementId(),
checkpointColumns[1], database.blob(),
@@ -162,11 +176,11 @@ public class Dictionary {
this.selectCheckpoint = String.format(SQL.SELECT_CHECKPOINT, checkpointColumns[1], checkpointTable, checkpointColumns[4], checkpointColumns[3], checkpointColumns[2]);
this.updateCheckpoint = String.format(SQL.UPDATE_CHECKPOINT, checkpointTable, checkpointColumns[1], checkpointColumns[4], checkpointColumns[3], checkpointColumns[2]);
this.deleteCheckpoint = String.format(SQL.DELETE_CHECKPOINT, checkpointTable, checkpointColumns[4]);
+ this.deleteCheckpointUntil = String.format(SQL.DELETE_CHECKPOINT_UNTIL, checkpointTable, checkpointColumns[0], checkpointColumns[0], checkpointTable,
+ jobExecutionColumns[3], jobExecutionTable, checkpointColumns[4], jobExecutionColumns[8]);
}
{ // jobInstance
- jobInstanceColumns = columns(database, SQL.JOB_INSTANCE_COLUMNS);
-
this.createJobInstanceTable = String.format(SQL.CREATE_JOB_INSTANCE, jobInstanceTable,
jobInstanceColumns[0], database.bigint(), database.autoIncrementId(),
jobInstanceColumns[1], database.varchar20(),
@@ -191,11 +205,11 @@ public class Dictionary {
this.createJobInstanceWithJobXml = String.format(SQL.JOB_INSTANCE_CREATE_WITH_JOB_XML, jobInstanceTable, jobInstanceColumns[3],
jobInstanceColumns[8], jobInstanceColumns[4]);
this.deleteJobInstance = String.format(SQL.DELETE_JOB_INSTANCE, jobInstanceTable, jobInstanceColumns[0]);
+ this.deleteJobInstanceUntil = String.format(SQL.DELETE_JOB_INSTANCE_UNTIL, jobInstanceTable, jobInstanceColumns[0], jobInstanceColumns[0], jobInstanceTable,
+ jobExecutionColumns[3], jobExecutionTable, jobExecutionColumns[8], jobInstanceColumns[0]);
}
{ // jobExecution
- jobExecutionColumns = columns(database, SQL.JOB_EXECUTION_COLUMNS);
-
this.createJobExecutionTable = String.format(SQL.CREATE_JOB_EXECUTION, jobExecutionTable,
jobExecutionColumns[0], database.bigint(), database.autoIncrementId(),
jobExecutionColumns[1], database.varchar20(),
@@ -232,11 +246,11 @@ public class Dictionary {
this.findMostRecentJobExecution = String.format(SQL.JOB_EXECUTION_MOST_RECENT, jobExecutionColumns[0], jobExecutionTable,
jobExecutionColumns[8], jobExecutionColumns[2]);
this.deleteJobExecution = String.format(SQL.DELETE_JOB_EXECUTION, jobExecutionTable, jobExecutionColumns[8]);
+ this.deleteJobExecutionUntil = String.format(SQL.DELETE_JOB_EXECUTION_UNTIL, jobExecutionTable, jobExecutionColumns[0], jobExecutionColumns[0],
+ jobExecutionTable, jobExecutionColumns[3]);
}
{ // step execution
- stepExecutionColumns = columns(database, SQL.STEP_EXECUTION_COLUMNS);
-
this.createStepExecutionTable = String.format(SQL.CREATE_STEP_EXECUTION, stepExecutionTable,
stepExecutionColumns[0], database.bigint(), database.autoIncrementId(),
stepExecutionColumns[1], database.varchar20(),
@@ -278,6 +292,8 @@ public class Dictionary {
jobExecutionColumns[0], stepExecutionColumns[18], jobExecutionColumns[8], stepExecutionColumns[15]);
this.deleteStepExecution = String.format(SQL.DELETE_STEP_EXECUTION, jobExecutionTable, stepExecutionTable, jobExecutionColumns[0],
stepExecutionColumns[18], jobExecutionColumns[8]);
+ this.deleteStepExecutionUntil = String.format(SQL.DELETE_STEP_EXECUTION_UNTIL, stepExecutionTable, stepExecutionColumns[0], stepExecutionColumns[0],
+ stepExecutionTable, jobExecutionTable, stepExecutionColumns[18], jobExecutionColumns[0], jobExecutionColumns[3]);
}
}
@@ -480,4 +496,20 @@ public class Dictionary {
public String stepExecutionColumns(final int idx) {
return stepExecutionColumns[idx];
}
+
+ public String getDeleteCheckpointUntil() {
+ return deleteCheckpointUntil;
+ }
+
+ public String getDeleteJobInstanceUntil() {
+ return deleteJobInstanceUntil;
+ }
+
+ public String getDeleteJobExecutionUntil() {
+ return deleteJobExecutionUntil;
+ }
+
+ public String getDeleteStepExecutionUntil() {
+ return deleteStepExecutionUntil;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/02c32240/jbatch/src/main/java/org/apache/batchee/container/services/persistence/jpa/domain/CheckpointEntity.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/services/persistence/jpa/domain/CheckpointEntity.java b/jbatch/src/main/java/org/apache/batchee/container/services/persistence/jpa/domain/CheckpointEntity.java
index ccac415..772a103 100644
--- a/jbatch/src/main/java/org/apache/batchee/container/services/persistence/jpa/domain/CheckpointEntity.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/services/persistence/jpa/domain/CheckpointEntity.java
@@ -33,13 +33,17 @@ import javax.persistence.Table;
@NamedQueries({
@NamedQuery(name = CheckpointEntity.Queries.FIND,
query = "select c from CheckpointEntity c where c.instance.jobInstanceId = :jobInstanceId and c.stepName = :stepName and c.type = :type"),
- @NamedQuery(name = CheckpointEntity.Queries.DELETE_BY_INSTANCE_ID, query = "delete from CheckpointEntity e where e.instance.jobInstanceId = :id")
+ @NamedQuery(name = CheckpointEntity.Queries.DELETE_BY_INSTANCE_ID, query = "delete from CheckpointEntity e where e.instance.jobInstanceId = :id"),
+ @NamedQuery(
+ name = CheckpointEntity.Queries.DELETE_BY_DATE,
+ query = "delete from CheckpointEntity e where (select max(x.endTime) from JobExecutionEntity x where x.instance.jobInstanceId = e.instance.jobInstanceId) < :date")
})
@Table(name=CheckpointEntity.TABLE_NAME)
public class CheckpointEntity {
public static interface Queries {
String FIND = "org.apache.batchee.container.services.persistence.jpa.domain.CheckpointEntity.find";
String DELETE_BY_INSTANCE_ID = "org.apache.batchee.container.services.persistence.jpa.domain.CheckpointEntity.deleteByInstanceId";
+ String DELETE_BY_DATE = "org.apache.batchee.container.services.persistence.jpa.domain.CheckpointEntity.deleteBydate";
}
public static final String TABLE_NAME = "BATCH_CHECKPOINT";
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/02c32240/jbatch/src/main/java/org/apache/batchee/container/services/persistence/jpa/domain/JobExecutionEntity.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/services/persistence/jpa/domain/JobExecutionEntity.java b/jbatch/src/main/java/org/apache/batchee/container/services/persistence/jpa/domain/JobExecutionEntity.java
index b46918d..66051fa 100644
--- a/jbatch/src/main/java/org/apache/batchee/container/services/persistence/jpa/domain/JobExecutionEntity.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/services/persistence/jpa/domain/JobExecutionEntity.java
@@ -40,6 +40,7 @@ import java.util.Properties;
query = "SELECT e FROM JobExecutionEntity e WHERE e.instance.jobInstanceId = :instanceId ORDER BY e.createTime DESC"),
@NamedQuery(name = JobExecutionEntity.Queries.FIND_BY_INSTANCE, query = "SELECT e FROM JobExecutionEntity e WHERE e.instance.jobInstanceId = :instanceId"),
@NamedQuery(name = JobExecutionEntity.Queries.DELETE_BY_INSTANCE_ID, query = "delete from JobExecutionEntity e where e.instance.jobInstanceId = :instanceId"),
+ @NamedQuery(name = JobExecutionEntity.Queries.DELETE_BY_DATE, query = "delete from JobExecutionEntity e where e.endTime < :date"),
@NamedQuery(name = JobExecutionEntity.Queries.FIND_RUNNING, query = "SELECT e FROM JobExecutionEntity e WHERE e.batchStatus in :statuses and e.instance.name = :name")
})
@Table(name=JobExecutionEntity.TABLE_NAME)
@@ -49,6 +50,7 @@ public class JobExecutionEntity {
String FIND_BY_INSTANCE = "org.apache.batchee.container.services.persistence.jpa.domain.JobExecutionEntity.findByInstance";
String FIND_RUNNING = "org.apache.batchee.container.services.persistence.jpa.domain.JobExecutionEntity.findRunning";
String DELETE_BY_INSTANCE_ID = "org.apache.batchee.container.services.persistence.jpa.domain.JobExecutionEntity.deleteByInstanceId";
+ String DELETE_BY_DATE = "org.apache.batchee.container.services.persistence.jpa.domain.JobExecutionEntity.deleteByDate";
List<BatchStatus> RUNNING_STATUSES = Arrays.asList(BatchStatus.STARTED, BatchStatus.STARTING, BatchStatus.STOPPING);
}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/02c32240/jbatch/src/main/java/org/apache/batchee/container/services/persistence/jpa/domain/JobInstanceEntity.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/services/persistence/jpa/domain/JobInstanceEntity.java b/jbatch/src/main/java/org/apache/batchee/container/services/persistence/jpa/domain/JobInstanceEntity.java
index 94ebe81..f8261c4 100644
--- a/jbatch/src/main/java/org/apache/batchee/container/services/persistence/jpa/domain/JobInstanceEntity.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/services/persistence/jpa/domain/JobInstanceEntity.java
@@ -41,7 +41,10 @@ import java.util.List;
@NamedQuery(name = JobInstanceEntity.Queries.FIND_EXTERNALS, query = "select j from JobInstanceEntity j where j.name not like :pattern"),
@NamedQuery(name = JobInstanceEntity.Queries.FIND_BY_NAME_AND_TAG, query = "select j from JobInstanceEntity j where j.name = :name and j.tag = tag"),
@NamedQuery(name = JobInstanceEntity.Queries.FIND_BY_NAME, query = "select j from JobInstanceEntity j where j.name = :name"),
- @NamedQuery(name = JobInstanceEntity.Queries.DELETE_BY_INSTANCE_ID, query = "delete from JobInstanceEntity e where e.jobInstanceId = :instanceId")
+ @NamedQuery(name = JobInstanceEntity.Queries.DELETE_BY_INSTANCE_ID, query = "delete from JobInstanceEntity e where e.jobInstanceId = :instanceId"),
+ @NamedQuery(
+ name = JobInstanceEntity.Queries.DELETE_BY_DATE,
+ query = "delete from JobInstanceEntity e where (select max(x.endTime) from JobExecutionEntity x where x.instance.jobInstanceId = e.jobInstanceId) < :date")
})
@Table(name=JobInstanceEntity.TABLE_NAME)
public class JobInstanceEntity {
@@ -53,6 +56,7 @@ public class JobInstanceEntity {
String FIND_EXTERNALS = "org.apache.batchee.container.services.persistence.jpa.domain.JobInstanceEntity.findExternals";
String FIND_FROM_EXECUTION = "org.apache.batchee.container.services.persistence.jpa.domain.JobInstanceEntity.findByExecution";
String DELETE_BY_INSTANCE_ID = "org.apache.batchee.container.services.persistence.jpa.domain.JobInstanceEntity.deleteFromInstanceId";
+ String DELETE_BY_DATE = "org.apache.batchee.container.services.persistence.jpa.domain.JobInstanceEntity.deleteByDate";
}
public static final String TABLE_NAME = "BATCH_JOBINSTANCE";
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/02c32240/jbatch/src/main/java/org/apache/batchee/container/services/persistence/jpa/domain/StepExecutionEntity.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/services/persistence/jpa/domain/StepExecutionEntity.java b/jbatch/src/main/java/org/apache/batchee/container/services/persistence/jpa/domain/StepExecutionEntity.java
index 0c85674..f2706cf 100644
--- a/jbatch/src/main/java/org/apache/batchee/container/services/persistence/jpa/domain/StepExecutionEntity.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/services/persistence/jpa/domain/StepExecutionEntity.java
@@ -36,6 +36,7 @@ import java.sql.Timestamp;
@NamedQueries({
@NamedQuery(name = StepExecutionEntity.Queries.FIND_BY_EXECUTION, query = "select s from StepExecutionEntity s where s.execution.executionId = :executionId"),
@NamedQuery(name = StepExecutionEntity.Queries.DELETE_BY_INSTANCE_ID, query = "delete from StepExecutionEntity e where e.execution.instance.jobInstanceId = :instanceId"),
+ @NamedQuery(name = StepExecutionEntity.Queries.DELETE_BY_DATE, query = "delete from StepExecutionEntity e where e.execution.endTime < :date"),
@NamedQuery(name = StepExecutionEntity.Queries.FIND_BY_INSTANCE_AND_NAME,
query = "select se FROM StepExecutionEntity se where se.execution.instance.jobInstanceId = :instanceId and se.stepName = :step")
})
@@ -45,6 +46,7 @@ public class StepExecutionEntity {
String FIND_BY_EXECUTION = "org.apache.batchee.container.services.persistence.jpa.domain.StepExecutionEntity.findByExecution";
String FIND_BY_INSTANCE_AND_NAME = "org.apache.batchee.container.services.persistence.jpa.domain.StepExecutionEntity.findByInstanceAndName";
String DELETE_BY_INSTANCE_ID = "org.apache.batchee.container.services.persistence.jpa.domain.StepExecutionEntity.deleteByInstanceId";
+ String DELETE_BY_DATE = "org.apache.batchee.container.services.persistence.jpa.domain.StepExecutionEntity.deleteByDate";
}
public static final String TABLE_NAME = "BATCH_STEPEXECUTION";
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/02c32240/jbatch/src/main/java/org/apache/batchee/spi/PersistenceManagerService.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/spi/PersistenceManagerService.java b/jbatch/src/main/java/org/apache/batchee/spi/PersistenceManagerService.java
index 243474e..a50e10a 100755
--- a/jbatch/src/main/java/org/apache/batchee/spi/PersistenceManagerService.java
+++ b/jbatch/src/main/java/org/apache/batchee/spi/PersistenceManagerService.java
@@ -18,10 +18,10 @@ package org.apache.batchee.spi;
import org.apache.batchee.container.impl.StepContextImpl;
import org.apache.batchee.container.impl.StepExecutionImpl;
-import org.apache.batchee.container.impl.jobinstance.RuntimeFlowInSplitExecution;
-import org.apache.batchee.container.impl.jobinstance.RuntimeJobExecution;
import org.apache.batchee.container.impl.controller.chunk.CheckpointData;
import org.apache.batchee.container.impl.controller.chunk.CheckpointDataKey;
+import org.apache.batchee.container.impl.jobinstance.RuntimeFlowInSplitExecution;
+import org.apache.batchee.container.impl.jobinstance.RuntimeJobExecution;
import org.apache.batchee.container.services.InternalJobExecution;
import org.apache.batchee.container.status.JobStatus;
import org.apache.batchee.container.status.StepStatus;
@@ -31,6 +31,7 @@ import javax.batch.runtime.BatchStatus;
import javax.batch.runtime.JobInstance;
import javax.batch.runtime.StepExecution;
import java.sql.Timestamp;
+import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -193,4 +194,5 @@ public interface PersistenceManagerService extends BatchService {
StepExecution getStepExecutionByStepExecutionId(long stepExecId);
void cleanUp(final long instanceId);
+ void cleanUp(final Date until);
}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/02c32240/jbatch/src/test/java/org/apache/batchee/spi/PersistenceManagerServiceTest.java
----------------------------------------------------------------------
diff --git a/jbatch/src/test/java/org/apache/batchee/spi/PersistenceManagerServiceTest.java b/jbatch/src/test/java/org/apache/batchee/spi/PersistenceManagerServiceTest.java
new file mode 100644
index 0000000..a281fc7
--- /dev/null
+++ b/jbatch/src/test/java/org/apache/batchee/spi/PersistenceManagerServiceTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.batchee.spi;
+
+import org.apache.batchee.container.impl.StepContextImpl;
+import org.apache.batchee.container.impl.StepExecutionImpl;
+import org.apache.batchee.container.impl.controller.chunk.CheckpointData;
+import org.apache.batchee.container.impl.controller.chunk.CheckpointDataKey;
+import org.apache.batchee.container.impl.controller.chunk.CheckpointType;
+import org.apache.batchee.container.impl.jobinstance.RuntimeJobExecution;
+import org.apache.batchee.container.services.InternalJobExecution;
+import org.apache.batchee.container.services.persistence.JDBCPersistenceManagerService;
+import org.apache.batchee.container.services.persistence.JPAPersistenceManagerService;
+import org.apache.batchee.container.services.persistence.MemoryPersistenceManagerService;
+import org.apache.batchee.container.status.JobStatus;
+import org.junit.Test;
+
+import javax.batch.operations.NoSuchJobExecutionException;
+import javax.batch.runtime.BatchStatus;
+import javax.batch.runtime.JobInstance;
+import java.sql.Timestamp;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.Properties;
+
+import static java.util.Arrays.asList;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class PersistenceManagerServiceTest { // checks cleanUp(Date) on the JDBC, JPA and in-memory persistence services
+    @Test
+    public void cleanUpUntil() {
+        for (final PersistenceManagerService service : asList(
+            new JDBCPersistenceManagerService() {{
+                init(new Properties());
+            }},
+            new JPAPersistenceManagerService() {{
+                init(new Properties());
+            }},
+            new MemoryPersistenceManagerService() {{
+                init(new Properties());
+            }})) {
+            System.out.println("");
+            System.out.println(" " + service);
+            System.out.println("");
+
+            for (int i = 0; i < 3; i++) { // persist three complete instance/execution/step/checkpoint sets
+                final JobInstance instance = service.createJobInstance("test", "app", "xml");
+                final RuntimeJobExecution exec = service.createJobExecution(instance, new Properties(), BatchStatus.COMPLETED);
+                final StepExecutionImpl step = service.createStepExecution(exec.getExecutionId(), new StepContextImpl("step"));
+                service.createStepStatus(step.getStepExecutionId()).setBatchStatus(BatchStatus.STARTED);
+                service.setCheckpointData(
+                    new CheckpointDataKey(instance.getInstanceId(), "step", CheckpointType.READER),
+                    new CheckpointData(instance.getInstanceId(), "step", CheckpointType.READER) {{
+                        setRestartToken("restart".getBytes());
+                    }});
+                service.createJobStatus(instance.getInstanceId());
+                service.updateJobStatus(instance.getInstanceId(), new JobStatus(instance) {{
+                    setBatchStatus(BatchStatus.COMPLETED);
+                }});
+                service.updateWithFinalExecutionStatusesAndTimestamps(exec.getExecutionId(), BatchStatus.COMPLETED, "ok", new Timestamp(System.currentTimeMillis()));
+
+                // sanity checks: the data must be persisted before we try to delete it
+                assertNotNull(service.getJobStatus(instance.getInstanceId()));
+                assertNotNull(service.getJobInstanceIdByExecutionId(exec.getExecutionId()));
+                assertNotNull(service.jobOperatorGetJobExecution(exec.getExecutionId()));
+                assertNotNull(service.getStepExecutionByStepExecutionId(step.getStepExecutionId()));
+                assertNotNull(service.getCheckpointData(new CheckpointDataKey(instance.getInstanceId(), "step", CheckpointType.READER)));
+
+                if (i != 2) { // no pause needed after the last instance, nothing is created after it
+                    try { // space out the end times so a cut-off date can separate the executions
+                        Thread.sleep(1000);
+                    } catch (final InterruptedException e) {
+                        Thread.currentThread().interrupt(); // restore the flag; Thread.interrupted() would clear it
+                        fail();
+                    }
+                }
+            }
+
+            // now delete until the first one (+ delta)
+            final List<Long> instances = service.jobOperatorGetJobInstanceIds("test", 0, 10);
+            assertEquals(3, instances.size());
+
+            Collections.sort(instances); // we use increments everywhere so should be fine
+
+            final InternalJobExecution firstExec = service.jobOperatorGetJobExecution(service.getMostRecentExecutionId(instances.iterator().next()));
+            final Date until = new Date(firstExec.getEndTime().getTime() + 100); // just after the first execution ended
+            service.cleanUp(until);
+
+            assertEquals(2, service.jobOperatorGetJobInstanceIds("test", 0, 10).size()); // only the oldest instance is gone
+            assertTrue(service.getStepExecutionsForJobExecution(firstExec.getExecutionId()).isEmpty());
+            assertNull(service.getCheckpointData(new CheckpointDataKey(firstExec.getInstanceId(), "step", CheckpointType.READER)));
+            try {
+                service.getJobInstanceIdByExecutionId(firstExec.getExecutionId());
+                fail();
+            } catch (final NoSuchJobExecutionException nsje) {
+                // ok
+            }
+            assertFalse(service.jobOperatorGetJobInstanceIds("test", 0, 10).contains(firstExec.getInstanceId()));
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/02c32240/tools/cli/src/main/java/org/apache/batchee/cli/command/Eviction.java
----------------------------------------------------------------------
diff --git a/tools/cli/src/main/java/org/apache/batchee/cli/command/Eviction.java b/tools/cli/src/main/java/org/apache/batchee/cli/command/Eviction.java
new file mode 100644
index 0000000..6dc9602
--- /dev/null
+++ b/tools/cli/src/main/java/org/apache/batchee/cli/command/Eviction.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.batchee.cli.command;
+
+import io.airlift.airline.Command;
+import io.airlift.airline.Option;
+import org.apache.batchee.container.services.ServicesManager;
+import org.apache.batchee.spi.PersistenceManagerService;
+
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
+@Command(name = "evict", description = "remove old data, uses embedded configuration (no JAXRS support yet)")
+public class Eviction implements Runnable { // CLI command: parses -until and passes it to PersistenceManagerService.cleanUp(Date)
+    @Option(name = "-until", description = "date until when the eviction will occur (excluded), yyyyMMdd format", required = true)
+    private String date; // raw -until value, parsed in run()
+
+    @Override
+    public void run() {
+        try {
+            final Date date = new SimpleDateFormat("yyyyMMdd").parse(this.date); // 'yyyy' = calendar year; 'YYYY' is the week-based year and mis-parses
+            ServicesManager.find().service(PersistenceManagerService.class).cleanUp(date);
+        } catch (final ParseException e) { // malformed -until value
+            throw new IllegalArgumentException(e);
+        }
+    }
+}