You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by an...@apache.org on 2019/01/10 15:37:30 UTC

[oozie] branch master updated: OOZIE-3407 [tests] Cleanup TestPurgeXCommand (asalamon74 via andras.piros)

This is an automated email from the ASF dual-hosted git repository.

andras pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/oozie.git


The following commit(s) were added to refs/heads/master by this push:
     new 0a6799f  OOZIE-3407 [tests] Cleanup TestPurgeXCommand (asalamon74 via andras.piros)
0a6799f is described below

commit 0a6799fe7117ea9df5abca9318c2833c941db7c8
Author: Andras Piros <an...@cloudera.com>
AuthorDate: Thu Jan 10 16:36:30 2019 +0100

    OOZIE-3407 [tests] Cleanup TestPurgeXCommand (asalamon74 via andras.piros)
---
 .../apache/oozie/command/TestPurgeXCommand.java    | 4285 ++++----------------
 release-log.txt                                    |    1 +
 2 files changed, 768 insertions(+), 3518 deletions(-)

diff --git a/core/src/test/java/org/apache/oozie/command/TestPurgeXCommand.java b/core/src/test/java/org/apache/oozie/command/TestPurgeXCommand.java
index 107547d..0f0b859 100644
--- a/core/src/test/java/org/apache/oozie/command/TestPurgeXCommand.java
+++ b/core/src/test/java/org/apache/oozie/command/TestPurgeXCommand.java
@@ -18,8 +18,6 @@
 
 package org.apache.oozie.command;
 
-import java.util.Date;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.oozie.BundleActionBean;
@@ -35,6 +33,7 @@ import org.apache.oozie.client.Job;
 import org.apache.oozie.client.OozieClient;
 import org.apache.oozie.client.WorkflowAction;
 import org.apache.oozie.client.WorkflowJob;
+import org.apache.oozie.client.rest.JsonBean;
 import org.apache.oozie.executor.jpa.BundleActionGetJPAExecutor;
 import org.apache.oozie.executor.jpa.BundleJobGetJPAExecutor;
 import org.apache.oozie.executor.jpa.BundleJobInsertJPAExecutor;
@@ -43,7 +42,6 @@ import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordJobInsertJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
 import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
-import org.apache.oozie.executor.jpa.JPAExecutor;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
 import org.apache.oozie.executor.jpa.QueryExecutor;
 import org.apache.oozie.executor.jpa.WorkflowActionGetJPAExecutor;
@@ -55,9 +53,9 @@ import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.LiteWorkflowStoreService;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.service.UUIDService;
+import org.apache.oozie.service.UUIDService.ApplicationType;
 import org.apache.oozie.service.WorkflowAppService;
 import org.apache.oozie.service.WorkflowStoreService;
-import org.apache.oozie.service.UUIDService.ApplicationType;
 import org.apache.oozie.test.XDataTestCase;
 import org.apache.oozie.util.DateUtils;
 import org.apache.oozie.util.XmlUtils;
@@ -69,12 +67,16 @@ import org.apache.oozie.workflow.lite.LiteWorkflowApp;
 import org.apache.oozie.workflow.lite.LiteWorkflowInstance;
 import org.apache.oozie.workflow.lite.StartNodeDef;
 
