You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by pu...@apache.org on 2015/02/21 00:01:48 UTC
[2/4] oozie git commit: OOZIE-1913 Devise a way to turn off SLA
alerts for bundle/coordinator flexibly
http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java b/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java
index fdce6b5..42313fd 100644
--- a/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java
+++ b/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java
@@ -38,13 +38,17 @@ import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.XException;
import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.client.WorkflowJob;
import org.apache.oozie.client.event.JobEvent;
import org.apache.oozie.client.event.SLAEvent.EventStatus;
import org.apache.oozie.client.event.SLAEvent.SLAStatus;
import org.apache.oozie.client.rest.JsonBean;
+import org.apache.oozie.client.rest.RestConstants;
+import org.apache.oozie.command.CommandException;
import org.apache.oozie.executor.jpa.BatchQueryExecutor;
+import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry;
import org.apache.oozie.executor.jpa.CoordActionGetForSLAJPAExecutor;
import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
@@ -52,17 +56,16 @@ import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.executor.jpa.SLARegistrationQueryExecutor;
-import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor;
-import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
import org.apache.oozie.executor.jpa.SLARegistrationQueryExecutor.SLARegQuery;
import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor;
+import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor.SLASummaryQuery;
import org.apache.oozie.executor.jpa.WorkflowActionGetForSLAJPAExecutor;
-import org.apache.oozie.executor.jpa.WorkflowJobGetForSLAJPAExecutor;
+import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor;
import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor.WorkflowActionQuery;
+import org.apache.oozie.executor.jpa.WorkflowJobGetForSLAJPAExecutor;
+import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
import org.apache.oozie.executor.jpa.sla.SLASummaryGetRecordsOnRestartJPAExecutor;
-import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor.SLASummaryQuery;
-import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry;
import org.apache.oozie.lock.LockToken;
import org.apache.oozie.service.ConfigurationService;
import org.apache.oozie.service.EventHandlerService;
@@ -76,7 +79,7 @@ import org.apache.oozie.sla.service.SLAService;
import org.apache.oozie.util.DateUtils;
import org.apache.oozie.util.LogUtils;
import org.apache.oozie.util.XLog;
-
+import org.apache.oozie.util.Pair;
import com.google.common.annotations.VisibleForTesting;
@@ -453,6 +456,17 @@ public class SLACalculatorMemory implements SLACalculator {
return memObj;
}
+ private SLACalcStatus getSLACalcStatus(String jobId) throws JPAExecutorException {
+ SLACalcStatus memObj;
+ memObj = slaMap.get(jobId);
+ if (memObj == null) {
+ memObj = new SLACalcStatus(SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId),
+ SLARegistrationQueryExecutor.getInstance().get(SLARegQuery.GET_SLA_REG_ON_RESTART, jobId));
+ }
+ return memObj;
+ }
+
+
@Override
public Iterator<String> iterator() {
return slaMap.keySet().iterator();
@@ -477,9 +491,9 @@ public class SLACalculatorMemory implements SLACalculator {
synchronized (slaCalc) {
boolean change = false;
// get eventProcessed on DB for validation in HA
- Object eventProcObj = ((SLASummaryQueryExecutor) SLASummaryQueryExecutor.getInstance()).getSingleValue(
- SLASummaryQuery.GET_SLA_SUMMARY_EVENTPROCESSED, jobId);
- byte eventProc = ((Byte) eventProcObj).byteValue();
+ SLASummaryBean summaryBean = ((SLASummaryQueryExecutor) SLASummaryQueryExecutor.getInstance()).get(
+ SLASummaryQuery.GET_SLA_SUMMARY_EVENTPROCESSED_LAST_MODIFIED, jobId);
+ byte eventProc = summaryBean.getEventProcessed();
if (eventProc >= 7) {
if (eventProc == 7) {
historySet.add(jobId);
@@ -488,6 +502,12 @@ public class SLACalculatorMemory implements SLACalculator {
LOG.trace("Removed Job [{0}] from map as SLA processed", jobId);
}
else {
+ if (!slaCalc.getLastModifiedTime().equals(summaryBean.getLastModifiedTime())) {
+ //Update last modified time.
+ slaCalc.setLastModifiedTime(summaryBean.getLastModifiedTime());
+ reloadExpectedTimeAndConfig(slaCalc);
+ LOG.debug("Last modified time has changed for job " + jobId + " reloading config from DB");
+ }
slaCalc.setEventProcessed(eventProc);
SLARegistrationBean reg = slaCalc.getSLARegistrationBean();
// calculation w.r.t current time and status
@@ -499,7 +519,9 @@ public class SLACalculatorMemory implements SLACalculator {
if (eventProc != 8 && (eventProc & 1) == 0) {
// Some DB exception
slaCalc.setEventStatus(EventStatus.START_MISS);
- eventHandler.queueEvent(new SLACalcStatus(slaCalc));
+ if (shouldAlert(slaCalc)) {
+ eventHandler.queueEvent(new SLACalcStatus(slaCalc));
+ }
eventProc++;
}
change = true;
@@ -525,7 +547,9 @@ public class SLACalculatorMemory implements SLACalculator {
if (eventProc != 8 && ((eventProc >> 1) & 1) == 0) {
// Some DB exception
slaCalc.setEventStatus(EventStatus.DURATION_MISS);
- eventHandler.queueEvent(new SLACalcStatus(slaCalc));
+ if (shouldAlert(slaCalc)) {
+ eventHandler.queueEvent(new SLACalcStatus(slaCalc));
+ }
eventProc += 2;
}
change = true;
@@ -552,26 +576,16 @@ public class SLACalculatorMemory implements SLACalculator {
// Should not be > 8. But to handle any corner cases
slaCalc.setEventProcessed(8);
slaMap.remove(jobId);
+ LOG.trace("Removed Job [{0}] from map after Event-processed=8", jobId);
}
else {
slaCalc.setEventProcessed(eventProc);
}
- SLASummaryBean slaSummaryBean = new SLASummaryBean();
- slaSummaryBean.setId(slaCalc.getId());
- slaSummaryBean.setEventProcessed(eventProc);
- slaSummaryBean.setSLAStatus(slaCalc.getSLAStatus());
- slaSummaryBean.setEventStatus(slaCalc.getEventStatus());
- slaSummaryBean.setActualEnd(slaCalc.getActualEnd());
- slaSummaryBean.setActualStart(slaCalc.getActualStart());
- slaSummaryBean.setActualDuration(slaCalc.getActualDuration());
- slaSummaryBean.setJobStatus(slaCalc.getJobStatus());
- slaSummaryBean.setLastModifiedTime(new Date());
- SLASummaryQueryExecutor.getInstance().executeUpdate(
- SLASummaryQuery.UPDATE_SLA_SUMMARY_FOR_STATUS_ACTUAL_TIMES, slaSummaryBean);
+ writetoDB(slaCalc, eventProc);
if (eventProc == 7) {
historySet.add(jobId);
slaMap.remove(jobId);
- LOG.trace("Removed Job [{0}] from map after End-processed", jobId);
+ LOG.trace("Removed Job [{0}] from map after Event-processed=7", jobId);
}
}
}
@@ -586,6 +600,48 @@ public class SLACalculatorMemory implements SLACalculator {
}
}
+ private void writetoDB(SLACalcStatus slaCalc, byte eventProc) throws JPAExecutorException {
+ SLASummaryBean slaSummaryBean = new SLASummaryBean();
+ slaSummaryBean.setId(slaCalc.getId());
+ slaSummaryBean.setEventProcessed(eventProc);
+ slaSummaryBean.setSLAStatus(slaCalc.getSLAStatus());
+ slaSummaryBean.setEventStatus(slaCalc.getEventStatus());
+ slaSummaryBean.setActualEnd(slaCalc.getActualEnd());
+ slaSummaryBean.setActualStart(slaCalc.getActualStart());
+ slaSummaryBean.setActualDuration(slaCalc.getActualDuration());
+ slaSummaryBean.setJobStatus(slaCalc.getJobStatus());
+ slaSummaryBean.setLastModifiedTime(new Date());
+
+ SLASummaryQueryExecutor.getInstance().executeUpdate(SLASummaryQuery.UPDATE_SLA_SUMMARY_FOR_STATUS_ACTUAL_TIMES,
+ slaSummaryBean);
+ LOG.trace("Stored SLA SummaryBean Job [{0}] with Event-processed=[{1}]", slaCalc.getId(),
+ slaSummaryBean.getEventProcessed());
+ }
+
+ @SuppressWarnings("rawtypes")
+ private void updateDBSlaConfig(SLACalcStatus slaCalc, List<UpdateEntry> updateList)
+ throws JPAExecutorException {
+ updateList.add(new UpdateEntry<SLARegQuery>(SLARegQuery.UPDATE_SLA_CONFIG, slaCalc.getSLARegistrationBean()));
+ slaCalc.setLastModifiedTime(new Date());
+ updateList.add(new UpdateEntry<SLASummaryQuery>(SLASummaryQuery.UPDATE_SLA_SUMMARY_LAST_MODIFIED_TIME, new SLASummaryBean(slaCalc)));
+ }
+
+ @SuppressWarnings("rawtypes")
+ private void updateDBSlaExpectedValues(SLACalcStatus slaCalc, List<UpdateEntry> updateList)
+ throws JPAExecutorException {
+ slaCalc.setLastModifiedTime(new Date());
+ updateList.add(new UpdateEntry<SLARegQuery>(SLARegQuery.UPDATE_SLA_EXPECTED_VALUE, slaCalc
+ .getSLARegistrationBean()));
+ updateList.add(new UpdateEntry<SLASummaryQuery>(SLASummaryQuery.UPDATE_SLA_SUMMARY_FOR_EXPECTED_TIMES,
+ new SLASummaryBean(slaCalc)));
+ }
+
+ @SuppressWarnings("rawtypes")
+ private void executeBatchQuery(List<UpdateEntry> updateList) throws JPAExecutorException {
+ BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(null, updateList, null);
+ }
+
+
/**
* Periodically run by the SLAService worker threads to update SLA status by
* iterating through all the jobs in the map
@@ -673,6 +729,8 @@ public class SLACalculatorMemory implements SLACalculator {
slaCalc.setSLAStatus(SLAStatus.NOT_STARTED);
slaCalc.setJobStatus(getJobStatus(reg.getAppType()));
slaMap.put(jobId, slaCalc);
+
+ @SuppressWarnings("rawtypes")
List<UpdateEntry> updateList = new ArrayList<UpdateEntry>();
updateList.add(new UpdateEntry<SLARegQuery>(SLARegQuery.UPDATE_SLA_REG_ALL, reg));
updateList.add(new UpdateEntry<SLASummaryQuery>(SLASummaryQuery.UPDATE_SLA_SUMMARY_ALL,
@@ -758,9 +816,17 @@ public class SLACalculatorMemory implements SLACalculator {
locked = slaCalc.isLocked();
if (locked) {
// get eventProcessed on DB for validation in HA
- Object eventProcObj = ((SLASummaryQueryExecutor) SLASummaryQueryExecutor.getInstance())
- .getSingleValue(SLASummaryQuery.GET_SLA_SUMMARY_EVENTPROCESSED, jobId);
- byte eventProc = ((Byte) eventProcObj).byteValue();
+ SLASummaryBean summaryBean = ((SLASummaryQueryExecutor) SLASummaryQueryExecutor.getInstance()).get(
+ SLASummaryQuery.GET_SLA_SUMMARY_EVENTPROCESSED_LAST_MODIFIED, jobId);
+ byte eventProc = summaryBean.getEventProcessed();
+
+ if (!slaCalc.getLastModifiedTime().equals(summaryBean.getLastModifiedTime())) {
+ //Update last modified time.
+ slaCalc.setLastModifiedTime(summaryBean.getLastModifiedTime());
+ reloadExpectedTimeAndConfig(slaCalc);
+ LOG.debug("Last modified time has changed for job " + jobId + " reloading config from DB");
+ }
+
slaCalc.setEventProcessed(eventProc);
slaCalc.setJobStatus(jobStatus);
switch (jobEventStatus) {
@@ -824,7 +890,9 @@ public class SLACalculatorMemory implements SLACalculator {
else {
slaCalc.setEventStatus(EventStatus.START_MET);
}
- eventHandler.queueEvent(new SLACalcStatus(slaCalc));
+ if (shouldAlert(slaCalc)) {
+ eventHandler.queueEvent(new SLACalcStatus(slaCalc));
+ }
}
eventProc += 1;
slaCalc.setEventProcessed(eventProc);
@@ -869,7 +937,9 @@ public class SLACalculatorMemory implements SLACalculator {
}
eventProc += 4;
slaCalc.setEventProcessed(eventProc);
- eventHandler.queueEvent(new SLACalcStatus(slaCalc));
+ if (shouldAlert(slaCalc)) {
+ eventHandler.queueEvent(new SLACalcStatus(slaCalc));
+ }
}
return getSLASummaryBean(slaCalc);
}
@@ -891,7 +961,9 @@ public class SLACalculatorMemory implements SLACalculator {
if (slaCalc.getEventProcessed() < 4) {
slaCalc.setEventStatus(EventStatus.END_MISS);
slaCalc.setSLAStatus(SLAStatus.MISS);
- eventHandler.queueEvent(new SLACalcStatus(slaCalc));
+ if (shouldAlert(slaCalc)) {
+ eventHandler.queueEvent(new SLACalcStatus(slaCalc));
+ }
slaCalc.setEventProcessed(7);
return getSLASummaryBean(slaCalc);
}
@@ -905,7 +977,9 @@ public class SLACalculatorMemory implements SLACalculator {
if (((eventProc >> 1) & 1) == 0) {
if (expectedDuration != -1) {
slaCalc.setEventStatus(EventStatus.DURATION_MISS);
- eventHandler.queueEvent(new SLACalcStatus(slaCalc));
+ if (shouldAlert(slaCalc)) {
+ eventHandler.queueEvent(new SLACalcStatus(slaCalc));
+ }
}
eventProc += 2;
slaCalc.setEventProcessed(eventProc);
@@ -915,7 +989,9 @@ public class SLACalculatorMemory implements SLACalculator {
slaCalc.setSLAStatus(SLAStatus.MISS);
eventProc += 4;
slaCalc.setEventProcessed(eventProc);
- eventHandler.queueEvent(new SLACalcStatus(slaCalc));
+ if (shouldAlert(slaCalc)) {
+ eventHandler.queueEvent(new SLACalcStatus(slaCalc));
+ }
}
return getSLASummaryBean(slaCalc);
}
@@ -934,13 +1010,16 @@ public class SLACalculatorMemory implements SLACalculator {
}
private void processDurationSLA(long expected, long actual, SLACalcStatus slaCalc) {
- if (expected != -1 && actual > expected) {
- slaCalc.setEventStatus(EventStatus.DURATION_MISS);
- eventHandler.queueEvent(new SLACalcStatus(slaCalc));
- }
- else if (expected != -1 && actual <= expected) {
- slaCalc.setEventStatus(EventStatus.DURATION_MET);
- eventHandler.queueEvent(new SLACalcStatus(slaCalc));
+ if (expected != -1) {
+ if (actual > expected) {
+ slaCalc.setEventStatus(EventStatus.DURATION_MISS);
+ }
+ else if (actual <= expected) {
+ slaCalc.setEventStatus(EventStatus.DURATION_MET);
+ }
+ if (shouldAlert(slaCalc)) {
+ eventHandler.queueEvent(new SLACalcStatus(slaCalc));
+ }
}
}
@@ -1016,7 +1095,9 @@ public class SLACalculatorMemory implements SLACalculator {
else {
slaCalc.setEventStatus(EventStatus.START_MET);
}
- eventHandler.queueEvent(new SLACalcStatus(slaCalc));
+ if (shouldAlert(slaCalc)) {
+ eventHandler.queueEvent(new SLACalcStatus(slaCalc));
+ }
}
slaCalc.setActualDuration(slaCalc.getActualEnd().getTime() - slaCalc.getActualStart().getTime());
if (((eventProc >> 1) & 1) == 0) {
@@ -1030,7 +1111,9 @@ public class SLACalculatorMemory implements SLACalculator {
else {
slaCalc.setEventStatus(EventStatus.END_MET);
}
- eventHandler.queueEvent(new SLACalcStatus(slaCalc));
+ if (shouldAlert(slaCalc)) {
+ eventHandler.queueEvent(new SLACalcStatus(slaCalc));
+ }
}
slaCalc.setEventProcessed(8);
}
@@ -1046,12 +1129,16 @@ public class SLACalculatorMemory implements SLACalculator {
else {
slaCalc.setEventStatus(EventStatus.START_MET);
}
- eventHandler.queueEvent(new SLACalcStatus(slaCalc));
+ if (shouldAlert(slaCalc)) {
+ eventHandler.queueEvent(new SLACalcStatus(slaCalc));
+ }
eventProc++;
}
else if (slaCalc.getExpectedStart().getTime() < System.currentTimeMillis()) {
slaCalc.setEventStatus(EventStatus.START_MISS);
- eventHandler.queueEvent(new SLACalcStatus(slaCalc));
+ if (shouldAlert(slaCalc)) {
+ eventHandler.queueEvent(new SLACalcStatus(slaCalc));
+ }
eventProc++;
}
}
@@ -1059,14 +1146,18 @@ public class SLACalculatorMemory implements SLACalculator {
&& slaCalc.getExpectedDuration() != -1) {
if (System.currentTimeMillis() - slaCalc.getActualStart().getTime() > slaCalc.getExpectedDuration()) {
slaCalc.setEventStatus(EventStatus.DURATION_MISS);
- eventHandler.queueEvent(new SLACalcStatus(slaCalc));
+ if (shouldAlert(slaCalc)) {
+ eventHandler.queueEvent(new SLACalcStatus(slaCalc));
+ }
eventProc += 2;
}
}
if (eventProc < 4 && slaCalc.getExpectedEnd().getTime() < System.currentTimeMillis()) {
slaCalc.setEventStatus(EventStatus.END_MISS);
slaCalc.setSLAStatus(SLAStatus.MISS);
- eventHandler.queueEvent(new SLACalcStatus(slaCalc));
+ if (shouldAlert(slaCalc)) {
+ eventHandler.queueEvent(new SLACalcStatus(slaCalc));
+ }
eventProc += 4;
}
slaCalc.setEventProcessed(eventProc);
@@ -1078,12 +1169,36 @@ public class SLACalculatorMemory implements SLACalculator {
if (slaCalc.getEventProcessed() < 4 && slaCalc.getExpectedEnd().getTime() < System.currentTimeMillis()) {
slaCalc.setEventStatus(EventStatus.END_MISS);
slaCalc.setSLAStatus(SLAStatus.MISS);
- eventHandler.queueEvent(new SLACalcStatus(slaCalc));
+ if (shouldAlert(slaCalc)) {
+ eventHandler.queueEvent(new SLACalcStatus(slaCalc));
+ }
slaCalc.setEventProcessed(slaCalc.getEventProcessed() + 4);
}
}
}
+ public void reloadExpectedTimeAndConfig(SLACalcStatus slaCalc) throws JPAExecutorException {
+ SLARegistrationBean regBean = SLARegistrationQueryExecutor.getInstance().get(
+ SLARegQuery.GET_SLA_EXPECTED_VALUE_CONFIG, slaCalc.getId());
+
+ if (regBean.getExpectedDuration() > 0) {
+ slaCalc.getSLARegistrationBean().setExpectedDuration(regBean.getExpectedDuration());
+ }
+ if (regBean.getExpectedEnd() != null) {
+ slaCalc.getSLARegistrationBean().setExpectedEnd(regBean.getExpectedEnd());
+ }
+ if (regBean.getExpectedStart() != null) {
+ slaCalc.getSLARegistrationBean().setExpectedStart(regBean.getExpectedStart());
+ }
+ if (regBean.getSLAConfigMap().containsKey(OozieClient.SLA_DISABLE_ALERT)) {
+ slaCalc.getSLARegistrationBean().addToSLAConfigMap(OozieClient.SLA_DISABLE_ALERT,
+ regBean.getSLAConfigMap().get(OozieClient.SLA_DISABLE_ALERT));
+ }
+ if (regBean.getNominalTime() != null) {
+ slaCalc.getSLARegistrationBean().setNominalTime(regBean.getNominalTime());
+ }
+ }
+
@VisibleForTesting
public boolean isJobIdInSLAMap(String jobId) {
return this.slaMap.containsKey(jobId);
@@ -1097,4 +1212,99 @@ public class SLACalculatorMemory implements SLACalculator {
private void setLogPrefix(String jobId) {
LOG = LogUtils.setLogInfo(LOG, jobId, null, null);
}
+
+ @Override
+ public boolean enableAlert(List<String> jobIds) throws JPAExecutorException, ServiceException {
+ boolean isJobFound = false;
+ @SuppressWarnings("rawtypes")
+ List<UpdateEntry> updateList = new ArrayList<BatchQueryExecutor.UpdateEntry>();
+ for (String jobId : jobIds) {
+ SLACalcStatus slaCalc = getSLACalcStatus(jobId);
+ if (slaCalc != null) {
+ slaCalc.getSLARegistrationBean().removeFromSLAConfigMap(OozieClient.SLA_DISABLE_ALERT);
+ updateDBSlaConfig(slaCalc, updateList);
+ isJobFound = true;
+ }
+ }
+ executeBatchQuery(updateList);
+ return isJobFound;
+ }
+
+ @Override
+ public boolean enableChildJobAlert(List<String> parentJobIds) throws JPAExecutorException, ServiceException {
+ return enableAlert(getSLAJobsforParents(parentJobIds));
+ }
+
+
+ @Override
+ public boolean disableAlert(List<String> jobIds) throws JPAExecutorException, ServiceException {
+ boolean isJobFound = false;
+ @SuppressWarnings("rawtypes")
+ List<UpdateEntry> updateList = new ArrayList<BatchQueryExecutor.UpdateEntry>();
+
+ for (String jobId : jobIds) {
+ SLACalcStatus slaCalc = getSLACalcStatus(jobId);
+ if (slaCalc != null) {
+ slaCalc.getSLARegistrationBean().addToSLAConfigMap(OozieClient.SLA_DISABLE_ALERT, Boolean.toString(true));
+ updateDBSlaConfig(slaCalc, updateList);
+ isJobFound = true;
+ }
+ }
+ executeBatchQuery(updateList);
+ return isJobFound;
+ }
+
+ @Override
+ public boolean disableChildJobAlert(List<String> parentJobIds) throws JPAExecutorException, ServiceException {
+ return disableAlert(getSLAJobsforParents(parentJobIds));
+ }
+
+ @Override
+ public boolean changeDefinition(List<Pair<String, Map<String,String>>> jobIdsSLAPair ) throws JPAExecutorException,
+ ServiceException{
+ boolean isJobFound = false;
+ @SuppressWarnings("rawtypes")
+ List<UpdateEntry> updateList = new ArrayList<BatchQueryExecutor.UpdateEntry>();
+ for (Pair<String, Map<String,String>> jobIdSLAPair : jobIdsSLAPair) {
+ SLACalcStatus slaCalc = getSLACalcStatus(jobIdSLAPair.getFist());
+ if (slaCalc != null) {
+ updateParams(slaCalc, jobIdSLAPair.getSecond());
+ updateDBSlaExpectedValues(slaCalc, updateList);
+ isJobFound = true;
+ }
+ }
+ executeBatchQuery(updateList);
+ return isJobFound;
+ }
+
+ private void updateParams(SLACalcStatus slaCalc, Map<String, String> newParams) throws ServiceException {
+ SLARegistrationBean reg = slaCalc.getSLARegistrationBean();
+ if (newParams != null) {
+ try {
+ Date newNominal = SLAOperations.setNominalTime(newParams.get(RestConstants.SLA_NOMINAL_TIME), reg);
+ SLAOperations.setExpectedStart(newParams.get(RestConstants.SLA_SHOULD_START), newNominal, reg);
+ SLAOperations.setExpectedEnd(newParams.get(RestConstants.SLA_SHOULD_END), newNominal, reg);
+ SLAOperations.setExpectedDuration(newParams.get(RestConstants.SLA_MAX_DURATION), reg);
+ }
+ catch (CommandException ce) {
+ throw new ServiceException(ce);
+ }
+ }
+ }
+
+ private boolean shouldAlert(SLACalcStatus slaObj) {
+ return !slaObj.getSLAConfigMap().containsKey(OozieClient.SLA_DISABLE_ALERT);
+ }
+
+ private List<String> getSLAJobsforParents(List<String> parentJobIds) throws JPAExecutorException{
+ List<String> childJobIds = new ArrayList<String>();
+ for (String jobId : parentJobIds) {
+ List<SLARegistrationBean> registrationBeanList = SLARegistrationQueryExecutor.getInstance().getList(
+ SLARegQuery.GET_SLA_REG_FOR_PARENT_ID, jobId);
+ for (SLARegistrationBean bean : registrationBeanList) {
+ childJobIds.add(bean.getId());
+ }
+ }
+ return childJobIds;
+ }
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/core/src/main/java/org/apache/oozie/sla/SLAOperations.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/sla/SLAOperations.java b/core/src/main/java/org/apache/oozie/sla/SLAOperations.java
index f5fc826..3905003 100644
--- a/core/src/main/java/org/apache/oozie/sla/SLAOperations.java
+++ b/core/src/main/java/org/apache/oozie/sla/SLAOperations.java
@@ -23,15 +23,14 @@ import java.util.Date;
import org.apache.oozie.AppType;
import org.apache.oozie.ErrorCode;
+import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.event.SLAEvent.EventStatus;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.executor.jpa.SLARegistrationQueryExecutor;
import org.apache.oozie.executor.jpa.SLARegistrationQueryExecutor.SLARegQuery;
-import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.ServiceException;
import org.apache.oozie.service.Services;
-import org.apache.oozie.sla.SLARegistrationBean;
import org.apache.oozie.sla.service.SLAService;
import org.apache.oozie.util.DateUtils;
import org.apache.oozie.util.XLog;
@@ -40,14 +39,20 @@ import org.jdom.Element;
public class SLAOperations {
- private static final String NOMINAL_TIME = "nominal-time";
- private static final String SHOULD_START = "should-start";
- private static final String SHOULD_END = "should-end";
- private static final String MAX_DURATION = "max-duration";
- private static final String ALERT_EVENTS = "alert-events";
+ public static final String NOMINAL_TIME = "nominal-time";
+ public static final String SHOULD_START = "should-start";
+ public static final String SHOULD_END = "should-end";
+ public static final String MAX_DURATION = "max-duration";
+ public static final String ALERT_EVENTS = "alert-events";
+ public static final String ALL_VALUE = "ALL";
+
+
+ static public XLog LOG = XLog.getLog(SLAOperations.class);
+
public static SLARegistrationBean createSlaRegistrationEvent(Element eSla, String jobId, String parentId,
- AppType appType, String user, String appName, XLog log, boolean rerun) throws CommandException {
+ AppType appType, String user, String appName, XLog log, boolean rerun, boolean disableAlert)
+ throws CommandException {
if (eSla == null || !SLAService.isEnabled()) {
log.debug("Not registering SLA for job [{0}]. Sla-Xml null OR SLAService not enabled", jobId);
return null;
@@ -56,56 +61,19 @@ public class SLAOperations {
// Setting nominal time
String strNominalTime = getTagElement(eSla, NOMINAL_TIME);
- if (strNominalTime == null || strNominalTime.length() == 0) {
- throw new CommandException(ErrorCode.E1101, NOMINAL_TIME);
- }
- Date nominalTime;
- try {
- nominalTime = DateUtils.parseDateOozieTZ(strNominalTime);
- sla.setNominalTime(nominalTime);
- }
- catch (ParseException pex) {
- throw new CommandException(ErrorCode.E0302, strNominalTime, pex);
- }
+ Date nominalTime = setNominalTime(strNominalTime, sla);
// Setting expected start time
String strExpectedStart = getTagElement(eSla, SHOULD_START);
- if (strExpectedStart != null) {
- float expectedStart = Float.parseFloat(strExpectedStart);
- if (expectedStart < 0) {
- throw new CommandException(ErrorCode.E0302, strExpectedStart, "for SLA Expected start time");
- }
- else {
- Date expectedStartTime = new Date(nominalTime.getTime() + (long) (expectedStart * 60 * 1000));
- sla.setExpectedStart(expectedStartTime);
- }
- }
+ setExpectedStart(strExpectedStart, nominalTime, sla);
// Setting expected end time
String strExpectedEnd = getTagElement(eSla, SHOULD_END);
- if (strExpectedEnd == null || strExpectedEnd.length() == 0) {
- throw new CommandException(ErrorCode.E1101, SHOULD_END);
- }
- float expectedEnd = Float.parseFloat(strExpectedEnd);
- if (expectedEnd < 0) {
- throw new CommandException(ErrorCode.E0302, strExpectedEnd, "for SLA Expected end time");
- }
- else {
- Date expectedEndTime = new Date(nominalTime.getTime() + (long) (expectedEnd * 60 * 1000));
- sla.setExpectedEnd(expectedEndTime);
- }
+ setExpectedEnd(strExpectedEnd, nominalTime, sla);
// Setting expected duration in milliseconds
String expectedDurationStr = getTagElement(eSla, MAX_DURATION);
- if (expectedDurationStr != null && expectedDurationStr.length() > 0) {
- float expectedDuration = Float.parseFloat(expectedDurationStr);
- if (expectedDuration > 0) {
- sla.setExpectedDuration((long) (expectedDuration * 60 * 1000));
- }
- }
- else if (sla.getExpectedStart() != null) {
- sla.setExpectedDuration(sla.getExpectedEnd().getTime() - sla.getExpectedStart().getTime());
- }
+ setExpectedDuration(expectedDurationStr, sla);
// Parse desired alert-types i.e. start-miss, end-miss, start-met etc..
String alertEvents = getTagElement(eSla, ALERT_EVENTS);
@@ -134,6 +102,10 @@ public class SLAOperations {
sla.setAlertContact(getTagElement(eSla, "alert-contact"));
sla.setUpstreamApps(getTagElement(eSla, "upstream-apps"));
+ //disable Alert flag in slaConfig
+ if (disableAlert) {
+ sla.addToSLAConfigMap(OozieClient.SLA_DISABLE_ALERT, Boolean.toString(disableAlert));
+ }
// Oozie defined
sla.setId(jobId);
sla.setAppType(appType);
@@ -158,6 +130,68 @@ public class SLAOperations {
return sla;
}
+ public static Date setNominalTime(String strNominalTime, SLARegistrationBean sla) throws CommandException {
+ if (strNominalTime == null || strNominalTime.length() == 0) {
+ return sla.getNominalTime();
+ }
+ Date nominalTime;
+ try {
+ nominalTime = DateUtils.parseDateOozieTZ(strNominalTime);
+ sla.setNominalTime(nominalTime);
+ }
+ catch (ParseException pex) {
+ throw new CommandException(ErrorCode.E0302, strNominalTime, pex);
+ }
+ return nominalTime;
+ }
+
+ public static void setExpectedStart(String strExpectedStart, Date nominalTime, SLARegistrationBean sla)
+ throws CommandException {
+ if (strExpectedStart != null) {
+ float expectedStart = Float.parseFloat(strExpectedStart);
+ if (expectedStart < 0) {
+ throw new CommandException(ErrorCode.E0302, strExpectedStart, "for SLA Expected start time");
+ }
+ else {
+ Date expectedStartTime = new Date(nominalTime.getTime() + (long) (expectedStart * 60 * 1000));
+ sla.setExpectedStart(expectedStartTime);
+ LOG.debug("Setting expected start to " + expectedStartTime + " for job " + sla.getId());
+ }
+ }
+ }
+
+ public static void setExpectedEnd(String strExpectedEnd, Date nominalTime, SLARegistrationBean sla)
+ throws CommandException {
+ if (strExpectedEnd != null) {
+ float expectedEnd = Float.parseFloat(strExpectedEnd);
+ if (expectedEnd < 0) {
+ throw new CommandException(ErrorCode.E0302, strExpectedEnd, "for SLA Expected end time");
+ }
+ else {
+ Date expectedEndTime = new Date(nominalTime.getTime() + (long) (expectedEnd * 60 * 1000));
+ sla.setExpectedEnd(expectedEndTime);
+ LOG.debug("Setting expected end to " + expectedEndTime + " for job " + sla.getId());
+
+ }
+ }
+ }
+
+ public static void setExpectedDuration(String expectedDurationStr, SLARegistrationBean sla) {
+ if (expectedDurationStr != null && expectedDurationStr.length() > 0) {
+ float expectedDuration = Float.parseFloat(expectedDurationStr);
+ if (expectedDuration > 0) {
+ long duration = (long) (expectedDuration * 60 * 1000);
+ LOG.debug("Setting expected duration to " + duration + " for job " + sla.getId());
+ sla.setExpectedDuration(duration);
+ }
+ }
+ else if (sla.getExpectedStart() != null) {
+ long duration = sla.getExpectedEnd().getTime() - sla.getExpectedStart().getTime();
+ LOG.debug("Setting expected duration to " + duration + " for job " + sla.getId());
+ sla.setExpectedDuration(sla.getExpectedEnd().getTime() - sla.getExpectedStart().getTime());
+ }
+ }
+
/**
* Retrieve registration event
* @param jobId the jobId
@@ -165,7 +199,6 @@ public class SLAOperations {
* @throws JPAExecutorException
*/
public static void updateRegistrationEvent(String jobId) throws CommandException, JPAExecutorException {
- JPAService jpaService = Services.get().get(JPAService.class);
SLAService slaService = Services.get().get(SLAService.class);
try {
SLARegistrationBean reg = SLARegistrationQueryExecutor.getInstance().get(SLARegQuery.GET_SLA_REG_ALL, jobId);
@@ -203,7 +236,15 @@ public class SLAOperations {
return createSlaRegistrationEvent(eSla, jobId, null, appType, user, null, log, false);
}
- private static String getTagElement(Element elem, String tagName) {
+ /*
+ * default disableAlert flag
+ */
+ public static SLARegistrationBean createSlaRegistrationEvent(Element eSla, String jobId, String parentId,
+ AppType appType, String user, String appName, XLog log, boolean rerun) throws CommandException {
+ return createSlaRegistrationEvent(eSla, jobId, null, appType, user, appName, log, rerun, false);
+ }
+
+ public static String getTagElement(Element elem, String tagName) {
if (elem != null && elem.getChild(tagName, elem.getNamespace("sla")) != null) {
return elem.getChild(tagName, elem.getNamespace("sla")).getText().trim();
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/core/src/main/java/org/apache/oozie/sla/SLARegistrationBean.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/sla/SLARegistrationBean.java b/core/src/main/java/org/apache/oozie/sla/SLARegistrationBean.java
index 0770bd3..1b8370f 100644
--- a/core/src/main/java/org/apache/oozie/sla/SLARegistrationBean.java
+++ b/core/src/main/java/org/apache/oozie/sla/SLARegistrationBean.java
@@ -33,7 +33,6 @@ import javax.persistence.NamedQueries;
import javax.persistence.NamedQuery;
import javax.persistence.Table;
import javax.persistence.Transient;
-
import org.apache.oozie.AppType;
import org.apache.oozie.client.event.Event.MessageType;
import org.apache.oozie.client.rest.JsonBean;
@@ -48,9 +47,21 @@ import org.json.simple.JSONObject;
@NamedQuery(name = "UPDATE_SLA_REG_ALL", query = "update SLARegistrationBean w set w.jobId = :jobId, w.nominalTimeTS = :nominalTime, w.expectedStartTS = :expectedStartTime, w.expectedEndTS = :expectedEndTime, w.expectedDuration = :expectedDuration, w.slaConfig = :slaConfig, w.notificationMsg = :notificationMsg, w.upstreamApps = :upstreamApps, w.appType = :appType, w.appName = :appName, w.user = :user, w.parentId = :parentId, w.jobData = :jobData where w.jobId = :jobId"),
+ @NamedQuery(name = "UPDATE_SLA_CONFIG", query = "update SLARegistrationBean w set w.slaConfig = :slaConfig where w.jobId = :jobId"),
+
+ @NamedQuery(name = "UPDATE_SLA_EXPECTED_VALUE", query = "update SLARegistrationBean w set w.expectedStartTS = :expectedStartTime, w.expectedEndTS = :expectedEndTime , w.expectedDuration = :expectedDuration where w.jobId = :jobId"),
+
@NamedQuery(name = "GET_SLA_REG_ON_RESTART", query = "select w.notificationMsg, w.upstreamApps, w.slaConfig, w.jobData from SLARegistrationBean w where w.jobId = :id"),
- @NamedQuery(name = "GET_SLA_REG_ALL", query = "select OBJECT(w) from SLARegistrationBean w where w.jobId = :id") })
+ @NamedQuery(name = "GET_SLA_REG_ALL", query = "select OBJECT(w) from SLARegistrationBean w where w.jobId = :id"),
+
+ @NamedQuery(name = "GET_SLA_CONFIGS", query = "select w.jobId, w.slaConfig from SLARegistrationBean w where w.jobId IN (:ids)"),
+
+ @NamedQuery(name = "GET_SLA_EXPECTED_VALUE_CONFIG", query = "select w.jobId, w.slaConfig, w.expectedStartTS, w.expectedEndTS, w.expectedDuration, w.nominalTimeTS from SLARegistrationBean w where w.jobId = :id"),
+
+ @NamedQuery(name = "GET_SLA_REG_FOR_PARENT_ID", query = "select w.jobId, w.slaConfig from SLARegistrationBean w where w.parentId = :parentId")
+ })
+
public class SLARegistrationBean implements JsonBean {
@Id
@@ -281,10 +292,21 @@ public class SLARegistrationBean implements JsonBean {
slaConfig = slaConfigMapToString();
}
- public Map<String, String> getSlaConfigMap() {
+
+ public Map<String, String> getSLAConfigMap() {
return slaConfigMap;
}
+ public void addToSLAConfigMap(String key, String value) {
+ slaConfigMap.put(key, value);
+ slaConfig = slaConfigMapToString();
+ }
+
+ public void removeFromSLAConfigMap(String key) {
+ slaConfigMap.remove(key);
+ slaConfig = slaConfigMapToString();
+ }
+
private void slaConfigStringToMap() {
if (slaConfig != null) {
String[] splitString = slaConfig.split("},");
http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/core/src/main/java/org/apache/oozie/sla/SLASummaryBean.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/sla/SLASummaryBean.java b/core/src/main/java/org/apache/oozie/sla/SLASummaryBean.java
index 9907dd0..a88dcf6 100644
--- a/core/src/main/java/org/apache/oozie/sla/SLASummaryBean.java
+++ b/core/src/main/java/org/apache/oozie/sla/SLASummaryBean.java
@@ -21,6 +21,7 @@ package org.apache.oozie.sla;
import java.sql.Timestamp;
import java.util.Date;
import java.util.List;
+import java.util.Map;
import javax.persistence.Basic;
import javax.persistence.Column;
@@ -31,6 +32,7 @@ import javax.persistence.NamedQuery;
import javax.persistence.Table;
import org.apache.oozie.AppType;
+import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.event.SLAEvent;
import org.apache.oozie.client.rest.JsonBean;
import org.apache.oozie.client.rest.JsonTags;
@@ -50,15 +52,22 @@ import org.json.simple.JSONObject;
@NamedQuery(name = "UPDATE_SLA_SUMMARY_FOR_ACTUAL_TIMES", query = "update SLASummaryBean w set w.eventProcessed = :eventProcessed, w.actualStartTS = :actualStartTS, w.actualEndTS = :actualEndTS, w.actualEndTS = :actualEndTS, w.actualDuration = :actualDuration, w.lastModifiedTS = :lastModifiedTS where w.jobId = :jobId"),
+ @NamedQuery(name = "UPDATE_SLA_SUMMARY_FOR_EXPECTED_TIMES", query = "update SLASummaryBean w set w.nominalTimeTS = :nominalTime, w.expectedStartTS = :expectedStartTime, w.expectedEndTS = :expectedEndTime, w.expectedDuration = :expectedDuration , w.lastModifiedTS = :lastModTime where w.jobId = :jobId"),
+
@NamedQuery(name = "UPDATE_SLA_SUMMARY_EVENTPROCESSED", query = "update SLASummaryBean w set w.eventProcessed = :eventProcessed where w.jobId = :jobId"),
+ @NamedQuery(name = "UPDATE_SLA_SUMMARY_LAST_MODIFIED_TIME", query = "update SLASummaryBean w set w.lastModifiedTS = :lastModifiedTS where w.jobId = :jobId"),
+
@NamedQuery(name = "UPDATE_SLA_SUMMARY_ALL", query = "update SLASummaryBean w set w.jobId = :jobId, w.appName = :appName, w.appType = :appType, w.nominalTimeTS = :nominalTime, w.expectedStartTS = :expectedStartTime, w.expectedEndTS = :expectedEndTime, w.expectedDuration = :expectedDuration, w.jobStatus = :jobStatus, w.slaStatus = :slaStatus, w.eventStatus = :eventStatus, w.lastModifiedTS = :lastModTime, w.user = :user, w.parentId = :parentId, w.eventProcessed = :eventProcessed, w.actualDuration = :actualDuration, w.actualEndTS = :actualEndTS, w.actualStartTS = :actualStartTS where w.jobId = :jobId"),
@NamedQuery(name = "GET_SLA_SUMMARY", query = "select OBJECT(w) from SLASummaryBean w where w.jobId = :id"),
@NamedQuery(name = "GET_SLA_SUMMARY_RECORDS_RESTART", query = "select OBJECT(w) from SLASummaryBean w where w.eventProcessed <= 7 AND w.lastModifiedTS >= :lastModifiedTime"),
- @NamedQuery(name = "GET_SLA_SUMMARY_EVENTPROCESSED", query = "select w.eventProcessed from SLASummaryBean w where w.jobId = :id")
+ @NamedQuery(name = "GET_SLA_SUMMARY_EVENTPROCESSED", query = "select w.eventProcessed from SLASummaryBean w where w.jobId = :id"),
+
+ @NamedQuery(name = "GET_SLA_SUMMARY_EVENTPROCESSED_LAST_MODIFIED", query = "select w.eventProcessed, w.lastModifiedTS from SLASummaryBean w where w.jobId = :id")
+
})
/**
@@ -431,6 +440,7 @@ public class SLASummaryBean implements JsonBean {
json.put(JsonTags.SLA_SUMMARY_JOB_STATUS, jobStatus);
json.put(JsonTags.SLA_SUMMARY_SLA_STATUS, slaStatus);
json.put(JsonTags.SLA_SUMMARY_LAST_MODIFIED, JsonUtils.formatDateRfc822(lastModifiedTS, timeZoneId));
+
return json;
}
}
@@ -455,4 +465,25 @@ public class SLASummaryBean implements JsonBean {
return json;
}
+ @SuppressWarnings("unchecked")
+ public static JSONObject toJSONObject(List<? extends SLASummaryBean> slaSummaryList,
+ Map<String, Map<String, String>> slaConfigMap, String timeZoneId) {
+ JSONObject json = new JSONObject();
+ JSONArray array = new JSONArray();
+ if (slaSummaryList != null) {
+ for (SLASummaryBean summary : slaSummaryList) {
+ JSONObject slaJson = summary.toJSONObject(timeZoneId);
+ String slaAlertStatus = "";
+ if (slaConfigMap.containsKey(summary.getId())) {
+ slaAlertStatus = slaConfigMap.get(summary.getId()).containsKey(
+ OozieClient.SLA_DISABLE_ALERT) ? "Disabled" : "Enabled";
+ }
+ slaJson.put(JsonTags.SLA_ALERT_STATUS, slaAlertStatus);
+ array.add(slaJson);
+ }
+ }
+ json.put(JsonTags.SLA_SUMMARY_LIST, array);
+ return json;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/core/src/main/java/org/apache/oozie/sla/service/SLAService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/sla/service/SLAService.java b/core/src/main/java/org/apache/oozie/sla/service/SLAService.java
index a4562e7..ef1d335 100644
--- a/core/src/main/java/org/apache/oozie/sla/service/SLAService.java
+++ b/core/src/main/java/org/apache/oozie/sla/service/SLAService.java
@@ -19,6 +19,8 @@
package org.apache.oozie.sla.service;
import java.util.Date;
+import java.util.List;
+import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.ErrorCode;
@@ -33,6 +35,7 @@ import org.apache.oozie.service.Services;
import org.apache.oozie.sla.SLACalculator;
import org.apache.oozie.sla.SLACalculatorMemory;
import org.apache.oozie.sla.SLARegistrationBean;
+import org.apache.oozie.util.Pair;
import org.apache.oozie.util.XLog;
import com.google.common.annotations.VisibleForTesting;
@@ -107,7 +110,6 @@ public class SLAService implements Service {
return calcImpl;
}
- @VisibleForTesting
public void runSLAWorker() {
new SLAWorker(calcImpl).run();
}
@@ -181,4 +183,94 @@ public class SLAService implements Service {
calcImpl.removeRegistration(jobId);
}
+ /**
+ * Enable jobs sla alert.
+ *
+ * @param jobIds the job ids
+ * @param isParentJob, if jobIds are parent job
+ * @return true, if successful
+ * @throws ServiceException the service exception
+ */
+ public boolean enableAlert(List<String> jobIds) throws ServiceException {
+ try {
+ return calcImpl.enableAlert(jobIds);
+ }
+ catch (JPAExecutorException jpe) {
+ LOG.error("Exception while updating SLA alerting for Job [{0}]", jobIds.get(0));
+ throw new ServiceException(jpe);
+ }
+ }
+
+ /**
+ * Enable child jobs sla alert.
+ *
+ * @param jobIds the parent job ids
+ * @param isParentJob, if jobIds are parent job
+ * @return true, if successful
+ * @throws ServiceException the service exception
+ */
+ public boolean enableChildJobAlert(List<String> parentJobIds) throws ServiceException {
+ try {
+ return calcImpl.enableChildJobAlert(parentJobIds);
+ }
+ catch (JPAExecutorException jpe) {
+ LOG.error("Exception while updating SLA alerting for Job [{0}]", parentJobIds.get(0));
+ throw new ServiceException(jpe);
+ }
+ }
+
+ /**
+ * Disable jobs Sla alert.
+ *
+ * @param jobIds the job ids
+ * @param isParentJob, if jobIds are parent job
+ * @return true, if successful
+ * @throws ServiceException the service exception
+ */
+ public boolean disableAlert(List<String> jobIds) throws ServiceException {
+ try {
+ return calcImpl.disableAlert(jobIds);
+ }
+ catch (JPAExecutorException jpe) {
+ LOG.error("Exception while updating SLA alerting for Job [{0}]", jobIds.get(0));
+ throw new ServiceException(jpe);
+ }
+ }
+
+ /**
+ * Disable child jobs Sla alert.
+ *
+ * @param jobIds the parent job ids
+ * @param isParentJob, if jobIds are parent job
+ * @return true, if successful
+ * @throws ServiceException the service exception
+ */
+ public boolean disableChildJobAlert(List<String> parentJobIds) throws ServiceException {
+ try {
+ return calcImpl.disableChildJobAlert(parentJobIds);
+ }
+ catch (JPAExecutorException jpe) {
+ LOG.error("Exception while updating SLA alerting for Job [{0}]", parentJobIds.get(0));
+ throw new ServiceException(jpe);
+ }
+ }
+
+ /**
+ * Change jobs Sla definitions
+ * It takes list of pairs of jobid and key/value pairs of el evaluated sla definition.
+ * Support definition are sla-should-start, sla-should-end, sla-nominal-time and sla-max-duration.
+ *
+ * @param jobIdsSLAPair the job ids sla pair
+ * @return true, if successful
+ * @throws ServiceException the service exception
+ */
+ public boolean changeDefinition(List<Pair<String, Map<String, String>>> idSlaDefinitionList)
+ throws ServiceException {
+ try {
+ return calcImpl.changeDefinition(idSlaDefinitionList);
+ }
+ catch (JPAExecutorException jpe) {
+ throw new ServiceException(jpe);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/core/src/main/java/org/apache/oozie/util/CoordActionsInDateRange.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/util/CoordActionsInDateRange.java b/core/src/main/java/org/apache/oozie/util/CoordActionsInDateRange.java
index 7c2620c..1c565ef 100644
--- a/core/src/main/java/org/apache/oozie/util/CoordActionsInDateRange.java
+++ b/core/src/main/java/org/apache/oozie/util/CoordActionsInDateRange.java
@@ -30,12 +30,11 @@ import org.apache.oozie.CoordinatorActionBean;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.XException;
import org.apache.oozie.command.CommandException;
+import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
import org.apache.oozie.executor.jpa.CoordJobGetActionModifiedDateForRangeJPAExecutor;
-import org.apache.oozie.executor.jpa.CoordJobGetActionIdsForDateRangeJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobGetActionRunningCountForRangeJPAExecutor;
-import org.apache.oozie.executor.jpa.CoordJobGetActionsByDatesForKillJPAExecutor;
-import org.apache.oozie.executor.jpa.CoordJobGetActionsForDatesJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;
@@ -139,10 +138,13 @@ public class CoordActionsInDateRange {
throw new XException(ErrorCode.E0308, "'" + range + "'. Start date '" + start + "' is older than end date: '" + end
+ "'");
}
- List<String> list = null;
- JPAService jpaService = Services.get().get(JPAService.class);
- list = jpaService.execute(new CoordJobGetActionIdsForDateRangeJPAExecutor(jobId, start, end));
- return list;
+ List<CoordinatorActionBean> listOfActions = CoordActionQueryExecutor.getInstance().getList(
+ CoordActionQuery.GET_TERMINATED_ACTIONS_FOR_DATES, jobId, start, end);
+ List<String> idsList = new ArrayList<String>();
+ for ( CoordinatorActionBean bean : listOfActions){
+ idsList.add(bean.getId());
+ }
+ return idsList;
}
/**
@@ -156,12 +158,13 @@ public class CoordActionsInDateRange {
private static List<CoordinatorActionBean> getActionsFromDateRange(String jobId, Date start, Date end,
boolean active) throws XException {
List<CoordinatorActionBean> list;
- JPAService jpaService = Services.get().get(JPAService.class);
if (!active) {
- list = jpaService.execute(new CoordJobGetActionsForDatesJPAExecutor(jobId, start, end));
+ list = CoordActionQueryExecutor.getInstance().getList(
+ CoordActionQuery.GET_TERMINATED_ACTIONS_FOR_DATES, jobId, start, end);
}
else {
- list = jpaService.execute(new CoordJobGetActionsByDatesForKillJPAExecutor(jobId, start, end));
+ list = CoordActionQueryExecutor.getInstance().getList(
+ CoordActionQuery.GET_ACTIVE_ACTIONS_FOR_DATES, jobId, start, end);
}
return list;
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/core/src/main/resources/oozie-default.xml
----------------------------------------------------------------------
diff --git a/core/src/main/resources/oozie-default.xml b/core/src/main/resources/oozie-default.xml
index 6f76b07..b40fec0 100644
--- a/core/src/main/resources/oozie-default.xml
+++ b/core/src/main/resources/oozie-default.xml
@@ -2211,6 +2211,15 @@
</description>
</property>
+ <property>
+ <name>oozie.sla.disable.alerts.older.than</name>
+ <value>48</value>
+ <description>
+ Time threshold, in HOURS, for disabling SLA alerting for jobs whose
+ nominal time is older than this.
+ </description>
+ </property>
+
<!-- ZooKeeper configuration -->
<property>
<name>oozie.zookeeper.connection.string</name>
http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/core/src/test/java/org/apache/oozie/command/TestSLAAlertXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/command/TestSLAAlertXCommand.java b/core/src/test/java/org/apache/oozie/command/TestSLAAlertXCommand.java
new file mode 100644
index 0000000..ce59885
--- /dev/null
+++ b/core/src/test/java/org/apache/oozie/command/TestSLAAlertXCommand.java
@@ -0,0 +1,300 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.command;
+
+import java.io.StringReader;
+import java.util.Date;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.oozie.AppType;
+import org.apache.oozie.BaseEngineException;
+import org.apache.oozie.BundleEngine;
+import org.apache.oozie.BundleJobBean;
+import org.apache.oozie.CoordinatorEngine;
+import org.apache.oozie.CoordinatorJobBean;
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.client.Job;
+import org.apache.oozie.client.OozieClient;
+import org.apache.oozie.client.rest.RestConstants;
+import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
+import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
+import org.apache.oozie.local.LocalOozie;
+import org.apache.oozie.service.ConfigurationService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.sla.SLACalcStatus;
+import org.apache.oozie.sla.SLACalculatorMemory;
+import org.apache.oozie.sla.SLAOperations;
+import org.apache.oozie.sla.SLARegistrationBean;
+import org.apache.oozie.sla.service.SLAService;
+import org.apache.oozie.test.XDataTestCase;
+import org.apache.oozie.util.DateUtils;
+import org.apache.oozie.util.XConfiguration;
+
+public class TestSLAAlertXCommand extends XDataTestCase {
+ private Services services;
+ SLACalculatorMemory slaCalcMemory;
+ BundleJobBean bundle;
+ CoordinatorJobBean coord1, coord2;
+ final BundleEngine bundleEngine = new BundleEngine("u");
+ Date startTime;
+ final Date endTime = new Date(System.currentTimeMillis() + 1 * 1 * 3600 * 1000);
+ final int timeInSec = 60 * 1000;
+ final String data = "2014-01-01T00:00Z";
+
+ @Override
+ protected void setUp() throws Exception {
+ super.setUp();
+ services = new Services();
+ Configuration conf = services.get(ConfigurationService.class).getConf();
+ conf.set(Services.CONF_SERVICE_EXT_CLASSES, "org.apache.oozie.service.EventHandlerService,"
+ + "org.apache.oozie.sla.service.SLAService");
+ conf.setInt(SLAService.CONF_SLA_CHECK_INTERVAL, 600);
+ services.init();
+
+ }
+
+ @Override
+ protected void tearDown() throws Exception {
+ LocalOozie.stop();
+ services.destroy();
+ super.tearDown();
+ }
+
+ public void testBundleSLAAlertCommands() throws Exception {
+ setupSLAJobs();
+ String jobIdsStr = bundle.getId();
+ String actions = "1,2";
+ String coords = null;
+ bundleEngine.disableSLAAlert(jobIdsStr, actions, null, coords);
+ checkSLAStatus(coord1.getId() + "@1", true);
+ checkSLAStatus(coord1.getId() + "@2", true);
+ checkSLAStatus(coord1.getId() + "@3", false);
+ checkSLAStatus(coord1.getId() + "@5", false);
+ checkSLAStatus(coord1.getId() + "@4", false);
+ checkSLAStatus(coord2.getId() + "@1", true);
+ checkSLAStatus(coord2.getId() + "@1", true);
+
+ bundleEngine.enableSLAAlert(jobIdsStr, null, null, null);
+ checkSLAStatus(coord1.getId() + "@1", false);
+ checkSLAStatus(coord1.getId() + "@2", false);
+ checkSLAStatus(coord1.getId() + "@3", false);
+ checkSLAStatus(coord1.getId() + "@5", false);
+ checkSLAStatus(coord1.getId() + "@4", false);
+ checkSLAStatus(coord2.getId() + "@1", false);
+ checkSLAStatus(coord2.getId() + "@2", false);
+
+ CoordinatorJobBean job1 = CoordJobQueryExecutor.getInstance().get(
+ CoordJobQueryExecutor.CoordJobQuery.GET_COORD_JOB, coord1.getId());
+ XConfiguration xConf = new XConfiguration(new StringReader(job1.getConf()));
+ assertEquals(xConf.get(OozieClient.SLA_DISABLE_ALERT), null);
+
+ CoordinatorJobBean job2 = CoordJobQueryExecutor.getInstance().get(
+ CoordJobQueryExecutor.CoordJobQuery.GET_COORD_JOB, coord2.getId());
+ xConf = new XConfiguration(new StringReader(job2.getConf()));
+ assertEquals(xConf.get(OozieClient.SLA_DISABLE_ALERT), null);
+
+ bundleEngine.disableSLAAlert(jobIdsStr, null, null, "coord1");
+ checkSLAStatus(coord1.getId() + "@1", true);
+ checkSLAStatus(coord1.getId() + "@2", true);
+ checkSLAStatus(coord1.getId() + "@3", true);
+ checkSLAStatus(coord1.getId() + "@4", true);
+ checkSLAStatus(coord1.getId() + "@5", true);
+ checkSLAStatus(coord2.getId() + "@1", false);
+ checkSLAStatus(coord2.getId() + "@2", false);
+
+ job1 = CoordJobQueryExecutor.getInstance().get(CoordJobQueryExecutor.CoordJobQuery.GET_COORD_JOB,
+ coord1.getId());
+ xConf = new XConfiguration(new StringReader(job1.getConf()));
+ assertEquals(xConf.get(OozieClient.SLA_DISABLE_ALERT), SLAOperations.ALL_VALUE);
+ bundleEngine.disableSLAAlert(jobIdsStr, null, null, "coord2");
+ // with multiple coordID.
+
+ String dates = "2014-01-01T00:00Z::2014-01-03T00:00Z";
+ bundleEngine.enableSLAAlert(jobIdsStr, null, dates, "coord1," + coord2.getId());
+ checkSLAStatus(coord1.getId() + "@1", false);
+ checkSLAStatus(coord1.getId() + "@2", false);
+ checkSLAStatus(coord1.getId() + "@3", false);
+ checkSLAStatus(coord1.getId() + "@4", true);
+ checkSLAStatus(coord1.getId() + "@5", true);
+ checkSLAStatus(coord2.getId() + "@1", false);
+ checkSLAStatus(coord2.getId() + "@2", false);
+ checkSLAStatus(coord2.getId() + "@3", false);
+ checkSLAStatus(coord2.getId() + "@4", true);
+
+ try {
+ bundleEngine.disableSLAAlert(jobIdsStr, null, null, "dummy");
+ fail("Should throw Exception");
+ }
+ catch (BaseEngineException e) {
+ assertEquals(e.getErrorCode(), ErrorCode.E1026);
+ }
+
+ }
+
+ public void testSLAChangeCommand() throws Exception {
+ setupSLAJobs();
+ String newParams = RestConstants.SLA_SHOULD_END + "=10";
+ String jobIdsStr = bundle.getId();
+ String coords = coord1.getAppName();
+ bundleEngine.changeSLA(jobIdsStr, null, null, coords, newParams);
+
+ assertEquals(getSLACalcStatus(coord1.getId() + "@1").getExpectedEnd().getTime(),
+ getSLACalcStatus(coord1.getId() + "@1").getNominalTime().getTime() + 10 * timeInSec);
+ assertEquals(getSLACalcStatus(coord1.getId() + "@2").getExpectedEnd().getTime(),
+ getSLACalcStatus(coord1.getId() + "@2").getNominalTime().getTime() + 10 * timeInSec);
+
+ assertEquals(getSLACalcStatus(coord1.getId() + "@5").getExpectedEnd().getTime(),
+ getSLACalcStatus(coord1.getId() + "@5").getNominalTime().getTime() + 10 * timeInSec);
+ newParams = "non-valid-param=10";
+ try {
+ bundleEngine.changeSLA(jobIdsStr, null, null, coords, newParams);
+ fail("Should throw Exception");
+ }
+ catch (BaseEngineException e) {
+ assertEquals(e.getErrorCode(), ErrorCode.E1027);
+ }
+ try {
+ new CoordinatorEngine().changeSLA(coord1.getId(), null, null, null, newParams);
+ fail("Should throw Exception");
+ }
+ catch (BaseEngineException e) {
+ assertEquals(e.getErrorCode(), ErrorCode.E1027);
+ }
+ }
+
+ public void testCoordSLAAlertCommands() throws Exception {
+ setupSLAJobs();
+
+ final CoordinatorEngine engine = new CoordinatorEngine("u");
+ String jobIdsStr = coord1.getId();
+ String actions = "1-3,5";
+ String coords = null;
+ engine.disableSLAAlert(jobIdsStr, actions, null, coords);
+ checkSLAStatus(coord1.getId() + "@1", true);
+ checkSLAStatus(coord1.getId() + "@2", true);
+ checkSLAStatus(coord1.getId() + "@3", true);
+ checkSLAStatus(coord1.getId() + "@5", true);
+ checkSLAStatus(coord1.getId() + "@4", false);
+
+ actions = "1-3";
+ engine.enableSLAAlert(jobIdsStr, actions, null, null);
+ checkSLAStatus(coord1.getId() + "@1", false);
+ checkSLAStatus(coord1.getId() + "@2", false);
+ checkSLAStatus(coord1.getId() + "@3", false);
+ checkSLAStatus(coord1.getId() + "@5", true);
+ checkSLAStatus(coord1.getId() + "@4", false);
+
+ engine.enableSLAAlert(jobIdsStr, null, null, null);
+ checkSLAStatus(coord1.getId() + "@1", false);
+ checkSLAStatus(coord1.getId() + "@2", false);
+ checkSLAStatus(coord1.getId() + "@3", false);
+ checkSLAStatus(coord1.getId() + "@5", false);
+ checkSLAStatus(coord1.getId() + "@4", false);
+ CoordinatorJobBean job = CoordJobQueryExecutor.getInstance().get(
+ CoordJobQueryExecutor.CoordJobQuery.GET_COORD_JOB, jobIdsStr);
+ XConfiguration xConf = new XConfiguration(new StringReader(job.getConf()));
+ assertEquals(xConf.get(OozieClient.SLA_DISABLE_ALERT), null);
+
+ }
+
+ private void setupSLAJobs() throws Exception {
+
+ coord1 = addRecordToCoordJobTable(Job.Status.RUNNING, true, false);
+ Date nominalTime1 = DateUtils.parseDateUTC(data);
+ addRecordToCoordActionTable(coord1.getId(), 1, CoordinatorAction.Status.WAITING, "coord-action-get.xml", 1,
+ nominalTime1);
+ Date nominalTime2 = org.apache.commons.lang.time.DateUtils.addDays(nominalTime1, 1);
+
+ addRecordToCoordActionTable(coord1.getId(), 2, CoordinatorAction.Status.WAITING, "coord-action-get.xml", 1,
+ nominalTime2);
+
+ Date nominalTime3 = org.apache.commons.lang.time.DateUtils.addDays(nominalTime1, 2);
+ addRecordToCoordActionTable(coord1.getId(), 3, CoordinatorAction.Status.WAITING, "coord-action-get.xml", 1,
+ nominalTime3);
+
+ Date nominalTime4 = org.apache.commons.lang.time.DateUtils.addDays(nominalTime1, 3);
+ addRecordToCoordActionTable(coord1.getId(), 4, CoordinatorAction.Status.WAITING, "coord-action-get.xml", 1,
+ nominalTime4);
+ Date nominalTime5 = org.apache.commons.lang.time.DateUtils.addDays(nominalTime1, 4);
+ addRecordToCoordActionTable(coord1.getId(), 5, CoordinatorAction.Status.WAITING, "coord-action-get.xml", 1,
+ nominalTime5);
+
+ coord2 = addRecordToCoordJobTable(Job.Status.RUNNING, true, false);
+ addRecordToCoordActionTable(coord2.getId(), 1, CoordinatorAction.Status.WAITING, "coord-action-get.xml", 0,
+ nominalTime1);
+ addRecordToCoordActionTable(coord2.getId(), 2, CoordinatorAction.Status.WAITING, "coord-action-get.xml", 0,
+ nominalTime2);
+ addRecordToCoordActionTable(coord2.getId(), 3, CoordinatorAction.Status.WAITING, "coord-action-get.xml", 0,
+ nominalTime3);
+ addRecordToCoordActionTable(coord2.getId(), 4, CoordinatorAction.Status.WAITING, "coord-action-get.xml", 0,
+ nominalTime4);
+
+ bundle = addRecordToBundleJobTable(Job.Status.RUNNING, true);
+ coord1.setBundleId(bundle.getId());
+ coord1.setAppName("coord1");
+ coord1.setStartTime(nominalTime1);
+ coord1.setMatThrottling(12);
+ coord1.setLastActionNumber(5);
+ coord2.setBundleId(bundle.getId());
+ coord2.setAppName("coord2");
+ CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, coord1);
+ CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, coord2);
+ registerSLABean(coord1.getId(), AppType.COORDINATOR_JOB, null, null);
+ registerSLABean(coord2.getId(), AppType.COORDINATOR_JOB, null, null);
+ registerSLABean(coord1.getId() + "@1", AppType.COORDINATOR_ACTION, coord1.getId(), nominalTime1);
+ registerSLABean(coord1.getId() + "@2", AppType.COORDINATOR_ACTION, coord1.getId(), nominalTime2);
+ registerSLABean(coord1.getId() + "@3", AppType.COORDINATOR_ACTION, coord1.getId(), nominalTime3);
+ registerSLABean(coord1.getId() + "@4", AppType.COORDINATOR_ACTION, coord1.getId(), nominalTime4);
+ registerSLABean(coord1.getId() + "@5", AppType.COORDINATOR_ACTION, coord1.getId(), nominalTime5);
+ registerSLABean(coord2.getId() + "@1", AppType.COORDINATOR_ACTION, coord2.getId(), nominalTime1);
+ registerSLABean(coord2.getId() + "@2", AppType.COORDINATOR_ACTION, coord2.getId(), nominalTime2);
+ registerSLABean(coord2.getId() + "@3", AppType.COORDINATOR_ACTION, coord2.getId(), nominalTime3);
+ registerSLABean(coord2.getId() + "@4", AppType.COORDINATOR_ACTION, coord2.getId(), nominalTime4);
+
+ checkSLAStatus(coord1.getId() + "@1", false);
+ checkSLAStatus(coord1.getId() + "@2", false);
+ checkSLAStatus(coord1.getId() + "@3", false);
+ checkSLAStatus(coord1.getId() + "@5", false);
+ checkSLAStatus(coord1.getId() + "@4", false);
+ }
+
+ private void registerSLABean(String jobId, AppType appType, String parentId, Date nominalTime) throws Exception {
+ SLARegistrationBean slaRegBean = new SLARegistrationBean();
+ slaRegBean.setNominalTime(nominalTime);
+ slaRegBean.setId(jobId);
+ slaRegBean.setAppType(appType);
+ startTime = new Date(System.currentTimeMillis() - 1 * 1 * 3600 * 1000); // 1 hour back
+ slaRegBean.setExpectedStart(startTime);
+ slaRegBean.setExpectedDuration(3600 * 1000);
+ slaRegBean.setParentId(parentId);
+ slaRegBean.setExpectedEnd(endTime); // 1 hour ahead
+ Services.get().get(SLAService.class).addRegistrationEvent(slaRegBean);
+ }
+
+ private void checkSLAStatus(String id, boolean status) throws JPAExecutorException {
+ assertEquals(getSLACalcStatus(id).getSLAConfigMap().containsKey(OozieClient.SLA_DISABLE_ALERT), status);
+ }
+
+ private SLACalcStatus getSLACalcStatus(String jobId) throws JPAExecutorException {
+ return Services.get().get(SLAService.class).getSLACalculator().get(jobId);
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/core/src/test/java/org/apache/oozie/command/coord/TestCoordSubmitXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/command/coord/TestCoordSubmitXCommand.java b/core/src/test/java/org/apache/oozie/command/coord/TestCoordSubmitXCommand.java
index 5ce9a7f..5f72e57 100644
--- a/core/src/test/java/org/apache/oozie/command/coord/TestCoordSubmitXCommand.java
+++ b/core/src/test/java/org/apache/oozie/command/coord/TestCoordSubmitXCommand.java
@@ -23,6 +23,7 @@ import java.io.FileWriter;
import java.io.Reader;
import java.io.Writer;
import java.net.URI;
+import java.util.Date;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
@@ -31,13 +32,22 @@ import org.apache.oozie.CoordinatorJobBean;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.client.Job;
import org.apache.oozie.client.OozieClient;
+import org.apache.oozie.client.rest.RestConstants;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.service.EventHandlerService;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;
+import org.apache.oozie.sla.SLACalcStatus;
+import org.apache.oozie.sla.SLACalculator;
+import org.apache.oozie.sla.SLAOperations;
+import org.apache.oozie.sla.service.SLAService;
import org.apache.oozie.test.XDataTestCase;
+import org.apache.oozie.util.DateUtils;
import org.apache.oozie.util.IOUtils;
+import org.apache.oozie.util.JobUtils;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XmlUtils;
import org.jdom.Element;
@@ -51,6 +61,8 @@ public class TestCoordSubmitXCommand extends XDataTestCase {
protected void setUp() throws Exception {
super.setUp();
services = new Services();
+ services.getConf().set(Services.CONF_SERVICE_EXT_CLASSES,
+ EventHandlerService.class.getName() + "," + SLAService.class.getName());
services.init();
}
@@ -1319,4 +1331,170 @@ public class TestCoordSubmitXCommand extends XDataTestCase {
assertEquals(job.getTimeout(), 43200);
}
+ public void testSubmitWithSLAAlertsDisable() throws Exception {
+ Configuration conf = new XConfiguration();
+ File appPathFile = new File(getTestCaseDir(), "coordinator.xml");
+
+ // CASE 1: Failure case i.e. multiple data-in instances
+ Reader reader = IOUtils.getResourceAsReader("coord-action-sla.xml", -1);
+ Writer writer = new FileWriter(appPathFile);
+ IOUtils.copyCharStream(reader, writer);
+ conf.set(OozieClient.COORDINATOR_APP_PATH, appPathFile.toURI().toString());
+ conf.set("start", DateUtils.formatDateOozieTZ(new Date()));
+ conf.set("end", DateUtils.formatDateOozieTZ(org.apache.commons.lang.time.DateUtils.addMonths(new Date(), 1)));
+ conf.set("frequency", "coord:days(1)");
+ conf.set(OozieClient.USER_NAME, getTestUser());
+ reader = IOUtils.getResourceAsReader("wf-credentials.xml", -1);
+ appPathFile = new File(getTestCaseDir(), "workflow.xml");
+ writer = new FileWriter(appPathFile);
+ IOUtils.copyCharStream(reader, writer);
+ conf.set("wfAppPath", appPathFile.getPath());
+ Date nominalTime = new Date();
+ conf.set("nominal_time", DateUtils.formatDateOozieTZ(nominalTime));
+
+ String coordId = new CoordSubmitXCommand(conf).call();
+ new CoordMaterializeTransitionXCommand(coordId, 3600).call();
+ SLAService slaService = services.get(SLAService.class);
+ SLACalculator calc = slaService.getSLACalculator();
+ SLACalcStatus slaCalc = calc.get(coordId + "@" + 1);
+ assertFalse(Boolean.valueOf(slaCalc.getSLAConfigMap().get(OozieClient.SLA_DISABLE_ALERT)));
+
+ Configuration conf1=new Configuration(conf);
+ // CASE I: "ALL"
+ conf1.set(OozieClient.SLA_DISABLE_ALERT, "ALL");
+ coordId = new CoordSubmitXCommand(conf1).call();
+ new CoordMaterializeTransitionXCommand(coordId, 3600).call();
+
+ slaService = services.get(SLAService.class);
+ calc = slaService.getSLACalculator();
+ slaCalc = calc.get(coordId + "@" + 1);
+ assertTrue(Boolean.valueOf(slaCalc.getSLAConfigMap().get(OozieClient.SLA_DISABLE_ALERT)));
+
+ // CASE II: Date Range
+ Configuration conf2=new Configuration(conf);
+ Date startRangeDate = new Date(nominalTime.getTime() - 3600 * 1000);
+ conf2.set(OozieClient.SLA_DISABLE_ALERT,
+ DateUtils.formatDateOozieTZ(startRangeDate) + "::" + DateUtils.formatDateOozieTZ(nominalTime));
+ coordId = new CoordSubmitXCommand(conf2).call();
+ new CoordMaterializeTransitionXCommand(coordId, 3600).call();
+
+ slaCalc = calc.get(coordId + "@" + 1);
+ assertTrue(Boolean.valueOf(slaCalc.getSLAConfigMap().get(OozieClient.SLA_DISABLE_ALERT)));
+
+ // CASE III: Coord name (negative test)
+ Configuration conf3=new Configuration(conf);
+ conf3.set(OozieClient.SLA_DISABLE_ALERT_COORD, "test-coord-sla-x");
+ coordId = new CoordSubmitXCommand(conf3).call();
+ new CoordMaterializeTransitionXCommand(coordId, 3600).call();
+ slaCalc = calc.get(coordId + "@" + 1);
+ assertFalse(Boolean.valueOf(slaCalc.getSLAConfigMap().get(OozieClient.SLA_DISABLE_ALERT)));
+
+ // CASE IV: Older than n(hours)
+ Date otherNominalTime = new Date(nominalTime.getTime() - 73 * 3600 * 1000);
+ conf = new XConfiguration();
+ appPathFile = new File(getTestCaseDir(), "coordinator.xml");
+ conf.set(OozieClient.COORDINATOR_APP_PATH, appPathFile.toURI().toString());
+ conf.set("wfAppPath", appPathFile.getPath());
+ conf.set("start", DateUtils.formatDateOozieTZ(org.apache.commons.lang.time.DateUtils.addMonths(new Date(), -1)));
+ conf.set("end", DateUtils.formatDateOozieTZ(org.apache.commons.lang.time.DateUtils.addMonths(new Date(), 1)));
+
+ conf.set(OozieClient.USER_NAME, getTestUser());
+ conf.set("nominal_time", DateUtils.formatDateOozieTZ(otherNominalTime));
+ conf.setInt(OozieClient.SLA_DISABLE_ALERT_OLDER_THAN, 72);
+ coordId = new CoordSubmitXCommand(conf).call();
+ new CoordMaterializeTransitionXCommand(coordId, 3600).call();
+ slaCalc = calc.get(coordId + "@" + 1);
+ assertTrue(Boolean.valueOf(slaCalc.getSLAConfigMap().get(OozieClient.SLA_DISABLE_ALERT)));
+
+ // catchup mode
+ conf = new XConfiguration();
+ conf.set(OozieClient.COORDINATOR_APP_PATH, appPathFile.toURI().toString());
+ conf.set("wfAppPath", appPathFile.getPath());
+ conf.set("start", DateUtils.formatDateOozieTZ(org.apache.commons.lang.time.DateUtils.addMonths(new Date(), -1)));
+ conf.set("end", DateUtils.formatDateOozieTZ(org.apache.commons.lang.time.DateUtils.addMonths(new Date(), 1)));
+
+ conf.set(OozieClient.USER_NAME, getTestUser());
+ conf.set("nominal_time",
+ DateUtils.formatDateOozieTZ(org.apache.commons.lang.time.DateUtils.addMonths(new Date(), -1)));
+ conf.set(OozieClient.USER_NAME, getTestUser());
+ conf.set("nominal_time",
+ DateUtils.formatDateOozieTZ(org.apache.commons.lang.time.DateUtils.addMonths(new Date(), -1)));
+ coordId = new CoordSubmitXCommand(conf).call();
+ new CoordMaterializeTransitionXCommand(coordId, 3600).call();
+ slaCalc = calc.get(coordId + "@" + 1);
+ assertTrue(Boolean.valueOf(slaCalc.getSLAConfigMap().get(OozieClient.SLA_DISABLE_ALERT)));
+
+ // normal mode
+ conf = new XConfiguration();
+ conf.set(OozieClient.COORDINATOR_APP_PATH, appPathFile.toURI().toString());
+ conf.set("wfAppPath", appPathFile.getPath());
+ conf.set("start", DateUtils.formatDateOozieTZ(new Date()));
+ conf.set("end", DateUtils.formatDateOozieTZ(org.apache.commons.lang.time.DateUtils.addMonths(new Date(), 1)));
+
+ conf.set(OozieClient.USER_NAME, getTestUser());
+ conf.set("nominal_time", DateUtils.formatDateOozieTZ(new Date()));
+ conf.set(OozieClient.USER_NAME, getTestUser());
+ conf.set("nominal_time", DateUtils.formatDateOozieTZ(new Date()));
+ coordId = new CoordSubmitXCommand(conf).call();
+ new CoordMaterializeTransitionXCommand(coordId, 3600).call();
+ slaCalc = calc.get(coordId + "@" + 1);
+ assertFalse(Boolean.valueOf(slaCalc.getSLAConfigMap().get(OozieClient.SLA_DISABLE_ALERT)));
+
+ }
+
+ public void testSLAAlertWithNewlyCreatedActions() throws Exception {
+ Configuration conf = new XConfiguration();
+ File appPathFile = new File(getTestCaseDir(), "coordinator.xml");
+
+ // CASE 1: Failure case i.e. multiple data-in instances
+ Reader reader = IOUtils.getResourceAsReader("coord-action-sla.xml", -1);
+ Writer writer = new FileWriter(appPathFile);
+ IOUtils.copyCharStream(reader, writer);
+ conf.set(OozieClient.COORDINATOR_APP_PATH, appPathFile.toURI().toString());
+ conf.set("start", DateUtils.formatDateOozieTZ(org.apache.commons.lang.time.DateUtils.addDays(new Date(), -1)));
+ conf.set("end", DateUtils.formatDateOozieTZ(org.apache.commons.lang.time.DateUtils.addMonths(new Date(), 1)));
+ conf.set(OozieClient.USER_NAME, getTestUser());
+ reader = IOUtils.getResourceAsReader("wf-credentials.xml", -1);
+ appPathFile = new File(getTestCaseDir(), "workflow.xml");
+ writer = new FileWriter(appPathFile);
+ IOUtils.copyCharStream(reader, writer);
+ conf.set("wfAppPath", appPathFile.getPath());
+ Date nominalTime = new Date();
+ conf.set("nominal_time", DateUtils.formatDateOozieTZ(nominalTime));
+
+ String coordId = new CoordSubmitXCommand(conf).call();
+ CoordinatorJobBean job = CoordJobQueryExecutor.getInstance().get(
+ CoordJobQueryExecutor.CoordJobQuery.GET_COORD_JOB, coordId);
+ job.setMatThrottling(1);
+ CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQueryExecutor.CoordJobQuery.UPDATE_COORD_JOB, job);
+ new CoordMaterializeTransitionXCommand(coordId, 3600).call();
+ SLAService slaService = services.get(SLAService.class);
+ SLACalculator calc = slaService.getSLACalculator();
+ SLACalcStatus slaCalc = calc.get(coordId + "@" + 1);
+ assertFalse(Boolean.valueOf(slaCalc.getSLAConfigMap().get(OozieClient.SLA_DISABLE_ALERT)));
+ assertEquals(slaCalc.getExpectedDuration(), 1800000);
+ job = CoordJobQueryExecutor.getInstance().get(CoordJobQueryExecutor.CoordJobQuery.GET_COORD_JOB, coordId);
+ assertEquals(job.getLastActionNumber(), 1);
+
+ String newParams = RestConstants.SLA_MAX_DURATION + "=${5 * MINUTES}";
+
+ new CoordSLAChangeXCommand(coordId, null, null, JobUtils.parseChangeValue(newParams)).call();
+ new CoordSLAAlertsDisableXCommand(coordId, null, null).call();
+
+ job = CoordJobQueryExecutor.getInstance().get(CoordJobQueryExecutor.CoordJobQuery.GET_COORD_JOB, coordId);
+ job.setMatThrottling(2);
+ CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQueryExecutor.CoordJobQuery.UPDATE_COORD_JOB, job);
+
+ job = CoordJobQueryExecutor.getInstance().get(CoordJobQueryExecutor.CoordJobQuery.GET_COORD_JOB, coordId);
+
+ new CoordMaterializeTransitionXCommand(coordId, 3600).call();
+ job = CoordJobQueryExecutor.getInstance().get(CoordJobQueryExecutor.CoordJobQuery.GET_COORD_JOB, coordId);
+ slaCalc = calc.get(coordId + "@" + job.getLastActionNumber());
+ assertEquals(slaCalc.getExpectedDuration(), 300000);
+ // newly action should have sla disable after coord disable command on coord job
+ assertTrue(Boolean.valueOf(slaCalc.getSLAConfigMap().get(OozieClient.SLA_DISABLE_ALERT)));
+ Element eAction = XmlUtils.parseXml(job.getJobXml());
+ Element eSla = eAction.getChild("action", eAction.getNamespace()).getChild("info", eAction.getNamespace("sla"));
+ assertEquals(SLAOperations.getTagElement(eSla, "max-duration"), "${5 * MINUTES}");
+ }
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordActionQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordActionQueryExecutor.java b/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordActionQueryExecutor.java
new file mode 100644
index 0000000..85ff5d2
--- /dev/null
+++ b/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordActionQueryExecutor.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.executor.jpa;
+
+import java.util.Date;
+import java.util.List;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.CoordinatorJobBean;
+import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.client.CoordinatorJob;
+import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.test.XDataTestCase;
+import org.apache.oozie.util.DateUtils;
+
+public class TestCoordActionQueryExecutor extends XDataTestCase {
+
+ Services services;
+
+ @Override
+ protected void setUp() throws Exception {
+ super.setUp();
+ services = new Services();
+ services.init();
+ }
+
+ @Override
+ protected void tearDown() throws Exception {
+ services.destroy();
+ super.tearDown();
+ }
+
+ public void testGetTerminatedActionForDates() throws Exception {
+ int actionNum = 1;
+ CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, false, false);
+ addRecordToCoordActionTable(job.getId(), actionNum, CoordinatorAction.Status.FAILED, "coord-action-get.xml", 0);
+
+ Path appPath = new Path(getFsTestCaseDir(), "coord");
+ String actionXml = getCoordActionXml(appPath, "coord-action-get.xml");
+ String actionNomialTime = getActionNominalTime(actionXml);
+ Date nominalTime = DateUtils.parseDateOozieTZ(actionNomialTime);
+
+ Date d1 = new Date(nominalTime.getTime() - 1000);
+ Date d2 = new Date(nominalTime.getTime() + 1000);
+ _testGetTerminatedActionForDates(job.getId(), d1, d2, 1);
+
+ d1 = new Date(nominalTime.getTime() + 1000);
+ d2 = new Date(nominalTime.getTime() + 2000);
+ _testGetTerminatedActionForDates(job.getId(), d1, d2, 0);
+
+ cleanUpDBTables();
+ job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, false, false);
+ addRecordToCoordActionTable(job.getId(), actionNum, CoordinatorAction.Status.WAITING, "coord-action-get.xml", 0);
+ _testGetTerminatedActionForDates(job.getId(), d1, d2, 0);
+ }
+
+ private void _testGetTerminatedActionForDates(String jobId, Date d1, Date d2, int expected) throws Exception {
+ List<CoordinatorActionBean> actionIds = CoordActionQueryExecutor.getInstance().getList(
+ CoordActionQuery.GET_TERMINATED_ACTIONS_FOR_DATES, jobId, d1, d2);
+ assertEquals(expected, actionIds.size());
+ }
+
+ public void testGetTerminatedActionIdsForDates() throws Exception {
+ int actionNum = 1;
+ CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, false, false);
+ addRecordToCoordActionTable(job.getId(), actionNum, CoordinatorAction.Status.FAILED, "coord-action-get.xml", 0);
+
+ Path appPath = new Path(getFsTestCaseDir(), "coord");
+ String actionXml = getCoordActionXml(appPath, "coord-action-get.xml");
+ String actionNomialTime = getActionNominalTime(actionXml);
+ Date nominalTime = DateUtils.parseDateOozieTZ(actionNomialTime);
+
+ Date d1 = new Date(nominalTime.getTime() - 1000);
+ Date d2 = new Date(nominalTime.getTime() + 1000);
+ _testGetTerminatedActionIdsForDates(job.getId(), d1, d2, 1);
+
+ d1 = new Date(nominalTime.getTime() + 1000);
+ d2 = new Date(nominalTime.getTime() + 2000);
+ _testGetTerminatedActionIdsForDates(job.getId(), d1, d2, 0);
+
+ cleanUpDBTables();
+ job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, false, false);
+ addRecordToCoordActionTable(job.getId(), actionNum, CoordinatorAction.Status.WAITING, "coord-action-get.xml", 0);
+ _testGetTerminatedActionIdsForDates(job.getId(), d1, d2, 0);
+ }
+
+ private void _testGetTerminatedActionIdsForDates(String jobId, Date d1, Date d2, int expected) throws Exception {
+ List<CoordinatorActionBean> actions = CoordActionQueryExecutor.getInstance().getList(
+ CoordActionQuery.GET_TERMINATED_ACTION_IDS_FOR_DATES, jobId, d1, d2);
+ assertEquals(expected, actions.size());
+ }
+
+}