You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by mo...@apache.org on 2014/07/11 22:02:29 UTC

git commit: OOZIE-1911 SLA calculation in HA mode does wrong bit comparison for 'start' and 'duration' (mona)

Repository: oozie
Updated Branches:
  refs/heads/master bdc41b258 -> d5b13db2b


OOZIE-1911 SLA calculation in HA mode does wrong bit comparison for 'start' and 'duration' (mona)


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/d5b13db2
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/d5b13db2
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/d5b13db2

Branch: refs/heads/master
Commit: d5b13db2b17578a13afcab7de267ce48000410cd
Parents: bdc41b2
Author: mona <ch...@yahoo-inc.com>
Authored: Fri Jul 11 13:02:22 2014 -0700
Committer: mona <ch...@yahoo-inc.com>
Committed: Fri Jul 11 13:02:22 2014 -0700

----------------------------------------------------------------------
 .../oozie/service/EventHandlerService.java      |  27 ++-
 .../org/apache/oozie/sla/SLACalcStatus.java     |  22 ++-
 .../apache/oozie/sla/SLACalculatorMemory.java   | 176 ++++++++++---------
 .../java/org/apache/oozie/util/LogUtils.java    |   6 +
 .../apache/oozie/service/TestHASLAService.java  |  93 +++++++---
 release-log.txt                                 |   1 +
 6 files changed, 208 insertions(+), 117 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/d5b13db2/core/src/main/java/org/apache/oozie/service/EventHandlerService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/EventHandlerService.java b/core/src/main/java/org/apache/oozie/service/EventHandlerService.java
index 4207a07..6c075ab 100644
--- a/core/src/main/java/org/apache/oozie/service/EventHandlerService.java
+++ b/core/src/main/java/org/apache/oozie/service/EventHandlerService.java
@@ -33,6 +33,7 @@ import org.apache.oozie.event.WorkflowJobEvent;
 import org.apache.oozie.event.listener.JobEventListener;
 import org.apache.oozie.sla.listener.SLAEventListener;
 import org.apache.oozie.client.event.SLAEvent;
+import org.apache.oozie.util.LogUtils;
 import org.apache.oozie.util.XLog;
 
 import java.util.ArrayList;
