You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by ry...@apache.org on 2014/06/18 09:19:58 UTC

[1/2] OOZIE-1678 HA support for SLA (ryota)

Repository: oozie
Updated Branches:
  refs/heads/master d5f1e3864 -> 4e015d45e


http://git-wip-us.apache.org/repos/asf/oozie/blob/4e015d45/core/src/test/java/org/apache/oozie/executor/jpa/TestSLASummaryQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/executor/jpa/TestSLASummaryQueryExecutor.java b/core/src/test/java/org/apache/oozie/executor/jpa/TestSLASummaryQueryExecutor.java
index 2e170a4..fe7fa97 100644
--- a/core/src/test/java/org/apache/oozie/executor/jpa/TestSLASummaryQueryExecutor.java
+++ b/core/src/test/java/org/apache/oozie/executor/jpa/TestSLASummaryQueryExecutor.java
@@ -21,7 +21,9 @@ import java.util.Date;
 
 import javax.persistence.EntityManager;
 import javax.persistence.Query;
+
 import org.apache.oozie.client.event.SLAEvent.SLAStatus;
+import org.apache.oozie.executor.jpa.SLARegistrationQueryExecutor.SLARegQuery;
 import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor.SLASummaryQuery;
 import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.Services;
@@ -49,6 +51,19 @@ public class TestSLASummaryQueryExecutor extends XDataTestCase {
     public void testGetQuery() throws Exception {
         EntityManager em = jpaService.getEntityManager();
         SLASummaryBean bean = addRecordToSLASummaryTable("test-sla-summary", SLAStatus.IN_PROCESS);
+        // GET_SLA_SUMMARY: the select query must carry the bean's id as its "id" parameter
+        Query query = SLASummaryQueryExecutor.getInstance().getSelectQuery(SLASummaryQuery.GET_SLA_SUMMARY, em,
+                bean.getId());
+        assertEquals(bean.getId(), query.getParameterValue("id"));
+        // GET_SLA_SUMMARY_EVENTPROCESSED: same "id" parameter check (expected value first, then actual)
+        query = SLASummaryQueryExecutor.getInstance().getSelectQuery(SLASummaryQuery.GET_SLA_SUMMARY_EVENTPROCESSED,
+                em, bean.getId());
+        assertEquals(bean.getId(), query.getParameterValue("id"));
+    }
+
+    public void testUpdateQuery() throws Exception {
+        EntityManager em = jpaService.getEntityManager();
+        SLASummaryBean bean = addRecordToSLASummaryTable("test-sla-summary", SLAStatus.IN_PROCESS);
 
         // UPDATE_SLA_SUMMARY_FOR_STATUS_ACTUAL_TIMES
         Query query = SLASummaryQueryExecutor.getInstance().getUpdateQuery(
@@ -107,12 +122,38 @@ public class TestSLASummaryQueryExecutor extends XDataTestCase {
         assertEquals(bean.getActualEndTimestamp(), retBean.getActualEndTimestamp());
         assertEquals(SLAStatus.MET, retBean.getSLAStatus());
         assertEquals(createdTime, retBean.getCreatedTime()); // Created time should not be updated
+
+        // test UPDATE_SLA_SUMMARY_FOR_ACTUAL_TIMES: set distinct actual start/end values, update, then re-read
+        bean = addRecordToSLASummaryTable("test-sla-summary", SLAStatus.IN_PROCESS);
+        bean.setActualStart(startTime);
+        bean.setActualEnd(endTime);
+        bean.setActualDuration(endTime.getTime() - startTime.getTime());
+        bean.setLastModifiedTime(new Date());
+        bean.setEventProcessed(8);
+        SLASummaryQueryExecutor.getInstance().executeUpdate(SLASummaryQuery.UPDATE_SLA_SUMMARY_FOR_ACTUAL_TIMES, bean);
+        retBean = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, bean.getId());
+        assertEquals(bean.getActualStartTimestamp(), retBean.getActualStartTimestamp());
+        assertEquals(bean.getActualEndTimestamp(), retBean.getActualEndTimestamp());
+        assertEquals(bean.getActualDuration(), retBean.getActualDuration());
+        assertEquals(bean.getLastModifiedTimestamp(), retBean.getLastModifiedTimestamp());
+        assertEquals(bean.getEventProcessed(), retBean.getEventProcessed());
     }
 
     public void testGet() throws Exception {
-        // TODO
+        SLASummaryBean bean = addRecordToSLASummaryTable("test-sla-summary", SLAStatus.IN_PROCESS);
+        // GET_SLA_SUMMARY_EVENTPROCESSED: the fetched bean must report the inserted bean's eventProcessed
+        SLASummaryBean sBean = SLASummaryQueryExecutor.getInstance().get(
+                SLASummaryQuery.GET_SLA_SUMMARY_EVENTPROCESSED, bean.getId());
+        assertEquals(bean.getEventProcessed(), sBean.getEventProcessed());
     }
 
+    public void testGetValue() throws Exception {
+        SLASummaryBean bean = addRecordToSLASummaryTable("test-sla-summary", SLAStatus.IN_PROCESS);
+        // GET_SLA_SUMMARY_EVENTPROCESSED via getSingleValue: the result is cast to Byte before comparing
+        Object ret  = ((SLASummaryQueryExecutor) SLASummaryQueryExecutor.getInstance()).getSingleValue(
+                SLASummaryQuery.GET_SLA_SUMMARY_EVENTPROCESSED, bean.getId());
+        assertEquals(bean.getEventProcessed(), ((Byte)ret).byteValue());
+    }
     public void testGetList() throws Exception {
         // TODO
     }

http://git-wip-us.apache.org/repos/asf/oozie/blob/4e015d45/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobQueryExecutor.java b/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobQueryExecutor.java
index 9e0f03b..7a10685 100644
--- a/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobQueryExecutor.java
+++ b/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobQueryExecutor.java
@@ -202,6 +202,15 @@ public class TestWorkflowJobQueryExecutor extends XDataTestCase {
         assertNull(retBean.getProtoActionConf());
         assertNull(retBean.getSlaXml());
         assertNull(retBean.getConf());
+        // GET_WORKFLOW_START_END_TIME
+        retBean = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW_START_END_TIME, bean.getId());
+        assertEquals(bean.getId(), retBean.getId());
+        assertEquals(bean.getStartTime().getTime(), retBean.getStartTime().getTime());
+        assertEquals(bean.getEndTime().getTime(), retBean.getEndTime().getTime());
+        assertNull(retBean.getWorkflowInstance());
+        assertNull(retBean.getProtoActionConf());
+        assertNull(retBean.getSlaXml());
+        assertNull(retBean.getConf());
         // GET_WORKFLOW_USER_GROUP
         retBean = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW_USER_GROUP, bean.getId());
         assertEquals(bean.getUser(), retBean.getUser());

http://git-wip-us.apache.org/repos/asf/oozie/blob/4e015d45/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor.java b/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor.java
index ea36720..7257433 100644
--- a/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor.java
+++ b/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor.java
@@ -60,7 +60,6 @@ public class TestWorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor ext
         days = 1;
         assertEquals(0, (long) jpaService.execute(new WorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor(days, wfJobId)));
         days = TestPurgeXCommand.getNumDaysToNotBePurged(subwfJob1.getEndTime());
-        System.out.println("Debug: days " + days);
         assertEquals(1, (long) jpaService.execute(new WorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor(days, wfJobId)));
 
         WorkflowJobBean subwfJob2 = addRecordToWfJobTable(WorkflowJob.Status.FAILED, WorkflowInstance.Status.FAILED, wfJobId);

http://git-wip-us.apache.org/repos/asf/oozie/blob/4e015d45/core/src/test/java/org/apache/oozie/service/TestHASLAService.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/service/TestHASLAService.java b/core/src/test/java/org/apache/oozie/service/TestHASLAService.java
new file mode 100644
index 0000000..eec5369
--- /dev/null
+++ b/core/src/test/java/org/apache/oozie/service/TestHASLAService.java
@@ -0,0 +1,446 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.service;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.oozie.AppType;
+import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.CoordinatorJobBean;
+import org.apache.oozie.WorkflowJobBean;
+import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.client.WorkflowJob;
+import org.apache.oozie.client.event.SLAEvent;
+import org.apache.oozie.client.event.JobEvent.EventStatus;
+import org.apache.oozie.client.event.SLAEvent.SLAStatus;
+import org.apache.oozie.client.rest.JsonBean;
+import org.apache.oozie.executor.jpa.BatchQueryExecutor;
+import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
+import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
+import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor;
+import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor.SLASummaryQuery;
+import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
+import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
+import org.apache.oozie.sla.SLACalcStatus;
+import org.apache.oozie.sla.SLACalculatorMemory;
+import org.apache.oozie.sla.SLARegistrationBean;
+import org.apache.oozie.sla.SLASummaryBean;
+import org.apache.oozie.sla.TestSLAService;
+import org.apache.oozie.sla.listener.SLAEventListener;
+import org.apache.oozie.sla.service.SLAService;
+import org.apache.oozie.test.ZKXTestCase;
+import org.apache.oozie.workflow.WorkflowInstance;
+
+public class TestHASLAService extends ZKXTestCase {
+
+    private static StringBuilder output = new StringBuilder();
+
+    protected void setUp() throws Exception {
+        Configuration conf = new Configuration(false);
+        conf.set(Services.CONF_SERVICE_EXT_CLASSES, "org.apache.oozie.service.EventHandlerService,"
+                + "org.apache.oozie.sla.service.SLAService");
+        conf.setClass(EventHandlerService.CONF_LISTENERS, DummySLAEventListener.class, SLAEventListener.class);
+        conf.setLong(SLAService.CONF_JOB_EVENT_LATENCY, 0);
+        // manually do check in this test
+        conf.setInt(SLAService.CONF_SLA_CHECK_INITIAL_DELAY, 100000);
+        conf.setInt(SLAService.CONF_SLA_CHECK_INTERVAL, 100000);
+        conf.setInt(EventHandlerService.CONF_WORKER_THREADS, 0);
+        super.setUp(conf);
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        super.tearDown();
+    }
+
+    public void testSLAFailOverWithHA() throws Exception {
+
+        SLAService slas = Services.get().get(SLAService.class);
+        SLACalculatorMemory slaCalcMem = (SLACalculatorMemory) slas.getSLACalculator();
+        EventHandlerService ehs = Services.get().get(EventHandlerService.class);
+
+        // start another dummy oozie instance (dummy sla and eventhandler
+        // services)
+        DummyZKOozie dummyOozie_1 = null;
+        try {
+            dummyOozie_1 = new DummyZKOozie("a", "http://blah");
+            DummySLACalculatorMemory dummyCalc = new DummySLACalculatorMemory();
+            EventHandlerService dummyEhs = new EventHandlerService();
+            dummyCalc.setEventHandlerService(dummyEhs);
+            dummyEhs.init(Services.get());
+            dummyCalc.init(Services.get().getConf());
+
+            // Case 1 workflow job submitted to dummy server,
+            // but before start running, the dummy server is down
+            createWorkflow("job-1");
+            SLARegistrationBean sla1 = TestSLAService._createSLARegistration("job-1", AppType.WORKFLOW_JOB);
+            sla1.setExpectedStart(new Date(System.currentTimeMillis() - 2 * 3600 * 1000)); // 2 hr before
+            sla1.setExpectedEnd(new Date(System.currentTimeMillis() - 1 * 3600 * 1000)); // 1 hr before
+            sla1.setExpectedDuration(10 * 60 * 1000); // 10 mins
+            dummyCalc.addRegistration(sla1.getId(), sla1);
+
+            // Case 2. workflow job submitted to dummy server, start running,
+            // then the dummy server is down
+            createWorkflow("job-2");
+            SLARegistrationBean sla2 = TestSLAService._createSLARegistration("job-2", AppType.WORKFLOW_JOB);
+            sla2.setExpectedStart(new Date(System.currentTimeMillis() - 2 * 3600 * 1000)); // 2hr before
+            sla2.setExpectedEnd(new Date(System.currentTimeMillis() + 1 * 3600 * 1000)); // 1hr ahead
+            sla2.setExpectedDuration(10 * 60 * 1000); // 10 mins
+            dummyCalc.addRegistration(sla2.getId(), sla2);
+            dummyCalc.addJobStatus(sla2.getId(), WorkflowJob.Status.RUNNING.name(), EventStatus.STARTED, new Date(),
+                    new Date());
+
+            dummyCalc.updateAllSlaStatus();
+            dummyEhs.new EventWorker().run();
+            assertTrue(output.toString().contains(sla2.getId() + " Sla START - MISS!!!"));
+
+            // suppose dummy Server is down
+            dummyCalc.clear();
+            dummyCalc = null;
+            dummyOozie_1.teardown();
+
+            slaCalcMem.updateAllSlaStatus();
+
+            // Job 1 started running on the living server --> start miss
+            slaCalcMem.addJobStatus(sla1.getId(), WorkflowJob.Status.RUNNING.name(), EventStatus.STARTED, new Date(),
+                    new Date());
+            // job 1 is added to slamap of living oozie server
+            assertNotNull(slaCalcMem.get(sla1.getId()));
+            ehs.new EventWorker().run();
+            assertTrue(output.toString().contains(sla1.getId() + " Sla START - MISS!!!"));
+
+            // Job 1 succeeded on the living server --> duration met and end miss
+            slaCalcMem.addJobStatus(sla1.getId(), WorkflowJob.Status.SUCCEEDED.name(), EventStatus.SUCCESS, new Date(),
+                    new Date());
+            ehs.new EventWorker().run();
+            assertTrue(output.toString().contains(sla1.getId() + " Sla DURATION - MET!!!"));
+            assertTrue(output.toString().contains(sla1.getId() + " Sla END - MISS!!!"));
+
+            // Job 2 succeeded on the living server --> duration met and end met
+            slaCalcMem.addJobStatus(sla2.getId(), WorkflowJob.Status.SUCCEEDED.name(), EventStatus.SUCCESS, new Date(),
+                    new Date());
+            // eventProc >= 7(already processed duration/end met), should be removed from slaMap
+            assertNull(slaCalcMem.get(sla2.getId()));
+            ehs.new EventWorker().run();
+            assertTrue(output.toString().contains(sla2.getId() + " Sla DURATION - MET!!!"));
+            assertTrue(output.toString().contains(sla2.getId() + " Sla END - MET!!!"));
+        }
+        finally {
+            if (dummyOozie_1 != null) {
+                dummyOozie_1.teardown();
+            }
+        }
+    }
+
+    public void updateCoordAction(String id, String status) throws JPAExecutorException {
+        CoordinatorActionBean action = new CoordinatorActionBean();
+        action.setId(id);
+        action.setStatusStr(status);
+        action.setLastModifiedTime(new Date());
+        CoordActionQueryExecutor.getInstance().executeUpdate(CoordActionQuery.UPDATE_COORD_ACTION_STATUS_PENDING_TIME,
+                action);
+    }
+
+    // Runs the live SLA calculator next to a dummy one and checks that job status changes handled by either
+    // of them end up reflected in both SlaMaps, in the history sets and in the stored eventProcessed value.
+    public void testSLAUpdateWithHA() throws Exception {
+
+        String id1 = "0000000-130521183438837-oozie-test-C@1";
+        String id2 = "0000001-130521183438837-oozie-test-C@1";
+        String id3 = "0000002-130521183438837-oozie-test-C@1";
+        String id4 = "0000003-130521183438837-oozie-test-C@1";
+        String id5 = "0000004-130521183438837-oozie-test-C@1";
+        String id6 = "0000005-130521183438837-oozie-test-C@1";
+        Date expectedStartTS = new Date(System.currentTimeMillis() - 2 * 3600 * 1000);
+        Date expectedEndTS1 = new Date(System.currentTimeMillis() + 1 * 3600 * 1000); // 1 hour ahead
+        Date expectedEndTS2 = new Date(System.currentTimeMillis() - 1 * 3600 * 1000); // 1 hour passed
+        // Coord Action 1-4 not started yet
+        createDBEntry(id1, expectedStartTS, expectedEndTS1);
+        createDBEntry(id2, expectedStartTS, expectedEndTS1);
+        createDBEntry(id3, expectedStartTS, expectedEndTS1);
+        createDBEntry(id4, expectedStartTS, expectedEndTS1);
+        // Coord Action 5-6 already started and currently running (to test history set)
+        createDBEntryForStarted(id5, expectedStartTS, expectedEndTS2);
+        createDBEntryForStarted(id6, expectedStartTS, expectedEndTS2);
+
+        SLAService slas = Services.get().get(SLAService.class);
+        SLACalculatorMemory slaCalcMem = (SLACalculatorMemory) slas.getSLACalculator();
+        EventHandlerService ehs = Services.get().get(EventHandlerService.class);
+        slaCalcMem.init(Services.get().getConf());
+        List<String> slaMapKeys = new ArrayList<String>();
+        Iterator<String> itr = slaCalcMem.iterator();
+        while (itr.hasNext()) {
+            slaMapKeys.add(itr.next());
+        }
+        assertEquals(6, slaMapKeys.size());
+
+        DummyZKOozie dummyOozie_1 = null;
+        try {
+            // start another dummy oozie instance (dummy sla and event handler services)
+            dummyOozie_1 = new DummyZKOozie("a", "http://blah");
+            DummySLACalculatorMemory dummySlaCalcMem = new DummySLACalculatorMemory();
+            EventHandlerService dummyEhs = new EventHandlerService();
+            dummySlaCalcMem.setEventHandlerService(dummyEhs);
+            dummyEhs.init(Services.get());
+            dummySlaCalcMem.init(Services.get().getConf());
+            slaMapKeys = new ArrayList<String>();
+            itr = dummySlaCalcMem.iterator();
+            while (itr.hasNext()) {
+                slaMapKeys.add(itr.next());
+            }
+            assertEquals(6, slaMapKeys.size());
+
+            // Coord Action 1,3 run and update status on non-dummy server
+            updateCoordAction(id1, "RUNNING");
+            slaCalcMem
+                    .addJobStatus(id1, CoordinatorAction.Status.RUNNING.name(), EventStatus.STARTED, new Date(), null);
+            updateCoordAction(id3, "FAILED");
+            slaCalcMem.addJobStatus(id3, CoordinatorAction.Status.FAILED.name(), EventStatus.FAILURE, null, new Date());
+
+            // Coord Action 2,4 run and update status on dummy server
+            updateCoordAction(id2, "RUNNING");
+            dummySlaCalcMem.addJobStatus(id2, CoordinatorAction.Status.RUNNING.name(), EventStatus.STARTED, new Date(),
+                    null);
+            updateCoordAction(id4, "FAILED");
+            dummySlaCalcMem.addJobStatus(id4, CoordinatorAction.Status.FAILED.name(), EventStatus.FAILURE, null,
+                    new Date());
+
+            // Both servers iterate SlaMap (updateAllSlaStatus)
+            slaCalcMem.updateAllSlaStatus();
+            dummySlaCalcMem.updateAllSlaStatus();
+
+            // SlaMap on both Servers synced
+            SLACalcStatus sla1_nodummy = slaCalcMem.get(id1);
+            SLACalcStatus sla1_dummy = dummySlaCalcMem.get(id1);
+            SLACalcStatus sla2_nodummy = slaCalcMem.get(id2);
+            SLACalcStatus sla2_dummy = dummySlaCalcMem.get(id2);
+            assertEquals(1, sla1_nodummy.getEventProcessed());
+            assertEquals(1, sla1_dummy.getEventProcessed());
+            assertEquals(1, sla2_dummy.getEventProcessed());
+            assertEquals(1, sla2_nodummy.getEventProcessed());
+            assertFalse(slaCalcMem.isJobIdInSLAMap(id3));
+            assertFalse(dummySlaCalcMem.isJobIdInSLAMap(id3));
+            assertFalse(slaCalcMem.isJobIdInSLAMap(id4));
+            assertFalse(dummySlaCalcMem.isJobIdInSLAMap(id4));
+
+            Byte eventProc = (Byte) SLASummaryQueryExecutor.getInstance().getSingleValue(
+                    SLASummaryQuery.GET_SLA_SUMMARY_EVENTPROCESSED, id3);
+            assertEquals(8, eventProc.intValue());
+            eventProc = (Byte) SLASummaryQueryExecutor.getInstance().getSingleValue(
+                    SLASummaryQuery.GET_SLA_SUMMARY_EVENTPROCESSED, id4);
+            assertEquals(8, eventProc.intValue());
+
+            // Action 5 was processed as END_MISS in updateAllSlaStatus, put into history set
+            assertTrue(slaCalcMem.isJobIdInHistorySet(id5));
+            // Action 6 was processed as END_MISS in updateAllSlaStatus, put into history set
+            assertTrue(dummySlaCalcMem.isJobIdInHistorySet(id6));
+
+            eventProc = (Byte) SLASummaryQueryExecutor.getInstance().getSingleValue(
+                    SLASummaryQuery.GET_SLA_SUMMARY_EVENTPROCESSED, id5);
+            assertEquals(7, eventProc.intValue());
+            eventProc = (Byte) SLASummaryQueryExecutor.getInstance().getSingleValue(
+                    SLASummaryQuery.GET_SLA_SUMMARY_EVENTPROCESSED, id6);
+            assertEquals(7, eventProc.intValue());
+
+            // Action 1 Succeeded on non-dummy server
+            updateCoordAction(id1, "SUCCEEDED");
+            slaCalcMem.addJobStatus(id1, CoordinatorAction.Status.SUCCEEDED.name(), EventStatus.SUCCESS, new Date(
+                    System.currentTimeMillis() - 1800 * 1000), new Date());
+
+            // Action 2 Succeeded on dummy server
+            updateCoordAction(id2, "SUCCEEDED");
+            dummySlaCalcMem.addJobStatus(id2, CoordinatorAction.Status.SUCCEEDED.name(), EventStatus.SUCCESS, new Date(
+                    System.currentTimeMillis() - 1800 * 1000), new Date());
+
+            // Both servers iterate SlaMap (updateAllSlaStatus)
+            slaCalcMem.updateAllSlaStatus();
+            dummySlaCalcMem.updateAllSlaStatus();
+
+            // Action 1, 2 are removed from both servers
+            assertNull(slaCalcMem.get(id1));
+            assertNull(dummySlaCalcMem.get(id1));
+            assertNull(slaCalcMem.get(id2));
+            assertNull(dummySlaCalcMem.get(id2));
+            eventProc = (Byte) SLASummaryQueryExecutor.getInstance().getSingleValue(
+                    SLASummaryQuery.GET_SLA_SUMMARY_EVENTPROCESSED, id1);
+            assertEquals(8, eventProc.intValue());
+            eventProc = (Byte) SLASummaryQueryExecutor.getInstance().getSingleValue(
+                    SLASummaryQuery.GET_SLA_SUMMARY_EVENTPROCESSED, id2);
+            assertEquals(8, eventProc.intValue());
+
+            // HistoryPurgeWorker purges Action 5 on the non-dummy server and Action 6 on the dummy server
+            updateCoordAction(id5, "SUCCEEDED");
+            slaCalcMem.new HistoryPurgeWorker().run();
+            assertFalse(slaCalcMem.isJobIdInHistorySet(id5));
+            updateCoordAction(id6, "SUCCEEDED");
+            dummySlaCalcMem.new HistoryPurgeWorker().run();
+            assertFalse(dummySlaCalcMem.isJobIdInHistorySet(id6));
+
+        }
+        finally {
+            if (dummyOozie_1 != null) {
+                dummyOozie_1.teardown();
+            }
+        }
+    }
+
+    private void createDBEntry(String actionId, Date expectedStartTS, Date expectedEndTS) throws Exception {
+        ArrayList<JsonBean> insertList = new ArrayList<JsonBean>();
+        CoordinatorActionBean coordAction = new CoordinatorActionBean();
+        Date modTime = new Date(System.currentTimeMillis() - 1000 * 3600 * 2);
+        coordAction.setId(actionId);
+        coordAction.setJobId(actionId.split("@", -1)[0]);
+        coordAction.setStatusStr("WAITING");
+        coordAction.setLastModifiedTime(modTime);
+
+        CoordinatorJobBean coordJob = new CoordinatorJobBean();
+        coordJob.setId(actionId.split("@", -1)[0]);
+        coordJob.setUser("dummy");
+        coordJob.setAppName("dummy");
+        coordJob.setStatusStr("RUNNING");
+        coordJob.setAppNamespace("dummy");
+
+        SLASummaryBean sla = new SLASummaryBean();
+        sla.setId(actionId);
+        sla.setAppType(AppType.COORDINATOR_ACTION);
+        sla.setJobStatus("WAITING");
+        sla.setSLAStatus(SLAStatus.NOT_STARTED);
+        sla.setEventProcessed(0);
+        sla.setLastModifiedTime(modTime);
+        sla.setExpectedStart(expectedStartTS);
+        sla.setExpectedEnd(expectedEndTS);
+        sla.setExpectedDuration(10 * 60 * 1000);
+        SLARegistrationBean reg = new SLARegistrationBean();
+        reg.setId(actionId);
+        insertList.add(coordAction);
+        insertList.add(coordJob);
+        insertList.add(sla);
+        insertList.add(reg);
+        BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, null, null);
+    }
+
+    private void createDBEntryForStarted(String actionId, Date expectedStartTS, Date expectedEndTS) throws Exception {
+        ArrayList<JsonBean> insertList = new ArrayList<JsonBean>();
+        Date modTime = new Date();
+        WorkflowJobBean wf = addRecordToWfJobTable(WorkflowJob.Status.RUNNING, WorkflowInstance.Status.RUNNING,
+                actionId);
+        wf.setStatus(wf.getStatus());
+        wf.setStartTime(expectedStartTS);
+        wf.setLastModifiedTime(modTime);
+        WorkflowJobQueryExecutor.getInstance().executeUpdate(
+                WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MOD_START_END, wf);
+
+        CoordinatorActionBean coordAction = new CoordinatorActionBean();
+        coordAction.setId(actionId);
+        coordAction.setJobId(actionId.split("@", -1)[0]);
+        coordAction.setStatusStr("RUNNING");
+        coordAction.setLastModifiedTime(modTime);
+        coordAction.setExternalId(wf.getId());
+
+        CoordinatorJobBean coordJob = new CoordinatorJobBean();
+        coordJob.setId(actionId.split("@", -1)[0]);
+        coordJob.setUser("dummy");
+        coordJob.setAppName("dummy");
+        coordJob.setStatusStr("RUNNING");
+        coordJob.setAppNamespace("dummy");
+
+        SLASummaryBean sla = new SLASummaryBean();
+        sla.setId(actionId);
+        sla.setAppType(AppType.COORDINATOR_ACTION);
+        sla.setJobStatus("RUNNING");
+        sla.setSLAStatus(SLAStatus.IN_PROCESS);
+        sla.setEventProcessed(1);
+        sla.setLastModifiedTime(modTime);
+        sla.setExpectedStart(expectedStartTS);
+        sla.setActualStart(expectedStartTS);
+        sla.setExpectedEnd(expectedEndTS);
+        sla.setExpectedDuration(10 * 60 * 1000);
+        SLARegistrationBean reg = new SLARegistrationBean();
+        reg.setId(actionId);
+        insertList.add(coordAction);
+
+        insertList.add(coordJob);
+        insertList.add(sla);
+        insertList.add(reg);
+        BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, null, null);
+    }
+
+    private void createWorkflow(String id) throws Exception {
+        List<JsonBean> insertList = new ArrayList<JsonBean>();
+        WorkflowJobBean workflow = new WorkflowJobBean();
+        workflow.setId(id);
+        workflow.setStatusStr("PREP");
+        workflow.setStartTime(new Date());
+        workflow.setSlaXml("<sla></sla>");
+        insertList.add(workflow);
+        BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, null, null);
+    }
+
+    public static class DummySLAEventListener extends SLAEventListener {
+
+        @Override
+        public void onStartMet(SLAEvent sla) {
+            output.append(sla.getId() + " Sla START - MET!!!");
+        }
+
+        @Override
+        public void onStartMiss(SLAEvent sla) {
+            output.append(sla.getId() + " Sla START - MISS!!!");
+        }
+
+        @Override
+        public void onEndMet(SLAEvent sla) {
+            output.append(sla.getId() + " Sla END - MET!!!");
+        }
+
+        @Override
+        public void onEndMiss(SLAEvent sla) {
+            output.append(sla.getId() + " Sla END - MISS!!!");
+        }
+
+        @Override
+        public void onDurationMet(SLAEvent sla) {
+            output.append(sla.getId() + " Sla DURATION - MET!!!");
+        }
+
+        @Override
+        public void onDurationMiss(SLAEvent sla) {
+            output.append(sla.getId() + " Sla DURATION - MISS!!!");
+        }
+
+        @Override
+        public void init(Configuration conf) {
+        }
+
+        @Override
+        public void destroy() {
+        }
+    }
+}
+
+class DummySLACalculatorMemory extends SLACalculatorMemory {
+    public void setEventHandlerService(EventHandlerService ehs) {
+        this.eventHandler = ehs;
+    }
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/4e015d45/core/src/test/java/org/apache/oozie/sla/TestSLACalculationJPAExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/sla/TestSLACalculationJPAExecutor.java b/core/src/test/java/org/apache/oozie/sla/TestSLACalculationJPAExecutor.java
index 9270aa2..2ae35ad 100644
--- a/core/src/test/java/org/apache/oozie/sla/TestSLACalculationJPAExecutor.java
+++ b/core/src/test/java/org/apache/oozie/sla/TestSLACalculationJPAExecutor.java
@@ -31,7 +31,6 @@ import org.apache.oozie.executor.jpa.BatchQueryExecutor;
 import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry;
 import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor;
 import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor.SLASummaryQuery;
-import org.apache.oozie.executor.jpa.sla.SLASummaryGetJPAExecutor;
 import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.test.XDataTestCase;
@@ -88,9 +87,7 @@ public class TestSLACalculationJPAExecutor extends XDataTestCase {
         List<JsonBean> insertList = new ArrayList<JsonBean>();
         insertList.add(bean2);
         BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, null, null);
-
-        SLASummaryGetJPAExecutor readCmd2 = new SLASummaryGetJPAExecutor(wfId);
-        SLASummaryBean sBean = jpaService.execute(readCmd2);
+        SLASummaryBean sBean = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, wfId);
         assertEquals(wfId, sBean.getId());
         assertEquals("RUNNING", sBean.getJobStatus());
         assertEquals(EventStatus.START_MISS, sBean.getEventStatus());
