You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by pu...@apache.org on 2016/05/17 22:08:47 UTC

[3/3] oozie git commit: OOZIE-2509 SLA job status can stuck in running state

OOZIE-2509 SLA job status can stuck in running state


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/ba7a7b85
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/ba7a7b85
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/ba7a7b85

Branch: refs/heads/master
Commit: ba7a7b85e040a313fa107474768dd67a325f91d5
Parents: 5fbd3eb
Author: Purshotam Shah <pu...@yahoo-inc.com>
Authored: Tue May 17 15:08:35 2016 -0700
Committer: Purshotam Shah <pu...@yahoo-inc.com>
Committed: Tue May 17 15:08:35 2016 -0700

----------------------------------------------------------------------
 .../command/coord/CoordActionCheckXCommand.java |   6 +-
 .../sla/SLACoordActionJobEventXCommand.java     |  80 ++
 .../sla/SLACoordActionJobHistoryXCommand.java   |  78 ++
 .../oozie/command/sla/SLAJobEventXCommand.java  | 301 ++++++
 .../command/sla/SLAJobHistoryXCommand.java      | 127 +++
 .../sla/SLAWorkflowActionJobEventXCommand.java  |  62 ++
 .../SLAWorkflowActionJobHistoryXCommand.java    |  57 ++
 .../sla/SLAWorkflowJobEventXCommand.java        |  64 ++
 .../sla/SLAWorkflowJobHistoryXCommand.java      |  56 ++
 .../jpa/CoordActionGetForSLAJPAExecutor.java    |  82 --
 .../executor/jpa/CoordActionQueryExecutor.java  |  13 +-
 .../executor/jpa/SLASummaryQueryExecutor.java   |   9 -
 .../jpa/WorkflowActionGetForSLAJPAExecutor.java |  78 --
 .../jpa/WorkflowActionQueryExecutor.java        |  13 +-
 .../jpa/WorkflowJobGetForSLAJPAExecutor.java    |  78 --
 .../executor/jpa/WorkflowJobQueryExecutor.java  |  13 +-
 .../oozie/service/ConfigurationService.java     |  12 +-
 .../org/apache/oozie/sla/SLACalcStatus.java     |  54 -
 .../apache/oozie/sla/SLACalculatorMemory.java   | 989 +++----------------
 .../org/apache/oozie/sla/SLASummaryBean.java    |   2 -
 .../apache/oozie/sla/SLAXCommandFactory.java    |  92 ++
 .../coord/TestCoordActionsKillXCommand.java     |  11 +-
 .../jpa/TestSLASummaryQueryExecutor.java        |   2 +-
 .../apache/oozie/service/TestHASLAService.java  |  64 +-
 .../oozie/sla/TestSLACalculatorMemory.java      | 344 +++++--
 .../oozie/sla/TestSLAEventGeneration.java       |   1 +
 .../oozie/sla/TestSLAJobEventListener.java      | 136 ++-
 .../org/apache/oozie/sla/TestSLAService.java    | 123 ++-
 release-log.txt                                 |   1 +
 29 files changed, 1668 insertions(+), 1280 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/main/java/org/apache/oozie/command/coord/CoordActionCheckXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/coord/CoordActionCheckXCommand.java b/core/src/main/java/org/apache/oozie/command/coord/CoordActionCheckXCommand.java
index 128feb2..bdbbd24 100644
--- a/core/src/main/java/org/apache/oozie/command/coord/CoordActionCheckXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/coord/CoordActionCheckXCommand.java
@@ -47,9 +47,10 @@ import org.apache.oozie.executor.jpa.BatchQueryExecutor;
 import org.apache.oozie.executor.jpa.CoordActionGetForCheckJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
 import org.apache.oozie.executor.jpa.CoordinatorJobGetForUserAppnameJPAExecutor;
-import org.apache.oozie.executor.jpa.WorkflowJobGetForSLAJPAExecutor;
 import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry;
 import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
