You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by rk...@apache.org on 2013/04/05 19:25:09 UTC
svn commit: r1465054 [3/4] - in /oozie/trunk: ./
core/src/main/java/org/apache/oozie/
core/src/main/java/org/apache/oozie/command/
core/src/main/java/org/apache/oozie/command/bundle/
core/src/main/java/org/apache/oozie/command/coord/ core/src/main/java...
Modified: oozie/trunk/core/src/test/java/org/apache/oozie/command/wf/TestActionErrors.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/command/wf/TestActionErrors.java?rev=1465054&r1=1465053&r2=1465054&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/command/wf/TestActionErrors.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/command/wf/TestActionErrors.java Fri Apr 5 17:25:08 2013
@@ -36,7 +36,6 @@ import org.apache.oozie.client.OozieClie
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.client.WorkflowJob;
import org.apache.oozie.executor.jpa.CoordActionGetForExternalIdJPAExecutor;
-import org.apache.oozie.executor.jpa.CoordActionUpdateJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowActionsGetForJobJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
import org.apache.oozie.service.ActionService;
@@ -385,16 +384,14 @@ public class TestActionErrors extends XD
conf.set("external-status", "ok");
conf.set("error", errorType);
- JPAService jpaService = Services.get().get(JPAService.class);
- CoordinatorJobBean coordJob = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, false, false);
- CoordinatorActionBean coordAction = addRecordToCoordActionTable(coordJob.getId(), 1,
- CoordinatorAction.Status.RUNNING, "coord-action-get.xml", "wfId", "RUNNING", 0);
+ final String jobId = engine.submitJob(conf, false);
- final String jobId = engine.submitJob(conf, true);
+ final JPAService jpaService = Services.get().get(JPAService.class);
+ final CoordinatorJobBean coordJob = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, false, false);
+ CoordinatorActionBean coordAction = addRecordToCoordActionTable(coordJob.getId(), 1,
+ CoordinatorAction.Status.RUNNING, "coord-action-get.xml", jobId, "RUNNING", 0);
- coordAction.setExternalId(jobId);
- CoordActionUpdateJPAExecutor coordActionUpdateExecutor = new CoordActionUpdateJPAExecutor(coordAction);
- jpaService.execute(coordActionUpdateExecutor);
+ engine.start(jobId);
waitFor(5000, new Predicate() {
public boolean evaluate() throws Exception {
@@ -417,6 +414,13 @@ public class TestActionErrors extends XD
assertEquals (WorkflowJob.Status.SUSPENDED, job.getStatus());
+ waitFor(5000, new Predicate() {
+ public boolean evaluate() throws Exception {
+ CoordinatorActionBean coordAction2 = jpaService.execute(new CoordActionGetForExternalIdJPAExecutor(jobId));
+ return coordAction2.getStatus().equals(CoordinatorAction.Status.SUSPENDED);
+ }
+ });
+
coordAction = jpaService.execute(new CoordActionGetForExternalIdJPAExecutor(jobId));
assertEquals (CoordinatorAction.Status.SUSPENDED, coordAction.getStatus());
}
Added: oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestBundleJobsDeleteJPAExecutor.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestBundleJobsDeleteJPAExecutor.java?rev=1465054&view=auto
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestBundleJobsDeleteJPAExecutor.java (added)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestBundleJobsDeleteJPAExecutor.java Fri Apr 5 17:25:08 2013
@@ -0,0 +1,253 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.executor.jpa;
+
+import java.util.ArrayList;
+import java.util.List;
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.fail;
+
+import org.apache.oozie.BundleActionBean;
+import org.apache.oozie.BundleJobBean;
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.FaultInjection;
+import org.apache.oozie.client.Job;
+import org.apache.oozie.command.SkipCommitFaultInjection;
+import org.apache.oozie.service.JPAService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.test.XDataTestCase;
+
+/**
+ * Tests for {@link BundleJobsDeleteJPAExecutor}.
+ * <p>
+ * Each test inserts three bundle jobs with two bundle actions apiece, runs the delete executor on the three
+ * job ids, and then looks every job and action up again to check whether the rows are gone (normal run) or
+ * still present (commit skipped through fault injection).
+ */
+public class TestBundleJobsDeleteJPAExecutor extends XDataTestCase {
+    Services services;
+    private final String[] excludedServices = { "org.apache.oozie.service.StatusTransitService",
+            "org.apache.oozie.service.PauseTransitService", "org.apache.oozie.service.PurgeService",
+            "org.apache.oozie.service.CoordMaterializeTriggerService", "org.apache.oozie.service.RecoveryService" };
+
+    /**
+     * Starts a fresh {@link Services} instance with the services listed in {@code excludedServices} excluded,
+     * then clears the database tables.
+     */
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        services = new Services();
+        setClassesToBeExcluded(services.getConf(), excludedServices);
+        services.init();
+        cleanUpDBTables();
+    }
+
+    /**
+     * Destroys the services started in {@link #setUp()}; the superclass tear-down runs even if that fails.
+     */
+    @Override
+    protected void tearDown() throws Exception {
+        try {
+            services.destroy();
+        }
+        finally {
+            super.tearDown();
+        }
+    }
+
+    /**
+     * Deletes three bundle jobs in one executor call and expects every job lookup to fail with
+     * {@link ErrorCode#E0604} and every action lookup to fail with {@link ErrorCode#E0605}.
+     */
+    public void testDeleteBundles() throws Exception {
+        BundleJobBean jobA = this.addRecordToBundleJobTable(Job.Status.SUCCEEDED, false);
+        BundleActionBean actionA1 = addRecordToBundleActionTable(jobA.getId(), "actionA1", 0, Job.Status.SUCCEEDED);
+        BundleActionBean actionA2 = addRecordToBundleActionTable(jobA.getId(), "actionA2", 0, Job.Status.SUCCEEDED);
+
+        BundleJobBean jobB = this.addRecordToBundleJobTable(Job.Status.SUCCEEDED, false);
+        BundleActionBean actionB1 = addRecordToBundleActionTable(jobB.getId(), "actionB1", 0, Job.Status.SUCCEEDED);
+        BundleActionBean actionB2 = addRecordToBundleActionTable(jobB.getId(), "actionB2", 0, Job.Status.SUCCEEDED);
+
+        BundleJobBean jobC = this.addRecordToBundleJobTable(Job.Status.SUCCEEDED, false);
+        BundleActionBean actionC1 = addRecordToBundleActionTable(jobC.getId(), "actionC1", 0, Job.Status.SUCCEEDED);
+        BundleActionBean actionC2 = addRecordToBundleActionTable(jobC.getId(), "actionC2", 0, Job.Status.SUCCEEDED);
+
+        JPAService jpaService = Services.get().get(JPAService.class);
+        assertNotNull(jpaService);
+
+        List<String> deleteList = new ArrayList<String>();
+        deleteList.add(jobA.getId());
+        deleteList.add(jobB.getId());
+        deleteList.add(jobC.getId());
+        jpaService.execute(new BundleJobsDeleteJPAExecutor(deleteList));
+
+        assertBundleJobDeleted(jpaService, jobA, "A");
+        assertBundleActionDeleted(jpaService, actionA1, "A1");
+        assertBundleActionDeleted(jpaService, actionA2, "A2");
+
+        assertBundleJobDeleted(jpaService, jobB, "B");
+        assertBundleActionDeleted(jpaService, actionB1, "B1");
+        assertBundleActionDeleted(jpaService, actionB2, "B2");
+
+        assertBundleJobDeleted(jpaService, jobC, "C");
+        assertBundleActionDeleted(jpaService, actionC1, "C1");
+        assertBundleActionDeleted(jpaService, actionC2, "C2");
+    }
+
+    /**
+     * Runs the same delete with the skip-commit fault injection switched on. The executor call is expected to
+     * throw a {@link RuntimeException} carrying the message "Skipping Commit for Failover Testing", after which
+     * every job and action must still be retrievable.
+     */
+    public void testDeleteBundlesRollback() throws Exception {
+        BundleJobBean jobA = this.addRecordToBundleJobTable(Job.Status.SUCCEEDED, false);
+        BundleActionBean actionA1 = addRecordToBundleActionTable(jobA.getId(), "actionA1", 0, Job.Status.SUCCEEDED);
+        BundleActionBean actionA2 = addRecordToBundleActionTable(jobA.getId(), "actionA2", 0, Job.Status.SUCCEEDED);
+
+        BundleJobBean jobB = this.addRecordToBundleJobTable(Job.Status.SUCCEEDED, false);
+        BundleActionBean actionB1 = addRecordToBundleActionTable(jobB.getId(), "actionB1", 0, Job.Status.SUCCEEDED);
+        BundleActionBean actionB2 = addRecordToBundleActionTable(jobB.getId(), "actionB2", 0, Job.Status.SUCCEEDED);
+
+        BundleJobBean jobC = this.addRecordToBundleJobTable(Job.Status.SUCCEEDED, false);
+        BundleActionBean actionC1 = addRecordToBundleActionTable(jobC.getId(), "actionC1", 0, Job.Status.SUCCEEDED);
+        BundleActionBean actionC2 = addRecordToBundleActionTable(jobC.getId(), "actionC2", 0, Job.Status.SUCCEEDED);
+
+        JPAService jpaService = Services.get().get(JPAService.class);
+        assertNotNull(jpaService);
+
+        try {
+            // set fault injection to true, so the transaction is rolled back
+            setSystemProperty(FaultInjection.FAULT_INJECTION, "true");
+            setSystemProperty(SkipCommitFaultInjection.ACTION_FAILOVER_FAULT_INJECTION, "true");
+
+            List<String> deleteList = new ArrayList<String>();
+            deleteList.add(jobA.getId());
+            deleteList.add(jobB.getId());
+            deleteList.add(jobC.getId());
+
+            try {
+                jpaService.execute(new BundleJobsDeleteJPAExecutor(deleteList));
+                fail("Should have skipped commit for failover testing");
+            }
+            catch (RuntimeException re) {
+                assertEquals("Skipping Commit for Failover Testing", re.getMessage());
+            }
+        }
+        finally {
+            // Remove fault injection
+            FaultInjection.deactivate("org.apache.oozie.command.SkipCommitFaultInjection");
+        }
+
+        assertBundleJobExists(jpaService, jobA, "A");
+        assertBundleActionExists(jpaService, actionA1, "A1");
+        assertBundleActionExists(jpaService, actionA2, "A2");
+
+        assertBundleJobExists(jpaService, jobB, "B");
+        assertBundleActionExists(jpaService, actionB1, "B1");
+        assertBundleActionExists(jpaService, actionB2, "B2");
+
+        assertBundleJobExists(jpaService, jobC, "C");
+        assertBundleActionExists(jpaService, actionC1, "C1");
+        assertBundleActionExists(jpaService, actionC2, "C2");
+    }
+
+    /** Fails unless looking the bundle job up raises a {@link JPAExecutorException} with code E0604. */
+    private void assertBundleJobDeleted(JPAService jpaService, BundleJobBean job, String label) throws Exception {
+        try {
+            jpaService.execute(new BundleJobGetJPAExecutor(job.getId()));
+            fail("Bundle Job " + label + " should have been deleted");
+        }
+        catch (JPAExecutorException je) {
+            assertEquals(ErrorCode.E0604, je.getErrorCode());
+        }
+    }
+
+    /** Fails unless looking the bundle action up raises a {@link JPAExecutorException} with code E0605. */
+    private void assertBundleActionDeleted(JPAService jpaService, BundleActionBean action, String label)
+            throws Exception {
+        try {
+            jpaService.execute(new BundleActionGetJPAExecutor(action.getBundleId(), action.getCoordName()));
+            fail("Bundle Action " + label + " should have been deleted");
+        }
+        catch (JPAExecutorException je) {
+            assertEquals(ErrorCode.E0605, je.getErrorCode());
+        }
+    }
+
+    /** Fails if looking the bundle job up raises a {@link JPAExecutorException}. */
+    private void assertBundleJobExists(JPAService jpaService, BundleJobBean job, String label) throws Exception {
+        try {
+            jpaService.execute(new BundleJobGetJPAExecutor(job.getId()));
+        }
+        catch (JPAExecutorException je) {
+            fail("Bundle Job " + label + " should not have been deleted");
+        }
+    }
+
+    /** Fails if looking the bundle action up raises a {@link JPAExecutorException}. */
+    private void assertBundleActionExists(JPAService jpaService, BundleActionBean action, String label)
+            throws Exception {
+        try {
+            jpaService.execute(new BundleActionGetJPAExecutor(action.getBundleId(), action.getCoordName()));
+        }
+        catch (JPAExecutorException je) {
+            fail("Bundle Action " + label + " should not have been deleted");
+        }
+    }
+}
Modified: oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestBundleJobsGetForPurgeJPAExecutor.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestBundleJobsGetForPurgeJPAExecutor.java?rev=1465054&r1=1465053&r2=1465054&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestBundleJobsGetForPurgeJPAExecutor.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestBundleJobsGetForPurgeJPAExecutor.java Fri Apr 5 17:25:08 2013
@@ -17,6 +17,9 @@
*/
package org.apache.oozie.executor.jpa;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.Date;
import java.util.List;
@@ -50,22 +53,24 @@ public class TestBundleJobsGetForPurgeJP
super.tearDown();
}
- public void testBundleJobsGetForPurgeJPAExecutor() throws Exception {
- this.addRecordToBundleJobTable(Job.Status.SUCCEEDED, DateUtils.parseDateOozieTZ("2011-01-01T01:00Z"));
-
- _testBundleJobsForPurge(10, 1);
-
- this.addRecordToBundleJobTable(Job.Status.SUCCEEDED, DateUtils.parseDateOozieTZ("2011-01-02T01:00Z"));
- _testBundleJobsForPurge(10, 2);
- }
-
- private void _testBundleJobsForPurge(int olderThan, int expected) throws Exception {
+ public void testBundleJobsGetForPurgeJPAExecutorTooMany() throws Exception { // five jobs are inserted but each fetch is expected to return at most three ids
JPAService jpaService = Services.get().get(JPAService.class);
assertNotNull(jpaService);
- BundleJobsGetForPurgeJPAExecutor executor = new BundleJobsGetForPurgeJPAExecutor(olderThan, 50);
- List<BundleJobBean> jobList = jpaService.execute(executor);
- assertEquals(expected, jobList.size());
+ BundleJobBean job1 = this.addRecordToBundleJobTable(Job.Status.SUCCEEDED, DateUtils.parseDateOozieTZ("2011-01-01T01:00Z"));
+ BundleJobBean job2 = this.addRecordToBundleJobTable(Job.Status.FAILED, DateUtils.parseDateOozieTZ("2011-01-01T01:00Z"));
+ BundleJobBean job3 = this.addRecordToBundleJobTable(Job.Status.SUCCEEDED, DateUtils.parseDateOozieTZ("2011-01-01T01:00Z"));
+ BundleJobBean job4 = this.addRecordToBundleJobTable(Job.Status.SUCCEEDED, DateUtils.parseDateOozieTZ("2011-01-01T01:00Z"));
+ BundleJobBean job5 = this.addRecordToBundleJobTable(Job.Status.SUCCEEDED, DateUtils.parseDateOozieTZ("2011-01-01T01:00Z"));
+
+ List<String> list = new ArrayList<String>();
+ // Get the first 3 (the two arguments are presumably olderThan-days and limit; confirm against the executor)
+ list.addAll(jpaService.execute(new BundleJobsGetForPurgeJPAExecutor(1, 3)));
+ assertEquals(3, list.size());
+ // Get the next 3 (though there are only 2 more); the extra argument is presumably the offset, confirm against the executor
+ list.addAll(jpaService.execute(new BundleJobsGetForPurgeJPAExecutor(1, 3, 3)));
+ assertEquals(5, list.size());
+ checkBundles(list, job1.getId(), job2.getId(), job3.getId(), job4.getId(), job5.getId()); // compared after sorting, so fetch order does not matter
}
protected BundleJobBean addRecordToBundleJobTable(Job.Status jobStatus, Date lastModifiedTime) throws Exception {
@@ -85,4 +90,13 @@ public class TestBundleJobsGetForPurgeJP
return bundle;
}
+    /**
+     * Asserts that {@code bundles} holds exactly the given bundle job ids, ignoring order.
+     * <p>
+     * Sorting is done on copies, so the caller's list and id array keep their original order.
+     *
+     * @param bundles the ids that were actually returned
+     * @param bundleJobIDs the ids that are expected
+     */
+    private void checkBundles(List<String> bundles, String... bundleJobIDs) {
+        assertEquals(bundleJobIDs.length, bundles.size());
+
+        String[] expectedIds = bundleJobIDs.clone();
+        List<String> actualIds = new ArrayList<String>(bundles);
+        Arrays.sort(expectedIds);
+        Collections.sort(actualIds);
+
+        // Comparing whole lists reports every id on failure, not just the first mismatch
+        assertEquals(Arrays.asList(expectedIds), actualIds);
+    }
}
Added: oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobsCountNotForPurgeFromParentIdJPAExecutor.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobsCountNotForPurgeFromParentIdJPAExecutor.java?rev=1465054&view=auto
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobsCountNotForPurgeFromParentIdJPAExecutor.java (added)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobsCountNotForPurgeFromParentIdJPAExecutor.java Fri Apr 5 17:25:08 2013
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.executor.jpa;
+
+import org.apache.oozie.BundleJobBean;
+import org.apache.oozie.CoordinatorJobBean;
+import org.apache.oozie.client.CoordinatorJob;
+import org.apache.oozie.client.Job;
+import org.apache.oozie.command.TestPurgeXCommand;
+import org.apache.oozie.service.JPAService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.test.XDataTestCase;
+
+/**
+ * Tests for {@link CoordJobsCountNotForPurgeFromParentIdJPAExecutor}.
+ * <p>
+ * One bundle job receives fourteen coordinator children, one per coordinator status, each with an older
+ * last-modified time than the one before. After each insert the count is queried twice: with the day
+ * window derived from the previous child and with the window derived from the new child.
+ */
+public class TestCoordJobsCountNotForPurgeFromParentIdJPAExecutor extends XDataTestCase {
+    Services services;
+    private final String[] excludedServices = { "org.apache.oozie.service.StatusTransitService",
+            "org.apache.oozie.service.PauseTransitService", "org.apache.oozie.service.PurgeService",
+            "org.apache.oozie.service.CoordMaterializeTriggerService", "org.apache.oozie.service.RecoveryService" };
+
+    /**
+     * Starts a fresh {@link Services} instance with the services listed in {@code excludedServices} excluded,
+     * then clears the database tables.
+     */
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        services = new Services();
+        setClassesToBeExcluded(services.getConf(), excludedServices);
+        services.init();
+        cleanUpDBTables();
+    }
+
+    /**
+     * Destroys the services started in {@link #setUp()}; the superclass tear-down runs even if that fails.
+     */
+    @Override
+    protected void tearDown() throws Exception {
+        try {
+            services.destroy();
+        }
+        finally {
+            super.tearDown();
+        }
+    }
+
+    /**
+     * Checks the count after each of the fourteen inserts.
+     * <p>
+     * The expected values encode two cases. For SUCCEEDED, DONEWITHERROR, FAILED and KILLED children the
+     * count only grows once the day window reaches back to the new child's last-modified time. For every
+     * other status the new child is counted with either window.
+     */
+    public void testCount() throws Exception {
+        JPAService jpaService = Services.get().get(JPAService.class);
+        assertNotNull(jpaService);
+
+        BundleJobBean bundleJob = addRecordToBundleJobTable(Job.Status.SUCCEEDED, false);
+        String bundleJobId = bundleJob.getId();
+        // No children yet
+        assertEquals(0, countNotForPurge(jpaService, bundleJobId, 1));
+
+        CoordinatorJobBean coordJob1 = addCoordJobToBundle(jpaService, bundleJobId,
+                CoordinatorJob.Status.SUCCEEDED, Job.Status.SUCCEEDED, "2009-12-01T01:00Z", "coord1");
+        assertEquals(0, countNotForPurge(jpaService, bundleJobId, 1));
+        assertEquals(1, countNotForPurge(jpaService, bundleJobId, daysToNotBePurged(coordJob1)));
+
+        CoordinatorJobBean coordJob2 = addCoordJobToBundle(jpaService, bundleJobId,
+                CoordinatorJob.Status.DONEWITHERROR, Job.Status.DONEWITHERROR, "2009-11-01T01:00Z", "coord2");
+        assertEquals(1, countNotForPurge(jpaService, bundleJobId, daysToNotBePurged(coordJob1)));
+        assertEquals(2, countNotForPurge(jpaService, bundleJobId, daysToNotBePurged(coordJob2)));
+
+        CoordinatorJobBean coordJob3 = addCoordJobToBundle(jpaService, bundleJobId,
+                CoordinatorJob.Status.FAILED, Job.Status.FAILED, "2009-10-01T01:00Z", "coord3");
+        assertEquals(2, countNotForPurge(jpaService, bundleJobId, daysToNotBePurged(coordJob2)));
+        assertEquals(3, countNotForPurge(jpaService, bundleJobId, daysToNotBePurged(coordJob3)));
+
+        CoordinatorJobBean coordJob4 = addCoordJobToBundle(jpaService, bundleJobId,
+                CoordinatorJob.Status.KILLED, Job.Status.KILLED, "2009-09-01T01:00Z", "coord4");
+        assertEquals(3, countNotForPurge(jpaService, bundleJobId, daysToNotBePurged(coordJob3)));
+        assertEquals(4, countNotForPurge(jpaService, bundleJobId, daysToNotBePurged(coordJob4)));
+
+        // From here on the new child is expected to be counted with both windows
+        CoordinatorJobBean coordJob5 = addCoordJobToBundle(jpaService, bundleJobId,
+                CoordinatorJob.Status.PAUSED, Job.Status.PAUSED, "2009-08-01T01:00Z", "coord5");
+        assertEquals(5, countNotForPurge(jpaService, bundleJobId, daysToNotBePurged(coordJob4)));
+        assertEquals(5, countNotForPurge(jpaService, bundleJobId, daysToNotBePurged(coordJob5)));
+
+        CoordinatorJobBean coordJob6 = addCoordJobToBundle(jpaService, bundleJobId,
+                CoordinatorJob.Status.PAUSEDWITHERROR, Job.Status.PAUSEDWITHERROR, "2009-07-01T01:00Z", "coord6");
+        assertEquals(6, countNotForPurge(jpaService, bundleJobId, daysToNotBePurged(coordJob5)));
+        assertEquals(6, countNotForPurge(jpaService, bundleJobId, daysToNotBePurged(coordJob6)));
+
+        CoordinatorJobBean coordJob7 = addCoordJobToBundle(jpaService, bundleJobId,
+                CoordinatorJob.Status.PREMATER, Job.Status.PREMATER, "2009-06-01T01:00Z", "coord7");
+        assertEquals(7, countNotForPurge(jpaService, bundleJobId, daysToNotBePurged(coordJob6)));
+        assertEquals(7, countNotForPurge(jpaService, bundleJobId, daysToNotBePurged(coordJob7)));
+
+        CoordinatorJobBean coordJob8 = addCoordJobToBundle(jpaService, bundleJobId,
+                CoordinatorJob.Status.PREP, Job.Status.PREP, "2009-05-01T01:00Z", "coord8");
+        assertEquals(8, countNotForPurge(jpaService, bundleJobId, daysToNotBePurged(coordJob7)));
+        assertEquals(8, countNotForPurge(jpaService, bundleJobId, daysToNotBePurged(coordJob8)));
+
+        CoordinatorJobBean coordJob9 = addCoordJobToBundle(jpaService, bundleJobId,
+                CoordinatorJob.Status.PREPPAUSED, Job.Status.PREPPAUSED, "2009-04-01T01:00Z", "coord9");
+        assertEquals(9, countNotForPurge(jpaService, bundleJobId, daysToNotBePurged(coordJob8)));
+        assertEquals(9, countNotForPurge(jpaService, bundleJobId, daysToNotBePurged(coordJob9)));
+
+        CoordinatorJobBean coordJob10 = addCoordJobToBundle(jpaService, bundleJobId,
+                CoordinatorJob.Status.PREPSUSPENDED, Job.Status.PREPSUSPENDED, "2009-03-01T01:00Z", "coord10");
+        assertEquals(10, countNotForPurge(jpaService, bundleJobId, daysToNotBePurged(coordJob9)));
+        assertEquals(10, countNotForPurge(jpaService, bundleJobId, daysToNotBePurged(coordJob10)));
+
+        CoordinatorJobBean coordJob11 = addCoordJobToBundle(jpaService, bundleJobId,
+                CoordinatorJob.Status.RUNNING, Job.Status.RUNNING, "2009-02-01T01:00Z", "coord11");
+        assertEquals(11, countNotForPurge(jpaService, bundleJobId, daysToNotBePurged(coordJob10)));
+        assertEquals(11, countNotForPurge(jpaService, bundleJobId, daysToNotBePurged(coordJob11)));
+
+        CoordinatorJobBean coordJob12 = addCoordJobToBundle(jpaService, bundleJobId,
+                CoordinatorJob.Status.RUNNINGWITHERROR, Job.Status.RUNNINGWITHERROR, "2009-01-01T01:00Z", "coord12");
+        assertEquals(12, countNotForPurge(jpaService, bundleJobId, daysToNotBePurged(coordJob11)));
+        assertEquals(12, countNotForPurge(jpaService, bundleJobId, daysToNotBePurged(coordJob12)));
+
+        CoordinatorJobBean coordJob13 = addCoordJobToBundle(jpaService, bundleJobId,
+                CoordinatorJob.Status.SUSPENDED, Job.Status.SUSPENDED, "2008-12-01T01:00Z", "coord13");
+        assertEquals(13, countNotForPurge(jpaService, bundleJobId, daysToNotBePurged(coordJob12)));
+        assertEquals(13, countNotForPurge(jpaService, bundleJobId, daysToNotBePurged(coordJob13)));
+
+        CoordinatorJobBean coordJob14 = addCoordJobToBundle(jpaService, bundleJobId,
+                CoordinatorJob.Status.SUSPENDEDWITHERROR, Job.Status.SUSPENDEDWITHERROR, "2008-11-01T01:00Z",
+                "coord14");
+        assertEquals(14, countNotForPurge(jpaService, bundleJobId, daysToNotBePurged(coordJob13)));
+        assertEquals(14, countNotForPurge(jpaService, bundleJobId, daysToNotBePurged(coordJob14)));
+    }
+
+    /**
+     * Inserts a coordinator job, stamps its last-modified time and app name, persists that update, and links
+     * the job to the bundle through a bundle action record.
+     * <p>
+     * Keeping these steps in one place ensures the bean that is modified is also the bean that is saved.
+     *
+     * @param jpaService service used to persist the update
+     * @param bundleJobId id of the parent bundle job
+     * @param coordStatus status of the new coordinator job
+     * @param actionStatus status recorded on the bundle action
+     * @param lastModifiedTime last-modified time, in the format accepted by
+     *        {@code TestPurgeXCommand.setLastModifiedTime}
+     * @param appName app name of the coordinator job, also passed to the bundle action record
+     * @return the coordinator job bean as returned by {@code TestPurgeXCommand.setLastModifiedTime}
+     */
+    private CoordinatorJobBean addCoordJobToBundle(JPAService jpaService, String bundleJobId,
+            CoordinatorJob.Status coordStatus, Job.Status actionStatus, String lastModifiedTime, String appName)
+            throws Exception {
+        CoordinatorJobBean coordJob = addRecordToCoordJobTable(coordStatus, false, false);
+        coordJob = TestPurgeXCommand.setLastModifiedTime(coordJob, lastModifiedTime);
+        coordJob.setAppName(appName);
+        jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob));
+        addRecordToBundleActionTable(bundleJobId, coordJob.getId(), coordJob.getAppName(), 0, actionStatus);
+        return coordJob;
+    }
+
+    /** Returns the day window that {@code TestPurgeXCommand} computes from the job's last-modified time. */
+    private int daysToNotBePurged(CoordinatorJobBean coordJob) {
+        return TestPurgeXCommand.getNumDaysToNotBePurged(coordJob.getLastModifiedTime());
+    }
+
+    /** Runs the executor under test for the given bundle and day window and returns its count. */
+    private long countNotForPurge(JPAService jpaService, String bundleJobId, int days) throws Exception {
+        return (long) jpaService.execute(new CoordJobsCountNotForPurgeFromParentIdJPAExecutor(days, bundleJobId));
+    }
+}
Added: oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobsDeleteJPAExecutor.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobsDeleteJPAExecutor.java?rev=1465054&view=auto
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobsDeleteJPAExecutor.java (added)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobsDeleteJPAExecutor.java Fri Apr 5 17:25:08 2013
@@ -0,0 +1,266 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.executor.jpa;
+
+import java.util.ArrayList;
+import java.util.List;
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.fail;
+
+import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.CoordinatorJobBean;
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.FaultInjection;
+import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.client.CoordinatorJob;
+import org.apache.oozie.command.SkipCommitFaultInjection;
+import org.apache.oozie.service.JPAService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.test.XDataTestCase;
+
+/**
+ * Tests for {@link CoordJobsDeleteJPAExecutor}: deleting a list of coordinator jobs must remove the jobs together
+ * with their actions, and a skipped commit must leave all of them in place.
+ */
+public class TestCoordJobsDeleteJPAExecutor extends XDataTestCase {
+    Services services;
+    // Services excluded before init(); presumably so background services do not touch the test records - TODO confirm
+    private String[] excludedServices = { "org.apache.oozie.service.StatusTransitService",
+            "org.apache.oozie.service.PauseTransitService", "org.apache.oozie.service.PurgeService",
+            "org.apache.oozie.service.CoordMaterializeTriggerService", "org.apache.oozie.service.RecoveryService" };
+
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        services = new Services();
+        setClassesToBeExcluded(services.getConf(), excludedServices);
+        services.init();
+        cleanUpDBTables();
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        services.destroy();
+        super.tearDown();
+    }
+
+    /**
+     * Deletes three coordinator jobs (two actions each) in one call and verifies that neither the jobs nor their
+     * actions can be loaded afterwards.
+     */
+    public void testDeleteCoords() throws Exception {
+        CoordinatorJobBean jobA = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
+        CoordinatorActionBean actionA1 = addRecordToCoordActionTable(jobA.getId(), 1, CoordinatorAction.Status.SUCCEEDED,
+                "coord-action-get.xml", 0);
+        CoordinatorActionBean actionA2 = addRecordToCoordActionTable(jobA.getId(), 2, CoordinatorAction.Status.SUCCEEDED,
+                "coord-action-get.xml", 0);
+
+        CoordinatorJobBean jobB = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
+        CoordinatorActionBean actionB1 = addRecordToCoordActionTable(jobB.getId(), 1, CoordinatorAction.Status.SUCCEEDED,
+                "coord-action-get.xml", 0);
+        CoordinatorActionBean actionB2 = addRecordToCoordActionTable(jobB.getId(), 2, CoordinatorAction.Status.SUCCEEDED,
+                "coord-action-get.xml", 0);
+
+        CoordinatorJobBean jobC = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
+        CoordinatorActionBean actionC1 = addRecordToCoordActionTable(jobC.getId(), 1, CoordinatorAction.Status.SUCCEEDED,
+                "coord-action-get.xml", 0);
+        CoordinatorActionBean actionC2 = addRecordToCoordActionTable(jobC.getId(), 2, CoordinatorAction.Status.SUCCEEDED,
+                "coord-action-get.xml", 0);
+
+        JPAService jpaService = Services.get().get(JPAService.class);
+        assertNotNull(jpaService);
+
+        List<String> deleteList = new ArrayList<String>();
+        deleteList.add(jobA.getId());
+        deleteList.add(jobB.getId());
+        deleteList.add(jobC.getId());
+        jpaService.execute(new CoordJobsDeleteJPAExecutor(deleteList));
+
+        assertCoordJobDeleted(jpaService, jobA, "A");
+        assertCoordActionDeleted(jpaService, actionA1, "A1");
+        assertCoordActionDeleted(jpaService, actionA2, "A2");
+
+        assertCoordJobDeleted(jpaService, jobB, "B");
+        assertCoordActionDeleted(jpaService, actionB1, "B1");
+        assertCoordActionDeleted(jpaService, actionB2, "B2");
+
+        assertCoordJobDeleted(jpaService, jobC, "C");
+        assertCoordActionDeleted(jpaService, actionC1, "C1");
+        assertCoordActionDeleted(jpaService, actionC2, "C2");
+    }
+
+    /**
+     * Runs the same delete with the skip-commit fault injection enabled and verifies that every job and action is
+     * still loadable afterwards.
+     */
+    public void testDeleteCoordsRollback() throws Exception {
+        CoordinatorJobBean jobA = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
+        CoordinatorActionBean actionA1 = addRecordToCoordActionTable(jobA.getId(), 1, CoordinatorAction.Status.SUCCEEDED,
+                "coord-action-get.xml", 0);
+        CoordinatorActionBean actionA2 = addRecordToCoordActionTable(jobA.getId(), 2, CoordinatorAction.Status.SUCCEEDED,
+                "coord-action-get.xml", 0);
+
+        CoordinatorJobBean jobB = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
+        CoordinatorActionBean actionB1 = addRecordToCoordActionTable(jobB.getId(), 1, CoordinatorAction.Status.SUCCEEDED,
+                "coord-action-get.xml", 0);
+        CoordinatorActionBean actionB2 = addRecordToCoordActionTable(jobB.getId(), 2, CoordinatorAction.Status.SUCCEEDED,
+                "coord-action-get.xml", 0);
+
+        CoordinatorJobBean jobC = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
+        CoordinatorActionBean actionC1 = addRecordToCoordActionTable(jobC.getId(), 1, CoordinatorAction.Status.SUCCEEDED,
+                "coord-action-get.xml", 0);
+        CoordinatorActionBean actionC2 = addRecordToCoordActionTable(jobC.getId(), 2, CoordinatorAction.Status.SUCCEEDED,
+                "coord-action-get.xml", 0);
+
+        JPAService jpaService = Services.get().get(JPAService.class);
+        assertNotNull(jpaService);
+
+        try {
+            // set fault injection to true, so the transaction is rolled back
+            setSystemProperty(FaultInjection.FAULT_INJECTION, "true");
+            setSystemProperty(SkipCommitFaultInjection.ACTION_FAILOVER_FAULT_INJECTION, "true");
+
+            List<String> deleteList = new ArrayList<String>();
+            deleteList.add(jobA.getId());
+            deleteList.add(jobB.getId());
+            deleteList.add(jobC.getId());
+
+            try {
+                jpaService.execute(new CoordJobsDeleteJPAExecutor(deleteList));
+                fail("Should have skipped commit for failover testing");
+            }
+            catch (RuntimeException re) {
+                assertEquals("Skipping Commit for Failover Testing", re.getMessage());
+            }
+        }
+        finally {
+            // Remove fault injection
+            FaultInjection.deactivate("org.apache.oozie.command.SkipCommitFaultInjection");
+        }
+
+        assertCoordJobExists(jpaService, jobA, "A");
+        assertCoordActionExists(jpaService, actionA1, "A1");
+        assertCoordActionExists(jpaService, actionA2, "A2");
+
+        assertCoordJobExists(jpaService, jobB, "B");
+        assertCoordActionExists(jpaService, actionB1, "B1");
+        assertCoordActionExists(jpaService, actionB2, "B2");
+
+        assertCoordJobExists(jpaService, jobC, "C");
+        assertCoordActionExists(jpaService, actionC1, "C1");
+        assertCoordActionExists(jpaService, actionC2, "C2");
+    }
+
+    /** Fails unless loading the given coordinator job raises a JPAExecutorException with error code E0604. */
+    private void assertCoordJobDeleted(JPAService jpaService, CoordinatorJobBean job, String label) {
+        try {
+            jpaService.execute(new CoordJobGetJPAExecutor(job.getId()));
+            fail("Coordinator Job " + label + " should have been deleted");
+        }
+        catch (JPAExecutorException je) {
+            assertEquals(ErrorCode.E0604, je.getErrorCode());
+        }
+    }
+
+    /** Fails unless loading the given coordinator action raises a JPAExecutorException with error code E0605. */
+    private void assertCoordActionDeleted(JPAService jpaService, CoordinatorActionBean action, String label) {
+        try {
+            jpaService.execute(new CoordActionGetJPAExecutor(action.getId()));
+            fail("Coordinator Action " + label + " should have been deleted");
+        }
+        catch (JPAExecutorException je) {
+            assertEquals(ErrorCode.E0605, je.getErrorCode());
+        }
+    }
+
+    /** Fails, reporting the underlying lookup error, if the given coordinator job can no longer be loaded. */
+    private void assertCoordJobExists(JPAService jpaService, CoordinatorJobBean job, String label) {
+        try {
+            jpaService.execute(new CoordJobGetJPAExecutor(job.getId()));
+        }
+        catch (JPAExecutorException je) {
+            fail("Coordinator Job " + label + " should not have been deleted: " + je.getMessage());
+        }
+    }
+
+    /** Fails, reporting the underlying lookup error, if the given coordinator action can no longer be loaded. */
+    private void assertCoordActionExists(JPAService jpaService, CoordinatorActionBean action, String label) {
+        try {
+            jpaService.execute(new CoordActionGetJPAExecutor(action.getId()));
+        }
+        catch (JPAExecutorException je) {
+            fail("Coordinator Action " + label + " should not have been deleted: " + je.getMessage());
+        }
+    }
+}
Modified: oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobsGetForPurgeJPAExecutor.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobsGetForPurgeJPAExecutor.java?rev=1465054&r1=1465053&r2=1465054&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobsGetForPurgeJPAExecutor.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobsGetForPurgeJPAExecutor.java Fri Apr 5 17:25:08 2013
@@ -19,6 +19,9 @@ package org.apache.oozie.executor.jpa;
import java.io.IOException;
import java.io.Reader;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Properties;
@@ -61,22 +64,51 @@ public class TestCoordJobsGetForPurgeJPA
super.tearDown();
}
- public void testCoordJobsGetForPurgeJPAExecutor() throws Exception {
- String jobId = "00000-" + new Date().getTime() + "-TestCoordJobsGetForPurgeJPAExecutor-C";
- insertJob(jobId, CoordinatorJob.Status.SUCCEEDED, DateUtils.parseDateOozieTZ("2011-01-01T01:00Z"));
- _testCoordJobsForPurge(10, 1);
+ /**
+ * Inserts two SUCCEEDED coordinator jobs with the same old date, gives the second one a bundle id, and
+ * checks that CoordJobsGetForPurgeJPAExecutor returns only the id of the job without a parent.
+ */
+ public void testCoordJobsGetForPurgeJPAExecutorWithParent() throws Exception {
+ JPAService jpaService = Services.get().get(JPAService.class);
+ assertNotNull(jpaService);
+
+ String jobId1 = "00001-" + new Date().getTime() + "-TestCoordJobsGetForPurgeJPAExecutor-C";
+ insertJob(jobId1, CoordinatorJob.Status.SUCCEEDED, DateUtils.parseDateOozieTZ("2011-01-01T01:00Z"));
+ String jobId2 = "00002-" + new Date().getTime() + "-TestCoordJobsGetForPurgeJPAExecutor-C";
+ CoordinatorJobBean job2 = insertJob(jobId2, CoordinatorJob.Status.SUCCEEDED,
+ DateUtils.parseDateOozieTZ("2011-01-01T01:00Z"));
+ job2.setBundleId("some_bundle_parent_id");
+ jpaService.execute(new CoordJobUpdateJPAExecutor(job2));
+
+ // NOTE(review): first argument is the "older than" threshold; 50 is presumably the result limit - confirm
+ CoordJobsGetForPurgeJPAExecutor executor = new CoordJobsGetForPurgeJPAExecutor(10, 50);
+ List<String> jobList = jpaService.execute(executor);
+ // job2 shouldn't be in the list because it has a parent
+ assertEquals(1, jobList.size());
+ assertEquals(jobId1, jobList.get(0));
}
- private void _testCoordJobsForPurge(int olderThan, int expected) throws Exception {
+ /**
+ * Inserts five SUCCEEDED coordinator jobs and fetches them in two executor calls, expecting 3 ids from the
+ * first call and the remaining 2 from the second, then checks that all five ids were returned.
+ */
+ public void testCoordJobsGetForPurgeJPAExecutorTooMany() throws Exception {
JPAService jpaService = Services.get().get(JPAService.class);
assertNotNull(jpaService);
- CoordJobsGetForPurgeJPAExecutor executor = new CoordJobsGetForPurgeJPAExecutor(olderThan, 50);
- List<CoordinatorJobBean> jobList = jpaService.execute(executor);
- assertEquals(expected, jobList.size());
+ String jobId1 = "00001-" + new Date().getTime() + "-TestCoordJobsGetForPurgeJPAExecutor-C";
+ insertJob(jobId1, CoordinatorJob.Status.SUCCEEDED, DateUtils.parseDateOozieTZ("2011-01-01T01:00Z"));
+ String jobId2 = "00002-" + new Date().getTime() + "-TestCoordJobsGetForPurgeJPAExecutor-C";
+ insertJob(jobId2, CoordinatorJob.Status.SUCCEEDED, DateUtils.parseDateOozieTZ("2011-01-01T01:00Z"));
+ String jobId3 = "00003-" + new Date().getTime() + "-TestCoordJobsGetForPurgeJPAExecutor-C";
+ insertJob(jobId3, CoordinatorJob.Status.SUCCEEDED, DateUtils.parseDateOozieTZ("2011-01-01T01:00Z"));
+ String jobId4 = "00004-" + new Date().getTime() + "-TestCoordJobsGetForPurgeJPAExecutor-C";
+ insertJob(jobId4, CoordinatorJob.Status.SUCCEEDED, DateUtils.parseDateOozieTZ("2011-01-01T01:00Z"));
+ String jobId5 = "00005-" + new Date().getTime() + "-TestCoordJobsGetForPurgeJPAExecutor-C";
+ insertJob(jobId5, CoordinatorJob.Status.SUCCEEDED, DateUtils.parseDateOozieTZ("2011-01-01T01:00Z"));
+
+ List<String> list = new ArrayList<String>();
+ // Get the first 3
+ list.addAll(jpaService.execute(new CoordJobsGetForPurgeJPAExecutor(1, 3)));
+ assertEquals(3, list.size());
+ // Get the next 3 (though there's only 2 more)
+ // NOTE(review): the three-argument form presumably adds an offset to the limit; argument order not visible here - confirm
+ list.addAll(jpaService.execute(new CoordJobsGetForPurgeJPAExecutor(1, 3, 3)));
+ assertEquals(5, list.size());
+ checkCoordinators(list, jobId1, jobId2, jobId3, jobId4, jobId5);
}
- private void insertJob(String jobId, CoordinatorJob.Status status, Date d) throws Exception {
+ private CoordinatorJobBean insertJob(String jobId, CoordinatorJob.Status status, Date d) throws Exception {
Path appPath = new Path(getFsTestCaseDir(), "coord");
String appXml = getCoordJobXml(appPath);
@@ -120,6 +152,7 @@ public class TestCoordJobsGetForPurgeJPA
fail("Unable to insert the test job record to table");
throw ce;
}
+ return coordJob;
}
private String getCoordJobXml(Path appPath) {
@@ -152,4 +185,14 @@ public class TestCoordJobsGetForPurgeJPA
return conf;
}
+
+ /**
+  * Asserts that the returned coordinator ids are exactly the expected ones, ignoring order.
+  * Both the list and the expected-id array are sorted in place before the element-wise comparison.
+  *
+  * @param coords coordinator job ids returned by the executor
+  * @param coordJobIDs the coordinator job ids that are expected
+  */
+ private void checkCoordinators(List<String> coords, String... coordJobIDs) {
+     assertEquals(coordJobIDs.length, coords.size());
+     Collections.sort(coords);
+     Arrays.sort(coordJobIDs);
+
+     int position = 0;
+     for (String expectedId : coordJobIDs) {
+         assertEquals(expectedId, coords.get(position++));
+     }
+ }
}
Added: oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobsGetFromParentIdJPAExecutor.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobsGetFromParentIdJPAExecutor.java?rev=1465054&view=auto
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobsGetFromParentIdJPAExecutor.java (added)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobsGetFromParentIdJPAExecutor.java Fri Apr 5 17:25:08 2013
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.executor.jpa;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.apache.oozie.BundleActionBean;
+import org.apache.oozie.BundleJobBean;
+import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.CoordinatorJobBean;
+import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.client.CoordinatorJob;
+import org.apache.oozie.client.Job;
+import org.apache.oozie.service.JPAService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.test.XDataTestCase;
+
+/**
+ * Tests for {@link CoordJobsGetFromParentIdJPAExecutor}: looking up the ids of the coordinator jobs that belong to
+ * a bundle job, in a single call and in two successive calls.
+ */
+public class TestCoordJobsGetFromParentIdJPAExecutor extends XDataTestCase {
+    Services services;
+    // Services excluded before init(); presumably so background services do not touch the test records - TODO confirm
+    private String[] excludedServices = { "org.apache.oozie.service.StatusTransitService",
+            "org.apache.oozie.service.PauseTransitService", "org.apache.oozie.service.PurgeService",
+            "org.apache.oozie.service.CoordMaterializeTriggerService", "org.apache.oozie.service.RecoveryService" };
+
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        services = new Services();
+        setClassesToBeExcluded(services.getConf(), excludedServices);
+        services.init();
+        cleanUpDBTables();
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        services.destroy();
+        super.tearDown();
+    }
+
+    /**
+     * Registers two coordinators under bundle A and one under bundle B, then checks that each bundle id yields
+     * exactly its own coordinator ids.
+     */
+    public void testGetBundleParent() throws Exception {
+        JPAService jpaService = Services.get().get(JPAService.class);
+        assertNotNull(jpaService);
+
+        BundleJobBean bundleJobA = addRecordToBundleJobTable(Job.Status.SUCCEEDED, false);
+        BundleJobBean bundleJobB = addRecordToBundleJobTable(Job.Status.SUCCEEDED, false);
+        CoordinatorJobBean coordJobA1 = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
+        CoordinatorJobBean coordJobA2 = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
+        // NOTE(review): the second child of bundle A gets its own app name, presumably because bundle actions
+        // are keyed by bundle id plus coordinator name - confirm
+        coordJobA2.setAppName("something_different");
+        jpaService.execute(new CoordJobUpdateJPAExecutor(coordJobA2));
+        CoordinatorJobBean coordJobB = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
+
+        // The returned beans are not needed; only the inserted rows matter.
+        addRecordToCoordActionTable(coordJobA1.getId(), 1, CoordinatorAction.Status.SUCCEEDED, "coord-action-get.xml", 0);
+        addRecordToCoordActionTable(coordJobA2.getId(), 1, CoordinatorAction.Status.SUCCEEDED, "coord-action-get.xml", 0);
+        addRecordToCoordActionTable(coordJobB.getId(), 1, CoordinatorAction.Status.SUCCEEDED, "coord-action-get.xml", 0);
+        addRecordToBundleActionTable(bundleJobA.getId(), coordJobA1.getId(), coordJobA1.getAppName(), 0,
+                Job.Status.SUCCEEDED);
+        addRecordToBundleActionTable(bundleJobA.getId(), coordJobA2.getId(), coordJobA2.getAppName(), 0,
+                Job.Status.SUCCEEDED);
+        addRecordToBundleActionTable(bundleJobB.getId(), coordJobB.getId(), coordJobB.getAppName(), 0,
+                Job.Status.SUCCEEDED);
+
+        List<String> children = new ArrayList<String>();
+        children.addAll(jpaService.execute(new CoordJobsGetFromParentIdJPAExecutor(bundleJobA.getId(), 10)));
+        checkChildren(children, coordJobA1.getId(), coordJobA2.getId());
+
+        children = new ArrayList<String>();
+        children.addAll(jpaService.execute(new CoordJobsGetFromParentIdJPAExecutor(bundleJobB.getId(), 10)));
+        checkChildren(children, coordJobB.getId());
+    }
+
+    /**
+     * Registers five coordinators under one bundle and fetches them in two executor calls, expecting 3 ids from
+     * the first call and the remaining 2 from the second.
+     */
+    public void testGetBundleParentTooMany() throws Exception {
+        JPAService jpaService = Services.get().get(JPAService.class);
+        assertNotNull(jpaService);
+
+        BundleJobBean bundleJob = addRecordToBundleJobTable(Job.Status.SUCCEEDED, false);
+
+        // Same creation order as before: all coordinator jobs, then their actions, then the bundle actions.
+        List<CoordinatorJobBean> coordJobs = new ArrayList<CoordinatorJobBean>();
+        for (int i = 1; i <= 5; i++) {
+            CoordinatorJobBean coordJob = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
+            coordJob.setAppName("coordJob" + i);
+            jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob));
+            coordJobs.add(coordJob);
+        }
+        for (CoordinatorJobBean coordJob : coordJobs) {
+            addRecordToCoordActionTable(coordJob.getId(), 1, CoordinatorAction.Status.SUCCEEDED,
+                    "coord-action-get.xml", 0);
+        }
+        String[] expectedIds = new String[coordJobs.size()];
+        for (int i = 0; i < expectedIds.length; i++) {
+            CoordinatorJobBean coordJob = coordJobs.get(i);
+            addRecordToBundleActionTable(bundleJob.getId(), coordJob.getId(), coordJob.getAppName(), 0,
+                    Job.Status.SUCCEEDED);
+            expectedIds[i] = coordJob.getId();
+        }
+
+        List<String> children = new ArrayList<String>();
+        // Get the first 3
+        children.addAll(jpaService.execute(new CoordJobsGetFromParentIdJPAExecutor(bundleJob.getId(), 3)));
+        assertEquals(3, children.size());
+        // Get the next 3 (though there's only 2 more)
+        children.addAll(jpaService.execute(new CoordJobsGetFromParentIdJPAExecutor(bundleJob.getId(), 3, 3)));
+        assertEquals(5, children.size());
+        checkChildren(children, expectedIds);
+    }
+
+    /**
+     * Asserts that the returned child ids are exactly the expected ones, ignoring order.
+     * Both the list and the expected-id array are sorted in place before the element-wise comparison.
+     *
+     * @param children coordinator job ids returned by the executor
+     * @param coordJobIDs the coordinator job ids that are expected
+     */
+    private void checkChildren(List<String> children, String... coordJobIDs) {
+        assertEquals(coordJobIDs.length, children.size());
+        Arrays.sort(coordJobIDs);
+        Collections.sort(children);
+
+        for (int i = 0; i < coordJobIDs.length; i++) {
+            assertEquals(coordJobIDs[i], children.get(i));
+        }
+    }
+}
Added: oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobsCountNotForPurgeFromParentIdJPAExecutor.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobsCountNotForPurgeFromParentIdJPAExecutor.java?rev=1465054&view=auto
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobsCountNotForPurgeFromParentIdJPAExecutor.java (added)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobsCountNotForPurgeFromParentIdJPAExecutor.java Fri Apr 5 17:25:08 2013
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.executor.jpa;
+
+import org.apache.oozie.CoordinatorJobBean;
+import org.apache.oozie.WorkflowJobBean;
+import org.apache.oozie.client.CoordinatorJob;
+import org.apache.oozie.client.WorkflowJob;
+import org.apache.oozie.command.TestPurgeXCommand;
+import org.apache.oozie.service.JPAService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.test.XDataTestCase;
+import org.apache.oozie.workflow.WorkflowInstance;
+
+/**
+ * Test for {@link WorkflowJobsCountNotForPurgeFromParentIdJPAExecutor}, which counts the workflow jobs of a parent
+ * coordinator that are not eligible for purging for a given number of days.
+ */
+public class TestWorkflowJobsCountNotForPurgeFromParentIdJPAExecutor extends XDataTestCase {
+    Services services;
+    private String[] excludedServices = { "org.apache.oozie.service.StatusTransitService",
+            "org.apache.oozie.service.PauseTransitService", "org.apache.oozie.service.PurgeService",
+            "org.apache.oozie.service.CoordMaterializeTriggerService", "org.apache.oozie.service.RecoveryService" };
+
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        services = new Services();
+        setClassesToBeExcluded(services.getConf(), excludedServices);
+        services.init();
+        cleanUpDBTables();
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        services.destroy();
+        super.tearDown();
+    }
+
+    /**
+     * Adds workflow jobs one status at a time under a single coordinator and checks the count after each addition.
+     * As asserted below, SUCCEEDED, FAILED and KILLED workflows are only counted once the day threshold is derived
+     * from their own end time, while PREP, RUNNING and SUSPENDED workflows are counted with either threshold.
+     */
+    public void testCount() throws Exception {
+        JPAService jpaService = Services.get().get(JPAService.class);
+        assertNotNull(jpaService);
+
+        String parentId = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false).getId();
+        assertEquals(0, countNotForPurge(jpaService, 1, parentId));
+
+        WorkflowJobBean succeeded = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED,
+                WorkflowInstance.Status.SUCCEEDED, parentId);
+        succeeded = TestPurgeXCommand.setEndTime(succeeded, "2009-12-01T01:00Z");
+        assertEquals(0, countNotForPurge(jpaService, 1, parentId));
+        assertEquals(1, countNotForPurge(jpaService,
+                TestPurgeXCommand.getNumDaysToNotBePurged(succeeded.getEndTime()), parentId));
+
+        WorkflowJobBean failed = addRecordToWfJobTable(WorkflowJob.Status.FAILED, WorkflowInstance.Status.FAILED,
+                parentId);
+        failed = TestPurgeXCommand.setEndTime(failed, "2009-11-01T01:00Z");
+        assertEquals(1, countNotForPurge(jpaService,
+                TestPurgeXCommand.getNumDaysToNotBePurged(succeeded.getEndTime()), parentId));
+        assertEquals(2, countNotForPurge(jpaService,
+                TestPurgeXCommand.getNumDaysToNotBePurged(failed.getEndTime()), parentId));
+
+        WorkflowJobBean killed = addRecordToWfJobTable(WorkflowJob.Status.KILLED, WorkflowInstance.Status.KILLED,
+                parentId);
+        killed = TestPurgeXCommand.setEndTime(killed, "2009-10-01T01:00Z");
+        assertEquals(2, countNotForPurge(jpaService,
+                TestPurgeXCommand.getNumDaysToNotBePurged(failed.getEndTime()), parentId));
+        assertEquals(3, countNotForPurge(jpaService,
+                TestPurgeXCommand.getNumDaysToNotBePurged(killed.getEndTime()), parentId));
+
+        WorkflowJobBean prep = addRecordToWfJobTable(WorkflowJob.Status.PREP, WorkflowInstance.Status.PREP, parentId);
+        prep = TestPurgeXCommand.setEndTime(prep, "2009-09-01T01:00Z");
+        assertEquals(4, countNotForPurge(jpaService,
+                TestPurgeXCommand.getNumDaysToNotBePurged(killed.getEndTime()), parentId));
+        assertEquals(4, countNotForPurge(jpaService,
+                TestPurgeXCommand.getNumDaysToNotBePurged(prep.getEndTime()), parentId));
+
+        WorkflowJobBean running = addRecordToWfJobTable(WorkflowJob.Status.RUNNING, WorkflowInstance.Status.RUNNING,
+                parentId);
+        running = TestPurgeXCommand.setEndTime(running, "2009-08-01T01:00Z");
+        assertEquals(5, countNotForPurge(jpaService,
+                TestPurgeXCommand.getNumDaysToNotBePurged(prep.getEndTime()), parentId));
+        assertEquals(5, countNotForPurge(jpaService,
+                TestPurgeXCommand.getNumDaysToNotBePurged(running.getEndTime()), parentId));
+
+        WorkflowJobBean suspended = addRecordToWfJobTable(WorkflowJob.Status.SUSPENDED,
+                WorkflowInstance.Status.SUSPENDED, parentId);
+        suspended = TestPurgeXCommand.setEndTime(suspended, "2009-07-01T01:00Z");
+        assertEquals(6, countNotForPurge(jpaService,
+                TestPurgeXCommand.getNumDaysToNotBePurged(running.getEndTime()), parentId));
+        assertEquals(6, countNotForPurge(jpaService,
+                TestPurgeXCommand.getNumDaysToNotBePurged(suspended.getEndTime()), parentId));
+    }
+
+    /** Runs the executor under test for the given day threshold and parent id and returns its count. */
+    private long countNotForPurge(JPAService jpaService, int days, String parentId) throws Exception {
+        return jpaService.execute(new WorkflowJobsCountNotForPurgeFromParentIdJPAExecutor(days, parentId));
+    }
+}
Added: oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobsDeleteJPAExecutor.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobsDeleteJPAExecutor.java?rev=1465054&view=auto
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobsDeleteJPAExecutor.java (added)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobsDeleteJPAExecutor.java Fri Apr 5 17:25:08 2013
@@ -0,0 +1,255 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.executor.jpa;
+
+import java.util.ArrayList;
+import java.util.List;
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.fail;
+
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.FaultInjection;
+import org.apache.oozie.WorkflowActionBean;
+import org.apache.oozie.WorkflowJobBean;
+import org.apache.oozie.client.WorkflowAction;
+import org.apache.oozie.client.WorkflowJob;
+import org.apache.oozie.command.SkipCommitFaultInjection;
+import org.apache.oozie.service.JPAService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.test.XDataTestCase;
+import org.apache.oozie.workflow.WorkflowInstance;
+
+public class TestWorkflowJobsDeleteJPAExecutor extends XDataTestCase {
+ Services services;
+ private String[] excludedServices = { "org.apache.oozie.service.StatusTransitService",
+ "org.apache.oozie.service.PauseTransitService", "org.apache.oozie.service.PurgeService",
+ "org.apache.oozie.service.CoordMaterializeTriggerService", "org.apache.oozie.service.RecoveryService" };
+
+ @Override
+ protected void setUp() throws Exception {
+ super.setUp();
+ services = new Services();
+ setClassesToBeExcluded(services.getConf(), excludedServices);
+ services.init();
+ cleanUpDBTables();
+ }
+
+ @Override
+ protected void tearDown() throws Exception {
+ services.destroy();
+ super.tearDown();
+ }
+
+ public void testDeleteWorkflows() throws Exception {
+ WorkflowJobBean jobA = this.addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
+ WorkflowActionBean actionA1 = this.addRecordToWfActionTable(jobA.getId(), "1", WorkflowAction.Status.OK);
+ WorkflowActionBean actionA2 = this.addRecordToWfActionTable(jobA.getId(), "2", WorkflowAction.Status.OK);
+
+ WorkflowJobBean jobB = this.addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
+ WorkflowActionBean actionB1 = this.addRecordToWfActionTable(jobB.getId(), "1", WorkflowAction.Status.OK);
+ WorkflowActionBean actionB2 = this.addRecordToWfActionTable(jobB.getId(), "2", WorkflowAction.Status.OK);
+
+ WorkflowJobBean jobC = this.addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
+ WorkflowActionBean actionC1 = this.addRecordToWfActionTable(jobC.getId(), "1", WorkflowAction.Status.OK);
+ WorkflowActionBean actionC2 = this.addRecordToWfActionTable(jobC.getId(), "2", WorkflowAction.Status.OK);
+
+ JPAService jpaService = Services.get().get(JPAService.class);
+ assertNotNull(jpaService);
+
+ List<String> deleteList = new ArrayList<String>();
+ deleteList.add(jobA.getId());
+ deleteList.add(jobB.getId());
+ deleteList.add(jobC.getId());
+ jpaService.execute(new WorkflowJobsDeleteJPAExecutor(deleteList));
+
+ try {
+ jpaService.execute(new WorkflowJobGetJPAExecutor(jobA.getId()));
+ fail("Workflow Job A should have been deleted");
+ }
+ catch (JPAExecutorException je) {
+ assertEquals(ErrorCode.E0604, je.getErrorCode());
+ }
+
+ try {
+ jpaService.execute(new WorkflowActionGetJPAExecutor(actionA1.getId()));
+ fail("Workflow Action A1 should have been deleted");
+ }
+ catch (JPAExecutorException je) {
+ assertEquals(ErrorCode.E0605, je.getErrorCode());
+ }
+
+ try {
+ jpaService.execute(new WorkflowActionGetJPAExecutor(actionA2.getId()));
+ fail("Workflow Action A2 should have been deleted");
+ }
+ catch (JPAExecutorException je) {
+ assertEquals(ErrorCode.E0605, je.getErrorCode());
+ }
+
+ try {
+ jpaService.execute(new WorkflowJobGetJPAExecutor(jobB.getId()));
+ fail("Workflow Job B should have been deleted");
+ }
+ catch (JPAExecutorException je) {
+ assertEquals(ErrorCode.E0604, je.getErrorCode());
+ }
+
+ try {
+ jpaService.execute(new WorkflowActionGetJPAExecutor(actionB1.getId()));
+ fail("Workflow Action B1 should have been deleted");
+ }
+ catch (JPAExecutorException je) {
+ assertEquals(ErrorCode.E0605, je.getErrorCode());
+ }
+
+ try {
+ jpaService.execute(new WorkflowActionGetJPAExecutor(actionB2.getId()));
+ fail("Workflow Action B2 should have been deleted");
+ }
+ catch (JPAExecutorException je) {
+ assertEquals(ErrorCode.E0605, je.getErrorCode());
+ }
+
+ try {
+ jpaService.execute(new WorkflowJobGetJPAExecutor(jobC.getId()));
+ fail("Workflow Job C should have been deleted");
+ }
+ catch (JPAExecutorException je) {
+ assertEquals(ErrorCode.E0604, je.getErrorCode());
+ }
+
+ try {
+ jpaService.execute(new WorkflowActionGetJPAExecutor(actionC1.getId()));
+ fail("Workflow Action C1 should have been deleted");
+ }
+ catch (JPAExecutorException je) {
+ assertEquals(ErrorCode.E0605, je.getErrorCode());
+ }
+
+ try {
+ jpaService.execute(new WorkflowActionGetJPAExecutor(actionC2.getId()));
+ fail("Workflow Action C2 should have been deleted");
+ }
+ catch (JPAExecutorException je) {
+ assertEquals(ErrorCode.E0605, je.getErrorCode());
+ }
+ }
+
+ public void testDeleteWorkflowsRollback() throws Exception{
+ WorkflowJobBean jobA = this.addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
+ WorkflowActionBean actionA1 = this.addRecordToWfActionTable(jobA.getId(), "1", WorkflowAction.Status.OK);
+ WorkflowActionBean actionA2 = this.addRecordToWfActionTable(jobA.getId(), "2", WorkflowAction.Status.OK);
+
+ WorkflowJobBean jobB = this.addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
+ WorkflowActionBean actionB1 = this.addRecordToWfActionTable(jobB.getId(), "1", WorkflowAction.Status.OK);
+ WorkflowActionBean actionB2 = this.addRecordToWfActionTable(jobB.getId(), "2", WorkflowAction.Status.OK);
+
+ WorkflowJobBean jobC = this.addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
+ WorkflowActionBean actionC1 = this.addRecordToWfActionTable(jobC.getId(), "1", WorkflowAction.Status.OK);
+ WorkflowActionBean actionC2 = this.addRecordToWfActionTable(jobC.getId(), "2", WorkflowAction.Status.OK);
+
+ JPAService jpaService = Services.get().get(JPAService.class);
+ assertNotNull(jpaService);
+
+ try {
+ // set fault injection to true, so transaction is roll backed
+ setSystemProperty(FaultInjection.FAULT_INJECTION, "true");
+ setSystemProperty(SkipCommitFaultInjection.ACTION_FAILOVER_FAULT_INJECTION, "true");
+
+ List<String> deleteList = new ArrayList<String>();
+ deleteList.add(jobA.getId());
+ deleteList.add(jobB.getId());
+ deleteList.add(jobC.getId());
+
+ try {
+ jpaService.execute(new WorkflowJobsDeleteJPAExecutor(deleteList));
+ fail("Should have skipped commit for failover testing");
+ }
+ catch (RuntimeException re) {
+ assertEquals("Skipping Commit for Failover Testing", re.getMessage());
+ }
+ }
+ finally {
+ // Remove fault injection
+ FaultInjection.deactivate("org.apache.oozie.command.SkipCommitFaultInjection");
+ }
+
+ try {
+ jpaService.execute(new WorkflowJobGetJPAExecutor(jobA.getId()));
+ }
+ catch (JPAExecutorException je) {
+ fail("Workflow Job A should not have been deleted");
+ }
+
+ try {
+ jpaService.execute(new WorkflowActionGetJPAExecutor(actionA1.getId()));
+ }
+ catch (JPAExecutorException je) {
+ fail("Workflow Action A1 should not have been deleted");
+ }
+
+ try {
+ jpaService.execute(new WorkflowActionGetJPAExecutor(actionA2.getId()));
+ }
+ catch (JPAExecutorException je) {
+ fail("Workflow Action A2 should not have been deleted");
+ }
+
+ try {
+ jpaService.execute(new WorkflowJobGetJPAExecutor(jobB.getId()));
+ }
+ catch (JPAExecutorException je) {
+ fail("Workflow Job B should not have been deleted");
+ }
+
+ try {
+ jpaService.execute(new WorkflowActionGetJPAExecutor(actionB1.getId()));
+ }
+ catch (JPAExecutorException je) {
+ fail("Workflow Action B1 should not have been deleted");
+ }
+
+ try {
+ jpaService.execute(new WorkflowActionGetJPAExecutor(actionB2.getId()));
+ }
+ catch (JPAExecutorException je) {
+ fail("Workflow Action B2 should not have been deleted");
+ }
+
+ try {
+ jpaService.execute(new WorkflowJobGetJPAExecutor(jobC.getId()));
+ }
+ catch (JPAExecutorException je) {
+ fail("Workflow Job C should not have been deleted");
+ }
+
+ try {
+ jpaService.execute(new WorkflowActionGetJPAExecutor(actionC1.getId()));
+ }
+ catch (JPAExecutorException je) {
+ fail("Workflow Action C1 should not have been deleted");
+ }
+
+ try {
+ jpaService.execute(new WorkflowActionGetJPAExecutor(actionC2.getId()));
+ }
+ catch (JPAExecutorException je) {
+ fail("Workflow Action C2 should not have been deleted");
+ }
+ }
+}
Modified: oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobsGetForPurgeJPAExecutor.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobsGetForPurgeJPAExecutor.java?rev=1465054&r1=1465053&r2=1465054&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobsGetForPurgeJPAExecutor.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobsGetForPurgeJPAExecutor.java Fri Apr 5 17:25:08 2013
@@ -18,6 +18,9 @@
package org.apache.oozie.executor.jpa;
import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
@@ -54,19 +57,40 @@ public class TestWorkflowJobsGetForPurge
super.tearDown();
}
- public void testWfJobsGetForPurge() throws Exception {
- addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
- addRecordToWfJobTable(WorkflowJob.Status.FAILED, WorkflowInstance.Status.FAILED);
- _testGetJobsForPurge(1, 20);
+    /**
+     * Checks that {@link WorkflowJobsGetForPurgeJPAExecutor} does not report a workflow whose parent ID
+     * has been set, while still reporting the workflows that have none.
+     */
+    public void testWfJobsGetForPurgeWithParent() throws Exception {
+        JPAService jpa = Services.get().get(JPAService.class);
+        assertNotNull(jpa);
+
+        WorkflowJobBean succeededJob = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
+        WorkflowJobBean failedJob = addRecordToWfJobTable(WorkflowJob.Status.FAILED, WorkflowInstance.Status.FAILED);
+
+        // The third workflow is made a child of the first one and the change is persisted.
+        WorkflowJobBean childJob = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
+        childJob.setParentId(succeededJob.getId());
+        jpa.execute(new WorkflowJobUpdateJPAExecutor(childJob));
+
+        List<String> purgeableIds = new ArrayList<String>(jpa.execute(new WorkflowJobsGetForPurgeJPAExecutor(1, 20)));
+        // The child has a parent, so only the other two workflows are expected
+        checkWorkflows(purgeableIds, succeededJob.getId(), failedJob.getId());
}
- private void _testGetJobsForPurge(long olderThanDays, int limit) throws Exception {
+    /**
+     * Checks that purgeable workflows can be fetched in two batches: with five workflows stored, a
+     * first query limited to 3 returns 3 IDs and a follow-up query returns the remaining 2.
+     */
+    public void testWfJobsGetForPurgeTooMany() throws Exception {
JPAService jpaService = Services.get().get(JPAService.class);
assertNotNull(jpaService);
- WorkflowJobsGetForPurgeJPAExecutor wfJobsForPurgeExe = new WorkflowJobsGetForPurgeJPAExecutor(olderThanDays, limit);
- List<WorkflowJobBean> list = jpaService.execute(wfJobsForPurgeExe);
- assertNotNull(list);
- assertEquals(2, list.size());
+
+        // Five workflows: four SUCCEEDED and one FAILED
+        String firstId = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED).getId();
+        String secondId = addRecordToWfJobTable(WorkflowJob.Status.FAILED, WorkflowInstance.Status.FAILED).getId();
+        String thirdId = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED).getId();
+        String fourthId = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED).getId();
+        String fifthId = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED).getId();
+
+        // First batch: limited to 3
+        List<String> purgeableIds = new ArrayList<String>(jpaService.execute(new WorkflowJobsGetForPurgeJPAExecutor(1, 3)));
+        assertEquals(3, purgeableIds.size());
+        // Second batch: 3 more are asked for, but only 2 are left.
+        // NOTE(review): the three-argument constructor presumably takes an offset and a limit - confirm the order.
+        purgeableIds.addAll(jpaService.execute(new WorkflowJobsGetForPurgeJPAExecutor(1, 3, 3)));
+        assertEquals(5, purgeableIds.size());
+        checkWorkflows(purgeableIds, firstId, secondId, thirdId, fourthId, fifthId);
}
@Override
@@ -100,4 +124,13 @@ public class TestWorkflowJobsGetForPurge
return wfBean;
}
+    /**
+     * Asserts that {@code wfs} contains exactly the given workflow job IDs, regardless of order.
+     * <p>
+     * The comparison is done on sorted copies, so neither the caller's list nor the caller's array is
+     * reordered by this check.
+     *
+     * @param wfs the workflow job IDs that were actually returned
+     * @param wfJobIDs the workflow job IDs that are expected
+     */
+    private void checkWorkflows(List<String> wfs, String... wfJobIDs) {
+        assertEquals(wfJobIDs.length, wfs.size());
+
+        // Sort copies: sorting in place would silently reorder the arguments the caller passed in.
+        String[] expected = wfJobIDs.clone();
+        Arrays.sort(expected);
+        List<String> actual = new ArrayList<String>(wfs);
+        Collections.sort(actual);
+
+        for (int i = 0; i < expected.length; i++) {
+            assertEquals(expected[i], actual.get(i));
+        }
+    }
}