You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@batchee.apache.org by rm...@apache.org on 2015/11/30 21:30:00 UTC

incubator-batchee git commit: BATCHEE-77 adding cleanUp(Date) and Eviction command to the cli to make it easier to remove old data

Repository: incubator-batchee
Updated Branches:
  refs/heads/master 99972657e -> 02c322402


BATCHEE-77 adding cleanUp(Date) and Eviction command to the cli to make it easier to remove old data


Project: http://git-wip-us.apache.org/repos/asf/incubator-batchee/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-batchee/commit/02c32240
Tree: http://git-wip-us.apache.org/repos/asf/incubator-batchee/tree/02c32240
Diff: http://git-wip-us.apache.org/repos/asf/incubator-batchee/diff/02c32240

Branch: refs/heads/master
Commit: 02c322402e7cf37e20463c5d81c8ed92476c8b22
Parents: 9997265
Author: Romain Manni-Bucau <rm...@gmail.com>
Authored: Mon Nov 30 21:30:28 2015 +0100
Committer: Romain Manni-Bucau <rm...@gmail.com>
Committed: Mon Nov 30 21:30:28 2015 +0100

----------------------------------------------------------------------
 .../JDBCPersistenceManagerService.java          |  47 ++++++-
 .../JPAPersistenceManagerService.java           |  36 +++++-
 .../MemoryPersistenceManagerService.java        |  35 +++++-
 .../services/persistence/jdbc/Dictionary.java   |  46 +++++--
 .../jpa/domain/CheckpointEntity.java            |   6 +-
 .../jpa/domain/JobExecutionEntity.java          |   2 +
 .../jpa/domain/JobInstanceEntity.java           |   6 +-
 .../jpa/domain/StepExecutionEntity.java         |   2 +
 .../batchee/spi/PersistenceManagerService.java  |   6 +-
 .../spi/PersistenceManagerServiceTest.java      | 121 +++++++++++++++++++
 .../apache/batchee/cli/command/Eviction.java    |  42 +++++++
 11 files changed, 329 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/02c32240/jbatch/src/main/java/org/apache/batchee/container/services/persistence/JDBCPersistenceManagerService.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/services/persistence/JDBCPersistenceManagerService.java b/jbatch/src/main/java/org/apache/batchee/container/services/persistence/JDBCPersistenceManagerService.java
index 765a60d..d09abef 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/services/persistence/JDBCPersistenceManagerService.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/services/persistence/JDBCPersistenceManagerService.java
@@ -67,6 +67,7 @@ import java.sql.SQLException;
 import java.sql.Statement;
 import java.sql.Timestamp;
 import java.util.ArrayList;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -1588,11 +1589,49 @@ public class JDBCPersistenceManagerService implements PersistenceManagerService
         }
     }
 
