You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by vi...@apache.org on 2012/08/22 21:32:05 UTC
svn commit: r1376204 [1/4] - in /incubator/oozie/trunk: ./
core/src/main/java/org/apache/oozie/
core/src/main/java/org/apache/oozie/command/
core/src/main/java/org/apache/oozie/command/bundle/
core/src/main/java/org/apache/oozie/command/coord/ core/src...
Author: virag
Date: Wed Aug 22 19:32:02 2012
New Revision: 1376204
URL: http://svn.apache.org/viewvc?rev=1376204&view=rev
Log:
OOZIE-914 Make sure all commands do their JPA writes within a single JPA executor (mona via virag)
Added:
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BulkDeleteForPurgeJPAExecutor.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BulkUpdateDeleteJPAExecutor.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BulkUpdateInsertForCoordActionStartJPAExecutor.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BulkUpdateInsertForCoordActionStatusJPAExecutor.java
incubator/oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestBulkDeleteForPurgeJPAExecutor.java
incubator/oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestBulkUpdateDeleteJPAExecutor.java
incubator/oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestBulkUpdateInsertForCoordActionStartJPAExecutor.java
incubator/oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestBulkUpdateInsertForCoordActionStatusJPAExecutor.java
Modified:
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/BundleActionBean.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/KillTransitionXCommand.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/MaterializeTransitionXCommand.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/RerunTransitionXCommand.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/ResumeTransitionXCommand.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/StartTransitionXCommand.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/SuspendTransitionXCommand.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/TransitionXCommand.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleJobChangeXCommand.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleJobResumeXCommand.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleJobSuspendXCommand.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleKillXCommand.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundlePauseXCommand.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundlePurgeXCommand.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleRerunXCommand.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleStartXCommand.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleStatusUpdateXCommand.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleSubmitXCommand.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleUnpauseXCommand.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionCheckXCommand.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionMaterializeCommand.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionStartXCommand.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdateXCommand.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordChangeXCommand.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordKillXCommand.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordPauseXCommand.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordPurgeXCommand.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordRerunXCommand.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordResumeXCommand.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordSubmitXCommand.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordSuspendXCommand.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordUnpauseXCommand.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordinatorCommand.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionCheckXCommand.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionEndXCommand.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionKillXCommand.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/KillXCommand.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/PurgeXCommand.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ReRunXCommand.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ResumeXCommand.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SubmitXCommand.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SuspendXCommand.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BulkUpdateInsertJPAExecutor.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BundleActionsDeleteForPurgeJPAExecutor.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BundleJobDeleteJPAExecutor.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionUpdateForStartJPAExecutor.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionsDeleteForPurgeJPAExecutor.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetActionByActionNumberJPAExecutor.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionsDeleteForPurgeJPAExecutor.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/store/SLAStore.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/util/db/SLADbOperations.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/util/db/SLADbXOperations.java
incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/wf/TestActionErrors.java
incubator/oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestBulkUpdateInsertJPAExecutor.java
incubator/oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestBundleActionsDeleteForPurgeJPAExecutor.java
incubator/oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestBundleJobDeleteJPAExecutor.java
incubator/oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordActionUpdateForStartJPAExecutor.java
incubator/oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordActionsDeleteForPurgeJPAExecutor.java
incubator/oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowActionsDeleteForPurgeJPAExecutor.java
incubator/oozie/trunk/core/src/test/java/org/apache/oozie/service/TestStatusTransitService.java
incubator/oozie/trunk/release-log.txt
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/BundleActionBean.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/BundleActionBean.java?rev=1376204&r1=1376203&r2=1376204&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/BundleActionBean.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/BundleActionBean.java Wed Aug 22 19:32:02 2012
@@ -35,9 +35,10 @@ import javax.persistence.Table;
import org.apache.hadoop.io.Writable;
import org.apache.oozie.client.Job.Status;
+import org.apache.oozie.client.rest.JsonBean;
import org.apache.oozie.util.DateUtils;
import org.apache.oozie.util.WritableUtils;
-import org.apache.openjpa.persistence.jdbc.Index;
+import org.json.simple.JSONObject;
@Entity
@Table(name = "BUNDLE_ACTIONS")
@@ -72,7 +73,7 @@ import org.apache.openjpa.persistence.jd
@NamedQuery(name = "GET_BUNDLE_ACTIONS_OLDER_THAN", query = "select OBJECT(w) from BundleActionBean w order by w.lastModifiedTimestamp"),
@NamedQuery(name = "DELETE_COMPLETED_ACTIONS_FOR_BUNDLE", query = "delete from BundleActionBean a where a.bundleId = :bundleId and (a.status = 'SUCCEEDED' OR a.status = 'FAILED' OR a.status= 'KILLED' OR a.status = 'DONEWITHERROR')")})
-public class BundleActionBean implements Writable {
+public class BundleActionBean implements Writable, JsonBean {
@Id
@Column(name = "bundle_action_id")
@@ -364,4 +365,14 @@ public class BundleActionBean implements
setLastModifiedTime(new Date(d));
}
}
+
+ @Override
+ public JSONObject toJSONObject() {
+ return null;
+ }
+
+ @Override
+ public JSONObject toJSONObject(String timeZoneId) {
+ return null;
+ }
}
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/KillTransitionXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/KillTransitionXCommand.java?rev=1376204&r1=1376203&r2=1376204&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/KillTransitionXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/KillTransitionXCommand.java Wed Aug 22 19:32:02 2012
@@ -51,6 +51,7 @@ public abstract class KillTransitionXCom
transitToNext();
killChildren();
updateJob();
+ performWrites();
}
finally {
notifyParent();
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/MaterializeTransitionXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/MaterializeTransitionXCommand.java?rev=1376204&r1=1376203&r2=1376204&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/MaterializeTransitionXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/MaterializeTransitionXCommand.java Wed Aug 22 19:32:02 2012
@@ -71,6 +71,7 @@ public abstract class MaterializeTransit
try {
materialize();
updateJob();
+ performWrites();
} finally {
notifyParent();
}
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/RerunTransitionXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/RerunTransitionXCommand.java?rev=1376204&r1=1376203&r2=1376204&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/RerunTransitionXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/RerunTransitionXCommand.java Wed Aug 22 19:32:02 2012
@@ -18,7 +18,6 @@
package org.apache.oozie.command;
import org.apache.oozie.ErrorCode;
-import org.apache.oozie.client.CoordinatorJob;
import org.apache.oozie.client.Job;
import org.apache.oozie.util.StatusUtils;
@@ -96,6 +95,7 @@ public abstract class RerunTransitionXCo
transitToNext();
rerunChildren();
updateJob();
+ performWrites();
}
finally {
notifyParent();
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/ResumeTransitionXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/ResumeTransitionXCommand.java?rev=1376204&r1=1376203&r2=1376204&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/ResumeTransitionXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/ResumeTransitionXCommand.java Wed Aug 22 19:32:02 2012
@@ -67,6 +67,7 @@ public abstract class ResumeTransitionXC
try {
resumeChildren();
updateJob();
+ performWrites();
} finally {
notifyParent();
}
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/StartTransitionXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/StartTransitionXCommand.java?rev=1376204&r1=1376203&r2=1376204&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/StartTransitionXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/StartTransitionXCommand.java Wed Aug 22 19:32:02 2012
@@ -79,6 +79,7 @@ public abstract class StartTransitionXCo
transitToNext();
updateJob();
StartChildren();
+ performWrites();
notifyParent();
return null;
}
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/SuspendTransitionXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/SuspendTransitionXCommand.java?rev=1376204&r1=1376203&r2=1376204&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/SuspendTransitionXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/SuspendTransitionXCommand.java Wed Aug 22 19:32:02 2012
@@ -73,6 +73,7 @@ public abstract class SuspendTransitionX
try {
suspendChildren();
updateJob();
+ performWrites();
} finally {
notifyParent();
}
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/TransitionXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/TransitionXCommand.java?rev=1376204&r1=1376203&r2=1376204&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/TransitionXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/TransitionXCommand.java Wed Aug 22 19:32:02 2012
@@ -17,7 +17,11 @@
*/
package org.apache.oozie.command;
+import java.util.ArrayList;
+import java.util.List;
+
import org.apache.oozie.client.Job;
+import org.apache.oozie.client.rest.JsonBean;
import org.apache.oozie.util.ParamChecker;
/**
@@ -29,6 +33,8 @@ import org.apache.oozie.util.ParamChecke
public abstract class TransitionXCommand<T> extends XCommand<T> {
protected Job job;
+ protected List<JsonBean> updateList = new ArrayList<JsonBean>();
+ protected List<JsonBean> insertList = new ArrayList<JsonBean>();
public TransitionXCommand(String name, String type, int priority) {
super(name, type, priority);
@@ -59,6 +65,13 @@ public abstract class TransitionXCommand
*/
public abstract void notifyParent() throws CommandException;
+ /**
+ * This will be used to atomically perform all the writes within this command.
+ *
+ * @throws CommandException
+ */
+ public abstract void performWrites() throws CommandException;
+
/* (non-Javadoc)
* @see org.apache.oozie.command.XCommand#execute()
*/
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleJobChangeXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleJobChangeXCommand.java?rev=1376204&r1=1376203&r2=1376204&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleJobChangeXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleJobChangeXCommand.java Wed Aug 22 19:32:02 2012
@@ -17,6 +17,7 @@
*/
package org.apache.oozie.command.bundle;
+import java.util.ArrayList;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
@@ -29,14 +30,14 @@ import org.apache.oozie.ErrorCode;
import org.apache.oozie.XException;
import org.apache.oozie.client.Job;
import org.apache.oozie.client.OozieClient;
+import org.apache.oozie.client.rest.JsonBean;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.PreconditionException;
import org.apache.oozie.command.XCommand;
import org.apache.oozie.command.coord.CoordChangeXCommand;
-import org.apache.oozie.executor.jpa.BundleActionUpdateJPAExecutor;
+import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
import org.apache.oozie.executor.jpa.BundleActionsGetJPAExecutor;
import org.apache.oozie.executor.jpa.BundleJobGetJPAExecutor;
-import org.apache.oozie.executor.jpa.BundleJobUpdateJPAExecutor;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;
import org.apache.oozie.util.DateUtils;
@@ -55,6 +56,7 @@ public class BundleJobChangeXCommand ext
private Date newEndTime = null;
boolean isChangePauseTime = false;
boolean isChangeEndTime = false;
+ private List<JsonBean> updateList = new ArrayList<JsonBean>();
private static final Set<String> ALLOWED_CHANGE_OPTIONS = new HashSet<String>();
static {
@@ -179,10 +181,11 @@ public class BundleJobChangeXCommand ext
LOG.info("Queuing CoordChangeXCommand coord job = " + action.getCoordId() + " to change "
+ changeValue);
action.setPending(action.getPending() + 1);
- jpaService.execute(new BundleActionUpdateJPAExecutor(action));
+ updateList.add(action);
}
}
- jpaService.execute(new BundleJobUpdateJPAExecutor(bundleJob));
+ updateList.add(bundleJob);
+ jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, null));
}
return null;
}
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleJobResumeXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleJobResumeXCommand.java?rev=1376204&r1=1376203&r2=1376204&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleJobResumeXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleJobResumeXCommand.java Wed Aug 22 19:32:02 2012
@@ -23,16 +23,14 @@ import java.util.List;
import org.apache.oozie.BundleActionBean;
import org.apache.oozie.BundleJobBean;
import org.apache.oozie.ErrorCode;
-import org.apache.oozie.XException;
import org.apache.oozie.client.Job;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.PreconditionException;
import org.apache.oozie.command.ResumeTransitionXCommand;
import org.apache.oozie.command.coord.CoordResumeXCommand;
-import org.apache.oozie.executor.jpa.BundleActionUpdateJPAExecutor;
+import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
import org.apache.oozie.executor.jpa.BundleActionsGetJPAExecutor;
import org.apache.oozie.executor.jpa.BundleJobGetJPAExecutor;
-import org.apache.oozie.executor.jpa.BundleJobUpdateJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;
@@ -61,33 +59,28 @@ public class BundleJobResumeXCommand ext
* @see org.apache.oozie.command.ResumeTransitionXCommand#resumeChildren()
*/
@Override
- public void resumeChildren() throws CommandException {
- try {
- for (BundleActionBean action : bundleActions) {
- if (action.getStatus() == Job.Status.SUSPENDED || action.getStatus() == Job.Status.SUSPENDEDWITHERROR || action.getStatus() == Job.Status.PREPSUSPENDED) {
- // queue a CoordResumeXCommand
- if (action.getCoordId() != null) {
- queue(new CoordResumeXCommand(action.getCoordId()));
- updateBundleAction(action);
- LOG.debug("Resume bundle action = [{0}], new status = [{1}], pending = [{2}] and queue CoordResumeXCommand for [{3}]",
- action.getBundleActionId(), action.getStatus(), action.getPending(), action
- .getCoordId());
- }
- else {
- updateBundleAction(action);
- LOG.debug("Resume bundle action = [{0}], new status = [{1}], pending = [{2}] and coord id is null",
- action.getBundleActionId(), action.getStatus(), action.getPending());
- }
+ public void resumeChildren() {
+ for (BundleActionBean action : bundleActions) {
+ if (action.getStatus() == Job.Status.SUSPENDED || action.getStatus() == Job.Status.SUSPENDEDWITHERROR || action.getStatus() == Job.Status.PREPSUSPENDED) {
+ // queue a CoordResumeXCommand
+ if (action.getCoordId() != null) {
+ queue(new CoordResumeXCommand(action.getCoordId()));
+ updateBundleAction(action);
+ LOG.debug("Resume bundle action = [{0}], new status = [{1}], pending = [{2}] and queue CoordResumeXCommand for [{3}]",
+ action.getBundleActionId(), action.getStatus(), action.getPending(), action
+ .getCoordId());
+ }
+ else {
+ updateBundleAction(action);
+ LOG.debug("Resume bundle action = [{0}], new status = [{1}], pending = [{2}] and coord id is null",
+ action.getBundleActionId(), action.getStatus(), action.getPending());
}
}
- LOG.debug("Resume bundle actions for the bundle=[{0}]", bundleId);
- }
- catch (XException ex) {
- throw new CommandException(ex);
}
+ LOG.debug("Resume bundle actions for the bundle=[{0}]", bundleId);
}
- private void updateBundleAction(BundleActionBean action) throws CommandException {
+ private void updateBundleAction(BundleActionBean action) {
if (action.getStatus() == Job.Status.PREPSUSPENDED) {
action.setStatus(Job.Status.PREP);
}
@@ -99,12 +92,7 @@ public class BundleJobResumeXCommand ext
}
action.incrementAndGetPending();
action.setLastModifiedTime(new Date());
- try {
- jpaService.execute(new BundleActionUpdateJPAExecutor(action));
- }
- catch (JPAExecutorException e) {
- throw new CommandException(e);
- }
+ updateList.add(action);
}
/* (non-Javadoc)
@@ -119,13 +107,21 @@ public class BundleJobResumeXCommand ext
* @see org.apache.oozie.command.TransitionXCommand#updateJob()
*/
@Override
- public void updateJob() throws CommandException {
+ public void updateJob() {
InstrumentUtils.incrJobCounter("bundle_resume", 1, null);
bundleJob.setSuspendedTime(null);
bundleJob.setLastModifiedTime(new Date());
LOG.debug("Resume bundle job id = " + bundleId + ", status = " + bundleJob.getStatus() + ", pending = " + bundleJob.isPending());
+ updateList.add(bundleJob);
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.oozie.command.TransitionXCommand#performWrites()
+ */
+ @Override
+ public void performWrites() throws CommandException {
try {
- jpaService.execute(new BundleJobUpdateJPAExecutor(bundleJob));
+ jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, null));
}
catch (JPAExecutorException e) {
throw new CommandException(e);
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleJobSuspendXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleJobSuspendXCommand.java?rev=1376204&r1=1376203&r2=1376204&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleJobSuspendXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleJobSuspendXCommand.java Wed Aug 22 19:32:02 2012
@@ -23,16 +23,14 @@ import java.util.List;
import org.apache.oozie.BundleActionBean;
import org.apache.oozie.BundleJobBean;
import org.apache.oozie.ErrorCode;
-import org.apache.oozie.XException;
import org.apache.oozie.client.Job;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.PreconditionException;
import org.apache.oozie.command.SuspendTransitionXCommand;
import org.apache.oozie.command.coord.CoordSuspendXCommand;
-import org.apache.oozie.executor.jpa.BundleActionUpdateJPAExecutor;
+import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
import org.apache.oozie.executor.jpa.BundleActionsGetJPAExecutor;
import org.apache.oozie.executor.jpa.BundleJobGetJPAExecutor;
-import org.apache.oozie.executor.jpa.BundleJobUpdateJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;
@@ -74,6 +72,19 @@ public class BundleJobSuspendXCommand ex
}
/* (non-Javadoc)
+ * @see org.apache.oozie.command.TransitionXCommand#performWrites()
+ */
+ @Override
+ public void performWrites() throws CommandException {
+ try {
+ jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, null));
+ }
+ catch (JPAExecutorException e) {
+ throw new CommandException(e);
+ }
+ }
+
+ /* (non-Javadoc)
* @see org.apache.oozie.command.XCommand#getEntityKey()
*/
@Override
@@ -133,49 +144,39 @@ public class BundleJobSuspendXCommand ex
* @see org.apache.oozie.command.TransitionXCommand#updateJob()
*/
@Override
- public void updateJob() throws CommandException {
+ public void updateJob() {
InstrumentUtils.incrJobCounter("bundle_suspend", 1, null);
bundleJob.setSuspendedTime(new Date());
bundleJob.setLastModifiedTime(new Date());
LOG.debug("Suspend bundle job id = " + jobId + ", status = " + bundleJob.getStatus() + ", pending = " + bundleJob.isPending());
- try {
- jpaService.execute(new BundleJobUpdateJPAExecutor(bundleJob));
- }
- catch (JPAExecutorException e) {
- throw new CommandException(e);
- }
+ updateList.add(bundleJob);
}
@Override
public void suspendChildren() throws CommandException {
- try {
- for (BundleActionBean action : this.bundleActions) {
- if (action.getStatus() == Job.Status.RUNNING || action.getStatus() == Job.Status.RUNNINGWITHERROR
- || action.getStatus() == Job.Status.PREP || action.getStatus() == Job.Status.PAUSED
- || action.getStatus() == Job.Status.PAUSEDWITHERROR) {
- // queue a CoordSuspendXCommand
- if (action.getCoordId() != null) {
- queue(new CoordSuspendXCommand(action.getCoordId()));
- updateBundleAction(action);
- LOG.debug("Suspend bundle action = [{0}], new status = [{1}], pending = [{2}] and queue CoordSuspendXCommand for [{3}]",
- action.getBundleActionId(), action.getStatus(), action.getPending(), action.getCoordId());
- } else {
- updateBundleAction(action);
- LOG.debug("Suspend bundle action = [{0}], new status = [{1}], pending = [{2}] and coord id is null",
- action.getBundleActionId(), action.getStatus(), action.getPending());
- }
-
+ for (BundleActionBean action : this.bundleActions) {
+ if (action.getStatus() == Job.Status.RUNNING || action.getStatus() == Job.Status.RUNNINGWITHERROR
+ || action.getStatus() == Job.Status.PREP || action.getStatus() == Job.Status.PAUSED
+ || action.getStatus() == Job.Status.PAUSEDWITHERROR) {
+ // queue a CoordSuspendXCommand
+ if (action.getCoordId() != null) {
+ queue(new CoordSuspendXCommand(action.getCoordId()));
+ updateBundleAction(action);
+ LOG.debug("Suspend bundle action = [{0}], new status = [{1}], pending = [{2}] and queue CoordSuspendXCommand for [{3}]",
+ action.getBundleActionId(), action.getStatus(), action.getPending(), action.getCoordId());
+ } else {
+ updateBundleAction(action);
+ LOG.debug("Suspend bundle action = [{0}], new status = [{1}], pending = [{2}] and coord id is null",
+ action.getBundleActionId(), action.getStatus(), action.getPending());
}
+
}
- LOG.debug("Suspended bundle actions for the bundle=[{0}]", jobId);
- }
- catch (XException ex) {
- throw new CommandException(ex);
}
+ LOG.debug("Suspended bundle actions for the bundle=[{0}]", jobId);
}
- private void updateBundleAction(BundleActionBean action) throws CommandException {
+ private void updateBundleAction(BundleActionBean action) {
if (action.getStatus() == Job.Status.PREP) {
action.setStatus(Job.Status.PREPSUSPENDED);
}
@@ -194,11 +195,6 @@ public class BundleJobSuspendXCommand ex
action.incrementAndGetPending();
action.setLastModifiedTime(new Date());
- try {
- jpaService.execute(new BundleActionUpdateJPAExecutor(action));
- }
- catch (JPAExecutorException e) {
- throw new CommandException(e);
- }
+ updateList.add(action);
}
}
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleKillXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleKillXCommand.java?rev=1376204&r1=1376203&r2=1376204&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleKillXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleKillXCommand.java Wed Aug 22 19:32:02 2012
@@ -30,10 +30,9 @@ import org.apache.oozie.command.CommandE
import org.apache.oozie.command.KillTransitionXCommand;
import org.apache.oozie.command.PreconditionException;
import org.apache.oozie.command.coord.CoordKillXCommand;
-import org.apache.oozie.executor.jpa.BundleActionUpdateJPAExecutor;
+import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
import org.apache.oozie.executor.jpa.BundleActionsGetJPAExecutor;
import org.apache.oozie.executor.jpa.BundleJobGetJPAExecutor;
-import org.apache.oozie.executor.jpa.BundleJobUpdateJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;
@@ -134,16 +133,11 @@ public class BundleKillXCommand extends
* @param action
* @throws CommandException
*/
- private void updateBundleAction(BundleActionBean action) throws CommandException {
+ private void updateBundleAction(BundleActionBean action) {
action.incrementAndGetPending();
action.setLastModifiedTime(new Date());
action.setStatus(Job.Status.KILLED);
- try {
- jpaService.execute(new BundleActionUpdateJPAExecutor(action));
- }
- catch (JPAExecutorException e) {
- throw new CommandException(e);
- }
+ updateList.add(action);
}
/* (non-Javadoc)
@@ -165,9 +159,17 @@ public class BundleKillXCommand extends
* @see org.apache.oozie.command.TransitionXCommand#updateJob()
*/
@Override
- public void updateJob() throws CommandException {
+ public void updateJob() {
+ updateList.add(bundleJob);
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.oozie.command.TransitionXCommand#performWrites()
+ */
+ @Override
+ public void performWrites() throws CommandException {
try {
- jpaService.execute(new BundleJobUpdateJPAExecutor(bundleJob));
+ jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, null));
}
catch (JPAExecutorException e) {
throw new CommandException(e);
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundlePauseXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundlePauseXCommand.java?rev=1376204&r1=1376203&r2=1376204&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundlePauseXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundlePauseXCommand.java Wed Aug 22 19:32:02 2012
@@ -102,4 +102,8 @@ public class BundlePauseXCommand extends
}
+ @Override
+ public void performWrites() throws CommandException {
+ }
+
}
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundlePurgeXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundlePurgeXCommand.java?rev=1376204&r1=1376203&r2=1376204&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundlePurgeXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundlePurgeXCommand.java Wed Aug 22 19:32:02 2012
@@ -17,21 +17,20 @@
*/
package org.apache.oozie.command.bundle;
+import java.util.Collection;
import java.util.List;
-import org.apache.oozie.BundleJobBean;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.XException;
+import org.apache.oozie.client.rest.JsonBean;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.PreconditionException;
import org.apache.oozie.command.XCommand;
-import org.apache.oozie.executor.jpa.BundleActionsDeleteForPurgeJPAExecutor;
-import org.apache.oozie.executor.jpa.BundleJobDeleteJPAExecutor;
+import org.apache.oozie.executor.jpa.BulkDeleteForPurgeJPAExecutor;
import org.apache.oozie.executor.jpa.BundleJobsGetForPurgeJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;
-import org.apache.oozie.util.XLog;
/**
* This class is used for bundle purge command
@@ -40,7 +39,7 @@ public class BundlePurgeXCommand extends
private JPAService jpaService = null;
private final int olderThan;
private final int limit;
- private List<BundleJobBean> jobList = null;
+ private List<? extends JsonBean> jobList = null;
public BundlePurgeXCommand(int olderThan, int limit) {
super("bundle_purge", "bundle_purge", 0);
@@ -77,15 +76,11 @@ public class BundlePurgeXCommand extends
int actionDeleted = 0;
if (jobList != null && jobList.size() != 0) {
- for (BundleJobBean bundle : jobList) {
- String jobId = bundle.getId();
- try {
- jpaService.execute(new BundleJobDeleteJPAExecutor(jobId));
- actionDeleted += jpaService.execute(new BundleActionsDeleteForPurgeJPAExecutor(jobId));
- }
- catch (JPAExecutorException e) {
- throw new CommandException(e);
- }
+ try {
+ actionDeleted = jpaService.execute(new BulkDeleteForPurgeJPAExecutor((Collection<JsonBean>) jobList));
+ }
+ catch (JPAExecutorException je) {
+ throw new CommandException(je);
}
LOG.debug("ENDED Bundle-Purge deleted jobs :" + jobList.size() + " and actions " + actionDeleted);
}
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleRerunXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleRerunXCommand.java?rev=1376204&r1=1376203&r2=1376204&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleRerunXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleRerunXCommand.java Wed Aug 22 19:32:02 2012
@@ -32,10 +32,9 @@ import org.apache.oozie.client.rest.Rest
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.RerunTransitionXCommand;
import org.apache.oozie.command.coord.CoordRerunXCommand;
-import org.apache.oozie.executor.jpa.BundleActionUpdateJPAExecutor;
+import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
import org.apache.oozie.executor.jpa.BundleActionsGetJPAExecutor;
import org.apache.oozie.executor.jpa.BundleJobGetJPAExecutor;
-import org.apache.oozie.executor.jpa.BundleJobUpdateJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.service.JPAService;
@@ -195,42 +194,44 @@ public class BundleRerunXCommand extends
* @param action the bundle action
* @throws CommandException thrown if failed to update bundle action
*/
- private void updateBundleAction(BundleActionBean action) throws CommandException {
+ private void updateBundleAction(BundleActionBean action) {
action.incrementAndGetPending();
action.setLastModifiedTime(new Date());
- try {
- jpaService.execute(new BundleActionUpdateJPAExecutor(action));
- }
- catch (JPAExecutorException je) {
- throw new CommandException(je);
- }
+ updateList.add(action);
}
/* (non-Javadoc)
* @see org.apache.oozie.command.TransitionXCommand#updateJob()
*/
@Override
- public void updateJob() throws CommandException {
- try {
- // rerun a paused bundle job will keep job status at paused and pending at previous pending
- if (getPrevStatus() != null) {
- Job.Status bundleJobStatus = getPrevStatus();
- if (bundleJobStatus.equals(Job.Status.PAUSED) || bundleJobStatus.equals(Job.Status.PAUSEDWITHERROR)) {
- bundleJob.setStatus(bundleJobStatus);
- if (prevPending) {
- bundleJob.setPending();
- }
- else {
- bundleJob.resetPending();
- }
+ public void updateJob() {
+ // rerun a paused bundle job will keep job status at paused and pending at previous pending
+ if (getPrevStatus() != null) {
+ Job.Status bundleJobStatus = getPrevStatus();
+ if (bundleJobStatus.equals(Job.Status.PAUSED) || bundleJobStatus.equals(Job.Status.PAUSEDWITHERROR)) {
+ bundleJob.setStatus(bundleJobStatus);
+ if (prevPending) {
+ bundleJob.setPending();
+ }
+ else {
+ bundleJob.resetPending();
}
}
- jpaService.execute(new BundleJobUpdateJPAExecutor(bundleJob));
- }
- catch (JPAExecutorException je) {
- throw new CommandException(je);
}
+ updateList.add(bundleJob);
+ }
+ /* (non-Javadoc)
+ * @see org.apache.oozie.command.RerunTransitionXCommand#performWrites()
+ */
+ @Override
+ public void performWrites() throws CommandException {
+ try {
+ jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, null));
+ }
+ catch (JPAExecutorException e) {
+ throw new CommandException(e);
+ }
}
/* (non-Javadoc)
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleStartXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleStartXCommand.java?rev=1376204&r1=1376203&r2=1376204&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleStartXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleStartXCommand.java Wed Aug 22 19:32:02 2012
@@ -32,13 +32,12 @@ import org.apache.oozie.ErrorCode;
import org.apache.oozie.XException;
import org.apache.oozie.client.Job;
import org.apache.oozie.client.OozieClient;
+import org.apache.oozie.client.rest.JsonBean;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.PreconditionException;
import org.apache.oozie.command.StartTransitionXCommand;
import org.apache.oozie.command.coord.CoordSubmitXCommand;
-import org.apache.oozie.executor.jpa.BundleActionGetJPAExecutor;
-import org.apache.oozie.executor.jpa.BundleActionInsertJPAExecutor;
-import org.apache.oozie.executor.jpa.BundleActionUpdateJPAExecutor;
+import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
import org.apache.oozie.executor.jpa.BundleJobGetJPAExecutor;
import org.apache.oozie.executor.jpa.BundleJobUpdateJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
@@ -166,6 +165,19 @@ public class BundleStartXCommand extends
public void notifyParent() {
}
+ /* (non-Javadoc)
+ * @see org.apache.oozie.command.StartTransitionXCommand#performWrites()
+ */
+ @Override
+ public void performWrites() throws CommandException {
+ try {
+ jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, insertList));
+ }
+ catch (JPAExecutorException e) {
+ throw new CommandException(e);
+ }
+ }
+
/**
* Insert bundle actions
*
@@ -200,26 +212,25 @@ public class BundleStartXCommand extends
throw new CommandException(ErrorCode.E1301, jex);
}
- try {
- // if there is no coordinator for this bundle, failed it.
- if (map.isEmpty()) {
- bundleJob.setStatus(Job.Status.FAILED);
- bundleJob.resetPending();
+ // if there is no coordinator for this bundle, failed it.
+ if (map.isEmpty()) {
+ bundleJob.setStatus(Job.Status.FAILED);
+ bundleJob.resetPending();
+ try {
jpaService.execute(new BundleJobUpdateJPAExecutor(bundleJob));
- LOG.debug("No coord jobs for the bundle=[{0}], failed it!!", jobId);
- throw new CommandException(ErrorCode.E1318, jobId);
}
-
- for (Entry<String, Boolean> coordName : map.entrySet()) {
- BundleActionBean action = createBundleAction(jobId, coordName.getKey(), coordName.getValue());
-
- jpaService.execute(new BundleActionInsertJPAExecutor(action));
+ catch (JPAExecutorException jex) {
+ throw new CommandException(jex);
}
- }
- catch (JPAExecutorException je) {
- throw new CommandException(je);
+
+ LOG.debug("No coord jobs for the bundle=[{0}], failed it!!", jobId);
+ throw new CommandException(ErrorCode.E1318, jobId);
}
+ for (Entry<String, Boolean> coordName : map.entrySet()) {
+ BundleActionBean action = createBundleAction(jobId, coordName.getKey(), coordName.getValue());
+ insertList.add(action);
+ }
}
else {
throw new CommandException(ErrorCode.E0604, jobId);
@@ -260,8 +271,8 @@ public class BundleStartXCommand extends
queue(new CoordSubmitXCommand(coordConf, bundleJob.getAuthToken(), bundleJob.getId(), name.getValue()));
- updateBundleAction(name.getValue());
}
+ updateBundleAction();
}
catch (JDOMException jex) {
throw new CommandException(ErrorCode.E1301, jex);
@@ -275,11 +286,12 @@ public class BundleStartXCommand extends
}
}
- private void updateBundleAction(String coordName) throws JPAExecutorException {
- BundleActionBean action = jpaService.execute(new BundleActionGetJPAExecutor(jobId, coordName));
- action.incrementAndGetPending();
- action.setLastModifiedTime(new Date());
- jpaService.execute(new BundleActionUpdateJPAExecutor(action));
+ private void updateBundleAction() throws JPAExecutorException {
+ for(JsonBean bAction : insertList) {
+ BundleActionBean action = (BundleActionBean) bAction;
+ action.incrementAndGetPending();
+ action.setLastModifiedTime(new Date());
+ }
}
/**
@@ -348,11 +360,6 @@ public class BundleStartXCommand extends
*/
@Override
public void updateJob() throws CommandException {
- try {
- jpaService.execute(new BundleJobUpdateJPAExecutor(bundleJob));
- }
- catch (JPAExecutorException je) {
- throw new CommandException(je);
- }
+ updateList.add(bundleJob);
}
}
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleStatusUpdateXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleStatusUpdateXCommand.java?rev=1376204&r1=1376203&r2=1376204&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleStatusUpdateXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleStatusUpdateXCommand.java Wed Aug 22 19:32:02 2012
@@ -20,7 +20,6 @@ package org.apache.oozie.command.bundle;
import java.util.Date;
import org.apache.oozie.BundleActionBean;
-import org.apache.oozie.BundleJobBean;
import org.apache.oozie.CoordinatorJobBean;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.XException;
@@ -31,8 +30,6 @@ import org.apache.oozie.command.Precondi
import org.apache.oozie.command.StatusUpdateXCommand;
import org.apache.oozie.executor.jpa.BundleActionGetJPAExecutor;
import org.apache.oozie.executor.jpa.BundleActionUpdateJPAExecutor;
-import org.apache.oozie.executor.jpa.BundleJobGetJPAExecutor;
-import org.apache.oozie.executor.jpa.BundleJobUpdateJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleSubmitXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleSubmitXCommand.java?rev=1376204&r1=1376203&r2=1376204&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleSubmitXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleSubmitXCommand.java Wed Aug 22 19:32:02 2012
@@ -521,4 +521,8 @@ public class BundleSubmitXCommand extend
@Override
public void updateJob() throws CommandException {
}
+
+ @Override
+ public void performWrites() throws CommandException {
+ }
}
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleUnpauseXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleUnpauseXCommand.java?rev=1376204&r1=1376203&r2=1376204&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleUnpauseXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleUnpauseXCommand.java Wed Aug 22 19:32:02 2012
@@ -116,4 +116,8 @@ public class BundleUnpauseXCommand exten
}
+ @Override
+ public void performWrites() throws CommandException {
+ }
+
}
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionCheckXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionCheckXCommand.java?rev=1376204&r1=1376203&r2=1376204&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionCheckXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionCheckXCommand.java Wed Aug 22 19:32:02 2012
@@ -18,10 +18,13 @@
package org.apache.oozie.command.coord;
import java.sql.Timestamp;
+import java.util.ArrayList;
import java.util.Date;
+import java.util.List;
import org.apache.oozie.CoordinatorActionBean;
import org.apache.oozie.ErrorCode;
+import org.apache.oozie.SLAEventBean;
import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.XException;
import org.apache.oozie.service.JPAService;
@@ -34,8 +37,10 @@ import org.apache.oozie.client.Coordinat
import org.apache.oozie.client.WorkflowJob;
import org.apache.oozie.client.SLAEvent.SlaAppType;
import org.apache.oozie.client.SLAEvent.Status;
+import org.apache.oozie.client.rest.JsonBean;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.PreconditionException;
+import org.apache.oozie.executor.jpa.BulkUpdateInsertForCoordActionStatusJPAExecutor;
import org.apache.oozie.executor.jpa.CoordActionGetForCheckJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
@@ -47,6 +52,8 @@ public class CoordActionCheckXCommand ex
private int actionCheckDelay;
private CoordinatorActionBean coordAction = null;
private JPAService jpaService = null;
+ private List<JsonBean> updateList = new ArrayList<JsonBean>();
+ private List<JsonBean> insertList = new ArrayList<JsonBean>();
public CoordActionCheckXCommand(String actionId, int actionCheckDelay) {
super("coord_action_check", "coord_action_check", 0);
@@ -88,7 +95,8 @@ public class CoordActionCheckXCommand ex
else {
LOG.warn("Unexpected workflow " + wf.getId() + " STATUS " + wf.getStatus());
coordAction.setLastModifiedTime(new Date());
- jpaService.execute(new org.apache.oozie.executor.jpa.CoordActionUpdateStatusJPAExecutor(coordAction));
+ updateList.add(coordAction);
+ jpaService.execute(new BulkUpdateInsertForCoordActionStatusJPAExecutor(updateList, null));
return null;
}
}
@@ -97,12 +105,17 @@ public class CoordActionCheckXCommand ex
LOG.debug("Updating Coordintaor actionId :" + coordAction.getId() + "status to ="
+ coordAction.getStatus());
coordAction.setLastModifiedTime(new Date());
- jpaService.execute(new org.apache.oozie.executor.jpa.CoordActionUpdateStatusJPAExecutor(coordAction));
+ updateList.add(coordAction);
if (slaStatus != null) {
- SLADbOperations.writeStausEvent(coordAction.getSlaXml(), coordAction.getId(), slaStatus,
+ SLAEventBean slaEvent = SLADbOperations.createStatusEvent(coordAction.getSlaXml(), coordAction.getId(), slaStatus,
SlaAppType.COORDINATOR_ACTION, LOG);
+ if(slaEvent != null) {
+ insertList.add(slaEvent);
+ }
}
+
+ jpaService.execute(new BulkUpdateInsertForCoordActionStatusJPAExecutor(updateList, insertList));
}
catch (XException ex) {
LOG.warn("CoordActionCheckCommand Failed ", ex);
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionMaterializeCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionMaterializeCommand.java?rev=1376204&r1=1376203&r2=1376204&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionMaterializeCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionMaterializeCommand.java Wed Aug 22 19:32:02 2012
@@ -19,18 +19,26 @@ package org.apache.oozie.command.coord;
import java.io.IOException;
import java.io.StringReader;
+import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
+import java.util.List;
import java.util.TimeZone;
import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.CoordinatorActionBean;
import org.apache.oozie.CoordinatorJobBean;
import org.apache.oozie.ErrorCode;
+import org.apache.oozie.SLAEventBean;
import org.apache.oozie.client.CoordinatorJob;
import org.apache.oozie.client.SLAEvent.SlaAppType;
+import org.apache.oozie.client.rest.JsonBean;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.coord.TimeUnit;
+import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
+import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Service;
import org.apache.oozie.service.Services;
import org.apache.oozie.store.CoordinatorStore;
@@ -52,22 +60,31 @@ public class CoordActionMaterializeComma
private final XLog log = XLog.getLog(getClass());
private String user;
private String group;
+ private List<JsonBean> insertList = new ArrayList<JsonBean>();
+ private List<JsonBean> updateList = new ArrayList<JsonBean>();
+
/**
* Default timeout for catchup jobs, in minutes, after which coordinator input check will timeout
*/
public static final String CONF_DEFAULT_TIMEOUT_CATCHUP = Service.CONF_PREFIX + "coord.catchup.default.timeout";
public CoordActionMaterializeCommand(String jobId, Date startTime, Date endTime) {
- super("coord_action_mater", "coord_action_mater", 1, XLog.STD);
+ super("coord_action_mater", "coord_action_mater", 1, XLog.STD, false);
this.jobId = jobId;
this.startTime = startTime;
this.endTime = endTime;
}
@Override
- protected Void call(CoordinatorStore store) throws StoreException, CommandException {
- // CoordinatorJobBean job = store.getCoordinatorJob(jobId, true);
- CoordinatorJobBean job = store.getEntityManager().find(CoordinatorJobBean.class, jobId);
+ protected Void call(CoordinatorStore store) throws CommandException {
+ CoordJobGetJPAExecutor getCoordJob = new CoordJobGetJPAExecutor(jobId);
+ CoordinatorJobBean job;
+ try {
+ job = Services.get().get(JPAService.class).execute(getCoordJob);
+ }
+ catch (JPAExecutorException jex) {
+ throw new CommandException(jex);
+ }
setLogInfo(job);
if (job.getLastActionTime() != null && job.getLastActionTime().compareTo(endTime) >= 0) {
log.info("ENDED Coordinator materialization for jobId = " + jobId
@@ -88,7 +105,7 @@ public class CoordActionMaterializeComma
if (job.getStatus() == CoordinatorJob.Status.PREMATER) {
job.setStatus(CoordinatorJob.Status.RUNNING);
}
- store.updateCoordinatorJob(job);
+ updateList.add(job);
return null;
}
@@ -115,7 +132,7 @@ public class CoordActionMaterializeComma
catch (CommandException ex) {
log.warn("Exception occurs:" + ex + " Making the job failed ");
job.setStatus(CoordinatorJobBean.Status.FAILED);
- store.updateCoordinatorJob(job);
+ updateList.add(job);
}
catch (Exception e) {
log.error("Excepion thrown :", e);
@@ -234,8 +251,8 @@ public class CoordActionMaterializeComma
private void storeToDB(CoordinatorActionBean actionBean, String actionXml, CoordinatorStore store) throws Exception {
log.debug("In storeToDB() action Id " + actionBean.getId() + " Size of actionXml " + actionXml.length());
actionBean.setActionXml(actionXml);
- store.insertCoordinatorAction(actionBean);
- writeActionRegistration(actionXml, actionBean, store);
+ insertList.add(actionBean);
+ createActionRegistration(actionXml, actionBean, store);
// TODO: time 100s should be configurable
queueCallable(new CoordActionNotificationXCommand(actionBean), 100);
@@ -248,12 +265,15 @@ public class CoordActionMaterializeComma
* @param store
* @throws Exception
*/
- private void writeActionRegistration(String actionXml, CoordinatorActionBean actionBean, CoordinatorStore store)
+ private void createActionRegistration(String actionXml, CoordinatorActionBean actionBean, CoordinatorStore store)
throws Exception {
Element eAction = XmlUtils.parseXml(actionXml);
Element eSla = eAction.getChild("action", eAction.getNamespace()).getChild("info", eAction.getNamespace("sla"));
- SLADbOperations.writeSlaRegistrationEvent(eSla, store, actionBean.getId(), SlaAppType.COORDINATOR_ACTION, user,
- group);
+ SLAEventBean slaEvent = SLADbOperations.createSlaRegistrationEvent(eSla, store, actionBean.getId(),
+ SlaAppType.COORDINATOR_ACTION, user, group);
+ if(slaEvent != null) {
+ insertList.add(slaEvent);
+ }
}
/**
@@ -261,7 +281,7 @@ public class CoordActionMaterializeComma
* @param store
* @throws StoreException
*/
- private void updateJobTable(CoordinatorJobBean job, CoordinatorStore store) throws StoreException {
+ private void updateJobTable(CoordinatorJobBean job, CoordinatorStore store) {
// TODO: why do we need this? Isn't lastMatTime enough???
job.setLastActionTime(endTime);
job.setLastActionNumber(lastActionNumber);
@@ -278,7 +298,7 @@ public class CoordActionMaterializeComma
log.info("[" + job.getId() + "]: Update status from PREMATER to RUNNING");
}
job.setNextMaterializedTime(endTime);
- store.updateCoordinatorJob(job);
+ updateList.add(job);
}
@Override
@@ -288,6 +308,18 @@ public class CoordActionMaterializeComma
try {
if (lock(jobId)) {
call(store);
+ JPAService jpaService = Services.get().get(JPAService.class);
+ if (jpaService != null) {
+ try {
+ jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, insertList));
+ }
+ catch (JPAExecutorException je) {
+ throw new CommandException(je);
+ }
+ }
+ else {
+ throw new CommandException(ErrorCode.E0610);
+ }
}
else {
queueCallable(new CoordActionMaterializeCommand(jobId, startTime, endTime),
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionStartXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionStartXCommand.java?rev=1376204&r1=1376203&r2=1376204&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionStartXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionStartXCommand.java Wed Aug 22 19:32:02 2012
@@ -25,6 +25,7 @@ import org.apache.oozie.CoordinatorActio
import org.apache.oozie.DagEngineException;
import org.apache.oozie.DagEngine;
import org.apache.oozie.ErrorCode;
+import org.apache.oozie.SLAEventBean;
import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.PreconditionException;
@@ -40,15 +41,18 @@ import org.apache.oozie.util.XConfigurat
import org.apache.oozie.util.db.SLADbOperations;
import org.apache.oozie.client.SLAEvent.SlaAppType;
import org.apache.oozie.client.SLAEvent.Status;
+import org.apache.oozie.client.rest.JsonBean;
+import org.apache.oozie.executor.jpa.BulkUpdateInsertForCoordActionStartJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
-import org.apache.oozie.executor.jpa.WorkflowJobUpdateJPAExecutor;
-
import org.jdom.Element;
import org.jdom.JDOMException;
import java.io.IOException;
import java.io.StringReader;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
public class CoordActionStartXCommand extends CoordinatorXCommand<Void> {
@@ -65,6 +69,8 @@ public class CoordActionStartXCommand ex
private CoordinatorActionBean coordAction = null;
private JPAService jpaService = null;
private String jobId = null;
+ private List<JsonBean> updateList = new ArrayList<JsonBean>();
+ private List<JsonBean> insertList = new ArrayList<JsonBean>();
public CoordActionStartXCommand(String id, String user, String token, String jobId) {
//super("coord_action_start", "coord_action_start", 1, XLog.OPS);
@@ -160,8 +166,11 @@ public class CoordActionStartXCommand ex
try {
boolean startJob = true;
Configuration conf = new XConfiguration(new StringReader(coordAction.getRunConf()));
- SLADbOperations.writeStausEvent(coordAction.getSlaXml(), coordAction.getId(), Status.STARTED,
- SlaAppType.COORDINATOR_ACTION, log);
+ SLAEventBean slaEvent = SLADbOperations.createStatusEvent(coordAction.getSlaXml(), coordAction.getId(), Status.STARTED,
+ SlaAppType.COORDINATOR_ACTION, log);
+ if(slaEvent != null) {
+ insertList.add(slaEvent);
+ }
// Normalize workflow appPath here;
JobUtils.normalizeAppPath(conf.get(OozieClient.USER_NAME), conf.get(OozieClient.GROUP_NAME), conf);
@@ -176,8 +185,15 @@ public class CoordActionStartXCommand ex
log.debug("Updating WF record for WFID :" + wfId + " with parent id: " + actionId);
WorkflowJobBean wfJob = jpaService.execute(new WorkflowJobGetJPAExecutor(wfId));
wfJob.setParentId(actionId);
- jpaService.execute(new WorkflowJobUpdateJPAExecutor(wfJob));
- jpaService.execute(new org.apache.oozie.executor.jpa.CoordActionUpdateForStartJPAExecutor(coordAction));
+ wfJob.setLastModifiedTime(new Date());
+ updateList.add(wfJob);
+ updateList.add(coordAction);
+ try {
+ jpaService.execute(new BulkUpdateInsertForCoordActionStartJPAExecutor(updateList, insertList));
+ }
+ catch (JPAExecutorException je) {
+ throw new CommandException(je);
+ }
}
else {
log.error(ErrorCode.E0610);
@@ -215,20 +231,22 @@ public class CoordActionStartXCommand ex
coordAction.setErrorMessage(errMsg);
coordAction.setErrorCode(errCode);
- JPAService jpaService = Services.get().get(JPAService.class);
- if (jpaService != null) {
- try {
- jpaService.execute(new org.apache.oozie.executor.jpa.CoordActionUpdateForStartJPAExecutor(coordAction));
- }
- catch (JPAExecutorException je) {
- throw new CommandException(je);
- }
+ updateList = new ArrayList<JsonBean>();
+ updateList.add(coordAction);
+ insertList = new ArrayList<JsonBean>();
+
+ SLAEventBean slaEvent = SLADbOperations.createStatusEvent(coordAction.getSlaXml(), coordAction.getId(), Status.FAILED,
+ SlaAppType.COORDINATOR_ACTION, log);
+ if(slaEvent != null) {
+ insertList.add(slaEvent); //Update SLA events
+ }
+ try {
+ // call JPAExecutor to do the bulk writes
+ jpaService.execute(new BulkUpdateInsertForCoordActionStartJPAExecutor(updateList, insertList));
}
- else {
- log.error(ErrorCode.E0610);
+ catch (JPAExecutorException je) {
+ throw new CommandException(je);
}
- SLADbOperations.writeStausEvent(coordAction.getSlaXml(), coordAction.getId(), Status.FAILED,
- SlaAppType.COORDINATOR_ACTION, log); //Update SLA events
queue(new CoordActionReadyXCommand(coordAction.getJobId()));
}
}
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdateXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdateXCommand.java?rev=1376204&r1=1376203&r2=1376204&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdateXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdateXCommand.java Wed Aug 22 19:32:02 2012
@@ -17,11 +17,13 @@
*/
package org.apache.oozie.command.coord;
+import java.util.ArrayList;
import java.util.Date;
+import java.util.List;
import org.apache.oozie.CoordinatorActionBean;
-import org.apache.oozie.CoordinatorJobBean;
import org.apache.oozie.ErrorCode;
+import org.apache.oozie.SLAEventBean;
import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.XException;
import org.apache.oozie.service.JPAService;
@@ -32,13 +34,11 @@ import org.apache.oozie.client.Coordinat
import org.apache.oozie.client.WorkflowJob;
import org.apache.oozie.client.SLAEvent.SlaAppType;
import org.apache.oozie.client.SLAEvent.Status;
+import org.apache.oozie.client.rest.JsonBean;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.PreconditionException;
+import org.apache.oozie.executor.jpa.BulkUpdateInsertForCoordActionStatusJPAExecutor;
import org.apache.oozie.executor.jpa.CoordActionGetForExternalIdJPAExecutor;
-import org.apache.oozie.executor.jpa.CoordActionUpdateJPAExecutor;
-import org.apache.oozie.executor.jpa.CoordActionUpdateStatusJPAExecutor;
-import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
-import org.apache.oozie.executor.jpa.CoordJobUpdateJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
public class CoordActionUpdateXCommand extends CoordinatorXCommand<Void> {
@@ -46,6 +46,8 @@ public class CoordActionUpdateXCommand e
private CoordinatorActionBean coordAction = null;
private JPAService jpaService = null;
private int maxRetries = 1;
+ private List<JsonBean> updateList = new ArrayList<JsonBean>();
+ private List<JsonBean> insertList = new ArrayList<JsonBean>();
public CoordActionUpdateXCommand(WorkflowJobBean workflow) {
super("coord-action-update", "coord-action-update", 1);
@@ -93,7 +95,8 @@ public class CoordActionUpdateXCommand e
LOG.warn("Unexpected workflow " + workflow.getId() + " STATUS " + workflow.getStatus());
// update lastModifiedTime
coordAction.setLastModifiedTime(new Date());
- jpaService.execute(new CoordActionUpdateStatusJPAExecutor(coordAction));
+ updateList.add(coordAction);
+ jpaService.execute(new BulkUpdateInsertForCoordActionStatusJPAExecutor(updateList, null));
// TODO - Uncomment this when bottom up rerun can change terminal state
/* CoordinatorJobBean coordJob = jpaService.execute(new CoordJobGetJPAExecutor(coordAction.getJobId()));
if (!coordJob.isPending()) {
@@ -107,7 +110,7 @@ public class CoordActionUpdateXCommand e
+ " to " + coordAction.getStatus() + ", pending = " + coordAction.getPending());
coordAction.setLastModifiedTime(new Date());
- jpaService.execute(new CoordActionUpdateStatusJPAExecutor(coordAction));
+ updateList.add(coordAction);
// TODO - Uncomment this when bottom up rerun can change terminal state
/*CoordinatorJobBean coordJob = jpaService.execute(new CoordJobGetJPAExecutor(coordAction.getJobId()));
if (!coordJob.isPending()) {
@@ -116,13 +119,19 @@ public class CoordActionUpdateXCommand e
LOG.info("Updating Coordinator job "+ coordJob.getId() + "pending to true");
}*/
if (slaStatus != null) {
- SLADbOperations.writeStausEvent(coordAction.getSlaXml(), coordAction.getId(), slaStatus,
+ SLAEventBean slaEvent = SLADbOperations.createStatusEvent(coordAction.getSlaXml(), coordAction.getId(), slaStatus,
SlaAppType.COORDINATOR_ACTION, LOG);
+ if(slaEvent != null) {
+ insertList.add(slaEvent);
+ }
}
if (workflow.getStatus() != WorkflowJob.Status.SUSPENDED
&& workflow.getStatus() != WorkflowJob.Status.RUNNING) {
queue(new CoordActionReadyXCommand(coordAction.getJobId()));
}
+
+ jpaService.execute(new BulkUpdateInsertForCoordActionStatusJPAExecutor(updateList, insertList));
+
LOG.debug("ENDED CoordActionUpdateXCommand for wfId=" + workflow.getId());
}
catch (XException ex) {
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordChangeXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordChangeXCommand.java?rev=1376204&r1=1376203&r2=1376204&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordChangeXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordChangeXCommand.java Wed Aug 22 19:32:02 2012
@@ -17,27 +17,31 @@
*/
package org.apache.oozie.command.coord;
+import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+import org.apache.oozie.CoordinatorActionBean;
import org.apache.oozie.CoordinatorJobBean;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.XException;
import org.apache.oozie.client.CoordinatorJob;
import org.apache.oozie.client.Job;
import org.apache.oozie.client.OozieClient;
+import org.apache.oozie.client.rest.JsonBean;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.PreconditionException;
import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
import org.apache.oozie.coord.TimeUnit;
-import org.apache.oozie.executor.jpa.CoordActionRemoveJPAExecutor;
+import org.apache.oozie.executor.jpa.BulkUpdateDeleteJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobGetActionByActionNumberJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
-import org.apache.oozie.executor.jpa.CoordJobUpdateJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;
@@ -57,6 +61,8 @@ public class CoordChangeXCommand extends
private CoordinatorJobBean coordJob;
private JPAService jpaService = null;
private Job.Status prevStatus;
+ private List<JsonBean> updateList = new ArrayList<JsonBean>();
+ private List<JsonBean> deleteList = new ArrayList<JsonBean>();
private static final Set<String> ALLOWED_CHANGE_OPTIONS = new HashSet<String>();
static {
@@ -250,7 +256,8 @@ public class CoordChangeXCommand extends
private void deleteAction(int actionNum) throws CommandException {
try {
String actionId = jpaService.execute(new CoordJobGetActionByActionNumberJPAExecutor(jobId, actionNum));
- jpaService.execute(new CoordActionRemoveJPAExecutor(actionId));
+ CoordinatorActionBean bean = jpaService.execute(new CoordActionGetJPAExecutor(actionId));
+ deleteList.add(bean);
} catch (JPAExecutorException e) {
throw new CommandException(e);
}
@@ -340,7 +347,8 @@ public class CoordChangeXCommand extends
coordJob.setDoneMaterialization();
}
- jpaService.execute(new CoordJobUpdateJPAExecutor(this.coordJob));
+ updateList.add(coordJob);
+ jpaService.execute(new BulkUpdateDeleteJPAExecutor(updateList, deleteList, false));
return null;
}
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordKillXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordKillXCommand.java?rev=1376204&r1=1376203&r2=1376204&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordKillXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordKillXCommand.java Wed Aug 22 19:32:02 2012
@@ -28,10 +28,9 @@ import org.apache.oozie.command.wf.KillX
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.KillTransitionXCommand;
import org.apache.oozie.command.PreconditionException;
-import org.apache.oozie.executor.jpa.CoordActionUpdateStatusJPAExecutor;
+import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobGetActionsNotCompletedJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
-import org.apache.oozie.executor.jpa.CoordJobUpdateJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;
@@ -102,7 +101,7 @@ public class CoordKillXCommand extends K
}
}
- private void updateCoordAction(CoordinatorActionBean action, boolean makePending) throws CommandException {
+ private void updateCoordAction(CoordinatorActionBean action, boolean makePending) {
action.setStatus(CoordinatorActionBean.Status.KILLED);
if (makePending) {
action.incrementAndGetPending();
@@ -111,43 +110,34 @@ public class CoordKillXCommand extends K
action.setPending(0);
}
action.setLastModifiedTime(new Date());
- try {
- jpaService.execute(new CoordActionUpdateStatusJPAExecutor(action));
- } catch (JPAExecutorException e) {
- throw new CommandException(e);
- }
+ updateList.add(action);
}
@Override
public void killChildren() throws CommandException {
- try {
- if (actionList != null) {
- for (CoordinatorActionBean action : actionList) {
- // queue a WorkflowKillXCommand to delete the workflow job and actions
- if (action.getExternalId() != null) {
- queue(new KillXCommand(action.getExternalId()));
- // As the kill command for children is queued, set pending flag for coord action to be true
- updateCoordAction(action, true);
- LOG.debug(
- "Killed coord action = [{0}], new status = [{1}], pending = [{2}] and queue KillXCommand for [{3}]",
- action.getId(), action.getStatus(), action.getPending(), action.getExternalId());
- }
- else {
- // As killing children is not required, set pending flag for coord action to be false
- updateCoordAction(action, false);
- LOG.debug("Killed coord action = [{0}], current status = [{1}], pending = [{2}]",
- action.getId(), action.getStatus(), action.getPending());
- }
+ if (actionList != null) {
+ for (CoordinatorActionBean action : actionList) {
+ // queue a WorkflowKillXCommand to delete the workflow job and actions
+ if (action.getExternalId() != null) {
+ queue(new KillXCommand(action.getExternalId()));
+ // As the kill command for children is queued, set pending flag for coord action to be true
+ updateCoordAction(action, true);
+ LOG.debug(
+ "Killed coord action = [{0}], new status = [{1}], pending = [{2}] and queue KillXCommand for [{3}]",
+ action.getId(), action.getStatus(), action.getPending(), action.getExternalId());
+ }
+ else {
+ // As killing children is not required, set pending flag for coord action to be false
+ updateCoordAction(action, false);
+ LOG.debug("Killed coord action = [{0}], current status = [{1}], pending = [{2}]",
+ action.getId(), action.getStatus(), action.getPending());
}
}
+ }
- jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob));
+ updateList.add(coordJob);
- LOG.debug("Killed coord actions for the coordinator=[{0}]", jobId);
- }
- catch (JPAExecutorException ex) {
- throw new CommandException(ex);
- }
+ LOG.debug("Killed coord actions for the coordinator=[{0}]", jobId);
}
@Override
@@ -161,11 +151,19 @@ public class CoordKillXCommand extends K
@Override
public void updateJob() throws CommandException {
+ updateList.add(coordJob);
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.oozie.command.KillTransitionXCommand#performWrites()
+ */
+ @Override
+ public void performWrites() throws CommandException {
try {
- jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob));
+ jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, null));
}
- catch (JPAExecutorException ex) {
- throw new CommandException(ex);
+ catch (JPAExecutorException e) {
+ throw new CommandException(e);
}
}
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java?rev=1376204&r1=1376203&r2=1376204&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java Wed Aug 22 19:32:02 2012
@@ -28,6 +28,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.oozie.CoordinatorActionBean;
import org.apache.oozie.CoordinatorJobBean;
import org.apache.oozie.ErrorCode;
+import org.apache.oozie.SLAEventBean;
import org.apache.oozie.client.CoordinatorJob;
import org.apache.oozie.client.Job;
import org.apache.oozie.client.SLAEvent.SlaAppType;
@@ -36,10 +37,9 @@ import org.apache.oozie.command.Material
import org.apache.oozie.command.PreconditionException;
import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
import org.apache.oozie.coord.TimeUnit;
-import org.apache.oozie.executor.jpa.CoordActionInsertJPAExecutor;
+import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
import org.apache.oozie.executor.jpa.CoordActionsActiveCountJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
-import org.apache.oozie.executor.jpa.CoordJobUpdateJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Service;
@@ -96,8 +96,16 @@ public class CoordMaterializeTransitionX
*/
@Override
public void updateJob() throws CommandException {
+ updateList.add(coordJob);
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.oozie.command.MaterializeTransitionXCommand#performWrites()
+ */
+ @Override
+ public void performWrites() throws CommandException {
try {
- jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob));
+ jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, insertList));
}
catch (JPAExecutorException jex) {
throw new CommandException(jex);
@@ -345,7 +353,7 @@ public class CoordMaterializeTransitionX
+ actionXml.length());
actionBean.setActionXml(actionXml);
- jpaService.execute(new CoordActionInsertJPAExecutor(actionBean));
+ insertList.add(actionBean);
writeActionRegistration(actionXml, actionBean);
// TODO: time 100s should be configurable
@@ -356,8 +364,11 @@ public class CoordMaterializeTransitionX
private void writeActionRegistration(String actionXml, CoordinatorActionBean actionBean) throws Exception {
Element eAction = XmlUtils.parseXml(actionXml);
Element eSla = eAction.getChild("action", eAction.getNamespace()).getChild("info", eAction.getNamespace("sla"));
- SLADbOperations.writeSlaRegistrationEvent(eSla, actionBean.getId(), SlaAppType.COORDINATOR_ACTION, coordJob
+ SLAEventBean slaEvent = SLADbOperations.createSlaRegistrationEvent(eSla, actionBean.getId(), SlaAppType.COORDINATOR_ACTION, coordJob
.getUser(), coordJob.getGroup(), LOG);
+ if(slaEvent != null) {
+ insertList.add(slaEvent);
+ }
}
private void updateJobMaterializeInfo(CoordinatorJobBean job) throws CommandException {
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordPauseXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordPauseXCommand.java?rev=1376204&r1=1376203&r2=1376204&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordPauseXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordPauseXCommand.java Wed Aug 22 19:32:02 2012
@@ -111,4 +111,8 @@ public class CoordPauseXCommand extends
}
+ @Override
+ public void performWrites() throws CommandException {
+ }
+
}
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordPurgeXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordPurgeXCommand.java?rev=1376204&r1=1376203&r2=1376204&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordPurgeXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordPurgeXCommand.java Wed Aug 22 19:32:02 2012
@@ -17,15 +17,15 @@
*/
package org.apache.oozie.command.coord;
+import java.util.Collection;
import java.util.List;
-import org.apache.oozie.CoordinatorJobBean;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.XException;
+import org.apache.oozie.client.rest.JsonBean;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.PreconditionException;
-import org.apache.oozie.executor.jpa.CoordActionsDeleteForPurgeJPAExecutor;
-import org.apache.oozie.executor.jpa.CoordJobDeleteJPAExecutor;
+import org.apache.oozie.executor.jpa.BulkDeleteForPurgeJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobsGetForPurgeJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.service.JPAService;
@@ -38,7 +38,7 @@ public class CoordPurgeXCommand extends
private JPAService jpaService = null;
private final int olderThan;
private final int limit;
- private List<CoordinatorJobBean> jobList = null;
+ private List<? extends JsonBean> jobList = null;
public CoordPurgeXCommand(int olderThan, int limit) {
super("coord_purge", "coord_purge", 0);
@@ -55,15 +55,11 @@ public class CoordPurgeXCommand extends
int actionDeleted = 0;
if (jobList != null && jobList.size() != 0) {
- for (CoordinatorJobBean coord : jobList) {
- String jobId = coord.getId();
- try {
- jpaService.execute(new CoordJobDeleteJPAExecutor(jobId));
- actionDeleted += jpaService.execute(new CoordActionsDeleteForPurgeJPAExecutor(jobId));
- }
- catch (JPAExecutorException e) {
- throw new CommandException(e);
- }
+ try {
+ actionDeleted = jpaService.execute(new BulkDeleteForPurgeJPAExecutor((Collection<JsonBean>) jobList));
+ }
+ catch (JPAExecutorException je) {
+ throw new CommandException(je);
}
LOG.debug("ENDED Coord-Purge deleted jobs :" + jobList.size() + " and actions " + actionDeleted);
}