You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by vi...@apache.org on 2013/06/10 06:37:23 UTC

svn commit: r1491333 [2/2] - in /oozie/trunk: ./ client/src/main/java/org/apache/oozie/client/event/message/ core/src/main/java/org/apache/oozie/command/coord/ core/src/main/java/org/apache/oozie/command/wf/ core/src/main/java/org/apache/oozie/event/me...

Added: oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLACalculatorMemory.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLACalculatorMemory.java?rev=1491333&view=auto
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLACalculatorMemory.java (added)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLACalculatorMemory.java Mon Jun 10 04:37:20 2013
@@ -0,0 +1,316 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.sla;
+
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.oozie.AppType;
+import org.apache.oozie.client.WorkflowJob;
+import org.apache.oozie.client.event.SLAEvent;
+import org.apache.oozie.client.event.JobEvent.EventStatus;
+import org.apache.oozie.client.event.SLAEvent.SLAStatus;
+import org.apache.oozie.client.rest.JsonBean;
+import org.apache.oozie.executor.jpa.sla.SLACalculationInsertUpdateJPAExecutor;
+import org.apache.oozie.executor.jpa.sla.SLASummaryGetJPAExecutor;
+import org.apache.oozie.service.EventHandlerService;
+import org.apache.oozie.service.JPAService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.test.XDataTestCase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestSLACalculatorMemory extends XDataTestCase {
+
+    private JPAService jpaService;
+
+    @Override
+    @Before
+    protected void setUp() throws Exception { // enables the event-handler and SLA services for every test
+        super.setUp();
+        String extraServiceClasses = "org.apache.oozie.service.EventHandlerService,"
+                + "org.apache.oozie.sla.service.SLAService";
+        Services oozieServices = new Services();
+        oozieServices.getConf().set(Services.CONF_SERVICE_EXT_CLASSES, extraServiceClasses);
+        oozieServices.init();
+        jpaService = Services.get().get(JPAService.class); // kept in a field for the SLA summary lookups below
+        cleanUpDBTables();
+    }
+
+    @Override
+    @After
+    protected void tearDown() throws Exception {
+        Services.get().destroy(); // presumably the instance initialised in setUp(); destroyed before base cleanup
+        super.tearDown();
+    }
+
+    @Test
+    public void testLoadOnRestart() throws Exception { // a new calculator must reload persisted SLA state from the DB
+        SLACalculatorMemory slaCalcMemory = new SLACalculatorMemory();
+        slaCalcMemory.init(new Configuration(false));
+        SLARegistrationBean slaRegBean1 = _createSLARegistration("job-1", AppType.WORKFLOW_JOB);
+        String jobId1 = slaRegBean1.getId();
+        SLARegistrationBean slaRegBean2 = _createSLARegistration("job-2", AppType.WORKFLOW_JOB);
+        String jobId2 = slaRegBean2.getId();
+        SLARegistrationBean slaRegBean3 = _createSLARegistration("job-3", AppType.WORKFLOW_JOB);
+        String jobId3 = slaRegBean3.getId();
+
+        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd"); // MM = month; lowercase mm is minute-of-hour
+        slaRegBean1.setAppName("app-name");
+        slaRegBean1.setExpectedDuration(123);
+        slaRegBean1.setExpectedEnd(sdf.parse("2012-02-07"));
+        slaRegBean1.setExpectedStart(sdf.parse("2011-02-07"));
+        slaRegBean1.setNominalTime(sdf.parse("2012-01-06"));
+        slaRegBean1.setUser("user");
+        slaRegBean1.setParentId("parentId");
+        slaRegBean1.setUpstreamApps("upstreamApps");
+        slaRegBean1.setNotificationMsg("notificationMsg");
+        slaRegBean1.setAlertContact("a@abc.com");
+        slaRegBean1.setAlertEvents("MISS");
+        slaRegBean1.setJobData("jobData");
+
+        slaCalcMemory.addRegistration(jobId1, slaRegBean1);
+        slaCalcMemory.addRegistration(jobId2, slaRegBean2);
+        slaCalcMemory.addRegistration(jobId3, slaRegBean3);
+
+        SLACalcStatus calc1 = slaCalcMemory.get(jobId1);
+        SLACalcStatus calc2 = slaCalcMemory.get(jobId2);
+        SLACalcStatus calc3 = slaCalcMemory.get(jobId3);
+
+        calc1.setEventProcessed(5);
+        calc2.setEventProcessed(6);
+        calc3.setEventProcessed(7);
+
+        calc1.setEventStatus(SLAEvent.EventStatus.END_MISS);
+        calc1.setSLAStatus(SLAEvent.SLAStatus.MISS);
+        calc1.setJobStatus(WorkflowJob.Status.FAILED.toString());
+        // set last modified time 5 days back (long arithmetic so the millisecond product cannot overflow int)
+        Date lastModifiedTime = new Date(System.currentTimeMillis() - 5L * 24 * 60 * 60 * 1000);
+        calc1.setLastModifiedTime(lastModifiedTime);
+
+        List<JsonBean> list = new ArrayList<JsonBean>();
+        SLASummaryBean bean = new SLASummaryBean(calc1);
+        bean.setActualStart(sdf.parse("2011-03-09"));
+        bean.setActualEnd(sdf.parse("2011-03-10"));
+        bean.setActualDuration(456);
+        list.add(bean);
+        list.add(new SLASummaryBean(calc2));
+        list.add(new SLASummaryBean(calc3));
+
+        jpaService.execute(new SLACalculationInsertUpdateJPAExecutor(null, list)); // persist the three summaries
+
+        slaCalcMemory = new SLACalculatorMemory(); // simulate a restart with a brand-new calculator
+        slaCalcMemory.init(new Configuration(false));
+
+        assertEquals(2, slaCalcMemory.size()); // job-1 and job-2 only; job-3 is checked further down
+
+        SLACalcStatus calc = slaCalcMemory.get(jobId1);
+        assertEquals("job-1", calc.getId());
+        assertEquals(AppType.WORKFLOW_JOB, calc.getAppType());
+        assertEquals("app-name", calc.getAppName());
+        assertEquals(123, calc.getExpectedDuration());
+        assertEquals(sdf.parse("2012-02-07"), calc.getExpectedEnd());
+        assertEquals(sdf.parse("2011-02-07"), calc.getExpectedStart());
+        assertEquals(sdf.parse("2012-01-06"), calc.getNominalTime());
+        assertEquals("user", calc.getUser());
+        assertEquals("parentId", calc.getParentId());
+        assertEquals("upstreamApps", calc.getUpstreamApps());
+        assertEquals("notificationMsg", calc.getNotificationMsg());
+        assertEquals("a@abc.com", calc.getAlertContact());
+        assertEquals("MISS", calc.getAlertEvents());
+        assertEquals("jobData", calc.getJobData());
+        assertEquals(sdf.parse("2011-03-09"), calc.getActualStart());
+        assertEquals(sdf.parse("2011-03-10"), calc.getActualEnd());
+        assertEquals(456, calc.getActualDuration());
+        assertEquals(SLAEvent.EventStatus.END_MISS, calc1.getEventStatus()); // NOTE(review): the next four checks
+        assertEquals(SLAEvent.SLAStatus.MISS, calc1.getSLAStatus()); // read calc1 (the pre-restart object), not
+        assertEquals(WorkflowJob.Status.FAILED.toString(), calc1.getJobStatus()); // the reloaded calc - confirm
+        assertEquals(lastModifiedTime, calc1.getLastModifiedTime()); // whether calc was intended here
+
+        assertEquals(5, calc.getEventProcessed());
+        assertEquals(6, slaCalcMemory.get(jobId2).getEventProcessed());
+        // jobId3 should be in history set as eventprocessed is 7 (111)
+        assertNull(slaCalcMemory.get(jobId3));
+        slaCalcMemory.addJobStatus(jobId3, WorkflowJob.Status.SUCCEEDED.toString(), EventStatus.SUCCESS,
+                sdf.parse("2011-03-09"), sdf.parse("2011-04-09"));
+        SLASummaryBean slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId3));
+        assertEquals(8, slaSummary.getEventProcessed()); // terminal status stored the actual times, so 7 -> 8
+        assertEquals(sdf.parse("2011-03-09"), slaSummary.getActualStart());
+        assertEquals(sdf.parse("2011-04-09"), slaSummary.getActualEnd());
+        assertEquals(WorkflowJob.Status.SUCCEEDED.toString(), slaSummary.getJobStatus());
+    }
+
+    @Test // start and end SLAs are both overdue at registration; the job then starts and succeeds
+    public void testSLAEvents() throws Exception {
+        SLACalculatorMemory slaCalcMemory = new SLACalculatorMemory();
+        EventHandlerService ehs = Services.get().get(EventHandlerService.class);
+        slaCalcMemory.init(new Configuration(false));
+        SLARegistrationBean slaRegBean = _createSLARegistration("job-1", AppType.WORKFLOW_JOB);
+        slaRegBean.setExpectedStart(new Date(System.currentTimeMillis() - 1 * 1 * 3600 * 1000)); // 1 hour back
+        slaRegBean.setExpectedDuration(2 * 3600 * 1000); // presumably milliseconds, i.e. 2 hours - confirm unit
+        slaRegBean.setExpectedEnd(new Date(System.currentTimeMillis() - 1 * 1 * 3600 * 1000)); // also 1
+                                                                                               // hour back
+        String jobId = slaRegBean.getId();
+        slaCalcMemory.addRegistration(jobId, slaRegBean);
+        assertEquals(1, slaCalcMemory.size());
+        SLASummaryBean slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId));
+        assertEquals(SLAStatus.NOT_STARTED, slaSummary.getSLAStatus());
+        slaCalcMemory.updateSlaStatus(jobId);
+        assertEquals(2, ehs.getEventQueue().size()); // two events queued (start miss and end miss, per the check below)
+        slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId));
+        // both start miss and end miss (binary 101)
+        assertEquals(5, slaSummary.getEventProcessed());
+        assertEquals(SLAEvent.EventStatus.END_MISS, slaSummary.getEventStatus());
+        assertEquals(SLAStatus.MISS, slaSummary.getSLAStatus());
+
+        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
+
+        assertEquals(SLAStatus.MISS, slaSummary.getSLAStatus()); // NOTE(review): repeats the assertion just above
+        slaCalcMemory.addJobStatus(jobId, WorkflowJob.Status.RUNNING.toString(), EventStatus.STARTED,
+                sdf.parse("2012-01-01"), null);
+        slaCalcMemory.addJobStatus(jobId, WorkflowJob.Status.SUCCEEDED.toString(), EventStatus.SUCCESS,
+                sdf.parse("2012-01-01"), sdf.parse("2012-01-02"));
+
+        assertEquals(3, ehs.getEventQueue().size()); // one more event than before; presumably the duration miss
+        slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId));
+        // All events processed and actual times stored (binary 1000)
+        assertEquals(8, slaSummary.getEventProcessed());
+        assertEquals(SLAStatus.MISS, slaSummary.getSLAStatus());
+        assertEquals(WorkflowJob.Status.SUCCEEDED.toString(), slaSummary.getJobStatus());
+        assertEquals(SLAEvent.EventStatus.DURATION_MISS, slaSummary.getEventStatus()); // run spans a full day
+        assertEquals(sdf.parse("2012-01-01").getTime(), slaSummary.getActualStart().getTime());
+        assertEquals(sdf.parse("2012-01-02").getTime(), slaSummary.getActualEnd().getTime());
+        assertEquals(sdf.parse("2012-01-02").getTime() - sdf.parse("2012-01-01").getTime(),
+                slaSummary.getActualDuration());
+        assertEquals(0, slaCalcMemory.size()); // the finished job is no longer held in the calculator
+    }
+
+    @Test
+    public void testDuplicateStartMiss() throws Exception {
+        // test start-miss: a second status update must not process the start SLA again
+        EventHandlerService ehs = Services.get().get(EventHandlerService.class);
+        SLACalculatorMemory slaCalcMemory = new SLACalculatorMemory();
+        slaCalcMemory.init(new Configuration(false));
+        SLARegistrationBean slaRegBean = _createSLARegistration("job-1", AppType.WORKFLOW_JOB);
+        Date startTime = new Date(System.currentTimeMillis() - 1 * 1 * 3600 * 1000); // expected start
+                                                                                     // is 1 hour back,
+                                                                                     // so already overdue
+        slaRegBean.setExpectedStart(startTime);
+        slaRegBean.setExpectedDuration(3600 * 1000);
+        slaRegBean.setExpectedEnd(new Date(System.currentTimeMillis() + 1 * 1 * 3600 * 1000)); // expected end
+                                                                                               // is 1 hour
+                                                                                               // ahead
+        String jobId = slaRegBean.getId();
+        slaCalcMemory.addRegistration(slaRegBean.getId(), slaRegBean);
+        slaCalcMemory.updateSlaStatus(jobId);
+        SLASummaryBean slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId));
+        assertEquals(1, slaSummary.getEventProcessed()); // only the start SLA processed so far
+        assertEquals(SLAStatus.NOT_STARTED, slaSummary.getSLAStatus());
+        slaCalcMemory.addJobStatus(jobId, WorkflowJob.Status.RUNNING.toString(), EventStatus.STARTED,
+                new Date(System.currentTimeMillis()), null);
+        slaCalcMemory.updateSlaStatus(jobId);
+        slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId));
+        assertEquals(1, slaSummary.getEventProcessed()); // unchanged after the job started and a second update
+        assertEquals(SLAStatus.IN_PROCESS, slaSummary.getSLAStatus());
+        assertEquals(WorkflowJob.Status.RUNNING.toString(), slaSummary.getJobStatus());
+        assertEquals(1, ehs.getEventQueue().size()); // exactly one event queued, i.e. no duplicate
+    }
+
+    @Test // an end miss must be processed once only, even when updates and job events arrive out of order
+    public void testDuplicateEndMiss() throws Exception {
+        EventHandlerService ehs = Services.get().get(EventHandlerService.class);
+        SLACalculatorMemory slaCalcMemory = new SLACalculatorMemory();
+        slaCalcMemory.init(new Configuration(false));
+        SLARegistrationBean slaRegBean = _createSLARegistration("job-1", AppType.WORKFLOW_JOB);
+        Date startTime = new Date(System.currentTimeMillis() + 1 * 1 * 3600 * 1000); // 1 hour ahead
+        slaRegBean.setExpectedStart(startTime);
+        slaRegBean.setExpectedDuration(3600 * 1000);
+        slaRegBean.setExpectedEnd(new Date(System.currentTimeMillis() - 1 * 1 * 3600 * 1000)); // expected end
+                                                                                               // is 1 hour
+                                                                                               // back
+        String jobId = slaRegBean.getId();
+        slaCalcMemory.addRegistration(slaRegBean.getId(), slaRegBean);
+        slaCalcMemory.updateSlaStatus(jobId);
+        SLASummaryBean slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId));
+        // Only end sla should be processed (100)
+        assertEquals(4, slaSummary.getEventProcessed());
+        slaCalcMemory.updateSlaStatus(jobId);
+        slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId));
+        assertEquals(4, slaSummary.getEventProcessed()); // unchanged by the repeated update
+        assertEquals(SLAStatus.MISS, slaSummary.getSLAStatus());
+        slaCalcMemory.addJobStatus(jobId, WorkflowJob.Status.SUCCEEDED.toString(), EventStatus.SUCCESS,
+                new Date(System.currentTimeMillis()), new Date(System.currentTimeMillis() + 1 * 1 * 3600 * 1000));
+        slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId));
+        // Only Duration sla should be processed as end is already processed
+        // (110)
+        assertEquals(6, slaSummary.getEventProcessed());
+        assertEquals(SLAStatus.MISS, slaSummary.getSLAStatus());
+        // Receive start event (after the terminal event, i.e. out of order)
+        assertTrue(slaCalcMemory.addJobStatus(jobId, WorkflowJob.Status.RUNNING.toString(), EventStatus.STARTED,
+                new Date(System.currentTimeMillis()), new Date(System.currentTimeMillis() + 1 * 1 * 3600 * 1000)));
+        slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId));
+        // Start event received so everything is processed and actual times are stored (1000)
+        assertEquals(8, slaSummary.getEventProcessed());
+        assertEquals(SLAStatus.MISS, slaSummary.getSLAStatus());
+        assertEquals(0, slaCalcMemory.size());
+        assertEquals(3, ehs.getEventQueue().size());
+
+    }
+
+    @Test
+    public void testSLAHistorySet() throws Exception { // fully processed job leaves the map before its end arrives
+        EventHandlerService ehs = Services.get().get(EventHandlerService.class);
+        SLACalculatorMemory slaCalcMemory = new SLACalculatorMemory();
+        slaCalcMemory.init(new Configuration(false));
+        SLARegistrationBean slaRegBean = _createSLARegistration("job-1", AppType.WORKFLOW_JOB);
+        Date startTime = new Date(System.currentTimeMillis() - 1 * 1 * 3600 * 1000);
+        slaRegBean.setExpectedStart(startTime); // 1 hour back
+        slaRegBean.setExpectedDuration(1000);
+        slaRegBean.setExpectedEnd(new Date(System.currentTimeMillis() - 1 * 1 * 3600 * 1000)); // also 1 hour back
+        String jobId = slaRegBean.getId();
+        slaCalcMemory.addRegistration(slaRegBean.getId(), slaRegBean);
+        slaCalcMemory.updateSlaStatus(jobId);
+        slaCalcMemory.addJobStatus(jobId, WorkflowJob.Status.RUNNING.toString(), EventStatus.STARTED, new Date(
+                System.currentTimeMillis() - 3600 * 1000), null);
+        slaCalcMemory.updateSlaStatus(jobId);
+        SLASummaryBean slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId));
+        // The actual end times are not stored, but sla's processed so (111)
+        assertEquals(7, slaSummary.getEventProcessed());
+        // Moved from map to history set
+        assertEquals(0, slaCalcMemory.size());
+        // Add terminal state event so actual end time is stored
+        slaCalcMemory.addJobStatus(jobId, WorkflowJob.Status.SUCCEEDED.toString(), EventStatus.SUCCESS, new Date(
+                System.currentTimeMillis() - 3600 * 1000), new Date(System.currentTimeMillis()));
+        slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId));
+        // The actual times are stored, so event processed(1000)
+        assertEquals(8, slaSummary.getEventProcessed());
+        assertEquals(3, ehs.getEventQueue().size());
+    }
+
+    private SLARegistrationBean _createSLARegistration(String jobId, AppType appType) {
+        SLARegistrationBean bean = new SLARegistrationBean();
+        bean.setJobId(jobId);
+        bean.setAppType(appType);
+        return bean;
+    }
+
+}

