You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by rk...@apache.org on 2013/04/05 19:25:09 UTC

svn commit: r1465054 [3/4] - in /oozie/trunk: ./ core/src/main/java/org/apache/oozie/ core/src/main/java/org/apache/oozie/command/ core/src/main/java/org/apache/oozie/command/bundle/ core/src/main/java/org/apache/oozie/command/coord/ core/src/main/java...

Modified: oozie/trunk/core/src/test/java/org/apache/oozie/command/wf/TestActionErrors.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/command/wf/TestActionErrors.java?rev=1465054&r1=1465053&r2=1465054&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/command/wf/TestActionErrors.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/command/wf/TestActionErrors.java Fri Apr  5 17:25:08 2013
@@ -36,7 +36,6 @@ import org.apache.oozie.client.OozieClie
 import org.apache.oozie.client.WorkflowAction;
 import org.apache.oozie.client.WorkflowJob;
 import org.apache.oozie.executor.jpa.CoordActionGetForExternalIdJPAExecutor;
-import org.apache.oozie.executor.jpa.CoordActionUpdateJPAExecutor;
 import org.apache.oozie.executor.jpa.WorkflowActionsGetForJobJPAExecutor;
 import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
 import org.apache.oozie.service.ActionService;
@@ -385,16 +384,14 @@ public class TestActionErrors extends XD
         conf.set("external-status", "ok");
         conf.set("error", errorType);
 
-        JPAService jpaService = Services.get().get(JPAService.class);
-        CoordinatorJobBean coordJob = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, false, false);
-        CoordinatorActionBean coordAction = addRecordToCoordActionTable(coordJob.getId(), 1,
-                CoordinatorAction.Status.RUNNING, "coord-action-get.xml", "wfId", "RUNNING", 0);
+        final String jobId = engine.submitJob(conf, false);
 
-        final String jobId = engine.submitJob(conf, true);
+        final JPAService jpaService = Services.get().get(JPAService.class);
+        final CoordinatorJobBean coordJob = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, false, false);
+        CoordinatorActionBean coordAction = addRecordToCoordActionTable(coordJob.getId(), 1,
+                CoordinatorAction.Status.RUNNING, "coord-action-get.xml", jobId, "RUNNING", 0);
 
-        coordAction.setExternalId(jobId);
-        CoordActionUpdateJPAExecutor coordActionUpdateExecutor = new CoordActionUpdateJPAExecutor(coordAction);
-        jpaService.execute(coordActionUpdateExecutor);
+        engine.start(jobId);
 
         waitFor(5000, new Predicate() {
             public boolean evaluate() throws Exception {
@@ -417,6 +414,13 @@ public class TestActionErrors extends XD
 
         assertEquals (WorkflowJob.Status.SUSPENDED, job.getStatus());
 
+        waitFor(5000, new Predicate() {
+            public boolean evaluate() throws Exception {
+                CoordinatorActionBean coordAction2 = jpaService.execute(new CoordActionGetForExternalIdJPAExecutor(jobId));
+                return coordAction2.getStatus().equals(CoordinatorAction.Status.SUSPENDED);
+            }
+        });
+
         coordAction = jpaService.execute(new CoordActionGetForExternalIdJPAExecutor(jobId));
         assertEquals (CoordinatorAction.Status.SUSPENDED, coordAction.getStatus());
     }

Added: oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestBundleJobsDeleteJPAExecutor.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestBundleJobsDeleteJPAExecutor.java?rev=1465054&view=auto
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestBundleJobsDeleteJPAExecutor.java (added)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestBundleJobsDeleteJPAExecutor.java Fri Apr  5 17:25:08 2013
@@ -0,0 +1,253 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.executor.jpa;
+
+import java.util.ArrayList;
+import java.util.List;
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.fail;
+
+import org.apache.oozie.BundleActionBean;
+import org.apache.oozie.BundleJobBean;
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.FaultInjection;
+import org.apache.oozie.client.Job;
+import org.apache.oozie.command.SkipCommitFaultInjection;
+import org.apache.oozie.service.JPAService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.test.XDataTestCase;
+
+public class TestBundleJobsDeleteJPAExecutor extends XDataTestCase {
+    Services services;
+    private String[] excludedServices = { "org.apache.oozie.service.StatusTransitService",
+            "org.apache.oozie.service.PauseTransitService", "org.apache.oozie.service.PurgeService",
+            "org.apache.oozie.service.CoordMaterializeTriggerService", "org.apache.oozie.service.RecoveryService" };
+
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        services = new Services();
+        setClassesToBeExcluded(services.getConf(), excludedServices);
+        services.init();
+        cleanUpDBTables();
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        services.destroy();
+        super.tearDown();
+    }
+
+    public void testDeleteBundles() throws Exception {
+        BundleJobBean jobA = this.addRecordToBundleJobTable(Job.Status.SUCCEEDED, false);
+        BundleActionBean actionA1 = addRecordToBundleActionTable(jobA.getId(), "actionA1", 0, Job.Status.SUCCEEDED);
+        BundleActionBean actionA2 = addRecordToBundleActionTable(jobA.getId(), "actionA2", 0, Job.Status.SUCCEEDED);
+
+        BundleJobBean jobB = this.addRecordToBundleJobTable(Job.Status.SUCCEEDED, false);
+        BundleActionBean actionB1 = addRecordToBundleActionTable(jobB.getId(), "actionB1", 0, Job.Status.SUCCEEDED);
+        BundleActionBean actionB2 = addRecordToBundleActionTable(jobB.getId(), "actionB2", 0, Job.Status.SUCCEEDED);
+
+        BundleJobBean jobC = this.addRecordToBundleJobTable(Job.Status.SUCCEEDED, false);
+        BundleActionBean actionC1 = addRecordToBundleActionTable(jobC.getId(), "actionC1", 0, Job.Status.SUCCEEDED);
+        BundleActionBean actionC2 = addRecordToBundleActionTable(jobC.getId(), "actionC2", 0, Job.Status.SUCCEEDED);
+
+        JPAService jpaService = Services.get().get(JPAService.class);
+        assertNotNull(jpaService);
+
+        List<String> deleteList = new ArrayList<String>();
+        deleteList.add(jobA.getId());
+        deleteList.add(jobB.getId());
+        deleteList.add(jobC.getId());
+        jpaService.execute(new BundleJobsDeleteJPAExecutor(deleteList));
+
+        try {
+            jpaService.execute(new BundleJobGetJPAExecutor(jobA.getId()));
+            fail("Bundle Job A should have been deleted");
+        }
+        catch (JPAExecutorException je) {
+            assertEquals(ErrorCode.E0604, je.getErrorCode());
+        }
+
+        try {
+            jpaService.execute(new BundleActionGetJPAExecutor(actionA1.getBundleId(), actionA1.getCoordName()));
+            fail("Bundle Action A1 should have been deleted");
+        }
+        catch (JPAExecutorException je) {
+            assertEquals(ErrorCode.E0605, je.getErrorCode());
+        }
+
+        try {
+            jpaService.execute(new BundleActionGetJPAExecutor(actionA2.getBundleId(), actionA2.getCoordName()));
+            fail("Bundle Action A2 should have been deleted");
+        }
+        catch (JPAExecutorException je) {
+            assertEquals(ErrorCode.E0605, je.getErrorCode());
+        }
+
+        try {
+            jpaService.execute(new BundleJobGetJPAExecutor(jobB.getId()));
+            fail("Bundle Job B should have been deleted");
+        }
+        catch (JPAExecutorException je) {
+            assertEquals(ErrorCode.E0604, je.getErrorCode());
+        }
+
+        try {
+            jpaService.execute(new BundleActionGetJPAExecutor(actionB1.getBundleId(), actionB1.getCoordName()));
+            fail("Bundle Action B1 should have been deleted");
+        }
+        catch (JPAExecutorException je) {
+            assertEquals(ErrorCode.E0605, je.getErrorCode());
+        }
+
+        try {
+            jpaService.execute(new BundleActionGetJPAExecutor(actionB2.getBundleId(), actionB2.getCoordName()));
+            fail("Bundle Action B2 should have been deleted");
+        }
+        catch (JPAExecutorException je) {
+            assertEquals(ErrorCode.E0605, je.getErrorCode());
+        }
+
+        try {
+            jpaService.execute(new BundleJobGetJPAExecutor(jobC.getId()));
+            fail("Bundle Job C should have been deleted");
+        }
+        catch (JPAExecutorException je) {
+            assertEquals(ErrorCode.E0604, je.getErrorCode());
+        }
+
+        try {
+            jpaService.execute(new BundleActionGetJPAExecutor(actionC1.getBundleId(), actionC1.getCoordName()));
+            fail("Bundle Action C1 should have been deleted");
+        }
+        catch (JPAExecutorException je) {
+            assertEquals(ErrorCode.E0605, je.getErrorCode());
+        }
+
+        try {
+            jpaService.execute(new BundleActionGetJPAExecutor(actionC2.getBundleId(), actionC2.getCoordName()));
+            fail("Bundle Action C2 should have been deleted");
+        }
+        catch (JPAExecutorException je) {
+            assertEquals(ErrorCode.E0605, je.getErrorCode());
+        }
+    }
+
+    public void testDeleteBundlesRollback() throws Exception{
+        BundleJobBean jobA = this.addRecordToBundleJobTable(Job.Status.SUCCEEDED, false);
+        BundleActionBean actionA1 = addRecordToBundleActionTable(jobA.getId(), "actionA1", 0, Job.Status.SUCCEEDED);
+        BundleActionBean actionA2 = addRecordToBundleActionTable(jobA.getId(), "actionA2", 0, Job.Status.SUCCEEDED);
+
+        BundleJobBean jobB = this.addRecordToBundleJobTable(Job.Status.SUCCEEDED, false);
+        BundleActionBean actionB1 = addRecordToBundleActionTable(jobB.getId(), "actionB1", 0, Job.Status.SUCCEEDED);
+        BundleActionBean actionB2 = addRecordToBundleActionTable(jobB.getId(), "actionB2", 0, Job.Status.SUCCEEDED);
+
+        BundleJobBean jobC = this.addRecordToBundleJobTable(Job.Status.SUCCEEDED, false);
+        BundleActionBean actionC1 = addRecordToBundleActionTable(jobC.getId(), "actionC1", 0, Job.Status.SUCCEEDED);
+        BundleActionBean actionC2 = addRecordToBundleActionTable(jobC.getId(), "actionC2", 0, Job.Status.SUCCEEDED);
+
+        JPAService jpaService = Services.get().get(JPAService.class);
+        assertNotNull(jpaService);
+
+        try {
+            // set fault injection to true, so transaction is roll backed
+            setSystemProperty(FaultInjection.FAULT_INJECTION, "true");
+            setSystemProperty(SkipCommitFaultInjection.ACTION_FAILOVER_FAULT_INJECTION, "true");
+
+            List<String> deleteList = new ArrayList<String>();
+            deleteList.add(jobA.getId());
+            deleteList.add(jobB.getId());
+            deleteList.add(jobC.getId());
+
+            try {
+                jpaService.execute(new BundleJobsDeleteJPAExecutor(deleteList));
+                fail("Should have skipped commit for failover testing");
+            }
+            catch (RuntimeException re) {
+                assertEquals("Skipping Commit for Failover Testing", re.getMessage());
+            }
+        }
+        finally {
+            // Remove fault injection
+            FaultInjection.deactivate("org.apache.oozie.command.SkipCommitFaultInjection");
+        }
+
+        try {
+            jpaService.execute(new BundleJobGetJPAExecutor(jobA.getId()));
+        }
+        catch (JPAExecutorException je) {
+            fail("Bundle Job A should not have been deleted");
+        }
+
+        try {
+            jpaService.execute(new BundleActionGetJPAExecutor(actionA1.getBundleId(), actionA1.getCoordName()));
+        }
+        catch (JPAExecutorException je) {
+            fail("Bundle Action A1 should not have been deleted");
+        }
+
+        try {
+            jpaService.execute(new BundleActionGetJPAExecutor(actionA2.getBundleId(), actionA2.getCoordName()));
+        }
+        catch (JPAExecutorException je) {
+            fail("Bundle Action A2 should not have been deleted");
+        }
+
+        try {
+            jpaService.execute(new BundleJobGetJPAExecutor(jobB.getId()));
+        }
+        catch (JPAExecutorException je) {
+            fail("Bundle Job B should not have been deleted");
+        }
+
+        try {
+            jpaService.execute(new BundleActionGetJPAExecutor(actionB1.getBundleId(), actionB1.getCoordName()));
+        }
+        catch (JPAExecutorException je) {
+            fail("Bundle Action B1 should not have been deleted");
+        }
+
+        try {
+            jpaService.execute(new BundleActionGetJPAExecutor(actionB2.getBundleId(), actionB2.getCoordName()));
+        }
+        catch (JPAExecutorException je) {
+            fail("Bundle Action B2 should not have been deleted");
+        }
+
+        try {
+            jpaService.execute(new BundleJobGetJPAExecutor(jobC.getId()));
+        }
+        catch (JPAExecutorException je) {
+            fail("Bundle Job C should not have been deleted");
+        }
+
+        try {
+            jpaService.execute(new BundleActionGetJPAExecutor(actionC1.getBundleId(), actionC1.getCoordName()));
+        }
+        catch (JPAExecutorException je) {
+            fail("Bundle Action C1 should not have been deleted");
+        }
+
+        try {
+            jpaService.execute(new BundleActionGetJPAExecutor(actionC2.getBundleId(), actionC2.getCoordName()));
+        }
+        catch (JPAExecutorException je) {
+            fail("Bundle Action C2 should not have been deleted");
+        }
+    }
+}

Modified: oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestBundleJobsGetForPurgeJPAExecutor.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestBundleJobsGetForPurgeJPAExecutor.java?rev=1465054&r1=1465053&r2=1465054&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestBundleJobsGetForPurgeJPAExecutor.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestBundleJobsGetForPurgeJPAExecutor.java Fri Apr  5 17:25:08 2013
@@ -17,6 +17,9 @@
  */
 package org.apache.oozie.executor.jpa;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.Date;
 import java.util.List;
 
@@ -50,22 +53,24 @@ public class TestBundleJobsGetForPurgeJP
         super.tearDown();
     }
 
