You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by vi...@apache.org on 2013/06/21 04:05:57 UTC

svn commit: r1495272 [2/2] - in /oozie/trunk: ./ core/src/main/java/org/apache/oozie/ core/src/main/java/org/apache/oozie/command/ core/src/main/java/org/apache/oozie/command/coord/ core/src/main/java/org/apache/oozie/command/wf/ core/src/main/java/org...

Modified: oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLACalculatorMemory.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLACalculatorMemory.java?rev=1495272&r1=1495271&r2=1495272&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLACalculatorMemory.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLACalculatorMemory.java Fri Jun 21 02:05:56 2013
@@ -24,11 +24,20 @@ import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.oozie.AppType;
+import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.WorkflowActionBean;
+import org.apache.oozie.WorkflowJobBean;
+import org.apache.oozie.client.WorkflowAction;
 import org.apache.oozie.client.WorkflowJob;
+import org.apache.oozie.client.CoordinatorAction;
 import org.apache.oozie.client.event.SLAEvent;
 import org.apache.oozie.client.event.JobEvent.EventStatus;
 import org.apache.oozie.client.event.SLAEvent.SLAStatus;
 import org.apache.oozie.client.rest.JsonBean;
+import org.apache.oozie.executor.jpa.CoordActionInsertJPAExecutor;
+import org.apache.oozie.executor.jpa.WorkflowActionInsertJPAExecutor;
+import org.apache.oozie.executor.jpa.WorkflowJobInsertJPAExecutor;
+import org.apache.oozie.executor.jpa.WorkflowJobUpdateJPAExecutor;
 import org.apache.oozie.executor.jpa.sla.SLACalculationInsertUpdateJPAExecutor;
 import org.apache.oozie.executor.jpa.sla.SLASummaryGetJPAExecutor;
 import org.apache.oozie.service.EventHandlerService;
@@ -159,8 +168,208 @@ public class TestSLACalculatorMemory ext
         assertEquals(WorkflowJob.Status.SUCCEEDED.toString(), slaSummary.getJobStatus());
     }
 
