You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by pu...@apache.org on 2015/02/21 00:01:50 UTC
[4/4] oozie git commit: OOZIE-1913 Devise a way to turn off SLA
alerts for bundle/coordinator flexibly
OOZIE-1913 Devise a way to turn off SLA alerts for bundle/coordinator flexibly
Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/0f4b0181
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/0f4b0181
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/0f4b0181
Branch: refs/heads/master
Commit: 0f4b0181bc8bdac4696bce2bde854b332bb02d80
Parents: 5228eb8
Author: Purshotam Shah <pu...@yahoo-inc.com>
Authored: Fri Feb 20 15:01:19 2015 -0800
Committer: Purshotam Shah <pu...@yahoo-inc.com>
Committed: Fri Feb 20 15:01:19 2015 -0800
----------------------------------------------------------------------
.../java/org/apache/oozie/cli/OozieCLI.java | 69 ++++-
.../org/apache/oozie/client/OozieClient.java | 185 ++++++++++--
.../org/apache/oozie/client/event/SLAEvent.java | 2 +-
.../org/apache/oozie/client/rest/JsonTags.java | 2 +
.../apache/oozie/client/rest/RestConstants.java | 20 ++
.../main/java/org/apache/oozie/BaseEngine.java | 40 +++
.../java/org/apache/oozie/BundleEngine.java | 42 ++-
.../org/apache/oozie/CoordinatorActionBean.java | 49 +--
.../org/apache/oozie/CoordinatorEngine.java | 47 +++
.../org/apache/oozie/CoordinatorJobBean.java | 56 ++--
.../main/java/org/apache/oozie/DagEngine.java | 16 +
.../main/java/org/apache/oozie/ErrorCode.java | 2 +
.../apache/oozie/command/SLAAlertsXCommand.java | 117 +++++++
.../bundle/BundleSLAAlertsDisableXCommand.java | 44 +++
.../bundle/BundleSLAAlertsEnableXCommand.java | 45 +++
.../command/bundle/BundleSLAAlertsXCommand.java | 149 +++++++++
.../command/bundle/BundleSLAChangeXCommand.java | 57 ++++
.../bundle/BundleStatusTransitXCommand.java | 1 +
.../CoordMaterializeTransitionXCommand.java | 19 +-
.../coord/CoordSLAAlertsDisableXCommand.java | 71 +++++
.../coord/CoordSLAAlertsEnableXCommand.java | 65 ++++
.../command/coord/CoordSLAAlertsXCommand.java | 233 ++++++++++++++
.../command/coord/CoordSLAChangeXCommand.java | 100 ++++++
.../java/org/apache/oozie/coord/CoordUtils.java | 146 ++++++++-
.../executor/jpa/CoordActionQueryExecutor.java | 48 ++-
...dJobGetActionIdsForDateRangeJPAExecutor.java | 69 -----
...dJobGetActionsByDatesForKillJPAExecutor.java | 108 -------
.../CoordJobGetActionsForDatesJPAExecutor.java | 70 -----
.../executor/jpa/CoordJobQueryExecutor.java | 51 +++-
.../CoordJobsToBeMaterializedJPAExecutor.java | 2 +-
.../jpa/SLARegistrationQueryExecutor.java | 62 +++-
.../executor/jpa/SLASummaryQueryExecutor.java | 29 +-
.../service/CoordMaterializeTriggerService.java | 2 +-
.../oozie/service/EventHandlerService.java | 24 +-
.../apache/oozie/servlet/BaseJobServlet.java | 55 ++++
.../org/apache/oozie/servlet/SLAServlet.java | 1 +
.../org/apache/oozie/servlet/V0JobServlet.java | 18 +-
.../org/apache/oozie/servlet/V1JobServlet.java | 16 +
.../org/apache/oozie/servlet/V2JobServlet.java | 74 ++++-
.../org/apache/oozie/servlet/V2SLAServlet.java | 21 +-
.../org/apache/oozie/sla/SLACalcStatus.java | 12 +-
.../org/apache/oozie/sla/SLACalculator.java | 54 ++++
.../apache/oozie/sla/SLACalculatorMemory.java | 302 ++++++++++++++++---
.../org/apache/oozie/sla/SLAOperations.java | 143 +++++----
.../apache/oozie/sla/SLARegistrationBean.java | 28 +-
.../org/apache/oozie/sla/SLASummaryBean.java | 33 +-
.../apache/oozie/sla/service/SLAService.java | 94 +++++-
.../oozie/util/CoordActionsInDateRange.java | 23 +-
core/src/main/resources/oozie-default.xml | 9 +
.../oozie/command/TestSLAAlertXCommand.java | 300 ++++++++++++++++++
.../command/coord/TestCoordSubmitXCommand.java | 178 +++++++++++
.../jpa/TestCoordActionQueryExecutor.java | 111 +++++++
...CoordJobGetActionIdsForDatesJPAExecutor.java | 82 -----
...stCoordJobGetActionsForDatesJPAExecutor.java | 83 -----
.../apache/oozie/service/TestHASLAService.java | 71 +++++
.../apache/oozie/servlet/TestV2SLAServlet.java | 2 -
.../oozie/sla/TestSLACalculatorMemory.java | 125 ++++++--
.../oozie/sla/TestSLAEventGeneration.java | 4 +
.../sla/TestSLARegistrationGetJPAExecutor.java | 20 +-
core/src/test/resources/coord-action-sla.xml | 2 +-
docs/src/site/twiki/DG_CommandLineTool.twiki | 22 +-
docs/src/site/twiki/DG_SLAMonitoring.twiki | 46 +++
docs/src/site/twiki/WebServicesAPI.twiki | 42 +++
release-log.txt | 1 +
.../webapp/console/sla/js/oozie-sla-table.js | 1 +
.../src/main/webapp/console/sla/oozie-sla.html | 1 +
66 files changed, 3327 insertions(+), 689 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/client/src/main/java/org/apache/oozie/cli/OozieCLI.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/oozie/cli/OozieCLI.java b/client/src/main/java/org/apache/oozie/cli/OozieCLI.java
index 6690869..218edf2 100644
--- a/client/src/main/java/org/apache/oozie/cli/OozieCLI.java
+++ b/client/src/main/java/org/apache/oozie/cli/OozieCLI.java
@@ -135,17 +135,21 @@ public class OozieCLI {
public static final String LOCAL_TIME_OPTION = "localtime";
public static final String TIME_ZONE_OPTION = "timezone";
public static final String QUEUE_DUMP_OPTION = "queuedump";
- public static final String RERUN_COORD_OPTION = "coordinator";
public static final String DATE_OPTION = "date";
public static final String RERUN_REFRESH_OPTION = "refresh";
public static final String RERUN_NOCLEANUP_OPTION = "nocleanup";
public static final String RERUN_FAILED_OPTION = "failed";
public static final String ORDER_OPTION = "order";
+ public static final String COORD_OPTION = "coordinator";
public static final String UPDATE_SHARELIB_OPTION = "sharelibupdate";
public static final String LIST_SHARELIB_LIB_OPTION = "shareliblist";
+ public static final String SLA_DISABLE_ALERT = "sla_disable";
+ public static final String SLA_ENABLE_ALERT = "sla_enable";
+ public static final String SLA_CHANGE = "sla_change";
+
public static final String AUTH_OPTION = "auth";
@@ -328,7 +332,7 @@ public class OozieCLI {
+ "(requires -log)");
Option date = new Option(DATE_OPTION, true,
"coordinator/bundle rerun on action dates (requires -rerun); coordinator log retrieval on action dates (requires -log)");
- Option rerun_coord = new Option(RERUN_COORD_OPTION, true, "bundle rerun on coordinator names (requires -rerun)");
+ Option rerun_coord = new Option(COORD_OPTION, true, "bundle rerun on coordinator names (requires -rerun)");
Option rerun_refresh = new Option(RERUN_REFRESH_OPTION, false,
"re-materialize the coordinator rerun actions (requires -rerun)");
Option rerun_nocleanup = new Option(RERUN_NOCLEANUP_OPTION, false,
@@ -348,6 +352,14 @@ public class OozieCLI {
Option interval = new Option(INTERVAL_OPTION, true, "polling interval in minutes (default is 5, requires -poll)");
interval.setType(Integer.class);
+ Option slaDisableAlert = new Option(SLA_DISABLE_ALERT, true,
+ "disables sla alerts for the job and its children");
+ Option slaEnableAlert = new Option(SLA_ENABLE_ALERT, true,
+ "enables sla alerts for the job and its children");
+ Option slaChange = new Option(SLA_CHANGE, true,
+ "Update sla param for jobs, supported param are should-start, should-end, nominal-time and max-duration");
+
+
Option doAs = new Option(DO_AS_OPTION, true, "doAs user, impersonates as the specified user");
OptionGroup actions = new OptionGroup();
@@ -368,6 +380,10 @@ public class OozieCLI {
actions.addOption(config_content);
actions.addOption(ignore);
actions.addOption(poll);
+ actions.addOption(slaDisableAlert);
+ actions.addOption(slaEnableAlert);
+ actions.addOption(slaChange);
+
actions.setRequired(true);
Options jobOptions = new Options();
jobOptions.addOption(oozie);
@@ -401,6 +417,7 @@ public class OozieCLI {
OptionGroup updateOption = new OptionGroup();
updateOption.addOption(dryrun);
jobOptions.addOptionGroup(updateOption);
+
return jobOptions;
}
@@ -1014,8 +1031,8 @@ public class OozieCLI {
dateScope = commandLine.getOptionValue(DATE_OPTION);
}
- if (options.contains(RERUN_COORD_OPTION)) {
- coordScope = commandLine.getOptionValue(RERUN_COORD_OPTION);
+ if (options.contains(COORD_OPTION)) {
+ coordScope = commandLine.getOptionValue(COORD_OPTION);
}
if (options.contains(RERUN_REFRESH_OPTION)) {
@@ -1234,6 +1251,15 @@ public class OozieCLI {
boolean verbose = commandLine.hasOption(VERBOSE_OPTION);
wc.pollJob(jobId, timeout, interval, verbose);
}
+ else if (options.contains(SLA_ENABLE_ALERT)) {
+ slaAlertCommand(commandLine.getOptionValue(SLA_ENABLE_ALERT), wc, commandLine, options);
+ }
+ else if (options.contains(SLA_DISABLE_ALERT)) {
+ slaAlertCommand(commandLine.getOptionValue(SLA_DISABLE_ALERT), wc, commandLine, options);
+ }
+ else if (options.contains(SLA_CHANGE)) {
+ slaAlertCommand(commandLine.getOptionValue(SLA_CHANGE), wc, commandLine, options);
+ }
}
catch (OozieClientException ex) {
throw new OozieCLIException(ex.toString(), ex);
@@ -1902,8 +1928,8 @@ public class OozieCLI {
"ssh-action-0.2.xsd")));
sources.add(new StreamSource(Thread.currentThread().getContextClassLoader().getResourceAsStream(
"hive2-action-0.1.xsd")));
- sources.add(new StreamSource(Thread.currentThread().getContextClassLoader().getResourceAsStream(
- "spark-action-0.1.xsd")));
+ sources.add(new StreamSource(Thread.currentThread().getContextClassLoader()
+ .getResourceAsStream("spark-action-0.1.xsd")));
SchemaFactory factory = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI);
Schema schema = factory.newSchema(sources.toArray(new StreamSource[sources.size()]));
Validator validator = schema.newValidator();
@@ -2059,4 +2085,35 @@ public class OozieCLI {
return allDeps.toString();
}
+ private void slaAlertCommand(String jobIds, OozieClient wc, CommandLine commandLine, List<String> options)
+ throws OozieCLIException, OozieClientException {
+ String actions = null, coordinators = null, dates = null;
+
+ if (options.contains(ACTION_OPTION)) {
+ actions = commandLine.getOptionValue(ACTION_OPTION);
+ }
+
+ if (options.contains(DATE_OPTION)) {
+ dates = commandLine.getOptionValue(DATE_OPTION);
+ }
+
+ if (options.contains(COORD_OPTION)) {
+ coordinators = commandLine.getOptionValue(COORD_OPTION);
+ if (coordinators == null) {
+ throw new OozieCLIException("No value specified for -coordinator option");
+ }
+ }
+
+ if (options.contains(SLA_ENABLE_ALERT)) {
+ wc.slaEnableAlert(jobIds, actions, dates, coordinators);
+ }
+ else if (options.contains(SLA_DISABLE_ALERT)) {
+ wc.slaDisableAlert(jobIds, actions, dates, coordinators);
+ }
+ else if (options.contains(SLA_CHANGE)) {
+ String newSlaParams = commandLine.getOptionValue(CHANGE_VALUE_OPTION);
+ wc.slaChange(jobIds, actions, dates, coordinators, newSlaParams);
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/client/src/main/java/org/apache/oozie/client/OozieClient.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/oozie/client/OozieClient.java b/client/src/main/java/org/apache/oozie/client/OozieClient.java
index e4c93cd..5de25cc 100644
--- a/client/src/main/java/org/apache/oozie/client/OozieClient.java
+++ b/client/src/main/java/org/apache/oozie/client/OozieClient.java
@@ -52,6 +52,7 @@ import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.Callable;
@@ -101,10 +102,10 @@ public class OozieClient {
public static final String EXTERNAL_ID = "oozie.wf.external.id";
- public static final String WORKFLOW_NOTIFICATION_URL = "oozie.wf.workflow.notification.url";
-
public static final String WORKFLOW_NOTIFICATION_PROXY = "oozie.wf.workflow.notification.proxy";
+ public static final String WORKFLOW_NOTIFICATION_URL = "oozie.wf.workflow.notification.url";
+
public static final String ACTION_NOTIFICATION_URL = "oozie.wf.action.notification.url";
public static final String COORD_ACTION_NOTIFICATION_URL = "oozie.coord.action.notification.url";
@@ -155,6 +156,14 @@ public class OozieClient {
public static final String FILTER_CREATED_TIME_END = "endcreatedtime";
+ public static final String SLA_DISABLE_ALERT = "oozie.sla.disable.alerts";
+
+ public static final String SLA_ENABLE_ALERT = "oozie.sla.enable.alerts";
+
+ public static final String SLA_DISABLE_ALERT_OLDER_THAN = SLA_DISABLE_ALERT + ".older.than";
+
+ public static final String SLA_DISABLE_ALERT_COORD = SLA_DISABLE_ALERT + ".coord";
+
public static final String CHANGE_VALUE_ENDTIME = "endtime";
public static final String CHANGE_VALUE_PAUSETIME = "pausetime";
@@ -1626,33 +1635,137 @@ public class OozieClient {
}
/**
- * Print sla info about coordinator and workflow jobs and actions.
+ * Sla enable alert.
*
- * @param start starting offset
- * @param len number of results
- * @throws OozieClientException
+ * @param jobIds the job ids
+ * @param actions comma separated list of action ids or action id ranges
+ * @param dates comma separated list of the nominal times
+ * @throws OozieClientException the oozie client exception
*/
- public void getSlaInfo(int start, int len, String filter) throws OozieClientException {
- new SlaInfo(start, len, filter).call();
+ public void slaEnableAlert(String jobIds, String actions, String dates) throws OozieClientException {
+ new UpdateSLA(RestConstants.SLA_ENABLE_ALERT, jobIds, actions, dates, null).call();
}
- private class SlaInfo extends ClientCallable<Void> {
+ /**
+ * Sla enable alert for bundle with coord name/id.
+ *
+ * @param bundleId the bundle id
+ * @param actions comma separated list of action ids or action id ranges
+ * @param dates comma separated list of the nominal times
+ * @param coords the coordinators
+ * @throws OozieClientException the oozie client exception
+ */
+ public void slaEnableAlert(String bundleId, String actions, String dates, String coords)
+ throws OozieClientException {
+ new UpdateSLA(RestConstants.SLA_ENABLE_ALERT, bundleId, actions, dates, coords).call();
+ }
- SlaInfo(int start, int len, String filter) {
- super("GET", WS_PROTOCOL_VERSION_1, RestConstants.SLA, "", prepareParams(RestConstants.SLA_GT_SEQUENCE_ID,
- Integer.toString(start), RestConstants.MAX_EVENTS, Integer.toString(len),
- RestConstants.JOBS_FILTER_PARAM, filter));
+ /**
+ * Sla disable alert.
+ *
+ * @param jobIds the job ids
+ * @param actions comma separated list of action ids or action id ranges
+ * @param dates comma separated list of the nominal times
+ * @throws OozieClientException the oozie client exception
+ */
+ public void slaDisableAlert(String jobIds, String actions, String dates) throws OozieClientException {
+ new UpdateSLA(RestConstants.SLA_DISABLE_ALERT, jobIds, actions, dates, null).call();
+ }
+
+ /**
+ * Sla disable alert for bundle with coord name/id.
+ *
+ * @param bundleId the bundle id
+ * @param actions comma separated list of action ids or action id ranges
+ * @param dates comma separated list of the nominal times
+ * @param coords the coordinators
+ * @throws OozieClientException the oozie client exception
+ */
+ public void slaDisableAlert(String bundleId, String actions, String dates, String coords)
+ throws OozieClientException {
+ new UpdateSLA(RestConstants.SLA_DISABLE_ALERT, bundleId, actions, dates, coords).call();
+ }
+
+ /**
+ * Sla change definitions.
+ * SLA change definition parameters can be [<key>=<value>,...<key>=<value>]
+ * Supported parameter key names are should-start, should-end and max-duration
+ * @param jobIds the job ids
+ * @param actions comma separated list of action ids or action id ranges.
+ * @param dates comma separated list of the nominal times
+ * @param newSlaParams the new sla params
+ * @throws OozieClientException the oozie client exception
+ */
+ public void slaChange(String jobIds, String actions, String dates, String newSlaParams) throws OozieClientException {
+ new UpdateSLA(RestConstants.SLA_CHANGE, jobIds, actions, dates, null, newSlaParams).call();
+ }
+
+ /**
+ * Sla change definition for bundle with coord name/id.
+ * SLA change definition parameters can be [<key>=<value>,...<key>=<value>]
+ * Supported parameter key names are should-start, should-end and max-duration
+ * @param bundleId the bundle id
+ * @param actions comma separated list of action ids or action id ranges
+ * @param dates comma separated list of the nominal times
+ * @param coords the coords
+ * @param newSlaParams the new sla params
+ * @throws OozieClientException the oozie client exception
+ */
+ public void slaChange(String bundleId, String actions, String dates, String coords, String newSlaParams)
+ throws OozieClientException {
+ new UpdateSLA(RestConstants.SLA_CHANGE, bundleId, actions, dates, coords, newSlaParams).call();
+ }
+
+ /**
+ * Sla change with new sla params given as a hash map.
+ * Supported parameter key names are should-start, should-end and max-duration
+ * @param bundleId the bundle id
+ * @param actions comma separated list of action ids or action id ranges
+ * @param dates comma separated list of the nominal times
+ * @param coords the coords
+ * @param newSlaParams the new sla params
+ * @throws OozieClientException the oozie client exception
+ */
+ public void slaChange(String bundleId, String actions, String dates, String coords, Map<String, String> newSlaParams)
+ throws OozieClientException {
+ new UpdateSLA(RestConstants.SLA_CHANGE, bundleId, actions, dates, coords, mapToString(newSlaParams)).call();
+ }
+
+ /**
+ * Convert Map to string.
+ *
+ * @param map the map
+ * @return the string
+ */
+ private String mapToString(Map<String, String> map) {
+ StringBuilder sb = new StringBuilder();
+ Iterator<Entry<String, String>> it = map.entrySet().iterator();
+ while (it.hasNext()) {
+ Entry<String, String> e = (Entry<String, String>) it.next();
+ sb.append(e.getKey()).append("=").append(e.getValue()).append(";");
+ }
+ return sb.toString();
+ }
+
+ private class UpdateSLA extends ClientCallable<Void> {
+
+ UpdateSLA(String action, String jobIds, String coordActions, String dates, String coords) {
+ super("PUT", RestConstants.JOB, notEmpty(jobIds, "jobIds"), prepareParams(RestConstants.ACTION_PARAM,
+ action, RestConstants.JOB_COORD_SCOPE_ACTION_LIST, coordActions, RestConstants.JOB_COORD_SCOPE_DATE,
+ dates, RestConstants.COORDINATORS_PARAM, coords));
+ }
+
+ UpdateSLA(String action, String jobIds, String coordActions, String dates, String coords, String newSlaParams) {
+ super("PUT", RestConstants.JOB, notEmpty(jobIds, "jobIds"), prepareParams(RestConstants.ACTION_PARAM,
+ action, RestConstants.JOB_COORD_SCOPE_ACTION_LIST, coordActions, RestConstants.JOB_COORD_SCOPE_DATE,
+ dates, RestConstants.COORDINATORS_PARAM, coords, RestConstants.JOB_CHANGE_VALUE, newSlaParams));
}
@Override
protected Void call(HttpURLConnection conn) throws IOException, OozieClientException {
conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE);
if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
- BufferedReader br = new BufferedReader(new InputStreamReader(conn.getInputStream()));
- String line = null;
- while ((line = br.readLine()) != null) {
- System.out.println(line);
- }
+ System.out.println("Done");
}
else {
handleError(conn);
@@ -1661,6 +1774,42 @@ public class OozieClient {
}
}
+ /**
+ * Print sla info about coordinator and workflow jobs and actions.
+ *
+ * @param start starting offset
+ * @param len number of results
+ * @throws OozieClientException
+ */
+ public void getSlaInfo(int start, int len, String filter) throws OozieClientException {
+ new SlaInfo(start, len, filter).call();
+ }
+
+ private class SlaInfo extends ClientCallable<Void> {
+
+ SlaInfo(int start, int len, String filter) {
+ super("GET", WS_PROTOCOL_VERSION_1, RestConstants.SLA, "", prepareParams(RestConstants.SLA_GT_SEQUENCE_ID,
+ Integer.toString(start), RestConstants.MAX_EVENTS, Integer.toString(len),
+ RestConstants.JOBS_FILTER_PARAM, filter));
+ }
+
+ @Override
+ protected Void call(HttpURLConnection conn) throws IOException, OozieClientException {
+ conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE);
+ if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
+ BufferedReader br = new BufferedReader(new InputStreamReader(conn.getInputStream()));
+ String line = null;
+ while ((line = br.readLine()) != null) {
+ System.out.println(line);
+ }
+ }
+ else {
+ handleError(conn);
+ }
+ return null;
+ }
+ }
+
private class JobIdAction extends ClientCallable<String> {
JobIdAction(String externalId) {
http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/client/src/main/java/org/apache/oozie/client/event/SLAEvent.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/oozie/client/event/SLAEvent.java b/client/src/main/java/org/apache/oozie/client/event/SLAEvent.java
index 27a0e1f..19d732f 100644
--- a/client/src/main/java/org/apache/oozie/client/event/SLAEvent.java
+++ b/client/src/main/java/org/apache/oozie/client/event/SLAEvent.java
@@ -157,7 +157,7 @@ public abstract class SLAEvent extends Event {
*
* @return String slaConfig
*/
- public abstract String getSlaConfig();
+ public abstract String getSLAConfig();
/**
* Get the actual start time of job for SLA
http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/client/src/main/java/org/apache/oozie/client/rest/JsonTags.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/oozie/client/rest/JsonTags.java b/client/src/main/java/org/apache/oozie/client/rest/JsonTags.java
index b7cf0e7..1022dd7 100644
--- a/client/src/main/java/org/apache/oozie/client/rest/JsonTags.java
+++ b/client/src/main/java/org/apache/oozie/client/rest/JsonTags.java
@@ -172,6 +172,8 @@ public interface JsonTags {
public static final String SLA_SUMMARY_JOB_STATUS = "jobStatus";
public static final String SLA_SUMMARY_SLA_STATUS = "slaStatus";
public static final String SLA_SUMMARY_LAST_MODIFIED = "lastModified";
+ public static final String SLA_ALERT_STATUS = "slaAlertStatus";
+
public static final String TO_STRING = "toString";
http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/client/src/main/java/org/apache/oozie/client/rest/RestConstants.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/oozie/client/rest/RestConstants.java b/client/src/main/java/org/apache/oozie/client/rest/RestConstants.java
index 3c2afc3..4c75d2a 100644
--- a/client/src/main/java/org/apache/oozie/client/rest/RestConstants.java
+++ b/client/src/main/java/org/apache/oozie/client/rest/RestConstants.java
@@ -186,4 +186,24 @@ public interface RestConstants {
public static final String LOG_FILTER_OPTION = "logfilter";
public static final String JOB_COORD_RERUN_FAILED_PARAM = "failed";
+
+ public static final String SLA_DISABLE_ALERT = "sla-disable";
+
+ public static final String SLA_ENABLE_ALERT = "sla-enable";
+
+ public static final String SLA_CHANGE = "sla-change";
+
+ public static final String SLA_ALERT_RANGE = "sla-alert-range";
+
+ public static final String COORDINATORS_PARAM = "coordinators";
+
+ public static final String SLA_NOMINAL_TIME = "sla-nominal-time";
+
+ public static final String SLA_SHOULD_START = "sla-should-start";
+
+ public static final String SLA_SHOULD_END = "sla-should-end";
+
+ public static final String SLA_MAX_DURATION = "sla-max-duration";
+
+ public static final String JOB_COORD_SCOPE_ACTION_LIST = "action-list";
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/core/src/main/java/org/apache/oozie/BaseEngine.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/BaseEngine.java b/core/src/main/java/org/apache/oozie/BaseEngine.java
index bf38a0c..44074ea 100644
--- a/core/src/main/java/org/apache/oozie/BaseEngine.java
+++ b/core/src/main/java/org/apache/oozie/BaseEngine.java
@@ -239,4 +239,44 @@ public abstract class BaseEngine {
* @throws BaseEngineException thrown if the job's status could not be obtained
*/
public abstract String getJobStatus(String jobId) throws BaseEngineException;
+
+ /**
+ * Return the status for a Job ID
+ *
+ * @param jobId job Id.
+ * @return the job's status
+ * @throws BaseEngineException thrown if the job's status could not be obtained
+ */
+
+ /**
+ * Enable SLA alert for job
+ * @param id
+ * @param actions
+ * @param dates
+ * @param childIds
+ * @throws BaseEngineException
+ */
+ public abstract void enableSLAAlert(String id, String actions, String dates, String childIds) throws BaseEngineException;
+
+ /**
+ * Disable SLA alert for job
+ * @param id
+ * @param actions
+ * @param dates
+ * @param childIds
+ * @throws BaseEngineException
+ */
+ public abstract void disableSLAAlert(String id, String actions, String dates, String childIds) throws BaseEngineException;
+
+ /**
+ * Change SLA properties for job
+ * @param id
+ * @param actions
+ * @param childIds
+ * @param newParams
+ * @throws BaseEngineException
+ */
+ public abstract void changeSLA(String id, String actions, String dates, String childIds, String newParams)
+ throws BaseEngineException;
+
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/core/src/main/java/org/apache/oozie/BundleEngine.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/BundleEngine.java b/core/src/main/java/org/apache/oozie/BundleEngine.java
index 9818acc..659c8e6 100644
--- a/core/src/main/java/org/apache/oozie/BundleEngine.java
+++ b/core/src/main/java/org/apache/oozie/BundleEngine.java
@@ -30,7 +30,6 @@ import java.util.Map;
import java.util.Set;
import java.util.StringTokenizer;
-import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.client.CoordinatorAction;
import org.apache.oozie.client.CoordinatorJob;
@@ -40,6 +39,9 @@ import org.apache.oozie.client.WorkflowJob;
import org.apache.oozie.client.rest.BulkResponseImpl;
import org.apache.oozie.command.BulkJobsXCommand;
import org.apache.oozie.command.CommandException;
+import org.apache.oozie.command.bundle.BundleSLAAlertsDisableXCommand;
+import org.apache.oozie.command.bundle.BundleSLAAlertsEnableXCommand;
+import org.apache.oozie.command.bundle.BundleSLAChangeXCommand;
import org.apache.oozie.command.bundle.BundleJobChangeXCommand;
import org.apache.oozie.command.bundle.BundleJobResumeXCommand;
import org.apache.oozie.command.bundle.BundleJobSuspendXCommand;
@@ -55,6 +57,7 @@ import org.apache.oozie.service.DagXLogInfoService;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.XLogStreamingService;
import org.apache.oozie.util.DateUtils;
+import org.apache.oozie.util.JobUtils;
import org.apache.oozie.util.XLogFilter;
import org.apache.oozie.util.XLogUserFilterParam;
import org.apache.oozie.util.ParamChecker;
@@ -506,4 +509,41 @@ public class BundleEngine extends BaseEngine {
throw new BundleEngineException(e);
}
}
+
+ @Override
+ public void enableSLAAlert(String id, String actions, String dates, String childIds) throws BaseEngineException {
+ try {
+ new BundleSLAAlertsEnableXCommand(id, actions, dates, childIds).call();
+ }
+ catch (CommandException e) {
+ throw new BundleEngineException(e);
+ }
+ }
+
+ @Override
+ public void disableSLAAlert(String id, String actions, String dates, String childIds) throws BaseEngineException {
+ try {
+ new BundleSLAAlertsDisableXCommand(id, actions, dates, childIds).call();
+ }
+ catch (CommandException e) {
+ throw new BundleEngineException(e);
+ }
+ }
+
+ @Override
+ public void changeSLA(String id, String actions, String dates, String childIds, String newParams)
+ throws BaseEngineException {
+ Map<String, String> slaNewParams = null;
+ try {
+
+ if (newParams != null) {
+ slaNewParams = JobUtils.parseChangeValue(newParams);
+ }
+ new BundleSLAChangeXCommand(id, actions, dates, childIds, slaNewParams).call();
+ }
+ catch (CommandException e) {
+ throw new BundleEngineException(e);
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java b/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java
index bd01d14..85b7ed4 100644
--- a/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java
+++ b/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java
@@ -18,6 +18,23 @@
package org.apache.oozie;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.sql.Timestamp;
+import java.text.MessageFormat;
+import java.util.Date;
+import java.util.List;
+
+import javax.persistence.Basic;
+import javax.persistence.Column;
+import javax.persistence.Entity;
+import javax.persistence.Id;
+import javax.persistence.Lob;
+import javax.persistence.NamedQueries;
+import javax.persistence.NamedQuery;
+import javax.persistence.Table;
+
import org.apache.hadoop.io.Writable;
import org.apache.oozie.client.CoordinatorAction;
import org.apache.oozie.client.rest.JsonBean;
@@ -30,25 +47,6 @@ import org.apache.openjpa.persistence.jdbc.Strategy;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
-import javax.persistence.Basic;
-import javax.persistence.Column;
-import javax.persistence.ColumnResult;
-import javax.persistence.Entity;
-import javax.persistence.Id;
-import javax.persistence.Lob;
-import javax.persistence.NamedNativeQueries;
-import javax.persistence.NamedNativeQuery;
-import javax.persistence.NamedQueries;
-import javax.persistence.NamedQuery;
-import javax.persistence.SqlResultSetMapping;
-import javax.persistence.Table;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.sql.Timestamp;
-import java.text.MessageFormat;
-import java.util.Date;
-import java.util.List;
@Entity
@NamedQueries({
@@ -149,13 +147,13 @@ import java.util.List;
@NamedQuery(name = "GET_COORD_ACTIONS_FOR_RECOVERY_OLDER_THAN", query = "select a.id, a.jobId, a.statusStr, a.externalId, a.pending from CoordinatorActionBean a where a.pending > 0 AND (a.statusStr = 'SUSPENDED' OR a.statusStr = 'KILLED' OR a.statusStr = 'RUNNING') AND a.lastModifiedTimestamp <= :lastModifiedTime"),
// Select query used by rerun, requires almost all columns so select * is used
- @NamedQuery(name = "GET_ACTIONS_FOR_DATES", query = "select OBJECT(a) from CoordinatorActionBean a where a.jobId = :jobId AND (a.statusStr = 'TIMEDOUT' OR a.statusStr = 'SUCCEEDED' OR a.statusStr = 'KILLED' OR a.statusStr = 'FAILED' OR a.statusStr = 'IGNORED') AND a.nominalTimestamp >= :startTime AND a.nominalTimestamp <= :endTime"),
+ @NamedQuery(name = "GET_TERMINATED_ACTIONS_FOR_DATES", query = "select OBJECT(a) from CoordinatorActionBean a where a.jobId = :jobId AND (a.statusStr = 'TIMEDOUT' OR a.statusStr = 'SUCCEEDED' OR a.statusStr = 'KILLED' OR a.statusStr = 'FAILED' OR a.statusStr = 'IGNORED') AND a.nominalTimestamp >= :startTime AND a.nominalTimestamp <= :endTime"),
// Select query used by log
- @NamedQuery(name = "GET_ACTION_IDS_FOR_DATES", query = "select a.id from CoordinatorActionBean a where a.jobId = :jobId AND (a.statusStr = 'TIMEDOUT' OR a.statusStr = 'SUCCEEDED' OR a.statusStr = 'KILLED' OR a.statusStr = 'FAILED') AND a.nominalTimestamp >= :startTime AND a.nominalTimestamp <= :endTime"),
+ @NamedQuery(name = "GET_TERMINATED_ACTION_IDS_FOR_DATES", query = "select a.id from CoordinatorActionBean a where a.jobId = :jobId AND (a.statusStr = 'TIMEDOUT' OR a.statusStr = 'SUCCEEDED' OR a.statusStr = 'KILLED' OR a.statusStr = 'FAILED') AND a.nominalTimestamp >= :startTime AND a.nominalTimestamp <= :endTime"),
// Select query used by rerun, requires almost all columns so select * is used
@NamedQuery(name = "GET_ACTION_FOR_NOMINALTIME", query = "select OBJECT(a) from CoordinatorActionBean a where a.jobId = :jobId AND a.nominalTimestamp = :nominalTime"),
- @NamedQuery(name = "GET_ACTIONS_BY_DATES_FOR_KILL", query = "select a.id, a.jobId, a.statusStr, a.externalId, a.pending, a.nominalTimestamp, a.createdTimestamp from CoordinatorActionBean a where a.jobId = :jobId AND (a.statusStr <> 'FAILED' AND a.statusStr <> 'KILLED' AND a.statusStr <> 'SUCCEEDED' AND a.statusStr <> 'TIMEDOUT') AND a.nominalTimestamp >= :startTime AND a.nominalTimestamp <= :endTime"),
+ @NamedQuery(name = "GET_ACTIVE_ACTIONS_FOR_DATES", query = "select a.id, a.jobId, a.statusStr, a.externalId, a.pending, a.nominalTimestamp, a.createdTimestamp from CoordinatorActionBean a where a.jobId = :jobId AND (a.statusStr = 'WAITING' OR a.statusStr = 'READY' OR a.statusStr = 'SUBMITTED' OR a.statusStr = 'RUNNING' OR a.statusStr = 'SUSPENDED') AND a.nominalTimestamp >= :startTime AND a.nominalTimestamp <= :endTime"),
@NamedQuery(name = "GET_COORD_ACTIONS_COUNT", query = "select count(w) from CoordinatorActionBean w"),
@@ -163,7 +161,12 @@ import java.util.List;
@NamedQuery(name = "GET_COORD_ACTIONS_MAX_MODIFIED_DATE_FOR_RANGE", query = "select max(w.lastModifiedTimestamp) from CoordinatorActionBean w where w.jobId= :jobId and w.id >= :startAction AND w.id <= :endAction"),
- @NamedQuery(name = "GET_READY_ACTIONS_GROUP_BY_JOBID", query = "select a.jobId, min(a.lastModifiedTimestamp) from CoordinatorActionBean a where a.statusStr = 'READY' group by a.jobId having min(a.lastModifiedTimestamp) < :lastModifiedTime")})
+ @NamedQuery(name = "GET_READY_ACTIONS_GROUP_BY_JOBID", query = "select a.jobId, min(a.lastModifiedTimestamp) from CoordinatorActionBean a where a.statusStr = 'READY' group by a.jobId having min(a.lastModifiedTimestamp) < :lastModifiedTime"),
+
+ @NamedQuery(name = "GET_ACTIVE_ACTIONS_IDS_FOR_SLA_CHANGE", query = "select a.id, a.nominalTimestamp, a.createdTimestamp, a.actionXml from CoordinatorActionBean a where a.id in (:ids) and (a.statusStr <> 'FAILED' AND a.statusStr <> 'KILLED' AND a.statusStr <> 'SUCCEEDED' AND a.statusStr <> 'TIMEDOUT' AND a.statusStr <> 'IGNORED')"),
+
+ @NamedQuery(name = "GET_ACTIVE_ACTIONS_JOBID_FOR_SLA_CHANGE", query = "select a.id, a.nominalTimestamp, a.createdTimestamp, a.actionXml from CoordinatorActionBean a where a.jobId = :jobId and (a.statusStr <> 'FAILED' AND a.statusStr <> 'KILLED' AND a.statusStr <> 'SUCCEEDED' AND a.statusStr <> 'TIMEDOUT' AND a.statusStr <> 'IGNORED')")
+ })
@Table(name = "COORD_ACTIONS")
public class CoordinatorActionBean implements
http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/core/src/main/java/org/apache/oozie/CoordinatorEngine.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/CoordinatorEngine.java b/core/src/main/java/org/apache/oozie/CoordinatorEngine.java
index 136c097..642a82a 100644
--- a/core/src/main/java/org/apache/oozie/CoordinatorEngine.java
+++ b/core/src/main/java/org/apache/oozie/CoordinatorEngine.java
@@ -19,6 +19,7 @@
package org.apache.oozie;
import com.google.common.annotations.VisibleForTesting;
+
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.client.CoordinatorAction;
@@ -36,6 +37,9 @@ import org.apache.oozie.command.coord.CoordJobsXCommand;
import org.apache.oozie.command.coord.CoordKillXCommand;
import org.apache.oozie.command.coord.CoordRerunXCommand;
import org.apache.oozie.command.coord.CoordResumeXCommand;
+import org.apache.oozie.command.coord.CoordSLAAlertsDisableXCommand;
+import org.apache.oozie.command.coord.CoordSLAAlertsEnableXCommand;
+import org.apache.oozie.command.coord.CoordSLAChangeXCommand;
import org.apache.oozie.command.coord.CoordSubmitXCommand;
import org.apache.oozie.command.coord.CoordSuspendXCommand;
import org.apache.oozie.command.coord.CoordUpdateXCommand;
@@ -49,6 +53,7 @@ import org.apache.oozie.service.Services;
import org.apache.oozie.service.XLogStreamingService;
import org.apache.oozie.util.CoordActionsInDateRange;
import org.apache.oozie.util.DateUtils;
+import org.apache.oozie.util.JobUtils;
import org.apache.oozie.util.Pair;
import org.apache.oozie.util.ParamChecker;
import org.apache.oozie.util.XLog;
@@ -847,4 +852,46 @@ public class CoordinatorEngine extends BaseEngine {
throw new CoordinatorEngineException(e);
}
}
+
+    /**
+     * Disables SLA alerts for a coordinator job by calling a {@link CoordSLAAlertsDisableXCommand}.
+     *
+     * @param id the job id handed to the command
+     * @param actions the action selection handed to the command
+     * @param dates the date selection handed to the command
+     * @param childIds accepted for interface compatibility; not forwarded to the command here
+     * @throws BaseEngineException a {@link CoordinatorEngineException} wrapping the command failure
+     */
+    @Override
+    public void disableSLAAlert(String id, String actions, String dates, String childIds)
+            throws BaseEngineException {
+        try {
+            final CoordSLAAlertsDisableXCommand disableCommand = new CoordSLAAlertsDisableXCommand(id, actions, dates);
+            disableCommand.call();
+        }
+        catch (CommandException commandFailure) {
+            throw new CoordinatorEngineException(commandFailure);
+        }
+    }
+
+    /**
+     * Changes SLA parameters of a coordinator job by calling a {@link CoordSLAChangeXCommand}.
+     *
+     * @param id the job id handed to the command
+     * @param actions the action selection handed to the command
+     * @param dates the date selection handed to the command
+     * @param childIds accepted for interface compatibility; not forwarded to the command here
+     * @param newParams the new SLA values; parsed with {@code JobUtils.parseChangeValue} when non-null,
+     *        otherwise the command receives a {@code null} map
+     * @throws BaseEngineException a {@link CoordinatorEngineException} wrapping a parse or command failure
+     */
+    @Override
+    public void changeSLA(String id, String actions, String dates, String childIds, String newParams)
+            throws BaseEngineException {
+        try {
+            // A missing parameter string is passed on as a null map rather than parsed.
+            final Map<String, String> parsedParams =
+                    (newParams == null) ? null : JobUtils.parseChangeValue(newParams);
+            new CoordSLAChangeXCommand(id, actions, dates, parsedParams).call();
+        }
+        catch (CommandException commandFailure) {
+            throw new CoordinatorEngineException(commandFailure);
+        }
+    }
+
+    /**
+     * Enables SLA alerts for a coordinator job by calling a {@link CoordSLAAlertsEnableXCommand}.
+     *
+     * @param id the job id handed to the command
+     * @param actions the action selection handed to the command
+     * @param dates the date selection handed to the command
+     * @param childIds accepted for interface compatibility; not forwarded to the command here
+     * @throws BaseEngineException a {@link CoordinatorEngineException} wrapping the command failure
+     */
+    @Override
+    public void enableSLAAlert(String id, String actions, String dates, String childIds)
+            throws BaseEngineException {
+        try {
+            final CoordSLAAlertsEnableXCommand enableCommand = new CoordSLAAlertsEnableXCommand(id, actions, dates);
+            enableCommand.call();
+        }
+        catch (CommandException commandFailure) {
+            throw new CoordinatorEngineException(commandFailure);
+        }
+    }
+
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java b/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java
index 4d6b970..c3ee839 100644
--- a/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java
+++ b/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java
@@ -18,18 +18,14 @@
package org.apache.oozie;
-import org.apache.hadoop.io.Writable;
-import org.apache.oozie.client.CoordinatorAction;
-import org.apache.oozie.client.CoordinatorJob;
-import org.apache.oozie.client.rest.JsonBean;
-import org.apache.oozie.client.rest.JsonTags;
-import org.apache.oozie.client.rest.JsonUtils;
-import org.apache.oozie.util.DateUtils;
-import org.apache.oozie.util.WritableUtils;
-import org.apache.openjpa.persistence.jdbc.Index;
-import org.apache.openjpa.persistence.jdbc.Strategy;
-import org.json.simple.JSONArray;
-import org.json.simple.JSONObject;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.sql.Timestamp;
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
import javax.persistence.Basic;
import javax.persistence.Column;
@@ -42,14 +38,19 @@ import javax.persistence.NamedQueries;
import javax.persistence.NamedQuery;
import javax.persistence.Table;
import javax.persistence.Transient;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.sql.Timestamp;
-import java.text.MessageFormat;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.client.CoordinatorJob;
+import org.apache.oozie.client.rest.JsonBean;
+import org.apache.oozie.client.rest.JsonTags;
+import org.apache.oozie.client.rest.JsonUtils;
+import org.apache.oozie.util.DateUtils;
+import org.apache.oozie.util.WritableUtils;
+import org.apache.openjpa.persistence.jdbc.Index;
+import org.apache.openjpa.persistence.jdbc.Strategy;
+import org.json.simple.JSONArray;
+import org.json.simple.JSONObject;
@Entity
@NamedQueries( {
@@ -79,6 +80,10 @@ import java.util.List;
@NamedQuery(name = "UPDATE_COORD_JOB_CHANGE", query = "update CoordinatorJobBean w set w.endTimestamp = :endTime, w.statusStr = :status, w.pending = :pending, w.doneMaterialization = :doneMaterialization, w.concurrency = :concurrency, w.pauseTimestamp = :pauseTime, w.lastActionNumber = :lastActionNumber, w.lastActionTimestamp = :lastActionTime, w.nextMaterializedTimestamp = :nextMatdTime, w.lastModifiedTimestamp = :lastModifiedTime where w.id = :id"),
+ @NamedQuery(name = "UPDATE_COORD_JOB_CONF", query = "update CoordinatorJobBean w set w.conf = :conf where w.id = :id"),
+
+ @NamedQuery(name = "UPDATE_COORD_JOB_XML", query = "update CoordinatorJobBean w set w.jobXml = :jobXml where w.id = :id"),
+
@NamedQuery(name = "DELETE_COORD_JOB", query = "delete from CoordinatorJobBean w where w.id IN (:id)"),
@NamedQuery(name = "GET_COORD_JOBS", query = "select OBJECT(w) from CoordinatorJobBean w"),
@@ -108,7 +113,7 @@ import java.util.List;
//TODO need to remove.
@NamedQuery(name = "GET_COORD_JOBS_OLDER_THAN", query = "select OBJECT(w) from CoordinatorJobBean w where w.startTimestamp <= :matTime AND (w.statusStr = 'PREP' OR w.statusStr = 'RUNNING' or w.statusStr = 'RUNNINGWITHERROR') AND (w.nextMaterializedTimestamp < :matTime OR w.nextMaterializedTimestamp IS NULL) AND (w.nextMaterializedTimestamp IS NULL OR (w.endTimestamp > w.nextMaterializedTimestamp AND (w.pauseTimestamp IS NULL OR w.pauseTimestamp > w.nextMaterializedTimestamp))) order by w.lastModifiedTimestamp"),
- @NamedQuery(name = "GET_COORD_JOBS_OLDER_FOR_MATERILZATION", query = "select w.id from CoordinatorJobBean w where w.startTimestamp <= :matTime AND (w.statusStr = 'PREP' OR w.statusStr = 'RUNNING' or w.statusStr = 'RUNNINGWITHERROR') AND (w.nextMaterializedTimestamp < :matTime OR w.nextMaterializedTimestamp IS NULL) AND (w.nextMaterializedTimestamp IS NULL OR (w.endTimestamp > w.nextMaterializedTimestamp AND (w.pauseTimestamp IS NULL OR w.pauseTimestamp > w.nextMaterializedTimestamp))) and w.matThrottling > ( select count(a.jobId) from CoordinatorActionBean a where a.jobId = w.id and a.statusStr = 'WAITING') order by w.lastModifiedTimestamp"),
+ @NamedQuery(name = "GET_COORD_JOBS_OLDER_FOR_MATERIALIZATION", query = "select w.id from CoordinatorJobBean w where w.startTimestamp <= :matTime AND (w.statusStr = 'PREP' OR w.statusStr = 'RUNNING' or w.statusStr = 'RUNNINGWITHERROR') AND (w.nextMaterializedTimestamp < :matTime OR w.nextMaterializedTimestamp IS NULL) AND (w.nextMaterializedTimestamp IS NULL OR (w.endTimestamp > w.nextMaterializedTimestamp AND (w.pauseTimestamp IS NULL OR w.pauseTimestamp > w.nextMaterializedTimestamp))) and w.matThrottling > ( select count(a.jobId) from CoordinatorActionBean a where a.jobId = w.id and a.statusStr = 'WAITING') order by w.lastModifiedTimestamp"),
@NamedQuery(name = "GET_COORD_JOBS_OLDER_THAN_STATUS", query = "select OBJECT(w) from CoordinatorJobBean w where w.statusStr = :status AND w.lastModifiedTimestamp <= :lastModTime order by w.lastModifiedTimestamp"),
@@ -134,7 +139,13 @@ import java.util.List;
@NamedQuery(name = "GET_COORD_JOB_STATUS_PARENTID", query = "select w.statusStr, w.bundleId from CoordinatorJobBean w where w.id = :id"),
- @NamedQuery(name = "GET_COORD_IDS_FOR_STATUS_TRANSIT", query = "select DISTINCT w.id from CoordinatorActionBean a, CoordinatorJobBean w where w.id = a.jobId and a.lastModifiedTimestamp >= :lastModifiedTime and (w.statusStr IN ('PAUSED', 'RUNNING', 'RUNNINGWITHERROR', 'PAUSEDWITHERROR') or w.pending = 1) and w.statusStr <> 'IGNORED'")
+ @NamedQuery(name = "GET_COORD_IDS_FOR_STATUS_TRANSIT", query = "select DISTINCT w.id from CoordinatorActionBean a, CoordinatorJobBean w where w.id = a.jobId and a.lastModifiedTimestamp >= :lastModifiedTime and (w.statusStr IN ('PAUSED', 'RUNNING', 'RUNNINGWITHERROR', 'PAUSEDWITHERROR') or w.pending = 1) and w.statusStr <> 'IGNORED'"),
+
+ @NamedQuery(name = "GET_COORD_JOBS_FOR_BUNDLE_BY_APPNAME_ID", query = "select w.id from CoordinatorJobBean w where ( w.appName IN (:appName) OR w.id IN (:appName) ) AND w.bundleId = :bundleId"),
+
+ @NamedQuery(name = "GET_COORD_JOB_CONF", query = "select w.conf from CoordinatorJobBean w where w.id = :id"),
+
+ @NamedQuery(name = "GET_COORD_JOB_XML", query = "select w.jobXml from CoordinatorJobBean w where w.id = :id")
})
@NamedNativeQueries({
@@ -221,7 +232,6 @@ public class CoordinatorJobBean implements Writable, CoordinatorJob, JsonBean {
private java.sql.Timestamp startTimestamp = null;
@Basic
- @Index
@Column(name = "end_time")
private java.sql.Timestamp endTimestamp = null;
http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/core/src/main/java/org/apache/oozie/DagEngine.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/DagEngine.java b/core/src/main/java/org/apache/oozie/DagEngine.java
index 50aef2f..ac2e7b1 100644
--- a/core/src/main/java/org/apache/oozie/DagEngine.java
+++ b/core/src/main/java/org/apache/oozie/DagEngine.java
@@ -585,4 +585,20 @@ public class DagEngine extends BaseEngine {
throw new DagEngineException(ex);
}
}
+
+    /**
+     * Always fails: this engine does not implement enabling SLA alerts.
+     *
+     * @throws BaseEngineException always, wrapping an {@link XException} created with {@link ErrorCode#E0301}
+     */
+    @Override
+    public void enableSLAAlert(String id, String actions, String dates, String childIds)
+            throws BaseEngineException {
+        final XException unsupported = new XException(ErrorCode.E0301, "Not supported for workflow");
+        throw new BaseEngineException(unsupported);
+    }
+
+    /**
+     * Always fails: this engine does not implement disabling SLA alerts.
+     *
+     * @throws BaseEngineException always, wrapping an {@link XException} created with {@link ErrorCode#E0301}
+     */
+    @Override
+    public void disableSLAAlert(String id, String actions, String dates, String childIds)
+            throws BaseEngineException {
+        final XException unsupported = new XException(ErrorCode.E0301, "Not supported for workflow");
+        throw new BaseEngineException(unsupported);
+    }
+
+    /**
+     * Always fails: this engine does not implement changing SLA parameters.
+     *
+     * @throws BaseEngineException always, wrapping an {@link XException} created with {@link ErrorCode#E0301}
+     */
+    @Override
+    public void changeSLA(String id, String actions, String dates, String childIds, String newParams)
+            throws BaseEngineException {
+        final XException unsupported = new XException(ErrorCode.E0301, "Not supported for workflow");
+        throw new BaseEngineException(unsupported);
+    }
+
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/core/src/main/java/org/apache/oozie/ErrorCode.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/ErrorCode.java b/core/src/main/java/org/apache/oozie/ErrorCode.java
index 4444c87..7630c2f 100644
--- a/core/src/main/java/org/apache/oozie/ErrorCode.java
+++ b/core/src/main/java/org/apache/oozie/ErrorCode.java
@@ -209,6 +209,8 @@ public enum ErrorCode {
E1023(XLog.STD, "Coord Job update Error: [{0}]"),
E1024(XLog.STD, "Cannot run ignore command: [{0}]"),
E1025(XLog.STD, "Coord status transit error: [{0}]"),
+ E1026(XLog.STD, "SLA alert update command failed: {0}"),
+ E1027(XLog.STD, "SLA change command failed. {0}"),
E1100(XLog.STD, "Command precondition does not hold before execution, [{0}]"),
http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/core/src/main/java/org/apache/oozie/command/SLAAlertsXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/SLAAlertsXCommand.java b/core/src/main/java/org/apache/oozie/command/SLAAlertsXCommand.java
new file mode 100644
index 0000000..baf3a27
--- /dev/null
+++ b/core/src/main/java/org/apache/oozie/command/SLAAlertsXCommand.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.command;
+
+import java.util.Map;
+
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.client.rest.RestConstants;
+import org.apache.oozie.service.ServiceException;
+import org.apache.oozie.util.LogUtils;
+
+/**
+ * Base class for commands that operate on the SLA alerts of a job.
+ * <p>
+ * The command always requires a lock and uses the job id as its entity key. {@link #execute()} runs
+ * {@link #executeSlaCommand()} followed by {@link #updateJob()}; subclasses provide both, together with
+ * {@link #isJobRequest()}.
+ */
+public abstract class SLAAlertsXCommand extends XCommand<Void> {
+
+    private final String jobId;
+
+    /**
+     * Creates the command.
+     *
+     * @param jobId id of the job the command operates on
+     * @param name command name, passed to the {@code XCommand} constructor
+     * @param type command type, passed to the {@code XCommand} constructor
+     */
+    public SLAAlertsXCommand(String jobId, String name, String type) {
+        // NOTE(review): the third argument is presumably the command priority -- confirm against XCommand.
+        super(name, type, 1);
+        this.jobId = jobId;
+    }
+
+    @Override
+    protected final boolean isLockRequired() {
+        return true;
+    }
+
+    @Override
+    public final String getEntityKey() {
+        return getJobId();
+    }
+
+    /**
+     * Gets the job id.
+     *
+     * @return the id this command was created with
+     */
+    public final String getJobId() {
+        return jobId;
+    }
+
+    @Override
+    protected void setLogInfo() {
+        LogUtils.setLogInfo(jobId);
+    }
+
+    /** No state is loaded by default; subclasses may override. */
+    @Override
+    protected void loadState() throws CommandException {
+    }
+
+    /** No precondition is checked by default; subclasses may override. */
+    @Override
+    protected void verifyPrecondition() throws CommandException, PreconditionException {
+    }
+
+    /**
+     * Runs the SLA command and then updates the job.
+     *
+     * @return always {@code null}
+     * @throws CommandException with {@link ErrorCode#E1026} when {@link #executeSlaCommand()} returns
+     *         {@code false} for a request that is not a job request, or wrapping a {@link ServiceException}
+     */
+    @Override
+    protected Void execute() throws CommandException {
+        try {
+            // An unsuccessful result is only treated as an error when this is not a job request.
+            if (!executeSlaCommand() && !isJobRequest()) {
+                throw new CommandException(ErrorCode.E1026, "No record found");
+            }
+        }
+        catch (ServiceException e) {
+            throw new CommandException(e);
+        }
+        updateJob();
+        return null;
+    }
+
+    @Override
+    public String getKey() {
+        return getName() + "_" + jobId;
+    }
+
+    /**
+     * Validates the keys of an SLA change request.
+     *
+     * @param slaParams the requested changes, keyed by parameter name
+     * @throws CommandException with {@link ErrorCode#E1027} when {@code slaParams} is {@code null} or contains
+     *         a key other than nominal time, should-start, should-end or max-duration
+     * @throws PreconditionException declared for subclasses; not thrown here
+     */
+    protected void validateSLAChangeParam(Map<String, String> slaParams) throws CommandException, PreconditionException {
+        // Callers may hand over a null map when no parameter string was supplied; report it as a
+        // command failure instead of failing with a NullPointerException.
+        if (slaParams == null) {
+            throw new CommandException(ErrorCode.E1027, "No SLA parameters specified");
+        }
+        for (String key : slaParams.keySet()) {
+            if (!isSupportedSlaParam(key)) {
+                throw new CommandException(ErrorCode.E1027, "Unsupported parameter " + key);
+            }
+        }
+    }
+
+    /**
+     * Tells whether a key names one of the SLA parameters that can be changed.
+     *
+     * @param key the parameter name, may be {@code null}
+     * @return true, if the key is one of the supported parameter names
+     */
+    private static boolean isSupportedSlaParam(String key) {
+        return RestConstants.SLA_NOMINAL_TIME.equals(key) || RestConstants.SLA_SHOULD_START.equals(key)
+                || RestConstants.SLA_SHOULD_END.equals(key) || RestConstants.SLA_MAX_DURATION.equals(key);
+    }
+
+    /**
+     * Execute sla command.
+     *
+     * @return true, if successful
+     * @throws ServiceException the service exception
+     * @throws CommandException the command exception
+     */
+    protected abstract boolean executeSlaCommand() throws ServiceException, CommandException;
+
+    /**
+     * Update job. Called by {@link #execute()} after {@link #executeSlaCommand()}.
+     *
+     * @throws CommandException the command exception
+     */
+    protected abstract void updateJob() throws CommandException;
+
+    /**
+     * Checks if is job request. When true, {@link #execute()} does not fail on an unsuccessful
+     * {@link #executeSlaCommand()}.
+     *
+     * @return true, if is job request
+     * @throws CommandException the command exception
+     */
+    protected abstract boolean isJobRequest() throws CommandException;
+
+}
http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/core/src/main/java/org/apache/oozie/command/bundle/BundleSLAAlertsDisableXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/bundle/BundleSLAAlertsDisableXCommand.java b/core/src/main/java/org/apache/oozie/command/bundle/BundleSLAAlertsDisableXCommand.java
new file mode 100644
index 0000000..4f4e2cd
--- /dev/null
+++ b/core/src/main/java/org/apache/oozie/command/bundle/BundleSLAAlertsDisableXCommand.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.command.bundle;
+
+import org.apache.oozie.command.CommandException;
+import org.apache.oozie.command.coord.CoordSLAAlertsDisableXCommand;
+import org.apache.oozie.service.ServiceException;
+
+/**
+ * Bundle SLA command whose per-coordinator step calls a {@link CoordSLAAlertsDisableXCommand}.
+ */
+public class BundleSLAAlertsDisableXCommand extends BundleSLAAlertsXCommand {
+
+    /**
+     * Creates the command; all arguments are handed to the parent constructor.
+     *
+     * @param jobId the job id
+     * @param actions the action selection
+     * @param dates the date selection
+     * @param childIds the child ids
+     */
+    public BundleSLAAlertsDisableXCommand(String jobId, String actions, String dates, String childIds) {
+        super(jobId, actions, dates, childIds);
+    }
+
+    /**
+     * Calls a {@link CoordSLAAlertsDisableXCommand} for one coordinator.
+     *
+     * @param id the coordinator job id
+     * @param actions the action selection
+     * @param dates the date selection
+     */
+    @Override
+    protected void executeCoordCommand(String id, String actions, String dates) throws ServiceException,
+            CommandException {
+        final CoordSLAAlertsDisableXCommand coordCommand = new CoordSLAAlertsDisableXCommand(id, actions, dates);
+        coordCommand.call();
+    }
+
+    /** Nothing to load. */
+    @Override
+    protected void loadState() throws CommandException {
+    }
+
+    /** Nothing to update on the bundle job itself. */
+    @Override
+    protected void updateJob() throws CommandException {
+    }
+}
http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/core/src/main/java/org/apache/oozie/command/bundle/BundleSLAAlertsEnableXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/bundle/BundleSLAAlertsEnableXCommand.java b/core/src/main/java/org/apache/oozie/command/bundle/BundleSLAAlertsEnableXCommand.java
new file mode 100644
index 0000000..4d3b75c
--- /dev/null
+++ b/core/src/main/java/org/apache/oozie/command/bundle/BundleSLAAlertsEnableXCommand.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.command.bundle;
+
+import org.apache.oozie.command.CommandException;
+import org.apache.oozie.command.coord.CoordSLAAlertsEnableXCommand;
+import org.apache.oozie.service.ServiceException;
+
+/**
+ * Bundle SLA command whose per-coordinator step calls a {@link CoordSLAAlertsEnableXCommand}.
+ */
+public class BundleSLAAlertsEnableXCommand extends BundleSLAAlertsXCommand {
+
+    /**
+     * Creates the command; all arguments are handed to the parent constructor.
+     *
+     * @param jobId the job id
+     * @param actions the action selection
+     * @param dates the date selection
+     * @param childIds the child ids
+     */
+    public BundleSLAAlertsEnableXCommand(String jobId, String actions, String dates, String childIds) {
+        super(jobId, actions, dates, childIds);
+    }
+
+    /**
+     * Calls a {@link CoordSLAAlertsEnableXCommand} for one coordinator.
+     *
+     * @param id the coordinator job id
+     * @param actions the action selection
+     * @param dates the date selection
+     */
+    @Override
+    protected void executeCoordCommand(String id, String actions, String dates) throws ServiceException,
+            CommandException {
+        final CoordSLAAlertsEnableXCommand coordCommand = new CoordSLAAlertsEnableXCommand(id, actions, dates);
+        coordCommand.call();
+    }
+
+    /** Nothing to load. */
+    @Override
+    protected void loadState() throws CommandException {
+    }
+
+    /** Nothing to update on the bundle job itself. */
+    @Override
+    protected void updateJob() throws CommandException {
+    }
+}
http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/core/src/main/java/org/apache/oozie/command/bundle/BundleSLAAlertsXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/bundle/BundleSLAAlertsXCommand.java b/core/src/main/java/org/apache/oozie/command/bundle/BundleSLAAlertsXCommand.java
new file mode 100644
index 0000000..1e6f6ae
--- /dev/null
+++ b/core/src/main/java/org/apache/oozie/command/bundle/BundleSLAAlertsXCommand.java
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.command.bundle;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.oozie.CoordinatorJobBean;
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.XException;
+import org.apache.oozie.command.CommandException;
+import org.apache.oozie.command.SLAAlertsXCommand;
+import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
+import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
+import org.apache.oozie.service.ServiceException;
+
+/**
+ * Base class for bundle-level SLA commands.
+ * <p>
+ * {@link #executeSlaCommand()} resolves the coordinator jobs of the bundle and calls
+ * {@link #executeCoordCommand(String, String, String)} once per coordinator; subclasses decide what that
+ * per-coordinator step does.
+ */
+public abstract class BundleSLAAlertsXCommand extends SLAAlertsXCommand {
+
+    private final String actions;
+
+    private final String dates;
+
+    private final String childIds;
+
+    /**
+     * Creates the command.
+     *
+     * @param jobId the bundle job id
+     * @param actions the action selection forwarded to each coordinator command
+     * @param dates the date selection forwarded to each coordinator command
+     * @param childIds comma-separated coordinator names/ids, or {@code null} for all coordinators of the bundle
+     */
+    public BundleSLAAlertsXCommand(String jobId, String actions, String dates, String childIds) {
+        super(jobId, "SLA.command", "SLA.command");
+        this.actions = actions;
+        this.dates = dates;
+        this.childIds = childIds;
+    }
+
+    /** Nothing to load. */
+    @Override
+    protected void loadState() throws CommandException {
+    }
+
+    /**
+     * Gets the coord jobs from bundle.
+     * <p>
+     * With {@code coords == null} the lookup uses {@code GET_COORD_JOBS_WITH_PARENT_ID} with the bundle id.
+     * Otherwise {@code coords} is split on commas (entries are not trimmed) and the lookup uses
+     * {@code GET_COORD_JOBS_FOR_BUNDLE_BY_APPNAME_ID} with that list and the bundle id.
+     *
+     * @param id the bundle id
+     * @param coords the coords name/id
+     * @return the coord jobs from bundle
+     * @throws CommandException the command exception, wrapping any {@link XException} from the lookup
+     */
+    protected Set<String> getCoordJobsFromBundle(String id, String coords) throws CommandException {
+        List<CoordinatorJobBean> coordJobs;
+        try {
+            if (coords == null) {
+                coordJobs = CoordJobQueryExecutor.getInstance()
+                        .getList(CoordJobQuery.GET_COORD_JOBS_WITH_PARENT_ID, id);
+            }
+            else {
+                coordJobs = CoordJobQueryExecutor.getInstance().getList(
+                        CoordJobQuery.GET_COORD_JOBS_FOR_BUNDLE_BY_APPNAME_ID, Arrays.asList(coords.split(",")), id);
+            }
+        }
+        catch (XException e) {
+            throw new CommandException(e);
+        }
+        Set<String> jobs = new HashSet<String>();
+        for (CoordinatorJobBean jobBean : coordJobs) {
+            jobs.add(jobBean.getId());
+        }
+        return jobs;
+    }
+
+    /**
+     * Gets the coord jobs.
+     *
+     * @return the coord jobs
+     */
+    protected String getCoordJobs() {
+        return childIds;
+    }
+
+    /**
+     * Gets the actions.
+     *
+     * @return the actions
+     */
+    protected String getActions() {
+        return actions;
+    }
+
+    /**
+     * Gets the dates.
+     *
+     * @return the dates
+     */
+    protected String getDates() {
+        return dates;
+    }
+
+    /**
+     * Bundle commands are always job requests.
+     *
+     * @return always true
+     */
+    @Override
+    protected boolean isJobRequest() {
+        return true;
+    }
+
+    /**
+     * Runs the per-coordinator command for every coordinator selected from the bundle.
+     * <p>
+     * A failure for one coordinator does not stop the others: it is logged and collected, and all collected
+     * messages are reported together once every coordinator has been tried.
+     *
+     * @return true, if every coordinator command completed without an exception
+     * @throws ServiceException the service exception
+     * @throws CommandException with {@link ErrorCode#E1026} when no coordinator is selected or when at least
+     *         one coordinator command failed
+     */
+    @Override
+    protected boolean executeSlaCommand() throws ServiceException, CommandException {
+        Set<String> coordJobs = getCoordJobsFromBundle(getJobId(), getCoordJobs());
+        if (coordJobs.isEmpty()) {
+            throw new CommandException(ErrorCode.E1026, "No record found");
+        }
+
+        // Method-local buffer, so the unsynchronized StringBuilder is sufficient.
+        StringBuilder report = new StringBuilder();
+        for (String job : coordJobs) {
+            try {
+                executeCoordCommand(job, getActions(), getDates());
+            }
+            catch (Exception e) {
+                // Deliberately broad: keep going with the remaining coords and report all failures at the end.
+                String errorMsg = "SLA command for coord job " + job + " failed. Error message is : " + e.getMessage();
+                LOG.error(errorMsg, e);
+                report.append(errorMsg).append(System.getProperty("line.separator"));
+            }
+        }
+        if (report.length() > 0) {
+            throw new CommandException(ErrorCode.E1026, report.toString());
+        }
+        return true;
+    }
+
+    /**
+     * Executes the SLA command for a single coordinator job.
+     *
+     * @param id the coordinator job id
+     * @param actions the action selection
+     * @param dates the date selection
+     * @throws ServiceException the service exception
+     * @throws CommandException the command exception
+     */
+    protected abstract void executeCoordCommand(String id, String actions, String dates) throws ServiceException,
+            CommandException;
+
+}
http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/core/src/main/java/org/apache/oozie/command/bundle/BundleSLAChangeXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/bundle/BundleSLAChangeXCommand.java b/core/src/main/java/org/apache/oozie/command/bundle/BundleSLAChangeXCommand.java
new file mode 100644
index 0000000..6530451
--- /dev/null
+++ b/core/src/main/java/org/apache/oozie/command/bundle/BundleSLAChangeXCommand.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.command.bundle;
+
+import java.util.Map;
+import org.apache.oozie.command.CommandException;
+import org.apache.oozie.command.PreconditionException;
+import org.apache.oozie.command.coord.CoordSLAChangeXCommand;
+import org.apache.oozie.service.ServiceException;
+
+/**
+ * Bundle SLA command whose per-coordinator step calls a {@link CoordSLAChangeXCommand} with the
+ * requested SLA parameters.
+ */
+public class BundleSLAChangeXCommand extends BundleSLAAlertsXCommand {
+
+    /** Requested SLA changes, keyed by parameter name; validated in {@link #verifyPrecondition()}. */
+    Map<String, String> newSlaParams;
+
+    /**
+     * Creates the command.
+     *
+     * @param jobId the job id
+     * @param actions the action selection
+     * @param dates the date selection
+     * @param childIds the child ids
+     * @param newSlaParams the SLA parameters to apply
+     */
+    public BundleSLAChangeXCommand(String jobId, String actions, String dates, String childIds,
+            Map<String, String> newSlaParams) {
+        super(jobId, actions, dates, childIds);
+        this.newSlaParams = newSlaParams;
+    }
+
+    /** Checks the requested parameter names before anything is executed. */
+    @Override
+    protected void verifyPrecondition() throws CommandException, PreconditionException {
+        validateSLAChangeParam(newSlaParams);
+    }
+
+    /** Nothing to load. */
+    @Override
+    protected void loadState() throws CommandException {
+    }
+
+    /**
+     * Calls a {@link CoordSLAChangeXCommand} for one coordinator.
+     *
+     * @param id the coordinator job id
+     * @param actions the action selection
+     * @param dates the date selection
+     */
+    @Override
+    protected void executeCoordCommand(String id, String actions, String dates) throws ServiceException,
+            CommandException {
+        final CoordSLAChangeXCommand coordCommand = new CoordSLAChangeXCommand(id, actions, dates, newSlaParams);
+        coordCommand.call();
+    }
+
+    /** Nothing to update on the bundle job itself. */
+    @Override
+    protected void updateJob() throws CommandException {
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/core/src/main/java/org/apache/oozie/command/bundle/BundleStatusTransitXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/bundle/BundleStatusTransitXCommand.java b/core/src/main/java/org/apache/oozie/command/bundle/BundleStatusTransitXCommand.java
index d6a3197..953e899 100644
--- a/core/src/main/java/org/apache/oozie/command/bundle/BundleStatusTransitXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/bundle/BundleStatusTransitXCommand.java
@@ -90,6 +90,7 @@ public class BundleStatusTransitXCommand extends StatusTransitXCommand {
}
if (bAction.isPending()) {
+ LOG.debug(bAction + " has pending flag set");
foundPending = true;
}
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java b/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java
index 548946f..39e6ac1 100644
--- a/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java
@@ -32,6 +32,7 @@ import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.MaterializeTransitionXCommand;
import org.apache.oozie.command.PreconditionException;
import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
+import org.apache.oozie.coord.CoordUtils;
import org.apache.oozie.coord.TimeUnit;
import org.apache.oozie.executor.jpa.BatchQueryExecutor;
import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry;
@@ -486,7 +487,7 @@ public class CoordMaterializeTransitionXCommand extends MaterializeTransitionXCo
actionBean.setTimeOut(timeout);
if (!dryrun) {
- storeToDB(actionBean, action); // Storing to table
+ storeToDB(actionBean, action, jobConf); // Storing to table
}
else {
@@ -524,26 +525,28 @@ public class CoordMaterializeTransitionXCommand extends MaterializeTransitionXCo
}
}
- private void storeToDB(CoordinatorActionBean actionBean, String actionXml) throws Exception {
+ private void storeToDB(CoordinatorActionBean actionBean, String actionXml, Configuration jobConf) throws Exception {
LOG.debug("In storeToDB() coord action id = " + actionBean.getId() + ", size of actionXml = "
+ actionXml.length());
actionBean.setActionXml(actionXml);
insertList.add(actionBean);
- writeActionSlaRegistration(actionXml, actionBean);
+ writeActionSlaRegistration(actionXml, actionBean, jobConf);
}
- private void writeActionSlaRegistration(String actionXml, CoordinatorActionBean actionBean) throws Exception {
+ private void writeActionSlaRegistration(String actionXml, CoordinatorActionBean actionBean, Configuration jobConf)
+ throws Exception {
Element eAction = XmlUtils.parseXml(actionXml);
Element eSla = eAction.getChild("action", eAction.getNamespace()).getChild("info", eAction.getNamespace("sla"));
- SLAEventBean slaEvent = SLADbOperations.createSlaRegistrationEvent(eSla, actionBean.getId(), SlaAppType.COORDINATOR_ACTION, coordJob
- .getUser(), coordJob.getGroup(), LOG);
- if(slaEvent != null) {
+ SLAEventBean slaEvent = SLADbOperations.createSlaRegistrationEvent(eSla, actionBean.getId(),
+ SlaAppType.COORDINATOR_ACTION, coordJob.getUser(), coordJob.getGroup(), LOG);
+ if (slaEvent != null) {
insertList.add(slaEvent);
}
// inserting into new table also
SLAOperations.createSlaRegistrationEvent(eSla, actionBean.getId(), actionBean.getJobId(),
- AppType.COORDINATOR_ACTION, coordJob.getUser(), coordJob.getAppName(), LOG, false);
+ AppType.COORDINATOR_ACTION, coordJob.getUser(), coordJob.getAppName(), LOG, false,
+ CoordUtils.isSlaAlertDisabled(actionBean, coordJob.getAppName(), jobConf));
}
private void updateJobMaterializeInfo(CoordinatorJobBean job) throws CommandException {
http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/core/src/main/java/org/apache/oozie/command/coord/CoordSLAAlertsDisableXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/coord/CoordSLAAlertsDisableXCommand.java b/core/src/main/java/org/apache/oozie/command/coord/CoordSLAAlertsDisableXCommand.java
new file mode 100644
index 0000000..11daa41
--- /dev/null
+++ b/core/src/main/java/org/apache/oozie/command/coord/CoordSLAAlertsDisableXCommand.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.command.coord;
+
+import java.util.ArrayList;
+
+import org.apache.oozie.client.OozieClient;
+import org.apache.oozie.command.CommandException;
+import org.apache.oozie.service.ServiceException;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.sla.SLAOperations;
+import org.apache.oozie.sla.service.SLAService;
+import org.apache.oozie.util.XConfiguration;
+
+/**
+ * Disables SLA alerts for a coordinator job, or only for the actions selected by an action
+ * scope and/or date scope.
+ */
+public class CoordSLAAlertsDisableXCommand extends CoordSLAAlertsXCommand {
+
+    /**
+     * @param id coordinator job id
+     * @param actions action scope, may be null
+     * @param dates date scope, may be null
+     */
+    public CoordSLAAlertsDisableXCommand(String id, String actions, String dates) {
+        super(id, "SLA.alerts.disable", "SLA.alerts.disable", actions, dates);
+    }
+
+    @Override
+    protected boolean executeSlaCommand() throws ServiceException, CommandException {
+        SLAService slaService = Services.get().get(SLAService.class);
+        if (getActionList() == null) {
+            // A null action list means the disable command applies to every child of the job.
+            // A plain list is used rather than an anonymous ArrayList subclass ("double brace"
+            // initialization), which would keep a hidden reference to this command.
+            ArrayList<String> parentJobIds = new ArrayList<String>();
+            parentJobIds.add(getJobId());
+            return slaService.disableChildJobAlert(parentJobIds);
+        }
+        else {
+            return slaService.disableAlert(getActionList());
+        }
+    }
+
+    @Override
+    protected void updateJob() throws CommandException {
+        // Job-wide requests store SLAOperations.ALL_VALUE; scoped requests store the requested dates/actions.
+        String disabledFor = isJobRequest() ? SLAOperations.ALL_VALUE : getActionDateListAsString();
+        // Log the value that is actually written to the job configuration.
+        LOG.debug("Updating job property " + OozieClient.SLA_DISABLE_ALERT + " = " + disabledFor);
+        XConfiguration conf = new XConfiguration();
+        conf.set(OozieClient.SLA_DISABLE_ALERT, disabledFor);
+        updateJobConf(conf);
+    }
+}
http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/core/src/main/java/org/apache/oozie/command/coord/CoordSLAAlertsEnableXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/coord/CoordSLAAlertsEnableXCommand.java b/core/src/main/java/org/apache/oozie/command/coord/CoordSLAAlertsEnableXCommand.java
new file mode 100644
index 0000000..936f13d
--- /dev/null
+++ b/core/src/main/java/org/apache/oozie/command/coord/CoordSLAAlertsEnableXCommand.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.command.coord;
+
+import java.util.ArrayList;
+
+import org.apache.oozie.client.OozieClient;
+import org.apache.oozie.command.CommandException;
+import org.apache.oozie.service.ServiceException;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.sla.service.SLAService;
+import org.apache.oozie.util.XConfiguration;
+
+/**
+ * Enables SLA alerts for a coordinator job, or only for the actions selected by an action
+ * scope and/or date scope.
+ */
+public class CoordSLAAlertsEnableXCommand extends CoordSLAAlertsXCommand {
+
+    /**
+     * @param id coordinator job id
+     * @param actions action scope, may be null
+     * @param dates date scope, may be null
+     */
+    public CoordSLAAlertsEnableXCommand(String id, String actions, String dates) {
+        super(id, "SLA.alerts.enable", "SLA.alerts.enable", actions, dates);
+    }
+
+    @Override
+    protected boolean executeSlaCommand() throws ServiceException, CommandException {
+        SLAService slaService = Services.get().get(SLAService.class);
+        if (getActionList() == null) {
+            // A null action list means the enable command applies to every child of the job.
+            // A plain list is used rather than an anonymous ArrayList subclass ("double brace"
+            // initialization), which would keep a hidden reference to this command.
+            ArrayList<String> parentJobIds = new ArrayList<String>();
+            parentJobIds.add(getJobId());
+            return slaService.enableChildJobAlert(parentJobIds);
+        }
+        else {
+            return slaService.enableAlert(getActionList());
+        }
+    }
+
+    @Override
+    protected void updateJob() throws CommandException {
+        XConfiguration conf = new XConfiguration();
+        if (isJobRequest()) {
+            // Job-wide enable: blank out the disable-alert property.
+            conf.set(OozieClient.SLA_DISABLE_ALERT, "");
+            LOG.debug("Updating job property " + OozieClient.SLA_DISABLE_ALERT + " = ");
+        }
+        else {
+            String enabledFor = getActionDateListAsString();
+            conf.set(OozieClient.SLA_ENABLE_ALERT, enabledFor);
+            // Log the property that is actually written.
+            LOG.debug("Updating job property " + OozieClient.SLA_ENABLE_ALERT + " = " + enabledFor);
+        }
+        updateJobConf(conf);
+    }
+}
http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/core/src/main/java/org/apache/oozie/command/coord/CoordSLAAlertsXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/coord/CoordSLAAlertsXCommand.java b/core/src/main/java/org/apache/oozie/command/coord/CoordSLAAlertsXCommand.java
new file mode 100644
index 0000000..b8affd6
--- /dev/null
+++ b/core/src/main/java/org/apache/oozie/command/coord/CoordSLAAlertsXCommand.java
@@ -0,0 +1,247 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.command.coord;
+
+import java.io.IOException;
+import java.io.StringReader;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.CoordinatorJobBean;
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.XException;
+import org.apache.oozie.client.rest.RestConstants;
+import org.apache.oozie.command.CommandException;
+import org.apache.oozie.command.SLAAlertsXCommand;
+import org.apache.oozie.coord.CoordUtils;
+import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
+import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.sla.SLAOperations;
+import org.apache.oozie.util.XConfiguration;
+import org.apache.oozie.util.XmlUtils;
+import org.jdom.Element;
+import org.jdom.JDOMException;
+
+/**
+ * Base class for the coordinator SLA alert commands. It resolves the requested action/date scope
+ * into coordinator action ids and offers helpers to update the stored job configuration and job XML.
+ */
+public abstract class CoordSLAAlertsXCommand extends SLAAlertsXCommand {
+
+    private final String scope;
+    private final String dates;
+    private List<String> actionIds;
+
+    /**
+     * @param jobId coordinator job id
+     * @param name command name, passed to the parent command
+     * @param type command type, passed to the parent command
+     * @param actions action scope, may be null or empty
+     * @param dates date scope, may be null or empty
+     */
+    public CoordSLAAlertsXCommand(String jobId, String name, String type, String actions, String dates) {
+        super(jobId, name, type);
+        this.scope = actions;
+        this.dates = dates;
+    }
+
+    @Override
+    protected void loadState() throws CommandException {
+        actionIds = getActionListForScopeAndDate(getJobId(), scope, dates);
+    }
+
+    /**
+     * Update job conf.
+     *
+     * @param newConf the properties to copy on top of the stored job configuration
+     * @throws CommandException the command exception
+     */
+    protected void updateJobConf(Configuration newConf) throws CommandException {
+        try {
+            // Fresh bean carrying only the id and the merged configuration for the update query.
+            CoordinatorJobBean job = new CoordinatorJobBean();
+            XConfiguration conf = getJobConf();
+            XConfiguration.copy(newConf, conf);
+            job.setId(getJobId());
+            job.setConf(XmlUtils.prettyPrint(conf).toString());
+            CoordJobQueryExecutor.getInstance().executeUpdate(
+                    CoordJobQueryExecutor.CoordJobQuery.UPDATE_COORD_JOB_CONF, job);
+        }
+        catch (XException e) {
+            throw new CommandException(e);
+        }
+    }
+
+    /**
+     * Update job sla: overwrites the SLA values present in {@code newParams} in the stored job XML.
+     *
+     * @param newParams the new params, keyed by the RestConstants.SLA_* names; may be null
+     * @throws CommandException the command exception
+     */
+    protected void updateJobSLA(Map<String, String> newParams) throws CommandException {
+        try {
+            CoordinatorJobBean job = CoordJobQueryExecutor.getInstance().get(
+                    CoordJobQueryExecutor.CoordJobQuery.GET_COORD_JOB_XML, getJobId());
+
+            Element eJob;
+            try {
+                eJob = XmlUtils.parseXml(job.getJobXml());
+            }
+            catch (JDOMException e) {
+                throw new CommandException(ErrorCode.E1005, e.getMessage(), e);
+            }
+            // A job XML without an "action" element yields a null SLA element (which
+            // updateSlaTagElement() ignores) rather than a NullPointerException.
+            Element eAction = eJob.getChild("action", eJob.getNamespace());
+            Element eSla = null;
+            if (eAction != null) {
+                eSla = eAction.getChild("info", eJob.getNamespace("sla"));
+            }
+
+            if (newParams != null) {
+                setSlaTag(eSla, SLAOperations.NOMINAL_TIME, newParams.get(RestConstants.SLA_NOMINAL_TIME));
+                setSlaTag(eSla, SLAOperations.SHOULD_START, newParams.get(RestConstants.SLA_SHOULD_START));
+                setSlaTag(eSla, SLAOperations.SHOULD_END, newParams.get(RestConstants.SLA_SHOULD_END));
+                setSlaTag(eSla, SLAOperations.MAX_DURATION, newParams.get(RestConstants.SLA_MAX_DURATION));
+            }
+
+            job.setJobXml(XmlUtils.prettyPrint(eJob).toString());
+            job.setId(getJobId());
+
+            CoordJobQueryExecutor.getInstance().executeUpdate(
+                    CoordJobQueryExecutor.CoordJobQuery.UPDATE_COORD_JOB_XML, job);
+        }
+        catch (XException e) {
+            throw new CommandException(e);
+        }
+    }
+
+    /**
+     * Gets the action and date list as string.
+     *
+     * @return the date scope and the action scope joined with a comma, skipping whichever is empty
+     */
+    protected String getActionDateListAsString() {
+        // StringBuilder: the buffer never leaves this method, so no synchronization is needed.
+        StringBuilder sb = new StringBuilder();
+        if (!StringUtils.isEmpty(dates)) {
+            sb.append(dates);
+        }
+        if (!StringUtils.isEmpty(scope)) {
+            if (sb.length() > 0) {
+                sb.append(",");
+            }
+            sb.append(scope);
+        }
+        return sb.toString();
+    }
+
+    /**
+     * Gets the action list for scope and date.
+     *
+     * @param id the id
+     * @param scope the scope
+     * @param dates the dates
+     * @return the action ids matching the dates and/or scope, or null when neither is given (the
+     *         request then targets the whole job, see {@link #isJobRequest()})
+     * @throws CommandException the command exception
+     */
+    private List<String> getActionListForScopeAndDate(String id, String scope, String dates)
+            throws CommandException {
+        // Same emptiness test as isJobRequest(), so an empty-string scope/date is not sent to the parsers.
+        boolean hasDates = !StringUtils.isEmpty(dates);
+        boolean hasScope = !StringUtils.isEmpty(scope);
+        if (!hasDates && !hasScope) {
+            return null;
+        }
+        List<String> parsed = new ArrayList<String>();
+        if (hasDates) {
+            for (CoordinatorActionBean action : CoordUtils.getCoordActionsFromDates(id, dates, true)) {
+                parsed.add(action.getId());
+            }
+        }
+        if (hasScope) {
+            parsed.addAll(CoordUtils.getActionsIds(id, scope));
+        }
+        return parsed;
+    }
+
+    /**
+     * Gets the action list.
+     *
+     * @return the action list resolved by {@link #loadState()}; null for a job-wide request
+     */
+    protected List<String> getActionList() {
+        return actionIds;
+    }
+
+    /**
+     * @return true when neither a date scope nor an action scope was given
+     */
+    protected boolean isJobRequest() {
+        return StringUtils.isEmpty(dates) && StringUtils.isEmpty(scope);
+    }
+
+    /**
+     * Update Sla tag element.
+     *
+     * @param elem the elem; nothing is done when it is null
+     * @param tagName the tag name, looked up in the "sla" namespace of elem
+     * @param value the value
+     */
+    public void updateSlaTagElement(Element elem, String tagName, String value) {
+        if (elem == null) {
+            return;
+        }
+        Element tag = elem.getChild(tagName, elem.getNamespace("sla"));
+        if (tag != null) {
+            tag.setText(value);
+        }
+    }
+
+    /**
+     * Applies {@link #updateSlaTagElement} only when a value was supplied for the tag.
+     */
+    private void setSlaTag(Element eSla, String tagName, String value) {
+        if (value != null) {
+            updateSlaTagElement(eSla, tagName, value);
+        }
+    }
+
+    /**
+     * Loads the stored configuration of the job.
+     *
+     * @return the job configuration parsed into an XConfiguration
+     * @throws JPAExecutorException if the job cannot be read
+     * @throws CommandException if the stored configuration cannot be parsed
+     */
+    protected XConfiguration getJobConf() throws JPAExecutorException, CommandException {
+        CoordinatorJobBean job = CoordJobQueryExecutor.getInstance().get(
+                CoordJobQueryExecutor.CoordJobQuery.GET_COORD_JOB_CONF, getJobId());
+        try {
+            return new XConfiguration(new StringReader(job.getConf()));
+        }
+        catch (IOException e) {
+            throw new CommandException(ErrorCode.E1005, e.getMessage(), e);
+        }
+    }
+}