+import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
+import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
 
 /**
  * The command checks workflow status for coordinator action.
@@ -177,7 +178,8 @@ public class CoordActionCheckXCommand extends CoordinatorXCommand<Void> {
                 coordAction = jpaService.execute(new CoordActionGetForCheckJPAExecutor(actionId));
                 coordJob = jpaService.execute(new CoordinatorJobGetForUserAppnameJPAExecutor(
                         coordAction.getJobId()));
-                workflowJob = jpaService.execute (new WorkflowJobGetForSLAJPAExecutor(coordAction.getExternalId()));
+                workflowJob = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW_FOR_SLA,
+                        coordAction.getExternalId());
                 LogUtils.setLogInfo(coordAction);
             }
             else {

http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/main/java/org/apache/oozie/command/sla/SLACoordActionJobEventXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/sla/SLACoordActionJobEventXCommand.java b/core/src/main/java/org/apache/oozie/command/sla/SLACoordActionJobEventXCommand.java
new file mode 100644
index 0000000..dfe2637
--- /dev/null
+++ b/core/src/main/java/org/apache/oozie/command/sla/SLACoordActionJobEventXCommand.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.command.sla;
+
+import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.WorkflowJobBean;
+import org.apache.oozie.command.CommandException;
+import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
+import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
+import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
+import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
+import org.apache.oozie.sla.SLACalcStatus;
+import org.apache.oozie.util.LogUtils;
+
+public class SLACoordActionJobEventXCommand extends SLAJobEventXCommand {
+    CoordinatorActionBean ca;
+    WorkflowJobBean wf;
+
+    public SLACoordActionJobEventXCommand(SLACalcStatus slaCalc, long lockTimeOut) {
+        super(slaCalc, lockTimeOut);
+    }
+
+    @Override
+    protected void loadState() throws CommandException {
+        try {
+            ca = CoordActionQueryExecutor.getInstance().get(CoordActionQuery.GET_COORD_ACTION_FOR_SLA, slaCalc.getId());
+            if (ca.getExternalId() != null) {
+                wf = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW_FOR_SLA, ca.getExternalId());
+            }
+            LogUtils.setLogInfo(ca);
+        }
+        catch (JPAExecutorException e) {
+            throw new CommandException(e);
+        }
+    }
+
+
+    /** Copies the coordinator action's state, plus its workflow's actual times if any, into slaCalc. */
+    @Override
+    protected void updateJobInfo() {
+        if (ca.isTerminalStatus()) {
+            setEnded(true);
+            setEndMiss(ca.isTerminalWithFailure());
+            // Fallback end time; replaced below when the workflow recorded its own end time.
+            slaCalc.setActualEnd(ca.getLastModifiedTime());
+            if (wf != null && wf.getEndTime() != null) {
+                if (slaCalc.getExpectedEnd() != null
+                        && wf.getEndTime().getTime() > slaCalc.getExpectedEnd().getTime()) {
+                    // Ended later than the expected end time: an end miss even without a failure.
+                    setEndMiss(true);
+                }
+                slaCalc.setActualEnd(wf.getEndTime());
+            }
+        }
+        // The workflow start time is recorded whether or not the action has finished.
+        if (wf != null) {
+            slaCalc.setActualStart(wf.getStartTime());
+        }
+        slaCalc.setJobStatus(ca.getStatusStr());
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/main/java/org/apache/oozie/command/sla/SLACoordActionJobHistoryXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/sla/SLACoordActionJobHistoryXCommand.java b/core/src/main/java/org/apache/oozie/command/sla/SLACoordActionJobHistoryXCommand.java
new file mode 100644
index 0000000..b7f09d3
--- /dev/null
+++ b/core/src/main/java/org/apache/oozie/command/sla/SLACoordActionJobHistoryXCommand.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.command.sla;
+
+import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.WorkflowJobBean;
+import org.apache.oozie.command.CommandException;
+import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
+import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
+import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
+import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
+import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.util.LogUtils;
+
+public class SLACoordActionJobHistoryXCommand extends SLAJobHistoryXCommand {
+
+    CoordinatorActionBean cAction = null;
+
+    public SLACoordActionJobHistoryXCommand(String jobId) {
+        super(jobId);
+    }
+
+
+    protected void loadState() throws CommandException {
+        try {
+            cAction = CoordActionQueryExecutor.getInstance().get(CoordActionQuery.GET_COORD_ACTION_FOR_SLA, jobId);
+        }
+        catch (JPAExecutorException e) {
+            throw new CommandException(e);
+        }
+        LogUtils.setLogInfo(cAction);
+    }
+
+    protected void updateSLASummary() throws CommandException {
+        try {
+            updateSLASummaryForCoordAction(cAction);
+        }
+        catch (JPAExecutorException e) {
+            throw new CommandException(e);
+        }
+
+    }
+
+    protected void updateSLASummaryForCoordAction(CoordinatorActionBean bean) throws JPAExecutorException {
+        String wrkflowId = bean.getExternalId();
+        if (wrkflowId != null) {
+            WorkflowJobBean wrkflow = WorkflowJobQueryExecutor.getInstance().get(
+                    WorkflowJobQuery.GET_WORKFLOW_START_END_TIME, wrkflowId);
+            if (wrkflow != null) {
+                updateSLASummary(bean.getId(), wrkflow.getStartTime(), wrkflow.getEndTime(), bean.getStatusStr());
+            }
+        }
+        else{
+            updateSLASummary(bean.getId(), null, bean.getLastModifiedTime(), bean.getStatusStr());
+        }
+    }
+
+    @Override
+    protected boolean isJobEnded() {
+        return cAction.isTerminalStatus();
+    }
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/main/java/org/apache/oozie/command/sla/SLAJobEventXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/sla/SLAJobEventXCommand.java b/core/src/main/java/org/apache/oozie/command/sla/SLAJobEventXCommand.java
new file mode 100644
index 0000000..9b18606
--- /dev/null
+++ b/core/src/main/java/org/apache/oozie/command/sla/SLAJobEventXCommand.java
@@ -0,0 +1,301 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.command.sla;
+
+import java.util.Date;
+
+import org.apache.oozie.XException;
+import org.apache.oozie.client.OozieClient;
+import org.apache.oozie.client.event.SLAEvent.EventStatus;
+import org.apache.oozie.client.event.SLAEvent.SLAStatus;
+import org.apache.oozie.command.CommandException;
+import org.apache.oozie.command.PreconditionException;
+import org.apache.oozie.command.XCommand;
+import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor;
+import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor.SLASummaryQuery;
+import org.apache.oozie.service.EventHandlerService;
+import org.apache.oozie.service.JPAService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.sla.SLACalcStatus;
+import org.apache.oozie.sla.SLASummaryBean;
+
+public abstract class SLAJobEventXCommand extends XCommand<Void> {
+    private long lockTimeOut = 0 ;
+    JPAService jpaService = Services.get().get(JPAService.class);
+    SLACalcStatus slaCalc;
+    final static String SLA_LOCK_PREFIX = "sla_";
+    private boolean isEnded = false;
+    private boolean isEndMiss = false;
+
+    public SLAJobEventXCommand(SLACalcStatus slaCalc, long lockTimeOut) {
+        super("SLA.job.event", "SLA.job.event", 1);
+        this.slaCalc = slaCalc;
+        this.lockTimeOut = lockTimeOut;
+    }
+
+    @Override
+    protected boolean isLockRequired() {
+        return true;
+    }
+
+    @Override
+    protected boolean isReQueueRequired() {
+        return false;
+    }
+
+    @Override
+    public String getEntityKey() {
+        return SLA_LOCK_PREFIX + slaCalc.getId();
+    }
+
+    protected long getLockTimeOut() {
+        return lockTimeOut;
+    }
+
+    @Override
+    protected void verifyPrecondition() throws CommandException, PreconditionException {
+    }
+
+
+    @Override
+    protected Void execute() throws CommandException {
+        updateJobInfo();
+        if (isEnded) {
+            processForEnd();
+        }
+        else {
+            processForRunning();
+        }
+        try {
+            writeToDB();
+        }
+        catch (XException e) {
+            throw new CommandException(e);
+        }
+        return null;
+    }
+
+    /**
+     * Updates slaCalc with the job's current state; execute() branches on isEnded() right after this call.
+     */
+    protected abstract void updateJobInfo();
+
+    /**
+     * Tells whether alert events may be queued for the given SLA object.
+     *
+     * @param slaObj the SLA object whose config map is inspected
+     * @return true unless the config map contains the OozieClient.SLA_DISABLE_ALERT key
+     */
+    private boolean shouldAlert(SLACalcStatus slaObj) {
+        return !slaObj.getSLAConfigMap().containsKey(OozieClient.SLA_DISABLE_ALERT);
+    }
+
+    /**
+     * Queue event.
+     *
+     * @param event the event
+     */
+    private void queueEvent(SLACalcStatus event) {
+        Services.get().get(EventHandlerService.class).queueEvent(event);
+    }
+
+    /**
+     * Sets DURATION_MISS or DURATION_MET on the SLA object and queues the event unless alerts are disabled.
+     *
+     * @param expected the expected duration; -1 means no duration SLA is defined and nothing is done
+     * @param actual the actual duration, compared directly against {@code expected}
+     * @param slaCalc the SLA object whose event status is updated
+     */
+    private void processDurationSLA(long expected, long actual, SLACalcStatus slaCalc) {
+        if (expected == -1) {
+            return;
+        }
+        // MISS only when the actual duration strictly exceeds the expected one.
+        EventStatus durationStatus = actual > expected ? EventStatus.DURATION_MISS : EventStatus.DURATION_MET;
+        slaCalc.setEventStatus(durationStatus);
+        if (!shouldAlert(slaCalc)) {
+            return;
+        }
+        // The queued event is a new SLACalcStatus built from slaCalc.
+        queueEvent(new SLACalcStatus(slaCalc));
+    }
+
+
+    /**
+     * Writes slaCalc's status, event and actual-time fields to the DB via UPDATE_SLA_SUMMARY_FOR_STATUS_ACTUAL_TIMES.
+     *
+     * @throws JPAExecutorException if the update query fails
+     */
+    private void writeToDB() throws JPAExecutorException {
+        byte eventProc = slaCalc.getEventProcessed();
+        // Cap the in-memory value at 8 (original note: no more processing, no transfer to history set).
+        if (slaCalc.getEventProcessed() >= 8) {
+            slaCalc.setEventProcessed(8);
+        }
+        // NOTE(review): the bean stores the pre-cap eventProc, while the debug log below prints the capped one - confirm.
+        SLASummaryBean slaSummaryBean = new SLASummaryBean();
+        slaSummaryBean.setId(slaCalc.getId());
+        slaSummaryBean.setEventProcessed(eventProc);
+        slaSummaryBean.setSLAStatus(slaCalc.getSLAStatus());
+        slaSummaryBean.setEventStatus(slaCalc.getEventStatus());
+        slaSummaryBean.setActualEnd(slaCalc.getActualEnd());
+        slaSummaryBean.setActualStart(slaCalc.getActualStart());
+        slaSummaryBean.setActualDuration(slaCalc.getActualDuration());
+        slaSummaryBean.setJobStatus(slaCalc.getJobStatus());
+        slaSummaryBean.setLastModifiedTime(new Date());
+
+        SLASummaryQueryExecutor.getInstance().executeUpdate(SLASummaryQuery.UPDATE_SLA_SUMMARY_FOR_STATUS_ACTUAL_TIMES,
+                slaSummaryBean);
+
+        LOG.debug(" Stored SLA SummaryBean Job [{0}] eventProc = [{1}], status = [{2}]", slaCalc.getId(),
+                slaCalc.getEventProcessed(), slaCalc.getJobStatus());
+
+    }
+
+    /**
+     * Ended job: sets SLA status MET/MISS, queues the start, duration and end events still pending, then sets 8.
+     */
+    private void processForEnd() {
+        byte eventProc = slaCalc.getEventProcessed();
+        // eventProc as used below: bit 1 = start handled, bit 2 = duration handled, values >= 4 skip the end event.
+        LOG.debug("Job {0} has ended. endtime = [{1}]", slaCalc.getId(), slaCalc.getActualEnd());
+        if (isEndMiss()) {
+            slaCalc.setSLAStatus(SLAStatus.MISS);
+        }
+        else {
+            slaCalc.setSLAStatus(SLAStatus.MET);
+        }
+        if (eventProc != 8 && slaCalc.getActualStart() != null) {
+            if ((eventProc & 1) == 0) {
+                if (slaCalc.getExpectedStart() != null) {
+                    if (slaCalc.getExpectedStart().getTime() < slaCalc.getActualStart().getTime()) {
+                        slaCalc.setEventStatus(EventStatus.START_MISS);
+                    }
+                    else {
+                        slaCalc.setEventStatus(EventStatus.START_MET);
+                    }
+                    if (shouldAlert(slaCalc)) {
+                        queueEvent(new SLACalcStatus(slaCalc));
+                    }
+                }
+            }
+            slaCalc.setActualDuration(slaCalc.getActualEnd().getTime() - slaCalc.getActualStart().getTime());
+            if (((eventProc >> 1) & 1) == 0) {
+                processDurationSLA(slaCalc.getExpectedDuration(), slaCalc.getActualDuration(), slaCalc);
+            }
+        }
+        if (eventProc != 8 && eventProc < 4) {
+            if (isEndMiss()) {
+                slaCalc.setEventStatus(EventStatus.END_MISS);
+            }
+            else {
+                slaCalc.setEventStatus(EventStatus.END_MET);
+            }
+            if (shouldAlert(slaCalc)) {
+                queueEvent(new SLACalcStatus(slaCalc));
+            }
+        }
+        slaCalc.setEventProcessed(8);
+    }
+
+    /**
+     * Running job: queues start, duration and end events that can be decided now and records them in eventProcessed.
+     */
+    private void processForRunning() {
+        byte eventProc = slaCalc.getEventProcessed();
+        // eventProc as used below: +1 once the start SLA is handled, +2 for duration, +4 for end; 8 skips every check.
+        if (eventProc != 8 && slaCalc.getActualStart() != null) {
+            slaCalc.setSLAStatus(SLAStatus.IN_PROCESS);
+        }
+        if (eventProc != 8 && (eventProc & 1) == 0) {
+            if (slaCalc.getExpectedStart() == null) {
+                eventProc++;
+            }
+            else if (slaCalc.getActualStart() != null) {
+                if (slaCalc.getExpectedStart().getTime() < slaCalc.getActualStart().getTime()) {
+                    slaCalc.setEventStatus(EventStatus.START_MISS);
+                }
+                else {
+                    slaCalc.setEventStatus(EventStatus.START_MET);
+                }
+                if (shouldAlert(slaCalc)) {
+                    queueEvent(new SLACalcStatus(slaCalc));
+                }
+                eventProc++;
+            }
+            else if (slaCalc.getExpectedStart() != null
+                    && slaCalc.getExpectedStart().getTime() < System.currentTimeMillis()) {
+                slaCalc.setEventStatus(EventStatus.START_MISS);
+                if (shouldAlert(slaCalc)) {
+                    queueEvent(new SLACalcStatus(slaCalc));
+                }
+                eventProc++;
+            }
+
+        }
+        if (eventProc != 8 && ((eventProc >> 1) & 1) == 0) {
+            if (slaCalc.getExpectedDuration() == -1) {
+                eventProc += 2;
+            }
+            else if (slaCalc.getActualStart() != null && slaCalc.getExpectedDuration() != -1) {
+                if (System.currentTimeMillis() - slaCalc.getActualStart().getTime() > slaCalc.getExpectedDuration()) {
+                    slaCalc.setEventStatus(EventStatus.DURATION_MISS);
+                    if (shouldAlert(slaCalc)) {
+                        queueEvent(new SLACalcStatus(slaCalc));
+                    }
+                    eventProc += 2;
+                }
+            }
+        }
+        if (eventProc < 4) {
+            if (slaCalc.getExpectedEnd() != null) {
+                if (slaCalc.getExpectedEnd().getTime() < System.currentTimeMillis()) {
+                    slaCalc.setEventStatus(EventStatus.END_MISS);
+                    slaCalc.setSLAStatus(SLAStatus.MISS);
+                    if (shouldAlert(slaCalc)) {
+                        queueEvent(new SLACalcStatus(slaCalc));
+                    }
+                    eventProc += 4;
+                }
+            }
+            else {
+                eventProc += 4;
+            }
+        }
+        slaCalc.setEventProcessed(eventProc);
+    }
+
+    public boolean isEnded() {
+        return isEnded;
+    }
+
+    public void setEnded(boolean isEnded) {
+        this.isEnded = isEnded;
+    }
+
+    public boolean isEndMiss() {
+        return isEndMiss;
+    }
+
+    public void setEndMiss(boolean isEndMiss) {
+        this.isEndMiss = isEndMiss;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/main/java/org/apache/oozie/command/sla/SLAJobHistoryXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/sla/SLAJobHistoryXCommand.java b/core/src/main/java/org/apache/oozie/command/sla/SLAJobHistoryXCommand.java
new file mode 100644
index 0000000..0b4045a
--- /dev/null
+++ b/core/src/main/java/org/apache/oozie/command/sla/SLAJobHistoryXCommand.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.command.sla;
+
+import java.util.Date;
+
+import org.apache.oozie.XException;
+import org.apache.oozie.command.CommandException;
+import org.apache.oozie.command.PreconditionException;
+import org.apache.oozie.command.XCommand;
+import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor;
+import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor.SLASummaryQuery;
+import org.apache.oozie.sla.SLASummaryBean;
+
+public abstract class SLAJobHistoryXCommand extends XCommand<Boolean> {
+
+    protected String jobId;
+
+    public SLAJobHistoryXCommand(String jobId) {
+        super("SLAJobHistoryXCommand", "SLAJobHistoryXCommand", 1);
+        this.jobId = jobId;
+
+    }
+
+    @Override
+    protected void verifyPrecondition() throws CommandException, PreconditionException {
+    }
+
+    @Override
+    protected boolean isLockRequired() {
+        return true;
+    }
+
+    @Override
+    protected boolean isReQueueRequired() {
+        return false;
+    }
+
+    @Override
+    public String getEntityKey() {
+        return SLAJobEventXCommand.SLA_LOCK_PREFIX + jobId;
+    }
+
+    protected long getLockTimeOut() {
+        return 0L;
+    }
+
+    protected Boolean execute() throws CommandException {
+        if (isJobEnded()) {
+            try {
+                updateSLASummary();
+            }
+            catch (XException e) {
+                throw new CommandException(e);
+            }
+            return true;
+        }
+        else {
+            LOG.debug("Job [{0}] is not finished", jobId);
+        }
+        return false;
+
+    }
+
+    /**
+     * Checks if is job ended.
+     *
+     * @return true, if is job ended
+     */
+    protected abstract boolean isJobEnded();
+
+    /**
+     * Update SLASummary
+     *
+     */
+    protected abstract void updateSLASummary() throws CommandException, XException;
+
+    /**
+     * Stores the given actual times and status in the job's SLA summary and sets its event-processed value to 8.
+     * @param id the id of the job whose SLA summary is updated
+     * @param startTime the actual start time, may be null
+     * @param endTime the actual end time, may be null
+     * @param status the job status to store
+     * @throws JPAExecutorException if reading or updating the SLA summary fails
+     */
+    protected void updateSLASummary(String id, Date startTime, Date endTime, String status) throws JPAExecutorException {
+        SLASummaryBean sla = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, id);
+        // Nothing to update without a summary; this check must come before any dereference of sla.
+        if (sla == null) {
+            return;
+        }
+        if (sla.getEventProcessed() == 8 && sla.getJobStatus() != null && sla.getJobStatus().equals(status)) {
+            LOG.debug("SLA job [{0}] is already updated, status = [{1}]", sla.getId(), sla.getJobStatus());
+            return;
+        }
+        sla.setActualStart(startTime);
+        sla.setActualEnd(endTime);
+        if (startTime != null && endTime != null) {
+            sla.setActualDuration(endTime.getTime() - startTime.getTime());
+        }
+        sla.setLastModifiedTime(new Date());
+        sla.setEventProcessed(8);
+        sla.setJobStatus(status);
+        SLASummaryQueryExecutor.getInstance().executeUpdate(SLASummaryQuery.UPDATE_SLA_SUMMARY_FOR_STATUS_ACTUAL_TIMES,
+                sla);
+        LOG.debug(" Stored SLA SummaryBean Job [{0}] eventProc = [{1}], status = [{2}]", sla.getId(),
+                sla.getEventProcessed(), sla.getJobStatus());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/main/java/org/apache/oozie/command/sla/SLAWorkflowActionJobEventXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/sla/SLAWorkflowActionJobEventXCommand.java b/core/src/main/java/org/apache/oozie/command/sla/SLAWorkflowActionJobEventXCommand.java
new file mode 100644
index 0000000..fef77ae
--- /dev/null
+++ b/core/src/main/java/org/apache/oozie/command/sla/SLAWorkflowActionJobEventXCommand.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.command.sla;
+
+import org.apache.oozie.WorkflowActionBean;
+import org.apache.oozie.command.CommandException;
+import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor;
+import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor.WorkflowActionQuery;
+import org.apache.oozie.sla.SLACalcStatus;
+import org.apache.oozie.util.LogUtils;
+
+public class SLAWorkflowActionJobEventXCommand extends SLAJobEventXCommand {
+    WorkflowActionBean wa;
+
+    public SLAWorkflowActionJobEventXCommand(SLACalcStatus slaCalc, long lockTimeOut) {
+        super(slaCalc, lockTimeOut);
+    }
+
+    @Override
+    protected void loadState() throws CommandException {
+        try {
+            wa = WorkflowActionQueryExecutor.getInstance().get(WorkflowActionQuery.GET_ACTION_FOR_SLA, slaCalc.getId());
+        }
+        catch (JPAExecutorException e) {
+            throw new CommandException(e);
+        }
+        LogUtils.setLogInfo(wa);
+
+    }
+
+    /** Copies the action's times and status into slaCalc; a null expected end is never treated as an end miss. */
+    @Override
+    protected void updateJobInfo() {
+        if (wa.getEndTime() != null) {
+            setEnded(true);
+            if (wa.isTerminalWithFailure() || (slaCalc.getExpectedEnd() != null
+                    && wa.getEndTime().getTime() > slaCalc.getExpectedEnd().getTime())) {
+                setEndMiss(true);
+            }
+        }
+        slaCalc.setActualStart(wa.getStartTime());
+        slaCalc.setActualEnd(wa.getEndTime());
+        slaCalc.setJobStatus(wa.getStatusStr());
+    }
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/main/java/org/apache/oozie/command/sla/SLAWorkflowActionJobHistoryXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/sla/SLAWorkflowActionJobHistoryXCommand.java b/core/src/main/java/org/apache/oozie/command/sla/SLAWorkflowActionJobHistoryXCommand.java
new file mode 100644
index 0000000..7dc4a3c
--- /dev/null
+++ b/core/src/main/java/org/apache/oozie/command/sla/SLAWorkflowActionJobHistoryXCommand.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.command.sla;
+
+import org.apache.oozie.WorkflowActionBean;
+import org.apache.oozie.XException;
+import org.apache.oozie.command.CommandException;
+import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor;
+import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor.WorkflowActionQuery;
+import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.util.LogUtils;
+
+/**
+ * SLA history command for a single workflow action: loads the action by id and feeds
+ * its id, start time, end time and status string into the SLA summary update.
+ */
+public class SLAWorkflowActionJobHistoryXCommand extends SLAJobHistoryXCommand {
+
+    /** Workflow action loaded by {@link #loadState()}; {@code null} until then. */
+    WorkflowActionBean wfAction = null;
+
+    /**
+     * @param jobId id of the workflow action, forwarded to the parent command
+     */
+    public SLAWorkflowActionJobHistoryXCommand(String jobId) {
+        super(jobId);
+    }
+
+    /**
+     * Loads the workflow action with the GET_ACTION_COMPLETED query and hands the result
+     * to {@code LogUtils.setLogInfo}.
+     *
+     * @throws CommandException wrapping any JPAExecutorException raised by the query
+     */
+    @Override
+    protected void loadState() throws CommandException {
+        try {
+            wfAction = WorkflowActionQueryExecutor.getInstance().get(WorkflowActionQuery.GET_ACTION_COMPLETED, jobId);
+        }
+        catch (JPAExecutorException e) {
+            throw new CommandException(e);
+        }
+        LogUtils.setLogInfo(wfAction);
+    }
+
+    /**
+     * Delegates to the four-argument {@code updateSLASummary} with the loaded action's
+     * id, start time, end time and status string.
+     *
+     * @throws XException if the delegated update fails
+     */
+    @Override
+    protected void updateSLASummary() throws XException {
+        updateSLASummary(wfAction.getId(), wfAction.getStartTime(), wfAction.getEndTime(), wfAction.getStatusStr());
+    }
+
+    /**
+     * @return {@code true} when the loaded action reports {@code isComplete()} or
+     *         {@code isTerminalWithFailure()}
+     */
+    @Override
+    protected boolean isJobEnded() {
+        return wfAction.isComplete() || wfAction.isTerminalWithFailure();
+    }
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/main/java/org/apache/oozie/command/sla/SLAWorkflowJobEventXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/sla/SLAWorkflowJobEventXCommand.java b/core/src/main/java/org/apache/oozie/command/sla/SLAWorkflowJobEventXCommand.java
new file mode 100644
index 0000000..9a72617
--- /dev/null
+++ b/core/src/main/java/org/apache/oozie/command/sla/SLAWorkflowJobEventXCommand.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.command.sla;
+
+import org.apache.oozie.WorkflowJobBean;
+import org.apache.oozie.client.WorkflowJob;
+import org.apache.oozie.command.CommandException;
+import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
+import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
+import org.apache.oozie.sla.SLACalcStatus;
+import org.apache.oozie.util.LogUtils;
+
+/**
+ * SLA event command for a workflow job: loads the job's SLA-related fields and copies
+ * them onto the {@link SLACalcStatus} being evaluated.
+ */
+public class SLAWorkflowJobEventXCommand extends SLAJobEventXCommand {
+    /** Workflow job fetched by {@link #loadState()}. */
+    WorkflowJobBean wf;
+
+    /**
+     * @param slaCalc SLA calculation status, forwarded to the parent command
+     * @param lockTimeOut lock timeout, forwarded unchanged to the parent command
+     */
+    public SLAWorkflowJobEventXCommand(SLACalcStatus slaCalc, long lockTimeOut) {
+        super(slaCalc, lockTimeOut);
+    }
+
+    /**
+     * Fetches the workflow job with the GET_WORKFLOW_FOR_SLA query, keyed by the id of
+     * the SLA calculation, and hands the result to {@code LogUtils.setLogInfo}.
+     *
+     * @throws CommandException wrapping any JPAExecutorException raised by the query
+     */
+    @Override
+    protected void loadState() throws CommandException {
+        final String workflowId = slaCalc.getId();
+        try {
+            wf = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW_FOR_SLA, workflowId);
+        }
+        catch (JPAExecutorException jpaEx) {
+            throw new CommandException(jpaEx);
+        }
+        LogUtils.setLogInfo(wf);
+    }
+
+    /**
+     * When the job is in a terminal state: marks it as ended, flags an end miss if the
+     * status is KILLED or FAILED or the end time is later than the expected end, and
+     * records the actual end. Actual start and status string are copied in every case.
+     */
+    @Override
+    protected void updateJobInfo() {
+        if (wf.inTerminalState()) {
+            setEnded(true);
+            final WorkflowJob.Status status = wf.getStatus();
+            // Status checks come first; the time comparison only runs when neither matches.
+            boolean endMissed = status == WorkflowJob.Status.KILLED || status == WorkflowJob.Status.FAILED;
+            if (!endMissed) {
+                endMissed = wf.getEndTime().getTime() > slaCalc.getExpectedEnd().getTime();
+            }
+            if (endMissed) {
+                setEndMiss(true);
+            }
+            slaCalc.setActualEnd(wf.getEndTime());
+        }
+        slaCalc.setActualStart(wf.getStartTime());
+        slaCalc.setJobStatus(wf.getStatusStr());
+    }
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/main/java/org/apache/oozie/command/sla/SLAWorkflowJobHistoryXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/sla/SLAWorkflowJobHistoryXCommand.java b/core/src/main/java/org/apache/oozie/command/sla/SLAWorkflowJobHistoryXCommand.java
new file mode 100644
index 0000000..79e45ee
--- /dev/null
+++ b/core/src/main/java/org/apache/oozie/command/sla/SLAWorkflowJobHistoryXCommand.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.command.sla;
+
+import org.apache.oozie.WorkflowJobBean;
+import org.apache.oozie.XException;
+import org.apache.oozie.command.CommandException;
+import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
+import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
+import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.util.LogUtils;
+
+/**
+ * SLA history command for a workflow job: loads the job by id and feeds its id, start
+ * time, end time and status string into the SLA summary update.
+ */
+public class SLAWorkflowJobHistoryXCommand extends SLAJobHistoryXCommand {
+
+    /** Workflow job loaded by {@link #loadState()}; {@code null} until then. */
+    WorkflowJobBean wfJob = null;
+
+    /**
+     * @param jobId id of the workflow job, forwarded to the parent command
+     */
+    public SLAWorkflowJobHistoryXCommand(String jobId) {
+        super(jobId);
+    }
+
+    /**
+     * Loads the workflow job with the GET_WORKFLOW_STATUS query and hands the result to
+     * {@code LogUtils.setLogInfo}.
+     *
+     * @throws CommandException wrapping any JPAExecutorException raised by the query
+     */
+    @Override
+    protected void loadState() throws CommandException {
+        try {
+            wfJob = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW_STATUS, jobId);
+        }
+        catch (JPAExecutorException e) {
+            throw new CommandException(e);
+        }
+        LogUtils.setLogInfo(wfJob);
+    }
+
+    /**
+     * Delegates to the four-argument {@code updateSLASummary} with the loaded job's id,
+     * start time, end time and status string.
+     *
+     * @throws XException if the delegated update fails
+     */
+    @Override
+    protected void updateSLASummary() throws XException {
+        updateSLASummary(wfJob.getId(), wfJob.getStartTime(), wfJob.getEndTime(), wfJob.getStatusStr());
+    }
+
+    /**
+     * @return {@code true} when the loaded job reports {@code inTerminalState()}
+     */
+    @Override
+    protected boolean isJobEnded() {
+        return wfJob.inTerminalState();
+    }
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetForSLAJPAExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetForSLAJPAExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetForSLAJPAExecutor.java
deleted file mode 100644
index 8a5b997..0000000
--- a/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetForSLAJPAExecutor.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.oozie.executor.jpa;
-
-import java.sql.Timestamp;
-import javax.persistence.EntityManager;
-import javax.persistence.Query;
-
-import org.apache.oozie.CoordinatorActionBean;
-import org.apache.oozie.ErrorCode;
-import org.apache.oozie.client.CoordinatorAction;
-import org.apache.oozie.util.DateUtils;
-import org.apache.oozie.util.ParamChecker;
-
-/**
- * JPAExecutor to get attributes of CoordinatorActionBean required by SLAService on restart
- */
-public class CoordActionGetForSLAJPAExecutor implements JPAExecutor<CoordinatorActionBean> {
-
-    private String coordActionId;
-
-    public CoordActionGetForSLAJPAExecutor(String coordActionId) {
-        ParamChecker.notNull(coordActionId, "coordActionId");
-        this.coordActionId = coordActionId;
-    }
-
-    @Override
-    public String getName() {
-        return "CoordActionGetForSLAJPAExecutor";
-    }
-
-    @Override
-    public CoordinatorActionBean execute(EntityManager em) throws JPAExecutorException {
-        try {
-            Query q = em.createNamedQuery("GET_COORD_ACTION_FOR_SLA");
-            q.setParameter("id", coordActionId);
-            Object[] obj = (Object[]) q.getSingleResult();
-            CoordinatorActionBean caBean = getBeanForRunningCoordAction(obj);
-            return caBean;
-        }
-        catch (Exception e) {
-            throw new JPAExecutorException(ErrorCode.E0603, e.getMessage(), e);
-        }
-
-    }
-
-    private CoordinatorActionBean getBeanForRunningCoordAction(Object[] arr) {
-        CoordinatorActionBean bean = new CoordinatorActionBean();
-        if (arr[0] != null) {
-            bean.setId((String) arr[0]);
-        }
-        if (arr[1] != null) {
-            bean.setJobId((String) arr[1]);
-        }
-        if (arr[2] != null) {
-            bean.setStatus(CoordinatorAction.Status.valueOf((String) arr[2]));
-        }
-        if (arr[3] != null) {
-            bean.setExternalId((String) arr[3]);
-        }
-        if (arr[4] != null) {
-            bean.setLastModifiedTime(DateUtils.toDate((Timestamp)arr[4]));
-        }
-        return bean;
-    }
-}

http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionQueryExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionQueryExecutor.java
index 79ec28c..c0e6c19 100644
--- a/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionQueryExecutor.java
+++ b/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionQueryExecutor.java
@@ -59,7 +59,8 @@ public class CoordActionQueryExecutor extends
         GET_TERMINATED_ACTION_IDS_FOR_DATES,
         GET_ACTIVE_ACTIONS_FOR_DATES,
         GET_COORD_ACTIONS_WAITING_READY_SUBMITTED_OLDER_THAN,
-        GET_COORD_ACTIONS_FOR_RECOVERY_OLDER_THAN
+        GET_COORD_ACTIONS_FOR_RECOVERY_OLDER_THAN,
+        GET_COORD_ACTION_FOR_SLA
     };
 
     private static CoordActionQueryExecutor instance = new CoordActionQueryExecutor();
@@ -177,6 +178,7 @@ public class CoordActionQueryExecutor extends
         switch (caQuery) {
             case GET_COORD_ACTION:
             case GET_COORD_ACTION_STATUS:
+            case GET_COORD_ACTION_FOR_SLA:
                 query.setParameter("id", parameters[0]);
                 break;
             case GET_COORD_ACTIONS_BY_LAST_MODIFIED_TIME:
@@ -330,6 +332,15 @@ public class CoordActionQueryExecutor extends
                 bean.setExternalId((String) arr[3]);
                 bean.setPushMissingDependenciesBlob((StringBlob) arr[4]);
                 break;
+            case GET_COORD_ACTION_FOR_SLA:
+                arr = (Object[]) ret;
+                bean = new CoordinatorActionBean();
+                bean.setId((String) arr[0]);
+                bean.setJobId((String) arr[1]);
+                bean.setStatusStr((String) arr[2]);
+                bean.setExternalId((String) arr[3]);
+                bean.setLastModifiedTime((Timestamp) arr[4]);
+                break;
 
             default:
                 throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot construct action bean for "

http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/main/java/org/apache/oozie/executor/jpa/SLASummaryQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/SLASummaryQueryExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/SLASummaryQueryExecutor.java
index 6663162..6ff9df8 100644
--- a/core/src/main/java/org/apache/oozie/executor/jpa/SLASummaryQueryExecutor.java
+++ b/core/src/main/java/org/apache/oozie/executor/jpa/SLASummaryQueryExecutor.java
@@ -37,7 +37,6 @@ public class SLASummaryQueryExecutor extends QueryExecutor<SLASummaryBean, SLASu
 
     public enum SLASummaryQuery {
         UPDATE_SLA_SUMMARY_FOR_STATUS_ACTUAL_TIMES,
-        UPDATE_SLA_SUMMARY_FOR_ACTUAL_TIMES,
         UPDATE_SLA_SUMMARY_ALL,
         UPDATE_SLA_SUMMARY_EVENTPROCESSED,
         UPDATE_SLA_SUMMARY_FOR_EXPECTED_TIMES,
@@ -72,14 +71,6 @@ public class SLASummaryQueryExecutor extends QueryExecutor<SLASummaryBean, SLASu
                 query.setParameter("actualEndTS", bean.getActualEndTimestamp());
                 query.setParameter("actualDuration", bean.getActualDuration());
                 break;
-            case UPDATE_SLA_SUMMARY_FOR_ACTUAL_TIMES:
-                query.setParameter("jobId", bean.getId());
-                query.setParameter("eventProcessed", bean.getEventProcessed());
-                query.setParameter("actualStartTS", bean.getActualStartTimestamp());
-                query.setParameter("actualEndTS", bean.getActualEndTimestamp());
-                query.setParameter("actualDuration", bean.getActualDuration());
-                query.setParameter("lastModifiedTS", bean.getLastModifiedTimestamp());
-                break;
             case UPDATE_SLA_SUMMARY_ALL:
                 query.setParameter("appName", bean.getAppName());
                 query.setParameter("appType", bean.getAppType().toString());

http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionGetForSLAJPAExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionGetForSLAJPAExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionGetForSLAJPAExecutor.java
deleted file mode 100644
index 280294b..0000000
--- a/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionGetForSLAJPAExecutor.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.oozie.executor.jpa;
-
-import java.sql.Timestamp;
-
-import javax.persistence.EntityManager;
-import javax.persistence.Query;
-
-import org.apache.oozie.ErrorCode;
-import org.apache.oozie.WorkflowActionBean;
-import org.apache.oozie.client.WorkflowAction;
-import org.apache.oozie.util.DateUtils;
-import org.apache.oozie.util.ParamChecker;
-
-/**
- * Retrieve the workflow action bean for sla service
- */
-public class WorkflowActionGetForSLAJPAExecutor implements JPAExecutor<WorkflowActionBean> {
-
-    private String wfActionId;
-
-    public WorkflowActionGetForSLAJPAExecutor(String wfActionId) {
-        ParamChecker.notNull(wfActionId, "wfActionId");
-        this.wfActionId = wfActionId;
-    }
-
-    @Override
-    public String getName() {
-        return "WorkflowActionGetForSLAJPAExecutor";
-    }
-
-    @Override
-    public WorkflowActionBean execute(EntityManager em) throws JPAExecutorException {
-        try {
-            Query q = em.createNamedQuery("GET_ACTION_FOR_SLA");
-            q.setParameter("id", wfActionId);
-            Object[] obj = (Object[]) q.getSingleResult();
-            return getBeanFromArray(obj);
-        }
-        catch (Exception e) {
-            throw new JPAExecutorException(ErrorCode.E0603, e.getMessage(), e);
-        }
-    }
-
-    private WorkflowActionBean getBeanFromArray(Object[] arr) {
-        WorkflowActionBean wab = new WorkflowActionBean();
-        if (arr[0] != null) {
-            wab.setId((String) arr[0]);
-        }
-        if (arr[1] != null) {
-            wab.setStatus(WorkflowAction.Status.valueOf((String) arr[1]));
-        }
-        if (arr[2] != null) {
-            wab.setStartTime(DateUtils.toDate((Timestamp) arr[2]));
-        }
-        if (arr[3] != null) {
-            wab.setEndTime(DateUtils.toDate((Timestamp) arr[3]));
-        }
-        return wab;
-    }
-}

http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionQueryExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionQueryExecutor.java
index 078fd40..f01f090 100644
--- a/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionQueryExecutor.java
+++ b/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionQueryExecutor.java
@@ -57,7 +57,8 @@ public class WorkflowActionQueryExecutor extends
         GET_ACTION_COMPLETED,
         GET_RUNNING_ACTIONS,
         GET_PENDING_ACTIONS,
-        GET_ACTIONS_FOR_WORKFLOW_RERUN
+        GET_ACTIONS_FOR_WORKFLOW_RERUN,
+        GET_ACTION_FOR_SLA
     };
 
     private static WorkflowActionQueryExecutor instance = new WorkflowActionQueryExecutor();
@@ -202,6 +203,7 @@ public class WorkflowActionQueryExecutor extends
             case GET_ACTION_CHECK:
             case GET_ACTION_END:
             case GET_ACTION_COMPLETED:
+            case GET_ACTION_FOR_SLA:
                 query.setParameter("id", parameters[0]);
                 break;
             case GET_RUNNING_ACTIONS:
@@ -363,6 +365,15 @@ public class WorkflowActionQueryExecutor extends
                 bean.setEndTime(DateUtils.toDate((Timestamp) arr[3]));
                 bean.setType((String) arr[4]);
                 break;
+            case GET_ACTION_FOR_SLA:
+                bean = new WorkflowActionBean();
+                arr = (Object[]) ret;
+                bean.setId((String) arr[0]);
+                bean.setStatusStr((String) arr[1]);
+                bean.setStartTime(DateUtils.toDate((Timestamp) arr[2]));
+                bean.setEndTime(DateUtils.toDate((Timestamp) arr[3]));
+                break;
+
             default:
                 throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot construct action bean for "
                         + namedQuery.name());

http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobGetForSLAJPAExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobGetForSLAJPAExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobGetForSLAJPAExecutor.java
deleted file mode 100644
index 774766f..0000000
--- a/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobGetForSLAJPAExecutor.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.oozie.executor.jpa;
-
-import java.sql.Timestamp;
-
-import javax.persistence.EntityManager;
-import javax.persistence.Query;
-
-import org.apache.oozie.ErrorCode;
-import org.apache.oozie.WorkflowJobBean;
-import org.apache.oozie.client.WorkflowJob;
-import org.apache.oozie.util.DateUtils;
-import org.apache.oozie.util.ParamChecker;
-
-/**
- * Retrieve the workflow job bean for sla service
- */
-public class WorkflowJobGetForSLAJPAExecutor implements JPAExecutor<WorkflowJobBean> {
-
-    private String wfJobId;
-
-    public WorkflowJobGetForSLAJPAExecutor(String wfJobId) {
-        ParamChecker.notNull(wfJobId, "wfJobId");
-        this.wfJobId = wfJobId;
-    }
-
-    @Override
-    public String getName() {
-        return "WorkflowJobGetForSLAJPAExecutor";
-    }
-
-    @Override
-    public WorkflowJobBean execute(EntityManager em) throws JPAExecutorException {
-        try {
-            Query q = em.createNamedQuery("GET_WORKFLOW_FOR_SLA");
-            q.setParameter("id", wfJobId);
-            Object[] obj = (Object[]) q.getSingleResult();
-            return getBeanFromArray(obj);
-        }
-        catch (Exception e) {
-            throw new JPAExecutorException(ErrorCode.E0603, e.getMessage(), e);
-        }
-    }
-
-    private WorkflowJobBean getBeanFromArray(Object[] arr) {
-        WorkflowJobBean wjb = new WorkflowJobBean();
-        if (arr[0] != null) {
-            wjb.setId((String) arr[0]);
-        }
-        if (arr[1] != null) {
-            wjb.setStatus(WorkflowJob.Status.valueOf((String) arr[1]));
-        }
-        if (arr[2] != null) {
-            wjb.setStartTime(DateUtils.toDate((Timestamp) arr[2]));
-        }
-        if (arr[3] != null) {
-            wjb.setEndTime(DateUtils.toDate((Timestamp) arr[3]));
-        }
-        return wjb;
-    }
-}

http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobQueryExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobQueryExecutor.java
index ce108d5..13fa54d 100644
--- a/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobQueryExecutor.java
+++ b/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobQueryExecutor.java
@@ -28,7 +28,6 @@ import javax.persistence.Query;
 import org.apache.oozie.BinaryBlob;
 import org.apache.oozie.ErrorCode;
 import org.apache.oozie.StringBlob;
-import org.apache.oozie.WorkflowActionBean;
 import org.apache.oozie.WorkflowJobBean;
 import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.Services;
@@ -60,7 +59,8 @@ public class WorkflowJobQueryExecutor extends QueryExecutor<WorkflowJobBean, Wor
         GET_WORKFLOW_RESUME,
         GET_WORKFLOW_STATUS,
         GET_WORKFLOWS_PARENT_COORD_RERUN,
-        GET_COMPLETED_COORD_WORKFLOWS_OLDER_THAN
+        GET_COMPLETED_COORD_WORKFLOWS_OLDER_THAN,
+        GET_WORKFLOW_FOR_SLA
     };
 
     private static WorkflowJobQueryExecutor instance = new WorkflowJobQueryExecutor();
@@ -171,6 +171,7 @@ public class WorkflowJobQueryExecutor extends QueryExecutor<WorkflowJobBean, Wor
             case GET_WORKFLOW_KILL:
             case GET_WORKFLOW_RESUME:
             case GET_WORKFLOW_STATUS:
+            case GET_WORKFLOW_FOR_SLA:
                 query.setParameter("id", parameters[0]);
                 break;
             case GET_WORKFLOWS_PARENT_COORD_RERUN:
@@ -330,6 +331,14 @@ public class WorkflowJobQueryExecutor extends QueryExecutor<WorkflowJobBean, Wor
                 bean.setId((String) arr[0]);
                 bean.setParentId((String) arr[1]);
                 break;
+            case GET_WORKFLOW_FOR_SLA:
+                bean = new WorkflowJobBean();
+                arr = (Object[]) ret;
+                bean.setId((String) arr[0]);
+                bean.setStatusStr((String) arr[1]);
+                bean.setStartTime(DateUtils.toDate((Timestamp) arr[2]));
+                bean.setEndTime(DateUtils.toDate((Timestamp) arr[3]));
+                break;
             default:
                 throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot construct job bean for "
                         + namedQuery.name());

http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/main/java/org/apache/oozie/service/ConfigurationService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/ConfigurationService.java b/core/src/main/java/org/apache/oozie/service/ConfigurationService.java
index 4246764..9d4dcd9 100644
--- a/core/src/main/java/org/apache/oozie/service/ConfigurationService.java
+++ b/core/src/main/java/org/apache/oozie/service/ConfigurationService.java
@@ -544,12 +544,19 @@ public class ConfigurationService implements Service, Instrumentable {
     }
 
     public static long getLong(String name) {
+        // No explicit default supplied: fall back to ConfigUtils.LONG_DEFAULT.
+        return getLong(name, ConfigUtils.LONG_DEFAULT);
+    }
+
+    /**
+     * Reads a long property from the configuration returned by {@code Services.get().getConf()}.
+     *
+     * @param name property name
+     * @param defaultValue default handed to the underlying lookup
+     * @return the result of {@code conf.getLong(name, defaultValue)} on that configuration
+     */
+    public static long getLong(String name, long defaultValue) {
         Configuration conf = Services.get().getConf();
-        return getLong(conf, name);
+        return getLong(conf, name, defaultValue);
     }
 
     public static long getLong(Configuration conf, String name) {
-        return conf.getLong(name, ConfigUtils.LONG_DEFAULT);
+        // No explicit default supplied: fall back to ConfigUtils.LONG_DEFAULT.
+        return getLong(conf, name, ConfigUtils.LONG_DEFAULT);
+    }
+
+    /**
+     * Reads a long property from the given configuration.
+     *
+     * @param conf configuration to read from
+     * @param name property name
+     * @param defaultValue default handed to the underlying lookup
+     * @return the result of {@code conf.getLong(name, defaultValue)}
+     */
+    public static long getLong(Configuration conf, String name, long defaultValue) {
+        return conf.getLong(name, defaultValue);
     }
 
     public static Class<?>[] getClasses(String name) {
@@ -590,4 +597,5 @@ public class ConfigurationService implements Service, Instrumentable {
         Configuration conf = Services.get().getConf();
         return getPassword(conf, name, defaultValue);
     }
+
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/main/java/org/apache/oozie/sla/SLACalcStatus.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/sla/SLACalcStatus.java b/core/src/main/java/org/apache/oozie/sla/SLACalcStatus.java
index 5c0cfd9..3a76dfe 100644
--- a/core/src/main/java/org/apache/oozie/sla/SLACalcStatus.java
+++ b/core/src/main/java/org/apache/oozie/sla/SLACalcStatus.java
@@ -25,11 +25,6 @@ import java.util.Map;
 import org.apache.oozie.AppType;
 import org.apache.oozie.client.OozieClient;
 import org.apache.oozie.client.event.SLAEvent;
-import org.apache.oozie.lock.LockToken;
-import org.apache.oozie.service.JobsConcurrencyService;
-import org.apache.oozie.service.MemoryLocksService;
-import org.apache.oozie.service.Services;
-import org.apache.oozie.sla.service.SLAService;
 import org.apache.oozie.util.LogUtils;
 import org.apache.oozie.util.XLog;
 
@@ -49,7 +44,6 @@ public class SLACalcStatus extends SLAEvent {
     private long actualDuration = -1;
     private Date lastModifiedTime;
     private byte eventProcessed;
-    private LockToken lock;
 
     private XLog LOG;
 
@@ -293,53 +287,5 @@ public class SLACalcStatus extends SLAEvent {
     public String getEntityKey() {
         return SLA_ENTITYKEY_PREFIX + this.getId();
     }
-    /**
-     * Obtain an exclusive lock on the {link #getEntityKey}.
-     * <p>
-     * A timeout of {link #getLockTimeOut} is used when trying to obtain the lock.
-     *
-     * @throws InterruptedException thrown if an interruption happened while trying to obtain the lock
-     */
-    public void acquireLock() throws InterruptedException {
-        // only get ZK lock when multiple servers running
-        if (Services.get().get(JobsConcurrencyService.class).isHighlyAvailableMode()) {
-            lock = Services.get().get(MemoryLocksService.class).getWriteLock(getEntityKey(), getLockTimeOut());
-            if (lock == null) {
-            LOG.debug("Could not aquire lock for [{0}]", getEntityKey());
-            }
-            else {
-                LOG.debug("Acquired lock for [{0}]", getEntityKey());
-            }
-        }
-        else {
-            lock = new DummyToken();
-        }
-    }
-
-    private static class DummyToken implements LockToken {
-        @Override
-        public void release() {
-        }
-    }
-
-    public boolean isLocked() {
-        boolean locked = false;
-        if(lock != null) {
-            locked = true;
-        }
-        return locked;
-    }
-
-    public void releaseLock(){
-        if (lock != null) {
-            lock.release();
-            lock = null;
-            LOG.debug("Released lock for [{0}]", getEntityKey());
-        }
-    }
-
-    public long getLockTimeOut() {
-        return Services.get().getConf().getLong(SLAService.CONF_SLA_CALC_LOCK_TIMEOUT, 5 * 1000);
-    }
 
 }