@@ -71,6 +72,7 @@ public class EventHandlerService implements Service {
         try {
             Configuration conf = services.getConf();
             LOG = XLog.getLog(getClass());
+            LOG = XLog.resetPrefix(LOG);
             Class<? extends EventQueue> queueImpl = (Class<? extends EventQueue>) conf.getClass(CONF_EVENT_QUEUE, null);
             eventQueue = queueImpl == null ? new MemoryEventQueue() : (EventQueue) queueImpl.newInstance();
             eventQueue.init(conf);
@@ -206,7 +208,8 @@ public class EventHandlerService implements Service {
         return listenerMap.toString();
     }
 
-    public void queueEvent(Event event) {
+    public synchronized void queueEvent(Event event) {
+        setLogPrefix(LOG, event);
         LOG.debug("Queueing event : {0}", event);
         LOG.trace("Stack trace while queueing event : {0}", event, new Throwable());
         eventQueue.add(event);
@@ -216,7 +219,24 @@ public class EventHandlerService implements Service {
         return eventQueue;
     }
 
+    private void setLogPrefix(XLog logObj, Event event) {
+        logObj = XLog.resetPrefix(logObj);
+        if (event instanceof JobEvent) {
+            JobEvent je = (JobEvent) event;
+            LogUtils.setLogPrefix(je.getId(), je.getAppName(), new XLog.Info());
+        }
+        else if (event instanceof SLAEvent) {
+            SLAEvent se = (SLAEvent) event;
+            LogUtils.setLogPrefix(se.getId(), se.getAppName(), new XLog.Info());
+        }
+    }
+
     public class EventWorker implements Runnable {
+        private XLog workerLog;
+
+        public EventWorker() {
+            workerLog = XLog.getLog(getClass());
+        }
 
         @Override
         public void run() {
@@ -227,7 +247,10 @@ public class EventHandlerService implements Service {
                 if (!eventQueue.isEmpty()) {
                     List<Event> work = eventQueue.pollBatch();
                     for (Event event : work) {
-                        LOG.debug("Processing event : {0}", event);
+                        synchronized (workerLog) {
+                            setLogPrefix(workerLog, event);
+                            LOG.debug("Processing event : {0}", event);
+                        }
                         MessageType msgType = event.getMsgType();
                         List<?> listeners = listenerMap.get(msgType);
                         if (listeners != null) {

http://git-wip-us.apache.org/repos/asf/oozie/blob/d5b13db2/core/src/main/java/org/apache/oozie/sla/SLACalcStatus.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/sla/SLACalcStatus.java b/core/src/main/java/org/apache/oozie/sla/SLACalcStatus.java
index f148db3..5349b33 100644
--- a/core/src/main/java/org/apache/oozie/sla/SLACalcStatus.java
+++ b/core/src/main/java/org/apache/oozie/sla/SLACalcStatus.java
@@ -21,16 +21,13 @@ package org.apache.oozie.sla;
 import java.util.Date;
 
 import org.apache.oozie.AppType;
-import org.apache.oozie.ErrorCode;
 import org.apache.oozie.client.event.SLAEvent;
-import org.apache.oozie.command.CommandException;
 import org.apache.oozie.lock.LockToken;
-import org.apache.oozie.service.InstrumentationService;
 import org.apache.oozie.service.JobsConcurrencyService;
 import org.apache.oozie.service.MemoryLocksService;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.sla.service.SLAService;
-import org.apache.oozie.util.Instrumentation;
+import org.apache.oozie.util.LogUtils;
 import org.apache.oozie.util.XLog;
 
 /**
@@ -51,9 +48,12 @@ public class SLACalcStatus extends SLAEvent {
     private byte eventProcessed;
     private LockToken lock;
 
+    private XLog LOG;
+
     public SLACalcStatus(SLARegistrationBean reg) {
         this();
         setSLARegistrationBean(reg);
+        setLogPrefix();
     }
 
     public SLACalcStatus(SLASummaryBean summary, SLARegistrationBean regBean) {
@@ -82,6 +82,7 @@ public class SLACalcStatus extends SLAEvent {
         setEventStatus(summary.getEventStatus());
         setLastModifiedTime(summary.getLastModifiedTime());
         setEventProcessed(summary.getEventProcessed());
+        setLogPrefix();
     }
 
     /**
@@ -98,11 +99,13 @@ public class SLACalcStatus extends SLAEvent {
         setActualEnd(a.getActualEnd());
         setActualDuration(a.getActualDuration());
         setEventProcessed(a.getEventProcessed());
+        setLogPrefix();
     }
 
     public SLACalcStatus() {
         setMsgType(MessageType.SLA);
         setLastModifiedTime(new Date());
+        LOG = XLog.getLog(getClass());
     }
 
     public SLARegistrationBean getSLARegistrationBean() {
@@ -292,10 +295,10 @@ public class SLACalcStatus extends SLAEvent {
         if (Services.get().get(JobsConcurrencyService.class).isHighlyAvailableMode()) {
             lock = Services.get().get(MemoryLocksService.class).getWriteLock(getEntityKey(), getLockTimeOut());
             if (lock == null) {
-                XLog.getLog(getClass()).debug("Could not aquire lock for [{0}]", getEntityKey());
+            LOG.debug("Could not aquire lock for [{0}]", getEntityKey());
             }
             else {
-                XLog.getLog(getClass()).debug("Acquired lock for [{0}]", getEntityKey());
+                LOG.debug("Acquired lock for [{0}]", getEntityKey());
             }
         }
         else {
@@ -321,11 +324,16 @@ public class SLACalcStatus extends SLAEvent {
         if (lock != null) {
             lock.release();
             lock = null;
-            XLog.getLog(getClass()).debug("Released lock for [{0}]", getEntityKey());
+            LOG.debug("Released lock for [{0}]", getEntityKey());
         }
     }
 
     public long getLockTimeOut() {
         return Services.get().getConf().getLong(SLAService.CONF_SLA_CALC_LOCK_TIMEOUT, 5 * 1000);
     }
+
+    private void setLogPrefix() {
+        LOG = XLog.resetPrefix(LOG);
+        LogUtils.setLogPrefix(this.getId(), this.getAppName(), new XLog.Info());
+    }
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/d5b13db2/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java b/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java
index 47c723d..5b30fc0 100644
--- a/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java
+++ b/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java
@@ -86,7 +86,7 @@ public class SLACalculatorMemory implements SLACalculator {
     private static final XLog LOG = XLog.getLog(SLACalculatorMemory.class);
     // TODO optimization priority based insertion/processing/bumping up-down
     protected Map<String, SLACalcStatus> slaMap;
-    protected static Set<String> historySet;
+    protected Set<String> historySet;
     private static int capacity;
     private static JPAService jpaService;
     protected EventHandlerService eventHandler;
@@ -477,105 +477,108 @@ public class SLACalculatorMemory implements SLACalculator {
             Object eventProcObj = ((SLASummaryQueryExecutor) SLASummaryQueryExecutor.getInstance()).getSingleValue(
                     SLASummaryQuery.GET_SLA_SUMMARY_EVENTPROCESSED, jobId);
             byte eventProc = ((Byte) eventProcObj).byteValue();
-            slaCalc.setEventProcessed(eventProc);
-            SLARegistrationBean reg = slaCalc.getSLARegistrationBean();
-            // calculation w.r.t current time and status
-            if ((eventProc & 1) == 0) { // first bit (start-processed) unset
-                if (reg.getExpectedStart() != null) {
-                    if (reg.getExpectedStart().getTime() + jobEventLatency < System.currentTimeMillis()) {
-                        confirmWithDB(slaCalc);
-                        eventProc = slaCalc.getEventProcessed();
-                        if (eventProc != 8 && (eventProc & 1) == 0) {
-                            // Some DB exception
-                            slaCalc.setEventStatus(EventStatus.START_MISS);
-                            eventHandler.queueEvent(new SLACalcStatus(slaCalc));
-                            eventProc++;
+            if (eventProc >= 7) {
+                if (eventProc == 7) {
+                    historySet.add(jobId);
+                }
+                slaMap.remove(jobId);
+                LOG.trace("Removed Job [{0}] from map as SLA processed", jobId);
+            }
+            else {
+                slaCalc.setEventProcessed(eventProc);
+                SLARegistrationBean reg = slaCalc.getSLARegistrationBean();
+                // calculation w.r.t current time and status
+                if ((eventProc & 1) == 0) { // first bit (start-processed) unset
+                    if (reg.getExpectedStart() != null) {
+                        if (reg.getExpectedStart().getTime() + jobEventLatency < System.currentTimeMillis()) {
+                            confirmWithDB(slaCalc);
+                            eventProc = slaCalc.getEventProcessed();
+                            if (eventProc != 8 && (eventProc & 1) == 0) {
+                                // Some DB exception
+                                slaCalc.setEventStatus(EventStatus.START_MISS);
+                                eventHandler.queueEvent(new SLACalcStatus(slaCalc));
+                                eventProc++;
+                            }
+                            change = true;
                         }
+                    }
+                    else {
+                        eventProc++; // disable further processing for optional start sla condition
                         change = true;
                     }
                 }
-                else {
-                    eventProc++; // disable further processing for optional
-                                 // start sla condition
-                    change = true;
-                }
-            }
-            // check if second bit (duration-processed) is unset
-            if (((eventProc >> 1) & 1) == 0 && eventProc != 8) {
-                if (reg.getExpectedDuration() == -1) {
-                    eventProc += 2;
-                    change = true;
+                // check if second bit (duration-processed) is unset
+                if (eventProc != 8 && ((eventProc >> 1) & 1) == 0) {
+                    if (reg.getExpectedDuration() == -1) {
+                        eventProc += 2;
+                        change = true;
+                    }
+                    else if (slaCalc.getActualStart() != null) {
+                        if ((reg.getExpectedDuration() + jobEventLatency) < (System.currentTimeMillis() - slaCalc
+                                .getActualStart().getTime())) {
+                            slaCalc.setEventProcessed(eventProc);
+                            confirmWithDB(slaCalc);
+                            eventProc = slaCalc.getEventProcessed();
+                            if (eventProc != 8 && ((eventProc >> 1) & 1) == 0) {
+                                // Some DB exception
+                                slaCalc.setEventStatus(EventStatus.DURATION_MISS);
+                                eventHandler.queueEvent(new SLACalcStatus(slaCalc));
+                                eventProc += 2;
+                            }
+                            change = true;
+                        }
+                    }
                 }
-                else if (slaCalc.getActualStart() != null) {
-                    if ((reg.getExpectedDuration() + jobEventLatency) < (System.currentTimeMillis() - slaCalc
-                            .getActualStart().getTime())) {
+                if (eventProc < 4) {
+                    if (reg.getExpectedEnd().getTime() + jobEventLatency < System.currentTimeMillis()) {
                         slaCalc.setEventProcessed(eventProc);
                         confirmWithDB(slaCalc);
                         eventProc = slaCalc.getEventProcessed();
-                        if (eventProc != 8 && ((eventProc >> 1) & 1) == 0) {
-                            // Some DB exception
-                            slaCalc.setEventStatus(EventStatus.DURATION_MISS);
-                            eventHandler.queueEvent(new SLACalcStatus(slaCalc));
-                            eventProc += 2;
-                        }
                         change = true;
                     }
                 }
-            }
-            if (eventProc < 4) {
-                if (reg.getExpectedEnd().getTime() + jobEventLatency < System.currentTimeMillis()) {
-                    slaCalc.setEventProcessed(eventProc);
-                    confirmWithDB(slaCalc);
-                    eventProc = slaCalc.getEventProcessed();
-                    change = true;
-                }
-            }
-            if (change) {
-                try {
-                    boolean locked = true;
-                    slaCalc.acquireLock();
-                    locked = slaCalc.isLocked();
-                    if (locked) {
-                        // no more processing, no transfer to history set
-                        if (slaCalc.getEventProcessed() >= 8) {
-                            eventProc = 8;
-                            // Should not be > 8. But to handle any corner cases
-                            slaCalc.setEventProcessed(8);
-                            slaMap.remove(jobId);
-                        }
-                        else {
-                            slaCalc.setEventProcessed(eventProc);
-                        }
-                        SLASummaryBean slaSummaryBean = new SLASummaryBean();
-                        slaSummaryBean.setId(slaCalc.getId());
-                        slaSummaryBean.setEventProcessed(eventProc);
-                        slaSummaryBean.setSLAStatus(slaCalc.getSLAStatus());
-                        slaSummaryBean.setEventStatus(slaCalc.getEventStatus());
-                        slaSummaryBean.setActualEnd(slaCalc.getActualEnd());
-                        slaSummaryBean.setActualStart(slaCalc.getActualStart());
-                        slaSummaryBean.setActualDuration(slaCalc.getActualDuration());
-                        slaSummaryBean.setJobStatus(slaCalc.getJobStatus());
-                        slaSummaryBean.setLastModifiedTime(new Date());
-                        SLASummaryQueryExecutor.getInstance().executeUpdate(
-                                SLASummaryQuery.UPDATE_SLA_SUMMARY_FOR_STATUS_ACTUAL_TIMES, slaSummaryBean);
-                        if (eventProc == 7) {
-                            historySet.add(jobId);
-                            slaMap.remove(jobId);
-                            LOG.trace("Removed Job [{0}] from map after End-processed", jobId);
+                if (change) {
+                    try {
+                        boolean locked = true;
+                        slaCalc.acquireLock();
+                        locked = slaCalc.isLocked();
+                        if (locked) {
+                            // no more processing, no transfer to history set
+                            if (slaCalc.getEventProcessed() >= 8) {
+                                eventProc = 8;
+                                // Should not be > 8. But to handle any corner cases
+                                slaCalc.setEventProcessed(8);
+                                slaMap.remove(jobId);
+                            }
+                            else {
+                                slaCalc.setEventProcessed(eventProc);
+                            }
+                            SLASummaryBean slaSummaryBean = new SLASummaryBean();
+                            slaSummaryBean.setId(slaCalc.getId());
+                            slaSummaryBean.setEventProcessed(eventProc);
+                            slaSummaryBean.setSLAStatus(slaCalc.getSLAStatus());
+                            slaSummaryBean.setEventStatus(slaCalc.getEventStatus());
+                            slaSummaryBean.setActualEnd(slaCalc.getActualEnd());
+                            slaSummaryBean.setActualStart(slaCalc.getActualStart());
+                            slaSummaryBean.setActualDuration(slaCalc.getActualDuration());
+                            slaSummaryBean.setJobStatus(slaCalc.getJobStatus());
+                            slaSummaryBean.setLastModifiedTime(new Date());
+                            SLASummaryQueryExecutor.getInstance().executeUpdate(
+                                    SLASummaryQuery.UPDATE_SLA_SUMMARY_FOR_STATUS_ACTUAL_TIMES, slaSummaryBean);
+                            if (eventProc == 7) {
+                                historySet.add(jobId);
+                                slaMap.remove(jobId);
+                                LOG.trace("Removed Job [{0}] from map after End-processed", jobId);
+                            }
                         }
                     }
+                    catch (InterruptedException e) {
+                        throw new XException(ErrorCode.E0606, slaCalc.getId(), slaCalc.getLockTimeOut());
+                    }
+                    finally {
+                        slaCalc.releaseLock();
+                    }
                 }
-                catch (InterruptedException e) {
-                    throw new XException(ErrorCode.E0606, slaCalc.getId(), slaCalc.getLockTimeOut());
-                }
-                finally {
-                    slaCalc.releaseLock();
-                }
-            }
-            else if (eventProc >= 7) {
-                historySet.add(jobId);
-                slaMap.remove(jobId);
-                LOG.trace("Removed Job [{0}] from map after End-processed", jobId);
             }
         }
     }
@@ -965,7 +968,6 @@ public class SLACalculatorMemory implements SLACalculator {
                     CoordinatorActionBean ca = jpaService.execute(new CoordActionGetForSLAJPAExecutor(slaCalc.getId()));
                     if (ca.isTerminalWithFailure()) {
                         isEndMiss = ended = true;
-                        slaCalc.setActualStart(null);
                         slaCalc.setActualEnd(ca.getLastModifiedTime());
                     }
                     if (ca.getExternalId() != null) {

http://git-wip-us.apache.org/repos/asf/oozie/blob/d5b13db2/core/src/main/java/org/apache/oozie/util/LogUtils.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/util/LogUtils.java b/core/src/main/java/org/apache/oozie/util/LogUtils.java
index fd5b5b6..723ac36 100644
--- a/core/src/main/java/org/apache/oozie/util/LogUtils.java
+++ b/core/src/main/java/org/apache/oozie/util/LogUtils.java
@@ -125,4 +125,10 @@ public class LogUtils {
         XLog.Info.get().setParameters(logInfo);
     }
 
+    public static void setLogPrefix(String jobId, String appName, XLog.Info logInfo) {
+        logInfo.setParameter(DagXLogInfoService.JOB, jobId);
+        logInfo.setParameter(DagXLogInfoService.APP, appName);
+        XLog.Info.get().setParameters(logInfo);
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/d5b13db2/core/src/test/java/org/apache/oozie/service/TestHASLAService.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/service/TestHASLAService.java b/core/src/test/java/org/apache/oozie/service/TestHASLAService.java
index eec5369..419e98b 100644
--- a/core/src/test/java/org/apache/oozie/service/TestHASLAService.java
+++ b/core/src/test/java/org/apache/oozie/service/TestHASLAService.java
@@ -30,14 +30,15 @@ import org.apache.oozie.CoordinatorJobBean;
 import org.apache.oozie.WorkflowJobBean;
 import org.apache.oozie.client.CoordinatorAction;
 import org.apache.oozie.client.WorkflowJob;
-import org.apache.oozie.client.event.SLAEvent;
 import org.apache.oozie.client.event.JobEvent.EventStatus;
+import org.apache.oozie.client.event.SLAEvent;
 import org.apache.oozie.client.event.SLAEvent.SLAStatus;
 import org.apache.oozie.client.rest.JsonBean;
+import org.apache.oozie.event.EventQueue;
 import org.apache.oozie.executor.jpa.BatchQueryExecutor;
 import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
-import org.apache.oozie.executor.jpa.JPAExecutorException;
 import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
+import org.apache.oozie.executor.jpa.JPAExecutorException;
 import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor;
 import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor.SLASummaryQuery;
 import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
@@ -164,27 +165,26 @@ public class TestHASLAService extends ZKXTestCase {
 
     public void testSLAUpdateWithHA() throws Exception {
 
-        String id1 = "0000000-130521183438837-oozie-test-C@1";
-        String id2 = "0000001-130521183438837-oozie-test-C@1";
-        String id3 = "0000002-130521183438837-oozie-test-C@1";
-        String id4 = "0000003-130521183438837-oozie-test-C@1";
-        String id5 = "0000004-130521183438837-oozie-test-C@1";
-        String id6 = "0000005-130521183438837-oozie-test-C@1";
-        Date expectedStartTS = new Date(System.currentTimeMillis() - 2 * 3600 * 1000);
+        String id1 = "0000001-130521183438837-oozie-test-C@1";
+        String id2 = "0000002-130521183438837-oozie-test-C@1";
+        String id3 = "0000003-130521183438837-oozie-test-C@1";
+        String id4 = "0000004-130521183438837-oozie-test-C@1";
+        String id5 = "0000005-130521183438837-oozie-test-C@1";
+        String id6 = "0000006-130521183438837-oozie-test-C@1";
+        Date expectedStartTS = new Date(System.currentTimeMillis() - 2 * 3600 * 1000); // 2 hrs passed
         Date expectedEndTS1 = new Date(System.currentTimeMillis() + 1 * 3600 * 1000); // 1 hour ahead
         Date expectedEndTS2 = new Date(System.currentTimeMillis() - 1 * 3600 * 1000); // 1 hour passed
-        // Coord Action 1-4 not started yet
+        // Coord Action of jobs 1-4 not started yet
         createDBEntry(id1, expectedStartTS, expectedEndTS1);
         createDBEntry(id2, expectedStartTS, expectedEndTS1);
         createDBEntry(id3, expectedStartTS, expectedEndTS1);
         createDBEntry(id4, expectedStartTS, expectedEndTS1);
-        // Coord Action 5-6 already started and currently running (to test history set)
+        // Coord Action of jobs 5-6 already started and currently running (to test history set)
         createDBEntryForStarted(id5, expectedStartTS, expectedEndTS2);
         createDBEntryForStarted(id6, expectedStartTS, expectedEndTS2);
 
         SLAService slas = Services.get().get(SLAService.class);
         SLACalculatorMemory slaCalcMem = (SLACalculatorMemory) slas.getSLACalculator();
-        EventHandlerService ehs = Services.get().get(EventHandlerService.class);
         slaCalcMem.init(Services.get().getConf());
         List<String> slaMapKeys = new ArrayList<String>();
         Iterator<String> itr = slaCalcMem.iterator();
@@ -209,14 +209,14 @@ public class TestHASLAService extends ZKXTestCase {
             }
             assertEquals(6, slaMapKeys.size());
 
-            // Coord Action 1,3 run and update status on non-dummy server
+            // Coord Action 1,3 run and update status on *non-dummy* server
             updateCoordAction(id1, "RUNNING");
             slaCalcMem
                     .addJobStatus(id1, CoordinatorAction.Status.RUNNING.name(), EventStatus.STARTED, new Date(), null);
             updateCoordAction(id3, "FAILED");
             slaCalcMem.addJobStatus(id3, CoordinatorAction.Status.FAILED.name(), EventStatus.FAILURE, null, new Date());
 
-            // Coord Action 2,4 run and update status on dummy server
+            // Coord Action 2,4 run and update status on *dummy* server
             updateCoordAction(id2, "RUNNING");
             dummySlaCalcMem.addJobStatus(id2, CoordinatorAction.Status.RUNNING.name(), EventStatus.STARTED, new Date(),
                     null);
@@ -244,10 +244,10 @@ public class TestHASLAService extends ZKXTestCase {
 
             Byte eventProc = (Byte) SLASummaryQueryExecutor.getInstance().getSingleValue(
                     SLASummaryQuery.GET_SLA_SUMMARY_EVENTPROCESSED, id3);
-            assertEquals(8, eventProc.intValue());
+            assertEquals(8, eventProc.byteValue());
             eventProc = (Byte) SLASummaryQueryExecutor.getInstance().getSingleValue(
                     SLASummaryQuery.GET_SLA_SUMMARY_EVENTPROCESSED, id4);
-            assertEquals(8, eventProc.intValue());
+            assertEquals(8, eventProc.byteValue());
 
             // Action 5 was processed as END_MISS in updateAllSlaStatus, put into history set
             assertTrue(slaCalcMem.isJobIdInHistorySet(id5));
@@ -258,10 +258,10 @@ public class TestHASLAService extends ZKXTestCase {
 
             eventProc = (Byte) SLASummaryQueryExecutor.getInstance().getSingleValue(
                     SLASummaryQuery.GET_SLA_SUMMARY_EVENTPROCESSED, id5);
-            assertEquals(7, eventProc.intValue());
+            assertEquals(7, eventProc.byteValue());
             eventProc = (Byte) SLASummaryQueryExecutor.getInstance().getSingleValue(
                     SLASummaryQuery.GET_SLA_SUMMARY_EVENTPROCESSED, id6);
-            assertEquals(7, eventProc.intValue());
+            assertEquals(7, eventProc.byteValue());
 
             // Action 1 Succeeded on non-dummy server
             updateCoordAction(id1, "SUCCEEDED");
@@ -284,10 +284,10 @@ public class TestHASLAService extends ZKXTestCase {
             assertNull(dummySlaCalcMem.get(id2));
             eventProc = (Byte) SLASummaryQueryExecutor.getInstance().getSingleValue(
                     SLASummaryQuery.GET_SLA_SUMMARY_EVENTPROCESSED, id1);
-            assertEquals(8, eventProc.intValue());
+            assertEquals(8, eventProc.byteValue());
             eventProc = (Byte) SLASummaryQueryExecutor.getInstance().getSingleValue(
                     SLASummaryQuery.GET_SLA_SUMMARY_EVENTPROCESSED, id2);
-            assertEquals(8, eventProc.intValue());
+            assertEquals(8, eventProc.byteValue());
 
             // Test HistoryPurgeWorker purges Action 5,6 from history set
             updateCoordAction(id5, "SUCCEEDED");
@@ -295,8 +295,59 @@ public class TestHASLAService extends ZKXTestCase {
             assertFalse(slaCalcMem.isJobIdInHistorySet(id5));
             updateCoordAction(id6, "SUCCEEDED");
             dummySlaCalcMem.new HistoryPurgeWorker().run();
-            assertFalse(slaCalcMem.isJobIdInHistorySet(id6));
+            assertFalse(dummySlaCalcMem.isJobIdInHistorySet(id6));
+
+        }
+        finally {
+            if (dummyOozie_1 != null) {
+                dummyOozie_1.teardown();
+            }
+        }
+    }
+
+    public void testNoDuplicateEventsInHA() throws Exception {
+        String id1 = "0000001-130521183438837-oozie-test-C@1";
+        Date expectedStartTS = new Date(System.currentTimeMillis() - 2 * 3600 * 1000); // get MISS
+        Date expectedEndTS = new Date(System.currentTimeMillis() - 1 * 3600 * 1000); // get MISS
+        createDBEntry(id1, expectedStartTS, expectedEndTS);
+
+        SLAService slas = Services.get().get(SLAService.class);
+        SLACalculatorMemory slaCalcMem = (SLACalculatorMemory) slas.getSLACalculator();
+        slaCalcMem.init(Services.get().getConf()); // loads the job in sla map
+
+        EventHandlerService ehs = Services.get().get(EventHandlerService.class);
+        EventQueue ehs_q = ehs.getEventQueue();
 
+        DummyZKOozie dummyOozie_1 = null;
+        try {
+            // start another dummy oozie instance (dummy sla and event handler services)
+            dummyOozie_1 = new DummyZKOozie("a", "http://blah");
+            DummySLACalculatorMemory dummySlaCalcMem = new DummySLACalculatorMemory();
+            dummySlaCalcMem.init(Services.get().getConf());
+            EventHandlerService dummyEhs = new EventHandlerService();
+            dummySlaCalcMem.setEventHandlerService(dummyEhs);
+            dummyEhs.init(Services.get());
+            EventQueue dummyEhs_q = dummyEhs.getEventQueue();
+
+            // Action started on Server 1
+            updateCoordAction(id1, "RUNNING");
+            slaCalcMem
+                    .addJobStatus(id1, CoordinatorAction.Status.RUNNING.name(), EventStatus.STARTED, new Date(), null);
+            SLACalcStatus s1 = (SLACalcStatus) ehs_q.poll();
+            assertEquals(SLAStatus.IN_PROCESS, s1.getSLAStatus());
+
+            // Action ended on Server 2
+            updateCoordAction(id1, "FAILED");
+            dummySlaCalcMem.addJobStatus(id1, CoordinatorAction.Status.FAILED.name(), EventStatus.FAILURE, new Date(
+                    System.currentTimeMillis() - 1800 * 1000),
+                    new Date());
+            dummyEhs_q.poll(); // getting rid of the duration_miss event
+            SLACalcStatus s2 = (SLACalcStatus) dummyEhs_q.poll();
+            assertEquals(SLAStatus.MISS, s2.getSLAStatus());
+
+            slaCalcMem.updateAllSlaStatus();
+            dummySlaCalcMem.updateAllSlaStatus();
+            assertEquals(0, ehs_q.size()); // no dupe event should be created again by Server 1
         }
         finally {
             if (dummyOozie_1 != null) {

http://git-wip-us.apache.org/repos/asf/oozie/blob/d5b13db2/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index a4b456f..81559eb 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 4.1.0 release (trunk - unreleased)
 
+OOZIE-1911 SLA calculation in HA mode does wrong bit comparison for 'start' and 'duration' (mona)
 OOZIE-1926 make gz blob compression as default (ryota)
 OOZIE-1916 Use Curator leader latch instead of checking the order of Oozie servers (rkanter)
 OOZIE-1886 Queue operation talking longer time (shwethags via rohini)