+
+    @Test
+    public void testWorkflowJobSLAStatusOnRestart() throws Exception {
+        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-mm-dd");
+        SLACalculatorMemory slaCalcMemory = new SLACalculatorMemory();
+        slaCalcMemory.init(new Configuration(false));
+        SLARegistrationBean slaRegBean1 = _createSLARegistration("job-1", AppType.WORKFLOW_JOB);
+        String jobId1 = slaRegBean1.getId();
+        slaRegBean1.setExpectedEnd(sdf.parse("2013-03-07"));
+        slaRegBean1.setExpectedStart(sdf.parse("2012-03-07"));
+        slaCalcMemory.addRegistration(jobId1, slaRegBean1);
+        SLACalcStatus calc1 = slaCalcMemory.get(jobId1);
+        calc1.setEventProcessed(1);
+        calc1.setSLAStatus(SLAEvent.SLAStatus.IN_PROCESS);
+        calc1.setJobStatus(WorkflowJob.Status.RUNNING.name());
+        calc1.setLastModifiedTime(new Date());
+        SLASummaryBean slaSummaryBean = new SLASummaryBean(calc1);
+
+        List<JsonBean> list = new ArrayList<JsonBean>();
+        list.add(slaSummaryBean);
+        jpaService.execute(new SLACalculationInsertUpdateJPAExecutor(null, list));
+
+        SLASummaryBean slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId1));
+
+        // Simulate a lost success event
+        WorkflowJobBean wjb = new WorkflowJobBean();
+        wjb.setId(jobId1);
+        wjb.setStatus(WorkflowJob.Status.SUCCEEDED);
+        wjb.setStartTime(sdf.parse("2012-02-07"));
+        wjb.setEndTime(sdf.parse("2013-02-07"));
+        WorkflowJobInsertJPAExecutor wfInsertCmd = new WorkflowJobInsertJPAExecutor(wjb);
+        jpaService.execute(wfInsertCmd);
+
+        slaCalcMemory = new SLACalculatorMemory();
+        slaCalcMemory.init(new Configuration(false));
+
+        // As job succeeded, it should not be in memory
+        assertEquals(0, slaCalcMemory.size());
+        slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId1));
+        assertEquals("job-1", slaSummary.getJobId());
+        assertEquals(8, slaSummary.getEventProcessed());
+        assertEquals(AppType.WORKFLOW_JOB, slaSummary.getAppType());
+        assertEquals("SUCCEEDED", slaSummary.getJobStatus());
+        assertEquals(SLAEvent.SLAStatus.MET, slaSummary.getSLAStatus());
+        assertEquals(sdf.parse("2012-02-07"), slaSummary.getActualStart());
+        assertEquals(sdf.parse("2013-02-07"), slaSummary.getActualEnd());
+        assertEquals(sdf.parse("2013-02-07").getTime() - sdf.parse("2012-02-07").getTime(),
+                slaSummary.getActualDuration());
+
+        // Simulate a lost failed event
+        wjb.setStatus(WorkflowJob.Status.FAILED);
+        jpaService.execute(new WorkflowJobUpdateJPAExecutor(wjb));
+
+        // Reset the summary Bean
+        calc1.setEventProcessed(1);
+        calc1.setSLAStatus(SLAEvent.SLAStatus.IN_PROCESS);
+        calc1.setJobStatus(WorkflowJob.Status.RUNNING.name());
+        slaSummaryBean = new SLASummaryBean(calc1);
+
+        list = new ArrayList<JsonBean>();
+        list.add(slaSummaryBean);
+        jpaService.execute(new SLACalculationInsertUpdateJPAExecutor(null, list));
+
+        slaCalcMemory = new SLACalculatorMemory();
+        slaCalcMemory.init(new Configuration(false));
+
+        assertEquals(0, slaCalcMemory.size());
+        slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId1));
+        assertEquals("FAILED", slaSummary.getJobStatus());
+        assertEquals(8, slaSummary.getEventProcessed());
+        assertEquals(sdf.parse("2012-02-07"), slaSummary.getActualStart());
+        assertEquals(sdf.parse("2013-02-07"), slaSummary.getActualEnd());
+        assertEquals(SLAEvent.SLAStatus.MISS, slaSummary.getSLAStatus());
+
+        // Simulate a lost RUNNING event
+        wjb.setStatus(WorkflowJob.Status.RUNNING);
+        jpaService.execute(new WorkflowJobUpdateJPAExecutor(wjb));
+
+        // Reset the summary Bean
+        calc1.setEventProcessed(0);
+        calc1.setSLAStatus(SLAEvent.SLAStatus.NOT_STARTED);
+        calc1.setJobStatus(null);
+        slaSummaryBean = new SLASummaryBean(calc1);
+
+        list = new ArrayList<JsonBean>();
+        list.add(slaSummaryBean);
+        jpaService.execute(new SLACalculationInsertUpdateJPAExecutor(null, list));
+
+        slaCalcMemory = new SLACalculatorMemory();
+        slaCalcMemory.init(new Configuration(false));
+
+        assertEquals(1, slaCalcMemory.size());
+        SLACalcStatus calc = slaCalcMemory.get(jobId1);
+        assertEquals(1, calc.getEventProcessed());
+        assertEquals("RUNNING", calc.getJobStatus());
+        assertEquals(sdf.parse("2012-02-07"), calc.getActualStart());
+        assertNull(calc.getActualEnd());
+        assertEquals(-1, calc.getActualDuration());
+        assertEquals(SLAEvent.SLAStatus.IN_PROCESS, calc.getSLAStatus());
+    }
+
+    @Test
+    public void testWorkflowActionSLAStatusOnRestart() throws Exception {
+        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-mm-dd");
+        SLACalculatorMemory slaCalcMemory = new SLACalculatorMemory();
+        slaCalcMemory.init(new Configuration(false));
+        SLARegistrationBean slaRegBean1 = _createSLARegistration("job-1", AppType.WORKFLOW_ACTION);
+        String jobId1 = slaRegBean1.getId();
+        slaRegBean1.setExpectedEnd(sdf.parse("2013-03-07"));
+        slaRegBean1.setExpectedStart(sdf.parse("2012-03-07"));
+        slaCalcMemory.addRegistration(jobId1, slaRegBean1);
+        SLACalcStatus calc1 = slaCalcMemory.get(jobId1);
+        calc1.setEventProcessed(1);
+        calc1.setSLAStatus(SLAEvent.SLAStatus.IN_PROCESS);
+        calc1.setJobStatus(WorkflowAction.Status.RUNNING.name());
+        calc1.setLastModifiedTime(new Date());
+        SLASummaryBean slaSummaryBean = new SLASummaryBean(calc1);
+
+        List<JsonBean> list = new ArrayList<JsonBean>();
+        list.add(slaSummaryBean);
+        jpaService.execute(new SLACalculationInsertUpdateJPAExecutor(null, list));
+
+        // Simulate a lost success event
+        WorkflowActionBean wab = new WorkflowActionBean();
+        wab.setId(jobId1);
+        wab.setStatus(WorkflowAction.Status.OK);
+        wab.setStartTime(sdf.parse("2012-02-07"));
+        wab.setEndTime(sdf.parse("2013-02-07"));
+        WorkflowActionInsertJPAExecutor wfInsertCmd = new WorkflowActionInsertJPAExecutor(wab);
+        jpaService.execute(wfInsertCmd);
+
+        slaCalcMemory = new SLACalculatorMemory();
+        slaCalcMemory.init(new Configuration(false));
+
+        // As job succeeded, it should not be in memory
+        assertEquals(0, slaCalcMemory.size());
+        SLASummaryBean slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId1));
+        assertEquals("job-1", slaSummary.getJobId());
+        assertEquals(8, slaSummary.getEventProcessed());
+        assertEquals(AppType.WORKFLOW_ACTION, slaSummary.getAppType());
+        assertEquals("OK", slaSummary.getJobStatus());
+        assertEquals(SLAEvent.SLAStatus.MET, slaSummary.getSLAStatus());
+        assertEquals(sdf.parse("2012-02-07"), slaSummary.getActualStart());
+        assertEquals(sdf.parse("2013-02-07"), slaSummary.getActualEnd());
+        assertEquals(sdf.parse("2013-02-07").getTime() - sdf.parse("2012-02-07").getTime(),
+                slaSummary.getActualDuration());
+
+    }
+
+    @Test
+    public void testCoordinatorActionSLAStatusOnRestart() throws Exception {
+        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-mm-dd");
+        SLACalculatorMemory slaCalcMemory = new SLACalculatorMemory();
+        slaCalcMemory.init(new Configuration(false));
+        SLARegistrationBean slaRegBean1 = _createSLARegistration("job-1", AppType.COORDINATOR_ACTION);
+        String jobId1 = slaRegBean1.getId();
+        slaRegBean1.setExpectedEnd(sdf.parse("2013-03-07"));
+        slaRegBean1.setExpectedStart(sdf.parse("2012-03-07"));
+        slaCalcMemory.addRegistration(jobId1, slaRegBean1);
+        SLACalcStatus calc1 = slaCalcMemory.get(jobId1);
+        calc1.setEventProcessed(1);
+        calc1.setSLAStatus(SLAEvent.SLAStatus.IN_PROCESS);
+        calc1.setJobStatus(WorkflowAction.Status.RUNNING.name());
+        calc1.setLastModifiedTime(new Date());
+        SLASummaryBean slaSummaryBean = new SLASummaryBean(calc1);
+
+        List<JsonBean> list = new ArrayList<JsonBean>();
+        list.add(slaSummaryBean);
+        jpaService.execute(new SLACalculationInsertUpdateJPAExecutor(null, list));
+
+        // Simulate a lost failed event
+        CoordinatorActionBean cab = new CoordinatorActionBean();
+        cab.setId(jobId1);
+        cab.setStatus(CoordinatorAction.Status.FAILED);
+        cab.setLastModifiedTime(sdf.parse("2013-02-07"));
+        cab.setExternalId("wf_job");
+        CoordActionInsertJPAExecutor caInsertCmd = new CoordActionInsertJPAExecutor(cab);
+        jpaService.execute(caInsertCmd);
+        WorkflowJobBean wjb = new WorkflowJobBean();
+        wjb.setId("wf_job");
+        wjb.setStartTime(sdf.parse("2012-02-07"));
+        jpaService.execute(new WorkflowJobUpdateJPAExecutor(wjb));
+
+        slaCalcMemory = new SLACalculatorMemory();
+        slaCalcMemory.init(new Configuration(false));
+
+        // As job succeeded, it should not be in memory
+        assertEquals(0, slaCalcMemory.size());
+        SLASummaryBean slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId1));
+        assertEquals("job-1", slaSummary.getJobId());
+        assertEquals(8, slaSummary.getEventProcessed());
+        assertEquals(AppType.COORDINATOR_ACTION, slaSummary.getAppType());
+        assertEquals("FAILED", slaSummary.getJobStatus());
+        assertEquals(SLAEvent.SLAStatus.MISS, slaSummary.getSLAStatus());
+        assertEquals(sdf.parse("2012-02-07"), slaSummary.getActualStart());
+        assertEquals(sdf.parse("2013-02-07"), slaSummary.getActualEnd());
+        assertEquals(sdf.parse("2013-02-07").getTime() - sdf.parse("2012-02-07").getTime(),
+                slaSummary.getActualDuration());
+    }
+
     @Test