Modified: oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLAEventGeneration.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLAEventGeneration.java?rev=1491333&r1=1491332&r2=1491333&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLAEventGeneration.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLAEventGeneration.java Mon Jun 10 04:37:20 2013
@@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.Path;
 import org.apache.oozie.AppType;
 import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.WorkflowJobBean;
 import org.apache.oozie.client.CoordinatorAction;
 import org.apache.oozie.client.OozieClient;
 import org.apache.oozie.client.WorkflowJob;
@@ -36,11 +37,15 @@ import org.apache.oozie.command.coord.Co
 import org.apache.oozie.command.coord.CoordKillXCommand;
 import org.apache.oozie.command.coord.CoordSubmitXCommand;
 import org.apache.oozie.command.wf.KillXCommand;
+import org.apache.oozie.command.wf.ReRunXCommand;
 import org.apache.oozie.command.wf.StartXCommand;
 import org.apache.oozie.command.wf.SubmitXCommand;
 import org.apache.oozie.event.listener.JobEventListener;
 import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordActionUpdateJPAExecutor;
+import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
+import org.apache.oozie.executor.jpa.WorkflowJobUpdateJPAExecutor;
+import org.apache.oozie.executor.jpa.sla.SLASummaryGetJPAExecutor;
 import org.apache.oozie.service.EventHandlerService;
 import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.Services;
