You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by an...@apache.org on 2012/01/18 02:09:14 UTC

svn commit: r1232709 [1/2] - in /incubator/oozie/trunk: ./ core/src/main/java/org/apache/oozie/command/ core/src/main/java/org/apache/oozie/command/bundle/ core/src/main/java/org/apache/oozie/command/coord/ core/src/main/java/org/apache/oozie/command/w...

Author: angeloh
Date: Wed Jan 18 01:09:12 2012
New Revision: 1232709

URL: http://svn.apache.org/viewvc?rev=1232709&view=rev
Log:
OOZIE-591: Oozie continues to materialize new actions after end date modification (Mohamed Battisha vis Angelo)

Modified:
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/Command.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/XCommand.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleJobChangeXCommand.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleJobResumeXCommand.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleJobSuspendXCommand.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleJobXCommand.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleJobsXCommand.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleKillXCommand.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundlePauseXCommand.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundlePurgeXCommand.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleRerunXCommand.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleStartXCommand.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleStatusUpdateXCommand.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleSubmitXCommand.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleUnpauseXCommand.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionCheckXCommand.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionInfoXCommand.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionNotificationXCommand.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionReadyXCommand.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionStartXCommand.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionTimeOutXCommand.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdateXCommand.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordChangeXCommand.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordJobXCommand.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordJobsXCommand.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordKillXCommand.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordPauseXCommand.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordPurgeXCommand.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordRerunXCommand.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordResumeXCommand.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordSubmitXCommand.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordSuspendXCommand.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordUnpauseXCommand.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/SLAEventsXCommand.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionCheckXCommand.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionEndXCommand.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionKillXCommand.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/CompletedActionXCommand.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/DefinitionXCommand.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ExternalIdXCommand.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/JobXCommand.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/JobsXCommand.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/KillXCommand.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/NotificationXCommand.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/PurgeXCommand.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ReRunXCommand.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ResumeXCommand.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SubmitMRXCommand.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SubmitPigXCommand.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SubmitXCommand.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SuspendXCommand.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/WfEndXCommand.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/WorkflowActionInfoXCommand.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/service/CallableQueueService.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/service/RecoveryService.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/util/XCallable.java
    incubator/oozie/trunk/core/src/main/resources/oozie-default.xml
    incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/TestCommand.java
    incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/TestXCommand.java
    incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionInputCheckXCommand.java
    incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionStartXCommand.java
    incubator/oozie/trunk/core/src/test/java/org/apache/oozie/service/TestCallableQueueService.java
    incubator/oozie/trunk/release-log.txt

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/Command.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/Command.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/Command.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/Command.java Wed Jan 18 01:09:12 2012
@@ -603,4 +603,32 @@ public abstract class Command<T, S exten
         return this.key;
     }
 
+    /**
+     * Get command lock key returning the key as an entity key, [not used] Just
+     * to be able to implement XCallable [to be deprecated]
+     *
+     * @return key
+     */
+    @Override
+    public String getEntityKey() {
+        return this.key;
+    }
+
+    /**
+     * set the mode of execution for the callable. True if in interrupt, false
+     * if not [to be deprecated]
+     */
+    public void setInterruptMode(boolean mode) {
+    }
+
+    /**
+     * [to be deprecated]
+     *
+     * @return the mode of execution. true if it is executed as an Interrupt,
+     *         false otherwise
+     */
+    public boolean getInterruptMode() {
+        return false;
+    }
+
 }

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/XCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/XCommand.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/XCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/XCommand.java Wed Jan 18 01:09:12 2012
@@ -68,6 +68,7 @@ public abstract class XCommand<T> implem
     private long createdTime;
     private MemoryLocks.LockToken lock;
     private boolean used = false;
+    private boolean inInterrupt = false;
 
     private Map<Long, List<XCommand<?>>> commandQueue;
     protected boolean dryrun = false;