-    public void testBundleJobsGetForPurgeJPAExecutor() throws Exception {
-        this.addRecordToBundleJobTable(Job.Status.SUCCEEDED, DateUtils.parseDateOozieTZ("2011-01-01T01:00Z"));
-
-        _testBundleJobsForPurge(10, 1);
-
-        this.addRecordToBundleJobTable(Job.Status.SUCCEEDED, DateUtils.parseDateOozieTZ("2011-01-02T01:00Z"));
-        _testBundleJobsForPurge(10, 2);
-    }
-
-    private void _testBundleJobsForPurge(int olderThan, int expected) throws Exception {
+    public void testBundleJobsGetForPurgeJPAExecutorTooMany() throws Exception {
         JPAService jpaService = Services.get().get(JPAService.class);
         assertNotNull(jpaService);
 
-        BundleJobsGetForPurgeJPAExecutor executor = new BundleJobsGetForPurgeJPAExecutor(olderThan, 50);
-        List<BundleJobBean> jobList = jpaService.execute(executor);
-        assertEquals(expected, jobList.size());
+        BundleJobBean job1 = this.addRecordToBundleJobTable(Job.Status.SUCCEEDED, DateUtils.parseDateOozieTZ("2011-01-01T01:00Z"));
+        BundleJobBean job2 = this.addRecordToBundleJobTable(Job.Status.FAILED, DateUtils.parseDateOozieTZ("2011-01-01T01:00Z"));
+        BundleJobBean job3 = this.addRecordToBundleJobTable(Job.Status.SUCCEEDED, DateUtils.parseDateOozieTZ("2011-01-01T01:00Z"));
+        BundleJobBean job4 = this.addRecordToBundleJobTable(Job.Status.SUCCEEDED, DateUtils.parseDateOozieTZ("2011-01-01T01:00Z"));
+        BundleJobBean job5 = this.addRecordToBundleJobTable(Job.Status.SUCCEEDED, DateUtils.parseDateOozieTZ("2011-01-01T01:00Z"));
+
+        List<String> list = new ArrayList<String>();
+        // Get the first 3
+        list.addAll(jpaService.execute(new BundleJobsGetForPurgeJPAExecutor(1, 3)));
+        assertEquals(3, list.size());
+        // Get the next 3 (though there's only 2 more)
+        list.addAll(jpaService.execute(new BundleJobsGetForPurgeJPAExecutor(1, 3, 3)));
+        assertEquals(5, list.size());
+        checkBundles(list, job1.getId(), job2.getId(), job3.getId(), job4.getId(), job5.getId());
     }
 
     protected BundleJobBean addRecordToBundleJobTable(Job.Status jobStatus, Date lastModifiedTime) throws Exception {
@@ -85,4 +90,13 @@ public class TestBundleJobsGetForPurgeJP
         return bundle;
     }
 
+    private void checkBundles(List<String> bundles, String... bundleJobIDs) {
+        assertEquals(bundleJobIDs.length, bundles.size());
+        Arrays.sort(bundleJobIDs);
+        Collections.sort(bundles);
+
+        for (int i = 0; i < bundleJobIDs.length; i++) {
+            assertEquals(bundleJobIDs[i], bundles.get(i));
+        }
+    }
 }