@@ -121,6 +126,162 @@ public class TestSLAEventGeneration exte
         _testWorkflowJobCommands(conf, ehs, slas, true);
     }
 
+    /**
+     * Test for SLA events generated through a workflow rerun: the rerun must register the job's SLA again
+     * using the new nominal time, and the stored SLA summary must be reset.
+     * @throws Exception propagated from any command or JPA call made by the test
+     */
+    @Test
+    public void testWorkflowJobSLARerun() throws Exception {
+        EventHandlerService ehs = services.get(EventHandlerService.class);
+        SLAService slas = services.get(SLAService.class);
+
+        String wfXml = IOUtils.getResourceAsString("wf-job-sla.xml", -1);
+        Path appPath = getFsTestCaseDir();
+        writeToFile(wfXml, appPath, "workflow.xml");
+        Configuration conf = new XConfiguration();
+        conf.set(OozieClient.APP_PATH, appPath.toString());
+        conf.set(OozieClient.USER_NAME, getTestUser());
+
+        cal.setTime(new Date());
+        cal.add(Calendar.MINUTE, -40); // for start_miss: nominal + 10 min is then already in the past
+        Date nominal = cal.getTime();
+        String nominalTime = DateUtils.formatDateOozieTZ(nominal);
+        conf.set("nominal_time", nominalTime);
+        cal.setTime(nominal);
+        cal.add(Calendar.MINUTE, 10); // as per the sla xml
+        String expectedStart = DateUtils.formatDateOozieTZ(cal.getTime());
+        cal.setTime(nominal);
+        cal.add(Calendar.MINUTE, 30); // as per the sla xml
+        String expectedEnd = DateUtils.formatDateOozieTZ(cal.getTime());
+
+        // Call SubmitX
+        SubmitXCommand sc = new SubmitXCommand(conf, "UNIT_TESTING");
+        String jobId = sc.call();
+        SLACalcStatus slaEvent = slas.getSLACalculator().get(jobId);
+        assertEquals(jobId, slaEvent.getId());
+        assertEquals("test-wf-job-sla", slaEvent.getAppName());
+        assertEquals(AppType.WORKFLOW_JOB, slaEvent.getAppType());
+        assertEquals(nominalTime, DateUtils.formatDateOozieTZ(slaEvent.getNominalTime()));
+        assertEquals(expectedStart, DateUtils.formatDateOozieTZ(slaEvent.getExpectedStart()));
+        assertEquals(expectedEnd, DateUtils.formatDateOozieTZ(slaEvent.getExpectedEnd()));
+
+        slas.runSLAWorker();
+        slaEvent = (SLACalcStatus) ehs.getEventQueue().poll();
+        assertEquals(SLAStatus.NOT_STARTED, slaEvent.getSLAStatus());
+        assertEquals(EventStatus.START_MISS, slaEvent.getEventStatus());
+        slas.getSLACalculator().clear(); // empty the calculator so the lookup after the rerun sees only new state
+
+        JPAService jpaService = Services.get().get(JPAService.class);
+        WorkflowJobBean wfBean = jpaService.execute(new WorkflowJobGetJPAExecutor(jobId));
+        // set job status to succeeded, so rerun doesn't fail
+        wfBean.setStatus(WorkflowJob.Status.SUCCEEDED);
+        jpaService.execute(new WorkflowJobUpdateJPAExecutor(wfBean));
+
+        // change conf for rerun
+        cal.setTime(new Date());
+        cal.add(Calendar.MINUTE, -20); // for start_miss: nominal + 10 min is still in the past
+        nominalTime = DateUtils.formatDateOozieTZ(cal.getTime());
+
+        conf.set("nominal_time", nominalTime);
+        nominal = cal.getTime();
+        cal.add(Calendar.MINUTE, 10); // as per the sla xml
+        expectedStart = DateUtils.formatDateOozieTZ(cal.getTime());
+        cal.setTime(nominal);
+        cal.add(Calendar.MINUTE, 30); // as per the sla xml
+        expectedEnd = DateUtils.formatDateOozieTZ(cal.getTime());
+
+        ReRunXCommand rerun = new ReRunXCommand(jobId, conf, "UNIT_TESTING");
+        rerun.call();
+        slaEvent = slas.getSLACalculator().get(jobId);
+        // the rerun must have registered the job with the (previously cleared) calculator again
+        assertNotNull(slaEvent);
+        assertEquals(jobId, slaEvent.getId());
+        assertEquals("test-wf-job-sla", slaEvent.getAppName());
+        assertEquals(AppType.WORKFLOW_JOB, slaEvent.getAppType());
+
+        // assert for new conf
+        assertEquals(nominalTime, DateUtils.formatDateOozieTZ(slaEvent.getNominalTime()));
+        assertEquals(expectedStart, DateUtils.formatDateOozieTZ(slaEvent.getExpectedStart()));
+        assertEquals(expectedEnd, DateUtils.formatDateOozieTZ(slaEvent.getExpectedEnd()));
+
+        // assert for values in summary bean to be reset
+        SLASummaryBean slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId));
+        assertEquals( 0, slaSummary.getEventProcessed());
+        assertEquals(-1, slaSummary.getActualDuration());
+        assertNull(slaSummary.getActualStart());
+        assertNull(slaSummary.getActualEnd());
+        assertNull(slaSummary.getJobStatus());
+        assertEquals(SLAStatus.NOT_STARTED, slaSummary.getSLAStatus());
+        assertNull(slaEvent.getEventStatus());
+
+        ehs.getEventQueue().clear(); // drop earlier events so the next poll returns the post-rerun event
+        slas.runSLAWorker();
+        slaEvent = (SLACalcStatus) ehs.getEventQueue().poll();
+        assertEquals(SLAStatus.NOT_STARTED, slaEvent.getSLAStatus());
+        assertEquals(EventStatus.START_MISS, slaEvent.getEventStatus());
+
+    }
+
+    /**
+     * Test for SLA events generated through a workflow action rerun: after the rerun, the action's SLA
+     * must be registered with the calculator using the new nominal time.
+     * @throws Exception propagated from any command or JPA call made by the test
+     */
+    @Test
+    public void testWorkflowActionSLARerun() throws Exception {
+        SLAService slas = services.get(SLAService.class);
+        String wfXml = IOUtils.getResourceAsString("wf-action-sla.xml", -1);
+        Path appPath = getFsTestCaseDir();
+        writeToFile(wfXml, appPath, "workflow.xml");
+        Configuration conf = new XConfiguration();
+        conf.set(OozieClient.APP_PATH, appPath.toString());
+        conf.set(OozieClient.USER_NAME, getTestUser());
+
+        cal.setTime(new Date());
+        cal.add(Calendar.MINUTE, -20); // for start_miss (NOTE(review): no start-miss assertion follows in this test)
+        Date nominal = cal.getTime();
+        String nominalTime = DateUtils.formatDateOozieTZ(nominal);
+        conf.set("nominal_time", nominalTime);
+
+        // Call SubmitX
+        SubmitXCommand sc = new SubmitXCommand(conf, "UNIT_TESTING");
+        String jobId = sc.call();
+        String actionId = jobId+"@grouper"; // presumably "grouper" is the action name in wf-action-sla.xml - confirm
+
+        slas.getSLACalculator().clear(); // empty the calculator so the lookup after the rerun sees only new state
+        JPAService jpaService = Services.get().get(JPAService.class);
+        WorkflowJobBean wfBean = jpaService.execute(new WorkflowJobGetJPAExecutor(jobId));
+        // set job status to succeeded, so rerun doesn't fail
+        wfBean.setStatus(WorkflowJob.Status.SUCCEEDED);
+        jpaService.execute(new WorkflowJobUpdateJPAExecutor(wfBean));
+
+        // change conf for rerun
+        cal.setTime(new Date());
+        nominalTime = DateUtils.formatDateOozieTZ(cal.getTime());
+        conf.set("nominal_time", nominalTime);
+        nominal = cal.getTime();
+        cal.add(Calendar.MINUTE, 10); // as per the sla xml
+        String expectedStart = DateUtils.formatDateOozieTZ(cal.getTime());
+        cal.setTime(nominal);
+        cal.add(Calendar.MINUTE, 30); // as per the sla xml
+        String expectedEnd = DateUtils.formatDateOozieTZ(cal.getTime());
+
+        ReRunXCommand rerun = new ReRunXCommand(jobId, conf, "UNIT_TESTING");
+        rerun.call();
+        SLACalcStatus slaEvent = slas.getSLACalculator().get(actionId);
+        assertNotNull(slaEvent); // the rerun registered the action with the previously cleared calculator
+        // assert for action configs
+        assertEquals(actionId, slaEvent.getId());
+        assertEquals("test-wf-action-sla", slaEvent.getAppName());
+        assertEquals(AppType.WORKFLOW_ACTION, slaEvent.getAppType());
+        // assert for new conf
+        assertEquals(nominalTime, DateUtils.formatDateOozieTZ(slaEvent.getNominalTime()));
+        assertEquals(expectedStart, DateUtils.formatDateOozieTZ(slaEvent.getExpectedStart()));
+        assertEquals(expectedEnd, DateUtils.formatDateOozieTZ(slaEvent.getExpectedEnd()));
+
+    }
+
     @Test
     public void testSLASchema1BackwardCompatibility() throws Exception {
         EventHandlerService ehs = services.get(EventHandlerService.class);
@@ -192,6 +353,7 @@ public class TestSLAEventGeneration exte
         ehs.getEventQueue().poll(); //ignore the wf-action event generated
         ehs.new EventWorker().run();
         Thread.sleep(300); // time for listeners to run
+        ehs.getEventQueue().poll(); // ignore duration event
         slaEvent = (SLACalcStatus) ehs.getEventQueue().poll();
         assertEquals(jobId, slaEvent.getId());
         assertNotNull(slaEvent.getActualEnd());
@@ -278,6 +440,7 @@ public class TestSLAEventGeneration exte
         // test that sla processes the Job Event from Kill command
         new CoordKillXCommand(jobId).call();
         ehs.new EventWorker().run();
+        ehs.getEventQueue().poll(); //ignore duration event
         slaEvent = (SLACalcStatus) ehs.getEventQueue().poll();
         assertEquals(actionId, slaEvent.getId());
         assertNotNull(slaEvent.getActualEnd());

Modified: oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLAJobEventListener.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLAJobEventListener.java?rev=1491333&r1=1491332&r2=1491333&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLAJobEventListener.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLAJobEventListener.java Mon Jun 10 04:37:20 2013
@@ -91,8 +91,8 @@ public class TestSLAJobEventListener ext
         SLACalcStatus serviceObj = slas.getSLACalculator().get("wf1");
         // check that start sla has been calculated
         assertEquals(EventStatus.START_MISS, serviceObj.getEventStatus());
-        assertEquals(0, serviceObj.getSlaProcessed()); //Job switching to running is only partially
-                                                       //sla processed. so state = zero
+        assertEquals(1, serviceObj.getEventProcessed()); //Job switching to running is only partially
+                                                       //sla processed. so state = 1
 
         job = _createSLARegBean("wa1", AppType.WORKFLOW_ACTION);
         slas.addRegistrationEvent(job);
@@ -114,24 +114,27 @@ public class TestSLAJobEventListener ext
                 "coord-app-name1", actualStart, actualEnd);
         listener.onCoordinatorJobEvent(cje);
 
