You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by vi...@apache.org on 2013/09/08 04:47:41 UTC
svn commit: r1520829 [2/6] - in /oozie/trunk: ./
core/src/main/java/org/apache/oozie/
core/src/main/java/org/apache/oozie/command/
core/src/main/java/org/apache/oozie/command/bundle/
core/src/main/java/org/apache/oozie/command/coord/ core/src/main/java...
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordChangeXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordChangeXCommand.java?rev=1520829&r1=1520828&r2=1520829&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordChangeXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordChangeXCommand.java Sun Sep 8 02:47:39 2013
@@ -30,6 +30,7 @@ import org.apache.oozie.CoordinatorActio
import org.apache.oozie.CoordinatorJobBean;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.XException;
+import org.apache.oozie.client.CoordinatorAction;
import org.apache.oozie.client.CoordinatorJob;
import org.apache.oozie.client.Job;
import org.apache.oozie.client.OozieClient;
@@ -38,13 +39,15 @@ import org.apache.oozie.command.CommandE
import org.apache.oozie.command.PreconditionException;
import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
import org.apache.oozie.coord.TimeUnit;
-import org.apache.oozie.executor.jpa.BulkUpdateDeleteJPAExecutor;
import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobGetActionByActionNumberJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.executor.jpa.sla.SLARegistrationGetJPAExecutor;
import org.apache.oozie.executor.jpa.sla.SLASummaryGetJPAExecutor;
+import org.apache.oozie.executor.jpa.BatchQueryExecutor;
+import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;
import org.apache.oozie.sla.SLARegistrationBean;
@@ -66,7 +69,7 @@ public class CoordChangeXCommand extends
private CoordinatorJobBean coordJob;
private JPAService jpaService = null;
private Job.Status prevStatus;
- private List<JsonBean> updateList = new ArrayList<JsonBean>();
+ private List<UpdateEntry> updateList = new ArrayList<UpdateEntry>();
private List<JsonBean> deleteList = new ArrayList<JsonBean>();
private static final Set<String> ALLOWED_CHANGE_OPTIONS = new HashSet<String>();
@@ -276,7 +279,13 @@ public class CoordChangeXCommand extends
LOG.debug("Deleting summary bean corresponding to action " + slaSummaryBean.getId());
deleteList.add(slaSummaryBean);
}
- deleteList.add(bean);
+ if (bean.getStatus() == CoordinatorAction.Status.WAITING
+ || bean.getStatus() == CoordinatorAction.Status.READY) {
+ deleteList.add(bean);
+ }
+ else {
+ throw new CommandException(ErrorCode.E1022, bean.getId());
+ }
}
catch (JPAExecutorException e) {
throw new CommandException(e);
@@ -367,8 +376,8 @@ public class CoordChangeXCommand extends
coordJob.setDoneMaterialization();
}
- updateList.add(coordJob);
- jpaService.execute(new BulkUpdateDeleteJPAExecutor(updateList, deleteList, false));
+ updateList.add(new UpdateEntry<CoordJobQuery>(CoordJobQuery.UPDATE_COORD_JOB_CHANGE, coordJob));
+ BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(null, updateList, deleteList);
return null;
}
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordKillXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordKillXCommand.java?rev=1520829&r1=1520828&r2=1520829&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordKillXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordKillXCommand.java Sun Sep 8 02:47:39 2013
@@ -30,9 +30,12 @@ import org.apache.oozie.command.CommandE
import org.apache.oozie.command.KillTransitionXCommand;
import org.apache.oozie.command.PreconditionException;
import org.apache.oozie.dependency.DependencyChecker;
-import org.apache.oozie.executor.jpa.BulkUpdateInsertForCoordActionStatusJPAExecutor;
+import org.apache.oozie.executor.jpa.BatchQueryExecutor;
+import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry;
+import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
import org.apache.oozie.executor.jpa.CoordJobGetActionsNotCompletedJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.service.EventHandlerService;
import org.apache.oozie.service.JPAService;
@@ -124,7 +127,7 @@ public class CoordKillXCommand extends K
CoordinatorXCommand.generateEvent(action, coordJob.getUser(), coordJob.getAppName(), null);
}
action.setLastModifiedTime(new Date());
- updateList.add(action);
+ updateList.add(new UpdateEntry<CoordActionQuery>(CoordActionQuery.UPDATE_COORD_ACTION_STATUS_PENDING_TIME,action));
}
@Override
@@ -154,8 +157,6 @@ public class CoordKillXCommand extends K
}
}
coordJob.setDoneMaterialization();
- updateList.add(coordJob);
-
LOG.debug("Killed coord actions for the coordinator=[{0}]", jobId);
}
@@ -170,13 +171,13 @@ public class CoordKillXCommand extends K
@Override
public void updateJob() throws CommandException {
- updateList.add(coordJob);
+ updateList.add(new UpdateEntry<CoordJobQuery>(CoordJobQuery.UPDATE_COORD_JOB_STATUS_PENDING_TIME, coordJob));
}
@Override
public void performWrites() throws CommandException {
try {
- jpaService.execute(new BulkUpdateInsertForCoordActionStatusJPAExecutor(updateList, null));
+ BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(null, updateList, null);
}
catch (JPAExecutorException e) {
throw new CommandException(e);
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java?rev=1520829&r1=1520828&r2=1520829&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java Sun Sep 8 02:47:39 2013
@@ -39,10 +39,12 @@ import org.apache.oozie.command.Material
import org.apache.oozie.command.PreconditionException;
import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
import org.apache.oozie.coord.TimeUnit;
-import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
+import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry;
+import org.apache.oozie.executor.jpa.BatchQueryExecutor;
import org.apache.oozie.executor.jpa.CoordActionsActiveCountJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
-import org.apache.oozie.executor.jpa.CoordJobUpdateJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
+import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.service.EventHandlerService;
import org.apache.oozie.service.JPAService;
@@ -102,7 +104,7 @@ public class CoordMaterializeTransitionX
*/
@Override
public void updateJob() throws CommandException {
- updateList.add(coordJob);
+ updateList.add(new UpdateEntry(CoordJobQuery.UPDATE_COORD_JOB_MATERIALIZE,coordJob));
}
/* (non-Javadoc)
@@ -111,7 +113,7 @@ public class CoordMaterializeTransitionX
@Override
public void performWrites() throws CommandException {
try {
- jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, insertList));
+ BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, updateList, null);
// register the partition related dependencies of actions
for (JsonBean actionBean : insertList) {
if (actionBean instanceof CoordinatorActionBean) {
@@ -277,7 +279,7 @@ public class CoordMaterializeTransitionX
LOG.error("Exception occurred:" + e.getMessage() + " Making the job failed ", e);
coordJob.setStatus(Job.Status.FAILED);
try {
- jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob));
+ CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB_MATERIALIZE, coordJob);
}
catch (JPAExecutorException jex) {
throw new CommandException(ErrorCode.E1011, jex);
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordPauseXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordPauseXCommand.java?rev=1520829&r1=1520828&r2=1520829&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordPauseXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordPauseXCommand.java Sun Sep 8 02:47:39 2013
@@ -24,14 +24,12 @@ import org.apache.oozie.command.CommandE
import org.apache.oozie.command.PauseTransitionXCommand;
import org.apache.oozie.command.PreconditionException;
import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
-import org.apache.oozie.executor.jpa.CoordJobUpdateJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
+import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
import org.apache.oozie.executor.jpa.JPAExecutorException;
-import org.apache.oozie.service.JPAService;
-import org.apache.oozie.service.Services;
import org.apache.oozie.util.LogUtils;
public class CoordPauseXCommand extends PauseTransitionXCommand {
- private final JPAService jpaService = Services.get().get(JPAService.class);
private final CoordinatorJobBean coordJob;
private CoordinatorJob.Status prevStatus = null;
@@ -98,7 +96,7 @@ public class CoordPauseXCommand extends
@Override
public void updateJob() throws CommandException {
try {
- jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob));
+ CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB_STATUS, coordJob);
}
catch (JPAExecutorException e) {
throw new CommandException(e);
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java?rev=1520829&r1=1520828&r2=1520829&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java Sun Sep 8 02:47:39 2013
@@ -37,8 +37,8 @@ import org.apache.oozie.dependency.Depen
import org.apache.oozie.dependency.ActionDependency;
import org.apache.oozie.dependency.URIHandler;
import org.apache.oozie.executor.jpa.CoordActionGetForInputCheckJPAExecutor;
-import org.apache.oozie.executor.jpa.CoordActionUpdateForModifiedTimeJPAExecutor;
-import org.apache.oozie.executor.jpa.CoordActionUpdatePushInputCheckJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
+import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.service.CallableQueueService;
@@ -252,15 +252,17 @@ public class CoordPushDependencyCheckXCo
if (jpaService != null) {
try {
if (isChangeInDependency) {
- jpaService.execute(new CoordActionUpdatePushInputCheckJPAExecutor(coordAction));
- if (EventHandlerService.isEnabled()
- && coordAction.getStatus() != CoordinatorAction.Status.READY) {
- //since event is not to be generated unless action RUNNING via StartX
+ CoordActionQueryExecutor.getInstance().executeUpdate(
+ CoordActionQuery.UPDATE_COORD_ACTION_FOR_PUSH_INPUTCHECK, coordAction);
+ if (EventHandlerService.isEnabled() && coordAction.getStatus() != CoordinatorAction.Status.READY) {
+ // since event is not to be generated unless action
+ // RUNNING via StartX
generateEvent(coordAction, coordJob.getUser(), coordJob.getAppName(), null);
}
}
else {
- jpaService.execute(new CoordActionUpdateForModifiedTimeJPAExecutor(coordAction));
+ CoordActionQueryExecutor.getInstance().executeUpdate(
+ CoordActionQuery.UPDATE_COORD_ACTION_FOR_MODIFIED_DATE, coordAction);
}
}
catch (JPAExecutorException jex) {
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordRerunXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordRerunXCommand.java?rev=1520829&r1=1520828&r2=1520829&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordRerunXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordRerunXCommand.java Sun Sep 8 02:47:39 2013
@@ -42,8 +42,11 @@ import org.apache.oozie.command.RerunTra
import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
import org.apache.oozie.coord.CoordELFunctions;
import org.apache.oozie.coord.CoordUtils;
-import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
+import org.apache.oozie.executor.jpa.BatchQueryExecutor;
+import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry;
+import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.service.EventHandlerService;
import org.apache.oozie.service.JPAService;
@@ -201,7 +204,7 @@ public class CoordRerunXCommand extends
coordAction.setExternalStatus("");
coordAction.setRerunTime(new Date());
coordAction.setLastModifiedTime(new Date());
- updateList.add(coordAction);
+ updateList.add(new UpdateEntry<CoordActionQuery>(CoordActionQuery.UPDATE_COORD_ACTION_RERUN, coordAction));
writeActionRegistration(coordAction.getActionXml(), coordAction, coordJob.getUser(), coordJob.getGroup());
}
@@ -379,7 +382,7 @@ public class CoordRerunXCommand extends
coordJob.resetPending();
}
}
- updateList.add(coordJob);
+ updateList.add(new UpdateEntry<CoordJobQuery>(CoordJobQuery.UPDATE_COORD_JOB_STATUS_PENDING, coordJob));
}
/* (non-Javadoc)
@@ -388,7 +391,7 @@ public class CoordRerunXCommand extends
@Override
public void performWrites() throws CommandException {
try {
- jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, insertList));
+ BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, updateList, null);
if (EventHandlerService.isEnabled()) {
generateEvents(coordJob);
}
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordResumeXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordResumeXCommand.java?rev=1520829&r1=1520828&r2=1520829&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordResumeXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordResumeXCommand.java Sun Sep 8 02:47:39 2013
@@ -31,10 +31,13 @@ import org.apache.oozie.command.Precondi
import org.apache.oozie.command.ResumeTransitionXCommand;
import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
import org.apache.oozie.command.wf.ResumeXCommand;
-import org.apache.oozie.executor.jpa.BulkUpdateInsertForCoordActionStatusJPAExecutor;
+import org.apache.oozie.executor.jpa.BatchQueryExecutor;
+import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry;
+import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
import org.apache.oozie.executor.jpa.CoordJobGetActionsSuspendedJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;
import org.apache.oozie.util.InstrumentUtils;
@@ -104,7 +107,7 @@ public class CoordResumeXCommand extends
coordJob.setLastModifiedTime(new Date());
LOG.debug("Resume coordinator job id = " + jobId + ", status = " + coordJob.getStatus() + ", pending = "
+ coordJob.isPending());
- updateList.add(coordJob);
+ updateList.add(new UpdateEntry<CoordJobQuery>(CoordJobQuery.UPDATE_COORD_JOB_STATUS_PENDING_TIME, coordJob));
}
@Override
@@ -142,7 +145,8 @@ public class CoordResumeXCommand extends
coordJob.resetPending();
LOG.warn("Resume children failed so fail coordinator, coordinator job id = " + jobId + ", status = "
+ coordJob.getStatus());
- updateList.add(coordJob);
+ updateList.add(new UpdateEntry<CoordJobQuery>(CoordJobQuery.UPDATE_COORD_JOB_STATUS_PENDING_TIME,
+ coordJob));
}
}
}
@@ -159,7 +163,7 @@ public class CoordResumeXCommand extends
@Override
public void performWrites() throws CommandException {
try {
- jpaService.execute(new BulkUpdateInsertForCoordActionStatusJPAExecutor(updateList, null));
+ BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(null, updateList, null);
}
catch (JPAExecutorException e) {
throw new CommandException(e);
@@ -170,7 +174,7 @@ public class CoordResumeXCommand extends
action.setStatus(CoordinatorActionBean.Status.RUNNING);
action.incrementAndGetPending();
action.setLastModifiedTime(new Date());
- updateList.add(action);
+ updateList.add(new UpdateEntry<CoordActionQuery>(CoordActionQuery.UPDATE_COORD_ACTION_STATUS_PENDING_TIME, action));
}
@Override
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordSubmitXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordSubmitXCommand.java?rev=1520829&r1=1520828&r2=1520829&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordSubmitXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordSubmitXCommand.java Sun Sep 8 02:47:39 2013
@@ -53,7 +53,7 @@ import org.apache.oozie.coord.CoordELEva
import org.apache.oozie.coord.CoordELFunctions;
import org.apache.oozie.coord.CoordinatorJobException;
import org.apache.oozie.coord.TimeUnit;
-import org.apache.oozie.executor.jpa.CoordJobInsertJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.service.DagXLogInfoService;
import org.apache.oozie.service.HadoopAccessorException;
@@ -1094,7 +1094,7 @@ public class CoordSubmitXCommand extends
if (!dryrun) {
coordJob.setLastModifiedTime(new Date());
try {
- jpaService.execute(new CoordJobInsertJPAExecutor(coordJob));
+ CoordJobQueryExecutor.getInstance().insert(coordJob);
}
catch (JPAExecutorException jpaee) {
coordJob.setId(null);
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordSuspendXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordSuspendXCommand.java?rev=1520829&r1=1520828&r2=1520829&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordSuspendXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordSuspendXCommand.java Sun Sep 8 02:47:39 2013
@@ -31,9 +31,12 @@ import org.apache.oozie.command.Precondi
import org.apache.oozie.command.SuspendTransitionXCommand;
import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
import org.apache.oozie.command.wf.SuspendXCommand;
-import org.apache.oozie.executor.jpa.BulkUpdateInsertForCoordActionStatusJPAExecutor;
+import org.apache.oozie.executor.jpa.BatchQueryExecutor;
+import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry;
+import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
import org.apache.oozie.executor.jpa.CoordJobGetActionsRunningJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;
@@ -139,8 +142,8 @@ public class CoordSuspendXCommand extend
coordJob.resetPending();
LOG.debug("Exception happened, fail coordinator job id = " + jobId + ", status = "
+ coordJob.getStatus());
- updateList.add(coordJob);
- }
+ updateList.add(new UpdateEntry<CoordJobQuery>(CoordJobQuery.UPDATE_COORD_JOB_STATUS_PENDING_TIME, coordJob));
+ }
}
}
@@ -159,13 +162,13 @@ public class CoordSuspendXCommand extend
coordJob.setLastModifiedTime(new Date());
coordJob.setSuspendedTime(new Date());
LOG.debug("Suspend coordinator job id = " + jobId + ", status = " + coordJob.getStatus() + ", pending = " + coordJob.isPending());
- updateList.add(coordJob);
+ updateList.add(new UpdateEntry<CoordJobQuery>(CoordJobQuery.UPDATE_COORD_JOB_STATUS_PENDING_TIME, coordJob));
}
@Override
public void performWrites() throws CommandException {
try {
- jpaService.execute(new BulkUpdateInsertForCoordActionStatusJPAExecutor(updateList, null));
+ BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(null, updateList, null);
}
catch (JPAExecutorException jex) {
throw new CommandException(jex);
@@ -176,7 +179,7 @@ public class CoordSuspendXCommand extend
action.setStatus(CoordinatorActionBean.Status.SUSPENDED);
action.incrementAndGetPending();
action.setLastModifiedTime(new Date());
- updateList.add(action);
+ updateList.add(new UpdateEntry<CoordActionQuery>(CoordActionQuery.UPDATE_COORD_ACTION_STATUS_PENDING_TIME, action));
}
@Override
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordUnpauseXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordUnpauseXCommand.java?rev=1520829&r1=1520828&r2=1520829&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordUnpauseXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordUnpauseXCommand.java Sun Sep 8 02:47:39 2013
@@ -25,7 +25,8 @@ import org.apache.oozie.command.Precondi
import org.apache.oozie.command.UnpauseTransitionXCommand;
import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
-import org.apache.oozie.executor.jpa.CoordJobUpdateJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
+import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;
@@ -113,7 +114,7 @@ public class CoordUnpauseXCommand extend
@Override
public void updateJob() throws CommandException {
try {
- jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob));
+ CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB_STATUS, coordJob);
}
catch (JPAExecutorException e) {
throw new CommandException(e);
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionCheckXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionCheckXCommand.java?rev=1520829&r1=1520828&r2=1520829&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionCheckXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionCheckXCommand.java Sun Sep 8 02:47:39 2013
@@ -30,14 +30,16 @@ import org.apache.oozie.action.ActionExe
import org.apache.oozie.action.ActionExecutorException;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.client.WorkflowJob;
-import org.apache.oozie.client.rest.JsonBean;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.PreconditionException;
-import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.executor.jpa.WorkflowActionGetJPAExecutor;
-import org.apache.oozie.executor.jpa.WorkflowActionUpdateJPAExecutor;
+import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor;
+import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor.WorkflowActionQuery;
import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
+import org.apache.oozie.executor.jpa.BatchQueryExecutor;
+import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry;
+import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
import org.apache.oozie.service.ActionCheckerService;
import org.apache.oozie.service.ActionService;
import org.apache.oozie.service.EventHandlerService;
@@ -62,7 +64,7 @@ public class ActionCheckXCommand extends
private WorkflowActionBean wfAction = null;
private JPAService jpaService = null;
private ActionExecutor executor = null;
- private List<JsonBean> updateList = new ArrayList<JsonBean>();
+ private List<UpdateEntry> updateList = new ArrayList<UpdateEntry>();
private boolean generateEvent = false;
public ActionCheckXCommand(String actionId) {
@@ -140,12 +142,13 @@ public class ActionCheckXCommand extends
@Override
protected void verifyPrecondition() throws CommandException, PreconditionException {
if (!wfAction.isPending() || wfAction.getStatus() != WorkflowActionBean.Status.RUNNING) {
- throw new PreconditionException(ErrorCode.E0815, wfAction.getPending(), wfAction.getStatusStr());
+ throw new PreconditionException(ErrorCode.E0815, wfAction.isPending(), wfAction.getStatusStr());
}
if (wfJob.getStatus() != WorkflowJob.Status.RUNNING) {
wfAction.setLastCheckTime(new Date());
try {
- jpaService.execute(new WorkflowActionUpdateJPAExecutor(wfAction));
+ WorkflowActionQueryExecutor.getInstance().executeUpdate(
+ WorkflowActionQuery.UPDATE_ACTION_FOR_LAST_CHECKED_TIME, wfAction);
}
catch (JPAExecutorException e) {
throw new CommandException(e);
@@ -192,9 +195,10 @@ public class ActionCheckXCommand extends
}
}
wfAction.setLastCheckTime(new Date());
- updateList.add(wfAction);
+ updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_CHECK, wfAction));
wfJob.setLastModifiedTime(new Date());
- updateList.add(wfJob);
+ updateList.add(new UpdateEntry<WorkflowJobQuery>(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MODIFIED,
+ wfJob));
}
catch (ActionExecutorException ex) {
LOG.warn("Exception while executing check(). Error Code [{0}], Message[{1}]", ex.getErrorCode(), ex
@@ -220,14 +224,15 @@ public class ActionCheckXCommand extends
break;
}
wfAction.setLastCheckTime(new Date());
- updateList = new ArrayList<JsonBean>();
- updateList.add(wfAction);
+ updateList = new ArrayList<UpdateEntry>();
+ updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_CHECK, wfAction));
wfJob.setLastModifiedTime(new Date());
- updateList.add(wfJob);
+ updateList.add(new UpdateEntry<WorkflowJobQuery>(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MODIFIED,
+ wfJob));
}
finally {
try {
- jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, null));
+ BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(null, updateList, null);
if (generateEvent && EventHandlerService.isEnabled()) {
generateEvent(wfAction, wfJob.getUser());
}
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionEndXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionEndXCommand.java?rev=1520829&r1=1520828&r2=1520829&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionEndXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionEndXCommand.java Sun Sep 8 02:47:39 2013
@@ -39,10 +39,13 @@ import org.apache.oozie.client.SLAEvent.
import org.apache.oozie.client.rest.JsonBean;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.PreconditionException;
-import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
+import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry;
+import org.apache.oozie.executor.jpa.BatchQueryExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.executor.jpa.WorkflowActionGetJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
+import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor.WorkflowActionQuery;
+import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
import org.apache.oozie.service.ActionService;
import org.apache.oozie.service.EventHandlerService;
import org.apache.oozie.service.JPAService;
@@ -65,7 +68,7 @@ public class ActionEndXCommand extends A
private WorkflowActionBean wfAction = null;
private JPAService jpaService = null;
private ActionExecutor executor = null;
- private List<JsonBean> updateList = new ArrayList<JsonBean>();
+ private List<UpdateEntry> updateList = new ArrayList<UpdateEntry>();
private List<JsonBean> insertList = new ArrayList<JsonBean>();
public ActionEndXCommand(String actionId, String type) {
@@ -125,7 +128,7 @@ public class ActionEndXCommand extends A
}
}
else {
- throw new PreconditionException(ErrorCode.E0812, wfAction.getPending(), wfAction.getStatusStr());
+ throw new PreconditionException(ErrorCode.E0812, wfAction.isPending(), wfAction.getStatusStr());
}
executor = Services.get().get(ActionService.class).getExecutor(wfAction.getType());
@@ -211,16 +214,16 @@ public class ActionEndXCommand extends A
if (!shouldHandleUserRetry || !handleUserRetry(wfAction)) {
SLAEventBean slaEvent = SLADbXOperations.createStatusEvent(wfAction.getSlaXml(), wfAction.getId(), slaStatus, SlaAppType.WORKFLOW_ACTION);
LOG.debug("Queuing commands for action=" + actionId + ", status=" + wfAction.getStatus()
- + ", Set pending=" + wfAction.getPending());
+ + ", Set pending=" + wfAction.isPending());
if(slaEvent != null) {
insertList.add(slaEvent);
}
queue(new SignalXCommand(jobId, actionId));
}
}
- updateList.add(wfAction);
+ updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_END,wfAction));
wfJob.setLastModifiedTime(new Date());
- updateList.add(wfJob);
+ updateList.add(new UpdateEntry<WorkflowJobQuery>(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MODIFIED, wfJob));
}
catch (ActionExecutorException ex) {
LOG.warn(
@@ -255,13 +258,13 @@ public class ActionEndXCommand extends A
DagELFunctions.setActionInfo(wfInstance, wfAction);
wfJob.setWorkflowInstance(wfInstance);
- updateList.add(wfAction);
+ updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_END,wfAction));
wfJob.setLastModifiedTime(new Date());
- updateList.add(wfJob);
+ updateList.add(new UpdateEntry<WorkflowJobQuery>(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MODIFIED, wfJob));
}
finally {
try {
- jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, insertList));
+ BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, updateList, null);
if (!(executor instanceof ControlNodeActionExecutor) && EventHandlerService.isEnabled()) {
generateEvent(wfAction, wfJob.getUser());
}
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionKillXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionKillXCommand.java?rev=1520829&r1=1520828&r2=1520829&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionKillXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionKillXCommand.java Sun Sep 8 02:47:39 2013
@@ -31,10 +31,13 @@ import org.apache.oozie.client.SLAEvent.
import org.apache.oozie.client.rest.JsonBean;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.PreconditionException;
-import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
+import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry;
+import org.apache.oozie.executor.jpa.BatchQueryExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.executor.jpa.WorkflowActionGetJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
+import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor.WorkflowActionQuery;
+import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
import org.apache.oozie.action.ActionExecutor;
import org.apache.oozie.action.ActionExecutorException;
import org.apache.oozie.action.control.ControlNodeActionExecutor;
@@ -58,7 +61,7 @@ public class ActionKillXCommand extends
private WorkflowJobBean wfJob;
private WorkflowActionBean wfAction;
private JPAService jpaService = null;
- private List<JsonBean> updateList = new ArrayList<JsonBean>();
+ private List<UpdateEntry> updateList = new ArrayList<UpdateEntry>();
private List<JsonBean> insertList = new ArrayList<JsonBean>();
public ActionKillXCommand(String actionId, String type) {
@@ -137,9 +140,9 @@ public class ActionKillXCommand extends
wfAction.setStatus(WorkflowActionBean.Status.KILLED);
wfAction.setEndTime(new Date());
- updateList.add(wfAction);
+ updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_END, wfAction));
wfJob.setLastModifiedTime(new Date());
- updateList.add(wfJob);
+ updateList.add(new UpdateEntry<WorkflowJobQuery>(WorkflowJobQuery.UPDATE_WORKFLOW_MODTIME, wfJob));
// Add SLA status event (KILLED) for WF_ACTION
SLAEventBean slaEvent = SLADbXOperations.createStatusEvent(wfAction.getSlaXml(), wfAction.getId(), Status.KILLED,
SlaAppType.WORKFLOW_ACTION);
@@ -156,9 +159,9 @@ public class ActionKillXCommand extends
wfAction.setEndTime(new Date());
wfJob.setStatus(WorkflowJobBean.Status.KILLED);
- updateList.add(wfAction);
+ updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_END, wfAction));
wfJob.setLastModifiedTime(new Date());
- updateList.add(wfJob);
+ updateList.add(new UpdateEntry<WorkflowJobQuery>(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_MODTIME, wfJob));
// What will happen to WF and COORD_ACTION, NOTIFICATION?
SLAEventBean slaEvent = SLADbXOperations.createStatusEvent(wfAction.getSlaXml(), wfAction.getId(), Status.FAILED,
SlaAppType.WORKFLOW_ACTION);
@@ -170,7 +173,7 @@ public class ActionKillXCommand extends
}
finally {
try {
- jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, insertList));
+ BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, updateList, null);
if (!(executor instanceof ControlNodeActionExecutor) && EventHandlerService.isEnabled()) {
generateEvent(wfAction, wfJob.getUser());
}
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java?rev=1520829&r1=1520828&r2=1520829&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java Sun Sep 8 02:47:39 2013
@@ -40,10 +40,13 @@ import org.apache.oozie.client.SLAEvent.
import org.apache.oozie.client.rest.JsonBean;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.PreconditionException;
-import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
+import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry;
+import org.apache.oozie.executor.jpa.BatchQueryExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.executor.jpa.WorkflowActionGetJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
+import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor.WorkflowActionQuery;
+import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
import org.apache.oozie.service.ActionService;
import org.apache.oozie.service.EventHandlerService;
import org.apache.oozie.service.JPAService;
@@ -70,7 +73,7 @@ public class ActionStartXCommand extends
private WorkflowActionBean wfAction = null;
private JPAService jpaService = null;
private ActionExecutor executor = null;
- private List<JsonBean> updateList = new ArrayList<JsonBean>();
+ private List<UpdateEntry> updateList = new ArrayList<UpdateEntry>();
private List<JsonBean> insertList = new ArrayList<JsonBean>();
public ActionStartXCommand(String actionId, String type) {
@@ -127,7 +130,7 @@ public class ActionStartXCommand extends
}
}
else {
- throw new PreconditionException(ErrorCode.E0816, wfAction.getPending(), wfAction.getStatusStr());
+ throw new PreconditionException(ErrorCode.E0816, wfAction.isPending(), wfAction.getStatusStr());
}
executor = Services.get().get(ActionService.class).getExecutor(wfAction.getType());
@@ -242,9 +245,9 @@ public class ActionStartXCommand extends
LOG.warn(XLog.STD, "[***" + wfAction.getId() + "***]" + "Action status=" + wfAction.getStatusStr());
- updateList.add(wfAction);
+ updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_START, wfAction));
wfJob.setLastModifiedTime(new Date());
- updateList.add(wfJob);
+ updateList.add(new UpdateEntry<WorkflowJobQuery>(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MODIFIED, wfJob));
// Add SLA status event (STARTED) for WF_ACTION
SLAEventBean slaEvent = SLADbXOperations.createStatusEvent(wfAction.getSlaXml(), wfAction.getId(), Status.STARTED,
SlaAppType.WORKFLOW_ACTION);
@@ -295,13 +298,13 @@ public class ActionStartXCommand extends
}
break;
}
- updateList.add(wfAction);
+ updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_START, wfAction));
wfJob.setLastModifiedTime(new Date());
- updateList.add(wfJob);
+ updateList.add(new UpdateEntry<WorkflowJobQuery>(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MODIFIED, wfJob));
}
finally {
try {
- jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, insertList));
+ BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, updateList, null);
if (!(executor instanceof ControlNodeActionExecutor) && EventHandlerService.isEnabled()) {
generateEvent(wfAction, wfJob.getUser());
}
@@ -319,9 +322,9 @@ public class ActionStartXCommand extends
private void handleError(ActionExecutorContext context, WorkflowJobBean workflow, WorkflowActionBean action)
throws CommandException {
failJob(context);
- updateList.add(wfAction);
+ updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_START, wfAction));
wfJob.setLastModifiedTime(new Date());
- updateList.add(wfJob);
+ updateList.add(new UpdateEntry<WorkflowJobQuery>(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MODIFIED, wfJob));
SLAEventBean slaEvent1 = SLADbXOperations.createStatusEvent(action.getSlaXml(), action.getId(),
Status.FAILED, SlaAppType.WORKFLOW_ACTION);
if(slaEvent1 != null) {
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/KillXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/KillXCommand.java?rev=1520829&r1=1520828&r2=1520829&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/KillXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/KillXCommand.java Sun Sep 8 02:47:39 2013
@@ -29,10 +29,13 @@ import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.XException;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.PreconditionException;
-import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
+import org.apache.oozie.executor.jpa.BatchQueryExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor.WorkflowActionQuery;
import org.apache.oozie.executor.jpa.WorkflowActionsGetForJobJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
+import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
+import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry;
import org.apache.oozie.service.ActionService;
import org.apache.oozie.service.EventHandlerService;
import org.apache.oozie.service.JPAService;
@@ -61,7 +64,7 @@ public class KillXCommand extends Workfl
private List<WorkflowActionBean> actionList;
private ActionService actionService;
private JPAService jpaService = null;
- private List<JsonBean> updateList = new ArrayList<JsonBean>();
+ private List<UpdateEntry> updateList = new ArrayList<UpdateEntry>();
private List<JsonBean> insertList = new ArrayList<JsonBean>();
public KillXCommand(String wfId) {
@@ -141,8 +144,7 @@ public class KillXCommand extends Workfl
|| action.getStatus() == WorkflowActionBean.Status.DONE) {
action.setPending();
action.setStatus(WorkflowActionBean.Status.KILLED);
-
- updateList.add(action);
+ updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_STATUS_PENDING, action));
queue(new ActionKillXCommand(action.getId(), action.getType()));
}
@@ -160,7 +162,7 @@ public class KillXCommand extends Workfl
if(slaEvent != null) {
insertList.add(slaEvent);
}
- updateList.add(action);
+ updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_STATUS_PENDING, action));
if (EventHandlerService.isEnabled()
&& !(actionService.getExecutor(action.getType()) instanceof ControlNodeActionExecutor)) {
generateEvent(action, wfJob.getUser());
@@ -168,8 +170,8 @@ public class KillXCommand extends Workfl
}
}
wfJob.setLastModifiedTime(new Date());
- updateList.add(wfJob);
- jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, insertList));
+ updateList.add(new UpdateEntry<WorkflowJobQuery>(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MOD_END, wfJob));
+ BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, updateList, null);
if (EventHandlerService.isEnabled()) {
generateEvent(wfJob);
}
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ReRunXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ReRunXCommand.java?rev=1520829&r1=1520828&r2=1520829&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ReRunXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ReRunXCommand.java Sun Sep 8 02:47:39 2013
@@ -43,10 +43,12 @@ import org.apache.oozie.client.WorkflowJ
import org.apache.oozie.client.rest.JsonBean;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.PreconditionException;
-import org.apache.oozie.executor.jpa.BulkUpdateDeleteJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.executor.jpa.WorkflowActionsGetForJobJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
+import org.apache.oozie.executor.jpa.BatchQueryExecutor;
+import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry;
+import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
import org.apache.oozie.service.DagXLogInfoService;
import org.apache.oozie.service.HadoopAccessorException;
import org.apache.oozie.service.HadoopAccessorService;
@@ -87,7 +89,7 @@ public class ReRunXCommand extends Workf
private WorkflowJobBean wfBean;
private List<WorkflowActionBean> actions;
private JPAService jpaService;
- private List<JsonBean> updateList = new ArrayList<JsonBean>();
+ private List<UpdateEntry> updateList = new ArrayList<UpdateEntry>();
private List<JsonBean> deleteList = new ArrayList<JsonBean>();
private static final Set<String> DISALLOWED_DEFAULT_PROPERTIES = new HashSet<String>();
@@ -220,9 +222,9 @@ public class ReRunXCommand extends Workf
try {
wfBean.setLastModifiedTime(new Date());
- updateList.add(wfBean);
+ updateList.add(new UpdateEntry<WorkflowJobQuery>(WorkflowJobQuery.UPDATE_WORKFLOW_RERUN, wfBean));
// call JPAExecutor to do the bulk writes
- jpaService.execute(new BulkUpdateDeleteJPAExecutor(updateList, deleteList, true));
+ BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(null, updateList, deleteList);
}
catch (JPAExecutorException je) {
throw new CommandException(je);
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ResumeXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ResumeXCommand.java?rev=1520829&r1=1520828&r2=1520829&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ResumeXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ResumeXCommand.java Sun Sep 8 02:47:39 2013
@@ -32,14 +32,16 @@ import org.apache.oozie.action.control.J
import org.apache.oozie.action.control.KillActionExecutor;
import org.apache.oozie.action.control.StartActionExecutor;
import org.apache.oozie.client.WorkflowJob;
-import org.apache.oozie.client.rest.JsonBean;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.PreconditionException;
import org.apache.oozie.command.wf.ActionXCommand.ActionExecutorContext;
-import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
+import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry;
+import org.apache.oozie.executor.jpa.BatchQueryExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor.WorkflowActionQuery;
import org.apache.oozie.executor.jpa.WorkflowJobGetActionsJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
+import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
import org.apache.oozie.service.EventHandlerService;
import org.apache.oozie.service.HadoopAccessorException;
import org.apache.oozie.service.JPAService;
@@ -56,7 +58,7 @@ public class ResumeXCommand extends Work
private String id;
private JPAService jpaService = null;
private WorkflowJobBean workflow = null;
- private List<JsonBean> updateList = new ArrayList<JsonBean>();
+ private List<UpdateEntry> updateList = new ArrayList<UpdateEntry>();
public ResumeXCommand(String id) {
super("resume", "resume", 1);
@@ -81,7 +83,8 @@ public class ResumeXCommand extends Work
// START_MANUAL or END_RETRY or END_MANUAL
if (action.isRetryOrManual()) {
action.setPendingOnly();
- updateList.add(action);
+ updateList.add(new UpdateEntry<WorkflowActionQuery>(
+ WorkflowActionQuery.UPDATE_ACTION_STATUS_PENDING, action));
}
if (action.isPending()) {
@@ -127,8 +130,9 @@ public class ResumeXCommand extends Work
}
workflow.setLastModifiedTime(new Date());
- updateList.add(workflow);
- jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, null));
+ updateList.add(new UpdateEntry<WorkflowJobQuery>(
+ WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MODIFIED, workflow));
+ BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(null, updateList, null);
if (EventHandlerService.isEnabled()) {
generateEvent(workflow);
}
@@ -189,7 +193,8 @@ public class ResumeXCommand extends Work
@Override
protected void verifyPrecondition() throws CommandException, PreconditionException {
if (workflow.getStatus() != WorkflowJob.Status.SUSPENDED) {
- throw new PreconditionException(ErrorCode.E1100, "workflow's status is " + workflow.getStatusStr() + " is not SUSPENDED");
+ throw new PreconditionException(ErrorCode.E1100, "workflow's status is " + workflow.getStatusStr()
+ + " is not SUSPENDED");
}
}
}
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java?rev=1520829&r1=1520828&r2=1520829&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java Sun Sep 8 02:47:39 2013
@@ -31,10 +31,13 @@ import org.apache.oozie.XException;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.PreconditionException;
import org.apache.oozie.command.wf.ActionXCommand.ActionExecutorContext;
-import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
+import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry;
+import org.apache.oozie.executor.jpa.BatchQueryExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.executor.jpa.WorkflowActionGetJPAExecutor;
+import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor.WorkflowActionQuery;
import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
+import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
import org.apache.oozie.service.ELService;
import org.apache.oozie.service.EventHandlerService;
import org.apache.oozie.service.JPAService;
@@ -70,7 +73,7 @@ public class SignalXCommand extends Work
private String actionId;
private WorkflowJobBean wfJob;
private WorkflowActionBean wfAction;
- private List<JsonBean> updateList = new ArrayList<JsonBean>();
+ private List<UpdateEntry> updateList = new ArrayList<UpdateEntry>();
private List<JsonBean> insertList = new ArrayList<JsonBean>();
private boolean generateEvent = false;
private String wfJobErrorCode;
@@ -188,7 +191,8 @@ public class SignalXCommand extends Work
wfAction.setTransition(workflowInstance.getTransition(wfAction.getName()));
queue(new NotificationXCommand(wfJob, wfAction));
}
- updateList.add(wfAction);
+ updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_PENDING_TRANS,
+ wfAction));
WorkflowInstance.Status endStatus = workflowInstance.getStatus();
if (endStatus != initialStatus) {
generateEvent = true;
@@ -204,7 +208,8 @@ public class SignalXCommand extends Work
actionToKill.setPending();
actionToKill.setStatus(WorkflowActionBean.Status.KILLED);
- updateList.add(actionToKill);
+ updateList.add(new UpdateEntry<WorkflowActionQuery>(
+ WorkflowActionQuery.UPDATE_ACTION_STATUS_PENDING, actionToKill));
queue(new ActionKillXCommand(actionToKill.getId(), actionToKill.getType()));
}
@@ -223,7 +228,8 @@ public class SignalXCommand extends Work
if(slaEvent != null) {
insertList.add(slaEvent);
}
- updateList.add(actionToFail);
+ updateList.add(new UpdateEntry<WorkflowActionQuery>(
+ WorkflowActionQuery.UPDATE_ACTION_STATUS_PENDING, actionToFail));
}
}
catch (JPAExecutorException je) {
@@ -268,15 +274,18 @@ public class SignalXCommand extends Work
try {
String tmpNodeConf = nodeDef.getConf();
String actionConf = context.getELEvaluator().evaluate(tmpNodeConf, String.class);
- LOG.debug("Try to resolve KillNode message for jobid [{0}], actionId [{1}], before resolve [{2}], after resolve [{3}]",
- jobId, actionId, tmpNodeConf, actionConf);
+ LOG.debug(
+ "Try to resolve KillNode message for jobid [{0}], actionId [{1}], before resolve [{2}], " +
+ "after resolve [{3}]",
+ jobId, actionId, tmpNodeConf, actionConf);
if (wfAction.getErrorCode() != null) {
wfAction.setErrorInfo(wfAction.getErrorCode(), actionConf);
}
else {
wfAction.setErrorInfo(ErrorCode.E0729.toString(), actionConf);
}
- updateList.add(wfAction);
+ updateList.add(new UpdateEntry<WorkflowActionQuery>(
+ WorkflowActionQuery.UPDATE_ACTION_PENDING_TRANS_ERROR, wfAction));
}
catch (Exception ex) {
LOG.warn("Exception in SignalXCommand ", ex.getMessage(), ex);
@@ -301,7 +310,7 @@ public class SignalXCommand extends Work
oldAction = jpaService.execute(new WorkflowActionGetJPAExecutor(newAction.getId()));
oldAction.setPending();
- updateList.add(oldAction);
+ updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_PENDING, oldAction));
queue(new SignalXCommand(jobId, oldAction.getId()));
}
@@ -320,7 +329,8 @@ public class SignalXCommand extends Work
.getDefinition(), wfJob.getConf());
newAction.setSlaXml(actionSlaXml);
insertList.add(newAction);
- LOG.debug("SignalXCommand: Name: "+ newAction.getName() + ", Id: " +newAction.getId() + ", Authcode:" + newAction.getCred());
+ LOG.debug("SignalXCommand: Name: " + newAction.getName() + ", Id: " + newAction.getId()
+ + ", Authcode:" + newAction.getCred());
queue(new ActionStartXCommand(newAction.getId(), newAction.getType()));
}
}
@@ -332,9 +342,10 @@ public class SignalXCommand extends Work
try {
wfJob.setLastModifiedTime(new Date());
- updateList.add(wfJob);
+ updateList.add(new UpdateEntry<WorkflowJobQuery>(
+ WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MOD_START_END, wfJob));
// call JPAExecutor to do the bulk writes
- jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, insertList));
+ BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, updateList, null);
if (generateEvent && EventHandlerService.isEnabled()) {
generateEvent(wfJob, wfJobErrorCode, wfJobErrorMsg);
}
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SubmitXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SubmitXCommand.java?rev=1520829&r1=1520828&r2=1520829&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SubmitXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SubmitXCommand.java Sun Sep 8 02:47:39 2013
@@ -42,7 +42,7 @@ import org.apache.oozie.util.ParamChecke
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XmlUtils;
import org.apache.oozie.command.CommandException;
-import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
+import org.apache.oozie.executor.jpa.BatchQueryExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.service.ELService;
import org.apache.oozie.store.StoreException;
@@ -225,7 +225,7 @@ public class SubmitXCommand extends Work
JPAService jpaService = Services.get().get(JPAService.class);
if (jpaService != null) {
try {
- jpaService.execute(new BulkUpdateInsertJPAExecutor(null, insertList));
+ BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, null, null);
}
catch (JPAExecutorException je) {
throw new CommandException(je);
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SuspendXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SuspendXCommand.java?rev=1520829&r1=1520828&r2=1520829&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SuspendXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SuspendXCommand.java Sun Sep 8 02:47:39 2013
@@ -25,13 +25,15 @@ import org.apache.oozie.ErrorCode;
import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.client.WorkflowJob;
-import org.apache.oozie.client.rest.JsonBean;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.PreconditionException;
-import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
+import org.apache.oozie.executor.jpa.BatchQueryExecutor;
+import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry;
import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor.WorkflowActionQuery;
import org.apache.oozie.executor.jpa.WorkflowActionRetryManualGetJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
+import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
import org.apache.oozie.service.EventHandlerService;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;
@@ -46,7 +48,7 @@ public class SuspendXCommand extends Wor
private final String wfid;
private WorkflowJobBean wfJobBean;
private JPAService jpaService;
- private List<JsonBean> updateList = new ArrayList<JsonBean>();
+ private List<UpdateEntry> updateList = new ArrayList<UpdateEntry>();
public SuspendXCommand(String id) {
super("suspend", "suspend", 1);
@@ -59,8 +61,9 @@ public class SuspendXCommand extends Wor
try {
suspendJob(this.jpaService, this.wfJobBean, this.wfid, null, updateList);
this.wfJobBean.setLastModifiedTime(new Date());
- updateList.add(this.wfJobBean);
- jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, null));
+ updateList.add(new UpdateEntry<WorkflowJobQuery>(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MODIFIED,
+ this.wfJobBean));
+ BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(null, updateList, null);
queue(new NotificationXCommand(this.wfJobBean));
}
catch (WorkflowException e) {
@@ -87,7 +90,7 @@ public class SuspendXCommand extends Wor
* @throws CommandException thrown if unable set pending false for actions
*/
public static void suspendJob(JPAService jpaService, WorkflowJobBean workflow, String id,
- String actionId, List<JsonBean> updateList) throws WorkflowException, CommandException {
+ String actionId, List<UpdateEntry> updateList) throws WorkflowException, CommandException {
if (workflow.getStatus() == WorkflowJob.Status.RUNNING) {
workflow.getWorkflowInstance().suspend();
WorkflowInstance wfInstance = workflow.getWorkflowInstance();
@@ -112,7 +115,7 @@ public class SuspendXCommand extends Wor
* @throws CommandException thrown if failed to update workflow action
*/
private static void setPendingFalseForActions(JPAService jpaService, String id, String actionId,
- List<JsonBean> updateList) throws CommandException {
+ List<UpdateEntry> updateList) throws CommandException {
List<WorkflowActionBean> actions;
try {
actions = jpaService.execute(new WorkflowActionRetryManualGetJPAExecutor(id));
@@ -128,7 +131,8 @@ public class SuspendXCommand extends Wor
if (updateList != null) { // will be null when suspendJob
// invoked statically via
// handleNonTransient()
- updateList.add(action);
+ updateList.add(new UpdateEntry<WorkflowActionQuery>(
+ WorkflowActionQuery.UPDATE_ACTION_STATUS_PENDING, action));
}
}
}
Added: oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BatchQueryExecutor.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BatchQueryExecutor.java?rev=1520829&view=auto
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BatchQueryExecutor.java (added)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BatchQueryExecutor.java Sun Sep 8 02:47:39 2013
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.executor.jpa;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import javax.persistence.EntityManager;
+import javax.persistence.Query;
+
+import org.apache.oozie.BundleActionBean;
+import org.apache.oozie.BundleJobBean;
+import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.CoordinatorJobBean;
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.WorkflowActionBean;
+import org.apache.oozie.WorkflowJobBean;
+import org.apache.oozie.client.rest.JsonBean;
+import org.apache.oozie.executor.jpa.BundleActionQueryExecutor.BundleActionQuery;
+import org.apache.oozie.executor.jpa.BundleJobQueryExecutor.BundleJobQuery;
+import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
+import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
+import org.apache.oozie.executor.jpa.SLARegistrationQueryExecutor.SLARegQuery;
+import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor.SLASummaryQuery;
+import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor.WorkflowActionQuery;
+import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
+import org.apache.oozie.service.JPAService;
+import org.apache.oozie.service.JPAService.QueryEntry;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.sla.SLARegistrationBean;
+import org.apache.oozie.sla.SLASummaryBean;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Query Executor that provides API to run multiple update/insert queries in one
+ * transaction. This guarantees entire change to be rolled back when one of
+ * queries fails.
+ */
+public class BatchQueryExecutor {
+
+ private static BatchQueryExecutor instance = new BatchQueryExecutor();
+ private static JPAService jpaService;
+
+ public static class UpdateEntry<E extends Enum<E>> {
+ E namedQuery;
+ JsonBean bean;
+
+ public UpdateEntry(E namedQuery, JsonBean bean) {
+ this.bean = bean;
+ this.namedQuery = namedQuery;
+ }
+
+ public JsonBean getBean() {
+ return this.bean;
+ }
+
+ public E getQueryName() {
+ return this.namedQuery;
+ }
+ }
+
+ private BatchQueryExecutor() {
+ Services services = Services.get();
+ if (services != null) {
+ jpaService = services.get(JPAService.class);
+ }
+ }
+
+ public static BatchQueryExecutor getInstance() {
+ if (instance == null) {
+ instance = new BatchQueryExecutor();
+ }
+ return BatchQueryExecutor.instance;
+ }
+
+ @SuppressWarnings("rawtypes")
+ public void executeBatchInsertUpdateDelete(Collection<JsonBean> insertList, Collection<UpdateEntry> updateList,
+ Collection<JsonBean> deleteList) throws JPAExecutorException {
+ List<QueryEntry> queryList = new ArrayList<QueryEntry>();
+ EntityManager em = jpaService.getEntityManager();
+
+ if (updateList != null) {
+ for (UpdateEntry entry : updateList) {
+ Query query = null;
+ JsonBean bean = entry.getBean();
+ if (bean instanceof WorkflowJobBean) {
+ query = WorkflowJobQueryExecutor.getInstance().getUpdateQuery(
+ (WorkflowJobQuery) entry.getQueryName(), (WorkflowJobBean) entry.getBean(), em);
+ }
+ else if (bean instanceof WorkflowActionBean) {
+ query = WorkflowActionQueryExecutor.getInstance().getUpdateQuery(
+ (WorkflowActionQuery) entry.getQueryName(), (WorkflowActionBean) entry.getBean(), em);
+ }
+ else if (bean instanceof CoordinatorJobBean) {
+ query = CoordJobQueryExecutor.getInstance().getUpdateQuery((CoordJobQuery) entry.getQueryName(),
+ (CoordinatorJobBean) entry.getBean(), em);
+ }
+ else if (bean instanceof CoordinatorActionBean) {
+ query = CoordActionQueryExecutor.getInstance().getUpdateQuery(
+ (CoordActionQuery) entry.getQueryName(), (CoordinatorActionBean) entry.getBean(), em);
+ }
+ else if (bean instanceof BundleJobBean) {
+ query = BundleJobQueryExecutor.getInstance().getUpdateQuery((BundleJobQuery) entry.getQueryName(),
+ (BundleJobBean) entry.getBean(), em);
+ }
+ else if (bean instanceof BundleActionBean) {
+ query = BundleActionQueryExecutor.getInstance().getUpdateQuery(
+ (BundleActionQuery) entry.getQueryName(), (BundleActionBean) entry.getBean(), em);
+ }
+ else if (bean instanceof SLARegistrationBean) {
+ query = SLARegistrationQueryExecutor.getInstance().getUpdateQuery(
+ (SLARegQuery) entry.getQueryName(), (SLARegistrationBean) entry.getBean(), em);
+ }
+ else if (bean instanceof SLASummaryBean) {
+ query = SLASummaryQueryExecutor.getInstance().getUpdateQuery(
+ (SLASummaryQuery) entry.getQueryName(), (SLASummaryBean) entry.getBean(), em);
+ }
+ else {
+ throw new JPAExecutorException(ErrorCode.E0603, "BatchQueryExecutor faield to construct a query");
+ }
+ queryList.add(new QueryEntry(entry.getQueryName(), query));
+ }
+ }
+ jpaService.executeBatchInsertUpdateDelete(insertList, queryList, deleteList, em);
+ }
+
+ @VisibleForTesting
+ public static void destroy() {
+ if (instance != null) {
+ jpaService = null;
+ instance = null;
+ }
+ }
+}
Added: oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BundleActionQueryExecutor.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BundleActionQueryExecutor.java?rev=1520829&view=auto
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BundleActionQueryExecutor.java (added)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BundleActionQueryExecutor.java Sun Sep 8 02:47:39 2013
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.executor.jpa;
+
+import java.util.List;
+import javax.persistence.EntityManager;
+import javax.persistence.Query;
+import org.apache.oozie.BundleActionBean;
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.service.JPAService;
+import org.apache.oozie.service.Services;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Query Executor that provides API to run query for Bundle Action
+ */
+public class BundleActionQueryExecutor extends
+ QueryExecutor<BundleActionBean, BundleActionQueryExecutor.BundleActionQuery> {
+
+ public enum BundleActionQuery {
+ UPDATE_BUNDLE_ACTION_PENDING_MODTIME,
+ UPDATE_BUNDLE_ACTION_STATUS_PENDING_MODTIME,
+ UPDATE_BUNDLE_ACTION_STATUS_PENDING_MODTIME_COORDID,
+ GET_BUNDLE_ACTION
+ };
+
+ private static BundleActionQueryExecutor instance = new BundleActionQueryExecutor();
+ private static JPAService jpaService;
+
+ private BundleActionQueryExecutor() {
+ Services services = Services.get();
+ if (services != null) {
+ jpaService = services.get(JPAService.class);
+ }
+ }
+
+ public static QueryExecutor<BundleActionBean, BundleActionQueryExecutor.BundleActionQuery> getInstance() {
+ if (instance == null) {
+ // It will not be null in normal execution. Required for testcase as
+ // they reinstantiate JPAService everytime
+ instance = new BundleActionQueryExecutor();
+ }
+ return BundleActionQueryExecutor.instance;
+ }
+
+ @Override
+ public Query getUpdateQuery(BundleActionQuery namedQuery, BundleActionBean baBean, EntityManager em)
+ throws JPAExecutorException {
+
+ Query query = em.createNamedQuery(namedQuery.name());
+ switch (namedQuery) {
+ case UPDATE_BUNDLE_ACTION_PENDING_MODTIME:
+ query.setParameter("lastModifiedTimestamp", baBean.getLastModifiedTimestamp());
+ query.setParameter("pending", baBean.getPending());
+ query.setParameter("bundleActionId", baBean.getBundleActionId());
+ break;
+ case UPDATE_BUNDLE_ACTION_STATUS_PENDING_MODTIME:
+ query.setParameter("status", baBean.getStatusStr());
+ query.setParameter("lastModifiedTimestamp", baBean.getLastModifiedTimestamp());
+ query.setParameter("pending", baBean.getPending());
+ query.setParameter("bundleActionId", baBean.getBundleActionId());
+ break;
+ case UPDATE_BUNDLE_ACTION_STATUS_PENDING_MODTIME_COORDID:
+ query.setParameter("status", baBean.getStatusStr());
+ query.setParameter("lastModifiedTimestamp", baBean.getLastModifiedTimestamp());
+ query.setParameter("pending", baBean.getPending());
+ query.setParameter("coordId", baBean.getCoordId());
+ query.setParameter("bundleActionId", baBean.getBundleActionId());
+ break;
+ default:
+ throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot set parameters for "
+ + namedQuery.name());
+ }
+ return query;
+ }
+
+ @Override
+ public Query getSelectQuery(BundleActionQuery namedQuery, EntityManager em, Object... parameters)
+ throws JPAExecutorException {
+ Query query = em.createNamedQuery(namedQuery.name());
+ switch (namedQuery) {
+ case GET_BUNDLE_ACTION:
+ query.setParameter("bundleActionId", parameters[0]);
+ break;
+ default:
+ throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot set parameters for "
+ + namedQuery.name());
+ }
+ return query;
+ }
+
+ @Override
+ public int executeUpdate(BundleActionQuery namedQuery, BundleActionBean jobBean) throws JPAExecutorException {
+ EntityManager em = jpaService.getEntityManager();
+ Query query = getUpdateQuery(namedQuery, jobBean, em);
+ int ret = jpaService.executeUpdate(namedQuery.name(), query, em);
+ return ret;
+ }
+
+ @Override
+ public BundleActionBean get(BundleActionQuery namedQuery, Object... parameters) throws JPAExecutorException {
+ EntityManager em = jpaService.getEntityManager();
+ Query query = getSelectQuery(namedQuery, em, parameters);
+ BundleActionBean bean = (BundleActionBean) jpaService.executeGet(namedQuery.name(), query, em);
+ if (bean == null) {
+ throw new JPAExecutorException(ErrorCode.E0604, query.toString());
+ }
+ return bean;
+ }
+
+ @Override
+ public List<BundleActionBean> getList(BundleActionQuery namedQuery, Object... parameters)
+ throws JPAExecutorException {
+ // TODO
+ return null;
+ }
+
+ @VisibleForTesting
+ public static void destroy() {
+ if (instance != null) {
+ jpaService = null;
+ instance = null;
+ }
+ }
+}