Added: oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobsCountNotForPurgeFromParentIdJPAExecutor.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobsCountNotForPurgeFromParentIdJPAExecutor.java?rev=1465054&view=auto
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobsCountNotForPurgeFromParentIdJPAExecutor.java (added)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobsCountNotForPurgeFromParentIdJPAExecutor.java Fri Apr  5 17:25:08 2013
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.executor.jpa;
+
+import org.apache.oozie.BundleJobBean;
+import org.apache.oozie.CoordinatorJobBean;
+import org.apache.oozie.client.CoordinatorJob;
+import org.apache.oozie.client.Job;
+import org.apache.oozie.command.TestPurgeXCommand;
+import org.apache.oozie.service.JPAService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.test.XDataTestCase;
+
+public class TestCoordJobsCountNotForPurgeFromParentIdJPAExecutor extends XDataTestCase {
+    Services services;
+    private String[] excludedServices = { "org.apache.oozie.service.StatusTransitService",
+            "org.apache.oozie.service.PauseTransitService", "org.apache.oozie.service.PurgeService",
+            "org.apache.oozie.service.CoordMaterializeTriggerService", "org.apache.oozie.service.RecoveryService" };
+
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        services = new Services();
+        setClassesToBeExcluded(services.getConf(), excludedServices);
+        services.init();
+        cleanUpDBTables();
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        services.destroy();
+        super.tearDown();
+    }
+
+    public void testCount() throws Exception {
+        JPAService jpaService = Services.get().get(JPAService.class);
+        assertNotNull(jpaService);
+
+        BundleJobBean bundleJob = addRecordToBundleJobTable(Job.Status.SUCCEEDED, false);
+        String bundleJobId = bundleJob.getId();
+        int days = 1;
+        assertEquals(0, (long) jpaService.execute(new CoordJobsCountNotForPurgeFromParentIdJPAExecutor(days, bundleJobId)));
+
+        CoordinatorJobBean coordJob1 = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
+        coordJob1 = TestPurgeXCommand.setLastModifiedTime(coordJob1, "2009-12-01T01:00Z");
+        coordJob1.setAppName("coord1");
+        jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob1));
+        addRecordToBundleActionTable(bundleJobId, coordJob1.getId(), coordJob1.getAppName(), 0, Job.Status.SUCCEEDED);
+        days = 1;
+        assertEquals(0, (long) jpaService.execute(new CoordJobsCountNotForPurgeFromParentIdJPAExecutor(days, bundleJobId)));
+        days = TestPurgeXCommand.getNumDaysToNotBePurged(coordJob1.getLastModifiedTime());
+        assertEquals(1, (long) jpaService.execute(new CoordJobsCountNotForPurgeFromParentIdJPAExecutor(days, bundleJobId)));
+
+        CoordinatorJobBean coordJob2 = addRecordToCoordJobTable(CoordinatorJob.Status.DONEWITHERROR, false, false);
+        coordJob2 = TestPurgeXCommand.setLastModifiedTime(coordJob2, "2009-11-01T01:00Z");
+        coordJob2.setAppName("coord2");
+        jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob2));
+        addRecordToBundleActionTable(bundleJobId, coordJob2.getId(), coordJob2.getAppName(), 0, Job.Status.DONEWITHERROR);
+        days = TestPurgeXCommand.getNumDaysToNotBePurged(coordJob1.getLastModifiedTime());
+        assertEquals(1, (long) jpaService.execute(new CoordJobsCountNotForPurgeFromParentIdJPAExecutor(days, bundleJobId)));
+        days = TestPurgeXCommand.getNumDaysToNotBePurged(coordJob2.getLastModifiedTime());
+        assertEquals(2, (long) jpaService.execute(new CoordJobsCountNotForPurgeFromParentIdJPAExecutor(days, bundleJobId)));
+
+        CoordinatorJobBean coordJob3 = addRecordToCoordJobTable(CoordinatorJob.Status.FAILED, false, false);
+        coordJob3 = TestPurgeXCommand.setLastModifiedTime(coordJob3, "2009-10-01T01:00Z");
+        coordJob3.setAppName("coord3");
+        jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob3));
+        addRecordToBundleActionTable(bundleJobId, coordJob3.getId(), coordJob3.getAppName(), 0, Job.Status.FAILED);
+        days = TestPurgeXCommand.getNumDaysToNotBePurged(coordJob2.getLastModifiedTime());
+        assertEquals(2, (long) jpaService.execute(new CoordJobsCountNotForPurgeFromParentIdJPAExecutor(days, bundleJobId)));
+        days = TestPurgeXCommand.getNumDaysToNotBePurged(coordJob3.getLastModifiedTime());
+        assertEquals(3, (long) jpaService.execute(new CoordJobsCountNotForPurgeFromParentIdJPAExecutor(days, bundleJobId)));
+
+        CoordinatorJobBean coordJob4 = addRecordToCoordJobTable(CoordinatorJob.Status.KILLED, false, false);
+        coordJob4 = TestPurgeXCommand.setLastModifiedTime(coordJob4, "2009-09-01T01:00Z");
+        coordJob4.setAppName("coord4");
+        jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob4));
+        addRecordToBundleActionTable(bundleJobId, coordJob4.getId(), coordJob4.getAppName(), 0, Job.Status.KILLED);
+        days = TestPurgeXCommand.getNumDaysToNotBePurged(coordJob3.getLastModifiedTime());
+        assertEquals(3, (long) jpaService.execute(new CoordJobsCountNotForPurgeFromParentIdJPAExecutor(days, bundleJobId)));
+        days = TestPurgeXCommand.getNumDaysToNotBePurged(coordJob4.getLastModifiedTime());
+        assertEquals(4, (long) jpaService.execute(new CoordJobsCountNotForPurgeFromParentIdJPAExecutor(days, bundleJobId)));
+
+        CoordinatorJobBean coordJob5 = addRecordToCoordJobTable(CoordinatorJob.Status.PAUSED, false, false);
+        coordJob5 = TestPurgeXCommand.setLastModifiedTime(coordJob5, "2009-08-01T01:00Z");
+        coordJob5.setAppName("coord5");
+        jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob5));
+        addRecordToBundleActionTable(bundleJobId, coordJob5.getId(), coordJob5.getAppName(), 0, Job.Status.PAUSED);
+        days = TestPurgeXCommand.getNumDaysToNotBePurged(coordJob4.getLastModifiedTime());
+        assertEquals(5, (long) jpaService.execute(new CoordJobsCountNotForPurgeFromParentIdJPAExecutor(days, bundleJobId)));
+        days = TestPurgeXCommand.getNumDaysToNotBePurged(coordJob5.getLastModifiedTime());
+        assertEquals(5, (long) jpaService.execute(new CoordJobsCountNotForPurgeFromParentIdJPAExecutor(days, bundleJobId)));
+
+        CoordinatorJobBean coordJob6 = addRecordToCoordJobTable(CoordinatorJob.Status.PAUSEDWITHERROR, false, false);
+        coordJob5 = TestPurgeXCommand.setLastModifiedTime(coordJob6, "2009-07-01T01:00Z");
+        coordJob5.setAppName("coord6");
+        jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob6));
+        addRecordToBundleActionTable(bundleJobId, coordJob6.getId(), coordJob6.getAppName(), 0, Job.Status.PAUSEDWITHERROR);
+        days = TestPurgeXCommand.getNumDaysToNotBePurged(coordJob5.getLastModifiedTime());
+        assertEquals(6, (long) jpaService.execute(new CoordJobsCountNotForPurgeFromParentIdJPAExecutor(days, bundleJobId)));
+        days = TestPurgeXCommand.getNumDaysToNotBePurged(coordJob6.getLastModifiedTime());
+        assertEquals(6, (long) jpaService.execute(new CoordJobsCountNotForPurgeFromParentIdJPAExecutor(days, bundleJobId)));
+
+        CoordinatorJobBean coordJob7 = addRecordToCoordJobTable(CoordinatorJob.Status.PREMATER, false, false);
+        coordJob5 = TestPurgeXCommand.setLastModifiedTime(coordJob7, "2009-06-01T01:00Z");
+        coordJob5.setAppName("coord7");
+        jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob6));
+        addRecordToBundleActionTable(bundleJobId, coordJob7.getId(), coordJob7.getAppName(), 0, Job.Status.PREMATER);
+        days = TestPurgeXCommand.getNumDaysToNotBePurged(coordJob6.getLastModifiedTime());
+        assertEquals(7, (long) jpaService.execute(new CoordJobsCountNotForPurgeFromParentIdJPAExecutor(days, bundleJobId)));
+        days = TestPurgeXCommand.getNumDaysToNotBePurged(coordJob7.getLastModifiedTime());
+        assertEquals(7, (long) jpaService.execute(new CoordJobsCountNotForPurgeFromParentIdJPAExecutor(days, bundleJobId)));
+
+        CoordinatorJobBean coordJob8 = addRecordToCoordJobTable(CoordinatorJob.Status.PREP, false, false);
+        coordJob8 = TestPurgeXCommand.setLastModifiedTime(coordJob8, "2009-05-01T01:00Z");
+        coordJob8.setAppName("coord8");
+        jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob8));
+        addRecordToBundleActionTable(bundleJobId, coordJob8.getId(), coordJob8.getAppName(), 0, Job.Status.PREP);
+        days = TestPurgeXCommand.getNumDaysToNotBePurged(coordJob7.getLastModifiedTime());
+        assertEquals(8, (long) jpaService.execute(new CoordJobsCountNotForPurgeFromParentIdJPAExecutor(days, bundleJobId)));
+        days = TestPurgeXCommand.getNumDaysToNotBePurged(coordJob8.getLastModifiedTime());
+        assertEquals(8, (long) jpaService.execute(new CoordJobsCountNotForPurgeFromParentIdJPAExecutor(days, bundleJobId)));
+
+        CoordinatorJobBean coordJob9 = addRecordToCoordJobTable(CoordinatorJob.Status.PREPPAUSED, false, false);
+        coordJob9 = TestPurgeXCommand.setLastModifiedTime(coordJob9, "2009-04-01T01:00Z");
+        coordJob9.setAppName("coord9");
+        jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob9));
+        addRecordToBundleActionTable(bundleJobId, coordJob9.getId(), coordJob9.getAppName(), 0, Job.Status.PREPPAUSED);
+        days = TestPurgeXCommand.getNumDaysToNotBePurged(coordJob8.getLastModifiedTime());
+        assertEquals(9, (long) jpaService.execute(new CoordJobsCountNotForPurgeFromParentIdJPAExecutor(days, bundleJobId)));
+        days = TestPurgeXCommand.getNumDaysToNotBePurged(coordJob9.getLastModifiedTime());
+        assertEquals(9, (long) jpaService.execute(new CoordJobsCountNotForPurgeFromParentIdJPAExecutor(days, bundleJobId)));
+
+        CoordinatorJobBean coordJob10 = addRecordToCoordJobTable(CoordinatorJob.Status.PREPSUSPENDED, false, false);
+        coordJob10 = TestPurgeXCommand.setLastModifiedTime(coordJob10, "2009-03-01T01:00Z");
+        coordJob10.setAppName("coord10");
+        jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob10));
+        addRecordToBundleActionTable(bundleJobId, coordJob10.getId(), coordJob10.getAppName(), 0, Job.Status.PREPSUSPENDED);
+        days = TestPurgeXCommand.getNumDaysToNotBePurged(coordJob9.getLastModifiedTime());
+        assertEquals(10, (long) jpaService.execute(new CoordJobsCountNotForPurgeFromParentIdJPAExecutor(days, bundleJobId)));
+        days = TestPurgeXCommand.getNumDaysToNotBePurged(coordJob10.getLastModifiedTime());
+        assertEquals(10, (long) jpaService.execute(new CoordJobsCountNotForPurgeFromParentIdJPAExecutor(days, bundleJobId)));
+
+        CoordinatorJobBean coordJob11 = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, false, false);
+        coordJob11 = TestPurgeXCommand.setLastModifiedTime(coordJob11, "2009-02-01T01:00Z");
+        coordJob11.setAppName("coord11");
+        jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob11));
+        addRecordToBundleActionTable(bundleJobId, coordJob11.getId(), coordJob11.getAppName(), 0, Job.Status.RUNNING);
+        days = TestPurgeXCommand.getNumDaysToNotBePurged(coordJob10.getLastModifiedTime());
+        assertEquals(11, (long) jpaService.execute(new CoordJobsCountNotForPurgeFromParentIdJPAExecutor(days, bundleJobId)));
+        days = TestPurgeXCommand.getNumDaysToNotBePurged(coordJob11.getLastModifiedTime());
+        assertEquals(11, (long) jpaService.execute(new CoordJobsCountNotForPurgeFromParentIdJPAExecutor(days, bundleJobId)));
+
+        CoordinatorJobBean coordJob12 = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNINGWITHERROR, false, false);
+        coordJob12 = TestPurgeXCommand.setLastModifiedTime(coordJob12, "2009-01-01T01:00Z");
+        coordJob12.setAppName("coord12");
+        jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob12));
+        addRecordToBundleActionTable(bundleJobId, coordJob12.getId(), coordJob12.getAppName(), 0, Job.Status.RUNNINGWITHERROR);
+        days = TestPurgeXCommand.getNumDaysToNotBePurged(coordJob11.getLastModifiedTime());
+        assertEquals(12, (long) jpaService.execute(new CoordJobsCountNotForPurgeFromParentIdJPAExecutor(days, bundleJobId)));
+        days = TestPurgeXCommand.getNumDaysToNotBePurged(coordJob12.getLastModifiedTime());
+        assertEquals(12, (long) jpaService.execute(new CoordJobsCountNotForPurgeFromParentIdJPAExecutor(days, bundleJobId)));
+
+        CoordinatorJobBean coordJob13 = addRecordToCoordJobTable(CoordinatorJob.Status.SUSPENDED, false, false);
+        coordJob13 = TestPurgeXCommand.setLastModifiedTime(coordJob13, "2008-12-01T01:00Z");
+        coordJob13.setAppName("coord13");
+        jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob13));
+        addRecordToBundleActionTable(bundleJobId, coordJob13.getId(), coordJob13.getAppName(), 0, Job.Status.SUSPENDED);
+        days = TestPurgeXCommand.getNumDaysToNotBePurged(coordJob12.getLastModifiedTime());
+        assertEquals(13, (long) jpaService.execute(new CoordJobsCountNotForPurgeFromParentIdJPAExecutor(days, bundleJobId)));
+        days = TestPurgeXCommand.getNumDaysToNotBePurged(coordJob13.getLastModifiedTime());
+        assertEquals(13, (long) jpaService.execute(new CoordJobsCountNotForPurgeFromParentIdJPAExecutor(days, bundleJobId)));
+
+        CoordinatorJobBean coordJob14 = addRecordToCoordJobTable(CoordinatorJob.Status.SUSPENDEDWITHERROR, false, false);
+        coordJob14 = TestPurgeXCommand.setLastModifiedTime(coordJob14, "2008-11-01T01:00Z");
+        coordJob14.setAppName("coord14");
+        jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob14));
+        addRecordToBundleActionTable(bundleJobId, coordJob14.getId(), coordJob14.getAppName(), 0, Job.Status.SUSPENDEDWITHERROR);
+        days = TestPurgeXCommand.getNumDaysToNotBePurged(coordJob13.getLastModifiedTime());
+        assertEquals(14, (long) jpaService.execute(new CoordJobsCountNotForPurgeFromParentIdJPAExecutor(days, bundleJobId)));
+        days = TestPurgeXCommand.getNumDaysToNotBePurged(coordJob14.getLastModifiedTime());
+        assertEquals(14, (long) jpaService.execute(new CoordJobsCountNotForPurgeFromParentIdJPAExecutor(days, bundleJobId)));
+    }
+}

