You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by pu...@apache.org on 2016/05/17 22:08:45 UTC

[1/3] oozie git commit: OOZIE-2509 SLA job status can stuck in running state

Repository: oozie
Updated Branches:
  refs/heads/master 5fbd3eb3f -> ba7a7b85e


http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/test/java/org/apache/oozie/sla/TestSLACalculatorMemory.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/sla/TestSLACalculatorMemory.java b/core/src/test/java/org/apache/oozie/sla/TestSLACalculatorMemory.java
index 432efef..559e2b3 100644
--- a/core/src/test/java/org/apache/oozie/sla/TestSLACalculatorMemory.java
+++ b/core/src/test/java/org/apache/oozie/sla/TestSLACalculatorMemory.java
@@ -108,11 +108,11 @@ public class TestSLACalculatorMemory extends XDataTestCase {
     public void testLoadOnRestart() throws Exception {
         SLACalculatorMemory slaCalcMemory = new SLACalculatorMemory();
         slaCalcMemory.init(Services.get().get(ConfigurationService.class).getConf());
-        SLARegistrationBean slaRegBean1 = _createSLARegistration("job-1", AppType.WORKFLOW_JOB);
+        SLARegistrationBean slaRegBean1 = _createSLARegistration("job-1-W", AppType.WORKFLOW_JOB);
         String jobId1 = slaRegBean1.getId();
-        SLARegistrationBean slaRegBean2 = _createSLARegistration("job-2", AppType.WORKFLOW_JOB);
+        SLARegistrationBean slaRegBean2 = _createSLARegistration("job-2-W", AppType.WORKFLOW_JOB);
         String jobId2 = slaRegBean2.getId();
-        SLARegistrationBean slaRegBean3 = _createSLARegistration("job-3", AppType.WORKFLOW_JOB);
+        SLARegistrationBean slaRegBean3 = _createSLARegistration("job-3-W", AppType.WORKFLOW_JOB);
         String jobId3 = slaRegBean3.getId();
         List<String> idList = new ArrayList<String>();
         idList.add(slaRegBean1.getId());
@@ -134,6 +134,12 @@ public class TestSLACalculatorMemory extends XDataTestCase {
         slaRegBean1.setAlertEvents("MISS");
         slaRegBean1.setJobData("jobData");
 
+        Date startTime = new Date(System.currentTimeMillis() - 1 * 1 * 3600 * 1000); // 1 hour back
+        Date endTime = new Date(System.currentTimeMillis() + 2 * 1 * 3600 * 1000); // 2 hours ahead
+
+        slaRegBean3.setExpectedStart(startTime);
+        slaRegBean3.setExpectedEnd(endTime);
+
         slaCalcMemory.addRegistration(jobId1, slaRegBean1);
         slaCalcMemory.addRegistration(jobId2, slaRegBean2);
         slaCalcMemory.addRegistration(jobId3, slaRegBean3);
@@ -142,9 +148,6 @@ public class TestSLACalculatorMemory extends XDataTestCase {
         SLACalcStatus calc2 = slaCalcMemory.get(jobId2);
         SLACalcStatus calc3 = slaCalcMemory.get(jobId3);
 
-        calc1.setEventProcessed(5);
-        calc2.setEventProcessed(6);
-        calc3.setEventProcessed(7);
 
         calc1.setEventStatus(SLAEvent.EventStatus.END_MISS);
         calc1.setSLAStatus(SLAEvent.SLAStatus.MISS);
@@ -154,11 +157,30 @@ public class TestSLACalculatorMemory extends XDataTestCase {
         calc1.setLastModifiedTime(lastModifiedTime);
 
         List<UpdateEntry> updateList = new ArrayList<UpdateEntry>();
-        SLASummaryBean bean = new SLASummaryBean(calc1);
-        bean.setActualStart(sdf.parse("2011-03-09"));
-        bean.setActualEnd(sdf.parse("2011-03-10"));
-        bean.setActualDuration(456);
-        updateList.add(new UpdateEntry<SLASummaryQuery>(SLASummaryQuery.UPDATE_SLA_SUMMARY_ALL, bean));
+        WorkflowJobBean wf1 = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW, jobId1);
+        wf1.setId(jobId1);
+        wf1.setStatus(WorkflowJob.Status.SUCCEEDED);
+        wf1.setStartTime(sdf.parse("2011-03-09"));
+        wf1.setEndTime(sdf.parse("2011-03-10"));
+        wf1.setLastModifiedTime(new Date());
+
+        WorkflowJobBean wf2 = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW, jobId2);
+        wf2.setId(jobId2);
+        wf2.setStatus(WorkflowJob.Status.RUNNING);
+        wf2.setStartTime(sdf.parse("2011-03-09"));
+        wf2.setEndTime(null);
+        wf2.setLastModifiedTime(new Date());
+
+        WorkflowJobBean wf3 = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW, jobId3);
+        wf3.setId(jobId3);
+        wf3.setStatus(WorkflowJob.Status.RUNNING);
+        wf3.setStartTime(startTime);
+        wf3.setEndTime(null);
+        wf3.setLastModifiedTime(new Date());
+
+        updateList.add(new UpdateEntry<WorkflowJobQuery>(WorkflowJobQuery.UPDATE_WORKFLOW, wf1));
+        updateList.add(new UpdateEntry<WorkflowJobQuery>(WorkflowJobQuery.UPDATE_WORKFLOW, wf2));
+        updateList.add(new UpdateEntry<WorkflowJobQuery>(WorkflowJobQuery.UPDATE_WORKFLOW, wf3));
         updateList.add(new UpdateEntry<SLASummaryQuery>(SLASummaryQuery.UPDATE_SLA_SUMMARY_ALL,
                 new SLASummaryBean(calc2)));
         updateList.add(new UpdateEntry<SLASummaryQuery>(SLASummaryQuery.UPDATE_SLA_SUMMARY_ALL,
@@ -169,10 +191,11 @@ public class TestSLACalculatorMemory extends XDataTestCase {
         slaCalcMemory = new SLACalculatorMemory();
         slaCalcMemory.init(Services.get().get(ConfigurationService.class).getConf());
 
-        assertEquals(2, slaCalcMemory.size());
 
-        SLACalcStatus calc = slaCalcMemory.get(jobId1);
-        assertEquals("job-1", calc.getId());
+        SLACalcStatus calc = new SLACalcStatus(SLASummaryQueryExecutor.getInstance().get(
+                SLASummaryQuery.GET_SLA_SUMMARY, jobId1), SLARegistrationQueryExecutor.getInstance().get(
+                SLARegQuery.GET_SLA_REG_ON_RESTART, jobId1));
+        assertEquals("job-1-W", calc.getId());
         assertEquals(AppType.WORKFLOW_JOB, calc.getAppType());
         assertEquals("app-name", calc.getAppName());
         assertEquals(123, calc.getExpectedDuration());
@@ -188,22 +211,35 @@ public class TestSLACalculatorMemory extends XDataTestCase {
         assertEquals("jobData", calc.getJobData());
         assertEquals(sdf.parse("2011-03-09"), calc.getActualStart());
         assertEquals(sdf.parse("2011-03-10"), calc.getActualEnd());
-        assertEquals(456, calc.getActualDuration());
         assertEquals(SLAEvent.EventStatus.END_MISS, calc1.getEventStatus());
         assertEquals(SLAEvent.SLAStatus.MISS, calc1.getSLAStatus());
         assertEquals(WorkflowJob.Status.FAILED.toString(), calc1.getJobStatus());
         assertEquals(lastModifiedTime, calc1.getLastModifiedTime());
 
-        assertEquals(5, calc.getEventProcessed());
-        assertEquals(6, slaCalcMemory.get(jobId2).getEventProcessed());
-        // jobId3 should be in history set as eventprocessed is 7 (111)
-        assertEquals(2, slaCalcMemory.size()); // 2 out of 3 jobs in map
+        calc2 = new SLACalcStatus(SLASummaryQueryExecutor.getInstance().get(
+                SLASummaryQuery.GET_SLA_SUMMARY, jobId2), SLARegistrationQueryExecutor.getInstance().get(
+                SLARegQuery.GET_SLA_REG_ON_RESTART, jobId2));
+
+
+        assertEquals(8, calc.getEventProcessed());
+        assertEquals(7, calc2.getEventProcessed());
+        // jobId2 should be in history set as eventprocessed is 7 (111)
+        // jobId3 will remain in the SLA map
+        assertEquals(1, slaCalcMemory.size()); // 1 out of 3 jobs in map
+        WorkflowJobBean wf = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW, jobId3);
+        wf.setId(jobId3);
+        wf.setStatus(WorkflowJob.Status.SUCCEEDED);
+        wf.setEndTime(endTime);
+        wf.setStartTime(startTime);
+        WorkflowJobQueryExecutor.getInstance().executeUpdate(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MOD_START_END, wf);
+
         slaCalcMemory.addJobStatus(jobId3, WorkflowJob.Status.SUCCEEDED.toString(), EventStatus.SUCCESS,
-                sdf.parse("2011-03-09"), sdf.parse("2011-04-09"));
+                startTime, endTime);
+
         SLASummaryBean slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId3);
         assertEquals(8, slaSummary.getEventProcessed());
-        assertEquals(sdf.parse("2011-03-09"), slaSummary.getActualStart());
-        assertEquals(sdf.parse("2011-04-09"), slaSummary.getActualEnd());
+        assertEquals(startTime, slaSummary.getActualStart());
+        assertEquals(endTime, slaSummary.getActualEnd());
         assertEquals(WorkflowJob.Status.SUCCEEDED.toString(), slaSummary.getJobStatus());
     }
 
@@ -213,7 +249,7 @@ public class TestSLACalculatorMemory extends XDataTestCase {
         SimpleDateFormat sdf = new SimpleDateFormat("yyyy-mm-dd");
         SLACalculatorMemory slaCalcMemory = new SLACalculatorMemory();
         slaCalcMemory.init(Services.get().get(ConfigurationService.class).getConf());
-        SLARegistrationBean slaRegBean1 = _createSLARegistration("job-1", AppType.WORKFLOW_JOB);
+        SLARegistrationBean slaRegBean1 = _createSLARegistration("job-1-W", AppType.WORKFLOW_JOB);
         String jobId1 = slaRegBean1.getId();
         slaRegBean1.setExpectedEnd(sdf.parse("2013-03-07"));
         slaRegBean1.setExpectedStart(sdf.parse("2012-03-07"));
@@ -240,11 +276,8 @@ public class TestSLACalculatorMemory extends XDataTestCase {
 
         slaCalcMemory = new SLACalculatorMemory();
         slaCalcMemory.init(Services.get().get(ConfigurationService.class).getConf());
-
-        // As job succeeded, it should not be in memory
-        assertEquals(0, slaCalcMemory.size());
         slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId1);
-        assertEquals("job-1", slaSummary.getId());
+        assertEquals("job-1-W", slaSummary.getId());
         assertEquals(8, slaSummary.getEventProcessed());
         assertEquals(AppType.WORKFLOW_JOB, slaSummary.getAppType());
         assertEquals("SUCCEEDED", slaSummary.getJobStatus());
@@ -270,7 +303,6 @@ public class TestSLACalculatorMemory extends XDataTestCase {
         slaCalcMemory = new SLACalculatorMemory();
         slaCalcMemory.init(Services.get().get(ConfigurationService.class).getConf());
 
-        assertEquals(0, slaCalcMemory.size());
         slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId1);
         assertEquals("FAILED", slaSummary.getJobStatus());
         assertEquals(8, slaSummary.getEventProcessed());
@@ -293,15 +325,14 @@ public class TestSLACalculatorMemory extends XDataTestCase {
 
         slaCalcMemory = new SLACalculatorMemory();
         slaCalcMemory.init(Services.get().get(ConfigurationService.class).getConf());
-
-        assertEquals(1, slaCalcMemory.size());
-        SLACalcStatus calc = slaCalcMemory.get(jobId1);
-        assertEquals(1, calc.getEventProcessed());
-        assertEquals("RUNNING", calc.getJobStatus());
-        assertEquals(sdf.parse("2012-02-07"), calc.getActualStart());
-        assertNull(calc.getActualEnd());
-        assertEquals(-1, calc.getActualDuration());
-        assertEquals(SLAEvent.SLAStatus.IN_PROCESS, calc.getSLAStatus());
+        slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, slaSummaryBean.getId());
+        // since the job is already running and it's an old job
+        assertEquals(7, slaSummary.getEventProcessed());
+        assertEquals("RUNNING", slaSummary.getJobStatus());
+        assertEquals(sdf.parse("2012-02-07"), slaSummary.getActualStart());
+        assertNull(slaSummary.getActualEnd());
+        assertEquals(-1, slaSummary.getActualDuration());
+        assertEquals(SLAEvent.SLAStatus.MISS, slaSummary.getSLAStatus());
     }
 
     @Test
@@ -309,7 +340,7 @@ public class TestSLACalculatorMemory extends XDataTestCase {
         SimpleDateFormat sdf = new SimpleDateFormat("yyyy-mm-dd");
         SLACalculatorMemory slaCalcMemory = new SLACalculatorMemory();
         slaCalcMemory.init(Services.get().get(ConfigurationService.class).getConf());
-        SLARegistrationBean slaRegBean1 = _createSLARegistration("job@1", AppType.WORKFLOW_ACTION);
+        SLARegistrationBean slaRegBean1 = _createSLARegistration("job-W@1", AppType.WORKFLOW_ACTION);
         String jobId1 = slaRegBean1.getId();
         slaRegBean1.setExpectedEnd(sdf.parse("2013-03-07"));
         slaRegBean1.setExpectedStart(sdf.parse("2012-03-07"));
@@ -334,11 +365,10 @@ public class TestSLACalculatorMemory extends XDataTestCase {
 
         slaCalcMemory = new SLACalculatorMemory();
         slaCalcMemory.init(Services.get().get(ConfigurationService.class).getConf());
-
-        // As job succeeded, it should not be in memory
+        slaCalcMemory.updateAllSlaStatus();
         assertEquals(0, slaCalcMemory.size());
         SLASummaryBean slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId1);
-        assertEquals("job@1", slaSummary.getId());
+        assertEquals("job-W@1", slaSummary.getId());
         assertEquals(8, slaSummary.getEventProcessed());
         assertEquals(AppType.WORKFLOW_ACTION, slaSummary.getAppType());
         assertEquals("OK", slaSummary.getJobStatus());
@@ -355,7 +385,7 @@ public class TestSLACalculatorMemory extends XDataTestCase {
         SimpleDateFormat sdf = new SimpleDateFormat("yyyy-mm-dd");
         SLACalculatorMemory slaCalcMemory = new SLACalculatorMemory();
         slaCalcMemory.init(Services.get().get(ConfigurationService.class).getConf());
-        SLARegistrationBean slaRegBean1 = _createSLARegistration("job@1", AppType.COORDINATOR_ACTION);
+        SLARegistrationBean slaRegBean1 = _createSLARegistration("job-C@1", AppType.COORDINATOR_ACTION);
         String jobId1 = slaRegBean1.getId();
         slaRegBean1.setExpectedEnd(sdf.parse("2013-03-07"));
         slaRegBean1.setExpectedStart(sdf.parse("2012-03-07"));
@@ -374,23 +404,23 @@ public class TestSLACalculatorMemory extends XDataTestCase {
         cab.setId(jobId1);
         cab.setStatus(CoordinatorAction.Status.FAILED);
         cab.setLastModifiedTime(sdf.parse("2013-02-07"));
-        cab.setExternalId("wf_job");
+        cab.setExternalId("wf_job-W");
         CoordActionInsertJPAExecutor caInsertCmd = new CoordActionInsertJPAExecutor(cab);
         jpaService.execute(caInsertCmd);
         WorkflowJobBean wjb = new WorkflowJobBean();
-        wjb.setId("wf_job");
+        wjb.setId("wf_job-W");
         wjb.setStartTime(sdf.parse("2012-02-07"));
         wjb.setLastModifiedTime(new Date());
         WorkflowJobQueryExecutor.getInstance().insert(wjb);
 
         slaCalcMemory = new SLACalculatorMemory();
         slaCalcMemory.init(Services.get().get(ConfigurationService.class).getConf());
-
+        slaCalcMemory.updateAllSlaStatus();
         // As job succeeded, it should not be in memory
         assertEquals(0, slaCalcMemory.size());
         SLASummaryBean slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId1);
 
-        assertEquals("job@1", slaSummary.getId());
+        assertEquals("job-C@1", slaSummary.getId());
         assertEquals(8, slaSummary.getEventProcessed());
         assertEquals(AppType.COORDINATOR_ACTION, slaSummary.getAppType());
         assertEquals("FAILED", slaSummary.getJobStatus());
@@ -402,6 +432,68 @@ public class TestSLACalculatorMemory extends XDataTestCase {
     }
 
     @Test
+    public void testEventMissOnRestart() throws Exception {
+        // Re-running init() after both coordinator actions have FAILED must leave eventProcessed == 8 for each.
+        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd"); // "MM" is month; "mm" would be minutes
+        SLACalculatorMemory slaCalcMemory = new SLACalculatorMemory();
+        slaCalcMemory.init(Services.get().get(ConfigurationService.class).getConf());
+        // Insert two RUNNING coordinator actions straight into the database.
+        CoordinatorActionBean coordAction = new CoordinatorActionBean();
+        coordAction.setId("coordActionId-C@1");
+        coordAction.setStatus(CoordinatorAction.Status.RUNNING);
+        coordAction.setLastModifiedTime(sdf.parse("2013-02-07"));
+        CoordActionInsertJPAExecutor caInsertCmd = new CoordActionInsertJPAExecutor(coordAction);
+        jpaService.execute(caInsertCmd);
+
+        CoordinatorActionBean coordAction2 = new CoordinatorActionBean();
+        coordAction2.setId("coordActionId-C@2");
+        coordAction2.setStatus(CoordinatorAction.Status.RUNNING);
+        coordAction2.setLastModifiedTime(sdf.parse("2013-02-07"));
+        caInsertCmd = new CoordActionInsertJPAExecutor(coordAction2);
+        jpaService.execute(caInsertCmd);
+        // Action 1: expected start and expected end are both already in the past.
+        SLARegistrationBean slaRegBean1 = _createSLARegistration("coordActionId-C@1", AppType.COORDINATOR_ACTION);
+        String jobId1 = slaRegBean1.getId();
+        slaRegBean1.setExpectedEnd(sdf.parse("2013-03-07"));
+        slaRegBean1.setExpectedStart(sdf.parse("2012-03-07"));
+        slaRegBean1.setExpectedDuration(100000); // long duration;
+        slaCalcMemory.addRegistration(jobId1, slaRegBean1);
+        slaCalcMemory.updateAllSlaStatus();
+        // Action 2: expected start and expected end are both still in the future.
+        SLARegistrationBean slaRegBean2 = _createSLARegistration("coordActionId-C@2", AppType.COORDINATOR_ACTION);
+        String jobId2 = slaRegBean2.getId();
+        slaRegBean2.setExpectedStart(new Date(System.currentTimeMillis() + 1 * 1 * 3600 * 1000)); // 1 hour ahead
+        slaRegBean2.setExpectedEnd(new Date(System.currentTimeMillis() + 2 * 1 * 3600 * 1000)); // 2 hours ahead
+        slaRegBean2.setExpectedDuration(100000); // long duration;
+        slaCalcMemory.addRegistration(jobId2, slaRegBean2);
+        slaCalcMemory.updateAllSlaStatus();
+        assertEquals(2, slaCalcMemory.size());
+
+        SLASummaryBean slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId1);
+        SLASummaryBean slaSummary2 = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId2);
+        // Action 1 is past its expected start and end while still RUNNING: eventProcessed is 5 (101).
+        assertEquals("coordActionId-C@1", slaSummary.getId());
+        assertEquals(5, slaSummary.getEventProcessed());
+        assertEquals(-1, slaSummary.getActualDuration());
+        // Action 2 is not due yet, so no event has been processed for it.
+        assertEquals("coordActionId-C@2", slaSummary2.getId());
+        assertEquals(0, slaSummary2.getEventProcessed());
+        assertEquals(-1, slaSummary2.getActualDuration());
+        // Fail both actions in the database only; no addJobStatus() call tells the calculator about it.
+        coordAction.setStatusStr("FAILED");
+        coordAction2.setStatusStr("FAILED");
+        CoordActionQueryExecutor.getInstance().executeUpdate(CoordActionQuery.UPDATE_COORD_ACTION_FOR_START, coordAction);
+        CoordActionQueryExecutor.getInstance().executeUpdate(CoordActionQuery.UPDATE_COORD_ACTION_FOR_START, coordAction2);
+        // init() on the same instance presumably stands in for a restart; afterwards all events must be processed.
+        slaCalcMemory.init(Services.get().get(ConfigurationService.class).getConf());
+        slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId1);
+        slaSummary2 = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId2);
+
+        assertEquals("coordActionId-C@1", slaSummary.getId());
+        assertEquals(8, slaSummary.getEventProcessed());
+        assertEquals("coordActionId-C@2", slaSummary2.getId());
+        assertEquals(8, slaSummary2.getEventProcessed());
+
+    }
+    @Test
     public void testSLAEvents1() throws Exception {
         SLACalculatorMemory slaCalcMemory = new SLACalculatorMemory();
         EventHandlerService ehs = Services.get().get(EventHandlerService.class);
@@ -428,13 +520,24 @@ public class TestSLACalculatorMemory extends XDataTestCase {
         SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
 
         assertEquals(SLAStatus.MISS, slaSummary.getSLAStatus());
-        slaCalcMemory.addJobStatus(jobId, WorkflowJob.Status.RUNNING.toString(), EventStatus.STARTED,
-                sdf.parse("2012-01-01"), null);
+
+        job1.setStatusStr(WorkflowJob.Status.SUSPENDED.toString());
+        job1.setLastModifiedTime(new Date());
+        WorkflowJobQueryExecutor.getInstance().executeUpdate(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_MODTIME, job1);
+
         slaCalcMemory.addJobStatus(jobId, WorkflowJob.Status.SUSPENDED.toString(), EventStatus.SUSPEND,
                 sdf.parse("2012-01-01"), null);
         slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId);
         assertEquals(WorkflowJob.Status.SUSPENDED.toString(), slaSummary.getJobStatus());
+
         assertEquals(5, slaSummary.getEventProcessed());
+        job1.setStatusStr(WorkflowJob.Status.SUCCEEDED.toString());
+        job1.setLastModifiedTime(new Date());
+        job1.setStartTime(sdf.parse("2012-01-01"));
+        job1.setEndTime(sdf.parse("2012-01-02"));
+        WorkflowJobQueryExecutor.getInstance().executeUpdate(
+                WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MOD_START_END, job1);
+
         slaCalcMemory.addJobStatus(jobId, WorkflowJob.Status.SUCCEEDED.toString(), EventStatus.SUCCESS,
                 sdf.parse("2012-01-01"), sdf.parse("2012-01-02"));
 
@@ -442,7 +545,7 @@ public class TestSLACalculatorMemory extends XDataTestCase {
         slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId);
         // All events processed and actual times stored (1000)
         assertEquals(8, slaSummary.getEventProcessed());
-        assertEquals(SLAStatus.MISS, slaSummary.getSLAStatus());
+        assertEquals(SLAStatus.MET, slaSummary.getSLAStatus());
         assertEquals(WorkflowJob.Status.SUCCEEDED.toString(), slaSummary.getJobStatus());
         assertEquals(SLAEvent.EventStatus.DURATION_MISS, slaSummary.getEventStatus());
         assertEquals(sdf.parse("2012-01-01").getTime(), slaSummary.getActualStart().getTime());
@@ -479,6 +582,12 @@ public class TestSLACalculatorMemory extends XDataTestCase {
         SLASummaryQueryExecutor.getInstance().executeUpdate(SLASummaryQuery.UPDATE_SLA_SUMMARY_ALL, slaSummary);
 
         SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
+        job1.setStatus(WorkflowJob.Status.SUCCEEDED);
+        job1.setStartTime(sdf.parse("2012-01-01"));
+        job1.setEndTime(sdf.parse("2012-01-02"));
+        WorkflowJobQueryExecutor.getInstance().executeUpdate(
+                WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MOD_START_END, job1);
+
         slaCalcMemory.addJobStatus(jobId, WorkflowJob.Status.SUCCEEDED.toString(), EventStatus.SUCCESS,
                 sdf.parse("2012-01-01"), sdf.parse("2012-01-02"));
         slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId);
@@ -490,16 +599,21 @@ public class TestSLACalculatorMemory extends XDataTestCase {
 
         slaSummary.setEventProcessed(1);
         SLASummaryQueryExecutor.getInstance().executeUpdate(SLASummaryQuery.UPDATE_SLA_SUMMARY_ALL, slaSummary);
+        WorkflowJobBean job2 = addRecordToWfJobTable(WorkflowJob.Status.PREP, WorkflowInstance.Status.PREP);
 
-        slaRegBean = _createSLARegistration("job-2", AppType.WORKFLOW_JOB);
+        slaRegBean = _createSLARegistration(job2.getId(), AppType.WORKFLOW_JOB);
         slaRegBean.setExpectedStart(new Date(System.currentTimeMillis() - 1 * 1 * 3600 * 1000));
         slaRegBean.setExpectedEnd(new Date(System.currentTimeMillis() + 2 * 1 * 3600 * 1000));
 
         jobId = slaRegBean.getId();
         slaCalcMemory.addRegistration(jobId, slaRegBean);
         assertEquals(1, slaCalcMemory.size());
+        job2.setStatus(WorkflowJob.Status.KILLED);
+        job2.setEndTime(sdf.parse("2012-01-02"));
+        WorkflowJobQueryExecutor.getInstance().executeUpdate(
+                WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MOD_START_END, job2);
 
-        slaCalcMemory.addJobStatus(jobId, WorkflowJob.Status.KILLED.toString(), EventStatus.FAILURE, null,
+        slaCalcMemory.addJobStatus(job2.getId(), WorkflowJob.Status.KILLED.toString(), EventStatus.FAILURE, null,
                 sdf.parse("2012-01-02"));
         slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId);
         // Actual start null, so all events processed
@@ -521,7 +635,7 @@ public class TestSLACalculatorMemory extends XDataTestCase {
         SLARegistrationBean slaRegBean = _createSLARegistration(job1.getId(), AppType.WORKFLOW_JOB);
         Date startTime = new Date(System.currentTimeMillis() - 1 * 1 * 3600 * 1000); // 1 hour back
         slaRegBean.setExpectedStart(startTime);
-        slaRegBean.setExpectedDuration(3600 * 1000);
+        slaRegBean.setExpectedDuration(2* 3600 * 1000); //to avoid duration miss
         slaRegBean.setExpectedEnd(new Date(System.currentTimeMillis() + 1 * 1 * 3600 * 1000)); // 1 hour ahead
         String jobId = slaRegBean.getId();
         slaCalcMemory.addRegistration(slaRegBean.getId(), slaRegBean);
@@ -530,11 +644,15 @@ public class TestSLACalculatorMemory extends XDataTestCase {
 
         assertEquals(1, slaSummary.getEventProcessed());
         assertEquals(SLAStatus.NOT_STARTED, slaSummary.getSLAStatus());
+        job1.setStatus(WorkflowJob.Status.RUNNING);
+        job1.setStartTime(startTime);
+        WorkflowJobQueryExecutor.getInstance().executeUpdate(
+                WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MOD_START_END, job1);
+
         slaCalcMemory.addJobStatus(jobId, WorkflowJob.Status.RUNNING.toString(), EventStatus.STARTED,
                 new Date(System.currentTimeMillis()), null);
-        slaCalcMemory.updateJobSla(jobId);
-        slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId);
 
+        slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId);
         assertEquals(1, slaSummary.getEventProcessed());
         assertEquals(SLAStatus.IN_PROCESS, slaSummary.getSLAStatus());
         assertEquals(WorkflowJob.Status.RUNNING.toString(), slaSummary.getJobStatus());
@@ -565,13 +683,21 @@ public class TestSLACalculatorMemory extends XDataTestCase {
         slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId);
         assertEquals(4, slaSummary.getEventProcessed());
         assertEquals(SLAStatus.MISS, slaSummary.getSLAStatus());
+
+        job1.setId(job1.getId());
+        job1.setStatus(WorkflowJob.Status.SUCCEEDED);
+        job1.setStartTime(new Date(System.currentTimeMillis()));
+        job1.setEndTime(new Date(System.currentTimeMillis() + 1 * 1 * 3600 * 1000));
+        WorkflowJobQueryExecutor.getInstance().executeUpdate(
+                WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MOD_START_END, job1);
+
         slaCalcMemory.addJobStatus(jobId, WorkflowJob.Status.SUCCEEDED.toString(), EventStatus.SUCCESS,
                 new Date(System.currentTimeMillis()), new Date(System.currentTimeMillis() + 1 * 1 * 3600 * 1000));
         slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId);
 
         // Only Duration sla should be processed as end is already processed
         // (110)
-        assertEquals(6, slaSummary.getEventProcessed());
+        assertEquals(8, slaSummary.getEventProcessed());
         assertEquals(SLAStatus.MISS, slaSummary.getSLAStatus());
         // Recieve start event
         assertTrue(slaCalcMemory.addJobStatus(jobId, WorkflowJob.Status.RUNNING.toString(), EventStatus.STARTED,
@@ -598,15 +724,32 @@ public class TestSLACalculatorMemory extends XDataTestCase {
             String jobId = slaRegBean.getId();
             slaCalcMemory.addRegistration(slaRegBean.getId(), slaRegBean);
             slaCalcMemory.updateJobSla(jobId);
+
+            job1.setId(job1.getId());
+            job1.setStatus(WorkflowJob.Status.RUNNING);
+            job1.setStartTime(new Date(System.currentTimeMillis() - 3600 * 1000));
+            WorkflowJobQueryExecutor.getInstance().executeUpdate(
+                    WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MOD_START_END, job1);
+
             slaCalcMemory.addJobStatus(jobId, WorkflowJob.Status.RUNNING.toString(), EventStatus.STARTED, new Date(
                     System.currentTimeMillis() - 3600 * 1000), null);
-            slaCalcMemory.updateJobSla(jobId);
             SLASummaryBean slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId);
             // The actual end times are not stored, but sla's processed so (111)
             assertEquals(7, slaSummary.getEventProcessed());
             // Moved from map to history set
             assertEquals(0, slaCalcMemory.size());
             // Add terminal state event so actual end time is stored
+
+            job1.setId(job1.getId());
+            job1.setStatus(WorkflowJob.Status.SUCCEEDED);
+            job1.setEndTime(new Date(System.currentTimeMillis() - 3600 * 1000));
+            job1.setStartTime(new Date(System.currentTimeMillis()));
+            WorkflowJobQueryExecutor.getInstance().executeUpdate(
+                    WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MOD_START_END, job1);
+
+            WorkflowJobQueryExecutor.getInstance().executeUpdate(
+                WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MOD_START_END, job1);
+
             slaCalcMemory.addJobStatus(jobId, WorkflowJob.Status.SUCCEEDED.toString(), EventStatus.SUCCESS, new Date(
                     System.currentTimeMillis() - 3600 * 1000), new Date(System.currentTimeMillis()));
             slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId);
@@ -634,16 +777,21 @@ public class TestSLACalculatorMemory extends XDataTestCase {
         String jobId = slaRegBean.getId();
         slaCalcMemory.addRegistration(slaRegBean.getId(), slaRegBean);
         slaCalcMemory.updateJobSla(jobId);
+        job1.setStatusStr("RUNNING");
+        job1.setLastModifiedTime(new Date());
+        job1.setStartTime(startTime);
+        WorkflowJobQueryExecutor.getInstance().executeUpdate(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MOD_START_END, job1);
+
         slaCalcMemory.addJobStatus(jobId, WorkflowJob.Status.RUNNING.toString(), EventStatus.STARTED, new Date(
                 System.currentTimeMillis() - 3600 * 1000), null);
-        slaCalcMemory.updateJobSla(jobId);
         SLASummaryBean slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId);
         // The actual end times are not stored, but sla's processed so (111)
         assertEquals(7, slaSummary.getEventProcessed());
         assertTrue(slaCalcMemory.isJobIdInHistorySet(job1.getId()));
         job1.setStatusStr("SUCCEEDED");
         job1.setLastModifiedTime(new Date());
-        WorkflowJobQueryExecutor.getInstance().executeUpdate(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_MODTIME, job1);
+        job1.setStartTime(startTime);
+        WorkflowJobQueryExecutor.getInstance().executeUpdate(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MOD_START_END, job1);
         slaCalcMemory.new HistoryPurgeWorker().run();
         assertFalse(slaCalcMemory.isJobIdInHistorySet(job1.getId()));
     }
@@ -742,5 +890,85 @@ public class TestSLACalculatorMemory extends XDataTestCase {
         slaCalculator.addRegistration(slaRegBean.getId(), slaRegBean);
         return action.getId();
     }
+    @Test
+    public void testEventOutOfOrder() throws Exception {
+        SLACalculatorMemory slaCalcMemory = new SLACalculatorMemory();
+        slaCalcMemory.init(Services.get().get(ConfigurationService.class).getConf());
+        WorkflowJobBean wfJob = addRecordToWfJobTable(WorkflowJob.Status.RUNNING, WorkflowInstance.Status.RUNNING);
+        SLARegistrationBean slaRegBean = _createSLARegistration(wfJob.getId(), AppType.WORKFLOW_JOB);
+        Date startTime = new Date(System.currentTimeMillis() + 1 * 1 * 3600 * 1000); // 1 hour ahead
+        slaRegBean.setExpectedStart(startTime);
+        slaRegBean.setExpectedDuration(3600 * 1000);
+        slaRegBean.setExpectedEnd(new Date(System.currentTimeMillis() - 1 * 1 * 3600 * 1000)); // 1 hour back
+        String jobId = slaRegBean.getId();
+        slaCalcMemory.addRegistration(slaRegBean.getId(), slaRegBean);
+        slaCalcMemory.updateJobSla(jobId);
+        SLASummaryBean slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId);
+        slaRegBean = SLARegistrationQueryExecutor.getInstance().get(SLARegQuery.GET_SLA_REG_ALL, jobId);
+        assertEquals(WorkflowInstance.Status.RUNNING.toString(), slaSummary.getJobStatus());
+        // Mark the workflow SUCCEEDED in the DB, then deliver the matching SUCCEEDED status event.
+        wfJob.setStatus(WorkflowJob.Status.SUCCEEDED);
+        wfJob.setEndTime(new Date(System.currentTimeMillis() + 1 * 1 * 3600 * 1000));
+        WorkflowJobQueryExecutor.getInstance().executeUpdate(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MOD_END, wfJob);
+        slaCalcMemory.addJobStatus(jobId, WorkflowJob.Status.SUCCEEDED.toString(), EventStatus.SUCCESS,
+                new Date(System.currentTimeMillis()), new Date(System.currentTimeMillis() + 1 * 1 * 3600 * 1000));
+        slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId);
+        assertEquals(WorkflowInstance.Status.SUCCEEDED.toString(), slaSummary.getJobStatus());
+        // Out-of-order case: a RUNNING status event arrives after the SUCCEEDED one was already handled.
+        slaCalcMemory.addJobStatus(jobId, WorkflowJob.Status.RUNNING.toString(), EventStatus.SUCCESS,
+                new Date(System.currentTimeMillis()), new Date(System.currentTimeMillis() + 1 * 1 * 3600 * 1000));
+        // The late RUNNING event must leave the stored summary status at SUCCEEDED.
+        slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId);
+        assertEquals(WorkflowInstance.Status.SUCCEEDED.toString(), slaSummary.getJobStatus());
+    }
+
+    public void testWFEndNotCoord() throws Exception {
+        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd"); // MM = month; lowercase mm would parse minutes
+        SLACalculatorMemory slaCalcMemory = new SLACalculatorMemory();
+        slaCalcMemory.init(Services.get().get(ConfigurationService.class).getConf());
+        SLARegistrationBean slaRegBean = _createSLARegistration("job-C@1", AppType.COORDINATOR_ACTION);
+        String coordActionId = slaRegBean.getId();
+        slaRegBean.setExpectedEnd(sdf.parse("2013-03-07"));
+        slaRegBean.setExpectedStart(sdf.parse("2012-03-07"));
+        slaCalcMemory.addRegistration(coordActionId, slaRegBean);
+        SLACalcStatus calc1 = slaCalcMemory.get(coordActionId);
+        calc1.setEventProcessed(1);
+        calc1.setSLAStatus(SLAEvent.SLAStatus.IN_PROCESS);
+        calc1.setJobStatus(WorkflowAction.Status.RUNNING.name());
+        calc1.setLastModifiedTime(new Date());
+        SLASummaryBean slaSummaryBean = new SLASummaryBean(calc1);
+        // Store the IN_PROCESS / RUNNING state built above via UPDATE_SLA_SUMMARY_ALL.
+        SLASummaryQueryExecutor.getInstance().executeUpdate(SLASummaryQuery.UPDATE_SLA_SUMMARY_ALL, slaSummaryBean);
+
+        // Simulate a lost failed event
+        CoordinatorActionBean coordAction = new CoordinatorActionBean();
+        coordAction.setId(coordActionId);
+        coordAction.setStatus(CoordinatorAction.Status.RUNNING);
+        coordAction.setLastModifiedTime(sdf.parse("2013-02-07"));
+        coordAction.setExternalId("wf_job-W");
+        CoordActionInsertJPAExecutor caInsertCmd = new CoordActionInsertJPAExecutor(coordAction);
+        jpaService.execute(caInsertCmd);
+        WorkflowJobBean wjb = new WorkflowJobBean();
+        wjb.setId("wf_job-W");
+        wjb.setStartTime(sdf.parse("2012-02-07"));
+        wjb.setLastModifiedTime(new Date());
+        wjb.setStatus(WorkflowJob.Status.SUCCEEDED);
+        WorkflowJobQueryExecutor.getInstance().insert(wjb);
+
+        calc1 = slaCalcMemory.get(coordActionId);
+        slaCalcMemory.updateJobSla(coordActionId);
+        slaSummaryBean = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, coordActionId);
+        // coord action is still RUNNING although its workflow job is completed: summary must stay RUNNING
+        assertEquals(WorkflowInstance.Status.RUNNING.name(), slaSummaryBean.getJobStatus());
+        // Now mark the coordinator action itself SUCCEEDED and deliver the success event.
+        coordAction.setStatus(CoordinatorAction.Status.SUCCEEDED);
+        CoordActionQueryExecutor.getInstance().executeUpdate(CoordActionQuery.UPDATE_COORD_ACTION_STATUS_PENDING_TIME, coordAction);
+
+        slaCalcMemory.addJobStatus(coordActionId, WorkflowJob.Status.SUCCEEDED.toString(), EventStatus.SUCCESS,
+                sdf.parse("2012-02-07"), sdf.parse("2012-03-07"));
+
+        slaSummaryBean = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, coordActionId);
+        assertEquals(WorkflowInstance.Status.SUCCEEDED.toString(), slaSummaryBean.getJobStatus());
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/test/java/org/apache/oozie/sla/TestSLAEventGeneration.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/sla/TestSLAEventGeneration.java b/core/src/test/java/org/apache/oozie/sla/TestSLAEventGeneration.java
index 7a710c2..06f54f2 100644
--- a/core/src/test/java/org/apache/oozie/sla/TestSLAEventGeneration.java
+++ b/core/src/test/java/org/apache/oozie/sla/TestSLAEventGeneration.java
@@ -513,6 +513,7 @@ public class TestSLAEventGeneration extends XDataTestCase {
         wf.setId(action.getExternalId());
         wf.setStatus(WorkflowJob.Status.KILLED);
         wf.setParentId(action.getId());
+        wf.setEndTime(new Date());
         jpa.execute(new WorkflowJobInsertJPAExecutor(wf));
         new CoordActionUpdateXCommand(wf).call();
         assertEquals(1, ehs.getEventQueue().size());

http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/test/java/org/apache/oozie/sla/TestSLAJobEventListener.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/sla/TestSLAJobEventListener.java b/core/src/test/java/org/apache/oozie/sla/TestSLAJobEventListener.java
index ebb12f7..7d40e31 100644
--- a/core/src/test/java/org/apache/oozie/sla/TestSLAJobEventListener.java
+++ b/core/src/test/java/org/apache/oozie/sla/TestSLAJobEventListener.java
@@ -18,22 +18,32 @@
 
 package org.apache.oozie.sla;
 
+import java.util.ArrayList;
 import java.util.Date;
+import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.oozie.AppType;
+import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.CoordinatorJobBean;
+import org.apache.oozie.WorkflowActionBean;
+import org.apache.oozie.WorkflowJobBean;
 import org.apache.oozie.client.CoordinatorAction;
-import org.apache.oozie.client.CoordinatorJob;
 import org.apache.oozie.client.WorkflowAction;
 import org.apache.oozie.client.WorkflowJob;
 import org.apache.oozie.client.event.SLAEvent.EventStatus;
+import org.apache.oozie.client.rest.JsonBean;
 import org.apache.oozie.event.CoordinatorActionEvent;
-import org.apache.oozie.event.CoordinatorJobEvent;
 import org.apache.oozie.event.WorkflowActionEvent;
 import org.apache.oozie.event.WorkflowJobEvent;
 import org.apache.oozie.event.listener.JobEventListener;
+import org.apache.oozie.executor.jpa.BatchQueryExecutor;
+import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
 import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor;
+import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
+import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
 import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor.SLASummaryQuery;
+import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
 import org.apache.oozie.service.EventHandlerService;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.sla.listener.SLAJobEventListener;
@@ -81,61 +91,79 @@ public class TestSLAJobEventListener extends XTestCase {
         SLAJobEventListener listener = new SLAJobEventListener();
         listener.init(services.getConf());
         // add dummy registration events to the SLAService map
-        SLARegistrationBean job = _createSLARegBean("wf1", AppType.WORKFLOW_JOB);
+        SLARegistrationBean job = _createSLARegBean("wf1-W", AppType.WORKFLOW_JOB);
         job.setExpectedStart(DateUtils.parseDateUTC("2012-07-22T00:00Z"));
+        job.setExpectedEnd(DateUtils.parseDateUTC("2012-07-23T00:00Z"));
         slas.addRegistrationEvent(job);
         assertEquals(1, slas.getSLACalculator().size());
         Date actualStart = DateUtils.parseDateUTC("2012-07-22T01:00Z");
-        WorkflowJobEvent wfe = new WorkflowJobEvent("wf1", "caId1", WorkflowJob.Status.RUNNING, "user1",
+
+        createWorkflow("wf1-W", actualStart);
+        WorkflowJobEvent wfe = new WorkflowJobEvent("wf1-W", "caId1", WorkflowJob.Status.RUNNING, "user1",
                 "wf-app-name1", actualStart, null);
         listener.onWorkflowJobEvent(wfe);
-        SLACalcStatus serviceObj = slas.getSLACalculator().get("wf1");
+        SLACalcStatus serviceObj = slas.getSLACalculator().get("wf1-W");
+
+        // The job is checked against the DB; since it is an old job, all events get evaluated and it moves to the history set.
         // check that start sla has been calculated
-        assertEquals(EventStatus.START_MISS, serviceObj.getEventStatus());
-        assertEquals(1, serviceObj.getEventProcessed()); //Job switching to running is only partially
-                                                       //sla processed. so state = 1
+        assertEquals(EventStatus.END_MISS, serviceObj.getEventStatus());
+        assertEquals(7, serviceObj.getEventProcessed()); //Job switching to running is only partially
+        assertEquals(0, slas.getSLACalculator().size());
+
+
+        createWorkflowAction("wfId1-W@wa1", "wf1-W");
+        job = _createSLARegBean("wfId1-W@wa1", AppType.WORKFLOW_ACTION);
+        job.setExpectedEnd(DateUtils.parseDateUTC("2012-07-22T01:00Z"));
 
-        job = _createSLARegBean("wfId1@wa1", AppType.WORKFLOW_ACTION);
         slas.addRegistrationEvent(job);
-        assertEquals(2, slas.getSLACalculator().size());
+        assertEquals(1, slas.getSLACalculator().size());
         job.setExpectedStart(DateUtils.parseDateUTC("2012-07-22T00:00Z"));
-        WorkflowActionEvent wae = new WorkflowActionEvent("wfId1@wa1", "wfId1", WorkflowAction.Status.RUNNING, "user1",
+        WorkflowActionEvent wae = new WorkflowActionEvent("wfId1-W@wa1", "wf1-W", WorkflowAction.Status.RUNNING, "user1",
                 "wf-app-name1", actualStart, null);
         listener.onWorkflowActionEvent(wae);
-        serviceObj = slas.getSLACalculator().get("wfId1@wa1");
+        serviceObj = slas.getSLACalculator().get("wfId1-W@wa1");
         // check that start sla has been calculated
-        assertEquals(EventStatus.START_MISS, serviceObj.getEventStatus());
+        assertEquals(EventStatus.END_MISS, serviceObj.getEventStatus());
+        createCoord("cj1-C");
 
-        job = _createSLARegBean("cj1", AppType.COORDINATOR_JOB);
+        CoordinatorActionBean coordAction= createCoordAction("cj1-C@ca1", "cj1-C");
+        job = _createSLARegBean("cj1-C@ca1", AppType.COORDINATOR_ACTION);
         job.setExpectedEnd(DateUtils.parseDateUTC("2012-07-22T01:00Z"));
+        Date actualEnd = DateUtils.parseDateUTC("2012-07-22T02:00Z");
         slas.addRegistrationEvent(job);
-        assertEquals(3, slas.getSLACalculator().size());
-        Date actualEnd = DateUtils.parseDateUTC("2012-07-22T00:00Z");
-        CoordinatorJobEvent cje = new CoordinatorJobEvent("cj1", "bj1", CoordinatorJob.Status.SUCCEEDED, "user1",
-                "coord-app-name1", actualStart, actualEnd);
-        listener.onCoordinatorJobEvent(cje);
-
-        SLASummaryBean summary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, "cj1");
-        // check that end and duration sla has been calculated
-        assertEquals(6, summary.getEventProcessed());
-
-        assertEquals(EventStatus.END_MET, summary.getEventStatus());
-
-        job = _createSLARegBean("cj1@ca1", AppType.COORDINATOR_ACTION);
-        actualEnd = DateUtils.parseDateUTC("2012-07-22T02:00Z");
-        slas.addRegistrationEvent(job);
-        assertEquals(4, slas.getSLACalculator().size());
-        CoordinatorActionEvent cae = new CoordinatorActionEvent("cj1@ca1", "cj1", CoordinatorAction.Status.RUNNING, "user1",
+        assertEquals(1, slas.getSLACalculator().size());
+        CoordinatorActionEvent cae = new CoordinatorActionEvent("cj1-C@ca1", "cj1-C", CoordinatorAction.Status.RUNNING, "user1",
                 "coord-app-name1", null, actualEnd, null);
         listener.onCoordinatorActionEvent(cae);
-        cae = new CoordinatorActionEvent("cj1@ca1", "cj1", CoordinatorAction.Status.KILLED, "user1",
+        coordAction.setStatus(CoordinatorAction.Status.KILLED);
+        coordAction.setLastModifiedTime(new Date());
+        CoordActionQueryExecutor.getInstance().executeUpdate(CoordActionQuery.UPDATE_COORD_ACTION_STATUS_PENDING_TIME, coordAction);
+
+        cae = new CoordinatorActionEvent("cj1-C@ca1", "cj1-C", CoordinatorAction.Status.KILLED, "user1",
                 "coord-app-name1", null, actualEnd, null);
         listener.onCoordinatorActionEvent(cae);
-        summary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, "cj1@ca1");
+        SLASummaryBean summary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, "cj1-C@ca1");
         // check that all events are processed
         assertEquals(8, summary.getEventProcessed());
         assertEquals(EventStatus.END_MISS, summary.getEventStatus());
-        assertEquals(3, slas.getSLACalculator().size());
+        //all jobs are processed
+        assertEquals(0, slas.getSLACalculator().size());
+
+        job = _createSLARegBean("wf2-W", AppType.WORKFLOW_JOB);
+        job.setExpectedStart(new Date(System.currentTimeMillis() - 2 * 3600 * 1000)); //2 hour before
+        job.setExpectedEnd(new Date(System.currentTimeMillis() + 1 * 3600 * 1000)); //1 hours after
+        slas.addRegistrationEvent(job);
+        assertEquals(1, slas.getSLACalculator().size());
+
+        createWorkflow("wf2-W", new Date());
+        wfe = new WorkflowJobEvent("wf2-W", "caId2", WorkflowJob.Status.RUNNING, "user1",
+                "wf-app-name1", null, null);
+        listener.onWorkflowJobEvent(wfe);
+        serviceObj = slas.getSLACalculator().get("wf2-W");
+
+        assertEquals(EventStatus.START_MISS, serviceObj.getEventStatus());
+        assertEquals(3, serviceObj.getEventProcessed()); //Only duration and start are processed. Duration = -1
+        assertEquals(1, slas.getSLACalculator().size());
 
     }
 
@@ -145,4 +173,44 @@ public class TestSLAJobEventListener extends XTestCase {
         reg.setAppType(appType);
         return reg;
     }
+    private WorkflowJobBean createWorkflow(String id, Date actualStart) throws Exception {
+        WorkflowJobBean wfBean = new WorkflowJobBean(); // PREP workflow row, inserted below and returned
+        wfBean.setSlaXml("<sla></sla>");
+        wfBean.setStartTime(actualStart);
+        wfBean.setStatusStr("PREP");
+        wfBean.setId(id);
+        List<JsonBean> pendingInserts = new ArrayList<JsonBean>();
+        pendingInserts.add(wfBean);
+        BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(pendingInserts, null, null); // insert only
+        return wfBean;
+    }
+
+    private WorkflowActionBean createWorkflowAction(String id, String parentId) throws Exception {
+        WorkflowActionBean wfAction = new WorkflowActionBean(); // only id and parent job id are populated
+        wfAction.setJobId(parentId);
+        wfAction.setId(id);
+        List<JsonBean> pendingInserts = new ArrayList<JsonBean>();
+        pendingInserts.add(wfAction);
+        BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(pendingInserts, null, null); // insert only
+        return wfAction;
+    }
+
+    private CoordinatorActionBean createCoordAction(String id, String parentId) throws Exception {
+        CoordinatorActionBean coordAction = new CoordinatorActionBean(); // only id and parent job id are populated
+        coordAction.setJobId(parentId);
+        coordAction.setId(id);
+        List<JsonBean> pendingInserts = new ArrayList<JsonBean>();
+        pendingInserts.add(coordAction);
+        BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(pendingInserts, null, null); // insert only
+        return coordAction;
+    }
+
+    private CoordinatorJobBean createCoord(String id) throws Exception {
+        CoordinatorJobBean coordJob = new CoordinatorJobBean(); // only the id is populated
+        coordJob.setId(id);
+        List<JsonBean> pendingInserts = new ArrayList<JsonBean>();
+        pendingInserts.add(coordJob);
+        BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(pendingInserts, null, null); // insert only
+        return coordJob;
+    }
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/test/java/org/apache/oozie/sla/TestSLAService.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/sla/TestSLAService.java b/core/src/test/java/org/apache/oozie/sla/TestSLAService.java
index c3bc110..1e19923 100644
--- a/core/src/test/java/org/apache/oozie/sla/TestSLAService.java
+++ b/core/src/test/java/org/apache/oozie/sla/TestSLAService.java
@@ -25,16 +25,17 @@ import org.apache.oozie.AppType;
 import org.apache.oozie.CoordinatorActionBean;
 import org.apache.oozie.WorkflowJobBean;
 import org.apache.oozie.client.CoordinatorAction;
-import org.apache.oozie.client.CoordinatorJob;
 import org.apache.oozie.client.WorkflowJob;
 import org.apache.oozie.client.event.JobEvent.EventStatus;
 import org.apache.oozie.client.event.SLAEvent;
 import org.apache.oozie.client.event.SLAEvent.SLAStatus;
+import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
 import org.apache.oozie.executor.jpa.SLARegistrationQueryExecutor;
 import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor;
 import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
 import org.apache.oozie.executor.jpa.WorkflowJobInsertJPAExecutor;
 import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
+import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
 import org.apache.oozie.executor.jpa.SLARegistrationQueryExecutor.SLARegQuery;
 import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor.SLASummaryQuery;
 import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
@@ -100,7 +101,9 @@ public class TestSLAService extends XDataTestCase {
 
         EventHandlerService ehs = Services.get().get(EventHandlerService.class);
         // test start-miss
-        SLARegistrationBean sla1 = _createSLARegistration("job-1", AppType.WORKFLOW_JOB);
+        WorkflowJobBean wfJob = addRecordToWfJobTable(WorkflowJob.Status.RUNNING, WorkflowInstance.Status.RUNNING);
+
+        SLARegistrationBean sla1 = _createSLARegistration(wfJob.getId(), AppType.WORKFLOW_JOB);
         sla1.setExpectedStart(new Date(System.currentTimeMillis() - 1 * 1 * 3600 * 1000)); //1 hour back
         sla1.setExpectedEnd(new Date(System.currentTimeMillis() - 1 * 1 * 3600 * 1000)); //1 hour back
         sla1.setExpectedDuration(10 * 60 * 1000); //10 mins
@@ -113,22 +116,48 @@ public class TestSLAService extends XDataTestCase {
         output.setLength(0);
 
         // test different jobs and events start-met and end-miss
-        sla1 = _createSLARegistration("job-2", AppType.WORKFLOW_JOB);
+        WorkflowJobBean wfJob2 = addRecordToWfJobTable(WorkflowJob.Status.RUNNING, WorkflowInstance.Status.RUNNING);
+
+        sla1 = _createSLARegistration(wfJob2.getId(), AppType.WORKFLOW_JOB);
         sla1.setExpectedStart(new Date(System.currentTimeMillis() + 1 * 3600 * 1000)); //1 hour ahead
         sla1.setExpectedEnd(new Date(System.currentTimeMillis() + 2 * 3600 * 1000)); //2 hours ahead
         slas.addRegistrationEvent(sla1);
+
+        wfJob2.setStatusStr("RUNNING");
+        wfJob2.setLastModifiedTime(new Date());
+        wfJob2.setStartTime(new Date());
+        WorkflowJobQueryExecutor.getInstance().executeUpdate(
+                WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MOD_START_END, wfJob2);
+
         slas.addStatusEvent(sla1.getId(), WorkflowJob.Status.RUNNING.name(), EventStatus.STARTED, new Date(),
                 new Date());
-        SLARegistrationBean sla2 = _createSLARegistration("job-3", AppType.COORDINATOR_JOB);
+        CoordinatorActionBean action = addRecordToCoordActionTable("coord_id-C", 1,
+                CoordinatorAction.Status.TIMEDOUT, "coord-action-get.xml", 0);
+
+        SLARegistrationBean sla2 = _createSLARegistration(action.getId(), AppType.COORDINATOR_ACTION);
         sla2.setExpectedStart(new Date(System.currentTimeMillis() + 1 * 3600 * 1000)); //1 hour ahead only for testing
         sla2.setExpectedEnd(new Date(System.currentTimeMillis() - 2 * 3600 * 1000)); //2 hours back
         sla2.setExpectedDuration(10); //to process duration too
         slas.addRegistrationEvent(sla2);
         assertEquals(3, slas.getSLACalculator().size());
+
         Date startTime = new Date();
-        slas.addStatusEvent(sla2.getId(), CoordinatorJob.Status.RUNNING.name(), EventStatus.STARTED, startTime,
-                null);
-        slas.addStatusEvent(sla2.getId(), CoordinatorJob.Status.SUCCEEDED.name(), EventStatus.SUCCESS, startTime,
+        WorkflowJobBean wfJob3 = addRecordToWfJobTable(WorkflowJob.Status.RUNNING, WorkflowInstance.Status.RUNNING);
+        wfJob3.setStatusStr("SUCCEEDED");
+        wfJob3.setLastModifiedTime(new Date());
+        wfJob3.setStartTime(startTime);
+        wfJob3.setEndTime(startTime);
+        WorkflowJobQueryExecutor.getInstance().executeUpdate(
+                WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MOD_START_END, wfJob3);
+
+        action.setCreatedTime(startTime);
+        action.setStatus(CoordinatorAction.Status.SUCCEEDED);
+        action.setLastModifiedTime(new Date());
+        action.setExternalId(wfJob3.getId());
+        CoordActionQueryExecutor.getInstance().executeUpdate(CoordActionQuery.UPDATE_COORD_ACTION_RERUN, action);
+        slas.addStatusEvent(sla1.getId(), CoordinatorAction.Status.RUNNING.name(), EventStatus.STARTED, new Date(),
+                new Date());
+        slas.addStatusEvent(sla2.getId(), CoordinatorAction.Status.SUCCEEDED.name(), EventStatus.SUCCESS, startTime,
                 new Date());
         slas.runSLAWorker();
         ehs.new EventWorker().run();
@@ -139,6 +168,11 @@ public class TestSLAService extends XDataTestCase {
 
         // test same job multiple events (start-miss, end-miss) through regular check
         WorkflowJobBean job4 = addRecordToWfJobTable(WorkflowJob.Status.KILLED, WorkflowInstance.Status.KILLED);
+        job4.setLastModifiedTime(new Date());
+        job4.setEndTime(new Date());
+        job4.setStartTime(new Date());
+        WorkflowJobQueryExecutor.getInstance().executeUpdate(
+                WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MOD_START_END, job4);
         sla2 = _createSLARegistration(job4.getId(), AppType.WORKFLOW_JOB);
         sla2.setExpectedStart(new Date(System.currentTimeMillis() - 2 * 3600 * 1000)); //2 hours back
         sla2.setExpectedEnd(new Date(System.currentTimeMillis() - 1 * 3600 * 1000)); //1 hour back
@@ -151,14 +185,13 @@ public class TestSLAService extends XDataTestCase {
         output.setLength(0);
         // As expected duration is not set, duration shall be processed and job removed from map
         assertEquals(2, slas.getSLACalculator().size());
+
         // test same job multiple events (start-met, end-met) through job status event
-        sla1 = _createSLARegistration("action@1", AppType.COORDINATOR_ACTION);
+        sla1 = _createCoordActionSLARegistration(CoordinatorAction.Status.SUCCEEDED.name());
         sla1.setExpectedStart(new Date(System.currentTimeMillis() + 1 * 3600 * 1000)); //1 hour ahead
         sla1.setExpectedEnd(new Date(System.currentTimeMillis() + 2 * 3600 * 1000)); //2 hours ahead
         slas.addRegistrationEvent(sla1);
         assertEquals(3, slas.getSLACalculator().size());
-        slas.addStatusEvent(sla1.getId(), CoordinatorAction.Status.RUNNING.name(), EventStatus.STARTED, new Date(),
-                new Date());
         slas.addStatusEvent(sla1.getId(), CoordinatorAction.Status.SUCCEEDED.name(), EventStatus.SUCCESS,
                 new Date(), new Date());
         slas.runSLAWorker();
@@ -174,39 +207,40 @@ public class TestSLAService extends XDataTestCase {
         EventHandlerService ehs = Services.get().get(EventHandlerService.class);
         JPAService jpaService = Services.get().get(JPAService.class);
 
+        Date date = new Date();
         // CASE 1: positive test WF job
         WorkflowJobBean job1 = addRecordToWfJobTable(WorkflowJob.Status.PREP, WorkflowInstance.Status.PREP);
         SLARegistrationBean sla = _createSLARegistration(job1.getId(), AppType.WORKFLOW_JOB);
-        sla.setExpectedEnd(new Date(System.currentTimeMillis() - 1 * 1800 * 1000)); // half hour back
+        sla.setExpectedEnd(new Date(date.getTime() - 1 * 1800 * 1000)); // half hour back
         slas.addRegistrationEvent(sla);
 
         // CASE 2: negative test WF job
         WorkflowJobBean job2 = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
-        job2.setEndTime(new Date(System.currentTimeMillis() - 1 * 1800 * 1000));
-        job2.setStartTime(new Date(System.currentTimeMillis() - 1 * 2000 * 1000));
+        job2.setEndTime(new Date(date.getTime()  - 1 * 1800 * 1000));
+        job2.setStartTime(new Date(date.getTime()  - 1 * 2000 * 1000));
         job2.setLastModifiedTime(new Date());
         WorkflowJobQueryExecutor.getInstance().executeUpdate(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MOD_START_END, job2);
         sla = _createSLARegistration(job2.getId(), AppType.WORKFLOW_JOB);
-        sla.setExpectedEnd(new Date(System.currentTimeMillis() - 1 * 1500 * 1000)); // in past but > actual end
+        sla.setExpectedEnd(new Date(date.getTime()  - 1 * 1500 * 1000)); // in past but > actual end
         sla.setExpectedDuration(100); //unreasonable to cause MISS
         slas.addRegistrationEvent(sla);
 
+        slas.runSLAWorker();
+
         // CASE 3: positive test Coord action
-        CoordinatorActionBean action1 = addRecordToCoordActionTable("coord-action-1", 1,
+        CoordinatorActionBean action1 = addRecordToCoordActionTable("coord-action-C@1", 1,
                 CoordinatorAction.Status.WAITING, "coord-action-get.xml", 0);
-        WorkflowJobBean extWf = new WorkflowJobBean();
-        extWf.setId(action1.getExternalId());
-        extWf.setEndTime(new Date(System.currentTimeMillis() - 1 * 1800 * 1000));
-        extWf.setStartTime(new Date(System.currentTimeMillis() - 1 * 2100 * 1000));
-        jpaService.execute(new WorkflowJobInsertJPAExecutor(extWf));
+        action1.setExternalId(null);
+        CoordActionQueryExecutor.getInstance().executeUpdate(CoordActionQuery.UPDATE_COORD_ACTION_RERUN, action1);
+
         sla = _createSLARegistration(action1.getId(), AppType.COORDINATOR_ACTION);
         sla.setExpectedEnd(new Date(System.currentTimeMillis() - 1 * 2000 * 1000)); // past
         slas.addRegistrationEvent(sla);
 
         // CASE 4: positive test coord action
-        CoordinatorActionBean action2 = addRecordToCoordActionTable("coord-action-2", 1,
+        CoordinatorActionBean action2 = addRecordToCoordActionTable("coord-action-C@2", 1,
                 CoordinatorAction.Status.FAILED, "coord-action-get.xml", 0);
-        extWf = new WorkflowJobBean();
+        WorkflowJobBean extWf = new WorkflowJobBean();
         extWf.setId(action2.getExternalId());
         // actual end before expected. but action is failed
         extWf.setEndTime(new Date(System.currentTimeMillis() - 1 * 1800 * 1000));
@@ -217,7 +251,7 @@ public class TestSLAService extends XDataTestCase {
         slas.addRegistrationEvent(sla);
 
         // CASE 5: negative test coord action
-        CoordinatorActionBean action3 = addRecordToCoordActionTable("coord-action-3", 1,
+        CoordinatorActionBean action3 = addRecordToCoordActionTable("coord-action-C@3", 1,
                 CoordinatorAction.Status.SUCCEEDED, "coord-action-get.xml", 0);
         extWf = new WorkflowJobBean();
         extWf.setId(action3.getExternalId());
@@ -271,16 +305,25 @@ public class TestSLAService extends XDataTestCase {
         assertNull(slas.getSLACalculator().get(action2.getId())); //removed from memory
 
         slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, action1.getId());
-        extWf = jpaService.execute(new WorkflowJobGetJPAExecutor(action1.getExternalId()));
-        assertEquals(extWf.getStartTime(), slaSummary.getActualStart());
-        assertEquals(extWf.getEndTime(), slaSummary.getActualEnd());
-        assertEquals(extWf.getEndTime().getTime() - extWf.getStartTime().getTime(), slaSummary.getActualDuration());
+        assertNull(slaSummary.getActualStart());
+        assertNull(slaSummary.getActualEnd());
         assertEquals(action1.getStatusStr(), slaSummary.getJobStatus());
         assertEquals(SLAEvent.EventStatus.END_MISS, slaSummary.getEventStatus());
         assertEquals(SLAStatus.MISS, slaSummary.getSLAStatus());
-        assertEquals(8, slaSummary.getEventProcessed());
-        assertNull(slas.getSLACalculator().get(action1.getId())); //removed from memory
+        assertEquals(7, slaSummary.getEventProcessed());
+        assertNotNull(slas.getSLACalculator().get(action1.getId()));
 
+        //From waiting to TIMEOUT with wf jobid
+        action1.setStatus(CoordinatorAction.Status.TIMEDOUT);
+        CoordActionQueryExecutor.getInstance().executeUpdate(CoordActionQuery.UPDATE_COORD_ACTION_RERUN, action1);
+        slas.getSLACalculator().addJobStatus(action1.getId(), null, null, null, null);
+        slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, action1.getId());
+        assertNull(slaSummary.getActualStart());
+        assertNotNull(slaSummary.getActualEnd());
+        assertEquals("TIMEDOUT", slaSummary.getJobStatus());
+        assertEquals(SLAEvent.EventStatus.END_MISS, slaSummary.getEventStatus());
+        assertEquals(SLAStatus.MISS, slaSummary.getSLAStatus());
+        assertEquals(8, slaSummary.getEventProcessed());
     }
 
     /**
@@ -307,6 +350,28 @@ public class TestSLAService extends XDataTestCase {
         return bean;
     }
 
+    /**
+     * Test helper: stores a workflow job and a coordinator action that points at it, both carrying
+     * the given status string, and builds an SLA registration bean for the coordinator action.
+     *
+     * @param status status string written to both the workflow job and the coordinator action
+     * @return a registration bean holding the action id and the COORDINATOR_ACTION app type
+     * @throws Exception propagated from the record helpers and the query executors
+     */
+    public SLARegistrationBean _createCoordActionSLARegistration(String status) throws Exception {
+        // Workflow record: created as RUNNING, then stamped with times and the requested status.
+        WorkflowJobBean workflow = addRecordToWfJobTable(WorkflowJob.Status.RUNNING,
+                WorkflowInstance.Status.RUNNING);
+        workflow.setLastModifiedTime(new Date());
+        workflow.setStartTime(new Date());
+        workflow.setEndTime(new Date());
+        workflow.setStatusStr(status);
+        WorkflowJobQueryExecutor.getInstance().executeUpdate(
+                WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MOD_START_END, workflow);
+
+        // Coordinator action record: created as TIMEDOUT, then linked to the workflow and re-statused.
+        String timestampedId = new Date().getTime() + "-C";
+        CoordinatorActionBean coordAction = addRecordToCoordActionTable(timestampedId, 1,
+                CoordinatorAction.Status.TIMEDOUT, "coord-action-get.xml", 0);
+        coordAction.setExternalId(workflow.getId());
+        coordAction.setStatusStr(status);
+        CoordActionQueryExecutor.getInstance().executeUpdate(CoordActionQuery.UPDATE_COORD_ACTION_RERUN,
+                coordAction);
+
+        SLARegistrationBean registration = new SLARegistrationBean();
+        registration.setId(coordAction.getId());
+        registration.setAppType(AppType.COORDINATOR_ACTION);
+        return registration;
+    }
+
+
     public static void assertEventNoDuplicates(String outputStr, String eventMsg) {
         int index = outputStr.indexOf(eventMsg);
         assertTrue(index != -1);

http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 412128c..8ea3c4a 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 4.3.0 release (trunk - unreleased)
 
+OOZIE-2509 SLA job status can stuck in running state (puru)
 OOZIE-2529 Support adding secret keys to Credentials of Launcher (satishsaley via rohini)
 OOZIE-1402 Increase retry interval for non-progressing coordinator action with fix value (satishsaley via puru)
 OOZIE-2512 ShareLibservice returns incorrect path for jar (satishsaley via puru)


[3/3] oozie git commit: OOZIE-2509 SLA job status can stuck in running state

Posted by pu...@apache.org.
OOZIE-2509 SLA job status can stuck in running state


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/ba7a7b85
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/ba7a7b85
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/ba7a7b85

Branch: refs/heads/master
Commit: ba7a7b85e040a313fa107474768dd67a325f91d5
Parents: 5fbd3eb
Author: Purshotam Shah <pu...@yahoo-inc.com>
Authored: Tue May 17 15:08:35 2016 -0700
Committer: Purshotam Shah <pu...@yahoo-inc.com>
Committed: Tue May 17 15:08:35 2016 -0700

----------------------------------------------------------------------
 .../command/coord/CoordActionCheckXCommand.java |   6 +-
 .../sla/SLACoordActionJobEventXCommand.java     |  80 ++
 .../sla/SLACoordActionJobHistoryXCommand.java   |  78 ++
 .../oozie/command/sla/SLAJobEventXCommand.java  | 301 ++++++
 .../command/sla/SLAJobHistoryXCommand.java      | 127 +++
 .../sla/SLAWorkflowActionJobEventXCommand.java  |  62 ++
 .../SLAWorkflowActionJobHistoryXCommand.java    |  57 ++
 .../sla/SLAWorkflowJobEventXCommand.java        |  64 ++
 .../sla/SLAWorkflowJobHistoryXCommand.java      |  56 ++
 .../jpa/CoordActionGetForSLAJPAExecutor.java    |  82 --
 .../executor/jpa/CoordActionQueryExecutor.java  |  13 +-
 .../executor/jpa/SLASummaryQueryExecutor.java   |   9 -
 .../jpa/WorkflowActionGetForSLAJPAExecutor.java |  78 --
 .../jpa/WorkflowActionQueryExecutor.java        |  13 +-
 .../jpa/WorkflowJobGetForSLAJPAExecutor.java    |  78 --
 .../executor/jpa/WorkflowJobQueryExecutor.java  |  13 +-
 .../oozie/service/ConfigurationService.java     |  12 +-
 .../org/apache/oozie/sla/SLACalcStatus.java     |  54 -
 .../apache/oozie/sla/SLACalculatorMemory.java   | 989 +++----------------
 .../org/apache/oozie/sla/SLASummaryBean.java    |   2 -
 .../apache/oozie/sla/SLAXCommandFactory.java    |  92 ++
 .../coord/TestCoordActionsKillXCommand.java     |  11 +-
 .../jpa/TestSLASummaryQueryExecutor.java        |   2 +-
 .../apache/oozie/service/TestHASLAService.java  |  64 +-
 .../oozie/sla/TestSLACalculatorMemory.java      | 344 +++++--
 .../oozie/sla/TestSLAEventGeneration.java       |   1 +
 .../oozie/sla/TestSLAJobEventListener.java      | 136 ++-
 .../org/apache/oozie/sla/TestSLAService.java    | 123 ++-
 release-log.txt                                 |   1 +
 29 files changed, 1668 insertions(+), 1280 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/main/java/org/apache/oozie/command/coord/CoordActionCheckXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/coord/CoordActionCheckXCommand.java b/core/src/main/java/org/apache/oozie/command/coord/CoordActionCheckXCommand.java
index 128feb2..bdbbd24 100644
--- a/core/src/main/java/org/apache/oozie/command/coord/CoordActionCheckXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/coord/CoordActionCheckXCommand.java
@@ -47,9 +47,10 @@ import org.apache.oozie.executor.jpa.BatchQueryExecutor;
 import org.apache.oozie.executor.jpa.CoordActionGetForCheckJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
 import org.apache.oozie.executor.jpa.CoordinatorJobGetForUserAppnameJPAExecutor;
-import org.apache.oozie.executor.jpa.WorkflowJobGetForSLAJPAExecutor;
 import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry;
 import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
+import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
+import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
 
 /**
  * The command checks workflow status for coordinator action.
@@ -177,7 +178,8 @@ public class CoordActionCheckXCommand extends CoordinatorXCommand<Void> {
                 coordAction = jpaService.execute(new CoordActionGetForCheckJPAExecutor(actionId));
                 coordJob = jpaService.execute(new CoordinatorJobGetForUserAppnameJPAExecutor(
                         coordAction.getJobId()));
-                workflowJob = jpaService.execute (new WorkflowJobGetForSLAJPAExecutor(coordAction.getExternalId()));
+                workflowJob = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW_FOR_SLA,
+                        coordAction.getExternalId());
                 LogUtils.setLogInfo(coordAction);
             }
             else {

http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/main/java/org/apache/oozie/command/sla/SLACoordActionJobEventXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/sla/SLACoordActionJobEventXCommand.java b/core/src/main/java/org/apache/oozie/command/sla/SLACoordActionJobEventXCommand.java
new file mode 100644
index 0000000..dfe2637
--- /dev/null
+++ b/core/src/main/java/org/apache/oozie/command/sla/SLACoordActionJobEventXCommand.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.command.sla;
+
+import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.WorkflowJobBean;
+import org.apache.oozie.command.CommandException;
+import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
+import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
+import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
+import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
+import org.apache.oozie.sla.SLACalcStatus;
+import org.apache.oozie.util.LogUtils;
+
+/**
+ * SLA event command for a coordinator action. It loads the coordinator action identified by the SLA
+ * calculator entry and, when that action has an external id, the workflow job with that id, then
+ * copies the job status and actual start/end times onto the SLA calculator entry.
+ */
+public class SLACoordActionJobEventXCommand extends SLAJobEventXCommand {
+    CoordinatorActionBean ca;
+    // Stays null when the coordinator action has no external id.
+    WorkflowJobBean wf;
+
+    public SLACoordActionJobEventXCommand(SLACalcStatus slaCalc, long lockTimeOut) {
+        super(slaCalc, lockTimeOut);
+    }
+
+    /**
+     * Loads the coordinator action and, if it has an external id, the workflow job with that id.
+     *
+     * @throws CommandException wrapping any JPAExecutorException raised by the queries
+     */
+    @Override
+    protected void loadState() throws CommandException {
+        try {
+            ca = CoordActionQueryExecutor.getInstance().get(CoordActionQuery.GET_COORD_ACTION_FOR_SLA, slaCalc.getId());
+            if (ca.getExternalId() != null) {
+                wf = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW_FOR_SLA, ca.getExternalId());
+            }
+            LogUtils.setLogInfo(ca);
+        }
+        catch (JPAExecutorException e) {
+            throw new CommandException(e);
+        }
+    }
+
+    /**
+     * Copies the loaded state onto the SLA calculator entry.
+     * <p>
+     * For a terminal action the command is marked ended, the end-miss flag is taken from
+     * {@code isTerminalWithFailure()}, and the actual end defaults to the action's last modified
+     * time. If the workflow has an end time, that time replaces the default and also sets the
+     * end-miss flag when it is later than the expected end. The actual start is taken from the
+     * workflow whenever one was loaded.
+     */
+    @Override
+    protected void updateJobInfo() {
+        if (ca.isTerminalStatus()) {
+            setEnded(true);
+            setEndMiss(ca.isTerminalWithFailure());
+            // Fallback end time; replaced below when the workflow recorded its own end time.
+            slaCalc.setActualEnd(ca.getLastModifiedTime());
+            if (wf != null && wf.getEndTime() != null) {
+                if (slaCalc.getExpectedEnd() != null
+                        && wf.getEndTime().getTime() > slaCalc.getExpectedEnd().getTime()) {
+                    setEndMiss(true);
+                }
+                slaCalc.setActualEnd(wf.getEndTime());
+            }
+        }
+        // The start time is copied for running and terminal actions alike.
+        if (wf != null) {
+            slaCalc.setActualStart(wf.getStartTime());
+        }
+        slaCalc.setJobStatus(ca.getStatusStr());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/main/java/org/apache/oozie/command/sla/SLACoordActionJobHistoryXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/sla/SLACoordActionJobHistoryXCommand.java b/core/src/main/java/org/apache/oozie/command/sla/SLACoordActionJobHistoryXCommand.java
new file mode 100644
index 0000000..b7f09d3
--- /dev/null
+++ b/core/src/main/java/org/apache/oozie/command/sla/SLACoordActionJobHistoryXCommand.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.command.sla;
+
+import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.WorkflowJobBean;
+import org.apache.oozie.command.CommandException;
+import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
+import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
+import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
+import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
+import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.util.LogUtils;
+
+/**
+ * SLA history command for a coordinator action. It loads the coordinator action for the job id and,
+ * once that action is in a terminal status, writes its final status and actual times to the SLA
+ * summary.
+ */
+public class SLACoordActionJobHistoryXCommand extends SLAJobHistoryXCommand {
+
+    CoordinatorActionBean cAction = null;
+
+    public SLACoordActionJobHistoryXCommand(String jobId) {
+        super(jobId);
+    }
+
+    /**
+     * Loads the coordinator action for {@code jobId}.
+     *
+     * @throws CommandException wrapping any JPAExecutorException raised by the query
+     */
+    @Override
+    protected void loadState() throws CommandException {
+        try {
+            cAction = CoordActionQueryExecutor.getInstance().get(CoordActionQuery.GET_COORD_ACTION_FOR_SLA, jobId);
+        }
+        catch (JPAExecutorException e) {
+            throw new CommandException(e);
+        }
+        LogUtils.setLogInfo(cAction);
+    }
+
+    /**
+     * Updates the SLA summary from the loaded coordinator action.
+     *
+     * @throws CommandException wrapping any JPAExecutorException raised while updating
+     */
+    @Override
+    protected void updateSLASummary() throws CommandException {
+        try {
+            updateSLASummaryForCoordAction(cAction);
+        }
+        catch (JPAExecutorException e) {
+            throw new CommandException(e);
+        }
+
+    }
+
+    /**
+     * Writes the action's status and actual times to the SLA summary.
+     * <p>
+     * When the action has an external id, the start and end times come from the workflow job with
+     * that id; nothing is written if that lookup returns null. Without an external id there is no
+     * start time and the action's last modified time is used as the end time.
+     *
+     * @param bean the coordinator action to summarise
+     * @throws JPAExecutorException if a query or the summary update fails
+     */
+    protected void updateSLASummaryForCoordAction(CoordinatorActionBean bean) throws JPAExecutorException {
+        String wrkflowId = bean.getExternalId();
+        if (wrkflowId != null) {
+            WorkflowJobBean wrkflow = WorkflowJobQueryExecutor.getInstance().get(
+                    WorkflowJobQuery.GET_WORKFLOW_START_END_TIME, wrkflowId);
+            if (wrkflow != null) {
+                updateSLASummary(bean.getId(), wrkflow.getStartTime(), wrkflow.getEndTime(), bean.getStatusStr());
+            }
+        }
+        else {
+            updateSLASummary(bean.getId(), null, bean.getLastModifiedTime(), bean.getStatusStr());
+        }
+    }
+
+    /**
+     * @return true when the loaded coordinator action is in a terminal status
+     */
+    @Override
+    protected boolean isJobEnded() {
+        return cAction.isTerminalStatus();
+    }
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/main/java/org/apache/oozie/command/sla/SLAJobEventXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/sla/SLAJobEventXCommand.java b/core/src/main/java/org/apache/oozie/command/sla/SLAJobEventXCommand.java
new file mode 100644
index 0000000..9b18606
--- /dev/null
+++ b/core/src/main/java/org/apache/oozie/command/sla/SLAJobEventXCommand.java
@@ -0,0 +1,301 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.command.sla;
+
+import java.util.Date;
+
+import org.apache.oozie.XException;
+import org.apache.oozie.client.OozieClient;
+import org.apache.oozie.client.event.SLAEvent.EventStatus;
+import org.apache.oozie.client.event.SLAEvent.SLAStatus;
+import org.apache.oozie.command.CommandException;
+import org.apache.oozie.command.PreconditionException;
+import org.apache.oozie.command.XCommand;
+import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor;
+import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor.SLASummaryQuery;
+import org.apache.oozie.service.EventHandlerService;
+import org.apache.oozie.service.JPAService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.sla.SLACalcStatus;
+import org.apache.oozie.sla.SLASummaryBean;
+
+/**
+ * Base command that evaluates the SLA of one job and persists the outcome.
+ * <p>
+ * Subclasses load the job and implement {@link #updateJobInfo()} to copy its status and actual
+ * times onto the SLA calculator entry. {@link #execute()} then evaluates start, duration and end
+ * SLAs, queues the resulting events unless alerts are disabled, and writes the SLA summary.
+ * <p>
+ * As used in this class, the event-processed value is a small bit set: 1 marks the start event as
+ * handled, 2 the duration event, 4 the end event. The value 8 marks an entry whose job has ended
+ * and needs no further processing.
+ */
+public abstract class SLAJobEventXCommand extends XCommand<Void> {
+    private long lockTimeOut = 0;
+    JPAService jpaService = Services.get().get(JPAService.class);
+    SLACalcStatus slaCalc;
+    final static String SLA_LOCK_PREFIX = "sla_";
+    private boolean isEnded = false;
+    private boolean isEndMiss = false;
+
+    public SLAJobEventXCommand(SLACalcStatus slaCalc, long lockTimeOut) {
+        super("SLA.job.event", "SLA.job.event", 1);
+        this.slaCalc = slaCalc;
+        this.lockTimeOut = lockTimeOut;
+    }
+
+    @Override
+    protected boolean isLockRequired() {
+        return true;
+    }
+
+    @Override
+    protected boolean isReQueueRequired() {
+        return false;
+    }
+
+    /**
+     * @return the lock key for this SLA entry: {@link #SLA_LOCK_PREFIX} followed by the job id
+     */
+    @Override
+    public String getEntityKey() {
+        return SLA_LOCK_PREFIX + slaCalc.getId();
+    }
+
+    protected long getLockTimeOut() {
+        return lockTimeOut;
+    }
+
+    @Override
+    protected void verifyPrecondition() throws CommandException, PreconditionException {
+    }
+
+    /**
+     * Refreshes the job info, evaluates the SLA for an ended or still-running job, and persists
+     * the summary.
+     *
+     * @return always null
+     * @throws CommandException wrapping any XException raised while writing the summary
+     */
+    @Override
+    protected Void execute() throws CommandException {
+        updateJobInfo();
+        if (isEnded) {
+            processForEnd();
+        }
+        else {
+            processForRunning();
+        }
+        try {
+            writeToDB();
+        }
+        catch (XException e) {
+            throw new CommandException(e);
+        }
+        return null;
+    }
+
+    /**
+     * Copies the loaded job's status and actual start/end times onto the SLA calculator entry and,
+     * when the job has finished, records that through {@link #setEnded(boolean)} and
+     * {@link #setEndMiss(boolean)}. Called at the start of {@link #execute()}.
+     */
+    protected abstract void updateJobInfo();
+
+    /**
+     * Tells whether events should be queued for the given SLA entry.
+     *
+     * @param slaObj the sla obj
+     * @return true unless the entry's SLA config map contains the disable-alert key
+     */
+    private boolean shouldAlert(SLACalcStatus slaObj) {
+        return !slaObj.getSLAConfigMap().containsKey(OozieClient.SLA_DISABLE_ALERT);
+    }
+
+    /**
+     * Hands the event to the event handler service.
+     *
+     * @param event the event
+     */
+    private void queueEvent(SLACalcStatus event) {
+        Services.get().get(EventHandlerService.class).queueEvent(event);
+    }
+
+    /**
+     * Sets DURATION_MISS or DURATION_MET on the entry and queues an event for it.
+     *
+     * @param expected the expected duration; -1 is treated as "no duration SLA" and nothing is done
+     * @param actual the actual duration
+     * @param slaCalc the sla calc
+     */
+    private void processDurationSLA(long expected, long actual, SLACalcStatus slaCalc) {
+        if (expected != -1) {
+            if (actual > expected) {
+                slaCalc.setEventStatus(EventStatus.DURATION_MISS);
+            }
+            else {
+                slaCalc.setEventStatus(EventStatus.DURATION_MET);
+            }
+            if (shouldAlert(slaCalc)) {
+                queueEvent(new SLACalcStatus(slaCalc));
+            }
+        }
+    }
+
+    /**
+     * Writes the SLA calculator entry to the SLA summary table.
+     *
+     * @throws JPAExecutorException the JPA executor exception
+     */
+    private void writeToDB() throws JPAExecutorException {
+        byte eventProc = slaCalc.getEventProcessed();
+        // no more processing, no transfer to history set
+        if (eventProc >= 8) {
+            // Clamp the in-memory entry and the persisted value together so they cannot diverge.
+            eventProc = 8;
+            slaCalc.setEventProcessed(8);
+        }
+
+        SLASummaryBean slaSummaryBean = new SLASummaryBean();
+        slaSummaryBean.setId(slaCalc.getId());
+        slaSummaryBean.setEventProcessed(eventProc);
+        slaSummaryBean.setSLAStatus(slaCalc.getSLAStatus());
+        slaSummaryBean.setEventStatus(slaCalc.getEventStatus());
+        slaSummaryBean.setActualEnd(slaCalc.getActualEnd());
+        slaSummaryBean.setActualStart(slaCalc.getActualStart());
+        slaSummaryBean.setActualDuration(slaCalc.getActualDuration());
+        slaSummaryBean.setJobStatus(slaCalc.getJobStatus());
+        slaSummaryBean.setLastModifiedTime(new Date());
+
+        SLASummaryQueryExecutor.getInstance().executeUpdate(SLASummaryQuery.UPDATE_SLA_SUMMARY_FOR_STATUS_ACTUAL_TIMES,
+                slaSummaryBean);
+
+        LOG.debug(" Stored SLA SummaryBean Job [{0}] eventProc = [{1}], status = [{2}]", slaCalc.getId(),
+                slaCalc.getEventProcessed(), slaCalc.getJobStatus());
+
+    }
+
+    /**
+     * Evaluates the SLA of a job that has ended: sets the final MET/MISS status, emits any start,
+     * duration and end events not yet handled, and marks the entry as fully processed (8).
+     */
+    private void processForEnd() {
+        byte eventProc = slaCalc.getEventProcessed();
+
+        LOG.debug("Job {0} has ended. endtime = [{1}]", slaCalc.getId(), slaCalc.getActualEnd());
+        if (isEndMiss()) {
+            slaCalc.setSLAStatus(SLAStatus.MISS);
+        }
+        else {
+            slaCalc.setSLAStatus(SLAStatus.MET);
+        }
+        // Start and duration can only be evaluated when the job actually started.
+        if (eventProc != 8 && slaCalc.getActualStart() != null) {
+            // Bit 0 clear: the start event has not been handled yet.
+            if ((eventProc & 1) == 0) {
+                if (slaCalc.getExpectedStart() != null) {
+                    if (slaCalc.getExpectedStart().getTime() < slaCalc.getActualStart().getTime()) {
+                        slaCalc.setEventStatus(EventStatus.START_MISS);
+                    }
+                    else {
+                        slaCalc.setEventStatus(EventStatus.START_MET);
+                    }
+                    if (shouldAlert(slaCalc)) {
+                        queueEvent(new SLACalcStatus(slaCalc));
+                    }
+                }
+            }
+            slaCalc.setActualDuration(slaCalc.getActualEnd().getTime() - slaCalc.getActualStart().getTime());
+            // Bit 1 clear: the duration event has not been handled yet.
+            if (((eventProc >> 1) & 1) == 0) {
+                processDurationSLA(slaCalc.getExpectedDuration(), slaCalc.getActualDuration(), slaCalc);
+            }
+        }
+        // Below 4: the end event has not been handled yet.
+        if (eventProc != 8 && eventProc < 4) {
+            if (isEndMiss()) {
+                slaCalc.setEventStatus(EventStatus.END_MISS);
+            }
+            else {
+                slaCalc.setEventStatus(EventStatus.END_MET);
+            }
+            if (shouldAlert(slaCalc)) {
+                queueEvent(new SLACalcStatus(slaCalc));
+            }
+        }
+        slaCalc.setEventProcessed(8);
+    }
+
+    /**
+     * Evaluates the SLA of a job that has not ended: emits start, duration-miss and end-miss
+     * events as they become decidable and records which ones were handled in the event-processed
+     * value.
+     */
+    private void processForRunning() {
+        byte eventProc = slaCalc.getEventProcessed();
+
+        if (eventProc != 8 && slaCalc.getActualStart() != null) {
+            slaCalc.setSLAStatus(SLAStatus.IN_PROCESS);
+        }
+        // Start event (bit 0).
+        if (eventProc != 8 && (eventProc & 1) == 0) {
+            if (slaCalc.getExpectedStart() == null) {
+                // No start SLA: mark as handled without an event.
+                eventProc++;
+            }
+            else if (slaCalc.getActualStart() != null) {
+                if (slaCalc.getExpectedStart().getTime() < slaCalc.getActualStart().getTime()) {
+                    slaCalc.setEventStatus(EventStatus.START_MISS);
+                }
+                else {
+                    slaCalc.setEventStatus(EventStatus.START_MET);
+                }
+                if (shouldAlert(slaCalc)) {
+                    queueEvent(new SLACalcStatus(slaCalc));
+                }
+                eventProc++;
+            }
+            else if (slaCalc.getExpectedStart() != null
+                    && slaCalc.getExpectedStart().getTime() < System.currentTimeMillis()) {
+                // Not started yet and the expected start has already passed.
+                slaCalc.setEventStatus(EventStatus.START_MISS);
+                if (shouldAlert(slaCalc)) {
+                    queueEvent(new SLACalcStatus(slaCalc));
+                }
+                eventProc++;
+            }
+
+        }
+        // Duration event (bit 1): only a miss can be decided while the job is still running.
+        if (eventProc != 8 && ((eventProc >> 1) & 1) == 0) {
+            if (slaCalc.getExpectedDuration() == -1) {
+                eventProc += 2;
+            }
+            else if (slaCalc.getActualStart() != null && slaCalc.getExpectedDuration() != -1) {
+                if (System.currentTimeMillis() - slaCalc.getActualStart().getTime() > slaCalc.getExpectedDuration()) {
+                    slaCalc.setEventStatus(EventStatus.DURATION_MISS);
+                    if (shouldAlert(slaCalc)) {
+                        queueEvent(new SLACalcStatus(slaCalc));
+                    }
+                    eventProc += 2;
+                }
+            }
+        }
+        // End event (value 4): a running job misses its end SLA once the expected end has passed.
+        if (eventProc < 4) {
+            if (slaCalc.getExpectedEnd() != null) {
+                if (slaCalc.getExpectedEnd().getTime() < System.currentTimeMillis()) {
+                    slaCalc.setEventStatus(EventStatus.END_MISS);
+                    slaCalc.setSLAStatus(SLAStatus.MISS);
+                    if (shouldAlert(slaCalc)) {
+                        queueEvent(new SLACalcStatus(slaCalc));
+                    }
+                    eventProc += 4;
+                }
+            }
+            else {
+                eventProc += 4;
+            }
+        }
+        slaCalc.setEventProcessed(eventProc);
+    }
+
+    public boolean isEnded() {
+        return isEnded;
+    }
+
+    public void setEnded(boolean isEnded) {
+        this.isEnded = isEnded;
+    }
+
+    public boolean isEndMiss() {
+        return isEndMiss;
+    }
+
+    public void setEndMiss(boolean isEndMiss) {
+        this.isEndMiss = isEndMiss;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/main/java/org/apache/oozie/command/sla/SLAJobHistoryXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/sla/SLAJobHistoryXCommand.java b/core/src/main/java/org/apache/oozie/command/sla/SLAJobHistoryXCommand.java
new file mode 100644
index 0000000..0b4045a
--- /dev/null
+++ b/core/src/main/java/org/apache/oozie/command/sla/SLAJobHistoryXCommand.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.command.sla;
+
+import java.util.Date;
+
+import org.apache.oozie.XException;
+import org.apache.oozie.command.CommandException;
+import org.apache.oozie.command.PreconditionException;
+import org.apache.oozie.command.XCommand;
+import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor;
+import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor.SLASummaryQuery;
+import org.apache.oozie.sla.SLASummaryBean;
+
+/**
+ * Base command that finalises the SLA summary of a job once the job has ended.
+ * <p>
+ * Subclasses load the job, report whether it has ended through {@link #isJobEnded()}, and implement
+ * {@link #updateSLASummary()} to push the job's final status and times into the summary. The
+ * command uses the same lock key as {@link SLAJobEventXCommand}.
+ */
+public abstract class SLAJobHistoryXCommand extends XCommand<Boolean> {
+
+    protected String jobId;
+
+    public SLAJobHistoryXCommand(String jobId) {
+        super("SLAJobHistoryXCommand", "SLAJobHistoryXCommand", 1);
+        this.jobId = jobId;
+
+    }
+
+    @Override
+    protected void verifyPrecondition() throws CommandException, PreconditionException {
+    }
+
+    @Override
+    protected boolean isLockRequired() {
+        return true;
+    }
+
+    @Override
+    protected boolean isReQueueRequired() {
+        return false;
+    }
+
+    @Override
+    public String getEntityKey() {
+        return SLAJobEventXCommand.SLA_LOCK_PREFIX + jobId;
+    }
+
+    protected long getLockTimeOut() {
+        return 0L;
+    }
+
+    /**
+     * Updates the SLA summary if the job has ended.
+     *
+     * @return true if the job has ended and the summary update ran, false otherwise
+     * @throws CommandException wrapping any XException raised by the update
+     */
+    @Override
+    protected Boolean execute() throws CommandException {
+        if (!isJobEnded()) {
+            LOG.debug("Job [{0}] is not finished", jobId);
+            return false;
+        }
+        try {
+            updateSLASummary();
+        }
+        catch (XException e) {
+            throw new CommandException(e);
+        }
+        return true;
+    }
+
+    /**
+     * Checks if is job ended.
+     *
+     * @return true, if is job ended
+     */
+    protected abstract boolean isJobEnded();
+
+    /**
+     * Update SLASummary
+     *
+     */
+    protected abstract void updateSLASummary() throws CommandException, XException;
+
+    /**
+     * Writes the final status and actual times of a job to its SLA summary and marks the summary
+     * as fully processed (event processed = 8). Nothing is written when no summary exists for the
+     * id, or when the summary already carries the same status and is already marked as processed.
+     *
+     * @param id the id
+     * @param startTime the start time, may be null
+     * @param endTime the end time, may be null
+     * @param status the status
+     * @throws JPAExecutorException the JPA executor exception
+     */
+    protected void updateSLASummary(String id, Date startTime, Date endTime, String status) throws JPAExecutorException {
+        SLASummaryBean sla = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, id);
+        // Guard first: the summary must exist before any of its fields are read.
+        if (sla == null) {
+            return;
+        }
+        // Null-safe comparison: either side may be unset.
+        boolean sameStatus = status != null && status.equals(sla.getJobStatus());
+        if (sameStatus && sla.getEventProcessed() == 8) {
+            LOG.debug("SLA job [{0}] is already updated, eventProc = [{1}], status = [{2}]", sla.getId(),
+                    sla.getEventProcessed(), sla.getJobStatus());
+            return;
+        }
+        sla.setActualStart(startTime);
+        sla.setActualEnd(endTime);
+        // The duration is only defined when both ends are known.
+        if (startTime != null && endTime != null) {
+            sla.setActualDuration(endTime.getTime() - startTime.getTime());
+        }
+        sla.setLastModifiedTime(new Date());
+        sla.setEventProcessed(8);
+        sla.setJobStatus(status);
+        SLASummaryQueryExecutor.getInstance().executeUpdate(SLASummaryQuery.UPDATE_SLA_SUMMARY_FOR_STATUS_ACTUAL_TIMES,
+                sla);
+        LOG.debug(" Stored SLA SummaryBean Job [{0}] eventProc = [{1}], status = [{2}]", sla.getId(),
+                sla.getEventProcessed(), sla.getJobStatus());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/main/java/org/apache/oozie/command/sla/SLAWorkflowActionJobEventXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/sla/SLAWorkflowActionJobEventXCommand.java b/core/src/main/java/org/apache/oozie/command/sla/SLAWorkflowActionJobEventXCommand.java
new file mode 100644
index 0000000..fef77ae
--- /dev/null
+++ b/core/src/main/java/org/apache/oozie/command/sla/SLAWorkflowActionJobEventXCommand.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.command.sla;
+
+import org.apache.oozie.WorkflowActionBean;
+import org.apache.oozie.command.CommandException;
+import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor;
+import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor.WorkflowActionQuery;
+import org.apache.oozie.sla.SLACalcStatus;
+import org.apache.oozie.util.LogUtils;
+
+/**
+ * SLA event command for a workflow action. It loads the workflow action identified by the SLA
+ * calculator entry and copies its status and actual start/end times onto that entry.
+ */
+public class SLAWorkflowActionJobEventXCommand extends SLAJobEventXCommand {
+    WorkflowActionBean wa;
+
+    public SLAWorkflowActionJobEventXCommand(SLACalcStatus slaCalc, long lockTimeOut) {
+        super(slaCalc, lockTimeOut);
+    }
+
+    /**
+     * Loads the workflow action for the SLA entry's id.
+     *
+     * @throws CommandException wrapping any JPAExecutorException raised by the query
+     */
+    @Override
+    protected void loadState() throws CommandException {
+        try {
+            wa = WorkflowActionQueryExecutor.getInstance().get(WorkflowActionQuery.GET_ACTION_FOR_SLA, slaCalc.getId());
+        }
+        catch (JPAExecutorException e) {
+            throw new CommandException(e);
+        }
+        LogUtils.setLogInfo(wa);
+
+    }
+
+    /**
+     * Copies the loaded action's state onto the SLA calculator entry. An action with an end time
+     * is treated as ended; the end is a miss when the action is terminal with failure or finished
+     * after the expected end.
+     */
+    @Override
+    protected void updateJobInfo() {
+        if (wa.getEndTime() != null) {
+            setEnded(true);
+            // An entry without an expected end cannot be late; skip the comparison instead of
+            // dereferencing null.
+            if (wa.isTerminalWithFailure()
+                    || (slaCalc.getExpectedEnd() != null
+                            && wa.getEndTime().getTime() > slaCalc.getExpectedEnd().getTime())) {
+                setEndMiss(true);
+            }
+        }
+        slaCalc.setActualStart(wa.getStartTime());
+        slaCalc.setActualEnd(wa.getEndTime());
+        slaCalc.setJobStatus(wa.getStatusStr());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/main/java/org/apache/oozie/command/sla/SLAWorkflowActionJobHistoryXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/sla/SLAWorkflowActionJobHistoryXCommand.java b/core/src/main/java/org/apache/oozie/command/sla/SLAWorkflowActionJobHistoryXCommand.java
new file mode 100644
index 0000000..7dc4a3c
--- /dev/null
+++ b/core/src/main/java/org/apache/oozie/command/sla/SLAWorkflowActionJobHistoryXCommand.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.command.sla;
+
+import org.apache.oozie.WorkflowActionBean;
+import org.apache.oozie.XException;
+import org.apache.oozie.command.CommandException;
+import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor;
+import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor.WorkflowActionQuery;
+import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.util.LogUtils;
+
+/**
+ * SLA history command for a single workflow action: loads the action identified by {@code jobId} and hands its
+ * id, start time, end time and status to the inherited four-argument {@code updateSLASummary}.
+ */
+public class SLAWorkflowActionJobHistoryXCommand extends SLAJobHistoryXCommand {
+
+    // Populated by loadState(); stays null until that has run.
+    WorkflowActionBean wfAction = null;
+
+    /**
+     * @param jobId id of the workflow action, passed on to the superclass constructor
+     */
+    public SLAWorkflowActionJobHistoryXCommand(String jobId) {
+        super(jobId);
+    }
+
+    /**
+     * Fetches the action with the {@code GET_ACTION_COMPLETED} query and registers it with {@link LogUtils}.
+     *
+     * @throws CommandException wrapping the {@link JPAExecutorException} raised by a failed lookup
+     */
+    @Override
+    protected void loadState() throws CommandException {
+        try {
+            wfAction = WorkflowActionQueryExecutor.getInstance().get(WorkflowActionQuery.GET_ACTION_COMPLETED, jobId);
+        }
+        catch (JPAExecutorException e) {
+            throw new CommandException(e);
+        }
+        LogUtils.setLogInfo(wfAction);
+    }
+
+    /**
+     * Forwards the loaded action's id, start time, end time and status string to the inherited overload.
+     * NOTE(review): confirm that {@code GET_ACTION_COMPLETED} populates start and end time on the bean; the
+     * SLA event command for actions reads them through {@code GET_ACTION_FOR_SLA} instead.
+     *
+     * @throws XException propagated from the inherited {@code updateSLASummary}
+     */
+    @Override
+    protected void updateSLASummary() throws XException {
+        updateSLASummary(wfAction.getId(), wfAction.getStartTime(), wfAction.getEndTime(), wfAction.getStatusStr());
+    }
+
+    /**
+     * @return {@code true} when the loaded action reports {@code isComplete()} or {@code isTerminalWithFailure()}
+     */
+    @Override
+    protected boolean isJobEnded() {
+        return wfAction.isComplete() || wfAction.isTerminalWithFailure();
+    }
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/main/java/org/apache/oozie/command/sla/SLAWorkflowJobEventXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/sla/SLAWorkflowJobEventXCommand.java b/core/src/main/java/org/apache/oozie/command/sla/SLAWorkflowJobEventXCommand.java
new file mode 100644
index 0000000..9a72617
--- /dev/null
+++ b/core/src/main/java/org/apache/oozie/command/sla/SLAWorkflowJobEventXCommand.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.command.sla;
+
+import org.apache.oozie.WorkflowJobBean;
+import org.apache.oozie.client.WorkflowJob;
+import org.apache.oozie.command.CommandException;
+import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
+import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
+import org.apache.oozie.sla.SLACalcStatus;
+import org.apache.oozie.util.LogUtils;
+
+/**
+ * SLA event command for a workflow job: loads the workflow whose id is {@code slaCalc.getId()} and copies its
+ * start time, end time and status into {@code slaCalc}, flagging "ended" and "end miss" where they apply.
+ */
+public class SLAWorkflowJobEventXCommand extends SLAJobEventXCommand {
+    // Workflow bean filled in by loadState().
+    WorkflowJobBean wf;
+
+    /**
+     * @param slaCalc SLA status object handed to the superclass constructor
+     * @param lockTimeOut lock timeout handed to the superclass constructor
+     */
+    public SLAWorkflowJobEventXCommand(SLACalcStatus slaCalc, long lockTimeOut) {
+        super(slaCalc, lockTimeOut);
+    }
+
+    /**
+     * Loads the workflow through the {@code GET_WORKFLOW_FOR_SLA} query and registers it with {@link LogUtils}.
+     *
+     * @throws CommandException wrapping the {@link JPAExecutorException} raised by a failed lookup
+     */
+    @Override
+    protected void loadState() throws CommandException {
+        try {
+            wf = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW_FOR_SLA, slaCalc.getId());
+        }
+        catch (JPAExecutorException e) {
+            throw new CommandException(e);
+        }
+        LogUtils.setLogInfo(wf);
+    }
+
+    /**
+     * Copies the workflow's start time and status string into {@code slaCalc}. For a workflow in a terminal
+     * state it additionally marks the job ended, records the end time, and flags an end miss when the status
+     * is KILLED or FAILED or the end time is later than the expected end.
+     */
+    @Override
+    protected void updateJobInfo() {
+        if (wf.inTerminalState()) {
+            setEnded(true);
+            final WorkflowJob.Status status = wf.getStatus();
+            final boolean killedOrFailed = status == WorkflowJob.Status.KILLED || status == WorkflowJob.Status.FAILED;
+            // The end time is only dereferenced when the status alone has not already decided the miss.
+            if (killedOrFailed || wf.getEndTime().getTime() > slaCalc.getExpectedEnd().getTime()) {
+                setEndMiss(true);
+            }
+            slaCalc.setActualEnd(wf.getEndTime());
+        }
+        slaCalc.setActualStart(wf.getStartTime());
+        slaCalc.setJobStatus(wf.getStatusStr());
+    }
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/main/java/org/apache/oozie/command/sla/SLAWorkflowJobHistoryXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/sla/SLAWorkflowJobHistoryXCommand.java b/core/src/main/java/org/apache/oozie/command/sla/SLAWorkflowJobHistoryXCommand.java
new file mode 100644
index 0000000..79e45ee
--- /dev/null
+++ b/core/src/main/java/org/apache/oozie/command/sla/SLAWorkflowJobHistoryXCommand.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.command.sla;
+
+import org.apache.oozie.WorkflowJobBean;
+import org.apache.oozie.XException;
+import org.apache.oozie.command.CommandException;
+import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
+import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
+import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.util.LogUtils;
+
+/**
+ * SLA history command for a workflow job: loads the workflow identified by {@code jobId} and hands its id,
+ * start time, end time and status to the inherited four-argument {@code updateSLASummary}.
+ */
+public class SLAWorkflowJobHistoryXCommand extends SLAJobHistoryXCommand {
+
+    // Populated by loadState(); stays null until that has run.
+    WorkflowJobBean wfJob = null;
+
+    /**
+     * @param jobId id of the workflow job, passed on to the superclass constructor
+     */
+    public SLAWorkflowJobHistoryXCommand(String jobId) {
+        super(jobId);
+    }
+
+    /**
+     * Fetches the workflow with the {@code GET_WORKFLOW_STATUS} query and registers it with {@link LogUtils}.
+     *
+     * @throws CommandException wrapping the {@link JPAExecutorException} raised by a failed lookup
+     */
+    @Override
+    protected void loadState() throws CommandException {
+        try {
+            wfJob = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW_STATUS, jobId);
+        }
+        catch (JPAExecutorException e) {
+            throw new CommandException(e);
+        }
+        LogUtils.setLogInfo(wfJob);
+    }
+
+    /**
+     * Forwards the loaded workflow's id, start time, end time and status string to the inherited overload.
+     * NOTE(review): confirm that {@code GET_WORKFLOW_STATUS} populates start and end time on the bean; the
+     * SLA event command for workflows reads them through {@code GET_WORKFLOW_FOR_SLA} instead.
+     *
+     * @throws XException propagated from the inherited {@code updateSLASummary}
+     */
+    @Override
+    protected void updateSLASummary() throws XException {
+        updateSLASummary(wfJob.getId(), wfJob.getStartTime(), wfJob.getEndTime(), wfJob.getStatusStr());
+    }
+
+    /**
+     * @return {@code true} when the loaded workflow reports {@code inTerminalState()}
+     */
+    @Override
+    protected boolean isJobEnded() {
+        return wfJob.inTerminalState();
+    }
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetForSLAJPAExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetForSLAJPAExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetForSLAJPAExecutor.java
deleted file mode 100644
index 8a5b997..0000000
--- a/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetForSLAJPAExecutor.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.oozie.executor.jpa;
-
-import java.sql.Timestamp;
-import javax.persistence.EntityManager;
-import javax.persistence.Query;
-
-import org.apache.oozie.CoordinatorActionBean;
-import org.apache.oozie.ErrorCode;
-import org.apache.oozie.client.CoordinatorAction;
-import org.apache.oozie.util.DateUtils;
-import org.apache.oozie.util.ParamChecker;
-
-/**
- * JPAExecutor to get attributes of CoordinatorActionBean required by SLAService on restart
- */
-public class CoordActionGetForSLAJPAExecutor implements JPAExecutor<CoordinatorActionBean> {
-
-    private String coordActionId;
-
-    public CoordActionGetForSLAJPAExecutor(String coordActionId) {
-        ParamChecker.notNull(coordActionId, "coordActionId");
-        this.coordActionId = coordActionId;
-    }
-
-    @Override
-    public String getName() {
-        return "CoordActionGetForSLAJPAExecutor";
-    }
-
-    @Override
-    public CoordinatorActionBean execute(EntityManager em) throws JPAExecutorException {
-        try {
-            Query q = em.createNamedQuery("GET_COORD_ACTION_FOR_SLA");
-            q.setParameter("id", coordActionId);
-            Object[] obj = (Object[]) q.getSingleResult();
-            CoordinatorActionBean caBean = getBeanForRunningCoordAction(obj);
-            return caBean;
-        }
-        catch (Exception e) {
-            throw new JPAExecutorException(ErrorCode.E0603, e.getMessage(), e);
-        }
-
-    }
-
-    private CoordinatorActionBean getBeanForRunningCoordAction(Object[] arr) {
-        CoordinatorActionBean bean = new CoordinatorActionBean();
-        if (arr[0] != null) {
-            bean.setId((String) arr[0]);
-        }
-        if (arr[1] != null) {
-            bean.setJobId((String) arr[1]);
-        }
-        if (arr[2] != null) {
-            bean.setStatus(CoordinatorAction.Status.valueOf((String) arr[2]));
-        }
-        if (arr[3] != null) {
-            bean.setExternalId((String) arr[3]);
-        }
-        if (arr[4] != null) {
-            bean.setLastModifiedTime(DateUtils.toDate((Timestamp)arr[4]));
-        }
-        return bean;
-    }
-}

http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionQueryExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionQueryExecutor.java
index 79ec28c..c0e6c19 100644
--- a/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionQueryExecutor.java
+++ b/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionQueryExecutor.java
@@ -59,7 +59,8 @@ public class CoordActionQueryExecutor extends
         GET_TERMINATED_ACTION_IDS_FOR_DATES,
         GET_ACTIVE_ACTIONS_FOR_DATES,
         GET_COORD_ACTIONS_WAITING_READY_SUBMITTED_OLDER_THAN,
-        GET_COORD_ACTIONS_FOR_RECOVERY_OLDER_THAN
+        GET_COORD_ACTIONS_FOR_RECOVERY_OLDER_THAN,
+        GET_COORD_ACTION_FOR_SLA
     };
 
     private static CoordActionQueryExecutor instance = new CoordActionQueryExecutor();
@@ -177,6 +178,7 @@ public class CoordActionQueryExecutor extends
         switch (caQuery) {
             case GET_COORD_ACTION:
             case GET_COORD_ACTION_STATUS:
+            case GET_COORD_ACTION_FOR_SLA:
                 query.setParameter("id", parameters[0]);
                 break;
             case GET_COORD_ACTIONS_BY_LAST_MODIFIED_TIME:
@@ -330,6 +332,15 @@ public class CoordActionQueryExecutor extends
                 bean.setExternalId((String) arr[3]);
                 bean.setPushMissingDependenciesBlob((StringBlob) arr[4]);
                 break;
+            case GET_COORD_ACTION_FOR_SLA:
+                arr = (Object[]) ret;
+                bean = new CoordinatorActionBean();
+                bean.setId((String) arr[0]);
+                bean.setJobId((String) arr[1]);
+                bean.setStatusStr((String) arr[2]);
+                bean.setExternalId((String) arr[3]);
+                bean.setLastModifiedTime((Timestamp) arr[4]);
+                break;
 
             default:
                 throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot construct action bean for "

http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/main/java/org/apache/oozie/executor/jpa/SLASummaryQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/SLASummaryQueryExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/SLASummaryQueryExecutor.java
index 6663162..6ff9df8 100644
--- a/core/src/main/java/org/apache/oozie/executor/jpa/SLASummaryQueryExecutor.java
+++ b/core/src/main/java/org/apache/oozie/executor/jpa/SLASummaryQueryExecutor.java
@@ -37,7 +37,6 @@ public class SLASummaryQueryExecutor extends QueryExecutor<SLASummaryBean, SLASu
 
     public enum SLASummaryQuery {
         UPDATE_SLA_SUMMARY_FOR_STATUS_ACTUAL_TIMES,
-        UPDATE_SLA_SUMMARY_FOR_ACTUAL_TIMES,
         UPDATE_SLA_SUMMARY_ALL,
         UPDATE_SLA_SUMMARY_EVENTPROCESSED,
         UPDATE_SLA_SUMMARY_FOR_EXPECTED_TIMES,
@@ -72,14 +71,6 @@ public class SLASummaryQueryExecutor extends QueryExecutor<SLASummaryBean, SLASu
                 query.setParameter("actualEndTS", bean.getActualEndTimestamp());
                 query.setParameter("actualDuration", bean.getActualDuration());
                 break;
-            case UPDATE_SLA_SUMMARY_FOR_ACTUAL_TIMES:
-                query.setParameter("jobId", bean.getId());
-                query.setParameter("eventProcessed", bean.getEventProcessed());
-                query.setParameter("actualStartTS", bean.getActualStartTimestamp());
-                query.setParameter("actualEndTS", bean.getActualEndTimestamp());
-                query.setParameter("actualDuration", bean.getActualDuration());
-                query.setParameter("lastModifiedTS", bean.getLastModifiedTimestamp());
-                break;
             case UPDATE_SLA_SUMMARY_ALL:
                 query.setParameter("appName", bean.getAppName());
                 query.setParameter("appType", bean.getAppType().toString());

http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionGetForSLAJPAExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionGetForSLAJPAExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionGetForSLAJPAExecutor.java
deleted file mode 100644
index 280294b..0000000
--- a/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionGetForSLAJPAExecutor.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.oozie.executor.jpa;
-
-import java.sql.Timestamp;
-
-import javax.persistence.EntityManager;
-import javax.persistence.Query;
-
-import org.apache.oozie.ErrorCode;
-import org.apache.oozie.WorkflowActionBean;
-import org.apache.oozie.client.WorkflowAction;
-import org.apache.oozie.util.DateUtils;
-import org.apache.oozie.util.ParamChecker;
-
-/**
- * Retrieve the workflow action bean for sla service
- */
-public class WorkflowActionGetForSLAJPAExecutor implements JPAExecutor<WorkflowActionBean> {
-
-    private String wfActionId;
-
-    public WorkflowActionGetForSLAJPAExecutor(String wfActionId) {
-        ParamChecker.notNull(wfActionId, "wfActionId");
-        this.wfActionId = wfActionId;
-    }
-
-    @Override
-    public String getName() {
-        return "WorkflowActionGetForSLAJPAExecutor";
-    }
-
-    @Override
-    public WorkflowActionBean execute(EntityManager em) throws JPAExecutorException {
-        try {
-            Query q = em.createNamedQuery("GET_ACTION_FOR_SLA");
-            q.setParameter("id", wfActionId);
-            Object[] obj = (Object[]) q.getSingleResult();
-            return getBeanFromArray(obj);
-        }
-        catch (Exception e) {
-            throw new JPAExecutorException(ErrorCode.E0603, e.getMessage(), e);
-        }
-    }
-
-    private WorkflowActionBean getBeanFromArray(Object[] arr) {
-        WorkflowActionBean wab = new WorkflowActionBean();
-        if (arr[0] != null) {
-            wab.setId((String) arr[0]);
-        }
-        if (arr[1] != null) {
-            wab.setStatus(WorkflowAction.Status.valueOf((String) arr[1]));
-        }
-        if (arr[2] != null) {
-            wab.setStartTime(DateUtils.toDate((Timestamp) arr[2]));
-        }
-        if (arr[3] != null) {
-            wab.setEndTime(DateUtils.toDate((Timestamp) arr[3]));
-        }
-        return wab;
-    }
-}

http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionQueryExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionQueryExecutor.java
index 078fd40..f01f090 100644
--- a/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionQueryExecutor.java
+++ b/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionQueryExecutor.java
@@ -57,7 +57,8 @@ public class WorkflowActionQueryExecutor extends
         GET_ACTION_COMPLETED,
         GET_RUNNING_ACTIONS,
         GET_PENDING_ACTIONS,
-        GET_ACTIONS_FOR_WORKFLOW_RERUN
+        GET_ACTIONS_FOR_WORKFLOW_RERUN,
+        GET_ACTION_FOR_SLA
     };
 
     private static WorkflowActionQueryExecutor instance = new WorkflowActionQueryExecutor();
@@ -202,6 +203,7 @@ public class WorkflowActionQueryExecutor extends
             case GET_ACTION_CHECK:
             case GET_ACTION_END:
             case GET_ACTION_COMPLETED:
+            case GET_ACTION_FOR_SLA:
                 query.setParameter("id", parameters[0]);
                 break;
             case GET_RUNNING_ACTIONS:
@@ -363,6 +365,15 @@ public class WorkflowActionQueryExecutor extends
                 bean.setEndTime(DateUtils.toDate((Timestamp) arr[3]));
                 bean.setType((String) arr[4]);
                 break;
+            case GET_ACTION_FOR_SLA:
+                bean = new WorkflowActionBean();
+                arr = (Object[]) ret;
+                bean.setId((String) arr[0]);
+                bean.setStatusStr((String) arr[1]);
+                bean.setStartTime(DateUtils.toDate((Timestamp) arr[2]));
+                bean.setEndTime(DateUtils.toDate((Timestamp) arr[3]));
+                break;
+
             default:
                 throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot construct action bean for "
                         + namedQuery.name());

http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobGetForSLAJPAExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobGetForSLAJPAExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobGetForSLAJPAExecutor.java
deleted file mode 100644
index 774766f..0000000
--- a/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobGetForSLAJPAExecutor.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.oozie.executor.jpa;
-
-import java.sql.Timestamp;
-
-import javax.persistence.EntityManager;
-import javax.persistence.Query;
-
-import org.apache.oozie.ErrorCode;
-import org.apache.oozie.WorkflowJobBean;
-import org.apache.oozie.client.WorkflowJob;
-import org.apache.oozie.util.DateUtils;
-import org.apache.oozie.util.ParamChecker;
-
-/**
- * Retrieve the workflow job bean for sla service
- */
-public class WorkflowJobGetForSLAJPAExecutor implements JPAExecutor<WorkflowJobBean> {
-
-    private String wfJobId;
-
-    public WorkflowJobGetForSLAJPAExecutor(String wfJobId) {
-        ParamChecker.notNull(wfJobId, "wfJobId");
-        this.wfJobId = wfJobId;
-    }
-
-    @Override
-    public String getName() {
-        return "WorkflowJobGetForSLAJPAExecutor";
-    }
-
-    @Override
-    public WorkflowJobBean execute(EntityManager em) throws JPAExecutorException {
-        try {
-            Query q = em.createNamedQuery("GET_WORKFLOW_FOR_SLA");
-            q.setParameter("id", wfJobId);
-            Object[] obj = (Object[]) q.getSingleResult();
-            return getBeanFromArray(obj);
-        }
-        catch (Exception e) {
-            throw new JPAExecutorException(ErrorCode.E0603, e.getMessage(), e);
-        }
-    }
-
-    private WorkflowJobBean getBeanFromArray(Object[] arr) {
-        WorkflowJobBean wjb = new WorkflowJobBean();
-        if (arr[0] != null) {
-            wjb.setId((String) arr[0]);
-        }
-        if (arr[1] != null) {
-            wjb.setStatus(WorkflowJob.Status.valueOf((String) arr[1]));
-        }
-        if (arr[2] != null) {
-            wjb.setStartTime(DateUtils.toDate((Timestamp) arr[2]));
-        }
-        if (arr[3] != null) {
-            wjb.setEndTime(DateUtils.toDate((Timestamp) arr[3]));
-        }
-        return wjb;
-    }
-}

http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobQueryExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobQueryExecutor.java
index ce108d5..13fa54d 100644
--- a/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobQueryExecutor.java
+++ b/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobQueryExecutor.java
@@ -28,7 +28,6 @@ import javax.persistence.Query;
 import org.apache.oozie.BinaryBlob;
 import org.apache.oozie.ErrorCode;
 import org.apache.oozie.StringBlob;
-import org.apache.oozie.WorkflowActionBean;
 import org.apache.oozie.WorkflowJobBean;
 import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.Services;
@@ -60,7 +59,8 @@ public class WorkflowJobQueryExecutor extends QueryExecutor<WorkflowJobBean, Wor
         GET_WORKFLOW_RESUME,
         GET_WORKFLOW_STATUS,
         GET_WORKFLOWS_PARENT_COORD_RERUN,
-        GET_COMPLETED_COORD_WORKFLOWS_OLDER_THAN
+        GET_COMPLETED_COORD_WORKFLOWS_OLDER_THAN,
+        GET_WORKFLOW_FOR_SLA
     };
 
     private static WorkflowJobQueryExecutor instance = new WorkflowJobQueryExecutor();
@@ -171,6 +171,7 @@ public class WorkflowJobQueryExecutor extends QueryExecutor<WorkflowJobBean, Wor
             case GET_WORKFLOW_KILL:
             case GET_WORKFLOW_RESUME:
             case GET_WORKFLOW_STATUS:
+            case GET_WORKFLOW_FOR_SLA:
                 query.setParameter("id", parameters[0]);
                 break;
             case GET_WORKFLOWS_PARENT_COORD_RERUN:
@@ -330,6 +331,14 @@ public class WorkflowJobQueryExecutor extends QueryExecutor<WorkflowJobBean, Wor
                 bean.setId((String) arr[0]);
                 bean.setParentId((String) arr[1]);
                 break;
+            case GET_WORKFLOW_FOR_SLA:
+                bean = new WorkflowJobBean();
+                arr = (Object[]) ret;
+                bean.setId((String) arr[0]);
+                bean.setStatusStr((String) arr[1]);
+                bean.setStartTime(DateUtils.toDate((Timestamp) arr[2]));
+                bean.setEndTime(DateUtils.toDate((Timestamp) arr[3]));
+                break;
             default:
                 throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot construct job bean for "
                         + namedQuery.name());

http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/main/java/org/apache/oozie/service/ConfigurationService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/ConfigurationService.java b/core/src/main/java/org/apache/oozie/service/ConfigurationService.java
index 4246764..9d4dcd9 100644
--- a/core/src/main/java/org/apache/oozie/service/ConfigurationService.java
+++ b/core/src/main/java/org/apache/oozie/service/ConfigurationService.java
@@ -544,12 +544,35 @@ public class ConfigurationService implements Service, Instrumentable {
     }
 
     public static long getLong(String name) {
+        return getLong(name, ConfigUtils.LONG_DEFAULT);
+    }
+
+    /**
+     * Looks up {@code name} in the configuration returned by {@code Services.get().getConf()}.
+     *
+     * @param name configuration property to read
+     * @param defaultValue value handed to the lookup as the fallback for {@code name}
+     * @return the result of {@link #getLong(Configuration, String, long)}
+     */
+    public static long getLong(String name, long defaultValue) {
         Configuration conf = Services.get().getConf();
-        return getLong(conf, name);
+        return getLong(conf, name, defaultValue);
     }
 
     public static long getLong(Configuration conf, String name) {
-        return conf.getLong(name, ConfigUtils.LONG_DEFAULT);
+        return getLong(conf, name, ConfigUtils.LONG_DEFAULT);
+    }
+
+    /**
+     * Reads {@code name} from {@code conf} via {@code conf.getLong(name, defaultValue)}.
+     *
+     * @param conf configuration to read from
+     * @param name configuration property to read
+     * @param defaultValue fallback passed straight through to {@code conf.getLong}
+     * @return whatever {@code conf.getLong} yields for {@code name}
+     */
+    public static long getLong(Configuration conf, String name, long defaultValue) {
+        return conf.getLong(name, defaultValue);
     }
 
     public static Class<?>[] getClasses(String name) {
@@ -590,4 +613,5 @@ public class ConfigurationService implements Service, Instrumentable {
         Configuration conf = Services.get().getConf();
         return getPassword(conf, name, defaultValue);
     }
+
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/main/java/org/apache/oozie/sla/SLACalcStatus.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/sla/SLACalcStatus.java b/core/src/main/java/org/apache/oozie/sla/SLACalcStatus.java
index 5c0cfd9..3a76dfe 100644
--- a/core/src/main/java/org/apache/oozie/sla/SLACalcStatus.java
+++ b/core/src/main/java/org/apache/oozie/sla/SLACalcStatus.java
@@ -25,11 +25,6 @@ import java.util.Map;
 import org.apache.oozie.AppType;
 import org.apache.oozie.client.OozieClient;
 import org.apache.oozie.client.event.SLAEvent;
-import org.apache.oozie.lock.LockToken;
-import org.apache.oozie.service.JobsConcurrencyService;
-import org.apache.oozie.service.MemoryLocksService;
-import org.apache.oozie.service.Services;
-import org.apache.oozie.sla.service.SLAService;
 import org.apache.oozie.util.LogUtils;
 import org.apache.oozie.util.XLog;
 
@@ -49,7 +44,6 @@ public class SLACalcStatus extends SLAEvent {
     private long actualDuration = -1;
     private Date lastModifiedTime;
     private byte eventProcessed;
-    private LockToken lock;
 
     private XLog LOG;
 
@@ -293,53 +287,5 @@ public class SLACalcStatus extends SLAEvent {
     public String getEntityKey() {
         return SLA_ENTITYKEY_PREFIX + this.getId();
     }
-    /**
-     * Obtain an exclusive lock on the {link #getEntityKey}.
-     * <p>
-     * A timeout of {link #getLockTimeOut} is used when trying to obtain the lock.
-     *
-     * @throws InterruptedException thrown if an interruption happened while trying to obtain the lock
-     */
-    public void acquireLock() throws InterruptedException {
-        // only get ZK lock when multiple servers running
-        if (Services.get().get(JobsConcurrencyService.class).isHighlyAvailableMode()) {
-            lock = Services.get().get(MemoryLocksService.class).getWriteLock(getEntityKey(), getLockTimeOut());
-            if (lock == null) {
-            LOG.debug("Could not aquire lock for [{0}]", getEntityKey());
-            }
-            else {
-                LOG.debug("Acquired lock for [{0}]", getEntityKey());
-            }
-        }
-        else {
-            lock = new DummyToken();
-        }
-    }
-
-    private static class DummyToken implements LockToken {
-        @Override
-        public void release() {
-        }
-    }
-
-    public boolean isLocked() {
-        boolean locked = false;
-        if(lock != null) {
-            locked = true;
-        }
-        return locked;
-    }
-
-    public void releaseLock(){
-        if (lock != null) {
-            lock.release();
-            lock = null;
-            LOG.debug("Released lock for [{0}]", getEntityKey());
-        }
-    }
-
-    public long getLockTimeOut() {
-        return Services.get().getConf().getLong(SLAService.CONF_SLA_CALC_LOCK_TIMEOUT, 5 * 1000);
-    }
 
 }


[2/3] oozie git commit: OOZIE-2509 SLA job status can stuck in running state

Posted by pu...@apache.org.
http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java b/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java
index 42313fd..e8638a9 100644
--- a/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java
+++ b/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java
@@ -31,47 +31,28 @@ import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.oozie.AppType;
-import org.apache.oozie.CoordinatorActionBean;
-import org.apache.oozie.CoordinatorJobBean;
 import org.apache.oozie.ErrorCode;
-import org.apache.oozie.WorkflowActionBean;
-import org.apache.oozie.WorkflowJobBean;
 import org.apache.oozie.XException;
 import org.apache.oozie.client.CoordinatorAction;
 import org.apache.oozie.client.OozieClient;
 import org.apache.oozie.client.WorkflowAction;
 import org.apache.oozie.client.WorkflowJob;
 import org.apache.oozie.client.event.JobEvent;
-import org.apache.oozie.client.event.SLAEvent.EventStatus;
 import org.apache.oozie.client.event.SLAEvent.SLAStatus;
 import org.apache.oozie.client.rest.JsonBean;
 import org.apache.oozie.client.rest.RestConstants;
 import org.apache.oozie.command.CommandException;
 import org.apache.oozie.executor.jpa.BatchQueryExecutor;
 import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry;
-import org.apache.oozie.executor.jpa.CoordActionGetForSLAJPAExecutor;
-import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
-import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
-import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
-import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
 import org.apache.oozie.executor.jpa.SLARegistrationQueryExecutor;
 import org.apache.oozie.executor.jpa.SLARegistrationQueryExecutor.SLARegQuery;
 import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor;
 import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor.SLASummaryQuery;
-import org.apache.oozie.executor.jpa.WorkflowActionGetForSLAJPAExecutor;
-import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor;
-import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor.WorkflowActionQuery;
-import org.apache.oozie.executor.jpa.WorkflowJobGetForSLAJPAExecutor;
-import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
-import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
 import org.apache.oozie.executor.jpa.sla.SLASummaryGetRecordsOnRestartJPAExecutor;
-import org.apache.oozie.lock.LockToken;
 import org.apache.oozie.service.ConfigurationService;
 import org.apache.oozie.service.EventHandlerService;
 import org.apache.oozie.service.JPAService;
-import org.apache.oozie.service.JobsConcurrencyService;
-import org.apache.oozie.service.MemoryLocksService;
 import org.apache.oozie.service.SchedulerService;
 import org.apache.oozie.service.ServiceException;
 import org.apache.oozie.service.Services;
@@ -80,8 +61,8 @@ import org.apache.oozie.util.DateUtils;
 import org.apache.oozie.util.LogUtils;
 import org.apache.oozie.util.XLog;
 import org.apache.oozie.util.Pair;
-import com.google.common.annotations.VisibleForTesting;
 
+import com.google.common.annotations.VisibleForTesting;
 
 /**
  * Implementation class for SLACalculator that calculates SLA related to
@@ -111,14 +92,14 @@ public class SLACalculatorMemory implements SLACalculator {
         modifiedAfter = conf.getInt(SLAService.CONF_EVENTS_MODIFIED_AFTER, 7);
         loadOnRestart();
         Runnable purgeThread = new HistoryPurgeWorker();
-        // schedule runnable by default 1 day
+        // schedule the purge runnable every 1 hour by default
         Services.get()
                 .get(SchedulerService.class)
-                .schedule(purgeThread, 86400, Services.get().getConf().getInt(SLAService.CONF_SLA_HISTORY_PURGE_INTERVAL, 86400),
+                .schedule(purgeThread, 3600, Services.get().getConf().getInt(SLAService.CONF_SLA_HISTORY_PURGE_INTERVAL, 3600),
                         SchedulerService.Unit.SEC);
     }
 
-    public class HistoryPurgeWorker implements Runnable {
+    public class HistoryPurgeWorker extends Thread {
 
         public HistoryPurgeWorker() {
         }
@@ -131,327 +112,88 @@ public class SLACalculatorMemory implements SLACalculator {
             Iterator<String> jobItr = historySet.iterator();
             while (jobItr.hasNext()) {
                 String jobId = jobItr.next();
-
-                if (jobId.endsWith("-W")) {
-                    WorkflowJobBean wfJob = null;
-                    try {
-                        wfJob = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW_STATUS, jobId);
-                    }
-                    catch (JPAExecutorException e) {
-                        if (e.getErrorCode().equals(ErrorCode.E0604)) {
-                            jobItr.remove();
-                        }
-                        else {
-                            LOG.info("Failed to fetch the workflow job: " + jobId, e);
-                        }
-                    }
-                    if (wfJob != null && wfJob.inTerminalState()) {
-                        try {
-                            updateSLASummary(wfJob.getId(), wfJob.getStartTime(), wfJob.getEndTime());
-                            jobItr.remove();
-                        }
-                        catch (JPAExecutorException e) {
-                            LOG.info("Failed to update SLASummaryBean when purging history set entry for " + jobId, e);
-                        }
-
-                    }
-                }
-                else if (jobId.contains("-W@")) {
-                    WorkflowActionBean wfAction = null;
-                    try {
-                        wfAction = WorkflowActionQueryExecutor.getInstance().get(
-                                WorkflowActionQuery.GET_ACTION_COMPLETED, jobId);
-                    }
-                    catch (JPAExecutorException e) {
-                        if (e.getErrorCode().equals(ErrorCode.E0605)) {
-                            jobItr.remove();
-                        }
-                        else {
-                            LOG.info("Failed to fetch the workflow action: " + jobId, e);
-                        }
-                    }
-                    if (wfAction != null && (wfAction.isComplete() || wfAction.isTerminalWithFailure())) {
-                        try {
-                            updateSLASummary(wfAction.getId(), wfAction.getStartTime(), wfAction.getEndTime());
-                            jobItr.remove();
-                        }
-                        catch (JPAExecutorException e) {
-                            LOG.info("Failed to update SLASummaryBean when purging history set entry for " + jobId, e);
-                        }
-                    }
-                }
-                else if (jobId.contains("-C@")) {
-                    CoordinatorActionBean cAction = null;
-                    try {
-                        cAction = CoordActionQueryExecutor.getInstance().get(CoordActionQuery.GET_COORD_ACTION, jobId);
-                    }
-                    catch (JPAExecutorException e) {
-                        if (e.getErrorCode().equals(ErrorCode.E0605)) {
-                            jobItr.remove();
-                        }
-                        else {
-                            LOG.info("Failed to fetch the coord action: " + jobId, e);
-                        }
-                    }
-                    if (cAction != null && cAction.isTerminalStatus()) {
-                        try {
-                            updateSLASummaryForCoordAction(cAction);
-                            jobItr.remove();
-                        }
-                        catch (JPAExecutorException e) {
-                            XLog.getLog(SLACalculatorMemory.class).info(
-                                    "Failed to update SLASummaryBean when purging history set entry for " + jobId, e);
-                        }
-
+                LOG.debug(" Running HistoryPurgeWorker for " + jobId);
+                try {
+                    boolean isDone = SLAXCommandFactory.getSLAJobHistoryXCommand(jobId).call();
+                    if (isDone) {
+                        LOG.debug("[{0}] job is finished and processed. Removing from history");
+                        jobItr.remove();
                     }
                 }
-                else if (jobId.endsWith("-C")) {
-                    CoordinatorJobBean cJob = null;
-                    try {
-                        cJob = CoordJobQueryExecutor.getInstance().get(CoordJobQuery.GET_COORD_JOB_STATUS_PARENTID,
-                                jobId);
-                    }
-                    catch (JPAExecutorException e) {
-                        if (e.getErrorCode().equals(ErrorCode.E0604)) {
-                            jobItr.remove();
-                        }
-                        else {
-                            LOG.info("Failed to fetch the coord job: " + jobId, e);
-                        }
+                catch (CommandException e) {
+                    if (e.getErrorCode().equals(ErrorCode.E0604) || e.getErrorCode().equals(ErrorCode.E0605)) {
+                        LOG.warn("Job is not found in db: " + jobId, e);
+                        jobItr.remove();
                     }
-                    if (cJob != null && cJob.isTerminalStatus()) {
-                        try {
-                            updateSLASummary(cJob.getId(), cJob.getStartTime(), cJob.getEndTime());
-                            jobItr.remove();
-                        }
-                        catch (JPAExecutorException e) {
-                            LOG.info("Failed to update SLASummaryBean when purging history set entry for " + jobId, e);
-                        }
-
+                    else {
+                        LOG.error("Failed to fetch the job: " + jobId, e);
                     }
                 }
             }
         }
-
-        private void updateSLASummary(String id, Date startTime, Date endTime) throws JPAExecutorException {
-            SLASummaryBean sla = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, id);
-            if (sla != null) {
-                sla.setActualStart(startTime);
-                sla.setActualEnd(endTime);
-                if (startTime != null && endTime != null) {
-                    sla.setActualDuration(endTime.getTime() - startTime.getTime());
-                }
-                sla.setLastModifiedTime(new Date());
-                sla.setEventProcessed(8);
-                SLASummaryQueryExecutor.getInstance().executeUpdate(
-                        SLASummaryQuery.UPDATE_SLA_SUMMARY_FOR_ACTUAL_TIMES, sla);
-            }
-        }
-
-        private void updateSLASummaryForCoordAction(CoordinatorActionBean bean) throws JPAExecutorException {
-            String wrkflowId = bean.getExternalId();
-            if (wrkflowId != null) {
-                WorkflowJobBean wrkflow = WorkflowJobQueryExecutor.getInstance().get(
-                        WorkflowJobQuery.GET_WORKFLOW_START_END_TIME, wrkflowId);
-                if (wrkflow != null) {
-                    updateSLASummary(bean.getId(), wrkflow.getStartTime(), wrkflow.getEndTime());
-                }
-            }
-        }
     }
 
     private void loadOnRestart() {
-        boolean isJobModified = false;
+        long slaPendingCount = 0;
+        long statusPendingCount = 0;
+
         try {
-            long slaPendingCount = 0;
-            long statusPendingCount = 0;
             List<SLASummaryBean> summaryBeans = jpaService.execute(new SLASummaryGetRecordsOnRestartJPAExecutor(
                     modifiedAfter));
             for (SLASummaryBean summaryBean : summaryBeans) {
                 String jobId = summaryBean.getId();
-                LockToken lock = null;
-                switch (summaryBean.getAppType()) {
-                    case COORDINATOR_ACTION:
-                        isJobModified = processSummaryBeanForCoordAction(summaryBean, jobId);
-                        break;
-                    case WORKFLOW_ACTION:
-                        isJobModified = processSummaryBeanForWorkflowAction(summaryBean, jobId);
-                        break;
-                    case WORKFLOW_JOB:
-                        isJobModified = processSummaryBeanForWorkflowJob(summaryBean, jobId);
-                        break;
-                    default:
-                        break;
-                }
-                if (isJobModified) {
-                    try {
-                        boolean update = true;
-                        if (Services.get().get(JobsConcurrencyService.class).isHighlyAvailableMode()) {
-                            lock = Services
-                                    .get()
-                                    .get(MemoryLocksService.class)
-                                    .getWriteLock(
-                                            SLACalcStatus.SLA_ENTITYKEY_PREFIX + jobId,
-                                            Services.get().getConf()
-                                                    .getLong(SLAService.CONF_SLA_CALC_LOCK_TIMEOUT, 5 * 1000));
-                            if (lock == null) {
-                                update = false;
-                            }
-                        }
-                        if (update) {
-                            summaryBean.setLastModifiedTime(new Date());
-                            SLASummaryQueryExecutor.getInstance().executeUpdate(
-                                    SLASummaryQuery.UPDATE_SLA_SUMMARY_FOR_STATUS_ACTUAL_TIMES, summaryBean);
-                        }
-                    }
-                    catch (Exception e) {
-                        LOG.warn("Failed to load records for " + jobId, e);
-                    }
-                    finally {
-                        if (lock != null) {
-                            lock.release();
-                            lock = null;
-                        }
-                    }
-                }
+
+                SLARegistrationBean slaRegBean = SLARegistrationQueryExecutor.getInstance().get(
+                        SLARegQuery.GET_SLA_REG_ON_RESTART, jobId);
+                SLACalcStatus slaCalcStatus = new SLACalcStatus(summaryBean, slaRegBean);
+
+                // Process missed jobs
                 try {
-                    if (summaryBean.getEventProcessed() == 7) {
-                        historySet.add(jobId);
-                        statusPendingCount++;
-                    }
-                    else if (summaryBean.getEventProcessed() <= 7) {
-                        SLARegistrationBean slaRegBean = SLARegistrationQueryExecutor.getInstance().get(
-                                SLARegQuery.GET_SLA_REG_ON_RESTART, jobId);
-                        SLACalcStatus slaCalcStatus = new SLACalcStatus(summaryBean, slaRegBean);
-                        slaMap.put(jobId, slaCalcStatus);
-                        slaPendingCount++;
-                    }
+                    SLAXCommandFactory.getSLAEventXCommand(slaCalcStatus).call();
                 }
-                catch (Exception e) {
-                    LOG.warn("Failed to fetch/update records for " + jobId, e);
+                catch (Throwable e) {
+                    LOG.error("Error while updating job {0}", slaCalcStatus.getId(), e);
                 }
 
+                if (slaCalcStatus.getEventProcessed() == 7) {
+                    historySet.add(jobId);
+                    statusPendingCount++;
+                    LOG.debug("Adding job [{0}] to historySet. EventProcessed is [{1}]", slaCalcStatus,
+                            slaCalcStatus);
+                }
+                else if (slaCalcStatus.getEventProcessed() < 7) {
+                    slaMap.put(jobId, slaCalcStatus);
+                    slaPendingCount++;
+                    LOG.debug("Adding job [{0}] to slamap. EventProcessed is [{1}]", slaCalcStatus,
+                            slaCalcStatus);
+
+                }
             }
             LOG.info("Loaded SLASummary pendingSLA=" + slaPendingCount + ", pendingStatusUpdate=" + statusPendingCount);
-
         }
         catch (Exception e) {
             LOG.warn("Failed to retrieve SLASummary records on restart", e);
         }
     }
 
-    private boolean processSummaryBeanForCoordAction(SLASummaryBean summaryBean, String jobId)
-            throws JPAExecutorException {
-        boolean isJobModified = false;
-        CoordinatorActionBean coordAction = null;
-        coordAction = jpaService.execute(new CoordActionGetForSLAJPAExecutor(jobId));
-        if (!coordAction.getStatusStr().equals(summaryBean.getJobStatus())) {
-            LOG.trace("Coordinator action status is " + coordAction.getStatusStr() + " and summary bean status is "
-                    + summaryBean.getJobStatus());
-            isJobModified = true;
-            summaryBean.setJobStatus(coordAction.getStatusStr());
-            if (coordAction.isTerminalStatus()) {
-                WorkflowJobBean wfJob = jpaService.execute(new WorkflowJobGetForSLAJPAExecutor(coordAction
-                        .getExternalId()));
-                setEndForSLASummaryBean(summaryBean, wfJob.getStartTime(), coordAction.getLastModifiedTime(),
-                        coordAction.getStatusStr());
-            }
-            else if (coordAction.getStatus() != CoordinatorAction.Status.WAITING) {
-                WorkflowJobBean wfJob = jpaService.execute(new WorkflowJobGetForSLAJPAExecutor(coordAction
-                        .getExternalId()));
-                setStartForSLASummaryBean(summaryBean, summaryBean.getEventProcessed(), wfJob.getStartTime());
-            }
-        }
-        return isJobModified;
-    }
-
-    private boolean processSummaryBeanForWorkflowAction(SLASummaryBean summaryBean, String jobId)
-            throws JPAExecutorException {
-        boolean isJobModified = false;
-        WorkflowActionBean wfAction = null;
-        wfAction = jpaService.execute(new WorkflowActionGetForSLAJPAExecutor(jobId));
-        if (!wfAction.getStatusStr().equals(summaryBean.getJobStatus())) {
-            LOG.trace("Workflow action status is " + wfAction.getStatusStr() + "and summary bean status is "
-                    + summaryBean.getJobStatus());
-            isJobModified = true;
-            summaryBean.setJobStatus(wfAction.getStatusStr());
-            if (wfAction.inTerminalState()) {
-                setEndForSLASummaryBean(summaryBean, wfAction.getStartTime(), wfAction.getEndTime(), wfAction.getStatusStr());
-            }
-            else if (wfAction.getStatus() != WorkflowAction.Status.PREP) {
-                setStartForSLASummaryBean(summaryBean, summaryBean.getEventProcessed(), wfAction.getStartTime());
-            }
-        }
-        return isJobModified;
-    }
-
-    private boolean processSummaryBeanForWorkflowJob(SLASummaryBean summaryBean, String jobId)
-            throws JPAExecutorException {
-        boolean isJobModified = false;
-        WorkflowJobBean wfJob = null;
-        wfJob = jpaService.execute(new WorkflowJobGetForSLAJPAExecutor(jobId));
-        if (!wfJob.getStatusStr().equals(summaryBean.getJobStatus())) {
-            LOG.trace("Workflow job status is " + wfJob.getStatusStr() + "and summary bean status is "
-                    + summaryBean.getJobStatus());
-            isJobModified = true;
-            summaryBean.setJobStatus(wfJob.getStatusStr());
-            if (wfJob.inTerminalState()) {
-                setEndForSLASummaryBean(summaryBean, wfJob.getStartTime(), wfJob.getEndTime(), wfJob.getStatusStr());
-            }
-            else if (wfJob.getStatus() != WorkflowJob.Status.PREP) {
-                setStartForSLASummaryBean(summaryBean, summaryBean.getEventProcessed(), wfJob.getStartTime());
-            }
-        }
-        return isJobModified;
-    }
-
-    private void setEndForSLASummaryBean(SLASummaryBean summaryBean, Date startTime, Date endTime, String status) {
-        byte eventProc = summaryBean.getEventProcessed();
-        summaryBean.setEventProcessed(8);
-        summaryBean.setActualStart(startTime);
-        summaryBean.setActualEnd(endTime);
-        long actualDuration = endTime.getTime() - startTime.getTime();
-        summaryBean.setActualDuration(actualDuration);
-        if (eventProc < 4) {
-            if (status.equals(WorkflowJob.Status.SUCCEEDED.name()) || status.equals(WorkflowAction.Status.OK.name())
-                    || status.equals(CoordinatorAction.Status.SUCCEEDED.name())) {
-                if (endTime.getTime() <= summaryBean.getExpectedEnd().getTime()) {
-                    summaryBean.setSLAStatus(SLAStatus.MET);
-                }
-                else {
-                    summaryBean.setSLAStatus(SLAStatus.MISS);
-                }
-            }
-            else {
-                summaryBean.setSLAStatus(SLAStatus.MISS);
-            }
-        }
-
-    }
-
-    private void setStartForSLASummaryBean(SLASummaryBean summaryBean, byte eventProc, Date startTime) {
-        if (((eventProc & 1) == 0)) {
-            eventProc += 1;
-            summaryBean.setEventProcessed(eventProc);
-        }
-        if (summaryBean.getSLAStatus().equals(SLAStatus.NOT_STARTED)) {
-            summaryBean.setSLAStatus(SLAStatus.IN_PROCESS);
-        }
-        summaryBean.setActualStart(startTime);
-    }
-
     @Override
     public int size() {
         return slaMap.size();
     }
 
+    @VisibleForTesting
+    public Set<String> getHistorySet(){
+        return historySet;
+    }
+
     @Override
     public SLACalcStatus get(String jobId) throws JPAExecutorException {
         SLACalcStatus memObj;
         memObj = slaMap.get(jobId);
         if (memObj == null && historySet.contains(jobId)) {
-            memObj = new SLACalcStatus(SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId),
-                    SLARegistrationQueryExecutor.getInstance().get(SLARegQuery.GET_SLA_REG_ON_RESTART, jobId));
+            memObj = new SLACalcStatus(SLASummaryQueryExecutor.getInstance()
+                    .get(SLASummaryQuery.GET_SLA_SUMMARY, jobId), SLARegistrationQueryExecutor.getInstance().get(
+                    SLARegQuery.GET_SLA_REG_ON_RESTART, jobId));
         }
         return memObj;
     }
@@ -460,13 +202,13 @@ public class SLACalculatorMemory implements SLACalculator {
         SLACalcStatus memObj;
         memObj = slaMap.get(jobId);
         if (memObj == null) {
-            memObj = new SLACalcStatus(SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId),
-                    SLARegistrationQueryExecutor.getInstance().get(SLARegQuery.GET_SLA_REG_ON_RESTART, jobId));
+            memObj = new SLACalcStatus(SLASummaryQueryExecutor.getInstance()
+                    .get(SLASummaryQuery.GET_SLA_SUMMARY, jobId), SLARegistrationQueryExecutor.getInstance().get(
+                    SLARegQuery.GET_SLA_REG_ON_RESTART, jobId));
         }
         return memObj;
     }
 
-
     @Override
     public Iterator<String> iterator() {
         return slaMap.keySet().iterator();
@@ -488,12 +230,17 @@ public class SLACalculatorMemory implements SLACalculator {
      */
     protected void updateJobSla(String jobId) throws Exception {
         SLACalcStatus slaCalc = slaMap.get(jobId);
+
+        if (slaCalc == null) {
+            // job might be processed and removed from map by addJobStatus
+            return;
+        }
         synchronized (slaCalc) {
-            boolean change = false;
             // get eventProcessed on DB for validation in HA
             SLASummaryBean summaryBean = ((SLASummaryQueryExecutor) SLASummaryQueryExecutor.getInstance()).get(
                     SLASummaryQuery.GET_SLA_SUMMARY_EVENTPROCESSED_LAST_MODIFIED, jobId);
             byte eventProc = summaryBean.getEventProcessed();
+            slaCalc.setEventProcessed(eventProc);
             if (eventProc >= 7) {
                 if (eventProc == 7) {
                     historySet.add(jobId);
@@ -503,127 +250,69 @@ public class SLACalculatorMemory implements SLACalculator {
             }
             else {
                 if (!slaCalc.getLastModifiedTime().equals(summaryBean.getLastModifiedTime())) {
-                    //Update last modified time.
+                    // Update last modified time.
                     slaCalc.setLastModifiedTime(summaryBean.getLastModifiedTime());
                     reloadExpectedTimeAndConfig(slaCalc);
                     LOG.debug("Last modified time has changed for job " + jobId + " reloading config from DB");
                 }
-                slaCalc.setEventProcessed(eventProc);
-                SLARegistrationBean reg = slaCalc.getSLARegistrationBean();
-                // calculation w.r.t current time and status
-                if ((eventProc & 1) == 0) { // first bit (start-processed) unset
-                    if (reg.getExpectedStart() != null) {
-                        if (reg.getExpectedStart().getTime() + jobEventLatency < System.currentTimeMillis()) {
-                            confirmWithDB(slaCalc);
-                            eventProc = slaCalc.getEventProcessed();
-                            if (eventProc != 8 && (eventProc & 1) == 0) {
-                                // Some DB exception
-                                slaCalc.setEventStatus(EventStatus.START_MISS);
-                                if (shouldAlert(slaCalc)) {
-                                    eventHandler.queueEvent(new SLACalcStatus(slaCalc));
-                                }
-                                eventProc++;
-                            }
-                            change = true;
-                        }
-                    }
-                    else {
-                        eventProc++; // disable further processing for optional start sla condition
-                        change = true;
-                    }
-                }
-                // check if second bit (duration-processed) is unset
-                if (eventProc != 8 && ((eventProc >> 1) & 1) == 0) {
-                    if (reg.getExpectedDuration() == -1) {
-                        eventProc += 2;
-                        change = true;
-                    }
-                    else if (slaCalc.getActualStart() != null) {
-                        if ((reg.getExpectedDuration() + jobEventLatency) < (System.currentTimeMillis() - slaCalc
-                                .getActualStart().getTime())) {
-                            slaCalc.setEventProcessed(eventProc);
-                            confirmWithDB(slaCalc);
-                            eventProc = slaCalc.getEventProcessed();
-                            if (eventProc != 8 && ((eventProc >> 1) & 1) == 0) {
-                                // Some DB exception
-                                slaCalc.setEventStatus(EventStatus.DURATION_MISS);
-                                if (shouldAlert(slaCalc)) {
-                                    eventHandler.queueEvent(new SLACalcStatus(slaCalc));
-                                }
-                                eventProc += 2;
-                            }
-                            change = true;
-                        }
-                    }
-                }
-                if (eventProc < 4) {
-                    if (reg.getExpectedEnd().getTime() + jobEventLatency < System.currentTimeMillis()) {
-                        slaCalc.setEventProcessed(eventProc);
-                        confirmWithDB(slaCalc);
-                        eventProc = slaCalc.getEventProcessed();
-                        change = true;
-                    }
-                }
-                if (change) {
+                if (isChanged(slaCalc)) {
+                    LOG.debug("{0} job has SLA event change. EventProc = {1}, status = {2}", slaCalc.getId(),
+                            slaCalc.getEventProcessed(), slaCalc.getJobStatus());
                     try {
-                        boolean locked = true;
-                        slaCalc.acquireLock();
-                        locked = slaCalc.isLocked();
-                        if (locked) {
-                            // no more processing, no transfer to history set
-                            if (slaCalc.getEventProcessed() >= 8) {
-                                eventProc = 8;
-                                // Should not be > 8. But to handle any corner cases
-                                slaCalc.setEventProcessed(8);
-                                slaMap.remove(jobId);
-                                LOG.trace("Removed Job [{0}] from map after Event-processed=8", jobId);
-                            }
-                            else {
-                                slaCalc.setEventProcessed(eventProc);
-                            }
-                            writetoDB(slaCalc, eventProc);
-                            if (eventProc == 7) {
-                                historySet.add(jobId);
-                                slaMap.remove(jobId);
-                                LOG.trace("Removed Job [{0}] from map after Event-processed=7", jobId);
-                            }
-                        }
-                    }
-                    catch (InterruptedException e) {
-                        throw new XException(ErrorCode.E0606, slaCalc.getId(), slaCalc.getLockTimeOut());
+                        SLAXCommandFactory.getSLAEventXCommand(slaCalc).call();
+                        checkEventProc(slaCalc);
                     }
-                    finally {
-                        slaCalc.releaseLock();
+                    catch (XException e) {
+                        if (e.getErrorCode().equals(ErrorCode.E0604) || e.getErrorCode().equals(ErrorCode.E0605)) {
+                            LOG.debug("job [{0}] is is not in DB, removing from Memory", slaCalc.getId());
+                            slaMap.remove(jobId);
+                        }
                     }
                 }
+
             }
         }
     }
 
-    private void writetoDB(SLACalcStatus slaCalc, byte eventProc) throws JPAExecutorException {
-        SLASummaryBean slaSummaryBean = new SLASummaryBean();
-        slaSummaryBean.setId(slaCalc.getId());
-        slaSummaryBean.setEventProcessed(eventProc);
-        slaSummaryBean.setSLAStatus(slaCalc.getSLAStatus());
-        slaSummaryBean.setEventStatus(slaCalc.getEventStatus());
-        slaSummaryBean.setActualEnd(slaCalc.getActualEnd());
-        slaSummaryBean.setActualStart(slaCalc.getActualStart());
-        slaSummaryBean.setActualDuration(slaCalc.getActualDuration());
-        slaSummaryBean.setJobStatus(slaCalc.getJobStatus());
-        slaSummaryBean.setLastModifiedTime(new Date());
+    private boolean isChanged(SLACalcStatus slaCalc) {
+        SLARegistrationBean reg = slaCalc.getSLARegistrationBean();
+        byte eventProc = slaCalc.getEventProcessed();
 
-        SLASummaryQueryExecutor.getInstance().executeUpdate(SLASummaryQuery.UPDATE_SLA_SUMMARY_FOR_STATUS_ACTUAL_TIMES,
-                slaSummaryBean);
-        LOG.trace("Stored SLA SummaryBean Job [{0}] with Event-processed=[{1}]", slaCalc.getId(),
-                slaSummaryBean.getEventProcessed());
+        if ((eventProc & 1) == 0) { // first bit (start-processed) unset
+            if (reg.getExpectedStart() != null) {
+                if (reg.getExpectedStart().getTime() + jobEventLatency < System.currentTimeMillis()) {
+                    return true;
+                }
+            }
+            else {
+                return true;
+            }
+        }
+        if (eventProc != 8 && ((eventProc >> 1) & 1) == 0) {
+            if (reg.getExpectedDuration() == -1) {
+                return true;
+            }
+            else if (slaCalc.getActualStart() != null) {
+                if ((reg.getExpectedDuration() + jobEventLatency) < (System.currentTimeMillis() - slaCalc
+                        .getActualStart().getTime())) {
+                    return true;
+                }
+            }
+        }
+        if (eventProc < 4) {
+            if (reg.getExpectedEnd().getTime() + jobEventLatency < System.currentTimeMillis()) {
+                return true;
+            }
+        }
+        return false;
     }
 
     @SuppressWarnings("rawtypes")
-    private void updateDBSlaConfig(SLACalcStatus slaCalc, List<UpdateEntry> updateList)
-            throws JPAExecutorException {
+    private void updateDBSlaConfig(SLACalcStatus slaCalc, List<UpdateEntry> updateList) throws JPAExecutorException {
         updateList.add(new UpdateEntry<SLARegQuery>(SLARegQuery.UPDATE_SLA_CONFIG, slaCalc.getSLARegistrationBean()));
         slaCalc.setLastModifiedTime(new Date());
-        updateList.add(new UpdateEntry<SLASummaryQuery>(SLASummaryQuery.UPDATE_SLA_SUMMARY_LAST_MODIFIED_TIME, new SLASummaryBean(slaCalc)));
+        updateList.add(new UpdateEntry<SLASummaryQuery>(SLASummaryQuery.UPDATE_SLA_SUMMARY_LAST_MODIFIED_TIME,
+                new SLASummaryBean(slaCalc)));
     }
 
     @SuppressWarnings("rawtypes")
@@ -641,7 +330,6 @@ public class SLACalculatorMemory implements SLACalculator {
         BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(null, updateList, null);
     }
 
-
     /**
      * Periodically run by the SLAService worker threads to update SLA status by
      * iterating through all the jobs in the map
@@ -770,410 +458,62 @@ public class SLACalculatorMemory implements SLACalculator {
     @Override
     public boolean addJobStatus(String jobId, String jobStatus, JobEvent.EventStatus jobEventStatus, Date startTime,
             Date endTime) throws JPAExecutorException, ServiceException {
+        LOG.debug(
+                "Received addJobStatus request for job  [{0}] jobStatus = [{1}], jobEventStatus = [{2}], startTime = [{3}], "
+                        + "endTime = [{4}] ", jobId, jobStatus, jobEventStatus, startTime, endTime);
         SLACalcStatus slaCalc = slaMap.get(jobId);
-        SLASummaryBean slaInfo = null;
-        boolean hasSla = false;
-        if (slaCalc == null) {
-            if (historySet.contains(jobId)) {
-                slaInfo = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId);
-                if (slaInfo == null) {
-                    throw new JPAExecutorException(ErrorCode.E0604, jobId);
-                }
-                slaInfo.setJobStatus(jobStatus);
-                slaInfo.setActualStart(startTime);
-                slaInfo.setActualEnd(endTime);
-                if (endTime != null) {
-                    slaInfo.setActualDuration(endTime.getTime() - startTime.getTime());
-                }
-                slaInfo.setEventProcessed(8);
-                historySet.remove(jobId);
-                slaInfo.setLastModifiedTime(new Date());
-                SLASummaryQueryExecutor.getInstance().executeUpdate(
-                        SLASummaryQuery.UPDATE_SLA_SUMMARY_FOR_STATUS_ACTUAL_TIMES, slaInfo);
-                hasSla = true;
-            }
-            else if (Services.get().get(JobsConcurrencyService.class).isHighlyAvailableMode()) {
-                // jobid might not exist in slaMap in HA Setting
-                SLARegistrationBean slaRegBean = SLARegistrationQueryExecutor.getInstance().get(
-                        SLARegQuery.GET_SLA_REG_ALL, jobId);
-                if (slaRegBean != null) { // filter out jobs picked by SLA job event listener
-                                          // but not actually configured for SLA
-                    SLASummaryBean slaSummaryBean = SLASummaryQueryExecutor.getInstance().get(
-                            SLASummaryQuery.GET_SLA_SUMMARY, jobId);
-                    if (slaSummaryBean.getEventProcessed() < 7) {
-                        slaCalc = new SLACalcStatus(slaSummaryBean, slaRegBean);
-                        slaMap.put(jobId, slaCalc);
-                    }
-                }
-            }
-        }
-        if (slaCalc != null) {
-            synchronized (slaCalc) {
-                try {
-                    // only get ZK lock when multiple servers running
-                    boolean locked = true;
-                    slaCalc.acquireLock();
-                    locked = slaCalc.isLocked();
-                    if (locked) {
-                        // get eventProcessed on DB for validation in HA
-                        SLASummaryBean summaryBean = ((SLASummaryQueryExecutor) SLASummaryQueryExecutor.getInstance()).get(
-                                SLASummaryQuery.GET_SLA_SUMMARY_EVENTPROCESSED_LAST_MODIFIED, jobId);
-                        byte eventProc = summaryBean.getEventProcessed();
-
-                        if (!slaCalc.getLastModifiedTime().equals(summaryBean.getLastModifiedTime())) {
-                            //Update last modified time.
-                            slaCalc.setLastModifiedTime(summaryBean.getLastModifiedTime());
-                            reloadExpectedTimeAndConfig(slaCalc);
-                            LOG.debug("Last modified time has changed for job " + jobId + " reloading config from DB");
-                        }
 
-                        slaCalc.setEventProcessed(eventProc);
-                        slaCalc.setJobStatus(jobStatus);
-                        switch (jobEventStatus) {
-                            case STARTED:
-                                slaInfo = processJobStartSLA(slaCalc, startTime);
-                                break;
-                            case SUCCESS:
-                                slaInfo = processJobEndSuccessSLA(slaCalc, startTime, endTime);
-                                break;
-                            case FAILURE:
-                                slaInfo = processJobEndFailureSLA(slaCalc, startTime, endTime);
-                                break;
-                            default:
-                                LOG.debug("Unknown Job Status for SLA purpose[{0}]", jobEventStatus);
-                                slaInfo = getSLASummaryBean(slaCalc);
-                        }
-                        if (slaCalc.getEventProcessed() == 7) {
-                            slaInfo.setEventProcessed(8);
-                            slaMap.remove(jobId);
-                        }
-                        slaInfo.setLastModifiedTime(new Date());
-                        SLASummaryQueryExecutor.getInstance().executeUpdate(
-                                SLASummaryQuery.UPDATE_SLA_SUMMARY_FOR_STATUS_ACTUAL_TIMES, slaInfo);
-                        hasSla = true;
-                    }
-                }
-                catch (InterruptedException e) {
-                    throw new ServiceException(ErrorCode.E0606, slaCalc.getEntityKey(), slaCalc.getLockTimeOut());
-                }
-                finally {
-                    slaCalc.releaseLock();
-                }
+        if (slaCalc == null) { // job is not in the in-memory map: try to rebuild its status from the DB
+            SLARegistrationBean slaRegBean = SLARegistrationQueryExecutor.getInstance().get(
+                    SLARegQuery.GET_SLA_REG_ALL, jobId);
+            if (slaRegBean != null) { // filter out jobs picked by SLA job event listener
+                                      // but not actually configured for SLA
+                SLASummaryBean slaSummaryBean = SLASummaryQueryExecutor.getInstance().get(
+                        SLASummaryQuery.GET_SLA_SUMMARY, jobId);
+                slaCalc = new SLACalcStatus(slaSummaryBean, slaRegBean);
+                slaMap.put(jobId, slaCalc); // NOTE(review): slaSummaryBean is never null-checked above - confirm
             }
-            LOG.trace("SLA Status Event - Job:" + jobId + " Status:" + slaCalc.getSLAStatus());
         }
+        else { // already in the map: sync eventProcessed (and config, if modified) from the DB summary row
+            SLASummaryBean summaryBean = ((SLASummaryQueryExecutor) SLASummaryQueryExecutor.getInstance()).get(
+                    SLASummaryQuery.GET_SLA_SUMMARY_EVENTPROCESSED_LAST_MODIFIED, jobId);
+            byte eventProc = summaryBean.getEventProcessed();
+            if (!slaCalc.getLastModifiedTime().equals(summaryBean.getLastModifiedTime())) {
+                // The DB row was modified after the cached copy: adopt its timestamp and reload the config.
+                slaCalc.setLastModifiedTime(summaryBean.getLastModifiedTime());
+                reloadExpectedTimeAndConfig(slaCalc);
+                LOG.debug("Last modified time has changed for job " + jobId + " reloading config from DB");
 
-        return hasSla;
-    }
-
-    /**
-     * Process SLA for jobs that started running. Also update actual-start time
-     *
-     * @param slaCalc
-     * @param actualStart
-     * @return SLASummaryBean
-     */
-    private SLASummaryBean processJobStartSLA(SLACalcStatus slaCalc, Date actualStart) {
-        slaCalc.setActualStart(actualStart);
-        if (slaCalc.getSLAStatus().equals(SLAStatus.NOT_STARTED)) {
-            slaCalc.setSLAStatus(SLAStatus.IN_PROCESS);
-        }
-        SLARegistrationBean reg = slaCalc.getSLARegistrationBean();
-        Date expecStart = reg.getExpectedStart();
-        byte eventProc = slaCalc.getEventProcessed();
-        // set event proc here
-        if (((eventProc & 1) == 0)) {
-            if (expecStart != null) {
-                if (actualStart.getTime() > expecStart.getTime()) {
-                    slaCalc.setEventStatus(EventStatus.START_MISS);
-                }
-                else {
-                    slaCalc.setEventStatus(EventStatus.START_MET);
-                }
-                if (shouldAlert(slaCalc)) {
-                    eventHandler.queueEvent(new SLACalcStatus(slaCalc));
-                }
             }
-            eventProc += 1;
-            slaCalc.setEventProcessed(eventProc);
-        }
-        return getSLASummaryBean(slaCalc);
-    }
-
-    /**
-     * Process SLA for jobs that ended successfully. Also update actual-start
-     * and end time
-     *
-     * @param slaCalc
-     * @param actualStart
-     * @param actualEnd
-     * @return SLASummaryBean
-     * @throws JPAExecutorException
-     */
-    private SLASummaryBean processJobEndSuccessSLA(SLACalcStatus slaCalc, Date actualStart, Date actualEnd) throws JPAExecutorException {
-        SLARegistrationBean reg = slaCalc.getSLARegistrationBean();
-        slaCalc.setActualStart(actualStart);
-        slaCalc.setActualEnd(actualEnd);
-        long expectedDuration = reg.getExpectedDuration();
-        long actualDuration = actualEnd.getTime() - actualStart.getTime();
-        slaCalc.setActualDuration(actualDuration);
-        //check event proc
-        byte eventProc = slaCalc.getEventProcessed();
-        if (((eventProc >> 1) & 1) == 0) {
-            processDurationSLA(expectedDuration, actualDuration, slaCalc);
-            eventProc += 2;
             slaCalc.setEventProcessed(eventProc);
         }
-
-        if (eventProc < 4) {
-            Date expectedEnd = reg.getExpectedEnd();
-            if (actualEnd.getTime() > expectedEnd.getTime()) {
-                slaCalc.setEventStatus(EventStatus.END_MISS);
-                slaCalc.setSLAStatus(SLAStatus.MISS);
-            }
-            else {
-                slaCalc.setEventStatus(EventStatus.END_MET);
-                slaCalc.setSLAStatus(SLAStatus.MET);
-            }
-            eventProc += 4;
-            slaCalc.setEventProcessed(eventProc);
-            if (shouldAlert(slaCalc)) {
-                eventHandler.queueEvent(new SLACalcStatus(slaCalc));
+        if (slaCalc != null) {
+            try { // NOTE(review): the factory returns null for unrecognized id formats; call() would then NPE
+                SLAXCommandFactory.getSLAEventXCommand(slaCalc,
+                        ConfigurationService.getLong(SLAService.CONF_SLA_CALC_LOCK_TIMEOUT, 20 * 1000)).call();
+                checkEventProc(slaCalc); // removes the job from slaMap once eventProcessed is 7 or >= 8
             }
-        }
-        return getSLASummaryBean(slaCalc);
-    }
-
-    /**
-     * Process SLA for jobs that ended in failure. Also update actual-start and
-     * end time
-     *
-     * @param slaCalc
-     * @param actualStart
-     * @param actualEnd
-     * @return SLASummaryBean
-     * @throws JPAExecutorException
-     */
-    private SLASummaryBean processJobEndFailureSLA(SLACalcStatus slaCalc, Date actualStart, Date actualEnd) throws JPAExecutorException {
-        slaCalc.setActualStart(actualStart);
-        slaCalc.setActualEnd(actualEnd);
-        if (actualStart == null) { // job failed before starting
-            if (slaCalc.getEventProcessed() < 4) {
-                slaCalc.setEventStatus(EventStatus.END_MISS);
-                slaCalc.setSLAStatus(SLAStatus.MISS);
-                if (shouldAlert(slaCalc)) {
-                    eventHandler.queueEvent(new SLACalcStatus(slaCalc));
-                }
-                slaCalc.setEventProcessed(7);
-                return getSLASummaryBean(slaCalc);
+            catch (XException e) {
+                LOG.error(e);
+                throw new ServiceException(e);
             }
+            return true; // an SLA status existed (or was loaded) and the event command ran
         }
-        SLARegistrationBean reg = slaCalc.getSLARegistrationBean();
-        long expectedDuration = reg.getExpectedDuration();
-        long actualDuration = actualEnd.getTime() - actualStart.getTime();
-        slaCalc.setActualDuration(actualDuration);
-
-        byte eventProc = slaCalc.getEventProcessed();
-        if (((eventProc >> 1) & 1) == 0) {
-            if (expectedDuration != -1) {
-                slaCalc.setEventStatus(EventStatus.DURATION_MISS);
-                if (shouldAlert(slaCalc)) {
-                    eventHandler.queueEvent(new SLACalcStatus(slaCalc));
-                }
-            }
-            eventProc += 2;
-            slaCalc.setEventProcessed(eventProc);
-        }
-        if (eventProc < 4) {
-            slaCalc.setEventStatus(EventStatus.END_MISS);
-            slaCalc.setSLAStatus(SLAStatus.MISS);
-            eventProc += 4;
-            slaCalc.setEventProcessed(eventProc);
-            if (shouldAlert(slaCalc)) {
-                eventHandler.queueEvent(new SLACalcStatus(slaCalc));
-            }
-        }
-        return getSLASummaryBean(slaCalc);
-    }
-
-    private SLASummaryBean getSLASummaryBean (SLACalcStatus slaCalc) {
-        SLASummaryBean slaSummaryBean = new SLASummaryBean();
-        slaSummaryBean.setActualStart(slaCalc.getActualStart());
-        slaSummaryBean.setActualEnd(slaCalc.getActualEnd());
-        slaSummaryBean.setActualDuration(slaCalc.getActualDuration());
-        slaSummaryBean.setSLAStatus(slaCalc.getSLAStatus());
-        slaSummaryBean.setEventStatus(slaCalc.getEventStatus());
-        slaSummaryBean.setEventProcessed(slaCalc.getEventProcessed());
-        slaSummaryBean.setId(slaCalc.getId());
-        slaSummaryBean.setJobStatus(slaCalc.getJobStatus());
-        return slaSummaryBean;
-    }
-
-    private void processDurationSLA(long expected, long actual, SLACalcStatus slaCalc) {
-        if (expected != -1) {
-            if (actual > expected) {
-                slaCalc.setEventStatus(EventStatus.DURATION_MISS);
-            }
-            else if (actual <= expected) {
-                slaCalc.setEventStatus(EventStatus.DURATION_MET);
-            }
-            if (shouldAlert(slaCalc)) {
-                eventHandler.queueEvent(new SLACalcStatus(slaCalc));
-            }
+        else {
+            return false; // no SLA registration was found in the DB for this job id
         }
     }
 
-    /*
-     * Confirm alerts against source of truth - DB. Also required in case of High Availability
-     */
-    private void confirmWithDB(SLACalcStatus slaCalc) {
-        boolean ended = false, isEndMiss = false;
-        try {
-            switch (slaCalc.getAppType()) {
-                case WORKFLOW_JOB:
-                    WorkflowJobBean wf = jpaService.execute(new WorkflowJobGetForSLAJPAExecutor(slaCalc.getId()));
-                    if (wf.getEndTime() != null) {
-                        ended = true;
-                        if (wf.getStatus() == WorkflowJob.Status.KILLED || wf.getStatus() == WorkflowJob.Status.FAILED
-                                || wf.getEndTime().getTime() > slaCalc.getExpectedEnd().getTime()) {
-                            isEndMiss = true;
-                        }
-                    }
-                    slaCalc.setActualStart(wf.getStartTime());
-                    slaCalc.setActualEnd(wf.getEndTime());
-                    slaCalc.setJobStatus(wf.getStatusStr());
-                    break;
-                case WORKFLOW_ACTION:
-                    WorkflowActionBean wa = jpaService.execute(new WorkflowActionGetForSLAJPAExecutor(slaCalc.getId()));
-                    if (wa.getEndTime() != null) {
-                        ended = true;
-                        if (wa.isTerminalWithFailure()
-                                || wa.getEndTime().getTime() > slaCalc.getExpectedEnd().getTime()) {
-                            isEndMiss = true;
-                        }
-                    }
-                    slaCalc.setActualStart(wa.getStartTime());
-                    slaCalc.setActualEnd(wa.getEndTime());
-                    slaCalc.setJobStatus(wa.getStatusStr());
-                    break;
-                case COORDINATOR_ACTION:
-                    CoordinatorActionBean ca = jpaService.execute(new CoordActionGetForSLAJPAExecutor(slaCalc.getId()));
-                    if (ca.isTerminalWithFailure()) {
-                        isEndMiss = ended = true;
-                        slaCalc.setActualEnd(ca.getLastModifiedTime());
-                    }
-                    if (ca.getExternalId() != null) {
-                        wf = jpaService.execute(new WorkflowJobGetForSLAJPAExecutor(ca.getExternalId()));
-                        if (wf.getEndTime() != null) {
-                            ended = true;
-                            if (wf.getEndTime().getTime() > slaCalc.getExpectedEnd().getTime()) {
-                                isEndMiss = true;
-                            }
-                        }
-                        slaCalc.setActualEnd(wf.getEndTime());
-                        slaCalc.setActualStart(wf.getStartTime());
-                    }
-                    slaCalc.setJobStatus(ca.getStatusStr());
-                    break;
-                default:
-                    LOG.debug("Unsupported App-type for SLA - " + slaCalc.getAppType());
-            }
-
-            byte eventProc = slaCalc.getEventProcessed();
-            if (ended) {
-                if (isEndMiss) {
-                    slaCalc.setSLAStatus(SLAStatus.MISS);
-                }
-                else {
-                    slaCalc.setSLAStatus(SLAStatus.MET);
-                }
-                if (slaCalc.getActualStart() != null) {
-                    if ((eventProc & 1) == 0) {
-                        if (slaCalc.getExpectedStart().getTime() < slaCalc.getActualStart().getTime()) {
-                            slaCalc.setEventStatus(EventStatus.START_MISS);
-                        }
-                        else {
-                            slaCalc.setEventStatus(EventStatus.START_MET);
-                        }
-                        if (shouldAlert(slaCalc)) {
-                            eventHandler.queueEvent(new SLACalcStatus(slaCalc));
-                        }
-                    }
-                    slaCalc.setActualDuration(slaCalc.getActualEnd().getTime() - slaCalc.getActualStart().getTime());
-                    if (((eventProc >> 1) & 1) == 0) {
-                        processDurationSLA(slaCalc.getExpectedDuration(), slaCalc.getActualDuration(), slaCalc);
-                    }
-                }
-                if (eventProc < 4) {
-                    if (isEndMiss) {
-                        slaCalc.setEventStatus(EventStatus.END_MISS);
-                    }
-                    else {
-                        slaCalc.setEventStatus(EventStatus.END_MET);
-                    }
-                    if (shouldAlert(slaCalc)) {
-                        eventHandler.queueEvent(new SLACalcStatus(slaCalc));
-                    }
-                }
-                slaCalc.setEventProcessed(8);
-            }
-            else {
-                if (slaCalc.getActualStart() != null) {
-                    slaCalc.setSLAStatus(SLAStatus.IN_PROCESS);
-                }
-                if ((eventProc & 1) == 0) {
-                    if (slaCalc.getActualStart() != null) {
-                        if (slaCalc.getExpectedStart().getTime() < slaCalc.getActualStart().getTime()) {
-                            slaCalc.setEventStatus(EventStatus.START_MISS);
-                        }
-                        else {
-                            slaCalc.setEventStatus(EventStatus.START_MET);
-                        }
-                        if (shouldAlert(slaCalc)) {
-                            eventHandler.queueEvent(new SLACalcStatus(slaCalc));
-                        }
-                        eventProc++;
-                    }
-                    else if (slaCalc.getExpectedStart().getTime() < System.currentTimeMillis()) {
-                        slaCalc.setEventStatus(EventStatus.START_MISS);
-                        if (shouldAlert(slaCalc)) {
-                            eventHandler.queueEvent(new SLACalcStatus(slaCalc));
-                        }
-                        eventProc++;
-                    }
-                }
-                if (((eventProc >> 1) & 1) == 0 && slaCalc.getActualStart() != null
-                        && slaCalc.getExpectedDuration() != -1) {
-                    if (System.currentTimeMillis() - slaCalc.getActualStart().getTime() > slaCalc.getExpectedDuration()) {
-                        slaCalc.setEventStatus(EventStatus.DURATION_MISS);
-                        if (shouldAlert(slaCalc)) {
-                            eventHandler.queueEvent(new SLACalcStatus(slaCalc));
-                        }
-                        eventProc += 2;
-                    }
-                }
-                if (eventProc < 4 && slaCalc.getExpectedEnd().getTime() < System.currentTimeMillis()) {
-                    slaCalc.setEventStatus(EventStatus.END_MISS);
-                    slaCalc.setSLAStatus(SLAStatus.MISS);
-                    if (shouldAlert(slaCalc)) {
-                        eventHandler.queueEvent(new SLACalcStatus(slaCalc));
-                    }
-                    eventProc += 4;
-                }
-                slaCalc.setEventProcessed(eventProc);
-            }
+    private void checkEventProc(SLACalcStatus slaCalc) { // drops jobs from slaMap once eventProcessed is 7 or >= 8
+        byte eventProc = slaCalc.getEventProcessed(); // read once so both checks below see the same value
+        if (eventProc >= 8) {
+            slaMap.remove(slaCalc.getId());
+            LOG.debug("Removed Job [{0}] from map after Event-processed=8", slaCalc.getId());
         }
-        catch (Exception e) {
-            LOG.warn("Error while confirming SLA against DB for jobid= " + slaCalc.getId() + ". Exception is "
-                    + e.getClass().getName() + ": " + e.getMessage());
-            if (slaCalc.getEventProcessed() < 4 && slaCalc.getExpectedEnd().getTime() < System.currentTimeMillis()) {
-                slaCalc.setEventStatus(EventStatus.END_MISS);
-                slaCalc.setSLAStatus(SLAStatus.MISS);
-                if (shouldAlert(slaCalc)) {
-                    eventHandler.queueEvent(new SLACalcStatus(slaCalc));
-                }
-                slaCalc.setEventProcessed(slaCalc.getEventProcessed() + 4);
-            }
+        if (eventProc == 7) { // 7 = bits 0-2 all set: remember the id in historySet before evicting it
+            historySet.add(slaCalc.getId());
+            slaMap.remove(slaCalc.getId());
+            LOG.debug("Removed Job [{0}] from map after Event-processed=7", slaCalc.getId());
         }
     }
 
@@ -1235,21 +575,21 @@ public class SLACalculatorMemory implements SLACalculator {
         return enableAlert(getSLAJobsforParents(parentJobIds));
     }
 
-
     @Override
     public boolean disableAlert(List<String> jobIds) throws JPAExecutorException, ServiceException {
         boolean isJobFound = false;
         @SuppressWarnings("rawtypes")
         List<UpdateEntry> updateList = new ArrayList<BatchQueryExecutor.UpdateEntry>();
 
-            for (String jobId : jobIds) {
-                SLACalcStatus slaCalc = getSLACalcStatus(jobId);
-                if (slaCalc != null) {
-                    slaCalc.getSLARegistrationBean().addToSLAConfigMap(OozieClient.SLA_DISABLE_ALERT, Boolean.toString(true));
-                    updateDBSlaConfig(slaCalc, updateList);
-                    isJobFound = true;
-                }
+        for (String jobId : jobIds) {
+            SLACalcStatus slaCalc = getSLACalcStatus(jobId);
+            if (slaCalc != null) { // ids for which getSLACalcStatus returns null are skipped
+                slaCalc.getSLARegistrationBean().addToSLAConfigMap(OozieClient.SLA_DISABLE_ALERT,
+                        Boolean.toString(true)); // stores "true" under the disable-alert key
+                updateDBSlaConfig(slaCalc, updateList); // only queues the updates; they run after the loop
+                isJobFound = true; // result is true as soon as one id resolves
             }
+        }
         executeBatchQuery(updateList);
         return isJobFound;
     }
@@ -1260,19 +600,19 @@ public class SLACalculatorMemory implements SLACalculator {
     }
 
     @Override
-    public boolean changeDefinition(List<Pair<String, Map<String,String>>> jobIdsSLAPair ) throws JPAExecutorException,
-            ServiceException{
+    public boolean changeDefinition(List<Pair<String, Map<String, String>>> jobIdsSLAPair) throws JPAExecutorException,
+            ServiceException {
         boolean isJobFound = false;
         @SuppressWarnings("rawtypes")
         List<UpdateEntry> updateList = new ArrayList<BatchQueryExecutor.UpdateEntry>();
-            for (Pair<String, Map<String,String>> jobIdSLAPair : jobIdsSLAPair) {
-                SLACalcStatus slaCalc = getSLACalcStatus(jobIdSLAPair.getFist());
-                if (slaCalc != null) {
-                    updateParams(slaCalc, jobIdSLAPair.getSecond());
-                    updateDBSlaExpectedValues(slaCalc, updateList);
-                    isJobFound = true;
-                }
+        for (Pair<String, Map<String, String>> jobIdSLAPair : jobIdsSLAPair) {
+            SLACalcStatus slaCalc = getSLACalcStatus(jobIdSLAPair.getFist()); // first element is used as the job id
+            if (slaCalc != null) { // ids for which getSLACalcStatus returns null are skipped
+                updateParams(slaCalc, jobIdSLAPair.getSecond()); // second element is the map handed to updateParams
+                updateDBSlaExpectedValues(slaCalc, updateList); // receives updateList, which is executed after the loop
+                isJobFound = true; // result is true as soon as one id resolves
             }
+        }
         executeBatchQuery(updateList);
         return isJobFound;
     }
@@ -1292,11 +632,7 @@ public class SLACalculatorMemory implements SLACalculator {
         }
     }
 
-    private boolean shouldAlert(SLACalcStatus slaObj) {
-        return !slaObj.getSLAConfigMap().containsKey(OozieClient.SLA_DISABLE_ALERT);
-    }
-
-    private List<String> getSLAJobsforParents(List<String> parentJobIds) throws JPAExecutorException{
+    private List<String> getSLAJobsforParents(List<String> parentJobIds) throws JPAExecutorException {
         List<String> childJobIds = new ArrayList<String>();
         for (String jobId : parentJobIds) {
             List<SLARegistrationBean> registrationBeanList = SLARegistrationQueryExecutor.getInstance().getList(
@@ -1307,4 +643,5 @@ public class SLACalculatorMemory implements SLACalculator {
         }
         return childJobIds;
     }
+
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/main/java/org/apache/oozie/sla/SLASummaryBean.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/sla/SLASummaryBean.java b/core/src/main/java/org/apache/oozie/sla/SLASummaryBean.java
index ef1ea98..3b2cebd 100644
--- a/core/src/main/java/org/apache/oozie/sla/SLASummaryBean.java
+++ b/core/src/main/java/org/apache/oozie/sla/SLASummaryBean.java
@@ -52,8 +52,6 @@ import org.json.simple.JSONObject;
 
  @NamedQuery(name = "UPDATE_SLA_SUMMARY_FOR_STATUS_ACTUAL_TIMES", query = "update SLASummaryBean w set w.slaStatus = :slaStatus, w.eventStatus = :eventStatus, w.eventProcessed = :eventProcessed, w.jobStatus = :jobStatus, w.lastModifiedTS = :lastModifiedTS, w.actualStartTS = :actualStartTS, w.actualEndTS = :actualEndTS, w.actualDuration = :actualDuration where w.jobId = :jobId"),
 
- @NamedQuery(name = "UPDATE_SLA_SUMMARY_FOR_ACTUAL_TIMES", query = "update SLASummaryBean w set w.eventProcessed = :eventProcessed, w.actualStartTS = :actualStartTS, w.actualEndTS = :actualEndTS, w.actualEndTS = :actualEndTS, w.actualDuration = :actualDuration, w.lastModifiedTS = :lastModifiedTS where w.jobId = :jobId"),
-
  @NamedQuery(name = "UPDATE_SLA_SUMMARY_FOR_EXPECTED_TIMES", query = "update SLASummaryBean w set w.nominalTimeTS = :nominalTime, w.expectedStartTS = :expectedStartTime, w.expectedEndTS = :expectedEndTime, w.expectedDuration = :expectedDuration , w.lastModifiedTS = :lastModTime where w.jobId = :jobId"),
 
  @NamedQuery(name = "UPDATE_SLA_SUMMARY_EVENTPROCESSED", query = "update SLASummaryBean w set w.eventProcessed = :eventProcessed where w.jobId = :jobId"),

http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/main/java/org/apache/oozie/sla/SLAXCommandFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/sla/SLAXCommandFactory.java b/core/src/main/java/org/apache/oozie/sla/SLAXCommandFactory.java
new file mode 100644
index 0000000..e6298f5
--- /dev/null
+++ b/core/src/main/java/org/apache/oozie/sla/SLAXCommandFactory.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.sla;
+
+import org.apache.oozie.command.sla.SLACoordActionJobEventXCommand;
+import org.apache.oozie.command.sla.SLACoordActionJobHistoryXCommand;
+import org.apache.oozie.command.sla.SLAJobEventXCommand;
+import org.apache.oozie.command.sla.SLAJobHistoryXCommand;
+import org.apache.oozie.command.sla.SLAWorkflowActionJobEventXCommand;
+import org.apache.oozie.command.sla.SLAWorkflowActionJobHistoryXCommand;
+import org.apache.oozie.command.sla.SLAWorkflowJobEventXCommand;
+import org.apache.oozie.command.sla.SLAWorkflowJobHistoryXCommand;
+
+/**
+ * A factory for creating SLACommand objects.
+ */
+public class SLAXCommandFactory {
+
+    /**
+     * Picks the SLA job history command that corresponds to the format of the given job id.
+     *
+     * @param jobId the job id
+     * @return {@link SLAWorkflowJobHistoryXCommand} for ids ending in "-W",
+     *         {@link SLAWorkflowActionJobHistoryXCommand} for ids containing "-W@",
+     *         {@link SLACoordActionJobHistoryXCommand} for ids containing "-C@", otherwise null
+     */
+    public static SLAJobHistoryXCommand getSLAJobHistoryXCommand(String jobId) {
+        if (jobId.endsWith("-W")) {
+            return new SLAWorkflowJobHistoryXCommand(jobId);
+        }
+        if (jobId.contains("-W@")) {
+            return new SLAWorkflowActionJobHistoryXCommand(jobId);
+        }
+        if (jobId.contains("-C@")) {
+            return new SLACoordActionJobHistoryXCommand(jobId);
+        }
+        // Unrecognized id format: the caller receives null.
+        return null;
+    }
+
+    /**
+     * Picks the SLA event command for the given SLA calc status, passing 0 as the lock timeout.
+     *
+     * @param slaCalc the sla calc
+     * @return the SLA event x command, or null when the id format is not recognized
+     */
+    public static SLAJobEventXCommand getSLAEventXCommand(SLACalcStatus slaCalc) {
+        return getSLAEventXCommand(slaCalc, 0);
+    }
+
+    /**
+     * Picks the SLA event command that corresponds to the format of the id carried by the given
+     * SLA calc status.
+     *
+     * @param slaCalc the sla calc
+     * @param lockTimeOut the lock time out
+     * @return {@link SLAWorkflowJobEventXCommand} for ids ending in "-W",
+     *         {@link SLAWorkflowActionJobEventXCommand} for ids containing "-W@",
+     *         {@link SLACoordActionJobEventXCommand} for ids containing "-C@", otherwise null
+     */
+    public static SLAJobEventXCommand getSLAEventXCommand(SLACalcStatus slaCalc, long lockTimeOut) {
+        String id = slaCalc.getId();
+        if (id.endsWith("-W")) {
+            return new SLAWorkflowJobEventXCommand(slaCalc, lockTimeOut);
+        }
+        if (id.contains("-W@")) {
+            return new SLAWorkflowActionJobEventXCommand(slaCalc, lockTimeOut);
+        }
+        if (id.contains("-C@")) {
+            return new SLACoordActionJobEventXCommand(slaCalc, lockTimeOut);
+        }
+        // Unrecognized id format: the caller receives null.
+        return null;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionsKillXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionsKillXCommand.java b/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionsKillXCommand.java
index 52560e6..e239084 100644
--- a/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionsKillXCommand.java
+++ b/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionsKillXCommand.java
@@ -19,6 +19,7 @@
 package org.apache.oozie.command.coord;
 
 import java.util.Date;
+
 import org.apache.oozie.CoordinatorActionBean;
 import org.apache.oozie.CoordinatorJobBean;
 import org.apache.oozie.WorkflowJobBean;
@@ -28,9 +29,10 @@ import org.apache.oozie.client.WorkflowJob;
 import org.apache.oozie.executor.jpa.CoordActionGetForCheckJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
 import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
+import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
-import org.apache.oozie.executor.jpa.WorkflowJobGetForSLAJPAExecutor;
 import org.apache.oozie.executor.jpa.WorkflowJobInsertJPAExecutor;
+import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
 import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.LiteWorkflowStoreService;
 import org.apache.oozie.service.Services;
@@ -84,7 +86,9 @@ public class TestCoordActionsKillXCommand extends XDataTestCase {
         assertEquals(CoordinatorAction.Status.KILLED, action.getStatus());
 
         sleep(100);
-        WorkflowJobBean wf = jpaService.execute(new WorkflowJobGetForSLAJPAExecutor(ids[3]));
+        WorkflowJobBean wf = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW_FOR_SLA,
+                ids[3]);
+
         assertEquals(WorkflowJob.Status.KILLED, wf.getStatus());
 
         CoordinatorJobBean job = jpaService.execute(new CoordJobGetJPAExecutor(ids[0]));
@@ -118,7 +122,8 @@ public class TestCoordActionsKillXCommand extends XDataTestCase {
         assertEquals(CoordinatorAction.Status.KILLED, action.getStatus());
 
         sleep(100);
-        WorkflowJobBean wf = jpaService.execute(new WorkflowJobGetForSLAJPAExecutor(ids[3]));
+        WorkflowJobBean wf = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW_FOR_SLA,
+                ids[3]);
         assertEquals(WorkflowJob.Status.KILLED, wf.getStatus());
 
         CoordinatorJobBean job = jpaService.execute(new CoordJobGetJPAExecutor(ids[0]));

http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/test/java/org/apache/oozie/executor/jpa/TestSLASummaryQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/executor/jpa/TestSLASummaryQueryExecutor.java b/core/src/test/java/org/apache/oozie/executor/jpa/TestSLASummaryQueryExecutor.java
index 5914b3b..1daecdc 100644
--- a/core/src/test/java/org/apache/oozie/executor/jpa/TestSLASummaryQueryExecutor.java
+++ b/core/src/test/java/org/apache/oozie/executor/jpa/TestSLASummaryQueryExecutor.java
@@ -131,7 +131,7 @@ public class TestSLASummaryQueryExecutor extends XDataTestCase {
         bean.setActualDuration(endTime.getTime() - startTime.getTime());
         bean.setLastModifiedTime(new Date());
         bean.setEventProcessed(8);
-        SLASummaryQueryExecutor.getInstance().executeUpdate(SLASummaryQuery.UPDATE_SLA_SUMMARY_FOR_ACTUAL_TIMES, bean);
+        SLASummaryQueryExecutor.getInstance().executeUpdate(SLASummaryQuery.UPDATE_SLA_SUMMARY_FOR_STATUS_ACTUAL_TIMES, bean);
         retBean = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, bean.getId());
         assertEquals(bean.getActualStartTimestamp(), retBean.getActualStartTimestamp());
         assertEquals(bean.getActualEndTimestamp(), retBean.getActualEndTimestamp());

http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/test/java/org/apache/oozie/service/TestHASLAService.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/service/TestHASLAService.java b/core/src/test/java/org/apache/oozie/service/TestHASLAService.java
index 795db37..3af263e 100644
--- a/core/src/test/java/org/apache/oozie/service/TestHASLAService.java
+++ b/core/src/test/java/org/apache/oozie/service/TestHASLAService.java
@@ -101,17 +101,18 @@ public class TestHASLAService extends ZKXTestCase {
 
             // Case 1 workflow job submitted to dummy server,
             // but before start running, the dummy server is down
-            createWorkflow("job-1");
-            SLARegistrationBean sla1 = TestSLAService._createSLARegistration("job-1", AppType.WORKFLOW_JOB);
+            WorkflowJobBean wfJob1 = createWorkflow("job-1-W");
+            SLARegistrationBean sla1 = TestSLAService._createSLARegistration("job-1-W", AppType.WORKFLOW_JOB);
             sla1.setExpectedStart(new Date(System.currentTimeMillis() - 2 * 3600 * 1000)); // 2 hr before
             sla1.setExpectedEnd(new Date(System.currentTimeMillis() - 1 * 3600 * 1000)); // 1 hr before
             sla1.setExpectedDuration(10 * 60 * 1000); // 10 mins
             dummyCalc.addRegistration(sla1.getId(), sla1);
+            dummyCalc.updateAllSlaStatus();
 
             // Case 2. workflow job submitted to dummy server, start running,
             // then the dummy server is down
-            createWorkflow("job-2");
-            SLARegistrationBean sla2 = TestSLAService._createSLARegistration("job-2", AppType.WORKFLOW_JOB);
+            WorkflowJobBean wfJob2 = createWorkflow("job-2-W");
+            SLARegistrationBean sla2 = TestSLAService._createSLARegistration("job-2-W", AppType.WORKFLOW_JOB);
             sla2.setExpectedStart(new Date(System.currentTimeMillis() - 2 * 3600 * 1000)); // 2hr before
             sla2.setExpectedEnd(new Date(System.currentTimeMillis() + 1 * 3600 * 1000)); // 1hr ahead
             sla2.setExpectedDuration(10 * 60 * 1000); // 10 mins
@@ -138,6 +139,12 @@ public class TestHASLAService extends ZKXTestCase {
             ehs.new EventWorker().run();
             assertTrue(output.toString().contains(sla1.getId() + " Sla START - MISS!!!"));
 
+            wfJob1.setStatus(WorkflowJob.Status.SUCCEEDED);
+            wfJob1.setEndTime(new Date());
+            wfJob1.setStartTime(new Date());
+            WorkflowJobQueryExecutor.getInstance().executeUpdate(
+                    WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MOD_START_END, wfJob1);
+
             // Job 1 succeeded on the living server --> duration met and end miss
             slaCalcMem.addJobStatus(sla1.getId(), WorkflowJob.Status.SUCCEEDED.name(), EventStatus.SUCCESS, new Date(),
                     new Date());
@@ -145,6 +152,12 @@ public class TestHASLAService extends ZKXTestCase {
             assertTrue(output.toString().contains(sla1.getId() + " Sla DURATION - MET!!!"));
             assertTrue(output.toString().contains(sla1.getId() + " Sla END - MISS!!!"));
 
+            wfJob2.setStatus(WorkflowJob.Status.SUCCEEDED);
+            wfJob2.setEndTime(new Date());
+            wfJob2.setStartTime(new Date());
+            WorkflowJobQueryExecutor.getInstance().executeUpdate(
+                    WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MOD_START_END, wfJob2);
+
             // Job 2 succeeded on the living server --> duration met and end met
             slaCalcMem.addJobStatus(sla2.getId(), WorkflowJob.Status.SUCCEEDED.name(), EventStatus.SUCCESS, new Date(),
                     new Date());
@@ -161,13 +174,14 @@ public class TestHASLAService extends ZKXTestCase {
         }
     }
 
-    public void updateCoordAction(String id, String status) throws JPAExecutorException {
+    public CoordinatorActionBean updateCoordAction(String id, String status) throws JPAExecutorException {
         CoordinatorActionBean action = new CoordinatorActionBean();
         action.setId(id);
         action.setStatusStr(status);
         action.setLastModifiedTime(new Date());
         CoordActionQueryExecutor.getInstance().executeUpdate(CoordActionQuery.UPDATE_COORD_ACTION_STATUS_PENDING_TIME,
                 action);
+        return action;
     }
 
     public void testSLAUpdateWithHA() throws Exception {
@@ -187,8 +201,8 @@ public class TestHASLAService extends ZKXTestCase {
         createDBEntry(id3, expectedStartTS, expectedEndTS1);
         createDBEntry(id4, expectedStartTS, expectedEndTS1);
         // Coord Action of jobs 5-6 already started and currently running (to test history set)
-        createDBEntryForStarted(id5, expectedStartTS, expectedEndTS2);
-        createDBEntryForStarted(id6, expectedStartTS, expectedEndTS2);
+        createDBEntryForStarted(id5, expectedStartTS, expectedEndTS2, 1);
+        createDBEntryForStarted(id6, expectedStartTS, expectedEndTS2, 1);
 
         SLAService slas = Services.get().get(SLAService.class);
         SLACalculatorMemory slaCalcMem = (SLACalculatorMemory) slas.getSLACalculator();
@@ -198,7 +212,10 @@ public class TestHASLAService extends ZKXTestCase {
         while (itr.hasNext()) {
             slaMapKeys.add(itr.next());
         }
-        assertEquals(6, slaMapKeys.size());
+        // 4 jobs: expected end is not yet reached
+        // 2 jobs have an end miss and are waiting for the job to complete
+        assertEquals(4, slaMapKeys.size());
+        assertEquals(2, slaCalcMem.getHistorySet().size());
 
         DummyZKOozie dummyOozie_1 = null;
         try {
@@ -214,8 +231,8 @@ public class TestHASLAService extends ZKXTestCase {
             while (itr.hasNext()) {
                 slaMapKeys.add(itr.next());
             }
-            assertEquals(6, slaMapKeys.size());
-
+            assertEquals(4, slaMapKeys.size());
+            assertEquals(2, dummySlaCalcMem.getHistorySet().size());
             // Coord Action 1,3 run and update status on *non-dummy* server
             updateCoordAction(id1, "RUNNING");
             slaCalcMem
@@ -314,14 +331,12 @@ public class TestHASLAService extends ZKXTestCase {
 
     public void testNoDuplicateEventsInHA() throws Exception {
         String id1 = "0000001-130521183438837-oozie-test-C@1";
-        Date expectedStartTS = new Date(System.currentTimeMillis() - 2 * 3600 * 1000); // get MISS
-        Date expectedEndTS = new Date(System.currentTimeMillis() - 1 * 3600 * 1000); // get MISS
-        createDBEntry(id1, expectedStartTS, expectedEndTS);
 
         SLAService slas = Services.get().get(SLAService.class);
         SLACalculatorMemory slaCalcMem = (SLACalculatorMemory) slas.getSLACalculator();
         slaCalcMem.init(Services.get().getConf()); // loads the job in sla map
 
+
         EventHandlerService ehs = Services.get().get(EventHandlerService.class);
         EventQueue ehs_q = ehs.getEventQueue();
 
@@ -336,20 +351,30 @@ public class TestHASLAService extends ZKXTestCase {
             dummyEhs.init(Services.get());
             EventQueue dummyEhs_q = dummyEhs.getEventQueue();
 
+            Date expectedStartTS = new Date(System.currentTimeMillis() + 2 * 3600 * 1000); // 2 hrs ahead
+            Date expectedEndTS = new Date(System.currentTimeMillis() + 1 * 3600 * 1000); // 1 hr ahead
+            SLASummaryBean sla = createDBEntryForStarted(id1, expectedStartTS, expectedEndTS, 0);
+            sla.setExpectedDuration(-1);
+            sla.setLastModifiedTime(new Date());
+            SLASummaryQueryExecutor.getInstance().executeUpdate(SLASummaryQuery.UPDATE_SLA_SUMMARY_FOR_EXPECTED_TIMES,
+                    sla);
+
             // Action started on Server 1
             updateCoordAction(id1, "RUNNING");
+
             slaCalcMem
                     .addJobStatus(id1, CoordinatorAction.Status.RUNNING.name(), EventStatus.STARTED, new Date(), null);
+            assertEquals(1, ehs_q.size());
             SLACalcStatus s1 = (SLACalcStatus) ehs_q.poll();
-            assertEquals(SLAStatus.IN_PROCESS, s1.getSLAStatus());
 
+            assertEquals(SLAStatus.IN_PROCESS, s1.getSLAStatus());
             // Action ended on Server 2
             updateCoordAction(id1, "FAILED");
             dummySlaCalcMem.addJobStatus(id1, CoordinatorAction.Status.FAILED.name(), EventStatus.FAILURE, new Date(
                     System.currentTimeMillis() - 1800 * 1000),
                     new Date());
-            dummyEhs_q.poll(); // getting rid of the duration_miss event
             SLACalcStatus s2 = (SLACalcStatus) dummyEhs_q.poll();
+
             assertEquals(SLAStatus.MISS, s2.getSLAStatus());
 
             slaCalcMem.updateAllSlaStatus();
@@ -464,7 +489,8 @@ public class TestHASLAService extends ZKXTestCase {
         BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, null, null);
     }
 
-    private void createDBEntryForStarted(String actionId, Date expectedStartTS, Date expectedEndTS) throws Exception {
+    private SLASummaryBean createDBEntryForStarted(String actionId, Date expectedStartTS, Date expectedEndTS,
+            int eventProcessed) throws Exception {
         ArrayList<JsonBean> insertList = new ArrayList<JsonBean>();
         Date modTime = new Date();
         WorkflowJobBean wf = addRecordToWfJobTable(WorkflowJob.Status.RUNNING, WorkflowInstance.Status.RUNNING,
@@ -494,7 +520,7 @@ public class TestHASLAService extends ZKXTestCase {
         sla.setAppType(AppType.COORDINATOR_ACTION);
         sla.setJobStatus("RUNNING");
         sla.setSLAStatus(SLAStatus.IN_PROCESS);
-        sla.setEventProcessed(1);
+        sla.setEventProcessed(eventProcessed);
         sla.setLastModifiedTime(modTime);
         sla.setExpectedStart(expectedStartTS);
         sla.setActualStart(expectedStartTS);
@@ -508,9 +534,10 @@ public class TestHASLAService extends ZKXTestCase {
         insertList.add(sla);
         insertList.add(reg);
         BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, null, null);
+        return sla;
     }
 
-    private void createWorkflow(String id) throws Exception {
+    private WorkflowJobBean createWorkflow(String id) throws Exception {
         List<JsonBean> insertList = new ArrayList<JsonBean>();
         WorkflowJobBean workflow = new WorkflowJobBean();
         workflow.setId(id);
@@ -519,6 +546,7 @@ public class TestHASLAService extends ZKXTestCase {
         workflow.setSlaXml("<sla></sla>");
         insertList.add(workflow);
         BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, null, null);
+        return workflow;
     }
 
     public static class DummySLAEventListener extends SLAEventListener {