You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by vi...@apache.org on 2013/06/10 06:37:23 UTC

svn commit: r1491333 [1/2] - in /oozie/trunk: ./ client/src/main/java/org/apache/oozie/client/event/message/ core/src/main/java/org/apache/oozie/command/coord/ core/src/main/java/org/apache/oozie/command/wf/ core/src/main/java/org/apache/oozie/event/me...

Author: virag
Date: Mon Jun 10 04:37:20 2013
New Revision: 1491333

URL: http://svn.apache.org/r1491333
Log:
OOZIE-1339 Implement SLA Bootstrap Service and fix bugs in SLACalculator (virag)

Added:
    oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/sla/SLARegistrationGetOnRestartJPAExecutor.java
    oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/sla/SLASummaryGetRecordsOnRestartJPAExecutor.java
    oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/sla/SLASummaryUpdateForSLAStatusActualTimesJPAExecutor.java
    oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/sla/SLASummaryUpdateForSLAStatusJPAExecutor.java
    oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLACalculatorMemory.java
    oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLARegistrationGetRecordsOnRestartJPAExecutor.java
    oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLASummaryGetOnRestartJPAExecutor.java
Modified:
    oozie/trunk/client/src/main/java/org/apache/oozie/client/event/message/SLAMessage.java
    oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionMaterializeCommand.java
    oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java
    oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordRerunXCommand.java
    oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ReRunXCommand.java
    oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SubmitXCommand.java
    oozie/trunk/core/src/main/java/org/apache/oozie/event/messaging/MessageFactory.java
    oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/sla/SLARegistrationGetJPAExecutor.java
    oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLACalcStatus.java
    oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLACalculator.java
    oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java
    oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLAOperations.java
    oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLARegistrationBean.java
    oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLASummaryBean.java
    oozie/trunk/core/src/main/java/org/apache/oozie/sla/listener/SLAEmailEventListener.java
    oozie/trunk/core/src/main/java/org/apache/oozie/sla/service/SLAService.java
    oozie/trunk/core/src/main/resources/oozie-default.xml
    oozie/trunk/core/src/test/java/org/apache/oozie/jms/TestJMSSLAEventListener.java
    oozie/trunk/core/src/test/java/org/apache/oozie/servlet/TestV2SLAServlet.java
    oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLACalculationJPAExecutor.java
    oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLAEventGeneration.java
    oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLAJobEventListener.java
    oozie/trunk/release-log.txt

Modified: oozie/trunk/client/src/main/java/org/apache/oozie/client/event/message/SLAMessage.java
URL: http://svn.apache.org/viewvc/oozie/trunk/client/src/main/java/org/apache/oozie/client/event/message/SLAMessage.java?rev=1491333&r1=1491332&r2=1491333&view=diff
==============================================================================
--- oozie/trunk/client/src/main/java/org/apache/oozie/client/event/message/SLAMessage.java (original)
+++ oozie/trunk/client/src/main/java/org/apache/oozie/client/event/message/SLAMessage.java Mon Jun 10 04:37:20 2013
@@ -68,7 +68,7 @@ public class SLAMessage extends EventMes
 
     public SLAMessage(SLAEvent.EventStatus eventStatus, SLAEvent.SLAStatus slaStatus, AppType appType, String appName,
             String user, String jobId, String parentJobId, Date nominalTime, Date expectedStartTime,
-            Date actualStartTime, Date expectedEndTime, Date actualEndTime, long expectedDuration,
+            Date actualStartTime, Date expectedEndTime, Date actualEndTime, long expectedDuration, long actualDuration,
             String notificationMessage, String upstreamApps) {
 
         super(MessageType.SLA, appType);
@@ -84,8 +84,7 @@ public class SLAMessage extends EventMes
         this.expectedEndTime = expectedEndTime;
         this.actualEndTime = actualEndTime;
         this.expectedDuration = expectedDuration;
-        this.actualDuration = (actualEndTime != null && actualStartTime != null) ? ( actualEndTime.getTime()
-                - actualStartTime.getTime()) / (1000 * 60) : -1;
+        this.actualDuration = actualDuration;
         this.notificationMessage = notificationMessage;
         this.upstreamApps = upstreamApps;
     }