Added: oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobsDeleteJPAExecutor.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobsDeleteJPAExecutor.java?rev=1465054&view=auto
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobsDeleteJPAExecutor.java (added)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobsDeleteJPAExecutor.java Fri Apr  5 17:25:08 2013
@@ -0,0 +1,266 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.executor.jpa;
+
+import java.util.ArrayList;
+import java.util.List;
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.fail;
+
+import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.CoordinatorJobBean;
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.FaultInjection;
+import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.client.CoordinatorJob;
+import org.apache.oozie.command.SkipCommitFaultInjection;
+import org.apache.oozie.service.JPAService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.test.XDataTestCase;
+
+public class TestCoordJobsDeleteJPAExecutor extends XDataTestCase {
+    Services services;
+    private String[] excludedServices = { "org.apache.oozie.service.StatusTransitService",
+            "org.apache.oozie.service.PauseTransitService", "org.apache.oozie.service.PurgeService",
+            "org.apache.oozie.service.CoordMaterializeTriggerService", "org.apache.oozie.service.RecoveryService" };
+
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        services = new Services();
+        setClassesToBeExcluded(services.getConf(), excludedServices);
+        services.init();
+        cleanUpDBTables();
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        services.destroy();
+        super.tearDown();
+    }
+
+    public void testDeleteCoords() throws Exception {
+        CoordinatorJobBean jobA = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
+        CoordinatorActionBean actionA1 = addRecordToCoordActionTable(jobA.getId(), 1, CoordinatorAction.Status.SUCCEEDED,
+                "coord-action-get.xml", 0);
+        CoordinatorActionBean actionA2 = addRecordToCoordActionTable(jobA.getId(), 2, CoordinatorAction.Status.SUCCEEDED,
+                "coord-action-get.xml", 0);
+
+        CoordinatorJobBean jobB = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
+        CoordinatorActionBean actionB1 = addRecordToCoordActionTable(jobB.getId(), 1, CoordinatorAction.Status.SUCCEEDED,
+                "coord-action-get.xml", 0);
+        CoordinatorActionBean actionB2 = addRecordToCoordActionTable(jobB.getId(), 2, CoordinatorAction.Status.SUCCEEDED,
+                "coord-action-get.xml", 0);
+
+        CoordinatorJobBean jobC = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
+        CoordinatorActionBean actionC1 = addRecordToCoordActionTable(jobC.getId(), 1, CoordinatorAction.Status.SUCCEEDED,
+                "coord-action-get.xml", 0);
+        CoordinatorActionBean actionC2 = addRecordToCoordActionTable(jobC.getId(), 2, CoordinatorAction.Status.SUCCEEDED,
+                "coord-action-get.xml", 0);
+
+        JPAService jpaService = Services.get().get(JPAService.class);
+        assertNotNull(jpaService);
+
+        List<String> deleteList = new ArrayList<String>();
+        deleteList.add(jobA.getId());
+        deleteList.add(jobB.getId());
+        deleteList.add(jobC.getId());
+        jpaService.execute(new CoordJobsDeleteJPAExecutor(deleteList));
+
+        try {
+            jpaService.execute(new CoordJobGetJPAExecutor(jobA.getId()));
+            fail("Coordinator Job A should have been deleted");
+        }
+        catch (JPAExecutorException je) {
+            assertEquals(ErrorCode.E0604, je.getErrorCode());
+        }
+
+        try {
+            jpaService.execute(new CoordActionGetJPAExecutor(actionA1.getId()));
+            fail("Coordinator Action A1 should have been deleted");
+        }
+        catch (JPAExecutorException je) {
+            assertEquals(ErrorCode.E0605, je.getErrorCode());
+        }
+
+        try {
+            jpaService.execute(new CoordActionGetJPAExecutor(actionA2.getId()));
+            fail("Coordinator Action A2 should have been deleted");
+        }
+        catch (JPAExecutorException je) {
+            assertEquals(ErrorCode.E0605, je.getErrorCode());
+        }
+
+        try {
+            jpaService.execute(new CoordJobGetJPAExecutor(jobB.getId()));
+            fail("Coordinator Job B should have been deleted");
+        }
+        catch (JPAExecutorException je) {
+            assertEquals(ErrorCode.E0604, je.getErrorCode());
+        }
+
+        try {
+            jpaService.execute(new CoordActionGetJPAExecutor(actionB1.getId()));
+            fail("Coordinator Action B1 should have been deleted");
+        }
+        catch (JPAExecutorException je) {
+            assertEquals(ErrorCode.E0605, je.getErrorCode());
+        }
+
+        try {
+            jpaService.execute(new CoordActionGetJPAExecutor(actionB2.getId()));
+            fail("Coordinator Action B2 should have been deleted");
+        }
+        catch (JPAExecutorException je) {
+            assertEquals(ErrorCode.E0605, je.getErrorCode());
+        }
+
+        try {
+            jpaService.execute(new CoordJobGetJPAExecutor(jobC.getId()));
+            fail("Coordinator Job C should have been deleted");
+        }
+        catch (JPAExecutorException je) {
+            assertEquals(ErrorCode.E0604, je.getErrorCode());
+        }
+
+        try {
+            jpaService.execute(new CoordActionGetJPAExecutor(actionC1.getId()));
+            fail("Coordinator Action C1 should have been deleted");
+        }
+        catch (JPAExecutorException je) {
+            assertEquals(ErrorCode.E0605, je.getErrorCode());
+        }
+
+        try {
+            jpaService.execute(new CoordActionGetJPAExecutor(actionC2.getId()));
+            fail("Coordinator Action C2 should have been deleted");
+        }
+        catch (JPAExecutorException je) {
+            assertEquals(ErrorCode.E0605, je.getErrorCode());
+        }
+    }
+
+    public void testDeleteCoordsRollback() throws Exception{
+        CoordinatorJobBean jobA = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
+        CoordinatorActionBean actionA1 = addRecordToCoordActionTable(jobA.getId(), 1, CoordinatorAction.Status.SUCCEEDED,
+                "coord-action-get.xml", 0);
+        CoordinatorActionBean actionA2 = addRecordToCoordActionTable(jobA.getId(), 2, CoordinatorAction.Status.SUCCEEDED,
+                "coord-action-get.xml", 0);
+
+        CoordinatorJobBean jobB = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
+        CoordinatorActionBean actionB1 = addRecordToCoordActionTable(jobB.getId(), 1, CoordinatorAction.Status.SUCCEEDED,
+                "coord-action-get.xml", 0);
+        CoordinatorActionBean actionB2 = addRecordToCoordActionTable(jobB.getId(), 2, CoordinatorAction.Status.SUCCEEDED,
+                "coord-action-get.xml", 0);
+
+        CoordinatorJobBean jobC = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
+        CoordinatorActionBean actionC1 = addRecordToCoordActionTable(jobC.getId(), 1, CoordinatorAction.Status.SUCCEEDED,
+                "coord-action-get.xml", 0);
+        CoordinatorActionBean actionC2 = addRecordToCoordActionTable(jobC.getId(), 2, CoordinatorAction.Status.SUCCEEDED,
+                "coord-action-get.xml", 0);
+
+        JPAService jpaService = Services.get().get(JPAService.class);
+        assertNotNull(jpaService);
+
+        try {
+            // set fault injection to true, so transaction is roll backed
+            setSystemProperty(FaultInjection.FAULT_INJECTION, "true");
+            setSystemProperty(SkipCommitFaultInjection.ACTION_FAILOVER_FAULT_INJECTION, "true");
+
+            List<String> deleteList = new ArrayList<String>();
+            deleteList.add(jobA.getId());
+            deleteList.add(jobB.getId());
+            deleteList.add(jobC.getId());
+
+            try {
+                jpaService.execute(new CoordJobsDeleteJPAExecutor(deleteList));
+                fail("Should have skipped commit for failover testing");
+            }
+            catch (RuntimeException re) {
+                assertEquals("Skipping Commit for Failover Testing", re.getMessage());
+            }
+        }
+        finally {
+            // Remove fault injection
+            FaultInjection.deactivate("org.apache.oozie.command.SkipCommitFaultInjection");
+        }
+
+        try {
+            jpaService.execute(new CoordJobGetJPAExecutor(jobA.getId()));
+        }
+        catch (JPAExecutorException je) {
+            fail("Coordinator Job A should not have been deleted");
+        }
+
+        try {
+            jpaService.execute(new CoordActionGetJPAExecutor(actionA1.getId()));
+        }
+        catch (JPAExecutorException je) {
+            fail("Coordinator Action A1 should not have been deleted");
+        }
+
+        try {
+            jpaService.execute(new CoordActionGetJPAExecutor(actionA2.getId()));
+        }
+        catch (JPAExecutorException je) {
+            fail("Coordinator Action A2 should not have been deleted");
+        }
+
+        try {
+            jpaService.execute(new CoordJobGetJPAExecutor(jobB.getId()));
+        }
+        catch (JPAExecutorException je) {
+            fail("Coordinator Job B should not have been deleted");
+        }
+
+        try {
+            jpaService.execute(new CoordActionGetJPAExecutor(actionB1.getId()));
+        }
+        catch (JPAExecutorException je) {
+            fail("Coordinator Action B1 should not have been deleted");
+        }
+
+        try {
+            jpaService.execute(new CoordActionGetJPAExecutor(actionB2.getId()));
+        }
+        catch (JPAExecutorException je) {
+            fail("Coordinator Action B2 should not have been deleted");
+        }
+
+        try {
+            jpaService.execute(new CoordJobGetJPAExecutor(jobC.getId()));
+        }
+        catch (JPAExecutorException je) {
+            fail("Coordinator Job C should not have been deleted");
+        }
+
+        try {
+            jpaService.execute(new CoordActionGetJPAExecutor(actionC1.getId()));
+        }
+        catch (JPAExecutorException je) {
+            fail("Coordinator Action C1 should not have been deleted");
+        }
+
+        try {
+            jpaService.execute(new CoordActionGetJPAExecutor(actionC2.getId()));
+        }
+        catch (JPAExecutorException je) {
+            fail("Coordinator Action C2 should not have been deleted");
+        }
+    }
+}

