You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by pu...@apache.org on 2016/01/26 19:43:44 UTC
oozie git commit: OOZIE-2394 Oozie can execute command without
holding lock
Repository: oozie
Updated Branches:
refs/heads/master 70a5ffe4b -> 70052969a
OOZIE-2394 Oozie can execute command without holding lock
Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/70052969
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/70052969
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/70052969
Branch: refs/heads/master
Commit: 70052969a1df064957bc5dd06d69fc1955401e1b
Parents: 70a5ffe
Author: Purshotam Shah <pu...@yahoo-inc.com>
Authored: Tue Jan 26 10:43:33 2016 -0800
Committer: Purshotam Shah <pu...@yahoo-inc.com>
Committed: Tue Jan 26 10:43:33 2016 -0800
----------------------------------------------------------------------
.../java/org/apache/oozie/command/XCommand.java | 58 +++++++-------------
.../coord/CoordActionInputCheckXCommand.java | 2 +-
.../command/coord/CoordActionReadyXCommand.java | 2 +-
.../oozie/command/wf/ActionCheckXCommand.java | 2 +-
.../oozie/command/wf/ActionEndXCommand.java | 2 +-
.../oozie/command/wf/ActionStartXCommand.java | 2 +-
.../apache/oozie/command/wf/SignalXCommand.java | 2 +-
release-log.txt | 1 +
8 files changed, 26 insertions(+), 45 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/oozie/blob/70052969/core/src/main/java/org/apache/oozie/command/XCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/XCommand.java b/core/src/main/java/org/apache/oozie/command/XCommand.java
index ff87510..bdf13f6 100644
--- a/core/src/main/java/org/apache/oozie/command/XCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/XCommand.java
@@ -74,7 +74,6 @@ public abstract class XCommand<T> implements XCallable<T> {
private LockToken lock;
private AtomicBoolean used = new AtomicBoolean(false);
private boolean inInterrupt = false;
- private boolean isSynchronous = false;
private Map<Long, List<XCommand<?>>> commandQueue;
protected boolean dryrun = false;
@@ -212,13 +211,16 @@ public abstract class XCommand<T> implements XCallable<T> {
if (lock == null) {
instrumentation.incr(INSTRUMENTATION_GROUP, getName() + ".lockTimeOut", 1);
if (isReQueueRequired()) {
- //if not acquire the lock, re-queue itself with default delay
+ // if not acquire the lock, re-queue itself with default delay
queue(this, getRequeueDelay());
- LOG.debug("Could not get lock [{0}], timed out [{1}]ms, and requeue itself [{2}]", this.toString(), getLockTimeOut(), getName());
- } else {
+ LOG.debug("Could not get lock [{0}], timed out [{1}]ms, and requeue itself [{2}]", this.toString(),
+ getLockTimeOut(), getName());
+ }
+ else {
throw new CommandException(ErrorCode.E0606, this.toString(), getLockTimeOut());
}
- } else {
+ }
+ else {
LOG.debug("Acquired lock for [{0}] in [{1}]", getEntityKey(), getName());
}
}
@@ -252,13 +254,11 @@ public abstract class XCommand<T> implements XCallable<T> {
Instrumentation.Cron callCron = new Instrumentation.Cron();
try {
callCron.start();
- if (!isSynchronous) {
- eagerLoadState();
- eagerVerifyPrecondition();
- }
+ eagerLoadState();
+ eagerVerifyPrecondition();
try {
T ret = null;
- if (!isSynchronous && isLockRequired() && !this.inInterruptMode()) {
+ if (isLockRequired() && !this.inInterruptMode()) {
Instrumentation.Cron acquireLockCron = new Instrumentation.Cron();
acquireLockCron.start();
acquireLock();
@@ -270,10 +270,11 @@ public abstract class XCommand<T> implements XCallable<T> {
this.executeInterrupts();
}
- if (isSynchronous || !isLockRequired() || (lock != null) || this.inInterruptMode()) {
+ if (!isLockRequired() || (lock != null) || this.inInterruptMode()) {
if (CallableQueueService.INTERRUPT_TYPES.contains(this.getType())
&& !used.compareAndSet(false, true)) {
- LOG.debug("Command [{0}] key [{1}] already executed for [{2}]", getName(), getEntityKey(), this.toString());
+ LOG.debug("Command [{0}] key [{1}] already executed for [{2}]", getName(), getEntityKey(),
+ this.toString());
return null;
}
LOG.trace("Load state for [{0}]", getEntityKey());
@@ -300,12 +301,12 @@ public abstract class XCommand<T> implements XCallable<T> {
return ret;
}
finally {
- if (!isSynchronous && isLockRequired() && !this.inInterruptMode()) {
+ if (isLockRequired() && !this.inInterruptMode()) {
releaseLock();
}
}
}
- catch(PreconditionException pex){
+ catch (PreconditionException pex) {
LOG.warn(pex.getMessage().toString() + ", Error Code: " + pex.getErrorCode().toString());
instrumentation.incr(INSTRUMENTATION_GROUP, getName() + ".preconditionfailed", 1);
return null;
@@ -338,25 +339,6 @@ public abstract class XCommand<T> implements XCallable<T> {
}
/**
- * Call this command synchronously from its caller. This benefits faster
- * execution of command lifecycle for control nodes and kicking off
- * subsequent actions
- *
- * @param callerEntityKey
- * @return the {link #execute} return value.
- * @throws CommandException
- */
- public final T call(String callerEntityKey) throws CommandException {
- if (!callerEntityKey.equals(this.getEntityKey())) {
- throw new CommandException(ErrorCode.E0607, "Entity Keys mismatch during synchronous call", "caller="
- + callerEntityKey + ", callee=" + getEntityKey());
- }
- isSynchronous = true; //setting to true so lock acquiring and release is not repeated
- LOG.trace("Executing synchronously command [{0}] on job [{1}]", this.getName(), this.getKey());
- return call();
- }
-
- /**
* Check for the existence of interrupts for the same lock key
* Execute them if exist.
*
@@ -441,7 +423,7 @@ public abstract class XCommand<T> implements XCallable<T> {
* <p>
* A trivial implementation is calling {link #loadState}.
*/
- protected void eagerLoadState() throws CommandException{
+ protected void eagerLoadState() throws CommandException {
}
/**
@@ -453,7 +435,7 @@ public abstract class XCommand<T> implements XCallable<T> {
*
* @throws CommandException thrown if the precondition is not met.
*/
- protected void eagerVerifyPrecondition() throws CommandException,PreconditionException {
+ protected void eagerVerifyPrecondition() throws CommandException, PreconditionException {
}
/**
@@ -470,7 +452,7 @@ public abstract class XCommand<T> implements XCallable<T> {
*
* @throws CommandException thrown if the precondition is not met.
*/
- protected abstract void verifyPrecondition() throws CommandException,PreconditionException;
+ protected abstract void verifyPrecondition() throws CommandException, PreconditionException;
/**
* Command execution body.
@@ -485,7 +467,6 @@ public abstract class XCommand<T> implements XCallable<T> {
*/
protected abstract T execute() throws CommandException;
-
/**
* Return the {@link Instrumentation} instance in use.
*
@@ -502,7 +483,6 @@ public abstract class XCommand<T> implements XCallable<T> {
this.used.set(false);
}
-
/**
* Return the delay time for requeue
* <p>
@@ -522,7 +502,7 @@ public abstract class XCommand<T> implements XCallable<T> {
* @return command key
*/
@Override
- public String getKey(){
+ public String getKey() {
return this.key;
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/70052969/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java b/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java
index 742d00d..11184d1 100644
--- a/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java
@@ -197,7 +197,7 @@ public class CoordActionInputCheckXCommand extends CoordinatorXCommand<Void> {
coordAction.setActionXml(actionXml.toString());
coordAction.setStatus(CoordinatorAction.Status.READY);
updateCoordAction(coordAction, true);
- new CoordActionReadyXCommand(coordAction.getJobId()).call(getEntityKey());
+ new CoordActionReadyXCommand(coordAction.getJobId()).call();
}
else if (!isTimeout(currentTime)) {
if (status == false) {
http://git-wip-us.apache.org/repos/asf/oozie/blob/70052969/core/src/main/java/org/apache/oozie/command/coord/CoordActionReadyXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/coord/CoordActionReadyXCommand.java b/core/src/main/java/org/apache/oozie/command/coord/CoordActionReadyXCommand.java
index 2e9c5ea..2d8af04 100644
--- a/core/src/main/java/org/apache/oozie/command/coord/CoordActionReadyXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/coord/CoordActionReadyXCommand.java
@@ -182,7 +182,7 @@ public class CoordActionReadyXCommand extends CoordinatorXCommand<Void> {
}
// start action
new CoordActionStartXCommand(action.getId(), coordJob.getUser(), coordJob.getAppName(),
- action.getJobId()).call(getEntityKey());
+ action.getJobId()).call();
}
else {
break;
http://git-wip-us.apache.org/repos/asf/oozie/blob/70052969/core/src/main/java/org/apache/oozie/command/wf/ActionCheckXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/wf/ActionCheckXCommand.java b/core/src/main/java/org/apache/oozie/command/wf/ActionCheckXCommand.java
index 5827387..ea4d340 100644
--- a/core/src/main/java/org/apache/oozie/command/wf/ActionCheckXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/wf/ActionCheckXCommand.java
@@ -241,7 +241,7 @@ public class ActionCheckXCommand extends ActionXCommand<Void> {
generateEvent(wfAction, wfJob.getUser());
}
if (execSynchronous) {
- new ActionEndXCommand(wfAction.getId(), wfAction.getType()).call(getEntityKey());
+ new ActionEndXCommand(wfAction.getId(), wfAction.getType()).call();
}
}
catch (JPAExecutorException e) {
http://git-wip-us.apache.org/repos/asf/oozie/blob/70052969/core/src/main/java/org/apache/oozie/command/wf/ActionEndXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/wf/ActionEndXCommand.java b/core/src/main/java/org/apache/oozie/command/wf/ActionEndXCommand.java
index 4006441..d030a10 100644
--- a/core/src/main/java/org/apache/oozie/command/wf/ActionEndXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/wf/ActionEndXCommand.java
@@ -277,7 +277,7 @@ public class ActionEndXCommand extends ActionXCommand<Void> {
if (!(executor instanceof ControlNodeActionExecutor) && EventHandlerService.isEnabled()) {
generateEvent(wfAction, wfJob.getUser());
}
- new SignalXCommand(jobId, actionId).call(getEntityKey());
+ new SignalXCommand(jobId, actionId).call();
}
LOG.debug("ENDED ActionEndXCommand for action " + actionId);
http://git-wip-us.apache.org/repos/asf/oozie/blob/70052969/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java b/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java
index 85a6cd7..2939b60 100644
--- a/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java
@@ -353,7 +353,7 @@ public class ActionStartXCommand extends ActionXCommand<org.apache.oozie.command
}
protected void callActionEnd() throws CommandException {
- new ActionEndXCommand(wfAction.getId(), wfAction.getType()).call(getEntityKey());
+ new ActionEndXCommand(wfAction.getId(), wfAction.getType()).call();
}
protected void updateJobLastModified(){
http://git-wip-us.apache.org/repos/asf/oozie/blob/70052969/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java b/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java
index 6f64647..d2bb403 100644
--- a/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java
@@ -456,7 +456,7 @@ public class SignalXCommand extends WorkflowXCommand<Void> {
new WfEndXCommand(wfJob).call(); // To delete the WF temp dir
}
else if (syncAction != null) {
- new ActionStartXCommand(wfJob, syncAction.getId(), syncAction.getType()).call(getEntityKey());
+ new ActionStartXCommand(wfJob, syncAction.getId(), syncAction.getType()).call();
}
else if (!workflowActionBeanListForForked.isEmpty() && !checkForSuspendNode(workflowActionBeanListForForked)){
startForkedActions(workflowActionBeanListForForked);
http://git-wip-us.apache.org/repos/asf/oozie/blob/70052969/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 6dac28b..1525ae4 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
-- Oozie 4.3.0 release (trunk - unreleased)
+OOZIE-2394 Oozie can execute command without holding lock (puru)
OOZIE-1922 MemoryLocksService fails if lock is acquired multiple times in same thread and released (puru)
OOZIE-2432 TestPurgeXCommand fails (fdenes via rkanter)
OOZIE-2434 inconsistent coord action status and workflow job status (satishsaley via puru)