-        // Since serviceObj is removed from memory after END stage
         SLASummaryBean summary = Services.get().get(JPAService.class).execute(new SLASummaryGetJPAExecutor("cj1"));
-        // check that end sla has been calculated
-        assertEquals(2, summary.getSlaProcessed()); //Job in terminal state has finished
-                                                    //sla processing. so state = 2
+        // check that end and duration sla has been calculated
+        assertEquals(6, summary.getEventProcessed());
+
         assertEquals(EventStatus.END_MET, summary.getEventStatus());
 
         job = _createSLARegBean("ca1", AppType.COORDINATOR_ACTION);
         actualEnd = DateUtils.parseDateUTC("2012-07-22T02:00Z");
         slas.addRegistrationEvent(job);
-        assertEquals(3, slas.getSLACalculator().size());
-        CoordinatorActionEvent cae = new CoordinatorActionEvent("ca1", "cj1", CoordinatorAction.Status.KILLED, "user1",
+        assertEquals(4, slas.getSLACalculator().size());
+        CoordinatorActionEvent cae = new CoordinatorActionEvent("ca1", "cj1", CoordinatorAction.Status.RUNNING, "user1",
+                "coord-app-name1", null, actualEnd, null);
+        listener.onCoordinatorActionEvent(cae);
+        cae = new CoordinatorActionEvent("ca1", "cj1", CoordinatorAction.Status.KILLED, "user1",
                 "coord-app-name1", null, actualEnd, null);
         listener.onCoordinatorActionEvent(cae);
         summary = Services.get().get(JPAService.class).execute(new SLASummaryGetJPAExecutor("ca1"));
-        // check that start sla has been calculated
-        assertEquals(2, summary.getSlaProcessed());
+        // check that all events are processed
+        assertEquals(8, summary.getEventProcessed());
         assertEquals(EventStatus.END_MISS, summary.getEventStatus());
+        assertEquals(3, slas.getSLACalculator().size());
 
     }
 