Modified: oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobsGetForPurgeJPAExecutor.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobsGetForPurgeJPAExecutor.java?rev=1465054&r1=1465053&r2=1465054&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobsGetForPurgeJPAExecutor.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobsGetForPurgeJPAExecutor.java Fri Apr  5 17:25:08 2013
@@ -19,6 +19,9 @@ package org.apache.oozie.executor.jpa;
 
 import java.io.IOException;
 import java.io.Reader;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.Date;
 import java.util.List;
 import java.util.Properties;
@@ -61,22 +64,51 @@ public class TestCoordJobsGetForPurgeJPA
         super.tearDown();
     }
 
-    public void testCoordJobsGetForPurgeJPAExecutor() throws Exception {
-        String jobId = "00000-" + new Date().getTime() + "-TestCoordJobsGetForPurgeJPAExecutor-C";
-        insertJob(jobId, CoordinatorJob.Status.SUCCEEDED, DateUtils.parseDateOozieTZ("2011-01-01T01:00Z"));
-        _testCoordJobsForPurge(10, 1);
+    public void testCoordJobsGetForPurgeJPAExecutorWithParent() throws Exception {
+        JPAService jpaService = Services.get().get(JPAService.class);
+        assertNotNull(jpaService);
+
+        String jobId1 = "00001-" + new Date().getTime() + "-TestCoordJobsGetForPurgeJPAExecutor-C";
+        insertJob(jobId1, CoordinatorJob.Status.SUCCEEDED, DateUtils.parseDateOozieTZ("2011-01-01T01:00Z"));
+        String jobId2 = "00002-" + new Date().getTime() + "-TestCoordJobsGetForPurgeJPAExecutor-C";
+        CoordinatorJobBean job2 = insertJob(jobId2, CoordinatorJob.Status.SUCCEEDED,
+                DateUtils.parseDateOozieTZ("2011-01-01T01:00Z"));
+        job2.setBundleId("some_bundle_parent_id");
+        jpaService.execute(new CoordJobUpdateJPAExecutor(job2));
+
+        CoordJobsGetForPurgeJPAExecutor executor = new CoordJobsGetForPurgeJPAExecutor(10, 50);
+        List<String> jobList = jpaService.execute(executor);
+        // job2 shouldn't be in the list because it has a parent
+        assertEquals(1, jobList.size());
+        assertEquals(jobId1, jobList.get(0));
     }
 
-    private void _testCoordJobsForPurge(int olderThan, int expected) throws Exception {
+    public void testCoordJobsGetForPurgeJPAExecutorTooMany() throws Exception {
         JPAService jpaService = Services.get().get(JPAService.class);
         assertNotNull(jpaService);
 
-        CoordJobsGetForPurgeJPAExecutor executor = new CoordJobsGetForPurgeJPAExecutor(olderThan, 50);
-        List<CoordinatorJobBean> jobList = jpaService.execute(executor);
-        assertEquals(expected, jobList.size());
+        String jobId1 = "00001-" + new Date().getTime() + "-TestCoordJobsGetForPurgeJPAExecutor-C";
+        insertJob(jobId1, CoordinatorJob.Status.SUCCEEDED, DateUtils.parseDateOozieTZ("2011-01-01T01:00Z"));
+        String jobId2 = "00002-" + new Date().getTime() + "-TestCoordJobsGetForPurgeJPAExecutor-C";
+        insertJob(jobId2, CoordinatorJob.Status.SUCCEEDED, DateUtils.parseDateOozieTZ("2011-01-01T01:00Z"));
+        String jobId3 = "00003-" + new Date().getTime() + "-TestCoordJobsGetForPurgeJPAExecutor-C";
+        insertJob(jobId3, CoordinatorJob.Status.SUCCEEDED, DateUtils.parseDateOozieTZ("2011-01-01T01:00Z"));
+        String jobId4 = "00004-" + new Date().getTime() + "-TestCoordJobsGetForPurgeJPAExecutor-C";
+        insertJob(jobId4, CoordinatorJob.Status.SUCCEEDED, DateUtils.parseDateOozieTZ("2011-01-01T01:00Z"));
+        String jobId5 = "00005-" + new Date().getTime() + "-TestCoordJobsGetForPurgeJPAExecutor-C";
+        insertJob(jobId5, CoordinatorJob.Status.SUCCEEDED, DateUtils.parseDateOozieTZ("2011-01-01T01:00Z"));
+
+        List<String> list = new ArrayList<String>();
+        // Get the first 3
+        list.addAll(jpaService.execute(new CoordJobsGetForPurgeJPAExecutor(1, 3)));
+        assertEquals(3, list.size());
+        // Get the next 3 (though there's only 2 more)
+        list.addAll(jpaService.execute(new CoordJobsGetForPurgeJPAExecutor(1, 3, 3)));
+        assertEquals(5, list.size());
+        checkCoordinators(list, jobId1, jobId2, jobId3, jobId4, jobId5);
     }
 
-    private void insertJob(String jobId, CoordinatorJob.Status status, Date d) throws Exception {
+    private CoordinatorJobBean insertJob(String jobId, CoordinatorJob.Status status, Date d) throws Exception {
         Path appPath = new Path(getFsTestCaseDir(), "coord");
         String appXml = getCoordJobXml(appPath);
 
@@ -120,6 +152,7 @@ public class TestCoordJobsGetForPurgeJPA
             fail("Unable to insert the test job record to table");
             throw ce;
         }
+        return coordJob;
     }
 
     private String getCoordJobXml(Path appPath) {
@@ -152,4 +185,14 @@ public class TestCoordJobsGetForPurgeJPA
 
         return conf;
     }
+
+    private void checkCoordinators(List<String> coords, String... coordJobIDs) {
+        assertEquals(coordJobIDs.length, coords.size());
+        Arrays.sort(coordJobIDs);
+        Collections.sort(coords);
+
+        for (int i = 0; i < coordJobIDs.length; i++) {
+            assertEquals(coordJobIDs[i], coords.get(i));
+        }
+    }
 }

Added: oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobsGetFromParentIdJPAExecutor.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobsGetFromParentIdJPAExecutor.java?rev=1465054&view=auto
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobsGetFromParentIdJPAExecutor.java (added)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobsGetFromParentIdJPAExecutor.java Fri Apr  5 17:25:08 2013
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.executor.jpa;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.apache.oozie.BundleActionBean;
+import org.apache.oozie.BundleJobBean;
+import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.CoordinatorJobBean;
+import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.client.CoordinatorJob;
+import org.apache.oozie.client.Job;
+import org.apache.oozie.service.JPAService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.test.XDataTestCase;
+
+public class TestCoordJobsGetFromParentIdJPAExecutor extends XDataTestCase {
+    Services services;
+    private String[] excludedServices = { "org.apache.oozie.service.StatusTransitService",
+            "org.apache.oozie.service.PauseTransitService", "org.apache.oozie.service.PurgeService",
+            "org.apache.oozie.service.CoordMaterializeTriggerService", "org.apache.oozie.service.RecoveryService" };
+
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        services = new Services();
+        setClassesToBeExcluded(services.getConf(), excludedServices);
+        services.init();
+        cleanUpDBTables();
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        services.destroy();
+        super.tearDown();
+    }
+
+    public void testGetBundleParent() throws Exception {
+        JPAService jpaService = Services.get().get(JPAService.class);
+        assertNotNull(jpaService);
+
+        BundleJobBean bundleJobA = addRecordToBundleJobTable(Job.Status.SUCCEEDED, false);
+        BundleJobBean bundleJobB = addRecordToBundleJobTable(Job.Status.SUCCEEDED, false);
+        CoordinatorJobBean coordJobA1 = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
+        CoordinatorJobBean coordJobA2 = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
+        coordJobA2.setAppName("something_different");
+        jpaService.execute(new CoordJobUpdateJPAExecutor(coordJobA2));
+        CoordinatorJobBean coordJobB = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
+        CoordinatorActionBean coordActionA1 = addRecordToCoordActionTable(coordJobA1.getId(), 1, CoordinatorAction.Status.SUCCEEDED,
+                "coord-action-get.xml", 0);
+        CoordinatorActionBean coordActionA2 = addRecordToCoordActionTable(coordJobA2.getId(), 1, CoordinatorAction.Status.SUCCEEDED,
+                "coord-action-get.xml", 0);
+        CoordinatorActionBean coordActionB = addRecordToCoordActionTable(coordJobB.getId(), 1, CoordinatorAction.Status.SUCCEEDED,
+                "coord-action-get.xml", 0);
+        BundleActionBean bundleActionA1 = addRecordToBundleActionTable(bundleJobA.getId(), coordJobA1.getId(),
+                coordJobA1.getAppName(), 0, Job.Status.SUCCEEDED);
+        BundleActionBean bundleActionA2 = addRecordToBundleActionTable(bundleJobA.getId(), coordJobA2.getId(),
+                coordJobA2.getAppName(), 0, Job.Status.SUCCEEDED);
+        BundleActionBean bundleActionB = addRecordToBundleActionTable(bundleJobB.getId(), coordJobB.getId(),
+                coordJobB.getAppName(), 0, Job.Status.SUCCEEDED);
+
+        List<String> children = new ArrayList<String>();
+        children.addAll(jpaService.execute(new CoordJobsGetFromParentIdJPAExecutor(bundleJobA.getId(), 10)));
+        checkChildren(children, coordJobA1.getId(), coordJobA2.getId());
+
+        children = new ArrayList<String>();
+        children.addAll(jpaService.execute(new CoordJobsGetFromParentIdJPAExecutor(bundleJobB.getId(), 10)));
+        checkChildren(children, coordJobB.getId());
+    }
+
+    public void testGetBundleParentTooMany() throws Exception {
+        JPAService jpaService = Services.get().get(JPAService.class);
+        assertNotNull(jpaService);
+
+        BundleJobBean bundleJob = addRecordToBundleJobTable(Job.Status.SUCCEEDED, false);
+        CoordinatorJobBean coordJob1 = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
+        coordJob1.setAppName("coordJob1");
+        jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob1));
+        CoordinatorJobBean coordJob2 = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
+        coordJob2.setAppName("coordJob2");
+        jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob2));
+        CoordinatorJobBean coordJob3 = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
+        coordJob3.setAppName("coordJob3");
+        jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob3));
+        CoordinatorJobBean coordJob4 = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
+        coordJob4.setAppName("coordJob4");
+        jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob4));
+        CoordinatorJobBean coordJob5 = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
+        coordJob5.setAppName("coordJob5");
+        jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob5));
+        CoordinatorActionBean coordAction1 = addRecordToCoordActionTable(coordJob1.getId(), 1, CoordinatorAction.Status.SUCCEEDED,
+                "coord-action-get.xml", 0);
+        CoordinatorActionBean coordAction2 = addRecordToCoordActionTable(coordJob2.getId(), 1, CoordinatorAction.Status.SUCCEEDED,
+                "coord-action-get.xml", 0);
+        CoordinatorActionBean coordAction3 = addRecordToCoordActionTable(coordJob3.getId(), 1, CoordinatorAction.Status.SUCCEEDED,
+                "coord-action-get.xml", 0);
+        CoordinatorActionBean coordAction4 = addRecordToCoordActionTable(coordJob4.getId(), 1, CoordinatorAction.Status.SUCCEEDED,
+                "coord-action-get.xml", 0);
+        CoordinatorActionBean coordAction5 = addRecordToCoordActionTable(coordJob5.getId(), 1, CoordinatorAction.Status.SUCCEEDED,
+                "coord-action-get.xml", 0);
+        BundleActionBean bundleAction1 = addRecordToBundleActionTable(bundleJob.getId(), coordJob1.getId(),
+                coordJob1.getAppName(), 0, Job.Status.SUCCEEDED);
+        BundleActionBean bundleAction2 = addRecordToBundleActionTable(bundleJob.getId(), coordJob2.getId(),
+                coordJob2.getAppName(), 0, Job.Status.SUCCEEDED);
+        BundleActionBean bundleAction3 = addRecordToBundleActionTable(bundleJob.getId(), coordJob3.getId(),
+                coordJob3.getAppName(), 0, Job.Status.SUCCEEDED);
+        BundleActionBean bundleAction4 = addRecordToBundleActionTable(bundleJob.getId(), coordJob4.getId(),
+                coordJob4.getAppName(), 0, Job.Status.SUCCEEDED);
+        BundleActionBean bundleAction5 = addRecordToBundleActionTable(bundleJob.getId(), coordJob5.getId(),
+                coordJob5.getAppName(), 0, Job.Status.SUCCEEDED);
+
+        List<String> children = new ArrayList<String>();
+        // Get the first 3
+        children.addAll(jpaService.execute(new CoordJobsGetFromParentIdJPAExecutor(bundleJob.getId(), 3)));
+        assertEquals(3, children.size());
+        // Get the next 3 (though there's only 2 more)
+        children.addAll(jpaService.execute(new CoordJobsGetFromParentIdJPAExecutor(bundleJob.getId(), 3, 3)));
+        assertEquals(5, children.size());
+        checkChildren(children, coordJob1.getId(), coordJob2.getId(), coordJob3.getId(), coordJob4.getId(), coordJob5.getId());
+    }
+
+    private void checkChildren(List<String> children, String... coordJobIDs) {
+        assertEquals(coordJobIDs.length, children.size());
+        Arrays.sort(coordJobIDs);
+        Collections.sort(children);
+
+        for (int i = 0; i < coordJobIDs.length; i++) {
+            assertEquals(coordJobIDs[i], children.get(i));
+        }
+    }
+}

