You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by an...@apache.org on 2012/01/18 02:09:14 UTC
svn commit: r1232709 [1/2] - in /incubator/oozie/trunk: ./
core/src/main/java/org/apache/oozie/command/
core/src/main/java/org/apache/oozie/command/bundle/
core/src/main/java/org/apache/oozie/command/coord/
core/src/main/java/org/apache/oozie/command/w...
Author: angeloh
Date: Wed Jan 18 01:09:12 2012
New Revision: 1232709
URL: http://svn.apache.org/viewvc?rev=1232709&view=rev
Log:
OOZIE-591: Oozie continues to materialize new actions after end date modification (Mohamed Battisha via Angelo)
Modified:
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/Command.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/XCommand.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleJobChangeXCommand.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleJobResumeXCommand.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleJobSuspendXCommand.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleJobXCommand.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleJobsXCommand.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleKillXCommand.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundlePauseXCommand.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundlePurgeXCommand.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleRerunXCommand.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleStartXCommand.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleStatusUpdateXCommand.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleSubmitXCommand.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleUnpauseXCommand.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionCheckXCommand.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionInfoXCommand.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionNotificationXCommand.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionReadyXCommand.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionStartXCommand.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionTimeOutXCommand.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdateXCommand.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordChangeXCommand.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordJobXCommand.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordJobsXCommand.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordKillXCommand.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordPauseXCommand.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordPurgeXCommand.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordRerunXCommand.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordResumeXCommand.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordSubmitXCommand.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordSuspendXCommand.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordUnpauseXCommand.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/SLAEventsXCommand.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionCheckXCommand.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionEndXCommand.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionKillXCommand.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/CompletedActionXCommand.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/DefinitionXCommand.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ExternalIdXCommand.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/JobXCommand.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/JobsXCommand.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/KillXCommand.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/NotificationXCommand.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/PurgeXCommand.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ReRunXCommand.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ResumeXCommand.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SubmitMRXCommand.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SubmitPigXCommand.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SubmitXCommand.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SuspendXCommand.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/WfEndXCommand.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/WorkflowActionInfoXCommand.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/service/CallableQueueService.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/service/RecoveryService.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/util/XCallable.java
incubator/oozie/trunk/core/src/main/resources/oozie-default.xml
incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/TestCommand.java
incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/TestXCommand.java
incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionInputCheckXCommand.java
incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionStartXCommand.java
incubator/oozie/trunk/core/src/test/java/org/apache/oozie/service/TestCallableQueueService.java
incubator/oozie/trunk/release-log.txt
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/Command.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/Command.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/Command.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/Command.java Wed Jan 18 01:09:12 2012
@@ -603,4 +603,32 @@ public abstract class Command<T, S exten
return this.key;
}
+ /**
+ * Get the command lock key, returned as the entity key. [Not used] Exists only
+ * so that this class can implement XCallable. [To be deprecated]
+ *
+ * @return key
+ */
+ @Override
+ public String getEntityKey() {
+ return this.key;
+ }
+
+ /**
+ * set the mode of execution for the callable. True if in interrupt, false
+ * if not [to be deprecated]
+ */
+ public void setInterruptMode(boolean mode) {
+ }
+
+ /**
+ * [to be deprecated]
+ *
+ * @return the mode of execution. true if it is executed as an Interrupt,
+ * false otherwise
+ */
+ public boolean getInterruptMode() {
+ return false;
+ }
+
}
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/XCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/XCommand.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/XCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/XCommand.java Wed Jan 18 01:09:12 2012
@@ -68,6 +68,7 @@ public abstract class XCommand<T> implem
private long createdTime;
private MemoryLocks.LockToken lock;
private boolean used = false;
+ private boolean inInterrupt = false;
private Map<Long, List<XCommand<?>>> commandQueue;
protected boolean dryrun = false;
@@ -241,14 +242,15 @@ public abstract class XCommand<T> implem
eagerVerifyPrecondition();
try {
T ret = null;
- if (isLockRequired()) {
+ if (isLockRequired() && !this.inInterrupt) {
Instrumentation.Cron acquireLockCron = new Instrumentation.Cron();
acquireLockCron.start();
acquireLock();
acquireLockCron.stop();
instrumentation.addCron(INSTRUMENTATION_GROUP, getName() + ".acquireLock", acquireLockCron);
}
- if (!isLockRequired() || (isLockRequired() && lock != null)) {
+ if (!isLockRequired() || (isLockRequired() && lock != null) || this.inInterrupt) {
+ this.executeInterrupts();
LOG.debug("Load state for [{0}]", getEntityKey());
loadState();
LOG = XLog.resetPrefix(LOG);
@@ -307,6 +309,40 @@ public abstract class XCommand<T> implem
}
/**
+ * Check for the existence of interrupts for the same lock key.
+ * Execute them if any exist.
+ *
+ */
+ protected void executeInterrupts()
+ {
+ if(!this.inInterrupt && (this.getEntityKey() != null)) {
+ CallableQueueService callableQueueService = Services.get().get(CallableQueueService.class);
+ // getting all the list of interrupts to be executed
+ List <XCallable<?>> callables = callableQueueService.checkInterrupts(this.getEntityKey());
+
+ if (callables != null) {
+ // executing the list of interrupts in the given order of insertion in the list
+ for (XCallable<?> callable : callables) {
+ LOG.trace("executing callable [{0}]", callable.getName());
+ try {
+ // executing the callable in interrupt mode
+ callable.setInterruptMode(true);
+ callable.call();
+ LOG.trace("executed callable [{0}]", callable.getName());
+ }
+ catch (Exception ex) {
+ LOG.warn("exception callable [{0}], {1}", callable.getName(), ex.getMessage(), ex);
+ }
+ finally {
+ // resetting the interrupt mode to false after the command is executed
+ callable.setInterruptMode(false);
+ }
+ }
+ }
+ }
+ }
+
+ /**
* Return the time out when acquiring a lock.
* <p/>
* The value is loaded from the Oozie configuration, the property {link #DEFAULT_LOCK_TIMEOUT}.
@@ -334,7 +370,7 @@ public abstract class XCommand<T> implem
*
* @return the entity key for the command.
*/
- protected abstract String getEntityKey();
+ public abstract String getEntityKey();
/**
* Indicate if the the command requires to requeue itself if the lock is not acquired.
@@ -440,6 +476,22 @@ public abstract class XCommand<T> implem
}
/**
+ * set the mode of execution for the callable. True if in interrupt, false
+ * if not
+ */
+ public void setInterruptMode(boolean mode) {
+ this.inInterrupt = mode;
+ }
+
+ /**
+ * @return the mode of execution. true if it is executed as an Interrupt,
+ * false otherwise
+ */
+ public boolean getInterruptMode() {
+ return this.inInterrupt;
+ }
+
+ /**
* Get XLog log
*
* @return XLog
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleJobChangeXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleJobChangeXCommand.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleJobChangeXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleJobChangeXCommand.java Wed Jan 18 01:09:12 2012
@@ -189,7 +189,7 @@ public class BundleJobChangeXCommand ext
* @see org.apache.oozie.command.XCommand#getEntityKey()
*/
@Override
- protected String getEntityKey() {
+ public String getEntityKey() {
return this.jobId;
}
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleJobResumeXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleJobResumeXCommand.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleJobResumeXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleJobResumeXCommand.java Wed Jan 18 01:09:12 2012
@@ -133,7 +133,7 @@ public class BundleJobResumeXCommand ext
* @see org.apache.oozie.command.XCommand#getEntityKey()
*/
@Override
- protected String getEntityKey() {
+ public String getEntityKey() {
return bundleId;
}
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleJobSuspendXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleJobSuspendXCommand.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleJobSuspendXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleJobSuspendXCommand.java Wed Jan 18 01:09:12 2012
@@ -77,7 +77,7 @@ public class BundleJobSuspendXCommand ex
* @see org.apache.oozie.command.XCommand#getEntityKey()
*/
@Override
- protected String getEntityKey() {
+ public String getEntityKey() {
return this.jobId;
}
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleJobXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleJobXCommand.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleJobXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleJobXCommand.java Wed Jan 18 01:09:12 2012
@@ -60,7 +60,7 @@ public class BundleJobXCommand extends X
* @see org.apache.oozie.command.XCommand#getEntityKey()
*/
@Override
- protected String getEntityKey() {
+ public String getEntityKey() {
return this.id;
}
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleJobsXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleJobsXCommand.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleJobsXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleJobsXCommand.java Wed Jan 18 01:09:12 2012
@@ -64,7 +64,7 @@ public class BundleJobsXCommand extends
* @see org.apache.oozie.command.XCommand#getEntityKey()
*/
@Override
- protected String getEntityKey() {
+ public String getEntityKey() {
return null;
}
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleKillXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleKillXCommand.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleKillXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleKillXCommand.java Wed Jan 18 01:09:12 2012
@@ -55,7 +55,7 @@ public class BundleKillXCommand extends
* @see org.apache.oozie.command.XCommand#getEntityKey()
*/
@Override
- protected String getEntityKey() {
+ public String getEntityKey() {
return jobId;
}
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundlePauseXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundlePauseXCommand.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundlePauseXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundlePauseXCommand.java Wed Jan 18 01:09:12 2012
@@ -41,7 +41,7 @@ public class BundlePauseXCommand extends
* @see org.apache.oozie.command.XCommand#getEntityKey()
*/
@Override
- protected String getEntityKey() {
+ public String getEntityKey() {
return bundleJob.getId();
}
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundlePurgeXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundlePurgeXCommand.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundlePurgeXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundlePurgeXCommand.java Wed Jan 18 01:09:12 2012
@@ -99,7 +99,7 @@ public class BundlePurgeXCommand extends
* @see org.apache.oozie.command.XCommand#getEntityKey()
*/
@Override
- protected String getEntityKey() {
+ public String getEntityKey() {
return null;
}
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleRerunXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleRerunXCommand.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleRerunXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleRerunXCommand.java Wed Jan 18 01:09:12 2012
@@ -233,7 +233,7 @@ public class BundleRerunXCommand extends
* @see org.apache.oozie.command.XCommand#getEntityKey()
*/
@Override
- protected String getEntityKey() {
+ public String getEntityKey() {
return jobId;
}
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleStartXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleStartXCommand.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleStartXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleStartXCommand.java Wed Jan 18 01:09:12 2012
@@ -86,7 +86,7 @@ public class BundleStartXCommand extends
* @see org.apache.oozie.command.XCommand#getEntityKey()
*/
@Override
- protected String getEntityKey() {
+ public String getEntityKey() {
return jobId;
}
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleStatusUpdateXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleStatusUpdateXCommand.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleStatusUpdateXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleStatusUpdateXCommand.java Wed Jan 18 01:09:12 2012
@@ -89,7 +89,7 @@ public class BundleStatusUpdateXCommand
* @see org.apache.oozie.command.XCommand#getEntityKey()
*/
@Override
- protected String getEntityKey() {
+ public String getEntityKey() {
return this.bundleaction.getBundleActionId();
}
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleSubmitXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleSubmitXCommand.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleSubmitXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleSubmitXCommand.java Wed Jan 18 01:09:12 2012
@@ -194,7 +194,7 @@ public class BundleSubmitXCommand extend
* @see org.apache.oozie.command.XCommand#getEntityKey()
*/
@Override
- protected String getEntityKey() {
+ public String getEntityKey() {
return null;
}
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleUnpauseXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleUnpauseXCommand.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleUnpauseXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleUnpauseXCommand.java Wed Jan 18 01:09:12 2012
@@ -43,7 +43,7 @@ public class BundleUnpauseXCommand exten
* @see org.apache.oozie.command.XCommand#getEntityKey()
*/
@Override
- protected String getEntityKey() {
+ public String getEntityKey() {
return bundleJob.getId();
}
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionCheckXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionCheckXCommand.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionCheckXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionCheckXCommand.java Wed Jan 18 01:09:12 2012
@@ -115,7 +115,7 @@ public class CoordActionCheckXCommand ex
* @see org.apache.oozie.command.XCommand#getEntityKey()
*/
@Override
- protected String getEntityKey() {
+ public String getEntityKey() {
return actionId;
}
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionInfoXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionInfoXCommand.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionInfoXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionInfoXCommand.java Wed Jan 18 01:09:12 2012
@@ -64,7 +64,7 @@ public class CoordActionInfoXCommand ext
* @see org.apache.oozie.command.XCommand#getEntityKey()
*/
@Override
- protected String getEntityKey() {
+ public String getEntityKey() {
return null;
}
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java Wed Jan 18 01:09:12 2012
@@ -72,10 +72,12 @@ public class CoordActionInputCheckXComma
private CoordinatorActionBean coordAction = null;
private CoordinatorJobBean coordJob = null;
private JPAService jpaService = null;
+ private String jobId = null;
- public CoordActionInputCheckXCommand(String actionId) {
+ public CoordActionInputCheckXCommand(String actionId, String jobId) {
super("coord_action_input", "coord_action_input", 1);
this.actionId = ParamChecker.notEmpty(actionId, "actionId");
+ this.jobId = jobId;
}
/* (non-Javadoc)
@@ -90,7 +92,7 @@ public class CoordActionInputCheckXComma
Date nominalTime = coordAction.getNominalTime();
Date currentTime = new Date();
if (nominalTime.compareTo(currentTime) > 0) {
- queue(new CoordActionInputCheckXCommand(coordAction.getId()), Math.max((nominalTime.getTime() - currentTime
+ queue(new CoordActionInputCheckXCommand(coordAction.getId(), coordAction.getJobId()), Math.max((nominalTime.getTime() - currentTime
.getTime()), getCoordInputCheckRequeueInterval()));
// update lastModifiedTime
coordAction.setLastModifiedTime(new Date());
@@ -140,7 +142,7 @@ public class CoordActionInputCheckXComma
queue(new CoordActionTimeOutXCommand(coordAction), 100);
}
else {
- queue(new CoordActionInputCheckXCommand(coordAction.getId()), getCoordInputCheckRequeueInterval());
+ queue(new CoordActionInputCheckXCommand(coordAction.getId(), coordAction.getJobId()), getCoordInputCheckRequeueInterval());
}
}
coordAction.setLastModifiedTime(new Date());
@@ -457,8 +459,8 @@ public class CoordActionInputCheckXComma
* @see org.apache.oozie.command.XCommand#getEntityKey()
*/
@Override
- protected String getEntityKey() {
- return coordAction.getJobId();
+ public String getEntityKey() {
+ return this.jobId;
}
/* (non-Javadoc)
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionNotificationXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionNotificationXCommand.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionNotificationXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionNotificationXCommand.java Wed Jan 18 01:09:12 2012
@@ -108,7 +108,7 @@ public class CoordActionNotificationXCom
* @see org.apache.oozie.command.XCommand#getEntityKey()
*/
@Override
- protected String getEntityKey() {
+ public String getEntityKey() {
return actionBean.getId();
}
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionReadyXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionReadyXCommand.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionReadyXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionReadyXCommand.java Wed Jan 18 01:09:12 2012
@@ -109,7 +109,7 @@ public class CoordActionReadyXCommand ex
// change state of action to SUBMITTED
action.setStatus(CoordinatorAction.Status.SUBMITTED);
// queue action to start action
- queue(new CoordActionStartXCommand(action.getId(), user, authToken), 100);
+ queue(new CoordActionStartXCommand(action.getId(), user, authToken, action.getJobId()), 100);
try {
jpaService.execute(new org.apache.oozie.executor.jpa.CoordActionUpdateJPAExecutor(action));
}
@@ -127,7 +127,7 @@ public class CoordActionReadyXCommand ex
}
@Override
- protected String getEntityKey() {
+ public String getEntityKey() {
return jobId;
}
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionStartXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionStartXCommand.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionStartXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionStartXCommand.java Wed Jan 18 01:09:12 2012
@@ -64,13 +64,15 @@ public class CoordActionStartXCommand ex
private String authToken = null;
private CoordinatorActionBean coordAction = null;
private JPAService jpaService = null;
+ private String jobId = null;
- public CoordActionStartXCommand(String id, String user, String token) {
+ public CoordActionStartXCommand(String id, String user, String token, String jobId) {
//super("coord_action_start", "coord_action_start", 1, XLog.OPS);
super("coord_action_start", "coord_action_start", 1);
this.actionId = ParamChecker.notEmpty(id, "id");
this.user = ParamChecker.notEmpty(user, "user");
this.authToken = ParamChecker.notEmpty(token, "token");
+ this.jobId = jobId;
}
/**
@@ -235,8 +237,8 @@ public class CoordActionStartXCommand ex
}
@Override
- protected String getEntityKey() {
- return coordAction.getJobId();
+ public String getEntityKey() {
+ return this.jobId;
}
@Override
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionTimeOutXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionTimeOutXCommand.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionTimeOutXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionTimeOutXCommand.java Wed Jan 18 01:09:12 2012
@@ -67,7 +67,7 @@ public class CoordActionTimeOutXCommand
* @see org.apache.oozie.command.XCommand#getEntityKey()
*/
@Override
- protected String getEntityKey() {
+ public String getEntityKey() {
return actionBean.getJobId();
}
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdateXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdateXCommand.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdateXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdateXCommand.java Wed Jan 18 01:09:12 2012
@@ -120,7 +120,7 @@ public class CoordActionUpdateXCommand e
* @see org.apache.oozie.command.XCommand#getEntityKey()
*/
@Override
- protected String getEntityKey() {
+ public String getEntityKey() {
return coordAction.getJobId();
}
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordChangeXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordChangeXCommand.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordChangeXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordChangeXCommand.java Wed Jan 18 01:09:12 2012
@@ -323,7 +323,7 @@ public class CoordChangeXCommand extends
* @see org.apache.oozie.command.XCommand#getEntityKey()
*/
@Override
- protected String getEntityKey() {
+ public String getEntityKey() {
return this.jobId;
}
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordJobXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordJobXCommand.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordJobXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordJobXCommand.java Wed Jan 18 01:09:12 2012
@@ -88,7 +88,7 @@ public class CoordJobXCommand extends Co
* @see org.apache.oozie.command.XCommand#getEntityKey()
*/
@Override
- protected String getEntityKey() {
+ public String getEntityKey() {
return this.id;
}
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordJobsXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordJobsXCommand.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordJobsXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordJobsXCommand.java Wed Jan 18 01:09:12 2012
@@ -56,7 +56,7 @@ public class CoordJobsXCommand extends C
* @see org.apache.oozie.command.XCommand#getEntityKey()
*/
@Override
- protected String getEntityKey() {
+ public String getEntityKey() {
return null;
}
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordKillXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordKillXCommand.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordKillXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordKillXCommand.java Wed Jan 18 01:09:12 2012
@@ -61,7 +61,7 @@ public class CoordKillXCommand extends K
}
@Override
- protected String getEntityKey() {
+ public String getEntityKey() {
return this.jobId;
}
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java Wed Jan 18 01:09:12 2012
@@ -108,8 +108,8 @@ public class CoordMaterializeTransitionX
* @see org.apache.oozie.command.XCommand#getEntityKey()
*/
@Override
- protected String getEntityKey() {
- return jobId;
+ public String getEntityKey() {
+ return this.jobId;
}
@Override
@@ -349,7 +349,7 @@ public class CoordMaterializeTransitionX
// TODO: time 100s should be configurable
queue(new CoordActionNotificationXCommand(actionBean), 100);
- queue(new CoordActionInputCheckXCommand(actionBean.getId()), 100);
+ queue(new CoordActionInputCheckXCommand(actionBean.getId(), actionBean.getJobId()), 100);
}
private void writeActionRegistration(String actionXml, CoordinatorActionBean actionBean) throws Exception {
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordPauseXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordPauseXCommand.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordPauseXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordPauseXCommand.java Wed Jan 18 01:09:12 2012
@@ -44,7 +44,7 @@ public class CoordPauseXCommand extends
* @see org.apache.oozie.command.XCommand#getEntityKey()
*/
@Override
- protected String getEntityKey() {
+ public String getEntityKey() {
return coordJob.getId();
}
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordPurgeXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordPurgeXCommand.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordPurgeXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordPurgeXCommand.java Wed Jan 18 01:09:12 2012
@@ -77,7 +77,7 @@ public class CoordPurgeXCommand extends
* @see org.apache.oozie.command.XCommand#getEntityKey()
*/
@Override
- protected String getEntityKey() {
+ public String getEntityKey() {
return null;
}
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordRerunXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordRerunXCommand.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordRerunXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordRerunXCommand.java Wed Jan 18 01:09:12 2012
@@ -374,7 +374,7 @@ public class CoordRerunXCommand extends
* @see org.apache.oozie.command.XCommand#getEntityKey()
*/
@Override
- protected String getEntityKey() {
+ public String getEntityKey() {
return jobId;
}
@@ -460,7 +460,7 @@ public class CoordRerunXCommand extends
updateAction(coordJob, coordAction, actionXml);
queue(new CoordActionNotificationXCommand(coordAction), 100);
- queue(new CoordActionInputCheckXCommand(coordAction.getId()), 100);
+ queue(new CoordActionInputCheckXCommand(coordAction.getId(), coordAction.getJobId()), 100);
}
}
else {
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordResumeXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordResumeXCommand.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordResumeXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordResumeXCommand.java Wed Jan 18 01:09:12 2012
@@ -62,7 +62,7 @@ public class CoordResumeXCommand extends
* @see org.apache.oozie.command.XCommand#getEntityKey()
*/
@Override
- protected String getEntityKey() {
+ public String getEntityKey() {
return jobId;
}
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordSubmitXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordSubmitXCommand.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordSubmitXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordSubmitXCommand.java Wed Jan 18 01:09:12 2012
@@ -1034,7 +1034,7 @@ public class CoordSubmitXCommand extends
* @see org.apache.oozie.command.XCommand#getEntityKey()
*/
@Override
- protected String getEntityKey() {
+ public String getEntityKey() {
return null;
}
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordSuspendXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordSuspendXCommand.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordSuspendXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordSuspendXCommand.java Wed Jan 18 01:09:12 2012
@@ -63,7 +63,7 @@ public class CoordSuspendXCommand extend
* @see org.apache.oozie.command.XCommand#getEntityKey()
*/
@Override
- protected String getEntityKey() {
+ public String getEntityKey() {
return jobId;
}
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordUnpauseXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordUnpauseXCommand.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordUnpauseXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordUnpauseXCommand.java Wed Jan 18 01:09:12 2012
@@ -47,7 +47,7 @@ public class CoordUnpauseXCommand extend
* @see org.apache.oozie.command.XCommand#getEntityKey()
*/
@Override
- protected String getEntityKey() {
+ public String getEntityKey() {
return coordJob.getId();
}
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/SLAEventsXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/SLAEventsXCommand.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/SLAEventsXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/SLAEventsXCommand.java Wed Jan 18 01:09:12 2012
@@ -57,7 +57,7 @@ public class SLAEventsXCommand extends X
* @see org.apache.oozie.command.XCommand#getEntityKey()
*/
@Override
- protected String getEntityKey() {
+ public String getEntityKey() {
return Long.toString(seqId);
}
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionCheckXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionCheckXCommand.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionCheckXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionCheckXCommand.java Wed Jan 18 01:09:12 2012
@@ -121,7 +121,7 @@ public class ActionCheckXCommand extends
}
@Override
- protected String getEntityKey() {
+ public String getEntityKey() {
return this.jobId;
}
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionEndXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionEndXCommand.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionEndXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionEndXCommand.java Wed Jan 18 01:09:12 2012
@@ -72,7 +72,7 @@ public class ActionEndXCommand extends A
}
@Override
- protected String getEntityKey() {
+ public String getEntityKey() {
return this.jobId;
}
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionKillXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionKillXCommand.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionKillXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionKillXCommand.java Wed Jan 18 01:09:12 2012
@@ -67,7 +67,7 @@ public class ActionKillXCommand extends
}
@Override
- protected String getEntityKey() {
+ public String getEntityKey() {
return this.jobId;
}
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java Wed Jan 18 01:09:12 2012
@@ -78,7 +78,7 @@ public class ActionStartXCommand extends
}
@Override
- protected String getEntityKey() {
+ public String getEntityKey() {
return this.jobId;
}
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/CompletedActionXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/CompletedActionXCommand.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/CompletedActionXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/CompletedActionXCommand.java Wed Jan 18 01:09:12 2012
@@ -104,11 +104,11 @@ public class CompletedActionXCommand ext
/*
* (non-Javadoc)
- *
+ *
* @see org.apache.oozie.command.XCommand#getEntityKey()
*/
@Override
- protected String getEntityKey() {
+ public String getEntityKey() {
return null;
}
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/DefinitionXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/DefinitionXCommand.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/DefinitionXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/DefinitionXCommand.java Wed Jan 18 01:09:12 2012
@@ -43,7 +43,7 @@ public class DefinitionXCommand extends
}
@Override
- protected String getEntityKey() {
+ public String getEntityKey() {
return this.jobId;
}
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ExternalIdXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ExternalIdXCommand.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ExternalIdXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ExternalIdXCommand.java Wed Jan 18 01:09:12 2012
@@ -40,7 +40,7 @@ public class ExternalIdXCommand extends
}
@Override
- protected String getEntityKey() {
+ public String getEntityKey() {
return this.externalId;
}
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/JobXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/JobXCommand.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/JobXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/JobXCommand.java Wed Jan 18 01:09:12 2012
@@ -90,7 +90,7 @@ public class JobXCommand extends Workflo
* @see org.apache.oozie.command.XCommand#getEntityKey()
*/
@Override
- protected String getEntityKey() {
+ public String getEntityKey() {
return this.id;
}
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/JobsXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/JobsXCommand.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/JobsXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/JobsXCommand.java Wed Jan 18 01:09:12 2012
@@ -77,7 +77,7 @@ public class JobsXCommand extends Workfl
* @see org.apache.oozie.command.XCommand#getEntityKey()
*/
@Override
- protected String getEntityKey() {
+ public String getEntityKey() {
return null;
}
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/KillXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/KillXCommand.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/KillXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/KillXCommand.java Wed Jan 18 01:09:12 2012
@@ -67,7 +67,7 @@ public class KillXCommand extends Workfl
}
@Override
- protected String getEntityKey() {
+ public String getEntityKey() {
return this.wfId;
}
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/NotificationXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/NotificationXCommand.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/NotificationXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/NotificationXCommand.java Wed Jan 18 01:09:12 2012
@@ -75,7 +75,7 @@ public class NotificationXCommand extend
}
@Override
- protected String getEntityKey() {
+ public String getEntityKey() {
return url;
}
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/PurgeXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/PurgeXCommand.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/PurgeXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/PurgeXCommand.java Wed Jan 18 01:09:12 2012
@@ -68,7 +68,7 @@ public class PurgeXCommand extends Workf
}
@Override
- protected String getEntityKey() {
+ public String getEntityKey() {
return null;
}
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ReRunXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ReRunXCommand.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ReRunXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ReRunXCommand.java Wed Jan 18 01:09:12 2012
@@ -312,7 +312,7 @@ public class ReRunXCommand extends Workf
* @see org.apache.oozie.command.XCommand#getEntityKey()
*/
@Override
- protected String getEntityKey() {
+ public String getEntityKey() {
return this.jobId;
}
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ResumeXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ResumeXCommand.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ResumeXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ResumeXCommand.java Wed Jan 18 01:09:12 2012
@@ -120,7 +120,7 @@ public class ResumeXCommand extends Work
}
@Override
- protected String getEntityKey() {
+ public String getEntityKey() {
return id;
}
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java Wed Jan 18 01:09:12 2012
@@ -86,7 +86,7 @@ public class SignalXCommand extends Work
}
@Override
- protected String getEntityKey() {
+ public String getEntityKey() {
return this.jobId;
}
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SubmitMRXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SubmitMRXCommand.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SubmitMRXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SubmitMRXCommand.java Wed Jan 18 01:09:12 2012
@@ -157,7 +157,7 @@ public class SubmitMRXCommand extends Su
}
@Override
- protected String getEntityKey() {
+ public String getEntityKey() {
return null;
}
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SubmitPigXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SubmitPigXCommand.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SubmitPigXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SubmitPigXCommand.java Wed Jan 18 01:09:12 2012
@@ -162,7 +162,7 @@ public class SubmitPigXCommand extends S
}
@Override
- protected String getEntityKey() {
+ public String getEntityKey() {
return null;
}
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SubmitXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SubmitXCommand.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SubmitXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SubmitXCommand.java Wed Jan 18 01:09:12 2012
@@ -265,7 +265,7 @@ public class SubmitXCommand extends Work
}
@Override
- protected String getEntityKey() {
+ public String getEntityKey() {
return null;
}
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SuspendXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SuspendXCommand.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SuspendXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SuspendXCommand.java Wed Jan 18 01:09:12 2012
@@ -166,7 +166,7 @@ public class SuspendXCommand extends Wor
* @see org.apache.oozie.command.XCommand#getEntityKey()
*/
@Override
- protected String getEntityKey() {
+ public String getEntityKey() {
return this.wfid;
}
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/WfEndXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/WfEndXCommand.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/WfEndXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/WfEndXCommand.java Wed Jan 18 01:09:12 2012
@@ -87,7 +87,7 @@ public class WfEndXCommand extends Workf
}
@Override
- protected String getEntityKey() {
+ public String getEntityKey() {
return job.getId();
}
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/WorkflowActionInfoXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/WorkflowActionInfoXCommand.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/WorkflowActionInfoXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/WorkflowActionInfoXCommand.java Wed Jan 18 01:09:12 2012
@@ -59,7 +59,7 @@ public class WorkflowActionInfoXCommand
* @see org.apache.oozie.command.XCommand#getEntityKey()
*/
@Override
- protected String getEntityKey() {
+ public String getEntityKey() {
return null;
}
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/service/CallableQueueService.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/service/CallableQueueService.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/service/CallableQueueService.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/service/CallableQueueService.java Wed Jan 18 01:09:12 2012
@@ -18,6 +18,7 @@
package org.apache.oozie.service;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
@@ -31,7 +32,6 @@ import java.util.concurrent.RejectedExec
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.client.OozieClient.SYSTEM_MODE;
@@ -79,6 +79,8 @@ public class CallableQueueService implem
public static final String CONF_THREADS = CONF_PREFIX + "threads";
public static final String CONF_CALLABLE_CONCURRENCY = CONF_PREFIX + "callable.concurrency";
public static final String CONF_CALLABLE_NEXT_ELIGIBLE = CONF_PREFIX + "callable.next.eligible";
+ public static final String CONF_CALLABLE_INTERRUPT_TYPES = CONF_PREFIX + "InterruptTypes";
+ public static final String CONF_CALLABLE_INTERRUPT_MAP_MAX_SIZE = CONF_PREFIX + "InterruptMapMaxSize";
public static final int CONCURRENCY_DELAY = 500;
@@ -88,6 +90,12 @@ public class CallableQueueService implem
private final Map<String, Date> uniqueCallables = new ConcurrentHashMap<String, Date>();
+ private final ConcurrentHashMap<String, List<XCallable<?>>> interruptCommandsMap = new ConcurrentHashMap<String, List<XCallable<?>>>();
+
+ private final HashSet<String> interruptTypes = new HashSet<String>();
+
+ private int interruptMapMaxSize;
+
private int maxCallableConcurrency;
private boolean callableBegin(XCallable<?> callable) {
@@ -241,7 +249,6 @@ public class CallableQueueService implem
uniqueCallables.remove(callable.getKey());
}
}
-
}
class CompositeCallable implements XCallable<Void> {
@@ -282,6 +289,11 @@ public class CallableQueueService implem
}
@Override
+ public String getEntityKey() {
+ return "#composite#" + callables.get(0).getEntityKey();
+ }
+
+ @Override
public int getPriority() {
return priority;
}
@@ -291,6 +303,19 @@ public class CallableQueueService implem
return createdTime;
}
+ @Override
+ public void setInterruptMode(boolean mode) {
+ }
+
+ @Override
+ public boolean getInterruptMode() {
+ return false;
+ }
+
+ public List<XCallable<?>> getCallables() {
+ return this.callables;
+ }
+
public Void call() throws Exception {
XLog log = XLog.getLog(getClass());
@@ -379,14 +404,12 @@ public class CallableQueueService implem
uniqueCallables.remove(callable.getKey());
}
}
-
}
private XLog log = XLog.getLog(getClass());
private int queueSize;
private PriorityDelayQueue<CallableWrapper> queue;
- private AtomicLong delayQueueExecCounter = new AtomicLong(0);
private ThreadPoolExecutor executor;
private Instrumentation instrumentation;
@@ -414,7 +437,7 @@ public class CallableQueueService implem
* @param services services instance.
*/
@Override
- @SuppressWarnings("unchecked")
+ @SuppressWarnings({ "unchecked", "rawtypes" })
public void init(Services services) {
Configuration conf = services.getConf();
@@ -422,6 +445,10 @@ public class CallableQueueService implem
int threads = conf.getInt(CONF_THREADS, 10);
boolean callableNextEligible = conf.getBoolean(CONF_CALLABLE_NEXT_ELIGIBLE, true);
+ for (String type : conf.getStringCollection(CONF_CALLABLE_INTERRUPT_TYPES)) {
+ interruptTypes.add(type);
+ }
+
if (!callableNextEligible) {
queue = new PriorityDelayQueue<CallableWrapper>(3, 1000 * 30, TimeUnit.MILLISECONDS, queueSize) {
@Override
@@ -431,11 +458,12 @@ public class CallableQueueService implem
};
}
else {
- // If the head of this queue has already reached max concurrency, continuously find next one
- // which has not yet reach max concurrency.Overrided method 'eligibleToPoll' to check if the
+ // If the head of this queue has already reached max concurrency,
+ // continuously find the next one
+ // which has not yet reached max concurrency. The overridden method
+ // 'eligibleToPoll' checks whether the
// element of this queue has reached the maximum concurrency.
- queue = new PollablePriorityDelayQueue<CallableWrapper>(3, 1000 * 30, TimeUnit.MILLISECONDS,
- queueSize) {
+ queue = new PollablePriorityDelayQueue<CallableWrapper>(3, 1000 * 30, TimeUnit.MILLISECONDS, queueSize) {
@Override
protected void debug(String msgTemplate, Object... msgArgs) {
log.trace(msgTemplate, msgArgs);
@@ -455,6 +483,8 @@ public class CallableQueueService implem
};
}
+ interruptMapMaxSize = conf.getInt(CONF_CALLABLE_INTERRUPT_MAP_MAX_SIZE, 100);
+
// IMPORTANT: The ThreadPoolExecutor does not always the execute
// commands out of the queue, there are
// certain conditions where commands are pushed directly to a thread.
@@ -568,7 +598,6 @@ public class CallableQueueService implem
* @return <code>true</code> if the callables were queued, <code>false</code> if the queue is full and the callables
* were not queued.
*/
- @SuppressWarnings("unchecked")
public boolean queueSerial(List<? extends XCallable<?>> callables) {
return queueSerial(callables, 0);
}
@@ -578,8 +607,8 @@ public class CallableQueueService implem
*
* @param callable callable to queue for delayed execution
* @param delay time, in milliseconds, that the callable should be delayed.
- * @return <code>true</code> if the callable was queued, <code>false</code> if the queue is full and the callable
- * was not queued.
+ * @return <code>true</code> if the callable was queued, <code>false</code>
+ * if the queue is full and the callable was not queued.
*/
public synchronized boolean queue(XCallable<?> callable, long delay) {
if (callable == null) {
@@ -590,6 +619,7 @@ public class CallableQueueService implem
log.warn("[queue] System is in SAFEMODE. Hence no callable is queued. current queue size " + queue.size());
}
else {
+ checkInterruptTypes(callable);
queued = queue(new CallableWrapper(callable, delay), false);
if (queued) {
incrCounter(INSTR_QUEUED_COUNTER, 1);
@@ -613,7 +643,6 @@ public class CallableQueueService implem
* @return <code>true</code> if the callables were queued, <code>false</code> if the queue is full and the callables
* were not queued.
*/
- @SuppressWarnings("unchecked")
public synchronized boolean queueSerial(List<? extends XCallable<?>> callables, long delay) {
boolean queued;
if (callables == null || callables.size() == 0) {
@@ -653,6 +682,62 @@ public class CallableQueueService implem
}
/**
+ * Check the interrupt map for interrupt commands registered under the given
+ * lock key. If any exist, their list is removed from the map and returned;
+ * otherwise (or if the lock key is null) null is returned.
+ */
+ public List<XCallable<?>> checkInterrupts(String lockKey) {
+
+ if (lockKey != null) {
+ return interruptCommandsMap.remove(lockKey);
+ }
+ return null;
+ }
+
+ /**
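+ * Check whether the callable is of an interrupt type and, if so, insert it
+ * into the interrupt map. The callables of a composite callable are checked individually.
+ *
+ * @param callable the callable (or composite callable) to check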
+ */
+ public void checkInterruptTypes(XCallable<?> callable) {
+ if ((callable instanceof CompositeCallable) && (((CompositeCallable) callable).getCallables() != null)) {
+ for (XCallable<?> singleCallable : ((CompositeCallable) callable).getCallables()) {
+ if (interruptTypes.contains(singleCallable.getType())) {
+ insertCallableIntoInterruptMap(singleCallable);
+ }
+ }
+ }
+ else if (interruptTypes.contains(callable.getType())) {
+ insertCallableIntoInterruptMap(callable);
+ }
+ }
+
+ /**
+ * Insert a callable into the interrupt command map: it is appended to the
+ * list for its entity key (created if absent), unless the map is at max size.
+ *
+ * @param callable the interrupt callable to insert
+ */
+ public void insertCallableIntoInterruptMap(XCallable<?> callable) {
+ if (interruptCommandsMap.size() < interruptMapMaxSize) {
+ List<XCallable<?>> newList = Collections.synchronizedList(new ArrayList<XCallable<?>>());
+ List<XCallable<?>> interruptList = interruptCommandsMap.putIfAbsent(callable.getEntityKey(), newList);
+ if (interruptList == null) {
+ interruptList = newList;
+ }
+ interruptList.add(callable);
+ log.trace("Inserting an interrupt element [{1}] to the interrupt map", interruptCommandsMap.size(),
+ callable.toString());
+ }
+ else {
+ log.warn(
+ "The interrupt map reached max size of [{0}], an interrupt element [{1}] will not added to the map [{1}]",
+ interruptCommandsMap.size(), callable.toString());
+ }
+ }
+
+ /**
* Get the list of strings of queue dump
*
* @return the list of string that representing each CallableWrapper
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/service/RecoveryService.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/service/RecoveryService.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/service/RecoveryService.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/service/RecoveryService.java Wed Jan 18 01:09:12 2012
@@ -227,7 +227,7 @@ public class RecoveryService implements
INSTR_RECOVERED_COORD_ACTIONS_COUNTER, 1);
if (caction.getStatus() == CoordinatorActionBean.Status.WAITING) {
if (useXCommand) {
- queueCallable(new CoordActionInputCheckXCommand(caction.getId()));
+ queueCallable(new CoordActionInputCheckXCommand(caction.getId(), caction.getJobId()));
} else {
queueCallable(new CoordActionInputCheckCommand(caction.getId()));
}
@@ -238,8 +238,7 @@ public class RecoveryService implements
CoordinatorJobBean coordJob = jpaService.execute(new CoordJobGetJPAExecutor(caction.getJobId()));
if (useXCommand) {
- queueCallable(new CoordActionStartXCommand(caction.getId(), coordJob.getUser(), coordJob
- .getAuthToken()));
+ queueCallable(new CoordActionStartXCommand(caction.getId(), coordJob.getUser(), coordJob.getAuthToken(), caction.getJobId()));
} else {
queueCallable(new CoordActionStartCommand(caction.getId(), coordJob.getUser(), coordJob
.getAuthToken()));
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/util/XCallable.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/util/XCallable.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/util/XCallable.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/util/XCallable.java Wed Jan 18 01:09:12 2012
@@ -62,4 +62,23 @@ public interface XCallable<T> extends Ca
*/
public String getKey();
+ /**
+ * Return the lock key of the callable
+ *
+ * @return the callable Lock key
+ */
+ public String getEntityKey();
+
+ /**
+ * Set the mode of execution for the callable: true if it is executed as an
+ * interrupt, false otherwise
+ */
+ public void setInterruptMode(boolean mode);
+
+ /**
+ * @return the mode of execution. true if it is executed as an Interrupt,
+ * false otherwise
+ */
+ public boolean getInterruptMode();
+
}
Modified: incubator/oozie/trunk/core/src/main/resources/oozie-default.xml
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/resources/oozie-default.xml?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/resources/oozie-default.xml (original)
+++ incubator/oozie/trunk/core/src/main/resources/oozie-default.xml Wed Jan 18 01:09:12 2012
@@ -270,7 +270,34 @@
Oozie continuously find next one which has not yet reach max concurrency.
</description>
</property>
-
+
+ <property>
+ <name>oozie.service.CallableQueueService.InterruptMapMaxSize</name>
+ <value>500</value>
+ <description>
+ Maximum size of the interrupt map; an interrupt element will not be inserted into the map once this size has been reached.
+ </description>
+ </property>
+
+ <property>
+ <name>oozie.service.CallableQueueService.InterruptTypes</name>
+ <value>
+ kill,
+ resume,
+ suspend,
+ bundle_kill,
+ bundle_resume,
+ bundle_suspend,
+ coord_kill,
+ coord_change,
+ coord_resume,
+ coord_suspend
+ </value>
+ <description>
+ The types of XCommands that are considered to be of interrupt type.
+ </description>
+ </property>
+
<!-- CoordMaterializeTriggerService -->
<property>
Modified: incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/TestCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/TestCommand.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/TestCommand.java (original)
+++ incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/TestCommand.java Wed Jan 18 01:09:12 2012
@@ -73,6 +73,15 @@ public class TestCommand extends XTestCa
return sb.toString();
}
+ @Override
+ public void setInterruptMode(boolean mode) {
+ }
+
+ @Override
+ public boolean getInterruptMode() {
+ return false;
+ }
+
public Void call() throws Exception {
EXECUTED.add(name);
return null;
@@ -83,6 +92,11 @@ public class TestCommand extends XTestCa
return this.key;
}
+ @Override
+ public String getEntityKey() {
+ return null;
+ }
+
}
private static class MyCommand extends Command<Object, WorkflowStore> {
Modified: incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/TestXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/TestXCommand.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/TestXCommand.java (original)
+++ incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/TestXCommand.java Wed Jan 18 01:09:12 2012
@@ -75,7 +75,7 @@ public class TestXCommand extends XTestC
}
@Override
- protected String getEntityKey() {
+ public String getEntityKey() {
return "key";
}
Modified: incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionInputCheckXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionInputCheckXCommand.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionInputCheckXCommand.java (original)
+++ incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionInputCheckXCommand.java Wed Jan 18 01:09:12 2012
@@ -66,8 +66,8 @@ public class TestCoordActionInputCheckXC
long executed = 0;
int wait;
- public MyCoordActionInputCheckXCommand(String actionId, int wait) {
- super(actionId);
+ public MyCoordActionInputCheckXCommand(String actionId, int wait, String entityKey) {
+ super(actionId, entityKey);
this.wait = wait;
}
@@ -106,9 +106,9 @@ public class TestCoordActionInputCheckXC
createDir(getTestCaseDir() + "/2009/15/");
createDir(getTestCaseDir() + "/2009/08/");
- final MyCoordActionInputCheckXCommand callable1 = new MyCoordActionInputCheckXCommand(action1.getId(), 100);
- final MyCoordActionInputCheckXCommand callable2 = new MyCoordActionInputCheckXCommand(action1.getId(), 100);
- final MyCoordActionInputCheckXCommand callable3 = new MyCoordActionInputCheckXCommand(action1.getId(), 100);
+ final MyCoordActionInputCheckXCommand callable1 = new MyCoordActionInputCheckXCommand(action1.getId(), 100, "1");
+ final MyCoordActionInputCheckXCommand callable2 = new MyCoordActionInputCheckXCommand(action1.getId(), 100, "2");
+ final MyCoordActionInputCheckXCommand callable3 = new MyCoordActionInputCheckXCommand(action1.getId(), 100, "3");
List<MyCoordActionInputCheckXCommand> callables = Arrays.asList(callable1, callable2, callable3);
@@ -137,7 +137,7 @@ public class TestCoordActionInputCheckXC
new CoordMaterializeTransitionXCommand(job.getId(), 3600).call();
createDir(getTestCaseDir() + "/2009/29/");
createDir(getTestCaseDir() + "/2009/15/");
- new CoordActionInputCheckXCommand(job.getId() + "@1").call();
+ new CoordActionInputCheckXCommand(job.getId() + "@1", job.getId()).call();
checkCoordAction(job.getId() + "@1");
}
@@ -162,7 +162,7 @@ public class TestCoordActionInputCheckXC
Services.get().getConf().setLong(CoordActionInputCheckXCommand.CONF_COORD_INPUT_CHECK_REQUEUE_INTERVAL,
testedValue);
- CoordActionInputCheckXCommand caicc = new CoordActionInputCheckXCommand(job.getId() + "@1");
+ CoordActionInputCheckXCommand caicc = new CoordActionInputCheckXCommand(job.getId() + "@1", job.getId());
long effectiveValue = caicc.getCoordInputCheckRequeueInterval();
// Verify if two values are same.
Modified: incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionStartXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionStartXCommand.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionStartXCommand.java (original)
+++ incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionStartXCommand.java Wed Jan 18 01:09:12 2012
@@ -77,7 +77,7 @@ public class TestCoordActionStartXComman
public void testActionStartCommand() throws IOException, JPAExecutorException, CommandException {
String actionId = new Date().getTime() + "-COORD-ActionStartCommand-C@1";
addRecordToActionTable(actionId, 1);
- new CoordActionStartXCommand(actionId, "me", "mytoken").call();
+ new CoordActionStartXCommand(actionId, "me", "mytoken", "myjob").call();
checkCoordAction(actionId);
}
@@ -97,7 +97,7 @@ public class TestCoordActionStartXComman
CoordinatorAction.Status.SUBMITTED, "coord-action-start-escape-strings.xml", 0);
String actionId = action.getId();
- new CoordActionStartXCommand(actionId, getTestUser(), "undef").call();
+ new CoordActionStartXCommand(actionId, getTestUser(), "undef","myjob").call();
final JPAService jpaService = Services.get().get(JPAService.class);
action = jpaService.execute(new CoordActionGetJPAExecutor(actionId));