+import java.util.Date;
+
 public class TestPurgeXCommand extends XDataTestCase {
-    private Services services;
     private JPAService jpaService;
+    private Services services;
     private String[] excludedServices = { "org.apache.oozie.service.StatusTransitService",
-            "org.apache.oozie.service.PauseTransitService", "org.apache.oozie.service.PurgeService",
-            "org.apache.oozie.service.CoordMaterializeTriggerService", "org.apache.oozie.service.RecoveryService" };
+                        "org.apache.oozie.service.PauseTransitService", "org.apache.oozie.service.PurgeService",
+                        "org.apache.oozie.service.CoordMaterializeTriggerService", "org.apache.oozie.service.RecoveryService" };
+    private static final int TEST_CHILD_NUM = 5;
+    private static final int LIMIT_3_ITEMS = 3;
 
     @Override
     protected void setUp() throws Exception {
@@ -98,589 +100,243 @@ public class TestPurgeXCommand extends XDataTestCase {
      * same job to make it qualify for the purge criteria. Calls the purge
      * command, and ensure the job does not exist in the system.
      *
-     * @throws Exception
+     * @throws Exception if cannot insert records to the database
      */
-    public void testSucJobPurgeXCommand() throws Exception {
-        WorkflowJobBean job = this.addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
-        WorkflowActionBean action = this.addRecordToWfActionTable(job.getId(), "1", WorkflowAction.Status.OK);
-
-        JPAService jpaService = Services.get().get(JPAService.class);
-        assertNotNull(jpaService);
-        WorkflowJobGetJPAExecutor wfJobGetCmd = new WorkflowJobGetJPAExecutor(job.getId());
-        WorkflowActionGetJPAExecutor wfActionGetCmd = new WorkflowActionGetJPAExecutor(action.getId());
-
-        job = jpaService.execute(wfJobGetCmd);
-        action = jpaService.execute(wfActionGetCmd);
-        assertEquals(job.getStatus(), WorkflowJob.Status.SUCCEEDED);
-        assertEquals(action.getStatus(), WorkflowAction.Status.OK);
-        WorkflowInstance wfInstance = job.getWorkflowInstance();
-        assertEquals(wfInstance.getStatus(), WorkflowInstance.Status.SUCCEEDED);
-
-        new PurgeXCommand(7, 1, 1, 10).call();
-
-        try {
-            jpaService.execute(wfJobGetCmd);
-            fail("Workflow Job should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0604, je.getErrorCode());
-        }
+    public void testSucceededWorkflow() throws Exception {
+        WorkflowJobBean job = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
+        WorkflowActionBean action = addRecordToWfActionTable(job.getId(), "1", WorkflowAction.Status.OK);
 
-        try {
-            jpaService.execute(wfActionGetCmd);
-            fail("Workflow Action should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0605, je.getErrorCode());
-        }
+        purgeWithDefaultParameters();
 
+        assertWorkflowJobPurged(job);
+        assertWorkflowActionPurged(action);
     }
 
     /**
      * Test : purge failed wf job and action successfully
      *
-     * @throws Exception
+     * @throws Exception if cannot insert records to the database
      */
-    public void testFailJobPurgeXCommand() throws Exception {
-        WorkflowJobBean job = this.addRecordToWfJobTable(WorkflowJob.Status.FAILED, WorkflowInstance.Status.FAILED);
-        WorkflowActionBean action = this.addRecordToWfActionTable(job.getId(), "1", WorkflowAction.Status.FAILED);
-
-        JPAService jpaService = Services.get().get(JPAService.class);
-        assertNotNull(jpaService);
-        WorkflowJobGetJPAExecutor wfJobGetCmd = new WorkflowJobGetJPAExecutor(job.getId());
-        WorkflowActionGetJPAExecutor wfActionGetCmd = new WorkflowActionGetJPAExecutor(action.getId());
-
-        job = jpaService.execute(wfJobGetCmd);
-        action = jpaService.execute(wfActionGetCmd);
-        assertEquals(job.getStatus(), WorkflowJob.Status.FAILED);
-        assertEquals(action.getStatus(), WorkflowAction.Status.FAILED);
-        WorkflowInstance wfInstance = job.getWorkflowInstance();
-        assertEquals(wfInstance.getStatus(), WorkflowInstance.Status.FAILED);
-
-        new PurgeXCommand(7, 1, 1, 10).call();
-
-        try {
-            jpaService.execute(wfJobGetCmd);
-            fail("Workflow Job should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0604, je.getErrorCode());
-        }
+    public void testFailedWorkflow() throws Exception {
+        WorkflowJobBean job = addRecordToWfJobTable(WorkflowJob.Status.FAILED, WorkflowInstance.Status.FAILED);
+        WorkflowActionBean action = addRecordToWfActionTable(job.getId(), "1", WorkflowAction.Status.FAILED);
 
-        try {
-            jpaService.execute(wfActionGetCmd);
-            fail("Workflow Action should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0605, je.getErrorCode());
-        }
+        purgeWithDefaultParameters();
 
+        assertWorkflowJobPurged(job);
+        assertWorkflowActionPurged(action);
     }
 
     /**
     * Test : purge killed wf job and action successfully
     *
-    * @throws Exception
+    * @throws Exception if cannot insert records to the database
     */
-    public void testKillJobPurgeXCommand() throws Exception {
-        WorkflowJobBean job = this.addRecordToWfJobTable(WorkflowJob.Status.KILLED, WorkflowInstance.Status.KILLED);
-        WorkflowActionBean action = this.addRecordToWfActionTable(job.getId(), "1", WorkflowAction.Status.KILLED);
-
-        JPAService jpaService = Services.get().get(JPAService.class);
-        assertNotNull(jpaService);
-        WorkflowJobGetJPAExecutor wfJobGetCmd = new WorkflowJobGetJPAExecutor(job.getId());
-        WorkflowActionGetJPAExecutor wfActionGetCmd = new WorkflowActionGetJPAExecutor(action.getId());
-
-        job = jpaService.execute(wfJobGetCmd);
-        action = jpaService.execute(wfActionGetCmd);
-        assertEquals(job.getStatus(), WorkflowJob.Status.KILLED);
-        assertEquals(action.getStatus(), WorkflowAction.Status.KILLED);
-        WorkflowInstance wfInstance = job.getWorkflowInstance();
-        assertEquals(wfInstance.getStatus(), WorkflowInstance.Status.KILLED);
-
-        new PurgeXCommand(7, 1, 1, 10).call();
-
-        try {
-            jpaService.execute(wfJobGetCmd);
-            fail("Workflow Job should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0604, je.getErrorCode());
-        }
+    public void testKilledWorkflow() throws Exception {
+        WorkflowJobBean job = addRecordToWfJobTable(WorkflowJob.Status.KILLED, WorkflowInstance.Status.KILLED);
+        WorkflowActionBean action = addRecordToWfActionTable(job.getId(), "1", WorkflowAction.Status.KILLED);
 
-        try {
-            jpaService.execute(wfActionGetCmd);
-            fail("Workflow Action should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0605, je.getErrorCode());
-        }
+        purgeWithDefaultParameters();
 
+        assertWorkflowJobPurged(job);
+        assertWorkflowActionPurged(action);
     }
 
     /**
-     * Test : purge wf job and action failed
+     * Test : purge wf job and action still running
      *
-     * @throws Exception
+     * @throws Exception if cannot insert records to the database
      */
-    public void testPurgeXCommandFailed() throws Exception {
-        WorkflowJobBean job = this.addRecordToWfJobTableForNegCase(WorkflowJob.Status.RUNNING, WorkflowInstance.Status.RUNNING);
-        WorkflowActionBean action = this.addRecordToWfActionTable(job.getId(), "1", WorkflowAction.Status.RUNNING);
-
-        JPAService jpaService = Services.get().get(JPAService.class);
-        assertNotNull(jpaService);
-        WorkflowJobGetJPAExecutor wfJobGetCmd = new WorkflowJobGetJPAExecutor(job.getId());
-        WorkflowActionGetJPAExecutor wfActionGetCmd = new WorkflowActionGetJPAExecutor(action.getId());
-
-        job = jpaService.execute(wfJobGetCmd);
-        action = jpaService.execute(wfActionGetCmd);
-        assertEquals(job.getStatus(), WorkflowJob.Status.RUNNING);
-        assertEquals(action.getStatus(), WorkflowAction.Status.RUNNING);
-        WorkflowInstance wfInstance = job.getWorkflowInstance();
-        assertEquals(wfInstance.getStatus(), WorkflowInstance.Status.RUNNING);
+    public void testRunningWorkflow() throws Exception {
+        WorkflowJobBean job = addRecordToWfJobTableForNegCase(WorkflowJob.Status.RUNNING, WorkflowInstance.Status.RUNNING);
+        WorkflowActionBean action = addRecordToWfActionTable(job.getId(), "1", WorkflowAction.Status.RUNNING);
 
-        new PurgeXCommand(7, 1, 1, 10).call();
-
-        try {
-            jpaService.execute(wfJobGetCmd);
-        }
-        catch (JPAExecutorException ce) {
-            ce.printStackTrace();
-            fail("Workflow Job should not have been purged");
-        }
-
-        try {
-            jpaService.execute(wfActionGetCmd);
-        }
-        catch (JPAExecutorException ce) {
-            fail("Workflow Action should have been purged");
-        }
+        purgeWithDefaultParameters();
 
+        assertWorkflowJobNotPurged(job);
+        assertWorkflowActionNotPurged(action);
     }
 
     /**
      * Test : purge succeeded coord job and action successfully
      *
-     * @throws Exception
+     * @throws Exception if cannot insert records to the database
      */
-    public void testSucCoordPurgeXCommand() throws Exception {
+    public void testSucceededCoordinator() throws Exception {
         CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
         CoordinatorActionBean action = addRecordToCoordActionTable(job.getId(), 1, CoordinatorAction.Status.SUCCEEDED,
                 "coord-action-get.xml", 0);
 
-        JPAService jpaService = Services.get().get(JPAService.class);
-        assertNotNull(jpaService);
-        CoordJobGetJPAExecutor coordJobGetExecutor = new CoordJobGetJPAExecutor(job.getId());
-        CoordActionGetJPAExecutor coordActionGetExecutor = new CoordActionGetJPAExecutor(action.getId());
-
-        job = jpaService.execute(coordJobGetExecutor);
-        action = jpaService.execute(coordActionGetExecutor);
-        assertEquals(job.getStatus(), CoordinatorJob.Status.SUCCEEDED);
-        assertEquals(action.getStatus(), CoordinatorAction.Status.SUCCEEDED);
-
-        new PurgeXCommand(1, 7, 1, 10).call();
-
-        try {
-            jpaService.execute(coordJobGetExecutor);
-            fail("Coordinator Job should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0604, je.getErrorCode());
-        }
-
-        try {
-            jpaService.execute(coordActionGetExecutor);
-            fail("Coordinator Action should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0605, je.getErrorCode());
-        }
+        purgeWithDefaultParameters();
 
+        assertCoordinatorJobPurged(job);
+        assertCoordinatorActionPurged(action);
     }
 
     /**
      * Test : purge failed coord job and action successfully
      *
-     * @throws Exception
+     * @throws Exception if cannot insert records to the database
      */
-    public void testFailCoordPurgeXCommand() throws Exception {
+    public void testFailedCoordinator() throws Exception {
         CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.FAILED, false, false);
         CoordinatorActionBean action = addRecordToCoordActionTable(job.getId(), 1, CoordinatorAction.Status.FAILED,
                 "coord-action-get.xml", 0);
 
-        JPAService jpaService = Services.get().get(JPAService.class);
-        assertNotNull(jpaService);
-        CoordJobGetJPAExecutor coordJobGetExecutor = new CoordJobGetJPAExecutor(job.getId());
-        CoordActionGetJPAExecutor coordActionGetExecutor = new CoordActionGetJPAExecutor(action.getId());
-
-        job = jpaService.execute(coordJobGetExecutor);
-        action = jpaService.execute(coordActionGetExecutor);
-        assertEquals(job.getStatus(), CoordinatorJob.Status.FAILED);
-        assertEquals(action.getStatus(), CoordinatorAction.Status.FAILED);
-
-        new PurgeXCommand(1, 7, 1, 10).call();
-
-        try {
-            jpaService.execute(coordJobGetExecutor);
-            fail("Coordinator Job should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0604, je.getErrorCode());
-        }
-
-        try {
-            jpaService.execute(coordActionGetExecutor);
-            fail("Coordinator Action should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0605, je.getErrorCode());
-        }
+        purgeWithDefaultParameters();
 
+        assertCoordinatorJobPurged(job);
+        assertCoordinatorActionPurged(action);
     }
 
     /**
      * Test : purge killed coord job and action successfully
      *
-     * @throws Exception
+     * @throws Exception if cannot insert records to the database
      */
-    public void testKillCoordPurgeXCommand() throws Exception {
+    public void testKilledCoordinator() throws Exception {
         CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.KILLED, false, false);
         CoordinatorActionBean action = addRecordToCoordActionTable(job.getId(), 1, CoordinatorAction.Status.KILLED,
                 "coord-action-get.xml", 0);
 
-        JPAService jpaService = Services.get().get(JPAService.class);
-        assertNotNull(jpaService);
-        CoordJobGetJPAExecutor coordJobGetExecutor = new CoordJobGetJPAExecutor(job.getId());
-        CoordActionGetJPAExecutor coordActionGetExecutor = new CoordActionGetJPAExecutor(action.getId());
-
-        job = jpaService.execute(coordJobGetExecutor);
-        action = jpaService.execute(coordActionGetExecutor);
-        assertEquals(job.getStatus(), CoordinatorJob.Status.KILLED);
-        assertEquals(action.getStatus(), CoordinatorAction.Status.KILLED);
-
-        new PurgeXCommand(1, 7, 1, 10).call();
-
-        try {
-            jpaService.execute(coordJobGetExecutor);
-            fail("Coordinator Job should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0604, je.getErrorCode());
-        }
-
-        try {
-            jpaService.execute(coordActionGetExecutor);
-            fail("Coordinator Action should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0605, je.getErrorCode());
-        }
+        purgeWithDefaultParameters();
 
+        assertCoordinatorJobPurged(job);
+        assertCoordinatorActionPurged(action);
     }
 
     /**
-     * Test : purge coord job and action failed
+     * Test : purge coord job and action still running
      *
-     * @throws Exception
+     * @throws Exception if cannot insert records to the database
      */
-    public void testCoordPurgeXCommandFailed() throws Exception {
+    public void testRunningCoordinator() throws Exception {
         CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, false, false);
-        CoordinatorActionBean action = addRecordToCoordActionTable(job.getId(), 1, CoordinatorAction.Status.SUCCEEDED,
+        CoordinatorActionBean action = addRecordToCoordActionTable(job.getId(), 1, CoordinatorAction.Status.RUNNING,
                 "coord-action-get.xml", 0);
 
-        JPAService jpaService = Services.get().get(JPAService.class);
-        assertNotNull(jpaService);
-        CoordJobGetJPAExecutor coordJobGetExecutor = new CoordJobGetJPAExecutor(job.getId());
-        CoordActionGetJPAExecutor coordActionGetExecutor = new CoordActionGetJPAExecutor(action.getId());
-
-        job = jpaService.execute(coordJobGetExecutor);
-        action = jpaService.execute(coordActionGetExecutor);
-        assertEquals(job.getStatus(), CoordinatorJob.Status.RUNNING);
-        assertEquals(action.getStatus(), CoordinatorAction.Status.SUCCEEDED);
-
-        new PurgeXCommand(1, 7, 1, 10).call();
-
-        try {
-            jpaService.execute(coordJobGetExecutor);
-        }
-        catch (JPAExecutorException je) {
-            fail("Coordinator Job should not have been purged");
-        }
+        purgeWithDefaultParameters();
 
-        try {
-            jpaService.execute(coordActionGetExecutor);
-        }
-        catch (JPAExecutorException je) {
-            fail("Coordinator Action should not have been purged");
-        }
+        assertCoordinatorJobNotPurged(job);
+        assertCoordinatorActionNotPurged(action);
     }
 
     /**
      * Test : purge succeeded bundle job and action successfully
      *
-     * @throws Exception
+     * @throws Exception if cannot insert records to the database
      */
-    public void testSucBundlePurgeXCommand() throws Exception {
-        BundleJobBean job = this.addRecordToBundleJobTable(Job.Status.SUCCEEDED, DateUtils.parseDateOozieTZ(
+    public void testSucceededBundle() throws Exception {
+        BundleJobBean job = addRecordToBundleJobTable(Job.Status.SUCCEEDED, DateUtils.parseDateOozieTZ(
             "2011-01-01T01:00Z"));
-        this.addRecordToBundleActionTable(job.getId(), "action1", 0, Job.Status.SUCCEEDED);
-        this.addRecordToBundleActionTable(job.getId(), "action2", 0, Job.Status.SUCCEEDED);
-
-        JPAService jpaService = Services.get().get(JPAService.class);
-        assertNotNull(jpaService);
-        BundleJobGetJPAExecutor bundleJobGetExecutor = new BundleJobGetJPAExecutor(job.getId());
-        job = jpaService.execute(bundleJobGetExecutor);
-        assertEquals(Job.Status.SUCCEEDED, job.getStatus());
-
-        BundleActionGetJPAExecutor bundleActionGetExecutor1 = new BundleActionGetJPAExecutor(job.getId(), "action1");
-        BundleActionBean action1 = jpaService.execute(bundleActionGetExecutor1);
-        assertEquals(Job.Status.SUCCEEDED, action1.getStatus());
-
-        BundleActionGetJPAExecutor bundleActionGetExecutor2 = new BundleActionGetJPAExecutor(job.getId(), "action2");
-        BundleActionBean action2 = jpaService.execute(bundleActionGetExecutor2);
-        assertEquals(Job.Status.SUCCEEDED, action2.getStatus());
-
-        new PurgeXCommand(1, 1, 7, 10).call();
-
-        try {
-            jpaService.execute(bundleJobGetExecutor);
-            fail("Bundle Job should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0604, je.getErrorCode());
-        }
-
-        try {
-            jpaService.execute(bundleActionGetExecutor1);
-            fail("Bundle Action should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0605, je.getErrorCode());
-        }
+        CoordinatorJobBean coordJob1 = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
+        coordJob1.setAppName("action1");
+        CoordinatorJobBean coordJob2 = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
+        coordJob2.setAppName("action2");
+        BundleActionBean bundleAction1 = addRecordToBundleActionTable(job.getId(), coordJob1.getAppName(), 0,
+                Job.Status.SUCCEEDED);
+        BundleActionBean bundleAction2 = addRecordToBundleActionTable(job.getId(), coordJob2.getAppName(), 0,
+                Job.Status.SUCCEEDED);
 
-        try {
-            jpaService.execute(bundleActionGetExecutor2);
-            fail("Bundle Action should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0605, je.getErrorCode());
-        }
+        purgeWithDefaultParameters();
 
+        assertBundleJobPurged(job);
+        assertBundleActionPurged(bundleAction1);
+        assertBundleActionPurged(bundleAction2);
+        assertCoordinatorJobPurged(coordJob1);
+        assertCoordinatorJobPurged(coordJob2);
     }
 
     /**
      * Test : purge failed bundle job and action successfully
      *
-     * @throws Exception
+     * @throws Exception if cannot insert records to the database
      */
-    public void testFailBundlePurgeXCommand() throws Exception {
-        BundleJobBean job = this.addRecordToBundleJobTable(Job.Status.DONEWITHERROR, DateUtils.parseDateOozieTZ(
+    public void testFailedBundle() throws Exception {
+        BundleJobBean job = addRecordToBundleJobTable(Job.Status.DONEWITHERROR, DateUtils.parseDateOozieTZ(
             "2011-01-01T01:00Z"));
-        this.addRecordToBundleActionTable(job.getId(), "action1", 0, Job.Status.FAILED);
-        this.addRecordToBundleActionTable(job.getId(), "action2", 0, Job.Status.SUCCEEDED);
-
-        JPAService jpaService = Services.get().get(JPAService.class);
-        assertNotNull(jpaService);
-        BundleJobGetJPAExecutor bundleJobGetExecutor = new BundleJobGetJPAExecutor(job.getId());
-        job = jpaService.execute(bundleJobGetExecutor);
-        assertEquals(Job.Status.DONEWITHERROR, job.getStatus());
-
-        BundleActionGetJPAExecutor bundleActionGetExecutor1 = new BundleActionGetJPAExecutor(job.getId(), "action1");
-        BundleActionBean action1 = jpaService.execute(bundleActionGetExecutor1);
-        assertEquals(Job.Status.FAILED, action1.getStatus());
-
-        BundleActionGetJPAExecutor bundleActionGetExecutor2 = new BundleActionGetJPAExecutor(job.getId(), "action2");
-        BundleActionBean action2 = jpaService.execute(bundleActionGetExecutor2);
-        assertEquals(Job.Status.SUCCEEDED, action2.getStatus());
-
-        new PurgeXCommand(1, 1, 7, 10).call();
-
-        try {
-            jpaService.execute(bundleJobGetExecutor);
-            fail("Bundle Job should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0604, je.getErrorCode());
-        }
+        CoordinatorJobBean coordJob1 = addRecordToCoordJobTable(CoordinatorJob.Status.DONEWITHERROR, false, false);
+        coordJob1.setAppName("action1");
+        CoordinatorJobBean coordJob2 = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
+        coordJob2.setAppName("action2");
 
-        try {
-            jpaService.execute(bundleActionGetExecutor1);
-            fail("Bundle Action should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0605, je.getErrorCode());
-        }
+        BundleActionBean bundleAction1 = addRecordToBundleActionTable(job.getId(), coordJob1.getAppName(), 0, Job.Status.FAILED);
+        BundleActionBean bundleAction2 = addRecordToBundleActionTable(job.getId(), coordJob2.getAppName(), 0,
+                Job.Status.SUCCEEDED);
 
-        try {
-            jpaService.execute(bundleActionGetExecutor2);
-            fail("Bundle Action should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0605, je.getErrorCode());
-        }
+        purgeWithDefaultParameters();
 
+        assertBundleJobPurged(job);
+        assertBundleActionPurged(bundleAction1);
+        assertBundleActionPurged(bundleAction2);
+        assertCoordinatorJobPurged(coordJob1);
+        assertCoordinatorJobPurged(coordJob2);
     }
 
     /**
      * Test : purge killed bundle job and action successfully
      *
-     * @throws Exception
+     * @throws Exception if cannot insert records to the database
      */
-    public void testKillBundlePurgeXCommand() throws Exception {
-        BundleJobBean job = this.addRecordToBundleJobTable(Job.Status.KILLED, DateUtils.parseDateOozieTZ(
+    public void testKilledBundle() throws Exception {
+        BundleJobBean job = addRecordToBundleJobTable(Job.Status.KILLED, DateUtils.parseDateOozieTZ(
             "2011-01-01T01:00Z"));
-        this.addRecordToBundleActionTable(job.getId(), "action1", 0, Job.Status.KILLED);
-        this.addRecordToBundleActionTable(job.getId(), "action2", 0, Job.Status.KILLED);
-
-        JPAService jpaService = Services.get().get(JPAService.class);
-        assertNotNull(jpaService);
-        BundleJobGetJPAExecutor bundleJobGetExecutor = new BundleJobGetJPAExecutor(job.getId());
-        job = jpaService.execute(bundleJobGetExecutor);
-        assertEquals(Job.Status.KILLED, job.getStatus());
-
-        BundleActionGetJPAExecutor bundleActionGetExecutor1 = new BundleActionGetJPAExecutor(job.getId(), "action1");
-        BundleActionBean action1 = jpaService.execute(bundleActionGetExecutor1);
-        assertEquals(Job.Status.KILLED, action1.getStatus());
-
-        BundleActionGetJPAExecutor bundleActionGetExecutor2 = new BundleActionGetJPAExecutor(job.getId(), "action2");
-        BundleActionBean action2 = jpaService.execute(bundleActionGetExecutor2);
-        assertEquals(Job.Status.KILLED, action2.getStatus());
-
-        new PurgeXCommand(1, 1, 7, 10).call();
-
-        try {
-            jpaService.execute(bundleJobGetExecutor);
-            fail("Bundle Job should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0604, je.getErrorCode());
-        }
-
-        try {
-            jpaService.execute(bundleActionGetExecutor1);
-            fail("Bundle Action should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0605, je.getErrorCode());
-        }
-
-        try {
-            jpaService.execute(bundleActionGetExecutor2);
-            fail("Bundle Action should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0605, je.getErrorCode());
-        }
-
+        CoordinatorJobBean coordJob1 = addRecordToCoordJobTable(CoordinatorJob.Status.KILLED, false, false);
+        coordJob1.setAppName("action1");
+        CoordinatorJobBean coordJob2 = addRecordToCoordJobTable(CoordinatorJob.Status.KILLED, false, false);
+        coordJob2.setAppName("action2");
+        BundleActionBean bundleAction1 = addRecordToBundleActionTable(job.getId(), "action1", 0, Job.Status.KILLED);
+        BundleActionBean bundleAction2 = addRecordToBundleActionTable(job.getId(), "action2", 0, Job.Status.KILLED);
+
+        purgeWithDefaultParameters();
+
+        assertBundleJobPurged(job);
+        assertBundleActionPurged(bundleAction1);
+        assertBundleActionPurged(bundleAction2);
+        assertCoordinatorJobPurged(coordJob1);
+        assertCoordinatorJobPurged(coordJob2);
     }
 
     /**
-     * Test : purge bundle job and action failed
+     * Test : purge bundle job and action still running
      *
-     * @throws Exception
+     * @throws Exception if cannot insert records to the database
      */
-    public void testBundlePurgeXCommandFailed() throws Exception {
-        BundleJobBean job = this.addRecordToBundleJobTable(Job.Status.RUNNING, DateUtils.parseDateOozieTZ(
+    public void testRunningBundle() throws Exception {
+        BundleJobBean job = addRecordToBundleJobTable(Job.Status.RUNNING, DateUtils.parseDateOozieTZ(
             "2011-01-01T01:00Z"));
-        this.addRecordToBundleActionTable(job.getId(), "action1", 0, Job.Status.RUNNING);
-        this.addRecordToBundleActionTable(job.getId(), "action2", 0, Job.Status.SUCCEEDED);
-
-        JPAService jpaService = Services.get().get(JPAService.class);
-        assertNotNull(jpaService);
-        BundleJobGetJPAExecutor bundleJobGetExecutor = new BundleJobGetJPAExecutor(job.getId());
-        job = jpaService.execute(bundleJobGetExecutor);
-        assertEquals(Job.Status.RUNNING, job.getStatus());
-
-        BundleActionGetJPAExecutor bundleActionGetExecutor1 = new BundleActionGetJPAExecutor(job.getId(), "action1");
-        BundleActionBean action1 = jpaService.execute(bundleActionGetExecutor1);
-        assertEquals(Job.Status.RUNNING, action1.getStatus());
-
-        BundleActionGetJPAExecutor bundleActionGetExecutor2 = new BundleActionGetJPAExecutor(job.getId(), "action2");
-        BundleActionBean action2 = jpaService.execute(bundleActionGetExecutor2);
-        assertEquals(Job.Status.SUCCEEDED, action2.getStatus());
-
-        new PurgeXCommand(1, 1, 7, 10).call();
-
-        try {
-            jpaService.execute(bundleJobGetExecutor);
-        }
-        catch (JPAExecutorException je) {
-            fail("Bundle Job should not have been purged");
-        }
+        CoordinatorJobBean coordJob1 = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, false, false);
+        coordJob1.setAppName("action1");
+        CoordinatorJobBean coordJob2 = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
+        coordJob2.setAppName("action2");
+        BundleActionBean bundleActionBean1 = addRecordToBundleActionTable(job.getId(), "action1", 0, Job.Status.RUNNING);
+        BundleActionBean bundleActionBean2 = addRecordToBundleActionTable(job.getId(), "action2", 0, Job.Status.SUCCEEDED);
 
-        try {
-            jpaService.execute(bundleActionGetExecutor1);
-        }
-        catch (JPAExecutorException je) {
-            fail("Bundle Action should not have been purged");
-        }
+        purgeWithDefaultParameters();
 
-        try {
-            jpaService.execute(bundleActionGetExecutor2);
-        }
-        catch (JPAExecutorException je) {
-            fail("Bundle Action should not have been purged");
-        }
+        assertBundleJobNotPurged(job);
+        assertBundleActionNotPurged(bundleActionBean1);
+        assertBundleActionNotPurged(bundleActionBean2);
     }
 
     /**
      * Test : The workflow should get purged, but the coordinator parent shouldn't get purged --> neither will get purged
      *
-     * @throws Exception
+     * @throws Exception if cannot insert records to the database
      */
-    public void testPurgeCoordWithWFChild1() throws Exception {
-        JPAService jpaService = Services.get().get(JPAService.class);
-        assertNotNull(jpaService);
-
+    public void testUnpurgeableCoordinatorPurgeableWorkflow() throws Exception {
         CoordinatorJobBean coordJob = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
         WorkflowJobBean wfJob = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
         WorkflowActionBean wfAction = addRecordToWfActionTable(wfJob.getId(), "1", WorkflowAction.Status.OK);
         CoordinatorActionBean coordAction = addRecordToCoordActionTable(coordJob.getId(), 1, CoordinatorAction.Status.SUCCEEDED,
                 "coord-action-get.xml", wfJob.getId(), "SUCCEEDED", 0);
 
-        WorkflowJobGetJPAExecutor wfJobGetCmd = new WorkflowJobGetJPAExecutor(wfJob.getId());
-        WorkflowActionGetJPAExecutor wfActionGetCmd = new WorkflowActionGetJPAExecutor(wfAction.getId());
-        CoordJobGetJPAExecutor coordJobGetCmd = new CoordJobGetJPAExecutor(coordJob.getId());
-        CoordActionGetJPAExecutor coordActionGetCmd = new CoordActionGetJPAExecutor(coordAction.getId());
-
-        wfJob = jpaService.execute(wfJobGetCmd);
-        wfAction = jpaService.execute(wfActionGetCmd);
-        coordJob = jpaService.execute(coordJobGetCmd);
-        coordAction = jpaService.execute(coordActionGetCmd);
-        assertEquals(WorkflowJob.Status.SUCCEEDED, wfJob.getStatus());
-        assertEquals(WorkflowAction.Status.OK, wfAction.getStatus());
-        assertEquals(CoordinatorJob.Status.SUCCEEDED, coordJob.getStatus());
-        assertEquals(CoordinatorAction.Status.SUCCEEDED, coordAction.getStatus());
-
-        new PurgeXCommand(7, getNumDaysToNotBePurged(coordJob.getLastModifiedTime()), 1, 10).call();
-
-        try {
-            jpaService.execute(coordJobGetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("Coordinator Job should not have been purged");
-        }
-
-        try {
-            jpaService.execute(coordActionGetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("Coordinator Action should not have been purged");
-        }
-
-        try {
-            jpaService.execute(wfJobGetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("Workflow Job should not have been purged");
-        }
+        purgeWithSpecialParameters(coordJob);
 
-        try {
-            jpaService.execute(wfActionGetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("Workflow Action should not have been purged");
-        }
+        assertCoordinatorJobNotPurged(coordJob);
+        assertCoordinatorActionNotPurged(coordAction);
+        assertWorkflowJobNotPurged(wfJob);
+        assertWorkflowActionNotPurged(wfAction);
     }
 
     /**
@@ -688,2177 +344,393 @@ public class TestPurgeXCommand extends XDataTestCase {
      * the workflow and corresponding coord actions will get purged after we turn the purge.old.coord.action on
      * Coordinator itself will not be purged
      *
-     * @throws Exception
+     * @throws Exception if cannot insert records to the database
      */
-    public void testPurgeLongRunningCoordWithWFChild() throws Exception {
-        JPAService jpaService = Services.get().get(JPAService.class);
-        assertNotNull(jpaService);
-
+    public void testUnpurgeableCoordinatorPurgeableWorkflowWithPurgeOldCoordActionTurnedOn() throws Exception {
         CoordinatorJobBean coordJob = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, false, false);
         WorkflowJobBean wfJob = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
         WorkflowActionBean wfAction = addRecordToWfActionTable(wfJob.getId(), "1", WorkflowAction.Status.OK);
         CoordinatorActionBean coordAction = addRecordToCoordActionTable(coordJob.getId(), 1, CoordinatorAction.Status.SUCCEEDED,
                 "coord-action-get.xml", wfJob.getId(), "SUCCEEDED", 0);
 
-        WorkflowJobGetJPAExecutor wfJobGetCmd = new WorkflowJobGetJPAExecutor(wfJob.getId());
-        WorkflowActionGetJPAExecutor wfActionGetCmd = new WorkflowActionGetJPAExecutor(wfAction.getId());
-        CoordJobGetJPAExecutor coordJobGetCmd = new CoordJobGetJPAExecutor(coordJob.getId());
-        CoordActionGetJPAExecutor coordActionGetCmd = new CoordActionGetJPAExecutor(coordAction.getId());
+        purgeWithSpecialParametersAndAlsoPurgeCoordActions(coordJob);
 
-        wfJob = jpaService.execute(wfJobGetCmd);
-        wfAction = jpaService.execute(wfActionGetCmd);
-        coordJob = jpaService.execute(coordJobGetCmd);
-        coordAction = jpaService.execute(coordActionGetCmd);
-        assertEquals(WorkflowJob.Status.SUCCEEDED, wfJob.getStatus());
-        assertEquals(WorkflowAction.Status.OK, wfAction.getStatus());
-        assertEquals(CoordinatorJob.Status.RUNNING, coordJob.getStatus());
-        assertEquals(CoordinatorAction.Status.SUCCEEDED, coordAction.getStatus());
+        assertCoordinatorJobNotPurged(coordJob);
+        assertCoordinatorActionPurged(coordAction);
+        assertWorkflowJobPurged(wfJob);
+        assertWorkflowActionPurged(wfAction);
+    }
 
-        new PurgeXCommand(7, getNumDaysToNotBePurged(coordJob.getLastModifiedTime()), 1, 10, true).call();
+    /**
+     * Test : The workflow should get purged, but the coordinator parent shouldn't get purged -->
+     * the workflow and corresponding coord actions will NOT get purged after we turn the purge.old.coord.action off
+     * Neither will be purged
+     *
+     * @throws Exception if cannot insert records to the database
+     */
+    public void testUnpurgeableCoordinatorPurgeableWorkflowWithPurgeOldCoordActionTurnedOff() throws Exception {
+        CoordinatorJobBean coordJob = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, false, false);
+        WorkflowJobBean wfJob = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
+        WorkflowActionBean wfAction = addRecordToWfActionTable(wfJob.getId(), "1", WorkflowAction.Status.OK);
+        CoordinatorActionBean coordAction = addRecordToCoordActionTable(coordJob.getId(), 1, CoordinatorAction.Status.SUCCEEDED,
+                "coord-action-get.xml", wfJob.getId(), "SUCCEEDED", 0);
 
-        try {
-            jpaService.execute(coordJobGetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("Coordinator Job should not have been purged");
-        }
-
-        try {
-            jpaService.execute(coordActionGetCmd);
-            fail("Coordinator Action should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0605, je.getErrorCode());
-        }
+        purgeWithSpecialParameters(coordJob);
 
-        try {
-            jpaService.execute(wfJobGetCmd);
-            fail("Workflow Job should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0604, je.getErrorCode());
-        }
-
-        try {
-            jpaService.execute(wfActionGetCmd);
-            fail("Workflow Action should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0605, je.getErrorCode());
-        }
+        assertCoordinatorJobNotPurged(coordJob);
+        assertCoordinatorActionNotPurged(coordAction);
+        assertWorkflowJobNotPurged(wfJob);
+        assertWorkflowActionNotPurged(wfAction);
     }
 
     /**
-     * Test : The workflow should get purged, but the coordinator parent shouldn't get purged -->
-     * the workflow and corresponding coord actions will NOT get purged after we turn the purge.old.coord.action off
-     * Neither will be purged
+     * Test : The workflow should not get purged, but the coordinator parent should get purged --> neither will get purged
      *
-     * @throws Exception
+     * @throws Exception if cannot insert records to the database
      */
-    public void testPurgeLongRunningCoordWithWFChildNegative() throws Exception {
-        JPAService jpaService = Services.get().get(JPAService.class);
-        assertNotNull(jpaService);
-
-        CoordinatorJobBean coordJob = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, false, false);
+    public void testPurgeableCoordinatorUnpurgeableWorkflow() throws Exception {
+        CoordinatorJobBean coordJob = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
         WorkflowJobBean wfJob = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
         WorkflowActionBean wfAction = addRecordToWfActionTable(wfJob.getId(), "1", WorkflowAction.Status.OK);
         CoordinatorActionBean coordAction = addRecordToCoordActionTable(coordJob.getId(), 1, CoordinatorAction.Status.SUCCEEDED,
                 "coord-action-get.xml", wfJob.getId(), "SUCCEEDED", 0);
 
-        WorkflowJobGetJPAExecutor wfJobGetCmd = new WorkflowJobGetJPAExecutor(wfJob.getId());
-        WorkflowActionGetJPAExecutor wfActionGetCmd = new WorkflowActionGetJPAExecutor(wfAction.getId());
-        CoordJobGetJPAExecutor coordJobGetCmd = new CoordJobGetJPAExecutor(coordJob.getId());
-        CoordActionGetJPAExecutor coordActionGetCmd = new CoordActionGetJPAExecutor(coordAction.getId());
-
-        wfJob = jpaService.execute(wfJobGetCmd);
-        wfAction = jpaService.execute(wfActionGetCmd);
-        coordJob = jpaService.execute(coordJobGetCmd);
-        coordAction = jpaService.execute(coordActionGetCmd);
-        assertEquals(WorkflowJob.Status.SUCCEEDED, wfJob.getStatus());
-        assertEquals(WorkflowAction.Status.OK, wfAction.getStatus());
-        assertEquals(CoordinatorJob.Status.RUNNING, coordJob.getStatus());
-        assertEquals(CoordinatorAction.Status.SUCCEEDED, coordAction.getStatus());
-
-        new PurgeXCommand(7, getNumDaysToNotBePurged(coordJob.getLastModifiedTime()), 1, 10, false).call();
-
-        try {
-            jpaService.execute(coordJobGetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("Coordinator Job should not have been purged");
-        }
-
-        try {
-            jpaService.execute(coordActionGetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("Coordinator Action should not have been purged");
-        }
-
-        try {
-            jpaService.execute(wfJobGetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("Workflow Job should not have been purged");
-        }
+        purgeWithSpecialParameters(wfJob);
 
-        try {
-            jpaService.execute(wfActionGetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("Workflow Action should not have been purged");
-        }
+        assertCoordinatorJobNotPurged(coordJob);
+        assertCoordinatorActionNotPurged(coordAction);
+        assertWorkflowJobNotPurged(wfJob);
+        assertWorkflowActionNotPurged(wfAction);
     }
 
     /**
-     * Test : The workflow should not get purged, but the coordinator parent should get purged --> neither will get purged
+     * Test : The workflows should not get purged, but the coordinator parent should get purged --> none will get purged
+     * There are more workflow children than the limit
      *
-     * @throws Exception
+     * @throws Exception if cannot insert records to the database
      */
-    public void testPurgeCoordWithWFChild2() throws Exception {
-        JPAService jpaService = Services.get().get(JPAService.class);
-        assertNotNull(jpaService);
+    public void testPurgeableCoordinatorOverTheLimitUnpurgeableWorkflows() throws Exception {
+        CoordinatorJobBean coordJob = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
+        WorkflowJobBean[] wfJobs = new WorkflowJobBean[TEST_CHILD_NUM];
+        WorkflowActionBean[] wfActions = new WorkflowActionBean[TEST_CHILD_NUM];
+        CoordinatorActionBean[] coordActions = new CoordinatorActionBean[TEST_CHILD_NUM];
+        for (int i=0; i<TEST_CHILD_NUM; ++i) {
+            wfJobs[i] = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
+            wfActions[i] = addRecordToWfActionTable(wfJobs[i].getId(), "1", WorkflowAction.Status.OK);
+            coordActions[i] = addRecordToCoordActionTable(coordJob.getId(), i, CoordinatorAction.Status.SUCCEEDED,
+                    "coord-action-get.xml", wfJobs[i].getId(), "SUCCEEDED", 0);
+        }
+
+        purgeWithSpecialParameters(LIMIT_3_ITEMS, wfJobs[0]);
+
+        assertCoordinatorJobNotPurged(coordJob);
+        assertCoordinatorActionsNotPurged(coordActions);
+        assertWorkflowJobsNotPurged(wfJobs);
+        assertWorkflowActionsNotPurged(wfActions);
+    }
 
+    /**
+     * Test : The workflow should get purged, and the coordinator parent should get purged --> both will get purged
+     *
+     * @throws Exception if cannot insert records to the database
+     */
+    public void testPurgeableCoordinatorPurgeableWorkflow() throws Exception {
         CoordinatorJobBean coordJob = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
         WorkflowJobBean wfJob = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
         WorkflowActionBean wfAction = addRecordToWfActionTable(wfJob.getId(), "1", WorkflowAction.Status.OK);
         CoordinatorActionBean coordAction = addRecordToCoordActionTable(coordJob.getId(), 1, CoordinatorAction.Status.SUCCEEDED,
                 "coord-action-get.xml", wfJob.getId(), "SUCCEEDED", 0);
 
-        WorkflowJobGetJPAExecutor wfJobGetCmd = new WorkflowJobGetJPAExecutor(wfJob.getId());
-        WorkflowActionGetJPAExecutor wfActionGetCmd = new WorkflowActionGetJPAExecutor(wfAction.getId());
-        CoordJobGetJPAExecutor coordJobGetCmd = new CoordJobGetJPAExecutor(coordJob.getId());
-        CoordActionGetJPAExecutor coordActionGetCmd = new CoordActionGetJPAExecutor(coordAction.getId());
-
-        wfJob = jpaService.execute(wfJobGetCmd);
-        wfAction = jpaService.execute(wfActionGetCmd);
-        coordJob = jpaService.execute(coordJobGetCmd);
-        coordAction = jpaService.execute(coordActionGetCmd);
-        assertEquals(WorkflowJob.Status.SUCCEEDED, wfJob.getStatus());
-        assertEquals(WorkflowAction.Status.OK, wfAction.getStatus());
-        assertEquals(CoordinatorJob.Status.SUCCEEDED, coordJob.getStatus());
-        assertEquals(CoordinatorAction.Status.SUCCEEDED, coordAction.getStatus());
-
-        new PurgeXCommand(getNumDaysToNotBePurged(wfJob.getEndTime()), 7, 1, 10).call();
-
-        try {
-            jpaService.execute(coordJobGetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("Coordinator Job should not have been purged");
-        }
-
-        try {
-            jpaService.execute(coordActionGetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("Coordinator Action should not have been purged");
-        }
-
-        try {
-            jpaService.execute(wfJobGetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("Workflow Job should not have been purged");
-        }
+        purgeWithDefaultParameters();
 
-        try {
-            jpaService.execute(wfActionGetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("Workflow Action should not have been purged");
-        }
+        assertCoordinatorJobPurged(coordJob);
+        assertCoordinatorActionPurged(coordAction);
+        assertWorkflowJobPurged(wfJob);
+        assertWorkflowActionPurged(wfAction);
     }
 
     /**
-     * Test : The workflows should not get purged, but the coordinator parent should get purged --> none will get purged
-     * There are more workflow children than the limit
+     * Test : The workflow should get purged, and the coordinator parent should get purged --> both will get purged
      *
-     * @throws Exception
+     * @throws Exception if cannot insert records to the database
      */
-    public void testPurgeCoordWithWFChild2MoreThanLimit() throws Exception {
-        JPAService jpaService = Services.get().get(JPAService.class);
-        assertNotNull(jpaService);
-
+    public void testPurgeableCoordinatorMultiplePurgeableWorkflows() throws Exception {
         CoordinatorJobBean coordJob = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
-        WorkflowJobBean wfJob1 = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
-        WorkflowJobBean wfJob2 = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
-        WorkflowJobBean wfJob3 = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
-        WorkflowJobBean wfJob4 = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
-        WorkflowJobBean wfJob5 = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
-        WorkflowActionBean wfAction1 = addRecordToWfActionTable(wfJob1.getId(), "1", WorkflowAction.Status.OK);
-        WorkflowActionBean wfAction2 = addRecordToWfActionTable(wfJob2.getId(), "1", WorkflowAction.Status.OK);
-        WorkflowActionBean wfAction3 = addRecordToWfActionTable(wfJob3.getId(), "1", WorkflowAction.Status.OK);
-        WorkflowActionBean wfAction4 = addRecordToWfActionTable(wfJob4.getId(), "1", WorkflowAction.Status.OK);
-        WorkflowActionBean wfAction5 = addRecordToWfActionTable(wfJob5.getId(), "1", WorkflowAction.Status.OK);
-        CoordinatorActionBean coordAction1 = addRecordToCoordActionTable(coordJob.getId(), 1, CoordinatorAction.Status.SUCCEEDED,
-                "coord-action-get.xml", wfJob1.getId(), "SUCCEEDED", 0);
-        CoordinatorActionBean coordAction2 = addRecordToCoordActionTable(coordJob.getId(), 2, CoordinatorAction.Status.SUCCEEDED,
-                "coord-action-get.xml", wfJob2.getId(), "SUCCEEDED", 0);
-        CoordinatorActionBean coordAction3 = addRecordToCoordActionTable(coordJob.getId(), 3, CoordinatorAction.Status.SUCCEEDED,
-                "coord-action-get.xml", wfJob3.getId(), "SUCCEEDED", 0);
-        CoordinatorActionBean coordAction4 = addRecordToCoordActionTable(coordJob.getId(), 4, CoordinatorAction.Status.SUCCEEDED,
-                "coord-action-get.xml", wfJob4.getId(), "SUCCEEDED", 0);
-        CoordinatorActionBean coordAction5 = addRecordToCoordActionTable(coordJob.getId(), 5, CoordinatorAction.Status.SUCCEEDED,
-                "coord-action-get.xml", wfJob5.getId(), "SUCCEEDED", 0);
-
-        WorkflowJobGetJPAExecutor wfJob1GetCmd = new WorkflowJobGetJPAExecutor(wfJob1.getId());
-        WorkflowJobGetJPAExecutor wfJob2GetCmd = new WorkflowJobGetJPAExecutor(wfJob2.getId());
-        WorkflowJobGetJPAExecutor wfJob3GetCmd = new WorkflowJobGetJPAExecutor(wfJob3.getId());
-        WorkflowJobGetJPAExecutor wfJob4GetCmd = new WorkflowJobGetJPAExecutor(wfJob4.getId());
-        WorkflowJobGetJPAExecutor wfJob5GetCmd = new WorkflowJobGetJPAExecutor(wfJob5.getId());
-        WorkflowActionGetJPAExecutor wfAction1GetCmd = new WorkflowActionGetJPAExecutor(wfAction1.getId());
-        WorkflowActionGetJPAExecutor wfAction2GetCmd = new WorkflowActionGetJPAExecutor(wfAction2.getId());
-        WorkflowActionGetJPAExecutor wfAction3GetCmd = new WorkflowActionGetJPAExecutor(wfAction3.getId());
-        WorkflowActionGetJPAExecutor wfAction4GetCmd = new WorkflowActionGetJPAExecutor(wfAction4.getId());
-        WorkflowActionGetJPAExecutor wfAction5GetCmd = new WorkflowActionGetJPAExecutor(wfAction5.getId());
-        CoordJobGetJPAExecutor coordJobGetCmd = new CoordJobGetJPAExecutor(coordJob.getId());
-        CoordActionGetJPAExecutor coordAction1GetCmd = new CoordActionGetJPAExecutor(coordAction1.getId());
-        CoordActionGetJPAExecutor coordAction2GetCmd = new CoordActionGetJPAExecutor(coordAction2.getId());
-        CoordActionGetJPAExecutor coordAction3GetCmd = new CoordActionGetJPAExecutor(coordAction3.getId());
-        CoordActionGetJPAExecutor coordAction4GetCmd = new CoordActionGetJPAExecutor(coordAction4.getId());
-        CoordActionGetJPAExecutor coordAction5GetCmd = new CoordActionGetJPAExecutor(coordAction5.getId());
-
-        wfJob1 = jpaService.execute(wfJob1GetCmd);
-        wfJob2 = jpaService.execute(wfJob2GetCmd);
-        wfJob3 = jpaService.execute(wfJob3GetCmd);
-        wfJob4 = jpaService.execute(wfJob4GetCmd);
-        wfJob5 = jpaService.execute(wfJob5GetCmd);
-        wfAction1 = jpaService.execute(wfAction1GetCmd);
-        wfAction2 = jpaService.execute(wfAction2GetCmd);
-        wfAction3 = jpaService.execute(wfAction3GetCmd);
-        wfAction4 = jpaService.execute(wfAction4GetCmd);
-        wfAction5 = jpaService.execute(wfAction5GetCmd);
-        coordJob = jpaService.execute(coordJobGetCmd);
-        coordAction1 = jpaService.execute(coordAction1GetCmd);
-        coordAction2 = jpaService.execute(coordAction2GetCmd);
-        coordAction3 = jpaService.execute(coordAction3GetCmd);
-        coordAction4 = jpaService.execute(coordAction4GetCmd);
-        coordAction5 = jpaService.execute(coordAction5GetCmd);
-        assertEquals(WorkflowJob.Status.SUCCEEDED, wfJob1.getStatus());
-        assertEquals(WorkflowJob.Status.SUCCEEDED, wfJob2.getStatus());
-        assertEquals(WorkflowJob.Status.SUCCEEDED, wfJob3.getStatus());
-        assertEquals(WorkflowJob.Status.SUCCEEDED, wfJob4.getStatus());
-        assertEquals(WorkflowJob.Status.SUCCEEDED, wfJob5.getStatus());
-        assertEquals(WorkflowAction.Status.OK, wfAction1.getStatus());
-        assertEquals(WorkflowAction.Status.OK, wfAction2.getStatus());
-        assertEquals(WorkflowAction.Status.OK, wfAction3.getStatus());
-        assertEquals(WorkflowAction.Status.OK, wfAction4.getStatus());
-        assertEquals(WorkflowAction.Status.OK, wfAction5.getStatus());
-        assertEquals(CoordinatorJob.Status.SUCCEEDED, coordJob.getStatus());
-        assertEquals(CoordinatorAction.Status.SUCCEEDED, coordAction1.getStatus());
-        assertEquals(CoordinatorAction.Status.SUCCEEDED, coordAction2.getStatus());
-        assertEquals(CoordinatorAction.Status.SUCCEEDED, coordAction3.getStatus());
-        assertEquals(CoordinatorAction.Status.SUCCEEDED, coordAction4.getStatus());
-        assertEquals(CoordinatorAction.Status.SUCCEEDED, coordAction5.getStatus());
-
-        new PurgeXCommand(getNumDaysToNotBePurged(wfJob1.getEndTime()), 7, 1, 3).call();
-
-        try {
-            jpaService.execute(coordJobGetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("Coordinator Job should not have been purged");
-        }
-
-        try {
-            jpaService.execute(coordAction1GetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("Coordinator Action 1 should not have been purged");
-        }
-
-        try {
-            jpaService.execute(coordAction2GetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("Coordinator Action 2 should not have been purged");
-        }
-
-        try {
-            jpaService.execute(coordAction3GetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("Coordinator Action 3 should not have been purged");
-        }
-
-        try {
-            jpaService.execute(coordAction4GetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("Coordinator Action 4 should not have been purged");
-        }
-
-        try {
-            jpaService.execute(coordAction5GetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("Coordinator Action 5 should not have been purged");
-        }
-
-        try {
-            jpaService.execute(wfJob1GetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("Workflow Job 1 should not have been purged");
-        }
-
-        try {
-            jpaService.execute(wfJob2GetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("Workflow Job 2 should not have been purged");
-        }
-
-        try {
-            jpaService.execute(wfJob3GetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("Workflow Job 3 should not have been purged");
-        }
-
-        try {
-            jpaService.execute(wfJob4GetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("Workflow Job 4 should not have been purged");
-        }
+        WorkflowJobBean[] wfJobs = new WorkflowJobBean[TEST_CHILD_NUM];
+        WorkflowActionBean[] wfActions = new WorkflowActionBean[TEST_CHILD_NUM];
+        CoordinatorActionBean[] coordActions = new CoordinatorActionBean[TEST_CHILD_NUM];
+        for (int i=0; i<TEST_CHILD_NUM; ++i) {
+            wfJobs[i] = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
+            wfActions[i] = addRecordToWfActionTable(wfJobs[i].getId(), "1", WorkflowAction.Status.OK);
+            coordActions[i] = addRecordToCoordActionTable(coordJob.getId(), i, CoordinatorAction.Status.SUCCEEDED,
+                    "coord-action-get.xml", wfJobs[i].getId(), "SUCCEEDED", 0);
+        }
+
+        purgeWithDefaultParameters();
+
+        assertCoordinatorJobPurged(coordJob);
+        assertCoordinatorActionsPurged(coordActions);
+        assertWorkflowJobsPurged(wfJobs);
+        assertWorkflowActionsPurged(wfActions);
+    }
 
-        try {
-            jpaService.execute(wfJob5GetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("Workflow Job 5 should not have been purged");
-        }
+    /**
+     * Test : The workflow and coordinator should get purged, but the bundle parent shouldn't get purged --> none will get purged
+     *
+     * @throws Exception if cannot insert records to the database
+     */
+    public void testUnpurgeableBundlePurgeableCoordinatorPurgeableWorkflow() throws Exception {
+        BundleJobBean bundleJob = addRecordToBundleJobTable(Job.Status.SUCCEEDED, DateUtils.parseDateOozieTZ("2011-01-01T01:00Z"));
+        CoordinatorJobBean coordJob = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
+        WorkflowJobBean wfJob = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
+        WorkflowActionBean wfAction = addRecordToWfActionTable(wfJob.getId(), "1", WorkflowAction.Status.OK);
+        CoordinatorActionBean coordAction = addRecordToCoordActionTable(coordJob.getId(), 1, CoordinatorAction.Status.SUCCEEDED,
+                "coord-action-get.xml", wfJob.getId(), "SUCCEEDED", 0);
+        BundleActionBean bundleAction = addRecordToBundleActionTable(bundleJob.getId(), coordJob.getId(), coordJob.getAppName(), 0,
+                Job.Status.SUCCEEDED);
 
-        try {
-            jpaService.execute(wfAction1GetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("Workflow Action 1 should not have been purged");
-        }
+        purgeWithSpecialParameters(bundleJob);
 
-        try {
-            jpaService.execute(wfAction2GetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("Workflow Action 2 should not have been purged");
-        }
+        assertBundleJobNotPurged(bundleJob);
+        assertBundleActionNotPurged(bundleAction);
+        assertCoordinatorJobNotPurged(coordJob);
+        assertCoordinatorActionNotPurged(coordAction);
+        assertWorkflowJobNotPurged(wfJob);
+        assertWorkflowActionNotPurged(wfAction);
+    }
 
-        try {
-            jpaService.execute(wfAction3GetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("Workflow Action 3 should not have been purged");
-        }
+    /**
+     * Test : The workflow and coordinator should not get purged, but the bundle parent should get purged --> none will get purged
+     *
+     * @throws Exception if cannot insert records to the database
+     */
+    public void testPurgeableBundleUnpurgeableCoordinatorUnpurgeableWorkflow() throws Exception {
+        BundleJobBean bundleJob = addRecordToBundleJobTable(Job.Status.SUCCEEDED, DateUtils.parseDateOozieTZ("2011-01-01T01:00Z"));
+        CoordinatorJobBean coordJob = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
+        WorkflowJobBean wfJob = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
+        WorkflowActionBean wfAction = addRecordToWfActionTable(wfJob.getId(), "1", WorkflowAction.Status.OK);
+        CoordinatorActionBean coordAction = addRecordToCoordActionTable(coordJob.getId(), 1, CoordinatorAction.Status.SUCCEEDED,
+                "coord-action-get.xml", wfJob.getId(), "SUCCEEDED", 0);
+        BundleActionBean bundleAction = addRecordToBundleActionTable(bundleJob.getId(), coordJob.getId(), coordJob.getAppName(), 0,
+                Job.Status.SUCCEEDED);
 
-        try {
-            jpaService.execute(wfAction4GetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("Workflow Action 4 should not have been purged");
-        }
+        purgeWithSpecialParameters(wfJob, coordJob);
 
-        try {
-            jpaService.execute(wfAction5GetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("Workflow Action 5 should not have been purged");
-        }
+        assertBundleJobNotPurged(bundleJob);
+        assertBundleActionNotPurged(bundleAction);
+        assertCoordinatorJobNotPurged(coordJob);
+        assertCoordinatorActionNotPurged(coordAction);
+        assertWorkflowJobNotPurged(wfJob);
+        assertWorkflowActionNotPurged(wfAction);
     }
 
     /**
-     * Test : The workflow should get purged, and the coordinator parent should get purged --> both will get purged
+     * Test : The workflow and coordinator should not get purged, but the bundle parent should get purged --> none will get purged
+     * There are more coordinator children than the limit
      *
-     * @throws Exception
+     * @throws Exception if cannot insert records to the database
      */
-    public void testPurgeCoordWithWFChild3() throws Exception {
-        JPAService jpaService = Services.get().get(JPAService.class);
-        assertNotNull(jpaService);
+    public void testPurgeableBundleAndOverTheLimitUnpurgeableCoordinatorsAndWorkflows() throws Exception {
+        BundleJobBean bundleJob = addRecordToBundleJobTable(Job.Status.SUCCEEDED, DateUtils.parseDateOozieTZ("2011-01-01T01:00Z"));
+        CoordinatorJobBean[] coordJobs = new CoordinatorJobBean[TEST_CHILD_NUM];
+        WorkflowJobBean[] wfJobs = new WorkflowJobBean[TEST_CHILD_NUM];
+        WorkflowActionBean[] wfActions = new WorkflowActionBean[TEST_CHILD_NUM];
+        CoordinatorActionBean[] coordActions = new CoordinatorActionBean[TEST_CHILD_NUM];
+        BundleActionBean[] bundleActions = new BundleActionBean[TEST_CHILD_NUM];
+        for (int i=0; i<TEST_CHILD_NUM; ++i) {
+            coordJobs[i] = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
+            coordJobs[i].setAppName("coord" + i);
+            CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, coordJobs[i]);
+            wfJobs[i] = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
+            wfActions[i] = addRecordToWfActionTable(wfJobs[i].getId(), "1", WorkflowAction.Status.OK);
+            coordActions[i] = addRecordToCoordActionTable(coordJobs[i].getId(), 1, CoordinatorAction.Status.SUCCEEDED,
+                    "coord-action-get.xml", wfJobs[i].getId(), "SUCCEEDED", 0);
+
+            bundleActions[i] = addRecordToBundleActionTable(bundleJob.getId(), coordJobs[i].getId(), coordJobs[i].getAppName(),
+                    0, Job.Status.SUCCEEDED);
+        }
+
+        purgeWithSpecialParameters(LIMIT_3_ITEMS, wfJobs[0], coordJobs[0]);
+
+        assertBundleJobNotPurged(bundleJob);
+        assertBundleActionsNotPurged(bundleActions);
+        assertCoordinatorJobsNotPurged(coordJobs);
+        assertCoordinatorActionsNotPurged(coordActions);
+        assertWorkflowJobsNotPurged(wfJobs);
+        assertWorkflowActionsNotPurged(wfActions);
+    }
 
+    /**
+     * Test : The workflow and coordinator should get purged, and the bundle parent should get purged --> all will get purged
+     *
+     * @throws Exception if cannot insert records to the database
+     */
+    public void testPurgeableBundlePurgeableCoordinatorPurgeableWorkflow() throws Exception {
+        BundleJobBean bundleJob = addRecordToBundleJobTable(Job.Status.SUCCEEDED, DateUtils.parseDateOozieTZ("2011-01-01T01:00Z"));
         CoordinatorJobBean coordJob = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
         WorkflowJobBean wfJob = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
         WorkflowActionBean wfAction = addRecordToWfActionTable(wfJob.getId(), "1", WorkflowAction.Status.OK);
         CoordinatorActionBean coordAction = addRecordToCoordActionTable(coordJob.getId(), 1, CoordinatorAction.Status.SUCCEEDED,
                 "coord-action-get.xml", wfJob.getId(), "SUCCEEDED", 0);
+        BundleActionBean bundleAction = addRecordToBundleActionTable(bundleJob.getId(), coordJob.getId(), coordJob.getAppName(), 0,
+                Job.Status.SUCCEEDED);
 
-        WorkflowJobGetJPAExecutor wfJobGetCmd = new WorkflowJobGetJPAExecutor(wfJob.getId());
-        WorkflowActionGetJPAExecutor wfActionGetCmd = new WorkflowActionGetJPAExecutor(wfAction.getId());
-        CoordJobGetJPAExecutor coordJobGetCmd = new CoordJobGetJPAExecutor(coordJob.getId());
-        CoordActionGetJPAExecutor coordActionGetCmd = new CoordActionGetJPAExecutor(coordAction.getId());
-
-        wfJob = jpaService.execute(wfJobGetCmd);
-        wfAction = jpaService.execute(wfActionGetCmd);
-        coordJob = jpaService.execute(coordJobGetCmd);
-        coordAction = jpaService.execute(coordActionGetCmd);
-        assertEquals(WorkflowJob.Status.SUCCEEDED, wfJob.getStatus());
-        assertEquals(WorkflowAction.Status.OK, wfAction.getStatus());
-        assertEquals(CoordinatorJob.Status.SUCCEEDED, coordJob.getStatus());
-        assertEquals(CoordinatorAction.Status.SUCCEEDED, coordAction.getStatus());
+        purgeWithDefaultParameters();
 
-        new PurgeXCommand(7, 7, 1, 10).call();
+        assertBundleJobPurged(bundleJob);
+        assertBundleActionPurged(bundleAction);
+        assertCoordinatorJobPurged(coordJob);
+        assertCoordinatorActionPurged(coordAction);
+        assertWorkflowJobPurged(wfJob);
+        assertWorkflowActionPurged(wfAction);
+    }
 
-        try {
-            jpaService.execute(coordJobGetCmd);
-            fail("Coordinator Job should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0604, je.getErrorCode());
-        }
+    /**
+     * Test : The workflow and coordinator should get purged, and the bundle parent should get purged --> all will get purged
+     * There are more coordinator children than the limit
+     *
+     * @throws Exception if cannot insert records to the database
+     */
+    public void testPurgeableBundleOverTheLimitPurgeableCoordinatorsOverTheLimitPurgeableWorkflows() throws Exception {
+        BundleJobBean bundleJob = addRecordToBundleJobTable(Job.Status.SUCCEEDED, DateUtils.parseDateOozieTZ("2011-01-01T01:00Z"));
+        CoordinatorJobBean[] coordJobs = new CoordinatorJobBean[TEST_CHILD_NUM];
+        WorkflowJobBean[] wfJobs = new WorkflowJobBean[TEST_CHILD_NUM];
+        WorkflowActionBean[] wfActions = new WorkflowActionBean[TEST_CHILD_NUM];
+        CoordinatorActionBean[] coordActions = new CoordinatorActionBean[TEST_CHILD_NUM];
+        BundleActionBean[] bundleActions = new BundleActionBean[TEST_CHILD_NUM];
+        for (int i=0; i<TEST_CHILD_NUM; ++i) {
+            coordJobs[i] = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
+            coordJobs[i].setAppName("coord" + i);
+            CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, coordJobs[i]);
+            wfJobs[i] = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
+            wfActions[i] = addRecordToWfActionTable(wfJobs[i].getId(), "1", WorkflowAction.Status.OK);
+            coordActions[i] = addRecordToCoordActionTable(coordJobs[i].getId(), 1, CoordinatorAction.Status.SUCCEEDED,
+                    "coord-action-get.xml", wfJobs[i].getId(), "SUCCEEDED", 0);
+            bundleActions[i] = addRecordToBundleActionTable(bundleJob.getId(), coordJobs[i].getId(), coordJobs[i].getAppName(),
+                    0, Job.Status.SUCCEEDED);
+        }
+
+        purgeWithDefaultParameters();
+
+        assertBundleJobPurged(bundleJob);
+        assertBundleActionsPurged(bundleActions);
+        assertCoordinatorJobsPurged(coordJobs);
+        assertCoordinatorActionsPurged(coordActions);
+        assertWorkflowJobsPurged(wfJobs);
+        assertWorkflowActionsPurged(wfActions);
+    }
 
-        try {
-            jpaService.execute(coordActionGetCmd);
-            fail("Coordinator Action should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0605, je.getErrorCode());
-        }
+    /**
+     * Test : The subworkflow should get purged, but the workflow parent shouldn't get purged --> neither will get purged
+     *
+     * @throws Exception if cannot insert records to the database
+     */
+    public void testSucceededWorkflowRunningSubWorkflow() throws Exception {
+        WorkflowJobBean wfJob = addRecordToWfJobTableForNegCase(WorkflowJob.Status.RUNNING, WorkflowInstance.Status.RUNNING);
+        WorkflowActionBean wfAction = addRecordToWfActionTable(wfJob.getId(), "1", WorkflowAction.Status.OK);
+        WorkflowJobBean subwfJob = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED,
+                wfJob.getId());
+        WorkflowActionBean subwfAction = addRecordToWfActionTable(subwfJob.getId(), "1", WorkflowAction.Status.OK);
 
-        try {
-            jpaService.execute(wfJobGetCmd);
-            fail("Workflow Job should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0604, je.getErrorCode());
-        }
+        purgeWithDefaultParameters();
 
-        try {
-            jpaService.execute(wfActionGetCmd);
-            fail("Workflow Action should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0605, je.getErrorCode());
-        }
+        assertWorkflowJobNotPurged(wfJob);
+        assertWorkflowActionNotPurged(wfAction);
+        assertWorkflowJobNotPurged(subwfJob);
+        assertWorkflowActionNotPurged(subwfAction);
     }
 
     /**
-     * Test : The workflow should get purged, and the coordinator parent should get purged --> both will get purged
+     * Test : The subworkflow shouldn't get purged, but the workflow parent should get purged --> neither will get purged
      *
-     * @throws Exception
+     * @throws Exception if cannot insert records to the database
      */
-    public void testPurgeCoordWithWFChild3MoreThanLimit() throws Exception {
-        JPAService jpaService = Services.get().get(JPAService.class);
-        assertNotNull(jpaService);
-
-        CoordinatorJobBean coordJob = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
-        WorkflowJobBean wfJob1 = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
-        WorkflowJobBean wfJob2 = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
-        WorkflowJobBean wfJob3 = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
-        WorkflowJobBean wfJob4 = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
-        WorkflowJobBean wfJob5 = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
-        WorkflowActionBean wfAction1 = addRecordToWfActionTable(wfJob1.getId(), "1", WorkflowAction.Status.OK);
-        WorkflowActionBean wfAction2 = addRecordToWfActionTable(wfJob2.getId(), "1", WorkflowAction.Status.OK);
-        WorkflowActionBean wfAction3 = addRecordToWfActionTable(wfJob3.getId(), "1", WorkflowAction.Status.OK);
-        WorkflowActionBean wfAction4 = addRecordToWfActionTable(wfJob4.getId(), "1", WorkflowAction.Status.OK);
-        WorkflowActionBean wfAction5 = addRecordToWfActionTable(wfJob5.getId(), "1", WorkflowAction.Status.OK);
-        CoordinatorActionBean coordAction1 = addRecordToCoordActionTable(coordJob.getId(), 1, CoordinatorAction.Status.SUCCEEDED,
-                "coord-action-get.xml", wfJob1.getId(), "SUCCEEDED", 0);
-        CoordinatorActionBean coordAction2 = addRecordToCoordActionTable(coordJob.getId(), 2, CoordinatorAction.Status.SUCCEEDED,
-                "coord-action-get.xml", wfJob2.getId(), "SUCCEEDED", 0);
-        CoordinatorActionBean coordAction3 = addRecordToCoordActionTable(coordJob.getId(), 3, CoordinatorAction.Status.SUCCEEDED,
-                "coord-action-get.xml", wfJob3.getId(), "SUCCEEDED", 0);
-        CoordinatorActionBean coordAction4 = addRecordToCoordActionTable(coordJob.getId(), 4, CoordinatorAction.Status.SUCCEEDED,
-                "coord-action-get.xml", wfJob4.getId(), "SUCCEEDED", 0);
-        CoordinatorActionBean coordAction5 = addRecordToCoordActionTable(coordJob.getId(), 5, CoordinatorAction.Status.SUCCEEDED,
-                "coord-action-get.xml", wfJob5.getId(), "SUCCEEDED", 0);
-
-        WorkflowJobGetJPAExecutor wfJob1GetCmd = new WorkflowJobGetJPAExecutor(wfJob1.getId());
-        WorkflowJobGetJPAExecutor wfJob2GetCmd = new WorkflowJobGetJPAExecutor(wfJob2.getId());
-        WorkflowJobGetJPAExecutor wfJob3GetCmd = new WorkflowJobGetJPAExecutor(wfJob3.getId());
-        WorkflowJobGetJPAExecutor wfJob4GetCmd = new WorkflowJobGetJPAExecutor(wfJob4.getId());
-        WorkflowJobGetJPAExecutor wfJob5GetCmd = new WorkflowJobGetJPAExecutor(wfJob5.getId());
-        WorkflowActionGetJPAExecutor wfAction1GetCmd = new WorkflowActionGetJPAExecutor(wfAction1.getId());
-        WorkflowActionGetJPAExecutor wfAction2GetCmd = new WorkflowActionGetJPAExecutor(wfAction2.getId());
-        WorkflowActionGetJPAExecutor wfAction3GetCmd = new WorkflowActionGetJPAExecutor(wfAction3.getId());
-        WorkflowActionGetJPAExecutor wfAction4GetCmd = new WorkflowActionGetJPAExecutor(wfAction4.getId());
-        WorkflowActionGetJPAExecutor wfAction5GetCmd = new WorkflowActionGetJPAExecutor(wfAction5.getId());
-        CoordJobGetJPAExecutor coordJobGetCmd = new CoordJobGetJPAExecutor(coordJob.getId());
-        CoordActionGetJPAExecutor coordAction1GetCmd = new CoordActionGetJPAExecutor(coordAction1.getId());
-        CoordActionGetJPAExecutor coordAction2GetCmd = new CoordActionGetJPAExecutor(coordAction2.getId());
-        CoordActionGetJPAExecutor coordAction3GetCmd = new CoordActionGetJPAExecutor(coordAction3.getId());
-        CoordActionGetJPAExecutor coordAction4GetCmd = new CoordActionGetJPAExecutor(coordAction4.getId());
-        CoordActionGetJPAExecutor coordAction5GetCmd = new CoordActionGetJPAExecutor(coordAction5.getId());
-
-        wfJob1 = jpaService.execute(wfJob1GetCmd);
-        wfJob2 = jpaService.execute(wfJob2GetCmd);
-        wfJob3 = jpaService.execute(wfJob3GetCmd);
-        wfJob4 = jpaService.execute(wfJob4GetCmd);
-        wfJob5 = jpaService.execute(wfJob5GetCmd);
-        wfAction1 = jpaService.execute(wfAction1GetCmd);
-        wfAction2 = jpaService.execute(wfAction2GetCmd);
-        wfAction3 = jpaService.execute(wfAction3GetCmd);
-        wfAction4 = jpaService.execute(wfAction4GetCmd);
-        wfAction5 = jpaService.execute(wfAction5GetCmd);
-        coordJob = jpaService.execute(coordJobGetCmd);
-        coordAction1 = jpaService.execute(coordAction1GetCmd);
-        coordAction2 = jpaService.execute(coordAction2GetCmd);
-        coordAction3 = jpaService.execute(coordAction3GetCmd);
-        coordAction4 = jpaService.execute(coordAction4GetCmd);
-        coordAction5 = jpaService.execute(coordAction5GetCmd);
-        assertEquals(WorkflowJob.Status.SUCCEEDED, wfJob1.getStatus());
-        assertEquals(WorkflowJob.Status.SUCCEEDED, wfJob2.getStatus());
-        assertEquals(WorkflowJob.Status.SUCCEEDED, wfJob3.getStatus());
-        assertEquals(WorkflowJob.Status.SUCCEEDED, wfJob4.getStatus());
-        assertEquals(WorkflowJob.Status.SUCCEEDED, wfJob5.getStatus());
-        assertEquals(WorkflowAction.Status.OK, wfAction1.getStatus());
-        assertEquals(WorkflowAction.Status.OK, wfAction2.getStatus());
-        assertEquals(WorkflowAction.Status.OK, wfAction3.getStatus());
-        assertEquals(WorkflowAction.Status.OK, wfAction4.getStatus());
-        assertEquals(WorkflowAction.Status.OK, wfAction5.getStatus());
-        assertEquals(CoordinatorJob.Status.SUCCEEDED, coordJob.getStatus());
-        assertEquals(CoordinatorAction.Status.SUCCEEDED, coordAction1.getStatus());
-        assertEquals(CoordinatorAction.Status.SUCCEEDED, coordAction2.getStatus());
-        assertEquals(CoordinatorAction.Status.SUCCEEDED, coordAction3.getStatus());
-        assertEquals(CoordinatorAction.Status.SUCCEEDED, coordAction4.getStatus());
-        assertEquals(CoordinatorAction.Status.SUCCEEDED, coordAction5.getStatus());
-
-        new PurgeXCommand(7, 7, 1, 10).call();
+    public void testRunningWorkflowSucceededSubWorkflow() throws Exception {
+        WorkflowJobBean wfJob = addRecordToWfJobTableForNegCase(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
+        WorkflowActionBean wfAction = addRecordToWfActionTable(wfJob.getId(), "1", WorkflowAction.Status.OK);
+        WorkflowJobBean subwfJob = addRecordToWfJobTable(WorkflowJob.Status.RUNNING, WorkflowInstance.Status.RUNNING,
+                wfJob.getId());
+        WorkflowActionBean subwfAction = addRecordToWfActionTable(subwfJob.getId(), "1", WorkflowAction.Status.RUNNING);
 
-        try {
-            jpaService.execute(coordJobGetCmd);
-            fail("Coordinator Job should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0604, je.getErrorCode());
-        }
+        purgeWithDefaultParameters();
 
-        try {
-            jpaService.execute(coordAction1GetCmd);
-            fail("Coordinator Action 1 should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0605, je.getErrorCode());
-        }
-
-        try {
-            jpaService.execute(coordAction2GetCmd);
-            fail("Coordinator Action 2 should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0605, je.getErrorCode());
-        }
-
-        try {
-            jpaService.execute(coordAction3GetCmd);
-            fail("Coordinator Action 3 should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0605, je.getErrorCode());
-        }
-
-        try {
-            jpaService.execute(coordAction4GetCmd);
-            fail("Coordinator Action 4 should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0605, je.getErrorCode());
-        }
-
-        try {
-            jpaService.execute(coordAction5GetCmd);
-            fail("Coordinator Action 5 should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0605, je.getErrorCode());
-        }
-
-        try {
-            jpaService.execute(wfJob1GetCmd);
-            fail("Workflow Job 1 should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0604, je.getErrorCode());
-        }
-
-        try {
-            jpaService.execute(wfJob2GetCmd);
-            fail("Workflow Job 2 should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0604, je.getErrorCode());
-        }
-
-        try {
-            jpaService.execute(wfJob3GetCmd);
-            fail("Workflow Job 3 should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0604, je.getErrorCode());
-        }
-
-        try {
-            jpaService.execute(wfJob4GetCmd);
-            fail("Workflow Job 4 should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0604, je.getErrorCode());
-        }
-
-        try {
-            jpaService.execute(wfJob5GetCmd);
-            fail("Workflow Job 5 should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0604, je.getErrorCode());
-        }
-
-        try {
-            jpaService.execute(wfAction1GetCmd);
-            fail("Workflow Action 1 should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0605, je.getErrorCode());
-        }
-
-        try {
-            jpaService.execute(wfAction2GetCmd);
-            fail("Workflow Action 2 should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0605, je.getErrorCode());
-        }
-
-        try {
-            jpaService.execute(wfAction3GetCmd);
-            fail("Workflow Action 3 should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0605, je.getErrorCode());
-        }
-
-        try {
-            jpaService.execute(wfAction4GetCmd);
-            fail("Workflow Action 4 should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0605, je.getErrorCode());
-        }
-
-        try {
-            jpaService.execute(wfAction5GetCmd);
-            fail("Workflow Action 5 should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0605, je.getErrorCode());
-        }
-    }
-
-    /**
-     * Test : The workflow and coordinator should get purged, but the bundle parent shouldn't get purged --> none will get purged
-     *
-     * @throws Exception
-     */
-    public void testPurgeBundleWithCoordChildWithWFChild1() throws Exception {
-        JPAService jpaService = Services.get().get(JPAService.class);
-        assertNotNull(jpaService);
-
-        BundleJobBean bundleJob = addRecordToBundleJobTable(Job.Status.SUCCEEDED, DateUtils.parseDateOozieTZ("2011-01-01T01:00Z"));
-        CoordinatorJobBean coordJob = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
-        WorkflowJobBean wfJob = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
-        WorkflowActionBean wfAction = addRecordToWfActionTable(wfJob.getId(), "1", WorkflowAction.Status.OK);
-        CoordinatorActionBean coordAction = addRecordToCoordActionTable(coordJob.getId(), 1, CoordinatorAction.Status.SUCCEEDED,
-                "coord-action-get.xml", wfJob.getId(), "SUCCEEDED", 0);
-        BundleActionBean bundleAction = addRecordToBundleActionTable(bundleJob.getId(), coordJob.getId(), coordJob.getAppName(), 0,
-                Job.Status.SUCCEEDED);
-
-        WorkflowJobGetJPAExecutor wfJobGetCmd = new WorkflowJobGetJPAExecutor(wfJob.getId());
-        WorkflowActionGetJPAExecutor wfActionGetCmd = new WorkflowActionGetJPAExecutor(wfAction.getId());
-        CoordJobGetJPAExecutor coordJobGetCmd = new CoordJobGetJPAExecutor(coordJob.getId());
-        CoordActionGetJPAExecutor coordActionGetCmd = new CoordActionGetJPAExecutor(coordAction.getId());
-        BundleJobGetJPAExecutor bundleJobGetCmd = new BundleJobGetJPAExecutor(bundleJob.getId());
-        BundleActionGetJPAExecutor bundleActionGetCmd = new BundleActionGetJPAExecutor(bundleJob.getId(), coordJob.getAppName());
-
-        wfJob = jpaService.execute(wfJobGetCmd);
-        wfAction = jpaService.execute(wfActionGetCmd);
-        coordJob = jpaService.execute(coordJobGetCmd);
-        coordAction = jpaService.execute(coordActionGetCmd);
-        bundleJob = jpaService.execute(bundleJobGetCmd);
-        bundleAction = jpaService.execute(bundleActionGetCmd);
-        assertEquals(WorkflowJob.Status.SUCCEEDED, wfJob.getStatus());
-        assertEquals(WorkflowAction.Status.OK, wfAction.getStatus());
-        assertEquals(CoordinatorJob.Status.SUCCEEDED, coordJob.getStatus());
-        assertEquals(CoordinatorAction.Status.SUCCEEDED, coordAction.getStatus());
-        assertEquals(BundleJobBean.Status.SUCCEEDED, bundleJob.getStatus());
-        assertEquals(BundleJobBean.Status.SUCCEEDED, bundleAction.getStatus());
-
-        new PurgeXCommand(7, 7, getNumDaysToNotBePurged(bundleJob.getLastModifiedTime()), 10).call();
-
-        try {
-            jpaService.execute(bundleJobGetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("Bundle Job should not have been purged");
-        }
-
-        try {
-            jpaService.execute(bundleActionGetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("Bundle Action should not have been purged");
-        }
-
-        try {
-            jpaService.execute(coordJobGetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("Coordinator Job should not have been purged");
-        }
-
-        try {
-            jpaService.execute(coordActionGetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("Coordinator Action should not have been purged");
-        }
-
-        try {
-            jpaService.execute(wfJobGetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("Workflow Job should not have been purged");
-        }
-
-        try {
-            jpaService.execute(wfActionGetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("Workflow Action should not have been purged");
-        }
-    }
-
-    /**
-     * Test : The workflow and coordinator should not get purged, but the bundle parent should get purged --> none will get purged
-     *
-     * @throws Exception
-     */
-    public void testPurgeBundleWithCoordChildWithWFChild2() throws Exception {
-        JPAService jpaService = Services.get().get(JPAService.class);
-        assertNotNull(jpaService);
-
-        BundleJobBean bundleJob = addRecordToBundleJobTable(Job.Status.SUCCEEDED, DateUtils.parseDateOozieTZ("2011-01-01T01:00Z"));
-        CoordinatorJobBean coordJob = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
-        WorkflowJobBean wfJob = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
-        WorkflowActionBean wfAction = addRecordToWfActionTable(wfJob.getId(), "1", WorkflowAction.Status.OK);
-        CoordinatorActionBean coordAction = addRecordToCoordActionTable(coordJob.getId(), 1, CoordinatorAction.Status.SUCCEEDED,
-                "coord-action-get.xml", wfJob.getId(), "SUCCEEDED", 0);
-        BundleActionBean bundleAction = addRecordToBundleActionTable(bundleJob.getId(), coordJob.getId(), coordJob.getAppName(), 0,
-                Job.Status.SUCCEEDED);
-
-        WorkflowJobGetJPAExecutor wfJobGetCmd = new WorkflowJobGetJPAExecutor(wfJob.getId());
-        WorkflowActionGetJPAExecutor wfActionGetCmd = new WorkflowActionGetJPAExecutor(wfAction.getId());
-        CoordJobGetJPAExecutor coordJobGetCmd = new CoordJobGetJPAExecutor(coordJob.getId());
-        CoordActionGetJPAExecutor coordActionGetCmd = new CoordActionGetJPAExecutor(coordAction.getId());
-        BundleJobGetJPAExecutor bundleJobGetCmd = new BundleJobGetJPAExecutor(bundleJob.getId());
-        BundleActionGetJPAExecutor bundleActionGetCmd = new BundleActionGetJPAExecutor(bundleJob.getId(), coordJob.getAppName());
-
-        wfJob = jpaService.execute(wfJobGetCmd);
-        wfAction = jpaService.execute(wfActionGetCmd);
-        coordJob = jpaService.execute(coordJobGetCmd);
-        coordAction = jpaService.execute(coordActionGetCmd);
-        bundleJob = jpaService.execute(bundleJobGetCmd);
-        bundleAction = jpaService.execute(bundleActionGetCmd);
-        assertEquals(WorkflowJob.Status.SUCCEEDED, wfJob.getStatus());
-        assertEquals(WorkflowAction.Status.OK, wfAction.getStatus());
-        assertEquals(CoordinatorJob.Status.SUCCEEDED, coordJob.getStatus());
-        assertEquals(CoordinatorAction.Status.SUCCEEDED, coordAction.getStatus());
-        assertEquals(BundleJobBean.Status.SUCCEEDED, bundleJob.getStatus());
-        assertEquals(BundleJobBean.Status.SUCCEEDED, bundleAction.getStatus());
-
-        new PurgeXCommand(getNumDaysToNotBePurged(wfJob.getEndTime()),
-                getNumDaysToNotBePurged(coordJob.getLastModifiedTime()), 7, 10).call();
-
-        try {
-            jpaService.execute(bundleJobGetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("Bundle Job should not have been purged");
-        }
-
-        try {
-            jpaService.execute(bundleActionGetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("Bundle Action should not have been purged");
-        }
-
-        try {
-            jpaService.execute(coordJobGetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("Coordinator Job should not have been purged");
-        }
-
-        try {
-            jpaService.execute(coordActionGetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("Coordinator Action should not have been purged");
-        }
-
-        try {
-            jpaService.execute(wfJobGetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("Workflow Job should not have been purged");
-        }
-
-        try {
-            jpaService.execute(wfActionGetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("Workflow Action should not have been purged");
-        }
-    }
-
-    /**
-     * Test : The workflow and coordinator should not get purged, but the bundle parent should get purged --> none will get purged
-     * There are more coordinator children than the limit
-     *
-     * @throws Exception
-     */
-    public void testPurgeBundleWithCoordChildWithWFChild2MoreThanLimit() throws Exception {
-        JPAService jpaService = Services.get().get(JPAService.class);
-        assertNotNull(jpaService);
-
-        BundleJobBean bundleJob = addRecordToBundleJobTable(Job.Status.SUCCEEDED, DateUtils.parseDateOozieTZ("2011-01-01T01:00Z"));
-        CoordinatorJobBean coordJob1 = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
-        CoordinatorJobBean coordJob2 = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
-        coordJob2.setAppName("coord2");
-        CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, coordJob2);
-        CoordinatorJobBean coordJob3 = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
-        coordJob3.setAppName("coord3");
-        CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, coordJob3);
-        CoordinatorJobBean coordJob4 = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
-        coordJob4.setAppName("coord4");
-        CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, coordJob4);
-        CoordinatorJobBean coordJob5 = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
-        coordJob5.setAppName("coord5");
-        CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, coordJob5);
-        WorkflowJobBean wfJob1 = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
-        WorkflowJobBean wfJob2 = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
-        WorkflowJobBean wfJob3 = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
-        WorkflowJobBean wfJob4 = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
-        WorkflowJobBean wfJob5 = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
-        WorkflowActionBean wfAction1 = addRecordToWfActionTable(wfJob1.getId(), "1", WorkflowAction.Status.OK);
-        WorkflowActionBean wfAction2 = addRecordToWfActionTable(wfJob2.getId(), "1", WorkflowAction.Status.OK);
-        WorkflowActionBean wfAction3 = addRecordToWfActionTable(wfJob3.getId(), "1", WorkflowAction.Status.OK);
-        WorkflowActionBean wfAction4 = addRecordToWfActionTable(wfJob4.getId(), "1", WorkflowAction.Status.OK);
-        WorkflowActionBean wfAction5 = addRecordToWfActionTable(wfJob5.getId(), "1", WorkflowAction.Status.OK);
-        CoordinatorActionBean coordAction1 = addRecordToCoordActionTable(coordJob1.getId(), 1, CoordinatorAction.Status.SUCCEEDED,
-                "coord-action-get.xml", wfJob1.getId(), "SUCCEEDED", 0);
-        CoordinatorActionBean coordAction2 = addRecordToCoordActionTable(coordJob2.getId(), 1, CoordinatorAction.Status.SUCCEEDED,
-                "coord-action-get.xml", wfJob2.getId(), "SUCCEEDED", 0);
-        CoordinatorActionBean coordAction3 = addRecordToCoordActionTable(coordJob3.getId(), 1, CoordinatorAction.Status.SUCCEEDED,
-                "coord-action-get.xml", wfJob3.getId(), "SUCCEEDED", 0);
-        CoordinatorActionBean coordAction4 = addRecordToCoordActionTable(coordJob4.getId(), 1, CoordinatorAction.Status.SUCCEEDED,
-                "coord-action-get.xml", wfJob4.getId(), "SUCCEEDED", 0);
-        CoordinatorActionBean coordAction5 = addRecordToCoordActionTable(coordJob5.getId(), 1, CoordinatorAction.Status.SUCCEEDED,
-                "coord-action-get.xml", wfJob5.getId(), "SUCCEEDED", 0);
-        BundleActionBean bundleAction1 = addRecordToBundleActionTable(bundleJob.getId(), coordJob1.getId(), coordJob1.getAppName(),
-                0, Job.Status.SUCCEEDED);
-        BundleActionBean bundleAction2 = addRecordToBundleActionTable(bundleJob.getId(), coordJob2.getId(), coordJob2.getAppName(),
-                0, Job.Status.SUCCEEDED);
-        BundleActionBean bundleAction3 = addRecordToBundleActionTable(bundleJob.getId(), coordJob3.getId(), coordJob3.getAppName(),
-                0, Job.Status.SUCCEEDED);
-        BundleActionBean bundleAction4 = addRecordToBundleActionTable(bundleJob.getId(), coordJob4.getId(), coordJob4.getAppName(),
-                0, Job.Status.SUCCEEDED);
-        BundleActionBean bundleAction5 = addRecordToBundleActionTable(bundleJob.getId(), coordJob5.getId(), coordJob5.getAppName(),
-                0, Job.Status.SUCCEEDED);
-
-        WorkflowJobGetJPAExecutor wfJob1GetCmd = new WorkflowJobGetJPAExecutor(wfJob1.getId());
-        WorkflowJobGetJPAExecutor wfJob2GetCmd = new WorkflowJobGetJPAExecutor(wfJob2.getId());
-        WorkflowJobGetJPAExecutor wfJob3GetCmd = new WorkflowJobGetJPAExecutor(wfJob3.getId());
-        WorkflowJobGetJPAExecutor wfJob4GetCmd = new WorkflowJobGetJPAExecutor(wfJob4.getId());
-        WorkflowJobGetJPAExecutor wfJob5GetCmd = new WorkflowJobGetJPAExecutor(wfJob5.getId());
-        WorkflowActionGetJPAExecutor wfAction1GetCmd = new WorkflowActionGetJPAExecutor(wfAction1.getId());
-        WorkflowActionGetJPAExecutor wfAction2GetCmd = new WorkflowActionGetJPAExecutor(wfAction2.getId());
-        WorkflowActionGetJPAExecutor wfAction3GetCmd = new WorkflowActionGetJPAExecutor(wfAction3.getId());
-        WorkflowActionGetJPAExecutor wfAction4GetCmd = new WorkflowActionGetJPAExecutor(wfAction4.getId());
-        WorkflowActionGetJPAExecutor wfAction5GetCmd = new WorkflowActionGetJPAExecutor(wfAction5.getId());
-        CoordJobGetJPAExecutor coordJob1GetCmd = new CoordJobGetJPAExecutor(coordJob1.getId());
-        CoordJobGetJPAExecutor coordJob2GetCmd = new CoordJobGetJPAExecutor(coordJob2.getId());
-        CoordJobGetJPAExecutor coordJob3GetCmd = new CoordJobGetJPAExecutor(coordJob3.getId());
-        CoordJobGetJPAExecutor coordJob4GetCmd = new CoordJobGetJPAExecutor(coordJob4.getId());
-        CoordJobGetJPAExecutor coordJob5GetCmd = new CoordJobGetJPAExecutor(coordJob5.getId());
-        CoordActionGetJPAExecutor coordAction1GetCmd = new CoordActionGetJPAExecutor(coordAction1.getId());
-        CoordActionGetJPAExecutor coordAction2GetCmd = new CoordActionGetJPAExecutor(coordAction2.getId());
-        CoordActionGetJPAExecutor coordAction3GetCmd = new CoordActionGetJPAExecutor(coordAction3.getId());
-        CoordActionGetJPAExecutor coordAction4GetCmd = new CoordActionGetJPAExecutor(coordAction4.getId());
-        CoordActionGetJPAExecutor coordAction5GetCmd = new CoordActionGetJPAExecutor(coordAction5.getId());
-        BundleJobGetJPAExecutor bundleJobGetCmd = new BundleJobGetJPAExecutor(bundleJob.getId());
-        BundleActionGetJPAExecutor bundleAction1GetCmd = new BundleActionGetJPAExecutor(bundleJob.getId(), coordJob1.getAppName());
-        BundleActionGetJPAExecutor bundleAction2GetCmd = new BundleActionGetJPAExecutor(bundleJob.getId(), coordJob2.getAppName());
-        BundleActionGetJPAExecutor bundleAction3GetCmd = new BundleActionGetJPAExecutor(bundleJob.getId(), coordJob3.getAppName());
-        BundleActionGetJPAExecutor bundleAction4GetCmd = new BundleActionGetJPAExecutor(bundleJob.getId(), coordJob4.getAppName());
-        BundleActionGetJPAExecutor bundleAction5GetCmd = new BundleActionGetJPAExecutor(bundleJob.getId(), coordJob5.getAppName());
-
-        wfJob1 = jpaService.execute(wfJob1GetCmd);
-        wfJob2 = jpaService.execute(wfJob2GetCmd);
-        wfJob3 = jpaService.execute(wfJob3GetCmd);
-        wfJob4 = jpaService.execute(wfJob4GetCmd);
-        wfJob5 = jpaService.execute(wfJob5GetCmd);
-        wfAction1 = jpaService.execute(wfAction1GetCmd);
-        wfAction2 = jpaService.execute(wfAction2GetCmd);
-        wfAction3 = jpaService.execute(wfAction3GetCmd);
-        wfAction4 = jpaService.execute(wfAction4GetCmd);
-        wfAction5 = jpaService.execute(wfAction5GetCmd);
-        coordJob1 = jpaService.execute(coordJob1GetCmd);
-        coordJob2 = jpaService.execute(coordJob2GetCmd);
-        coordJob3 = jpaService.execute(coordJob3GetCmd);
-        coordJob4 = jpaService.execute(coordJob4GetCmd);
-        coordJob5 = jpaService.execute(coordJob5GetCmd);
-        coordAction1 = jpaService.execute(coordAction1GetCmd);
-        coordAction2 = jpaService.execute(coordAction2GetCmd);
-        coordAction3 = jpaService.execute(coordAction3GetCmd);
-        coordAction4 = jpaService.execute(coordAction4GetCmd);
-        coordAction5 = jpaService.execute(coordAction5GetCmd);
-        bundleJob = jpaService.execute(bundleJobGetCmd);
-        bundleAction1 = jpaService.execute(bundleAction1GetCmd);
-        bundleAction2 = jpaService.execute(bundleAction2GetCmd);
-        bundleAction3 = jpaService.execute(bundleAction3GetCmd);
-        bundleAction4 = jpaService.execute(bundleAction4GetCmd);
-        bundleAction5 = jpaService.execute(bundleAction5GetCmd);
-        assertEquals(WorkflowJob.Status.SUCCEEDED, wfJob1.getStatus());
-        assertEquals(WorkflowJob.Status.SUCCEEDED, wfJob2.getStatus());
-        assertEquals(WorkflowJob.Status.SUCCEEDED, wfJob3.getStatus());
-        assertEquals(WorkflowJob.Status.SUCCEEDED, wfJob4.getStatus());
-        assertEquals(WorkflowJob.Status.SUCCEEDED, wfJob5.getStatus());
-        assertEquals(WorkflowAction.Status.OK, wfAction1.getStatus());
-        assertEquals(WorkflowAction.Status.OK, wfAction2.getStatus());
-        assertEquals(WorkflowAction.Status.OK, wfAction3.getStatus());
-        assertEquals(WorkflowAction.Status.OK, wfAction4.getStatus());
-        assertEquals(WorkflowAction.Status.OK, wfAction5.getStatus());
-        assertEquals(CoordinatorJob.Status.SUCCEEDED, coordJob1.getStatus());
-        assertEquals(CoordinatorJob.Status.SUCCEEDED, coordJob2.getStatus());
-        assertEquals(CoordinatorJob.Status.SUCCEEDED, coordJob3.getStatus());
-        assertEquals(CoordinatorJob.Status.SUCCEEDED, coordJob4.getStatus());
-        assertEquals(CoordinatorJob.Status.SUCCEEDED, coordJob5.getStatus());
-        assertEquals(CoordinatorAction.Status.SUCCEEDED, coordAction1.getStatus());
-        assertEquals(CoordinatorAction.Status.SUCCEEDED, coordAction2.getStatus());
-        assertEquals(CoordinatorAction.Status.SUCCEEDED, coordAction3.getStatus());
-        assertEquals(CoordinatorAction.Status.SUCCEEDED, coordAction4.getStatus());
-        assertEquals(CoordinatorAction.Status.SUCCEEDED, coordAction5.getStatus());
-        assertEquals(BundleJobBean.Status.SUCCEEDED, bundleJob.getStatus());
-        assertEquals(BundleJobBean.Status.SUCCEEDED, bundleAction1.getStatus());
-        assertEquals(BundleJobBean.Status.SUCCEEDED, bundleAction2.getStatus());
-        assertEquals(BundleJobBean.Status.SUCCEEDED, bundleAction3.getStatus());
-        assertEquals(BundleJobBean.Status.SUCCEEDED, bundleAction4.getStatus());
-        assertEquals(BundleJobBean.Status.SUCCEEDED, bundleAction5.getStatus());
-
-        new PurgeXCommand(getNumDaysToNotBePurged(wfJob1.getEndTime()),
-                getNumDaysToNotBePurged(coordJob1.getLastModifiedTime()), 7, 3).call();
-
-        try {
-            jpaService.execute(bundleJobGetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("Bundle Job should not have been purged");
-        }
-
-        try {
-            jpaService.execute(bundleAction1GetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("Bundle Action 1 should not have been purged");
-        }
-
-        try {
-            jpaService.execute(bundleAction2GetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("Bundle Action 2 should not have been purged");
-        }
-
-        try {
-            jpaService.execute(bundleAction3GetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("Bundle Action 3 should not have been purged");
-        }
-
-        try {
-            jpaService.execute(bundleAction4GetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("Bundle Action 4 should not have been purged");
-        }
-
-        try {
-            jpaService.execute(bundleAction5GetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("Bundle Action 5 should not have been purged");
-        }
-
-        try {
-            jpaService.execute(coordJob1GetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("Coordinator Job 1 should not have been purged");
-        }
-
-        try {
-            jpaService.execute(coordJob2GetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("Coordinator Job 2 should not have been purged");
-        }
-
-        try {
-            jpaService.execute(coordJob3GetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("Coordinator Job 3 should not have been purged");
-        }
-
-        try {
-            jpaService.execute(coordJob4GetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("Coordinator Job 4 should not have been purged");
-        }
-
-        try {
-            jpaService.execute(coordJob5GetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("Coordinator Job 5 should not have been purged");
-        }
-
-        try {
-            jpaService.execute(coordAction1GetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("Coordinator Action 1 should not have been purged");
-        }
-
-        try {
-            jpaService.execute(coordAction2GetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("Coordinator Action 2 should not have been purged");
-        }
-
-        try {
-            jpaService.execute(coordAction3GetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("Coordinator Action 3 should not have been purged");
-        }
-
-        try {
-            jpaService.execute(coordAction4GetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("Coordinator Action 4 should not have been purged");
-        }
-
-        try {
-            jpaService.execute(coordAction5GetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("Coordinator Action 5 should not have been purged");
-        }
-
-        try {
-            jpaService.execute(wfJob1GetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("Workflow Job 1 should not have been purged");
-        }
-
-        try {
-            jpaService.execute(wfJob2GetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("Workflow Job 2 should not have been purged");
-        }
-
-        try {
-            jpaService.execute(wfJob3GetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("Workflow Job 3 should not have been purged");
-        }
-
-        try {
-            jpaService.execute(wfJob4GetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("Workflow Job 4 should not have been purged");
-        }
-
-        try {
-            jpaService.execute(wfJob5GetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("Workflow Job 5 should not have been purged");
-        }
-
-        try {
-            jpaService.execute(wfAction1GetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("Workflow Action 1 should not have been purged");
-        }
-
-        try {
-            jpaService.execute(wfAction2GetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("Workflow Action 2 should not have been purged");
-        }
-
-        try {
-            jpaService.execute(wfAction3GetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("Workflow Action 3 should not have been purged");
-        }
-
-        try {
-            jpaService.execute(wfAction4GetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("Workflow Action 4 should not have been purged");
-        }
-
-        try {
-            jpaService.execute(wfAction5GetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("Workflow Action 5 should not have been purged");
-        }
-    }
-
-    /**
-     * Test : The workflow and coordinator should get purged, and the bundle parent should get purged --> all will get purged
-     *
-     * @throws Exception
-     */
-    public void testPurgeBundleWithCoordChildWithWFChild3() throws Exception {
-        JPAService jpaService = Services.get().get(JPAService.class);
-        assertNotNull(jpaService);
-
-        BundleJobBean bundleJob = addRecordToBundleJobTable(Job.Status.SUCCEEDED, DateUtils.parseDateOozieTZ("2011-01-01T01:00Z"));
-        CoordinatorJobBean coordJob = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
-        WorkflowJobBean wfJob = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
-        WorkflowActionBean wfAction = addRecordToWfActionTable(wfJob.getId(), "1", WorkflowAction.Status.OK);
-        CoordinatorActionBean coordAction = addRecordToCoordActionTable(coordJob.getId(), 1, CoordinatorAction.Status.SUCCEEDED,
-                "coord-action-get.xml", wfJob.getId(), "SUCCEEDED", 0);
-        BundleActionBean bundleAction = addRecordToBundleActionTable(bundleJob.getId(), coordJob.getId(), coordJob.getAppName(), 0,
-                Job.Status.SUCCEEDED);
-
-        WorkflowJobGetJPAExecutor wfJobGetCmd = new WorkflowJobGetJPAExecutor(wfJob.getId());
-        WorkflowActionGetJPAExecutor wfActionGetCmd = new WorkflowActionGetJPAExecutor(wfAction.getId());
-        CoordJobGetJPAExecutor coordJobGetCmd = new CoordJobGetJPAExecutor(coordJob.getId());
-        CoordActionGetJPAExecutor coordActionGetCmd = new CoordActionGetJPAExecutor(coordAction.getId());
-        BundleJobGetJPAExecutor bundleJobGetCmd = new BundleJobGetJPAExecutor(bundleJob.getId());
-        BundleActionGetJPAExecutor bundleActionGetCmd = new BundleActionGetJPAExecutor(bundleJob.getId(), coordJob.getAppName());
-
-        wfJob = jpaService.execute(wfJobGetCmd);
-        wfAction = jpaService.execute(wfActionGetCmd);
-        coordJob = jpaService.execute(coordJobGetCmd);
-        coordAction = jpaService.execute(coordActionGetCmd);
-        bundleJob = jpaService.execute(bundleJobGetCmd);
-        bundleAction = jpaService.execute(bundleActionGetCmd);
-        assertEquals(WorkflowJob.Status.SUCCEEDED, wfJob.getStatus());
-        assertEquals(WorkflowAction.Status.OK, wfAction.getStatus());
-        assertEquals(CoordinatorJob.Status.SUCCEEDED, coordJob.getStatus());
-        assertEquals(CoordinatorAction.Status.SUCCEEDED, coordAction.getStatus());
-        assertEquals(BundleJobBean.Status.SUCCEEDED, bundleJob.getStatus());
-        assertEquals(BundleJobBean.Status.SUCCEEDED, bundleAction.getStatus());
-
-        new PurgeXCommand(7, 7, 7, 10).call();
-
-        try {
-            jpaService.execute(bundleJobGetCmd);
-            fail("Bundle Job should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0604, je.getErrorCode());
-        }
-
-        try {
-            jpaService.execute(bundleActionGetCmd);
-            fail("Bundle Action should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0605, je.getErrorCode());
-        }
-
-        try {
-            jpaService.execute(coordJobGetCmd);
-            fail("Coordinator Job should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0604, je.getErrorCode());
-        }
-
-        try {
-            jpaService.execute(coordActionGetCmd);
-            fail("Coordinator Action should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0605, je.getErrorCode());
-        }
-
-        try {
-            jpaService.execute(wfJobGetCmd);
-            fail("Workflow Job should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0604, je.getErrorCode());
-        }
-
-        try {
-            jpaService.execute(wfActionGetCmd);
-            fail("Workflow Action should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0605, je.getErrorCode());
-        }
-    }
-
-    /**
-     * Test : The workflow and coordinator should get purged, and the bundle parent should get purged --> all will get purged
-     * There are more coordinator children than the limit
-     *
-     * @throws Exception
-     */
-    public void testPurgeBundleWithCoordChildWithWFChild3MoreThanLimit() throws Exception {
-        JPAService jpaService = Services.get().get(JPAService.class);
-        assertNotNull(jpaService);
-
-        BundleJobBean bundleJob = addRecordToBundleJobTable(Job.Status.SUCCEEDED, DateUtils.parseDateOozieTZ("2011-01-01T01:00Z"));
-        CoordinatorJobBean coordJob1 = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
-        CoordinatorJobBean coordJob2 = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
-        coordJob2.setAppName("coord2");
-        CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, coordJob2);
-        CoordinatorJobBean coordJob3 = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
-        coordJob3.setAppName("coord3");
-        CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, coordJob3);
-        CoordinatorJobBean coordJob4 = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
-        coordJob4.setAppName("coord4");
-        CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, coordJob4);
-        CoordinatorJobBean coordJob5 = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
-        coordJob5.setAppName("coord5");
-        CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, coordJob5);
-        WorkflowJobBean wfJob1 = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
-        WorkflowJobBean wfJob2 = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
-        WorkflowJobBean wfJob3 = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
-        WorkflowJobBean wfJob4 = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
-        WorkflowJobBean wfJob5 = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
-        WorkflowActionBean wfAction1 = addRecordToWfActionTable(wfJob1.getId(), "1", WorkflowAction.Status.OK);
-        WorkflowActionBean wfAction2 = addRecordToWfActionTable(wfJob2.getId(), "1", WorkflowAction.Status.OK);
-        WorkflowActionBean wfAction3 = addRecordToWfActionTable(wfJob3.getId(), "1", WorkflowAction.Status.OK);
-        WorkflowActionBean wfAction4 = addRecordToWfActionTable(wfJob4.getId(), "1", WorkflowAction.Status.OK);
-        WorkflowActionBean wfAction5 = addRecordToWfActionTable(wfJob5.getId(), "1", WorkflowAction.Status.OK);
-        CoordinatorActionBean coordAction1 = addRecordToCoordActionTable(coordJob1.getId(), 1, CoordinatorAction.Status.SUCCEEDED,
-                "coord-action-get.xml", wfJob1.getId(), "SUCCEEDED", 0);
-        CoordinatorActionBean coordAction2 = addRecordToCoordActionTable(coordJob2.getId(), 1, CoordinatorAction.Status.SUCCEEDED,
-                "coord-action-get.xml", wfJob2.getId(), "SUCCEEDED", 0);
-        CoordinatorActionBean coordAction3 = addRecordToCoordActionTable(coordJob3.getId(), 1, CoordinatorAction.Status.SUCCEEDED,
-                "coord-action-get.xml", wfJob3.getId(), "SUCCEEDED", 0);
-        CoordinatorActionBean coordAction4 = addRecordToCoordActionTable(coordJob4.getId(), 1, CoordinatorAction.Status.SUCCEEDED,
-                "coord-action-get.xml", wfJob4.getId(), "SUCCEEDED", 0);
-        CoordinatorActionBean coordAction5 = addRecordToCoordActionTable(coordJob5.getId(), 1, CoordinatorAction.Status.SUCCEEDED,
-                "coord-action-get.xml", wfJob5.getId(), "SUCCEEDED", 0);
-        BundleActionBean bundleAction1 = addRecordToBundleActionTable(bundleJob.getId(), coordJob1.getId(), coordJob1.getAppName(),
-                0, Job.Status.SUCCEEDED);
-        BundleActionBean bundleAction2 = addRecordToBundleActionTable(bundleJob.getId(), coordJob2.getId(), coordJob2.getAppName(),
-                0, Job.Status.SUCCEEDED);
-        BundleActionBean bundleAction3 = addRecordToBundleActionTable(bundleJob.getId(), coordJob3.getId(), coordJob3.getAppName(),
-                0, Job.Status.SUCCEEDED);
-        BundleActionBean bundleAction4 = addRecordToBundleActionTable(bundleJob.getId(), coordJob4.getId(), coordJob4.getAppName(),
-                0, Job.Status.SUCCEEDED);
-        BundleActionBean bundleAction5 = addRecordToBundleActionTable(bundleJob.getId(), coordJob5.getId(), coordJob5.getAppName(),
-                0, Job.Status.SUCCEEDED);
-
-        WorkflowJobGetJPAExecutor wfJob1GetCmd = new WorkflowJobGetJPAExecutor(wfJob1.getId());
-        WorkflowJobGetJPAExecutor wfJob2GetCmd = new WorkflowJobGetJPAExecutor(wfJob2.getId());
-        WorkflowJobGetJPAExecutor wfJob3GetCmd = new WorkflowJobGetJPAExecutor(wfJob3.getId());
-        WorkflowJobGetJPAExecutor wfJob4GetCmd = new WorkflowJobGetJPAExecutor(wfJob4.getId());
-        WorkflowJobGetJPAExecutor wfJob5GetCmd = new WorkflowJobGetJPAExecutor(wfJob5.getId());
-        WorkflowActionGetJPAExecutor wfAction1GetCmd = new WorkflowActionGetJPAExecutor(wfAction1.getId());
-        WorkflowActionGetJPAExecutor wfAction2GetCmd = new WorkflowActionGetJPAExecutor(wfAction2.getId());
-        WorkflowActionGetJPAExecutor wfAction3GetCmd = new WorkflowActionGetJPAExecutor(wfAction3.getId());
-        WorkflowActionGetJPAExecutor wfAction4GetCmd = new WorkflowActionGetJPAExecutor(wfAction4.getId());
-        WorkflowActionGetJPAExecutor wfAction5GetCmd = new WorkflowActionGetJPAExecutor(wfAction5.getId());
-        CoordJobGetJPAExecutor coordJob1GetCmd = new CoordJobGetJPAExecutor(coordJob1.getId());
-        CoordJobGetJPAExecutor coordJob2GetCmd = new CoordJobGetJPAExecutor(coordJob2.getId());
-        CoordJobGetJPAExecutor coordJob3GetCmd = new CoordJobGetJPAExecutor(coordJob3.getId());
-        CoordJobGetJPAExecutor coordJob4GetCmd = new CoordJobGetJPAExecutor(coordJob4.getId());
-        CoordJobGetJPAExecutor coordJob5GetCmd = new CoordJobGetJPAExecutor(coordJob5.getId());
-        CoordActionGetJPAExecutor coordAction1GetCmd = new CoordActionGetJPAExecutor(coordAction1.getId());
-        CoordActionGetJPAExecutor coordAction2GetCmd = new CoordActionGetJPAExecutor(coordAction2.getId());
-        CoordActionGetJPAExecutor coordAction3GetCmd = new CoordActionGetJPAExecutor(coordAction3.getId());
-        CoordActionGetJPAExecutor coordAction4GetCmd = new CoordActionGetJPAExecutor(coordAction4.getId());
-        CoordActionGetJPAExecutor coordAction5GetCmd = new CoordActionGetJPAExecutor(coordAction5.getId());
-        BundleJobGetJPAExecutor bundleJobGetCmd = new BundleJobGetJPAExecutor(bundleJob.getId());
-        BundleActionGetJPAExecutor bundleAction1GetCmd = new BundleActionGetJPAExecutor(bundleJob.getId(), coordJob1.getAppName());
-        BundleActionGetJPAExecutor bundleAction2GetCmd = new BundleActionGetJPAExecutor(bundleJob.getId(), coordJob2.getAppName());
-        BundleActionGetJPAExecutor bundleAction3GetCmd = new BundleActionGetJPAExecutor(bundleJob.getId(), coordJob3.getAppName());
-        BundleActionGetJPAExecutor bundleAction4GetCmd = new BundleActionGetJPAExecutor(bundleJob.getId(), coordJob4.getAppName());
-        BundleActionGetJPAExecutor bundleAction5GetCmd = new BundleActionGetJPAExecutor(bundleJob.getId(), coordJob5.getAppName());
-
-        wfJob1 = jpaService.execute(wfJob1GetCmd);
-        wfJob2 = jpaService.execute(wfJob2GetCmd);
-        wfJob3 = jpaService.execute(wfJob3GetCmd);
-        wfJob4 = jpaService.execute(wfJob4GetCmd);
-        wfJob5 = jpaService.execute(wfJob5GetCmd);
-        wfAction1 = jpaService.execute(wfAction1GetCmd);
-        wfAction2 = jpaService.execute(wfAction2GetCmd);
-        wfAction3 = jpaService.execute(wfAction3GetCmd);
-        wfAction4 = jpaService.execute(wfAction4GetCmd);
-        wfAction5 = jpaService.execute(wfAction5GetCmd);
-        coordJob1 = jpaService.execute(coordJob1GetCmd);
-        coordJob2 = jpaService.execute(coordJob2GetCmd);
-        coordJob3 = jpaService.execute(coordJob3GetCmd);
-        coordJob4 = jpaService.execute(coordJob4GetCmd);
-        coordJob5 = jpaService.execute(coordJob5GetCmd);
-        coordAction1 = jpaService.execute(coordAction1GetCmd);
-        coordAction2 = jpaService.execute(coordAction2GetCmd);
-        coordAction3 = jpaService.execute(coordAction3GetCmd);
-        coordAction4 = jpaService.execute(coordAction4GetCmd);
-        coordAction5 = jpaService.execute(coordAction5GetCmd);
-        bundleJob = jpaService.execute(bundleJobGetCmd);
-        bundleAction1 = jpaService.execute(bundleAction1GetCmd);
-        bundleAction2 = jpaService.execute(bundleAction2GetCmd);
-        bundleAction3 = jpaService.execute(bundleAction3GetCmd);
-        bundleAction4 = jpaService.execute(bundleAction4GetCmd);
-        bundleAction5 = jpaService.execute(bundleAction5GetCmd);
-        assertEquals(WorkflowJob.Status.SUCCEEDED, wfJob1.getStatus());
-        assertEquals(WorkflowJob.Status.SUCCEEDED, wfJob2.getStatus());
-        assertEquals(WorkflowJob.Status.SUCCEEDED, wfJob3.getStatus());
-        assertEquals(WorkflowJob.Status.SUCCEEDED, wfJob4.getStatus());
-        assertEquals(WorkflowJob.Status.SUCCEEDED, wfJob5.getStatus());
-        assertEquals(WorkflowAction.Status.OK, wfAction1.getStatus());
-        assertEquals(WorkflowAction.Status.OK, wfAction2.getStatus());
-        assertEquals(WorkflowAction.Status.OK, wfAction3.getStatus());
-        assertEquals(WorkflowAction.Status.OK, wfAction4.getStatus());
-        assertEquals(WorkflowAction.Status.OK, wfAction5.getStatus());
-        assertEquals(CoordinatorJob.Status.SUCCEEDED, coordJob1.getStatus());
-        assertEquals(CoordinatorJob.Status.SUCCEEDED, coordJob2.getStatus());
-        assertEquals(CoordinatorJob.Status.SUCCEEDED, coordJob3.getStatus());
-        assertEquals(CoordinatorJob.Status.SUCCEEDED, coordJob4.getStatus());
-        assertEquals(CoordinatorJob.Status.SUCCEEDED, coordJob5.getStatus());
-        assertEquals(CoordinatorAction.Status.SUCCEEDED, coordAction1.getStatus());
-        assertEquals(CoordinatorAction.Status.SUCCEEDED, coordAction2.getStatus());
-        assertEquals(CoordinatorAction.Status.SUCCEEDED, coordAction3.getStatus());
-        assertEquals(CoordinatorAction.Status.SUCCEEDED, coordAction4.getStatus());
-        assertEquals(CoordinatorAction.Status.SUCCEEDED, coordAction5.getStatus());
-        assertEquals(BundleJobBean.Status.SUCCEEDED, bundleJob.getStatus());
-        assertEquals(BundleJobBean.Status.SUCCEEDED, bundleAction1.getStatus());
-        assertEquals(BundleJobBean.Status.SUCCEEDED, bundleAction2.getStatus());
-        assertEquals(BundleJobBean.Status.SUCCEEDED, bundleAction3.getStatus());
-        assertEquals(BundleJobBean.Status.SUCCEEDED, bundleAction4.getStatus());
-        assertEquals(BundleJobBean.Status.SUCCEEDED, bundleAction5.getStatus());
-
-        new PurgeXCommand(7, 7, 7, 10).call();
-
-        try {
-            jpaService.execute(bundleJobGetCmd);
-            fail("Bundle Job should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0604, je.getErrorCode());
-        }
-
-        try {
-            jpaService.execute(bundleAction1GetCmd);
-            fail("Bundle Action 1 should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0605, je.getErrorCode());
-        }
-
-        try {
-            jpaService.execute(bundleAction2GetCmd);
-            fail("Bundle Action 2 should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0605, je.getErrorCode());
-        }
-
-        try {
-            jpaService.execute(bundleAction3GetCmd);
-            fail("Bundle Action 3 should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0605, je.getErrorCode());
-        }
-
-        try {
-            jpaService.execute(bundleAction4GetCmd);
-            fail("Bundle Action 4 should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0605, je.getErrorCode());
-        }
-
-        try {
-            jpaService.execute(bundleAction5GetCmd);
-            fail("Bundle Action 5 should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0605, je.getErrorCode());
-        }
-
-        try {
-            jpaService.execute(coordJob1GetCmd);
-            fail("Coordinator Job 1 should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0604, je.getErrorCode());
-        }
-
-        try {
-            jpaService.execute(coordJob2GetCmd);
-            fail("Coordinator Job 2 should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0604, je.getErrorCode());
-        }
-
-        try {
-            jpaService.execute(coordJob3GetCmd);
-            fail("Coordinator Job 3 should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0604, je.getErrorCode());
-        }
-
-        try {
-            jpaService.execute(coordJob4GetCmd);
-            fail("Coordinator Job 4 should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0604, je.getErrorCode());
-        }
-
-        try {
-            jpaService.execute(coordJob5GetCmd);
-            fail("Coordinator Job 5 should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0604, je.getErrorCode());
-        }
-
-        try {
-            jpaService.execute(coordAction1GetCmd);
-            fail("Coordinator Action 1 should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0605, je.getErrorCode());
-        }
-
-        try {
-            jpaService.execute(coordAction2GetCmd);
-            fail("Coordinator Action 2 should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0605, je.getErrorCode());
-        }
-
-        try {
-            jpaService.execute(coordAction3GetCmd);
-            fail("Coordinator Action 3 should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0605, je.getErrorCode());
-        }
-
-        try {
-            jpaService.execute(coordAction4GetCmd);
-            fail("Coordinator Action 4 should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0605, je.getErrorCode());
-        }
-
-        try {
-            jpaService.execute(coordAction5GetCmd);
-            fail("Coordinator Action 5 should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0605, je.getErrorCode());
-        }
-
-        try {
-            jpaService.execute(wfJob1GetCmd);
-            fail("Workflow Job 1 should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0604, je.getErrorCode());
-        }
-
-        try {
-            jpaService.execute(wfJob2GetCmd);
-            fail("Workflow Job 2 should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0604, je.getErrorCode());
-        }
-
-        try {
-            jpaService.execute(wfJob3GetCmd);
-            fail("Workflow Job 3 should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0604, je.getErrorCode());
-        }
-
-        try {
-            jpaService.execute(wfJob4GetCmd);
-            fail("Workflow Job 4 should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0604, je.getErrorCode());
-        }
-
-        try {
-            jpaService.execute(wfJob5GetCmd);
-            fail("Workflow Job 5 should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0604, je.getErrorCode());
-        }
-
-        try {
-            jpaService.execute(wfAction1GetCmd);
-            fail("Workflow Action 1 should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0605, je.getErrorCode());
-        }
-
-        try {
-            jpaService.execute(wfAction2GetCmd);
-            fail("Workflow Action 2 should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0605, je.getErrorCode());
-        }
-
-        try {
-            jpaService.execute(wfAction3GetCmd);
-            fail("Workflow Action 3 should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0605, je.getErrorCode());
-        }
-
-        try {
-            jpaService.execute(wfAction4GetCmd);
-            fail("Workflow Action 4 should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0605, je.getErrorCode());
-        }
-
-        try {
-            jpaService.execute(wfAction5GetCmd);
-            fail("Workflow Action 5 should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0605, je.getErrorCode());
-        }
-    }
-
-    /**
-     * Test : The subworkflow should get purged, but the workflow parent shouldn't get purged --> neither will get purged
-     *
-     * @throws Exception
-     */
-    public void testPurgeWFWithSubWF1() throws Exception {
-        JPAService jpaService = Services.get().get(JPAService.class);
-        assertNotNull(jpaService);
-
-        WorkflowJobBean wfJob = addRecordToWfJobTableForNegCase(WorkflowJob.Status.RUNNING, WorkflowInstance.Status.RUNNING);
-        WorkflowActionBean wfAction = addRecordToWfActionTable(wfJob.getId(), "1", WorkflowAction.Status.OK);
-        WorkflowJobBean subwfJob = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED,
-                wfJob.getId());
-        WorkflowActionBean subwfAction = addRecordToWfActionTable(subwfJob.getId(), "1", WorkflowAction.Status.OK);
-
-        WorkflowJobGetJPAExecutor wfJobGetCmd = new WorkflowJobGetJPAExecutor(wfJob.getId());
-        WorkflowActionGetJPAExecutor wfActionGetCmd = new WorkflowActionGetJPAExecutor(wfAction.getId());
-        WorkflowJobGetJPAExecutor subwfJobGetCmd = new WorkflowJobGetJPAExecutor(subwfJob.getId());
-        WorkflowActionGetJPAExecutor subwfActionGetCmd = new WorkflowActionGetJPAExecutor(subwfAction.getId());
-
-        wfJob = jpaService.execute(wfJobGetCmd);
-        wfAction = jpaService.execute(wfActionGetCmd);
-        subwfJob = jpaService.execute(subwfJobGetCmd);
-        subwfAction = jpaService.execute(subwfActionGetCmd);
-        assertEquals(WorkflowJob.Status.RUNNING, wfJob.getStatus());
-        assertEquals(WorkflowAction.Status.OK, wfAction.getStatus());
-        assertEquals(WorkflowJob.Status.SUCCEEDED, subwfJob.getStatus());
-        assertEquals(WorkflowAction.Status.OK, subwfAction.getStatus());
-
-        new PurgeXCommand(7, 1, 1, 10).call();
-
-        try {
-            jpaService.execute(wfJobGetCmd);
-        }
-        catch (JPAExecutorException je) {
-            je.printStackTrace();
-            fail("Workflow Job should not have been purged");
-        }
-
-        try {
-            jpaService.execute(wfActionGetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("Workflow Action should not have been purged");
-        }
-
-        try {
-            jpaService.execute(subwfJobGetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("SubWorkflow Job should not have been purged");
-        }
-
-        try {
-            jpaService.execute(subwfActionGetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("SubWorkflow Action should not have been purged");
-        }
-    }
-
-    /**
-     * Test : The subworkflow shouldn't get purged, but the workflow parent should get purged --> neither will get purged
-     *
-     * @throws Exception
-     */
-    public void testPurgeWFWithSubWF2() throws Exception {
-        JPAService jpaService = Services.get().get(JPAService.class);
-        assertNotNull(jpaService);
-
-        WorkflowJobBean wfJob = addRecordToWfJobTableForNegCase(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
-        WorkflowActionBean wfAction = addRecordToWfActionTable(wfJob.getId(), "1", WorkflowAction.Status.OK);
-        WorkflowJobBean subwfJob = addRecordToWfJobTable(WorkflowJob.Status.RUNNING, WorkflowInstance.Status.RUNNING,
-                wfJob.getId());
-        WorkflowActionBean subwfAction = addRecordToWfActionTable(subwfJob.getId(), "1", WorkflowAction.Status.RUNNING);
-
-        WorkflowJobGetJPAExecutor wfJobGetCmd = new WorkflowJobGetJPAExecutor(wfJob.getId());
-        WorkflowActionGetJPAExecutor wfActionGetCmd = new WorkflowActionGetJPAExecutor(wfAction.getId());
-        WorkflowJobGetJPAExecutor subwfJobGetCmd = new WorkflowJobGetJPAExecutor(subwfJob.getId());
-        WorkflowActionGetJPAExecutor subwfActionGetCmd = new WorkflowActionGetJPAExecutor(subwfAction.getId());
-
-        wfJob = jpaService.execute(wfJobGetCmd);
-        wfAction = jpaService.execute(wfActionGetCmd);
-        subwfJob = jpaService.execute(subwfJobGetCmd);
-        subwfAction = jpaService.execute(subwfActionGetCmd);
-        assertEquals(WorkflowJob.Status.SUCCEEDED, wfJob.getStatus());
-        assertEquals(WorkflowAction.Status.OK, wfAction.getStatus());
-        assertEquals(WorkflowJob.Status.RUNNING, subwfJob.getStatus());
-        assertEquals(WorkflowAction.Status.RUNNING, subwfAction.getStatus());
-
-        new PurgeXCommand(7, 1, 1, 10).call();
-
-        try {
-            jpaService.execute(wfJobGetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("Workflow Job should not have been purged");
-        }
-
-        try {
-            jpaService.execute(wfActionGetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("Workflow Action should not have been purged");
-        }
-
-        try {
-            jpaService.execute(subwfJobGetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("SubWorkflow Job should not have been purged");
-        }
-
-        try {
-            jpaService.execute(subwfActionGetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("SubWorkflow Action should not have been purged");
-        }
-    }
+        assertWorkflowJobNotPurged(wfJob);
+        assertWorkflowActionNotPurged(wfAction);
+        assertWorkflowJobNotPurged(subwfJob);
+        assertWorkflowActionNotPurged(subwfAction);
+    }
 
     /**
      * Test : The subworkflows shouldn't get purged, but the workflow parent should get purged --> none will get purged
      * There are more subworkflow children than the limit
      *
-     * @throws Exception
-     */
-    public void testPurgeWFWithSubWF2MoreThanLimit() throws Exception {
-        JPAService jpaService = Services.get().get(JPAService.class);
-        assertNotNull(jpaService);
-
-        WorkflowJobBean wfJob = addRecordToWfJobTableForNegCase(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
-        WorkflowActionBean wfAction1 = addRecordToWfActionTable(wfJob.getId(), "1", WorkflowAction.Status.OK);
-        WorkflowActionBean wfAction2 = addRecordToWfActionTable(wfJob.getId(), "2", WorkflowAction.Status.OK);
-        WorkflowActionBean wfAction3 = addRecordToWfActionTable(wfJob.getId(), "3", WorkflowAction.Status.OK);
-        WorkflowActionBean wfAction4 = addRecordToWfActionTable(wfJob.getId(), "4", WorkflowAction.Status.OK);
-        WorkflowActionBean wfAction5 = addRecordToWfActionTable(wfJob.getId(), "5", WorkflowAction.Status.OK);
-        WorkflowJobBean subwfJob1 = addRecordToWfJobTable(WorkflowJob.Status.RUNNING, WorkflowInstance.Status.RUNNING,
-                wfJob.getId());
-        WorkflowJobBean subwfJob2 = addRecordToWfJobTable(WorkflowJob.Status.RUNNING, WorkflowInstance.Status.RUNNING,
-                wfJob.getId());
-        WorkflowJobBean subwfJob3 = addRecordToWfJobTable(WorkflowJob.Status.RUNNING, WorkflowInstance.Status.RUNNING,
-                wfJob.getId());
-        WorkflowJobBean subwfJob4 = addRecordToWfJobTable(WorkflowJob.Status.RUNNING, WorkflowInstance.Status.RUNNING,
-                wfJob.getId());
-        WorkflowJobBean subwfJob5 = addRecordToWfJobTable(WorkflowJob.Status.RUNNING, WorkflowInstance.Status.RUNNING,
-                wfJob.getId());
-        WorkflowActionBean subwfAction1 = addRecordToWfActionTable(subwfJob1.getId(), "1", WorkflowAction.Status.RUNNING);
-        WorkflowActionBean subwfAction2 = addRecordToWfActionTable(subwfJob2.getId(), "1", WorkflowAction.Status.RUNNING);
-        WorkflowActionBean subwfAction3 = addRecordToWfActionTable(subwfJob3.getId(), "1", WorkflowAction.Status.RUNNING);
-        WorkflowActionBean subwfAction4 = addRecordToWfActionTable(subwfJob4.getId(), "1", WorkflowAction.Status.RUNNING);
-        WorkflowActionBean subwfAction5 = addRecordToWfActionTable(subwfJob5.getId(), "1", WorkflowAction.Status.RUNNING);
-
-        WorkflowJobGetJPAExecutor wfJobGetCmd = new WorkflowJobGetJPAExecutor(wfJob.getId());
-        WorkflowActionGetJPAExecutor wfAction1GetCmd = new WorkflowActionGetJPAExecutor(wfAction1.getId());
-        WorkflowActionGetJPAExecutor wfAction2GetCmd = new WorkflowActionGetJPAExecutor(wfAction2.getId());
-        WorkflowActionGetJPAExecutor wfAction3GetCmd = new WorkflowActionGetJPAExecutor(wfAction3.getId());
-        WorkflowActionGetJPAExecutor wfAction4GetCmd = new WorkflowActionGetJPAExecutor(wfAction4.getId());
-        WorkflowActionGetJPAExecutor wfAction5GetCmd = new WorkflowActionGetJPAExecutor(wfAction5.getId());
-        WorkflowJobGetJPAExecutor subwfJob1GetCmd = new WorkflowJobGetJPAExecutor(subwfJob1.getId());
-        WorkflowJobGetJPAExecutor subwfJob2GetCmd = new WorkflowJobGetJPAExecutor(subwfJob2.getId());
-        WorkflowJobGetJPAExecutor subwfJob3GetCmd = new WorkflowJobGetJPAExecutor(subwfJob3.getId());
-        WorkflowJobGetJPAExecutor subwfJob4GetCmd = new WorkflowJobGetJPAExecutor(subwfJob4.getId());
-        WorkflowJobGetJPAExecutor subwfJob5GetCmd = new WorkflowJobGetJPAExecutor(subwfJob5.getId());
-        WorkflowActionGetJPAExecutor subwfAction1GetCmd = new WorkflowActionGetJPAExecutor(subwfAction1.getId());
-        WorkflowActionGetJPAExecutor subwfAction2GetCmd = new WorkflowActionGetJPAExecutor(subwfAction2.getId());
-        WorkflowActionGetJPAExecutor subwfAction3GetCmd = new WorkflowActionGetJPAExecutor(subwfAction3.getId());
-        WorkflowActionGetJPAExecutor subwfAction4GetCmd = new WorkflowActionGetJPAExecutor(subwfAction4.getId());
-        WorkflowActionGetJPAExecutor subwfAction5GetCmd = new WorkflowActionGetJPAExecutor(subwfAction5.getId());
-
-        wfJob = jpaService.execute(wfJobGetCmd);
-        wfAction1 = jpaService.execute(wfAction1GetCmd);
-        wfAction2 = jpaService.execute(wfAction2GetCmd);
-        wfAction3 = jpaService.execute(wfAction3GetCmd);
-        wfAction4 = jpaService.execute(wfAction4GetCmd);
-        wfAction5 = jpaService.execute(wfAction5GetCmd);
-        subwfJob1 = jpaService.execute(subwfJob1GetCmd);
-        subwfJob2 = jpaService.execute(subwfJob2GetCmd);
-        subwfJob3 = jpaService.execute(subwfJob3GetCmd);
-        subwfJob4 = jpaService.execute(subwfJob4GetCmd);
-        subwfJob5 = jpaService.execute(subwfJob5GetCmd);
-        subwfAction1 = jpaService.execute(subwfAction1GetCmd);
-        subwfAction2 = jpaService.execute(subwfAction2GetCmd);
-        subwfAction3 = jpaService.execute(subwfAction3GetCmd);
-        subwfAction4 = jpaService.execute(subwfAction4GetCmd);
-        subwfAction5 = jpaService.execute(subwfAction5GetCmd);
-        assertEquals(WorkflowJob.Status.SUCCEEDED, wfJob.getStatus());
-        assertEquals(WorkflowAction.Status.OK, wfAction1.getStatus());
-        assertEquals(WorkflowAction.Status.OK, wfAction2.getStatus());
-        assertEquals(WorkflowAction.Status.OK, wfAction3.getStatus());
-        assertEquals(WorkflowAction.Status.OK, wfAction4.getStatus());
-        assertEquals(WorkflowAction.Status.OK, wfAction5.getStatus());
-        assertEquals(WorkflowJob.Status.RUNNING, subwfJob1.getStatus());
-        assertEquals(WorkflowJob.Status.RUNNING, subwfJob2.getStatus());
-        assertEquals(WorkflowJob.Status.RUNNING, subwfJob3.getStatus());
-        assertEquals(WorkflowJob.Status.RUNNING, subwfJob4.getStatus());
-        assertEquals(WorkflowJob.Status.RUNNING, subwfJob5.getStatus());
-        assertEquals(WorkflowAction.Status.RUNNING, subwfAction1.getStatus());
-        assertEquals(WorkflowAction.Status.RUNNING, subwfAction2.getStatus());
-        assertEquals(WorkflowAction.Status.RUNNING, subwfAction3.getStatus());
-        assertEquals(WorkflowAction.Status.RUNNING, subwfAction4.getStatus());
-        assertEquals(WorkflowAction.Status.RUNNING, subwfAction5.getStatus());
-
-        new PurgeXCommand(7, 1, 1, 10).call();
-
-        try {
-            jpaService.execute(wfJobGetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("Workflow Job should not have been purged");
-        }
-
-        try {
-            jpaService.execute(wfAction1GetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("Workflow Action 1 should not have been purged");
-        }
-
-        try {
-            jpaService.execute(wfAction2GetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("Workflow Action 2 should not have been purged");
-        }
-
-        try {
-            jpaService.execute(wfAction3GetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("Workflow Action 3 should not have been purged");
-        }
-
-        try {
-            jpaService.execute(wfAction4GetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("Workflow Action 4 should not have been purged");
-        }
-
-        try {
-            jpaService.execute(wfAction5GetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("Workflow Action 5 should not have been purged");
-        }
-
-        try {
-            jpaService.execute(subwfJob1GetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("SubWorkflow Job 1 should not have been purged");
-        }
-
-        try {
-            jpaService.execute(subwfJob2GetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("SubWorkflow Job 2 should not have been purged");
-        }
-
-        try {
-            jpaService.execute(subwfJob3GetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("SubWorkflow Job 3 should not have been purged");
-        }
-
-        try {
-            jpaService.execute(subwfJob4GetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("SubWorkflow Job 4 should not have been purged");
-        }
-
-        try {
-            jpaService.execute(subwfJob5GetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("SubWorkflow Job 5 should not have been purged");
-        }
-
-        try {
-            jpaService.execute(subwfAction1GetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("SubWorkflow Action 1 should not have been purged");
-        }
-
-        try {
-            jpaService.execute(subwfAction2GetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("SubWorkflow Action 2 should not have been purged");
-        }
-
-        try {
-            jpaService.execute(subwfAction3GetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("SubWorkflow Action 3 should not have been purged");
-        }
-
-        try {
-            jpaService.execute(subwfAction4GetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("SubWorkflow Action 4 should not have been purged");
-        }
-
-        try {
-            jpaService.execute(subwfAction5GetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("SubWorkflow Action 5 should not have been purged");
-        }
-    }
-
-    /**
-     * Test : The subworkflow should get purged, and the workflow parent should get purged --> both will get purged
-     *
-     * @throws Exception
-     */
-    public void testPurgeWFWithSubWF3() throws Exception {
-        JPAService jpaService = Services.get().get(JPAService.class);
-        assertNotNull(jpaService);
-
-        WorkflowJobBean wfJob = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
-        WorkflowActionBean wfAction = addRecordToWfActionTable(wfJob.getId(), "1", WorkflowAction.Status.OK);
-        WorkflowJobBean subwfJob = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED,
-                wfJob.getId());
-        WorkflowActionBean subwfAction = addRecordToWfActionTable(subwfJob.getId(), "1", WorkflowAction.Status.OK);
-
-        WorkflowJobGetJPAExecutor wfJobGetCmd = new WorkflowJobGetJPAExecutor(wfJob.getId());
-        WorkflowActionGetJPAExecutor wfActionGetCmd = new WorkflowActionGetJPAExecutor(wfAction.getId());
-        WorkflowJobGetJPAExecutor subwfJobGetCmd = new WorkflowJobGetJPAExecutor(subwfJob.getId());
-        WorkflowActionGetJPAExecutor subwfActionGetCmd = new WorkflowActionGetJPAExecutor(subwfAction.getId());
-
-        wfJob = jpaService.execute(wfJobGetCmd);
-        wfAction = jpaService.execute(wfActionGetCmd);
-        subwfJob = jpaService.execute(subwfJobGetCmd);
-        subwfAction = jpaService.execute(subwfActionGetCmd);
-        assertEquals(WorkflowJob.Status.SUCCEEDED, wfJob.getStatus());
-        assertEquals(WorkflowAction.Status.OK, wfAction.getStatus());
-        assertEquals(WorkflowJob.Status.SUCCEEDED, subwfJob.getStatus());
-        assertEquals(WorkflowAction.Status.OK, subwfAction.getStatus());
-
-        new PurgeXCommand(7, 1, 1, 10).call();
-
-        try {
-            jpaService.execute(wfJobGetCmd);
-            fail("Workflow Job should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0604, je.getErrorCode());
-        }
-
-        try {
-            jpaService.execute(wfActionGetCmd);
-            fail("Workflow Action should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0605, je.getErrorCode());
-        }
-
-        try {
-            jpaService.execute(subwfJobGetCmd);
-            fail("SubWorkflow Job should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0604, je.getErrorCode());
-        }
-
-        try {
-            jpaService.execute(subwfActionGetCmd);
-            fail("SubWorkflow Action should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0605, je.getErrorCode());
-        }
-    }
-
-    /**
-     * Test : The subworkflow should get purged, and the workflow parent should get purged --> both will get purged
-     * There are more subworkflow children than the limit
-     *
-     * @throws Exception
-     */
-    public void testPurgeWFWithSubWF3MoreThanLimit() throws Exception {
-        JPAService jpaService = Services.get().get(JPAService.class);
-        assertNotNull(jpaService);
-
-        WorkflowJobBean wfJob = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
-        WorkflowActionBean wfAction1 = addRecordToWfActionTable(wfJob.getId(), "1", WorkflowAction.Status.OK);
-        WorkflowActionBean wfAction2 = addRecordToWfActionTable(wfJob.getId(), "2", WorkflowAction.Status.OK);
-        WorkflowActionBean wfAction3 = addRecordToWfActionTable(wfJob.getId(), "3", WorkflowAction.Status.OK);
-        WorkflowActionBean wfAction4 = addRecordToWfActionTable(wfJob.getId(), "4", WorkflowAction.Status.OK);
-        WorkflowActionBean wfAction5 = addRecordToWfActionTable(wfJob.getId(), "5", WorkflowAction.Status.OK);
-        WorkflowJobBean subwfJob1 = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED,
-                wfJob.getId());
-        WorkflowJobBean subwfJob2 = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED,
-                wfJob.getId());
-        WorkflowJobBean subwfJob3 = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED,
-                wfJob.getId());
-        WorkflowJobBean subwfJob4 = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED,
-                wfJob.getId());
-        WorkflowJobBean subwfJob5 = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED,
-                wfJob.getId());
-        WorkflowActionBean subwfAction1 = addRecordToWfActionTable(subwfJob1.getId(), "1", WorkflowAction.Status.OK);
-        WorkflowActionBean subwfAction2 = addRecordToWfActionTable(subwfJob2.getId(), "1", WorkflowAction.Status.OK);
-        WorkflowActionBean subwfAction3 = addRecordToWfActionTable(subwfJob3.getId(), "1", WorkflowAction.Status.OK);
-        WorkflowActionBean subwfAction4 = addRecordToWfActionTable(subwfJob4.getId(), "1", WorkflowAction.Status.OK);
-        WorkflowActionBean subwfAction5 = addRecordToWfActionTable(subwfJob5.getId(), "1", WorkflowAction.Status.OK);
-
-        WorkflowJobGetJPAExecutor wfJobGetCmd = new WorkflowJobGetJPAExecutor(wfJob.getId());
-        WorkflowActionGetJPAExecutor wfAction1GetCmd = new WorkflowActionGetJPAExecutor(wfAction1.getId());
-        WorkflowActionGetJPAExecutor wfAction2GetCmd = new WorkflowActionGetJPAExecutor(wfAction2.getId());
-        WorkflowActionGetJPAExecutor wfAction3GetCmd = new WorkflowActionGetJPAExecutor(wfAction3.getId());
-        WorkflowActionGetJPAExecutor wfAction4GetCmd = new WorkflowActionGetJPAExecutor(wfAction4.getId());
-        WorkflowActionGetJPAExecutor wfAction5GetCmd = new WorkflowActionGetJPAExecutor(wfAction5.getId());
-        WorkflowJobGetJPAExecutor subwfJob1GetCmd = new WorkflowJobGetJPAExecutor(subwfJob1.getId());
-        WorkflowJobGetJPAExecutor subwfJob2GetCmd = new WorkflowJobGetJPAExecutor(subwfJob2.getId());
-        WorkflowJobGetJPAExecutor subwfJob3GetCmd = new WorkflowJobGetJPAExecutor(subwfJob3.getId());
-        WorkflowJobGetJPAExecutor subwfJob4GetCmd = new WorkflowJobGetJPAExecutor(subwfJob4.getId());
-        WorkflowJobGetJPAExecutor subwfJob5GetCmd = new WorkflowJobGetJPAExecutor(subwfJob5.getId());
-        WorkflowActionGetJPAExecutor subwfAction1GetCmd = new WorkflowActionGetJPAExecutor(subwfAction1.getId());
-        WorkflowActionGetJPAExecutor subwfAction2GetCmd = new WorkflowActionGetJPAExecutor(subwfAction2.getId());
-        WorkflowActionGetJPAExecutor subwfAction3GetCmd = new WorkflowActionGetJPAExecutor(subwfAction3.getId());
-        WorkflowActionGetJPAExecutor subwfAction4GetCmd = new WorkflowActionGetJPAExecutor(subwfAction4.getId());
-        WorkflowActionGetJPAExecutor subwfAction5GetCmd = new WorkflowActionGetJPAExecutor(subwfAction5.getId());
-
-        wfJob = jpaService.execute(wfJobGetCmd);
-        wfAction1 = jpaService.execute(wfAction1GetCmd);
-        wfAction2 = jpaService.execute(wfAction2GetCmd);
-        wfAction3 = jpaService.execute(wfAction3GetCmd);
-        wfAction4 = jpaService.execute(wfAction4GetCmd);
-        wfAction5 = jpaService.execute(wfAction5GetCmd);
-        subwfJob1 = jpaService.execute(subwfJob1GetCmd);
-        subwfJob2 = jpaService.execute(subwfJob2GetCmd);
-        subwfJob3 = jpaService.execute(subwfJob3GetCmd);
-        subwfJob4 = jpaService.execute(subwfJob4GetCmd);
-        subwfJob5 = jpaService.execute(subwfJob5GetCmd);
-        subwfAction1 = jpaService.execute(subwfAction1GetCmd);
-        subwfAction2 = jpaService.execute(subwfAction2GetCmd);
-        subwfAction3 = jpaService.execute(subwfAction3GetCmd);
-        subwfAction4 = jpaService.execute(subwfAction4GetCmd);
-        subwfAction5 = jpaService.execute(subwfAction5GetCmd);
-        assertEquals(WorkflowJob.Status.SUCCEEDED, wfJob.getStatus());
-        assertEquals(WorkflowAction.Status.OK, wfAction1.getStatus());
-        assertEquals(WorkflowAction.Status.OK, wfAction2.getStatus());
-        assertEquals(WorkflowAction.Status.OK, wfAction3.getStatus());
-        assertEquals(WorkflowAction.Status.OK, wfAction4.getStatus());
-        assertEquals(WorkflowAction.Status.OK, wfAction5.getStatus());
-        assertEquals(WorkflowJob.Status.SUCCEEDED, subwfJob1.getStatus());
-        assertEquals(WorkflowJob.Status.SUCCEEDED, subwfJob2.getStatus());
-        assertEquals(WorkflowJob.Status.SUCCEEDED, subwfJob3.getStatus());
-        assertEquals(WorkflowJob.Status.SUCCEEDED, subwfJob4.getStatus());
-        assertEquals(WorkflowJob.Status.SUCCEEDED, subwfJob5.getStatus());
-        assertEquals(WorkflowAction.Status.OK, subwfAction1.getStatus());
-        assertEquals(WorkflowAction.Status.OK, subwfAction2.getStatus());
-        assertEquals(WorkflowAction.Status.OK, subwfAction3.getStatus());
-        assertEquals(WorkflowAction.Status.OK, subwfAction4.getStatus());
-        assertEquals(WorkflowAction.Status.OK, subwfAction5.getStatus());
-
-        new PurgeXCommand(7, 1, 1, 3).call();
-
-        try {
-            jpaService.execute(wfJobGetCmd);
-            fail("Workflow Job should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0604, je.getErrorCode());
-        }
-
-        try {
-            jpaService.execute(wfAction1GetCmd);
-            fail("Workflow Action 1 should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0605, je.getErrorCode());
-        }
-
-        try {
-            jpaService.execute(wfAction2GetCmd);
-            fail("Workflow Action 2 should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0605, je.getErrorCode());
-        }
-
-        try {
-            jpaService.execute(wfAction3GetCmd);
-            fail("Workflow Action 3 should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0605, je.getErrorCode());
-        }
-
-        try {
-            jpaService.execute(wfAction4GetCmd);
-            fail("Workflow Action 4 should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0605, je.getErrorCode());
-        }
-
-        try {
-            jpaService.execute(wfAction5GetCmd);
-            fail("Workflow Action 5 should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0605, je.getErrorCode());
-        }
-
-        try {
-            jpaService.execute(subwfJob1GetCmd);
-            fail("SubWorkflow Job 1 should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0604, je.getErrorCode());
-        }
-
-        try {
-            jpaService.execute(subwfJob2GetCmd);
-            fail("SubWorkflow Job 2 should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0604, je.getErrorCode());
-        }
-
-        try {
-            jpaService.execute(subwfJob3GetCmd);
-            fail("SubWorkflow Job 3 should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0604, je.getErrorCode());
-        }
-
-        try {
-            jpaService.execute(subwfJob4GetCmd);
-            fail("SubWorkflow Job 4 should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0604, je.getErrorCode());
-        }
-
-        try {
-            jpaService.execute(subwfJob5GetCmd);
-            fail("SubWorkflow Job 5 should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0604, je.getErrorCode());
-        }
-
-        try {
-            jpaService.execute(subwfAction1GetCmd);
-            fail("SubWorkflow Action 1 should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0605, je.getErrorCode());
-        }
-
-        try {
-            jpaService.execute(subwfAction2GetCmd);
-            fail("SubWorkflow Action 2 should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0605, je.getErrorCode());
-        }
+     * @throws Exception if cannot insert records to the database
+     */
+    public void testSucceededWorkflowOverTheLimitRunningSubWorkflows() throws Exception {
+        WorkflowJobBean wfJob = addRecordToWfJobTableForNegCase(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
+        WorkflowActionBean[] wfActions = new WorkflowActionBean[TEST_CHILD_NUM];
+        WorkflowJobBean[] subwfJobs = new WorkflowJobBean[TEST_CHILD_NUM];
+        WorkflowActionBean[] subwfActions = new WorkflowActionBean[TEST_CHILD_NUM];
+        for (int i=0; i<TEST_CHILD_NUM; ++i) {
+            wfActions[i] = addRecordToWfActionTable(wfJob.getId(), String.format("action%d",i), WorkflowAction.Status.OK);
+            subwfJobs[i] = addRecordToWfJobTable(WorkflowJob.Status.RUNNING, WorkflowInstance.Status.RUNNING,
+                    wfJob.getId());
+            subwfActions[i] = addRecordToWfActionTable(subwfJobs[i].getId(), String.format("action%d",i),
+                    WorkflowAction.Status.RUNNING);
+        }
+
+        purgeWithDefaultParameters();
+
+        assertWorkflowJobNotPurged(wfJob);
+        assertWorkflowActionsNotPurged(wfActions);
+        assertWorkflowJobsNotPurged(subwfJobs);
+        assertWorkflowActionsNotPurged(subwfActions);
+    }
 
-        try {
-            jpaService.execute(subwfAction3GetCmd);
-            fail("SubWorkflow Action 3 should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0605, je.getErrorCode());
-        }
+    /**
+     * Test : The subworkflow should get purged, and the workflow parent should get purged --> both will get purged
+     *
+     * @throws Exception if cannot insert records to the database
+     */
+    public void testSucceededWorkflowSucceededSubWorkflow() throws Exception {
+        WorkflowJobBean wfJob = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
+        WorkflowActionBean wfAction = addRecordToWfActionTable(wfJob.getId(), "1", WorkflowAction.Status.OK);
+        WorkflowJobBean subwfJob = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED,
+                wfJob.getId());
+        WorkflowActionBean subwfAction = addRecordToWfActionTable(subwfJob.getId(), "1", WorkflowAction.Status.OK);
 
-        try {
-            jpaService.execute(subwfAction4GetCmd);
-            fail("SubWorkflow Action 4 should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0605, je.getErrorCode());
-        }
+        purgeWithDefaultParameters();
 
-        try {
-            jpaService.execute(subwfAction5GetCmd);
-            fail("SubWorkflow Action 5 should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0605, je.getErrorCode());
-        }
+        assertWorkflowJobPurged(wfJob);
+        assertWorkflowActionPurged(wfAction);
+        assertWorkflowJobPurged(subwfJob);
+        assertWorkflowActionPurged(subwfAction);
+    }
+
+    /**
+     * Test : The subworkflow should get purged, and the workflow parent should get purged --> both will get purged
+     * There are more subworkflow children than the limit
+     *
+     * @throws Exception if cannot insert records to the database
+     */
+    public void testPurgeableWorkflowOverTheLimitSucceededSubWorkflows() throws Exception {
+        WorkflowJobBean wfJob = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
+        WorkflowActionBean[] wfActions = new WorkflowActionBean[TEST_CHILD_NUM];
+        WorkflowJobBean[] subwfJobs = new WorkflowJobBean[TEST_CHILD_NUM];
+        WorkflowActionBean[] subwfActions = new WorkflowActionBean[TEST_CHILD_NUM];
+        for (int i=0; i<TEST_CHILD_NUM; ++i) {
+            wfActions[i] = addRecordToWfActionTable(wfJob.getId(), String.format("action%d", i), WorkflowAction.Status.OK);
+            subwfJobs[i] = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED,
+                    wfJob.getId());
+            subwfActions[i] = addRecordToWfActionTable(subwfJobs[i].getId(), "1", WorkflowAction.Status.OK);
+        }
+
+        purgeWithSpecialParameters(LIMIT_3_ITEMS);
+
+        assertWorkflowJobPurged(wfJob);
+        assertWorkflowActionsPurged(wfActions);
+        assertWorkflowJobsPurged(subwfJobs);
+        assertWorkflowActionsPurged(subwfActions);
     }
 
     /**
@@ -2868,7 +740,7 @@ public class TestPurgeXCommand extends XDataTestCase {
      *
      * @throws Exception if unable to create workflow job or action bean
      */
-    public void testPurgeWFWithPurgeableSubWFNonPurgeableSubSubWF() throws Exception {
+    public void testPurgeableWorkflowPurgeableSubWorkflowUnpurgeableSubSubWorkflow() throws Exception {
         WorkflowJobBean wfJob = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
         WorkflowActionBean wfAction = addRecordToWfActionTable(wfJob.getId(), "1", WorkflowAction.Status.OK);
         WorkflowJobBean subwfJob = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED,
@@ -2878,18 +750,14 @@ public class TestPurgeXCommand extends XDataTestCase {
                 subwfJob.getId());
         WorkflowActionBean subsubwfAction = addRecordToWfActionTable(subsubwfJob.getId(), "1", WorkflowAction.Status.RUNNING);
 
-        final int wfOlderThanDays = 7;
-        final int coordOlderThanDays = 1;
-        final int bundleOlderThanDays = 1;
-        final int limit = 10;
-        new PurgeXCommand(wfOlderThanDays, coordOlderThanDays, bundleOlderThanDays, limit).call();
-
-        assertWorkflowNotPurged(wfJob.getId());
-        assertWorkflowActionNotPurged(wfAction.getId());
-        assertWorkflowNotPurged(subwfJob.getId());
-        assertWorkflowActionNotPurged(subwfAction.getId());
-        assertWorkflowNotPurged(subsubwfJob.getId());
-        assertWorkflowActionNotPurged(subsubwfAction.getId());
+        purgeWithDefaultParameters();
+
+        assertWorkflowJobNotPurged(wfJob);
+        assertWorkflowActionNotPurged(wfAction);
+        assertWorkflowJobNotPurged(subwfJob);
+        assertWorkflowActionNotPurged(subwfAction);
+        assertWorkflowJobNotPurged(subsubwfJob);
+        assertWorkflowActionNotPurged(subsubwfAction);
     }
 
     /**
@@ -2899,7 +767,7 @@ public class TestPurgeXCommand extends XDataTestCase {
      *
      * @throws Exception if unable to create workflow job or action bean
      */
-    public void testPurgeWFWithNonPurgeableSubWFPurgeableSubSubWF() throws Exception {
+    public void testPurgeableWorkflowUnpurgeableSubWorkflowPurgeableSubSubWorkflow() throws Exception {
         WorkflowJobBean wfJob = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
         WorkflowActionBean wfAction = addRecordToWfActionTable(wfJob.getId(), "1", WorkflowAction.Status.OK);
         WorkflowJobBean subwfJob = addRecordToWfJobTable(WorkflowJob.Status.RUNNING, WorkflowInstance.Status.RUNNING,
@@ -2909,18 +777,14 @@ public class TestPurgeXCommand extends XDataTestCase {
                 subwfJob.getId());
         WorkflowActionBean subsubwfAction = addRecordToWfActionTable(subsubwfJob.getId(), "1", WorkflowAction.Status.OK);
 
-        final int wfOlderThanDays = 7;
-        final int coordOlderThanDays = 1;
-        final int bundleOlderThanDays = 1;
-        final int limit = 10;
-        new PurgeXCommand(wfOlderThanDays, coordOlderThanDays, bundleOlderThanDays, limit).call();
-
-        assertWorkflowNotPurged(wfJob.getId());
-        assertWorkflowActionNotPurged(wfAction.getId());
-        assertWorkflowNotPurged(subwfJob.getId());
-        assertWorkflowActionNotPurged(subwfAction.getId());
-        assertWorkflowNotPurged(subsubwfJob.getId());
-        assertWorkflowActionNotPurged(subsubwfAction.getId());
+        purgeWithDefaultParameters();
+
+        assertWorkflowJobNotPurged(wfJob);
+        assertWorkflowActionNotPurged(wfAction);
+        assertWorkflowJobNotPurged(subwfJob);
+        assertWorkflowActionNotPurged(subwfAction);
+        assertWorkflowJobNotPurged(subsubwfJob);
+        assertWorkflowActionNotPurged(subsubwfAction);
     }
 
     /**
@@ -2930,7 +794,7 @@ public class TestPurgeXCommand extends XDataTestCase {
      *
      * @throws Exception if unable to create workflow job or action bean
      */
-    public void testPurgeWFWithPurgeableSubWFPurgeableSubSubWF() throws Exception {
+    public void testPurgeableWorkflowPurgeableSubWorkflowPurgeableSubSubWorkflow() throws Exception {
         WorkflowJobBean wfJob = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
         WorkflowActionBean wfAction = addRecordToWfActionTable(wfJob.getId(), "1", WorkflowAction.Status.OK);
         WorkflowJobBean subwfJob = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED,
@@ -2943,130 +807,49 @@ public class TestPurgeXCommand extends XDataTestCase {
         WorkflowActionBean subsub1wfAction = addRecordToWfActionTable(subsub1wfJob.getId(), "1", WorkflowAction.Status.OK);
         WorkflowActionBean subsub2wfAction = addRecordToWfActionTable(subsub2wfJob.getId(), "1", WorkflowAction.Status.OK);
 
-        final int wfOlderThanDays = 7;
-        final int coordOlderThanDays = 1;
-        final int bundleOlderThanDays = 1;
-        final int limit = 10;
-        new PurgeXCommand(wfOlderThanDays, coordOlderThanDays, bundleOlderThanDays, limit).call();
-
-        assertWorkflowPurged(wfJob.getId());
-        assertWorkflowActionPurged(wfAction.getId());
-        assertWorkflowPurged(subwfJob.getId());
-        assertWorkflowActionPurged(subwfAction.getId());
-        assertWorkflowPurged(subsub1wfJob.getId());
-        assertWorkflowPurged(subsub2wfJob.getId());
-        assertWorkflowActionPurged(subsub1wfAction.getId());
-        assertWorkflowActionPurged(subsub2wfAction.getId());
-    }
-
-    private void assertWorkflowNotPurged(String workflowId) {
-        try {
-            JPAExecutor jpaExecutor = new WorkflowJobGetJPAExecutor(workflowId);
-            jpaService.execute(jpaExecutor);
-        } catch (JPAExecutorException je) {
-            fail("Workflow job "+workflowId+" should not have been purged");
-        }
-    }
-
-    private void assertWorkflowPurged(String workflowId) {
-        try {
-            JPAExecutor jpaExecutor = new WorkflowJobGetJPAExecutor(workflowId);
-            jpaService.execute(jpaExecutor);
-            fail("Workflow job "+workflowId+" should have been purged");
-        } catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0604, je.getErrorCode());
-        }
-    }
-
-    private void assertWorkflowActionNotPurged(String workflowActionId) {
-        try {
-            JPAExecutor jpaExecutor = new WorkflowActionGetJPAExecutor(workflowActionId);
-            jpaService.execute(jpaExecutor);
-        } catch (JPAExecutorException je) {
-            fail("Workflow action "+workflowActionId+" should not have been purged");
-        }
-    }
+        purgeWithDefaultParameters();
 
-    private void assertWorkflowActionPurged(String workflowActionId) {
-        try {
-            JPAExecutor jpaExecutor = new WorkflowActionGetJPAExecutor(workflowActionId);
-            jpaService.execute(jpaExecutor);
-            fail("Workflow job "+workflowActionId+" should have been purged");
-        } catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0605, je.getErrorCode());
-        }
+        assertWorkflowJobPurged(wfJob);
+        assertWorkflowActionPurged(wfAction);
+        assertWorkflowJobPurged(subwfJob);
+        assertWorkflowActionPurged(subwfAction);
+        assertWorkflowJobPurged(subsub1wfJob);
+        assertWorkflowJobPurged(subsub2wfJob);
+        assertWorkflowActionPurged(subsub1wfAction);
+        assertWorkflowActionPurged(subsub2wfAction);
     }
 
     /**
      * Test : The subworkflow should get purged, and the workflow parent should get purged --> both will get purged
      * Subworkflow has terminated, last modified time is known, but end time is null
      *
-     * @throws Exception
+     * @throws Exception if cannot insert records to the database
      */
-    public void testPurgeWFWithEndedSubWFWithNullEndTimeValidLastModifiedTime() throws Exception {
-        final JPAService jpaService = Services.get().get(JPAService.class);
-        assertNotNull(jpaService);
-
+    public void testPurgeableWorkflowPurgeableSubWorkflowWithNullEndTimeValidLastModifiedTime() throws Exception {
         WorkflowJobBean wfJob = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
         WorkflowActionBean wfAction1 = addRecordToWfActionTable(wfJob.getId(), "1", WorkflowAction.Status.OK);
-
         WorkflowJobBean subwfJob1 = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED,
                 wfJob.getId());
+        subwfJob1.setLastModifiedTime(wfJob.getEndTime());
         subwfJob1.setEndTime(null);
-        WorkflowActionBean subwfAction1 = addRecordToWfActionTable(subwfJob1.getId(), "1", WorkflowAction.Status.OK);
-
-        final WorkflowJobGetJPAExecutor wfJobGetCmd = new WorkflowJobGetJPAExecutor(wfJob.getId());
-        final WorkflowActionGetJPAExecutor wfAction1GetCmd = new WorkflowActionGetJPAExecutor(wfAction1.getId());
-        final WorkflowJobGetJPAExecutor subwfJob1GetCmd = new WorkflowJobGetJPAExecutor(subwfJob1.getId());
-        final WorkflowActionGetJPAExecutor subwfAction1GetCmd = new WorkflowActionGetJPAExecutor(subwfAction1.getId());
-
-        wfJob = jpaService.execute(wfJobGetCmd);
-        wfAction1 = jpaService.execute(wfAction1GetCmd);
-        subwfJob1 = jpaService.execute(subwfJob1GetCmd);
-        subwfAction1 = jpaService.execute(subwfAction1GetCmd);
-
-        assertEquals(WorkflowJob.Status.SUCCEEDED, wfJob.getStatus());
-        assertEquals(WorkflowAction.Status.OK, wfAction1.getStatus());
-        assertEquals(WorkflowJob.Status.SUCCEEDED, subwfJob1.getStatus());
-        assertEquals(WorkflowAction.Status.OK, subwfAction1.getStatus());
-
         final QueryExecutor<WorkflowJobBean, WorkflowJobQueryExecutor.WorkflowJobQuery> workflowJobQueryExecutor =
                 WorkflowJobQueryExecutor.getInstance();
         workflowJobQueryExecutor.executeUpdate(WorkflowJobQuery.UPDATE_WORKFLOW, subwfJob1);
 
-        final int wfOlderThanDays = 7;
-        final int coordOlderThanDays = 1;
-        final int bundleOlderThanDays = 1;
-        final int limit = 3;
-        new PurgeXCommand(wfOlderThanDays, coordOlderThanDays, bundleOlderThanDays, limit).call();
-
-        try {
-            jpaService.execute(wfJobGetCmd);
-            fail("Workflow Job should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0604, je.getErrorCode());
-        }
+        purgeWithSpecialParameters(LIMIT_3_ITEMS);
 
-        try {
-            jpaService.execute(subwfJob1GetCmd);
-            fail("SubWorkflow Job 1 should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0604, je.getErrorCode());
-        }
+        assertWorkflowJobPurged(wfJob);
+        assertWorkflowActionPurged(wfAction1);
+        assertWorkflowJobPurged(subwfJob1);
     }
 
     /**
      * Test : The subworkflow and workflow should get purged, but the coordinator parent shouldn't get purged --> none will get
      * purged
      *
-     * @throws Exception
+     * @throws Exception if cannot insert records to the database
      */
-    public void testPurgeCoordWithWFChildWithSubWF1() throws Exception {
-        JPAService jpaService = Services.get().get(JPAService.class);
-        assertNotNull(jpaService);
-
+    public void testUnpurgeableCoordinatorPurgeableWorkflowPurgeableSubWorkflow() throws Exception {
         CoordinatorJobBean coordJob = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
         WorkflowJobBean wfJob = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
         WorkflowActionBean wfAction = addRecordToWfActionTable(wfJob.getId(), "1", WorkflowAction.Status.OK);
@@ -3076,81 +859,23 @@ public class TestPurgeXCommand extends XDataTestCase {
         CoordinatorActionBean coordAction = addRecordToCoordActionTable(coordJob.getId(), 1, CoordinatorAction.Status.SUCCEEDED,
                 "coord-action-get.xml", wfJob.getId(), "SUCCEEDED", 0);
 
-        WorkflowJobGetJPAExecutor wfJobGetCmd = new WorkflowJobGetJPAExecutor(wfJob.getId());
-        WorkflowActionGetJPAExecutor wfActionGetCmd = new WorkflowActionGetJPAExecutor(wfAction.getId());
-        WorkflowJobGetJPAExecutor subwfJobGetCmd = new WorkflowJobGetJPAExecutor(subwfJob.getId());
-        WorkflowActionGetJPAExecutor subwfActionGetCmd = new WorkflowActionGetJPAExecutor(subwfAction.getId());
-        CoordJobGetJPAExecutor coordJobGetCmd = new CoordJobGetJPAExecutor(coordJob.getId());
-        CoordActionGetJPAExecutor coordActionGetCmd = new CoordActionGetJPAExecutor(coordAction.getId());
-
-        wfJob = jpaService.execute(wfJobGetCmd);
-        wfAction = jpaService.execute(wfActionGetCmd);
-        subwfJob = jpaService.execute(subwfJobGetCmd);
-        subwfAction = jpaService.execute(subwfActionGetCmd);
-        coordJob = jpaService.execute(coordJobGetCmd);
-        coordAction = jpaService.execute(coordActionGetCmd);
-        assertEquals(WorkflowJob.Status.SUCCEEDED, wfJob.getStatus());
-        assertEquals(WorkflowAction.Status.OK, wfAction.getStatus());
-        assertEquals(WorkflowJob.Status.SUCCEEDED, subwfJob.getStatus());
-        assertEquals(WorkflowAction.Status.OK, subwfAction.getStatus());
-        assertEquals(CoordinatorJob.Status.SUCCEEDED, coordJob.getStatus());
-        assertEquals(CoordinatorAction.Status.SUCCEEDED, coordAction.getStatus());
-
-        new PurgeXCommand(7, getNumDaysToNotBePurged(coordJob.getLastModifiedTime()), 1, 10).call();
-
-        try {
-            jpaService.execute(coordJobGetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("Coordinator Job should not have been purged");
-        }
-
-        try {
-            jpaService.execute(coordActionGetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("Coordinator Action should not have been purged");
-        }
-
-        try {
-            jpaService.execute(wfJobGetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("Workflow Job should not have been purged");
-        }
-
-        try {
-            jpaService.execute(wfActionGetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("Workflow Action should not have been purged");
-        }
-
-        try {
-            jpaService.execute(subwfJobGetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("SubWorkflow Job should not have been purged");
-        }
+        purgeWithSpecialParameters(coordJob);
 
-        try {
-            jpaService.execute(subwfActionGetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("SubWorkflow Action should not have been purged");
-        }
+        assertCoordinatorJobNotPurged(coordJob);
+        assertCoordinatorActionNotPurged(coordAction);
+        assertWorkflowJobNotPurged(wfJob);
+        assertWorkflowActionNotPurged(wfAction);
+        assertWorkflowJobNotPurged(subwfJob);
+        assertWorkflowActionNotPurged(subwfAction);
     }
 
     /**
      * Test : The subworkflow and workflow should not get purged, but the coordinator parent should get purged --> none will get
      * purged
      *
-     * @throws Exception
+     * @throws Exception if cannot insert records to the database
      */
-    public void testPurgeCoordWithWFChildWithSubWF2() throws Exception {
-        JPAService jpaService = Services.get().get(JPAService.class);
-        assertNotNull(jpaService);
-
+    public void testPurgeableCoordinatorUnpurgeableWorkflowPurgeableSubWorkflow() throws Exception {
         CoordinatorJobBean coordJob = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
         WorkflowJobBean wfJob = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
         WorkflowActionBean wfAction = addRecordToWfActionTable(wfJob.getId(), "1", WorkflowAction.Status.OK);
@@ -3160,81 +885,23 @@ public class TestPurgeXCommand extends XDataTestCase {
         CoordinatorActionBean coordAction = addRecordToCoordActionTable(coordJob.getId(), 1, CoordinatorAction.Status.SUCCEEDED,
                 "coord-action-get.xml", wfJob.getId(), "SUCCEEDED", 0);
 
-        WorkflowJobGetJPAExecutor wfJobGetCmd = new WorkflowJobGetJPAExecutor(wfJob.getId());
-        WorkflowActionGetJPAExecutor wfActionGetCmd = new WorkflowActionGetJPAExecutor(wfAction.getId());
-        WorkflowJobGetJPAExecutor subwfJobGetCmd = new WorkflowJobGetJPAExecutor(subwfJob.getId());
-        WorkflowActionGetJPAExecutor subwfActionGetCmd = new WorkflowActionGetJPAExecutor(subwfAction.getId());
-        CoordJobGetJPAExecutor coordJobGetCmd = new CoordJobGetJPAExecutor(coordJob.getId());
-        CoordActionGetJPAExecutor coordActionGetCmd = new CoordActionGetJPAExecutor(coordAction.getId());
-
-        wfJob = jpaService.execute(wfJobGetCmd);
-        wfAction = jpaService.execute(wfActionGetCmd);
-        subwfJob = jpaService.execute(subwfJobGetCmd);
-        subwfAction = jpaService.execute(subwfActionGetCmd);
-        coordJob = jpaService.execute(coordJobGetCmd);
-        coordAction = jpaService.execute(coordActionGetCmd);
-        assertEquals(WorkflowJob.Status.SUCCEEDED, wfJob.getStatus());
-        assertEquals(WorkflowAction.Status.OK, wfAction.getStatus());
-        assertEquals(WorkflowJob.Status.SUCCEEDED, subwfJob.getStatus());
-        assertEquals(WorkflowAction.Status.OK, subwfAction.getStatus());
-        assertEquals(CoordinatorJob.Status.SUCCEEDED, coordJob.getStatus());
-        assertEquals(CoordinatorAction.Status.SUCCEEDED, coordAction.getStatus());
-
-        new PurgeXCommand(getNumDaysToNotBePurged(wfJob.getEndTime()), 7, 1, 10).call();
-
-        try {
-            jpaService.execute(coordJobGetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("Coordinator Job should not have been purged");
-        }
-
-        try {
-            jpaService.execute(coordActionGetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("Coordinator Action should not have been purged");
-        }
-
-        try {
-            jpaService.execute(wfJobGetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("Workflow Job should not have been purged");
-        }
-
-        try {
-            jpaService.execute(wfActionGetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("Workflow Action should not have been purged");
-        }
-
-        try {
-            jpaService.execute(subwfJobGetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("SubWorkflow Job should not have been purged");
-        }
+        purgeWithSpecialParameters(wfJob);
 
-        try {
-            jpaService.execute(subwfActionGetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("SubWorkflow Action should not have been purged");
-        }
+        assertCoordinatorJobNotPurged(coordJob);
+        assertCoordinatorActionNotPurged(coordAction);
+        assertWorkflowJobNotPurged(wfJob);
+        assertWorkflowActionNotPurged(wfAction);
+        assertWorkflowJobNotPurged(subwfJob);
+        assertWorkflowActionNotPurged(subwfAction);
     }
 
     /**
      * Test : The subworkflow and workflow should get purged, and the coordinator parent should get purged --> all will get
      * purged
      *
-     * @throws Exception
+     * @throws Exception if cannot insert records to the database
      */
-    public void testPurgeCoordWithWFChildWithSubWF3() throws Exception {
-        JPAService jpaService = Services.get().get(JPAService.class);
-        assertNotNull(jpaService);
-
+    public void testPurgeableCoordinatorPurgeableWorkflowPurgeableSubWorkflow() throws Exception {
         CoordinatorJobBean coordJob = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
         WorkflowJobBean wfJob = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
         WorkflowActionBean wfAction = addRecordToWfActionTable(wfJob.getId(), "1", WorkflowAction.Status.OK);
@@ -3244,75 +911,14 @@ public class TestPurgeXCommand extends XDataTestCase {
         CoordinatorActionBean coordAction = addRecordToCoordActionTable(coordJob.getId(), 1, CoordinatorAction.Status.SUCCEEDED,
                 "coord-action-get.xml", wfJob.getId(), "SUCCEEDED", 0);
 
-        final WorkflowJobGetJPAExecutor wfJobGetCmd = new WorkflowJobGetJPAExecutor(wfJob.getId());
-        final WorkflowActionGetJPAExecutor wfActionGetCmd = new WorkflowActionGetJPAExecutor(wfAction.getId());
-        final WorkflowJobGetJPAExecutor subwfJobGetCmd = new WorkflowJobGetJPAExecutor(subwfJob.getId());
-        final WorkflowActionGetJPAExecutor subwfActionGetCmd = new WorkflowActionGetJPAExecutor(subwfAction.getId());
-        final CoordJobGetJPAExecutor coordJobGetCmd = new CoordJobGetJPAExecutor(coordJob.getId());
-        final CoordActionGetJPAExecutor coordActionGetCmd = new CoordActionGetJPAExecutor(coordAction.getId());
-
-        wfJob = jpaService.execute(wfJobGetCmd);
-        wfAction = jpaService.execute(wfActionGetCmd);
-        subwfJob = jpaService.execute(subwfJobGetCmd);
-        subwfAction = jpaService.execute(subwfActionGetCmd);
-        coordJob = jpaService.execute(coordJobGetCmd);
-        coordAction = jpaService.execute(coordActionGetCmd);
-        assertEquals(WorkflowJob.Status.SUCCEEDED, wfJob.getStatus());
-        assertEquals(WorkflowAction.Status.OK, wfAction.getStatus());
-        assertEquals(WorkflowJob.Status.SUCCEEDED, subwfJob.getStatus());
-        assertEquals(WorkflowAction.Status.OK, subwfAction.getStatus());
-        assertEquals(CoordinatorJob.Status.SUCCEEDED, coordJob.getStatus());
-        assertEquals(CoordinatorAction.Status.SUCCEEDED, coordAction.getStatus());
-
-        new PurgeXCommand(7, 7, 1, 10).call();
-
-        try {
-            jpaService.execute(coordJobGetCmd);
-            fail("Coordinator Job should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0604, je.getErrorCode());
-        }
-
-        try {
-            jpaService.execute(coordActionGetCmd);
-            fail("Coordinator Action should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0605, je.getErrorCode());
-        }
-
-        try {
-            jpaService.execute(wfJobGetCmd);
-            fail("Workflow Job should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0604, je.getErrorCode());
-        }
-
-        try {
-            jpaService.execute(wfActionGetCmd);
-            fail("Workflow Action should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0605, je.getErrorCode());
-        }
-
-        try {
-            jpaService.execute(subwfJobGetCmd);
-            fail("SubWorkflow Job should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0604, je.getErrorCode());
-        }
+        purgeWithDefaultParameters();
 
-        try {
-            jpaService.execute(subwfActionGetCmd);
-            fail("SubWorkflow Action should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0605, je.getErrorCode());
-        }
+        assertCoordinatorJobPurged(coordJob);
+        assertCoordinatorActionPurged(coordAction);
+        assertWorkflowJobPurged(wfJob);
+        assertWorkflowActionPurged(wfAction);
+        assertWorkflowJobPurged(subwfJob);
+        assertWorkflowActionPurged(subwfAction);
     }
 
 
@@ -3323,12 +929,10 @@ public class TestPurgeXCommand extends XDataTestCase {
      * Coordinator parent finished Workflow and its subworkflow have terminated, last modified time is known, but end time is null
      * for workflow and subworkflow
      *
-     * @throws Exception
+     * @throws Exception if cannot insert records to the database
      */
-    public void testPurgeCoordWithWFChildWithSubWFNullEndTimeValidLastModifiedTime() throws Exception {
-        final JPAService jpaService = Services.get().get(JPAService.class);
-        assertNotNull(jpaService);
-
+    public void testPurgeableCoordinatorPurgeableWorkflowPurgeableSubWorkflowWithNullEndTimeValidLastModifiedTime()
+            throws Exception {
         CoordinatorJobBean coordJob = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
 
         final QueryExecutor<WorkflowJobBean, WorkflowJobQueryExecutor.WorkflowJobQuery> workflowJobQueryExecutor =
@@ -3349,198 +953,54 @@ public class TestPurgeXCommand extends XDataTestCase {
         CoordinatorActionBean coordAction = addRecordToCoordActionTable(coordJob.getId(), 1, CoordinatorAction.Status.SUCCEEDED,
                 "coord-action-get.xml", wfJob.getId(), "SUCCEEDED", 0);
 
-        final WorkflowJobGetJPAExecutor wfJobGetCmd = new WorkflowJobGetJPAExecutor(wfJob.getId());
-        final WorkflowActionGetJPAExecutor wfActionGetCmd = new WorkflowActionGetJPAExecutor(wfAction.getId());
-        final WorkflowJobGetJPAExecutor subwfJobGetCmd = new WorkflowJobGetJPAExecutor(subwfJob.getId());
-        final WorkflowActionGetJPAExecutor subwfActionGetCmd = new WorkflowActionGetJPAExecutor(subwfAction.getId());
-        final CoordJobGetJPAExecutor coordJobGetCmd = new CoordJobGetJPAExecutor(coordJob.getId());
-        final CoordActionGetJPAExecutor coordActionGetCmd = new CoordActionGetJPAExecutor(coordAction.getId());
-
-        wfJob = jpaService.execute(wfJobGetCmd);
-        wfAction = jpaService.execute(wfActionGetCmd);
-        subwfJob = jpaService.execute(subwfJobGetCmd);
-        subwfAction = jpaService.execute(subwfActionGetCmd);
-        coordJob = jpaService.execute(coordJobGetCmd);
-        coordAction = jpaService.execute(coordActionGetCmd);
-        assertEquals(WorkflowJob.Status.SUCCEEDED, wfJob.getStatus());
-        assertEquals(WorkflowAction.Status.OK, wfAction.getStatus());
-        assertEquals(WorkflowJob.Status.SUCCEEDED, subwfJob.getStatus());
-        assertEquals(WorkflowAction.Status.OK, subwfAction.getStatus());
-        assertEquals(CoordinatorJob.Status.SUCCEEDED, coordJob.getStatus());
-        assertEquals(CoordinatorAction.Status.SUCCEEDED, coordAction.getStatus());
-
-        final int wfOlderThanDays = 7;
-        final int coordOlderThanDays = 7;
-        final int bundleOlderThanDays = 1;
-        final int limit = 10;
-        new PurgeXCommand(wfOlderThanDays, coordOlderThanDays, bundleOlderThanDays, limit).call();
-
-        try {
-            jpaService.execute(coordJobGetCmd);
-            fail("Coordinator Job should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0604, je.getErrorCode());
-        }
-
-        try {
-            jpaService.execute(coordActionGetCmd);
-            fail("Coordinator Action should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0605, je.getErrorCode());
-        }
-
-        try {
-            jpaService.execute(wfJobGetCmd);
-            fail("Workflow Job should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0604, je.getErrorCode());
-        }
-
-        try {
-            jpaService.execute(wfActionGetCmd);
-            fail("Workflow Action should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0605, je.getErrorCode());
-        }
-
-        try {
-            jpaService.execute(subwfJobGetCmd);
-            fail("SubWorkflow Job should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0604, je.getErrorCode());
-        }
+        purgeWithDefaultParameters();
 
-        try {
-            jpaService.execute(subwfActionGetCmd);
-            fail("SubWorkflow Action should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0605, je.getErrorCode());
-        }
+        assertCoordinatorJobPurged(coordJob);
+        assertCoordinatorActionPurged(coordAction);
+        assertWorkflowJobPurged(wfJob);
+        assertWorkflowActionPurged(wfAction);
+        assertWorkflowJobPurged(subwfJob);
+        assertWorkflowActionPurged(subwfAction);
     }
 
     /**
      * Test : The subworkflow, workflow, and coordinator should get purged, but the bundle parent shouldn't get purged --> none will
      * get purged
      *
-     * @throws Exception
+     * @throws Exception if cannot insert records to the database
      */
-    public void testPurgeBundleWithCoordChildWithWFChildWithSubWF1() throws Exception {
-        JPAService jpaService = Services.get().get(JPAService.class);
-        assertNotNull(jpaService);
-
+    public void testUnpurgeableBundlePurgeableCoordiatorPurgeableWorkflowPurgeableSubWorkflow() throws Exception {
         BundleJobBean bundleJob = addRecordToBundleJobTable(Job.Status.SUCCEEDED, DateUtils.parseDateOozieTZ("2011-01-01T01:00Z"));
         CoordinatorJobBean coordJob = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
         WorkflowJobBean wfJob = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
         WorkflowActionBean wfAction = addRecordToWfActionTable(wfJob.getId(), "1", WorkflowAction.Status.OK);
         WorkflowJobBean subwfJob = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED,
-                wfJob.getId());
-        WorkflowActionBean subwfAction = addRecordToWfActionTable(subwfJob.getId(), "1", WorkflowAction.Status.OK);
-        CoordinatorActionBean coordAction = addRecordToCoordActionTable(coordJob.getId(), 1, CoordinatorAction.Status.SUCCEEDED,
-                "coord-action-get.xml", wfJob.getId(), "SUCCEEDED", 0);
-        BundleActionBean bundleAction = addRecordToBundleActionTable(bundleJob.getId(), coordJob.getId(), coordJob.getAppName(), 0,
-                Job.Status.SUCCEEDED);
-
-        WorkflowJobGetJPAExecutor wfJobGetCmd = new WorkflowJobGetJPAExecutor(wfJob.getId());
-        WorkflowActionGetJPAExecutor wfActionGetCmd = new WorkflowActionGetJPAExecutor(wfAction.getId());
-        WorkflowJobGetJPAExecutor subwfJobGetCmd = new WorkflowJobGetJPAExecutor(subwfJob.getId());
-        WorkflowActionGetJPAExecutor subwfActionGetCmd = new WorkflowActionGetJPAExecutor(subwfAction.getId());
-        CoordJobGetJPAExecutor coordJobGetCmd = new CoordJobGetJPAExecutor(coordJob.getId());
-        CoordActionGetJPAExecutor coordActionGetCmd = new CoordActionGetJPAExecutor(coordAction.getId());
-        BundleJobGetJPAExecutor bundleJobGetCmd = new BundleJobGetJPAExecutor(bundleJob.getId());
-        BundleActionGetJPAExecutor bundleActionGetCmd = new BundleActionGetJPAExecutor(bundleJob.getId(), coordJob.getAppName());
-
-        wfJob = jpaService.execute(wfJobGetCmd);
-        wfAction = jpaService.execute(wfActionGetCmd);
-        subwfJob = jpaService.execute(subwfJobGetCmd);
-        subwfAction = jpaService.execute(subwfActionGetCmd);
-        coordJob = jpaService.execute(coordJobGetCmd);
-        coordAction = jpaService.execute(coordActionGetCmd);
-        bundleJob = jpaService.execute(bundleJobGetCmd);
-        bundleAction = jpaService.execute(bundleActionGetCmd);
-        assertEquals(WorkflowJob.Status.SUCCEEDED, wfJob.getStatus());
-        assertEquals(WorkflowAction.Status.OK, wfAction.getStatus());
-        assertEquals(WorkflowJob.Status.SUCCEEDED, subwfJob.getStatus());
-        assertEquals(WorkflowAction.Status.OK, subwfAction.getStatus());
-        assertEquals(CoordinatorJob.Status.SUCCEEDED, coordJob.getStatus());
-        assertEquals(CoordinatorAction.Status.SUCCEEDED, coordAction.getStatus());
-        assertEquals(BundleJobBean.Status.SUCCEEDED, bundleJob.getStatus());
-        assertEquals(BundleJobBean.Status.SUCCEEDED, bundleAction.getStatus());
-
-        new PurgeXCommand(7, 7, getNumDaysToNotBePurged(bundleJob.getLastModifiedTime()), 10).call();
-
-        try {
-            jpaService.execute(bundleJobGetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("Bundle Job should not have been purged");
-        }
-
-        try {
-            jpaService.execute(bundleActionGetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("Bundle Action should not have been purged");
-        }
-
-        try {
-            jpaService.execute(coordJobGetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("Coordinator Job should not have been purged");
-        }
-
-        try {
-            jpaService.execute(coordActionGetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("Coordinator Action should not have been purged");
-        }
-
-        try {
-            jpaService.execute(wfJobGetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("Workflow Job should not have been purged");
-        }
-
-        try {
-            jpaService.execute(wfActionGetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("Workflow Action should not have been purged");
-        }
+                wfJob.getId());
+        WorkflowActionBean subwfAction = addRecordToWfActionTable(subwfJob.getId(), "1", WorkflowAction.Status.OK);
+        CoordinatorActionBean coordAction = addRecordToCoordActionTable(coordJob.getId(), 1, CoordinatorAction.Status.SUCCEEDED,
+                "coord-action-get.xml", wfJob.getId(), "SUCCEEDED", 0);
+        BundleActionBean bundleAction = addRecordToBundleActionTable(bundleJob.getId(), coordJob.getId(), coordJob.getAppName(), 0,
+                Job.Status.SUCCEEDED);
 
-        try {
-            jpaService.execute(subwfJobGetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("SubWorkflow Job should not have been purged");
-        }
+        purgeWithSpecialParameters(bundleJob);
 
-        try {
-            jpaService.execute(subwfActionGetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("SubWorkflow Action should not have been purged");
-        }
+        assertBundleJobNotPurged(bundleJob);
+        assertBundleActionNotPurged(bundleAction);
+        assertCoordinatorJobNotPurged(coordJob);
+        assertCoordinatorActionNotPurged(coordAction);
+        assertWorkflowJobNotPurged(wfJob);
+        assertWorkflowActionNotPurged(wfAction);
+        assertWorkflowJobNotPurged(subwfJob);
+        assertWorkflowActionNotPurged(subwfAction);
     }
 
     /**
      * Test : The subworkflow, workflow, and coordinator should not get purged, but the bundle parent should get purged --> none
      * will get purged
      *
-     * @throws Exception
+     * @throws Exception if cannot insert records to the database
      */
-    public void testPurgeBundleWithCoordChildWithWFChildWithSubWF2() throws Exception {
-        JPAService jpaService = Services.get().get(JPAService.class);
-        assertNotNull(jpaService);
-
+    public void testPurgeableBundleUnpurgeableCoordinatorUnpurgebleWorkflowPurgeableSubWorkflow() throws Exception {
         BundleJobBean bundleJob = addRecordToBundleJobTable(Job.Status.SUCCEEDED, DateUtils.parseDateOozieTZ("2011-01-01T01:00Z"));
         CoordinatorJobBean coordJob = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
         WorkflowJobBean wfJob = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
@@ -3553,102 +1013,25 @@ public class TestPurgeXCommand extends XDataTestCase {
         BundleActionBean bundleAction = addRecordToBundleActionTable(bundleJob.getId(), coordJob.getId(), coordJob.getAppName(), 0,
                 Job.Status.SUCCEEDED);
 
-        WorkflowJobGetJPAExecutor wfJobGetCmd = new WorkflowJobGetJPAExecutor(wfJob.getId());
-        WorkflowActionGetJPAExecutor wfActionGetCmd = new WorkflowActionGetJPAExecutor(wfAction.getId());
-        WorkflowJobGetJPAExecutor subwfJobGetCmd = new WorkflowJobGetJPAExecutor(subwfJob.getId());
-        WorkflowActionGetJPAExecutor subwfActionGetCmd = new WorkflowActionGetJPAExecutor(subwfAction.getId());
-        CoordJobGetJPAExecutor coordJobGetCmd = new CoordJobGetJPAExecutor(coordJob.getId());
-        CoordActionGetJPAExecutor coordActionGetCmd = new CoordActionGetJPAExecutor(coordAction.getId());
-        BundleJobGetJPAExecutor bundleJobGetCmd = new BundleJobGetJPAExecutor(bundleJob.getId());
-        BundleActionGetJPAExecutor bundleActionGetCmd = new BundleActionGetJPAExecutor(bundleJob.getId(), coordJob.getAppName());
-
-        wfJob = jpaService.execute(wfJobGetCmd);
-        wfAction = jpaService.execute(wfActionGetCmd);
-        subwfJob = jpaService.execute(subwfJobGetCmd);
-        subwfAction = jpaService.execute(subwfActionGetCmd);
-        coordJob = jpaService.execute(coordJobGetCmd);
-        coordAction = jpaService.execute(coordActionGetCmd);
-        bundleJob = jpaService.execute(bundleJobGetCmd);
-        bundleAction = jpaService.execute(bundleActionGetCmd);
-        assertEquals(WorkflowJob.Status.SUCCEEDED, wfJob.getStatus());
-        assertEquals(WorkflowAction.Status.OK, wfAction.getStatus());
-        assertEquals(WorkflowJob.Status.SUCCEEDED, subwfJob.getStatus());
-        assertEquals(WorkflowAction.Status.OK, subwfAction.getStatus());
-        assertEquals(CoordinatorJob.Status.SUCCEEDED, coordJob.getStatus());
-        assertEquals(CoordinatorAction.Status.SUCCEEDED, coordAction.getStatus());
-        assertEquals(BundleJobBean.Status.SUCCEEDED, bundleJob.getStatus());
-        assertEquals(BundleJobBean.Status.SUCCEEDED, bundleAction.getStatus());
-
-        new PurgeXCommand(getNumDaysToNotBePurged(wfJob.getEndTime()),
-                getNumDaysToNotBePurged(coordJob.getLastModifiedTime()), 7, 10).call();
-
-        try {
-            jpaService.execute(bundleJobGetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("Bundle Job should not have been purged");
-        }
-
-        try {
-            jpaService.execute(bundleActionGetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("Bundle Action should not have been purged");
-        }
-
-        try {
-            jpaService.execute(coordJobGetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("Coordinator Job should not have been purged");
-        }
-
-        try {
-            jpaService.execute(coordActionGetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("Coordinator Action should not have been purged");
-        }
-
-        try {
-            jpaService.execute(wfJobGetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("Workflow Job should not have been purged");
-        }
-
-        try {
-            jpaService.execute(wfActionGetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("Workflow Action should not have been purged");
-        }
-
-        try {
-            jpaService.execute(subwfJobGetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("SubWorkflow Job should not have been purged");
-        }
+        purgeWithSpecialParameters(wfJob, coordJob);
 
-        try {
-            jpaService.execute(subwfActionGetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("SubWorkflow Action should not have been purged");
-        }
+        assertBundleJobNotPurged(bundleJob);
+        assertBundleActionNotPurged(bundleAction);
+        assertCoordinatorJobNotPurged(coordJob);
+        assertCoordinatorActionNotPurged(coordAction);
+        assertWorkflowJobNotPurged(wfJob);
+        assertWorkflowActionNotPurged(wfAction);
+        assertWorkflowJobNotPurged(subwfJob);
+        assertWorkflowActionNotPurged(subwfAction);
     }
 
     /**
      * Test : The subworkflow, workflow, and coordinator should get purged, and the bundle parent should get purged --> all
      * will get purged
      *
-     * @throws Exception
+     * @throws Exception if cannot insert records to the database
      */
-    public void testPurgeBundleWithCoordChildWithWFChildWithSubWF3() throws Exception {
-        JPAService jpaService = Services.get().get(JPAService.class);
-        assertNotNull(jpaService);
-
+    public void testPurgeableBundlePurgeableCoordinatorPurgeableWorkflowPurgeableSubWorkflow() throws Exception {
         BundleJobBean bundleJob = addRecordToBundleJobTable(Job.Status.SUCCEEDED, DateUtils.parseDateOozieTZ("2011-01-01T01:00Z"));
         CoordinatorJobBean coordJob = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
         WorkflowJobBean wfJob = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
@@ -3661,106 +1044,24 @@ public class TestPurgeXCommand extends XDataTestCase {
         BundleActionBean bundleAction = addRecordToBundleActionTable(bundleJob.getId(), coordJob.getId(), coordJob.getAppName(), 0,
                 Job.Status.SUCCEEDED);
 
-        WorkflowJobGetJPAExecutor wfJobGetCmd = new WorkflowJobGetJPAExecutor(wfJob.getId());
-        WorkflowActionGetJPAExecutor wfActionGetCmd = new WorkflowActionGetJPAExecutor(wfAction.getId());
-        WorkflowJobGetJPAExecutor subwfJobGetCmd = new WorkflowJobGetJPAExecutor(subwfJob.getId());
-        WorkflowActionGetJPAExecutor subwfActionGetCmd = new WorkflowActionGetJPAExecutor(subwfAction.getId());
-        CoordJobGetJPAExecutor coordJobGetCmd = new CoordJobGetJPAExecutor(coordJob.getId());
-        CoordActionGetJPAExecutor coordActionGetCmd = new CoordActionGetJPAExecutor(coordAction.getId());
-        BundleJobGetJPAExecutor bundleJobGetCmd = new BundleJobGetJPAExecutor(bundleJob.getId());
-        BundleActionGetJPAExecutor bundleActionGetCmd = new BundleActionGetJPAExecutor(bundleJob.getId(), coordJob.getAppName());
-
-        wfJob = jpaService.execute(wfJobGetCmd);
-        wfAction = jpaService.execute(wfActionGetCmd);
-        subwfJob = jpaService.execute(subwfJobGetCmd);
-        subwfAction = jpaService.execute(subwfActionGetCmd);
-        coordJob = jpaService.execute(coordJobGetCmd);
-        coordAction = jpaService.execute(coordActionGetCmd);
-        bundleJob = jpaService.execute(bundleJobGetCmd);
-        bundleAction = jpaService.execute(bundleActionGetCmd);
-        assertEquals(WorkflowJob.Status.SUCCEEDED, wfJob.getStatus());
-        assertEquals(WorkflowAction.Status.OK, wfAction.getStatus());
-        assertEquals(WorkflowJob.Status.SUCCEEDED, subwfJob.getStatus());
-        assertEquals(WorkflowAction.Status.OK, subwfAction.getStatus());
-        assertEquals(CoordinatorJob.Status.SUCCEEDED, coordJob.getStatus());
-        assertEquals(CoordinatorAction.Status.SUCCEEDED, coordAction.getStatus());
-        assertEquals(BundleJobBean.Status.SUCCEEDED, bundleJob.getStatus());
-        assertEquals(BundleJobBean.Status.SUCCEEDED, bundleAction.getStatus());
-
-        new PurgeXCommand(7, 7, 7, 10).call();
-
-        try {
-            jpaService.execute(bundleJobGetCmd);
-            fail("Bundle Job should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0604, je.getErrorCode());
-        }
-
-        try {
-            jpaService.execute(bundleActionGetCmd);
-            fail("Bundle Action should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0605, je.getErrorCode());
-        }
-
-        try {
-            jpaService.execute(coordJobGetCmd);
-            fail("Coordinator Job should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0604, je.getErrorCode());
-        }
-
-        try {
-            jpaService.execute(coordActionGetCmd);
-            fail("Coordinator Action should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0605, je.getErrorCode());
-        }
-
-        try {
-            jpaService.execute(wfJobGetCmd);
-            fail("Workflow Job should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0604, je.getErrorCode());
-        }
-
-        try {
-            jpaService.execute(wfActionGetCmd);
-            fail("Workflow Action should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0605, je.getErrorCode());
-        }
-
-        try {
-            jpaService.execute(subwfJobGetCmd);
-            fail("SubWorkflow Job should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0604, je.getErrorCode());
-        }
+        purgeWithDefaultParameters();
 
-        try {
-            jpaService.execute(subwfActionGetCmd);
-            fail("SubWorkflow Action should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0605, je.getErrorCode());
-        }
+        assertBundleJobPurged(bundleJob);
+        assertBundleActionPurged(bundleAction);
+        assertCoordinatorJobPurged(coordJob);
+        assertCoordinatorActionPurged(coordAction);
+        assertWorkflowJobPurged(wfJob);
+        assertWorkflowActionPurged(wfAction);
+        assertWorkflowJobPurged(subwfJob);
+        assertWorkflowActionPurged(subwfAction);
     }
 
     /**
      * Test : Test purging a lot of jobs (with different parent-child relationships and purge-eligibility) in one go
      *
-     * @throws Exception
+     * @throws Exception if cannot insert records to the database
      */
-    public void testPurgeLotsOfJobs() throws Exception {
-        JPAService jpaService = Services.get().get(JPAService.class);
+    public void testComplexExample() throws Exception {
         assertNotNull(jpaService);
 
         /* Job relationships:
@@ -3780,8 +1081,10 @@ public class TestPurgeXCommand extends XDataTestCase {
         wfJobF                  yes     no
             subwfJobF           no      ^
         */
-        BundleJobBean bundleJobA = addRecordToBundleJobTable(Job.Status.SUCCEEDED, DateUtils.parseDateOozieTZ("2011-05-01T01:00Z"));
-        BundleJobBean bundleJobB = addRecordToBundleJobTable(Job.Status.SUCCEEDED, DateUtils.parseDateOozieTZ("2011-06-01T01:00Z"));
+        BundleJobBean bundleJobA = addRecordToBundleJobTable(Job.Status.SUCCEEDED,
+                DateUtils.parseDateOozieTZ("2011-05-01T01:00Z"));
+        BundleJobBean bundleJobB = addRecordToBundleJobTable(Job.Status.SUCCEEDED,
+                DateUtils.parseDateOozieTZ("2011-06-01T01:00Z"));
         CoordinatorJobBean coordJobA = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
         setLastModifiedTime(coordJobA, "2011-03-01T01:00Z");
         CoordinatorJobBean coordJobB = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
@@ -3833,325 +1136,283 @@ public class TestPurgeXCommand extends XDataTestCase {
         BundleActionBean bundleActionB = addRecordToBundleActionTable(bundleJobB.getId(), coordJobB.getId(), coordJobB.getAppName(),
                 0, Job.Status.SUCCEEDED);
 
-        WorkflowJobGetJPAExecutor wfJobAGetCmd = new WorkflowJobGetJPAExecutor(wfJobA.getId());
-        WorkflowJobGetJPAExecutor wfJobBGetCmd = new WorkflowJobGetJPAExecutor(wfJobB.getId());
-        WorkflowJobGetJPAExecutor wfJobCGetCmd = new WorkflowJobGetJPAExecutor(wfJobC.getId());
-        WorkflowJobGetJPAExecutor wfJobDGetCmd = new WorkflowJobGetJPAExecutor(wfJobD.getId());
-        WorkflowJobGetJPAExecutor wfJobEGetCmd = new WorkflowJobGetJPAExecutor(wfJobE.getId());
-        WorkflowJobGetJPAExecutor wfJobFGetCmd = new WorkflowJobGetJPAExecutor(wfJobF.getId());
-        WorkflowActionGetJPAExecutor wfActionAGetCmd = new WorkflowActionGetJPAExecutor(wfActionA.getId());
-        WorkflowActionGetJPAExecutor wfActionBGetCmd = new WorkflowActionGetJPAExecutor(wfActionB.getId());
-        WorkflowActionGetJPAExecutor wfActionCGetCmd = new WorkflowActionGetJPAExecutor(wfActionC.getId());
-        WorkflowActionGetJPAExecutor wfActionDGetCmd = new WorkflowActionGetJPAExecutor(wfActionD.getId());
-        WorkflowActionGetJPAExecutor wfActionEGetCmd = new WorkflowActionGetJPAExecutor(wfActionE.getId());
-        WorkflowActionGetJPAExecutor wfActionFGetCmd = new WorkflowActionGetJPAExecutor(wfActionF.getId());
-        WorkflowJobGetJPAExecutor subwfJobAGetCmd = new WorkflowJobGetJPAExecutor(subwfJobA.getId());
-        WorkflowJobGetJPAExecutor subwfJobCGetCmd = new WorkflowJobGetJPAExecutor(subwfJobC.getId());
-        WorkflowJobGetJPAExecutor subwfJobFGetCmd = new WorkflowJobGetJPAExecutor(subwfJobF.getId());
-        WorkflowActionGetJPAExecutor subwfActionAGetCmd = new WorkflowActionGetJPAExecutor(subwfActionA.getId());
-        WorkflowActionGetJPAExecutor subwfActionCGetCmd = new WorkflowActionGetJPAExecutor(subwfActionC.getId());
-        WorkflowActionGetJPAExecutor subwfActionFGetCmd = new WorkflowActionGetJPAExecutor(subwfActionF.getId());
-        CoordJobGetJPAExecutor coordJobAGetCmd = new CoordJobGetJPAExecutor(coordJobA.getId());
-        CoordJobGetJPAExecutor coordJobBGetCmd = new CoordJobGetJPAExecutor(coordJobB.getId());
-        CoordJobGetJPAExecutor coordJobCGetCmd = new CoordJobGetJPAExecutor(coordJobC.getId());
-        CoordJobGetJPAExecutor coordJobDGetCmd = new CoordJobGetJPAExecutor(coordJobD.getId());
-        CoordActionGetJPAExecutor coordActionAGetCmd = new CoordActionGetJPAExecutor(coordActionA.getId());
-        CoordActionGetJPAExecutor coordActionBGetCmd = new CoordActionGetJPAExecutor(coordActionB.getId());
-        CoordActionGetJPAExecutor coordActionCGetCmd = new CoordActionGetJPAExecutor(coordActionC.getId());
-        CoordActionGetJPAExecutor coordActionDGetCmd = new CoordActionGetJPAExecutor(coordActionD.getId());
-        BundleJobGetJPAExecutor bundleJobAGetCmd = new BundleJobGetJPAExecutor(bundleJobA.getId());
-        BundleJobGetJPAExecutor bundleJobBGetCmd = new BundleJobGetJPAExecutor(bundleJobB.getId());
-        BundleActionGetJPAExecutor bundleActionAGetCmd = new BundleActionGetJPAExecutor(bundleJobA.getId(), coordJobA.getAppName());
-        BundleActionGetJPAExecutor bundleActionBGetCmd = new BundleActionGetJPAExecutor(bundleJobB.getId(), coordJobB.getAppName());
-
-        wfJobA = jpaService.execute(wfJobAGetCmd);
-        wfJobB = jpaService.execute(wfJobBGetCmd);
-        wfJobC = jpaService.execute(wfJobCGetCmd);
-        wfJobD = jpaService.execute(wfJobDGetCmd);
-        wfJobE = jpaService.execute(wfJobEGetCmd);
-        wfJobF = jpaService.execute(wfJobFGetCmd);
-        wfActionA = jpaService.execute(wfActionAGetCmd);
-        wfActionB = jpaService.execute(wfActionBGetCmd);
-        wfActionC = jpaService.execute(wfActionCGetCmd);
-        wfActionD = jpaService.execute(wfActionDGetCmd);
-        wfActionE = jpaService.execute(wfActionEGetCmd);
-        wfActionF = jpaService.execute(wfActionFGetCmd);
-        subwfJobA = jpaService.execute(subwfJobAGetCmd);
-        subwfJobC = jpaService.execute(subwfJobCGetCmd);
-        subwfJobF = jpaService.execute(subwfJobFGetCmd);
-        subwfActionA = jpaService.execute(subwfActionAGetCmd);
-        subwfActionC = jpaService.execute(subwfActionCGetCmd);
-        subwfActionF = jpaService.execute(subwfActionFGetCmd);
-        coordJobA = jpaService.execute(coordJobAGetCmd);
-        coordJobB = jpaService.execute(coordJobBGetCmd);
-        coordJobC = jpaService.execute(coordJobCGetCmd);
-        coordJobD = jpaService.execute(coordJobDGetCmd);
-        coordActionA = jpaService.execute(coordActionAGetCmd);
-        coordActionB = jpaService.execute(coordActionBGetCmd);
-        coordActionC = jpaService.execute(coordActionCGetCmd);
-        coordActionD = jpaService.execute(coordActionDGetCmd);
-        bundleJobA = jpaService.execute(bundleJobAGetCmd);
-        bundleJobB = jpaService.execute(bundleJobBGetCmd);
-        bundleActionA = jpaService.execute(bundleActionAGetCmd);
-        bundleActionB = jpaService.execute(bundleActionBGetCmd);
-        assertEquals(WorkflowJob.Status.SUCCEEDED, wfJobA.getStatus());
-        assertEquals(WorkflowJob.Status.SUCCEEDED, wfJobB.getStatus());
-        assertEquals(WorkflowJob.Status.SUCCEEDED, wfJobC.getStatus());
-        assertEquals(WorkflowJob.Status.SUCCEEDED, wfJobD.getStatus());
-        assertEquals(WorkflowJob.Status.SUCCEEDED, wfJobE.getStatus());
-        assertEquals(WorkflowJob.Status.SUCCEEDED, wfJobF.getStatus());
-        assertEquals(WorkflowAction.Status.OK, wfActionA.getStatus());
-        assertEquals(WorkflowAction.Status.OK, wfActionB.getStatus());
-        assertEquals(WorkflowAction.Status.OK, wfActionC.getStatus());
-        assertEquals(WorkflowAction.Status.OK, wfActionD.getStatus());
-        assertEquals(WorkflowAction.Status.OK, wfActionE.getStatus());
-        assertEquals(WorkflowAction.Status.OK, wfActionF.getStatus());
-        assertEquals(WorkflowJob.Status.SUCCEEDED, subwfJobA.getStatus());
-        assertEquals(WorkflowJob.Status.SUCCEEDED, subwfJobC.getStatus());
-        assertEquals(WorkflowJob.Status.SUCCEEDED, subwfJobF.getStatus());
-        assertEquals(WorkflowAction.Status.OK, subwfActionA.getStatus());
-        assertEquals(WorkflowAction.Status.OK, subwfActionC.getStatus());
-        assertEquals(WorkflowAction.Status.OK, subwfActionF.getStatus());
-        assertEquals(CoordinatorJob.Status.SUCCEEDED, coordJobA.getStatus());
-        assertEquals(CoordinatorJob.Status.SUCCEEDED, coordJobB.getStatus());
-        assertEquals(CoordinatorJob.Status.SUCCEEDED, coordJobC.getStatus());
-        assertEquals(CoordinatorJob.Status.SUCCEEDED, coordJobD.getStatus());
-        assertEquals(CoordinatorAction.Status.SUCCEEDED, coordActionA.getStatus());
-        assertEquals(CoordinatorAction.Status.SUCCEEDED, coordActionB.getStatus());
-        assertEquals(CoordinatorAction.Status.SUCCEEDED, coordActionC.getStatus());
-        assertEquals(CoordinatorAction.Status.SUCCEEDED, coordActionD.getStatus());
-        assertEquals(BundleJobBean.Status.SUCCEEDED, bundleJobA.getStatus());
-        assertEquals(BundleJobBean.Status.SUCCEEDED, bundleJobB.getStatus());
-        assertEquals(BundleJobBean.Status.SUCCEEDED, bundleActionA.getStatus());
-        assertEquals(BundleJobBean.Status.SUCCEEDED, bundleActionB.getStatus());
-
-        new PurgeXCommand(getNumDaysToNotBePurged(wfJobB.getEndTime()),
-                          getNumDaysToNotBePurged(coordJobC.getLastModifiedTime()),
-                          getNumDaysToNotBePurged(bundleJobB.getLastModifiedTime()),
-                          10).call();
-
-        try {
-            jpaService.execute(bundleJobAGetCmd);
-            fail("Bundle Job A should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0604, je.getErrorCode());
-        }
+        purgeWithSpecialParameters(wfJobB, coordJobC, bundleJobB);
+
+        assertBundleJobPurged(bundleJobA);
+        assertBundleActionPurged(bundleActionA);
+        assertBundleJobNotPurged(bundleJobB);
+        assertBundleActionNotPurged(bundleActionB);
+        assertCoordinatorJobPurged(coordJobA);
+        assertCoordinatorActionPurged(coordActionA);
+        assertCoordinatorJobNotPurged(coordJobB);
+        assertCoordinatorActionNotPurged(coordActionB);
+        assertCoordinatorJobNotPurged(coordJobC);
+        assertCoordinatorActionNotPurged(coordActionC);
+        assertCoordinatorJobNotPurged(coordJobD);
+        assertCoordinatorActionNotPurged(coordActionD);
+        assertWorkflowJobPurged(wfJobA);
+        assertWorkflowActionPurged(wfActionA);
+        assertWorkflowJobNotPurged(wfJobB);
+        assertWorkflowActionNotPurged(wfActionB);
+        assertWorkflowJobNotPurged(wfJobC);
+        assertWorkflowActionNotPurged(wfActionC);
+        assertWorkflowJobNotPurged(wfJobD);
+        assertWorkflowActionNotPurged(wfActionD);
+        assertWorkflowJobPurged(wfJobE);
+        assertWorkflowActionPurged(wfActionE);
+        assertWorkflowJobNotPurged(wfJobF);
+        assertWorkflowActionNotPurged(wfActionF);
+        assertWorkflowJobPurged(subwfJobA);
+        assertWorkflowActionPurged(subwfActionA);
+        assertWorkflowJobNotPurged(subwfJobC);
+        assertWorkflowActionNotPurged(subwfActionC);
+        assertWorkflowJobNotPurged(subwfJobF);
+        assertWorkflowActionNotPurged(subwfActionF);
+    }
 
-        try {
-            jpaService.execute(bundleActionAGetCmd);
-            fail("Bundle Action A should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0605, je.getErrorCode());
+    private static class PurgeParameters {
+        private int workflowDays;
+        private int coordinatorDays;
+        private int bundleDays;
+        private int limit;
+        private boolean purgeOldCoordAction;
+
+        private static PurgeParameters createDefaultParameters() {
+            PurgeParameters purgeParameters = new PurgeParameters();
+            purgeParameters.workflowDays = 30;
+            purgeParameters.coordinatorDays = 7;
+            purgeParameters.bundleDays = 7;
+            purgeParameters.limit = 100;
+            purgeParameters.purgeOldCoordAction = false;
+            return purgeParameters;
+        }
+
+        private PurgeParameters withPurgeOldCordAction() {
+            purgeOldCoordAction = true;
+            return this;
+        }
+
+        private PurgeParameters withKeepingJobs(JsonBean... jsonBeans) {
+            for (JsonBean jsonBean : jsonBeans) {
+                if (jsonBean instanceof WorkflowJobBean) {
+                    workflowDays = getNumDaysToNotBePurged(((WorkflowJobBean)jsonBean).getEndTime());
+                }
+                else if (jsonBean instanceof CoordinatorJobBean) {
+                    coordinatorDays = getNumDaysToNotBePurged(((CoordinatorJobBean)jsonBean).getLastModifiedTime());
+                }
+                else if (jsonBean instanceof BundleJobBean) {
+                    bundleDays = getNumDaysToNotBePurged(((BundleJobBean)jsonBean).getLastModifiedTime());
+                }
+            }
+            return this;
+        }
+
+        private PurgeParameters withLimit(int limit) {
+            this.limit = limit;
+            return this;
         }
+    }
 
-        try {
-            jpaService.execute(bundleJobBGetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("Bundle Job B should not have been purged");
-        }
+    private void purgeWithDefaultParameters() throws CommandException {
+        purgeWithParameters(PurgeParameters.createDefaultParameters());
+    }
 
-        try {
-            jpaService.execute(bundleActionBGetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("Bundle Action B should not have been purged");
-        }
+    private void purgeWithSpecialParametersAndAlsoPurgeCoordActions(JsonBean... jsonBeans) throws CommandException {
+        purgeWithParameters(PurgeParameters.createDefaultParameters().withPurgeOldCordAction());
+    }
 
-        try {
-            jpaService.execute(coordJobAGetCmd);
-            fail("Coordinator Job A should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0604, je.getErrorCode());
-        }
+    private void purgeWithSpecialParameters(JsonBean... jsonBeans) throws CommandException {
+        purgeWithParameters(PurgeParameters.createDefaultParameters().withKeepingJobs(jsonBeans));
+    }
 
-        try {
-            jpaService.execute(coordActionAGetCmd);
-            fail("Coordinator Action A should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0605, je.getErrorCode());
-        }
+    private void purgeWithSpecialParameters(int limit, JsonBean... jsonBeans) throws CommandException {
+        purgeWithParameters(PurgeParameters.createDefaultParameters().withLimit(limit).withKeepingJobs(jsonBeans));
+    }
 
-        try {
-            jpaService.execute(coordJobBGetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("Coordinator Job B should not have been purged");
-        }
+    private void purgeWithParameters(PurgeParameters purgeParameters) throws CommandException {
+        new PurgeXCommand(purgeParameters.workflowDays, purgeParameters.coordinatorDays, purgeParameters.bundleDays,
+                purgeParameters.limit, purgeParameters.purgeOldCoordAction).call();
+    }
 
-        try {
-            jpaService.execute(coordActionBGetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("Coordinator Action B should not have been purged");
-        }
 
-        try {
-            jpaService.execute(coordJobCGetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("Coordinator Job C should not have been purged");
+    private void assertBundleActionsPurged(BundleActionBean... bundleActionBeans) {
+        for (BundleActionBean bean : bundleActionBeans) {
+            assertBundleActionPurged(bean);
         }
+    }
 
-        try {
-            jpaService.execute(coordActionCGetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("Coordinator Action C should not have been purged");
+    private void assertBundleActionsNotPurged(BundleActionBean... bundleActionBeans) {
+        for (BundleActionBean bean : bundleActionBeans) {
+            assertBundleActionNotPurged(bean);
         }
+    }
 
-        try {
-            jpaService.execute(coordJobDGetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("Coordinator Job D should not have been purged");
+    private void assertCoordinatorJobsPurged(CoordinatorJobBean... coordinatorJobBeans) {
+        for (CoordinatorJobBean bean : coordinatorJobBeans) {
+            assertCoordinatorJobPurged(bean);
         }
+    }
 
-        try {
-            jpaService.execute(coordActionDGetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("Coordinator Action D should not have been purged");
+    private void assertCoordinatorJobsNotPurged(CoordinatorJobBean... coordinatorJobBeans) {
+        for (CoordinatorJobBean bean : coordinatorJobBeans) {
+            assertCoordinatorJobNotPurged(bean);
         }
+    }
 
-        try {
-            jpaService.execute(wfJobAGetCmd);
-            fail("Workflow Job A should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0604, je.getErrorCode());
+    private void assertCoordinatorActionsPurged(CoordinatorActionBean... coordinatorActionBeans) {
+        for (CoordinatorActionBean bean : coordinatorActionBeans) {
+            assertCoordinatorActionPurged(bean);
         }
+    }
 
-        try {
-            jpaService.execute(wfActionAGetCmd);
-            fail("Workflow Action A should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0605, je.getErrorCode());
+    private void assertCoordinatorActionsNotPurged(CoordinatorActionBean... coordinatorActionBeans) {
+        for (CoordinatorActionBean bean : coordinatorActionBeans) {
+            assertCoordinatorActionNotPurged(bean);
         }
+    }
 
-        try {
-            jpaService.execute(wfJobBGetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("Workflow Job B should not have been purged");
+    private void assertWorkflowJobsPurged(WorkflowJobBean... workflowJobBeans) {
+        for (WorkflowJobBean bean : workflowJobBeans) {
+            assertWorkflowJobPurged(bean);
         }
+    }
 
-        try {
-            jpaService.execute(wfActionBGetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("Workflow Action B should not have been purged");
+    private void assertWorkflowJobsNotPurged(WorkflowJobBean... workflowJobBeans) {
+        for (WorkflowJobBean bean : workflowJobBeans) {
+            assertWorkflowJobNotPurged(bean);
         }
+    }
 
-        try {
-            jpaService.execute(wfJobCGetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("Workflow Job C should not have been purged");
+    private void assertWorkflowActionsPurged(WorkflowActionBean... workflowActionBeans) {
+        for (WorkflowActionBean bean : workflowActionBeans) {
+            assertWorkflowActionPurged(bean);
         }
+    }
 
-        try {
-            jpaService.execute(wfActionCGetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("Workflow Action C should not have been purged");
+    private void assertWorkflowActionsNotPurged(WorkflowActionBean... workflowActionBeans) {
+        for (WorkflowActionBean bean : workflowActionBeans) {
+            assertWorkflowActionNotPurged(bean);
         }
+    }
 
+    private void assertWorkflowJobNotPurged(WorkflowJobBean workflowJobBean) {
         try {
-            jpaService.execute(wfJobDGetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("Workflow Job D should not have been purged");
+            WorkflowJobGetJPAExecutor jpaExecutor = new WorkflowJobGetJPAExecutor(workflowJobBean.getId());
+            jpaService.execute(jpaExecutor);
+        } catch (JPAExecutorException je) {
+            fail("Workflow job "+workflowJobBean.getId()+" should not have been purged");
         }
+    }
 
+    private void assertWorkflowJobPurged(WorkflowJobBean workflowJobBean) {
         try {
-            jpaService.execute(wfActionDGetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("Workflow Action D should not have been purged");
+            WorkflowJobGetJPAExecutor jpaExecutor = new WorkflowJobGetJPAExecutor(workflowJobBean.getId());
+            jpaService.execute(jpaExecutor);
+            fail("Workflow job "+workflowJobBean.getId()+" should have been purged");
+        } catch (JPAExecutorException je) {
+            assertEquals(ErrorCode.E0604, je.getErrorCode());
         }
+    }
 
+    private void assertWorkflowActionNotPurged(WorkflowActionBean workflowActionBean) {
         try {
-            jpaService.execute(wfJobEGetCmd);
-            fail("Workflow Job E should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0604, je.getErrorCode());
+            WorkflowActionGetJPAExecutor jpaExecutor = new WorkflowActionGetJPAExecutor(workflowActionBean.getId());
+            jpaService.execute(jpaExecutor);
+        } catch (JPAExecutorException je) {
+            fail("Workflow action "+workflowActionBean.getId()+" should not have been purged");
         }
+    }
 
+    private void assertWorkflowActionPurged(WorkflowActionBean workflowActionBean) {
         try {
-            jpaService.execute(wfActionEGetCmd);
-            fail("Workflow Action E should have been purged");
-        }
-        catch (JPAExecutorException je) {
+            WorkflowActionGetJPAExecutor jpaExecutor = new WorkflowActionGetJPAExecutor(workflowActionBean.getId());
+            jpaService.execute(jpaExecutor);
+            fail("Workflow job "+workflowActionBean.getId()+" should have been purged");
+        } catch (JPAExecutorException je) {
             assertEquals(ErrorCode.E0605, je.getErrorCode());
         }
+    }
 
+    private void assertCoordinatorJobNotPurged(CoordinatorJobBean coordinatorJobBean) {
         try {
-            jpaService.execute(wfJobFGetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("Workflow Job F should not have been purged");
+            CoordJobGetJPAExecutor jpaExecutor = new CoordJobGetJPAExecutor(coordinatorJobBean.getId());
+            jpaService.execute(jpaExecutor);
+        } catch (JPAExecutorException je) {
+            fail("Coordinator job "+ coordinatorJobBean.getId()+" should not have been purged");
         }
+    }
 
+    private void assertCoordinatorJobPurged(CoordinatorJobBean coordinatorJobBean) {
         try {
-            jpaService.execute(wfActionFGetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("Workflow Action F should not have been purged");
+            CoordJobGetJPAExecutor jpaExecutor = new CoordJobGetJPAExecutor(coordinatorJobBean.getId());
+            jpaService.execute(jpaExecutor);
+            fail("Coordinator job "+coordinatorJobBean.getId()+" should have been purged");
+        } catch (JPAExecutorException je) {
+            assertEquals(ErrorCode.E0604, je.getErrorCode());
         }
+    }
 
+    private void assertCoordinatorActionNotPurged(CoordinatorActionBean coordinatorActionBean) {
         try {
-            jpaService.execute(subwfJobAGetCmd);
-            fail("SubWorkflow Job A should have been purged");
-        }
-        catch (JPAExecutorException je) {
-            assertEquals(ErrorCode.E0604, je.getErrorCode());
+            CoordActionGetJPAExecutor jpaExecutor = new CoordActionGetJPAExecutor(coordinatorActionBean.getId());
+            jpaService.execute(jpaExecutor);
+        } catch (JPAExecutorException je) {
+            fail("Coordinator action "+coordinatorActionBean.getId()+" should not have been purged");
         }
+    }
 
+    private void assertCoordinatorActionPurged(CoordinatorActionBean coordinatorActionBean) {
         try {
-            jpaService.execute(subwfActionAGetCmd);
-            fail("SubWorkflow Action A should have been purged");
-        }
-        catch (JPAExecutorException je) {
+            CoordActionGetJPAExecutor jpaExecutor = new CoordActionGetJPAExecutor(coordinatorActionBean.getId());
+            jpaService.execute(jpaExecutor);
+            fail("Coordinator action "+coordinatorActionBean.getId()+" should have been purged");
+        } catch (JPAExecutorException je) {
             assertEquals(ErrorCode.E0605, je.getErrorCode());
         }
+    }
 
+    private void assertBundleJobNotPurged(BundleJobBean bundleJobBean) {
         try {
-            jpaService.execute(subwfJobCGetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("SubWorkflow Job C should not have been purged");
+            BundleJobGetJPAExecutor jpaExecutor = new BundleJobGetJPAExecutor(bundleJobBean.getId());
+            jpaService.execute(jpaExecutor);
+        } catch (JPAExecutorException je) {
+            fail("Bundle job "+bundleJobBean.getId()+" should not have been purged");
         }
+    }
 
+    private void assertBundleJobPurged(BundleJobBean bundleJobBean) {
         try {
-            jpaService.execute(subwfActionCGetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("SubWorkflow Action C should not have been purged");
+            BundleJobGetJPAExecutor jpaExecutor = new BundleJobGetJPAExecutor(bundleJobBean.getId());
+            jpaService.execute(jpaExecutor);
+            fail("Bundle job "+bundleJobBean.getId()+" should have been purged");
+        } catch (JPAExecutorException je) {
+            assertEquals(ErrorCode.E0604, je.getErrorCode());
         }
+    }
 
+    private void assertBundleActionNotPurged(BundleActionBean bundleActionBean) {
         try {
-            jpaService.execute(subwfJobFGetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("SubWorkflow Job F should not have been purged");
+            BundleActionGetJPAExecutor jpaExecutor = new BundleActionGetJPAExecutor(bundleActionBean.getBundleId(),
+                    bundleActionBean.getCoordName());
+            jpaService.execute(jpaExecutor);
+        } catch (JPAExecutorException je) {
+            fail("Bundle action "+bundleActionBean.getBundleActionId()+" should not have been purged");
         }
+    }
 
+    private void assertBundleActionPurged(BundleActionBean bundleActionBean) {
         try {
-            jpaService.execute(subwfActionFGetCmd);
-        }
-        catch (JPAExecutorException je) {
-            fail("SubWorkflow Action F should not have been purged");
+            BundleActionGetJPAExecutor jpaExecutor = new BundleActionGetJPAExecutor(bundleActionBean.getBundleId(),
+                    bundleActionBean.getCoordName());
+            jpaService.execute(jpaExecutor);
+            fail("Bundle action "+bundleActionBean.getBundleActionId()+" should have been purged");
+        } catch (JPAExecutorException je) {
+            assertEquals(ErrorCode.E0605, je.getErrorCode());
         }
     }
 
-    protected WorkflowJobBean addRecordToWfJobTableForNegCase(WorkflowJob.Status jobStatus,
+
+    private WorkflowJobBean addRecordToWfJobTableForNegCase(WorkflowJob.Status jobStatus,
             WorkflowInstance.Status instanceStatus) throws Exception {
         WorkflowApp app =
             new LiteWorkflowApp("testApp", "<workflow-app/>",
@@ -4170,8 +1431,6 @@ public class TestPurgeXCommand extends XDataTestCase {
         wfBean.setEndTime(new Date(System.currentTimeMillis() + (long)100*24*60*60*1000 + (long)2*60*60*1000));
 
         try {
-            JPAService jpaService = Services.get().get(JPAService.class);
-            assertNotNull(jpaService);
             WorkflowJobInsertJPAExecutor wfInsertCmd = new WorkflowJobInsertJPAExecutor(wfBean);
             jpaService.execute(wfInsertCmd);
         }
@@ -4202,7 +1461,6 @@ public class TestPurgeXCommand extends XDataTestCase {
         workflow.setStatus(jobStatus);
         workflow.setRun(0);
         workflow.setUser(conf.get(OozieClient.USER_NAME));
-        workflow.setGroup(conf.get(OozieClient.GROUP_NAME));
         workflow.setWorkflowInstance(wfInstance);
         workflow.setStartTime(DateUtils.parseDateOozieTZ("2009-12-18T01:00Z"));
         workflow.setEndTime(DateUtils.parseDateOozieTZ("2009-12-18T03:00Z"));
@@ -4215,8 +1473,6 @@ public class TestPurgeXCommand extends XDataTestCase {
         CoordinatorJobBean coordJob = createCoordJob(status, pending, doneMatd);
         coordJob.setLastModifiedTime(DateUtils.parseDateOozieTZ("2009-12-18T01:00Z"));
         try {
-            JPAService jpaService = Services.get().get(JPAService.class);
-            assertNotNull(jpaService);
             CoordJobInsertJPAExecutor coordInsertCmd = new CoordJobInsertJPAExecutor(coordJob);
             jpaService.execute(coordInsertCmd);
         }
@@ -4229,12 +1485,10 @@ public class TestPurgeXCommand extends XDataTestCase {
         return coordJob;
     }
 
-    protected BundleJobBean addRecordToBundleJobTable(Job.Status jobStatus, Date lastModifiedTime) throws Exception {
+    private BundleJobBean addRecordToBundleJobTable(Job.Status jobStatus, Date lastModifiedTime) throws Exception {
         BundleJobBean bundle = createBundleJob(jobStatus, false);
         bundle.setLastModifiedTime(lastModifiedTime);
         try {
-            JPAService jpaService = Services.get().get(JPAService.class);
-            assertNotNull(jpaService);
             BundleJobInsertJPAExecutor bundleInsertjpa = new BundleJobInsertJPAExecutor(bundle);
             jpaService.execute(bundleInsertjpa);
         }
@@ -4259,8 +1513,6 @@ public class TestPurgeXCommand extends XDataTestCase {
     public static CoordinatorJobBean setLastModifiedTime(CoordinatorJobBean job, String date) throws Exception {
         job.setLastModifiedTime(DateUtils.parseDateOozieTZ(date));
         try {
-            JPAService jpaService = Services.get().get(JPAService.class);
-            assertNotNull(jpaService);
             CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB_LAST_MODIFIED_TIME, job);
         }
         catch (JPAExecutorException je) {
@@ -4271,11 +1523,9 @@ public class TestPurgeXCommand extends XDataTestCase {
         return job;
     }
 
-    public static WorkflowJobBean setEndTime(WorkflowJobBean job, String date) throws Exception {
+    private static void setEndTime(WorkflowJobBean job, String date) throws Exception {
         job.setEndTime(DateUtils.parseDateOozieTZ(date));
         try {
-            JPAService jpaService = Services.get().get(JPAService.class);
-            assertNotNull(jpaService);
             WorkflowJobQueryExecutor.getInstance().executeUpdate(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MOD_END, job);
         }
         catch (JPAExecutorException je) {
@@ -4283,6 +1533,5 @@ public class TestPurgeXCommand extends XDataTestCase {
             fail("Unable to update workflow job last modified time");
             throw je;
         }
-        return job;
     }
 }
diff --git a/release-log.txt b/release-log.txt
index 8735251..2b6664f 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 5.2.0 release (trunk - unreleased)
 
+OOZIE-3407 [tests] Cleanup TestPurgeXCommand (asalamon74 via andras.piros)
 OOZIE-3067 [core] Remove duplicate logic from ZKJobsConcurrencyService (dionusos via andras.piros)
 OOZIE-3186 [core] Oozie is unable to use configuration linked using jceks://file/ (dionusos via andras.piros)
 OOZIE-3194 [tools] Oozie should set proper permissions to sharelib after upload (dionusos via andras.piros)