You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by pu...@apache.org on 2015/02/21 00:01:49 UTC

[3/4] oozie git commit: OOZIE-1913 Devise a way to turn off SLA alerts for bundle/coordinator flexibly

http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/core/src/main/java/org/apache/oozie/command/coord/CoordSLAChangeXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/coord/CoordSLAChangeXCommand.java b/core/src/main/java/org/apache/oozie/command/coord/CoordSLAChangeXCommand.java
new file mode 100644
index 0000000..4d24388
--- /dev/null
+++ b/core/src/main/java/org/apache/oozie/command/coord/CoordSLAChangeXCommand.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.command.coord;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.command.CommandException;
+import org.apache.oozie.command.PreconditionException;
+import org.apache.oozie.coord.CoordELEvaluator;
+import org.apache.oozie.coord.CoordELFunctions;
+import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
+import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
+import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.service.ServiceException;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.sla.service.SLAService;
+import org.apache.oozie.util.ELEvaluator;
+import org.apache.oozie.util.Pair;
+import org.apache.oozie.util.XmlUtils;
+import org.jdom.Element;
+
+public class CoordSLAChangeXCommand extends CoordSLAAlertsXCommand {
+
+    Map<String, String> newParams;
+
+    public CoordSLAChangeXCommand(String jobId, String actions, String dates, Map<String, String> newParams) {
+        super(jobId, "SLA.alerts.change", "SLA.alerts.change", actions, dates);
+        this.newParams = newParams;
+    }
+
+    @Override
+    protected boolean executeSlaCommand() throws ServiceException, CommandException {
+        try {
+            List<Pair<String, Map<String, String>>> idSlaDefinitionList = new ArrayList<Pair<String, Map<String, String>>>();
+            List<CoordinatorActionBean> coordinatorActionBeanList = getNotTerminatedActions();
+            Configuration conf = getJobConf();
+            for (CoordinatorActionBean coordAction : coordinatorActionBeanList) {
+                Map<String, String> slaDefinitionMap = new HashMap<String, String>(newParams);
+                for (String key : slaDefinitionMap.keySet()) {
+                    Element eAction = XmlUtils.parseXml(coordAction.getActionXml().toString());
+                    ELEvaluator evalSla = CoordELEvaluator.createSLAEvaluator(eAction, coordAction, conf);
+                    String updateValue = CoordELFunctions.evalAndWrap(evalSla, slaDefinitionMap.get(key));
+                    slaDefinitionMap.put(key, updateValue);
+                }
+                idSlaDefinitionList.add(new Pair<String, Map<String, String>>(coordAction.getId(), slaDefinitionMap));
+            }
+            return Services.get().get(SLAService.class).changeDefinition(idSlaDefinitionList);
+        }
+        catch (Exception e) {
+            throw new CommandException(ErrorCode.E1027, e.getMessage(), e);
+        }
+
+    }
+
+    @Override
+    protected void updateJob() throws CommandException {
+        if (isJobRequest()) {
+            updateJobSLA(newParams);
+        }
+    }
+
+    private List<CoordinatorActionBean> getNotTerminatedActions() throws JPAExecutorException {
+        if (isJobRequest()) {
+            return CoordActionQueryExecutor.getInstance().getList(
+                    CoordActionQuery.GET_ACTIVE_ACTIONS_JOBID_FOR_SLA_CHANGE, getJobId());
+        }
+        else {
+            return CoordActionQueryExecutor.getInstance().getList(
+                    CoordActionQuery.GET_ACTIVE_ACTIONS_IDS_FOR_SLA_CHANGE, getActionList());
+        }
+
+    }
+
+    @Override
+    protected void verifyPrecondition() throws CommandException, PreconditionException {
+        validateSLAChangeParam(newParams);
+    }
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/core/src/main/java/org/apache/oozie/coord/CoordUtils.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/coord/CoordUtils.java b/core/src/main/java/org/apache/oozie/coord/CoordUtils.java
index 4643d73..90050b3 100644
--- a/core/src/main/java/org/apache/oozie/coord/CoordUtils.java
+++ b/core/src/main/java/org/apache/oozie/coord/CoordUtils.java
@@ -20,11 +20,14 @@ package org.apache.oozie.coord;
 
 import java.text.ParseException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Date;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.oozie.CoordinatorActionBean;
 import org.apache.oozie.ErrorCode;
@@ -35,15 +38,19 @@ import org.apache.oozie.command.CommandException;
 import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordJobGetActionForNominalTimeJPAExecutor;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.service.ConfigurationService;
 import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.service.XLogService;
+import org.apache.oozie.sla.SLAOperations;
 import org.apache.oozie.util.CoordActionsInDateRange;
 import org.apache.oozie.util.DateUtils;
 import org.apache.oozie.util.ParamChecker;
 import org.apache.oozie.util.XLog;
 import org.jdom.Element;
 
+import com.google.common.annotations.VisibleForTesting;
+
 public class CoordUtils {
     public static final String HADOOP_USER = "user.name";
 
@@ -92,7 +99,8 @@ public class CoordUtils {
      * @return the list of Coordinator actions for the date range
      * @throws CommandException thrown if failed to get coordinator actions by given date range
      */
-    static List<CoordinatorActionBean> getCoordActionsFromDates(String jobId, String scope, boolean active)
+    @VisibleForTesting
+    public static List<CoordinatorActionBean> getCoordActionsFromDates(String jobId, String scope, boolean active)
             throws CommandException {
         JPAService jpaService = Services.get().get(JPAService.class);
         ParamChecker.notEmpty(jobId, "jobId");
@@ -132,7 +140,12 @@ public class CoordUtils {
                     throw new CommandException(ErrorCode.E0302, s.trim(), e);
                 }
                 catch (JPAExecutorException e) {
-                    throw new CommandException(e);
+                    if (e.getErrorCode() == ErrorCode.E0605) {
+                        XLog.getLog(CoordUtils.class).info("No action for nominal time:" + s + ". Skipping over");
+                    }
+                    else {
+                        throw new CommandException(e);
+                    }
                 }
 
             }
@@ -145,16 +158,7 @@ public class CoordUtils {
         return coordActions;
     }
 
-    /**
-     * Get the list of actions for given id ranges
-     *
-     * @param jobId coordinator job id
-     * @param scope a comma-separated list of action ranges. The action range is specified with two action numbers separated by '-'
-     * @return the list of all Coordinator actions for action range
-     * @throws CommandException thrown if failed to get coordinator actions by given id range
-     */
-     public static List<CoordinatorActionBean> getCoordActionsFromIds(String jobId, String scope) throws CommandException {
-        JPAService jpaService = Services.get().get(JPAService.class);
+    public static Set<String> getActionsIds(String jobId, String scope) throws CommandException {
         ParamChecker.notEmpty(jobId, "jobId");
         ParamChecker.notEmpty(scope, "scope");
 
@@ -202,6 +206,21 @@ public class CoordUtils {
                 actions.add(jobId + "@" + s);
             }
         }
+        return actions;
+    }
+
+    /**
+     * Get the list of actions for given id ranges
+     *
+     * @param jobId coordinator job id
+     * @param scope a comma-separated list of action ranges. The action range is specified with two action numbers separated by '-'
+     * @return the list of all Coordinator actions for action range
+     * @throws CommandException thrown if failed to get coordinator actions by given id range
+     */
+     @VisibleForTesting
+     public static List<CoordinatorActionBean> getCoordActionsFromIds(String jobId, String scope) throws CommandException {
+        JPAService jpaService = Services.get().get(JPAService.class);
+        Set<String> actions = getActionsIds(jobId, scope);
         // Retrieve the actions using the corresponding actionIds
         List<CoordinatorActionBean> coordActions = new ArrayList<CoordinatorActionBean>();
         for (String id : actions) {
@@ -225,4 +244,107 @@ public class CoordUtils {
         return coordActions;
     }
 
+     /**
+      * Check if sla alert is disabled for action.
+      * @param actionBean
+      * @param coordName
+      * @param jobConf
+      * @return
+      * @throws ParseException
+      */
+    public static boolean isSlaAlertDisabled(CoordinatorActionBean actionBean, String coordName, Configuration jobConf)
+            throws ParseException {
+
+        int disableSlaNotificationOlderThan = jobConf.getInt(OozieClient.SLA_DISABLE_ALERT_OLDER_THAN,
+                ConfigurationService.getInt(OozieClient.SLA_DISABLE_ALERT_OLDER_THAN));
+
+        if (disableSlaNotificationOlderThan > 0) {
+            // Disable alert for catchup jobs
+            long timeDiffinHrs = TimeUnit.MILLISECONDS.toHours(new Date().getTime()
+                    - actionBean.getNominalTime().getTime());
+            if (timeDiffinHrs > jobConf.getLong(OozieClient.SLA_DISABLE_ALERT_OLDER_THAN,
+                    ConfigurationService.getLong(OozieClient.SLA_DISABLE_ALERT_OLDER_THAN))) {
+                return true;
+            }
+        }
+
+        boolean disableAlert = false;
+        if (jobConf.get(OozieClient.SLA_DISABLE_ALERT_COORD) != null) {
+            String coords = jobConf.get(OozieClient.SLA_DISABLE_ALERT_COORD);
+            Set<String> coordsToDisableFor = new HashSet<String>(Arrays.asList(coords.split(",")));
+            if (coordsToDisableFor.contains(coordName)) {
+                return true;
+            }
+            if (coordsToDisableFor.contains(actionBean.getJobId())) {
+                return true;
+            }
+        }
+
+        // Check if sla alert is disabled for that action
+        if (!StringUtils.isEmpty(jobConf.get(OozieClient.SLA_DISABLE_ALERT))
+                && getCoordActionSLAAlertStatus(actionBean, coordName, jobConf, OozieClient.SLA_DISABLE_ALERT)) {
+            return true;
+        }
+
+        // Check if sla alert is enabled for that action
+        if (!StringUtils.isEmpty(jobConf.get(OozieClient.SLA_ENABLE_ALERT))
+                && getCoordActionSLAAlertStatus(actionBean, coordName, jobConf, OozieClient.SLA_ENABLE_ALERT)) {
+            return false;
+        }
+
+        return disableAlert;
+    }
+
+    /**
+     * Get coord action SLA alert status.
+     * @param actionBean
+     * @param coordName
+     * @param jobConf
+     * @param slaAlertType
+     * @return
+     * @throws ParseException
+     */
+    private static boolean getCoordActionSLAAlertStatus(CoordinatorActionBean actionBean, String coordName,
+            Configuration jobConf, String slaAlertType) throws ParseException {
+        String slaAlertList;
+
+       if (!StringUtils.isEmpty(jobConf.get(slaAlertType))) {
+            slaAlertList = jobConf.get(slaAlertType);
+            // check if ALL or date/action-num range
+            if (slaAlertList.equalsIgnoreCase(SLAOperations.ALL_VALUE)) {
+                return true;
+            }
+            String[] values = slaAlertList.split(",");
+            for (String value : values) {
+                value = value.trim();
+                if (value.contains("::")) {
+                    String[] datesInRange = value.split("::");
+                    Date start = DateUtils.parseDateOozieTZ(datesInRange[0].trim());
+                    Date end = DateUtils.parseDateOozieTZ(datesInRange[1].trim());
+                    // check if nominal time in this range
+                    if (actionBean.getNominalTime().compareTo(start) >= 0
+                            || actionBean.getNominalTime().compareTo(end) <= 0) {
+                        return true;
+                    }
+                }
+                else if (value.contains("-")) {
+                    String[] actionsInRange = value.split("-");
+                    int start = Integer.parseInt(actionsInRange[0].trim());
+                    int end = Integer.parseInt(actionsInRange[1].trim());
+                    // check if action number in this range
+                    if (actionBean.getActionNumber() >= start || actionBean.getActionNumber() <= end) {
+                        return true;
+                    }
+                }
+                else {
+                    int actionNumber = Integer.parseInt(value.trim());
+                    if (actionBean.getActionNumber() == actionNumber) {
+                        return true;
+                    }
+                }
+            }
+        }
+        return false;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionQueryExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionQueryExecutor.java
index e6ab09b..c6a60a1 100644
--- a/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionQueryExecutor.java
+++ b/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionQueryExecutor.java
@@ -28,6 +28,7 @@ import javax.persistence.Query;
 
 import org.apache.oozie.CoordinatorActionBean;
 import org.apache.oozie.ErrorCode;
+import org.apache.oozie.StringBlob;
 import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.Services;
 
@@ -51,7 +52,12 @@ public class CoordActionQueryExecutor extends
         GET_COORD_ACTIVE_ACTIONS_COUNT_BY_JOBID,
         GET_COORD_ACTIONS_BY_LAST_MODIFIED_TIME,
         GET_COORD_ACTIONS_STATUS_UNIGNORED,
-        GET_COORD_ACTIONS_PENDING_COUNT
+        GET_COORD_ACTIONS_PENDING_COUNT,
+        GET_ACTIVE_ACTIONS_IDS_FOR_SLA_CHANGE,
+        GET_ACTIVE_ACTIONS_JOBID_FOR_SLA_CHANGE,
+        GET_TERMINATED_ACTIONS_FOR_DATES,
+        GET_TERMINATED_ACTION_IDS_FOR_DATES,
+        GET_ACTIVE_ACTIONS_FOR_DATES
     };
 
     private static CoordActionQueryExecutor instance = new CoordActionQueryExecutor();
@@ -180,6 +186,19 @@ public class CoordActionQueryExecutor extends
             case GET_COORD_ACTIONS_PENDING_COUNT:
                 query.setParameter("jobId", parameters[0]);
                 break;
+            case GET_ACTIVE_ACTIONS_IDS_FOR_SLA_CHANGE:
+            query.setParameter("ids", parameters[0]);
+            break;
+            case GET_ACTIVE_ACTIONS_JOBID_FOR_SLA_CHANGE:
+            query.setParameter("jobId", parameters[0]);
+            break;
+            case GET_TERMINATED_ACTIONS_FOR_DATES:
+            case GET_TERMINATED_ACTION_IDS_FOR_DATES:
+            case GET_ACTIVE_ACTIONS_FOR_DATES:
+                query.setParameter("jobId", parameters[0]);
+                query.setParameter("startTime", new Timestamp(((Date) parameters[1]).getTime()));
+                query.setParameter("endTime", new Timestamp(((Date) parameters[2]).getTime()));
+                break;
 
             default:
                 throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot set parameters for "
@@ -247,6 +266,33 @@ public class CoordActionQueryExecutor extends
                 bean.setStatusStr((String)arr[0]);
                 bean.setPending((Integer)arr[1]);
                 break;
+            case GET_ACTIVE_ACTIONS_IDS_FOR_SLA_CHANGE:
+            case GET_ACTIVE_ACTIONS_JOBID_FOR_SLA_CHANGE:
+                arr = (Object[]) ret;
+                bean = new CoordinatorActionBean();
+                bean.setId((String)arr[0]);
+                bean.setNominalTime((Timestamp)arr[1]);
+                bean.setCreatedTime((Timestamp)arr[2]);
+                bean.setActionXmlBlob((StringBlob)arr[3]);
+                break;
+            case GET_TERMINATED_ACTIONS_FOR_DATES:
+                bean = (CoordinatorActionBean) ret;
+                break;
+            case GET_TERMINATED_ACTION_IDS_FOR_DATES:
+                bean = new CoordinatorActionBean();
+                bean.setId((String) ret);
+                break;
+            case GET_ACTIVE_ACTIONS_FOR_DATES:
+                arr = (Object[]) ret;
+                bean = new CoordinatorActionBean();
+                bean.setId((String)arr[0]);
+                bean.setJobId((String)arr[1]);
+                bean.setStatusStr((String) arr[2]);
+                bean.setExternalId((String) arr[3]);
+                bean.setPending((Integer) arr[4]);
+                bean.setNominalTime((Timestamp) arr[5]);
+                bean.setCreatedTime((Timestamp) arr[6]);
+                break;
 
             default:
                 throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot construct action bean for "

http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetActionIdsForDateRangeJPAExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetActionIdsForDateRangeJPAExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetActionIdsForDateRangeJPAExecutor.java
deleted file mode 100644
index 1862c7c..0000000
--- a/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetActionIdsForDateRangeJPAExecutor.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.oozie.executor.jpa;
-
-import java.sql.Timestamp;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
-
-import javax.persistence.EntityManager;
-import javax.persistence.Query;
-
-import org.apache.oozie.CoordinatorActionBean;
-import org.apache.oozie.ErrorCode;
-import org.apache.oozie.util.ParamChecker;
-
-/**
- * Load coordinator action ids by date range.
- */
-public class CoordJobGetActionIdsForDateRangeJPAExecutor implements JPAExecutor<List<String>> {
-
-    private String jobId = null;
-    private Date startDate, endDate;
-
-    public CoordJobGetActionIdsForDateRangeJPAExecutor(String jobId, Date startDate, Date endDate) {
-        ParamChecker.notNull(jobId, "jobId");
-        this.jobId = jobId;
-        this.startDate = startDate;
-        this.endDate = endDate;
-    }
-
-    @Override
-    public String getName() {
-        return "CoordJobGetActionIdsForDateRangeJPAExecutor";
-    }
-
-    @Override
-    @SuppressWarnings("unchecked")
-    public List<String> execute(EntityManager em) throws JPAExecutorException {
-        try {
-            Query q = em.createNamedQuery("GET_ACTION_IDS_FOR_DATES");
-            q.setParameter("jobId", jobId);
-            q.setParameter("startTime", new Timestamp(startDate.getTime()));
-            q.setParameter("endTime", new Timestamp(endDate.getTime()));
-            List<String> coordActionIds= q.getResultList();
-            return coordActionIds;
-        }
-        catch (Exception e) {
-            throw new JPAExecutorException(ErrorCode.E0603, e.getMessage(), e);
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetActionsByDatesForKillJPAExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetActionsByDatesForKillJPAExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetActionsByDatesForKillJPAExecutor.java
deleted file mode 100644
index eb95591..0000000
--- a/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetActionsByDatesForKillJPAExecutor.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.oozie.executor.jpa;
-
-import java.sql.Timestamp;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
-
-import javax.persistence.EntityManager;
-import javax.persistence.Query;
-
-import org.apache.oozie.CoordinatorActionBean;
-import org.apache.oozie.ErrorCode;
-import org.apache.oozie.client.CoordinatorAction;
-import org.apache.oozie.util.DateUtils;
-import org.apache.oozie.util.ParamChecker;
-
-/**
- * Load non-terminal coordinator actions by dates.
- */
-public class CoordJobGetActionsByDatesForKillJPAExecutor implements JPAExecutor<List<CoordinatorActionBean>> {
-
-    private String jobId = null;
-    private Date startDate, endDate;
-
-    public CoordJobGetActionsByDatesForKillJPAExecutor(String jobId, Date startDate, Date endDate) {
-        ParamChecker.notNull(jobId, "jobId");
-        this.jobId = jobId;
-        this.startDate = startDate;
-        this.endDate = endDate;
-    }
-
-    @Override
-    public String getName() {
-        return "CoordJobGetActionsByDatesForKillJPAExecutor";
-    }
-
-    @Override
-    @SuppressWarnings("unchecked")
-    public List<CoordinatorActionBean> execute(EntityManager em) throws JPAExecutorException {
-        List<CoordinatorActionBean> actionList = new ArrayList<CoordinatorActionBean>();
-        try {
-            Query q = em.createNamedQuery("GET_ACTIONS_BY_DATES_FOR_KILL");
-            q.setParameter("jobId", jobId);
-            q.setParameter("startTime", new Timestamp(startDate.getTime()));
-            q.setParameter("endTime", new Timestamp(endDate.getTime()));
-            List<Object[]> actions = q.getResultList();
-
-            for (Object[] a : actions) {
-                CoordinatorActionBean aa = getBeanForRunningCoordAction(a);
-                actionList.add(aa);
-            }
-            return actionList;
-        }
-        catch (Exception e) {
-            throw new JPAExecutorException(ErrorCode.E0603, e.getMessage(), e);
-        }
-    }
-
-    private CoordinatorActionBean getBeanForRunningCoordAction(Object[] arr) {
-        CoordinatorActionBean action = new CoordinatorActionBean();
-        if (arr[0] != null) {
-            action.setId((String) arr[0]);
-        }
-
-        if (arr[1] != null) {
-            action.setJobId((String) arr[1]);
-        }
-
-        if (arr[2] != null) {
-            action.setStatus(CoordinatorAction.Status.valueOf((String) arr[2]));
-        }
-
-        if (arr[3] != null) {
-            action.setExternalId((String) arr[3]);
-        }
-
-        if (arr[4] != null) {
-            action.setPending((Integer) arr[4]);
-        }
-
-        if (arr[5] != null) {
-            action.setNominalTime(DateUtils.toDate((Timestamp) arr[5]));
-        }
-
-        if (arr[6] != null) {
-            action.setCreatedTime(DateUtils.toDate((Timestamp) arr[6]));
-        }
-        return action;
-    }
-}

http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetActionsForDatesJPAExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetActionsForDatesJPAExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetActionsForDatesJPAExecutor.java
deleted file mode 100644
index d1856ae..0000000
--- a/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetActionsForDatesJPAExecutor.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.oozie.executor.jpa;
-
-import java.sql.Timestamp;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
-
-import javax.persistence.EntityManager;
-import javax.persistence.Query;
-
-import org.apache.oozie.CoordinatorActionBean;
-import org.apache.oozie.ErrorCode;
-import org.apache.oozie.util.ParamChecker;
-
-/**
- * Load coordinator actions by dates.
- */
-public class CoordJobGetActionsForDatesJPAExecutor implements JPAExecutor<List<CoordinatorActionBean>> {
-
-    private String jobId = null;
-    private Date startDate, endDate;
-
-    public CoordJobGetActionsForDatesJPAExecutor(String jobId, Date startDate, Date endDate) {
-        ParamChecker.notNull(jobId, "jobId");
-        this.jobId = jobId;
-        this.startDate = startDate;
-        this.endDate = endDate;
-    }
-
-    @Override
-    public String getName() {
-        return "CoordJobGetActionsForDatesJPAExecutor";
-    }
-
-    @Override
-    @SuppressWarnings("unchecked")
-    public List<CoordinatorActionBean> execute(EntityManager em) throws JPAExecutorException {
-        List<CoordinatorActionBean> actions;
-        try {
-            Query q = em.createNamedQuery("GET_ACTIONS_FOR_DATES");
-            q.setParameter("jobId", jobId);
-            q.setParameter("startTime", new Timestamp(startDate.getTime()));
-            q.setParameter("endTime", new Timestamp(endDate.getTime()));
-            actions = q.getResultList();
-            return actions;
-        }
-        catch (Exception e) {
-            throw new JPAExecutorException(ErrorCode.E0603, e.getMessage(), e);
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobQueryExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobQueryExecutor.java
index 4bccef4..1518686 100644
--- a/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobQueryExecutor.java
+++ b/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobQueryExecutor.java
@@ -26,7 +26,6 @@ import java.util.List;
 import javax.persistence.EntityManager;
 import javax.persistence.Query;
 
-import org.apache.oozie.CoordinatorActionBean;
 import org.apache.oozie.CoordinatorJobBean;
 import org.apache.oozie.ErrorCode;
 import org.apache.oozie.StringBlob;
@@ -53,6 +52,8 @@ public class CoordJobQueryExecutor extends QueryExecutor<CoordinatorJobBean, Coo
         UPDATE_COORD_JOB_STATUS_PENDING_TIME,
         UPDATE_COORD_JOB_MATERIALIZE,
         UPDATE_COORD_JOB_CHANGE,
+        UPDATE_COORD_JOB_CONF,
+        UPDATE_COORD_JOB_XML,
         GET_COORD_JOB,
         GET_COORD_JOB_USER_APPNAME,
         GET_COORD_JOB_INPUT_CHECK,
@@ -63,9 +64,13 @@ public class CoordJobQueryExecutor extends QueryExecutor<CoordinatorJobBean, Coo
         GET_COORD_JOB_STATUS,
         GET_COORD_JOB_STATUS_PARENTID,
         GET_COORD_JOBS_CHANGED,
-        GET_COORD_JOBS_OLDER_FOR_MATERILZATION,
+        GET_COORD_JOBS_OLDER_FOR_MATERIALIZATION,
         GET_COORD_FOR_ABANDONEDCHECK,
-        GET_COORD_IDS_FOR_STATUS_TRANSIT
+        GET_COORD_IDS_FOR_STATUS_TRANSIT,
+        GET_COORD_JOBS_FOR_BUNDLE_BY_APPNAME_ID,
+        GET_COORD_JOBS_WITH_PARENT_ID,
+        GET_COORD_JOB_CONF,
+        GET_COORD_JOB_XML
     };
 
     private static CoordJobQueryExecutor instance = new CoordJobQueryExecutor();
@@ -177,6 +182,15 @@ public class CoordJobQueryExecutor extends QueryExecutor<CoordinatorJobBean, Coo
                 query.setParameter("lastModifiedTime", cjBean.getLastModifiedTimestamp());
                 query.setParameter("id", cjBean.getId());
                 break;
+            case UPDATE_COORD_JOB_CONF:
+                query.setParameter("conf", cjBean.getConfBlob());
+                query.setParameter("id", cjBean.getId());
+                break;
+            case UPDATE_COORD_JOB_XML:
+                query.setParameter("jobXml", cjBean.getJobXmlBlob());
+                query.setParameter("id", cjBean.getId());
+                break;
+
             default:
                 throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot set parameters for "
                         + namedQuery.name());
@@ -198,12 +212,14 @@ public class CoordJobQueryExecutor extends QueryExecutor<CoordinatorJobBean, Coo
             case GET_COORD_JOB_SUSPEND_KILL:
             case GET_COORD_JOB_STATUS:
             case GET_COORD_JOB_STATUS_PARENTID:
+            case GET_COORD_JOB_CONF:
+            case GET_COORD_JOB_XML:
                 query.setParameter("id", parameters[0]);
                 break;
             case GET_COORD_JOBS_CHANGED:
                 query.setParameter("lastModifiedTime", new Timestamp(((Date)parameters[0]).getTime()));
                 break;
-            case GET_COORD_JOBS_OLDER_FOR_MATERILZATION:
+            case GET_COORD_JOBS_OLDER_FOR_MATERIALIZATION:
                 query.setParameter("matTime", new Timestamp(((Date)parameters[0]).getTime()));
                 int limit = (Integer) parameters[1];
                 if (limit > 0) {
@@ -218,7 +234,13 @@ public class CoordJobQueryExecutor extends QueryExecutor<CoordinatorJobBean, Coo
             case GET_COORD_IDS_FOR_STATUS_TRANSIT:
                 query.setParameter("lastModifiedTime", new Timestamp(((Date) parameters[0]).getTime()));
                 break;
-
+            case GET_COORD_JOBS_FOR_BUNDLE_BY_APPNAME_ID:
+                query.setParameter("appName", parameters[0]);
+                query.setParameter("bundleId", parameters[1]);
+                break;
+            case GET_COORD_JOBS_WITH_PARENT_ID:
+                query.setParameter("parentId", parameters[0]);
+                break;
             default:
                 throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot set parameters for "
                         + namedQuery.name());
@@ -335,7 +357,15 @@ public class CoordJobQueryExecutor extends QueryExecutor<CoordinatorJobBean, Coo
             case GET_COORD_JOBS_CHANGED:
                 bean = (CoordinatorJobBean) ret;
                 break;
-            case GET_COORD_JOBS_OLDER_FOR_MATERILZATION:
+            case GET_COORD_JOBS_OLDER_FOR_MATERIALIZATION:
+                bean = new CoordinatorJobBean();
+                bean.setId((String) ret);
+                break;
+            case GET_COORD_JOBS_FOR_BUNDLE_BY_APPNAME_ID:
+                bean = new CoordinatorJobBean();
+                bean.setId((String) ret);
+                break;
+            case GET_COORD_JOBS_WITH_PARENT_ID:
                 bean = new CoordinatorJobBean();
                 bean.setId((String) ret);
                 break;
@@ -347,11 +377,18 @@ public class CoordJobQueryExecutor extends QueryExecutor<CoordinatorJobBean, Coo
                 bean.setGroup((String) arr[2]);
                 bean.setAppName((String) arr[3]);
                 break;
-
             case GET_COORD_IDS_FOR_STATUS_TRANSIT:
                 bean = new CoordinatorJobBean();
                 bean.setId((String) ret);
                 break;
+            case GET_COORD_JOB_CONF:
+                bean = new CoordinatorJobBean();
+                bean.setConfBlob((StringBlob) ret);
+                break;
+            case GET_COORD_JOB_XML:
+                bean = new CoordinatorJobBean();
+                bean.setJobXmlBlob((StringBlob) ret);
+                break;
 
             default:
                 throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot construct job bean for "

http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobsToBeMaterializedJPAExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobsToBeMaterializedJPAExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobsToBeMaterializedJPAExecutor.java
index 5e018c7..6d13ed1 100644
--- a/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobsToBeMaterializedJPAExecutor.java
+++ b/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobsToBeMaterializedJPAExecutor.java
@@ -56,7 +56,7 @@ public class CoordJobsToBeMaterializedJPAExecutor implements JPAExecutor<List<Co
     public List<CoordinatorJobBean> execute(EntityManager em) throws JPAExecutorException {
         List<CoordinatorJobBean> cjBeans;
         try {
-            Query q = em.createNamedQuery("GET_COORD_JOBS_OLDER_FOR_MATERILZATION");
+            Query q = em.createNamedQuery("GET_COORD_JOBS_OLDER_FOR_MATERIALIZATION");
             q.setParameter("matTime", new Timestamp(this.dateInput.getTime()));
             if (limit > 0) {
                 q.setMaxResults(limit);

http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/core/src/main/java/org/apache/oozie/executor/jpa/SLARegistrationQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/SLARegistrationQueryExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/SLARegistrationQueryExecutor.java
index e220c01..bded634 100644
--- a/core/src/main/java/org/apache/oozie/executor/jpa/SLARegistrationQueryExecutor.java
+++ b/core/src/main/java/org/apache/oozie/executor/jpa/SLARegistrationQueryExecutor.java
@@ -18,6 +18,8 @@
 
 package org.apache.oozie.executor.jpa;
 
+import java.sql.Timestamp;
+import java.util.ArrayList;
 import java.util.List;
 
 import javax.persistence.EntityManager;
@@ -36,8 +38,13 @@ public class SLARegistrationQueryExecutor extends QueryExecutor<SLARegistrationB
 
     public enum SLARegQuery {
         UPDATE_SLA_REG_ALL,
+        UPDATE_SLA_CONFIG,
+        UPDATE_SLA_EXPECTED_VALUE,
         GET_SLA_REG_ALL,
-        GET_SLA_REG_ON_RESTART
+        GET_SLA_EXPECTED_VALUE_CONFIG,
+        GET_SLA_REG_FOR_PARENT_ID,
+        GET_SLA_REG_ON_RESTART,
+        GET_SLA_CONFIGS
     };
 
     private static SLARegistrationQueryExecutor instance = new SLARegistrationQueryExecutor();
@@ -70,6 +77,17 @@ public class SLARegistrationQueryExecutor extends QueryExecutor<SLARegistrationB
                 query.setParameter("parentId", bean.getParentId());
                 query.setParameter("jobData", bean.getJobData());
                 break;
+            case UPDATE_SLA_EXPECTED_VALUE:
+                query.setParameter("jobId", bean.getId());
+                query.setParameter("expectedStartTime", bean.getExpectedStartTimestamp());
+                query.setParameter("expectedEndTime", bean.getExpectedEndTimestamp());
+                query.setParameter("expectedDuration", bean.getExpectedDuration());
+                break;
+            case UPDATE_SLA_CONFIG:
+                query.setParameter("jobId", bean.getId());
+                query.setParameter("slaConfig", bean.getSlaConfig());
+                break;
+
             default:
                 throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot set parameters for "
                         + namedQuery.name());
@@ -86,6 +104,16 @@ public class SLARegistrationQueryExecutor extends QueryExecutor<SLARegistrationB
             case GET_SLA_REG_ON_RESTART:
                 query.setParameter("id", parameters[0]);
                 break;
+            case GET_SLA_CONFIGS:
+                query.setParameter("ids", parameters[0]);
+                break;
+            case GET_SLA_EXPECTED_VALUE_CONFIG:
+                query.setParameter("id", parameters[0]);
+                break;
+            case GET_SLA_REG_FOR_PARENT_ID:
+                query.setParameter("parentId", parameters[0]);
+                break;
+
             default:
                 throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot set parameters for "
                         + namedQuery.name());
@@ -120,9 +148,13 @@ public class SLARegistrationQueryExecutor extends QueryExecutor<SLARegistrationB
         JPAService jpaService = Services.get().get(JPAService.class);
         EntityManager em = jpaService.getEntityManager();
         Query query = getSelectQuery(namedQuery, em, parameters);
-        @SuppressWarnings("unchecked")
-        List<SLARegistrationBean> beanList = (List<SLARegistrationBean>) jpaService.executeGetList(namedQuery.name(),
-                query, em);
+        List<?> retList = (List<?>) jpaService.executeGetList(namedQuery.name(), query, em);
+        List<SLARegistrationBean> beanList = new ArrayList<SLARegistrationBean>();
+        if (retList != null) {
+            for (Object ret : retList) {
+                beanList.add(constructBean(namedQuery, ret));
+            }
+        }
         return beanList;
     }
 
@@ -145,6 +177,28 @@ public class SLARegistrationQueryExecutor extends QueryExecutor<SLARegistrationB
                 bean.setSlaConfig((String) arr[2]);
                 bean.setJobData((String) arr[3]);
                 break;
+            case GET_SLA_CONFIGS:
+                bean = new SLARegistrationBean();
+                arr = (Object[]) ret;
+                bean.setId((String) arr[0]);
+                bean.setSlaConfig((String) arr[1]);
+                break;
+            case GET_SLA_EXPECTED_VALUE_CONFIG:
+                bean = new SLARegistrationBean();
+                arr = (Object[]) ret;
+                bean.setId((String) arr[0]);
+                bean.setSlaConfig((String) arr[1]);
+                bean.setExpectedStart((Timestamp)arr[2]);
+                bean.setExpectedEnd((Timestamp)arr[3]);
+                bean.setExpectedDuration((Long)arr[4]);
+                bean.setNominalTime((Timestamp)arr[5]);
+                break;
+            case GET_SLA_REG_FOR_PARENT_ID:
+                bean = new SLARegistrationBean();
+                arr = (Object[]) ret;
+                bean.setId((String) arr[0]);
+                bean.setSlaConfig((String) arr[1]);
+                break;
             default:
                 throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot construct job bean for "
                         + namedQuery.name());

http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/core/src/main/java/org/apache/oozie/executor/jpa/SLASummaryQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/SLASummaryQueryExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/SLASummaryQueryExecutor.java
index c3197b7..0057c89 100644
--- a/core/src/main/java/org/apache/oozie/executor/jpa/SLASummaryQueryExecutor.java
+++ b/core/src/main/java/org/apache/oozie/executor/jpa/SLASummaryQueryExecutor.java
@@ -18,6 +18,7 @@
 
 package org.apache.oozie.executor.jpa;
 
+import java.sql.Timestamp;
 import java.util.List;
 
 import javax.persistence.EntityManager;
@@ -39,8 +40,11 @@ public class SLASummaryQueryExecutor extends QueryExecutor<SLASummaryBean, SLASu
         UPDATE_SLA_SUMMARY_FOR_ACTUAL_TIMES,
         UPDATE_SLA_SUMMARY_ALL,
         UPDATE_SLA_SUMMARY_EVENTPROCESSED,
+        UPDATE_SLA_SUMMARY_FOR_EXPECTED_TIMES,
+        UPDATE_SLA_SUMMARY_LAST_MODIFIED_TIME,
         GET_SLA_SUMMARY,
-        GET_SLA_SUMMARY_EVENTPROCESSED
+        GET_SLA_SUMMARY_EVENTPROCESSED,
+        GET_SLA_SUMMARY_EVENTPROCESSED_LAST_MODIFIED
     };
 
     private static SLASummaryQueryExecutor instance = new SLASummaryQueryExecutor();
@@ -95,10 +99,24 @@ public class SLASummaryQueryExecutor extends QueryExecutor<SLASummaryBean, SLASu
                 query.setParameter("actualStartTS", bean.getActualStartTimestamp());
                 query.setParameter("jobId", bean.getId());
                 break;
+            case UPDATE_SLA_SUMMARY_FOR_EXPECTED_TIMES:
+                query.setParameter("nominalTime", bean.getNominalTimestamp());
+                query.setParameter("expectedStartTime", bean.getExpectedStartTimestamp());
+                query.setParameter("expectedEndTime", bean.getExpectedEndTimestamp());
+                query.setParameter("expectedDuration", bean.getExpectedDuration());
+                query.setParameter("lastModTime", bean.getLastModifiedTimestamp());
+                query.setParameter("jobId", bean.getId());
+                break;
+
             case UPDATE_SLA_SUMMARY_EVENTPROCESSED:
                 query.setParameter("eventProcessed", bean.getEventProcessed());
                 query.setParameter("jobId", bean.getId());
                 break;
+            case UPDATE_SLA_SUMMARY_LAST_MODIFIED_TIME:
+                query.setParameter("lastModifiedTS", bean.getLastModifiedTime());
+                query.setParameter("jobId", bean.getId());
+                break;
+
             default:
                 throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot set parameters for "
                         + namedQuery.name());
@@ -113,6 +131,7 @@ public class SLASummaryQueryExecutor extends QueryExecutor<SLASummaryBean, SLASu
         switch (namedQuery) {
             case GET_SLA_SUMMARY:
             case GET_SLA_SUMMARY_EVENTPROCESSED:
+            case GET_SLA_SUMMARY_EVENTPROCESSED_LAST_MODIFIED:
                 query.setParameter("id", parameters[0]);
                 break;
         }
@@ -174,6 +193,14 @@ public class SLASummaryQueryExecutor extends QueryExecutor<SLASummaryBean, SLASu
                 bean = new SLASummaryBean();
                 bean.setEventProcessed(((Byte)ret).intValue());
                 break;
+            case GET_SLA_SUMMARY_EVENTPROCESSED_LAST_MODIFIED:
+                Object[] arr = (Object[]) ret;
+                bean = new SLASummaryBean();
+                bean.setEventProcessed((Byte)arr[0]);
+                bean.setLastModifiedTime((Timestamp)arr[1]);
+
+                break;
+
             default:
                 throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot construct job bean for "
                         + namedQuery.name());

http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/core/src/main/java/org/apache/oozie/service/CoordMaterializeTriggerService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/CoordMaterializeTriggerService.java b/core/src/main/java/org/apache/oozie/service/CoordMaterializeTriggerService.java
index fa16d1d..1cbd474 100644
--- a/core/src/main/java/org/apache/oozie/service/CoordMaterializeTriggerService.java
+++ b/core/src/main/java/org/apache/oozie/service/CoordMaterializeTriggerService.java
@@ -160,7 +160,7 @@ public class CoordMaterializeTriggerService implements Service {
                 throws JPAExecutorException {
             try {
                 List<CoordinatorJobBean> materializeJobs = CoordJobQueryExecutor.getInstance().getList(
-                        CoordJobQuery.GET_COORD_JOBS_OLDER_FOR_MATERILZATION, currDate, limit);
+                        CoordJobQuery.GET_COORD_JOBS_OLDER_FOR_MATERIALIZATION, currDate, limit);
                 LOG.info("CoordMaterializeTriggerService - Curr Date= " + DateUtils.formatDateOozieTZ(currDate)
                         + ", Num jobs to materialize = " + materializeJobs.size());
                 for (CoordinatorJobBean coordJob : materializeJobs) {

http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/core/src/main/java/org/apache/oozie/service/EventHandlerService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/EventHandlerService.java b/core/src/main/java/org/apache/oozie/service/EventHandlerService.java
index 7c0d3be..22c6fb0 100644
--- a/core/src/main/java/org/apache/oozie/service/EventHandlerService.java
+++ b/core/src/main/java/org/apache/oozie/service/EventHandlerService.java
@@ -19,32 +19,32 @@
 package org.apache.oozie.service;
 
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.oozie.ErrorCode;
-import org.apache.oozie.event.BundleJobEvent;
-import org.apache.oozie.event.CoordinatorActionEvent;
-import org.apache.oozie.event.CoordinatorJobEvent;
 import org.apache.oozie.client.event.Event;
 import org.apache.oozie.client.event.Event.MessageType;
 import org.apache.oozie.client.event.JobEvent;
+import org.apache.oozie.client.event.SLAEvent;
+import org.apache.oozie.event.BundleJobEvent;
+import org.apache.oozie.event.CoordinatorActionEvent;
+import org.apache.oozie.event.CoordinatorJobEvent;
 import org.apache.oozie.event.EventQueue;
 import org.apache.oozie.event.MemoryEventQueue;
 import org.apache.oozie.event.WorkflowActionEvent;
 import org.apache.oozie.event.WorkflowJobEvent;
 import org.apache.oozie.event.listener.JobEventListener;
 import org.apache.oozie.sla.listener.SLAEventListener;
-import org.apache.oozie.client.event.SLAEvent;
 import org.apache.oozie.util.LogUtils;
 import org.apache.oozie.util.XLog;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
 /**
  * Service class that handles the events system - creating events queue,
  * managing configured properties and managing and invoking various event

http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/core/src/main/java/org/apache/oozie/servlet/BaseJobServlet.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/servlet/BaseJobServlet.java b/core/src/main/java/org/apache/oozie/servlet/BaseJobServlet.java
index 5690787..a581f8b 100644
--- a/core/src/main/java/org/apache/oozie/servlet/BaseJobServlet.java
+++ b/core/src/main/java/org/apache/oozie/servlet/BaseJobServlet.java
@@ -177,6 +177,27 @@ public abstract class BaseJobServlet extends JsonRestServlet {
             startCron();
             sendJsonResponse(response, HttpServletResponse.SC_OK, json);
         }
+        else if (action.equals(RestConstants.SLA_ENABLE_ALERT)) {
+            validateContentType(request, RestConstants.XML_CONTENT_TYPE);
+            stopCron();
+            slaEnableAlert(request, response);
+            startCron();
+            response.setStatus(HttpServletResponse.SC_OK);
+        }
+        else if (action.equals(RestConstants.SLA_DISABLE_ALERT)) {
+            validateContentType(request, RestConstants.XML_CONTENT_TYPE);
+            stopCron();
+            slaDisableAlert(request, response);
+            startCron();
+            response.setStatus(HttpServletResponse.SC_OK);
+        }
+        else if (action.equals(RestConstants.SLA_CHANGE)) {
+            validateContentType(request, RestConstants.XML_CONTENT_TYPE);
+            stopCron();
+            slaChange(request, response);
+            startCron();
+            response.setStatus(HttpServletResponse.SC_OK);
+        }
         else {
             throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303,
                     RestConstants.ACTION_PARAM, action);
@@ -498,4 +519,38 @@ public abstract class BaseJobServlet extends JsonRestServlet {
      */
     abstract String getJobStatus(HttpServletRequest request, HttpServletResponse response)
             throws XServletException, IOException;
+
+    /**
+     * Abstract method to enable SLA alert.
+     *
+     * @param request the request
+     * @param response the response
+     * @throws XServletException the x servlet exception
+     * @throws IOException Signals that an I/O exception has occurred.
+     */
+    abstract void slaEnableAlert(HttpServletRequest request, HttpServletResponse response) throws XServletException,
+            IOException;
+
+    /**
+     * Abstract method to disable SLA alert.
+     *
+     * @param request the request
+     * @param response the response
+     * @throws XServletException the x servlet exception
+     * @throws IOException Signals that an I/O exception has occurred.
+     */
+    abstract void slaDisableAlert(HttpServletRequest request, HttpServletResponse response) throws XServletException,
+            IOException;
+
+    /**
+     * Abstract method to change SLA definition.
+     *
+     * @param request the request
+     * @param response the response
+     * @throws XServletException the x servlet exception
+     * @throws IOException Signals that an I/O exception has occurred.
+     */
+    abstract void slaChange(HttpServletRequest request, HttpServletResponse response) throws XServletException,
+            IOException;
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/core/src/main/java/org/apache/oozie/servlet/SLAServlet.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/servlet/SLAServlet.java b/core/src/main/java/org/apache/oozie/servlet/SLAServlet.java
index 2578e41..f897652 100644
--- a/core/src/main/java/org/apache/oozie/servlet/SLAServlet.java
+++ b/core/src/main/java/org/apache/oozie/servlet/SLAServlet.java
@@ -31,6 +31,7 @@ import java.util.StringTokenizer;
 import javax.servlet.ServletException;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
+
 import org.apache.oozie.ErrorCode;
 import org.apache.oozie.SLAEventBean;
 import org.apache.oozie.client.OozieClient;

http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/core/src/main/java/org/apache/oozie/servlet/V0JobServlet.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/servlet/V0JobServlet.java b/core/src/main/java/org/apache/oozie/servlet/V0JobServlet.java
index 3e186f9..3cb9168 100644
--- a/core/src/main/java/org/apache/oozie/servlet/V0JobServlet.java
+++ b/core/src/main/java/org/apache/oozie/servlet/V0JobServlet.java
@@ -237,6 +237,22 @@ public class V0JobServlet extends BaseJobServlet {
     @Override
     protected String getJobStatus(HttpServletRequest request, HttpServletResponse response) throws XServletException,
             IOException {
-        throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, "Not supported in v1");
+        throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, "Not supported in v0");
+    }
+
+    @Override
+    void slaEnableAlert(HttpServletRequest request, HttpServletResponse response) throws XServletException, IOException {
+        throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, "Not supported in v0");
+    }
+
+    @Override
+    void slaDisableAlert(HttpServletRequest request, HttpServletResponse response) throws XServletException,
+            IOException {
+        throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, "Not supported in v0");
+    }
+
+    @Override
+    void slaChange(HttpServletRequest request, HttpServletResponse response) throws XServletException, IOException {
+        throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, "Not supported in v0");
     }
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/core/src/main/java/org/apache/oozie/servlet/V1JobServlet.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/servlet/V1JobServlet.java b/core/src/main/java/org/apache/oozie/servlet/V1JobServlet.java
index 64b97c2..d4564c6 100644
--- a/core/src/main/java/org/apache/oozie/servlet/V1JobServlet.java
+++ b/core/src/main/java/org/apache/oozie/servlet/V1JobServlet.java
@@ -1103,4 +1103,20 @@ public class V1JobServlet extends BaseJobServlet {
             IOException {
         throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, "Not supported in v1");
     }
+
+    @Override
+    void slaEnableAlert(HttpServletRequest request, HttpServletResponse response) throws XServletException, IOException {
+        throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, "Not supported in v1");
+    }
+
+    @Override
+    void slaDisableAlert(HttpServletRequest request, HttpServletResponse response) throws XServletException,
+            IOException {
+        throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, "Not supported in v1");
+    }
+
+    @Override
+    void slaChange(HttpServletRequest request, HttpServletResponse response) throws XServletException, IOException {
+        throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, "Not supported in v1");
+    }
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/core/src/main/java/org/apache/oozie/servlet/V2JobServlet.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/servlet/V2JobServlet.java b/core/src/main/java/org/apache/oozie/servlet/V2JobServlet.java
index 5238426..7100c98 100644
--- a/core/src/main/java/org/apache/oozie/servlet/V2JobServlet.java
+++ b/core/src/main/java/org/apache/oozie/servlet/V2JobServlet.java
@@ -149,6 +149,53 @@ public class V2JobServlet extends V1JobServlet {
 
     }
 
+    @Override
+    protected void slaEnableAlert(HttpServletRequest request, HttpServletResponse response) throws XServletException,
+            IOException {
+        String jobId = getResourceName(request);
+        String actions = request.getParameter(RestConstants.JOB_COORD_SCOPE_ACTION_LIST);
+        String dates = request.getParameter(RestConstants.JOB_COORD_SCOPE_DATE);
+        String childIds = request.getParameter(RestConstants.COORDINATORS_PARAM);
+        try {
+            getBaseEngine(jobId, getUser(request)).enableSLAAlert(jobId, actions, dates, childIds);
+        }
+        catch (BaseEngineException e) {
+            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, e);
+        }
+
+    }
+
+    @Override
+    protected void slaDisableAlert(HttpServletRequest request, HttpServletResponse response) throws XServletException,
+            IOException {
+        String jobId = getResourceName(request);
+        String actions = request.getParameter(RestConstants.JOB_COORD_SCOPE_ACTION_LIST);
+        String dates = request.getParameter(RestConstants.JOB_COORD_SCOPE_DATE);
+        String childIds = request.getParameter(RestConstants.COORDINATORS_PARAM);
+        try {
+            getBaseEngine(jobId, getUser(request)).disableSLAAlert(jobId, actions, dates, childIds);
+        }
+        catch (BaseEngineException e) {
+            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, e);
+        }
+    }
+
+    @Override
+    protected void slaChange(HttpServletRequest request, HttpServletResponse response) throws XServletException, IOException {
+        String jobId = getResourceName(request);
+        String actions = request.getParameter(RestConstants.JOB_COORD_SCOPE_ACTION_LIST);
+        String dates = request.getParameter(RestConstants.JOB_COORD_SCOPE_DATE);
+        String newParams = request.getParameter(RestConstants.JOB_CHANGE_VALUE);
+        String coords = request.getParameter(RestConstants.COORDINATORS_PARAM);
+
+        try {
+            getBaseEngine(jobId, getUser(request)).changeSLA(jobId, actions, dates, coords, newParams);
+        }
+        catch (BaseEngineException e) {
+            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, e);
+        }
+    }
+
     /**
      * Ignore a coordinator job/action
      *
@@ -199,21 +246,18 @@ public class V2JobServlet extends V1JobServlet {
         String status;
         String jobId = getResourceName(request);
         try {
-            if (jobId.endsWith("-B")) {
-                BundleEngine engine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request));
-                status = engine.getJobStatus(jobId);
-            } else if (jobId.endsWith("-W")) {
-                DagEngine engine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request));
-                status = engine.getJobStatus(jobId);
-            } else {
-                CoordinatorEngine engine =
-                        Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(getUser(request));
-                if (jobId.contains("-C@")) {
-                    status = engine.getActionStatus(jobId);
-                } else {
-                    status = engine.getJobStatus(jobId);
-                }
+            if (jobId.endsWith("-B") || jobId.endsWith("-W")) {
+                status = getBaseEngine(jobId, getUser(request)).getJobStatus(jobId);
+            }
+            else if (jobId.contains("C@")) {
+                CoordinatorEngine engine = Services.get().get(CoordinatorEngineService.class)
+                        .getCoordinatorEngine(getUser(request));
+                status = engine.getActionStatus(jobId);
             }
+            else {
+                status = getBaseEngine(jobId, getUser(request)).getJobStatus(jobId);
+            }
+
         } catch (BaseEngineException ex) {
             throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
         }
@@ -251,7 +295,7 @@ public class V2JobServlet extends V1JobServlet {
         else if (jobId.endsWith("-B")) {
             return Services.get().get(BundleEngineService.class).getBundleEngine(user);
         }
-        else if (jobId.endsWith("-C")) {
+        else if (jobId.contains("-C")) {
             return Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(user);
         }
         else {

http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/core/src/main/java/org/apache/oozie/servlet/V2SLAServlet.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/servlet/V2SLAServlet.java b/core/src/main/java/org/apache/oozie/servlet/V2SLAServlet.java
index a0fe1b6..57170e1 100644
--- a/core/src/main/java/org/apache/oozie/servlet/V2SLAServlet.java
+++ b/core/src/main/java/org/apache/oozie/servlet/V2SLAServlet.java
@@ -22,7 +22,9 @@ import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.net.URLDecoder;
 import java.text.ParseException;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -31,15 +33,19 @@ import java.util.Set;
 import javax.servlet.ServletException;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
+
 import org.apache.oozie.ErrorCode;
 import org.apache.oozie.XException;
 import org.apache.oozie.client.OozieClient;
 import org.apache.oozie.client.rest.RestConstants;
 import org.apache.oozie.command.CommandException;
+import org.apache.oozie.executor.jpa.SLARegistrationQueryExecutor;
+import org.apache.oozie.executor.jpa.SLARegistrationQueryExecutor.SLARegQuery;
 import org.apache.oozie.executor.jpa.sla.SLASummaryGetForFilterJPAExecutor;
 import org.apache.oozie.executor.jpa.sla.SLASummaryGetForFilterJPAExecutor.SLASummaryFilter;
 import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.Services;
+import org.apache.oozie.sla.SLARegistrationBean;
 import org.apache.oozie.sla.SLASummaryBean;
 import org.apache.oozie.util.DateUtils;
 import org.apache.oozie.util.XLog;
@@ -146,7 +152,20 @@ public class V2SLAServlet extends SLAServlet {
             else {
                 XLog.getLog(getClass()).error(ErrorCode.E0610);
             }
-            return SLASummaryBean.toJSONObject(slaSummaryList, timeZoneId);
+
+            List<String> jobIds = new ArrayList<String>();
+            for(SLASummaryBean summaryBean:slaSummaryList){
+                jobIds.add(summaryBean.getId());
+            }
+            List<SLARegistrationBean> SLARegistrationList = SLARegistrationQueryExecutor.getInstance().getList(
+                    SLARegQuery.GET_SLA_CONFIGS, jobIds);
+
+            Map<String, Map<String, String>> jobIdSLAConfigMap = new HashMap<String, Map<String, String>>();
+            for(SLARegistrationBean registrationBean:SLARegistrationList){
+                jobIdSLAConfigMap.put(registrationBean.getId(), registrationBean.getSLAConfigMap());
+            }
+
+            return SLASummaryBean.toJSONObject(slaSummaryList, jobIdSLAConfigMap, timeZoneId);
         }
         catch (XException ex) {
             throw new CommandException(ex);

http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/core/src/main/java/org/apache/oozie/sla/SLACalcStatus.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/sla/SLACalcStatus.java b/core/src/main/java/org/apache/oozie/sla/SLACalcStatus.java
index 189d5ea..0d7123a 100644
--- a/core/src/main/java/org/apache/oozie/sla/SLACalcStatus.java
+++ b/core/src/main/java/org/apache/oozie/sla/SLACalcStatus.java
@@ -20,8 +20,10 @@
 package org.apache.oozie.sla;
 
 import java.util.Date;
+import java.util.Map;
 
 import org.apache.oozie.AppType;
+import org.apache.oozie.client.OozieClient;
 import org.apache.oozie.client.event.SLAEvent;
 import org.apache.oozie.lock.LockToken;
 import org.apache.oozie.service.JobsConcurrencyService;
@@ -65,6 +67,10 @@ public class SLACalcStatus extends SLAEvent {
         reg.setAlertContact(regBean.getAlertContact());
         reg.setAlertEvents(regBean.getAlertEvents());
         reg.setJobData(regBean.getJobData());
+        if (regBean.getSLAConfigMap().containsKey(OozieClient.SLA_DISABLE_ALERT)) {
+            reg.addToSLAConfigMap(OozieClient.SLA_DISABLE_ALERT,
+                    regBean.getSLAConfigMap().get(OozieClient.SLA_DISABLE_ALERT));
+        }
         reg.setId(summary.getId());
         reg.setAppType(summary.getAppType());
         reg.setUser(summary.getUser());
@@ -267,10 +273,14 @@ public class SLACalcStatus extends SLAEvent {
     }
 
     @Override
-    public String getSlaConfig() {
+    public String getSLAConfig() {
         return regBean.getSlaConfig();
     }
 
+    public Map<String, String> getSLAConfigMap() {
+        return regBean.getSLAConfigMap();
+    }
+
     @Override
     public MessageType getMsgType() {
         return regBean.getMsgType();

http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/core/src/main/java/org/apache/oozie/sla/SLACalculator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/sla/SLACalculator.java b/core/src/main/java/org/apache/oozie/sla/SLACalculator.java
index 20f93b5..f238321 100644
--- a/core/src/main/java/org/apache/oozie/sla/SLACalculator.java
+++ b/core/src/main/java/org/apache/oozie/sla/SLACalculator.java
@@ -20,11 +20,14 @@ package org.apache.oozie.sla;
 
 import java.util.Date;
 import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.oozie.client.event.JobEvent.EventStatus;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
 import org.apache.oozie.service.ServiceException;
+import org.apache.oozie.util.Pair;
 
 public interface SLACalculator {
 
@@ -51,4 +54,55 @@ public interface SLACalculator {
 
     SLACalcStatus get(String jobId) throws JPAExecutorException;
 
+    /**
+     * Enable jobs sla alert.
+     *
+     * @param jobId the job ids
+     * @return true, if successful
+     * @throws JPAExecutorException the JPA executor exception
+     * @throws ServiceException the service exception
+     */
+    boolean enableAlert(List<String> jobId) throws JPAExecutorException, ServiceException;
+
+    /**
+     * Enable sla alert for child jobs.
+     * @param jobId the parent job ids
+     * @return
+     * @throws JPAExecutorException
+     * @throws ServiceException
+     */
+    boolean enableChildJobAlert(List<String> parentJobIds) throws JPAExecutorException, ServiceException;
+
+    /**
+     * Disable jobs Sla alert.
+     *
+     * @param jobId the job ids
+     * @return true, if successful
+     * @throws JPAExecutorException the JPA executor exception
+     * @throws ServiceException the service exception
+     */
+    boolean disableAlert(List<String> jobId) throws JPAExecutorException, ServiceException;
+
+
+    /**
+     * Disable Sla alert for child jobs.
+     * @param jobId the parent job ids
+     * @return
+     * @throws JPAExecutorException
+     * @throws ServiceException
+     */
+    boolean disableChildJobAlert(List<String> parentJobIds) throws JPAExecutorException, ServiceException;
+
+    /**
+     * Change jobs Sla definitions
+     * It takes list of pairs of jobid and key/value pairs of el evaluated sla definition.
+     * Support definition are sla-should-start, sla-should-end, sla-nominal-time and sla-max-duration.
+     *
+     * @param jobIdsSLAPair the job ids sla pair
+     * @return true, if successful
+     * @throws JPAExecutorException the JPA executor exception
+     * @throws ServiceException the service exception
+     */
+    public boolean changeDefinition(List<Pair<String, Map<String,String>>> jobIdsSLAPair ) throws JPAExecutorException,
+            ServiceException;
 }