You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by pu...@apache.org on 2015/02/21 00:01:49 UTC
[3/4] oozie git commit: OOZIE-1913 Devise a way to turn off SLA
alerts for bundle/coordinator flexibly
http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/core/src/main/java/org/apache/oozie/command/coord/CoordSLAChangeXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/coord/CoordSLAChangeXCommand.java b/core/src/main/java/org/apache/oozie/command/coord/CoordSLAChangeXCommand.java
new file mode 100644
index 0000000..4d24388
--- /dev/null
+++ b/core/src/main/java/org/apache/oozie/command/coord/CoordSLAChangeXCommand.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.command.coord;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.command.CommandException;
+import org.apache.oozie.command.PreconditionException;
+import org.apache.oozie.coord.CoordELEvaluator;
+import org.apache.oozie.coord.CoordELFunctions;
+import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
+import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
+import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.service.ServiceException;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.sla.service.SLAService;
+import org.apache.oozie.util.ELEvaluator;
+import org.apache.oozie.util.Pair;
+import org.apache.oozie.util.XmlUtils;
+import org.jdom.Element;
+
+/** Changes SLA definitions for a coordinator job, or for selected actions of it, to the values in newParams. */
+public class CoordSLAChangeXCommand extends CoordSLAAlertsXCommand {
+ // New SLA parameter values; each value is EL-evaluated per action before it is applied
+ Map<String, String> newParams;
+
+ public CoordSLAChangeXCommand(String jobId, String actions, String dates, Map<String, String> newParams) {
+ super(jobId, "SLA.alerts.change", "SLA.alerts.change", actions, dates);
+ this.newParams = newParams;
+ }
+
+ /** Evaluates newParams against every selected action and passes the result to SLAService.changeDefinition. */
+ @Override
+ protected boolean executeSlaCommand() throws ServiceException, CommandException {
+ try {
+ List<Pair<String, Map<String, String>>> idSlaDefinitionList = new ArrayList<Pair<String, Map<String, String>>>();
+ List<CoordinatorActionBean> coordinatorActionBeanList = getNotTerminatedActions();
+ Configuration conf = getJobConf();
+ for (CoordinatorActionBean coordAction : coordinatorActionBeanList) {
+ Map<String, String> slaDefinitionMap = new HashMap<String, String>(newParams);
+ // Parse the action XML and build the evaluator once per action, not once per parameter
+ Element eAction = XmlUtils.parseXml(coordAction.getActionXml().toString());
+ ELEvaluator evalSla = CoordELEvaluator.createSLAEvaluator(eAction, coordAction, conf);
+ for (Map.Entry<String, String> slaParam : slaDefinitionMap.entrySet()) {
+ slaParam.setValue(CoordELFunctions.evalAndWrap(evalSla, slaParam.getValue()));
+ }
+ idSlaDefinitionList.add(new Pair<String, Map<String, String>>(coordAction.getId(), slaDefinitionMap));
+ }
+ return Services.get().get(SLAService.class).changeDefinition(idSlaDefinitionList);
+ }
+ catch (Exception e) {
+ throw new CommandException(ErrorCode.E1027, e.getMessage(), e);
+ }
+ }
+
+ @Override
+ protected void updateJob() throws CommandException {
+ if (isJobRequest()) {
+ updateJobSLA(newParams);
+ }
+ }
+
+ private List<CoordinatorActionBean> getNotTerminatedActions() throws JPAExecutorException {
+ if (isJobRequest()) {
+ return CoordActionQueryExecutor.getInstance().getList(
+ CoordActionQuery.GET_ACTIVE_ACTIONS_JOBID_FOR_SLA_CHANGE, getJobId());
+ }
+ else {
+ return CoordActionQueryExecutor.getInstance().getList(
+ CoordActionQuery.GET_ACTIVE_ACTIONS_IDS_FOR_SLA_CHANGE, getActionList());
+ }
+ }
+
+ @Override
+ protected void verifyPrecondition() throws CommandException, PreconditionException {
+ validateSLAChangeParam(newParams);
+ }
+}
http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/core/src/main/java/org/apache/oozie/coord/CoordUtils.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/coord/CoordUtils.java b/core/src/main/java/org/apache/oozie/coord/CoordUtils.java
index 4643d73..90050b3 100644
--- a/core/src/main/java/org/apache/oozie/coord/CoordUtils.java
+++ b/core/src/main/java/org/apache/oozie/coord/CoordUtils.java
@@ -20,11 +20,14 @@ package org.apache.oozie.coord;
import java.text.ParseException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.CoordinatorActionBean;
import org.apache.oozie.ErrorCode;
@@ -35,15 +38,19 @@ import org.apache.oozie.command.CommandException;
import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobGetActionForNominalTimeJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.service.ConfigurationService;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.XLogService;
+import org.apache.oozie.sla.SLAOperations;
import org.apache.oozie.util.CoordActionsInDateRange;
import org.apache.oozie.util.DateUtils;
import org.apache.oozie.util.ParamChecker;
import org.apache.oozie.util.XLog;
import org.jdom.Element;
+import com.google.common.annotations.VisibleForTesting;
+
public class CoordUtils {
public static final String HADOOP_USER = "user.name";
@@ -92,7 +99,8 @@ public class CoordUtils {
* @return the list of Coordinator actions for the date range
* @throws CommandException thrown if failed to get coordinator actions by given date range
*/
- static List<CoordinatorActionBean> getCoordActionsFromDates(String jobId, String scope, boolean active)
+ @VisibleForTesting
+ public static List<CoordinatorActionBean> getCoordActionsFromDates(String jobId, String scope, boolean active)
throws CommandException {
JPAService jpaService = Services.get().get(JPAService.class);
ParamChecker.notEmpty(jobId, "jobId");
@@ -132,7 +140,12 @@ public class CoordUtils {
throw new CommandException(ErrorCode.E0302, s.trim(), e);
}
catch (JPAExecutorException e) {
- throw new CommandException(e);
+ if (e.getErrorCode() == ErrorCode.E0605) {
+ XLog.getLog(CoordUtils.class).info("No action for nominal time:" + s + ". Skipping over");
+ }
+ else {
+ throw new CommandException(e);
+ }
}
}
@@ -145,16 +158,7 @@ public class CoordUtils {
return coordActions;
}
- /**
- * Get the list of actions for given id ranges
- *
- * @param jobId coordinator job id
- * @param scope a comma-separated list of action ranges. The action range is specified with two action numbers separated by '-'
- * @return the list of all Coordinator actions for action range
- * @throws CommandException thrown if failed to get coordinator actions by given id range
- */
- public static List<CoordinatorActionBean> getCoordActionsFromIds(String jobId, String scope) throws CommandException {
- JPAService jpaService = Services.get().get(JPAService.class);
+ public static Set<String> getActionsIds(String jobId, String scope) throws CommandException {
ParamChecker.notEmpty(jobId, "jobId");
ParamChecker.notEmpty(scope, "scope");
@@ -202,6 +206,21 @@ public class CoordUtils {
actions.add(jobId + "@" + s);
}
}
+ return actions;
+ }
+
+ /**
+ * Get the list of actions for given id ranges
+ *
+ * @param jobId coordinator job id
+ * @param scope a comma-separated list of action ranges. The action range is specified with two action numbers separated by '-'
+ * @return the list of all Coordinator actions for action range
+ * @throws CommandException thrown if failed to get coordinator actions by given id range
+ */
+ @VisibleForTesting
+ public static List<CoordinatorActionBean> getCoordActionsFromIds(String jobId, String scope) throws CommandException {
+ JPAService jpaService = Services.get().get(JPAService.class);
+ Set<String> actions = getActionsIds(jobId, scope);
// Retrieve the actions using the corresponding actionIds
List<CoordinatorActionBean> coordActions = new ArrayList<CoordinatorActionBean>();
for (String id : actions) {
@@ -225,4 +244,107 @@ public class CoordUtils {
return coordActions;
}
+ /**
+ * Check if sla alert is disabled for action.
+ * @param actionBean action being checked; its nominal time, job id and action number are consulted
+ * @param coordName coordinator name, looked up in the SLA_DISABLE_ALERT_COORD list
+ * @param jobConf job configuration; the older-than cutoff falls back to ConfigurationService if unset
+ * @return true if alerts are disabled for the action, false otherwise
+ * @throws ParseException propagated from parsing date ranges in the SLA_DISABLE_ALERT / SLA_ENABLE_ALERT lists
+ */
+ public static boolean isSlaAlertDisabled(CoordinatorActionBean actionBean, String coordName, Configuration jobConf)
+ throws ParseException {
+
+ int disableSlaNotificationOlderThan = jobConf.getInt(OozieClient.SLA_DISABLE_ALERT_OLDER_THAN,
+ ConfigurationService.getInt(OozieClient.SLA_DISABLE_ALERT_OLDER_THAN));
+
+ if (disableSlaNotificationOlderThan > 0) {
+ // Disable alert for catchup jobs: nominal time lies more than the cutoff (in hours) in the past
+ long timeDiffinHrs = TimeUnit.MILLISECONDS.toHours(new Date().getTime()
+ - actionBean.getNominalTime().getTime());
+ // Compare against the cutoff already read above instead of looking it up a second time
+ if (timeDiffinHrs > disableSlaNotificationOlderThan) {
+ return true;
+ }
+ }
+
+ // Check if sla alert is disabled for the whole coordinator, listed either by name or by job id
+ String coords = jobConf.get(OozieClient.SLA_DISABLE_ALERT_COORD);
+ if (coords != null) {
+ Set<String> coordsToDisableFor = new HashSet<String>(Arrays.asList(coords.split(",")));
+ if (coordsToDisableFor.contains(coordName)) {
+ return true;
+ }
+ if (coordsToDisableFor.contains(actionBean.getJobId())) {
+ return true;
+ }
+ }
+
+ // Check if sla alert is disabled for that action
+ if (!StringUtils.isEmpty(jobConf.get(OozieClient.SLA_DISABLE_ALERT))
+ && getCoordActionSLAAlertStatus(actionBean, coordName, jobConf, OozieClient.SLA_DISABLE_ALERT)) {
+ return true;
+ }
+
+ // Check if sla alert is enabled for that action
+ if (!StringUtils.isEmpty(jobConf.get(OozieClient.SLA_ENABLE_ALERT))
+ && getCoordActionSLAAlertStatus(actionBean, coordName, jobConf, OozieClient.SLA_ENABLE_ALERT)) {
+ return false;
+ }
+
+ return false;
+ }
+
+ /**
+ * Check whether the action matches the SLA alert list stored under the given job property.
+ * @param actionBean action whose nominal time and action number are matched against the list
+ * @param coordName coordinator name (not used by this method)
+ * @param jobConf job configuration holding the alert list
+ * @param slaAlertType name of the job property that holds the comma-separated alert list
+ * @return true if the list equals SLAOperations.ALL_VALUE or any entry matches the action, else false
+ * @throws ParseException if a bound of a "start::end" date range cannot be parsed
+ */
+ private static boolean getCoordActionSLAAlertStatus(CoordinatorActionBean actionBean, String coordName,
+ Configuration jobConf, String slaAlertType) throws ParseException {
+ String slaAlertList = jobConf.get(slaAlertType);
+
+ if (!StringUtils.isEmpty(slaAlertList)) {
+ // entries are: ALL, a "start::end" date range, a "start-end" action-number range,
+ // or a single action number
+ if (slaAlertList.equalsIgnoreCase(SLAOperations.ALL_VALUE)) {
+ return true;
+ }
+ String[] values = slaAlertList.split(",");
+ for (String value : values) {
+ value = value.trim();
+ if (value.contains("::")) {
+ String[] datesInRange = value.split("::");
+ Date start = DateUtils.parseDateOozieTZ(datesInRange[0].trim());
+ Date end = DateUtils.parseDateOozieTZ(datesInRange[1].trim());
+ // check if nominal time in this range (inclusive); both bounds must hold
+ if (actionBean.getNominalTime().compareTo(start) >= 0
+ && actionBean.getNominalTime().compareTo(end) <= 0) {
+ return true;
+ }
+ }
+ else if (value.contains("-")) {
+ String[] actionsInRange = value.split("-");
+ int start = Integer.parseInt(actionsInRange[0].trim());
+ int end = Integer.parseInt(actionsInRange[1].trim());
+ // check if action number in this range (inclusive); both bounds must hold
+ if (actionBean.getActionNumber() >= start && actionBean.getActionNumber() <= end) {
+ return true;
+ }
+ }
+ else {
+ int actionNumber = Integer.parseInt(value.trim());
+ if (actionBean.getActionNumber() == actionNumber) {
+ return true;
+ }
+ }
+ }
+ }
+ return false;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionQueryExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionQueryExecutor.java
index e6ab09b..c6a60a1 100644
--- a/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionQueryExecutor.java
+++ b/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionQueryExecutor.java
@@ -28,6 +28,7 @@ import javax.persistence.Query;
import org.apache.oozie.CoordinatorActionBean;
import org.apache.oozie.ErrorCode;
+import org.apache.oozie.StringBlob;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;
@@ -51,7 +52,12 @@ public class CoordActionQueryExecutor extends
GET_COORD_ACTIVE_ACTIONS_COUNT_BY_JOBID,
GET_COORD_ACTIONS_BY_LAST_MODIFIED_TIME,
GET_COORD_ACTIONS_STATUS_UNIGNORED,
- GET_COORD_ACTIONS_PENDING_COUNT
+ GET_COORD_ACTIONS_PENDING_COUNT,
+ GET_ACTIVE_ACTIONS_IDS_FOR_SLA_CHANGE,
+ GET_ACTIVE_ACTIONS_JOBID_FOR_SLA_CHANGE,
+ GET_TERMINATED_ACTIONS_FOR_DATES,
+ GET_TERMINATED_ACTION_IDS_FOR_DATES,
+ GET_ACTIVE_ACTIONS_FOR_DATES
};
private static CoordActionQueryExecutor instance = new CoordActionQueryExecutor();
@@ -180,6 +186,19 @@ public class CoordActionQueryExecutor extends
case GET_COORD_ACTIONS_PENDING_COUNT:
query.setParameter("jobId", parameters[0]);
break;
+ case GET_ACTIVE_ACTIONS_IDS_FOR_SLA_CHANGE:
+ query.setParameter("ids", parameters[0]);
+ break;
+ case GET_ACTIVE_ACTIONS_JOBID_FOR_SLA_CHANGE:
+ query.setParameter("jobId", parameters[0]);
+ break;
+ case GET_TERMINATED_ACTIONS_FOR_DATES:
+ case GET_TERMINATED_ACTION_IDS_FOR_DATES:
+ case GET_ACTIVE_ACTIONS_FOR_DATES:
+ query.setParameter("jobId", parameters[0]);
+ query.setParameter("startTime", new Timestamp(((Date) parameters[1]).getTime()));
+ query.setParameter("endTime", new Timestamp(((Date) parameters[2]).getTime()));
+ break;
default:
throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot set parameters for "
@@ -247,6 +266,33 @@ public class CoordActionQueryExecutor extends
bean.setStatusStr((String)arr[0]);
bean.setPending((Integer)arr[1]);
break;
+ case GET_ACTIVE_ACTIONS_IDS_FOR_SLA_CHANGE:
+ case GET_ACTIVE_ACTIONS_JOBID_FOR_SLA_CHANGE:
+ arr = (Object[]) ret;
+ bean = new CoordinatorActionBean();
+ bean.setId((String)arr[0]);
+ bean.setNominalTime((Timestamp)arr[1]);
+ bean.setCreatedTime((Timestamp)arr[2]);
+ bean.setActionXmlBlob((StringBlob)arr[3]);
+ break;
+ case GET_TERMINATED_ACTIONS_FOR_DATES:
+ bean = (CoordinatorActionBean) ret;
+ break;
+ case GET_TERMINATED_ACTION_IDS_FOR_DATES:
+ bean = new CoordinatorActionBean();
+ bean.setId((String) ret);
+ break;
+ case GET_ACTIVE_ACTIONS_FOR_DATES:
+ arr = (Object[]) ret;
+ bean = new CoordinatorActionBean();
+ bean.setId((String)arr[0]);
+ bean.setJobId((String)arr[1]);
+ bean.setStatusStr((String) arr[2]);
+ bean.setExternalId((String) arr[3]);
+ bean.setPending((Integer) arr[4]);
+ bean.setNominalTime((Timestamp) arr[5]);
+ bean.setCreatedTime((Timestamp) arr[6]);
+ break;
default:
throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot construct action bean for "
http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetActionIdsForDateRangeJPAExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetActionIdsForDateRangeJPAExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetActionIdsForDateRangeJPAExecutor.java
deleted file mode 100644
index 1862c7c..0000000
--- a/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetActionIdsForDateRangeJPAExecutor.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.oozie.executor.jpa;
-
-import java.sql.Timestamp;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
-
-import javax.persistence.EntityManager;
-import javax.persistence.Query;
-
-import org.apache.oozie.CoordinatorActionBean;
-import org.apache.oozie.ErrorCode;
-import org.apache.oozie.util.ParamChecker;
-
-/**
- * Load coordinator action ids by date range.
- */
-public class CoordJobGetActionIdsForDateRangeJPAExecutor implements JPAExecutor<List<String>> {
-
- private String jobId = null;
- private Date startDate, endDate;
-
- public CoordJobGetActionIdsForDateRangeJPAExecutor(String jobId, Date startDate, Date endDate) {
- ParamChecker.notNull(jobId, "jobId");
- this.jobId = jobId;
- this.startDate = startDate;
- this.endDate = endDate;
- }
-
- @Override
- public String getName() {
- return "CoordJobGetActionIdsForDateRangeJPAExecutor";
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public List<String> execute(EntityManager em) throws JPAExecutorException {
- try {
- Query q = em.createNamedQuery("GET_ACTION_IDS_FOR_DATES");
- q.setParameter("jobId", jobId);
- q.setParameter("startTime", new Timestamp(startDate.getTime()));
- q.setParameter("endTime", new Timestamp(endDate.getTime()));
- List<String> coordActionIds= q.getResultList();
- return coordActionIds;
- }
- catch (Exception e) {
- throw new JPAExecutorException(ErrorCode.E0603, e.getMessage(), e);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetActionsByDatesForKillJPAExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetActionsByDatesForKillJPAExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetActionsByDatesForKillJPAExecutor.java
deleted file mode 100644
index eb95591..0000000
--- a/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetActionsByDatesForKillJPAExecutor.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.oozie.executor.jpa;
-
-import java.sql.Timestamp;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
-
-import javax.persistence.EntityManager;
-import javax.persistence.Query;
-
-import org.apache.oozie.CoordinatorActionBean;
-import org.apache.oozie.ErrorCode;
-import org.apache.oozie.client.CoordinatorAction;
-import org.apache.oozie.util.DateUtils;
-import org.apache.oozie.util.ParamChecker;
-
-/**
- * Load non-terminal coordinator actions by dates.
- */
-public class CoordJobGetActionsByDatesForKillJPAExecutor implements JPAExecutor<List<CoordinatorActionBean>> {
-
- private String jobId = null;
- private Date startDate, endDate;
-
- public CoordJobGetActionsByDatesForKillJPAExecutor(String jobId, Date startDate, Date endDate) {
- ParamChecker.notNull(jobId, "jobId");
- this.jobId = jobId;
- this.startDate = startDate;
- this.endDate = endDate;
- }
-
- @Override
- public String getName() {
- return "CoordJobGetActionsByDatesForKillJPAExecutor";
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public List<CoordinatorActionBean> execute(EntityManager em) throws JPAExecutorException {
- List<CoordinatorActionBean> actionList = new ArrayList<CoordinatorActionBean>();
- try {
- Query q = em.createNamedQuery("GET_ACTIONS_BY_DATES_FOR_KILL");
- q.setParameter("jobId", jobId);
- q.setParameter("startTime", new Timestamp(startDate.getTime()));
- q.setParameter("endTime", new Timestamp(endDate.getTime()));
- List<Object[]> actions = q.getResultList();
-
- for (Object[] a : actions) {
- CoordinatorActionBean aa = getBeanForRunningCoordAction(a);
- actionList.add(aa);
- }
- return actionList;
- }
- catch (Exception e) {
- throw new JPAExecutorException(ErrorCode.E0603, e.getMessage(), e);
- }
- }
-
- private CoordinatorActionBean getBeanForRunningCoordAction(Object[] arr) {
- CoordinatorActionBean action = new CoordinatorActionBean();
- if (arr[0] != null) {
- action.setId((String) arr[0]);
- }
-
- if (arr[1] != null) {
- action.setJobId((String) arr[1]);
- }
-
- if (arr[2] != null) {
- action.setStatus(CoordinatorAction.Status.valueOf((String) arr[2]));
- }
-
- if (arr[3] != null) {
- action.setExternalId((String) arr[3]);
- }
-
- if (arr[4] != null) {
- action.setPending((Integer) arr[4]);
- }
-
- if (arr[5] != null) {
- action.setNominalTime(DateUtils.toDate((Timestamp) arr[5]));
- }
-
- if (arr[6] != null) {
- action.setCreatedTime(DateUtils.toDate((Timestamp) arr[6]));
- }
- return action;
- }
-}
http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetActionsForDatesJPAExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetActionsForDatesJPAExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetActionsForDatesJPAExecutor.java
deleted file mode 100644
index d1856ae..0000000
--- a/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetActionsForDatesJPAExecutor.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.oozie.executor.jpa;
-
-import java.sql.Timestamp;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
-
-import javax.persistence.EntityManager;
-import javax.persistence.Query;
-
-import org.apache.oozie.CoordinatorActionBean;
-import org.apache.oozie.ErrorCode;
-import org.apache.oozie.util.ParamChecker;
-
-/**
- * Load coordinator actions by dates.
- */
-public class CoordJobGetActionsForDatesJPAExecutor implements JPAExecutor<List<CoordinatorActionBean>> {
-
- private String jobId = null;
- private Date startDate, endDate;
-
- public CoordJobGetActionsForDatesJPAExecutor(String jobId, Date startDate, Date endDate) {
- ParamChecker.notNull(jobId, "jobId");
- this.jobId = jobId;
- this.startDate = startDate;
- this.endDate = endDate;
- }
-
- @Override
- public String getName() {
- return "CoordJobGetActionsForDatesJPAExecutor";
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public List<CoordinatorActionBean> execute(EntityManager em) throws JPAExecutorException {
- List<CoordinatorActionBean> actions;
- try {
- Query q = em.createNamedQuery("GET_ACTIONS_FOR_DATES");
- q.setParameter("jobId", jobId);
- q.setParameter("startTime", new Timestamp(startDate.getTime()));
- q.setParameter("endTime", new Timestamp(endDate.getTime()));
- actions = q.getResultList();
- return actions;
- }
- catch (Exception e) {
- throw new JPAExecutorException(ErrorCode.E0603, e.getMessage(), e);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobQueryExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobQueryExecutor.java
index 4bccef4..1518686 100644
--- a/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobQueryExecutor.java
+++ b/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobQueryExecutor.java
@@ -26,7 +26,6 @@ import java.util.List;
import javax.persistence.EntityManager;
import javax.persistence.Query;
-import org.apache.oozie.CoordinatorActionBean;
import org.apache.oozie.CoordinatorJobBean;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.StringBlob;
@@ -53,6 +52,8 @@ public class CoordJobQueryExecutor extends QueryExecutor<CoordinatorJobBean, Coo
UPDATE_COORD_JOB_STATUS_PENDING_TIME,
UPDATE_COORD_JOB_MATERIALIZE,
UPDATE_COORD_JOB_CHANGE,
+ UPDATE_COORD_JOB_CONF,
+ UPDATE_COORD_JOB_XML,
GET_COORD_JOB,
GET_COORD_JOB_USER_APPNAME,
GET_COORD_JOB_INPUT_CHECK,
@@ -63,9 +64,13 @@ public class CoordJobQueryExecutor extends QueryExecutor<CoordinatorJobBean, Coo
GET_COORD_JOB_STATUS,
GET_COORD_JOB_STATUS_PARENTID,
GET_COORD_JOBS_CHANGED,
- GET_COORD_JOBS_OLDER_FOR_MATERILZATION,
+ GET_COORD_JOBS_OLDER_FOR_MATERIALIZATION,
GET_COORD_FOR_ABANDONEDCHECK,
- GET_COORD_IDS_FOR_STATUS_TRANSIT
+ GET_COORD_IDS_FOR_STATUS_TRANSIT,
+ GET_COORD_JOBS_FOR_BUNDLE_BY_APPNAME_ID,
+ GET_COORD_JOBS_WITH_PARENT_ID,
+ GET_COORD_JOB_CONF,
+ GET_COORD_JOB_XML
};
private static CoordJobQueryExecutor instance = new CoordJobQueryExecutor();
@@ -177,6 +182,15 @@ public class CoordJobQueryExecutor extends QueryExecutor<CoordinatorJobBean, Coo
query.setParameter("lastModifiedTime", cjBean.getLastModifiedTimestamp());
query.setParameter("id", cjBean.getId());
break;
+ case UPDATE_COORD_JOB_CONF:
+ query.setParameter("conf", cjBean.getConfBlob());
+ query.setParameter("id", cjBean.getId());
+ break;
+ case UPDATE_COORD_JOB_XML:
+ query.setParameter("jobXml", cjBean.getJobXmlBlob());
+ query.setParameter("id", cjBean.getId());
+ break;
+
default:
throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot set parameters for "
+ namedQuery.name());
@@ -198,12 +212,14 @@ public class CoordJobQueryExecutor extends QueryExecutor<CoordinatorJobBean, Coo
case GET_COORD_JOB_SUSPEND_KILL:
case GET_COORD_JOB_STATUS:
case GET_COORD_JOB_STATUS_PARENTID:
+ case GET_COORD_JOB_CONF:
+ case GET_COORD_JOB_XML:
query.setParameter("id", parameters[0]);
break;
case GET_COORD_JOBS_CHANGED:
query.setParameter("lastModifiedTime", new Timestamp(((Date)parameters[0]).getTime()));
break;
- case GET_COORD_JOBS_OLDER_FOR_MATERILZATION:
+ case GET_COORD_JOBS_OLDER_FOR_MATERIALIZATION:
query.setParameter("matTime", new Timestamp(((Date)parameters[0]).getTime()));
int limit = (Integer) parameters[1];
if (limit > 0) {
@@ -218,7 +234,13 @@ public class CoordJobQueryExecutor extends QueryExecutor<CoordinatorJobBean, Coo
case GET_COORD_IDS_FOR_STATUS_TRANSIT:
query.setParameter("lastModifiedTime", new Timestamp(((Date) parameters[0]).getTime()));
break;
-
+ case GET_COORD_JOBS_FOR_BUNDLE_BY_APPNAME_ID:
+ query.setParameter("appName", parameters[0]);
+ query.setParameter("bundleId", parameters[1]);
+ break;
+ case GET_COORD_JOBS_WITH_PARENT_ID:
+ query.setParameter("parentId", parameters[0]);
+ break;
default:
throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot set parameters for "
+ namedQuery.name());
@@ -335,7 +357,15 @@ public class CoordJobQueryExecutor extends QueryExecutor<CoordinatorJobBean, Coo
case GET_COORD_JOBS_CHANGED:
bean = (CoordinatorJobBean) ret;
break;
- case GET_COORD_JOBS_OLDER_FOR_MATERILZATION:
+ case GET_COORD_JOBS_OLDER_FOR_MATERIALIZATION:
+ bean = new CoordinatorJobBean();
+ bean.setId((String) ret);
+ break;
+ case GET_COORD_JOBS_FOR_BUNDLE_BY_APPNAME_ID:
+ bean = new CoordinatorJobBean();
+ bean.setId((String) ret);
+ break;
+ case GET_COORD_JOBS_WITH_PARENT_ID:
bean = new CoordinatorJobBean();
bean.setId((String) ret);
break;
@@ -347,11 +377,18 @@ public class CoordJobQueryExecutor extends QueryExecutor<CoordinatorJobBean, Coo
bean.setGroup((String) arr[2]);
bean.setAppName((String) arr[3]);
break;
-
case GET_COORD_IDS_FOR_STATUS_TRANSIT:
bean = new CoordinatorJobBean();
bean.setId((String) ret);
break;
+ case GET_COORD_JOB_CONF:
+ bean = new CoordinatorJobBean();
+ bean.setConfBlob((StringBlob) ret);
+ break;
+ case GET_COORD_JOB_XML:
+ bean = new CoordinatorJobBean();
+ bean.setJobXmlBlob((StringBlob) ret);
+ break;
default:
throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot construct job bean for "
http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobsToBeMaterializedJPAExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobsToBeMaterializedJPAExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobsToBeMaterializedJPAExecutor.java
index 5e018c7..6d13ed1 100644
--- a/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobsToBeMaterializedJPAExecutor.java
+++ b/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobsToBeMaterializedJPAExecutor.java
@@ -56,7 +56,7 @@ public class CoordJobsToBeMaterializedJPAExecutor implements JPAExecutor<List<Co
public List<CoordinatorJobBean> execute(EntityManager em) throws JPAExecutorException {
List<CoordinatorJobBean> cjBeans;
try {
- Query q = em.createNamedQuery("GET_COORD_JOBS_OLDER_FOR_MATERILZATION");
+ Query q = em.createNamedQuery("GET_COORD_JOBS_OLDER_FOR_MATERIALIZATION");
q.setParameter("matTime", new Timestamp(this.dateInput.getTime()));
if (limit > 0) {
q.setMaxResults(limit);
http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/core/src/main/java/org/apache/oozie/executor/jpa/SLARegistrationQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/SLARegistrationQueryExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/SLARegistrationQueryExecutor.java
index e220c01..bded634 100644
--- a/core/src/main/java/org/apache/oozie/executor/jpa/SLARegistrationQueryExecutor.java
+++ b/core/src/main/java/org/apache/oozie/executor/jpa/SLARegistrationQueryExecutor.java
@@ -18,6 +18,8 @@
package org.apache.oozie.executor.jpa;
+import java.sql.Timestamp;
+import java.util.ArrayList;
import java.util.List;
import javax.persistence.EntityManager;
@@ -36,8 +38,13 @@ public class SLARegistrationQueryExecutor extends QueryExecutor<SLARegistrationB
public enum SLARegQuery {
UPDATE_SLA_REG_ALL,
+ UPDATE_SLA_CONFIG,
+ UPDATE_SLA_EXPECTED_VALUE,
GET_SLA_REG_ALL,
- GET_SLA_REG_ON_RESTART
+ GET_SLA_EXPECTED_VALUE_CONFIG,
+ GET_SLA_REG_FOR_PARENT_ID,
+ GET_SLA_REG_ON_RESTART,
+ GET_SLA_CONFIGS
};
private static SLARegistrationQueryExecutor instance = new SLARegistrationQueryExecutor();
@@ -70,6 +77,17 @@ public class SLARegistrationQueryExecutor extends QueryExecutor<SLARegistrationB
query.setParameter("parentId", bean.getParentId());
query.setParameter("jobData", bean.getJobData());
break;
+ case UPDATE_SLA_EXPECTED_VALUE:
+ query.setParameter("jobId", bean.getId());
+ query.setParameter("expectedStartTime", bean.getExpectedStartTimestamp());
+ query.setParameter("expectedEndTime", bean.getExpectedEndTimestamp());
+ query.setParameter("expectedDuration", bean.getExpectedDuration());
+ break;
+ case UPDATE_SLA_CONFIG:
+ query.setParameter("jobId", bean.getId());
+ query.setParameter("slaConfig", bean.getSlaConfig());
+ break;
+
default:
throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot set parameters for "
+ namedQuery.name());
@@ -86,6 +104,16 @@ public class SLARegistrationQueryExecutor extends QueryExecutor<SLARegistrationB
case GET_SLA_REG_ON_RESTART:
query.setParameter("id", parameters[0]);
break;
+ case GET_SLA_CONFIGS:
+ query.setParameter("ids", parameters[0]);
+ break;
+ case GET_SLA_EXPECTED_VALUE_CONFIG:
+ query.setParameter("id", parameters[0]);
+ break;
+ case GET_SLA_REG_FOR_PARENT_ID:
+ query.setParameter("parentId", parameters[0]);
+ break;
+
default:
throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot set parameters for "
+ namedQuery.name());
@@ -120,9 +148,13 @@ public class SLARegistrationQueryExecutor extends QueryExecutor<SLARegistrationB
JPAService jpaService = Services.get().get(JPAService.class);
EntityManager em = jpaService.getEntityManager();
Query query = getSelectQuery(namedQuery, em, parameters);
- @SuppressWarnings("unchecked")
- List<SLARegistrationBean> beanList = (List<SLARegistrationBean>) jpaService.executeGetList(namedQuery.name(),
- query, em);
+ List<?> retList = (List<?>) jpaService.executeGetList(namedQuery.name(), query, em);
+ List<SLARegistrationBean> beanList = new ArrayList<SLARegistrationBean>();
+ if (retList != null) {
+ for (Object ret : retList) {
+ beanList.add(constructBean(namedQuery, ret));
+ }
+ }
return beanList;
}
@@ -145,6 +177,28 @@ public class SLARegistrationQueryExecutor extends QueryExecutor<SLARegistrationB
bean.setSlaConfig((String) arr[2]);
bean.setJobData((String) arr[3]);
break;
+ case GET_SLA_CONFIGS:
+ bean = new SLARegistrationBean();
+ arr = (Object[]) ret;
+ bean.setId((String) arr[0]);
+ bean.setSlaConfig((String) arr[1]);
+ break;
+ case GET_SLA_EXPECTED_VALUE_CONFIG:
+ bean = new SLARegistrationBean();
+ arr = (Object[]) ret;
+ bean.setId((String) arr[0]);
+ bean.setSlaConfig((String) arr[1]);
+ bean.setExpectedStart((Timestamp)arr[2]);
+ bean.setExpectedEnd((Timestamp)arr[3]);
+ bean.setExpectedDuration((Long)arr[4]);
+ bean.setNominalTime((Timestamp)arr[5]);
+ break;
+ case GET_SLA_REG_FOR_PARENT_ID:
+ bean = new SLARegistrationBean();
+ arr = (Object[]) ret;
+ bean.setId((String) arr[0]);
+ bean.setSlaConfig((String) arr[1]);
+ break;
default:
throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot construct job bean for "
+ namedQuery.name());
http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/core/src/main/java/org/apache/oozie/executor/jpa/SLASummaryQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/SLASummaryQueryExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/SLASummaryQueryExecutor.java
index c3197b7..0057c89 100644
--- a/core/src/main/java/org/apache/oozie/executor/jpa/SLASummaryQueryExecutor.java
+++ b/core/src/main/java/org/apache/oozie/executor/jpa/SLASummaryQueryExecutor.java
@@ -18,6 +18,7 @@
package org.apache.oozie.executor.jpa;
+import java.sql.Timestamp;
import java.util.List;
import javax.persistence.EntityManager;
@@ -39,8 +40,11 @@ public class SLASummaryQueryExecutor extends QueryExecutor<SLASummaryBean, SLASu
UPDATE_SLA_SUMMARY_FOR_ACTUAL_TIMES,
UPDATE_SLA_SUMMARY_ALL,
UPDATE_SLA_SUMMARY_EVENTPROCESSED,
+ UPDATE_SLA_SUMMARY_FOR_EXPECTED_TIMES,
+ UPDATE_SLA_SUMMARY_LAST_MODIFIED_TIME,
GET_SLA_SUMMARY,
- GET_SLA_SUMMARY_EVENTPROCESSED
+ GET_SLA_SUMMARY_EVENTPROCESSED,
+ GET_SLA_SUMMARY_EVENTPROCESSED_LAST_MODIFIED
};
private static SLASummaryQueryExecutor instance = new SLASummaryQueryExecutor();
@@ -95,10 +99,24 @@ public class SLASummaryQueryExecutor extends QueryExecutor<SLASummaryBean, SLASu
query.setParameter("actualStartTS", bean.getActualStartTimestamp());
query.setParameter("jobId", bean.getId());
break;
+ case UPDATE_SLA_SUMMARY_FOR_EXPECTED_TIMES:
+ query.setParameter("nominalTime", bean.getNominalTimestamp());
+ query.setParameter("expectedStartTime", bean.getExpectedStartTimestamp());
+ query.setParameter("expectedEndTime", bean.getExpectedEndTimestamp());
+ query.setParameter("expectedDuration", bean.getExpectedDuration());
+ query.setParameter("lastModTime", bean.getLastModifiedTimestamp());
+ query.setParameter("jobId", bean.getId());
+ break;
+
case UPDATE_SLA_SUMMARY_EVENTPROCESSED:
query.setParameter("eventProcessed", bean.getEventProcessed());
query.setParameter("jobId", bean.getId());
break;
+ case UPDATE_SLA_SUMMARY_LAST_MODIFIED_TIME:
+ query.setParameter("lastModifiedTS", bean.getLastModifiedTime());
+ query.setParameter("jobId", bean.getId());
+ break;
+
default:
throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot set parameters for "
+ namedQuery.name());
@@ -113,6 +131,7 @@ public class SLASummaryQueryExecutor extends QueryExecutor<SLASummaryBean, SLASu
switch (namedQuery) {
case GET_SLA_SUMMARY:
case GET_SLA_SUMMARY_EVENTPROCESSED:
+ case GET_SLA_SUMMARY_EVENTPROCESSED_LAST_MODIFIED:
query.setParameter("id", parameters[0]);
break;
}
@@ -174,6 +193,14 @@ public class SLASummaryQueryExecutor extends QueryExecutor<SLASummaryBean, SLASu
bean = new SLASummaryBean();
bean.setEventProcessed(((Byte)ret).intValue());
break;
+ case GET_SLA_SUMMARY_EVENTPROCESSED_LAST_MODIFIED:
+ Object[] arr = (Object[]) ret;
+ bean = new SLASummaryBean();
+ bean.setEventProcessed((Byte)arr[0]);
+ bean.setLastModifiedTime((Timestamp)arr[1]);
+
+ break;
+
default:
throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot construct job bean for "
+ namedQuery.name());
http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/core/src/main/java/org/apache/oozie/service/CoordMaterializeTriggerService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/CoordMaterializeTriggerService.java b/core/src/main/java/org/apache/oozie/service/CoordMaterializeTriggerService.java
index fa16d1d..1cbd474 100644
--- a/core/src/main/java/org/apache/oozie/service/CoordMaterializeTriggerService.java
+++ b/core/src/main/java/org/apache/oozie/service/CoordMaterializeTriggerService.java
@@ -160,7 +160,7 @@ public class CoordMaterializeTriggerService implements Service {
throws JPAExecutorException {
try {
List<CoordinatorJobBean> materializeJobs = CoordJobQueryExecutor.getInstance().getList(
- CoordJobQuery.GET_COORD_JOBS_OLDER_FOR_MATERILZATION, currDate, limit);
+ CoordJobQuery.GET_COORD_JOBS_OLDER_FOR_MATERIALIZATION, currDate, limit);
LOG.info("CoordMaterializeTriggerService - Curr Date= " + DateUtils.formatDateOozieTZ(currDate)
+ ", Num jobs to materialize = " + materializeJobs.size());
for (CoordinatorJobBean coordJob : materializeJobs) {
http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/core/src/main/java/org/apache/oozie/service/EventHandlerService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/EventHandlerService.java b/core/src/main/java/org/apache/oozie/service/EventHandlerService.java
index 7c0d3be..22c6fb0 100644
--- a/core/src/main/java/org/apache/oozie/service/EventHandlerService.java
+++ b/core/src/main/java/org/apache/oozie/service/EventHandlerService.java
@@ -19,32 +19,32 @@
package org.apache.oozie.service;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.ErrorCode;
-import org.apache.oozie.event.BundleJobEvent;
-import org.apache.oozie.event.CoordinatorActionEvent;
-import org.apache.oozie.event.CoordinatorJobEvent;
import org.apache.oozie.client.event.Event;
import org.apache.oozie.client.event.Event.MessageType;
import org.apache.oozie.client.event.JobEvent;
+import org.apache.oozie.client.event.SLAEvent;
+import org.apache.oozie.event.BundleJobEvent;
+import org.apache.oozie.event.CoordinatorActionEvent;
+import org.apache.oozie.event.CoordinatorJobEvent;
import org.apache.oozie.event.EventQueue;
import org.apache.oozie.event.MemoryEventQueue;
import org.apache.oozie.event.WorkflowActionEvent;
import org.apache.oozie.event.WorkflowJobEvent;
import org.apache.oozie.event.listener.JobEventListener;
import org.apache.oozie.sla.listener.SLAEventListener;
-import org.apache.oozie.client.event.SLAEvent;
import org.apache.oozie.util.LogUtils;
import org.apache.oozie.util.XLog;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
/**
* Service class that handles the events system - creating events queue,
* managing configured properties and managing and invoking various event
http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/core/src/main/java/org/apache/oozie/servlet/BaseJobServlet.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/servlet/BaseJobServlet.java b/core/src/main/java/org/apache/oozie/servlet/BaseJobServlet.java
index 5690787..a581f8b 100644
--- a/core/src/main/java/org/apache/oozie/servlet/BaseJobServlet.java
+++ b/core/src/main/java/org/apache/oozie/servlet/BaseJobServlet.java
@@ -177,6 +177,27 @@ public abstract class BaseJobServlet extends JsonRestServlet {
startCron();
sendJsonResponse(response, HttpServletResponse.SC_OK, json);
}
+ else if (action.equals(RestConstants.SLA_ENABLE_ALERT)) {
+ validateContentType(request, RestConstants.XML_CONTENT_TYPE);
+ stopCron();
+ slaEnableAlert(request, response);
+ startCron();
+ response.setStatus(HttpServletResponse.SC_OK);
+ }
+ else if (action.equals(RestConstants.SLA_DISABLE_ALERT)) {
+ validateContentType(request, RestConstants.XML_CONTENT_TYPE);
+ stopCron();
+ slaDisableAlert(request, response);
+ startCron();
+ response.setStatus(HttpServletResponse.SC_OK);
+ }
+ else if (action.equals(RestConstants.SLA_CHANGE)) {
+ validateContentType(request, RestConstants.XML_CONTENT_TYPE);
+ stopCron();
+ slaChange(request, response);
+ startCron();
+ response.setStatus(HttpServletResponse.SC_OK);
+ }
else {
throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303,
RestConstants.ACTION_PARAM, action);
@@ -498,4 +519,38 @@ public abstract class BaseJobServlet extends JsonRestServlet {
*/
abstract String getJobStatus(HttpServletRequest request, HttpServletResponse response)
throws XServletException, IOException;
+
+ /**
+ * Abstract method to enable SLA alert.
+ *
+ * @param request the request
+ * @param response the response
+ * @throws XServletException the x servlet exception
+ * @throws IOException Signals that an I/O exception has occurred.
+ */
+ abstract void slaEnableAlert(HttpServletRequest request, HttpServletResponse response) throws XServletException,
+ IOException;
+
+ /**
+ * Abstract method to disable SLA alert.
+ *
+ * @param request the request
+ * @param response the response
+ * @throws XServletException the x servlet exception
+ * @throws IOException Signals that an I/O exception has occurred.
+ */
+ abstract void slaDisableAlert(HttpServletRequest request, HttpServletResponse response) throws XServletException,
+ IOException;
+
+ /**
+ * Abstract method to change SLA definition.
+ *
+ * @param request the request
+ * @param response the response
+ * @throws XServletException the x servlet exception
+ * @throws IOException Signals that an I/O exception has occurred.
+ */
+ abstract void slaChange(HttpServletRequest request, HttpServletResponse response) throws XServletException,
+ IOException;
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/core/src/main/java/org/apache/oozie/servlet/SLAServlet.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/servlet/SLAServlet.java b/core/src/main/java/org/apache/oozie/servlet/SLAServlet.java
index 2578e41..f897652 100644
--- a/core/src/main/java/org/apache/oozie/servlet/SLAServlet.java
+++ b/core/src/main/java/org/apache/oozie/servlet/SLAServlet.java
@@ -31,6 +31,7 @@ import java.util.StringTokenizer;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
+
import org.apache.oozie.ErrorCode;
import org.apache.oozie.SLAEventBean;
import org.apache.oozie.client.OozieClient;
http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/core/src/main/java/org/apache/oozie/servlet/V0JobServlet.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/servlet/V0JobServlet.java b/core/src/main/java/org/apache/oozie/servlet/V0JobServlet.java
index 3e186f9..3cb9168 100644
--- a/core/src/main/java/org/apache/oozie/servlet/V0JobServlet.java
+++ b/core/src/main/java/org/apache/oozie/servlet/V0JobServlet.java
@@ -237,6 +237,22 @@ public class V0JobServlet extends BaseJobServlet {
@Override
protected String getJobStatus(HttpServletRequest request, HttpServletResponse response) throws XServletException,
IOException {
- throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, "Not supported in v1");
+ throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, "Not supported in v0");
+ }
+
+ @Override
+ void slaEnableAlert(HttpServletRequest request, HttpServletResponse response) throws XServletException, IOException {
+ throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, "Not supported in v0");
+ }
+
+ @Override
+ void slaDisableAlert(HttpServletRequest request, HttpServletResponse response) throws XServletException,
+ IOException {
+ throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, "Not supported in v0");
+ }
+
+ @Override
+ void slaChange(HttpServletRequest request, HttpServletResponse response) throws XServletException, IOException {
+ throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, "Not supported in v0");
}
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/core/src/main/java/org/apache/oozie/servlet/V1JobServlet.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/servlet/V1JobServlet.java b/core/src/main/java/org/apache/oozie/servlet/V1JobServlet.java
index 64b97c2..d4564c6 100644
--- a/core/src/main/java/org/apache/oozie/servlet/V1JobServlet.java
+++ b/core/src/main/java/org/apache/oozie/servlet/V1JobServlet.java
@@ -1103,4 +1103,20 @@ public class V1JobServlet extends BaseJobServlet {
IOException {
throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, "Not supported in v1");
}
+
+ @Override
+ void slaEnableAlert(HttpServletRequest request, HttpServletResponse response) throws XServletException, IOException {
+ throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, "Not supported in v1");
+ }
+
+ @Override
+ void slaDisableAlert(HttpServletRequest request, HttpServletResponse response) throws XServletException,
+ IOException {
+ throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, "Not supported in v1");
+ }
+
+ @Override
+ void slaChange(HttpServletRequest request, HttpServletResponse response) throws XServletException, IOException {
+ throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, "Not supported in v1");
+ }
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/core/src/main/java/org/apache/oozie/servlet/V2JobServlet.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/servlet/V2JobServlet.java b/core/src/main/java/org/apache/oozie/servlet/V2JobServlet.java
index 5238426..7100c98 100644
--- a/core/src/main/java/org/apache/oozie/servlet/V2JobServlet.java
+++ b/core/src/main/java/org/apache/oozie/servlet/V2JobServlet.java
@@ -149,6 +149,53 @@ public class V2JobServlet extends V1JobServlet {
}
+ @Override
+ protected void slaEnableAlert(HttpServletRequest request, HttpServletResponse response) throws XServletException,
+ IOException {
+ String jobId = getResourceName(request);
+ String actions = request.getParameter(RestConstants.JOB_COORD_SCOPE_ACTION_LIST);
+ String dates = request.getParameter(RestConstants.JOB_COORD_SCOPE_DATE);
+ String childIds = request.getParameter(RestConstants.COORDINATORS_PARAM);
+ try {
+ getBaseEngine(jobId, getUser(request)).enableSLAAlert(jobId, actions, dates, childIds);
+ }
+ catch (BaseEngineException e) {
+ throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, e);
+ }
+
+ }
+
+ @Override
+ protected void slaDisableAlert(HttpServletRequest request, HttpServletResponse response) throws XServletException,
+ IOException {
+ String jobId = getResourceName(request);
+ String actions = request.getParameter(RestConstants.JOB_COORD_SCOPE_ACTION_LIST);
+ String dates = request.getParameter(RestConstants.JOB_COORD_SCOPE_DATE);
+ String childIds = request.getParameter(RestConstants.COORDINATORS_PARAM);
+ try {
+ getBaseEngine(jobId, getUser(request)).disableSLAAlert(jobId, actions, dates, childIds);
+ }
+ catch (BaseEngineException e) {
+ throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, e);
+ }
+ }
+
+ @Override
+ protected void slaChange(HttpServletRequest request, HttpServletResponse response) throws XServletException, IOException {
+ String jobId = getResourceName(request);
+ String actions = request.getParameter(RestConstants.JOB_COORD_SCOPE_ACTION_LIST);
+ String dates = request.getParameter(RestConstants.JOB_COORD_SCOPE_DATE);
+ String newParams = request.getParameter(RestConstants.JOB_CHANGE_VALUE);
+ String coords = request.getParameter(RestConstants.COORDINATORS_PARAM);
+
+ try {
+ getBaseEngine(jobId, getUser(request)).changeSLA(jobId, actions, dates, coords, newParams);
+ }
+ catch (BaseEngineException e) {
+ throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, e);
+ }
+ }
+
/**
* Ignore a coordinator job/action
*
@@ -199,21 +246,18 @@ public class V2JobServlet extends V1JobServlet {
String status;
String jobId = getResourceName(request);
try {
- if (jobId.endsWith("-B")) {
- BundleEngine engine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request));
- status = engine.getJobStatus(jobId);
- } else if (jobId.endsWith("-W")) {
- DagEngine engine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request));
- status = engine.getJobStatus(jobId);
- } else {
- CoordinatorEngine engine =
- Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(getUser(request));
- if (jobId.contains("-C@")) {
- status = engine.getActionStatus(jobId);
- } else {
- status = engine.getJobStatus(jobId);
- }
+ if (jobId.endsWith("-B") || jobId.endsWith("-W")) {
+ status = getBaseEngine(jobId, getUser(request)).getJobStatus(jobId);
+ }
+ else if (jobId.contains("C@")) {
+ CoordinatorEngine engine = Services.get().get(CoordinatorEngineService.class)
+ .getCoordinatorEngine(getUser(request));
+ status = engine.getActionStatus(jobId);
}
+ else {
+ status = getBaseEngine(jobId, getUser(request)).getJobStatus(jobId);
+ }
+
} catch (BaseEngineException ex) {
throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
}
@@ -251,7 +295,7 @@ public class V2JobServlet extends V1JobServlet {
else if (jobId.endsWith("-B")) {
return Services.get().get(BundleEngineService.class).getBundleEngine(user);
}
- else if (jobId.endsWith("-C")) {
+ else if (jobId.contains("-C")) {
return Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(user);
}
else {
http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/core/src/main/java/org/apache/oozie/servlet/V2SLAServlet.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/servlet/V2SLAServlet.java b/core/src/main/java/org/apache/oozie/servlet/V2SLAServlet.java
index a0fe1b6..57170e1 100644
--- a/core/src/main/java/org/apache/oozie/servlet/V2SLAServlet.java
+++ b/core/src/main/java/org/apache/oozie/servlet/V2SLAServlet.java
@@ -22,7 +22,9 @@ import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.text.ParseException;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -31,15 +33,19 @@ import java.util.Set;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
+
import org.apache.oozie.ErrorCode;
import org.apache.oozie.XException;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.rest.RestConstants;
import org.apache.oozie.command.CommandException;
+import org.apache.oozie.executor.jpa.SLARegistrationQueryExecutor;
+import org.apache.oozie.executor.jpa.SLARegistrationQueryExecutor.SLARegQuery;
import org.apache.oozie.executor.jpa.sla.SLASummaryGetForFilterJPAExecutor;
import org.apache.oozie.executor.jpa.sla.SLASummaryGetForFilterJPAExecutor.SLASummaryFilter;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;
+import org.apache.oozie.sla.SLARegistrationBean;
import org.apache.oozie.sla.SLASummaryBean;
import org.apache.oozie.util.DateUtils;
import org.apache.oozie.util.XLog;
@@ -146,7 +152,20 @@ public class V2SLAServlet extends SLAServlet {
else {
XLog.getLog(getClass()).error(ErrorCode.E0610);
}
- return SLASummaryBean.toJSONObject(slaSummaryList, timeZoneId);
+
+ List<String> jobIds = new ArrayList<String>();
+ for(SLASummaryBean summaryBean:slaSummaryList){
+ jobIds.add(summaryBean.getId());
+ }
+ List<SLARegistrationBean> SLARegistrationList = SLARegistrationQueryExecutor.getInstance().getList(
+ SLARegQuery.GET_SLA_CONFIGS, jobIds);
+
+ Map<String, Map<String, String>> jobIdSLAConfigMap = new HashMap<String, Map<String, String>>();
+ for(SLARegistrationBean registrationBean:SLARegistrationList){
+ jobIdSLAConfigMap.put(registrationBean.getId(), registrationBean.getSLAConfigMap());
+ }
+
+ return SLASummaryBean.toJSONObject(slaSummaryList, jobIdSLAConfigMap, timeZoneId);
}
catch (XException ex) {
throw new CommandException(ex);
http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/core/src/main/java/org/apache/oozie/sla/SLACalcStatus.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/sla/SLACalcStatus.java b/core/src/main/java/org/apache/oozie/sla/SLACalcStatus.java
index 189d5ea..0d7123a 100644
--- a/core/src/main/java/org/apache/oozie/sla/SLACalcStatus.java
+++ b/core/src/main/java/org/apache/oozie/sla/SLACalcStatus.java
@@ -20,8 +20,10 @@
package org.apache.oozie.sla;
import java.util.Date;
+import java.util.Map;
import org.apache.oozie.AppType;
+import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.event.SLAEvent;
import org.apache.oozie.lock.LockToken;
import org.apache.oozie.service.JobsConcurrencyService;
@@ -65,6 +67,10 @@ public class SLACalcStatus extends SLAEvent {
reg.setAlertContact(regBean.getAlertContact());
reg.setAlertEvents(regBean.getAlertEvents());
reg.setJobData(regBean.getJobData());
+ if (regBean.getSLAConfigMap().containsKey(OozieClient.SLA_DISABLE_ALERT)) {
+ reg.addToSLAConfigMap(OozieClient.SLA_DISABLE_ALERT,
+ regBean.getSLAConfigMap().get(OozieClient.SLA_DISABLE_ALERT));
+ }
reg.setId(summary.getId());
reg.setAppType(summary.getAppType());
reg.setUser(summary.getUser());
@@ -267,10 +273,14 @@ public class SLACalcStatus extends SLAEvent {
}
@Override
- public String getSlaConfig() {
+ public String getSLAConfig() {
return regBean.getSlaConfig();
}
+ public Map<String, String> getSLAConfigMap() {
+ return regBean.getSLAConfigMap();
+ }
+
@Override
public MessageType getMsgType() {
return regBean.getMsgType();
http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/core/src/main/java/org/apache/oozie/sla/SLACalculator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/sla/SLACalculator.java b/core/src/main/java/org/apache/oozie/sla/SLACalculator.java
index 20f93b5..f238321 100644
--- a/core/src/main/java/org/apache/oozie/sla/SLACalculator.java
+++ b/core/src/main/java/org/apache/oozie/sla/SLACalculator.java
@@ -20,11 +20,14 @@ package org.apache.oozie.sla;
import java.util.Date;
import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.client.event.JobEvent.EventStatus;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.service.ServiceException;
+import org.apache.oozie.util.Pair;
public interface SLACalculator {
@@ -51,4 +54,55 @@ public interface SLACalculator {
SLACalcStatus get(String jobId) throws JPAExecutorException;
+ /**
+ * Enable SLA alerts for the given jobs.
+ *
+ * @param jobId the job ids
+ * @return true, if successful
+ * @throws JPAExecutorException the JPA executor exception
+ * @throws ServiceException the service exception
+ */
+ boolean enableAlert(List<String> jobId) throws JPAExecutorException, ServiceException;
+
+ /**
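+ * Enable SLA alerts for the child jobs of the given parent jobs.
+ * @param parentJobIds the parent job ids
+ * @return true, if successful
+ * @throws JPAExecutorException the JPA executor exception
+ * @throws ServiceException the service exception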
+ */
+ boolean enableChildJobAlert(List<String> parentJobIds) throws JPAExecutorException, ServiceException;
+
+ /**
+ * Disable SLA alerts for the given jobs.
+ *
+ * @param jobId the job ids
+ * @return true, if successful
+ * @throws JPAExecutorException the JPA executor exception
+ * @throws ServiceException the service exception
+ */
+ boolean disableAlert(List<String> jobId) throws JPAExecutorException, ServiceException;
+
+
+ /**
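+ * Disable SLA alerts for the child jobs of the given parent jobs.
+ * @param parentJobIds the parent job ids
+ * @return true, if successful
+ * @throws JPAExecutorException the JPA executor exception
+ * @throws ServiceException the service exception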
+ */
+ boolean disableChildJobAlert(List<String> parentJobIds) throws JPAExecutorException, ServiceException;
+
+ /**
+ * Change the SLA definitions of jobs.
+ * It takes a list of pairs, each holding a job id and the key/value pairs of its EL-evaluated SLA definition.
+ * Supported definitions are sla-should-start, sla-should-end, sla-nominal-time and sla-max-duration.
+ *
+ * @param jobIdsSLAPair the job ids sla pair
+ * @return true, if successful
+ * @throws JPAExecutorException the JPA executor exception
+ * @throws ServiceException the service exception
+ */
+ public boolean changeDefinition(List<Pair<String, Map<String,String>>> jobIdsSLAPair ) throws JPAExecutorException,
+ ServiceException;
}