@@ -183,7 +182,7 @@ public class SLAMessage extends EventMes
     /**
      * Get expected end time
      *
-     * @return
+     * @return expectedEndTime
      */
     public Date getExpectedEndTime() {
         return expectedEndTime;

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionMaterializeCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionMaterializeCommand.java?rev=1491333&r1=1491332&r2=1491333&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionMaterializeCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionMaterializeCommand.java Mon Jun 10 04:37:20 2013
@@ -279,7 +279,7 @@ public class CoordActionMaterializeComma
         }
         // insert into new sla reg table too
         SLAOperations.createSlaRegistrationEvent(eSla, actionBean.getId(), actionBean.getJobId(),
-                AppType.COORDINATOR_ACTION, user, appName, log);
+                AppType.COORDINATOR_ACTION, user, appName, log, false);
     }
 
     /**

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java?rev=1491333&r1=1491332&r2=1491333&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java Mon Jun 10 04:37:20 2013
@@ -390,7 +390,7 @@ public class CoordMaterializeTransitionX
         }
         // inserting into new table also
         SLAOperations.createSlaRegistrationEvent(eSla, actionBean.getId(), actionBean.getJobId(),
-                AppType.COORDINATOR_ACTION, coordJob.getUser(), coordJob.getAppName(), LOG);
+                AppType.COORDINATOR_ACTION, coordJob.getUser(), coordJob.getAppName(), LOG, false);
     }
 
     private void updateJobMaterializeInfo(CoordinatorJobBean job) throws CommandException {

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordRerunXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordRerunXCommand.java?rev=1491333&r1=1491332&r2=1491333&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordRerunXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordRerunXCommand.java Mon Jun 10 04:37:20 2013
@@ -49,6 +49,8 @@ import org.apache.oozie.executor.jpa.JPA
 import org.apache.oozie.service.EventHandlerService;
 import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.Services;
+import org.apache.oozie.sla.SLAOperations;
+import org.apache.oozie.sla.service.SLAService;
 import org.apache.oozie.util.InstrumentUtils;
 import org.apache.oozie.util.LogUtils;
 import org.apache.oozie.util.ParamChecker;
@@ -333,7 +335,9 @@ public class CoordRerunXCommand extends 
                         refreshAction(coordJob, coordAction);
                     }
                     updateAction(coordJob, coordAction, actionXml);
-
+                    if (SLAService.isEnabled()) {
+                        SLAOperations.updateRegistrationEvent(coordAction.getId());
+                    }
                     queue(new CoordActionNotificationXCommand(coordAction), 100);
                     queue(new CoordActionInputCheckXCommand(coordAction.getId(), coordAction.getJobId()), 100);
                 }

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ReRunXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ReRunXCommand.java?rev=1491333&r1=1491332&r2=1491333&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ReRunXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ReRunXCommand.java Mon Jun 10 04:37:20 2013
@@ -32,9 +32,11 @@ import java.util.Set;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.oozie.AppType;
 import org.apache.oozie.ErrorCode;
 import org.apache.oozie.WorkflowActionBean;
 import org.apache.oozie.WorkflowJobBean;
+import org.apache.oozie.action.oozie.SubWorkflowActionExecutor;
 import org.apache.oozie.client.OozieClient;
 import org.apache.oozie.client.WorkflowAction;
 import org.apache.oozie.client.WorkflowJob;
@@ -50,9 +52,14 @@ import org.apache.oozie.service.HadoopAc
 import org.apache.oozie.service.HadoopAccessorService;
 import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.Services;
+import org.apache.oozie.service.UUIDService;
 import org.apache.oozie.service.WorkflowAppService;
 import org.apache.oozie.service.WorkflowStoreService;
+import org.apache.oozie.sla.SLAOperations;
+import org.apache.oozie.sla.service.SLAService;
 import org.apache.oozie.util.ConfigUtils;
+import org.apache.oozie.util.ELEvaluator;
+import org.apache.oozie.util.ELUtils;
 import org.apache.oozie.util.InstrumentUtils;
 import org.apache.oozie.util.LogUtils;
 import org.apache.oozie.util.ParamChecker;
@@ -65,6 +72,8 @@ import org.apache.oozie.workflow.Workflo
 import org.apache.oozie.workflow.WorkflowInstance;
 import org.apache.oozie.workflow.WorkflowLib;
 import org.apache.oozie.workflow.lite.NodeHandler;
+import org.jdom.Element;
+import org.jdom.JDOMException;
 
 /**
  * This is a RerunXCommand which is used for rerunn.
@@ -156,6 +165,20 @@ public class ReRunXCommand extends Workf
             catch (WorkflowException e) {
                 throw new CommandException(e);
             }
+
+            if (SLAService.isEnabled()) {
+                Element wfElem = XmlUtils.parseXml(app.getDefinition());
+                ELEvaluator evalSla = SubmitXCommand.createELEvaluatorForGroup(conf, "wf-sla-submit");
+                Element eSla = XmlUtils.getSLAElement(wfElem);
+                String jobSlaXml = null;
+                if (eSla != null) {
+                    jobSlaXml = SubmitXCommand.resolveSla(eSla, evalSla);
+                }
+                String appName = ELUtils.resolveAppName(app.getName(), conf);
+                writeSLARegistration(wfElem, jobSlaXml, newWfInstance.getId(),
+                            conf.get(SubWorkflowActionExecutor.PARENT_ID), conf.get(OozieClient.USER_NAME), appName,
+                            evalSla);
+            }
             wfBean.setAppName(app.getName());
             wfBean.setProtoActionConf(protoActionConf.toXmlString());
         }
@@ -171,6 +194,9 @@ public class ReRunXCommand extends Workf
         catch (URISyntaxException ex) {
             throw new CommandException(ErrorCode.E0711, appPath, ex.getMessage(), ex);
         }
+        catch (Exception ex) {
+            throw new CommandException(ErrorCode.E1007, ex.getMessage(), ex);
+        }
 
         for (int i = 0; i < actions.size(); i++) {
             if (!nodesToSkip.contains(actions.get(i).getName())) {
@@ -207,6 +233,33 @@ public class ReRunXCommand extends Workf
         return null;
     }
 
+
+    @SuppressWarnings("unchecked")
+	private void writeSLARegistration(Element wfElem, String jobSlaXml, String id, String parentId, String user,
+            String appName, ELEvaluator evalSla) throws JDOMException, CommandException {
+        if (jobSlaXml != null && jobSlaXml.length() > 0) {
+            Element eSla = XmlUtils.parseXml(jobSlaXml);
+            // insert into new table
+            SLAOperations.createSlaRegistrationEvent(eSla, jobId, parentId, AppType.WORKFLOW_JOB, user, appName, LOG,
+                    true);
+        }
+        // Add sla for wf actions
+        for (Element action : (List<Element>) wfElem.getChildren("action", wfElem.getNamespace())) {
+            Element actionSla = XmlUtils.getSLAElement(action);
+            if (actionSla != null) {
+                String actionSlaXml = SubmitXCommand.resolveSla(actionSla, evalSla);
+                actionSla = XmlUtils.parseXml(actionSlaXml);
+                if (!nodesToSkip.contains(action.getAttributeValue("name"))) {
+                    String actionId = Services.get().get(UUIDService.class)
+                            .generateChildId(jobId, action.getAttributeValue("name") + "");
+                    SLAOperations.createSlaRegistrationEvent(actionSla, actionId, jobId, AppType.WORKFLOW_ACTION, user,
+                            appName, LOG, true);
+                }
+            }
+        }
+
+    }
+
     /**
      * Loading the Wfjob and workflow actions. Parses the config and adds the nodes that are to be skipped to the
      * skipped node list

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SubmitXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SubmitXCommand.java?rev=1491333&r1=1491332&r2=1491333&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SubmitXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SubmitXCommand.java Mon Jun 10 04:37:20 2013
@@ -271,7 +271,7 @@ public class SubmitXCommand extends Work
                 }
                 // insert into new table
                 SLAOperations.createSlaRegistrationEvent(eSla, jobId, parentId, AppType.WORKFLOW_JOB, user, appName,
-                        log);
+                        log, false);
             }
             // Add sla for wf actions
             for (Element action : (List<Element>) eWfJob.getChildren("action", eWfJob.getNamespace())) {
@@ -282,7 +282,7 @@ public class SubmitXCommand extends Work
                     String actionId = Services.get().get(UUIDService.class)
                             .generateChildId(jobId, action.getAttributeValue("name") + "");
                     SLAOperations.createSlaRegistrationEvent(actionSla, actionId, jobId, AppType.WORKFLOW_ACTION,
-                            user, appName, log);
+                            user, appName, log, false);
                 }
             }
         }

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/event/messaging/MessageFactory.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/event/messaging/MessageFactory.java?rev=1491333&r1=1491332&r2=1491333&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/event/messaging/MessageFactory.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/event/messaging/MessageFactory.java Mon Jun 10 04:37:20 2013
@@ -99,7 +99,7 @@ public class MessageFactory {
         SLAMessage slaMessage = new SLAMessage(event.getEventStatus(), event.getSLAStatus(), event.getAppType(),
                 event.getAppName(), event.getUser(), event.getId(), event.getParentId(), event.getNominalTime(),
                 event.getExpectedStart(), event.getActualStart(), event.getExpectedEnd(), event.getActualEnd(),
-                event.getExpectedDuration(), event.getNotificationMsg(), event.getUpstreamApps());
+                event.getExpectedDuration(), event.getActualDuration(), event.getNotificationMsg(), event.getUpstreamApps());
         return slaMessage;
     }
 }

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/sla/SLARegistrationGetJPAExecutor.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/sla/SLARegistrationGetJPAExecutor.java?rev=1491333&r1=1491332&r2=1491333&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/sla/SLARegistrationGetJPAExecutor.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/sla/SLARegistrationGetJPAExecutor.java Mon Jun 10 04:37:20 2013
@@ -17,17 +17,13 @@
  */
 package org.apache.oozie.executor.jpa.sla;
 
-import java.sql.Timestamp;
-
 import javax.persistence.EntityManager;
 import javax.persistence.Query;
 
-import org.apache.oozie.AppType;
 import org.apache.oozie.ErrorCode;
 import org.apache.oozie.executor.jpa.JPAExecutor;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
 import org.apache.oozie.sla.SLARegistrationBean;
-import org.apache.oozie.util.DateUtils;
 
 /**
  * Load the list of SLARegistrationBean and return the list.
@@ -50,57 +46,13 @@ public class SLARegistrationGetJPAExecut
         try {
             Query q = em.createNamedQuery("GET_SLA_REG_ALL");
             q.setParameter("id", id);
-            Object[] obj = (Object[]) q.getSingleResult();
-            return getBeanFromObj(obj);
+            SLARegistrationBean slaRegBean= (SLARegistrationBean) q.getSingleResult();
+            slaRegBean.setSlaConfig(slaRegBean.getSlaConfig());
+            return slaRegBean;
         }
         catch (Exception e) {
             throw new JPAExecutorException(ErrorCode.E0603, e.getMessage(), e);
         }
     }
 
-    private SLARegistrationBean getBeanFromObj(Object[] arr) {
-        SLARegistrationBean bean = new SLARegistrationBean();
-        if (arr[0] != null) {
-            bean.setJobId((String) arr[0]);
-        }
-        if (arr[1] != null) {
-            bean.setAppType(AppType.valueOf((String) arr[1]));
-        }
-        if (arr[2] != null) {
-            bean.setAppName((String) arr[2]);
-        }
-        if (arr[3] != null) {
-            bean.setUser((String) arr[3]);
-        }
-        if (arr[4] != null) {
-            bean.setNominalTime(DateUtils.toDate((Timestamp) arr[4]));
-        }
-        if (arr[5] != null) {
-            bean.setExpectedStart(DateUtils.toDate((Timestamp) arr[5]));
-        }
-        if (arr[6] != null) {
-            bean.setExpectedEnd(DateUtils.toDate((Timestamp) arr[6]));
-        }
-        if (arr[7] != null) {
-            bean.setExpectedDuration((Long) arr[7]);
-        }
-        if (arr[8] != null) {
-            bean.setJobData((String) arr[8]);
-        }
-        if (arr[9] != null) {
-            bean.setParentId((String) arr[9]);
-        }
-        if (arr[10] != null) {
-            bean.setNotificationMsg((String) arr[10]);
-        }
-        if (arr[11] != null) {
-            bean.setUpstreamApps((String) arr[11]);
-        }
-        if (arr[12] != null) {
-            bean.setSlaConfig((String) arr[12]);
-        }
-
-        return bean;
-    }
-
 }

Added: oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/sla/SLARegistrationGetOnRestartJPAExecutor.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/sla/SLARegistrationGetOnRestartJPAExecutor.java?rev=1491333&view=auto
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/sla/SLARegistrationGetOnRestartJPAExecutor.java (added)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/sla/SLARegistrationGetOnRestartJPAExecutor.java Mon Jun 10 04:37:20 2013
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.executor.jpa.sla;
+
+import javax.persistence.EntityManager;
+import javax.persistence.Query;
+
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.executor.jpa.JPAExecutor;
+import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.sla.SLARegistrationBean;
+
+/**
+ * Load SLARegistrationBean on restart
+ */
+public class SLARegistrationGetOnRestartJPAExecutor implements JPAExecutor<SLARegistrationBean> {
+
+    private String id;
+
+    public SLARegistrationGetOnRestartJPAExecutor(String id) {
+        this.id = id;
+    }
+
+    @Override
+    public String getName() {
+        return "SLARegistrationGetOnRestartJPAExecutor";
+    }
+
+    @Override
+    public SLARegistrationBean execute(EntityManager em) throws JPAExecutorException {
+        try {
+            Query q = em.createNamedQuery("GET_SLA_REG_ON_RESTART");
+            q.setParameter("id", id);
+            Object[] obj = (Object[]) q.getSingleResult();
+            return getBeanFromObj(obj);
+        }
+        catch (Exception e) {
+            throw new JPAExecutorException(ErrorCode.E0603, e.getMessage(), e);
+        }
+    }
+
+    private SLARegistrationBean getBeanFromObj(Object[] arr) {
+        SLARegistrationBean bean = new SLARegistrationBean();
+        if (arr[0] != null) {
+            bean.setNotificationMsg((String) arr[0]);
+        }
+        if (arr[1] != null) {
+            bean.setUpstreamApps((String) arr[1]);
+        }
+        if (arr[2] != null) {
+            bean.setSlaConfig((String) arr[2]);
+        }
+        if (arr[3] != null) {
+            bean.setJobData((String) arr[3]);
+        }
+        return bean;
+    }
+
+}