@@ -136,8 +133,7 @@ public class TestSLACalculationJPAExecutor extends XDataTestCase {
         List<UpdateEntry> updateList = new ArrayList<UpdateEntry>();
         SLASummaryQueryExecutor.getInstance().executeUpdate(SLASummaryQuery.UPDATE_SLA_SUMMARY_ALL, bean2);
 
-        SLASummaryGetJPAExecutor readCmd2 = new SLASummaryGetJPAExecutor(wfId);
-        SLASummaryBean sBean = jpaService.execute(readCmd2);
+        SLASummaryBean sBean = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, wfId);
         // check updated + original fields
         assertEquals(wfId, sBean.getId());
         assertEquals(EventStatus.DURATION_MISS, sBean.getEventStatus());
@@ -195,14 +191,13 @@ public class TestSLACalculationJPAExecutor extends XDataTestCase {
         FaultInjection.deactivate("org.apache.oozie.command.SkipCommitFaultInjection");
 
         // Check whether transactions are rolled back or not
-        SLASummaryGetJPAExecutor readCmd = new SLASummaryGetJPAExecutor(wfId1);
-        SLASummaryBean sBean = jpaService.execute(readCmd);
+        SLASummaryBean sBean = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, wfId1);
+
         // isSlaProcessed should NOT be changed to 1
         // actualEnd should be null as before
         assertNull(sBean.getActualEnd());
 
-        SLASummaryGetJPAExecutor readCmd1 = new SLASummaryGetJPAExecutor(wfId2);
-        sBean = jpaService.execute(readCmd1);
+        sBean = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, wfId2);
         assertNull(sBean); //new bean should not have been inserted due to rollback
 
     }

http://git-wip-us.apache.org/repos/asf/oozie/blob/4e015d45/core/src/test/java/org/apache/oozie/sla/TestSLACalculatorMemory.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/sla/TestSLACalculatorMemory.java b/core/src/test/java/org/apache/oozie/sla/TestSLACalculatorMemory.java
index 9a16722..438f2c2 100644
--- a/core/src/test/java/org/apache/oozie/sla/TestSLACalculatorMemory.java
+++ b/core/src/test/java/org/apache/oozie/sla/TestSLACalculatorMemory.java
@@ -20,7 +20,9 @@ package org.apache.oozie.sla;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Date;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.oozie.AppType;
@@ -33,16 +35,17 @@ import org.apache.oozie.client.CoordinatorAction;
 import org.apache.oozie.client.event.SLAEvent;
 import org.apache.oozie.client.event.JobEvent.EventStatus;
 import org.apache.oozie.client.event.SLAEvent.SLAStatus;
+import org.apache.oozie.client.rest.JsonBean;
 import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry;
 import org.apache.oozie.executor.jpa.CoordActionInsertJPAExecutor;
+import org.apache.oozie.executor.jpa.SLARegistrationQueryExecutor;
+import org.apache.oozie.executor.jpa.SLARegistrationQueryExecutor.SLARegQuery;
 import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor.SLASummaryQuery;
 import org.apache.oozie.executor.jpa.BatchQueryExecutor;
 import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor;
 import org.apache.oozie.executor.jpa.WorkflowActionInsertJPAExecutor;
 import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
 import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
-import org.apache.oozie.executor.jpa.sla.SLARegistrationGetJPAExecutor;
-import org.apache.oozie.executor.jpa.sla.SLASummaryGetJPAExecutor;
 import org.apache.oozie.service.EventHandlerService;
 import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.Services;
@@ -75,6 +78,18 @@ public class TestSLACalculatorMemory extends XDataTestCase {
         super.tearDown();
     }
 
+    private void createWorkflow(List<String> idList) throws Exception {
+        List<JsonBean> insertList = new ArrayList<JsonBean>();
+        for (String id : idList) {
+            WorkflowJobBean workflow = new WorkflowJobBean();
+            workflow.setId(id);
+            workflow.setStatusStr("PREP");
+            workflow.setStartTime(new Date());
+            insertList.add(workflow);
+        }
+        BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, null, null);
+    }
+
     @Test
     public void testLoadOnRestart() throws Exception {
         SLACalculatorMemory slaCalcMemory = new SLACalculatorMemory();
@@ -85,6 +100,11 @@ public class TestSLACalculatorMemory extends XDataTestCase {
         String jobId2 = slaRegBean2.getId();
         SLARegistrationBean slaRegBean3 = _createSLARegistration("job-3", AppType.WORKFLOW_JOB);
         String jobId3 = slaRegBean3.getId();
+        List<String> idList = new ArrayList<String>();
+        idList.add(slaRegBean1.getId());
+        idList.add(slaRegBean2.getId());
+        idList.add(slaRegBean3.getId());
+        createWorkflow(idList);
 
         SimpleDateFormat sdf = new SimpleDateFormat("yyyy-mm-dd");
         slaRegBean1.setAppName("app-name");
@@ -166,7 +186,7 @@ public class TestSLACalculatorMemory extends XDataTestCase {
         assertEquals(2, slaCalcMemory.size()); // 2 out of 3 jobs in map
         slaCalcMemory.addJobStatus(jobId3, WorkflowJob.Status.SUCCEEDED.toString(), EventStatus.SUCCESS,
                 sdf.parse("2011-03-09"), sdf.parse("2011-04-09"));
-        SLASummaryBean slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId3));
+        SLASummaryBean slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId3);
         assertEquals(8, slaSummary.getEventProcessed());
         assertEquals(sdf.parse("2011-03-09"), slaSummary.getActualStart());
         assertEquals(sdf.parse("2011-04-09"), slaSummary.getActualEnd());
@@ -193,7 +213,7 @@ public class TestSLACalculatorMemory extends XDataTestCase {
 
         SLASummaryQueryExecutor.getInstance().executeUpdate(SLASummaryQuery.UPDATE_SLA_SUMMARY_ALL,slaSummaryBean);
 
-        SLASummaryBean slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId1));
+        SLASummaryBean slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId1);
 
         // Simulate a lost success event
         WorkflowJobBean wjb = new WorkflowJobBean();
@@ -209,7 +229,7 @@ public class TestSLACalculatorMemory extends XDataTestCase {
 
         // As job succeeded, it should not be in memory
         assertEquals(0, slaCalcMemory.size());
-        slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId1));
+        slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId1);
         assertEquals("job-1", slaSummary.getId());
         assertEquals(8, slaSummary.getEventProcessed());
         assertEquals(AppType.WORKFLOW_JOB, slaSummary.getAppType());
