You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by vi...@apache.org on 2013/06/10 06:37:23 UTC
svn commit: r1491333 [1/2] - in /oozie/trunk: ./
client/src/main/java/org/apache/oozie/client/event/message/
core/src/main/java/org/apache/oozie/command/coord/
core/src/main/java/org/apache/oozie/command/wf/
core/src/main/java/org/apache/oozie/event/me...
Author: virag
Date: Mon Jun 10 04:37:20 2013
New Revision: 1491333
URL: http://svn.apache.org/r1491333
Log:
OOZIE-1339 Implement SLA Bootstrap Service and fix bugs in SLACalculator (virag)
Added:
oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/sla/SLARegistrationGetOnRestartJPAExecutor.java
oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/sla/SLASummaryGetRecordsOnRestartJPAExecutor.java
oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/sla/SLASummaryUpdateForSLAStatusActualTimesJPAExecutor.java
oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/sla/SLASummaryUpdateForSLAStatusJPAExecutor.java
oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLACalculatorMemory.java
oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLARegistrationGetRecordsOnRestartJPAExecutor.java
oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLASummaryGetOnRestartJPAExecutor.java
Modified:
oozie/trunk/client/src/main/java/org/apache/oozie/client/event/message/SLAMessage.java
oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionMaterializeCommand.java
oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java
oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordRerunXCommand.java
oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ReRunXCommand.java
oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SubmitXCommand.java
oozie/trunk/core/src/main/java/org/apache/oozie/event/messaging/MessageFactory.java
oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/sla/SLARegistrationGetJPAExecutor.java
oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLACalcStatus.java
oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLACalculator.java
oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java
oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLAOperations.java
oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLARegistrationBean.java
oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLASummaryBean.java
oozie/trunk/core/src/main/java/org/apache/oozie/sla/listener/SLAEmailEventListener.java
oozie/trunk/core/src/main/java/org/apache/oozie/sla/service/SLAService.java
oozie/trunk/core/src/main/resources/oozie-default.xml
oozie/trunk/core/src/test/java/org/apache/oozie/jms/TestJMSSLAEventListener.java
oozie/trunk/core/src/test/java/org/apache/oozie/servlet/TestV2SLAServlet.java
oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLACalculationJPAExecutor.java
oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLAEventGeneration.java
oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLAJobEventListener.java
oozie/trunk/release-log.txt
Modified: oozie/trunk/client/src/main/java/org/apache/oozie/client/event/message/SLAMessage.java
URL: http://svn.apache.org/viewvc/oozie/trunk/client/src/main/java/org/apache/oozie/client/event/message/SLAMessage.java?rev=1491333&r1=1491332&r2=1491333&view=diff
==============================================================================
--- oozie/trunk/client/src/main/java/org/apache/oozie/client/event/message/SLAMessage.java (original)
+++ oozie/trunk/client/src/main/java/org/apache/oozie/client/event/message/SLAMessage.java Mon Jun 10 04:37:20 2013
@@ -68,7 +68,7 @@ public class SLAMessage extends EventMes
public SLAMessage(SLAEvent.EventStatus eventStatus, SLAEvent.SLAStatus slaStatus, AppType appType, String appName,
String user, String jobId, String parentJobId, Date nominalTime, Date expectedStartTime,
- Date actualStartTime, Date expectedEndTime, Date actualEndTime, long expectedDuration,
+ Date actualStartTime, Date expectedEndTime, Date actualEndTime, long expectedDuration, long actualDuration,
String notificationMessage, String upstreamApps) {
super(MessageType.SLA, appType);
@@ -84,8 +84,7 @@ public class SLAMessage extends EventMes
this.expectedEndTime = expectedEndTime;
this.actualEndTime = actualEndTime;
this.expectedDuration = expectedDuration;
- this.actualDuration = (actualEndTime != null && actualStartTime != null) ? ( actualEndTime.getTime()
- - actualStartTime.getTime()) / (1000 * 60) : -1;
+ this.actualDuration = actualDuration;
this.notificationMessage = notificationMessage;
this.upstreamApps = upstreamApps;
}
@@ -183,7 +182,7 @@ public class SLAMessage extends EventMes
/**
* Get expected end time
*
- * @return
+ * @return expectedEndTime
*/
public Date getExpectedEndTime() {
return expectedEndTime;
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionMaterializeCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionMaterializeCommand.java?rev=1491333&r1=1491332&r2=1491333&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionMaterializeCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionMaterializeCommand.java Mon Jun 10 04:37:20 2013
@@ -279,7 +279,7 @@ public class CoordActionMaterializeComma
}
// insert into new sla reg table too
SLAOperations.createSlaRegistrationEvent(eSla, actionBean.getId(), actionBean.getJobId(),
- AppType.COORDINATOR_ACTION, user, appName, log);
+ AppType.COORDINATOR_ACTION, user, appName, log, false);
}
/**
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java?rev=1491333&r1=1491332&r2=1491333&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java Mon Jun 10 04:37:20 2013
@@ -390,7 +390,7 @@ public class CoordMaterializeTransitionX
}
// inserting into new table also
SLAOperations.createSlaRegistrationEvent(eSla, actionBean.getId(), actionBean.getJobId(),
- AppType.COORDINATOR_ACTION, coordJob.getUser(), coordJob.getAppName(), LOG);
+ AppType.COORDINATOR_ACTION, coordJob.getUser(), coordJob.getAppName(), LOG, false);
}
private void updateJobMaterializeInfo(CoordinatorJobBean job) throws CommandException {
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordRerunXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordRerunXCommand.java?rev=1491333&r1=1491332&r2=1491333&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordRerunXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordRerunXCommand.java Mon Jun 10 04:37:20 2013
@@ -49,6 +49,8 @@ import org.apache.oozie.executor.jpa.JPA
import org.apache.oozie.service.EventHandlerService;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;
+import org.apache.oozie.sla.SLAOperations;
+import org.apache.oozie.sla.service.SLAService;
import org.apache.oozie.util.InstrumentUtils;
import org.apache.oozie.util.LogUtils;
import org.apache.oozie.util.ParamChecker;
@@ -333,7 +335,9 @@ public class CoordRerunXCommand extends
refreshAction(coordJob, coordAction);
}
updateAction(coordJob, coordAction, actionXml);
-
+ if (SLAService.isEnabled()) {
+ SLAOperations.updateRegistrationEvent(coordAction.getId());
+ }
queue(new CoordActionNotificationXCommand(coordAction), 100);
queue(new CoordActionInputCheckXCommand(coordAction.getId(), coordAction.getJobId()), 100);
}
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ReRunXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ReRunXCommand.java?rev=1491333&r1=1491332&r2=1491333&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ReRunXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ReRunXCommand.java Mon Jun 10 04:37:20 2013
@@ -32,9 +32,11 @@ import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.oozie.AppType;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.WorkflowJobBean;
+import org.apache.oozie.action.oozie.SubWorkflowActionExecutor;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.client.WorkflowJob;
@@ -50,9 +52,14 @@ import org.apache.oozie.service.HadoopAc
import org.apache.oozie.service.HadoopAccessorService;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;
+import org.apache.oozie.service.UUIDService;
import org.apache.oozie.service.WorkflowAppService;
import org.apache.oozie.service.WorkflowStoreService;
+import org.apache.oozie.sla.SLAOperations;
+import org.apache.oozie.sla.service.SLAService;
import org.apache.oozie.util.ConfigUtils;
+import org.apache.oozie.util.ELEvaluator;
+import org.apache.oozie.util.ELUtils;
import org.apache.oozie.util.InstrumentUtils;
import org.apache.oozie.util.LogUtils;
import org.apache.oozie.util.ParamChecker;
@@ -65,6 +72,8 @@ import org.apache.oozie.workflow.Workflo
import org.apache.oozie.workflow.WorkflowInstance;
import org.apache.oozie.workflow.WorkflowLib;
import org.apache.oozie.workflow.lite.NodeHandler;
+import org.jdom.Element;
+import org.jdom.JDOMException;
/**
* This is a RerunXCommand which is used for rerunn.
@@ -156,6 +165,20 @@ public class ReRunXCommand extends Workf
catch (WorkflowException e) {
throw new CommandException(e);
}
+
+ if (SLAService.isEnabled()) {
+ Element wfElem = XmlUtils.parseXml(app.getDefinition());
+ ELEvaluator evalSla = SubmitXCommand.createELEvaluatorForGroup(conf, "wf-sla-submit");
+ Element eSla = XmlUtils.getSLAElement(wfElem);
+ String jobSlaXml = null;
+ if (eSla != null) {
+ jobSlaXml = SubmitXCommand.resolveSla(eSla, evalSla);
+ }
+ String appName = ELUtils.resolveAppName(app.getName(), conf);
+ writeSLARegistration(wfElem, jobSlaXml, newWfInstance.getId(),
+ conf.get(SubWorkflowActionExecutor.PARENT_ID), conf.get(OozieClient.USER_NAME), appName,
+ evalSla);
+ }
wfBean.setAppName(app.getName());
wfBean.setProtoActionConf(protoActionConf.toXmlString());
}
@@ -171,6 +194,9 @@ public class ReRunXCommand extends Workf
catch (URISyntaxException ex) {
throw new CommandException(ErrorCode.E0711, appPath, ex.getMessage(), ex);
}
+ catch (Exception ex) {
+ throw new CommandException(ErrorCode.E1007, ex.getMessage(), ex);
+ }
for (int i = 0; i < actions.size(); i++) {
if (!nodesToSkip.contains(actions.get(i).getName())) {
@@ -207,6 +233,33 @@ public class ReRunXCommand extends Workf
return null;
}
+
+ @SuppressWarnings("unchecked")
+ private void writeSLARegistration(Element wfElem, String jobSlaXml, String id, String parentId, String user,
+ String appName, ELEvaluator evalSla) throws JDOMException, CommandException {
+ if (jobSlaXml != null && jobSlaXml.length() > 0) {
+ Element eSla = XmlUtils.parseXml(jobSlaXml);
+ // insert into new table
+ SLAOperations.createSlaRegistrationEvent(eSla, jobId, parentId, AppType.WORKFLOW_JOB, user, appName, LOG,
+ true);
+ }
+ // Add sla for wf actions
+ for (Element action : (List<Element>) wfElem.getChildren("action", wfElem.getNamespace())) {
+ Element actionSla = XmlUtils.getSLAElement(action);
+ if (actionSla != null) {
+ String actionSlaXml = SubmitXCommand.resolveSla(actionSla, evalSla);
+ actionSla = XmlUtils.parseXml(actionSlaXml);
+ if (!nodesToSkip.contains(action.getAttributeValue("name"))) {
+ String actionId = Services.get().get(UUIDService.class)
+ .generateChildId(jobId, action.getAttributeValue("name") + "");
+ SLAOperations.createSlaRegistrationEvent(actionSla, actionId, jobId, AppType.WORKFLOW_ACTION, user,
+ appName, LOG, true);
+ }
+ }
+ }
+
+ }
+
/**
* Loading the Wfjob and workflow actions. Parses the config and adds the nodes that are to be skipped to the
* skipped node list
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SubmitXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SubmitXCommand.java?rev=1491333&r1=1491332&r2=1491333&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SubmitXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SubmitXCommand.java Mon Jun 10 04:37:20 2013
@@ -271,7 +271,7 @@ public class SubmitXCommand extends Work
}
// insert into new table
SLAOperations.createSlaRegistrationEvent(eSla, jobId, parentId, AppType.WORKFLOW_JOB, user, appName,
- log);
+ log, false);
}
// Add sla for wf actions
for (Element action : (List<Element>) eWfJob.getChildren("action", eWfJob.getNamespace())) {
@@ -282,7 +282,7 @@ public class SubmitXCommand extends Work
String actionId = Services.get().get(UUIDService.class)
.generateChildId(jobId, action.getAttributeValue("name") + "");
SLAOperations.createSlaRegistrationEvent(actionSla, actionId, jobId, AppType.WORKFLOW_ACTION,
- user, appName, log);
+ user, appName, log, false);
}
}
}
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/event/messaging/MessageFactory.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/event/messaging/MessageFactory.java?rev=1491333&r1=1491332&r2=1491333&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/event/messaging/MessageFactory.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/event/messaging/MessageFactory.java Mon Jun 10 04:37:20 2013
@@ -99,7 +99,7 @@ public class MessageFactory {
SLAMessage slaMessage = new SLAMessage(event.getEventStatus(), event.getSLAStatus(), event.getAppType(),
event.getAppName(), event.getUser(), event.getId(), event.getParentId(), event.getNominalTime(),
event.getExpectedStart(), event.getActualStart(), event.getExpectedEnd(), event.getActualEnd(),
- event.getExpectedDuration(), event.getNotificationMsg(), event.getUpstreamApps());
+ event.getExpectedDuration(), event.getActualDuration(), event.getNotificationMsg(), event.getUpstreamApps());
return slaMessage;
}
}
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/sla/SLARegistrationGetJPAExecutor.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/sla/SLARegistrationGetJPAExecutor.java?rev=1491333&r1=1491332&r2=1491333&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/sla/SLARegistrationGetJPAExecutor.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/sla/SLARegistrationGetJPAExecutor.java Mon Jun 10 04:37:20 2013
@@ -17,17 +17,13 @@
*/
package org.apache.oozie.executor.jpa.sla;
-import java.sql.Timestamp;
-
import javax.persistence.EntityManager;
import javax.persistence.Query;
-import org.apache.oozie.AppType;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.executor.jpa.JPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.sla.SLARegistrationBean;
-import org.apache.oozie.util.DateUtils;
/**
* Load the list of SLARegistrationBean and return the list.
@@ -50,57 +46,13 @@ public class SLARegistrationGetJPAExecut
try {
Query q = em.createNamedQuery("GET_SLA_REG_ALL");
q.setParameter("id", id);
- Object[] obj = (Object[]) q.getSingleResult();
- return getBeanFromObj(obj);
+ SLARegistrationBean slaRegBean= (SLARegistrationBean) q.getSingleResult();
+ slaRegBean.setSlaConfig(slaRegBean.getSlaConfig());
+ return slaRegBean;
}
catch (Exception e) {
throw new JPAExecutorException(ErrorCode.E0603, e.getMessage(), e);
}
}
- private SLARegistrationBean getBeanFromObj(Object[] arr) {
- SLARegistrationBean bean = new SLARegistrationBean();
- if (arr[0] != null) {
- bean.setJobId((String) arr[0]);
- }
- if (arr[1] != null) {
- bean.setAppType(AppType.valueOf((String) arr[1]));
- }
- if (arr[2] != null) {
- bean.setAppName((String) arr[2]);
- }
- if (arr[3] != null) {
- bean.setUser((String) arr[3]);
- }
- if (arr[4] != null) {
- bean.setNominalTime(DateUtils.toDate((Timestamp) arr[4]));
- }
- if (arr[5] != null) {
- bean.setExpectedStart(DateUtils.toDate((Timestamp) arr[5]));
- }
- if (arr[6] != null) {
- bean.setExpectedEnd(DateUtils.toDate((Timestamp) arr[6]));
- }
- if (arr[7] != null) {
- bean.setExpectedDuration((Long) arr[7]);
- }
- if (arr[8] != null) {
- bean.setJobData((String) arr[8]);
- }
- if (arr[9] != null) {
- bean.setParentId((String) arr[9]);
- }
- if (arr[10] != null) {
- bean.setNotificationMsg((String) arr[10]);
- }
- if (arr[11] != null) {
- bean.setUpstreamApps((String) arr[11]);
- }
- if (arr[12] != null) {
- bean.setSlaConfig((String) arr[12]);
- }
-
- return bean;
- }
-
}
Added: oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/sla/SLARegistrationGetOnRestartJPAExecutor.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/sla/SLARegistrationGetOnRestartJPAExecutor.java?rev=1491333&view=auto
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/sla/SLARegistrationGetOnRestartJPAExecutor.java (added)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/sla/SLARegistrationGetOnRestartJPAExecutor.java Mon Jun 10 04:37:20 2013
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.executor.jpa.sla;
+
+import javax.persistence.EntityManager;
+import javax.persistence.Query;
+
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.executor.jpa.JPAExecutor;
+import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.sla.SLARegistrationBean;
+
+/**
+ * Load SLARegistrationBean on restart
+ */
+public class SLARegistrationGetOnRestartJPAExecutor implements JPAExecutor<SLARegistrationBean> {
+
+ private String id;
+
+ public SLARegistrationGetOnRestartJPAExecutor(String id) {
+ this.id = id;
+ }
+
+ @Override
+ public String getName() {
+ return "SLARegistrationGetOnRestartJPAExecutor";
+ }
+
+ @Override
+ public SLARegistrationBean execute(EntityManager em) throws JPAExecutorException {
+ try {
+ Query q = em.createNamedQuery("GET_SLA_REG_ON_RESTART");
+ q.setParameter("id", id);
+ Object[] obj = (Object[]) q.getSingleResult();
+ return getBeanFromObj(obj);
+ }
+ catch (Exception e) {
+ throw new JPAExecutorException(ErrorCode.E0603, e.getMessage(), e);
+ }
+ }
+
+ private SLARegistrationBean getBeanFromObj(Object[] arr) {
+ SLARegistrationBean bean = new SLARegistrationBean();
+ if (arr[0] != null) {
+ bean.setNotificationMsg((String) arr[0]);
+ }
+ if (arr[1] != null) {
+ bean.setUpstreamApps((String) arr[1]);
+ }
+ if (arr[2] != null) {
+ bean.setSlaConfig((String) arr[2]);
+ }
+ if (arr[3] != null) {
+ bean.setJobData((String) arr[3]);
+ }
+ return bean;
+ }
+
+}
Added: oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/sla/SLASummaryGetRecordsOnRestartJPAExecutor.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/sla/SLASummaryGetRecordsOnRestartJPAExecutor.java?rev=1491333&view=auto
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/sla/SLASummaryGetRecordsOnRestartJPAExecutor.java (added)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/sla/SLASummaryGetRecordsOnRestartJPAExecutor.java Mon Jun 10 04:37:20 2013
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.executor.jpa.sla;
+
+import java.sql.Timestamp;
+import java.util.List;
+
+import javax.persistence.EntityManager;
+import javax.persistence.Query;
+
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.executor.jpa.JPAExecutor;
+import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.sla.SLASummaryBean;
+
+/**
+ * Load the list of SLASummaryBean when Oozie restarts and return the list.
+ */
+public class SLASummaryGetRecordsOnRestartJPAExecutor implements JPAExecutor<List<SLASummaryBean>> {
+
+ private int days;
+ public SLASummaryGetRecordsOnRestartJPAExecutor (int days) {
+ this.days = days;
+ }
+
+ @Override
+ public String getName() {
+ return "SLASummaryGetRecordsOnRestartJPAExecutor";
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public List<SLASummaryBean> execute(EntityManager em) throws JPAExecutorException {
+ List<SLASummaryBean> ssBean;
+ try {
+ Query q = em.createNamedQuery("GET_SLA_SUMMARY_RECORDS_RESTART");
+ Timestamp ts = new Timestamp(System.currentTimeMillis() - days * 24 * 60 * 60 * 1000);
+ q.setParameter("lastModifiedTime", ts);
+ ssBean = q.getResultList();
+ return ssBean;
+ }
+ catch (Exception e) {
+ throw new JPAExecutorException(ErrorCode.E0603, e.getMessage(), e);
+ }
+ }
+
+}
Added: oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/sla/SLASummaryUpdateForSLAStatusActualTimesJPAExecutor.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/sla/SLASummaryUpdateForSLAStatusActualTimesJPAExecutor.java?rev=1491333&view=auto
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/sla/SLASummaryUpdateForSLAStatusActualTimesJPAExecutor.java (added)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/sla/SLASummaryUpdateForSLAStatusActualTimesJPAExecutor.java Mon Jun 10 04:37:20 2013
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.executor.jpa.sla;
+
+import java.util.Date;
+
+import javax.persistence.EntityManager;
+import javax.persistence.Query;
+
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.executor.jpa.JPAExecutor;
+import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.sla.SLASummaryBean;
+import org.apache.oozie.util.ParamChecker;
+
+/**
+ * Update the slaStatus, eventStatus, eventProcessed, jobStatus, actualStart,
+ * actualEnd, actualDuration and lastModifiedTime of SLAsummaryBean
+ */
+public class SLASummaryUpdateForSLAStatusActualTimesJPAExecutor implements JPAExecutor<Void> {
+
+ private SLASummaryBean slaSummaryBean = null;
+
+ public SLASummaryUpdateForSLAStatusActualTimesJPAExecutor(SLASummaryBean slaSummaryBean) {
+ ParamChecker.notNull(slaSummaryBean, "slaSummaryBean");
+ this.slaSummaryBean = slaSummaryBean;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.oozie.executor.jpa.JPAExecutor#execute(javax.persistence.
+ * EntityManager)
+ */
+ @Override
+ public Void execute(EntityManager em) throws JPAExecutorException {
+ try {
+ Query q = em.createNamedQuery("UPDATE_SLA_SUMMARY_FOR_STATUS_ACTUAL_TIMES");
+ q.setParameter("jobId", slaSummaryBean.getJobId());
+ q.setParameter("slaStatus", slaSummaryBean.getSLAStatusString());
+ q.setParameter("lastModifiedTS", new Date());
+ q.setParameter("eventStatus", slaSummaryBean.getEventStatusString());
+ q.setParameter("jobStatus", slaSummaryBean.getJobStatus());
+ q.setParameter("eventProcessed", slaSummaryBean.getEventProcessed());
+ q.setParameter("actualStartTS", slaSummaryBean.getActualStart());
+ q.setParameter("actualEndTS", slaSummaryBean.getActualEnd());
+ q.setParameter("actualDuration", slaSummaryBean.getActualDuration());
+ q.executeUpdate();
+ return null;
+ }
+ catch (Exception e) {
+ throw new JPAExecutorException(ErrorCode.E0603, e.getMessage(), e);
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.oozie.executor.jpa.JPAExecutor#getName()
+ */
+ @Override
+ public String getName() {
+ return "SLASummaryUpdateForSLAStatusActualTimesJPAExecutor";
+ }
+}
Added: oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/sla/SLASummaryUpdateForSLAStatusJPAExecutor.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/sla/SLASummaryUpdateForSLAStatusJPAExecutor.java?rev=1491333&view=auto
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/sla/SLASummaryUpdateForSLAStatusJPAExecutor.java (added)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/sla/SLASummaryUpdateForSLAStatusJPAExecutor.java Mon Jun 10 04:37:20 2013
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.executor.jpa.sla;
+
+import java.util.Date;
+
+import javax.persistence.EntityManager;
+import javax.persistence.Query;
+
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.executor.jpa.JPAExecutor;
+import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.sla.SLASummaryBean;
+import org.apache.oozie.util.ParamChecker;
+
+/**
+ * Update the slaStatus, eventStatus, eventProcessed and lastModifiedTime of SLAsummaryBean
+ */
+public class SLASummaryUpdateForSLAStatusJPAExecutor implements JPAExecutor<Void> {
+
+ private SLASummaryBean slaSummaryBean = null;
+
+ public SLASummaryUpdateForSLAStatusJPAExecutor(SLASummaryBean slaSummaryBean) {
+ ParamChecker.notNull(slaSummaryBean, "slaSummaryBean");
+ this.slaSummaryBean = slaSummaryBean;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.oozie.executor.jpa.JPAExecutor#execute(javax.persistence.
+ * EntityManager)
+ */
+ @Override
+ public Void execute(EntityManager em) throws JPAExecutorException {
+ try {
+ Query q = em.createNamedQuery("UPDATE_SLA_SUMMARY_FOR_SLA_STATUS");
+
+ q.setParameter("jobId", slaSummaryBean.getJobId());
+ q.setParameter("slaStatus", slaSummaryBean.getSLAStatusString());
+ q.setParameter("lastModifiedTS", new Date());
+ q.setParameter("eventStatus", slaSummaryBean.getEventStatusString());
+ q.setParameter("eventProcessed", slaSummaryBean.getEventProcessed());
+ q.executeUpdate();
+ return null;
+ }
+ catch (Exception e) {
+ throw new JPAExecutorException(ErrorCode.E0603, e.getMessage(), e);
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.oozie.executor.jpa.JPAExecutor#getName()
+ */
+ @Override
+ public String getName() {
+ return "SLASummaryUpdateForSLAStatusJPAExecutor";
+ }
+}
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLACalcStatus.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLACalcStatus.java?rev=1491333&r1=1491332&r2=1491333&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLACalcStatus.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLACalcStatus.java Mon Jun 10 04:37:20 2013
@@ -34,9 +34,8 @@ public class SLACalcStatus extends SLAEv
private EventStatus eventStatus;
private Date actualStart;
private Date actualEnd;
- private long actualDuration;
+ private long actualDuration = -1;
private Date lastModifiedTime;
- private byte slaProcessed;
private byte eventProcessed;
public SLACalcStatus(SLARegistrationBean reg) {
@@ -44,9 +43,16 @@ public class SLACalcStatus extends SLAEv
setSLARegistrationBean(reg);
}
- public SLACalcStatus(SLASummaryBean summary) {
+ public SLACalcStatus(SLASummaryBean summary, SLARegistrationBean regBean) {
this();
SLARegistrationBean reg = new SLARegistrationBean();
+ reg.setNotificationMsg(regBean.getNotificationMsg());
+ reg.setUpstreamApps(regBean.getUpstreamApps());
+ reg.setAlertContact(regBean.getAlertContact());
+ reg.setAlertEvents(regBean.getAlertEvents());
+ reg.setJobData(regBean.getJobData());
+ reg.setJobId(summary.getJobId());
+ reg.setAppType(summary.getAppType());
reg.setUser(summary.getUser());
reg.setAppName(summary.getAppName());
reg.setParentId(summary.getParentId());
@@ -54,17 +60,15 @@ public class SLACalcStatus extends SLAEv
reg.setExpectedStart(summary.getExpectedStart());
reg.setExpectedEnd(summary.getExpectedEnd());
reg.setExpectedDuration(summary.getExpectedDuration());
+ setSLARegistrationBean(reg);
setActualStart(summary.getActualStart());
setActualEnd(summary.getActualEnd());
setActualDuration(summary.getActualDuration());
setSLAStatus(summary.getSLAStatus());
- setSLARegistrationBean(reg);
- setJobId(reg.getId());
setJobStatus(summary.getJobStatus());
setEventStatus(summary.getEventStatus());
setLastModifiedTime(summary.getLastModifiedTime());
- setSlaProcessed(summary.getSlaProcessed());
- setEventProcessed((byte) summary.getEventStatus().ordinal());
+ setEventProcessed(summary.getEventProcessed());
}
/**
@@ -81,15 +85,11 @@ public class SLACalcStatus extends SLAEv
setActualEnd(a.getActualEnd());
setActualDuration(a.getActualDuration());
setLastModifiedTime(a.getLastModifiedTime());
- setSlaProcessed(a.getSlaProcessed());
setEventProcessed(a.getEventProcessed());
}
public SLACalcStatus() {
setMsgType(MessageType.SLA);
- setSlaProcessed((byte) 0);
- setEventProcessed((byte) 0);
- setActualDuration(-1);
setLastModifiedTime(new Date());
}
@@ -173,28 +173,13 @@ public class SLACalcStatus extends SLAEv
}
/**
- * Get whether sla has been processed
- * and actual times stored for this job
- *
- * @return 0 = sla not processed
- * 1 = sla processed but actual times not stored
- * 2 = sla processed as well as actual times stored
- */
- public byte getSlaProcessed() {
- return slaProcessed;
- }
-
- public void setSlaProcessed(int slaProcessed) {
- this.slaProcessed = (byte) slaProcessed;
- }
-
- /**
* Get which type of sla event has been processed needed when calculator
* periodically loops to update all jobs' sla
*
* @return byte 1st bit set (from LSB) = start processed
* 2nd bit set = duration processed
* 3rd bit set = end processed
+ * only 4th bit set = everything processed
*/
public byte getEventProcessed() {
return eventProcessed;
@@ -279,4 +264,9 @@ public class SLACalcStatus extends SLAEv
return regBean.getMsgType();
}
+ @Override
+ public String toString() {
+ return "ID: " + getId() + " SLAStatus: " + slaStatus + " EventProcessed: "+eventProcessed;
+ }
+
}
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLACalculator.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLACalculator.java?rev=1491333&r1=1491332&r2=1491333&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLACalculator.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLACalculator.java Mon Jun 10 04:37:20 2013
@@ -48,4 +48,6 @@ public interface SLACalculator {
SLACalcStatus get(String jobId);
+ boolean updateRegistration(String jobId, SLARegistrationBean reg) throws JPAExecutorException;
+
}
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java?rev=1491333&r1=1491332&r2=1491333&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java Mon Jun 10 04:37:20 2013
@@ -36,7 +36,11 @@ import org.apache.oozie.client.event.SLA
import org.apache.oozie.client.rest.JsonBean;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.executor.jpa.sla.SLACalculationInsertUpdateJPAExecutor;
+import org.apache.oozie.executor.jpa.sla.SLARegistrationGetOnRestartJPAExecutor;
import org.apache.oozie.executor.jpa.sla.SLASummaryGetJPAExecutor;
+import org.apache.oozie.executor.jpa.sla.SLASummaryGetRecordsOnRestartJPAExecutor;
+import org.apache.oozie.executor.jpa.sla.SLASummaryUpdateForSLAStatusActualTimesJPAExecutor;
+import org.apache.oozie.executor.jpa.sla.SLASummaryUpdateForSLAStatusJPAExecutor;
import org.apache.oozie.service.EventHandlerService;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.ServiceException;
@@ -53,16 +57,48 @@ public class SLACalculatorMemory impleme
private static Map<String, SLACalcStatus> slaMap;
private static Set<String> historySet;
private static int capacity;
- private static JPAService jpa;
+ private static JPAService jpaService;
private EventHandlerService eventHandler;
+ private static int modifiedAfter;
@Override
public void init(Configuration conf) throws ServiceException {
capacity = conf.getInt(SLAService.CONF_CAPACITY, 5000);
slaMap = new ConcurrentHashMap<String, SLACalcStatus>();
historySet = Collections.synchronizedSet(new HashSet<String>());
- jpa = Services.get().get(JPAService.class);
+ jpaService = Services.get().get(JPAService.class);
eventHandler = Services.get().get(EventHandlerService.class);
+ // load events modified after
+ modifiedAfter = conf.getInt(SLAService.CONF_EVENTS_MODIFIED_AFTER, 7);
+ loadOnRestart();
+
+ }
+
+ private void loadOnRestart() {
+ try {
+ List<SLASummaryBean> summaryBeans = jpaService.execute(new SLASummaryGetRecordsOnRestartJPAExecutor(
+ modifiedAfter));
+ for (SLASummaryBean summaryBean : summaryBeans) {
+ String jobId = summaryBean.getJobId();
+ if (summaryBean.getEventProcessed() == 7) {
+ historySet.add(jobId);
+ }
+ else {
+ try {
+ SLARegistrationBean slaRegBean = jpaService.execute(new SLARegistrationGetOnRestartJPAExecutor(
+ jobId));
+ SLACalcStatus slaCalcStatus = new SLACalcStatus(summaryBean, slaRegBean);
+ slaMap.put(jobId, slaCalcStatus);
+ }
+ catch (JPAExecutorException e) {
+ XLog.getLog(SLAService.class).warn("Cannot retrieve registration record for " + jobId, e);
+ }
+ }
+ }
+ }
+ catch (JPAExecutorException e) {
+ XLog.getLog(SLAService.class).warn("Failed to retrieve SLASummary records on restart", e);
+ }
}
@Override
@@ -101,7 +137,6 @@ public class SLACalculatorMemory impleme
@Override
public void updateSlaStatus(String jobId) throws JPAExecutorException, ServiceException {
SLACalcStatus slaCalc = slaMap.get(jobId);
- List<JsonBean> updateList = new ArrayList<JsonBean>();
synchronized (slaCalc) {
boolean change = false;
byte eventProc = slaCalc.getEventProcessed();
@@ -119,10 +154,11 @@ public class SLACalculatorMemory impleme
}
else {
eventProc++; //disable further processing for optional start sla condition
+ change = true;
}
+
}
- byte eventProcCopy = eventProc;
- if (((eventProcCopy >> 1) & 1) == 0) { // check if second bit (duration-processed) is unset
+ if (((eventProc >> 1) & 1) == 0) { // check if second bit (duration-processed) is unset
if (slaCalc.getActualStart() != null) {
if (reg.getExpectedDuration() < Calendar.getInstance(TimeZone.getTimeZone("UTC")).getTimeInMillis()
- slaCalc.getActualStart().getTime()) {
@@ -133,12 +169,11 @@ public class SLACalculatorMemory impleme
}
}
}
- if (eventProc < 5) {
+ if (eventProc < 4) {
if (reg.getExpectedEnd().getTime() < Calendar.getInstance(TimeZone.getTimeZone("UTC"))
.getTimeInMillis()) {
slaCalc.setEventStatus(EventStatus.END_MISS);
slaCalc.setSLAStatus(SLAStatus.MISS);
- slaCalc.setSlaProcessed(1);
change = true;
eventHandler.queueEvent(new SLACalcStatus(slaCalc));
eventProc += 4;
@@ -146,14 +181,18 @@ public class SLACalculatorMemory impleme
}
if (change) {
slaCalc.setEventProcessed(eventProc);
- slaCalc.setLastModifiedTime(new Date());
- updateList.add(new SLASummaryBean(slaCalc));
- jpa.execute(new SLACalculationInsertUpdateJPAExecutor(null, updateList));
- if (slaCalc.getEventProcessed() == 7 && slaCalc.getSlaProcessed() != 2) {
+ SLASummaryBean slaSummaryBean = new SLASummaryBean();
+ slaSummaryBean.setJobId(slaCalc.getId());
+ slaSummaryBean.setEventProcessed(eventProc);
+ slaSummaryBean.setSLAStatus(slaCalc.getSLAStatus());
+ slaSummaryBean.setEventStatus(slaCalc.getEventStatus());
+ jpaService.execute(new SLASummaryUpdateForSLAStatusJPAExecutor(slaSummaryBean));
+ if (eventProc == 7) {
historySet.add(jobId);
slaMap.remove(jobId);
XLog.getLog(SLAService.class).trace("Removed Job [{0}] from map after End-processed", jobId);
}
+
}
}
}
@@ -181,22 +220,51 @@ public class SLACalculatorMemory impleme
*/
@Override
public boolean addRegistration(String jobId, SLARegistrationBean reg) throws JPAExecutorException {
- List<JsonBean> insertList = new ArrayList<JsonBean>();
try {
if (slaMap.size() < capacity) {
SLACalcStatus slaCalc = new SLACalcStatus(reg);
slaCalc.setSLAStatus(SLAStatus.NOT_STARTED);
slaMap.put(jobId, slaCalc);
+ List<JsonBean> insertList = new ArrayList<JsonBean>();
insertList.add(reg);
insertList.add(new SLASummaryBean(slaCalc));
- jpa.execute(new SLACalculationInsertUpdateJPAExecutor(insertList, null));
+ jpaService.execute(new SLACalculationInsertUpdateJPAExecutor(insertList, null));
XLog.getLog(SLAService.class).trace("SLA Registration Event - Job:" + jobId);
return true;
}
else {
- XLog.getLog(SLAService.class).error(
- "SLACalculator memory capacity reached. Cannot add new SLA Registration entry for job [{0}]",
- reg.getId());
+ XLog.getLog(SLAService.class)
+ .error("SLACalculator memory capacity reached. Cannot add or update new SLA Registration entry for job [{0}]",
+ reg.getId());
+ }
+ }
+ catch (JPAExecutorException jpa) {
+ throw jpa;
+ }
+ return false;
+ }
+
+ /**
+ * Update job into the map for SLA tracking
+ */
+ @Override
+ public boolean updateRegistration(String jobId, SLARegistrationBean reg) throws JPAExecutorException {
+ try {
+ if (slaMap.size() < capacity) {
+ SLACalcStatus slaCalc = new SLACalcStatus(reg);
+ slaCalc.setSLAStatus(SLAStatus.NOT_STARTED);
+ slaMap.put(jobId, slaCalc);
+ List<JsonBean> updateList = new ArrayList<JsonBean>();
+ updateList.add(reg);
+ updateList.add(new SLASummaryBean(slaCalc));
+ jpaService.execute(new SLACalculationInsertUpdateJPAExecutor(null, updateList));
+ XLog.getLog(SLAService.class).trace("SLA Registration Event - Job:" + jobId);
+ return true;
+ }
+ else {
+ XLog.getLog(SLAService.class)
+ .error("SLACalculator memory capacity reached. Cannot add or update new SLA Registration entry for job [{0}]",
+ reg.getId());
}
}
catch (JPAExecutorException jpa) {
@@ -213,62 +281,50 @@ public class SLACalculatorMemory impleme
public boolean addJobStatus(String jobId, String jobStatus, JobEvent.EventStatus jobEventStatus, Date startTime,
Date endTime) throws JPAExecutorException, ServiceException {
SLACalcStatus slaCalc = slaMap.get(jobId);
- List<JsonBean> updateList = new ArrayList<JsonBean>();
SLASummaryBean slaInfo = null;
boolean hasSla = false;
if (slaCalc != null) {
synchronized (slaCalc) {
- byte eventProc = slaCalc.getEventProcessed();
slaCalc.setJobStatus(jobStatus);
switch (jobEventStatus) {
case STARTED:
slaInfo = processJobStartSLA(slaCalc, startTime);
- eventProc += 1;
break;
case SUCCESS:
slaInfo = processJobEndSuccessSLA(slaCalc, startTime, endTime);
- slaInfo.setSlaProcessed(2);
- eventProc += 6;
break;
case FAILURE:
slaInfo = processJobEndFailureSLA(slaCalc, startTime, endTime);
- slaInfo.setSlaProcessed(2);
- eventProc += 6;
break;
default:
XLog.getLog(SLAService.class).debug("Unknown Job Status [{0}]", jobEventStatus);
return false;
}
- slaCalc.setEventProcessed(eventProc);
- slaInfo.setLastModifiedTime(new Date());
- if (slaInfo.getSlaProcessed() == 2) {
+
+ if (slaCalc.getEventProcessed() == 7) {
+ slaInfo.setEventProcessed(8);
slaMap.remove(jobId);
}
hasSla = true;
}
+ XLog.getLog(SLAService.class).trace("SLA Status Event - Job:" + jobId + " Status:" + slaCalc.getSLAStatus());
}
else if (historySet.contains(jobId)) {
- slaInfo = jpa.execute(new SLASummaryGetJPAExecutor(jobId));
+ slaInfo = jpaService.execute(new SLASummaryGetJPAExecutor(jobId));
slaInfo.setJobStatus(jobStatus);
slaInfo.setActualStart(startTime);
slaInfo.setActualEnd(endTime);
if (endTime != null) {
slaInfo.setActualDuration(endTime.getTime() - startTime.getTime());
}
- slaInfo.setSlaProcessed(2);
+ slaInfo.setEventProcessed(8);
historySet.remove(jobId);
hasSla = true;
}
+
if (hasSla) {
- slaInfo.setLastModifiedTime(new Date());
- updateList.add(slaInfo);
- if (jpa != null) {
- jpa.execute(new SLACalculationInsertUpdateJPAExecutor(null, updateList));
- }
- XLog.getLog(SLAService.class)
- .trace("SLA Status Event - Job:" + jobId + " Status:" + slaCalc.getSLAStatus());
+ jpaService.execute(new SLASummaryUpdateForSLAStatusActualTimesJPAExecutor(slaInfo));
}
-
return hasSla;
}
@@ -281,20 +337,27 @@ public class SLACalculatorMemory impleme
*/
public SLASummaryBean processJobStartSLA(SLACalcStatus slaCalc, Date actualStart) {
slaCalc.setActualStart(actualStart);
- slaCalc.setSLAStatus(SLAStatus.IN_PROCESS);
+ if (slaCalc.getSLAStatus().equals(SLAStatus.NOT_STARTED)) {
+ slaCalc.setSLAStatus(SLAStatus.IN_PROCESS);
+ }
SLARegistrationBean reg = slaCalc.getSLARegistrationBean();
Date expecStart = reg.getExpectedStart();
byte eventProc = slaCalc.getEventProcessed();
- if (((eventProc & 1) == 0) && expecStart != null) {
- if (actualStart.getTime() > expecStart.getTime()) {
- slaCalc.setEventStatus(EventStatus.START_MISS);
- }
- else {
- slaCalc.setEventStatus(EventStatus.START_MET);
+ // set event proc here
+ if (((eventProc & 1) == 0)) {
+ if (expecStart != null) {
+ if (actualStart.getTime() > expecStart.getTime()) {
+ slaCalc.setEventStatus(EventStatus.START_MISS);
+ }
+ else {
+ slaCalc.setEventStatus(EventStatus.START_MET);
+ }
+ eventHandler.queueEvent(new SLACalcStatus(slaCalc));
}
- eventHandler.queueEvent(new SLACalcStatus(slaCalc));
+ eventProc += 1;
+ slaCalc.setEventProcessed(eventProc);
}
- return new SLASummaryBean(slaCalc);
+ return getSLASummaryBean(slaCalc);
}
/**
@@ -313,25 +376,35 @@ public class SLACalculatorMemory impleme
long expectedDuration = reg.getExpectedDuration();
long actualDuration = actualEnd.getTime() - actualStart.getTime();
slaCalc.setActualDuration(actualDuration);
- if (actualDuration > expectedDuration) {
- slaCalc.setEventStatus(EventStatus.DURATION_MISS);
- }
- else {
- slaCalc.setEventStatus(EventStatus.DURATION_MET);
+ //check event proc
+ byte eventProc = slaCalc.getEventProcessed();
+ if (((eventProc >> 1) & 1) == 0) {
+ if (actualDuration > expectedDuration) {
+ slaCalc.setEventStatus(EventStatus.DURATION_MISS);
+ }
+ else {
+ slaCalc.setEventStatus(EventStatus.DURATION_MET);
+ }
+ eventProc += 2;
+ slaCalc.setEventProcessed(eventProc);
+ eventHandler.queueEvent(new SLACalcStatus(slaCalc));
}
- eventHandler.queueEvent(new SLACalcStatus(slaCalc));
- Date expectedEnd = reg.getExpectedEnd();
- if (actualEnd.getTime() > expectedEnd.getTime()) {
- slaCalc.setEventStatus(EventStatus.END_MISS);
- slaCalc.setSLAStatus(SLAStatus.MISS);
- }
- else {
- slaCalc.setEventStatus(EventStatus.END_MET);
- slaCalc.setSLAStatus(SLAStatus.MET);
+ if (eventProc < 4) {
+ Date expectedEnd = reg.getExpectedEnd();
+ if (actualEnd.getTime() > expectedEnd.getTime()) {
+ slaCalc.setEventStatus(EventStatus.END_MISS);
+ slaCalc.setSLAStatus(SLAStatus.MISS);
+ }
+ else {
+ slaCalc.setEventStatus(EventStatus.END_MET);
+ slaCalc.setSLAStatus(SLAStatus.MET);
+ }
+ eventProc += 4;
+ slaCalc.setEventProcessed(eventProc);
+ eventHandler.queueEvent(new SLACalcStatus(slaCalc));
}
- eventHandler.queueEvent(new SLACalcStatus(slaCalc));
- return new SLASummaryBean(slaCalc);
+ return getSLASummaryBean(slaCalc);
}
/**
@@ -344,25 +417,46 @@ public class SLACalculatorMemory impleme
* @return SLASummaryBean
*/
public SLASummaryBean processJobEndFailureSLA(SLACalcStatus slaCalc, Date actualStart, Date actualEnd) {
- SLASummaryBean summ;
slaCalc.setActualStart(actualStart);
slaCalc.setActualEnd(actualEnd);
SLARegistrationBean reg = slaCalc.getSLARegistrationBean();
long expectedDuration = reg.getExpectedDuration();
long actualDuration = actualEnd.getTime() - actualStart.getTime();
slaCalc.setActualDuration(actualDuration);
- if (actualDuration > expectedDuration) {
- slaCalc.setEventStatus(EventStatus.DURATION_MISS);
+
+ byte eventProc = slaCalc.getEventProcessed();
+ if (((eventProc >> 1) & 1) == 0) {
+ if (actualDuration > expectedDuration) {
+ slaCalc.setEventStatus(EventStatus.DURATION_MISS);
+ }
+ else {
+ slaCalc.setEventStatus(EventStatus.DURATION_MET);
+ }
+ eventProc += 2;
+ slaCalc.setEventProcessed(eventProc);
eventHandler.queueEvent(new SLACalcStatus(slaCalc));
}
- else {
- slaCalc.setEventStatus(EventStatus.DURATION_MET);
+ if (eventProc < 4) {
+ slaCalc.setEventStatus(EventStatus.END_MISS);
+ slaCalc.setSLAStatus(SLAStatus.MISS);
+ eventProc += 4;
+ slaCalc.setEventProcessed(eventProc);
+ eventHandler.queueEvent(new SLACalcStatus(slaCalc));
}
- slaCalc.setEventStatus(EventStatus.END_MISS);
- slaCalc.setSLAStatus(SLAStatus.MISS);
- eventHandler.queueEvent(new SLACalcStatus(slaCalc));
- summ = new SLASummaryBean(slaCalc);
- return summ;
+ return getSLASummaryBean(slaCalc);
+ }
+
+ private SLASummaryBean getSLASummaryBean (SLACalcStatus slaCalc) {
+ SLASummaryBean slaSummaryBean = new SLASummaryBean();
+ slaSummaryBean.setActualStart(slaCalc.getActualStart());
+ slaSummaryBean.setActualEnd(slaCalc.getActualEnd());
+ slaSummaryBean.setActualDuration(slaCalc.getActualDuration());
+ slaSummaryBean.setSLAStatus(slaCalc.getSLAStatus());
+ slaSummaryBean.setEventStatus(slaCalc.getEventStatus());
+ slaSummaryBean.setEventProcessed(slaCalc.getEventProcessed());
+ slaSummaryBean.setJobId(slaCalc.getId());
+ slaSummaryBean.setJobStatus(slaCalc.getJobStatus());
+ return slaSummaryBean;
}
}
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLAOperations.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLAOperations.java?rev=1491333&r1=1491332&r2=1491333&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLAOperations.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLAOperations.java Mon Jun 10 04:37:20 2013
@@ -24,6 +24,9 @@ import org.apache.oozie.AppType;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.client.event.SLAEvent.EventStatus;
import org.apache.oozie.command.CommandException;
+import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.executor.jpa.sla.SLARegistrationGetJPAExecutor;
+import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.ServiceException;
import org.apache.oozie.service.Services;
import org.apache.oozie.sla.SLARegistrationBean;
@@ -41,7 +44,7 @@ public class SLAOperations {
private static final String ALERT_EVENTS = "alert-events";
public static SLARegistrationBean createSlaRegistrationEvent(Element eSla, String jobId, String parentId,
- AppType appType, String user, String appName, XLog log) throws CommandException {
+ AppType appType, String user, String appName, XLog log, boolean rerun) throws CommandException {
if (eSla == null || !SLAService.isEnabled()) {
log.debug("Not registering SLA for job [{0}]. Sla-Xml null OR SLAService not enabled", jobId);
return null;
@@ -134,23 +137,46 @@ public class SLAOperations {
sla.setUser(user);
sla.setParentId(parentId);
- SLAService slaService = Services.get().get(SLAService.class);
- try {
+ SLAService slaService = Services.get().get(SLAService.class);
+ try {
+ if (!rerun) {
slaService.addRegistrationEvent(sla);
}
- catch (ServiceException e) {
- throw new CommandException(ErrorCode.E1007, " id " + jobId, e.getMessage(), e);
+ else {
+ slaService.updateRegistrationEvent(sla);
}
+ }
+ catch (ServiceException e) {
+ throw new CommandException(ErrorCode.E1007, " id " + jobId, e.getMessage(), e);
+ }
return sla;
}
+ /**
+ * Retrieve registration event
+ * @param jobId the jobId
+ * @throws CommandException
+ * @throws JPAExecutorException
+ */
+ public static void updateRegistrationEvent(String jobId) throws CommandException, JPAExecutorException {
+ JPAService jpaService = Services.get().get(JPAService.class);
+ SLAService slaService = Services.get().get(SLAService.class);
+ try {
+ slaService.updateRegistrationEvent(jpaService.execute(new SLARegistrationGetJPAExecutor(jobId)));
+ }
+ catch (ServiceException e) {
+ throw new CommandException(ErrorCode.E1007, " id " + jobId, e.getMessage(), e);
+ }
+
+ }
+
/*
* parentId null
*/
public static SLARegistrationBean createSlaRegistrationEvent(Element eSla, String jobId, AppType appType,
String user, String appName, XLog log) throws CommandException {
- return createSlaRegistrationEvent(eSla, jobId, null, appType, user, appName, log);
+ return createSlaRegistrationEvent(eSla, jobId, null, appType, user, appName, log, false);
}
/*
@@ -158,7 +184,7 @@ public class SLAOperations {
*/
public static SLARegistrationBean createSlaRegistrationEvent(Element eSla, String jobId, String parentId,
AppType appType, String user, XLog log) throws CommandException {
- return createSlaRegistrationEvent(eSla, jobId, parentId, appType, user, null, log);
+ return createSlaRegistrationEvent(eSla, jobId, parentId, appType, user, null, log, false);
}
/*
@@ -166,7 +192,7 @@ public class SLAOperations {
*/
public static SLARegistrationBean createSlaRegistrationEvent(Element eSla, String jobId, AppType appType,
String user, XLog log) throws CommandException {
- return createSlaRegistrationEvent(eSla, jobId, null, appType, user, null, log);
+ return createSlaRegistrationEvent(eSla, jobId, null, appType, user, null, log, false);
}
private static String getTagElement(Element elem, String tagName) {
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLARegistrationBean.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLARegistrationBean.java?rev=1491333&r1=1491332&r2=1491333&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLARegistrationBean.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLARegistrationBean.java Mon Jun 10 04:37:20 2013
@@ -42,9 +42,10 @@ import org.json.simple.JSONObject;
@Entity
@NamedQueries({
- @NamedQuery(name = "GET_SLA_REG_ALL", query = "select w.jobId, w.appType, w.appName, w.user, w.nominalTimeTS," +
- "w.expectedStartTS, w.expectedEndTS, w.expectedDuration, w.jobData, w.parentId, w.notificationMsg," +
- "w.upstreamApps, w.slaConfig from SLARegistrationBean w where w.jobId = :id") })
+
+ @NamedQuery(name = "GET_SLA_REG_ON_RESTART", query = "select w.notificationMsg, w.upstreamApps, w.slaConfig, w.jobData from SLARegistrationBean w where w.jobId = :id"),
+
+ @NamedQuery(name = "GET_SLA_REG_ALL", query = "select OBJECT(w) from SLARegistrationBean w where w.jobId = :id") })
public class SLARegistrationBean extends JsonSLARegistrationEvent implements Writable {
@Basic
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLASummaryBean.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLASummaryBean.java?rev=1491333&r1=1491332&r2=1491333&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLASummaryBean.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLASummaryBean.java Mon Jun 10 04:37:20 2013
@@ -42,7 +42,15 @@ import org.json.simple.JSONObject;
@Entity
@Table(name = "SLA_SUMMARY")
@NamedQueries({
- @NamedQuery(name = "GET_SLA_SUMMARY", query = "select OBJECT(w) from SLASummaryBean w where w.jobId = :id") })
+
+ @NamedQuery(name = "UPDATE_SLA_SUMMARY_FOR_SLA_STATUS", query = "update SLASummaryBean w set w.slaStatus = :slaStatus, w.eventStatus = :eventStatus, w.eventProcessed = :eventProcessed, w.lastModifiedTS = :lastModifiedTS where w.jobId = :jobId"),
+
+ @NamedQuery(name = "UPDATE_SLA_SUMMARY_FOR_STATUS_ACTUAL_TIMES", query = "update SLASummaryBean w set w.slaStatus = :slaStatus, w.eventStatus = :eventStatus, w.eventProcessed = :eventProcessed, w.jobStatus = :jobStatus, w.lastModifiedTS = :lastModifiedTS, w.actualStartTS = :actualStartTS, w.actualEndTS = :actualEndTS, w.actualDuration = :actualDuration where w.jobId = :jobId"),
+
+ @NamedQuery(name = "GET_SLA_SUMMARY", query = "select OBJECT(w) from SLASummaryBean w where w.jobId = :id"),
+
+ @NamedQuery(name = "GET_SLA_SUMMARY_RECORDS_RESTART", query = "select OBJECT(w) from SLASummaryBean w where w.eventProcessed <= 7 AND w.lastModifiedTS >= :lastModifiedTime") })
+
/**
* Class to store all the SLA related details (summary) per job
*/
@@ -114,8 +122,8 @@ public class SLASummaryBean implements J
@Basic
@Index
- @Column(name = "sla_processed")
- private byte slaProcessed = 0;
+ @Column(name = "event_processed")
+ private byte eventProcessed = 0;
@Basic
@Index
@@ -129,16 +137,21 @@ public class SLASummaryBean implements J
SLARegistrationBean reg = slaCalc.getSLARegistrationBean();
setJobId(slaCalc.getId());
setAppName(reg.getAppName());
+ setAppType(reg.getAppType());
+ setNominalTime(reg.getNominalTime());
setExpectedStart(reg.getExpectedStart());
setExpectedEnd(reg.getExpectedEnd());
setExpectedDuration(reg.getExpectedDuration());
setJobStatus(slaCalc.getJobStatus());
setSLAStatus(slaCalc.getSLAStatus());
setEventStatus(slaCalc.getEventStatus());
- setSlaProcessed(slaCalc.getSlaProcessed());
setLastModifiedTime(slaCalc.getLastModifiedTime());
setUser(reg.getUser());
setParentId(reg.getParentId());
+ setEventProcessed(slaCalc.getEventProcessed());
+ setActualDuration(slaCalc.getActualDuration());
+ setActualEnd(slaCalc.getActualEnd());
+ setActualStart(slaCalc.getActualStart());
}
public String getJobId() {
@@ -234,6 +247,14 @@ public class SLASummaryBean implements J
return (slaStatus != null ? SLAEvent.SLAStatus.valueOf(slaStatus) : null);
}
+ public String getSLAStatusString() {
+ return slaStatus;
+ }
+
+ public String getEventStatusString() {
+ return eventStatus;
+ }
+
public void setSLAStatus(SLAEvent.SLAStatus stage) {
this.slaStatus = (stage != null ? stage.name() : null);
}
@@ -262,12 +283,12 @@ public class SLASummaryBean implements J
this.appType = appType.toString();
}
- public byte getSlaProcessed() {
- return slaProcessed;
+ public byte getEventProcessed() {
+ return eventProcessed;
}
- public void setSlaProcessed(int slaProcessed) {
- this.slaProcessed = (byte) slaProcessed;
+ public void setEventProcessed(int eventProcessed) {
+ this.eventProcessed = (byte)eventProcessed;
}
public Date getLastModifiedTime() {
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/sla/listener/SLAEmailEventListener.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/sla/listener/SLAEmailEventListener.java?rev=1491333&r1=1491332&r2=1491333&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/sla/listener/SLAEmailEventListener.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/sla/listener/SLAEmailEventListener.java Mon Jun 10 04:37:20 2013
@@ -309,9 +309,7 @@ public class SLAEmailEventListener exten
printField(body, EmailField.EXPECTED_END_TIME.toString(), event.getExpectedEnd(), false);
printField(body, EmailField.ACTUAL_END_TIME.toString(), event.getActualEnd(), false);
printField(body, EmailField.EXPECTED_DURATION.toString(), event.getExpectedDuration(), false);
- long actualDuration = (event.getActualEnd() != null && event.getActualStart() != null) ? (event.getActualEnd()
- .getTime() - event.getActualStart().getTime()) / (1000 * 60) : -1;
- printField(body, EmailField.ACTUAL_DURATION.toString(), actualDuration, false);
+ printField(body, EmailField.ACTUAL_DURATION.toString(), event.getActualDuration(), false);
try {
msg.setText(body.toString());
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/sla/service/SLAService.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/sla/service/SLAService.java?rev=1491333&r1=1491332&r2=1491333&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/sla/service/SLAService.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/sla/service/SLAService.java Mon Jun 10 04:37:20 2013
@@ -18,18 +18,25 @@
package org.apache.oozie.sla.service;
import java.util.Date;
+import java.util.List;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.client.event.JobEvent.EventStatus;
import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.executor.jpa.sla.SLARegistrationGetOnRestartJPAExecutor;
+import org.apache.oozie.executor.jpa.sla.SLASummaryGetRecordsOnRestartJPAExecutor;
import org.apache.oozie.service.EventHandlerService;
+import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.SchedulerService;
import org.apache.oozie.service.Service;
import org.apache.oozie.service.ServiceException;
import org.apache.oozie.service.Services;
+import org.apache.oozie.sla.SLACalcStatus;
import org.apache.oozie.sla.SLACalculator;
import org.apache.oozie.sla.SLACalculatorMemory;
import org.apache.oozie.sla.SLARegistrationBean;
+import org.apache.oozie.sla.SLASummaryBean;
import org.apache.oozie.util.XLog;
import com.google.common.annotations.VisibleForTesting;
@@ -39,11 +46,13 @@ public class SLAService implements Servi
public static final String CONF_CALCULATOR_IMPL = CONF_PREFIX + "calculator.impl";
public static final String CONF_CAPACITY = CONF_PREFIX + "capacity";
public static final String CONF_ALERT_EVENTS = CONF_PREFIX + "alert.events";
+ public static final String CONF_EVENTS_MODIFIED_AFTER = CONF_PREFIX + "events.modified.after";
private static SLACalculator calcImpl;
private static boolean slaEnabled = false;
private EventHandlerService eventHandler;
public static XLog LOG;
+ private JPAService jpaService;
@Override
public void init(Services services) throws ServiceException {
@@ -130,6 +139,21 @@ public class SLAService implements Servi
return false;
}
+ public boolean updateRegistrationEvent(SLARegistrationBean reg) throws ServiceException {
+ try {
+ if (calcImpl.updateRegistration(reg.getId(), reg)) {
+ return true;
+ }
+ else {
+ LOG.warn("SLA queue full. Unable to update the SLA entry for job [{0}]", reg.getId());
+ }
+ }
+ catch (JPAExecutorException ex) {
+ LOG.warn("Could not update SLA entry for job [{0}]", reg.getId(), ex);
+ }
+ return false;
+ }
+
public boolean addStatusEvent(String jobId, String status, EventStatus eventStatus, Date startTime, Date endTime)
throws ServiceException {
try {
Modified: oozie/trunk/core/src/main/resources/oozie-default.xml
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/resources/oozie-default.xml?rev=1491333&r1=1491332&r2=1491333&view=diff
==============================================================================
--- oozie/trunk/core/src/main/resources/oozie-default.xml (original)
+++ oozie/trunk/core/src/main/resources/oozie-default.xml Mon Jun 10 04:37:20 2013
@@ -1905,4 +1905,12 @@
</description>
</property>
+ <property>
+ <name>oozie.sla.service.SLAService.events.modified.after</name>
+ <value>7</value>
+ <description> Retrive records which have been modified in last x amount of days (x is 7 by default)
+ </description>
+ </property>
+
+
</configuration>
Modified: oozie/trunk/core/src/test/java/org/apache/oozie/jms/TestJMSSLAEventListener.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/jms/TestJMSSLAEventListener.java?rev=1491333&r1=1491332&r2=1491333&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/jms/TestJMSSLAEventListener.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/jms/TestJMSSLAEventListener.java Mon Jun 10 04:37:20 2013
@@ -180,6 +180,8 @@ public class TestJMSSLAEventListener ext
durationMissBean.setExpectedEnd(expectedEndDate);
durationMiss.setActualEnd(actualEndDate);
durationMissBean.setExpectedDuration(expectedDuration);
+ long actualDuration = actualEndDate.getTime() - actualStartDate.getTime();
+ durationMiss.setActualDuration(actualDuration);
ConnectionContext jmsContext = getConnectionContext();
Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE);
@@ -202,7 +204,7 @@ public class TestJMSSLAEventListener ext
assertEquals(expectedEndDate, durationMissMsg.getExpectedEndTime());
assertEquals(actualEndDate, durationMissMsg.getActualEndTime());
assertEquals(expectedDuration, durationMissMsg.getExpectedDuration());
- assertEquals((actualEndDate.getTime() - actualStartDate.getTime()) / (1000 * 60), durationMissMsg.getActualDuration());
+ assertEquals(actualDuration, durationMissMsg.getActualDuration());
assertEquals("notification of duration miss", durationMissMsg.getNotificationMessage());
}
@@ -363,6 +365,8 @@ public class TestJMSSLAEventListener ext
durationMetBean.setExpectedEnd(expectedEndDate);
durationMet.setActualEnd(actualEndDate);
durationMetBean.setExpectedDuration(expectedDuration);
+ long actualDuration = actualEndDate.getTime() - actualStartDate.getTime();
+ durationMet.setActualDuration(actualDuration);
ConnectionContext jmsContext = getConnectionContext();
@@ -386,7 +390,7 @@ public class TestJMSSLAEventListener ext
assertEquals(expectedEndDate, durationMissMsg.getExpectedEndTime());
assertEquals(actualEndDate, durationMissMsg.getActualEndTime());
assertEquals(expectedDuration, durationMissMsg.getExpectedDuration());
- assertEquals((actualEndDate.getTime() - actualStartDate.getTime()) / (1000 * 60), durationMissMsg.getActualDuration());
+ assertEquals(actualDuration, durationMissMsg.getActualDuration());
assertEquals("notification of duration met", durationMissMsg.getNotificationMessage());
}
}
Modified: oozie/trunk/core/src/test/java/org/apache/oozie/servlet/TestV2SLAServlet.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/servlet/TestV2SLAServlet.java?rev=1491333&r1=1491332&r2=1491333&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/servlet/TestV2SLAServlet.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/servlet/TestV2SLAServlet.java Mon Jun 10 04:37:20 2013
@@ -183,7 +183,6 @@ public class TestV2SLAServlet extends Da
bean.setActualEnd(actualEnd.getTime());
bean.setExpectedDuration(10);
bean.setActualDuration(15);
- bean.setSlaProcessed((byte) 1);
bean.setUser("testuser");
bean.setLastModifiedTime(currentTime);
list.add(bean);
Modified: oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLACalculationJPAExecutor.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLACalculationJPAExecutor.java?rev=1491333&r1=1491332&r2=1491333&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLACalculationJPAExecutor.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLACalculationJPAExecutor.java Mon Jun 10 04:37:20 2013
@@ -101,7 +101,6 @@ public class TestSLACalculationJPAExecut
assertEquals(actStart, sBean.getActualStart());
assertEquals(actEnd, sBean.getActualEnd());
assertEquals(2000, sBean.getActualDuration());
- assertEquals(1, sBean.getSlaProcessed());
assertEquals(actEnd, sBean.getLastModifiedTime());
}
@@ -153,7 +152,6 @@ public class TestSLACalculationJPAExecut
assertEquals(actStart, sBean.getActualStart());
assertEquals(newDate, sBean.getActualEnd());
assertEquals(2000, sBean.getActualDuration());
- assertEquals(1, sBean.getSlaProcessed());
assertEquals(newDate, sBean.getLastModifiedTime());
}
@@ -183,7 +181,6 @@ public class TestSLACalculationJPAExecut
bean1 = new SLASummaryBean();
bean1.setJobId(wfId1);
bean1.setActualEnd(newDate);
- bean1.setSlaProcessed(1);
List<JsonBean> updateList = new ArrayList<JsonBean>();
updateList.add(bean1);
@@ -208,7 +205,6 @@ public class TestSLACalculationJPAExecut
SLASummaryGetJPAExecutor readCmd = new SLASummaryGetJPAExecutor(wfId1);
SLASummaryBean sBean = jpaService.execute(readCmd);
// isSlaProcessed should NOT be changed to 1
- assertFalse(sBean.getSlaProcessed() == 1);
// actualEnd should be null as before
assertNull(sBean.getActualEnd());
@@ -234,7 +230,6 @@ public class TestSLACalculationJPAExecut
bean.setActualStart(aStart);
bean.setActualEnd(aEnd);
bean.setActualDuration(aDur);
- bean.setSlaProcessed((byte) slaProc);
bean.setLastModifiedTime(lastMod);
return bean;
}