-    public void testSLAEvents() throws Exception {
+    public void testSLAEvents1() throws Exception {
         SLACalculatorMemory slaCalcMemory = new SLACalculatorMemory();
         EventHandlerService ehs = Services.get().get(EventHandlerService.class);
         slaCalcMemory.init(new Configuration(false));
@@ -174,6 +383,7 @@ public class TestSLACalculatorMemory ext
         assertEquals(1, slaCalcMemory.size());
         SLASummaryBean slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId));
         assertEquals(SLAStatus.NOT_STARTED, slaSummary.getSLAStatus());
+        assertEquals("PREP", slaSummary.getJobStatus());
         slaCalcMemory.updateSlaStatus(jobId);
         assertEquals(2, ehs.getEventQueue().size());
         slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId));
@@ -187,6 +397,11 @@ public class TestSLACalculatorMemory ext
         assertEquals(SLAStatus.MISS, slaSummary.getSLAStatus());
         slaCalcMemory.addJobStatus(jobId, WorkflowJob.Status.RUNNING.toString(), EventStatus.STARTED,
                 sdf.parse("2012-01-01"), null);
+        slaCalcMemory.addJobStatus(jobId, WorkflowJob.Status.SUSPENDED.toString(), EventStatus.SUSPEND,
+                sdf.parse("2012-01-01"), null);
+        slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId));
+        assertEquals(WorkflowJob.Status.SUSPENDED.toString(), slaSummary.getJobStatus());
+        assertEquals(5, slaSummary.getEventProcessed());
         slaCalcMemory.addJobStatus(jobId, WorkflowJob.Status.SUCCEEDED.toString(), EventStatus.SUCCESS,
                 sdf.parse("2012-01-01"), sdf.parse("2012-01-02"));
 