Added: oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobsCountNotForPurgeFromParentIdJPAExecutor.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobsCountNotForPurgeFromParentIdJPAExecutor.java?rev=1465054&view=auto
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobsCountNotForPurgeFromParentIdJPAExecutor.java (added)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobsCountNotForPurgeFromParentIdJPAExecutor.java Fri Apr  5 17:25:08 2013
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.executor.jpa;
+
+import org.apache.oozie.CoordinatorJobBean;
+import org.apache.oozie.WorkflowJobBean;
+import org.apache.oozie.client.CoordinatorJob;
+import org.apache.oozie.client.WorkflowJob;
+import org.apache.oozie.command.TestPurgeXCommand;
+import org.apache.oozie.service.JPAService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.test.XDataTestCase;
+import org.apache.oozie.workflow.WorkflowInstance;
+
+public class TestWorkflowJobsCountNotForPurgeFromParentIdJPAExecutor extends XDataTestCase {
+    Services services;
+    private String[] excludedServices = { "org.apache.oozie.service.StatusTransitService",
+            "org.apache.oozie.service.PauseTransitService", "org.apache.oozie.service.PurgeService",
+            "org.apache.oozie.service.CoordMaterializeTriggerService", "org.apache.oozie.service.RecoveryService" };
+
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        services = new Services();
+        setClassesToBeExcluded(services.getConf(), excludedServices);
+        services.init();
+        cleanUpDBTables();
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        services.destroy();
+        super.tearDown();
+    }
+
+    public void testCount() throws Exception {
+        JPAService jpaService = Services.get().get(JPAService.class);
+        assertNotNull(jpaService);
+
+        CoordinatorJobBean coordJob = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
+        String coordJobId = coordJob.getId();
+        int days = 1;
+        assertEquals(0, (long) jpaService.execute(new WorkflowJobsCountNotForPurgeFromParentIdJPAExecutor(days, coordJobId)));
+
+        WorkflowJobBean wfJob1 = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED, coordJobId);
+        wfJob1 = TestPurgeXCommand.setEndTime(wfJob1, "2009-12-01T01:00Z");
+        days = 1;
+        assertEquals(0, (long) jpaService.execute(new WorkflowJobsCountNotForPurgeFromParentIdJPAExecutor(days, coordJobId)));
+        days = TestPurgeXCommand.getNumDaysToNotBePurged(wfJob1.getEndTime());
+        assertEquals(1, (long) jpaService.execute(new WorkflowJobsCountNotForPurgeFromParentIdJPAExecutor(days, coordJobId)));
+
+        WorkflowJobBean wfJob2 = addRecordToWfJobTable(WorkflowJob.Status.FAILED, WorkflowInstance.Status.FAILED, coordJobId);
+        wfJob2 = TestPurgeXCommand.setEndTime(wfJob2, "2009-11-01T01:00Z");
+        days = TestPurgeXCommand.getNumDaysToNotBePurged(wfJob1.getEndTime());
+        assertEquals(1, (long) jpaService.execute(new WorkflowJobsCountNotForPurgeFromParentIdJPAExecutor(days, coordJobId)));
+        days = TestPurgeXCommand.getNumDaysToNotBePurged(wfJob2.getEndTime());
+        assertEquals(2, (long) jpaService.execute(new WorkflowJobsCountNotForPurgeFromParentIdJPAExecutor(days, coordJobId)));
+
+        WorkflowJobBean wfJob3 = addRecordToWfJobTable(WorkflowJob.Status.KILLED, WorkflowInstance.Status.KILLED, coordJobId);
+        wfJob3 = TestPurgeXCommand.setEndTime(wfJob3, "2009-10-01T01:00Z");
+        days = TestPurgeXCommand.getNumDaysToNotBePurged(wfJob2.getEndTime());
+        assertEquals(2, (long) jpaService.execute(new WorkflowJobsCountNotForPurgeFromParentIdJPAExecutor(days, coordJobId)));
+        days = TestPurgeXCommand.getNumDaysToNotBePurged(wfJob3.getEndTime());
+        assertEquals(3, (long) jpaService.execute(new WorkflowJobsCountNotForPurgeFromParentIdJPAExecutor(days, coordJobId)));
+
+        WorkflowJobBean wfJob4 = addRecordToWfJobTable(WorkflowJob.Status.PREP, WorkflowInstance.Status.PREP, coordJobId);
+        wfJob4 = TestPurgeXCommand.setEndTime(wfJob4, "2009-09-01T01:00Z");
+        days = TestPurgeXCommand.getNumDaysToNotBePurged(wfJob3.getEndTime());
+        assertEquals(4, (long) jpaService.execute(new WorkflowJobsCountNotForPurgeFromParentIdJPAExecutor(days, coordJobId)));
+        days = TestPurgeXCommand.getNumDaysToNotBePurged(wfJob4.getEndTime());
+        assertEquals(4, (long) jpaService.execute(new WorkflowJobsCountNotForPurgeFromParentIdJPAExecutor(days, coordJobId)));
+
+        WorkflowJobBean wfJob5 = addRecordToWfJobTable(WorkflowJob.Status.RUNNING, WorkflowInstance.Status.RUNNING, coordJobId);
+        wfJob5 = TestPurgeXCommand.setEndTime(wfJob5, "2009-08-01T01:00Z");
+        days = TestPurgeXCommand.getNumDaysToNotBePurged(wfJob4.getEndTime());
+        assertEquals(5, (long) jpaService.execute(new WorkflowJobsCountNotForPurgeFromParentIdJPAExecutor(days, coordJobId)));
+        days = TestPurgeXCommand.getNumDaysToNotBePurged(wfJob5.getEndTime());
+        assertEquals(5, (long) jpaService.execute(new WorkflowJobsCountNotForPurgeFromParentIdJPAExecutor(days, coordJobId)));
+
+        WorkflowJobBean wfJob6 = addRecordToWfJobTable(WorkflowJob.Status.SUSPENDED, WorkflowInstance.Status.SUSPENDED, coordJobId);
+        wfJob6 = TestPurgeXCommand.setEndTime(wfJob6, "2009-07-01T01:00Z");
+        days = TestPurgeXCommand.getNumDaysToNotBePurged(wfJob5.getEndTime());
+        assertEquals(6, (long) jpaService.execute(new WorkflowJobsCountNotForPurgeFromParentIdJPAExecutor(days, coordJobId)));
+        days = TestPurgeXCommand.getNumDaysToNotBePurged(wfJob6.getEndTime());
+        assertEquals(6, (long) jpaService.execute(new WorkflowJobsCountNotForPurgeFromParentIdJPAExecutor(days, coordJobId)));
+    }
+}

