You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by pu...@apache.org on 2016/01/26 19:43:44 UTC

oozie git commit: OOZIE-2394 Oozie can execute command without holding lock

Repository: oozie
Updated Branches:
  refs/heads/master 70a5ffe4b -> 70052969a


OOZIE-2394 Oozie can execute command without holding lock


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/70052969
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/70052969
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/70052969

Branch: refs/heads/master
Commit: 70052969a1df064957bc5dd06d69fc1955401e1b
Parents: 70a5ffe
Author: Purshotam Shah <pu...@yahoo-inc.com>
Authored: Tue Jan 26 10:43:33 2016 -0800
Committer: Purshotam Shah <pu...@yahoo-inc.com>
Committed: Tue Jan 26 10:43:33 2016 -0800

----------------------------------------------------------------------
 .../java/org/apache/oozie/command/XCommand.java | 58 +++++++-------------
 .../coord/CoordActionInputCheckXCommand.java    |  2 +-
 .../command/coord/CoordActionReadyXCommand.java |  2 +-
 .../oozie/command/wf/ActionCheckXCommand.java   |  2 +-
 .../oozie/command/wf/ActionEndXCommand.java     |  2 +-
 .../oozie/command/wf/ActionStartXCommand.java   |  2 +-
 .../apache/oozie/command/wf/SignalXCommand.java |  2 +-
 release-log.txt                                 |  1 +
 8 files changed, 26 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/70052969/core/src/main/java/org/apache/oozie/command/XCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/XCommand.java b/core/src/main/java/org/apache/oozie/command/XCommand.java