Added: oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLARegistrationGetRecordsOnRestartJPAExecutor.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLARegistrationGetRecordsOnRestartJPAExecutor.java?rev=1491333&view=auto
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLARegistrationGetRecordsOnRestartJPAExecutor.java (added)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLARegistrationGetRecordsOnRestartJPAExecutor.java Mon Jun 10 04:37:20 2013
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.sla;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+import org.apache.oozie.client.rest.JsonBean;
+import org.apache.oozie.executor.jpa.sla.SLACalculationInsertUpdateJPAExecutor;
+import org.apache.oozie.executor.jpa.sla.SLARegistrationGetOnRestartJPAExecutor;
+import org.apache.oozie.service.JPAService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.sla.SLARegistrationBean;
+import org.apache.oozie.test.XDataTestCase;
+
+/** Tests that {@link SLARegistrationGetOnRestartJPAExecutor} reads back a persisted SLA registration. */
+public class TestSLARegistrationGetRecordsOnRestartJPAExecutor extends XDataTestCase {
+    Services services;
+
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        services = new Services();
+        services.init();
+        cleanUpDBTables();
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        services.destroy();
+        super.tearDown();
+    }
+
+    /** Inserts one registration bean, then checks the fields returned by the on-restart query. */
+    public void testSLARegistrationGetRecordsOnRestart() throws Exception {
+        // Check the service before its first use so a missing service fails as an assertion, not an NPE.
+        JPAService jpaService = Services.get().get(JPAService.class);
+        assertNotNull(jpaService);
+        final String jobId = "0000000-" + new Date().getTime() + "-TestSLARegGetRestartJPAExecutor-W";
+        SLARegistrationBean reg = new SLARegistrationBean();
+        reg.setJobId(jobId);
+        reg.setNotificationMsg("dummyMessage");
+        reg.setUpstreamApps("upApps");
+        reg.setAlertEvents("miss");
+        reg.setAlertContact("abc@y.com");
+        reg.setJobData("jobData");
+        List<JsonBean> insert = new ArrayList<JsonBean>();
+        insert.add(reg);
+        jpaService.execute(new SLACalculationInsertUpdateJPAExecutor(insert, null));
+        SLARegistrationBean bean = jpaService.execute(new SLARegistrationGetOnRestartJPAExecutor(jobId));
+        assertNotNull(bean);
+        assertEquals("dummyMessage", bean.getNotificationMsg());
+        assertEquals("upApps", bean.getUpstreamApps());
+        assertEquals("miss", bean.getAlertEvents());
+        assertEquals("abc@y.com", bean.getAlertContact());
+        assertEquals("jobData", bean.getJobData());
+    }
+}

