You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by vi...@apache.org on 2013/09/08 04:47:41 UTC
svn commit: r1520829 [4/6] - in /oozie/trunk: ./
core/src/main/java/org/apache/oozie/
core/src/main/java/org/apache/oozie/command/
core/src/main/java/org/apache/oozie/command/bundle/
core/src/main/java/org/apache/oozie/command/coord/ core/src/main/java...
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/service/StatusTransitService.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/service/StatusTransitService.java?rev=1520829&r1=1520828&r2=1520829&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/service/StatusTransitService.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/service/StatusTransitService.java Sun Sep 8 02:47:39 2013
@@ -39,13 +39,15 @@ import org.apache.oozie.command.bundle.B
import org.apache.oozie.executor.jpa.BundleActionsGetByLastModifiedTimeJPAExecutor;
import org.apache.oozie.executor.jpa.BundleActionsGetStatusPendingJPAExecutor;
import org.apache.oozie.executor.jpa.BundleJobGetJPAExecutor;
-import org.apache.oozie.executor.jpa.BundleJobUpdateJPAExecutor;
+import org.apache.oozie.executor.jpa.BundleJobQueryExecutor;
+import org.apache.oozie.executor.jpa.BundleJobQueryExecutor.BundleJobQuery;
import org.apache.oozie.executor.jpa.BundleJobsGetRunningOrPendingJPAExecutor;
import org.apache.oozie.executor.jpa.CoordActionsGetByLastModifiedTimeJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobGetActionsStatusJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobGetPendingActionsCountJPAExecutor;
-import org.apache.oozie.executor.jpa.CoordJobUpdateJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
+import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
import org.apache.oozie.executor.jpa.CoordJobsGetPendingJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.util.DateUtils;
@@ -637,7 +639,7 @@ public class StatusTransitService implem
bundleJob.resetPending();
LOG.info("Bundle job [" + jobId + "] Pending set to FALSE");
}
- jpaService.execute(new BundleJobUpdateJPAExecutor(bundleJob));
+ BundleJobQueryExecutor.getInstance().executeUpdate(BundleJobQuery.UPDATE_BUNDLE_JOB_STATUS_PENDING_MODTIME, bundleJob);
}
private void updateCoordJob(boolean isPending, CoordinatorJobBean coordJob, Job.Status coordStatus)
@@ -662,7 +664,7 @@ public class StatusTransitService implem
if (coordJob.getStatus() != prevStatus || isPendingStateChanged) {
LOG.debug("Updating coord job " + coordJob.getId());
coordJob.setLastModifiedTime(new Date());
- jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob));
+ CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB_STATUS_PENDING_MODTIME, coordJob);
}
// update bundle action only when status changes in coord job
if (coordJob.getBundleId() != null) {
@@ -687,7 +689,7 @@ public class StatusTransitService implem
}
boolean hasChange = prevPending != coordJob.isPending();
if (saveToDB && hasChange) {
- jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob));
+ CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB_STATUS_PENDING_MODTIME, coordJob);
}
return hasChange;
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java?rev=1520829&r1=1520828&r2=1520829&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java Sun Sep 8 02:47:39 2013
@@ -40,16 +40,18 @@ import org.apache.oozie.client.event.Job
import org.apache.oozie.client.event.SLAEvent.EventStatus;
import org.apache.oozie.client.event.SLAEvent.SLAStatus;
import org.apache.oozie.client.rest.JsonBean;
+import org.apache.oozie.executor.jpa.BatchQueryExecutor;
import org.apache.oozie.executor.jpa.CoordActionGetForSLAJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.executor.jpa.SLARegistrationQueryExecutor.SLARegQuery;
+import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor;
import org.apache.oozie.executor.jpa.WorkflowActionGetForSLAJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobGetForSLAJPAExecutor;
-
-import org.apache.oozie.executor.jpa.sla.SLACalculationInsertUpdateJPAExecutor;
import org.apache.oozie.executor.jpa.sla.SLARegistrationGetOnRestartJPAExecutor;
import org.apache.oozie.executor.jpa.sla.SLASummaryGetJPAExecutor;
import org.apache.oozie.executor.jpa.sla.SLASummaryGetRecordsOnRestartJPAExecutor;
-import org.apache.oozie.executor.jpa.sla.SLASummaryUpdateForSLAStatusActualTimesJPAExecutor;
+import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor.SLASummaryQuery;
+import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry;
import org.apache.oozie.service.EventHandlerService;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.ServiceException;
@@ -112,7 +114,9 @@ public class SLACalculatorMemory impleme
break;
}
if (isJobModified) {
- jpaService.execute(new SLASummaryUpdateForSLAStatusActualTimesJPAExecutor(summaryBean));
+ summaryBean.setLastModifiedTime(new Date());
+ SLASummaryQueryExecutor.getInstance().executeUpdate(
+ SLASummaryQuery.UPDATE_SLA_SUMMARY_FOR_STATUS_ACTUAL_TIMES, summaryBean);
}
}
catch (Exception e) {
@@ -352,7 +356,9 @@ public class SLACalculatorMemory impleme
slaSummaryBean.setActualStart(slaCalc.getActualStart());
slaSummaryBean.setActualDuration(slaCalc.getActualDuration());
slaSummaryBean.setJobStatus(slaCalc.getJobStatus());
- jpaService.execute(new SLASummaryUpdateForSLAStatusActualTimesJPAExecutor(slaSummaryBean));
+ slaSummaryBean.setLastModifiedTime(new Date());
+ SLASummaryQueryExecutor.getInstance().executeUpdate(
+ SLASummaryQuery.UPDATE_SLA_SUMMARY_FOR_STATUS_ACTUAL_TIMES, slaSummaryBean);
if (eventProc == 7) {
historySet.add(jobId);
slaMap.remove(jobId);
@@ -397,7 +403,7 @@ public class SLACalculatorMemory impleme
List<JsonBean> insertList = new ArrayList<JsonBean>();
insertList.add(reg);
insertList.add(new SLASummaryBean(slaCalc));
- jpaService.execute(new SLACalculationInsertUpdateJPAExecutor(insertList, null));
+ BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, null, null);
LOG.trace("SLA Registration Event - Job:" + jobId);
return true;
}
@@ -442,10 +448,11 @@ public class SLACalculatorMemory impleme
slaCalc.setSLAStatus(SLAStatus.NOT_STARTED);
slaCalc.setJobStatus(getJobStatus(reg.getAppType()));
slaMap.put(jobId, slaCalc);
- List<JsonBean> updateList = new ArrayList<JsonBean>();
- updateList.add(reg);
- updateList.add(new SLASummaryBean(slaCalc));
- jpaService.execute(new SLACalculationInsertUpdateJPAExecutor(null, updateList));
+ List<UpdateEntry> updateList = new ArrayList<UpdateEntry>();
+ updateList.add(new UpdateEntry<SLARegQuery>(SLARegQuery.UPDATE_SLA_REG_ALL, reg));
+ updateList.add(new UpdateEntry<SLASummaryQuery>(SLASummaryQuery.UPDATE_SLA_SUMMARY_ALL,
+ new SLASummaryBean(slaCalc)));
+ BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(null, updateList, null);
LOG.trace("SLA Registration Event - Job:" + jobId);
return true;
}
@@ -524,7 +531,9 @@ public class SLACalculatorMemory impleme
}
if (hasSla) {
- jpaService.execute(new SLASummaryUpdateForSLAStatusActualTimesJPAExecutor(slaInfo));
+ slaInfo.setLastModifiedTime(new Date());
+ SLASummaryQueryExecutor.getInstance().executeUpdate(
+ SLASummaryQuery.UPDATE_SLA_SUMMARY_FOR_STATUS_ACTUAL_TIMES, slaInfo);
}
return hasSla;
}
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLARegistrationBean.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLARegistrationBean.java?rev=1520829&r1=1520828&r2=1520829&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLARegistrationBean.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLARegistrationBean.java Sun Sep 8 02:47:39 2013
@@ -17,6 +17,7 @@
*/
package org.apache.oozie.sla;
+import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
@@ -46,6 +47,8 @@ import org.json.simple.JSONObject;
@Table(name = "SLA_REGISTRATION")
@NamedQueries({
+ @NamedQuery(name = "UPDATE_SLA_REG_ALL", query = "update SLARegistrationBean w set w.jobId = :jobId, w.nominalTimeTS = :nominalTime, w.expectedStartTS = :expectedStartTime, w.expectedEndTS = :expectedEndTime, w.expectedDuration = :expectedDuration, w.slaConfig = :slaConfig, w.notificationMsg = :notificationMsg, w.upstreamApps = :upstreamApps, w.appType = :appType, w.appName = :appName, w.user = :user, w.parentId = :parentId, w.jobData = :jobData where w.jobId = :jobId"),
+
@NamedQuery(name = "GET_SLA_REG_ON_RESTART", query = "select w.notificationMsg, w.upstreamApps, w.slaConfig, w.jobData from SLARegistrationBean w where w.jobId = :id"),
@NamedQuery(name = "GET_SLA_REG_ALL", query = "select OBJECT(w) from SLARegistrationBean w where w.jobId = :id") })
@@ -160,6 +163,10 @@ public class SLARegistrationBean impleme
return DateUtils.toDate(nominalTimeTS);
}
+ public Timestamp getNominalTimestamp() {
+ return this.nominalTimeTS;
+ }
+
public void setNominalTime(Date nominalTime) {
this.nominalTimeTS = DateUtils.convertDateToTimestamp(nominalTime);
}
@@ -168,6 +175,10 @@ public class SLARegistrationBean impleme
return DateUtils.toDate(expectedStartTS);
}
+ public Timestamp getExpectedStartTimestamp() {
+ return this.expectedStartTS;
+ }
+
public void setExpectedStart(Date expectedStart) {
this.expectedStartTS = DateUtils.convertDateToTimestamp(expectedStart);
}
@@ -176,6 +187,10 @@ public class SLARegistrationBean impleme
return DateUtils.toDate(expectedEndTS);
}
+ public Timestamp getExpectedEndTimestamp() {
+ return this.expectedEndTS;
+ }
+
public void setExpectedEnd(Date expectedEnd) {
this.expectedEndTS = DateUtils.convertDateToTimestamp(expectedEnd);
}
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLASummaryBean.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLASummaryBean.java?rev=1520829&r1=1520828&r2=1520829&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLASummaryBean.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLASummaryBean.java Sun Sep 8 02:47:39 2013
@@ -47,6 +47,8 @@ import org.json.simple.JSONObject;
@NamedQuery(name = "UPDATE_SLA_SUMMARY_FOR_STATUS_ACTUAL_TIMES", query = "update SLASummaryBean w set w.slaStatus = :slaStatus, w.eventStatus = :eventStatus, w.eventProcessed = :eventProcessed, w.jobStatus = :jobStatus, w.lastModifiedTS = :lastModifiedTS, w.actualStartTS = :actualStartTS, w.actualEndTS = :actualEndTS, w.actualDuration = :actualDuration where w.jobId = :jobId"),
+ @NamedQuery(name = "UPDATE_SLA_SUMMARY_ALL", query = "update SLASummaryBean w set w.jobId = :jobId, w.appName = :appName, w.appType = :appType, w.nominalTimeTS = :nominalTime, w.expectedStartTS = :expectedStartTime, w.expectedEndTS = :expectedEndTime, w.expectedDuration = :expectedDuration, w.jobStatus = :jobStatus, w.slaStatus = :slaStatus, w.eventStatus = :eventStatus, w.lastModifiedTS = :lastModTime, w.user = :user, w.parentId = :parentId, w.eventProcessed = :eventProcessed, w.actualDuration = :actualDuration, w.actualEndTS = :actualEndTS, w.actualStartTS = :actualStartTS where w.jobId = :jobId"),
+
@NamedQuery(name = "GET_SLA_SUMMARY", query = "select OBJECT(w) from SLASummaryBean w where w.jobId = :id"),
@NamedQuery(name = "GET_SLA_SUMMARY_RECORDS_RESTART", query = "select OBJECT(w) from SLASummaryBean w where w.eventProcessed <= 7 AND w.lastModifiedTS >= :lastModifiedTime") })
@@ -174,6 +176,10 @@ public class SLASummaryBean implements J
return DateUtils.toDate(nominalTimeTS);
}
+ public Timestamp getNominalTimestamp() {
+ return this.nominalTimeTS;
+ }
+
public void setNominalTime(Date nominalTime) {
this.nominalTimeTS = DateUtils.convertDateToTimestamp(nominalTime);
}
@@ -183,6 +189,10 @@ public class SLASummaryBean implements J
return DateUtils.toDate(expectedStartTS);
}
+ public Timestamp getExpectedStartTimestamp() {
+ return this.expectedStartTS;
+ }
+
public void setExpectedStart(Date expectedStart) {
this.expectedStartTS = DateUtils.convertDateToTimestamp(expectedStart);
}
@@ -191,6 +201,9 @@ public class SLASummaryBean implements J
return DateUtils.toDate(expectedEndTS);
}
+ public Timestamp getExpectedEndTimestamp() {
+ return this.expectedEndTS;
+ }
public void setExpectedEnd(Date expectedEnd) {
this.expectedEndTS = DateUtils.convertDateToTimestamp(expectedEnd);
}
@@ -207,6 +220,10 @@ public class SLASummaryBean implements J
return DateUtils.toDate(actualStartTS);
}
+ public Timestamp getActualStartTimestamp() {
+ return this.actualStartTS;
+ }
+
public void setActualStart(Date actualStart) {
this.actualStartTS = DateUtils.convertDateToTimestamp(actualStart);
}
@@ -215,6 +232,10 @@ public class SLASummaryBean implements J
return DateUtils.toDate(actualEndTS);
}
+ public Timestamp getActualEndTimestamp() {
+ return this.actualEndTS;
+ }
+
public void setActualEnd(Date actualEnd) {
this.actualEndTS = DateUtils.convertDateToTimestamp(actualEnd);
}
@@ -295,6 +316,10 @@ public class SLASummaryBean implements J
return DateUtils.toDate(lastModifiedTS);
}
+ public Timestamp getLastModifiedTimestamp() {
+ return this.lastModifiedTS;
+ }
+
public void setLastModifiedTime(Date lastModified) {
this.lastModifiedTS = DateUtils.convertDateToTimestamp(lastModified);
}
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/store/CoordinatorStore.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/store/CoordinatorStore.java?rev=1520829&r1=1520828&r2=1520829&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/store/CoordinatorStore.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/store/CoordinatorStore.java Sun Sep 8 02:47:39 2013
@@ -34,6 +34,10 @@ import org.apache.oozie.CoordinatorJobIn
import org.apache.oozie.ErrorCode;
import org.apache.oozie.client.Job.Status;
import org.apache.oozie.client.CoordinatorJob.Timeunit;
+import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
+import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
+import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
+import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.service.InstrumentationService;
import org.apache.oozie.service.Services;
import org.apache.oozie.util.DateUtils;
@@ -351,14 +355,12 @@ public class CoordinatorStore extends St
* @param action Action Bean
* @throws StoreException if action doesn't exist
*/
- public void updateCoordinatorAction(final CoordinatorActionBean action) throws StoreException {
+ public void updateCoordinatorAction(final CoordinatorActionBean action) throws StoreException, JPAExecutorException {
ParamChecker.notNull(action, "CoordinatorActionBean");
doOperation("updateCoordinatorAction", new Callable<Void>() {
- public Void call() throws StoreException {
- Query q = entityManager.createNamedQuery("UPDATE_COORD_ACTION");
- q.setParameter("id", action.getId());
- setActionQueryParameters(action, q);
- q.executeUpdate();
+ public Void call() throws StoreException, JPAExecutorException {
+ CoordActionQueryExecutor.getInstance().executeUpdate(
+ CoordActionQueryExecutor.CoordActionQuery.UPDATE_COORD_ACTION, action);
return null;
}
});
@@ -370,17 +372,12 @@ public class CoordinatorStore extends St
* @param action Action Bean
* @throws StoreException if action doesn't exist
*/
- public void updateCoordActionMin(final CoordinatorActionBean action) throws StoreException {
+ public void updateCoordActionMin(final CoordinatorActionBean action) throws StoreException, JPAExecutorException {
ParamChecker.notNull(action, "CoordinatorActionBean");
doOperation("updateCoordinatorAction", new Callable<Void>() {
- public Void call() throws StoreException {
- Query q = entityManager.createNamedQuery("UPDATE_COORD_ACTION_MIN");
- q.setParameter("id", action.getId());
- q.setParameter("missingDependencies", action.getMissingDependencies());
- q.setParameter("lastModifiedTime", new Date());
- q.setParameter("status", action.getStatus().toString());
- q.setParameter("actionXml", action.getActionXml());
- q.executeUpdate();
+ public Void call() throws StoreException, JPAExecutorException {
+ CoordActionQueryExecutor.getInstance().executeUpdate(
+ CoordActionQueryExecutor.CoordActionQuery.UPDATE_COORD_ACTION_FOR_INPUTCHECK, action);
return null;
}
});
@@ -395,11 +392,8 @@ public class CoordinatorStore extends St
public void updateCoordinatorJob(final CoordinatorJobBean job) throws StoreException {
ParamChecker.notNull(job, "CoordinatorJobBean");
doOperation("updateJob", new Callable<Void>() {
- public Void call() throws StoreException {
- Query q = entityManager.createNamedQuery("UPDATE_COORD_JOB");
- q.setParameter("id", job.getId());
- setJobQueryParameters(job, q);
- q.executeUpdate();
+ public Void call() throws StoreException, JPAExecutorException {
+ CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, job);
return null;
}
});
@@ -408,12 +402,8 @@ public class CoordinatorStore extends St
public void updateCoordinatorJobStatus(final CoordinatorJobBean job) throws StoreException {
ParamChecker.notNull(job, "CoordinatorJobBean");
doOperation("updateJobStatus", new Callable<Void>() {
- public Void call() throws StoreException {
- Query q = entityManager.createNamedQuery("UPDATE_COORD_JOB_STATUS");
- q.setParameter("id", job.getId());
- q.setParameter("status", job.getStatus().toString());
- q.setParameter("lastModifiedTime", new Date());
- q.executeUpdate();
+ public Void call() throws StoreException, JPAExecutorException {
+ CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB_STATUS_MODTIME, job);
return null;
}
});
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/store/WorkflowStore.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/store/WorkflowStore.java?rev=1520829&r1=1520828&r2=1520829&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/store/WorkflowStore.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/store/WorkflowStore.java Sun Sep 8 02:47:39 2013
@@ -35,6 +35,9 @@ import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.WorkflowsInfo;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.WorkflowJob.Status;
+import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor;
+import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
import org.apache.oozie.service.InstrumentationService;
import org.apache.oozie.service.SchemaService;
import org.apache.oozie.service.Services;
@@ -198,11 +201,9 @@ public class WorkflowStore extends Store
public void updateWorkflow(final WorkflowJobBean wfBean) throws StoreException {
ParamChecker.notNull(wfBean, "WorkflowJobBean");
doOperation("updateWorkflow", new Callable<Void>() {
- public Void call() throws SQLException, StoreException, WorkflowException {
- Query q = entityManager.createNamedQuery("UPDATE_WORKFLOW");
- q.setParameter("id", wfBean.getId());
- setWFQueryParameters(wfBean, q);
- q.executeUpdate();
+ public Void call() throws SQLException, StoreException, WorkflowException, JPAExecutorException {
+ WorkflowJobQueryExecutor.getInstance().executeUpdate(
+ WorkflowJobQueryExecutor.WorkflowJobQuery.UPDATE_WORKFLOW, wfBean);
return null;
}
});
@@ -274,11 +275,9 @@ public class WorkflowStore extends Store
public void updateAction(final WorkflowActionBean action) throws StoreException {
ParamChecker.notNull(action, "WorkflowActionBean");
doOperation("updateAction", new Callable<Void>() {
- public Void call() throws SQLException, StoreException, WorkflowException {
- Query q = entityManager.createNamedQuery("UPDATE_ACTION");
- q.setParameter("id", action.getId());
- setActionQueryParameters(action, q);
- q.executeUpdate();
+ public Void call() throws SQLException, StoreException, WorkflowException, JPAExecutorException {
+ WorkflowActionQueryExecutor.getInstance().executeUpdate(
+ WorkflowActionQueryExecutor.WorkflowActionQuery.UPDATE_ACTION, action);
return null;
}
});
@@ -935,7 +934,7 @@ public class WorkflowStore extends Store
action.setExecutionPath(a.getExecutionPath());
action.setLastCheckTime(a.getLastCheckTime());
action.setLogToken(a.getLogToken());
- if (a.getPending() == true) {
+ if (a.isPending() == true) {
action.setPending();
}
action.setPendingAge(a.getPendingAge());
Modified: oozie/trunk/core/src/test/java/org/apache/oozie/command/TestPurgeXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/command/TestPurgeXCommand.java?rev=1520829&r1=1520828&r2=1520829&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/command/TestPurgeXCommand.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/command/TestPurgeXCommand.java Sun Sep 8 02:47:39 2013
@@ -18,9 +18,6 @@
package org.apache.oozie.command;
import java.util.Date;
-import static junit.framework.Assert.assertEquals;
-import static junit.framework.Assert.assertNotNull;
-import static junit.framework.Assert.fail;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@@ -43,12 +40,14 @@ import org.apache.oozie.executor.jpa.Bun
import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobInsertJPAExecutor;
-import org.apache.oozie.executor.jpa.CoordJobUpdateJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
+import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
+import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.executor.jpa.WorkflowActionGetJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobInsertJPAExecutor;
-import org.apache.oozie.executor.jpa.WorkflowJobUpdateJPAExecutor;
+import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.LiteWorkflowStoreService;
import org.apache.oozie.service.Services;
@@ -1393,16 +1392,16 @@ public class TestPurgeXCommand extends X
CoordinatorJobBean coordJob1 = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
CoordinatorJobBean coordJob2 = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
coordJob2.setAppName("coord2");
- jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob2));
+ CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, coordJob2);
CoordinatorJobBean coordJob3 = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
coordJob3.setAppName("coord3");
- jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob3));
+ CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, coordJob3);
CoordinatorJobBean coordJob4 = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
coordJob4.setAppName("coord4");
- jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob4));
+ CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, coordJob4);
CoordinatorJobBean coordJob5 = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
coordJob5.setAppName("coord5");
- jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob5));
+ CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, coordJob5);
WorkflowJobBean wfJob1 = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
WorkflowJobBean wfJob2 = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
WorkflowJobBean wfJob3 = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
@@ -1803,16 +1802,16 @@ public class TestPurgeXCommand extends X
CoordinatorJobBean coordJob1 = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
CoordinatorJobBean coordJob2 = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
coordJob2.setAppName("coord2");
- jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob2));
+ CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, coordJob2);
CoordinatorJobBean coordJob3 = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
coordJob3.setAppName("coord3");
- jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob3));
+ CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, coordJob3);
CoordinatorJobBean coordJob4 = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
coordJob4.setAppName("coord4");
- jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob4));
+ CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, coordJob4);
CoordinatorJobBean coordJob5 = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
coordJob5.setAppName("coord5");
- jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob5));
+ CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, coordJob5);
WorkflowJobBean wfJob1 = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
WorkflowJobBean wfJob2 = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
WorkflowJobBean wfJob3 = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
@@ -3822,7 +3821,7 @@ public class TestPurgeXCommand extends X
try {
JPAService jpaService = Services.get().get(JPAService.class);
assertNotNull(jpaService);
- jpaService.execute(new CoordJobUpdateJPAExecutor(job));
+ CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB_LAST_MODIFIED_TIME, job);
}
catch (JPAExecutorException je) {
je.printStackTrace();
@@ -3837,7 +3836,7 @@ public class TestPurgeXCommand extends X
try {
JPAService jpaService = Services.get().get(JPAService.class);
assertNotNull(jpaService);
- jpaService.execute(new WorkflowJobUpdateJPAExecutor(job));
+ WorkflowJobQueryExecutor.getInstance().executeUpdate(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MOD_END, job);
}
catch (JPAExecutorException je) {
je.printStackTrace();
Modified: oozie/trunk/core/src/test/java/org/apache/oozie/command/bundle/TestBundleChangeXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/command/bundle/TestBundleChangeXCommand.java?rev=1520829&r1=1520828&r2=1520829&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/command/bundle/TestBundleChangeXCommand.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/command/bundle/TestBundleChangeXCommand.java Sun Sep 8 02:47:39 2013
@@ -27,7 +27,8 @@ import org.apache.oozie.client.Job;
import org.apache.oozie.executor.jpa.BundleActionInsertJPAExecutor;
import org.apache.oozie.executor.jpa.BundleJobGetJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
-import org.apache.oozie.executor.jpa.CoordJobUpdateJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
+import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;
import org.apache.oozie.test.XDataTestCase;
@@ -84,7 +85,7 @@ public class TestBundleChangeXCommand ex
coordJob.setBundleId(bundleJob.getId());
final JPAService jpaService = Services.get().get(JPAService.class);
assertNotNull(jpaService);
- jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob));
+ CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB_BUNDLEID, coordJob);
BundleActionBean bundleAction = new BundleActionBean();
bundleAction.setBundleActionId("11111");
Modified: oozie/trunk/core/src/test/java/org/apache/oozie/command/bundle/TestBundleStartXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/command/bundle/TestBundleStartXCommand.java?rev=1520829&r1=1520828&r2=1520829&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/command/bundle/TestBundleStartXCommand.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/command/bundle/TestBundleStartXCommand.java Sun Sep 8 02:47:39 2013
@@ -267,6 +267,7 @@ public class TestBundleStartXCommand ext
assertEquals(Job.Status.FAILED, actions.get(0).getStatus());
Runnable runnable = new StatusTransitRunnable();
runnable.run();
+ sleep(2000);
job = jpaService.execute(bundleJobGetExecutor);
assertEquals(job.getStatus(), Job.Status.KILLED);
}
Modified: oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionInputCheckXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionInputCheckXCommand.java?rev=1520829&r1=1520828&r2=1520829&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionInputCheckXCommand.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionInputCheckXCommand.java Sun Sep 8 02:47:39 2013
@@ -36,8 +36,8 @@ import org.apache.oozie.coord.CoordELFun
import org.apache.oozie.executor.jpa.CoordActionGetForInputCheckJPAExecutor;
import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor;
import org.apache.oozie.executor.jpa.CoordActionInsertJPAExecutor;
-import org.apache.oozie.executor.jpa.CoordActionUpdateForInputCheckJPAExecutor;
-import org.apache.oozie.executor.jpa.CoordActionUpdatePushInputCheckJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
+import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
import org.apache.oozie.executor.jpa.CoordJobInsertJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.service.CallableQueueService;
@@ -235,7 +235,9 @@ public class TestCoordActionInputCheckXC
+ "\">");
action.setActionXml(actionXML);
action.setCreatedTime(DateUtils.parseDateOozieTZ(actionCreationTime));
- jpaService.execute(new CoordActionUpdateForInputCheckJPAExecutor(action));
+ CoordActionQueryExecutor.getInstance().executeUpdate(
+ CoordActionQueryExecutor.CoordActionQuery.UPDATE_COORD_ACTION_FOR_INPUTCHECK,
+ action);
action = jpaService.execute(new CoordActionGetForInputCheckJPAExecutor(job.getId() + "@1"));
assertTrue(action.getActionXml().contains("action-actual-time=\"2009-02-15T01:00")) ;
@@ -280,7 +282,9 @@ public class TestCoordActionInputCheckXC
.execute(new CoordActionGetForInputCheckJPAExecutor(job.getId() + "@1"));
final String pushMissingDependency = "file://" + getTestCaseDir() + "/2009/02/05";
action.setPushMissingDependencies(pushMissingDependency);
- jpaService.execute(new CoordActionUpdatePushInputCheckJPAExecutor(action));
+ CoordActionQueryExecutor.getInstance().executeUpdate(
+ CoordActionQueryExecutor.CoordActionQuery.UPDATE_COORD_ACTION_FOR_PUSH_INPUTCHECK,
+ action);
// Update action creation time
String actionXML = action.getActionXml();
@@ -289,7 +293,9 @@ public class TestCoordActionInputCheckXC
+ "\">");
action.setActionXml(actionXML);
action.setCreatedTime(DateUtils.parseDateOozieTZ(actionCreationTime));
- jpaService.execute(new CoordActionUpdateForInputCheckJPAExecutor(action));
+ CoordActionQueryExecutor.getInstance().executeUpdate(
+ CoordActionQueryExecutor.CoordActionQuery.UPDATE_COORD_ACTION_FOR_INPUTCHECK,
+ action);
action = jpaService.execute(new CoordActionGetForInputCheckJPAExecutor(job.getId() + "@1"));
assertTrue(action.getActionXml().contains("action-actual-time=\"2009-02-15T01:00")) ;
@@ -362,7 +368,9 @@ public class TestCoordActionInputCheckXC
+ "\">");
action.setActionXml(actionXML);
action.setCreatedTime(DateUtils.parseDateOozieTZ(actionCreationTime));
- jpaService.execute(new CoordActionUpdateForInputCheckJPAExecutor(action));
+ CoordActionQueryExecutor.getInstance().executeUpdate(
+ CoordActionQueryExecutor.CoordActionQuery.UPDATE_COORD_ACTION_FOR_INPUTCHECK,
+ action);
action = jpaService.execute(new CoordActionGetForInputCheckJPAExecutor(job.getId() + "@1"));
assertTrue(action.getActionXml().contains("action-actual-time=\"2009-02-15T01:00")) ;
@@ -406,7 +414,9 @@ public class TestCoordActionInputCheckXC
.execute(new CoordActionGetForInputCheckJPAExecutor(job.getId() + "@1"));
final String pushMissingDependency = "file://" + getTestCaseDir() + "/2009/02/05";
action.setPushMissingDependencies(pushMissingDependency);
- jpaService.execute(new CoordActionUpdatePushInputCheckJPAExecutor(action));
+ CoordActionQueryExecutor.getInstance().executeUpdate(
+ CoordActionQueryExecutor.CoordActionQuery.UPDATE_COORD_ACTION_FOR_PUSH_INPUTCHECK,
+ action);
// Update action creation time
String actionXML = action.getActionXml();
@@ -415,7 +425,9 @@ public class TestCoordActionInputCheckXC
+ "\">");
action.setActionXml(actionXML);
action.setCreatedTime(DateUtils.parseDateOozieTZ(actionCreationTime));
- jpaService.execute(new CoordActionUpdateForInputCheckJPAExecutor(action));
+ CoordActionQueryExecutor.getInstance().executeUpdate(
+ CoordActionQueryExecutor.CoordActionQuery.UPDATE_COORD_ACTION_FOR_INPUTCHECK,
+ action);
action = jpaService.execute(new CoordActionGetForInputCheckJPAExecutor(job.getId() + "@1"));
assertTrue(action.getActionXml().contains("action-actual-time=\"2009-02-15T01:00")) ;
@@ -700,7 +712,7 @@ public class TestCoordActionInputCheckXC
action.setStatus(CoordinatorAction.Status.WAITING);
try {
jpaService = Services.get().get(JPAService.class);
- jpaService.execute(new CoordActionUpdateForInputCheckJPAExecutor(action));
+ CoordActionQueryExecutor.getInstance().executeUpdate(CoordActionQuery.UPDATE_COORD_ACTION_FOR_INPUTCHECK, action);
}
catch (JPAExecutorException se) {
fail("Action ID " + coordJob.getId() + "@1" + " was not stored properly in db");
Modified: oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionsKillXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionsKillXCommand.java?rev=1520829&r1=1520828&r2=1520829&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionsKillXCommand.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionsKillXCommand.java Sun Sep 8 02:47:39 2013
@@ -25,7 +25,8 @@ import org.apache.oozie.client.Coordinat
import org.apache.oozie.client.CoordinatorJob;
import org.apache.oozie.client.WorkflowJob;
import org.apache.oozie.executor.jpa.CoordActionGetForCheckJPAExecutor;
-import org.apache.oozie.executor.jpa.CoordActionUpdateJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
+import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobGetForSLAJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobInsertJPAExecutor;
@@ -143,7 +144,7 @@ public class TestCoordActionsKillXComman
"coord-action-get.xml", 0);
action2.setNominalTime(DateUtils.parseDateOozieTZ("2009-12-15T02:00Z"));
action2.setExternalId(null);
- jpaService.execute(new CoordActionUpdateJPAExecutor(action2));
+ CoordActionQueryExecutor.getInstance().executeUpdate(CoordActionQuery.UPDATE_COORD_ACTION, action2);
WorkflowJobBean wf = new WorkflowJobBean();
WorkflowApp app = new LiteWorkflowApp("testApp", "<workflow-app/>", new StartNodeDef(
Modified: oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordChangeXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordChangeXCommand.java?rev=1520829&r1=1520828&r2=1520829&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordChangeXCommand.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordChangeXCommand.java Sun Sep 8 02:47:39 2013
@@ -31,11 +31,11 @@ import org.apache.oozie.client.Coordinat
import org.apache.oozie.client.rest.JsonBean;
import org.apache.oozie.client.Job;
import org.apache.oozie.command.CommandException;
+import org.apache.oozie.executor.jpa.BatchQueryExecutor;
import org.apache.oozie.executor.jpa.CoordJobGetActionByActionNumberJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobInsertJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
-import org.apache.oozie.executor.jpa.sla.SLACalculationInsertUpdateJPAExecutor;
import org.apache.oozie.executor.jpa.sla.SLARegistrationGetJPAExecutor;
import org.apache.oozie.executor.jpa.sla.SLASummaryGetJPAExecutor;
import org.apache.oozie.service.JPAService;
@@ -350,7 +350,7 @@ public class TestCoordChangeXCommand ext
insertList.add(slaSummaryBean3);
JPAService jpaService = Services.get().get(JPAService.class);
- jpaService.execute(new SLACalculationInsertUpdateJPAExecutor(insertList, null));
+ BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, null, null);
new CoordChangeXCommand(job.getId(), pauseTimeChangeStr).call();
Modified: oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordKillXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordKillXCommand.java?rev=1520829&r1=1520828&r2=1520829&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordKillXCommand.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordKillXCommand.java Sun Sep 8 02:47:39 2013
@@ -32,7 +32,8 @@ import org.apache.oozie.command.Precondi
import org.apache.oozie.coord.CoordELFunctions;
import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
-import org.apache.oozie.executor.jpa.CoordJobUpdateJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
+import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
import org.apache.oozie.service.CallableQueueService;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.PartitionDependencyManagerService;
@@ -101,7 +102,7 @@ public class TestCoordKillXCommand exten
// RUNNINGWITHERROR if it had loaded status and had it as RUNNING in memory when CoordKill
// executes and updates status to KILLED in database.
job.setStatus(CoordinatorJob.Status.RUNNINGWITHERROR);
- jpaService.execute(new CoordJobUpdateJPAExecutor(job));
+ CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB_STATUS, job);
job = jpaService.execute(coordJobGetCmd);
assertEquals(job.getStatus(), CoordinatorJob.Status.RUNNINGWITHERROR);
final CoordMaterializeTransitionXCommand transitionCmd = new CoordMaterializeTransitionXCommand(job.getId(), 3600);
@@ -199,7 +200,7 @@ public class TestCoordKillXCommand exten
CoordinatorActionBean action = addRecordToCoordActionTable(job.getId(), 1, CoordinatorAction.Status.RUNNING, "coord-action-get.xml", 0);
job.setAppNamespace(SchemaService.COORDINATOR_NAMESPACE_URI_1);
- jpaService.execute(new CoordJobUpdateJPAExecutor(job));
+ CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB_APPNAMESPACE, job);
CoordJobGetJPAExecutor coordJobGetCmd = new CoordJobGetJPAExecutor(job.getId());
CoordActionGetJPAExecutor coordActionGetCmd = new CoordActionGetJPAExecutor(action.getId());
Modified: oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordRerunXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordRerunXCommand.java?rev=1520829&r1=1520828&r2=1520829&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordRerunXCommand.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordRerunXCommand.java Sun Sep 8 02:47:39 2013
@@ -45,8 +45,9 @@ import org.apache.oozie.coord.CoordELFun
import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobInsertJPAExecutor;
-import org.apache.oozie.executor.jpa.CoordJobUpdateJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
import org.apache.oozie.local.LocalOozie;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.SchemaService;
@@ -845,7 +846,7 @@ public class TestCoordRerunXCommand exte
final JPAService jpaService = Services.get().get(JPAService.class);
assertNotNull(jpaService);
coordJob.setAppNamespace(SchemaService.COORDINATOR_NAMESPACE_URI_1);
- jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob));
+ CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB_APPNAMESPACE, coordJob);
CoordinatorActionBean action1 = addRecordToCoordActionTable(coordJob.getId(), 1,
CoordinatorAction.Status.FAILED, "coord-rerun-action1.xml", 0);
@@ -894,7 +895,7 @@ public class TestCoordRerunXCommand exte
final JPAService jpaService = Services.get().get(JPAService.class);
assertNotNull(jpaService);
coordJob.setAppNamespace(SchemaService.COORDINATOR_NAMESPACE_URI_1);
- jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob));
+ CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB_APPNAMESPACE, coordJob);
CoordinatorActionBean action1 = addRecordToCoordActionTable(coordJob.getId(), 1,
CoordinatorAction.Status.FAILED, "coord-rerun-action1.xml", 0);
@@ -944,7 +945,7 @@ public class TestCoordRerunXCommand exte
final JPAService jpaService = Services.get().get(JPAService.class);
assertNotNull(jpaService);
coordJob.setAppNamespace(SchemaService.COORDINATOR_NAMESPACE_URI_1);
- jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob));
+ CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB_APPNAMESPACE, coordJob);
CoordinatorActionBean action1 = addRecordToCoordActionTable(coordJob.getId(), 1,
CoordinatorAction.Status.SUCCEEDED, "coord-rerun-action1.xml", 0);
Modified: oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordResumeXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordResumeXCommand.java?rev=1520829&r1=1520828&r2=1520829&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordResumeXCommand.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordResumeXCommand.java Sun Sep 8 02:47:39 2013
@@ -20,7 +20,8 @@ package org.apache.oozie.command.coord;
import org.apache.oozie.CoordinatorJobBean;
import org.apache.oozie.client.CoordinatorJob;
import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
-import org.apache.oozie.executor.jpa.CoordJobUpdateJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
+import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.SchemaService;
import org.apache.oozie.service.Services;
@@ -127,7 +128,7 @@ public class TestCoordResumeXCommand ext
CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.PREP, false, false);
job.setAppNamespace(SchemaService.COORDINATOR_NAMESPACE_URI_1);
JPAService jpaService = Services.get().get(JPAService.class);
- jpaService.execute(new CoordJobUpdateJPAExecutor(job));
+ CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB_APPNAMESPACE, job);
assertNotNull(jpaService);
CoordJobGetJPAExecutor coordJobGetCmd = new CoordJobGetJPAExecutor(job.getId());
Modified: oozie/trunk/core/src/test/java/org/apache/oozie/command/wf/TestActionErrors.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/command/wf/TestActionErrors.java?rev=1520829&r1=1520828&r2=1520829&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/command/wf/TestActionErrors.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/command/wf/TestActionErrors.java Sun Sep 8 02:47:39 2013
@@ -329,7 +329,7 @@ public class TestActionErrors extends XD
assertEquals("TEST_ERROR", action.getErrorCode());
assertEquals(expErrorMsg, action.getErrorMessage());
assertEquals(expStatus1, action.getStatus());
- assertTrue(action.getPending() == false);
+ assertTrue(action.isPending() == false);
assertTrue(engine.getJob(jobId).getStatus() == WorkflowJob.Status.SUSPENDED);
@@ -410,7 +410,7 @@ public class TestActionErrors extends XD
assertEquals("TEST_ERROR", action.getErrorCode());
assertEquals(expErrorMsg, action.getErrorMessage());
assertEquals(expStatus1, action.getStatus());
- assertFalse(action.getPending());
+ assertFalse(action.isPending());
assertEquals (WorkflowJob.Status.SUSPENDED, job.getStatus());
@@ -509,7 +509,7 @@ public class TestActionErrors extends XD
assertEquals("TEST_ERROR", action.getErrorCode());
assertEquals(expErrorMsg, action.getErrorMessage());
assertEquals(expStatus2, action.getStatus());
- assertTrue(action.getPending() == false);
+ assertTrue(action.isPending() == false);
assertEquals(WorkflowJob.Status.SUSPENDED, engine.getJob(jobId).getStatus());
store2.commitTrx();
store2.closeTrx();
Modified: oozie/trunk/core/src/test/java/org/apache/oozie/command/wf/TestActionStartXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/command/wf/TestActionStartXCommand.java?rev=1520829&r1=1520828&r2=1520829&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/command/wf/TestActionStartXCommand.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/command/wf/TestActionStartXCommand.java Sun Sep 8 02:47:39 2013
@@ -133,7 +133,7 @@ public class TestActionStartXCommand ext
WorkflowJobBean job = this.addRecordToWfJobTable(WorkflowJob.Status.RUNNING, WorkflowInstance.Status.RUNNING);
WorkflowActionBean action = super.addRecordToWfActionTable(job.getId(), "1", WorkflowAction.Status.PREP);
- assertFalse(action.getPending());
+ assertFalse(action.isPending());
assertNull(inst.getCounters().get(XCommand.INSTRUMENTATION_GROUP));
ActionStartXCommand startCmd = new ActionStartXCommand(action.getId(), "map-reduce");
Modified: oozie/trunk/core/src/test/java/org/apache/oozie/event/TestEventGeneration.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/event/TestEventGeneration.java?rev=1520829&r1=1520828&r2=1520829&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/event/TestEventGeneration.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/event/TestEventGeneration.java Sun Sep 8 02:47:39 2013
@@ -64,16 +64,20 @@ import org.apache.oozie.command.wf.Suspe
import org.apache.oozie.command.wf.WorkflowXCommand;
import org.apache.oozie.coord.CoordELFunctions;
import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor;
-import org.apache.oozie.executor.jpa.CoordActionUpdateJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
+import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
+import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
-import org.apache.oozie.executor.jpa.CoordJobUpdateJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.executor.jpa.WorkflowActionGetJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowActionInsertJPAExecutor;
-import org.apache.oozie.executor.jpa.WorkflowActionUpdateJPAExecutor;
+import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobInsertJPAExecutor;
-import org.apache.oozie.executor.jpa.WorkflowJobUpdateJPAExecutor;
+import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
+import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor.WorkflowActionQuery;
+import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
import org.apache.oozie.service.ActionService;
import org.apache.oozie.service.EventHandlerService;
import org.apache.oozie.service.JPAService;
@@ -196,7 +200,7 @@ public class TestEventGeneration extends
LiteWorkflowInstance wfInstance = (LiteWorkflowInstance) job.getWorkflowInstance();
wfInstance.start();
job.setWfInstance(wfInstance);
- jpaService.execute(new WorkflowJobUpdateJPAExecutor(job));
+ WorkflowJobQueryExecutor.getInstance().executeUpdate(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MODIFIED, job);
WorkflowActionBean wfAction = jpaService.execute(new WorkflowActionGetJPAExecutor(job.getId() + "@one"));
new SignalXCommand(job.getId(), wfAction.getId()).call();
job = jpaService.execute(new WorkflowJobGetJPAExecutor(job.getId()));
@@ -271,9 +275,9 @@ public class TestEventGeneration extends
action = jpaService.execute(coordGetCmd);
WorkflowJobBean wfJob = jpaService.execute(new WorkflowJobGetJPAExecutor(action.getExternalId()));
wfJob.setStatus(WorkflowJob.Status.SUCCEEDED);
- jpaService.execute(new WorkflowJobUpdateJPAExecutor(wfJob));
+ WorkflowJobQueryExecutor.getInstance().executeUpdate(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_MODTIME, wfJob);
action.setStatus(CoordinatorAction.Status.RUNNING);
- jpaService.execute(new CoordActionUpdateJPAExecutor(action));
+ CoordActionQueryExecutor.getInstance().executeUpdate(CoordActionQuery.UPDATE_COORD_ACTION_STATUS_PENDING_TIME, action);
new CoordActionCheckXCommand(action.getId(), 0).call();
action = jpaService.execute(coordGetCmd);
assertEquals(CoordinatorAction.Status.SUCCEEDED, action.getStatus());
@@ -290,9 +294,8 @@ public class TestEventGeneration extends
// Action Failure
action.setStatus(CoordinatorAction.Status.RUNNING);
- jpaService.execute(new CoordActionUpdateJPAExecutor(action));
- wfJob.setStatus(WorkflowJob.Status.FAILED);
- jpaService.execute(new WorkflowJobUpdateJPAExecutor(wfJob));
+ CoordActionQueryExecutor.getInstance().executeUpdate(CoordActionQuery.UPDATE_COORD_ACTION_STATUS_PENDING_TIME, action);
+ WorkflowJobQueryExecutor.getInstance().executeUpdate(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_MODTIME, wfJob);
new CoordActionCheckXCommand(action.getId(), 0).call();
action = jpaService.execute(coordGetCmd);
assertEquals(CoordinatorAction.Status.FAILED, action.getStatus());
@@ -308,14 +311,14 @@ public class TestEventGeneration extends
// Action start on Coord Resume
coord.setStatus(CoordinatorJobBean.Status.SUSPENDED);
- jpaService.execute(new CoordJobUpdateJPAExecutor(coord));
+ CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB_STATUS, coord);
action.setStatus(CoordinatorAction.Status.SUSPENDED);
- jpaService.execute(new CoordActionUpdateJPAExecutor(action));
+ CoordActionQueryExecutor.getInstance().executeUpdate(CoordActionQuery.UPDATE_COORD_ACTION_STATUS_PENDING_TIME, action);
wfJob.setStatus(WorkflowJob.Status.SUSPENDED);
WorkflowInstance wfInstance = wfJob.getWorkflowInstance();
((LiteWorkflowInstance) wfInstance).setStatus(WorkflowInstance.Status.SUSPENDED);
wfJob.setWorkflowInstance(wfInstance);
- jpaService.execute(new WorkflowJobUpdateJPAExecutor(wfJob));
+ WorkflowJobQueryExecutor.getInstance().executeUpdate(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MODIFIED, wfJob);
new CoordResumeXCommand(coord.getId()).call();
Thread.sleep(5000);
CoordinatorActionEvent cevent = (CoordinatorActionEvent) queue.poll();
@@ -328,7 +331,7 @@ public class TestEventGeneration extends
// Action going to WAITING on Coord Rerun
action.setStatus(CoordinatorAction.Status.SUCCEEDED);
- jpaService.execute(new CoordActionUpdateJPAExecutor(action));
+ CoordActionQueryExecutor.getInstance().executeUpdate(CoordActionQuery.UPDATE_COORD_ACTION_STATUS_PENDING_TIME, action);
queue.clear();
new CoordRerunXCommand(coord.getId(), RestConstants.JOB_COORD_SCOPE_ACTION, "1", false, true)
.call();
@@ -395,7 +398,7 @@ public class TestEventGeneration extends
action.setStatus(WorkflowAction.Status.KILLED);
action.setPendingOnly();
action.setEndTime(null); //its already set by XTestCase add action record method above
- jpaService.execute(new WorkflowActionUpdateJPAExecutor(action));
+ WorkflowActionQueryExecutor.getInstance().executeUpdate(WorkflowActionQuery.UPDATE_ACTION_END, action);
new ActionKillXCommand(action.getId()).call();
action = jpaService.execute(wfActionGetCmd);
assertEquals(WorkflowAction.Status.KILLED, action.getStatus());
@@ -442,7 +445,8 @@ public class TestEventGeneration extends
CoordinatorAction.Status.RUNNING, "coord-action-get.xml", 0);
WorkflowJobBean wjb = new WorkflowJobBean();
wjb.setId(action.getExternalId());
- jpaService.execute(new WorkflowJobUpdateJPAExecutor(wjb));
+ wjb.setLastModifiedTime(new Date());
+ WorkflowJobQueryExecutor.getInstance().insert(wjb);
CoordinatorXCommand<Void> myCmd = new CoordActionCheckXCommand(action.getId(), 0) {
@Override
@@ -454,7 +458,7 @@ public class TestEventGeneration extends
// CASE 1: Only pull missing deps
action.setMissingDependencies("pull");
- jpaService.execute(new CoordActionUpdateJPAExecutor(action));
+ CoordActionQueryExecutor.getInstance().executeUpdate(CoordActionQuery.UPDATE_COORD_ACTION_DEPENDENCIES, action);
myCmd.call();
CoordinatorActionEvent event = (CoordinatorActionEvent) queue.poll();
assertNotNull(event);
@@ -463,7 +467,7 @@ public class TestEventGeneration extends
// CASE 2: Only hcat (push) missing deps
action.setMissingDependencies(null);
action.setPushMissingDependencies("push");
- jpaService.execute(new CoordActionUpdateJPAExecutor(action));
+ CoordActionQueryExecutor.getInstance().executeUpdate(CoordActionQuery.UPDATE_COORD_ACTION_DEPENDENCIES, action);
myCmd.call();
event = (CoordinatorActionEvent) queue.poll();
assertNotNull(event);
@@ -471,7 +475,7 @@ public class TestEventGeneration extends
// CASE 3: Both types
action.setMissingDependencies("pull");
- jpaService.execute(new CoordActionUpdateJPAExecutor(action));
+ CoordActionQueryExecutor.getInstance().executeUpdate(CoordActionQuery.UPDATE_COORD_ACTION_DEPENDENCIES, action);
myCmd.call();
event = (CoordinatorActionEvent) queue.poll();
assertNotNull(event);
@@ -533,7 +537,7 @@ public class TestEventGeneration extends
WorkflowJobBean wf = addRecordToWfJobTable(WorkflowJob.Status.RUNNING, WorkflowInstance.Status.RUNNING,
action.getId());
action.setExternalId(wf.getId());
- jpaService.execute(new CoordActionUpdateJPAExecutor(action));
+ CoordActionQueryExecutor.getInstance().executeUpdate(CoordActionQuery.UPDATE_COORD_ACTION, action);
String waId = _createWorkflowAction(wf.getId(), "wf-action");
new ActionStartXCommand(waId, action.getType()).call();
@@ -613,7 +617,7 @@ public class TestEventGeneration extends
executionPath, true);
wfAction.setPending();
wfAction.setSignalValue(WorkflowAction.Status.OK.name());
- jpaService.execute(new WorkflowActionUpdateJPAExecutor(wfAction));
+ WorkflowActionQueryExecutor.getInstance().executeUpdate(WorkflowActionQuery.UPDATE_ACTION, wfAction);
return workflow;
}
@@ -623,7 +627,7 @@ public class TestEventGeneration extends
writeToFile(wfXml, getFsTestCaseDir(), "workflow.xml");
String coordXml = coord.getJobXml();
coord.setJobXml(coordXml.replace("hdfs:///tmp/workflows/", getFsTestCaseDir() + "/workflow.xml"));
- jpaService.execute(new CoordJobUpdateJPAExecutor(coord));
+ CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, coord);
}
private String _createWorkflowAction(String wfId, String actionName) throws JPAExecutorException {
Added: oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestBatchQueryExecutor.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestBatchQueryExecutor.java?rev=1520829&view=auto
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestBatchQueryExecutor.java (added)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestBatchQueryExecutor.java Sun Sep 8 02:47:39 2013
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.executor.jpa;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.CoordinatorJobBean;
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.FaultInjection;
+import org.apache.oozie.WorkflowActionBean;
+import org.apache.oozie.WorkflowJobBean;
+import org.apache.oozie.client.CoordinatorJob;
+import org.apache.oozie.client.WorkflowAction;
+import org.apache.oozie.client.WorkflowJob;
+import org.apache.oozie.client.rest.JsonBean;
+import org.apache.oozie.command.SkipCommitFaultInjection;
+import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry;
+import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
+import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
+import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor.WorkflowActionQuery;
+import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.test.XDataTestCase;
+import org.apache.oozie.workflow.WorkflowInstance;
+
+/**
+ * Tests for {@link BatchQueryExecutor#executeBatchInsertUpdateDelete}: one test
+ * for a batch that is expected to commit and one for a batch that is expected to fail.
+ */
+public class TestBatchQueryExecutor extends XDataTestCase {
+    Services services;
+
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        services = new Services();
+        services.init();
+        cleanUpDBTables();
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        services.destroy();
+        super.tearDown();
+    }
+
+    /**
+     * Runs one batch holding an insert, two updates and a delete, then reads
+     * each bean back to confirm that every operation took effect.
+     */
+    public void testExecuteBatchUpdateInsertDelete() throws Exception {
+        BatchQueryExecutor executor = BatchQueryExecutor.getInstance();
+        // for update
+        CoordinatorJobBean coordJob = addRecordToCoordJobTable(CoordinatorJob.Status.PREP, true, true);
+        WorkflowJobBean wfJob = addRecordToWfJobTable(WorkflowJob.Status.PREP, WorkflowInstance.Status.PREP);
+        // for delete
+        WorkflowActionBean wfAction = addRecordToWfActionTable(wfJob.getId(), "1", WorkflowAction.Status.PREP);
+        // for insert
+        CoordinatorActionBean coordAction = new CoordinatorActionBean();
+        coordAction.setId("testCoordAction1");
+
+        // update the status
+        coordJob.setStatus(CoordinatorJob.Status.RUNNING);
+        wfJob.setStatus(WorkflowJob.Status.SUCCEEDED);
+
+        // update the list for doing bulk writes
+        List<UpdateEntry> updateList = new ArrayList<UpdateEntry>();
+        updateList.add(new UpdateEntry<CoordJobQuery>(CoordJobQuery.UPDATE_COORD_JOB_STATUS_MODTIME, coordJob));
+        updateList.add(new UpdateEntry<WorkflowJobQuery>(WorkflowJobQuery.UPDATE_WORKFLOW, wfJob));
+
+        // insert beans
+        Collection<JsonBean> insertList = new ArrayList<JsonBean>();
+        insertList.add(coordAction);
+
+        // delete beans
+        Collection<JsonBean> deleteList = new ArrayList<JsonBean>();
+        deleteList.add(wfAction);
+
+        executor.executeBatchInsertUpdateDelete(insertList, updateList, deleteList);
+
+        // check the updates
+        coordJob = CoordJobQueryExecutor.getInstance().get(CoordJobQuery.GET_COORD_JOB, coordJob.getId());
+        assertEquals("RUNNING", coordJob.getStatusStr());
+        wfJob = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW, wfJob.getId());
+        assertEquals("SUCCEEDED", wfJob.getStatusStr());
+        // check the insert
+        coordAction = CoordActionQueryExecutor.getInstance()
+                .get(CoordActionQuery.GET_COORD_ACTION, coordAction.getId());
+        assertEquals("testCoordAction1", coordAction.getId());
+        // check the delete: look the deleted action up by its own id, not by the
+        // workflow job id, so the lookup really exercises the deleted row
+        try {
+            WorkflowActionQueryExecutor.getInstance().get(WorkflowActionQuery.GET_ACTION, wfAction.getId());
+            fail("Expected the deleted workflow action to be missing");
+        }
+        catch (JPAExecutorException ex) {
+            assertEquals(ErrorCode.E0605, ex.getErrorCode());
+        }
+    }
+
+    /**
+     * Activates {@link SkipCommitFaultInjection} so the batch is expected to fail, then
+     * checks that neither the queued update nor the queued inserts reached the database.
+     */
+    public void testExecuteBatchUpdateInsertDeleteRollBack() throws Exception {
+        BatchQueryExecutor executor = BatchQueryExecutor.getInstance();
+        WorkflowJobBean job = addRecordToWfJobTable(WorkflowJob.Status.PREP, WorkflowInstance.Status.PREP);
+        WorkflowActionBean action1 = createWorkflowAction(job.getId(), "1", WorkflowAction.Status.PREP);
+        WorkflowActionBean action2 = createWorkflowAction(job.getId(), "2", WorkflowAction.Status.PREP);
+        job.setStatus(WorkflowJob.Status.RUNNING);
+
+        // Add two actions to insert list
+        Collection<JsonBean> insertList = new ArrayList<JsonBean>();
+        insertList.add(action1);
+        insertList.add(action2);
+
+        // Add the job status change to update list
+        List<UpdateEntry> updateList = new ArrayList<UpdateEntry>();
+        updateList.add(new UpdateEntry<WorkflowJobQuery>(WorkflowJobQuery.UPDATE_WORKFLOW, job));
+
+        // set fault injection to true, so transaction is rolled back
+        setSystemProperty(FaultInjection.FAULT_INJECTION, "true");
+        setSystemProperty(SkipCommitFaultInjection.ACTION_FAILOVER_FAULT_INJECTION, "true");
+        FaultInjection.activate("org.apache.oozie.command.SkipCommitFaultInjection");
+        try {
+            executor.executeBatchInsertUpdateDelete(insertList, updateList, null);
+            fail("Expected exception due to commit failure but didn't get any");
+        }
+        catch (Exception expected) {
+            // the injected commit failure is what this test is waiting for
+        }
+        finally {
+            // always switch the fault off, even when fail() fires above
+            FaultInjection.deactivate("org.apache.oozie.command.SkipCommitFaultInjection");
+        }
+
+        // Check whether transactions are rolled back or not
+        WorkflowJobBean wfBean = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW, job.getId());
+        // status should not be RUNNING
+        assertEquals("PREP", wfBean.getStatusStr());
+
+        try {
+            WorkflowActionQueryExecutor.getInstance().get(WorkflowActionQuery.GET_ACTION, action1.getId());
+            fail("Expected exception but didnt get any");
+        }
+        catch (JPAExecutorException jpaee) {
+            assertEquals(ErrorCode.E0605, jpaee.getErrorCode());
+        }
+
+        try {
+            WorkflowActionQueryExecutor.getInstance().get(WorkflowActionQuery.GET_ACTION, action2.getId());
+            fail("Expected exception but didnt get any");
+        }
+        catch (JPAExecutorException jpaee) {
+            assertEquals(ErrorCode.E0605, jpaee.getErrorCode());
+        }
+    }
+}
Added: oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobQueryExecutor.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobQueryExecutor.java?rev=1520829&view=auto
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobQueryExecutor.java (added)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobQueryExecutor.java Sun Sep 8 02:47:39 2013
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.executor.jpa;
+
+import javax.persistence.EntityManager;
+import javax.persistence.Query;
+
+import org.apache.oozie.CoordinatorJobBean;
+import org.apache.oozie.client.CoordinatorJob;
+import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
+import org.apache.oozie.service.JPAService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.test.XDataTestCase;
+
+public class TestCoordJobQueryExecutor extends XDataTestCase {
+ Services services;
+ JPAService jpaService;
+
+ /**
+  * Prepares the fixture: runs the parent set-up, initializes a new {@link Services} instance, looks up the
+  * {@link JPAService} and cleans the database tables.
+  *
+  * @throws Exception if any of the set-up steps fails
+  */
+ @Override
+ protected void setUp() throws Exception {
+     super.setUp();
+
+     // Services must be initialized before the JPA service can be looked up.
+     this.services = new Services();
+     this.services.init();
+     this.jpaService = Services.get().get(JPAService.class);
+
+     // Clean the tables before each test runs.
+     cleanUpDBTables();
+ }
+
+ /**
+  * Destroys the {@link Services} instance created in {@code setUp()} and then runs the parent tear-down.
+  *
+  * @throws Exception if destroying the services or the parent tear-down fails
+  */
+ @Override
+ protected void tearDown() throws Exception {
+     try {
+         services.destroy();
+     }
+     finally {
+         // Always run the parent tear-down, even if destroying the services failed.
+         super.tearDown();
+     }
+ }
+
+ /**
+  * Verifies that each update query built by {@code CoordJobQueryExecutor.getUpdateQuery(...)} binds its named
+  * parameters to the corresponding values of the supplied {@link CoordinatorJobBean}.
+  * <p>
+  * Only query construction is checked here; none of the queries is executed.
+  *
+  * @throws Exception if the fixture job cannot be created or a query cannot be built
+  */
+ public void testGetQuery() throws Exception {
+     EntityManager em = jpaService.getEntityManager();
+     // Close the EntityManager even when an assertion fails part-way through the test.
+     try {
+         CoordJobQueryExecutor executor = CoordJobQueryExecutor.getInstance();
+         CoordinatorJobBean cjBean = addRecordToCoordJobTable(CoordinatorJob.Status.PREP, true, true);
+
+         // UPDATE_COORD_JOB
+         Query query = executor.getUpdateQuery(CoordJobQuery.UPDATE_COORD_JOB, cjBean, em);
+         assertEquals(cjBean.getAppName(), query.getParameterValue("appName"));
+         assertEquals(cjBean.getAppPath(), query.getParameterValue("appPath"));
+         assertEquals(cjBean.getConcurrency(), query.getParameterValue("concurrency"));
+         assertEquals(cjBean.getConf(), query.getParameterValue("conf"));
+         assertEquals(cjBean.getExternalId(), query.getParameterValue("externalId"));
+         assertEquals(cjBean.getFrequency(), query.getParameterValue("frequency"));
+         assertEquals(cjBean.getLastActionNumber(), query.getParameterValue("lastActionNumber"));
+         assertEquals(cjBean.getTimeout(), query.getParameterValue("timeOut"));
+         assertEquals(cjBean.getTimeZone(), query.getParameterValue("timeZone"));
+         assertEquals(cjBean.getCreatedTimestamp(), query.getParameterValue("createdTime"));
+         assertEquals(cjBean.getEndTimestamp(), query.getParameterValue("endTime"));
+         assertEquals(cjBean.getExecution(), query.getParameterValue("execution"));
+         assertEquals(cjBean.getJobXml(), query.getParameterValue("jobXml"));
+         assertEquals(cjBean.getLastActionTimestamp(), query.getParameterValue("lastAction"));
+         assertEquals(cjBean.getLastModifiedTimestamp(), query.getParameterValue("lastModifiedTime"));
+         assertEquals(cjBean.getNextMaterializedTimestamp(), query.getParameterValue("nextMaterializedTime"));
+         assertEquals(cjBean.getOrigJobXml(), query.getParameterValue("origJobXml"));
+         assertEquals(cjBean.getSlaXml(), query.getParameterValue("slaXml"));
+         assertEquals(cjBean.getStartTimestamp(), query.getParameterValue("startTime"));
+         assertEquals(cjBean.getStatus().toString(), query.getParameterValue("status"));
+         assertEquals(cjBean.getTimeUnit().toString(), query.getParameterValue("timeUnit"));
+         assertEquals(cjBean.getId(), query.getParameterValue("id"));
+
+         // UPDATE_COORD_JOB_STATUS
+         query = executor.getUpdateQuery(CoordJobQuery.UPDATE_COORD_JOB_STATUS, cjBean, em);
+         assertEquals(cjBean.getStatus().toString(), query.getParameterValue("status"));
+         assertEquals(cjBean.getId(), query.getParameterValue("id"));
+
+         // UPDATE_COORD_JOB_BUNDLEID
+         cjBean.setBundleId("bundleID-test");
+         query = executor.getUpdateQuery(CoordJobQuery.UPDATE_COORD_JOB_BUNDLEID, cjBean, em);
+         assertEquals(cjBean.getBundleId(), query.getParameterValue("bundleId"));
+         assertEquals(cjBean.getId(), query.getParameterValue("id"));
+
+         // UPDATE_COORD_JOB_STATUS_PENDING
+         query = executor.getUpdateQuery(CoordJobQuery.UPDATE_COORD_JOB_STATUS_PENDING, cjBean, em);
+         assertEquals(cjBean.getStatus().toString(), query.getParameterValue("status"));
+         assertEquals(cjBean.isPending() ? 1 : 0, query.getParameterValue("pending"));
+         assertEquals(cjBean.getId(), query.getParameterValue("id"));
+
+         // UPDATE_COORD_JOB_STATUS_MODTIME
+         query = executor.getUpdateQuery(CoordJobQuery.UPDATE_COORD_JOB_STATUS_MODTIME, cjBean, em);
+         assertEquals(cjBean.getStatus().toString(), query.getParameterValue("status"));
+         assertEquals(cjBean.getLastModifiedTimestamp(), query.getParameterValue("lastModifiedTime"));
+         assertEquals(cjBean.getId(), query.getParameterValue("id"));
+
+         // UPDATE_COORD_JOB_STATUS_PENDING_MODTIME
+         query = executor.getUpdateQuery(CoordJobQuery.UPDATE_COORD_JOB_STATUS_PENDING_MODTIME, cjBean, em);
+         assertEquals(cjBean.getStatus().toString(), query.getParameterValue("status"));
+         assertEquals(cjBean.getLastModifiedTimestamp(), query.getParameterValue("lastModifiedTime"));
+         assertEquals(cjBean.isPending() ? 1 : 0, query.getParameterValue("pending"));
+         assertEquals(cjBean.getId(), query.getParameterValue("id"));
+
+         // UPDATE_COORD_JOB_LAST_MODIFIED_TIME
+         query = executor.getUpdateQuery(CoordJobQuery.UPDATE_COORD_JOB_LAST_MODIFIED_TIME, cjBean, em);
+         assertEquals(cjBean.getLastModifiedTimestamp(), query.getParameterValue("lastModifiedTime"));
+         assertEquals(cjBean.getId(), query.getParameterValue("id"));
+
+         // UPDATE_COORD_JOB_STATUS_PENDING_TIME
+         // NOTE(review): "lastModifiedTime" used to be asserted twice for this query; the duplicate was
+         // presumably meant to check a different parameter (e.g. "pending") - confirm against the named query.
+         query = executor.getUpdateQuery(CoordJobQuery.UPDATE_COORD_JOB_STATUS_PENDING_TIME, cjBean, em);
+         assertEquals(cjBean.getStatus().toString(), query.getParameterValue("status"));
+         assertEquals(cjBean.getLastModifiedTimestamp(), query.getParameterValue("lastModifiedTime"));
+         assertEquals(cjBean.isDoneMaterialization() ? 1 : 0, query.getParameterValue("doneMaterialization"));
+         assertEquals(cjBean.getSuspendedTimestamp(), query.getParameterValue("suspendedTime"));
+         assertEquals(cjBean.getId(), query.getParameterValue("id"));
+
+         // UPDATE_COORD_JOB_MATERIALIZE
+         query = executor.getUpdateQuery(CoordJobQuery.UPDATE_COORD_JOB_MATERIALIZE, cjBean, em);
+         assertEquals(cjBean.getStatus().toString(), query.getParameterValue("status"));
+         assertEquals(cjBean.isPending() ? 1 : 0, query.getParameterValue("pending"));
+         assertEquals(cjBean.isDoneMaterialization() ? 1 : 0, query.getParameterValue("doneMaterialization"));
+         assertEquals(cjBean.getLastActionTimestamp(), query.getParameterValue("lastActionTime"));
+         assertEquals(cjBean.getLastActionNumber(), query.getParameterValue("lastActionNumber"));
+         assertEquals(cjBean.getNextMaterializedTimestamp(), query.getParameterValue("nextMatdTime"));
+         assertEquals(cjBean.getId(), query.getParameterValue("id"));
+
+         // UPDATE_COORD_JOB_CHANGE
+         query = executor.getUpdateQuery(CoordJobQuery.UPDATE_COORD_JOB_CHANGE, cjBean, em);
+         assertEquals(cjBean.getEndTimestamp(), query.getParameterValue("endTime"));
+         assertEquals(cjBean.getStatus().toString(), query.getParameterValue("status"));
+         assertEquals(cjBean.isPending() ? 1 : 0, query.getParameterValue("pending"));
+         assertEquals(cjBean.isDoneMaterialization() ? 1 : 0, query.getParameterValue("doneMaterialization"));
+         assertEquals(cjBean.getConcurrency(), query.getParameterValue("concurrency"));
+         assertEquals(cjBean.getPauseTimestamp(), query.getParameterValue("pauseTime"));
+         assertEquals(cjBean.getLastActionNumber(), query.getParameterValue("lastActionNumber"));
+         assertEquals(cjBean.getLastActionTimestamp(), query.getParameterValue("lastActionTime"));
+         assertEquals(cjBean.getNextMaterializedTimestamp(), query.getParameterValue("nextMatdTime"));
+         assertEquals(cjBean.getId(), query.getParameterValue("id"));
+     }
+     finally {
+         em.close();
+     }
+ }
+
+ /**
+  * Verifies that status changes written through {@code CoordJobQueryExecutor.executeUpdate(...)} are visible
+  * when the job is read back with {@code GET_COORD_JOB}.
+  *
+  * @throws Exception if the fixture job cannot be created or a query fails
+  */
+ public void testExecuteUpdate() throws Exception {
+     CoordJobQueryExecutor executor = CoordJobQueryExecutor.getInstance();
+
+     // Insert the job as PREP and move it to RUNNING so that the first update really changes the stored
+     // value; writing back the status the row already has would pass even if the update did nothing.
+     CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.PREP, false, false);
+     job.setStatus(CoordinatorJob.Status.RUNNING);
+     executor.executeUpdate(CoordJobQuery.UPDATE_COORD_JOB_STATUS, job);
+     CoordinatorJobBean job1 = executor.get(CoordJobQuery.GET_COORD_JOB, job.getId());
+     assertEquals(CoordinatorJob.Status.RUNNING, job1.getStatus());
+
+     job1.setStatus(CoordinatorJob.Status.SUCCEEDED);
+     executor.executeUpdate(CoordJobQuery.UPDATE_COORD_JOB_STATUS_PENDING, job1);
+     CoordinatorJobBean job2 = executor.get(CoordJobQuery.GET_COORD_JOB, job1.getId());
+     assertEquals(CoordinatorJob.Status.SUCCEEDED, job2.getStatus());
+ }
+
+ /**
+  * Placeholder test: the body is empty, so this test currently passes without checking anything.
+  * Presumably intended to cover the executor's {@code get(...)} queries - to be confirmed when implemented.
+  */
+ public void testGet() throws Exception {
+ // TODO
+ }
+
+ /**
+  * Placeholder test: the body is empty, so this test currently passes without checking anything.
+  * Presumably intended to cover the executor's list-returning queries - to be confirmed when implemented.
+  */
+ public void testGetList() throws Exception {
+ // TODO
+ }
+
+ /**
+  * Verifies that a bean stored through {@code CoordJobQueryExecutor.insert(...)} can be read back by id with
+  * the values it was inserted with.
+  *
+  * @throws Exception if the insert or the lookup fails
+  */
+ public void testInsert() throws Exception {
+     final String jobId = "test-oozie";
+
+     CoordinatorJobBean bean = new CoordinatorJobBean();
+     bean.setId(jobId);
+     bean.setAppName("testApp");
+     bean.setUser("oozie");
+     CoordJobQueryExecutor.getInstance().insert(bean);
+
+     CoordinatorJobBean retBean = CoordJobQueryExecutor.getInstance().get(CoordJobQuery.GET_COORD_JOB, jobId);
+     // Expected value first, so failure messages report "expected"/"was" the right way round.
+     assertEquals(jobId, retBean.getId());
+     assertEquals("testApp", retBean.getAppName());
+     assertEquals("oozie", retBean.getUser());
+ }
+}