@@ -237,7 +257,7 @@ public class TestSLACalculatorMemory extends XDataTestCase {
         slaCalcMemory.init(new Configuration(false));
 
         assertEquals(0, slaCalcMemory.size());
-        slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId1));
+        slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId1);
         assertEquals("FAILED", slaSummary.getJobStatus());
         assertEquals(8, slaSummary.getEventProcessed());
         assertEquals(sdf.parse("2012-02-07"), slaSummary.getActualStart());
@@ -303,7 +323,7 @@ public class TestSLACalculatorMemory extends XDataTestCase {
 
         // As job succeeded, it should not be in memory
         assertEquals(0, slaCalcMemory.size());
-        SLASummaryBean slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId1));
+        SLASummaryBean slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId1);
         assertEquals("job-1", slaSummary.getId());
         assertEquals(8, slaSummary.getEventProcessed());
         assertEquals(AppType.WORKFLOW_ACTION, slaSummary.getAppType());
@@ -354,7 +374,8 @@ public class TestSLACalculatorMemory extends XDataTestCase {
 
         // As job succeeded, it should not be in memory
         assertEquals(0, slaCalcMemory.size());
-        SLASummaryBean slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId1));
+        SLASummaryBean slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId1);
+
         assertEquals("job-1", slaSummary.getId());
         assertEquals(8, slaSummary.getEventProcessed());
         assertEquals(AppType.COORDINATOR_ACTION, slaSummary.getAppType());
@@ -379,12 +400,12 @@ public class TestSLACalculatorMemory extends XDataTestCase {
         String jobId = slaRegBean.getId();
         slaCalcMemory.addRegistration(jobId, slaRegBean);
         assertEquals(1, slaCalcMemory.size());
-        SLASummaryBean slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId));
+        SLASummaryBean slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId);
         assertEquals(SLAStatus.NOT_STARTED, slaSummary.getSLAStatus());
         assertEquals("PREP", slaSummary.getJobStatus());
         slaCalcMemory.updateJobSla(jobId);
         assertEquals(2, ehs.getEventQueue().size());
-        slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId));
+        slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId);
         // both start miss and end miss (101)
         assertEquals(5, slaSummary.getEventProcessed());
         assertEquals(SLAEvent.EventStatus.END_MISS, slaSummary.getEventStatus());
@@ -397,14 +418,14 @@ public class TestSLACalculatorMemory extends XDataTestCase {
                 sdf.parse("2012-01-01"), null);
         slaCalcMemory.addJobStatus(jobId, WorkflowJob.Status.SUSPENDED.toString(), EventStatus.SUSPEND,
                 sdf.parse("2012-01-01"), null);
-        slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId));
+        slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId);
         assertEquals(WorkflowJob.Status.SUSPENDED.toString(), slaSummary.getJobStatus());
         assertEquals(5, slaSummary.getEventProcessed());
         slaCalcMemory.addJobStatus(jobId, WorkflowJob.Status.SUCCEEDED.toString(), EventStatus.SUCCESS,
                 sdf.parse("2012-01-01"), sdf.parse("2012-01-02"));
 
         assertEquals(3, ehs.getEventQueue().size());
-        slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId));
+        slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId);
         // All events processed and actual times stored (1000)
         assertEquals(8, slaSummary.getEventProcessed());
         assertEquals(SLAStatus.MISS, slaSummary.getSLAStatus());
@@ -432,7 +453,7 @@ public class TestSLACalculatorMemory extends XDataTestCase {
         slaCalcMemory.addRegistration(jobId, slaRegBean);
         assertEquals(1, slaCalcMemory.size());
         slaCalcMemory.updateJobSla(jobId);
-        SLASummaryBean slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId));
+        SLASummaryBean slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId);
         // Duration bit should be processed as expected duration is not set
         assertEquals(3, slaSummary.getEventProcessed());
         // check only start event in queue
@@ -446,7 +467,7 @@ public class TestSLACalculatorMemory extends XDataTestCase {
         SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
         slaCalcMemory.addJobStatus(jobId, WorkflowJob.Status.SUCCEEDED.toString(), EventStatus.SUCCESS,
                 sdf.parse("2012-01-01"), sdf.parse("2012-01-02"));
-        slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId));
+        slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId);
         // all should be processed
         assertEquals(8, slaSummary.getEventProcessed());
         // check only end event is in queue
@@ -466,7 +487,7 @@ public class TestSLACalculatorMemory extends XDataTestCase {
 
         slaCalcMemory.addJobStatus(jobId, WorkflowJob.Status.KILLED.toString(), EventStatus.FAILURE, null,
                 sdf.parse("2012-01-02"));
-        slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId));
+        slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId);
         // Actual start null, so all events processed
         assertEquals(8, slaSummary.getEventProcessed());
         assertEquals(1, ehs.getEventQueue().size());
@@ -491,13 +512,15 @@ public class TestSLACalculatorMemory extends XDataTestCase {
         String jobId = slaRegBean.getId();
         slaCalcMemory.addRegistration(slaRegBean.getId(), slaRegBean);
         slaCalcMemory.updateJobSla(jobId);
-        SLASummaryBean slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId));
+        SLASummaryBean slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId);
+
         assertEquals(1, slaSummary.getEventProcessed());
         assertEquals(SLAStatus.NOT_STARTED, slaSummary.getSLAStatus());
         slaCalcMemory.addJobStatus(jobId, WorkflowJob.Status.RUNNING.toString(), EventStatus.STARTED,
                 new Date(System.currentTimeMillis()), null);
         slaCalcMemory.updateJobSla(jobId);
-        slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId));
+        slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId);
+
         assertEquals(1, slaSummary.getEventProcessed());
         assertEquals(SLAStatus.IN_PROCESS, slaSummary.getSLAStatus());
         assertEquals(WorkflowJob.Status.RUNNING.toString(), slaSummary.getJobStatus());
@@ -518,19 +541,20 @@ public class TestSLACalculatorMemory extends XDataTestCase {
         String jobId = slaRegBean.getId();
         slaCalcMemory.addRegistration(slaRegBean.getId(), slaRegBean);
         slaCalcMemory.updateJobSla(jobId);
-        SLASummaryBean slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId));
-        slaRegBean = jpaService.execute(new SLARegistrationGetJPAExecutor(jobId));
+        SLASummaryBean slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId);
+        slaRegBean = SLARegistrationQueryExecutor.getInstance().get(SLARegQuery.GET_SLA_REG_ALL, jobId);
         assertNotNull(slaRegBean.getCreatedTimestamp());
         assertEquals(slaRegBean.getCreatedTimestamp(), slaSummary.getCreatedTimestamp());
         // Only end sla should be processed (100)
         assertEquals(4, slaSummary.getEventProcessed());
         slaCalcMemory.updateJobSla(jobId);
-        slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId));
+        slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId);
         assertEquals(4, slaSummary.getEventProcessed());
         assertEquals(SLAStatus.MISS, slaSummary.getSLAStatus());
         slaCalcMemory.addJobStatus(jobId, WorkflowJob.Status.SUCCEEDED.toString(), EventStatus.SUCCESS,
                 new Date(System.currentTimeMillis()), new Date(System.currentTimeMillis() + 1 * 1 * 3600 * 1000));
-        slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId));
+        slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId);
+
         // Only Duration sla should be processed as end is already processed
         // (110)
         assertEquals(6, slaSummary.getEventProcessed());
@@ -538,7 +562,7 @@ public class TestSLACalculatorMemory extends XDataTestCase {
         // Recieve start event
         assertTrue(slaCalcMemory.addJobStatus(jobId, WorkflowJob.Status.RUNNING.toString(), EventStatus.STARTED,
                 new Date(System.currentTimeMillis()), new Date(System.currentTimeMillis() + 1 * 1 * 3600 * 1000)));
-        slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId));
+        slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId);
         // Start event received so all bits should be processed (111)
         assertEquals(8, slaSummary.getEventProcessed());
         assertEquals(SLAStatus.MISS, slaSummary.getSLAStatus());
@@ -563,7 +587,7 @@ public class TestSLACalculatorMemory extends XDataTestCase {
             slaCalcMemory.addJobStatus(jobId, WorkflowJob.Status.RUNNING.toString(), EventStatus.STARTED, new Date(
                     System.currentTimeMillis() - 3600 * 1000), null);
             slaCalcMemory.updateJobSla(jobId);
-            SLASummaryBean slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId));
+            SLASummaryBean slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId);
             // The actual end times are not stored, but sla's processed so (111)
             assertEquals(7, slaSummary.getEventProcessed());
             // Moved from map to history set
@@ -571,11 +595,10 @@ public class TestSLACalculatorMemory extends XDataTestCase {
             // Add terminal state event so actual end time is stored
             slaCalcMemory.addJobStatus(jobId, WorkflowJob.Status.SUCCEEDED.toString(), EventStatus.SUCCESS, new Date(
                     System.currentTimeMillis() - 3600 * 1000), new Date(System.currentTimeMillis()));
-            slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId));
+            slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId);
             // The actual times are stored, so event processed(1000)
             assertEquals(8, slaSummary.getEventProcessed());
             assertEquals(3, ehs.getEventQueue().size());
-
     }
 
     private SLARegistrationBean _createSLARegistration(String jobId, AppType appType) {
@@ -585,4 +608,31 @@ public class TestSLACalculatorMemory extends XDataTestCase {
         return bean;
     }
 
+    public void testHistoryPurge() throws Exception{
+        EventHandlerService ehs = Services.get().get(EventHandlerService.class);
+        SLACalculatorMemory slaCalcMemory = new SLACalculatorMemory();
+        slaCalcMemory.init(new Configuration(false));
+        WorkflowJobBean job1 = addRecordToWfJobTable(WorkflowJob.Status.PREP, WorkflowInstance.Status.PREP);
+        SLARegistrationBean slaRegBean = _createSLARegistration(job1.getId(), AppType.WORKFLOW_JOB);
+        Date startTime = new Date(System.currentTimeMillis() - 1 * 1 * 3600 * 1000);
+        slaRegBean.setExpectedStart(startTime); // 1 hour back
+        slaRegBean.setExpectedDuration(1000);
+        slaRegBean.setExpectedEnd(new Date(System.currentTimeMillis() - 1 * 1 * 3600 * 1000));
+        String jobId = slaRegBean.getId();
+        slaCalcMemory.addRegistration(slaRegBean.getId(), slaRegBean);
+        slaCalcMemory.updateJobSla(jobId);
+        slaCalcMemory.addJobStatus(jobId, WorkflowJob.Status.RUNNING.toString(), EventStatus.STARTED, new Date(
+                System.currentTimeMillis() - 3600 * 1000), null);
+        slaCalcMemory.updateJobSla(jobId);
+        SLASummaryBean slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId);
+        // The actual end times are not stored, but sla's processed so (111)
+        assertEquals(7, slaSummary.getEventProcessed());
+        assertTrue(slaCalcMemory.isJobIdInHistorySet(job1.getId()));
+        job1.setStatusStr("SUCCEEDED");
+        job1.setLastModifiedTime(new Date());
+        WorkflowJobQueryExecutor.getInstance().executeUpdate(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_MODTIME, job1);
+        slaCalcMemory.new HistoryPurgeWorker().run();
+        assertFalse(slaCalcMemory.isJobIdInHistorySet(job1.getId()));
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/4e015d45/core/src/test/java/org/apache/oozie/sla/TestSLAEventGeneration.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/sla/TestSLAEventGeneration.java b/core/src/test/java/org/apache/oozie/sla/TestSLAEventGeneration.java
index cb4e434..31c2a47 100644
--- a/core/src/test/java/org/apache/oozie/sla/TestSLAEventGeneration.java
+++ b/core/src/test/java/org/apache/oozie/sla/TestSLAEventGeneration.java
@@ -53,11 +53,12 @@ import org.apache.oozie.event.listener.JobEventListener;
 import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
 import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
+import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor;
+import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor.SLASummaryQuery;
 import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
 import org.apache.oozie.executor.jpa.WorkflowJobInsertJPAExecutor;
 import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
 import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
-import org.apache.oozie.executor.jpa.sla.SLASummaryGetJPAExecutor;
 import org.apache.oozie.service.EventHandlerService;
 import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.Services;
@@ -242,7 +243,7 @@ public class TestSLAEventGeneration extends XDataTestCase {
         assertEquals(expectedEnd, DateUtils.formatDateOozieTZ(slaEvent.getExpectedEnd()));
 
         // assert for values in summary bean to be reset
-        SLASummaryBean slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId));
+        SLASummaryBean slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId);
         assertEquals( 0, slaSummary.getEventProcessed());
         assertEquals(-1, slaSummary.getActualDuration());
         assertNull(slaSummary.getActualStart());
@@ -443,6 +444,10 @@ public class TestSLAEventGeneration extends XDataTestCase {
         new CoordActionStartXCommand(actionId, getTestUser(), appName, jobId).call();
         slaEvent = slas.getSLACalculator().get(actionId);
         slaEvent.setEventProcessed(0); //resetting for testing sla event
+        SLASummaryBean suBean = new SLASummaryBean();
+        suBean.setId(actionId);
+        suBean.setEventProcessed(0);
+        SLASummaryQueryExecutor.getInstance().executeUpdate(SLASummaryQuery.UPDATE_SLA_SUMMARY_EVENTPROCESSED, suBean);
         ehs.new EventWorker().run();
         slaEvent = skipToSLAEvent();
         assertEquals(actionId, slaEvent.getId());
@@ -576,6 +581,10 @@ public class TestSLAEventGeneration extends XDataTestCase {
         new StartXCommand(jobId).call();
         slaEvent = slas.getSLACalculator().get(jobId);
         slaEvent.setEventProcessed(0); //resetting to receive sla events
+        SLASummaryBean suBean = new SLASummaryBean();
+        suBean.setId(jobId);
+        suBean.setEventProcessed(0);
+        SLASummaryQueryExecutor.getInstance().executeUpdate(SLASummaryQuery.UPDATE_SLA_SUMMARY_EVENTPROCESSED, suBean);
         waitForEventGeneration(200);
         ehs.new EventWorker().run();
         waitForEventGeneration(300);

http://git-wip-us.apache.org/repos/asf/oozie/blob/4e015d45/core/src/test/java/org/apache/oozie/sla/TestSLAJobEventListener.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/sla/TestSLAJobEventListener.java b/core/src/test/java/org/apache/oozie/sla/TestSLAJobEventListener.java
index 3ce86ab..b033d4d 100644
--- a/core/src/test/java/org/apache/oozie/sla/TestSLAJobEventListener.java
+++ b/core/src/test/java/org/apache/oozie/sla/TestSLAJobEventListener.java
@@ -31,9 +31,9 @@ import org.apache.oozie.event.CoordinatorJobEvent;
 import org.apache.oozie.event.WorkflowActionEvent;
 import org.apache.oozie.event.WorkflowJobEvent;
 import org.apache.oozie.event.listener.JobEventListener;
-import org.apache.oozie.executor.jpa.sla.SLASummaryGetJPAExecutor;
+import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor;
+import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor.SLASummaryQuery;
 import org.apache.oozie.service.EventHandlerService;
-import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.sla.listener.SLAJobEventListener;
 import org.apache.oozie.sla.service.SLAService;
@@ -114,7 +114,7 @@ public class TestSLAJobEventListener extends XTestCase {
                 "coord-app-name1", actualStart, actualEnd);
         listener.onCoordinatorJobEvent(cje);
 
-        SLASummaryBean summary = Services.get().get(JPAService.class).execute(new SLASummaryGetJPAExecutor("cj1"));
+        SLASummaryBean summary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, "cj1");
         // check that end and duration sla has been calculated
         assertEquals(6, summary.getEventProcessed());
 
@@ -130,7 +130,7 @@ public class TestSLAJobEventListener extends XTestCase {
         cae = new CoordinatorActionEvent("ca1", "cj1", CoordinatorAction.Status.KILLED, "user1",
                 "coord-app-name1", null, actualEnd, null);
         listener.onCoordinatorActionEvent(cae);
-        summary = Services.get().get(JPAService.class).execute(new SLASummaryGetJPAExecutor("ca1"));
+        summary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, "ca1");
         // check that all events are processed
         assertEquals(8, summary.getEventProcessed());
         assertEquals(EventStatus.END_MISS, summary.getEventStatus());

http://git-wip-us.apache.org/repos/asf/oozie/blob/4e015d45/core/src/test/java/org/apache/oozie/sla/TestSLARegistrationGetJPAExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/sla/TestSLARegistrationGetJPAExecutor.java b/core/src/test/java/org/apache/oozie/sla/TestSLARegistrationGetJPAExecutor.java
index 40376a5..1c65d12 100644
--- a/core/src/test/java/org/apache/oozie/sla/TestSLARegistrationGetJPAExecutor.java
+++ b/core/src/test/java/org/apache/oozie/sla/TestSLARegistrationGetJPAExecutor.java
@@ -25,7 +25,8 @@ import org.apache.oozie.AppType;
 import org.apache.oozie.client.rest.JsonBean;
 import org.apache.oozie.executor.jpa.BatchQueryExecutor;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
-import org.apache.oozie.executor.jpa.sla.SLARegistrationGetJPAExecutor;
+import org.apache.oozie.executor.jpa.SLARegistrationQueryExecutor;
+import org.apache.oozie.executor.jpa.SLARegistrationQueryExecutor.SLARegQuery;
 import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.sla.SLARegistrationBean;
@@ -55,8 +56,7 @@ public class TestSLARegistrationGetJPAExecutor extends XDataTestCase {
         JPAService jpaService = Services.get().get(JPAService.class);
         assertNotNull(jpaService);
 
-        SLARegistrationGetJPAExecutor readCmd = new SLARegistrationGetJPAExecutor(jobId);
-        SLARegistrationBean bean = jpaService.execute(readCmd);
+        SLARegistrationBean bean = SLARegistrationQueryExecutor.getInstance().get(SLARegQuery.GET_SLA_REG_ALL, jobId);
         assertEquals(jobId, bean.getId());
         assertEquals(AppType.WORKFLOW_JOB, bean.getAppType());
         assertEquals(current, bean.getExpectedStart());

http://git-wip-us.apache.org/repos/asf/oozie/blob/4e015d45/core/src/test/java/org/apache/oozie/sla/TestSLARegistrationGetRecordsOnRestartJPAExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/sla/TestSLARegistrationGetRecordsOnRestartJPAExecutor.java b/core/src/test/java/org/apache/oozie/sla/TestSLARegistrationGetRecordsOnRestartJPAExecutor.java
deleted file mode 100644
index cd2fbae..0000000
--- a/core/src/test/java/org/apache/oozie/sla/TestSLARegistrationGetRecordsOnRestartJPAExecutor.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.oozie.sla;
-
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
-
-import org.apache.oozie.client.rest.JsonBean;
-import org.apache.oozie.executor.jpa.BatchQueryExecutor;
-import org.apache.oozie.executor.jpa.sla.SLARegistrationGetOnRestartJPAExecutor;
-import org.apache.oozie.service.JPAService;
-import org.apache.oozie.service.Services;
-import org.apache.oozie.sla.SLARegistrationBean;
-import org.apache.oozie.test.XDataTestCase;
-
-public class TestSLARegistrationGetRecordsOnRestartJPAExecutor extends XDataTestCase {
-    Services services;
-
-    @Override
-    protected void setUp() throws Exception {
-        super.setUp();
-        services = new Services();
-        services.init();
-    }
-
-    @Override
-    protected void tearDown() throws Exception {
-        services.destroy();
-        super.tearDown();
-    }
-
-    public void testSLARegistrationGetRecordsOnRestart() throws Exception {
-        Date current = new Date();
-        final String jobId = "0000000-" + current.getTime() + "-TestSLARegGetRestartJPAExecutor-W";
-        SLARegistrationBean reg = new SLARegistrationBean();
-        reg.setId(jobId);
-        reg.setNotificationMsg("dummyMessage");
-        reg.setUpstreamApps("upApps");
-        reg.setAlertEvents("miss");
-        reg.setAlertContact("abc@y.com");
-        reg.setJobData("jobData");
-        JPAService jpaService = Services.get().get(JPAService.class);
-        List<JsonBean> insert = new ArrayList<JsonBean>();
-        insert.add(reg);
-        BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insert, null, null);
-        assertNotNull(jpaService);
-        SLARegistrationGetOnRestartJPAExecutor readCmd = new SLARegistrationGetOnRestartJPAExecutor(jobId);
-        SLARegistrationBean bean = jpaService.execute(readCmd);
-        assertEquals("dummyMessage", bean.getNotificationMsg());
-        assertEquals ("upApps", bean.getUpstreamApps());
-        assertEquals ("miss", bean.getAlertEvents());
-        assertEquals ("abc@y.com", bean.getAlertContact());
-        assertEquals ("jobData", bean.getJobData());
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/oozie/blob/4e015d45/core/src/test/java/org/apache/oozie/sla/TestSLAService.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/sla/TestSLAService.java b/core/src/test/java/org/apache/oozie/sla/TestSLAService.java
index 291d850..205bcd1 100644
--- a/core/src/test/java/org/apache/oozie/sla/TestSLAService.java
+++ b/core/src/test/java/org/apache/oozie/sla/TestSLAService.java
@@ -29,12 +29,14 @@ import org.apache.oozie.client.WorkflowJob;
 import org.apache.oozie.client.event.JobEvent.EventStatus;
 import org.apache.oozie.client.event.SLAEvent;
 import org.apache.oozie.client.event.SLAEvent.SLAStatus;
+import org.apache.oozie.executor.jpa.SLARegistrationQueryExecutor;
+import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor;
 import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
 import org.apache.oozie.executor.jpa.WorkflowJobInsertJPAExecutor;
 import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
+import org.apache.oozie.executor.jpa.SLARegistrationQueryExecutor.SLARegQuery;
+import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor.SLASummaryQuery;
 import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
-import org.apache.oozie.executor.jpa.sla.SLARegistrationGetJPAExecutor;
-import org.apache.oozie.executor.jpa.sla.SLASummaryGetJPAExecutor;
 import org.apache.oozie.service.EventHandlerService;
 import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.Services;
@@ -244,7 +246,7 @@ public class TestSLAService extends XDataTestCase {
         assertEventNoDuplicates(output.toString(), action3.getId() + " Sla END - MET!!!");
 
         // negative on MISS after DB check, updated with actual times
-        SLASummaryBean slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(job2.getId()));
+        SLASummaryBean slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, job2.getId());
         assertEquals(job2.getStartTime(), slaSummary.getActualStart());
         assertEquals(job2.getEndTime(), slaSummary.getActualEnd());
         assertEquals(job2.getEndTime().getTime() - job2.getStartTime().getTime(), slaSummary.getActualDuration());
@@ -255,7 +257,7 @@ public class TestSLAService extends XDataTestCase {
         assertNull(slas.getSLACalculator().get(job2.getId())); //removed from memory
 
         // positives but also updated with actual times immediately after DB check
-        slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(action2.getId()));
+        slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, action2.getId());
         extWf = jpaService.execute(new WorkflowJobGetJPAExecutor(action2.getExternalId()));
         assertEquals(extWf.getStartTime(), slaSummary.getActualStart());
         assertEquals(extWf.getEndTime(), slaSummary.getActualEnd());
@@ -266,7 +268,7 @@ public class TestSLAService extends XDataTestCase {
         assertEquals(8, slaSummary.getEventProcessed());
         assertNull(slas.getSLACalculator().get(action2.getId())); //removed from memory
 
-        slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(action1.getId()));
+        slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, action1.getId());
         extWf = jpaService.execute(new WorkflowJobGetJPAExecutor(action1.getExternalId()));
         assertEquals(extWf.getStartTime(), slaSummary.getActualStart());
         assertEquals(extWf.getEndTime(), slaSummary.getActualEnd());