Added: oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobsDeleteJPAExecutor.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobsDeleteJPAExecutor.java?rev=1465054&view=auto
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobsDeleteJPAExecutor.java (added)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobsDeleteJPAExecutor.java Fri Apr  5 17:25:08 2013
@@ -0,0 +1,255 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.executor.jpa;
+
+import java.util.ArrayList;
+import java.util.List;
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.fail;
+
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.FaultInjection;
+import org.apache.oozie.WorkflowActionBean;
+import org.apache.oozie.WorkflowJobBean;
+import org.apache.oozie.client.WorkflowAction;
+import org.apache.oozie.client.WorkflowJob;
+import org.apache.oozie.command.SkipCommitFaultInjection;
+import org.apache.oozie.service.JPAService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.test.XDataTestCase;
+import org.apache.oozie.workflow.WorkflowInstance;
+
+public class TestWorkflowJobsDeleteJPAExecutor extends XDataTestCase {
+    Services services;
+    private String[] excludedServices = { "org.apache.oozie.service.StatusTransitService",
+            "org.apache.oozie.service.PauseTransitService", "org.apache.oozie.service.PurgeService",
+            "org.apache.oozie.service.CoordMaterializeTriggerService", "org.apache.oozie.service.RecoveryService" };
+
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        services = new Services();
+        setClassesToBeExcluded(services.getConf(), excludedServices);
+        services.init();
+        cleanUpDBTables();
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        services.destroy();
+        super.tearDown();
+    }
+
+    public void testDeleteWorkflows() throws Exception {
+        WorkflowJobBean jobA = this.addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
+        WorkflowActionBean actionA1 = this.addRecordToWfActionTable(jobA.getId(), "1", WorkflowAction.Status.OK);
+        WorkflowActionBean actionA2 = this.addRecordToWfActionTable(jobA.getId(), "2", WorkflowAction.Status.OK);
+
+        WorkflowJobBean jobB = this.addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
+        WorkflowActionBean actionB1 = this.addRecordToWfActionTable(jobB.getId(), "1", WorkflowAction.Status.OK);
+        WorkflowActionBean actionB2 = this.addRecordToWfActionTable(jobB.getId(), "2", WorkflowAction.Status.OK);
+
+        WorkflowJobBean jobC = this.addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
+        WorkflowActionBean actionC1 = this.addRecordToWfActionTable(jobC.getId(), "1", WorkflowAction.Status.OK);
+        WorkflowActionBean actionC2 = this.addRecordToWfActionTable(jobC.getId(), "2", WorkflowAction.Status.OK);
+
+        JPAService jpaService = Services.get().get(JPAService.class);
+        assertNotNull(jpaService);
+
+        List<String> deleteList = new ArrayList<String>();
+        deleteList.add(jobA.getId());
+        deleteList.add(jobB.getId());
+        deleteList.add(jobC.getId());
+        jpaService.execute(new WorkflowJobsDeleteJPAExecutor(deleteList));
+
+        try {
+            jpaService.execute(new WorkflowJobGetJPAExecutor(jobA.getId()));
+            fail("Workflow Job A should have been deleted");
+        }
+        catch (JPAExecutorException je) {
+            assertEquals(ErrorCode.E0604, je.getErrorCode());
+        }
+
+        try {
+            jpaService.execute(new WorkflowActionGetJPAExecutor(actionA1.getId()));
+            fail("Workflow Action A1 should have been deleted");
+        }
+        catch (JPAExecutorException je) {
+            assertEquals(ErrorCode.E0605, je.getErrorCode());
+        }
+
+        try {
+            jpaService.execute(new WorkflowActionGetJPAExecutor(actionA2.getId()));
+            fail("Workflow Action A2 should have been deleted");
+        }
+        catch (JPAExecutorException je) {
+            assertEquals(ErrorCode.E0605, je.getErrorCode());
+        }
+
+        try {
+            jpaService.execute(new WorkflowJobGetJPAExecutor(jobB.getId()));
+            fail("Workflow Job B should have been deleted");
+        }
+        catch (JPAExecutorException je) {
+            assertEquals(ErrorCode.E0604, je.getErrorCode());
+        }
+
+        try {
+            jpaService.execute(new WorkflowActionGetJPAExecutor(actionB1.getId()));
+            fail("Workflow Action B1 should have been deleted");
+        }
+        catch (JPAExecutorException je) {
+            assertEquals(ErrorCode.E0605, je.getErrorCode());
+        }
+
+        try {
+            jpaService.execute(new WorkflowActionGetJPAExecutor(actionB2.getId()));
+            fail("Workflow Action B2 should have been deleted");
+        }
+        catch (JPAExecutorException je) {
+            assertEquals(ErrorCode.E0605, je.getErrorCode());
+        }
+
+        try {
+            jpaService.execute(new WorkflowJobGetJPAExecutor(jobC.getId()));
+            fail("Workflow Job C should have been deleted");
+        }
+        catch (JPAExecutorException je) {
+            assertEquals(ErrorCode.E0604, je.getErrorCode());
+        }
+
+        try {
+            jpaService.execute(new WorkflowActionGetJPAExecutor(actionC1.getId()));
+            fail("Workflow Action C1 should have been deleted");
+        }
+        catch (JPAExecutorException je) {
+            assertEquals(ErrorCode.E0605, je.getErrorCode());
+        }
+
+        try {
+            jpaService.execute(new WorkflowActionGetJPAExecutor(actionC2.getId()));
+            fail("Workflow Action C2 should have been deleted");
+        }
+        catch (JPAExecutorException je) {
+            assertEquals(ErrorCode.E0605, je.getErrorCode());
+        }
+    }
+
+    public void testDeleteWorkflowsRollback() throws Exception{
+        WorkflowJobBean jobA = this.addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
+        WorkflowActionBean actionA1 = this.addRecordToWfActionTable(jobA.getId(), "1", WorkflowAction.Status.OK);
+        WorkflowActionBean actionA2 = this.addRecordToWfActionTable(jobA.getId(), "2", WorkflowAction.Status.OK);
+
+        WorkflowJobBean jobB = this.addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
+        WorkflowActionBean actionB1 = this.addRecordToWfActionTable(jobB.getId(), "1", WorkflowAction.Status.OK);
+        WorkflowActionBean actionB2 = this.addRecordToWfActionTable(jobB.getId(), "2", WorkflowAction.Status.OK);
+
+        WorkflowJobBean jobC = this.addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
+        WorkflowActionBean actionC1 = this.addRecordToWfActionTable(jobC.getId(), "1", WorkflowAction.Status.OK);
+        WorkflowActionBean actionC2 = this.addRecordToWfActionTable(jobC.getId(), "2", WorkflowAction.Status.OK);
+
+        JPAService jpaService = Services.get().get(JPAService.class);
+        assertNotNull(jpaService);
+
+        try {
+            // set fault injection to true, so transaction is roll backed
+            setSystemProperty(FaultInjection.FAULT_INJECTION, "true");
+            setSystemProperty(SkipCommitFaultInjection.ACTION_FAILOVER_FAULT_INJECTION, "true");
+
+            List<String> deleteList = new ArrayList<String>();
+            deleteList.add(jobA.getId());
+            deleteList.add(jobB.getId());
+            deleteList.add(jobC.getId());
+
+            try {
+                jpaService.execute(new WorkflowJobsDeleteJPAExecutor(deleteList));
+                fail("Should have skipped commit for failover testing");
+            }
+            catch (RuntimeException re) {
+                assertEquals("Skipping Commit for Failover Testing", re.getMessage());
+            }
+        }
+        finally {
+            // Remove fault injection
+            FaultInjection.deactivate("org.apache.oozie.command.SkipCommitFaultInjection");
+        }
+
+        try {
+            jpaService.execute(new WorkflowJobGetJPAExecutor(jobA.getId()));
+        }
+        catch (JPAExecutorException je) {
+            fail("Workflow Job A should not have been deleted");
+        }
+
+        try {
+            jpaService.execute(new WorkflowActionGetJPAExecutor(actionA1.getId()));
+        }
+        catch (JPAExecutorException je) {
+            fail("Workflow Action A1 should not have been deleted");
+        }
+
+        try {
+            jpaService.execute(new WorkflowActionGetJPAExecutor(actionA2.getId()));
+        }
+        catch (JPAExecutorException je) {
+            fail("Workflow Action A2 should not have been deleted");
+        }
+
+        try {
+            jpaService.execute(new WorkflowJobGetJPAExecutor(jobB.getId()));
+        }
+        catch (JPAExecutorException je) {
+            fail("Workflow Job B should not have been deleted");
+        }
+
+        try {
+            jpaService.execute(new WorkflowActionGetJPAExecutor(actionB1.getId()));
+        }
+        catch (JPAExecutorException je) {
+            fail("Workflow Action B1 should not have been deleted");
+        }
+
+        try {
+            jpaService.execute(new WorkflowActionGetJPAExecutor(actionB2.getId()));
+        }
+        catch (JPAExecutorException je) {
+            fail("Workflow Action B2 should not have been deleted");
+        }
+
+        try {
+            jpaService.execute(new WorkflowJobGetJPAExecutor(jobC.getId()));
+        }
+        catch (JPAExecutorException je) {
+            fail("Workflow Job C should not have been deleted");
+        }
+
+        try {
+            jpaService.execute(new WorkflowActionGetJPAExecutor(actionC1.getId()));
+        }
+        catch (JPAExecutorException je) {
+            fail("Workflow Action C1 should not have been deleted");
+        }
+
+        try {
+            jpaService.execute(new WorkflowActionGetJPAExecutor(actionC2.getId()));
+        }
+        catch (JPAExecutorException je) {
+            fail("Workflow Action C2 should not have been deleted");
+        }
+    }
+}