+    @Override
+    public void cleanUp(final Date until) {
+        Connection conn = null;
+        try {
+            conn = getConnection();
+            deleteUntil(until, conn, dictionary.getDeleteStepExecutionUntil());
+            deleteUntil(until, conn, dictionary.getDeleteCheckpointUntil());
+            deleteUntil(until, conn, dictionary.getDeleteJobInstanceUntil());
+            deleteUntil(until, conn, dictionary.getDeleteJobExecutionUntil());
+            if (!conn.getAutoCommit()) {
+                conn.commit();
+            }
+        } catch (final SQLException e) {
+            throw new PersistenceException(e);
+        } finally {
+            cleanupConnection(conn, null, null);
+        }
+    }
+
+    private static void deleteUntil(final Date until, final Connection conn, final String delete) throws SQLException {
+        PreparedStatement statement = null;
+        try {
+            statement = conn.prepareStatement(delete);
+            statement.setTimestamp(1, new Timestamp(until.getTime()));
+            statement.executeUpdate();
+        } finally {
+            if (statement != null) {
+                statement.close();
+            }
+        }
+    }
+
     private static void deleteFromInstanceId(final long instanceId, final Connection conn, final String delete) throws SQLException {
-        final PreparedStatement statement = conn.prepareStatement(delete);
-        statement.setLong(1, instanceId);
-        statement.executeUpdate();
-        statement.close();
+        PreparedStatement statement = null;
+        try {
+            statement = conn.prepareStatement(delete);
+            statement.setLong(1, instanceId);
+            statement.executeUpdate();
+        } finally {
+            if (statement != null) {
+                statement.close();
+            }
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/02c32240/jbatch/src/main/java/org/apache/batchee/container/services/persistence/JPAPersistenceManagerService.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/services/persistence/JPAPersistenceManagerService.java b/jbatch/src/main/java/org/apache/batchee/container/services/persistence/JPAPersistenceManagerService.java
index 6e2904e..a695327 100644
--- a/jbatch/src/main/java/org/apache/batchee/container/services/persistence/JPAPersistenceManagerService.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/services/persistence/JPAPersistenceManagerService.java
@@ -48,11 +48,13 @@ import javax.batch.runtime.Metric;
 import javax.batch.runtime.StepExecution;
 import javax.persistence.EntityManager;
 import javax.persistence.NoResultException;
+import javax.persistence.TemporalType;
 import javax.persistence.TypedQuery;
 import java.io.IOException;
 import java.sql.Timestamp;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -68,10 +70,14 @@ import static org.apache.batchee.container.util.Serializations.serialize;
 public class JPAPersistenceManagerService implements PersistenceManagerService {
     private final static Logger LOGGER = Logger.getLogger(JPAPersistenceManagerService.class.getName());
 
-    private static final String[] DELETE_QUERIES = {
+    private static final String[] DELETE_ID_QUERIES = {
         StepExecutionEntity.Queries.DELETE_BY_INSTANCE_ID, CheckpointEntity.Queries.DELETE_BY_INSTANCE_ID,
         JobExecutionEntity.Queries.DELETE_BY_INSTANCE_ID, JobInstanceEntity.Queries.DELETE_BY_INSTANCE_ID
     };
+    private static final String[] DELETE_DATE_QUERIES = {
+        StepExecutionEntity.Queries.DELETE_BY_DATE, CheckpointEntity.Queries.DELETE_BY_DATE,
+        JobInstanceEntity.Queries.DELETE_BY_DATE, JobExecutionEntity.Queries.DELETE_BY_DATE
+    };
 
     private EntityManagerProvider emProvider;
     private TransactionProvider txProvider;
@@ -82,8 +88,26 @@ public class JPAPersistenceManagerService implements PersistenceManagerService {
         try {
             final Object tx = txProvider.start(em);
             try {
-                for (final String query : DELETE_QUERIES) {
-                    em.createQuery(query).setParameter("instanceId", instanceId).executeUpdate();
+                for (final String query : DELETE_ID_QUERIES) {
+                    em.createNamedQuery(query).setParameter("instanceId", instanceId).executeUpdate();
+                }
+                txProvider.commit(tx);
+            } catch (final Exception e) {
+                throw new BatchContainerRuntimeException(performRollback(tx, e));
+            }
+        } finally {
+            emProvider.release(em);
+        }
+    }
+
+    @Override
+    public void cleanUp(final Date until) {
+        final EntityManager em = emProvider.newEntityManager();
+        try {
+            final Object tx = txProvider.start(em);
+            try {
+                for (final String query : DELETE_DATE_QUERIES) {
+                    em.createNamedQuery(query).setParameter("date", until, TemporalType.TIMESTAMP).executeUpdate();
                 }
                 txProvider.commit(tx);
             } catch (final Exception e) {
@@ -305,7 +329,11 @@ public class JPAPersistenceManagerService implements PersistenceManagerService {
     public long getJobInstanceIdByExecutionId(final long executionId) throws NoSuchJobExecutionException {
         final EntityManager em = emProvider.newEntityManager();
         try {
-            return em.find(JobExecutionEntity.class, executionId).getInstance().getJobInstanceId();
+            final JobExecutionEntity jobExecutionEntity = em.find(JobExecutionEntity.class, executionId);
+            if (jobExecutionEntity == null) {
+                throw new NoSuchJobExecutionException("Execution #" + executionId);
+            }
+            return jobExecutionEntity.getInstance().getJobInstanceId();
         } finally {
             emProvider.release(em);
         }

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/02c32240/jbatch/src/main/java/org/apache/batchee/container/services/persistence/MemoryPersistenceManagerService.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/services/persistence/MemoryPersistenceManagerService.java b/jbatch/src/main/java/org/apache/batchee/container/services/persistence/MemoryPersistenceManagerService.java
index d1f2274..a2165f8 100644
--- a/jbatch/src/main/java/org/apache/batchee/container/services/persistence/MemoryPersistenceManagerService.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/services/persistence/MemoryPersistenceManagerService.java
@@ -38,6 +38,7 @@ import javax.batch.runtime.Metric;
 import javax.batch.runtime.StepExecution;
 import java.io.Serializable;
 import java.sql.Timestamp;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
@@ -192,6 +193,10 @@ public class MemoryPersistenceManagerService implements PersistenceManagerServic
     @Override
     public List<StepExecution> getStepExecutionsForJobExecution(final long execid) {
         final Structures.ExecutionInstanceData executionInstanceData = data.executionInstanceData.get(execid);
+        if (executionInstanceData == null) {
+            return Collections.emptyList();
+        }
+
         synchronized (executionInstanceData.stepExecutions) {
             return executionInstanceData.stepExecutions;
         }
@@ -296,7 +301,11 @@ public class MemoryPersistenceManagerService implements PersistenceManagerServic
 
     @Override
     public long getJobInstanceIdByExecutionId(final long executionId) throws NoSuchJobExecutionException {
-        return data.executionInstanceData.get(executionId).execution.getInstanceId();
+        final Structures.ExecutionInstanceData executionInstanceData = data.executionInstanceData.get(executionId);
+        if (executionInstanceData == null) {
+            throw new NoSuchJobExecutionException("Execution #" + executionId);
+        }
+        return executionInstanceData.execution.getInstanceId();
     }
 
     @Override
@@ -408,6 +417,9 @@ public class MemoryPersistenceManagerService implements PersistenceManagerServic
         final StepExecutionImpl stepExecution = new StepExecutionImpl(rootJobExecId, data.stepExecutionIdGenerator.getAndIncrement());
         stepExecution.setStepName(stepName);
         final Structures.ExecutionInstanceData executionInstanceData = data.executionInstanceData.get(rootJobExecId);
+        if (executionInstanceData == null) {
+            return null;
+        }
         synchronized (executionInstanceData.stepExecutions) {
             executionInstanceData.stepExecutions.add(stepExecution);
         }
@@ -653,6 +665,27 @@ public class MemoryPersistenceManagerService implements PersistenceManagerServic
         }
     }
 
+    @Override
+    public void cleanUp(final Date until) {
+        final Collection<Long> instanceIdToRemove = new ArrayList<Long>();
+        for (final Map.Entry<Long, Structures.JobInstanceData> entry : data.jobInstanceData.entrySet()) {
+            boolean match = true;
+            for (final Structures.ExecutionInstanceData exec : entry.getValue().executions) {
+                if (exec.execution.getEndTime() == null || exec.execution.getEndTime().after(until)) {
+                    match = false;
+                    break;
+                }
+            }
+            if (match) {
+                instanceIdToRemove.add(entry.getKey());
+            }
+        }
+
+        for (final Long id : instanceIdToRemove) {
+            cleanUp(id);
+        }
+    }
+
     private static class ReverseDateComparator implements Comparator<Date> {
         public static final ReverseDateComparator INSTANCE = new ReverseDateComparator();
 

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/02c32240/jbatch/src/main/java/org/apache/batchee/container/services/persistence/jdbc/Dictionary.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/services/persistence/jdbc/Dictionary.java b/jbatch/src/main/java/org/apache/batchee/container/services/persistence/jdbc/Dictionary.java
index 258fe23..33b7608 100644
--- a/jbatch/src/main/java/org/apache/batchee/container/services/persistence/jdbc/Dictionary.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/services/persistence/jdbc/Dictionary.java
@@ -35,6 +35,7 @@ public class Dictionary {
         String SELECT_CHECKPOINT = SELECT + "%s" + FROM + "%s" + WHERE + "%s = ? and %s = ? and %s = ?";
         String UPDATE_CHECKPOINT = UPDATE + "%s set %s = ?" + WHERE + "%s = ? and %s = ? and %s = ?";
         String DELETE_CHECKPOINT = DELETE + "%s" + WHERE + "%s = ?";
+        String DELETE_CHECKPOINT_UNTIL = DELETE + "%s WHERE %s IN (" + SELECT + " DISTINCT t1.%s FROM %s t1 WHERE (SELECT MAX(t0.%s) FROM %s t0 WHERE t0.%s = t1.%s) < ?)";
 
         String[] JOB_INSTANCE_COLUMNS = { "jobInstanceId", "batchStatus", "exitStatus", "jobName", "jobXml", "latestExecution", "restartOn", "step", "tag" };
         String CREATE_JOB_INSTANCE = CREATE_TABLE + "%s(%s %s %s, %s %s, %s %s, %s %s, %s %s, %s %s, %s %s, %s %s, %s %s, PRIMARY KEY (%s))";
@@ -48,6 +49,8 @@ public class Dictionary {
         String JOB_INSTANCE_CREATE = INSERT_INTO + "%s" + "(%s, %s) VALUES(?, ?)";
         String JOB_INSTANCE_CREATE_WITH_JOB_XML = INSERT_INTO + "%s" + "(%s, %s, %s) VALUES(?, ?, ?)";
         String DELETE_JOB_INSTANCE = DELETE + "%s" + WHERE + "%s = ?";
+        String DELETE_JOB_INSTANCE_UNTIL = DELETE + "%s" + WHERE + "%s IN (" + SELECT + "distinct t1.%s" + FROM + "%s t1" + WHERE + "(" + SELECT + "max(t0.%s)" + FROM + "%s t0" +
+            WHERE + "t0.%s = t1.%s) < ?)";
 
         String[] JOB_EXECUTION_COLUMNS = { "executionId", "batchStatus", "createTime", "endTime", "exitStatus", "jobProperties",
                 "startTime", "updateTime", "INSTANCE_JOBINSTANCEID" };
@@ -71,6 +74,7 @@ public class Dictionary {
         String JOB_EXECUTION_CREATE = INSERT_INTO + "%s(%s, %s, %s, %s, %s) VALUES(?, ?, ?, ?, ?)";
         String JOB_EXECUTION_MOST_RECENT = SELECT + "%s" + FROM + "%s" + WHERE + "%s = ? ORDER BY %s DESC";
         String DELETE_JOB_EXECUTION = DELETE + "%s" + WHERE + "%s = ?";
+        String DELETE_JOB_EXECUTION_UNTIL = DELETE + "%s" + WHERE + "%s IN (" + SELECT + "distinct t0.%s" + FROM + "%s t0" + WHERE + "t0.%s < ?)";
 
         String[] STEP_EXECUTION_COLUMNS = { "id", "batchStatus", "exec_commit", "endTime", "exitStatus", "exec_filter", "lastRunStepExecutionId", "numPartitions",
                 "persistentData", "exec_processskip", "exec_read", "exec_readskip", "exec_rollback", "startCount", "startTime", "stepName",
@@ -90,6 +94,7 @@ public class Dictionary {
                 "B.%s, B.%s, B.%s, B.%s, B.%s, B.%s, B.%s, B.%s" + FROM + "%s A inner join %s B ON A.%s = B.%s " + WHERE + "A.%s = ? and B.%s = ?";
 
         String DELETE_STEP_EXECUTION = DELETE + "%s A inner join %s B ON A.%s = B.%s " + WHERE + "A.%s = ?";
+        String DELETE_STEP_EXECUTION_UNTIL = DELETE + "%s" + WHERE + "%s in (" + SELECT + "distinct t0.%s" + FROM + "%s t0 inner join %s t1 ON t0.%s=t1.%s" + WHERE + "t1.%s < ?)";
     }
 
     private final String checkpointTable;
@@ -98,6 +103,7 @@ public class Dictionary {
     private final String selectCheckpoint;
     private final String updateCheckpoint;
     private final String deleteCheckpoint;
+    private final String deleteCheckpointUntil;
     private final String jobInstanceTable;
     private final String createJobInstanceTable;
     private final String countJobInstanceByName;
@@ -110,6 +116,7 @@ public class Dictionary {
     private final String createJobInstance;
     private final String createJobInstanceWithJobXml;
     private final String deleteJobInstance;
+    private final String deleteJobInstanceUntil;
     private final String jobExecutionTable;
     private final String createJobExecutionTable;
     private final String findJobExecutionTimestamps;
@@ -127,6 +134,7 @@ public class Dictionary {
     private final String createJobExecution;
     private final String findMostRecentJobExecution;
     private final String deleteJobExecution;
+    private final String deleteJobExecutionUntil;
     private final String stepExecutionTable;
     private final String createStepExecutionTable;
     private final String finStepExecutionFromJobExecution;
@@ -136,6 +144,7 @@ public class Dictionary {
     private final String updateStepExecution;
     private final String findStepExecutionByJobInstanceAndStepName;
     private final String deleteStepExecution;
+    private final String deleteStepExecutionUntil;
 
     private final String[] checkpointColumns;
     private final String[] jobInstanceColumns;
@@ -149,9 +158,14 @@ public class Dictionary {
         this.jobExecutionTable = jobExecutionTable;
         this.stepExecutionTable = stepExecutionTable;
 
-        { // checkpoint
+        { // prebuild every table's column names first so the join queries below can reference them
             checkpointColumns = columns(database, SQL.CHECKPOINT_COLUMNS);
+            jobExecutionColumns = columns(database, SQL.JOB_EXECUTION_COLUMNS);
+            jobInstanceColumns = columns(database, SQL.JOB_INSTANCE_COLUMNS);
+            stepExecutionColumns = columns(database, SQL.STEP_EXECUTION_COLUMNS);
+        }
 
+        { // checkpoint
             this.createCheckpointTable = String.format(SQL.CREATE_CHECKPOINT, checkpointTable,
                                                         checkpointColumns[0], database.bigint(), database.autoIncrementId(),
                                                         checkpointColumns[1], database.blob(),
@@ -162,11 +176,11 @@ public class Dictionary {
             this.selectCheckpoint = String.format(SQL.SELECT_CHECKPOINT, checkpointColumns[1], checkpointTable, checkpointColumns[4], checkpointColumns[3], checkpointColumns[2]);
             this.updateCheckpoint = String.format(SQL.UPDATE_CHECKPOINT, checkpointTable, checkpointColumns[1], checkpointColumns[4], checkpointColumns[3], checkpointColumns[2]);
             this.deleteCheckpoint = String.format(SQL.DELETE_CHECKPOINT, checkpointTable, checkpointColumns[4]);
+            this.deleteCheckpointUntil = String.format(SQL.DELETE_CHECKPOINT_UNTIL, checkpointTable, checkpointColumns[0], checkpointColumns[0], checkpointTable,
+                jobExecutionColumns[3], jobExecutionTable, checkpointColumns[4], jobExecutionColumns[8]);
         }
 
         { // jobInstance
-            jobInstanceColumns = columns(database, SQL.JOB_INSTANCE_COLUMNS);
-
             this.createJobInstanceTable = String.format(SQL.CREATE_JOB_INSTANCE, jobInstanceTable,
                                                         jobInstanceColumns[0], database.bigint(), database.autoIncrementId(),
                                                         jobInstanceColumns[1], database.varchar20(),
@@ -191,11 +205,11 @@ public class Dictionary {
             this.createJobInstanceWithJobXml = String.format(SQL.JOB_INSTANCE_CREATE_WITH_JOB_XML, jobInstanceTable, jobInstanceColumns[3],
                     jobInstanceColumns[8], jobInstanceColumns[4]);
             this.deleteJobInstance = String.format(SQL.DELETE_JOB_INSTANCE, jobInstanceTable, jobInstanceColumns[0]);
+            this.deleteJobInstanceUntil = String.format(SQL.DELETE_JOB_INSTANCE_UNTIL, jobInstanceTable, jobInstanceColumns[0], jobInstanceColumns[0], jobInstanceTable,
+                jobExecutionColumns[3], jobExecutionTable, jobExecutionColumns[8], jobInstanceColumns[0]);
         }
 
         { // jobExecution
-            jobExecutionColumns = columns(database, SQL.JOB_EXECUTION_COLUMNS);
-
             this.createJobExecutionTable = String.format(SQL.CREATE_JOB_EXECUTION, jobExecutionTable,
                                                             jobExecutionColumns[0], database.bigint(), database.autoIncrementId(),
                                                             jobExecutionColumns[1], database.varchar20(),
@@ -232,11 +246,11 @@ public class Dictionary {
             this.findMostRecentJobExecution = String.format(SQL.JOB_EXECUTION_MOST_RECENT, jobExecutionColumns[0], jobExecutionTable,
                     jobExecutionColumns[8], jobExecutionColumns[2]);
             this.deleteJobExecution = String.format(SQL.DELETE_JOB_EXECUTION, jobExecutionTable, jobExecutionColumns[8]);
+            this.deleteJobExecutionUntil = String.format(SQL.DELETE_JOB_EXECUTION_UNTIL, jobExecutionTable, jobExecutionColumns[0], jobExecutionColumns[0],
+                jobExecutionTable, jobExecutionColumns[3]);
         }
 
         { // step execution
-            stepExecutionColumns = columns(database, SQL.STEP_EXECUTION_COLUMNS);
-
             this.createStepExecutionTable = String.format(SQL.CREATE_STEP_EXECUTION, stepExecutionTable,
                                                             stepExecutionColumns[0], database.bigint(), database.autoIncrementId(),
                                                             stepExecutionColumns[1], database.varchar20(),
@@ -278,6 +292,8 @@ public class Dictionary {
                 jobExecutionColumns[0], stepExecutionColumns[18], jobExecutionColumns[8], stepExecutionColumns[15]);
             this.deleteStepExecution = String.format(SQL.DELETE_STEP_EXECUTION, jobExecutionTable, stepExecutionTable, jobExecutionColumns[0],
                     stepExecutionColumns[18], jobExecutionColumns[8]);
+            this.deleteStepExecutionUntil = String.format(SQL.DELETE_STEP_EXECUTION_UNTIL, stepExecutionTable, stepExecutionColumns[0], stepExecutionColumns[0],
+                stepExecutionTable, jobExecutionTable, stepExecutionColumns[18], jobExecutionColumns[0], jobExecutionColumns[3]);
         }
     }
 
@@ -480,4 +496,20 @@ public class Dictionary {
     public String stepExecutionColumns(final int idx) {
         return stepExecutionColumns[idx];
     }
+
+    public String getDeleteCheckpointUntil() {
+        return deleteCheckpointUntil;
+    }
+
+    public String getDeleteJobInstanceUntil() {
+        return deleteJobInstanceUntil;
+    }
+
+    public String getDeleteJobExecutionUntil() {
+        return deleteJobExecutionUntil;
+    }
+
+    public String getDeleteStepExecutionUntil() {
+        return deleteStepExecutionUntil;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/02c32240/jbatch/src/main/java/org/apache/batchee/container/services/persistence/jpa/domain/CheckpointEntity.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/services/persistence/jpa/domain/CheckpointEntity.java b/jbatch/src/main/java/org/apache/batchee/container/services/persistence/jpa/domain/CheckpointEntity.java
index ccac415..772a103 100644
--- a/jbatch/src/main/java/org/apache/batchee/container/services/persistence/jpa/domain/CheckpointEntity.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/services/persistence/jpa/domain/CheckpointEntity.java
@@ -33,13 +33,17 @@ import javax.persistence.Table;
 @NamedQueries({
     @NamedQuery(name = CheckpointEntity.Queries.FIND,
                 query = "select c from CheckpointEntity c where c.instance.jobInstanceId = :jobInstanceId and c.stepName = :stepName and c.type = :type"),
-    @NamedQuery(name = CheckpointEntity.Queries.DELETE_BY_INSTANCE_ID, query = "delete from CheckpointEntity e where e.instance.jobInstanceId = :id")
+    @NamedQuery(name = CheckpointEntity.Queries.DELETE_BY_INSTANCE_ID, query = "delete from CheckpointEntity e where e.instance.jobInstanceId = :id"),
+    @NamedQuery(
+        name = CheckpointEntity.Queries.DELETE_BY_DATE,
+        query = "delete from CheckpointEntity e where (select max(x.endTime) from JobExecutionEntity x where x.instance.jobInstanceId = e.instance.jobInstanceId) < :date")
 })
 @Table(name=CheckpointEntity.TABLE_NAME)
 public class CheckpointEntity {
     public static interface Queries {
         String FIND = "org.apache.batchee.container.services.persistence.jpa.domain.CheckpointEntity.find";
         String DELETE_BY_INSTANCE_ID = "org.apache.batchee.container.services.persistence.jpa.domain.CheckpointEntity.deleteByInstanceId";
+        String DELETE_BY_DATE = "org.apache.batchee.container.services.persistence.jpa.domain.CheckpointEntity.deleteBydate";
     }
 
     public static final String TABLE_NAME = "BATCH_CHECKPOINT";

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/02c32240/jbatch/src/main/java/org/apache/batchee/container/services/persistence/jpa/domain/JobExecutionEntity.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/services/persistence/jpa/domain/JobExecutionEntity.java b/jbatch/src/main/java/org/apache/batchee/container/services/persistence/jpa/domain/JobExecutionEntity.java
index b46918d..66051fa 100644
--- a/jbatch/src/main/java/org/apache/batchee/container/services/persistence/jpa/domain/JobExecutionEntity.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/services/persistence/jpa/domain/JobExecutionEntity.java
@@ -40,6 +40,7 @@ import java.util.Properties;
                 query =  "SELECT e FROM JobExecutionEntity e WHERE e.instance.jobInstanceId = :instanceId ORDER BY e.createTime DESC"),
     @NamedQuery(name = JobExecutionEntity.Queries.FIND_BY_INSTANCE, query =  "SELECT e FROM JobExecutionEntity e WHERE e.instance.jobInstanceId = :instanceId"),
     @NamedQuery(name = JobExecutionEntity.Queries.DELETE_BY_INSTANCE_ID, query =  "delete from JobExecutionEntity e where e.instance.jobInstanceId = :instanceId"),
+    @NamedQuery(name = JobExecutionEntity.Queries.DELETE_BY_DATE, query =  "delete from JobExecutionEntity e where e.endTime < :date"),
     @NamedQuery(name = JobExecutionEntity.Queries.FIND_RUNNING, query =  "SELECT e FROM JobExecutionEntity e WHERE e.batchStatus in :statuses and e.instance.name = :name")
 })
 @Table(name=JobExecutionEntity.TABLE_NAME)
@@ -49,6 +50,7 @@ public class JobExecutionEntity {
         String FIND_BY_INSTANCE = "org.apache.batchee.container.services.persistence.jpa.domain.JobExecutionEntity.findByInstance";
         String FIND_RUNNING = "org.apache.batchee.container.services.persistence.jpa.domain.JobExecutionEntity.findRunning";
         String DELETE_BY_INSTANCE_ID = "org.apache.batchee.container.services.persistence.jpa.domain.JobExecutionEntity.deleteByInstanceId";
+        String DELETE_BY_DATE = "org.apache.batchee.container.services.persistence.jpa.domain.JobExecutionEntity.deleteByDate";
 
         List<BatchStatus> RUNNING_STATUSES = Arrays.asList(BatchStatus.STARTED, BatchStatus.STARTING, BatchStatus.STOPPING);
     }

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/02c32240/jbatch/src/main/java/org/apache/batchee/container/services/persistence/jpa/domain/JobInstanceEntity.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/services/persistence/jpa/domain/JobInstanceEntity.java b/jbatch/src/main/java/org/apache/batchee/container/services/persistence/jpa/domain/JobInstanceEntity.java
index 94ebe81..f8261c4 100644
--- a/jbatch/src/main/java/org/apache/batchee/container/services/persistence/jpa/domain/JobInstanceEntity.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/services/persistence/jpa/domain/JobInstanceEntity.java
@@ -41,7 +41,10 @@ import java.util.List;
     @NamedQuery(name = JobInstanceEntity.Queries.FIND_EXTERNALS, query = "select j from JobInstanceEntity j where j.name not like :pattern"),
     @NamedQuery(name = JobInstanceEntity.Queries.FIND_BY_NAME_AND_TAG, query = "select j from JobInstanceEntity j where j.name = :name and j.tag = tag"),
     @NamedQuery(name = JobInstanceEntity.Queries.FIND_BY_NAME, query = "select j from JobInstanceEntity j where j.name = :name"),
-    @NamedQuery(name = JobInstanceEntity.Queries.DELETE_BY_INSTANCE_ID, query = "delete from JobInstanceEntity e where e.jobInstanceId = :instanceId")
+    @NamedQuery(name = JobInstanceEntity.Queries.DELETE_BY_INSTANCE_ID, query = "delete from JobInstanceEntity e where e.jobInstanceId = :instanceId"),
+    @NamedQuery(
+        name = JobInstanceEntity.Queries.DELETE_BY_DATE,
+        query = "delete from JobInstanceEntity e where (select max(x.endTime) from JobExecutionEntity x where x.instance.jobInstanceId = e.jobInstanceId) < :date")
 })
 @Table(name=JobInstanceEntity.TABLE_NAME)
 public class JobInstanceEntity {
@@ -53,6 +56,7 @@ public class JobInstanceEntity {
         String FIND_EXTERNALS = "org.apache.batchee.container.services.persistence.jpa.domain.JobInstanceEntity.findExternals";
         String FIND_FROM_EXECUTION = "org.apache.batchee.container.services.persistence.jpa.domain.JobInstanceEntity.findByExecution";
         String DELETE_BY_INSTANCE_ID = "org.apache.batchee.container.services.persistence.jpa.domain.JobInstanceEntity.deleteFromInstanceId";
+        String DELETE_BY_DATE = "org.apache.batchee.container.services.persistence.jpa.domain.JobInstanceEntity.deleteByDate";
     }
 
     public static final String TABLE_NAME = "BATCH_JOBINSTANCE";

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/02c32240/jbatch/src/main/java/org/apache/batchee/container/services/persistence/jpa/domain/StepExecutionEntity.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/services/persistence/jpa/domain/StepExecutionEntity.java b/jbatch/src/main/java/org/apache/batchee/container/services/persistence/jpa/domain/StepExecutionEntity.java
index 0c85674..f2706cf 100644
--- a/jbatch/src/main/java/org/apache/batchee/container/services/persistence/jpa/domain/StepExecutionEntity.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/services/persistence/jpa/domain/StepExecutionEntity.java
@@ -36,6 +36,7 @@ import java.sql.Timestamp;
 @NamedQueries({
     @NamedQuery(name = StepExecutionEntity.Queries.FIND_BY_EXECUTION, query = "select s from StepExecutionEntity s where s.execution.executionId = :executionId"),
     @NamedQuery(name = StepExecutionEntity.Queries.DELETE_BY_INSTANCE_ID, query = "delete from StepExecutionEntity e where e.execution.instance.jobInstanceId = :instanceId"),
+    @NamedQuery(name = StepExecutionEntity.Queries.DELETE_BY_DATE, query = "delete from StepExecutionEntity e where e.execution.endTime < :date"),
     @NamedQuery(name = StepExecutionEntity.Queries.FIND_BY_INSTANCE_AND_NAME,
                 query = "select se FROM StepExecutionEntity se where se.execution.instance.jobInstanceId = :instanceId and se.stepName = :step")
 })
@@ -45,6 +46,7 @@ public class StepExecutionEntity {
         String FIND_BY_EXECUTION = "org.apache.batchee.container.services.persistence.jpa.domain.StepExecutionEntity.findByExecution";
         String FIND_BY_INSTANCE_AND_NAME = "org.apache.batchee.container.services.persistence.jpa.domain.StepExecutionEntity.findByInstanceAndName";
         String DELETE_BY_INSTANCE_ID = "org.apache.batchee.container.services.persistence.jpa.domain.StepExecutionEntity.deleteByInstanceId";
+        String DELETE_BY_DATE = "org.apache.batchee.container.services.persistence.jpa.domain.StepExecutionEntity.deleteByDate";
     }
 
     public static final String TABLE_NAME = "BATCH_STEPEXECUTION";

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/02c32240/jbatch/src/main/java/org/apache/batchee/spi/PersistenceManagerService.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/spi/PersistenceManagerService.java b/jbatch/src/main/java/org/apache/batchee/spi/PersistenceManagerService.java
index 243474e..a50e10a 100755
--- a/jbatch/src/main/java/org/apache/batchee/spi/PersistenceManagerService.java
+++ b/jbatch/src/main/java/org/apache/batchee/spi/PersistenceManagerService.java
@@ -18,10 +18,10 @@ package org.apache.batchee.spi;
 
 import org.apache.batchee.container.impl.StepContextImpl;
 import org.apache.batchee.container.impl.StepExecutionImpl;
-import org.apache.batchee.container.impl.jobinstance.RuntimeFlowInSplitExecution;
-import org.apache.batchee.container.impl.jobinstance.RuntimeJobExecution;
 import org.apache.batchee.container.impl.controller.chunk.CheckpointData;
 import org.apache.batchee.container.impl.controller.chunk.CheckpointDataKey;
+import org.apache.batchee.container.impl.jobinstance.RuntimeFlowInSplitExecution;
+import org.apache.batchee.container.impl.jobinstance.RuntimeJobExecution;
 import org.apache.batchee.container.services.InternalJobExecution;
 import org.apache.batchee.container.status.JobStatus;
 import org.apache.batchee.container.status.StepStatus;
@@ -31,6 +31,7 @@ import javax.batch.runtime.BatchStatus;
 import javax.batch.runtime.JobInstance;
 import javax.batch.runtime.StepExecution;
 import java.sql.Timestamp;
+import java.util.Date;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -193,4 +194,5 @@ public interface PersistenceManagerService extends BatchService {
     StepExecution getStepExecutionByStepExecutionId(long stepExecId);
 
     void cleanUp(final long instanceId);
+    void cleanUp(final Date until);
 }

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/02c32240/jbatch/src/test/java/org/apache/batchee/spi/PersistenceManagerServiceTest.java
----------------------------------------------------------------------
diff --git a/jbatch/src/test/java/org/apache/batchee/spi/PersistenceManagerServiceTest.java b/jbatch/src/test/java/org/apache/batchee/spi/PersistenceManagerServiceTest.java
new file mode 100644
index 0000000..a281fc7
--- /dev/null
+++ b/jbatch/src/test/java/org/apache/batchee/spi/PersistenceManagerServiceTest.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.batchee.spi;
+
+import org.apache.batchee.container.impl.StepContextImpl;
+import org.apache.batchee.container.impl.StepExecutionImpl;
+import org.apache.batchee.container.impl.controller.chunk.CheckpointData;
+import org.apache.batchee.container.impl.controller.chunk.CheckpointDataKey;
+import org.apache.batchee.container.impl.controller.chunk.CheckpointType;
+import org.apache.batchee.container.impl.jobinstance.RuntimeJobExecution;
+import org.apache.batchee.container.services.InternalJobExecution;
+import org.apache.batchee.container.services.persistence.JDBCPersistenceManagerService;
+import org.apache.batchee.container.services.persistence.JPAPersistenceManagerService;
+import org.apache.batchee.container.services.persistence.MemoryPersistenceManagerService;
+import org.apache.batchee.container.status.JobStatus;
+import org.junit.Test;
+
+import javax.batch.operations.NoSuchJobExecutionException;
+import javax.batch.runtime.BatchStatus;
+import javax.batch.runtime.JobInstance;
+import java.sql.Timestamp;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.Properties;
+
+import static java.util.Arrays.asList;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Exercises {@link PersistenceManagerService#cleanUp(Date)} against the JDBC, JPA and in-memory
+ * implementations, each initialized with empty {@link Properties}.
+ */
+public class PersistenceManagerServiceTest {
+    /**
+     * Persists three completed executions one second apart, evicts until slightly after the end time
+     * of the oldest one and asserts that only this oldest instance (with its step executions and
+     * checkpoint data) is gone.
+     */
+    @Test
+    public void cleanUpUntil() {
+        for (final PersistenceManagerService service : asList(
+            new JDBCPersistenceManagerService() {{
+                init(new Properties());
+            }},
+            new JPAPersistenceManagerService() {{
+                init(new Properties());
+            }},
+            new MemoryPersistenceManagerService() {{
+                init(new Properties());
+            }})) {
+            System.out.println("");
+            System.out.println(" " + service);
+            System.out.println("");
+
+            for (int i = 0; i < 3; i++) {
+                final JobInstance instance = service.createJobInstance("test", "app", "xml");
+                final RuntimeJobExecution exec = service.createJobExecution(instance, new Properties(), BatchStatus.COMPLETED);
+                final StepExecutionImpl step = service.createStepExecution(exec.getExecutionId(), new StepContextImpl("step"));
+                service.createStepStatus(step.getStepExecutionId()).setBatchStatus(BatchStatus.STARTED);
+                service.setCheckpointData(
+                    new CheckpointDataKey(instance.getInstanceId(), "step", CheckpointType.READER),
+                    new CheckpointData(instance.getInstanceId(), "step", CheckpointType.READER) {{
+                        setRestartToken("restart".getBytes());
+                    }});
+                service.createJobStatus(instance.getInstanceId());
+                service.updateJobStatus(instance.getInstanceId(), new JobStatus(instance) {{
+                    setBatchStatus(BatchStatus.COMPLETED);
+                }});
+                service.updateWithFinalExecutionStatusesAndTimestamps(exec.getExecutionId(), BatchStatus.COMPLETED, "ok", new Timestamp(System.currentTimeMillis()));
+
+                // sanity checks: the data must be readable before we try to delete it
+                assertNotNull(service.getJobStatus(instance.getInstanceId()));
+                assertNotNull(service.getJobInstanceIdByExecutionId(exec.getExecutionId()));
+                assertNotNull(service.jobOperatorGetJobExecution(exec.getExecutionId()));
+                assertNotNull(service.getStepExecutionByStepExecutionId(step.getStepExecutionId()));
+                assertNotNull(service.getCheckpointData(new CheckpointDataKey(instance.getInstanceId(), "step", CheckpointType.READER)));
+
+                if (i != 2) { // no need to wait after the last execution
+                    try { // space the end times so that "until" can isolate the first execution
+                        Thread.sleep(1000);
+                    } catch (final InterruptedException e) {
+                        // catching InterruptedException cleared the flag: set it again, Thread.interrupted() would only clear it
+                        Thread.currentThread().interrupt();
+                        fail();
+                    }
+                }
+            }
+
+            // now delete until the end of the first execution (+ a small delta)
+            final List<Long> instances = service.jobOperatorGetJobInstanceIds("test", 0, 10);
+            assertEquals(3, instances.size());
+
+            Collections.sort(instances); // ids are incremented everywhere so the smallest one is the oldest
+
+            final InternalJobExecution firstExec = service.jobOperatorGetJobExecution(service.getMostRecentExecutionId(instances.iterator().next()));
+            final Date until = new Date(firstExec.getEndTime().getTime() + 100);
+            service.cleanUp(until);
+
+            assertEquals(2, service.jobOperatorGetJobInstanceIds("test", 0, 10).size());
+            assertTrue(service.getStepExecutionsForJobExecution(firstExec.getExecutionId()).isEmpty());
+            assertNull(service.getCheckpointData(new CheckpointDataKey(firstExec.getInstanceId(), "step", CheckpointType.READER)));
+            try {
+                service.getJobInstanceIdByExecutionId(firstExec.getExecutionId());
+                fail();
+            } catch (final NoSuchJobExecutionException nsje) {
+                // expected: the execution has been evicted
+            }
+            assertFalse(service.jobOperatorGetJobInstanceIds("test", 0, 10).contains(firstExec.getInstanceId()));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/02c32240/tools/cli/src/main/java/org/apache/batchee/cli/command/Eviction.java
----------------------------------------------------------------------
diff --git a/tools/cli/src/main/java/org/apache/batchee/cli/command/Eviction.java b/tools/cli/src/main/java/org/apache/batchee/cli/command/Eviction.java
new file mode 100644
index 0000000..6dc9602
--- /dev/null
+++ b/tools/cli/src/main/java/org/apache/batchee/cli/command/Eviction.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.batchee.cli.command;
+
+import io.airlift.airline.Command;
+import io.airlift.airline.Option;
+import org.apache.batchee.container.services.ServicesManager;
+import org.apache.batchee.spi.PersistenceManagerService;
+
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
+/**
+ * CLI command ({@code evict}) removing old persisted data: it parses the {@code -until} option
+ * and hands the resulting date to {@link PersistenceManagerService#cleanUp(Date)} of the
+ * embedded (local) container configuration.
+ */
+@Command(name = "evict", description = "remove old data, uses embedded configuration (no JAXRS support yet)")
+public class Eviction implements Runnable {
+    // 'y' is the calendar year; 'Y' is the week year and lets SimpleDateFormat resolve a wrong day when parsing
+    private static final String DATE_PATTERN = "yyyyMMdd";
+
+    @Option(name = "-until", description = "date until when the eviction will occur (excluded), yyyyMMdd format", required = true)
+    private String date;
+
+    /**
+     * Parses the {@code -until} value and asks the persistence service to clean up until that date.
+     *
+     * @throws IllegalArgumentException if the value is not a valid {@code yyyyMMdd} date
+     */
+    @Override
+    public void run() {
+        final Date until;
+        try {
+            final SimpleDateFormat format = new SimpleDateFormat(DATE_PATTERN);
+            format.setLenient(false); // reject impossible dates (e.g. 20151345) instead of rolling them over
+            until = format.parse(date);
+        } catch (final ParseException e) {
+            throw new IllegalArgumentException("Invalid -until value '" + date + "', expected " + DATE_PATTERN, e);
+        }
+        ServicesManager.find().service(PersistenceManagerService.class).cleanUp(until);
+    }
+}