Added: oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/sla/SLASummaryGetRecordsOnRestartJPAExecutor.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/sla/SLASummaryGetRecordsOnRestartJPAExecutor.java?rev=1491333&view=auto
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/sla/SLASummaryGetRecordsOnRestartJPAExecutor.java (added)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/sla/SLASummaryGetRecordsOnRestartJPAExecutor.java Mon Jun 10 04:37:20 2013
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.executor.jpa.sla;
+
+import java.sql.Timestamp;
+import java.util.List;
+
+import javax.persistence.EntityManager;
+import javax.persistence.Query;
+
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.executor.jpa.JPAExecutor;
+import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.sla.SLASummaryBean;
+
+/**
+ * Load the list of SLASummaryBean when Oozie restarts and return the list.
+ */
+public class SLASummaryGetRecordsOnRestartJPAExecutor implements JPAExecutor<List<SLASummaryBean>> {
+
+    private int days;
+    public SLASummaryGetRecordsOnRestartJPAExecutor (int days) {
+        this.days = days;
+    }
+
+    @Override
+    public String getName() {
+        return "SLASummaryGetRecordsOnRestartJPAExecutor";
+    }
+
+    @SuppressWarnings("unchecked")
+	@Override
+    public List<SLASummaryBean> execute(EntityManager em) throws JPAExecutorException {
+        List<SLASummaryBean> ssBean;
+        try {
+            Query q = em.createNamedQuery("GET_SLA_SUMMARY_RECORDS_RESTART");
+            Timestamp ts = new Timestamp(System.currentTimeMillis() - days * 24 * 60 * 60 * 1000);
+            q.setParameter("lastModifiedTime", ts);
+            ssBean = q.getResultList();
+            return ssBean;
+        }
+        catch (Exception e) {
+            throw new JPAExecutorException(ErrorCode.E0603, e.getMessage(), e);
+        }
+    }
+
+}