@@ -241,14 +242,15 @@ public abstract class XCommand<T> implem
             eagerVerifyPrecondition();
             try {
                 T ret = null;
-                if (isLockRequired()) {
+                if (isLockRequired() && !this.inInterrupt) {
                     Instrumentation.Cron acquireLockCron = new Instrumentation.Cron();
                     acquireLockCron.start();
                     acquireLock();
                     acquireLockCron.stop();
                     instrumentation.addCron(INSTRUMENTATION_GROUP, getName() + ".acquireLock", acquireLockCron);
                 }
-                if (!isLockRequired() || (isLockRequired() && lock != null)) {
+                if (!isLockRequired() || (isLockRequired() && lock != null) || this.inInterrupt) {
+                    this.executeInterrupts();
                     LOG.debug("Load state for [{0}]", getEntityKey());
                     loadState();
                     LOG = XLog.resetPrefix(LOG);
@@ -307,6 +309,40 @@ public abstract class XCommand<T> implem
     }
 
     /**
+     * Check for the existence of interrupts for the same lock key
+     * Execute them if exist.
+     *
+     */
+    protected void executeInterrupts()
+    {
+        if(!this.inInterrupt && (this.getEntityKey() != null)) {
+            CallableQueueService callableQueueService = Services.get().get(CallableQueueService.class);
+            // getting all the list of interrupts to be executed
+            List <XCallable<?>> callables = callableQueueService.checkInterrupts(this.getEntityKey());
+
+            if (callables != null) {
+                // executing the list of interrupts in the given order of insertion in the list
+                for (XCallable<?> callable : callables) {
+                    LOG.trace("executing callable [{0}]", callable.getName());
+                    try {
+                        // executing the callable in interrupt mode
+                        callable.setInterruptMode(true);
+                        callable.call();
+                        LOG.trace("executed callable [{0}]", callable.getName());
+                    }
+                    catch (Exception ex) {
+                     LOG.warn("exception callable [{0}], {1}", callable.getName(), ex.getMessage(), ex);
+                    }
+                    finally {
+                        // reseting the interrupt mode to false after the command is executed
+                        callable.setInterruptMode(false);
+                    }
+                }
+            }
+        }
+    }
+
+    /**
      * Return the time out when acquiring a lock.
      * <p/>
      * The value is loaded from the Oozie configuration, the property {link #DEFAULT_LOCK_TIMEOUT}.
@@ -334,7 +370,7 @@ public abstract class XCommand<T> implem
      *
      * @return the entity key for the command.
      */
-    protected abstract String getEntityKey();
+    public abstract String getEntityKey();
 
     /**
      * Indicate if the the command requires to requeue itself if the lock is not acquired.
@@ -440,6 +476,22 @@ public abstract class XCommand<T> implem
     }
 
     /**
+     * set the mode of execution for the callable. True if in interrupt, false
+     * if not
+     */
+    public void setInterruptMode(boolean mode) {
+        this.inInterrupt = mode;
+    }
+
+    /**
+     * @return the mode of execution. true if it is executed as an Interrupt,
+     *         false otherwise
+     */
+    public boolean getInterruptMode() {
+        return this.inInterrupt;
+    }
+
+    /**
      * Get XLog log
      *
      * @return XLog

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleJobChangeXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleJobChangeXCommand.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleJobChangeXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleJobChangeXCommand.java Wed Jan 18 01:09:12 2012
@@ -189,7 +189,7 @@ public class BundleJobChangeXCommand ext
      * @see org.apache.oozie.command.XCommand#getEntityKey()
      */
     @Override
-    protected String getEntityKey() {
+    public String getEntityKey() {
         return this.jobId;
     }
 

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleJobResumeXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleJobResumeXCommand.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleJobResumeXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleJobResumeXCommand.java Wed Jan 18 01:09:12 2012
@@ -133,7 +133,7 @@ public class BundleJobResumeXCommand ext
      * @see org.apache.oozie.command.XCommand#getEntityKey()
      */
     @Override
-    protected String getEntityKey() {
+    public String getEntityKey() {
         return bundleId;
     }
 

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleJobSuspendXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleJobSuspendXCommand.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleJobSuspendXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleJobSuspendXCommand.java Wed Jan 18 01:09:12 2012
@@ -77,7 +77,7 @@ public class BundleJobSuspendXCommand ex
      * @see org.apache.oozie.command.XCommand#getEntityKey()
      */
     @Override
-    protected String getEntityKey() {
+    public String getEntityKey() {
         return this.jobId;
     }
 

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleJobXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleJobXCommand.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleJobXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleJobXCommand.java Wed Jan 18 01:09:12 2012
@@ -60,7 +60,7 @@ public class BundleJobXCommand extends X
      * @see org.apache.oozie.command.XCommand#getEntityKey()
      */
     @Override
-    protected String getEntityKey() {
+    public String getEntityKey() {
         return this.id;
     }
 

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleJobsXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleJobsXCommand.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleJobsXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleJobsXCommand.java Wed Jan 18 01:09:12 2012
@@ -64,7 +64,7 @@ public class BundleJobsXCommand extends 
      * @see org.apache.oozie.command.XCommand#getEntityKey()
      */
     @Override
-    protected String getEntityKey() {
+    public String getEntityKey() {
         return null;
     }
 

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleKillXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleKillXCommand.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleKillXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleKillXCommand.java Wed Jan 18 01:09:12 2012
@@ -55,7 +55,7 @@ public class BundleKillXCommand extends 
      * @see org.apache.oozie.command.XCommand#getEntityKey()
      */
     @Override
-    protected String getEntityKey() {
+    public String getEntityKey() {
         return jobId;
     }
 

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundlePauseXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundlePauseXCommand.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundlePauseXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundlePauseXCommand.java Wed Jan 18 01:09:12 2012
@@ -41,7 +41,7 @@ public class BundlePauseXCommand extends
      * @see org.apache.oozie.command.XCommand#getEntityKey()
      */
     @Override
-    protected String getEntityKey() {
+    public String getEntityKey() {
         return bundleJob.getId();
     }
 

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundlePurgeXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundlePurgeXCommand.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundlePurgeXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundlePurgeXCommand.java Wed Jan 18 01:09:12 2012
@@ -99,7 +99,7 @@ public class BundlePurgeXCommand extends
      * @see org.apache.oozie.command.XCommand#getEntityKey()
      */
     @Override
-    protected String getEntityKey() {
+    public String getEntityKey() {
         return null;
     }
 

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleRerunXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleRerunXCommand.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleRerunXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleRerunXCommand.java Wed Jan 18 01:09:12 2012
@@ -233,7 +233,7 @@ public class BundleRerunXCommand extends
      * @see org.apache.oozie.command.XCommand#getEntityKey()
      */
     @Override
-    protected String getEntityKey() {
+    public String getEntityKey() {
         return jobId;
     }
 

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleStartXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleStartXCommand.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleStartXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleStartXCommand.java Wed Jan 18 01:09:12 2012
@@ -86,7 +86,7 @@ public class BundleStartXCommand extends
      * @see org.apache.oozie.command.XCommand#getEntityKey()
      */
     @Override
-    protected String getEntityKey() {
+    public String getEntityKey() {
         return jobId;
     }
 

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleStatusUpdateXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleStatusUpdateXCommand.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleStatusUpdateXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleStatusUpdateXCommand.java Wed Jan 18 01:09:12 2012
@@ -89,7 +89,7 @@ public class BundleStatusUpdateXCommand 
      * @see org.apache.oozie.command.XCommand#getEntityKey()
      */
     @Override
-    protected String getEntityKey() {
+    public String getEntityKey() {
         return this.bundleaction.getBundleActionId();
     }
 

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleSubmitXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleSubmitXCommand.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleSubmitXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleSubmitXCommand.java Wed Jan 18 01:09:12 2012
@@ -194,7 +194,7 @@ public class BundleSubmitXCommand extend
      * @see org.apache.oozie.command.XCommand#getEntityKey()
      */
     @Override
-    protected String getEntityKey() {
+    public String getEntityKey() {
         return null;
     }
 

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleUnpauseXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleUnpauseXCommand.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleUnpauseXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleUnpauseXCommand.java Wed Jan 18 01:09:12 2012
@@ -43,7 +43,7 @@ public class BundleUnpauseXCommand exten
      * @see org.apache.oozie.command.XCommand#getEntityKey()
      */
     @Override
-    protected String getEntityKey() {
+    public String getEntityKey() {
         return bundleJob.getId();
     }
 

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionCheckXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionCheckXCommand.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionCheckXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionCheckXCommand.java Wed Jan 18 01:09:12 2012
@@ -115,7 +115,7 @@ public class CoordActionCheckXCommand ex
      * @see org.apache.oozie.command.XCommand#getEntityKey()
      */
     @Override
-    protected String getEntityKey() {
+    public String getEntityKey() {
         return actionId;
     }
 

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionInfoXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionInfoXCommand.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionInfoXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionInfoXCommand.java Wed Jan 18 01:09:12 2012
@@ -64,7 +64,7 @@ public class CoordActionInfoXCommand ext
      * @see org.apache.oozie.command.XCommand#getEntityKey()
      */
     @Override
-    protected String getEntityKey() {
+    public String getEntityKey() {
         return null;
     }
 

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java Wed Jan 18 01:09:12 2012
@@ -72,10 +72,12 @@ public class CoordActionInputCheckXComma
     private CoordinatorActionBean coordAction = null;
     private CoordinatorJobBean coordJob = null;
     private JPAService jpaService = null;
+    private String jobId = null;
 
-    public CoordActionInputCheckXCommand(String actionId) {
+    public CoordActionInputCheckXCommand(String actionId, String jobId) {
         super("coord_action_input", "coord_action_input", 1);
         this.actionId = ParamChecker.notEmpty(actionId, "actionId");
+        this.jobId = jobId;
     }
 
     /* (non-Javadoc)
@@ -90,7 +92,7 @@ public class CoordActionInputCheckXComma
         Date nominalTime = coordAction.getNominalTime();
         Date currentTime = new Date();
         if (nominalTime.compareTo(currentTime) > 0) {
-            queue(new CoordActionInputCheckXCommand(coordAction.getId()), Math.max((nominalTime.getTime() - currentTime
+            queue(new CoordActionInputCheckXCommand(coordAction.getId(), coordAction.getJobId()), Math.max((nominalTime.getTime() - currentTime
                     .getTime()), getCoordInputCheckRequeueInterval()));
             // update lastModifiedTime
             coordAction.setLastModifiedTime(new Date());
@@ -140,7 +142,7 @@ public class CoordActionInputCheckXComma
                     queue(new CoordActionTimeOutXCommand(coordAction), 100);
                 }
                 else {
-                    queue(new CoordActionInputCheckXCommand(coordAction.getId()), getCoordInputCheckRequeueInterval());
+                    queue(new CoordActionInputCheckXCommand(coordAction.getId(), coordAction.getJobId()), getCoordInputCheckRequeueInterval());
                 }
             }
             coordAction.setLastModifiedTime(new Date());
@@ -457,8 +459,8 @@ public class CoordActionInputCheckXComma
      * @see org.apache.oozie.command.XCommand#getEntityKey()
      */
     @Override
-    protected String getEntityKey() {
-        return coordAction.getJobId();
+    public String getEntityKey() {
+        return this.jobId;
     }
 
     /* (non-Javadoc)

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionNotificationXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionNotificationXCommand.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionNotificationXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionNotificationXCommand.java Wed Jan 18 01:09:12 2012
@@ -108,7 +108,7 @@ public class CoordActionNotificationXCom
      * @see org.apache.oozie.command.XCommand#getEntityKey()
      */
     @Override
-    protected String getEntityKey() {
+    public String getEntityKey() {
         return actionBean.getId();
     }
 

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionReadyXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionReadyXCommand.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionReadyXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionReadyXCommand.java Wed Jan 18 01:09:12 2012
@@ -109,7 +109,7 @@ public class CoordActionReadyXCommand ex
                 // change state of action to SUBMITTED
                 action.setStatus(CoordinatorAction.Status.SUBMITTED);
                 // queue action to start action
-                queue(new CoordActionStartXCommand(action.getId(), user, authToken), 100);
+                queue(new CoordActionStartXCommand(action.getId(), user, authToken, action.getJobId()), 100);
                 try {
                     jpaService.execute(new org.apache.oozie.executor.jpa.CoordActionUpdateJPAExecutor(action));
                 }
@@ -127,7 +127,7 @@ public class CoordActionReadyXCommand ex
     }
 
     @Override
-    protected String getEntityKey() {
+    public String getEntityKey() {
         return jobId;
     }
 

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionStartXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionStartXCommand.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionStartXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionStartXCommand.java Wed Jan 18 01:09:12 2012
@@ -64,13 +64,15 @@ public class CoordActionStartXCommand ex
     private String authToken = null;
     private CoordinatorActionBean coordAction = null;
     private JPAService jpaService = null;
+    private String jobId = null;
 
-    public CoordActionStartXCommand(String id, String user, String token) {
+    public CoordActionStartXCommand(String id, String user, String token, String jobId) {
         //super("coord_action_start", "coord_action_start", 1, XLog.OPS);
         super("coord_action_start", "coord_action_start", 1);
         this.actionId = ParamChecker.notEmpty(id, "id");
         this.user = ParamChecker.notEmpty(user, "user");
         this.authToken = ParamChecker.notEmpty(token, "token");
+        this.jobId = jobId;
     }
 
     /**
@@ -235,8 +237,8 @@ public class CoordActionStartXCommand ex
     }
 
     @Override
-    protected String getEntityKey() {
-        return coordAction.getJobId();
+    public String getEntityKey() {
+        return this.jobId;
     }
 
     @Override

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionTimeOutXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionTimeOutXCommand.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionTimeOutXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionTimeOutXCommand.java Wed Jan 18 01:09:12 2012
@@ -67,7 +67,7 @@ public class CoordActionTimeOutXCommand 
      * @see org.apache.oozie.command.XCommand#getEntityKey()
      */
     @Override
-    protected String getEntityKey() {
+    public String getEntityKey() {
         return actionBean.getJobId();
     }
 

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdateXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdateXCommand.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdateXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdateXCommand.java Wed Jan 18 01:09:12 2012
@@ -120,7 +120,7 @@ public class CoordActionUpdateXCommand e
      * @see org.apache.oozie.command.XCommand#getEntityKey()
      */
     @Override
-    protected String getEntityKey() {
+    public String getEntityKey() {
         return coordAction.getJobId();
     }
 

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordChangeXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordChangeXCommand.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordChangeXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordChangeXCommand.java Wed Jan 18 01:09:12 2012
@@ -323,7 +323,7 @@ public class CoordChangeXCommand extends
      * @see org.apache.oozie.command.XCommand#getEntityKey()
      */
     @Override
-    protected String getEntityKey() {
+    public String getEntityKey() {
         return this.jobId;
     }
 

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordJobXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordJobXCommand.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordJobXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordJobXCommand.java Wed Jan 18 01:09:12 2012
@@ -88,7 +88,7 @@ public class CoordJobXCommand extends Co
      * @see org.apache.oozie.command.XCommand#getEntityKey()
      */
     @Override
-    protected String getEntityKey() {
+    public String getEntityKey() {
         return this.id;
     }
 

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordJobsXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordJobsXCommand.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordJobsXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordJobsXCommand.java Wed Jan 18 01:09:12 2012
@@ -56,7 +56,7 @@ public class CoordJobsXCommand extends C
      * @see org.apache.oozie.command.XCommand#getEntityKey()
      */
     @Override
-    protected String getEntityKey() {
+    public String getEntityKey() {
         return null;
     }
 

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordKillXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordKillXCommand.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordKillXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordKillXCommand.java Wed Jan 18 01:09:12 2012
@@ -61,7 +61,7 @@ public class CoordKillXCommand extends K
     }
 
     @Override
-    protected String getEntityKey() {
+    public String getEntityKey() {
         return this.jobId;
     }
 

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java Wed Jan 18 01:09:12 2012
@@ -108,8 +108,8 @@ public class CoordMaterializeTransitionX
      * @see org.apache.oozie.command.XCommand#getEntityKey()
      */
     @Override
-    protected String getEntityKey() {
-        return jobId;
+    public String getEntityKey() {
+        return this.jobId;
     }
 
     @Override
@@ -349,7 +349,7 @@ public class CoordMaterializeTransitionX
 
         // TODO: time 100s should be configurable
         queue(new CoordActionNotificationXCommand(actionBean), 100);
-        queue(new CoordActionInputCheckXCommand(actionBean.getId()), 100);
+        queue(new CoordActionInputCheckXCommand(actionBean.getId(), actionBean.getJobId()), 100);
     }
 
     private void writeActionRegistration(String actionXml, CoordinatorActionBean actionBean) throws Exception {

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordPauseXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordPauseXCommand.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordPauseXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordPauseXCommand.java Wed Jan 18 01:09:12 2012
@@ -44,7 +44,7 @@ public class CoordPauseXCommand extends 
      * @see org.apache.oozie.command.XCommand#getEntityKey()
      */
     @Override
-    protected String getEntityKey() {
+    public String getEntityKey() {
         return coordJob.getId();
     }
 

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordPurgeXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordPurgeXCommand.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordPurgeXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordPurgeXCommand.java Wed Jan 18 01:09:12 2012
@@ -77,7 +77,7 @@ public class CoordPurgeXCommand extends 
      * @see org.apache.oozie.command.XCommand#getEntityKey()
      */
     @Override
-    protected String getEntityKey() {
+    public String getEntityKey() {
         return null;
     }
 

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordRerunXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordRerunXCommand.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordRerunXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordRerunXCommand.java Wed Jan 18 01:09:12 2012
@@ -374,7 +374,7 @@ public class CoordRerunXCommand extends 
      * @see org.apache.oozie.command.XCommand#getEntityKey()
      */
     @Override
-    protected String getEntityKey() {
+    public String getEntityKey() {
         return jobId;
     }
 
@@ -460,7 +460,7 @@ public class CoordRerunXCommand extends 
                     updateAction(coordJob, coordAction, actionXml);
 
                     queue(new CoordActionNotificationXCommand(coordAction), 100);
-                    queue(new CoordActionInputCheckXCommand(coordAction.getId()), 100);
+                    queue(new CoordActionInputCheckXCommand(coordAction.getId(), coordAction.getJobId()), 100);
                 }
             }
             else {

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordResumeXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordResumeXCommand.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordResumeXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordResumeXCommand.java Wed Jan 18 01:09:12 2012
@@ -62,7 +62,7 @@ public class CoordResumeXCommand extends
      * @see org.apache.oozie.command.XCommand#getEntityKey()
      */
     @Override
-    protected String getEntityKey() {
+    public String getEntityKey() {
         return jobId;
     }
 

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordSubmitXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordSubmitXCommand.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordSubmitXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordSubmitXCommand.java Wed Jan 18 01:09:12 2012
@@ -1034,7 +1034,7 @@ public class CoordSubmitXCommand extends
      * @see org.apache.oozie.command.XCommand#getEntityKey()
      */
     @Override
-    protected String getEntityKey() {
+    public String getEntityKey() {
         return null;
     }
 

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordSuspendXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordSuspendXCommand.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordSuspendXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordSuspendXCommand.java Wed Jan 18 01:09:12 2012
@@ -63,7 +63,7 @@ public class CoordSuspendXCommand extend
      * @see org.apache.oozie.command.XCommand#getEntityKey()
      */
     @Override
-    protected String getEntityKey() {
+    public String getEntityKey() {
         return jobId;
     }
 

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordUnpauseXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordUnpauseXCommand.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordUnpauseXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordUnpauseXCommand.java Wed Jan 18 01:09:12 2012
@@ -47,7 +47,7 @@ public class CoordUnpauseXCommand extend
      * @see org.apache.oozie.command.XCommand#getEntityKey()
      */
     @Override
-    protected String getEntityKey() {
+    public String getEntityKey() {
         return coordJob.getId();
     }
 

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/SLAEventsXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/SLAEventsXCommand.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/SLAEventsXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/SLAEventsXCommand.java Wed Jan 18 01:09:12 2012
@@ -57,7 +57,7 @@ public class SLAEventsXCommand extends X
      * @see org.apache.oozie.command.XCommand#getEntityKey()
      */
     @Override
-    protected String getEntityKey() {
+    public String getEntityKey() {
         return Long.toString(seqId);
     }
 

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionCheckXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionCheckXCommand.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionCheckXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionCheckXCommand.java Wed Jan 18 01:09:12 2012
@@ -121,7 +121,7 @@ public class ActionCheckXCommand extends
     }
 
     @Override
-    protected String getEntityKey() {
+    public String getEntityKey() {
         return this.jobId;
     }
 

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionEndXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionEndXCommand.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionEndXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionEndXCommand.java Wed Jan 18 01:09:12 2012
@@ -72,7 +72,7 @@ public class ActionEndXCommand extends A
     }
 
     @Override
-    protected String getEntityKey() {
+    public String getEntityKey() {
         return this.jobId;
     }
 

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionKillXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionKillXCommand.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionKillXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionKillXCommand.java Wed Jan 18 01:09:12 2012
@@ -67,7 +67,7 @@ public class ActionKillXCommand extends 
     }
 
     @Override
-    protected String getEntityKey() {
+    public String getEntityKey() {
         return this.jobId;
     }
 

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java Wed Jan 18 01:09:12 2012
@@ -78,7 +78,7 @@ public class ActionStartXCommand extends
     }
 
     @Override
-    protected String getEntityKey() {
+    public String getEntityKey() {
         return this.jobId;
     }
 

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/CompletedActionXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/CompletedActionXCommand.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/CompletedActionXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/CompletedActionXCommand.java Wed Jan 18 01:09:12 2012
@@ -104,11 +104,11 @@ public class CompletedActionXCommand ext
 
     /*
      * (non-Javadoc)
-     * 
+     *
      * @see org.apache.oozie.command.XCommand#getEntityKey()
      */
     @Override
-    protected String getEntityKey() {
+    public String getEntityKey() {
         return null;
     }
 

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/DefinitionXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/DefinitionXCommand.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/DefinitionXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/DefinitionXCommand.java Wed Jan 18 01:09:12 2012
@@ -43,7 +43,7 @@ public class DefinitionXCommand extends 
     }
 
     @Override
-    protected String getEntityKey() {
+    public String getEntityKey() {
         return this.jobId;
     }
 

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ExternalIdXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ExternalIdXCommand.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ExternalIdXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ExternalIdXCommand.java Wed Jan 18 01:09:12 2012
@@ -40,7 +40,7 @@ public class ExternalIdXCommand extends 
     }
 
     @Override
-    protected String getEntityKey() {
+    public String getEntityKey() {
         return this.externalId;
     }
 

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/JobXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/JobXCommand.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/JobXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/JobXCommand.java Wed Jan 18 01:09:12 2012
@@ -90,7 +90,7 @@ public class JobXCommand extends Workflo
      * @see org.apache.oozie.command.XCommand#getEntityKey()
      */
     @Override
-    protected String getEntityKey() {
+    public String getEntityKey() {
         return this.id;
     }
 

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/JobsXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/JobsXCommand.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/JobsXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/JobsXCommand.java Wed Jan 18 01:09:12 2012
@@ -77,7 +77,7 @@ public class JobsXCommand extends Workfl
      * @see org.apache.oozie.command.XCommand#getEntityKey()
      */
     @Override
-    protected String getEntityKey() {
+    public String getEntityKey() {
         return null;
     }
 

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/KillXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/KillXCommand.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/KillXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/KillXCommand.java Wed Jan 18 01:09:12 2012
@@ -67,7 +67,7 @@ public class KillXCommand extends Workfl
     }
 
     @Override
-    protected String getEntityKey() {
+    public String getEntityKey() {
         return this.wfId;
     }
 

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/NotificationXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/NotificationXCommand.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/NotificationXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/NotificationXCommand.java Wed Jan 18 01:09:12 2012
@@ -75,7 +75,7 @@ public class NotificationXCommand extend
     }
 
     @Override
-    protected String getEntityKey() {
+    public String getEntityKey() {
         return url;
     }
 

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/PurgeXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/PurgeXCommand.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/PurgeXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/PurgeXCommand.java Wed Jan 18 01:09:12 2012
@@ -68,7 +68,7 @@ public class PurgeXCommand extends Workf
     }
 
     @Override
-    protected String getEntityKey() {
+    public String getEntityKey() {
         return null;
     }
 

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ReRunXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ReRunXCommand.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ReRunXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ReRunXCommand.java Wed Jan 18 01:09:12 2012
@@ -312,7 +312,7 @@ public class ReRunXCommand extends Workf
      * @see org.apache.oozie.command.XCommand#getEntityKey()
      */
     @Override
-    protected String getEntityKey() {
+    public String getEntityKey() {
         return this.jobId;
     }
 

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ResumeXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ResumeXCommand.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ResumeXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ResumeXCommand.java Wed Jan 18 01:09:12 2012
@@ -120,7 +120,7 @@ public class ResumeXCommand extends Work
     }
 
     @Override
-    protected String getEntityKey() {
+    public String getEntityKey() {
         return id;
     }
 

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java Wed Jan 18 01:09:12 2012
@@ -86,7 +86,7 @@ public class SignalXCommand extends Work
     }
 
     @Override
-    protected String getEntityKey() {
+    public String getEntityKey() {
         return this.jobId;
     }
 

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SubmitMRXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SubmitMRXCommand.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SubmitMRXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SubmitMRXCommand.java Wed Jan 18 01:09:12 2012
@@ -157,7 +157,7 @@ public class SubmitMRXCommand extends Su
     }
 
     @Override
-    protected String getEntityKey() {
+    public String getEntityKey() {
         return null;
     }
 

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SubmitPigXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SubmitPigXCommand.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SubmitPigXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SubmitPigXCommand.java Wed Jan 18 01:09:12 2012
@@ -162,7 +162,7 @@ public class SubmitPigXCommand extends S
     }
 
     @Override
-    protected String getEntityKey() {
+    public String getEntityKey() {
         return null;
     }
 

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SubmitXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SubmitXCommand.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SubmitXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SubmitXCommand.java Wed Jan 18 01:09:12 2012
@@ -265,7 +265,7 @@ public class SubmitXCommand extends Work
     }
 
     @Override
-    protected String getEntityKey() {
+    public String getEntityKey() {
         return null;
     }
 

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SuspendXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SuspendXCommand.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SuspendXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SuspendXCommand.java Wed Jan 18 01:09:12 2012
@@ -166,7 +166,7 @@ public class SuspendXCommand extends Wor
      * @see org.apache.oozie.command.XCommand#getEntityKey()
      */
     @Override
-    protected String getEntityKey() {
+    public String getEntityKey() {
         return this.wfid;
     }
 

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/WfEndXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/WfEndXCommand.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/WfEndXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/WfEndXCommand.java Wed Jan 18 01:09:12 2012
@@ -87,7 +87,7 @@ public class WfEndXCommand extends Workf
     }
 
     @Override
-    protected String getEntityKey() {
+    public String getEntityKey() {
         return job.getId();
     }
 

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/WorkflowActionInfoXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/WorkflowActionInfoXCommand.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/WorkflowActionInfoXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/WorkflowActionInfoXCommand.java Wed Jan 18 01:09:12 2012
@@ -59,7 +59,7 @@ public class WorkflowActionInfoXCommand 
      * @see org.apache.oozie.command.XCommand#getEntityKey()
      */
     @Override
-    protected String getEntityKey() {
+    public String getEntityKey() {
         return null;
     }
 

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/service/CallableQueueService.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/service/CallableQueueService.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/service/CallableQueueService.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/service/CallableQueueService.java Wed Jan 18 01:09:12 2012
@@ -18,6 +18,7 @@
 package org.apache.oozie.service;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -31,7 +32,6 @@ import java.util.concurrent.RejectedExec
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.oozie.client.OozieClient.SYSTEM_MODE;
@@ -79,6 +79,8 @@ public class CallableQueueService implem
     public static final String CONF_THREADS = CONF_PREFIX + "threads";
     public static final String CONF_CALLABLE_CONCURRENCY = CONF_PREFIX + "callable.concurrency";
     public static final String CONF_CALLABLE_NEXT_ELIGIBLE = CONF_PREFIX + "callable.next.eligible";
+    public static final String CONF_CALLABLE_INTERRUPT_TYPES = CONF_PREFIX + "InterruptTypes";
+    public static final String CONF_CALLABLE_INTERRUPT_MAP_MAX_SIZE = CONF_PREFIX + "InterruptMapMaxSize";
 
     public static final int CONCURRENCY_DELAY = 500;
 
@@ -88,6 +90,12 @@ public class CallableQueueService implem
 
     private final Map<String, Date> uniqueCallables = new ConcurrentHashMap<String, Date>();
 
+    private final ConcurrentHashMap<String, List<XCallable<?>>> interruptCommandsMap = new ConcurrentHashMap<String, List<XCallable<?>>>();
+
+    private final HashSet<String> interruptTypes = new HashSet<String>();
+
+    private int interruptMapMaxSize;
+
     private int maxCallableConcurrency;
 
     private boolean callableBegin(XCallable<?> callable) {
@@ -241,7 +249,6 @@ public class CallableQueueService implem
                 uniqueCallables.remove(callable.getKey());
             }
         }
-
     }
 
     class CompositeCallable implements XCallable<Void> {
@@ -282,6 +289,11 @@ public class CallableQueueService implem
         }
 
         @Override
+        public String getEntityKey() {
+            return "#composite#" + callables.get(0).getEntityKey();
+        }
+
+        @Override
         public int getPriority() {
             return priority;
         }
@@ -291,6 +303,19 @@ public class CallableQueueService implem
             return createdTime;
         }
 
+        @Override
+        public void setInterruptMode(boolean mode) {
+        }
+
+        @Override
+        public boolean getInterruptMode() {
+            return false;
+        }
+
+        public List<XCallable<?>> getCallables() {
+            return this.callables;
+        }
+
         public Void call() throws Exception {
             XLog log = XLog.getLog(getClass());
 
@@ -379,14 +404,12 @@ public class CallableQueueService implem
                 uniqueCallables.remove(callable.getKey());
             }
         }
-
     }
 
     private XLog log = XLog.getLog(getClass());
 
     private int queueSize;
     private PriorityDelayQueue<CallableWrapper> queue;
-    private AtomicLong delayQueueExecCounter = new AtomicLong(0);
     private ThreadPoolExecutor executor;
     private Instrumentation instrumentation;
 
@@ -414,7 +437,7 @@ public class CallableQueueService implem
      * @param services services instance.
      */
     @Override
-    @SuppressWarnings("unchecked")
+    @SuppressWarnings({ "unchecked", "rawtypes" })
     public void init(Services services) {
         Configuration conf = services.getConf();
 
@@ -422,6 +445,10 @@ public class CallableQueueService implem
         int threads = conf.getInt(CONF_THREADS, 10);
         boolean callableNextEligible = conf.getBoolean(CONF_CALLABLE_NEXT_ELIGIBLE, true);
 
+        for (String type : conf.getStringCollection(CONF_CALLABLE_INTERRUPT_TYPES)) {
+            interruptTypes.add(type);
+        }
+
         if (!callableNextEligible) {
             queue = new PriorityDelayQueue<CallableWrapper>(3, 1000 * 30, TimeUnit.MILLISECONDS, queueSize) {
                 @Override
@@ -431,11 +458,12 @@ public class CallableQueueService implem
             };
         }
         else {
-            // If the head of this queue has already reached max concurrency, continuously find next one
-            // which has not yet reach max concurrency.Overrided method 'eligibleToPoll' to check if the
+            // If the head of this queue has already reached max concurrency,
+            // continuously find next one
+            // which has not yet reach max concurrency.Overrided method
+            // 'eligibleToPoll' to check if the
             // element of this queue has reached the maximum concurrency.
-            queue = new PollablePriorityDelayQueue<CallableWrapper>(3, 1000 * 30, TimeUnit.MILLISECONDS,
-                    queueSize) {
+            queue = new PollablePriorityDelayQueue<CallableWrapper>(3, 1000 * 30, TimeUnit.MILLISECONDS, queueSize) {
                 @Override
                 protected void debug(String msgTemplate, Object... msgArgs) {
                     log.trace(msgTemplate, msgArgs);
@@ -455,6 +483,8 @@ public class CallableQueueService implem
             };
         }
 
+        interruptMapMaxSize = conf.getInt(CONF_CALLABLE_INTERRUPT_MAP_MAX_SIZE, 100);
+
         // IMPORTANT: The ThreadPoolExecutor does not always the execute
         // commands out of the queue, there are
         // certain conditions where commands are pushed directly to a thread.
@@ -568,7 +598,6 @@ public class CallableQueueService implem
      * @return <code>true</code> if the callables were queued, <code>false</code> if the queue is full and the callables
      *         were not queued.
      */
-    @SuppressWarnings("unchecked")
     public boolean queueSerial(List<? extends XCallable<?>> callables) {
         return queueSerial(callables, 0);
     }
@@ -578,8 +607,8 @@ public class CallableQueueService implem
      *
      * @param callable callable to queue for delayed execution
      * @param delay time, in milliseconds, that the callable should be delayed.
-     * @return <code>true</code> if the callable was queued, <code>false</code> if the queue is full and the callable
-     *         was not queued.
+     * @return <code>true</code> if the callable was queued, <code>false</code>
+     *         if the queue is full and the callable was not queued.
      */
     public synchronized boolean queue(XCallable<?> callable, long delay) {
         if (callable == null) {
@@ -590,6 +619,7 @@ public class CallableQueueService implem
             log.warn("[queue] System is in SAFEMODE. Hence no callable is queued. current queue size " + queue.size());
         }
         else {
+            checkInterruptTypes(callable);
             queued = queue(new CallableWrapper(callable, delay), false);
             if (queued) {
                 incrCounter(INSTR_QUEUED_COUNTER, 1);
@@ -613,7 +643,6 @@ public class CallableQueueService implem
      * @return <code>true</code> if the callables were queued, <code>false</code> if the queue is full and the callables
      *         were not queued.
      */
-    @SuppressWarnings("unchecked")
     public synchronized boolean queueSerial(List<? extends XCallable<?>> callables, long delay) {
         boolean queued;
         if (callables == null || callables.size() == 0) {
@@ -653,6 +682,62 @@ public class CallableQueueService implem
     }
 
     /**
+     * check the interrupt map for the existence of an interrupt commands if
+     * exist a List of Interrupt Callable for the same lock key will bereturned,
+     * otherwise it will return null
+     */
+    public List<XCallable<?>> checkInterrupts(String lockKey) {
+
+        if (lockKey != null) {
+            return interruptCommandsMap.remove(lockKey);
+        }
+        return null;
+    }
+
+    /**
+     * check if the callable is of an interrupt type and insert it into the map
+     * accordingly
+     *
+     * @param callable
+     */
+    public void checkInterruptTypes(XCallable<?> callable) {
+        if ((callable instanceof CompositeCallable) && (((CompositeCallable) callable).getCallables() != null)) {
+            for (XCallable<?> singleCallable : ((CompositeCallable) callable).getCallables()) {
+                if (interruptTypes.contains(singleCallable.getType())) {
+                    insertCallableIntoInterruptMap(singleCallable);
+                }
+            }
+        }
+        else if (interruptTypes.contains(callable.getType())) {
+            insertCallableIntoInterruptMap(callable);
+        }
+    }
+
+    /**
+     * insert a new callable in the Interrupt Command Map add a new element to
+     * the list or create a new list accordingly
+     *
+     * @param callable
+     */
+    public void insertCallableIntoInterruptMap(XCallable<?> callable) {
+        if (interruptCommandsMap.size() < interruptMapMaxSize) {
+            List<XCallable<?>> newList = Collections.synchronizedList(new ArrayList<XCallable<?>>());
+            List<XCallable<?>> interruptList = interruptCommandsMap.putIfAbsent(callable.getEntityKey(), newList);
+            if (interruptList == null) {
+                interruptList = newList;
+            }
+            interruptList.add(callable);
+            log.trace("Inserting an interrupt element [{1}] to the interrupt map", interruptCommandsMap.size(),
+                    callable.toString());
+        }
+        else {
+            log.warn(
+                    "The interrupt map reached max size of [{0}], an interrupt element [{1}] will not added to the map [{1}]",
+                    interruptCommandsMap.size(), callable.toString());
+        }
+    }
+
+    /**
      * Get the list of strings of queue dump
      *
      * @return the list of string that representing each CallableWrapper

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/service/RecoveryService.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/service/RecoveryService.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/service/RecoveryService.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/service/RecoveryService.java Wed Jan 18 01:09:12 2012
@@ -227,7 +227,7 @@ public class RecoveryService implements 
                                                                                 INSTR_RECOVERED_COORD_ACTIONS_COUNTER, 1);
                     if (caction.getStatus() == CoordinatorActionBean.Status.WAITING) {
                         if (useXCommand) {
-                            queueCallable(new CoordActionInputCheckXCommand(caction.getId()));
+                            queueCallable(new CoordActionInputCheckXCommand(caction.getId(), caction.getJobId()));
                         } else {
                             queueCallable(new CoordActionInputCheckCommand(caction.getId()));
                         }
@@ -238,8 +238,7 @@ public class RecoveryService implements 
                         CoordinatorJobBean coordJob = jpaService.execute(new CoordJobGetJPAExecutor(caction.getJobId()));
 
                         if (useXCommand) {
-                            queueCallable(new CoordActionStartXCommand(caction.getId(), coordJob.getUser(), coordJob
-                                    .getAuthToken()));
+                            queueCallable(new CoordActionStartXCommand(caction.getId(), coordJob.getUser(), coordJob.getAuthToken(), caction.getJobId()));
                         } else {
                             queueCallable(new CoordActionStartCommand(caction.getId(), coordJob.getUser(), coordJob
                                     .getAuthToken()));

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/util/XCallable.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/util/XCallable.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/util/XCallable.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/util/XCallable.java Wed Jan 18 01:09:12 2012
@@ -62,4 +62,23 @@ public interface XCallable<T> extends Ca
      */
     public String getKey();
 
+    /**
+     * Return the lock key of the callable
+     *
+     * @return the callable Lock key
+     */
+    public String getEntityKey();
+
+    /**
+     * set the mode of execution for the callable. True if in interrupt, false
+     * if not
+     */
+    public void setInterruptMode(boolean mode);
+
+    /**
+     * @return the mode of execution. true if it is executed as an Interrupt,
+     *         false otherwise
+     */
+    public boolean getInterruptMode();
+
 }

Modified: incubator/oozie/trunk/core/src/main/resources/oozie-default.xml
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/resources/oozie-default.xml?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/resources/oozie-default.xml (original)
+++ incubator/oozie/trunk/core/src/main/resources/oozie-default.xml Wed Jan 18 01:09:12 2012
@@ -270,7 +270,34 @@
             Oozie continuously find next one which has not yet reach max concurrency.
         </description>
     </property>
-    
+
+    <property>
+        <name>oozie.service.CallableQueueService.InterruptMapMaxSize</name>
+        <value>500</value>
+        <description>
+            Maximum Size of the Interrupt Map, the interrupt element will not be inserted in the map if exceeded the size.
+        </description>
+    </property>
+
+    <property>
+        <name>oozie.service.CallableQueueService.InterruptTypes</name>
+        <value>
+            kill,
+            resume,
+            suspend,
+            bundle_kill,
+            bundle_resume,
+            bundle_suspend,
+            coord_kill,
+            coord_change,
+            coord_resume,
+            coord_suspend
+        </value>
+        <description>
+            Getting the types of XCommands that are considered to be of Interrupt type
+        </description>
+    </property>
+
     <!--  CoordMaterializeTriggerService -->
 
 	<property>

Modified: incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/TestCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/TestCommand.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/TestCommand.java (original)
+++ incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/TestCommand.java Wed Jan 18 01:09:12 2012
@@ -73,6 +73,15 @@ public class TestCommand extends XTestCa
             return sb.toString();
         }
 
+        @Override
+        public void setInterruptMode(boolean mode) {
+        }
+
+        @Override
+        public boolean getInterruptMode() {
+            return false;
+        }
+
         public Void call() throws Exception {
             EXECUTED.add(name);
             return null;
@@ -83,6 +92,11 @@ public class TestCommand extends XTestCa
             return this.key;
         }
 
+        @Override
+        public String getEntityKey() {
+            return null;
+        }
+
     }
 
     private static class MyCommand extends Command<Object, WorkflowStore> {

Modified: incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/TestXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/TestXCommand.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/TestXCommand.java (original)
+++ incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/TestXCommand.java Wed Jan 18 01:09:12 2012
@@ -75,7 +75,7 @@ public class TestXCommand extends XTestC
         }
 
         @Override
-        protected String getEntityKey() {
+        public String getEntityKey() {
             return "key";
         }
 

Modified: incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionInputCheckXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionInputCheckXCommand.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionInputCheckXCommand.java (original)
+++ incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionInputCheckXCommand.java Wed Jan 18 01:09:12 2012
@@ -66,8 +66,8 @@ public class TestCoordActionInputCheckXC
         long executed = 0;
         int wait;
 
-        public MyCoordActionInputCheckXCommand(String actionId, int wait) {
-            super(actionId);
+        public MyCoordActionInputCheckXCommand(String actionId, int wait, String entityKey) {
+            super(actionId, entityKey);
             this.wait = wait;
         }
 
@@ -106,9 +106,9 @@ public class TestCoordActionInputCheckXC
         createDir(getTestCaseDir() + "/2009/15/");
         createDir(getTestCaseDir() + "/2009/08/");
 
-        final MyCoordActionInputCheckXCommand callable1 = new MyCoordActionInputCheckXCommand(action1.getId(), 100);
-        final MyCoordActionInputCheckXCommand callable2 = new MyCoordActionInputCheckXCommand(action1.getId(), 100);
-        final MyCoordActionInputCheckXCommand callable3 = new MyCoordActionInputCheckXCommand(action1.getId(), 100);
+        final MyCoordActionInputCheckXCommand callable1 = new MyCoordActionInputCheckXCommand(action1.getId(), 100, "1");
+        final MyCoordActionInputCheckXCommand callable2 = new MyCoordActionInputCheckXCommand(action1.getId(), 100, "2");
+        final MyCoordActionInputCheckXCommand callable3 = new MyCoordActionInputCheckXCommand(action1.getId(), 100, "3");
 
         List<MyCoordActionInputCheckXCommand> callables = Arrays.asList(callable1, callable2, callable3);
 
@@ -137,7 +137,7 @@ public class TestCoordActionInputCheckXC
         new CoordMaterializeTransitionXCommand(job.getId(), 3600).call();
         createDir(getTestCaseDir() + "/2009/29/");
         createDir(getTestCaseDir() + "/2009/15/");
-        new CoordActionInputCheckXCommand(job.getId() + "@1").call();
+        new CoordActionInputCheckXCommand(job.getId() + "@1", job.getId()).call();
         checkCoordAction(job.getId() + "@1");
     }
 
@@ -162,7 +162,7 @@ public class TestCoordActionInputCheckXC
         Services.get().getConf().setLong(CoordActionInputCheckXCommand.CONF_COORD_INPUT_CHECK_REQUEUE_INTERVAL,
                 testedValue);
 
-        CoordActionInputCheckXCommand caicc = new CoordActionInputCheckXCommand(job.getId() + "@1");
+        CoordActionInputCheckXCommand caicc = new CoordActionInputCheckXCommand(job.getId() + "@1", job.getId());
 
         long effectiveValue = caicc.getCoordInputCheckRequeueInterval();
         // Verify if two values are same.

Modified: incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionStartXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionStartXCommand.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionStartXCommand.java (original)
+++ incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionStartXCommand.java Wed Jan 18 01:09:12 2012
@@ -77,7 +77,7 @@ public class TestCoordActionStartXComman
     public void testActionStartCommand() throws IOException, JPAExecutorException, CommandException {
         String actionId = new Date().getTime() + "-COORD-ActionStartCommand-C@1";
         addRecordToActionTable(actionId, 1);
-        new CoordActionStartXCommand(actionId, "me", "mytoken").call();
+        new CoordActionStartXCommand(actionId, "me", "mytoken", "myjob").call();
         checkCoordAction(actionId);
     }
 
@@ -97,7 +97,7 @@ public class TestCoordActionStartXComman
                 CoordinatorAction.Status.SUBMITTED, "coord-action-start-escape-strings.xml", 0);
 
         String actionId = action.getId();
-        new CoordActionStartXCommand(actionId, getTestUser(), "undef").call();
+        new CoordActionStartXCommand(actionId, getTestUser(), "undef","myjob").call();
 
         final JPAService jpaService = Services.get().get(JPAService.class);
         action = jpaService.execute(new CoordActionGetJPAExecutor(actionId));