@@ -292,19 +294,18 @@ public class TestSLAService extends XDataTestCase {
         Element eSla = XmlUtils.parseXml(slaXml);
         SLAOperations.createSlaRegistrationEvent(eSla, "job-id1", "parent-id1", AppType.WORKFLOW_JOB, getTestUser(),
                 "test-appname", log, false);
-        SLARegistrationBean reg = Services.get().get(JPAService.class)
-                .execute(new SLARegistrationGetJPAExecutor("job-id1"));
+        SLARegistrationBean reg = SLARegistrationQueryExecutor.getInstance().get(SLARegQuery.GET_SLA_REG_ALL, "job-id1");
         assertEquals("END_MISS", reg.getAlertEvents());
     }
 
-    static SLARegistrationBean _createSLARegistration(String jobId, AppType appType) {
+    public static SLARegistrationBean _createSLARegistration(String jobId, AppType appType) {
         SLARegistrationBean bean = new SLARegistrationBean();
         bean.setId(jobId);
         bean.setAppType(appType);
         return bean;
     }
 
-    private void assertEventNoDuplicates(String outputStr, String eventMsg) {
+    public static void assertEventNoDuplicates(String outputStr, String eventMsg) {
         int index = outputStr.indexOf(eventMsg);
         assertTrue(index != -1);
         //No duplicates

http://git-wip-us.apache.org/repos/asf/oozie/blob/4e015d45/core/src/test/java/org/apache/oozie/sla/TestSLASummaryGetOnRestartJPAExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/sla/TestSLASummaryGetOnRestartJPAExecutor.java b/core/src/test/java/org/apache/oozie/sla/TestSLASummaryGetOnRestartJPAExecutor.java
index 3a9e72e..bd543c2 100644
--- a/core/src/test/java/org/apache/oozie/sla/TestSLASummaryGetOnRestartJPAExecutor.java
+++ b/core/src/test/java/org/apache/oozie/sla/TestSLASummaryGetOnRestartJPAExecutor.java
@@ -22,16 +22,11 @@ import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
 
-import org.apache.oozie.AppType;
 import org.apache.oozie.client.rest.JsonBean;
 import org.apache.oozie.executor.jpa.BatchQueryExecutor;
-import org.apache.oozie.executor.jpa.JPAExecutorException;
-import org.apache.oozie.executor.jpa.sla.SLARegistrationGetJPAExecutor;
-import org.apache.oozie.executor.jpa.sla.SLASummaryGetJPAExecutor;
 import org.apache.oozie.executor.jpa.sla.SLASummaryGetRecordsOnRestartJPAExecutor;
 import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.Services;
-import org.apache.oozie.sla.SLARegistrationBean;
 import org.apache.oozie.test.XDataTestCase;
 
 public class TestSLASummaryGetOnRestartJPAExecutor extends XDataTestCase {

http://git-wip-us.apache.org/repos/asf/oozie/blob/4e015d45/core/src/test/java/org/apache/oozie/test/ZKXTestCase.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/test/ZKXTestCase.java b/core/src/test/java/org/apache/oozie/test/ZKXTestCase.java
index 3d37d48..946a4b9 100644
--- a/core/src/test/java/org/apache/oozie/test/ZKXTestCase.java
+++ b/core/src/test/java/org/apache/oozie/test/ZKXTestCase.java
@@ -18,7 +18,9 @@
 package org.apache.oozie.test;
 
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.Map;
+import java.util.Map.Entry;
 
 import org.apache.curator.RetryPolicy;
 import org.apache.curator.framework.CuratorFramework;
@@ -33,6 +35,7 @@ import org.apache.curator.x.discovery.details.InstanceSerializer;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.util.FixedJsonInstanceSerializer;
 import org.apache.oozie.util.ZKUtils;
+import org.apache.hadoop.conf.Configuration;
 
 /**
  * Provides a version of XTestCase that also runs a ZooKeeper server and provides some utilities for interacting and simulating ZK
@@ -63,6 +66,23 @@ public abstract class ZKXTestCase extends XDataTestCase {
     protected void setUp() throws Exception {
         super.setUp();
         new Services().init();
+        setUpZK();
+    }
+
+    protected void setUp(Configuration conf) throws Exception {
+        super.setUp();
+        Services services = new Services();
+        if(conf != null && conf.size()>0){
+            for (Iterator<Entry<String, String>> itr = (Iterator<Entry<String, String>>) conf.iterator(); itr.hasNext();) {
+                Entry<String, String> entry = itr.next();
+                services.getConf().set(entry.getKey(), entry.getValue());
+            }
+        }
+        services.init();
+        setUpZK();
+    }
+
+    private void setUpZK() throws Exception {
         zkServer = setupZKServer();
         Services.get().getConf().set("oozie.zookeeper.connection.string", zkServer.getConnectString());
         setSystemProperty("oozie.instance.id", ZK_ID);

http://git-wip-us.apache.org/repos/asf/oozie/blob/4e015d45/core/src/test/resources/coord-action-sla.xml
----------------------------------------------------------------------
diff --git a/core/src/test/resources/coord-action-sla.xml b/core/src/test/resources/coord-action-sla.xml
index 4893c61..ac93862 100644
--- a/core/src/test/resources/coord-action-sla.xml
+++ b/core/src/test/resources/coord-action-sla.xml
@@ -16,7 +16,7 @@
   limitations under the License.
 -->
 <coordinator-app name="test-coord-sla" frequency="${coord:days(1)}"
-                 start="2009-01-01T08:01Z" end="2009-01-02T08:01Z"
+                 start="2009-01-02T08:01Z" end="2009-01-03T08:00Z"
                  timezone="America/Los_Angeles"
                  xmlns="uri:oozie:coordinator:0.4"
                  xmlns:sla="uri:oozie:sla:0.2">

http://git-wip-us.apache.org/repos/asf/oozie/blob/4e015d45/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index e8fa1a8..4ea53ec 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 4.1.0 release (trunk - unreleased)
 
+OOZIE-1678 HA support for SLA (ryota)
 OOZIE-1685 Oozie doesn’t process correctly workflows with a non-default name node (benjzh via rohini)
 OOZIE-1875 Add "NONE" to coordinator job execution_order (bzhang)
 OOZIE-1879 Workflow Rerun causes error depending on the order of forked nodes (rkanter)


[2/2] git commit: OOZIE-1678 HA support for SLA (ryota)

Posted by ry...@apache.org.
OOZIE-1678 HA support for SLA (ryota)


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/4e015d45
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/4e015d45
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/4e015d45

Branch: refs/heads/master
Commit: 4e015d45ee43008a4b82e9d9cf0b89b0ed894f88
Parents: d5f1e38
Author: egashira <ry...@yahoo.com>
Authored: Wed Jun 18 00:15:04 2014 -0700
Committer: egashira <ry...@yahoo.com>
Committed: Wed Jun 18 00:15:04 2014 -0700

----------------------------------------------------------------------
 .../java/org/apache/oozie/WorkflowJobBean.java  |   2 +
 .../command/coord/CoordChangeXCommand.java      |  12 +-
 .../executor/jpa/BundleActionQueryExecutor.java |   7 +
 .../executor/jpa/BundleJobQueryExecutor.java    |   6 +
 .../executor/jpa/CoordActionQueryExecutor.java  |   5 +
 .../executor/jpa/CoordJobQueryExecutor.java     |   5 +
 .../oozie/executor/jpa/QueryExecutor.java       |   3 +
 .../jpa/SLARegistrationQueryExecutor.java       |  55 ++-
 .../executor/jpa/SLASummaryQueryExecutor.java   |  57 ++-
 .../jpa/WorkflowActionQueryExecutor.java        |   6 +
 .../executor/jpa/WorkflowJobQueryExecutor.java  |  14 +
 .../jpa/sla/SLARegistrationGetJPAExecutor.java  |  66 ---
 .../SLARegistrationGetOnRestartJPAExecutor.java |  74 ---
 .../jpa/sla/SLASummaryGetJPAExecutor.java       |  66 ---
 .../org/apache/oozie/service/JPAService.java    |   1 -
 .../oozie/service/JobsConcurrencyService.java   |   1 +
 .../oozie/service/ZKJobsConcurrencyService.java |   1 +
 .../org/apache/oozie/sla/SLACalcStatus.java     |  64 +++
 .../apache/oozie/sla/SLACalculatorMemory.java   | 439 ++++++++++++++----
 .../org/apache/oozie/sla/SLAOperations.java     |   6 +-
 .../org/apache/oozie/sla/SLASummaryBean.java    |   9 +-
 .../apache/oozie/sla/service/SLAService.java    |   9 +-
 .../command/coord/TestCoordChangeXCommand.java  |  18 +-
 .../jpa/TestSLARegistrationQueryExecutor.java   |  51 ++-
 .../jpa/TestSLASummaryQueryExecutor.java        |  43 +-
 .../jpa/TestWorkflowJobQueryExecutor.java       |   9 +
 ...ForPurgeFromWorkflowParentIdJPAExecutor.java |   1 -
 .../apache/oozie/service/TestHASLAService.java  | 446 +++++++++++++++++++
 .../sla/TestSLACalculationJPAExecutor.java      |  15 +-
 .../oozie/sla/TestSLACalculatorMemory.java      | 100 +++--
 .../oozie/sla/TestSLAEventGeneration.java       |  13 +-
 .../oozie/sla/TestSLAJobEventListener.java      |   8 +-
 .../sla/TestSLARegistrationGetJPAExecutor.java  |   6 +-
 ...istrationGetRecordsOnRestartJPAExecutor.java |  72 ---
 .../org/apache/oozie/sla/TestSLAService.java    |  19 +-
 .../TestSLASummaryGetOnRestartJPAExecutor.java  |   5 -
 .../java/org/apache/oozie/test/ZKXTestCase.java |  20 +
 core/src/test/resources/coord-action-sla.xml    |   2 +-
 release-log.txt                                 |   1 +
 39 files changed, 1275 insertions(+), 462 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/4e015d45/core/src/main/java/org/apache/oozie/WorkflowJobBean.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/WorkflowJobBean.java b/core/src/main/java/org/apache/oozie/WorkflowJobBean.java
index 0b8ee96..5fbee82 100644
--- a/core/src/main/java/org/apache/oozie/WorkflowJobBean.java
+++ b/core/src/main/java/org/apache/oozie/WorkflowJobBean.java
@@ -89,6 +89,8 @@ import org.json.simple.JSONObject;
 
     @NamedQuery(name = "GET_WORKFLOW_STARTTIME", query = "select w.id, w.startTimestamp from WorkflowJobBean w where w.id = :id"),
 
+    @NamedQuery(name = "GET_WORKFLOW_START_END_TIME", query = "select w.id, w.startTimestamp, w.endTimestamp from WorkflowJobBean w where w.id = :id"),
+
     @NamedQuery(name = "GET_WORKFLOW_USER_GROUP", query = "select w.user, w.group from WorkflowJobBean w where w.id = :id"),
 
     @NamedQuery(name = "GET_WORKFLOW_SUSPEND", query = "select w.id, w.user, w.group, w.appName, w.statusStr, w.parentId, w.startTimestamp, w.endTimestamp, w.logToken, w.wfInstance  from WorkflowJobBean w where w.id = :id"),

http://git-wip-us.apache.org/repos/asf/oozie/blob/4e015d45/core/src/main/java/org/apache/oozie/command/coord/CoordChangeXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/coord/CoordChangeXCommand.java b/core/src/main/java/org/apache/oozie/command/coord/CoordChangeXCommand.java
index 436b999..805856c 100644
--- a/core/src/main/java/org/apache/oozie/command/coord/CoordChangeXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/coord/CoordChangeXCommand.java
@@ -24,6 +24,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+
 import org.apache.commons.lang.StringUtils;
 import org.apache.oozie.CoordinatorActionBean;
 import org.apache.oozie.CoordinatorJobBean;
@@ -40,10 +41,12 @@ import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
 import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordJobGetActionByActionNumberJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
+import org.apache.oozie.executor.jpa.SLARegistrationQueryExecutor;
+import org.apache.oozie.executor.jpa.SLARegistrationQueryExecutor.SLARegQuery;
+import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor;
 import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
+import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor.SLASummaryQuery;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
-import org.apache.oozie.executor.jpa.sla.SLARegistrationGetJPAExecutor;
-import org.apache.oozie.executor.jpa.sla.SLASummaryGetJPAExecutor;
 import org.apache.oozie.executor.jpa.BatchQueryExecutor;
 import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry;
 import org.apache.oozie.service.JPAService;
@@ -266,12 +269,13 @@ public class CoordChangeXCommand extends CoordinatorXCommand<Void> {
                 if (SLAService.isEnabled()) {
                     Services.get().get(SLAService.class).removeRegistration(actionId);
                 }
-                SLARegistrationBean slaReg = jpaService.execute(new SLARegistrationGetJPAExecutor(actionId));
+                SLARegistrationBean slaReg = SLARegistrationQueryExecutor.getInstance().get(SLARegQuery.GET_SLA_REG_ALL, actionId);
                 if (slaReg != null) {
                     LOG.debug("Deleting registration bean corresponding to action " + slaReg.getId());
                     deleteList.add(slaReg);
                 }
-                SLASummaryBean slaSummaryBean = jpaService.execute(new SLASummaryGetJPAExecutor(actionId));
+                SLASummaryBean slaSummaryBean = SLASummaryQueryExecutor.getInstance().get(
+                        SLASummaryQuery.GET_SLA_SUMMARY, actionId);
                 if (slaSummaryBean != null) {
                     LOG.debug("Deleting summary bean corresponding to action " + slaSummaryBean.getId());
                     deleteList.add(slaSummaryBean);

http://git-wip-us.apache.org/repos/asf/oozie/blob/4e015d45/core/src/main/java/org/apache/oozie/executor/jpa/BundleActionQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/BundleActionQueryExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/BundleActionQueryExecutor.java
index d2331e8..44f5d87 100644
--- a/core/src/main/java/org/apache/oozie/executor/jpa/BundleActionQueryExecutor.java
+++ b/core/src/main/java/org/apache/oozie/executor/jpa/BundleActionQueryExecutor.java
@@ -21,8 +21,10 @@ import java.sql.Timestamp;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
+
 import javax.persistence.EntityManager;
 import javax.persistence.Query;
+
 import org.apache.oozie.BundleActionBean;
 import org.apache.oozie.CoordinatorActionBean;
 import org.apache.oozie.ErrorCode;
@@ -206,4 +208,9 @@ public class BundleActionQueryExecutor extends
         }
     }
 
+    @Override
+    public Object getSingleValue(BundleActionQuery namedQuery, Object... parameters) throws JPAExecutorException {
+        throw new UnsupportedOperationException();
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/4e015d45/core/src/main/java/org/apache/oozie/executor/jpa/BundleJobQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/BundleJobQueryExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/BundleJobQueryExecutor.java
index 319aea0..36cd968 100644
--- a/core/src/main/java/org/apache/oozie/executor/jpa/BundleJobQueryExecutor.java
+++ b/core/src/main/java/org/apache/oozie/executor/jpa/BundleJobQueryExecutor.java
@@ -20,6 +20,7 @@ package org.apache.oozie.executor.jpa;
 import java.sql.Timestamp;
 import java.util.ArrayList;
 import java.util.List;
+
 import javax.persistence.EntityManager;
 import javax.persistence.Query;
 
@@ -227,4 +228,9 @@ public class BundleJobQueryExecutor extends QueryExecutor<BundleJobBean, BundleJ
             instance = null;
         }
     }
+
+    @Override
+    public Object getSingleValue(BundleJobQuery namedQuery, Object... parameters) throws JPAExecutorException {
+        throw new UnsupportedOperationException();
+    }
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/4e015d45/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionQueryExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionQueryExecutor.java
index d56af7b..3008393 100644
--- a/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionQueryExecutor.java
+++ b/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionQueryExecutor.java
@@ -258,4 +258,9 @@ public class CoordActionQueryExecutor extends
             instance = null;
         }
     }
+
+    @Override
+    public Object getSingleValue(CoordActionQuery namedQuery, Object... parameters) throws JPAExecutorException {
+        throw new UnsupportedOperationException();
+    }
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/4e015d45/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobQueryExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobQueryExecutor.java
index cddeaf7..04e6e29 100644
--- a/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobQueryExecutor.java
+++ b/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobQueryExecutor.java
@@ -372,4 +372,9 @@ public class CoordJobQueryExecutor extends QueryExecutor<CoordinatorJobBean, Coo
         }
     }
 
+    @Override
+    public Object getSingleValue(CoordJobQuery namedQuery, Object... parameters) throws JPAExecutorException {
+        throw new UnsupportedOperationException();
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/4e015d45/core/src/main/java/org/apache/oozie/executor/jpa/QueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/QueryExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/QueryExecutor.java
index 536743b..e319c07 100644
--- a/core/src/main/java/org/apache/oozie/executor/jpa/QueryExecutor.java
+++ b/core/src/main/java/org/apache/oozie/executor/jpa/QueryExecutor.java
@@ -72,4 +72,7 @@ public abstract class QueryExecutor<T, E extends Enum<E>> {
     public abstract Query getSelectQuery(E namedQuery, EntityManager em, Object... parameters)
             throws JPAExecutorException;
 
+    public abstract Object getSingleValue(E namedQuery, Object... parameters)
+            throws JPAExecutorException;
+
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/4e015d45/core/src/main/java/org/apache/oozie/executor/jpa/SLARegistrationQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/SLARegistrationQueryExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/SLARegistrationQueryExecutor.java
index e3b115f..3a43d7e 100644
--- a/core/src/main/java/org/apache/oozie/executor/jpa/SLARegistrationQueryExecutor.java
+++ b/core/src/main/java/org/apache/oozie/executor/jpa/SLARegistrationQueryExecutor.java
@@ -18,6 +18,7 @@
 package org.apache.oozie.executor.jpa;
 
 import java.util.List;
+
 import javax.persistence.EntityManager;
 import javax.persistence.Query;
 
@@ -35,7 +36,9 @@ import com.google.common.annotations.VisibleForTesting;
 public class SLARegistrationQueryExecutor extends QueryExecutor<SLARegistrationBean, SLARegistrationQueryExecutor.SLARegQuery> {
 
     public enum SLARegQuery {
-        UPDATE_SLA_REG_ALL
+        UPDATE_SLA_REG_ALL,
+        GET_SLA_REG_ALL,
+        GET_SLA_REG_ON_RESTART
     };
 
     private static SLARegistrationQueryExecutor instance = new SLARegistrationQueryExecutor();
@@ -86,8 +89,17 @@ public class SLARegistrationQueryExecutor extends QueryExecutor<SLARegistrationB
     @Override
     public Query getSelectQuery(SLARegQuery namedQuery, EntityManager em, Object... parameters)
             throws JPAExecutorException {
-        // TODO
-        return null;
+        Query query = em.createNamedQuery(namedQuery.name());
+        switch (namedQuery) {
+            case GET_SLA_REG_ALL:
+            case GET_SLA_REG_ON_RESTART:
+                query.setParameter("id", parameters[0]);
+                break;
+            default:
+                throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot set parameters for "
+                        + namedQuery.name());
+        }
+        return query;
     }
 
     @Override
@@ -102,8 +114,11 @@ public class SLARegistrationQueryExecutor extends QueryExecutor<SLARegistrationB
     public SLARegistrationBean get(SLARegQuery namedQuery, Object... parameters) throws JPAExecutorException {
         EntityManager em = jpaService.getEntityManager();
         Query query = getSelectQuery(namedQuery, em, parameters);
-        @SuppressWarnings("unchecked")
-        SLARegistrationBean bean = (SLARegistrationBean) jpaService.executeGet(namedQuery.name(), query, em);
+        Object ret = jpaService.executeGet(namedQuery.name(), query, em);
+        if (ret == null && !namedQuery.equals(SLARegQuery.GET_SLA_REG_ALL)) {
+            throw new JPAExecutorException(ErrorCode.E0604, query.toString());
+        }
+        SLARegistrationBean bean = constructBean(namedQuery, ret, parameters);
         return bean;
     }
 
@@ -117,6 +132,32 @@ public class SLARegistrationQueryExecutor extends QueryExecutor<SLARegistrationB
         return beanList;
     }
 
+    private SLARegistrationBean constructBean(SLARegQuery namedQuery, Object ret, Object... parameters)
+            throws JPAExecutorException {
+        SLARegistrationBean bean;
+        Object[] arr;
+        switch (namedQuery) {
+            case GET_SLA_REG_ALL:
+                bean = (SLARegistrationBean) ret;
+                if(bean != null) {
+                    bean.setSlaConfig(bean.getSlaConfig());
+                }
+                break;
+            case GET_SLA_REG_ON_RESTART:
+                bean = new SLARegistrationBean();
+                arr = (Object[]) ret;
+                bean.setNotificationMsg((String) arr[0]);
+                bean.setUpstreamApps((String) arr[1]);
+                bean.setSlaConfig((String) arr[2]);
+                bean.setJobData((String) arr[3]);
+                break;
+            default:
+                throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot construct job bean for "
+                        + namedQuery.name());
+        }
+        return bean;
+    }
+
     @VisibleForTesting
     public static void destroy() {
         if (instance != null) {
@@ -125,4 +166,8 @@ public class SLARegistrationQueryExecutor extends QueryExecutor<SLARegistrationB
         }
     }
 
+    @Override
+    public Object getSingleValue(SLARegQuery namedQuery, Object... parameters) throws JPAExecutorException {
+        throw new UnsupportedOperationException();
+    }
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/4e015d45/core/src/main/java/org/apache/oozie/executor/jpa/SLASummaryQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/SLASummaryQueryExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/SLASummaryQueryExecutor.java
index 79d11ed..f60e489 100644
--- a/core/src/main/java/org/apache/oozie/executor/jpa/SLASummaryQueryExecutor.java
+++ b/core/src/main/java/org/apache/oozie/executor/jpa/SLASummaryQueryExecutor.java
@@ -18,12 +18,15 @@
 package org.apache.oozie.executor.jpa;
 
 import java.util.List;
+
 import javax.persistence.EntityManager;
 import javax.persistence.Query;
 
 import org.apache.oozie.ErrorCode;
+import org.apache.oozie.executor.jpa.SLARegistrationQueryExecutor.SLARegQuery;
 import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.Services;
+import org.apache.oozie.sla.SLARegistrationBean;
 import org.apache.oozie.sla.SLASummaryBean;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -36,8 +39,11 @@ public class SLASummaryQueryExecutor extends QueryExecutor<SLASummaryBean, SLASu
 
     public enum SLASummaryQuery {
         UPDATE_SLA_SUMMARY_FOR_STATUS_ACTUAL_TIMES,
+        UPDATE_SLA_SUMMARY_FOR_ACTUAL_TIMES,
         UPDATE_SLA_SUMMARY_ALL,
-        GET_SLA_SUMMARY
+        UPDATE_SLA_SUMMARY_EVENTPROCESSED,
+        GET_SLA_SUMMARY,
+        GET_SLA_SUMMARY_EVENTPROCESSED
     };
 
     private static SLASummaryQueryExecutor instance = new SLASummaryQueryExecutor();
@@ -73,6 +79,14 @@ public class SLASummaryQueryExecutor extends QueryExecutor<SLASummaryBean, SLASu
                 query.setParameter("actualEndTS", bean.getActualEndTimestamp());
                 query.setParameter("actualDuration", bean.getActualDuration());
                 break;
+            case UPDATE_SLA_SUMMARY_FOR_ACTUAL_TIMES:
+                query.setParameter("jobId", bean.getId());
+                query.setParameter("eventProcessed", bean.getEventProcessed());
+                query.setParameter("actualStartTS", bean.getActualStartTimestamp());
+                query.setParameter("actualEndTS", bean.getActualEndTimestamp());
+                query.setParameter("actualDuration", bean.getActualDuration());
+                query.setParameter("lastModifiedTS", bean.getLastModifiedTimestamp());
+                break;
             case UPDATE_SLA_SUMMARY_ALL:
                 query.setParameter("appName", bean.getAppName());
                 query.setParameter("appType", bean.getAppType().toString());
@@ -92,6 +106,10 @@ public class SLASummaryQueryExecutor extends QueryExecutor<SLASummaryBean, SLASu
                 query.setParameter("actualStartTS", bean.getActualStartTimestamp());
                 query.setParameter("jobId", bean.getId());
                 break;
+            case UPDATE_SLA_SUMMARY_EVENTPROCESSED:
+                query.setParameter("eventProcessed", bean.getEventProcessed());
+                query.setParameter("jobId", bean.getId());
+                break;
             default:
                 throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot set parameters for "
                         + namedQuery.name());
@@ -105,6 +123,7 @@ public class SLASummaryQueryExecutor extends QueryExecutor<SLASummaryBean, SLASu
         Query query = em.createNamedQuery(namedQuery.name());
         switch (namedQuery) {
             case GET_SLA_SUMMARY:
+            case GET_SLA_SUMMARY_EVENTPROCESSED:
                 query.setParameter("id", parameters[0]);
                 break;
         }
@@ -123,8 +142,11 @@ public class SLASummaryQueryExecutor extends QueryExecutor<SLASummaryBean, SLASu
     public SLASummaryBean get(SLASummaryQuery namedQuery, Object... parameters) throws JPAExecutorException {
         EntityManager em = jpaService.getEntityManager();
         Query query = getSelectQuery(namedQuery, em, parameters);
-        @SuppressWarnings("unchecked")
-        SLASummaryBean bean = (SLASummaryBean) jpaService.executeGet(namedQuery.name(), query, em);
+        Object ret = jpaService.executeGet(namedQuery.name(), query, em);
+        if (ret == null && !namedQuery.equals(SLASummaryQuery.GET_SLA_SUMMARY)) {
+            throw new JPAExecutorException(ErrorCode.E0604, query.toString());
+        }
+        SLASummaryBean bean = constructBean(namedQuery, ret, parameters);
         return bean;
     }
 
@@ -137,6 +159,35 @@ public class SLASummaryQueryExecutor extends QueryExecutor<SLASummaryBean, SLASu
         return beanList;
     }
 
+    @Override
+    public Object getSingleValue(SLASummaryQuery namedQuery, Object... parameters) throws JPAExecutorException {
+        EntityManager em = jpaService.getEntityManager();
+        Query query = getSelectQuery(namedQuery, em, parameters);
+        Object ret = jpaService.executeGet(namedQuery.name(), query, em);
+        if (ret == null) {
+            throw new JPAExecutorException(ErrorCode.E0604, query.toString());
+        }
+        return ret;
+    }
+
+    private SLASummaryBean constructBean(SLASummaryQuery namedQuery, Object ret, Object... parameters)
+            throws JPAExecutorException {
+        SLASummaryBean bean;
+        switch (namedQuery) {
+            case GET_SLA_SUMMARY:
+                bean = (SLASummaryBean) ret;
+                break;
+            case GET_SLA_SUMMARY_EVENTPROCESSED:
+                bean = new SLASummaryBean();
+                bean.setEventProcessed(((Byte)ret).intValue());
+                break;
+            default:
+                throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot construct job bean for "
+                        + namedQuery.name());
+        }
+        return bean;
+    }
+
     @VisibleForTesting
     public static void destroy() {
         if (instance != null) {

http://git-wip-us.apache.org/repos/asf/oozie/blob/4e015d45/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionQueryExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionQueryExecutor.java
index 0c323a3..86f2c1b 100644
--- a/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionQueryExecutor.java
+++ b/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionQueryExecutor.java
@@ -20,6 +20,7 @@ package org.apache.oozie.executor.jpa;
 import java.sql.Timestamp;
 import java.util.ArrayList;
 import java.util.List;
+
 import javax.persistence.EntityManager;
 import javax.persistence.Query;
 
@@ -407,4 +408,9 @@ public class WorkflowActionQueryExecutor extends
             instance = null;
         }
     }
+
+    /**
+     * Single-value lookups are not implemented by this executor.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public Object getSingleValue(WorkflowActionQuery namedQuery, Object... parameters) throws JPAExecutorException {
+        throw new UnsupportedOperationException();
+    }
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/4e015d45/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobQueryExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobQueryExecutor.java
index e7d42e9..e2a9438 100644
--- a/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobQueryExecutor.java
+++ b/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobQueryExecutor.java
@@ -50,6 +50,7 @@ public class WorkflowJobQueryExecutor extends QueryExecutor<WorkflowJobBean, Wor
         UPDATE_WORKFLOW_RERUN,
         GET_WORKFLOW,
         GET_WORKFLOW_STARTTIME,
+        GET_WORKFLOW_START_END_TIME,
         GET_WORKFLOW_USER_GROUP,
         GET_WORKFLOW_SUSPEND,
         GET_WORKFLOW_ACTION_OP,
@@ -170,6 +171,7 @@ public class WorkflowJobQueryExecutor extends QueryExecutor<WorkflowJobBean, Wor
         switch (namedQuery) {
             case GET_WORKFLOW:
             case GET_WORKFLOW_STARTTIME:
+            case GET_WORKFLOW_START_END_TIME:
             case GET_WORKFLOW_USER_GROUP:
             case GET_WORKFLOW_SUSPEND:
             case GET_WORKFLOW_ACTION_OP:
@@ -212,6 +214,13 @@ public class WorkflowJobQueryExecutor extends QueryExecutor<WorkflowJobBean, Wor
                 bean.setId((String) arr[0]);
                 bean.setStartTime(DateUtils.toDate((Timestamp) arr[1]));
                 break;
+            case GET_WORKFLOW_START_END_TIME:
+                bean = new WorkflowJobBean();
+                arr = (Object[]) ret;
+                bean.setId((String) arr[0]);
+                bean.setStartTime(DateUtils.toDate((Timestamp) arr[1]));
+                bean.setEndTime(DateUtils.toDate((Timestamp) arr[2]));
+                break;
             case GET_WORKFLOW_USER_GROUP:
                 bean = new WorkflowJobBean();
                 arr = (Object[]) ret;
@@ -353,4 +362,9 @@ public class WorkflowJobQueryExecutor extends QueryExecutor<WorkflowJobBean, Wor
             instance = null;
         }
     }
+
+    /**
+     * Single-value lookups are not implemented by this executor.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public Object getSingleValue(WorkflowJobQuery namedQuery, Object... parameters) throws JPAExecutorException {
+        throw new UnsupportedOperationException();
+    }
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/4e015d45/core/src/main/java/org/apache/oozie/executor/jpa/sla/SLARegistrationGetJPAExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/sla/SLARegistrationGetJPAExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/sla/SLARegistrationGetJPAExecutor.java
deleted file mode 100644
index 7b4e177..0000000
--- a/core/src/main/java/org/apache/oozie/executor/jpa/sla/SLARegistrationGetJPAExecutor.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.oozie.executor.jpa.sla;
-
-import java.util.List;
-
-import javax.persistence.EntityManager;
-import javax.persistence.Query;
-
-import org.apache.oozie.ErrorCode;
-import org.apache.oozie.executor.jpa.JPAExecutor;
-import org.apache.oozie.executor.jpa.JPAExecutorException;
-import org.apache.oozie.sla.SLARegistrationBean;
-
-/**
- * Load the list of SLARegistrationBean and return the list.
- */
-public class SLARegistrationGetJPAExecutor implements JPAExecutor<SLARegistrationBean> {
-
-    private String id = null;
-
-    public SLARegistrationGetJPAExecutor(String id) {
-        this.id = id;
-    }
-
-    @Override
-    public String getName() {
-        return "SLARegistrationGetJPAExecutor";
-    }
-
-    @SuppressWarnings("unchecked")
-    @Override
-    public SLARegistrationBean execute(EntityManager em) throws JPAExecutorException {
-        List<SLARegistrationBean> regBeans;
-        try {
-            Query q = em.createNamedQuery("GET_SLA_REG_ALL");
-            q.setParameter("id", id);
-            regBeans = q.getResultList();
-        }
-        catch (Exception e) {
-            throw new JPAExecutorException(ErrorCode.E0603, e.getMessage(), e);
-        }
-        SLARegistrationBean slaRegBean = null;
-        if (regBeans != null && regBeans.size() > 0) {
-            slaRegBean = regBeans.get(0);
-            slaRegBean.setSlaConfig(slaRegBean.getSlaConfig());
-        }
-        return slaRegBean;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/oozie/blob/4e015d45/core/src/main/java/org/apache/oozie/executor/jpa/sla/SLARegistrationGetOnRestartJPAExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/sla/SLARegistrationGetOnRestartJPAExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/sla/SLARegistrationGetOnRestartJPAExecutor.java
deleted file mode 100644
index 1510daf..0000000
--- a/core/src/main/java/org/apache/oozie/executor/jpa/sla/SLARegistrationGetOnRestartJPAExecutor.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.oozie.executor.jpa.sla;
-
-import javax.persistence.EntityManager;
-import javax.persistence.Query;
-
-import org.apache.oozie.ErrorCode;
-import org.apache.oozie.executor.jpa.JPAExecutor;
-import org.apache.oozie.executor.jpa.JPAExecutorException;
-import org.apache.oozie.sla.SLARegistrationBean;
-
-/**
- * Load SLARegistrationBean on restart
- */
-public class SLARegistrationGetOnRestartJPAExecutor implements JPAExecutor<SLARegistrationBean> {
-
-    private String id;
-
-    public SLARegistrationGetOnRestartJPAExecutor(String id) {
-        this.id = id;
-    }
-
-    @Override
-    public String getName() {
-        return "SLARegistrationGetOnRestartJPAExecutor";
-    }
-
-    @Override
-    public SLARegistrationBean execute(EntityManager em) throws JPAExecutorException {
-        try {
-            Query q = em.createNamedQuery("GET_SLA_REG_ON_RESTART");
-            q.setParameter("id", id);
-            Object[] obj = (Object[]) q.getSingleResult();
-            return getBeanFromObj(obj);
-        }
-        catch (Exception e) {
-            throw new JPAExecutorException(ErrorCode.E0603, e.getMessage(), e);
-        }
-    }
-
-    private SLARegistrationBean getBeanFromObj(Object[] arr) {
-        SLARegistrationBean bean = new SLARegistrationBean();
-        if (arr[0] != null) {
-            bean.setNotificationMsg((String) arr[0]);
-        }
-        if (arr[1] != null) {
-            bean.setUpstreamApps((String) arr[1]);
-        }
-        if (arr[2] != null) {
-            bean.setSlaConfig((String) arr[2]);
-        }
-        if (arr[3] != null) {
-            bean.setJobData((String) arr[3]);
-        }
-        return bean;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/oozie/blob/4e015d45/core/src/main/java/org/apache/oozie/executor/jpa/sla/SLASummaryGetJPAExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/sla/SLASummaryGetJPAExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/sla/SLASummaryGetJPAExecutor.java
deleted file mode 100644
index 1f7cb4d..0000000
--- a/core/src/main/java/org/apache/oozie/executor/jpa/sla/SLASummaryGetJPAExecutor.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.oozie.executor.jpa.sla;
-
-import java.util.List;
-
-import javax.persistence.EntityManager;
-import javax.persistence.Query;
-
-import org.apache.oozie.ErrorCode;
-import org.apache.oozie.executor.jpa.JPAExecutor;
-import org.apache.oozie.executor.jpa.JPAExecutorException;
-import org.apache.oozie.sla.SLASummaryBean;
-
-/**
- * Load the list of SLASummaryBean (for dashboard) and return the list.
- */
-public class SLASummaryGetJPAExecutor implements JPAExecutor<SLASummaryBean> {
-
-    private String id = null;
-
-    public SLASummaryGetJPAExecutor(String id) {
-        this.id = id;
-    }
-
-    @Override
-    public String getName() {
-        return "SLASummaryGetJPAExecutor";
-    }
-
-    @SuppressWarnings("unchecked")
-    @Override
-    public SLASummaryBean execute(EntityManager em) throws JPAExecutorException {
-        List<SLASummaryBean> ssBeans;
-        Query q;
-        try {
-            q = em.createNamedQuery("GET_SLA_SUMMARY");
-            q.setParameter("id", id);
-            ssBeans = q.getResultList();
-        }
-        catch (Exception e) {
-            throw new JPAExecutorException(ErrorCode.E0603, e.getMessage(), e);
-        }
-        SLASummaryBean slaSummaryBean = null;
-        if (ssBeans != null && ssBeans.size() > 0) {
-            slaSummaryBean = ssBeans.get(0);
-        }
-        return slaSummaryBean;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/oozie/blob/4e015d45/core/src/main/java/org/apache/oozie/service/JPAService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/JPAService.java b/core/src/main/java/org/apache/oozie/service/JPAService.java
index 1b8f53e..857ec29 100644
--- a/core/src/main/java/org/apache/oozie/service/JPAService.java
+++ b/core/src/main/java/org/apache/oozie/service/JPAService.java
@@ -71,7 +71,6 @@ public class JPAService implements Service, Instrumentable {
     public static final String CONF_PASSWORD = CONF_PREFIX + "jdbc.password";
     public static final String CONF_CONN_DATA_SOURCE = CONF_PREFIX + "connection.data.source";
     public static final String CONF_CONN_PROPERTIES = CONF_PREFIX + "connection.properties";
-
     public static final String CONF_MAX_ACTIVE_CONN = CONF_PREFIX + "pool.max.active.conn";
     public static final String CONF_CREATE_DB_SCHEMA = CONF_PREFIX + "create.db.schema";
     public static final String CONF_VALIDATE_DB_CONN = CONF_PREFIX + "validate.db.connection";

http://git-wip-us.apache.org/repos/asf/oozie/blob/4e015d45/core/src/main/java/org/apache/oozie/service/JobsConcurrencyService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/JobsConcurrencyService.java b/core/src/main/java/org/apache/oozie/service/JobsConcurrencyService.java
index 36adbd6..69025fc 100644
--- a/core/src/main/java/org/apache/oozie/service/JobsConcurrencyService.java
+++ b/core/src/main/java/org/apache/oozie/service/JobsConcurrencyService.java
@@ -20,6 +20,7 @@ package org.apache.oozie.service;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+
 import org.apache.oozie.util.ConfigUtils;
 import org.apache.oozie.util.Instrumentable;
 import org.apache.oozie.util.Instrumentation;

http://git-wip-us.apache.org/repos/asf/oozie/blob/4e015d45/core/src/main/java/org/apache/oozie/service/ZKJobsConcurrencyService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/ZKJobsConcurrencyService.java b/core/src/main/java/org/apache/oozie/service/ZKJobsConcurrencyService.java
index 1d5f4a4..58580e5 100644
--- a/core/src/main/java/org/apache/oozie/service/ZKJobsConcurrencyService.java
+++ b/core/src/main/java/org/apache/oozie/service/ZKJobsConcurrencyService.java
@@ -23,6 +23,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+
 import org.apache.curator.x.discovery.ServiceInstance;
 import org.apache.oozie.ErrorCode;
 import org.apache.oozie.client.rest.RestConstants;

http://git-wip-us.apache.org/repos/asf/oozie/blob/4e015d45/core/src/main/java/org/apache/oozie/sla/SLACalcStatus.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/sla/SLACalcStatus.java b/core/src/main/java/org/apache/oozie/sla/SLACalcStatus.java
index ea53712..f148db3 100644
--- a/core/src/main/java/org/apache/oozie/sla/SLACalcStatus.java
+++ b/core/src/main/java/org/apache/oozie/sla/SLACalcStatus.java
@@ -19,8 +19,19 @@
 package org.apache.oozie.sla;
 
 import java.util.Date;
+
 import org.apache.oozie.AppType;
+import org.apache.oozie.ErrorCode;
 import org.apache.oozie.client.event.SLAEvent;
+import org.apache.oozie.command.CommandException;
+import org.apache.oozie.lock.LockToken;
+import org.apache.oozie.service.InstrumentationService;
+import org.apache.oozie.service.JobsConcurrencyService;
+import org.apache.oozie.service.MemoryLocksService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.sla.service.SLAService;
+import org.apache.oozie.util.Instrumentation;
+import org.apache.oozie.util.XLog;
 
 /**
  * Class used by SLAService to store SLA objects and perform calculations and
@@ -28,6 +39,7 @@ import org.apache.oozie.client.event.SLAEvent;
  */
 public class SLACalcStatus extends SLAEvent {
 
+    public static String SLA_ENTITYKEY_PREFIX = "sla-";
     private SLARegistrationBean regBean;
     private String jobStatus;
     private SLAStatus slaStatus;
@@ -37,6 +49,7 @@ public class SLACalcStatus extends SLAEvent {
     private long actualDuration = -1;
     private Date lastModifiedTime;
     private byte eventProcessed;
+    private LockToken lock;
 
     public SLACalcStatus(SLARegistrationBean reg) {
         this();
@@ -264,4 +277,55 @@ public class SLACalcStatus extends SLAEvent {
         return lastModifiedTime;
     }
 
+    /**
+     * Build the key identifying this SLA entry: {@link #SLA_ENTITYKEY_PREFIX} followed by the id.
+     *
+     * @return the entity key, used by {@link #acquireLock()} as the lock name
+     */
+    public String getEntityKey() {
+        String id = getId();
+        return SLA_ENTITYKEY_PREFIX + id;
+    }
+    /**
+     * Obtain an exclusive lock on the {@link #getEntityKey}.
+     * <p>
+     * A timeout of {@link #getLockTimeOut} is used when trying to obtain the lock. When the server is not in
+     * highly-available mode the lock service is not consulted and a no-op token is stored instead. If the lock
+     * service returns <code>null</code>, this is only logged at debug level and {@link #isLocked} reports
+     * <code>false</code>.
+     *
+     * @throws InterruptedException thrown if an interruption happened while trying to obtain the lock
+     */
+    public void acquireLock() throws InterruptedException {
+        // only get ZK lock when multiple servers running
+        if (Services.get().get(JobsConcurrencyService.class).isHighlyAvailableMode()) {
+            lock = Services.get().get(MemoryLocksService.class).getWriteLock(getEntityKey(), getLockTimeOut());
+            if (lock == null) {
+                XLog.getLog(getClass()).debug("Could not aquire lock for [{0}]", getEntityKey());
+            }
+            else {
+                XLog.getLog(getClass()).debug("Acquired lock for [{0}]", getEntityKey());
+            }
+        }
+        else {
+            // single server: nothing to coordinate with, so hold a token whose release() does nothing
+            lock = new DummyToken();
+        }
+    }
+
+    /**
+     * No-op {@link LockToken} stored by {@link #acquireLock()} when the server is not in
+     * highly-available mode.
+     */
+    private static class DummyToken implements LockToken {
+        @Override
+        public void release() {
+            // nothing to release
+        }
+    }
+
+    /**
+     * Tell whether this object currently holds a lock token.
+     *
+     * @return <code>true</code> if a token is held, <code>false</code> otherwise
+     */
+    public boolean isLocked() {
+        return lock != null;
+    }
+
+    /**
+     * Release the held lock token, if any, and forget it.
+     */
+    public void releaseLock() {
+        if (lock == null) {
+            // nothing is held
+            return;
+        }
+        lock.release();
+        lock = null;
+        XLog.getLog(getClass()).debug("Released lock for [{0}]", getEntityKey());
+    }
+
+    /**
+     * Read the lock timeout from the server configuration.
+     *
+     * @return the configured {@link SLAService#CONF_SLA_CALC_LOCK_TIMEOUT}; <code>5 * 1000</code> is supplied
+     *         as the default value of the lookup
+     */
+    public long getLockTimeOut() {
+        final long fallback = 5 * 1000;
+        return Services.get().getConf().getLong(SLAService.CONF_SLA_CALC_LOCK_TIMEOUT, fallback);
+    }
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/4e015d45/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java b/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java
index 618d899..47c723d 100644
--- a/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java
+++ b/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java
@@ -31,9 +31,11 @@ import java.util.concurrent.ConcurrentHashMap;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.oozie.AppType;
 import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.CoordinatorJobBean;
 import org.apache.oozie.ErrorCode;
 import org.apache.oozie.WorkflowActionBean;
 import org.apache.oozie.WorkflowJobBean;
+import org.apache.oozie.XException;
 import org.apache.oozie.client.CoordinatorAction;
 import org.apache.oozie.client.WorkflowAction;
 import org.apache.oozie.client.WorkflowJob;
@@ -43,24 +45,37 @@ import org.apache.oozie.client.event.SLAEvent.SLAStatus;
 import org.apache.oozie.client.rest.JsonBean;
 import org.apache.oozie.executor.jpa.BatchQueryExecutor;
 import org.apache.oozie.executor.jpa.CoordActionGetForSLAJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
+import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
+import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
+import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.executor.jpa.SLARegistrationQueryExecutor;
+import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor;
+import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
 import org.apache.oozie.executor.jpa.SLARegistrationQueryExecutor.SLARegQuery;
 import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor;
 import org.apache.oozie.executor.jpa.WorkflowActionGetForSLAJPAExecutor;
 import org.apache.oozie.executor.jpa.WorkflowJobGetForSLAJPAExecutor;
-import org.apache.oozie.executor.jpa.sla.SLARegistrationGetOnRestartJPAExecutor;
-import org.apache.oozie.executor.jpa.sla.SLASummaryGetJPAExecutor;
+import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor.WorkflowActionQuery;
+import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
 import org.apache.oozie.executor.jpa.sla.SLASummaryGetRecordsOnRestartJPAExecutor;
 import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor.SLASummaryQuery;
 import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry;
+import org.apache.oozie.lock.LockToken;
 import org.apache.oozie.service.EventHandlerService;
 import org.apache.oozie.service.JPAService;
+import org.apache.oozie.service.JobsConcurrencyService;
+import org.apache.oozie.service.MemoryLocksService;
+import org.apache.oozie.service.SchedulerService;
 import org.apache.oozie.service.ServiceException;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.sla.service.SLAService;
 import org.apache.oozie.util.DateUtils;
 import org.apache.oozie.util.XLog;
 
+import com.google.common.annotations.VisibleForTesting;
+
 
 /**
  * Implementation class for SLACalculator that calculates SLA related to
@@ -70,11 +85,11 @@ public class SLACalculatorMemory implements SLACalculator {
 
     private static final XLog LOG = XLog.getLog(SLACalculatorMemory.class);
     // TODO optimization priority based insertion/processing/bumping up-down
-    private static Map<String, SLACalcStatus> slaMap;
-    private static Set<String> historySet;
+    protected Map<String, SLACalcStatus> slaMap;
+    protected static Set<String> historySet;
     private static int capacity;
     private static JPAService jpaService;
-    private EventHandlerService eventHandler;
+    protected EventHandlerService eventHandler;
     private static int modifiedAfter;
     private static long jobEventLatency;
 
@@ -89,7 +104,154 @@ public class SLACalculatorMemory implements SLACalculator {
         // load events modified after
         modifiedAfter = conf.getInt(SLAService.CONF_EVENTS_MODIFIED_AFTER, 7);
         loadOnRestart();
+        Runnable purgeThread = new HistoryPurgeWorker();
+        // schedule runnable by default 1 day
+        Services.get()
+                .get(SchedulerService.class)
+                .schedule(purgeThread, 86400, Services.get().getConf().getInt(SLAService.CONF_SLA_HISTORY_PURGE_INTERVAL, 86400),
+                        SchedulerService.Unit.SEC);
+    }
+
+    public class HistoryPurgeWorker implements Runnable {
+
+        /**
+         * Create a purge worker. The worker declares no fields of its own.
+         */
+        public HistoryPurgeWorker() {
+        }
+
+        /**
+         * Walk the history set once and drop entries that no longer need to be kept.
+         * <p>
+         * Each id is looked up according to its shape (ends with "-W", contains "-W@", contains "-C@", ends
+         * with "-C"). An entry is removed when the lookup fails with the error code checked in that branch
+         * (E0604 or E0605), or when the fetched bean is in a terminal/complete state and its SLA summary was
+         * updated without error. Any other lookup or update failure is logged at info level and the entry is
+         * left in the set. Ids matching none of the shapes are left untouched.
+         * <p>
+         * NOTE(review): the job beans are fetched with status-oriented queries (GET_WORKFLOW_STATUS,
+         * GET_ACTION_COMPLETED, GET_COORD_JOB_STATUS_PARENTID) but their start/end times are read afterwards;
+         * confirm those queries populate the time fields.
+         */
+        @Override
+        public void run() {
+            // Do nothing if the executing thread has already been interrupted.
+            if (Thread.currentThread().isInterrupted()) {
+                return;
+            }
+            Iterator<String> jobItr = historySet.iterator();
+            while (jobItr.hasNext()) {
+                String jobId = jobItr.next();
+
+                // ids ending in "-W" are looked up as workflow jobs
+                if (jobId.endsWith("-W")) {
+                    WorkflowJobBean wfJob = null;
+                    try {
+                        wfJob = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW_STATUS, jobId);
+                    }
+                    catch (JPAExecutorException e) {
+                        // E0604 drops the entry; any other error keeps it (wfJob stays null)
+                        if (e.getErrorCode().equals(ErrorCode.E0604)) {
+                            jobItr.remove();
+                        }
+                        else {
+                            LOG.info("Failed to fetch the workflow job: " + jobId, e);
+                        }
+                    }
+                    if (wfJob != null && wfJob.inTerminalState()) {
+                        try {
+                            updateSLASummary(wfJob.getId(), wfJob.getStartTime(), wfJob.getEndTime());
+                            // only reached when the summary update did not throw
+                            jobItr.remove();
+                        }
+                        catch (JPAExecutorException e) {
+                            LOG.info("Failed to update SLASummaryBean when purging history set entry for " + jobId, e);
+                        }
 
+                    }
+                }
+                // ids containing "-W@" are looked up as workflow actions
+                else if (jobId.contains("-W@")) {
+                    WorkflowActionBean wfAction = null;
+                    try {
+                        wfAction = WorkflowActionQueryExecutor.getInstance().get(
+                                WorkflowActionQuery.GET_ACTION_COMPLETED, jobId);
+                    }
+                    catch (JPAExecutorException e) {
+                        // E0605 drops the entry; any other error keeps it
+                        if (e.getErrorCode().equals(ErrorCode.E0605)) {
+                            jobItr.remove();
+                        }
+                        else {
+                            LOG.info("Failed to fetch the workflow action: " + jobId, e);
+                        }
+                    }
+                    if (wfAction != null && (wfAction.isComplete() || wfAction.isTerminalWithFailure())) {
+                        try {
+                            updateSLASummary(wfAction.getId(), wfAction.getStartTime(), wfAction.getEndTime());
+                            jobItr.remove();
+                        }
+                        catch (JPAExecutorException e) {
+                            LOG.info("Failed to update SLASummaryBean when purging history set entry for " + jobId, e);
+                        }
+                    }
+                }
+                // ids containing "-C@" are looked up as coordinator actions
+                else if (jobId.contains("-C@")) {
+                    CoordinatorActionBean cAction = null;
+                    try {
+                        cAction = CoordActionQueryExecutor.getInstance().get(CoordActionQuery.GET_COORD_ACTION, jobId);
+                    }
+                    catch (JPAExecutorException e) {
+                        // E0605 drops the entry; any other error keeps it
+                        if (e.getErrorCode().equals(ErrorCode.E0605)) {
+                            jobItr.remove();
+                        }
+                        else {
+                            LOG.info("Failed to fetch the coord action: " + jobId, e);
+                        }
+                    }
+                    if (cAction != null && cAction.isTerminalStatus()) {
+                        try {
+                            // the times come from the workflow referenced by the action's external id
+                            updateSLASummaryForCoordAction(cAction);
+                            jobItr.remove();
+                        }
+                        catch (JPAExecutorException e) {
+                            XLog.getLog(SLACalculatorMemory.class).info(
+                                    "Failed to update SLASummaryBean when purging history set entry for " + jobId, e);
+                        }
+
+                    }
+                }
+                // ids ending in "-C" are looked up as coordinator jobs
+                else if (jobId.endsWith("-C")) {
+                    CoordinatorJobBean cJob = null;
+                    try {
+                        cJob = CoordJobQueryExecutor.getInstance().get(CoordJobQuery.GET_COORD_JOB_STATUS_PARENTID,
+                                jobId);
+                    }
+                    catch (JPAExecutorException e) {
+                        // E0604 drops the entry; any other error keeps it
+                        if (e.getErrorCode().equals(ErrorCode.E0604)) {
+                            jobItr.remove();
+                        }
+                        else {
+                            LOG.info("Failed to fetch the coord job: " + jobId, e);
+                        }
+                    }
+                    if (cJob != null && cJob.isTerminalStatus()) {
+                        try {
+                            updateSLASummary(cJob.getId(), cJob.getStartTime(), cJob.getEndTime());
+                            jobItr.remove();
+                        }
+                        catch (JPAExecutorException e) {
+                            LOG.info("Failed to update SLASummaryBean when purging history set entry for " + jobId, e);
+                        }
+
+                    }
+                }
+            }
+        }
+
+        /**
+         * Store the given actual start/end times on the SLA summary of <code>id</code>, if one is found.
+         * <p>
+         * The actual duration is only set when both times are known. The summary is also stamped with the
+         * current time as last-modified and its event-processed value is set to 8 before it is written
+         * with UPDATE_SLA_SUMMARY_FOR_ACTUAL_TIMES.
+         *
+         * @param id id of the SLA summary to update
+         * @param startTime actual start time, may be <code>null</code>
+         * @param endTime actual end time, may be <code>null</code>
+         * @throws JPAExecutorException if reading or updating the summary fails
+         */
+        private void updateSLASummary(String id, Date startTime, Date endTime) throws JPAExecutorException {
+            SLASummaryBean summary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, id);
+            if (summary == null) {
+                // no summary for this id: nothing to update
+                return;
+            }
+            summary.setActualStart(startTime);
+            summary.setActualEnd(endTime);
+            boolean bothTimesKnown = startTime != null && endTime != null;
+            if (bothTimesKnown) {
+                summary.setActualDuration(endTime.getTime() - startTime.getTime());
+            }
+            summary.setLastModifiedTime(new Date());
+            summary.setEventProcessed(8);
+            SLASummaryQueryExecutor.getInstance().executeUpdate(
+                    SLASummaryQuery.UPDATE_SLA_SUMMARY_FOR_ACTUAL_TIMES, summary);
+        }
+
+        /**
+         * Update the SLA summary of a coordinator action with the start and end time of the workflow job
+         * referenced by the action's external id. Does nothing if the action has no external id or the
+         * workflow lookup returns <code>null</code>.
+         *
+         * @param bean coordinator action whose SLA summary is to be updated
+         * @throws JPAExecutorException if the workflow lookup or the summary update fails
+         */
+        private void updateSLASummaryForCoordAction(CoordinatorActionBean bean) throws JPAExecutorException {
+            String externalId = bean.getExternalId();
+            if (externalId == null) {
+                return;
+            }
+            WorkflowJobBean workflow = WorkflowJobQueryExecutor.getInstance().get(
+                    WorkflowJobQuery.GET_WORKFLOW_START_END_TIME, externalId);
+            if (workflow == null) {
+                return;
+            }
+            updateSLASummary(bean.getId(), workflow.getStartTime(), workflow.getEndTime());
+        }
     }
 
     private void loadOnRestart() {
@@ -101,28 +263,50 @@ public class SLACalculatorMemory implements SLACalculator {
                     modifiedAfter));
             for (SLASummaryBean summaryBean : summaryBeans) {
                 String jobId = summaryBean.getId();
-                try {
-                    switch (summaryBean.getAppType()) {
-                        case COORDINATOR_ACTION:
-                            isJobModified = processSummaryBeanForCoordAction(summaryBean, jobId);
-                            break;
-                        case WORKFLOW_ACTION:
-                            isJobModified = processSummaryBeanForWorkflowAction(summaryBean, jobId);
-                            break;
-                        case WORKFLOW_JOB:
-                            isJobModified = processSummaryBeanForWorkflowJob(summaryBean, jobId);
-                            break;
-                        default:
-                            break;
-                    }
-                    if (isJobModified) {
-                        summaryBean.setLastModifiedTime(new Date());
-                        SLASummaryQueryExecutor.getInstance().executeUpdate(
-                                SLASummaryQuery.UPDATE_SLA_SUMMARY_FOR_STATUS_ACTUAL_TIMES, summaryBean);
-                    }
+                LockToken lock = null;
+                switch (summaryBean.getAppType()) {
+                    case COORDINATOR_ACTION:
+                        isJobModified = processSummaryBeanForCoordAction(summaryBean, jobId);
+                        break;
+                    case WORKFLOW_ACTION:
+                        isJobModified = processSummaryBeanForWorkflowAction(summaryBean, jobId);
+                        break;
+                    case WORKFLOW_JOB:
+                        isJobModified = processSummaryBeanForWorkflowJob(summaryBean, jobId);
+                        break;
+                    default:
+                        break;
                 }
-                catch (Exception e) {
-                    LOG.warn("Failed to load records for " + jobId, e);
+                if (isJobModified) {
+                    try {
+                        boolean update = true;
+                        if (Services.get().get(JobsConcurrencyService.class).isHighlyAvailableMode()) {
+                            lock = Services
+                                    .get()
+                                    .get(MemoryLocksService.class)
+                                    .getWriteLock(
+                                            SLACalcStatus.SLA_ENTITYKEY_PREFIX + jobId,
+                                            Services.get().getConf()
+                                                    .getLong(SLAService.CONF_SLA_CALC_LOCK_TIMEOUT, 5 * 1000));
+                            if (lock == null) {
+                                update = false;
+                            }
+                        }
+                        if (update) {
+                            summaryBean.setLastModifiedTime(new Date());
+                            SLASummaryQueryExecutor.getInstance().executeUpdate(
+                                    SLASummaryQuery.UPDATE_SLA_SUMMARY_FOR_STATUS_ACTUAL_TIMES, summaryBean);
+                        }
+                    }
+                    catch (Exception e) {
+                        LOG.warn("Failed to load records for " + jobId, e);
+                    }
+                    finally {
+                        if (lock != null) {
+                            lock.release();
+                            lock = null;
+                        }
+                    }
                 }
                 try {
                     if (summaryBean.getEventProcessed() == 7) {
@@ -130,8 +314,8 @@ public class SLACalculatorMemory implements SLACalculator {
                         statusPendingCount++;
                     }
                     else if (summaryBean.getEventProcessed() <= 7) {
-                        SLARegistrationBean slaRegBean = jpaService.execute(new SLARegistrationGetOnRestartJPAExecutor(
-                                jobId));
+                        SLARegistrationBean slaRegBean = SLARegistrationQueryExecutor.getInstance().get(
+                                SLARegQuery.GET_SLA_REG_ON_RESTART, jobId);
                         SLACalcStatus slaCalcStatus = new SLACalcStatus(summaryBean, slaRegBean);
                         slaMap.put(jobId, slaCalcStatus);
                         slaPendingCount++;
@@ -260,8 +444,8 @@ public class SLACalculatorMemory implements SLACalculator {
         SLACalcStatus memObj;
         memObj = slaMap.get(jobId);
         if (memObj == null && historySet.contains(jobId)) {
-            memObj = new SLACalcStatus(jpaService.execute(new SLASummaryGetJPAExecutor(jobId)),
-                    jpaService.execute(new SLARegistrationGetOnRestartJPAExecutor(jobId)));
+            memObj = new SLACalcStatus(SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId),
+                    SLARegistrationQueryExecutor.getInstance().get(SLARegQuery.GET_SLA_REG_ON_RESTART, jobId));
         }
         return memObj;
     }
@@ -285,11 +469,15 @@ public class SLACalculatorMemory implements SLACalculator {
     /**
      * Invoked via periodic run, update the SLA for registered jobs
      */
-    protected void updateJobSla(String jobId) throws JPAExecutorException, ServiceException {
+    protected void updateJobSla(String jobId) throws Exception {
         SLACalcStatus slaCalc = slaMap.get(jobId);
         synchronized (slaCalc) {
             boolean change = false;
-            byte eventProc = slaCalc.getEventProcessed();
+            // get eventProcessed on DB for validation in HA
+            Object eventProcObj = ((SLASummaryQueryExecutor) SLASummaryQueryExecutor.getInstance()).getSingleValue(
+                    SLASummaryQuery.GET_SLA_SUMMARY_EVENTPROCESSED, jobId);
+            byte eventProc = ((Byte) eventProcObj).byteValue();
+            slaCalc.setEventProcessed(eventProc);
             SLARegistrationBean reg = slaCalc.getSLARegistrationBean();
             // calculation w.r.t current time and status
             if ((eventProc & 1) == 0) { // first bit (start-processed) unset
@@ -297,8 +485,8 @@ public class SLACalculatorMemory implements SLACalculator {
                     if (reg.getExpectedStart().getTime() + jobEventLatency < System.currentTimeMillis()) {
                         confirmWithDB(slaCalc);
                         eventProc = slaCalc.getEventProcessed();
-                        if (eventProc != 8 && (eventProc & 1 ) == 0) {
-                            //Some DB exception
+                        if (eventProc != 8 && (eventProc & 1) == 0) {
+                            // Some DB exception
                             slaCalc.setEventStatus(EventStatus.START_MISS);
                             eventHandler.queueEvent(new SLACalcStatus(slaCalc));
                             eventProc++;
@@ -307,11 +495,13 @@ public class SLACalculatorMemory implements SLACalculator {
                     }
                 }
                 else {
-                    eventProc++; //disable further processing for optional start sla condition
+                    eventProc++; // disable further processing for optional
+                                 // start sla condition
                     change = true;
                 }
             }
-            if (((eventProc >> 1) & 1) == 0 && eventProc != 8) { // check if second bit (duration-processed) is unset
+            // check if second bit (duration-processed) is unset
+            if (((eventProc >> 1) & 1) == 0 && eventProc != 8) {
                 if (reg.getExpectedDuration() == -1) {
                     eventProc += 2;
                     change = true;
@@ -322,8 +512,8 @@ public class SLACalculatorMemory implements SLACalculator {
                         slaCalc.setEventProcessed(eventProc);
                         confirmWithDB(slaCalc);
                         eventProc = slaCalc.getEventProcessed();
-                        if (eventProc != 8 && ((eventProc >> 1) & 1 ) == 0) {
-                            //Some DB exception
+                        if (eventProc != 8 && ((eventProc >> 1) & 1) == 0) {
+                            // Some DB exception
                             slaCalc.setEventStatus(EventStatus.DURATION_MISS);
                             eventHandler.queueEvent(new SLACalcStatus(slaCalc));
                             eventProc += 2;
@@ -341,32 +531,51 @@ public class SLACalculatorMemory implements SLACalculator {
                 }
             }
             if (change) {
-                if (slaCalc.getEventProcessed() >= 8) { //no more processing, no transfer to history set
-                    eventProc = 8;
-                    slaCalc.setEventProcessed(8); // Should not be > 8. But to handle any corner cases.
-                    slaMap.remove(jobId);
+                try {
+                    boolean locked = true;
+                    slaCalc.acquireLock();
+                    locked = slaCalc.isLocked();
+                    if (locked) {
+                        // no more processing, no transfer to history set
+                        if (slaCalc.getEventProcessed() >= 8) {
+                            eventProc = 8;
+                            // Should not be > 8. But to handle any corner cases
+                            slaCalc.setEventProcessed(8);
+                            slaMap.remove(jobId);
+                        }
+                        else {
+                            slaCalc.setEventProcessed(eventProc);
+                        }
+                        SLASummaryBean slaSummaryBean = new SLASummaryBean();
+                        slaSummaryBean.setId(slaCalc.getId());
+                        slaSummaryBean.setEventProcessed(eventProc);
+                        slaSummaryBean.setSLAStatus(slaCalc.getSLAStatus());
+                        slaSummaryBean.setEventStatus(slaCalc.getEventStatus());
+                        slaSummaryBean.setActualEnd(slaCalc.getActualEnd());
+                        slaSummaryBean.setActualStart(slaCalc.getActualStart());
+                        slaSummaryBean.setActualDuration(slaCalc.getActualDuration());
+                        slaSummaryBean.setJobStatus(slaCalc.getJobStatus());
+                        slaSummaryBean.setLastModifiedTime(new Date());
+                        SLASummaryQueryExecutor.getInstance().executeUpdate(
+                                SLASummaryQuery.UPDATE_SLA_SUMMARY_FOR_STATUS_ACTUAL_TIMES, slaSummaryBean);
+                        if (eventProc == 7) {
+                            historySet.add(jobId);
+                            slaMap.remove(jobId);
+                            LOG.trace("Removed Job [{0}] from map after End-processed", jobId);
+                        }
+                    }
                 }
-                else {
-                    slaCalc.setEventProcessed(eventProc);
+                catch (InterruptedException e) {
+                    throw new XException(ErrorCode.E0606, slaCalc.getId(), slaCalc.getLockTimeOut());
                 }
-                SLASummaryBean slaSummaryBean = new SLASummaryBean();
-                slaSummaryBean.setId(slaCalc.getId());
-                slaSummaryBean.setEventProcessed(eventProc);
-                slaSummaryBean.setSLAStatus(slaCalc.getSLAStatus());
-                slaSummaryBean.setEventStatus(slaCalc.getEventStatus());
-                slaSummaryBean.setActualEnd(slaCalc.getActualEnd());
-                slaSummaryBean.setActualStart(slaCalc.getActualStart());
-                slaSummaryBean.setActualDuration(slaCalc.getActualDuration());
-                slaSummaryBean.setJobStatus(slaCalc.getJobStatus());
-                slaSummaryBean.setLastModifiedTime(new Date());
-                SLASummaryQueryExecutor.getInstance().executeUpdate(
-                        SLASummaryQuery.UPDATE_SLA_SUMMARY_FOR_STATUS_ACTUAL_TIMES, slaSummaryBean);
-                if (eventProc == 7) {
-                    historySet.add(jobId);
-                    slaMap.remove(jobId);
-                    LOG.trace("Removed Job [{0}] from map after End-processed", jobId);
+                finally {
+                    slaCalc.releaseLock();
                 }
-
+            }
+            else if (eventProc >= 7) {
+                historySet.add(jobId);
+                slaMap.remove(jobId);
+                LOG.trace("Removed Job [{0}] from map after End-processed", jobId);
             }
         }
     }
@@ -494,46 +703,77 @@ public class SLACalculatorMemory implements SLACalculator {
         SLACalcStatus slaCalc = slaMap.get(jobId);
         SLASummaryBean slaInfo = null;
         boolean hasSla = false;
-        if (slaCalc != null) {
-            synchronized (slaCalc) {
-                slaCalc.setJobStatus(jobStatus);
-                switch (jobEventStatus) {
-                    case STARTED:
-                        slaInfo = processJobStartSLA(slaCalc, startTime);
-                        break;
-                    case SUCCESS:
-                        slaInfo = processJobEndSuccessSLA(slaCalc, startTime, endTime);
-                        break;
-                    case FAILURE:
-                        slaInfo = processJobEndFailureSLA(slaCalc, startTime, endTime);
-                        break;
-                    default:
-                        LOG.debug("Unknown Job Status for SLA purpose[{0}]", jobEventStatus);
-                        slaInfo = getSLASummaryBean(slaCalc);
+        if (slaCalc == null) {
+            if (historySet.contains(jobId)) {
+                slaInfo = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId);
+                if (slaInfo == null) {
+                    throw new JPAExecutorException(ErrorCode.E0604, jobId);
                 }
-
-                if (slaCalc.getEventProcessed() == 7) {
-                    slaInfo.setEventProcessed(8);
-                    slaMap.remove(jobId);
+                slaInfo.setJobStatus(jobStatus);
+                slaInfo.setActualStart(startTime);
+                slaInfo.setActualEnd(endTime);
+                if (endTime != null) {
+                    slaInfo.setActualDuration(endTime.getTime() - startTime.getTime());
                 }
+                slaInfo.setEventProcessed(8);
+                historySet.remove(jobId);
                 hasSla = true;
+            } else {
+                // jobid might not exist in slaMap in HA Setting
+                SLARegistrationBean slaRegBean = SLARegistrationQueryExecutor.getInstance().get(
+                        SLARegQuery.GET_SLA_REG_ALL, jobId);
+                SLASummaryBean slaSummaryBean = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY,
+                        jobId);
+                slaCalc = new SLACalcStatus(slaSummaryBean, slaRegBean);
+                if(slaCalc.getEventProcessed() < 7){
+                    slaMap.put(jobId, slaCalc);
+                }
             }
-            LOG.trace("SLA Status Event - Job:" + jobId + " Status:" + slaCalc.getSLAStatus());
         }
-        else if (historySet.contains(jobId)) {
-            slaInfo = jpaService.execute(new SLASummaryGetJPAExecutor(jobId));
-            if (slaInfo == null) {
-                throw new JPAExecutorException(ErrorCode.E0604, jobId);
-            }
-            slaInfo.setJobStatus(jobStatus);
-            slaInfo.setActualStart(startTime);
-            slaInfo.setActualEnd(endTime);
-            if (endTime != null) {
-                slaInfo.setActualDuration(endTime.getTime() - startTime.getTime());
+        if (slaCalc != null) {
+            synchronized (slaCalc) {
+                try {
+                    // only get ZK lock when multiple servers running
+                    boolean locked = true;
+                    slaCalc.acquireLock();
+                    locked = slaCalc.isLocked();
+                    if (locked) {
+                        // get eventProcessed on DB for validation in HA
+                        Object eventProcObj = ((SLASummaryQueryExecutor) SLASummaryQueryExecutor.getInstance())
+                                .getSingleValue(SLASummaryQuery.GET_SLA_SUMMARY_EVENTPROCESSED, jobId);
+                        byte eventProc = ((Byte) eventProcObj).byteValue();
+                        slaCalc.setEventProcessed(eventProc);
+                        slaCalc.setJobStatus(jobStatus);
+                        switch (jobEventStatus) {
+                            case STARTED:
+                                slaInfo = processJobStartSLA(slaCalc, startTime);
+                                break;
+                            case SUCCESS:
+                                slaInfo = processJobEndSuccessSLA(slaCalc, startTime, endTime);
+                                break;
+                            case FAILURE:
+                                slaInfo = processJobEndFailureSLA(slaCalc, startTime, endTime);
+                                break;
+                            default:
+                                LOG.debug("Unknown Job Status for SLA purpose[{0}]", jobEventStatus);
+                                slaInfo = getSLASummaryBean(slaCalc);
+                        }
+
+                        if (slaCalc.getEventProcessed() == 7) {
+                            slaInfo.setEventProcessed(8);
+                            slaMap.remove(jobId);
+                        }
+                        hasSla = true;
+                    }
+                }
+                catch (InterruptedException e) {
+                    throw new ServiceException(ErrorCode.E0606, slaCalc.getEntityKey(), slaCalc.getLockTimeOut());
+                }
+                finally {
+                    slaCalc.releaseLock();
+                }
             }
-            slaInfo.setEventProcessed(8);
-            historySet.remove(jobId);
-            hasSla = true;
+            LOG.trace("SLA Status Event - Job:" + jobId + " Status:" + slaCalc.getSLAStatus());
         }
 
         if (hasSla) {
@@ -829,4 +1069,13 @@ public class SLACalculatorMemory implements SLACalculator {
         }
     }
 
+    @VisibleForTesting
+    public boolean isJobIdInSLAMap(String jobId) {
+        return this.slaMap.containsKey(jobId);
+    }
+
+    @VisibleForTesting
+    public boolean isJobIdInHistorySet(String jobId) {
+        return this.historySet.contains(jobId);
+    }
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/4e015d45/core/src/main/java/org/apache/oozie/sla/SLAOperations.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/sla/SLAOperations.java b/core/src/main/java/org/apache/oozie/sla/SLAOperations.java
index b0da3dc..0cad071 100644
--- a/core/src/main/java/org/apache/oozie/sla/SLAOperations.java
+++ b/core/src/main/java/org/apache/oozie/sla/SLAOperations.java
@@ -19,12 +19,14 @@ package org.apache.oozie.sla;
 
 import java.text.ParseException;
 import java.util.Date;
+
 import org.apache.oozie.AppType;
 import org.apache.oozie.ErrorCode;
 import org.apache.oozie.client.event.SLAEvent.EventStatus;
 import org.apache.oozie.command.CommandException;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
-import org.apache.oozie.executor.jpa.sla.SLARegistrationGetJPAExecutor;
+import org.apache.oozie.executor.jpa.SLARegistrationQueryExecutor;
+import org.apache.oozie.executor.jpa.SLARegistrationQueryExecutor.SLARegQuery;
 import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.ServiceException;
 import org.apache.oozie.service.Services;
@@ -165,7 +167,7 @@ public class SLAOperations {
         JPAService jpaService = Services.get().get(JPAService.class);
         SLAService slaService = Services.get().get(SLAService.class);
         try {
-            SLARegistrationBean reg = jpaService.execute(new SLARegistrationGetJPAExecutor(jobId));
+            SLARegistrationBean reg = SLARegistrationQueryExecutor.getInstance().get(SLARegQuery.GET_SLA_REG_ALL, jobId);
             if (reg != null) { //handle coord rerun with different config without sla
                 slaService.updateRegistrationEvent(reg);
             }

http://git-wip-us.apache.org/repos/asf/oozie/blob/4e015d45/core/src/main/java/org/apache/oozie/sla/SLASummaryBean.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/sla/SLASummaryBean.java b/core/src/main/java/org/apache/oozie/sla/SLASummaryBean.java
index 0a70326..80883ee 100644
--- a/core/src/main/java/org/apache/oozie/sla/SLASummaryBean.java
+++ b/core/src/main/java/org/apache/oozie/sla/SLASummaryBean.java
@@ -47,11 +47,18 @@ import org.json.simple.JSONObject;
 
  @NamedQuery(name = "UPDATE_SLA_SUMMARY_FOR_STATUS_ACTUAL_TIMES", query = "update SLASummaryBean w set w.slaStatus = :slaStatus, w.eventStatus = :eventStatus, w.eventProcessed = :eventProcessed, w.jobStatus = :jobStatus, w.lastModifiedTS = :lastModifiedTS, w.actualStartTS = :actualStartTS, w.actualEndTS = :actualEndTS, w.actualDuration = :actualDuration where w.jobId = :jobId"),
 
+ @NamedQuery(name = "UPDATE_SLA_SUMMARY_FOR_ACTUAL_TIMES", query = "update SLASummaryBean w set w.eventProcessed = :eventProcessed, w.actualStartTS = :actualStartTS, w.actualEndTS = :actualEndTS, w.actualEndTS = :actualEndTS, w.actualDuration = :actualDuration, w.lastModifiedTS = :lastModifiedTS where w.jobId = :jobId"),
+
+ @NamedQuery(name = "UPDATE_SLA_SUMMARY_EVENTPROCESSED", query = "update SLASummaryBean w set w.eventProcessed = :eventProcessed where w.jobId = :jobId"),
+
  @NamedQuery(name = "UPDATE_SLA_SUMMARY_ALL", query = "update SLASummaryBean w set w.jobId = :jobId, w.appName = :appName, w.appType = :appType, w.nominalTimeTS = :nominalTime, w.expectedStartTS = :expectedStartTime, w.expectedEndTS = :expectedEndTime, w.expectedDuration = :expectedDuration, w.jobStatus = :jobStatus, w.slaStatus = :slaStatus, w.eventStatus = :eventStatus, w.lastModifiedTS = :lastModTime, w.user = :user, w.parentId = :parentId, w.eventProcessed = :eventProcessed, w.actualDuration = :actualDuration, w.actualEndTS = :actualEndTS, w.actualStartTS = :actualStartTS where w.jobId = :jobId"),
 
  @NamedQuery(name = "GET_SLA_SUMMARY", query = "select OBJECT(w) from SLASummaryBean w where w.jobId = :id"),
 
- @NamedQuery(name = "GET_SLA_SUMMARY_RECORDS_RESTART", query = "select OBJECT(w) from SLASummaryBean w where w.eventProcessed <= 7 AND w.lastModifiedTS >= :lastModifiedTime") })
+ @NamedQuery(name = "GET_SLA_SUMMARY_RECORDS_RESTART", query = "select OBJECT(w) from SLASummaryBean w where w.eventProcessed <= 7 AND w.lastModifiedTS >= :lastModifiedTime"),
+
+ @NamedQuery(name = "GET_SLA_SUMMARY_EVENTPROCESSED", query = "select w.eventProcessed from SLASummaryBean w where w.jobId = :id")
+})
 
 /**
  * Class to store all the SLA related details (summary) per job

http://git-wip-us.apache.org/repos/asf/oozie/blob/4e015d45/core/src/main/java/org/apache/oozie/sla/service/SLAService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/sla/service/SLAService.java b/core/src/main/java/org/apache/oozie/sla/service/SLAService.java
index 7fcb334..2349329 100644
--- a/core/src/main/java/org/apache/oozie/sla/service/SLAService.java
+++ b/core/src/main/java/org/apache/oozie/sla/service/SLAService.java
@@ -18,6 +18,7 @@
 package org.apache.oozie.sla.service;
 
 import java.util.Date;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.oozie.ErrorCode;
 import org.apache.oozie.client.event.JobEvent.EventStatus;
@@ -31,6 +32,7 @@ import org.apache.oozie.sla.SLACalculator;
 import org.apache.oozie.sla.SLACalculatorMemory;
 import org.apache.oozie.sla.SLARegistrationBean;
 import org.apache.oozie.util.XLog;
+
 import com.google.common.annotations.VisibleForTesting;
 
 public class SLAService implements Service {
@@ -43,6 +45,9 @@ public class SLAService implements Service {
     public static final String CONF_JOB_EVENT_LATENCY = CONF_PREFIX + "job.event.latency";
     //Time interval, in seconds, at which SLA Worker will be scheduled to run
     public static final String CONF_SLA_CHECK_INTERVAL = CONF_PREFIX + "check.interval";
+    public static final String CONF_SLA_CHECK_INITIAL_DELAY = CONF_PREFIX + "check.initial.delay";
+    public static final String CONF_SLA_CALC_LOCK_TIMEOUT = CONF_PREFIX + "oozie.sla.calc.default.lock.timeout";
+    public static final String CONF_SLA_HISTORY_PURGE_INTERVAL = CONF_PREFIX + "history.purge.interval";
 
     private static SLACalculator calcImpl;
     private static boolean slaEnabled = false;
@@ -69,7 +74,9 @@ public class SLAService implements Service {
             Runnable slaThread = new SLAWorker(calcImpl);
             // schedule runnable by default every 30 sec
             int slaCheckInterval = services.getConf().getInt(CONF_SLA_CHECK_INTERVAL, 30);
-            services.get(SchedulerService.class).schedule(slaThread, 10, slaCheckInterval, SchedulerService.Unit.SEC);
+            int slaCheckInitialDelay = services.getConf().getInt(CONF_SLA_CHECK_INITIAL_DELAY, 10);
+            services.get(SchedulerService.class).schedule(slaThread, slaCheckInitialDelay, slaCheckInterval,
+                    SchedulerService.Unit.SEC);
             slaEnabled = true;
             LOG.info("SLAService initialized with impl [{0}] capacity [{1}]", calcImpl.getClass().getName(),
                     conf.get(SLAService.CONF_CAPACITY));

http://git-wip-us.apache.org/repos/asf/oozie/blob/4e015d45/core/src/test/java/org/apache/oozie/command/coord/TestCoordChangeXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/command/coord/TestCoordChangeXCommand.java b/core/src/test/java/org/apache/oozie/command/coord/TestCoordChangeXCommand.java
index 86c2d32..4e842cf 100644
--- a/core/src/test/java/org/apache/oozie/command/coord/TestCoordChangeXCommand.java
+++ b/core/src/test/java/org/apache/oozie/command/coord/TestCoordChangeXCommand.java
@@ -36,10 +36,12 @@ import org.apache.oozie.executor.jpa.CoordJobGetActionByActionNumberJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordJobInsertJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
+import org.apache.oozie.executor.jpa.SLARegistrationQueryExecutor;
+import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor;
 import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
+import org.apache.oozie.executor.jpa.SLARegistrationQueryExecutor.SLARegQuery;
+import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor.SLASummaryQuery;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
-import org.apache.oozie.executor.jpa.sla.SLARegistrationGetJPAExecutor;
-import org.apache.oozie.executor.jpa.sla.SLASummaryGetJPAExecutor;
 import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.service.StatusTransitService;
@@ -611,17 +613,17 @@ public class TestCoordChangeXCommand extends XDataTestCase {
             assertEquals(ErrorCode.E0603, jpae.getErrorCode());
         }
 
-        slaRegBean1 = jpaService.execute(new SLARegistrationGetJPAExecutor(slaRegBean1.getId()));
+        slaRegBean1 = SLARegistrationQueryExecutor.getInstance().get(SLARegQuery.GET_SLA_REG_ALL, slaRegBean1.getId());
         assertNotNull(slaRegBean1);
-        slaRegBean2 = jpaService.execute(new SLARegistrationGetJPAExecutor(slaRegBean2.getId()));
+        slaRegBean2 = SLARegistrationQueryExecutor.getInstance().get(SLARegQuery.GET_SLA_REG_ALL, slaRegBean2.getId());
         assertNotNull(slaRegBean2);
-        slaRegBean3 = jpaService.execute(new SLARegistrationGetJPAExecutor(slaRegBean3.getId()));
+        slaRegBean3 = SLARegistrationQueryExecutor.getInstance().get(SLARegQuery.GET_SLA_REG_ALL, slaRegBean3.getId());
         assertNull(slaRegBean3);
-        slaRegBean4 = jpaService.execute(new SLARegistrationGetJPAExecutor(slaRegBean4.getId()));
+        slaRegBean4 = SLARegistrationQueryExecutor.getInstance().get(SLARegQuery.GET_SLA_REG_ALL, slaRegBean4.getId());
         assertNull(slaRegBean4);
-        slaSummaryBean3 = jpaService.execute(new SLASummaryGetJPAExecutor(slaSummaryBean3.getId()));
+        slaSummaryBean3 = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, slaSummaryBean3.getId());
         assertNull(slaSummaryBean3);
-        slaSummaryBean1 = jpaService.execute(new SLASummaryGetJPAExecutor(slaSummaryBean1.getId()));
+        slaSummaryBean1 = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, slaSummaryBean1.getId());
         assertNotNull(slaSummaryBean1);
     }
 

http://git-wip-us.apache.org/repos/asf/oozie/blob/4e015d45/core/src/test/java/org/apache/oozie/executor/jpa/TestSLARegistrationQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/executor/jpa/TestSLARegistrationQueryExecutor.java b/core/src/test/java/org/apache/oozie/executor/jpa/TestSLARegistrationQueryExecutor.java
index 00fb677..b820437 100644
--- a/core/src/test/java/org/apache/oozie/executor/jpa/TestSLARegistrationQueryExecutor.java
+++ b/core/src/test/java/org/apache/oozie/executor/jpa/TestSLARegistrationQueryExecutor.java
@@ -44,7 +44,7 @@ public class TestSLARegistrationQueryExecutor extends XDataTestCase {
         super.tearDown();
     }
 
-    public void testGetQuery() throws Exception {
+    public void testGetUpdateQuery() throws Exception {
         EntityManager em = jpaService.getEntityManager();
         SLARegistrationBean bean = addRecordToSLARegistrationTable("test-application", SLAStatus.MET);
 
@@ -69,11 +69,56 @@ public class TestSLARegistrationQueryExecutor extends XDataTestCase {
         em.close();
     }
 
-    public void testExecuteUpdate() throws Exception {
-        // TODO
+    public void testGetSelectQuery() throws Exception {
+        EntityManager em = jpaService.getEntityManager();
+        SLARegistrationBean bean = addRecordToSLARegistrationTable("test-application", SLAStatus.MET);
+        // GET_SLA_REG_ALL
+        Query query = SLARegistrationQueryExecutor.getInstance().getSelectQuery(SLARegQuery.GET_SLA_REG_ALL, em,
+                bean.getId());
+        assertEquals(query.getParameterValue("id"), bean.getId());
+
+        // GET_SLA_REG_ON_RESTART
+        query = SLARegistrationQueryExecutor.getInstance().getSelectQuery(SLARegQuery.GET_SLA_REG_ON_RESTART, em,
+                bean.getId());
+        assertEquals(query.getParameterValue("id"), bean.getId());
     }
 
     public void testGet() throws Exception {
+
+        SLARegistrationBean bean = addRecordToSLARegistrationTable("test-application", SLAStatus.MET);
+        //GET_SLA_REG_ON_RESTART
+        SLARegistrationBean retBean = SLARegistrationQueryExecutor.getInstance().get(
+                SLARegQuery.GET_SLA_REG_ON_RESTART, bean.getId());
+        assertEquals(bean.getJobData(), retBean.getJobData());
+        assertEquals(bean.getSlaConfig(), retBean.getSlaConfig());
+        assertEquals(bean.getUpstreamApps(), retBean.getUpstreamApps());
+        assertEquals(bean.getNotificationMsg(), retBean.getNotificationMsg());
+        assertNull(retBean.getAppName());
+        assertNull(retBean.getExpectedEnd());
+        assertNull(retBean.getExpectedStart());
+        assertNull(retBean.getCreatedTime());
+        assertNull(retBean.getNominalTime());
+        assertNull(retBean.getUser());
+        assertNull(retBean.getParentId());
+        //GET_SLA_REG_ALL
+        retBean = SLARegistrationQueryExecutor.getInstance().get(SLARegQuery.GET_SLA_REG_ALL, bean.getId());
+        assertEquals(bean.getId(), retBean.getId());
+        assertEquals(bean.getAppName(), retBean.getAppName());
+        assertEquals(bean.getAppType(), retBean.getAppType());
+        assertEquals(bean.getExpectedDuration(), retBean.getExpectedDuration());
+        assertEquals(bean.getExpectedStart().getTime(), retBean.getExpectedStart().getTime());
+        assertEquals(bean.getExpectedEnd().getTime(), retBean.getExpectedEnd().getTime());
+        assertEquals(bean.getCreatedTime().getTime(), retBean.getCreatedTime().getTime());
+        assertEquals(bean.getNominalTime().getTime(), retBean.getNominalTime().getTime());
+        assertEquals(bean.getNotificationMsg(), retBean.getNotificationMsg());
+        assertEquals(bean.getJobData(), retBean.getJobData());
+        assertEquals(bean.getParentId(), retBean.getParentId());
+        assertEquals(bean.getSlaConfig(), retBean.getSlaConfig());
+        assertEquals(bean.getUpstreamApps(), retBean.getUpstreamApps());
+        assertEquals(bean.getUser(), retBean.getUser());
+}
+
+    public void testExecuteUpdate() throws Exception {
         // TODO
     }