Added: oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/sla/SLASummaryUpdateForSLAStatusActualTimesJPAExecutor.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/sla/SLASummaryUpdateForSLAStatusActualTimesJPAExecutor.java?rev=1491333&view=auto
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/sla/SLASummaryUpdateForSLAStatusActualTimesJPAExecutor.java (added)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/sla/SLASummaryUpdateForSLAStatusActualTimesJPAExecutor.java Mon Jun 10 04:37:20 2013
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.executor.jpa.sla;
+
+import java.util.Date;
+
+import javax.persistence.EntityManager;
+import javax.persistence.Query;
+
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.executor.jpa.JPAExecutor;
+import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.sla.SLASummaryBean;
+import org.apache.oozie.util.ParamChecker;
+
+/**
+ * Update the slaStatus, eventStatus, eventProcessed, jobStatus, actualStart,
+ * actualEnd, actualDuration and lastModifiedTime of SLAsummaryBean
+ */
+public class SLASummaryUpdateForSLAStatusActualTimesJPAExecutor implements JPAExecutor<Void> {
+
+    private SLASummaryBean slaSummaryBean = null;
+
+    public SLASummaryUpdateForSLAStatusActualTimesJPAExecutor(SLASummaryBean slaSummaryBean) {
+        ParamChecker.notNull(slaSummaryBean, "slaSummaryBean");
+        this.slaSummaryBean = slaSummaryBean;
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * @see org.apache.oozie.executor.jpa.JPAExecutor#execute(javax.persistence.
+     * EntityManager)
+     */
+    @Override
+    public Void execute(EntityManager em) throws JPAExecutorException {
+        try {
+            Query q = em.createNamedQuery("UPDATE_SLA_SUMMARY_FOR_STATUS_ACTUAL_TIMES");
+            q.setParameter("jobId", slaSummaryBean.getJobId());
+            q.setParameter("slaStatus", slaSummaryBean.getSLAStatusString());
+            q.setParameter("lastModifiedTS", new Date());
+            q.setParameter("eventStatus", slaSummaryBean.getEventStatusString());
+            q.setParameter("jobStatus", slaSummaryBean.getJobStatus());
+            q.setParameter("eventProcessed", slaSummaryBean.getEventProcessed());
+            q.setParameter("actualStartTS", slaSummaryBean.getActualStart());
+            q.setParameter("actualEndTS", slaSummaryBean.getActualEnd());
+            q.setParameter("actualDuration", slaSummaryBean.getActualDuration());
+            q.executeUpdate();
+            return null;
+        }
+        catch (Exception e) {
+            throw new JPAExecutorException(ErrorCode.E0603, e.getMessage(), e);
+        }
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * @see org.apache.oozie.executor.jpa.JPAExecutor#getName()
+     */
+    @Override
+    public String getName() {
+        return "SLASummaryUpdateForSLAStatusActualTimesJPAExecutor";
+    }
+}

Added: oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/sla/SLASummaryUpdateForSLAStatusJPAExecutor.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/sla/SLASummaryUpdateForSLAStatusJPAExecutor.java?rev=1491333&view=auto
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/sla/SLASummaryUpdateForSLAStatusJPAExecutor.java (added)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/sla/SLASummaryUpdateForSLAStatusJPAExecutor.java Mon Jun 10 04:37:20 2013
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.executor.jpa.sla;
+
+import java.util.Date;
+
+import javax.persistence.EntityManager;
+import javax.persistence.Query;
+
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.executor.jpa.JPAExecutor;
+import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.sla.SLASummaryBean;
+import org.apache.oozie.util.ParamChecker;
+
+/**
+ * Update the slaStatus, eventStatus, eventProcessed and lastModifiedTime of SLAsummaryBean
+ */
+public class SLASummaryUpdateForSLAStatusJPAExecutor implements JPAExecutor<Void> {
+
+    private SLASummaryBean slaSummaryBean = null;
+
+    public SLASummaryUpdateForSLAStatusJPAExecutor(SLASummaryBean slaSummaryBean) {
+        ParamChecker.notNull(slaSummaryBean, "slaSummaryBean");
+        this.slaSummaryBean = slaSummaryBean;
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * @see org.apache.oozie.executor.jpa.JPAExecutor#execute(javax.persistence.
+     * EntityManager)
+     */
+    @Override
+    public Void execute(EntityManager em) throws JPAExecutorException {
+        try {
+            Query q = em.createNamedQuery("UPDATE_SLA_SUMMARY_FOR_SLA_STATUS");
+
+            q.setParameter("jobId", slaSummaryBean.getJobId());
+            q.setParameter("slaStatus", slaSummaryBean.getSLAStatusString());
+            q.setParameter("lastModifiedTS", new Date());
+            q.setParameter("eventStatus", slaSummaryBean.getEventStatusString());
+            q.setParameter("eventProcessed", slaSummaryBean.getEventProcessed());
+            q.executeUpdate();
+            return null;
+        }
+        catch (Exception e) {
+            throw new JPAExecutorException(ErrorCode.E0603, e.getMessage(), e);
+        }
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * @see org.apache.oozie.executor.jpa.JPAExecutor#getName()
+     */
+    @Override
+    public String getName() {
+        return "SLASummaryUpdateForSLAStatusJPAExecutor";
+    }
+}

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLACalcStatus.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLACalcStatus.java?rev=1491333&r1=1491332&r2=1491333&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLACalcStatus.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLACalcStatus.java Mon Jun 10 04:37:20 2013
@@ -34,9 +34,8 @@ public class SLACalcStatus extends SLAEv
     private EventStatus eventStatus;
     private Date actualStart;
     private Date actualEnd;
-    private long actualDuration;
+    private long actualDuration = -1;
     private Date lastModifiedTime;
-    private byte slaProcessed;
     private byte eventProcessed;
 
     public SLACalcStatus(SLARegistrationBean reg) {
@@ -44,9 +43,16 @@ public class SLACalcStatus extends SLAEv
         setSLARegistrationBean(reg);
     }
 
-    public SLACalcStatus(SLASummaryBean summary) {
+    public SLACalcStatus(SLASummaryBean summary, SLARegistrationBean regBean) {
         this();
         SLARegistrationBean reg = new SLARegistrationBean();
+        reg.setNotificationMsg(regBean.getNotificationMsg());
+        reg.setUpstreamApps(regBean.getUpstreamApps());
+        reg.setAlertContact(regBean.getAlertContact());
+        reg.setAlertEvents(regBean.getAlertEvents());
+        reg.setJobData(regBean.getJobData());
+        reg.setJobId(summary.getJobId());
+        reg.setAppType(summary.getAppType());
         reg.setUser(summary.getUser());
         reg.setAppName(summary.getAppName());
         reg.setParentId(summary.getParentId());
@@ -54,17 +60,15 @@ public class SLACalcStatus extends SLAEv
         reg.setExpectedStart(summary.getExpectedStart());
         reg.setExpectedEnd(summary.getExpectedEnd());
         reg.setExpectedDuration(summary.getExpectedDuration());
+        setSLARegistrationBean(reg);
         setActualStart(summary.getActualStart());
         setActualEnd(summary.getActualEnd());
         setActualDuration(summary.getActualDuration());
         setSLAStatus(summary.getSLAStatus());
-        setSLARegistrationBean(reg);
-        setJobId(reg.getId());
         setJobStatus(summary.getJobStatus());
         setEventStatus(summary.getEventStatus());
         setLastModifiedTime(summary.getLastModifiedTime());
-        setSlaProcessed(summary.getSlaProcessed());
-        setEventProcessed((byte) summary.getEventStatus().ordinal());
+        setEventProcessed(summary.getEventProcessed());
     }
 
     /**
@@ -81,15 +85,11 @@ public class SLACalcStatus extends SLAEv
         setActualEnd(a.getActualEnd());
         setActualDuration(a.getActualDuration());
         setLastModifiedTime(a.getLastModifiedTime());
-        setSlaProcessed(a.getSlaProcessed());
         setEventProcessed(a.getEventProcessed());
     }
 
     public SLACalcStatus() {
         setMsgType(MessageType.SLA);
-        setSlaProcessed((byte) 0);
-        setEventProcessed((byte) 0);
-        setActualDuration(-1);
         setLastModifiedTime(new Date());
     }
 
@@ -173,28 +173,13 @@ public class SLACalcStatus extends SLAEv
     }
 
     /**
-     * Get whether sla has been processed
-     * and actual times stored for this job
-     *
-     * @return 0 = sla not processed
-     * 1 = sla processed but actual times not stored
-     * 2 = sla processed as well as actual times stored
-     */
-    public byte getSlaProcessed() {
-        return slaProcessed;
-    }
-
-    public void setSlaProcessed(int slaProcessed) {
-        this.slaProcessed = (byte) slaProcessed;
-    }
-
-    /**
      * Get which type of sla event has been processed needed when calculator
      * periodically loops to update all jobs' sla
      *
      * @return byte 1st bit set (from LSB) = start processed
      * 2nd bit set = duration processed
      * 3rd bit set = end processed
+     * only 4th bit set = everything processed
      */
     public byte getEventProcessed() {
         return eventProcessed;
@@ -279,4 +264,9 @@ public class SLACalcStatus extends SLAEv
         return regBean.getMsgType();
     }
 
+    @Override
+    public String toString() {
+        return "ID: " + getId() + " SLAStatus: " + slaStatus + " EventProcessed: "+eventProcessed;
+    }
+
 }

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLACalculator.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLACalculator.java?rev=1491333&r1=1491332&r2=1491333&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLACalculator.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLACalculator.java Mon Jun 10 04:37:20 2013
@@ -48,4 +48,6 @@ public interface SLACalculator {
 
     SLACalcStatus get(String jobId);
 
+    boolean updateRegistration(String jobId, SLARegistrationBean reg) throws JPAExecutorException;
+
 }

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java?rev=1491333&r1=1491332&r2=1491333&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java Mon Jun 10 04:37:20 2013
@@ -36,7 +36,11 @@ import org.apache.oozie.client.event.SLA
 import org.apache.oozie.client.rest.JsonBean;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
 import org.apache.oozie.executor.jpa.sla.SLACalculationInsertUpdateJPAExecutor;
+import org.apache.oozie.executor.jpa.sla.SLARegistrationGetOnRestartJPAExecutor;
 import org.apache.oozie.executor.jpa.sla.SLASummaryGetJPAExecutor;
+import org.apache.oozie.executor.jpa.sla.SLASummaryGetRecordsOnRestartJPAExecutor;
+import org.apache.oozie.executor.jpa.sla.SLASummaryUpdateForSLAStatusActualTimesJPAExecutor;
+import org.apache.oozie.executor.jpa.sla.SLASummaryUpdateForSLAStatusJPAExecutor;
 import org.apache.oozie.service.EventHandlerService;
 import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.ServiceException;
@@ -53,16 +57,48 @@ public class SLACalculatorMemory impleme
     private static Map<String, SLACalcStatus> slaMap;
     private static Set<String> historySet;
     private static int capacity;
-    private static JPAService jpa;
+    private static JPAService jpaService;
     private EventHandlerService eventHandler;
+    private static int modifiedAfter;
 
     @Override
     public void init(Configuration conf) throws ServiceException {
         capacity = conf.getInt(SLAService.CONF_CAPACITY, 5000);
         slaMap = new ConcurrentHashMap<String, SLACalcStatus>();
         historySet = Collections.synchronizedSet(new HashSet<String>());
-        jpa = Services.get().get(JPAService.class);
+        jpaService = Services.get().get(JPAService.class);
         eventHandler = Services.get().get(EventHandlerService.class);
+        // load events modified after
+        modifiedAfter = conf.getInt(SLAService.CONF_EVENTS_MODIFIED_AFTER, 7);
+        loadOnRestart();
+
+    }
+
+    private void loadOnRestart() {
+        try {
+            List<SLASummaryBean> summaryBeans = jpaService.execute(new SLASummaryGetRecordsOnRestartJPAExecutor(
+                    modifiedAfter));
+            for (SLASummaryBean summaryBean : summaryBeans) {
+                String jobId = summaryBean.getJobId();
+                if (summaryBean.getEventProcessed() == 7) {
+                    historySet.add(jobId);
+                }
+                else {
+                    try {
+                        SLARegistrationBean slaRegBean = jpaService.execute(new SLARegistrationGetOnRestartJPAExecutor(
+                                jobId));
+                        SLACalcStatus slaCalcStatus = new SLACalcStatus(summaryBean, slaRegBean);
+                        slaMap.put(jobId, slaCalcStatus);
+                    }
+                    catch (JPAExecutorException e) {
+                        XLog.getLog(SLAService.class).warn("Cannot retrieve registration record for " + jobId, e);
+                    }
+                }
+            }
+        }
+        catch (JPAExecutorException e) {
+            XLog.getLog(SLAService.class).warn("Failed to retrieve SLASummary records on restart", e);
+        }
     }
 
     @Override
@@ -101,7 +137,6 @@ public class SLACalculatorMemory impleme
     @Override
     public void updateSlaStatus(String jobId) throws JPAExecutorException, ServiceException {
         SLACalcStatus slaCalc = slaMap.get(jobId);
-        List<JsonBean> updateList = new ArrayList<JsonBean>();
         synchronized (slaCalc) {
             boolean change = false;
             byte eventProc = slaCalc.getEventProcessed();
@@ -119,10 +154,11 @@ public class SLACalculatorMemory impleme
                 }
                 else {
                     eventProc++; //disable further processing for optional start sla condition
+                    change = true;
                 }
+
             }
-            byte eventProcCopy = eventProc;
-            if (((eventProcCopy >> 1) & 1) == 0) { // check if second bit (duration-processed) is unset
+            if (((eventProc >> 1) & 1) == 0) { // check if second bit (duration-processed) is unset
                 if (slaCalc.getActualStart() != null) {
                     if (reg.getExpectedDuration() < Calendar.getInstance(TimeZone.getTimeZone("UTC")).getTimeInMillis()
                             - slaCalc.getActualStart().getTime()) {
@@ -133,12 +169,11 @@ public class SLACalculatorMemory impleme
                     }
                 }
             }
-            if (eventProc < 5) {
+            if (eventProc < 4) {
                 if (reg.getExpectedEnd().getTime() < Calendar.getInstance(TimeZone.getTimeZone("UTC"))
                         .getTimeInMillis()) {
                     slaCalc.setEventStatus(EventStatus.END_MISS);
                     slaCalc.setSLAStatus(SLAStatus.MISS);
-                    slaCalc.setSlaProcessed(1);
                     change = true;
                     eventHandler.queueEvent(new SLACalcStatus(slaCalc));
                     eventProc += 4;
@@ -146,14 +181,18 @@ public class SLACalculatorMemory impleme
             }
             if (change) {
                 slaCalc.setEventProcessed(eventProc);
-                slaCalc.setLastModifiedTime(new Date());
-                updateList.add(new SLASummaryBean(slaCalc));
-                jpa.execute(new SLACalculationInsertUpdateJPAExecutor(null, updateList));
-                if (slaCalc.getEventProcessed() == 7 && slaCalc.getSlaProcessed() != 2) {
+                SLASummaryBean slaSummaryBean = new SLASummaryBean();
+                slaSummaryBean.setJobId(slaCalc.getId());
+                slaSummaryBean.setEventProcessed(eventProc);
+                slaSummaryBean.setSLAStatus(slaCalc.getSLAStatus());
+                slaSummaryBean.setEventStatus(slaCalc.getEventStatus());
+                jpaService.execute(new SLASummaryUpdateForSLAStatusJPAExecutor(slaSummaryBean));
+                if (eventProc == 7) {
                     historySet.add(jobId);
                     slaMap.remove(jobId);
                     XLog.getLog(SLAService.class).trace("Removed Job [{0}] from map after End-processed", jobId);
                 }
+
             }
         }
     }
@@ -181,22 +220,51 @@ public class SLACalculatorMemory impleme
      */
     @Override
     public boolean addRegistration(String jobId, SLARegistrationBean reg) throws JPAExecutorException {
-        List<JsonBean> insertList = new ArrayList<JsonBean>();
         try {
             if (slaMap.size() < capacity) {
                 SLACalcStatus slaCalc = new SLACalcStatus(reg);
                 slaCalc.setSLAStatus(SLAStatus.NOT_STARTED);
                 slaMap.put(jobId, slaCalc);
+                List<JsonBean> insertList = new ArrayList<JsonBean>();
                 insertList.add(reg);
                 insertList.add(new SLASummaryBean(slaCalc));
-                jpa.execute(new SLACalculationInsertUpdateJPAExecutor(insertList, null));
+                jpaService.execute(new SLACalculationInsertUpdateJPAExecutor(insertList, null));
                 XLog.getLog(SLAService.class).trace("SLA Registration Event - Job:" + jobId);
                 return true;
             }
             else {
-                XLog.getLog(SLAService.class).error(
-                        "SLACalculator memory capacity reached. Cannot add new SLA Registration entry for job [{0}]",
-                        reg.getId());
+                XLog.getLog(SLAService.class)
+                .error("SLACalculator memory capacity reached. Cannot add or update new SLA Registration entry for job [{0}]",
+                reg.getId());
+            }
+        }
+        catch (JPAExecutorException jpa) {
+            throw jpa;
+        }
+        return false;
+    }
+
+    /**
+     * Update job into the map for SLA tracking
+     */
+    @Override
+    public boolean updateRegistration(String jobId, SLARegistrationBean reg) throws JPAExecutorException {
+        try {
+            if (slaMap.size() < capacity) {
+                SLACalcStatus slaCalc = new SLACalcStatus(reg);
+                slaCalc.setSLAStatus(SLAStatus.NOT_STARTED);
+                slaMap.put(jobId, slaCalc);
+                List<JsonBean> updateList = new ArrayList<JsonBean>();
+                updateList.add(reg);
+                updateList.add(new SLASummaryBean(slaCalc));
+                jpaService.execute(new SLACalculationInsertUpdateJPAExecutor(null, updateList));
+                XLog.getLog(SLAService.class).trace("SLA Registration Event - Job:" + jobId);
+                return true;
+            }
+            else {
+                XLog.getLog(SLAService.class)
+                .error("SLACalculator memory capacity reached. Cannot add or update new SLA Registration entry for job [{0}]",
+                reg.getId());
             }
         }
         catch (JPAExecutorException jpa) {
@@ -213,62 +281,50 @@ public class SLACalculatorMemory impleme
     public boolean addJobStatus(String jobId, String jobStatus, JobEvent.EventStatus jobEventStatus, Date startTime,
             Date endTime) throws JPAExecutorException, ServiceException {
         SLACalcStatus slaCalc = slaMap.get(jobId);
-        List<JsonBean> updateList = new ArrayList<JsonBean>();
         SLASummaryBean slaInfo = null;
         boolean hasSla = false;
         if (slaCalc != null) {
             synchronized (slaCalc) {
-                byte eventProc = slaCalc.getEventProcessed();
                 slaCalc.setJobStatus(jobStatus);
                 switch (jobEventStatus) {
                     case STARTED:
                         slaInfo = processJobStartSLA(slaCalc, startTime);
-                        eventProc += 1;
                         break;
                     case SUCCESS:
                         slaInfo = processJobEndSuccessSLA(slaCalc, startTime, endTime);
-                        slaInfo.setSlaProcessed(2);
-                        eventProc += 6;
                         break;
                     case FAILURE:
                         slaInfo = processJobEndFailureSLA(slaCalc, startTime, endTime);
-                        slaInfo.setSlaProcessed(2);
-                        eventProc += 6;
                         break;
                     default:
                         XLog.getLog(SLAService.class).debug("Unknown Job Status [{0}]", jobEventStatus);
                         return false;
                 }
-                slaCalc.setEventProcessed(eventProc);
-                slaInfo.setLastModifiedTime(new Date());
-                if (slaInfo.getSlaProcessed() == 2) {
+
+                if (slaCalc.getEventProcessed() == 7) {
+                    slaInfo.setEventProcessed(8);
                     slaMap.remove(jobId);
                 }
                 hasSla = true;
             }
+            XLog.getLog(SLAService.class).trace("SLA Status Event - Job:" + jobId + " Status:" + slaCalc.getSLAStatus());
         }
         else if (historySet.contains(jobId)) {
-            slaInfo = jpa.execute(new SLASummaryGetJPAExecutor(jobId));
+            slaInfo = jpaService.execute(new SLASummaryGetJPAExecutor(jobId));
             slaInfo.setJobStatus(jobStatus);
             slaInfo.setActualStart(startTime);
             slaInfo.setActualEnd(endTime);
             if (endTime != null) {
                 slaInfo.setActualDuration(endTime.getTime() - startTime.getTime());
             }
-            slaInfo.setSlaProcessed(2);
+            slaInfo.setEventProcessed(8);
             historySet.remove(jobId);
             hasSla = true;
         }
+
         if (hasSla) {
-            slaInfo.setLastModifiedTime(new Date());
-            updateList.add(slaInfo);
-            if (jpa != null) {
-                jpa.execute(new SLACalculationInsertUpdateJPAExecutor(null, updateList));
-            }
-            XLog.getLog(SLAService.class)
-                    .trace("SLA Status Event - Job:" + jobId + " Status:" + slaCalc.getSLAStatus());
+            jpaService.execute(new SLASummaryUpdateForSLAStatusActualTimesJPAExecutor(slaInfo));
         }
-
         return hasSla;
     }
 
@@ -281,20 +337,27 @@ public class SLACalculatorMemory impleme
      */
     public SLASummaryBean processJobStartSLA(SLACalcStatus slaCalc, Date actualStart) {
         slaCalc.setActualStart(actualStart);
-        slaCalc.setSLAStatus(SLAStatus.IN_PROCESS);
+        if (slaCalc.getSLAStatus().equals(SLAStatus.NOT_STARTED)) {
+            slaCalc.setSLAStatus(SLAStatus.IN_PROCESS);
+        }
         SLARegistrationBean reg = slaCalc.getSLARegistrationBean();
         Date expecStart = reg.getExpectedStart();
         byte eventProc = slaCalc.getEventProcessed();
-        if (((eventProc & 1) == 0) && expecStart != null) {
-            if (actualStart.getTime() > expecStart.getTime()) {
-                slaCalc.setEventStatus(EventStatus.START_MISS);
-            }
-            else {
-                slaCalc.setEventStatus(EventStatus.START_MET);
+        // set event proc here
+        if (((eventProc & 1) == 0)) {
+            if (expecStart != null) {
+                if (actualStart.getTime() > expecStart.getTime()) {
+                    slaCalc.setEventStatus(EventStatus.START_MISS);
+                }
+                else {
+                    slaCalc.setEventStatus(EventStatus.START_MET);
+                }
+                eventHandler.queueEvent(new SLACalcStatus(slaCalc));
             }
-            eventHandler.queueEvent(new SLACalcStatus(slaCalc));
+            eventProc += 1;
+            slaCalc.setEventProcessed(eventProc);
         }
-        return new SLASummaryBean(slaCalc);
+        return getSLASummaryBean(slaCalc);
     }
 
     /**
@@ -313,25 +376,35 @@ public class SLACalculatorMemory impleme
         long expectedDuration = reg.getExpectedDuration();
         long actualDuration = actualEnd.getTime() - actualStart.getTime();
         slaCalc.setActualDuration(actualDuration);
-        if (actualDuration > expectedDuration) {
-            slaCalc.setEventStatus(EventStatus.DURATION_MISS);
-        }
-        else {
-            slaCalc.setEventStatus(EventStatus.DURATION_MET);
+        //check event proc
+        byte eventProc = slaCalc.getEventProcessed();
+        if (((eventProc >> 1) & 1) == 0) {
+            if (actualDuration > expectedDuration) {
+                slaCalc.setEventStatus(EventStatus.DURATION_MISS);
+            }
+            else {
+                slaCalc.setEventStatus(EventStatus.DURATION_MET);
+            }
+            eventProc += 2;
+            slaCalc.setEventProcessed(eventProc);
+            eventHandler.queueEvent(new SLACalcStatus(slaCalc));
         }
-        eventHandler.queueEvent(new SLACalcStatus(slaCalc));
 
-        Date expectedEnd = reg.getExpectedEnd();
-        if (actualEnd.getTime() > expectedEnd.getTime()) {
-            slaCalc.setEventStatus(EventStatus.END_MISS);
-            slaCalc.setSLAStatus(SLAStatus.MISS);
-        }
-        else {
-            slaCalc.setEventStatus(EventStatus.END_MET);
-            slaCalc.setSLAStatus(SLAStatus.MET);
+        if (eventProc < 4) {
+            Date expectedEnd = reg.getExpectedEnd();
+            if (actualEnd.getTime() > expectedEnd.getTime()) {
+                slaCalc.setEventStatus(EventStatus.END_MISS);
+                slaCalc.setSLAStatus(SLAStatus.MISS);
+            }
+            else {
+                slaCalc.setEventStatus(EventStatus.END_MET);
+                slaCalc.setSLAStatus(SLAStatus.MET);
+            }
+            eventProc += 4;
+            slaCalc.setEventProcessed(eventProc);
+            eventHandler.queueEvent(new SLACalcStatus(slaCalc));
         }
-        eventHandler.queueEvent(new SLACalcStatus(slaCalc));
-        return new SLASummaryBean(slaCalc);
+        return getSLASummaryBean(slaCalc);
     }
 
     /**
@@ -344,25 +417,46 @@ public class SLACalculatorMemory impleme
      * @return SLASummaryBean
      */
     public SLASummaryBean processJobEndFailureSLA(SLACalcStatus slaCalc, Date actualStart, Date actualEnd) {
-        SLASummaryBean summ;
         slaCalc.setActualStart(actualStart);
         slaCalc.setActualEnd(actualEnd);
         SLARegistrationBean reg = slaCalc.getSLARegistrationBean();
         long expectedDuration = reg.getExpectedDuration();
         long actualDuration = actualEnd.getTime() - actualStart.getTime();
         slaCalc.setActualDuration(actualDuration);
-        if (actualDuration > expectedDuration) {
-            slaCalc.setEventStatus(EventStatus.DURATION_MISS);
+
+        byte eventProc = slaCalc.getEventProcessed();
+        if (((eventProc >> 1) & 1) == 0) {
+            if (actualDuration > expectedDuration) {
+                slaCalc.setEventStatus(EventStatus.DURATION_MISS);
+            }
+            else {
+                slaCalc.setEventStatus(EventStatus.DURATION_MET);
+            }
+            eventProc += 2;
+            slaCalc.setEventProcessed(eventProc);
             eventHandler.queueEvent(new SLACalcStatus(slaCalc));
         }
-        else {
-            slaCalc.setEventStatus(EventStatus.DURATION_MET);
+        if (eventProc < 4) {
+            slaCalc.setEventStatus(EventStatus.END_MISS);
+            slaCalc.setSLAStatus(SLAStatus.MISS);
+            eventProc += 4;
+            slaCalc.setEventProcessed(eventProc);
+            eventHandler.queueEvent(new SLACalcStatus(slaCalc));
         }
-        slaCalc.setEventStatus(EventStatus.END_MISS);
-        slaCalc.setSLAStatus(SLAStatus.MISS);
-        eventHandler.queueEvent(new SLACalcStatus(slaCalc));
-        summ = new SLASummaryBean(slaCalc);
-        return summ;
+        return getSLASummaryBean(slaCalc);
+    }
+
+    private SLASummaryBean getSLASummaryBean (SLACalcStatus slaCalc) {
+        SLASummaryBean slaSummaryBean = new SLASummaryBean();
+        slaSummaryBean.setActualStart(slaCalc.getActualStart());
+        slaSummaryBean.setActualEnd(slaCalc.getActualEnd());
+        slaSummaryBean.setActualDuration(slaCalc.getActualDuration());
+        slaSummaryBean.setSLAStatus(slaCalc.getSLAStatus());
+        slaSummaryBean.setEventStatus(slaCalc.getEventStatus());
+        slaSummaryBean.setEventProcessed(slaCalc.getEventProcessed());
+        slaSummaryBean.setJobId(slaCalc.getId());
+        slaSummaryBean.setJobStatus(slaCalc.getJobStatus());
+        return slaSummaryBean;
     }
 
 }

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLAOperations.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLAOperations.java?rev=1491333&r1=1491332&r2=1491333&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLAOperations.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLAOperations.java Mon Jun 10 04:37:20 2013
@@ -24,6 +24,9 @@ import org.apache.oozie.AppType;
 import org.apache.oozie.ErrorCode;
 import org.apache.oozie.client.event.SLAEvent.EventStatus;
 import org.apache.oozie.command.CommandException;
+import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.executor.jpa.sla.SLARegistrationGetJPAExecutor;
+import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.ServiceException;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.sla.SLARegistrationBean;
@@ -41,7 +44,7 @@ public class SLAOperations {
     private static final String ALERT_EVENTS = "alert-events";
 
     public static SLARegistrationBean createSlaRegistrationEvent(Element eSla, String jobId, String parentId,
-            AppType appType, String user, String appName, XLog log) throws CommandException {
+            AppType appType, String user, String appName, XLog log, boolean rerun) throws CommandException {
         if (eSla == null || !SLAService.isEnabled()) {
             log.debug("Not registering SLA for job [{0}]. Sla-Xml null OR SLAService not enabled", jobId);
             return null;
@@ -134,23 +137,46 @@ public class SLAOperations {
         sla.setUser(user);
         sla.setParentId(parentId);
 
-            SLAService slaService = Services.get().get(SLAService.class);
-            try {
+        SLAService slaService = Services.get().get(SLAService.class);
+        try {
+            if (!rerun) {
                 slaService.addRegistrationEvent(sla);
             }
-            catch (ServiceException e) {
-                throw new CommandException(ErrorCode.E1007, " id " + jobId, e.getMessage(), e);
+            else {
+                slaService.updateRegistrationEvent(sla);
             }
+        }
+        catch (ServiceException e) {
+            throw new CommandException(ErrorCode.E1007, " id " + jobId, e.getMessage(), e);
+        }
 
         return sla;
     }
 
+    /**
+     * Retrieve registration event
+     * @param jobId the jobId
+     * @throws CommandException
+     * @throws JPAExecutorException
+     */
+    public static void updateRegistrationEvent(String jobId) throws CommandException, JPAExecutorException {
+        JPAService jpaService = Services.get().get(JPAService.class);
+        SLAService slaService = Services.get().get(SLAService.class);
+        try {
+            slaService.updateRegistrationEvent(jpaService.execute(new SLARegistrationGetJPAExecutor(jobId)));
+        }
+        catch (ServiceException e) {
+            throw new CommandException(ErrorCode.E1007, " id " + jobId, e.getMessage(), e);
+        }
+
+    }
+
     /*
      * parentId null
      */
     public static SLARegistrationBean createSlaRegistrationEvent(Element eSla, String jobId, AppType appType,
             String user, String appName, XLog log) throws CommandException {
-        return createSlaRegistrationEvent(eSla, jobId, null, appType, user, appName, log);
+        return createSlaRegistrationEvent(eSla, jobId, null, appType, user, appName, log, false);
     }
 
     /*
@@ -158,7 +184,7 @@ public class SLAOperations {
      */
     public static SLARegistrationBean createSlaRegistrationEvent(Element eSla, String jobId, String parentId,
             AppType appType, String user, XLog log) throws CommandException {
-        return createSlaRegistrationEvent(eSla, jobId, parentId, appType, user, null, log);
+        return createSlaRegistrationEvent(eSla, jobId, parentId, appType, user, null, log, false);
     }
 
     /*
@@ -166,7 +192,7 @@ public class SLAOperations {
      */
     public static SLARegistrationBean createSlaRegistrationEvent(Element eSla, String jobId, AppType appType,
             String user, XLog log) throws CommandException {
-        return createSlaRegistrationEvent(eSla, jobId, null, appType, user, null, log);
+        return createSlaRegistrationEvent(eSla, jobId, null, appType, user, null, log, false);
     }
 
     private static String getTagElement(Element elem, String tagName) {

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLARegistrationBean.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLARegistrationBean.java?rev=1491333&r1=1491332&r2=1491333&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLARegistrationBean.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLARegistrationBean.java Mon Jun 10 04:37:20 2013
@@ -42,9 +42,10 @@ import org.json.simple.JSONObject;
 
 @Entity
 @NamedQueries({
- @NamedQuery(name = "GET_SLA_REG_ALL", query = "select w.jobId, w.appType, w.appName, w.user, w.nominalTimeTS," +
-        "w.expectedStartTS, w.expectedEndTS, w.expectedDuration, w.jobData, w.parentId, w.notificationMsg," +
-        "w.upstreamApps, w.slaConfig from SLARegistrationBean w where w.jobId = :id") })
+
+ @NamedQuery(name = "GET_SLA_REG_ON_RESTART", query = "select w.notificationMsg, w.upstreamApps, w.slaConfig, w.jobData from SLARegistrationBean w where w.jobId = :id"),
+
+ @NamedQuery(name = "GET_SLA_REG_ALL", query = "select OBJECT(w) from SLARegistrationBean w where w.jobId = :id") })
 public class SLARegistrationBean extends JsonSLARegistrationEvent implements Writable {
 
     @Basic

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLASummaryBean.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLASummaryBean.java?rev=1491333&r1=1491332&r2=1491333&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLASummaryBean.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLASummaryBean.java Mon Jun 10 04:37:20 2013
@@ -42,7 +42,15 @@ import org.json.simple.JSONObject;
 @Entity
 @Table(name = "SLA_SUMMARY")
 @NamedQueries({
- @NamedQuery(name = "GET_SLA_SUMMARY", query = "select OBJECT(w) from SLASummaryBean w where w.jobId = :id") })
+
+ @NamedQuery(name = "UPDATE_SLA_SUMMARY_FOR_SLA_STATUS", query = "update  SLASummaryBean w set w.slaStatus = :slaStatus, w.eventStatus = :eventStatus, w.eventProcessed = :eventProcessed, w.lastModifiedTS = :lastModifiedTS where w.jobId = :jobId"),
+
+ @NamedQuery(name = "UPDATE_SLA_SUMMARY_FOR_STATUS_ACTUAL_TIMES", query = "update SLASummaryBean w set w.slaStatus = :slaStatus, w.eventStatus = :eventStatus, w.eventProcessed = :eventProcessed, w.jobStatus = :jobStatus, w.lastModifiedTS = :lastModifiedTS, w.actualStartTS = :actualStartTS, w.actualEndTS = :actualEndTS, w.actualDuration = :actualDuration where w.jobId = :jobId"),
+
+ @NamedQuery(name = "GET_SLA_SUMMARY", query = "select OBJECT(w) from SLASummaryBean w where w.jobId = :id"),
+
+ @NamedQuery(name = "GET_SLA_SUMMARY_RECORDS_RESTART", query = "select OBJECT(w) from SLASummaryBean w where w.eventProcessed <= 7 AND w.lastModifiedTS >= :lastModifiedTime") })
+
 /**
  * Class to store all the SLA related details (summary) per job
  */
@@ -114,8 +122,8 @@ public class SLASummaryBean implements J
 
     @Basic
     @Index
-    @Column(name = "sla_processed")
-    private byte slaProcessed = 0;
+    @Column(name = "event_processed")
+    private byte eventProcessed = 0;
 
     @Basic
     @Index
@@ -129,16 +137,21 @@ public class SLASummaryBean implements J
         SLARegistrationBean reg = slaCalc.getSLARegistrationBean();
         setJobId(slaCalc.getId());
         setAppName(reg.getAppName());
+        setAppType(reg.getAppType());
+        setNominalTime(reg.getNominalTime());
         setExpectedStart(reg.getExpectedStart());
         setExpectedEnd(reg.getExpectedEnd());
         setExpectedDuration(reg.getExpectedDuration());
         setJobStatus(slaCalc.getJobStatus());
         setSLAStatus(slaCalc.getSLAStatus());
         setEventStatus(slaCalc.getEventStatus());
-        setSlaProcessed(slaCalc.getSlaProcessed());
         setLastModifiedTime(slaCalc.getLastModifiedTime());
         setUser(reg.getUser());
         setParentId(reg.getParentId());
+        setEventProcessed(slaCalc.getEventProcessed());
+        setActualDuration(slaCalc.getActualDuration());
+        setActualEnd(slaCalc.getActualEnd());
+        setActualStart(slaCalc.getActualStart());
     }
 
     public String getJobId() {
@@ -234,6 +247,14 @@ public class SLASummaryBean implements J
         return (slaStatus != null ? SLAEvent.SLAStatus.valueOf(slaStatus) : null);
     }
 
+    public String getSLAStatusString() {
+        return slaStatus;
+    }
+
+    public String getEventStatusString() {
+        return eventStatus;
+    }
+
     public void setSLAStatus(SLAEvent.SLAStatus stage) {
         this.slaStatus = (stage != null ? stage.name() : null);
     }
@@ -262,12 +283,12 @@ public class SLASummaryBean implements J
         this.appType = appType.toString();
     }
 
-    public byte getSlaProcessed() {
-        return slaProcessed;
+    public byte getEventProcessed() {
+        return eventProcessed;
     }
 
-    public void setSlaProcessed(int slaProcessed) {
-        this.slaProcessed = (byte) slaProcessed;
+    public void setEventProcessed(int eventProcessed) {
+        this.eventProcessed = (byte)eventProcessed;
     }
 
     public Date getLastModifiedTime() {

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/sla/listener/SLAEmailEventListener.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/sla/listener/SLAEmailEventListener.java?rev=1491333&r1=1491332&r2=1491333&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/sla/listener/SLAEmailEventListener.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/sla/listener/SLAEmailEventListener.java Mon Jun 10 04:37:20 2013
@@ -309,9 +309,7 @@ public class SLAEmailEventListener exten
         printField(body, EmailField.EXPECTED_END_TIME.toString(), event.getExpectedEnd(), false);
         printField(body, EmailField.ACTUAL_END_TIME.toString(), event.getActualEnd(), false);
         printField(body, EmailField.EXPECTED_DURATION.toString(), event.getExpectedDuration(), false);
-        long actualDuration = (event.getActualEnd() != null && event.getActualStart() != null) ? (event.getActualEnd()
-                .getTime() - event.getActualStart().getTime()) / (1000 * 60) : -1;
-        printField(body, EmailField.ACTUAL_DURATION.toString(), actualDuration, false);
+        printField(body, EmailField.ACTUAL_DURATION.toString(), event.getActualDuration(), false);
 
         try {
             msg.setText(body.toString());

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/sla/service/SLAService.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/sla/service/SLAService.java?rev=1491333&r1=1491332&r2=1491333&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/sla/service/SLAService.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/sla/service/SLAService.java Mon Jun 10 04:37:20 2013
@@ -18,18 +18,25 @@
 package org.apache.oozie.sla.service;
 
 import java.util.Date;
+import java.util.List;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.oozie.ErrorCode;
 import org.apache.oozie.client.event.JobEvent.EventStatus;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.executor.jpa.sla.SLARegistrationGetOnRestartJPAExecutor;
+import org.apache.oozie.executor.jpa.sla.SLASummaryGetRecordsOnRestartJPAExecutor;
 import org.apache.oozie.service.EventHandlerService;
+import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.SchedulerService;
 import org.apache.oozie.service.Service;
 import org.apache.oozie.service.ServiceException;
 import org.apache.oozie.service.Services;
+import org.apache.oozie.sla.SLACalcStatus;
 import org.apache.oozie.sla.SLACalculator;
 import org.apache.oozie.sla.SLACalculatorMemory;
 import org.apache.oozie.sla.SLARegistrationBean;
+import org.apache.oozie.sla.SLASummaryBean;
 import org.apache.oozie.util.XLog;
 import com.google.common.annotations.VisibleForTesting;
 
@@ -39,11 +46,13 @@ public class SLAService implements Servi
     public static final String CONF_CALCULATOR_IMPL = CONF_PREFIX + "calculator.impl";
     public static final String CONF_CAPACITY = CONF_PREFIX + "capacity";
     public static final String CONF_ALERT_EVENTS = CONF_PREFIX + "alert.events";
+    public static final String CONF_EVENTS_MODIFIED_AFTER = CONF_PREFIX + "events.modified.after";
 
     private static SLACalculator calcImpl;
     private static boolean slaEnabled = false;
     private EventHandlerService eventHandler;
     public static XLog LOG;
+    private JPAService jpaService;
 
     @Override
     public void init(Services services) throws ServiceException {
@@ -130,6 +139,21 @@ public class SLAService implements Servi
         return false;
     }
 
+    public boolean updateRegistrationEvent(SLARegistrationBean reg) throws ServiceException {
+        try {
+            if (calcImpl.updateRegistration(reg.getId(), reg)) {
+                return true;
+            }
+            else {
+                LOG.warn("SLA queue full. Unable to update the SLA entry for job [{0}]", reg.getId());
+            }
+        }
+        catch (JPAExecutorException ex) {
+            LOG.warn("Could not update SLA entry for job [{0}]", reg.getId(), ex);
+        }
+        return false;
+    }
+
     public boolean addStatusEvent(String jobId, String status, EventStatus eventStatus, Date startTime, Date endTime)
             throws ServiceException {
         try {

Modified: oozie/trunk/core/src/main/resources/oozie-default.xml
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/resources/oozie-default.xml?rev=1491333&r1=1491332&r2=1491333&view=diff
==============================================================================
--- oozie/trunk/core/src/main/resources/oozie-default.xml (original)
+++ oozie/trunk/core/src/main/resources/oozie-default.xml Mon Jun 10 04:37:20 2013
@@ -1905,4 +1905,12 @@
         </description>
     </property>
 
+    <property>
+        <name>oozie.sla.service.SLAService.events.modified.after</name>
+        <value>7</value>
+        <description> Retrive records which have been modified in last x amount of days (x is 7 by default)
+        </description>
+    </property>
+
+
 </configuration>

Modified: oozie/trunk/core/src/test/java/org/apache/oozie/jms/TestJMSSLAEventListener.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/jms/TestJMSSLAEventListener.java?rev=1491333&r1=1491332&r2=1491333&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/jms/TestJMSSLAEventListener.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/jms/TestJMSSLAEventListener.java Mon Jun 10 04:37:20 2013
@@ -180,6 +180,8 @@ public class TestJMSSLAEventListener ext
         durationMissBean.setExpectedEnd(expectedEndDate);
         durationMiss.setActualEnd(actualEndDate);
         durationMissBean.setExpectedDuration(expectedDuration);
+        long actualDuration = actualEndDate.getTime() - actualStartDate.getTime();
+        durationMiss.setActualDuration(actualDuration);
 
         ConnectionContext jmsContext = getConnectionContext();
         Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE);
@@ -202,7 +204,7 @@ public class TestJMSSLAEventListener ext
         assertEquals(expectedEndDate, durationMissMsg.getExpectedEndTime());
         assertEquals(actualEndDate, durationMissMsg.getActualEndTime());
         assertEquals(expectedDuration, durationMissMsg.getExpectedDuration());
-        assertEquals((actualEndDate.getTime() - actualStartDate.getTime()) / (1000 * 60), durationMissMsg.getActualDuration());
+        assertEquals(actualDuration, durationMissMsg.getActualDuration());
         assertEquals("notification of duration miss", durationMissMsg.getNotificationMessage());
     }
 
@@ -363,6 +365,8 @@ public class TestJMSSLAEventListener ext
         durationMetBean.setExpectedEnd(expectedEndDate);
         durationMet.setActualEnd(actualEndDate);
         durationMetBean.setExpectedDuration(expectedDuration);
+        long actualDuration = actualEndDate.getTime() - actualStartDate.getTime();
+        durationMet.setActualDuration(actualDuration);
 
         ConnectionContext jmsContext = getConnectionContext();
 
@@ -386,7 +390,7 @@ public class TestJMSSLAEventListener ext
         assertEquals(expectedEndDate, durationMissMsg.getExpectedEndTime());
         assertEquals(actualEndDate, durationMissMsg.getActualEndTime());
         assertEquals(expectedDuration, durationMissMsg.getExpectedDuration());
-        assertEquals((actualEndDate.getTime() - actualStartDate.getTime()) / (1000 * 60), durationMissMsg.getActualDuration());
+        assertEquals(actualDuration, durationMissMsg.getActualDuration());
         assertEquals("notification of duration met", durationMissMsg.getNotificationMessage());
     }
 }

Modified: oozie/trunk/core/src/test/java/org/apache/oozie/servlet/TestV2SLAServlet.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/servlet/TestV2SLAServlet.java?rev=1491333&r1=1491332&r2=1491333&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/servlet/TestV2SLAServlet.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/servlet/TestV2SLAServlet.java Mon Jun 10 04:37:20 2013
@@ -183,7 +183,6 @@ public class TestV2SLAServlet extends Da
             bean.setActualEnd(actualEnd.getTime());
             bean.setExpectedDuration(10);
             bean.setActualDuration(15);
-            bean.setSlaProcessed((byte) 1);
             bean.setUser("testuser");
             bean.setLastModifiedTime(currentTime);
             list.add(bean);

Modified: oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLACalculationJPAExecutor.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLACalculationJPAExecutor.java?rev=1491333&r1=1491332&r2=1491333&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLACalculationJPAExecutor.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLACalculationJPAExecutor.java Mon Jun 10 04:37:20 2013
@@ -101,7 +101,6 @@ public class TestSLACalculationJPAExecut
         assertEquals(actStart, sBean.getActualStart());
         assertEquals(actEnd, sBean.getActualEnd());
         assertEquals(2000, sBean.getActualDuration());
-        assertEquals(1, sBean.getSlaProcessed());
         assertEquals(actEnd, sBean.getLastModifiedTime());
 
     }
@@ -153,7 +152,6 @@ public class TestSLACalculationJPAExecut
         assertEquals(actStart, sBean.getActualStart());
         assertEquals(newDate, sBean.getActualEnd());
         assertEquals(2000, sBean.getActualDuration());
-        assertEquals(1, sBean.getSlaProcessed());
         assertEquals(newDate, sBean.getLastModifiedTime());
 
     }
@@ -183,7 +181,6 @@ public class TestSLACalculationJPAExecut
         bean1 = new SLASummaryBean();
         bean1.setJobId(wfId1);
         bean1.setActualEnd(newDate);
-        bean1.setSlaProcessed(1);
         List<JsonBean> updateList = new ArrayList<JsonBean>();
         updateList.add(bean1);
 
@@ -208,7 +205,6 @@ public class TestSLACalculationJPAExecut
         SLASummaryGetJPAExecutor readCmd = new SLASummaryGetJPAExecutor(wfId1);
         SLASummaryBean sBean = jpaService.execute(readCmd);
         // isSlaProcessed should NOT be changed to 1
-        assertFalse(sBean.getSlaProcessed() == 1);
         // actualEnd should be null as before
         assertNull(sBean.getActualEnd());
 
@@ -234,7 +230,6 @@ public class TestSLACalculationJPAExecut
         bean.setActualStart(aStart);
         bean.setActualEnd(aEnd);
         bean.setActualDuration(aDur);
-        bean.setSlaProcessed((byte) slaProc);
         bean.setLastModifiedTime(lastMod);
         return bean;
     }