index ff87510..bdf13f6 100644
--- a/core/src/main/java/org/apache/oozie/command/XCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/XCommand.java
@@ -74,7 +74,6 @@ public abstract class XCommand<T> implements XCallable<T> {
     private LockToken lock;
     private AtomicBoolean used = new AtomicBoolean(false);
     private boolean inInterrupt = false;
-    private boolean isSynchronous = false;
 
     private Map<Long, List<XCommand<?>>> commandQueue;
     protected boolean dryrun = false;
@@ -212,13 +211,16 @@ public abstract class XCommand<T> implements XCallable<T> {
         if (lock == null) {
             instrumentation.incr(INSTRUMENTATION_GROUP, getName() + ".lockTimeOut", 1);
             if (isReQueueRequired()) {
-                //if not acquire the lock, re-queue itself with default delay
+                // if not acquire the lock, re-queue itself with default delay
                 queue(this, getRequeueDelay());
-                LOG.debug("Could not get lock [{0}], timed out [{1}]ms, and requeue itself [{2}]", this.toString(), getLockTimeOut(), getName());
-            } else {
+                LOG.debug("Could not get lock [{0}], timed out [{1}]ms, and requeue itself [{2}]", this.toString(),
+                        getLockTimeOut(), getName());
+            }
+            else {
                 throw new CommandException(ErrorCode.E0606, this.toString(), getLockTimeOut());
             }
-        } else {
+        }
+        else {
             LOG.debug("Acquired lock for [{0}] in [{1}]", getEntityKey(), getName());
         }
     }
@@ -252,13 +254,11 @@ public abstract class XCommand<T> implements XCallable<T> {
         Instrumentation.Cron callCron = new Instrumentation.Cron();
         try {
             callCron.start();
-            if (!isSynchronous) {
-                eagerLoadState();
-                eagerVerifyPrecondition();
-            }
+            eagerLoadState();
+            eagerVerifyPrecondition();
             try {
                 T ret = null;
-                if (!isSynchronous && isLockRequired() && !this.inInterruptMode()) {
+                if (isLockRequired() && !this.inInterruptMode()) {
                     Instrumentation.Cron acquireLockCron = new Instrumentation.Cron();
                     acquireLockCron.start();
                     acquireLock();
@@ -270,10 +270,11 @@ public abstract class XCommand<T> implements XCallable<T> {
                     this.executeInterrupts();
                 }
 
-                if (isSynchronous || !isLockRequired() || (lock != null) || this.inInterruptMode()) {
+                if (!isLockRequired() || (lock != null) || this.inInterruptMode()) {
                     if (CallableQueueService.INTERRUPT_TYPES.contains(this.getType())
                             && !used.compareAndSet(false, true)) {
-                        LOG.debug("Command [{0}] key [{1}]  already executed for [{2}]", getName(), getEntityKey(), this.toString());
+                        LOG.debug("Command [{0}] key [{1}]  already executed for [{2}]", getName(), getEntityKey(),
+                                this.toString());
                         return null;
                     }
                     LOG.trace("Load state for [{0}]", getEntityKey());
@@ -300,12 +301,12 @@ public abstract class XCommand<T> implements XCallable<T> {
                 return ret;
             }
             finally {
-                if (!isSynchronous && isLockRequired() && !this.inInterruptMode()) {
+                if (isLockRequired() && !this.inInterruptMode()) {
                     releaseLock();
                 }
             }
         }
-        catch(PreconditionException pex){
+        catch (PreconditionException pex) {
             LOG.warn(pex.getMessage().toString() + ", Error Code: " + pex.getErrorCode().toString());
             instrumentation.incr(INSTRUMENTATION_GROUP, getName() + ".preconditionfailed", 1);
             return null;
@@ -338,25 +339,6 @@ public abstract class XCommand<T> implements XCallable<T> {
     }
 
     /**
-     * Call this command synchronously from its caller. This benefits faster
-     * execution of command lifecycle for control nodes and kicking off
-     * subsequent actions
-     *
-     * @param callerEntityKey
-     * @return the {link #execute} return value.
-     * @throws CommandException
-     */
-    public final T call(String callerEntityKey) throws CommandException {
-        if (!callerEntityKey.equals(this.getEntityKey())) {
-            throw new CommandException(ErrorCode.E0607, "Entity Keys mismatch during synchronous call", "caller="
-                    + callerEntityKey + ", callee=" + getEntityKey());
-        }
-        isSynchronous = true; //setting to true so lock acquiring and release is not repeated
-        LOG.trace("Executing synchronously command [{0}] on job [{1}]", this.getName(), this.getKey());
-        return call();
-    }
-
-    /**
      * Check for the existence of interrupts for the same lock key
      * Execute them if exist.
      *
@@ -441,7 +423,7 @@ public abstract class XCommand<T> implements XCallable<T> {
      * <p>
      * A trivial implementation is calling {link #loadState}.
      */
-    protected void eagerLoadState() throws CommandException{
+    protected void eagerLoadState() throws CommandException {
     }
 
     /**
@@ -453,7 +435,7 @@ public abstract class XCommand<T> implements XCallable<T> {
      *
      * @throws CommandException thrown if the precondition is not met.
      */
-    protected void eagerVerifyPrecondition() throws CommandException,PreconditionException {
+    protected void eagerVerifyPrecondition() throws CommandException, PreconditionException {
     }
 
     /**
@@ -470,7 +452,7 @@ public abstract class XCommand<T> implements XCallable<T> {
      *
      * @throws CommandException thrown if the precondition is not met.
      */
-    protected abstract void verifyPrecondition() throws CommandException,PreconditionException;
+    protected abstract void verifyPrecondition() throws CommandException, PreconditionException;
 
     /**
      * Command execution body.
@@ -485,7 +467,6 @@ public abstract class XCommand<T> implements XCallable<T> {
      */
     protected abstract T execute() throws CommandException;
 
-
     /**
      * Return the {@link Instrumentation} instance in use.
      *
@@ -502,7 +483,6 @@ public abstract class XCommand<T> implements XCallable<T> {
         this.used.set(false);
     }
 
-
     /**
      * Return the delay time for requeue
      * <p>
@@ -522,7 +502,7 @@ public abstract class XCommand<T> implements XCallable<T> {
      * @return command key
      */
     @Override
-    public String getKey(){
+    public String getKey() {
         return this.key;
     }
 

http://git-wip-us.apache.org/repos/asf/oozie/blob/70052969/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java b/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java
index 742d00d..11184d1 100644
--- a/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java
@@ -197,7 +197,7 @@ public class CoordActionInputCheckXCommand extends CoordinatorXCommand<Void> {
                 coordAction.setActionXml(actionXml.toString());
                 coordAction.setStatus(CoordinatorAction.Status.READY);
                 updateCoordAction(coordAction, true);
-                new CoordActionReadyXCommand(coordAction.getJobId()).call(getEntityKey());
+                new CoordActionReadyXCommand(coordAction.getJobId()).call();
             }
             else if (!isTimeout(currentTime)) {
                 if (status == false) {

http://git-wip-us.apache.org/repos/asf/oozie/blob/70052969/core/src/main/java/org/apache/oozie/command/coord/CoordActionReadyXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/coord/CoordActionReadyXCommand.java b/core/src/main/java/org/apache/oozie/command/coord/CoordActionReadyXCommand.java
index 2e9c5ea..2d8af04 100644
--- a/core/src/main/java/org/apache/oozie/command/coord/CoordActionReadyXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/coord/CoordActionReadyXCommand.java
@@ -182,7 +182,7 @@ public class CoordActionReadyXCommand extends CoordinatorXCommand<Void> {
                 }
                 // start action
                 new CoordActionStartXCommand(action.getId(), coordJob.getUser(), coordJob.getAppName(),
-                        action.getJobId()).call(getEntityKey());
+                        action.getJobId()).call();
             }
             else {
                 break;

http://git-wip-us.apache.org/repos/asf/oozie/blob/70052969/core/src/main/java/org/apache/oozie/command/wf/ActionCheckXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/wf/ActionCheckXCommand.java b/core/src/main/java/org/apache/oozie/command/wf/ActionCheckXCommand.java
index 5827387..ea4d340 100644
--- a/core/src/main/java/org/apache/oozie/command/wf/ActionCheckXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/wf/ActionCheckXCommand.java
@@ -241,7 +241,7 @@ public class ActionCheckXCommand extends ActionXCommand<Void> {
                     generateEvent(wfAction, wfJob.getUser());
                 }
                 if (execSynchronous) {
-                    new ActionEndXCommand(wfAction.getId(), wfAction.getType()).call(getEntityKey());
+                    new ActionEndXCommand(wfAction.getId(), wfAction.getType()).call();
                 }
             }
             catch (JPAExecutorException e) {

http://git-wip-us.apache.org/repos/asf/oozie/blob/70052969/core/src/main/java/org/apache/oozie/command/wf/ActionEndXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/wf/ActionEndXCommand.java b/core/src/main/java/org/apache/oozie/command/wf/ActionEndXCommand.java
index 4006441..d030a10 100644
--- a/core/src/main/java/org/apache/oozie/command/wf/ActionEndXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/wf/ActionEndXCommand.java
@@ -277,7 +277,7 @@ public class ActionEndXCommand extends ActionXCommand<Void> {
             if (!(executor instanceof ControlNodeActionExecutor) && EventHandlerService.isEnabled()) {
                 generateEvent(wfAction, wfJob.getUser());
             }
-            new SignalXCommand(jobId, actionId).call(getEntityKey());
+            new SignalXCommand(jobId, actionId).call();
         }
 
         LOG.debug("ENDED ActionEndXCommand for action " + actionId);

http://git-wip-us.apache.org/repos/asf/oozie/blob/70052969/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java b/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java
index 85a6cd7..2939b60 100644
--- a/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java
@@ -353,7 +353,7 @@ public class ActionStartXCommand extends ActionXCommand<org.apache.oozie.command
     }
 
     protected void callActionEnd() throws CommandException {
-        new ActionEndXCommand(wfAction.getId(), wfAction.getType()).call(getEntityKey());
+        new ActionEndXCommand(wfAction.getId(), wfAction.getType()).call();
     }
 
     protected void updateJobLastModified(){

http://git-wip-us.apache.org/repos/asf/oozie/blob/70052969/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java b/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java
index 6f64647..d2bb403 100644
--- a/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java
@@ -456,7 +456,7 @@ public class SignalXCommand extends WorkflowXCommand<Void> {
             new WfEndXCommand(wfJob).call(); // To delete the WF temp dir
         }
         else if (syncAction != null) {
-            new ActionStartXCommand(wfJob, syncAction.getId(), syncAction.getType()).call(getEntityKey());
+            new ActionStartXCommand(wfJob, syncAction.getId(), syncAction.getType()).call();
         }
         else if (!workflowActionBeanListForForked.isEmpty() && !checkForSuspendNode(workflowActionBeanListForForked)){
             startForkedActions(workflowActionBeanListForForked);

http://git-wip-us.apache.org/repos/asf/oozie/blob/70052969/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 6dac28b..1525ae4 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 4.3.0 release (trunk - unreleased)
 
+OOZIE-2394 Oozie can execute command without holding lock (puru)
 OOZIE-1922 MemoryLocksService fails if lock is acquired multiple times in same thread and released (puru)
 OOZIE-2432 TestPurgeXCommand fails (fdenes via rkanter)
 OOZIE-2434 inconsistent coord action status and workflow job status (satishsaley via puru)