Added: oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLASummaryGetOnRestartJPAExecutor.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLASummaryGetOnRestartJPAExecutor.java?rev=1491333&view=auto
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLASummaryGetOnRestartJPAExecutor.java (added)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLASummaryGetOnRestartJPAExecutor.java Mon Jun 10 04:37:20 2013
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.sla;
+
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+import org.apache.oozie.AppType;
+import org.apache.oozie.client.rest.JsonBean;
+import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.executor.jpa.sla.SLACalculationInsertUpdateJPAExecutor;
+import org.apache.oozie.executor.jpa.sla.SLARegistrationGetJPAExecutor;
+import org.apache.oozie.executor.jpa.sla.SLASummaryGetJPAExecutor;
+import org.apache.oozie.executor.jpa.sla.SLASummaryGetRecordsOnRestartJPAExecutor;
+import org.apache.oozie.service.JPAService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.sla.SLARegistrationBean;
+import org.apache.oozie.test.XDataTestCase;
+
+/** Tests that {@link SLASummaryGetRecordsOnRestartJPAExecutor} returns only recently modified summaries. */
+public class TestSLASummaryGetOnRestartJPAExecutor extends XDataTestCase {
+    Services services;
+
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        services = new Services();
+        services.init();
+        cleanUpDBTables();
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        services.destroy();
+        super.tearDown();
+    }
+
+    /** Stores a 5-day-old and a 2009 summary, queries the last 7 days, and expects only the former. */
+    public void testSLARegistrationGet() throws Exception {
+        JPAService jpaService = Services.get().get(JPAService.class);
+        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
+        SLASummaryBean sla1 = new SLASummaryBean();
+        sla1.setJobId("jobId");
+        sla1.setAppName("appName");
+        sla1.setUser("user");
+        sla1.setParentId("parent");
+        sla1.setEventProcessed(7);
+        // set to 5 days back from now; long arithmetic so a larger day count cannot overflow int
+        sla1.setLastModifiedTime(new Date(System.currentTimeMillis() - 5L * 24 * 60 * 60 * 1000));
+
+        SLASummaryBean sla2 = new SLASummaryBean();
+        sla2.setJobId("jobId2");
+        sla2.setEventProcessed(6);
+        // set to long time back
+        sla2.setLastModifiedTime(sdf.parse("2009-06-03"));
+
+        List<JsonBean> insert = new ArrayList<JsonBean>();
+        insert.add(sla1);
+        insert.add(sla2);
+        jpaService.execute(new SLACalculationInsertUpdateJPAExecutor(insert, null));
+        // get all records modified in last 7 days
+        List<SLASummaryBean> beans = jpaService.execute(new SLASummaryGetRecordsOnRestartJPAExecutor(7));
+        assertEquals(1, beans.size());
+        SLASummaryBean found = beans.get(0);
+        assertEquals("jobId", found.getJobId());
+        assertEquals("appName", found.getAppName());
+        assertEquals("user", found.getUser());
+        assertEquals("parent", found.getParentId());
+        assertEquals(7, found.getEventProcessed());
+    }
+}

Modified: oozie/trunk/release-log.txt
URL: http://svn.apache.org/viewvc/oozie/trunk/release-log.txt?rev=1491333&r1=1491332&r2=1491333&view=diff
==============================================================================
--- oozie/trunk/release-log.txt (original)
+++ oozie/trunk/release-log.txt Mon Jun 10 04:37:20 2013
@@ -1,5 +1,6 @@
 -- Oozie 4.1.0 release (trunk - unreleased)
 
+OOZIE-1339 Implement SLA Bootstrap Service and fix bugs in SLACalculator (virag)
 OOZIE-1400 REST API to fetch SLA (rohini)
 OOZIE-1375 Generate Job notification events for Workflow Actions (mona)
 OOZIE-1357 Can't view more than 1000 actions of a coordinator and paging does not work (ryota)