You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by mo...@apache.org on 2014/07/11 22:02:29 UTC
git commit: OOZIE-1911 SLA calculation in HA mode does wrong bit
comparison for 'start' and 'duration' (mona)
Repository: oozie
Updated Branches:
refs/heads/master bdc41b258 -> d5b13db2b
OOZIE-1911 SLA calculation in HA mode does wrong bit comparison for 'start' and 'duration' (mona)
Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/d5b13db2
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/d5b13db2
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/d5b13db2
Branch: refs/heads/master
Commit: d5b13db2b17578a13afcab7de267ce48000410cd
Parents: bdc41b2
Author: mona <ch...@yahoo-inc.com>
Authored: Fri Jul 11 13:02:22 2014 -0700
Committer: mona <ch...@yahoo-inc.com>
Committed: Fri Jul 11 13:02:22 2014 -0700
----------------------------------------------------------------------
.../oozie/service/EventHandlerService.java | 27 ++-
.../org/apache/oozie/sla/SLACalcStatus.java | 22 ++-
.../apache/oozie/sla/SLACalculatorMemory.java | 176 ++++++++++---------
.../java/org/apache/oozie/util/LogUtils.java | 6 +
.../apache/oozie/service/TestHASLAService.java | 93 +++++++---
release-log.txt | 1 +
6 files changed, 208 insertions(+), 117 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/oozie/blob/d5b13db2/core/src/main/java/org/apache/oozie/service/EventHandlerService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/EventHandlerService.java b/core/src/main/java/org/apache/oozie/service/EventHandlerService.java
index 4207a07..6c075ab 100644
--- a/core/src/main/java/org/apache/oozie/service/EventHandlerService.java
+++ b/core/src/main/java/org/apache/oozie/service/EventHandlerService.java
@@ -33,6 +33,7 @@ import org.apache.oozie.event.WorkflowJobEvent;
import org.apache.oozie.event.listener.JobEventListener;
import org.apache.oozie.sla.listener.SLAEventListener;
import org.apache.oozie.client.event.SLAEvent;
+import org.apache.oozie.util.LogUtils;
import org.apache.oozie.util.XLog;
import java.util.ArrayList;
@@ -71,6 +72,7 @@ public class EventHandlerService implements Service {
try {
Configuration conf = services.getConf();
LOG = XLog.getLog(getClass());
+ LOG = XLog.resetPrefix(LOG);
Class<? extends EventQueue> queueImpl = (Class<? extends EventQueue>) conf.getClass(CONF_EVENT_QUEUE, null);
eventQueue = queueImpl == null ? new MemoryEventQueue() : (EventQueue) queueImpl.newInstance();
eventQueue.init(conf);
@@ -206,7 +208,8 @@ public class EventHandlerService implements Service {
return listenerMap.toString();
}
- public void queueEvent(Event event) {
+ public synchronized void queueEvent(Event event) {
+ setLogPrefix(LOG, event);
LOG.debug("Queueing event : {0}", event);
LOG.trace("Stack trace while queueing event : {0}", event, new Throwable());
eventQueue.add(event);
@@ -216,7 +219,24 @@ public class EventHandlerService implements Service {
return eventQueue;
}
+ private void setLogPrefix(XLog logObj, Event event) {
+ logObj = XLog.resetPrefix(logObj);
+ if (event instanceof JobEvent) {
+ JobEvent je = (JobEvent) event;
+ LogUtils.setLogPrefix(je.getId(), je.getAppName(), new XLog.Info());
+ }
+ else if (event instanceof SLAEvent) {
+ SLAEvent se = (SLAEvent) event;
+ LogUtils.setLogPrefix(se.getId(), se.getAppName(), new XLog.Info());
+ }
+ }
+
public class EventWorker implements Runnable {
+ private XLog workerLog;
+
+ public EventWorker() {
+ workerLog = XLog.getLog(getClass());
+ }
@Override
public void run() {
@@ -227,7 +247,10 @@ public class EventHandlerService implements Service {
if (!eventQueue.isEmpty()) {
List<Event> work = eventQueue.pollBatch();
for (Event event : work) {
- LOG.debug("Processing event : {0}", event);
+ synchronized (workerLog) {
+ setLogPrefix(workerLog, event);
+ LOG.debug("Processing event : {0}", event);
+ }
MessageType msgType = event.getMsgType();
List<?> listeners = listenerMap.get(msgType);
if (listeners != null) {
http://git-wip-us.apache.org/repos/asf/oozie/blob/d5b13db2/core/src/main/java/org/apache/oozie/sla/SLACalcStatus.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/sla/SLACalcStatus.java b/core/src/main/java/org/apache/oozie/sla/SLACalcStatus.java
index f148db3..5349b33 100644
--- a/core/src/main/java/org/apache/oozie/sla/SLACalcStatus.java
+++ b/core/src/main/java/org/apache/oozie/sla/SLACalcStatus.java
@@ -21,16 +21,13 @@ package org.apache.oozie.sla;
import java.util.Date;
import org.apache.oozie.AppType;
-import org.apache.oozie.ErrorCode;
import org.apache.oozie.client.event.SLAEvent;
-import org.apache.oozie.command.CommandException;
import org.apache.oozie.lock.LockToken;
-import org.apache.oozie.service.InstrumentationService;
import org.apache.oozie.service.JobsConcurrencyService;
import org.apache.oozie.service.MemoryLocksService;
import org.apache.oozie.service.Services;
import org.apache.oozie.sla.service.SLAService;
-import org.apache.oozie.util.Instrumentation;
+import org.apache.oozie.util.LogUtils;
import org.apache.oozie.util.XLog;
/**
@@ -51,9 +48,12 @@ public class SLACalcStatus extends SLAEvent {
private byte eventProcessed;
private LockToken lock;
+ private XLog LOG;
+
public SLACalcStatus(SLARegistrationBean reg) {
this();
setSLARegistrationBean(reg);
+ setLogPrefix();
}
public SLACalcStatus(SLASummaryBean summary, SLARegistrationBean regBean) {
@@ -82,6 +82,7 @@ public class SLACalcStatus extends SLAEvent {
setEventStatus(summary.getEventStatus());
setLastModifiedTime(summary.getLastModifiedTime());
setEventProcessed(summary.getEventProcessed());
+ setLogPrefix();
}
/**
@@ -98,11 +99,13 @@ public class SLACalcStatus extends SLAEvent {
setActualEnd(a.getActualEnd());
setActualDuration(a.getActualDuration());
setEventProcessed(a.getEventProcessed());
+ setLogPrefix();
}
public SLACalcStatus() {
setMsgType(MessageType.SLA);
setLastModifiedTime(new Date());
+ LOG = XLog.getLog(getClass());
}
public SLARegistrationBean getSLARegistrationBean() {
@@ -292,10 +295,10 @@ public class SLACalcStatus extends SLAEvent {
if (Services.get().get(JobsConcurrencyService.class).isHighlyAvailableMode()) {
lock = Services.get().get(MemoryLocksService.class).getWriteLock(getEntityKey(), getLockTimeOut());
if (lock == null) {
- XLog.getLog(getClass()).debug("Could not aquire lock for [{0}]", getEntityKey());
+ LOG.debug("Could not aquire lock for [{0}]", getEntityKey());
}
else {
- XLog.getLog(getClass()).debug("Acquired lock for [{0}]", getEntityKey());
+ LOG.debug("Acquired lock for [{0}]", getEntityKey());
}
}
else {
@@ -321,11 +324,16 @@ public class SLACalcStatus extends SLAEvent {
if (lock != null) {
lock.release();
lock = null;
- XLog.getLog(getClass()).debug("Released lock for [{0}]", getEntityKey());
+ LOG.debug("Released lock for [{0}]", getEntityKey());
}
}
public long getLockTimeOut() {
return Services.get().getConf().getLong(SLAService.CONF_SLA_CALC_LOCK_TIMEOUT, 5 * 1000);
}
+
+ private void setLogPrefix() {
+ LOG = XLog.resetPrefix(LOG);
+ LogUtils.setLogPrefix(this.getId(), this.getAppName(), new XLog.Info());
+ }
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/d5b13db2/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java b/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java
index 47c723d..5b30fc0 100644
--- a/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java
+++ b/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java
@@ -86,7 +86,7 @@ public class SLACalculatorMemory implements SLACalculator {
private static final XLog LOG = XLog.getLog(SLACalculatorMemory.class);
// TODO optimization priority based insertion/processing/bumping up-down
protected Map<String, SLACalcStatus> slaMap;
- protected static Set<String> historySet;
+ protected Set<String> historySet;
private static int capacity;
private static JPAService jpaService;
protected EventHandlerService eventHandler;
@@ -477,105 +477,108 @@ public class SLACalculatorMemory implements SLACalculator {
Object eventProcObj = ((SLASummaryQueryExecutor) SLASummaryQueryExecutor.getInstance()).getSingleValue(
SLASummaryQuery.GET_SLA_SUMMARY_EVENTPROCESSED, jobId);
byte eventProc = ((Byte) eventProcObj).byteValue();
- slaCalc.setEventProcessed(eventProc);
- SLARegistrationBean reg = slaCalc.getSLARegistrationBean();
- // calculation w.r.t current time and status
- if ((eventProc & 1) == 0) { // first bit (start-processed) unset
- if (reg.getExpectedStart() != null) {
- if (reg.getExpectedStart().getTime() + jobEventLatency < System.currentTimeMillis()) {
- confirmWithDB(slaCalc);
- eventProc = slaCalc.getEventProcessed();
- if (eventProc != 8 && (eventProc & 1) == 0) {
- // Some DB exception
- slaCalc.setEventStatus(EventStatus.START_MISS);
- eventHandler.queueEvent(new SLACalcStatus(slaCalc));
- eventProc++;
+ if (eventProc >= 7) {
+ if (eventProc == 7) {
+ historySet.add(jobId);
+ }
+ slaMap.remove(jobId);
+ LOG.trace("Removed Job [{0}] from map as SLA processed", jobId);
+ }
+ else {
+ slaCalc.setEventProcessed(eventProc);
+ SLARegistrationBean reg = slaCalc.getSLARegistrationBean();
+ // calculation w.r.t current time and status
+ if ((eventProc & 1) == 0) { // first bit (start-processed) unset
+ if (reg.getExpectedStart() != null) {
+ if (reg.getExpectedStart().getTime() + jobEventLatency < System.currentTimeMillis()) {
+ confirmWithDB(slaCalc);
+ eventProc = slaCalc.getEventProcessed();
+ if (eventProc != 8 && (eventProc & 1) == 0) {
+ // Some DB exception
+ slaCalc.setEventStatus(EventStatus.START_MISS);
+ eventHandler.queueEvent(new SLACalcStatus(slaCalc));
+ eventProc++;
+ }
+ change = true;
}
+ }
+ else {
+ eventProc++; // disable further processing for optional start sla condition
change = true;
}
}
- else {
- eventProc++; // disable further processing for optional
- // start sla condition
- change = true;
- }
- }
- // check if second bit (duration-processed) is unset
- if (((eventProc >> 1) & 1) == 0 && eventProc != 8) {
- if (reg.getExpectedDuration() == -1) {
- eventProc += 2;
- change = true;
+ // check if second bit (duration-processed) is unset
+ if (eventProc != 8 && ((eventProc >> 1) & 1) == 0) {
+ if (reg.getExpectedDuration() == -1) {
+ eventProc += 2;
+ change = true;
+ }
+ else if (slaCalc.getActualStart() != null) {
+ if ((reg.getExpectedDuration() + jobEventLatency) < (System.currentTimeMillis() - slaCalc
+ .getActualStart().getTime())) {
+ slaCalc.setEventProcessed(eventProc);
+ confirmWithDB(slaCalc);
+ eventProc = slaCalc.getEventProcessed();
+ if (eventProc != 8 && ((eventProc >> 1) & 1) == 0) {
+ // Some DB exception
+ slaCalc.setEventStatus(EventStatus.DURATION_MISS);
+ eventHandler.queueEvent(new SLACalcStatus(slaCalc));
+ eventProc += 2;
+ }
+ change = true;
+ }
+ }
}
- else if (slaCalc.getActualStart() != null) {
- if ((reg.getExpectedDuration() + jobEventLatency) < (System.currentTimeMillis() - slaCalc
- .getActualStart().getTime())) {
+ if (eventProc < 4) {
+ if (reg.getExpectedEnd().getTime() + jobEventLatency < System.currentTimeMillis()) {
slaCalc.setEventProcessed(eventProc);
confirmWithDB(slaCalc);
eventProc = slaCalc.getEventProcessed();
- if (eventProc != 8 && ((eventProc >> 1) & 1) == 0) {
- // Some DB exception
- slaCalc.setEventStatus(EventStatus.DURATION_MISS);
- eventHandler.queueEvent(new SLACalcStatus(slaCalc));
- eventProc += 2;
- }
change = true;
}
}
- }
- if (eventProc < 4) {
- if (reg.getExpectedEnd().getTime() + jobEventLatency < System.currentTimeMillis()) {
- slaCalc.setEventProcessed(eventProc);
- confirmWithDB(slaCalc);
- eventProc = slaCalc.getEventProcessed();
- change = true;
- }
- }
- if (change) {
- try {
- boolean locked = true;
- slaCalc.acquireLock();
- locked = slaCalc.isLocked();
- if (locked) {
- // no more processing, no transfer to history set
- if (slaCalc.getEventProcessed() >= 8) {
- eventProc = 8;
- // Should not be > 8. But to handle any corner cases
- slaCalc.setEventProcessed(8);
- slaMap.remove(jobId);
- }
- else {
- slaCalc.setEventProcessed(eventProc);
- }
- SLASummaryBean slaSummaryBean = new SLASummaryBean();
- slaSummaryBean.setId(slaCalc.getId());
- slaSummaryBean.setEventProcessed(eventProc);
- slaSummaryBean.setSLAStatus(slaCalc.getSLAStatus());
- slaSummaryBean.setEventStatus(slaCalc.getEventStatus());
- slaSummaryBean.setActualEnd(slaCalc.getActualEnd());
- slaSummaryBean.setActualStart(slaCalc.getActualStart());
- slaSummaryBean.setActualDuration(slaCalc.getActualDuration());
- slaSummaryBean.setJobStatus(slaCalc.getJobStatus());
- slaSummaryBean.setLastModifiedTime(new Date());
- SLASummaryQueryExecutor.getInstance().executeUpdate(
- SLASummaryQuery.UPDATE_SLA_SUMMARY_FOR_STATUS_ACTUAL_TIMES, slaSummaryBean);
- if (eventProc == 7) {
- historySet.add(jobId);
- slaMap.remove(jobId);
- LOG.trace("Removed Job [{0}] from map after End-processed", jobId);
+ if (change) {
+ try {
+ boolean locked = true;
+ slaCalc.acquireLock();
+ locked = slaCalc.isLocked();
+ if (locked) {
+ // no more processing, no transfer to history set
+ if (slaCalc.getEventProcessed() >= 8) {
+ eventProc = 8;
+ // Should not be > 8. But to handle any corner cases
+ slaCalc.setEventProcessed(8);
+ slaMap.remove(jobId);
+ }
+ else {
+ slaCalc.setEventProcessed(eventProc);
+ }
+ SLASummaryBean slaSummaryBean = new SLASummaryBean();
+ slaSummaryBean.setId(slaCalc.getId());
+ slaSummaryBean.setEventProcessed(eventProc);
+ slaSummaryBean.setSLAStatus(slaCalc.getSLAStatus());
+ slaSummaryBean.setEventStatus(slaCalc.getEventStatus());
+ slaSummaryBean.setActualEnd(slaCalc.getActualEnd());
+ slaSummaryBean.setActualStart(slaCalc.getActualStart());
+ slaSummaryBean.setActualDuration(slaCalc.getActualDuration());
+ slaSummaryBean.setJobStatus(slaCalc.getJobStatus());
+ slaSummaryBean.setLastModifiedTime(new Date());
+ SLASummaryQueryExecutor.getInstance().executeUpdate(
+ SLASummaryQuery.UPDATE_SLA_SUMMARY_FOR_STATUS_ACTUAL_TIMES, slaSummaryBean);
+ if (eventProc == 7) {
+ historySet.add(jobId);
+ slaMap.remove(jobId);
+ LOG.trace("Removed Job [{0}] from map after End-processed", jobId);
+ }
}
}
+ catch (InterruptedException e) {
+ throw new XException(ErrorCode.E0606, slaCalc.getId(), slaCalc.getLockTimeOut());
+ }
+ finally {
+ slaCalc.releaseLock();
+ }
}
- catch (InterruptedException e) {
- throw new XException(ErrorCode.E0606, slaCalc.getId(), slaCalc.getLockTimeOut());
- }
- finally {
- slaCalc.releaseLock();
- }
- }
- else if (eventProc >= 7) {
- historySet.add(jobId);
- slaMap.remove(jobId);
- LOG.trace("Removed Job [{0}] from map after End-processed", jobId);
}
}
}
@@ -965,7 +968,6 @@ public class SLACalculatorMemory implements SLACalculator {
CoordinatorActionBean ca = jpaService.execute(new CoordActionGetForSLAJPAExecutor(slaCalc.getId()));
if (ca.isTerminalWithFailure()) {
isEndMiss = ended = true;
- slaCalc.setActualStart(null);
slaCalc.setActualEnd(ca.getLastModifiedTime());
}
if (ca.getExternalId() != null) {
http://git-wip-us.apache.org/repos/asf/oozie/blob/d5b13db2/core/src/main/java/org/apache/oozie/util/LogUtils.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/util/LogUtils.java b/core/src/main/java/org/apache/oozie/util/LogUtils.java
index fd5b5b6..723ac36 100644
--- a/core/src/main/java/org/apache/oozie/util/LogUtils.java
+++ b/core/src/main/java/org/apache/oozie/util/LogUtils.java
@@ -125,4 +125,10 @@ public class LogUtils {
XLog.Info.get().setParameters(logInfo);
}
+ public static void setLogPrefix(String jobId, String appName, XLog.Info logInfo) {
+ logInfo.setParameter(DagXLogInfoService.JOB, jobId);
+ logInfo.setParameter(DagXLogInfoService.APP, appName);
+ XLog.Info.get().setParameters(logInfo);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/d5b13db2/core/src/test/java/org/apache/oozie/service/TestHASLAService.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/service/TestHASLAService.java b/core/src/test/java/org/apache/oozie/service/TestHASLAService.java
index eec5369..419e98b 100644
--- a/core/src/test/java/org/apache/oozie/service/TestHASLAService.java
+++ b/core/src/test/java/org/apache/oozie/service/TestHASLAService.java
@@ -30,14 +30,15 @@ import org.apache.oozie.CoordinatorJobBean;
import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.client.CoordinatorAction;
import org.apache.oozie.client.WorkflowJob;
-import org.apache.oozie.client.event.SLAEvent;
import org.apache.oozie.client.event.JobEvent.EventStatus;
+import org.apache.oozie.client.event.SLAEvent;
import org.apache.oozie.client.event.SLAEvent.SLAStatus;
import org.apache.oozie.client.rest.JsonBean;
+import org.apache.oozie.event.EventQueue;
import org.apache.oozie.executor.jpa.BatchQueryExecutor;
import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
-import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
+import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor;
import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor.SLASummaryQuery;
import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
@@ -164,27 +165,26 @@ public class TestHASLAService extends ZKXTestCase {
public void testSLAUpdateWithHA() throws Exception {
- String id1 = "0000000-130521183438837-oozie-test-C@1";
- String id2 = "0000001-130521183438837-oozie-test-C@1";
- String id3 = "0000002-130521183438837-oozie-test-C@1";
- String id4 = "0000003-130521183438837-oozie-test-C@1";
- String id5 = "0000004-130521183438837-oozie-test-C@1";
- String id6 = "0000005-130521183438837-oozie-test-C@1";
- Date expectedStartTS = new Date(System.currentTimeMillis() - 2 * 3600 * 1000);
+ String id1 = "0000001-130521183438837-oozie-test-C@1";
+ String id2 = "0000002-130521183438837-oozie-test-C@1";
+ String id3 = "0000003-130521183438837-oozie-test-C@1";
+ String id4 = "0000004-130521183438837-oozie-test-C@1";
+ String id5 = "0000005-130521183438837-oozie-test-C@1";
+ String id6 = "0000006-130521183438837-oozie-test-C@1";
+ Date expectedStartTS = new Date(System.currentTimeMillis() - 2 * 3600 * 1000); // 2 hrs passed
Date expectedEndTS1 = new Date(System.currentTimeMillis() + 1 * 3600 * 1000); // 1 hour ahead
Date expectedEndTS2 = new Date(System.currentTimeMillis() - 1 * 3600 * 1000); // 1 hour passed
- // Coord Action 1-4 not started yet
+ // Coord Action of jobs 1-4 not started yet
createDBEntry(id1, expectedStartTS, expectedEndTS1);
createDBEntry(id2, expectedStartTS, expectedEndTS1);
createDBEntry(id3, expectedStartTS, expectedEndTS1);
createDBEntry(id4, expectedStartTS, expectedEndTS1);
- // Coord Action 5-6 already started and currently running (to test history set)
+ // Coord Action of jobs 5-6 already started and currently running (to test history set)
createDBEntryForStarted(id5, expectedStartTS, expectedEndTS2);
createDBEntryForStarted(id6, expectedStartTS, expectedEndTS2);
SLAService slas = Services.get().get(SLAService.class);
SLACalculatorMemory slaCalcMem = (SLACalculatorMemory) slas.getSLACalculator();
- EventHandlerService ehs = Services.get().get(EventHandlerService.class);
slaCalcMem.init(Services.get().getConf());
List<String> slaMapKeys = new ArrayList<String>();
Iterator<String> itr = slaCalcMem.iterator();
@@ -209,14 +209,14 @@ public class TestHASLAService extends ZKXTestCase {
}
assertEquals(6, slaMapKeys.size());
- // Coord Action 1,3 run and update status on non-dummy server
+ // Coord Action 1,3 run and update status on *non-dummy* server
updateCoordAction(id1, "RUNNING");
slaCalcMem
.addJobStatus(id1, CoordinatorAction.Status.RUNNING.name(), EventStatus.STARTED, new Date(), null);
updateCoordAction(id3, "FAILED");
slaCalcMem.addJobStatus(id3, CoordinatorAction.Status.FAILED.name(), EventStatus.FAILURE, null, new Date());
- // Coord Action 2,4 run and update status on dummy server
+ // Coord Action 2,4 run and update status on *dummy* server
updateCoordAction(id2, "RUNNING");
dummySlaCalcMem.addJobStatus(id2, CoordinatorAction.Status.RUNNING.name(), EventStatus.STARTED, new Date(),
null);
@@ -244,10 +244,10 @@ public class TestHASLAService extends ZKXTestCase {
Byte eventProc = (Byte) SLASummaryQueryExecutor.getInstance().getSingleValue(
SLASummaryQuery.GET_SLA_SUMMARY_EVENTPROCESSED, id3);
- assertEquals(8, eventProc.intValue());
+ assertEquals(8, eventProc.byteValue());
eventProc = (Byte) SLASummaryQueryExecutor.getInstance().getSingleValue(
SLASummaryQuery.GET_SLA_SUMMARY_EVENTPROCESSED, id4);
- assertEquals(8, eventProc.intValue());
+ assertEquals(8, eventProc.byteValue());
// Action 5 was processed as END_MISS in updateAllSlaStatus, put into history set
assertTrue(slaCalcMem.isJobIdInHistorySet(id5));
@@ -258,10 +258,10 @@ public class TestHASLAService extends ZKXTestCase {
eventProc = (Byte) SLASummaryQueryExecutor.getInstance().getSingleValue(
SLASummaryQuery.GET_SLA_SUMMARY_EVENTPROCESSED, id5);
- assertEquals(7, eventProc.intValue());
+ assertEquals(7, eventProc.byteValue());
eventProc = (Byte) SLASummaryQueryExecutor.getInstance().getSingleValue(
SLASummaryQuery.GET_SLA_SUMMARY_EVENTPROCESSED, id6);
- assertEquals(7, eventProc.intValue());
+ assertEquals(7, eventProc.byteValue());
// Action 1 Succeeded on non-dummy server
updateCoordAction(id1, "SUCCEEDED");
@@ -284,10 +284,10 @@ public class TestHASLAService extends ZKXTestCase {
assertNull(dummySlaCalcMem.get(id2));
eventProc = (Byte) SLASummaryQueryExecutor.getInstance().getSingleValue(
SLASummaryQuery.GET_SLA_SUMMARY_EVENTPROCESSED, id1);
- assertEquals(8, eventProc.intValue());
+ assertEquals(8, eventProc.byteValue());
eventProc = (Byte) SLASummaryQueryExecutor.getInstance().getSingleValue(
SLASummaryQuery.GET_SLA_SUMMARY_EVENTPROCESSED, id2);
- assertEquals(8, eventProc.intValue());
+ assertEquals(8, eventProc.byteValue());
// Test HistoryPurgeWorker purges Action 5,6 from history set
updateCoordAction(id5, "SUCCEEDED");
@@ -295,8 +295,59 @@ public class TestHASLAService extends ZKXTestCase {
assertFalse(slaCalcMem.isJobIdInHistorySet(id5));
updateCoordAction(id6, "SUCCEEDED");
dummySlaCalcMem.new HistoryPurgeWorker().run();
- assertFalse(slaCalcMem.isJobIdInHistorySet(id6));
+ assertFalse(dummySlaCalcMem.isJobIdInHistorySet(id6));
+
+ }
+ finally {
+ if (dummyOozie_1 != null) {
+ dummyOozie_1.teardown();
+ }
+ }
+ }
+
+ public void testNoDuplicateEventsInHA() throws Exception {
+ String id1 = "0000001-130521183438837-oozie-test-C@1";
+ Date expectedStartTS = new Date(System.currentTimeMillis() - 2 * 3600 * 1000); // get MISS
+ Date expectedEndTS = new Date(System.currentTimeMillis() - 1 * 3600 * 1000); // get MISS
+ createDBEntry(id1, expectedStartTS, expectedEndTS);
+
+ SLAService slas = Services.get().get(SLAService.class);
+ SLACalculatorMemory slaCalcMem = (SLACalculatorMemory) slas.getSLACalculator();
+ slaCalcMem.init(Services.get().getConf()); // loads the job in sla map
+
+ EventHandlerService ehs = Services.get().get(EventHandlerService.class);
+ EventQueue ehs_q = ehs.getEventQueue();
+ DummyZKOozie dummyOozie_1 = null;
+ try {
+ // start another dummy oozie instance (dummy sla and event handler services)
+ dummyOozie_1 = new DummyZKOozie("a", "http://blah");
+ DummySLACalculatorMemory dummySlaCalcMem = new DummySLACalculatorMemory();
+ dummySlaCalcMem.init(Services.get().getConf());
+ EventHandlerService dummyEhs = new EventHandlerService();
+ dummySlaCalcMem.setEventHandlerService(dummyEhs);
+ dummyEhs.init(Services.get());
+ EventQueue dummyEhs_q = dummyEhs.getEventQueue();
+
+ // Action started on Server 1
+ updateCoordAction(id1, "RUNNING");
+ slaCalcMem
+ .addJobStatus(id1, CoordinatorAction.Status.RUNNING.name(), EventStatus.STARTED, new Date(), null);
+ SLACalcStatus s1 = (SLACalcStatus) ehs_q.poll();
+ assertEquals(SLAStatus.IN_PROCESS, s1.getSLAStatus());
+
+ // Action ended on Server 2
+ updateCoordAction(id1, "FAILED");
+ dummySlaCalcMem.addJobStatus(id1, CoordinatorAction.Status.FAILED.name(), EventStatus.FAILURE, new Date(
+ System.currentTimeMillis() - 1800 * 1000),
+ new Date());
+ dummyEhs_q.poll(); // getting rid of the duration_miss event
+ SLACalcStatus s2 = (SLACalcStatus) dummyEhs_q.poll();
+ assertEquals(SLAStatus.MISS, s2.getSLAStatus());
+
+ slaCalcMem.updateAllSlaStatus();
+ dummySlaCalcMem.updateAllSlaStatus();
+ assertEquals(0, ehs_q.size()); // no dupe event should be created again by Server 1
}
finally {
if (dummyOozie_1 != null) {
http://git-wip-us.apache.org/repos/asf/oozie/blob/d5b13db2/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index a4b456f..81559eb 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
-- Oozie 4.1.0 release (trunk - unreleased)
+OOZIE-1911 SLA calculation in HA mode does wrong bit comparison for 'start' and 'duration' (mona)
OOZIE-1926 make gz blob compression as default (ryota)
OOZIE-1916 Use Curator leader latch instead of checking the order of Oozie servers (rkanter)
OOZIE-1886 Queue operation talking longer time (shwethags via rohini)