You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by pb...@apache.org on 2017/05/08 10:40:38 UTC
[32/37] oozie git commit: OOZIE-2863 SLACalculatorMemory.loadOnRestart causing delay in server start (puru via satishsaley)
OOZIE-2863 SLACalculatorMemory.loadOnRestart causing delay in server start (puru via satishsaley)
Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/8df784b5
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/8df784b5
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/8df784b5
Branch: refs/heads/oya
Commit: 8df784b5fb15273af31d8c104f0f7c47c78e82a3
Parents: ee359d7
Author: satishsaley <sa...@apache.org>
Authored: Wed Apr 19 23:11:29 2017 -0700
Committer: satishsaley <sa...@apache.org>
Committed: Wed Apr 19 23:11:29 2017 -0700
----------------------------------------------------------------------
.../org/apache/oozie/sla/SLACalcStatus.java | 65 ++++---
.../apache/oozie/sla/SLACalculatorMemory.java | 192 ++++++++++---------
.../apache/oozie/sla/service/SLAService.java | 5 +
.../apache/oozie/service/TestHASLAService.java | 3 +
.../oozie/sla/TestSLACalculatorMemory.java | 22 ++-
release-log.txt | 1 +
6 files changed, 166 insertions(+), 122 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/oozie/blob/8df784b5/core/src/main/java/org/apache/oozie/sla/SLACalcStatus.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/sla/SLACalcStatus.java b/core/src/main/java/org/apache/oozie/sla/SLACalcStatus.java
index 3a76dfe..7be16f0 100644
--- a/core/src/main/java/org/apache/oozie/sla/SLACalcStatus.java
+++ b/core/src/main/java/org/apache/oozie/sla/SLACalcStatus.java
@@ -36,6 +36,7 @@ public class SLACalcStatus extends SLAEvent {
public static String SLA_ENTITYKEY_PREFIX = "sla-";
private SLARegistrationBean regBean;
+ private SLASummaryBean summary;
private String jobStatus;
private SLAStatus slaStatus;
private EventStatus eventStatus;
@@ -44,6 +45,7 @@ public class SLACalcStatus extends SLAEvent {
private long actualDuration = -1;
private Date lastModifiedTime;
private byte eventProcessed;
+ private String jobId;
private XLog LOG;
@@ -54,27 +56,13 @@ public class SLACalcStatus extends SLAEvent {
}
public SLACalcStatus(SLASummaryBean summary, SLARegistrationBean regBean) {
+ this(summary);
+ updateSLARegistrationBean(regBean);
+ LOG = LogUtils.setLogPrefix(LOG, this);
+ }
+
+ public SLACalcStatus(SLASummaryBean summary) {
this();
- SLARegistrationBean reg = new SLARegistrationBean();
- reg.setNotificationMsg(regBean.getNotificationMsg());
- reg.setUpstreamApps(regBean.getUpstreamApps());
- reg.setAlertContact(regBean.getAlertContact());
- reg.setAlertEvents(regBean.getAlertEvents());
- reg.setJobData(regBean.getJobData());
- if (regBean.getSLAConfigMap().containsKey(OozieClient.SLA_DISABLE_ALERT)) {
- reg.addToSLAConfigMap(OozieClient.SLA_DISABLE_ALERT,
- regBean.getSLAConfigMap().get(OozieClient.SLA_DISABLE_ALERT));
- }
- reg.setId(summary.getId());
- reg.setAppType(summary.getAppType());
- reg.setUser(summary.getUser());
- reg.setAppName(summary.getAppName());
- reg.setParentId(summary.getParentId());
- reg.setNominalTime(summary.getNominalTime());
- reg.setExpectedStart(summary.getExpectedStart());
- reg.setExpectedEnd(summary.getExpectedEnd());
- reg.setExpectedDuration(summary.getExpectedDuration());
- setSLARegistrationBean(reg);
setActualStart(summary.getActualStart());
setActualEnd(summary.getActualEnd());
setActualDuration(summary.getActualDuration());
@@ -83,7 +71,8 @@ public class SLACalcStatus extends SLAEvent {
setEventStatus(summary.getEventStatus());
setLastModifiedTime(summary.getLastModifiedTime());
setEventProcessed(summary.getEventProcessed());
- LOG = LogUtils.setLogPrefix(LOG, this);
+ setId(summary.getId());
+ this.summary = summary;
}
/**
@@ -112,17 +101,24 @@ public class SLACalcStatus extends SLAEvent {
return regBean;
}
+ public SLASummaryBean getSLASummaryBean() {
+ return summary;
+ }
+
public void setSLARegistrationBean(SLARegistrationBean slaBean) {
+ if (slaBean != null) {
+ this.jobId = slaBean.getId();
+ }
this.regBean = slaBean;
}
@Override
public String getId() {
- return regBean.getId();
+ return jobId;
}
public void setId(String id) {
- regBean.setId(id);
+ this.jobId = id;
}
@Override
@@ -288,4 +284,27 @@ public class SLACalcStatus extends SLAEvent {
return SLA_ENTITYKEY_PREFIX + this.getId();
}
+ public void updateSLARegistrationBean(SLARegistrationBean slaBean) {
+ SLARegistrationBean reg = new SLARegistrationBean();
+ reg.setNotificationMsg(slaBean.getNotificationMsg());
+ reg.setUpstreamApps(slaBean.getUpstreamApps());
+ reg.setAlertContact(slaBean.getAlertContact());
+ reg.setAlertEvents(slaBean.getAlertEvents());
+ reg.setJobData(slaBean.getJobData());
+ if (slaBean.getSLAConfigMap().containsKey(OozieClient.SLA_DISABLE_ALERT)) {
+ reg.addToSLAConfigMap(OozieClient.SLA_DISABLE_ALERT,
+ slaBean.getSLAConfigMap().get(OozieClient.SLA_DISABLE_ALERT));
+ }
+ reg.setId(summary.getId());
+ reg.setAppType(summary.getAppType());
+ reg.setUser(summary.getUser());
+ reg.setAppName(summary.getAppName());
+ reg.setParentId(summary.getParentId());
+ reg.setNominalTime(summary.getNominalTime());
+ reg.setExpectedStart(summary.getExpectedStart());
+ reg.setExpectedEnd(summary.getExpectedEnd());
+ reg.setExpectedDuration(summary.getExpectedDuration());
+ setSLARegistrationBean(reg);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/8df784b5/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java b/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java
index 347f853..456440a 100644
--- a/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java
+++ b/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java
@@ -134,42 +134,14 @@ public class SLACalculatorMemory implements SLACalculator {
}
private void loadOnRestart() {
- long slaPendingCount = 0;
- long statusPendingCount = 0;
-
try {
- List<SLASummaryBean> summaryBeans = jpaService.execute(new SLASummaryGetRecordsOnRestartJPAExecutor(
- modifiedAfter));
+ List<SLASummaryBean> summaryBeans = jpaService
+ .execute(new SLASummaryGetRecordsOnRestartJPAExecutor(modifiedAfter));
for (SLASummaryBean summaryBean : summaryBeans) {
String jobId = summaryBean.getId();
-
- SLARegistrationBean slaRegBean = SLARegistrationQueryExecutor.getInstance().get(
- SLARegQuery.GET_SLA_REG_ON_RESTART, jobId);
- SLACalcStatus slaCalcStatus = new SLACalcStatus(summaryBean, slaRegBean);
-
- // Processed missed jobs
- try {
- SLAXCommandFactory.getSLAEventXCommand(slaCalcStatus).call();
- }
- catch (Throwable e) {
- LOG.error("Error while updating job {0}", slaCalcStatus.getId(), e);
- }
-
- if (slaCalcStatus.getEventProcessed() == 7) {
- historySet.add(jobId);
- statusPendingCount++;
- LOG.debug("Adding job [{0}] to historySet. EventProcessed is [{1}]", slaCalcStatus,
- slaCalcStatus);
- }
- else if (slaCalcStatus.getEventProcessed() < 7) {
- slaMap.put(jobId, slaCalcStatus);
- slaPendingCount++;
- LOG.debug("Adding job [{0}] to slamap. EventProcessed is [{1}]", slaCalcStatus,
- slaCalcStatus);
-
- }
+ slaMap.put(jobId, new SLACalcStatus(summaryBean));
}
- LOG.info("Loaded SLASummary pendingSLA=" + slaPendingCount + ", pendingStatusUpdate=" + statusPendingCount);
+ LOG.info("Loaded {0} SLASummary object after restart", slaMap.size());
}
catch (Exception e) {
LOG.warn("Failed to retrieve SLASummary records on restart", e);
@@ -198,13 +170,24 @@ public class SLACalculatorMemory implements SLACalculator {
return memObj;
}
- private SLACalcStatus getSLACalcStatus(String jobId) throws JPAExecutorException {
+ /**
+ * Get SLACalcStatus from map if SLARegistration is not null, else create a new SLACalcStatus.
+ * This function doesn't update slaMap.
+ * @param jobId
+ * @return
+ * @throws JPAExecutorException
+ */
+ private SLACalcStatus getOrCreateSLACalcStatus(String jobId) throws JPAExecutorException {
SLACalcStatus memObj;
memObj = slaMap.get(jobId);
- if (memObj == null) {
- memObj = new SLACalcStatus(SLASummaryQueryExecutor.getInstance()
- .get(SLASummaryQuery.GET_SLA_SUMMARY, jobId), SLARegistrationQueryExecutor.getInstance().get(
- SLARegQuery.GET_SLA_REG_ON_RESTART, jobId));
+ // If the request comes immediately after a restart, don't use the SLACalcStatus from the map.
+ if (memObj == null || memObj.getSLARegistrationBean() == null) {
+ SLARegistrationBean registrationBean = SLARegistrationQueryExecutor.getInstance()
+ .get(SLARegQuery.GET_SLA_REG_ON_RESTART, jobId);
+ SLASummaryBean summaryBean = memObj == null
+ ? SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId)
+ : memObj.getSLASummaryBean();
+ return new SLACalcStatus(summaryBean, registrationBean);
}
return memObj;
}
@@ -235,52 +218,55 @@ public class SLACalculatorMemory implements SLACalculator {
// job might be processed and removed from map by addJobStatus
return;
}
- synchronized (slaCalc) {
- // get eventProcessed on DB for validation in HA
- SLASummaryBean summaryBean = null;
- try {
- summaryBean = ((SLASummaryQueryExecutor) SLASummaryQueryExecutor.getInstance()).get(
- SLASummaryQuery.GET_SLA_SUMMARY_EVENTPROCESSED_LAST_MODIFIED, jobId);
+ boolean firstCheckAfterRetstart = checkAndUpdateSLACalcAfterRestart(slaCalc);
+ // get eventProcessed on DB for validation in HA
+ SLASummaryBean summaryBean = null;
+ try {
+ summaryBean = ((SLASummaryQueryExecutor) SLASummaryQueryExecutor.getInstance())
+ .get(SLASummaryQuery.GET_SLA_SUMMARY_EVENTPROCESSED_LAST_MODIFIED, jobId);
+ }
+ catch (JPAExecutorException e) {
+ if (e.getErrorCode().equals(ErrorCode.E0604) || e.getErrorCode().equals(ErrorCode.E0605)) {
+ LOG.debug("job [{0}] is is not in DB, removing from Memory", jobId);
+ slaMap.remove(jobId);
+ return;
}
- catch (JPAExecutorException e) {
- if (e.getErrorCode().equals(ErrorCode.E0604) || e.getErrorCode().equals(ErrorCode.E0605)) {
- LOG.debug("job [{0}] is is not in DB, removing from Memory", jobId);
- slaMap.remove(jobId);
- return;
- }
- throw e;
+ throw e;
+ }
+ byte eventProc = summaryBean.getEventProcessed();
+ slaCalc.setEventProcessed(eventProc);
+ if (eventProc >= 7) {
+ if (eventProc == 7) {
+ historySet.add(jobId);
}
- byte eventProc = summaryBean.getEventProcessed();
- slaCalc.setEventProcessed(eventProc);
- if (eventProc >= 7) {
- if (eventProc == 7) {
- historySet.add(jobId);
- }
- slaMap.remove(jobId);
- LOG.trace("Removed Job [{0}] from map as SLA processed", jobId);
+ slaMap.remove(jobId);
+ LOG.trace("Removed Job [{0}] from map as SLA processed", jobId);
+ }
+ else {
+ if (!slaCalc.getLastModifiedTime().equals(summaryBean.getLastModifiedTime())) {
+ // Update last modified time.
+ slaCalc.setLastModifiedTime(summaryBean.getLastModifiedTime());
+ reloadExpectedTimeAndConfig(slaCalc);
+ LOG.debug("Last modified time has changed for job " + jobId + " reloading config from DB");
}
- else {
- if (!slaCalc.getLastModifiedTime().equals(summaryBean.getLastModifiedTime())) {
- // Update last modified time.
- slaCalc.setLastModifiedTime(summaryBean.getLastModifiedTime());
- reloadExpectedTimeAndConfig(slaCalc);
- LOG.debug("Last modified time has changed for job " + jobId + " reloading config from DB");
+ if (firstCheckAfterRetstart || isChanged(slaCalc)) {
+ LOG.debug("{0} job has SLA event change. EventProc = {1}, status = {2}", slaCalc.getId(),
+ slaCalc.getEventProcessed(), slaCalc.getJobStatus());
+ try {
+ SLAXCommandFactory.getSLAEventXCommand(slaCalc).call();
+ checkEventProc(slaCalc);
}
- if (isChanged(slaCalc)) {
- LOG.debug("{0} job has SLA event change. EventProc = {1}, status = {2}", slaCalc.getId(),
- slaCalc.getEventProcessed(), slaCalc.getJobStatus());
- try {
- SLAXCommandFactory.getSLAEventXCommand(slaCalc).call();
- checkEventProc(slaCalc);
+ catch (XException e) {
+ if (e.getErrorCode().equals(ErrorCode.E0604) || e.getErrorCode().equals(ErrorCode.E0605)) {
+ LOG.debug("job [{0}] is is not in DB, removing from Memory", slaCalc.getId());
+ slaMap.remove(jobId);
}
- catch (XException e) {
- if (e.getErrorCode().equals(ErrorCode.E0604) || e.getErrorCode().equals(ErrorCode.E0605)) {
- LOG.debug("job [{0}] is is not in DB, removing from Memory", slaCalc.getId());
- slaMap.remove(jobId);
+ else {
+ if (firstCheckAfterRetstart) {
+ slaCalc.setSLARegistrationBean(null);
}
}
}
-
}
}
}
@@ -473,7 +459,7 @@ public class SLACalculatorMemory implements SLACalculator {
"Received addJobStatus request for job [{0}] jobStatus = [{1}], jobEventStatus = [{2}], startTime = [{3}], "
+ "endTime = [{4}] ", jobId, jobStatus, jobEventStatus, startTime, endTime);
SLACalcStatus slaCalc = slaMap.get(jobId);
-
+ boolean firstCheckAfterRetstart = checkAndUpdateSLACalcAfterRestart(slaCalc);
if (slaCalc == null) {
SLARegistrationBean slaRegBean = SLARegistrationQueryExecutor.getInstance().get(
SLARegQuery.GET_SLA_REG_ALL, jobId);
@@ -505,6 +491,9 @@ public class SLACalculatorMemory implements SLACalculator {
checkEventProc(slaCalc);
}
catch (XException e) {
+ if (firstCheckAfterRetstart) {
+ slaCalc.setSLARegistrationBean(null);
+ }
LOG.error(e);
throw new ServiceException(e);
}
@@ -570,12 +559,10 @@ public class SLACalculatorMemory implements SLACalculator {
@SuppressWarnings("rawtypes")
List<UpdateEntry> updateList = new ArrayList<BatchQueryExecutor.UpdateEntry>();
for (String jobId : jobIds) {
- SLACalcStatus slaCalc = getSLACalcStatus(jobId);
- if (slaCalc != null) {
- slaCalc.getSLARegistrationBean().removeFromSLAConfigMap(OozieClient.SLA_DISABLE_ALERT);
- updateDBSlaConfig(slaCalc, updateList);
- isJobFound = true;
- }
+ SLACalcStatus slaCalc = getOrCreateSLACalcStatus(jobId);
+ slaCalc.getSLARegistrationBean().removeFromSLAConfigMap(OozieClient.SLA_DISABLE_ALERT);
+ updateDBSlaConfig(slaCalc, updateList);
+ isJobFound = true;
}
executeBatchQuery(updateList);
return isJobFound;
@@ -593,13 +580,10 @@ public class SLACalculatorMemory implements SLACalculator {
List<UpdateEntry> updateList = new ArrayList<BatchQueryExecutor.UpdateEntry>();
for (String jobId : jobIds) {
- SLACalcStatus slaCalc = getSLACalcStatus(jobId);
- if (slaCalc != null) {
- slaCalc.getSLARegistrationBean().addToSLAConfigMap(OozieClient.SLA_DISABLE_ALERT,
- Boolean.toString(true));
- updateDBSlaConfig(slaCalc, updateList);
- isJobFound = true;
- }
+ SLACalcStatus slaCalc = getOrCreateSLACalcStatus(jobId);
+ slaCalc.getSLARegistrationBean().addToSLAConfigMap(OozieClient.SLA_DISABLE_ALERT, Boolean.toString(true));
+ updateDBSlaConfig(slaCalc, updateList);
+ isJobFound = true;
}
executeBatchQuery(updateList);
return isJobFound;
@@ -617,12 +601,10 @@ public class SLACalculatorMemory implements SLACalculator {
@SuppressWarnings("rawtypes")
List<UpdateEntry> updateList = new ArrayList<BatchQueryExecutor.UpdateEntry>();
for (Pair<String, Map<String, String>> jobIdSLAPair : jobIdsSLAPair) {
- SLACalcStatus slaCalc = getSLACalcStatus(jobIdSLAPair.getFirst());
- if (slaCalc != null) {
- updateParams(slaCalc, jobIdSLAPair.getSecond());
- updateDBSlaExpectedValues(slaCalc, updateList);
- isJobFound = true;
- }
+ SLACalcStatus slaCalc = getOrCreateSLACalcStatus(jobIdSLAPair.getFirst());
+ updateParams(slaCalc, jobIdSLAPair.getSecond());
+ updateDBSlaExpectedValues(slaCalc, updateList);
+ isJobFound = true;
}
executeBatchQuery(updateList);
return isJobFound;
@@ -655,4 +637,24 @@ public class SLACalculatorMemory implements SLACalculator {
return childJobIds;
}
+ private boolean checkAndUpdateSLACalcAfterRestart(SLACalcStatus slaCalc) throws JPAExecutorException {
+ if (slaCalc != null && slaCalc.getSLARegistrationBean() == null) {
+ return updateSLARegistartion(slaCalc);
+ }
+ return false;
+ }
+
+ public boolean updateSLARegistartion(SLACalcStatus slaCalc) throws JPAExecutorException {
+ if (slaCalc.getSLARegistrationBean() == null) {
+ synchronized (slaCalc) {
+ if (slaCalc.getSLARegistrationBean() == null) {
+ SLARegistrationBean slaRegBean = SLARegistrationQueryExecutor.getInstance()
+ .get(SLARegQuery.GET_SLA_REG_ON_RESTART, slaCalc.getId());
+ slaCalc.updateSLARegistrationBean(slaRegBean);
+ return true;
+ }
+ }
+ }
+ return false;
+ }
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/8df784b5/core/src/main/java/org/apache/oozie/sla/service/SLAService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/sla/service/SLAService.java b/core/src/main/java/org/apache/oozie/sla/service/SLAService.java
index 08cd07e..2d23a22 100644
--- a/core/src/main/java/org/apache/oozie/sla/service/SLAService.java
+++ b/core/src/main/java/org/apache/oozie/sla/service/SLAService.java
@@ -114,6 +114,11 @@ public class SLAService implements Service {
new SLAWorker(calcImpl).run();
}
+ @VisibleForTesting
+ public void startSLAWorker() {
+ new Thread(new SLAWorker(calcImpl)).start();
+ }
+
private class SLAWorker implements Runnable {
SLACalculator calc;
http://git-wip-us.apache.org/repos/asf/oozie/blob/8df784b5/core/src/test/java/org/apache/oozie/service/TestHASLAService.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/service/TestHASLAService.java b/core/src/test/java/org/apache/oozie/service/TestHASLAService.java
index 3af263e..67fa56e 100644
--- a/core/src/test/java/org/apache/oozie/service/TestHASLAService.java
+++ b/core/src/test/java/org/apache/oozie/service/TestHASLAService.java
@@ -207,6 +207,7 @@ public class TestHASLAService extends ZKXTestCase {
SLAService slas = Services.get().get(SLAService.class);
SLACalculatorMemory slaCalcMem = (SLACalculatorMemory) slas.getSLACalculator();
slaCalcMem.init(Services.get().getConf());
+ slaCalcMem.updateAllSlaStatus();
List<String> slaMapKeys = new ArrayList<String>();
Iterator<String> itr = slaCalcMem.iterator();
while (itr.hasNext()) {
@@ -226,6 +227,7 @@ public class TestHASLAService extends ZKXTestCase {
dummySlaCalcMem.setEventHandlerService(dummyEhs);
dummyEhs.init(Services.get());
dummySlaCalcMem.init(Services.get().getConf());
+ dummySlaCalcMem.updateAllSlaStatus();
slaMapKeys = new ArrayList<String>();
itr = dummySlaCalcMem.iterator();
while (itr.hasNext()) {
@@ -402,6 +404,7 @@ public class TestHASLAService extends ZKXTestCase {
SLAService slas = Services.get().get(SLAService.class);
SLACalculatorMemory slaCalcMem1 = (SLACalculatorMemory) slas.getSLACalculator();
slaCalcMem1.init(Services.get().get(ConfigurationService.class).getConf());
+ slaCalcMem1.updateAllSlaStatus();
List<String> idList = new ArrayList<String>();
idList.add(id);
slaCalcMem1.disableAlert(idList);
http://git-wip-us.apache.org/repos/asf/oozie/blob/8df784b5/core/src/test/java/org/apache/oozie/sla/TestSLACalculatorMemory.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/sla/TestSLACalculatorMemory.java b/core/src/test/java/org/apache/oozie/sla/TestSLACalculatorMemory.java
index 559e2b3..34011f6 100644
--- a/core/src/test/java/org/apache/oozie/sla/TestSLACalculatorMemory.java
+++ b/core/src/test/java/org/apache/oozie/sla/TestSLACalculatorMemory.java
@@ -144,6 +144,7 @@ public class TestSLACalculatorMemory extends XDataTestCase {
slaCalcMemory.addRegistration(jobId2, slaRegBean2);
slaCalcMemory.addRegistration(jobId3, slaRegBean3);
+ slaCalcMemory.updateAllSlaStatus();
SLACalcStatus calc1 = slaCalcMemory.get(jobId1);
SLACalcStatus calc2 = slaCalcMemory.get(jobId2);
SLACalcStatus calc3 = slaCalcMemory.get(jobId3);
@@ -190,7 +191,7 @@ public class TestSLACalculatorMemory extends XDataTestCase {
slaCalcMemory = new SLACalculatorMemory();
slaCalcMemory.init(Services.get().get(ConfigurationService.class).getConf());
-
+ slaCalcMemory.updateAllSlaStatus();
SLACalcStatus calc = new SLACalcStatus(SLASummaryQueryExecutor.getInstance().get(
SLASummaryQuery.GET_SLA_SUMMARY, jobId1), SLARegistrationQueryExecutor.getInstance().get(
@@ -250,7 +251,7 @@ public class TestSLACalculatorMemory extends XDataTestCase {
SLACalculatorMemory slaCalcMemory = new SLACalculatorMemory();
slaCalcMemory.init(Services.get().get(ConfigurationService.class).getConf());
SLARegistrationBean slaRegBean1 = _createSLARegistration("job-1-W", AppType.WORKFLOW_JOB);
- String jobId1 = slaRegBean1.getId();
+ final String jobId1 = slaRegBean1.getId();
slaRegBean1.setExpectedEnd(sdf.parse("2013-03-07"));
slaRegBean1.setExpectedStart(sdf.parse("2012-03-07"));
slaCalcMemory.addRegistration(jobId1, slaRegBean1);
@@ -276,6 +277,8 @@ public class TestSLACalculatorMemory extends XDataTestCase {
slaCalcMemory = new SLACalculatorMemory();
slaCalcMemory.init(Services.get().get(ConfigurationService.class).getConf());
+ slaCalcMemory.updateAllSlaStatus();
+
slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId1);
assertEquals("job-1-W", slaSummary.getId());
assertEquals(8, slaSummary.getEventProcessed());
@@ -302,6 +305,7 @@ public class TestSLACalculatorMemory extends XDataTestCase {
slaCalcMemory = new SLACalculatorMemory();
slaCalcMemory.init(Services.get().get(ConfigurationService.class).getConf());
+ slaCalcMemory.updateAllSlaStatus();
slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId1);
assertEquals("FAILED", slaSummary.getJobStatus());
@@ -323,8 +327,16 @@ public class TestSLACalculatorMemory extends XDataTestCase {
SLASummaryQueryExecutor.getInstance().executeUpdate(SLASummaryQuery.UPDATE_SLA_SUMMARY_ALL, slaSummaryBean);
- slaCalcMemory = new SLACalculatorMemory();
- slaCalcMemory.init(Services.get().get(ConfigurationService.class).getConf());
+ SLAService slaService = Services.get().get(SLAService.class);
+ slaService.startSLAWorker();
+ slaService.addStatusEvent(jobId1, "RUNNING", null, null, null);
+ waitFor(60 * 1000, new Predicate() {
+ public boolean evaluate() throws Exception {
+ return SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId1)
+ .getEventProcessed() == 7;
+ }
+ });
+
slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, slaSummaryBean.getId());
//since job is already running and it's a old job
assertEquals(7, slaSummary.getEventProcessed());
@@ -484,6 +496,8 @@ public class TestSLACalculatorMemory extends XDataTestCase {
CoordActionQueryExecutor.getInstance().executeUpdate(CoordActionQuery.UPDATE_COORD_ACTION_FOR_START, coordAction2);
slaCalcMemory.init(Services.get().get(ConfigurationService.class).getConf());
+ slaCalcMemory.updateAllSlaStatus();
+
slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId1);
slaSummary2 = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId2);
http://git-wip-us.apache.org/repos/asf/oozie/blob/8df784b5/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 23aa9ae..bd37dca 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
-- Oozie 4.4.0 release (trunk - unreleased)
+OOZIE-2863 SLACalculatorMemory.loadOnRestart causing delay in server start (puru via satishsaley)
OOZIE-2843 Enhance logging inside ZKLocksService and MemoryLocksService (andras.piros via pbacsko)
OOZIE-2818 Can't overwrite oozie.action.max.output.data on a per-workflow basis (asasvari via pbacsko)
OOZIE-2827 More directly view of the coordinator’s history from perspective of workflow action. (Alonzo Zhou via pbacsko)