@@ -205,6 +420,67 @@ public class TestSLACalculatorMemory ext
     }
 
     @Test
+    public void testSLAEvents2() throws Exception {
+        SLACalculatorMemory slaCalcMemory = new SLACalculatorMemory();
+        EventHandlerService ehs = Services.get().get(EventHandlerService.class);
+        slaCalcMemory.init(new Configuration(false));
+        SLARegistrationBean slaRegBean = _createSLARegistration("job-1", AppType.WORKFLOW_JOB);
+        slaRegBean.setExpectedStart(new Date(System.currentTimeMillis() - 1 * 1 * 3600 * 1000));
+        slaRegBean.setExpectedEnd(new Date(System.currentTimeMillis() + 2 * 1 * 3600 * 1000));
+
+        String jobId = slaRegBean.getId();
+        slaCalcMemory.addRegistration(jobId, slaRegBean);
+        assertEquals(1, slaCalcMemory.size());
+        slaCalcMemory.updateSlaStatus(jobId);
+        SLASummaryBean slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId));
+        // Duration bit should be processed as expected duration is not set
+        assertEquals(3, slaSummary.getEventProcessed());
+        // check only start event in queue
+        assertEquals(1, ehs.getEventQueue().size());
+        ehs.getEventQueue().clear();
+
+        // set back to 1, to make duration event not processed
+        slaSummary.setEventProcessed(1);
+        List<JsonBean> list = new ArrayList<JsonBean>();
+        list.add(slaSummary);
+        jpaService.execute(new SLACalculationInsertUpdateJPAExecutor(null, list));
+
+        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
+        slaCalcMemory.addJobStatus(jobId, WorkflowJob.Status.SUCCEEDED.toString(), EventStatus.SUCCESS,
+                sdf.parse("2012-01-01"), sdf.parse("2012-01-02"));
+        slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId));
+        // all should be processed
+        assertEquals(8, slaSummary.getEventProcessed());
+        // check only end event is in queue
+        assertEquals(1, ehs.getEventQueue().size());
+        ehs.getEventQueue().clear();
+
+        slaSummary.setEventProcessed(1);
+        list = new ArrayList<JsonBean>();
+        list.add(slaSummary);
+        jpaService.execute(new SLACalculationInsertUpdateJPAExecutor(null, list));
+
+        slaRegBean = _createSLARegistration("job-2", AppType.WORKFLOW_JOB);
+        slaRegBean.setExpectedStart(new Date(System.currentTimeMillis() - 1 * 1 * 3600 * 1000));
+        slaRegBean.setExpectedEnd(new Date(System.currentTimeMillis() + 2 * 1 * 3600 * 1000));
+
+        jobId = slaRegBean.getId();
+        slaCalcMemory.addRegistration(jobId, slaRegBean);
+        assertEquals(1, slaCalcMemory.size());
+
+        slaCalcMemory.addJobStatus(jobId, WorkflowJob.Status.KILLED.toString(), EventStatus.FAILURE, null,
+                sdf.parse("2012-01-02"));
+        slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId));
+        // Actual start null, so all events processed
+        assertEquals(8, slaSummary.getEventProcessed());
+        assertEquals(1, ehs.getEventQueue().size());
+        assertNull(slaSummary.getActualStart());
+        assertEquals(sdf.parse("2012-01-02"), slaSummary.getActualEnd());
+        assertEquals(SLAStatus.MISS, slaSummary.getSLAStatus());
+        assertEquals(SLAEvent.EventStatus.END_MISS, slaSummary.getEventStatus());
+    }
+
+    @Test
     public void testDuplicateStartMiss() throws Exception {
         // test start-miss
         EventHandlerService ehs = Services.get().get(EventHandlerService.class);

Modified: oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLAEventGeneration.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLAEventGeneration.java?rev=1495272&r1=1495271&r2=1495272&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLAEventGeneration.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLAEventGeneration.java Fri Jun 21 02:05:56 2013
@@ -216,7 +216,7 @@ public class TestSLAEventGeneration exte
         assertEquals(-1, slaSummary.getActualDuration());
         assertNull(slaSummary.getActualStart());
         assertNull(slaSummary.getActualEnd());
-        assertNull(slaSummary.getJobStatus());
+        assertEquals("PREP", slaSummary.getJobStatus());
         assertEquals(SLAStatus.NOT_STARTED, slaSummary.getSLAStatus());
         assertNull(slaEvent.getEventStatus());
 

Modified: oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLAService.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLAService.java?rev=1495272&r1=1495271&r2=1495272&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLAService.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLAService.java Fri Jun 21 02:05:56 2013
@@ -128,19 +128,20 @@ public class TestSLAService extends XDat
         assertTrue(output.toString().contains(sla2.getId() + " Sla START - MISS!!!"));
         assertTrue(output.toString().contains(sla2.getId() + " Sla END - MISS!!!"));
         output.setLength(0);
-
+        // As expected duration is not set, duration shall be processed and job removed from map
+        assertEquals(2, slas.getSLACalculator().size());
         // test same job multiple events (start-met, end-met) through job status event
         sla1 = _createSLARegistration("action-1", AppType.COORDINATOR_ACTION);
         sla1.setExpectedStart(new Date(System.currentTimeMillis() + 1 * 3600 * 1000)); //1 hour ahead
         sla1.setExpectedEnd(new Date(System.currentTimeMillis() + 2 * 3600 * 1000)); //2 hours ahead
         slas.addRegistrationEvent(sla1);
-        assertEquals(4, slas.getSLACalculator().size());
+        assertEquals(3, slas.getSLACalculator().size());
         slas.addStatusEvent(sla1.getId(), CoordinatorAction.Status.RUNNING.name(), EventStatus.STARTED, new Date(),
                 new Date());
         slas.addStatusEvent(sla1.getId(), CoordinatorAction.Status.SUCCEEDED.name(), EventStatus.SUCCESS,
                 new Date(), new Date());
         slas.runSLAWorker();
-        assertEquals(3, ehs.getEventQueue().size());
+        assertEquals(2, ehs.getEventQueue().size());
         ehs.new EventWorker().run();
         assertTrue(output.toString().contains(sla1.getId() + " Sla START - MET!!!"));
         assertTrue(output.toString().contains(sla1.getId() + " Sla END - MET!!!"));

Modified: oozie/trunk/release-log.txt
URL: http://svn.apache.org/viewvc/oozie/trunk/release-log.txt?rev=1495272&r1=1495271&r2=1495272&view=diff
==============================================================================
--- oozie/trunk/release-log.txt (original)
+++ oozie/trunk/release-log.txt Fri Jun 21 02:05:56 2013
@@ -1,5 +1,6 @@
 -- Oozie 4.1.0 release (trunk - unreleased)
 
+OOZIE-1424 Improve SLA reliability on restart, fix bugs related to SLA and event generation (virag)
 OOZIE-1417 Exlude **/oozie/store/* **/oozie/examples/* from clover reports (dennisyv via virag)
 OOZIE-1426 Fix bugs in SLA UI (rohini)
 OOZIE-1421 UI for SLA (virag, rohini)