Modified: oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobsGetForPurgeJPAExecutor.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobsGetForPurgeJPAExecutor.java?rev=1465054&r1=1465053&r2=1465054&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobsGetForPurgeJPAExecutor.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobsGetForPurgeJPAExecutor.java Fri Apr  5 17:25:08 2013
@@ -18,6 +18,9 @@
 package org.apache.oozie.executor.jpa;
 
 import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
@@ -54,19 +57,40 @@ public class TestWorkflowJobsGetForPurge
         super.tearDown();
     }
 
-    public void testWfJobsGetForPurge() throws Exception {
-        addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
-        addRecordToWfJobTable(WorkflowJob.Status.FAILED, WorkflowInstance.Status.FAILED);
-        _testGetJobsForPurge(1, 20);
+    public void testWfJobsGetForPurgeWithParent() throws Exception {
+        JPAService jpaService = Services.get().get(JPAService.class);
+        assertNotNull(jpaService);
+
+        WorkflowJobBean job1 = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
+        WorkflowJobBean job2 = addRecordToWfJobTable(WorkflowJob.Status.FAILED, WorkflowInstance.Status.FAILED);
+        WorkflowJobBean job3 = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
+        job3.setParentId(job1.getId());
+        jpaService.execute(new WorkflowJobUpdateJPAExecutor(job3));
+
+        List<String> list = new ArrayList<String>();
+        list.addAll(jpaService.execute(new WorkflowJobsGetForPurgeJPAExecutor(1, 20)));
+        // job3 shouldn't be in the list because it has a parent
+        checkWorkflows(list, job1.getId(), job2.getId());
     }
 
-    private void _testGetJobsForPurge(long olderThanDays, int limit) throws Exception {
+    public void testWfJobsGetForPurgeTooMany() throws Exception {
         JPAService jpaService = Services.get().get(JPAService.class);
         assertNotNull(jpaService);
-        WorkflowJobsGetForPurgeJPAExecutor wfJobsForPurgeExe = new WorkflowJobsGetForPurgeJPAExecutor(olderThanDays, limit);
-        List<WorkflowJobBean> list = jpaService.execute(wfJobsForPurgeExe);
-        assertNotNull(list);
-        assertEquals(2, list.size());
+
+        WorkflowJobBean job1 = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
+        WorkflowJobBean job2 = addRecordToWfJobTable(WorkflowJob.Status.FAILED, WorkflowInstance.Status.FAILED);
+        WorkflowJobBean job3 = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
+        WorkflowJobBean job4 = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
+        WorkflowJobBean job5 = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
+
+        List<String> list = new ArrayList<String>();
+        // Get the first 3
+        list.addAll(jpaService.execute(new WorkflowJobsGetForPurgeJPAExecutor(1, 3)));
+        assertEquals(3, list.size());
+        // Get the next 3 (though there's only 2 more)
+        list.addAll(jpaService.execute(new WorkflowJobsGetForPurgeJPAExecutor(1, 3, 3)));
+        assertEquals(5, list.size());
+        checkWorkflows(list, job1.getId(), job2.getId(), job3.getId(), job4.getId(), job5.getId());
     }
 
     @Override
@@ -100,4 +124,13 @@ public class TestWorkflowJobsGetForPurge
         return wfBean;
     }
 
+    private void checkWorkflows(List<String> wfs, String... wfJobIDs) {
+        assertEquals(wfJobIDs.length, wfs.size());
+        Arrays.sort(wfJobIDs);
+        Collections.sort(wfs);
+
+        for (int i = 0; i < wfJobIDs.length; i++) {
+            assertEquals(wfJobIDs[i], wfs.get(i));
+        }
+    }
 }